""" Logger Module Centralized logging system for the virtual ISP stack: - Structured logging with multiple levels - Log aggregation and filtering - Real-time log streaming - Log persistence and rotation """ import logging import logging.handlers import time import threading import json import os from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, asdict from enum import Enum from collections import deque import queue class LogLevel(Enum): DEBUG = "DEBUG" INFO = "INFO" WARNING = "WARNING" ERROR = "ERROR" CRITICAL = "CRITICAL" class LogCategory(Enum): SYSTEM = "SYSTEM" DHCP = "DHCP" NAT = "NAT" FIREWALL = "FIREWALL" TCP = "TCP" ROUTER = "ROUTER" BRIDGE = "BRIDGE" SOCKET = "SOCKET" SESSION = "SESSION" SECURITY = "SECURITY" PERFORMANCE = "PERFORMANCE" @dataclass class LogEntry: """Structured log entry""" timestamp: float level: str category: str module: str message: str session_id: Optional[str] = None client_id: Optional[str] = None source_ip: Optional[str] = None dest_ip: Optional[str] = None protocol: Optional[str] = None metadata: Dict[str, Any] = None def __post_init__(self): if self.timestamp == 0: self.timestamp = time.time() if self.metadata is None: self.metadata = {} def to_dict(self) -> Dict: """Convert to dictionary""" return asdict(self) def to_json(self) -> str: """Convert to JSON string""" return json.dumps(self.to_dict(), default=str) class LogFilter: """Log filtering class""" def __init__(self): self.level_filter: Optional[LogLevel] = None self.category_filter: Optional[LogCategory] = None self.module_filter: Optional[str] = None self.session_filter: Optional[str] = None self.client_filter: Optional[str] = None self.ip_filter: Optional[str] = None self.text_filter: Optional[str] = None self.time_range: Optional[tuple] = None def matches(self, entry: LogEntry) -> bool: """Check if log entry matches filter criteria""" # Level filter if self.level_filter: entry_level_value = getattr(logging, entry.level) filter_level_value = getattr(logging, self.level_filter.value) if entry_level_value < filter_level_value: return False # Category filter if self.category_filter and entry.category != self.category_filter.value: return False # Module filter if self.module_filter and self.module_filter.lower() not in entry.module.lower(): return False # Session filter if self.session_filter and entry.session_id != self.session_filter: return False # Client filter if self.client_filter and entry.client_id != self.client_filter: return False # IP filter if self.ip_filter: if (entry.source_ip != self.ip_filter and entry.dest_ip != self.ip_filter): return False # Text filter if self.text_filter and self.text_filter.lower() not in entry.message.lower(): return False # Time range filter if self.time_range: start_time, end_time = self.time_range if not (start_time <= entry.timestamp <= end_time): return False return True class LogSubscriber: """Log subscriber for real-time streaming""" def __init__(self, subscriber_id: str, callback: Callable[[LogEntry], None], log_filter: Optional[LogFilter] = None): self.subscriber_id = subscriber_id self.callback = callback self.filter = log_filter or LogFilter() self.created_time = time.time() self.message_count = 0 self.last_message_time = None self.is_active = True def send_log(self, entry: LogEntry) -> bool: """Send log entry to subscriber if it matches filter""" if not self.is_active: return False if self.filter.matches(entry): try: self.callback(entry) self.message_count += 1 self.last_message_time = time.time() return True except Exception as e: print(f"Error sending log to subscriber {self.subscriber_id}: {e}") self.is_active = False return False return False class VirtualISPLogger: """Centralized logger for Virtual ISP stack""" def __init__(self, config: Dict): self.config = config self.log_entries: deque = deque(maxlen=config.get('max_memory_logs', 10000)) self.subscribers: Dict[str, LogSubscriber] = {} self.lock = threading.Lock() # Configuration self.log_level = LogLevel(config.get('log_level', 'INFO')) self.log_to_file = config.get('log_to_file', True) self.log_file_path = config.get('log_file_path', '/tmp/virtual_isp.log') self.log_file_max_size = config.get('log_file_max_size', 10 * 1024 * 1024) # 10MB self.log_file_backup_count = config.get('log_file_backup_count', 5) self.log_to_console = config.get('log_to_console', True) self.structured_logging = config.get('structured_logging', True) # Statistics self.stats = { 'total_logs': 0, 'logs_by_level': {level.value: 0 for level in LogLevel}, 'logs_by_category': {cat.value: 0 for cat in LogCategory}, 'active_subscribers': 0, 'file_logs_written': 0, 'console_logs_written': 0, 'dropped_logs': 0 } # Setup logging self._setup_logging() # Background processing self.running = False self.log_queue = queue.Queue() self.processing_thread = None def _setup_logging(self): """Setup Python logging infrastructure""" # Create logger self.logger = logging.getLogger('virtual_isp') self.logger.setLevel(getattr(logging, self.log_level.value)) # Remove existing handlers for handler in self.logger.handlers[:]: self.logger.removeHandler(handler) # Console handler if self.log_to_console: console_handler = logging.StreamHandler() if self.structured_logging: console_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) else: console_formatter = logging.Formatter('%(message)s') console_handler.setFormatter(console_formatter) self.logger.addHandler(console_handler) # File handler with rotation if self.log_to_file: # Ensure log directory exists log_dir = os.path.dirname(self.log_file_path) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) file_handler = logging.handlers.RotatingFileHandler( self.log_file_path, maxBytes=self.log_file_max_size, backupCount=self.log_file_backup_count ) if self.structured_logging: file_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) else: file_formatter = logging.Formatter('%(message)s') file_handler.setFormatter(file_formatter) self.logger.addHandler(file_handler) def _process_log_queue(self): """Background thread to process log queue""" while self.running: try: # Get log entry from queue (with timeout) try: entry = self.log_queue.get(timeout=1.0) except queue.Empty: continue # Store in memory with self.lock: self.log_entries.append(entry) # Send to subscribers inactive_subscribers = [] with self.lock: for subscriber_id, subscriber in self.subscribers.items(): if not subscriber.send_log(entry): inactive_subscribers.append(subscriber_id) # Remove inactive subscribers for subscriber_id in inactive_subscribers: self.remove_subscriber(subscriber_id) # Update statistics self.stats['total_logs'] += 1 self.stats['logs_by_level'][entry.level] += 1 self.stats['logs_by_category'][entry.category] += 1 # Mark task as done self.log_queue.task_done() except Exception as e: print(f"Error processing log queue: {e}") time.sleep(1) def log(self, level: LogLevel, category: LogCategory, module: str, message: str, session_id: Optional[str] = None, client_id: Optional[str] = None, source_ip: Optional[str] = None, dest_ip: Optional[str] = None, protocol: Optional[str] = None, **metadata): """Log a message""" # Check if we should log this level level_value = getattr(logging, level.value) min_level_value = getattr(logging, self.log_level.value) if level_value < min_level_value: return # Create log entry entry = LogEntry( timestamp=time.time(), level=level.value, category=category.value, module=module, message=message, session_id=session_id, client_id=client_id, source_ip=source_ip, dest_ip=dest_ip, protocol=protocol, metadata=metadata ) # Add to queue for background processing try: self.log_queue.put_nowait(entry) except queue.Full: self.stats['dropped_logs'] += 1 # Also log through Python logging system if self.structured_logging: log_data = entry.to_dict() log_message = f"{message} | {json.dumps(log_data, default=str)}" else: log_message = message # Log to Python logger python_logger_level = getattr(logging, level.value) self.logger.log(python_logger_level, log_message) # Update console/file stats if self.log_to_console: self.stats['console_logs_written'] += 1 if self.log_to_file: self.stats['file_logs_written'] += 1 def debug(self, category: LogCategory, module: str, message: str, **kwargs): """Log debug message""" self.log(LogLevel.DEBUG, category, module, message, **kwargs) def info(self, category: LogCategory, module: str, message: str, **kwargs): """Log info message""" self.log(LogLevel.INFO, category, module, message, **kwargs) def warning(self, category: LogCategory, module: str, message: str, **kwargs): """Log warning message""" self.log(LogLevel.WARNING, category, module, message, **kwargs) def error(self, category: LogCategory, module: str, message: str, **kwargs): """Log error message""" self.log(LogLevel.ERROR, category, module, message, **kwargs) def critical(self, category: LogCategory, module: str, message: str, **kwargs): """Log critical message""" self.log(LogLevel.CRITICAL, category, module, message, **kwargs) def add_subscriber(self, subscriber_id: str, callback: Callable[[LogEntry], None], log_filter: Optional[LogFilter] = None) -> bool: """Add log subscriber for real-time streaming""" with self.lock: if subscriber_id in self.subscribers: return False subscriber = LogSubscriber(subscriber_id, callback, log_filter) self.subscribers[subscriber_id] = subscriber self.stats['active_subscribers'] = len(self.subscribers) return True def remove_subscriber(self, subscriber_id: str) -> bool: """Remove log subscriber""" with self.lock: if subscriber_id in self.subscribers: del self.subscribers[subscriber_id] self.stats['active_subscribers'] = len(self.subscribers) return True return False def get_logs(self, limit: int = 100, offset: int = 0, log_filter: Optional[LogFilter] = None) -> List[Dict]: """Get logs with filtering and pagination""" with self.lock: # Convert deque to list for easier manipulation all_logs = list(self.log_entries) # Apply filter if log_filter: filtered_logs = [entry for entry in all_logs if log_filter.matches(entry)] else: filtered_logs = all_logs # Sort by timestamp (newest first) filtered_logs.sort(key=lambda x: x.timestamp, reverse=True) # Apply pagination paginated_logs = filtered_logs[offset:offset + limit] return [entry.to_dict() for entry in paginated_logs] def search_logs(self, query: str, limit: int = 100) -> List[Dict]: """Search logs by text query""" log_filter = LogFilter() log_filter.text_filter = query return self.get_logs(limit=limit, log_filter=log_filter) def get_logs_by_session(self, session_id: str, limit: int = 100) -> List[Dict]: """Get logs for specific session""" log_filter = LogFilter() log_filter.session_filter = session_id return self.get_logs(limit=limit, log_filter=log_filter) def get_logs_by_client(self, client_id: str, limit: int = 100) -> List[Dict]: """Get logs for specific client""" log_filter = LogFilter() log_filter.client_filter = client_id return self.get_logs(limit=limit, log_filter=log_filter) def get_logs_by_ip(self, ip_address: str, limit: int = 100) -> List[Dict]: """Get logs for specific IP address""" log_filter = LogFilter() log_filter.ip_filter = ip_address return self.get_logs(limit=limit, log_filter=log_filter) def get_recent_errors(self, limit: int = 50) -> List[Dict]: """Get recent error and critical logs""" log_filter = LogFilter() log_filter.level_filter = LogLevel.ERROR return self.get_logs(limit=limit, log_filter=log_filter) def clear_logs(self): """Clear all logs from memory""" with self.lock: self.log_entries.clear() def get_stats(self) -> Dict: """Get logging statistics""" with self.lock: stats = self.stats.copy() stats['memory_logs_count'] = len(self.log_entries) stats['active_subscribers'] = len(self.subscribers) stats['queue_size'] = self.log_queue.qsize() return stats def reset_stats(self): """Reset logging statistics""" self.stats = { 'total_logs': 0, 'logs_by_level': {level.value: 0 for level in LogLevel}, 'logs_by_category': {cat.value: 0 for cat in LogCategory}, 'active_subscribers': len(self.subscribers), 'file_logs_written': 0, 'console_logs_written': 0, 'dropped_logs': 0 } def export_logs(self, format: str = 'json', log_filter: Optional[LogFilter] = None) -> str: """Export logs in specified format""" logs = self.get_logs(limit=10000, log_filter=log_filter) if format == 'json': return json.dumps(logs, indent=2, default=str) elif format == 'csv': import csv import io output = io.StringIO() if logs: writer = csv.DictWriter(output, fieldnames=logs[0].keys()) writer.writeheader() writer.writerows(logs) return output.getvalue() else: raise ValueError(f"Unsupported export format: {format}") def set_log_level(self, level: LogLevel): """Set logging level""" self.log_level = level self.logger.setLevel(getattr(logging, level.value)) def start(self): """Start logger""" self.running = True self.processing_thread = threading.Thread(target=self._process_log_queue, daemon=True) self.processing_thread.start() self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger started') def stop(self): """Stop logger""" self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger stopping') self.running = False # Wait for queue to be processed self.log_queue.join() # Wait for processing thread if self.processing_thread: self.processing_thread.join() # Remove all subscribers with self.lock: self.subscribers.clear() print("Virtual ISP Logger stopped") # Global logger instance _global_logger: Optional[VirtualISPLogger] = None def get_logger() -> Optional[VirtualISPLogger]: """Get global logger instance""" return _global_logger def init_logger(config: Dict) -> VirtualISPLogger: """Initialize global logger""" global _global_logger _global_logger = VirtualISPLogger(config) return _global_logger def log_debug(category: LogCategory, module: str, message: str, **kwargs): """Global debug logging function""" if _global_logger: _global_logger.debug(category, module, message, **kwargs) def log_info(category: LogCategory, module: str, message: str, **kwargs): """Global info logging function""" if _global_logger: _global_logger.info(category, module, message, **kwargs) def log_warning(category: LogCategory, module: str, message: str, **kwargs): """Global warning logging function""" if _global_logger: _global_logger.warning(category, module, message, **kwargs) def log_error(category: LogCategory, module: str, message: str, **kwargs): """Global error logging function""" if _global_logger: _global_logger.error(category, module, message, **kwargs) def log_critical(category: LogCategory, module: str, message: str, **kwargs): """Global critical logging function""" if _global_logger: _global_logger.critical(category, module, message, **kwargs)