jebin2's picture
Phase 2: Implement Audit Middleware
1c302c7
"""
Audit Service Configuration
Configures automatic request/response logging via middleware.
"""
from typing import Dict, List, Optional, Set
import logging
logger = logging.getLogger(__name__)


class AuditServiceConfig:
    """Configuration for automatic audit logging.

    Settings are stored in class attributes and accessed through
    classmethods, so no instance is needed. ``register`` replaces every
    setting at once; until it is called the defaults below apply.
    """

    # Exact paths, or prefixes ending in "*", that must never be logged.
    _excluded_paths: Set[str] = set()
    # True: log every non-excluded request. False: log only status >= 400.
    _log_all_requests: bool = True
    # Whether response bodies are included in log entries.
    _log_response_bodies: bool = False
    # Number of logs to batch before committing.
    _batch_size: int = 10
    # Maps a path fragment to the log type reported for matching paths.
    _log_types: Dict[str, str] = {}

    @classmethod
    def register(
        cls,
        excluded_paths: Optional[List[str]] = None,
        log_all_requests: bool = True,
        log_response_bodies: bool = False,
        batch_size: int = 10,
        log_types: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Register audit service configuration.

        Every call overwrites all settings; arguments that are omitted fall
        back to their defaults rather than keeping a previous value.

        Args:
            excluded_paths: Paths to exclude from logging (e.g., /health,
                /docs). An entry ending in "*" excludes every path that
                starts with the text before the "*".
            log_all_requests: If True, log all requests. If False, only log
                errors (status code >= 400).
            log_response_bodies: If True, include response body in logs
                (privacy risk!)
            batch_size: Number of logs to batch before committing
            log_types: Map of path patterns to log types (client/server)

        Example:
            AuditServiceConfig.register(
                excluded_paths=["/health", "/docs", "/openapi.json"],
                log_all_requests=True,
                log_response_bodies=False
            )
        """
        cls._excluded_paths = set(excluded_paths or [])
        cls._log_all_requests = log_all_requests
        cls._log_response_bodies = log_response_bodies
        cls._batch_size = batch_size
        # Copy the mapping so that later mutation of the caller's dict cannot
        # silently change the active configuration.
        cls._log_types = dict(log_types or {})

        logger.info(
            f"Audit Service configured: "
            f"excluded_paths={len(cls._excluded_paths)}, "
            f"log_all={log_all_requests}, "
            f"log_bodies={log_response_bodies}"
        )

    @classmethod
    def is_excluded(cls, path: str) -> bool:
        """Check if a path should be excluded from logging.

        A path is excluded when it equals a configured entry, or when it
        starts with the prefix of an entry that ends in "*".
        """
        # Exact match
        if path in cls._excluded_paths:
            return True

        # Prefix match for wildcard patterns ("/static/*" -> "/static/")
        return any(
            pattern.endswith("*") and path.startswith(pattern[:-1])
            for pattern in cls._excluded_paths
        )

    @classmethod
    def should_log(cls, path: str, status_code: int) -> bool:
        """Determine if request should be logged.

        Excluded paths are never logged. Otherwise everything is logged when
        ``log_all_requests`` is enabled, and only responses with a status
        code of 400 or above when it is not.
        """
        if cls.is_excluded(path):
            return False

        if cls._log_all_requests:
            return True

        # Only log errors if not logging all
        return status_code >= 400

    @classmethod
    def get_log_type(cls, path: str) -> str:
        """Get log type for a path (client/server).

        Patterns are checked in the mapping's iteration order and the first
        one found anywhere inside ``path`` wins. Returns "server" when no
        pattern matches.
        """
        for pattern, log_type in cls._log_types.items():
            # Substring containment already covers the prefix case, so a
            # separate startswith() check is unnecessary.
            if pattern in path:
                return log_type

        # Default to server log type
        return "server"

    @classmethod
    def get_config(cls) -> dict:
        """Get current configuration.

        Returns a snapshot: the containers are copies, so modifying the
        result does not alter the active configuration. ``excluded_paths``
        is sorted so the output is deterministic.
        """
        return {
            "excluded_paths": sorted(cls._excluded_paths),
            "log_all_requests": cls._log_all_requests,
            "log_response_bodies": cls._log_response_bodies,
            "batch_size": cls._batch_size,
            "log_types": dict(cls._log_types),
        }