Spaces:
Sleeping
Sleeping
| """ | |
| Audit Service - Centralized audit logging | |
| """ | |
| import logging | |
| import json | |
| from decimal import Decimal | |
| from datetime import datetime, date | |
| from typing import Optional, Dict, Any | |
| from sqlalchemy.orm import Session | |
| from fastapi import Request | |
| from app.models.audit_log import AuditLog | |
| from app.models.user import User | |
| logger = logging.getLogger(__name__) | |
def _serialize_for_json(obj: Any) -> Any:
    """
    Recursively convert a value into a JSON-compatible structure.

    Handles:
    - Decimal -> float (note: may lose precision for very large/precise values)
    - datetime/date -> ISO 8601 string
    - dict -> dict with serialized values; keys that JSON cannot represent
      (anything other than str/int/float/bool/None) are converted with str()
    - list/tuple/set/frozenset -> list of serialized items
    - None/bool/int/float/str -> returned unchanged
    - Other objects -> string representation

    Args:
        obj: Arbitrary value, possibly nested.

    Returns:
        A structure built only from dict, list, str, int, float, bool and None.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    # datetime is a subclass of date; both expose isoformat().
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, dict):
        return {
            (
                key
                if key is None or isinstance(key, (str, int, float, bool))
                else str(key)
            ): _serialize_for_json(value)
            for key, value in obj.items()
        }
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [_serialize_for_json(item) for item in obj]
    # JSON-native scalars need no conversion.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # Fallback promised by the contract: anything else (UUID, Enum, custom
    # objects, ...) is stored as its string representation instead of being
    # passed through and failing later during JSON encoding.
    return str(obj)
class AuditService:
    """Service for logging user actions to audit trail.

    Both methods are static: they take the database session explicitly and
    keep no state on the class, so they can be called either as
    ``AuditService.log_action(...)`` or on an instance.
    """

    @staticmethod
    def log_action(
        db: Session,
        action: str,
        entity_type: str,
        description: str,
        user: Optional[User] = None,
        entity_id: Optional[str] = None,
        changes: Optional[Dict[str, Any]] = None,
        request: Optional[Request] = None,
        additional_metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[AuditLog]:
        """
        Log an action to the audit trail.

        The record is added and flushed inside a SAVEPOINT of the caller's
        transaction; it is NOT committed here. The caller's commit persists
        it together with the main operation.

        Args:
            db: Database session
            action: Action type ('login', 'create', 'update', 'delete', etc.)
            entity_type: Type of entity ('user', 'ticket', 'project', etc.)
            description: Human-readable description
            user: User who performed the action (optional for system actions)
            entity_id: ID of the affected entity (optional)
            changes: Dict of changes {"old": {...}, "new": {...}}
            request: FastAPI request object (for IP, user agent)
            additional_metadata: Additional context

        Returns:
            Created AuditLog record, or None if the audit entry could not be
            created (the failure is logged and never raised to the caller).
        """
        try:
            # Extract request context
            ip_address = None
            user_agent = None
            if request:
                # Prefer the direct peer address; fall back to the first
                # X-Forwarded-For entry only when no peer address is known.
                # NOTE(review): behind a reverse proxy request.client.host is
                # presumably the proxy's address - confirm this is intended.
                ip_address = request.client.host if request.client else None
                if not ip_address:
                    forwarded_for = request.headers.get("x-forwarded-for")
                    if forwarded_for:
                        ip_address = forwarded_for.split(",")[0].strip()
                user_agent = request.headers.get("user-agent")

            # Create audit log entry; changes and metadata are sanitized so
            # they can be stored as JSON.
            audit_log = AuditLog(
                user_id=user.id if user else None,
                user_email=user.email if user else None,
                user_role=user.role if user else None,
                action=action,
                entity_type=entity_type,
                entity_id=entity_id,
                description=description,
                changes=_serialize_for_json(changes or {}),
                ip_address=ip_address,
                user_agent=user_agent,
                additional_metadata=_serialize_for_json(additional_metadata or {})
            )

            # Isolate the audit insert in a SAVEPOINT. If it fails, only the
            # audit row is rolled back; a plain db.rollback() would discard
            # the caller's pending work as well.
            # NOTE(review): requires a backend/driver with working SAVEPOINT
            # support - confirm for the deployed database.
            with db.begin_nested():
                db.add(audit_log)
                # PERFORMANCE: Flush instead of commit - parent transaction
                # will commit, batching the audit log with the main operation.
                db.flush()

            logger.info(
                "Audit log created: %s on %s by %s",
                action,
                entity_type,
                user.email if user else 'system',
            )
            return audit_log
        except Exception as e:
            # Don't fail the main operation if audit logging fails: log the
            # error (with traceback) and report the failure as None.
            logger.exception("Failed to create audit log: %s", e)
            return None

    @staticmethod
    def log_auth_event(
        db: Session,
        action: str,
        user_email: str,
        success: bool,
        request: Optional[Request] = None,
        reason: Optional[str] = None,
        user: Optional[Any] = None
    ) -> Optional[AuditLog]:
        """
        Log authentication events (login, logout, login_failed).

        Any unsuccessful event is recorded with action 'login_failed',
        regardless of the ``action`` value passed in; the original action
        name still appears in the description.

        Args:
            db: Database session
            action: 'login', 'logout', or 'login_failed'
            user_email: Email of user attempting action
            success: Whether the action succeeded
            request: FastAPI request object
            reason: Reason for failure (if applicable)
            user: User object (if authenticated) - for successful logins

        Returns:
            Created AuditLog record, or None if audit logging failed.
        """
        description = f"{'Successful' if success else 'Failed'} {action} attempt for {user_email}"
        metadata = {
            "success": success,
            "email": user_email
        }
        if reason:
            description += f": {reason}"
            metadata["reason"] = reason

        return AuditService.log_action(
            db=db,
            action=action if success else 'login_failed',
            entity_type='auth',
            description=description,
            user=user,  # Pass user if provided (successful auth)
            request=request,
            additional_metadata=metadata
        )