Source code for swarph_mesh.hooks
"""Lifecycle hooks per PLAN.md §9.
v0.1.0 ships THREE hook points wired into :class:`HookSet`:
* ``pre_call`` — before adapter dispatch
* ``post_call`` — after adapter returns successfully
* ``on_error`` — adapter raised
PLAN.md §9 names two additional points (``pre_parse``, ``post_parse``)
that wrap the JSON-mode harness. Those are NOT scaffolded in v0.1.0
— add to :class:`HookSet` when a call site materializes that needs
them. Documenting as-future per drop PR #1 review carry-forward #3
(don't ship empty hook lists for hook points no caller has asked for).
The default ``post_call`` hook writes an :class:`AttributionEvent`
via the configured :class:`AttributionWriter`. Override at SwarphCall
construction with ``hooks=HookSet()`` to opt out, or with
``hooks=HookSet(post_call=[attribution_post_call(writer=...)])`` to
swap the writer per-call.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional
from swarph_mesh.attribution import (
AttributionWriter,
get_default_writer,
make_event,
)
from swarph_mesh.types import ChatMessage, LLMResponse
[docs]
@dataclass
class CallContext:
"""Per-call envelope passed through the hook pipeline. Mutable —
pre_call hooks can rewrite ``messages`` (e.g., redaction layer).
Adapters MUST treat ``messages`` and ``model`` as inputs only,
not outputs."""
provider: str
model: str
caller: str
role: str
mesh_peer: Optional[str]
messages: list[ChatMessage]
system_prompt: Optional[str] = None
json_schema: Optional[dict] = None
temperature: float = 0.7
max_tokens: Optional[int] = None
extra: dict[str, Any] = field(default_factory=dict)
PreCallHook = Callable[[CallContext], Awaitable[None]]
PostCallHook = Callable[[CallContext, LLMResponse], Awaitable[None]]
OnErrorHook = Callable[[CallContext, BaseException], Awaitable[None]]
[docs]
@dataclass
class HookSet:
"""Collection of hooks for one ``SwarphCall``. Hooks fire in
registration order. Empty by default — callers register what
they need."""
pre_call: list[PreCallHook] = field(default_factory=list)
post_call: list[PostCallHook] = field(default_factory=list)
on_error: list[OnErrorHook] = field(default_factory=list)
[docs]
def attribution_post_call(
writer: Optional[AttributionWriter] = None,
) -> PostCallHook:
"""Default post-call hook factory: writes one
:class:`AttributionEvent` per successful call.
Use the module-level default writer (``FileAttributionWriter``
by default) if none is provided. Production consumers swap the
default writer at startup; per-call override is also supported.
"""
async def _hook(ctx: CallContext, resp: LLMResponse) -> None:
w = writer or get_default_writer()
cached_tokens = (resp.raw_response or {}).get("cached_tokens", 0) if resp.raw_response else 0
event = make_event(
provider=ctx.provider,
model=ctx.model,
role=ctx.role,
caller=ctx.caller,
mesh_peer=ctx.mesh_peer,
input_tokens=resp.input_tokens,
output_tokens=resp.output_tokens,
cached_tokens=int(cached_tokens),
cost_usd=resp.cost_usd,
duration_s=resp.duration_s,
cached=resp.cached,
error_class=resp.error_class,
)
await w.write(event)
return _hook
[docs]
def default_hooks(writer: Optional[AttributionWriter] = None) -> HookSet:
"""Return a HookSet with the default attribution post-call hook
pre-installed. Other slots empty."""
return HookSet(post_call=[attribution_post_call(writer=writer)])