Grove
CRDT Integration

CRDT Types

Available CRDT data types in Grove KV — Counter, Register, Set, and Map — with API reference, merge semantics, and usage examples.

The kvcrdt package provides four CRDT types that persist their state in a KV store. Each type wraps a core grove/crdt state struct and uses the adapter pattern described in the overview. All types share a common set of options and follow the same read-modify-write cycle.

Creating Types

All CRDT types are created with a KV store reference, a key, and optional configuration:

import (
    "github.com/xraph/grove/kv"
    kvcrdt "github.com/xraph/grove/kv/crdt"
)

store := kv.NewMemoryStore()

counter  := kvcrdt.NewCounter(store, "page:views", kvcrdt.WithNodeID("node-1"))
register := kvcrdt.NewRegister[string](store, "user:name", kvcrdt.WithNodeID("node-1"))
set      := kvcrdt.NewSet[string](store, "tags:article:1", kvcrdt.WithNodeID("node-1"))
m        := kvcrdt.NewMap(store, "config:app", kvcrdt.WithNodeID("node-1"))

Shared Options

OptionSignatureDescription
WithNodeIDWithNodeID(id string) OptionSets the node identifier. Each node must have a unique ID. Defaults to "default".
WithClockWithClock(clock crdt.Clock) OptionSets a custom HLC clock. Defaults to crdt.NewHybridClock(nodeID).
WithCodecWithCodec(cc codec.Codec) OptionSets the codec for serializing CRDT state. Defaults to JSON.

Counter

A distributed positive-negative counter backed by crdt.PNCounterState. Each node tracks its own increments and decrements independently. The counter value is the sum of all increments minus the sum of all decrements across all nodes.

Merge Algorithm

MergeCounter(local, remote):
  for each nodeID in union(local.nodes, remote.nodes):
    merged.Increments[nodeID] = max(local.Increments[nodeID], remote.Increments[nodeID])
    merged.Decrements[nodeID] = max(local.Decrements[nodeID], remote.Decrements[nodeID])
  return merged

Value(state):
  return sum(Increments) - sum(Decrements)

Taking the per-node max ensures that merge is idempotent and order-independent. Two nodes can independently increment and the merged counter will reflect both contributions.

API

MethodSignatureDescription
NewCounterNewCounter(store *kv.Store, key string, opts ...Option) *CounterCreates a new CRDT counter
Increment(c *Counter) Increment(ctx context.Context, delta int64) errorAdds delta to this node's increment total
Decrement(c *Counter) Decrement(ctx context.Context, delta int64) errorAdds delta to this node's decrement total
Value(c *Counter) Value(ctx context.Context) (int64, error)Returns the current counter value (all increments minus all decrements)
Merge(c *Counter) Merge(ctx context.Context, remote *crdt.PNCounterState) errorMerges a remote counter state into the local state
State(c *Counter) State(ctx context.Context) (*crdt.PNCounterState, error)Returns the raw state for sync or inspection

Example: Distributed Page View Counter

ctx := context.Background()

// Node US-East tracks page views.
counterEast := kvcrdt.NewCounter(storeEast, "crdt:page:home:views",
    kvcrdt.WithNodeID("us-east-1"),
)
_ = counterEast.Increment(ctx, 100)

// Node EU-West tracks page views independently.
counterWest := kvcrdt.NewCounter(storeWest, "crdt:page:home:views",
    kvcrdt.WithNodeID("eu-west-1"),
)
_ = counterWest.Increment(ctx, 250)

// After sync, both stores contain the merged state.
// Total value: 100 + 250 = 350
syncer := kvcrdt.NewSyncer(storeEast, storeWest)
_, _ = syncer.Sync(ctx)

val, _ := counterEast.Value(ctx)
fmt.Println("Total page views:", val) // Total page views: 350

Concurrent Update Example

Node A: Increment(+5)  -> Increments{"A": 5}
Node B: Increment(+3)  -> Increments{"B": 3}
Node A: Decrement(2)   -> Decrements{"A": 2}

After sync both nodes:
  Increments{"A": 5, "B": 3}, Decrements{"A": 2}
  Value = (5 + 3) - 2 = 6

Register[T]

A distributed Last-Writer-Wins register backed by crdt.LWWRegister. The value with the highest HLC timestamp wins. The type parameter T determines the value type (e.g., string, int, a struct).

Merge Algorithm

MergeLWW(local, remote):
  if remote.HLC > local.HLC:
    return remote
  if local.HLC > remote.HLC:
    return local
  // Tie: compare NodeIDs lexicographically
  if remote.NodeID > local.NodeID:
    return remote
  return local

HLC comparison uses timestamp first, then logical counter, then node ID. This guarantees a total ordering with no ambiguity, even when wall-clock times are identical.

API

MethodSignatureDescription
NewRegisterNewRegister[T any](store *kv.Store, key string, opts ...Option) *Register[T]Creates a new LWW register
Set(r *Register[T]) Set(ctx context.Context, value T) errorWrites a new value timestamped with the local HLC
Get(r *Register[T]) Get(ctx context.Context) (T, error)Reads the current value. Returns kv.ErrNotFound if unset.
Merge(r *Register[T]) Merge(ctx context.Context, remote *crdt.LWWRegister) errorMerges a remote register, keeping the winner per LWW semantics
State(r *Register[T]) State(ctx context.Context) (*crdt.LWWRegister, error)Returns the raw state for sync or inspection

Example: User Profile Field

ctx := context.Background()

// Node A sets the user's display name.
regA := kvcrdt.NewRegister[string](storeA, "crdt:user:42:name",
    kvcrdt.WithNodeID("node-a"),
)
_ = regA.Set(ctx, "Alice")

// Node B concurrently sets a different name.
regB := kvcrdt.NewRegister[string](storeB, "crdt:user:42:name",
    kvcrdt.WithNodeID("node-b"),
)
_ = regB.Set(ctx, "Alice Smith")

// After sync, the write with the higher HLC wins on both nodes.
syncer := kvcrdt.NewSyncer(storeA, storeB)
_, _ = syncer.Sync(ctx)

name, _ := regA.Get(ctx)
fmt.Println("Name:", name) // Name: Alice Smith (higher HLC wins)

Best For

  • Scalar values: strings, booleans, numbers
  • Fields where "last write" is the desired conflict resolution
  • Configuration values, status fields, user profile attributes

Set[T]

A distributed Observed-Remove Set (OR-Set) backed by crdt.ORSetState. When one node adds an element and another concurrently removes it, the add wins (add-wins semantics). Removals only affect elements that were observed at the time of removal.

Merge Algorithm

MergeSet(local, remote):
  for each element in union(local.Entries, remote.Entries):
    merged.Entries[element] = union(local.tags, remote.tags)
  merged.Removed = union(local.Removed, remote.Removed)
  return merged

Elements(state):
  result = []
  for each element in state.Entries:
    if any tag for this element is NOT in state.Removed:
      result.add(element)
  return result

Each Add operation creates a unique tag (nodeID:HLC). Remove marks all current tags for that element as removed. If a concurrent Add creates a new tag after the remove was issued, the element survives because the new tag is not in the removed set.

API

MethodSignatureDescription
NewSetNewSet[T any](store *kv.Store, key string, opts ...Option) *Set[T]Creates a new OR-Set
Add(s *Set[T]) Add(ctx context.Context, element T) errorInserts an element into the set
Remove(s *Set[T]) Remove(ctx context.Context, element T) errorRemoves an element by marking all its current tags as removed
Members(s *Set[T]) Members(ctx context.Context) ([]T, error)Returns all elements currently in the set
Contains(s *Set[T]) Contains(ctx context.Context, element T) (bool, error)Returns true if the element is in the set
Merge(s *Set[T]) Merge(ctx context.Context, remote *crdt.ORSetState) errorMerges a remote OR-Set state into the local state
State(s *Set[T]) State(ctx context.Context) (*crdt.ORSetState, error)Returns the raw state for sync or inspection

Example: Distributed Tag Set

ctx := context.Background()

// Node A adds tags to an article.
tagsA := kvcrdt.NewSet[string](storeA, "crdt:tags:article:1",
    kvcrdt.WithNodeID("node-a"),
)
_ = tagsA.Add(ctx, "go")
_ = tagsA.Add(ctx, "crdt")

// Node B concurrently adds and removes tags.
tagsB := kvcrdt.NewSet[string](storeB, "crdt:tags:article:1",
    kvcrdt.WithNodeID("node-b"),
)
_ = tagsB.Add(ctx, "distributed")
_ = tagsB.Remove(ctx, "crdt") // Only removes if "crdt" was observed by Node B

// After sync, both nodes converge.
syncer := kvcrdt.NewSyncer(storeA, storeB)
_, _ = syncer.Sync(ctx)

members, _ := tagsA.Members(ctx)
fmt.Println("Tags:", members) // Tags: [go crdt distributed]
// "crdt" survives because Node A's add created a tag that Node B never observed.

Concurrent Update Example

Initial state: Set = {"go", "api"}

Node A: Remove("api")
  -> Marks all existing tags for "api" as removed

Node B: Add("api")
  -> Creates a new tag for "api" with a fresh HLC

After sync:
  "api" has Node B's new tag, which is NOT in the removed set
  -> Set = {"go", "api"}  (add wins)

Best For

  • Tags, labels, categories
  • Member lists, collaborator lists, access control groups
  • Any collection where concurrent add/remove must converge safely

Map

A distributed CRDT Map where each field is an independent LWW register. It stores a crdt.State with per-field FieldState entries. Each field is merged independently using LWW semantics, so concurrent updates to different fields never conflict.

Merge Algorithm

MergeMap(local, remote):
  for each field in remote.Fields:
    if field not in local.Fields OR remote.HLC > local.HLC:
      local.Fields[field] = remote.Fields[field]
  return local

Each field carries its own HLC timestamp and node ID. When merging, each field is compared independently. This means two nodes can update different fields concurrently with no conflict, and updates to the same field resolve via LWW.

API

MethodSignatureDescription
NewMapNewMap(store *kv.Store, key string, opts ...Option) *MapCreates a new CRDT Map
Set(m *Map) Set(ctx context.Context, field string, value any) errorSets a field to a value using LWW semantics
Get(m *Map) Get(ctx context.Context, field string, dest any) errorReads a field value and decodes it into dest. Returns kv.ErrNotFound if the field does not exist.
Delete(m *Map) Delete(ctx context.Context, field string) errorRemoves a field from the map
Keys(m *Map) Keys(ctx context.Context) ([]string, error)Returns all field names in the map
All(m *Map) All(ctx context.Context) (map[string]json.RawMessage, error)Returns all fields as a map of field name to raw JSON values
Merge(m *Map) Merge(ctx context.Context, remote *crdt.State) errorMerges a remote CRDT State into the local state using per-field LWW merge
State(m *Map) State(ctx context.Context) (*crdt.State, error)Returns the raw state for sync or inspection

Example: Distributed Configuration Map

ctx := context.Background()

// Node A sets configuration values.
cfgA := kvcrdt.NewMap(storeA, "crdt:config:app", kvcrdt.WithNodeID("node-a"))
_ = cfgA.Set(ctx, "max_connections", 100)
_ = cfgA.Set(ctx, "log_level", "info")

// Node B concurrently updates different fields.
cfgB := kvcrdt.NewMap(storeB, "crdt:config:app", kvcrdt.WithNodeID("node-b"))
_ = cfgB.Set(ctx, "log_level", "debug")   // Conflicts with Node A on this field
_ = cfgB.Set(ctx, "feature_x", true)      // No conflict, different field

// After sync, per-field LWW resolves conflicts.
syncer := kvcrdt.NewSyncer(storeA, storeB)
_, _ = syncer.Sync(ctx)

var logLevel string
_ = cfgA.Get(ctx, "log_level", &logLevel)
fmt.Println("Log level:", logLevel) // Whichever node had the higher HLC wins

keys, _ := cfgA.Keys(ctx)
fmt.Println("Keys:", keys) // Keys: [max_connections log_level feature_x]

Best For

  • Application configuration that multiple services update
  • Structured documents with independently-editable fields
  • Feature flags with per-field granularity
  • Any key-value structure where different fields should merge independently

Choosing the Right Type

ScenarioRecommended TypeWhy
Page view counterCounterMultiple nodes increment independently
User display nameRegister[string]Last edit should win
Article tagsSet[string]Concurrent add/remove must converge
App configurationMapPer-field independent LWW merge
Inventory quantityCounterDistributed increment/decrement across warehouses
Session dataMapMultiple fields updated by different services
ACL member listSet[string]Add-wins prevents accidental permission loss
Feature flag valueRegister[bool]Simple scalar with LWW resolution

On this page