
API Reference

Complete Go API reference for the github.com/xraph/grove/crdt package: types, plugin, clock, merge, sync, transport, server, hooks, and options.

Core Types

| Type | Description |
| --- | --- |
| `CRDTType` | String enum: `"lww"`, `"counter"`, `"set"` |
| `State` | Full CRDT state for a record (all fields) |
| `FieldState` | CRDT state for a single field |
| `ChangeRecord` | Wire format for a single field-level change |
| `CounterDelta` | Increment/decrement payload for counter changes |
| `SetOperation` | Add/remove payload for set changes |
| `SetOp` | String enum: `"add"`, `"remove"` |
| `SyncReport` | Summary of a sync operation (pulled, pushed, merged, conflicts) |

CRDTType Constants

| Constant | Value | Description |
| --- | --- | --- |
| `TypeLWW` | `"lww"` | Last-Writer-Wins Register |
| `TypeCounter` | `"counter"` | PN-Counter |
| `TypeSet` | `"set"` | Observed-Remove Set |

State

```go
type State struct {
    Table        string                 `json:"table"`
    PK           string                 `json:"pk"`
    Fields       map[string]*FieldState `json:"fields"`
    Tombstone    bool                   `json:"tombstone"`
    TombstoneHLC HLC                    `json:"tombstone_hlc,omitempty"`
}
```

| Function | Signature | Description |
| --- | --- | --- |
| `NewState` | `(table, pk string) *State` | Create an empty state |
| `ValidCRDTType` | `(t string) bool` | Check whether a string is a valid CRDT type |

ChangeRecord

```go
type ChangeRecord struct {
    Table        string          `json:"table"`
    PK           string          `json:"pk"`
    Field        string          `json:"field"`
    CRDTType     CRDTType        `json:"crdt_type"`
    HLC          HLC             `json:"hlc"`
    NodeID       string          `json:"node_id"`
    Value        json.RawMessage `json:"value,omitempty"`
    Tombstone    bool            `json:"tombstone,omitempty"`
    CounterDelta *CounterDelta   `json:"counter_delta,omitempty"`
    SetOp        *SetOperation   `json:"set_op,omitempty"`
}
```
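
To make the wire format concrete, the sketch below encodes and decodes one LWW field change using a trimmed local copy of `ChangeRecord`. It does not import the package: the `HLC` field types and JSON tags, and the helpers `encodeLWWChange` and `decodeChange`, are assumptions made for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HLC mirrors the documented {Timestamp, Counter, NodeID} shape.
// Field types and JSON tags are assumptions made for this sketch.
type HLC struct {
	Timestamp int64  `json:"ts"`
	Counter   uint32 `json:"c"`
	NodeID    string `json:"node"`
}

// ChangeRecord is a trimmed local copy of the documented struct
// (counter and set payloads are omitted for brevity).
type ChangeRecord struct {
	Table     string          `json:"table"`
	PK        string          `json:"pk"`
	Field     string          `json:"field"`
	CRDTType  string          `json:"crdt_type"`
	HLC       HLC             `json:"hlc"`
	NodeID    string          `json:"node_id"`
	Value     json.RawMessage `json:"value,omitempty"`
	Tombstone bool            `json:"tombstone,omitempty"`
}

// encodeLWWChange (hypothetical helper) builds the wire form of one
// last-writer-wins field update.
func encodeLWWChange(table, pk, field string, value any, clock HLC) ([]byte, error) {
	raw, err := json.Marshal(value)
	if err != nil {
		return nil, err
	}
	return json.Marshal(ChangeRecord{
		Table: table, PK: pk, Field: field,
		CRDTType: "lww", HLC: clock, NodeID: clock.NodeID, Value: raw,
	})
}

// decodeChange (hypothetical helper) parses a record received from a peer.
func decodeChange(data []byte) (ChangeRecord, error) {
	var rec ChangeRecord
	err := json.Unmarshal(data, &rec)
	return rec, err
}

func main() {
	clock := HLC{Timestamp: 1700000000000, Counter: 1, NodeID: "node-a"}
	data, err := encodeLWWChange("documents", "doc-1", "title", "Hello", clock)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```

Because `Value` is a `json.RawMessage`, the payload stays opaque JSON until a consumer decides how to decode it.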

HLC Clock

| Type | Description |
| --- | --- |
| `HLC` | Hybrid Logical Clock value: `{Timestamp, Counter, NodeID}` |
| `Clock` | Interface for HLC generation (`Now()`, `Update()`) |
| `HybridClock` | Default `Clock` implementation |
| `ClockOption` | Functional option for `HybridClock` |

HLC Methods

| Method | Signature | Description |
| --- | --- | --- |
| `IsZero` | `() bool` | True if the HLC has not been set |
| `Compare` | `(other HLC) int` | Returns -1, 0, or 1 |
| `After` | `(other HLC) bool` | True if strictly after `other` |
| `String` | `() string` | Human-readable format |
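
One plausible way to get a total order out of `{Timestamp, Counter, NodeID}` is to compare the fields in that sequence. The sketch below assumes that ordering, the field types, and the `String` layout; the package may order or format values differently.

```go
package main

import (
	"fmt"
	"strings"
)

// HLC is a local stand-in for the documented type; field types are assumed.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// IsZero reports whether the value was never set.
func (h HLC) IsZero() bool { return h == HLC{} }

// Compare orders by timestamp, then counter, then node ID (assumed tie-break).
func (h HLC) Compare(other HLC) int {
	switch {
	case h.Timestamp < other.Timestamp:
		return -1
	case h.Timestamp > other.Timestamp:
		return 1
	case h.Counter < other.Counter:
		return -1
	case h.Counter > other.Counter:
		return 1
	}
	return strings.Compare(h.NodeID, other.NodeID)
}

// After is true only when h sorts strictly later than other.
func (h HLC) After(other HLC) bool { return h.Compare(other) > 0 }

// String uses an illustrative layout, not necessarily the package's.
func (h HLC) String() string {
	return fmt.Sprintf("%d.%d@%s", h.Timestamp, h.Counter, h.NodeID)
}

func main() {
	a := HLC{Timestamp: 100, Counter: 0, NodeID: "node-a"}
	b := HLC{Timestamp: 100, Counter: 1, NodeID: "node-a"}
	fmt.Println(a, b, a.Compare(b), b.After(a))
}
```

Breaking ties on the node ID keeps the order total, so two replicas never disagree about which of two distinct values is later.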

HybridClock

| Function | Signature | Description |
| --- | --- | --- |
| `NewHybridClock` | `(nodeID string, opts ...ClockOption) *HybridClock` | Create a new hybrid clock |
| `Now` | `() HLC` | Generate the next causally ordered HLC |
| `Update` | `(remote HLC)` | Merge a remote HLC into local state |

Clock Options

| Option | Signature | Description |
| --- | --- | --- |
| `WithMaxDrift` | `(d time.Duration) ClockOption` | Maximum tolerable clock drift (default: 5s) |
| `WithNowFunc` | `(fn func() time.Time) ClockOption` | Override the wall clock (for testing) |
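
The following is a minimal sketch of how a hybrid logical clock with these two options could behave; it is not the package's `HybridClock`. The millisecond unit, the policy of ignoring remote values beyond the drift limit, and all lowercase names (`sketchClock`, `withMaxDrift`, `withNowFunc`, `fixedClock`) are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// HLC is a local stand-in; Timestamp is assumed to be wall-clock milliseconds.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

type sketchClock struct {
	mu       sync.Mutex
	nodeID   string
	maxDrift time.Duration
	now      func() time.Time
	last     HLC
}

type clockOption func(*sketchClock)

func withMaxDrift(d time.Duration) clockOption {
	return func(c *sketchClock) { c.maxDrift = d }
}

func withNowFunc(fn func() time.Time) clockOption {
	return func(c *sketchClock) { c.now = fn }
}

// fixedClock returns a wall clock frozen at ms milliseconds, handy in tests.
func fixedClock(ms int64) func() time.Time {
	return func() time.Time { return time.UnixMilli(ms) }
}

func newSketchClock(nodeID string, opts ...clockOption) *sketchClock {
	c := &sketchClock{nodeID: nodeID, maxDrift: 5 * time.Second, now: time.Now}
	c.last.NodeID = nodeID
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// Now advances to the wall clock when it moved forward, otherwise bumps the counter.
func (c *sketchClock) Now() HLC {
	c.mu.Lock()
	defer c.mu.Unlock()
	if wall := c.now().UnixMilli(); wall > c.last.Timestamp {
		c.last = HLC{Timestamp: wall, NodeID: c.nodeID}
	} else {
		c.last.Counter++
	}
	return c.last
}

// Update folds a remote value in so that later local values sort after it.
func (c *sketchClock) Update(remote HLC) {
	c.mu.Lock()
	defer c.mu.Unlock()
	wall := c.now().UnixMilli()
	if remote.Timestamp-wall > c.maxDrift.Milliseconds() {
		return // remote is too far ahead; ignoring it is an assumed policy
	}
	switch {
	case wall > c.last.Timestamp && wall > remote.Timestamp:
		c.last = HLC{Timestamp: wall, NodeID: c.nodeID}
	case remote.Timestamp > c.last.Timestamp:
		c.last = HLC{Timestamp: remote.Timestamp, Counter: remote.Counter + 1, NodeID: c.nodeID}
	case remote.Timestamp == c.last.Timestamp:
		if remote.Counter > c.last.Counter {
			c.last.Counter = remote.Counter
		}
		c.last.Counter++
	default: // local timestamp is already ahead of both
		c.last.Counter++
	}
}

func main() {
	clock := newSketchClock("node-a", withMaxDrift(2*time.Second), withNowFunc(fixedClock(1000)))
	fmt.Println(clock.Now())
	clock.Update(HLC{Timestamp: 1500, Counter: 4, NodeID: "node-b"})
	fmt.Println(clock.Now())
}
```

Injecting the wall clock through an option is what makes the causal-ordering behavior testable without sleeping.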

Merge Functions

LWW-Register

| Type/Function | Signature | Description |
| --- | --- | --- |
| `LWWRegister` | struct | `Value` + `Clock` + `NodeID` |
| `MergeLWW` | `(local, remote *LWWRegister) *LWWRegister` | Higher HLC wins |
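
The "higher HLC wins" rule can be sketched in a few lines. This is an illustrative reimplementation rather than the package's `MergeLWW`: the field types, the nil handling, and the node-ID tie-break in `compareHLC` are assumptions.

```go
package main

import "fmt"

// HLC is a local stand-in for the documented clock value.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// compareHLC orders by timestamp, counter, then node ID (assumed tie-break).
func compareHLC(a, b HLC) int {
	switch {
	case a.Timestamp != b.Timestamp:
		if a.Timestamp < b.Timestamp {
			return -1
		}
		return 1
	case a.Counter != b.Counter:
		if a.Counter < b.Counter {
			return -1
		}
		return 1
	case a.NodeID < b.NodeID:
		return -1
	case a.NodeID > b.NodeID:
		return 1
	}
	return 0
}

// LWWRegister follows the documented Value + Clock + NodeID shape.
type LWWRegister struct {
	Value  any
	Clock  HLC
	NodeID string
}

// mergeLWW keeps whichever register carries the later clock.
// A nil side simply loses to the other side.
func mergeLWW(local, remote *LWWRegister) *LWWRegister {
	switch {
	case local == nil:
		return remote
	case remote == nil:
		return local
	case compareHLC(remote.Clock, local.Clock) > 0:
		return remote
	default:
		return local
	}
}

func main() {
	local := &LWWRegister{Value: "draft", Clock: HLC{Timestamp: 10, NodeID: "node-a"}, NodeID: "node-a"}
	remote := &LWWRegister{Value: "final", Clock: HLC{Timestamp: 12, NodeID: "node-b"}, NodeID: "node-b"}
	fmt.Println(mergeLWW(local, remote).Value)
}
```

Because the comparison is a total order over distinct clocks, merging in either direction selects the same register, which is what makes the register converge.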

PN-Counter

| Type/Function | Signature | Description |
| --- | --- | --- |
| `PNCounterState` | struct | `Increments` and `Decrements`, each `map[string]int64` |
| `NewPNCounterState` | `() *PNCounterState` | Create empty state |
| `MergeCounter` | `(local, remote *PNCounterState) *PNCounterState` | Per-node max |
| `Value` | `(s *PNCounterState) int64` | `sum(inc) - sum(dec)` |
| `CounterFromFieldState` | `(fs *FieldState) *PNCounterState` | Extract from `FieldState` |
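
The per-node max merge and the `sum(inc) - sum(dec)` value can be illustrated with a small self-contained sketch. The struct mirrors the documented `Increments`/`Decrements` maps, keyed here by node ID (an assumption); the lowercase functions are local stand-ins, not the package's implementations.

```go
package main

import "fmt"

// PNCounterState mirrors the documented shape; keys are assumed to be node IDs.
type PNCounterState struct {
	Increments map[string]int64
	Decrements map[string]int64
}

func newPNCounterState() *PNCounterState {
	return &PNCounterState{
		Increments: map[string]int64{},
		Decrements: map[string]int64{},
	}
}

// mergeMax takes, for every node, the larger of the two recorded totals.
func mergeMax(a, b map[string]int64) map[string]int64 {
	out := make(map[string]int64, len(a)+len(b))
	for node, v := range a {
		out[node] = v
	}
	for node, v := range b {
		if v > out[node] {
			out[node] = v
		}
	}
	return out
}

// mergeCounter applies the per-node max to both maps independently.
func mergeCounter(local, remote *PNCounterState) *PNCounterState {
	return &PNCounterState{
		Increments: mergeMax(local.Increments, remote.Increments),
		Decrements: mergeMax(local.Decrements, remote.Decrements),
	}
}

// value computes sum(increments) - sum(decrements).
func value(s *PNCounterState) int64 {
	var total int64
	for _, v := range s.Increments {
		total += v
	}
	for _, v := range s.Decrements {
		total -= v
	}
	return total
}

func main() {
	local := newPNCounterState()
	local.Increments["node-a"] = 3
	local.Decrements["node-a"] = 1

	remote := newPNCounterState()
	remote.Increments["node-a"] = 5
	remote.Increments["node-b"] = 2

	fmt.Println(value(mergeCounter(local, remote)))
}
```

Each node only ever grows its own entries, so taking the maximum per node never loses an update and merging the same state twice changes nothing.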

OR-Set

| Type/Function | Signature | Description |
| --- | --- | --- |
| `ORSetState` | struct | `Entries map[string][]Tag`, `Removed map[string]bool` |
| `Tag` | struct | `{NodeID, HLC}` |
| `NewORSetState` | `() *ORSetState` | Create empty state |
| `MergeSet` | `(local, remote *ORSetState) *ORSetState` | Add-wins union |
| `Elements` | `(s *ORSetState) []json.RawMessage` | Active elements |
| `SetFromFieldState` | `(fs *FieldState) *ORSetState` | Extract from `FieldState` |
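
Add-wins behavior is easiest to see in the textbook observed-remove formulation, sketched below. This deliberately does not reproduce the package's `ORSetState` layout: the `orSet` type, the `Seq` field standing in for an HLC, and the per-tag removal set are assumptions chosen to keep the example short.

```go
package main

import (
	"fmt"
	"sort"
)

// Tag uniquely identifies one add operation. The documented Tag is
// {NodeID, HLC}; Seq stands in for the HLC here.
type Tag struct {
	NodeID string
	Seq    uint64
}

// orSet is a textbook OR-Set, not the package's ORSetState.
type orSet struct {
	adds    map[string]map[Tag]bool // element -> tags that added it
	removed map[Tag]bool            // tags observed by some remove
}

func newORSet() *orSet {
	return &orSet{adds: map[string]map[Tag]bool{}, removed: map[Tag]bool{}}
}

func (s *orSet) add(elem string, t Tag) {
	if s.adds[elem] == nil {
		s.adds[elem] = map[Tag]bool{}
	}
	s.adds[elem][t] = true
}

// remove cancels only the add tags this replica has already observed.
func (s *orSet) remove(elem string) {
	for t := range s.adds[elem] {
		s.removed[t] = true
	}
}

// mergeSet unions add tags and removed tags from both replicas.
func mergeSet(a, b *orSet) *orSet {
	out := newORSet()
	for _, src := range []*orSet{a, b} {
		for elem, tags := range src.adds {
			for t := range tags {
				out.add(elem, t)
			}
		}
		for t := range src.removed {
			out.removed[t] = true
		}
	}
	return out
}

// elements lists every element with at least one add tag not yet removed.
func (s *orSet) elements() []string {
	var out []string
	for elem, tags := range s.adds {
		for t := range tags {
			if !s.removed[t] {
				out = append(out, elem)
				break
			}
		}
	}
	sort.Strings(out)
	return out
}

// demoAddWins replays a concurrent remove and re-add of "x".
func demoAddWins() []string {
	a := newORSet()
	a.add("x", Tag{NodeID: "node-a", Seq: 1})
	b := mergeSet(newORSet(), a) // b has seen the first add of "x"
	b.remove("x")                // b removes what it observed
	a.add("x", Tag{NodeID: "node-a", Seq: 2}) // concurrent re-add on a
	a.add("y", Tag{NodeID: "node-a", Seq: 3})
	return mergeSet(a, b).elements()
}

func main() {
	fmt.Println(demoAddWins())
}
```

The remove on replica `b` only cancels the tag it had seen, so the concurrent re-add on replica `a` survives the merge.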

MergeEngine

| Function | Signature | Description |
| --- | --- | --- |
| `NewMergeEngine` | `() *MergeEngine` | Create a new merge engine |
| `MergeField` | `(local, remote *FieldState) (*FieldState, error)` | Dispatch by CRDT type |
| `MergeState` | `(local, remote *State) (*State, error)` | Merge all fields of a record |

Plugin

| Function | Signature | Description |
| --- | --- | --- |
| `New` | `(opts ...Option) *Plugin` | Create the CRDT plugin |
| `Name` | `() string` | Returns `"crdt"` |
| `Init` | `(ctx context.Context, db any) error` | Initialize the plugin |
| `Clock` | `() Clock` | Get the plugin's clock |
| `NodeID` | `() string` | Get the node identifier |
| `MergeEngine` | `() *MergeEngine` | Get the merge engine |
| `SyncHooks` | `() *SyncHookChain` | Get the sync hook chain |
| `SetExecutor` | `(exec Executor)` | Set the database executor |
| `MetadataStore` | `() *MetadataStore` | Get the metadata store |
| `Inspect` | `(ctx, table, pk) (*State, error)` | Debug: read full CRDT state |
| `CleanupTombstones` | `(ctx, table) error` | Remove expired tombstones |
| `EnsureShadowTable` | `(ctx, table) error` | Create the shadow table (runs the DDL) |

Plugin Options

| Option | Signature | Description |
| --- | --- | --- |
| `WithNodeID` | `(id string) Option` | Set node identifier (required) |
| `WithClock` | `(c Clock) Option` | Override clock implementation |
| `WithTombstoneTTL` | `(d time.Duration) Option` | Tombstone expiry (default: 7 days) |
| `WithMaxClockDrift` | `(d time.Duration) Option` | Clock drift limit (default: 5s) |
| `WithTables` | `(tables ...string) Option` | Restrict to specific tables |
| `WithSyncHook` | `(hook SyncHook) Option` | Add a sync hook |

Sync

Syncer

| Function | Signature | Description |
| --- | --- | --- |
| `NewSyncer` | `(plugin *Plugin, opts ...SyncerOption) *Syncer` | Create a syncer |
| `Sync` | `(ctx) (*SyncReport, error)` | One sync round with all peers |
| `Run` | `(ctx) error` | Background sync loop (blocks) |
| `PushChange` | `(ctx, table, pk, field, crdtType, value, clock) error` | Push a single change |
| `StreamSync` | `(ctx) error` | SSE stream sync (blocks) |

Syncer Options

| Option | Signature | Description |
| --- | --- | --- |
| `WithTransport` | `(t Transport) SyncerOption` | Set primary transport |
| `WithPeers` | `(peers ...Transport) SyncerOption` | Add multiple peer transports |
| `WithSyncInterval` | `(d time.Duration) SyncerOption` | Sync interval (default: 30s) |
| `WithSyncTables` | `(tables ...string) SyncerOption` | Tables to sync |
| `WithGossipInterval` | `(d time.Duration) SyncerOption` | P2P gossip interval |

Transport

Interface

```go
type Transport interface {
    Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
    Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}
```
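
Since `Transport` is a two-method interface, an in-memory implementation is a convenient test double. The sketch below redeclares the interface and wire types locally. The wire type fields follow the shapes listed under Wire Types, but their Go types (for example `Merged int`), the trimmed `ChangeRecord`, and the `memTransport` and `roundTrip` names are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// after reports whether h sorts strictly later than o (assumed ordering).
func (h HLC) after(o HLC) bool {
	if h.Timestamp != o.Timestamp {
		return h.Timestamp > o.Timestamp
	}
	if h.Counter != o.Counter {
		return h.Counter > o.Counter
	}
	return h.NodeID > o.NodeID
}

// Trimmed local copies of the documented wire types; field types are assumed.
type ChangeRecord struct {
	Table, PK, Field string
	HLC              HLC
}
type PullRequest struct {
	Tables []string
	Since  HLC
	NodeID string
}
type PullResponse struct {
	Changes   []ChangeRecord
	LatestHLC HLC
}
type PushRequest struct {
	Changes []ChangeRecord
	NodeID  string
}
type PushResponse struct {
	Merged    int
	LatestHLC HLC
}

type Transport interface {
	Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
	Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}

// memTransport keeps pushed changes in memory and serves them back on pull.
type memTransport struct {
	mu     sync.Mutex
	log    []ChangeRecord
	latest HLC
}

var _ Transport = (*memTransport)(nil) // compile-time interface check

func (m *memTransport) Push(ctx context.Context, req *PushRequest) (*PushResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, c := range req.Changes {
		m.log = append(m.log, c)
		if c.HLC.after(m.latest) {
			m.latest = c.HLC
		}
	}
	return &PushResponse{Merged: len(req.Changes), LatestHLC: m.latest}, nil
}

func (m *memTransport) Pull(ctx context.Context, req *PullRequest) (*PullResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	wanted := map[string]bool{}
	for _, t := range req.Tables {
		wanted[t] = true
	}
	resp := &PullResponse{LatestHLC: m.latest}
	for _, c := range m.log {
		// An empty table list means "all tables" in this sketch.
		if (len(wanted) == 0 || wanted[c.Table]) && c.HLC.after(req.Since) {
			resp.Changes = append(resp.Changes, c)
		}
	}
	return resp, nil
}

// roundTrip pushes two changes, then pulls only the "documents" table.
func roundTrip() (merged, pulled int, err error) {
	var t Transport = &memTransport{}
	ctx := context.Background()
	push, err := t.Push(ctx, &PushRequest{NodeID: "node-a", Changes: []ChangeRecord{
		{Table: "documents", PK: "1", Field: "title", HLC: HLC{Timestamp: 1, NodeID: "node-a"}},
		{Table: "users", PK: "7", Field: "name", HLC: HLC{Timestamp: 2, NodeID: "node-a"}},
	}})
	if err != nil {
		return 0, 0, err
	}
	pull, err := t.Pull(ctx, &PullRequest{Tables: []string{"documents"}, NodeID: "node-b"})
	if err != nil {
		return 0, 0, err
	}
	return push.Merged, len(pull.Changes), nil
}

func main() {
	merged, pulled, err := roundTrip()
	if err != nil {
		panic(err)
	}
	fmt.Println("merged:", merged, "pulled:", pulled)
}
```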

HTTPClient

| Function | Signature | Description |
| --- | --- | --- |
| `HTTPTransport` | `(baseURL string) *HTTPClient` | Create HTTP transport |
| `HTTPTransportWithClient` | `(baseURL string, client *http.Client) *HTTPClient` | Create HTTP transport with a custom HTTP client |
| `Pull` | `(ctx, req) (*PullResponse, error)` | `POST /pull` |
| `Push` | `(ctx, req) (*PushResponse, error)` | `POST /push` |

StreamingTransport

| Function | Signature | Description |
| --- | --- | --- |
| `NewStreamingTransport` | `(baseURL string, opts ...StreamingOption) *StreamingTransport` | Create streaming transport |
| `StreamChanges` | `(ctx, since HLC, handler func(ChangeRecord)) error` | SSE stream (blocks) |

Streaming Options

| Option | Signature | Description |
| --- | --- | --- |
| `WithStreamTables` | `(tables ...string) StreamingOption` | Table filter for SSE |
| `WithStreamReconnect` | `(d time.Duration) StreamingOption` | Reconnect delay (default: 5s) |
| `WithStreamLogger` | `(l *slog.Logger) StreamingOption` | Logger for stream events |

Wire Types

| Type | Fields |
| --- | --- |
| `PullRequest` | `{Tables, Since, NodeID}` |
| `PullResponse` | `{Changes, LatestHLC}` |
| `PushRequest` | `{Changes, NodeID}` |
| `PushResponse` | `{Merged, LatestHLC}` |

Server

SyncController

| Function | Signature | Description |
| --- | --- | --- |
| `NewSyncController` | `(plugin, opts...) *SyncController` | Create controller |
| `HandlePull` | `(ctx, *PullRequest) (*PullResponse, error)` | Core pull logic |
| `HandlePush` | `(ctx, *PushRequest) (*PushResponse, error)` | Core push logic |
| `StreamChangesSince` | `(ctx, tables, since) (<-chan []ChangeRecord, error)` | Change channel |
| `HandlePresenceUpdate` | `(ctx, *PresenceUpdate) (*PresenceEvent, error)` | Process a presence update |
| `HandleGetPresence` | `(ctx, topic string) (*PresenceSnapshot, error)` | Get presence snapshot for a topic |
| `Presence` | `() *PresenceManager` | Access the presence manager (nil if disabled) |
| `PresenceChannel` | `() <-chan PresenceEvent` | Broadcast channel for stream integration |
| `Close` | `()` | Clean up controller resources |

Controller Options

| Option | Signature | Description |
| --- | --- | --- |
| `WithStreamPollInterval` | `(d time.Duration) SyncControllerOption` | Poll interval (default: 1s) |
| `WithStreamKeepAlive` | `(d time.Duration) SyncControllerOption` | Keep-alive interval (default: 15s) |
| `WithControllerSyncHook` | `(hook SyncHook) SyncControllerOption` | Additional controller hook |
| `WithPresenceEnabled` | `(bool) SyncControllerOption` | Enable presence subsystem (default: false) |
| `WithPresenceTTL` | `(d time.Duration) SyncControllerOption` | Presence entry TTL (default: 30s) |

Standalone Handler

| Function | Signature | Description |
| --- | --- | --- |
| `NewHTTPHandler` | `(plugin, opts...) http.Handler` | Standard `http.Handler` for sync |

Endpoints: POST /pull and POST /push; when presence is enabled, also POST /presence and GET /presence.
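
As a rough illustration of calling the pull endpoint over plain HTTP, the sketch below posts a JSON body to `/pull` on a stub server built with `net/http/httptest`. The JSON field names, the response shape, and the helpers `postPull`, `stubServer`, and `demoPull` are assumptions; the real handler's payloads may differ.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Request and response bodies with assumed JSON field names.
type pullRequest struct {
	Tables []string `json:"tables"`
	NodeID string   `json:"node_id"`
}

type pullResponse struct {
	Changes []json.RawMessage `json:"changes"`
}

// postPull sends one POST /pull request and decodes the JSON reply.
func postPull(ctx context.Context, client *http.Client, baseURL string, req pullRequest) (*pullResponse, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/pull", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("pull: unexpected status %s", resp.Status)
	}
	var out pullResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}

// stubServer stands in for a real sync server: it echoes one fake
// change per requested table.
func stubServer() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/pull", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req pullRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		changes := make([]map[string]string, 0, len(req.Tables))
		for _, t := range req.Tables {
			changes = append(changes, map[string]string{"table": t})
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{"changes": changes})
	})
	return httptest.NewServer(mux)
}

// demoPull runs one request against the stub and reports the change count.
func demoPull() (int, error) {
	srv := stubServer()
	defer srv.Close()
	resp, err := postPull(context.Background(), srv.Client(), srv.URL,
		pullRequest{Tables: []string{"documents", "users"}, NodeID: "node-a"})
	if err != nil {
		return 0, err
	}
	return len(resp.Changes), nil
}

func main() {
	n, err := demoPull()
	if err != nil {
		panic(err)
	}
	fmt.Println("changes received:", n)
}
```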

Sync Hooks

Interface

```go
type SyncHook interface {
    BeforeInboundChange(ctx context.Context, change *ChangeRecord) (*ChangeRecord, error)
    AfterInboundChange(ctx context.Context, change *ChangeRecord) error
    BeforeOutboundChange(ctx context.Context, change *ChangeRecord) (*ChangeRecord, error)
    BeforeOutboundRead(ctx context.Context, changes []ChangeRecord) ([]ChangeRecord, error)
}
```

| Type | Description |
| --- | --- |
| `SyncHook` | Interface for intercepting sync data |
| `BaseSyncHook` | No-op default (embed and override) |
| `SyncHookChain` | Composes multiple hooks sequentially |

SyncHookChain

| Function | Signature | Description |
| --- | --- | --- |
| `NewSyncHookChain` | `(hooks ...SyncHook) *SyncHookChain` | Create a chain |
| `Add` | `(hook SyncHook)` | Append a hook |
| `Len` | `() int` | Number of hooks |
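
The embed-and-override pattern and sequential composition might look like the sketch below. Everything here is a local stand-in: `baseHook` plays the role of `BaseSyncHook`, `hookChain` the role of `SyncHookChain`, and the decision to stop at the first error is an assumption about chain semantics.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// ChangeRecord is trimmed to the fields this sketch needs.
type ChangeRecord struct {
	Table, Field, NodeID string
}

type SyncHook interface {
	BeforeInboundChange(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error)
	AfterInboundChange(ctx context.Context, c *ChangeRecord) error
	BeforeOutboundChange(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error)
	BeforeOutboundRead(ctx context.Context, cs []ChangeRecord) ([]ChangeRecord, error)
}

// baseHook is a no-op implementation meant to be embedded.
type baseHook struct{}

func (baseHook) BeforeInboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	return c, nil
}
func (baseHook) AfterInboundChange(context.Context, *ChangeRecord) error { return nil }
func (baseHook) BeforeOutboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	return c, nil
}
func (baseHook) BeforeOutboundRead(_ context.Context, cs []ChangeRecord) ([]ChangeRecord, error) {
	return cs, nil
}

var errTableNotAllowed = errors.New("table not allowed")

// tableGuard rejects inbound changes for tables outside its allow list.
type tableGuard struct {
	baseHook
	allowed map[string]bool
}

func (g tableGuard) BeforeInboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	if !g.allowed[c.Table] {
		return nil, errTableNotAllowed
	}
	return c, nil
}

// fieldLowercaser rewrites the field name on a copy of the record.
type fieldLowercaser struct{ baseHook }

func (fieldLowercaser) BeforeInboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	out := *c
	out.Field = strings.ToLower(out.Field)
	return &out, nil
}

// hookChain feeds each hook's output into the next one.
type hookChain struct{ hooks []SyncHook }

func (ch *hookChain) add(h SyncHook) { ch.hooks = append(ch.hooks, h) }

func (ch *hookChain) beforeInbound(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	var err error
	for _, h := range ch.hooks {
		if c, err = h.BeforeInboundChange(ctx, c); err != nil {
			return nil, err // first failing hook aborts the chain (assumed)
		}
	}
	return c, nil
}

// runInbound pushes one change through a guard and a rewriter.
func runInbound(table, field string) (string, error) {
	ch := &hookChain{}
	ch.add(tableGuard{allowed: map[string]bool{"documents": true}})
	ch.add(fieldLowercaser{})
	out, err := ch.beforeInbound(context.Background(),
		&ChangeRecord{Table: table, Field: field, NodeID: "node-b"})
	if err != nil {
		return "", err
	}
	return out.Field, nil
}

func main() {
	fmt.Println(runInbound("documents", "Title"))
	fmt.Println(runInbound("secrets", "Token"))
}
```

Embedding the no-op base means a hook only has to implement the one interception point it cares about.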

Inspect / Debug

| Type | Description |
| --- | --- |
| `InspectResult` | Human-readable record state |
| `InspectField` | Human-readable field state |

| Function | Signature | Description |
| --- | --- | --- |
| `InspectState` | `(state *State) *InspectResult` | Convert to human-readable form |
| `String` | `() string` | Formatted output |

Presence

Ephemeral awareness system for tracking connected clients. See Presence & Awareness for the full guide.

Presence Types

| Type | Description |
| --- | --- |
| `PresenceState` | A client's presence data: `{NodeID, Topic, Data, UpdatedAt, ExpiresAt}` |
| `PresenceUpdate` | Request body for updates: `{NodeID, Topic, Data}` |
| `PresenceEvent` | SSE broadcast event: `{Type, NodeID, Topic, Data}` |
| `PresenceSnapshot` | GET response: `{Topic, States}` |

Event Type Constants

| Constant | Value | Description |
| --- | --- | --- |
| `PresenceJoin` | `"join"` | New node joined a topic |
| `PresenceUpdateEvt` | `"update"` | Existing node updated its data |
| `PresenceLeave` | `"leave"` | Node left or entry expired |

PresenceManager

| Function | Signature | Description |
| --- | --- | --- |
| `NewPresenceManager` | `(ttl time.Duration, onChange func(PresenceEvent), logger *slog.Logger) *PresenceManager` | Create with TTL, callback, and logger; starts the cleanup goroutine |
| `Update` | `(update PresenceUpdate) PresenceEvent` | Upsert presence; returns a join or update event |
| `Remove` | `(topic, nodeID string) *PresenceEvent` | Explicitly remove an entry |
| `RemoveNode` | `(nodeID string) []PresenceEvent` | Remove all entries for a node (on disconnect) |
| `Get` | `(topic string) []PresenceState` | Get all active presence for a topic |
| `GetTopicsForNode` | `(nodeID string) []string` | Get all topics a node is present in |
| `Close` | `()` | Stop the cleanup goroutine |
| `MarshalPresenceEvent` | `(event PresenceEvent) ([]byte, error)` | Serialize event to JSON for SSE |
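
The join/update/leave lifecycle driven by a TTL can be sketched without goroutines by passing the current time explicitly. The `presenceTable` type and its methods are hypothetical and single-threaded; the real `PresenceManager` runs its cleanup in a background goroutine and reports events through a callback.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// presenceTable tracks expiry times per topic and node.
type presenceTable struct {
	ttl     time.Duration
	expires map[string]map[string]time.Time // topic -> nodeID -> expiry
}

func newPresenceTable(ttl time.Duration) *presenceTable {
	return &presenceTable{ttl: ttl, expires: map[string]map[string]time.Time{}}
}

// update refreshes an entry and reports "join" for a new or expired
// entry, "update" for one that is still live.
func (p *presenceTable) update(topic, nodeID string, now time.Time) string {
	nodes := p.expires[topic]
	if nodes == nil {
		nodes = map[string]time.Time{}
		p.expires[topic] = nodes
	}
	exp, seen := nodes[nodeID]
	nodes[nodeID] = now.Add(p.ttl)
	if seen && now.Before(exp) {
		return "update"
	}
	return "join"
}

// sweep drops expired entries and returns them as "topic/nodeID" leave keys.
func (p *presenceTable) sweep(now time.Time) []string {
	var left []string
	for topic, nodes := range p.expires {
		for id, exp := range nodes {
			if !now.Before(exp) {
				delete(nodes, id)
				left = append(left, topic+"/"+id)
			}
		}
	}
	sort.Strings(left)
	return left
}

// demoPresence walks one node through join, update, and expiry.
func demoPresence() (first, second string, leaves []string) {
	start := time.Unix(0, 0)
	p := newPresenceTable(30 * time.Second)
	first = p.update("doc-1", "node-a", start)
	second = p.update("doc-1", "node-a", start.Add(10*time.Second))
	leaves = p.sweep(start.Add(41 * time.Second))
	return first, second, leaves
}

func main() {
	fmt.Println(demoPresence())
}
```

Each update pushes the expiry forward by the TTL, so a client stays present only as long as it keeps sending updates.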

Migrations

| Function | Signature | Description |
| --- | --- | --- |
| `ShadowTableDDL` | `(table string) string` | DDL for shadow table creation |
| `ShadowTableSyncIndex` | `(table string) string` | DDL for sync index |
| `Migrations` | `grove.MigrationGroup` | Pre-built migration group |
