CRDT: Multi-Node Sync
Set up multi-node CRDT sync with Grove — hub-and-spoke, peer-to-peer, and full Forge integration examples.
This guide demonstrates setting up multi-node CRDT sync with Grove. We cover hub-and-spoke (cloud + edge nodes), peer-to-peer, and full Forge integration.
Architecture Options
Grove CRDT supports three topologies:
Hub-and-Spoke Peer-to-Peer Edge-to-Cloud
┌─────┐ ┌───┐ ┌───┐ ┌──────┐
│ Hub │ │ A │───│ B │ │ Edge │
└──┬──┘ └─┬─┘ └─┬─┘ └──┬───┘
┌───┼───┐ │ │ │
┌─┴─┐│┌─┴─┐ ┌─┴─┐ │ ┌──┴───┐
│ A ││ │ C │ │ C │─────┘ │Cloud │
└───┘│└───┘ └───┘ └──────┘
┌─┴─┐
│ B │
└───┘Hub-and-Spoke: Cloud Node
The cloud node acts as the central sync server. It receives changes from all edge nodes and serves them back.
Cloud Server
package main
import (
"context"
"log"
"github.com/xraph/forge"
"github.com/xraph/grove"
"github.com/xraph/grove/crdt"
"github.com/xraph/grove/hook"
"github.com/xraph/grove/drivers/pgdriver"
groveext "github.com/xraph/grove/extension"
)
func main() {
ctx := context.Background()
pgdb := pgdriver.New()
if err := pgdb.Open(ctx, "postgres://user:pass@localhost:5432/mydb"); err != nil {
log.Fatal(err)
}
// Cloud node's CRDT plugin.
plugin := crdt.New(
crdt.WithNodeID("cloud-us-east"),
crdt.WithTables("orders", "inventory"),
)
// Cloud doesn't need a syncer — it only serves sync endpoints.
app := forge.New()
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{
Tables: []string{"orders", "inventory"},
}),
groveext.WithSyncController(
crdt.WithStreamPollInterval(1 * time.Second),
crdt.WithStreamKeepAlive(15 * time.Second),
),
groveext.WithMigrations(crdt.Migrations),
))
// Registers: POST /sync/pull, POST /sync/push, GET /sync/stream
if err := app.Start(ctx); err != nil {
log.Fatal(err)
}
}Edge Node
Each edge node syncs with the cloud:
package main
import (
"context"
"log"
"time"
"github.com/xraph/grove"
"github.com/xraph/grove/crdt"
"github.com/xraph/grove/hook"
"github.com/xraph/grove/drivers/sqlitedriver"
)
func main() {
ctx := context.Background()
// Local SQLite for offline-capable storage.
driver := sqlitedriver.New()
driver.Open(ctx, "file:edge.db")
db, _ := grove.Open(driver)
defer db.Close()
plugin := crdt.New(
crdt.WithNodeID("edge-store-42"),
crdt.WithTables("orders", "inventory"),
)
db.Hooks().AddHook(plugin, hook.Scope{
Tables: []string{"orders", "inventory"},
})
plugin.EnsureShadowTable(ctx, "orders")
plugin.EnsureShadowTable(ctx, "inventory")
// Connect to cloud with streaming.
transport := crdt.NewStreamingTransport(
"https://cloud-us-east.example.com/sync",
crdt.WithStreamTables("orders", "inventory"),
crdt.WithStreamReconnect(5 * time.Second),
)
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(transport),
crdt.WithSyncTables("orders", "inventory"),
crdt.WithSyncInterval(30 * time.Second),
)
// Run both sync strategies.
go syncer.Run(ctx)
go syncer.StreamSync(ctx)
// Edge node processes local orders...
select {
case <-ctx.Done():
}
}Hub-and-Spoke: Hub Syncs with Multiple Edges
If the hub needs to actively sync (push) to edges, configure it with multiple peer transports:
// On the hub, actively sync with all edge nodes.
syncer := crdt.NewSyncer(plugin,
crdt.WithPeers(
crdt.NewStreamingTransport("https://edge-1.example.com/sync",
crdt.WithStreamTables("orders", "inventory"),
),
crdt.NewStreamingTransport("https://edge-2.example.com/sync",
crdt.WithStreamTables("orders", "inventory"),
),
crdt.NewStreamingTransport("https://edge-3.example.com/sync",
crdt.WithStreamTables("orders", "inventory"),
),
),
crdt.WithSyncTables("orders", "inventory"),
crdt.WithSyncInterval(15 * time.Second),
)
go syncer.Run(ctx)
go syncer.StreamSync(ctx)Peer-to-Peer Sync
In P2P mode, every node syncs directly with every other node. Each node serves sync endpoints and connects to all peers.
Node A
// Node A serves sync endpoints and connects to B and C.
plugin := crdt.New(crdt.WithNodeID("node-a"))
// Serve endpoints via Forge.
app := forge.New()
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncController(),
))
// Connect to peers.
syncer := crdt.NewSyncer(plugin,
crdt.WithPeers(
crdt.NewStreamingTransport("https://node-b.example.com/sync"),
crdt.NewStreamingTransport("https://node-c.example.com/sync"),
),
crdt.WithSyncTables("documents"),
crdt.WithSyncInterval(30 * time.Second),
)
go syncer.Run(ctx)
go syncer.StreamSync(ctx)
app.Start(ctx)Node B
// Node B serves endpoints and connects to A and C.
plugin := crdt.New(crdt.WithNodeID("node-b"))
app := forge.New()
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncController(),
))
syncer := crdt.NewSyncer(plugin,
crdt.WithPeers(
crdt.NewStreamingTransport("https://node-a.example.com/sync"),
crdt.NewStreamingTransport("https://node-c.example.com/sync"),
),
crdt.WithSyncTables("documents"),
)
go syncer.Run(ctx)
go syncer.StreamSync(ctx)
app.Start(ctx)Sync Hooks for Multi-Tenant
Use sync hooks to isolate data between tenants in a shared cluster:
type TenantIsolation struct {
crdt.BaseSyncHook
tenantID string
}
// Only accept inbound changes for this tenant's tables.
func (h *TenantIsolation) BeforeInboundChange(ctx context.Context, c *crdt.ChangeRecord) (*crdt.ChangeRecord, error) {
if !strings.HasPrefix(c.Table, "tenant_"+h.tenantID+"_") {
return nil, nil // Skip changes for other tenants
}
return c, nil
}
// Only send outbound changes for this tenant.
func (h *TenantIsolation) BeforeOutboundRead(ctx context.Context, changes []crdt.ChangeRecord) ([]crdt.ChangeRecord, error) {
var filtered []crdt.ChangeRecord
for _, c := range changes {
if strings.HasPrefix(c.Table, "tenant_"+h.tenantID+"_") {
filtered = append(filtered, c)
}
}
return filtered, nil
}
// Register the hook.
plugin := crdt.New(
crdt.WithNodeID("node-1"),
crdt.WithSyncHook(&TenantIsolation{tenantID: "acme"}),
)Audit Logging Hook
Log all sync operations for debugging and compliance:
type AuditLogger struct {
crdt.BaseSyncHook
logger *slog.Logger
}
func (h *AuditLogger) AfterInboundChange(ctx context.Context, c *crdt.ChangeRecord) error {
h.logger.Info("inbound change merged",
slog.String("table", c.Table),
slog.String("pk", c.PK),
slog.String("field", c.Field),
slog.String("from_node", c.NodeID),
slog.String("type", string(c.CRDTType)),
)
return nil
}
func (h *AuditLogger) BeforeOutboundChange(ctx context.Context, c *crdt.ChangeRecord) (*crdt.ChangeRecord, error) {
h.logger.Info("outbound change",
slog.String("table", c.Table),
slog.String("pk", c.PK),
slog.String("field", c.Field),
)
return c, nil
}Full Two-Node Forge Example
A complete example with two Forge nodes syncing documents:
Shared Model
package models
import "github.com/xraph/grove"
type Document struct {
grove.BaseModel `grove:"table:documents,alias:d"`
ID string `grove:"id,pk"`
Title string `grove:"title,crdt:lww"`
Body string `grove:"body,crdt:lww"`
ViewCount int64 `grove:"view_count,crdt:counter"`
Tags []string `grove:"tags,type:jsonb,crdt:set"`
}Node 1 (Primary)
func main() {
ctx := context.Background()
pgdb := pgdriver.New()
pgdb.Open(ctx, "postgres://user:pass@localhost:5432/node1")
plugin := crdt.New(crdt.WithNodeID("node-1"))
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(crdt.NewStreamingTransport("http://localhost:8082/sync")),
crdt.WithSyncTables("documents"),
crdt.WithSyncInterval(10 * time.Second),
)
app := forge.New()
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncer(syncer),
groveext.WithSyncController(),
groveext.WithMigrations(crdt.Migrations),
))
app.POST("/documents", createDocHandler)
app.PUT("/documents/:id", updateDocHandler)
app.GET("/documents", listDocsHandler)
app.Start(ctx) // Listens on :8081
}Node 2 (Replica)
func main() {
ctx := context.Background()
pgdb := pgdriver.New()
pgdb.Open(ctx, "postgres://user:pass@localhost:5432/node2")
plugin := crdt.New(crdt.WithNodeID("node-2"))
syncer := crdt.NewSyncer(plugin,
crdt.WithTransport(crdt.NewStreamingTransport("http://localhost:8081/sync")),
crdt.WithSyncTables("documents"),
crdt.WithSyncInterval(10 * time.Second),
)
app := forge.New()
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncer(syncer),
groveext.WithSyncController(),
groveext.WithMigrations(crdt.Migrations),
))
app.POST("/documents", createDocHandler)
app.PUT("/documents/:id", updateDocHandler)
app.GET("/documents", listDocsHandler)
app.Start(ctx) // Listens on :8082
}Both nodes can accept writes independently. Changes sync bidirectionally via the CRDT merge engine, with SSE streaming for real-time propagation and periodic pull/push as a fallback.