Grove

CRDT: Offline-First App

Build an offline-first Go application with Grove CRDTs — local operations, queue-and-sync, error handling, and tombstone cleanup.

This guide walks through building an offline-first application with Grove CRDTs. The app makes local changes while offline and syncs them when connectivity returns.

Overview

Offline-first with CRDTs works because:

  1. Each node has a complete local copy of the data
  2. Mutations are recorded with HLC timestamps in shadow tables
  3. When online, the syncer pulls remote changes and pushes local ones
  4. The merge engine resolves all conflicts automatically

Define Models

package models

import "github.com/xraph/grove"

// Task is the model stored in the "tasks" table (alias "t"). Every column
// other than the primary key carries a crdt tag naming the merge strategy
// for that field.
type Task struct {
    grove.BaseModel `grove:"table:tasks,alias:t"`
    // ID is the primary key; it has no crdt tag.
    ID          string   `grove:"id,pk"`
    // Title is tagged crdt:lww (last-writer-wins).
    Title       string   `grove:"title,crdt:lww"`
    // Description is tagged crdt:lww (last-writer-wins).
    Description string   `grove:"description,crdt:lww"`
    // Priority is tagged crdt:counter.
    Priority    int64    `grove:"priority,crdt:counter"`
    // Tags is stored as jsonb and tagged crdt:set.
    Tags        []string `grove:"tags,type:jsonb,crdt:set"`
    // Completed is tagged crdt:lww (last-writer-wins).
    Completed   bool     `grove:"completed,crdt:lww"`
}

Set Up the CRDT Plugin

package main

import (
    "context"
    "log"
    "time"

    "github.com/xraph/grove"
    "github.com/xraph/grove/crdt"
    "github.com/xraph/grove/hook"
    "github.com/xraph/grove/drivers/sqlitedriver"
)

// main opens the local SQLite database, installs the CRDT plugin as a hook
// on the "tasks" table, makes sure the shadow table exists, and then hands
// control to the App.
func main() {
    // NOTE: this context is never cancelled, so app.Run below blocks until
    // the process is terminated.
    ctx := context.Background()

    // Use SQLite for local storage (works offline).
    driver := sqlitedriver.New()
    if err := driver.Open(ctx, "file:local.db"); err != nil {
        log.Fatal(err)
    }

    db, err := grove.Open(driver)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Create the CRDT plugin.
    plugin := crdt.New(
        crdt.WithNodeID("edge-device-42"),
        crdt.WithTables("tasks"),
        crdt.WithTombstoneTTL(14 * 24 * time.Hour), // Keep tombstones 14 days
    )

    // Register as a Grove hook.
    db.Hooks().AddHook(plugin, hook.Scope{
        Tables: []string{"tasks"},
    })

    // Ensure shadow table exists.
    if err := plugin.EnsureShadowTable(ctx, "tasks"); err != nil {
        // log.Fatal calls os.Exit, which skips deferred functions, so the
        // database has to be closed by hand on this path.
        db.Close()
        log.Fatal(err)
    }

    // Start the app.
    app := NewApp(db, plugin)
    app.Run(ctx)
}

Local Operations (Offline)

All mutations work locally, even without network access. The CRDT plugin records changes in the shadow table via hooks:

// App bundles the handles the application works with: the local database,
// the CRDT plugin hooked into it, and the syncer that talks to the remote.
type App struct {
    // db is the handle every local mutation goes through.
    db     *grove.DB
    // plugin is the CRDT plugin used for inspection and tombstone cleanup.
    plugin *crdt.Plugin
    // syncer is not set by NewApp; SetupSync assigns it.
    syncer *crdt.Syncer
}

// NewApp wraps an opened database and its CRDT plugin in an App.
// The syncer field is left nil until SetupSync is called.
func NewApp(db *grove.DB, plugin *crdt.Plugin) *App {
    app := new(App)
    app.db = db
    app.plugin = plugin
    return app
}

// CreateTask inserts task into the local database and returns any insert
// error. No network access is involved; the CRDT plugin records the change
// in the shadow table via its PostInsert hook.
func (a *App) CreateTask(ctx context.Context, task *Task) error {
    if _, err := a.db.Insert(ctx, task); err != nil {
        return err
    }
    return nil
}

// UpdateTitle issues a local update for the task with the given id, carrying
// the new title. The PostUpdate hook records this as an LWW change.
//
// NOTE(review): only ID and Title are populated on the value passed to
// Update; presumably zero-valued fields are left untouched — confirm.
func (a *App) UpdateTitle(ctx context.Context, id, title string) error {
    patch := &Task{ID: id, Title: title}
    if _, err := a.db.Update(ctx, patch); err != nil {
        return err
    }
    return nil
}

// IncrementPriority issues a local update for the task with the given id,
// carrying Priority: 1. Counter operations are tracked per-node.
//
// NOTE(review): presumably the counter CRDT interprets the 1 as a delta
// rather than an absolute value — confirm against the plugin's semantics.
func (a *App) IncrementPriority(ctx context.Context, id string) error {
    bump := &Task{ID: id, Priority: 1}
    if _, err := a.db.Update(ctx, bump); err != nil {
        return err
    }
    return nil
}

// AddTag loads the task with the given id, appends tag to its Tags slice,
// and writes the task back. A failure of either the read or the write is
// returned unchanged. The tag is appended unconditionally, even if it is
// already present.
func (a *App) AddTag(ctx context.Context, id string, tag string) error {
    var current Task
    current.ID = id

    // Fetch the stored row so the existing tags are kept.
    if err := a.db.Select(ctx, &current); err != nil {
        return err
    }

    current.Tags = append(current.Tags, tag)

    if _, err := a.db.Update(ctx, &current); err != nil {
        return err
    }
    return nil
}

// DeleteTask deletes the task with the given id from the local database and
// returns any delete error. The deletion leaves a tombstone that propagates
// during sync.
func (a *App) DeleteTask(ctx context.Context, id string) error {
    target := &Task{ID: id}
    if _, err := a.db.Delete(ctx, target); err != nil {
        return err
    }
    return nil
}

Sync When Online

Set up the syncer to connect when the network is available:

// SetupSync builds a streaming transport pointed at cloudURL and stores a
// syncer using it in a.syncer. Both the transport and the syncer are scoped
// to the "tasks" table. Nothing is started here; callers launch the syncer
// themselves.
func (a *App) SetupSync(cloudURL string) {
    const (
        reconnectDelay = 10 * time.Second
        pollInterval   = 60 * time.Second // Poll every minute
    )

    a.syncer = crdt.NewSyncer(a.plugin,
        crdt.WithTransport(crdt.NewStreamingTransport(
            cloudURL,
            crdt.WithStreamTables("tasks"),
            crdt.WithStreamReconnect(reconnectDelay),
        )),
        crdt.WithSyncTables("tasks"),
        crdt.WithSyncInterval(pollInterval),
    )
}

// Run configures sync against the cloud endpoint, starts the periodic and
// streaming sync goroutines, and then blocks until ctx is cancelled.
// Both goroutines receive ctx.
func (a *App) Run(ctx context.Context) {
    // Set up sync with cloud server.
    a.SetupSync("https://cloud.example.com/sync")

    // Run periodic sync in the background.
    go a.syncer.Run(ctx)

    // Also run SSE streaming for real-time updates.
    go a.syncer.StreamSync(ctx)

    // Your app logic here...

    // A plain receive is enough to wait for cancellation; a select with a
    // single case adds nothing.
    <-ctx.Done()
    log.Println("Shutting down")
}

Manual Sync with Error Handling

For more control, trigger sync manually with retry logic:

// SyncWithRetry calls a.syncer.Sync up to maxRetries times, waiting between
// failed attempts with exponential backoff (1s, 2s, 4s, 8s, 16s, then capped
// at 30s).
//
// It returns nil as soon as one attempt succeeds, ctx.Err() if ctx is
// cancelled while waiting between attempts, and otherwise an error wrapping
// the last sync failure. A non-positive maxRetries is rejected with an error
// and no sync attempt is made.
func (a *App) SyncWithRetry(ctx context.Context, maxRetries int) error {
    if maxRetries <= 0 {
        // Without this guard the loop never runs and the final error would
        // wrap a nil cause.
        return fmt.Errorf("sync: maxRetries must be positive, got %d", maxRetries)
    }

    const (
        maxBackoff = 30 * time.Second
        // From this attempt on, 1<<attempt seconds already exceeds the cap.
        // Clamping on the attempt number instead of the computed duration
        // keeps the shift from overflowing when maxRetries is large.
        capFromAttempt = 5
    )

    var lastErr error

    for attempt := 0; attempt < maxRetries; attempt++ {
        report, err := a.syncer.Sync(ctx)
        if err == nil {
            log.Printf("Sync complete: %s", report)
            return nil
        }

        lastErr = err
        log.Printf("Sync attempt %d/%d failed: %v", attempt+1, maxRetries, err)

        // No point waiting after the final attempt; report the failure now.
        if attempt == maxRetries-1 {
            break
        }

        // Exponential backoff.
        backoff := maxBackoff
        if attempt < capFromAttempt {
            backoff = time.Second << uint(attempt)
        }

        timer := time.NewTimer(backoff)
        select {
        case <-ctx.Done():
            timer.Stop()
            return ctx.Err()
        case <-timer.C:
        }
    }

    return fmt.Errorf("sync failed after %d retries: %w", maxRetries, lastErr)
}

Inspecting CRDT State

Debug the local CRDT state for any record:

// DebugTask prints the local CRDT state of the "tasks" record with the given
// id to standard output. If the plugin cannot inspect the record, the error
// is logged and nothing is printed.
func (a *App) DebugTask(ctx context.Context, id string) {
    state, inspectErr := a.plugin.Inspect(ctx, "tasks", id)
    if inspectErr != nil {
        log.Printf("Inspect error: %v", inspectErr)
        return
    }

    fmt.Println(crdt.InspectState(state))
    // Example output:
    // CRDT State for tasks[task-1]
    //   title (lww): "Buy groceries" (node=edge-device-42, hlc=HLC{...})
    //   priority (counter): 3 [edge-device-42: +3 -0]
    //   tags (set): ["shopping", "urgent"]
    //   completed (lww): false (node=edge-device-42, hlc=HLC{...})
}

Tombstone Cleanup

Tombstones mark deleted records for sync propagation. Clean them up periodically:

// CleanupLoop calls the plugin's tombstone cleanup for the "tasks" table
// once every 24 hours until ctx is cancelled. Cleanup errors are logged and
// the loop keeps running. The first cleanup happens after the first tick,
// not at startup.
func (a *App) CleanupLoop(ctx context.Context) {
    const interval = 24 * time.Hour

    tick := time.NewTicker(interval)
    defer tick.Stop()

    done := ctx.Done()
    for {
        select {
        case <-tick.C:
            err := a.plugin.CleanupTombstones(ctx, "tasks")
            if err != nil {
                log.Printf("Tombstone cleanup error: %v", err)
            }
        case <-done:
            return
        }
    }
}

Conflict Resolution in Practice

CRDTs resolve conflicts automatically, but it helps to understand the behavior:

// Scenario: Two offline nodes edit the same task title.
//
// Node A (offline, ts=100): title = "Buy groceries at Costco"
// Node B (offline, ts=105): title = "Buy groceries at Trader Joe's"
//
// After both come online and sync:
// Result: title = "Buy groceries at Trader Joe's" (Node B wins, higher HLC)

// Scenario: Two offline nodes add tags concurrently.
//
// Node A (offline): adds "urgent"
// Node B (offline): adds "shopping"
//
// After sync: tags = ["urgent", "shopping"] (both preserved, set union)

// Scenario: One node adds a tag, another removes it concurrently.
//
// Node A (offline): removes "urgent"
// Node B (offline): adds "urgent" (new add operation)
//
// After sync: tags still contain "urgent" (add-wins semantics)

Complete Example

Put it all together:

// main wires the pieces together: it sets up the local database and CRDT
// plugin, starts background sync and tombstone cleanup, performs a few local
// mutations, triggers a manual sync, and prints the resulting CRDT state.
//
// Failures of the local mutations end the example early with a plain return
// (not log.Fatal) so that the deferred cancel and db.Close still run.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Set up local DB + CRDT plugin (see above).
    db, plugin := setupDB(ctx)
    defer db.Close()

    app := NewApp(db, plugin)
    app.SetupSync("https://cloud.example.com/sync")

    // Background sync.
    go app.syncer.Run(ctx)
    go app.syncer.StreamSync(ctx)
    go app.CleanupLoop(ctx)

    // Create tasks offline.
    task := &Task{
        ID:    "task-1",
        Title: "Buy groceries",
        Tags:  []string{"shopping"},
    }
    if err := app.CreateTask(ctx, task); err != nil {
        log.Printf("Create task failed: %v", err)
        return
    }

    // Increment priority twice.
    for i := 0; i < 2; i++ {
        if err := app.IncrementPriority(ctx, "task-1"); err != nil {
            log.Printf("Increment priority failed: %v", err)
            return
        }
    }

    // Manual sync when ready.
    if err := app.SyncWithRetry(ctx, 3); err != nil {
        log.Printf("Sync failed: %v", err)
    }

    // Debug state.
    app.DebugTask(ctx, "task-1")
}

On this page