import contextlib import json import logging import os import re import sys from datetime import datetime, timezone from pathlib import Path from typing import Any class PIIScrubber: """PII detection and scrubbing utilities""" # PII patterns EMAIL_PATTERN = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b") PHONE_PATTERN = re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b") SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b") CREDIT_CARD_PATTERN = re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b") IP_ADDRESS_PATTERN = re.compile(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b") # Additional patterns for sensitive data BANK_ACCOUNT_PATTERN = re.compile(r"\b\d{8,17}\b") PASSPORT_PATTERN = re.compile(r"\b[A-Z]{1,2}\d{7,8}\b") DRIVER_LICENSE_PATTERN = re.compile(r"\b[A-Z]{1,2}\d{6,8}\b") @staticmethod def scrub_pii(text: str, mask_char: str = "***") -> str: """Scrub PII from text using patterns""" if not text or not isinstance(text, str): return text scrubbed_text = text # Apply all PII patterns patterns = [ (PIIScrubber.EMAIL_PATTERN, f"{mask_char}@{mask_char}.com"), (PIIScrubber.PHONE_PATTERN, f"{mask_char}-{mask_char}-{mask_char}"), (PIIScrubber.SSN_PATTERN, f"{mask_char}-{mask_char}-{mask_char}"), ( PIIScrubber.CREDIT_CARD_PATTERN, f"{mask_char}-{mask_char}-{mask_char}-{mask_char}", ), ( PIIScrubber.IP_ADDRESS_PATTERN, f"{mask_char}.{mask_char}.{mask_char}.{mask_char}", ), (PIIScrubber.BANK_ACCOUNT_PATTERN, mask_char * 8), (PIIScrubber.PASSPORT_PATTERN, f"{mask_char}{mask_char}"), (PIIScrubber.DRIVER_LICENSE_PATTERN, f"{mask_char}{mask_char}"), ] for pattern, replacement in patterns: scrubbed_text = pattern.sub(replacement, scrubbed_text) return scrubbed_text class JSONFormatter(logging.Formatter): """Custom JSON formatter for structured logging""" def format(self, record: logging.LogRecord) -> str: log_entry = { "timestamp": datetime.now(timezone.utc).isoformat() + "Z", "level": record.levelname, "logger": record.name, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno, } if record.exc_info: log_entry["exception"] = self.formatException(record.exc_info) if hasattr(record, "extra_fields"): log_entry.update(record.extra_fields) if hasattr(record, "request_id"): log_entry["request_id"] = record.request_id if hasattr(record, "user_id"): log_entry["user_id"] = record.user_id if hasattr(record, "ip_address"): log_entry["ip_address"] = PIIScrubber.scrub_pii(record.ip_address) # Scrub the message itself log_entry["message"] = PIIScrubber.scrub_pii(log_entry["message"]) # Scrub extra fields for key, value in log_entry.items(): if isinstance(value, str) and key not in ["timestamp", "level", "logger"]: log_entry[key] = PIIScrubber.scrub_pii(value) return json.dumps(log_entry, default=str) def setup_logging( level: str = "INFO", format_type: str = "json", log_file: str | None = None, max_file_size: int = 10 * 1024 * 1024, # 10MB backup_count: int = 5, enable_console: bool = True, enable_file: bool = True, ) -> logging.Logger: """Enhanced logging setup with better configuration options""" # Convert string level to numeric numeric_level = getattr(logging, level.upper(), logging.INFO) # Get or create logger logger = logging.getLogger("zenith") logger.setLevel(numeric_level) # Clear existing handlers for h in list(logger.handlers): logger.removeHandler(h) # Create formatter if format_type.lower() == "json": formatter = JSONFormatter() else: formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) # Console handler if enable_console: console = logging.StreamHandler(sys.stdout) console.setLevel(numeric_level) console.setFormatter(formatter) logger.addHandler(console) # File handler with rotation if enable_file and log_file: log_path = Path(log_file) log_path.parent.mkdir(parents=True, exist_ok=True) from logging.handlers import RotatingFileHandler fh = RotatingFileHandler( log_file, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8" ) fh.setLevel(numeric_level) fh.setFormatter(formatter) logger.addHandler(fh) logger.propagate = False return logger # Environment-based logging configuration logger = setup_logging( level=os.getenv("LOG_LEVEL", "WARNING"), # Default to WARNING for production format_type=os.getenv("LOG_FORMAT", "json"), log_file=os.getenv("LOG_FILE", "logs/fraud_detection.log"), max_file_size=int(os.getenv("LOG_MAX_SIZE_MB", "10")) * 1024 * 1024, backup_count=int(os.getenv("LOG_BACKUP_COUNT", "5")), enable_console=os.getenv("LOG_CONSOLE", "true").lower() == "true", enable_file=os.getenv("LOG_FILE_ENABLED", "true").lower() == "true", ) def log_request( request_id: str, method: str, path: str, status_code: int, duration: float, user_id: str | None = None, ): """Log HTTP request details (single call to module logger).""" extra_fields = { "request_id": request_id, "method": method, "path": path, "status_code": status_code, "duration_ms": round(duration * 1000, 2), } if user_id: extra_fields["user_id"] = user_id # Prefer calling the attribute on the imported module object so tests # that patch `core.logging.logger` are observed reliably. try: core_mod = sys.modules.get("core.logging") if core_mod and hasattr(core_mod, "logger"): try: core_mod.logger.info("HTTP request", extra=extra_fields) return except Exception: pass except Exception: pass try: logger.info("HTTP request", extra=extra_fields) except Exception: with contextlib.suppress(Exception): logging.getLogger("Zenith").info("HTTP request", extra=extra_fields) def log_error( error_type: str, message: str, details: dict[str, Any] | None = None, user_id: str | None = None, ): """Log application errors.""" extra_fields = {"error_type": error_type, "details": details or {}} if user_id: extra_fields["user_id"] = user_id try: core_mod = sys.modules.get("core.logging") if core_mod and hasattr(core_mod, "logger"): try: core_mod.logger.error(message, extra=extra_fields) return except Exception: pass except Exception: pass try: logger.error(message, extra=extra_fields) except Exception: with contextlib.suppress(Exception): logging.getLogger("Zenith").error(message, extra=extra_fields) def log_security_event( event_type: str, user_id: str | None = None, ip_address: str | None = None, details: dict[str, Any] | None = None, ): """Log security-related events.""" extra_fields = { "event_type": event_type, "security_event": True, "details": details or {}, } if user_id: extra_fields["user_id"] = user_id if ip_address: extra_fields["ip_address"] = ip_address try: core_mod = sys.modules.get("core.logging") if core_mod and hasattr(core_mod, "logger"): try: core_mod.logger.warning("Security event", extra=extra_fields) return except Exception: pass except Exception: pass try: logger.warning("Security event", extra=extra_fields) except Exception: with contextlib.suppress(Exception): logging.getLogger("Zenith").warning("Security event", extra=extra_fields) def log_performance(metric_name: str, value: float, tags: dict[str, Any] | None = None): """Log performance metrics.""" extra_fields = { "metric_name": metric_name, "metric_value": value, "performance_metric": True, "tags": tags or {}, } try: core_mod = sys.modules.get("core.logging") if core_mod and hasattr(core_mod, "logger"): try: core_mod.logger.info("Performance metric", extra=extra_fields) return except Exception: pass except Exception: pass try: logger.info("Performance metric", extra=extra_fields) except Exception: with contextlib.suppress(Exception): logging.getLogger("Zenith").info("Performance metric", extra=extra_fields) def configure_environment_logging(): """Configure logging based on environment (development vs production)""" env = os.getenv("ENVIRONMENT", "development").lower() if env == "development": # Development: more verbose, console only, human-readable setup_logging( level="DEBUG", format_type="text", enable_console=True, enable_file=False ) elif env == "testing": # Testing: capture all logs, minimal output, structured setup_logging( level="DEBUG", format_type="json", log_file="logs/test.log", enable_console=False, enable_file=True, ) elif env == "staging": # Staging: balanced logging, both console and file setup_logging( level="INFO", format_type="json", log_file="logs/staging.log", enable_console=True, enable_file=True, ) else: # production # Production: minimal, structured logging, file only setup_logging( level="WARNING", format_type="json", log_file="logs/production.log", enable_console=False, enable_file=True, ) # Initialize with environment-based configuration configure_environment_logging()