Spaces:
Sleeping
Sleeping
| """Base provider interface for data sources""" | |
| from __future__ import annotations | |
| from abc import ABC, abstractmethod | |
| from typing import List, Optional | |
| from datetime import datetime | |
| import time | |
| import httpx | |
| import sys | |
| import os | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) | |
| from core.models import OHLCV, Price, ProviderHealth | |
| class CircuitBreaker: | |
| """Circuit breaker for provider failures""" | |
| def __init__(self, threshold: int = 5, timeout: int = 60): | |
| self.threshold = threshold | |
| self.timeout = timeout | |
| self.failures = 0 | |
| self.last_failure_time: Optional[float] = None | |
| self.is_open = False | |
| def record_success(self): | |
| """Record successful request""" | |
| self.failures = 0 | |
| self.is_open = False | |
| def record_failure(self): | |
| """Record failed request""" | |
| self.failures += 1 | |
| self.last_failure_time = time.time() | |
| if self.failures >= self.threshold: | |
| self.is_open = True | |
| def can_attempt(self) -> bool: | |
| """Check if we can attempt a request""" | |
| if not self.is_open: | |
| return True | |
| # Check if timeout has passed | |
| if self.last_failure_time: | |
| elapsed = time.time() - self.last_failure_time | |
| if elapsed >= self.timeout: | |
| self.is_open = False | |
| self.failures = 0 | |
| return True | |
| return False | |
| class BaseProvider(ABC): | |
| """Base class for all data providers""" | |
| def __init__(self, name: str, base_url: str, timeout: int = 10): | |
| self.name = name | |
| self.base_url = base_url | |
| self.timeout = timeout | |
| self.circuit_breaker = CircuitBreaker() | |
| self.last_latency: Optional[int] = None | |
| self.last_check: Optional[datetime] = None | |
| self.last_error: Optional[str] = None | |
| self.client: Optional[httpx.AsyncClient] = None | |
| async def get_client(self) -> httpx.AsyncClient: | |
| """Get or create HTTP client""" | |
| if self.client is None: | |
| self.client = httpx.AsyncClient(timeout=self.timeout) | |
| return self.client | |
| async def close(self): | |
| """Close HTTP client""" | |
| if self.client: | |
| await self.client.aclose() | |
| self.client = None | |
| async def _make_request(self, url: str, params: Optional[dict] = None) -> dict: | |
| """Make HTTP request with timing and error handling""" | |
| if not self.circuit_breaker.can_attempt(): | |
| raise Exception(f"Circuit breaker open for {self.name}") | |
| client = await self.get_client() | |
| start_time = time.time() | |
| try: | |
| response = await client.get(url, params=params) | |
| response.raise_for_status() | |
| self.last_latency = int((time.time() - start_time) * 1000) | |
| self.last_check = datetime.now() | |
| self.last_error = None | |
| self.circuit_breaker.record_success() | |
| return response.json() | |
| except Exception as e: | |
| self.last_error = str(e) | |
| self.circuit_breaker.record_failure() | |
| raise | |
| async def fetch_ohlcv(self, symbol: str, interval: str, limit: int) -> List[OHLCV]: | |
| """Fetch OHLCV data""" | |
| pass | |
| async def fetch_prices(self, symbols: List[str]) -> List[Price]: | |
| """Fetch current prices""" | |
| pass | |
| async def get_health(self) -> ProviderHealth: | |
| """Get provider health status""" | |
| if self.circuit_breaker.is_open: | |
| status = "offline" | |
| elif self.last_error: | |
| status = "degraded" | |
| else: | |
| status = "online" | |
| return ProviderHealth( | |
| name=self.name, | |
| status=status, | |
| latency=self.last_latency, | |
| lastCheck=self.last_check.isoformat() if self.last_check else datetime.now().isoformat(), | |
| errorMessage=self.last_error | |
| ) | |