# Resonance-Calc / core / logging_config.py
# SergeyO7's picture
# Upload 25 files
# 790625d verified
import logging
import os
from datetime import datetime
from typing import Optional
import re
class SecureFormatter(logging.Formatter):
    """Formatter that sanitizes log messages to prevent log injection.

    The message is rendered first (``msg % args``) and the rendered text is
    sanitized, so every interpolated value is covered: string arguments,
    mapping-style arguments (``"%(user)s"``) and non-string objects whose
    ``str()`` contains control characters (for example exception instances).

    Each CR, LF, NUL, backspace, form-feed and ESC character in the message
    is replaced by a single space. Exception or stack text that the base
    class appends after the message is not sanitized here.
    """

    def __init__(self, fmt: Optional[str] = None, datefmt: Optional[str] = None):
        """Create the formatter.

        Args:
            fmt: Format string passed through to ``logging.Formatter``.
            datefmt: Date format string passed through to ``logging.Formatter``.
        """
        super().__init__(fmt, datefmt)
        # Control characters that could forge log lines or corrupt terminals.
        self.sanitize_pattern = re.compile(r'[\r\n\x00\x08\x0c\x1b]')
        # Line-break normalisation; already covered by sanitize_pattern, but
        # the attribute is kept so existing users of it keep working.
        self.crlf_pattern = re.compile(r'\r\n|\r|\n')

    def _sanitize(self, text: str) -> str:
        """Return *text* with control characters and line breaks replaced by spaces."""
        return self.crlf_pattern.sub(' ', self.sanitize_pattern.sub(' ', text))

    def format(self, record):
        """Format *record* with a sanitized message.

        The record's ``msg`` and ``args`` are swapped for the sanitized,
        already-rendered message only while the base class formats it and
        are restored afterwards, so other handlers sharing the record still
        see the original values.

        Raises:
            TypeError / ValueError: if ``msg % args`` itself is invalid, as
                with the standard formatter.
        """
        original_msg, original_args = record.msg, record.args
        try:
            # getMessage() applies tuple or mapping args correctly; sanitizing
            # its result also catches non-string arguments.
            record.msg = self._sanitize(record.getMessage())
            # No args left to interpolate, so '%' in the text stays literal.
            record.args = None
            return super().format(record)
        finally:
            record.msg, record.args = original_msg, original_args
def setup_secure_logging():
    """Configure secure logging for the application.

    Creates the ``logs`` directory if needed, then attaches to the root
    logger a dated ``app_YYYYMMDD.log`` file handler and a console handler,
    and to the ``security`` logger a dated ``security_YYYYMMDD.log`` file
    handler. All handlers use ``SecureFormatter`` and both loggers are set
    to INFO.

    The function is safe to call more than once: handlers already attached
    to either logger are removed and closed first, so repeated calls neither
    duplicate log lines nor leak open file handles.

    Returns:
        A ``(root_logger, security_logger)`` tuple.
    """
    log_dir = "logs"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(log_dir, exist_ok=True)

    secure_formatter = SecureFormatter(
        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Compute the date once so both files carry the same stamp even if the
    # call straddles midnight.
    date_stamp = datetime.now().strftime('%Y%m%d')

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    security_logger = logging.getLogger('security')
    security_logger.setLevel(logging.INFO)

    # Remove any existing handlers to avoid duplicates, and close them so
    # their underlying files are released.
    for logger in (root_logger, security_logger):
        for handler in logger.handlers[:]:
            logger.removeHandler(handler)
            handler.close()

    # File handler for general logs
    file_handler = logging.FileHandler(
        os.path.join(log_dir, f"app_{date_stamp}.log"),
        mode='a',
        encoding='utf-8'
    )
    file_handler.setFormatter(secure_formatter)
    root_logger.addHandler(file_handler)

    # Console handler for development
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(secure_formatter)
    root_logger.addHandler(console_handler)

    # Security log file
    security_handler = logging.FileHandler(
        os.path.join(log_dir, f"security_{date_stamp}.log"),
        mode='a',
        encoding='utf-8'
    )
    security_handler.setFormatter(secure_formatter)
    security_logger.addHandler(security_handler)

    return root_logger, security_logger
def log_security_event(event_type: str, details: dict, security_logger: logging.Logger):
    """Log a security-related event at INFO level.

    The emitted message has the form
    ``SECURITY_EVENT: <event_type> - <details dict>``.

    Args:
        event_type: Short label for the event. Control characters in it are
            replaced by spaces so it cannot forge extra log lines, whatever
            formatter the target logger uses.
        details: Mapping of extra fields. String values have control
            characters replaced by spaces; other values are logged unchanged.
        security_logger: Logger that receives the event.
    """
    def _clean(value):
        # Only strings are rewritten; each CR, LF, NUL, backspace, form-feed
        # and ESC character becomes one space.
        if isinstance(value, str):
            return re.sub(r'[\r\n\x00\x08\x0c\x1b]', ' ', value)
        return value

    sanitized_details = {key: _clean(value) for key, value in details.items()}
    # Lazy %-args: interpolation only happens if the record is emitted.
    security_logger.info(
        "SECURITY_EVENT: %s - %s", _clean(event_type), sanitized_details
    )
# Initialize loggers at import time: importing this module runs
# setup_secure_logging(), which creates the "logs" directory if needed and
# attaches the handlers to the root and "security" loggers.
root_logger, security_logger = setup_secure_logging()
def get_logger(name: str) -> logging.Logger:
    """Return the logger registered under *name*, with its level set to INFO.

    Args:
        name: Logger name as understood by ``logging.getLogger``.

    Returns:
        The named ``logging.Logger`` instance.
    """
    named_logger = logging.getLogger(name)
    # The level is (re)applied on every call.
    named_logger.setLevel(logging.INFO)
    return named_logger
def get_security_logger() -> logging.Logger:
    """Return the module-level ``security_logger`` created at import time."""
    return security_logger