converge.core

Core abstractions: agent, identity, message, topic, task, pool, capability, store, and decisions. These types are used everywhere: identity and messages for communication, topics for routing and discovery, tasks and pools for coordination, capabilities for discovery, store for persistence, and decisions as the output of the agent’s decide() and input to the executor.

Task routing: Tasks can optionally set pool_id, topic, and required_capabilities. Only agents in the given pool and with the required capabilities see the task when the runtime uses scoped task listing (see coordination and runtime).

Task constraints: Conventional keys in task.constraints (enforced by custom logic if needed): timeout_sec, deadline, claim_ttl_sec, max_retries, cpu, memory_mb. See Task docstring and coordination docs for cancel, fail, and claim TTL.

Tools and actions: Agents can emit an InvokeTool decision (tool_name, params). The runtime’s executor looks up the tool in an optional ToolRegistry (see converge.core.tools) and runs it; implement the Tool protocol (name, run(params)).

class converge.core.agent.AgentState(value)[source]

Bases: Enum

IDLE = 'idle'
BUSY = 'busy'
OFFLINE = 'offline'
ERROR = 'error'
class converge.core.agent.Agent(identity: Identity)[source]

Bases: object

Autonomous computational entity that interacts with the network and executes decisions.

identity

Cryptographic identity of the agent.

Type:

Identity

id

Unique fingerprint of the agent identity.

Type:

str

capabilities

List of capabilities this agent possesses.

Type:

List[str]

topics

List of topics this agent is interested in or manages.

Type:

List[Any]

state

Current operational state of the agent.

Type:

AgentState

pool_manager

Reference to the pool manager implementation.

Type:

Optional[Any]

task_manager

Reference to the task manager implementation.

Type:

Optional[Any]

__init__(identity: Identity)[source]

Initialize a new Agent instance.

Parameters:

identity (Identity) – The cryptographic identity for this agent.

capabilities: list[str]
topics: list[Any]
property id: str

Get the unique agent identifier (fingerprint).

Returns:

The agent’s identity fingerprint.

Return type:

str

decide(messages: list[Any], tasks: list[Any], **kwargs: Any) list[Any][source]

The core decision loop for the agent.

This method processes incoming messages and task updates to produce a list of decisions (actions) to be executed by the runtime.

Parameters:
  • messages (list[Any]) – Validated messages from the inbox.

  • tasks (list[Any]) – Task updates or assignment requests.

  • **kwargs – Optional context, e.g. tool_observations (list[dict]) for ReAct tool loop.

Returns:

A list of Decision objects or Messages to send.

Return type:

list[Any]

on_start() None[source]

Called when the agent runtime starts. Override for setup logic.

on_stop() None[source]

Called when the agent runtime stops. Override for cleanup logic.

on_tick(messages: list[Any], tasks: list[Any]) None[source]

Called each loop iteration before decide. Override for per-tick logic.

sign_message(message: Any) Any[source]
class converge.core.identity.Identity(public_key: bytes, private_key: bytes | None, fingerprint: str)[source]

Bases: object

Cryptographic identity for an agent.

Serves as the root of trust for an agent, enabling message signing and verification. Identities are immutable and derived from Ed25519 keypairs.

public_key

Raw Ed25519 public key.

Type:

bytes

private_key

Raw Ed25519 private key (None if verifying others).

Type:

Optional[bytes]

fingerprint

Hex-encoded SHA-256 hash of the public key.

Type:

str

public_key: bytes
private_key: bytes | None
fingerprint: str
classmethod generate() Identity[source]

Generate a new random identity using Ed25519.

Returns:

A new identity with both public and private keys.

Return type:

Identity

classmethod from_public_key(public_key_bytes: bytes) Identity[source]

Create an identity from a known public key (e.g. for verifying others).

Parameters:

public_key_bytes (bytes) – The raw public key bytes.

Returns:

An identity instance (without private key).

Return type:

Identity

class converge.core.message.Message(id: str = <factory>, sender: str = <factory>, recipient: str | None = None, topics: list[Topic] = <factory>, payload: dict[str, ~typing.Any]=<factory>, task_id: str | None = None, timestamp: int = <factory>, signature: bytes = b'')[source]

Bases: object

A cryptographically signed, immutable communication unit.

Payload can be arbitrary. For streaming or long-running responses, conventions: use payload["streaming"] for streamed content or payload["progress"] for progress updates (e.g. for UIs).

id

Unique message identifier.

Type:

str

sender

Fingerprint of the sending agent.

Type:

str

topics

Topics this message is routed to.

Type:

List[Topic]

payload

The content of the message.

Type:

Dict[str, Any]

task_id

Reference to a specific task context.

Type:

Optional[str]

timestamp

Unix timestamp in milliseconds.

Type:

int

signature

Ed25519 signature of the message content.

Type:

bytes

id: str
sender: str
recipient: str | None = None
topics: list[Topic]
payload: dict[str, Any]
task_id: str | None = None
timestamp: int
signature: bytes = b''
sign(identity: Identity) Message[source]

Sign the message using the sender’s identity. Returns a new Message instance with the signature populated.

verify(sender_public_key: bytes) bool[source]

Verify the message signature against the sender’s public key.

to_bytes() bytes[source]

Serialize message to bytes using msgpack.

classmethod from_bytes(data: bytes) Message[source]

Deserialize message from msgpack bytes.

to_dict() dict[str, Any][source]
encrypt_payload(key: bytes) Message[source]

Encrypt the payload using AES-256-GCM. Returns a new Message with encrypted payload. Requires converge.extensions.crypto.symmetric.

decrypt_payload(key: bytes) Message[source]

Decrypt the payload if it was encrypted with encrypt_payload. Returns a new Message with decrypted payload.

classmethod from_dict(data: dict[str, Any]) Message[source]
class converge.core.topic.Topic(namespace: str, attributes: dict[str, ~typing.Any]=<factory>, version: str = '1.0')[source]

Bases: object

Topic for routing and semantic filtering.

namespace: str
attributes: dict[str, Any]
version: str = '1.0'
to_dict() dict[str, Any][source]
classmethod from_dict(data: dict[str, Any]) Topic[source]
class converge.core.task.TaskState(value)[source]

Bases: Enum

PENDING = 'pending'
ASSIGNED = 'assigned'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'
class converge.core.task.Task(id: str = <factory>, objective: dict[str, ~typing.Any]=<factory>, inputs: dict[str, ~typing.Any]=<factory>, outputs: dict[str, ~typing.Any] | None=None, constraints: dict[str, ~typing.Any]=<factory>, evaluator: str = 'default', state: TaskState = TaskState.PENDING, assigned_to: str | None = None, result: Any | None = None, claimed_at: float | None = None, pool_id: str | None = None, topic: Topic | None = None, priority: int = 0, required_capabilities: list[str] = <factory>)[source]

Bases: object

A formally defined unit of work with clear objectives, inputs, and constraints.

id

Unique identifier for the task.

Type:

str

objective

Structural description of the goal.

Type:

Dict[str, Any]

inputs

Data required to execute the task.

Type:

Dict[str, Any]

outputs

Resulting data after execution.

Type:

Optional[Dict[str, Any]]

constraints

Limitations or requirements. Conventional keys (enforced by

Type:

Dict[str, Any]

custom logic if needed)

timeout_sec, deadline (iso or unix), claim_ttl_sec (seconds

after claim before task returns to PENDING if not reported), max_retries, cpu, memory_mb.
evaluator

Identifier for the mechanism to validate results.

Type:

str

state

Current lifecycle state of the task.

Type:

TaskState

assigned_to

Fingerprint of the agent assigned to this task.

Type:

Optional[str]

result

The final output or error descriptor.

Type:

Optional[Any]

pool_id

If set, only agents in this pool should see the task (routing).

Type:

Optional[str]

topic

If set, used for routing; only agents matching this topic see the task.

Type:

Optional[Topic]

priority

Routing priority; higher value means higher priority when listing/sorting (default 0).

Type:

int

required_capabilities

If set, only agents with all these capabilities see the task.

Type:

List[str]

id: str
objective: dict[str, Any]
inputs: dict[str, Any]
outputs: dict[str, Any] | None = None
constraints: dict[str, Any]
evaluator: str = 'default'
state: TaskState = 'pending'
assigned_to: str | None = None
result: Any | None = None
claimed_at: float | None = None
pool_id: str | None = None
topic: Topic | None = None
priority: int = 0
required_capabilities: list[str]
class converge.core.pool.Pool(id: str = <factory>, topics: list[Topic] = <factory>, admission_policy: dict[str, ~typing.Any]=<factory>, admission_policy_instance: Any = None, governance: dict[str, ~typing.Any]=<factory>, governance_model: Any = None, agents: set[str] = <factory>, trust_model: Any = None, trust_threshold: float = 0.0)[source]

Bases: object

A scoped sub-network of agents organizing around shared topics or goals.

id

Unique identifier for the pool.

Type:

str

topics

Topics associated with this pool.

Type:

List[Topic]

admission_policy

Rules for agent admission.

Type:

Dict[str, Any]

governance

Rules for decision making within the pool.

Type:

Dict[str, Any]

agents

Set of AgentIDs (fingerprints) currently in the pool.

Type:

Set[str]

id: str
topics: list[Topic]
admission_policy: dict[str, Any]
admission_policy_instance: Any = None
governance: dict[str, Any]
governance_model: Any = None
agents: set[str]
trust_model: Any = None
trust_threshold: float = 0.0
add_agent(agent_id: str) None[source]

Add an agent to the pool.

remove_agent(agent_id: str) None[source]

Remove an agent from the pool.

class converge.core.capability.Capability(name: str, version: str, description: str, constraints: dict[str, ~typing.Any]=<factory>, costs: dict[str, float]=<factory>, latency_ms: int = 0)[source]

Bases: object

Defines a specific ability or tool an agent possesses.

name

Unique name of the capability.

Type:

str

version

Semantic version string.

Type:

str

description

Human-readable description.

Type:

str

constraints

Usage limitations.

Type:

Dict[str, Any]

costs

Resource costs associated with usage.

Type:

Dict[str, float]

latency_ms

Expected execution latency.

Type:

int

name: str
version: str
description: str
constraints: dict[str, Any]
costs: dict[str, float]
latency_ms: int = 0
class converge.core.capability.CapabilitySet(capabilities: list[Capability] = <factory>)[source]

Bases: object

A collection of capabilities possessed by an agent.

capabilities: list[Capability]
add(capability: Capability) None[source]

Add a capability to the set.

has(name: str) bool[source]

Check if a capability exists by name.

class converge.core.store.Store[source]

Bases: ABC

Abstract base class for persistence.

Optional put_if_absent: Override for atomic put-when-absent; default implementation is not atomic (get then put). Backends that need safe concurrency should override.

atomic_put_if_absent: bool = False
supports_locking: bool = False
abstractmethod put(key: str, value: Any) None[source]

Store a value.

abstractmethod get(key: str) Any | None[source]

Retrieve a value.

abstractmethod delete(key: str) None[source]

Delete a value.

abstractmethod list(prefix: str = '') list[str][source]

List keys starting with prefix.

put_if_absent(key: str, value: Any) bool[source]

Store value only if key is absent. Return True if stored, False if key existed. Default implementation is not atomic; backends should override for atomicity.

class converge.core.decisions.Decision[source]

Bases: object

Base class for agent decisions.

class converge.core.decisions.SendMessage(message: Message)[source]

Bases: Decision

Decision to send a single message. Carries the Message to send.

message: Message
class converge.core.decisions.JoinPool(pool_id: str)[source]

Bases: Decision

pool_id: str
class converge.core.decisions.LeavePool(pool_id: str)[source]

Bases: Decision

pool_id: str
class converge.core.decisions.CreatePool(spec: dict[str, Any])[source]

Bases: Decision

spec: dict[str, Any]
class converge.core.decisions.SubmitTask(task: converge.core.task.Task)[source]

Bases: Decision

task: Task
class converge.core.decisions.ClaimTask(task_id: str)[source]

Bases: Decision

task_id: str
class converge.core.decisions.ReportTask(task_id: str, result: Any)[source]

Bases: Decision

task_id: str
result: Any
class converge.core.decisions.SubmitBid(auction_id: str, amount: float, content: Any = None)[source]

Bases: Decision

Submit a bid to an auction. Executor calls BiddingProtocol.submit_bid.

auction_id: str
amount: float
content: Any = None
class converge.core.decisions.Vote(vote_id: str, option: Any)[source]

Bases: Decision

Record a vote for a vote_id. Executor records (agent_id, option) for later resolution.

vote_id: str
option: Any
class converge.core.decisions.Propose(session_id: str, proposal_content: Any)[source]

Bases: Decision

Make or counter a proposal in a negotiation session. Executor calls NegotiationProtocol.propose.

session_id: str
proposal_content: Any
class converge.core.decisions.AcceptProposal(session_id: str)[source]

Bases: Decision

Accept the current proposal in a session. Executor calls NegotiationProtocol.accept.

session_id: str
class converge.core.decisions.RejectProposal(session_id: str)[source]

Bases: Decision

Reject the current proposal. Executor calls NegotiationProtocol.reject.

session_id: str
class converge.core.decisions.Delegate(delegatee_id: str, scope: list[str])[source]

Bases: Decision

Create a delegation to another agent. Executor calls DelegationProtocol.delegate.

delegatee_id: str
scope: list[str]
class converge.core.decisions.RevokeDelegation(delegation_id: str)[source]

Bases: Decision

Revoke a delegation. Executor calls DelegationProtocol.revoke.

delegation_id: str
class converge.core.decisions.InvokeTool(tool_name: str, params: dict[str, Any])[source]

Bases: Decision

Invoke a registered tool by name with the given parameters. Executor runs the tool and may attach the result to a message or ReportTask.

tool_name: str
params: dict[str, Any]

Tool protocol and registry for agent tool execution.

class converge.core.tools.Tool(*args, **kwargs)[source]

Bases: Protocol

Protocol for executable tools. Agents emit InvokeTool decisions; the executor looks up the tool by name and runs it. Optional: implement a schema property returning a JSON Schema dict for params (used for provider tool definitions and prompt injection).

property name: str

Tool name used in InvokeTool.tool_name.

run(params: dict[str, Any]) Any[source]

Run the tool with the given parameters.

Parameters:

params – Key-value arguments for the tool.

Returns:

Result of the tool (e.g. str, dict, or serializable value).

converge.core.tools.get_tool_schema(tool: Tool) dict[str, Any] | None[source]

Return the tool’s param schema (JSON Schema) if defined, else None.

class converge.core.tools.ToolRegistry[source]

Bases: object

Registry mapping tool names to Tool instances. Used by StandardExecutor to execute InvokeTool decisions.

register(tool: Tool) None[source]

Register a tool by its name.

get(name: str) Tool | None[source]

Get a tool by name, or None if not registered.

list_names() list[str][source]

Return all registered tool names.

to_provider_tools() list[dict[str, Any]][source]

Return tool definitions in OpenAI-compatible format for provider APIs. Each tool has type “function”, “function.name”, “function.description”, “function.parameters” (JSON Schema). Tools without a schema get a generic parameters schema accepting an object.