Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| from datetime import datetime | |
| from typing import Optional | |
| import re | |
class SecureFormatter(logging.Formatter):
    """Formatter that neutralises control characters to prevent log injection.

    The *rendered* message (``msg % args``) is sanitized rather than the
    individual pieces, so the protection also covers mapping-style arguments
    (``logger.info("%(user)s", {"user": ...})``) and non-string arguments
    whose ``str()`` contains line breaks. Traceback text appended by the base
    class for ``exc_info`` is left as the base class produces it.
    """

    def __init__(self, fmt: Optional[str] = None, datefmt: Optional[str] = None):
        """Create the formatter.

        Args:
            fmt: Log line format string, passed to ``logging.Formatter``.
            datefmt: Timestamp format string, passed to ``logging.Formatter``.
        """
        super().__init__(fmt, datefmt)
        # CR, LF, NUL, backspace, form feed and ESC: each is replaced by a space.
        self.sanitize_pattern = re.compile(r'[\r\n\x00\x08\x0c\x1b]')
        # Second pass over line breaks; kept as an attribute for compatibility.
        self.crlf_pattern = re.compile(r'\r\n|\r|\n')

    def _sanitize(self, text: str) -> str:
        """Return *text* with every unsafe control character replaced by a space."""
        cleaned = self.sanitize_pattern.sub(' ', text)
        return self.crlf_pattern.sub(' ', cleaned)

    def format(self, record):
        """Format *record* with its message sanitized.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            The formatted log line as a string.
        """
        # Render first, then sanitize: this handles tuple args, mapping args
        # and arbitrary objects uniformly.
        rendered = self._sanitize(record.getMessage())

        # Work on a copy so other handlers/formatters attached to the same
        # logger still see the caller's original msg and args.
        safe_record = logging.makeLogRecord(record.__dict__)
        safe_record.msg = rendered
        safe_record.args = None

        output = super().format(safe_record)
        # The base class caches the formatted traceback on the record; hand it
        # back to the original so later handlers do not re-format it.
        record.exc_text = safe_record.exc_text
        return output
def _reset_handlers(logger: logging.Logger) -> None:
    """Detach and close every handler currently attached to *logger*."""
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        # Closing releases the underlying file handle of file-based handlers.
        handler.close()


def setup_secure_logging(log_dir: str = "logs"):
    """Configure the root logger and the ``security`` logger.

    The root logger writes to ``app_<YYYYMMDD>.log`` and to the console; the
    ``security`` logger additionally writes to ``security_<YYYYMMDD>.log``.
    All handlers use ``SecureFormatter``. Propagation of the ``security``
    logger is left at the logging default.

    The function may be called more than once: handlers previously attached
    to either logger are removed and closed first, so lines are not
    duplicated and file handles are not leaked.

    Args:
        log_dir: Directory for the log files; created if it does not exist.

    Returns:
        A ``(root_logger, security_logger)`` tuple.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(log_dir, exist_ok=True)

    secure_formatter = SecureFormatter(
        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    # Compute the date once so both file names agree even across midnight.
    date_stamp = datetime.now().strftime('%Y%m%d')

    # Root logger: general application log plus console output.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    _reset_handlers(root_logger)

    file_handler = logging.FileHandler(
        os.path.join(log_dir, f"app_{date_stamp}.log"),
        mode='a',
        encoding='utf-8',
    )
    file_handler.setFormatter(secure_formatter)
    root_logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(secure_formatter)
    root_logger.addHandler(console_handler)

    # Security logger: dedicated file for security events.
    security_logger = logging.getLogger('security')
    security_logger.setLevel(logging.INFO)
    _reset_handlers(security_logger)

    security_handler = logging.FileHandler(
        os.path.join(log_dir, f"security_{date_stamp}.log"),
        mode='a',
        encoding='utf-8',
    )
    security_handler.setFormatter(secure_formatter)
    security_logger.addHandler(security_handler)

    return root_logger, security_logger
# CR, LF, NUL, backspace, form feed and ESC: characters that could forge or
# corrupt a log line. Compiled once instead of on every call.
_UNSAFE_LOG_CHARS = re.compile(r'[\r\n\x00\x08\x0c\x1b]')


def log_security_event(event_type: str, details: dict, security_logger: logging.Logger):
    """Log a security-related event at INFO level.

    The emitted message has the form ``SECURITY_EVENT: <event_type> - <details>``.
    String values in *details* have unsafe control characters replaced by
    spaces, and the complete message is sanitized once more before logging so
    that *event_type* and values with a multi-line ``repr`` cannot inject
    extra log lines either, regardless of the formatter in use.

    Args:
        event_type: Short label for the event.
        details: Extra data about the event; ``None`` is treated as empty.
        security_logger: Logger the event is written to.
    """
    sanitized_details = {
        key: _UNSAFE_LOG_CHARS.sub(' ', value) if isinstance(value, str) else value
        for key, value in (details or {}).items()
    }
    message = f"SECURITY_EVENT: {event_type} - {sanitized_details}"
    # Passed without args, so '%' characters in the message are not interpolated.
    security_logger.info(_UNSAFE_LOG_CHARS.sub(' ', message))
# Configure logging once, at import time; the module-level names bound here
# are what get_security_logger() hands back to callers.
root_logger, security_logger = setup_secure_logging()
def get_logger(name: str) -> logging.Logger:
    """Return the logger called *name*, defaulting its level to INFO.

    The INFO default is only applied when the logger has no explicit level
    yet, so a level configured elsewhere (for example DEBUG) is not silently
    overwritten each time this accessor is called.

    Args:
        name: Logger name, as accepted by ``logging.getLogger``.

    Returns:
        The ``logging.Logger`` registered under *name*.
    """
    logger = logging.getLogger(name)
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)
    return logger
def get_security_logger() -> logging.Logger:
    """Return the module-level ``security_logger`` created at import time."""
    return security_logger