ts
// Pipe `source` into a sink. While `headDeferred` is still set, the next
// written value is treated as the head; every later value is treated as a chunk.
source
  .pipeTo(
    new WritableStream({
      write(incoming) {
        if (!headDeferred) {
          // Head already handled: hand the chunk to the controller selected
          // by the chunk's first element.
          const data = incoming as ChunkData;
          const [streamIndex] = data;
          streamManager.getOrCreate(streamIndex).enqueue(data);
          return;
        }

        // Head path: run every entry through `decode` and write the result
        // back onto the same object before resolving the deferred with it.
        const headRecord = incoming as Record<number | string, unknown>;
        Object.entries(incoming).forEach(([field, raw]) => {
          headRecord[field] = decode(raw as any);
        });
        headDeferred.resolve(headRecord as THead);
        // Clearing the deferred routes all subsequent writes to the chunk path.
        headDeferred = null;
      },
      // A close of the sink is forwarded to `closeOrAbort` with a fresh error.
      close: () => closeOrAbort(new Error('Stream closed')),
      abort: closeOrAbort,
    }),
    { signal: opts.abortController.signal },
  )
  .catch((reason) => {
    // Notify the optional error hook first, then run the shared teardown.
    opts.onError?.({ error: reason });
    closeOrAbort(reason);
  });
ts
// Pipe `source` into a sink. While `headDeferred` is still set, the next
// written value is treated as the head; every later value is treated as a chunk.
source
  .pipeTo(
    new WritableStream({
      write(incoming) {
        if (!headDeferred) {
          // Head already handled: hand the chunk to the controller selected
          // by the chunk's first element.
          const data = incoming as ChunkData;
          const [streamIndex] = data;
          streamManager.getOrCreate(streamIndex).enqueue(data);
          return;
        }

        // Head path: run every entry through `decode` and write the result
        // back onto the same object before resolving the deferred with it.
        const headRecord = incoming as Record<number | string, unknown>;
        Object.entries(incoming).forEach(([field, raw]) => {
          headRecord[field] = decode(raw as any);
        });
        headDeferred.resolve(headRecord as THead);
        // Clearing the deferred routes all subsequent writes to the chunk path.
        headDeferred = null;
      },
      // A close of the sink is forwarded to `closeOrAbort` with a fresh error.
      close: () => closeOrAbort(new Error('Stream closed')),
      abort: closeOrAbort,
    }),
    { signal: opts.abortController.signal },
  )
  .catch((reason) => {
    // Notify the optional error hook first, then run the shared teardown.
    opts.onError?.({ error: reason });
    closeOrAbort(reason);
  });