# zenith-backend/app/modules/cases/service.py
# Last commit by teoat: fix(backend): fix port and health check robustness
# Commit d29a5a0 (verified)
"""
Case Service - Business logic for case management
"""
import logging
from typing import Any, Optional
from sqlalchemy.orm import Session
from core.database import Case, CaseNote
from .repository import CaseRepository
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CaseService:
    """Service for managing fraud investigation cases.

    Every method builds a ``CaseRepository`` around the caller-supplied
    session and delegates data access to it. Write operations commit the
    session themselves; a failed commit is rolled back and re-raised. The
    class keeps no instance state.
    """

    @staticmethod
    def _commit(db: Session, action: str) -> None:
        """Commit ``db``; on failure log, roll back and re-raise.

        Args:
            db: Session whose pending changes should be committed.
            action: Short description of the operation, used in the log line.

        Raises:
            Exception: Whatever ``db.commit()`` raised, unchanged.
        """
        try:
            db.commit()
        except Exception:
            logger.exception("Failed to %s; rolling back transaction", action)
            # Roll back so the caller's session is usable again after the error.
            db.rollback()
            raise

    def get_case(self, db: Session, case_id: str) -> Optional[Case]:
        """Get a case by ID.

        Returns:
            The result of ``CaseRepository.get_by_id`` (expected to be
            ``None`` when no such case exists).
        """
        return CaseRepository(db).get_by_id(case_id)

    def get_cases_paginated(
        self, db: Session, page: int, per_page: int, filters: dict[str, Any]
    ) -> dict[str, Any]:
        """Get cases with pagination and filtering.

        Args:
            db: Active database session.
            page: Page number, passed through to the repository.
            per_page: Page size, passed through to the repository.
            filters: Filter mapping, passed through to the repository.

        Returns:
            A dict with keys ``cases``, ``total``, ``total_pages`` and
            ``current_page``. ``total_pages`` is 0 when ``per_page`` is not
            positive.
        """
        cases, total_count = CaseRepository(db).get_paginated(page, per_page, filters)
        if per_page > 0:
            # Ceiling division without going through floats.
            total_pages = (total_count + per_page - 1) // per_page
        else:
            total_pages = 0
        return {
            "cases": cases,
            "total": total_count,
            "total_pages": total_pages,
            "current_page": page,
        }

    def create_case(
        self, db: Session, data: dict[str, Any], creator_id: Optional[str] = None
    ) -> Case:
        """Create a new case, commit it and return the refreshed row.

        Raises:
            Exception: Re-raised from the commit after the transaction has
                been rolled back.
        """
        new_case = CaseRepository(db).create(data, creator_id=creator_id)
        self._commit(db, "create case")
        db.refresh(new_case)
        return new_case

    def update_case(
        self, db: Session, case_id: str, updates: dict[str, Any]
    ) -> Optional[Case]:
        """Update a case.

        Returns:
            The refreshed case, or ``None`` when ``case_id`` was not found
            (nothing is committed in that case).

        Raises:
            Exception: Re-raised from the commit after the transaction has
                been rolled back.
        """
        repo = CaseRepository(db)
        existing = repo.get_by_id(case_id)
        if not existing:
            return None
        updated_case = repo.update(existing, updates)
        self._commit(db, f"update case {case_id}")
        db.refresh(updated_case)
        return updated_case

    def delete_case(self, db: Session, case_id: str) -> bool:
        """Delete a case.

        Returns:
            ``True`` when the case was found and deleted, ``False`` when it
            did not exist.

        Raises:
            Exception: Re-raised from the commit after the transaction has
                been rolled back.
        """
        repo = CaseRepository(db)
        existing = repo.get_by_id(case_id)
        if not existing:
            return False
        repo.delete(existing)
        self._commit(db, f"delete case {case_id}")
        return True

    def bulk_delete_cases(
        self, db: Session, case_ids: list[str]
    ) -> tuple[int, list[str]]:
        """Delete several cases in a single transaction.

        Args:
            db: Active database session.
            case_ids: IDs of the cases to delete.

        Returns:
            ``(deleted_count, failed_ids)`` where ``failed_ids`` lists the
            requested IDs that the repository did not return, in request
            order.

        Raises:
            Exception: Re-raised from the commit after the transaction has
                been rolled back.
        """
        if not case_ids:
            # Nothing requested: skip the lookup and the commit entirely.
            return 0, []

        repo = CaseRepository(db)
        cases = repo.get_all_by_ids(case_ids)
        found_ids = {c.id for c in cases}
        failed_ids = [cid for cid in case_ids if cid not in found_ids]

        for found in cases:
            repo.delete(found)
        self._commit(db, "bulk delete cases")
        return len(cases), failed_ids

    # ===== CASE NOTE METHODS =====

    def get_notes(self, db: Session, case_id: str) -> list[CaseNote]:
        """Get all notes for a case."""
        return CaseRepository(db).get_notes(case_id)

    def add_note(
        self, db: Session, case_id: str, note_data: dict[str, Any], user_id: str
    ) -> CaseNote:
        """Add a note to a case, commit it and return the refreshed note.

        Raises:
            Exception: Re-raised from the commit after the transaction has
                been rolled back.
        """
        note = CaseRepository(db).add_note(case_id, note_data, user_id)
        self._commit(db, f"add note to case {case_id}")
        db.refresh(note)
        return note

    def get_case_stats(
        self, db: Session, project_id: Optional[str] = None
    ) -> dict[str, Any]:
        """Get case statistics using one aggregate query.

        Args:
            db: Active database session.
            project_id: When truthy, restrict the counts to this project.

        Returns:
            A dict with integer counts under ``total_cases``, ``open_cases``,
            ``closed_cases`` and ``critical_cases``.
        """
        # Aliased so the SQL CASE construct is not confused with the Case model.
        from sqlalchemy import case as sql_case, func

        def count_where(condition: Any, label: str) -> Any:
            """Build a labelled SUM(CASE WHEN condition THEN 1 ELSE 0 END)."""
            return func.sum(sql_case((condition, 1), else_=0)).label(label)

        query = db.query(
            func.count().label("total"),
            count_where(Case.status == "open", "open_cases"),
            count_where(Case.status == "closed", "closed_cases"),
            count_where(Case.priority == "critical", "critical_cases"),
        )
        if project_id:
            query = query.filter(Case.project_id == project_id)
        result = query.one()

        # SUM() is NULL over zero rows and may come back as Decimal on some
        # backends, so normalise every count to a plain int.
        return {
            "total_cases": int(result.total or 0),
            "open_cases": int(result.open_cases or 0),
            "closed_cases": int(result.closed_cases or 0),
            "critical_cases": int(result.critical_cases or 0),
        }
# Singleton instance, created once at import time. CaseService defines no
# instance state in this file; presumably callers import this object rather
# than constructing their own — verify against call sites.
case_service = CaseService()