Grove

Plugin System

Intercept merge, metadata, presence, room, time-travel, and connection events with server-side and client-side CRDT plugins.

Grove CRDT provides a plugin system for intercepting events across the sync lifecycle. Plugins can implement custom merge logic, enforce access control, collect metrics, audit changes, and more. Both the Go server and the TypeScript client support plugins.

Overview

Server-Side Plugins (Go)

Server-side plugins implement the CRDTPlugin interface and optionally one or more interceptor interfaces to hook into specific subsystems:

| Interface | Events |
|---|---|
| MergeInterceptor | Before/after field and state merges |
| MetadataInterceptor | Before/after metadata reads and writes |
| PresenceInterceptor | Before/after presence updates |
| RoomInterceptor | Before/after room join/leave |
| TimeTravelInterceptor | Before time-travel reads |
| ConnectionInterceptor | On client connect/disconnect |

Client-Side Plugins (TypeScript)

Client-side plugins implement the StorePlugin interface and hook into store mutations, sync events, and conflict resolution. See the TypeScript Client for details.

CRDTPlugin Interface

Every server-side plugin must implement the base interface:

// CRDTPlugin is the base interface every server-side plugin must implement.
type CRDTPlugin interface {
    // Name returns the unique plugin name used for logging and identification.
    Name() string
    // Init is called once when the plugin is registered; it receives the controller.
    Init(ctrl *SyncController) error
    // Close is called on controller shutdown so the plugin can clean up.
    Close() error
}
| Method | Description |
|---|---|
| Name() | Unique plugin name for logging and identification |
| Init() | Called once when the plugin is registered; receives the controller |
| Close() | Called on controller shutdown for cleanup |

BaseCRDTPlugin

For convenience, embed BaseCRDTPlugin to get no-op defaults for all methods. Override only what you need:

// MyPlugin embeds crdt.BaseCRDTPlugin so every plugin method starts out as a
// no-op default.
type MyPlugin struct {
    crdt.BaseCRDTPlugin
}

// Name reports the identifier this plugin is known by.
func (m *MyPlugin) Name() string {
    return "my-plugin"
}

// Add only the interceptor methods you actually need; the embedded base
// supplies the rest.

Interceptor Interfaces

MergeInterceptor

Intercept field and state merge operations for custom merge logic, conflict analytics, or transformation:

// MergeInterceptor hooks into field-level and state-level merge operations.
// NOTE(review): the Before* hooks return an error, presumably so a plugin can
// reject the merge (merge validation is a listed use case) — confirm how the
// controller treats a non-nil error.
type MergeInterceptor interface {
    // BeforeMergeField runs before the local and remote states of one field are merged.
    BeforeMergeField(ctx context.Context, table, pk, field string, local, remote *FieldState) error
    // AfterMergeField runs after one field has been merged and also receives the merged result.
    AfterMergeField(ctx context.Context, table, pk, field string, local, remote, merged *FieldState)
    // BeforeMergeState runs before the local and remote states of a whole record are merged.
    BeforeMergeState(ctx context.Context, table, pk string, local, remote *State) error
    // AfterMergeState runs after a whole-record merge and also receives the merged state.
    AfterMergeState(ctx context.Context, table, pk string, local, remote, merged *State)
}

Use cases:

  • Custom merge logic that overrides default behavior
  • Conflict detection and analytics (track how often conflicts occur per field)
  • Merge validation (reject merges that violate business rules)
  • Transformation (modify values before or after merge)
// ConflictAnalytics counts how often a field is merged from two differing
// versions. Counts are keyed by "table.pk.field".
type ConflictAnalytics struct {
    crdt.BaseCRDTPlugin
    conflicts map[string]int // conflict count per "table.pk.field"; allocated lazily
    mu        sync.Mutex     // guards conflicts
}

// Name returns the plugin's identifier.
func (p *ConflictAnalytics) Name() string { return "conflict-analytics" }

// AfterMergeField records a conflict when both a local and a remote field
// state exist and their HLC values differ. The merged result is not inspected.
func (p *ConflictAnalytics) AfterMergeField(
    ctx context.Context, table, pk, field string,
    local, remote, merged *crdt.FieldState,
) {
    if local == nil || remote == nil || local.HLC == remote.HLC {
        return // one side missing or identical timestamps: not a conflict
    }

    p.mu.Lock()
    defer p.mu.Unlock()

    // A zero-value ConflictAnalytics (e.g. &ConflictAnalytics{}) has a nil
    // map, and writing to a nil map panics. Allocate it on first use.
    if p.conflicts == nil {
        p.conflicts = make(map[string]int)
    }
    p.conflicts[fmt.Sprintf("%s.%s.%s", table, pk, field)]++
}

MetadataInterceptor

Intercept shadow table reads and writes for encryption, access control, or audit logging:

// MetadataInterceptor hooks into reads and writes of the CRDT metadata
// (shadow table).
// NOTE(review): the Before* hooks return an error, presumably to block the
// read or write (access control is a listed use case) — confirm.
type MetadataInterceptor interface {
    // BeforeWriteMetadata runs before state is written; it receives the state about to be stored.
    BeforeWriteMetadata(ctx context.Context, table, pk string, state *State) error
    // AfterWriteMetadata runs after state has been written.
    AfterWriteMetadata(ctx context.Context, table, pk string, state *State)
    // BeforeReadMetadata runs before the metadata for table/pk is read.
    BeforeReadMetadata(ctx context.Context, table, pk string) error
    // AfterReadMetadata runs after a read and receives the state that was loaded.
    AfterReadMetadata(ctx context.Context, table, pk string, state *State)
}

Use cases:

  • Field-level encryption (encrypt values before write, decrypt after read)
  • Access control (block reads/writes based on user permissions)
  • Audit logging (record every metadata access)
// EncryptionPlugin encrypts sensitive field values before metadata is written
// and decrypts them again after it is read.
type EncryptionPlugin struct {
    crdt.BaseCRDTPlugin
    key []byte // key passed to encrypt/decrypt
}

// Name returns the plugin's identifier.
func (p *EncryptionPlugin) Name() string { return "encryption" }

// BeforeWriteMetadata replaces the value of every sensitive field in state
// with its encrypted form. It returns the first encryption error, leaving any
// fields processed before the failure already encrypted.
func (p *EncryptionPlugin) BeforeWriteMetadata(
    ctx context.Context, table, pk string, state *crdt.State,
) error {
    for field, fs := range state.Fields {
        if !isSensitive(table, field) {
            continue
        }
        encrypted, err := encrypt(fs.Value, p.key)
        if err != nil {
            return err
        }
        fs.Value = encrypted
    }
    return nil
}

// AfterReadMetadata decrypts every sensitive field in state in place.
//
// The hook has no error return, so a decryption failure is logged and the
// stored value is left untouched rather than being overwritten with the
// result of a failed decrypt.
func (p *EncryptionPlugin) AfterReadMetadata(
    ctx context.Context, table, pk string, state *crdt.State,
) {
    // NOTE(review): guards against a read that produced no state — confirm
    // whether the controller can actually pass nil here.
    if state == nil {
        return
    }
    for field, fs := range state.Fields {
        if !isSensitive(table, field) {
            continue
        }
        decrypted, err := decrypt(fs.Value, p.key)
        if err != nil {
            log.Printf("encryption: cannot decrypt %s/%s field %q: %v", table, pk, field, err)
            continue
        }
        fs.Value = decrypted
    }
}

PresenceInterceptor

Intercept presence updates for rate limiting, enrichment, or filtering:

// PresenceInterceptor hooks into presence updates.
type PresenceInterceptor interface {
    // BeforePresenceUpdate runs before an update is applied. The update is
    // passed by pointer, so the hook can modify it.
    // NOTE(review): a non-nil error presumably rejects the update — confirm.
    BeforePresenceUpdate(ctx context.Context, update *PresenceUpdate) error
    // AfterPresenceUpdate runs after the update; it receives a copy of the
    // update together with the resulting presence event.
    AfterPresenceUpdate(ctx context.Context, update PresenceUpdate, event PresenceEvent)
}

Use cases:

  • Rate limiting presence updates per node
  • Enriching presence data (add server-side metadata)
  • Filtering sensitive presence data before broadcast
// PresenceRateLimiter rejects presence updates that arrive less than
// minInterval after the previous accepted update for the same node and topic.
type PresenceRateLimiter struct {
    crdt.BaseCRDTPlugin
    lastUpdate  map[string]time.Time // last accepted update per "nodeID:topic"; allocated lazily
    minInterval time.Duration        // minimum spacing between accepted updates
    mu          sync.Mutex           // guards lastUpdate
}

// Name returns the plugin's identifier.
func (p *PresenceRateLimiter) Name() string { return "presence-rate-limiter" }

// BeforePresenceUpdate returns an error when the update for this node/topic
// pair comes too soon after the last accepted one; otherwise it records the
// current time for the pair and returns nil.
//
// Note that entries are never removed from lastUpdate, so the map grows with
// the number of distinct node/topic pairs seen.
func (p *PresenceRateLimiter) BeforePresenceUpdate(
    ctx context.Context, update *crdt.PresenceUpdate,
) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    // A zero-value limiter has a nil map; writing to it would panic.
    if p.lastUpdate == nil {
        p.lastUpdate = make(map[string]time.Time)
    }

    key := update.NodeID + ":" + update.Topic
    // Read the clock once so the check and the reported wait agree; two
    // separate reads could report a zero or negative wait.
    now := time.Now()
    if last, ok := p.lastUpdate[key]; ok {
        if elapsed := now.Sub(last); elapsed < p.minInterval {
            return fmt.Errorf("rate limited: wait %v", p.minInterval-elapsed)
        }
    }
    p.lastUpdate[key] = now
    return nil
}

RoomInterceptor

Intercept room join/leave for authorization and lifecycle management:

// RoomInterceptor hooks into room join and leave operations.
// NOTE(review): the Before* hooks return an error, presumably to refuse the
// join or leave (authorization is a listed use case) — confirm.
type RoomInterceptor interface {
    // BeforeJoinRoom runs before a node joins a room. The participant data is
    // passed by pointer, so the hook can modify it.
    BeforeJoinRoom(ctx context.Context, roomID, nodeID string, data *ParticipantData) error
    // AfterJoinRoom runs after a node has joined; it receives a copy of the participant data.
    AfterJoinRoom(ctx context.Context, roomID, nodeID string, data ParticipantData)
    // BeforeLeaveRoom runs before a node leaves a room.
    BeforeLeaveRoom(ctx context.Context, roomID, nodeID string) error
    // AfterLeaveRoom runs after a node has left a room.
    AfterLeaveRoom(ctx context.Context, roomID, nodeID string)
}

Use cases:

  • Authorization (check if a user can join a specific room)
  • Participant enrichment (add server-side data to participant info)
  • Notifications (send alerts when users join/leave)
// RoomACL restricts room membership to an explicit per-room allow list.
type RoomACL struct {
    crdt.BaseCRDTPlugin
    allowList map[string][]string // roomID -> allowed nodeIDs
}

// Name returns the plugin's identifier.
func (acl *RoomACL) Name() string {
    return "room-acl"
}

// BeforeJoinRoom permits the join when the room has no allow-list entry or
// when nodeID appears in the room's list; otherwise it returns an error.
func (acl *RoomACL) BeforeJoinRoom(
    ctx context.Context, roomID, nodeID string, data *crdt.ParticipantData,
) error {
    members, restricted := acl.allowList[roomID]
    if restricted {
        for i := range members {
            if members[i] == nodeID {
                return nil
            }
        }
        return fmt.Errorf("node %s is not allowed in room %s", nodeID, roomID)
    }
    // No ACL configured for this room: everyone may join.
    return nil
}

TimeTravelInterceptor

Intercept time-travel queries for access control:

// TimeTravelInterceptor hooks into time-travel (history) queries before they run.
// NOTE(review): a non-nil error presumably denies the read (access control is
// the stated purpose) — confirm.
type TimeTravelInterceptor interface {
    // BeforeReadStateAt runs before a record's state at the given HLC is read.
    BeforeReadStateAt(ctx context.Context, table, pk string, hlc HLC) error
    // BeforeReadFieldHistory runs before the history of a single field is read.
    BeforeReadFieldHistory(ctx context.Context, table, pk, field string) error
}

Use cases:

  • Restrict who can view history for sensitive tables
  • Audit logging for history access

ConnectionInterceptor

Track client connections and disconnections:

// ConnectionInterceptor is notified when clients connect and disconnect.
// Neither hook returns a value.
type ConnectionInterceptor interface {
    // OnConnect is called on client connect with the node ID and its connection metadata.
    OnConnect(ctx context.Context, nodeID string, metadata map[string]string)
    // OnDisconnect is called on client disconnect.
    OnDisconnect(ctx context.Context, nodeID string)
}

Use cases:

  • Connection tracking and analytics
  • Active user counting
  • Session logging
// ConnectionTracker keeps the set of currently connected nodes and logs every
// connect and disconnect together with the resulting node count.
type ConnectionTracker struct {
    crdt.BaseCRDTPlugin
    active map[string]time.Time // nodeID -> time of connect; allocated lazily
    mu     sync.Mutex           // guards active
}

// Name returns the plugin's identifier.
func (p *ConnectionTracker) Name() string { return "connection-tracker" }

// OnConnect records the node as active and logs the new total.
func (p *ConnectionTracker) OnConnect(
    ctx context.Context, nodeID string, metadata map[string]string,
) {
    p.mu.Lock()
    // A zero-value tracker (e.g. &ConnectionTracker{}) has a nil map; writing
    // to it would panic.
    if p.active == nil {
        p.active = make(map[string]time.Time)
    }
    p.active[nodeID] = time.Now()
    // Take the count while holding the lock; reading len(p.active) after
    // Unlock races with concurrent connects and disconnects.
    total := len(p.active)
    p.mu.Unlock()

    log.Printf("Node %s connected (total: %d)", nodeID, total)
}

// OnDisconnect removes the node from the active set and logs the new total.
func (p *ConnectionTracker) OnDisconnect(ctx context.Context, nodeID string) {
    p.mu.Lock()
    delete(p.active, nodeID) // deleting from a nil map is a no-op
    total := len(p.active)
    p.mu.Unlock()

    log.Printf("Node %s disconnected (total: %d)", nodeID, total)
}

Registering Plugins

Via Controller Options

Register plugins at controller creation time:

// Each WithControllerPlugin option registers one plugin at controller creation time.
ctrl := crdt.NewSyncController(plugin,
    crdt.WithControllerPlugin(&ConflictAnalytics{}),
    crdt.WithControllerPlugin(&EncryptionPlugin{key: secretKey}),
    crdt.WithControllerPlugin(&ConnectionTracker{}),
)

At Runtime

Add plugins after creation:

// AddPlugin registers a plugin on a controller that has already been created.
ctrl.AddPlugin(&RoomACL{allowList: acl})

Via Forge Extension

// WithSyncController takes the same crdt.WithControllerPlugin options that
// crdt.NewSyncController accepts.
// NOTE(review): presumably the options are forwarded to the controller the
// extension creates — confirm.
app.Use(groveext.New(
    groveext.WithDriver(pgdb),
    groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
    groveext.WithSyncController(
        crdt.WithControllerPlugin(&ConflictAnalytics{}),
        crdt.WithControllerPlugin(&EncryptionPlugin{key: secretKey}),
    ),
))

Example: Audit Plugin

A complete plugin that logs state merges, metadata writes, and client connect/disconnect events:

// AuditPlugin writes an audit trail of state merges, metadata writes, and
// connection events to the supplied logger.
type AuditPlugin struct {
    crdt.BaseCRDTPlugin
    logger *log.Logger
}

// NewAuditPlugin builds an AuditPlugin that logs through logger.
func NewAuditPlugin(logger *log.Logger) *AuditPlugin {
    plugin := new(AuditPlugin)
    plugin.logger = logger
    return plugin
}

// Name returns the plugin's identifier.
func (a *AuditPlugin) Name() string {
    return "audit"
}

// Init logs that the plugin has been initialized. The controller is not used.
func (a *AuditPlugin) Init(_ *crdt.SyncController) error {
    a.logger.Println("Audit plugin initialized")
    return nil
}

// Close logs that the plugin has been closed.
func (a *AuditPlugin) Close() error {
    a.logger.Println("Audit plugin closed")
    return nil
}

// AfterMergeState logs the record that was merged and how many fields the
// merged state holds.
func (a *AuditPlugin) AfterMergeState(
    _ context.Context, table, pk string,
    _, _, merged *crdt.State,
) {
    fieldCount := len(merged.Fields)
    a.logger.Printf("MERGE %s/%s: %d fields merged", table, pk, fieldCount)
}

// AfterWriteMetadata logs the record that was written and how many fields the
// written state holds.
func (a *AuditPlugin) AfterWriteMetadata(
    _ context.Context, table, pk string, state *crdt.State,
) {
    fieldCount := len(state.Fields)
    a.logger.Printf("WRITE %s/%s: %d fields written", table, pk, fieldCount)
}

// OnConnect logs the ID of the node that connected.
func (a *AuditPlugin) OnConnect(_ context.Context, nodeID string, _ map[string]string) {
    a.logger.Printf("CONNECT %s", nodeID)
}

// OnDisconnect logs the ID of the node that disconnected.
func (a *AuditPlugin) OnDisconnect(_ context.Context, nodeID string) {
    a.logger.Printf("DISCONNECT %s", nodeID)
}

TypeScript Client Plugins

Client-side plugins use the StorePlugin interface:

/** Contract for client-side store plugins. Only `name` is required; every hook is optional. */
interface StorePlugin {
    /** Name identifying the plugin. */
    name: string;
    /** Called once on registration; gives the plugin access to the store instance. */
    onInit?(store: CRDTStore): void;
    /** Called before each mutation. Return the (possibly modified) mutation, or `null` to cancel it. */
    onBeforeMutation?(mutation: Mutation): Mutation | null;
    /** Called after each mutation, for logging and side effects; cannot modify the mutation. */
    onAfterMutation?(mutation: Mutation, prevState: DocumentState): void;
    /** Called before changes are pushed. Return the filtered or transformed list of changes. */
    onBeforeSync?(changes: ChangeRecord[]): ChangeRecord[];
    /** Called after a sync completes, e.g. for notifications or analytics. */
    onAfterSync?(result: SyncResult): void;
    /** Called on a merge conflict with both sides and the resolved value; cannot change the outcome. */
    onConflict?(field: string, local: FieldState, remote: FieldState, resolved: FieldState): void;
    /** Called on plugin removal, for cleanup. */
    destroy?(): void;
}

Hook Types

| Hook | Timing | Can Modify | Description |
|---|---|---|---|
| onInit | Once on registration | No | Access to store instance |
| onBeforeMutation | Before each mutation | Yes (return modified or null to cancel) | Validation, transformation |
| onAfterMutation | After each mutation | No | Logging, side effects |
| onBeforeSync | Before push | Yes (filter/transform changes) | Change filtering |
| onAfterSync | After sync completes | No | Notifications, analytics |
| onConflict | On merge conflict | No | Conflict logging, UI notifications |
| destroy | On plugin removal | No | Cleanup |

Registering Client Plugins

// Direct registration.
// Direct registration: hand the plugin to the store with use().
store.use(myPlugin);

// Via CRDTProvider: list the plugins in the provider's config.
<CRDTProvider config={{
    baseURL: "/api/sync",
    nodeID: "browser-1",
    tables: ["documents"],
    plugins: [validationPlugin, loggingPlugin],
}}>
    <App />
</CRDTProvider>

React Hook

import { usePlugin } from "@grove-js/crdt/react";

/**
 * Renders one status line each for the "validation" and "conflict-logger"
 * plugins: "active" when `usePlugin` returns a truthy value, "inactive" otherwise.
 */
function PluginDashboard() {
    const statusOf = (plugin: unknown): string => (plugin ? "active" : "inactive");

    // Hooks are called unconditionally and in a fixed order.
    const validationStatus = statusOf(usePlugin("validation"));
    const loggerStatus = statusOf(usePlugin("conflict-logger"));

    return (
        <div>
            <p>Validation: {validationStatus}</p>
            <p>Conflict Logger: {loggerStatus}</p>
        </div>
    );
}

Example: Validation Plugin

/**
 * Validates string `title` mutations: an empty title cancels the mutation and
 * a title longer than 200 characters is truncated to 200. Every other
 * mutation is passed through unchanged.
 */
const validationPlugin: StorePlugin = {
    name: "validation",
    onBeforeMutation(mutation) {
        const { field, value } = mutation;

        // Only string values written to "title" are validated.
        if (field !== "title" || typeof value !== "string") {
            return mutation;
        }

        if (value.length === 0) {
            console.warn("Title cannot be empty, cancelling mutation");
            return null; // Cancel the mutation
        }

        // Over-long titles are cut down on a copy; the input object is not mutated.
        return value.length > 200
            ? { ...mutation, value: value.slice(0, 200) }
            : mutation;
    },
};

Example: Conflict Logging Plugin

/**
 * Logs every merge conflict to the console and reports it to the analytics
 * service as a "crdt_conflict" event.
 */
const conflictLogger: StorePlugin = {
    name: "conflict-logger",
    onConflict(field, local, remote, resolved) {
        // NOTE(review): the winner is decided by object identity, so "remote" is
        // reported only when `resolved` is the very same object as `remote` —
        // confirm the store passes one of the two inputs rather than a copy.
        const winner = resolved === remote ? "remote" : "local";
        const details = {
            localHLC: local.hlc,
            remoteHLC: remote.hlc,
            winner,
        };
        console.log(`Conflict on "${field}":`, details);

        // Send to analytics service.
        const table = local.table;
        analytics.track("crdt_conflict", { field, table });
    },
};

On this page