Grove

SSE Streaming

Real-time change propagation via Server-Sent Events with auto-reconnect, query parameters, and combined sync strategies.

Grove CRDT supports Server-Sent Events (SSE) for real-time change propagation alongside the periodic pull/push sync. SSE provides low-latency updates while periodic sync acts as a reliable fallback.

Overview

SSE streaming works alongside periodic sync:

┌──────────┐   SSE (real-time)   ┌──────────┐
│  Node A  │ ←────────────────── │  Node B  │
│          │   Pull/Push (poll)  │          │
│          │ ←──────────────────→│          │
└──────────┘                     └──────────┘
  • SSE delivers changes in real-time with sub-second latency
  • Pull/push runs periodically as a fallback to catch missed events
  • Together, they provide reliable, low-latency sync

Client-Side: StreamingTransport

The StreamingTransport extends HTTPClient with SSE streaming support:

transport := crdt.NewStreamingTransport(
    "https://cloud.example.com/sync",
    crdt.WithStreamTables("documents", "comments"),
    crdt.WithStreamReconnect(5 * time.Second),
    crdt.WithStreamLogger(logger),
)

Options

Option                          Default          Description
WithStreamTables(tables...)     All tables       Restrict which tables the stream subscribes to
WithStreamReconnect(duration)   5s               Delay before reconnecting after disconnection
WithStreamLogger(logger)        slog.Default()   Logger for stream events and errors

Streaming Changes

StreamChanges connects to the SSE endpoint and calls the handler for each change:

err := transport.StreamChanges(ctx, sinceHLC, func(change crdt.ChangeRecord) {
    // Process each change as it arrives.
    fmt.Printf("Change: %s.%s.%s = %s\n",
        change.Table, change.PK, change.Field, change.Value)
})

This method blocks until the context is cancelled. On disconnection, it auto-reconnects after the configured delay, using the latest received HLC as the starting point.

Combined Sync Strategy

For production use, run both periodic sync and SSE streaming:

transport := crdt.NewStreamingTransport(
    "https://cloud.example.com/sync",
    crdt.WithStreamTables("documents"),
)

syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(transport),
    crdt.WithSyncTables("documents"),
    crdt.WithSyncInterval(30 * time.Second),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start both in goroutines.
go syncer.Run(ctx)        // Periodic pull/push every 30s
go syncer.StreamSync(ctx) // SSE real-time stream

StreamSync automatically:

  • Connects to all peers with StreamingTransport
  • Merges incoming changes via the merge engine
  • Updates the local HLC clock
  • Tracks the last synced HLC per peer
  • Falls back gracefully if a peer doesn't support streaming

Server-Side: SyncController

The SyncController handles the SSE endpoint. It polls shadow tables for new changes and streams them to connected clients.

Forge Integration

With the Forge extension, the SSE endpoint is registered automatically at GET /sync/stream:

app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
    groveext.WithSyncController(
        crdt.WithStreamPollInterval(2 * time.Second),  // Check for new changes every 2s
        crdt.WithStreamKeepAlive(30 * time.Second),     // Send keep-alive every 30s
    ),
))

Controller Options

Option                             Default   Description
WithStreamPollInterval(duration)   1s        How often to check for new changes
WithStreamKeepAlive(duration)      15s       Interval for SSE keep-alive comments
WithControllerSyncHook(hook)       None      Additional sync hooks for the controller

StreamChangesSince

For custom server implementations, use StreamChangesSince directly:

ctrl := crdt.NewSyncController(plugin,
    crdt.WithStreamPollInterval(1 * time.Second),
)

ch, err := ctrl.StreamChangesSince(ctx, []string{"documents"}, sinceHLC)
if err != nil {
    log.Fatal(err)
}

for changes := range ch {
    for _, change := range changes {
        // Send to client via SSE, WebSocket, etc.
        fmt.Printf("New change: %+v\n", change)
    }
}

SSE Event Format

The SSE stream uses the standard Server-Sent Events format. Three event names are used (change, changes, and presence), plus comment lines for keep-alive:

Single Change Event

event: change
data: {"table":"documents","pk":"doc-1","field":"title","crdt_type":"lww","hlc":{"ts":1706000001,"c":0,"node":"node-2"},"node_id":"node-2","value":"\"Updated\""}

Batch Change Event

event: changes
data: [{"table":"documents","pk":"doc-1",...},{"table":"documents","pk":"doc-2",...}]

Keep-Alive Comment

: keep-alive

SSE comments (lines starting with :) are used as keep-alive signals to prevent connection timeouts.

Presence Event

When presence is enabled on the server, the stream also includes "presence" events:

event: presence
data: {"type":"join","node_id":"browser-abc","topic":"documents:doc-1","data":{"name":"Alice"}}

event: presence
data: {"type":"update","node_id":"browser-abc","topic":"documents:doc-1","data":{"name":"Alice","isTyping":true}}

event: presence
data: {"type":"leave","node_id":"browser-abc","topic":"documents:doc-1"}

The type field of a presence event is one of "join" (new node), "update" (data changed), or "leave" (node left, disconnected, or TTL expired).

Automatic Disconnect Cleanup

When a client's SSE connection drops, the server automatically removes all presence entries for that client and broadcasts "leave" events. This requires the node_id query parameter on the stream URL so the server knows which client disconnected.
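The cleanup step can be pictured as a lookup keyed by node ID. The sketch below is an assumption about how such bookkeeping might look, not Grove's server code: presenceRegistry and its methods are hypothetical, and presence data and TTLs are omitted.

```go
package main

import (
	"sort"
	"sync"
)

// presenceEvent mirrors the fields of the "leave" payload shown earlier.
type presenceEvent struct {
	Type   string `json:"type"`
	NodeID string `json:"node_id"`
	Topic  string `json:"topic"`
}

// presenceRegistry is a hypothetical in-memory index: topic -> set of node IDs.
type presenceRegistry struct {
	mu     sync.Mutex
	topics map[string]map[string]struct{}
}

func newPresenceRegistry() *presenceRegistry {
	return &presenceRegistry{topics: make(map[string]map[string]struct{})}
}

// join records that nodeID is present on topic.
func (r *presenceRegistry) join(topic, nodeID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.topics[topic] == nil {
		r.topics[topic] = make(map[string]struct{})
	}
	r.topics[topic][nodeID] = struct{}{}
}

// dropNode removes nodeID from every topic and returns one "leave" event
// per removed entry, sorted by topic, for the caller to broadcast.
func (r *presenceRegistry) dropNode(nodeID string) []presenceEvent {
	r.mu.Lock()
	defer r.mu.Unlock()
	var events []presenceEvent
	for topic, nodes := range r.topics {
		if _, ok := nodes[nodeID]; !ok {
			continue
		}
		delete(nodes, nodeID)
		if len(nodes) == 0 {
			delete(r.topics, topic)
		}
		events = append(events, presenceEvent{Type: "leave", NodeID: nodeID, Topic: topic})
	}
	sort.Slice(events, func(i, j int) bool { return events[i].Topic < events[j].Topic })
	return events
}
```

A stream handler would call dropNode with the connection's node_id once the request context is done. Without that parameter there is no key to look up, which is why the stream URL needs it.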

Query Parameters

The SSE endpoint accepts query parameters that filter the stream, set its starting HLC, and identify the client:

Parameter     Type     Description
tables        string   Comma-separated table names
since_ts      int64    HLC timestamp (nanoseconds)
since_count   uint32   HLC counter
since_node    string   HLC node ID
node_id       string   Client node ID (used for presence cleanup on disconnect)

Example URL:

GET /sync/stream?tables=documents,comments&since_ts=1706000000&since_count=0&since_node=node-1

TypeScript SSE Client

The TypeScript client (@grove-js/crdt) includes a CRDTStream class that uses fetch() + ReadableStream instead of EventSource to support custom headers:

import { CRDTStream } from "@grove-js/crdt";

const stream = new CRDTStream("https://api.example.com/sync", {
    tables: ["documents"],
    reconnectDelay: 5000,
});

stream.on((event) => {
    switch (event.type) {
        case "change":
            console.log("Single change:", event.data);
            break;
        case "changes":
            console.log("Batch:", event.data.length, "changes");
            break;
        case "presence":
            console.log("Presence:", event.data.type, event.data.node_id);
            break;
        case "connected":
            console.log("Connected to SSE stream");
            break;
        case "disconnected":
            console.log("Disconnected, will reconnect...");
            break;
        case "error":
            console.error("Stream error:", event.error);
            break;
    }
});

stream.connect();

// Later:
stream.disconnect();

For React integration, see the TypeScript Client page.

Error Handling

Client-Side

The StreamingTransport handles errors gracefully:

  • Connection failures: Logged and retried after the delay set with WithStreamReconnect
  • Parse errors: Individual events are skipped, stream continues
  • Context cancellation: Clean shutdown, no reconnect attempt
  • Non-200 responses: Treated as connection failure, triggers reconnect

Server-Side

The SyncController handles errors in the poll loop:

  • Database errors: Logged, poll continues on next tick
  • Hook errors: Logged, changes for that batch are skipped
  • Client disconnect: Context cancelled, goroutine exits cleanly
