Grove

Sync Protocol

Transport interface, HTTP client, Syncer orchestration, multi-peer topologies, and sync hooks.

Grove CRDT sync uses a two-phase pull/push protocol over a pluggable transport layer. This page covers the transport interface, wire format, syncer orchestration, multi-peer topologies, and sync hooks.

Transport Interface

The Transport interface defines how nodes communicate:

type Transport interface {
    Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
    Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}

Grove provides two built-in transports:

  • HTTPClient for standard HTTP pull/push
  • StreamingTransport which adds SSE streaming on top of HTTP

You can implement custom transports for WebSocket, gRPC, NATS, Kafka, or any other protocol.
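
As a rough illustration of what a custom transport involves, the sketch below satisfies the two-method interface with an in-process store. The request and response structs are trimmed stand-ins declared locally (the real types live in Grove's crdt package and carry more fields), and memTransport and roundTrip are hypothetical names, not part of Grove.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Trimmed local stand-ins for Grove's wire types (assumption: the real
// types carry more fields, such as table filters and full HLC values).
type ChangeRecord struct {
	Table, PK, Field string
	Seq              int // simplified stand-in for the HLC
}

type PullRequest struct {
	Since  int
	NodeID string
}

type PullResponse struct{ Changes []ChangeRecord }

type PushRequest struct {
	Changes []ChangeRecord
	NodeID  string
}

type PushResponse struct{ Merged int }

// Transport mirrors the two-method interface shown above.
type Transport interface {
	Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
	Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}

// memTransport is a hypothetical in-process transport, useful in tests.
type memTransport struct {
	mu  sync.Mutex
	log []ChangeRecord
}

// Compile-time check that memTransport satisfies Transport.
var _ Transport = (*memTransport)(nil)

func (m *memTransport) Pull(ctx context.Context, req *PullRequest) (*PullResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // respect cancellation, as a network transport would
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	resp := &PullResponse{}
	for _, c := range m.log {
		if c.Seq > req.Since {
			resp.Changes = append(resp.Changes, c)
		}
	}
	return resp, nil
}

func (m *memTransport) Push(ctx context.Context, req *PushRequest) (*PushResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.log = append(m.log, req.Changes...)
	return &PushResponse{Merged: len(req.Changes)}, nil
}

// roundTrip pushes two changes, then pulls everything after Seq 1.
func roundTrip() (merged, pulled int, err error) {
	var t Transport = &memTransport{}
	ctx := context.Background()
	pushed, err := t.Push(ctx, &PushRequest{NodeID: "node-1", Changes: []ChangeRecord{
		{Table: "documents", PK: "doc-1", Field: "title", Seq: 1},
		{Table: "documents", PK: "doc-1", Field: "body", Seq: 2},
	}})
	if err != nil {
		return 0, 0, err
	}
	resp, err := t.Pull(ctx, &PullRequest{NodeID: "node-2", Since: 1})
	if err != nil {
		return 0, 0, err
	}
	return pushed.Merged, len(resp.Changes), nil
}

func main() {
	merged, pulled, err := roundTrip()
	fmt.Println(merged, pulled, err)
}
```

A transport for WebSocket, gRPC, or a message broker would keep the same two methods and replace the in-memory log with calls to the remote end.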

Wire Format

All sync communication uses JSON. The wire types are:

PullRequest

{
  "tables": ["documents", "comments"],
  "since": {"ts": 1706000000000000, "c": 0, "node": "node-1"},
  "node_id": "node-1"
}

Field     Type      Description
tables    string[]  Tables to pull changes for
since     HLC       Return changes after this HLC
node_id   string    Requesting node identifier
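
The since cursor only makes sense if HLC values are totally ordered. This page does not spell out Grove's comparison rule; a common hybrid-logical-clock ordering compares the physical timestamp first, then the counter, then the node ID as a tie-breaker. The sketch below assumes that ordering and the JSON field names from the example (ts, c, node). The HLC struct, Compare, After, and parseHLC are illustrative names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// HLC mirrors the JSON shape used in the wire examples.
type HLC struct {
	TS      int64  `json:"ts"`
	Counter uint32 `json:"c"`
	Node    string `json:"node"`
}

// Compare returns -1, 0, or 1.
// Assumed order: timestamp, then counter, then node ID.
func (a HLC) Compare(b HLC) int {
	switch {
	case a.TS < b.TS:
		return -1
	case a.TS > b.TS:
		return 1
	case a.Counter < b.Counter:
		return -1
	case a.Counter > b.Counter:
		return 1
	}
	return strings.Compare(a.Node, b.Node)
}

// After reports whether a is strictly newer than b, which is the test a
// server would apply to decide whether a change falls after "since".
func (a HLC) After(b HLC) bool { return a.Compare(b) > 0 }

// parseHLC decodes an HLC from its JSON form.
func parseHLC(s string) (HLC, error) {
	var h HLC
	err := json.Unmarshal([]byte(s), &h)
	return h, err
}

func main() {
	since, _ := parseHLC(`{"ts": 1706000000000000, "c": 0, "node": "node-1"}`)
	change, _ := parseHLC(`{"ts": 1706000001000000, "c": 0, "node": "node-2"}`)
	fmt.Println(change.After(since), since.After(since))
}
```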

PullResponse

{
  "changes": [
    {
      "table": "documents",
      "pk": "doc-1",
      "field": "title",
      "crdt_type": "lww",
      "hlc": {"ts": 1706000001000000, "c": 0, "node": "node-2"},
      "node_id": "node-2",
      "value": "\"Updated Title\""
    }
  ],
  "latest_hlc": {"ts": 1706000001000000, "c": 0, "node": "node-2"}
}

PushRequest

{
  "changes": [/* ChangeRecord[] */],
  "node_id": "node-1"
}

PushResponse

{
  "merged": 5,
  "latest_hlc": {"ts": 1706000002000000, "c": 0, "node": "node-2"}
}

ChangeRecord

A single field-level change record carries all the data needed for merge:

Field          Type             Description
table          string           Source table
pk             string           Primary key
field          string           Field name
crdt_type      string           "lww", "counter", or "set"
hlc            HLC              Clock value of the change
node_id        string           Node that produced the change
value          json.RawMessage  LWW value (omitted for counter/set)
tombstone      bool             True if this is a delete
counter_delta  CounterDelta?    Counter increment/decrement
set_op         SetOperation?    Set add/remove operation
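
The fields above map naturally onto Go structs with JSON tags. The sketch below is a guess at that shape based only on the JSON examples on this page: the Go field names and omitempty choices are assumptions, and counter_delta and set_op are kept as raw JSON because their layouts are not shown here. decodePull is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type HLC struct {
	TS      int64  `json:"ts"`
	Counter uint32 `json:"c"`
	Node    string `json:"node"`
}

// ChangeRecord follows the field table above. CounterDelta and SetOp stay
// as raw JSON because their concrete layouts are not documented here.
type ChangeRecord struct {
	Table        string          `json:"table"`
	PK           string          `json:"pk"`
	Field        string          `json:"field"`
	CRDTType     string          `json:"crdt_type"`
	HLC          HLC             `json:"hlc"`
	NodeID       string          `json:"node_id"`
	Value        json.RawMessage `json:"value,omitempty"`
	Tombstone    bool            `json:"tombstone,omitempty"`
	CounterDelta json.RawMessage `json:"counter_delta,omitempty"`
	SetOp        json.RawMessage `json:"set_op,omitempty"`
}

type PullResponse struct {
	Changes   []ChangeRecord `json:"changes"`
	LatestHLC HLC            `json:"latest_hlc"`
}

// decodePull parses a PullResponse body.
func decodePull(body string) (*PullResponse, error) {
	var resp PullResponse
	if err := json.Unmarshal([]byte(body), &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

// examplePull is the PullResponse example from this page.
const examplePull = `{
  "changes": [
    {
      "table": "documents",
      "pk": "doc-1",
      "field": "title",
      "crdt_type": "lww",
      "hlc": {"ts": 1706000001000000, "c": 0, "node": "node-2"},
      "node_id": "node-2",
      "value": "\"Updated Title\""
    }
  ],
  "latest_hlc": {"ts": 1706000001000000, "c": 0, "node": "node-2"}
}`

func main() {
	resp, err := decodePull(examplePull)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	c := resp.Changes[0]
	fmt.Println(c.Table, c.PK, c.Field, c.CRDTType, resp.LatestHLC.TS)
}
```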

HTTP Transport

The built-in HTTP transport communicates via POST /pull and POST /push:

// Basic HTTP transport.
transport := crdt.HTTPTransport("https://cloud.example.com/sync")

// With a custom HTTP client (for auth, timeouts, TLS).
httpClient := &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        TLSClientConfig: tlsConfig,
    },
}
transport = crdt.HTTPTransportWithClient("https://cloud.example.com/sync", httpClient)
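
To make the wire exchange concrete, the sketch below issues a pull by hand with net/http against a stub server. It assumes the endpoint accepts the PullRequest JSON at the base URL plus /pull and answers with the PullResponse JSON shown earlier. The built-in transport presumably does something similar, but its internals are not shown on this page. pullOnce and demo are hypothetical helpers, and the stub server's behavior (advancing the cursor by one) is invented for the example.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type HLC struct {
	TS      int64  `json:"ts"`
	Counter uint32 `json:"c"`
	Node    string `json:"node"`
}

type PullRequest struct {
	Tables []string `json:"tables"`
	Since  HLC      `json:"since"`
	NodeID string   `json:"node_id"`
}

type PullResponse struct {
	Changes   []json.RawMessage `json:"changes"`
	LatestHLC HLC               `json:"latest_hlc"`
}

// pullOnce POSTs a PullRequest to base+"/pull" and decodes the reply.
func pullOnce(ctx context.Context, c *http.Client, base string, req PullRequest) (*PullResponse, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, base+"/pull", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := c.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("pull: unexpected status %s", resp.Status)
	}
	var out PullResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}

// demo runs pullOnce against a stub server that answers POST /sync/pull
// with a cursor one tick past the requested one.
func demo() (int64, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/sync/pull" {
			http.NotFound(w, r)
			return
		}
		var req PullRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(PullResponse{
			LatestHLC: HLC{TS: req.Since.TS + 1, Node: "node-2"},
		})
	}))
	defer srv.Close()

	resp, err := pullOnce(context.Background(), srv.Client(), srv.URL+"/sync", PullRequest{
		Tables: []string{"documents", "comments"},
		Since:  HLC{TS: 1706000000000000, Node: "node-1"},
		NodeID: "node-1",
	})
	if err != nil {
		return 0, err
	}
	return resp.LatestHLC.TS, nil
}

func main() {
	ts, err := demo()
	fmt.Println(ts, err)
}
```

Authentication headers or mutual TLS would be added on the http.Client, which is the role of the custom client in the snippet above.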

Syncer

The Syncer orchestrates the two-phase sync protocol:

  1. Pull phase — Request changes from the remote node since the last sync point, merge them into local state using the merge engine
  2. Push phase — Read local changes since the last sync point, filter out echoed changes, push to the remote node
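
The two phases can be sketched in a few dozen lines. This is a simplified model, not Grove's implementation: an integer Seq stands in for the HLC, merge is a toy last-writer-wins rule, and "echoed" is taken to mean changes received during the pull phase of the same round. All names (Change, Peer, node, memPeer, syncOnce, demo) are hypothetical.

```go
package main

import "fmt"

// Change is a simplified stand-in for a ChangeRecord; Seq replaces the HLC.
type Change struct {
	Key, Value, NodeID string
	Seq                int
}

// Peer is a reduced, hypothetical version of the Transport interface.
type Peer interface {
	Pull(since int) ([]Change, error)
	Push(changes []Change) error
}

type Report struct{ Pulled, Merged, Pushed int }

type node struct {
	state  map[string]Change // current winner per key
	log    []Change          // every change applied on this node
	cursor int               // last sync point
}

func newNode() *node { return &node{state: map[string]Change{}} }

// apply is a toy last-writer-wins merge: the higher Seq wins.
func (n *node) apply(c Change) bool {
	if cur, ok := n.state[c.Key]; ok && cur.Seq >= c.Seq {
		return false
	}
	n.state[c.Key] = c
	n.log = append(n.log, c)
	return true
}

// memPeer exposes another in-process node as a Peer.
type memPeer struct{ remote *node }

func (p memPeer) Pull(since int) ([]Change, error) {
	var out []Change
	for _, c := range p.remote.log {
		if c.Seq > since {
			out = append(out, c)
		}
	}
	return out, nil
}

func (p memPeer) Push(changes []Change) error {
	for _, c := range changes {
		p.remote.apply(c)
	}
	return nil
}

// syncOnce runs one pull/push round and advances the sync point.
func syncOnce(n *node, p Peer) (Report, error) {
	var r Report
	next := n.cursor

	// Phase 1: pull remote changes since the last sync point and merge them.
	remote, err := p.Pull(n.cursor)
	if err != nil {
		return r, err
	}
	echoed := make(map[Change]bool, len(remote))
	for _, c := range remote {
		r.Pulled++
		echoed[c] = true
		if n.apply(c) {
			r.Merged++
		}
		if c.Seq > next {
			next = c.Seq
		}
	}

	// Phase 2: push local changes since the last sync point, skipping
	// anything that just arrived from this peer.
	var out []Change
	for _, c := range n.log {
		if c.Seq > n.cursor && !echoed[c] {
			out = append(out, c)
			if c.Seq > next {
				next = c.Seq
			}
		}
	}
	if len(out) > 0 {
		if err := p.Push(out); err != nil {
			return r, err
		}
	}
	r.Pushed = len(out)
	n.cursor = next
	return r, nil
}

// demo syncs node A with node B twice; the second round has nothing to do.
func demo() (first, second Report, remoteKeys int) {
	a, b := newNode(), newNode()
	a.apply(Change{Key: "documents/doc-1/title", Value: "A", NodeID: "node-a", Seq: 2})
	b.apply(Change{Key: "documents/doc-2/title", Value: "B", NodeID: "node-b", Seq: 1})
	first, _ = syncOnce(a, memPeer{b})
	second, _ = syncOnce(a, memPeer{b})
	return first, second, len(b.state)
}

func main() {
	first, second, keys := demo()
	fmt.Println(first, second, keys)
}
```

A single integer cursor is enough for this toy model; with real HLC values and several peers, a sync point per peer and direction is the safer design.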

Single Sync

syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(transport),
    crdt.WithSyncTables("documents", "comments"),
)

report, err := syncer.Sync(ctx)
// report.Pulled = 5, report.Pushed = 3, report.Merged = 5

Background Loop

syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(transport),
    crdt.WithSyncTables("documents"),
    crdt.WithSyncInterval(30 * time.Second), // Default: 30s
)

// Run blocks until ctx is cancelled, so start it in its own goroutine.
go syncer.Run(ctx)

Real-Time Push

Push a single change to all peers immediately (useful for CDC-driven sync):

err := syncer.PushChange(ctx, "documents", "doc-1", "title", crdt.TypeLWW,
    json.RawMessage(`"New Title"`), plugin.Clock().Now())

Multi-Peer Topologies

Edge-to-Cloud (Single Peer)

The simplest topology: edge nodes sync with a central cloud node.

syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(crdt.HTTPTransport("https://cloud.example.com/sync")),
    crdt.WithSyncTables("documents"),
)

Hub-and-Spoke (Multiple Peers)

A central node syncs with multiple edge nodes:

syncer := crdt.NewSyncer(plugin,
    crdt.WithPeers(
        crdt.HTTPTransport("https://edge-1.example.com/sync"),
        crdt.HTTPTransport("https://edge-2.example.com/sync"),
        crdt.HTTPTransport("https://edge-3.example.com/sync"),
    ),
    crdt.WithSyncTables("documents"),
)

P2P (Peer-to-Peer)

Each node syncs with all other nodes. Use WithPeers on each node:

// On Node A:
syncer := crdt.NewSyncer(plugin,
    crdt.WithPeers(
        crdt.HTTPTransport("https://node-b.example.com/sync"),
        crdt.HTTPTransport("https://node-c.example.com/sync"),
    ),
    crdt.WithSyncTables("documents"),
)
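
With several peers, one practical question is what happens when a single peer is unreachable. This page does not say how Grove handles that case. The sketch below shows one reasonable policy, stated as an assumption: sync with each peer in turn, keep going after a failure, and return the combined errors. It uses errors.Join (Go 1.20 or later). Peer, stubPeer, syncAll, and demo are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Peer is a hypothetical handle for one remote node.
type Peer interface {
	Name() string
	Sync() (pulled, pushed int, err error)
}

// stubPeer returns canned results so the example runs without a network.
type stubPeer struct {
	name           string
	pulled, pushed int
	err            error
}

func (p stubPeer) Name() string { return p.name }

func (p stubPeer) Sync() (int, int, error) { return p.pulled, p.pushed, p.err }

// syncAll visits every peer, totals the successful rounds, and joins the
// failures into one error (nil when every peer succeeded).
func syncAll(peers []Peer) (pulled, pushed int, err error) {
	var errs []error
	for _, p := range peers {
		pl, ps, e := p.Sync()
		if e != nil {
			errs = append(errs, fmt.Errorf("%s: %w", p.Name(), e))
			continue // one unreachable peer should not block the others
		}
		pulled += pl
		pushed += ps
	}
	return pulled, pushed, errors.Join(errs...)
}

var errUnreachable = errors.New("unreachable")

// demo syncs with three peers, the second of which fails.
func demo() (pulled, pushed int, sawUnreachable bool) {
	peers := []Peer{
		stubPeer{name: "node-b", pulled: 2, pushed: 1},
		stubPeer{name: "node-c", err: errUnreachable},
		stubPeer{name: "node-d", pulled: 3, pushed: 4},
	}
	pulled, pushed, err := syncAll(peers)
	return pulled, pushed, errors.Is(err, errUnreachable)
}

func main() {
	pulled, pushed, failed := demo()
	fmt.Println(pulled, pushed, failed)
}
```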

Sync Hooks

Sync hooks intercept changes during sync operations for validation, transformation, filtering, or audit logging. Implement the SyncHook interface:

type SyncHook interface {
    BeforeInboundChange(ctx context.Context, change *ChangeRecord) (*ChangeRecord, error)
    AfterInboundChange(ctx context.Context, change *ChangeRecord) error
    BeforeOutboundChange(ctx context.Context, change *ChangeRecord) (*ChangeRecord, error)
    BeforeOutboundRead(ctx context.Context, changes []ChangeRecord) ([]ChangeRecord, error)
}

Method                When Called                                  Return
BeforeInboundChange   Before merging a remote change locally       Modified change, nil to skip, error to abort
AfterInboundChange    After a remote change is merged              Error (best-effort, doesn't abort)
BeforeOutboundChange  Before pushing a local change to peers       Modified change, nil to skip, error to abort
BeforeOutboundRead    Before returning changes in a pull response  Filtered/modified slice

BaseSyncHook

Embed BaseSyncHook and override only the methods you need:

type TenantFilter struct {
    crdt.BaseSyncHook
    tenantID string
}

func (h *TenantFilter) BeforeInboundChange(ctx context.Context, c *crdt.ChangeRecord) (*crdt.ChangeRecord, error) {
    // Skip changes that belong to other tenants.
    if c.Table != "tenant_"+h.tenantID+"_documents" {
        return nil, nil // Skip this change
    }
    return c, nil
}

func (h *TenantFilter) BeforeOutboundRead(ctx context.Context, changes []crdt.ChangeRecord) ([]crdt.ChangeRecord, error) {
    // Only send this tenant's changes.
    var filtered []crdt.ChangeRecord
    for _, c := range changes {
        if c.Table == "tenant_"+h.tenantID+"_documents" {
            filtered = append(filtered, c)
        }
    }
    return filtered, nil
}

Registering Hooks

// On the plugin (applies to syncer and controller).
plugin := crdt.New(
    crdt.WithNodeID("node-1"),
    crdt.WithSyncHook(&TenantFilter{tenantID: "acme"}),
    crdt.WithSyncHook(&AuditLogger{}),
)

// On the controller only (additional hooks for server-side).
ctrl := crdt.NewSyncController(plugin,
    crdt.WithControllerSyncHook(&RateLimiter{}),
)

Hook Chain

Multiple hooks are called in registration order. For Before* methods, the output of one hook becomes the input of the next. A nil return skips the change entirely. An error aborts the chain.
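
The chaining rules can be sketched as a small loop. This is an illustration of the behavior described above, not Grove's code: each hook is reduced to a single function, the ChangeRecord is trimmed to a few string fields, and the sketch assumes that later hooks are not called once a hook skips or aborts. The hook functions, runChain, and classify are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// ChangeRecord is trimmed to the fields this example needs.
type ChangeRecord struct {
	Table, PK, Field, Value string
}

// beforeFunc has the shape of the Before* hook methods.
type beforeFunc func(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error)

// runChain calls hooks in registration order. The output of one hook is the
// input of the next; a nil change skips, an error aborts.
func runChain(ctx context.Context, hooks []beforeFunc, c *ChangeRecord) (*ChangeRecord, error) {
	cur := c
	for _, h := range hooks {
		next, err := h(ctx, cur)
		if err != nil {
			return nil, err // abort the chain
		}
		if next == nil {
			return nil, nil // skip the change entirely
		}
		cur = next
	}
	return cur, nil
}

// requirePK aborts when the primary key is missing.
func requirePK(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	if c.PK == "" {
		return nil, errors.New("missing primary key")
	}
	return c, nil
}

// skipDrafts drops changes to a hypothetical "drafts" table.
func skipDrafts(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	if c.Table == "drafts" {
		return nil, nil
	}
	return c, nil
}

// upperValue returns a modified copy, leaving the input untouched.
func upperValue(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	out := *c
	out.Value = strings.ToUpper(c.Value)
	return &out, nil
}

// classify runs one change through the chain and describes the outcome.
func classify(table, pk, value string) string {
	hooks := []beforeFunc{requirePK, skipDrafts, upperValue}
	in := &ChangeRecord{Table: table, PK: pk, Field: "title", Value: value}
	out, err := runChain(context.Background(), hooks, in)
	switch {
	case err != nil:
		return "aborted: " + err.Error()
	case out == nil:
		return "skipped"
	default:
		return "merged: " + out.Value
	}
}

func main() {
	fmt.Println(classify("documents", "doc-1", "title"))
	fmt.Println(classify("drafts", "d-1", "wip"))
	fmt.Println(classify("documents", "", "title"))
}
```

Because order matters, cheap validation and filtering hooks are usually registered before hooks that transform or log the change.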

Standalone HTTP Handler

For apps not using Forge, create a standard http.Handler:

handler := crdt.NewHTTPHandler(plugin,
    crdt.WithStreamPollInterval(2 * time.Second),
)

mux := http.NewServeMux()
mux.Handle("/sync/", http.StripPrefix("/sync", handler))
log.Fatal(http.ListenAndServe(":8080", mux))

This registers:

  • POST /sync/pull — Pull changes
  • POST /sync/push — Push changes

For SSE streaming with the standalone handler, see SSE Streaming.

Forge Integration

When using the Grove Forge extension, routes are registered automatically:

app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
    groveext.WithSyncer(syncer),
    groveext.WithSyncController(
        crdt.WithStreamPollInterval(2 * time.Second),
        crdt.WithStreamKeepAlive(30 * time.Second),
    ),
    groveext.WithBasePath("/api/sync"), // Custom prefix
))

See Forge Extension for the full configuration reference.
