串流是一個重要的抽象化,用於處理二進制資料,而不會一次全部載入到記憶體中。它們通常用於讀寫檔案、傳送和接收網路請求,以及處理大量資料。
Bun 實作 Web API ReadableStream
和 WritableStream
。
Bun 也實作 node:stream
模組,包含 Readable
、Writable
和 Duplex
。有關完整文件,請參閱 Node.js 文件。
建立一個簡單的 ReadableStream
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
ReadableStream
的內容可以使用 for await
語法逐塊讀取。
for await (const chunk of stream) {
console.log(chunk);
// => "hello"
// => "world"
}
直接 ReadableStream
Bun 實作一個最佳化的 ReadableStream
版本,避免不必要的資料複製和佇列管理邏輯。使用傳統 ReadableStream
時,資料塊會被 加入佇列。每個塊會複製到一個佇列,直到串流準備傳送更多資料為止。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
使用直接 ReadableStream
時,資料塊會直接寫入串流。不會發生佇列,也不需要將塊資料複製到記憶體中。controller
API 已更新以反映這一點;您會呼叫 .write
,而不是 .enqueue()
。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});
使用直接 ReadableStream
時,所有塊佇列都由目的地處理。串流的使用者會收到傳遞給 controller.write()
的內容,而不會進行任何編碼或修改。
非同步產生器串流
Bun 也支援非同步產生器函式作為 Response
和 Request
的來源。這是建立從非同步來源擷取資料的 ReadableStream
的簡單方法。
const response = new Response(async function* () {
yield "hello";
yield "world";
}());
await response.text(); // "helloworld"
您也可以直接使用 [Symbol.asyncIterator]
。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"
如果您需要更精細地控制串流,yield
會傳回直接 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"
Bun.ArrayBufferSink
Bun.ArrayBufferSink
類別是一個快速增量寫入器,用於建構大小未知的 ArrayBuffer
。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]
若要以 Uint8Array
形式擷取資料,請將 asUint8Array
選項傳遞給 start
方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]
.write()
方法支援字串、型別化陣列、ArrayBuffer
和 SharedArrayBuffer
。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();
呼叫 .end()
後,無法再將更多資料寫入 ArrayBufferSink
。然而,在緩衝串流的背景下,持續寫入資料並定期 .flush()
內容(例如,寫入 WriteableStream
)很有用。若要支援這一點,請將 stream: true
傳遞給建構函式。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]
.flush()
方法會將緩衝資料傳回為 ArrayBuffer
(或 Uint8Array
,如果 asUint8Array: true
),並清除內部緩衝區。
若要手動設定內部緩衝區的大小(以位元組為單位),請傳遞 highWaterMark
的值
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});
參考