"""
Logger Module

Centralized logging system for the virtual ISP stack:

- Structured logging with multiple levels
- Log aggregation and filtering
- Real-time log streaming
- Log persistence and rotation
"""

import logging
import logging.handlers
import time
import threading
import json
import os
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
from collections import deque
import queue


class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class LogCategory(Enum):
    SYSTEM = "SYSTEM"
    DHCP = "DHCP"
    NAT = "NAT"
    FIREWALL = "FIREWALL"
    TCP = "TCP"
    ROUTER = "ROUTER"
    BRIDGE = "BRIDGE"
    SOCKET = "SOCKET"
    SESSION = "SESSION"
    SECURITY = "SECURITY"
    PERFORMANCE = "PERFORMANCE"


@dataclass
class LogEntry:
    """Structured log entry"""
    timestamp: float
    level: str
    category: str
    module: str
    message: str
    session_id: Optional[str] = None
    client_id: Optional[str] = None
    source_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    protocol: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # A zero timestamp means "now"; None metadata becomes an empty dict
        if self.timestamp == 0:
            self.timestamp = time.time()
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict:
        """Convert to dictionary"""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string"""
        return json.dumps(self.to_dict(), default=str)
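
# Illustrative sketch (values hypothetical): a DHCP event rendered as a
# structured entry. Passing timestamp=0 lets __post_init__ stamp the
# current time.
#
#   entry = LogEntry(timestamp=0, level="INFO", category="DHCP",
#                    module="dhcp_server", message="Lease granted",
#                    client_id="client-42", source_ip="192.168.1.10")
#   entry.to_json()   # -> '{"timestamp": ..., "level": "INFO", ...}'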


class LogFilter:
    """Log filter; criteria left as None are ignored"""

    def __init__(self):
        self.level_filter: Optional[LogLevel] = None
        self.category_filter: Optional[LogCategory] = None
        self.module_filter: Optional[str] = None
        self.session_filter: Optional[str] = None
        self.client_filter: Optional[str] = None
        self.ip_filter: Optional[str] = None
        self.text_filter: Optional[str] = None
        self.time_range: Optional[tuple] = None

    def matches(self, entry: LogEntry) -> bool:
        """Check if a log entry matches all configured filter criteria"""
        if self.level_filter:
            # The level filter is a minimum severity, not an exact match
            entry_level_value = getattr(logging, entry.level)
            filter_level_value = getattr(logging, self.level_filter.value)
            if entry_level_value < filter_level_value:
                return False

        if self.category_filter and entry.category != self.category_filter.value:
            return False

        if self.module_filter and self.module_filter.lower() not in entry.module.lower():
            return False

        if self.session_filter and entry.session_id != self.session_filter:
            return False

        if self.client_filter and entry.client_id != self.client_filter:
            return False

        if self.ip_filter:
            # Match either endpoint of the connection
            if (entry.source_ip != self.ip_filter and
                    entry.dest_ip != self.ip_filter):
                return False

        if self.text_filter and self.text_filter.lower() not in entry.message.lower():
            return False

        if self.time_range:
            start_time, end_time = self.time_range
            if not (start_time <= entry.timestamp <= end_time):
                return False

        return True
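
# Illustrative sketch (values hypothetical): criteria compose with AND, so
# this selects firewall entries of ERROR severity or above for one client
# within the last hour.
#
#   f = LogFilter()
#   f.level_filter = LogLevel.ERROR
#   f.category_filter = LogCategory.FIREWALL
#   f.client_filter = "client-42"
#   f.time_range = (time.time() - 3600, time.time())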


class LogSubscriber:
    """Log subscriber for real-time streaming"""

    def __init__(self, subscriber_id: str, callback: Callable[[LogEntry], None],
                 log_filter: Optional[LogFilter] = None):
        self.subscriber_id = subscriber_id
        self.callback = callback
        self.filter = log_filter or LogFilter()
        self.created_time = time.time()
        self.message_count = 0
        self.last_message_time = None
        self.is_active = True

    def send_log(self, entry: LogEntry) -> bool:
        """Send log entry to subscriber if it matches the filter"""
        if not self.is_active:
            return False

        if self.filter.matches(entry):
            try:
                self.callback(entry)
                self.message_count += 1
                self.last_message_time = time.time()
                return True
            except Exception as e:
                # A failing callback deactivates the subscriber so one bad
                # consumer cannot repeatedly stall the processing thread
                print(f"Error sending log to subscriber {self.subscriber_id}: {e}")
                self.is_active = False
                return False

        return False


class VirtualISPLogger:
    """Centralized logger for the Virtual ISP stack"""

    def __init__(self, config: Dict):
        self.config = config
        self.log_entries: deque = deque(maxlen=config.get('max_memory_logs', 10000))
        self.subscribers: Dict[str, LogSubscriber] = {}
        self.lock = threading.Lock()

        self.log_level = LogLevel(config.get('log_level', 'INFO'))
        self.log_to_file = config.get('log_to_file', True)
        self.log_file_path = config.get('log_file_path', '/tmp/virtual_isp.log')
        self.log_file_max_size = config.get('log_file_max_size', 10 * 1024 * 1024)
        self.log_file_backup_count = config.get('log_file_backup_count', 5)
        self.log_to_console = config.get('log_to_console', True)
        self.structured_logging = config.get('structured_logging', True)

        self.stats = {
            'total_logs': 0,
            'logs_by_level': {level.value: 0 for level in LogLevel},
            'logs_by_category': {cat.value: 0 for cat in LogCategory},
            'active_subscribers': 0,
            'file_logs_written': 0,
            'console_logs_written': 0,
            'dropped_logs': 0
        }

        self._setup_logging()

        self.running = False
        # Bounded queue: an unbounded Queue never raises queue.Full, so the
        # dropped_logs accounting in log() could never trigger. The
        # 'max_queue_size' key is an assumed config option.
        self.log_queue = queue.Queue(maxsize=config.get('max_queue_size', 100000))
        self.processing_thread = None
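
    # Illustrative config (all keys optional; defaults as read above):
    #
    #   config = {
    #       'log_level': 'DEBUG',
    #       'log_to_file': True,
    #       'log_file_path': '/tmp/virtual_isp.log',
    #       'log_file_max_size': 10 * 1024 * 1024,
    #       'log_file_backup_count': 5,
    #       'max_memory_logs': 10000,
    #   }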

    def _setup_logging(self):
        """Setup Python logging infrastructure"""
        self.logger = logging.getLogger('virtual_isp')
        self.logger.setLevel(getattr(logging, self.log_level.value))
        # Avoid duplicate output through the root logger's handlers
        self.logger.propagate = False

        # Remove any handlers left over from a previous init
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        if self.log_to_console:
            console_handler = logging.StreamHandler()
            if self.structured_logging:
                console_formatter = logging.Formatter(
                    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                )
            else:
                console_formatter = logging.Formatter('%(message)s')
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

        if self.log_to_file:
            # Create the log directory if it does not exist yet
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir and not os.path.exists(log_dir):
                os.makedirs(log_dir, exist_ok=True)

            file_handler = logging.handlers.RotatingFileHandler(
                self.log_file_path,
                maxBytes=self.log_file_max_size,
                backupCount=self.log_file_backup_count
            )

            if self.structured_logging:
                file_formatter = logging.Formatter(
                    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                )
            else:
                file_formatter = logging.Formatter('%(message)s')

            file_handler.setFormatter(file_formatter)
            self.logger.addHandler(file_handler)

    def _process_log_queue(self):
        """Background thread that drains the log queue"""
        while self.running:
            try:
                try:
                    entry = self.log_queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                try:
                    # Store in the in-memory ring buffer
                    with self.lock:
                        self.log_entries.append(entry)

                    # Snapshot subscribers under the lock, but invoke their
                    # callbacks outside it: the lock is not reentrant, so a
                    # callback touching the logger could otherwise deadlock
                    with self.lock:
                        subscribers = list(self.subscribers.values())

                    for subscriber in subscribers:
                        subscriber.send_log(entry)

                    # Remove only subscribers whose callbacks failed;
                    # send_log() also returns False on a simple filter
                    # mismatch, which must not drop the subscriber
                    for subscriber in subscribers:
                        if not subscriber.is_active:
                            self.remove_subscriber(subscriber.subscriber_id)

                    with self.lock:
                        self.stats['total_logs'] += 1
                        self.stats['logs_by_level'][entry.level] += 1
                        self.stats['logs_by_category'][entry.category] += 1
                finally:
                    # Always mark the item done so log_queue.join() in stop()
                    # cannot hang on an entry that raised mid-processing
                    self.log_queue.task_done()

            except Exception as e:
                print(f"Error processing log queue: {e}")
                time.sleep(1)

    def log(self, level: LogLevel, category: LogCategory, module: str, message: str,
            session_id: Optional[str] = None, client_id: Optional[str] = None,
            source_ip: Optional[str] = None, dest_ip: Optional[str] = None,
            protocol: Optional[str] = None, **metadata):
        """Log a message"""
        # Skip entries below the configured minimum level
        level_value = getattr(logging, level.value)
        min_level_value = getattr(logging, self.log_level.value)
        if level_value < min_level_value:
            return

        entry = LogEntry(
            timestamp=time.time(),
            level=level.value,
            category=category.value,
            module=module,
            message=message,
            session_id=session_id,
            client_id=client_id,
            source_ip=source_ip,
            dest_ip=dest_ip,
            protocol=protocol,
            metadata=metadata
        )

        # Queue for asynchronous processing (memory buffer, subscribers, stats)
        try:
            self.log_queue.put_nowait(entry)
        except queue.Full:
            self.stats['dropped_logs'] += 1

        # Also emit synchronously through the standard logging handlers
        if self.structured_logging:
            log_data = entry.to_dict()
            log_message = f"{message} | {json.dumps(log_data, default=str)}"
        else:
            log_message = message

        python_logger_level = getattr(logging, level.value)
        self.logger.log(python_logger_level, log_message)

        if self.log_to_console:
            self.stats['console_logs_written'] += 1
        if self.log_to_file:
            self.stats['file_logs_written'] += 1

    def debug(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log debug message"""
        self.log(LogLevel.DEBUG, category, module, message, **kwargs)

    def info(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log info message"""
        self.log(LogLevel.INFO, category, module, message, **kwargs)

    def warning(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log warning message"""
        self.log(LogLevel.WARNING, category, module, message, **kwargs)

    def error(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log error message"""
        self.log(LogLevel.ERROR, category, module, message, **kwargs)

    def critical(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log critical message"""
        self.log(LogLevel.CRITICAL, category, module, message, **kwargs)
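
    # Illustrative call (values hypothetical): extra keyword arguments that
    # are not named parameters of log() land in the entry's metadata dict.
    #
    #   logger.info(LogCategory.NAT, 'nat_engine', 'Mapping created',
    #               source_ip='10.0.0.5', dest_ip='203.0.113.7',
    #               protocol='TCP', external_port=41002)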

    def add_subscriber(self, subscriber_id: str, callback: Callable[[LogEntry], None],
                       log_filter: Optional[LogFilter] = None) -> bool:
        """Add log subscriber for real-time streaming"""
        with self.lock:
            if subscriber_id in self.subscribers:
                return False

            subscriber = LogSubscriber(subscriber_id, callback, log_filter)
            self.subscribers[subscriber_id] = subscriber
            self.stats['active_subscribers'] = len(self.subscribers)

            return True

    def remove_subscriber(self, subscriber_id: str) -> bool:
        """Remove log subscriber"""
        with self.lock:
            if subscriber_id in self.subscribers:
                del self.subscribers[subscriber_id]
                self.stats['active_subscribers'] = len(self.subscribers)
                return True
            return False
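
    # Illustrative subscriber (id and callback hypothetical): stream every
    # matching entry to a callback as it is processed.
    #
    #   def on_entry(entry: LogEntry) -> None:
    #       print(entry.to_json())
    #
    #   logger.add_subscriber('console-stream', on_entry)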

    def get_logs(self, limit: int = 100, offset: int = 0,
                 log_filter: Optional[LogFilter] = None) -> List[Dict]:
        """Get logs with filtering and pagination, newest first"""
        # Snapshot under the lock, then filter and sort outside it so a
        # large scan does not stall the processing thread
        with self.lock:
            all_logs = list(self.log_entries)

        if log_filter:
            filtered_logs = [entry for entry in all_logs if log_filter.matches(entry)]
        else:
            filtered_logs = all_logs

        # Most recent first
        filtered_logs.sort(key=lambda x: x.timestamp, reverse=True)

        paginated_logs = filtered_logs[offset:offset + limit]

        return [entry.to_dict() for entry in paginated_logs]

    def search_logs(self, query: str, limit: int = 100) -> List[Dict]:
        """Search logs by text query"""
        log_filter = LogFilter()
        log_filter.text_filter = query

        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_session(self, session_id: str, limit: int = 100) -> List[Dict]:
        """Get logs for a specific session"""
        log_filter = LogFilter()
        log_filter.session_filter = session_id

        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_client(self, client_id: str, limit: int = 100) -> List[Dict]:
        """Get logs for a specific client"""
        log_filter = LogFilter()
        log_filter.client_filter = client_id

        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_ip(self, ip_address: str, limit: int = 100) -> List[Dict]:
        """Get logs for a specific IP address (source or destination)"""
        log_filter = LogFilter()
        log_filter.ip_filter = ip_address

        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_recent_errors(self, limit: int = 50) -> List[Dict]:
        """Get recent error and critical logs"""
        # The level filter is a threshold, so ERROR also matches CRITICAL
        log_filter = LogFilter()
        log_filter.level_filter = LogLevel.ERROR

        return self.get_logs(limit=limit, log_filter=log_filter)
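
    # Illustrative queries (ids hypothetical):
    #
    #   logger.get_logs(limit=20)                 # 20 newest entries
    #   logger.search_logs('lease')               # case-insensitive text match
    #   logger.get_logs_by_session('sess-7f3a')
    #   logger.get_recent_errors()                # ERROR and CRITICAL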

    def clear_logs(self):
        """Clear all logs from memory"""
        with self.lock:
            self.log_entries.clear()

    def get_stats(self) -> Dict:
        """Get logging statistics"""
        with self.lock:
            stats = self.stats.copy()
            # Copy the nested counters too so callers get a stable snapshot
            stats['logs_by_level'] = dict(stats['logs_by_level'])
            stats['logs_by_category'] = dict(stats['logs_by_category'])
            stats['memory_logs_count'] = len(self.log_entries)
            stats['active_subscribers'] = len(self.subscribers)
            stats['queue_size'] = self.log_queue.qsize()

        return stats

    def reset_stats(self):
        """Reset logging statistics"""
        with self.lock:
            self.stats = {
                'total_logs': 0,
                'logs_by_level': {level.value: 0 for level in LogLevel},
                'logs_by_category': {cat.value: 0 for cat in LogCategory},
                'active_subscribers': len(self.subscribers),
                'file_logs_written': 0,
                'console_logs_written': 0,
                'dropped_logs': 0
            }

    def export_logs(self, format: str = 'json', log_filter: Optional[LogFilter] = None) -> str:
        """Export logs in the specified format ('json' or 'csv')"""
        logs = self.get_logs(limit=10000, log_filter=log_filter)

        if format == 'json':
            return json.dumps(logs, indent=2, default=str)
        elif format == 'csv':
            import csv
            import io

            output = io.StringIO()
            if logs:
                writer = csv.DictWriter(output, fieldnames=logs[0].keys())
                writer.writeheader()
                writer.writerows(logs)

            return output.getvalue()
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def set_log_level(self, level: LogLevel):
        """Set logging level"""
        self.log_level = level
        self.logger.setLevel(getattr(logging, level.value))

    def start(self):
        """Start logger"""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_log_queue, daemon=True)
        self.processing_thread.start()

        self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger started')

    def stop(self):
        """Stop logger"""
        self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger stopping')

        # Drain the queue while the processing thread is still consuming;
        # clearing 'running' first could leave join() waiting forever on
        # items the exited thread will never mark done
        self.log_queue.join()

        self.running = False

        if self.processing_thread:
            self.processing_thread.join()

        with self.lock:
            self.subscribers.clear()

        print("Virtual ISP Logger stopped")


_global_logger: Optional[VirtualISPLogger] = None


def get_logger() -> Optional[VirtualISPLogger]:
    """Get global logger instance"""
    return _global_logger


def init_logger(config: Dict) -> VirtualISPLogger:
    """Initialize global logger"""
    global _global_logger
    _global_logger = VirtualISPLogger(config)
    return _global_logger


def log_debug(category: LogCategory, module: str, message: str, **kwargs):
    """Global debug logging function"""
    if _global_logger:
        _global_logger.debug(category, module, message, **kwargs)


def log_info(category: LogCategory, module: str, message: str, **kwargs):
    """Global info logging function"""
    if _global_logger:
        _global_logger.info(category, module, message, **kwargs)


def log_warning(category: LogCategory, module: str, message: str, **kwargs):
    """Global warning logging function"""
    if _global_logger:
        _global_logger.warning(category, module, message, **kwargs)


def log_error(category: LogCategory, module: str, message: str, **kwargs):
    """Global error logging function"""
    if _global_logger:
        _global_logger.error(category, module, message, **kwargs)


def log_critical(category: LogCategory, module: str, message: str, **kwargs):
    """Global critical logging function"""
    if _global_logger:
        _global_logger.critical(category, module, message, **kwargs)
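

if __name__ == "__main__":
    # Minimal end-to-end sketch (illustrative, not part of the module's API):
    # wire up the global logger, stream entries to a callback, emit a few
    # logs, and export what was captured. All literal values are hypothetical.
    logger = init_logger({'log_level': 'DEBUG', 'log_to_file': False})
    logger.start()

    logger.add_subscriber('stdout', lambda entry: print('>>', entry.to_json()))

    log_info(LogCategory.DHCP, 'demo', 'Lease granted',
             client_id='client-42', source_ip='192.168.1.10')
    log_error(LogCategory.FIREWALL, 'demo', 'Blocked inbound connection',
              source_ip='203.0.113.7', dest_ip='192.168.1.10', protocol='TCP')

    time.sleep(0.2)  # give the processing thread time to drain the queue
    print(logger.export_logs('json'))
    logger.stop()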