converge.network¶
Network facade, discovery service, identity registry, and transport layer. AgentNetwork wraps a transport and local agent set and exposes send, broadcast, and discover. build_descriptor(agent) builds an AgentDescriptor from an agent (id, topics, capabilities) for use with discovery. DiscoveryService holds agent descriptors and answers queries by topics and capabilities; it can persist descriptors via a store. Optional trust_model enables trust-aware filtering: when set and query.trust_threshold > 0, only agents with trust score >= threshold are returned. query(query, candidates=None, refresh_from_store=False): when candidates is None, the service uses its own descriptors (optionally reloaded from store when refresh_from_store is True); pass a list to use an explicit candidate set. When AgentRuntime is constructed with discovery_service (and optionally agent_descriptor), the runtime registers the agent on start and unregisters it on stop so peers can discover it. IdentityRegistry maps agent ids to public keys for signature verification. Transports (base, local, TCP, optional WebSocket) implement start/stop, send, and receive; HookedTransport adds reusable pre_send/post_receive middleware.
- converge.network.network.build_descriptor(agent: Agent) AgentDescriptor[source]¶
Build an AgentDescriptor from an Agent for discovery registration.
Uses the agent’s id, topics, and capabilities. If capabilities are strings, they are converted to Capability instances with default version and description.
- Parameters:
agent – The agent to describe.
- Returns:
AgentDescriptor suitable for DiscoveryService.register().
- class converge.network.network.AgentNetwork(transport: Transport)[source]¶
Bases:
objectManages agent registration and message routing.
- discover(query: DiscoveryQuery) list[AgentDescriptor][source]¶
Registry mapping agent fingerprints to public keys for message verification.
- class converge.network.identity_registry.IdentityRegistry[source]¶
Bases:
objectMaps agent IDs (fingerprints) to Ed25519 public keys. Used by transports to verify message signatures.
Discovery service for finding agents by topic and capability.
Agents register with the discovery service on runtime start() and unregister on stop(). Peers query by DiscoveryQuery (topics, capabilities) and receive matching AgentDescriptors. When a store is provided, descriptors are persisted so they survive process restarts. AgentDescriptor can include public_key for message verification; callers that want verified receive can populate IdentityRegistry from query results (identity_registry.register(d.id, d.public_key)).
- class converge.network.discovery.DiscoveryQuery(topics: list[Topic] = <factory>, capabilities: list[str] = <factory>, trust_threshold: float = 0.0)[source]¶
Bases:
objectQuery criteria for discovery: topics, capabilities, and optional trust threshold.
- class converge.network.discovery.AgentDescriptor(id: str, topics: list[Topic], capabilities: list[Capability], public_key: bytes | None = None)[source]¶
Bases:
objectDescriptor of an agent for discovery and optional verification.
id, topics, and capabilities are used for query matching. public_key, when set, is used by peers to verify message signatures (register in IdentityRegistry).
- capabilities: list[Capability]¶
- class converge.network.discovery.DiscoveryService(store: Store | None = None, trust_model: TrustModel | None = None)[source]¶
Bases:
objectService for discovering agents based on queries. Persists AgentDescriptors in Store when provided.
- __init__(store: Store | None = None, trust_model: TrustModel | None = None)[source]¶
Initialize the discovery service.
- Parameters:
store – Optional store for persisting descriptors across restarts.
trust_model – Optional TrustModel for trust-aware filtering when query.trust_threshold > 0. When set, query() filters out agents whose trust score is below the threshold.
- descriptors: dict[str, AgentDescriptor]¶
- register(descriptor: AgentDescriptor) None[source]¶
- query(query: DiscoveryQuery, candidates: list[AgentDescriptor] | None = None, *, refresh_from_store: bool = False) list[AgentDescriptor][source]¶
Filter agent candidates based on a discovery query.
When candidates is None, uses the service’s own descriptors (optionally refreshed from store when refresh_from_store is True and a store is set). When query.trust_threshold > 0 and trust_model is set, only agents with trust score >= threshold are returned.
- Parameters:
query – The criteria for discovery (topics, capabilities, trust_threshold).
candidates – Optional list of agents to search within. If None, uses self.descriptors (after optional refresh from store).
refresh_from_store – If True and store is set, reload descriptors from store before filtering. Use in multi-process setups to see agents registered by other processes.
- Returns:
List of agents that match the query criteria (topics, capabilities, and trust when trust_threshold > 0 and trust_model is set).
- class converge.network.transport.base.Transport[source]¶
Bases:
ABCAbstract base class for transports. Transports are hot-swappable, stateless, and observable.
- abstractmethod async receive(timeout: float | None = None) Message[source]¶
Receive a message. If timeout is set, return within that many seconds or raise TimeoutError.
- async receive_verified(identity_registry: IdentityRegistry, timeout: float | None = None) Message | None[source]¶
Receive a message and verify its signature. Returns the message if verified, or None if verification fails. If timeout is set, receive must complete within that time or TimeoutError is raised. Default implementation receives and verifies via Message.verify().
- class converge.network.transport.local.LocalTransportRegistry[source]¶
Bases:
objectShared registry for local transports to find each other. Supports point-to-point (recipient) and topic-based routing.
- class converge.network.transport.local.LocalTransport(agent_id: str)[source]¶
Bases:
TransportTransport for in-process memory communication.
- class converge.network.transport.hooks.HookedTransport(base_transport: Transport, hooks: list[MessageHook] | None = None)[source]¶
Bases:
TransportTransport wrapper that applies message hooks around send/receive.
- async receive(timeout: float | None = None) Message[source]¶
Receive a message. If timeout is set, return within that many seconds or raise TimeoutError.
- async receive_verified(identity_registry, timeout: float | None = None) Message | None[source]¶
Receive a message and verify its signature. Returns the message if verified, or None if verification fails. If timeout is set, receive must complete within that time or TimeoutError is raised. Default implementation receives and verifies via Message.verify().