# Source code for swarph_mesh.attribution

"""Attribution event + writer protocol.

Per PLAN.md §8, every ``SwarphCall`` writes one row of attribution
data after a successful adapter call. The package ships writer
implementations for common cases (no-op, JSONL file); production
consumers (OMEGA droplet) wire up a TSDB writer at module load.

The writer is a Protocol — consumers can plug in any object with a
matching async ``write(event)`` method. Default in v0.1.0:
``FileAttributionWriter`` appending to ``~/.swarph/attribution.jsonl``,
so cost data isn't silently dropped on first run and no DB driver is
required.
"""

from __future__ import annotations

import asyncio
import json
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Optional, Protocol, runtime_checkable


@dataclass
class AttributionEvent:
    """A single cost / latency / token attribution record for one LLM call.

    The field layout mirrors the OMEGA ``token_usage`` and
    ``subscription_usage`` TimescaleDB hypertables (defined in
    hedge-fund-mcp ``database/init_timescale.sql``). Consumers with a
    different schema translate inside their own writer implementation.
    """

    # Unix seconds, captured when the call completed.
    timestamp: float
    # Provider name, e.g. "gemini" or "claude".
    provider: str
    # Model identifier as the provider spells it.
    model: str
    # Label of the calling role (orchestrator, council, ...).
    role: str
    # Dotted slug following swarph_shared.caller_convention.
    caller: str
    # Mesh peer used for attribution joins; None when there is no peer scope.
    mesh_peer: Optional[str]
    input_tokens: int
    output_tokens: int
    cached_tokens: int = 0
    cost_usd: float = 0.0
    duration_s: float = 0.0
    cached: bool = False
    error_class: Optional[str] = None
    # Free-form extras; default_factory gives every instance its own dict.
    extra: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the event as a plain dict, recursively copied by ``asdict``."""
        return asdict(self)
@runtime_checkable
class AttributionWriter(Protocol):
    """Structural interface for sinks that consume attribution events.

    Any object exposing an async ``write(event)`` method conforms — whether
    it targets a TSDB, a JSONL file, OTLP, stdout, or does nothing at all.
    """

    async def write(self, event: AttributionEvent) -> None:
        """Handle one attribution *event*."""
class NullAttributionWriter:
    """Attribution writer that discards every event.

    Handy in test fixtures where attribution output is just noise — the
    call's `LLMResponse` already carries the cost/token data inline.
    """

    async def write(self, event: AttributionEvent) -> None:  # noqa: D401
        del event  # deliberately ignored; implicit None return
class FileAttributionWriter:
    """Append-only JSONL writer, by default at ``~/.swarph/attribution.jsonl``.

    Default writer in v0.1.0 — every call appends one parseable JSON line,
    no DB driver required. Appends within one process are serialized by a
    ``threading.Lock``, so writers running on different threads (each
    possibly with its own event loop) cannot interleave lines.
    Multi-process scenarios (which v0.1.0 doesn't target) would need a
    file lock; documented and deferred.

    The parent directory is created lazily on each ``write`` rather than
    in ``__init__``: constructing the writer therefore never touches the
    filesystem, so building an instance at import time cannot fail on a
    read-only or unwritable home directory.
    """

    def __init__(self, path: Optional[str | os.PathLike[str]] = None):
        # Accept str / os.PathLike as well as Path. A falsy value (None or "")
        # falls back to the default location, as ``path or default`` did.
        self.path = (
            Path(path) if path else Path.home() / ".swarph" / "attribution.jsonl"
        )
        # threading.Lock rather than asyncio.Lock: the critical section in
        # ``write`` never awaits, so an asyncio.Lock would never be contended
        # inside one loop and would not exclude writers on other threads.
        self._lock = threading.Lock()

    async def write(self, event: AttributionEvent) -> None:
        """Append *event* to ``self.path`` as one compact JSON line.

        Raises:
            TypeError: if ``event.to_dict()`` holds values ``json`` cannot
                serialize (raised before any file I/O happens).
            OSError: if the parent directory cannot be created or the file
                cannot be opened / written.
        """
        # Serialize outside the lock so a bad payload fails before any I/O.
        line = json.dumps(event.to_dict(), separators=(",", ":")) + "\n"
        with self._lock:
            # Blocking I/O on the event loop; the line is small and the file
            # is opened in append mode so each record lands at the end.
            self.path.parent.mkdir(parents=True, exist_ok=True)
            with self.path.open("a", encoding="utf-8") as fh:
                fh.write(line)
# Module-level fallback writer returned by get_default_writer(). It is
# constructed here, at import time; replace it via set_default_writer().
_default_writer: AttributionWriter = FileAttributionWriter()
def get_default_writer() -> AttributionWriter:
    """Fetch the writer currently installed as the module default.

    ``SwarphCall`` falls back to this writer whenever no per-call writer
    is supplied.
    """
    current = _default_writer
    return current
def set_default_writer(writer: AttributionWriter) -> None:
    """Install *writer* as the module-level default.

    For example, production consumers call this at startup to install a
    TSDB-backed writer.
    """
    global _default_writer
    _default_writer = writer
def make_event(
    *,
    provider: str,
    model: str,
    role: str,
    caller: str,
    mesh_peer: Optional[str],
    input_tokens: int,
    output_tokens: int,
    cached_tokens: int = 0,
    cost_usd: float = 0.0,
    duration_s: float = 0.0,
    cached: bool = False,
    error_class: Optional[str] = None,
    extra: Optional[dict[str, Any]] = None,
) -> AttributionEvent:
    """Construct an :class:`AttributionEvent` with ``timestamp=now``.

    ``timestamp`` is taken from ``time.time()`` at construction; every other
    argument is passed through to the event unchanged, except ``extra``.

    Args:
        extra: Optional free-form metadata. It is shallow-copied so that a
            caller reusing or mutating its dict afterwards cannot alter an
            event a writer may still be holding (e.g. one that batches).
            ``None`` or an empty dict yields a fresh empty dict.

    Returns:
        The newly built event.
    """
    return AttributionEvent(
        timestamp=time.time(),
        provider=provider,
        model=model,
        role=role,
        caller=caller,
        mesh_peer=mesh_peer,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        cached_tokens=cached_tokens,
        cost_usd=cost_usd,
        duration_s=duration_s,
        cached=cached,
        error_class=error_class,
        # Copy so the event owns its metadata instead of aliasing the caller's.
        extra=dict(extra) if extra else {},
    )