CRDT Types
Available CRDT data types in Grove KV — Counter, Register, Set, and Map — with API reference, merge semantics, and usage examples.
The kvcrdt package provides four CRDT types that persist their state in a KV store. Each type wraps a core grove/crdt state struct and uses the adapter pattern described in the overview. All types share a common set of options and follow the same read-modify-write cycle.
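The read-modify-write cycle can be pictured with a small standalone sketch. It does not use the kvcrdt package: `memStore`, `counterState`, and `update` are hypothetical names chosen for illustration, and JSON appears only because it is the documented default codec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memStore is a hypothetical in-memory stand-in for a KV store.
type memStore map[string][]byte

// counterState is an illustrative state struct, not the grove/crdt type.
type counterState struct {
	Increments map[string]int64 `json:"increments"`
}

// update performs one read-modify-write cycle: load and decode the state
// stored under key, apply mutate, then encode the result and write it back.
func update(s memStore, key string, mutate func(*counterState)) error {
	st := counterState{Increments: map[string]int64{}}
	if raw, ok := s[key]; ok {
		if err := json.Unmarshal(raw, &st); err != nil {
			return fmt.Errorf("decode %q: %w", key, err)
		}
	}
	mutate(&st)
	raw, err := json.Marshal(st)
	if err != nil {
		return fmt.Errorf("encode %q: %w", key, err)
	}
	s[key] = raw
	return nil
}

func main() {
	store := memStore{}
	for i := 0; i < 3; i++ {
		err := update(store, "page:views", func(st *counterState) {
			st.Increments["node-1"] += 10
		})
		if err != nil {
			panic(err)
		}
	}
	fmt.Println(string(store["page:views"])) // {"increments":{"node-1":30}}
}
```

The sketch does nothing to protect the cycle from concurrent writers on the same node; a real adapter would presumably need to.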
Creating Types
All CRDT types are created with a KV store reference, a key, and optional configuration:
import (
	"github.com/xraph/grove/kv"
	kvcrdt "github.com/xraph/grove/kv/crdt"
)
store := kv.NewMemoryStore()
counter := kvcrdt.NewCounter(store, "page:views", kvcrdt.WithNodeID("node-1"))
register := kvcrdt.NewRegister[string](store, "user:name", kvcrdt.WithNodeID("node-1"))
set := kvcrdt.NewSet[string](store, "tags:article:1", kvcrdt.WithNodeID("node-1"))
m := kvcrdt.NewMap(store, "config:app", kvcrdt.WithNodeID("node-1"))
Shared Options
| Option | Signature | Description |
|---|---|---|
| WithNodeID | WithNodeID(id string) Option | Sets the node identifier. Each node must have a unique ID. Defaults to "default". |
| WithClock | WithClock(clock crdt.Clock) Option | Sets a custom HLC clock. Defaults to crdt.NewHybridClock(nodeID). |
| WithCodec | WithCodec(cc codec.Codec) Option | Sets the codec for serializing CRDT state. Defaults to JSON. |
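The options in the table follow Go's functional-options shape. The sketch below shows one way defaults and overrides could interact. Everything in it is a stand-in: `config`, `clock`, and `newConfig` are hypothetical, `WithClock` takes a toy `*clock` rather than a `crdt.Clock`, and `WithCodec` takes a name rather than a `codec.Codec`.

```go
package main

import "fmt"

// clock is a hypothetical stand-in for an HLC; it only remembers its node ID.
type clock struct{ nodeID string }

// config collects the settings from the options table. The struct and its
// fields are illustrative, not the package's internals.
type config struct {
	nodeID string
	clock  *clock
	codec  string
}

// Option follows the usual Go functional-options shape.
type Option func(*config)

func WithNodeID(id string) Option  { return func(c *config) { c.nodeID = id } }
func WithClock(cl *clock) Option   { return func(c *config) { c.clock = cl } }
func WithCodec(name string) Option { return func(c *config) { c.codec = name } }

// newConfig applies options over the documented defaults. In this sketch the
// clock default is resolved last so that it picks up the final node ID.
func newConfig(opts ...Option) config {
	c := config{nodeID: "default", codec: "json"}
	for _, opt := range opts {
		opt(&c)
	}
	if c.clock == nil {
		c.clock = &clock{nodeID: c.nodeID}
	}
	return c
}

func main() {
	def := newConfig()
	custom := newConfig(WithNodeID("node-1"))
	fmt.Println(def.nodeID, def.clock.nodeID, def.codec)          // default default json
	fmt.Println(custom.nodeID, custom.clock.nodeID, custom.codec) // node-1 node-1 json
}
```

Leaving every node on the "default" ID would make per-node bookkeeping collide, which is why the table asks for a unique ID per node.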
Counter
A distributed positive-negative counter backed by crdt.PNCounterState. Each node tracks its own increments and decrements independently. The counter value is the sum of all increments minus the sum of all decrements across all nodes.
Merge Algorithm
MergeCounter(local, remote):
    for each nodeID in union(local.nodes, remote.nodes):
        merged.Increments[nodeID] = max(local.Increments[nodeID], remote.Increments[nodeID])
        merged.Decrements[nodeID] = max(local.Decrements[nodeID], remote.Decrements[nodeID])
    return merged
Value(state):
    return sum(state.Increments) - sum(state.Decrements)
Taking the per-node maximum makes merge idempotent, commutative, and associative, so replicas converge no matter how often or in what order states are exchanged. Two nodes can increment independently, and the merged counter reflects both contributions.
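The merge above can be sketched in plain Go to check the idempotence and order-independence claims. `PNCounter`, `mergeMax`, `Merge`, and `Value` are illustrative names defined here, not the `crdt.PNCounterState` type or its functions.

```go
package main

import "fmt"

// PNCounter holds per-node running totals, mirroring the pseudocode's shape.
// It is an illustrative type, not crdt.PNCounterState.
type PNCounter struct {
	Increments map[string]int64
	Decrements map[string]int64
}

// mergeMax returns, for every node ID in a or b, the larger of the two totals.
func mergeMax(a, b map[string]int64) map[string]int64 {
	out := make(map[string]int64, len(a)+len(b))
	for id, v := range a {
		out[id] = v
	}
	for id, v := range b {
		if cur, ok := out[id]; !ok || v > cur {
			out[id] = v
		}
	}
	return out
}

// Merge combines two counter states without modifying either input.
func Merge(local, remote PNCounter) PNCounter {
	return PNCounter{
		Increments: mergeMax(local.Increments, remote.Increments),
		Decrements: mergeMax(local.Decrements, remote.Decrements),
	}
}

// Value is the sum of all increments minus the sum of all decrements.
func Value(s PNCounter) int64 {
	var total int64
	for _, v := range s.Increments {
		total += v
	}
	for _, v := range s.Decrements {
		total -= v
	}
	return total
}

func main() {
	a := PNCounter{
		Increments: map[string]int64{"A": 5},
		Decrements: map[string]int64{"A": 2},
	}
	b := PNCounter{Increments: map[string]int64{"B": 3}}

	ab := Merge(a, b)
	ba := Merge(b, a)
	fmt.Println(Value(ab), Value(ba)) // 6 6 (order-independent)
	fmt.Println(Value(Merge(ab, b)))  // 6 (re-merging changes nothing)
}
```

The max-based merge is only safe because each node's totals never decrease; a node that reset its own total would have the reset overwritten by the larger value on the next merge.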
API
| Method | Signature | Description |
|---|---|---|
| NewCounter | NewCounter(store *kv.Store, key string, opts ...Option) *Counter | Creates a new CRDT counter |
| Increment | (c *Counter) Increment(ctx context.Context, delta int64) error | Adds delta to this node's increment total |
| Decrement | (c *Counter) Decrement(ctx context.Context, delta int64) error | Adds delta to this node's decrement total |
| Value | (c *Counter) Value(ctx context.Context) (int64, error) | Returns the current counter value (all increments minus all decrements) |
| Merge | (c *Counter) Merge(ctx context.Context, remote *crdt.PNCounterState) error | Merges a remote counter state into the local state |
| State | (c *Counter) State(ctx context.Context) (*crdt.PNCounterState, error) | Returns the raw state for sync or inspection |
Example: Distributed Page View Counter
ctx := context.Background()
// storeEast and storeWest are the two nodes' KV stores (setup omitted).
// Node US-East tracks page views.
counterEast := kvcrdt.NewCounter(storeEast, "crdt:page:home:views",
	kvcrdt.WithNodeID("us-east-1"),
)
_ = counterEast.Increment(ctx, 100)
// Node EU-West tracks page views independently.
counterWest := kvcrdt.NewCounter(storeWest, "crdt:page:home:views",
	kvcrdt.WithNodeID("eu-west-1"),
)
_ = counterWest.Increment(ctx, 250)
// Sync the two stores; afterwards both contain the merged state.
syncer := kvcrdt.NewSyncer(storeEast, storeWest)
_, _ = syncer.Sync(ctx)
// Total value: 100 + 250 = 350
val, _ := counterEast.Value(ctx)
fmt.Println("Total page views:", val) // Total page views: 350
Concurrent Update Example
Node A: Increment(5) -> Increments{"A": 5}
Node B: Increment(3) -> Increments{"B": 3}
Node A: Decrement(2) -> Decrements{"A": 2}
After sync, both nodes hold:
    Increments{"A": 5, "B": 3}, Decrements{"A": 2}
    Value = (5 + 3) - 2 = 6
Register[T]
A distributed Last-Writer-Wins register backed by crdt.LWWRegister. The value with the highest HLC timestamp wins. The type parameter T determines the value type (e.g., string, int, a struct).
Merge Algorithm
MergeLWW(local, remote):
    if remote.HLC > local.HLC:
        return remote
    if local.HLC > remote.HLC:
        return local
    // Tie: compare NodeIDs lexicographically
    if remote.NodeID > local.NodeID:
        return remote
    return local
HLC comparison uses the timestamp first, then the logical counter, then the node ID. As long as node IDs are unique, this yields a total order with no ambiguity, even when wall-clock times are identical.
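The three-level comparison can be sketched as a standalone Go type. The `HLC` and `Register` structs below, including their field names, are assumptions made for illustration and are not the definitions used by `crdt.LWWRegister`.

```go
package main

import (
	"fmt"
	"strings"
)

// HLC is an illustrative hybrid logical clock stamp.
type HLC struct {
	Wall    int64  // wall-clock component
	Counter uint32 // logical counter, breaks ties within one wall-clock tick
	NodeID  string // final tie-breaker
}

// Compare orders stamps by wall time, then counter, then node ID.
// It returns -1, 0, or +1.
func (h HLC) Compare(o HLC) int {
	switch {
	case h.Wall < o.Wall:
		return -1
	case h.Wall > o.Wall:
		return 1
	case h.Counter < o.Counter:
		return -1
	case h.Counter > o.Counter:
		return 1
	}
	return strings.Compare(h.NodeID, o.NodeID)
}

// Register pairs a value with the stamp of the write that produced it.
type Register struct {
	Value string
	Stamp HLC
}

// MergeLWW keeps whichever register carries the greater stamp.
func MergeLWW(local, remote Register) Register {
	if remote.Stamp.Compare(local.Stamp) > 0 {
		return remote
	}
	return local
}

func main() {
	// Identical wall time and counter: the node ID decides.
	a := Register{Value: "Alice", Stamp: HLC{Wall: 1000, NodeID: "node-a"}}
	b := Register{Value: "Alice Smith", Stamp: HLC{Wall: 1000, NodeID: "node-b"}}
	fmt.Println(MergeLWW(a, b).Value) // Alice Smith
	fmt.Println(MergeLWW(b, a).Value) // Alice Smith
}
```

Both merge directions pick the same winner, which is what lets two replicas converge without coordinating.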
API
| Method | Signature | Description |
|---|---|---|
| NewRegister | NewRegister[T any](store *kv.Store, key string, opts ...Option) *Register[T] | Creates a new LWW register |
| Set | (r *Register[T]) Set(ctx context.Context, value T) error | Writes a new value timestamped with the local HLC |
| Get | (r *Register[T]) Get(ctx context.Context) (T, error) | Reads the current value. Returns kv.ErrNotFound if unset. |
| Merge | (r *Register[T]) Merge(ctx context.Context, remote *crdt.LWWRegister) error | Merges a remote register, keeping the winner per LWW semantics |
| State | (r *Register[T]) State(ctx context.Context) (*crdt.LWWRegister, error) | Returns the raw state for sync or inspection |
Example: User Profile Field
ctx := context.Background()
// Node A sets the user's display name.
regA := kvcrdt.NewRegister[string](storeA, "crdt:user:42:name",
	kvcrdt.WithNodeID("node-a"),
)
_ = regA.Set(ctx, "Alice")
// Node B concurrently sets a different name.
regB := kvcrdt.NewRegister[string](storeB, "crdt:user:42:name",
	kvcrdt.WithNodeID("node-b"),
)
_ = regB.Set(ctx, "Alice Smith")
// After sync, the write with the higher HLC wins on both nodes.
syncer := kvcrdt.NewSyncer(storeA, storeB)
_, _ = syncer.Sync(ctx)
name, _ := regA.Get(ctx)
fmt.Println("Name:", name) // Name: Alice Smith, assuming Node B's write carries the higher HLC
Best For
- Scalar values: strings, booleans, numbers
- Fields where "last write" is the desired conflict resolution
- Configuration values, status fields, user profile attributes
Set[T]
A distributed Observed-Remove Set (OR-Set) backed by crdt.ORSetState. When one node adds an element and another concurrently removes it, the add wins (add-wins semantics). A removal affects only the adds that the removing node had observed at the time of removal.
Merge Algorithm
MergeSet(local, remote):
    for each element in union(local.Entries, remote.Entries):
        merged.Entries[element] = union(local.Entries[element], remote.Entries[element])
    merged.Removed = union(local.Removed, remote.Removed)
    return merged
Elements(state):
    result = []
    for each element in state.Entries:
        if any tag for this element is NOT in state.Removed:
            result.add(element)
    return result
Each Add operation creates a unique tag (nodeID:HLC). Remove marks all tags currently known for that element as removed. If a concurrent Add creates a tag that the removing node had not observed, the element survives because the new tag is not in the removed set.
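A compact Go sketch of the tag bookkeeping follows. `ORSet` and its methods are illustrative names defined here, not `crdt.ORSetState`, and the tags are short hand-written strings rather than real nodeID:HLC values. `Elements` sorts its result only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// ORSet is an illustrative observed-remove set over string elements.
type ORSet struct {
	Entries map[string]map[string]bool // element -> add tags
	Removed map[string]bool            // tags marked as removed
}

func NewORSet() *ORSet {
	return &ORSet{Entries: map[string]map[string]bool{}, Removed: map[string]bool{}}
}

// Add records a new unique tag for elem.
func (s *ORSet) Add(elem, tag string) {
	if s.Entries[elem] == nil {
		s.Entries[elem] = map[string]bool{}
	}
	s.Entries[elem][tag] = true
}

// Remove marks every tag this replica currently knows for elem as removed.
func (s *ORSet) Remove(elem string) {
	for tag := range s.Entries[elem] {
		s.Removed[tag] = true
	}
}

// Merge unions the tags and the removed set of both replicas.
func Merge(a, b *ORSet) *ORSet {
	out := NewORSet()
	for _, src := range []*ORSet{a, b} {
		for elem, tags := range src.Entries {
			for tag := range tags {
				out.Add(elem, tag)
			}
		}
		for tag := range src.Removed {
			out.Removed[tag] = true
		}
	}
	return out
}

// Elements returns every element with at least one tag that is not removed.
func (s *ORSet) Elements() []string {
	var out []string
	for elem, tags := range s.Entries {
		for tag := range tags {
			if !s.Removed[tag] {
				out = append(out, elem)
				break
			}
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	base := NewORSet()
	base.Add("go", "n0:1")
	base.Add("api", "n0:2")

	a := Merge(base, NewORSet()) // replica A's copy of the shared state
	b := Merge(base, NewORSet()) // replica B's copy
	a.Remove("api")              // A removes the tag it has observed (n0:2)
	b.Add("api", "b:3")          // B concurrently adds with a fresh tag

	fmt.Println(Merge(a, b).Elements()) // [api go]
}
```

After the merge, "api" carries the tags n0:2 (removed) and b:3 (live), so it stays in the set. In this sketch the removed set only ever grows; whether and how removed tags are compacted is not covered here.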
API
| Method | Signature | Description |
|---|---|---|
| NewSet | NewSet[T any](store *kv.Store, key string, opts ...Option) *Set[T] | Creates a new OR-Set |
| Add | (s *Set[T]) Add(ctx context.Context, element T) error | Inserts an element into the set |
| Remove | (s *Set[T]) Remove(ctx context.Context, element T) error | Removes an element by marking all its current tags as removed |
| Members | (s *Set[T]) Members(ctx context.Context) ([]T, error) | Returns all elements currently in the set |
| Contains | (s *Set[T]) Contains(ctx context.Context, element T) (bool, error) | Returns true if the element is in the set |
| Merge | (s *Set[T]) Merge(ctx context.Context, remote *crdt.ORSetState) error | Merges a remote OR-Set state into the local state |
| State | (s *Set[T]) State(ctx context.Context) (*crdt.ORSetState, error) | Returns the raw state for sync or inspection |
Example: Distributed Tag Set
ctx := context.Background()
// Node A adds tags to an article.
tagsA := kvcrdt.NewSet[string](storeA, "crdt:tags:article:1",
	kvcrdt.WithNodeID("node-a"),
)
_ = tagsA.Add(ctx, "go")
_ = tagsA.Add(ctx, "crdt")
// Node B concurrently adds one tag and tries to remove another.
tagsB := kvcrdt.NewSet[string](storeB, "crdt:tags:article:1",
	kvcrdt.WithNodeID("node-b"),
)
_ = tagsB.Add(ctx, "distributed")
_ = tagsB.Remove(ctx, "crdt") // No effect: Node B has not yet observed any "crdt" tag
// After sync, both nodes converge.
syncer := kvcrdt.NewSyncer(storeA, storeB)
_, _ = syncer.Sync(ctx)
members, _ := tagsA.Members(ctx)
fmt.Println("Tags:", members) // Tags: [go crdt distributed] (element order may vary)
// "crdt" survives because Node A's add created a tag that Node B never observed.
Concurrent Update Example
Initial state: Set = {"go", "api"}
Node A: Remove("api")
    -> Marks all existing tags for "api" as removed
Node B: Add("api")
    -> Creates a new tag for "api" with a fresh HLC
After sync:
    "api" has Node B's new tag, which is NOT in the removed set
    -> Set = {"go", "api"} (add wins)
Best For
- Tags, labels, categories
- Member lists, collaborator lists, access control groups
- Any collection where concurrent add/remove must converge safely
Map
A distributed CRDT Map where each field is an independent LWW register. It stores a crdt.State with per-field FieldState entries. Each field is merged independently using LWW semantics, so concurrent updates to different fields never conflict.
Merge Algorithm
MergeMap(local, remote):
    for each field in remote.Fields:
        if field not in local.Fields OR remote.Fields[field].HLC > local.Fields[field].HLC:
            local.Fields[field] = remote.Fields[field]
    return local
Each field carries its own HLC timestamp and node ID, and each field is compared independently during a merge. Two nodes can therefore update different fields concurrently without conflict, while concurrent updates to the same field resolve via LWW.
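The per-field merge can be sketched in standalone Go. `Stamp`, `Field`, and `MergeMap` are illustrative definitions, not `crdt.State` or `FieldState`; the stamp is simplified to a timestamp plus node ID, and values are plain strings.

```go
package main

import (
	"fmt"
	"sort"
)

// Stamp is a simplified HLC: a timestamp with the node ID as tie-breaker.
type Stamp struct {
	Time   int64
	NodeID string
}

// After reports whether s orders strictly after o.
func (s Stamp) After(o Stamp) bool {
	if s.Time != o.Time {
		return s.Time > o.Time
	}
	return s.NodeID > o.NodeID
}

// Field is one map entry together with the stamp of its last write.
type Field struct {
	Value string
	Stamp Stamp
}

// MergeMap merges field by field: a remote field replaces the local one only
// when the local map lacks it or the remote stamp is newer. Inputs are left
// unmodified.
func MergeMap(local, remote map[string]Field) map[string]Field {
	out := make(map[string]Field, len(local)+len(remote))
	for name, f := range local {
		out[name] = f
	}
	for name, rf := range remote {
		if lf, ok := out[name]; !ok || rf.Stamp.After(lf.Stamp) {
			out[name] = rf
		}
	}
	return out
}

func main() {
	a := map[string]Field{
		"max_connections": {Value: "100", Stamp: Stamp{Time: 1, NodeID: "node-a"}},
		"log_level":       {Value: "info", Stamp: Stamp{Time: 2, NodeID: "node-a"}},
	}
	b := map[string]Field{
		"log_level": {Value: "debug", Stamp: Stamp{Time: 3, NodeID: "node-b"}},
		"feature_x": {Value: "true", Stamp: Stamp{Time: 4, NodeID: "node-b"}},
	}

	merged := MergeMap(a, b)
	keys := make([]string, 0, len(merged))
	for k := range merged {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for output
	for _, k := range keys {
		fmt.Printf("%s=%s\n", k, merged[k].Value)
	}
	// feature_x=true
	// log_level=debug
	// max_connections=100
}
```

Only log_level is contested; the other two fields pass through untouched because each exists on just one side.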
API
| Method | Signature | Description |
|---|---|---|
| NewMap | NewMap(store *kv.Store, key string, opts ...Option) *Map | Creates a new CRDT Map |
| Set | (m *Map) Set(ctx context.Context, field string, value any) error | Sets a field to a value using LWW semantics |
| Get | (m *Map) Get(ctx context.Context, field string, dest any) error | Reads a field value and decodes it into dest. Returns kv.ErrNotFound if the field does not exist. |
| Delete | (m *Map) Delete(ctx context.Context, field string) error | Removes a field from the map |
| Keys | (m *Map) Keys(ctx context.Context) ([]string, error) | Returns all field names in the map |
| All | (m *Map) All(ctx context.Context) (map[string]json.RawMessage, error) | Returns all fields as a map of field name to raw JSON values |
| Merge | (m *Map) Merge(ctx context.Context, remote *crdt.State) error | Merges a remote CRDT State into the local state using per-field LWW merge |
| State | (m *Map) State(ctx context.Context) (*crdt.State, error) | Returns the raw state for sync or inspection |
Example: Distributed Configuration Map
ctx := context.Background()
// Node A sets configuration values.
cfgA := kvcrdt.NewMap(storeA, "crdt:config:app", kvcrdt.WithNodeID("node-a"))
_ = cfgA.Set(ctx, "max_connections", 100)
_ = cfgA.Set(ctx, "log_level", "info")
// Node B concurrently updates one overlapping field and one new field.
cfgB := kvcrdt.NewMap(storeB, "crdt:config:app", kvcrdt.WithNodeID("node-b"))
_ = cfgB.Set(ctx, "log_level", "debug") // Conflicts with Node A on this field
_ = cfgB.Set(ctx, "feature_x", true)    // No conflict, different field
// After sync, per-field LWW resolves conflicts.
syncer := kvcrdt.NewSyncer(storeA, storeB)
_, _ = syncer.Sync(ctx)
var logLevel string
_ = cfgA.Get(ctx, "log_level", &logLevel)
fmt.Println("Log level:", logLevel) // Whichever node had the higher HLC wins
keys, _ := cfgA.Keys(ctx)
fmt.Println("Keys:", keys) // Keys: [max_connections log_level feature_x] (order may vary)
Best For
- Application configuration that multiple services update
- Structured documents with independently-editable fields
- Feature flags with per-field granularity
- Any key-value structure where different fields should merge independently
Choosing the Right Type
| Scenario | Recommended Type | Why |
|---|---|---|
| Page view counter | Counter | Multiple nodes increment independently |
| User display name | Register[string] | Last edit should win |
| Article tags | Set[string] | Concurrent add/remove must converge |
| App configuration | Map | Per-field independent LWW merge |
| Inventory quantity | Counter | Distributed increment/decrement across warehouses |
| Session data | Map | Multiple fields updated by different services |
| ACL member list | Set[string] | Add-wins prevents accidental permission loss |
| Feature flag value | Register[bool] | Simple scalar with LWW resolution |