Spaces:
Paused
Paused
| """ | |
| Fraud Service - Handles fraud alerts, rules, and investigative actions. | |
| """ | |
| import logging | |
| from datetime import UTC, datetime | |
| from typing import Any | |
| from sqlalchemy.orm import Session | |
| from app.services.fraud.engine import rule_engine | |
| from core.database import Case, Transaction | |
| from core.database import FraudAlert as FraudAlertModel | |
| logger = logging.getLogger(__name__) | |
class FraudService:
    """Consolidated logic for fraud detection, alert management, and account freezing.

    Each instance wraps one SQLAlchemy session and the shared ``rule_engine``
    imported from ``app.services.fraud.engine``.
    """

    def __init__(self, db: Session):
        """Bind the service to a database session.

        Args:
            db: Session used for every query, commit and rollback issued by
                this service.
        """
        self.db = db
        self.rule_engine = rule_engine

    @staticmethod
    def _transaction_to_dict(txn: Any) -> dict[str, Any]:
        """Flatten a Transaction row into the plain dict handed to the rule engine."""
        return {
            "id": txn.id,
            # A missing (or zero) amount is reported as 0.0.
            "amount": float(txn.amount) if txn.amount else 0.0,
            # A missing timestamp is reported as an empty string.
            "timestamp": txn.timestamp.isoformat() if txn.timestamp else "",
            "description": txn.description,
            "merchant": txn.merchant,
        }

    @staticmethod
    def _alert_to_dict(alert: Any) -> dict[str, Any]:
        """Convert a query result (Row or ORM instance) into a detached plain dict."""
        if hasattr(alert, "_mapping"):
            return dict(alert._mapping)
        # vars() on an ORM instance is its live __dict__: copy it so callers
        # cannot mutate the model through the result, and drop SQLAlchemy's
        # internal bookkeeping entry, which is not alert data.
        return {
            key: value
            for key, value in vars(alert).items()
            if key != "_sa_instance_state"
        }

    async def analyze_case(
        self, case_id: str, transaction_ids: list[str] | None = None
    ) -> dict[str, Any]:
        """Analyze a case for fraud patterns and persist one alert per rule result.

        Args:
            case_id: Identifier of the case to analyze.
            transaction_ids: Optional subset of transaction ids to analyze.
                When None or empty, every transaction of the case is used.

        Returns:
            ``{"case_id": case_id, "alerts_generated": <count>}`` on success;
            ``{"error": "Case not found", "alerts": []}`` when the case does
            not exist; ``{"error": <message>, "alerts": []}`` when any
            exception is raised (the session is rolled back in that case).
        """
        try:
            case = self.db.query(Case).filter(Case.id == case_id).first()
            if not case:
                return {"error": "Case not found", "alerts": []}

            query = self.db.query(Transaction).filter(Transaction.case_id == case_id)
            if transaction_ids:
                query = query.filter(Transaction.id.in_(transaction_ids))

            transaction_dicts = [self._transaction_to_dict(t) for t in query.all()]
            rule_alerts = await self.rule_engine.execute_rules(transaction_dicts)

            saved_alerts = []
            for result in rule_alerts:
                alert_metadata = {
                    "transaction_ids": result.transaction_ids,
                    "confidence": result.confidence,
                    "risk_score": result.risk_score,
                    "recommendations": result.recommendations,
                    "rule_name": result.rule_name,
                    "status": "open",
                }
                alert = FraudAlertModel(
                    case_id=case_id,
                    alert_type="fraud_rule",
                    title=f"Fraud Alert: {result.rule_name}",
                    severity=result.severity.value,
                    description=result.description,
                    alert_metadata=alert_metadata,
                    created_at=datetime.now(UTC),
                )
                self.db.add(alert)
                saved_alerts.append(alert)

            # Single commit so all alerts of one analysis are saved together.
            self.db.commit()
            return {"case_id": case_id, "alerts_generated": len(saved_alerts)}
        except Exception as e:
            # logger.exception keeps the traceback that logger.error discarded.
            logger.exception("Error analyzing case %s: %s", case_id, e)
            self.db.rollback()
            return {"error": str(e), "alerts": []}

    def get_case_alerts(self, case_id: str) -> list[dict[str, Any]]:
        """Return every stored fraud alert of a case as plain dictionaries.

        Args:
            case_id: Identifier of the case whose alerts are fetched.

        Returns:
            One dict per alert; empty list when the case has no alerts.
            ORM-internal state is not included in the dicts.
        """
        alerts = (
            self.db.query(FraudAlertModel)
            .filter(FraudAlertModel.case_id == case_id)
            .all()
        )
        return [self._alert_to_dict(a) for a in alerts]
| # Backward-compatibility alias for callers that still construct FraudDetectionService | |
| FraudDetectionService = FraudService | |
| # Instance factory for dependency injection | |
def get_fraud_service(db: Session | None = None) -> FraudService | None:
    """Build a FraudService for dependency injection.

    Args:
        db: Database session to bind the service to.

    Returns:
        A FraudService bound to ``db``, or None when no session is supplied.
    """
    # Explicit None check: only a missing session yields None, so a valid
    # session-like object is never dropped because it happens to be falsy.
    if db is None:
        return None
    return FraudService(db)