Grove

Pipeline Transforms

Composable Map, Filter, Reduce, Chunk, Take, and ForEach operations on streams.

Overview

The stream package provides six composable transform functions that operate on Stream[T] values. These are top-level generic functions (not methods) because Go does not allow new type parameters on methods.

Transforms fall into two categories:

  • Lazy transforms return a new Stream and only execute when you iterate: Map, Filter, Take, Chunk.
  • Terminal operations consume the stream immediately and return a result: Reduce, ForEach.

All transforms respect the underlying cursor lifecycle. When the source stream is closed, derived streams stop yielding values.
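The lazy/terminal distinction can be illustrated with a simplified model. The sketch below is not the library's implementation -- it models a stream as a plain `func() (T, bool)` pull closure (the `next`, `fromSlice`, and `lazyMap` names are illustrative only) to show how a lazy transform wraps its source and does no work until pulled:

```go
package main

import "fmt"

// next models a stream as a pull closure: it returns the next
// element and false once the stream is exhausted.
type next[T any] func() (T, bool)

// fromSlice builds a pull closure over a slice.
func fromSlice[T any](xs []T) next[T] {
	i := 0
	return func() (T, bool) {
		var zero T
		if i >= len(xs) {
			return zero, false
		}
		v := xs[i]
		i++
		return v, true
	}
}

// lazyMap wraps the source without consuming it; fn only runs
// when the derived stream is pulled, which is what makes the
// transform lazy.
func lazyMap[T, U any](s next[T], fn func(T) U) next[U] {
	return func() (U, bool) {
		v, ok := s()
		if !ok {
			var zero U
			return zero, false
		}
		return fn(v), true
	}
}

func main() {
	squares := lazyMap(fromSlice([]int{1, 2, 3}), func(n int) int { return n * n })
	for v, ok := squares(); ok; v, ok = squares() {
		fmt.Println(v) // 1, 4, 9 -- computed one pull at a time
	}
}
```

A terminal operation, by contrast, would run this pull loop itself and hand back only the final result.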

Map[T, U] -- Transform Each Element

Map applies a function to every element, transforming Stream[T] into Stream[U]:

func Map[T, U any](s *Stream[T], fn func(T) (U, error)) *Stream[U]

The mapping function can return an error to stop iteration.
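The stop-on-error behavior can be sketched against a plain slice (the `errMap` and `tenTimesUntil3` helpers below are illustrative, not part of the stream package): once the mapping function fails, no further elements are produced and the error is returned, analogous to it surfacing through Err():

```go
package main

import (
	"errors"
	"fmt"
)

// errMap applies fn to each element until fn fails; the first
// error stops iteration, mirroring how a failed mapper ends a
// stream and surfaces through Err().
func errMap(src []int, fn func(int) (int, error)) ([]int, error) {
	var out []int
	for _, v := range src {
		u, err := fn(v)
		if err != nil {
			return out, err // stop at the first error
		}
		out = append(out, u)
	}
	return out, nil
}

// tenTimesUntil3 multiplies by ten but rejects the value 3.
func tenTimesUntil3(n int) (int, error) {
	if n == 3 {
		return 0, errors.New("bad element")
	}
	return n * 10, nil
}

func main() {
	out, err := errMap([]int{1, 2, 3, 4}, tenTimesUntil3)
	fmt.Println(out, err) // elements before the failure, plus the error
}
```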

Example: Extract Emails from Users

pgdb := pgdriver.Unwrap(db)

userStream, err := pgdb.NewSelect(&User{}).
    Where("active = $1", true).
    Stream(ctx)
if err != nil {
    return err
}
defer userStream.Close()

emailStream := stream.Map(userStream, func(val any) (string, error) {
    user := val.(*User)
    return user.Email, nil
})

for emailStream.Next(ctx) {
    email := emailStream.Value()
    fmt.Println(email)
}
if err := emailStream.Err(); err != nil {
    return err
}

Example: Convert to API Response

type UserResponse struct {
    ID    int64  `json:"id"`
    Name  string `json:"name"`
}

responseStream := stream.Map(userStream, func(val any) (UserResponse, error) {
    user := val.(*User)
    return UserResponse{
        ID:   user.ID,
        Name: user.FirstName + " " + user.LastName,
    }, nil
})

Filter[T] -- Select Elements by Predicate

Filter yields only elements where the predicate function returns true:

func Filter[T any](s *Stream[T], fn func(T) bool) *Stream[T]

Rows that do not match the predicate are silently skipped. The cursor continues advancing until a matching row is found or the stream is exhausted.
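This skipping means one call to Next on the filtered stream may advance the underlying cursor several rows. A slice-based sketch (the `firstMatch` helper is illustrative, not library API) makes the pull count visible:

```go
package main

import "fmt"

// firstMatch pulls elements until pred matches, counting how many
// the source had to yield: one pull on a filtered stream may
// advance the underlying cursor several rows.
func firstMatch(src []int, pred func(int) bool) (match, pulls int) {
	for _, v := range src {
		pulls++
		if pred(v) {
			return v, pulls
		}
	}
	return 0, pulls
}

func main() {
	even := func(n int) bool { return n%2 == 0 }
	v, pulls := firstMatch([]int{1, 3, 5, 6, 7}, even)
	fmt.Println(v, pulls) // 6 4: four rows consumed to yield one match
}
```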

Example: Filter by Role

adminStream := stream.Filter(userStream, func(val any) bool {
    user := val.(*User)
    return user.Role == "admin"
})

for adminStream.Next(ctx) {
    admin := adminStream.Value()
    fmt.Println(admin)
}
if err := adminStream.Err(); err != nil {
    return err
}


Note: When possible, prefer adding WHERE clauses to your query rather than filtering in application code. Use Filter for conditions that cannot be expressed in SQL or for post-processing logic.

Take[T] -- Limit the Number of Elements

Take yields at most n elements, then marks the stream as done:

func Take[T any](s *Stream[T], n int) *Stream[T]

Example: First 10 Results

top10 := stream.Take(userStream, 10)

for top10.Next(ctx) {
    user := top10.Value()
    fmt.Println(user)
}
if err := top10.Err(); err != nil {
    return err
}

Take is useful when you want to process a bounded subset of a potentially large stream without adding a LIMIT clause to the query, or when composing with other transforms.
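That bounded consumption can be sketched with a pull-closure model (the `take` helper below is illustrative, not the library implementation): even over an unbounded source, the source is advanced exactly n times:

```go
package main

import "fmt"

// take wraps a pull-closure source and refuses to pull more than
// n times, so the source is never advanced past the n-th element.
func take(src func() (int, bool), n int) func() (int, bool) {
	return func() (int, bool) {
		if n <= 0 {
			return 0, false
		}
		n--
		return src()
	}
}

func main() {
	i := 0
	naturals := func() (int, bool) { i++; return i, true } // unbounded source
	first3 := take(naturals, 3)
	for v, ok := first3(); ok; v, ok = first3() {
		fmt.Println(v)
	}
	fmt.Println("source advanced", i, "times") // exactly 3 pulls
}
```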

Chunk[T] -- Group Into Fixed-Size Batches

Chunk groups consecutive elements into slices of the specified size, transforming Stream[T] into Stream[[]T]:

func Chunk[T any](s *Stream[T], size int) *Stream[[]T]

The last chunk may contain fewer than size elements if the stream ends mid-batch.
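The partial final batch is easiest to see against a plain slice. This `chunk` helper is an illustrative stand-in for the library function, not its implementation:

```go
package main

import "fmt"

// chunk groups consecutive elements into slices of at most size;
// the final slice is shorter when len(xs) is not a multiple of size.
func chunk[T any](xs []T, size int) [][]T {
	var out [][]T
	for len(xs) > 0 {
		n := size
		if len(xs) < n {
			n = len(xs)
		}
		out = append(out, xs[:n])
		xs = xs[n:]
	}
	return out
}

func main() {
	fmt.Println(chunk([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```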

Example: Batch Processing

batches := stream.Chunk(userStream, 50)

for batches.Next(ctx) {
    batch := batches.Value() // []any, each element is *User
    fmt.Printf("Processing batch of %d users\n", len(batch))

    for _, val := range batch {
        user := val.(*User)
        // process each user in the batch
        _ = sendWelcomeEmail(user)
    }
}
if err := batches.Err(); err != nil {
    return err
}

Example: Bulk Insert with Chunks

batches := stream.Chunk(sourceStream, 100)

for batches.Next(ctx) {
    batch := batches.Value()
    // insert batch into another table
    _, err := pgdb.NewInsert(&batch).Exec(ctx)
    if err != nil {
        return err
    }
}
if err := batches.Err(); err != nil {
    return err
}

Reduce[T, A] -- Accumulate to a Single Value

Reduce consumes the entire stream, accumulating a result using the provided function:

func Reduce[T, A any](s *Stream[T], initial A, fn func(A, T) A) (A, error)

This is a terminal operation -- it drains and closes the stream.
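Conceptually, Reduce is a left fold over the drained elements. A slice-level sketch (the `fold` helper is illustrative, not library API) shows the accumulator being threaded through every element:

```go
package main

import "fmt"

// fold threads an accumulator through every element, left to
// right -- the slice-level equivalent of draining a stream
// through Reduce.
func fold[T, A any](xs []T, initial A, fn func(A, T) A) A {
	acc := initial
	for _, v := range xs {
		acc = fn(acc, v)
	}
	return acc
}

func main() {
	total := fold([]float64{19.99, 5.50, 12.00}, 0.0, func(sum, v float64) float64 { return sum + v })
	fmt.Printf("%.2f\n", total) // 37.49
}
```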

Example: Sum Order Totals

pgdb := pgdriver.Unwrap(db)

orderStream, err := pgdb.NewSelect(&Order{}).
    Where("status = $1", "completed").
    Stream(ctx)
if err != nil {
    return 0, err
}

total, err := stream.Reduce(orderStream, 0.0, func(sum float64, val any) float64 {
    order := val.(*Order)
    return sum + order.Total
})
if err != nil {
    return 0, err
}
fmt.Printf("Total revenue: $%.2f\n", total)

Example: Collect Unique Tags

tagSet, err := stream.Reduce(postStream, make(map[string]bool), func(acc map[string]bool, val any) map[string]bool {
    post := val.(*Post)
    for _, tag := range post.Tags {
        acc[tag] = true
    }
    return acc
})
if err != nil {
    return err
}
fmt.Printf("collected %d unique tags\n", len(tagSet))

ForEach[T] -- Side-Effect on Each Element

ForEach calls a function on every element in the stream. It returns the first error encountered (from the callback or from the stream itself):

func ForEach[T any](s *Stream[T], fn func(T) error) error

This is a terminal operation -- it drains and closes the stream.

Example: Send Notifications

err := stream.ForEach(userStream, func(val any) error {
    user := val.(*User)
    return notificationService.Send(user.Email, "Your account has been updated")
})
if err != nil {
    log.Error("notification stream failed", "err", err)
}

Example: Write to CSV

writer := csv.NewWriter(file)
defer writer.Flush()

err := stream.ForEach(userStream, func(val any) error {
    user := val.(*User)
    return writer.Write([]string{
        strconv.FormatInt(user.ID, 10),
        user.Email,
        user.Name,
    })
})
if err != nil {
    return err
}

Composing Pipelines

Since lazy transforms return new Stream values, you can chain them into processing pipelines:

Example: Extract, Filter, and Batch

pgdb := pgdriver.Unwrap(db)

s, err := pgdb.NewSelect(&User{}).
    Where("created_at > $1", lastMonth).
    Stream(ctx)
if err != nil {
    return err
}
defer s.Close()

// Transform: extract email addresses from active premium users, in batches of 100
emails := stream.Chunk(
    stream.Map(
        stream.Filter(s, func(val any) bool {
            user := val.(*User)
            return user.Plan == "premium"
        }),
        func(val any) (string, error) {
            user := val.(*User)
            return user.Email, nil
        },
    ),
    100,
)

for emails.Next(ctx) {
    batch := emails.Value() // []string, up to 100 emails
    if err := mailService.SendBulk(batch); err != nil {
        return err
    }
}
return emails.Err()

Example: Take Top N After Filtering

// Get the first 5 admin users from the stream
topAdmins := stream.Take(
    stream.Filter(userStream, func(val any) bool {
        return val.(*User).Role == "admin"
    }),
    5,
)

admins, err := topAdmins.Collect(ctx)
if err != nil {
    return err
}
fmt.Printf("found %d admins\n", len(admins))

API Summary

Function  Signature                                                                   Type      Description
Map       func Map[T, U any](s *Stream[T], fn func(T) (U, error)) *Stream[U]         Lazy      Transform each element
Filter    func Filter[T any](s *Stream[T], fn func(T) bool) *Stream[T]               Lazy      Keep elements matching predicate
Take      func Take[T any](s *Stream[T], n int) *Stream[T]                           Lazy      Yield at most n elements
Chunk     func Chunk[T any](s *Stream[T], size int) *Stream[[]T]                     Lazy      Group into fixed-size slices
Reduce    func Reduce[T, A any](s *Stream[T], initial A, fn func(A, T) A) (A, error) Terminal  Accumulate to a single value
ForEach   func ForEach[T any](s *Stream[T], fn func(T) error) error                  Terminal  Side-effect on each element
