Spaces:
Sleeping
Sleeping
| """Anomaly detection for government spending data.""" | |
| from typing import Dict, List, Optional, Tuple | |
| from ..core.base_models import MLModel | |
class AnomalyDetector(MLModel):
    """Detects anomalies in government spending patterns.

    Detection is rule-based: each item (a dict) is scored by three
    independent checks on its ``"valor"``, ``"fornecedor"`` and ``"objeto"``
    fields, and items whose score exceeds a fixed cut-off are reported.
    ``train`` is currently a stub and does not influence scoring.
    """

    # An item is reported by predict() only when its score is strictly
    # greater than this value.
    _ANOMALY_SCORE_CUTOFF = 0.5
    # Supplier names with fewer characters than this are flagged.
    _SHORT_SUPPLIER_NAME_LENGTH = 10
    # Substrings looked up in the lower-cased "objeto" text.
    _SUSPICIOUS_KEYWORDS = ("urgente", "emergencial", "dispensada")

    def __init__(self):
        """Register the model under the name ``"anomaly_detector"`` and set default thresholds."""
        super().__init__("anomaly_detector")
        # NOTE(review): only "value_threshold" is read by the scoring code in
        # this class; "frequency_threshold" and "pattern_threshold" are stored
        # but not used here.
        self._thresholds = {
            "value_threshold": 1000000,  # 1M BRL
            "frequency_threshold": 10,
            "pattern_threshold": 0.8,
        }

    async def train(self, data: List[Dict], **kwargs) -> Dict:
        """Train anomaly detection model (stub).

        No learning happens yet; the method only marks the model as trained.

        Args:
            data: Training items; only their count is used.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A dict with ``"status"``, ``"samples"`` (``len(data)``) and
            ``"model"`` (``self.model_name``, presumably set by the base
            class — confirm against ``MLModel``).
        """
        # TODO: Implement actual ML training with historical data
        self._is_trained = True
        return {
            "status": "trained",
            "samples": len(data),
            "model": self.model_name,
        }

    async def predict(self, data: List[Dict]) -> List[Dict]:
        """Detect anomalies in spending data.

        Args:
            data: Items to score, each a dict.

        Returns:
            One dict per item whose score is strictly greater than 0.5, in
            input order, with keys ``"item"``, ``"anomaly_score"``,
            ``"reasons"`` and ``"severity"``.
        """
        anomalies = []
        for item in data:
            anomaly_score, reasons = await self._calculate_anomaly_score(item)
            if anomaly_score > self._ANOMALY_SCORE_CUTOFF:
                anomalies.append({
                    "item": item,
                    "anomaly_score": anomaly_score,
                    "reasons": reasons,
                    "severity": self._get_severity(anomaly_score),
                })
        return anomalies

    async def evaluate(self, data: List[Dict]) -> Dict:
        """Evaluate anomaly detection performance.

        Runs ``predict`` on ``data`` and summarises how many items were
        flagged. No ground-truth labels are involved.

        Returns:
            A dict with ``"total_items"``, ``"anomalies_detected"`` and
            ``"anomaly_rate"`` (0 when ``data`` is empty).
        """
        predictions = await self.predict(data)
        return {
            "total_items": len(data),
            "anomalies_detected": len(predictions),
            "anomaly_rate": len(predictions) / len(data) if data else 0,
        }

    async def _calculate_anomaly_score(self, item: Dict) -> Tuple[float, List[str]]:
        """Calculate anomaly score for an item.

        Three additive rules are applied:

        * +0.3 when ``item["valor"]`` is a number above
          ``self._thresholds["value_threshold"]``;
        * +0.2 when ``item["fornecedor"]["nome"]`` is a non-empty string
          shorter than 10 characters;
        * +0.4 when the lower-cased ``item["objeto"]`` contains any of the
          suspicious keywords.

        Fields that are missing, ``None`` or of an unexpected type are
        skipped, so one malformed record does not abort a whole batch.

        Returns:
            ``(score, reasons)`` where ``score`` is capped at 1.0 and
            ``reasons`` holds one message per rule that fired.
        """
        score = 0.0
        reasons: List[str] = []

        # Check value anomalies
        value = item.get("valor", 0)
        if isinstance(value, (int, float)) and value > self._thresholds["value_threshold"]:
            score += 0.3
            reasons.append(f"Alto valor: R$ {value:,.2f}")

        # Check frequency anomalies (simplified)
        # dict.get's default only applies when the key is absent; a key that
        # is present with a None (JSON null) or non-dict value must be
        # guarded explicitly before calling .get on it.
        supplier_info = item.get("fornecedor")
        supplier = supplier_info.get("nome") if isinstance(supplier_info, dict) else None
        if (
            isinstance(supplier, str)
            and supplier
            and len(supplier) < self._SHORT_SUPPLIER_NAME_LENGTH  # Very short supplier names
        ):
            score += 0.2
            reasons.append("Nome de fornecedor suspeito")

        # Check pattern anomalies (simplified)
        raw_description = item.get("objeto")
        # Same guard as above: "objeto" may be present but None.
        description = raw_description.lower() if isinstance(raw_description, str) else ""
        if any(keyword in description for keyword in self._SUSPICIOUS_KEYWORDS):
            score += 0.4
            reasons.append("Contratação com características suspeitas")

        return min(score, 1.0), reasons

    def _get_severity(self, score: float) -> str:
        """Get severity level based on anomaly score.

        Returns ``"high"`` for scores of at least 0.8, ``"medium"`` for
        scores of at least 0.6, and ``"low"`` otherwise.
        """
        if score >= 0.8:
            return "high"
        elif score >= 0.6:
            return "medium"
        else:
            return "low"

    def set_thresholds(self, **thresholds):
        """Update detection thresholds.

        Keyword arguments are merged into the existing threshold dict;
        keys are not validated, so unknown names are stored as given.
        """
        self._thresholds.update(thresholds)