converge.coordination
Pool and task management, negotiation, consensus, bidding, and delegation.
PoolManager creates pools and handles join/leave; TaskManager handles task submit/claim/report. Both can use a store for persistence. PoolManager.get_pools_for_agent(agent_id) returns the list of pool IDs the agent has joined.
TaskManager also supports cancel_task(task_id) (move to CANCELLED), fail_task(task_id, reason, agent_id=…) (move to FAILED), and release_expired_claims(now_ts), which returns tasks to PENDING once claim_ttl_sec has elapsed (call it periodically with time.monotonic()).
TaskManager.list_pending_tasks_for_agent(agent_id, pool_ids, capabilities, topics=…, sort_by_priority=…, refresh_from_store=…) returns the pending tasks visible to that agent, filtered by pool membership, capabilities, and optionally topic, and optionally sorted by task.priority in descending order. Pass refresh_from_store=True in shared-store or multi-process deployments to force an eager refresh; the default is eventual consistency. Tasks can set priority (int, default 0) and topic (Topic) for routing; the runtime passes agent topics from agent_descriptor when set.
NegotiationProtocol manages sessions (propose, counter, accept, reject). Consensus provides majority and plurality voting. BiddingProtocol and DelegationProtocol support resource allocation and delegation of scope.
The StandardExecutor can run coordination decisions when given optional bidding_protocols (auction_id → BiddingProtocol), negotiation_protocol, delegation_protocol, and votes_store (for Vote). See "Protocol lifecycle" under Customization for when and how to wire these into the executor. Decision types: SubmitBid, Vote, Propose, AcceptProposal, RejectProposal, Delegate, RevokeDelegation (see Core decisions).
- class converge.coordination.pool_manager.PoolManager(store: Store | None = None, coordination_metrics=None)
Bases: object
Manages the lifecycle of agent pools and membership.
Persists pool state to a backing store if provided. Handles creation, membership management (join/leave), and retrieval of pool data.
- class converge.coordination.task_manager.TaskManager(store: Store | None = None, coordination_metrics: CoordinationMetrics | None = None)
Bases: object
Manages the lifecycle of tasks from submission to completion.
Handles task persistence, assignment (claiming), and result reporting. Acts as the source of truth for task state.
- refresh_from_store() → None
Refresh in-memory task cache from the backing store.
Freshness model: eventual consistency. Callers can request this refresh before listing tasks when running in multi-process/shared-store setups.
- claim(agent_id: str, task_id: str) → bool
Attempt to claim a task for a specific agent.
A task can only be claimed if it is in the PENDING state.
- cancel_task(task_id: str) → bool
Cancel a task. Moves it to CANCELLED state.
- Parameters:
task_id – The task ID.
- Returns:
True if the task was found and cancelled, False if not found or not cancellable (e.g. already COMPLETED, FAILED, or CANCELLED).
- fail_task(task_id: str, reason: Any, *, agent_id: str | None = None) → None
Mark a task as FAILED with a reason.
- Parameters:
task_id – The task ID.
reason – Error or failure descriptor (stored in task.result).
agent_id – If set, only the assigned agent can fail the task (same as report). If None, any caller can fail (e.g. system-level failure).
- release_expired_claims(now_ts: float) → list[str]
Release tasks that were claimed but not reported within claim_ttl_sec. Moves them back to PENDING and clears assigned_to / claimed_at.
Call this periodically (e.g. from the runtime or a scheduler), passing time.monotonic() as now_ts.
- Returns:
List of task IDs that were released.
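The claim and release rules described above can be illustrated with a small standalone sketch. It does not use the converge package: ClaimableTask, the ASSIGNED state name, and the helper functions are hypothetical stand-ins, and the timestamps are fixed numbers so the result is deterministic (in real use the caller would pass time.monotonic()).

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(Enum):
    PENDING = "pending"
    ASSIGNED = "assigned"  # hypothetical name for the "claimed" state


@dataclass
class ClaimableTask:
    # Hypothetical stand-in for a task record; field names follow the text above.
    id: str
    state: State = State.PENDING
    assigned_to: Optional[str] = None
    claimed_at: Optional[float] = None
    claim_ttl_sec: Optional[float] = None


def claim(task, agent_id, now_ts):
    """Claim succeeds only while the task is PENDING."""
    if task.state is not State.PENDING:
        return False
    task.state = State.ASSIGNED
    task.assigned_to = agent_id
    task.claimed_at = now_ts
    return True


def release_expired(tasks, now_ts):
    """Return claimed tasks to PENDING once their TTL has elapsed."""
    released = []
    for task in tasks:
        if task.state is not State.ASSIGNED:
            continue
        if task.claim_ttl_sec is None or task.claimed_at is None:
            continue
        if now_ts - task.claimed_at >= task.claim_ttl_sec:
            task.state = State.PENDING
            task.assigned_to = None
            task.claimed_at = None
            released.append(task.id)
    return released


tasks = [
    ClaimableTask("t1", claim_ttl_sec=30.0),
    ClaimableTask("t2", claim_ttl_sec=30.0),
]
claim(tasks[0], "agent-a", now_ts=100.0)
claim(tasks[1], "agent-b", now_ts=120.0)

# At t=135 only t1 has been held for 30 seconds or more.
released = release_expired(tasks, now_ts=135.0)
second_claim_on_t2 = claim(tasks[1], "agent-c", now_ts=136.0)
```

In this sketch the claim timestamp and now_ts come from the same clock, which is why a monotonic clock is a natural fit: wall-clock adjustments would otherwise shorten or lengthen a claim.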
- report(agent_id: str, task_id: str, result: Any) → None
Report the result of a completed task.
- async wait_until_done(task_id: str, timeout: float) → Task | None
Wait until a task reaches a terminal state (COMPLETED, FAILED, CANCELLED) or timeout.
If the task is already terminal, returns immediately. Otherwise registers an asyncio.Event that is set when report(), fail_task(), or cancel_task() is called for this task_id, then waits up to timeout seconds.
- Parameters:
task_id – The task ID to wait for.
timeout – Maximum seconds to wait.
- Returns:
The Task if it reached a terminal state (or was already terminal), None on timeout or if the task is not found.
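The waiting pattern described for wait_until_done (return immediately if terminal, otherwise wait on an asyncio.Event with a timeout) can be sketched in isolation. The class below is a hypothetical stand-in, not the TaskManager implementation: it tracks only a state string per task, and finish() plays the role of report(), fail_task(), or cancel_task().

```python
import asyncio

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


class TaskWaiterSketch:
    """Hypothetical, simplified model of event-based waiting on task state."""

    def __init__(self):
        self.states = {}   # task_id -> state name
        self._events = {}  # task_id -> asyncio.Event for pending waiters

    def submit(self, task_id):
        self.states[task_id] = "PENDING"

    def finish(self, task_id, state):
        # Stand-in for report()/fail_task()/cancel_task(): record the
        # terminal state and wake any waiter registered for this task.
        self.states[task_id] = state
        event = self._events.pop(task_id, None)
        if event is not None:
            event.set()

    async def wait_until_done(self, task_id, timeout):
        if task_id not in self.states:
            return None  # unknown task
        if self.states[task_id] in TERMINAL:
            return self.states[task_id]  # already terminal
        event = self._events.setdefault(task_id, asyncio.Event())
        try:
            await asyncio.wait_for(event.wait(), timeout)
        except asyncio.TimeoutError:
            return None  # timed out
        return self.states[task_id]


async def demo():
    waiter = TaskWaiterSketch()
    waiter.submit("t1")
    waiter.submit("t2")
    # Complete t1 shortly after we start waiting on it.
    asyncio.get_running_loop().call_later(0.01, waiter.finish, "t1", "COMPLETED")
    done = await waiter.wait_until_done("t1", timeout=1.0)
    timed_out = await waiter.wait_until_done("t2", timeout=0.05)
    missing = await waiter.wait_until_done("no-such-task", timeout=0.05)
    return done, timed_out, missing


result = asyncio.run(demo())
```

The sketch returns the state name where the documented method returns the Task; the None results for timeout and for an unknown task follow the Returns description above.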
- list_pending_tasks(*, refresh_from_store: bool = False) → list[Task]
List all tasks currently in the PENDING state.
- Returns:
A list of pending tasks.
- Return type:
List[Task]
- list_pending_tasks_for_agent(agent_id: str, pool_ids: list[str] | None = None, capabilities: list[str] | None = None, topics: list[Topic] | None = None, *, sort_by_priority: bool = False, refresh_from_store: bool = False) → list[Task]
List pending tasks visible to an agent given its pool membership and capabilities.
Tasks are filtered so that: if a task has pool_id set, the agent must be in that pool (pool_id in pool_ids); if a task has required_capabilities, the agent must have all of them (required_capabilities subset of capabilities). When pool_ids or capabilities is None, that filter is not applied (backward compatible).
When topics is provided, only tasks whose topic is None (visible to all) or whose topic is in the given list are included. When sort_by_priority is True, tasks are returned in descending priority order with a stable tiebreak by task id.
- Parameters:
agent_id (str) – The agent fingerprint (used for consistency; filtering is by pool_ids and capabilities).
pool_ids (List[str] | None) – Pool IDs the agent has joined. If None, pool_id filter is not applied.
capabilities (List[str] | None) – Capability names the agent has. If None, required_capabilities filter is not applied.
topics (List[Topic] | None) – If set, only tasks with no topic or with topic in this list are included.
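sort_by_priority (bool) – If True, sort by task.priority descending then task.id.
refresh_from_store (bool) – If True, refresh the in-memory task cache from the backing store before listing (useful in shared-store/multi-process deployments). Default is eventual consistency.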
- Returns:
Pending tasks that the agent is allowed to see.
- Return type:
List[Task]
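The visibility rules above (pool membership, capability subset, optional topic filter, optional priority sort) can be written out as a standalone filter. This is a sketch under assumptions, not the library code: RoutableTask and visible_tasks are hypothetical names, and topics are modeled as plain strings rather than Topic objects.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RoutableTask:
    # Hypothetical stand-in for a pending task; topic is a plain string here.
    id: str
    pool_id: Optional[str] = None
    required_capabilities: list = field(default_factory=list)
    topic: Optional[str] = None
    priority: int = 0


def visible_tasks(pending, pool_ids=None, capabilities=None, topics=None,
                  sort_by_priority=False):
    """Apply the documented filters; a filter passed as None is skipped."""
    out = []
    for task in pending:
        # Pool filter: a task bound to a pool needs the agent in that pool.
        if (pool_ids is not None and task.pool_id is not None
                and task.pool_id not in pool_ids):
            continue
        # Capability filter: required capabilities must be a subset.
        if (capabilities is not None
                and not set(task.required_capabilities) <= set(capabilities)):
            continue
        # Topic filter: tasks without a topic are visible to everyone.
        if (topics is not None and task.topic is not None
                and task.topic not in topics):
            continue
        out.append(task)
    if sort_by_priority:
        # Descending priority, ties broken by task id.
        out.sort(key=lambda task: (-task.priority, task.id))
    return out


pending = [
    RoutableTask("a", pool_id="p1", priority=1),
    RoutableTask("b", pool_id="p2", priority=5),
    RoutableTask("c", required_capabilities=["gpu"], priority=5),
    RoutableTask("d", topic="billing", priority=3),
    RoutableTask("e", priority=5),
]

filtered_ids = [
    task.id
    for task in visible_tasks(
        pending,
        pool_ids=["p1"],
        capabilities=["gpu", "cpu"],
        topics=["ops"],
        sort_by_priority=True,
    )
]
unfiltered_ids = [task.id for task in visible_tasks(pending)]
```

Here "b" is dropped because the agent is not in pool p2 and "d" because its topic is not in the agent's list; the remaining tasks come back as c, e, a.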
- class converge.coordination.negotiation.NegotiationState(value)
Bases: Enum
- PROPOSED = 'proposed'
- COUNTERED = 'countered'
- ACCEPTED = 'accepted'
- REJECTED = 'rejected'
- CLOSED = 'closed'
- class converge.coordination.negotiation.Proposal(id: str = <factory>, proposer_id: str = '', content: Any = None, timestamp: float = 0.0)
Bases: object
A proposal within a negotiation session.
- class converge.coordination.negotiation.NegotiationSession(id: str = <factory>, participants: list[str] = <factory>, history: list[Proposal] = <factory>, state: NegotiationState = NegotiationState.PROPOSED, current_proposal: Proposal | None = None)
Bases: object
Tracks the state of a negotiation between agents.
- state: NegotiationState = 'proposed'
- class converge.coordination.negotiation.NegotiationProtocol
Bases: object
Manages negotiation sessions and state transitions.
- sessions: dict[str, NegotiationSession]
- create_session(initiator_id: str, participants: list[str], initial_proposal: Any) → str
Start a new negotiation session.
- propose(session_id: str, agent_id: str, content: Any) → bool
Make a new proposal (or counter-proposal) in an existing session.
- get_session(session_id: str) → NegotiationSession | None
Retrieve a session by ID.
- Parameters:
session_id (str) – The ID of the session.
- Returns:
The session object or None.
- Return type:
Optional[NegotiationSession]
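One way to picture the state transitions that NegotiationProtocol manages is as a transition table over the NegotiationState values. The table below is an assumption made for illustration (this page does not specify which transitions the library allows), and State, ALLOWED, and transition are hypothetical names.

```python
from enum import Enum


class State(Enum):
    # Mirrors the NegotiationState values listed above.
    PROPOSED = "proposed"
    COUNTERED = "countered"
    ACCEPTED = "accepted"
    REJECTED = "rejected"
    CLOSED = "closed"


# Assumed transition table: an open proposal can be countered, accepted,
# or rejected; a settled negotiation can only be closed.
ALLOWED = {
    State.PROPOSED: {State.COUNTERED, State.ACCEPTED, State.REJECTED},
    State.COUNTERED: {State.COUNTERED, State.ACCEPTED, State.REJECTED},
    State.ACCEPTED: {State.CLOSED},
    State.REJECTED: {State.CLOSED},
    State.CLOSED: set(),
}


def transition(current, target):
    """Return (new_state, ok); the state is unchanged if the move is not allowed."""
    if target not in ALLOWED[current]:
        return current, False
    return target, True


state = State.PROPOSED
state, ok_counter = transition(state, State.COUNTERED)
state, ok_accept = transition(state, State.ACCEPTED)
# Once accepted, a further counter-proposal is refused in this sketch.
state, ok_late_counter = transition(state, State.COUNTERED)
```

Returning a boolean rather than raising mirrors the bool return of propose() above, though whether the library rejects late proposals this way is not stated here.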
- class converge.coordination.consensus.Consensus
Bases: object
Basic consensus mechanisms for decision making.
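The difference between majority and plurality voting can be shown with two short functions. These are illustrative only: the function names, the list-of-options input, and the tie handling (returning None) are assumptions, and the Consensus class may expose a different interface.

```python
from collections import Counter


def majority_vote(votes):
    """Return the option with strictly more than half of all votes, else None."""
    if not votes:
        return None
    option, count = Counter(votes).most_common(1)[0]
    return option if count > len(votes) / 2 else None


def plurality_vote(votes):
    """Return the option with the most votes; None if empty or tied for first."""
    ranked = Counter(votes).most_common(2)
    if not ranked:
        return None
    if len(ranked) == 2 and ranked[0][1] == ranked[1][1]:
        return None  # assumed tie handling: no winner
    return ranked[0][0]


ballots = ["a", "b", "c", "a"]
majority_result = majority_vote(ballots)    # "a" has 2 of 4, not a majority
plurality_result = plurality_vote(ballots)  # "a" still has the most votes
```

With these ballots, majority voting yields no winner while plurality voting selects "a", which is the practical distinction between the two mechanisms.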
- class converge.coordination.bidding.AuctionType
Bases: object
- FIRST_PRICE_SEALED_BID = 'first_price_sealed_bid'
- SECOND_PRICE_SEALED_BID = 'second_price_sealed_bid'
- ENGLISH = 'english'
- DUTCH = 'dutch'
- class converge.coordination.bidding.BiddingProtocol(auction_type: str = 'first_price_sealed_bid')
Bases: object
Manages auction-based coordination for task allocation or resource distribution.
Implements standard auction types and manages bid lifecycle.
- __init__(auction_type: str = 'first_price_sealed_bid')
Initialize the bidding protocol.
- Parameters:
auction_type (str) – The type of auction to run.
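For readers unfamiliar with the two sealed-bid auction types named in AuctionType, the following sketch shows how the winner and price differ between them. It is not BiddingProtocol's implementation: resolve_sealed_bid is a hypothetical helper, and it assumes the highest bid wins, with ties broken by bidder ID.

```python
def resolve_sealed_bid(bids, auction_type="first_price_sealed_bid"):
    """Pick a winner from bids (bidder_id -> amount).

    Returns (winner_id, price_paid), or None when there are no bids.
    Assumes the highest bid wins; ties are broken by bidder id.
    """
    if not bids:
        return None
    ranked = sorted(bids.items(), key=lambda item: (-item[1], item[0]))
    winner, top_bid = ranked[0]
    if auction_type == "first_price_sealed_bid":
        # Winner pays their own bid.
        return winner, top_bid
    if auction_type == "second_price_sealed_bid":
        # Winner pays the second-highest bid (own bid if unopposed).
        price = ranked[1][1] if len(ranked) > 1 else top_bid
        return winner, price
    raise ValueError(f"unsupported auction type in this sketch: {auction_type}")


bids = {"agent-a": 10.0, "agent-b": 7.5, "agent-c": 9.0}
first_price = resolve_sealed_bid(bids, "first_price_sealed_bid")
second_price = resolve_sealed_bid(bids, "second_price_sealed_bid")
```

Both auction types pick agent-a here; only the price differs (10.0 versus 9.0). English and Dutch auctions are iterative and are left out of the sketch.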
- class converge.coordination.delegation.DelegationType
Bases: object
- DIRECT = 'direct'
- POOLED = 'pooled'