Pipeline Transforms
Composable Map, Filter, Reduce, Chunk, Take, and ForEach operations on streams.
Overview
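The stream package provides six composable transform functions that operate on Stream[T] values. They are top-level generic functions rather than methods because Go methods cannot declare type parameters of their own, so a method on Stream[T] could not introduce the second type parameter that Map and Reduce need.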
Transforms fall into two categories:
- Lazy transforms return a new Stream and only execute when you iterate: Map, Filter, Take, Chunk.
- Terminal operations consume the stream immediately and return a result: Reduce, ForEach.
All transforms respect the underlying cursor lifecycle. When the source stream is closed, derived streams stop yielding values.
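To make the lazy/terminal distinction concrete, here is a minimal sketch built on a hypothetical in-memory stand-in for Stream[T]. The stand-in is just a pull function; FromSlice, Demo, and the simplified Map and Reduce signatures (no error returns) are assumptions for illustration, not the package's actual implementation.

```go
package main

import "fmt"

// Stream is a hypothetical, simplified stand-in for the package's Stream[T]:
// a pull function that reports ok=false once the source is exhausted.
type Stream[T any] struct {
	pull func() (T, bool)
}

// FromSlice builds a stand-in stream over an in-memory slice.
func FromSlice[T any](items []T) *Stream[T] {
	i := 0
	return &Stream[T]{pull: func() (T, bool) {
		if i >= len(items) {
			var zero T
			return zero, false
		}
		v := items[i]
		i++
		return v, true
	}}
}

// Map is lazy: it only wraps the source and runs fn when a value is pulled.
// It is a top-level function because a method on Stream[T] could not declare U.
func Map[T, U any](s *Stream[T], fn func(T) U) *Stream[U] {
	return &Stream[U]{pull: func() (U, bool) {
		v, ok := s.pull()
		if !ok {
			var zero U
			return zero, false
		}
		return fn(v), true
	}}
}

// Reduce is terminal: it pulls every value before returning.
func Reduce[T, A any](s *Stream[T], initial A, fn func(A, T) A) A {
	acc := initial
	for v, ok := s.pull(); ok; v, ok = s.pull() {
		acc = fn(acc, v)
	}
	return acc
}

// Demo reports how many times the mapping function had run before and after
// the terminal Reduce call, plus the reduced sum.
func Demo() (before, after, sum int) {
	calls := 0
	doubled := Map(FromSlice([]int{1, 2, 3}), func(n int) int {
		calls++
		return n * 2
	})
	before = calls // still 0: nothing has been pulled yet
	sum = Reduce(doubled, 0, func(acc, n int) int { return acc + n })
	after = calls
	return before, after, sum
}

func main() {
	before, after, sum := Demo()
	fmt.Println(before, after, sum) // 0 3 12
}
```

Building the Map stage runs no callbacks; the work happens only once Reduce starts pulling values.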
Map[T, U] -- Transform Each Element
Map applies a function to every element, transforming Stream[T] into Stream[U]:
func Map[T, U any](s *Stream[T], fn func(T) (U, error)) *Stream[U]
The mapping function can return an error to stop iteration.
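The error-stops-iteration behavior can be sketched as follows. This uses a hypothetical stand-in Stream[T] whose Next takes no context (the real one does), and it assumes the mapping error surfaces through Err() on the derived stream; FromSlice and ParseAll are illustrative helpers, not part of the package.

```go
package main

import (
	"fmt"
	"strconv"
)

// Stream is a hypothetical stand-in for the package's Stream[T].
type Stream[T any] struct {
	pull func() (T, bool, error) // ok=false once exhausted
	cur  T
	err  error
}

// Next advances the stream; it returns false on exhaustion or error.
func (s *Stream[T]) Next() bool {
	if s.err != nil {
		return false
	}
	v, ok, err := s.pull()
	if err != nil || !ok {
		s.err = err
		return false
	}
	s.cur = v
	return true
}

func (s *Stream[T]) Value() T   { return s.cur }
func (s *Stream[T]) Err() error { return s.err }

// FromSlice builds a stand-in stream over an in-memory slice.
func FromSlice[T any](items []T) *Stream[T] {
	i := 0
	return &Stream[T]{pull: func() (T, bool, error) {
		if i >= len(items) {
			var zero T
			return zero, false, nil
		}
		v := items[i]
		i++
		return v, true, nil
	}}
}

// Map converts each element with fn; the first error from fn (or from the
// source) ends iteration and is reported by Err on the derived stream.
func Map[T, U any](s *Stream[T], fn func(T) (U, error)) *Stream[U] {
	return &Stream[U]{pull: func() (U, bool, error) {
		var zero U
		if !s.Next() {
			return zero, false, s.Err()
		}
		u, err := fn(s.Value())
		if err != nil {
			return zero, false, err
		}
		return u, true, nil
	}}
}

// ParseAll maps strings to ints and returns what was produced before any error.
func ParseAll(inputs []string) ([]int, error) {
	parsed := Map(FromSlice(inputs), strconv.Atoi)
	var out []int
	for parsed.Next() {
		out = append(out, parsed.Value())
	}
	return out, parsed.Err()
}

func main() {
	nums, err := ParseAll([]string{"1", "2", "x", "4"})
	fmt.Println(nums, err) // [1 2] followed by the parse error for "x"
}
```

Note that "4" is never visited: once the callback fails, the loop exits, which is why checking Err() after the loop matters.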
Example: Extract Emails from Users
pgdb := pgdriver.Unwrap(db)
userStream, err := pgdb.NewSelect(&User{}).
    Where("active = $1", true).
    Stream(ctx)
if err != nil {
    return err
}
defer userStream.Close()
emailStream := stream.Map(userStream, func(val any) (string, error) {
    user := val.(*User)
    return user.Email, nil
})
for emailStream.Next(ctx) {
    email := emailStream.Value()
    fmt.Println(email)
}
if err := emailStream.Err(); err != nil {
    return err
}
Example: Convert to API Response
type UserResponse struct {
    ID   int64  `json:"id"`
    Name string `json:"name"`
}
responseStream := stream.Map(userStream, func(val any) (UserResponse, error) {
    user := val.(*User)
    return UserResponse{
        ID:   user.ID,
        Name: user.FirstName + " " + user.LastName,
    }, nil
})
Filter[T] -- Select Elements by Predicate
Filter yields only elements where the predicate function returns true:
func Filter[T any](s *Stream[T], fn func(T) bool) *Stream[T]
Rows that do not match the predicate are silently skipped. The cursor continues advancing until a matching row is found or the stream is exhausted.
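The skip-until-match behavior described above amounts to a loop inside each pull. A minimal sketch, assuming a hypothetical pull-function stand-in for Stream[T] (FromSlice and Collect are illustrative helpers, not the package's API):

```go
package main

import "fmt"

// Stream is a hypothetical, simplified stand-in for the package's Stream[T].
type Stream[T any] struct {
	pull func() (T, bool) // ok=false once the source is exhausted
}

// FromSlice builds a stand-in stream over an in-memory slice.
func FromSlice[T any](items []T) *Stream[T] {
	i := 0
	return &Stream[T]{pull: func() (T, bool) {
		if i >= len(items) {
			var zero T
			return zero, false
		}
		v := items[i]
		i++
		return v, true
	}}
}

// Filter keeps pulling from the source until fn accepts a value or the
// source runs out, so rejected values never reach the caller.
func Filter[T any](s *Stream[T], fn func(T) bool) *Stream[T] {
	return &Stream[T]{pull: func() (T, bool) {
		for {
			v, ok := s.pull()
			if !ok || fn(v) {
				return v, ok
			}
		}
	}}
}

// Collect drains a stand-in stream into a slice.
func Collect[T any](s *Stream[T]) []T {
	var out []T
	for v, ok := s.pull(); ok; v, ok = s.pull() {
		out = append(out, v)
	}
	return out
}

func main() {
	evens := Filter(FromSlice([]int{1, 2, 3, 4, 5, 6}), func(n int) bool {
		return n%2 == 0
	})
	fmt.Println(Collect(evens)) // [2 4 6]
}
```

Every rejected element is still read from the source, which is the cost the note about preferring WHERE clauses is pointing at.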
Example: Filter by Role
adminStream := stream.Filter(userStream, func(val any) bool {
    user := val.(*User)
    return user.Role == "admin"
})
for adminStream.Next(ctx) {
    admin := adminStream.Value()
    fmt.Println(admin)
}
Note: When possible, prefer adding WHERE clauses to your query rather than filtering in application code. Use Filter for conditions that cannot be expressed in SQL or for post-processing logic.
Take[T] -- Limit the Number of Elements
Take yields at most n elements, then marks the stream as done:
func Take[T any](s *Stream[T], n int) *Stream[T]
Example: First 10 Results
top10 := stream.Take(userStream, 10)
for top10.Next(ctx) {
    user := top10.Value()
    fmt.Println(user)
}
Take is useful when you want to process a bounded subset of a potentially large stream without adding a LIMIT clause to the query, or when composing with other transforms.
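One plausible way Take can bound the work is to stop pulling from the source once n elements have been yielded. The sketch below assumes that behavior and uses a hypothetical pull-function stand-in for Stream[T]; Counter, Collect, and FirstThree are illustrative helpers, and the real package may read ahead differently.

```go
package main

import "fmt"

// Stream is a hypothetical, simplified stand-in for the package's Stream[T].
type Stream[T any] struct {
	pull func() (T, bool) // ok=false once the source is exhausted
}

// Counter yields 1..limit and records in *pulls how often it was asked for a value.
func Counter(limit int, pulls *int) *Stream[int] {
	next := 0
	return &Stream[int]{pull: func() (int, bool) {
		*pulls++
		if next >= limit {
			return 0, false
		}
		next++
		return next, true
	}}
}

// Take yields at most n values and then reports exhaustion without
// touching the source again.
func Take[T any](s *Stream[T], n int) *Stream[T] {
	taken := 0
	return &Stream[T]{pull: func() (T, bool) {
		if taken >= n {
			var zero T
			return zero, false
		}
		v, ok := s.pull()
		if ok {
			taken++
		}
		return v, ok
	}}
}

// Collect drains a stand-in stream into a slice.
func Collect[T any](s *Stream[T]) []T {
	var out []T
	for v, ok := s.pull(); ok; v, ok = s.pull() {
		out = append(out, v)
	}
	return out
}

// FirstThree returns the first three values of a 1000-element source and
// the number of times the source was pulled.
func FirstThree() ([]int, int) {
	pulls := 0
	got := Collect(Take(Counter(1000, &pulls), 3))
	return got, pulls
}

func main() {
	got, pulls := FirstThree()
	fmt.Println(got, pulls) // [1 2 3] 3
}
```

In this sketch the remaining 997 elements are never requested; with a database cursor you would still close the source stream to release it.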
Chunk[T] -- Group Into Fixed-Size Batches
Chunk groups consecutive elements into slices of the specified size, transforming Stream[T] into Stream[[]T]:
func Chunk[T any](s *Stream[T], size int) *Stream[[]T]
The last chunk may contain fewer than size elements if the stream ends mid-batch.
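The short final chunk is easiest to see with numbers: seven elements in chunks of three give batches of 3, 3, and 1. A minimal sketch, assuming a hypothetical pull-function stand-in for Stream[T] and a positive size (FromSlice and Collect are illustrative helpers):

```go
package main

import "fmt"

// Stream is a hypothetical, simplified stand-in for the package's Stream[T].
type Stream[T any] struct {
	pull func() (T, bool) // ok=false once the source is exhausted
}

// FromSlice builds a stand-in stream over an in-memory slice.
func FromSlice[T any](items []T) *Stream[T] {
	i := 0
	return &Stream[T]{pull: func() (T, bool) {
		if i >= len(items) {
			var zero T
			return zero, false
		}
		v := items[i]
		i++
		return v, true
	}}
}

// Chunk gathers up to size values per pull. A partially filled batch is
// still yielded; an empty batch signals exhaustion. Assumes size > 0.
func Chunk[T any](s *Stream[T], size int) *Stream[[]T] {
	return &Stream[[]T]{pull: func() ([]T, bool) {
		batch := make([]T, 0, size)
		for len(batch) < size {
			v, ok := s.pull()
			if !ok {
				break
			}
			batch = append(batch, v)
		}
		return batch, len(batch) > 0
	}}
}

// Collect drains a stand-in stream into a slice.
func Collect[T any](s *Stream[T]) []T {
	var out []T
	for v, ok := s.pull(); ok; v, ok = s.pull() {
		out = append(out, v)
	}
	return out
}

func main() {
	batches := Collect(Chunk(FromSlice([]int{1, 2, 3, 4, 5, 6, 7}), 3))
	fmt.Println(batches) // [[1 2 3] [4 5 6] [7]]
}
```

Code that processes batches should therefore use len(batch) rather than assuming every batch is full.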
Example: Batch Processing
batches := stream.Chunk(userStream, 50)
for batches.Next(ctx) {
    batch := batches.Value() // []any, each element is *User
    fmt.Printf("Processing batch of %d users\n", len(batch))
    for _, val := range batch {
        user := val.(*User)
        // process each user in the batch
        _ = sendWelcomeEmail(user)
    }
}
if err := batches.Err(); err != nil {
    return err
}
Example: Bulk Insert with Chunks
batches := stream.Chunk(sourceStream, 100)
for batches.Next(ctx) {
    batch := batches.Value()
    // insert batch into another table
    _, err := pgdb.NewInsert(&batch).Exec(ctx)
    if err != nil {
        return err
    }
}
Reduce[T, A] -- Accumulate to a Single Value
Reduce consumes the entire stream, accumulating a result using the provided function:
func Reduce[T, A any](s *Stream[T], initial A, fn func(A, T) A) (A, error)
This is a terminal operation -- it drains and closes the stream.
Example: Sum Order Totals
pgdb := pgdriver.Unwrap(db)
orderStream, err := pgdb.NewSelect(&Order{}).
    Where("status = $1", "completed").
    Stream(ctx)
if err != nil {
    return 0, err
}
total, err := stream.Reduce(orderStream, 0.0, func(sum float64, val any) float64 {
    order := val.(*Order)
    return sum + order.Total
})
if err != nil {
    return 0, err
}
fmt.Printf("Total revenue: $%.2f\n", total)
Example: Collect Unique Tags
tagSet, err := stream.Reduce(postStream, make(map[string]bool), func(acc map[string]bool, val any) map[string]bool {
    post := val.(*Post)
    for _, tag := range post.Tags {
        acc[tag] = true
    }
    return acc
})
ForEach[T] -- Side-Effect on Each Element
ForEach calls a function on every element in the stream. It returns the first error encountered (from the callback or from the stream itself):
func ForEach[T any](s *Stream[T], fn func(T) error) error
This is a terminal operation -- it drains and closes the stream.
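The first-error and close-on-return behavior might look roughly like the sketch below. It uses a hypothetical in-memory stand-in for Stream[T] (its Next takes no context, and its Err never fails) and assumes ForEach stops at the first callback error; Notify and errBlocked are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
)

// Stream is a hypothetical in-memory stand-in for the package's Stream[T].
type Stream[T any] struct {
	items  []T
	pos    int
	closed bool
}

// Next advances to the next element; it returns false once closed or exhausted.
func (s *Stream[T]) Next() bool {
	if s.closed || s.pos >= len(s.items) {
		return false
	}
	s.pos++
	return true
}

func (s *Stream[T]) Value() T   { return s.items[s.pos-1] }
func (s *Stream[T]) Err() error { return nil } // an in-memory source never fails
func (s *Stream[T]) Close()     { s.closed = true }

// ForEach calls fn for each element, stops at the first callback error, and
// closes the stream on every return path.
func ForEach[T any](s *Stream[T], fn func(T) error) error {
	defer s.Close()
	for s.Next() {
		if err := fn(s.Value()); err != nil {
			return err
		}
	}
	return s.Err()
}

var errBlocked = errors.New("recipient blocked")

// Notify pretends to send to each address and fails on the address "bad".
// It reports which sends succeeded, whether the stream ended up closed, and the error.
func Notify(addrs []string) (sent []string, closed bool, err error) {
	s := &Stream[string]{items: addrs}
	err = ForEach(s, func(addr string) error {
		if addr == "bad" {
			return errBlocked
		}
		sent = append(sent, addr)
		return nil
	})
	return sent, s.closed, err
}

func main() {
	sent, closed, err := Notify([]string{"a", "b", "bad", "c"})
	fmt.Println(sent, closed, err) // [a b] true recipient blocked
}
```

Because a failing callback ends the run, side effects already performed for earlier elements are not undone; callers that need all-or-nothing behavior have to handle that themselves.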
Example: Send Notifications
err := stream.ForEach(userStream, func(val any) error {
    user := val.(*User)
    return notificationService.Send(user.Email, "Your account has been updated")
})
if err != nil {
    log.Error("notification stream failed", "err", err)
}
Example: Write to CSV
writer := csv.NewWriter(file)
defer writer.Flush()
err := stream.ForEach(userStream, func(val any) error {
    user := val.(*User)
    return writer.Write([]string{
        strconv.FormatInt(user.ID, 10),
        user.Email,
        user.Name,
    })
})
Composing Pipelines
Since lazy transforms return new Stream values, you can chain them into processing pipelines:
Example: Extract, Filter, and Batch
pgdb := pgdriver.Unwrap(db)
s, err := pgdb.NewSelect(&User{}).
    Where("created_at > $1", lastMonth).
    Stream(ctx)
if err != nil {
    return err
}
defer s.Close()
// Transform: extract email addresses of premium users, in batches of 100.
// The calls nest inside-out: Filter runs first, then Map, then Chunk.
emails := stream.Chunk(
    stream.Map(
        stream.Filter(s, func(val any) bool {
            user := val.(*User)
            return user.Plan == "premium"
        }),
        func(val any) (string, error) {
            user := val.(*User)
            return user.Email, nil
        },
    ),
    100,
)
for emails.Next(ctx) {
    batch := emails.Value() // []string, up to 100 emails
    if err := mailService.SendBulk(batch); err != nil {
        return err
    }
}
return emails.Err()
Example: Take Top N After Filtering
// Get the first 5 admin users from the stream
topAdmins := stream.Take(
    stream.Filter(userStream, func(val any) bool {
        return val.(*User).Role == "admin"
    }),
    5,
)
admins, err := topAdmins.Collect(ctx)
API Summary
| Function | Signature | Evaluation | Description |
|---|---|---|---|
| Map | func Map[T, U any](s *Stream[T], fn func(T) (U, error)) *Stream[U] | Lazy | Transform each element |
| Filter | func Filter[T any](s *Stream[T], fn func(T) bool) *Stream[T] | Lazy | Keep elements matching predicate |
| Take | func Take[T any](s *Stream[T], n int) *Stream[T] | Lazy | Yield at most n elements |
| Chunk | func Chunk[T any](s *Stream[T], size int) *Stream[[]T] | Lazy | Group into fixed-size slices |
| Reduce | func Reduce[T, A any](s *Stream[T], initial A, fn func(A, T) A) (A, error) | Terminal | Accumulate to single value |
| ForEach | func ForEach[T any](s *Stream[T], fn func(T) error) error | Terminal | Side-effect on each element |