Grove

Getting Started

Set up CRDT-enabled models, create the plugin, ensure shadow tables, and perform your first sync.

This guide walks through the complete setup for Grove CRDTs: defining models, creating the plugin, setting up shadow tables, and performing your first sync between nodes.

Installation

go get github.com/xraph/grove/crdt

Define a CRDT-Enabled Model

Tag fields with crdt:lww, crdt:counter, or crdt:set to opt into CRDT behavior. Fields without CRDT tags are unaffected.

package models

import "github.com/xraph/grove"

type Document struct {
    grove.BaseModel `grove:"table:documents,alias:d"`
    ID        string   `grove:"id,pk"`
    Title     string   `grove:"title,crdt:lww"`       // Last-Writer-Wins
    ViewCount int64    `grove:"view_count,crdt:counter"` // PN-Counter
    Tags      []string `grove:"tags,type:jsonb,crdt:set"` // OR-Set
    Author    string   `grove:"author"`                // Normal field (no CRDT)
}

Each CRDT type has different merge behavior:

  • crdt:lww — The write with the highest HLC timestamp wins. Best for scalar values like strings, booleans, or JSON objects.
  • crdt:counter — Tracks per-node increments and decrements separately. Best for counters that multiple nodes can update concurrently.
  • crdt:set — Add-wins observed-remove set. Best for collections like tags or member lists.

Create the CRDT Plugin

The plugin intercepts Grove mutations and tracks changes in shadow tables.

import (
    "github.com/xraph/grove/crdt"
    "github.com/xraph/grove/hook"
)

// Create the plugin with a unique node identifier.
plugin := crdt.New(
    crdt.WithNodeID("node-1"),           // Required: unique per node
    crdt.WithTables("documents"),         // Optional: restrict to specific tables
    crdt.WithTombstoneTTL(7 * 24 * time.Hour), // Optional: tombstone expiry (default: 7d)
)

// Register with the Grove DB hook engine.
db.Hooks().AddHook(plugin, hook.Scope{
    Tables: []string{"documents"},
})

The NodeID must be unique across all nodes in the sync cluster. Use descriptive names like "cloud-us-east", "edge-store-42", or "browser-abc123".

Shadow Tables

CRDT metadata is stored in shadow tables named _<table>_crdt. These sit alongside your primary table without modifying its schema.

Option A: Ensure at Runtime

// Create shadow table for the "documents" table.
err := plugin.EnsureShadowTable(ctx, "documents")
if err != nil {
    log.Fatal(err)
}

Option B: Use Migrations

Register CRDT migrations with your Grove migration group:

import "github.com/xraph/grove/crdt"

// In your migration setup
db.RegisterMigrations(crdt.Migrations)

The migration creates the shadow table with columns for field name, HLC timestamp, counter, node ID, and CRDT state (JSON), along with a composite sync index for efficient change queries.

Perform Your First Sync

Single Sync Round

import "github.com/xraph/grove/crdt"

// Create a transport pointing at the remote node.
transport := crdt.HTTPTransport("https://cloud.example.com/sync")

// Create a syncer.
syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(transport),
    crdt.WithSyncTables("documents"),
)

// Run one sync round: pull remote changes, merge locally, push local changes.
report, err := syncer.Sync(ctx)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Synced: %s\n", report) // pulled=5 pushed=3 merged=5 conflicts=0

Background Sync Loop

For continuous sync, run the syncer in a background goroutine:

// Sync every 30 seconds (configurable).
syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(transport),
    crdt.WithSyncTables("documents"),
    crdt.WithSyncInterval(30 * time.Second),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run blocks until ctx is cancelled.
go syncer.Run(ctx)

Real-Time Streaming

For lower latency, add SSE streaming alongside periodic sync:

streamTransport := crdt.NewStreamingTransport(
    "https://cloud.example.com/sync",
    crdt.WithStreamTables("documents"),
    crdt.WithStreamReconnect(5 * time.Second),
)

syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(streamTransport),
    crdt.WithSyncTables("documents"),
    crdt.WithSyncInterval(30 * time.Second), // Fallback poll
)

go syncer.Run(ctx)        // Periodic pull/push
go syncer.StreamSync(ctx) // SSE real-time changes

Forge Integration

When running inside a Forge app, use the Grove extension for automatic route registration, SSE streaming, and lifecycle management:

import (
    "github.com/xraph/forge"
    "github.com/xraph/grove/drivers/pgdriver"
    groveext "github.com/xraph/grove/extension"
    "github.com/xraph/grove/crdt"
    "github.com/xraph/grove/hook"
)

pgdb := pgdriver.New()
pgdb.Open(ctx, "postgres://user:pass@localhost:5432/mydb")

plugin := crdt.New(crdt.WithNodeID("node-1"))
syncer := crdt.NewSyncer(plugin,
    crdt.WithTransport(crdt.HTTPTransport("https://peer.example.com/sync")),
    crdt.WithSyncTables("documents"),
)

app := forge.New()
app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
    groveext.WithSyncer(syncer),
    groveext.WithSyncController(
        crdt.WithStreamPollInterval(2 * time.Second),
        crdt.WithStreamKeepAlive(30 * time.Second),
        crdt.WithPresenceEnabled(true),              // Enable presence tracking
        crdt.WithPresenceTTL(30 * time.Second),       // Entry expiry (default: 30s)
    ),
))
app.Start(ctx)

This automatically registers sync and presence endpoints:

MethodPathDescription
POST/sync/pullRemote nodes pull changes
POST/sync/pushRemote nodes push changes
GET/sync/streamSSE stream of real-time changes and presence events
POST/sync/presenceUpdate or leave a presence topic
GET/sync/presenceGet current presence snapshot for a topic

Use groveext.WithBasePath("/api/sync") to change the route prefix.

Inspecting State

For debugging, inspect the full CRDT state of any record:

state, err := plugin.Inspect(ctx, "documents", "doc-1")
if err != nil {
    log.Fatal(err)
}

result := crdt.InspectState(state)
fmt.Println(result)
// Output:
// CRDT State for documents[doc-1]
//   title (lww): "Hello World" (node=node-1, hlc=HLC{ts:1234 c:0 node:node-1})
//   view_count (counter): 42 [node-1: +30 -0] [node-2: +12 -0]
//   tags (set): ["go", "crdt"]

Next Steps

On this page