Spaces:
Sleeping
Sleeping
| """ | |
| Authentication event logging service with file output | |
| """ | |
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Create logs directory if it doesn't exist
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)

# Dedicated logger for authentication events. logging.getLogger() hands back
# the same Logger object for the same name, so any handlers attached by an
# earlier execution of this module (importlib.reload, or the module being
# imported under a second path) are still present on it.
auth_logger = logging.getLogger("auth_events")
auth_logger.setLevel(logging.INFO)

# Destination file for authentication logs
auth_log_file = logs_dir / "auth_events.log"

# Formatter shared by both handlers
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Index the handlers already attached by their exact type. Exact-type lookup
# matters because FileHandler is a subclass of StreamHandler; an isinstance()
# test would mistake the file handler for the console handler.
_existing_handlers = {type(handler): handler for handler in auth_logger.handlers}

# File handler: records everything from INFO upward. Reuse the one already
# attached instead of adding a second, which would write every event twice.
file_handler = _existing_handlers.get(logging.FileHandler)
if file_handler is None:
    file_handler = logging.FileHandler(auth_log_file)
    auth_logger.addHandler(file_handler)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)

# Console handler: only WARNING and above (i.e. the *_FAILED events)
console_handler = _existing_handlers.get(logging.StreamHandler)
if console_handler is None:
    console_handler = logging.StreamHandler()
    auth_logger.addHandler(console_handler)
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(formatter)

# Prevent propagation to root logger to avoid duplicate logs
auth_logger.propagate = False
class AuthEventLogger:
    """Authentication event logging with structured file output.

    Each ``log_*`` method builds a dictionary describing one event, serializes
    it with ``json.dumps`` and writes it through the module-level
    ``auth_logger``: at INFO level with a ``<EVENT>_SUCCESS`` prefix when
    ``success`` is truthy, otherwise at WARNING level with a
    ``<EVENT>_FAILED`` prefix.

    All methods are static, so they can be called either on the class
    (``AuthEventLogger.log_logout(...)``) or on an instance.
    """

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # datetime.utcnow() is deprecated since Python 3.12. Taking an aware
        # "now" and dropping tzinfo yields the same naive value, so the
        # timestamp format in the log (no "+00:00" suffix) is unchanged.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _emit(prefix: str, event_data: dict) -> None:
        """Serialize *event_data* and log it under ``<prefix>_SUCCESS``/``<prefix>_FAILED``.

        Args:
            prefix: Upper-case event label placed before the outcome suffix.
            event_data: Event payload; its ``"success"`` entry selects the
                outcome suffix and the log level.
        """
        payload = json.dumps(event_data)
        if event_data["success"]:
            auth_logger.info(f"{prefix}_SUCCESS: {payload}")
        else:
            auth_logger.warning(f"{prefix}_FAILED: {payload}")

    @staticmethod
    def log_login_attempt(
        username: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        user_id: Optional[str] = None,
        failure_reason: Optional[str] = None,
    ) -> None:
        """Log a login attempt.

        Args:
            username: Name supplied for the login.
            client_ip: Address the request came from.
            user_agent: User-agent string of the client.
            success: Whether the login succeeded.
            user_id: Identifier of the user, if known.
            failure_reason: Explanation recorded for a failed attempt.
        """
        AuthEventLogger._emit(
            "LOGIN",
            {
                "event": "login_attempt",
                "username": username,
                "client_ip": client_ip,
                "user_agent": user_agent,
                "success": success,
                "timestamp": AuthEventLogger._utc_timestamp(),
                "user_id": user_id,
                "failure_reason": failure_reason,
            },
        )

    @staticmethod
    def log_registration_attempt(
        username: str,
        email: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        user_id: Optional[str] = None,
        failure_reason: Optional[str] = None,
    ) -> None:
        """Log a registration attempt.

        Args:
            username: Name supplied for the new account.
            email: E-mail address supplied for the new account.
            client_ip: Address the request came from.
            user_agent: User-agent string of the client.
            success: Whether the registration succeeded.
            user_id: Identifier of the user, if known.
            failure_reason: Explanation recorded for a failed attempt.
        """
        AuthEventLogger._emit(
            "REGISTRATION",
            {
                "event": "registration_attempt",
                "username": username,
                "email": email,
                "client_ip": client_ip,
                "user_agent": user_agent,
                "success": success,
                "timestamp": AuthEventLogger._utc_timestamp(),
                "user_id": user_id,
                "failure_reason": failure_reason,
            },
        )

    @staticmethod
    def log_token_refresh(
        username: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        failure_reason: Optional[str] = None,
    ) -> None:
        """Log a token refresh.

        Args:
            username: Name of the user the token belongs to.
            client_ip: Address the request came from.
            user_agent: User-agent string of the client.
            success: Whether the refresh succeeded.
            failure_reason: Explanation recorded for a failed refresh.
        """
        AuthEventLogger._emit(
            "TOKEN_REFRESH",
            {
                "event": "token_refresh",
                "username": username,
                "client_ip": client_ip,
                "user_agent": user_agent,
                "success": success,
                "timestamp": AuthEventLogger._utc_timestamp(),
                "failure_reason": failure_reason,
            },
        )

    @staticmethod
    def log_logout(
        username: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        failure_reason: Optional[str] = None,
    ) -> None:
        """Log a logout.

        Args:
            username: Name of the user logging out.
            client_ip: Address the request came from.
            user_agent: User-agent string of the client.
            success: Whether the logout succeeded.
            failure_reason: Explanation recorded for a failed logout.
        """
        AuthEventLogger._emit(
            "LOGOUT",
            {
                "event": "logout",
                "username": username,
                "client_ip": client_ip,
                "user_agent": user_agent,
                "success": success,
                "timestamp": AuthEventLogger._utc_timestamp(),
                "failure_reason": failure_reason,
            },
        )

    @staticmethod
    def log_credential_validation(
        username: str,
        success: bool = False,
        failure_reason: Optional[str] = None,
    ) -> None:
        """Log a credential validation attempt.

        Args:
            username: Name whose credentials were checked.
            success: Whether the credentials were accepted.
            failure_reason: Explanation recorded for a rejected validation.
        """
        AuthEventLogger._emit(
            "CREDENTIAL_VALIDATION",
            {
                "event": "credential_validation",
                "username": username,
                "success": success,
                "timestamp": AuthEventLogger._utc_timestamp(),
                "failure_reason": failure_reason,
            },
        )
def get_client_info(request) -> tuple[str, str]:
    """Return ``(client_ip, user_agent)`` taken from *request*.

    Args:
        request: Object exposing ``client`` (falsy, or something with a
            ``host`` attribute) and ``headers`` (supporting ``.get``).
            Presumably a web-framework request object; confirm with callers.

    Returns:
        The client's host and the ``user-agent`` header value; either one
        is ``"unknown"`` when it is not available.
    """
    peer = request.client
    if peer:
        ip_address = peer.host
    else:
        ip_address = "unknown"
    agent = request.headers.get("user-agent", "unknown")
    return ip_address, agent