# Path: zenith-backend/app/modules/cases/repository.py
# Commit d29a5a0 (verified), teoat: "fix(backend): fix port and health check robustness"
from datetime import UTC, datetime
from typing import Any, Optional
from uuid import uuid4
from sqlalchemy import desc, or_
from sqlalchemy.orm import Session, joinedload
from core.database import Case, CaseNote
class CaseRepository:
"""
Repository for Case entity.
Isolates database queries from business logic.
"""
def __init__(self, session: Session):
self.session = session
def get_by_id(self, case_id: str) -> Optional[Case]:
return self.session.query(Case).filter(Case.id == case_id).first()
def get_all_by_ids(self, case_ids: list[str]) -> list[Case]:
"""Get multiple cases by IDs with eager loading of dependencies"""
from sqlalchemy.orm import selectinload
return (
self.session.query(Case)
.filter(Case.id.in_(case_ids))
.options(
selectinload(Case.transactions),
selectinload(Case.evidence),
selectinload(Case.notes),
selectinload(Case.activities),
selectinload(Case.alerts),
selectinload(Case.trade_transactions),
selectinload(Case.crypto_transactions),
)
.all()
)
def get_paginated(
self, page: int, per_page: int, filters: dict[str, Any]
) -> tuple[list[Case], int]:
"""
Returns (cases, total_count)
"""
offset = (page - 1) * per_page
query = self.session.query(Case)
# Apply Filters
if filters.get("status"):
query = query.filter(Case.status == filters["status"])
if filters.get("assignee_id"):
query = query.filter(Case.assignee_id == filters["assignee_id"])
if filters.get("project_id"):
query = query.filter(Case.project_id == filters["project_id"])
if filters.get("search"):
term = f"%{filters['search']}%"
query = query.filter(
or_(Case.title.ilike(term), Case.description.ilike(term))
)
total_count = query.count()
cases = (
query.order_by(desc(Case.created_at)).offset(offset).limit(per_page).all()
)
return cases, total_count
def create(self, data: dict, creator_id: Optional[str] = None) -> Case:
# Generate ID if not provided
if "id" not in data:
data["id"] = str(uuid4())
case = Case(**data, created_at=datetime.now(UTC), updated_at=datetime.now(UTC))
if creator_id:
case.created_by = creator_id
self.session.add(case)
self.session.flush()
return case
def update(self, case: Case, data: dict) -> Case:
for key, value in data.items():
if hasattr(case, key):
setattr(case, key, value)
case.updated_at = datetime.now(UTC)
self.session.add(case)
return case
def delete(self, case: Case) -> None:
self.session.delete(case)
# ===== CASE NOTE METHODS =====
def get_notes(self, case_id: str) -> list[CaseNote]:
"""Get all notes for a specific case"""
# Eager load user to prevent N+1 queries when accessing author details
return (
self.session.query(CaseNote)
.options(joinedload(CaseNote.user))
.filter(CaseNote.case_id == case_id)
.order_by(desc(CaseNote.created_at))
.all()
)
def add_note(self, case_id: str, note_data: dict, user_id: str) -> CaseNote:
"""Add a new note to a case"""
note = CaseNote(
case_id=case_id,
user_id=user_id,
content=note_data.get("content"),
is_internal=note_data.get("is_internal", False),
created_at=datetime.now(UTC),
)
self.session.add(note)
self.session.flush()
return note