converge.core
Core abstractions: agent, identity, message, topic, task, pool, capability, store, and decisions. These types are used everywhere: identity and messages for communication, topics for routing and discovery, tasks and pools for coordination, capabilities for discovery, store for persistence, and decisions as the output of the agent’s decide() and input to the executor.
Task routing: Tasks can optionally set pool_id, topic, and required_capabilities. Only agents in the given pool and with the required capabilities see the task when the runtime uses scoped task listing (see coordination and runtime).
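The scoping rule above can be sketched as a simple filter. This is an illustration only, not the runtime's implementation: TaskStub and visible_tasks are hypothetical names, topic matching is left out, and treating an unset pool_id or an empty required_capabilities list as "visible to everyone" is an assumption.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaskStub:
    """Hypothetical stand-in carrying only the routing fields of Task."""
    id: str
    pool_id: Optional[str] = None
    required_capabilities: list[str] = field(default_factory=list)


def visible_tasks(tasks, agent_pools, agent_capabilities):
    """Return the tasks an agent would see under the assumed scoping rule.

    Assumptions of this sketch: a task without pool_id is visible from any
    pool, and a task without required_capabilities is visible to any agent.
    """
    visible = []
    for task in tasks:
        in_pool = task.pool_id is None or task.pool_id in agent_pools
        capable = set(task.required_capabilities) <= set(agent_capabilities)
        if in_pool and capable:
            visible.append(task)
    return visible


tasks = [
    TaskStub("t1"),
    TaskStub("t2", pool_id="gpu-pool", required_capabilities=["render"]),
    TaskStub("t3", pool_id="cpu-pool"),
    TaskStub("t4", required_capabilities=["translate"]),
]
seen = visible_tasks(tasks, agent_pools={"gpu-pool"}, agent_capabilities={"render"})
print([t.id for t in seen])  # ['t1', 't2']
```

Here t3 is hidden because the agent is not in cpu-pool, and t4 is hidden because the agent lacks the translate capability.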
Task constraints: Conventional keys in task.constraints (enforced by custom logic if needed): timeout_sec, deadline, claim_ttl_sec, max_retries, cpu, memory_mb. See Task docstring and coordination docs for cancel, fail, and claim TTL.
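Because these keys are conventions rather than enforced fields, any check is custom logic. A minimal sketch of a claim TTL check follows; claim_expired is a hypothetical helper, and treating a missing claim_ttl_sec as "never expires" is an assumption.

```python
import time
from typing import Any, Optional


def claim_expired(claimed_at: Optional[float], constraints: dict[str, Any],
                  now: Optional[float] = None) -> bool:
    """Return True if a claimed task has outlived its claim_ttl_sec.

    In this sketch a task with no claim, or with no claim_ttl_sec key,
    never expires.
    """
    ttl = constraints.get("claim_ttl_sec")
    if claimed_at is None or ttl is None:
        return False
    current = time.time() if now is None else now
    return current - claimed_at > ttl


constraints = {"timeout_sec": 30, "claim_ttl_sec": 60, "max_retries": 3}
print(claim_expired(claimed_at=1000.0, constraints=constraints, now=1030.0))  # False
print(claim_expired(claimed_at=1000.0, constraints=constraints, now=1061.0))  # True
```

Passing now explicitly keeps the check deterministic in tests; production code would normally rely on the time.time() default.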
Tools and actions: Agents can emit an InvokeTool decision (tool_name, params). The runtime’s executor looks up the tool in an optional ToolRegistry (see converge.core.tools) and runs it; implement the Tool protocol (name, run(params)).
- class converge.core.agent.AgentState(value)
Bases: Enum
- IDLE = 'idle'
- BUSY = 'busy'
- OFFLINE = 'offline'
- ERROR = 'error'
- class converge.core.agent.Agent(identity: Identity)
Bases: object
Autonomous computational entity that interacts with the network and executes decisions.
- topics
List of topics this agent is interested in or manages.
- Type:
List[Any]
- state
Current operational state of the agent.
- Type:
AgentState
- pool_manager
Reference to the pool manager implementation.
- Type:
Optional[Any]
- task_manager
Reference to the task manager implementation.
- Type:
Optional[Any]
- __init__(identity: Identity)
Initialize a new Agent instance.
- Parameters:
identity (Identity) – The cryptographic identity for this agent.
- property id: str
Get the unique agent identifier (fingerprint).
- Returns:
The agent’s identity fingerprint.
- Return type:
str
- decide(messages: list[Any], tasks: list[Any], **kwargs: Any) → list[Any]
The core decision loop for the agent.
This method processes incoming messages and task updates to produce a list of decisions (actions) to be executed by the runtime.
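A minimal sketch of overriding decide() is shown below. It does not import converge: AgentStub and InvokeToolStub are hypothetical stand-ins that mirror only the documented signatures, messages are plain dicts rather than Message objects, and the empty-list default in the base class is an assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class InvokeToolStub:
    """Hypothetical stand-in for converge.core.decisions.InvokeTool."""
    tool_name: str
    params: dict[str, Any]


class AgentStub:
    """Hypothetical stand-in exposing only the decide() hook of Agent."""

    def decide(self, messages: list[Any], tasks: list[Any], **kwargs: Any) -> list[Any]:
        # Assumed default for this sketch: no decisions.
        return []


class EchoAgent(AgentStub):
    """Emits one tool invocation per incoming message."""

    def decide(self, messages: list[Any], tasks: list[Any], **kwargs: Any) -> list[Any]:
        decisions: list[Any] = []
        for payload in messages:
            # Each message is a plain dict here; a real Message carries a payload field.
            decisions.append(InvokeToolStub("echo", {"text": payload["text"]}))
        return decisions


decisions = EchoAgent().decide(messages=[{"text": "hello"}], tasks=[])
print(decisions)
```

The method only returns decisions; executing them is left to the runtime's executor, as described above.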
- class converge.core.identity.Identity(public_key: bytes, private_key: bytes | None, fingerprint: str)
Bases: object
Cryptographic identity for an agent.
Serves as the root of trust for an agent, enabling message signing and verification. Identities are immutable and derived from Ed25519 keypairs.
- class converge.core.message.Message(id: str = <factory>, sender: str = <factory>, recipient: str | None = None, topics: list[Topic] = <factory>, payload: dict[str, Any] = <factory>, task_id: str | None = None, timestamp: int = <factory>, signature: bytes = b'')
Bases: object
A cryptographically signed, immutable communication unit.
Payload can be arbitrary. For streaming or long-running responses, the convention is to use payload["streaming"] for streamed content or payload["progress"] for progress updates (e.g. for UIs).
- sign(identity: Identity) → Message
Sign the message using the sender’s identity. Returns a new Message instance with the signature populated.
- verify(sender_public_key: bytes) → bool
Verify the message signature against the sender’s public key.
- encrypt_payload(key: bytes) → Message
Encrypt the payload using AES-256-GCM. Returns a new Message with the encrypted payload. Requires converge.extensions.crypto.symmetric.
- class converge.core.topic.Topic(namespace: str, attributes: dict[str, Any] = <factory>, version: str = '1.0')
Bases: object
Topic for routing and semantic filtering.
- class converge.core.task.TaskState(value)
Bases: Enum
- PENDING = 'pending'
- ASSIGNED = 'assigned'
- RUNNING = 'running'
- COMPLETED = 'completed'
- FAILED = 'failed'
- CANCELLED = 'cancelled'
- class converge.core.task.Task(id: str = <factory>, objective: dict[str, Any] = <factory>, inputs: dict[str, Any] = <factory>, outputs: dict[str, Any] | None = None, constraints: dict[str, Any] = <factory>, evaluator: str = 'default', state: TaskState = TaskState.PENDING, assigned_to: str | None = None, result: Any | None = None, claimed_at: float | None = None, pool_id: str | None = None, topic: Topic | None = None, priority: int = 0, required_capabilities: list[str] = <factory>)
Bases: object
A formally defined unit of work with clear objectives, inputs, and constraints.
Conventional keys in constraints (enforced by custom logic if needed): timeout_sec, deadline (ISO or Unix), claim_ttl_sec (seconds after claim before the task returns to PENDING if not reported), max_retries, cpu, memory_mb.
- result
The final output or error descriptor.
- Type:
Optional[Any]
- topic
If set, used for routing; only agents matching this topic see the task.
- Type:
Optional[Topic]
- priority
Routing priority; higher value means higher priority when listing/sorting (default 0).
- Type:
int
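The priority ordering can be sketched with a plain sort. PrioritizedTask and by_priority are hypothetical names, and keeping submission order among equal priorities is an assumption of this sketch rather than documented behaviour.

```python
from dataclasses import dataclass


@dataclass
class PrioritizedTask:
    """Hypothetical stand-in with only the id and priority fields of Task."""
    id: str
    priority: int = 0


def by_priority(tasks):
    """Sort highest priority first; sorted() is stable, so ties keep input order."""
    return sorted(tasks, key=lambda task: task.priority, reverse=True)


queue = [
    PrioritizedTask("a"),
    PrioritizedTask("b", priority=5),
    PrioritizedTask("c"),
    PrioritizedTask("d", priority=2),
]
ordered = by_priority(queue)
print([task.id for task in ordered])  # ['b', 'd', 'a', 'c']
```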
- class converge.core.pool.Pool(id: str = <factory>, topics: list[Topic] = <factory>, admission_policy: dict[str, Any] = <factory>, admission_policy_instance: Any = None, governance: dict[str, Any] = <factory>, governance_model: Any = None, agents: set[str] = <factory>, trust_model: Any = None, trust_threshold: float = 0.0)
Bases: object
A scoped sub-network of agents organizing around shared topics or goals.
- class converge.core.capability.Capability(name: str, version: str, description: str, constraints: dict[str, Any] = <factory>, costs: dict[str, float] = <factory>, latency_ms: int = 0)
Bases: object
Defines a specific ability or tool an agent possesses.
- class converge.core.capability.CapabilitySet(capabilities: list[Capability] = <factory>)
Bases: object
A collection of capabilities possessed by an agent.
- capabilities: list[Capability]
- add(capability: Capability) → None
Add a capability to the set.
- class converge.core.store.Store
Bases: ABC
Abstract base class for persistence.
Optional put_if_absent: Override for atomic put-when-absent; default implementation is not atomic (get then put). Backends that need safe concurrency should override.
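The difference between the non-atomic default and an atomic override can be sketched as follows. StoreStub is a hypothetical stand-in for Store: the get and put method names, their signatures, and the bool return value of put_if_absent are all assumptions, and the lock-based backend is one possible approach, not the library's.

```python
import threading
from abc import ABC, abstractmethod
from typing import Any, Optional


class StoreStub(ABC):
    """Hypothetical stand-in for Store; method names are assumptions."""

    @abstractmethod
    def get(self, key: str) -> Optional[Any]: ...

    @abstractmethod
    def put(self, key: str, value: Any) -> None: ...

    def put_if_absent(self, key: str, value: Any) -> bool:
        # Non-atomic default: another writer can slip in between get and put.
        if self.get(key) is not None:
            return False
        self.put(key, value)
        return True


class LockedMemoryStore(StoreStub):
    """In-memory backend that makes put_if_absent atomic with a lock."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            return self._data.get(key)

    def put(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def put_if_absent(self, key: str, value: Any) -> bool:
        # Check and write under one lock so no other writer can interleave.
        with self._lock:
            if key in self._data:
                return False
            self._data[key] = value
            return True


store = LockedMemoryStore()
print(store.put_if_absent("task:1", "agent-a"))  # True
print(store.put_if_absent("task:1", "agent-b"))  # False
print(store.get("task:1"))  # agent-a
```

A database-backed store would more likely rely on the backend's own primitive (for example a conditional insert) than on a process-local lock.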
- class converge.core.decisions.SendMessage(message: Message)
Bases: Decision
Decision to send a single message. Carries the Message to send.
- class converge.core.decisions.SubmitTask(task: converge.core.task.Task)
Bases: Decision
- class converge.core.decisions.SubmitBid(auction_id: str, amount: float, content: Any = None)
Bases: Decision
Submit a bid to an auction. Executor calls BiddingProtocol.submit_bid.
- class converge.core.decisions.Vote(vote_id: str, option: Any)
Bases: Decision
Record a vote for a vote_id. Executor records (agent_id, option) for later resolution.
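How recorded (agent_id, option) pairs might later be resolved is not specified here. One possible approach is a plurality count, sketched below; resolve_vote is a hypothetical helper, and the rules that a later vote from the same agent replaces the earlier one and that a tie yields None are assumptions. Options must be hashable for this to work.

```python
from collections import Counter
from typing import Any, Optional


def resolve_vote(records: list[tuple[str, Any]]) -> Optional[Any]:
    """Return the plurality winner, or None if there are no votes or a tie."""
    latest: dict[str, Any] = {}
    for agent_id, option in records:
        latest[agent_id] = option  # a later vote from the same agent wins
    ranked = Counter(latest.values()).most_common(2)
    if not ranked:
        return None
    if len(ranked) == 2 and ranked[0][1] == ranked[1][1]:
        return None  # top two options are tied
    return ranked[0][0]


records = [("a1", "yes"), ("a2", "no"), ("a3", "yes"), ("a2", "yes")]
print(resolve_vote(records))  # yes
```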
- class converge.core.decisions.Propose(session_id: str, proposal_content: Any)
Bases: Decision
Make or counter a proposal in a negotiation session. Executor calls NegotiationProtocol.propose.
- class converge.core.decisions.AcceptProposal(session_id: str)
Bases: Decision
Accept the current proposal in a session. Executor calls NegotiationProtocol.accept.
- class converge.core.decisions.RejectProposal(session_id: str)
Bases: Decision
Reject the current proposal. Executor calls NegotiationProtocol.reject.
- class converge.core.decisions.Delegate(delegatee_id: str, scope: list[str])
Bases: Decision
Create a delegation to another agent. Executor calls DelegationProtocol.delegate.
- class converge.core.decisions.RevokeDelegation(delegation_id: str)
Bases: Decision
Revoke a delegation. Executor calls DelegationProtocol.revoke.
- class converge.core.decisions.InvokeTool(tool_name: str, params: dict[str, Any])
Bases: Decision
Invoke a registered tool by name with the given parameters. Executor runs the tool and may attach the result to a message or ReportTask.
Tool protocol and registry for agent tool execution.
- class converge.core.tools.Tool(*args, **kwargs)
Bases: Protocol
Protocol for executable tools. Agents emit InvokeTool decisions; the executor looks up the tool by name and runs it. Optional: implement a schema property returning a JSON Schema dict for params (used for provider tool definitions and prompt injection).
- converge.core.tools.get_tool_schema(tool: Tool) → dict[str, Any] | None
Return the tool’s param schema (JSON Schema) if defined, else None.
- class converge.core.tools.ToolRegistry
Bases: object
Registry mapping tool names to Tool instances. Used by StandardExecutor to execute InvokeTool decisions.
- to_provider_tools() → list[dict[str, Any]]
Return tool definitions in OpenAI-compatible format for provider APIs. Each tool has type “function”, “function.name”, “function.description”, “function.parameters” (JSON Schema). Tools without a schema get a generic parameters schema accepting an object.
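A minimal sketch of a tool and the provider definition described above follows. It does not use the real ToolRegistry: a plain dict stands in for the registry, to_provider_tool is a hypothetical helper, the description attribute is an assumption (the protocol as documented names only name, run(params), and an optional schema), and {"type": "object"} is one guess at the generic fallback schema.

```python
from typing import Any


class AddTool:
    """Example tool following the documented shape: name, run(params), schema."""

    name = "add"
    description = "Add two numbers."  # assumed attribute, not part of the documented protocol

    @property
    def schema(self) -> dict[str, Any]:
        # JSON Schema describing the params dict accepted by run().
        return {
            "type": "object",
            "properties": {"a": {"type": "number"}, "b": {"type": "number"}},
            "required": ["a", "b"],
        }

    def run(self, params: dict[str, Any]) -> Any:
        return params["a"] + params["b"]


def to_provider_tool(tool: Any) -> dict[str, Any]:
    """Build one OpenAI-style function definition from a tool."""
    # Fall back to a generic object schema when the tool defines none.
    schema = getattr(tool, "schema", None) or {"type": "object"}
    return {
        "type": "function",
        "function": {
            "name": tool.name,
            "description": getattr(tool, "description", ""),
            "parameters": schema,
        },
    }


registry = {AddTool.name: AddTool()}  # plain dict standing in for ToolRegistry
result = registry["add"].run({"a": 2, "b": 3})
definition = to_provider_tool(registry["add"])
print(result)  # 5
print(definition["function"]["name"])  # add
```

Looking the tool up by name and calling run(params) mirrors what the executor is described as doing for an InvokeTool decision.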