API Reference
Complete Go API reference for the `github.com/xraph/grove/crdt` package, covering types, plugin, clock, merge, sync, transport, server, hooks, and options.
| Type | Description |
|---|---|
| `CRDTType` | String enum: `"lww"`, `"counter"`, `"set"` |
| `State` | Full CRDT state for a record (all fields) |
| `FieldState` | CRDT state for a single field |
| `ChangeRecord` | Wire format for a single field-level change |
| `CounterDelta` | Increment/decrement payload for counter changes |
| `SetOperation` | Add/remove payload for set changes |
| `SetOp` | String enum: `"add"`, `"remove"` |
| `SyncReport` | Summary of a sync operation (pulled, pushed, merged, conflicts) |

| Constant | Value | Description |
|---|---|---|
| `TypeLWW` | `"lww"` | Last-Writer-Wins Register |
| `TypeCounter` | `"counter"` | PN-Counter |
| `TypeSet` | `"set"` | Observed-Remove Set |
```go
type State struct {
	Table        string                 `json:"table"`
	PK           string                 `json:"pk"`
	Fields       map[string]*FieldState `json:"fields"`
	Tombstone    bool                   `json:"tombstone"`
	TombstoneHLC HLC                    `json:"tombstone_hlc,omitempty"`
}
```
| Function | Signature | Description |
|---|---|---|
| `NewState` | `(table, pk string) *State` | Create an empty state |
| `ValidCRDTType` | `(t string) bool` | Check if a string is a valid CRDT type |
```go
type ChangeRecord struct {
	Table        string          `json:"table"`
	PK           string          `json:"pk"`
	Field        string          `json:"field"`
	CRDTType     CRDTType        `json:"crdt_type"`
	HLC          HLC             `json:"hlc"`
	NodeID       string          `json:"node_id"`
	Value        json.RawMessage `json:"value,omitempty"`
	Tombstone    bool            `json:"tombstone,omitempty"`
	CounterDelta *CounterDelta   `json:"counter_delta,omitempty"`
	SetOp        *SetOperation   `json:"set_op,omitempty"`
}
```
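
To make the wire format concrete, the following sketch copies the `ChangeRecord` field names and JSON tags into a local struct and marshals one LWW change. It does not import the Grove package: the `HLC` stand-in (field types and default JSON names), the empty `CounterDelta` and `SetOperation` placeholders, and all sample values are assumptions for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CRDTType mirrors the documented string enum.
type CRDTType string

// HLC is a local stand-in. Its field types and JSON encoding are assumptions;
// this reference only names the fields Timestamp, Counter, and NodeID.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// CounterDelta and SetOperation are empty placeholders because their fields
// are not listed in this reference.
type (
	CounterDelta struct{}
	SetOperation struct{}
)

// ChangeRecord copies the documented field names and JSON tags.
type ChangeRecord struct {
	Table        string          `json:"table"`
	PK           string          `json:"pk"`
	Field        string          `json:"field"`
	CRDTType     CRDTType        `json:"crdt_type"`
	HLC          HLC             `json:"hlc"`
	NodeID       string          `json:"node_id"`
	Value        json.RawMessage `json:"value,omitempty"`
	Tombstone    bool            `json:"tombstone,omitempty"`
	CounterDelta *CounterDelta   `json:"counter_delta,omitempty"`
	SetOp        *SetOperation   `json:"set_op,omitempty"`
}

// encodeExample marshals a hypothetical LWW change to the "name" field.
func encodeExample() (string, error) {
	rec := ChangeRecord{
		Table:    "users",
		PK:       "42",
		Field:    "name",
		CRDTType: "lww",
		HLC:      HLC{Timestamp: 1700000000000, Counter: 0, NodeID: "node-1"},
		NodeID:   "node-1",
		Value:    json.RawMessage(`"Ada"`),
	}
	b, err := json.Marshal(rec)
	return string(b), err
}

func main() {
	out, err := encodeExample()
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because of the `omitempty` tags, the unset `tombstone`, `counter_delta`, and `set_op` keys do not appear in the encoded record, so an LWW change carries only its `value`.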
| Type | Description |
|---|---|
| `HLC` | Hybrid Logical Clock value: `{Timestamp, Counter, NodeID}` |
| `Clock` | Interface for HLC generation (`Now()`, `Update()`) |
| `HybridClock` | Default `Clock` implementation |
| `ClockOption` | Functional option for `HybridClock` |

| Method | Signature | Description |
|---|---|---|
| `IsZero` | `() bool` | True if HLC has not been set |
| `Compare` | `(other HLC) int` | Returns -1, 0, or 1 |
| `After` | `(other HLC) bool` | True if strictly after other |
| `String` | `() string` | Human-readable format |
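
The comparison methods can be pictured with a small stand-alone sketch. It assumes an ordering of `Timestamp` first, then `Counter`, then `NodeID` as the final tie-breaker; that ordering and the field types are assumptions, since this reference only lists the field names and the return values of `Compare`.

```go
package main

import "fmt"

// HLC is a local stand-in with assumed field types.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// IsZero reports whether the value was never set.
func (h HLC) IsZero() bool { return h == (HLC{}) }

// Compare returns -1, 0, or 1. The field precedence used here
// (Timestamp, Counter, NodeID) is an assumption.
func (h HLC) Compare(other HLC) int {
	if h.Timestamp != other.Timestamp {
		if h.Timestamp < other.Timestamp {
			return -1
		}
		return 1
	}
	if h.Counter != other.Counter {
		if h.Counter < other.Counter {
			return -1
		}
		return 1
	}
	if h.NodeID != other.NodeID {
		if h.NodeID < other.NodeID {
			return -1
		}
		return 1
	}
	return 0
}

// After is true only when h orders strictly after other.
func (h HLC) After(other HLC) bool { return h.Compare(other) > 0 }

func main() {
	a := HLC{Timestamp: 100, Counter: 0, NodeID: "a"}
	b := HLC{Timestamp: 100, Counter: 1, NodeID: "a"}
	fmt.Println(a.Compare(b), b.After(a), a.After(a))
}
```

Including the node identifier in the comparison gives a total order, so two distinct nodes never produce values that compare as equal.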
| Function | Signature | Description |
|---|---|---|
| `NewHybridClock` | `(nodeID string, opts ...ClockOption) *HybridClock` | Create a new HLC |
| `Now` | `() HLC` | Generate next causally-ordered HLC |
| `Update` | `(remote HLC)` | Merge remote HLC into local state |

| Option | Signature | Description |
|---|---|---|
| `WithMaxDrift` | `(d time.Duration) ClockOption` | Max tolerable clock drift (default: 5s) |
| `WithNowFunc` | `(fn func() time.Time) ClockOption` | Override wall clock (testing) |
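
As a rough illustration of what `Now` and `Update` do, here is a minimal sketch of the standard hybrid logical clock update rules with an injectable wall clock, in the spirit of `WithNowFunc`. The `sketchClock` type, the nanosecond timestamp unit, and the policy of rejecting remote values beyond the drift limit (reported here as a `bool`) are assumptions, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// HLC is a local stand-in; Timestamp is assumed to hold Unix nanoseconds.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// sketchClock is a hypothetical clock, not the package's HybridClock.
type sketchClock struct {
	mu       sync.Mutex
	nodeID   string
	last     HLC
	maxDrift time.Duration
	now      func() time.Time
}

func newSketchClock(nodeID string, maxDrift time.Duration, now func() time.Time) *sketchClock {
	return &sketchClock{nodeID: nodeID, maxDrift: maxDrift, now: now}
}

// Now advances to the wall clock when it moved forward; otherwise it bumps
// the logical counter so successive values stay strictly increasing.
func (c *sketchClock) Now() HLC {
	c.mu.Lock()
	defer c.mu.Unlock()
	wall := c.now().UnixNano()
	if wall > c.last.Timestamp {
		c.last.Timestamp, c.last.Counter = wall, 0
	} else {
		c.last.Counter++
	}
	c.last.NodeID = c.nodeID
	return c.last
}

// Update folds a remote value into local state. Returning false for remote
// timestamps too far ahead of the wall clock is an assumed drift policy.
func (c *sketchClock) Update(remote HLC) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	wall := c.now().UnixNano()
	if remote.Timestamp-wall > int64(c.maxDrift) {
		return false
	}
	switch {
	case wall > c.last.Timestamp && wall > remote.Timestamp:
		c.last.Timestamp, c.last.Counter = wall, 0
	case remote.Timestamp > c.last.Timestamp:
		c.last.Timestamp, c.last.Counter = remote.Timestamp, remote.Counter+1
	case remote.Timestamp == c.last.Timestamp:
		if remote.Counter > c.last.Counter {
			c.last.Counter = remote.Counter
		}
		c.last.Counter++
	default: // the local timestamp is already the largest
		c.last.Counter++
	}
	c.last.NodeID = c.nodeID
	return true
}

// demoClock returns a clock whose wall time is frozen at a fixed instant.
func demoClock() *sketchClock {
	fixed := time.Unix(1000, 0)
	return newSketchClock("node-a", 5*time.Second, func() time.Time { return fixed })
}

func main() {
	c := demoClock()
	fmt.Println(c.Now(), c.Now())
}
```

With the wall clock frozen, two consecutive `Now` calls differ only in `Counter`, which is how a hybrid clock keeps events ordered when the physical clock does not advance.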
| Type/Function | Signature | Description |
|---|---|---|
| `LWWRegister` | struct | Value + Clock + NodeID |
| `MergeLWW` | `(local, remote *LWWRegister) *LWWRegister` | Higher HLC wins |
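
The "higher HLC wins" rule can be sketched without the package as follows. The `lwwRegister` and `mergeLWW` names are local stand-ins, the `string` value type is a simplification, and the nil handling and clock ordering are assumptions.

```go
package main

import "fmt"

// HLC is a local stand-in with assumed field types.
type HLC struct {
	Timestamp int64
	Counter   uint32
	NodeID    string
}

// after orders by Timestamp, then Counter, then NodeID (assumed precedence).
func (h HLC) after(o HLC) bool {
	if h.Timestamp != o.Timestamp {
		return h.Timestamp > o.Timestamp
	}
	if h.Counter != o.Counter {
		return h.Counter > o.Counter
	}
	return h.NodeID > o.NodeID
}

// lwwRegister mirrors the documented shape (Value + Clock + NodeID);
// the string value type is a simplification.
type lwwRegister struct {
	Value  string
	Clock  HLC
	NodeID string
}

// mergeLWW returns whichever register carries the later clock.
// Treating nil as "no value yet" is an assumption of this sketch.
func mergeLWW(local, remote *lwwRegister) *lwwRegister {
	switch {
	case local == nil:
		return remote
	case remote == nil:
		return local
	case remote.Clock.after(local.Clock):
		return remote
	default:
		return local
	}
}

func main() {
	a := &lwwRegister{Value: "draft", Clock: HLC{Timestamp: 100, Counter: 0, NodeID: "a"}, NodeID: "a"}
	b := &lwwRegister{Value: "final", Clock: HLC{Timestamp: 100, Counter: 1, NodeID: "b"}, NodeID: "b"}
	fmt.Println(mergeLWW(a, b).Value, mergeLWW(b, a).Value)
}
```

The merge gives the same winner regardless of argument order, which is the property that lets replicas converge after exchanging changes in any sequence.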
| Type/Function | Signature | Description |
|---|---|---|
| `PNCounterState` | struct | `Increments` and `Decrements`, each `map[string]int64` |
| `NewPNCounterState` | `() *PNCounterState` | Create empty state |
| `MergeCounter` | `(local, remote *PNCounterState) *PNCounterState` | Per-node max |
| `Value` | `(s *PNCounterState) int64` | `sum(inc) - sum(dec)` |
| `CounterFromFieldState` | `(fs *FieldState) *PNCounterState` | Extract from `FieldState` |
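
A PN-counter merge by per-node maximum, with the value computed as `sum(inc) - sum(dec)`, can be sketched as below. The lowercase names are local stand-ins that follow the documented struct layout; keying the maps by node ID is an assumption.

```go
package main

import "fmt"

// pnCounter follows the documented layout: per-node increment and
// decrement totals (map keys are assumed to be node IDs).
type pnCounter struct {
	Increments map[string]int64
	Decrements map[string]int64
}

func newPNCounter() *pnCounter {
	return &pnCounter{Increments: map[string]int64{}, Decrements: map[string]int64{}}
}

// maxPerNode keeps, for every node, the larger of the two totals.
func maxPerNode(a, b map[string]int64) map[string]int64 {
	out := make(map[string]int64, len(a)+len(b))
	for node, v := range a {
		out[node] = v
	}
	for node, v := range b {
		if v > out[node] {
			out[node] = v
		}
	}
	return out
}

// mergeCounter merges both maps independently and returns a new state.
func mergeCounter(local, remote *pnCounter) *pnCounter {
	return &pnCounter{
		Increments: maxPerNode(local.Increments, remote.Increments),
		Decrements: maxPerNode(local.Decrements, remote.Decrements),
	}
}

// value computes sum(inc) - sum(dec).
func value(s *pnCounter) int64 {
	var total int64
	for _, v := range s.Increments {
		total += v
	}
	for _, v := range s.Decrements {
		total -= v
	}
	return total
}

func main() {
	local := newPNCounter()
	local.Increments["a"] = 3
	local.Decrements["a"] = 1

	remote := newPNCounter()
	remote.Increments["a"] = 2
	remote.Increments["b"] = 5

	fmt.Println(value(mergeCounter(local, remote)))
}
```

Taking the maximum rather than the sum is what makes the merge idempotent: merging the same remote state twice does not double-count its increments.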
| Type/Function | Signature | Description |
|---|---|---|
| `ORSetState` | struct | `Entries map[string][]Tag`, `Removed map[string]bool` |
| `Tag` | struct | `{NodeID, HLC}` |
| `NewORSetState` | `() *ORSetState` | Create empty state |
| `MergeSet` | `(local, remote *ORSetState) *ORSetState` | Add-wins union |
| `Elements` | `(s *ORSetState) []json.RawMessage` | Active elements |
| `SetFromFieldState` | `(fs *FieldState) *ORSetState` | Extract from `FieldState` |
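
The add-wins behaviour of an observed-remove set is easiest to see in a small example. The sketch below uses a deliberately simplified layout (string tags and a tombstone set keyed by tag) that differs from the documented `ORSetState`; every name in it is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// orSet is a simplified observed-remove set. The layout (plain string tags,
// tombstones keyed by tag) is an assumption made for illustration.
type orSet struct {
	entries map[string]map[string]bool // element -> unique add tags
	removed map[string]bool            // tags whose add has been removed
}

func newORSet() *orSet {
	return &orSet{entries: map[string]map[string]bool{}, removed: map[string]bool{}}
}

// add records a new, globally unique tag for elem.
func (s *orSet) add(elem, tag string) {
	if s.entries[elem] == nil {
		s.entries[elem] = map[string]bool{}
	}
	s.entries[elem][tag] = true
}

// remove tombstones only the tags this replica has observed for elem.
func (s *orSet) remove(elem string) {
	for tag := range s.entries[elem] {
		s.removed[tag] = true
	}
}

// mergeSet unions the add tags and the tombstones of both replicas.
func mergeSet(a, b *orSet) *orSet {
	out := newORSet()
	for _, s := range []*orSet{a, b} {
		for elem, tags := range s.entries {
			for tag := range tags {
				out.add(elem, tag)
			}
		}
		for tag := range s.removed {
			out.removed[tag] = true
		}
	}
	return out
}

// elements returns, sorted, every element with at least one live tag.
func (s *orSet) elements() []string {
	var out []string
	for elem, tags := range s.entries {
		for tag := range tags {
			if !s.removed[tag] {
				out = append(out, elem)
				break
			}
		}
	}
	sort.Strings(out)
	return out
}

// demoAddWins removes "x" and "y" on one replica while the other re-adds "x".
func demoAddWins() []string {
	a := newORSet()
	a.add("x", "a:1")
	a.add("y", "a:2")
	b := mergeSet(a, newORSet()) // b has observed both adds
	b.remove("x")
	b.remove("y")
	a.add("x", "a:3") // concurrent re-add that b never observed
	return mergeSet(a, b).elements()
}

func main() {
	fmt.Println(demoAddWins())
}
```

In this scenario `y` stays removed because every tag it had was observed by the remover, while `x` survives because the concurrent add carries a tag the remove never saw.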
| Function | Signature | Description |
|---|---|---|
| `NewMergeEngine` | `() *MergeEngine` | Create a new merge engine |
| `MergeField` | `(local, remote *FieldState) (*FieldState, error)` | Dispatch by CRDT type |
| `MergeState` | `(local, remote *State) (*State, error)` | Merge all fields of a record |

| Function | Signature | Description |
|---|---|---|
| `New` | `(opts ...Option) *Plugin` | Create the CRDT plugin |
| `Name` | `() string` | Returns `"crdt"` |
| `Init` | `(ctx context.Context, db any) error` | Initialize the plugin |
| `Clock` | `() Clock` | Get the plugin's clock |
| `NodeID` | `() string` | Get the node identifier |
| `MergeEngine` | `() *MergeEngine` | Get the merge engine |
| `SyncHooks` | `() *SyncHookChain` | Get the sync hook chain |
| `SetExecutor` | `(exec Executor)` | Set the database executor |
| `MetadataStore` | `() *MetadataStore` | Get the metadata store |
| `Inspect` | `(ctx, table, pk) (*State, error)` | Debug: read full CRDT state |
| `CleanupTombstones` | `(ctx, table) error` | Remove expired tombstones |
| `EnsureShadowTable` | `(ctx, table) error` | Create shadow table DDL |

| Option | Signature | Description |
|---|---|---|
| `WithNodeID` | `(id string) Option` | Set node identifier (required) |
| `WithClock` | `(c Clock) Option` | Override clock implementation |
| `WithTombstoneTTL` | `(d time.Duration) Option` | Tombstone expiry (default: 7 days) |
| `WithMaxClockDrift` | `(d time.Duration) Option` | Clock drift limit (default: 5s) |
| `WithTables` | `(tables ...string) Option` | Restrict to specific tables |
| `WithSyncHook` | `(hook SyncHook) Option` | Add a sync hook |

| Function | Signature | Description |
|---|---|---|
| `NewSyncer` | `(plugin *Plugin, opts ...SyncerOption) *Syncer` | Create a syncer |
| `Sync` | `(ctx) (*SyncReport, error)` | One sync round with all peers |
| `Run` | `(ctx) error` | Background sync loop (blocks) |
| `PushChange` | `(ctx, table, pk, field, crdtType, value, clock) error` | Push single change |
| `StreamSync` | `(ctx) error` | SSE stream sync (blocks) |

| Option | Signature | Description |
|---|---|---|
| `WithTransport` | `(t Transport) SyncerOption` | Set primary transport |
| `WithPeers` | `(peers ...Transport) SyncerOption` | Add multiple peer transports |
| `WithSyncInterval` | `(d time.Duration) SyncerOption` | Sync interval (default: 30s) |
| `WithSyncTables` | `(tables ...string) SyncerOption` | Tables to sync |
| `WithGossipInterval` | `(d time.Duration) SyncerOption` | P2P gossip interval |
```go
type Transport interface {
	Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
	Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}
```
| Function | Signature | Description |
|---|---|---|
| `HTTPTransport` | `(baseURL string) *HTTPClient` | Create HTTP transport |
| `HTTPTransportWithClient` | `(baseURL string, client *http.Client) *HTTPClient` | With custom HTTP client |
| `Pull` | `(ctx, req) (*PullResponse, error)` | `POST /pull` |
| `Push` | `(ctx, req) (*PushResponse, error)` | `POST /push` |

| Function | Signature | Description |
|---|---|---|
| `NewStreamingTransport` | `(baseURL string, opts ...StreamingOption) *StreamingTransport` | Create streaming transport |
| `StreamChanges` | `(ctx, since HLC, handler func(ChangeRecord)) error` | SSE stream (blocks) |

| Option | Signature | Description |
|---|---|---|
| `WithStreamTables` | `(tables ...string) StreamingOption` | Table filter for SSE |
| `WithStreamReconnect` | `(d time.Duration) StreamingOption` | Reconnect delay (default: 5s) |
| `WithStreamLogger` | `(l *slog.Logger) StreamingOption` | Logger for stream events |

| Type | Description |
|---|---|
| `PullRequest` | `{Tables, Since, NodeID}` |
| `PullResponse` | `{Changes, LatestHLC}` |
| `PushRequest` | `{Changes, NodeID}` |
| `PushResponse` | `{Merged, LatestHLC}` |
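
Since `Transport` is a two-method interface, a custom or test transport is small to write. The sketch below is an in-memory implementation against local copies of the request and response types; the field types (for example `Merged int`), the reduced `ChangeRecord` and `HLC`, and the `memTransport` name are assumptions, and the type is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
)

// HLC and ChangeRecord are reduced local stand-ins.
type HLC struct {
	Timestamp int64
	Counter   uint32
}

func (h HLC) after(o HLC) bool {
	if h.Timestamp != o.Timestamp {
		return h.Timestamp > o.Timestamp
	}
	return h.Counter > o.Counter
}

type ChangeRecord struct {
	Table, PK, Field string
	HLC              HLC
}

// The field names follow this reference; the field types are assumptions.
type PullRequest struct {
	Tables []string
	Since  HLC
	NodeID string
}
type PullResponse struct {
	Changes   []ChangeRecord
	LatestHLC HLC
}
type PushRequest struct {
	Changes []ChangeRecord
	NodeID  string
}
type PushResponse struct {
	Merged    int
	LatestHLC HLC
}

type Transport interface {
	Pull(ctx context.Context, req *PullRequest) (*PullResponse, error)
	Push(ctx context.Context, req *PushRequest) (*PushResponse, error)
}

// memTransport keeps pushed changes in a slice (hypothetical test double).
type memTransport struct {
	log    []ChangeRecord
	latest HLC
}

var _ Transport = (*memTransport)(nil)

// Push appends every change and tracks the highest clock seen.
func (m *memTransport) Push(_ context.Context, req *PushRequest) (*PushResponse, error) {
	for _, c := range req.Changes {
		m.log = append(m.log, c)
		if c.HLC.after(m.latest) {
			m.latest = c.HLC
		}
	}
	return &PushResponse{Merged: len(req.Changes), LatestHLC: m.latest}, nil
}

// Pull returns changes newer than Since, optionally filtered by table.
func (m *memTransport) Pull(_ context.Context, req *PullRequest) (*PullResponse, error) {
	want := map[string]bool{}
	for _, t := range req.Tables {
		want[t] = true
	}
	resp := &PullResponse{LatestHLC: m.latest}
	for _, c := range m.log {
		if (len(want) == 0 || want[c.Table]) && c.HLC.after(req.Since) {
			resp.Changes = append(resp.Changes, c)
		}
	}
	return resp, nil
}

// runDemo pushes two changes, then pulls everything after the first one.
func runDemo() (merged, pulled int, err error) {
	ctx := context.Background()
	var t Transport = &memTransport{}
	push, err := t.Push(ctx, &PushRequest{NodeID: "node-a", Changes: []ChangeRecord{
		{Table: "users", PK: "1", Field: "name", HLC: HLC{Timestamp: 100}},
		{Table: "users", PK: "1", Field: "email", HLC: HLC{Timestamp: 200}},
	}})
	if err != nil {
		return 0, 0, err
	}
	pull, err := t.Pull(ctx, &PullRequest{Tables: []string{"users"}, Since: HLC{Timestamp: 100}, NodeID: "node-b"})
	if err != nil {
		return 0, 0, err
	}
	return push.Merged, len(pull.Changes), nil
}

func main() {
	fmt.Println(runDemo())
}
```

A double like this is mainly useful in tests, where a syncer can be exercised without a network.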
| Function | Signature | Description |
|---|---|---|
| `NewSyncController` | `(plugin, opts...) *SyncController` | Create controller |
| `HandlePull` | `(ctx, *PullRequest) (*PullResponse, error)` | Core pull logic |
| `HandlePush` | `(ctx, *PushRequest) (*PushResponse, error)` | Core push logic |
| `StreamChangesSince` | `(ctx, tables, since) (<-chan []ChangeRecord, error)` | Change channel |
| `HandlePresenceUpdate` | `(ctx, *PresenceUpdate) (*PresenceEvent, error)` | Process a presence update |
| `HandleGetPresence` | `(ctx, topic string) (*PresenceSnapshot, error)` | Get presence snapshot for a topic |
| `Presence` | `() *PresenceManager` | Access the presence manager (nil if disabled) |
| `PresenceChannel` | `() <-chan PresenceEvent` | Broadcast channel for stream integration |
| `Close` | `()` | Clean up controller resources |

| Option | Signature | Description |
|---|---|---|
| `WithStreamPollInterval` | `(d time.Duration) SyncControllerOption` | Poll interval (default: 1s) |
| `WithStreamKeepAlive` | `(d time.Duration) SyncControllerOption` | Keep-alive interval (default: 15s) |
| `WithControllerSyncHook` | `(hook SyncHook) SyncControllerOption` | Additional controller hook |
| `WithPresenceEnabled` | `(bool) SyncControllerOption` | Enable presence subsystem (default: false) |
| `WithPresenceTTL` | `(d time.Duration) SyncControllerOption` | Presence entry TTL (default: 30s) |

| Function | Signature | Description |
|---|---|---|
| `NewHTTPHandler` | `(plugin, opts...) http.Handler` | Standard `http.Handler` for sync |

Endpoints: `POST /pull` and `POST /push`; when presence is enabled, also `POST /presence` and `GET /presence`.
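
To show the shape of the `POST /pull` route from a caller's side, the sketch below serves a stub handler through `net/http/httptest` and posts a JSON body to it. The stub is not `NewHTTPHandler`: the JSON field names (`tables`, `node_id`, `changes`), the status codes for bad input, and the always-empty response are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// pullRequest and pullResponse use assumed JSON field names.
type pullRequest struct {
	Tables []string `json:"tables"`
	NodeID string   `json:"node_id"`
}

type pullResponse struct {
	Changes []string `json:"changes"` // placeholder for change records
}

// stubSyncHandler is a hypothetical stand-in that only routes /pull.
func stubSyncHandler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/pull", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req pullRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		// The stub always answers with an empty change list.
		_ = json.NewEncoder(w).Encode(pullResponse{Changes: []string{}})
	})
	return mux
}

// call sends one in-process request and returns the status and trimmed body.
func call(method, path, body string) (int, string) {
	req := httptest.NewRequest(method, path, strings.NewReader(body))
	rec := httptest.NewRecorder()
	stubSyncHandler().ServeHTTP(rec, req)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	code, body := call(http.MethodPost, "/pull", `{"tables":["users"],"node_id":"node-b"}`)
	fmt.Println(code, body)
}
```

The same `httptest` approach works against any `http.Handler`, so it can also be pointed at a real sync handler in an integration test.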
```go
type SyncHook interface {
	BeforeInboundChange(context.Context, *ChangeRecord) (*ChangeRecord, error)
	AfterInboundChange(context.Context, *ChangeRecord) error
	BeforeOutboundChange(context.Context, *ChangeRecord) (*ChangeRecord, error)
	BeforeOutboundRead(context.Context, []ChangeRecord) ([]ChangeRecord, error)
}
```
| Type | Description |
|---|---|
| `SyncHook` | Interface for intercepting sync data |
| `BaseSyncHook` | No-op default (embed and override) |
| `SyncHookChain` | Composes multiple hooks sequentially |

| Function | Signature | Description |
|---|---|---|
| `NewSyncHookChain` | `(hooks ...SyncHook) *SyncHookChain` | Create a chain |
| `Add` | `(hook SyncHook)` | Append a hook |
| `Len` | `() int` | Number of hooks |
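
The "embed and override" pattern and sequential composition can be sketched with local types. Everything below is a stand-in: the reduced `ChangeRecord`, the `redactEmail` hook, and the `hookChain` behaviour (each hook receives the previous hook's output, and the first error stops the chain) are assumptions rather than the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// ChangeRecord is reduced to the fields this sketch needs.
type ChangeRecord struct {
	Table string
	Field string
	Value string // simplified; the documented field is json.RawMessage
}

type SyncHook interface {
	BeforeInboundChange(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error)
	AfterInboundChange(ctx context.Context, c *ChangeRecord) error
	BeforeOutboundChange(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error)
	BeforeOutboundRead(ctx context.Context, cs []ChangeRecord) ([]ChangeRecord, error)
}

// BaseSyncHook passes everything through unchanged.
type BaseSyncHook struct{}

func (BaseSyncHook) BeforeInboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	return c, nil
}
func (BaseSyncHook) AfterInboundChange(context.Context, *ChangeRecord) error { return nil }
func (BaseSyncHook) BeforeOutboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	return c, nil
}
func (BaseSyncHook) BeforeOutboundRead(_ context.Context, cs []ChangeRecord) ([]ChangeRecord, error) {
	return cs, nil
}

// redactEmail embeds the no-op base and overrides a single method.
type redactEmail struct{ BaseSyncHook }

func (redactEmail) BeforeOutboundChange(_ context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	if c.Field != "email" {
		return c, nil
	}
	out := *c // copy so the caller's record is left untouched
	out.Value = "[redacted]"
	return &out, nil
}

// hookChain is a hypothetical chain covering only the outbound-change step.
type hookChain struct{ hooks []SyncHook }

func (h *hookChain) Add(hook SyncHook) { h.hooks = append(h.hooks, hook) }
func (h *hookChain) Len() int          { return len(h.hooks) }

// BeforeOutboundChange feeds each hook the previous hook's result.
func (h *hookChain) BeforeOutboundChange(ctx context.Context, c *ChangeRecord) (*ChangeRecord, error) {
	var err error
	for _, hook := range h.hooks {
		if c, err = hook.BeforeOutboundChange(ctx, c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

// runChain sends one email change through a two-hook chain.
func runChain() (string, int, error) {
	chain := &hookChain{}
	chain.Add(BaseSyncHook{})
	chain.Add(redactEmail{})
	in := &ChangeRecord{Table: "users", Field: "email", Value: "ada@example.com"}
	out, err := chain.BeforeOutboundChange(context.Background(), in)
	if err != nil {
		return "", 0, err
	}
	return out.Value, chain.Len(), nil
}

func main() {
	fmt.Println(runChain())
}
```

Embedding the no-op base means a hook only has to implement the interception points it cares about and still satisfies the full interface.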
| Type | Description |
|---|---|
| `InspectResult` | Human-readable record state |
| `InspectField` | Human-readable field state |

| Function | Signature | Description |
|---|---|---|
| `InspectState` | `(state *State) *InspectResult` | Convert to human-readable form |
| `String` | `() string` | Formatted output |

Ephemeral awareness system for tracking connected clients. See Presence & Awareness for the full guide.

| Type | Description |
|---|---|
| `PresenceState` | A client's presence data: `{NodeID, Topic, Data, UpdatedAt, ExpiresAt}` |
| `PresenceUpdate` | Request body for updates: `{NodeID, Topic, Data}` |
| `PresenceEvent` | SSE broadcast event: `{Type, NodeID, Topic, Data}` |
| `PresenceSnapshot` | GET response: `{Topic, States}` |

| Constant | Value | Description |
|---|---|---|
| `PresenceJoin` | `"join"` | New node joined a topic |
| `PresenceUpdateEvt` | `"update"` | Existing node updated its data |
| `PresenceLeave` | `"leave"` | Node left or entry expired |

| Function | Signature | Description |
|---|---|---|
| `NewPresenceManager` | `(ttl time.Duration, onChange func(PresenceEvent), logger *slog.Logger) *PresenceManager` | Create with TTL, callback, and logger; starts cleanup goroutine |
| `Update` | `(update PresenceUpdate) PresenceEvent` | Upsert presence, returns join/update event |
| `Remove` | `(topic, nodeID string) *PresenceEvent` | Explicitly remove an entry |
| `RemoveNode` | `(nodeID string) []PresenceEvent` | Remove all entries for a node (on disconnect) |
| `Get` | `(topic string) []PresenceState` | Get all active presence for a topic |
| `GetTopicsForNode` | `(nodeID string) []string` | Get all topics a node is present in |
| `Close` | `()` | Stop the cleanup goroutine |
| `MarshalPresenceEvent` | `(event PresenceEvent) ([]byte, error)` | Serialize event to JSON for SSE |
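
The join/update/leave lifecycle with TTL expiry can be sketched without goroutines by passing the current time explicitly. The `presenceTable` type, its `sweep` method, and the rule that an update arriving after expiry counts as a fresh join are assumptions; the documented manager instead runs its cleanup in a background goroutine.

```go
package main

import (
	"fmt"
	"time"
)

// presenceEvent carries a subset of the documented event fields.
type presenceEvent struct {
	Type   string // "join", "update", or "leave"
	NodeID string
	Topic  string
}

type presenceKey struct{ topic, nodeID string }

// presenceTable is a hypothetical, single-goroutine presence store.
type presenceTable struct {
	ttl     time.Duration
	expires map[presenceKey]time.Time
}

func newPresenceTable(ttl time.Duration) *presenceTable {
	return &presenceTable{ttl: ttl, expires: map[presenceKey]time.Time{}}
}

// update upserts an entry and refreshes its expiry. A missing or already
// expired entry produces "join"; a live one produces "update".
func (p *presenceTable) update(topic, nodeID string, now time.Time) presenceEvent {
	key := presenceKey{topic, nodeID}
	exp, known := p.expires[key]
	p.expires[key] = now.Add(p.ttl)
	typ := "update"
	if !known || !now.Before(exp) {
		typ = "join"
	}
	return presenceEvent{Type: typ, NodeID: nodeID, Topic: topic}
}

// sweep deletes expired entries and reports a "leave" event for each.
func (p *presenceTable) sweep(now time.Time) []presenceEvent {
	var events []presenceEvent
	for key, exp := range p.expires {
		if !now.Before(exp) {
			delete(p.expires, key)
			events = append(events, presenceEvent{Type: "leave", NodeID: key.nodeID, Topic: key.topic})
		}
	}
	return events
}

// demoPresence replays a short timeline with a 30s TTL and collects the
// event types in order.
func demoPresence() []string {
	t0 := time.Unix(0, 0)
	p := newPresenceTable(30 * time.Second)
	var types []string
	types = append(types, p.update("doc:1", "node-a", t0).Type)
	types = append(types, p.update("doc:1", "node-a", t0.Add(10*time.Second)).Type)
	for _, ev := range p.sweep(t0.Add(20 * time.Second)) {
		types = append(types, ev.Type)
	}
	for _, ev := range p.sweep(t0.Add(41 * time.Second)) {
		types = append(types, ev.Type)
	}
	return types
}

func main() {
	fmt.Println(demoPresence())
}
```

Each update pushes the expiry forward by the TTL, so a client stays present only as long as it keeps sending updates more often than the TTL.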
| Function | Signature | Description |
|---|---|---|
| `ShadowTableDDL` | `(table string) string` | DDL for shadow table creation |
| `ShadowTableSyncIndex` | `(table string) string` | DDL for sync index |
| `Migrations` | `grove.MigrationGroup` | Pre-built migration group |