Spaces:
Paused
Paused
| """ | |
| Proof API Router | |
| Provides endpoints for fraud proof mechanisms that generate court-admissible evidence: | |
| - Metadata correlation detection | |
| - Temporal burst pattern analysis | |
| - Immutable audit chain verification | |
| - Shell network/community detection | |
| """ | |
import logging
from collections import Counter
from datetime import UTC, datetime

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from app.services.graph_service import relationship_graph
from app.services.immutable_audit_chain import immutable_audit_chain
from app.services.infrastructure.auth_service import auth_service
from app.services.intelligence.metadata_correlation_service import (
    MetadataCorrelationEngine,
)
from app.services.temporal_burst_detector import (
    TemporalBurstDetector,
    temporal_burst_detector,
)
from core.database import get_db
logger = logging.getLogger(__name__)

# All endpoints in this module are intended to be served under /proof and
# tagged "fraud-proof" in the OpenAPI docs.
# NOTE(review): none of the handler functions below carry @router.get/@router.post
# decorators in this file — confirm the routes are registered elsewhere
# (e.g. via add_api_route calls) or the decorators were lost.
router = APIRouter(prefix="/proof", tags=["fraud-proof"])
# ---- Test placeholders (allow tests to patch module-level dependencies) ----
# Each guard only defines a fallback when the name is not already present in
# module globals, so a test suite can inject fakes before import completes.
if "get_current_user" not in globals():
    try:
        # Prefer the real auth dependency exposed by the auth service.
        get_current_user = auth_service.get_current_user
    except Exception:
        # Fallback no-op dependency: behaves like an unauthenticated user.
        def get_current_user(*args, **kwargs):
            return None
if "require_permission" not in globals():
    # Stub permission factory: returns a dependency that always allows access.
    def require_permission(*args, **kwargs):
        def _dep(*a, **k):
            return None
        return _dep
# Ensure patchable service globals exist even when not otherwise defined.
# NOTE(review): "metadata_correlation_engine" (lowercase) is never imported
# above, so it is always initialized to None here — confirm whether anything
# actually uses that name, or whether MetadataCorrelationEngine was meant.
for _svc in (
    "relationship_graph",
    "metadata_correlation_engine",
    "immutable_audit_chain",
):
    if _svc not in globals():
        globals()[_svc] = None
| # ===== METADATA CORRELATION ENDPOINTS ===== | |
def get_metadata_correlations(case_id: str, db: Session = Depends(get_db)):
    """
    Get metadata correlations for a case.

    Detects relationships between entities via shared metadata:
    - Same phone number
    - Same email address
    - Same physical address
    - Same IP address

    Args:
        case_id: Identifier of the case to analyze.
        db: Database session (injected dependency).

    Returns:
        Correlation results with per-type counts and the raw correlation list.

    Raises:
        HTTPException: 500 when the correlation engine fails.
    """
    try:
        engine = MetadataCorrelationEngine(db)
        correlations = engine.find_all_correlations(case_id)

        # Count correlations per metadata type in one pass instead of four
        # separate list comprehensions over the same data.
        type_counts = Counter(c.get("metadata_type") for c in correlations)

        return {
            "success": True,
            "case_id": case_id,
            "analyzed_at": datetime.now(UTC).isoformat(),
            "correlation_count": len(correlations),
            "correlations": correlations,
            "summary": {
                "phone_correlations": type_counts.get("phone", 0),
                "email_correlations": type_counts.get("email", 0),
                "address_correlations": type_counts.get("address", 0),
                "ip_correlations": type_counts.get("ip_address", 0),
            },
        }
    except Exception as e:
        # logger.exception preserves the traceback; "from e" chains the
        # original cause onto the HTTP error (PEP 3134 / B904).
        logger.exception("Metadata correlation error for case %s: %s", case_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ===== TEMPORAL BURST DETECTION ENDPOINTS ===== | |
def detect_temporal_bursts(
    case_id: str,
    burst_threshold: int = Query(
        10, description="Minimum transactions for burst detection"
    ),
    burst_window_hours: int = Query(48, description="Time window in hours"),
    structuring_threshold: float = Query(
        10000, description="Reporting threshold amount"
    ),
    db: Session = Depends(get_db),
):
    """
    Detect temporal burst patterns in case transactions.

    Patterns detected:
    - Burst: rapid transaction sequences (10+ in 48 hours by default)
    - Structuring: amounts clustering below the reporting threshold
    - Velocity: sudden increases in transaction frequency

    Args:
        case_id: Case whose transactions are analyzed.
        burst_threshold: Minimum transaction count to flag a burst.
        burst_window_hours: Detection window size in hours.
        structuring_threshold: Regulatory reporting threshold amount.
        db: Database session (injected dependency).

    Returns:
        Burst analysis results with alerts and risk scores.

    Raises:
        HTTPException: 500 when retrieval or analysis fails.
    """
    try:
        # Local import mirrors the original; presumably avoids a circular
        # import at module load time — TODO confirm.
        from core.database import Transaction

        transactions = (
            db.query(Transaction).filter(Transaction.case_id == case_id).all()
        )

        # Normalize ORM rows to the plain-dict shape the detector expects.
        # NOTE(review): account_id is used for both customer_id and
        # customer_name — confirm there is no dedicated name field.
        txn_dicts = [
            {
                "id": txn.id,
                "customer_id": txn.account_id,
                "amount": float(txn.amount) if txn.amount else 0,
                "date": txn.date.isoformat() if txn.date else None,
                "customer_name": txn.account_id,
            }
            for txn in transactions
        ]

        # Build a detector with the caller-supplied thresholds rather than
        # reusing the module-level singleton's defaults.
        detector = TemporalBurstDetector(
            burst_threshold=burst_threshold,
            burst_window_hours=burst_window_hours,
            structuring_threshold=structuring_threshold,
        )
        results = detector.analyze_transactions(txn_dicts, case_id=case_id)
        return {"success": True, **results}
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Temporal burst detection error for case %s: %s", case_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
def analyze_transactions_for_bursts(
    transactions: list[dict],
    burst_threshold: int = Query(
        10, description="Minimum transactions for burst detection"
    ),
    burst_window_hours: int = Query(48, description="Time window in hours"),
):
    """
    Analyze provided transactions for temporal burst patterns.

    Useful for ad-hoc analysis without a case context.

    Args:
        transactions: Transaction dictionaries with date, amount, customer_id.
        burst_threshold: Minimum transaction count to flag a burst.
        burst_window_hours: Detection window size in hours.

    Returns:
        Burst analysis results.

    Raises:
        HTTPException: 500 when the detector fails.
    """
    try:
        detector = TemporalBurstDetector(
            burst_threshold=burst_threshold, burst_window_hours=burst_window_hours
        )
        results = detector.analyze_transactions(transactions)
        return {"success": True, **results}
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Temporal burst analysis error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ===== AUDIT CHAIN ENDPOINTS ===== | |
def verify_audit_chain():
    """
    Verify the integrity of the immutable audit chain.

    Checks:
    - Chain linkage (each entry links to previous)
    - HMAC signatures validity
    - Data hash integrity

    Returns:
        Verification result with status and any detected issues.

    Raises:
        HTTPException: 500 when verification cannot be performed.
    """
    try:
        result = immutable_audit_chain.verify_chain_integrity()
        return {"success": True, **result}
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Audit chain verification error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
def export_audit_chain_proof(
    start_sequence: int | None = Query(None, description="Starting sequence number"),
    end_sequence: int | None = Query(None, description="Ending sequence number"),
    entity_type: str | None = Query(None, description="Filter by entity type"),
    entity_id: str | None = Query(None, description="Filter by entity ID"),
):
    """
    Export chain proof for court-admissible evidence.

    Generates a cryptographically signed proof document containing:
    - Filtered audit entries
    - Chain verification data
    - HMAC proof signature

    Args:
        start_sequence: Inclusive starting sequence number, or None for start.
        end_sequence: Inclusive ending sequence number, or None for end.
        entity_type: Optional entity-type filter.
        entity_id: Optional entity-ID filter.

    Returns:
        Court-ready proof document.

    Raises:
        HTTPException: 500 when proof generation fails.
    """
    try:
        proof = immutable_audit_chain.get_chain_proof(
            start_sequence=start_sequence,
            end_sequence=end_sequence,
            entity_type=entity_type,
            entity_id=entity_id,
        )
        return {"success": True, **proof}
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Audit chain export error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
def get_audit_chain_stats():
    """
    Get audit chain statistics.

    Returns:
        Chain statistics from the immutable audit chain service.

    Raises:
        HTTPException: 500 when stats retrieval fails.
    """
    try:
        stats = immutable_audit_chain.get_chain_stats()
        return {"success": True, **stats}
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Audit chain stats error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
def append_audit_entry(
    event_type: str = Query(description="Type of event"),
    action: str = Query(description="Action taken"),
    entity_type: str | None = Query(None, description="Entity type"),
    entity_id: str | None = Query(None, description="Entity ID"),
    user_id: str | None = Query(None, description="User ID"),
    data: dict | None = None,
):
    """
    Append a new entry to the immutable audit chain.

    This creates a cryptographically linked entry with:
    - Link to previous entry (previous_hash)
    - HMAC signature
    - Data hash

    Args:
        event_type: Type of event being recorded.
        action: Action taken.
        entity_type: Optional entity type the entry refers to.
        entity_id: Optional entity identifier.
        user_id: Optional acting user identifier.
        data: Optional structured payload to hash into the entry.

    Returns:
        The newly appended chain entry.

    Raises:
        HTTPException: 500 when the append fails.
    """
    try:
        entry = immutable_audit_chain.append_entry(
            event_type=event_type,
            action=action,
            entity_type=entity_type,
            entity_id=entity_id,
            user_id=user_id,
            data=data,
        )
        return {"success": True, "entry": entry}
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Audit chain append error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ===== COMMUNITY DETECTION ENDPOINTS ===== | |
def detect_shell_networks(
    case_id: str,
    min_community_size: int = Query(3, description="Minimum community size"),
    max_density: float = Query(0.9, description="Maximum density threshold"),
    db: Session = Depends(get_db),
):
    """
    Detect potential shell company networks in case data.

    Uses Louvain community detection to identify:
    - Tight-knit transaction clusters
    - Circular transaction patterns
    - Entities with high internal transaction ratios

    Args:
        case_id: Case whose transactions are graphed and analyzed.
        min_community_size: Minimum members for a community to count.
        max_density: Maximum density threshold for suspicion scoring.
        db: Database session (injected dependency).

    Returns:
        Detected shell networks with risk scoring.

    Raises:
        HTTPException: 500 when graph construction or detection fails.
    """
    try:
        # Local import mirrors the original; presumably avoids a circular
        # import at module load time — TODO confirm.
        from core.database import Transaction

        transactions = (
            db.query(Transaction).filter(Transaction.case_id == case_id).all()
        )

        # Normalize ORM rows to the dict shape the graph builder expects.
        txn_dicts = [
            {
                "id": txn.id,
                "customer_id": txn.account_id,
                "account_id": txn.account_id,
                "amount": float(txn.amount) if txn.amount else 0,
                "date": txn.date.isoformat() if txn.date else None,
                # Prefer an explicit counterparty; fall back to description.
                "merchant_name": getattr(txn, "counterparty", None)
                or getattr(txn, "description", "Unknown"),
            }
            for txn in transactions
        ]

        if not txn_dicts:
            # Nothing to graph: return an empty-but-valid response shape.
            return {
                "success": True,
                "case_id": case_id,
                "analyzed_at": datetime.now(UTC).isoformat(),
                "transaction_count": 0,
                "shell_networks": [],
                "summary": {
                    "total_communities": 0,
                    "suspicious_networks": 0,
                    "highest_risk_score": 0,
                },
            }

        # Build the graph on the shared service, then detect suspect clusters.
        relationship_graph.build_graph_from_transactions(txn_dicts)
        shell_networks = relationship_graph.detect_shell_networks(
            min_community_size=min_community_size, max_density=max_density
        )
        return {
            "success": True,
            "case_id": case_id,
            "analyzed_at": datetime.now(UTC).isoformat(),
            "transaction_count": len(txn_dicts),
            "shell_networks": shell_networks,
            "summary": {
                "total_communities": len(relationship_graph.detect_communities()),
                "suspicious_networks": len(shell_networks),
                # Generator with default avoids building a throwaway list.
                "highest_risk_score": max(
                    (n["risk_score"] for n in shell_networks), default=0
                ),
            },
        }
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Community detection error for case %s: %s", case_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ===== PROOF SUMMARY ENDPOINT ===== | |
def get_proof_summary(case_id: str, db: Session = Depends(get_db)):
    """
    Get combined proof visualization data for a case.

    Aggregates all proof mechanisms into a single dashboard-ready response:
    - Metadata correlation summary
    - Temporal burst summary
    - Audit chain status
    - Shell network summary

    Each sub-analysis is best-effort: a failure in one mechanism is logged,
    recorded in that section's "error" field, and does not fail the request.

    Args:
        case_id: Case to summarize.
        db: Database session (injected dependency).

    Returns:
        Combined proof summary with per-mechanism confidence scores, a
        weighted overall confidence, and a coarse court-readiness rating.

    Raises:
        HTTPException: 500 only when the aggregation itself fails.
    """
    try:
        metadata_summary = _summarize_metadata_correlations(db, case_id)
        burst_summary = _summarize_temporal_bursts(db, case_id)
        audit_summary = _summarize_audit_chain()
        shell_summary = _summarize_shell_networks()

        # Weighted blend of the four mechanisms' confidence scores.
        overall_confidence = (
            metadata_summary.get("confidence", 0) * 0.25
            + burst_summary.get("confidence", 0) * 0.30
            + audit_summary.get("confidence", 0) * 0.25
            + shell_summary.get("confidence", 0) * 0.20
        )

        return {
            "success": True,
            "case_id": case_id,
            "analyzed_at": datetime.now(UTC).isoformat(),
            "proof_summary": {
                "metadata_correlations": metadata_summary,
                "temporal_bursts": burst_summary,
                "audit_chain": audit_summary,
                "shell_networks": shell_summary,
                "overall_confidence": round(overall_confidence, 2),
                "court_readiness": (
                    "high"
                    if overall_confidence >= 0.7
                    else "medium" if overall_confidence >= 0.4 else "low"
                ),
            },
        }
    except Exception as e:
        # Preserve traceback in logs and chain the cause onto the HTTP error.
        logger.exception("Proof summary error for case %s: %s", case_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e


def _summarize_metadata_correlations(db: Session, case_id: str) -> dict:
    """Best-effort metadata-correlation section of the proof summary."""
    try:
        engine = MetadataCorrelationEngine(db)
        correlations = engine.find_all_correlations(case_id)
        # Single pass over the correlations instead of four comprehensions.
        type_counts = Counter(c.get("metadata_type") for c in correlations)
        return {
            "total_correlations": len(correlations),
            "types": {
                "phone": type_counts.get("phone", 0),
                "email": type_counts.get("email", 0),
                "address": type_counts.get("address", 0),
                "ip": type_counts.get("ip_address", 0),
            },
            # Heuristic: each correlation adds 0.15 confidence, capped at 1.0.
            "confidence": min(1.0, len(correlations) * 0.15) if correlations else 0,
        }
    except Exception as e:
        logger.warning("Metadata correlation failed: %s", e)
        return {
            "total_correlations": 0,
            "types": {},
            "confidence": 0,
            "error": str(e),
        }


def _summarize_temporal_bursts(db: Session, case_id: str) -> dict:
    """Best-effort temporal-burst section of the proof summary."""
    try:
        # Local import mirrors the original; presumably avoids a circular
        # import at module load time — TODO confirm.
        from core.database import Transaction

        transactions = (
            db.query(Transaction).filter(Transaction.case_id == case_id).all()
        )
        txn_dicts = [
            {
                "id": txn.id,
                "customer_id": txn.account_id,
                "amount": float(txn.amount) if txn.amount else 0,
                "date": txn.date.isoformat() if txn.date else None,
            }
            for txn in transactions
        ]
        burst_results = temporal_burst_detector.analyze_transactions(
            txn_dicts, case_id
        )
        # Hoist the repeated burst_results.get("summary", {}) lookups.
        summary = burst_results.get("summary", {})
        return {
            "alerts": len(burst_results.get("alerts", [])),
            "risk_score": summary.get("overall_risk_score", 0),
            "patterns": {
                "burst": summary.get("burst_patterns", 0),
                "structuring": summary.get("structuring_patterns", 0),
                "velocity": summary.get("velocity_anomalies", 0),
            },
            # Risk score is divided by 100 and capped to map onto [0, 1].
            "confidence": min(1.0, summary.get("overall_risk_score", 0) / 100),
        }
    except Exception as e:
        logger.warning("Temporal burst detection failed: %s", e)
        return {
            "alerts": 0,
            "risk_score": 0,
            "patterns": {},
            "confidence": 0,
            "error": str(e),
        }


def _summarize_audit_chain() -> dict:
    """Best-effort audit-chain section of the proof summary."""
    try:
        chain_verification = immutable_audit_chain.verify_chain_integrity()
        integrity = chain_verification.get("integrity_percentage", 0)
        return {
            "status": chain_verification.get("status", "unknown"),
            "total_entries": chain_verification.get("total_entries", 0),
            "integrity_percentage": integrity,
            "confidence": integrity / 100,
        }
    except Exception as e:
        logger.warning("Audit chain verification failed: %s", e)
        return {
            "status": "error",
            "total_entries": 0,
            "integrity_percentage": 0,
            "confidence": 0,
            "error": str(e),
        }


def _summarize_shell_networks() -> dict:
    """Best-effort shell-network section of the proof summary."""
    try:
        # NOTE(review): this summarizes whatever graph is currently held by
        # the shared relationship_graph service — it is not rebuilt for this
        # case here. Confirm that is intentional.
        shell_networks = relationship_graph.detect_shell_networks()
        return {
            "networks_detected": len(shell_networks),
            "highest_risk_score": max(
                (n["risk_score"] for n in shell_networks), default=0
            ),
            # Heuristic: each network adds 0.2 confidence, capped at 1.0.
            "confidence": (
                min(1.0, len(shell_networks) * 0.2) if shell_networks else 0
            ),
        }
    except Exception as e:
        logger.warning("Shell network detection failed: %s", e)
        return {
            "networks_detected": 0,
            "highest_risk_score": 0,
            "confidence": 0,
            "error": str(e),
        }