Elasticsearch
Elasticsearch driver with native JSON query DSL, aggregations, bulk operations, and scroll support.
The Elasticsearch driver provides native JSON query DSL support built on the official Go client. Unlike the SQL-based drivers, this driver uses Elasticsearch-native operations (Search, Index, Update, Delete, Bulk, Aggregate) and implements grove.GroveDriver.
Installation
go get github.com/xraph/grove
go get github.com/xraph/grove/drivers/esdriver
Connection
import (
"context"
"log"
"github.com/xraph/grove"
"github.com/xraph/grove/drivers/esdriver"
)
ctx := context.Background()
// 1. Create an unconnected driver
es := esdriver.New()
// 2. Open a connection
err := es.Open(ctx, "http://localhost:9200")
if err != nil {
log.Fatal(err)
}
// 3. Wrap in a grove.DB handle
db, err := grove.Open(es)
if err != nil {
log.Fatal(err)
}
defer db.Close()
Multi-Node Cluster
Pass multiple addresses separated by commas:
err := es.Open(ctx, "http://es1:9200,http://es2:9200,http://es3:9200")
Or use the WithAddresses option:
err := es.Open(ctx, "",
esdriver.WithAddresses("http://es1:9200", "http://es2:9200"),
)
Elastic Cloud
Connect using a Cloud ID:
err := es.Open(ctx, "",
esdriver.WithCloudID("my-deployment:dXMtZWFzdC0xLm..."),
esdriver.WithAPIKey("base64-api-key"),
)
Authentication
// Each snippet below is an alternative; use the one that matches your cluster.
// Basic auth
err := es.Open(ctx, "http://localhost:9200",
esdriver.WithBasicAuth("elastic", "changeme"),
)
// API key
err := es.Open(ctx, "http://localhost:9200",
esdriver.WithAPIKey("base64-encoded-api-key"),
)
// TLS with CA certificate (PEM bytes; requires the "os" import)
cert, err := os.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
err = es.Open(ctx, "https://localhost:9200",
esdriver.WithCACert(cert),
)
Unwrap Pattern
Many Elasticsearch-specific query builders live on *esdriver.ElasticDB. Use Unwrap to extract the driver from a *grove.DB handle:
es := esdriver.Unwrap(db)
// Now you can use ES-native query builders:
es.GroveSelect(&articles).(*esdriver.SearchQuery).
Match("title", "grove").Scan(ctx)
Accessing the Underlying Client
es := esdriver.Unwrap(db)
client := es.Client() // *elasticsearch.Client
The M Type
The esdriver package provides an M type alias for map[string]any, used for building JSON request bodies:
import "github.com/xraph/grove/drivers/esdriver"
query := esdriver.M{"match": esdriver.M{"title": "grove"}}
Defining Models
Models are plain Go structs with grove struct tags. The index name is derived from the table name in the schema (typically the pluralized, snake-cased struct name).
type Article struct {
grove.BaseModel `grove:"table:articles"`
ID string `grove:"id,pk"`
Title string `grove:"title"`
Body string `grove:"body"`
Author string `grove:"author"`
Tags []string `grove:"tags"`
CreatedAt time.Time `grove:"created_at"`
}
Default Index
Set a default index name to avoid specifying it on every query:
es := esdriver.Unwrap(db)
es.SetDefaultIndex("articles")
Index Management
Manage Elasticsearch indices directly through the driver:
es := esdriver.Unwrap(db)
// Create an index with mappings
err := es.CreateIndex(ctx, "articles", esdriver.M{
"mappings": esdriver.M{
"properties": esdriver.M{
"title": esdriver.M{"type": "text", "analyzer": "standard"},
"body": esdriver.M{"type": "text"},
"author": esdriver.M{"type": "keyword"},
"tags": esdriver.M{"type": "keyword"},
"created_at": esdriver.M{"type": "date"},
},
},
"settings": esdriver.M{
"number_of_shards": 1,
"number_of_replicas": 0,
},
})
// Check if an index exists
exists, err := es.IndexExists(ctx, "articles")
// Update field mappings
err = es.PutMapping(ctx, "articles", esdriver.M{
"properties": esdriver.M{
"category": esdriver.M{"type": "keyword"},
},
})
// Force refresh (make recent writes searchable)
err = es.Refresh(ctx, "articles")
// Delete an index
err = es.DeleteIndex(ctx, "articles")
Search Queries
Create search queries with GroveSelect. The query builder provides convenience methods for common Elasticsearch query types.
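As a rough illustration of what these builders ultimately describe, the sketch below assembles a standard Elasticsearch bool query as nested maps and marshals it to the JSON request body. It uses only the standard library; the local `M` alias mirrors the one described above, and `boolQuery` and `searchBody` are hypothetical helpers for this example, not part of esdriver.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// M mirrors the map[string]any alias described above. It is redeclared
// here only so the sketch compiles without the driver.
type M = map[string]any

// boolQuery (hypothetical helper) assembles the standard Elasticsearch
// bool query shape from must and filter clauses.
func boolQuery(must, filter []M) M {
	return M{"bool": M{"must": must, "filter": filter}}
}

// searchBody (hypothetical helper) wraps a query clause in a search
// request body with a size limit and returns it as JSON.
func searchBody(query M, size int) (string, error) {
	b, err := json.Marshal(M{"query": query, "size": size})
	return string(b), err
}

func main() {
	q := boolQuery(
		[]M{{"match": M{"body": "grove"}}},
		[]M{{"term": M{"author": "alice"}}},
	)
	body, err := searchBody(q, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```

Reading the printed JSON next to the builder calls in the sections below can help when translating a query from the Elasticsearch documentation into M literals.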
Full-Text Search
var articles []Article
es := esdriver.Unwrap(db)
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Match("title", "getting started").
Size(10).
Scan(ctx)
Phrase Matching
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
MatchPhrase("body", "distributed systems").
Scan(ctx)
Term Query (Exact Match)
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Term("author", "alice").
Scan(ctx)
Terms Query (IN-style)
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Terms("tags", "go", "elasticsearch", "grove").
Scan(ctx)
Range Query
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Range("created_at", esdriver.RangeOpts{
GTE: "2024-01-01",
LT: "2025-01-01",
Format: "yyyy-MM-dd",
}).
Scan(ctx)
Bool Query
Combine multiple conditions with a bool query:
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Bool(func(b *esdriver.BoolQuery) {
b.Must(esdriver.M{"match": esdriver.M{"body": "grove"}})
b.Filter(esdriver.M{"term": esdriver.M{"author": "alice"}})
b.MustNot(esdriver.M{"term": esdriver.M{"status": "draft"}})
b.Should(esdriver.M{"match": esdriver.M{"tags": "featured"}})
b.MinimumShouldMatch(1)
}).
Scan(ctx)
Exists Query
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Exists("tags").
Scan(ctx)
Raw Query (Escape Hatch)
Use RawQuery for any query type not covered by convenience methods:
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
RawQuery(esdriver.M{
"fuzzy": esdriver.M{
"title": esdriver.M{
"value": "grov",
"fuzziness": "AUTO",
},
},
}).
Scan(ctx)
Sorting, Pagination, and Source Filtering
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Match("body", "grove").
Sort("created_at", "desc").
Source("title", "author", "created_at").
ExcludeSource("body").
From(20).
Size(10).
Scan(ctx)
Search After (Keyset Pagination)
For efficient deep pagination, use SearchAfter instead of From:
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Match("body", "grove").
Sort("created_at", "desc").
SortBy(esdriver.M{"_id": "asc"}).
Size(10).
SearchAfter("2024-06-15T10:00:00Z", "abc123").
Scan(ctx)
Highlighting
result, err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Match("body", "grove").
Highlight(esdriver.M{
"fields": esdriver.M{
"body": esdriver.M{
"fragment_size": 150,
"number_of_fragments": 3,
},
},
}).
ScanHits(ctx)
Count
count, err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Term("author", "alice").
Count(ctx)
Override Index
err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Index("articles-2024").
Match("body", "grove").
Scan(ctx)
Insert
Create insert queries with GroveInsert. The driver auto-detects single vs bulk insert based on whether the model is a struct pointer or a slice pointer.
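One common way to tell a struct pointer from a slice pointer in Go is reflection. The sketch below shows that technique with the standard library only; `isBulk` and the local `article` type are hypothetical names for this example, and the driver's actual detection logic may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// article is a stand-in model used only by this sketch.
type article struct {
	Title string
}

// isBulk (hypothetical helper) reports whether model is a pointer to a
// slice (bulk insert) rather than a pointer to a struct (single insert).
func isBulk(model any) (bool, error) {
	t := reflect.TypeOf(model)
	if t == nil || t.Kind() != reflect.Ptr {
		return false, fmt.Errorf("model must be a non-nil pointer, got %T", model)
	}
	switch t.Elem().Kind() {
	case reflect.Slice:
		return true, nil
	case reflect.Struct:
		return false, nil
	default:
		return false, fmt.Errorf("unsupported model type %T", model)
	}
}

func main() {
	one := article{Title: "Post 1"}
	many := []article{{Title: "Post 1"}, {Title: "Post 2"}}
	for _, m := range []any{&one, &many} {
		bulk, err := isBulk(m)
		fmt.Println(bulk, err)
	}
}
```

Passing a value instead of a pointer is rejected in this sketch, which matches the examples below where the model is always passed by address.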
Insert a Single Document
article := Article{
ID: "article-1",
Title: "Getting Started with Grove",
Body: "...",
Author: "alice",
Tags: []string{"grove", "go"},
CreatedAt: time.Now(),
}
result, err := es.GroveInsert(&article).(*esdriver.InsertQuery).
Exec(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(result.DocumentID()) // "article-1"
fmt.Println(result.Action()) // "created"
Bulk Insert
articles := []Article{
{Title: "Post 1", Author: "alice"},
{Title: "Post 2", Author: "bob"},
}
result, err := es.GroveInsert(&articles).(*esdriver.InsertQuery).
Exec(ctx)
Insert Options
result, err := es.GroveInsert(&article).(*esdriver.InsertQuery).
DocumentID("custom-id").
Routing("tenant-1").
Pipeline("my-ingest-pipeline").
Refresh("wait_for").
Index("articles-2024").
Exec(ctx)
Update
Create update queries with GroveUpdate.
Partial Update by ID
result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
DocumentID("article-1").
Set("title", "Updated Title").
Set("author", "bob").
Exec(ctx)
fmt.Println(result.RowsAffected()) // 1
fmt.Println(result.Action()) // "updated"
Set Entire Partial Document
result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
DocumentID("article-1").
SetDoc(esdriver.M{
"title": "New Title",
"author": "carol",
"tags": []string{"updated"},
}).
Exec(ctx)
Scripted Update
result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
DocumentID("article-1").
SetScript(esdriver.Script{
Source: "ctx._source.views += params.count",
Lang: "painless",
Params: esdriver.M{"count": 1},
}).
Exec(ctx)
Upsert
Insert the document if it does not exist:
result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
DocumentID("article-1").
Set("title", "New or Updated").
Upsert().
Exec(ctx)
Update by Query
Update all matching documents:
result, err := es.GroveUpdate(&Article{}).(*esdriver.UpdateQuery).
Filter(esdriver.M{"term": esdriver.M{"author": "alice"}}).
SetScript(esdriver.Script{
Source: "ctx._source.author = params.newAuthor",
Lang: "painless",
Params: esdriver.M{"newAuthor": "alice-v2"},
}).
Many().
Exec(ctx)
Delete
Create delete queries with GroveDelete.
Delete a Single Document
result, err := es.GroveDelete(&Article{}).(*esdriver.DeleteQuery).
DocumentID("article-1").
Exec(ctx)
fmt.Println(result.RowsAffected()) // 1
Delete by Query
Delete all matching documents:
result, err := es.GroveDelete(&Article{}).(*esdriver.DeleteQuery).
Filter(esdriver.M{"term": esdriver.M{"status": "archived"}}).
Many().
Exec(ctx)
Bulk Operations
Use NewBulk() for mixed bulk operations (index, create, update, delete) in a single request:
es := esdriver.Unwrap(db)
result, err := es.NewBulk().
Index("articles", "id-1", esdriver.M{"title": "Post 1"}).
Index("articles", "id-2", esdriver.M{"title": "Post 2"}).
Create("articles", "id-3", esdriver.M{"title": "Post 3"}).
Update("articles", "id-1", esdriver.M{"doc": esdriver.M{"title": "Updated Post 1"}}).
Delete("articles", "id-2").
Refresh("wait_for").
Exec(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(result.Errors) // true if any action failed
fmt.Println(len(result.Items)) // number of actions
Each BulkItem in the result contains the outcome for that action:
for _, item := range result.Items {
if item.Index != nil {
fmt.Printf("Indexed %s: status %d\n", item.Index.ID, item.Index.Status)
}
}
Aggregation
Build Elasticsearch aggregations with the fluent AggregateQuery builder:
Pre-Built Aggregations
es := esdriver.Unwrap(db)
agg := es.NewAggregate(&Article{})
var results any
err := agg.
Terms("by_author", "author").
Avg("avg_views", "views").
DateHistogram("by_month", "created_at", "month").
Query(esdriver.M{"term": esdriver.M{"status": "published"}}).
Scan(ctx, &results)
Available Aggregations
| Method | Description |
|---|---|
| Terms(name, field) | Terms aggregation (group by field) |
| DateHistogram(name, field, interval) | Date histogram aggregation |
| Avg(name, field) | Average metric |
| Sum(name, field) | Sum metric |
| Min(name, field) | Minimum metric |
| Max(name, field) | Maximum metric |
| Cardinality(name, field) | Approximate distinct count |
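Scanning into an untyped value works, but a typed view of the buckets is often easier to consume. The sketch below decodes the standard Elasticsearch terms-aggregation response shape with encoding/json. It assumes you have the raw search response as JSON; whether the driver's Scan hands back the whole response or only the aggregations object is not stated here, and `termsBucket` and `decodeTerms` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// termsBucket is one bucket of a terms aggregation on a keyword field.
type termsBucket struct {
	Key      string `json:"key"`
	DocCount int64  `json:"doc_count"`
}

// decodeTerms (hypothetical helper) extracts the buckets of the named
// terms aggregation from a raw Elasticsearch search response.
func decodeTerms(raw []byte, name string) ([]termsBucket, error) {
	var resp struct {
		Aggregations map[string]struct {
			Buckets []termsBucket `json:"buckets"`
		} `json:"aggregations"`
	}
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, err
	}
	agg, ok := resp.Aggregations[name]
	if !ok {
		return nil, fmt.Errorf("aggregation %q not found", name)
	}
	return agg.Buckets, nil
}

func main() {
	// Sample response trimmed to the fields this sketch reads.
	raw := []byte(`{"aggregations":{"by_author":{"buckets":[
		{"key":"alice","doc_count":3},{"key":"bob","doc_count":1}]}}}`)
	buckets, err := decodeTerms(raw, "by_author")
	if err != nil {
		panic(err)
	}
	for _, b := range buckets {
		fmt.Println(b.Key, b.DocCount)
	}
}
```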
Sub-Aggregations
err := es.NewAggregate(&Article{}).
Terms("by_author", "author").
SubAgg("by_author", esdriver.M{
"avg_views": esdriver.M{"avg": esdriver.M{"field": "views"}},
}).
Scan(ctx, &results)
Raw Aggregations
For full control, use RawAggs:
result, err := es.NewAggregate(&Article{}).
RawAggs(esdriver.M{
"popular_tags": esdriver.M{
"terms": esdriver.M{
"field": "tags",
"size": 20,
},
},
}).
ScanRaw(ctx)
Scroll (Deep Pagination)
For iterating over large result sets, use scroll:
es := esdriver.Unwrap(db)
cursor, err := es.GroveSelect(&articles).(*esdriver.SearchQuery).
Match("body", "grove").
Size(100).
Scroll(ctx, "5m")
if err != nil {
log.Fatal(err)
}
defer cursor.Close() // clears the scroll context on the server
for cursor.Next() {
var article Article
if err := cursor.Decode(&article); err != nil {
log.Fatal(err)
}
fmt.Println(article.Title)
}
if err := cursor.Err(); err != nil {
log.Fatal(err)
}
The cursor automatically fetches the next batch when the current batch is exhausted. Call Close() when done to release server-side resources.
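The batch-refill behaviour can be pictured with a small, driver-independent iterator. This is a minimal sketch under stated assumptions: `batchCursor`, `fetchFunc`, and `pagesOf` are hypothetical names, the fetch function stands in for a scroll request to the server, and the real cursor's internals are not documented here.

```go
package main

import "fmt"

// fetchFunc returns the next batch of documents. An empty batch means the
// result set is exhausted. It stands in for a scroll request.
type fetchFunc func() ([]string, error)

// batchCursor yields documents one at a time and refills from fetch
// whenever the current batch has been consumed.
type batchCursor struct {
	fetch fetchFunc
	batch []string
	pos   int // index of the next document to hand out
	err   error
	done  bool
}

// Next advances to the next document, fetching a new batch if needed.
func (c *batchCursor) Next() bool {
	if c.done {
		return false
	}
	if c.pos >= len(c.batch) {
		batch, err := c.fetch()
		if err != nil || len(batch) == 0 {
			c.err, c.done = err, true
			return false
		}
		c.batch, c.pos = batch, 0
	}
	c.pos++
	return true
}

// Current returns the document selected by the last successful Next.
func (c *batchCursor) Current() string { return c.batch[c.pos-1] }

// Err returns the first fetch error, if any.
func (c *batchCursor) Err() error { return c.err }

// pagesOf serves in-memory pages, one per call, to simulate the server.
func pagesOf(data [][]string) fetchFunc {
	i := 0
	return func() ([]string, error) {
		if i >= len(data) {
			return nil, nil
		}
		page := data[i]
		i++
		return page, nil
	}
}

func main() {
	c := &batchCursor{fetch: pagesOf([][]string{{"a", "b"}, {"c"}})}
	for c.Next() {
		fmt.Println(c.Current())
	}
	if err := c.Err(); err != nil {
		fmt.Println("scroll failed:", err)
	}
}
```

The caller only ever sees a flat stream of documents, which is why the loop in the example above does not need to know the batch size.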
Transactions
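Elasticsearch does not support multi-document ACID transactions. Calling grove.DB.BeginTx() returns grove.ErrNotSupported.
Use NewBulk() to batch multiple writes into a single request. A bulk request is not atomic: each action succeeds or fails independently, so check result.Errors and the per-item results after Exec.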
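Because the Elasticsearch Bulk API reports success or failure per action, callers usually need to find the actions that failed. The sketch below parses a raw Bulk API response with the standard library and collects the failed document IDs. `bulkResponse`, `bulkItem`, and `failedIDs` are hypothetical names for this example and do not describe the driver's own result types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkItem is the per-action result inside a Bulk API response.
type bulkItem struct {
	ID     string `json:"_id"`
	Status int    `json:"status"`
	Error  *struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error"`
}

// bulkResponse is the top-level Bulk API response. Each item is an object
// with a single key naming the action: index, create, update, or delete.
type bulkResponse struct {
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"`
}

// failedIDs (hypothetical helper) returns the _id of every action that
// reported an error, in response order.
func failedIDs(raw []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, err
	}
	if !resp.Errors {
		return nil, nil
	}
	var ids []string
	for _, item := range resp.Items {
		for _, res := range item {
			if res.Error != nil {
				ids = append(ids, res.ID)
			}
		}
	}
	return ids, nil
}

func main() {
	// Sample response: the index action succeeded, the create action
	// hit a version conflict because the document already existed.
	raw := []byte(`{"errors":true,"items":[
		{"index":{"_id":"id-1","status":201}},
		{"create":{"_id":"id-3","status":409,"error":{
			"type":"version_conflict_engine_exception",
			"reason":"document already exists"}}}]}`)
	ids, err := failedIDs(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids)
}
```

A caller that needs all-or-nothing behaviour has to build it on top, for example by retrying or compensating for the IDs returned here.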
Result Type
Write operations return *EsResult with the following methods:
| Method | Description |
|---|---|
| RowsAffected() | Number of documents affected |
| DocumentID() | The _id of the document |
| Version() | Document version after operation |
| Action() | Result action: "created", "updated", "deleted", "noop" |
LastInsertId() returns ErrLastInsertIDNotSupported. Use DocumentID() instead.
Options
Configure the driver when calling Open:
| Option | Description |
|---|---|
| esdriver.WithBasicAuth(user, pass) | HTTP basic authentication |
| esdriver.WithAPIKey(key) | API key authentication |
| esdriver.WithCloudID(id) | Elastic Cloud deployment ID |
| esdriver.WithCACert(cert) | TLS CA certificate (PEM bytes) |
| esdriver.WithRefresh(policy) | Default refresh policy: "true", "false", "wait_for" |
| esdriver.WithMaxRetries(n) | Maximum retry attempts |
| esdriver.WithTransport(t) | Custom http.RoundTripper |
| esdriver.WithAddresses(addrs...) | Override parsed addresses |
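Options of this shape typically follow Go's functional options pattern. The sketch below illustrates that pattern, including how an address option could override addresses parsed from a comma-separated connection string. Everything in it is hypothetical (`config`, `option`, `withAddresses`, `withMaxRetries`, `resolve`, and the default retry count of 3); it is not the driver's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// config holds the settings that the options in this sketch can change.
type config struct {
	addresses  []string
	maxRetries int
}

// option mutates a config; each With-style function returns one.
type option func(*config)

// withAddresses replaces any addresses parsed from the connection string.
func withAddresses(addrs ...string) option {
	return func(c *config) { c.addresses = addrs }
}

// withMaxRetries sets the retry limit.
func withMaxRetries(n int) option {
	return func(c *config) { c.maxRetries = n }
}

// resolve parses a comma-separated address list, then applies the options
// in order so that later options win over parsed values.
func resolve(dsn string, opts ...option) config {
	cfg := config{maxRetries: 3} // arbitrary default chosen for this sketch
	for _, addr := range strings.Split(dsn, ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			cfg.addresses = append(cfg.addresses, addr)
		}
	}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(resolve("http://es1:9200,http://es2:9200"))
	fmt.Println(resolve("", withAddresses("http://es1:9200"), withMaxRetries(5)))
}
```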
Compatible Services
The Elasticsearch driver works with any Elasticsearch-compatible service:
- Elasticsearch 7.x, 8.x
- OpenSearch -- the open-source fork of Elasticsearch, self-hosted or managed
- Amazon OpenSearch Service -- fully managed OpenSearch on AWS
- Elastic Cloud -- managed Elasticsearch from Elastic