Web Streams API
History
No longer experimental.
Use of this API no longer emits a runtime warning.
An implementation of the WHATWG Streams Standard.
The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.
There are three primary types of objects:
- ReadableStream: Represents a source of streaming data.
- WritableStream: Represents a destination for streaming data.
- TransformStream: Represents an algorithm for transforming streaming data.
This example creates a simple ReadableStream that pushes the current performance.now() timestamp once every second forever. An async iterable is used to read the data from the stream.
import {
ReadableStream,
} from 'node:stream/web';
import {
setInterval as every,
} from 'node:timers/promises';
import {
performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
for await (const value of stream)
console.log(value);
Node.js streams can be converted to web streams and vice versa via the toWeb and fromWeb methods present on stream.Readable, stream.Writable, and stream.Duplex objects.
For more details refer to the relevant documentation:
stream.Readable.toWeb
stream.Readable.fromWeb
stream.Writable.toWeb
stream.Writable.fromWeb
stream.Duplex.toWeb
stream.Duplex.fromWeb
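As a rough illustration of the conversion methods listed above, the following sketch round-trips a Node.js readable through a web stream and back. The function name roundTrip is invented for this example, and Buffer chunks are used on the assumption that byte data converts most predictably in both directions.

```javascript
// Hypothetical helper: Node.js Readable -> web ReadableStream -> Node.js Readable.
async function roundTrip() {
  const { Readable } = await import('node:stream');

  // Start from an ordinary Node.js readable that yields two Buffers.
  const nodeReadable = Readable.from([Buffer.from('a'), Buffer.from('b')]);

  // Convert to a web ReadableStream, then back to a Node.js Readable.
  const webStream = Readable.toWeb(nodeReadable);
  const backToNode = Readable.fromWeb(webStream);

  // Collect everything and join it, since chunk boundaries may differ.
  const chunks = [];
  for await (const chunk of backToNode) chunks.push(chunk);
  return Buffer.concat(chunks).toString();
}

roundTrip().then((text) => console.log(text)); // Prints: ab
```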
ReadableStream
History
This class is now exposed on the global object.
new ReadableStream(underlyingSource ?, strategy?)
Property | Type | Description |
---|---|---|
underlyingSource | <Object> | - |
strategy | <Object> | - |
readableStream.locked
Property | Type | Description |
---|---|---|
- | <boolean> | Set to true if there is an active reader for this <ReadableStream> . |
The readableStream.locked property is false by default, and is switched to true while there is an active reader consuming the stream's data.
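The locking behavior can be observed directly. This is a minimal sketch that relies on the global ReadableStream; the variable names before, during, and after are only for illustration.

```javascript
// No underlying source is needed just to observe locking.
const stream = new ReadableStream();
const before = stream.locked; // false: no reader has been acquired yet

const reader = stream.getReader();
const during = stream.locked; // true: the reader holds the lock

reader.releaseLock();
const after = stream.locked; // false again once the lock is released

console.log(before, during, after); // Prints: false true false
```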
readableStream.cancel(reason?): Promise
Property | Type | Description |
---|---|---|
reason | <any> | - |
Returns | - | A promise fulfilled with undefined once cancelation has been completed. |
readableStream.getReader(options?): ReadableStreamDefaultReader|ReadableStreamBYOBReader
Property | Type | Description |
---|---|---|
options | <Object> | - |
Returns | <ReadableStreamDefaultReader> | <ReadableStreamBYOBReader> | - |
import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());
Causes the readableStream.locked to be true.
readableStream.pipeThrough(transform, options?): ReadableStream
Property | Type | Description |
---|---|---|
transform | <Object> | - |
options | <Object> | - |
Returns | <ReadableStream> | From transform.readable . |
Connects this <ReadableStream> to the pair of <ReadableStream> and <WritableStream> provided in the transform argument such that the data from this <ReadableStream> is written into transform.writable, possibly transformed, then pushed to transform.readable. Once the pipeline is configured, transform.readable is returned.
Causes the readableStream.locked to be true while the pipe operation is active.
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: A
readableStream.pipeTo(destination, options?): Promise
Property | Type | Description |
---|---|---|
destination | <WritableStream> | A <WritableStream> to which this ReadableStream's data will be written. |
options | <Object> | - |
Returns | - | A promise fulfilled with undefined . |
Causes the readableStream.locked to be true while the pipe operation is active.
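A small sketch of pipeTo() in use, assuming the global ReadableStream and WritableStream classes. The function name pipeExample and the in-memory received array are made up for the example.

```javascript
// Hypothetical helper: pipe a finite source into a sink that records chunks.
async function pipeExample() {
  const received = [];

  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close(); // Signal the end so pipeTo() can settle.
    },
  });

  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  });

  // The returned promise fulfills once every chunk has been written
  // and the destination has been closed.
  await source.pipeTo(sink);
  return received;
}

pipeExample().then((chunks) => console.log(chunks)); // Prints: [ 'a', 'b' ]
```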
readableStream.tee
History
Support teeing a readable byte stream.
readableStream.tee(): ReadableStream[]
Property | Type | Description |
---|---|---|
Returns | <ReadableStream[]> | - |
Returns a pair of new <ReadableStream> instances to which this ReadableStream's data will be forwarded. Each will receive the same data.
Causes the readableStream.locked to be true.
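The following sketch shows both branches of a tee receiving the same chunks. It assumes the global ReadableStream; teeExample and collect are names chosen for the illustration.

```javascript
// Hypothetical helper: tee a stream and drain both branches concurrently.
async function teeExample() {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.close();
    },
  });

  const [left, right] = source.tee();

  // Drain one branch into an array.
  const collect = async (stream) => {
    const out = [];
    for await (const value of stream) out.push(value);
    return out;
  };

  const [a, b] = await Promise.all([collect(left), collect(right)]);
  // The original stream stays locked after tee().
  return { a, b, locked: source.locked };
}

teeExample().then((result) => console.log(result));
// Prints: { a: [ 1, 2 ], b: [ 1, 2 ], locked: true }
```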
readableStream.values(options?)
Property | Type | Description |
---|---|---|
options | <Object> | - |
Creates and returns an async iterator usable for consuming this ReadableStream's data.
Causes the readableStream.locked to be true while the async iterator is active.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
The <ReadableStream>
object supports the async iterator protocol using for await
syntax.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
The async iterator will consume the <ReadableStream>
until it terminates.
By default, if the async iterator exits early (via a break, return, or throw), the <ReadableStream> will be closed. To prevent automatic closing of the <ReadableStream>, use the readableStream.values() method to acquire the async iterator and set the preventCancel option to true.
The <ReadableStream>
must not be locked (that is, it must not have an existing active reader). During the async iteration, the <ReadableStream>
will be locked.
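To make the effect of preventCancel concrete, this sketch breaks out of a first loop early and then resumes reading in a second loop. It assumes the global ReadableStream; resumeAfterBreak is a name invented for the example.

```javascript
// Hypothetical helper: exit a loop early without canceling the stream.
async function resumeAfterBreak() {
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.enqueue('c');
      controller.close();
    },
  });

  const seen = [];

  // With preventCancel set, leaving the loop only releases the lock.
  for await (const chunk of stream.values({ preventCancel: true })) {
    seen.push(chunk);
    break;
  }

  // The stream is unlocked and still open, so iteration can continue.
  for await (const chunk of stream) seen.push(chunk);

  return seen;
}

resumeAfterBreak().then((seen) => console.log(seen)); // Prints: [ 'a', 'b', 'c' ]
```

Without preventCancel, the first break would cancel the stream and the second loop would receive nothing.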
A <ReadableStream>
instance can be transferred using a <MessagePort>
.
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
ReadableStream.from(iterable)
Property | Type | Description |
---|---|---|
iterable | <Iterable> | Object implementing the Symbol.asyncIterator or Symbol.iterator iterable protocol. |
A utility method that creates a new <ReadableStream>
from an iterable.
import { ReadableStream } from 'node:stream/web';
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'
To pipe the resulting <ReadableStream>
into a <WritableStream>
the <Iterable>
should yield a sequence of <Buffer>
, <TypedArray>
, or <DataView>
objects.
import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';
async function* asyncIterableGenerator() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
await stream.pipeTo(createWritableStreamSomehow());
ReadableStreamDefaultReader
History
This class is now exposed on the global object.
By default, calling readableStream.getReader()
with no arguments will return an instance of ReadableStreamDefaultReader
. The default reader treats the chunks of data passed through the stream as opaque values, which allows the <ReadableStream>
to work with generally any JavaScript value.
new ReadableStreamDefaultReader(stream)
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | - |
Creates a new <ReadableStreamDefaultReader>
that is locked to the given <ReadableStream>
.
readableStreamDefaultReader.cancel(reason?): Promise
Property | Type | Description |
---|---|---|
reason | <any> | - |
Returns | - | A promise fulfilled with undefined . |
Cancels the <ReadableStream>
and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamDefaultReader.closed
Property | Type | Description |
---|---|---|
- | <Promise> | Fulfilled with undefined when the associated <ReadableStream> is closed or rejected if the stream errors or the reader's lock is released before the stream finishes closing. |
readableStreamDefaultReader.read(): Promise
Requests the next chunk of data from the underlying <ReadableStream>
and returns a promise that is fulfilled with the data once it is available.
readableStreamDefaultReader.releaseLock()
Releases this reader's lock on the underlying <ReadableStream>
.
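A typical way to use the default reader is a read loop that runs until done is true. The sketch below assumes the global ReadableStream; readAll is a hypothetical helper name.

```javascript
// Hypothetical helper: drain a stream with an explicit default reader.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  try {
    while (true) {
      // Each read() fulfills with { value, done }.
      const { value, done } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
  } finally {
    // Always release the lock so the stream can be used elsewhere.
    reader.releaseLock();
  }
  return chunks;
}

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('x');
    controller.enqueue('y');
    controller.close();
  },
});

readAll(source).then((chunks) => console.log(chunks)); // Prints: [ 'x', 'y' ]
```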
ReadableStreamBYOBReader
History
This class is now exposed on the global object.
The ReadableStreamBYOBReader
is an alternative consumer for byte-oriented <ReadableStream>
s (those that are created with underlyingSource.type
set equal to 'bytes'
when the ReadableStream
was created).
The BYOB
is short for "bring your own buffer". This is a pattern that allows for more efficient reading of byte-oriented data that avoids extraneous copying.
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | - |
Creates a new ReadableStreamBYOBReader
that is locked to the given <ReadableStream>
.
readableStreamBYOBReader.cancel(reason?): Promise
Property | Type | Description |
---|---|---|
reason | <any> | - |
Returns | - | A promise fulfilled with undefined . |
Cancels the <ReadableStream>
and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamBYOBReader.closed
Property | Type | Description |
---|---|---|
- | <Promise> | Fulfilled with undefined when the associated <ReadableStream> is closed or rejected if the stream errors or the reader's lock is released before the stream finishes closing. |
readableStreamBYOBReader.read(view, options?): Promise
Property | Type | Description |
---|---|---|
view | <Buffer> | <TypedArray> | <DataView> | - |
options | <Object> | - |
Returns | - | A promise fulfilled with an object carrying the value and done properties. |
Requests the next chunk of data from the underlying <ReadableStream>
and returns a promise that is fulfilled with the data once it is available.
Do not pass a pooled <Buffer> object instance into this method. Pooled Buffer objects are created using Buffer.allocUnsafe() or Buffer.from(), or are often returned by various node:fs module callbacks. These types of Buffers use a shared underlying <ArrayBuffer> object that contains all of the data from all of the pooled Buffer instances. When a Buffer, <TypedArray>, or <DataView> is passed into readableStreamBYOBReader.read(), the view's underlying ArrayBuffer is detached, invalidating all existing views that may exist on that ArrayBuffer. This can have disastrous consequences for your application.
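The detaching behavior described above can be seen with a view that owns its own ArrayBuffer. This is a sketch assuming the global ReadableStream; byobDetachDemo is an invented name, and the three-byte source is only there to give the reader something to fill.

```javascript
// Hypothetical helper: show that read(view) detaches the view's ArrayBuffer.
async function byobDetachDemo() {
  const stream = new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3]));
      controller.close();
    },
  });

  const reader = stream.getReader({ mode: 'byob' });

  // A dedicated (non-pooled) buffer, so nothing else shares its memory.
  const original = new Uint8Array(new ArrayBuffer(8));
  const { value, done } = await reader.read(original);

  return {
    // The view that was passed in no longer has any usable memory.
    originalByteLength: original.byteLength,
    // The data is only reachable through the returned view.
    received: Array.from(value),
    done,
  };
}

byobDetachDemo().then((result) => console.log(result));
// Prints: { originalByteLength: 0, received: [ 1, 2, 3 ], done: false }
```

Had original been a slice of a pooled Buffer, every other Buffer sharing that pool would have been invalidated in the same way.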
readableStreamBYOBReader.releaseLock()
Releases this reader's lock on the underlying <ReadableStream>
.
ReadableStreamDefaultController
Every <ReadableStream> has a controller that is responsible for the internal state and management of the stream's queue. The ReadableStreamDefaultController is the default controller implementation for ReadableStreams that are not byte-oriented.
readableStreamDefaultController.close()
Closes the <ReadableStream>
to which this controller is associated.
readableStreamDefaultController.desiredSize
Property | Type | Description |
---|---|---|
- | <number> | - |
Returns the amount of data remaining to fill the <ReadableStream>
's queue.
readableStreamDefaultController.enqueue(chunk?)
Property | Type | Description |
---|---|---|
chunk | <any> | - |
Appends a new chunk of data to the <ReadableStream>
's queue.
readableStreamDefaultController.error(error?)
Property | Type | Description |
---|---|---|
error | <any> | - |
Signals an error that causes the <ReadableStream>
to error and close.
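The relationship between enqueue() and the remaining queue capacity can be sketched as follows. The example assumes the global ReadableStream and CountQueuingStrategy; the sizes array exists only to record what the controller reports.

```javascript
// Record the controller's desiredSize as chunks are queued.
const sizes = [];

const stream = new ReadableStream({
  start(controller) {
    sizes.push(controller.desiredSize); // Empty queue: the full high water mark.
    controller.enqueue('a');
    sizes.push(controller.desiredSize); // One chunk queued.
    controller.enqueue('b');
    sizes.push(controller.desiredSize); // Two chunks queued.
  },
}, new CountQueuingStrategy({ highWaterMark: 3 }));

console.log(sizes); // Prints: [ 3, 2, 1 ]
```

An underlying source can use this value as a backpressure hint and stop producing data once it reaches zero or below.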
ReadableByteStreamController
History
Support handling a BYOB pull request from a released reader.
Every <ReadableStream>
has a controller that is responsible for the internal state and management of the stream's queue. The ReadableByteStreamController
is for byte-oriented ReadableStream
s.
readableByteStreamController.byobRequest
Property | Type | Description |
---|---|---|
- | <ReadableStreamBYOBRequest> | - |
readableByteStreamController.close()
Closes the <ReadableStream>
to which this controller is associated.
readableByteStreamController.desiredSize
Property | Type | Description |
---|---|---|
- | <number> | - |
Returns the amount of data remaining to fill the <ReadableStream>
's queue.
readableByteStreamController.enqueue(chunk)
Property | Type | Description |
---|---|---|
chunk | <Buffer> | <TypedArray> | <DataView> | - |
Appends a new chunk of data to the <ReadableStream>
's queue.
readableByteStreamController.error(error?)
Property | Type | Description |
---|---|---|
error | <any> | - |
Signals an error that causes the <ReadableStream>
to error and close.
ReadableStreamBYOBRequest
History
This class is now exposed on the global object.
When using ReadableByteStreamController
in byte-oriented streams, and when using the ReadableStreamBYOBReader
, the readableByteStreamController.byobRequest
property provides access to a ReadableStreamBYOBRequest
instance that represents the current read request. The object is used to gain access to the ArrayBuffer
/TypedArray
that has been provided for the read request to fill, and provides methods for signaling that the data has been provided.
readableStreamBYOBRequest.respond(bytesWritten)
Property | Type | Description |
---|---|---|
bytesWritten | <number> | - |
Signals that a bytesWritten
number of bytes have been written to readableStreamBYOBRequest.view
.
readableStreamBYOBRequest.respondWithNewView(view)
Property | Type | Description |
---|---|---|
view | <Buffer> | <TypedArray> | <DataView> | - |
Signals that the request has been fulfilled with bytes written to a new Buffer
, TypedArray
, or DataView
.
readableStreamBYOBRequest.view
Property | Type | Description |
---|---|---|
- | <Buffer> | <TypedArray> | <DataView> | - |
WritableStream
History
This class is now exposed on the global object.
The WritableStream
is a destination to which stream data is sent.
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');
new WritableStream(underlyingSink?, strategy?)
Property | Type | Description | |||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
underlyingSink | <Object> | - | |||||||||||||||||||||||||||||||||||||||||||||||||||
| |||||||||||||||||||||||||||||||||||||||||||||||||||||
strategy | <Object> | - | |||||||||||||||||||||||||||||||||||||||||||||||||||
|
writableStream.abort(reason?): Promise
Property | Type | Description |
---|---|---|
reason | <any> | - |
Returns | - | A promise fulfilled with undefined . |
Abruptly terminates the WritableStream
. All queued writes will be canceled with their associated promises rejected.
writableStream.close(): Promise
Property | Type | Description |
---|---|---|
Returns | - | A promise fulfilled with undefined . |
Closes the WritableStream
when no additional writes are expected.
writableStream.getWriter(): WritableStreamDefaultWriter
Property | Type | Description |
---|---|---|
Returns | <WritableStreamDefaultWriter> | - |
Creates and returns a new writer instance that can be used to write data into the WritableStream
.
writableStream.locked
Property | Type | Description |
---|---|---|
- | <boolean> | - |
The writableStream.locked
property is false
by default, and is switched to true
while there is an active writer attached to this WritableStream
.
A <WritableStream>
instance can be transferred using a <MessagePort>
.
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
WritableStreamDefaultWriter
History
This class is now exposed on the global object.
new WritableStreamDefaultWriter(stream)
Property | Type | Description |
---|---|---|
stream | <WritableStream> | - |
Creates a new WritableStreamDefaultWriter
that is locked to the given WritableStream
.
writableStreamDefaultWriter.abort(reason?): Promise
Property | Type | Description |
---|---|---|
reason | <any> | - |
Returns | - | A promise fulfilled with undefined . |
Abruptly terminates the WritableStream
. All queued writes will be canceled with their associated promises rejected.
writableStreamDefaultWriter.close(): Promise
Property | Type | Description |
---|---|---|
Returns | - | A promise fulfilled with undefined . |
Closes the WritableStream
when no additional writes are expected.
writableStreamDefaultWriter.closed
Property | Type | Description |
---|---|---|
- | <Promise> | Fulfilled with undefined when the associated <WritableStream> is closed or rejected if the stream errors or the writer's lock is released before the stream finishes closing. |
writableStreamDefaultWriter.desiredSize
Property | Type | Description |
---|---|---|
- | <number> | - |
The amount of data required to fill the <WritableStream>'s queue.
writableStreamDefaultWriter.ready
Property | Type | Description |
---|---|---|
- | <Promise> | Fulfilled with undefined when the writer is ready to be used. |
writableStreamDefaultWriter.releaseLock()
Releases this writer's lock on the underlying <WritableStream>.
writableStreamDefaultWriter.write(chunk?): Promise
Property | Type | Description |
---|---|---|
chunk | <any> | - |
Returns | - | A promise fulfilled with undefined . |
Appends a new chunk of data to the <WritableStream>
's queue.
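One way to combine the writer's ready, write(), and close() members is sketched below. It assumes the global WritableStream; writeAll and the in-memory written array are illustrative only.

```javascript
// Hypothetical helper: write a list of items while respecting backpressure.
async function writeAll(items) {
  const written = [];
  const stream = new WritableStream({
    write(chunk) {
      written.push(chunk);
    },
  });

  const writer = stream.getWriter();
  for (const item of items) {
    // Wait until the stream's queue has room before writing more.
    await writer.ready;
    await writer.write(item);
  }

  // Closing fulfills once all queued writes have been processed.
  await writer.close();
  return written;
}

writeAll(['hello', 'world']).then((written) => console.log(written));
// Prints: [ 'hello', 'world' ]
```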
WritableStreamDefaultController
History
This class is now exposed on the global object.
The WritableStreamDefaultController
manages the <WritableStream>
's internal state.
writableStreamDefaultController.error(error?)
Property | Type | Description |
---|---|---|
error | <any> | - |
Called by user-code to signal that an error has occurred while processing the WritableStream
data. When called, the <WritableStream>
will be aborted, with currently pending writes canceled.
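A sketch of an underlying sink that uses controller.error() to reject a chunk it cannot handle. It assumes the global WritableStream; errorDemo and the 'bad' marker value are hypothetical.

```javascript
// Hypothetical helper: the sink errors the stream when it sees a bad chunk.
async function errorDemo() {
  const stream = new WritableStream({
    write(chunk, controller) {
      if (chunk === 'bad') {
        controller.error(new Error('rejected chunk'));
      }
    },
  });

  const writer = stream.getWriter();
  try {
    await writer.write('ok');
    await writer.write('bad');
    // The stream is errored by now, so this write is rejected.
    await writer.write('never written');
  } catch (err) {
    return err.message;
  }
  return 'no error';
}

errorDemo().then((message) => console.log(message)); // Prints: rejected chunk
```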
writableStreamDefaultController.signal
Property | Type | Description |
---|---|---|
- | <AbortSignal> | An AbortSignal that can be used to cancel pending write or close operations when a <WritableStream> is aborted. |
TransformStream
History
This class is now exposed on the global object.
A TransformStream
consists of a <ReadableStream>
and a <WritableStream>
that are connected such that the data written to the WritableStream
is received, and potentially transformed, before being pushed into the ReadableStream
's queue.
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream(transformer?, writableStrategy?, readableStrategy?)
Property | Type | Description |
---|---|---|
transformer | <Object> | - |
writableStrategy | <Object> | - |
readableStrategy | <Object> | - |
transformStream.readable
Property | Type | Description |
---|---|---|
- | <ReadableStream> | - |
transformStream.writable
Property | Type | Description |
---|---|---|
- | <WritableStream> | - |
A <TransformStream>
instance can be transferred using a <MessagePort>
.
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);
TransformStreamDefaultController
History
This class is now exposed on the global object.
The TransformStreamDefaultController
manages the internal state of the TransformStream
.
transformStreamDefaultController.desiredSize
Property | Type | Description |
---|---|---|
- | <number> | - |
The amount of data required to fill the readable side's queue.
transformStreamDefaultController.enqueue(chunk?)
Property | Type | Description |
---|---|---|
chunk | <any> | - |
Appends a chunk of data to the readable side's queue.
transformStreamDefaultController.error(reason?)
Property | Type | Description |
---|---|---|
reason | <any> | - |
Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.
transformStreamDefaultController.terminate()
Closes the readable side of the transform and causes the writable side to be abruptly closed with an error.
ByteLengthQueuingStrategy
History
This class is now exposed on the global object.
new ByteLengthQueuingStrategy(init)
byteLengthQueuingStrategy.highWaterMark
Property | Type | Description |
---|---|---|
- | <number> | - |
byteLengthQueuingStrategy.size
Property | Type | Description |
---|---|---|
- | <Function> | - |
CountQueuingStrategy
History
This class is now exposed on the global object.
new CountQueuingStrategy(init)
countQueuingStrategy.highWaterMark
Property | Type | Description |
---|---|---|
- | <number> | - |
countQueuingStrategy.size
Property | Type | Description |
---|---|---|
- | <Function> | - |
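The two queuing strategies differ only in how they measure a chunk. The sketch below assumes the global ByteLengthQueuingStrategy and CountQueuingStrategy classes; the variable names are illustrative.

```javascript
// One strategy counts bytes, the other counts chunks.
const byBytes = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });
const byCount = new CountQueuingStrategy({ highWaterMark: 4 });

// size is a function that reports how much a chunk adds to the queue.
const byteSize = byBytes.size(new Uint8Array(16)); // The chunk's byteLength.
const countSize = byCount.size('any chunk');       // Always 1.

console.log(byBytes.highWaterMark, byteSize);  // Prints: 1024 16
console.log(byCount.highWaterMark, countSize); // Prints: 4 1
```

Either object can be passed as the strategy argument when constructing a ReadableStream, WritableStream, or TransformStream.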
TextEncoderStream
History
This class is now exposed on the global object.
new TextEncoderStream()
Creates a new TextEncoderStream
instance.
textEncoderStream.encoding
Property | Type | Description |
---|---|---|
- | <string> | - |
The encoding supported by the TextEncoderStream instance.
textEncoderStream.readable
Property | Type | Description |
---|---|---|
- | <ReadableStream> | - |
textEncoderStream.writable
Property | Type | Description |
---|---|---|
- | <WritableStream> | - |
TextDecoderStream
History
This class is now exposed on the global object.
new TextDecoderStream(encoding?, options?)
Property | Type | Description |
---|---|---|
encoding | <string> | Identifies the encoding that this TextDecoder instance supports. Default: 'utf-8' . |
options | <Object> | - |
Creates a new TextDecoderStream
instance.
textDecoderStream.encoding
Property | Type | Description |
---|---|---|
- | <string> | - |
The encoding supported by the TextDecoderStream instance.
textDecoderStream.fatal
Property | Type | Description |
---|---|---|
- | <boolean> | - |
The value will be true if decoding errors result in a TypeError being thrown.
textDecoderStream.ignoreBOM
Property | Type | Description |
---|---|---|
- | <boolean> | - |
The value will be true if the decoding result will include the byte order mark.
textDecoderStream.readable
Property | Type | Description |
---|---|---|
- | <ReadableStream> | - |
textDecoderStream.writable
Property | Type | Description |
---|---|---|
- | <WritableStream> | - |
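TextEncoderStream and TextDecoderStream are usually placed in a pipeline with pipeThrough(). The sketch below encodes a string to bytes and decodes it back, assuming the global stream classes; roundTripText is an invented helper name.

```javascript
// Hypothetical helper: string -> UTF-8 bytes -> string.
async function roundTripText(text) {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(text);
      controller.close();
    },
  });

  const decoded = source
    .pipeThrough(new TextEncoderStream())  // Strings in, Uint8Array chunks out.
    .pipeThrough(new TextDecoderStream()); // Bytes in, strings out.

  let result = '';
  for await (const chunk of decoded) result += chunk;
  return result;
}

roundTripText('héllo ✓').then((text) => console.log(text)); // Prints: héllo ✓
```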
CompressionStream
History
This class is now exposed on the global object.
CompressionStream Constructor
History
format now accepts deflate-raw value.
new CompressionStream(format)
Property | Type | Description |
---|---|---|
format | <string> | One of 'deflate' , 'deflate-raw' , or 'gzip' . |
compressionStream.readable
Property | Type | Description |
---|---|---|
- | <ReadableStream> | - |
compressionStream.writable
Property | Type | Description |
---|---|---|
- | <WritableStream> | - |
DecompressionStream
History
This class is now exposed on the global object.
DecompressionStream Constructor
History
format now accepts deflate-raw value.
new DecompressionStream(format)
Property | Type | Description |
---|---|---|
format | <string> | One of 'deflate' , 'deflate-raw' , or 'gzip' . |
decompressionStream.readable
Property | Type | Description |
---|---|---|
- | <ReadableStream> | - |
decompressionStream.writable
Property | Type | Description |
---|---|---|
- | <WritableStream> | - |
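CompressionStream and DecompressionStream can be chained so that data is compressed and immediately restored, which is a convenient way to check that a format is handled end to end. This is a sketch assuming the global stream classes; gzipRoundTrip is a hypothetical helper.

```javascript
// Hypothetical helper: text -> gzip -> gunzip -> text.
async function gzipRoundTrip(text) {
  const source = new ReadableStream({
    start(controller) {
      // Compression streams operate on bytes, so encode the text first.
      controller.enqueue(new TextEncoder().encode(text));
      controller.close();
    },
  });

  const restored = source
    .pipeThrough(new CompressionStream('gzip'))
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream());

  let result = '';
  for await (const chunk of restored) result += chunk;
  return result;
}

gzipRoundTrip('hello web streams').then((text) => console.log(text));
// Prints: hello web streams
```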
Utility Consumers
The utility consumer functions provide common options for consuming streams.
They are accessed using:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';
streamConsumers.arrayBuffer(stream): Promise
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | <stream.Readable> | <AsyncIterator> | - |
Returns | <Promise> | Fulfills with an ArrayBuffer containing the full contents of the stream. |
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
streamConsumers.blob(stream): Promise
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | <stream.Readable> | <AsyncIterator> | - |
Returns | <Promise> | Fulfills with a <Blob> containing the full contents of the stream. |
import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
streamConsumers.buffer(stream): Promise
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | <stream.Readable> | <AsyncIterator> | - |
Returns | <Promise> | Fulfills with a <Buffer> containing the full contents of the stream. |
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
streamConsumers.json(stream): Promise
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | <stream.Readable> | <AsyncIterator> | - |
Returns | <Promise> | Fulfills with the contents of the stream parsed as a UTF-8 encoded string that is then passed through JSON.parse() . |
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
streamConsumers.text(stream): Promise
Property | Type | Description |
---|---|---|
stream | <ReadableStream> | <stream.Readable> | <AsyncIterator> | - |
Returns | <Promise> | Fulfills with the contents of the stream parsed as a UTF-8 encoded string. |
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27