Source code for swarph_mesh.discovery

"""Model discovery — `swarph_mesh.discovery` (v0.6.0 architectural promotion).

Per drop's DM #720 direction: replaces the static `PRICING` dict's
implicit "is this model_id real" check with a runtime catalog query.
Closes the silent-mis-attribute trap from drop's #728 obs (a) at the
substrate layer.

Architecture (commander direction 2026-05-09): **AIMLAPI as primary
discovery, per-provider as fallback**.

Primary: ``GET https://api.aimlapi.com/models`` — public, no auth,
~608 entries with structured info (id, developer, contextLength,
maxTokens, type, aliases, tags). 24h TTL cache at module scope.

Fallback: per-provider ``/v1/models`` endpoints when AIMLAPI is
unreachable. Each provider requires its own API key:

  - OpenAI:    ``client.models.list()`` (requires OPENAI_API_KEY)
  - DeepSeek:  same shape via ``base_url`` (requires DEEPSEEK_API_KEY)
  - xAI:       same shape via ``base_url`` (requires XAI_API_KEY)
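  - Anthropic: ``GET /v1/models`` (requires ANTHROPIC_API_KEY — note
              this is METERED auth, not the subscription path)
  - Google:    ``models.list()`` via ``google-generativeai`` SDK
              (requires GEMINI_API_KEY)

NOTE: only the OpenAI-compatible fallbacks (OpenAI, DeepSeek, xAI) are
implemented so far; the Anthropic and Google fallbacks are deferred (see
``_fetch_per_provider_fallback``).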

Fallback fires per-provider; if one provider's key is missing, that
provider's models simply don't appear in the fallback list.

PRICING stays in adapter-local tables (the AIMLAPI ``/models``
response does NOT include pricing — only the HTML pricing page does,
and we won't HTML-scrape). The discovery module exposes ``ModelInfo``
records WITHOUT pricing; callers join against the local PRICING dict
for cost information. Drift detection is a future v0.6.x stretch.
"""

from __future__ import annotations

import json
import logging
import os
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from typing import Optional


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Public AIMLAPI catalog endpoint used by the primary discovery path.
AIMLAPI_MODELS_URL = "https://api.aimlapi.com/models"
# Default cache lifetime (seconds) used as the ``ttl_seconds`` default below.
DEFAULT_TTL_SECONDS = 24 * 60 * 60  # 24h


@dataclass
class ModelInfo:
    """One row from the discovery catalog.

    Provider-agnostic shape; pricing intentionally absent (kept in
    adapter-local PRICING dicts since AIMLAPI doesn't expose pricing via
    API).
    """

    id: str
    developer: str  # "Open AI", "Anthropic", "X AI", "DeepSeek AI", "Google", ...
    context_length: Optional[int] = None
    max_tokens: Optional[int] = None
    name: Optional[str] = None
    description: Optional[str] = None
    type: Optional[str] = None  # "openai/chat-completions", "image", "video", etc.
    aliases: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    source: str = "aimlapi"  # "aimlapi" | "openai" | "anthropic" | ...

    @staticmethod
    def _normalize_developer(name: str) -> str:
        """Lower-case ``name`` and drop spaces and hyphens."""
        return name.lower().replace(" ", "").replace("-", "")

    def matches_developer(self, dev: str) -> bool:
        """Loose developer-name match.

        Handles 'Open AI' vs 'OpenAI' vs 'open-ai' drift across catalogs.
        Both sides go through the same normalization, so a hyphenated
        catalog value (e.g. ``"Open-AI"``) matches as well as a
        hyphenated query.

        Args:
            dev: Developer name to compare against ``self.developer``.

        Returns:
            True when the normalized names are equal.
        """
        return self._normalize_developer(self.developer) == self._normalize_developer(dev)
# Map our adapter provider names to AIMLAPI's developer field values. # Adapter calls ``list_models(provider="openai")`` → catalog filtered # by developer="Open AI". Updated when AIMLAPI changes their naming. _PROVIDER_TO_DEVELOPER: dict[str, str] = { "openai": "Open AI", "anthropic": "Anthropic", "claude": "Anthropic", # adapter name → developer name "google": "Google", "gemini": "Google", "deepseek": "DeepSeek AI", "xai": "X AI", "grok": "X AI", } # Module-level cache — single in-flight catalog. Reset via # ``invalidate_catalog()`` for tests + force-refresh. _catalog_cache: Optional[list[ModelInfo]] = None _catalog_fetched_at: Optional[float] = None
def invalidate_catalog() -> None:
    """Drop the cached catalog so the next ``list_models`` call re-fetches.

    Resets both the cached entries and the fetch timestamp.
    """
    global _catalog_cache, _catalog_fetched_at
    _catalog_cache = _catalog_fetched_at = None
def _is_cache_fresh(ttl_seconds: int) -> bool:
    """Return True when a catalog is cached and younger than ``ttl_seconds``."""
    if _catalog_cache is None or _catalog_fetched_at is None:
        return False
    return (time.time() - _catalog_fetched_at) < ttl_seconds


def _parse_aimlapi_entry(entry: dict) -> Optional[ModelInfo]:
    """Convert one AIMLAPI ``/models`` array entry to ``ModelInfo``.

    Returns None on entries we can't parse (defensive — catalog has
    drift between docs + live shape). That covers a missing ``id``, a
    non-dict entry, and an ``info`` value that is not a mapping.
    """
    try:
        # ``or {}`` also covers an explicit ``"info": null`` in the payload.
        info = entry.get("info") or {}
        return ModelInfo(
            id=entry["id"],
            developer=info.get("developer", "?"),
            context_length=info.get("contextLength") or None,
            max_tokens=info.get("maxTokens") or None,
            name=info.get("name"),
            description=info.get("description"),
            type=entry.get("type"),
            aliases=list(entry.get("aliases", []) or []),
            tags=list(entry.get("tags", []) or []),
            source="aimlapi",
        )
    except (AttributeError, KeyError, TypeError) as exc:
        # AttributeError: ``entry`` or ``info`` is not a dict (no ``.get``).
        logger.debug("discovery: skipped unparseable AIMLAPI entry: %s", exc)
        return None


def _fetch_aimlapi_catalog(timeout: float = 10.0) -> list[ModelInfo]:
    """GET ``https://api.aimlapi.com/models``. No auth required.

    Raises ``urllib.error.URLError`` (``HTTPError`` is a subclass) on
    network failure, and ``json.JSONDecodeError`` on a non-JSON body;
    callers in ``list_models`` translate this to fallback activation.

    Empty list returned if the response is malformed, i.e. the payload is
    neither a list nor a dict carrying a list under ``"data"`` (rare —
    AIMLAPI's shape has been stable, but we'd rather emit nothing than
    crash the caller's adapter dispatch).
    """
    req = urllib.request.Request(
        AIMLAPI_MODELS_URL,
        headers={"User-Agent": "swarph-mesh/0.6.0 (discovery)"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        data = json.loads(resp.read().decode("utf-8"))

    entries = data.get("data", []) if isinstance(data, dict) else data
    if not isinstance(entries, list):
        logger.debug(
            "discovery: AIMLAPI payload is not a list (%s); ignoring",
            type(entries).__name__,
        )
        return []

    out: list[ModelInfo] = []
    seen_ids: set[str] = set()  # AIMLAPI returns dups; dedupe by id
    for entry in entries:
        m = _parse_aimlapi_entry(entry)
        if m is None or m.id in seen_ids:
            continue
        seen_ids.add(m.id)
        out.append(m)
    return out


def _fetch_provider_models_openai_compat(
    *, base_url: str, api_key: str, source_label: str, developer: str
) -> list[ModelInfo]:
    """Generic OpenAI-protocol-compatible ``/v1/models`` fallback.

    Used by OpenAI, DeepSeek, xAI, and any future adapter on the same
    protocol. Returns empty list on any failure, including a missing
    ``openai`` package or an auth error (treat as "this provider's keys
    aren't configured").
    """
    try:
        from openai import OpenAI

        client = OpenAI(api_key=api_key, base_url=base_url)
        models = client.models.list()
        return [
            ModelInfo(
                id=m.id,
                developer=developer,
                type=getattr(m, "object", None),
                source=source_label,
            )
            for m in models.data
        ]
    except Exception as exc:
        # Deliberately broad: this is a best-effort fallback boundary and
        # the SDK's exception types are not imported here.
        logger.debug(
            "discovery: %s fallback failed (likely missing key): %s",
            source_label,
            exc,
        )
        return []


def _fetch_per_provider_fallback(provider: Optional[str]) -> list[ModelInfo]:
    """Per-provider fallback when AIMLAPI is unreachable.

    Honors the ``provider`` filter — if specified, only that provider's
    fallback runs; otherwise tries all providers we have keys for.
    Providers without a configured key contribute nothing.
    """
    out: list[ModelInfo] = []
    targets = (
        [provider]
        if provider
        else ["openai", "deepseek", "xai", "google", "anthropic"]
    )

    if "openai" in targets:
        key = os.environ.get("OPENAI_API_KEY")
        if key:
            out.extend(
                _fetch_provider_models_openai_compat(
                    base_url="https://api.openai.com/v1",
                    api_key=key,
                    source_label="openai",
                    developer="Open AI",
                )
            )

    if "deepseek" in targets:
        # Reuse DeepSeekAdapter's resolution logic for the legacy
        # /home/ubuntu/deepseek/.env file fallback.
        try:
            from swarph_mesh.adapters.deepseek import _resolve_api_key

            key = _resolve_api_key()
        except ImportError:
            key = os.environ.get("DEEPSEEK_API_KEY")
        if key:
            out.extend(
                _fetch_provider_models_openai_compat(
                    base_url="https://api.deepseek.com",
                    api_key=key,
                    source_label="deepseek",
                    developer="DeepSeek AI",
                )
            )

    if "xai" in targets or "grok" in targets:
        key = os.environ.get("XAI_API_KEY") or os.environ.get("GROK_API_KEY")
        if key:
            out.extend(
                _fetch_provider_models_openai_compat(
                    base_url="https://api.x.ai/v1",
                    api_key=key,
                    source_label="xai",
                    developer="X AI",
                )
            )

    # Anthropic + Google fallbacks deferred — they don't share the
    # OpenAI-compat shape. v0.6.x stretch when callers need them
    # AND AIMLAPI is consistently unreachable (low signal so far —
    # api.aimlapi.com has been responsive through testing).
    return out


def _refresh_catalog(*, ttl_seconds: int) -> list[ModelInfo]:
    """Refresh module-level cache from AIMLAPI.

    Only the primary path populates the cache; fallback is per-call (no
    cache) since per-provider lists are smaller + already cached at the
    SDK level.

    An empty result is NOT cached: caching it would mark the cache as
    fresh and keep every caller on the fallback path for a full TTL
    without ever retrying AIMLAPI.

    Args:
        ttl_seconds: Accepted for interface symmetry with ``list_models``;
            not consulted here.

    Returns:
        The fetched catalog, or ``[]`` to signal that fallback should fire.
    """
    global _catalog_cache, _catalog_fetched_at
    try:
        catalog = _fetch_aimlapi_catalog()
    except (
        urllib.error.URLError,
        json.JSONDecodeError,
        UnicodeDecodeError,
        OSError,
    ) as exc:
        logger.warning(
            "discovery: AIMLAPI fetch failed, fallback active: %s",
            exc,
        )
        return []  # signal to caller that fallback should fire

    if not catalog:
        logger.warning(
            "discovery: AIMLAPI fetch failed, fallback active: %s",
            "empty or malformed catalog",
        )
        return []

    _catalog_cache = catalog
    _catalog_fetched_at = time.time()
    logger.debug(
        "discovery: AIMLAPI catalog refreshed (%d entries)", len(catalog)
    )
    return catalog
def list_models(
    *,
    provider: Optional[str] = None,
    ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> list[ModelInfo]:
    """Return models known to the catalog.

    ``provider`` is the swarph-mesh adapter name ("openai", "deepseek",
    "claude", "gemini", "grok"), translated to AIMLAPI's developer field
    through :data:`_PROVIDER_TO_DEVELOPER`; unmapped names are compared
    as-is. ``None`` yields the full catalog.

    The cache lives for ``ttl_seconds`` (24h by default). Passing
    ``ttl_seconds=0`` forces a fresh fetch (e.g., after publishing a new
    model and wanting to verify it appears).

    When the primary catalog comes back empty, the per-provider fallback
    is used instead.
    """
    needs_refresh = ttl_seconds == 0 or not _is_cache_fresh(ttl_seconds)
    if needs_refresh:
        catalog = _refresh_catalog(ttl_seconds=ttl_seconds)
    else:
        catalog = _catalog_cache or []

    # Primary path produced nothing → per-provider fallback.
    if not catalog:
        return _fetch_per_provider_fallback(provider)

    if provider is None:
        return list(catalog)

    developer = _PROVIDER_TO_DEVELOPER.get(provider, provider)
    return [entry for entry in catalog if entry.matches_developer(developer)]
def is_model_supported(model_id: str, *, ttl_seconds: int = DEFAULT_TTL_SECONDS) -> bool:
    """Fast existence check for ``model_id``.

    Adapters call this before dispatching to the underlying SDK — closes
    the silent-mis-attribute trap (drop's #728 obs (a)) at the substrate
    layer.

    A model counts as supported when ``model_id`` equals an entry's ``id``
    or appears in its ``aliases`` (AIMLAPI catalogs both canonical IDs
    like ``claude-opus-4-7`` and aliased forms like
    ``openai/gpt-3.5-turbo``).
    """
    return any(
        entry.id == model_id or model_id in entry.aliases
        for entry in list_models(ttl_seconds=ttl_seconds)
    )
def get_model_info(
    model_id: str, *, ttl_seconds: int = DEFAULT_TTL_SECONDS
) -> Optional[ModelInfo]:
    """Single-model lookup by ``id`` or alias.

    Returns the first catalog entry whose ``id`` equals ``model_id`` or
    whose ``aliases`` contain it, or ``None`` when not in catalog.
    """
    candidates = (
        entry
        for entry in list_models(ttl_seconds=ttl_seconds)
        if entry.id == model_id or model_id in entry.aliases
    )
    return next(candidates, None)
# =========================================================================== # Centralized model-id normalizers (drop DM #745 obs #1, v0.6.2) # =========================================================================== # # Per-adapter normalizers (``_normalize_xai_id``, ``_normalize_deepseek_id``) # remain in their adapter modules for backward-compat callers. v0.6.2 # centralizes the SAME logic here so future telemetry / shared tooling # (e.g., a future drift-detection cron) can normalize across providers # without per-adapter import dance. import re as _re
def normalize_xai_id(model_id: str) -> str:
    """Reduce an xAI model id to its bare form.

    Applied in order: drop a leading ``x-ai/``, drop a trailing ``-beta``,
    then drop a trailing two-digit/two-digit build suffix (``-DD-DD``).
    See ``swarph_mesh.adapters.grok._normalize_xai_id`` for adapter-local
    copy with the same semantics.
    """
    prefix, beta_suffix = "x-ai/", "-beta"
    bare = model_id[len(prefix):] if model_id.startswith(prefix) else model_id
    if bare.endswith(beta_suffix):
        bare = bare[: -len(beta_suffix)]
    return _re.sub(r"-\d{2}-\d{2}$", "", bare)
def normalize_deepseek_id(model_id: str) -> str:
    """Reduce a DeepSeek model id to its bare form.

    Drops a leading ``deepseek/`` and then a trailing version suffix
    (``-v3.1``, ``-v3.2-terminus``). See
    ``swarph_mesh.adapters.deepseek._normalize_deepseek_id`` for
    adapter-local copy.
    """
    prefix = "deepseek/"
    bare = model_id[len(prefix):] if model_id.startswith(prefix) else model_id
    return _re.sub(r"-v\d+\.\d+(-\w+)*$", "", bare)
def normalize_model_id(provider: str, model_id: str) -> str:
    """Provider-aware normalizer.

    Routes to the matching per-provider helper. Provider names match the
    adapter ``name`` field (``"openai"``, ``"grok"``, etc.). Providers
    with no known prefix/suffix to strip get ``model_id`` back unchanged.
    """
    if provider == "deepseek":
        return normalize_deepseek_id(model_id)
    if provider == "xai" or provider == "grok":
        return normalize_xai_id(model_id)
    return model_id
# =========================================================================== # Shared retirement registry (drop DM #745 obs #2, v0.6.2) # =========================================================================== # # Per-adapter retirement constants (``_GROK_RETIREMENT_NOTICE``) stay # in their modules for backward-compat. v0.6.2 promotes a SHARED # registry pattern so callers can query "is this model retired today # across any provider?" without per-adapter import dance. import datetime as _dt # Static cross-provider retirement notice. Each entry maps a fully- # qualified ``provider/model_id`` key to the date past which the # model is no longer routable. Update by editing this dict + bumping # ``_RETIREMENT_REGISTRY_VERIFIED_AT``. _RETIREMENT_REGISTRY_VERIFIED_AT = "2026-05-09" _RETIREMENT_REGISTRY: dict[str, str] = { # provider/model_id: retirement_date (ISO YYYY-MM-DD) "grok/grok-4": "2026-05-15", "grok/grok-code-fast-1": "2026-05-15", # Anthropic deprecated models — flagged by Anthropic but still # routable (metered API serves them); listing here surfaces the # status to drift-detection. Retirement date not specified by # Anthropic; using "deprecated" as a sentinel. "claude/claude-sonnet-3-7": "deprecated", "claude/claude-opus-3": "deprecated", }
def is_retired(provider: str, model_id: str, *, today: Optional[_dt.date] = None) -> bool:
    """Report whether a model is past its retirement date.

    The comparison date is ``today`` when given, otherwise the current
    UTC date; a model counts as retired on its retirement date itself.

    Returns False for unregistered models (not retired), for
    ``deprecated`` sentinel entries (still routable, just deprecated),
    and for registry values that are not valid ISO dates.
    """
    entry = _RETIREMENT_REGISTRY.get(f"{provider}/{model_id}")
    if entry is None or entry == "deprecated":
        return False

    try:
        cutoff = _dt.date.fromisoformat(entry)
    except ValueError:
        return False

    as_of = today or _dt.datetime.now(_dt.timezone.utc).date()
    return as_of >= cutoff
def retirement_date(provider: str, model_id: str) -> Optional[str]:
    """Look up the raw registry value for ``provider/model_id``.

    Returns the stored string (an ISO date or the ``"deprecated"``
    sentinel), OR None if the key is not in the registry.
    """
    registry_key = f"{provider}/{model_id}"
    return _RETIREMENT_REGISTRY.get(registry_key)
# =========================================================================== # Pricing discovery — heterogeneous per-provider sources (v0.6.0) # =========================================================================== # # AIMLAPI's catalog has NO pricing data. Only Google has a clean # programmatic source today (Cloud Billing Catalog API). Other providers # stay on adapter-local PRICING dicts + verify-after sentinel. # # As of 2026-05-09: # # Provider | Source | Status # -----------|-------------------------------------------|------------------ # Google | Cloud Billing Catalog API (this module) | ✓ live, API-key # OpenAI | /v1/organization/costs (admin key only) | historical USAGE # Anthropic | none public | manual table only # xAI | none public | manual table only # DeepSeek | none public | manual table only # AIMLAPI | HTML pricing page (no API) | manual scrape only # # Future: when more providers expose programmatic pricing, add per-source # fetch helpers below mirroring ``_fetch_gemini_pricing``. # Vertex AI / Gemini service ID in Google's Cloud Billing Catalog. # Reference: gcloud billing services list | grep -i vertex GEMINI_BILLING_SERVICE_ID = "241C-273D-49C8" GEMINI_BILLING_URL_TEMPLATE = ( "https://cloudbilling.googleapis.com/v1/services/{service_id}/skus" ) # Module-level pricing cache, keyed by provider name. 24h TTL like the # catalog. Pricing changes at provider-scheduled cadence (months), so # even 24h is conservative. _pricing_cache: dict[str, list] = {} _pricing_fetched_at: dict[str, float] = {}
@dataclass
class ProviderPricing:
    """A single pricing record for one provider+SKU combination.

    A tiered model (e.g., Gemini Pro at >128K context) is represented by
    several ``ProviderPricing`` records — one per tier band — told apart
    by ``tier_threshold_tokens``. To find "the price at N tokens", take
    the record with the highest threshold satisfying
    ``tier_threshold_tokens <= N``.
    """

    # "google" / "gemini" — adapter-name space
    provider: str
    # description-extracted model fragment
    model_hint: str
    sku_id: str
    sku_description: str
    # USD per 1M tokens
    input_per_mtok: Optional[float] = None
    # USD per 1M tokens
    output_per_mtok: Optional[float] = None
    # 0 = base tier; 128000 = >128K tier
    tier_threshold_tokens: int = 0
    # "TBy", "MTok", etc. as returned
    usage_unit: Optional[str] = None
    source: str = "google-cloud-billing"
    # ISO timestamp of fetch
    verified_at: Optional[str] = None
def invalidate_pricing(provider: Optional[str] = None) -> None:
    """Clear the pricing cache.

    With a ``provider`` name only that provider's entry and timestamp are
    dropped (a no-op if absent); ``provider=None`` clears all.
    """
    if provider is not None:
        _pricing_cache.pop(provider, None)
        _pricing_fetched_at.pop(provider, None)
        return
    _pricing_cache.clear()
    _pricing_fetched_at.clear()
def _money_to_usd(units: object, nanos: object) -> float:
    """Convert Google's Money proto-shape to a float USD value.

    Google returns ``units`` as a string (int64 wire-format), ``nanos``
    as an int. Per the spec: total = units + nanos/1e9. A missing or
    unparseable component counts as 0.
    """
    try:
        u = int(units) if units else 0
    except (ValueError, TypeError):
        u = 0
    try:
        n = int(nanos) if nanos else 0
    except (ValueError, TypeError):
        n = 0
    return u + n / 1_000_000_000.0


def _classify_gemini_sku(description: str) -> tuple[Optional[str], int]:
    """Parse a Gemini SKU description into ``(direction, tier_threshold)``.

    Heuristics from observed SKU descriptions (subject to drift; revisit
    when verify-after sentinel fires). Matching is case-insensitive:

    - ``Input``/``Prompt`` → input direction
    - ``Output``/``Completion`` → output direction
    - ``Greater than 128k`` / ``Long Context`` / ``200k`` / ``1m context``
      → tier_threshold=128000
    - Otherwise base tier (0)

    The direction is ``None`` on an unrecognized SKU shape (caller skips).
    """
    desc_lower = description.lower()
    direction: Optional[str] = None
    if "input" in desc_lower or "prompt" in desc_lower:
        direction = "input"
    elif "output" in desc_lower or "completion" in desc_lower:
        direction = "output"

    # Tier detection — Gemini 1.5 Pro doubles pricing past 128K
    tier = 0
    if any(
        marker in desc_lower
        for marker in ("greater than 128k", "long context", "200k", "1m context")
    ):
        tier = 128000
    return direction, tier


def _parse_gemini_sku(sku: dict, *, verified_at: str) -> list[ProviderPricing]:
    """Convert one SKU JSON object to zero-or-more ``ProviderPricing`` records.

    SKUs with multiple ``tieredRates`` produce one record per tier. SKUs
    that don't match the Gemini-pricing shape return ``[]``.

    NOTE: ``input_per_mtok`` / ``output_per_mtok`` currently carry the raw
    per-``usage_unit`` price, not a normalized per-Mtok value (see the
    conversion comment in the loop).
    """
    description = sku.get("description", "")
    if "gemini" not in description.lower():
        return []  # not a Gemini SKU
    direction, sku_tier_threshold = _classify_gemini_sku(description)
    if direction is None:
        return []  # unrecognized direction; skip rather than guess

    sku_id = sku.get("skuId", "")
    pricing_info = sku.get("pricingInfo", [])
    if not pricing_info:
        return []
    expression = pricing_info[0].get("pricingExpression", {})
    usage_unit = expression.get("usageUnit")
    # SKUs are usually priced per-byte (1Tby = 1e12 bytes) or per-character.
    # For Gemini token pricing we want per-Mtok. The API exposes
    # ``displayQuantity`` + ``usageUnit`` like {"displayQuantity": 1.0,
    # "usageUnit": "1MCT"} = "1 million characters". For tokens, watch
    # for "MTok" or "1MTok".
    tiered_rates = expression.get("tieredRates", [])

    out: list[ProviderPricing] = []
    for rate in tiered_rates:
        unit_price = rate.get("unitPrice", {})
        per_unit_usd = _money_to_usd(
            unit_price.get("units"), unit_price.get("nanos")
        )
        # tieredRates expose startUsageAmount as the kick-in threshold
        # in the SKU's usage unit (not tokens directly). The
        # base/expensive split for >128K Gemini context is encoded as
        # SEPARATE SKUs with description containing "Greater than
        # 128k", not as tiered rates within a single SKU. So we use
        # the description-derived tier_threshold here, falling back to
        # the rate's startUsageAmount only as a hint.
        rate_threshold = sku_tier_threshold
        record = ProviderPricing(
            provider="gemini",
            model_hint=_extract_model_from_description(description),
            sku_id=sku_id,
            sku_description=description,
            usage_unit=usage_unit,
            tier_threshold_tokens=rate_threshold,
            source="google-cloud-billing",
            verified_at=verified_at,
        )
        # Convert per-unit-usd to per-Mtok based on usage_unit. Tokens-
        # per-Mtok = per-unit * 1e6 / unit_size_in_tokens. For now we
        # document the per-unit price under the relevant direction and
        # leave the per-Mtok normalization to the caller until we've
        # observed enough live SKUs to encode the unit conversions safely.
        if direction == "input":
            record.input_per_mtok = per_unit_usd
        else:
            record.output_per_mtok = per_unit_usd
        out.append(record)
    return out


def _extract_model_from_description(description: str) -> str:
    """Best-effort model-name extraction from SKU description.

    SKU descriptions look like
    ``"Gemini 1.5 Pro Input Tokens (Greater than 128k)"``. We extract the
    fragment from "Gemini" up to the earliest direction word (Input /
    Output / Prompt / Completion). Matching is case-insensitive, in line
    with ``_classify_gemini_sku``.

    Conservative — with no direction word the text from "Gemini" onward
    is returned, and with no "Gemini" the full (stripped) description is
    returned, so callers can match on substring.
    """
    start = _re.search("gemini", description, flags=_re.IGNORECASE)
    if start is None:
        return description.strip()

    rest = description[start.start():]
    # ``search`` reports the leftmost hit, i.e. the first direction word
    # by position rather than by the order the words are listed in.
    stop = _re.search(
        r"input|output|prompt|completion", rest, flags=_re.IGNORECASE
    )
    if stop is not None and stop.start() > 0:
        return rest[: stop.start()].strip()
    return rest.strip()


def _fetch_gemini_pricing_page(
    *, api_key: str, page_token: Optional[str] = None, timeout: float = 15.0
) -> dict:
    """Fetch one page of Cloud Billing SKUs for the Gemini service.

    Returns the raw JSON dict; callers handle pagination via the
    ``nextPageToken`` field.

    Raises whatever ``urllib.request.urlopen`` / ``json.loads`` raise on
    network or decode failure; the caller translates those.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    url = GEMINI_BILLING_URL_TEMPLATE.format(
        service_id=GEMINI_BILLING_SERVICE_ID
    )
    query = {"key": api_key, "pageSize": "500"}
    if page_token:
        query["pageToken"] = page_token
    # urlencode percent-escapes reserved characters: page tokens are
    # opaque and may contain ``+``, ``/`` or ``=``.
    full_url = f"{url}?{urlencode(query)}"
    req = urllib.request.Request(
        full_url,
        headers={"User-Agent": "swarph-mesh/0.6.0 (discovery.pricing)"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
def fetch_gemini_pricing(
    *,
    api_key: Optional[str] = None,
    ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> list[ProviderPricing]:
    """Hit Cloud Billing Catalog API, return parsed ``ProviderPricing``
    records for all Gemini SKUs.

    Auth: ``api_key`` parameter, OR ``$GOOGLE_CLOUD_BILLING_API_KEY`` env,
    OR ``$GOOGLE_CLOUD_API_KEY`` env. Note: this is a **separate API key**
    from the Generative AI ``GEMINI_API_KEY`` used by the chat adapter —
    Cloud Billing requires Cloud Console project keys with billing-API
    scope enabled. Operators provisioning need:

        gcloud services enable cloudbilling.googleapis.com
        # then create an API key in Cloud Console with Cloud Billing
        # API restriction; export as GOOGLE_CLOUD_BILLING_API_KEY

    Returns empty list when no key is configured or the fetch fails
    (treat as "billing-API key not configured" — adapter PRICING tables
    stay authoritative). Failed fetches are not cached.

    Pagination: Google returns up to 5000 SKUs per page; service
    ``241C-273D-49C8`` typically has < 500 entries so a single page
    suffices. We still page defensively for forward-compat, bounded at
    20 pages; hitting that bound is logged as a warning.

    The returned list is always a fresh list object, so callers may
    mutate it without affecting the module-level cache.
    """
    resolved_key = (
        api_key
        or os.environ.get("GOOGLE_CLOUD_BILLING_API_KEY")
        or os.environ.get("GOOGLE_CLOUD_API_KEY")
    )
    if not resolved_key:
        logger.debug(
            "discovery.pricing: GOOGLE_CLOUD_BILLING_API_KEY not set; "
            "Gemini pricing fetch skipped"
        )
        return []

    # Cache hit
    if (
        ttl_seconds > 0
        and "gemini" in _pricing_cache
        and (time.time() - _pricing_fetched_at.get("gemini", 0)) < ttl_seconds
    ):
        return list(_pricing_cache["gemini"])

    import datetime as _dt

    verified_at = _dt.datetime.now(_dt.timezone.utc).isoformat()

    out: list[ProviderPricing] = []
    page_token: Optional[str] = None
    pages_fetched = 0
    max_pages = 20  # safety bound
    try:
        while pages_fetched < max_pages:
            data = _fetch_gemini_pricing_page(
                api_key=resolved_key, page_token=page_token
            )
            if not isinstance(data, dict):
                # Valid JSON but not the documented object shape.
                logger.warning(
                    "discovery.pricing: Gemini Cloud Billing fetch failed: %s",
                    "unexpected response shape",
                )
                return []
            for sku in data.get("skus", []):
                out.extend(_parse_gemini_sku(sku, verified_at=verified_at))
            page_token = data.get("nextPageToken") or None
            pages_fetched += 1
            if not page_token:
                break
    except (
        urllib.error.URLError,
        json.JSONDecodeError,
        UnicodeDecodeError,
        OSError,
    ) as exc:
        logger.warning(
            "discovery.pricing: Gemini Cloud Billing fetch failed: %s", exc
        )
        return []

    if page_token:
        # Left the loop on the page bound with more pages still pending.
        logger.warning(
            "discovery.pricing: stopped after %d pages; Gemini pricing "
            "list may be incomplete",
            pages_fetched,
        )

    # Store a copy: ``out`` is handed to the caller, and a caller-side
    # mutation must not leak into later cache hits.
    _pricing_cache["gemini"] = list(out)
    _pricing_fetched_at["gemini"] = time.time()
    logger.debug(
        "discovery.pricing: fetched %d Gemini pricing records (%d pages)",
        len(out),
        pages_fetched,
    )
    return out
# --------------------------------------------------------------------------- # Anthropic pricing — manual table from claude.com/pricing # --------------------------------------------------------------------------- # # Anthropic does NOT expose a programmatic pricing endpoint. Source of # truth is the public docs page; we mirror it here with explicit # `verified_at` so drift is visible. Update by re-pasting the official # table and bumping `_ANTHROPIC_PRICING_VERIFIED_AT`. # # Full pricing dimensions captured per model: # - base input (per Mtok) # - 5m cache write (1.25x base) # - 1h cache write (2x base) # - cache hit / refresh (0.1x base) # - output (per Mtok) # # These multipliers are part of the published pricing model; the # explicit values are kept rather than computed so the table acts as # its own audit record. _ANTHROPIC_PRICING_VERIFIED_AT = "2026-05-09" # Tuple shape: (base_input, cache_write_5m, cache_write_1h, cache_hit, output) # v0.6.1 adds dated-build aliases (e.g. claude-opus-4-5-20251101) so # AIMLAPI catalog IDs resolve here too. Canonical + dated forms point # to identical pricing. _ANTHROPIC_PRICING: dict[str, tuple[float, float, float, float, float]] = { # Canonical short-form IDs "claude-opus-4-7": (5.00, 6.25, 10.00, 0.50, 25.00), "claude-opus-4-6": (5.00, 6.25, 10.00, 0.50, 25.00), "claude-opus-4-5": (5.00, 6.25, 10.00, 0.50, 25.00), "claude-opus-4-1": (15.00, 18.75, 30.00, 1.50, 75.00), "claude-opus-4": (15.00, 18.75, 30.00, 1.50, 75.00), "claude-sonnet-4-6": (3.00, 3.75, 6.00, 0.30, 15.00), "claude-sonnet-4-5": (3.00, 3.75, 6.00, 0.30, 15.00), "claude-sonnet-4": (3.00, 3.75, 6.00, 0.30, 15.00), "claude-sonnet-3-7": (3.00, 3.75, 6.00, 0.30, 15.00), # deprecated "claude-haiku-4-5": (1.00, 1.25, 2.00, 0.10, 5.00), "claude-haiku-3-5": (0.80, 1.00, 1.60, 0.08, 4.00), "claude-opus-3": (15.00, 18.75, 30.00, 1.50, 75.00), # deprecated "claude-haiku-3": (0.25, 0.30, 0.50, 0.03, 1.25), # v0.6.1 — dated-build aliases (AIMLAPI catalog format). 
# Each maps to the same pricing as its canonical short form. "claude-opus-4-5-20251101": (5.00, 6.25, 10.00, 0.50, 25.00), "claude-opus-4-1-20250805": (15.00, 18.75, 30.00, 1.50, 75.00), "claude-opus-4-20250514": (15.00, 18.75, 30.00, 1.50, 75.00), "claude-sonnet-4-5-20250929": (3.00, 3.75, 6.00, 0.30, 15.00), "claude-sonnet-4-20250514": (3.00, 3.75, 6.00, 0.30, 15.00), "claude-haiku-4-5-20251001": (1.00, 1.25, 2.00, 0.10, 5.00), }
def pricing_for_anthropic_model(model_id: str) -> Optional[ProviderPricing]:
    """Build a ``ProviderPricing`` record for an Anthropic model.

    Source: ``_ANTHROPIC_PRICING`` static table (manually verified against
    claude.com/pricing on ``_ANTHROPIC_PRICING_VERIFIED_AT``).

    Returns ``None`` for unknown model_ids — callers should fall back to
    the adapter-local PRICING table.

    ``input_per_mtok`` holds the base input rate and ``output_per_mtok``
    the output rate. The three cache dimensions are attached to the
    record as extra plain attributes (not dataclass fields):
    ``cache_write_5m_per_mtok``, ``cache_write_1h_per_mtok`` and
    ``cache_hit_per_mtok``.
    """
    try:
        base_in, cache_5m, cache_1h, cache_hit, out = _ANTHROPIC_PRICING[model_id]
    except KeyError:
        return None

    record = ProviderPricing(
        provider="anthropic",
        model_hint=model_id,
        sku_id=f"anthropic-docs::{model_id}",
        sku_description=f"Anthropic {model_id} (manual table from claude.com/pricing)",
        input_per_mtok=base_in,
        output_per_mtok=out,
        usage_unit="MTok",
        source="anthropic-docs-manual",
        verified_at=_ANTHROPIC_PRICING_VERIFIED_AT,
    )

    # Cache-pricing dimensions ride along as ordinary instance attributes
    # rather than declared fields, so dataclass equality is unaffected.
    extras = (
        ("cache_write_5m_per_mtok", cache_5m),
        ("cache_write_1h_per_mtok", cache_1h),
        ("cache_hit_per_mtok", cache_hit),
    )
    for attr_name, rate in extras:
        setattr(record, attr_name, rate)
    return record
def list_anthropic_pricing() -> list[ProviderPricing]:
    """Return ``ProviderPricing`` records for every model in the static
    Anthropic table.

    Used by drift-detection cron + audit surfaces. Each record is built
    exactly once; ``None`` results are dropped.
    """
    records = (
        pricing_for_anthropic_model(model_id) for model_id in _ANTHROPIC_PRICING
    )
    return [record for record in records if record is not None]
# --------------------------------------------------------------------------- # DeepSeek balance — current credit snapshot (different shape from # OpenAI/xAI which expose historical per-day buckets) # --------------------------------------------------------------------------- # # DeepSeek does NOT expose a historical-usage API at well-known paths. # What they DO expose is ``/user/balance`` — a snapshot of currently- # available credits. Authoritative per-token pricing for the table in # ``swarph_mesh.adapters.deepseek.PRICING`` lives at: # # https://api-docs.deepseek.com/quick_start/pricing/ # # Re-verify when ``_DEEPSEEK_PRICING_VERIFIED_AT`` (in adapter module) # crosses the verify-after threshold. To approximate historical spend, # callers can record balance at periodic checkpoints and diff over time: # # t0 = fetch_deepseek_balance().total_balance # ... do work ... # t1 = fetch_deepseek_balance().total_balance # approx_spent_in_window = t0 - t1 # # Auth: regular ``DEEPSEEK_API_KEY`` (NO admin/management key class — # DeepSeek doesn't have one). Same key the chat adapter uses. DEEPSEEK_BALANCE_URL = "https://api.deepseek.com/user/balance"
@dataclass
class DeepSeekBalance:
    """Point-in-time view of a DeepSeek account's credit balance.

    DeepSeek's API exposes only the current balance — not historical
    per-day usage, unlike OpenAI's ``CostBucket`` and xAI's
    ``XAICostBucket``. Record ``fetched_at`` with each snapshot and diff
    successive balances to approximate windowed spend.
    """

    total_balance: float
    granted_balance: float
    topped_up_balance: float
    currency: str = "USD"
    is_available: bool = True
    # ISO timestamp of fetch
    fetched_at: Optional[str] = None
    raw: Optional[dict] = None
def fetch_deepseek_balance(
    *, api_key: Optional[str] = None, timeout: float = 10.0
) -> Optional[DeepSeekBalance]:
    """Hit ``GET /user/balance`` on DeepSeek for current credit state.

    ``api_key`` resolves from arg → the adapter's ``_resolve_api_key``
    (env + legacy ``/home/ubuntu/deepseek/.env`` fallback) when the
    adapter module is importable, otherwise ``$DEEPSEEK_API_KEY``.

    Returns ``None`` if no key is configured OR the call fails. Failure
    covers network/HTTP errors, timeouts, a non-JSON or non-object body,
    and balance values that cannot be converted to ``float``.

    Note: DeepSeek can have multiple ``balance_infos`` entries (different
    currencies). We return the USD entry, or the first entry when there
    is no USD one; callers needing other currencies can inspect ``raw``.
    With no entries at all, a zero balance is returned.
    """
    # Reuse adapter's resolution (env + legacy file fallback)
    try:
        from swarph_mesh.adapters.deepseek import (
            _resolve_api_key as _adapter_resolve,
        )

        key = api_key or _adapter_resolve()
    except ImportError:
        key = api_key or os.environ.get("DEEPSEEK_API_KEY")
    if not key:
        logger.debug(
            "discovery.balance: DEEPSEEK_API_KEY unset; balance fetch skipped"
        )
        return None

    req = urllib.request.Request(
        DEEPSEEK_BALANCE_URL,
        headers={
            "Authorization": f"Bearer {key}",
            "User-Agent": "swarph-mesh/0.6.1 (deepseek-balance)",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = json.loads(resp.read().decode("utf-8"))
    except (
        urllib.error.URLError,  # includes HTTPError
        json.JSONDecodeError,
        UnicodeDecodeError,
        OSError,  # e.g. a socket timeout while reading the body
    ) as exc:
        logger.warning("discovery.balance: DeepSeek fetch failed: %s", exc)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "discovery.balance: DeepSeek fetch failed: %s",
            "unexpected response shape",
        )
        return None

    import datetime as _dt

    fetched_at = _dt.datetime.now(_dt.timezone.utc).isoformat()

    raw_infos = data.get("balance_infos", [])
    balance_infos = (
        [b for b in raw_infos if isinstance(b, dict)]
        if isinstance(raw_infos, list)
        else []
    )
    # Find USD entry; fall back to first entry if no USD
    usd_entry = next(
        (b for b in balance_infos if b.get("currency") == "USD"),
        balance_infos[0] if balance_infos else None,
    )
    if not usd_entry:
        return DeepSeekBalance(
            total_balance=0.0,
            granted_balance=0.0,
            topped_up_balance=0.0,
            is_available=bool(data.get("is_available", False)),
            fetched_at=fetched_at,
            raw=data,
        )

    # DeepSeek returns balance values as strings (per the live response);
    # convert to floats
    try:
        total = float(usd_entry.get("total_balance", 0))
        granted = float(usd_entry.get("granted_balance", 0))
        topped_up = float(usd_entry.get("topped_up_balance", 0))
    except (TypeError, ValueError) as exc:
        logger.warning("discovery.balance: DeepSeek fetch failed: %s", exc)
        return None

    return DeepSeekBalance(
        total_balance=total,
        granted_balance=granted,
        topped_up_balance=topped_up,
        currency=usd_entry.get("currency", "USD"),
        is_available=bool(data.get("is_available", True)),
        fetched_at=fetched_at,
        raw=data,
    )
# ---------------------------------------------------------------------------
# xAI cost reconciliation — management-key gated (privilege boundary)
# ---------------------------------------------------------------------------
#
# xAI exposes a Management API at ``https://management-api.x.ai`` with
# team-scoped billing endpoints. The usage endpoint is more sophisticated
# than OpenAI's costs (POST body with timezone + aggregation + grouping
# vs simple query params).
#
# Privilege class: management keys can manage API keys, view billing
# data, configure spending limits. Same blast radius shape as OpenAI
# admin keys — env-only, never on-disk.
#
# Resolution: ``XAI_MANAGEMENT_KEY`` env (Bearer auth) + ``XAI_TEAM_ID``
# env (path scope; obtained from xAI console settings — not discoverable
# via API per the auth docs). Both must be set; missing either returns
# empty list.
XAI_MANAGEMENT_BASE_URL = "https://management-api.x.ai"


def _resolve_xai_management_key(arg: Optional[str]) -> Optional[str]:
    """Return ``arg`` when truthy, else ``$XAI_MANAGEMENT_KEY`` (or None)."""
    return arg or os.environ.get("XAI_MANAGEMENT_KEY")


def _resolve_xai_team_id(arg: Optional[str]) -> Optional[str]:
    """Return ``arg`` when truthy, else ``$XAI_TEAM_ID`` (or None)."""
    return arg or os.environ.get("XAI_TEAM_ID")
@dataclass
class XAICostBucket:
    """A single time-bucket of xAI usage data from
    ``/v1/billing/teams/.../usage``.

    Each bucket in xAI's response carries ``line_items``, each with a
    model description (e.g., "Chat grok-4-0709") and aggregated usage.
    Sum-style cost data is extracted into ``total_usd`` when available.
    """

    # ISO timestamp (xAI returns these)
    start_time: str
    end_time: str
    total_usd: float = 0.0
    line_items: list = field(default_factory=list)
    raw_bucket: Optional[dict] = None
def _to_xai_datetime(ts: str) -> str:
    """Convert an ISO 8601 timestamp to xAI's ``YYYY-MM-DD HH:MM:SS`` form.

    Accepts ``2026-05-01T00:00:00Z``, ``2026-05-01T00:00:00+00:00``,
    timestamps with fractional seconds (the default shape of
    ``datetime.isoformat()``, e.g. ``2026-05-01T00:00:00.123456+00:00``),
    and the bare xAI format itself.

    Args:
        ts: Timestamp string. Anything without a ``T`` is assumed to be in
            xAI's native format already and is returned unchanged.

    Returns:
        ``"<date> <time>"`` with the ``T`` separator replaced by a space and
        any fractional seconds, ``Z`` suffix, or ``±HH:MM`` offset removed.

    Note:
        The UTC offset is dropped, not applied — the wall-clock time is kept
        as written. The zone is communicated to xAI separately by the caller.
    """
    if "T" not in ts:
        return ts  # already in xAI format

    date_part, time_part = ts.replace("T", " ").split(" ", 1)

    # Keep only the leading HH:MM:SS: cut at the earliest fractional-second
    # dot, ``Z`` marker, or offset sign. Only the time portion is scanned, so
    # the date's hyphens are never mistaken for a negative offset.
    cut = len(time_part)
    for marker in (".", "Z", "+", "-"):
        idx = time_part.find(marker)
        if 0 < idx < cut:
            cut = idx
    return f"{date_part} {time_part[:cut]}"
def fetch_xai_cost_buckets(
    *,
    start_time: str,  # ISO 8601 OR "YYYY-MM-DD HH:MM:SS"
    end_time: str,
    management_key: Optional[str] = None,
    team_id: Optional[str] = None,
    time_unit: str = "TIME_UNIT_DAY",  # xAI enum: TIME_UNIT_DAY/HOUR/MONTH/etc.
    timezone: str = "Etc/GMT",
    timeout: float = 15.0,
) -> list[XAICostBucket]:
    """Hit xAI's ``POST /v1/billing/teams/{team_id}/usage`` for a date range.

    ``start_time`` / ``end_time`` accept either ISO 8601 (e.g.,
    ``"2026-05-01T00:00:00Z"``) or xAI's native format
    (``"2026-05-01 00:00:00"``). ISO inputs are converted internally;
    timezone is supplied via the separate ``timezone`` parameter
    (default ``"Etc/GMT"``).

    ``time_unit`` is one of xAI's enum strings: ``TIME_UNIT_DAY``,
    ``TIME_UNIT_HOUR``, ``TIME_UNIT_MONTH``, ``TIME_UNIT_CALENDAR_WEEK``,
    ``TIME_UNIT_QUARTER_HOUR``, ``TIME_UNIT_MINUTE``, ``TIME_UNIT_SECOND``,
    ``TIME_UNIT_NONE``.

    ``management_key`` resolves from arg → ``$XAI_MANAGEMENT_KEY`` env.
    ``team_id`` resolves from arg → ``$XAI_TEAM_ID`` env (obtained from
    xAI console — not discoverable via API).

    PRIVILEGE BOUNDARY: management keys can manage API keys, view
    billing, configure spending limits. swarph-mesh does NOT persist
    the key to disk — env-only.

    Returns:
        One :class:`XAICostBucket` per distinct data-point timestamp,
        sorted by timestamp, each aggregating USD across all model groups
        with the per-model breakdown in ``line_items``. Returns ``[]`` if
        either credential is missing, the call fails (4xx/5xx/network), or
        the response body is not a JSON object.
    """
    key = _resolve_xai_management_key(management_key)
    tid = _resolve_xai_team_id(team_id)
    if not key or not tid:
        logger.debug(
            "discovery.cost_reconciliation: XAI_MANAGEMENT_KEY or "
            "XAI_TEAM_ID unset; xAI usage fetch skipped"
        )
        return []

    url = f"{XAI_MANAGEMENT_BASE_URL}/v1/billing/teams/{tid}/usage"
    # xAI's documented body shape (verified against docs.x.ai). Wrapped
    # in `analyticsRequest`; uses camelCase + enum-string values; date
    # format is "YYYY-MM-DD HH:MM:SS" (NOT ISO 8601 despite docs page
    # mentioning ISO elsewhere).
    body = {
        "analyticsRequest": {
            "timeRange": {
                "startTime": _to_xai_datetime(start_time),
                "endTime": _to_xai_datetime(end_time),
                "timezone": timezone,
            },
            "timeUnit": time_unit,
            "values": [
                {"name": "usd", "aggregation": "AGGREGATION_SUM"},
            ],
            "groupBy": ["description"],
            "filters": [],
        }
    }
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
            "User-Agent": "swarph-mesh/0.6.1 (xai-cost-reconciliation)",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # Best-effort decode of the error payload purely for the log line.
        try:
            err_body = json.loads(exc.read().decode("utf-8"))
        except Exception:
            err_body = {"detail": str(exc)}
        logger.warning(
            "discovery.cost_reconciliation: xAI returned %d: %s",
            exc.code,
            err_body,
        )
        return []
    except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
        logger.warning(
            "discovery.cost_reconciliation: xAI fetch failed: %s", exc
        )
        return []

    return _pivot_xai_time_series(data)


def _pivot_xai_time_series(data: object) -> list[XAICostBucket]:
    """Pivot xAI's per-model ``timeSeries`` into per-timestamp buckets.

    xAI's actual response shape (verified against live smoke 2026-05-09)
    is ``timeSeries[]`` grouped by model, each with ``dataPoints[]`` per
    time bucket::

        {
          "timeSeries": [
            {
              "group": ["Chat grok-3-mini"],
              "groupLabels": ["Chat grok-3-mini"],
              "dataPoints": [
                {"timestamp": "2026-05-08T00:00:00Z", "values": [0.00027585]},
                ...
              ]
            }
          ],
          "limitReached": false
        }

    Malformed fragments (non-dict series or data points, null or
    non-numeric values, empty label lists) are tolerated rather than
    raised, so one bad row cannot break the "returns a list" contract of
    the public entry point.
    """
    if not isinstance(data, dict):
        logger.warning(
            "discovery.cost_reconciliation: xAI returned unexpected "
            "payload type %s",
            type(data).__name__,
        )
        return []

    by_timestamp: dict[str, XAICostBucket] = {}
    for series in data.get("timeSeries") or []:
        if not isinstance(series, dict):
            continue
        # Prefer the display labels, fall back to the raw group key, then
        # to "?". ``or`` (rather than a ``.get`` default) also covers a key
        # that is present but empty/null, which used to raise on ``[0]``.
        labels = series.get("groupLabels") or series.get("group") or ["?"]
        group_label = labels[0] if isinstance(labels, (list, tuple)) else str(labels)

        for point in series.get("dataPoints") or []:
            if not isinstance(point, dict):
                continue
            ts = point.get("timestamp", "")
            values = point.get("values")
            # Each value position corresponds to one entry in the request's
            # `values[]` array; only ``usd`` SUM is requested, so values[0]
            # is the total USD for this group+bucket.
            first = values[0] if isinstance(values, (list, tuple)) and values else None
            try:
                usd = float(first) if first is not None else 0.0
            except (TypeError, ValueError):
                usd = 0.0  # non-numeric value: count as no spend

            bucket = by_timestamp.get(ts)
            if bucket is None:
                # The response carries a single timestamp per data point, so
                # it is recorded as both start and end of the bucket.
                bucket = by_timestamp[ts] = XAICostBucket(start_time=ts, end_time=ts)
            bucket.total_usd += usd
            if usd > 0:
                bucket.line_items.append({"description": group_label, "usd": usd})

    return [by_timestamp[ts] for ts in sorted(by_timestamp)]
def reconcile_xai_cost(
    *,
    start_time: str,
    end_time: str,
    swarph_attributed_usd: Optional[float] = None,
    management_key: Optional[str] = None,
    team_id: Optional[str] = None,
) -> dict:
    """xAI parallel of :func:`reconcile_openai_cost`.

    Compares xAI's actual billed costs against swarph-mesh's
    attribution.jsonl total and returns a drift report.

    Args:
        start_time: Window start, ISO 8601 or ``"YYYY-MM-DD HH:MM:SS"``.
        end_time: Window end, same formats.
        swarph_attributed_usd: Locally recorded total for the same window.
            ``None`` skips the comparison entirely.
        management_key: Optional explicit key, forwarded to
            :func:`fetch_xai_cost_buckets`.
        team_id: Optional explicit team id, forwarded likewise.

    Returns:
        Same shape as ``reconcile_openai_cost`` but with
        ``xai_actual_usd`` instead of ``openai_actual_usd``::

            {
              "xai_actual_usd": float,
              "swarph_attributed_usd": float | None,
              "drift_usd": float | None,   # actual - attributed
              "drift_pct": float | None,   # (drift / attributed) * 100
              "buckets": list[XAICostBucket],
              "window_start": str,
              "window_end": str,
            }

        ``drift_usd`` is reported whenever an attributed total is supplied,
        including ``0.0`` (spend that was billed but never attributed).
        ``drift_pct`` is ``None`` unless the attributed total is positive,
        because the ratio is undefined otherwise.
    """
    buckets = fetch_xai_cost_buckets(
        start_time=start_time,
        end_time=end_time,
        management_key=management_key,
        team_id=team_id,
    )
    actual = sum(b.total_usd for b in buckets)

    drift_usd: Optional[float] = None
    drift_pct: Optional[float] = None
    if swarph_attributed_usd is not None:
        drift_usd = actual - swarph_attributed_usd
        # Percentage needs a positive denominator; the absolute drift above
        # is still meaningful when nothing was attributed.
        if swarph_attributed_usd > 0:
            drift_pct = (drift_usd / swarph_attributed_usd) * 100.0

    return {
        "xai_actual_usd": actual,
        "swarph_attributed_usd": swarph_attributed_usd,
        "drift_usd": drift_usd,
        "drift_pct": drift_pct,
        "buckets": buckets,
        "window_start": start_time,
        "window_end": end_time,
    }
# --------------------------------------------------------------------------- # OpenAI cost reconciliation — admin-key gated (privilege boundary) # --------------------------------------------------------------------------- # # OpenAI's ``/v1/organization/costs`` endpoint returns historical USAGE # data (not list pricing) bucketed by day. Authenticates with admin # keys ONLY (sk-admin-...), which are higher-privilege than regular # sk- or sk-proj- keys — admin keys can mint MORE admin keys, delete # keys, manage org settings. Bigger blast radius. # # Storage discipline (privilege-boundary): admin key is read from # ``$OPENAI_ADMIN_KEY`` env ONLY. swarph-mesh does NOT persist it to # disk anywhere — operators paste at daemon boot if reconciliation is # wanted, the env-var lives in the process for that session, gone at # restart. Mirrors the subscription-credentials-out-of-band discipline # from the Claude adapter. # # Use case: periodic verification that our local PRICING tables are # accurate. Compare swarph-mesh's attribution.jsonl recorded costs # against OpenAI's bucketed actual costs; drift = stale PRICING OR # token-counting bug. Out of band of the chat path — costs are # fetched on-demand, not on every chat call. OPENAI_COSTS_URL = "https://api.openai.com/v1/organization/costs"
@dataclass
class CostBucket:
    """A single day of OpenAI spend from ``/v1/organization/costs``.

    By default the endpoint hands back up to 7 buckets (tunable through
    ``limit``), each aggregating spend across its window. When the request
    asks for groupings (line_item / project_id / api_key_id), the matching
    breakdown dict is filled in.
    """

    # Bucket window boundaries, unix seconds.
    start_time: int
    end_time: int
    # Cost aggregated across every line item in the bucket.
    total_usd: float
    currency: str = "usd"
    # Optional per-group totals; each instance gets its own fresh dict.
    line_item_breakdown: dict[str, float] = field(default_factory=dict)
    project_breakdown: dict[str, float] = field(default_factory=dict)
    api_key_breakdown: dict[str, float] = field(default_factory=dict)
    # Result rows as received for this bucket.
    raw_results: list = field(default_factory=list)
def _resolve_openai_admin_key(arg: Optional[str]) -> Optional[str]:
    """Pick the admin key: explicit argument first, then ``$OPENAI_ADMIN_KEY``.

    The key is only ever read from the caller or the environment — never
    from disk.
    """
    return arg or os.environ.get("OPENAI_ADMIN_KEY")


def _parse_cost_amount(amount: Optional[dict]) -> tuple[float, str]:
    """Turn a money payload into ``(amount, currency)``.

    A missing/empty payload yields ``(0.0, "usd")``. When ``value`` is
    absent the units+nanos representation is converted via
    ``_money_to_usd`` instead.
    """
    if not amount:
        return 0.0, "usd"

    raw_value = amount.get("value")
    if raw_value is not None:
        return float(raw_value), amount.get("currency", "usd")

    # No direct value: fall back to the units+nanos shape.
    converted = _money_to_usd(amount.get("units"), amount.get("nanos"))
    return converted, amount.get("currency", "usd")


def _parse_cost_bucket(bucket: dict) -> CostBucket:
    """Build a :class:`CostBucket` from one ``/v1/organization/costs`` bucket.

    Only rows whose ``object`` is ``"organization.costs.result"`` count
    toward the total and the breakdowns; usage-shaped rows are skipped.
    The bucket's currency ends up being that of the last counted row.
    """
    rows = bucket.get("results", [])
    parsed = CostBucket(
        start_time=bucket.get("start_time", 0),
        end_time=bucket.get("end_time", 0),
        total_usd=0.0,
        raw_results=rows,
    )

    # Response field → breakdown table it accumulates into.
    groupings = (
        ("line_item", parsed.line_item_breakdown),
        ("project_id", parsed.project_breakdown),
        ("api_key_id", parsed.api_key_breakdown),
    )

    for row in rows:
        if row.get("object") != "organization.costs.result":
            continue  # not a pure cost row

        row_usd, row_currency = _parse_cost_amount(row.get("amount"))
        parsed.total_usd += row_usd
        parsed.currency = row_currency

        # A grouping field is only populated when that grouping was requested.
        for field_name, table in groupings:
            label = row.get(field_name)
            if label:
                table[label] = table.get(label, 0.0) + row_usd

    return parsed
def fetch_openai_cost_buckets(
    *,
    start_time: int,
    end_time: Optional[int] = None,
    admin_key: Optional[str] = None,
    group_by: Optional[list[str]] = None,
    limit: int = 7,
    timeout: float = 15.0,
) -> list[CostBucket]:
    """Hit OpenAI's ``/v1/organization/costs`` for a date range.

    ``start_time`` / ``end_time`` are Unix-seconds timestamps; bucket
    width is 1d (the only supported value as of 2026-05).

    ``group_by`` accepts any subset of ``["line_item", "project_id",
    "api_key_id"]``. Default is no grouping (just total per bucket).

    ``admin_key`` resolves from arg → ``$OPENAI_ADMIN_KEY`` env. If no
    key is found, returns ``[]`` (treat as "reconciliation not
    configured" — discovery.pricing primitives stay authoritative).

    PRIVILEGE BOUNDARY: admin keys can mint more admin keys, delete
    keys, and manage org settings. swarph-mesh does NOT persist this
    key to disk — operators paste at daemon boot if reconciliation is
    wanted, and the env-var lives in the process for that session only.
    Treat this function as a privileged read; do NOT call it inside
    hot loops or chat paths.

    Pagination follows ``has_more`` / ``next_page`` for at most 20 pages;
    hitting that cap logs a warning because the result is then truncated.

    Returns:
        ``list[CostBucket]`` in chronological order (oldest first). If a
        request fails part-way through pagination (4xx/5xx/network/bad
        JSON), the buckets collected so far are returned, still sorted.
    """
    # Function-scope import keeps this block self-contained; the module
    # header only imports ``urllib.error`` and ``urllib.request``.
    from urllib.parse import urlencode

    key = _resolve_openai_admin_key(admin_key)
    if not key:
        logger.debug(
            "discovery.cost_reconciliation: OPENAI_ADMIN_KEY not set; "
            "cost fetch skipped"
        )
        return []

    # Build the query as pairs and let ``urlencode`` escape it: the opaque
    # ``page`` cursor and caller-supplied ``group_by`` values must not be
    # spliced into the URL raw.
    query: list[tuple[str, object]] = [
        ("start_time", start_time),
        ("limit", limit),
        ("bucket_width", "1d"),
    ]
    if end_time is not None:
        query.append(("end_time", end_time))
    query.extend(("group_by", g) for g in group_by or [])

    out: list[CostBucket] = []
    page_token: Optional[str] = None
    max_pages = 20
    for _ in range(max_pages):
        page_query = query + ([("page", page_token)] if page_token else [])
        req = urllib.request.Request(
            f"{OPENAI_COSTS_URL}?{urlencode(page_query)}",
            headers={
                "Authorization": f"Bearer {key}",
                "User-Agent": "swarph-mesh/0.6.0 (cost-reconciliation)",
            },
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as exc:
            # Best-effort decode of the error payload purely for the log line.
            try:
                err_body = json.loads(exc.read().decode("utf-8"))
            except Exception:
                err_body = {"detail": str(exc)}
            logger.warning(
                "discovery.cost_reconciliation: OpenAI returned %d: %s",
                exc.code,
                err_body,
            )
            break
        except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
            logger.warning(
                "discovery.cost_reconciliation: fetch failed: %s", exc
            )
            break

        if not isinstance(data, dict):
            logger.warning(
                "discovery.cost_reconciliation: OpenAI returned unexpected "
                "payload type %s",
                type(data).__name__,
            )
            break

        out.extend(_parse_cost_bucket(bucket) for bucket in data.get("data") or [])

        page_token = data.get("next_page") if data.get("has_more") else None
        if not page_token:
            break
    else:
        # Loop ran out of iterations without a ``break``: more pages exist.
        logger.warning(
            "discovery.cost_reconciliation: stopped after %d pages; "
            "results may be truncated",
            max_pages,
        )

    # Every exit path funnels through here so partial results are ordered too.
    out.sort(key=lambda b: b.start_time)
    return out
def reconcile_openai_cost(
    *,
    start_time: int,
    end_time: Optional[int] = None,
    swarph_attributed_usd: Optional[float] = None,
    admin_key: Optional[str] = None,
) -> dict:
    """Fetch OpenAI's actual costs for a window + return a drift report
    against swarph-mesh's attribution.jsonl-recorded total.

    Returns dict::

        {
          "openai_actual_usd": float,            # what OpenAI billed
          "swarph_attributed_usd": float | None, # what we recorded
          "drift_usd": float | None,             # actual - attributed
          "drift_pct": float | None,             # (drift / attributed) * 100
          "buckets": list[CostBucket],           # per-day breakdown
          "window_start": int,                   # unix seconds
          "window_end": int | None,
        }

    ``swarph_attributed_usd=None`` skips the comparison (just returns
    actual costs). Any supplied number — including ``0.0``, i.e. spend
    that was billed but never attributed — yields a ``drift_usd``.
    ``drift_pct`` stays ``None`` unless the attributed total is positive,
    because the ratio is undefined otherwise. Caller is responsible for
    summing the relevant attribution.jsonl rows for the same window.
    """
    buckets = fetch_openai_cost_buckets(
        start_time=start_time,
        end_time=end_time,
        admin_key=admin_key,
    )
    actual = sum(b.total_usd for b in buckets)

    drift_usd: Optional[float] = None
    drift_pct: Optional[float] = None
    if swarph_attributed_usd is not None:
        drift_usd = actual - swarph_attributed_usd
        # Percentage needs a positive denominator; the absolute drift above
        # is still meaningful when nothing was attributed.
        if swarph_attributed_usd > 0:
            drift_pct = (drift_usd / swarph_attributed_usd) * 100.0

    return {
        "openai_actual_usd": actual,
        "swarph_attributed_usd": swarph_attributed_usd,
        "drift_usd": drift_usd,
        "drift_pct": drift_pct,
        "buckets": buckets,
        "window_start": start_time,
        "window_end": end_time,
    }
def pricing_for_gemini_model(
    model_hint: str,
    *,
    direction: str = "output",
    tier_threshold_tokens: int = 0,
    api_key: Optional[str] = None,
    ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> Optional[float]:
    """Look up the USD-per-Mtok price for a Gemini model, direction and tier.

    ``model_hint`` is compared case-insensitively as a substring of each
    pricing record's model fragment (``"1.5 Pro"`` hits
    ``"Gemini 1.5 Pro"``). ``direction`` selects the input price when it
    is exactly ``"input"``; any other value selects the output price.
    ``tier_threshold_tokens`` must equal the record's tier (0 for base,
    128000 for >128K).

    The first matching record that carries a price for the requested
    direction wins. ``None`` comes back when nothing matches or no
    matching record has that price (including when the pricing fetch
    yields no records); callers then fall back to the adapter-local
    PRICING table (``swarph_mesh.adapters.gemini.PRICING``).
    """
    price_attr = "input_per_mtok" if direction == "input" else "output_per_mtok"

    candidates = [
        record
        for record in fetch_gemini_pricing(api_key=api_key, ttl_seconds=ttl_seconds)
        if model_hint.lower() in record.model_hint.lower()
        and record.tier_threshold_tokens == tier_threshold_tokens
    ]

    for record in candidates:
        price = getattr(record, price_attr)
        if price is not None:
            return price
    return None