converge.coordination

Pool and task management, negotiation, consensus, bidding, and delegation.

PoolManager creates pools and handles joining and leaving them; TaskManager handles submitting, claiming, and reporting tasks. Both can use a store for persistence. PoolManager.get_pools_for_agent(agent_id) returns the list of pool IDs the agent has joined.

TaskManager also supports:
  • cancel_task(task_id) – move a task to CANCELLED.

  • fail_task(task_id, reason, agent_id=…) – move a task to FAILED.

  • release_expired_claims(now_ts) – return tasks to PENDING when claim_ttl_sec has elapsed; call it periodically with time.monotonic().

TaskManager.list_pending_tasks_for_agent(agent_id, pool_ids, capabilities, topics=…, sort_by_priority=…, refresh_from_store=…) returns the pending tasks visible to that agent. Tasks are filtered by pool membership, capabilities, and optionally topic, and can optionally be sorted by task.priority in descending order. Use refresh_from_store=True in shared-store or multi-process deployments to force an eager refresh; by default the in-memory view is eventually consistent with the store.

Tasks can set priority (int, default 0) and topic (Topic) for routing; the runtime passes agent topics from agent_descriptor when set.

NegotiationProtocol manages sessions (propose, counter, accept, reject). Consensus provides majority and plurality voting. BiddingProtocol and DelegationProtocol support resource allocation and delegation of scope.

The StandardExecutor can run coordination decisions when given optional bidding_protocols (auction_id → BiddingProtocol), negotiation_protocol, delegation_protocol, and votes_store (for Vote). See Protocol lifecycle under Customization for when and how to wire these into the executor.

Decision types: SubmitBid, Vote, Propose, AcceptProposal, RejectProposal, Delegate, RevokeDelegation (see Core decisions).

class converge.coordination.pool_manager.PoolManager(store: Store | None = None, coordination_metrics=None)[source]

Bases: object

Manages the lifecycle of agent pools and membership.

Persists pool state to a backing store if provided. Handles creation, membership management (join/leave), and retrieval of pool data.

create_pool(spec: dict[str, Any]) Pool[source]

Create a new pool based on a specification.

Parameters:

spec (Dict[str, Any]) – Dictionary of arguments for the Pool constructor. May include “admission_policy” (an AdmissionPolicy instance) and “governance_model” (a GovernanceModel instance used for dispute resolution).

Returns:

The newly created Pool instance.

Return type:

Pool

join_pool(agent_id: str, pool_id: str) bool[source]

Add an agent to a pool.

Parameters:
  • agent_id (str) – The fingerprint of the agent joining the pool.

  • pool_id (str) – The ID of the pool to join.

Returns:

True if the agent successfully joined, False if pool not found.

Return type:

bool

leave_pool(agent_id: str, pool_id: str) None[source]

Remove an agent from a pool.

Parameters:
  • agent_id (str) – The fingerprint of the agent leaving the pool.

  • pool_id (str) – The ID of the pool to leave.

get_pool(pool_id: str) Pool | None[source]

Retrieve a pool by its ID.

Parameters:

pool_id (str) – The ID of the pool to retrieve.

Returns:

The Pool instance, or None if not found.

Return type:

Optional[Pool]

get_pools_for_agent(agent_id: str) list[str][source]

Return the list of pool IDs that the agent is a member of.

Parameters:

agent_id (str) – The fingerprint of the agent.

Returns:

Pool IDs the agent has joined.

Return type:

List[str]
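
The membership semantics described above can be illustrated with a small in-memory stand-in. This is a sketch under stated assumptions, not the PoolManager implementation: the class name MiniPoolRegistry, its create_pool(pool_id) signature, and the sorted return order are all hypothetical. It models only the documented outcomes: join_pool returns False for an unknown pool, and get_pools_for_agent returns the IDs the agent has joined.

```python
class MiniPoolRegistry:
    """Hypothetical stand-in that mimics the documented PoolManager outcomes."""

    def __init__(self) -> None:
        # pool_id -> set of member agent fingerprints
        self._members: dict[str, set[str]] = {}

    def create_pool(self, pool_id: str) -> None:
        # The real create_pool takes a spec dict; a bare ID keeps the sketch short.
        self._members.setdefault(pool_id, set())

    def join_pool(self, agent_id: str, pool_id: str) -> bool:
        members = self._members.get(pool_id)
        if members is None:
            return False  # pool not found
        members.add(agent_id)
        return True

    def leave_pool(self, agent_id: str, pool_id: str) -> None:
        # Assumption: leaving an unknown pool, or one the agent is not in, is a no-op.
        self._members.get(pool_id, set()).discard(agent_id)

    def get_pools_for_agent(self, agent_id: str) -> list[str]:
        # Assumption: sorted order, chosen only to make the output deterministic.
        return sorted(p for p, m in self._members.items() if agent_id in m)


registry = MiniPoolRegistry()
registry.create_pool("pool-a")
registry.create_pool("pool-b")
joined_a = registry.join_pool("agent-1", "pool-a")
joined_missing = registry.join_pool("agent-1", "no-such-pool")
registry.join_pool("agent-1", "pool-b")
registry.leave_pool("agent-1", "pool-b")
pools = registry.get_pools_for_agent("agent-1")
```

After this sequence the agent remains only in pool-a, which is the kind of list a caller would pass as pool_ids when listing tasks for the agent.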

class converge.coordination.task_manager.TaskManager(store: Store | None = None, coordination_metrics: CoordinationMetrics | None = None)[source]

Bases: object

Manages the lifecycle of tasks from submission to completion.

Handles task persistence, assignment (claiming), and result reporting. Acts as the source of truth for task state.

refresh_from_store() None[source]

Refresh in-memory task cache from the backing store.

Freshness model: eventual consistency. Callers can request this refresh before listing tasks when running in multi-process/shared-store setups.

submit(task: Task) str[source]

Submit a new task to the system.

Parameters:

task (Task) – The Task object to submit.

Returns:

The unique ID of the submitted task.

Return type:

str

claim(agent_id: str, task_id: str) bool[source]

Attempt to claim a task for a specific agent.

A task can only be claimed if it is in the PENDING state.

Parameters:
  • agent_id (str) – The fingerprint of the claiming agent.

  • task_id (str) – The ID of the task to claim.

Returns:

True if claim was successful, False otherwise.

Return type:

bool
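
The PENDING-only rule for claims can be sketched as a guarded state transition. The names below (MiniTask, State.CLAIMED, the free function claim) are hypothetical and exist only for this illustration; the source does not name the state a task enters once it is claimed.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(Enum):
    PENDING = "pending"
    CLAIMED = "claimed"  # hypothetical name for the in-progress state
    COMPLETED = "completed"


@dataclass
class MiniTask:
    id: str
    state: State = State.PENDING
    assigned_to: Optional[str] = None


def claim(tasks: dict[str, MiniTask], agent_id: str, task_id: str) -> bool:
    """Claim a task only if it exists and is still PENDING."""
    task = tasks.get(task_id)
    if task is None or task.state is not State.PENDING:
        return False
    task.state = State.CLAIMED
    task.assigned_to = agent_id
    return True


tasks = {"t1": MiniTask(id="t1")}
first = claim(tasks, "agent-1", "t1")    # succeeds: task was PENDING
second = claim(tasks, "agent-2", "t1")   # fails: task is no longer PENDING
missing = claim(tasks, "agent-1", "nope")  # fails: unknown task ID
```

Because the second claim is rejected, the task stays assigned to the first agent. A shared-store deployment would additionally need the check and the update to be atomic, which this single-process sketch does not address.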

cancel_task(task_id: str) bool[source]

Cancel a task. Moves it to CANCELLED state.

Parameters:

task_id – The task ID.

Returns:

True if the task was found and cancelled, False if not found or not cancellable (e.g. already COMPLETED, FAILED, or CANCELLED).

fail_task(task_id: str, reason: Any, *, agent_id: str | None = None) None[source]

Mark a task as FAILED with a reason.

Parameters:
  • task_id – The task ID.

  • reason – Error or failure descriptor (stored in task.result).

  • agent_id – If set, only the assigned agent can fail the task (same as report). If None, any caller can fail (e.g. system-level failure).
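
The agent_id rule above can be sketched as follows. This is an illustration only: MiniTask and the bool return value are hypothetical (the documented fail_task returns None), and both the silent rejection of a non-assigned agent and the refusal to fail an already terminal task are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Optional

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


@dataclass
class MiniTask:
    id: str
    state: str = "CLAIMED"  # hypothetical name for the in-progress state
    assigned_to: Optional[str] = None
    result: Any = None


def fail_task(task: MiniTask, reason: Any, *, agent_id: Optional[str] = None) -> bool:
    """Mark the task FAILED; returns whether the transition happened (sketch only)."""
    if task.state in TERMINAL:
        return False  # assumption: terminal tasks are left untouched
    if agent_id is not None and agent_id != task.assigned_to:
        return False  # only the assigned agent may fail the task
    task.state = "FAILED"
    task.result = reason  # the failure descriptor is stored in task.result
    return True


task = MiniTask(id="t1", assigned_to="agent-1")
wrong_agent = fail_task(task, "boom", agent_id="agent-2")
system_level = fail_task(task, "worker crashed")  # agent_id=None: any caller
```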

release_expired_claims(now_ts: float) list[str][source]

Release tasks that were claimed but not reported within claim_ttl_sec. Moves them back to PENDING and clears assigned_to / claimed_at.

Call this periodically (e.g. from runtime or a scheduler) with time.monotonic().

Returns:

List of task IDs that were released.
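
A minimal sketch of the expiry check, assuming each task records claimed_at from the same clock that supplies now_ts. ClaimedTask, the "CLAIMED" state name, the treatment of a missing claim_ttl_sec as "never expires", and the use of >= for the comparison are assumptions made for this illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClaimedTask:
    id: str
    state: str = "PENDING"
    assigned_to: Optional[str] = None
    claimed_at: Optional[float] = None
    claim_ttl_sec: Optional[float] = None  # None: claim never expires (assumption)


def release_expired_claims(tasks: list[ClaimedTask], now_ts: float) -> list[str]:
    """Return expired claims to PENDING and report which task IDs were released."""
    released = []
    for task in tasks:
        if task.state != "CLAIMED" or task.claimed_at is None or task.claim_ttl_sec is None:
            continue
        if now_ts - task.claimed_at >= task.claim_ttl_sec:
            task.state = "PENDING"
            task.assigned_to = None
            task.claimed_at = None
            released.append(task.id)
    return released


tasks = [
    ClaimedTask("stale", "CLAIMED", "agent-1", claimed_at=100.0, claim_ttl_sec=30.0),
    ClaimedTask("fresh", "CLAIMED", "agent-2", claimed_at=125.0, claim_ttl_sec=30.0),
    ClaimedTask("no-ttl", "CLAIMED", "agent-3", claimed_at=0.0),
]
released = release_expired_claims(tasks, now_ts=131.0)
```

In a running system now_ts would come from time.monotonic(), as the method description says; fixed numbers are used here so the outcome is reproducible.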

report(agent_id: str, task_id: str, result: Any) None[source]

Report the result of a completed task.

Parameters:
  • agent_id (str) – The fingerprint of the agent reporting the result.

  • task_id (str) – The ID of the completed task.

  • result (Any) – The result data/object.

get_task(task_id: str) Task | None[source]

Retrieve a task by its ID.

Parameters:

task_id (str) – The ID of the task.

Returns:

The Task instance, or None if not found.

Return type:

Optional[Task]

async wait_until_done(task_id: str, timeout: float) Task | None[source]

Wait until a task reaches a terminal state (COMPLETED, FAILED, CANCELLED) or timeout.

If the task is already terminal, returns immediately. Otherwise registers an asyncio.Event that is set when report(), fail_task(), or cancel_task() is called for this task_id, then waits up to timeout seconds.

Parameters:
  • task_id – The task ID to wait for.

  • timeout – Maximum seconds to wait.

Returns:

The Task if it reached a terminal state (or was already terminal), None on timeout or if the task is not found.
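
The event-based wait described above can be sketched with asyncio.Event and asyncio.wait_for. MiniWaiter is a hypothetical stand-in: it tracks state strings rather than Task objects and implements only report(), but it follows the documented flow of returning immediately for terminal tasks, waiting on an event otherwise, and returning None on timeout or for an unknown task.

```python
import asyncio
from typing import Optional

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


class MiniWaiter:
    def __init__(self) -> None:
        self.states: dict[str, str] = {}
        self._events: dict[str, asyncio.Event] = {}

    def submit(self, task_id: str) -> None:
        self.states[task_id] = "PENDING"

    def report(self, task_id: str) -> None:
        self.states[task_id] = "COMPLETED"
        event = self._events.pop(task_id, None)
        if event is not None:
            event.set()  # wake every coroutine waiting on this task

    async def wait_until_done(self, task_id: str, timeout: float) -> Optional[str]:
        state = self.states.get(task_id)
        if state is None:
            return None  # task not found
        if state in TERMINAL:
            return state  # already terminal: no waiting needed
        event = self._events.setdefault(task_id, asyncio.Event())
        try:
            await asyncio.wait_for(event.wait(), timeout)
        except asyncio.TimeoutError:
            return None
        return self.states[task_id]


async def demo():
    waiter = MiniWaiter()
    waiter.submit("t1")
    waiter.submit("t2")
    # Complete t1 shortly after the wait starts; t2 is never reported.
    asyncio.get_running_loop().call_later(0.01, waiter.report, "t1")
    done = await waiter.wait_until_done("t1", timeout=1.0)
    timed_out = await waiter.wait_until_done("t2", timeout=0.05)
    return done, timed_out


done, timed_out = asyncio.run(demo())
```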

list_pending_tasks(*, refresh_from_store: bool = False) list[Task][source]

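List all tasks currently in the PENDING state.

Parameters:

refresh_from_store (bool) – If True, refresh the in-memory task cache from the backing store before listing. Defaults to False.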

Returns:

A list of pending tasks.

Return type:

List[Task]

list_pending_tasks_for_agent(agent_id: str, pool_ids: list[str] | None = None, capabilities: list[str] | None = None, topics: list[Topic] | None = None, *, sort_by_priority: bool = False, refresh_from_store: bool = False) list[Task][source]

List pending tasks visible to an agent given its pool membership and capabilities.

Tasks are filtered so that: if a task has pool_id set, the agent must be in that pool (pool_id in pool_ids); if a task has required_capabilities, the agent must have all of them (required_capabilities subset of capabilities). When pool_ids or capabilities is None, that filter is not applied (backward compatible).

When topics is provided, only tasks whose topic is None (visible to all) or whose topic is in the given list are included. When sort_by_priority is True, tasks are returned in descending priority order with a stable tiebreak by task id.

Parameters:
  • agent_id (str) – The agent fingerprint. Accepted for API consistency; filtering is driven by pool_ids, capabilities, and topics rather than by agent_id.

  • pool_ids (List[str] | None) – Pool IDs the agent has joined. If None, pool_id filter is not applied.

  • capabilities (List[str] | None) – Capability names the agent has. If None, required_capabilities filter is not applied.

  • topics (List[Topic] | None) – If set, only tasks with no topic or with topic in this list are included.

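  • sort_by_priority (bool) – If True, sort by task.priority descending then task.id.

  • refresh_from_store (bool) – If True, refresh the in-memory task cache from the backing store before listing. Use this in shared-store or multi-process deployments; the default (False) gives eventual consistency.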

Returns:

Pending tasks that the agent is allowed to see.

Return type:

List[Task]
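
The filtering and ordering rules above can be sketched as a plain function. This is an illustration, not the library code: MiniTask and visible_tasks are hypothetical names, a string stands in for the Topic type, and the tiebreak by task id is assumed to be ascending.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MiniTask:
    id: str
    pool_id: Optional[str] = None
    required_capabilities: list[str] = field(default_factory=list)
    topic: Optional[str] = None  # the library uses Topic; a string stands in here
    priority: int = 0


def visible_tasks(pending, pool_ids=None, capabilities=None, topics=None,
                  *, sort_by_priority=False):
    result = []
    for task in pending:
        # Pool filter: applied only when pool_ids is given and the task has a pool_id.
        if pool_ids is not None and task.pool_id is not None and task.pool_id not in pool_ids:
            continue
        # Capability filter: required_capabilities must be a subset of capabilities.
        if capabilities is not None and not set(task.required_capabilities) <= set(capabilities):
            continue
        # Topic filter: tasks with no topic are visible to all.
        if topics is not None and task.topic is not None and task.topic not in topics:
            continue
        result.append(task)
    if sort_by_priority:
        # Descending priority, then task id (ascending order is an assumption).
        result.sort(key=lambda t: (-t.priority, t.id))
    return result


pending = [
    MiniTask("a", pool_id="p1", priority=1),
    MiniTask("b", pool_id="p2", priority=9),
    MiniTask("c", required_capabilities=["gpu"], priority=5),
    MiniTask("d", topic="billing", priority=5),
    MiniTask("e", topic="search", priority=7),
]
ids = [t.id for t in visible_tasks(pending, pool_ids=["p1"], capabilities=["gpu", "ocr"],
                                   topics=["billing"], sort_by_priority=True)]
unfiltered = visible_tasks(pending)
```

Task b is dropped by the pool filter and task e by the topic filter; with every filter left as None, all five tasks are returned.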

class converge.coordination.negotiation.NegotiationState(value)[source]

Bases: Enum

PROPOSED = 'proposed'
COUNTERED = 'countered'
ACCEPTED = 'accepted'
REJECTED = 'rejected'
CLOSED = 'closed'

class converge.coordination.negotiation.Proposal(id: str = <factory>, proposer_id: str = '', content: Any = None, timestamp: float = 0.0)[source]

Bases: object

A proposal within a negotiation session.

id: str
proposer_id: str = ''
content: Any = None
timestamp: float = 0.0

class converge.coordination.negotiation.NegotiationSession(id: str = <factory>, participants: list[str] = <factory>, history: list[Proposal] = <factory>, state: NegotiationState = NegotiationState.PROPOSED, current_proposal: Proposal | None = None)[source]

Bases: object

Tracks the state of a negotiation between agents.

id: str
participants: list[str]
history: list[Proposal]
state: NegotiationState = 'proposed'
current_proposal: Proposal | None = None

class converge.coordination.negotiation.NegotiationProtocol[source]

Bases: object

Manages negotiation sessions and state transitions.

sessions: dict[str, NegotiationSession]

create_session(initiator_id: str, participants: list[str], initial_proposal: Any) str[source]

Start a new negotiation session.

Parameters:
  • initiator_id (str) – Fingerprint of the agent starting the session.

  • participants (List[str]) – List of other agents invited to negotiate.

  • initial_proposal (Any) – The initial content/offer.

Returns:

The unique ID of the new session.

Return type:

str

propose(session_id: str, agent_id: str, content: Any) bool[source]

Make a new proposal (or counter-proposal) in an existing session.

Parameters:
  • session_id (str) – The ID of the session.

  • agent_id (str) – The fingerprint of the agent making the proposal.

  • content (Any) – The content of the new proposal.

Returns:

True if proposal was accepted into the session, False otherwise.

Return type:

bool

accept(session_id: str, agent_id: str) bool[source]

Accept the current proposal.

Parameters:
  • session_id (str) – The ID of the session.

  • agent_id (str) – The fingerprint of the agent accepting.

Returns:

True if acceptance was recorded, False otherwise.

Return type:

bool

reject(session_id: str, agent_id: str) bool[source]

Reject the current proposal and close session.

Parameters:
  • session_id (str) – The ID of the session.

  • agent_id (str) – The fingerprint of the agent rejecting.

Returns:

True if rejection was recorded, False otherwise.

Return type:

bool

get_session(session_id: str) NegotiationSession | None[source]

Retrieve a session by ID.

Parameters:

session_id (str) – The ID of the session.

Returns:

The session object or None.

Return type:

Optional[NegotiationSession]
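
A negotiation session can be read as a small state machine over the NegotiationState values. The sketch below is a hypothetical stand-in (MiniNegotiation, with sessions held as plain dicts), and several rules in it are assumptions the source does not state: the initiator counts as a participant, only participants may act, a new proposal moves the session to COUNTERED, and nothing further is allowed once a session is ACCEPTED or REJECTED.

```python
import uuid
from enum import Enum


class State(Enum):
    PROPOSED = "proposed"
    COUNTERED = "countered"
    ACCEPTED = "accepted"
    REJECTED = "rejected"


OPEN_STATES = {State.PROPOSED, State.COUNTERED}


class MiniNegotiation:
    def __init__(self) -> None:
        self.sessions: dict[str, dict] = {}

    def create_session(self, initiator_id, participants, initial_proposal) -> str:
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            "participants": [initiator_id, *participants],
            "history": [(initiator_id, initial_proposal)],
            "state": State.PROPOSED,
        }
        return session_id

    def _open_session_for(self, session_id, agent_id):
        session = self.sessions.get(session_id)
        if session is None or session["state"] not in OPEN_STATES:
            return None
        if agent_id not in session["participants"]:
            return None  # assumption: outsiders cannot act on a session
        return session

    def propose(self, session_id, agent_id, content) -> bool:
        session = self._open_session_for(session_id, agent_id)
        if session is None:
            return False
        session["history"].append((agent_id, content))
        session["state"] = State.COUNTERED
        return True

    def accept(self, session_id, agent_id) -> bool:
        session = self._open_session_for(session_id, agent_id)
        if session is None:
            return False
        session["state"] = State.ACCEPTED
        return True

    def reject(self, session_id, agent_id) -> bool:
        session = self._open_session_for(session_id, agent_id)
        if session is None:
            return False
        session["state"] = State.REJECTED
        return True


neg = MiniNegotiation()
sid = neg.create_session("buyer", ["seller"], {"price": 10})
countered = neg.propose(sid, "seller", {"price": 14})
outsider = neg.accept(sid, "stranger")
accepted = neg.accept(sid, "buyer")
late = neg.propose(sid, "seller", {"price": 15})
final_state = neg.sessions[sid]["state"]
```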

class converge.coordination.consensus.Consensus[source]

Bases: object

Basic consensus mechanisms for decision making.

static majority_vote(votes: list[Any]) Any[source]

Determine the winner by strict majority (> 50%).

Parameters:

votes (List[Any]) – A list of votes (any hashable type).

Returns:

The winning option, or None if no majority exists.

Return type:

Any
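
Strict majority can be sketched with collections.Counter. This is a standalone illustration of the rule (more than half of all votes), not the library's implementation; returning None for an empty vote list is an assumption.

```python
from collections import Counter
from typing import Any, Hashable


def majority_vote(votes: list[Hashable]) -> Any:
    """Return the option with more than 50% of the votes, else None."""
    if not votes:
        return None  # assumption: no votes means no winner
    option, count = Counter(votes).most_common(1)[0]
    # count * 2 > len(votes) is the integer form of count / len(votes) > 0.5.
    return option if count * 2 > len(votes) else None


clear_winner = majority_vote(["a", "a", "b"])
exact_half = majority_vote(["a", "a", "b", "b"])
no_votes = majority_vote([])
```

An option holding exactly half of the votes does not win, because the threshold is strict.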

static plurality_vote(votes: list[Any]) Any[source]

Determine the winner by plurality (most votes).

Parameters:

votes (List[Any]) – A list of votes.

Returns:

The winning option, or None if there is a tie for first place.

Return type:

Any
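
Plurality with the documented tie rule can be sketched the same way. This is an illustration rather than the library code; returning None for an empty vote list is an assumption.

```python
from collections import Counter
from typing import Any, Hashable


def plurality_vote(votes: list[Hashable]) -> Any:
    """Return the option with the most votes, or None on a tie for first place."""
    if not votes:
        return None  # assumption: no votes means no winner
    ranked = Counter(votes).most_common(2)
    if len(ranked) == 2 and ranked[0][1] == ranked[1][1]:
        return None  # top two options have equal counts
    return ranked[0][0]


leader = plurality_vote(["a", "a", "b", "c"])
tied = plurality_vote(["a", "b"])
single = plurality_vote(["a"])
```

In the first call "a" wins with 2 of 4 votes, which is enough for a plurality even though it is not more than half.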

class converge.coordination.bidding.AuctionType[source]

Bases: object

FIRST_PRICE_SEALED_BID = 'first_price_sealed_bid'
SECOND_PRICE_SEALED_BID = 'second_price_sealed_bid'
ENGLISH = 'english'
DUTCH = 'dutch'

class converge.coordination.bidding.BiddingProtocol(auction_type: str = 'first_price_sealed_bid')[source]

Bases: object

Manages auction-based coordination for task allocation or resource distribution.

Implements standard auction types and manages bid lifecycle.

__init__(auction_type: str = 'first_price_sealed_bid')[source]

Initialize the bidding protocol.

Parameters:

auction_type (str) – The type of auction to run.

bids: dict[str, float]

submit_bid(agent_id: str, amount: float, content: Any) bool[source]

Submit a bid for the active auction.

Parameters:
  • agent_id (str) – The bidder’s identity.

  • amount (float) – The bid amount (currency or token).

  • content (Any) – Additional bid details (e.g. promised SLA).

Returns:

True if bid accepted, False if rejected (e.g. auction closed).

Return type:

bool

resolve() str | None[source]

Determine the winner of the auction.

Returns:

The winning agent ID, or None if no bids.

Return type:

Optional[str]
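
For the two sealed-bid auction types, resolution can be sketched as follows. This uses the textbook definitions (the highest bidder wins; the price is the winner's own bid in a first-price auction and the second-highest bid in a second-price auction) and is not taken from the library. The function name resolve_sealed_bid, the returned (winner, price) pair, and the tie rule (earliest submitted bid wins) are assumptions; the documented resolve() returns only the winning agent ID.

```python
from typing import Optional


def resolve_sealed_bid(
    bids: dict[str, float],
    auction_type: str = "first_price_sealed_bid",
) -> Optional[tuple[str, float]]:
    """Return (winner_id, price_paid), or None if there are no bids."""
    if auction_type not in ("first_price_sealed_bid", "second_price_sealed_bid"):
        raise ValueError(f"unsupported auction type in this sketch: {auction_type}")
    if not bids:
        return None
    # sorted() is stable, so among equal amounts the earlier-inserted bid ranks first.
    ranked = sorted(bids.items(), key=lambda item: item[1], reverse=True)
    winner, top_amount = ranked[0]
    if auction_type == "second_price_sealed_bid" and len(ranked) > 1:
        return winner, ranked[1][1]  # winner pays the runner-up's bid
    return winner, top_amount


bids = {"agent-a": 5.0, "agent-b": 8.0, "agent-c": 6.5}
first_price = resolve_sealed_bid(bids)
second_price = resolve_sealed_bid(bids, "second_price_sealed_bid")
empty = resolve_sealed_bid({})
```

Both auction types pick the same winner here; only the price differs.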

class converge.coordination.delegation.DelegationType[source]

Bases: object

DIRECT = 'direct'
POOLED = 'pooled'

class converge.coordination.delegation.DelegationProtocol[source]

Bases: object

Manages the delegation of authority or tasks from one agent to another.

delegate(delegator_id: str, delegatee_id: str, scope: list[str]) str[source]

Create a new delegation mandate.

Parameters:
  • delegator_id (str) – The agent granting authority.

  • delegatee_id (str) – The agent receiving authority.

  • scope (List[str]) – List of scopes/permissions being delegated (e.g. tokens, topics).

Returns:

A unique ID for the delegation record.

Return type:

str

revoke(delegation_id: str) bool[source]

Revoke an active delegation.

Parameters:

delegation_id (str) – The ID of the delegation to revoke.

Returns:

True if revoked, False if not found or already inactive.

Return type:

bool
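
The delegate and revoke lifecycle can be sketched with an in-memory record store. MiniDelegations and its record layout are hypothetical; the sketch reproduces only the documented return values: delegate returns a unique ID, and revoke returns False when the delegation is unknown or already inactive.

```python
import uuid


class MiniDelegations:
    """Hypothetical stand-in for the documented delegate/revoke behaviour."""

    def __init__(self) -> None:
        self._records: dict[str, dict] = {}

    def delegate(self, delegator_id: str, delegatee_id: str, scope: list[str]) -> str:
        delegation_id = str(uuid.uuid4())
        self._records[delegation_id] = {
            "delegator": delegator_id,
            "delegatee": delegatee_id,
            "scope": list(scope),  # copy so later caller mutations do not leak in
            "active": True,
        }
        return delegation_id

    def revoke(self, delegation_id: str) -> bool:
        record = self._records.get(delegation_id)
        if record is None or not record["active"]:
            return False  # not found or already inactive
        record["active"] = False
        return True


delegations = MiniDelegations()
delegation_id = delegations.delegate("lead-agent", "worker-agent", ["topic:billing"])
first_revoke = delegations.revoke(delegation_id)
second_revoke = delegations.revoke(delegation_id)
unknown_revoke = delegations.revoke("no-such-id")
```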