Sync Engine

Cross-store CRDT synchronization with the Grove KV Syncer — bidirectional merge, background sync loops, and key pattern filtering.

The kvcrdt.Syncer provides bidirectional CRDT state synchronization between two KV stores. It scans for keys matching a configurable pattern, loads the serialized CRDT state from both stores, merges the two states using per-field last-writer-wins (LWW) semantics, and writes the merged result back to both stores.

Creating a Syncer

A Syncer requires two KV stores (primary and replica) and accepts optional configuration:

import (
    "time"

    "github.com/xraph/grove/kv"
    kvcrdt "github.com/xraph/grove/kv/crdt"
)

primaryStore := kv.NewMemoryStore()
replicaStore := kv.NewMemoryStore()

syncer := kvcrdt.NewSyncer(primaryStore, replicaStore,
    kvcrdt.WithSyncInterval(10 * time.Second),
    kvcrdt.WithKeyPattern("crdt:*"),
)

Syncer Options

| Option | Signature | Default | Description |
| --- | --- | --- | --- |
| WithSyncInterval | WithSyncInterval(d time.Duration) SyncerOption | 5s | Interval between sync rounds when running in background mode |
| WithKeyPattern | WithKeyPattern(pattern string) SyncerOption | "crdt:*" | Key pattern used to scan for CRDT keys during sync |

The key pattern determines which keys the Syncer examines. Only keys matching the pattern are scanned and merged. Use a prefix convention (e.g., crdt:*) to separate CRDT-managed keys from regular KV data.
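To make the filtering concrete, here is a minimal sketch of glob-style key matching. It is not the Syncer's actual matcher, which this page does not specify. The helper name matchesPattern is hypothetical, and the sketch assumes the pattern behaves like a shell glob as implemented by Go's path.Match, where "*" matches any run of characters except "/".

```go
package main

import (
	"fmt"
	"path"
)

// matchesPattern reports whether key matches a glob-style pattern such as
// "crdt:*". Hypothetical helper: the real Syncer may match keys differently.
// With path.Match, "*" does not cross "/" separators, so this sketch only
// suits colon-delimited keys like the ones used on this page.
func matchesPattern(pattern, key string) bool {
	ok, err := path.Match(pattern, key)
	return err == nil && ok
}

func main() {
	keys := []string{
		"crdt:config:app",
		"crdt:metrics:page:views",
		"session:abc",
		"cache:user:42",
	}
	for _, k := range keys {
		fmt.Printf("%-26s matches crdt:* = %v\n", k, matchesPattern("crdt:*", k))
	}
}
```

Under this assumption the two crdt: keys match and the session and cache keys are skipped, which is the separation the prefix convention is meant to provide.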

Single Sync

Call Sync to perform a single round of bidirectional merge:

report, err := syncer.Sync(ctx)
if err != nil {
    log.Fatalf("sync failed: %v", err)
}

fmt.Printf("Merged: %d, Pushed: %d, Pulled: %d\n",
    report.Merged, report.Pushed, report.Pulled)

Sync returns a crdt.SyncReport with counters for the operations performed:

| Field | Description |
| --- | --- |
| Merged | Keys where both stores had state and a merge was performed |
| Pushed | Keys that existed only in the primary and were copied to the replica |
| Pulled | Keys that existed only in the replica and were copied to the primary |

A single Sync call is useful for:

  • On-demand sync triggered by an API call or event
  • One-time migration of CRDT state between stores
  • Testing and debugging sync behavior

Background Sync Loop

Call Start to begin a periodic sync goroutine that runs at the configured interval:

syncer := kvcrdt.NewSyncer(primaryStore, replicaStore,
    kvcrdt.WithSyncInterval(10 * time.Second),
    kvcrdt.WithKeyPattern("crdt:*"),
)

// Start the background sync loop.
syncer.Start(ctx)

// ... application runs ...

// Stop the sync loop gracefully.
syncer.Stop()

Start

Start(ctx context.Context) launches a goroutine that calls Sync at every tick of the configured interval. The goroutine stops when the context is cancelled or Stop is called.

Stop

Stop() cancels the background context and waits for the sync goroutine to finish. It is safe to call Stop multiple times. Always call Stop before shutting down to ensure the final sync round completes cleanly.

// Typical lifecycle in a server.
syncer.Start(ctx)
defer syncer.Stop()

// Block until shutdown signal.
<-shutdownCh
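The Start and Stop behavior described above can be sketched with a ticker, a cancellable context, and a done channel. This is an illustration of the pattern only, not the library's implementation: the loop type, its fields, and the demo helper are all hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// loop is a hypothetical stand-in for the Syncer's background machinery.
type loop struct {
	interval time.Duration
	run      func(context.Context) // one sync round
	cancel   context.CancelFunc
	done     chan struct{}
}

// Start launches a goroutine that calls run on every tick until the
// context is cancelled or Stop is called.
func (l *loop) Start(ctx context.Context) {
	ctx, l.cancel = context.WithCancel(ctx)
	l.done = make(chan struct{})
	go func() {
		defer close(l.done)
		ticker := time.NewTicker(l.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				l.run(ctx) // an in-flight round finishes before the next select
			}
		}
	}()
}

// Stop cancels the loop and waits for the goroutine to exit. Repeated calls
// are harmless: cancel is idempotent and done is already closed.
func (l *loop) Stop() {
	if l.cancel == nil {
		return // never started
	}
	l.cancel()
	<-l.done
}

// demo runs the loop briefly and returns the round count observed right
// after Stop and again a little later.
func demo() (atStop, later int64) {
	var rounds atomic.Int64
	l := &loop{interval: 5 * time.Millisecond, run: func(context.Context) { rounds.Add(1) }}
	l.Start(context.Background())
	time.Sleep(60 * time.Millisecond)
	l.Stop()
	l.Stop() // second call is a no-op
	atStop = rounds.Load()
	time.Sleep(20 * time.Millisecond)
	return atStop, rounds.Load()
}

func main() {
	a, b := demo()
	fmt.Println("rounds at Stop:", a, "rounds later:", b)
}
```

Because Stop blocks on the done channel, a round that is already running finishes before Stop returns, which is why calling Stop before shutdown gives the final round a chance to complete.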

How Sync Works

Each Sync call executes the following steps:

1. Scan primary store
   |  Find all keys matching the key pattern (e.g., "crdt:*")
   v
2. For each key in primary:
   |  a. Load raw bytes from primary store
   |  b. Load raw bytes from replica store
   |  c. If both exist: unmarshal as crdt.State, merge per-field LWW, write merged state to both stores
   |  d. If only primary has it: copy raw bytes to replica (push)
   v
3. Scan replica store
   |  Find keys matching the pattern that were NOT in the primary scan
   v
4. For each new key in replica:
   |  Copy raw bytes to primary (pull)
   v
5. Return SyncReport with counts
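The five steps above can be sketched over two in-memory maps. Everything here is an assumption made for illustration: store, report, syncOnce, and maxBytes are hypothetical names, a plain prefix check stands in for the key pattern, and the merge function is a toy that keeps the lexicographically larger value rather than a real CRDT merge.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// store is a hypothetical in-memory stand-in for a KV store.
type store map[string][]byte

// report mirrors the three counters described for SyncReport.
type report struct{ Merged, Pushed, Pulled int }

// syncOnce follows the steps above. prefix plays the role of the key
// pattern and merge combines two serialized states into one.
func syncOnce(primary, replica store, prefix string, merge func(a, b []byte) []byte) report {
	var r report
	seen := make(map[string]bool)

	// Steps 1-2: walk matching keys in the primary.
	for k, pv := range primary {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		seen[k] = true
		if rv, ok := replica[k]; ok {
			m := merge(pv, rv) // both sides have state: merge and write back
			primary[k], replica[k] = m, m
			r.Merged++
		} else {
			replica[k] = pv // primary only: push
			r.Pushed++
		}
	}

	// Steps 3-4: pull matching keys that only the replica has.
	for k, rv := range replica {
		if strings.HasPrefix(k, prefix) && !seen[k] {
			primary[k] = rv
			r.Pulled++
		}
	}
	return r // step 5
}

// maxBytes is a toy merge: keep the lexicographically larger value.
func maxBytes(a, b []byte) []byte {
	if bytes.Compare(a, b) >= 0 {
		return a
	}
	return b
}

func main() {
	primary := store{"crdt:a": []byte("1"), "crdt:b": []byte("2"), "plain": []byte("x")}
	replica := store{"crdt:b": []byte("3"), "crdt:c": []byte("4")}
	r := syncOnce(primary, replica, "crdt:", maxBytes)
	// prints: merged=1 pushed=1 pulled=1
	fmt.Printf("merged=%d pushed=%d pulled=%d\n", r.Merged, r.Pushed, r.Pulled)
}
```

After the call both maps hold the same three crdt: keys, and the key named plain stays only in the primary because it does not match the prefix.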

Merge Details

When both stores contain state for the same key, the Syncer attempts to unmarshal the raw bytes as a crdt.State (the map-style CRDT with per-field FieldState entries). For each field, the entry with the higher HLC timestamp wins:

merged = copy of local
for each field in remote.Fields:
  if field not in local.Fields OR remote.Fields[field].HLC > local.Fields[field].HLC:
    merged.Fields[field] = remote.Fields[field]

The merged state is then written back to both stores, ensuring they converge.
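As a rough Go sketch of the per-field rule above: the types below (hlc, fieldState, state) are simplified, hypothetical stand-ins and do not reflect the real layout of crdt.State or its HLC type. The sketch assumes a hybrid logical clock compares by wall time, then logical counter, then node ID to break ties.

```go
package main

import "fmt"

// hlc is a simplified hybrid logical clock (hypothetical layout).
type hlc struct {
	Wall    int64  // physical time component
	Logical uint32 // counter for events within the same wall time
	Node    string // tie-breaker so two clocks never compare equal
}

// after reports whether a is strictly newer than b.
func (a hlc) after(b hlc) bool {
	if a.Wall != b.Wall {
		return a.Wall > b.Wall
	}
	if a.Logical != b.Logical {
		return a.Logical > b.Logical
	}
	return a.Node > b.Node
}

// fieldState pairs a value with the clock of its last write.
type fieldState struct {
	Value string
	HLC   hlc
}

// state is a map-style CRDT with one fieldState per field.
type state struct{ Fields map[string]fieldState }

// mergeState starts from a copy of local and takes every remote field that
// is missing locally or carries a newer clock.
func mergeState(local, remote state) state {
	merged := state{Fields: make(map[string]fieldState, len(local.Fields))}
	for name, f := range local.Fields {
		merged.Fields[name] = f
	}
	for name, rf := range remote.Fields {
		lf, ok := local.Fields[name]
		if !ok || rf.HLC.after(lf.HLC) {
			merged.Fields[name] = rf
		}
	}
	return merged
}

func main() {
	local := state{Fields: map[string]fieldState{
		"name":  {Value: "Ada", HLC: hlc{Wall: 10, Node: "a"}},
		"email": {Value: "old", HLC: hlc{Wall: 5, Node: "a"}},
	}}
	remote := state{Fields: map[string]fieldState{
		"email": {Value: "new", HLC: hlc{Wall: 7, Node: "b"}},
		"plan":  {Value: "pro", HLC: hlc{Wall: 3, Node: "b"}},
	}}
	m := mergeState(local, remote)
	// prints: Ada new pro
	fmt.Println(m.Fields["name"].Value, m.Fields["email"].Value, m.Fields["plan"].Value)
}
```

The node ID tie-break matters in this sketch: with a strict comparison and no tie-break, two writes carrying identical timestamps could resolve differently depending on which side is treated as local.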

If the raw bytes cannot be unmarshaled as a crdt.State (e.g., the key holds a Counter or Set serialized directly), the Syncer falls back to a simple copy from primary to replica when the replica has no value for that key.

Use Cases

Multi-Region Replication

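Deploy a KV store in each region. Application nodes write CRDT state to their local store, and a Syncer running between pairs of regional stores merges state periodically. The example below pairs US-East with each of the other two regions, so EU-West and AP-Southeast exchange state through US-East: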

// US-East to EU-West sync.
syncerEU := kvcrdt.NewSyncer(storeUSEast, storeEUWest,
    kvcrdt.WithSyncInterval(30 * time.Second),
    kvcrdt.WithKeyPattern("crdt:*"),
)
syncerEU.Start(ctx)
defer syncerEU.Stop()

// US-East to AP-Southeast sync.
syncerAP := kvcrdt.NewSyncer(storeUSEast, storeAPSoutheast,
    kvcrdt.WithSyncInterval(30 * time.Second),
    kvcrdt.WithKeyPattern("crdt:*"),
)
syncerAP.Start(ctx)
defer syncerAP.Stop()

Because CRDT merge is commutative, associative, and idempotent, the order in which regions sync does not matter. All regions will converge to the same state.
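The convergence claim can be checked with a small simulation. This is a sketch under simplifying assumptions: region, entry, and syncPair are hypothetical, a single integer stands in for the HLC timestamp and is assumed unique per write, and US-East acts as a hub between the other two regions.

```go
package main

import (
	"fmt"
	"reflect"
)

// entry is a value plus a stand-in timestamp (assumed unique per write).
type entry struct {
	Value string
	TS    int64
}

// region is a hypothetical in-memory regional store.
type region map[string]entry

// syncPair merges two regions in place so that both end up holding the
// newer entry for every key.
func syncPair(a, b region) {
	for k, eb := range b {
		if ea, ok := a[k]; !ok || eb.TS > ea.TS {
			a[k] = eb
		}
	}
	for k, ea := range a {
		b[k] = ea
	}
}

// fresh returns three regions with diverged state.
func fresh() (us, eu, ap region) {
	us = region{"crdt:config:app": {"v1", 1}}
	eu = region{"crdt:config:app": {"v2", 2}, "crdt:tags:article:1": {"go", 3}}
	ap = region{"crdt:user:42:name": {"Ada", 4}}
	return us, eu, ap
}

// converged syncs through the US hub in one of two orders and reports the
// hub's final state and whether all three regions ended up identical.
func converged(apFirst bool) (region, bool) {
	us, eu, ap := fresh()
	if apFirst {
		syncPair(us, ap)
		syncPair(us, eu)
		syncPair(us, ap)
	} else {
		syncPair(us, eu)
		syncPair(us, ap)
		syncPair(us, eu)
	}
	return us, reflect.DeepEqual(us, eu) && reflect.DeepEqual(us, ap)
}

func main() {
	a, okA := converged(false)
	b, okB := converged(true)
	// prints: true true true
	fmt.Println(okA, okB, reflect.DeepEqual(a, b))
}
```

In this hub layout the region synced first needs a second pass before it sees the other region's writes. With periodic background sync that second pass happens on the next tick.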

Edge-to-Cloud Sync

An edge device (e.g., a point-of-sale terminal or IoT gateway) uses an embedded KV store and accumulates CRDT operations while operating independently. When the device connects to the cloud, a single Sync call merges all changes:

// On the edge device, triggered when connectivity is available.
syncer := kvcrdt.NewSyncer(edgeStore, cloudStore)
report, err := syncer.Sync(ctx)
if err != nil {
    log.Println("edge sync failed, will retry:", err)
    return
}
log.Printf("Edge sync complete: merged=%d pushed=%d pulled=%d",
    report.Merged, report.Pushed, report.Pulled)
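The snippet above logs "will retry" but leaves the retry policy to the caller. One possible shape is exponential backoff, sketched here. syncWithRetry and demo are hypothetical helpers that are not part of kvcrdt, and syncFn stands in for a closure that wraps syncer.Sync.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// syncWithRetry calls syncFn until it succeeds, the attempts run out, or
// ctx is cancelled. The delay doubles after every failed attempt.
func syncWithRetry(ctx context.Context, attempts int, base time.Duration, syncFn func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = syncFn(ctx); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point waiting after the last attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay *= 2
		}
	}
	return fmt.Errorf("sync failed after %d attempts: %w", attempts, err)
}

// demo simulates a link that fails twice and then recovers.
func demo() (calls int, err error) {
	flaky := func(context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("link down")
		}
		return nil
	}
	err = syncWithRetry(context.Background(), 5, time.Millisecond, flaky)
	return calls, err
}

func main() {
	calls, err := demo()
	// prints: calls=3 err=<nil>
	fmt.Printf("calls=%d err=%v\n", calls, err)
}
```

Since the merge is idempotent, repeating a sync round after a partial failure should not corrupt state, which is what makes a simple retry loop a reasonable fit here.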

Selective Key Sync

Use WithKeyPattern to sync only a subset of keys. This is useful when different Syncers handle different data domains:

// Sync only configuration keys.
configSyncer := kvcrdt.NewSyncer(storeA, storeB,
    kvcrdt.WithKeyPattern("crdt:config:*"),
    kvcrdt.WithSyncInterval(5 * time.Second),
)

// Sync only metrics keys, less frequently.
metricsSyncer := kvcrdt.NewSyncer(storeA, storeB,
    kvcrdt.WithKeyPattern("crdt:metrics:*"),
    kvcrdt.WithSyncInterval(60 * time.Second),
)

configSyncer.Start(ctx)
metricsSyncer.Start(ctx)
defer configSyncer.Stop()
defer metricsSyncer.Stop()

Key Naming Conventions

The default key pattern is crdt:*. While you can use any pattern, a consistent naming convention makes sync configuration simpler and prevents accidental sync of non-CRDT data:

| Pattern | Example Key | Description |
| --- | --- | --- |
| crdt:config:* | crdt:config:app | Application configuration maps |
| crdt:metrics:* | crdt:metrics:page:views | Distributed counters and metrics |
| crdt:user:* | crdt:user:42:name | Per-user CRDT registers |
| crdt:tags:* | crdt:tags:article:1 | Distributed tag sets |

Prefix all CRDT keys with crdt: to ensure the default Syncer pattern discovers them and to keep CRDT state separate from regular KV entries.
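One way to keep the convention consistent is to build keys through a single helper rather than by hand. The crdtKey function below is a hypothetical convenience and not part of Grove.

```go
package main

import (
	"fmt"
	"strings"
)

// crdtPrefix is the namespace that the default "crdt:*" pattern matches.
const crdtPrefix = "crdt"

// crdtKey joins the prefix and the given parts with ":" so every CRDT key
// follows the same convention.
func crdtKey(parts ...string) string {
	return strings.Join(append([]string{crdtPrefix}, parts...), ":")
}

func main() {
	fmt.Println(crdtKey("config", "app"))         // crdt:config:app
	fmt.Println(crdtKey("user", "42", "name"))    // crdt:user:42:name
	fmt.Println(crdtKey("tags", "article", "1"))  // crdt:tags:article:1
}
```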
