# Streaming Overview
Lazy, pull-based iteration over large result sets using server-side cursors.
## Why Streams?
Traditional query methods like Scan load every result row into memory at once. This works well for small result sets, but when you query millions of rows for a report, a data export, or a batch job, loading everything upfront can exhaust memory and stall your application.
Grove's stream package solves this with lazy, pull-based iteration. A Stream[T] holds an open server-side cursor and decodes one row at a time. The database sends rows in batches (default 100), but your code sees a simple iterator that yields decoded model instances.
Key properties of streams:
- Constant memory -- only one row (or one batch) is in memory at a time.
- Server-side cursors -- the database manages the result set, not your application.
- Hook integration -- per-row privacy hooks run on every yielded row, so long-lived streams respect permission changes.
- Composable transforms -- Map, Filter, Reduce, Chunk, Take, and ForEach let you build processing pipelines without collecting results.
## The stream.Cursor Interface
Every driver that supports streaming implements the Cursor interface. This is the low-level contract between the stream package and the database driver:
```go
type Cursor interface {
    Next() bool
    Scan(dest ...any) error
    Columns() ([]string, error)
    Close() error
    Err() error
}
```

For PostgreSQL, pgdriver implements this using DECLARE CURSOR / FETCH N inside a dedicated transaction. The cursor transparently fetches new batches when the current batch is exhausted.
You do not need to interact with Cursor directly -- the driver creates it for you when you call Stream() on a select query.
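To make the contract concrete, here is a minimal in-memory implementation of the interface. sliceCursor is a toy stand-in written for this guide, not part of Grove; it illustrates the Next/Scan/Err/Close protocol that real drivers follow.

```go
package main

import "fmt"

// sliceCursor is a toy in-memory Cursor used only for illustration.
type sliceCursor struct {
	cols []string
	rows [][]any
	pos  int // 0 = before the first row
	err  error
}

// Next advances to the next row, returning false when exhausted.
func (c *sliceCursor) Next() bool {
	if c.pos >= len(c.rows) {
		return false
	}
	c.pos++
	return true
}

// Scan copies the current row's values into *any destinations.
func (c *sliceCursor) Scan(dest ...any) error {
	row := c.rows[c.pos-1]
	for i, d := range dest {
		*(d.(*any)) = row[i]
	}
	return nil
}

func (c *sliceCursor) Columns() ([]string, error) { return c.cols, nil }
func (c *sliceCursor) Close() error               { return nil }
func (c *sliceCursor) Err() error                 { return c.err }

func main() {
	cur := &sliceCursor{
		cols: []string{"id", "email"},
		rows: [][]any{{1, "a@example.com"}, {2, "b@example.com"}},
	}
	defer cur.Close()
	for cur.Next() {
		var id, email any
		if err := cur.Scan(&id, &email); err != nil {
			panic(err)
		}
		fmt.Println(id, email)
	}
}
```

A real driver's cursor does the same dance, but each Next() may trigger a FETCH round-trip behind the scenes.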
## The stream.DecodeFunc[T] Type
A DecodeFunc converts a cursor row into a typed value:
```go
type DecodeFunc[T any] func(cursor Cursor) (T, error)
```

The pgdriver builds this automatically from your model's schema metadata, mapping columns to struct fields using the same scan infrastructure as regular queries.
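For illustration, a hand-written DecodeFunc might look like the following sketch. The User model, the oneRow cursor, and the local Cursor/DecodeFunc definitions are all stand-ins invented for this example; pgdriver generates the equivalent decoder from schema metadata.

```go
package main

import "fmt"

// Local stand-ins for the stream package's types.
type Cursor interface {
	Next() bool
	Scan(dest ...any) error
	Columns() ([]string, error)
	Close() error
	Err() error
}

type DecodeFunc[T any] func(cursor Cursor) (T, error)

type User struct {
	ID    int64
	Email string
}

// decodeUser scans the current row into a User, much as a generated
// decoder would (assuming column order id, email).
var decodeUser DecodeFunc[*User] = func(c Cursor) (*User, error) {
	u := &User{}
	if err := c.Scan(&u.ID, &u.Email); err != nil {
		return nil, err
	}
	return u, nil
}

// oneRow is a single-row toy cursor used to exercise decodeUser.
type oneRow struct{ done bool }

func (c *oneRow) Next() bool {
	if c.done {
		return false
	}
	c.done = true
	return true
}

func (c *oneRow) Scan(dest ...any) error {
	*(dest[0].(*int64)) = 7
	*(dest[1].(*string)) = "a@example.com"
	return nil
}

func (c *oneRow) Columns() ([]string, error) { return []string{"id", "email"}, nil }
func (c *oneRow) Close() error               { return nil }
func (c *oneRow) Err() error                 { return nil }

func main() {
	u, err := decodeUser(&oneRow{})
	if err != nil {
		panic(err)
	}
	fmt.Println(u.ID, u.Email) // 7 a@example.com
}
```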
## Creating a Stream
The most common way to create a stream is through the pgdriver's SelectQuery.Stream() method:
```go
pgdb := pgdriver.Unwrap(db)
s, err := pgdb.NewSelect(&User{}).
    Where("active = $1", true).
    Stream(ctx)
if err != nil {
    return err
}
defer s.Close()
```

Stream() uses a default fetch size of 100 rows per batch. To control the batch size, use StreamBatch():
```go
s, err := pgdb.NewSelect(&AuditLog{}).
    Where("created_at > $1", since).
    StreamBatch(ctx, 500) // fetch 500 rows per round-trip
```

Both methods open a dedicated read-only transaction for the server-side cursor. The transaction is committed automatically when you close the stream.
## Iterating: Next() / Value() Loop
The standard iteration pattern uses Next() and Value():
```go
s, err := pgdb.NewSelect(&User{}).Stream(ctx)
if err != nil {
    return err
}
defer s.Close()

for s.Next(ctx) {
    user := s.Value() // returns the decoded model (any type)
    fmt.Println(user)
}
if err := s.Err(); err != nil {
    return err
}
```

Next(ctx) advances the cursor, decodes the next row, runs any configured hooks, and returns true if a value is available. It returns false when the cursor is exhausted or an error occurs. Always check Err() after the loop.
The context.Context passed to Next is checked on every iteration, so cancellation stops the stream promptly.
## Iterating: All for Range-Over-Func (Go 1.23+)
If you are using Go 1.23 or later, you can iterate with the range-over-func syntax via All:
```go
for val, err := range s.All {
    if err != nil {
        return err
    }
    user := val.(*User)
    fmt.Println(user.Email)
}
```

All closes the stream automatically when iteration completes or when you break out of the loop. You do not need to defer Close() separately when using All.
## Collecting and Counting
For convenience, Stream[T] provides two drain methods:
```go
// Collect drains the entire stream into a slice.
users, err := s.Collect(ctx)

// Count drains the stream, counting rows without allocating models.
n, err := s.Count(ctx)
```

Both methods close the stream when they finish. Use Collect when you need all the data; use Count when you only need the total.
## Hook Integration
Streams integrate with Grove's hook system through the StreamRowHook interface. When hooks are configured, every row decoded from the cursor is passed through registered StreamRowHook handlers before being yielded to your code.
A hook can return one of three decisions:
| Decision | Effect |
|---|---|
| Allow | Row is yielded normally |
| Skip | Row is silently skipped; iteration continues to the next row |
| Deny | Iteration stops immediately with an error |
This is critical for long-lived streams where permissions can change during iteration. Pre-query hooks (filter injection, deny) run once when the stream is opened. Stream row hooks run per-row as each row is decoded.
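The dispatch behavior can be modeled with a small self-contained loop. Decision, filterRows, and the hook signature below are toy stand-ins for Grove's hook package, shown only to illustrate how Skip continues and Deny aborts:

```go
package main

import (
	"errors"
	"fmt"
)

// Decision is a local stand-in for the hook package's decision type.
type Decision int

const (
	Allow Decision = iota
	Skip
	Deny
)

var ErrDenied = errors.New("stream denied by hook")

// filterRows runs each row through the hook: Allow yields the row,
// Skip drops it and continues, Deny stops iteration with an error.
func filterRows(rows []int, hook func(int) Decision) ([]int, error) {
	var out []int
	for _, r := range rows {
		switch hook(r) {
		case Allow:
			out = append(out, r)
		case Skip:
			continue
		case Deny:
			return out, ErrDenied
		}
	}
	return out, nil
}

func main() {
	// Skip odd rows; deny anything over 100.
	got, err := filterRows([]int{1, 2, 3, 4}, func(r int) Decision {
		if r > 100 {
			return Deny
		}
		if r%2 == 1 {
			return Skip
		}
		return Allow
	})
	fmt.Println(got, err) // [2 4] <nil>
}
```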
### Example: Per-Row Tenant Filtering
```go
type TenantRowFilter struct {
    TenantID string
}

func (f *TenantRowFilter) OnStreamRow(ctx context.Context, qc *hook.QueryContext, row any) (hook.Decision, error) {
    type tenanted interface{ GetTenantID() string }
    if t, ok := row.(tenanted); ok {
        if t.GetTenantID() != f.TenantID {
            return hook.Skip, nil
        }
    }
    return hook.Allow, nil
}

// Register the hook:
db.Hooks().AddHook(&TenantRowFilter{TenantID: "acme"}, hook.Scope{
    Tables: []string{"users"},
})
```

### Programmatic Hook Attachment
You can also attach hooks directly to a stream using NewWithHooks or the WithHooks method:
```go
// Using NewWithHooks:
s := stream.NewWithHooks[any](cursor, decode, hookRunner, queryCtx)

// Using the chaining method:
s := stream.New[any](cursor, decode).WithHooks(hookRunner, queryCtx)
```

When you create streams through the pgdriver's Stream() method, hooks from the database's hook engine are attached automatically.
## Closing Streams
Streams hold open server-side cursors and database connections. Always close them:
```go
s, err := pgdb.NewSelect(&User{}).Stream(ctx)
if err != nil {
    return err
}
defer s.Close()
```

Close() is idempotent -- calling it multiple times is safe. For pgdriver streams, closing the stream sends CLOSE <cursor_name> to the server and commits the dedicated transaction.
If you use All, Collect, or Count, the stream is closed automatically.
## Error Handling
Errors can arise from:

- Cursor iteration -- network errors, query timeouts, or database errors during FETCH.
- Decoding -- type mismatches or scan failures when mapping columns to struct fields.
- Hooks -- a StreamRowHook returning an error or a Deny decision.
- Context cancellation -- the context passed to Next() is cancelled.
In all cases, Next() returns false and the error is available via Err():
```go
for s.Next(ctx) {
    // process s.Value()
}
if err := s.Err(); err != nil {
    // handle: could be a database error, decode error, hook denial, or context cancellation
    log.Error("stream failed", "err", err)
}
```

## BatchCursor for Paginated Sources
The stream package also provides BatchCursor, which implements the Cursor interface using paginated fetches. This is useful for sources that do not support true server-side cursors but can return pages of results:
```go
bc := stream.NewBatchCursor(
    func(offset, limit int) (stream.Cursor, error) {
        // fetch a page of results from your data source
        return fetchPage(offset, limit)
    },
    200, // batch size
)

s := stream.New[MyModel](bc, myDecodeFunc)
defer s.Close()
```

BatchCursor calls the fetch function each time the current batch is exhausted, passing the current offset and batch size. It yields rows one at a time, transparently managing batch boundaries.
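The batch-boundary logic can be modeled in a few lines. drainBatches below is a toy stand-in for BatchCursor's internal loop, using plain int slices instead of the Cursor interface: fetch with an advancing offset until a short (or empty) page signals exhaustion.

```go
package main

import "fmt"

// drainBatches repeatedly calls fetch, advancing offset by batchSize,
// until the source returns fewer rows than requested.
func drainBatches(fetch func(offset, limit int) []int, batchSize int) []int {
	var all []int
	for offset := 0; ; offset += batchSize {
		page := fetch(offset, batchSize)
		all = append(all, page...)
		if len(page) < batchSize {
			return all // short page: source is exhausted
		}
	}
}

func main() {
	data := []int{0, 1, 2, 3, 4, 5, 6}
	fetch := func(offset, limit int) []int {
		if offset >= len(data) {
			return nil
		}
		end := offset + limit
		if end > len(data) {
			end = len(data)
		}
		return data[offset:end]
	}
	// Three fetches: rows 0-2, 3-5, then the short page 6.
	fmt.Println(drainBatches(fetch, 3)) // [0 1 2 3 4 5 6]
}
```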
## API Summary
### stream.Stream[T]

| Method | Signature | Description |
|---|---|---|
| New | func New[T any](cursor Cursor, decode DecodeFunc[T]) *Stream[T] | Create a stream from a cursor and decoder |
| NewWithHooks | func NewWithHooks[T any](cursor Cursor, decode DecodeFunc[T], hooks HookRunner, qc any) *Stream[T] | Create a stream with per-row hooks |
| WithHooks | func (s *Stream[T]) WithHooks(runner HookRunner, qc any) *Stream[T] | Attach hooks (chainable) |
| Next | func (s *Stream[T]) Next(ctx context.Context) bool | Advance to the next row |
| Value | func (s *Stream[T]) Value() T | Get the current decoded value |
| Err | func (s *Stream[T]) Err() error | Get the first error encountered |
| Close | func (s *Stream[T]) Close() error | Release the cursor and connection |
| All | func (s *Stream[T]) All(yield func(T, error) bool) | Range-over-func iterator (Go 1.23+) |
| Collect | func (s *Stream[T]) Collect(ctx context.Context) ([]T, error) | Drain the stream into a slice |
| Count | func (s *Stream[T]) Count(ctx context.Context) (int64, error) | Count rows without allocating models |
### stream.Cursor

| Method | Signature | Description |
|---|---|---|
| Next | Next() bool | Advance to the next row |
| Scan | Scan(dest ...any) error | Copy column values into dest |
| Columns | Columns() ([]string, error) | Get column names |
| Close | Close() error | Release resources |
| Err | Err() error | Get the iteration error |
### stream.BatchCursor

| Method | Signature | Description |
|---|---|---|
| NewBatchCursor | func NewBatchCursor(fetchBatch func(offset, limit int) (Cursor, error), batchSize int) *BatchCursor | Create a paginated cursor |
| Next | Next() bool | Advance, fetching new batches as needed |
| Scan | Scan(dest ...any) error | Copy column values |
| Columns | Columns() ([]string, error) | Get column names |
| Close | Close() error | Release resources |
| Err | Err() error | Get the iteration error |