Bun

WebSockets

Bun.serve() 支援伺服器端 WebSockets,具備即時壓縮、TLS 支援,以及 Bun 原生的發布-訂閱 API。

⚡️ 吞吐量提升 7 倍 — Bun 的 WebSockets 速度飛快。對於 Linux x64 上的簡易聊天室,Bun 每秒可處理的請求數比 Node.js + "ws" 多出 7 倍。

每秒傳送的訊息數執行階段用戶端
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16

在底層,Bun 的 WebSocket 實作是建立在 uWebSockets 之上。

啟動 WebSocket 伺服器

以下是以 Bun.serve 建置的簡易 WebSocket 伺服器範例,其中所有傳入的請求都會在 fetch 處理常式中升級為 WebSocket 連線。Socket 處理常式在 websocket 參數中宣告。

Bun.serve({
  fetch(req, server) {
    // upgrade the request to a WebSocket
    if (server.upgrade(req)) {
      return; // do not return a Response
    }
    return new Response("Upgrade failed", { status: 500 });
  },
  websocket: {}, // handlers
});

以下是支援的 WebSocket 事件處理常式:

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    message(ws, message) {}, // a message is received
    open(ws) {}, // a socket is opened
    close(ws, code, message) {}, // a socket is closed
    drain(ws) {}, // the socket is ready to receive more data
  },
});

專為速度設計的 API

每個處理常式的第一個引數是處理事件的 ServerWebSocket 實例。ServerWebSocket 類別是 WebSocket 的快速 Bun 原生實作,並具有一些額外功能。

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    message(ws, message) {
      ws.send(message); // echo back the message
    },
  },
});

傳送訊息

每個 ServerWebSocket 實例都有一個 .send() 方法,用於將訊息傳送給用戶端。它支援多種輸入類型。

ws.send("Hello world"); // string
ws.send(response.arrayBuffer()); // ArrayBuffer
ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView

標頭

一旦升級成功,Bun 將依照規範傳送 101 Switching Protocols 回應。額外的 headers 可以附加到呼叫 server.upgrade() 中的這個 Response

Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: {
        "Set-Cookie": `SessionId=${sessionId}`,
      },
    });
  },
  websocket: {}, // handlers
});

情境資料

情境 data 可以附加到 .upgrade() 呼叫中的新 WebSocket。此資料可在 WebSocket 處理器內的 ws.data 屬性上取得。

type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

// TypeScript: specify the type of `data`
Bun.serve<WebSocketData>({
  fetch(req, server) {
    // use a library to parse cookies
    const cookies = parseCookies(req.headers.get("Cookie"));
    server.upgrade(req, {
      // this object must conform to WebSocketData
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies["X-Token"],
      },
    });

    return undefined;
  },
  websocket: {
    // handler called when a message is received
    async message(ws, message) {
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});

若要從瀏覽器連線到此伺服器,請建立新的 WebSocket

browser.js
const socket = new WebSocket("ws://127.0.0.1:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
})

識別使用者 — 頁面上目前設定的 cookies 將會與 WebSocket 升級請求一起傳送,並可在 fetch 處理器中的 req.headers 上取得。解析這些 cookies 以判斷連線使用者的身分,並據此設定 data 的值。

發布/訂閱

Bun 的 ServerWebSocket 實作針對基於主題的廣播實作了原生發布-訂閱 API。個別的 sockets 可以 .subscribe() 訂閱一個主題(以字串識別符指定),並 .publish() 訊息給該主題的所有其他訂閱者(排除自身)。這種基於主題的廣播 API 類似於 MQTTRedis Pub/Sub

const server = Bun.serve<{ username: string }>({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`upgrade!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success
        ? undefined
        : new Response("WebSocket upgrade error", { status: 400 });
    }

    return new Response("Hello world");
  },
  websocket: {
    open(ws) {
      const msg = `${ws.data.username} has entered the chat`;
      ws.subscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
    message(ws, message) {
      // this is a group chat
      // so the server re-broadcasts incoming message to everyone
      server.publish("the-group-chat", `${ws.data.username}: ${message}`);
    },
    close(ws) {
      const msg = `${ws.data.username} has left the chat`;
      ws.unsubscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
  },
});

console.log(`Listening on ${server.hostname}:${server.port}`);

呼叫 .publish(data) 將會把訊息傳送給主題的所有訂閱者,除了 呼叫 .publish() 的 socket 之外。若要將訊息傳送給主題的所有訂閱者,請在 Server 實例上使用 .publish() 方法。

const server = Bun.serve({
  websocket: {
    // ...
  },
});

// listen for some external event
server.publish("the-group-chat", "Hello world");

壓縮

可以使用 perMessageDeflate 參數啟用單一訊息 壓縮

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    // enable compression and decompression
    perMessageDeflate: true,
  },
});

可以透過傳遞 boolean 作為 .send() 的第二個參數,為個別訊息啟用壓縮。

ws.send("Hello world", true);

若要對壓縮特性進行細緻的控制,請參閱參考

背壓

ServerWebSocket.send(message) 方法會傳回一個 number,指示操作的結果。

  • -1 — 訊息已排入佇列,但存在背壓
  • 0 — 訊息因連線問題而丟棄
  • 1+ — 已傳送的位元組數

這讓您可以更好地控制伺服器中的背壓。

逾時與限制

預設情況下,如果 WebSocket 連線閒置 120 秒,Bun 將會關閉連線。可以使用 idleTimeout 參數設定此行為。

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    idleTimeout: 60, // 60 seconds

    // ...
  },
});

如果 Bun 收到大於 16 MB 的訊息,也會關閉 WebSocket 連線。可以使用 maxPayloadLength 參數設定此行為。

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 MB

    // ...
  },
});

連線到 Websocket 伺服器

Bun 實作了 WebSocket 類別。若要建立連線到 ws://wss:// 伺服器的 WebSocket 客戶端,請建立 WebSocket 的實例,就像在瀏覽器中一樣。

const socket = new WebSocket("ws://127.0.0.1:3000");

在瀏覽器中,頁面上目前設定的 cookies 將會與 WebSocket 升級請求一起傳送。這是 WebSocket API 的標準功能。

為了方便起見,Bun 讓您可以直接在建構函式中設定自訂標頭。這是 WebSocket 標準的 Bun 特定擴充功能。這在瀏覽器中將無法運作。

const socket = new WebSocket("ws://127.0.0.1:3000", {
  headers: {
    // custom headers
  },
});

若要將事件監聽器新增至 socket

// message is received
socket.addEventListener("message", event => {});

// socket opened
socket.addEventListener("open", event => {});

// socket closed
socket.addEventListener("close", event => {});

// error handler
socket.addEventListener("error", event => {});

參考

namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (
        ws: ServerWebSocket,
        message: string | ArrayBuffer | Uint8Array,
      ) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // default: 16 * 1024 * 1024 = 16 MB
      idleTimeout?: number; // default: 120 (seconds)
      backpressureLimit?: number; // default: 1024 * 1024 = 1 MB
      closeOnBackpressureLimit?: boolean; // default: false
      sendPings?: boolean; // default: true
      publishToSelf?: boolean; // default: false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(
    topic: string,
    data: string | ArrayBufferView | ArrayBuffer,
    compress?: boolean,
  ): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}