| """ | |
| Logger Module | |
| Centralized logging system for the virtual ISP stack: | |
| - Structured logging with multiple levels | |
| - Log aggregation and filtering | |
| - Real-time log streaming | |
| - Log persistence and rotation | |
| """ | |
| import logging | |
| import logging.handlers | |
| import time | |
| import threading | |
| import json | |
| import os | |
| from typing import Dict, List, Optional, Any, Callable | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| from collections import deque | |
| import queue | |


class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class LogCategory(Enum):
    SYSTEM = "SYSTEM"
    OUTLINE = "OUTLINE"
    NAT = "NAT"
    TCP = "TCP"
    ROUTER = "ROUTER"
    BRIDGE = "BRIDGE"
    SOCKET = "SOCKET"
    SESSION = "SESSION"
    SECURITY = "SECURITY"
    PERFORMANCE = "PERFORMANCE"


@dataclass
class LogEntry:
    """Structured log entry"""
    timestamp: float
    level: str
    category: str
    module: str
    message: str
    session_id: Optional[str] = None
    client_id: Optional[str] = None
    source_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    protocol: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Treat a zero timestamp as "now" and create metadata lazily so
        # instances never share a mutable default.
        if self.timestamp == 0:
            self.timestamp = time.time()
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict:
        """Convert to dictionary"""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string"""
        return json.dumps(self.to_dict(), default=str)
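

# Usage sketch: building an entry by hand and serialising it. The module
# name and addresses below are illustrative placeholders, not values the
# stack itself produces.
def _example_log_entry() -> str:
    entry = LogEntry(
        timestamp=0,  # __post_init__ replaces 0 with the current time
        level=LogLevel.INFO.value,
        category=LogCategory.NAT.value,
        module="nat_table",
        message="Mapped internal address",
        source_ip="10.0.0.5",
        dest_ip="93.184.216.34",
    )
    return entry.to_json()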


class LogFilter:
    """Log filtering class"""

    def __init__(self):
        self.level_filter: Optional[LogLevel] = None
        self.category_filter: Optional[LogCategory] = None
        self.module_filter: Optional[str] = None
        self.session_filter: Optional[str] = None
        self.client_filter: Optional[str] = None
        self.ip_filter: Optional[str] = None
        self.text_filter: Optional[str] = None
        self.time_range: Optional[tuple] = None

    def matches(self, entry: LogEntry) -> bool:
        """Check if log entry matches filter criteria"""
        # Level filter: pass entries at or above the configured severity
        if self.level_filter:
            entry_level_value = getattr(logging, entry.level)
            filter_level_value = getattr(logging, self.level_filter.value)
            if entry_level_value < filter_level_value:
                return False
        # Category filter
        if self.category_filter and entry.category != self.category_filter.value:
            return False
        # Module filter
        if self.module_filter and entry.module != self.module_filter:
            return False
        # Session filter
        if self.session_filter and entry.session_id != self.session_filter:
            return False
        # Client filter
        if self.client_filter and entry.client_id != self.client_filter:
            return False
        # IP filter: match either endpoint
        if self.ip_filter:
            if not (entry.source_ip == self.ip_filter or
                    entry.dest_ip == self.ip_filter):
                return False
        # Text filter: case-insensitive substring match
        if self.text_filter:
            if self.text_filter.lower() not in entry.message.lower():
                return False
        # Time range filter
        if self.time_range:
            start_time, end_time = self.time_range
            if not (start_time <= entry.timestamp <= end_time):
                return False
        return True
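

# Usage sketch: a filter that keeps only ERROR-and-above NAT entries from
# the last hour. The criteria chosen here are illustrative assumptions.
def _example_error_filter() -> LogFilter:
    log_filter = LogFilter()
    log_filter.level_filter = LogLevel.ERROR       # ERROR and CRITICAL pass
    log_filter.category_filter = LogCategory.NAT
    log_filter.time_range = (time.time() - 3600, time.time())
    return log_filter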


class LogAggregator:
    """Aggregates and manages logs"""

    def __init__(self, max_entries: int = 10000):
        self.entries: deque = deque(maxlen=max_entries)
        self.subscribers: List[Callable[[LogEntry], None]] = []
        self.lock = threading.Lock()

    def add_entry(self, entry: LogEntry):
        """Add a new log entry"""
        with self.lock:
            self.entries.append(entry)
            subscribers = list(self.subscribers)
        # Notify subscribers outside the lock so a callback that calls
        # back into the aggregator cannot deadlock on the non-reentrant lock.
        for subscriber in subscribers:
            try:
                subscriber(entry)
            except Exception as e:
                logging.error(f"Error notifying subscriber: {e}")

    def get_entries(self, log_filter: Optional[LogFilter] = None) -> List[LogEntry]:
        """Get filtered log entries"""
        with self.lock:
            if log_filter is None:
                return list(self.entries)
            return [entry for entry in self.entries if log_filter.matches(entry)]

    def subscribe(self, callback: Callable[[LogEntry], None]):
        """Subscribe to new log entries"""
        with self.lock:
            self.subscribers.append(callback)

    def unsubscribe(self, callback: Callable[[LogEntry], None]):
        """Unsubscribe from log entries"""
        with self.lock:
            try:
                self.subscribers.remove(callback)
            except ValueError:
                pass
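

# Usage sketch: streaming new entries to a callback. Printing is a stand-in
# for a real consumer such as a websocket broadcaster.
def _example_subscription(aggregator: LogAggregator) -> None:
    def on_entry(entry: LogEntry) -> None:
        print(entry.to_json())

    aggregator.subscribe(on_entry)
    aggregator.add_entry(LogEntry(
        timestamp=0,
        level=LogLevel.DEBUG.value,
        category=LogCategory.SYSTEM.value,
        module="demo",
        message="subscriber demo entry",
    ))
    aggregator.unsubscribe(on_entry)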


class LogManager:
    """Central logging manager (process-wide singleton)"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self.aggregator = LogAggregator()
        self.log_queue = queue.Queue()
        self.file_handler = None
        self.console_handler = None
        # Configure logging
        self._configure_logging()
        # Start processing thread
        self.running = True
        self.process_thread = threading.Thread(target=self._process_queue)
        self.process_thread.daemon = True
        self.process_thread.start()
        self._initialized = True

    def _configure_logging(self):
        """Configure logging handlers"""
        log_dir = os.path.join(os.path.dirname(__file__), '../logs')
        os.makedirs(log_dir, exist_ok=True)
        # File handler with rotation (10 MB per file, 5 backups)
        log_file = os.path.join(log_dir, 'outline.log')
        self.file_handler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=10 * 1024 * 1024, backupCount=5
        )
        self.file_handler.setFormatter(
            logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
        )
        # Console handler
        self.console_handler = logging.StreamHandler()
        self.console_handler.setFormatter(
            logging.Formatter('[%(levelname)s] %(message)s')
        )
        # Configure root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(self.file_handler)
        root_logger.addHandler(self.console_handler)

    def _process_queue(self):
        """Process log queue"""
        while self.running:
            try:
                entry = self.log_queue.get(timeout=1)
                self.aggregator.add_entry(entry)
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Error processing log entry: {e}")

    def log(self, level: LogLevel, category: LogCategory, module: str,
            message: str, **kwargs):
        """Add a log entry"""
        entry = LogEntry(
            timestamp=time.time(),
            level=level.value,
            category=category.value,
            module=module,
            message=message,
            **kwargs
        )
        self.log_queue.put(entry)
        # Also log to Python's logging system
        log_func = getattr(logging, level.value.lower())
        log_func(f"[{category.value}] {message}")

    def get_logs(self, log_filter: Optional[LogFilter] = None) -> List[LogEntry]:
        """Get filtered logs"""
        return self.aggregator.get_entries(log_filter)

    def subscribe(self, callback: Callable[[LogEntry], None]):
        """Subscribe to log entries"""
        self.aggregator.subscribe(callback)

    def unsubscribe(self, callback: Callable[[LogEntry], None]):
        """Unsubscribe from log entries"""
        self.aggregator.unsubscribe(callback)

    def shutdown(self):
        """Shutdown the log manager"""
        self.running = False
        if self.process_thread.is_alive():
            self.process_thread.join()
        # Close handlers
        if self.file_handler:
            self.file_handler.close()
        if self.console_handler:
            self.console_handler.close()
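

if __name__ == "__main__":
    # Minimal end-to-end demo; the module and session names are placeholders.
    # LogManager is a singleton, so repeated construction returns the same
    # instance with its background thread already running.
    manager = LogManager()
    manager.log(LogLevel.INFO, LogCategory.SYSTEM, "demo",
                "log manager started", session_id="sess-1")
    time.sleep(1.5)  # give the background thread time to drain the queue
    for log_entry in manager.get_logs():
        print(log_entry.to_json())
    manager.shutdown()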