Grove
KV Drivers

Redis Driver

Redis driver for Grove KV with support for TTL, CAS, Scan, Batch, PubSub, Transactions, and Streams.

The Redis driver connects Grove KV to Redis, Valkey, or DragonflyDB using go-redis. It implements all optional KV interfaces including batch, TTL, scan, CAS, PubSub, transactions, and streams.

Installation

go get github.com/xraph/grove/kv
go get github.com/xraph/grove/kv/drivers/redisdriver

Connection

import (
    "github.com/xraph/grove/kv"
    "github.com/xraph/grove/kv/drivers/redisdriver"
)

// Create driver and open the store
store, err := kv.Open(redisdriver.New(), "redis://localhost:6379")
if err != nil {
    log.Fatal(err)
}
defer store.Close()

The DSN follows the standard Redis URL format: redis://[user:password@]host:port[/db].

Capabilities

CapabilitySupported
TTLYes
CAS (SetNX/SetXX)Yes
ScanYes
Batch (MGet/MSet)Yes
PubSubYes
TransactionsYes
StreamsYes
Sorted SetsYes

Cluster Mode

Connect to a Redis Cluster by calling OpenCluster directly on the driver:

import (
    "github.com/xraph/grove/kv"
    "github.com/xraph/grove/kv/drivers/redisdriver"
    "github.com/xraph/grove/kv/driver"
)

rdb := redisdriver.New()
err := rdb.OpenCluster(ctx, []string{
    "redis-node-1:6379",
    "redis-node-2:6379",
    "redis-node-3:6379",
}, driver.WithPoolSize(20))
if err != nil {
    log.Fatal(err)
}

store, err := kv.Open(rdb)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

Sentinel Mode

Connect through Redis Sentinel for high availability:

rdb := redisdriver.New()
err := rdb.OpenSentinel(ctx, "mymaster", []string{
    "sentinel-1:26379",
    "sentinel-2:26379",
    "sentinel-3:26379",
})
if err != nil {
    log.Fatal(err)
}

store, err := kv.Open(rdb)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

Unwrap for Native Access

Use Unwrap to access the underlying RedisDB driver or UnwrapClient to get the go-redis UniversalClient directly:

// Get the RedisDB driver instance
rdb := redisdriver.Unwrap(store)

// Or get the go-redis UniversalClient directly
client := redisdriver.UnwrapClient(store)
result, err := client.Get(ctx, "raw-key").Result()

Advanced Features

Pipeline

Accumulate multiple commands and execute them in a single round-trip:

rdb := redisdriver.Unwrap(store)

// Non-transactional pipeline
pipe := rdb.Pipeline()
pipe.Set(ctx, "key1", "val1", 0)
pipe.Set(ctx, "key2", "val2", 0)
pipe.Get(ctx, "key1")
cmds, err := pipe.Exec(ctx)

// Transactional pipeline (MULTI/EXEC)
txPipe := rdb.TxPipeline()
txPipe.Set(ctx, "key1", "val1", 0)
txPipe.Incr(ctx, "counter")
cmds, err = txPipe.Exec(ctx)

Watch (Optimistic Locking)

Execute a transactional WATCH/MULTI/EXEC block on specific keys:

rdb := redisdriver.Unwrap(store)

err := rdb.Watch(ctx, func(tx *redis.Tx) error {
    val, err := tx.Get(ctx, "balance").Int64()
    if err != nil {
        return err
    }

    _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        pipe.Set(ctx, "balance", val+100, 0)
        return nil
    })
    return err
}, "balance")

PubSub

Publish and subscribe to Redis channels:

rdb := redisdriver.Unwrap(store)
ps := rdb.NewPubSub()

// Publish a message
err := ps.Publish(ctx, "events", "user:created")

// Subscribe and handle messages (blocks until context is cancelled)
err = ps.Subscribe(ctx, func(channel string, payload string) {
    fmt.Printf("channel=%s payload=%s\n", channel, payload)
}, "events", "notifications")

// For advanced use, get the raw go-redis PubSub handle
sub := ps.SubscribeRaw(ctx, "events")
defer sub.Close()

Streams

Work with Redis Streams for event-driven architectures:

rdb := redisdriver.Unwrap(store)
stream := rdb.NewStream()

// Add an entry to a stream
id, err := stream.XAdd(ctx, "orders", map[string]any{
    "user_id": "u123",
    "amount":  99.99,
})

// Create a consumer group
err = stream.XGroupCreate(ctx, "orders", "processors", "0")

// Read from the stream as a consumer
messages, err := stream.XReadGroup(ctx, "processors", "worker-1",
    []string{"orders", ">"}, 10)

// Acknowledge processed messages
count, err := stream.XAck(ctx, "orders", "processors", id)

// Get stream length
length, err := stream.XLen(ctx, "orders")

Lua Scripts

Execute Lua scripts on the server for atomic operations:

rdb := redisdriver.Unwrap(store)

// Create a reusable script (cached via EVALSHA)
script := redisdriver.NewScript(`
    local current = redis.call("GET", KEYS[1])
    if current == false then
        redis.call("SET", KEYS[1], ARGV[1])
        return 1
    end
    return 0
`)

result := script.Run(ctx, rdb, []string{"my-key"}, "my-value")
val, err := result.Int()

// Or evaluate a script directly
cmd := rdb.Eval(ctx, `return redis.call("PING")`, nil)

Configuration Options

The driver supports standard driver options via driver.Option:

import "github.com/xraph/grove/kv/driver"

rdb := redisdriver.New()
rdb.Open(ctx, "redis://localhost:6379",
    driver.WithPoolSize(20),
    driver.WithDialTimeout(5 * time.Second),
    driver.WithReadTimeout(3 * time.Second),
    driver.WithWriteTimeout(3 * time.Second),
)
OptionDescription
driver.WithPoolSize(n)Maximum number of connections in the pool
driver.WithDialTimeout(d)Timeout for establishing new connections
driver.WithReadTimeout(d)Timeout for socket reads
driver.WithWriteTimeout(d)Timeout for socket writes
driver.WithTLSConfig(cfg)TLS configuration for encrypted connections

Compatible Databases

The Redis driver works with any Redis-protocol-compatible database:

  • Redis 6.x, 7.x
  • Valkey -- open-source Redis fork with full API compatibility
  • DragonflyDB -- high-performance Redis-compatible in-memory store
  • KeyDB -- multi-threaded Redis fork

On this page