"""Structured logging for the zenith backend (core/logging.py).

Provides PII detection/scrubbing, a JSON log formatter, environment-driven
logger configuration, and convenience helpers for request, error, security,
and performance logging.
"""
import contextlib
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
class PIIScrubber:
    """PII detection and scrubbing utilities.

    Patterns are applied in declaration order, so more specific patterns
    (e.g. credit card) run before the broad bank-account pattern.
    NOTE(review): BANK_ACCOUNT_PATTERN masks *any* bare 8-17 digit run, so
    non-PII numbers (order IDs, timestamps) may also be masked — confirm
    this aggressiveness is intended.
    """

    # PII patterns (compiled once at class creation).
    # Fixed: the original TLD class was [A-Z|a-z], which also matched a
    # literal "|" character; [A-Za-z] is the intended alphabetic class.
    EMAIL_PATTERN = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
    PHONE_PATTERN = re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b")
    SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
    CREDIT_CARD_PATTERN = re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b")
    IP_ADDRESS_PATTERN = re.compile(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b")
    # Additional patterns for sensitive data
    BANK_ACCOUNT_PATTERN = re.compile(r"\b\d{8,17}\b")
    PASSPORT_PATTERN = re.compile(r"\b[A-Z]{1,2}\d{7,8}\b")
    DRIVER_LICENSE_PATTERN = re.compile(r"\b[A-Z]{1,2}\d{6,8}\b")

    # (pattern, replacement template) pairs built once at class creation
    # instead of on every scrub_pii() call; "{m}" is filled with mask_char.
    _SCRUB_RULES = (
        (EMAIL_PATTERN, "{m}@{m}.com"),
        (PHONE_PATTERN, "{m}-{m}-{m}"),
        (SSN_PATTERN, "{m}-{m}-{m}"),
        (CREDIT_CARD_PATTERN, "{m}-{m}-{m}-{m}"),
        (IP_ADDRESS_PATTERN, "{m}.{m}.{m}.{m}"),
        (BANK_ACCOUNT_PATTERN, "{m}" * 8),
        (PASSPORT_PATTERN, "{m}{m}"),
        (DRIVER_LICENSE_PATTERN, "{m}{m}"),
    )

    @staticmethod
    def scrub_pii(text: str, mask_char: str = "***") -> str:
        """Return *text* with recognized PII replaced by mask placeholders.

        Args:
            text: Input string; non-strings and falsy values are returned
                unchanged (so None/"" pass through safely).
            mask_char: Token substituted for each masked segment.

        Returns:
            The scrubbed string (or the original value when not a
            non-empty string).
        """
        if not text or not isinstance(text, str):
            return text
        scrubbed = text
        for pattern, template in PIIScrubber._SCRUB_RULES:
            scrubbed = pattern.sub(template.format(m=mask_char), scrubbed)
        return scrubbed
class JSONFormatter(logging.Formatter):
    """Render log records as single-line JSON with PII scrubbed.

    Standard fields (timestamp/level/logger/message/module/function/line)
    are always emitted; request/user/IP attributes attached to the record
    are included when present, and all string values except the structural
    keys are run through PIIScrubber before serialization.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON string (RFC 3339 UTC timestamp)."""
        # isoformat() on an aware UTC datetime already ends in "+00:00";
        # swap it for the "Z" suffix instead of appending (the original
        # emitted the invalid double offset "...+00:00Z").
        log_entry: dict[str, Any] = {
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        # NOTE(review): `extra=` kwargs become individual record attributes,
        # not a dict named `extra_fields`, so this branch only fires when a
        # caller passes extra={"extra_fields": {...}} explicitly — confirm
        # whether the per-field extras should also be collected here.
        if hasattr(record, "extra_fields"):
            log_entry.update(record.extra_fields)
        if hasattr(record, "request_id"):
            log_entry["request_id"] = record.request_id
        if hasattr(record, "user_id"):
            log_entry["user_id"] = record.user_id
        if hasattr(record, "ip_address"):
            log_entry["ip_address"] = PIIScrubber.scrub_pii(record.ip_address)
        # Scrub the message itself, then every other string value except
        # the structural keys that cannot contain user data.
        log_entry["message"] = PIIScrubber.scrub_pii(log_entry["message"])
        for key, value in log_entry.items():
            if isinstance(value, str) and key not in ("timestamp", "level", "logger"):
                log_entry[key] = PIIScrubber.scrub_pii(value)
        # default=str stringifies any non-JSON-native value (deliberate).
        return json.dumps(log_entry, default=str)
def setup_logging(
level: str = "INFO",
format_type: str = "json",
log_file: str | None = None,
max_file_size: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5,
enable_console: bool = True,
enable_file: bool = True,
) -> logging.Logger:
"""Enhanced logging setup with better configuration options"""
# Convert string level to numeric
numeric_level = getattr(logging, level.upper(), logging.INFO)
# Get or create logger
logger = logging.getLogger("zenith")
logger.setLevel(numeric_level)
# Clear existing handlers
for h in list(logger.handlers):
logger.removeHandler(h)
# Create formatter
if format_type.lower() == "json":
formatter = JSONFormatter()
else:
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Console handler
if enable_console:
console = logging.StreamHandler(sys.stdout)
console.setLevel(numeric_level)
console.setFormatter(formatter)
logger.addHandler(console)
# File handler with rotation
if enable_file and log_file:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
from logging.handlers import RotatingFileHandler
fh = RotatingFileHandler(
log_file, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8"
)
fh.setLevel(numeric_level)
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.propagate = False
return logger
# Environment-based logging configuration.
# Module-level logger, configured once at import time from environment
# variables. NOTE(review): configure_environment_logging() at the bottom of
# this file also runs at import and reconfigures the same "zenith" logger,
# so these env-var defaults are immediately overridden — confirm the double
# initialization is intentional.
logger = setup_logging(
    level=os.getenv("LOG_LEVEL", "WARNING"),  # Default to WARNING for production
    format_type=os.getenv("LOG_FORMAT", "json"),
    log_file=os.getenv("LOG_FILE", "logs/fraud_detection.log"),
    max_file_size=int(os.getenv("LOG_MAX_SIZE_MB", "10")) * 1024 * 1024,  # MB -> bytes
    backup_count=int(os.getenv("LOG_BACKUP_COUNT", "5")),
    enable_console=os.getenv("LOG_CONSOLE", "true").lower() == "true",
    enable_file=os.getenv("LOG_FILE_ENABLED", "true").lower() == "true",
)
def log_request(
request_id: str,
method: str,
path: str,
status_code: int,
duration: float,
user_id: str | None = None,
):
"""Log HTTP request details (single call to module logger)."""
extra_fields = {
"request_id": request_id,
"method": method,
"path": path,
"status_code": status_code,
"duration_ms": round(duration * 1000, 2),
}
if user_id:
extra_fields["user_id"] = user_id
# Prefer calling the attribute on the imported module object so tests
# that patch `core.logging.logger` are observed reliably.
try:
core_mod = sys.modules.get("core.logging")
if core_mod and hasattr(core_mod, "logger"):
try:
core_mod.logger.info("HTTP request", extra=extra_fields)
return
except Exception:
pass
except Exception:
pass
try:
logger.info("HTTP request", extra=extra_fields)
except Exception:
with contextlib.suppress(Exception):
logging.getLogger("Zenith").info("HTTP request", extra=extra_fields)
def log_error(
error_type: str,
message: str,
details: dict[str, Any] | None = None,
user_id: str | None = None,
):
"""Log application errors."""
extra_fields = {"error_type": error_type, "details": details or {}}
if user_id:
extra_fields["user_id"] = user_id
try:
core_mod = sys.modules.get("core.logging")
if core_mod and hasattr(core_mod, "logger"):
try:
core_mod.logger.error(message, extra=extra_fields)
return
except Exception:
pass
except Exception:
pass
try:
logger.error(message, extra=extra_fields)
except Exception:
with contextlib.suppress(Exception):
logging.getLogger("Zenith").error(message, extra=extra_fields)
def log_security_event(
event_type: str,
user_id: str | None = None,
ip_address: str | None = None,
details: dict[str, Any] | None = None,
):
"""Log security-related events."""
extra_fields = {
"event_type": event_type,
"security_event": True,
"details": details or {},
}
if user_id:
extra_fields["user_id"] = user_id
if ip_address:
extra_fields["ip_address"] = ip_address
try:
core_mod = sys.modules.get("core.logging")
if core_mod and hasattr(core_mod, "logger"):
try:
core_mod.logger.warning("Security event", extra=extra_fields)
return
except Exception:
pass
except Exception:
pass
try:
logger.warning("Security event", extra=extra_fields)
except Exception:
with contextlib.suppress(Exception):
logging.getLogger("Zenith").warning("Security event", extra=extra_fields)
def log_performance(metric_name: str, value: float, tags: dict[str, Any] | None = None):
"""Log performance metrics."""
extra_fields = {
"metric_name": metric_name,
"metric_value": value,
"performance_metric": True,
"tags": tags or {},
}
try:
core_mod = sys.modules.get("core.logging")
if core_mod and hasattr(core_mod, "logger"):
try:
core_mod.logger.info("Performance metric", extra=extra_fields)
return
except Exception:
pass
except Exception:
pass
try:
logger.info("Performance metric", extra=extra_fields)
except Exception:
with contextlib.suppress(Exception):
logging.getLogger("Zenith").info("Performance metric", extra=extra_fields)
def configure_environment_logging():
    """Apply the logging profile for the current ENVIRONMENT.

    Profiles: development (verbose, text, console only), testing (debug JSON
    to logs/test.log, no console), staging (info JSON to console and file),
    production / anything else (warning JSON, file only).
    """
    # Table of named environment profiles; any unrecognized environment
    # falls through to the production profile.
    profiles = {
        "development": {
            "level": "DEBUG",
            "format_type": "text",
            "enable_console": True,
            "enable_file": False,
        },
        "testing": {
            "level": "DEBUG",
            "format_type": "json",
            "log_file": "logs/test.log",
            "enable_console": False,
            "enable_file": True,
        },
        "staging": {
            "level": "INFO",
            "format_type": "json",
            "log_file": "logs/staging.log",
            "enable_console": True,
            "enable_file": True,
        },
    }
    production = {
        "level": "WARNING",
        "format_type": "json",
        "log_file": "logs/production.log",
        "enable_console": False,
        "enable_file": True,
    }
    environment = os.getenv("ENVIRONMENT", "development").lower()
    setup_logging(**profiles.get(environment, production))
# Initialize with environment-based configuration. Runs at import time and
# reconfigures the shared "zenith" logger according to ENVIRONMENT,
# overriding the env-var-driven setup_logging(...) call above.
configure_environment_logging()