converge.runtime

Execution loop, scheduler, and executor.

AgentRuntime runs the agent loop: wait for work, poll inbox and task manager, call agent.decide(), execute decisions via StandardExecutor (or fallback when no managers are set). Optional claim_ttl_interval_sec enables automatic release of expired task claims; optional task_poll_interval_sec runs a periodic task poll that notifies the scheduler when pending tasks change so the loop wakes sooner.

Inbox buffers messages (bounded, optional drop-when-full). Scheduler provides event-driven wake-up with timeout.

StandardExecutor handles SendMessage, SubmitTask, ClaimTask, JoinPool, LeavePool, CreatePool, ReportTask; it uses the network and managers injected at construction. Runtime supports optional network injection, ops_server wiring, and runtime_hooks.
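
The loop described above (wait for work, poll, decide, execute) can be pictured with a toy model. This is only a sketch and does not use converge itself: ToyInbox, ToyAgent, run_loop, the decide(messages, tasks) signature, and the ("echo", message) decision shape are all hypothetical stand-ins chosen for illustration.

```python
import asyncio
from typing import Any


class ToyInbox:
    """Stand-in inbox: async push, non-blocking poll (hypothetical)."""

    def __init__(self) -> None:
        self._items: list = []

    async def push(self, message: Any) -> None:
        self._items.append(message)

    def poll(self, batch_size: int = 10) -> list:
        batch, self._items = self._items[:batch_size], self._items[batch_size:]
        return batch


class ToyAgent:
    """Hypothetical agent: every message becomes an ("echo", message) decision."""

    def decide(self, messages: list, tasks: list) -> list:
        return [("echo", m) for m in messages]


async def run_loop(agent, inbox, wake: asyncio.Event, executed: list,
                   max_cycles: int, timeout: float = 0.05) -> None:
    for _ in range(max_cycles):
        # 1. Wait for work: wake on the event, or fall through on timeout.
        try:
            await asyncio.wait_for(wake.wait(), timeout)
        except asyncio.TimeoutError:
            pass
        wake.clear()
        # 2. Poll the inbox (no task manager in this toy model).
        messages = inbox.poll()
        # 3. Ask the agent what to do.
        decisions = agent.decide(messages, [])
        # 4. "Execute" by recording the decisions.
        executed.extend(decisions)


async def demo() -> list:
    inbox, wake, executed = ToyInbox(), asyncio.Event(), []
    await inbox.push("hello")
    wake.set()  # signal that work is available
    await run_loop(ToyAgent(), inbox, wake, executed, max_cycles=2)
    return executed
```

Running asyncio.run(demo()) processes the message on the first cycle and idles through the second one after the timeout.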

class converge.runtime.loop.Inbox(maxsize: int | None = None, *, drop_when_full: bool = False)

Bases: object

Buffers incoming messages for the agent. Supports a bounded queue with configurable behavior when full.

Custom inbox: any object that implements async push(message) and poll(batch_size=10) -> list can be passed to AgentRuntime as inbox=.

async push(message: Any) -> None

poll(batch_size: int = 10) -> list[Any]

Get all currently available messages up to batch_size. Non-blocking.
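
As a sketch of the custom inbox contract (async push(message) plus non-blocking poll(batch_size) -> list), the class below keeps only the newest messages when full. KeepNewestInbox and its drop-oldest policy are hypothetical and are not claimed to match what the built-in Inbox does with drop_when_full.

```python
import asyncio
from collections import deque
from typing import Any


class KeepNewestInbox:
    """Hypothetical custom inbox: when full, the oldest message is discarded."""

    def __init__(self, maxsize: int = 100) -> None:
        # deque(maxlen=...) silently drops from the left when appending to a full deque.
        self._queue = deque(maxlen=maxsize)

    async def push(self, message: Any) -> None:
        self._queue.append(message)

    def poll(self, batch_size: int = 10) -> list:
        # Non-blocking: return whatever is available, up to batch_size, in FIFO order.
        batch = []
        while self._queue and len(batch) < batch_size:
            batch.append(self._queue.popleft())
        return batch


async def demo() -> tuple:
    box = KeepNewestInbox(maxsize=3)
    for i in range(5):
        await box.push(i)  # 0 and 1 are pushed out by 3 and 4
    return box.poll(batch_size=2), box.poll()
```

An object like this would be passed as inbox= when constructing the runtime, assuming it satisfies whatever else the runtime expects of an inbox.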

class converge.runtime.loop.AgentRuntime(agent: Agent, transport: Transport, pool_manager=None, task_manager=None, metrics_collector: MetricsCollector | None = None, discovery_service: DiscoveryService | None = None, agent_descriptor: AgentDescriptor | None = None, identity_registry: IdentityRegistry | None = None, replay_log: ReplayLog | None = None, tool_registry=None, checkpoint_store: Store | None = None, checkpoint_interval_sec: float = 60.0, inbox=None, inbox_kwargs: dict[str, Any] | None = None, scheduler=None, executor_factory=None, executor_kwargs: dict[str, Any] | None = None, health_check=None, ready_check=None, receive_timeout_sec: float | None = 30.0, claim_ttl_interval_sec: float | None = None, task_poll_interval_sec: float | None = None, max_tool_loop_iterations: int = 5, scheduler_timeout_sec: float = 1.0, task_refresh_interval_sec: float | None = 0.5, pool_cache_ttl_sec: float | None = 1.0, *, network: AgentNetwork | None = None, ops_server: RuntimeOpsServer | None = None, runtime_hooks: list[RuntimeHook] | None = None, allow_network_transport_mismatch: bool = False)

Bases: object

Manages the execution loop of an agent.

__init__(agent: Agent, transport: Transport, pool_manager=None, task_manager=None, metrics_collector: MetricsCollector | None = None, discovery_service: DiscoveryService | None = None, agent_descriptor: AgentDescriptor | None = None, identity_registry: IdentityRegistry | None = None, replay_log: ReplayLog | None = None, tool_registry=None, checkpoint_store: Store | None = None, checkpoint_interval_sec: float = 60.0, inbox=None, inbox_kwargs: dict[str, Any] | None = None, scheduler=None, executor_factory=None, executor_kwargs: dict[str, Any] | None = None, health_check=None, ready_check=None, receive_timeout_sec: float | None = 30.0, claim_ttl_interval_sec: float | None = None, task_poll_interval_sec: float | None = None, max_tool_loop_iterations: int = 5, scheduler_timeout_sec: float = 1.0, task_refresh_interval_sec: float | None = 0.5, pool_cache_ttl_sec: float | None = 1.0, *, network: AgentNetwork | None = None, ops_server: RuntimeOpsServer | None = None, runtime_hooks: list[RuntimeHook] | None = None, allow_network_transport_mismatch: bool = False)

Initialize the agent runtime.

Parameters:
  • agent – The agent instance.

  • transport – Transport for sending and receiving messages.

  • pool_manager – Optional pool manager for pool membership.

  • task_manager – Optional task manager for task lifecycle.

  • metrics_collector – Optional metrics collector.

  • discovery_service – Optional discovery service. When set, the agent is registered on start() and unregistered on stop() so peers can discover it by topic/capability.

  • agent_descriptor – Optional descriptor for discovery. If discovery_service is set and agent_descriptor is None, a descriptor is built from the agent (id, topics, capabilities) at start().

  • identity_registry – Optional registry mapping agent fingerprints to public keys. When set, the runtime uses receive_verified() and drops messages that fail verification (logged at debug level). Populate it from discovery or a store to enable verified receive.

  • replay_log – Optional replay log. When set, incoming messages (in _listen_transport) and outgoing messages (SendMessage in executor) are recorded for audit and replay.

  • tool_registry – Optional ToolRegistry for InvokeTool decisions.

  • checkpoint_store – Optional store for writing periodic checkpoints (agent_id -> last_activity_ts) for observability. Does not affect message replay; pool/task state is restored by using the same store for PoolManager and TaskManager on restart.

  • checkpoint_interval_sec – Interval in seconds between checkpoint writes when checkpoint_store is set.

  • inbox – Optional custom inbox. Must implement push(message) and poll(batch_size) -> list. If None, an Inbox is created with inbox_kwargs.

  • inbox_kwargs – Optional dict of kwargs for the default Inbox when inbox is None (e.g. maxsize, drop_when_full).

  • scheduler – Optional custom scheduler. Must implement notify() and wait_for_work(timeout) -> bool. If None, the default Scheduler() is used.

  • executor_factory – Optional callable (agent_id, network, task_manager, pool_manager, **kwargs) -> Executor. When provided, the runtime calls it in the run loop to obtain the executor instead of building StandardExecutor. Use for custom executors or to inject extra dependencies.

  • executor_kwargs – Optional dict of kwargs passed to StandardExecutor when executor_factory is not used (e.g. custom_handlers, safety_policy, bidding_protocols, tool_timeout_sec, tool_allowlist). Ignored if executor_factory is set.

  • health_check – Optional callable () -> bool. When set, is_healthy() delegates to it. Optionally expose via RuntimeOpsServer helper.

  • ready_check – Optional callable () -> bool. When set, is_ready() delegates to it.

  • receive_timeout_sec – Optional timeout for transport.receive() so the loop can react to shutdown. When set, receive() is called with this timeout; TimeoutError is caught and the loop continues. None means no timeout (block until message).

  • claim_ttl_interval_sec – Optional interval in seconds. When set and task_manager is set, the run loop calls task_manager.release_expired_claims(time.monotonic()) at most once per interval so expired task claims are released automatically. Ignored if task_manager is None.

  • task_poll_interval_sec – Optional interval in seconds. When set and task_manager is set, a background task periodically checks whether the set of pending tasks for this agent has changed and calls scheduler.notify() so the main loop wakes and processes tasks sooner. Ignored if task_manager is None.

  • max_tool_loop_iterations – Maximum number of ReAct tool-loop iterations (run InvokeTool, feed the result back to decide()). 0 disables the loop.

  • scheduler_timeout_sec – Max seconds to wait in scheduler.wait_for_work() before periodic wake.

  • task_refresh_interval_sec – Optional interval for refreshing task visibility from shared store when polling for tasks. None disables periodic refresh (cache-only reads).

  • pool_cache_ttl_sec – Optional TTL for cached pool membership lookups. None disables caching.

  • network – Optional injected AgentNetwork. When None, runtime builds AgentNetwork(transport).

  • ops_server – Optional RuntimeOpsServer helper. When set, runtime starts/stops it with lifecycle.

  • runtime_hooks – Optional runtime hooks, invoked before a fallback send (on_fallback_pre_send) and when an unverified received message is dropped (on_unverified_drop).

  • allow_network_transport_mismatch – If False and network is provided, require network.transport to be the same object as transport.
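
The "at most once per interval" behavior described for claim_ttl_interval_sec can be sketched with a small gate that the loop consults on every cycle. IntervalGate and its injectable clock are hypothetical helpers for illustration, not the runtime's actual implementation.

```python
from __future__ import annotations

import time
from typing import Callable


class IntervalGate:
    """Hypothetical helper: due() is True at most once per interval_sec."""

    def __init__(self, interval_sec: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._interval = interval_sec
        self._clock = clock
        self._last: float | None = None  # time of the last permitted run

    def due(self) -> bool:
        now = self._clock()
        if self._last is None or now - self._last >= self._interval:
            self._last = now
            return True
        return False


def demo() -> list[bool]:
    # A fake clock makes the behavior deterministic for the example.
    ticks = iter([0.0, 1.0, 5.0, 5.5, 10.0])
    gate = IntervalGate(5.0, clock=lambda: next(ticks))
    return [gate.due() for _ in range(5)]
```

In a real loop, a True result from due() would be the point at which something like task_manager.release_expired_claims(time.monotonic()) is called; the loop may wake far more often than the interval without triggering extra releases.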

is_healthy() -> bool

Return health status. Delegates to health_check callable if set, else True.

is_ready() -> bool

Return readiness status. Delegates to ready_check callable if set, else True.
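
A health_check or ready_check is just a zero-argument callable returning bool. The sketch below builds one from a heartbeat timestamp; make_staleness_check, the heartbeat idea, and the delegate() helper are hypothetical and only mirror the documented "delegate if set, else True" rule.

```python
from __future__ import annotations

import time
from typing import Callable


def make_staleness_check(max_age_sec: float,
                         clock: Callable[[], float] = time.monotonic):
    """Return (beat, check): check() is True while the last beat is recent enough."""
    state = {"last_beat": clock()}

    def beat() -> None:
        state["last_beat"] = clock()

    def check() -> bool:
        return clock() - state["last_beat"] <= max_age_sec

    return beat, check


def delegate(check: Callable[[], bool] | None) -> bool:
    # Mirrors the documented rule: use the callable if set, else report True.
    return check() if check is not None else True


def demo() -> list[bool]:
    now = [0.0]  # fake clock so the example is deterministic
    beat, check = make_staleness_check(10.0, clock=lambda: now[0])
    results = [delegate(None), delegate(check)]
    now[0] = 11.0          # 11 seconds without a heartbeat: stale
    results.append(delegate(check))
    beat()                 # heartbeat arrives
    results.append(delegate(check))
    return results
```

The check callable would be passed as health_check= (or ready_check=) to the runtime, with beat() called from wherever liveness is observed.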

async start() -> None

Start the agent loop.

async stop() -> None

Stop the agent loop.

class converge.runtime.scheduler.Scheduler

Bases: object

Event-driven scheduler for the agent runtime. Replaces busy-wait polling with asyncio.Event signaling.

Custom scheduler: Any object that implements notify() and async wait_for_work(timeout: float | None) -> bool can be passed to AgentRuntime as scheduler=.

notify() -> None

Notify the scheduler that new work is available. This will wake up the loop if it is waiting.

async wait_for_work(timeout: float | None = None) -> bool

Wait until new work is available or timeout occurs.

Parameters:
  • timeout (float | None) – Max time to wait in seconds.

Returns:
  True if woken by event, False if the timeout elapsed first.

Return type:
  bool
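
A custom scheduler only needs notify() and async wait_for_work(timeout) -> bool. The sketch below builds that contract on asyncio.Event and counts wake-ups; CountingScheduler and its wakeups counter are hypothetical, and the built-in Scheduler may differ in detail.

```python
from __future__ import annotations

import asyncio


class CountingScheduler:
    """Hypothetical scheduler: event-driven wake-up that counts real wake-ups."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self.wakeups = 0

    def notify(self) -> None:
        # Signal that work is available; wakes a waiter if there is one.
        self._event.set()

    async def wait_for_work(self, timeout: float | None = None) -> bool:
        try:
            # timeout=None waits until notify() is called.
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        self._event.clear()  # re-arm for the next notification
        self.wakeups += 1
        return True


async def demo() -> tuple[bool, bool, int]:
    sched = CountingScheduler()
    sched.notify()
    woken = await sched.wait_for_work(timeout=0.01)      # event already set
    timed_out = await sched.wait_for_work(timeout=0.01)  # nothing pending
    return woken, timed_out, sched.wakeups
```

Clearing the event after a successful wait means notifications that arrive while the loop is busy collapse into a single wake-up, which suits a loop that polls for all available work on each cycle.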

class converge.runtime.executor.Executor(*args, **kwargs)

Bases: Protocol

Protocol for action executors.

async execute(decisions: list[Decision]) -> None
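
Because Executor is a protocol, any object with a matching async execute() satisfies it. The sketch below pairs such an object with a factory shaped like the executor_factory callable documented for AgentRuntime, (agent_id, network, task_manager, pool_manager, **kwargs). RecordingExecutor and recording_executor_factory are hypothetical names, and plain strings stand in for real Decision objects.

```python
from __future__ import annotations

import asyncio
from typing import Any


class RecordingExecutor:
    """Hypothetical executor: records decisions instead of acting on them."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.seen: list[Any] = []

    async def execute(self, decisions: list[Any]) -> None:
        self.seen.extend(decisions)


def recording_executor_factory(agent_id, network, task_manager, pool_manager,
                               **kwargs) -> RecordingExecutor:
    # The network and managers are accepted to match the documented call
    # shape but are unused in this sketch.
    return RecordingExecutor(agent_id)


async def demo() -> tuple[str, list[Any]]:
    executor = recording_executor_factory("agent-1", None, None, None)
    await executor.execute(["decision-a", "decision-b"])
    return executor.agent_id, executor.seen
```

A recording executor of this kind can be handy in tests, since it lets the loop run end to end without touching a network or store.
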

class converge.runtime.executor.StandardExecutor(agent_id: str, network: AgentNetwork | None, task_manager: TaskManager, pool_manager: PoolManager, metrics_collector: MetricsCollector | None = None, bidding_protocols: dict[str, BiddingProtocol] | None = None, negotiation_protocol: NegotiationProtocol | None = None, delegation_protocol: DelegationProtocol | None = None, votes_store: dict[str, list[tuple[str, Any]]] | None = None, safety_policy: tuple[ResourceLimits | None, ActionPolicy | None] | None = None, replay_log: ReplayLog | None = None, tool_registry: ToolRegistry | None = None, custom_handlers: dict[type, Callable[[Decision], Awaitable[None]]] | None = None, tool_timeout_sec: float | None = None, tool_allowlist: set[str] | None = None, reflect_result: Callable[[str, Any], Any] | Callable[[str, Any], Awaitable[Any]] | None = None, rate_limiter: RateLimiter | None = None, coordination_metrics: CoordinationMetrics | None = None)

Bases: object

Standard implementation of the Executor. Directly acts upon the Network and Managers; optionally runs coordination protocols (bidding, negotiation, delegation) and records votes.

__init__(agent_id: str, network: AgentNetwork | None, task_manager: TaskManager, pool_manager: PoolManager, metrics_collector: MetricsCollector | None = None, bidding_protocols: dict[str, BiddingProtocol] | None = None, negotiation_protocol: NegotiationProtocol | None = None, delegation_protocol: DelegationProtocol | None = None, votes_store: dict[str, list[tuple[str, Any]]] | None = None, safety_policy: tuple[ResourceLimits | None, ActionPolicy | None] | None = None, replay_log: ReplayLog | None = None, tool_registry: ToolRegistry | None = None, custom_handlers: dict[type, Callable[[Decision], Awaitable[None]]] | None = None, tool_timeout_sec: float | None = None, tool_allowlist: set[str] | None = None, reflect_result: Callable[[str, Any], Any] | Callable[[str, Any], Awaitable[Any]] | None = None, rate_limiter: RateLimiter | None = None, coordination_metrics: CoordinationMetrics | None = None)

Initialize the executor.

Parameters:
  • agent_id – The fingerprint of the agent executing decisions.

  • network – Optional network for sending messages.

  • task_manager – Task manager for task lifecycle.

  • pool_manager – Pool manager for pool membership.

  • metrics_collector – Optional metrics collector.

  • bidding_protocols – Optional dict of auction_id -> BiddingProtocol for SubmitBid.

  • negotiation_protocol – Optional protocol for Propose/AcceptProposal/RejectProposal.

  • delegation_protocol – Optional protocol for Delegate/RevokeDelegation.

  • votes_store – Optional dict vote_id -> list of (agent_id, option) for Vote.

  • safety_policy – Optional (ResourceLimits, ActionPolicy). When set, ActionPolicy restricts which decision types are allowed; ResourceLimits validate SubmitTask/ClaimTask constraints (cpu, memory_mb).

  • replay_log – Optional replay log. When set, SendMessage decisions record the sent message for audit and replay.

  • tool_registry – Optional registry of tools. When set, InvokeTool decisions look up the tool by name and run it with the given params; result is discarded (agent may emit ReportTask separately to report results).

  • custom_handlers – Optional dict mapping decision type -> async handler(decision). For custom Decision subclasses, register a handler so the executor runs it instead of logging “Unknown decision type”.

  • tool_timeout_sec – Optional timeout in seconds for tool execution. When set, tool.run(params) is run in a thread and must complete within this time or the call is cancelled and an error is logged; the agent may report task failure separately.

  • tool_allowlist – Optional set of allowed tool names. When set, InvokeTool for a tool not in this set is skipped and a warning is logged.

  • reflect_result – Optional callable (task_id, result) -> revised result (or awaitable). When set, called before committing ReportTask; the return value is used as the result (reflection step).

  • rate_limiter – Optional rate limiter for outbound SendMessage enforcement (egress).

  • coordination_metrics – Optional structured task/pool metrics helper.
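
The tool_allowlist and tool_timeout_sec behavior described above (skip tools outside the allowlist, run the tool in a thread with a deadline) can be sketched as follows. run_tool, its status tuples, and the plain-callable tools are hypothetical; the real executor calls tool.run(params) on registry entries and logs rather than returning a status. Requires Python 3.9+ for asyncio.to_thread.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any, Callable


async def run_tool(name: str, tool: Callable[[dict], Any], params: dict, *,
                   allowlist: set[str] | None = None,
                   timeout_sec: float | None = None) -> tuple[str, Any]:
    # Allowlist check: when an allowlist is set, unknown tools are skipped.
    if allowlist is not None and name not in allowlist:
        return ("skipped", None)
    try:
        # Run the blocking tool in a worker thread, bounded by timeout_sec.
        result = await asyncio.wait_for(asyncio.to_thread(tool, params), timeout_sec)
    except asyncio.TimeoutError:
        # The await is abandoned; the worker thread itself is not killed
        # and runs to completion in the background.
        return ("timeout", None)
    return ("ok", result)


def add_tool(params: dict) -> int:
    return params["a"] + params["b"]


def slow_tool(params: dict) -> str:
    time.sleep(0.2)
    return "late"


async def demo() -> list[tuple[str, Any]]:
    return [
        await run_tool("add", add_tool, {"a": 1, "b": 2}, allowlist={"add"}),
        await run_tool("rm", add_tool, {"a": 1, "b": 2}, allowlist={"add"}),
        await run_tool("slow", slow_tool, {}, timeout_sec=0.05),
    ]
```

Note that a thread-based timeout only stops waiting for the tool; tools with side effects may still complete after the deadline.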

async execute(decisions: list[Decision]) -> None

Execute a batch of decisions.

Parameters:
  • decisions (list[Decision]) – The decisions to execute.
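
One way to picture how a batch is executed with custom_handlers is a dispatch on the decision's type. This is a simplified sketch, not the StandardExecutor implementation: Ping and Unhandled are hypothetical decision types (plain dataclasses rather than Decision subclasses), and dispatch() collects unknown type names instead of logging them.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class Ping:
    """Hypothetical custom decision type."""
    target: str


@dataclass
class Unhandled:
    """Hypothetical decision type with no registered handler."""


async def dispatch(decisions: list[Any],
                   handlers: dict[type, Callable[[Any], Awaitable[None]]],
                   unknown: list[str]) -> None:
    for decision in decisions:
        # Exact-type lookup, matching a dict keyed by decision type.
        handler = handlers.get(type(decision))
        if handler is None:
            unknown.append(type(decision).__name__)
            continue
        await handler(decision)


async def demo() -> tuple[list[str], list[str]]:
    pinged: list[str] = []
    unknown: list[str] = []

    async def handle_ping(decision: Ping) -> None:
        pinged.append(decision.target)

    await dispatch([Ping("peer-a"), Unhandled()], {Ping: handle_ping}, unknown)
    return pinged, unknown
```

With the real executor, the equivalent mapping would be supplied through custom_handlers (for example via executor_kwargs on the runtime).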

class converge.runtime.hooks.RuntimeHook(*args, **kwargs)

Bases: Protocol

on_fallback_pre_send(message: Message) -> Message | None

on_unverified_drop(context: dict[str, Any]) -> None
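
A hook is any object providing these two methods. The sketch below is illustrative only: FakeMessage stands in for converge's Message, the keys in context are invented, and treating a None return from on_fallback_pre_send as "suppress the send" is an assumption based on the Message | None return type, not something this reference states.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class FakeMessage:
    """Hypothetical stand-in for converge's Message."""
    topic: str
    payload: dict = field(default_factory=dict)


class AuditHook:
    """Hypothetical hook: tags fallback sends and records unverified drops."""

    def __init__(self) -> None:
        self.drops: list[dict[str, Any]] = []

    def on_fallback_pre_send(self, message: FakeMessage) -> FakeMessage | None:
        # Assumption: returning None means the message should not be sent.
        if message.topic.startswith("internal."):
            return None
        # Return a modified copy rather than mutating the original.
        return replace(message, payload={**message.payload, "via": "fallback"})

    def on_unverified_drop(self, context: dict[str, Any]) -> None:
        self.drops.append(dict(context))


def demo() -> tuple:
    hook = AuditHook()
    kept = hook.on_fallback_pre_send(FakeMessage("tasks.new", {"id": 1}))
    suppressed = hook.on_fallback_pre_send(FakeMessage("internal.debug"))
    hook.on_unverified_drop({"reason": "bad signature"})
    return kept.payload, suppressed, hook.drops
```

Hooks like this would be passed to the runtime as runtime_hooks=[AuditHook()].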