# swiftops-backend/src/app/services/audit_service.py
# Commit 70ac999 (kamau1): Enhance audit and project services with
# idempotent behavior and JSON serialization
"""
Audit Service - Centralized audit logging
"""
import logging
import json
from decimal import Decimal
from datetime import datetime, date
from typing import Optional, Dict, Any
from sqlalchemy.orm import Session
from fastapi import Request
from app.models.audit_log import AuditLog
from app.models.user import User
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
def _serialize_for_json(obj: Any) -> Any:
    """
    Recursively convert a value into something ``json.dumps`` can encode.

    Handles:
    - None / str / int / float / bool -> returned unchanged
    - Decimal -> float (non-finite Decimals such as NaN/Infinity -> string)
    - datetime/date -> ISO 8601 string
    - dict -> dict with converted values; keys that JSON cannot encode
      are converted to strings
    - list/tuple/set/frozenset -> list of converted items
    - Any other object (UUID, plain Enum, custom classes, ...) -> str(obj)

    Args:
        obj: Arbitrary value, possibly nested.

    Returns:
        A structure composed only of JSON-encodable builtin types.
    """
    # Primitives are already JSON-compatible. This check also keeps bool
    # and str subclasses (e.g. ``class Status(str, Enum)``) untouched.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, Decimal):
        # float() raises on signalling NaN, and NaN/Infinity floats are not
        # valid JSON, so fall back to the textual form for those values.
        return float(obj) if obj.is_finite() else str(obj)
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, dict):
        return {
            # json.dumps only accepts str/int/float/bool/None as keys.
            (
                key
                if key is None or isinstance(key, (str, int, float, bool))
                else str(key)
            ): _serialize_for_json(value)
            for key, value in obj.items()
        }
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [_serialize_for_json(item) for item in obj]
    # Documented fallback: anything else is stored as its string form so a
    # single exotic value cannot make the whole audit payload unencodable.
    return str(obj)
class AuditService:
    """Service for logging user actions to the audit trail.

    All methods are static; the caller supplies the database session and
    remains responsible for committing it.
    """

    @staticmethod
    def log_action(
        db: Session,
        action: str,
        entity_type: str,
        description: str,
        user: Optional[User] = None,
        entity_id: Optional[str] = None,
        changes: Optional[Dict[str, Any]] = None,
        request: Optional[Request] = None,
        additional_metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[AuditLog]:
        """
        Log an action to the audit trail.

        The record is flushed, not committed: it becomes durable only when
        the caller commits the surrounding transaction. The insert runs
        inside a SAVEPOINT so that a failing audit write is undone on its
        own and the caller's uncommitted work is left intact.
        NOTE(review): this relies on the database backend supporting
        SAVEPOINT - confirm for the deployed engine.

        Args:
            db: Database session
            action: Action type ('login', 'create', 'update', 'delete', etc.)
            entity_type: Type of entity ('user', 'ticket', 'project', etc.)
            description: Human-readable description
            user: User who performed the action (optional for system actions)
            entity_id: ID of the affected entity (optional)
            changes: Dict of changes {"old": {...}, "new": {...}}
            request: FastAPI request object (for IP, user agent)
            additional_metadata: Additional context

        Returns:
            The created AuditLog record, or None if audit logging failed
            (failures are logged and never raised to the caller).
        """
        try:
            # Extract request context
            ip_address = None
            user_agent = None
            if request:
                # Prefer the direct peer address; fall back to the first
                # X-Forwarded-For entry only when no peer address exists.
                ip_address = request.client.host if request.client else None
                if not ip_address:
                    forwarded_for = request.headers.get("x-forwarded-for")
                    if forwarded_for:
                        ip_address = forwarded_for.split(",")[0].strip()
                user_agent = request.headers.get("user-agent")

            # Sanitize changes and metadata for JSON serialization
            sanitized_changes = _serialize_for_json(changes or {})
            sanitized_metadata = _serialize_for_json(additional_metadata or {})

            # Create audit log entry
            audit_log = AuditLog(
                user_id=user.id if user else None,
                user_email=user.email if user else None,
                user_role=user.role if user else None,
                action=action,
                entity_type=entity_type,
                entity_id=entity_id,
                description=description,
                changes=sanitized_changes,
                ip_address=ip_address,
                user_agent=user_agent,
                additional_metadata=sanitized_metadata
            )

            # SAVEPOINT: if the insert fails, only the audit row is rolled
            # back. A plain db.rollback() here would discard everything the
            # caller has done in this transaction while still returning
            # normally, silently losing the main operation's changes.
            with db.begin_nested():
                db.add(audit_log)
                # PERFORMANCE: Flush instead of commit - parent transaction
                # will commit. This batches audit logs with the main
                # operation, reducing DB round trips.
                db.flush()

            logger.info(
                "Audit log created: %s on %s by %s",
                action,
                entity_type,
                user.email if user else 'system',
            )
            return audit_log
        except Exception as exc:
            # Don't fail the main operation if audit logging fails; keep the
            # traceback in the log so the failure can still be diagnosed.
            logger.exception("Failed to create audit log: %s", exc)
            return None

    @staticmethod
    def log_auth_event(
        db: Session,
        action: str,
        user_email: str,
        success: bool,
        request: Optional[Request] = None,
        reason: Optional[str] = None,
        user: Optional[Any] = None
    ) -> Optional[AuditLog]:
        """
        Log authentication events (login, logout, login_failed).

        Builds a description and metadata for the event and delegates to
        ``log_action`` with ``entity_type='auth'``. When ``success`` is
        False the stored action is always 'login_failed', regardless of
        the ``action`` argument (which still appears in the description).

        Args:
            db: Database session
            action: 'login', 'logout', or 'login_failed'
            user_email: Email of user attempting action
            success: Whether the action succeeded
            request: FastAPI request object
            reason: Reason for failure (if applicable)
            user: User object (if authenticated) - for successful logins

        Returns:
            The created AuditLog record, or None if audit logging failed.
        """
        description = f"{'Successful' if success else 'Failed'} {action} attempt for {user_email}"
        if reason:
            description += f": {reason}"

        metadata = {
            "success": success,
            "email": user_email
        }
        if reason:
            metadata["reason"] = reason

        return AuditService.log_action(
            db=db,
            action=action if success else 'login_failed',
            entity_type='auth',
            description=description,
            user=user,  # Pass user if provided (successful auth)
            request=request,
            additional_metadata=metadata
        )