teoat's picture
Upload folder using huggingface_hub
4ae946d verified
import logging
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.modules.auth.service import auth_service
from core.database import User, get_db
from .schemas import TransactionFlowResponse
from .service import AnalyticsTimeframe, analytics_service
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/cases")
async def get_case_analytics(
date_from: str | None = None,
date_to: str | None = None,
db: Session = Depends(get_db),
current_user: User = Depends(auth_service.get_current_user),
):
"""Get optimized case analytics"""
try:
date_from_parsed = datetime.fromisoformat(date_from) if date_from else None
date_to_parsed = datetime.fromisoformat(date_to) if date_to else None
analytics = analytics_service.get_case_analytics(
db, date_from_parsed, date_to_parsed
)
return analytics
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions")
async def get_transaction_analytics(
case_id: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
db: Session = Depends(get_db),
current_user: User = Depends(auth_service.get_current_user),
):
"""Get transaction analytics with optimized aggregates"""
try:
date_from_parsed = datetime.fromisoformat(date_from) if date_from else None
date_to_parsed = datetime.fromisoformat(date_to) if date_to else None
analytics = analytics_service.get_transaction_aggregates(
db, case_id, date_from_parsed, date_to_parsed
)
return analytics
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/overview")
async def get_system_overview(
db: Session = Depends(get_db),
current_user: User = Depends(auth_service.get_current_user),
):
"""Get system overview statistics"""
try:
from app.modules.cases.service import case_service
case_stats = case_service.get_case_stats(db)
# Get recent cases using CaseService
recent_cases_data = case_service.get_cases_paginated(
db, page=1, per_page=5, filters={}
)
recent_cases = recent_cases_data.get("cases", [])
return {
"case_stats": case_stats,
"recent_cases": [
{
"id": case.id,
"title": case.title,
"status": (
case.status.value
if hasattr(case.status, "value")
else case.status
),
"created_at": (
case.created_at.isoformat() if case.created_at else None
),
}
for case in recent_cases
],
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/temporal-flow", response_model=list[TransactionFlowResponse])
async def get_temporal_flow(
days: int = 30,
limit: int = 2000,
db: Session = Depends(get_db),
current_user: User = Depends(auth_service.get_current_user),
):
"""Get temporal flow data for visualization"""
try:
cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
query = """
SELECT t.id, t.customer_name, t.merchant_name, t.amount, t.date, t.merchant_category, t.risk_score
FROM transactions t
WHERE t.date >= :cutoff_date
ORDER BY t.date ASC
LIMIT :limit
"""
result = db.execute(text(query), {"cutoff_date": cutoff_date, "limit": limit})
flows = []
for row in result:
risk_score = float(row[6]) if row[6] else 0.0
tx_type = "normal"
if risk_score > 80:
tx_type = "flagged"
elif risk_score > 50:
tx_type = "suspicious"
flows.append(
{
"id": str(row[0]),
"source": row[1] or "Unknown Source",
"target": row[2] or "Unknown Target",
"amount": float(row[3]),
"timestamp": str(row[4]),
"type": tx_type,
"category": row[5] or "Uncategorized",
"risk_score": risk_score,
}
)
return flows
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/behavioral")
async def get_behavioral_analytics(
current_user: User = Depends(auth_service.get_current_user),
):
"""Get behavioral analytics for heatmaps"""
try:
risk_heatmaps = await analytics_service.generate_risk_heatmaps(
AnalyticsTimeframe.MONTH
)
return risk_heatmaps
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))