File size: 3,445 Bytes
1c302c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
Audit Service Configuration

Configures automatic request/response logging via middleware.
"""
from typing import Dict, List, Optional, Set
import logging

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class AuditServiceConfig:
    """Configuration for automatic audit logging.

    All settings are stored in class attributes and read through
    classmethods, so a call to :meth:`register` replaces the configuration
    seen by every caller that consults this class.
    """

    # Exact paths, or prefix patterns ending in "*", that are never logged.
    _excluded_paths: Set[str] = set()
    # When False, only responses with a status code >= 400 are logged.
    _log_all_requests: bool = True
    # Whether response bodies are included in log entries.
    _log_response_bodies: bool = False
    # Number of log entries to batch before committing.
    _batch_size: int = 10
    # Maps a path fragment to the log type returned for paths containing it.
    _log_types: Dict[str, str] = {}

    @classmethod
    def register(
        cls,
        excluded_paths: Optional[List[str]] = None,
        log_all_requests: bool = True,
        log_response_bodies: bool = False,
        batch_size: int = 10,
        log_types: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Register audit service configuration.

        Every call overwrites all settings; arguments that are omitted fall
        back to their defaults rather than keeping the previous value.

        Args:
            excluded_paths: Paths to exclude from logging (e.g., /health, /docs).
                An entry ending in "*" excludes every path starting with the
                text before the "*".
            log_all_requests: If True, log all requests. If False, only log errors
            log_response_bodies: If True, include response body in logs (privacy risk!)
            batch_size: Number of logs to batch before committing
            log_types: Map of path patterns to log types (client/server)

        Example:
            AuditServiceConfig.register(
                excluded_paths=["/health", "/docs", "/openapi.json"],
                log_all_requests=True,
                log_response_bodies=False
            )
        """
        cls._excluded_paths = set(excluded_paths or [])
        cls._log_all_requests = log_all_requests
        cls._log_response_bodies = log_response_bodies
        cls._batch_size = batch_size
        # Copy the mapping so that later mutation of the caller's dict cannot
        # silently change the active configuration.
        cls._log_types = dict(log_types) if log_types else {}

        # Lazy %-style arguments: the message is only formatted when the
        # INFO level is actually enabled for this logger.
        logger.info(
            "Audit Service configured: "
            "excluded_paths=%d, log_all=%s, log_bodies=%s",
            len(cls._excluded_paths),
            log_all_requests,
            log_response_bodies,
        )

    @classmethod
    def is_excluded(cls, path: str) -> bool:
        """Check if a path should be excluded from logging.

        A path is excluded when it equals a registered entry exactly, or when
        it starts with the prefix of a registered entry that ends in "*".
        """
        # Exact match (O(1) set lookup) before scanning wildcard patterns.
        if path in cls._excluded_paths:
            return True

        # Prefix match for wildcard patterns: "/static/*" excludes any path
        # beginning with "/static/".
        return any(
            pattern.endswith("*") and path.startswith(pattern[:-1])
            for pattern in cls._excluded_paths
        )

    @classmethod
    def should_log(cls, path: str, status_code: int) -> bool:
        """Determine if request should be logged.

        Excluded paths are never logged. Otherwise everything is logged when
        ``log_all_requests`` is enabled, and only responses with a status
        code of 400 or above when it is disabled.
        """
        if cls.is_excluded(path):
            return False

        if cls._log_all_requests:
            return True

        # Only log errors if not logging all
        return status_code >= 400

    @classmethod
    def get_log_type(cls, path: str) -> str:
        """Get log type for a path (client/server).

        Patterns are matched as plain substrings of ``path`` and checked in
        the insertion order of the registered mapping; the first match wins.
        Returns "server" when no pattern matches.
        """
        for pattern, log_type in cls._log_types.items():
            # A substring test already covers the prefix case, so no separate
            # startswith() check is needed.
            if pattern in path:
                return log_type

        # Default to server log type
        return "server"

    @classmethod
    def get_config(cls) -> dict:
        """Get current configuration.

        The returned containers are copies, so mutating them does not affect
        the active configuration. ``excluded_paths`` is sorted to give a
        stable order independent of set iteration order.
        """
        return {
            "excluded_paths": sorted(cls._excluded_paths),
            "log_all_requests": cls._log_all_requests,
            "log_response_bodies": cls._log_response_bodies,
            "batch_size": cls._batch_size,
            "log_types": dict(cls._log_types),
        }