# Job Queue
Simple distributed FIFO job queue with visibility timeout, acknowledgment, and background task processing backed by Grove KV.
The Queue extension provides a distributed FIFO job queue built on the KV store. Jobs are enqueued with arbitrary payloads, dequeued in order, and acknowledged after processing. A configurable visibility timeout prevents double-processing by hiding dequeued jobs from other consumers until they are acknowledged or the timeout expires.
## Installation

```go
import "github.com/xraph/grove/kv/plugins"
```

## Creating a Queue

```go
queue := plugins.NewQueue(store, "tasks")
```

With a custom visibility timeout:

```go
queue := plugins.NewQueue(store, "emails",
	extension.WithVisibilityTimeout(2*time.Minute),
)
```

Parameters:
| Parameter | Type | Description |
|---|---|---|
| `store` | `*kv.Store` | The backing KV store |
| `prefix` | `string` | Logical name for the queue (auto-prefixed with `queue:`) |
## Options

| Option | Default | Description |
|---|---|---|
| `WithVisibilityTimeout(d)` | 30s | How long a dequeued job is hidden from other consumers before becoming available again |
## API Reference

### Enqueue
Adds a job to the end of the queue and returns the job ID.
```go
func (q *Queue) Enqueue(ctx context.Context, payload any) (string, error)
```

```go
jobID, err := queue.Enqueue(ctx, map[string]string{
	"to":      "alice@example.com",
	"subject": "Welcome!",
	"body":    "Thanks for signing up.",
})
// jobID = "a3f8c1e9b2d74f6a..." (32 hex chars)
```

The payload can be any serializable value. The job is assigned a cryptographically random 16-byte ID (32 hex characters) and timestamped with the current time.
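The library's ID generation isn't shown in this document, but a 16-byte crypto-random ID rendered as 32 hex characters can be produced along these lines (`newJobID` is a hypothetical helper sketched here; the plugin's internal implementation may differ):

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newJobID returns a cryptographically random 16-byte ID encoded as
// 32 lowercase hex characters, matching the format described above.
// (Illustrative sketch, not the library's actual code.)
func newJobID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	id, err := newJobID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id)) // 32
}
```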
### Dequeue
Retrieves and locks the next job from the front of the queue. Returns nil if the queue is empty.
```go
func (q *Queue) Dequeue(ctx context.Context) (*Job, error)
```

```go
job, err := queue.Dequeue(ctx)
if err != nil {
	// handle error
}
if job == nil {
	// queue is empty
}
// Process the job...
fmt.Printf("processing job %s: %v\n", job.ID, job.Payload)
```

When a job is dequeued:
- It is removed from the pending index.
- It is copied to a processing key with a TTL equal to the visibility timeout.
- Other consumers will not see it until the visibility timeout expires.
### Ack
Acknowledges that a job has been successfully processed and removes it from the store entirely.
```go
func (q *Queue) Ack(ctx context.Context, jobID string) error
```

```go
err := queue.Ack(ctx, job.ID)
```

This deletes both the job data and the processing key. Always call `Ack` after successful processing to prevent the job from being reprocessed when the visibility timeout expires.
### Size

Returns the number of pending (not yet dequeued) jobs in the queue.

```go
func (q *Queue) Size(ctx context.Context) (int, error)
```

```go
n, err := queue.Size(ctx)
fmt.Printf("%d jobs pending\n", n)
```

### Job Struct
The Dequeue method returns a *Job value with the following fields:
```go
type Job struct {
	ID         string    `json:"id"`
	Payload    any       `json:"payload"`
	EnqueuedAt time.Time `json:"enqueued_at"`
}
```

| Field | Type | Description |
|---|---|---|
| `ID` | `string` | Unique job identifier (32 hex characters) |
| `Payload` | `any` | The data passed to `Enqueue` |
| `EnqueuedAt` | `time.Time` | Timestamp when the job was enqueued |
## Visibility Timeout
The visibility timeout is the key mechanism that prevents double-processing:
- When a job is dequeued, it is moved to a processing key with a TTL equal to the visibility timeout.
- If the consumer calls `Ack` before the timeout, the job is permanently removed.
- If the consumer crashes or fails to call `Ack`, the processing key expires and the job can be retried.
Set the visibility timeout to be longer than the expected processing time for your jobs. The default of 30 seconds works well for fast tasks. For longer-running jobs, increase it:
```go
queue := plugins.NewQueue(store, "video-transcode",
	extension.WithVisibilityTimeout(10*time.Minute),
)
```

## Example: Background Task Processing
A worker loop that continuously processes jobs from the queue:
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/xraph/grove/kv"
	"github.com/xraph/grove/kv/drivers/redisdriver"
	"github.com/xraph/grove/kv/extension" // import path assumed from the sibling packages above
	"github.com/xraph/grove/kv/plugins"
)

type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

func main() {
	ctx := context.Background()

	rdb := redisdriver.New()
	if err := rdb.Open(ctx, "redis://localhost:6379/0"); err != nil {
		log.Fatal(err)
	}
	store, err := kv.Open(rdb)
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	queue := plugins.NewQueue(store, "emails",
		extension.WithVisibilityTimeout(time.Minute),
	)

	// Producer: enqueue some jobs.
	for i := 0; i < 5; i++ {
		id, err := queue.Enqueue(ctx, EmailPayload{
			To:      fmt.Sprintf("user%d@example.com", i),
			Subject: "Welcome!",
			Body:    "Thanks for signing up.",
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("enqueued job %s\n", id)
	}

	// Consumer: process jobs in a loop.
	for {
		job, err := queue.Dequeue(ctx)
		if err != nil {
			log.Printf("dequeue error: %v", err)
			time.Sleep(time.Second)
			continue
		}
		if job == nil {
			fmt.Println("queue empty, waiting...")
			time.Sleep(2 * time.Second)
			continue
		}

		// Decode the payload: it round-trips through JSON, so re-marshal
		// the generic value into the typed struct.
		raw, _ := json.Marshal(job.Payload)
		var email EmailPayload
		if err := json.Unmarshal(raw, &email); err != nil {
			log.Printf("decode error for job %s: %v", job.ID, err)
			continue // leave unacked; the job retries after the visibility timeout
		}

		fmt.Printf("sending email to %s (job %s, enqueued at %s)\n",
			email.To, job.ID, job.EnqueuedAt.Format(time.RFC3339))

		// Simulate sending...
		time.Sleep(100 * time.Millisecond)

		// Acknowledge successful processing.
		if err := queue.Ack(ctx, job.ID); err != nil {
			log.Printf("ack error: %v", err)
		}
	}
}
```

## Example: HTTP API with Queue
Expose an HTTP endpoint that enqueues work and returns immediately:
```go
mux.HandleFunc("POST /tasks", func(w http.ResponseWriter, r *http.Request) {
	var payload map[string]any
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		http.Error(w, "invalid JSON body", http.StatusBadRequest)
		return
	}

	jobID, err := queue.Enqueue(r.Context(), payload)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{
		"job_id": jobID,
		"status": "queued",
	})
})

mux.HandleFunc("GET /tasks/pending", func(w http.ResponseWriter, r *http.Request) {
	n, err := queue.Size(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	json.NewEncoder(w).Encode(map[string]int{"pending": n})
})
```

## Key Layout
The queue uses several keys internally:
| Key Pattern | Description |
|---|---|
| `queue:<prefix>:index` | Ordered list of pending job IDs |
| `queue:<prefix>:job:<id>` | Job data (ID, payload, timestamp) |
| `queue:<prefix>:processing:<id>` | Processing lock with visibility timeout TTL |
For example, a queue created with `NewQueue(store, "emails")`:

```
queue:emails:index             -- ["abc123", "def456"]
queue:emails:job:abc123        -- {id, payload, enqueued_at}
queue:emails:processing:abc123 -- {id, payload, enqueued_at} (TTL: 30s)
```

## Design Considerations
- FIFO ordering is maintained by an index list stored in the KV store. Jobs are appended on enqueue and taken from the front on dequeue.
- At-least-once delivery -- if a consumer crashes before calling `Ack`, the visibility timeout ensures the job can be retried by another consumer.
- No priority support -- all jobs are processed in the order they were enqueued. For priority queues, consider using separate queues per priority level.
- Best for moderate throughput -- this queue is designed for simplicity and correctness. For extremely high-throughput workloads (millions of jobs per second), consider a dedicated message broker.