| """ | |
| Forensic Intelligence Service - Real Implementation | |
| Provides advanced forensic analysis capabilities for fraud investigation. | |
| """ | |
| import hashlib | |
| import logging | |
| from collections import defaultdict | |
| from datetime import datetime | |
| from typing import Any | |
| logger = logging.getLogger(__name__) | |
| class TriangulationEngine: | |
| """ | |
| Probabilistic unmasking of redacted transaction names using multi-source triangulation. | |
| Ref: VISION_10_10 Section 6 | |
| """ | |
| def __init__(self, db_session): | |
| self.db = db_session | |
| self._vendor_cache: dict[str, dict] = {} | |
| async def unmask_redaction( | |
| self, transaction_id: str, masked_name: str | |
| ) -> dict[str, Any]: | |
| """ | |
| Attempts to resolve '*' or partial names by looking at: | |
| 1. Similar amounts in other transactions. | |
| 2. Merchant frequency for the specific account. | |
| 3. External metadata linkages. | |
| Args: | |
| transaction_id: Transaction ID to unmask | |
| masked_name: The masked/redacted merchant name | |
| Returns: | |
| Unmasking results with confidence scores | |
| """ | |
| logger.info( | |
| f"Triangulating redacted merchant for txn {transaction_id}: {masked_name}" | |
| ) | |
| try: | |
| # Get transaction details | |
| transaction = await self._get_transaction(transaction_id) | |
| if not transaction: | |
| return { | |
| "transaction_id": transaction_id, | |
| "original_masked": masked_name, | |
| "resolved_name": None, | |
| "confidence_score": 0.0, | |
| "error": "Transaction not found", | |
| } | |
| suggestions = [] | |
| # Strategy 1: Amount-Pattern Match | |
| amount_matches = await self._find_by_amount(transaction.get("amount", 0)) | |
| for match in amount_matches: | |
| if match.get("merchant") and "*" not in match["merchant"]: | |
| suggestions.append( | |
| { | |
| "candidate": match["merchant"], | |
| "confidence": min( | |
| 0.7 + (0.1 * match.get("frequency", 1)), 0.95 | |
| ), | |
| "source": "Amount-Pattern Match", | |
| } | |
| ) | |
| # Strategy 2: Account History Sync | |
| history_matches = await self._find_by_account_history( | |
| transaction.get("account_id"), masked_name | |
| ) | |
| for match in history_matches: | |
| suggestions.append( | |
| { | |
| "candidate": match["merchant"], | |
| "confidence": min(0.8 + (0.05 * match.get("count", 1)), 0.95), | |
| "source": "Account-History Sync", | |
| } | |
| ) | |
| # Strategy 3: Pattern Recognition | |
| if masked_name and "*" in masked_name: | |
| pattern_matches = self._match_partial_pattern(masked_name) | |
| suggestions.extend(pattern_matches) | |
| # Deduplicate and sort by confidence | |
| seen = set() | |
| unique_suggestions = [] | |
| for s in sorted(suggestions, key=lambda x: x["confidence"], reverse=True): | |
| if s["candidate"] not in seen: | |
| seen.add(s["candidate"]) | |
| unique_suggestions.append(s) | |
| best_match = unique_suggestions[0] if unique_suggestions else None | |
| return { | |
| "transaction_id": transaction_id, | |
| "original_masked": masked_name, | |
| "resolved_name": best_match["candidate"] if best_match else None, | |
| "confidence_score": best_match["confidence"] if best_match else 0.0, | |
| "all_candidates": unique_suggestions[:5], | |
| "triangulation_logic": list({s["source"] for s in unique_suggestions}), | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| except Exception as e: | |
| logger.error(f"Triangulation failed for {transaction_id}: {e}") | |
| return { | |
| "transaction_id": transaction_id, | |
| "original_masked": masked_name, | |
| "resolved_name": None, | |
| "confidence_score": 0.0, | |
| "error": str(e), | |
| } | |
| async def _get_transaction(self, transaction_id: str) -> dict | None: | |
| """Get transaction from database.""" | |
| try: | |
| from core.database import Transaction | |
| txn = self.db.query(Transaction).filter_by(id=transaction_id).first() | |
| if txn: | |
| return { | |
| "id": str(txn.id), | |
| "amount": txn.amount, | |
| "account_id": getattr(txn, "account_id", None), | |
| "merchant": getattr(txn, "merchant", None), | |
| } | |
| except Exception: | |
| pass | |
| return None | |
| async def _find_by_amount(self, amount: float) -> list[dict]: | |
| """Find transactions with similar amounts.""" | |
| try: | |
| from core.database import Transaction | |
| tolerance = amount * 0.05 # 5% tolerance | |
| matches = ( | |
| self.db.query(Transaction) | |
| .filter( | |
| Transaction.amount.between(amount - tolerance, amount + tolerance) | |
| ) | |
| .limit(50) | |
| .all() | |
| ) | |
| # Group by merchant | |
| merchant_counts = defaultdict(int) | |
| for m in matches: | |
| if hasattr(m, "merchant") and m.merchant: | |
| merchant_counts[m.merchant] += 1 | |
| return [ | |
| {"merchant": k, "frequency": v} | |
| for k, v in sorted( | |
| merchant_counts.items(), key=lambda x: x[1], reverse=True | |
| ) | |
| ] | |
| except Exception: | |
| return [] | |
| async def _find_by_account_history( | |
| self, account_id: str, masked: str | |
| ) -> list[dict]: | |
| """Find matches from account transaction history.""" | |
| if not account_id: | |
| return [] | |
| try: | |
| from core.database import Transaction | |
| history = ( | |
| self.db.query(Transaction) | |
| .filter_by(account_id=account_id) | |
| .limit(100) | |
| .all() | |
| ) | |
| # Find unmasked merchants | |
| merchant_counts = defaultdict(int) | |
| for t in history: | |
| if hasattr(t, "merchant") and t.merchant and "*" not in t.merchant: | |
| merchant_counts[t.merchant] += 1 | |
| return [ | |
| {"merchant": k, "count": v} | |
| for k, v in sorted( | |
| merchant_counts.items(), key=lambda x: x[1], reverse=True | |
| )[:5] | |
| ] | |
| except Exception: | |
| return [] | |
| def _match_partial_pattern(self, masked: str) -> list[dict]: | |
| """Match partial patterns against known vendors.""" | |
| # Common vendor patterns | |
| known_patterns = { | |
| "AMZN*": "Amazon", | |
| "AMAZON*": "Amazon", | |
| "APPLE*": "Apple", | |
| "GOOGLE*": "Google", | |
| "PAYPAL*": "PayPal", | |
| "SQ *": "Square", | |
| "UBER*": "Uber", | |
| "LYFT*": "Lyft", | |
| "DOORDASH*": "DoorDash", | |
| } | |
| results = [] | |
| masked_upper = masked.upper() | |
| for pattern, vendor in known_patterns.items(): | |
| if masked_upper.startswith(pattern.replace("*", "")): | |
| results.append( | |
| { | |
| "candidate": vendor, | |
| "confidence": 0.85, | |
| "source": "Pattern Recognition", | |
| } | |
| ) | |
| return results | |
| class LIBRAlgorithm: | |
| """ | |
| Lowest Intermediate Balance Rule (LIBR) for tracking mixed funds. | |
| Used to detect illicit float in personal/business accounts. | |
| Ref: VISION_10_10 Section 6 | |
| """ | |
| def __init__(self, db_session): | |
| self.db = db_session | |
| def analyze_mixed_funds( | |
| self, | |
| account_id: str, | |
| start_date: datetime, | |
| end_date: datetime, | |
| suspected_illicit_deposits: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| """ | |
| Applies LIBR to distinguish between legitimate funds and illicit injections. | |
| The LIBR principle: The lowest balance reached between an illicit deposit | |
| and a subsequent withdrawal represents the maximum illicit funds in that withdrawal. | |
| Args: | |
| account_id: Account to analyze | |
| start_date: Analysis start date | |
| end_date: Analysis end date | |
| suspected_illicit_deposits: Optional list of suspected illicit deposit IDs | |
| Returns: | |
| LIBR analysis results | |
| """ | |
| logger.info(f"Applying LIBR Algorithm to account {account_id}") | |
| try: | |
| # Get chronological transactions | |
| transactions = self._get_transactions(account_id, start_date, end_date) | |
| if not transactions: | |
| return { | |
| "account_id": account_id, | |
| "status": "NO_DATA", | |
| "message": "No transactions found", | |
| } | |
| # Calculate running balance | |
| running_balance = 0.0 | |
| balance_history = [] | |
| for txn in transactions: | |
| running_balance += txn["amount"] | |
| balance_history.append( | |
| { | |
| "date": txn["date"], | |
| "amount": txn["amount"], | |
| "balance": running_balance, | |
| "is_suspected": txn["id"] in (suspected_illicit_deposits or []), | |
| } | |
| ) | |
| # Find minimum intermediate balances after suspected deposits | |
| libr_violations = [] | |
| illicit_float = 0.0 | |
| for i, entry in enumerate(balance_history): | |
| if entry["is_suspected"] and entry["amount"] > 0: | |
| # Find lowest balance after this deposit | |
| min_balance = entry["balance"] | |
| for j in range(i + 1, len(balance_history)): | |
| if balance_history[j]["balance"] < min_balance: | |
| min_balance = balance_history[j]["balance"] | |
| # LIBR violation if withdrawal occurred | |
| max_illicit = min( | |
| entry["amount"], max(0, entry["balance"] - min_balance) | |
| ) | |
| if max_illicit > 0: | |
| libr_violations.append( | |
| { | |
| "deposit_date": str(entry["date"]), | |
| "deposit_amount": entry["amount"], | |
| "min_subsequent_balance": min_balance, | |
| "max_illicit_withdrawn": max_illicit, | |
| } | |
| ) | |
| illicit_float += max_illicit | |
| # Calculate commingling ratio | |
| total_deposits = sum(t["amount"] for t in transactions if t["amount"] > 0) | |
| commingling_ratio = ( | |
| illicit_float / total_deposits if total_deposits > 0 else 0 | |
| ) | |
| # Determine risk status | |
| if commingling_ratio > 0.5: | |
| status = "HIGH_RISK" | |
| elif commingling_ratio > 0.2: | |
| status = "MEDIUM_RISK" | |
| elif libr_violations: | |
| status = "LOW_RISK" | |
| else: | |
| status = "CLEAN" | |
| return { | |
| "account_id": account_id, | |
| "period": f"{start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}", | |
| "commingling_ratio": round(commingling_ratio, 2), | |
| "illicit_float_detected": round(illicit_float, 2), | |
| "libr_violation_count": len(libr_violations), | |
| "status": status, | |
| "violations": libr_violations[:10], | |
| "findings": self._generate_findings(commingling_ratio, libr_violations), | |
| "analyzed_at": datetime.now().isoformat(), | |
| } | |
| except Exception as e: | |
| logger.error(f"LIBR analysis failed: {e}") | |
| return {"account_id": account_id, "status": "ERROR", "error": str(e)} | |
| def _get_transactions( | |
| self, account_id: str, start_date: datetime, end_date: datetime | |
| ) -> list[dict]: | |
| """Get transactions from database.""" | |
| try: | |
| from core.database import Transaction | |
| txns = ( | |
| self.db.query(Transaction) | |
| .filter( | |
| Transaction.account_id == account_id, | |
| Transaction.created_at.between(start_date, end_date), | |
| ) | |
| .order_by(Transaction.created_at) | |
| .all() | |
| ) | |
| return [ | |
| {"id": str(t.id), "amount": t.amount, "date": t.created_at} | |
| for t in txns | |
| ] | |
| except Exception: | |
| return [] | |
| def _generate_findings(self, ratio: float, violations: list[dict]) -> str: | |
| """Generate human-readable findings.""" | |
| if not violations: | |
| return "No LIBR violations detected. Funds appear segregated." | |
| if ratio > 0.5: | |
| return f"High commingling detected ({ratio * 100:.0f}%). Multiple illicit deposits followed by withdrawals before balance separation." | |
| elif ratio > 0.2: | |
| return f"Moderate commingling ({ratio * 100:.0f}%). Some potential structuring activity detected." | |
| else: | |
| return f"Low commingling ({ratio * 100:.0f}%). Minor LIBR violations detected but funds largely traceable." | |
class MensReaEngine:
    """
    Theory of Intent (Mens Rea) Engine.
    AI classifiers that map evidence to Knowledge, Intent, or Willful Blindness.

    Ref: VISION_10_10 Section 5
    """

    def __init__(self, ai_service=None):
        """
        Args:
            ai_service: Optional AI backend. Stored but not used by the
                current lexicon-based scoring.
        """
        self.ai_service = ai_service
        # Keyword lexicon grouped by legal-intent category.
        self.legal_lexicon = {
            "avoidance": [
                "bypass",
                "limit",
                "threshold",
                "split",
                "smurf",
                "avoid",
                "circumvent",
                "evade",
                "dodge",
                "structure",
            ],
            "obfuscation": [
                "hide",
                "mask",
                "proxy",
                "nominee",
                "shell",
                "offshore",
                "anonymous",
                "conceal",
                "disguise",
                "launder",
            ],
            "knowledge": [
                "aware",
                "understand",
                "policy",
                "regulation",
                "illegal",
                "know",
                "recognize",
                "acknowledge",
                "realize",
                "compliance",
            ],
            "planning": [
                "plan",
                "schedule",
                "arrange",
                "organize",
                "coordinate",
                "timing",
                "sequence",
                "prepare",
                "strategy",
            ],
        }
        # Relative category weights.
        # NOTE(review): these weights are not referenced by attribute_intent's
        # current scoring formula — confirm whether they should be applied.
        self.intent_weights = {
            "knowledge": 0.25,
            "avoidance": 0.35,
            "obfuscation": 0.30,
            "planning": 0.10,
        }

    async def attribute_intent(self, evidence_id: str, content: str) -> dict[str, Any]:
        """
        Analyzes text/metadata to classify legal intent with detailed justification.

        Args:
            evidence_id: Evidence ID being analyzed
            content: Text content to analyze

        Returns:
            Intent analysis with legal theory mapping
        """
        logger.info(f"Running Advanced Mens Rea analysis on evidence {evidence_id}")
        if not content:
            return {"evidence_id": evidence_id, "error": "No content provided"}

        content_lower = content.lower()

        # Find all markers.
        # NOTE(review): matching is substring-based, so e.g. "limit" also
        # matches "unlimited" — consider word-boundary matching.
        found_markers = []
        category_scores = {}
        for category, keywords in self.legal_lexicon.items():
            matches = [kw for kw in keywords if kw in content_lower]
            if matches:
                found_markers.append(
                    {"category": category, "keywords": matches, "count": len(matches)}
                )
                # Score based on match density; saturates at 3 keyword hits.
                category_scores[category] = min(len(matches) / 3, 1.0)

        # Calculate weighted intent scores
        intent_scores = {
            "KNOWLEDGE": 0.0,
            "INTENT": 0.0,
            "WILLFUL_BLINDNESS": 0.0,
            "NEGLIGENCE": 0.0,
        }

        # Knowledge requires explicit awareness markers
        knowledge_markers = category_scores.get("knowledge", 0)
        intent_scores["KNOWLEDGE"] = min(0.3 + (knowledge_markers * 0.5), 0.95)

        # Intent requires avoidance + planning
        avoidance = category_scores.get("avoidance", 0)
        planning = category_scores.get("planning", 0)
        intent_scores["INTENT"] = min(0.2 + (avoidance * 0.4) + (planning * 0.3), 0.95)

        # Willful blindness = knowledge without action
        if knowledge_markers > 0.3 and avoidance < 0.2:
            intent_scores["WILLFUL_BLINDNESS"] = min(knowledge_markers * 0.7, 0.8)
        else:
            intent_scores["WILLFUL_BLINDNESS"] = 0.15

        # Negligence is the residual: low by default, floored at 0.1.
        intent_scores["NEGLIGENCE"] = max(0.1, 0.5 - max(intent_scores.values()))

        # Determine primary intent
        primary_intent = max(intent_scores, key=intent_scores.get)
        confidence = intent_scores[primary_intent]

        # Generate legal theory
        legal_theory = self._generate_legal_theory(primary_intent, found_markers)

        return {
            "evidence_id": evidence_id,
            "primary_intent": primary_intent,
            "confidence": round(confidence, 2),
            "justification": {
                "summary": self._generate_summary(primary_intent, found_markers),
                "evidence_markers": found_markers,
                "legal_theory": legal_theory,
            },
            "mens_rea_matrix": {k: round(v, 2) for k, v in intent_scores.items()},
            "litigation_readiness": (
                "HIGH"
                if confidence > 0.7
                else "MODERATE" if confidence > 0.5 else "LOW"
            ),
            "admissibility_context": "Ref: Rule 403 (Probative value vs. Prejudice), Rule 404(b) (Prior acts)",
            "analyzed_at": datetime.now().isoformat(),
        }

    def _generate_summary(self, intent: str, markers: list[dict]) -> str:
        """Generate a one-sentence analysis summary for the classified intent."""
        if not markers:
            return "Insufficient evidence markers for conclusive intent determination."
        # (Removed a dead list comprehension whose result was discarded.)
        if intent == "KNOWLEDGE":
            return "Evidence shows explicit awareness of regulatory requirements and potential violations."
        elif intent == "INTENT":
            return "Pattern of avoidance keywords combined with planning language suggests deliberate circumvention."
        elif intent == "WILLFUL_BLINDNESS":
            return "Subject demonstrated awareness without taking corrective action - potential willful blindness."
        else:
            return "No clear intentional misconduct detected. May represent negligence or oversight."

    def _generate_legal_theory(self, intent: str, markers: list[dict]) -> str:
        """Map the classified intent to a citable legal theory."""
        if intent == "INTENT":
            return "The proximity of 'avoidance' keywords to 'planning' language suggests a calculated attempt to circumvent AML controls. See United States v. MacPherson (intent inferred from pattern of conduct)."
        elif intent == "KNOWLEDGE":
            return "Explicit references to regulations and compliance requirements demonstrate actual knowledge. See Model Penal Code § 2.02(2)(b)."
        elif intent == "WILLFUL_BLINDNESS":
            return "Subject's awareness combined with failure to inquire further may constitute willful blindness. See Global-Tech Appliances v. SEB S.A."
        else:
            return "Standard of reasonable care analysis applies. See negligence elements under common law."
class TemporalPairMatcher:
    """
    Detects Mirror/Wash transactions (Equal & Opposite movements).

    Ref: VISION_10_10 Section 6 (Mirror Detection)
    """

    def __init__(self, db_session):
        """
        Args:
            db_session: Database session used for transaction queries.
        """
        self.db = db_session

    async def find_mirror_pairs(
        self,
        account_id: str,
        threshold_seconds: int = 3600,
        amount_tolerance: float = 0.01,
    ) -> list[dict[str, Any]]:
        """
        Identifies pairs of transactions that appear to 'cancel' each other out
        to artificially inflate volume or hide fund source.

        Args:
            account_id: Account to analyze
            threshold_seconds: Time window for pair matching
            amount_tolerance: Percentage tolerance for amount matching

        Returns:
            List of detected mirror pairs
        """
        logger.info(f"Running Mirror Detection for account {account_id}")
        try:
            history = await self._get_transactions(account_id)
            if len(history) < 2:
                return []

            results: list[dict[str, Any]] = []
            consumed: set[str] = set()

            # Greedy chronological scan: each transaction pairs with at most
            # one partner (the earliest subsequent mirror).
            for idx, first in enumerate(history):
                if first["id"] in consumed:
                    continue
                for second in history[idx + 1 :]:
                    if second["id"] in consumed:
                        continue
                    mirrored, pair_score = self._check_mirror(
                        first, second, threshold_seconds, amount_tolerance
                    )
                    if not mirrored:
                        continue
                    results.append(self._pair_record(first, second, pair_score))
                    consumed.add(first["id"])
                    consumed.add(second["id"])
                    break  # first is paired; advance the outer scan

            # Highest-scoring pairs first.
            return sorted(results, key=lambda r: r["score"], reverse=True)
        except Exception as e:
            logger.error(f"Mirror detection failed: {e}")
            return []

    def _pair_record(self, tx1: dict, tx2: dict, score: float) -> dict[str, Any]:
        """Build the reportable record for one matched mirror pair."""
        digest = hashlib.md5((tx1["id"] + tx2["id"]).encode()).hexdigest()[:8]
        return {
            "pair_id": f"pair_{digest}",
            "tx1": self._tx_summary(tx1),
            "tx2": self._tx_summary(tx2),
            "score": round(score, 2),
            "time_gap_seconds": self._time_diff_seconds(tx1["date"], tx2["date"]),
            "label": "Potential Wash Trade" if score > 0.9 else "Suspicious Pair",
        }

    def _tx_summary(self, tx: dict) -> dict[str, Any]:
        """Condense one transaction for the pair report."""
        when = tx["date"]
        return {
            "id": tx["id"],
            "amount": tx["amount"],
            "type": "DEBIT" if tx["amount"] < 0 else "CREDIT",
            "time": when.strftime("%H:%M:%S") if hasattr(when, "strftime") else str(when),
        }

    async def _get_transactions(self, account_id: str) -> list[dict]:
        """Load up to 500 chronological transactions for the account."""
        try:
            from core.database import Transaction

            query = (
                self.db.query(Transaction)
                .filter_by(account_id=account_id)
                .order_by(Transaction.created_at)
                .limit(500)
            )
            return [
                {"id": str(row.id), "amount": row.amount, "date": row.created_at}
                for row in query.all()
            ]
        except Exception:
            # Best-effort: any lookup failure is treated as "no data".
            return []

    def _check_mirror(
        self, tx1: dict, tx2: dict, threshold_seconds: int, tolerance: float
    ) -> tuple:
        """Decide whether tx1/tx2 form a mirror pair; returns (is_mirror, score)."""
        no_match = (False, 0.0)

        magnitude_a = abs(tx1["amount"])
        magnitude_b = abs(tx2["amount"])
        if magnitude_a == 0 or magnitude_b == 0:
            return no_match

        # Relative amount gap must fall within the tolerance.
        relative_gap = abs(magnitude_a - magnitude_b) / max(magnitude_a, magnitude_b)
        if relative_gap > tolerance:
            return no_match

        # Mirror pairs must flow in opposite directions.
        if (tx1["amount"] > 0) == (tx2["amount"] > 0):
            return no_match

        # And occur within the time window.
        gap_seconds = self._time_diff_seconds(tx1["date"], tx2["date"])
        if gap_seconds > threshold_seconds:
            return no_match

        # Closer amounts (60%) and tighter timing (40%) raise the score.
        amount_score = 1 - relative_gap
        time_score = 1 - (gap_seconds / threshold_seconds)
        score = (amount_score * 0.6) + (time_score * 0.4)
        return score > 0.7, score

    def _time_diff_seconds(self, date1, date2) -> int:
        """Absolute gap between two datetimes in whole seconds (0 if not datetimes)."""
        if isinstance(date1, datetime) and isinstance(date2, datetime):
            try:
                return abs(int((date2 - date1).total_seconds()))
            except Exception:
                return 0
        return 0
# Global accessibility for services
def get_forensic_intelligence(db):
    """
    Factory function for forensic intelligence services.

    Args:
        db: Database session handed to each session-backed engine.

    Returns:
        Dict mapping a service key to a ready-to-use engine instance.
    """
    engines: dict[str, Any] = {
        "triangulation": TriangulationEngine(db),
        "libr": LIBRAlgorithm(db),
        "mens_rea": MensReaEngine(),
        "mirror_matcher": TemporalPairMatcher(db),
    }
    return engines