| """ |
| Health Monitoring System for API Providers |
| """ |
|
|
| import asyncio |
| from datetime import datetime |
| from sqlalchemy.orm import Session |
| from database.db import get_db |
| from database.models import Provider, ConnectionAttempt, StatusEnum, ProviderStatusEnum |
| from utils.http_client import APIClient |
| from config import config |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class HealthMonitor: |
| def __init__(self): |
| self.running = False |
|
|
| async def start(self): |
| """Start health monitoring loop""" |
| self.running = True |
| logger.info("Health monitoring started") |
|
|
| while self.running: |
| try: |
| await self.check_all_providers() |
| await asyncio.sleep(config.HEALTH_CHECK_INTERVAL) |
| except Exception as e: |
| logger.error(f"Health monitoring error: {e}") |
| await asyncio.sleep(10) |
|
|
| async def check_all_providers(self): |
| """Check health of all providers""" |
| with get_db() as db: |
| providers = db.query(Provider).filter(Provider.priority_tier <= 2).all() |
|
|
| async with APIClient() as client: |
| tasks = [self.check_provider(client, provider, db) for provider in providers] |
| await asyncio.gather(*tasks, return_exceptions=True) |
|
|
| async def check_provider(self, client: APIClient, provider: Provider, db: Session): |
| """Check health of a single provider""" |
| try: |
| |
| endpoint = self.get_health_endpoint(provider) |
| headers = self.get_headers(provider) |
|
|
| |
| result = await client.get(endpoint, headers=headers) |
|
|
| |
| status = StatusEnum.SUCCESS if result["success"] and result["status_code"] == 200 else StatusEnum.FAILED |
|
|
| |
| attempt = ConnectionAttempt( |
| provider_id=provider.id, |
| timestamp=datetime.utcnow(), |
| endpoint=endpoint, |
| status=status, |
| response_time_ms=result["response_time_ms"], |
| http_status_code=result["status_code"], |
| error_type=result["error"]["type"] if result["error"] else None, |
| error_message=result["error"]["message"] if result["error"] else None, |
| retry_count=0 |
| ) |
| db.add(attempt) |
|
|
| |
| provider.last_response_time_ms = result["response_time_ms"] |
| provider.last_check_at = datetime.utcnow() |
|
|
| |
| recent_attempts = db.query(ConnectionAttempt).filter( |
| ConnectionAttempt.provider_id == provider.id |
| ).order_by(ConnectionAttempt.timestamp.desc()).limit(5).all() |
|
|
| success_count = sum(1 for a in recent_attempts if a.status == StatusEnum.SUCCESS) |
|
|
| if success_count == 5: |
| provider.status = ProviderStatusEnum.ONLINE |
| elif success_count >= 3: |
| provider.status = ProviderStatusEnum.DEGRADED |
| else: |
| provider.status = ProviderStatusEnum.OFFLINE |
|
|
| db.commit() |
|
|
| logger.info(f"Health check for {provider.name}: {status.value} ({result['response_time_ms']}ms)") |
|
|
| except Exception as e: |
| logger.error(f"Health check failed for {provider.name}: {e}") |
|
|
| def get_health_endpoint(self, provider: Provider) -> str: |
| """Get health check endpoint for provider""" |
| endpoints = { |
| "CoinGecko": f"{provider.endpoint_url}/ping", |
| "CoinMarketCap": f"{provider.endpoint_url}/cryptocurrency/map?limit=1", |
| "Etherscan": f"{provider.endpoint_url}?module=stats&action=ethsupply&apikey={config.API_KEYS['etherscan'][0] if config.API_KEYS['etherscan'] else ''}", |
| "BscScan": f"{provider.endpoint_url}?module=stats&action=bnbsupply&apikey={config.API_KEYS['bscscan'][0] if config.API_KEYS['bscscan'] else ''}", |
| "TronScan": f"{provider.endpoint_url}/system/status", |
| "CryptoPanic": f"{provider.endpoint_url}/posts/?auth_token=free&public=true", |
| "Alternative.me": f"{provider.endpoint_url}/fng/", |
| "CryptoCompare": f"{provider.endpoint_url}/price?fsym=BTC&tsyms=USD", |
| "Binance": f"{provider.endpoint_url}/ping", |
| "NewsAPI": f"{provider.endpoint_url}/news?language=en&category=technology", |
| "The Graph": "https://api.thegraph.com/index-node/graphql", |
| "Blockchair": f"{provider.endpoint_url}/bitcoin/stats" |
| } |
|
|
| return endpoints.get(provider.name, provider.endpoint_url) |
|
|
| def get_headers(self, provider: Provider) -> dict: |
| """Get headers for provider""" |
| headers = {"User-Agent": "CryptoMonitor/1.0"} |
|
|
| if provider.name == "CoinMarketCap" and config.API_KEYS["coinmarketcap"]: |
| headers["X-CMC_PRO_API_KEY"] = config.API_KEYS["coinmarketcap"][0] |
| elif provider.name == "TronScan" and config.API_KEYS["tronscan"]: |
| headers["TRON-PRO-API-KEY"] = config.API_KEYS["tronscan"][0] |
| elif provider.name == "CryptoCompare" and config.API_KEYS["cryptocompare"]: |
| headers["authorization"] = f"Apikey {config.API_KEYS['cryptocompare'][0]}" |
| elif provider.name == "NewsAPI" and config.API_KEYS["newsapi"]: |
| headers["X-ACCESS-KEY"] = config.API_KEYS["newsapi"][0] |
|
|
| return headers |
|
|
| def stop(self): |
| """Stop health monitoring""" |
| self.running = False |
| logger.info("Health monitoring stopped") |
|
|
|
|
| |
| health_monitor = HealthMonitor() |
|
|