"""
Logger Module

Centralized logging system for the virtual ISP stack:
- Structured logging with multiple levels
- Log aggregation and filtering
- Real-time log streaming
- Log persistence and rotation
"""
import json
import logging
import logging.handlers
import os
import queue
import threading
import time
from collections import deque
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Dict, List, Optional, Any, Callable
class LogLevel(Enum):
    """Severity levels for log entries.

    Values are the stdlib ``logging`` level names, so
    ``getattr(logging, LogLevel.X.value)`` yields the numeric level
    (used by LogFilter.matches and VirtualISPLogger.log).
    """
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class LogCategory(Enum):
    """Functional subsystem a log entry is tagged with (one per stack component)."""
    SYSTEM = "SYSTEM"
    DHCP = "DHCP"
    NAT = "NAT"
    FIREWALL = "FIREWALL"
    TCP = "TCP"
    ROUTER = "ROUTER"
    BRIDGE = "BRIDGE"
    SOCKET = "SOCKET"
    SESSION = "SESSION"
    SECURITY = "SECURITY"
    PERFORMANCE = "PERFORMANCE"
@dataclass
class LogEntry:
    """Structured log entry.

    FIX: the ``@dataclass`` decorator was missing. Without it the annotated
    fields were bare class annotations — ``LogEntry(...)`` raised TypeError,
    ``__post_init__`` was never invoked, and ``asdict(self)`` in ``to_dict``
    failed (``asdict`` requires a dataclass instance).
    """
    timestamp: float                            # epoch seconds; 0 means "stamp now" (see __post_init__)
    level: str                                  # a LogLevel value, e.g. "INFO"
    category: str                               # a LogCategory value, e.g. "DHCP"
    module: str                                 # originating module name
    message: str                                # human-readable message text
    session_id: Optional[str] = None
    client_id: Optional[str] = None
    source_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    protocol: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None   # normalized to {} in __post_init__

    def __post_init__(self):
        # Treat timestamp == 0 as "use the current time".
        if self.timestamp == 0:
            self.timestamp = time.time()
        # None default avoids a shared mutable default; give each entry its own dict.
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict:
        """Return the entry as a plain dictionary."""
        return asdict(self)

    def to_json(self) -> str:
        """Return the entry serialized as a JSON string (non-JSON types via str())."""
        return json.dumps(self.to_dict(), default=str)
class LogFilter:
    """Criteria object for selecting LogEntry records.

    Every attribute defaults to None, meaning "criterion disabled"; an
    entry matches only if it passes every enabled criterion.
    """

    def __init__(self):
        self.level_filter: Optional[LogLevel] = None        # minimum severity
        self.category_filter: Optional[LogCategory] = None  # exact category
        self.module_filter: Optional[str] = None            # case-insensitive substring
        self.session_filter: Optional[str] = None           # exact session id
        self.client_filter: Optional[str] = None            # exact client id
        self.ip_filter: Optional[str] = None                # matches source OR dest IP
        self.text_filter: Optional[str] = None              # case-insensitive substring of message
        self.time_range: Optional[tuple] = None             # (start_ts, end_ts), inclusive

    def matches(self, entry: LogEntry) -> bool:
        """Return True when *entry* satisfies every enabled criterion."""
        # Severity: the entry must be at least as severe as the filter level
        # (LogLevel values are stdlib logging level names).
        if self.level_filter:
            if getattr(logging, entry.level) < getattr(logging, self.level_filter.value):
                return False
        if self.category_filter and entry.category != self.category_filter.value:
            return False
        if self.module_filter and self.module_filter.lower() not in entry.module.lower():
            return False
        if self.session_filter and entry.session_id != self.session_filter:
            return False
        if self.client_filter and entry.client_id != self.client_filter:
            return False
        # The IP criterion accepts a hit on either endpoint.
        if self.ip_filter and self.ip_filter not in (entry.source_ip, entry.dest_ip):
            return False
        if self.text_filter and self.text_filter.lower() not in entry.message.lower():
            return False
        if self.time_range:
            lo, hi = self.time_range
            if entry.timestamp < lo or entry.timestamp > hi:
                return False
        return True
class LogSubscriber:
    """Registered consumer that receives matching log entries in real time."""

    def __init__(self, subscriber_id: str, callback: Callable[[LogEntry], None],
                 log_filter: Optional[LogFilter] = None):
        self.subscriber_id = subscriber_id
        self.callback = callback
        # No filter supplied means "accept everything" (a fresh LogFilter matches all).
        self.filter = log_filter if log_filter is not None else LogFilter()
        self.created_time = time.time()
        self.message_count = 0
        self.last_message_time = None
        self.is_active = True

    def send_log(self, entry: LogEntry) -> bool:
        """Deliver *entry* via the callback when it passes the filter.

        Returns True only on successful delivery. A callback failure
        deactivates the subscriber so the owner can prune it later.
        """
        if not self.is_active or not self.filter.matches(entry):
            return False
        try:
            self.callback(entry)
        except Exception as e:
            # Plain print rather than the logger itself, avoiding recursion.
            print(f"Error sending log to subscriber {self.subscriber_id}: {e}")
            self.is_active = False
            return False
        self.message_count += 1
        self.last_message_time = time.time()
        return True
class VirtualISPLogger:
    """Centralized logger for the Virtual ISP stack.

    Entries flow down two paths:
      1. Asynchronously through ``log_queue`` to a background worker that
         archives them in a bounded in-memory deque, fans them out to
         subscribers, and updates statistics.
      2. Synchronously through the stdlib ``logging`` package to the
         console and/or a rotating log file.

    Thread-safety: ``self.lock`` (non-reentrant) guards ``log_entries``,
    ``subscribers`` and ``stats``. Call ``start()`` before logging and
    ``stop()`` at shutdown.
    """

    def __init__(self, config: Dict):
        """Initialize from *config*; every key is optional and has a default."""
        self.config = config
        # Bounded ring buffer of recent entries; oldest are evicted first.
        self.log_entries: deque = deque(maxlen=config.get('max_memory_logs', 10000))
        self.subscribers: Dict[str, LogSubscriber] = {}
        # Non-reentrant: never call a lock-taking method while holding it.
        self.lock = threading.Lock()

        # Configuration
        self.log_level = LogLevel(config.get('log_level', 'INFO'))
        self.log_to_file = config.get('log_to_file', True)
        self.log_file_path = config.get('log_file_path', '/tmp/virtual_isp.log')
        self.log_file_max_size = config.get('log_file_max_size', 10 * 1024 * 1024)  # 10MB
        self.log_file_backup_count = config.get('log_file_backup_count', 5)
        self.log_to_console = config.get('log_to_console', True)
        self.structured_logging = config.get('structured_logging', True)

        # Statistics — every counter below is guarded by self.lock.
        self.stats = {
            'total_logs': 0,
            'logs_by_level': {level.value: 0 for level in LogLevel},
            'logs_by_category': {cat.value: 0 for cat in LogCategory},
            'active_subscribers': 0,
            'file_logs_written': 0,
            'console_logs_written': 0,
            'dropped_logs': 0
        }

        # Configure the stdlib logging handlers.
        self._setup_logging()

        # Background processing state (worker thread created in start()).
        self.running = False
        self.log_queue = queue.Queue()
        self.processing_thread = None

    def _setup_logging(self):
        """Configure the stdlib 'virtual_isp' logger (console + rotating file)."""
        self.logger = logging.getLogger('virtual_isp')
        self.logger.setLevel(getattr(logging, self.log_level.value))

        # Remove existing handlers so re-initialization does not duplicate output.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        # Console handler
        if self.log_to_console:
            console_handler = logging.StreamHandler()
            if self.structured_logging:
                console_formatter = logging.Formatter(
                    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                )
            else:
                console_formatter = logging.Formatter('%(message)s')
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

        # File handler with rotation
        if self.log_to_file:
            # Ensure the log directory exists. FIX: dropped the redundant
            # os.path.exists() pre-check — exist_ok=True already makes the
            # call race-free, while check-then-create was TOCTOU-prone.
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            file_handler = logging.handlers.RotatingFileHandler(
                self.log_file_path,
                maxBytes=self.log_file_max_size,
                backupCount=self.log_file_backup_count
            )
            if self.structured_logging:
                file_formatter = logging.Formatter(
                    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                )
            else:
                file_formatter = logging.Formatter('%(message)s')
            file_handler.setFormatter(file_formatter)
            self.logger.addHandler(file_handler)

    def _process_log_queue(self):
        """Worker loop: archive queued entries, fan out to subscribers, count."""
        while self.running:
            # The timeout lets the loop notice shutdown (running = False).
            try:
                entry = self.log_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                inactive_subscribers = []
                with self.lock:
                    # Store in the in-memory ring buffer.
                    self.log_entries.append(entry)
                    # Fan out to subscribers. FIX: send_log() returns False
                    # both on delivery failure AND on a simple filter
                    # mismatch; the old code pruned on any False, so every
                    # subscriber with a filter was removed on the first
                    # non-matching entry. Prune on is_active instead.
                    for subscriber_id, subscriber in self.subscribers.items():
                        subscriber.send_log(entry)
                        if not subscriber.is_active:
                            inactive_subscribers.append(subscriber_id)
                    # Update statistics. FIX: these counters were previously
                    # mutated without the lock, racing with get_stats()/log().
                    self.stats['total_logs'] += 1
                    self.stats['logs_by_level'][entry.level] += 1
                    self.stats['logs_by_category'][entry.category] += 1
                # remove_subscriber() re-acquires the non-reentrant lock, so
                # it must run outside the `with` block above.
                for subscriber_id in inactive_subscribers:
                    self.remove_subscriber(subscriber_id)
            except Exception as e:
                print(f"Error processing log queue: {e}")
                time.sleep(1)
            finally:
                # FIX: always balance get() with task_done(), even when
                # processing raised — otherwise stop()'s log_queue.join()
                # would block forever.
                self.log_queue.task_done()

    def log(self, level: LogLevel, category: LogCategory, module: str, message: str,
            session_id: Optional[str] = None, client_id: Optional[str] = None,
            source_ip: Optional[str] = None, dest_ip: Optional[str] = None,
            protocol: Optional[str] = None, **metadata):
        """Log *message* at *level*/*category*; extra kwargs become metadata.

        The entry is queued for the background worker and also written
        synchronously through the stdlib logger handlers.
        """
        # Drop entries below the configured minimum level.
        if getattr(logging, level.value) < getattr(logging, self.log_level.value):
            return

        entry = LogEntry(
            timestamp=time.time(),
            level=level.value,
            category=category.value,
            module=module,
            message=message,
            session_id=session_id,
            client_id=client_id,
            source_ip=source_ip,
            dest_ip=dest_ip,
            protocol=protocol,
            metadata=metadata
        )

        # Queue for background processing (memory store, subscribers, stats).
        try:
            self.log_queue.put_nowait(entry)
        except queue.Full:
            # Unreachable with the default unbounded Queue; kept in case a
            # bounded queue is substituted.
            with self.lock:
                self.stats['dropped_logs'] += 1

        # Synchronous path through the stdlib logging handlers.
        if self.structured_logging:
            log_message = f"{message} | {json.dumps(entry.to_dict(), default=str)}"
        else:
            log_message = message
        self.logger.log(getattr(logging, level.value), log_message)

        # FIX: these counters were previously incremented without the lock.
        if self.log_to_console or self.log_to_file:
            with self.lock:
                if self.log_to_console:
                    self.stats['console_logs_written'] += 1
                if self.log_to_file:
                    self.stats['file_logs_written'] += 1

    def debug(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log a DEBUG message."""
        self.log(LogLevel.DEBUG, category, module, message, **kwargs)

    def info(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log an INFO message."""
        self.log(LogLevel.INFO, category, module, message, **kwargs)

    def warning(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log a WARNING message."""
        self.log(LogLevel.WARNING, category, module, message, **kwargs)

    def error(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log an ERROR message."""
        self.log(LogLevel.ERROR, category, module, message, **kwargs)

    def critical(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log a CRITICAL message."""
        self.log(LogLevel.CRITICAL, category, module, message, **kwargs)

    def add_subscriber(self, subscriber_id: str, callback: Callable[[LogEntry], None],
                       log_filter: Optional[LogFilter] = None) -> bool:
        """Register a real-time subscriber; returns False if the id is taken."""
        with self.lock:
            if subscriber_id in self.subscribers:
                return False
            self.subscribers[subscriber_id] = LogSubscriber(subscriber_id, callback, log_filter)
            self.stats['active_subscribers'] = len(self.subscribers)
            return True

    def remove_subscriber(self, subscriber_id: str) -> bool:
        """Unregister a subscriber; returns True if it existed."""
        with self.lock:
            if subscriber_id in self.subscribers:
                del self.subscribers[subscriber_id]
                self.stats['active_subscribers'] = len(self.subscribers)
                return True
            return False

    def get_logs(self, limit: int = 100, offset: int = 0,
                 log_filter: Optional[LogFilter] = None) -> List[Dict]:
        """Return in-memory logs as dicts, newest first, filtered and paginated."""
        with self.lock:
            # Snapshot the deque so filtering/sorting happens on a stable copy.
            all_logs = list(self.log_entries)

        if log_filter:
            filtered_logs = [entry for entry in all_logs if log_filter.matches(entry)]
        else:
            filtered_logs = all_logs

        # Newest first.
        filtered_logs.sort(key=lambda x: x.timestamp, reverse=True)
        paginated_logs = filtered_logs[offset:offset + limit]
        return [entry.to_dict() for entry in paginated_logs]

    def search_logs(self, query: str, limit: int = 100) -> List[Dict]:
        """Return logs whose message contains *query* (case-insensitive)."""
        log_filter = LogFilter()
        log_filter.text_filter = query
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_session(self, session_id: str, limit: int = 100) -> List[Dict]:
        """Return logs tagged with the given session id."""
        log_filter = LogFilter()
        log_filter.session_filter = session_id
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_client(self, client_id: str, limit: int = 100) -> List[Dict]:
        """Return logs tagged with the given client id."""
        log_filter = LogFilter()
        log_filter.client_filter = client_id
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_ip(self, ip_address: str, limit: int = 100) -> List[Dict]:
        """Return logs whose source or destination IP equals *ip_address*."""
        log_filter = LogFilter()
        log_filter.ip_filter = ip_address
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_recent_errors(self, limit: int = 50) -> List[Dict]:
        """Return recent ERROR-and-above logs (the level filter is a minimum,
        so CRITICAL entries are included too)."""
        log_filter = LogFilter()
        log_filter.level_filter = LogLevel.ERROR
        return self.get_logs(limit=limit, log_filter=log_filter)

    def clear_logs(self):
        """Discard all in-memory log entries (file/console output is untouched)."""
        with self.lock:
            self.log_entries.clear()

    def get_stats(self) -> Dict:
        """Return a snapshot of logging statistics."""
        with self.lock:
            stats = self.stats.copy()
            # Copy the nested counter dicts too so the caller's snapshot
            # cannot be mutated by the worker thread afterwards.
            stats['logs_by_level'] = dict(self.stats['logs_by_level'])
            stats['logs_by_category'] = dict(self.stats['logs_by_category'])
            stats['memory_logs_count'] = len(self.log_entries)
            stats['active_subscribers'] = len(self.subscribers)
            stats['queue_size'] = self.log_queue.qsize()
        return stats

    def reset_stats(self):
        """Reset all counters (subscriber count reflects current registrations)."""
        # FIX: the stats dict was previously swapped without the lock,
        # racing with the worker thread's counter updates.
        with self.lock:
            self.stats = {
                'total_logs': 0,
                'logs_by_level': {level.value: 0 for level in LogLevel},
                'logs_by_category': {cat.value: 0 for cat in LogCategory},
                'active_subscribers': len(self.subscribers),
                'file_logs_written': 0,
                'console_logs_written': 0,
                'dropped_logs': 0
            }

    def export_logs(self, format: str = 'json', log_filter: Optional[LogFilter] = None) -> str:
        """Export up to 10000 logs as 'json' or 'csv'; raises ValueError otherwise."""
        logs = self.get_logs(limit=10000, log_filter=log_filter)
        if format == 'json':
            return json.dumps(logs, indent=2, default=str)
        elif format == 'csv':
            import csv
            import io
            output = io.StringIO()
            if logs:
                writer = csv.DictWriter(output, fieldnames=logs[0].keys())
                writer.writeheader()
                writer.writerows(logs)
            return output.getvalue()
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def set_log_level(self, level: LogLevel):
        """Change the minimum level for both the queue path and stdlib logger."""
        self.log_level = level
        self.logger.setLevel(getattr(logging, level.value))

    def start(self):
        """Start the background processing thread."""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_log_queue, daemon=True)
        self.processing_thread.start()
        self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger started')

    def stop(self):
        """Stop the worker after draining pending log entries."""
        self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger stopping')
        if self.processing_thread and self.processing_thread.is_alive():
            # FIX: drain the queue while the worker is still running.
            # The old order (running = False, then join()) let the worker
            # exit with items still queued, so join() could hang forever.
            self.log_queue.join()
            self.running = False
            self.processing_thread.join()
        else:
            self.running = False
        # Remove all subscribers.
        with self.lock:
            self.subscribers.clear()
        print("Virtual ISP Logger stopped")
# Process-wide singleton used by the module-level convenience functions.
_global_logger: Optional[VirtualISPLogger] = None

def get_logger() -> Optional[VirtualISPLogger]:
    """Return the process-wide logger, or None if init_logger() was never called."""
    return _global_logger

def init_logger(config: Dict) -> VirtualISPLogger:
    """Create the process-wide logger from *config* and return it.

    Any previously initialized logger is replaced (but not stopped).
    """
    global _global_logger
    logger = VirtualISPLogger(config)
    _global_logger = logger
    return logger
def log_debug(category: LogCategory, module: str, message: str, **kwargs):
    """Module-level DEBUG shortcut; silently a no-op before init_logger()."""
    logger = _global_logger
    if logger is not None:
        logger.debug(category, module, message, **kwargs)
def log_info(category: LogCategory, module: str, message: str, **kwargs):
    """Module-level INFO shortcut; silently a no-op before init_logger()."""
    logger = _global_logger
    if logger is not None:
        logger.info(category, module, message, **kwargs)
def log_warning(category: LogCategory, module: str, message: str, **kwargs):
    """Module-level WARNING shortcut; silently a no-op before init_logger()."""
    logger = _global_logger
    if logger is not None:
        logger.warning(category, module, message, **kwargs)
def log_error(category: LogCategory, module: str, message: str, **kwargs):
    """Module-level ERROR shortcut; silently a no-op before init_logger()."""
    logger = _global_logger
    if logger is not None:
        logger.error(category, module, message, **kwargs)
def log_critical(category: LogCategory, module: str, message: str, **kwargs):
    """Module-level CRITICAL shortcut; silently a no-op before init_logger()."""
    logger = _global_logger
    if logger is not None:
        logger.critical(category, module, message, **kwargs)