Change Data Capture
Real-time change streams for reacting to database inserts, updates, and deletes.
Overview
The stream package provides a generic Change Data Capture (CDC) abstraction through ChangeStream[T] and ChangeEvent[T]. This enables your application to react to real-time database changes -- inserts, updates, deletes -- without polling.
CDC support is driver-dependent. The driver.StreamCapable interface reports whether a driver supports it:
type StreamCapable interface {
SupportsStreaming() bool
SupportsCDC() bool
}| Driver | CDC Mechanism | SupportsCDC() |
|---|---|---|
pgdriver | PostgreSQL LISTEN/NOTIFY | true |
mongodriver | MongoDB Change Streams | true |
mysqldriver | -- | false |
sqlitedriver | -- | false |
Change Operations
The ChangeOp type represents the kind of mutation:
type ChangeOp int
const (
ChangeInsert ChangeOp = iota // INSERT
ChangeUpdate // UPDATE
ChangeDelete // DELETE
ChangeReplace // REPLACE (MongoDB-specific)
)Each operation has a String() method that returns the uppercase name (e.g., "INSERT", "UPDATE").
ChangeEvent[T]
A ChangeEvent[T] represents a single change from the database:
type ChangeEvent[T any] struct {
// Operation is the type of change (INSERT, UPDATE, DELETE, REPLACE).
Operation ChangeOp
// Before is the previous state of the row.
// nil for inserts; populated when the driver supports it.
Before *T
// After is the new state of the row.
// nil for deletes.
After *T
// Timestamp is the server timestamp of the change.
Timestamp time.Time
// ResumeToken is an opaque token for resuming the stream after disconnect.
ResumeToken any
}The Before and After fields depend on the operation type and driver capabilities:
| Operation | Before | After |
|---|---|---|
ChangeInsert | nil | new row |
ChangeUpdate | previous row (if supported) | updated row |
ChangeDelete | deleted row (if supported) | nil |
ChangeReplace | previous row (if supported) | replacement row |
The ChangeSource[T] Interface
Drivers implement the ChangeSource[T] interface to provide CDC events:
type ChangeSource[T any] interface {
Next(ctx context.Context) bool
Event() ChangeEvent[T]
Err() error
Close() error
ResumeToken() any
}This is the low-level contract between the stream package and the database driver. You typically interact with the higher-level ChangeStream[T] wrapper instead.
ChangeStream[T]
ChangeStream[T] wraps a ChangeSource[T] and provides a consistent iteration API:
type ChangeStream[T any] struct { /* unexported fields */ }
func NewChangeStream[T any](source ChangeSource[T]) *ChangeStream[T]Methods
| Method | Signature | Description |
|---|---|---|
Next | func (cs *ChangeStream[T]) Next(ctx context.Context) bool | Block until next change event |
Event | func (cs *ChangeStream[T]) Event() ChangeEvent[T] | Get current change event |
Err | func (cs *ChangeStream[T]) Err() error | Get first error encountered |
Close | func (cs *ChangeStream[T]) Close() error | Stop the change stream |
ResumeToken | func (cs *ChangeStream[T]) ResumeToken() any | Get current resume token |
All | func (cs *ChangeStream[T]) All(yield func(ChangeEvent[T], error) bool) | Range-over-func iterator (Go 1.23+) |
Iteration Pattern
cs := stream.NewChangeStream(source)
defer cs.Close()
for cs.Next(ctx) {
event := cs.Event()
switch event.Operation {
case stream.ChangeInsert:
fmt.Printf("New row: %+v\n", *event.After)
case stream.ChangeUpdate:
fmt.Printf("Updated row: %+v\n", *event.After)
case stream.ChangeDelete:
if event.Before != nil {
fmt.Printf("Deleted row: %+v\n", *event.Before)
}
}
}
if err := cs.Err(); err != nil {
log.Error("change stream error", "err", err)
}Range-Over-Func (Go 1.23+)
for event, err := range cs.All {
if err != nil {
log.Error("change stream error", "err", err)
break
}
if event.Operation == stream.ChangeInsert {
processNewRecord(event.After)
}
}Resume Tokens
Change streams support resume tokens for reconnection after disconnects. The ResumeToken() method returns an opaque value that can be stored and used to resume the stream from where it left off:
// Save the resume token periodically
token := cs.ResumeToken()
saveCheckpoint(token)
// Later, resume from the saved token
source := driver.WatchCollection("users", WithResumeToken(savedToken))
cs := stream.NewChangeStream(source)The format and semantics of resume tokens are driver-specific. For MongoDB, this is a BSON document. For PostgreSQL, it depends on the CDC implementation.
PostgreSQL: LISTEN/NOTIFY
The pgdriver provides CDC through PostgreSQL's LISTEN/NOTIFY mechanism. While this is not a full logical replication client, it enables real-time event notification for application-level CDC patterns.
The Listener Type
// Create a listener from the pgdriver
pgdb := pgdriver.Unwrap(db)
listener := pgdb.NewListener()
// Start listening (acquires a dedicated connection)
err := listener.Start(ctx)
// Subscribe to a channel
err = listener.Listen(ctx, "user_changes")
// Register a handler
listener.OnNotification("user_changes", func(n *pgdriver.Notification) {
fmt.Printf("Channel: %s, Payload: %s\n", n.Channel, n.Payload)
})Convenience Method
The PgDB.Listen() convenience method combines creating, starting, subscribing, and registering in a single call:
pgdb := pgdriver.Unwrap(db)
listener, err := pgdb.Listen(ctx, "order_events", func(n *pgdriver.Notification) {
fmt.Printf("Order event: %s\n", n.Payload)
})
if err != nil {
return err
}
defer listener.Close()Sending Notifications
Use the Notify method to send a notification on a channel:
err := listener.Notify(ctx, "user_changes", `{"op":"INSERT","id":42}`)The Notification Type
type Notification struct {
Channel string // notification channel name
Payload string // message payload (typically JSON)
PID uint32 // backend PID of the sender
}Setting Up Triggers
To use LISTEN/NOTIFY for CDC, create a trigger function that sends notifications on data changes:
-- Create a notification function
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(
TG_TABLE_NAME || '_changes',
json_build_object(
'op', TG_OP,
'before', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE NULL END,
'after', CASE WHEN TG_OP = 'DELETE' THEN NULL ELSE row_to_json(NEW) END,
'timestamp', now()
)::text
);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- Attach to a table
CREATE TRIGGER users_notify
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();Then subscribe in your application:
pgdb := pgdriver.Unwrap(db)
listener, err := pgdb.Listen(ctx, "users_changes", func(n *pgdriver.Notification) {
var event struct {
Op string `json:"op"`
Before json.RawMessage `json:"before"`
After json.RawMessage `json:"after"`
Timestamp time.Time `json:"timestamp"`
}
if err := json.Unmarshal([]byte(n.Payload), &event); err != nil {
log.Error("failed to parse notification", "err", err)
return
}
switch event.Op {
case "INSERT":
var user User
json.Unmarshal(event.After, &user)
fmt.Printf("New user: %s\n", user.Email)
case "UPDATE":
var user User
json.Unmarshal(event.After, &user)
fmt.Printf("Updated user: %s\n", user.Email)
case "DELETE":
var user User
json.Unmarshal(event.Before, &user)
fmt.Printf("Deleted user: %s\n", user.Email)
}
})
if err != nil {
return err
}
defer listener.Close()
// Block until context is cancelled
<-ctx.Done()Listener Lifecycle
| Method | Description |
|---|---|
pgdb.NewListener() | Create a new listener (not started) |
listener.Start(ctx) | Acquire a dedicated connection and begin the listen loop |
listener.Listen(ctx, channel) | Subscribe to a notification channel |
listener.Unlisten(ctx, channel) | Unsubscribe from a channel |
listener.OnNotification(channel, handler) | Register a handler for a channel |
listener.Notify(ctx, channel, payload) | Send a notification |
listener.Close() | Stop the listener and release the connection |
The listener acquires a dedicated connection from the pool when started. This connection is held open for the lifetime of the listener to receive asynchronous notifications. The background goroutine runs until the context is cancelled or Close() is called. Close() is idempotent.
MongoDB: Native Change Streams
MongoDB has native change stream support through its watch() API. The mongodriver implements the ChangeSource[T] interface by wrapping MongoDB's change stream cursor, providing full before/after document support and resume tokens.
MongoDB change streams support the ChangeReplace operation in addition to insert, update, and delete.
API Summary
Types
| Type | Description |
|---|---|
ChangeOp | Operation type: ChangeInsert, ChangeUpdate, ChangeDelete, ChangeReplace |
ChangeEvent[T] | A single change event with Operation, Before, After, Timestamp, ResumeToken |
ChangeSource[T] | Driver interface for providing CDC events |
ChangeStream[T] | High-level change stream iterator |
pgdriver Types
| Type | Description |
|---|---|
Notification | A PostgreSQL NOTIFY message with Channel, Payload, PID |
Listener | Manages LISTEN/NOTIFY subscriptions on a dedicated connection |