LangGraph checkpointer¶
In LangGraph, a checkpointer is what persists a graph's state after every step, so a
conversation (a thread) can be paused, resumed, inspected, or rewound. LangGraph ships
InMemorySaver (fast but not persistent) and SqliteSaver (persistent but slower -
every step commits to disk).
SwarmStateSaver is a drop-in replacement for either: same
BaseCheckpointSaver interface (put, put_writes, get_tuple, list, plus the async
variants), but backed by a swarmstate Store with a Rust core. You get
faster checkpoints than SQLite (see the benchmarks), and - because the
state lives in a Store - the ability to snapshot or roll back every thread at once.
Point it at a persistent backend (DiskStore, RedisStore or
PostgresStore) and those checkpoints survive restarts, too.
One-line swap¶
from swarmstate.integrations.langgraph import SwarmStateSaver
graph = builder.compile(checkpointer=SwarmStateSaver()) # was SqliteSaver(...)
Everything else - invoke, stream, get_state, get_state_history, resuming a
thread - works unchanged.
config = {"configurable": {"thread_id": "user-42"}}
graph.invoke({"messages": [("user", "hi")]}, config)
graph.get_state(config) # reads the latest persisted checkpoint for this thread
Share one store across graphs¶
Pass a Store explicitly to share checkpoints across savers/graphs (or create separate
Stores to keep several independent checkpoint sets):
import swarmstate as ss
from swarmstate.integrations.langgraph import SwarmStateSaver
store = ss.Store()
saver_a = SwarmStateSaver(store)
saver_b = SwarmStateSaver(store) # same underlying checkpoint DB
Snapshot / roll back the whole checkpoint DB¶
Because checkpoints live in a Store, you get cheap, atomic snapshots of every
thread at once - useful for tests, "what-if" branches, or recovery:
saver = SwarmStateSaver()
graph = builder.compile(checkpointer=saver)
snap = saver.store.snapshot() # O(1) snapshot of all threads
# ... run more turns across many threads ...
saver.store.restore(snap) # roll everything back
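To make the snapshot/restore semantics concrete, here is a toy, pure-Python stand-in for a store. It is not swarmstate's implementation: ToyStore, put and get are hypothetical, and only the snapshot() / restore() names mirror the calls above. It sketches one common way a snapshot can be O(1): the state is never mutated in place, so taking a snapshot only saves a reference.

```python
from types import MappingProxyType


class ToyStore:
    """Hypothetical stand-in for a Store; illustrates snapshot/restore semantics only."""

    def __init__(self):
        # The current state is a read-only mapping that is replaced, never mutated.
        self._state = MappingProxyType({})

    def put(self, thread_id, checkpoint):
        # Copy-on-write: build a new mapping so earlier snapshots stay valid.
        # (The copy is O(n) in this toy; persistent data structures avoid that.)
        self._state = MappingProxyType({**self._state, thread_id: checkpoint})

    def get(self, thread_id):
        return self._state.get(thread_id)

    def snapshot(self):
        # O(1): hand back a reference to the current read-only state.
        return self._state

    def restore(self, snap):
        # O(1): point the store back at the saved state, for every thread at once.
        self._state = snap


store = ToyStore()
store.put("user-1", {"step": 1})
store.put("user-2", {"step": 1})

snap = store.snapshot()

# Later turns update one thread and create a new one.
store.put("user-1", {"step": 2})
store.put("user-3", {"step": 1})

# Rolling back undoes both changes together.
store.restore(snap)
```

In a test suite the same pattern would typically wrap each test: snapshot in setup, restore in teardown, so every test starts from identical checkpoint state.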
Serialization¶
By default the saver uses LangGraph's JsonPlusSerializer, so it handles the same
objects LangGraph does. To use a different serializer, pass it via serde=... when
constructing the saver.
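As a rough sketch of what a custom serializer could look like: the class below follows the dumps_typed / loads_typed shape of LangGraph's serializer protocol, using only the standard library. CompressedJsonSerde and the "json+zlib" type tag are hypothetical names, and it is an assumption that the saver needs no methods beyond these two. Plain JSON cannot encode LangChain message objects, so a real serializer would more likely wrap JsonPlusSerializer than replace it.

```python
import json
import zlib
from typing import Any


class CompressedJsonSerde:
    """Hypothetical serializer: compact JSON compressed with zlib."""

    TYPE_TAG = "json+zlib"

    def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
        # Return a (type tag, payload) pair so the reader knows how to decode it.
        payload = json.dumps(obj, separators=(",", ":")).encode("utf-8")
        return self.TYPE_TAG, zlib.compress(payload)

    def loads_typed(self, data: tuple[str, bytes]) -> Any:
        type_tag, payload = data
        if type_tag != self.TYPE_TAG:
            raise ValueError(f"unexpected type tag: {type_tag!r}")
        return json.loads(zlib.decompress(payload).decode("utf-8"))


serde = CompressedJsonSerde()
encoded = serde.dumps_typed({"messages": ["hi"], "count": 1})
decoded = serde.loads_typed(encoded)

# Assumed usage, based on the serde=... parameter described above:
# saver = SwarmStateSaver(serde=serde)
```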
Incremental storage (opt-in)¶
For long threads with large, mostly-stable channels, pass incremental=True. Each
channel value is then stored once per version (deduplicated) instead of writing the
whole checkpoint every step, which saves storage and serialization work.
Trade-off: get_tuple then does one read per channel to reassemble state, versus a
single read in the default mode. Leave it off unless channels are large and change
rarely.
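The trade-off can be seen in a toy model of per-channel, per-version storage. This is only an illustration of the idea described above, not the saver's actual layout: ToyIncrementalStore, its fields, and the read/write counters are all hypothetical.

```python
import json


class ToyIncrementalStore:
    """Hypothetical model: one blob per (channel, version), shared across steps."""

    def __init__(self):
        self.blobs = {}        # (channel, version) -> serialized value
        self.checkpoints = {}  # step -> {channel: version}
        self.blob_writes = 0
        self.blob_reads = 0

    def put(self, step, channel_values, channel_versions):
        for channel, version in channel_versions.items():
            key = (channel, version)
            if key not in self.blobs:
                # Only channels whose version changed are serialized and written.
                self.blobs[key] = json.dumps(channel_values[channel])
                self.blob_writes += 1
        # The checkpoint itself is just a small map of channel -> version.
        self.checkpoints[step] = dict(channel_versions)

    def get(self, step):
        # Reassembling state costs one blob read per channel.
        state = {}
        for channel, version in self.checkpoints[step].items():
            state[channel] = json.loads(self.blobs[(channel, version)])
            self.blob_reads += 1
        return state


store = ToyIncrementalStore()
docs = ["large, stable document"] * 1000

# Step 1 writes both channels.
store.put(1, {"docs": docs, "counter": 1}, {"docs": 1, "counter": 1})
# Step 2 changes only "counter"; "docs" keeps version 1 and is not rewritten.
store.put(2, {"docs": docs, "counter": 2}, {"docs": 1, "counter": 2})

state = store.get(2)
```

Two steps with two channels produce three blob writes instead of four, and the saving grows with every step in which the large channel is unchanged. The price is that reading a checkpoint back takes one read per channel.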
Durable, shared checkpoints¶
The saver holds checkpoints in whatever Store you give it, so making them persistent is
a backend swap; nothing else changes:
from swarmstate.backends.disk import DiskStore # a SQLite file, no server
from swarmstate.integrations.langgraph import SwarmStateSaver
saver = SwarmStateSaver(DiskStore("checkpoints.db")) # or RedisStore / PostgresStore
graph = builder.compile(checkpointer=saver)
See the Disk, Redis and Postgres guides for the "which backend?" trade-offs.
Observability¶
Pass a metrics sink to measure the latency and outcome of each checkpoint operation, with zero overhead when unused:
from swarmstate.observability import InMemoryMetrics # or OpenTelemetryMetrics
metrics = InMemoryMetrics()
saver = SwarmStateSaver(metrics=metrics)
# ... run the graph ...
metrics.summary() # {"put": {"count": ..., "p50_ms": ...}, "get_tuple": {...}}
Full details in the Observability guide.
Async¶
The async methods (aput, aput_writes, aget_tuple, alist) run the store work on a
worker thread (asyncio.to_thread), so they don't block the event loop; the store
releases the GIL on its hot paths, so that work overlaps with the loop. ainvoke /
astream and async graphs work out of the box.
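The worker-thread pattern described above can be sketched with the standard library alone. Here blocking_put is a hypothetical stand-in for a synchronous store write, and aput is a simplified illustration rather than the saver's real method. The heartbeat task only makes progress if the event loop stays unblocked while the write runs.

```python
import asyncio
import time


def blocking_put(checkpoint):
    """Hypothetical stand-in for a synchronous store write."""
    time.sleep(0.2)  # simulates I/O; sleeping releases the GIL
    return f"stored:{checkpoint}"


async def aput(checkpoint):
    # Run the synchronous call on a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_put, checkpoint)


async def heartbeat(ticks):
    # Records a tick every 50 ms; it cannot run while the loop is blocked.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await aput("cp-1")
    beat.cancel()
    # Wait for the cancellation to finish without raising CancelledError here.
    await asyncio.gather(beat, return_exceptions=True)
    return result, len(ticks)


result, tick_count = asyncio.run(main())
```

If aput called blocking_put directly instead of going through asyncio.to_thread, the heartbeat would record no ticks until the write had finished.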