""" Analytics Service - Advanced analytics, business intelligence, and real-time insights. """ import logging import random from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from typing import Any from pydantic import BaseModel logger = logging.getLogger(__name__) # --- Enums and Dataclasses (from advanced_analytics) --- class AnalyticsTimeframe(Enum): HOUR = "1H" DAY = "1D" WEEK = "1W" MONTH = "1M" QUARTER = "3M" YEAR = "1Y" class MetricType(Enum): FRAUD_AMOUNT = "fraud_amount" CASE_COUNT = "case_count" DETECTION_RATE = "detection_rate" FALSE_POSITIVE_RATE = "false_positive_rate" RESPONSE_TIME = "response_time" RECOVERY_RATE = "recovery_rate" @dataclass class AnalyticsInsight: title: str description: str impact_level: str # "high", "medium", "low" confidence_score: float recommended_actions: list[str] supporting_data: dict[str, Any] generated_at: datetime @dataclass class PredictiveTrend: metric: str current_value: float predicted_value: float trend_direction: str # "increasing", "decreasing", "stable" confidence_interval: tuple[float, float] time_horizon: str drivers: list[str] # --- Pydantic Models (from dashboard_analytics) --- class InvestigationMetrics(BaseModel): total_cases: int = 0 active_cases: int = 0 completed_cases: int = 0 average_resolution_time: float = 0.0 success_rate: float = 0.0 fraud_detection_rate: float = 0.0 false_positive_rate: float = 0.0 ai_assist_rate: float = 0.0 user_satisfaction_score: float = 0.0 compliance_rate: float = 0.0 class PerformanceTrend(BaseModel): date: datetime total_cases: int resolution_time_avg: float success_rate: float ai_effectiveness: float fraud_prevention_rate: float false_positive_rate: float = 0.0 class InvestigationInsight(BaseModel): id: str insight_type: str title: str description: str confidence_score: float impact_level: str recommendations: list[str] created_at: datetime # --- Unified Analytics Service --- class AnalyticsService: """Unified service for analytics, insights and dashboarding""" def __init__(self): self.metrics_history: list[PerformanceTrend] = [] self._setup_initial_data() def _setup_initial_data(self): """Mock initial data for demonstration if needed""" base_date = datetime.now() - timedelta(days=30) for i in range(30): date = base_date + timedelta(days=i) self.metrics_history.append( PerformanceTrend( date=date, total_cases=random.randint(15, 25), resolution_time_avg=72.0 - (i * 0.5), success_rate=75.0 + (i * 0.8), ai_effectiveness=i * 1.2, fraud_prevention_rate=85.0 + (i * 0.3), false_positive_rate=15.0 - (i * 0.2), ) ) async def get_dashboard_data(self, time_range_days: int = 30) -> dict[str, Any]: """Get comprehensive dashboard metrics and trends""" cutoff_date = datetime.now() - timedelta(days=time_range_days) trends = [t for t in self.metrics_history if t.date >= cutoff_date] latest = ( trends[-1] if trends else PerformanceTrend( date=datetime.now(), total_cases=0, resolution_time_avg=0, success_rate=0, ai_effectiveness=0, fraud_prevention_rate=0, ) ) return { "current_metrics": { "total_cases": latest.total_cases, "active_cases": random.randint(5, 10), "resolution_time": latest.resolution_time_avg, "success_rate": latest.success_rate, "fraud_detection_rate": latest.fraud_prevention_rate, "ai_assist_rate": latest.ai_effectiveness, }, "trends": [t.dict() for t in trends], "generated_at": datetime.now().isoformat(), } async def generate_risk_heatmaps( self, timeframe: AnalyticsTimeframe = AnalyticsTimeframe.MONTH ) -> dict[str, Any]: """Generate risk heatmaps for geographic and temporal analysis""" return { "geographic": { "regions": [ { "name": "North America", "risk_score": 2.1, "cases": 45, "amount": 850000, }, { "name": "Europe", "risk_score": 1.8, "cases": 38, "amount": 720000, }, { "name": "Asia Pacific", "risk_score": 2.4, "cases": 52, "amount": 980000, }, ] }, "temporal": { "hourly_patterns": [random.uniform(1, 5) for _ in range(24)], "weekly_patterns": [random.uniform(2, 4) for _ in range(7)], }, } def get_case_analytics( self, db, date_from: datetime | None = None, date_to: datetime | None = None ) -> dict[str, Any]: """Get case analytics with date filtering""" from sqlalchemy import case, func from core.database import Case query = db.query( func.count().label("total"), func.sum(case((Case.status == "open", 1), else_=0)).label("open"), func.sum(case((Case.status == "closed", 1), else_=0)).label("closed"), func.sum(case((Case.priority == "critical", 1), else_=0)).label("critical"), ) if date_from: query = query.filter(Case.created_at >= date_from) if date_to: query = query.filter(Case.created_at <= date_to) result = query.one() return { "total_cases": result.total or 0, "open_cases": result.open or 0, "closed_cases": result.closed or 0, "critical_cases": result.critical or 0, "date_from": date_from.isoformat() if date_from else None, "date_to": date_to.isoformat() if date_to else None, } def get_transaction_aggregates( self, db, case_id: str | None = None, date_from: datetime | None = None, date_to: datetime | None = None, ) -> dict[str, Any]: """Get transaction aggregates with filtering""" from sqlalchemy import func from core.database import Transaction query = db.query( func.count(Transaction.id).label("count"), func.sum(Transaction.amount).label("total_amount"), func.avg(Transaction.amount).label("avg_amount"), func.max(Transaction.amount).label("max_amount"), ) if case_id: query = query.filter(Transaction.case_id == case_id) if date_from: query = query.filter(Transaction.date >= date_from) if date_to: query = query.filter(Transaction.date <= date_to) result = query.one() return { "transaction_count": result.count or 0, "total_amount": float(result.total_amount or 0), "average_amount": float(result.avg_amount or 0), "max_amount": float(result.max_amount or 0), "case_id": case_id, "date_from": date_from.isoformat() if date_from else None, "date_to": date_to.isoformat() if date_to else None, } # Singleton analytics_service = AnalyticsService() # For backward compatibility with advanced_analytics.py imports advanced_analytics = analytics_service