"""
Discovery service for finding agents by topic and capability.
Agents register with the discovery service on runtime start() and unregister
on stop(). Peers query by DiscoveryQuery (topics, capabilities) and receive
matching AgentDescriptors. When a store is provided, descriptors are persisted
so they survive process restarts. AgentDescriptor can include public_key for
message verification; callers that want verified receive can populate
IdentityRegistry from query results (identity_registry.register(d.id, d.public_key)).
"""
import base64
import logging
from dataclasses import dataclass, field
from typing import Any
from converge.core.capability import Capability
from converge.core.store import Store
from converge.core.topic import Topic
from converge.policy.trust import TrustModel
logger = logging.getLogger(__name__)
[docs]
@dataclass
class DiscoveryQuery:
"""Query criteria for discovery: topics, capabilities, and optional trust threshold."""
topics: list[Topic] = field(default_factory=list)
capabilities: list[str] = field(default_factory=list)
trust_threshold: float = 0.0 # Min trust score (0.0-1.0) when trust_model set on DiscoveryService; ignored when 0.
[docs]
@dataclass
class AgentDescriptor:
"""
Descriptor of an agent for discovery and optional verification.
id, topics, and capabilities are used for query matching. public_key, when set,
is used by peers to verify message signatures (register in IdentityRegistry).
"""
id: str
topics: list[Topic]
capabilities: list[Capability]
public_key: bytes | None = None
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize for persistence. public_key is base64-encoded if present."""
out: dict[str, Any] = {
"id": self.id,
"topics": [t.to_dict() for t in self.topics],
"capabilities": [
{
"name": c.name,
"version": c.version,
"description": c.description,
"constraints": c.constraints,
"costs": c.costs,
"latency_ms": c.latency_ms,
}
for c in self.capabilities
],
}
if self.public_key is not None:
out["public_key"] = base64.b64encode(self.public_key).decode("ascii")
return out
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AgentDescriptor":
"""Deserialize from stored dict. public_key is decoded from base64 if present."""
topics = [Topic.from_dict(t) for t in data.get("topics", [])]
caps_data = data.get("capabilities", [])
capabilities = [
Capability(
name=c.get("name", ""),
version=c.get("version", "1.0"),
description=c.get("description", ""),
constraints=c.get("constraints", {}),
costs=c.get("costs", {}),
latency_ms=c.get("latency_ms", 0),
)
for c in caps_data
]
public_key = None
if "public_key" in data and data["public_key"]:
public_key = base64.b64decode(data["public_key"].encode("ascii"))
return cls(
id=data.get("id", ""),
topics=topics,
capabilities=capabilities,
public_key=public_key,
)
_DISCOVERY_PREFIX = "discovery:agent:"
[docs]
class DiscoveryService:
"""
Service for discovering agents based on queries.
Persists AgentDescriptors in Store when provided.
"""
[docs]
def __init__(self, store: Store | None = None, trust_model: TrustModel | None = None):
"""
Initialize the discovery service.
Args:
store: Optional store for persisting descriptors across restarts.
trust_model: Optional TrustModel for trust-aware filtering when
query.trust_threshold > 0. When set, query() filters out agents
whose trust score is below the threshold.
"""
self.descriptors: dict[str, AgentDescriptor] = {}
self.store = store
self.trust_model = trust_model
if store:
self._load_from_store()
def _load_from_store(self) -> None:
"""Load persisted descriptors from store."""
if not self.store:
return
keys = self.store.list(_DISCOVERY_PREFIX)
for key in keys:
val = self.store.get(key)
if isinstance(val, dict):
try:
desc = AgentDescriptor.from_dict(val)
self.descriptors[desc.id] = desc
except Exception:
pass
[docs]
def register(self, descriptor: AgentDescriptor) -> None:
self.descriptors[descriptor.id] = descriptor
if self.store:
self.store.put(
f"{_DISCOVERY_PREFIX}{descriptor.id}",
descriptor.to_dict(),
)
[docs]
def unregister(self, agent_id: str) -> None:
if agent_id in self.descriptors:
del self.descriptors[agent_id]
if self.store:
self.store.delete(f"{_DISCOVERY_PREFIX}{agent_id}")
[docs]
def query(
self,
query: DiscoveryQuery,
candidates: list[AgentDescriptor] | None = None,
*,
refresh_from_store: bool = False,
) -> list[AgentDescriptor]:
"""
Filter agent candidates based on a discovery query.
When candidates is None, uses the service's own descriptors (optionally
refreshed from store when refresh_from_store is True and a store is set).
When query.trust_threshold > 0 and trust_model is set, only agents with
trust score >= threshold are returned.
Args:
query: The criteria for discovery (topics, capabilities, trust_threshold).
candidates: Optional list of agents to search within. If None, uses
self.descriptors (after optional refresh from store).
refresh_from_store: If True and store is set, reload descriptors from
store before filtering. Use in multi-process setups to see
agents registered by other processes.
Returns:
List of agents that match the query criteria (topics, capabilities,
and trust when trust_threshold > 0 and trust_model is set).
"""
if refresh_from_store and self.store:
self._load_from_store()
if candidates is None:
candidates = list(self.descriptors.values())
results = []
for agent in candidates:
# Check topics
topic_match = True
if query.topics:
agent_topic_ids = {str(t) for t in agent.topics}
query_topic_ids = {str(t) for t in query.topics}
if not agent_topic_ids.intersection(query_topic_ids):
topic_match = False
# Check capabilities (support both Capability objects and str names from Agent)
cap_match = True
if query.capabilities:
agent_caps: set[str] = set()
for c in agent.capabilities:
name = c.name if hasattr(c, "name") else str(c)
agent_caps.add(name)
if not set(query.capabilities).issubset(agent_caps):
cap_match = False
if not (topic_match and cap_match):
continue
# Trust filter: when threshold > 0 and trust_model set, require score >= threshold
if query.trust_threshold > 0:
if self.trust_model is not None:
if self.trust_model.get_trust(agent.id) < query.trust_threshold:
continue
else:
logger.warning(
"trust_threshold > 0 but no trust_model on DiscoveryService; skipping trust filter",
)
results.append(agent)
return results