Spaces:
Paused
Paused
| import logging | |
| from typing import Any | |
| from fastapi import APIRouter, Body, Depends, HTTPException, Query | |
| from sqlalchemy.orm import Session | |
| from app.dependencies import get_current_project_id | |
| from app.modules.auth.service import auth_service | |
| from core.database import User, get_db | |
| from .schemas import ( | |
| BulkDeleteRequest, | |
| BulkDeleteResponse, | |
| CaseCreate, | |
| CaseCreateResponse, | |
| CaseListResponse, | |
| CaseNoteCreate, | |
| CaseNoteResponse, | |
| CaseResponse, | |
| ) | |
| from .service import case_service | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # ===== CASE MANAGEMENT ENDPOINTS ===== | |
async def create_case(
    case_data: CaseCreate,
    current_user: User = Depends(auth_service.get_current_user),
    db: Session = Depends(get_db),
    project_id: str = Depends(get_current_project_id),
):
    """Create a new case in the caller's current project.

    Args:
        case_data: Request payload; ``title``, ``description``, ``priority``
            and ``tags`` are read from it.
        current_user: Authenticated user; its ``id`` (if present) is passed
            to the service as ``creator_id``.
        db: Database session handed to the service.
        project_id: Project identifier stored on the new case.

    Returns:
        A dict with the new case id (under both ``id`` and ``case_id``), a
        confirmation message, and a ``case`` summary sub-dict.

    Raises:
        HTTPException: Propagated unchanged when raised while creating the
            case; any other exception is reported as a 500.
    """
    try:
        create_dict = {
            "title": case_data.title,
            "description": case_data.description,
            # Fall back to "Medium" when no (or an empty) priority was sent.
            "priority": case_data.priority or "Medium",
            "status": "open",
            "fraud_amount": 0.0,
            "tags": case_data.tags or [],
            "project_id": project_id,
        }
        new_case = case_service.create_case(
            db, create_dict, creator_id=getattr(current_user, "id", None)
        )
        created_at = new_case.created_at
        return {
            "id": new_case.id,
            "case_id": new_case.id,
            "message": "Case created successfully",
            "case": {
                "id": new_case.id,
                "case_id": new_case.id,
                "title": new_case.title,
                "status": new_case.status,
                "priority": new_case.priority,
                "fraud_amount": getattr(new_case, "fraud_amount", 0.0),
                "customer_name": getattr(new_case, "customer_name", "Unknown"),
                "created_at": created_at.isoformat() if created_at else None,
            },
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. raised by the service layer) must keep
        # their own status code instead of being rewritten as a 500.
        raise
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Error creating case: %s", e)
        # NOTE(review): detail=str(e) exposes the internal error text to the
        # client; kept for backward compatibility -- confirm this is intended.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def get_cases(
    page: int = Query(1, ge=1, le=1000),
    per_page: int = Query(20, ge=1, le=100),
    search: str | None = Query(None, min_length=1, max_length=100),
    status: str | None = Query(
        None, pattern=r"^(OPEN|INVESTIGATING|PENDING_REVIEW|ESCALATED|CLOSED|ARCHIVED)$"
    ),
    assignee_id: str | None = Query(None),
    priority: str | None = Query(None, pattern=r"^(Low|Medium|High|Critical)$"),
    current_user: User = Depends(auth_service.get_current_user),
    db: Session = Depends(get_db),
    project_id: str = Depends(get_current_project_id),
):
    """Get a paginated list of cases with optional filtering.

    Args:
        page: 1-based page number.
        per_page: Page size.
        search: Optional search text, passed through to the service as-is.
        status: Optional status filter; lowercased before being passed on.
        assignee_id: Optional assignee filter.
        priority: Optional priority filter; lowercased before being passed on.
        current_user: Resolved by the auth dependency; not read in the body.
        db: Database session handed to the service.
        project_id: Project identifier included in the filters.

    Returns:
        A dict with the formatted ``cases`` list plus ``page``, ``per_page``,
        ``total`` and ``total_pages``.

    Raises:
        HTTPException: Propagated unchanged when raised while listing; any
            other exception is reported as a 500.
    """
    try:
        filters = {
            "status": status.lower() if status else None,
            "priority": priority.lower() if priority else None,
            "search": search,
            "project_id": project_id,
            "assignee_id": assignee_id,
        }
        result = case_service.get_cases_paginated(db, page, per_page, filters)

        # Shape each row for the response; getattr defaults cover attributes
        # that may be missing on the row object.
        cases_data = [
            {
                "id": row.id,
                "case_id": row.id,
                "title": row.title,
                "description": row.description,
                "status": row.status,
                "priority": row.priority,
                "assignee_id": row.assignee_id,
                "risk_score": getattr(row, "risk_score", 0),
                "risk_level": getattr(row, "risk_level", "low"),
                "fraud_amount": getattr(row, "fraud_amount", 0.0),
                "customer_name": getattr(row, "customer_name", "Unknown"),
                "created_at": row.created_at,
                "updated_at": row.updated_at,
                "due_date": getattr(row, "due_date", None),
                "tags": getattr(row, "tags", []),
            }
            for row in result["cases"]
        ]
        return {
            "cases": cases_data,
            "page": result["current_page"],
            "per_page": per_page,
            "total": result["total"],
            "total_pages": result["total_pages"],
        }
    except HTTPException:
        # Keep the status code of deliberate HTTP errors instead of masking
        # them as a 500.
        raise
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Error listing cases: %s", e)
        # NOTE(review): detail=str(e) exposes the internal error text to the
        # client; kept for backward compatibility -- confirm this is intended.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def get_case_detail(
    case_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(auth_service.get_current_user),
):
    """Return the detail payload for a single case.

    Looks the case up through ``case_service.get_case`` and raises a 404
    ``HTTPException`` when the lookup yields a falsy result.
    """
    found = case_service.get_case(db, case_id)
    if found:
        # Attributes read via getattr fall back to a default when absent.
        return dict(
            id=found.id,
            case_id=found.id,
            title=found.title,
            description=found.description,
            status=found.status,
            priority=found.priority,
            assignee_id=found.assignee_id,
            risk_score=getattr(found, "risk_score", 0),
            risk_level=getattr(found, "risk_level", "low"),
            fraud_amount=getattr(found, "fraud_amount", 0.0),
            customer_name=getattr(found, "customer_name", "Unknown"),
            created_at=found.created_at,
            updated_at=found.updated_at,
            due_date=getattr(found, "due_date", None),
            tags=getattr(found, "tags", []),
        )
    raise HTTPException(status_code=404, detail="Case not found")
async def update_case_partial(
    case_id: str,
    updates: dict[str, Any] = Body(),
    db: Session = Depends(get_db),
    current_user: User = Depends(auth_service.get_current_user),
):
    """Apply a partial update to a case and return its refreshed payload.

    The ``updates`` mapping is forwarded unchanged to
    ``case_service.update_case``; a falsy result is reported as a 404.
    """
    updated = case_service.update_case(db, case_id, updates)
    if not updated:
        raise HTTPException(status_code=404, detail="Case not found")

    body: dict[str, Any] = {"id": updated.id, "case_id": updated.id}

    # Attributes that are read directly from the case object.
    for required in ("title", "description", "status", "priority", "assignee_id"):
        body[required] = getattr(updated, required)

    # Attributes that may be absent, paired with their fallback values.
    fallbacks = (
        ("risk_score", 0),
        ("risk_level", "low"),
        ("fraud_amount", 0.0),
        ("customer_name", "Unknown"),
    )
    for name, default in fallbacks:
        body[name] = getattr(updated, name, default)

    body["created_at"] = updated.created_at
    body["updated_at"] = updated.updated_at
    body["due_date"] = getattr(updated, "due_date", None)
    body["tags"] = getattr(updated, "tags", [])
    return body
async def delete_case(
    case_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(auth_service.get_current_user),
):
    """Delete one case by id.

    Returns a confirmation message when ``case_service.delete_case`` reports
    success; otherwise raises a 404 ``HTTPException``.
    """
    if case_service.delete_case(db, case_id):
        return {"message": "Case deleted successfully"}
    raise HTTPException(status_code=404, detail="Case not found")
async def bulk_delete_cases(
    request: BulkDeleteRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(auth_service.get_current_user),
):
    """Delete several cases in one call.

    An empty ``case_ids`` list short-circuits with a zero-count response.
    Otherwise the ids go to ``case_service.bulk_delete_cases`` and the
    response reports how many were deleted and which ids failed.
    """
    ids = request.case_ids
    if ids:
        removed, failures = case_service.bulk_delete_cases(db, ids)
        # The failure count is appended only when something actually failed.
        suffix = f", {len(failures)} failed" if failures else ""
        summary = f"Successfully deleted {removed} cases" + suffix
        return BulkDeleteResponse(
            deleted_count=removed, failed_ids=failures, message=summary
        )
    return BulkDeleteResponse(
        deleted_count=0, failed_ids=[], message="No cases specified for deletion"
    )
| # ===== CASE NOTE ENDPOINTS ===== | |
async def get_case_notes(
    case_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(auth_service.get_current_user),
):
    """Return every note of a case as a list of plain dicts.

    Notes come from ``case_service.get_notes``. The author name and category
    fall back to "Unknown" and "Analysis" when the attribute is absent.
    """
    formatted = []
    for entry in case_service.get_notes(db, case_id):
        author_name = getattr(entry.user, "full_name", "Unknown")
        formatted.append(
            {
                "id": entry.id,
                "content": entry.content,
                "author_id": entry.user_id,
                "author_name": author_name,
                "is_internal": entry.is_internal,
                "category": getattr(entry, "category", "Analysis"),
                "created_at": entry.created_at,
            }
        )
    return formatted
async def add_case_note(
    case_id: str,
    note_data: CaseNoteCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(auth_service.get_current_user),
):
    """Attach a new note to a case on behalf of the current user.

    Raises a 401 ``HTTPException`` when the current user has no usable
    ``id``. Otherwise the dumped note payload is stored through
    ``case_service.add_note`` and the created note is returned as a dict.
    """
    author_id = getattr(current_user, "id", None)
    if not author_id:
        raise HTTPException(status_code=401, detail="Authentication required")

    payload = note_data.model_dump()
    created = case_service.add_note(db, case_id, payload, author_id)

    # The author name comes from the requesting user, not from the stored note.
    return dict(
        id=created.id,
        content=created.content,
        author_id=created.user_id,
        author_name=getattr(current_user, "full_name", "Unknown"),
        is_internal=created.is_internal,
        category=getattr(created, "category", "Analysis"),
        created_at=created.created_at,
    )