Plugin System
Intercept merge, metadata, presence, room, time-travel, and connection events with server-side and client-side CRDT plugins.
Grove CRDT provides a plugin system for intercepting events across the sync lifecycle. Plugins can implement custom merge logic, enforce access control, collect metrics, audit changes, and more. Both the Go server and the TypeScript client support plugins.
Overview
Server-Side Plugins (Go)
Server-side plugins implement the CRDTPlugin interface and optionally one or more interceptor interfaces to hook into specific subsystems:
| Interface | Events |
|---|---|
| MergeInterceptor | Before/after field and state merges |
| MetadataInterceptor | Before/after metadata reads and writes |
| PresenceInterceptor | Before/after presence updates |
| RoomInterceptor | Before/after room join/leave |
| TimeTravelInterceptor | Before time-travel reads |
| ConnectionInterceptor | On client connect/disconnect |
Client-Side Plugins (TypeScript)
Client-side plugins implement the StorePlugin interface and hook into store mutations, sync events, and conflict resolution. See the TypeScript Client for details.
CRDTPlugin Interface
Every server-side plugin must implement the base interface:
type CRDTPlugin interface {
Name() string
Init(ctrl *SyncController) error
Close() error
}

| Method | Description |
|---|---|
| Name() | Unique plugin name for logging and identification |
| Init() | Called once when the plugin is registered; receives the controller |
| Close() | Called on controller shutdown for cleanup |
BaseCRDTPlugin
For convenience, embed BaseCRDTPlugin to get no-op defaults for all methods. Override only what you need:
type MyPlugin struct {
crdt.BaseCRDTPlugin
}
func (p *MyPlugin) Name() string { return "my-plugin" }
// Only override the interceptors you need.

Interceptor Interfaces
MergeInterceptor
Intercept field and state merge operations for custom merge logic, conflict analytics, or transformation:
type MergeInterceptor interface {
BeforeMergeField(ctx context.Context, table, pk, field string, local, remote *FieldState) error
AfterMergeField(ctx context.Context, table, pk, field string, local, remote, merged *FieldState)
BeforeMergeState(ctx context.Context, table, pk string, local, remote *State) error
AfterMergeState(ctx context.Context, table, pk string, local, remote, merged *State)
}

Use cases:
- Custom merge logic that overrides default behavior
- Conflict detection and analytics (track how often conflicts occur per field)
- Merge validation (reject merges that violate business rules)
- Transformation (modify values before or after merge)
type ConflictAnalytics struct {
crdt.BaseCRDTPlugin
conflicts map[string]int
mu sync.Mutex
}
func (p *ConflictAnalytics) Name() string { return "conflict-analytics" }
func (p *ConflictAnalytics) AfterMergeField(
ctx context.Context, table, pk, field string,
local, remote, merged *crdt.FieldState,
) {
if local != nil && remote != nil && local.HLC != remote.HLC {
p.mu.Lock()
p.conflicts[fmt.Sprintf("%s.%s.%s", table, pk, field)]++
p.mu.Unlock()
}
}MetadataInterceptor
Intercept shadow table reads and writes for encryption, access control, or audit logging:
type MetadataInterceptor interface {
BeforeWriteMetadata(ctx context.Context, table, pk string, state *State) error
AfterWriteMetadata(ctx context.Context, table, pk string, state *State)
BeforeReadMetadata(ctx context.Context, table, pk string) error
AfterReadMetadata(ctx context.Context, table, pk string, state *State)
}

Use cases:
- Field-level encryption (encrypt values before write, decrypt after read)
- Access control (block reads/writes based on user permissions)
- Audit logging (record every metadata access)
type EncryptionPlugin struct {
crdt.BaseCRDTPlugin
key []byte
}
func (p *EncryptionPlugin) Name() string { return "encryption" }
// BeforeWriteMetadata encrypts the value of every sensitive field in state
// with the plugin key before the metadata is written. It returns the first
// encryption error, wrapped with the table, primary key and field it occurred
// on.
//
// NOTE(review): the in-place assignment to fs.Value assumes state.Fields holds
// pointers (e.g. map[string]*FieldState); if the elements are values the
// update would be lost — confirm against the crdt.State definition.
func (p *EncryptionPlugin) BeforeWriteMetadata(
	ctx context.Context, table, pk string, state *crdt.State,
) error {
	for field, fs := range state.Fields {
		if !isSensitive(table, field) {
			continue
		}
		encrypted, err := encrypt(fs.Value, p.key)
		if err != nil {
			return fmt.Errorf("encrypting %s/%s.%s: %w", table, pk, field, err)
		}
		fs.Value = encrypted
	}
	return nil
}
func (p *EncryptionPlugin) AfterReadMetadata(
ctx context.Context, table, pk string, state *crdt.State,
) {
for field, fs := range state.Fields {
if isSensitive(table, field) {
decrypted, _ := decrypt(fs.Value, p.key)
fs.Value = decrypted
}
}
}PresenceInterceptor
Intercept presence updates for rate limiting, enrichment, or filtering:
type PresenceInterceptor interface {
BeforePresenceUpdate(ctx context.Context, update *PresenceUpdate) error
AfterPresenceUpdate(ctx context.Context, update PresenceUpdate, event PresenceEvent)
}

Use cases:
- Rate limiting presence updates per node
- Enriching presence data (add server-side metadata)
- Filtering sensitive presence data before broadcast
type PresenceRateLimiter struct {
crdt.BaseCRDTPlugin
lastUpdate map[string]time.Time
minInterval time.Duration
mu sync.Mutex
}
func (p *PresenceRateLimiter) Name() string { return "presence-rate-limiter" }
func (p *PresenceRateLimiter) BeforePresenceUpdate(
ctx context.Context, update *crdt.PresenceUpdate,
) error {
p.mu.Lock()
defer p.mu.Unlock()
key := update.NodeID + ":" + update.Topic
if last, ok := p.lastUpdate[key]; ok && time.Since(last) < p.minInterval {
return fmt.Errorf("rate limited: wait %v", p.minInterval-time.Since(last))
}
p.lastUpdate[key] = time.Now()
return nil
}RoomInterceptor
Intercept room join/leave for authorization and lifecycle management:
type RoomInterceptor interface {
BeforeJoinRoom(ctx context.Context, roomID, nodeID string, data *ParticipantData) error
AfterJoinRoom(ctx context.Context, roomID, nodeID string, data ParticipantData)
BeforeLeaveRoom(ctx context.Context, roomID, nodeID string) error
AfterLeaveRoom(ctx context.Context, roomID, nodeID string)
}

Use cases:
- Authorization (check if a user can join a specific room)
- Participant enrichment (add server-side data to participant info)
- Notifications (send alerts when users join/leave)
type RoomACL struct {
crdt.BaseCRDTPlugin
allowList map[string][]string // roomID -> allowed nodeIDs
}
func (p *RoomACL) Name() string { return "room-acl" }
func (p *RoomACL) BeforeJoinRoom(
ctx context.Context, roomID, nodeID string, data *crdt.ParticipantData,
) error {
allowed, ok := p.allowList[roomID]
if !ok {
return nil // No ACL for this room, allow all
}
for _, id := range allowed {
if id == nodeID {
return nil
}
}
return fmt.Errorf("node %s is not allowed in room %s", nodeID, roomID)
}

TimeTravelInterceptor
Intercept time-travel queries for access control:
type TimeTravelInterceptor interface {
BeforeReadStateAt(ctx context.Context, table, pk string, hlc HLC) error
BeforeReadFieldHistory(ctx context.Context, table, pk, field string) error
}

Use cases:
- Restrict who can view history for sensitive tables
- Audit logging for history access
ConnectionInterceptor
Track client connections and disconnections:
type ConnectionInterceptor interface {
OnConnect(ctx context.Context, nodeID string, metadata map[string]string)
OnDisconnect(ctx context.Context, nodeID string)
}

Use cases:
- Connection tracking and analytics
- Active user counting
- Session logging
type ConnectionTracker struct {
crdt.BaseCRDTPlugin
active map[string]time.Time
mu sync.Mutex
}
func (p *ConnectionTracker) Name() string { return "connection-tracker" }
// OnConnect records the connection time for nodeID and logs the number of
// currently tracked nodes.
//
// The active map is allocated on first use so a zero-value
// &ConnectionTracker{} can be registered directly. The count is captured
// while the mutex is held; reading len(p.active) after unlocking would race
// with concurrent connects and disconnects.
func (p *ConnectionTracker) OnConnect(
	ctx context.Context, nodeID string, metadata map[string]string,
) {
	p.mu.Lock()
	if p.active == nil {
		p.active = make(map[string]time.Time)
	}
	p.active[nodeID] = time.Now()
	total := len(p.active)
	p.mu.Unlock()

	log.Printf("Node %s connected (total: %d)", nodeID, total)
}
func (p *ConnectionTracker) OnDisconnect(ctx context.Context, nodeID string) {
p.mu.Lock()
delete(p.active, nodeID)
p.mu.Unlock()
log.Printf("Node %s disconnected (total: %d)", nodeID, len(p.active))
}Registering Plugins
Via Controller Options
Register plugins at controller creation time:
ctrl := crdt.NewSyncController(plugin,
crdt.WithControllerPlugin(&ConflictAnalytics{}),
crdt.WithControllerPlugin(&EncryptionPlugin{key: secretKey}),
crdt.WithControllerPlugin(&ConnectionTracker{}),
)

At Runtime
Add plugins after creation:
ctrl.AddPlugin(&RoomACL{allowList: acl})

Via Forge Extension
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncController(
crdt.WithControllerPlugin(&ConflictAnalytics{}),
crdt.WithControllerPlugin(&EncryptionPlugin{key: secretKey}),
),
))

Example: Audit Plugin
A complete plugin that logs all merge operations and metadata access:
type AuditPlugin struct {
crdt.BaseCRDTPlugin
logger *log.Logger
}
func NewAuditPlugin(logger *log.Logger) *AuditPlugin {
return &AuditPlugin{logger: logger}
}
func (p *AuditPlugin) Name() string { return "audit" }
func (p *AuditPlugin) Init(ctrl *crdt.SyncController) error {
p.logger.Println("Audit plugin initialized")
return nil
}
func (p *AuditPlugin) AfterMergeState(
ctx context.Context, table, pk string,
local, remote, merged *crdt.State,
) {
p.logger.Printf("MERGE %s/%s: %d fields merged", table, pk, len(merged.Fields))
}
func (p *AuditPlugin) AfterWriteMetadata(
ctx context.Context, table, pk string, state *crdt.State,
) {
p.logger.Printf("WRITE %s/%s: %d fields written", table, pk, len(state.Fields))
}
func (p *AuditPlugin) OnConnect(ctx context.Context, nodeID string, meta map[string]string) {
p.logger.Printf("CONNECT %s", nodeID)
}
func (p *AuditPlugin) OnDisconnect(ctx context.Context, nodeID string) {
p.logger.Printf("DISCONNECT %s", nodeID)
}
func (p *AuditPlugin) Close() error {
p.logger.Println("Audit plugin closed")
return nil
}

TypeScript Client Plugins
Client-side plugins use the StorePlugin interface:
interface StorePlugin {
name: string;
onInit?(store: CRDTStore): void;
onBeforeMutation?(mutation: Mutation): Mutation | null;
onAfterMutation?(mutation: Mutation, prevState: DocumentState): void;
onBeforeSync?(changes: ChangeRecord[]): ChangeRecord[];
onAfterSync?(result: SyncResult): void;
onConflict?(field: string, local: FieldState, remote: FieldState, resolved: FieldState): void;
destroy?(): void;
}

Hook Types
| Hook | Timing | Can Modify | Description |
|---|---|---|---|
| onInit | Once on registration | No | Access to store instance |
| onBeforeMutation | Before each mutation | Yes (return modified or null to cancel) | Validation, transformation |
| onAfterMutation | After each mutation | No | Logging, side effects |
| onBeforeSync | Before push | Yes (filter/transform changes) | Change filtering |
| onAfterSync | After sync completes | No | Notifications, analytics |
| onConflict | On merge conflict | No | Conflict logging, UI notifications |
| destroy | On plugin removal | No | Cleanup |
Registering Client Plugins
// Direct registration.
store.use(myPlugin);
// Via CRDTProvider.
<CRDTProvider config={{
baseURL: "/api/sync",
nodeID: "browser-1",
tables: ["documents"],
plugins: [validationPlugin, loggingPlugin],
}}>
<App />
</CRDTProvider>

React Hook
import { usePlugin } from "@grove-js/crdt/react";
function PluginDashboard() {
const validation = usePlugin("validation");
const logger = usePlugin("conflict-logger");
return (
<div>
<p>Validation: {validation ? "active" : "inactive"}</p>
<p>Conflict Logger: {logger ? "active" : "inactive"}</p>
</div>
);
}

Example: Validation Plugin
const validationPlugin: StorePlugin = {
name: "validation",
onBeforeMutation(mutation) {
if (mutation.field === "title" && typeof mutation.value === "string") {
if (mutation.value.length === 0) {
console.warn("Title cannot be empty, cancelling mutation");
return null; // Cancel the mutation
}
if (mutation.value.length > 200) {
return { ...mutation, value: mutation.value.slice(0, 200) };
}
}
return mutation;
},
};

Example: Conflict Logging Plugin
const conflictLogger: StorePlugin = {
name: "conflict-logger",
onConflict(field, local, remote, resolved) {
console.log(`Conflict on "${field}":`, {
localHLC: local.hlc,
remoteHLC: remote.hlc,
winner: resolved === remote ? "remote" : "local",
});
// Send to analytics service.
analytics.track("crdt_conflict", { field, table: local.table });
},
};