SSE Streaming
Real-time change propagation via Server-Sent Events with auto-reconnect, query parameters, and combined sync strategies.
Grove CRDT supports Server-Sent Events (SSE) for real-time change propagation alongside the periodic pull/push sync. SSE provides low-latency updates while periodic sync acts as a reliable fallback.
Overview
SSE streaming works alongside periodic sync:
┌──────────┐ SSE (real-time) ┌──────────┐
│ Node A │ ←────────────────── │ Node B │
│ │ Pull/Push (poll) │ │
│ │ ←──────────────────→│ │
└──────────┘ └──────────┘- SSE delivers changes in real-time with sub-second latency
- Pull/push runs periodically as a fallback to catch missed events
- Together, they provide reliable, low-latency sync
Client-Side: StreamingTransport
The StreamingTransport extends HTTPClient with SSE streaming support:
transport := crdt.NewStreamingTransport(
"https://cloud.example.com/sync",
crdt.WithStreamTables("documents", "comments"),
crdt.WithStreamReconnect(5 * time.Second),
crdt.WithStreamLogger(logger),
)Options
| Option | Default | Description |
|---|---|---|
WithStreamTables(tables...) | All tables | Restrict which tables the stream subscribes to |
WithStreamReconnect(duration) | 5s | Delay before reconnecting after disconnection |
WithStreamLogger(logger) | slog.Default() | Logger for stream events and errors |
Streaming Changes
StreamChanges connects to the SSE endpoint and calls the handler for each change:
err := transport.StreamChanges(ctx, sinceHLC, func(change crdt.ChangeRecord) {
// Process each change as it arrives.
fmt.Printf("Change: %s.%s.%s = %s\n",
change.Table, change.PK, change.Field, change.Value)
})This method blocks until the context is cancelled. On disconnection, it auto-reconnects after the configured delay, using the latest received HLC as the starting point.
Combined Sync Strategy
For production use, run both periodic sync and SSE streaming:
transport := crdt.NewStreamingTransport(
"https://cloud.example.com/sync",
crdt.WithStreamTables("documents"),
)
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(transport),
crdt.WithSyncTables("documents"),
crdt.WithSyncInterval(30 * time.Second),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start both in goroutines.
go syncer.Run(ctx) // Periodic pull/push every 30s
go syncer.StreamSync(ctx) // SSE real-time streamStreamSync automatically:
- Connects to all peers with
StreamingTransport - Merges incoming changes via the merge engine
- Updates the local HLC clock
- Tracks the last synced HLC per peer
- Falls back gracefully if a peer doesn't support streaming
Server-Side: SyncController
The SyncController handles the SSE endpoint. It polls shadow tables for new changes and streams them to connected clients.
Forge Integration
With the Forge extension, the SSE endpoint is registered automatically at GET /sync/stream:
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncController(
crdt.WithStreamPollInterval(2 * time.Second), // Check for new changes every 2s
crdt.WithStreamKeepAlive(30 * time.Second), // Send keep-alive every 30s
),
))Controller Options
| Option | Default | Description |
|---|---|---|
WithStreamPollInterval(duration) | 1s | How often to check for new changes |
WithStreamKeepAlive(duration) | 15s | Interval for SSE keep-alive comments |
WithControllerSyncHook(hook) | None | Additional sync hooks for the controller |
StreamChangesSince
For custom server implementations, use StreamChangesSince directly:
ctrl := crdt.NewSyncController(plugin,
crdt.WithStreamPollInterval(1 * time.Second),
)
ch, err := ctrl.StreamChangesSince(ctx, []string{"documents"}, sinceHLC)
if err != nil {
log.Fatal(err)
}
for changes := range ch {
for _, change := range changes {
// Send to client via SSE, WebSocket, etc.
fmt.Printf("New change: %+v\n", change)
}
}SSE Event Format
The SSE stream uses standard Server-Sent Events format:
Single Change Event
event: change
data: {"table":"documents","pk":"doc-1","field":"title","crdt_type":"lww","hlc":{"ts":1706000001,"c":0,"node":"node-2"},"node_id":"node-2","value":"\"Updated\""}Batch Change Event
event: changes
data: [{"table":"documents","pk":"doc-1",...},{"table":"documents","pk":"doc-2",...}]Keep-Alive Comment
: keep-aliveSSE comments (lines starting with :) are used as keep-alive signals to prevent connection timeouts.
Presence Event
When presence is enabled on the server, the stream also includes "presence" events:
event: presence
data: {"type":"join","node_id":"browser-abc","topic":"documents:doc-1","data":{"name":"Alice"}}
event: presence
data: {"type":"update","node_id":"browser-abc","topic":"documents:doc-1","data":{"name":"Alice","isTyping":true}}
event: presence
data: {"type":"leave","node_id":"browser-abc","topic":"documents:doc-1"}Event types: "join" (new node), "update" (data changed), "leave" (node left, disconnected, or TTL expired).
Automatic Disconnect Cleanup
When a client's SSE connection drops, the server automatically removes all presence entries for that client and broadcasts "leave" events. This requires the node_id query parameter on the stream URL so the server knows which client disconnected.
Query Parameters
The SSE endpoint accepts query parameters to filter the stream:
| Parameter | Type | Description |
|---|---|---|
tables | string | Comma-separated table names |
since_ts | int64 | HLC timestamp (nanoseconds) |
since_count | uint32 | HLC counter |
since_node | string | HLC node ID |
node_id | string | Client node ID (used for presence cleanup on disconnect) |
Example URL:
GET /sync/stream?tables=documents,comments&since_ts=1706000000&since_count=0&since_node=node-1TypeScript SSE Client
The TypeScript client (@grove-js/crdt) includes a CRDTStream class that uses fetch() + ReadableStream instead of EventSource to support custom headers:
import { CRDTStream } from "@grove-js/crdt";
const stream = new CRDTStream("https://api.example.com/sync", {
tables: ["documents"],
reconnectDelay: 5000,
});
stream.on((event) => {
switch (event.type) {
case "change":
console.log("Single change:", event.data);
break;
case "changes":
console.log("Batch:", event.data.length, "changes");
break;
case "presence":
console.log("Presence:", event.data.type, event.data.node_id);
break;
case "connected":
console.log("Connected to SSE stream");
break;
case "disconnected":
console.log("Disconnected, will reconnect...");
break;
case "error":
console.error("Stream error:", event.error);
break;
}
});
stream.connect();
// Later:
stream.disconnect();For React integration, see the TypeScript Client page.
Error Handling
Client-Side
The StreamingTransport handles errors gracefully:
- Connection failures: Logged and retried after
reconnectDelay - Parse errors: Individual events are skipped, stream continues
- Context cancellation: Clean shutdown, no reconnect attempt
- Non-200 responses: Treated as connection failure, triggers reconnect
Server-Side
The SyncController handles errors in the poll loop:
- Database errors: Logged, poll continues on next tick
- Hook errors: Logged, changes for that batch are skipped
- Client disconnect: Context cancelled, goroutine exits cleanly