"""``MeshClient`` — async wrapper around the mesh-gateway HTTP API per
PLAN.md §5.

It replaces the hand-rolled curl-based code in the places below with a
single typed surface, so cross-Claude DM coordination doesn't require
rediscovering the gateway's shape from scratch every time:

* ``lab-orchestrator/scripts/lab_loop_drain.py``
* ``hedge-fund-mcp/scripts/mesh_inbox_watcher.py`` (droplet-side)
* ``hedge-fund-mcp/scripts/science_claude_inbox_drain.py``
* assorted ad-hoc curl invocations in CLAUDE.md / session ritual

Public surface::

    client = MeshClient(node="lab-ovh", token=os.environ["MESH_GATEWAY_TOKEN"])
    peers = await client.list_peers()
    msgs = await client.fetch(unread_only=True)
    sent = await client.send(to="droplet", kind="fyi", content="...")
    await client.mark_read(msg_id=123)
    await client.register(url="http://lab-ovh:8787", capabilities={...})

Two structural invariants are enforced:

1. **Recipient name validation.** Every ``send()`` calls
   ``swarph_shared.validate_node_name`` on the recipient — this closes the
   Vector A (peer-onboarding chatter) + Vector B (human-prompt
   shorthand) framing-contagion classes documented in
   ``project_peer_name_canonical.md``.
2. **Mesh-secrets out-of-band guard.** Every ``send()`` (unless called
   with ``skip_secret_check=True``) runs a best-effort regex sniff on
   the content for obvious credentials (e.g. ``pypi-...``, ``sk-...``,
   GitHub ``gh*_...`` tokens, AWS ``AKIA...`` keys, ``eyJ...`` JWTs).
   Hits raise :class:`MeshSecretLeakError` BEFORE the POST. The CLAUDE.md
   rule "Mesh secrets out-of-band only — NEVER ride /messages content
   fields" is non-negotiable. The guard is best-effort because regex
   misses are common: it catches the obvious cases, and operators must
   still treat content fields as public.
"""
from __future__ import annotations

import os
import re
import warnings
from typing import Any, Literal, Optional, get_args
# DM ``kind`` values the mesh-gateway accepts (v0.5.1, drop DM #722 +
# #728). The gateway validates server-side as well; mirroring the set on
# the client lets type checkers (and MeshClient.send's runtime check)
# reject a known-bad value such as kind="review_request" — which the
# gateway answers with a 400 — without a network round-trip. Same
# fail-fast shape as swarph_shared's validate_caller. Add new kinds here
# when the gateway ships them (current accepted set per server.py
# mesh-gateway).
DM_KIND = Literal[
    "fyi",
    "question",
    "answer",
    "status",
    "unblock",
]
import httpx
from swarph_shared import validate_node_name
from swarph_mesh.exceptions import SwarphMeshError
from swarph_mesh.mesh_types import MeshMessage, MeshPeer
# Gateway base URL used when neither the ``gateway_url`` argument nor the
# MESH_GATEWAY_URL environment variable supplies one.
DEFAULT_GATEWAY_URL: str = "http://lab-ovh:8788"
# Environment variable consulted for the bearer token when no explicit
# ``token`` is passed to MeshClient.
GATEWAY_TOKEN_ENV: str = "MESH_GATEWAY_TOKEN"
# Default timeout (seconds) handed to the underlying httpx.AsyncClient.
DEFAULT_TIMEOUT_SECONDS: float = 10.0
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class MeshGatewayError(SwarphMeshError):
    """Gateway returned non-2xx or otherwise non-conformant response.

    ``MeshClient`` also raises this when the HTTP request itself fails
    (``httpx.RequestError``) and when a response body is not valid JSON.
    """
class MeshAuthError(MeshGatewayError):
    """Authentication failed — token missing or rejected (401/403).

    Subclasses :class:`MeshGatewayError`, so a handler for the broader
    gateway error also catches auth failures.
    """
class MeshSecretLeakError(SwarphMeshError):
    """Best-effort guard caught an apparent credential in DM content.

    Refuses to send: the check runs before the POST, so nothing reached
    the gateway. Operators must rotate the credential and resend
    without it.
    """
# ---------------------------------------------------------------------------
# Best-effort credential leak detector
# ---------------------------------------------------------------------------
# Patterns chosen to match observable shapes of credentials seen in
# session JSONLs in the field. Updates here are cheap; misses are
# expected (defense-in-depth, not strict validation).
# ORDER MATTERS: _check_no_secret reports the label of the FIRST pattern
# that matches, so provider-specific shapes must precede any broader
# catch-all that would also match them.
_CRED_PATTERNS: list[tuple[str, re.Pattern[str]]] = [
    ("pypi token", re.compile(r"\bpypi-[A-Za-z0-9_-]{20,}")),
    ("anthropic api key", re.compile(r"\bsk-ant-[A-Za-z0-9_-]{20,}")),
    # OpenAI ADMIN keys (highest privilege class — can create/delete keys,
    # manage org). Listed before the generic openai pattern so the
    # operator gets a properly scary error message (the generic pattern
    # additionally excludes ``sk-admin-`` via a negative lookahead).
    # Endpoint that mints these: POST /v1/organization/admin_api_keys.
    ("openai ADMIN key (org-level privilege)", re.compile(r"\bsk-admin-[A-Za-z0-9_-]{20,}")),
    # DeepSeek api key — ``sk-<32 hex>`` ending on a word boundary. MUST
    # precede the generic openai pattern: that catch-all also matches this
    # shape, and when DeepSeek was listed after it this label could never
    # be reported.
    ("deepseek api key", re.compile(r"\bsk-[a-f0-9]{32}\b")),
    # OpenAI key shapes: legacy ``sk-<48 chars>`` + project keys
    # ``sk-proj-<long>`` (newer, longer alphabet incl. ``_-``). Earlier
    # regex ``sk-[A-Za-z0-9]{20,}`` missed sk-proj-... because of the
    # underscore + hyphen in the body. Issue #4 tightening.
    ("openai api key", re.compile(r"\bsk-(?!admin-)(?:proj-)?[A-Za-z0-9_-]{20,}")),
    # xAI / Grok key — xai-<long>. Same shape as OpenAI's hyphen-prefix
    # pattern. Added in v0.5.1 alongside the Grok adapter shipped in v0.5.0.
    ("xai api key", re.compile(r"\bxai-[A-Za-z0-9_-]{40,}")),
    # Google Gemini / Cloud API keys — ``AIza<35 chars>`` format. Common
    # shape across Google's API tier; catches GEMINI_API_KEY leaks.
    ("google api key", re.compile(r"\bAIza[0-9A-Za-z_-]{35}")),
    # GitHub tokens: classic prefixed forms (ghp_ personal, gho_ OAuth,
    # ghu_ user-to-server, ghs_ server-to-server, ghr_ refresh) plus
    # fine-grained personal access tokens (``github_pat_...``).
    ("github token", re.compile(r"\b(?:gh[oprsu]_[A-Za-z0-9]{30,}|github_pat_[A-Za-z0-9_]{22,})")),
    # AWS access key ids: long-lived (AKIA) and temporary/STS (ASIA).
    ("aws access key", re.compile(r"\b(?:AKIA|ASIA)[0-9A-Z]{16}\b")),
    # AWS secret key in its usual ``NAME=value`` / ``"name": "value"``
    # form (env files, ~/.aws/credentials, JSON). The heuristic below only
    # fires when "aws" appears AFTER the value, so it misses this case.
    ("aws secret key (assignment)", re.compile(
        r"aws_?secret_?access_?key[\"']?\s*[:=]\s*[\"']?[A-Za-z0-9/+]{40}",
        re.IGNORECASE,
    )),
    # Heuristic: a 40-char base64-ish run followed later on the SAME line
    # by lowercase "aws" (``.`` does not cross newlines).
    ("aws secret key", re.compile(r"\b[A-Za-z0-9/+=]{40}\b(?=.*aws)")),
    # JWT — three base64url segments separated by dots (stricter than the
    # generic shape so we don't false-fire on every blob with two dots).
    ("jwt", re.compile(r"\beyJ[A-Za-z0-9_-]{15,}\.eyJ[A-Za-z0-9_-]{15,}\.[A-Za-z0-9_-]{15,}\b")),
    # Mesh gateway tokens — long random base64url-shaped values stored in
    # MESH_GATEWAY_TOKEN env. Heuristic shape: 48+ chars of url-safe
    # base64 alphabet, flagged only when the env var name appears later
    # on the same line. False-positive rate higher than other patterns —
    # operator escape via skip_secret_check=True is the right hatch.
    ("possible mesh-gateway token", re.compile(r"\b[A-Za-z0-9_-]{48,}\b(?=.*MESH_GATEWAY_TOKEN)")),
    # The far more common leak shape: the env var name FOLLOWED by its
    # value (``export MESH_GATEWAY_TOKEN=...``, ``"MESH_GATEWAY_TOKEN": "..."``).
    # Shell references like ``$MESH_GATEWAY_TOKEN`` do not match.
    ("mesh-gateway token assignment", re.compile(r"MESH_GATEWAY_TOKEN[\"']?\s*[:=]\s*[\"']?[A-Za-z0-9_-]{20,}")),
    # A literal bearer credential, typically from a pasted curl command
    # (``-H "Authorization: Bearer <token>"``). Alphabet per RFC 6750
    # b64token; placeholders like ``$TOKEN`` / ``<token>`` do not match.
    ("bearer token (Authorization header)", re.compile(r"\bBearer\s+[A-Za-z0-9._~+/-]{20,}")),
]
def _check_no_secret(content: str) -> None:
    """Refuse ``content`` that appears to carry a credential.

    Scans ``_CRED_PATTERNS`` in list order and raises on the first pattern
    that matches anywhere in ``content``; returns ``None`` when nothing
    matches. Best-effort only — novel credential shapes will slip through.

    Raises:
        MeshSecretLeakError: naming the label of the first matching
            pattern (the matched text itself is not echoed).
    """
    matched_label = next(
        (label for label, rx in _CRED_PATTERNS if rx.search(content)),
        None,
    )
    if matched_label is None:
        return
    raise MeshSecretLeakError(
        f"refusing to send: content matches {matched_label} pattern. "
        "Mesh secrets out-of-band only — NEVER ride /messages "
        "content fields. Use ssh+tee, commander manual paste, "
        "or pre-redacted Hub records. (CLAUDE.md mesh-secrets rule.)"
    )
# ---------------------------------------------------------------------------
# MeshClient
# ---------------------------------------------------------------------------
class MeshClient:
    """Async client for the mesh-gateway HTTP API.

    Construct once per peer (or per cycle), reuse for the lifetime of
    the work. Backed by ``httpx.AsyncClient`` so connection-pooling
    happens for free across many ``send()`` calls.

    Use as an async context manager when you want explicit close::

        async with MeshClient(node="lab-ovh") as client:
            await client.send(to="droplet", kind="fyi", content="...")

    Or instantiate normally + call ``aclose()`` when done.
    """

    def __init__(
        self,
        node: str,
        *,
        token: Optional[str] = None,
        gateway_url: Optional[str] = None,
        timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
        validate_self_name: bool = True,
    ):
        """``token`` falls back to ``MESH_GATEWAY_TOKEN`` env. ``gateway_url``
        falls back to ``MESH_GATEWAY_URL`` env, then ``http://lab-ovh:8788``.

        Set ``validate_self_name=False`` for test fixtures that need to
        instantiate a client with a mock peer name. Production callers
        should leave it on (caller's own ``node`` is the most common
        contagion target — see project_peer_name_canonical.md).

        Self-name validation is best-effort: if it raises, construction
        still succeeds and ``node`` is used as given, but a ``UserWarning``
        is emitted so a bad own-name is visible instead of silently
        ignored.
        """
        # strict=False skips the registry check, so construction works
        # while the gateway is offline; regex + alias resolution still run.
        if validate_self_name:
            try:
                node = validate_node_name(node, strict=False)
            except Exception as exc:  # best-effort: never fail construction
                warnings.warn(
                    f"MeshClient: own node name {node!r} failed validation "
                    f"({exc}); using it unchanged",
                    stacklevel=2,
                )
        self.node = node
        self._token = token if token is not None else os.environ.get(GATEWAY_TOKEN_ENV)
        self._gateway_url = (
            gateway_url
            or os.environ.get("MESH_GATEWAY_URL")
            or DEFAULT_GATEWAY_URL
        )
        self._timeout = timeout_seconds
        # Created lazily by _ensure_client(); recreated after aclose().
        self._client: Optional[httpx.AsyncClient] = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def _ensure_client(self) -> httpx.AsyncClient:
        """Return the pooled ``httpx.AsyncClient``, creating it if needed.

        A new client is built on first use and again whenever the previous
        one is closed. The ``Authorization: Bearer`` header is attached only
        when a non-empty token was configured.
        """
        if self._client is None or self._client.is_closed:
            headers: dict[str, str] = {}
            if self._token:
                headers["Authorization"] = f"Bearer {self._token}"
            self._client = httpx.AsyncClient(
                base_url=self._gateway_url,
                headers=headers,
                timeout=self._timeout,
            )
        return self._client

    async def aclose(self) -> None:
        """Close the underlying HTTP client (no-op if none is open)."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    async def __aenter__(self) -> "MeshClient":
        self._ensure_client()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()

    # ------------------------------------------------------------------
    # Internal HTTP wrapper
    # ------------------------------------------------------------------
    async def _request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[dict[str, Any]] = None,
        json: Optional[dict[str, Any]] = None,
    ) -> Any:
        """Issue one HTTP call against the gateway and decode the body.

        Returns the decoded JSON, or ``None`` when a 2xx response has an
        empty body.

        Raises:
            MeshAuthError: on 401/403.
            MeshGatewayError: on transport failure, on any other non-2xx
                status (including a 3xx redirect that reached us unfollowed,
                which previously slipped through as "success"), or on a
                body that is not valid JSON.
        """
        client = self._ensure_client()
        try:
            resp = await client.request(method, path, params=params, json=json)
        except httpx.RequestError as exc:
            raise MeshGatewayError(
                f"mesh-gateway {method} {path} request failed: {exc}"
            ) from exc
        if resp.status_code in (401, 403):
            raise MeshAuthError(
                f"mesh-gateway {method} {path} returned {resp.status_code}: "
                f"check MESH_GATEWAY_TOKEN env"
            )
        if not 200 <= resp.status_code < 300:
            body = resp.text[:500]
            raise MeshGatewayError(
                f"mesh-gateway {method} {path} returned {resp.status_code}: {body}"
            )
        # Gateway returns JSON for everything except /health
        if not resp.content:
            return None
        try:
            return resp.json()
        except ValueError as exc:
            raise MeshGatewayError(
                f"mesh-gateway {method} {path} returned non-JSON: {resp.text[:200]}"
            ) from exc

    @staticmethod
    def _extract_rows(payload: Any, key: str, path: str) -> list[Any]:
        """Unwrap a list response that may arrive bare or as ``{key: [...]}``.

        An empty body (``payload is None``) yields an empty list. Any other
        payload that does not resolve to a list is a non-conformant
        response and raises :class:`MeshGatewayError`, instead of failing
        later with an opaque ``TypeError`` or by iterating a dict's keys.
        """
        if payload is None:
            return []
        rows = payload.get(key, payload) if isinstance(payload, dict) else payload
        if not isinstance(rows, list):
            raise MeshGatewayError(
                f"mesh-gateway GET {path} returned {type(rows).__name__}, "
                f"expected a list (bare or under {key!r})"
            )
        return rows

    # ------------------------------------------------------------------
    # Peer endpoints
    # ------------------------------------------------------------------
    async def list_peers(self) -> list[MeshPeer]:
        """``GET /peers`` — return all registered peers."""
        payload = await self._request("GET", "/peers")
        rows = self._extract_rows(payload, "peers", "/peers")
        return [MeshPeer.model_validate(r) for r in rows]

    async def register(
        self,
        *,
        url: Optional[str] = None,
        capabilities: Optional[dict[str, Any]] = None,
    ) -> MeshPeer:
        """``POST /peers/register`` — register this peer with the gateway.

        Only the fields actually supplied are sent; ``name`` is always
        this client's ``node``.
        """
        body: dict[str, Any] = {"name": self.node}
        if url is not None:
            body["url"] = url
        if capabilities is not None:
            body["capabilities"] = capabilities
        payload = await self._request("POST", "/peers/register", json=body)
        return MeshPeer.model_validate(payload)

    # ------------------------------------------------------------------
    # Message endpoints
    # ------------------------------------------------------------------
    async def fetch(
        self,
        *,
        to_node: Optional[str] = None,
        unread_only: bool = False,
        limit: Optional[int] = None,
    ) -> list[MeshMessage]:
        """``GET /messages`` — fetch own inbox.

        Defaults ``to_node`` to ``self.node``. Pass an explicit value
        only if you have a legitimate reason to peek at another peer's
        inbox (gateway access-controls this; most callers will get
        their own inbox only).

        v0.5.1 latent bug fix (discovered in Phase 5.6 daemon build):
        the mesh-gateway query parameter is ``?to=`` — NOT ``?to_node=``.
        The latter is silently ignored, returning ALL recent messages
        regardless of recipient. Pre-fix every ``MeshClient.fetch``
        caller saw outbound bleed-through unless they Python-side
        filtered. The kwarg keeps its descriptive name for the
        caller-facing API; only the wire-shape changes.

        An empty response body yields ``[]``; a body that is not a list
        (bare or under ``"messages"``) raises :class:`MeshGatewayError`.
        """
        target = to_node or self.node
        params: dict[str, Any] = {"to": target}
        if unread_only:
            params["unread_only"] = "true"
        if limit is not None:
            params["limit"] = limit
        payload = await self._request("GET", "/messages", params=params)
        rows = self._extract_rows(payload, "messages", "/messages")
        # Defense-in-depth: when reading our own inbox, drop anything we
        # sent ourselves, so a future gateway quirk that re-introduces
        # outbound bleed-through can't silently break consumers (same
        # shape as the daemon's filter). An explicit to_node= peek at
        # another peer's inbox returns the raw gateway view unfiltered.
        if target == self.node:
            rows = [
                r for r in rows
                if not (isinstance(r, dict) and r.get("from_node") == self.node)
            ]
        return [MeshMessage.model_validate(r) for r in rows]

    async def send(
        self,
        *,
        to: str,
        kind: DM_KIND,
        content: str,
        related_task_id: Optional[str] = None,
        thread_id: Optional[str] = None,
        skip_secret_check: bool = False,
    ) -> MeshMessage:
        """``POST /messages`` — send a DM.

        Validates ``to`` via ``swarph_shared.validate_node_name`` (closes
        Vector A + B contagion classes); raises ``ValueError`` /
        ``NotInRegistry`` on invalid recipient names BEFORE the POST.

        ``kind`` is a ``Literal`` enum (see ``DM_KIND``) — type checkers
        reject unknown values at lint time, and the runtime guard below
        (derived from ``DM_KIND`` itself, so the two cannot drift) catches
        anyone passing ``kind`` as ``str``-typed from a callsite that
        bypassed type-checking.

        Runs the best-effort credential leak detector on ``content``.
        Set ``skip_secret_check=True`` only when content has been
        inspected by the caller and the leak detector is producing a
        false positive (e.g. discussing a credential format in
        prose). False positives should be rare; treat them as a
        signal that the content is on a fence and rotate aggressively
        if you're not 100% sure.
        """
        # Runtime kind enum check — defense-in-depth against str-typed
        # callsites that slipped past type-checking. Same shape as
        # validate_caller fail-fast discipline.
        valid_kinds = get_args(DM_KIND)
        if kind not in valid_kinds:
            raise ValueError(
                f"MeshClient.send: kind={kind!r} is not a valid mesh-gateway "
                f"DM kind. Accepted: {' / '.join(valid_kinds)}. "
                f"(Adding new kinds requires gateway-side support; check "
                f"server.py POST /messages handler.)"
            )
        # Recipient validation — strict=False because the gateway may
        # not be reachable from caller's perspective at the moment;
        # we still want regex + alias resolution.
        canonical_to = validate_node_name(to, strict=False)
        # Best-effort credential leak guard. Operator can disable for
        # legitimate prose mentioning credential shapes.
        if not skip_secret_check:
            _check_no_secret(content)
        body: dict[str, Any] = {
            "from_node": self.node,
            "to_node": canonical_to,
            "kind": kind,
            "content": content,
        }
        if related_task_id is not None:
            body["related_task_id"] = related_task_id
        if thread_id is not None:
            body["thread_id"] = thread_id
        payload = await self._request("POST", "/messages", json=body)
        return MeshMessage.model_validate(payload)

    async def mark_read(self, msg_id: int) -> None:
        """``POST /messages/{msg_id}/read`` — flip ``read_at`` on a DM.

        Per CLAUDE.md droplet-side mesh DM drain rule, the cron-side
        watcher does NOT mark read; only the commander or the
        peer-in-session does. Adapter callers consuming inboxes
        should mark-read explicitly when they've actually processed
        the DM, not on fetch.
        """
        await self._request("POST", f"/messages/{msg_id}/read")