File size: 5,758 Bytes
31f0e50 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | """
Logging Configuration Module.
Provides structured logging for:
- API request/response logging
- Detection event logging
- Error logging
- Audit logging
"""
import logging
import sys
from typing import Optional, Dict, Any
from datetime import datetime
# Default logging configuration
LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
def setup_logging(
level: str = "INFO",
log_format: Optional[str] = None,
log_file: Optional[str] = None,
) -> None:
"""
Configure logging for the application.
Args:
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_format: Custom log format string
log_file: Optional file path for file logging
"""
# Get log level
log_level = getattr(logging, level.upper(), logging.INFO)
# Configure root logger
handlers = []
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(logging.Formatter(
log_format or LOG_FORMAT,
LOG_DATE_FORMAT,
))
handlers.append(console_handler)
# File handler (optional)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(log_level)
file_handler.setFormatter(logging.Formatter(
log_format or LOG_FORMAT,
LOG_DATE_FORMAT,
))
handlers.append(file_handler)
# Configure root logger
logging.basicConfig(
level=log_level,
format=log_format or LOG_FORMAT,
datefmt=LOG_DATE_FORMAT,
handlers=handlers,
)
def get_logger(name: str) -> logging.Logger:
"""
Get a logger instance.
Args:
name: Logger name (typically __name__)
Returns:
Logger instance
"""
return logging.getLogger(name)
class RequestLogger:
"""Logger for API requests."""
def __init__(self) -> None:
"""Initialize request logger."""
self.logger = get_logger("scamshield.request")
def log_request(
self,
method: str,
path: str,
client_ip: Optional[str] = None,
request_id: Optional[str] = None,
) -> None:
"""
Log incoming API request.
Args:
method: HTTP method
path: Request path
client_ip: Client IP address
request_id: Unique request identifier
"""
self.logger.info(
f"REQUEST | {method} {path} | client={client_ip} | request_id={request_id}"
)
def log_response(
self,
status_code: int,
duration_ms: int,
request_id: Optional[str] = None,
) -> None:
"""
Log API response.
Args:
status_code: HTTP status code
duration_ms: Response time in milliseconds
request_id: Request identifier
"""
self.logger.info(
f"RESPONSE | status={status_code} | duration={duration_ms}ms | request_id={request_id}"
)
class DetectionLogger:
"""Logger for scam detection events."""
def __init__(self) -> None:
"""Initialize detection logger."""
self.logger = get_logger("scamshield.detection")
def log_detection(
self,
session_id: str,
scam_detected: bool,
confidence: float,
language: str,
indicators: Optional[list] = None,
) -> None:
"""
Log scam detection event.
Args:
session_id: Session identifier
scam_detected: Detection result
confidence: Detection confidence
language: Detected language
indicators: Matched scam indicators
"""
result = "SCAM" if scam_detected else "LEGITIMATE"
self.logger.info(
f"DETECTION | session={session_id} | result={result} | "
f"confidence={confidence:.2f} | language={language} | "
f"indicators={indicators}"
)
def log_extraction(
self,
session_id: str,
intel_summary: Dict[str, int],
confidence: float,
) -> None:
"""
Log intelligence extraction event.
Args:
session_id: Session identifier
intel_summary: Summary of extracted entities
confidence: Extraction confidence
"""
self.logger.info(
f"EXTRACTION | session={session_id} | "
f"entities={intel_summary} | confidence={confidence:.2f}"
)
class AuditLogger:
"""Logger for audit events."""
def __init__(self) -> None:
"""Initialize audit logger."""
self.logger = get_logger("scamshield.audit")
def log_event(
self,
event_type: str,
details: Dict[str, Any],
session_id: Optional[str] = None,
) -> None:
"""
Log audit event.
Args:
event_type: Type of event
details: Event details
session_id: Session identifier if applicable
"""
timestamp = datetime.utcnow().isoformat()
self.logger.info(
f"AUDIT | time={timestamp} | type={event_type} | "
f"session={session_id} | details={details}"
)
# Module-level logger instances
request_logger = RequestLogger()
detection_logger = DetectionLogger()
audit_logger = AuditLogger()
|