# Improve crypto summary context and stabilize metrics fetch (commit 5605c33)
"""Client helpers for the external portfolio analytics API."""
import re
from typing import Any, Dict, List, Optional
import httpx
from config import (
CACHE_RETRY_SECONDS,
CACHE_TTL_SECONDS,
DEBUG,
EXTERNAL_API_URL,
REQUEST_TIMEOUT,
)
from infrastructure.cache import CacheUnavailableError, TTLCache
# --- Portfolio UUID detection ---
# Canonical 8-4-4-4-12 hex UUID; case-insensitivity handled via explicit ranges.
UUID_PATTERN = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)


def extract_portfolio_id(text: str) -> Optional[str]:
    """Return the first portfolio UUID found in *text* (plain text or URL).

    A ``None`` or empty input, or text without a UUID, yields ``None``.
    """
    found = UUID_PATTERN.search(text or "")
    if found is None:
        return None
    return found.group(0)
async def _get_json(url: str) -> Dict[str, Any]:
    """GET *url* asynchronously and return the decoded JSON body.

    Raises ``httpx.HTTPStatusError`` for non-2xx responses and any
    transport error raised by ``httpx``.
    """
    if DEBUG:
        print(f"[DEBUG] Requesting URL: {url}")
    headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}
    async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client:
        response = await client.get(url, headers=headers)
        response.raise_for_status()
        return response.json()
def _parse_metrics(payload: Dict[str, Any]) -> Dict[str, Any]:
extended = payload.get("data", {}).get("extended", {})
result: Dict[str, Any] = {}
for k, v in extended.items():
if isinstance(v, (int, float)):
if k in {"cagr", "alphaRatio", "volatility", "maxDD"}:
result[k] = v * 100
else:
result[k] = v
return result
async def fetch_metrics_async(portfolio_id: str) -> Optional[Dict[str, Any]]:
    """Fetch portfolio metrics (extended data) asynchronously.

    Best-effort contract: returns the parsed metrics mapping, or ``None``
    on any upstream or parsing failure (errors are only printed in DEBUG).
    """
    url = f"{EXTERNAL_API_URL}/portfolio/get?portfolioId={portfolio_id}&extended=1"
    try:
        payload = await _get_json(url)
        metrics = _parse_metrics(payload)
    except Exception as exc:
        if DEBUG:
            print(f"[ERROR] fetch_metrics_async: {exc}")
        return None
    if DEBUG:
        print(f"[DEBUG] Metrics fetched for {portfolio_id}: {metrics}")
    return metrics
def _get_json_sync(url: str) -> Dict[str, Any]:
    """Blocking counterpart of :func:`_get_json`: GET *url*, return JSON.

    Raises ``httpx.HTTPStatusError`` for non-2xx responses.
    """
    if DEBUG:
        print(f"[DEBUG] Requesting URL (sync): {url}")
    headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}
    with httpx.Client(timeout=REQUEST_TIMEOUT) as client:
        response = client.get(url, headers=headers)
        response.raise_for_status()
        return response.json()
def fetch_metrics(portfolio_id: str) -> Optional[Dict[str, Any]]:
    """Synchronous twin of :func:`fetch_metrics_async`, used by cache loaders.

    Best-effort contract: returns the parsed metrics mapping, or ``None``
    on any upstream or parsing failure (errors are only printed in DEBUG).
    """
    url = f"{EXTERNAL_API_URL}/portfolio/get?portfolioId={portfolio_id}&extended=1"
    try:
        payload = _get_json_sync(url)
        metrics = _parse_metrics(payload)
    except Exception as exc:
        if DEBUG:
            print(f"[ERROR] fetch_metrics: {exc}")
        return None
    if DEBUG:
        print(f"[DEBUG] Metrics fetched (sync) for {portfolio_id}: {metrics}")
    return metrics
# Shared process-wide cache for portfolio metrics, with a retry cooldown
# so repeated upstream failures are not hammered.
_metrics_cache = TTLCache(CACHE_TTL_SECONDS, CACHE_RETRY_SECONDS)


def fetch_metrics_cached(portfolio_id: str) -> Dict[str, Any]:
    """Return cached metrics for *portfolio_id*, loading on a cache miss.

    When the upstream fetch fails (or yields nothing), raises
    ``CacheUnavailableError`` so the cache enters its cooldown window.
    """
    def _load_or_fail() -> Dict[str, Any]:
        # One-shot loader invoked by the cache on miss/expiry.
        metrics = fetch_metrics(portfolio_id)
        if metrics:
            return metrics
        raise CacheUnavailableError(
            "Metrics temporarily unavailable from upstream API.",
            CACHE_RETRY_SECONDS,
        )

    return _metrics_cache.get(portfolio_id, _load_or_fail)
async def fetch_absolute_pnl_async(portfolio_id: str) -> Optional[List[Dict[str, Any]]]:
    """Fetch the daily ``absolutePnL`` series for *portfolio_id*.

    Best-effort contract: returns the list of daily PnL entries, or
    ``None`` when the request fails or the field is missing/malformed.
    """
    url = f"{EXTERNAL_API_URL}/portfolio/get?portfolioId={portfolio_id}&extended=1&step=day"
    try:
        payload = await _get_json(url)
        series = payload.get("data", {}).get("extended", {}).get("absolutePnL", [])
    except Exception as exc:
        if DEBUG:
            print(f"[ERROR] fetch_absolute_pnl_async: {exc}")
        return None
    if isinstance(series, list):
        return series
    if DEBUG:
        print("[ERROR] absolutePnL is not a list or missing")
    return None