Grove

Go Packages

Reference for all Grove Go packages.

Package Index

PackageImport PathDescription
grovegithub.com/xraph/groveCore types: DB, BaseModel, Open(), RegisterModel()
schemagithub.com/xraph/grove/schemaSchema reflection: Table, Field, Relation, Registry
drivergithub.com/xraph/grove/driverDriver and Dialect interfaces
scangithub.com/xraph/grove/scanResult scanning with cached field maps
hookgithub.com/xraph/grove/hookPrivacy hooks: PreQuery, PostQuery, PreMutation, PostMutation, StreamRowHook
streamgithub.com/xraph/grove/streamStreaming iterator: Stream[T], pipeline transforms, ChangeStream[T]
migrategithub.com/xraph/grove/migrateMigration system: Group, Migration, Orchestrator
pgdrivergithub.com/xraph/grove/drivers/pgdriverPostgreSQL driver (pgx-based)
mysqldrivergithub.com/xraph/grove/drivers/mysqldriverMySQL driver
sqlitedrivergithub.com/xraph/grove/drivers/sqlitedriverSQLite driver (pure Go)
mongodrivergithub.com/xraph/grove/drivers/mongodriverMongoDB driver
tursodrivergithub.com/xraph/grove/drivers/tursodriverTurso/libSQL driver
clickhousedrivergithub.com/xraph/grove/drivers/clickhousedriverClickHouse driver
esdrivergithub.com/xraph/grove/drivers/esdriverElasticsearch driver
plugingithub.com/xraph/grove/pluginPlugin interfaces and registry
grovetestgithub.com/xraph/grove/grovetestTesting utilities: mock driver, fixtures, assertions
extensiongithub.com/xraph/grove/extensionForge extension entry point
auditgithub.com/xraph/grove/auditChronicle audit plugin
observabilitygithub.com/xraph/grove/observabilityPrometheus metrics plugin
internal/tagparsergithub.com/xraph/grove/internal/tagparserHigh-performance struct tag parser
internal/poolgithub.com/xraph/grove/internal/poolsync.Pool-based byte buffer pool
internal/safegithub.com/xraph/grove/internal/safeSafe identifier quoting
KV Module
kvgithub.com/xraph/grove/kvCore KV Store: Open, Get, Set, Delete, Exists, MGet, MSet
kv/drivergithub.com/xraph/grove/kv/driverKV Driver interface, capabilities, optional interfaces
kv/codecgithub.com/xraph/grove/kv/codecCodecs: JSON, MsgPack, Protobuf, Gob
kv/keyspacegithub.com/xraph/grove/kv/keyspaceKeyspace[T] typed key-value partitions
kv/middlewaregithub.com/xraph/grove/kv/middlewareMiddleware: cache, retry, circuit, compress, encrypt, namespace, logging, stampede
kv/crdtgithub.com/xraph/grove/kv/crdtCRDT adapter: Counter, Register[T], Set[T], Map, Syncer
kv/pluginsgithub.com/xraph/grove/kv/pluginsPlugins: lock, ratelimit, session, counter, leaderboard, queue
kv/extensiongithub.com/xraph/grove/kv/extensionForge extension: multi-store KV with DI, config, lifecycle
kv/redisdrivergithub.com/xraph/grove/kv/drivers/redisdriverRedis KV driver (go-redis)
kv/memcacheddrivergithub.com/xraph/grove/kv/drivers/memcacheddriverMemcached KV driver (gomemcache)
kv/dynamodrivergithub.com/xraph/grove/kv/drivers/dynamodriverDynamoDB KV driver (aws-sdk-go-v2)
kv/boltdrivergithub.com/xraph/grove/kv/drivers/boltdriverBoltDB KV driver (bbolt)
kv/badgerdrivergithub.com/xraph/grove/kv/drivers/badgerdriverBadger KV driver (badger/v4)
kv/kvtestgithub.com/xraph/grove/kv/kvtestKV conformance test suite and mock driver

Core Types

grove.DB

type DB struct {
    // contains filtered or unexported fields
}

// Open creates a new DB with an already-connected driver.
// The driver must be connected before calling Open (call driver.Open first).
//
//   pgdb := pgdriver.New()
//   pgdb.Open(ctx, "postgres://localhost:5432/mydb")
//   db, err := grove.Open(pgdb)
func Open(drv GroveDriver, opts ...Option) (*DB, error)

func (db *DB) Close() error
func (db *DB) Ping(ctx context.Context) error
func (db *DB) RegisterModel(models ...any)

// Driver returns the underlying GroveDriver.
// Use with pgdriver.Unwrap(db) for typed access.
func (db *DB) Driver() GroveDriver

// Hooks returns the hook engine for registering lifecycle hooks.
//   db.Hooks().AddHook(&TenantIsolation{}, hook.Scope{Tables: []string{"users"}})
func (db *DB) Hooks() *hook.Engine

// Query builders return any; use pgdriver.Unwrap(db) for typed builders.
func (db *DB) NewSelect(model ...any) any
func (db *DB) NewInsert(model any) any
func (db *DB) NewUpdate(model any) any
func (db *DB) NewDelete(model any) any

func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)

grove.BaseModel

type BaseModel struct{}

Embed in your model structs to declare them as Grove models. Tag with grove:"table:name,alias:a".

grove.Option

func WithPoolSize(n int) Option
func WithQueryTimeout(d time.Duration) Option
func WithLogger(l *slog.Logger) Option
func WithPlugin(p plugin.Plugin) Option

Driver Interfaces

driver.Driver

type Driver interface {
    Name() string
    Open(ctx context.Context, dsn string, opts ...Option) error
    Close() error
    Dialect() Dialect
    Ping(ctx context.Context) error
    BeginTx(ctx context.Context, opts *TxOptions) (Tx, error)
    Exec(ctx context.Context, query string, args ...any) (Result, error)
    Query(ctx context.Context, query string, args ...any) (Rows, error)
    QueryRow(ctx context.Context, query string, args ...any) Row
    SupportsReturning() bool
}

driver.StreamCapable

Optional interface for drivers that support streaming and CDC:

type StreamCapable interface {
    SupportsStreaming() bool
    SupportsCDC() bool
}

driver.Dialect

type Dialect interface {
    Name() string
    Quote(ident string) string
    Placeholder(index int) string
    GoToDBType(goType reflect.Type) string
    AppendBytes(b []byte, data []byte) []byte
    AppendTime(b []byte, t time.Time) []byte
}

Hook Types

hook.Scope

type Scope struct {
    Tables     []string    // restrict to these tables (empty = all)
    Operations []Operation // restrict to these operations (empty = all)
    Priority   int         // execution order, lower = earlier (default: 100)
}

Hook Interfaces

type PreQueryHook interface {
    BeforeQuery(ctx context.Context, qc *QueryContext) (*HookResult, error)
}

type PostQueryHook interface {
    AfterQuery(ctx context.Context, qc *QueryContext, result any) error
}

type PreMutationHook interface {
    BeforeMutation(ctx context.Context, qc *QueryContext, data any) (*HookResult, error)
}

type PostMutationHook interface {
    AfterMutation(ctx context.Context, qc *QueryContext, data any, result any) error
}

type StreamRowHook interface {
    OnStreamRow(ctx context.Context, qc *QueryContext, row any) (Decision, error)
}

hook.Decision

type Decision int

const (
    Allow  Decision = iota // proceed with the query
    Deny                    // block the query, return error
    Modify                  // hook modified query context
    Skip                    // exclude this row (post-query / stream)
)

Stream Types

stream.Stream[T]

func New[T any](cursor Cursor, decode DecodeFunc[T]) *Stream[T]
func NewWithHooks[T any](cursor Cursor, decode DecodeFunc[T], hooks HookRunner, qc any) *Stream[T]

func (s *Stream[T]) WithHooks(runner HookRunner, qc any) *Stream[T]
func (s *Stream[T]) Next(ctx context.Context) bool
func (s *Stream[T]) Value() T
func (s *Stream[T]) Err() error
func (s *Stream[T]) Close() error
func (s *Stream[T]) All(yield func(T, error) bool)           // Go 1.23+ range-over-func
func (s *Stream[T]) Collect(ctx context.Context) ([]T, error)
func (s *Stream[T]) Count(ctx context.Context) (int64, error)

Pipeline Transforms

func Map[T, U any](s *Stream[T], fn func(T) (U, error)) *Stream[U]
func Filter[T any](s *Stream[T], fn func(T) bool) *Stream[T]
func Take[T any](s *Stream[T], n int) *Stream[T]
func Chunk[T any](s *Stream[T], size int) *Stream[[]T]
func Reduce[T, A any](s *Stream[T], initial A, fn func(A, T) A) (A, error)
func ForEach[T any](s *Stream[T], fn func(T) error) error

stream.ChangeStream[T]

func NewChangeStream[T any](source ChangeSource[T]) *ChangeStream[T]

func (cs *ChangeStream[T]) Next(ctx context.Context) bool
func (cs *ChangeStream[T]) Event() ChangeEvent[T]
func (cs *ChangeStream[T]) Err() error
func (cs *ChangeStream[T]) Close() error
func (cs *ChangeStream[T]) ResumeToken() any
func (cs *ChangeStream[T]) All(yield func(ChangeEvent[T], error) bool)

stream.ChangeEvent[T]

type ChangeEvent[T any] struct {
    Operation   ChangeOp
    Before      *T          // previous state (nil for inserts)
    After       *T          // new state (nil for deletes)
    Timestamp   time.Time
    ResumeToken any
}

pgdriver Types

pgdriver.Unwrap

// Unwrap extracts the underlying *PgDB from a *grove.DB handle.
// Panics if the driver is not a *PgDB.
func Unwrap(db *grove.DB) *PgDB

Usage:

pgdb := pgdriver.Unwrap(db)
pgdb.NewSelect(&User{}).Where("active = $1", true).Scan(ctx, &users)

pgdriver.PgDB streaming methods

// Stream opens a server-side cursor with default fetch size (100 rows/batch).
func (q *SelectQuery) Stream(ctx context.Context) (*stream.Stream[any], error)

// StreamBatch opens a server-side cursor with custom fetch size.
func (q *SelectQuery) StreamBatch(ctx context.Context, fetchSize int) (*stream.Stream[any], error)

pgdriver.Listener

func (db *PgDB) NewListener() *Listener
func (db *PgDB) Listen(ctx context.Context, channel string, handler func(*Notification)) (*Listener, error)

func (l *Listener) Start(ctx context.Context) error
func (l *Listener) Listen(ctx context.Context, channel string) error
func (l *Listener) Unlisten(ctx context.Context, channel string) error
func (l *Listener) OnNotification(channel string, handler func(*Notification))
func (l *Listener) Notify(ctx context.Context, channel, payload string) error
func (l *Listener) Close() error

Sentinel Errors

var (
    ErrNoRows             = errors.New("grove: no rows in result set")
    ErrModelNotRegistered = errors.New("grove: model not registered")
    ErrNotSupported       = errors.New("grove: operation not supported by driver")
    ErrHookDenied         = errors.New("grove: hook denied the operation")
    ErrMigrationFailed    = errors.New("grove: migration failed")
    ErrMigrationLocked    = errors.New("grove: migration lock held by another process")
    ErrCyclicDependency   = errors.New("grove: cyclic dependency in migration groups")
)

On this page