串流是處理二進位資料的重要抽象概念,無需一次將所有資料載入記憶體。它們通常用於讀取和寫入檔案、傳送和接收網路請求,以及處理大量資料。
Bun 實作了 Web API ReadableStream
和 WritableStream
。
Bun 也實作了 node:stream
模組,包括 Readable
、Writable
和 Duplex
。如需完整文件,請參閱 Node.js 文件。
建立簡單的 ReadableStream
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
可以使用 for await
語法逐塊讀取 ReadableStream
的內容。
for await (const chunk of stream) {
console.log(chunk);
// => "hello"
// => "world"
}
直接 ReadableStream
Bun 實作了最佳化版本的 ReadableStream
,可避免不必要的資料複製和佇列管理邏輯。使用傳統的 ReadableStream
,資料塊會被放入佇列。每個資料塊都會被複製到佇列中,並停留在佇列中,直到串流準備好傳送更多資料。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
使用直接 ReadableStream
,資料塊會直接寫入串流。不會發生佇列,也無需將資料塊資料複製到記憶體中。controller
API 已更新以反映這一點;您呼叫 .write
而不是 .enqueue()
。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});
當使用直接的 ReadableStream
時,所有區塊排隊都由目的地處理。串流的消費者會收到傳遞給 controller.write()
的確切內容,而不會進行任何編碼或修改。
非同步生成器串流
Bun 也支援非同步生成器函式作為 Response
和 Request
的來源。這是一種建立 ReadableStream
的簡單方法,可以從非同步來源提取資料。
const response = new Response(async function* () {
yield "hello";
yield "world";
}());
await response.text(); // "helloworld"
您也可以直接使用 [Symbol.asyncIterator]
。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"
如果您需要更精細地控制串流,yield
將會回傳直接的 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"
Bun.ArrayBufferSink
Bun.ArrayBufferSink
類別是一個快速的增量寫入器,用於建構未知大小的 ArrayBuffer
。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]
若要改為以 Uint8Array
形式檢索資料,請將 asUint8Array
選項傳遞給 start
方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]
.write()
方法支援字串、類型化陣列、ArrayBuffer
和 SharedArrayBuffer
。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();
一旦呼叫 .end()
,就無法再將資料寫入 ArrayBufferSink
。然而,在緩衝串流的上下文中,持續寫入資料並定期 .flush()
內容(例如,寫入 WriteableStream
)很有用。為了支援此功能,請將 stream: true
傳遞給建構函式。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]
.flush()
方法會將緩衝的資料以 ArrayBuffer
形式(或在 asUint8Array: true
的情況下為 Uint8Array
)回傳,並清除內部緩衝區。
若要手動設定內部緩衝區的大小(以位元組為單位),請傳遞 highWaterMark
的值。
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});
參考