Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, Depends, status, Body | |
| from sqlmodel import Session, select | |
| from typing import List, Dict, Any, Optional | |
| from uuid import UUID | |
| import logging | |
| from datetime import datetime | |
| import uuid | |
| from ..models.audit_log import AuditLog, AuditLogCreate | |
| from ..models.user import User | |
| from ..database import get_session_dep | |
| from ..utils.deps import get_current_user | |
| # Module-level setup: configure root logging at INFO, create this module's logger, | |
| # and declare the router under which the audit endpoints are grouped (/api/audit). | |
| # NOTE(review): basicConfig() runs at import time here; confirm the application | |
| # does not also configure logging centrally. | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/api/audit", tags=["audit"]) | |
async def receive_audit_event(
    event: Dict[str, Any] = Body(...),
    session: Session = Depends(get_session_dep)
):
    """
    Receives audit events from Dapr Pub/Sub (Kafka) and saves them to the database.

    This endpoint is called by the Dapr sidecar when events are published.

    The event must carry non-empty ``event_id``, ``event_type``, ``user_id``
    and ``task_id``. ``task_data`` is optional; only its ``title``,
    ``description`` and ``completed`` keys are stored.

    Returns a dict whose ``status`` is one of:
      * ``"success"`` - the event was stored (``id`` holds the new row id),
      * ``"skipped"`` - an entry with the same ``event_id`` already exists,
      * ``"error"``   - validation failed or saving raised an exception.

    This handler never raises: every failure is reported through the
    returned dict, after rolling back the session.
    """
    # NOTE(review): no route decorator is visible in this view of the file;
    # confirm the function is registered on `router` elsewhere.
    try:
        logger.info(f"Received audit event: {event}")

        # Extract event data.
        # NOTE(review): the event's "timestamp" field is not persisted here;
        # presumably AuditLog supplies its own timestamp - TODO confirm.
        event_id = event.get("event_id")
        event_type = event.get("event_type")
        user_id = event.get("user_id")
        task_id = event.get("task_id")
        task_data = event.get("task_data")
        if task_data is None:
            # An explicit JSON null is treated the same as a missing key.
            task_data = {}

        # Validate required fields
        if not all([event_id, event_type, user_id, task_id]):
            logger.warning(f"Missing required fields in event: {event}")
            return {"status": "error", "message": "Missing required fields"}

        # Check if event already exists (deduplication)
        existing = session.exec(
            select(AuditLog).where(AuditLog.event_id == event_id)
        ).first()
        if existing:
            logger.info(f"Event {event_id} already exists, skipping")
            return {"status": "skipped", "message": "Event already exists"}

        # A non-object payload cannot be read with .get(); report it
        # explicitly instead of surfacing an AttributeError message.
        if not isinstance(task_data, dict):
            logger.warning(f"Invalid task_data in event: {event}")
            return {"status": "error", "message": "task_data must be an object"}

        # Create audit log entry
        audit_log = AuditLog(
            event_id=event_id,
            event_type=event_type,
            user_id=user_id,
            task_id=task_id,
            event_data={
                "title": task_data.get("title", ""),
                "description": task_data.get("description", ""),
                "completed": task_data.get("completed", False),
            },
        )
        session.add(audit_log)
        session.commit()
        session.refresh(audit_log)

        logger.info(f"Audit event {event_id} saved successfully")
        return {"status": "success", "message": "Event saved", "id": audit_log.id}
    except Exception as e:
        # Deliberate catch-all: failures are reported in the response body
        # rather than raised to the caller.
        logger.error(f"Error saving audit event: {e}", exc_info=True)
        session.rollback()
        return {"status": "error", "message": str(e)}
async def create_audit_event(
    user_id: str,
    data: Dict[str, Any] = Body(...),
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session_dep)
):
    """
    Frontend API endpoint to create audit events for task operations.

    Called by the frontend when tasks are created, updated, completed, or deleted.

    Expected request body:
    {
        "event_type": "created|updated|completed|deleted",
        "task_id": 123,
        "event_data": {
            "title": "task title",
            "description": "optional description",
            "completed": false
        }
    }

    Returns a dict with ``status``, ``message``, the new row ``id`` and the
    generated ``event_id``.

    Raises:
        HTTPException 403: ``user_id`` does not match the authenticated user.
        HTTPException 400: a required field is missing, ``event_type`` is not
            one of the allowed values, or ``event_data`` is not an object.
        HTTPException 500: saving the audit log failed (session is rolled back).
    """
    # NOTE(review): no route decorator is visible in this view of the file;
    # confirm the function is registered on `router` elsewhere.

    # Verify that the user is creating audit events for themselves
    if str(current_user.id) != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Unauthorized"
        )

    # Extract fields from request body
    event_type = data.get("event_type")
    task_id = data.get("task_id")
    event_data = data.get("event_data")
    if event_data is None:
        # An explicit JSON null is treated the same as a missing key.
        event_data = {}

    # Validate required fields
    if not event_type or not task_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing required fields: event_type, task_id"
        )

    # Validate event type
    valid_event_types = ['created', 'updated', 'completed', 'deleted']
    if event_type not in valid_event_types:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid event type. Must be one of: {valid_event_types}"
        )

    # A malformed payload is a client error; without this check the .get()
    # calls below would raise AttributeError and be reported as a 500.
    if not isinstance(event_data, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="event_data must be an object"
        )

    # Generate unique event ID
    event_id = str(uuid.uuid4())

    # Only the persistence step is guarded: validation errors above are
    # raised directly as HTTPException and need no pass-through handler.
    try:
        audit_log = AuditLog(
            event_id=event_id,
            event_type=event_type,
            user_id=user_id,
            task_id=task_id,
            event_data={
                "title": event_data.get("title", ""),
                "description": event_data.get("description", ""),
                "completed": event_data.get("completed", False),
            },
        )
        session.add(audit_log)
        session.commit()
        session.refresh(audit_log)
    except Exception as e:
        logger.error(f"Error creating audit event: {e}", exc_info=True)
        session.rollback()
        # NOTE(review): the detail echoes the internal exception text to the
        # client; consider a generic message if that is not intended.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create audit event: {str(e)}"
        ) from e

    logger.info(f"Audit event {event_id} created successfully for user {user_id}")
    return {
        "status": "success",
        "message": "Audit event created",
        "id": audit_log.id,
        "event_id": audit_log.event_id
    }
async def get_user_audit_events(
    user_id: UUID,
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session_dep),
    offset: int = 0,
    limit: int = 50
):
    """
    Get a page of audit events for a specific user, newest first.

    Args:
        user_id: Id of the user whose events are requested; must equal the
            authenticated user's id.
        offset: Number of events to skip (must be >= 0).
        limit: Maximum number of events to return (must be >= 0).

    Returns:
        A dict with ``events`` (list of serialised audit logs), ``total``
        (number of events the user has, ignoring pagination), and the
        ``offset`` / ``limit`` that were applied.

    Raises:
        HTTPException 404: ``user_id`` is not the authenticated user. 404 is
            used rather than 403, matching the "User not found" detail.
        HTTPException 400: ``offset`` or ``limit`` is negative.
    """
    # NOTE(review): no route decorator is visible in this view of the file;
    # confirm the function is registered on `router` elsewhere.

    # Local import keeps this block self-contained; sqlmodel is already a
    # dependency of this module.
    from sqlmodel import func

    # Verify that the user_id matches the authenticated user
    if current_user.id != user_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Negative paging values behave differently across database backends
    # (error vs. "no limit"), so reject them up front.
    if offset < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="offset and limit must be non-negative"
        )

    # AuditLog.user_id is compared against the string form of the UUID.
    owner_filter = AuditLog.user_id == str(user_id)

    # Count in the database instead of loading every row just to call len().
    total_count = session.exec(
        select(func.count()).select_from(AuditLog).where(owner_filter)
    ).one()

    # Fetch the requested page, newest first
    page_query = (
        select(AuditLog)
        .where(owner_filter)
        .order_by(AuditLog.timestamp.desc())
        .offset(offset)
        .limit(limit)
    )
    audit_logs = session.exec(page_query).all()

    # Convert to plain dicts for the JSON response
    events = [
        {
            "id": log.id,
            "event_id": log.event_id,
            "event_type": log.event_type,
            "user_id": log.user_id,
            "task_id": log.task_id,
            "event_data": log.event_data,
            "timestamp": log.timestamp.isoformat() if log.timestamp else None
        }
        for log in audit_logs
    ]

    return {
        "events": events,
        "total": total_count,
        "offset": offset,
        "limit": limit
    }