converge.extensions

Optional extensions: storage, crypto, LLM, rate limiting, and connectors.

  • Storage – MemoryStore (in-memory) and FileStore (file-backed, using pickle) implement the Store interface.

  • Crypto – encrypt/decrypt (AES-256-GCM), derive_key (PBKDF2-HMAC-SHA256), and secure_random_bytes.

  • LLM – LLMAgent and providers (OpenAI, Anthropic, Mistral) for LLM-driven decide(); install from source with pip install -e ".[llm]".

  • Rate limiting – RateLimiter and RateLimitHook provide token-bucket ingress/egress middleware.

  • Connectors – WebhookConnector and WebhookGateway provide strict sidecar webhook ingress/egress.

class converge.extensions.storage.memory.MemoryStore

Bases: Store

In-memory storage implementation.

atomic_put_if_absent: bool = True
supports_locking: bool = False

put(key: str, value: Any) -> None

Store a value.

get(key: str) -> Any | None

Retrieve a value.

delete(key: str) -> None

Delete a value.

list(prefix: str = '') -> list[str]

List keys starting with prefix.

put_if_absent(key: str, value: Any) -> bool

Atomic put only if key is absent.
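The five Store operations documented above can be made concrete with a small stand-in. DictStore below is hypothetical and is not the library's implementation; it only mirrors the documented method signatures so that the semantics of prefix listing and put_if_absent are visible. The lock is an assumption about how an in-memory backend might make the check-then-write step atomic.

```python
import threading
from typing import Any, Dict, List, Optional


class DictStore:
    """Hypothetical stand-in mirroring the documented Store methods."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._lock = threading.Lock()  # assumption: guards check-then-write

    def put(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            return self._data.get(key)

    def delete(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)  # deleting a missing key is a no-op here

    def list(self, prefix: str = "") -> List[str]:
        with self._lock:
            return sorted(k for k in self._data if k.startswith(prefix))

    def put_if_absent(self, key: str, value: Any) -> bool:
        # The membership check and the write share one lock,
        # so two concurrent callers cannot both succeed.
        with self._lock:
            if key in self._data:
                return False
            self._data[key] = value
            return True


store = DictStore()
store.put("task:1", {"state": "open"})
first = store.put_if_absent("lease:task:1", "agent-a")   # key absent: stored
second = store.put_if_absent("lease:task:1", "agent-b")  # key present: rejected
```

In this sketch put_if_absent works as a simple claim primitive: only the first caller for a given key gets True, and the stored value stays that of the first caller.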

class converge.extensions.storage.file.FileStore(base_path: str, *, locking: bool = False)

Bases: Store

File-based storage using pickle.

Keys are encoded to safe filenames to avoid path traversal and accidental directory escaping. Writes are atomic via temp-file + replace.

atomic_put_if_absent: bool = True
supports_locking: bool = True

put(key: str, value: Any) -> None

Store a value.

get(key: str) -> Any | None

Retrieve a value.

delete(key: str) -> None

Delete a value.

list(prefix: str = '') -> list[str]

List keys starting with prefix.

put_if_absent(key: str, value: Any) -> bool

Store value only if key is absent. Return True if stored, False if the key already existed. The generic Store default is not atomic, and backends are expected to override it for atomicity; FileStore sets atomic_put_if_absent = True.
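The FileStore description names two techniques: encoding keys into safe filenames, and writing atomically through a temporary file followed by a replace. A minimal standard-library sketch of both follows. The helper names (key_to_filename, atomic_write_pickle) and the use of URL-safe Base64 are assumptions for illustration; the source does not say which encoding FileStore actually uses.

```python
import base64
import os
import pickle
import tempfile
from typing import Any


def key_to_filename(key: str) -> str:
    """Encode a key so the result contains no '/' or '.' characters."""
    return base64.urlsafe_b64encode(key.encode("utf-8")).decode("ascii")


def atomic_write_pickle(base_path: str, key: str, value: Any) -> str:
    """Pickle value into a temp file in base_path, then rename it into place."""
    os.makedirs(base_path, exist_ok=True)
    final_path = os.path.join(base_path, key_to_filename(key))
    fd, tmp_path = tempfile.mkstemp(dir=base_path)
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(value, f)
        # os.replace is atomic when both paths are on the same filesystem,
        # which holds here because the temp file lives in base_path.
        os.replace(tmp_path, final_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    return final_path


base = tempfile.mkdtemp()
# A key that would escape the directory if used as a path directly.
path = atomic_write_pickle(base, "../../etc/passwd", {"n": 1})
with open(path, "rb") as f:
    loaded = pickle.load(f)
```

Readers of the target file see either the old contents or the new contents, never a partially written file. Because values are pickled, the directory should contain only trusted files: unpickling untrusted data can execute arbitrary code.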

Crypto extension: symmetric encryption, key derivation, secure random.

converge.extensions.crypto.encrypt(plaintext: bytes, key: bytes, associated_data: bytes | None = None) -> bytes

Encrypt plaintext with AES-256-GCM.

Parameters:
  • plaintext – Data to encrypt.

  • key – 32-byte encryption key.

  • associated_data – Optional authenticated associated data (not encrypted).

Returns:

nonce (12 bytes) + ciphertext + tag (16 bytes), concatenated.

converge.extensions.crypto.decrypt(ciphertext: bytes, key: bytes, associated_data: bytes | None = None) -> bytes

Decrypt AES-256-GCM ciphertext.

Parameters:
  • ciphertext – nonce (12 bytes) + encrypted data + tag (16 bytes).

  • key – 32-byte decryption key.

  • associated_data – Same as used for encryption, if any.

Returns:

Decrypted plaintext.
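The documented blob layout (nonce, then ciphertext, then tag) determines how a stored value is taken apart before decryption. The sketch below only slices a blob according to the documented sizes and performs no cryptography; split_blob is a hypothetical helper, not part of converge.extensions.crypto.

```python
from typing import Tuple

NONCE_LEN = 12  # bytes, per the documented layout
TAG_LEN = 16    # bytes, per the documented layout


def split_blob(blob: bytes) -> Tuple[bytes, bytes, bytes]:
    """Split nonce + ciphertext + tag into its three parts."""
    if len(blob) < NONCE_LEN + TAG_LEN:
        raise ValueError("blob is shorter than nonce + tag")
    nonce = blob[:NONCE_LEN]
    ciphertext = blob[NONCE_LEN:len(blob) - TAG_LEN]
    tag = blob[len(blob) - TAG_LEN:]
    return nonce, ciphertext, tag


# A fake 33-byte blob: 12-byte nonce, 5-byte ciphertext, 16-byte tag.
blob = b"N" * 12 + b"hello" + b"T" * 16
nonce, ciphertext, tag = split_blob(blob)
```

GCM does not pad or expand the plaintext, so a blob in this layout is always 28 bytes longer than the plaintext it carries.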

converge.extensions.crypto.derive_key(password: str, salt: bytes, length: int = 32, iterations: int = 100000) -> bytes

Derive a key from a password using PBKDF2-HMAC-SHA256.

Parameters:
  • password – The password to derive from.

  • salt – Random salt (at least 16 bytes recommended).

  • length – Desired key length in bytes.

  • iterations – Number of PBKDF2 iterations.

Returns:

Derived key bytes of the specified length.

converge.extensions.crypto.secure_random_bytes(n: int) -> bytes

Generate cryptographically secure random bytes.

Parameters:

n – Number of bytes to generate.

Returns:

Random bytes of length n.
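For orientation, the Python standard library exposes the primitives that derive_key and secure_random_bytes are documented to provide. The sketch below calls hashlib.pbkdf2_hmac and secrets.token_bytes directly with the documented defaults (32-byte key, 100000 iterations). It is a stand-in for illustration, not the library's implementation, and it assumes the password is encoded as UTF-8.

```python
import hashlib
import secrets


def derive_key_sketch(password: str, salt: bytes, length: int = 32,
                      iterations: int = 100_000) -> bytes:
    """PBKDF2-HMAC-SHA256 via the standard library (UTF-8 password assumed)."""
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations, dklen=length
    )


salt = secrets.token_bytes(16)  # random 16-byte salt from the OS CSPRNG
key = derive_key_sketch("correct horse", salt)
same_key = derive_key_sketch("correct horse", salt)  # same inputs, same key
other_key = derive_key_sketch("correct horse", secrets.token_bytes(16))
```

The salt is not secret, but it has to be stored alongside the ciphertext: the same password and salt are needed to re-derive the 32-byte key that decrypt expects.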

LLM extension: provider abstraction and LLM-driven agent.

class converge.extensions.llm.LLMAgent(identity: Any, provider: Any, system_prompt: str | None = None, tool_registry: Any = None, *, use_structured_output: bool = False, on_decide_error: Any = None, memory: Any = None, few_shot_examples: list[tuple[str, str]] | None = None)

Bases: Agent

Agent that uses an LLM to produce decisions in decide().

__init__(identity: Any, provider: Any, system_prompt: str | None = None, tool_registry: Any = None, *, use_structured_output: bool = False, on_decide_error: Any = None, memory: Any = None, few_shot_examples: list[tuple[str, str]] | None = None)

Initialize the LLM agent.

Parameters:
  • identity – Cryptographic identity (converge.core.identity.Identity).

  • provider – LLM provider implementing chat(messages, **kwargs) -> str.

  • system_prompt – Optional override for the system prompt.

  • tool_registry – Optional ToolRegistry; when set, tool schemas are injected into the system prompt (fallback path).

  • use_structured_output – When True, request provider-native structured output (e.g. OpenAI function calling) when supported.

  • on_decide_error – Optional callable (exception_or_message: Exception | str) -> None for observability when decide fails.

  • memory – Optional ShortTermMemory (or object with append(role, content), get_messages()) for conversation history.

  • few_shot_examples – Optional list of (user_content, assistant_json) example pairs to inject into the prompt.

async adecide(messages: list[Any], tasks: list[Any], tool_observations: list[dict[str, Any]] | None = None, **kwargs: Any) -> list[Any]

Use the LLM to produce decisions from messages and tasks.

Parameters:
  • messages – Incoming messages from the inbox.

  • tasks – Task updates or assignments.

  • tool_observations – Optional list of {tool_name, params, result} from previous InvokeTool (ReAct loop).

  • **kwargs – Ignored; for compatibility with base decide().

Returns:

List of Decision objects (e.g. SendMessage).

decide(messages: list[Any], tasks: list[Any], tool_observations: list[dict[str, Any]] | None = None, **kwargs: Any) -> list[Any]

Synchronous wrapper for compatibility outside async runtimes.

In async runtimes, use await adecide(...).
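The relationship between decide() and adecide() can be pictured as a thin synchronous wrapper over a coroutine. The sketch below shows one common way to write such a wrapper (asyncio.run when no event loop is running in the current thread). It is an assumption about the general pattern, not the library's code; SketchAgent and the dict it returns are hypothetical.

```python
import asyncio
from typing import Any


class SketchAgent:
    """Hypothetical agent showing a sync wrapper around an async method."""

    async def adecide(self, messages: list[Any], tasks: list[Any]) -> list[Any]:
        await asyncio.sleep(0)  # stands in for an awaited LLM call
        return [{"type": "SendMessage", "reply_to": m} for m in messages]

    def decide(self, messages: list[Any], tasks: list[Any]) -> list[Any]:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so starting one is safe.
            return asyncio.run(self.adecide(messages, tasks))
        # asyncio.run cannot be nested inside a running loop.
        raise RuntimeError("inside an event loop; use 'await adecide(...)'")


decisions = SketchAgent().decide(["hello"], [])
```

This is why the async variant is the one to call from coroutine code: a blocking wrapper inside a running loop would either fail or stall the loop.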

class converge.extensions.llm.LLMProvider(*args, **kwargs)

Bases: Protocol

Protocol for LLM providers. Implementations produce text completions from a list of messages.

Optional streaming: implementations may provide chat_stream(messages, **kwargs) -> AsyncIterator[str] to yield tokens incrementally. When present, LLMAgent or other callers can use it for progress reporting or streaming UIs. Message payload convention for streaming: use payload["streaming"] or payload["progress"] for UI hints.

chat(messages: list[dict[str, Any]], **kwargs: Any) -> str

Send messages to the LLM and return the completion text.

Parameters:
  • messages – List of message dicts with "role" and "content" keys.

  • **kwargs – Provider-specific options (model, temperature, etc.).

Returns:

The model’s response text.

async achat(messages: list[dict[str, Any]], **kwargs: Any) -> str

Optional async variant for event-loop friendly providers.

Providers may omit this method; callers can fall back to running chat() in a worker thread.
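A provider only has to supply chat(); achat() is optional. The sketch below defines a local Protocol with the documented chat() shape, a hypothetical EchoProvider that satisfies it, and a caller that prefers achat() when it exists and otherwise runs chat() in a worker thread. ChatProvider, EchoProvider, and call_provider are illustrative names and are not part of converge.

```python
import asyncio
from typing import Any, Protocol


class ChatProvider(Protocol):
    """Local copy of the documented chat() shape, for illustration only."""

    def chat(self, messages: list[dict[str, Any]], **kwargs: Any) -> str: ...


class EchoProvider:
    """Hypothetical provider that repeats the last user message."""

    def chat(self, messages: list[dict[str, Any]], **kwargs: Any) -> str:
        users = [m["content"] for m in messages if m["role"] == "user"]
        return f"echo: {users[-1]}" if users else ""


async def call_provider(provider: ChatProvider,
                        messages: list[dict[str, Any]]) -> str:
    achat = getattr(provider, "achat", None)
    if achat is not None:
        return await achat(messages)
    # No achat(): run the blocking chat() in a worker thread
    # so the event loop stays responsive.
    return await asyncio.to_thread(provider.chat, messages)


messages = [
    {"role": "system", "content": "Reply briefly."},
    {"role": "user", "content": "ping"},
]
reply = asyncio.run(call_provider(EchoProvider(), messages))
```

A stub provider like this one is also a convenient way to exercise agent logic in tests without network access or API keys.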

class converge.extensions.llm.OpenAIProvider(api_key: str | None = None, model: str = 'gpt-4o-mini')

Bases: object

LLM provider using the OpenAI API. Requires openai>=1.0: pip install "converge[llm]"

__init__(api_key: str | None = None, model: str = 'gpt-4o-mini')

Initialize the OpenAI provider.

Parameters:
  • api_key – OpenAI API key. If None, uses the OPENAI_API_KEY environment variable.

  • model – Model name (e.g. gpt-4o-mini, gpt-4o).

chat(messages: list[dict[str, Any]], **kwargs: Any) -> str

Send messages to OpenAI and return the completion text. When use_structured_output=True and emit_decisions_tool is provided, uses function calling and returns the tool call arguments (a JSON string containing the decision array).

Parameters:
  • messages – List of {"role": "user"|"assistant"|"system", "content": str}.

  • **kwargs – Overrides (e.g. model, temperature). use_structured_output=True and emit_decisions_tool=<dict> enable structured decision output via tool call.

Returns:

The assistant’s reply content, or the emit_decisions tool call arguments when structured.

class converge.extensions.llm.AnthropicProvider(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514')

Bases: object

LLM provider using the Anthropic API. Requires anthropic>=0.18: pip install "converge[llm-anthropic]"

__init__(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514')

Initialize the Anthropic provider.

Parameters:
  • api_key – Anthropic API key. If None, uses the ANTHROPIC_API_KEY environment variable.

  • model – Model name (e.g. claude-sonnet-4-20250514, claude-3-5-haiku).

chat(messages: list[dict[str, Any]], **kwargs: Any) -> str

Send messages to Anthropic and return the completion text. When use_structured_output=True and emit_decisions_tool is provided, uses the tools API and returns the tool use input (a JSON string).

Parameters:
  • messages – List of {"role": "user"|"assistant"|"system", "content": str}.

  • **kwargs – Overrides. use_structured_output and emit_decisions_tool enable structured output.

Returns:

The assistant’s reply content, or the emit_decisions tool input when structured.

class converge.extensions.llm.MistralProvider(api_key: str | None = None, model: str = 'mistral-small-latest')

Bases: object

LLM provider using the Mistral AI API. Requires mistralai>=1.0: pip install "converge[llm-mistral]"

__init__(api_key: str | None = None, model: str = 'mistral-small-latest')

Initialize the Mistral provider.

Parameters:
  • api_key – Mistral API key. If None, uses the MISTRAL_API_KEY environment variable.

  • model – Model name (e.g. mistral-small-latest, mistral-large-latest).

chat(messages: list[dict[str, Any]], **kwargs: Any) -> str

Send messages to Mistral and return the completion text. When use_structured_output=True and emit_decisions_tool is provided, uses tool calling and returns the tool call arguments (a JSON string).

Parameters:
  • messages – List of {"role": "user"|"assistant"|"system", "content": str}.

  • **kwargs – Overrides. use_structured_output and emit_decisions_tool enable structured output.

Returns:

The assistant’s reply content, or the emit_decisions tool arguments when structured.

class converge.extensions.rate_limit.TokenBucketConfig(capacity: float, refill_tokens_per_sec: float)

Bases: object

capacity: float
refill_tokens_per_sec: float

class converge.extensions.rate_limit.RateLimiter(*, store: Store | None = None, global_config: TokenBucketConfig | None = None, sender_config: TokenBucketConfig | None = None, topic_config: TokenBucketConfig | None = None)

Bases: object

allow_message(message: Message, *, direction: str) -> bool

class converge.extensions.rate_limit.RateLimitHook(rate_limiter: RateLimiter, *, metrics_collector: MetricsCollector | None = None)

Bases: MessageHook

pre_send(message: Message) -> Message | None
post_receive(message: Message) -> Message | None
on_error(stage: str, error: Exception, context: dict[str, Any]) -> None
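The two TokenBucketConfig fields map directly onto the classic token-bucket algorithm: a bucket holds at most capacity tokens, refills at refill_tokens_per_sec, and each allowed message spends one token. The sketch below is a generic version of that algorithm, not the RateLimiter implementation; BucketConfig and TokenBucket are hypothetical names, and time is passed in explicitly so the behaviour is deterministic.

```python
from dataclasses import dataclass


@dataclass
class BucketConfig:
    """Mirrors the two documented TokenBucketConfig fields."""
    capacity: float
    refill_tokens_per_sec: float


class TokenBucket:
    def __init__(self, config: BucketConfig, now: float) -> None:
        self.config = config
        self.tokens = config.capacity  # the bucket starts full
        self.updated_at = now

    def allow(self, now: float, cost: float = 1.0) -> bool:
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(
            self.config.capacity,
            self.tokens + elapsed * self.config.refill_tokens_per_sec,
        )
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = TokenBucket(BucketConfig(capacity=2, refill_tokens_per_sec=1), now=0.0)
burst = [bucket.allow(now=0.0) for _ in range(3)]  # two pass, the third is refused
after_wait = bucket.allow(now=1.0)                 # one token refilled after 1 s
```

In a running system the now argument would typically come from time.monotonic(). The separate global, sender, and topic configs in the RateLimiter signature suggest one bucket per scope, though how they are combined is not stated here.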
class converge.extensions.connectors.CircuitBreaker(failure_threshold: int = 5, recovery_timeout_sec: float = 30.0, _failures: int = 0, _opened_at: float | None = None)

Bases: object

failure_threshold: int = 5
recovery_timeout_sec: float = 30.0

allow_request() -> bool
record_success() -> None
record_failure() -> None
property is_open: bool
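The CircuitBreaker members are listed without descriptions, so the sketch below shows how a breaker with these fields commonly behaves: it opens after failure_threshold consecutive failures, refuses requests while open, and lets traffic through again once recovery_timeout_sec has elapsed. This is a generic illustration under those assumptions, not the library's implementation; BreakerSketch and its injectable clock are hypothetical.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class BreakerSketch:
    failure_threshold: int = 5
    recovery_timeout_sec: float = 30.0
    clock: Callable[[], float] = time.monotonic  # injectable for tests (assumption)
    _failures: int = 0
    _opened_at: Optional[float] = None

    @property
    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        # Open only until the recovery timeout has passed.
        return self.clock() - self._opened_at < self.recovery_timeout_sec

    def allow_request(self) -> bool:
        return not self.is_open

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self.clock()  # (re)open and restart the timer


now = [0.0]  # fake clock so the example does not have to sleep
breaker = BreakerSketch(failure_threshold=2, clock=lambda: now[0])
breaker.record_failure()
closed_after_one = breaker.allow_request()
breaker.record_failure()
open_after_two = breaker.is_open
now[0] = 31.0
allowed_after_timeout = breaker.allow_request()
```

In this sketch a failure on the first request after the timeout reopens the breaker immediately, because the failure count is only reset by record_success().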
class converge.extensions.connectors.InboundWebhookEvent(provider: str, source: str, event_id: str, occurred_at: float, subject: str, payload: dict[str, Any], headers: dict[str, str], trace_id: str | None = None)

Bases: object

provider: str
source: str
event_id: str
occurred_at: float
subject: str
payload: dict[str, Any]
headers: dict[str, str]
trace_id: str | None = None

class converge.extensions.connectors.OutboundWebhookAction(target: str, method: str, url: str, body: dict[str, Any], headers: dict[str, str] = <factory>, idempotency_key: str | None = None, deadline: float | None = None)

Bases: object

target: str
method: str
url: str
body: dict[str, Any]
headers: dict[str, str]
idempotency_key: str | None = None
deadline: float | None = None

class converge.extensions.connectors.ProviderProfile(name: str, secret_ref: str, signature_header: str = 'X-Webhook-Signature', timestamp_header: str = 'X-Webhook-Timestamp', event_id_field: str = 'event_id', signature_algorithm: str = 'sha256', canonicalization: str = 'raw_body', required_payload_fields: tuple[str, ...] = (), subject_field: str = 'subject', source_field: str = 'source', emit_as: str = 'message')

Bases: object

name: str
secret_ref: str
signature_header: str = 'X-Webhook-Signature'
timestamp_header: str = 'X-Webhook-Timestamp'
event_id_field: str = 'event_id'
signature_algorithm: str = 'sha256'
canonicalization: str = 'raw_body'
required_payload_fields: tuple[str, ...] = ()
subject_field: str = 'subject'
source_field: str = 'source'
emit_as: str = 'message'

class converge.extensions.connectors.WebhookConnector(*, provider_profiles: dict[str, ProviderProfile], secrets: dict[str, str], security_policy: WebhookSecurityPolicy | None = None, retry_policy: WebhookRetryPolicy | None = None, store: Store | None = None, metrics_collector: MetricsCollector | None = None, connector_id: str = 'webhook-connector', outbound_sender: Any | None = None)

Bases: object

Generic webhook bridge with strict inbound verification and reliable outbound dispatch.

async accept_inbound_http(provider: str, *, method: str, headers: dict[str, str], raw_body: bytes, remote_addr: str | None = None, client_cert_present: bool = False) -> tuple[int, dict[str, str], bytes]
async submit_outbound_action(action: OutboundWebhookAction) -> None
async submit_outbound_message(message: Message) -> None
async dispatch_outbound_action(action: OutboundWebhookAction) -> bool
async run_outbound_dispatcher(*, poll_interval_sec: float = 0.1) -> None
stop_dispatcher() -> None

class converge.extensions.connectors.WebhookGateway(connector: WebhookConnector)

Bases: object

HTTP-oriented facade around WebhookConnector.

is_healthy() -> bool
is_ready() -> bool
async handle_post(provider: str, *, headers: dict[str, str], body: bytes, remote_addr: str | None = None, client_cert_present: bool = False) -> tuple[int, dict[str, str], bytes]
async handle_outbound_payload(payload: dict[str, Any]) -> tuple[int, bytes]
health_payload() -> bytes
ready_payload() -> bytes
metrics_payload() -> bytes

class converge.extensions.connectors.WebhookRetryPolicy(max_attempts: int = 5, base_delay_sec: float = 0.25, max_delay_sec: float = 10.0, jitter_ratio: float = 0.2, request_timeout_sec: float = 5.0)

Bases: object

max_attempts: int = 5
base_delay_sec: float = 0.25
max_delay_sec: float = 10.0
jitter_ratio: float = 0.2
request_timeout_sec: float = 5.0

compute_delay(attempt: int) -> float
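The WebhookRetryPolicy fields (base delay, maximum delay, jitter ratio) suggest exponential backoff with jitter, although the formula behind compute_delay is not documented here. The sketch below is one plausible reading under explicit assumptions: attempts are 1-based, the delay doubles on each attempt, it is capped at max_delay_sec, and jitter is applied symmetrically as a fraction of the capped delay. RetrySketch is a hypothetical name.

```python
import random
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RetrySketch:
    max_attempts: int = 5
    base_delay_sec: float = 0.25
    max_delay_sec: float = 10.0
    jitter_ratio: float = 0.2

    def compute_delay(self, attempt: int,
                      rng: Optional[random.Random] = None) -> float:
        """Delay before retry number `attempt` (1-based, an assumption)."""
        if rng is None:
            rng = random.Random()
        # Doubling backoff, capped at max_delay_sec.
        backoff = min(self.max_delay_sec, self.base_delay_sec * 2 ** (attempt - 1))
        # Symmetric jitter: up to +/- jitter_ratio of the capped delay.
        jitter = backoff * self.jitter_ratio * rng.uniform(-1.0, 1.0)
        return max(0.0, backoff + jitter)


policy = RetrySketch(jitter_ratio=0.0)  # jitter disabled to show the bare schedule
schedule = [policy.compute_delay(n) for n in range(1, 8)]
jittered = RetrySketch().compute_delay(3, random.Random(0))
```

With jitter disabled and the documented defaults, the delays in this sketch grow from 0.25 s and are clamped at 10 s from the seventh attempt on. Jitter spreads out retries from many senders so they do not all hit a recovering endpoint at the same instant.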
class converge.extensions.connectors.WebhookSecurity(policy: WebhookSecurityPolicy, provider_profiles: dict[str, ProviderProfile], secrets: dict[str, str], *, store: Store | None = None)

Bases: object

validate_and_extract(provider: str, *, method: str, headers: dict[str, str], raw_body: bytes, remote_addr: str | None = None, client_cert_present: bool = False) -> tuple[ProviderProfile, dict[str, Any], str, float]

exception converge.extensions.connectors.WebhookSecurityError

Bases: ValueError

class converge.extensions.connectors.WebhookSecurityPolicy(timestamp_skew_limit_sec: int = 300, idempotency_ttl_sec: int = 3600, max_payload_bytes: int = 1048576, allowed_content_types: tuple[str, ...] = ('application/json',), require_mtls: bool = False, ip_allowlist: tuple[str, ...] = (), strict_mode: bool = True)

Bases: object

timestamp_skew_limit_sec: int = 300
idempotency_ttl_sec: int = 3600
max_payload_bytes: int = 1048576
allowed_content_types: tuple[str, ...] = ('application/json',)
require_mtls: bool = False
ip_allowlist: tuple[str, ...] = ()
strict_mode: bool = True
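The ProviderProfile defaults (signature_algorithm 'sha256', canonicalization 'raw_body') together with the policy's timestamp_skew_limit_sec and max_payload_bytes point to a familiar inbound check: size limit, timestamp freshness, then an HMAC comparison over the raw body. The sketch below illustrates that sequence with the standard library. The exact signing scheme is an assumption (hex HMAC-SHA256 over the raw body only, timestamp as Unix seconds), and sign_body and verify_webhook are hypothetical helpers, not the WebhookSecurity implementation.

```python
import hashlib
import hmac


def sign_body(secret: str, raw_body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw request body (assumed scheme)."""
    return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()


def verify_webhook(secret: str, headers: dict[str, str], raw_body: bytes,
                   now: float, skew_limit_sec: int = 300,
                   max_payload_bytes: int = 1_048_576) -> bool:
    if len(raw_body) > max_payload_bytes:
        return False
    try:
        # Header names follow the documented defaults; a real handler
        # would look them up case-insensitively.
        sent_at = float(headers["X-Webhook-Timestamp"])
        signature = headers["X-Webhook-Signature"]
    except (KeyError, ValueError):
        return False
    if abs(now - sent_at) > skew_limit_sec:
        return False  # too old, or too far in the future
    expected = sign_body(secret, raw_body)
    # Constant-time comparison on bytes avoids leaking match length via timing.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


body = b'{"event_id": "evt-1", "subject": "build.finished"}'
headers = {
    "X-Webhook-Timestamp": "1000",
    "X-Webhook-Signature": sign_body("s3cret", body),
}
ok = verify_webhook("s3cret", headers, body, now=1010.0)
stale = verify_webhook("s3cret", headers, body, now=2000.0)
tampered = verify_webhook("s3cret", headers, body + b" ", now=1010.0)
```

Verifying against the raw bytes rather than re-serialized JSON matters because any change in whitespace or key order alters the digest. The idempotency_ttl_sec field suggests that event IDs are also remembered for a period to reject replays, which this sketch does not cover.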