converge.observability

Logging, tracing, metrics, and replay:
  • Logging: JsonFormatter for structured logs, plus configure_logging, get_logger, and log_struct.
  • Tracing: the trace context manager and get_current_trace_id for span tracking. An optional SpanExporter, registered with register_span_exporter, has its export(span, duration_sec) method called whenever a trace() context exits.
  • Metrics: MetricsCollector keeps counters and gauges and provides snapshot(); format_prometheus() returns the Prometheus text exposition format for scrape endpoints. CoordinationMetrics provides stable task and pool metric helpers on top of a collector.
  • Replay: ReplayLog records directional message events (record_inbound, record_outbound, and the compatibility alias record_message), and ReplayRunner replays filtered events into inbox or callback targets in deterministic timestamp order.

Operations: AgentRuntime accepts optional health_check and ready_check callables, reported through is_healthy() and is_ready(), and RuntimeOpsServer exposes /health, /ready, and /metrics endpoints using the standard-library HTTP server.

class converge.observability.logging.JsonFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)

Bases: Formatter

Formatter that outputs JSON strings for structured logging.

format(record: LogRecord) → str

Format the specified record as a JSON string.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
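To make "JSON strings for structured logging" concrete, here is a minimal sketch of a JSON formatter built on logging.Formatter. It assumes a flat JSON object with timestamp, level, logger, and message keys, plus an exception key when exc_info is set; the class name SketchJsonFormatter and every key name are hypothetical, and converge's JsonFormatter may emit different fields.

```python
import json
import logging


class SketchJsonFormatter(logging.Formatter):
    """Hypothetical JSON formatter; not converge's actual implementation."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            # formatTime() renders record.created with datefmt (or the default format).
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            # getMessage() merges record.msg with record.args, as the base class does.
            "message": record.getMessage(),
        }
        if record.exc_info:
            # formatException() returns the full traceback as one string.
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


record = logging.LogRecord(
    name="demo", level=logging.INFO, pathname="demo.py", lineno=1,
    msg="hello %s", args=("world",), exc_info=None,
)
line = SketchJsonFormatter().format(record)
print(line)
```

Because the payload is a plain dict passed to json.dumps, each record becomes exactly one line of valid JSON, which is what log shippers that split on newlines expect.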

converge.observability.logging.configure_logging(level: int = 20, *, json_format: bool = False) → None

Configure the root logger with stream handlers and formatting.

Removes existing handlers to avoid duplication.

Parameters:
  • level (int) – The logging level to set (e.g. logging.INFO).

  • json_format (bool) – If True, use structured JSON logging. If False, use text format.

converge.observability.logging.get_logger(name: str) → Logger

Retrieve a named logger instance.

Parameters:

name (str) – The name of the logger (typically __name__).

Returns:

The configured logger instance.

Return type:

logging.Logger

converge.observability.logging.log_struct(logger: Logger, level: int, message: str, **kwargs: Any) → None

Log a message with structured data as extra fields.

This ensures that when JSON formatting is enabled, the kwargs appear as top-level keys in the JSON output.

Parameters:
  • logger (logging.Logger) – The logger instance to use.

  • level (int) – The severity level of the log message.

  • message (str) – The primary log message.

  • **kwargs – Arbitrary key-value pairs to attach to the log context.
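One common way to make keyword arguments appear as top-level JSON keys is to pass them through the stdlib `extra` mechanism, which sets each key as an attribute on the LogRecord, and have the formatter copy any non-standard attributes. The sketch below assumes that mechanism; log_struct_sketch and ExtraFieldsJsonFormatter are hypothetical names, and converge's log_struct may work differently.

```python
import io
import json
import logging

# Attribute names present on every LogRecord; anything else arrived via `extra`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}


class ExtraFieldsJsonFormatter(logging.Formatter):
    """Hypothetical formatter that lifts `extra` fields to top-level JSON keys."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)


def log_struct_sketch(logger: logging.Logger, level: int, message: str, **kwargs) -> None:
    # The stdlib copies each `extra` key onto the LogRecord as an attribute.
    logger.log(level, message, extra=kwargs)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(ExtraFieldsJsonFormatter())
logger = logging.getLogger("sketch.struct")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

log_struct_sketch(logger, logging.INFO, "task claimed", task_id="t-42", attempt=2)
```

With this mechanism, a key that collides with a built-in LogRecord attribute (or with "message"/"asctime") makes the stdlib raise KeyError, so field names such as name or msg cannot be used.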

class converge.observability.tracing.SpanExporter(*args, **kwargs)

Bases: Protocol

Protocol for exporting spans when a trace() context exits.

export(span: Span, duration_sec: float) → None

Called with the span and its duration in seconds.
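Because SpanExporter is a Protocol, any object with a matching export() method satisfies it without inheriting from it. The sketch below uses a stand-in Span and a stand-in Protocol (marked runtime_checkable only so the demo can use isinstance); InMemoryExporter is a hypothetical name, and converge's own Protocol may not support isinstance checks.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Protocol, runtime_checkable


@dataclass
class Span:
    """Stand-in mirroring the documented Span fields."""
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_id: Optional[str] = None
    name: str = "operation"


@runtime_checkable
class SpanExporter(Protocol):
    def export(self, span: Span, duration_sec: float) -> None: ...


class InMemoryExporter:
    """Hypothetical exporter that keeps (name, duration) pairs, e.g. for tests."""

    def __init__(self) -> None:
        self.records = []

    def export(self, span: Span, duration_sec: float) -> None:
        self.records.append((span.name, duration_sec))


exporter = InMemoryExporter()
assert isinstance(exporter, SpanExporter)  # structural match, no inheritance needed
start = time.perf_counter()
exporter.export(Span(trace_id="abc", name="handle_message"), time.perf_counter() - start)
```

An exporter like this is convenient in tests; a production exporter would forward the span and duration to a tracing backend instead.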

converge.observability.tracing.get_current_trace_id() → str | None

Get the current trace ID from context.

converge.observability.tracing.set_trace_id(trace_id: str | None = None) → None

Set the current trace ID in context. Pass None to clear.

converge.observability.tracing.register_span_exporter(exporter: SpanExporter | None) → None

Register a span exporter for the current context. When a trace() context exits, the registered exporter's export(span, duration_sec) method is called.

converge.observability.tracing.new_trace_id() → str

Generate a new trace ID.
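"Trace ID in context" suggests per-task state of the kind contextvars provides. The sketch below re-creates the three helpers on a ContextVar, assuming that storage and assuming uuid4 hex IDs; neither is confirmed by this reference, and the functions are illustrative re-implementations, not converge's code.

```python
import contextvars
import uuid
from typing import Optional

# Hypothetical storage; converge's actual mechanism is not documented here.
_trace_id = contextvars.ContextVar("trace_id", default=None)


def new_trace_id() -> str:
    return uuid.uuid4().hex  # assumption: 32-character hex IDs


def set_trace_id(trace_id: Optional[str] = None) -> None:
    _trace_id.set(trace_id)  # passing None clears the current ID


def get_current_trace_id() -> Optional[str]:
    return _trace_id.get()


def run_isolated() -> Optional[str]:
    # A copied context starts with the caller's values; changes stay inside the copy.
    set_trace_id("inner")
    return get_current_trace_id()


set_trace_id("outer")
inner = contextvars.copy_context().run(run_isolated)
```

ContextVar values are isolated per asyncio task (each task runs in a copy of the context), so concurrent handlers do not see each other's trace IDs.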

class converge.observability.tracing.Span(trace_id: str, span_id: str = <factory>, parent_id: str | None = None, name: str = 'operation')

Bases: object

Represents a unit of work within a trace.

trace_id

The global ID of the trace.

Type:

str

span_id

The unique ID of this span.

Type:

str

parent_id

The ID of the parent span (if any).

Type:

Optional[str]

name

The name of the operation being traced.

Type:

str

trace_id: str
span_id: str
parent_id: str | None = None
name: str = 'operation'

converge.observability.tracing.trace(operation_name: str) → Span

Start a new trace span.

If a trace context already exists, the new span uses the existing trace ID. Otherwise, a new trace ID is generated.

Parameters:

operation_name (str) – Simple description of the operation.

Returns:

A Span object usable as a context manager.

Return type:

Span

class converge.observability.metrics.MetricsCollector

Bases: object

Collects and aggregates operational metrics.

inc(metric_name: str, value: int = 1) → None

Increment a counter.

Parameters:
  • metric_name (str) – Name of the metric.

  • value (int) – Amount to increment.

gauge(metric_name: str, value: float) → None

Set a gauge value.

Parameters:
  • metric_name (str) – Name of the metric.

  • value (float) – Current value.

snapshot() → dict[str, Any]

Return a snapshot of all metrics.

format_prometheus() → str

Return metrics in Prometheus text exposition format for scrape endpoints. Counters are emitted as gauge-style lines (current value); gauges are emitted as-is. Expose the result from an HTTP endpoint such as /metrics in user code; RuntimeOpsServer also serves a /metrics endpoint.
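A minimal sketch of a counter/gauge collector with the documented method names, assuming a snapshot shape of {"counters": ..., "gauges": ...} and one "# TYPE name gauge" line per metric in the Prometheus output. MetricsCollectorSketch, the snapshot layout, and the TYPE lines are all assumptions; converge's exact output may differ.

```python
import threading
from typing import Any


class MetricsCollectorSketch:
    """Hypothetical counter/gauge store mirroring the documented methods."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # inc()/gauge() may be called from several threads
        self._counters = {}
        self._gauges = {}

    def inc(self, metric_name: str, value: int = 1) -> None:
        with self._lock:
            self._counters[metric_name] = self._counters.get(metric_name, 0) + value

    def gauge(self, metric_name: str, value: float) -> None:
        with self._lock:
            self._gauges[metric_name] = value

    def snapshot(self) -> dict[str, Any]:
        with self._lock:
            return {"counters": dict(self._counters), "gauges": dict(self._gauges)}

    def format_prometheus(self) -> str:
        snap = self.snapshot()
        lines = []
        # Counters are written gauge-style (current value), matching the description above.
        for name, value in sorted({**snap["counters"], **snap["gauges"]}.items()):
            lines.append(f"# TYPE {name} gauge")
            lines.append(f"{name} {value}")
        return "\n".join(lines) + "\n"


m = MetricsCollectorSketch()
m.inc("tasks_submitted")
m.inc("tasks_submitted", 2)
m.gauge("pool_size", 4.0)
text = m.format_prometheus()
print(text, end="")
```

The exposition format requires a trailing newline after the last sample, which is why the joined lines end with "\n".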

class converge.observability.replay.ReplayLog

Bases: object

Manages the recording and playback of system events for debugging and analysis.

record_inbound(message: Message, *, agent_id: str | None = None, transport: str | None = None) → None

Record an inbound message event.

record_outbound(message: Message, *, agent_id: str | None = None, transport: str | None = None) → None

Record an outbound message event.

record_message(message: Message) → None

Compatibility alias for record_outbound(): records message as an outbound event. Unlike record_outbound(), it does not accept agent_id or transport.

export(filepath: str) → None

Export the log to a file.

load(filepath: str) → None

Load a log from a file.
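The event schema and file format used by ReplayLog are not documented here, so the following is only a sketch under stated assumptions: each event is a dict with an integer nanosecond "ts", a "direction", "agent_id", "transport", and the message (a plain dict standing in for Message); export() writes JSON; and load() replaces the in-memory events. ReplayLogSketch is a hypothetical name.

```python
import json
import os
import tempfile
import time
from typing import Any, Optional


class ReplayLogSketch:
    """Hypothetical event log; the real schema and file format may differ."""

    def __init__(self) -> None:
        self.events = []

    def _record(self, direction: str, message: dict[str, Any],
                agent_id: Optional[str], transport: Optional[str]) -> None:
        self.events.append({
            "ts": time.time_ns(),  # assumption: integer nanosecond timestamps
            "direction": direction,
            "agent_id": agent_id,
            "transport": transport,
            "message": message,
        })

    def record_inbound(self, message: dict[str, Any], *, agent_id: Optional[str] = None,
                       transport: Optional[str] = None) -> None:
        self._record("inbound", message, agent_id, transport)

    def record_outbound(self, message: dict[str, Any], *, agent_id: Optional[str] = None,
                        transport: Optional[str] = None) -> None:
        self._record("outbound", message, agent_id, transport)

    def record_message(self, message: dict[str, Any]) -> None:
        # Compatibility alias: an outbound record with no agent_id or transport.
        self.record_outbound(message)

    def export(self, filepath: str) -> None:
        with open(filepath, "w", encoding="utf-8") as fh:
            json.dump(self.events, fh)

    def load(self, filepath: str) -> None:
        with open(filepath, encoding="utf-8") as fh:
            self.events = json.load(fh)  # assumption: load replaces existing events


log = ReplayLogSketch()
log.record_inbound({"body": "ping"}, agent_id="a1", transport="local")
log.record_message({"body": "pong"})
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "replay.json")
    log.export(path)
    restored = ReplayLogSketch()
    restored.load(path)
```

Recording the direction on every event is what lets a replay later select only inbound or only outbound traffic.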

class converge.observability.replay.ReplayRunner(replay_log: ReplayLog)

Bases: object

Replays message events from ReplayLog into an inbox or callback.

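async replay(*, inbox=None, callback=None, direction: str | None = None, agent_id: str | None = None, start_ts: int | None = None, end_ts: int | None = None, dry_run: bool = False) → list[dict[str, Any]]

Replay recorded message events into inbox and/or callback in deterministic timestamp order. Events can be filtered by direction, agent_id, and a start_ts/end_ts timestamp window.
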
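The filter-sort-deliver flow described above can be sketched as a standalone coroutine. Assumptions, none confirmed by this reference: events are dicts with "ts", "direction", "agent_id", and "message" keys; an asyncio.Queue stands in for the inbox; the timestamp bounds are inclusive; the callback may be sync or async; and dry_run returns the selection without delivering it. replay_events is a hypothetical name.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


async def replay_events(
    events: list[dict[str, Any]],
    *,
    inbox: Optional[asyncio.Queue] = None,
    callback: Optional[Callable[[dict[str, Any]], Any]] = None,
    direction: Optional[str] = None,
    agent_id: Optional[str] = None,
    start_ts: Optional[int] = None,
    end_ts: Optional[int] = None,
    dry_run: bool = False,
) -> list[dict[str, Any]]:
    """Hypothetical filter-sort-deliver loop; not converge's ReplayRunner."""
    selected = [
        e for e in events
        if (direction is None or e["direction"] == direction)
        and (agent_id is None or e["agent_id"] == agent_id)
        and (start_ts is None or e["ts"] >= start_ts)  # assumption: inclusive bounds
        and (end_ts is None or e["ts"] <= end_ts)
    ]
    # list.sort() is stable, so events with equal timestamps keep their recorded order.
    selected.sort(key=lambda e: e["ts"])
    if dry_run:
        return selected  # report what would be replayed without delivering anything
    for event in selected:
        if inbox is not None:
            await inbox.put(event["message"])
        if callback is not None:
            result = callback(event)
            if inspect.isawaitable(result):
                await result  # accept both sync and async callbacks
    return selected


events = [
    {"ts": 30, "direction": "inbound", "agent_id": "a1", "message": "third"},
    {"ts": 10, "direction": "inbound", "agent_id": "a1", "message": "first"},
    {"ts": 20, "direction": "outbound", "agent_id": "a1", "message": "reply"},
    {"ts": 15, "direction": "inbound", "agent_id": "a2", "message": "other"},
]


async def main():
    inbox = asyncio.Queue()
    seen = []
    replayed = await replay_events(
        events, inbox=inbox, callback=seen.append, direction="inbound", agent_id="a1"
    )
    drained = [inbox.get_nowait() for _ in range(inbox.qsize())]
    return replayed, drained, seen


replayed, drained, seen = asyncio.run(main())
```

Sorting with a stable sort on the timestamp is what makes the order deterministic even when the log was recorded out of order.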
class converge.observability.coordination_metrics.CoordinationMetrics(collector: MetricsCollector)

Bases: object

Stable task and pool metric helpers backed by the given MetricsCollector.

task_submitted() → None
task_claimed() → None
task_completed() → None
task_failed() → None
task_cancelled() → None
pool_created() → None
pool_join() → None
pool_leave() → None
pool_size(size: int) → None
pending_tasks(size: int) → None

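The point of "stable" helpers is that metric names are spelled once, inside the wrapper, rather than at every call site. The sketch below covers a subset of the methods above; TinyCollector, CoordinationMetricsSketch, and every metric name are hypothetical, since converge's actual names are not documented here. The mapping of events to counters and sizes to gauges follows the method signatures (no-argument methods vs. methods taking size).

```python
from collections import Counter


class TinyCollector:
    """Hypothetical stand-in with the documented inc()/gauge() shape."""

    def __init__(self) -> None:
        self.counters = Counter()
        self.gauges = {}

    def inc(self, metric_name: str, value: int = 1) -> None:
        self.counters[metric_name] += value

    def gauge(self, metric_name: str, value: float) -> None:
        self.gauges[metric_name] = value


class CoordinationMetricsSketch:
    """Wraps a collector so call sites never spell metric names by hand."""

    def __init__(self, collector: TinyCollector) -> None:
        self._collector = collector

    # Hypothetical metric names below; converge's real names are not documented here.
    def task_submitted(self) -> None:
        self._collector.inc("coordination_tasks_submitted")

    def task_claimed(self) -> None:
        self._collector.inc("coordination_tasks_claimed")

    def task_completed(self) -> None:
        self._collector.inc("coordination_tasks_completed")

    def pool_size(self, size: int) -> None:
        self._collector.gauge("coordination_pool_size", size)

    def pending_tasks(self, size: int) -> None:
        self._collector.gauge("coordination_pending_tasks", size)


collector = TinyCollector()
metrics = CoordinationMetricsSketch(collector)
metrics.task_submitted()
metrics.task_submitted()
metrics.task_claimed()
metrics.pending_tasks(1)
```

Keeping names inside the wrapper means dashboards and alerts keyed on those names do not break when call sites are refactored.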
class converge.observability.runtime_ops.RuntimeOpsServer(runtime, metrics_collector: MetricsCollector | None = None, *, host: str = '127.0.0.1', port: int = 0)

Bases: object

Exposes /health, /ready, and /metrics endpoints for runtime using the standard-library HTTP server.

property address: tuple[str, int] | None
start() → None
stop() → None
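An ops server of this shape can be built from http.server alone. The sketch below is hypothetical: OpsServerSketch takes plain callables instead of a runtime and a MetricsCollector, and the 200/503 status codes and the JSON health body are assumptions, not converge's documented responses. Binding to port 0 lets the OS pick a free port, which the address property then reports.

```python
import json
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Callable, Optional


class OpsServerSketch:
    """Hypothetical /health, /ready, /metrics server; not converge's RuntimeOpsServer."""

    def __init__(self, is_healthy: Callable[[], bool], is_ready: Callable[[], bool],
                 metrics_text: Callable[[], str], host: str = "127.0.0.1", port: int = 0) -> None:
        self._checks = {"/health": is_healthy, "/ready": is_ready}
        self._metrics_text = metrics_text
        self._host, self._port = host, port
        self._httpd: Optional[ThreadingHTTPServer] = None
        self._thread: Optional[threading.Thread] = None

    @property
    def address(self):
        # After binding, server_address holds the real port even when port=0 was requested.
        return tuple(self._httpd.server_address[:2]) if self._httpd else None

    def start(self) -> None:
        ops = self

        class Handler(BaseHTTPRequestHandler):
            def do_GET(self) -> None:
                if self.path == "/metrics":
                    status, body = 200, ops._metrics_text()
                    ctype = "text/plain; version=0.0.4"  # Prometheus text format
                elif self.path in ops._checks:
                    ok = bool(ops._checks[self.path]())
                    status, body, ctype = (200 if ok else 503), json.dumps({"ok": ok}), "application/json"
                else:
                    status, body, ctype = 404, "not found", "text/plain"
                data = body.encode("utf-8")
                self.send_response(status)
                self.send_header("Content-Type", ctype)
                self.send_header("Content-Length", str(len(data)))
                self.end_headers()
                self.wfile.write(data)

            def log_message(self, format, *args) -> None:
                pass  # keep the demo quiet

        self._httpd = ThreadingHTTPServer((self._host, self._port), Handler)
        self._thread = threading.Thread(target=self._httpd.serve_forever, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        if self._httpd is not None:
            self._httpd.shutdown()  # stops serve_forever() in the worker thread
            self._httpd.server_close()  # releases the listening socket
            self._thread.join()
            self._httpd = None


ready = {"value": False}
server = OpsServerSketch(lambda: True, lambda: ready["value"], lambda: "up 1\n")
server.start()
host, port = server.address
base = f"http://{host}:{port}"
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))  # bypass env proxies


def status_of(path: str) -> int:
    try:
        with _opener.open(base + path, timeout=5) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code


health = status_of("/health")
not_ready = status_of("/ready")
ready["value"] = True
now_ready = status_of("/ready")
with _opener.open(base + "/metrics", timeout=5) as resp:
    metrics_body = resp.read().decode()
server.stop()
```

Returning 503 for a failed readiness check is the convention load balancers and Kubernetes probes treat as "do not route traffic yet".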