Spaces:
Sleeping
Sleeping
| import aiohttp | |
| import asyncio | |
| import time | |
| from typing import Dict, Any, Optional, List | |
| from src.config import config | |
| import json | |
| class RateLimiter: | |
| def __init__(self, delay: float): | |
| self.delay = delay | |
| self.last_call = 0 | |
| async def acquire(self): | |
| now = time.time() | |
| elapsed = now - self.last_call | |
| if elapsed < self.delay: | |
| await asyncio.sleep(self.delay - elapsed) | |
| self.last_call = time.time() | |
| class CoinGeckoClient: | |
| def __init__(self): | |
| self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY) | |
| self.session = None | |
| async def get_session(self): | |
| if self.session is None: | |
| timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT) | |
| self.session = aiohttp.ClientSession(timeout=timeout) | |
| return self.session | |
| async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| await self.rate_limiter.acquire() | |
| url = f"{config.COINGECKO_BASE_URL}/{endpoint}" | |
| if params is None: | |
| params = {} | |
| if config.COINGECKO_API_KEY: | |
| params["x_cg_demo_api_key"] = config.COINGECKO_API_KEY | |
| session = await self.get_session() | |
| for attempt in range(config.MAX_RETRIES): | |
| try: | |
| async with session.get(url, params=params) as response: | |
| if response.status == 200: | |
| return await response.json() | |
| elif response.status == 429: | |
| await asyncio.sleep(2 ** attempt) | |
| continue | |
| else: | |
| raise Exception(f"API error: {response.status}") | |
| except asyncio.TimeoutError: | |
| if attempt == config.MAX_RETRIES - 1: | |
| raise Exception("Request timeout") | |
| await asyncio.sleep(1) | |
| raise Exception("Max retries exceeded") | |
| async def get_price(self, coin_ids: str, vs_currencies: str = "usd") -> Dict[str, Any]: | |
| params = { | |
| "ids": coin_ids, | |
| "vs_currencies": vs_currencies, | |
| "include_24hr_change": "true", | |
| "include_24hr_vol": "true", | |
| "include_market_cap": "true" | |
| } | |
| return await self._make_request("simple/price", params) | |
| async def get_trending(self) -> Dict[str, Any]: | |
| return await self._make_request("search/trending") | |
| async def get_global_data(self) -> Dict[str, Any]: | |
| return await self._make_request("global") | |
| async def get_coin_data(self, coin_id: str) -> Dict[str, Any]: | |
| params = {"localization": "false", "tickers": "false", "community_data": "false"} | |
| return await self._make_request(f"coins/{coin_id}", params) | |
| async def get_market_data(self, vs_currency: str = "usd", per_page: int = 10) -> Dict[str, Any]: | |
| params = { | |
| "vs_currency": vs_currency, | |
| "order": "market_cap_desc", | |
| "per_page": per_page, | |
| "page": 1, | |
| "sparkline": "false" | |
| } | |
| return await self._make_request("coins/markets", params) | |
| async def get_price_history(self, coin_id: str, days: int = 7) -> Dict[str, Any]: | |
| params = {"vs_currency": "usd", "days": days} | |
| return await self._make_request(f"coins/{coin_id}/market_chart", params) | |
| async def close(self): | |
| if self.session: | |
| await self.session.close() | |
| class CryptoCompareClient: | |
| def __init__(self): | |
| self.rate_limiter = RateLimiter(config.RATE_LIMIT_DELAY) | |
| self.session = None | |
| async def get_session(self): | |
| if self.session is None: | |
| timeout = aiohttp.ClientTimeout(total=config.REQUEST_TIMEOUT) | |
| self.session = aiohttp.ClientSession(timeout=timeout) | |
| return self.session | |
| async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| await self.rate_limiter.acquire() | |
| url = f"{config.CRYPTOCOMPARE_BASE_URL}/{endpoint}" | |
| if params is None: | |
| params = {} | |
| if config.CRYPTOCOMPARE_API_KEY: | |
| params["api_key"] = config.CRYPTOCOMPARE_API_KEY | |
| session = await self.get_session() | |
| for attempt in range(config.MAX_RETRIES): | |
| try: | |
| async with session.get(url, params=params) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| if data.get("Response") == "Error": | |
| raise Exception(data.get("Message", "API error")) | |
| return data | |
| elif response.status == 429: | |
| await asyncio.sleep(2 ** attempt) | |
| continue | |
| else: | |
| raise Exception(f"API error: {response.status}") | |
| except asyncio.TimeoutError: | |
| if attempt == config.MAX_RETRIES - 1: | |
| raise Exception("Request timeout") | |
| await asyncio.sleep(1) | |
| raise Exception("Max retries exceeded") | |
| async def get_price_multi(self, fsyms: str, tsyms: str = "USD") -> Dict[str, Any]: | |
| params = {"fsyms": fsyms, "tsyms": tsyms} | |
| return await self._make_request("pricemulti", params) | |
| async def get_social_data(self, coin_symbol: str) -> Dict[str, Any]: | |
| params = {"coinSymbol": coin_symbol} | |
| return await self._make_request("social/coin/latest", params) | |
| async def get_news(self, categories: str = "blockchain") -> Dict[str, Any]: | |
| params = {"categories": categories} | |
| return await self._make_request("news/", params) | |
| async def close(self): | |
| if self.session: | |
| await self.session.close() | |
| coingecko_client = CoinGeckoClient() | |
| cryptocompare_client = CryptoCompareClient() | |