Grove

CRDT: Multi-Node Sync

Set up multi-node CRDT sync with Grove — hub-and-spoke, peer-to-peer, and full Forge integration examples.

This guide demonstrates setting up multi-node CRDT sync with Grove. We cover hub-and-spoke (cloud + edge nodes), peer-to-peer, and full Forge integration.

Architecture Options

Grove CRDT supports three topologies:

Hub-and-Spoke          Peer-to-Peer           Edge-to-Cloud

   ┌─────┐            ┌───┐   ┌───┐          ┌──────┐
   │ Hub │            │ A │───│ B │          │ Edge │
   └──┬──┘            └─┬─┘   └─┬─┘          └──┬───┘
  ┌───┼───┐             │       │               │
┌─┴─┐ │ ┌─┴─┐         ┌─┴─┐     │            ┌──┴───┐
│ A │ │ │ C │         │ C │─────┘            │Cloud │
└───┘ │ └───┘         └───┘                  └──────┘
    ┌─┴─┐
    │ B │
    └───┘

Hub-and-Spoke: Cloud Node

The cloud node acts as the central sync server. It receives changes from all edge nodes and serves them back.

Cloud Server

package main

import (
    "context"
    "log"
    "time"

    "github.com/xraph/forge"
    "github.com/xraph/grove"
    "github.com/xraph/grove/crdt"
    "github.com/xraph/grove/drivers/pgdriver"
    groveext "github.com/xraph/grove/extension"
    "github.com/xraph/grove/hook"
)

// main boots the cloud node: it opens the Postgres driver, builds the CRDT
// plugin for the synced tables, and mounts the Grove extension (including the
// sync controller) on a Forge app.
func main() {
    ctx := context.Background()

    pgdb := pgdriver.New()
    err := pgdb.Open(ctx, "postgres://user:pass@localhost:5432/mydb")
    if err != nil {
        log.Fatal(err)
    }

    // CRDT plugin identifying this process as the cloud node.
    plugin := crdt.New(
        crdt.WithNodeID("cloud-us-east"),
        crdt.WithTables("orders", "inventory"),
    )

    // No syncer is configured here: the cloud only serves sync endpoints.
    app := forge.New()

    // Tables the CRDT hook is attached to.
    scope := hook.Scope{Tables: []string{"orders", "inventory"}}

    app.Use(groveext.New(
        groveext.WithDriver(pgdb),
        groveext.WithCRDT(plugin, scope),
        groveext.WithSyncController(
            crdt.WithStreamPollInterval(1*time.Second),
            crdt.WithStreamKeepAlive(15*time.Second),
        ),
        groveext.WithMigrations(crdt.Migrations),
    ))

    // Registers: POST /sync/pull, POST /sync/push, GET /sync/stream
    err = app.Start(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

Edge Node

Each edge node syncs with the cloud:

package main

import (
    "context"
    "log"
    "time"

    "github.com/xraph/grove"
    "github.com/xraph/grove/crdt"
    "github.com/xraph/grove/hook"
    "github.com/xraph/grove/drivers/sqlitedriver"
)

// main runs an edge node: it opens a local SQLite database, attaches the CRDT
// plugin to the synced tables, and starts both sync strategies against the
// cloud node's sync endpoint.
func main() {
    ctx := context.Background()

    // Local SQLite for offline-capable storage.
    driver := sqlitedriver.New()
    if err := driver.Open(ctx, "file:edge.db"); err != nil {
        log.Fatal(err)
    }

    db, err := grove.Open(driver)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    plugin := crdt.New(
        crdt.WithNodeID("edge-store-42"),
        crdt.WithTables("orders", "inventory"),
    )

    db.Hooks().AddHook(plugin, hook.Scope{
        Tables: []string{"orders", "inventory"},
    })

    // NOTE(review): if EnsureShadowTable returns an error, it should be
    // checked here as well — confirm against the crdt package API.
    plugin.EnsureShadowTable(ctx, "orders")
    plugin.EnsureShadowTable(ctx, "inventory")

    // Connect to cloud with streaming.
    transport := crdt.NewStreamingTransport(
        "https://cloud-us-east.example.com/sync",
        crdt.WithStreamTables("orders", "inventory"),
        crdt.WithStreamReconnect(5*time.Second),
    )

    syncer := crdt.NewSyncer(plugin,
        crdt.WithTransport(transport),
        crdt.WithSyncTables("orders", "inventory"),
        crdt.WithSyncInterval(30*time.Second),
    )

    // Run both sync strategies.
    go syncer.Run(ctx)
    go syncer.StreamSync(ctx)

    // Edge node processes local orders...
    // Block until ctx is cancelled; a plain receive is equivalent to the
    // single-case select it replaces.
    <-ctx.Done()
}

Hub-and-Spoke: Hub Syncs with Multiple Edges

If the hub needs to actively sync (push) to edges, configure it with multiple peer transports:

// On the hub, actively sync with all edge nodes.
syncer := crdt.NewSyncer(plugin,
    crdt.WithPeers(
        crdt.NewStreamingTransport("https://edge-1.example.com/sync",
            crdt.WithStreamTables("orders", "inventory"),
        ),
        crdt.NewStreamingTransport("https://edge-2.example.com/sync",
            crdt.WithStreamTables("orders", "inventory"),
        ),
        crdt.NewStreamingTransport("https://edge-3.example.com/sync",
            crdt.WithStreamTables("orders", "inventory"),
        ),
    ),
    crdt.WithSyncTables("orders", "inventory"),
    crdt.WithSyncInterval(15 * time.Second),
)

go syncer.Run(ctx)
go syncer.StreamSync(ctx)

Peer-to-Peer Sync

In P2P mode, every node syncs directly with every other node. Each node serves sync endpoints and connects to all peers.

Node A

// Node A: exposes its own sync endpoints and dials peers B and C.
plugin := crdt.New(crdt.WithNodeID("node-a"))

// Mount Grove, the CRDT hook, and the sync controller on Forge.
app := forge.New()
scope := hook.Scope{Tables: []string{"documents"}}
app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, scope),
    groveext.WithSyncController(),
))

// One streaming transport per remote peer.
nodeB := crdt.NewStreamingTransport("https://node-b.example.com/sync")
nodeC := crdt.NewStreamingTransport("https://node-c.example.com/sync")

syncer := crdt.NewSyncer(plugin,
    crdt.WithPeers(nodeB, nodeC),
    crdt.WithSyncTables("documents"),
    crdt.WithSyncInterval(30*time.Second),
)

// Sync loops run in the background while the app serves requests.
go syncer.Run(ctx)
go syncer.StreamSync(ctx)
app.Start(ctx)

Node B

// Node B: exposes its own sync endpoints and dials peers A and C.
plugin := crdt.New(crdt.WithNodeID("node-b"))

app := forge.New()
scope := hook.Scope{Tables: []string{"documents"}}
app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, scope),
    groveext.WithSyncController(),
))

// One streaming transport per remote peer.
nodeA := crdt.NewStreamingTransport("https://node-a.example.com/sync")
nodeC := crdt.NewStreamingTransport("https://node-c.example.com/sync")

syncer := crdt.NewSyncer(plugin,
    crdt.WithPeers(nodeA, nodeC),
    crdt.WithSyncTables("documents"),
)

// Sync loops run in the background while the app serves requests.
go syncer.Run(ctx)
go syncer.StreamSync(ctx)
app.Start(ctx)

Sync Hooks for Multi-Tenant

Use sync hooks to isolate data between tenants in a shared cluster:

// TenantIsolation is a sync hook that restricts synced changes to a single
// tenant's tables, i.e. tables named "tenant_<tenantID>_...".
type TenantIsolation struct {
    // Embedded base hook. NOTE(review): presumably supplies default
    // implementations for the hook methods not overridden here — confirm.
    crdt.BaseSyncHook
    // tenantID is the tenant whose table prefix is allowed through.
    tenantID string
}

// BeforeInboundChange lets an inbound change through only when its table
// belongs to this tenant, i.e. the table name starts with "tenant_<id>_".
func (h *TenantIsolation) BeforeInboundChange(ctx context.Context, c *crdt.ChangeRecord) (*crdt.ChangeRecord, error) {
    prefix := "tenant_" + h.tenantID + "_"
    if strings.HasPrefix(c.Table, prefix) {
        return c, nil
    }
    // Skip changes for other tenants.
    return nil, nil
}

// BeforeOutboundRead keeps only the outbound changes whose table belongs to
// this tenant, i.e. the table name starts with "tenant_<id>_". The result is
// a nil slice when nothing matches.
func (h *TenantIsolation) BeforeOutboundRead(ctx context.Context, changes []crdt.ChangeRecord) ([]crdt.ChangeRecord, error) {
    prefix := "tenant_" + h.tenantID + "_"

    var kept []crdt.ChangeRecord
    for i := range changes {
        if !strings.HasPrefix(changes[i].Table, prefix) {
            continue
        }
        kept = append(kept, changes[i])
    }
    return kept, nil
}

// Register the hook.
plugin := crdt.New(
    crdt.WithNodeID("node-1"),
    crdt.WithSyncHook(&TenantIsolation{tenantID: "acme"}),
)

Audit Logging Hook

Log all sync operations for debugging and compliance:

// AuditLogger is a sync hook that writes a structured log entry for inbound
// changes after they are handled and for outbound changes before they are sent.
type AuditLogger struct {
    // Embedded base hook. NOTE(review): presumably supplies default
    // implementations for the hook methods not overridden here — confirm.
    crdt.BaseSyncHook
    // logger receives the audit entries.
    logger *slog.Logger
}

// AfterInboundChange logs the table, primary key, field, originating node,
// and CRDT type of an inbound change. It never returns an error.
func (h *AuditLogger) AfterInboundChange(ctx context.Context, c *crdt.ChangeRecord) error {
    crdtType := string(c.CRDTType)
    h.logger.Info("inbound change merged",
        "table", c.Table,
        "pk", c.PK,
        "field", c.Field,
        "from_node", c.NodeID,
        "type", crdtType,
    )
    return nil
}

// BeforeOutboundChange logs the table, primary key, and field of an outbound
// change and returns the record unmodified.
func (h *AuditLogger) BeforeOutboundChange(ctx context.Context, c *crdt.ChangeRecord) (*crdt.ChangeRecord, error) {
    h.logger.Info("outbound change",
        "table", c.Table,
        "pk", c.PK,
        "field", c.Field,
    )
    return c, nil
}

Full Two-Node Forge Example

A complete example with two Forge nodes syncing documents:

Shared Model

package models

import "github.com/xraph/grove"

// Document is the model shared by both nodes. Its struct tags map it to the
// "documents" table (alias "d") and assign a CRDT type to each synced column.
type Document struct {
    // Table name and alias for this model.
    grove.BaseModel `grove:"table:documents,alias:d"`
    // Primary key column "id".
    ID        string   `grove:"id,pk"`
    // Tagged crdt:lww. NOTE(review): presumably last-writer-wins — confirm.
    Title     string   `grove:"title,crdt:lww"`
    // Tagged crdt:lww, same as Title.
    Body      string   `grove:"body,crdt:lww"`
    // Tagged crdt:counter.
    ViewCount int64    `grove:"view_count,crdt:counter"`
    // Stored as jsonb and tagged crdt:set.
    Tags      []string `grove:"tags,type:jsonb,crdt:set"`
}

Node 1 (Primary)

// main starts Node 1: it opens the node1 Postgres database, configures a
// syncer that targets the other node's sync endpoint on :8082, mounts Grove
// with CRDT support on Forge, and serves the document routes. Startup
// failures are fatal instead of being silently ignored.
func main() {
    ctx := context.Background()

    pgdb := pgdriver.New()
    if err := pgdb.Open(ctx, "postgres://user:pass@localhost:5432/node1"); err != nil {
        log.Fatal(err)
    }

    plugin := crdt.New(crdt.WithNodeID("node-1"))

    // Sync the documents table with the peer at :8082.
    syncer := crdt.NewSyncer(plugin,
        crdt.WithTransport(crdt.NewStreamingTransport("http://localhost:8082/sync")),
        crdt.WithSyncTables("documents"),
        crdt.WithSyncInterval(10*time.Second),
    )

    app := forge.New()
    app.Use(groveext.New(
        groveext.WithDriver(pgdb),
        groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
        groveext.WithSyncer(syncer),
        groveext.WithSyncController(),
        groveext.WithMigrations(crdt.Migrations),
    ))

    app.POST("/documents", createDocHandler)
    app.PUT("/documents/:id", updateDocHandler)
    app.GET("/documents", listDocsHandler)

    // Listens on :8081
    if err := app.Start(ctx); err != nil {
        log.Fatal(err)
    }
}

Node 2 (Replica)

// main starts Node 2: it opens the node2 Postgres database, configures a
// syncer that targets the other node's sync endpoint on :8081, mounts Grove
// with CRDT support on Forge, and serves the document routes. Startup
// failures are fatal instead of being silently ignored.
func main() {
    ctx := context.Background()

    pgdb := pgdriver.New()
    if err := pgdb.Open(ctx, "postgres://user:pass@localhost:5432/node2"); err != nil {
        log.Fatal(err)
    }

    plugin := crdt.New(crdt.WithNodeID("node-2"))

    // Sync the documents table with the peer at :8081.
    syncer := crdt.NewSyncer(plugin,
        crdt.WithTransport(crdt.NewStreamingTransport("http://localhost:8081/sync")),
        crdt.WithSyncTables("documents"),
        crdt.WithSyncInterval(10*time.Second),
    )

    app := forge.New()
    app.Use(groveext.New(
        groveext.WithDriver(pgdb),
        groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
        groveext.WithSyncer(syncer),
        groveext.WithSyncController(),
        groveext.WithMigrations(crdt.Migrations),
    ))

    app.POST("/documents", createDocHandler)
    app.PUT("/documents/:id", updateDocHandler)
    app.GET("/documents", listDocsHandler)

    // Listens on :8082
    if err := app.Start(ctx); err != nil {
        log.Fatal(err)
    }
}

Both nodes can accept writes independently. Changes sync bidirectionally via the CRDT merge engine, with SSE streaming for real-time propagation and periodic pull/push as a fallback.

On this page