WebSocket Transport
Bidirectional multiplexed CRDT sync over a single persistent WebSocket connection.
Grove CRDT supports WebSocket as a transport layer for sync operations. Unlike HTTP pull/push (request-response) or SSE (server-to-client streaming), WebSocket provides bidirectional, multiplexed communication over a single persistent connection. This reduces latency, eliminates connection overhead for frequent syncs, and enables real-time push from both client and server.
Overview
The WebSocket transport replaces (or complements) the HTTP and SSE transports:
| Feature | HTTP | SSE | WebSocket |
|---|---|---|---|
| Pull changes | Request-response | N/A | Multiplexed message |
| Push changes | Request-response | N/A | Multiplexed message |
| Real-time streaming | N/A | Server-to-client | Bidirectional |
| Presence updates | Separate POST | Via SSE events | Same connection |
| Connection overhead | Per request | One long-lived | One long-lived |
| Header support | Full | Limited | On handshake |
WebSocket is ideal for high-frequency sync scenarios, real-time collaboration with presence, and environments where reducing connection count matters.
Architecture
Client (Browser/Node)                        Server (Go)
─────────────────────                        ──────────────
WebSocketTransport                           WebSocketHandler
├── pull()      ──── pull request ──────→    handlePull()
│               ←─── pull response ─────┤
├── push()      ──── push request ──────→    handlePush()
│               ←─── push response ─────┤
├── subscribe() ──── subscribe msg ─────→    handleSubscribe()
│               ←─── change events ─────┤    (continuous)
├── presence()  ──── presence msg ──────→    handlePresence()
│               ←─── presence events ───┤
└── rooms()     ──── room msg ──────────→    handleRoom()
                ←─── room events ───────┤
All operations are multiplexed over a single WebSocket connection. Each message includes a type and an optional request ID for correlating responses.
Message Types
Messages are JSON-encoded with a type field:
type WSMessageType string

const (
	WSPullRequest    WSMessageType = "pull_request"
	WSPullResponse   WSMessageType = "pull_response"
	WSPushRequest    WSMessageType = "push_request"
	WSPushResponse   WSMessageType = "push_response"
	WSSubscribe      WSMessageType = "subscribe"
	WSChangeEvent    WSMessageType = "change"
	WSChangesEvent   WSMessageType = "changes"
	WSPresenceUpdate WSMessageType = "presence_update"
	WSPresenceEvent  WSMessageType = "presence_event"
	WSRoomMessage    WSMessageType = "room"
	WSError          WSMessageType = "error"
	WSPing           WSMessageType = "ping"
	WSPong           WSMessageType = "pong"
)
Frame Format
{
  "type": "pull_request",
  "id": "req-1",
  "payload": {
    "tables": ["documents"],
    "since": "1706000000000000000:0:node-1"
  }
}
| Field | Type | Description |
|---|---|---|
| type | WSMessageType | Message type for routing |
| id | string | Optional request ID for correlating request/response pairs |
| payload | json.RawMessage | Type-specific payload |
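On the client side, the frame layout above can be modeled as a small typed envelope. The following is a minimal sketch rather than the library's implementation: `WSFrame`, `encodeFrame`, and `decodeFrame` are hypothetical names, and the list of message types is copied from the constants in the Message Types section.

```typescript
// Message type strings copied from the WSMessageType constants.
const WS_MESSAGE_TYPES = [
  "pull_request", "pull_response", "push_request", "push_response",
  "subscribe", "change", "changes", "presence_update", "presence_event",
  "room", "error", "ping", "pong",
] as const;

type WSMessageType = (typeof WS_MESSAGE_TYPES)[number];

// Hypothetical client-side shape of one frame: type, optional id, payload.
interface WSFrame {
  type: WSMessageType;
  id?: string;
  payload?: unknown;
}

// Serialize a frame to the JSON text sent over the socket.
function encodeFrame(frame: WSFrame): string {
  return JSON.stringify(frame);
}

// Parse incoming JSON text and validate the envelope; throws on malformed frames.
function decodeFrame(text: string): WSFrame {
  const raw: unknown = JSON.parse(text);
  if (typeof raw !== "object" || raw === null) {
    throw new Error("frame must be a JSON object");
  }
  const { type, id, payload } = raw as Record<string, unknown>;
  const known = WS_MESSAGE_TYPES as readonly string[];
  if (typeof type !== "string" || known.indexOf(type) === -1) {
    throw new Error(`unknown frame type: ${String(type)}`);
  }
  if (id !== undefined && typeof id !== "string") {
    throw new Error("frame id must be a string when present");
  }
  return { type: type as WSMessageType, id: id as string | undefined, payload };
}
```

Validating the `type` field before dispatch means an unexpected or misspelled message type fails at the boundary instead of reaching a handler.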
Server Setup (Go)
WebSocketHandler
The WebSocketHandler handles incoming WebSocket connections and dispatches messages to the SyncController:
ctrl := crdt.NewSyncController(plugin,
	crdt.WithPresenceEnabled(true),
)
defer ctrl.Close()

wsHandler := crdt.NewWebSocketHandler(ctrl)

// Register the WebSocket endpoint.
http.HandleFunc("/sync/ws", wsHandler.ServeHTTP)
Forge Integration
app.Use(groveext.New(
	groveext.WithDriver(pgdb),
	groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
	groveext.WithSyncController(
		crdt.WithPresenceEnabled(true),
		crdt.WithWebSocketEnabled(true),
	),
))
// Auto-registers: /sync/ws (WebSocket endpoint)
WebSocketConn Interface
The WebSocketHandler is not tied to a specific WebSocket library. It accepts any implementation of the WebSocketConn interface:
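type WebSocketConn interface {
	ReadMessage() (messageType int, p []byte, err error)
	WriteMessage(messageType int, data []byte) error
	Close() error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
This matches the method set of gorilla/websocket's *websocket.Conn, so gorilla connections can be passed in directly. nhooyr.io/websocket has a different API, so its connections are wrapped with crdt.NewNhooyrConn first: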
gorilla/websocket
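import "github.com/gorilla/websocket"

var upgrader = websocket.Upgrader{
	// Accepts every origin, which is only appropriate for local development.
	// Restrict this check in production to prevent cross-site WebSocket hijacking.
	CheckOrigin: func(r *http.Request) bool { return true },
}

http.HandleFunc("/sync/ws", func(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return // Upgrade has already written an HTTP error response.
	}
	wsHandler.HandleConn(r.Context(), conn)
})
nhooyr.io/websocket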
import "nhooyr.io/websocket"

http.HandleFunc("/sync/ws", func(w http.ResponseWriter, r *http.Request) {
	conn, err := websocket.Accept(w, r, nil)
	if err != nil {
		return
	}
	// Wrap nhooyr conn to satisfy WebSocketConn interface.
	wsHandler.HandleConn(r.Context(), crdt.NewNhooyrConn(conn))
})
Client Setup (TypeScript)
WebSocketTransport
The WebSocketTransport implements both the Transport and StreamTransport interfaces over a single WebSocket connection:
import { CRDTClient, WebSocketTransport } from "@grove-js/crdt";

const wsTransport = new WebSocketTransport({
  url: "wss://api.example.com/sync/ws",
  nodeID: "browser-1",
  reconnectDelay: 5000, // Reconnect delay in ms (default: 5000)
  pingInterval: 30000,  // Keepalive ping interval (default: 30000)
  // Sent during handshake. Note that the native browser WebSocket API
  // cannot set custom handshake headers.
  headers: {
    Authorization: "Bearer <token>",
  },
});

const client = new CRDTClient({
  nodeID: "browser-1",
  tables: ["documents"],
  transport: wsTransport,
  streamTransport: wsTransport, // Same instance for both
});

// Pull, push, and streaming all go over the WebSocket.
const { changes } = await client.pull();
await client.push(pendingChanges);

// Real-time subscription is automatic.
const stream = client.stream();
stream.on((event) => {
  store.applyChanges(event.type === "change" ? [event.data] : event.data);
});
stream.connect();
WebSocketTransport Options
| Option | Type | Default | Description |
|---|---|---|---|
| url | string | Required | WebSocket server URL (ws:// or wss://) |
| nodeID | string | Required | Client node ID |
| reconnectDelay | number | 5000 | Delay before reconnect attempt (ms) |
| maxReconnectDelay | number | 30000 | Maximum reconnect delay with backoff (ms) |
| pingInterval | number | 30000 | Keepalive ping interval (ms) |
| headers | Record<string, string> | {} | Headers sent during WebSocket handshake |
| protocols | string[] | [] | WebSocket sub-protocols |
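One way `reconnectDelay` and `maxReconnectDelay` could interact is exponential backoff capped at the maximum. The sketch below assumes the delay doubles on each attempt; the source only states that the delay backs off up to a maximum, so the doubling factor and the helper name `nextReconnectDelay` are illustrative assumptions, not the transport's documented behavior.

```typescript
// Hypothetical helper: delay in ms before the given reconnect attempt (1-based).
// Assumes the delay doubles per attempt and is capped at maxReconnectDelay.
function nextReconnectDelay(
  attempt: number,
  reconnectDelay: number = 5000,
  maxReconnectDelay: number = 30000,
): number {
  const exponent = Math.max(0, attempt - 1);
  return Math.min(reconnectDelay * Math.pow(2, exponent), maxReconnectDelay);
}

// With the default options, attempts 1 through 4 wait
// 5000, 10000, 20000, and 30000 ms respectively.
const delays = [1, 2, 3, 4].map((attempt) => nextReconnectDelay(attempt));
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.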
Connection Lifecycle
const ws = new WebSocketTransport({ url: "wss://...", nodeID: "browser-1" });

// Register listeners before connecting so the initial "connected" event is not missed.
ws.on("connected", () => console.log("Connected"));
ws.on("disconnected", () => console.log("Disconnected"));
ws.on("reconnecting", (attempt) => console.log(`Reconnect attempt ${attempt}`));
ws.on("error", (err) => console.error("WebSocket error:", err));

// Connect explicitly (or auto-connects on first operation).
await ws.connect();

// Check connection status.
console.log(ws.connected); // true

// Disconnect.
ws.disconnect();
React Integration
Use WebSocketTransport with CRDTProvider:
import { CRDTProvider } from "@grove-js/crdt/react";
import { WebSocketTransport } from "@grove-js/crdt";

const nodeID = `browser-${crypto.randomUUID()}`;
const wsTransport = new WebSocketTransport({
  url: "wss://api.example.com/sync/ws",
  nodeID,
});

function App() {
  return (
    <CRDTProvider config={{
      nodeID,
      tables: ["documents"],
      transport: wsTransport,
      streamTransport: wsTransport,
    }}>
      <DocumentList />
    </CRDTProvider>
  );
}
Multiplexing
All operations share a single connection. The transport handles request/response correlation via message IDs:
Client                                   Server
──────                                   ──────
[pull_request id=1]  ─────────────→      handlePull
[push_request id=2]  ─────────────→      handlePush
                     ←─────────────      [pull_response id=1]
                     ←─────────────      [push_response id=2]
                     ←─────────────      [change event]
                     ←─────────────      [presence event]
[presence_update]    ─────────────→      handlePresence
Requests and responses are matched by id. Events (changes, presence) are pushed without an id and dispatched to the appropriate subscriber.
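The correlation scheme described above can be sketched with a map of pending requests keyed by `id`. This is an illustration under stated assumptions, not the transport's actual code: the `Multiplexer` class and its methods are hypothetical, the socket is abstracted as a `send` callback that takes serialized text, and a response of type `error` is assumed to reject the matching request.

```typescript
interface Frame {
  type: string;
  id?: string;
  payload?: unknown;
}

type Waiter = { resolve: (payload: unknown) => void; reject: (err: Error) => void };
type EventHandler = (frame: Frame) => void;

// Hypothetical multiplexer: correlates responses by id, routes events by type.
class Multiplexer {
  private nextId = 1;
  private pending = new Map<string, Waiter>();
  private subscribers = new Map<string, EventHandler[]>();
  private send: (text: string) => void; // stands in for WebSocket.send

  constructor(send: (text: string) => void) {
    this.send = send;
  }

  // Send a request frame; the promise settles when a frame with the same id arrives.
  request(type: string, payload: unknown): Promise<unknown> {
    const id = `req-${this.nextId++}`;
    return new Promise((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
      this.send(JSON.stringify({ type, id, payload }));
    });
  }

  // Register a handler for server-pushed events such as "change".
  on(type: string, handler: EventHandler): void {
    const handlers = this.subscribers.get(type) ?? [];
    handlers.push(handler);
    this.subscribers.set(type, handlers);
  }

  // Number of requests still waiting for a response.
  pendingCount(): number {
    return this.pending.size;
  }

  // Route one incoming message: responses by id, everything else by type.
  handleIncoming(text: string): void {
    const frame = JSON.parse(text) as Frame;
    const waiter = frame.id !== undefined ? this.pending.get(frame.id) : undefined;
    if (frame.id !== undefined && waiter !== undefined) {
      this.pending.delete(frame.id);
      if (frame.type === "error") {
        waiter.reject(new Error(JSON.stringify(frame.payload)));
      } else {
        waiter.resolve(frame.payload);
      }
      return;
    }
    for (const handler of this.subscribers.get(frame.type) ?? []) {
      handler(frame);
    }
  }
}
```

Because each waiter is looked up by `id`, responses may arrive in any order relative to the requests, and frames without an `id` never disturb a pending request.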
Error Handling
WebSocket errors are delivered as error frames:
{
  "type": "error",
  "id": "req-1",
  "payload": {
    "code": "SYNC_VERSION_MISMATCH",
    "message": "Protocol version 2 required, got 1"
  }
}
On the client, these are converted to typed errors:
try {
  await client.pull();
} catch (err) {
  if (err instanceof SyncError) {
    console.error("Sync error:", err.code, err.message);
  }
}
Comparison with HTTP + SSE
| Scenario | Recommended Transport |
|---|---|
| Simple sync with occasional updates | HTTP pull/push |
| Real-time streaming (server-to-client only) | SSE |
| High-frequency bidirectional sync | WebSocket |
| Real-time collaboration with presence | WebSocket |
| Mobile apps with connection constraints | WebSocket |
| Serverless / edge deployments | HTTP pull/push |
You can also combine transports: use HTTP for initial sync and WebSocket for real-time updates, or fall back to HTTP + SSE when WebSocket is unavailable.
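The fallback strategy mentioned above might look like the following sketch. The `ConnectableTransport` interface and the `connectWithFallback` helper are hypothetical and are not part of the library; they only illustrate trying WebSocket first and switching to another transport when the connection fails.

```typescript
// Hypothetical minimal shape of a transport that can be connected.
interface ConnectableTransport {
  name: string;
  connect(): Promise<void>;
}

// Try each candidate in order and return the first one that connects.
async function connectWithFallback(
  candidates: ConnectableTransport[],
): Promise<ConnectableTransport> {
  const failures: string[] = [];
  for (const transport of candidates) {
    try {
      await transport.connect();
      return transport;
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      failures.push(`${transport.name}: ${reason}`);
    }
  }
  throw new Error(`no transport available (${failures.join("; ")})`);
}
```

Passing the WebSocket transport first and an HTTP + SSE transport second would give the preference order described above while keeping the failure reasons for diagnostics.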