CRDT: Offline-First App
Build an offline-first Go application with Grove CRDTs — local operations, queue-and-sync, error handling, and tombstone cleanup.
This guide walks through building an offline-first application with Grove CRDTs. The app makes local changes while offline and syncs them when connectivity returns.
Overview
Offline-first with CRDTs works because:
- Each node has a complete local copy of the data
- Mutations are recorded with HLC timestamps in shadow tables
- When online, the syncer pulls remote changes and pushes local ones
- The merge engine resolves all conflicts automatically
Define Models
package models
import "github.com/xraph/grove"
// Task is an offline-editable to-do item. Each field's `crdt:` tag selects
// the merge strategy applied when concurrent replica edits are synced.
type Task struct {
	grove.BaseModel `grove:"table:tasks,alias:t"`

	// ID is the primary key, shared across all replicas.
	ID string `grove:"id,pk"`
	// Last-writer-wins: the edit with the highest HLC timestamp survives.
	Title string `grove:"title,crdt:lww"`
	Description string `grove:"description,crdt:lww"`
	// Per-node counter: concurrent increments sum instead of overwriting.
	Priority int64 `grove:"priority,crdt:counter"`
	// Set CRDT: concurrent adds merge by union (add-wins on add/remove races).
	Tags []string `grove:"tags,type:jsonb,crdt:set"`
	Completed bool `grove:"completed,crdt:lww"`
}

Set Up the CRDT Plugin
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/xraph/grove"
	"github.com/xraph/grove/crdt"
	"github.com/xraph/grove/drivers/sqlitedriver"
	"github.com/xraph/grove/hook"
)
func main() {
ctx := context.Background()
// Use SQLite for local storage (works offline).
driver := sqlitedriver.New()
if err := driver.Open(ctx, "file:local.db"); err != nil {
log.Fatal(err)
}
db, err := grove.Open(driver)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Create the CRDT plugin.
plugin := crdt.New(
crdt.WithNodeID("edge-device-42"),
crdt.WithTables("tasks"),
crdt.WithTombstoneTTL(14 * 24 * time.Hour), // Keep tombstones 14 days
)
// Register as a Grove hook.
db.Hooks().AddHook(plugin, hook.Scope{
Tables: []string{"tasks"},
})
// Ensure shadow table exists.
if err := plugin.EnsureShadowTable(ctx, "tasks"); err != nil {
log.Fatal(err)
}
// Start the app.
app := NewApp(db, plugin)
app.Run(ctx)
}Local Operations (Offline)
All mutations work locally, even without network access. The CRDT plugin records changes in the shadow table via hooks:
// App bundles the local database, the CRDT plugin, and the syncer.
type App struct {
	db *grove.DB
	plugin *crdt.Plugin
	syncer *crdt.Syncer // set lazily by SetupSync; nil until then
}

// NewApp returns an App bound to db and plugin. Call SetupSync before Run
// to enable syncing.
func NewApp(db *grove.DB, plugin *crdt.Plugin) *App {
	return &App{db: db, plugin: plugin}
}
// CreateTask inserts a task into the local store. This works fully
// offline: the CRDT plugin's PostInsert hook records the change in the
// shadow table for later sync.
func (a *App) CreateTask(ctx context.Context, task *Task) error {
	if _, err := a.db.Insert(ctx, task); err != nil {
		return err
	}
	return nil
}
// UpdateTitle sets the task's title. The PostUpdate hook records this as
// a last-writer-wins change with the local HLC timestamp.
func (a *App) UpdateTitle(ctx context.Context, id, title string) error {
	patch := &Task{ID: id, Title: title}
	if _, err := a.db.Update(ctx, patch); err != nil {
		return err
	}
	return nil
}
// IncrementPriority bumps the task's priority counter. Counter state is
// tracked per node, so concurrent increments on different replicas sum
// after sync.
// NOTE(review): this passes Priority: 1 to Update — presumably the
// counter CRDT treats it as a +1 delta rather than an absolute value;
// confirm against the plugin's counter semantics.
func (a *App) IncrementPriority(ctx context.Context, id string) error {
	delta := &Task{ID: id, Priority: 1}
	if _, err := a.db.Update(ctx, delta); err != nil {
		return err
	}
	return nil
}
// AddTag appends tag to the task's tag set. The set CRDT merges by
// union, so concurrent adds on different replicas are all preserved.
func (a *App) AddTag(ctx context.Context, id string, tag string) error {
	// Load the current tags before appending the new one.
	task := &Task{ID: id}
	if err := a.db.Select(ctx, task); err != nil {
		return err
	}

	task.Tags = append(task.Tags, tag)
	if _, err := a.db.Update(ctx, task); err != nil {
		return err
	}
	return nil
}
func (a *App) DeleteTask(ctx context.Context, id string) error {
// Deletion creates a tombstone that propagates during sync.
_, err := a.db.Delete(ctx, &Task{ID: id})
return err
}Sync When Online
Set up the syncer to connect when the network is available:
// SetupSync configures the syncer against the given cloud endpoint. It
// only wires things up; Run, StreamSync, or Sync starts actual traffic.
func (a *App) SetupSync(cloudURL string) {
	const (
		reconnectDelay = 10 * time.Second
		pollInterval   = 60 * time.Second // Poll every minute
	)

	transport := crdt.NewStreamingTransport(
		cloudURL,
		crdt.WithStreamTables("tasks"),
		crdt.WithStreamReconnect(reconnectDelay),
	)
	a.syncer = crdt.NewSyncer(a.plugin,
		crdt.WithTransport(transport),
		crdt.WithSyncTables("tasks"),
		crdt.WithSyncInterval(pollInterval),
	)
}
func (a *App) Run(ctx context.Context) {
// Set up sync with cloud server.
a.SetupSync("https://cloud.example.com/sync")
// Run periodic sync in the background.
go a.syncer.Run(ctx)
// Also run SSE streaming for real-time updates.
go a.syncer.StreamSync(ctx)
// Your app logic here...
select {
case <-ctx.Done():
log.Println("Shutting down")
}
}Manual Sync with Error Handling
For more control, trigger sync manually with retry logic:
func (a *App) SyncWithRetry(ctx context.Context, maxRetries int) error {
var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
report, err := a.syncer.Sync(ctx)
if err == nil {
log.Printf("Sync complete: %s", report)
return nil
}
lastErr = err
log.Printf("Sync attempt %d/%d failed: %v", attempt+1, maxRetries, err)
// Exponential backoff.
backoff := time.Duration(1<<uint(attempt)) * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
continue
}
}
return fmt.Errorf("sync failed after %d retries: %w", maxRetries, lastErr)
}Inspecting CRDT State
Debug the local CRDT state for any record:
func (a *App) DebugTask(ctx context.Context, id string) {
state, err := a.plugin.Inspect(ctx, "tasks", id)
if err != nil {
log.Printf("Inspect error: %v", err)
return
}
result := crdt.InspectState(state)
fmt.Println(result)
// CRDT State for tasks[task-1]
// title (lww): "Buy groceries" (node=edge-device-42, hlc=HLC{...})
// priority (counter): 3 [edge-device-42: +3 -0]
// tags (set): ["shopping", "urgent"]
// completed (lww): false (node=edge-device-42, hlc=HLC{...})
}Tombstone Cleanup
Tombstones mark deleted records for sync propagation. Clean them up periodically:
func (a *App) CleanupLoop(ctx context.Context) {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := a.plugin.CleanupTombstones(ctx, "tasks"); err != nil {
log.Printf("Tombstone cleanup error: %v", err)
}
}
}
}Conflict Resolution in Practice
CRDTs resolve conflicts automatically, but it helps to understand the behavior:
// Scenario: Two offline nodes edit the same task title.
//
// Node A (offline, ts=100): title = "Buy groceries at Costco"
// Node B (offline, ts=105): title = "Buy groceries at Trader Joe's"
//
// After both come online and sync:
// Result: title = "Buy groceries at Trader Joe's" (Node B wins, higher HLC)
// Scenario: Two offline nodes add tags concurrently.
//
// Node A (offline): adds "urgent"
// Node B (offline): adds "shopping"
//
// After sync: tags = ["urgent", "shopping"] (both preserved, set union)
// Scenario: One node adds a tag, another removes it concurrently.
//
// Node A (offline): removes "urgent"
// Node B (offline): adds "urgent" (new add operation)
//
// After sync: tags still contain "urgent" (add-wins semantics)

Complete Example
Put it all together:
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Set up local DB + CRDT plugin (see above).
	db, plugin := setupDB(ctx)
	defer db.Close()

	app := NewApp(db, plugin)
	app.SetupSync("https://cloud.example.com/sync")

	// Background sync.
	go app.syncer.Run(ctx)
	go app.syncer.StreamSync(ctx)
	go app.CleanupLoop(ctx)

	// Create tasks offline. Check the error — silently dropping it would
	// hide local-storage failures.
	task := &Task{
		ID:    "task-1",
		Title: "Buy groceries",
		Tags:  []string{"shopping"},
	}
	if err := app.CreateTask(ctx, task); err != nil {
		log.Fatalf("create task: %v", err)
	}

	// Increment priority twice.
	for i := 0; i < 2; i++ {
		if err := app.IncrementPriority(ctx, "task-1"); err != nil {
			log.Printf("increment priority: %v", err)
		}
	}

	// Manual sync when ready.
	if err := app.SyncWithRetry(ctx, 3); err != nil {
		log.Printf("Sync failed: %v", err)
	}

	// Debug state.
	app.DebugTask(ctx, "task-1")
}