| """
|
| Logger Module
|
|
|
| Centralized logging system for the virtual ISP stack:
|
| - Structured logging with multiple levels
|
| - Log aggregation and filtering
|
| - Real-time log streaming
|
| - Log persistence and rotation
|
| """
|
|
|
| import logging
|
| import logging.handlers
|
| import time
|
| import threading
|
| import json
|
| import os
|
| from typing import Dict, List, Optional, Any, Callable
|
| from dataclasses import dataclass, asdict
|
| from enum import Enum
|
| from collections import deque
|
| import queue
|
|
|
|
|
| class LogLevel(Enum):
|
| DEBUG = "DEBUG"
|
| INFO = "INFO"
|
| WARNING = "WARNING"
|
| ERROR = "ERROR"
|
| CRITICAL = "CRITICAL"
|
|
|
|
|
| class LogCategory(Enum):
|
| SYSTEM = "SYSTEM"
|
| OUTLINE = "OUTLINE"
|
| NAT = "NAT"
|
| TCP = "TCP"
|
| ROUTER = "ROUTER"
|
| BRIDGE = "BRIDGE"
|
| SOCKET = "SOCKET"
|
| SESSION = "SESSION"
|
| SECURITY = "SECURITY"
|
| PERFORMANCE = "PERFORMANCE"
|
|
|
|
|
| @dataclass
|
| class LogEntry:
|
| """Structured log entry"""
|
| timestamp: float
|
| level: str
|
| category: str
|
| module: str
|
| message: str
|
| session_id: Optional[str] = None
|
| client_id: Optional[str] = None
|
| source_ip: Optional[str] = None
|
| dest_ip: Optional[str] = None
|
| protocol: Optional[str] = None
|
| metadata: Dict[str, Any] = None
|
|
|
| def __post_init__(self):
|
| if self.timestamp == 0:
|
| self.timestamp = time.time()
|
| if self.metadata is None:
|
| self.metadata = {}
|
|
|
| def to_dict(self) -> Dict:
|
| """Convert to dictionary"""
|
| return asdict(self)
|
|
|
| def to_json(self) -> str:
|
| """Convert to JSON string"""
|
| return json.dumps(self.to_dict(), default=str)
|
|
|
|
|
| class LogFilter:
|
| """Log filtering class"""
|
|
|
| def __init__(self):
|
| self.level_filter: Optional[LogLevel] = None
|
| self.category_filter: Optional[LogCategory] = None
|
| self.module_filter: Optional[str] = None
|
| self.session_filter: Optional[str] = None
|
| self.client_filter: Optional[str] = None
|
| self.ip_filter: Optional[str] = None
|
| self.text_filter: Optional[str] = None
|
| self.time_range: Optional[tuple] = None
|
|
|
| def matches(self, entry: LogEntry) -> bool:
|
| """Check if log entry matches filter criteria"""
|
|
|
| if self.level_filter:
|
| entry_level_value = getattr(logging, entry.level)
|
| filter_level_value = getattr(logging, self.level_filter.value)
|
| if entry_level_value < filter_level_value:
|
| return False
|
|
|
|
|
| if self.category_filter and entry.category != self.category_filter.value:
|
| return False
|
|
|
|
|
| if self.module_filter and entry.module != self.module_filter:
|
| return False
|
|
|
|
|
| if self.session_filter and entry.session_id != self.session_filter:
|
| return False
|
|
|
|
|
| if self.client_filter and entry.client_id != self.client_filter:
|
| return False
|
|
|
|
|
| if self.ip_filter:
|
| if not (entry.source_ip == self.ip_filter or
|
| entry.dest_ip == self.ip_filter):
|
| return False
|
|
|
|
|
| if self.text_filter:
|
| if self.text_filter.lower() not in entry.message.lower():
|
| return False
|
|
|
|
|
| if self.time_range:
|
| start_time, end_time = self.time_range
|
| if not (start_time <= entry.timestamp <= end_time):
|
| return False
|
|
|
| return True
|
|
|
|
|
| class LogAggregator:
|
| """Aggregates and manages logs"""
|
|
|
| def __init__(self, max_entries: int = 10000):
|
| self.entries: deque = deque(maxlen=max_entries)
|
| self.subscribers: List[Callable[[LogEntry], None]] = []
|
| self.lock = threading.Lock()
|
|
|
| def add_entry(self, entry: LogEntry):
|
| """Add a new log entry"""
|
| with self.lock:
|
| self.entries.append(entry)
|
|
|
| for subscriber in self.subscribers:
|
| try:
|
| subscriber(entry)
|
| except Exception as e:
|
| logging.error(f"Error notifying subscriber: {e}")
|
|
|
| def get_entries(self, log_filter: Optional[LogFilter] = None) -> List[LogEntry]:
|
| """Get filtered log entries"""
|
| with self.lock:
|
| if log_filter is None:
|
| return list(self.entries)
|
| return [entry for entry in self.entries if log_filter.matches(entry)]
|
|
|
| def subscribe(self, callback: Callable[[LogEntry], None]):
|
| """Subscribe to new log entries"""
|
| with self.lock:
|
| self.subscribers.append(callback)
|
|
|
| def unsubscribe(self, callback: Callable[[LogEntry], None]):
|
| """Unsubscribe from log entries"""
|
| with self.lock:
|
| try:
|
| self.subscribers.remove(callback)
|
| except ValueError:
|
| pass
|
|
|
|
|
| class LogManager:
|
| """Central logging manager"""
|
|
|
| _instance = None
|
| _lock = threading.Lock()
|
|
|
| def __new__(cls):
|
| with cls._lock:
|
| if cls._instance is None:
|
| cls._instance = super().__new__(cls)
|
| cls._instance._initialized = False
|
| return cls._instance
|
|
|
| def __init__(self):
|
| if self._initialized:
|
| return
|
|
|
| self.aggregator = LogAggregator()
|
| self.log_queue = queue.Queue()
|
| self.file_handler = None
|
| self.console_handler = None
|
|
|
|
|
| self._configure_logging()
|
|
|
|
|
| self.running = True
|
| self.process_thread = threading.Thread(target=self._process_queue)
|
| self.process_thread.daemon = True
|
| self.process_thread.start()
|
|
|
| self._initialized = True
|
|
|
| def _configure_logging(self):
|
| """Configure logging handlers"""
|
| log_dir = os.path.join(os.path.dirname(__file__), '../logs')
|
| os.makedirs(log_dir, exist_ok=True)
|
|
|
|
|
| log_file = os.path.join(log_dir, 'outline.log')
|
| self.file_handler = logging.handlers.RotatingFileHandler(
|
| log_file, maxBytes=10*1024*1024, backupCount=5
|
| )
|
| self.file_handler.setFormatter(
|
| logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
|
| )
|
|
|
|
|
| self.console_handler = logging.StreamHandler()
|
| self.console_handler.setFormatter(
|
| logging.Formatter('[%(levelname)s] %(message)s')
|
| )
|
|
|
|
|
| root_logger = logging.getLogger()
|
| root_logger.setLevel(logging.INFO)
|
| root_logger.addHandler(self.file_handler)
|
| root_logger.addHandler(self.console_handler)
|
|
|
| def _process_queue(self):
|
| """Process log queue"""
|
| while self.running:
|
| try:
|
| entry = self.log_queue.get(timeout=1)
|
| self.aggregator.add_entry(entry)
|
| except queue.Empty:
|
| continue
|
| except Exception as e:
|
| logging.error(f"Error processing log entry: {e}")
|
|
|
| def log(self, level: LogLevel, category: LogCategory, module: str,
|
| message: str, **kwargs):
|
| """Add a log entry"""
|
| entry = LogEntry(
|
| timestamp=time.time(),
|
| level=level.value,
|
| category=category.value,
|
| module=module,
|
| message=message,
|
| **kwargs
|
| )
|
| self.log_queue.put(entry)
|
|
|
|
|
| log_func = getattr(logging, level.value.lower())
|
| log_func(f"[{category.value}] {message}")
|
|
|
| def get_logs(self, log_filter: Optional[LogFilter] = None) -> List[LogEntry]:
|
| """Get filtered logs"""
|
| return self.aggregator.get_entries(log_filter)
|
|
|
| def subscribe(self, callback: Callable[[LogEntry], None]):
|
| """Subscribe to log entries"""
|
| self.aggregator.subscribe(callback)
|
|
|
| def unsubscribe(self, callback: Callable[[LogEntry], None]):
|
| """Unsubscribe from log entries"""
|
| self.aggregator.unsubscribe(callback)
|
|
|
| def shutdown(self):
|
| """Shutdown the log manager"""
|
| self.running = False
|
| if self.process_thread.is_alive():
|
| self.process_thread.join()
|
|
|
|
|
| if self.file_handler:
|
| self.file_handler.close()
|
| if self.console_handler:
|
| self.console_handler.close()
|
|
|