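# NOTE: the next three lines appear to be page-scrape residue rather than code;
# they are preserved verbatim as comments so the module remains valid Python.
# teoat's picture
# Upload folder using huggingface_hub
# 4ae946d verified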
"""
AML Velocity Service - Real Implementation
Provides Anti-Money Laundering velocity analysis and Ultimate Beneficial Owner tracing.
"""
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any
from sqlalchemy.orm import Session
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# AML thresholds
# ---------------------------------------------------------------------------
# Default reporting threshold, in USD, used by structuring detection.
STRUCTURING_THRESHOLD = 10_000
# Default look-back window, in days, used by structuring detection.
STRUCTURING_WINDOW_DAYS = 14
# Minimum number of near-threshold deposits before a structuring score is computed.
MIN_STRUCTURING_COUNT = 3
# A day is a velocity anomaly when its volume exceeds this multiple of the daily average.
VELOCITY_ALERT_MULTIPLIER = 3
class AMLVelocityService:
    """
    AML Velocity Suite for detecting illicit value flow.

    Ref: VISION_10_10 Section 4

    Public coroutines:
        detect_structuring: scores deposits that sit just below a reporting
            threshold ("smurfing").
        link_ubo: summarises an entity's ownership records and picks out the
            natural persons among them as Ultimate Beneficial Owners.
        analyze_velocity: flags days whose transaction volume is far above
            the account's daily average.

    All public coroutines return plain dictionaries.
    """

    # Jurisdictions that force a HIGH jurisdiction-risk rating in link_ubo.
    _HIGH_RISK_JURISDICTIONS = frozenset(
        {
            "Cayman Islands",
            "British Virgin Islands",
            "Panama",
            "Seychelles",
            "Bahamas",
        }
    )

    # A deposit is "near threshold" when it is at least this fraction of the
    # threshold (i.e. within 15% below it) and still strictly below it.
    _NEAR_THRESHOLD_RATIO = 0.85

    # Escape character used when building LIKE patterns from caller input.
    _LIKE_ESCAPE = "/"

    def __init__(self, db_session: Session):
        """
        Args:
            db_session: SQLAlchemy session used for all database queries.
        """
        self.db = db_session
        # Reserved cache slot; nothing in this class reads or writes it yet.
        self._cache: dict[str, dict] = {}

    async def detect_structuring(
        self,
        account_id: str,
        threshold: float = STRUCTURING_THRESHOLD,
        window_days: int = STRUCTURING_WINDOW_DAYS,
    ) -> dict[str, Any]:
        """
        Detects 'Smurfing' patterns (frequent deposits just below reporting thresholds).

        A deposit is suspicious when ``0.85 * threshold <= amount < threshold``.
        A score is only computed once at least ``MIN_STRUCTURING_COUNT`` such
        deposits exist; it blends count (40%), proximity to the threshold
        (40%) and amount consistency (20%).

        Args:
            account_id: Account ID to analyze
            threshold: Reporting threshold amount
            window_days: Days to analyze

        Returns:
            Structuring detection results. On any internal failure the dict
            carries ``structuring_detected=False`` and an ``error`` string
            instead of raising.
        """
        logger.info("Checking for structuring patterns on account %s", account_id)
        try:
            transactions = await self._get_account_transactions(account_id, window_days)
            if not transactions:
                return {
                    "account_id": account_id,
                    "structuring_detected": False,
                    "pattern_type": None,
                    "smurfing_score": 0.0,
                    "message": "Insufficient transaction data",
                }

            # Positive amounts (deposits) that fall just below the threshold.
            lower_bound = threshold * self._NEAR_THRESHOLD_RATIO
            suspicious = [
                t
                for t in transactions
                if t.get("amount", 0) > 0 and lower_bound <= t["amount"] < threshold
            ]

            if len(suspicious) < MIN_STRUCTURING_COUNT:
                return {
                    "account_id": account_id,
                    "structuring_detected": False,
                    "pattern_type": None,
                    "smurfing_score": 0.0,
                    "suspicious_count": len(suspicious),
                    "message": f"Only {len(suspicious)} suspicious transactions found (minimum {MIN_STRUCTURING_COUNT} required)",
                }

            total_suspicious = sum(t["amount"] for t in suspicious)
            avg_amount = total_suspicious / len(suspicious)
            # Relative gap between the average suspicious deposit and the
            # threshold; threshold is > 0 here because `suspicious` is non-empty.
            avg_distance = (threshold - avg_amount) / threshold

            # Weighted score: count saturates at 10 matching deposits.
            count_score = min(len(suspicious) / 10, 1.0) * 0.4
            proximity_score = (1 - avg_distance) * 0.4
            consistency_score = self._calculate_consistency(suspicious) * 0.2
            smurfing_score = round(
                count_score + proximity_score + consistency_score, 2
            )

            pattern_type = self._determine_pattern_type(suspicious, threshold)

            # Time span covered by the suspicious deposits.
            dates = [t["date"] for t in suspicious]
            window = (
                f"{min(dates).strftime('%Y-%m-%d')} to {max(dates).strftime('%Y-%m-%d')}"
            )

            if smurfing_score > 0.8:
                risk_level = "HIGH"
            elif smurfing_score > 0.6:
                risk_level = "MEDIUM"
            else:
                risk_level = "LOW"

            return {
                "account_id": account_id,
                "structuring_detected": smurfing_score > 0.6,
                "pattern_type": pattern_type,
                "smurfing_score": smurfing_score,
                "suspicious_window": window,
                "suspicious_count": len(suspicious),
                "threshold": threshold,
                "average_amount": round(avg_amount, 2),
                "total_amount": round(total_suspicious, 2),
                "avg_distance_from_threshold": f"{avg_distance * 100:.1f}%",
                # Only the first ten matching ids are reported.
                "matching_transactions": [t["id"] for t in suspicious[:10]],
                "risk_level": risk_level,
                "analyzed_at": datetime.now().isoformat(),
            }
        except Exception as e:
            # Top-level boundary: report the failure in the result rather than
            # raising, but keep the traceback in the log.
            logger.exception("Structuring detection failed for %s: %s", account_id, e)
            return {
                "account_id": account_id,
                "structuring_detected": False,
                "error": str(e),
            }

    async def link_ubo(self, entity_name: str, max_layers: int = 5) -> dict[str, Any]:
        """
        Exposes opaque ownership to find Ultimate Beneficial Owners (UBO).

        Ref: VISION_10_10 Section 4 (Layering)

        Args:
            entity_name: Name of the entity to trace
            max_layers: Maximum ownership layers to traverse

        Returns:
            UBO identification results. ``graph_path`` lists the entity, then
            every non-person node in the traced chain, then the primary UBO
            (the first natural person found), if any. On any internal failure
            the dict carries ``ubo_identified=None`` and an ``error`` string.
        """
        logger.info("Tracing UBO for entity: %s", entity_name)
        try:
            ownership_chain = await self._trace_ownership(entity_name, max_layers)
            if not ownership_chain:
                return {
                    "entity": entity_name,
                    "ubo_identified": None,
                    "layer_count": 0,
                    "status": "NO_OWNERSHIP_DATA",
                    "message": "Unable to trace ownership structure",
                }

            # Natural persons in the chain are treated as beneficial owners.
            ubos = [
                {
                    "name": node.get("name"),
                    "ownership_percentage": node.get("ownership_pct", 0),
                    "jurisdiction": node.get("jurisdiction"),
                    "verification_status": node.get("verified", False),
                }
                for node in ownership_chain
                if node.get("is_natural_person", False)
            ]
            primary_ubo = ubos[0] if ubos else None

            layer_count = len(ownership_chain)
            # Order-preserving de-duplication keeps the output deterministic
            # (a plain set would reorder jurisdictions between runs).
            jurisdictions = list(
                dict.fromkeys(
                    c.get("jurisdiction", "Unknown") for c in ownership_chain
                )
            )
            risk_jurisdictions = [
                j for j in jurisdictions if j in self._HIGH_RISK_JURISDICTIONS
            ]

            # Path: entity -> intermediate (non-person) nodes -> primary UBO.
            # Every intermediate node is kept even when no UBO was found, and
            # the UBO is appended exactly once regardless of its position.
            path_elements = [str(entity_name)]
            path_elements.extend(
                str(node.get("name") or "Unknown")
                for node in ownership_chain
                if not node.get("is_natural_person", False)
            )
            if primary_ubo is not None:
                path_elements.append(str(primary_ubo.get("name") or "UBO"))
            graph_path = " -> ".join(path_elements)

            if risk_jurisdictions:
                jurisdiction_risk = f"HIGH ({', '.join(risk_jurisdictions)})"
            elif layer_count > 3:
                jurisdiction_risk = "MEDIUM (Complex structure)"
            else:
                jurisdiction_risk = "LOW"

            is_verified = bool(primary_ubo and primary_ubo.get("verification_status"))

            return {
                "entity": entity_name,
                "ubo_identified": (
                    primary_ubo["name"] if primary_ubo else "UNABLE TO DETERMINE"
                ),
                "all_ubos": ubos,
                "layer_count": layer_count,
                "graph_path": graph_path,
                "jurisdictions": jurisdictions,
                "jurisdiction_risk": jurisdiction_risk,
                "verification_status": "VERIFIED" if is_verified else "PROBABILISTIC",
                # Saturates at five layers.
                "complexity_score": min(layer_count / 5, 1.0),
                "analyzed_at": datetime.now().isoformat(),
            }
        except Exception as e:
            # Top-level boundary: report the failure in the result rather than
            # raising, but keep the traceback in the log.
            logger.exception("UBO tracing failed for %s: %s", entity_name, e)
            return {"entity": entity_name, "ubo_identified": None, "error": str(e)}

    async def analyze_velocity(
        self, account_id: str, window_days: int = 30
    ) -> dict[str, Any]:
        """
        Analyze transaction velocity for anomalies.

        Transactions are grouped per calendar day; a day is anomalous when its
        absolute volume exceeds ``VELOCITY_ALERT_MULTIPLIER`` times the
        average daily volume over the days that had any activity.

        Args:
            account_id: Account to analyze
            window_days: Analysis window

        Returns:
            Velocity analysis results
        """
        transactions = await self._get_account_transactions(account_id, window_days)
        if not transactions:
            return {
                "account_id": account_id,
                "velocity_anomaly": False,
                "message": "No transactions found",
            }

        # Per-day totals of absolute amount and transaction count.
        daily_volumes: dict[Any, float] = defaultdict(float)
        daily_counts: dict[Any, int] = defaultdict(int)
        for txn in transactions:
            stamp = txn["date"]
            # datetime values are reduced to their date; anything else is
            # used as the grouping key unchanged.
            day = stamp.date() if hasattr(stamp, "date") else stamp
            daily_volumes[day] += abs(txn["amount"])
            daily_counts[day] += 1

        # At least one day exists because `transactions` is non-empty.
        active_days = max(len(daily_volumes), 1)
        avg_volume = sum(daily_volumes.values()) / active_days
        avg_count = sum(daily_counts.values()) / active_days

        # avg_volume cannot be zero for a flagged day: volumes are
        # non-negative, so a zero average means every volume is zero and
        # none passes the strict comparison below.
        alert_level = avg_volume * VELOCITY_ALERT_MULTIPLIER
        anomalies = [
            {
                "date": str(day),
                "volume": round(volume, 2),
                "count": daily_counts[day],
                "multiplier": round(volume / avg_volume, 2),
            }
            for day, volume in daily_volumes.items()
            if volume > alert_level
        ]

        if len(anomalies) > 3:
            risk_level = "HIGH"
        elif anomalies:
            risk_level = "MEDIUM"
        else:
            risk_level = "LOW"

        return {
            "account_id": account_id,
            "velocity_anomaly": len(anomalies) > 0,
            "average_daily_volume": round(avg_volume, 2),
            "average_daily_count": round(avg_count, 1),
            "anomalous_days": anomalies,
            "risk_level": risk_level,
            "analyzed_at": datetime.now().isoformat(),
        }

    async def _get_account_transactions(self, account_id: str, days: int) -> list[dict]:
        """
        Get account transactions from database.

        Args:
            account_id: Account whose transactions are fetched.
            days: Only rows created within the last ``days`` days are returned.

        Returns:
            One dict per row with keys ``id``, ``amount``, ``date`` and
            ``type``. Best-effort: any failure (including a failed import of
            the model) is logged and yields an empty list.
        """
        # NOTE(review): the query below is synchronous even though this is a
        # coroutine, so it blocks the event loop while it runs.
        try:
            from core.database import Transaction

            cutoff = datetime.now() - timedelta(days=days)
            rows = (
                self.db.query(Transaction)
                .filter(
                    Transaction.account_id == account_id,
                    Transaction.created_at >= cutoff,
                )
                .all()
            )
            return [
                {
                    "id": str(row.id),
                    "amount": row.amount,
                    "date": row.created_at,
                    "type": getattr(row, "type", "unknown"),
                }
                for row in rows
            ]
        except Exception as e:
            logger.warning("Failed to fetch transactions: %s", e)
            return []

    async def _trace_ownership(self, entity_name: str, max_layers: int) -> list[dict]:
        """
        Look up ownership records for an entity.

        Queries the local ``Entity`` table for names containing
        ``entity_name`` (case-insensitive) and, when nothing is found, falls
        back to ``_query_corporate_registry``.

        Args:
            entity_name: Name (or fragment) to search for. LIKE wildcards in
                the name are matched literally.
            max_layers: Maximum number of records returned.

        Returns:
            Up to ``max_layers`` ownership records; an empty list when the
            name is blank, ``max_layers`` is not positive, or the lookup fails.
        """
        from core.database import Entity

        # A blank name would turn the pattern into "%%" and match every
        # entity; a non-positive layer budget can never return anything.
        if not entity_name or not entity_name.strip() or max_layers <= 0:
            return []

        try:
            # Escape LIKE metacharacters so caller-supplied "%" / "_" are
            # treated as literal text rather than wildcards.
            esc = self._LIKE_ESCAPE
            literal = (
                entity_name.replace(esc, esc + esc)
                .replace("%", esc + "%")
                .replace("_", esc + "_")
            )
            entities = (
                self.db.query(Entity)
                .filter(Entity.name.ilike(f"%{literal}%", escape=esc))
                .limit(max_layers * 5)
                .all()
            )
            ownership_chain = [
                {
                    "name": e.name,
                    "entity_type": getattr(e, "entity_type", "Unknown"),
                    "jurisdiction": getattr(e, "jurisdiction", "Unknown"),
                    "is_natural_person": getattr(e, "is_person", False),
                    "ownership_pct": getattr(e, "ownership_pct", 100),
                    "verified": getattr(e, "verified", False),
                    "registration_number": getattr(e, "registration_number", None),
                    "incorporation_date": getattr(e, "incorporation_date", None),
                }
                for e in entities
            ]

            # Nothing local: fall back to the external registry lookup.
            if not ownership_chain:
                ownership_chain = await self._query_corporate_registry(
                    entity_name, max_layers
                )
            return ownership_chain[:max_layers]
        except Exception as e:
            logger.warning("Ownership tracing failed for %s: %s", entity_name, e)
            return []

    async def _query_corporate_registry(
        self, entity_name: str, max_layers: int
    ) -> list[dict]:
        """
        Query external corporate registry for ownership information.

        This is a placeholder for integration with services like:
        - OpenCorporates
        - Orbis
        - Company House
        - Local corporate registries

        Currently it only logs the request and returns an empty list;
        ``max_layers`` is accepted but not used.
        """
        logger.info("Corporate registry query for: %s", entity_name)
        return []

    def _calculate_consistency(self, transactions: list[dict]) -> float:
        """
        Calculate amount consistency score (0-1).

        Uses ``1 / (1 + variance / 1000)`` over the transaction amounts, so
        identical amounts score 1.0 and the score falls as amounts spread out.
        Fewer than two transactions score 0.0.
        """
        if len(transactions) < 2:
            return 0.0
        amounts = [t["amount"] for t in transactions]
        mean = sum(amounts) / len(amounts)
        variance = sum((a - mean) ** 2 for a in amounts) / len(amounts)
        # variance >= 0, so the result is already within (0, 1].
        return 1 / (1 + variance / 1000)

    def _determine_pattern_type(
        self, transactions: list[dict], threshold: float
    ) -> str:
        """
        Determine the specific structuring pattern type.

        Returns:
            "NONE" for no transactions; otherwise, checked in this order:
            "THRESHOLD_AVOIDANCE" (average above 95% of threshold),
            "STRUCTURED_DEPOSITS" (average above 80% of threshold),
            "REPEATED_AMOUNTS" (fewer distinct amounts than half the count),
            else "GENERAL_STRUCTURING".
        """
        if not transactions:
            return "NONE"
        amounts = [t["amount"] for t in transactions]
        avg = sum(amounts) / len(amounts)
        # NOTE(review): detect_structuring only passes amounts at or above 85%
        # of the threshold, so from that caller only the first two outcomes
        # can occur.
        if avg > threshold * 0.95:
            return "THRESHOLD_AVOIDANCE"
        if avg > threshold * 0.8:
            return "STRUCTURED_DEPOSITS"
        if len(set(amounts)) < len(amounts) / 2:
            return "REPEATED_AMOUNTS"
        return "GENERAL_STRUCTURING"
def get_aml_service(db: Session) -> AMLVelocityService:
    """Build an AMLVelocityService bound to the given database session.

    Args:
        db: Session handed to the service constructor.

    Returns:
        A newly constructed service instance.
    """
    service = AMLVelocityService(db)
    return service