File size: 3,289 Bytes
b95e73a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""Anomaly detection for government spending data."""

from typing import Dict, List, Optional, Tuple
from ..core.base_models import MLModel


class AnomalyDetector(MLModel):
    """Detects anomalies in government spending patterns.

    Detection is rule-based: each item is scored by three independent
    heuristics (high value, short supplier name, suspicious keywords in the
    description) and reported when the combined score exceeds 0.5.
    """

    def __init__(self):
        super().__init__("anomaly_detector")
        # NOTE: only "value_threshold" is read by the scoring code in this
        # class; the other two entries are stored but not consulted here.
        self._thresholds = {
            "value_threshold": 1000000,  # 1M BRL
            "frequency_threshold": 10,
            "pattern_threshold": 0.8
        }

    async def train(self, data: List[Dict], **kwargs) -> Dict:
        """Train anomaly detection model (stub).

        No learning happens yet: the model is simply flagged as trained.

        Args:
            data: Training records; only their count is used.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A summary dict with the keys "status", "samples" and "model".
        """
        # TODO: Implement actual ML training with historical data
        self._is_trained = True
        return {
            "status": "trained",
            "samples": len(data),
            "model": self.model_name
        }

    async def predict(self, data: List[Dict]) -> List[Dict]:
        """Detect anomalies in spending data.

        Args:
            data: Spending records to score.

        Returns:
            One dict per anomalous record (score strictly greater than 0.5),
            in input order, with the keys "item", "anomaly_score", "reasons"
            and "severity".
        """
        anomalies = []

        for item in data:
            anomaly_score, reasons = await self._calculate_anomaly_score(item)

            if anomaly_score > 0.5:  # Threshold for anomaly
                anomalies.append({
                    "item": item,
                    "anomaly_score": anomaly_score,
                    "reasons": reasons,
                    "severity": self._get_severity(anomaly_score)
                })

        return anomalies

    async def evaluate(self, data: List[Dict]) -> Dict:
        """Summarise how many records in ``data`` are flagged as anomalous.

        Args:
            data: Spending records to run through ``predict``.

        Returns:
            A dict with "total_items", "anomalies_detected" and
            "anomaly_rate" (0 when ``data`` is empty).
        """
        predictions = await self.predict(data)
        return {
            "total_items": len(data),
            "anomalies_detected": len(predictions),
            "anomaly_rate": len(predictions) / len(data) if data else 0
        }

    async def _calculate_anomaly_score(self, item: Dict) -> Tuple[float, List[str]]:
        """Calculate anomaly score for an item.

        Fields that are missing, ``None`` or of an unexpected type are
        skipped instead of raising, so one malformed record cannot abort a
        whole batch.

        Args:
            item: A spending record; the keys "valor", "fornecedor" and
                "objeto" are inspected when present.

        Returns:
            A ``(score, reasons)`` tuple where ``score`` is capped at 1.0 and
            ``reasons`` lists a message for every heuristic that fired.
        """
        score = 0.0
        reasons = []

        # Check value anomalies
        value = item.get("valor", 0)
        if isinstance(value, (int, float)) and value > self._thresholds["value_threshold"]:
            score += 0.3
            reasons.append(f"Alto valor: R$ {value:,.2f}")

        # Check frequency anomalies (simplified)
        # dict.get's default only covers an absent key, so a present-but-null
        # or non-dict "fornecedor" must be guarded explicitly.
        supplier_info = item.get("fornecedor")
        supplier = supplier_info.get("nome", "") if isinstance(supplier_info, dict) else ""
        if isinstance(supplier, str) and supplier and len(supplier) < 10:  # Very short supplier names
            score += 0.2
            reasons.append("Nome de fornecedor suspeito")

        # Check pattern anomalies (simplified)
        raw_description = item.get("objeto")
        description = raw_description.lower() if isinstance(raw_description, str) else ""
        suspicious_keywords = ("urgente", "emergencial", "dispensada")
        if any(keyword in description for keyword in suspicious_keywords):
            score += 0.4
            reasons.append("Contratação com características suspeitas")

        return min(score, 1.0), reasons

    def _get_severity(self, score: float) -> str:
        """Get severity level based on anomaly score.

        Returns:
            "high" for scores of at least 0.8, "medium" for at least 0.6,
            otherwise "low".
        """
        if score >= 0.8:
            return "high"
        elif score >= 0.6:
            return "medium"
        else:
            return "low"

    def set_thresholds(self, **thresholds):
        """Update detection thresholds.

        Args:
            **thresholds: Threshold names and their new values; they are
                merged into the existing mapping without validation.
        """
        self._thresholds.update(thresholds)