Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| from copy import deepcopy | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| class LocalResourceService: | |
| """Centralized loader for the unified fallback registry.""" | |
| def __init__(self, resource_path: Path): | |
| self.resource_path = Path(resource_path) | |
| self._raw_data: Optional[Dict[str, Any]] = None | |
| self._assets: Dict[str, Dict[str, Any]] = {} | |
| self._market_overview: Dict[str, Any] = {} | |
| self._logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------- # | |
| # Loading helpers | |
| # --------------------------------------------------------------------- # | |
| def _ensure_loaded(self) -> None: | |
| if self._raw_data is not None: | |
| return | |
| try: | |
| with self.resource_path.open("r", encoding="utf-8") as handle: | |
| data = json.load(handle) | |
| except FileNotFoundError: | |
| self._logger.warning("Fallback registry %s not found", self.resource_path) | |
| data = {} | |
| except json.JSONDecodeError as exc: | |
| self._logger.error("Invalid fallback registry JSON: %s", exc) | |
| data = {} | |
| fallback_data = data.get("fallback_data") or {} | |
| assets = fallback_data.get("assets") or {} | |
| normalized_assets: Dict[str, Dict[str, Any]] = {} | |
| for key, details in assets.items(): | |
| symbol = str(details.get("symbol") or key).upper() | |
| asset_copy = deepcopy(details) | |
| asset_copy["symbol"] = symbol | |
| normalized_assets[symbol] = asset_copy | |
| self._raw_data = data | |
| self._assets = normalized_assets | |
| self._market_overview = deepcopy(fallback_data.get("market_overview") or {}) | |
| def refresh(self) -> None: | |
| """Force reload from disk (used in tests).""" | |
| self._raw_data = None | |
| self._assets = {} | |
| self._market_overview = {} | |
| self._ensure_loaded() | |
| # --------------------------------------------------------------------- # | |
| # Registry level helpers | |
| # --------------------------------------------------------------------- # | |
| def get_registry(self) -> Dict[str, Any]: | |
| self._ensure_loaded() | |
| return deepcopy(self._raw_data or {}) | |
| def get_supported_symbols(self) -> List[str]: | |
| self._ensure_loaded() | |
| return sorted(self._assets.keys()) | |
| def has_fallback_data(self) -> bool: | |
| self._ensure_loaded() | |
| return bool(self._assets) | |
| # --------------------------------------------------------------------- # | |
| # Market data helpers | |
| # --------------------------------------------------------------------- # | |
| def _asset_to_market_record(self, asset: Dict[str, Any]) -> Dict[str, Any]: | |
| price = asset.get("price", {}) | |
| return { | |
| "id": asset.get("slug") or asset.get("symbol", "").lower(), | |
| "symbol": asset.get("symbol"), | |
| "name": asset.get("name"), | |
| "current_price": price.get("current_price"), | |
| "market_cap": price.get("market_cap"), | |
| "market_cap_rank": asset.get("market_cap_rank"), | |
| "total_volume": price.get("total_volume"), | |
| "price_change_24h": price.get("price_change_24h"), | |
| "price_change_percentage_24h": price.get("price_change_percentage_24h"), | |
| "high_24h": price.get("high_24h"), | |
| "low_24h": price.get("low_24h"), | |
| "last_updated": price.get("last_updated"), | |
| } | |
| def get_top_prices(self, limit: int = 10) -> List[Dict[str, Any]]: | |
| self._ensure_loaded() | |
| if not self._assets: | |
| return [] | |
| sorted_assets = sorted( | |
| self._assets.values(), | |
| key=lambda x: (x.get("market_cap_rank") or 9999, -(x.get("price", {}).get("market_cap") or 0)), | |
| ) | |
| selected = sorted_assets[: max(1, limit)] | |
| return [self._asset_to_market_record(asset) for asset in selected] | |
| def get_prices_for_symbols(self, symbols: List[str]) -> List[Dict[str, Any]]: | |
| self._ensure_loaded() | |
| if not symbols or not self._assets: | |
| return [] | |
| results: List[Dict[str, Any]] = [] | |
| for raw_symbol in symbols: | |
| symbol = str(raw_symbol or "").upper() | |
| asset = self._assets.get(symbol) | |
| if asset: | |
| results.append(self._asset_to_market_record(asset)) | |
| return results | |
| def get_ticker_snapshot(self, symbol: str) -> Optional[Dict[str, Any]]: | |
| self._ensure_loaded() | |
| asset = self._assets.get(str(symbol or "").upper()) | |
| if not asset: | |
| return None | |
| price = asset.get("price", {}) | |
| return { | |
| "symbol": asset.get("symbol"), | |
| "price": price.get("current_price"), | |
| "price_change_24h": price.get("price_change_24h"), | |
| "price_change_percent_24h": price.get("price_change_percentage_24h"), | |
| "high_24h": price.get("high_24h"), | |
| "low_24h": price.get("low_24h"), | |
| "volume_24h": price.get("total_volume"), | |
| "quote_volume_24h": price.get("total_volume"), | |
| } | |
| def get_market_overview(self) -> Dict[str, Any]: | |
| self._ensure_loaded() | |
| if not self._assets: | |
| return {} | |
| overview = deepcopy(self._market_overview) | |
| if not overview: | |
| total_market_cap = sum( | |
| (asset.get("price", {}) or {}).get("market_cap") or 0 for asset in self._assets.values() | |
| ) | |
| total_volume = sum( | |
| (asset.get("price", {}) or {}).get("total_volume") or 0 for asset in self._assets.values() | |
| ) | |
| btc = self._assets.get("BTC", {}) | |
| btc_cap = (btc.get("price", {}) or {}).get("market_cap") or 0 | |
| overview = { | |
| "total_market_cap": total_market_cap, | |
| "total_volume_24h": total_volume, | |
| "btc_dominance": (btc_cap / total_market_cap * 100) if total_market_cap else 0, | |
| "active_cryptocurrencies": len(self._assets), | |
| "markets": 500, | |
| "market_cap_change_percentage_24h": 0, | |
| } | |
| # Enrich with derived leaderboards | |
| gainers = sorted( | |
| self._assets.values(), | |
| key=lambda asset: (asset.get("price", {}) or {}).get("price_change_percentage_24h") or 0, | |
| reverse=True, | |
| )[:5] | |
| losers = sorted( | |
| self._assets.values(), | |
| key=lambda asset: (asset.get("price", {}) or {}).get("price_change_percentage_24h") or 0, | |
| )[:5] | |
| volumes = sorted( | |
| self._assets.values(), | |
| key=lambda asset: (asset.get("price", {}) or {}).get("total_volume") or 0, | |
| reverse=True, | |
| )[:5] | |
| overview["top_gainers"] = [self._asset_to_market_record(asset) for asset in gainers] | |
| overview["top_losers"] = [self._asset_to_market_record(asset) for asset in losers] | |
| overview["top_by_volume"] = [self._asset_to_market_record(asset) for asset in volumes] | |
| overview["timestamp"] = overview.get("timestamp") or datetime.utcnow().isoformat() | |
| return overview | |
| def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]: | |
| self._ensure_loaded() | |
| asset = self._assets.get(str(symbol or "").upper()) | |
| if not asset: | |
| return [] | |
| ohlcv = (asset.get("ohlcv") or {}).get(interval) or [] | |
| if not ohlcv and interval != "1h": | |
| # Provide 1h data for other intervals when nothing else is present | |
| ohlcv = (asset.get("ohlcv") or {}).get("1h") or [] | |
| if limit and ohlcv: | |
| return deepcopy(ohlcv[-limit:]) | |
| return deepcopy(ohlcv) | |
| # --------------------------------------------------------------------- # | |
| # Convenience helpers for testing / diagnostics | |
| # --------------------------------------------------------------------- # | |
| def describe(self) -> Dict[str, Any]: | |
| """Simple snapshot used in diagnostics/tests.""" | |
| self._ensure_loaded() | |
| return { | |
| "resource_path": str(self.resource_path), | |
| "assets": len(self._assets), | |
| "supported_symbols": self.get_supported_symbols(), | |
| } | |