Bun.serve()
支援伺服器端 WebSockets,具備即時壓縮、TLS 支援,以及 Bun 原生的發布-訂閱 API。
⚡️ 吞吐量提升 7 倍 — Bun 的 WebSockets 速度飛快。對於 Linux x64 上的簡易聊天室,Bun 每秒可處理的請求數比 Node.js + "ws"
多出 7 倍。
每秒傳送的訊息數 | 執行階段 | 用戶端 |
---|---|---|
~700,000 | (Bun.serve ) Bun v0.2.1 (x64) | 16 |
~100,000 | (ws ) Node v18.10.0 (x64) | 16 |
在底層,Bun 的 WebSocket 實作是建立在 uWebSockets 之上。
啟動 WebSocket 伺服器
以下是以 Bun.serve
建置的簡易 WebSocket 伺服器範例,其中所有傳入的請求都會在 fetch
處理常式中升級為 WebSocket 連線。Socket 處理常式在 websocket
參數中宣告。
Bun.serve({
fetch(req, server) {
// upgrade the request to a WebSocket
if (server.upgrade(req)) {
return; // do not return a Response
}
return new Response("Upgrade failed", { status: 500 });
},
websocket: {}, // handlers
});
以下是支援的 WebSocket 事件處理常式:
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
message(ws, message) {}, // a message is received
open(ws) {}, // a socket is opened
close(ws, code, message) {}, // a socket is closed
drain(ws) {}, // the socket is ready to receive more data
},
});
專為速度設計的 API
每個處理常式的第一個引數是處理事件的 ServerWebSocket
實例。ServerWebSocket
類別是 WebSocket
的快速 Bun 原生實作,並具有一些額外功能。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
message(ws, message) {
ws.send(message); // echo back the message
},
},
});
傳送訊息
每個 ServerWebSocket
實例都有一個 .send()
方法,用於將訊息傳送給用戶端。它支援多種輸入類型。
ws.send("Hello world"); // string
ws.send(response.arrayBuffer()); // ArrayBuffer
ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
標頭
一旦升級成功,Bun 將依照規範傳送 101 Switching Protocols
回應。額外的 headers
可以附加到呼叫 server.upgrade()
中的這個 Response
。
Bun.serve({
fetch(req, server) {
const sessionId = await generateSessionId();
server.upgrade(req, {
headers: {
"Set-Cookie": `SessionId=${sessionId}`,
},
});
},
websocket: {}, // handlers
});
情境資料
情境 data
可以附加到 .upgrade()
呼叫中的新 WebSocket。此資料可在 WebSocket 處理器內的 ws.data
屬性上取得。
type WebSocketData = {
createdAt: number;
channelId: string;
authToken: string;
};
// TypeScript: specify the type of `data`
Bun.serve<WebSocketData>({
fetch(req, server) {
// use a library to parse cookies
const cookies = parseCookies(req.headers.get("Cookie"));
server.upgrade(req, {
// this object must conform to WebSocketData
data: {
createdAt: Date.now(),
channelId: new URL(req.url).searchParams.get("channelId"),
authToken: cookies["X-Token"],
},
});
return undefined;
},
websocket: {
// handler called when a message is received
async message(ws, message) {
const user = getUserFromToken(ws.data.authToken);
await saveMessageToDatabase({
channel: ws.data.channelId,
message: String(message),
userId: user.id,
});
},
},
});
若要從瀏覽器連線到此伺服器,請建立新的 WebSocket
。
const socket = new WebSocket("ws://127.0.0.1:3000/chat");
socket.addEventListener("message", event => {
console.log(event.data);
})
識別使用者 — 頁面上目前設定的 cookies 將會與 WebSocket 升級請求一起傳送,並可在 fetch
處理器中的 req.headers
上取得。解析這些 cookies 以判斷連線使用者的身分,並據此設定 data
的值。
發布/訂閱
Bun 的 ServerWebSocket
實作針對基於主題的廣播實作了原生發布-訂閱 API。個別的 sockets 可以 .subscribe()
訂閱一個主題(以字串識別符指定),並 .publish()
訊息給該主題的所有其他訂閱者(排除自身)。這種基於主題的廣播 API 類似於 MQTT 和 Redis Pub/Sub。
const server = Bun.serve<{ username: string }>({
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/chat") {
console.log(`upgrade!`);
const username = getUsernameFromReq(req);
const success = server.upgrade(req, { data: { username } });
return success
? undefined
: new Response("WebSocket upgrade error", { status: 400 });
}
return new Response("Hello world");
},
websocket: {
open(ws) {
const msg = `${ws.data.username} has entered the chat`;
ws.subscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
message(ws, message) {
// this is a group chat
// so the server re-broadcasts incoming message to everyone
server.publish("the-group-chat", `${ws.data.username}: ${message}`);
},
close(ws) {
const msg = `${ws.data.username} has left the chat`;
ws.unsubscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
},
});
console.log(`Listening on ${server.hostname}:${server.port}`);
呼叫 .publish(data)
將會把訊息傳送給主題的所有訂閱者,除了 呼叫 .publish()
的 socket 之外。若要將訊息傳送給主題的所有訂閱者,請在 Server
實例上使用 .publish()
方法。
const server = Bun.serve({
websocket: {
// ...
},
});
// listen for some external event
server.publish("the-group-chat", "Hello world");
壓縮
可以使用 perMessageDeflate
參數啟用單一訊息 壓縮。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
// enable compression and decompression
perMessageDeflate: true,
},
});
可以透過傳遞 boolean
作為 .send()
的第二個參數,為個別訊息啟用壓縮。
ws.send("Hello world", true);
若要對壓縮特性進行細緻的控制,請參閱參考。
背壓
ServerWebSocket
的 .send(message)
方法會傳回一個 number
,指示操作的結果。
-1
— 訊息已排入佇列,但存在背壓0
— 訊息因連線問題而丟棄1+
— 已傳送的位元組數
這讓您可以更好地控制伺服器中的背壓。
逾時與限制
預設情況下,如果 WebSocket 連線閒置 120 秒,Bun 將會關閉連線。可以使用 idleTimeout
參數設定此行為。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
idleTimeout: 60, // 60 seconds
// ...
},
});
如果 Bun 收到大於 16 MB 的訊息,也會關閉 WebSocket 連線。可以使用 maxPayloadLength
參數設定此行為。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
maxPayloadLength: 1024 * 1024, // 1 MB
// ...
},
});
連線到 Websocket
伺服器
Bun 實作了 WebSocket
類別。若要建立連線到 ws://
或 wss://
伺服器的 WebSocket 客戶端,請建立 WebSocket
的實例,就像在瀏覽器中一樣。
const socket = new WebSocket("ws://127.0.0.1:3000");
在瀏覽器中,頁面上目前設定的 cookies 將會與 WebSocket 升級請求一起傳送。這是 WebSocket
API 的標準功能。
為了方便起見,Bun 讓您可以直接在建構函式中設定自訂標頭。這是 WebSocket
標準的 Bun 特定擴充功能。這在瀏覽器中將無法運作。
const socket = new WebSocket("ws://127.0.0.1:3000", {
headers: {
// custom headers
},
});
若要將事件監聽器新增至 socket
// message is received
socket.addEventListener("message", event => {});
// socket opened
socket.addEventListener("open", event => {});
// socket closed
socket.addEventListener("close", event => {});
// error handler
socket.addEventListener("error", event => {});
參考
namespace Bun {
export function serve(params: {
fetch: (req: Request, server: Server) => Response | Promise<Response>;
websocket?: {
message: (
ws: ServerWebSocket,
message: string | ArrayBuffer | Uint8Array,
) => void;
open?: (ws: ServerWebSocket) => void;
close?: (ws: ServerWebSocket, code: number, reason: string) => void;
error?: (ws: ServerWebSocket, error: Error) => void;
drain?: (ws: ServerWebSocket) => void;
maxPayloadLength?: number; // default: 16 * 1024 * 1024 = 16 MB
idleTimeout?: number; // default: 120 (seconds)
backpressureLimit?: number; // default: 1024 * 1024 = 1 MB
closeOnBackpressureLimit?: boolean; // default: false
sendPings?: boolean; // default: true
publishToSelf?: boolean; // default: false
perMessageDeflate?:
| boolean
| {
compress?: boolean | Compressor;
decompress?: boolean | Compressor;
};
};
}): Server;
}
type Compressor =
| `"disable"`
| `"shared"`
| `"dedicated"`
| `"3KB"`
| `"4KB"`
| `"8KB"`
| `"16KB"`
| `"32KB"`
| `"64KB"`
| `"128KB"`
| `"256KB"`;
interface Server {
pendingWebSockets: number;
publish(
topic: string,
data: string | ArrayBufferView | ArrayBuffer,
compress?: boolean,
): number;
upgrade(
req: Request,
options?: {
headers?: HeadersInit;
data?: any;
},
): boolean;
}
interface ServerWebSocket {
readonly data: any;
readonly readyState: number;
readonly remoteAddress: string;
send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
close(code?: number, reason?: string): void;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
isSubscribed(topic: string): boolean;
cork(cb: (ws: ServerWebSocket) => void): void;
}