import json
import logging
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
|
|
class LocalResourceService:
    """Centralized loader for the unified fallback registry.

    The registry is a JSON file read lazily on first access. Assets are read
    from ``fallback_data.assets`` and indexed by upper-cased symbol; an
    optional ``fallback_data.market_overview`` object is kept alongside them.
    A missing, unreadable or malformed file is logged and treated as an empty
    registry, so the query methods return empty results instead of raising.
    """

    def __init__(self, resource_path: Path):
        """Remember where the registry lives; nothing is read until needed."""
        self.resource_path = Path(resource_path)
        # ``None`` means "not loaded yet"; an empty dict means "loaded, empty".
        self._raw_data: Optional[Dict[str, Any]] = None
        self._assets: Dict[str, Dict[str, Any]] = {}
        self._market_overview: Dict[str, Any] = {}
        self._logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def _ensure_loaded(self) -> None:
        """Read and normalize the registry once; later calls are no-ops."""
        if self._raw_data is not None:
            return

        try:
            with self.resource_path.open("r", encoding="utf-8") as handle:
                data = json.load(handle)
        except FileNotFoundError:
            self._logger.warning("Fallback registry %s not found", self.resource_path)
            data = {}
        except json.JSONDecodeError as exc:
            self._logger.error("Invalid fallback registry JSON: %s", exc)
            data = {}
        except (OSError, UnicodeDecodeError) as exc:
            # Directory instead of a file, permission problems, bad encoding:
            # degrade to an empty registry just like a missing file does.
            self._logger.error(
                "Unable to read fallback registry %s: %s", self.resource_path, exc
            )
            data = {}

        if not isinstance(data, dict):
            # Valid JSON whose root is a list/scalar cannot hold the registry.
            self._logger.error(
                "Fallback registry root must be a JSON object, got %s",
                type(data).__name__,
            )
            data = {}

        fallback_data = data.get("fallback_data")
        if not isinstance(fallback_data, dict):
            fallback_data = {}

        assets = fallback_data.get("assets")
        if not isinstance(assets, dict):
            assets = {}

        normalized_assets: Dict[str, Dict[str, Any]] = {}
        for key, details in assets.items():
            if not isinstance(details, dict):
                self._logger.warning("Skipping malformed fallback asset %r", key)
                continue
            # An explicit "symbol" field wins over the mapping key.
            symbol = str(details.get("symbol") or key).upper()
            asset_copy = deepcopy(details)
            asset_copy["symbol"] = symbol
            normalized_assets[symbol] = asset_copy

        overview = fallback_data.get("market_overview")

        self._raw_data = data
        self._assets = normalized_assets
        self._market_overview = deepcopy(overview) if isinstance(overview, dict) else {}

    def refresh(self) -> None:
        """Force reload from disk (used in tests)."""
        self._raw_data = None
        self._assets = {}
        self._market_overview = {}
        self._ensure_loaded()

    # ------------------------------------------------------------------
    # Registry-level queries
    # ------------------------------------------------------------------
    def get_registry(self) -> Dict[str, Any]:
        """Return a deep copy of the raw registry document."""
        self._ensure_loaded()
        return deepcopy(self._raw_data or {})

    def get_supported_symbols(self) -> List[str]:
        """Return the upper-cased asset symbols in sorted order."""
        self._ensure_loaded()
        return sorted(self._assets)

    def has_fallback_data(self) -> bool:
        """Return ``True`` when at least one asset was loaded."""
        self._ensure_loaded()
        return bool(self._assets)

    # ------------------------------------------------------------------
    # Asset helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _price_of(asset: Dict[str, Any]) -> Dict[str, Any]:
        """Return the asset's ``price`` mapping, or ``{}`` if absent/null/invalid."""
        price = asset.get("price")
        return price if isinstance(price, dict) else {}

    def _asset_to_market_record(self, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten one registry asset into a market-listing record.

        Fields missing from the asset come back as ``None``; ``id`` falls back
        to the lower-cased symbol when the asset has no ``slug``.
        """
        price = self._price_of(asset)
        return {
            "id": asset.get("slug") or str(asset.get("symbol") or "").lower(),
            "symbol": asset.get("symbol"),
            "name": asset.get("name"),
            "current_price": price.get("current_price"),
            "market_cap": price.get("market_cap"),
            "market_cap_rank": asset.get("market_cap_rank"),
            "total_volume": price.get("total_volume"),
            "price_change_24h": price.get("price_change_24h"),
            "price_change_percentage_24h": price.get("price_change_percentage_24h"),
            "high_24h": price.get("high_24h"),
            "low_24h": price.get("low_24h"),
            "last_updated": price.get("last_updated"),
        }

    def get_top_prices(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return market records for the best-ranked assets.

        Assets are ordered by ``market_cap_rank`` (unranked ones sort as 9999),
        ties broken by larger market cap first. A ``limit`` below 1 is treated
        as 1.
        """
        self._ensure_loaded()
        if not self._assets:
            return []

        def rank_key(asset: Dict[str, Any]):
            market_cap = self._price_of(asset).get("market_cap") or 0
            return (asset.get("market_cap_rank") or 9999, -market_cap)

        ranked = sorted(self._assets.values(), key=rank_key)
        return [self._asset_to_market_record(asset) for asset in ranked[: max(1, limit)]]

    def get_prices_for_symbols(self, symbols: List[str]) -> List[Dict[str, Any]]:
        """Return market records for the requested symbols, in request order.

        Lookup is case-insensitive; unknown symbols are skipped.
        """
        self._ensure_loaded()
        if not symbols or not self._assets:
            return []

        results: List[Dict[str, Any]] = []
        for raw_symbol in symbols:
            asset = self._assets.get(str(raw_symbol or "").upper())
            if asset:
                results.append(self._asset_to_market_record(asset))
        return results

    def get_ticker_snapshot(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Return a ticker-style snapshot for ``symbol`` or ``None`` if unknown."""
        self._ensure_loaded()
        asset = self._assets.get(str(symbol or "").upper())
        if not asset:
            return None

        price = self._price_of(asset)
        return {
            "symbol": asset.get("symbol"),
            "price": price.get("current_price"),
            "price_change_24h": price.get("price_change_24h"),
            "price_change_percent_24h": price.get("price_change_percentage_24h"),
            "high_24h": price.get("high_24h"),
            "low_24h": price.get("low_24h"),
            "volume_24h": price.get("total_volume"),
            # Both volume fields are filled from the same ``total_volume`` value.
            "quote_volume_24h": price.get("total_volume"),
        }

    def get_market_overview(self) -> Dict[str, Any]:
        """Return the market overview enriched with top movers.

        Uses the registry's stored ``market_overview`` when present; otherwise
        aggregates totals from the loaded assets. In both cases the top five
        gainers, losers and highest-volume assets are attached, plus a
        ``timestamp`` if the overview has none. Returns ``{}`` when no assets
        are loaded.
        """
        self._ensure_loaded()
        if not self._assets:
            return {}

        assets = list(self._assets.values())

        def change_24h(asset: Dict[str, Any]):
            return self._price_of(asset).get("price_change_percentage_24h") or 0

        def volume_24h(asset: Dict[str, Any]):
            return self._price_of(asset).get("total_volume") or 0

        overview = deepcopy(self._market_overview)
        if not overview:
            total_market_cap = sum(
                self._price_of(asset).get("market_cap") or 0 for asset in assets
            )
            total_volume = sum(volume_24h(asset) for asset in assets)
            btc_cap = self._price_of(self._assets.get("BTC", {})).get("market_cap") or 0
            overview = {
                "total_market_cap": total_market_cap,
                "total_volume_24h": total_volume,
                "btc_dominance": (btc_cap / total_market_cap * 100) if total_market_cap else 0,
                "active_cryptocurrencies": len(self._assets),
                # Fixed values: not derived from the loaded assets.
                "markets": 500,
                "market_cap_change_percentage_24h": 0,
            }

        gainers = sorted(assets, key=change_24h, reverse=True)[:5]
        losers = sorted(assets, key=change_24h)[:5]
        volumes = sorted(assets, key=volume_24h, reverse=True)[:5]

        overview["top_gainers"] = [self._asset_to_market_record(asset) for asset in gainers]
        overview["top_losers"] = [self._asset_to_market_record(asset) for asset in losers]
        overview["top_by_volume"] = [self._asset_to_market_record(asset) for asset in volumes]
        # Same naive-UTC ISO string that utcnow() produced, without the
        # deprecated call.
        overview["timestamp"] = overview.get("timestamp") or (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )

        return overview

    def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return a deep copy of the most recent candles for ``symbol``.

        Falls back to the ``"1h"`` series when ``interval`` has no data. A
        positive ``limit`` keeps only the last ``limit`` candles; ``None``,
        zero or a negative value returns the whole series. Unknown symbols
        yield ``[]``.
        """
        self._ensure_loaded()
        asset = self._assets.get(str(symbol or "").upper())
        if not asset:
            return []

        series = asset.get("ohlcv")
        if not isinstance(series, dict):
            return []

        candles = series.get(interval) or series.get("1h") or []
        if not isinstance(candles, list):
            return []

        # Guard against negative limits: ``candles[-limit:]`` would otherwise
        # drop the first ``abs(limit)`` candles instead of capping the result.
        if limit and limit > 0:
            candles = candles[-limit:]
        return deepcopy(candles)

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    def describe(self) -> Dict[str, Any]:
        """Simple snapshot used in diagnostics/tests."""
        self._ensure_loaded()
        return {
            "resource_path": str(self.resource_path),
            "assets": len(self._assets),
            "supported_symbols": self.get_supported_symbols(),
        }
|
|