""" Logger Module Centralized logging system for the virtual ISP stack: - Structured logging with multiple levels - Log aggregation and filtering - Real-time log streaming - Log persistence and rotation """ import logging import logging.handlers import time import threading import json import os from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, asdict from enum import Enum from collections import deque import queue class LogLevel(Enum): DEBUG = "DEBUG" INFO = "INFO" WARNING = "WARNING" ERROR = "ERROR" CRITICAL = "CRITICAL" class LogCategory(Enum): SYSTEM = "SYSTEM" OUTLINE = "OUTLINE" NAT = "NAT" TCP = "TCP" ROUTER = "ROUTER" BRIDGE = "BRIDGE" SOCKET = "SOCKET" SESSION = "SESSION" SECURITY = "SECURITY" PERFORMANCE = "PERFORMANCE" @dataclass class LogEntry: """Structured log entry""" timestamp: float level: str category: str module: str message: str session_id: Optional[str] = None client_id: Optional[str] = None source_ip: Optional[str] = None dest_ip: Optional[str] = None protocol: Optional[str] = None metadata: Dict[str, Any] = None def __post_init__(self): if self.timestamp == 0: self.timestamp = time.time() if self.metadata is None: self.metadata = {} def to_dict(self) -> Dict: """Convert to dictionary""" return asdict(self) def to_json(self) -> str: """Convert to JSON string""" return json.dumps(self.to_dict(), default=str) class LogFilter: """Log filtering class""" def __init__(self): self.level_filter: Optional[LogLevel] = None self.category_filter: Optional[LogCategory] = None self.module_filter: Optional[str] = None self.session_filter: Optional[str] = None self.client_filter: Optional[str] = None self.ip_filter: Optional[str] = None self.text_filter: Optional[str] = None self.time_range: Optional[tuple] = None def matches(self, entry: LogEntry) -> bool: """Check if log entry matches filter criteria""" # Level filter if self.level_filter: entry_level_value = getattr(logging, entry.level) filter_level_value = getattr(logging, self.level_filter.value) if entry_level_value < filter_level_value: return False # Category filter if self.category_filter and entry.category != self.category_filter.value: return False # Module filter if self.module_filter and entry.module != self.module_filter: return False # Session filter if self.session_filter and entry.session_id != self.session_filter: return False # Client filter if self.client_filter and entry.client_id != self.client_filter: return False # IP filter if self.ip_filter: if not (entry.source_ip == self.ip_filter or entry.dest_ip == self.ip_filter): return False # Text filter if self.text_filter: if self.text_filter.lower() not in entry.message.lower(): return False # Time range filter if self.time_range: start_time, end_time = self.time_range if not (start_time <= entry.timestamp <= end_time): return False return True class LogAggregator: """Aggregates and manages logs""" def __init__(self, max_entries: int = 10000): self.entries: deque = deque(maxlen=max_entries) self.subscribers: List[Callable[[LogEntry], None]] = [] self.lock = threading.Lock() def add_entry(self, entry: LogEntry): """Add a new log entry""" with self.lock: self.entries.append(entry) # Notify subscribers for subscriber in self.subscribers: try: subscriber(entry) except Exception as e: logging.error(f"Error notifying subscriber: {e}") def get_entries(self, log_filter: Optional[LogFilter] = None) -> List[LogEntry]: """Get filtered log entries""" with self.lock: if log_filter is None: return list(self.entries) return [entry for entry in self.entries if log_filter.matches(entry)] def subscribe(self, callback: Callable[[LogEntry], None]): """Subscribe to new log entries""" with self.lock: self.subscribers.append(callback) def unsubscribe(self, callback: Callable[[LogEntry], None]): """Unsubscribe from log entries""" with self.lock: try: self.subscribers.remove(callback) except ValueError: pass class LogManager: """Central logging manager""" _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.aggregator = LogAggregator() self.log_queue = queue.Queue() self.file_handler = None self.console_handler = None # Configure logging self._configure_logging() # Start processing thread self.running = True self.process_thread = threading.Thread(target=self._process_queue) self.process_thread.daemon = True self.process_thread.start() self._initialized = True def _configure_logging(self): """Configure logging handlers""" log_dir = os.path.join(os.path.dirname(__file__), '../logs') os.makedirs(log_dir, exist_ok=True) # File handler with rotation log_file = os.path.join(log_dir, 'outline.log') self.file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5 ) self.file_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') ) # Console handler self.console_handler = logging.StreamHandler() self.console_handler.setFormatter( logging.Formatter('[%(levelname)s] %(message)s') ) # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) root_logger.addHandler(self.file_handler) root_logger.addHandler(self.console_handler) def _process_queue(self): """Process log queue""" while self.running: try: entry = self.log_queue.get(timeout=1) self.aggregator.add_entry(entry) except queue.Empty: continue except Exception as e: logging.error(f"Error processing log entry: {e}") def log(self, level: LogLevel, category: LogCategory, module: str, message: str, **kwargs): """Add a log entry""" entry = LogEntry( timestamp=time.time(), level=level.value, category=category.value, module=module, message=message, **kwargs ) self.log_queue.put(entry) # Also log to Python's logging system log_func = getattr(logging, level.value.lower()) log_func(f"[{category.value}] {message}") def get_logs(self, log_filter: Optional[LogFilter] = None) -> List[LogEntry]: """Get filtered logs""" return self.aggregator.get_entries(log_filter) def subscribe(self, callback: Callable[[LogEntry], None]): """Subscribe to log entries""" self.aggregator.subscribe(callback) def unsubscribe(self, callback: Callable[[LogEntry], None]): """Unsubscribe from log entries""" self.aggregator.unsubscribe(callback) def shutdown(self): """Shutdown the log manager""" self.running = False if self.process_thread.is_alive(): self.process_thread.join() # Close handlers if self.file_handler: self.file_handler.close() if self.console_handler: self.console_handler.close()