"""Redis-backed live-data adapters for strategy enrichment.""" from __future__ import annotations import logging import os import json import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Callable import requests from src.infrastructure.redis_cache import RedisJSONCache logger = logging.getLogger(__name__) try: from prometheus_client import Counter except Exception: # pragma: no cover - metric dependency may be unavailable at runtime. Counter = None # type: ignore[assignment] _DEFI_LLAMA_PROTOCOL = "https://api.llama.fi/protocol" _DEFI_LLAMA_CHAINS = "https://api.llama.fi/v2/chains" _DEFI_LLAMA_YIELDS = "https://yields.llama.fi/pools" _TIMEOUT = 8 _HTTP_RETRIES = 3 _RETRY_BASE_SLEEP = 0.25 _TTL = int(os.getenv("STRATEGY_LIVE_DATA_TTL_SEC", "120")) _STALE_TTL = int(os.getenv("STRATEGY_LIVE_DATA_STALE_TTL_SEC", "600")) _ERROR_TTL = int(os.getenv("STRATEGY_LIVE_DATA_ERROR_TTL_SEC", "30")) _MICRO_TTL = 3 _MICRO_CACHE: dict[str, tuple[Any, float]] = {} _CACHE = RedisJSONCache.instance() _CB_FAILURE_THRESHOLD = 3 _CB_WINDOW_SEC = 120 _CB_COOLDOWN_SEC = 60 _DEFAULT_NETWORK = (os.getenv("STRATEGY_DEFAULT_NETWORK", "avalanche") or "avalanche").strip().lower() _SUPPORTED_NETWORKS = {"avalanche", "base"} _NETWORK_CHAIN_LABELS: dict[str, dict[str, Any]] = { "avalanche": {"name": "Avalanche", "aliases": ("avalanche", "avax"), "registry": "strategy_registry.avalanche.json"}, "base": {"name": "Base", "aliases": ("base",), "registry": "strategy_registry.base.json"}, } _PROTOCOL_API_ENDPOINTS: dict[str, str] = { "Morpho Blue": "https://api.morpho.org/markets", "Aerodrome": "https://api.aerodrome.finance", "Beefy Finance": "https://api.beefy.finance/apy", "Yearn": "https://api.yearn.finance", "Ondo Finance": "https://api.ondo.finance", } _PROTOCOL_MAP: dict[str, dict[str, Any]] = { "OpenTrade": { "protocol_slug": "opentrade", "project_aliases_exact": ["opentrade"], "project_aliases_contains": ["opentrade"], }, "Securitize": { "protocol_slug": "securitize", "project_aliases_exact": ["securitize"], "project_aliases_contains": ["securitize"], }, "Centrifuge Protocol": { "protocol_slug": "centrifuge", "project_aliases_exact": ["centrifuge"], "project_aliases_contains": ["centrifuge"], }, "Euler V2": { "protocol_slug": "euler-v2", "project_aliases_exact": ["euler-v2"], "project_aliases_contains": ["euler"], }, "MEV Capital": { "protocol_slug": "mev-capital", "project_aliases_exact": ["mev-capital"], "project_aliases_contains": ["mev-capital", "mev"], }, "Re7 Labs": { "protocol_slug": "re7-labs", "project_aliases_exact": ["re7-labs"], "project_aliases_contains": ["re7"], }, "Varlamore Capital": { "protocol_slug": "varlamore-capital", "project_aliases_exact": ["varlamore-capital"], "project_aliases_contains": ["varlamore"], }, "Hypha": { "protocol_slug": "hypha", "project_aliases_exact": ["hypha"], "project_aliases_contains": ["hypha"], }, "Spark": { "protocol_slug": "spark-savings", "project_aliases_exact": ["spark-savings"], "project_aliases_contains": ["spark"], }, "XSY": { "protocol_slug": "xsy", "project_aliases_exact": ["xsy"], "project_aliases_contains": ["xsy"], }, "LFJ": { "protocol_slug": "joe-v2", "project_aliases_exact": ["joe-v2", "joe-v2.1", "joe-v2.2", "joe-lend"], "project_aliases_contains": ["joe", "lfj", "trader-joe"], }, "Blackhole": { "protocol_slug": "blackhole-amm", "project_aliases_exact": ["blackhole-amm", "blackhole-clmm"], "project_aliases_contains": ["blackhole"], }, "Pharaoh Exchange": { "protocol_slug": "pharaoh-v3", "project_aliases_exact": ["pharaoh-v3"], "project_aliases_contains": ["pharaoh"], }, "K3 Capital": { "protocol_slug": "k3-capital", "project_aliases_exact": ["k3-capital"], "project_aliases_contains": ["k3"], }, "Avant Protocol": { "protocol_slug": "avant-protocol", "project_aliases_exact": ["avant-protocol"], "project_aliases_contains": ["avant"], }, "Morpho Blue": { "protocol_slug": "morpho-blue", "project_aliases_exact": ["morpho-blue", "morpho"], "project_aliases_contains": ["morpho"], }, "Ondo Finance": { "protocol_slug": "ondo-finance", "project_aliases_exact": ["ondo-finance", "ondo"], "project_aliases_contains": ["ondo"], }, "Aerodrome": { "protocol_slug": "aerodrome-slipstream", "project_aliases_exact": ["aerodrome-slipstream", "aerodrome"], "project_aliases_contains": ["aerodrome"], }, "Beefy Finance": { "protocol_slug": "beefy", "project_aliases_exact": ["beefy"], "project_aliases_contains": ["beefy"], }, "Yearn": { "protocol_slug": "yearn-finance", "project_aliases_exact": ["yearn-finance", "yearn"], "project_aliases_contains": ["yearn"], }, "Rocket Pool": { "protocol_slug": "rocket-pool", "project_aliases_exact": ["rocket-pool"], "project_aliases_contains": ["rocket"], }, "Uniswap V3": { "protocol_slug": "uniswap-v3", "project_aliases_exact": ["uniswap-v3", "uniswap"], "project_aliases_contains": ["uniswap"], }, "Perpetual Markets": { "protocol_slug": "gmx", "project_aliases_exact": ["gmx", "kwenta", "hyperliquid"], "project_aliases_contains": ["perp", "gmx", "hyperliquid"], }, "Ethena": { "protocol_slug": "ethena", "project_aliases_exact": ["ethena"], "project_aliases_contains": ["ethena"], }, } if Counter: _METRIC_CACHE_HITS = Counter( "strategy_live_data_cache_hits_total", "Total strategy live-data cache hits.", ) _METRIC_FETCH_FAILURES = Counter( "strategy_live_data_fetch_failures_total", "Total strategy live-data upstream fetch failures.", ["source"], ) _METRIC_FALLBACKS = Counter( "strategy_live_data_fallback_total", "Total strategy live-data fallback responses.", ["source"], ) else: # pragma: no cover - used only when metrics package is missing. _METRIC_CACHE_HITS = None _METRIC_FETCH_FAILURES = None _METRIC_FALLBACKS = None def _now_iso() -> str: return datetime.utcnow().replace(tzinfo=timezone.utc).isoformat() def _micro_get(key: str) -> Any: entry = _MICRO_CACHE.get(key) if not entry: return None value, ts = entry if (time.time() - ts) > _MICRO_TTL: return None return value def _micro_set(key: str, value: Any) -> None: _MICRO_CACHE[key] = (value, time.time()) def _metric_inc(metric, **labels: str) -> None: if metric is None: return try: if labels: metric.labels(**labels).inc() else: metric.inc() except Exception: return def _slugify(value: str) -> str: return "".join(ch if ch.isalnum() else "-" for ch in value.lower()).strip("-") def _normalize_network(network: str | None) -> str: normalized = (network or _DEFAULT_NETWORK).strip().lower() if normalized in _SUPPORTED_NETWORKS: return normalized return _DEFAULT_NETWORK def _build_cache_key(category: str, protocols: List[str], network: str = _DEFAULT_NETWORK) -> str: normalized_network = _normalize_network(network) normalized_protocols = [_slugify(p) for p in protocols if p] normalized_protocols = sorted(set(normalized_protocols)) protocol_part = "|".join(normalized_protocols) if normalized_protocols else "none" return f"live:{normalized_network}:{_slugify(category or 'unknown')}:{protocol_part}" def _is_cb_open(source: str) -> bool: return _CACHE.exists(f"cb:{source}:cooldown") def _record_cb_failure(source: str) -> None: failures = _CACHE.incr(f"cb:{source}:failures", ttl=_CB_WINDOW_SEC) if failures >= _CB_FAILURE_THRESHOLD: _CACHE.set_json(f"cb:{source}:cooldown", {"until": _now_iso()}, ttl=_CB_COOLDOWN_SEC) def _record_cb_success(source: str) -> None: # Soft reset: overwriting with low TTL is enough to make the window pass quickly. _CACHE.set_json(f"cb:{source}:failures", 0, ttl=1) def _http_get_json(url: str, source: str) -> Any: cache_key = f"http:{_slugify(url)}" cached = _CACHE.get_json(cache_key) if cached is not None: _metric_inc(_METRIC_CACHE_HITS) return cached if _is_cb_open(source): raise RuntimeError(f"circuit_open:{source}") start = time.perf_counter() last_exc: Exception | None = None for attempt in range(_HTTP_RETRIES): try: response = requests.get(url, timeout=_TIMEOUT) response.raise_for_status() payload = response.json() _CACHE.set_json(cache_key, payload, ttl=_TTL) _record_cb_success(source) logger.info( "strategy_live_data_fetch_ok source=%s url=%s attempt=%s latency_ms=%s", source, url, attempt + 1, int((time.perf_counter() - start) * 1000), ) return payload except Exception as exc: last_exc = exc if attempt < (_HTTP_RETRIES - 1): time.sleep(_RETRY_BASE_SLEEP * (2**attempt)) continue _record_cb_failure(source) _metric_inc(_METRIC_FETCH_FAILURES, source=source) logger.warning( "strategy_live_data_fetch_failed source=%s url=%s error=%s", source, url, last_exc, ) raise RuntimeError(f"fetch_failed:{source}") from last_exc def _fallback(source: str, warning: str) -> Dict[str, Any]: return { "apy": None, "tvl": None, "capacity": "unknown", "risk_flags": [warning], "as_of": _now_iso(), "source": source, "freshness": "stale", "warning": warning, "confidence": "low", } def _defillama_protocol_slug(protocol: str) -> str: mapped = _PROTOCOL_MAP.get(protocol or "", {}) if mapped.get("protocol_slug"): return str(mapped["protocol_slug"]) return protocol.lower().replace(" protocol", "").replace(" ", "-") def _load_static_apy_defaults(network: str = _DEFAULT_NETWORK) -> Dict[str, float]: normalized_network = _normalize_network(network) registry_name = str(_NETWORK_CHAIN_LABELS.get(normalized_network, {}).get("registry") or "strategy_registry.avalanche.json") registry_path = Path(__file__).resolve().parent / registry_name try: raw = json.loads(registry_path.read_text(encoding="utf-8")) except Exception: return {} if not isinstance(raw, list): return {} by_protocol: dict[str, list[float]] = {} for entry in raw: if not isinstance(entry, dict): continue assumptions = entry.get("simulation_assumptions") or {} base_apy = assumptions.get("base_apy") protocols = entry.get("protocols") or [] if not isinstance(base_apy, (int, float)) or not isinstance(protocols, list): continue for protocol in protocols: if isinstance(protocol, str) and protocol: by_protocol.setdefault(protocol, []).append(float(base_apy)) return { protocol: (sum(values) / len(values)) for protocol, values in by_protocol.items() if values } _STATIC_APY_DEFAULTS_BY_NETWORK: Dict[str, Dict[str, float]] = {} def _static_defaults(network: str) -> Dict[str, float]: normalized_network = _normalize_network(network) if normalized_network not in _STATIC_APY_DEFAULTS_BY_NETWORK: _STATIC_APY_DEFAULTS_BY_NETWORK[normalized_network] = _load_static_apy_defaults(normalized_network) return _STATIC_APY_DEFAULTS_BY_NETWORK[normalized_network] def _extract_chain_tvl(data: Dict[str, Any], network: str) -> float | None: normalized_network = _normalize_network(network) aliases = tuple(str(v).lower() for v in (_NETWORK_CHAIN_LABELS.get(normalized_network, {}).get("aliases") or (normalized_network,))) tvl = data.get("tvl") if isinstance(data, dict) else None chain_tvls = data.get("chainTvls") if isinstance(data, dict) else None chain_tvl = None if isinstance(chain_tvls, dict): for key, value in chain_tvls.items(): key_l = str(key).lower() if any(alias in key_l for alias in aliases): if isinstance(value, dict): chain_tvl = value.get("tvl") elif isinstance(value, (int, float)): chain_tvl = value break raw_tvl = chain_tvl if chain_tvl is not None else tvl return float(raw_tvl) if isinstance(raw_tvl, (int, float)) else None def _fetch_protocol_stats(protocol: str, network: str) -> Dict[str, Any]: slug = _defillama_protocol_slug(protocol) url = f"{_DEFI_LLAMA_PROTOCOL}/{slug}" data = _http_get_json(url, source="defillama_protocol") tvl = _extract_chain_tvl(data, network) if isinstance(data, dict) else None return { "apy": None, "tvl": tvl, "capacity": "ok" if tvl is not None else "unknown", "risk_flags": [], "as_of": _now_iso(), "source": "defillama", "freshness": "fresh", "warning": None, "confidence": "medium", } def _defillama_aliases(protocol: str) -> set[str]: mapped = _PROTOCOL_MAP.get(protocol or "", {}) aliases = {str(a).lower() for a in (mapped.get("project_aliases_exact") or [])} if not aliases: aliases.add(_slugify(protocol)) aliases.add(_slugify(protocol).replace("-protocol", "")) return aliases def _defillama_alias_fragments(protocol: str) -> set[str]: mapped = _PROTOCOL_MAP.get(protocol or "", {}) frags = {str(a).lower() for a in (mapped.get("project_aliases_contains") or [])} frags.add(_slugify(protocol).replace("-protocol", "")) return {f for f in frags if f} def _project_matches_protocol(project: str, protocol: str) -> bool: name = (project or "").lower() if not name: return False exact = _defillama_aliases(protocol) if name in exact: return True fragments = _defillama_alias_fragments(protocol) return any(fragment in name for fragment in fragments) def _pool_apy(pool: Dict[str, Any]) -> float | None: apy = pool.get("apy") if isinstance(apy, (int, float)): return float(apy) apy_base = pool.get("apyBase") apy_reward = pool.get("apyReward") if isinstance(apy_base, (int, float)) and isinstance(apy_reward, (int, float)): return float(apy_base) + float(apy_reward) return None def _iter_numeric_values(payload: Any, key_hints: tuple[str, ...]) -> list[float]: values: list[float] = [] key_tokens = tuple(k.lower() for k in key_hints) if isinstance(payload, dict): for key, value in payload.items(): key_l = str(key).lower() if any(token in key_l for token in key_tokens) and isinstance(value, (int, float)): values.append(float(value)) values.extend(_iter_numeric_values(value, key_hints)) elif isinstance(payload, list): for item in payload: values.extend(_iter_numeric_values(item, key_hints)) return values def _fetch_protocol_api_fallback(protocol: str) -> Dict[str, Any] | None: url = _PROTOCOL_API_ENDPOINTS.get(protocol) if not url: return None try: payload = _http_get_json(url, source=f"{_slugify(protocol)}_api") except Exception: return None apy_candidates = _iter_numeric_values(payload, ("apy", "apr", "rate", "yield")) tvl_candidates = _iter_numeric_values(payload, ("tvl", "liquidity", "aum", "assets")) if not apy_candidates: return None apy = max(apy_candidates) if apy > 1: apy = apy / 100.0 tvl = max(tvl_candidates) if tvl_candidates else None return { "apy": float(apy), "tvl": float(tvl) if isinstance(tvl, (int, float)) else None, "capacity": "ok", "risk_flags": ["APY derived from protocol API payload heuristics."], "as_of": _now_iso(), "source": "protocol_api", "freshness": "fresh", "warning": None, "confidence": "medium", } def _fetch_pool_apy(protocol: str, network: str) -> Dict[str, Any] | None: normalized_network = _normalize_network(network) chain_name = str(_NETWORK_CHAIN_LABELS.get(normalized_network, {}).get("name") or normalized_network.title()) aliases = tuple(str(v).lower() for v in (_NETWORK_CHAIN_LABELS.get(normalized_network, {}).get("aliases") or (normalized_network,))) payload = _http_get_json(_DEFI_LLAMA_YIELDS, source="defillama_yields") rows = payload.get("data") if isinstance(payload, dict) else None if not isinstance(rows, list): return None chain_matches: list[dict[str, Any]] = [] for row in rows: if not isinstance(row, dict): continue project = str(row.get("project") or "").lower() chain = str(row.get("chain") or "").lower() if not _project_matches_protocol(project, protocol): continue if any(alias in chain for alias in aliases): chain_matches.append(row) if chain_matches: chain_matches.sort(key=lambda p: float(p.get("tvlUsd") or 0), reverse=True) top = chain_matches[0] apy = _pool_apy(top) if apy is not None: return { "apy": apy / 100.0 if apy > 1 else apy, "tvl": float(top.get("tvlUsd") or 0) if top.get("tvlUsd") is not None else None, "capacity": "ok", "risk_flags": [], "as_of": _now_iso(), "source": "defillama", "freshness": "fresh", "warning": None, "confidence": "high", } # Protocol-level fallback from global pools average when chain-specific pool is unavailable. protocol_rows = [ row for row in rows if isinstance(row, dict) and _project_matches_protocol(str(row.get("project") or "").lower(), protocol) and _pool_apy(row) is not None ] if protocol_rows: weighted_numerator = 0.0 weighted_denominator = 0.0 for row in protocol_rows: apy = _pool_apy(row) if apy is None: continue tvl = float(row.get("tvlUsd") or 1.0) weighted_numerator += (apy * max(tvl, 1.0)) weighted_denominator += max(tvl, 1.0) if weighted_denominator > 0: inferred = weighted_numerator / weighted_denominator total_tvl = sum(float(r.get("tvlUsd") or 0) for r in protocol_rows) return { "apy": inferred / 100.0 if inferred > 1 else inferred, "tvl": total_tvl or None, "capacity": "ok", "risk_flags": [f"APY inferred from protocol-level pools, {chain_name}-specific pool unavailable."], "as_of": _now_iso(), "source": "protocol_fallback", "freshness": "fresh", "warning": f"{chain_name}-specific APY not found; using protocol-level inferred APY.", "confidence": "medium", } return None def _static_protocol_fallback(protocols: List[str], category: str, network: str) -> Dict[str, Any]: defaults = _static_defaults(network) for protocol in protocols: base_apy = defaults.get(protocol) if isinstance(base_apy, (int, float)): warning = f"Using static APY fallback for {protocol}." _metric_inc(_METRIC_FALLBACKS, source="static") return { "apy": float(base_apy), "tvl": None, "capacity": "unknown", "risk_flags": [warning], "as_of": _now_iso(), "source": "static_assumption", "freshness": "stale", "warning": warning, "confidence": "low", } _metric_inc(_METRIC_FALLBACKS, source="static") return _fallback("static_assumption", f"No static APY fallback found for category '{category}'.") def _resolve_with_sources(protocol: str, network: str) -> Dict[str, Any]: pooled = _fetch_pool_apy(protocol, network) if pooled: return pooled protocol_api = _fetch_protocol_api_fallback(protocol) if protocol_api: return protocol_api stats = _fetch_protocol_stats(protocol, network) if stats.get("apy") is not None or stats.get("tvl") is not None: return stats return _static_protocol_fallback([protocol], "unknown", network) def _adapter_opentrade(network: str) -> Dict[str, Any]: return _resolve_with_sources("OpenTrade", network) def _adapter_securitize(network: str) -> Dict[str, Any]: return _resolve_with_sources("Securitize", network) def _adapter_centrifuge(network: str) -> Dict[str, Any]: return _resolve_with_sources("Centrifuge Protocol", network) def _adapter_euler_v2(network: str) -> Dict[str, Any]: return _resolve_with_sources("Euler V2", network) def _adapter_mev_capital(network: str) -> Dict[str, Any]: return _resolve_with_sources("MEV Capital", network) def _adapter_re7_labs(network: str) -> Dict[str, Any]: return _resolve_with_sources("Re7 Labs", network) def _adapter_varlamore(network: str) -> Dict[str, Any]: return _resolve_with_sources("Varlamore Capital", network) def _adapter_hypha(network: str) -> Dict[str, Any]: return _resolve_with_sources("Hypha", network) def _adapter_spark(network: str) -> Dict[str, Any]: return _resolve_with_sources("Spark", network) def _adapter_xsy(network: str) -> Dict[str, Any]: return _resolve_with_sources("XSY", network) def _adapter_lfj(network: str) -> Dict[str, Any]: return _resolve_with_sources("LFJ", network) def _adapter_blackhole(network: str) -> Dict[str, Any]: return _resolve_with_sources("Blackhole", network) def _adapter_pharaoh(network: str) -> Dict[str, Any]: return _resolve_with_sources("Pharaoh Exchange", network) def _adapter_k3(network: str) -> Dict[str, Any]: return _resolve_with_sources("K3 Capital", network) def _adapter_avant(network: str) -> Dict[str, Any]: return _resolve_with_sources("Avant Protocol", network) def _adapter_morpho_blue(network: str) -> Dict[str, Any]: return _resolve_with_sources("Morpho Blue", network) def _adapter_ondo(network: str) -> Dict[str, Any]: return _resolve_with_sources("Ondo Finance", network) def _adapter_aerodrome(network: str) -> Dict[str, Any]: return _resolve_with_sources("Aerodrome", network) def _adapter_beefy(network: str) -> Dict[str, Any]: return _resolve_with_sources("Beefy Finance", network) def _adapter_yearn(network: str) -> Dict[str, Any]: return _resolve_with_sources("Yearn", network) def _adapter_rocket_pool(network: str) -> Dict[str, Any]: return _resolve_with_sources("Rocket Pool", network) def _adapter_uniswap_v3(network: str) -> Dict[str, Any]: return _resolve_with_sources("Uniswap V3", network) def _adapter_perpetual_markets(network: str) -> Dict[str, Any]: return _resolve_with_sources("Perpetual Markets", network) def _adapter_ethena(network: str) -> Dict[str, Any]: return _resolve_with_sources("Ethena", network) _PROTOCOL_ADAPTERS: dict[str, Callable[[str], Dict[str, Any]]] = { "OpenTrade": _adapter_opentrade, "Securitize": _adapter_securitize, "Centrifuge Protocol": _adapter_centrifuge, "Euler V2": _adapter_euler_v2, "MEV Capital": _adapter_mev_capital, "Re7 Labs": _adapter_re7_labs, "Varlamore Capital": _adapter_varlamore, "Hypha": _adapter_hypha, "Spark": _adapter_spark, "XSY": _adapter_xsy, "LFJ": _adapter_lfj, "Blackhole": _adapter_blackhole, "Pharaoh Exchange": _adapter_pharaoh, "K3 Capital": _adapter_k3, "Avant Protocol": _adapter_avant, "Morpho Blue": _adapter_morpho_blue, "Ondo Finance": _adapter_ondo, "Aerodrome": _adapter_aerodrome, "Beefy Finance": _adapter_beefy, "Yearn": _adapter_yearn, "Rocket Pool": _adapter_rocket_pool, "Uniswap V3": _adapter_uniswap_v3, "Perpetual Markets": _adapter_perpetual_markets, "Ethena": _adapter_ethena, } def _confidence_rank(value: str | None) -> int: rank = {"low": 1, "medium": 2, "high": 3} return rank.get((value or "").lower(), 0) def _select_best_result(results: List[Dict[str, Any]]) -> Dict[str, Any] | None: if not results: return None sorted_results = sorted( results, key=lambda r: ( 1 if r.get("apy") is not None else 0, 1 if r.get("freshness") == "fresh" else 0, _confidence_rank(str(r.get("confidence"))), float(r.get("apy") or 0.0), float(r.get("tvl") or 0.0), ), reverse=True, ) return sorted_results[0] def _resolve_protocols(protocols: List[str], category: str, network: str) -> Dict[str, Any]: if not protocols: return _static_protocol_fallback([], category, network) results: List[Dict[str, Any]] = [] for protocol in protocols: adapter = _PROTOCOL_ADAPTERS.get(protocol) try: result = adapter(network) if adapter else _resolve_with_sources(protocol, network) except Exception as exc: logger.info("Protocol adapter fallback for %s: %s", protocol, exc) result = _static_protocol_fallback([protocol], category, network) if isinstance(result, dict): tagged = dict(result) tagged["protocol"] = protocol results.append(tagged) best = _select_best_result(results) if best is None: return _static_protocol_fallback(protocols, category, network) if len(results) > 1: other_protocols = [str(r.get("protocol")) for r in results if r.get("protocol") != best.get("protocol")] if other_protocols: flags = list(best.get("risk_flags") or []) flags.append( f"Selected best live data from {best.get('protocol')} among {len(results)} protocol adapters." ) best["risk_flags"] = flags return best def _with_stale_fallback(cache_key: str, fetcher, category: str, protocols: List[str], network: str) -> Dict[str, Any]: now = time.time() redis_payload = _CACHE.get_json(cache_key) stale_candidate = None if isinstance(redis_payload, dict): cached_at = float(redis_payload.get("_cached_at", 0) or 0) age = now - cached_at if cached_at else (_TTL + 1) if age <= _TTL: _metric_inc(_METRIC_CACHE_HITS) payload = dict(redis_payload.get("payload") or {}) payload["freshness"] = "fresh" _micro_set(cache_key, payload) logger.info("strategy_live_data_cache_hit key=%s age_s=%s", cache_key, int(age)) return payload if age <= _STALE_TTL: stale_candidate = dict(redis_payload.get("payload") or {}) error_payload = _CACHE.get_json(f"{cache_key}:error") if isinstance(error_payload, dict) and error_payload.get("__error__"): if stale_candidate: stale_candidate["freshness"] = "stale" stale_candidate["warning"] = stale_candidate.get("warning") or "Serving stale cached live data due to upstream errors." logger.info("strategy_live_data_stale_hit key=%s reason=negative_cache", cache_key) return stale_candidate return _static_protocol_fallback(protocols, category, network) try: result = fetcher() _CACHE.set_json(cache_key, {"payload": result, "_cached_at": now}, ttl=_STALE_TTL) _micro_set(cache_key, result) return result except Exception as exc: _CACHE.set_error(f"{cache_key}:error", {"error": str(exc), "as_of": _now_iso()}, ttl=_ERROR_TTL) if stale_candidate: stale_candidate["freshness"] = "stale" stale_candidate["warning"] = stale_candidate.get("warning") or "Serving stale cached live data due to upstream errors." logger.info("strategy_live_data_stale_hit key=%s reason=fetch_failure", cache_key) _metric_inc(_METRIC_FALLBACKS, source="stale_cache") return stale_candidate raise def rwa_adapter(protocols: List[str], network: str) -> Dict[str, Any]: try: stats = _resolve_protocols(protocols, "rwa", network) if stats.get("tvl") is None: stats["risk_flags"].append("No live TVL for RWA protocol.") return stats except Exception as exc: logger.info("RWA adapter fallback for protocols=%s: %s", protocols, exc) return _static_protocol_fallback(protocols, "rwa", network) def lending_adapter(protocols: List[str], network: str) -> Dict[str, Any]: try: return _resolve_protocols(protocols, "lending", network) except Exception as exc: logger.info("Lending adapter fallback for protocols=%s: %s", protocols, exc) return _static_protocol_fallback(protocols, "lending", network) def lp_adapter(protocols: List[str], network: str) -> Dict[str, Any]: normalized_network = _normalize_network(network) chain_name = str(_NETWORK_CHAIN_LABELS.get(normalized_network, {}).get("name") or normalized_network.title()) try: live = _resolve_protocols(protocols, "lp", normalized_network) if live.get("apy") is not None: return live chain_data = _http_get_json(_DEFI_LLAMA_CHAINS, source="defillama_chains") chain_row = None if isinstance(chain_data, list): chain_row = next((c for c in chain_data if str(c.get("name", "")).lower() == chain_name.lower()), None) return { "apy": None, "tvl": float(chain_row.get("tvl") or 0) if chain_row else None, "capacity": "ok" if chain_row else "unknown", "risk_flags": [], "as_of": _now_iso(), "source": "defillama_chains", "freshness": "fresh" if chain_row else "stale", "warning": None if chain_row else f"{chain_name} chain TVL unavailable", "confidence": "medium" if chain_row else "low", } except Exception as exc: logger.info("LP adapter fallback: %s", exc) return _static_protocol_fallback(protocols, "lp", normalized_network) def basis_adapter(protocols: List[str], network: str) -> Dict[str, Any]: try: return _resolve_protocols(protocols, "basis", network) except Exception: return _static_protocol_fallback(protocols, "basis", network) def curated_adapter(protocols: List[str], network: str) -> Dict[str, Any]: try: stats = _resolve_protocols(protocols, "curated", network) if stats.get("tvl") is None: stats["risk_flags"].append("Curated vault live TVL unavailable.") return stats except Exception: return _static_protocol_fallback(protocols, "curated", network) def enrich_live_data(category: str, protocols: List[str], network: str = _DEFAULT_NETWORK) -> Dict[str, Any]: normalized_network = _normalize_network(network) cache_key = _build_cache_key(category, protocols, normalized_network) micro = _micro_get(cache_key) if isinstance(micro, dict): _metric_inc(_METRIC_CACHE_HITS) return dict(micro) category_key = (category or "").lower().strip() if category_key == "rwa": return _with_stale_fallback(cache_key, lambda: rwa_adapter(protocols, normalized_network), category_key, protocols, normalized_network) if category_key == "lending" or category_key == "staking": return _with_stale_fallback(cache_key, lambda: lending_adapter(protocols, normalized_network), category_key, protocols, normalized_network) if category_key == "lp": return _with_stale_fallback(cache_key, lambda: lp_adapter(protocols, normalized_network), category_key, protocols, normalized_network) if category_key == "basis": return _with_stale_fallback(cache_key, lambda: basis_adapter(protocols, normalized_network), category_key, protocols, normalized_network) if category_key == "curated" or category_key == "structured": return _with_stale_fallback(cache_key, lambda: curated_adapter(protocols, normalized_network), category_key, protocols, normalized_network) _metric_inc(_METRIC_FALLBACKS, source="unknown") return _fallback("unknown", f"No adapter for category '{category_key}'")