| """
|
| Enhanced Logging System
|
| Provides structured logging with provider health tracking and error classification
|
| """
|
|
|
| import logging
|
| import sys
|
| from datetime import datetime
|
| from typing import Optional, Dict, Any
|
| from pathlib import Path
|
| import json
|
|
|
|
|
| class ProviderHealthLogger:
|
| """Enhanced logger with provider health tracking"""
|
|
|
| def __init__(self, name: str = "crypto_monitor"):
|
| self.logger = logging.getLogger(name)
|
| self.health_log_path = Path("data/logs/provider_health.jsonl")
|
| self.error_log_path = Path("data/logs/errors.jsonl")
|
|
|
|
|
| self.health_log_path.parent.mkdir(parents=True, exist_ok=True)
|
| self.error_log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| if not self.logger.handlers:
|
| self._setup_handlers()
|
|
|
| def _setup_handlers(self):
|
| """Set up logging handlers"""
|
| self.logger.setLevel(logging.DEBUG)
|
|
|
|
|
| console_handler = logging.StreamHandler(sys.stdout)
|
| console_handler.setLevel(logging.INFO)
|
|
|
|
|
| console_formatter = ColoredFormatter(
|
| '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
|
| datefmt='%Y-%m-%d %H:%M:%S'
|
| )
|
| console_handler.setFormatter(console_formatter)
|
|
|
|
|
| file_handler = logging.FileHandler('data/logs/app.log')
|
| file_handler.setLevel(logging.DEBUG)
|
| file_formatter = logging.Formatter(
|
| '%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s',
|
| datefmt='%Y-%m-%d %H:%M:%S'
|
| )
|
| file_handler.setFormatter(file_formatter)
|
|
|
|
|
| error_handler = logging.FileHandler('data/logs/errors.log')
|
| error_handler.setLevel(logging.ERROR)
|
| error_handler.setFormatter(file_formatter)
|
|
|
|
|
| self.logger.addHandler(console_handler)
|
| self.logger.addHandler(file_handler)
|
| self.logger.addHandler(error_handler)
|
|
|
| def log_provider_request(
|
| self,
|
| provider_name: str,
|
| endpoint: str,
|
| status: str,
|
| response_time_ms: Optional[float] = None,
|
| status_code: Optional[int] = None,
|
| error_message: Optional[str] = None,
|
| used_proxy: bool = False
|
| ):
|
| """Log a provider API request with full context"""
|
|
|
| log_entry = {
|
| "timestamp": datetime.now().isoformat(),
|
| "provider": provider_name,
|
| "endpoint": endpoint,
|
| "status": status,
|
| "response_time_ms": response_time_ms,
|
| "status_code": status_code,
|
| "error_message": error_message,
|
| "used_proxy": used_proxy
|
| }
|
|
|
|
|
| if status == "success":
|
| self.logger.info(
|
| f"✓ {provider_name} | {endpoint} | {response_time_ms:.0f}ms | HTTP {status_code}"
|
| )
|
| elif status == "error":
|
| self.logger.error(
|
| f"✗ {provider_name} | {endpoint} | {error_message}"
|
| )
|
| elif status == "timeout":
|
| self.logger.warning(
|
| f"⏱ {provider_name} | {endpoint} | Timeout"
|
| )
|
| elif status == "proxy_fallback":
|
| self.logger.info(
|
| f"🌐 {provider_name} | {endpoint} | Switched to proxy"
|
| )
|
|
|
|
|
| try:
|
| with open(self.health_log_path, 'a', encoding='utf-8') as f:
|
| f.write(json.dumps(log_entry) + '\n')
|
| except Exception as e:
|
| self.logger.error(f"Failed to write health log: {e}")
|
|
|
| def log_error(
|
| self,
|
| error_type: str,
|
| message: str,
|
| provider: Optional[str] = None,
|
| endpoint: Optional[str] = None,
|
| traceback: Optional[str] = None,
|
| **extra
|
| ):
|
| """Log an error with classification"""
|
|
|
| error_entry = {
|
| "timestamp": datetime.now().isoformat(),
|
| "error_type": error_type,
|
| "message": message,
|
| "provider": provider,
|
| "endpoint": endpoint,
|
| "traceback": traceback,
|
| **extra
|
| }
|
|
|
|
|
| self.logger.error(f"[{error_type}] {message}")
|
|
|
| if traceback:
|
| self.logger.debug(f"Traceback: {traceback}")
|
|
|
|
|
| try:
|
| with open(self.error_log_path, 'a', encoding='utf-8') as f:
|
| f.write(json.dumps(error_entry) + '\n')
|
| except Exception as e:
|
| self.logger.error(f"Failed to write error log: {e}")
|
|
|
| def log_proxy_switch(self, provider: str, reason: str):
|
| """Log when a provider switches to proxy mode"""
|
| self.logger.info(f"🌐 Proxy activated for {provider}: {reason}")
|
|
|
| def log_feature_flag_change(self, flag_name: str, old_value: bool, new_value: bool):
|
| """Log feature flag changes"""
|
| self.logger.info(f"⚙️ Feature flag '{flag_name}' changed: {old_value} → {new_value}")
|
|
|
| def log_health_check(self, provider: str, status: str, details: Optional[Dict] = None):
|
| """Log provider health check results"""
|
| if status == "online":
|
| self.logger.info(f"✓ Health check passed: {provider}")
|
| elif status == "degraded":
|
| self.logger.warning(f"⚠ Health check degraded: {provider}")
|
| else:
|
| self.logger.error(f"✗ Health check failed: {provider}")
|
|
|
| if details:
|
| self.logger.debug(f"Health details for {provider}: {details}")
|
|
|
| def get_recent_errors(self, limit: int = 100) -> list:
|
| """Read recent errors from log file"""
|
| errors = []
|
| try:
|
| if self.error_log_path.exists():
|
| with open(self.error_log_path, 'r', encoding='utf-8') as f:
|
| lines = f.readlines()
|
| for line in lines[-limit:]:
|
| try:
|
| errors.append(json.loads(line))
|
| except json.JSONDecodeError:
|
| continue
|
| except Exception as e:
|
| self.logger.error(f"Failed to read error log: {e}")
|
|
|
| return errors
|
|
|
| def get_provider_stats(self, provider: str, hours: int = 24) -> Dict[str, Any]:
|
| """Get statistics for a specific provider from logs"""
|
| from datetime import timedelta
|
|
|
| stats = {
|
| "total_requests": 0,
|
| "successful_requests": 0,
|
| "failed_requests": 0,
|
| "avg_response_time": 0,
|
| "proxy_requests": 0,
|
| "errors": []
|
| }
|
|
|
| try:
|
| if self.health_log_path.exists():
|
| cutoff_time = datetime.now() - timedelta(hours=hours)
|
| response_times = []
|
|
|
| with open(self.health_log_path, 'r', encoding='utf-8') as f:
|
| for line in f:
|
| try:
|
| entry = json.loads(line)
|
| entry_time = datetime.fromisoformat(entry["timestamp"])
|
|
|
| if entry_time < cutoff_time:
|
| continue
|
|
|
| if entry.get("provider") != provider:
|
| continue
|
|
|
| stats["total_requests"] += 1
|
|
|
| if entry.get("status") == "success":
|
| stats["successful_requests"] += 1
|
| if entry.get("response_time_ms"):
|
| response_times.append(entry["response_time_ms"])
|
| else:
|
| stats["failed_requests"] += 1
|
| if entry.get("error_message"):
|
| stats["errors"].append({
|
| "timestamp": entry["timestamp"],
|
| "message": entry["error_message"]
|
| })
|
|
|
| if entry.get("used_proxy"):
|
| stats["proxy_requests"] += 1
|
|
|
| except (json.JSONDecodeError, KeyError):
|
| continue
|
|
|
| if response_times:
|
| stats["avg_response_time"] = sum(response_times) / len(response_times)
|
|
|
| except Exception as e:
|
| self.logger.error(f"Failed to get provider stats: {e}")
|
|
|
| return stats
|
|
|
|
|
| class ColoredFormatter(logging.Formatter):
|
| """Custom formatter with colors for terminal output"""
|
|
|
| COLORS = {
|
| 'DEBUG': '\033[36m',
|
| 'INFO': '\033[32m',
|
| 'WARNING': '\033[33m',
|
| 'ERROR': '\033[31m',
|
| 'CRITICAL': '\033[35m',
|
| 'RESET': '\033[0m'
|
| }
|
|
|
| def format(self, record):
|
|
|
| if record.levelname in self.COLORS:
|
| record.levelname = (
|
| f"{self.COLORS[record.levelname]}"
|
| f"{record.levelname}"
|
| f"{self.COLORS['RESET']}"
|
| )
|
|
|
| return super().format(record)
|
|
|
|
|
|
|
| provider_health_logger = ProviderHealthLogger()
|
|
|
|
|
|
|
| def log_request(provider: str, endpoint: str, **kwargs):
|
| """Log a provider request"""
|
| provider_health_logger.log_provider_request(provider, endpoint, **kwargs)
|
|
|
|
|
| def log_error(error_type: str, message: str, **kwargs):
|
| """Log an error"""
|
| provider_health_logger.log_error(error_type, message, **kwargs)
|
|
|
|
|
| def log_proxy_switch(provider: str, reason: str):
|
| """Log proxy switch"""
|
| provider_health_logger.log_proxy_switch(provider, reason)
|
|
|
|
|
| def get_provider_stats(provider: str, hours: int = 24):
|
| """Get provider statistics"""
|
| return provider_health_logger.get_provider_stats(provider, hours)
|
|
|