Grove

WebSocket Transport

Bidirectional multiplexed CRDT sync over a single persistent WebSocket connection.

Grove CRDT supports WebSocket as a transport layer for sync operations. Unlike HTTP pull/push (request-response) or SSE (server-to-client streaming), WebSocket provides bidirectional, multiplexed communication over a single persistent connection. This reduces latency, eliminates connection overhead for frequent syncs, and enables real-time push from both client and server.

Overview

The WebSocket transport replaces (or complements) the HTTP and SSE transports:

FeatureHTTPSSEWebSocket
Pull changesRequest-responseN/AMultiplexed message
Push changesRequest-responseN/AMultiplexed message
Real-time streamingN/AServer-to-clientBidirectional
Presence updatesSeparate POSTVia SSE eventsSame connection
Connection overheadPer requestOne long-livedOne long-lived
Header supportFullLimitedOn handshake

WebSocket is ideal for high-frequency sync scenarios, real-time collaboration with presence, and environments where reducing connection count matters.

Architecture

Client (Browser/Node)                   Server (Go)
─────────────────────                   ──────────────
WebSocketTransport                      WebSocketHandler
  ├── pull()       ──── pull request ────→ handlePull()
  │                ←─── pull response ───┤
  ├── push()       ──── push request ────→ handlePush()
  │                ←─── push response ───┤
  ├── subscribe()  ──── subscribe msg ───→ handleSubscribe()
  │                ←─── change events ───┤ (continuous)
  ├── presence()   ──── presence msg ────→ handlePresence()
  │                ←─── presence events ─┤
  └── rooms()      ──── room msg ────────→ handleRoom()
                   ←─── room events ─────┤

All operations are multiplexed over a single WebSocket connection. Each message includes a type and an optional request ID for correlating responses.

Message Types

Messages are JSON-encoded with a type field:

type WSMessageType string

const (
    WSPullRequest      WSMessageType = "pull_request"
    WSPullResponse     WSMessageType = "pull_response"
    WSPushRequest      WSMessageType = "push_request"
    WSPushResponse     WSMessageType = "push_response"
    WSSubscribe        WSMessageType = "subscribe"
    WSChangeEvent      WSMessageType = "change"
    WSChangesEvent     WSMessageType = "changes"
    WSPresenceUpdate   WSMessageType = "presence_update"
    WSPresenceEvent    WSMessageType = "presence_event"
    WSRoomMessage      WSMessageType = "room"
    WSError            WSMessageType = "error"
    WSPing             WSMessageType = "ping"
    WSPong             WSMessageType = "pong"
)

Frame Format

{
  "type": "pull_request",
  "id": "req-1",
  "payload": {
    "tables": ["documents"],
    "since": "1706000000000000000:0:node-1"
  }
}
FieldTypeDescription
typeWSMessageTypeMessage type for routing
idstringOptional request ID for correlating request/response pairs
payloadjson.RawMessageType-specific payload

Server Setup (Go)

WebSocketHandler

The WebSocketHandler handles incoming WebSocket connections and dispatches messages to the SyncController:

ctrl := crdt.NewSyncController(plugin,
    crdt.WithPresenceEnabled(true),
)
defer ctrl.Close()

wsHandler := crdt.NewWebSocketHandler(ctrl)

// Register the WebSocket endpoint.
http.HandleFunc("/sync/ws", wsHandler.ServeHTTP)

Forge Integration

app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
    groveext.WithSyncController(
        crdt.WithPresenceEnabled(true),
        crdt.WithWebSocketEnabled(true),
    ),
))
// Auto-registers: /sync/ws (WebSocket endpoint)

WebSocketConn Interface

The WebSocketHandler is not tied to a specific WebSocket library. It accepts any implementation of the WebSocketConn interface:

type WebSocketConn interface {
    ReadMessage() (messageType int, p []byte, err error)
    WriteMessage(messageType int, data []byte) error
    Close() error
    SetReadDeadline(t time.Time) error
    SetWriteDeadline(t time.Time) error
}

This is compatible with both gorilla/websocket and nhooyr.io/websocket:

gorilla/websocket

import "github.com/gorilla/websocket"

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { return true },
}

http.HandleFunc("/sync/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    wsHandler.HandleConn(r.Context(), conn)
})

nhooyr.io/websocket

import "nhooyr.io/websocket"

http.HandleFunc("/sync/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, err := websocket.Accept(w, r, nil)
    if err != nil {
        return
    }
    // Wrap nhooyr conn to satisfy WebSocketConn interface.
    wsHandler.HandleConn(r.Context(), crdt.NewNhooyrConn(conn))
})

Client Setup (TypeScript)

WebSocketTransport

The WebSocketTransport implements both the Transport and StreamTransport interfaces over a single WebSocket connection:

import { CRDTClient, WebSocketTransport } from "@grove-js/crdt";

const wsTransport = new WebSocketTransport({
    url: "wss://api.example.com/sync/ws",
    nodeID: "browser-1",
    reconnectDelay: 5000,        // Reconnect delay in ms (default: 5000)
    pingInterval: 30000,          // Keepalive ping interval (default: 30000)
    headers: {                    // Sent during handshake
        Authorization: "Bearer <token>",
    },
});

const client = new CRDTClient({
    nodeID: "browser-1",
    tables: ["documents"],
    transport: wsTransport,
    streamTransport: wsTransport,  // Same instance for both
});

// Pull, push, and streaming all go over the WebSocket.
const { changes } = await client.pull();
await client.push(pendingChanges);

// Real-time subscription is automatic.
const stream = client.stream();
stream.on((event) => {
    store.applyChanges(event.type === "change" ? [event.data] : event.data);
});
stream.connect();

WebSocketTransport Options

OptionTypeDefaultDescription
urlstringRequiredWebSocket server URL (ws:// or wss://)
nodeIDstringRequiredClient node ID
reconnectDelaynumber5000Delay before reconnect attempt (ms)
maxReconnectDelaynumber30000Maximum reconnect delay with backoff (ms)
pingIntervalnumber30000Keepalive ping interval (ms)
headersRecord<string, string>{}Headers sent during WebSocket handshake
protocolsstring[][]WebSocket sub-protocols

Connection Lifecycle

const ws = new WebSocketTransport({ url: "wss://...", nodeID: "browser-1" });

// Connect explicitly (or auto-connects on first operation).
await ws.connect();

// Check connection status.
console.log(ws.connected); // true

// Listen for connection events.
ws.on("connected", () => console.log("Connected"));
ws.on("disconnected", () => console.log("Disconnected"));
ws.on("reconnecting", (attempt) => console.log(`Reconnect attempt ${attempt}`));
ws.on("error", (err) => console.error("WebSocket error:", err));

// Disconnect.
ws.disconnect();

React Integration

Use WebSocketTransport with CRDTProvider:

import { CRDTProvider } from "@grove-js/crdt/react";
import { WebSocketTransport } from "@grove-js/crdt";

const nodeID = `browser-${crypto.randomUUID()}`;
const wsTransport = new WebSocketTransport({
    url: "wss://api.example.com/sync/ws",
    nodeID,
});

function App() {
    return (
        <CRDTProvider config={{
            nodeID,
            tables: ["documents"],
            transport: wsTransport,
            streamTransport: wsTransport,
        }}>
            <DocumentList />
        </CRDTProvider>
    );
}

Multiplexing

All operations share a single connection. The transport handles request/response correlation via message IDs:

Client                                Server
──────                                ──────
[pull_request  id=1] ─────────────→   handlePull
[push_request  id=2] ─────────────→   handlePush
                     ←───────────── [pull_response  id=1]
                     ←───────────── [push_response  id=2]
                     ←───────────── [change event]
                     ←───────────── [presence event]
[presence_update]    ─────────────→   handlePresence

Requests and responses are matched by id. Events (changes, presence) are pushed without an id and dispatched to the appropriate subscriber.

Error Handling

WebSocket errors are delivered as error frames:

{
  "type": "error",
  "id": "req-1",
  "payload": {
    "code": "SYNC_VERSION_MISMATCH",
    "message": "Protocol version 2 required, got 1"
  }
}

On the client, these are converted to typed errors:

try {
    await client.pull();
} catch (err) {
    if (err instanceof SyncError) {
        console.error("Sync error:", err.code, err.message);
    }
}

Comparison with HTTP + SSE

ScenarioRecommended Transport
Simple sync with occasional updatesHTTP pull/push
Real-time streaming (server-to-client only)SSE
High-frequency bidirectional syncWebSocket
Real-time collaboration with presenceWebSocket
Mobile apps with connection constraintsWebSocket
Serverless / edge deploymentsHTTP pull/push

You can also combine transports: use HTTP for initial sync and WebSocket for real-time updates, or fall back to HTTP + SSE when WebSocket is unavailable.

On this page