converge.network

Network facade, discovery service, identity registry, and transport layer. AgentNetwork wraps a transport and local agent set and exposes send, broadcast, and discover. build_descriptor(agent) builds an AgentDescriptor from an agent (id, topics, capabilities) for use with discovery. DiscoveryService holds agent descriptors and answers queries by topics and capabilities; it can persist descriptors via a store. Optional trust_model enables trust-aware filtering: when set and query.trust_threshold > 0, only agents with trust score >= threshold are returned. query(query, candidates=None, refresh_from_store=False): when candidates is None, the service uses its own descriptors (optionally reloaded from store when refresh_from_store is True); pass a list to use an explicit candidate set. When AgentRuntime is constructed with discovery_service (and optionally agent_descriptor), the runtime registers the agent on start and unregisters it on stop so peers can discover it. IdentityRegistry maps agent ids to public keys for signature verification. Transports (base, local, TCP, optional WebSocket) implement start/stop, send, and receive; HookedTransport adds reusable pre_send/post_receive middleware.

converge.network.network.build_descriptor(agent: Agent) AgentDescriptor[source]

Build an AgentDescriptor from an Agent for discovery registration.

Uses the agent’s id, topics, and capabilities. If capabilities are strings, they are converted to Capability instances with default version and description.

Parameters:

agent – The agent to describe.

Returns:

AgentDescriptor suitable for DiscoveryService.register().

class converge.network.network.AgentNetwork(transport: Transport)[source]

Bases: object

Manages agent registration and message routing.

register_agent(agent: Agent) None[source]
unregister_agent(agent_id: str) None[source]
async send(message: Message) None[source]
async broadcast(message: Message) None[source]
discover(query: DiscoveryQuery) list[AgentDescriptor][source]

Registry mapping agent fingerprints to public keys for message verification.

class converge.network.identity_registry.IdentityRegistry[source]

Bases: object

Maps agent IDs (fingerprints) to Ed25519 public keys. Used by transports to verify message signatures.

register(agent_id: str, public_key: bytes) None[source]

Register an agent’s public key.

unregister(agent_id: str) None[source]

Remove an agent from the registry.

get(agent_id: str) bytes | None[source]

Get the public key for an agent, or None if not found.

Discovery service for finding agents by topic and capability.

Agents register with the discovery service on runtime start() and unregister on stop(). Peers query by DiscoveryQuery (topics, capabilities) and receive matching AgentDescriptors. When a store is provided, descriptors are persisted so they survive process restarts. AgentDescriptor can include public_key for message verification; callers that want verified receive can populate IdentityRegistry from query results (identity_registry.register(d.id, d.public_key)).

class converge.network.discovery.DiscoveryQuery(topics: list[Topic] = <factory>, capabilities: list[str] = <factory>, trust_threshold: float = 0.0)[source]

Bases: object

Query criteria for discovery: topics, capabilities, and optional trust threshold.

topics: list[Topic]
capabilities: list[str]
trust_threshold: float = 0.0
class converge.network.discovery.AgentDescriptor(id: str, topics: list[Topic], capabilities: list[Capability], public_key: bytes | None = None)[source]

Bases: object

Descriptor of an agent for discovery and optional verification.

id, topics, and capabilities are used for query matching. public_key, when set, is used by peers to verify message signatures (register in IdentityRegistry).

id: str
topics: list[Topic]
capabilities: list[Capability]
public_key: bytes | None = None
to_dict() dict[str, Any][source]

Serialize for persistence. public_key is base64-encoded if present.

classmethod from_dict(data: dict[str, Any]) AgentDescriptor[source]

Deserialize from stored dict. public_key is decoded from base64 if present.

class converge.network.discovery.DiscoveryService(store: Store | None = None, trust_model: TrustModel | None = None)[source]

Bases: object

Service for discovering agents based on queries. Persists AgentDescriptors in Store when provided.

__init__(store: Store | None = None, trust_model: TrustModel | None = None)[source]

Initialize the discovery service.

Parameters:
  • store – Optional store for persisting descriptors across restarts.

  • trust_model – Optional TrustModel for trust-aware filtering when query.trust_threshold > 0. When set, query() filters out agents whose trust score is below the threshold.

descriptors: dict[str, AgentDescriptor]
register(descriptor: AgentDescriptor) None[source]
unregister(agent_id: str) None[source]
query(query: DiscoveryQuery, candidates: list[AgentDescriptor] | None = None, *, refresh_from_store: bool = False) list[AgentDescriptor][source]

Filter agent candidates based on a discovery query.

When candidates is None, uses the service’s own descriptors (optionally refreshed from store when refresh_from_store is True and a store is set). When query.trust_threshold > 0 and trust_model is set, only agents with trust score >= threshold are returned.

Parameters:
  • query – The criteria for discovery (topics, capabilities, trust_threshold).

  • candidates – Optional list of agents to search within. If None, uses self.descriptors (after optional refresh from store).

  • refresh_from_store – If True and store is set, reload descriptors from store before filtering. Use in multi-process setups to see agents registered by other processes.

Returns:

List of agents that match the query criteria (topics, capabilities, and trust when trust_threshold > 0 and trust_model is set).

class converge.network.transport.base.Transport[source]

Bases: ABC

Abstract base class for transports. Transports are hot-swappable, stateless, and observable.

abstractmethod async send(message: Message) None[source]

Send a message.

abstractmethod async receive(timeout: float | None = None) Message[source]

Receive a message. If timeout is set, return within that many seconds or raise TimeoutError.

async receive_verified(identity_registry: IdentityRegistry, timeout: float | None = None) Message | None[source]

Receive a message and verify its signature. Returns the message if verified, or None if verification fails. If timeout is set, receive must complete within that time or TimeoutError is raised. Default implementation receives and verifies via Message.verify().

abstractmethod async start() None[source]

Start the transport.

abstractmethod async stop() None[source]

Stop the transport.

class converge.network.transport.local.LocalTransportRegistry[source]

Bases: object

Shared registry for local transports to find each other. Supports point-to-point (recipient) and topic-based routing.

register(agent_id: str, queue: Queue) None[source]
unregister(agent_id: str) None[source]
get_queue(agent_id: str) Queue | None[source]
subscribe(agent_id: str, topic_namespace: str) None[source]
unsubscribe(agent_id: str, topic_namespace: str) None[source]
get_subscribers_for_topics(topic_namespaces: list[str]) set[str][source]
clear() None[source]

Clear all registered queues (useful for testing).

class converge.network.transport.local.LocalTransport(agent_id: str)[source]

Bases: Transport

Transport for in-process memory communication.

async start() None[source]

Start the transport.

async stop() None[source]

Stop the transport.

async send(message: Message) None[source]

Send a message.

async receive(timeout: float | None = None) Message[source]

Receive a message. If timeout is set, return within that many seconds or raise TimeoutError.

class converge.network.transport.hooks.MessageHook(*args, **kwargs)[source]

Bases: Protocol

pre_send(message: Message) Message | None[source]
post_receive(message: Message) Message | None[source]
on_error(stage: str, error: Exception, context: dict[str, Any]) None[source]
class converge.network.transport.hooks.HookedTransport(base_transport: Transport, hooks: list[MessageHook] | None = None)[source]

Bases: Transport

Transport wrapper that applies message hooks around send/receive.

async start() None[source]

Start the transport.

async stop() None[source]

Stop the transport.

async send(message: Message) None[source]

Send a message.

async receive(timeout: float | None = None) Message[source]

Receive a message. If timeout is set, return within that many seconds or raise TimeoutError.

async receive_verified(identity_registry, timeout: float | None = None) Message | None[source]

Receive a message and verify its signature. Returns the message if verified, or None if verification fails. If timeout is set, receive must complete within that time or TimeoutError is raised. Default implementation receives and verifies via Message.verify().

class converge.network.transport.tcp.TcpTransport(host: str, port: int, identity_fingerprint: str, *, ssl_context: SSLContext | None = None)[source]

Bases: Transport

TCP Transport using asyncio. Uses length-prefixed framing: [4 bytes length][payload]. Supports optional TLS via ssl_context.

async start() None[source]

Start the transport.

async stop() None[source]

Stop the transport.

async send(message: Message) None[source]

Send a message.

async receive(timeout: float | None = None) Message[source]

Receive a message. If timeout is set, return within that many seconds or raise TimeoutError.