Implementing Store for external backends¶
The Store interface (put, get, delete, list(prefix), and the optional put_if_absent) can be implemented over any key-value backend. This guide outlines patterns for Redis, SQLite, and other SQL databases such as PostgreSQL.
Built-in Redis and SQLite backends¶
Converge provides RedisStore and SQLiteStore under converge.extensions.storage with atomic put_if_absent, suitable for multi-process deployments.
SQLiteStore (converge.extensions.storage.sqlite_store): Single file path, stdlib only. SQLiteStore(path: str | Path); table (key TEXT PRIMARY KEY, value BLOB); pickle serialization; list(prefix) via LIKE prefix%; put_if_absent via INSERT OR IGNORE.
RedisStore (converge.extensions.storage.redis_store): Requires the store-backends extra from a source install: pip install -e ".[store-backends]" (adds redis>=5.0). RedisStore(redis_url="redis://localhost:6379/0", *, client=None); pickle serialization; list(prefix) via SCAN; put_if_absent via SET NX.
VersionedStore and migration hook¶
A VersionedStore wrapper (converge.extensions.storage.versioned) adds a reserved metadata key, __meta__:schema_version, and an optional migration callback. It can wrap any Store and provides:
get_version() / set_version(version): Read or write the schema version (e.g. integer or string).
check_version(): Returns the current stored version; if you passed on_version_change(old_version, new_version) to the constructor, the callback is invoked once when the stored version differs from the last seen value (e.g. after an upgrade). Use this to run migrations or clear caches.
Example: wrap a store, set initial version, and register a callback for upgrades:
from converge.extensions.storage.versioned import VersionedStore, SCHEMA_VERSION_KEY
from converge.extensions.storage.memory import MemoryStore

store = MemoryStore()

def on_change(old, new):
    print(f"Version changed from {old} to {new}")  # e.g. run migrations

versioned = VersionedStore(store, on_version_change=on_change)
versioned.set_version(1)
assert versioned.get_version() == 1

versioned.set_version(2)
versioned.check_version()  # calls on_change(1, 2)
Interface contract¶
put(key, value): Store or overwrite. The value must be serializable by the backend's serializer (e.g. pickle, JSON, or a custom format).
get(key): Return the value or None if absent.
delete(key): Remove the key; no-op if absent.
list(prefix): Return keys that start with prefix (e.g. "task:" for all task keys).
put_if_absent(key, value): Store only if key is absent; return True if stored, False if key existed. Override for atomicity (e.g. Redis SETNX, SQLite INSERT OR IGNORE).
Capability flags: Backends may expose atomic_put_if_absent and supports_locking booleans to describe operational guarantees to callers.
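The contract above can be illustrated with a minimal in-memory sketch. The class name DictStore is hypothetical and not part of Converge. The flag values are assumptions about how a single-process backend might describe itself, since the exact meaning of supports_locking is not defined in this guide.

```python
from threading import Lock
from typing import Any, Dict, List, Optional


class DictStore:
    """Hypothetical dict-backed store that follows the interface contract."""

    # Assumed flag values: put_if_absent is atomic within one process only,
    # and this sketch offers no locking API to callers.
    atomic_put_if_absent = True
    supports_locking = False

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._lock = Lock()

    def put(self, key: str, value: Any) -> None:
        # Store or overwrite.
        with self._lock:
            self._data[key] = value

    def get(self, key: str) -> Optional[Any]:
        # Return the value, or None if the key is absent.
        with self._lock:
            return self._data.get(key)

    def delete(self, key: str) -> None:
        # No-op if the key is absent.
        with self._lock:
            self._data.pop(key, None)

    def list(self, prefix: str) -> List[str]:
        # Return all keys that start with the prefix, sorted for stable output.
        with self._lock:
            return sorted(k for k in self._data if k.startswith(prefix))

    def put_if_absent(self, key: str, value: Any) -> bool:
        # The check and the write happen under one lock, so two threads
        # cannot both see the key as absent.
        with self._lock:
            if key in self._data:
                return False
            self._data[key] = value
            return True
```

The lock only protects threads inside one process. A multi-process deployment needs the atomicity to come from the backend itself, as in the Redis and SQLite patterns below.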
Redis¶
Use a single key per entry; keys can include colons (e.g. task:uuid). Serialize values with pickle, msgpack, or JSON. list(prefix) maps to KEYS prefix* or SCAN with MATCH prefix*; prefer SCAN in production, because KEYS walks the whole keyspace in a single blocking call. put_if_absent can use SET key value NX for atomicity.
SQLite¶
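Single table: (key TEXT PRIMARY KEY, value BLOB). Serialize values to bytes (pickle or msgpack). list(prefix) is SELECT key FROM store WHERE key LIKE prefix||'%'. Note that LIKE treats % and _ in the prefix as wildcards and is case-insensitive for ASCII by default, so escape the prefix or re-check matches in code if keys can contain those characters or differ only by case. put_if_absent can use INSERT OR IGNORE or a conditional insert.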
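The SQLite pattern can be sketched with the standard-library sqlite3 module. This is an illustrative sketch, not Converge's SQLiteStore; the class name SqliteBackedStore is hypothetical. The list method escapes LIKE wildcards and re-checks the prefix in Python, since LIKE is case-insensitive for ASCII by default.

```python
import pickle
import sqlite3
from typing import Any, List, Optional


class SqliteBackedStore:
    """Hypothetical Store over a single SQLite table."""

    atomic_put_if_absent = True  # INSERT OR IGNORE is a single statement

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS store "
                "(key TEXT PRIMARY KEY, value BLOB)"
            )

    def put(self, key: str, value: Any) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO store (key, value) VALUES (?, ?)",
                (key, pickle.dumps(value)),
            )

    def get(self, key: str) -> Optional[Any]:
        row = self._conn.execute(
            "SELECT value FROM store WHERE key = ?", (key,)
        ).fetchone()
        return None if row is None else pickle.loads(row[0])

    def delete(self, key: str) -> None:
        with self._conn:
            self._conn.execute("DELETE FROM store WHERE key = ?", (key,))

    def list(self, prefix: str) -> List[str]:
        # Escape LIKE wildcards so "%" and "_" in the prefix match literally.
        escaped = (
            prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        rows = self._conn.execute(
            "SELECT key FROM store WHERE key LIKE ? ESCAPE '\\' ORDER BY key",
            (escaped + "%",),
        )
        # Re-check in Python to drop case-insensitive LIKE matches.
        return [r[0] for r in rows if r[0].startswith(prefix)]

    def put_if_absent(self, key: str, value: Any) -> bool:
        with self._conn:
            cur = self._conn.execute(
                "INSERT OR IGNORE INTO store (key, value) VALUES (?, ?)",
                (key, pickle.dumps(value)),
            )
        # One row affected means the insert happened; zero means key existed.
        return cur.rowcount == 1
```

Passing a file path instead of ":memory:" lets several processes share the table. Each process would open its own connection, and SQLite's file locking serializes the writes.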
Database (e.g. PostgreSQL)¶
Same pattern as SQLite: a key-value table, optionally with a version column for optimistic concurrency. put_if_absent can use INSERT ... ON CONFLICT (key) DO NOTHING and check the affected row count (1 if the row was stored, 0 if the key already existed).
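The two statements involved can be sketched as follows. To keep the example runnable without a database server, it uses the standard-library sqlite3 module as a stand-in, which accepts the same ON CONFLICT clause from SQLite 3.24 onward. With a PostgreSQL driver the placeholders would typically be %s rather than ?. The function names and the version column layout are assumptions for illustration.

```python
import sqlite3

# Stand-in connection; a PostgreSQL deployment would use its own driver.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store ("
    "key TEXT PRIMARY KEY, "
    "value BLOB, "
    "version INTEGER NOT NULL DEFAULT 1)"
)


def put_if_absent(conn, key: str, value: bytes) -> bool:
    """Insert only if the key is new; the row count reports what happened."""
    cur = conn.execute(
        "INSERT INTO store (key, value) VALUES (?, ?) "
        "ON CONFLICT (key) DO NOTHING",
        (key, value),
    )
    conn.commit()
    return cur.rowcount == 1


def compare_and_swap(conn, key: str, value: bytes, expected_version: int) -> bool:
    """Optimistic update: succeeds only if nobody bumped the version."""
    cur = conn.execute(
        "UPDATE store SET value = ?, version = version + 1 "
        "WHERE key = ? AND version = ?",
        (value, key, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1
```

A caller that loses the compare-and-swap (a False result) would normally re-read the row, reapply its change to the fresh value, and retry with the new version.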
Serialization¶
PoolManager, TaskManager, and DiscoveryService store Task, Pool, and AgentDescriptor objects. Ensure your serializer handles these types (pickle does; for JSON you would need a custom encoder or store dicts only). Schema or type changes may require a migration or version field.
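For the JSON route, one option is to store plain dicts wrapped in an envelope that records the type and a schema version. The sketch below uses a hypothetical ExampleTask dataclass as a stand-in, because the fields of Converge's Task, Pool, and AgentDescriptor are not described in this guide. The envelope layout and the names SCHEMA, dumps_task, and loads_task are likewise assumptions.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List

SCHEMA = 1  # bump when the stored shape changes


@dataclass
class ExampleTask:
    """Hypothetical stand-in for a stored object; not Converge's Task."""

    id: str
    state: str = "pending"
    tags: List[str] = field(default_factory=list)


def dumps_task(task: ExampleTask) -> bytes:
    # Wrap the dict form in an envelope carrying type and schema version.
    envelope = {"type": "ExampleTask", "schema": SCHEMA, "data": asdict(task)}
    return json.dumps(envelope).encode("utf-8")


def loads_task(raw: bytes) -> ExampleTask:
    envelope = json.loads(raw.decode("utf-8"))
    if envelope.get("type") != "ExampleTask":
        raise ValueError("unexpected type: %r" % envelope.get("type"))
    if envelope.get("schema") != SCHEMA:
        # A real backend might run a migration here instead of failing.
        raise ValueError("unsupported schema: %r" % envelope.get("schema"))
    return ExampleTask(**envelope["data"])
```

Unlike pickle, this keeps stored bytes readable by other tools and makes a schema mismatch an explicit error rather than a silent attribute failure.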