Presence & Awareness
Ephemeral real-time presence for collaborative features — typing indicators, cursor tracking, active users, and user metadata.
Grove CRDT includes an optional presence system for tracking ephemeral, real-time state — who's online, what they're doing, where their cursor is. Unlike persistent CRDT data (LWW, Counter, Set), presence is in-memory only, automatically expires via TTL, and is designed for awareness features in collaborative applications.
Overview
Presence solves "who else is here?" questions:
- Active users — show avatars of people viewing a document
- Cursor tracking — render other users' cursor positions on a canvas
- Typing indicators — show "Alice is typing..." in a chat
- User metadata — display names, roles, or status alongside edits
Presence is topic-scoped. A topic is any string — typically "table:pk" for per-document presence (e.g., "documents:doc-1"), or an arbitrary name for rooms/channels (e.g., "lobby").
Architecture
Client A Server (Go) Client B
──────── ─────────── ────────
updatePresence() PresenceManager
─── POST /sync/presence ──→ (in-memory map + TTL)
──── SSE "presence" event ────→ stream handler
applyEvent()
leavePresence()
─── POST /sync/presence ──→ remove entry
(data: null) ──── SSE "leave" event ───────→ remove from store
[disconnect] RemoveNode(nodeID)
──── SSE "leave" events ──────→ cleanupFlow:
- Client sends presence updates via
POST /sync/presence - Server stores in-memory with TTL (default 30s), keyed by
topic + nodeID - Server broadcasts presence events to all SSE-connected clients
- Clients receive
"presence"SSE events and update their localPresenceManager - On SSE disconnect or TTL expiry, the server removes the entry and broadcasts a
"leave"event
Server Setup (Go)
Forge Integration
Enable presence on the SyncController with two options:
app.Use(groveext.New(
groveext.WithDriver(pgdb),
groveext.WithCRDT(plugin, hook.Scope{Tables: []string{"documents"}}),
groveext.WithSyncController(
crdt.WithStreamPollInterval(2 * time.Second),
crdt.WithStreamKeepAlive(30 * time.Second),
crdt.WithPresenceEnabled(true), // Enable presence
crdt.WithPresenceTTL(30 * time.Second), // Entry expiry (default: 30s)
),
))This auto-registers two additional routes alongside the existing sync endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /sync/presence | Update or leave a topic |
| GET | /sync/presence?topic=... | Get current presence snapshot |
The SSE stream at GET /sync/stream also broadcasts "presence" events when enabled.
Controller Options
| Option | Default | Description |
|---|---|---|
WithPresenceEnabled(bool) | false | Enable the presence subsystem |
WithPresenceTTL(duration) | 30s | Time before an idle entry expires |
When presence is disabled (the default), the routes are not registered and there is zero overhead.
Standalone Server
For non-Forge setups, the NewHTTPHandler also registers presence routes when enabled:
ctrl := crdt.NewSyncController(plugin,
crdt.WithPresenceEnabled(true),
crdt.WithPresenceTTL(30 * time.Second),
)
defer ctrl.Close()
handler := crdt.NewHTTPHandler(ctrl)
// Registers: POST /pull, POST /push, POST /presence, GET /presence
http.Handle("/sync/", handler)HTTP API
Update Presence
POST /sync/presence
{
"node_id": "browser-abc",
"topic": "documents:doc-1",
"data": {
"name": "Alice",
"cursor": { "x": 120, "y": 340 },
"isTyping": false
}
}Response: the resulting PresenceEvent (type "join" for new entries, "update" for existing).
{
"type": "join",
"node_id": "browser-abc",
"topic": "documents:doc-1",
"data": { "name": "Alice", "cursor": { "x": 120, "y": 340 }, "isTyping": false }
}Leave a Topic
Send a presence update with data: null:
{
"node_id": "browser-abc",
"topic": "documents:doc-1",
"data": null
}Get Presence Snapshot
GET /sync/presence?topic=documents:doc-1
{
"topic": "documents:doc-1",
"states": [
{
"node_id": "browser-abc",
"topic": "documents:doc-1",
"data": { "name": "Alice", "cursor": { "x": 120, "y": 340 } },
"updated_at": "2025-01-15T10:30:00Z",
"expires_at": "2025-01-15T10:30:30Z"
}
]
}SSE Presence Events
When presence is enabled, the SSE stream includes "presence" events alongside "change" and "changes":
event: presence
data: {"type":"join","node_id":"browser-abc","topic":"documents:doc-1","data":{"name":"Alice"}}
event: presence
data: {"type":"update","node_id":"browser-abc","topic":"documents:doc-1","data":{"name":"Alice","isTyping":true}}
event: presence
data: {"type":"leave","node_id":"browser-abc","topic":"documents:doc-1"}Event Types
| Type | When |
|---|---|
join | A new node joins a topic for the first time |
update | An existing node updates its presence data |
leave | A node explicitly leaves, disconnects, or its entry expires |
Automatic Disconnect Cleanup
When a client's SSE connection drops, the server calls RemoveNode(nodeID) to clean up all presence entries for that node and broadcast "leave" events. This requires the node_id query parameter on the stream URL:
GET /sync/stream?tables=documents&node_id=browser-abcTTL & Heartbeats
Presence entries expire after the configured TTL (default 30 seconds). To keep presence alive, clients send periodic heartbeat updates — re-POSTing the same data at a regular interval.
Server-side:
- The
PresenceManagerruns a background cleanup goroutine atTTL / 2intervals - Expired entries are removed and
"leave"events are broadcast automatically
Client-side:
- The TypeScript client starts a heartbeat timer when
updatePresence()is called - Default heartbeat interval: 10 seconds (configurable via
PresenceConfig.heartbeatInterval) - Heartbeats stop when
leavePresence()is called or the component unmounts
TypeScript Client
CRDTClient Presence Methods
const client = new CRDTClient({
baseURL: "/api/sync",
nodeID: "browser-1",
tables: ["documents"],
presence: { heartbeatInterval: 10000 }, // Optional: defaults to 10s
});
// Update your presence for a topic (starts heartbeat automatically).
await client.updatePresence("documents:doc-1", {
name: "Alice",
cursor: { x: 120, y: 340 },
});
// Leave a topic (stops heartbeat, notifies server).
await client.leavePresence("documents:doc-1");
// Get all current presence for a topic from the server.
const states = await client.getPresence("documents:doc-1");
// Leave all topics (call on cleanup/unmount).
await client.leaveAllPresence();PresenceManager (Local Store)
The CRDTClient exposes a presence field — a read-only local store of remote peers' presence state, updated automatically via SSE events:
// Access the local presence manager.
const manager = client.presence;
// Get all peers on a topic (excludes your own node).
const peers = manager.getPresence<{ name: string }>("documents:doc-1");
// Get a specific peer's presence.
const alice = manager.getPeer("documents:doc-1", "browser-alice");
// Subscribe to changes on a topic (useSyncExternalStore compatible).
const unsub = manager.subscribe("documents:doc-1", () => {
console.log("Presence changed:", manager.getPresence("documents:doc-1"));
});
// Subscribe to all presence changes across all topics.
const unsub = manager.subscribeAll(() => {
console.log("Any presence changed");
});React Hooks
usePresence
Subscribe to presence for any topic:
import { usePresence } from "@grove-js/crdt/react";
interface UserPresence {
name: string;
cursor: { x: number; y: number } | null;
isTyping: boolean;
}
function CollaborativeEditor({ roomId }: { roomId: string }) {
const { others, updateMyPresence, leave } = usePresence<UserPresence>(roomId);
return (
<div>
{/* Show who else is here */}
{others.map((peer) => (
<div key={peer.node_id}>
{peer.data.name} {peer.data.isTyping ? "is typing..." : ""}
</div>
))}
<textarea
onKeyDown={() => updateMyPresence({ isTyping: true })}
onBlur={() => updateMyPresence({ isTyping: false, cursor: null })}
/>
</div>
);
}Returns:
| Field | Type | Description |
|---|---|---|
others | PresenceState<T>[] | All other peers' presence for this topic |
updateMyPresence | (data: Partial<T>) => void | Update your own presence |
leave | () => void | Leave the topic |
The hook automatically leaves the topic when the component unmounts.
useDocumentPresence
Convenience wrapper that constructs the topic as "table:pk":
import { useDocumentPresence } from "@grove-js/crdt/react";
function DocumentEditor({ docId }: { docId: string }) {
const { others, updateMyPresence } = useDocumentPresence<{
name: string;
color: string;
}>("documents", docId);
useEffect(() => {
updateMyPresence({ name: "Alice", color: "#3b82f6" });
}, []);
return (
<div>
<div className="active-users">
{others.map((p) => (
<span key={p.node_id} style={{ color: p.data.color }}>
{p.data.name}
</span>
))}
</div>
{/* ... editor content ... */}
</div>
);
}Topic Naming Conventions
| Pattern | Example | Use Case |
|---|---|---|
"table:pk" | "documents:doc-1" | Per-document presence |
"room-name" | "lobby" | Chat rooms, lobbies |
"table:pk:section" | "docs:doc-1:comments" | Sub-sections of a document |
The useDocumentPresence(table, pk) hook constructs "table:pk" automatically.
Go API Reference
Types
type PresenceState struct {
NodeID string `json:"node_id"`
Topic string `json:"topic"`
Data json.RawMessage `json:"data"`
UpdatedAt time.Time `json:"updated_at"`
ExpiresAt time.Time `json:"expires_at"`
}
type PresenceUpdate struct {
NodeID string `json:"node_id"`
Topic string `json:"topic"`
Data json.RawMessage `json:"data"`
}
type PresenceEvent struct {
Type string `json:"type"` // "join", "update", "leave"
NodeID string `json:"node_id"`
Topic string `json:"topic"`
Data json.RawMessage `json:"data,omitempty"`
}
type PresenceSnapshot struct {
Topic string `json:"topic"`
States []PresenceState `json:"states"`
}Constants
| Constant | Value | Description |
|---|---|---|
PresenceJoin | "join" | New node joined a topic |
PresenceUpdateEvt | "update" | Existing node updated its data |
PresenceLeave | "leave" | Node left or entry expired |
PresenceManager
| Method | Signature | Description |
|---|---|---|
NewPresenceManager | (ttl, onChange, logger) *PresenceManager | Create with TTL, callback, and logger; starts cleanup goroutine |
Update | (update PresenceUpdate) PresenceEvent | Upsert presence, returns join/update event |
Remove | (topic, nodeID string) *PresenceEvent | Explicitly remove an entry |
RemoveNode | (nodeID string) []PresenceEvent | Remove all entries for a node (on disconnect) |
Get | (topic string) []PresenceState | Get all active presence for a topic |
GetTopicsForNode | (nodeID string) []string | Get all topics a node is present in |
Close | () | Stop the cleanup goroutine |
SyncController Presence Methods
| Method | Signature | Description |
|---|---|---|
HandlePresenceUpdate | (ctx, *PresenceUpdate) (*PresenceEvent, error) | Process a presence update (validates, delegates to manager) |
HandleGetPresence | (ctx, topic string) (*PresenceSnapshot, error) | Get presence snapshot for a topic |
Presence | () *PresenceManager | Access the underlying manager (nil if disabled) |
PresenceChannel | () <-chan PresenceEvent | Broadcast channel for stream integration |
Close | () | Clean up controller resources |
Helper
| Function | Signature | Description |
|---|---|---|
MarshalPresenceEvent | (event PresenceEvent) ([]byte, error) | Serialize event to JSON for SSE |