swiftops-backend / src /app /api /v1 /audit_logs.py
kamau1's picture
feat: implement org admin permissions, org-scoped filtering, and audit log updates
b43d99a
"""
Audit Logs API Endpoints
"""
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from sqlalchemy import desc, or_
from typing import List, Optional
from datetime import datetime
from app.api.deps import get_db, get_current_user
from app.core.permissions import require_role
from app.models.user import User
from app.models.audit_log import AuditLog
from app.schemas.audit_log import AuditLogResponse, AuditLogListResponse
# Router that the audit-log endpoints in this module are registered on.
router = APIRouter()
def _parse_iso_datetime_param(value, field_name):
    """Parse an optional ISO 8601 query value into a ``datetime``.

    Returns ``None`` when the value is missing or empty (no filter requested).
    Raises an HTTP 400 error naming ``field_name`` when the value is present
    but is not a valid ISO 8601 date or datetime.
    """
    from fastapi import HTTPException, status

    if not value:
        return None

    text = value.strip()
    # ``datetime.fromisoformat`` only accepts a trailing "Z" on Python 3.11+,
    # so normalise it to an explicit UTC offset first.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid {field_name}: expected an ISO 8601 date or datetime"
        ) from None


@router.get("", response_model=AuditLogListResponse)
@require_role(["platform_admin", "client_admin", "contractor_admin"])
def get_audit_logs(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    user_id: Optional[str] = Query(None, description="Filter by user ID"),
    action: Optional[str] = Query(None, description="Filter by action type"),
    entity_type: Optional[str] = Query(None, description="Filter by entity type"),
    search: Optional[str] = Query(None, description="Search in description or email"),
    start_date: Optional[str] = Query(None, description="Start date (ISO format)"),
    end_date: Optional[str] = Query(None, description="End date (ISO format)"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get audit logs with filtering and pagination

    **Authorization:**
    - Platform admins see all audit logs
    - Org admins see only logs from (non-deleted) users in their organization
    - Org admins that are not attached to any organization see no logs

    Query parameters:
    - skip: Number of records to skip (pagination)
    - limit: Number of records to return (max 1000)
    - user_id: Filter by specific user
    - action: Filter by action type (create, update, delete, login, etc.)
    - entity_type: Filter by entity type (user, ticket, project, etc.)
    - search: Search in description or user_email
    - start_date: Filter logs from this date (ISO format)
    - end_date: Filter logs until this date (ISO format)

    Returns a page of logs (most recent first) together with the total number
    of matching logs. Responds with HTTP 400 when start_date or end_date is not
    a valid ISO 8601 value.
    """
    # Validate the date bounds up front so malformed input is reported as a
    # client error instead of surfacing as a database error later on.
    created_from = _parse_iso_datetime_param(start_date, "start_date")
    created_until = _parse_iso_datetime_param(end_date, "end_date")

    empty_page = {"items": [], "total": 0, "skip": skip, "limit": limit}

    query = db.query(AuditLog)

    # Org scoping: org admins only see actions performed by users of their org.
    if current_user.role in ("client_admin", "contractor_admin"):
        org_users_query = db.query(User.id).filter(User.deleted_at.is_(None))
        if current_user.client_id:
            org_users_query = org_users_query.filter(
                User.client_id == current_user.client_id
            )
        elif current_user.contractor_id:
            org_users_query = org_users_query.filter(
                User.contractor_id == current_user.contractor_id
            )
        else:
            # Fail closed: without an organization there is nothing to scope
            # by, and leaving the user query unfiltered would expose every
            # organization's logs.
            return empty_page

        org_user_ids = [str(row.id) for row in org_users_query.all()]
        if not org_user_ids:
            return empty_page
        query = query.filter(AuditLog.user_id.in_(org_user_ids))

    # Optional exact-match filters
    if user_id:
        query = query.filter(AuditLog.user_id == user_id)
    if action:
        query = query.filter(AuditLog.action == action)
    if entity_type:
        query = query.filter(AuditLog.entity_type == entity_type)

    # Case-insensitive substring search over description and user email
    if search:
        search_term = f"%{search}%"
        query = query.filter(
            or_(
                AuditLog.description.ilike(search_term),
                AuditLog.user_email.ilike(search_term)
            )
        )

    # Inclusive creation-time window
    if created_from is not None:
        query = query.filter(AuditLog.created_at >= created_from)
    if created_until is not None:
        query = query.filter(AuditLog.created_at <= created_until)

    # Total is computed before pagination so clients can render page counts.
    total = query.count()

    # Most recent first; the id tie-breaker keeps offset pagination stable
    # when several logs share the same created_at value.
    audit_logs = (
        query.order_by(desc(AuditLog.created_at), desc(AuditLog.id))
        .offset(skip)
        .limit(limit)
        .all()
    )

    return {
        "items": audit_logs,
        "total": total,
        "skip": skip,
        "limit": limit
    }
@router.get("/{audit_log_id}", response_model=AuditLogResponse)
@require_role(["platform_admin", "client_admin", "contractor_admin"])
def get_audit_log(
    audit_log_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get a specific audit log entry by ID

    **Authorization:**
    - Platform admins can view any log
    - Org admins can only view logs from users in their organization

    Raises NotFoundException when no log has the given ID, and responds with
    HTTP 403 when an org admin requests a log that cannot be tied to a user in
    their own organization.
    """
    from app.core.exceptions import NotFoundException
    from fastapi import HTTPException, status

    audit_log = db.query(AuditLog).filter(AuditLog.id == audit_log_id).first()
    if not audit_log:
        raise NotFoundException("Audit log not found")

    # Org scoping check
    if current_user.role in ("client_admin", "contractor_admin"):
        forbidden = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only view audit logs from your organization"
        )

        # Fail closed: an org admin without any organization has no scope
        # that could match, so nothing may be shown.
        if not (current_user.client_id or current_user.contractor_id):
            raise forbidden

        # Get the user who performed this action
        log_user = db.query(User).filter(User.id == audit_log.user_id).first()

        # Fail closed: a log whose actor cannot be resolved cannot be proven
        # to belong to the admin's organization.
        if log_user is None:
            raise forbidden

        # Every organization id set on the admin must match the acting user.
        if current_user.client_id and log_user.client_id != current_user.client_id:
            raise forbidden
        if current_user.contractor_id and log_user.contractor_id != current_user.contractor_id:
            raise forbidden

    return audit_log
@router.get("/user/{user_id}", response_model=AuditLogListResponse)
@require_role(["platform_admin", "client_admin", "contractor_admin"])
def get_user_audit_logs(
    user_id: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get all audit logs for a specific user

    **Authorization:**
    - Platform admins can view logs for any user
    - Org admins can only view logs for users in their organization

    Returns a page of the user's logs (most recent first) together with the
    total number of logs for that user. For org admins, responds with HTTP 404
    when the target user does not exist and HTTP 403 when the target user is
    outside the admin's organization.
    """
    from fastapi import HTTPException, status

    # Org scoping check - verify the target user is in the same org
    if current_user.role in ("client_admin", "contractor_admin"):
        forbidden = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only view audit logs for users in your organization"
        )

        # Fail closed: an org admin without any organization has no scope
        # that could match. Checked before the lookup so that user existence
        # is not revealed either.
        if not (current_user.client_id or current_user.contractor_id):
            raise forbidden

        target_user = db.query(User).filter(User.id == user_id).first()
        if not target_user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        # Every organization id set on the admin must match the target user.
        if current_user.client_id and target_user.client_id != current_user.client_id:
            raise forbidden
        if current_user.contractor_id and target_user.contractor_id != current_user.contractor_id:
            raise forbidden

    query = db.query(AuditLog).filter(AuditLog.user_id == user_id)

    # Total is computed before pagination so clients can render page counts.
    total = query.count()

    # Most recent first; the id tie-breaker keeps offset pagination stable
    # when several logs share the same created_at value.
    audit_logs = (
        query.order_by(desc(AuditLog.created_at), desc(AuditLog.id))
        .offset(skip)
        .limit(limit)
        .all()
    )

    return {
        "items": audit_logs,
        "total": total,
        "skip": skip,
        "limit": limit
    }
@router.get("/export/csv")
@require_role(["platform_admin"])
def export_audit_logs_csv(
    start_date: Optional[str] = Query(None, description="Start date (ISO format)"),
    end_date: Optional[str] = Query(None, description="End date (ISO format)"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Export audit logs to CSV format (Platform Admin only)

    Query parameters:
    - start_date: Export logs from this date (ISO format)
    - end_date: Export logs until this date (ISO format)

    Returns the matching logs, most recent first, as a CSV attachment whose
    file name carries the current UTC timestamp. Responds with HTTP 400 when
    start_date or end_date is not a valid ISO 8601 value.
    """
    import csv
    import io
    from datetime import timezone

    from fastapi import HTTPException, status
    from fastapi.responses import StreamingResponse

    def parse_bound(raw, field_name):
        # Turn an optional ISO 8601 string into a datetime; None means the
        # bound was not supplied.
        if not raw:
            return None
        text = raw.strip()
        # A trailing "Z" is only accepted by fromisoformat on Python 3.11+.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid {field_name}: expected an ISO 8601 date or datetime"
            ) from None

    formula_prefixes = ("=", "+", "-", "@", "\t", "\r")

    def text_cell(value):
        # Spreadsheet applications evaluate cells that start with these
        # characters as formulas; a leading apostrophe forces plain text.
        # Non-string values (None, enums, ...) are passed through untouched.
        if isinstance(value, str) and value.startswith(formula_prefixes):
            return "'" + value
        return value

    created_from = parse_bound(start_date, "start_date")
    created_until = parse_bound(end_date, "end_date")

    # Build query
    query = db.query(AuditLog)
    if created_from is not None:
        query = query.filter(AuditLog.created_at >= created_from)
    if created_until is not None:
        query = query.filter(AuditLog.created_at <= created_until)

    audit_logs = query.order_by(desc(AuditLog.created_at)).all()

    # Create CSV in memory
    output = io.StringIO()
    writer = csv.writer(output)

    # Write header
    writer.writerow([
        'ID', 'User Email', 'User Role', 'Action', 'Entity Type',
        'Entity ID', 'Description', 'IP Address', 'Created At'
    ])

    # Write data
    for log in audit_logs:
        writer.writerow([
            str(log.id),
            text_cell(log.user_email or 'N/A'),
            text_cell(log.user_role or 'N/A'),
            text_cell(log.action),
            text_cell(log.entity_type),
            str(log.entity_id) if log.entity_id else 'N/A',
            text_cell(log.description),
            str(log.ip_address) if log.ip_address else 'N/A',
            log.created_at
        ])

    # Prepare response; getvalue() returns the whole buffer regardless of the
    # current stream position, so no seek is needed.
    exported_at = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=audit_logs_{exported_at}.csv"
        }
    )