| """Base provider interface for data sources""" |
| from __future__ import annotations |
| from abc import ABC, abstractmethod |
| from typing import List, Optional |
| from datetime import datetime |
| import time |
| import httpx |
| import sys |
| import os |
| sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) |
|
|
| from core.models import OHLCV, Price, ProviderHealth |
|
|
|
|
class CircuitBreaker:
    """Circuit breaker that blocks requests after repeated provider failures.

    The breaker opens once ``threshold`` failures have been recorded without
    an intervening success. It stays open until ``timeout`` seconds have
    passed since the most recent failure; the next ``can_attempt`` call after
    that cool-down closes it again and restarts the failure count from zero.
    """

    def __init__(self, threshold: int = 5, timeout: int = 60):
        """Create a closed breaker with no recorded failures.

        Args:
            threshold: Number of recorded failures at which the breaker opens.
            timeout: Seconds after the latest failure before attempts are
                allowed again.
        """
        self.threshold = threshold
        self.timeout = timeout
        self.failures = 0
        # time.time() timestamp of the most recent failure; None until the
        # first failure is recorded.
        self.last_failure_time: Optional[float] = None
        self.is_open = False

    def record_success(self) -> None:
        """Record a successful request: clear the count and close the breaker."""
        self.failures = 0
        self.is_open = False

    def record_failure(self) -> None:
        """Record a failed request and open the breaker at the threshold."""
        self.failures += 1
        self.last_failure_time = time.time()

        if self.failures >= self.threshold:
            self.is_open = True

    def can_attempt(self) -> bool:
        """Return True if a request may be attempted right now.

        A closed breaker always allows attempts. An open breaker allows them
        only once ``timeout`` seconds have elapsed since the last failure, in
        which case it is closed and its failure count reset as a side effect.
        """
        if not self.is_open:
            return True

        # Compare against None explicitly: a timestamp of 0.0 is a valid
        # value and must not be mistaken for "no failure recorded", which
        # would leave the breaker open forever.
        if self.last_failure_time is None:
            return False

        elapsed = time.time() - self.last_failure_time
        if elapsed < self.timeout:
            return False

        # Cool-down is over: close the breaker and start counting afresh.
        self.is_open = False
        self.failures = 0
        return True
|
|
|
|
class BaseProvider(ABC):
    """Base class for all data providers.

    Holds a lazily created ``httpx.AsyncClient``, a ``CircuitBreaker`` that
    gates outgoing requests, and the bookkeeping (latency, last check time,
    last error) that ``get_health`` reports. Subclasses implement
    ``fetch_ohlcv`` and ``fetch_prices``.
    """

    def __init__(self, name: str, base_url: str, timeout: int = 10):
        """Initialise provider state; no HTTP client is created yet.

        Args:
            name: Provider name, used in error messages and health reports.
            base_url: Base URL of the provider, stored for subclasses to use.
            timeout: Timeout value passed to the HTTP client when it is
                created.
        """
        self.name = name
        self.base_url = base_url
        self.timeout = timeout
        self.circuit_breaker = CircuitBreaker()
        # Latency of the last successful request, in milliseconds.
        self.last_latency: Optional[int] = None
        # Time of the last successful request.
        self.last_check: Optional[datetime] = None
        # Message of the most recent failure; None after a success.
        self.last_error: Optional[str] = None
        self.client: Optional[httpx.AsyncClient] = None

    async def get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it on first use."""
        if self.client is None:
            self.client = httpx.AsyncClient(timeout=self.timeout)
        return self.client

    async def close(self):
        """Close the HTTP client, if one was created, and forget it."""
        if self.client is not None:
            await self.client.aclose()
            self.client = None

    async def _make_request(self, url: str, params: Optional[dict] = None) -> dict:
        """Perform a GET request with timing, health tracking and breaker checks.

        Args:
            url: URL to request.
            params: Optional query parameters passed to the client.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the circuit breaker is open, or re-raised from the
                request, the status check or JSON decoding after the failure
                has been recorded.
        """
        if not self.circuit_breaker.can_attempt():
            raise Exception(f"Circuit breaker open for {self.name}")

        client = await self.get_client()
        start_time = time.time()

        try:
            response = await client.get(url, params=params)
            response.raise_for_status()
            latency_ms = int((time.time() - start_time) * 1000)
            # Decode before declaring success: otherwise an undecodable body
            # would first reset the breaker's failure count and then add a
            # single failure, so repeated bad payloads could never open it.
            payload = response.json()
        except Exception as e:
            # Some exceptions stringify to "", so fall back to repr() to keep
            # a non-empty, identifiable message for health reporting.
            self.last_error = str(e) or repr(e)
            self.circuit_breaker.record_failure()
            raise

        self.last_latency = latency_ms
        self.last_check = datetime.now()
        self.last_error = None
        self.circuit_breaker.record_success()
        return payload

    @abstractmethod
    async def fetch_ohlcv(self, symbol: str, interval: str, limit: int) -> List[OHLCV]:
        """Fetch OHLCV data for ``symbol``; implemented by each provider."""
        pass

    @abstractmethod
    async def fetch_prices(self, symbols: List[str]) -> List[Price]:
        """Fetch current prices for ``symbols``; implemented by each provider."""
        pass

    async def get_health(self) -> ProviderHealth:
        """Build a health report from the state left by the latest requests.

        Status is "offline" while the circuit breaker is flagged open,
        "degraded" when the most recent request failed, and "online"
        otherwise. ``lastCheck`` falls back to the current time when no
        request has succeeded yet.
        """
        if self.circuit_breaker.is_open:
            status = "offline"
        elif self.last_error is not None:
            # Explicit None check so an empty error message still counts as
            # a failure instead of being reported as "online".
            status = "degraded"
        else:
            status = "online"

        if self.last_check is not None:
            last_check = self.last_check.isoformat()
        else:
            last_check = datetime.now().isoformat()

        return ProviderHealth(
            name=self.name,
            status=status,
            latency=self.last_latency,
            lastCheck=last_check,
            errorMessage=self.last_error,
        )
|
|