""" Authentication event logging service with file output """ import logging import json from datetime import datetime from typing import Optional from pathlib import Path # Create logs directory if it doesn't exist logs_dir = Path("logs") logs_dir.mkdir(exist_ok=True) # Create a specific logger for authentication events auth_logger = logging.getLogger("auth_events") auth_logger.setLevel(logging.INFO) # Create file handler for authentication logs auth_log_file = logs_dir / "auth_events.log" file_handler = logging.FileHandler(auth_log_file) file_handler.setLevel(logging.INFO) # Create console handler for important events console_handler = logging.StreamHandler() console_handler.setLevel(logging.WARNING) # Create formatter for structured logging formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # Add handlers to logger auth_logger.addHandler(file_handler) auth_logger.addHandler(console_handler) # Prevent propagation to root logger to avoid duplicate logs auth_logger.propagate = False class AuthEventLogger: """Authentication event logging with structured file output""" @staticmethod def log_login_attempt( username: str, client_ip: str, user_agent: str, success: bool = False, user_id: Optional[str] = None, failure_reason: Optional[str] = None ): """Log a login attempt""" event_data = { "event": "login_attempt", "username": username, "client_ip": client_ip, "user_agent": user_agent, "success": success, "timestamp": datetime.utcnow().isoformat(), "user_id": user_id, "failure_reason": failure_reason } if success: auth_logger.info(f"LOGIN_SUCCESS: {json.dumps(event_data)}") else: auth_logger.warning(f"LOGIN_FAILED: {json.dumps(event_data)}") @staticmethod def log_registration_attempt( username: str, email: str, client_ip: str, user_agent: str, success: bool = False, user_id: Optional[str] = None, failure_reason: Optional[str] = None ): """Log a registration attempt""" event_data = { "event": "registration_attempt", "username": username, "email": email, "client_ip": client_ip, "user_agent": user_agent, "success": success, "timestamp": datetime.utcnow().isoformat(), "user_id": user_id, "failure_reason": failure_reason } if success: auth_logger.info(f"REGISTRATION_SUCCESS: {json.dumps(event_data)}") else: auth_logger.warning(f"REGISTRATION_FAILED: {json.dumps(event_data)}") @staticmethod def log_token_refresh( username: str, client_ip: str, user_agent: str, success: bool = False, failure_reason: Optional[str] = None ): """Log a token refresh""" event_data = { "event": "token_refresh", "username": username, "client_ip": client_ip, "user_agent": user_agent, "success": success, "timestamp": datetime.utcnow().isoformat(), "failure_reason": failure_reason } if success: auth_logger.info(f"TOKEN_REFRESH_SUCCESS: {json.dumps(event_data)}") else: auth_logger.warning(f"TOKEN_REFRESH_FAILED: {json.dumps(event_data)}") @staticmethod def log_logout( username: str, client_ip: str, user_agent: str, success: bool = False, failure_reason: Optional[str] = None ): """Log a logout""" event_data = { "event": "logout", "username": username, "client_ip": client_ip, "user_agent": user_agent, "success": success, "timestamp": datetime.utcnow().isoformat(), "failure_reason": failure_reason } if success: auth_logger.info(f"LOGOUT_SUCCESS: {json.dumps(event_data)}") else: auth_logger.warning(f"LOGOUT_FAILED: {json.dumps(event_data)}") @staticmethod def log_credential_validation( username: str, success: bool = False, failure_reason: Optional[str] = None ): """Log credential validation attempt""" event_data = { "event": "credential_validation", "username": username, "success": success, "timestamp": datetime.utcnow().isoformat(), "failure_reason": failure_reason } if success: auth_logger.info(f"CREDENTIAL_VALIDATION_SUCCESS: {json.dumps(event_data)}") else: auth_logger.warning(f"CREDENTIAL_VALIDATION_FAILED: {json.dumps(event_data)}") def get_client_info(request) -> tuple[str, str]: """Extract client IP and user agent from request""" client_ip = request.client.host if request.client else "unknown" user_agent = request.headers.get("user-agent", "unknown") return client_ip, user_agent