| """ |
| Real-time API Health Monitoring Module |
| Implements comprehensive health checks with rate limiting, failure tracking, and database persistence |
| """ |
|
|
| import asyncio |
| import time |
| from typing import Dict, List, Optional, Tuple |
| from datetime import datetime |
| from collections import defaultdict |
|
|
| |
| from utils.api_client import APIClient |
| from config import config |
| from monitoring.rate_limiter import rate_limiter |
| from utils.logger import setup_logger, log_api_request, log_error |
| from monitor import HealthCheckResult, HealthStatus |
| from database import Database |
|
|
| |
# Module-level logger used throughout this module (HealthChecker methods log through it).
logger = setup_logger("health_checker")
|
|
|
|
class HealthChecker:
    """
    Real-time API health monitoring with rate limiting and failure tracking.

    Check results are persisted through ``self.db.save_health_check`` and
    consecutive OFFLINE results are counted per provider in
    ``self.consecutive_failures``.
    """

    # Number of consecutive OFFLINE results after which a provider's result is
    # annotated as a persistent failure.
    FAILURE_THRESHOLD = 3

    # Seconds to wait between launching successive provider checks in a batch,
    # so requests are not all fired at the same instant.
    STAGGER_DELAY_SECONDS = 0.1

    def __init__(self, db_path: str = "data/health_metrics.db"):
        """
        Initialize health checker.

        Args:
            db_path: Path to SQLite database
        """
        self.api_client = APIClient(
            default_timeout=10,
            max_connections=50,
            retry_attempts=1,
            retry_delay=1.0
        )
        self.db = Database(db_path)
        # provider name -> number of consecutive OFFLINE results
        self.consecutive_failures: Dict[str, int] = defaultdict(int)

        self._initialize_rate_limiters()

        logger.info("HealthChecker initialized")

    def _initialize_rate_limiters(self):
        """Configure rate limiters for every provider that declares a limit."""
        for provider in config.get_all_providers():
            if provider.rate_limit_type and provider.rate_limit_value:
                rate_limiter.configure_limit(
                    provider=provider.name,
                    limit_type=provider.rate_limit_type,
                    limit_value=provider.rate_limit_value
                )
                logger.info(
                    f"Configured rate limit for {provider.name}: "
                    f"{provider.rate_limit_value} {provider.rate_limit_type}"
                )

    async def check_provider(self, provider_name: str) -> Optional[HealthCheckResult]:
        """
        Check a single provider's health and persist the result.

        If the rate limiter refuses the request, a DEGRADED "Rate limited"
        result is saved and returned without performing a request or touching
        the failure counter. If performing the check raises, the exception is
        logged and turned into an OFFLINE result, which then counts as a
        failure like any other outage.

        Args:
            provider_name: Name of the provider to check

        Returns:
            HealthCheckResult object or None if provider not found
        """
        provider = config.get_provider(provider_name)
        if not provider:
            logger.error(f"Provider not found: {provider_name}")
            return None

        can_proceed, reason = rate_limiter.can_make_request(provider.name)
        if not can_proceed:
            logger.warning(f"Rate limit blocked request to {provider.name}: {reason}")
            result = HealthCheckResult(
                provider_name=provider.name,
                category=provider.category,
                status=HealthStatus.DEGRADED,
                response_time=0,
                status_code=None,
                error_message=f"Rate limited: {reason}",
                timestamp=time.time(),
                endpoint_tested=provider.health_check_endpoint
            )
            self.db.save_health_check(result)
            return result

        try:
            result = await self._perform_health_check(provider)
        except Exception as exc:
            # A crashed check is an outage too: count, persist and log it below
            # instead of letting it escape and bypass the failure counter.
            logger.exception(f"Health check for {provider.name} raised: {exc}")
            result = self._offline_result(provider, f"Exception: {str(exc)[:200]}")
        finally:
            # Record the attempt against the rate limit whether or not it succeeded.
            rate_limiter.record_request(provider.name)

        if result.status == HealthStatus.OFFLINE:
            self.consecutive_failures[provider.name] += 1
            logger.warning(
                f"{provider.name} offline - consecutive failures: "
                f"{self.consecutive_failures[provider.name]}"
            )
        else:
            self.consecutive_failures[provider.name] = 0

        failures = self.consecutive_failures[provider.name]
        if failures >= self.FAILURE_THRESHOLD:
            # Flag the persistent outage but keep the underlying error so the
            # stored record still explains why the provider is failing.
            escalation = f"{self.FAILURE_THRESHOLD}+ consecutive failures (count: {failures})"
            if result.error_message:
                escalation = f"{escalation}; last error: {result.error_message}"
            result = HealthCheckResult(
                provider_name=result.provider_name,
                category=result.category,
                status=HealthStatus.OFFLINE,
                response_time=result.response_time,
                status_code=result.status_code,
                error_message=escalation,
                timestamp=result.timestamp,
                endpoint_tested=result.endpoint_tested
            )

        self.db.save_health_check(result)

        log_api_request(
            logger=logger,
            provider=provider.name,
            endpoint=provider.health_check_endpoint,
            duration_ms=result.response_time,
            status=result.status.value,
            http_code=result.status_code,
            level="INFO" if result.status == HealthStatus.ONLINE else "WARNING"
        )

        return result

    async def check_all_providers(self) -> List[HealthCheckResult]:
        """
        Check all configured providers.

        Returns:
            List of HealthCheckResult objects
        """
        providers = config.get_all_providers()
        logger.info(f"Starting health check for {len(providers)} providers")

        results = await self._run_checks(providers, "Health check failed with exception")

        logger.info(f"Completed health check: {len(results)} results")
        self._log_summary_stats(results)
        return results

    async def check_category(self, category: str) -> List[HealthCheckResult]:
        """
        Check providers in a specific category.

        Args:
            category: Category name (e.g., 'market_data', 'blockchain_explorers')

        Returns:
            List of HealthCheckResult objects (empty if the category has no providers)
        """
        providers = config.get_providers_by_category(category)
        if not providers:
            logger.warning(f"No providers found for category: {category}")
            return []

        logger.info(f"Starting health check for category '{category}': {len(providers)} providers")

        results = await self._run_checks(providers, "Category check failed with exception")

        logger.info(f"Completed category '{category}' check: {len(results)} results")
        self._log_summary_stats(results)
        return results

    async def _run_checks(self, providers, failure_log_prefix: str) -> List[HealthCheckResult]:
        """
        Run ``check_provider`` concurrently for each provider and collect results.

        Task launches are staggered by ``STAGGER_DELAY_SECONDS``. A check that
        raises is logged and replaced with an OFFLINE result (persisted
        best-effort). Providers for which ``check_provider`` returned None
        (not found in config) are omitted.

        Args:
            providers: Provider config objects to check
            failure_log_prefix: Prefix of the error logged when a check raises

        Returns:
            List of HealthCheckResult objects
        """
        # Materialise once: the sequence is iterated twice (launch, then zip).
        providers = list(providers)

        tasks = []
        for provider in providers:
            await asyncio.sleep(self.STAGGER_DELAY_SECONDS)
            tasks.append(asyncio.create_task(self.check_provider(provider.name)))

        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

        results: List[HealthCheckResult] = []
        # gather() returns outcomes in task order, so they line up with providers.
        for provider, outcome in zip(providers, outcomes):
            if isinstance(outcome, HealthCheckResult):
                results.append(outcome)
            elif isinstance(outcome, Exception):
                # Not inside an ``except`` block, so exc_info=True would find no
                # active exception; pass the exception instance instead.
                logger.error(f"{failure_log_prefix}: {outcome}", exc_info=outcome)
                failed_result = self._offline_result(
                    provider, f"Exception: {str(outcome)[:200]}"
                )
                try:
                    self.db.save_health_check(failed_result)
                except Exception:
                    # Don't let a persistence error abort the rest of the batch.
                    logger.exception(f"Could not persist failure result for {provider.name}")
                results.append(failed_result)
            # outcome is None: provider not found in config; nothing to report.
        return results

    @staticmethod
    def _offline_result(provider, error_message: str) -> HealthCheckResult:
        """Build an OFFLINE result for *provider* carrying *error_message*."""
        return HealthCheckResult(
            provider_name=provider.name,
            category=provider.category,
            status=HealthStatus.OFFLINE,
            response_time=0,
            status_code=None,
            error_message=error_message,
            timestamp=time.time(),
            endpoint_tested=provider.health_check_endpoint
        )

    @staticmethod
    def _build_auth(provider, endpoint) -> Tuple[Dict[str, str], Dict[str, str]]:
        """
        Build the (headers, params) pair carrying the provider's API key.

        Both dicts are empty when the provider does not require a key or has
        none configured. Rules match on the lowercase provider name (and, for
        NewsData, on the endpoint URL).
        """
        headers: Dict[str, str] = {}
        params: Dict[str, str] = {}
        if not (provider.requires_key and provider.api_key):
            return headers, params

        name = provider.name.lower()
        key = provider.api_key
        if 'coinmarketcap' in name:
            headers['X-CMC_PRO_API_KEY'] = key
        elif 'cryptocompare' in name:
            headers['authorization'] = f'Apikey {key}'
        elif ('newsapi' in name or 'newsdata' in (endpoint or '').lower()
              or 'etherscan' in name or 'bscscan' in name):
            # Query-parameter providers; matched before the Tronscan header rule.
            params['apikey'] = key
        elif 'tronscan' in name:
            headers['TRON-PRO-API-KEY'] = key
        else:
            params['apikey'] = key
        return headers, params

    @staticmethod
    def _timeout_seconds(timeout_ms: Optional[float]) -> int:
        """
        Convert a millisecond timeout (default 10000) to whole seconds.

        Rounds up with a minimum of 1 s, so a sub-second or fractional setting
        never truncates to a shorter (or zero) timeout.
        """
        import math
        return max(1, math.ceil((timeout_ms or 10000) / 1000.0))

    async def _perform_health_check(self, provider) -> HealthCheckResult:
        """
        Perform the actual health check HTTP request.

        Args:
            provider: ProviderConfig object

        Returns:
            HealthCheckResult object
        """
        endpoint = provider.health_check_endpoint
        headers, params = self._build_auth(provider, endpoint)

        response = await self.api_client.request(
            method='GET',
            url=endpoint,
            headers=headers if headers else None,
            params=params if params else None,
            timeout=self._timeout_seconds(provider.timeout_ms),
            retry=False
        )

        success = response.get('success', False)
        # ``or 0`` also covers keys that are present but None, which would make
        # the numeric comparisons below raise TypeError.
        status_code = response.get('status_code') or 0
        response_time_ms = response.get('response_time_ms') or 0
        error_type = response.get('error_type')
        error_message = response.get('error_message')

        status = self._determine_health_status(
            success=success,
            status_code=status_code,
            response_time_ms=response_time_ms,
            error_type=error_type
        )

        final_error_message = None
        if not success:
            if error_message:
                final_error_message = error_message
            elif error_type:
                final_error_message = f"{error_type}: HTTP {status_code}" if status_code else error_type
            else:
                final_error_message = f"Request failed with status {status_code}"

        return HealthCheckResult(
            provider_name=provider.name,
            category=provider.category,
            status=status,
            response_time=response_time_ms,
            status_code=status_code if status_code > 0 else None,
            error_message=final_error_message,
            timestamp=time.time(),
            endpoint_tested=endpoint
        )

    def _determine_health_status(
        self,
        success: bool,
        status_code: int,
        response_time_ms: float,
        error_type: Optional[str]
    ) -> HealthStatus:
        """
        Classify a response into a HealthStatus.

        Rules, evaluated in order:
          - OFFLINE:  error_type == 'timeout', or status_code 0 (no HTTP response)
          - DEGRADED: status_code >= 400
          - DEGRADED: 2000 <= response_time_ms < 5000
          - OFFLINE:  response_time_ms >= 5000
          - ONLINE:   response_time_ms < 2000 and (status 200, or any 2xx with success)
          - DEGRADED: anything else (e.g. 1xx/3xx, or non-200 2xx without success)

        Args:
            success: Whether request was successful
            status_code: HTTP status code (0 when no response was received)
            response_time_ms: Response time in milliseconds
            error_type: Type of error if any

        Returns:
            HealthStatus enum value
        """
        if error_type == 'timeout' or status_code == 0:
            return HealthStatus.OFFLINE

        if status_code >= 400:
            return HealthStatus.DEGRADED

        if 2000 <= response_time_ms < 5000:
            return HealthStatus.DEGRADED

        if response_time_ms >= 5000:
            return HealthStatus.OFFLINE

        # A bare 200 is trusted even if the client did not flag success; other
        # 2xx codes additionally require the success flag.
        healthy_code = status_code == 200 or (success and 200 <= status_code < 300)
        if healthy_code and response_time_ms < 2000:
            return HealthStatus.ONLINE

        return HealthStatus.DEGRADED

    def _log_summary_stats(self, results: List[HealthCheckResult]):
        """
        Log status counts/percentages and the mean response time.

        Args:
            results: List of HealthCheckResult objects; nothing is logged if empty
        """
        if not results:
            return

        total = len(results)
        online = sum(1 for r in results if r.status == HealthStatus.ONLINE)
        degraded = sum(1 for r in results if r.status == HealthStatus.DEGRADED)
        offline = sum(1 for r in results if r.status == HealthStatus.OFFLINE)
        avg_response_time = sum(r.response_time for r in results) / total

        logger.info(
            f"Health Check Summary - Total: {total}, "
            f"Online: {online} ({online/total*100:.1f}%), "
            f"Degraded: {degraded} ({degraded/total*100:.1f}%), "
            f"Offline: {offline} ({offline/total*100:.1f}%), "
            f"Avg Response Time: {avg_response_time:.2f}ms"
        )

    def get_consecutive_failures(self, provider_name: str) -> int:
        """
        Get consecutive failure count for a provider.

        Uses ``.get`` so that querying an unknown provider does not insert a
        zero entry into the defaultdict.

        Args:
            provider_name: Provider name

        Returns:
            Number of consecutive failures
        """
        return self.consecutive_failures.get(provider_name, 0)

    def reset_consecutive_failures(self, provider_name: str):
        """
        Reset consecutive failure count for a provider (no-op if never tracked).

        Args:
            provider_name: Provider name
        """
        if provider_name in self.consecutive_failures:
            self.consecutive_failures[provider_name] = 0
            logger.info(f"Reset consecutive failures for {provider_name}")

    def get_all_consecutive_failures(self) -> Dict[str, int]:
        """
        Get all consecutive failure counts.

        Returns:
            Dictionary (a copy) mapping provider names to failure counts
        """
        return dict(self.consecutive_failures)

    async def close(self):
        """Close the underlying API client."""
        await self.api_client.close()
        logger.info("HealthChecker closed")
|
|
|
|
| |
def check_provider_sync(provider_name: str) -> Optional[HealthCheckResult]:
    """
    Synchronous wrapper for checking a single provider.

    The checker is created, used and closed inside a single ``asyncio.run``
    call, so its client is released on the same event loop that used it, and
    it is closed even if the check raises.

    Args:
        provider_name: Provider name

    Returns:
        HealthCheckResult object or None if the provider is not found
    """
    async def _run() -> Optional[HealthCheckResult]:
        checker = HealthChecker()
        try:
            return await checker.check_provider(provider_name)
        finally:
            await checker.close()

    return asyncio.run(_run())
|
|
|
|
def check_all_providers_sync() -> List[HealthCheckResult]:
    """
    Synchronous wrapper for checking all providers.

    The checker is created, used and closed inside a single ``asyncio.run``
    call, so its client is released on the same event loop that used it, and
    it is closed even if the checks raise.

    Returns:
        List of HealthCheckResult objects
    """
    async def _run() -> List[HealthCheckResult]:
        checker = HealthChecker()
        try:
            return await checker.check_all_providers()
        finally:
            await checker.close()

    return asyncio.run(_run())
|
|
|
|
def check_category_sync(category: str) -> List[HealthCheckResult]:
    """
    Synchronous wrapper for checking a category.

    The checker is created, used and closed inside a single ``asyncio.run``
    call, so its client is released on the same event loop that used it, and
    it is closed even if the checks raise.

    Args:
        category: Category name

    Returns:
        List of HealthCheckResult objects
    """
    async def _run() -> List[HealthCheckResult]:
        checker = HealthChecker()
        try:
            return await checker.check_category(category)
        finally:
            await checker.close()

    return asyncio.run(_run())
|
|
|
|
| |
if __name__ == "__main__":
    async def main():
        """Example usage of HealthChecker; the checker is always closed on exit."""
        checker = HealthChecker()
        try:
            print("\n=== Checking single provider: CoinGecko ===")
            result = await checker.check_provider('CoinGecko')
            if result:
                print(f"Status: {result.status.value}")
                print(f"Response Time: {result.response_time:.2f}ms")
                print(f"HTTP Code: {result.status_code}")
                print(f"Error: {result.error_message}")

            print("\n=== Checking all providers ===")
            results = await checker.check_all_providers()
            for r in results:
                print(f"{r.provider_name}: {r.status.value} ({r.response_time:.2f}ms)")

            print("\n=== Checking market_data category ===")
            market_results = await checker.check_category('market_data')
            for r in market_results:
                print(f"{r.provider_name}: {r.status.value} ({r.response_time:.2f}ms)")
        finally:
            # Close the API client even if one of the checks above raises.
            await checker.close()

    asyncio.run(main())
|
|