File size: 8,059 Bytes
4ae946d
 
 
 
 
 
 
 
 
d29a5a0
4ae946d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d29a5a0
4ae946d
 
 
 
 
 
 
 
 
 
d29a5a0
4ae946d
d29a5a0
4ae946d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d29a5a0
4ae946d
 
 
 
 
 
 
 
 
 
d29a5a0
4ae946d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
Analytics Service - Advanced analytics, business intelligence, and real-time insights.
"""

import logging
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any

from pydantic import BaseModel

logger = logging.getLogger(__name__)

# --- Enums and Dataclasses (from advanced_analytics) ---


class AnalyticsTimeframe(Enum):
    HOUR = "1H"
    DAY = "1D"
    WEEK = "1W"
    MONTH = "1M"
    QUARTER = "3M"
    YEAR = "1Y"


class MetricType(Enum):
    FRAUD_AMOUNT = "fraud_amount"
    CASE_COUNT = "case_count"
    DETECTION_RATE = "detection_rate"
    FALSE_POSITIVE_RATE = "false_positive_rate"
    RESPONSE_TIME = "response_time"
    RECOVERY_RATE = "recovery_rate"


@dataclass
class AnalyticsInsight:
    title: str
    description: str
    impact_level: str  # "high", "medium", "low"
    confidence_score: float
    recommended_actions: list[str]
    supporting_data: dict[str, Any]
    generated_at: datetime


@dataclass
class PredictiveTrend:
    metric: str
    current_value: float
    predicted_value: float
    trend_direction: str  # "increasing", "decreasing", "stable"
    confidence_interval: tuple[float, float]
    time_horizon: str
    drivers: list[str]


# --- Pydantic Models (from dashboard_analytics) ---


class InvestigationMetrics(BaseModel):
    total_cases: int = 0
    active_cases: int = 0
    completed_cases: int = 0
    average_resolution_time: float = 0.0
    success_rate: float = 0.0
    fraud_detection_rate: float = 0.0
    false_positive_rate: float = 0.0
    ai_assist_rate: float = 0.0
    user_satisfaction_score: float = 0.0
    compliance_rate: float = 0.0


class PerformanceTrend(BaseModel):
    date: datetime
    total_cases: int
    resolution_time_avg: float
    success_rate: float
    ai_effectiveness: float
    fraud_prevention_rate: float
    false_positive_rate: float = 0.0


class InvestigationInsight(BaseModel):
    id: str
    insight_type: str
    title: str
    description: str
    confidence_score: float
    impact_level: str
    recommendations: list[str]
    created_at: datetime


# --- Unified Analytics Service ---


class AnalyticsService:
    """Unified service for analytics, insights and dashboarding"""

    def __init__(self):
        self.metrics_history: list[PerformanceTrend] = []
        self._setup_initial_data()

    def _setup_initial_data(self):
        """Mock initial data for demonstration if needed"""
        base_date = datetime.now() - timedelta(days=30)
        for i in range(30):
            date = base_date + timedelta(days=i)
            self.metrics_history.append(
                PerformanceTrend(
                    date=date,
                    total_cases=random.randint(15, 25),
                    resolution_time_avg=72.0 - (i * 0.5),
                    success_rate=75.0 + (i * 0.8),
                    ai_effectiveness=i * 1.2,
                    fraud_prevention_rate=85.0 + (i * 0.3),
                    false_positive_rate=15.0 - (i * 0.2),
                )
            )

    async def get_dashboard_data(self, time_range_days: int = 30) -> dict[str, Any]:
        """Get comprehensive dashboard metrics and trends"""
        cutoff_date = datetime.now() - timedelta(days=time_range_days)
        trends = [t for t in self.metrics_history if t.date >= cutoff_date]

        latest = (
            trends[-1]
            if trends
            else PerformanceTrend(
                date=datetime.now(),
                total_cases=0,
                resolution_time_avg=0,
                success_rate=0,
                ai_effectiveness=0,
                fraud_prevention_rate=0,
            )
        )

        return {
            "current_metrics": {
                "total_cases": latest.total_cases,
                "active_cases": random.randint(5, 10),
                "resolution_time": latest.resolution_time_avg,
                "success_rate": latest.success_rate,
                "fraud_detection_rate": latest.fraud_prevention_rate,
                "ai_assist_rate": latest.ai_effectiveness,
            },
            "trends": [t.dict() for t in trends],
            "generated_at": datetime.now().isoformat(),
        }

    async def generate_risk_heatmaps(
        self, timeframe: AnalyticsTimeframe = AnalyticsTimeframe.MONTH
    ) -> dict[str, Any]:
        """Generate risk heatmaps for geographic and temporal analysis"""
        return {
            "geographic": {
                "regions": [
                    {
                        "name": "North America",
                        "risk_score": 2.1,
                        "cases": 45,
                        "amount": 850000,
                    },
                    {
                        "name": "Europe",
                        "risk_score": 1.8,
                        "cases": 38,
                        "amount": 720000,
                    },
                    {
                        "name": "Asia Pacific",
                        "risk_score": 2.4,
                        "cases": 52,
                        "amount": 980000,
                    },
                ]
            },
            "temporal": {
                "hourly_patterns": [random.uniform(1, 5) for _ in range(24)],
                "weekly_patterns": [random.uniform(2, 4) for _ in range(7)],
            },
        }

    def get_case_analytics(
        self, db, date_from: datetime | None = None, date_to: datetime | None = None
    ) -> dict[str, Any]:
        """Get case analytics with date filtering"""
        from sqlalchemy import case, func

        from core.database import Case

        query = db.query(
            func.count().label("total"),
            func.sum(case((Case.status == "open", 1), else_=0)).label("open"),
            func.sum(case((Case.status == "closed", 1), else_=0)).label("closed"),
            func.sum(case((Case.priority == "critical", 1), else_=0)).label("critical"),
        )

        if date_from:
            query = query.filter(Case.created_at >= date_from)
        if date_to:
            query = query.filter(Case.created_at <= date_to)

        result = query.one()

        return {
            "total_cases": result.total or 0,
            "open_cases": result.open or 0,
            "closed_cases": result.closed or 0,
            "critical_cases": result.critical or 0,
            "date_from": date_from.isoformat() if date_from else None,
            "date_to": date_to.isoformat() if date_to else None,
        }

    def get_transaction_aggregates(
        self,
        db,
        case_id: str | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ) -> dict[str, Any]:
        """Get transaction aggregates with filtering"""
        from sqlalchemy import func

        from core.database import Transaction

        query = db.query(
            func.count(Transaction.id).label("count"),
            func.sum(Transaction.amount).label("total_amount"),
            func.avg(Transaction.amount).label("avg_amount"),
            func.max(Transaction.amount).label("max_amount"),
        )

        if case_id:
            query = query.filter(Transaction.case_id == case_id)
        if date_from:
            query = query.filter(Transaction.date >= date_from)
        if date_to:
            query = query.filter(Transaction.date <= date_to)

        result = query.one()

        return {
            "transaction_count": result.count or 0,
            "total_amount": float(result.total_amount or 0),
            "average_amount": float(result.avg_amount or 0),
            "max_amount": float(result.max_amount or 0),
            "case_id": case_id,
            "date_from": date_from.isoformat() if date_from else None,
            "date_to": date_to.isoformat() if date_to else None,
        }


# Singleton
analytics_service = AnalyticsService()
# For backward compatibility with advanced_analytics.py imports
advanced_analytics = analytics_service