JRNET / core /logger.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
Logger Module
Centralized logging system for the virtual ISP stack:
- Structured logging with multiple levels
- Log aggregation and filtering
- Real-time log streaming
- Log persistence and rotation
"""
import logging
import logging.handlers
import time
import threading
import json
import os
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
from collections import deque
import queue
class LogLevel(Enum):
    """Severity levels understood by the logging stack.

    Every member's value is its own upper-case name, which is also the name
    of the corresponding level constant in the standard ``logging`` module.
    """

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class LogCategory(Enum):
    """Categories a log entry can be tagged with.

    Every member's value is its own upper-case name.
    """

    SYSTEM = "SYSTEM"
    OUTLINE = "OUTLINE"
    NAT = "NAT"
    TCP = "TCP"
    ROUTER = "ROUTER"
    BRIDGE = "BRIDGE"
    SOCKET = "SOCKET"
    SESSION = "SESSION"
    SECURITY = "SECURITY"
    PERFORMANCE = "PERFORMANCE"
@dataclass
class LogEntry:
    """Structured log entry.

    Attributes:
        timestamp: Time of the entry as a float (``time.time()`` style).
            A falsy value (``0`` or ``None``) is replaced with the current
            time when the entry is created.
        level: Severity name, e.g. a ``LogLevel`` value.
        category: Category name, e.g. a ``LogCategory`` value.
        module: Name of the component that produced the entry.
        message: Human-readable log text.
        session_id: Optional session identifier.
        client_id: Optional client identifier.
        source_ip: Optional source address.
        dest_ip: Optional destination address.
        protocol: Optional protocol name.
        metadata: Optional free-form extra data; ``None`` becomes a fresh
            empty dict so instances never share one.
    """

    timestamp: float
    level: str
    category: str
    module: str
    message: str
    session_id: Optional[str] = None
    client_id: Optional[str] = None
    source_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    protocol: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # 0 and None both mean "no timestamp supplied": stamp with "now".
        if not self.timestamp:
            self.timestamp = time.time()
        # Give each entry its own dict instead of a shared mutable default.
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict:
        """Convert to dictionary"""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string.

        Values that ``json`` cannot serialise natively are passed through
        ``str`` (``default=str``).
        """
        return json.dumps(self.to_dict(), default=str)
class LogFilter:
    """Log filtering class.

    Every attribute is an optional criterion; a criterion left unset
    (``None`` or empty) is ignored. An entry matches only when it satisfies
    all criteria that are set.
    """

    def __init__(self):
        self.level_filter: Optional[LogLevel] = None      # minimum severity
        self.category_filter: Optional[LogCategory] = None  # exact category
        self.module_filter: Optional[str] = None          # exact module name
        self.session_filter: Optional[str] = None         # exact session id
        self.client_filter: Optional[str] = None          # exact client id
        self.ip_filter: Optional[str] = None              # source OR dest ip
        self.text_filter: Optional[str] = None            # substring, any case
        self.time_range: Optional[tuple] = None           # (start, end)

    @staticmethod
    def _severity(level_name) -> Optional[int]:
        """Return the numeric ``logging`` level for a name, or None if unknown.

        The lookup is case-insensitive. The ``int`` check matters because the
        ``logging`` module also exposes non-numeric attributes (for example
        the ``logging.info`` function) that must not be treated as a level.
        """
        value = getattr(logging, str(level_name).upper(), None)
        return value if isinstance(value, int) else None

    def matches(self, entry: "LogEntry") -> bool:
        """Check if log entry matches filter criteria.

        Args:
            entry: The log entry to test.

        Returns:
            True when the entry satisfies every criterion that is set.
            An entry whose level name cannot be ranked never passes an
            active level filter.
        """
        # Level filter: entry must be at least as severe as the threshold
        if self.level_filter is not None:
            threshold = self._severity(self.level_filter.value)
            severity = self._severity(entry.level)
            if threshold is not None:
                if severity is None or severity < threshold:
                    return False

        # Category filter
        if self.category_filter and entry.category != self.category_filter.value:
            return False

        # Module filter
        if self.module_filter and entry.module != self.module_filter:
            return False

        # Session filter
        if self.session_filter and entry.session_id != self.session_filter:
            return False

        # Client filter
        if self.client_filter and entry.client_id != self.client_filter:
            return False

        # IP filter: either endpoint may match
        if self.ip_filter and self.ip_filter not in (entry.source_ip, entry.dest_ip):
            return False

        # Text filter: case-insensitive substring of the message
        if self.text_filter:
            haystack = (entry.message or "").lower()
            if self.text_filter.lower() not in haystack:
                return False

        # Time range filter: inclusive on both ends; a None bound is open
        if self.time_range:
            start_time, end_time = self.time_range
            if start_time is not None and not (start_time <= entry.timestamp):
                return False
            if end_time is not None and not (entry.timestamp <= end_time):
                return False

        return True
class LogAggregator:
    """Aggregates and manages logs.

    Keeps the most recent ``max_entries`` entries in a bounded deque and
    forwards every new entry to the registered subscriber callbacks.
    """

    def __init__(self, max_entries: int = 10000):
        # deque(maxlen=...) silently discards the oldest entry when full.
        self.entries: deque = deque(maxlen=max_entries)
        self.subscribers: List[Callable[[LogEntry], None]] = []
        self.lock = threading.Lock()

    def add_entry(self, entry: "LogEntry"):
        """Add a new log entry and notify subscribers.

        The entry is stored and the subscriber list is copied while holding
        the lock; callbacks then run outside the lock. ``self.lock`` is not
        reentrant, so this lets a callback call back into the aggregator
        (``get_entries``, ``subscribe``, ``unsubscribe``) without deadlocking
        and keeps the iteration safe from concurrent list changes.

        A callback that raises is logged and does not stop the others.
        """
        with self.lock:
            self.entries.append(entry)
            listeners = list(self.subscribers)

        # Notify subscribers
        for subscriber in listeners:
            try:
                subscriber(entry)
            except Exception as e:
                logging.error(f"Error notifying subscriber: {e}")

    def get_entries(self, log_filter: Optional["LogFilter"] = None) -> List["LogEntry"]:
        """Get filtered log entries.

        Args:
            log_filter: Object with a ``matches(entry)`` method, or None to
                return every stored entry.

        Returns:
            A new list of the stored entries (oldest first) that match.
        """
        # Copy under the lock, filter outside it so a slow filter does not
        # block writers.
        with self.lock:
            snapshot = list(self.entries)
        if log_filter is None:
            return snapshot
        return [entry for entry in snapshot if log_filter.matches(entry)]

    def subscribe(self, callback: Callable[["LogEntry"], None]):
        """Subscribe to new log entries"""
        with self.lock:
            self.subscribers.append(callback)

    def unsubscribe(self, callback: Callable[["LogEntry"], None]):
        """Unsubscribe from log entries; unknown callbacks are ignored"""
        with self.lock:
            try:
                self.subscribers.remove(callback)
            except ValueError:
                pass
class LogManager:
    """Central logging manager.

    Singleton: every ``LogManager()`` call returns the same instance. The
    first construction attaches a rotating file handler and a console handler
    to the root logger and starts a daemon thread that moves queued entries
    into the aggregator.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self):
        # __init__ runs on every LogManager() call. Check and set the flag
        # under the class lock so concurrent first calls cannot both attach
        # handlers and start a worker thread.
        with self._lock:
            if self._initialized:
                return

            self.aggregator = LogAggregator()
            self.log_queue = queue.Queue()
            self.file_handler = None
            self.console_handler = None

            # Configure logging
            self._configure_logging()

            # Start processing thread
            self.running = True
            self.process_thread = threading.Thread(target=self._process_queue)
            self.process_thread.daemon = True
            self.process_thread.start()

            self._initialized = True

    def _configure_logging(self):
        """Configure logging handlers.

        Creates a ``logs`` directory next to this file's parent directory,
        then attaches a rotating file handler (10 MiB per file, 5 backups)
        and a console handler to the root logger, whose level is set to INFO.
        """
        log_dir = os.path.join(os.path.dirname(__file__), '../logs')
        os.makedirs(log_dir, exist_ok=True)

        # File handler with rotation
        log_file = os.path.join(log_dir, 'outline.log')
        self.file_handler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=10*1024*1024, backupCount=5
        )
        self.file_handler.setFormatter(
            logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
        )

        # Console handler
        self.console_handler = logging.StreamHandler()
        self.console_handler.setFormatter(
            logging.Formatter('[%(levelname)s] %(message)s')
        )

        # Configure root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(self.file_handler)
        root_logger.addHandler(self.console_handler)

    def _process_queue(self):
        """Worker loop: move queued entries into the aggregator.

        Polls with a 1 second timeout so that a cleared ``running`` flag is
        noticed promptly.
        """
        while self.running:
            try:
                entry = self.log_queue.get(timeout=1)
                self.aggregator.add_entry(entry)
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Error processing log entry: {e}")

    def _drain_queue(self):
        """Move every entry still waiting in the queue into the aggregator."""
        while True:
            try:
                entry = self.log_queue.get_nowait()
            except queue.Empty:
                return
            try:
                self.aggregator.add_entry(entry)
            except Exception as e:
                logging.error(f"Error processing log entry: {e}")

    def log(self, level: "LogLevel", category: "LogCategory", module: str,
            message: str, **kwargs):
        """Add a log entry.

        Args:
            level: Severity of the entry.
            category: Category of the entry.
            module: Name of the component producing the entry.
            message: Log text.
            **kwargs: Extra ``LogEntry`` fields (e.g. ``session_id``,
                ``metadata``), passed straight to the ``LogEntry`` constructor.
        """
        entry = LogEntry(
            timestamp=time.time(),
            level=level.value,
            category=category.value,
            module=module,
            message=message,
            **kwargs
        )
        if self.running:
            self.log_queue.put(entry)
        else:
            # The worker has been stopped; deliver directly so the entry is
            # not left in a queue that nothing reads.
            self.aggregator.add_entry(entry)

        # Also log to Python's logging system
        log_func = getattr(logging, level.value.lower())
        log_func("[%s] %s", category.value, message)

    def get_logs(self, log_filter: Optional["LogFilter"] = None) -> List["LogEntry"]:
        """Get filtered logs"""
        return self.aggregator.get_entries(log_filter)

    def subscribe(self, callback: Callable[["LogEntry"], None]):
        """Subscribe to log entries"""
        self.aggregator.subscribe(callback)

    def unsubscribe(self, callback: Callable[["LogEntry"], None]):
        """Unsubscribe from log entries"""
        self.aggregator.unsubscribe(callback)

    def shutdown(self):
        """Shutdown the log manager.

        Stops the worker thread, delivers any entries still queued, then
        detaches the handlers from the root logger and closes them. Safe to
        call more than once.
        """
        self.running = False
        if self.process_thread.is_alive():
            self.process_thread.join()

        # The worker exits as soon as it sees running == False, so entries
        # may still be waiting; deliver them rather than dropping them.
        self._drain_queue()

        # Detach before closing so the root logger stops using closed handlers
        root_logger = logging.getLogger()
        for handler in (self.file_handler, self.console_handler):
            if handler is not None:
                root_logger.removeHandler(handler)
                handler.close()