Job Queue

Simple distributed FIFO job queue with visibility timeout, acknowledgment, and background task processing backed by Grove KV.

The Queue extension provides a distributed FIFO job queue built on the KV store. Jobs are enqueued with arbitrary payloads, dequeued in order, and acknowledged after processing. A configurable visibility timeout prevents double-processing by hiding dequeued jobs from other consumers until they are acknowledged or the timeout expires.

Installation

import "github.com/xraph/grove/kv/plugins"

Creating a Queue

queue := plugins.NewQueue(store, "tasks")

With a custom visibility timeout:

queue := plugins.NewQueue(store, "emails",
    plugins.WithVisibilityTimeout(2 * time.Minute),
)

Parameters:

Parameter  Type       Description
store      *kv.Store  The backing KV store
prefix     string     Logical name for the queue (auto-prefixed with queue:)

Options

Option                    Default  Description
WithVisibilityTimeout(d)  30s      How long a dequeued job is hidden from other consumers before becoming available again

API Reference

Enqueue

Adds a job to the end of the queue and returns the job ID.

func (q *Queue) Enqueue(ctx context.Context, payload any) (string, error)

jobID, err := queue.Enqueue(ctx, map[string]string{
    "to":      "alice@example.com",
    "subject": "Welcome!",
    "body":    "Thanks for signing up.",
})
// jobID = "a3f8c1e9b2d74f6a..."  (32 hex chars)

The payload can be any serializable value. The job is assigned a cryptographically random 16-byte ID (32 hex characters) and timestamped with the current time.
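To illustrate the ID format described above, here is a minimal sketch of how a 16-byte random ID maps to 32 hex characters. The helper name newJobID is hypothetical; the extension's internal generator may be implemented differently.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newJobID is a hypothetical helper, not part of the extension's API.
// It reads 16 bytes from the OS random source and hex-encodes them,
// which yields a 32-character string.
func newJobID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	id, err := newJobID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id, len(id))
}
```

Each byte becomes two hex digits, which is why a 16-byte ID is 32 characters long.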

Dequeue

Retrieves and locks the next job from the front of the queue. Returns nil if the queue is empty.

func (q *Queue) Dequeue(ctx context.Context) (*Job, error)

job, err := queue.Dequeue(ctx)
if err != nil {
    // handle error
    return err
}
if job == nil {
    // queue is empty; return before touching job
    return nil
}

// Process the job...
fmt.Printf("processing job %s: %v\n", job.ID, job.Payload)

When a job is dequeued:

  1. It is removed from the pending index.
  2. It is copied to a processing key with a TTL equal to the visibility timeout.
  3. Other consumers will not see it until the visibility timeout expires.
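The steps above can be modeled in memory as a rough sketch. This is a toy, single-process illustration and not the extension's code: the type memQueue, its fields, and the demo function are all assumptions made for the example, and a real deployment relies on the KV store's TTL rather than explicit timestamps.

```go
package main

import (
	"fmt"
	"time"
)

// memQueue is a toy model of the dequeue steps (hypothetical, in-memory only).
type memQueue struct {
	pending    []string             // stands in for the pending index
	processing map[string]time.Time // job ID -> time the processing lock expires
	timeout    time.Duration        // visibility timeout
}

// dequeue removes the front ID from the pending list and records a
// processing lock that expires after the visibility timeout.
func (q *memQueue) dequeue(now time.Time) (string, bool) {
	if len(q.pending) == 0 {
		return "", false
	}
	id := q.pending[0]
	q.pending = q.pending[1:]
	q.processing[id] = now.Add(q.timeout)
	return id, true
}

// hidden reports whether id is still locked at time now.
func (q *memQueue) hidden(id string, now time.Time) bool {
	expiry, ok := q.processing[id]
	return ok && now.Before(expiry)
}

// demo dequeues one job and checks its lock 10s and 31s later.
func demo() (id string, hiddenAt10s, hiddenAt31s bool) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	q := &memQueue{
		pending:    []string{"abc123", "def456"},
		processing: map[string]time.Time{},
		timeout:    30 * time.Second,
	}
	id, _ = q.dequeue(start)
	return id, q.hidden(id, start.Add(10*time.Second)), q.hidden(id, start.Add(31*time.Second))
}

func main() {
	fmt.Println(demo())
}
```

With a 30-second timeout, the job is still hidden 10 seconds after the dequeue and no longer hidden at 31 seconds.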

Ack

Acknowledges that a job has been successfully processed and removes it from the store entirely.

func (q *Queue) Ack(ctx context.Context, jobID string) error

err := queue.Ack(ctx, job.ID)

This deletes both the job data and the processing key. Always call Ack after successful processing to prevent the job from being reprocessed when the visibility timeout expires.
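A common way to follow this advice is to acknowledge only when the handler succeeds. The sketch below assumes a small local interface that mirrors the documented Dequeue and Ack signatures; the names consumer, processOne, and fakeQueue are hypothetical, and the Job type is redeclared locally so the example is self-contained. In real code the interface would reference the extension's own Job type.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job is a local stand-in for the documented struct, trimmed to what is used here.
type Job struct {
	ID      string
	Payload any
}

// consumer mirrors the documented Dequeue and Ack method signatures.
type consumer interface {
	Dequeue(ctx context.Context) (*Job, error)
	Ack(ctx context.Context, jobID string) error
}

// processOne dequeues one job, runs handle, and acks only on success.
// The boolean reports whether a job was found.
func processOne(ctx context.Context, q consumer, handle func(*Job) error) (bool, error) {
	job, err := q.Dequeue(ctx)
	if err != nil {
		return false, fmt.Errorf("dequeue: %w", err)
	}
	if job == nil {
		return false, nil
	}
	if err := handle(job); err != nil {
		// No Ack here, so the job stays unacknowledged.
		return true, fmt.Errorf("job %s: %w", job.ID, err)
	}
	return true, q.Ack(ctx, job.ID)
}

// fakeQueue is an in-memory stand-in used only for this demonstration.
type fakeQueue struct {
	jobs  []*Job
	acked []string
}

func (f *fakeQueue) Dequeue(context.Context) (*Job, error) {
	if len(f.jobs) == 0 {
		return nil, nil
	}
	job := f.jobs[0]
	f.jobs = f.jobs[1:]
	return job, nil
}

func (f *fakeQueue) Ack(_ context.Context, id string) error {
	f.acked = append(f.acked, id)
	return nil
}

// demo runs one succeeding and one failing job and returns the acked IDs.
func demo() []string {
	q := &fakeQueue{jobs: []*Job{{ID: "ok"}, {ID: "bad"}}}
	handle := func(j *Job) error {
		if j.ID == "bad" {
			return errors.New("send failed")
		}
		return nil
	}
	for {
		found, _ := processOne(context.Background(), q, handle)
		if !found {
			break
		}
	}
	return q.acked
}

func main() {
	fmt.Println(demo())
}
```

Only the job whose handler returned nil is acknowledged; the failed one is left for a later retry.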

Size

Returns the number of pending (not yet dequeued) jobs in the queue.

func (q *Queue) Size(ctx context.Context) (int, error)

n, err := queue.Size(ctx)
fmt.Printf("%d jobs pending\n", n)

Job Struct

The Dequeue method returns a *Job value with the following fields:

type Job struct {
    ID         string    `json:"id"`
    Payload    any       `json:"payload"`
    EnqueuedAt time.Time `json:"enqueued_at"`
}

Field       Type       Description
ID          string     Unique job identifier (32 hex characters)
Payload     any        The data passed to Enqueue
EnqueuedAt  time.Time  Timestamp when the job was enqueued
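Because Payload is typed as any, consumers usually need to convert it back into a concrete type. One possible approach is a JSON round trip, sketched below. The helper decodePayload is hypothetical, and the example assumes the payload arrives as a generic JSON-compatible value such as map[string]any, which this page does not confirm.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmailPayload is an example payload type used only for illustration.
type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// decodePayload is a hypothetical helper that converts a loosely typed
// payload into T by marshaling it to JSON and unmarshaling it again.
func decodePayload[T any](payload any) (T, error) {
	var out T
	raw, err := json.Marshal(payload)
	if err != nil {
		return out, fmt.Errorf("marshal payload: %w", err)
	}
	if err := json.Unmarshal(raw, &out); err != nil {
		return out, fmt.Errorf("unmarshal payload: %w", err)
	}
	return out, nil
}

func main() {
	payload := map[string]any{"to": "alice@example.com", "subject": "Welcome!"}
	email, err := decodePayload[EmailPayload](payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(email.To, email.Subject)
}
```

The round trip costs an extra encode and decode per job, which is usually acceptable at the moderate throughput this queue targets.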

Visibility Timeout

The visibility timeout is the key mechanism that prevents double-processing:

  1. When a job is dequeued, it is moved to a processing key with a TTL equal to the visibility timeout.
  2. If the consumer calls Ack before the timeout, the job is permanently removed.
  3. If the consumer crashes or fails to call Ack, the processing key expires and the job can be retried.

Set the visibility timeout longer than the expected processing time for your jobs; otherwise the processing key can expire while a consumer is still working on the job. The default of 30 seconds works well for fast tasks. For longer-running jobs, increase it:

queue := plugins.NewQueue(store, "video-transcode",
    plugins.WithVisibilityTimeout(10 * time.Minute),
)
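One way to keep processing time inside the visibility window is to give each handler a context deadline slightly shorter than the timeout. The sketch below is an assumption about how a consumer might do this, not something the extension provides; runWithBudget and its margin parameter are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithBudget is a hypothetical helper. It runs handle with a deadline of
// visibility minus margin, so a slow handler is asked to stop before the
// visibility timeout elapses.
func runWithBudget(ctx context.Context, visibility, margin time.Duration, handle func(context.Context) error) error {
	budget := visibility - margin
	if budget <= 0 {
		return errors.New("margin must be smaller than the visibility timeout")
	}
	ctx, cancel := context.WithTimeout(ctx, budget)
	defer cancel()
	return handle(ctx)
}

// demo reports whether a deliberately slow handler is cut off by the budget.
func demo() bool {
	slow := func(ctx context.Context) error {
		select {
		case <-time.After(time.Second): // simulated long-running work
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	err := runWithBudget(context.Background(), 50*time.Millisecond, 10*time.Millisecond, slow)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demo())
}
```

This only helps when the handler actually observes its context; work that ignores cancellation can still overrun the timeout.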

Example: Background Task Processing

A worker loop that continuously processes jobs from the queue:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/xraph/grove/kv"
    "github.com/xraph/grove/kv/plugins"
    "github.com/xraph/grove/kv/drivers/redisdriver"
)

type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

func main() {
    ctx := context.Background()

    rdb := redisdriver.New()
    if err := rdb.Open(ctx, "redis://localhost:6379/0"); err != nil {
        log.Fatal(err)
    }

    store, err := kv.Open(rdb)
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()

    queue := plugins.NewQueue(store, "emails",
        plugins.WithVisibilityTimeout(time.Minute),
    )

    // Producer: enqueue some jobs.
    for i := 0; i < 5; i++ {
        id, err := queue.Enqueue(ctx, EmailPayload{
            To:      fmt.Sprintf("user%d@example.com", i),
            Subject: "Welcome!",
            Body:    "Thanks for signing up.",
        })
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("enqueued job %s\n", id)
    }

    // Consumer: process jobs in a loop.
    for {
        job, err := queue.Dequeue(ctx)
        if err != nil {
            log.Printf("dequeue error: %v", err)
            time.Sleep(time.Second)
            continue
        }

        if job == nil {
            fmt.Println("queue empty, waiting...")
            time.Sleep(2 * time.Second)
            continue
        }

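        // Decode the payload. The JSON round trip converts the generic
        // payload value into a typed struct.
        raw, err := json.Marshal(job.Payload)
        if err != nil {
            log.Printf("encode payload for job %s: %v", job.ID, err)
            continue
        }
        var email EmailPayload
        if err := json.Unmarshal(raw, &email); err != nil {
            log.Printf("decode payload for job %s: %v", job.ID, err)
            continue
        }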

        fmt.Printf("sending email to %s (job %s, enqueued at %s)\n",
            email.To, job.ID, job.EnqueuedAt.Format(time.RFC3339))

        // Simulate sending...
        time.Sleep(100 * time.Millisecond)

        // Acknowledge successful processing.
        if err := queue.Ack(ctx, job.ID); err != nil {
            log.Printf("ack error: %v", err)
        }
    }
}
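The loop above runs forever. A variant that stops when a context is cancelled might look like the following sketch. It assumes a local interface mirroring the documented Dequeue and Ack signatures; runWorker, consumer, and fakeQueue are hypothetical names, and the Job type is redeclared locally to keep the example self-contained.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Job is a local stand-in for the documented struct.
type Job struct{ ID string }

// consumer mirrors the documented Dequeue and Ack method signatures.
type consumer interface {
	Dequeue(ctx context.Context) (*Job, error)
	Ack(ctx context.Context, jobID string) error
}

// runWorker processes jobs until ctx is cancelled. When the queue is empty
// or Dequeue fails, it waits for idle before polling again.
func runWorker(ctx context.Context, q consumer, idle time.Duration, handle func(*Job) error) {
	for {
		if ctx.Err() != nil {
			return
		}
		job, err := q.Dequeue(ctx)
		if err == nil && job != nil {
			if handle(job) == nil {
				_ = q.Ack(ctx, job.ID) // if Ack fails, the job is left unacknowledged
			}
			continue
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(idle):
		}
	}
}

// fakeQueue is an in-memory stand-in used only for this demonstration.
type fakeQueue struct {
	jobs  []*Job
	acked []string
}

func (f *fakeQueue) Dequeue(context.Context) (*Job, error) {
	if len(f.jobs) == 0 {
		return nil, nil
	}
	job := f.jobs[0]
	f.jobs = f.jobs[1:]
	return job, nil
}

func (f *fakeQueue) Ack(_ context.Context, id string) error {
	f.acked = append(f.acked, id)
	return nil
}

// demo runs a worker for 50ms over three jobs and returns how many were acked.
func demo() int {
	q := &fakeQueue{jobs: []*Job{{ID: "a"}, {ID: "b"}, {ID: "c"}}}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	runWorker(ctx, q, 10*time.Millisecond, func(*Job) error { return nil })
	return len(q.acked)
}

func main() {
	fmt.Println(demo())
}
```

In a service, the context would typically come from signal.NotifyContext so the worker exits cleanly on SIGINT or SIGTERM.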

Example: HTTP API with Queue

Expose an HTTP endpoint that enqueues work and returns immediately:

mux.HandleFunc("POST /tasks", func(w http.ResponseWriter, r *http.Request) {
    var payload map[string]any
    if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
        http.Error(w, "invalid JSON body", http.StatusBadRequest)
        return
    }

    jobID, err := queue.Enqueue(r.Context(), payload)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusAccepted)
    json.NewEncoder(w).Encode(map[string]string{
        "job_id": jobID,
        "status": "queued",
    })
})

mux.HandleFunc("GET /tasks/pending", func(w http.ResponseWriter, r *http.Request) {
    n, err := queue.Size(r.Context())
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]int{"pending": n})
})
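An enqueue handler like the one above can be exercised without a running server or a real queue by using net/http/httptest. The sketch below assumes a local enqueuer interface that mirrors the documented Enqueue signature; enqueueHandler, stubQueue, and post are hypothetical names introduced for the example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// enqueuer mirrors the documented Enqueue method signature.
type enqueuer interface {
	Enqueue(ctx context.Context, payload any) (string, error)
}

// enqueueHandler rejects malformed JSON with 400 and otherwise
// responds 202 with the job ID returned by the queue.
func enqueueHandler(q enqueuer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var payload map[string]any
		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
			http.Error(w, "invalid JSON body", http.StatusBadRequest)
			return
		}
		jobID, err := q.Enqueue(r.Context(), payload)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		json.NewEncoder(w).Encode(map[string]string{"job_id": jobID, "status": "queued"})
	}
}

// stubQueue is a test double that always returns a fixed job ID.
type stubQueue struct{}

func (stubQueue) Enqueue(context.Context, any) (string, error) { return "job-1", nil }

// post sends body to the handler in-process and returns the status code.
func post(body string) int {
	req := httptest.NewRequest(http.MethodPost, "/tasks", strings.NewReader(body))
	rec := httptest.NewRecorder()
	enqueueHandler(stubQueue{}).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(post(`{"kind":"resize"}`), post(`{not json`))
}
```

Depending on an interface rather than the concrete queue type is a design choice made for testability here; the handler works the same way when given the real queue, provided it has a matching Enqueue method.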

Key Layout

The queue uses several keys internally:

Key Pattern                     Description
queue:<prefix>:index            Ordered list of pending job IDs
queue:<prefix>:job:<id>         Job data (ID, payload, timestamp)
queue:<prefix>:processing:<id>  Processing lock with visibility timeout TTL

For example, a queue created with NewQueue(store, "emails"):

queue:emails:index              -- ["abc123", "def456"]
queue:emails:job:abc123         -- {id, payload, enqueued_at}
queue:emails:processing:abc123  -- {id, payload, enqueued_at} (TTL: 30s)
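The key patterns above can be expressed as small string builders, which is handy when inspecting a queue's keys during debugging. These helpers are hypothetical and only reproduce the documented patterns; they are not part of the extension's API.

```go
package main

import "fmt"

// indexKey returns the key holding the ordered list of pending job IDs.
func indexKey(name string) string { return "queue:" + name + ":index" }

// jobKey returns the key holding the data for one job.
func jobKey(name, id string) string { return "queue:" + name + ":job:" + id }

// processingKey returns the key acting as the processing lock for one job.
func processingKey(name, id string) string { return "queue:" + name + ":processing:" + id }

func main() {
	fmt.Println(indexKey("emails"))
	fmt.Println(jobKey("emails", "abc123"))
	fmt.Println(processingKey("emails", "abc123"))
}
```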

Design Considerations

  • FIFO ordering is maintained by an index list stored in the KV store. Jobs are appended on enqueue and taken from the front on dequeue.
  • At-least-once delivery -- if a consumer crashes before calling Ack, the visibility timeout ensures the job can be retried by another consumer.
  • No priority support -- all jobs are processed in the order they were enqueued. For priority queues, consider using separate queues per priority level.
  • Best for moderate throughput -- this queue is designed for simplicity and correctness. For extremely high-throughput workloads (millions of jobs per second), consider a dedicated message broker.
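The suggestion to use separate queues per priority level could be sketched as a consumer that checks queues in priority order. The example assumes a local interface mirroring the documented Dequeue signature; dequeueByPriority, dequeuer, and sliceQueue are hypothetical names, and the Job type is redeclared locally.

```go
package main

import (
	"context"
	"fmt"
)

// Job is a local stand-in for the documented struct.
type Job struct{ ID string }

// dequeuer mirrors the documented Dequeue method signature.
type dequeuer interface {
	Dequeue(ctx context.Context) (*Job, error)
}

// dequeueByPriority checks the queues in the order given, highest priority
// first, and returns the first job found, or nil if every queue is empty.
func dequeueByPriority(ctx context.Context, queues ...dequeuer) (*Job, error) {
	for _, q := range queues {
		job, err := q.Dequeue(ctx)
		if err != nil {
			return nil, err
		}
		if job != nil {
			return job, nil
		}
	}
	return nil, nil
}

// sliceQueue is an in-memory stand-in used only for this demonstration.
type sliceQueue struct{ jobs []*Job }

func (s *sliceQueue) Dequeue(context.Context) (*Job, error) {
	if len(s.jobs) == 0 {
		return nil, nil
	}
	job := s.jobs[0]
	s.jobs = s.jobs[1:]
	return job, nil
}

// demo drains a high and a low priority queue and returns the processing order.
func demo() []string {
	high := &sliceQueue{jobs: []*Job{{ID: "h1"}}}
	low := &sliceQueue{jobs: []*Job{{ID: "l1"}, {ID: "l2"}}}
	var order []string
	for {
		job, _ := dequeueByPriority(context.Background(), high, low)
		if job == nil {
			break
		}
		order = append(order, job.ID)
	}
	return order
}

func main() {
	fmt.Println(demo())
}
```

Strict ordering like this can starve lower-priority queues while the high-priority queue stays busy, so some workloads may prefer a weighted scheme.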
