Customization and extensibility¶
The framework is designed so that most components can be replaced or extended for your use case. This page summarizes what is configurable and how to plug in custom behavior.
Runtime¶
Transport: Use any implementation of the Transport interface (
start,stop,send(message),receive()). Built-in: Local, TCP, WebSocket (optional). Pass toAgentRuntime(transport=...).Inbox: Default
Inboxsupportsmaxsizeanddrop_when_full. To customize: passinbox=your_inboxtoAgentRuntime. Your object must implementasync push(message)andpoll(batch_size=10) -> list. Or passinbox_kwargs={"maxsize": 100, "drop_when_full": True}to configure the default Inbox.Scheduler: Default event-driven scheduler can be replaced with
scheduler=your_scheduler. Your object must implementnotify()andasync wait_for_work(timeout) -> bool.Executor: Two options:
executor_factory: Callable
(agent_id, network, task_manager, pool_manager, **kwargs) -> Executor. The runtime calls it each run loop to get the executor. Use for a fully custom executor.executor_kwargs: Dict of kwargs passed to the default
StandardExecutor(e.g.custom_handlers,safety_policy,bidding_protocols,tool_timeout_sec,tool_allowlist). Ignored ifexecutor_factoryis set.
Other runtime options:
pool_manager,task_manager,metrics_collector,discovery_service,agent_descriptor,identity_registry,replay_log,tool_registry,checkpoint_store,checkpoint_interval_sec,health_check,ready_check,receive_timeout_sec,claim_ttl_interval_sec,task_poll_interval_sec,task_refresh_interval_sec,pool_cache_ttl_sec,network,ops_server,runtime_hooks,allow_network_transport_mismatchare all optional and configurable.
Executor and decisions¶
Custom decision types: Define a new
Decisionsubclass and register an async handler withStandardExecutor(..., custom_handlers={MyDecision: my_async_handler}). The handler receives the decision instance; run your logic and return. You can also passexecutor_kwargs={"custom_handlers": {...}}when constructing the runtime.Safety:
safety_policy=(ResourceLimits, ActionPolicy)to restrict decision types and validate task resources.Coordination: Optional
bidding_protocols,negotiation_protocol,delegation_protocol,votes_storefor built-in decision types.
Protocol lifecycle and wiring¶
Bidding, negotiation, delegation, and votes are opt-in. The runtime does not create or inject these by default; you instantiate and pass them when your agents need the corresponding decision types.
When to use
NegotiationProtocol: When agents can emit
Propose,AcceptProposal,RejectProposal(e.g. multi-step agreements). Create one instance and pass it to the executor; sessions are identified bysession_idin decisions.BiddingProtocol: When agents submit bids in auctions. Create one protocol per auction (or a factory) and pass a dict
auction_id -> BiddingProtocolasbidding_protocols.DelegationProtocol: When agents can
DelegateorRevokeDelegation. One shared instance is typical; the protocol tracks delegation bydelegation_id.votes_store: A dict
vote_id -> list of (agent_id, option)so the executor can recordVotedecisions. Use a shared store (e.g. in-memory dict or one backed by your Store) when multiple runtimes participate in the same votes.
Scoping
Per session: Negotiation is session-scoped via
session_id; create oneNegotiationProtocoland use different session IDs per negotiation.Per pool: You can key protocols by pool (e.g.
bidding_protocolskeyed by pool or auction_id that encodes pool) if each pool has its own auctions.Global: A single
negotiation_protocol,delegation_protocol, orvotes_storeshared by all runtimes is common when all agents participate in the same coordination.
Wiring into the runtime
Pass protocol instances via executor_kwargs when constructing the runtime; the runtime builds the default StandardExecutor with these kwargs:
from converge.coordination.bidding import BiddingProtocol
from converge.coordination.negotiation import NegotiationProtocol
from converge.coordination.delegation import DelegationProtocol
negotiation = NegotiationProtocol()
delegation = DelegationProtocol()
votes_store = {} # or a dict-like backed by your Store
bidding_protocols = {"main_auction": BiddingProtocol(...)}
runtime = AgentRuntime(
agent=agent,
transport=transport,
task_manager=task_manager,
pool_manager=pool_manager,
executor_kwargs={
"negotiation_protocol": negotiation,
"delegation_protocol": delegation,
"votes_store": votes_store,
"bidding_protocols": bidding_protocols,
},
)
Agents that emit Propose, Vote, SubmitBid, or Delegate will have those decisions executed only when the corresponding protocol or store is provided; otherwise the executor logs that the decision was ignored.
Other executor options¶
Tools:
tool_registry(ToolRegistry) forInvokeTool; implement the Tool protocol (name,run(params)). Optionaltool_timeout_secandtool_allowlist(set) on StandardExecutor for execution timeout and allowlist. See Security.
Agent¶
Subclass Agent: Override
decide(messages, tasks),on_start,on_stop,on_tick,sign_messageas needed. The runtime only requires an object withid(and optionallycapabilities,topicsfor discovery/scoping).
Policy¶
Admission: Implement
AdmissionPolicy(can_admit(agent_id, context)) and pass tocreate_pool(..., admission_policy=...)or on the pool spec.Trust: Implement
TrustModel(get_trust,update_trust). Pools can usetrust_modelandtrust_threshold;join_poolchecks trust when set.Governance: Subclass
GovernanceModeland implementresolve_dispute(context). Pass tocreate_pool(..., governance_model=...)or call when resolving disputes. Built-in: Democratic, Dictatorial, Bicameral, Veto, Empirical.Safety: Use or extend
ResourceLimits,ActionPolicy,validate_safety; pass assafety_policyto the executor.
Storage and discovery¶
Store: Any implementation of the Store interface (
put,get,delete,list(prefix), and optionally put_if_absent for atomic put-when-absent). Built-in: MemoryStore (atomic put_if_absent), FileStore (put_if_absent not atomic across processes). Used by PoolManager, TaskManager, DiscoveryService, checkpoint. Stored values must be serializable; schema changes may require migration. See Store backends for implementing Store with Redis, SQLite, or a database.Discovery:
DiscoveryService(store=...); you can implement custom discovery by providing a different store or wrapping the service.AgentDescriptorcan carry optionalpublic_keyfor verification.Identity registry: Implement or use
IdentityRegistry(fingerprint → public key) forreceive_verified()on transports.
Observability¶
Metrics: Pass
metrics_collector(e.g.MetricsCollector) to the runtime/executor; implement your own collector withinc,gauge,snapshotif needed.MetricsCollector.format_prometheus()returns Prometheus text exposition format for scrape endpoints.Replay: Pass
replay_log(e.g.ReplayLog) to record inbound/outbound messages (record_inbound,record_outbound;record_messageremains as compatibility alias).Tracing: The runtime uses
trace()from observability; register a SpanExporter viaregister_span_exporter(exporter)soexport(span, duration_sec)is called when each trace context exits. You can forward to OpenTelemetry or logging.Health/readiness: Pass
health_checkandready_checkcallables to the runtime;is_healthy()andis_ready()delegate to them. For HTTP exposure, useRuntimeOpsServer(/health,/ready,/metrics) and pass asops_server.Transport middleware: Use
HookedTransport(base_transport, hooks=[...])and implementMessageHook(pre_send,post_receive, optionalon_error) for reusable policy and filtering.Rate limiting: Use
RateLimiterandRateLimitHookto enforce ingress and egress token-bucket limits.
Extensions¶
LLM provider: Implement
chat(messages, **kwargs) -> str; optionallyachat(messages, **kwargs) -> strfor async runtimes andchat_stream(...) -> AsyncIterator[str]. Used by LLMAgent.Tools: Implement the Tool protocol and register on a
ToolRegistry.
Summary table¶
Component |
How to customize |
|---|---|
Transport |
Implement Transport; pass to runtime |
Inbox |
Pass |
Scheduler |
Pass |
Executor |
Pass |
Agent |
Subclass Agent; override decide, lifecycle |
AdmissionPolicy |
Implement; pass in pool spec |
TrustModel |
Implement; set on pool |
GovernanceModel |
Subclass; pass governance_model in pool spec |
Store |
Implement Store; pass to managers/discovery |
MetricsCollector |
Implement; pass to runtime/executor |
ReplayLog |
Implement record_inbound/record_outbound (record_message supported for compatibility); pass to runtime |
claim_ttl_interval_sec |
Pass to runtime for automatic release_expired_claims |
task_poll_interval_sec |
Pass to runtime for periodic task-poll wake-up |
network injection |
Pass |
ops HTTP helper |
Pass |
transport hooks |
Wrap base transport in |
rate limiting |
Use |
Tool |
Implement Tool protocol; register on ToolRegistry |
LLM provider |
Implement chat (and optionally chat_stream) |
For API details, see the API reference and the docstrings of the classes above.