Sync Protocol
Transport interface, HTTP client, Syncer orchestration, multi-peer topologies, and sync hooks.
Grove CRDT sync uses a two-phase pull/push protocol over a pluggable transport layer. This page covers the transport interface, wire format, syncer orchestration, multi-peer topologies, and sync hooks.
Transport Interface
The Transport interface defines how nodes communicate:
type Transport interface {
Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}Grove provides two built-in transports:
HTTPClientfor standard HTTP pull/pushStreamingTransportwhich adds SSE streaming on top of HTTP
You can implement custom transports for WebSocket, gRPC, NATS, Kafka, or any other protocol.
Wire Format
All sync communication uses JSON. The wire types are:
PullRequest
{
"tables": ["documents", "comments"],
"since": {"ts": 1706000000000000, "c": 0, "node": "node-1"},
"node_id": "node-1"
}| Field | Type | Description |
|---|---|---|
tables | string[] | Tables to pull changes for |
since | HLC | Return changes after this HLC |
node_id | string | Requesting node identifier |
PullResponse
{
"changes": [
{
"table": "documents",
"pk": "doc-1",
"field": "title",
"crdt_type": "lww",
"hlc": {"ts": 1706000001000000, "c": 0, "node": "node-2"},
"node_id": "node-2",
"value": "\"Updated Title\""
}
],
"latest_hlc": {"ts": 1706000001000000, "c": 0, "node": "node-2"}
}PushRequest
{
"changes": [/* ChangeRecord[] */],
"node_id": "node-1"
}PushResponse
{
"merged": 5,
"latest_hlc": {"ts": 1706000002000000, "c": 0, "node": "node-2"}
}ChangeRecord
A single field-level change record carries all the data needed for merge:
| Field | Type | Description |
|---|---|---|
table | string | Source table |
pk | string | Primary key |
field | string | Field name |
crdt_type | string | "lww", "counter", or "set" |
hlc | HLC | Clock value of the change |
node_id | string | Node that produced the change |
value | json.RawMessage | LWW value (omitted for counter/set) |
tombstone | bool | True if this is a delete |
counter_delta | CounterDelta? | Counter increment/decrement |
set_op | SetOperation? | Set add/remove operation |
HTTP Transport
The built-in HTTP transport communicates via POST /pull and POST /push:
// Basic HTTP transport.
transport := crdt.HTTPTransport("https://cloud.example.com/sync")
// With a custom HTTP client (for auth, timeouts, TLS).
httpClient := &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
transport := crdt.HTTPTransportWithClient("https://cloud.example.com/sync", httpClient)Syncer
The Syncer orchestrates the two-phase sync protocol:
- Pull phase — Request changes from the remote node since the last sync point, merge them into local state using the merge engine
- Push phase — Read local changes since the last sync point, filter out echoed changes, push to the remote node
Single Sync
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(transport),
crdt.WithSyncTables("documents", "comments"),
)
report, err := syncer.Sync(ctx)
// report.Pulled = 5, report.Pushed = 3, report.Merged = 5Background Loop
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(transport),
crdt.WithSyncTables("documents"),
crdt.WithSyncInterval(30 * time.Second), // Default: 30s
)
// Blocks until ctx is cancelled.
go syncer.Run(ctx)Real-Time Push
Push a single change to all peers immediately (useful for CDC-driven sync):
err := syncer.PushChange(ctx, "documents", "doc-1", "title", crdt.TypeLWW,
json.RawMessage(`"New Title"`), plugin.Clock().Now())Multi-Peer Topologies
Edge-to-Cloud (Single Peer)
The simplest topology: edge nodes sync with a central cloud node.
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(crdt.HTTPTransport("https://cloud.example.com/sync")),
crdt.WithSyncTables("documents"),
)Hub-and-Spoke (Multiple Peers)
A central node syncs with multiple edge nodes:
syncer := crdt.NewSyncer(plugin,
crdt.WithPeers(
crdt.HTTPTransport("https://edge-1.example.com/sync"),
crdt.HTTPTransport("https://edge-2.example.com/sync"),
crdt.HTTPTransport("https://edge-3.example.com/sync"),
),
crdt.WithSyncTables("documents"),
)P2P (Peer-to-Peer)
Each node syncs with all other nodes. Use WithPeers on each node:
// On Node A:
syncer := crdt.NewSyncer(plugin,
crdt.WithPeers(
crdt.HTTPTransport("https://node-b.example.com/sync"),
crdt.HTTPTransport("https://node-c.example.com/sync"),
),
crdt.WithSyncTables("documents"),
)Sync Hooks
Sync hooks intercept changes during sync operations for validation, transformation, filtering, or audit logging. Implement the SyncHook interface:
type SyncHook interface {
BeforeInboundChange(ctx context.Context, change *ChangeRecord) (*ChangeRecord, error)
AfterInboundChange(ctx context.Context, change *ChangeRecord) error
BeforeOutboundChange(ctx context.Context, change *ChangeRecord) (*ChangeRecord, error)
BeforeOutboundRead(ctx context.Context, changes []ChangeRecord) ([]ChangeRecord, error)
}| Method | When Called | Return |
|---|---|---|
BeforeInboundChange | Before merging a remote change locally | Modified change, nil to skip, error to abort |
AfterInboundChange | After a remote change is merged | Error (best-effort, doesn't abort) |
BeforeOutboundChange | Before pushing a local change to peers | Modified change, nil to skip, error to abort |
BeforeOutboundRead | Before returning changes in a pull response | Filtered/modified slice |
BaseSyncHook
Embed BaseSyncHook and override only the methods you need:
type TenantFilter struct {
crdt.BaseSyncHook
tenantID string
}
func (h *TenantFilter) BeforeInboundChange(ctx context.Context, c *crdt.ChangeRecord) (*crdt.ChangeRecord, error) {
// Reject changes from other tenants.
if c.Table != "tenant_"+h.tenantID+"_documents" {
return nil, nil // Skip this change
}
return c, nil
}
func (h *TenantFilter) BeforeOutboundRead(ctx context.Context, changes []crdt.ChangeRecord) ([]crdt.ChangeRecord, error) {
// Only send this tenant's changes.
var filtered []crdt.ChangeRecord
for _, c := range changes {
if c.Table == "tenant_"+h.tenantID+"_documents" {
filtered = append(filtered, c)
}
}
return filtered, nil
}Registering Hooks
// On the plugin (applies to syncer and controller).
plugin := crdt.New(
crdt.WithNodeID("node-1"),
crdt.WithSyncHook(&TenantFilter{tenantID: "acme"}),
crdt.WithSyncHook(&AuditLogger{}),
)
// On the controller only (additional hooks for server-side).
ctrl := crdt.NewSyncController(plugin,
crdt.WithControllerSyncHook(&RateLimiter{}),
)Hook Chain
Multiple hooks are called in registration order. For Before* methods, the output of one hook becomes the input of the next. A nil return skips the change entirely. An error aborts the chain.
Standalone HTTP Handler
For apps not using Forge, create a standard http.Handler:
handler := crdt.NewHTTPHandler(plugin,
crdt.WithStreamPollInterval(2 * time.Second),
)
mux := http.NewServeMux()
mux.Handle("/sync/", http.StripPrefix("/sync", handler))
http.ListenAndServe(":8080", mux)This registers:
POST /sync/pull— Pull changesPOST /sync/push— Push changes
For SSE streaming with the standalone handler, see SSE Streaming.
Forge Integration
When using the Grove Forge extension, routes are registered automatically:
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncer(syncer),
groveext.WithSyncController(
crdt.WithStreamPollInterval(2 * time.Second),
crdt.WithStreamKeepAlive(30 * time.Second),
),
groveext.WithBasePath("/api/sync"), // Custom prefix
))See Forge Extension for the full configuration reference.