
Elasticsearch

Elasticsearch driver with native JSON query DSL, aggregations, bulk operations, and scroll support.

The Elasticsearch driver provides native JSON query DSL support built on the official Go client. It implements grove.GroveDriver, but unlike the SQL-based drivers it issues Elasticsearch-native operations (Search, Index, Update, Delete, Bulk, Aggregate) rather than SQL statements.

Installation

go get github.com/xraph/grove
go get github.com/xraph/grove/drivers/esdriver

Connection

import (
    "context"
    "log"

    "github.com/xraph/grove"
    "github.com/xraph/grove/drivers/esdriver"
)

ctx := context.Background()

// 1. Create an unconnected driver
es := esdriver.New()

// 2. Open a connection
err := es.Open(ctx, "http://localhost:9200")
if err != nil {
    log.Fatal(err)
}

// 3. Wrap in a grove.DB handle
db, err := grove.Open(es)
if err != nil {
    log.Fatal(err)
}
defer db.Close()

Multi-Node Cluster

Pass multiple addresses separated by commas:

err := es.Open(ctx, "http://es1:9200,http://es2:9200,http://es3:9200")

Or use the WithAddresses option:

err := es.Open(ctx, "",
    esdriver.WithAddresses("http://es1:9200", "http://es2:9200"),
)

Elastic Cloud

Connect using a Cloud ID:

err := es.Open(ctx, "",
    esdriver.WithCloudID("my-deployment:dXMtZWFzdC0xLm..."),
    esdriver.WithAPIKey("base64-api-key"),
)

Authentication

// Each snippet below is an alternative: pass the options that match your cluster.

// Basic auth
err := es.Open(ctx, "http://localhost:9200",
    esdriver.WithBasicAuth("elastic", "changeme"),
)

// API key
err := es.Open(ctx, "http://localhost:9200",
    esdriver.WithAPIKey("base64-encoded-api-key"),
)

// TLS with a PEM-encoded CA certificate (requires the "os" import)
cert, err := os.ReadFile("ca.crt")
if err != nil {
    log.Fatal(err)
}
err = es.Open(ctx, "https://localhost:9200",
    esdriver.WithCACert(cert),
)
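For reference, these options correspond to standard HTTP headers of the Elasticsearch REST API: basic auth sends `Authorization: Basic <base64(user:pass)>`, and API-key auth sends `Authorization: ApiKey <key>`. The standalone sketch below builds such requests with net/http only; `authRequest` is a hypothetical helper, not part of esdriver, which configures authentication through the official client.

```go
package main

import (
	"fmt"
	"net/http"
)

// authRequest is a hypothetical helper that builds a GET request to the
// cluster root and attaches either API-key or basic-auth credentials.
func authRequest(baseURL, user, pass, apiKey string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/", nil)
	if err != nil {
		return nil, err
	}
	if apiKey != "" {
		// Elasticsearch expects the base64-encoded API key after "ApiKey ".
		req.Header.Set("Authorization", "ApiKey "+apiKey)
	} else if user != "" {
		// SetBasicAuth base64-encodes "user:pass" into a Basic header.
		req.SetBasicAuth(user, pass)
	}
	return req, nil
}

func main() {
	basic, err := authRequest("http://localhost:9200", "elastic", "changeme", "")
	if err != nil {
		panic(err)
	}
	fmt.Println(basic.Header.Get("Authorization"))

	key, err := authRequest("http://localhost:9200", "", "", "base64-encoded-api-key")
	if err != nil {
		panic(err)
	}
	fmt.Println(key.Header.Get("Authorization"))
}
```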

Unwrap Pattern

Many Elasticsearch-specific query builders live on *esdriver.ElasticDB. Use Unwrap to extract the driver from a *grove.DB handle:

es := esdriver.Unwrap(db)

// Now you can use ES-native query builders:
var articles []Article
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Match("title", "grove").Scan(ctx)
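Because `GroveSelect` returns an interface value, the `.(*esdriver.SearchQuery)` step is an ordinary Go type assertion, and the unchecked form panics if the value has a different concrete type (for example, when the handle wraps a different driver). The standalone sketch below uses hypothetical stand-in types (`Query`, `SearchQuery`, `SQLQuery`) to show the comma-ok form, which returns an error instead.

```go
package main

import (
	"errors"
	"fmt"
)

// Query stands in for the interface a driver's select builder returns.
type Query interface{ String() string }

// SearchQuery and SQLQuery are hypothetical concrete builders.
type SearchQuery struct{ index string }
type SQLQuery struct{ table string }

func (q *SearchQuery) String() string { return "search " + q.index }
func (q *SQLQuery) String() string    { return "select " + q.table }

// asSearch uses the comma-ok assertion so a mismatched builder yields an
// error rather than a runtime panic.
func asSearch(q Query) (*SearchQuery, error) {
	sq, ok := q.(*SearchQuery)
	if !ok {
		return nil, errors.New("query is not an Elasticsearch search query")
	}
	return sq, nil
}

func main() {
	if sq, err := asSearch(&SearchQuery{index: "articles"}); err == nil {
		fmt.Println(sq.String())
	}
	if _, err := asSearch(&SQLQuery{table: "articles"}); err != nil {
		fmt.Println(err)
	}
}
```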

Accessing the Underlying Client

es := esdriver.Unwrap(db)

client := es.Client() // *elasticsearch.Client

The M Type

The esdriver package provides an M type alias for map[string]any, used for building JSON request bodies:

import "github.com/xraph/grove/drivers/esdriver"

query := esdriver.M{"match": esdriver.M{"title": "grove"}}

Defining Models

Models are plain Go structs with grove struct tags. The index name comes from the model's table name: set it explicitly with the table: tag on grove.BaseModel, as below; otherwise it is typically derived from the struct name (pluralized, snake_case).

type Article struct {
    grove.BaseModel `grove:"table:articles"`

    ID        string    `grove:"id,pk"`
    Title     string    `grove:"title"`
    Body      string    `grove:"body"`
    Author    string    `grove:"author"`
    Status    string    `grove:"status"`
    Tags      []string  `grove:"tags"`
    Views     int       `grove:"views"`
    CreatedAt time.Time `grove:"created_at"`
}

Default Index

Set a default index name to avoid specifying it on every query:

es := esdriver.Unwrap(db)
es.SetDefaultIndex("articles")

Index Management

Manage Elasticsearch indices directly through the driver:

es := esdriver.Unwrap(db)

// Create an index with mappings
err := es.CreateIndex(ctx, "articles", esdriver.M{
    "mappings": esdriver.M{
        "properties": esdriver.M{
            "title":      esdriver.M{"type": "text", "analyzer": "standard"},
            "body":       esdriver.M{"type": "text"},
            "author":     esdriver.M{"type": "keyword"},
            "tags":       esdriver.M{"type": "keyword"},
            "created_at": esdriver.M{"type": "date"},
        },
    },
    "settings": esdriver.M{
        "number_of_shards":   1,
        "number_of_replicas": 0,
    },
})

// Check if an index exists
exists, err := es.IndexExists(ctx, "articles")

// Update field mappings
err = es.PutMapping(ctx, "articles", esdriver.M{
    "properties": esdriver.M{
        "category": esdriver.M{"type": "keyword"},
    },
})

// Force refresh (make recent writes searchable)
err = es.Refresh(ctx, "articles")

// Delete an index
err = es.DeleteIndex(ctx, "articles")
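Each of these methods corresponds to a standard Elasticsearch REST endpoint: creation is `PUT /<index>`, the existence check is `HEAD /<index>`, a mapping update is `PUT /<index>/_mapping`, refresh is `POST /<index>/_refresh`, and deletion is `DELETE /<index>`. The standalone sketch below builds (but does not send) four of those requests with net/http; `indexRequests` is a hypothetical helper for illustration, not a driver function.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// M mirrors esdriver.M for this standalone sketch.
type M = map[string]any

// indexRequests builds, without sending, the REST calls that correspond to
// CreateIndex, IndexExists, Refresh, and DeleteIndex for one index.
func indexRequests(base, index string, createBody M) ([]*http.Request, error) {
	payload, err := json.Marshal(createBody)
	if err != nil {
		return nil, err
	}
	specs := []struct {
		method, path string
		body         []byte
	}{
		{http.MethodPut, "/" + index, payload},            // CreateIndex
		{http.MethodHead, "/" + index, nil},               // IndexExists: 200 if present, 404 if not
		{http.MethodPost, "/" + index + "/_refresh", nil}, // Refresh
		{http.MethodDelete, "/" + index, nil},             // DeleteIndex
	}
	reqs := make([]*http.Request, 0, len(specs))
	for _, s := range specs {
		req, err := http.NewRequest(s.method, base+s.path, bytes.NewReader(s.body))
		if err != nil {
			return nil, err
		}
		if s.body != nil {
			req.Header.Set("Content-Type", "application/json")
		}
		reqs = append(reqs, req)
	}
	return reqs, nil
}

func main() {
	reqs, err := indexRequests("http://localhost:9200", "articles", M{
		"mappings": M{"properties": M{"title": M{"type": "text"}}},
	})
	if err != nil {
		panic(err)
	}
	for _, r := range reqs {
		fmt.Println(r.Method, r.URL.Path)
	}
}
```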

Search Queries

Create search queries with GroveSelect. The query builder provides convenience methods for common Elasticsearch query types.

var articles []Article
es := esdriver.Unwrap(db)

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Match("title", "getting started").
    Size(10).
    Scan(ctx)

Phrase Matching

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    MatchPhrase("body", "distributed systems").
    Scan(ctx)

Term Query (Exact Match)

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Term("author", "alice").
    Scan(ctx)

Terms Query (IN-style)

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Terms("tags", "go", "elasticsearch", "grove").
    Scan(ctx)

Range Query

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Range("created_at", esdriver.RangeOpts{
        GTE:    "2024-01-01",
        LT:     "2025-01-01",
        Format: "yyyy-MM-dd",
    }).
    Scan(ctx)
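The `RangeOpts` fields correspond to the parameters of Elasticsearch's `range` query. The standalone sketch below uses a local stand-in struct, whose field set is an assumption based on the example above rather than esdriver's definition, with `omitempty` tags so unset bounds are left out of the JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rangeOpts is a hypothetical stand-in for esdriver.RangeOpts.
// Nil bounds and an empty format are omitted from the output.
type rangeOpts struct {
	GT     any    `json:"gt,omitempty"`
	GTE    any    `json:"gte,omitempty"`
	LT     any    `json:"lt,omitempty"`
	LTE    any    `json:"lte,omitempty"`
	Format string `json:"format,omitempty"`
}

// rangeQuery wraps the bounds as {"range": {field: {...}}}.
func rangeQuery(field string, opts rangeOpts) ([]byte, error) {
	return json.Marshal(map[string]any{"range": map[string]any{field: opts}})
}

func main() {
	b, err := rangeQuery("created_at", rangeOpts{
		GTE:    "2024-01-01",
		LT:     "2025-01-01",
		Format: "yyyy-MM-dd",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```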

Bool Query

Combine multiple conditions with a bool query:

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Bool(func(b *esdriver.BoolQuery) {
        b.Must(esdriver.M{"match": esdriver.M{"body": "grove"}})
        b.Filter(esdriver.M{"term": esdriver.M{"author": "alice"}})
        b.MustNot(esdriver.M{"term": esdriver.M{"status": "draft"}})
        b.Should(esdriver.M{"match": esdriver.M{"tags": "featured"}})
        b.MinimumShouldMatch(1)
    }).
    Scan(ctx)

Exists Query

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Exists("tags").
    Scan(ctx)

Raw Query (Escape Hatch)

Use RawQuery for any query type not covered by convenience methods:

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    RawQuery(esdriver.M{
        "fuzzy": esdriver.M{
            "title": esdriver.M{
                "value":     "grov",
                "fuzziness": "AUTO",
            },
        },
    }).
    Scan(ctx)

Sorting, Pagination, and Source Filtering

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Match("body", "grove").
    Sort("created_at", "desc").
    Source("title", "author", "created_at").
    ExcludeSource("body").
    From(20).
    Size(10).
    Scan(ctx)

Search After (Keyset Pagination)

For efficient deep pagination, use SearchAfter instead of From, passing the sort values of the last hit on the previous page in the same order as the sort fields:

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Match("body", "grove").
    Sort("created_at", "desc").
    SortBy(esdriver.M{"_id": "asc"}).
    Size(10).
    SearchAfter("2024-06-15T10:00:00Z", "abc123").
    Scan(ctx)
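Search-after behaves like keyset pagination: each page is requested with the sort values of the previous page's last hit, and only hits that sort strictly after that key are returned. A unique tiebreaker (here `_id`) matters because, without it, documents sharing a `created_at` value could be skipped or repeated across pages. The in-memory sketch below, using a hypothetical `doc` type, reproduces that logic for the `created_at desc, _id asc` ordering above; it does not talk to Elasticsearch.

```go
package main

import (
	"fmt"
	"sort"
)

// doc is a hypothetical hit; same-format UTC RFC 3339 timestamps
// compare correctly as strings.
type doc struct {
	CreatedAt string
	ID        string
}

// less reports whether a sorts before b under created_at desc, _id asc.
func less(a, b doc) bool {
	if a.CreatedAt != b.CreatedAt {
		return a.CreatedAt > b.CreatedAt
	}
	return a.ID < b.ID
}

// page returns up to size docs sorting strictly after the key; a nil key
// requests the first page.
func page(docs []doc, after *doc, size int) []doc {
	sorted := append([]doc(nil), docs...)
	sort.Slice(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })
	var out []doc
	for _, d := range sorted {
		if after != nil && !less(*after, d) {
			continue // at or before the key: already returned
		}
		out = append(out, d)
		if len(out) == size {
			break
		}
	}
	return out
}

// paginateAll walks every page, using the last hit of each page as the key.
func paginateAll(docs []doc, size int) []doc {
	var out []doc
	var after *doc
	for {
		p := page(docs, after, size)
		if len(p) == 0 {
			return out
		}
		out = append(out, p...)
		last := p[len(p)-1]
		after = &last
	}
}

func main() {
	docs := []doc{
		{"2024-06-15T10:00:00Z", "b"},
		{"2024-06-15T10:00:00Z", "a"},
		{"2024-06-16T09:00:00Z", "c"},
		{"2024-06-14T08:00:00Z", "d"},
	}
	for _, d := range paginateAll(docs, 2) {
		fmt.Println(d.CreatedAt, d.ID)
	}
}
```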

Highlighting

result, err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Match("body", "grove").
    Highlight(esdriver.M{
        "fields": esdriver.M{
            "body": esdriver.M{
                "fragment_size":       150,
                "number_of_fragments": 3,
            },
        },
    }).
    ScanHits(ctx)

Count

count, err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Term("author", "alice").
    Count(ctx)

Override Index

err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Index("articles-2024").
    Match("body", "grove").
    Scan(ctx)

Insert

Create insert queries with GroveInsert. The driver auto-detects single vs bulk insert based on whether the model is a struct pointer or a slice pointer.

Insert a Single Document

article := Article{
    ID:        "article-1",
    Title:     "Getting Started with Grove",
    Body:      "...",
    Author:    "alice",
    Tags:      []string{"grove", "go"},
    CreatedAt: time.Now(),
}

result, err := es.GroveInsert(&article).(*esdriver.InsertQuery).
    Exec(ctx)
if err != nil {
    log.Fatal(err)
}

fmt.Println(result.DocumentID()) // "article-1"
fmt.Println(result.Action())     // "created"

Bulk Insert

articles := []Article{
    {Title: "Post 1", Author: "alice"},
    {Title: "Post 2", Author: "bob"},
}

result, err := es.GroveInsert(&articles).(*esdriver.InsertQuery).
    Exec(ctx)
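The single-versus-bulk decision described above can be made by reflecting on the model pointer. The standalone sketch below shows one way to do it; `insertMode` is a hypothetical function and not esdriver's actual implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// insertMode classifies a model as "single" (pointer to struct) or
// "bulk" (pointer to slice); anything else is rejected.
func insertMode(model any) (string, error) {
	v := reflect.ValueOf(model)
	if v.Kind() != reflect.Pointer || v.IsNil() {
		return "", fmt.Errorf("model must be a non-nil pointer, got %T", model)
	}
	switch v.Elem().Kind() {
	case reflect.Struct:
		return "single", nil
	case reflect.Slice:
		return "bulk", nil
	default:
		return "", fmt.Errorf("unsupported model type %T", model)
	}
}

type article struct{ Title string }

func main() {
	for _, m := range []any{&article{}, &[]article{}, article{}} {
		mode, err := insertMode(m)
		fmt.Println(mode, err)
	}
}
```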

Insert Options

result, err := es.GroveInsert(&article).(*esdriver.InsertQuery).
    DocumentID("custom-id").
    Routing("tenant-1").
    Pipeline("my-ingest-pipeline").
    Refresh("wait_for").
    Index("articles-2024").
    Exec(ctx)

Update

Create update queries with GroveUpdate.

Partial Update by ID

result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
    DocumentID("article-1").
    Set("title", "Updated Title").
    Set("author", "bob").
    Exec(ctx)

fmt.Println(result.RowsAffected()) // 1
fmt.Println(result.Action())       // "updated"

Set Entire Partial Document

result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
    DocumentID("article-1").
    SetDoc(esdriver.M{
        "title":  "New Title",
        "author": "carol",
        "tags":   []string{"updated"},
    }).
    Exec(ctx)

Scripted Update

result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
    DocumentID("article-1").
    SetScript(esdriver.Script{
        Source: "ctx._source.views += params.count",
        Lang:   "painless",
        Params: esdriver.M{"count": 1},
    }).
    Exec(ctx)

Upsert

Update the document if it exists, or insert it if it does not:

result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
    DocumentID("article-1").
    Set("title", "New or Updated").
    Upsert().
    Exec(ctx)

Update by Query

Update all matching documents:

result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
    Filter(esdriver.M{"term": esdriver.M{"author": "alice"}}).
    SetScript(esdriver.Script{
        Source: "ctx._source.author = params.newAuthor",
        Lang:   "painless",
        Params: esdriver.M{"newAuthor": "alice-v2"},
    }).
    Many().
    Exec(ctx)

Delete

Create delete queries with GroveDelete.

Delete a Single Document

result, err := es.GroveDelete(&Article{}).(*esdriver.DeleteQuery).
    DocumentID("article-1").
    Exec(ctx)

fmt.Println(result.RowsAffected()) // 1

Delete by Query

Delete all matching documents:

result, err := es.GroveDelete(&Article{}).(*esdriver.DeleteQuery).
    Filter(esdriver.M{"term": esdriver.M{"status": "archived"}}).
    Many().
    Exec(ctx)
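In Elasticsearch, updating or deleting every matching document is done through the `POST /<index>/_update_by_query` and `POST /<index>/_delete_by_query` APIs, whose bodies carry a `query` (plus a `script` for updates). The standalone sketch below builds such a request without sending it; `byQueryRequest` is hypothetical, and that `Many()` maps onto exactly these endpoints is an assumption.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// M mirrors esdriver.M for this standalone sketch.
type M = map[string]any

// byQueryRequest builds POST /<index>/_<op>_by_query, where op is
// "update" or "delete".
func byQueryRequest(base, index, op string, body M) (*http.Request, error) {
	if op != "update" && op != "delete" {
		return nil, fmt.Errorf("unsupported op %q", op)
	}
	payload, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	endpoint := fmt.Sprintf("%s/%s/_%s_by_query", base, index, op)
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := byQueryRequest("http://localhost:9200", "articles", "delete",
		M{"query": M{"term": M{"status": "archived"}}})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path)
}
```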

Bulk Operations

Use NewBulk() for mixed bulk operations (index, create, update, delete) in a single request:

es := esdriver.Unwrap(db)

result, err := es.NewBulk().
    Index("articles", "id-1", esdriver.M{"title": "Post 1"}).
    Index("articles", "id-2", esdriver.M{"title": "Post 2"}).
    Create("articles", "id-3", esdriver.M{"title": "Post 3"}).
    Update("articles", "id-1", esdriver.M{"doc": esdriver.M{"title": "Updated Post 1"}}).
    Delete("articles", "id-2").
    Refresh("wait_for").
    Exec(ctx)
if err != nil {
    log.Fatal(err)
}

fmt.Println(result.Errors)     // true if any action failed
fmt.Println(len(result.Items)) // number of actions
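Elasticsearch's `_bulk` endpoint takes newline-delimited JSON: one action line per operation, followed by a source line for `index`, `create`, and `update` (but not `delete`), with a trailing newline after the last line. The standalone sketch below encodes operations in that format; the `op` type and `encodeBulk` function are hypothetical, and esdriver may assemble the body differently.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// M mirrors esdriver.M for this standalone sketch.
type M = map[string]any

// op is one hypothetical bulk action.
type op struct {
	Action string // "index", "create", "update", or "delete"
	Index  string
	ID     string
	Body   M // nil for delete; {"doc": ...} for update
}

// encodeBulk writes the NDJSON body expected by the _bulk endpoint.
func encodeBulk(ops []op) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode appends '\n' after each value
	for _, o := range ops {
		meta := M{o.Action: M{"_index": o.Index, "_id": o.ID}}
		if err := enc.Encode(meta); err != nil {
			return nil, err
		}
		if o.Action == "delete" {
			continue // delete has no source line
		}
		if err := enc.Encode(o.Body); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	b, err := encodeBulk([]op{
		{Action: "index", Index: "articles", ID: "id-1", Body: M{"title": "Post 1"}},
		{Action: "update", Index: "articles", ID: "id-1", Body: M{"doc": M{"title": "Updated Post 1"}}},
		{Action: "delete", Index: "articles", ID: "id-2"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(b))
}
```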

Each BulkItem in the result contains the outcome for that action:

for _, item := range result.Items {
    if item.Index != nil {
        fmt.Printf("Indexed %s: status %d\n", item.Index.ID, item.Index.Status)
    }
}
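A bulk request can return HTTP 200 while individual actions fail, so callers usually check `Errors` and then inspect each item. In the raw `_bulk` response, each item is an object keyed by its action name, holding `_id`, `status`, and, on failure, an `error` object. The standalone sketch below decodes a sample raw response with hypothetical local types; it does not use esdriver's `BulkItem`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// itemResult is the per-action outcome inside a raw _bulk response.
type itemResult struct {
	ID     string          `json:"_id"`
	Status int             `json:"status"`
	Error  json.RawMessage `json:"error,omitempty"` // present only on failure
}

type bulkResponse struct {
	Errors bool                    `json:"errors"`
	Items  []map[string]itemResult `json:"items"` // one key per item: the action name
}

// failedActions returns "action:id" for every item with a non-2xx status.
func failedActions(raw []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, err
	}
	if !resp.Errors {
		return nil, nil // nothing failed
	}
	var failed []string
	for _, item := range resp.Items {
		for action, r := range item {
			if r.Status < 200 || r.Status > 299 {
				failed = append(failed, action+":"+r.ID)
			}
		}
	}
	return failed, nil
}

const sample = `{"errors":true,"items":[
{"index":{"_id":"id-1","status":201}},
{"create":{"_id":"id-3","status":409,"error":{"type":"version_conflict_engine_exception"}}}]}`

func main() {
	failed, err := failedActions([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(failed) // [create:id-3]
}
```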

Aggregation

Build Elasticsearch aggregations with the fluent AggregateQuery builder.

Pre-Built Aggregations

es := esdriver.Unwrap(db)

agg := es.NewAggregate(&Article{})

var results any
err := agg.
    Terms("by_author", "author").
    Avg("avg_views", "views").
    DateHistogram("by_month", "created_at", "month").
    Query(esdriver.M{"term": esdriver.M{"status": "published"}}).
    Scan(ctx, &results)

Available Aggregations

| Method | Description |
|---|---|
| Terms(name, field) | Terms aggregation (group by field) |
| DateHistogram(name, field, interval) | Date histogram aggregation |
| Avg(name, field) | Average metric |
| Sum(name, field) | Sum metric |
| Min(name, field) | Minimum metric |
| Max(name, field) | Maximum metric |
| Cardinality(name, field) | Approximate distinct count |

Sub-Aggregations

var results any
err := es.NewAggregate(&Article{}).
    Terms("by_author", "author").
    SubAgg("by_author", esdriver.M{
        "avg_views": esdriver.M{"avg": esdriver.M{"field": "views"}},
    }).
    Scan(ctx, &results)
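A `terms` aggregation with a sub-aggregation returns one bucket per term, and each bucket carries the sub-aggregation's result under its name. The standalone sketch below decodes a sample response of that shape for the `by_author`/`avg_views` example; the struct types are hypothetical and describe the raw Elasticsearch response, not esdriver's result types. The `value` field is a pointer because `avg` returns `null` for a bucket in which no document has the field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bucket is one term bucket with its avg_views sub-aggregation.
type bucket struct {
	Key      string `json:"key"`
	DocCount int64  `json:"doc_count"`
	AvgViews struct {
		Value *float64 `json:"value"` // null when no document has the field
	} `json:"avg_views"`
}

type aggResponse struct {
	Aggregations struct {
		ByAuthor struct {
			Buckets []bucket `json:"buckets"`
		} `json:"by_author"`
	} `json:"aggregations"`
}

// avgViewsByAuthor maps each author to its average views, skipping nulls.
func avgViewsByAuthor(raw []byte) (map[string]float64, error) {
	var r aggResponse
	if err := json.Unmarshal(raw, &r); err != nil {
		return nil, err
	}
	out := make(map[string]float64)
	for _, b := range r.Aggregations.ByAuthor.Buckets {
		if b.AvgViews.Value != nil {
			out[b.Key] = *b.AvgViews.Value
		}
	}
	return out, nil
}

const sample = `{"aggregations":{"by_author":{"buckets":[
{"key":"alice","doc_count":2,"avg_views":{"value":12.5}},
{"key":"bob","doc_count":1,"avg_views":{"value":null}}]}}}`

func main() {
	avgs, err := avgViewsByAuthor([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(avgs) // map[alice:12.5]
}
```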

Raw Aggregations

For full control, use RawAggs:

result, err := es.NewAggregate(&Article{}).
    RawAggs(esdriver.M{
        "popular_tags": esdriver.M{
            "terms": esdriver.M{
                "field": "tags",
                "size":  20,
            },
        },
    }).
    ScanRaw(ctx)

Scroll (Deep Pagination)

For iterating over large result sets, use scroll:

es := esdriver.Unwrap(db)

cursor, err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
    Match("body", "grove").
    Size(100).
    Scroll(ctx, "5m")
if err != nil {
    log.Fatal(err)
}
defer cursor.Close() // clears the scroll context on the server

for cursor.Next() {
    var article Article
    if err := cursor.Decode(&article); err != nil {
        log.Fatal(err)
    }
    fmt.Println(article.Title)
}

if err := cursor.Err(); err != nil {
    log.Fatal(err)
}

The cursor automatically fetches the next batch when the current batch is exhausted. Call Close() when done to release server-side resources.
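The batching behaviour described above can be sketched as a small generic cursor: `Next` serves items from the current batch and calls a fetch function only when that batch is exhausted, and an empty batch ends iteration. The `cursor` type and its fetch function are hypothetical; in a real scroll, fetch would call the `_search/scroll` endpoint with the current scroll ID, and `Close` would clear it.

```go
package main

import "fmt"

// cursor iterates over items delivered in batches by fetch.
// fetch returning an empty batch signals the end of the result set.
type cursor[T any] struct {
	fetch func() ([]T, error)
	batch []T
	pos   int
	cur   T
	err   error
	done  bool
}

func newCursor[T any](fetch func() ([]T, error)) *cursor[T] {
	return &cursor[T]{fetch: fetch}
}

// Next advances to the next item, fetching a new batch only when the
// current one is exhausted.
func (c *cursor[T]) Next() bool {
	if c.done {
		return false
	}
	if c.pos >= len(c.batch) {
		batch, err := c.fetch()
		if err != nil || len(batch) == 0 {
			c.err, c.done = err, true
			return false
		}
		c.batch, c.pos = batch, 0
	}
	c.cur = c.batch[c.pos]
	c.pos++
	return true
}

func (c *cursor[T]) Value() T   { return c.cur }
func (c *cursor[T]) Err() error { return c.err }

func main() {
	// Three fake batches standing in for successive scroll responses.
	batches := [][]string{{"a", "b"}, {"c"}, {}}
	calls := 0
	c := newCursor(func() ([]string, error) {
		b := batches[calls]
		calls++
		return b, nil
	})
	for c.Next() {
		fmt.Println(c.Value())
	}
	fmt.Println("fetches:", calls, "err:", c.Err())
}
```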

Transactions

Elasticsearch does not support multi-document ACID transactions. Calling grove.DB.BeginTx() returns grove.ErrNotSupported.

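NewBulk() batches multiple writes into a single request, but a bulk request is not atomic: each action succeeds or fails independently, so check result.Errors and the per-item statuses after every bulk call.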

Result Type

Write operations return *EsResult with the following methods:

| Method | Description |
|---|---|
| RowsAffected() | Number of documents affected |
| DocumentID() | The _id of the document |
| Version() | Document version after the operation |
| Action() | Result action: "created", "updated", "deleted", or "noop" |

LastInsertId() returns ErrLastInsertIDNotSupported. Use DocumentID() instead.

Options

Configure the driver when calling Open:

| Option | Description |
|---|---|
| esdriver.WithBasicAuth(user, pass) | HTTP basic authentication |
| esdriver.WithAPIKey(key) | API key authentication |
| esdriver.WithCloudID(id) | Elastic Cloud deployment ID |
| esdriver.WithCACert(cert) | TLS CA certificate (PEM bytes) |
| esdriver.WithRefresh(policy) | Default refresh policy: "true", "false", or "wait_for" |
| esdriver.WithMaxRetries(n) | Maximum retry attempts |
| esdriver.WithTransport(t) | Custom http.RoundTripper |
| esdriver.WithAddresses(addrs...) | Override parsed addresses |
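The `With...` functions follow Go's functional-options pattern: each returns a function that modifies a configuration value, and the constructor applies them in order, so a later option typically overrides an earlier one. The standalone sketch below reproduces the pattern with a hypothetical `config` struct, hypothetical defaults, and two stand-in option constructors named after the real ones; esdriver's actual option and config types are not shown on this page.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the driver's settings.
type config struct {
	Addresses  []string
	MaxRetries int
}

// Option mutates a config; hypothetical stand-in for esdriver's option type.
type Option func(*config)

// WithAddresses replaces the address list (copied to avoid aliasing).
func WithAddresses(addrs ...string) Option {
	return func(c *config) { c.Addresses = append([]string(nil), addrs...) }
}

// WithMaxRetries sets the retry limit.
func WithMaxRetries(n int) Option {
	return func(c *config) { c.MaxRetries = n }
}

// newConfig starts from hypothetical defaults and applies options in order.
func newConfig(opts ...Option) config {
	c := config{Addresses: []string{"http://localhost:9200"}, MaxRetries: 3}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(
		WithAddresses("http://es1:9200", "http://es2:9200"),
		WithMaxRetries(5),
	))
}
```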

Compatible Services

The Elasticsearch driver works with any Elasticsearch-compatible service:

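  • Elasticsearch 7.x, 8.x
  • OpenSearch -- the open-source fork of Elasticsearch 7.10, self-hosted
  • Amazon OpenSearch Service -- AWS-managed OpenSearch clusters
  • Elastic Cloud -- managed Elasticsearch from Elastic

The official Go Elasticsearch client (7.14 and later) checks that the server identifies itself as Elasticsearch, so confirm OpenSearch compatibility against the client version this driver uses.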
