Redis Driver
Redis driver for Grove KV with support for TTL, CAS, Scan, Batch, PubSub, Transactions, and Streams.
The Redis driver connects Grove KV to Redis, Valkey, or DragonflyDB using go-redis. It implements all optional KV interfaces including batch, TTL, scan, CAS, PubSub, transactions, and streams.
Installation
go get github.com/xraph/grove/kv
go get github.com/xraph/grove/kv/drivers/redisdriverConnection
import (
"github.com/xraph/grove/kv"
"github.com/xraph/grove/kv/drivers/redisdriver"
)
// Create driver and open the store
store, err := kv.Open(redisdriver.New(), "redis://localhost:6379")
if err != nil {
log.Fatal(err)
}
defer store.Close()The DSN follows the standard Redis URL format: redis://[user:password@]host:port[/db].
Capabilities
| Capability | Supported |
|---|---|
| TTL | Yes |
| CAS (SetNX/SetXX) | Yes |
| Scan | Yes |
| Batch (MGet/MSet) | Yes |
| PubSub | Yes |
| Transactions | Yes |
| Streams | Yes |
| Sorted Sets | Yes |
Cluster Mode
Connect to a Redis Cluster by calling OpenCluster directly on the driver:
import (
"github.com/xraph/grove/kv"
"github.com/xraph/grove/kv/drivers/redisdriver"
"github.com/xraph/grove/kv/driver"
)
rdb := redisdriver.New()
err := rdb.OpenCluster(ctx, []string{
"redis-node-1:6379",
"redis-node-2:6379",
"redis-node-3:6379",
}, driver.WithPoolSize(20))
if err != nil {
log.Fatal(err)
}
store, err := kv.Open(rdb)
if err != nil {
log.Fatal(err)
}
defer store.Close()Sentinel Mode
Connect through Redis Sentinel for high availability:
rdb := redisdriver.New()
err := rdb.OpenSentinel(ctx, "mymaster", []string{
"sentinel-1:26379",
"sentinel-2:26379",
"sentinel-3:26379",
})
if err != nil {
log.Fatal(err)
}
store, err := kv.Open(rdb)
if err != nil {
log.Fatal(err)
}
defer store.Close()Unwrap for Native Access
Use Unwrap to access the underlying RedisDB driver or UnwrapClient to get the go-redis UniversalClient directly:
// Get the RedisDB driver instance
rdb := redisdriver.Unwrap(store)
// Or get the go-redis UniversalClient directly
client := redisdriver.UnwrapClient(store)
result, err := client.Get(ctx, "raw-key").Result()Advanced Features
Pipeline
Accumulate multiple commands and execute them in a single round-trip:
rdb := redisdriver.Unwrap(store)
// Non-transactional pipeline
pipe := rdb.Pipeline()
pipe.Set(ctx, "key1", "val1", 0)
pipe.Set(ctx, "key2", "val2", 0)
pipe.Get(ctx, "key1")
cmds, err := pipe.Exec(ctx)
// Transactional pipeline (MULTI/EXEC)
txPipe := rdb.TxPipeline()
txPipe.Set(ctx, "key1", "val1", 0)
txPipe.Incr(ctx, "counter")
cmds, err = txPipe.Exec(ctx)Watch (Optimistic Locking)
Execute a transactional WATCH/MULTI/EXEC block on specific keys:
rdb := redisdriver.Unwrap(store)
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
val, err := tx.Get(ctx, "balance").Int64()
if err != nil {
return err
}
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, "balance", val+100, 0)
return nil
})
return err
}, "balance")PubSub
Publish and subscribe to Redis channels:
rdb := redisdriver.Unwrap(store)
ps := rdb.NewPubSub()
// Publish a message
err := ps.Publish(ctx, "events", "user:created")
// Subscribe and handle messages (blocks until context is cancelled)
err = ps.Subscribe(ctx, func(channel string, payload string) {
fmt.Printf("channel=%s payload=%s\n", channel, payload)
}, "events", "notifications")
// For advanced use, get the raw go-redis PubSub handle
sub := ps.SubscribeRaw(ctx, "events")
defer sub.Close()Streams
Work with Redis Streams for event-driven architectures:
rdb := redisdriver.Unwrap(store)
stream := rdb.NewStream()
// Add an entry to a stream
id, err := stream.XAdd(ctx, "orders", map[string]any{
"user_id": "u123",
"amount": 99.99,
})
// Create a consumer group
err = stream.XGroupCreate(ctx, "orders", "processors", "0")
// Read from the stream as a consumer
messages, err := stream.XReadGroup(ctx, "processors", "worker-1",
[]string{"orders", ">"}, 10)
// Acknowledge processed messages
count, err := stream.XAck(ctx, "orders", "processors", id)
// Get stream length
length, err := stream.XLen(ctx, "orders")Lua Scripts
Execute Lua scripts on the server for atomic operations:
rdb := redisdriver.Unwrap(store)
// Create a reusable script (cached via EVALSHA)
script := redisdriver.NewScript(`
local current = redis.call("GET", KEYS[1])
if current == false then
redis.call("SET", KEYS[1], ARGV[1])
return 1
end
return 0
`)
result := script.Run(ctx, rdb, []string{"my-key"}, "my-value")
val, err := result.Int()
// Or evaluate a script directly
cmd := rdb.Eval(ctx, `return redis.call("PING")`, nil)Configuration Options
The driver supports standard driver options via driver.Option:
import "github.com/xraph/grove/kv/driver"
rdb := redisdriver.New()
rdb.Open(ctx, "redis://localhost:6379",
driver.WithPoolSize(20),
driver.WithDialTimeout(5 * time.Second),
driver.WithReadTimeout(3 * time.Second),
driver.WithWriteTimeout(3 * time.Second),
)| Option | Description |
|---|---|
driver.WithPoolSize(n) | Maximum number of connections in the pool |
driver.WithDialTimeout(d) | Timeout for establishing new connections |
driver.WithReadTimeout(d) | Timeout for socket reads |
driver.WithWriteTimeout(d) | Timeout for socket writes |
driver.WithTLSConfig(cfg) | TLS configuration for encrypted connections |
Compatible Databases
The Redis driver works with any Redis-protocol-compatible database:
- Redis 6.x, 7.x
- Valkey -- open-source Redis fork with full API compatibility
- DragonflyDB -- high-performance Redis-compatible in-memory store
- KeyDB -- multi-threaded Redis fork