pipeline<
  A extends PipelineSource<any>,
  B extends PipelineDestination<A, any>,
>(
  source: A,
  destination: B,
  callback: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

A module method to pipe between streams and generators, forwarding errors, cleaning up properly, and providing a callback when the pipeline is complete.
import { pipeline } from 'node:stream';
import fs from 'node:fs';
import zlib from 'node:zlib';

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
);
The pipeline API provides a promise version.
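As a minimal sketch of the promise version, the gzip example can be written with `pipeline` imported from `node:stream/promises`. The helper name `gzipFile` and its two path parameters are assumptions made for illustration, not part of the API.

```javascript
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
import zlib from 'node:zlib';

// Hypothetical helper: gzips inputPath into outputPath.
// The returned promise resolves when the pipeline is fully done
// and rejects if any stream in the pipeline fails.
async function gzipFile(inputPath, outputPath) {
  await pipeline(
    fs.createReadStream(inputPath),
    zlib.createGzip(),
    fs.createWriteStream(outputPath),
  );
}
```

A caller would typically `await gzipFile('archive.tar', 'archive.tar.gz')` inside a `try`/`catch`, which takes the place of the `err` argument of the callback form.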
stream.pipeline() will call stream.destroy(err) on all streams except:
- Readable streams which have emitted 'end' or 'close'.
- Writable streams which have emitted 'finish' or 'close'.
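The destroy behavior described above can be observed with a small sketch. The helper `runFailingPipeline` and the error message are made up for illustration; the source fails on its first read, and the callback reports the `destroyed` flag of each stream.

```javascript
import { pipeline, Readable, Writable } from 'node:stream';

// Hypothetical helper: runs a pipeline whose source fails immediately
// and reports what the callback sees.
function runFailingPipeline() {
  return new Promise((resolve) => {
    const source = new Readable({
      read() {
        // Simulate a failing source.
        this.destroy(new Error('boom'));
      },
    });
    const sink = new Writable({
      write(chunk, encoding, done) {
        done();
      },
    });

    pipeline(source, sink, (err) => {
      resolve({
        message: err ? err.message : null,
        sourceDestroyed: source.destroyed,
        sinkDestroyed: sink.destroyed,
      });
    });
  });
}

runFailingPipeline().then((result) => console.log(result));
```

The sink never emitted 'finish' or 'close' before the failure, so it is not covered by the exceptions in the list and is destroyed along with the source.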
stream.pipeline() leaves dangling event listeners on the streams
after the callback has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors. If the last
stream is readable, dangling event listeners will be removed so that the last
stream can be consumed later.
stream.pipeline() closes all the streams when an error is raised.
Using an IncomingRequest with pipeline can therefore lead to unexpected
behavior: the socket is destroyed without the expected response being sent.
See the example below:
import fs from 'node:fs';
import http from 'node:http';
import { pipeline } from 'node:stream';

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // This message can't be sent because `pipeline` has already destroyed the socket.
      return res.end('error!!!');
    }
  });
});
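One possible way around the problem in the example above, sketched under the assumption that the failure happens while opening the file, is to start the pipeline only after the file stream has emitted 'open'. The helper name `sendFile` and the 404 status code are illustrative choices, not something the API prescribes.

```javascript
import fs from 'node:fs';
import http from 'node:http';
import { pipeline } from 'node:stream';

// Hypothetical helper: streams a file into the response, but answers with an
// error message if the file cannot be opened in the first place.
function sendFile(path, res) {
  const fileStream = fs.createReadStream(path);

  // Runs only if opening fails. `pipeline` has not been started yet,
  // so the socket is still usable and the response can be sent.
  const onOpenError = (err) => {
    console.log(err); // No such file
    res.statusCode = 404; // Assumed status code for illustration.
    res.end('error!!!');
  };
  fileStream.once('error', onOpenError);

  fileStream.once('open', () => {
    // From here on, pipeline() takes over error handling and cleanup.
    fileStream.off('error', onOpenError);
    pipeline(fileStream, res, (err) => {
      if (err) {
        console.error('Pipeline failed.', err);
      }
    });
  });
}

const server = http.createServer((req, res) => {
  sendFile('./fileNotExist.txt', res);
});
```

Errors that occur after the file has been opened still go through `pipeline` and destroy the socket, so this sketch only covers the open-time failure shown in the example.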
Type Parameters
A extends PipelineSource<any>
B extends PipelineDestination<A, any>

Parameters
source: A
destination: B
callback: PipelineCallback<B>
Called when the pipeline is fully done.

Return Type
B extends WritableStream ? B : WritableStream

pipeline<
  A extends PipelineSource<any>,
  T1 extends PipelineTransform<A, any>,
  B extends PipelineDestination<T1, any>,
>(
  source: A,
  transform1: T1,
  destination: B,
  callback: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters
A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
B extends PipelineDestination<T1, any>

Parameters
source: A
transform1: T1
destination: B
callback: PipelineCallback<B>

Return Type
B extends WritableStream ? B : WritableStream

pipeline<
  A extends PipelineSource<any>,
  T1 extends PipelineTransform<A, any>,
  T2 extends PipelineTransform<T1, any>,
  B extends PipelineDestination<T2, any>,
>(
  source: A,
  transform1: T1,
  transform2: T2,
  destination: B,
  callback: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters
A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
B extends PipelineDestination<T2, any>

Parameters
source: A
transform1: T1
transform2: T2
destination: B
callback: PipelineCallback<B>

Return Type
B extends WritableStream ? B : WritableStream

pipeline<
  A extends PipelineSource<any>,
  T1 extends PipelineTransform<A, any>,
  T2 extends PipelineTransform<T1, any>,
  T3 extends PipelineTransform<T2, any>,
  B extends PipelineDestination<T3, any>,
>(
  source: A,
  transform1: T1,
  transform2: T2,
  transform3: T3,
  destination: B,
  callback: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters
A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
T3 extends PipelineTransform<T2, any>
B extends PipelineDestination<T3, any>

Parameters
source: A
transform1: T1
transform2: T2
transform3: T3
destination: B
callback: PipelineCallback<B>

Return Type
B extends WritableStream ? B : WritableStream

pipeline<
  A extends PipelineSource<any>,
  T1 extends PipelineTransform<A, any>,
  T2 extends PipelineTransform<T1, any>,
  T3 extends PipelineTransform<T2, any>,
  T4 extends PipelineTransform<T3, any>,
  B extends PipelineDestination<T4, any>,
>(
  source: A,
  transform1: T1,
  transform2: T2,
  transform3: T3,
  transform4: T4,
  destination: B,
  callback: PipelineCallback<B>,
): B extends WritableStream ? B : WritableStream

Type Parameters
A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
T3 extends PipelineTransform<T2, any>
T4 extends PipelineTransform<T3, any>
B extends PipelineDestination<T4, any>

Parameters
source: A
transform1: T1
transform2: T2
transform3: T3
transform4: T4
destination: B
callback: PipelineCallback<B>

Return Type
B extends WritableStream ? B : WritableStream
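
Since the method pipes between streams and generators, a transform position in the overloads above can be filled by an async generator function instead of a Transform stream. The following is a minimal sketch of that usage; the helper name `upperCase` and the in-memory source and sink are assumptions made to keep the example self-contained.

```javascript
import { pipeline, Readable, Writable } from 'node:stream';

// Hypothetical helper: upper-cases every chunk and resolves with the joined output.
function upperCase(words) {
  return new Promise((resolve, reject) => {
    const received = [];
    const sink = new Writable({
      write(chunk, encoding, done) {
        received.push(chunk.toString());
        done();
      },
    });

    pipeline(
      Readable.from(words),
      // transform1: an async generator that consumes the previous stage.
      async function* (source) {
        for await (const chunk of source) {
          yield String(chunk).toUpperCase();
        }
      },
      sink,
      (err) => (err ? reject(err) : resolve(received.join(''))),
    );
  });
}
```

Wrapping the callback in a promise here is only for convenience when reading the result; the callback itself is still invoked once the pipeline is fully done.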

pipeline(
  streams: ReadonlyArray<ReadableStream | WritableStream | ReadWriteStream>,
  callback: (err: ErrnoException | null) => void,
): WritableStream

Parameters
streams: ReadonlyArray<ReadableStream | WritableStream | ReadWriteStream>
callback: (err: ErrnoException | null) => void

Return Type
WritableStream

pipeline(
  stream1: ReadableStream,
  stream2: ReadWriteStream | WritableStream,
  ...streams: Array<ReadWriteStream | WritableStream | ((err: ErrnoException | null) => void)>,
): WritableStream

Parameters
stream1: ReadableStream
stream2: ReadWriteStream | WritableStream
...streams: Array<ReadWriteStream | WritableStream | ((err: ErrnoException | null) => void)>

Return Type
WritableStream
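
The array overload can be sketched as follows: the streams are collected in an array and passed together with the callback. The helper name `runArrayPipeline` and the in-memory streams are assumptions for illustration. The sketch also records whether the value returned by `pipeline` is the last stream in the array.

```javascript
import { pipeline, PassThrough, Readable, Writable } from 'node:stream';

// Hypothetical helper: pipes words through a PassThrough into an in-memory sink
// using the array form of pipeline().
function runArrayPipeline(words) {
  return new Promise((resolve, reject) => {
    const received = [];
    const sink = new Writable({
      write(chunk, encoding, done) {
        received.push(chunk.toString());
        done();
      },
    });

    const streams = [Readable.from(words), new PassThrough(), sink];

    const returned = pipeline(streams, (err) => {
      if (err) {
        reject(err);
        return;
      }
      resolve({
        text: received.join(''),
        returnedIsSink: returned === sink,
      });
    });
  });
}
```

The array form is convenient when the list of stages is built at runtime, for example when a compression step is added only under some condition.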