Grove

Change Data Capture

Real-time change streams for reacting to database inserts, updates, and deletes.

Overview

The stream package provides a generic Change Data Capture (CDC) abstraction through ChangeStream[T] and ChangeEvent[T]. It lets your application react to database changes (inserts, updates, and deletes) as they happen, without polling.

CDC support is driver-dependent. The driver.StreamCapable interface reports whether a driver supports it:

type StreamCapable interface {
    SupportsStreaming() bool
    SupportsCDC() bool
}

| Driver | CDC Mechanism | SupportsCDC() |
|---|---|---|
| pgdriver | PostgreSQL LISTEN/NOTIFY | true |
| mongodriver | MongoDB Change Streams | true |
| mysqldriver | -- | false |
| sqlitedriver | -- | false |
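
Because support varies by driver, it can help to check for CDC at runtime before opening a stream. The sketch below is self-contained: it redeclares the StreamCapable interface locally, and fakeDriver and supportsCDC are hypothetical names used only for illustration, not part of Grove.

```go
package main

import "fmt"

// StreamCapable mirrors the driver.StreamCapable interface shown above.
type StreamCapable interface {
	SupportsStreaming() bool
	SupportsCDC() bool
}

// fakeDriver is a hypothetical stand-in for a real driver.
type fakeDriver struct{ cdc bool }

func (d fakeDriver) SupportsStreaming() bool { return true }
func (d fakeDriver) SupportsCDC() bool       { return d.cdc }

// supportsCDC reports whether drv implements StreamCapable and advertises CDC.
// Values that do not implement the interface are treated as not capable.
func supportsCDC(drv any) bool {
	sc, ok := drv.(StreamCapable)
	return ok && sc.SupportsCDC()
}

func main() {
	fmt.Println(supportsCDC(fakeDriver{cdc: true}))  // true
	fmt.Println(supportsCDC(fakeDriver{cdc: false})) // false
	fmt.Println(supportsCDC(struct{}{}))             // false
}
```

Treating a failed type assertion as "no CDC" keeps the caller's fallback path (for example, polling) in one place.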

Change Operations

The ChangeOp type represents the kind of mutation:

type ChangeOp int

const (
    ChangeInsert  ChangeOp = iota // INSERT
    ChangeUpdate                   // UPDATE
    ChangeDelete                   // DELETE
    ChangeReplace                  // REPLACE (MongoDB-specific)
)

ChangeOp has a String() method that returns the uppercase operation name (e.g., "INSERT", "UPDATE").
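
A String() method of this kind might look like the sketch below. It is not taken from the stream package; in particular, the "UNKNOWN" fallback for out-of-range values is an assumption.

```go
package main

import "fmt"

type ChangeOp int

const (
	ChangeInsert ChangeOp = iota
	ChangeUpdate
	ChangeDelete
	ChangeReplace
)

// String returns the uppercase operation name.
// The "UNKNOWN" fallback is an assumption of this sketch.
func (op ChangeOp) String() string {
	switch op {
	case ChangeInsert:
		return "INSERT"
	case ChangeUpdate:
		return "UPDATE"
	case ChangeDelete:
		return "DELETE"
	case ChangeReplace:
		return "REPLACE"
	default:
		return "UNKNOWN"
	}
}

func main() {
	// fmt uses String() for both %s and %v because ChangeOp is a fmt.Stringer.
	fmt.Printf("%s %v\n", ChangeInsert, ChangeReplace) // INSERT REPLACE
}
```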

ChangeEvent[T]

A ChangeEvent[T] represents a single change from the database:

type ChangeEvent[T any] struct {
    // Operation is the type of change (INSERT, UPDATE, DELETE, REPLACE).
    Operation ChangeOp

    // Before is the previous state of the row.
    // nil for inserts; populated when the driver supports it.
    Before *T

    // After is the new state of the row.
    // nil for deletes.
    After *T

    // Timestamp is the server timestamp of the change.
    Timestamp time.Time

    // ResumeToken is an opaque token for resuming the stream after disconnect.
    ResumeToken any
}

The Before and After fields depend on the operation type and driver capabilities:

| Operation | Before | After |
|---|---|---|
| ChangeInsert | nil | new row |
| ChangeUpdate | previous row (if supported) | updated row |
| ChangeDelete | deleted row (if supported) | nil |
| ChangeReplace | previous row (if supported) | replacement row |
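
Since either pointer may be nil, handlers usually need a nil check before dereferencing. The sketch below shows one way to pick the relevant row for an event; currentRow and the trimmed ChangeEvent are hypothetical and declared locally so the example compiles on its own.

```go
package main

import "fmt"

type ChangeOp int

const (
	ChangeInsert ChangeOp = iota
	ChangeUpdate
	ChangeDelete
	ChangeReplace
)

// ChangeEvent is a trimmed copy with only the fields this sketch needs.
type ChangeEvent[T any] struct {
	Operation ChangeOp
	Before    *T
	After     *T
}

// currentRow returns the row most relevant to the event: After for inserts,
// updates, and replaces; Before for deletes. The boolean is false when the
// driver did not populate that field.
func currentRow[T any](ev ChangeEvent[T]) (T, bool) {
	p := ev.After
	if ev.Operation == ChangeDelete {
		p = ev.Before
	}
	if p == nil {
		var zero T
		return zero, false
	}
	return *p, true
}

type User struct{ Email string }

func main() {
	u := User{Email: "a@example.com"}

	row, ok := currentRow(ChangeEvent[User]{Operation: ChangeInsert, After: &u})
	fmt.Println(row.Email, ok) // a@example.com true

	// A delete from a driver that does not supply the previous row.
	_, ok = currentRow(ChangeEvent[User]{Operation: ChangeDelete})
	fmt.Println(ok) // false
}
```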

The ChangeSource[T] Interface

Drivers implement the ChangeSource[T] interface to provide CDC events:

type ChangeSource[T any] interface {
    Next(ctx context.Context) bool
    Event() ChangeEvent[T]
    Err() error
    Close() error
    ResumeToken() any
}

This is the low-level contract between the stream package and the database driver. You typically interact with the higher-level ChangeStream[T] wrapper instead.

ChangeStream[T]

ChangeStream[T] wraps a ChangeSource[T] and provides a consistent iteration API:

type ChangeStream[T any] struct { /* unexported fields */ }

func NewChangeStream[T any](source ChangeSource[T]) *ChangeStream[T]

Methods

| Method | Signature | Description |
|---|---|---|
| Next | func (cs *ChangeStream[T]) Next(ctx context.Context) bool | Block until next change event |
| Event | func (cs *ChangeStream[T]) Event() ChangeEvent[T] | Get current change event |
| Err | func (cs *ChangeStream[T]) Err() error | Get first error encountered |
| Close | func (cs *ChangeStream[T]) Close() error | Stop the change stream |
| ResumeToken | func (cs *ChangeStream[T]) ResumeToken() any | Get current resume token |
| All | func (cs *ChangeStream[T]) All(yield func(ChangeEvent[T], error) bool) | Range-over-func iterator (Go 1.23+) |

Iteration Pattern

cs := stream.NewChangeStream(source)
defer cs.Close()

for cs.Next(ctx) {
    event := cs.Event()

    switch event.Operation {
    case stream.ChangeInsert:
        fmt.Printf("New row: %+v\n", *event.After)
    case stream.ChangeUpdate:
        fmt.Printf("Updated row: %+v\n", *event.After)
    case stream.ChangeDelete:
        if event.Before != nil {
            fmt.Printf("Deleted row: %+v\n", *event.Before)
        }
    }
}
if err := cs.Err(); err != nil {
    log.Error("change stream error", "err", err)
}

Range-Over-Func (Go 1.23+)

for event, err := range cs.All {
    if err != nil {
        log.Error("change stream error", "err", err)
        break
    }

    if event.Operation == stream.ChangeInsert {
        processNewRecord(event.After)
    }
}

Resume Tokens

Change streams support resume tokens for reconnection after disconnects. The ResumeToken() method returns an opaque value that can be stored and used to resume the stream from where it left off:

// Save the resume token periodically
token := cs.ResumeToken()
saveCheckpoint(token)

// Later, resume from the saved token
source := driver.WatchCollection("users", WithResumeToken(savedToken))
cs := stream.NewChangeStream(source)

The format and semantics of resume tokens are driver-specific. For MongoDB, this is a BSON document. For PostgreSQL, it depends on the CDC implementation.
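
A common checkpointing pattern is to save the token only after an event has been fully handled, so that a crash replays the last unacknowledged event instead of skipping it. The sketch below illustrates the idea with integer tokens and an in-memory event list; event, openFrom, and consume are hypothetical, and real tokens are opaque, driver-specific values that should not be compared or interpreted like this.

```go
package main

import "fmt"

// event pairs a payload with the token that identifies its position.
type event struct {
	Payload string
	Token   int
}

// openFrom returns the events that follow the given token. Token 0 means
// "start from the beginning". A real driver would do this server-side.
func openFrom(changes []event, token int) []event {
	for i, ev := range changes {
		if ev.Token > token {
			return changes[i:]
		}
	}
	return nil
}

// consume handles at most max events and returns the last checkpoint.
// The checkpoint advances only after handle has returned.
func consume(events []event, checkpoint, max int, handle func(event)) int {
	for i, ev := range events {
		if i == max {
			break
		}
		handle(ev)
		checkpoint = ev.Token
	}
	return checkpoint
}

func main() {
	changes := []event{{"a", 1}, {"b", 2}, {"c", 3}}
	show := func(ev event) { fmt.Println(ev.Payload) }

	// First run is interrupted after two events.
	token := consume(openFrom(changes, 0), 0, 2, show)

	// Second run resumes after the saved token and sees only "c".
	token = consume(openFrom(changes, token), token, 10, show)
	fmt.Println("checkpoint:", token) // checkpoint: 3
}
```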

PostgreSQL: LISTEN/NOTIFY

The pgdriver provides CDC through PostgreSQL's LISTEN/NOTIFY mechanism. While this is not a full logical replication client, it enables real-time event notification for application-level CDC patterns.

The Listener Type

// Create a listener from the pgdriver
pgdb := pgdriver.Unwrap(db)
listener := pgdb.NewListener()

// Start listening (acquires a dedicated connection)
if err := listener.Start(ctx); err != nil {
    return err
}
defer listener.Close()

// Subscribe to a channel
if err := listener.Listen(ctx, "user_changes"); err != nil {
    return err
}

// Register a handler
listener.OnNotification("user_changes", func(n *pgdriver.Notification) {
    fmt.Printf("Channel: %s, Payload: %s\n", n.Channel, n.Payload)
})

Convenience Method

The PgDB.Listen() convenience method combines creating, starting, subscribing, and registering in a single call:

pgdb := pgdriver.Unwrap(db)

listener, err := pgdb.Listen(ctx, "order_events", func(n *pgdriver.Notification) {
    fmt.Printf("Order event: %s\n", n.Payload)
})
if err != nil {
    return err
}
defer listener.Close()

Sending Notifications

Use the Notify method to send a notification on a channel:

err := listener.Notify(ctx, "user_changes", `{"op":"INSERT","id":42}`)

The Notification Type

type Notification struct {
    Channel string // notification channel name
    Payload string // message payload (typically JSON)
    PID     uint32 // backend PID of the sender
}

Setting Up Triggers

To use LISTEN/NOTIFY for CDC, create a trigger function that sends notifications on data changes:

-- Create a notification function
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify(
        TG_TABLE_NAME || '_changes',
        json_build_object(
            'op', TG_OP,
            'before', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE NULL END,
            'after', CASE WHEN TG_OP = 'DELETE' THEN NULL ELSE row_to_json(NEW) END,
            'timestamp', now()
        )::text
    );
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

-- Attach to a table
CREATE TRIGGER users_notify
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION notify_changes();

Then subscribe in your application:

pgdb := pgdriver.Unwrap(db)

listener, err := pgdb.Listen(ctx, "users_changes", func(n *pgdriver.Notification) {
    var event struct {
        Op        string          `json:"op"`
        Before    json.RawMessage `json:"before"`
        After     json.RawMessage `json:"after"`
        Timestamp time.Time       `json:"timestamp"`
    }
    if err := json.Unmarshal([]byte(n.Payload), &event); err != nil {
        log.Error("failed to parse notification", "err", err)
        return
    }

    // Deletes carry the row in "before"; inserts and updates carry it in "after".
    raw := event.After
    if event.Op == "DELETE" {
        raw = event.Before
    }
    var user User
    if err := json.Unmarshal(raw, &user); err != nil {
        log.Error("failed to parse row", "op", event.Op, "err", err)
        return
    }

    switch event.Op {
    case "INSERT":
        fmt.Printf("New user: %s\n", user.Email)
    case "UPDATE":
        fmt.Printf("Updated user: %s\n", user.Email)
    case "DELETE":
        fmt.Printf("Deleted user: %s\n", user.Email)
    }
})
if err != nil {
    return err
}
defer listener.Close()

// Block until context is cancelled
<-ctx.Done()
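
If you want notification payloads in the same shape as the generic API, the trigger's JSON can be decoded into a ChangeEvent[T]. The following is a sketch, not something pgdriver is documented to provide: decodeEvent and payload are hypothetical, the ChangeEvent type is redeclared locally in trimmed form, and the field names assume the notify_changes() trigger shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type ChangeOp int

const (
	ChangeInsert ChangeOp = iota
	ChangeUpdate
	ChangeDelete
)

// ChangeEvent is a trimmed local copy of the stream package type.
type ChangeEvent[T any] struct {
	Operation ChangeOp
	Before    *T
	After     *T
	Timestamp time.Time
}

// payload matches the JSON built by the notify_changes() trigger.
// JSON null decodes to a nil pointer for Before and After.
type payload[T any] struct {
	Op        string    `json:"op"`
	Before    *T        `json:"before"`
	After     *T        `json:"after"`
	Timestamp time.Time `json:"timestamp"`
}

// decodeEvent converts a raw notification payload into a ChangeEvent.
func decodeEvent[T any](raw string) (ChangeEvent[T], error) {
	var p payload[T]
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		return ChangeEvent[T]{}, fmt.Errorf("decode notification: %w", err)
	}
	ops := map[string]ChangeOp{
		"INSERT": ChangeInsert,
		"UPDATE": ChangeUpdate,
		"DELETE": ChangeDelete,
	}
	op, ok := ops[p.Op]
	if !ok {
		return ChangeEvent[T]{}, fmt.Errorf("unknown operation %q", p.Op)
	}
	return ChangeEvent[T]{
		Operation: op,
		Before:    p.Before,
		After:     p.After,
		Timestamp: p.Timestamp,
	}, nil
}

type User struct {
	ID    int    `json:"id"`
	Email string `json:"email"`
}

func main() {
	raw := `{"op":"INSERT","before":null,"after":{"id":42,"email":"a@example.com"},"timestamp":"2024-01-02T03:04:05Z"}`
	ev, err := decodeEvent[User](raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(ev.Operation == ChangeInsert, ev.After.Email, ev.Before == nil)
}
```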

Listener Lifecycle

| Method | Description |
|---|---|
| pgdb.NewListener() | Create a new listener (not started) |
| listener.Start(ctx) | Acquire a dedicated connection and begin the listen loop |
| listener.Listen(ctx, channel) | Subscribe to a notification channel |
| listener.Unlisten(ctx, channel) | Unsubscribe from a channel |
| listener.OnNotification(channel, handler) | Register a handler for a channel |
| listener.Notify(ctx, channel, payload) | Send a notification |
| listener.Close() | Stop the listener and release the connection |

The listener acquires a dedicated connection from the pool when started. This connection is held open for the lifetime of the listener to receive asynchronous notifications. The background goroutine runs until the context is cancelled or Close() is called. Close() is idempotent.

MongoDB: Native Change Streams

MongoDB has native change stream support through its watch() API. The mongodriver implements the ChangeSource[T] interface by wrapping MongoDB's change stream cursor, providing full before/after document support and resume tokens.

MongoDB change streams support the ChangeReplace operation in addition to insert, update, and delete.
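
MongoDB change events carry an operationType string such as "insert", "update", "delete", or "replace". A mapping onto ChangeOp could look like the sketch below; opFromMongo is a hypothetical helper, and how mongodriver performs this mapping internally is not covered on this page.

```go
package main

import "fmt"

type ChangeOp int

const (
	ChangeInsert ChangeOp = iota
	ChangeUpdate
	ChangeDelete
	ChangeReplace
)

// opFromMongo maps a MongoDB change event operationType to a ChangeOp.
// The boolean is false for types with no row-level equivalent,
// such as "drop" or "invalidate".
func opFromMongo(operationType string) (ChangeOp, bool) {
	switch operationType {
	case "insert":
		return ChangeInsert, true
	case "update":
		return ChangeUpdate, true
	case "delete":
		return ChangeDelete, true
	case "replace":
		return ChangeReplace, true
	default:
		return 0, false
	}
}

func main() {
	op, ok := opFromMongo("replace")
	fmt.Println(op == ChangeReplace, ok) // true true

	_, ok = opFromMongo("invalidate")
	fmt.Println(ok) // false
}
```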

API Summary

Types

| Type | Description |
|---|---|
| ChangeOp | Operation type: ChangeInsert, ChangeUpdate, ChangeDelete, ChangeReplace |
| ChangeEvent[T] | A single change event with Operation, Before, After, Timestamp, ResumeToken |
| ChangeSource[T] | Driver interface for providing CDC events |
| ChangeStream[T] | High-level change stream iterator |

pgdriver Types

| Type | Description |
|---|---|
| Notification | A PostgreSQL NOTIFY message with Channel, Payload, PID |
| Listener | Manages LISTEN/NOTIFY subscriptions on a dedicated connection |
