Spaces:
Sleeping
Sleeping
| """ | |
| Audit Logs API Endpoints | |
| """ | |
| from fastapi import APIRouter, Depends, Query | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy import desc, or_ | |
| from typing import List, Optional | |
| from datetime import datetime | |
| from app.api.deps import get_db, get_current_user | |
| from app.core.permissions import require_role | |
| from app.models.user import User | |
| from app.models.audit_log import AuditLog | |
| from app.schemas.audit_log import AuditLogResponse, AuditLogListResponse | |
| router = APIRouter() | |
def get_audit_logs(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    user_id: Optional[str] = Query(None, description="Filter by user ID"),
    action: Optional[str] = Query(None, description="Filter by action type"),
    entity_type: Optional[str] = Query(None, description="Filter by entity type"),
    search: Optional[str] = Query(None, description="Search in description or email"),
    start_date: Optional[str] = Query(None, description="Start date (ISO format)"),
    end_date: Optional[str] = Query(None, description="End date (ISO format)"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get audit logs with filtering and pagination

    **Authorization:**
    - Platform admins see all audit logs
    - Org admins see only logs from users in their organization
    - Org admins that are not linked to any organization get an empty page

    Query parameters:
    - skip: Number of records to skip (pagination)
    - limit: Number of records to return (max 1000)
    - user_id: Filter by specific user
    - action: Filter by action type (create, update, delete, login, etc.)
    - entity_type: Filter by entity type (user, ticket, project, etc.)
    - search: Search in description or user_email
    - start_date: Filter logs from this date (ISO format)
    - end_date: Filter logs until this date (ISO format)

    Returns a dict with `items`, `total` (count before pagination), `skip`
    and `limit`. Responds with 400 when a date bound is not valid ISO 8601.
    """
    from fastapi import HTTPException, status

    # Validate the date bounds before touching the database so that a
    # malformed value is reported as a client error instead of surfacing as a
    # driver-level failure. A trailing "Z" is normalised because
    # datetime.fromisoformat only accepts it from Python 3.11 onwards.
    try:
        start_at = (
            datetime.fromisoformat(start_date.replace("Z", "+00:00"))
            if start_date else None
        )
        end_at = (
            datetime.fromisoformat(end_date.replace("Z", "+00:00"))
            if end_date else None
        )
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="start_date and end_date must be ISO 8601 dates or datetimes"
        ) from None

    # Build query
    query = db.query(AuditLog)

    # Org scoping for org admins: only logs from users in their organization.
    # NOTE(review): every other role is left unscoped by this function;
    # confirm that access for non-admin roles is restricted at the route level.
    if current_user.role in ['client_admin', 'contractor_admin']:
        # Collect the IDs of all non-deleted users in the same org
        org_users_query = db.query(User.id).filter(User.deleted_at.is_(None))
        if current_user.client_id:
            org_users_query = org_users_query.filter(User.client_id == current_user.client_id)
        elif current_user.contractor_id:
            org_users_query = org_users_query.filter(User.contractor_id == current_user.contractor_id)
        else:
            # Fail closed: without an org id the user query above would be
            # unfiltered and the admin would see every organization's logs.
            return {
                "items": [],
                "total": 0,
                "skip": skip,
                "limit": limit
            }
        # IDs are stringified before being compared with AuditLog.user_id
        org_user_ids = [str(u.id) for u in org_users_query.all()]
        # Filter audit logs to only show actions by users in the org
        query = query.filter(AuditLog.user_id.in_(org_user_ids))

    # Apply filters
    if user_id:
        query = query.filter(AuditLog.user_id == user_id)
    if action:
        query = query.filter(AuditLog.action == action)
    if entity_type:
        query = query.filter(AuditLog.entity_type == entity_type)
    if search:
        # Case-insensitive substring match on description or user email
        search_term = f"%{search}%"
        query = query.filter(
            or_(
                AuditLog.description.ilike(search_term),
                AuditLog.user_email.ilike(search_term)
            )
        )
    if start_at is not None:
        query = query.filter(AuditLog.created_at >= start_at)
    if end_at is not None:
        query = query.filter(AuditLog.created_at <= end_at)

    # Get total count before pagination
    total = query.count()

    # Apply pagination and ordering (most recent first)
    audit_logs = query.order_by(desc(AuditLog.created_at)).offset(skip).limit(limit).all()

    return {
        "items": audit_logs,
        "total": total,
        "skip": skip,
        "limit": limit
    }
def get_audit_log(
    audit_log_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get a specific audit log entry by ID

    **Authorization:**
    - Platform admins can view any log
    - Org admins can only view logs from users in their organization

    Raises NotFoundException when no entry has the given ID, and responds
    with 403 when an org admin requests an entry that cannot be tied to a
    user in their own organization.
    """
    from app.core.exceptions import NotFoundException
    from fastapi import HTTPException, status

    audit_log = db.query(AuditLog).filter(AuditLog.id == audit_log_id).first()
    if not audit_log:
        raise NotFoundException("Audit log not found")

    # Org scoping check
    if current_user.role in ['client_admin', 'contractor_admin']:
        # Get the user who performed this action
        log_user = db.query(User).filter(User.id == audit_log.user_id).first()

        # Fail closed: the entry is only visible when the admin belongs to an
        # org AND the acting user exists AND that user is in the same org.
        # Previously a missing acting user, or an admin without any org id,
        # skipped every check and the entry was returned.
        admin_has_org = bool(current_user.client_id or current_user.contractor_id)
        outside_org = (
            not admin_has_org
            or log_user is None
            or (current_user.client_id and log_user.client_id != current_user.client_id)
            or (current_user.contractor_id and log_user.contractor_id != current_user.contractor_id)
        )
        if outside_org:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only view audit logs from your organization"
            )

    return audit_log
def get_user_audit_logs(
    user_id: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get all audit logs for a specific user

    **Authorization:**
    - Platform admins can view logs for any user
    - Org admins can only view logs for users in their organization

    Returns a dict with `items` (most recent first), `total` (count before
    pagination), `skip` and `limit`. For org admins, responds with 404 when
    the target user does not exist and 403 when the target user is outside
    the admin's organization or the admin has no organization.
    """
    from fastapi import HTTPException, status

    # Org scoping check - verify the target user is in the same org
    if current_user.role in ['client_admin', 'contractor_admin']:
        # Fail closed: an org admin with neither a client nor a contractor id
        # previously passed both comparisons below and could read any user's
        # logs.
        if not (current_user.client_id or current_user.contractor_id):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only view audit logs for users in your organization"
            )

        target_user = db.query(User).filter(User.id == user_id).first()
        if not target_user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        outside_org = (
            (current_user.client_id and target_user.client_id != current_user.client_id)
            or (current_user.contractor_id and target_user.contractor_id != current_user.contractor_id)
        )
        if outside_org:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only view audit logs for users in your organization"
            )

    query = db.query(AuditLog).filter(AuditLog.user_id == user_id)

    # Count before pagination so clients can page through the full set
    total = query.count()
    audit_logs = query.order_by(desc(AuditLog.created_at)).offset(skip).limit(limit).all()

    return {
        "items": audit_logs,
        "total": total,
        "skip": skip,
        "limit": limit
    }
# Leading characters that spreadsheet applications interpret as the start of a
# formula when a CSV file is opened.
_CSV_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")


def _csv_safe(value):
    """Return value with formula-like strings neutralised for spreadsheet use."""
    if isinstance(value, str) and value.startswith(_CSV_FORMULA_PREFIXES):
        # A leading apostrophe makes spreadsheet applications treat the cell
        # as text instead of evaluating it.
        return "'" + value
    return value


def export_audit_logs_csv(
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Export audit logs to CSV format (Platform Admin only)

    Query parameters:
    - start_date: Include logs from this date (ISO format)
    - end_date: Include logs until this date (ISO format)

    Returns a CSV attachment ordered most recent first. Text cells that begin
    with a spreadsheet formula character are prefixed with an apostrophe.
    Responds with 400 when a date bound is not valid ISO 8601.
    """
    from datetime import timezone
    from fastapi import HTTPException, status
    from fastapi.responses import StreamingResponse
    import io
    import csv

    # NOTE(review): the docstring says platform-admin only, but nothing in
    # this function inspects current_user.role; confirm the restriction is
    # enforced where the route is registered.

    # Validate the date bounds before querying. A trailing "Z" is normalised
    # because datetime.fromisoformat only accepts it from Python 3.11 onwards.
    try:
        start_at = (
            datetime.fromisoformat(start_date.replace("Z", "+00:00"))
            if start_date else None
        )
        end_at = (
            datetime.fromisoformat(end_date.replace("Z", "+00:00"))
            if end_date else None
        )
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="start_date and end_date must be ISO 8601 dates or datetimes"
        ) from None

    # Build query
    query = db.query(AuditLog)
    if start_at is not None:
        query = query.filter(AuditLog.created_at >= start_at)
    if end_at is not None:
        query = query.filter(AuditLog.created_at <= end_at)
    audit_logs = query.order_by(desc(AuditLog.created_at)).all()

    # Create CSV in memory
    output = io.StringIO()
    writer = csv.writer(output)

    # Write header
    writer.writerow([
        'ID', 'User Email', 'User Role', 'Action', 'Entity Type',
        'Entity ID', 'Description', 'IP Address', 'Created At'
    ])

    # Write data; free-text cells go through _csv_safe because audit entries
    # can carry user-supplied text (emails, descriptions).
    for log in audit_logs:
        writer.writerow([
            str(log.id),
            _csv_safe(log.user_email or 'N/A'),
            _csv_safe(log.user_role or 'N/A'),
            _csv_safe(log.action),
            _csv_safe(log.entity_type),
            _csv_safe(str(log.entity_id)) if log.entity_id else 'N/A',
            _csv_safe(log.description),
            _csv_safe(str(log.ip_address)) if log.ip_address else 'N/A',
            log.created_at
        ])

    # Prepare response; the filename carries a UTC timestamp.
    # datetime.utcnow() is deprecated since Python 3.12, so an aware "now" is
    # used instead (the formatted text is the same).
    output.seek(0)
    export_stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=audit_logs_{export_stamp}.csv"
        }
    )