Spaces:
Paused
Paused
| from datetime import datetime | |
| from typing import Any | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from sqlalchemy import text | |
| from sqlalchemy.orm import Session | |
| from app.services.infrastructure.auth_service import auth_service | |
| from app.services.intelligence.aml_service import get_aml_service | |
| from app.services.intelligence.behavior_engine import get_behavior_service | |
| from app.services.intelligence.coc_service import get_coc_service | |
| from app.services.intelligence.forensic_intelligence import get_forensic_intelligence | |
| from app.services.intelligence.juridical_anchor import get_juridical_anchor | |
| from app.services.intelligence.zenith_horizon import get_zenith_horizon | |
| from app.services.intelligence.zenith_scoring import get_zenith_scoring | |
| from core.database import get_db | |
| router = APIRouter( | |
| tags=["Forensic Intelligence"], | |
| responses={404: {"description": "Not found"}}, | |
| ) | |
| # --- Request Models --- | |
| class RedactionRequest(BaseModel): | |
| transaction_id: str | |
| masked_name: str | |
| class LIBRRequest(BaseModel): | |
| account_id: str | |
| start_date: datetime | |
| end_date: datetime | |
| class IntentRequest(BaseModel): | |
| evidence_id: str | |
| content: str | |
| # --- Endpoints --- | |
| async def triangulate_redaction( | |
| request: RedactionRequest, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Unmask redacted transaction names using probabilistic triangulation.""" | |
| intel = get_forensic_intelligence(db) | |
| return await intel["triangulation"].unmask_redaction( | |
| request.transaction_id, request.masked_name | |
| ) | |
| async def run_libr_analysis( | |
| request: LIBRRequest, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Analyze mixed funds (personal/business) using the Lowest Intermediate Balance Rule.""" | |
| intel = get_forensic_intelligence(db) | |
| return intel["libr"].analyze_mixed_funds( | |
| request.account_id, request.start_date, request.end_date | |
| ) | |
| async def attribute_intent( | |
| request: IntentRequest, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Determine Mens Rea (Theory of Intent) from forensic evidence.""" | |
| intel = get_forensic_intelligence(db) | |
| return await intel["mens_rea"].attribute_intent( | |
| request.evidence_id, request.content | |
| ) | |
| async def mirror_detection( | |
| account_id: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Detect equal and opposite 'mirror' transactions.""" | |
| intel = get_forensic_intelligence(db) | |
| return await intel["mirror_matcher"].find_mirror_pairs(account_id) | |
| async def get_zenith_score( | |
| project_id: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Get the overall Zenith health score for a project.""" | |
| scoring = get_zenith_scoring(db) | |
| return await scoring["scorecard"].calculate_project_score(project_id) | |
| async def validate_imputation( | |
| original: dict[str, Any], | |
| imputed: dict[str, Any], | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Validate AI-imputed forensic data for legal resilience.""" | |
| scoring = get_zenith_scoring(db) | |
| return scoring["validator"].validate_imputation(original, imputed) | |
| async def check_structuring( | |
| account_id: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Check for structuring (Smurfing) patterns.""" | |
| aml = get_aml_service(db) | |
| return await aml.detect_structuring(account_id) | |
| async def trace_ubo( | |
| entity_name: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Trace Ultimate Beneficial Owners (UBO) using layering analysis.""" | |
| aml = get_aml_service(db) | |
| return await aml.link_ubo(entity_name) | |
| async def sign_forensic_report( | |
| project_id: str, | |
| content: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Sign a forensic report with Post-Quantum resistant signatures.""" | |
| anchor = get_juridical_anchor(db) | |
| return await anchor.sign_report(project_id, content) | |
| async def zenith_federated_sync( | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Synchronize local knowledge with the Federated Forensic Mesh.""" | |
| horizon = get_zenith_horizon() | |
| return await horizon["federated"].synchronize_weights() | |
| async def zenith_shield_verify( | |
| artifact_id: str, current_user: Any = Depends(auth_service.get_current_user) | |
| ): | |
| """Verify artifact integrity using Adversarial Forensic Shield.""" | |
| horizon = get_zenith_horizon() | |
| return await horizon["adversarial"].verify_artifact(artifact_id) | |
| async def zenith_autonomous_hunt( | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Execute Autonomous Forensic Hunting Agents.""" | |
| horizon = get_zenith_horizon() | |
| return await horizon["autonomous"].run_discovery_cycle() | |
| async def get_evidence_coc( | |
| evidence_id: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Retrieve the litigation-grade Chain of Custody for an evidence item.""" | |
| query = "SELECT evidence_metadata FROM evidence WHERE id = :id" | |
| row = db.execute(text(query), {"id": evidence_id}).fetchone() | |
| if not row: | |
| raise HTTPException(status_code=404, detail="Evidence not found") | |
| import json | |
| metadata = json.loads(row[0]) if row[0] else {} | |
| return metadata.get("chain_of_custody", []) | |
| async def log_coc_event( | |
| evidence_id: str, | |
| action: str, | |
| notes: str | None = None, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Log a new Chain of Custody event.""" | |
| coc = get_coc_service(db) | |
| return await coc.log_event(evidence_id, action, current_user.id, notes) | |
| async def get_behavior_baseline( | |
| account_id: str, | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Get the calculated behavior baseline for an account.""" | |
| behavior = get_behavior_service(db) | |
| return await behavior.get_account_baseline(account_id) | |
| async def check_behavior_anomalies( | |
| account_id: str, | |
| transactions: list[dict[str, Any]], | |
| db: Session = Depends(get_db), | |
| current_user: Any = Depends(auth_service.get_current_user), | |
| ): | |
| """Check a batch of transactions against the behavior baseline.""" | |
| behavior = get_behavior_service(db) | |
| return await behavior.detect_anomalies(account_id, transactions) | |