# cardserver/app/services/auth_logger.py
# GitHub Actions
# 🚀 Auto-deploy from GitHub
# 1d89d54
"""
Authentication event logging service with file output
"""
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Log destination: a "logs" directory relative to the current working
# directory, created on import if it is missing.
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)
auth_log_file = logs_dir.joinpath("auth_events.log")

# Single line layout shared by the file and console outputs.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# File output: records every event at INFO and above.
file_handler = logging.FileHandler(auth_log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)

# Console output: only WARNING and above is echoed.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(formatter)

# Dedicated logger for authentication events. Propagation is switched off
# so the root logger does not emit the same record a second time.
auth_logger = logging.getLogger("auth_events")
auth_logger.setLevel(logging.INFO)
auth_logger.propagate = False
auth_logger.addHandler(file_handler)
auth_logger.addHandler(console_handler)
class AuthEventLogger:
    """Structured logging of authentication events.

    Each public method describes one event as a JSON object and writes it
    through the module-level ``auth_logger``. A truthy ``success`` is logged
    at INFO with a ``<LABEL>_SUCCESS`` prefix; anything else is logged at
    WARNING with a ``<LABEL>_FAILED`` prefix.
    """

    @staticmethod
    def _emit(
        event: str,
        label: str,
        success: bool,
        subject: dict,
        outcome: dict,
    ) -> None:
        """Build, serialize and log one event record.

        Args:
            event: Value stored under the ``"event"`` key.
            label: Upper-case message prefix, without the
                ``_SUCCESS`` / ``_FAILED`` suffix.
            success: Whether the operation succeeded; selects level and suffix.
            subject: Fields placed between ``"event"`` and ``"success"``.
            outcome: Fields placed after ``"timestamp"``.
        """
        event_data = {
            "event": event,
            **subject,
            "success": success,
            # Naive UTC ISO-8601 text, identical to what the deprecated
            # ``datetime.utcnow().isoformat()`` produced.
            "timestamp": datetime.now(timezone.utc)
            .replace(tzinfo=None)
            .isoformat(),
            **outcome,
        }
        # ``default=str`` keeps a non-JSON-native value (for example a UUID
        # user id) from raising inside the authentication request path.
        payload = json.dumps(event_data, default=str)

        # ``stacklevel=2`` attributes the record to the public method that
        # called this helper rather than to the helper itself.
        if success:
            auth_logger.info(f"{label}_SUCCESS: {payload}", stacklevel=2)
        else:
            auth_logger.warning(f"{label}_FAILED: {payload}", stacklevel=2)

    @staticmethod
    def log_login_attempt(
        username: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        user_id: Optional[str] = None,
        failure_reason: Optional[str] = None
    ):
        """Log a login attempt as ``LOGIN_SUCCESS`` or ``LOGIN_FAILED``."""
        AuthEventLogger._emit(
            "login_attempt",
            "LOGIN",
            success,
            {
                "username": username,
                "client_ip": client_ip,
                "user_agent": user_agent,
            },
            {"user_id": user_id, "failure_reason": failure_reason},
        )

    @staticmethod
    def log_registration_attempt(
        username: str,
        email: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        user_id: Optional[str] = None,
        failure_reason: Optional[str] = None
    ):
        """Log a registration attempt as ``REGISTRATION_SUCCESS`` / ``_FAILED``."""
        AuthEventLogger._emit(
            "registration_attempt",
            "REGISTRATION",
            success,
            {
                "username": username,
                "email": email,
                "client_ip": client_ip,
                "user_agent": user_agent,
            },
            {"user_id": user_id, "failure_reason": failure_reason},
        )

    @staticmethod
    def log_token_refresh(
        username: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        failure_reason: Optional[str] = None
    ):
        """Log a token refresh as ``TOKEN_REFRESH_SUCCESS`` / ``_FAILED``."""
        AuthEventLogger._emit(
            "token_refresh",
            "TOKEN_REFRESH",
            success,
            {
                "username": username,
                "client_ip": client_ip,
                "user_agent": user_agent,
            },
            {"failure_reason": failure_reason},
        )

    @staticmethod
    def log_logout(
        username: str,
        client_ip: str,
        user_agent: str,
        success: bool = False,
        failure_reason: Optional[str] = None
    ):
        """Log a logout as ``LOGOUT_SUCCESS`` or ``LOGOUT_FAILED``."""
        AuthEventLogger._emit(
            "logout",
            "LOGOUT",
            success,
            {
                "username": username,
                "client_ip": client_ip,
                "user_agent": user_agent,
            },
            {"failure_reason": failure_reason},
        )

    @staticmethod
    def log_credential_validation(
        username: str,
        success: bool = False,
        failure_reason: Optional[str] = None
    ):
        """Log a credential check as ``CREDENTIAL_VALIDATION_SUCCESS`` / ``_FAILED``."""
        AuthEventLogger._emit(
            "credential_validation",
            "CREDENTIAL_VALIDATION",
            success,
            {"username": username},
            {"failure_reason": failure_reason},
        )
def get_client_info(request) -> tuple[str, str]:
    """Return ``(client_ip, user_agent)`` for *request*.

    The IP is ``request.client.host`` when ``request.client`` is truthy, and
    the user agent is the ``"user-agent"`` entry of ``request.headers``.
    Either value falls back to the string ``"unknown"`` when unavailable.
    """
    peer = request.client
    if peer:
        ip_address = peer.host
    else:
        ip_address = "unknown"

    agent = request.headers.get("user-agent", "unknown")
    return ip_address, agent