Grove

Streaming Overview

Lazy, pull-based iteration over large result sets using server-side cursors.

Why Streams?

Traditional query methods like Scan load every result row into memory at once. This works well for small result sets, but when you query millions of rows for a report, a data export, or a batch job, loading everything upfront can exhaust memory and stall your application.

Grove's stream package solves this with lazy, pull-based iteration. A Stream[T] holds an open server-side cursor and decodes one row at a time. The database sends rows in batches (default 100), but your code sees a simple iterator that yields decoded model instances.

Key properties of streams:

  • Constant memory -- only one row (or one batch) is in memory at a time.
  • Server-side cursors -- the database manages the result set, not your application.
  • Hook integration -- per-row privacy hooks run on every yielded row, so long-lived streams respect permission changes.
  • Composable transforms -- Map, Filter, Reduce, Chunk, Take, and ForEach let you build processing pipelines without collecting results.
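
The composability idea can be sketched without Grove itself. The snippet below is a minimal, self-contained illustration of pull-based Map and Filter over a generic pull iterator -- the `pull`, `mapStream`, `filterStream`, and `fromSlice` names are invented for this sketch and are not Grove's actual API, but the mechanics (each transform pulls one value at a time, so no intermediate slice is ever materialized) are the same:

```go
package main

import "fmt"

// pull is a minimal pull-based iterator: each call returns the next
// value, with ok=false when the source is exhausted.
type pull[T any] func() (T, bool)

// mapStream lazily transforms each value as it is pulled.
func mapStream[T, U any](src pull[T], f func(T) U) pull[U] {
	return func() (U, bool) {
		v, ok := src()
		if !ok {
			var zero U
			return zero, false
		}
		return f(v), true
	}
}

// filterStream skips values failing the predicate, still pulling
// only one source value at a time.
func filterStream[T any](src pull[T], keep func(T) bool) pull[T] {
	return func() (T, bool) {
		for {
			v, ok := src()
			if !ok {
				var zero T
				return zero, false
			}
			if keep(v) {
				return v, true
			}
		}
	}
}

// fromSlice adapts a slice into a pull iterator.
func fromSlice[T any](vals []T) pull[T] {
	i := 0
	return func() (T, bool) {
		if i >= len(vals) {
			var zero T
			return zero, false
		}
		v := vals[i]
		i++
		return v, true
	}
}

func main() {
	// Keep even numbers, then square them; no intermediate slice
	// is ever allocated between the two stages.
	evens := filterStream(fromSlice([]int{1, 2, 3, 4}), func(n int) bool { return n%2 == 0 })
	squares := mapStream(evens, func(n int) int { return n * n })
	for v, ok := squares(); ok; v, ok = squares() {
		fmt.Println(v)
	}
}
```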

The stream.Cursor Interface

Every driver that supports streaming implements the Cursor interface. This is the low-level contract between the stream package and the database driver:

type Cursor interface {
    Next() bool
    Scan(dest ...any) error
    Columns() ([]string, error)
    Close() error
    Err() error
}

For PostgreSQL, pgdriver implements this using DECLARE CURSOR / FETCH N inside a dedicated transaction. The cursor transparently fetches new batches when the current batch is exhausted.

You do not need to interact with Cursor directly -- the driver creates it for you when you call Stream() on a select query.

The stream.DecodeFunc[T] Type

A DecodeFunc converts a cursor row into a typed value:

type DecodeFunc[T any] func(cursor Cursor) (T, error)

The pgdriver builds this automatically from your model's schema metadata, mapping columns to struct fields using the same scan infrastructure as regular queries.
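
To make the two contracts concrete, here is a minimal sketch pairing a toy in-memory Cursor with a hand-written DecodeFunc. The `sliceCursor` type and `decodeUser` function are illustrative only -- in real code pgdriver supplies both from your schema -- but the interaction (Next to advance, Scan to copy column values, decode to a typed model) matches the contracts above:

```go
package main

import (
	"fmt"
)

// Cursor matches the interface shown above.
type Cursor interface {
	Next() bool
	Scan(dest ...any) error
	Columns() ([]string, error)
	Close() error
	Err() error
}

type DecodeFunc[T any] func(cursor Cursor) (T, error)

// sliceCursor is a toy Cursor over in-memory rows.
type sliceCursor struct {
	cols []string
	rows [][]any
	idx  int // 0 until the first Next call
}

func (c *sliceCursor) Next() bool { c.idx++; return c.idx <= len(c.rows) }

func (c *sliceCursor) Scan(dest ...any) error {
	row := c.rows[c.idx-1]
	if len(dest) != len(row) {
		return fmt.Errorf("expected %d destinations, got %d", len(row), len(dest))
	}
	for i, d := range dest {
		switch p := d.(type) {
		case *int64:
			*p = row[i].(int64)
		case *string:
			*p = row[i].(string)
		}
	}
	return nil
}

func (c *sliceCursor) Columns() ([]string, error) { return c.cols, nil }
func (c *sliceCursor) Close() error               { return nil }
func (c *sliceCursor) Err() error                 { return nil }

type User struct {
	ID    int64
	Email string
}

// decodeUser is a hand-written DecodeFunc; pgdriver generates the
// equivalent automatically from schema metadata.
func decodeUser(c Cursor) (*User, error) {
	var u User
	if err := c.Scan(&u.ID, &u.Email); err != nil {
		return nil, err
	}
	return &u, nil
}

func main() {
	cur := &sliceCursor{
		cols: []string{"id", "email"},
		rows: [][]any{{int64(1), "a@example.com"}, {int64(2), "b@example.com"}},
	}
	var decode DecodeFunc[*User] = decodeUser
	for cur.Next() {
		u, err := decode(cur)
		if err != nil {
			panic(err)
		}
		fmt.Println(u.ID, u.Email)
	}
}
```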

Creating a Stream

The most common way to create a stream is through the pgdriver's SelectQuery.Stream() method:

pgdb := pgdriver.Unwrap(db)

s, err := pgdb.NewSelect(&User{}).
    Where("active = $1", true).
    Stream(ctx)
if err != nil {
    return err
}
defer s.Close()

Stream() uses a default fetch size of 100 rows per batch. To control the batch size, use StreamBatch():

s, err := pgdb.NewSelect(&AuditLog{}).
    Where("created_at > $1", since).
    StreamBatch(ctx, 500) // fetch 500 rows per round-trip

Both methods open a dedicated read-only transaction for the server-side cursor. The transaction is committed automatically when you close the stream.

Iterating: Next() / Value() Loop

The standard iteration pattern uses Next() and Value():

s, err := pgdb.NewSelect(&User{}).Stream(ctx)
if err != nil {
    return err
}
defer s.Close()

for s.Next(ctx) {
    user := s.Value() // returns the decoded model (any type)
    fmt.Println(user)
}
if err := s.Err(); err != nil {
    return err
}

Next(ctx) advances the cursor, decodes the next row, runs any configured hooks, and returns true if a value is available. It returns false when the cursor is exhausted or an error occurs. Always check Err() after the loop.

The context.Context passed to Next is checked on every iteration, so cancellation stops the stream promptly.

Iterating: All for Range-Over-Func (Go 1.23+)

If you are using Go 1.23 or later, you can iterate with the range-over-func syntax via All:

for val, err := range s.All {
    if err != nil {
        return err
    }
    user := val.(*User)
    fmt.Println(user.Email)
}

All closes the stream automatically when iteration completes or when you break out of the loop. You do not need to defer Close() separately when using All.

Collecting and Counting

For convenience, Stream[T] provides two drain methods:

// Collect drains the entire stream into a slice.
users, err := s.Collect(ctx)

// Count drains the stream, counting rows without allocating models.
n, err := s.Count(ctx)

Both methods close the stream when they finish. Use Collect when you need all the data; use Count when you only need the total.

Hook Integration

Streams integrate with Grove's hook system through the StreamRowHook interface. When hooks are configured, every row decoded from the cursor is passed through registered StreamRowHook handlers before being yielded to your code.

A hook can return one of three decisions:

  • Allow -- the row is yielded normally.
  • Skip -- the row is silently skipped; iteration continues to the next row.
  • Deny -- iteration stops immediately with an error.

This is critical for long-lived streams where permissions can change during iteration. Pre-query hooks (filter injection, deny) run once when the stream is opened. Stream row hooks run per-row as each row is decoded.

Example: Per-Row Tenant Filtering

type TenantRowFilter struct {
    TenantID string
}

func (f *TenantRowFilter) OnStreamRow(ctx context.Context, qc *hook.QueryContext, row any) (hook.Decision, error) {
    type tenanted interface { GetTenantID() string }
    if t, ok := row.(tenanted); ok {
        if t.GetTenantID() != f.TenantID {
            return hook.Skip, nil
        }
    }
    return hook.Allow, nil
}

// Register the hook:
db.Hooks().AddHook(&TenantRowFilter{TenantID: "acme"}, hook.Scope{
    Tables: []string{"users"},
})

Programmatic Hook Attachment

You can also attach hooks directly to a stream using NewWithHooks or the WithHooks method:

// Using NewWithHooks:
s := stream.NewWithHooks[any](cursor, decode, hookRunner, queryCtx)

// Using the chaining method:
s := stream.New[any](cursor, decode).WithHooks(hookRunner, queryCtx)

When you create streams through the pgdriver's Stream() method, hooks from the database's hook engine are attached automatically.

Closing Streams

Streams hold open server-side cursors and database connections. Always close them:

s, err := pgdb.NewSelect(&User{}).Stream(ctx)
if err != nil {
    return err
}
defer s.Close()

Close() is idempotent -- calling it multiple times is safe. For pgdriver streams, closing the stream sends CLOSE <cursor_name> to the server and commits the dedicated transaction.

If you use All, Collect, or Count, the stream is closed automatically.

Error Handling

Errors can arise from:

  1. Cursor iteration -- network errors, query timeouts, or database errors during FETCH.
  2. Decoding -- type mismatches or scan failures when mapping columns to struct fields.
  3. Hooks -- a StreamRowHook returning an error or a Deny decision.
  4. Context cancellation -- the context passed to Next() is cancelled.

In all cases, Next() returns false and the error is available via Err():

for s.Next(ctx) {
    // process s.Value()
}
if err := s.Err(); err != nil {
    // handle: could be a database error, decode error, hook denial, or context cancellation
    log.Error("stream failed", "err", err)
}

BatchCursor for Paginated Sources

The stream package also provides BatchCursor, which implements the Cursor interface using paginated fetches. This is useful for sources that do not support true server-side cursors but can return pages of results:

bc := stream.NewBatchCursor(
    func(offset, limit int) (stream.Cursor, error) {
        // fetch a page of results from your data source
        return fetchPage(offset, limit)
    },
    200, // batch size
)

s := stream.New[MyModel](bc, myDecodeFunc)
defer s.Close()

BatchCursor calls the fetch function each time the current batch is exhausted, passing the current offset and batch size. It yields rows one at a time, transparently managing batch boundaries.
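
The batch-boundary logic can be sketched in a few lines. The `pageCursor` below is a simplified illustration, not Grove's BatchCursor (which also implements Scan, Columns, Close, and Err); it keeps only the core loop of fetching a new page when the current one is exhausted and treating an empty page as end-of-data:

```go
package main

import "fmt"

// pageCursor sketches the core of a paginated cursor: it yields rows
// from pages of size batchSize, calling fetch(offset, limit) whenever
// the current page runs out.
type pageCursor struct {
	fetch     func(offset, limit int) ([]int, error)
	batchSize int
	offset    int
	page      []int
	idx       int
	cur       int
	err       error
	done      bool
}

func (c *pageCursor) Next() bool {
	if c.err != nil || c.done {
		return false
	}
	if c.idx >= len(c.page) {
		page, err := c.fetch(c.offset, c.batchSize)
		if err != nil {
			c.err = err
			return false
		}
		if len(page) == 0 { // empty page means no more data
			c.done = true
			return false
		}
		c.offset += len(page)
		c.page, c.idx = page, 0
	}
	c.cur = c.page[c.idx]
	c.idx++
	return true
}

func main() {
	data := []int{10, 20, 30, 40, 50}
	// fetch simulates a paginated data source backed by a slice.
	fetch := func(offset, limit int) ([]int, error) {
		if offset >= len(data) {
			return nil, nil
		}
		end := offset + limit
		if end > len(data) {
			end = len(data)
		}
		return data[offset:end], nil
	}
	c := &pageCursor{fetch: fetch, batchSize: 2}
	for c.Next() { // three fetches: rows 0-1, 2-3, 4
		fmt.Println(c.cur)
	}
}
```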

API Summary

stream.Stream[T]

func New[T any](cursor Cursor, decode DecodeFunc[T]) *Stream[T]
    Create a stream from a cursor and decoder.

func NewWithHooks[T any](cursor Cursor, decode DecodeFunc[T], hooks HookRunner, qc any) *Stream[T]
    Create a stream with per-row hooks.

func (s *Stream[T]) WithHooks(runner HookRunner, qc any) *Stream[T]
    Attach hooks (chainable).

func (s *Stream[T]) Next(ctx context.Context) bool
    Advance to the next row.

func (s *Stream[T]) Value() T
    Get the current decoded value.

func (s *Stream[T]) Err() error
    Get the first error encountered.

func (s *Stream[T]) Close() error
    Release the cursor and connection.

func (s *Stream[T]) All(yield func(T, error) bool)
    Range-over-func iterator (Go 1.23+).

func (s *Stream[T]) Collect(ctx context.Context) ([]T, error)
    Drain the stream into a slice.

func (s *Stream[T]) Count(ctx context.Context) (int64, error)
    Count rows without allocating models.

stream.Cursor

Next() bool
    Advance to the next row.

Scan(dest ...any) error
    Copy column values into dest.

Columns() ([]string, error)
    Get column names.

Close() error
    Release resources.

Err() error
    Get the iteration error.

stream.BatchCursor

func NewBatchCursor(fetchBatch func(offset, limit int) (Cursor, error), batchSize int) *BatchCursor
    Create a paginated cursor.

Next() bool
    Advance, fetching new batches as needed.

Scan(dest ...any) error
    Copy column values.

Columns() ([]string, error)
    Get column names.

Close() error
    Release resources.

Err() error
    Get the iteration error.
