""" Forensic Intelligence Service - Real Implementation Provides advanced forensic analysis capabilities for fraud investigation. """ import hashlib import logging from collections import defaultdict from datetime import datetime from typing import Any logger = logging.getLogger(__name__) class TriangulationEngine: """ Probabilistic unmasking of redacted transaction names using multi-source triangulation. Ref: VISION_10_10 Section 6 """ def __init__(self, db_session): self.db = db_session self._vendor_cache: dict[str, dict] = {} async def unmask_redaction( self, transaction_id: str, masked_name: str ) -> dict[str, Any]: """ Attempts to resolve '*' or partial names by looking at: 1. Similar amounts in other transactions. 2. Merchant frequency for the specific account. 3. External metadata linkages. Args: transaction_id: Transaction ID to unmask masked_name: The masked/redacted merchant name Returns: Unmasking results with confidence scores """ logger.info( f"Triangulating redacted merchant for txn {transaction_id}: {masked_name}" ) try: # Get transaction details transaction = await self._get_transaction(transaction_id) if not transaction: return { "transaction_id": transaction_id, "original_masked": masked_name, "resolved_name": None, "confidence_score": 0.0, "error": "Transaction not found", } suggestions = [] # Strategy 1: Amount-Pattern Match amount_matches = await self._find_by_amount(transaction.get("amount", 0)) for match in amount_matches: if match.get("merchant") and "*" not in match["merchant"]: suggestions.append( { "candidate": match["merchant"], "confidence": min( 0.7 + (0.1 * match.get("frequency", 1)), 0.95 ), "source": "Amount-Pattern Match", } ) # Strategy 2: Account History Sync history_matches = await self._find_by_account_history( transaction.get("account_id"), masked_name ) for match in history_matches: suggestions.append( { "candidate": match["merchant"], "confidence": min(0.8 + (0.05 * match.get("count", 1)), 0.95), "source": "Account-History Sync", } ) # Strategy 3: Pattern Recognition if masked_name and "*" in masked_name: pattern_matches = self._match_partial_pattern(masked_name) suggestions.extend(pattern_matches) # Deduplicate and sort by confidence seen = set() unique_suggestions = [] for s in sorted(suggestions, key=lambda x: x["confidence"], reverse=True): if s["candidate"] not in seen: seen.add(s["candidate"]) unique_suggestions.append(s) best_match = unique_suggestions[0] if unique_suggestions else None return { "transaction_id": transaction_id, "original_masked": masked_name, "resolved_name": best_match["candidate"] if best_match else None, "confidence_score": best_match["confidence"] if best_match else 0.0, "all_candidates": unique_suggestions[:5], "triangulation_logic": list({s["source"] for s in unique_suggestions}), "timestamp": datetime.now().isoformat(), } except Exception as e: logger.error(f"Triangulation failed for {transaction_id}: {e}") return { "transaction_id": transaction_id, "original_masked": masked_name, "resolved_name": None, "confidence_score": 0.0, "error": str(e), } async def _get_transaction(self, transaction_id: str) -> dict | None: """Get transaction from database.""" try: from core.database import Transaction txn = self.db.query(Transaction).filter_by(id=transaction_id).first() if txn: return { "id": str(txn.id), "amount": txn.amount, "account_id": getattr(txn, "account_id", None), "merchant": getattr(txn, "merchant", None), } except Exception: pass return None async def _find_by_amount(self, amount: float) -> list[dict]: """Find transactions with similar amounts.""" try: from core.database import Transaction tolerance = amount * 0.05 # 5% tolerance matches = ( self.db.query(Transaction) .filter( Transaction.amount.between(amount - tolerance, amount + tolerance) ) .limit(50) .all() ) # Group by merchant merchant_counts = defaultdict(int) for m in matches: if hasattr(m, "merchant") and m.merchant: merchant_counts[m.merchant] += 1 return [ {"merchant": k, "frequency": v} for k, v in sorted( merchant_counts.items(), key=lambda x: x[1], reverse=True ) ] except Exception: return [] async def _find_by_account_history( self, account_id: str, masked: str ) -> list[dict]: """Find matches from account transaction history.""" if not account_id: return [] try: from core.database import Transaction history = ( self.db.query(Transaction) .filter_by(account_id=account_id) .limit(100) .all() ) # Find unmasked merchants merchant_counts = defaultdict(int) for t in history: if hasattr(t, "merchant") and t.merchant and "*" not in t.merchant: merchant_counts[t.merchant] += 1 return [ {"merchant": k, "count": v} for k, v in sorted( merchant_counts.items(), key=lambda x: x[1], reverse=True )[:5] ] except Exception: return [] def _match_partial_pattern(self, masked: str) -> list[dict]: """Match partial patterns against known vendors.""" # Common vendor patterns known_patterns = { "AMZN*": "Amazon", "AMAZON*": "Amazon", "APPLE*": "Apple", "GOOGLE*": "Google", "PAYPAL*": "PayPal", "SQ *": "Square", "UBER*": "Uber", "LYFT*": "Lyft", "DOORDASH*": "DoorDash", } results = [] masked_upper = masked.upper() for pattern, vendor in known_patterns.items(): if masked_upper.startswith(pattern.replace("*", "")): results.append( { "candidate": vendor, "confidence": 0.85, "source": "Pattern Recognition", } ) return results class LIBRAlgorithm: """ Lowest Intermediate Balance Rule (LIBR) for tracking mixed funds. Used to detect illicit float in personal/business accounts. Ref: VISION_10_10 Section 6 """ def __init__(self, db_session): self.db = db_session def analyze_mixed_funds( self, account_id: str, start_date: datetime, end_date: datetime, suspected_illicit_deposits: list[str] | None = None, ) -> dict[str, Any]: """ Applies LIBR to distinguish between legitimate funds and illicit injections. The LIBR principle: The lowest balance reached between an illicit deposit and a subsequent withdrawal represents the maximum illicit funds in that withdrawal. Args: account_id: Account to analyze start_date: Analysis start date end_date: Analysis end date suspected_illicit_deposits: Optional list of suspected illicit deposit IDs Returns: LIBR analysis results """ logger.info(f"Applying LIBR Algorithm to account {account_id}") try: # Get chronological transactions transactions = self._get_transactions(account_id, start_date, end_date) if not transactions: return { "account_id": account_id, "status": "NO_DATA", "message": "No transactions found", } # Calculate running balance running_balance = 0.0 balance_history = [] for txn in transactions: running_balance += txn["amount"] balance_history.append( { "date": txn["date"], "amount": txn["amount"], "balance": running_balance, "is_suspected": txn["id"] in (suspected_illicit_deposits or []), } ) # Find minimum intermediate balances after suspected deposits libr_violations = [] illicit_float = 0.0 for i, entry in enumerate(balance_history): if entry["is_suspected"] and entry["amount"] > 0: # Find lowest balance after this deposit min_balance = entry["balance"] for j in range(i + 1, len(balance_history)): if balance_history[j]["balance"] < min_balance: min_balance = balance_history[j]["balance"] # LIBR violation if withdrawal occurred max_illicit = min( entry["amount"], max(0, entry["balance"] - min_balance) ) if max_illicit > 0: libr_violations.append( { "deposit_date": str(entry["date"]), "deposit_amount": entry["amount"], "min_subsequent_balance": min_balance, "max_illicit_withdrawn": max_illicit, } ) illicit_float += max_illicit # Calculate commingling ratio total_deposits = sum(t["amount"] for t in transactions if t["amount"] > 0) commingling_ratio = ( illicit_float / total_deposits if total_deposits > 0 else 0 ) # Determine risk status if commingling_ratio > 0.5: status = "HIGH_RISK" elif commingling_ratio > 0.2: status = "MEDIUM_RISK" elif libr_violations: status = "LOW_RISK" else: status = "CLEAN" return { "account_id": account_id, "period": f"{start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}", "commingling_ratio": round(commingling_ratio, 2), "illicit_float_detected": round(illicit_float, 2), "libr_violation_count": len(libr_violations), "status": status, "violations": libr_violations[:10], "findings": self._generate_findings(commingling_ratio, libr_violations), "analyzed_at": datetime.now().isoformat(), } except Exception as e: logger.error(f"LIBR analysis failed: {e}") return {"account_id": account_id, "status": "ERROR", "error": str(e)} def _get_transactions( self, account_id: str, start_date: datetime, end_date: datetime ) -> list[dict]: """Get transactions from database.""" try: from core.database import Transaction txns = ( self.db.query(Transaction) .filter( Transaction.account_id == account_id, Transaction.created_at.between(start_date, end_date), ) .order_by(Transaction.created_at) .all() ) return [ {"id": str(t.id), "amount": t.amount, "date": t.created_at} for t in txns ] except Exception: return [] def _generate_findings(self, ratio: float, violations: list[dict]) -> str: """Generate human-readable findings.""" if not violations: return "No LIBR violations detected. Funds appear segregated." if ratio > 0.5: return f"High commingling detected ({ratio * 100:.0f}%). Multiple illicit deposits followed by withdrawals before balance separation." elif ratio > 0.2: return f"Moderate commingling ({ratio * 100:.0f}%). Some potential structuring activity detected." else: return f"Low commingling ({ratio * 100:.0f}%). Minor LIBR violations detected but funds largely traceable." class MensReaEngine: """ Theory of Intent (Mens Rea) Engine. AI classifiers that map evidence to Knowledge, Intent, or Willful Blindness. Ref: VISION_10_10 Section 5 """ def __init__(self, ai_service=None): self.ai_service = ai_service self.legal_lexicon = { "avoidance": [ "bypass", "limit", "threshold", "split", "smurf", "avoid", "circumvent", "evade", "dodge", "structure", ], "obfuscation": [ "hide", "mask", "proxy", "nominee", "shell", "offshore", "anonymous", "conceal", "disguise", "launder", ], "knowledge": [ "aware", "understand", "policy", "regulation", "illegal", "know", "recognize", "acknowledge", "realize", "compliance", ], "planning": [ "plan", "schedule", "arrange", "organize", "coordinate", "timing", "sequence", "prepare", "strategy", ], } self.intent_weights = { "knowledge": 0.25, "avoidance": 0.35, "obfuscation": 0.30, "planning": 0.10, } async def attribute_intent(self, evidence_id: str, content: str) -> dict[str, Any]: """ Analyzes text/metadata to classify legal intent with detailed justification. Args: evidence_id: Evidence ID being analyzed content: Text content to analyze Returns: Intent analysis with legal theory mapping """ logger.info(f"Running Advanced Mens Rea analysis on evidence {evidence_id}") if not content: return {"evidence_id": evidence_id, "error": "No content provided"} content_lower = content.lower() # Find all markers found_markers = [] category_scores = {} for category, keywords in self.legal_lexicon.items(): matches = [kw for kw in keywords if kw in content_lower] if matches: found_markers.append( {"category": category, "keywords": matches, "count": len(matches)} ) # Score based on match density category_scores[category] = min(len(matches) / 3, 1.0) # Calculate weighted intent scores intent_scores = { "KNOWLEDGE": 0.0, "INTENT": 0.0, "WILLFUL_BLINDNESS": 0.0, "NEGLIGENCE": 0.0, } # Knowledge requires explicit awareness markers knowledge_markers = category_scores.get("knowledge", 0) intent_scores["KNOWLEDGE"] = min(0.3 + (knowledge_markers * 0.5), 0.95) # Intent requires avoidance + planning avoidance = category_scores.get("avoidance", 0) planning = category_scores.get("planning", 0) intent_scores["INTENT"] = min(0.2 + (avoidance * 0.4) + (planning * 0.3), 0.95) # Willful blindness = knowledge without action if knowledge_markers > 0.3 and avoidance < 0.2: intent_scores["WILLFUL_BLINDNESS"] = min(knowledge_markers * 0.7, 0.8) else: intent_scores["WILLFUL_BLINDNESS"] = 0.15 # Negligence is default low intent_scores["NEGLIGENCE"] = max(0.1, 0.5 - max(intent_scores.values())) # Determine primary intent primary_intent = max(intent_scores, key=intent_scores.get) confidence = intent_scores[primary_intent] # Generate legal theory legal_theory = self._generate_legal_theory(primary_intent, found_markers) return { "evidence_id": evidence_id, "primary_intent": primary_intent, "confidence": round(confidence, 2), "justification": { "summary": self._generate_summary(primary_intent, found_markers), "evidence_markers": found_markers, "legal_theory": legal_theory, }, "mens_rea_matrix": {k: round(v, 2) for k, v in intent_scores.items()}, "litigation_readiness": ( "HIGH" if confidence > 0.7 else "MODERATE" if confidence > 0.5 else "LOW" ), "admissibility_context": "Ref: Rule 403 (Probative value vs. Prejudice), Rule 404(b) (Prior acts)", "analyzed_at": datetime.now().isoformat(), } def _generate_summary(self, intent: str, markers: list[dict]) -> str: """Generate analysis summary.""" if not markers: return "Insufficient evidence markers for conclusive intent determination." [m["category"] for m in markers] if intent == "KNOWLEDGE": return "Evidence shows explicit awareness of regulatory requirements and potential violations." elif intent == "INTENT": return "Pattern of avoidance keywords combined with planning language suggests deliberate circumvention." elif intent == "WILLFUL_BLINDNESS": return "Subject demonstrated awareness without taking corrective action - potential willful blindness." else: return "No clear intentional misconduct detected. May represent negligence or oversight." def _generate_legal_theory(self, intent: str, markers: list[dict]) -> str: """Generate legal theory mapping.""" if intent == "INTENT": return "The proximity of 'avoidance' keywords to 'planning' language suggests a calculated attempt to circumvent AML controls. See United States v. MacPherson (intent inferred from pattern of conduct)." elif intent == "KNOWLEDGE": return "Explicit references to regulations and compliance requirements demonstrate actual knowledge. See Model Penal Code ยง 2.02(2)(b)." elif intent == "WILLFUL_BLINDNESS": return "Subject's awareness combined with failure to inquire further may constitute willful blindness. See Global-Tech Appliances v. SEB S.A." else: return "Standard of reasonable care analysis applies. See negligence elements under common law." class TemporalPairMatcher: """ Detects Mirror/Wash transactions (Equal & Opposite movements). Ref: VISION_10_10 Section 6 (Mirror Detection) """ def __init__(self, db_session): self.db = db_session async def find_mirror_pairs( self, account_id: str, threshold_seconds: int = 3600, amount_tolerance: float = 0.01, ) -> list[dict[str, Any]]: """ Identifies pairs of transactions that appear to 'cancel' each other out to artificially inflate volume or hide fund source. Args: account_id: Account to analyze threshold_seconds: Time window for pair matching amount_tolerance: Percentage tolerance for amount matching Returns: List of detected mirror pairs """ logger.info(f"Running Mirror Detection for account {account_id}") try: transactions = await self._get_transactions(account_id) if len(transactions) < 2: return [] pairs = [] matched = set() for i, tx1 in enumerate(transactions): if tx1["id"] in matched: continue for tx2 in transactions[i + 1 :]: if tx2["id"] in matched: continue # Check if mirror pair is_mirror, score = self._check_mirror( tx1, tx2, threshold_seconds, amount_tolerance ) if is_mirror: pairs.append( { "pair_id": f"pair_{hashlib.md5((tx1['id'] + tx2['id']).encode()).hexdigest()[:8]}", "tx1": { "id": tx1["id"], "amount": tx1["amount"], "type": "DEBIT" if tx1["amount"] < 0 else "CREDIT", "time": ( tx1["date"].strftime("%H:%M:%S") if hasattr(tx1["date"], "strftime") else str(tx1["date"]) ), }, "tx2": { "id": tx2["id"], "amount": tx2["amount"], "type": "DEBIT" if tx2["amount"] < 0 else "CREDIT", "time": ( tx2["date"].strftime("%H:%M:%S") if hasattr(tx2["date"], "strftime") else str(tx2["date"]) ), }, "score": round(score, 2), "time_gap_seconds": self._time_diff_seconds( tx1["date"], tx2["date"] ), "label": ( "Potential Wash Trade" if score > 0.9 else "Suspicious Pair" ), } ) matched.add(tx1["id"]) matched.add(tx2["id"]) break # Move to next tx1 # Sort by score pairs.sort(key=lambda x: x["score"], reverse=True) return pairs except Exception as e: logger.error(f"Mirror detection failed: {e}") return [] async def _get_transactions(self, account_id: str) -> list[dict]: """Get transactions from database.""" try: from core.database import Transaction txns = ( self.db.query(Transaction) .filter_by(account_id=account_id) .order_by(Transaction.created_at) .limit(500) .all() ) return [ {"id": str(t.id), "amount": t.amount, "date": t.created_at} for t in txns ] except Exception: return [] def _check_mirror( self, tx1: dict, tx2: dict, threshold_seconds: int, tolerance: float ) -> tuple: """Check if two transactions form a mirror pair.""" # Amounts must be opposite (or very close) amt1, amt2 = abs(tx1["amount"]), abs(tx2["amount"]) if amt1 == 0 or amt2 == 0: return False, 0.0 amount_diff = abs(amt1 - amt2) / max(amt1, amt2) if amount_diff > tolerance: return False, 0.0 # Must be opposite directions if (tx1["amount"] > 0) == (tx2["amount"] > 0): return False, 0.0 # Time check time_diff = self._time_diff_seconds(tx1["date"], tx2["date"]) if time_diff > threshold_seconds: return False, 0.0 # Calculate score amount_score = 1 - amount_diff time_score = 1 - (time_diff / threshold_seconds) score = (amount_score * 0.6) + (time_score * 0.4) return score > 0.7, score def _time_diff_seconds(self, date1, date2) -> int: """Calculate time difference in seconds.""" try: if isinstance(date1, datetime) and isinstance(date2, datetime): return abs(int((date2 - date1).total_seconds())) except Exception: pass return 0 # Global accessibility for services def get_forensic_intelligence(db): """Factory function for forensic intelligence services.""" return { "triangulation": TriangulationEngine(db), "libr": LIBRAlgorithm(db), "mens_rea": MensReaEngine(), "mirror_matcher": TemporalPairMatcher(db), }