converge.extensions¶
Optional extensions: storage, crypto, LLM, connectors, and runtime rate limiting.
Storage: MemoryStore (in-memory) and FileStore (file-backed with pickle) implement the Store interface.
Crypto: encrypt/decrypt (AES-256-GCM), derive_key (PBKDF2-HMAC-SHA256), secure_random_bytes.
LLM: LLMAgent and providers (OpenAI, Anthropic, Mistral) for LLM-driven decide(); install from source with pip install -e ".[llm]".
Rate limiting: RateLimiter + RateLimitHook for token-bucket ingress/egress middleware.
Connectors: WebhookConnector/WebhookGateway for strict sidecar webhook ingress/egress.
- class converge.extensions.storage.memory.MemoryStore[source]¶
Bases:
Store
In-memory storage implementation.
- class converge.extensions.storage.file.FileStore(base_path: str, *, locking: bool = False)[source]¶
Bases:
Store
File-based storage using pickle.
Keys are encoded to safe filenames to avoid path traversal and accidental directory escaping. Writes are atomic via temp-file + replace.
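The key-encoding and atomic-write behavior described above can be sketched in plain Python. This is an illustrative sketch of the documented technique, not the FileStore internals; the helper names are hypothetical:

```python
import base64
import os
import pickle
import tempfile


def _encode_key(key: str) -> str:
    # URL-safe base64 maps arbitrary keys to safe filenames
    # (no "/" or ".."), so keys cannot escape the base directory.
    return base64.urlsafe_b64encode(key.encode("utf-8")).decode("ascii")


def atomic_put(base_path: str, key: str, value: object) -> None:
    os.makedirs(base_path, exist_ok=True)
    target = os.path.join(base_path, _encode_key(key))
    # Write to a temp file in the same directory, then atomically replace.
    fd, tmp = tempfile.mkstemp(dir=base_path)
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(value, f)
        os.replace(tmp, target)
    except BaseException:
        os.unlink(tmp)
        raise


def get_value(base_path: str, key: str) -> object:
    with open(os.path.join(base_path, _encode_key(key)), "rb") as f:
        return pickle.load(f)
```

Writing to a temp file in the same directory matters: os.replace is only atomic within one filesystem.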
Crypto extension: symmetric encryption, key derivation, secure random.
- converge.extensions.crypto.encrypt(plaintext: bytes, key: bytes, associated_data: bytes | None = None) bytes[source]¶
Encrypt plaintext with AES-256-GCM.
- Parameters:
plaintext – Data to encrypt.
key – 32-byte encryption key.
associated_data – Optional authenticated associated data (not encrypted).
- Returns:
nonce (12 bytes) + ciphertext + tag (16 bytes), concatenated.
- converge.extensions.crypto.decrypt(ciphertext: bytes, key: bytes, associated_data: bytes | None = None) bytes[source]¶
Decrypt AES-256-GCM ciphertext.
- Parameters:
ciphertext – nonce (12 bytes) + encrypted data + tag (16 bytes).
key – 32-byte decryption key.
associated_data – Same as used for encryption, if any.
- Returns:
Decrypted plaintext.
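Per the return format documented above, an encrypt() blob splits back into its parts at fixed offsets. The bytes below are placeholders illustrating the layout, not real AES-GCM output:

```python
NONCE_LEN = 12  # AES-GCM nonce, prepended by encrypt()
TAG_LEN = 16    # GCM authentication tag, appended

# Stand-in blob with the documented layout: nonce + ciphertext + tag.
blob = b"\x00" * NONCE_LEN + b"CIPHERTEXT" + b"\xff" * TAG_LEN

nonce = blob[:NONCE_LEN]
tag = blob[-TAG_LEN:]
ciphertext = blob[NONCE_LEN:-TAG_LEN]
```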
- converge.extensions.crypto.derive_key(password: str, salt: bytes, length: int = 32, iterations: int = 100000) bytes[source]¶
Derive a key from a password using PBKDF2-HMAC-SHA256.
- Parameters:
password – The password to derive from.
salt – Random salt (at least 16 bytes recommended).
length – Desired key length in bytes.
iterations – Number of PBKDF2 iterations.
- Returns:
Derived key bytes of the specified length.
- converge.extensions.crypto.secure_random_bytes(n: int) bytes[source]¶
Generate cryptographically secure random bytes.
- Parameters:
n – Number of bytes to generate.
- Returns:
Random bytes of length n.
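derive_key and secure_random_bytes map directly onto standard-library primitives; this sketch shows equivalent behavior with the documented defaults (32 bytes, 100000 iterations), not the extension's own implementation. Note derive_key takes a str password, which is encoded before hashing here:

```python
import hashlib
import secrets

# secure_random_bytes-style salt (at least 16 bytes recommended).
salt = secrets.token_bytes(16)

# PBKDF2-HMAC-SHA256 with the documented defaults.
password = "hunter2"
key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000, dklen=32)

# Same password + salt + parameters always derive the same key.
again = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000, dklen=32)
```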
LLM extension: provider abstraction and LLM-driven agent.
- class converge.extensions.llm.LLMAgent(identity: Any, provider: Any, system_prompt: str | None = None, tool_registry: Any = None, *, use_structured_output: bool = False, on_decide_error: Any = None, memory: Any = None, few_shot_examples: list[tuple[str, str]] | None = None)[source]¶
Bases:
Agent
Agent that uses an LLM to produce decisions in decide().
- __init__(identity: Any, provider: Any, system_prompt: str | None = None, tool_registry: Any = None, *, use_structured_output: bool = False, on_decide_error: Any = None, memory: Any = None, few_shot_examples: list[tuple[str, str]] | None = None)[source]¶
Initialize the LLM agent.
- Parameters:
identity – Cryptographic identity (converge.core.identity.Identity).
provider – LLM provider implementing chat(messages, **kwargs) -> str.
system_prompt – Optional override for the system prompt.
tool_registry – Optional ToolRegistry; when set, tool schemas are injected into the system prompt (fallback path).
use_structured_output – When True, request provider-native structured output (e.g. OpenAI function calling) when supported.
on_decide_error – Optional callable (exception_or_message: Exception | str) -> None for observability when decide fails.
memory – Optional ShortTermMemory (or object with append(role, content), get_messages()) for conversation history.
few_shot_examples – Optional list of (user_content, assistant_json) example pairs to inject into the prompt.
- async adecide(messages: list[Any], tasks: list[Any], tool_observations: list[dict[str, Any]] | None = None, **kwargs: Any) list[Any][source]¶
Use the LLM to produce decisions from messages and tasks.
- Parameters:
messages – Incoming messages from the inbox.
tasks – Task updates or assignments.
tool_observations – Optional list of {tool_name, params, result} from previous InvokeTool (ReAct loop).
**kwargs – Ignored; for compatibility with base decide().
- Returns:
List of Decision objects (e.g. SendMessage).
- class converge.extensions.llm.LLMProvider(*args, **kwargs)[source]¶
Bases:
Protocol
Protocol for LLM providers. Implementations produce text completions from a list of messages.
Optional streaming: implementations may provide chat_stream(messages, **kwargs) -> AsyncIterator[str] to yield tokens incrementally. When present, LLMAgent or callers can use it for progress or streaming UIs. Message payload convention for streaming: use payload["streaming"] or payload["progress"] for UI hints.
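A provider only has to satisfy chat(messages, **kwargs) -> str; chat_stream is optional. A minimal stub conforming to that protocol (useful in tests; EchoProvider is an illustrative name, not part of the library):

```python
from typing import Any, AsyncIterator


class EchoProvider:
    """Toy provider satisfying the LLMProvider protocol."""

    def chat(self, messages: list[dict[str, Any]], **kwargs: Any) -> str:
        # Echo the last message's content back as the "completion".
        return messages[-1]["content"]

    async def chat_stream(
        self, messages: list[dict[str, Any]], **kwargs: Any
    ) -> AsyncIterator[str]:
        # Optional streaming hook: yield the reply token by token.
        for token in self.chat(messages, **kwargs).split():
            yield token
```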
- class converge.extensions.llm.OpenAIProvider(api_key: str | None = None, model: str = 'gpt-4o-mini')[source]¶
Bases:
object
LLM provider using the OpenAI API. Requires openai>=1.0: pip install "converge[llm]"
- __init__(api_key: str | None = None, model: str = 'gpt-4o-mini')[source]¶
Initialize the OpenAI provider.
- Parameters:
api_key – OpenAI API key. If None, uses OPENAI_API_KEY env var.
model – Model name (e.g. gpt-4o-mini, gpt-4o).
- chat(messages: list[dict[str, Any]], **kwargs: Any) str[source]¶
Send messages to OpenAI and return the completion text. When use_structured_output=True and emit_decisions_tool is provided, uses function calling and returns the tool call arguments (JSON string of decision array).
- Parameters:
messages – List of {"role": "user"|"assistant"|"system", "content": str}.
**kwargs – Overrides (e.g. model, temperature). use_structured_output=True and emit_decisions_tool=<dict> enable structured decision output via tool call.
- Returns:
The assistant’s reply content, or the emit_decisions tool call arguments when structured.
- class converge.extensions.llm.AnthropicProvider(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514')[source]¶
Bases:
object
LLM provider using the Anthropic API. Requires anthropic>=0.18: pip install "converge[llm-anthropic]"
- __init__(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514')[source]¶
Initialize the Anthropic provider.
- Parameters:
api_key – Anthropic API key. If None, uses ANTHROPIC_API_KEY env var.
model – Model name (e.g. claude-sonnet-4-20250514, claude-3-5-haiku).
- chat(messages: list[dict[str, Any]], **kwargs: Any) str[source]¶
Send messages to Anthropic and return the completion text. When use_structured_output=True and emit_decisions_tool is provided, uses tools API and returns the tool use input (JSON string).
- Parameters:
messages – List of {"role": "user"|"assistant"|"system", "content": str}.
**kwargs – Overrides. use_structured_output and emit_decisions_tool enable structured output.
- Returns:
The assistant’s reply content, or the emit_decisions tool input when structured.
- class converge.extensions.llm.MistralProvider(api_key: str | None = None, model: str = 'mistral-small-latest')[source]¶
Bases:
object
LLM provider using the Mistral AI API. Requires mistralai>=1.0: pip install "converge[llm-mistral]"
- __init__(api_key: str | None = None, model: str = 'mistral-small-latest')[source]¶
Initialize the Mistral provider.
- Parameters:
api_key – Mistral API key. If None, uses MISTRAL_API_KEY env var.
model – Model name (e.g. mistral-small-latest, mistral-large-latest).
- chat(messages: list[dict[str, Any]], **kwargs: Any) str[source]¶
Send messages to Mistral and return the completion text. When use_structured_output=True and emit_decisions_tool is provided, uses tool calling and returns the tool call arguments (JSON string).
- Parameters:
messages – List of {"role": "user"|"assistant"|"system", "content": str}.
**kwargs – Overrides. use_structured_output and emit_decisions_tool enable structured output.
- Returns:
The assistant’s reply content, or the emit_decisions tool arguments when structured.
- class converge.extensions.rate_limit.TokenBucketConfig(capacity: 'float', refill_tokens_per_sec: 'float')[source]¶
Bases:
object
- class converge.extensions.rate_limit.RateLimiter(*, store: Store | None = None, global_config: TokenBucketConfig | None = None, sender_config: TokenBucketConfig | None = None, topic_config: TokenBucketConfig | None = None)[source]¶
Bases:
object
- class converge.extensions.rate_limit.RateLimitHook(rate_limiter: RateLimiter, *, metrics_collector: MetricsCollector | None = None)[source]¶
Bases:
MessageHook
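The RateLimiter and RateLimitHook internals are not documented here, but the token-bucket scheme behind TokenBucketConfig(capacity, refill_tokens_per_sec) can be sketched as follows; this is a generic illustration, not the extension's code:

```python
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    capacity: float
    refill_tokens_per_sec: float
    tokens: float = -1.0  # sentinel: start full
    last: float = field(default_factory=time.monotonic)

    def __post_init__(self) -> None:
        if self.tokens < 0:
            self.tokens = self.capacity

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_tokens_per_sec)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

An ingress hook would call allow() per message (keyed globally, per sender, or per topic, matching the three config slots) and drop or defer messages when it returns False.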
- class converge.extensions.connectors.CircuitBreaker(failure_threshold: 'int' = 5, recovery_timeout_sec: 'float' = 30.0, _failures: 'int' = 0, _opened_at: 'float | None' = None)[source]¶
Bases:
object
- class converge.extensions.connectors.InboundWebhookEvent(provider: 'str', source: 'str', event_id: 'str', occurred_at: 'float', subject: 'str', payload: 'dict[str, Any]', headers: 'dict[str, str]', trace_id: 'str | None' = None)[source]¶
Bases:
object
- class converge.extensions.connectors.OutboundWebhookAction(target: 'str', method: 'str', url: 'str', body: 'dict[str, Any]', headers: 'dict[str, str]'=<factory>, idempotency_key: 'str | None' = None, deadline: 'float | None' = None)[source]¶
Bases:
object
- class converge.extensions.connectors.ProviderProfile(name: 'str', secret_ref: 'str', signature_header: 'str' = 'X-Webhook-Signature', timestamp_header: 'str' = 'X-Webhook-Timestamp', event_id_field: 'str' = 'event_id', signature_algorithm: 'str' = 'sha256', canonicalization: 'str' = 'raw_body', required_payload_fields: 'tuple[str, ...]' = (), subject_field: 'str' = 'subject', source_field: 'str' = 'source', emit_as: 'str' = 'message')[source]¶
Bases:
object
- class converge.extensions.connectors.WebhookConnector(*, provider_profiles: dict[str, ProviderProfile], secrets: dict[str, str], security_policy: WebhookSecurityPolicy | None = None, retry_policy: WebhookRetryPolicy | None = None, store: Store | None = None, metrics_collector: MetricsCollector | None = None, connector_id: str = 'webhook-connector', outbound_sender: Any | None = None)[source]¶
Bases:
object
Generic webhook bridge with strict inbound verification and reliable outbound dispatch.
- async accept_inbound_http(provider: str, *, method: str, headers: dict[str, str], raw_body: bytes, remote_addr: str | None = None, client_cert_present: bool = False) tuple[int, dict[str, str], bytes][source]¶
- async submit_outbound_action(action: OutboundWebhookAction) None[source]¶
- async dispatch_outbound_action(action: OutboundWebhookAction) bool[source]¶
- class converge.extensions.connectors.WebhookGateway(connector: WebhookConnector)[source]¶
Bases:
object
HTTP-oriented facade around WebhookConnector.
- class converge.extensions.connectors.WebhookRetryPolicy(max_attempts: 'int' = 5, base_delay_sec: 'float' = 0.25, max_delay_sec: 'float' = 10.0, jitter_ratio: 'float' = 0.2, request_timeout_sec: 'float' = 5.0)[source]¶
Bases:
object
- class converge.extensions.connectors.WebhookSecurity(policy: WebhookSecurityPolicy, provider_profiles: dict[str, ProviderProfile], secrets: dict[str, str], *, store: Store | None = None)[source]¶
Bases:
object
- exception converge.extensions.connectors.WebhookSecurityError[source]¶
Bases:
ValueError
- class converge.extensions.connectors.WebhookSecurityPolicy(timestamp_skew_limit_sec: 'int' = 300, idempotency_ttl_sec: 'int' = 3600, max_payload_bytes: 'int' = 1048576, allowed_content_types: 'tuple[str, ...]' = ('application/json',), require_mtls: 'bool' = False, ip_allowlist: 'tuple[str, ...]' = (), strict_mode: 'bool' = True)[source]¶
Bases:
object
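Under the defaults above (sha256 over the raw body, X-Webhook-Signature/X-Webhook-Timestamp headers, 300 s skew limit), inbound verification follows the usual HMAC pattern. This is a sketch of that pattern, not WebhookSecurity's actual code; whether the timestamp is folded into the signed payload is provider-specific, and here only the raw body is signed:

```python
import hashlib
import hmac
import time


def verify_webhook(raw_body: bytes, headers: dict[str, str], secret: bytes,
                   *, skew_limit_sec: int = 300) -> bool:
    ts = headers.get("X-Webhook-Timestamp")
    sig = headers.get("X-Webhook-Signature")
    if ts is None or sig is None:
        return False
    # Reject stale or future-dated deliveries beyond the skew limit.
    if abs(time.time() - float(ts)) > skew_limit_sec:
        return False
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    # Constant-time comparison to avoid timing side channels.
    return hmac.compare_digest(expected, sig)
```

A full pipeline would also enforce max_payload_bytes, allowed_content_types, the IP allowlist, and event_id idempotency (deduplicated for idempotency_ttl_sec via the Store).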