neural-thinker's picture
feat: initial cidadao.ai-models deployment
b95e73a
"""Anomaly detection for government spending data."""
from typing import Dict, List, Optional, Tuple
from ..core.base_models import MLModel
class AnomalyDetector(MLModel):
    """Rule-based detector for anomalies in government spending records.

    Each record is a dict. Three keys are inspected when present:

    * ``"valor"`` -- numeric contract value, compared with
      ``value_threshold``.
    * ``"fornecedor"`` -- dict whose ``"nome"`` entry is the supplier name.
    * ``"objeto"`` -- free-text description of the contract.

    Every rule that fires adds a fixed weight to the record's score; the
    score is capped at 1.0. Records whose score is strictly greater than
    0.5 are reported by :meth:`predict`.
    """

    # Score a record must strictly exceed to be reported as an anomaly.
    _ANOMALY_CUTOFF = 0.5
    # Supplier names shorter than this many characters are flagged.
    _MIN_SUPPLIER_NAME_LENGTH = 10
    # Substrings (matched against the lower-cased description) that flag a
    # contract description as suspicious.
    _SUSPICIOUS_KEYWORDS = ("urgente", "emergencial", "dispensada")

    def __init__(self):
        """Register the model name and install the default thresholds."""
        super().__init__("anomaly_detector")
        # Only "value_threshold" is read by the methods of this class; the
        # other two entries are stored but not consulted here.
        self._thresholds = {
            "value_threshold": 1000000,  # 1M BRL
            "frequency_threshold": 10,
            "pattern_threshold": 0.8,
        }

    async def train(self, data: List[Dict], **kwargs) -> Dict:
        """Mark the model as trained (stub; no learning takes place).

        Args:
            data: Training records; only their count is used.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A dict with ``"status"``, ``"samples"`` (``len(data)``) and
            ``"model"`` (``self.model_name``).
        """
        # TODO: Implement actual ML training with historical data
        self._is_trained = True
        return {
            "status": "trained",
            "samples": len(data),
            "model": self.model_name,
        }

    async def predict(self, data: List[Dict]) -> List[Dict]:
        """Return a report for every record whose score exceeds the cutoff.

        Args:
            data: Spending records to score.

        Returns:
            One dict per anomalous record, in input order, with the keys
            ``"item"`` (the original record), ``"anomaly_score"``,
            ``"reasons"`` and ``"severity"``.
        """
        anomalies = []
        for item in data:
            anomaly_score, reasons = await self._calculate_anomaly_score(item)
            if anomaly_score > self._ANOMALY_CUTOFF:
                anomalies.append({
                    "item": item,
                    "anomaly_score": anomaly_score,
                    "reasons": reasons,
                    "severity": self._get_severity(anomaly_score),
                })
        return anomalies

    async def evaluate(self, data: List[Dict]) -> Dict:
        """Summarise how many records in ``data`` are flagged.

        No ground-truth labels are involved: the result only counts what
        :meth:`predict` reports.

        Args:
            data: Spending records to score.

        Returns:
            A dict with ``"total_items"``, ``"anomalies_detected"`` and
            ``"anomaly_rate"`` (``0.0`` when ``data`` is empty).
        """
        predictions = await self.predict(data)
        total = len(data)
        flagged = len(predictions)
        return {
            "total_items": total,
            "anomalies_detected": flagged,
            # Guard against division by zero on empty input.
            "anomaly_rate": flagged / total if total else 0.0,
        }

    async def _calculate_anomaly_score(self, item: Dict) -> Tuple[float, List[str]]:
        """Score one record and list the reasons for the score.

        Fields that are missing, ``None`` or of an unexpected type are
        treated as absent instead of raising, since ``dict.get`` only falls
        back to its default when the key itself is missing.

        Args:
            item: A single spending record.

        Returns:
            ``(score, reasons)`` where ``score`` is in ``[0.0, 1.0]`` and
            ``reasons`` holds one message per rule that fired.
        """
        score = 0.0
        reasons = []

        # Rule 1: contract value above the configured threshold.
        # bool is a subclass of int, so exclude it explicitly: True/False is
        # not a monetary amount.
        value = item.get("valor", 0)
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if is_number and value > self._thresholds["value_threshold"]:
            score += 0.3
            reasons.append(f"Alto valor: R$ {value:,.2f}")

        # Rule 2 (simplified): very short supplier names.
        supplier_info = item.get("fornecedor")
        supplier = supplier_info.get("nome") if isinstance(supplier_info, dict) else None
        if (
            isinstance(supplier, str)
            and supplier
            and len(supplier) < self._MIN_SUPPLIER_NAME_LENGTH
        ):
            score += 0.2
            reasons.append("Nome de fornecedor suspeito")

        # Rule 3 (simplified): suspicious keywords in the description.
        raw_description = item.get("objeto")
        description = raw_description.lower() if isinstance(raw_description, str) else ""
        if any(keyword in description for keyword in self._SUSPICIOUS_KEYWORDS):
            score += 0.4
            reasons.append("Contratação com características suspeitas")

        return min(score, 1.0), reasons

    def _get_severity(self, score: float) -> str:
        """Map an anomaly score to ``"high"``, ``"medium"`` or ``"low"``.

        Scores of at least 0.8 are ``"high"``, scores of at least 0.6 are
        ``"medium"``, and everything else is ``"low"``.
        """
        if score >= 0.8:
            return "high"
        if score >= 0.6:
            return "medium"
        return "low"

    def set_thresholds(self, **thresholds):
        """Update detection thresholds.

        Args:
            **thresholds: Entries merged into the threshold table; existing
                keys are overwritten and unknown keys are stored as given.
        """
        self._thresholds.update(thresholds)