Getting Started
This guide walks through the complete setup for Grove CRDTs: defining models, creating the plugin, setting up shadow tables, and performing your first sync between nodes.
Installation
go get github.com/xraph/grove/crdt
Define a CRDT-Enabled Model
Tag fields with crdt:lww, crdt:counter, or crdt:set to opt into CRDT behavior. Fields without CRDT tags are unaffected.
package models

import "github.com/xraph/grove"

type Document struct {
	grove.BaseModel `grove:"table:documents,alias:d"`

	ID        string   `grove:"id,pk"`
	Title     string   `grove:"title,crdt:lww"`           // Last-Writer-Wins
	ViewCount int64    `grove:"view_count,crdt:counter"`  // PN-Counter
	Tags      []string `grove:"tags,type:jsonb,crdt:set"` // OR-Set
	Author    string   `grove:"author"`                   // Normal field (no CRDT)
}
Each CRDT type has different merge behavior:
- crdt:lww (Last-Writer-Wins): the write with the highest HLC (hybrid logical clock) timestamp wins. Best for scalar values such as strings, booleans, or JSON objects.
- crdt:counter (PN-Counter): tracks per-node increments and decrements separately. Best for counters that multiple nodes update concurrently.
- crdt:set (OR-Set): an add-wins observed-remove set. Best for collections such as tags or member lists.
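To make the first two merge rules concrete, here is a minimal standalone sketch. It does not use Grove's internal types: `HLC`, `LWW`, `PNCounter`, and the merge functions are hypothetical names, and breaking clock ties on the logical counter and then the node ID is an assumption about how equal timestamps are resolved.

```go
package main

import "fmt"

// HLC is a simplified hybrid logical clock: a wall-clock timestamp, a
// logical counter for ties, and the node ID as a last tie-breaker.
// (Illustrative only; not Grove's actual type.)
type HLC struct {
	TS      int64
	Counter uint32
	Node    string
}

// After reports whether a is strictly later than b.
func (a HLC) After(b HLC) bool {
	if a.TS != b.TS {
		return a.TS > b.TS
	}
	if a.Counter != b.Counter {
		return a.Counter > b.Counter
	}
	return a.Node > b.Node
}

// LWW is a last-writer-wins register holding a string value.
type LWW struct {
	Value string
	Clock HLC
}

// MergeLWW keeps whichever write carries the later clock.
func MergeLWW(local, remote LWW) LWW {
	if remote.Clock.After(local.Clock) {
		return remote
	}
	return local
}

// PNCounter tracks increments and decrements separately per node.
type PNCounter struct {
	Inc map[string]int64
	Dec map[string]int64
}

// Value is the sum of all increments minus the sum of all decrements.
func (c PNCounter) Value() int64 {
	var total int64
	for _, n := range c.Inc {
		total += n
	}
	for _, n := range c.Dec {
		total -= n
	}
	return total
}

// MergePN takes the per-node maximum of each tally, so merging the same
// state twice, or in a different order, gives the same result.
func MergePN(a, b PNCounter) PNCounter {
	out := PNCounter{Inc: map[string]int64{}, Dec: map[string]int64{}}
	for _, src := range []PNCounter{a, b} {
		for node, n := range src.Inc {
			if n > out.Inc[node] {
				out.Inc[node] = n
			}
		}
		for node, n := range src.Dec {
			if n > out.Dec[node] {
				out.Dec[node] = n
			}
		}
	}
	return out
}

func main() {
	local := LWW{Value: "Draft", Clock: HLC{TS: 1000, Node: "node-1"}}
	remote := LWW{Value: "Hello World", Clock: HLC{TS: 1234, Node: "node-2"}}
	fmt.Println(MergeLWW(local, remote).Value) // Hello World

	a := PNCounter{Inc: map[string]int64{"node-1": 30}}
	b := PNCounter{Inc: map[string]int64{"node-2": 12}}
	fmt.Println(MergePN(a, b).Value()) // 42
}
```

Both merges are commutative and idempotent, which is what lets nodes exchange changes in any order and still converge. The OR-Set is omitted here for brevity.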
Create the CRDT Plugin
The plugin intercepts Grove mutations and tracks changes in shadow tables.
import (
	"time"

	"github.com/xraph/grove/crdt"
	"github.com/xraph/grove/hook"
)

// Create the plugin with a unique node identifier.
plugin := crdt.New(
	crdt.WithNodeID("node-1"),             // Required: unique per node
	crdt.WithTables("documents"),          // Optional: restrict to specific tables
	crdt.WithTombstoneTTL(7*24*time.Hour), // Optional: tombstone expiry (default: 7d)
)

// Register with the Grove DB hook engine.
db.Hooks().AddHook(plugin, hook.Scope{
	Tables: []string{"documents"},
})
The NodeID must be unique across all nodes in the sync cluster. Use descriptive names like "cloud-us-east", "edge-store-42", or "browser-abc123".
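One way to get descriptive, stable IDs is to derive them from a role plus a per-machine name. The sketch below is only an illustration: `BuildNodeID` is a hypothetical helper, not part of Grove, and it assumes hostnames are already unique within your cluster.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strings"
)

// BuildNodeID joins a role and an instance name into a lowercase,
// dash-separated identifier such as "edge-store-42".
// (Hypothetical helper for illustration.)
func BuildNodeID(role, instance string) string {
	clean := func(s string) string {
		s = strings.ToLower(strings.TrimSpace(s))
		return strings.ReplaceAll(s, " ", "-")
	}
	return clean(role) + "-" + clean(instance)
}

func main() {
	host, err := os.Hostname()
	if err != nil {
		// Without a stable name the ID could collide, so stop here.
		log.Fatal(err)
	}
	fmt.Println(BuildNodeID("edge", host))
}
```

The result could then be passed to crdt.WithNodeID. Clients without a stable hostname, such as browsers, would need a persisted random ID instead.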
Shadow Tables
CRDT metadata is stored in shadow tables named _<table>_crdt. These sit alongside your primary table without modifying its schema.
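The naming convention is simple enough to express as a small helper, which can be handy when checking a database by hand. `ShadowTableName` is a hypothetical function written for this guide, not a Grove API.

```go
package main

import "fmt"

// ShadowTableName returns the shadow table name for a primary table,
// following the _<table>_crdt convention described above.
// (Hypothetical helper for illustration.)
func ShadowTableName(table string) string {
	return "_" + table + "_crdt"
}

func main() {
	for _, t := range []string{"documents", "comments"} {
		fmt.Printf("%s -> %s\n", t, ShadowTableName(t))
	}
}
```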
Option A: Ensure at Runtime
// Create shadow table for the "documents" table.
err := plugin.EnsureShadowTable(ctx, "documents")
if err != nil {
	log.Fatal(err)
}
Option B: Use Migrations
Register CRDT migrations with your Grove migration group:
import "github.com/xraph/grove/crdt"

// In your migration setup
db.RegisterMigrations(crdt.Migrations)
The migration creates the shadow table with columns for field name, HLC timestamp, counter, node ID, and CRDT state (JSON), along with a composite sync index for efficient change queries.
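To picture what a shadow row holds and why an index over the clock columns helps, here is an in-memory sketch. The `ShadowRow` field names are assumptions based on the column list above (the real column names may differ), the `PK` field is an added assumption, and `ChangesSince` is a hypothetical stand-in for the "changes after this clock" query a sync round would run.

```go
package main

import (
	"fmt"
	"sort"
)

// ShadowRow is an illustrative picture of one shadow-table row.
// Field names are assumptions, not Grove's actual schema.
type ShadowRow struct {
	PK         string // Primary key of the row in the main table (assumed)
	Field      string // Field name, e.g. "title"
	HLCTime    int64  // HLC timestamp
	HLCCounter uint32 // HLC logical counter
	NodeID     string // Node that wrote the change
	State      string // CRDT state encoded as JSON
}

// ChangesSince returns rows whose clock is strictly later than
// (ts, counter), ordered oldest first. A database would answer the same
// question with a range scan over an index on the clock columns.
func ChangesSince(rows []ShadowRow, ts int64, counter uint32) []ShadowRow {
	var out []ShadowRow
	for _, r := range rows {
		if r.HLCTime > ts || (r.HLCTime == ts && r.HLCCounter > counter) {
			out = append(out, r)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].HLCTime != out[j].HLCTime {
			return out[i].HLCTime < out[j].HLCTime
		}
		return out[i].HLCCounter < out[j].HLCCounter
	})
	return out
}

func main() {
	rows := []ShadowRow{
		{PK: "doc-1", Field: "title", HLCTime: 100, NodeID: "node-1", State: `"Draft"`},
		{PK: "doc-1", Field: "tags", HLCTime: 300, NodeID: "node-2", State: `["go","crdt"]`},
		{PK: "doc-1", Field: "title", HLCTime: 200, NodeID: "node-2", State: `"Hello World"`},
	}
	// Print only the changes made after clock (150, 0).
	for _, r := range ChangesSince(rows, 150, 0) {
		fmt.Println(r.HLCTime, r.Field, r.NodeID)
	}
}
```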
Perform Your First Sync
Single Sync Round
import (
	"fmt"
	"log"

	"github.com/xraph/grove/crdt"
)

// Create a transport pointing at the remote node.
transport := crdt.HTTPTransport("https://cloud.example.com/sync")

// Create a syncer.
syncer := crdt.NewSyncer(plugin,
	crdt.WithTransport(transport),
	crdt.WithSyncTables("documents"),
)

// Run one sync round: pull remote changes, merge locally, push local changes.
report, err := syncer.Sync(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Synced: %s\n", report) // e.g. Synced: pulled=5 pushed=3 merged=5 conflicts=0
Background Sync Loop
For continuous sync, run the syncer in a background goroutine:
// Sync every 30 seconds (configurable).
syncer := crdt.NewSyncer(plugin,
	crdt.WithTransport(transport),
	crdt.WithSyncTables("documents"),
	crdt.WithSyncInterval(30*time.Second),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run blocks until ctx is cancelled, so start it in its own goroutine.
go syncer.Run(ctx)
Real-Time Streaming
For lower latency, add Server-Sent Events (SSE) streaming alongside periodic sync:
streamTransport := crdt.NewStreamingTransport(
	"https://cloud.example.com/sync",
	crdt.WithStreamTables("documents"),
	crdt.WithStreamReconnect(5*time.Second),
)

syncer := crdt.NewSyncer(plugin,
	crdt.WithTransport(streamTransport),
	crdt.WithSyncTables("documents"),
	crdt.WithSyncInterval(30*time.Second), // Fallback poll
)

go syncer.Run(ctx)        // Periodic pull/push
go syncer.StreamSync(ctx) // SSE real-time changes
Forge Integration
When running inside a Forge app, use the Grove extension for automatic route registration, SSE streaming, and lifecycle management:
import (
	"time"

	"github.com/xraph/forge"
	"github.com/xraph/grove/crdt"
	"github.com/xraph/grove/drivers/pgdriver"
	groveext "github.com/xraph/grove/extension"
	"github.com/xraph/grove/hook"
)

pgdb := pgdriver.New()
pgdb.Open(ctx, "postgres://user:pass@localhost:5432/mydb")

plugin := crdt.New(crdt.WithNodeID("node-1"))

syncer := crdt.NewSyncer(plugin,
	crdt.WithTransport(crdt.HTTPTransport("https://peer.example.com/sync")),
	crdt.WithSyncTables("documents"),
)

app := forge.New()
app.Use(groveext.New(
	groveext.WithDriver(pgdb),
	groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
	groveext.WithSyncer(syncer),
	groveext.WithSyncController(
		crdt.WithStreamPollInterval(2*time.Second),
		crdt.WithStreamKeepAlive(30*time.Second),
		crdt.WithPresenceEnabled(true),       // Enable presence tracking
		crdt.WithPresenceTTL(30*time.Second), // Entry expiry (default: 30s)
	),
))
app.Start(ctx)
This automatically registers sync and presence endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /sync/pull | Remote nodes pull changes |
| POST | /sync/push | Remote nodes push changes |
| GET | /sync/stream | SSE stream of real-time changes and presence events |
| POST | /sync/presence | Update or leave a presence topic |
| GET | /sync/presence | Get current presence snapshot for a topic |
Use groveext.WithBasePath("/api/sync") to change the route prefix.
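If you want to see what a client of /sync/stream has to handle, the sketch below parses a Server-Sent Events body by hand. It assumes the endpoint emits standard text/event-stream framing (`data:` lines, a blank line between events, `:` comment lines as keep-alives). `ParseSSEData` is a hypothetical helper, and the payloads shown are placeholders rather than Grove's actual wire format.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// ParseSSEData extracts the data payload of each complete event from a
// text/event-stream body. Consecutive data: lines within one event are
// joined with "\n"; a blank line ends the event. An unterminated trailing
// event is dropped, as the SSE format requires.
func ParseSSEData(stream string) []string {
	var events []string
	var cur []string
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// A blank line dispatches the event collected so far.
			if len(cur) > 0 {
				events = append(events, strings.Join(cur, "\n"))
				cur = nil
			}
		case strings.HasPrefix(line, "data:"):
			payload := strings.TrimPrefix(line, "data:")
			// The format allows one optional space after the colon.
			cur = append(cur, strings.TrimPrefix(payload, " "))
		}
		// Comment lines (":...") and other fields such as "event:" or
		// "id:" are ignored in this sketch.
	}
	return events
}

func main() {
	body := ": keep-alive\n\ndata: first change\n\ndata: second\ndata: change\n\n"
	for i, ev := range ParseSSEData(body) {
		fmt.Printf("event %d: %q\n", i, ev)
	}
}
```

In a real client you would read from the HTTP response body incrementally rather than from a string, and reconnect when the stream drops.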
Inspecting State
For debugging, inspect the full CRDT state of any record:
state, err := plugin.Inspect(ctx, "documents", "doc-1")
if err != nil {
	log.Fatal(err)
}

result := crdt.InspectState(state)
fmt.Println(result)
// Output:
// CRDT State for documents[doc-1]
//   title (lww): "Hello World" (node=node-1, hlc=HLC{ts:1234 c:0 node:node-1})
//   view_count (counter): 42 [node-1: +30 -0] [node-2: +12 -0]
//   tags (set): ["go", "crdt"]
Next Steps
- CRDT Types for detailed merge semantics of each type
- Sync Protocol for transport, multi-peer, and hook configuration
- SSE Streaming for real-time change propagation
- Presence & Awareness for ephemeral real-time state (typing indicators, cursors, online users)
- TypeScript Client for browser and React integration