# NOTE: removed hosting-page scrape artifacts ("Spaces:" / "Sleeping") that were not part of the source.
| """ | |
| ForensiX AI - Digital Stratigraphy & Forensic Intelligence Engine | |
| ================================================================ | |
| IMPLEMENTED FEATURES: | |
| 1. Digital Stratigraphy Engine | |
| 2. FEAT Multi-Agent System (7 agents) | |
| 3. Property Graph Intelligence (NetworkX) | |
| 4. Dual-Mode TOD Estimation (Henssge + Metabolomic AI) | |
| 5. Explainable AI (SHAP-style attribution) | |
| 6. Blockchain Chain-of-Custody (SHA-256) | |
| 7. Natural Language Investigation Queries | |
| 8. Smart Evidence Prioritization | |
| 9. Cross-Case Intelligence Matching | |
| 10. Anomaly Detection | |
| """ | |
import hashlib
import json
import math
import re
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import networkx as nx
import numpy as np
from scipy.optimize import brentq
from sklearn.ensemble import IsolationForest
# ═══ BLOCKCHAIN CHAIN-OF-CUSTODY ═══
class EvidenceBlock:
    """A single immutable link in the evidence chain-of-custody.

    The block hash covers index, timestamp, evidence id, content hash and
    the previous block's hash, so later mutation of any of those fields is
    detectable by recomputing the hash.
    """
    def __init__(self, index, evidence_id, evidence_type, content_hash, metadata, previous_hash):
        self.index = index
        self.evidence_id = evidence_id
        self.evidence_type = evidence_type
        self.content_hash = content_hash
        self.metadata = metadata
        self.previous_hash = previous_hash
        # Timestamp is captured at construction time and sealed into the hash.
        self.timestamp = datetime.now().isoformat()
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        """Return the SHA-256 hex digest of this block's canonical JSON form."""
        payload = {
            "index": self.index,
            "timestamp": self.timestamp,
            "evidence_id": self.evidence_id,
            "content_hash": self.content_hash,
            "previous_hash": self.previous_hash,
        }
        # sort_keys gives a canonical serialization -> deterministic hash.
        return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

    def to_dict(self):
        """Serialize the block for reporting / API output."""
        return {
            "index": self.index,
            "timestamp": self.timestamp,
            "evidence_id": self.evidence_id,
            "type": self.evidence_type,
            "content_hash": self.content_hash,
            "block_hash": self.hash,
            "previous_hash": self.previous_hash,
        }
class ChainOfCustody:
    """Append-only SHA-256 hash chain recording every piece of evidence.

    Starts from a fixed genesis block; each new block links to the hash of
    the previous one, giving a tamper-evident chain-of-custody log.
    """
    def __init__(self):
        self.chain = []
        genesis = EvidenceBlock(0, "GENESIS", "system", "0" * 64, {}, "0" * 64)
        self.chain.append(genesis)

    def add_evidence(self, evidence_id, evidence_type, content, metadata=None):
        """Hash *content* and append a new block linked to the chain head.

        Returns the newly created EvidenceBlock.
        """
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        block = EvidenceBlock(len(self.chain), evidence_id, evidence_type,
                              content_hash, metadata or {}, self.chain[-1].hash)
        self.chain.append(block)
        return block

    def verify_integrity(self):
        """Check the whole chain; return (is_valid, list_of_issue_strings).

        Verifies both the back-links between blocks AND that each block's
        stored hash still matches its recomputed hash — the latter catches
        in-place tampering with a block's fields, which linkage checking
        alone cannot detect.
        """
        issues = []
        for i in range(1, len(self.chain)):
            if self.chain[i].previous_hash != self.chain[i - 1].hash:
                issues.append(f"Block {i}: chain broken")
            # Recompute the block hash: a mismatch means the block's own
            # fields were altered after it was sealed.
            if self.chain[i].hash != self.chain[i].calculate_hash():
                issues.append(f"Block {i}: block hash mismatch (possible tampering)")
        return len(issues) == 0, issues
# ═══ PROPERTY GRAPH ENGINE ═══
class ForensicGraph:
    """Directed property graph of evidence entities and their relationships."""
    def __init__(self):
        self.G = nx.DiGraph()

    def add_entity(self, entity_id, entity_type, properties=None):
        """Add (or update) a node tagged with its evidence type."""
        self.G.add_node(entity_id, type=entity_type, **(properties or {}))

    def add_relationship(self, source, target, rel_type, properties=None):
        """Add a typed directed edge between two entities."""
        self.G.add_edge(source, target, type=rel_type, **(properties or {}))

    def find_connections(self, entity_id, max_hops=3):
        """Return entities reachable from *entity_id* within *max_hops*,
        sorted by hop count; each item carries a shortest path to the target.
        """
        if entity_id not in self.G:
            return []
        # single_source_shortest_path already yields one shortest path per
        # reachable node — reuse those paths instead of re-running a full
        # BFS (nx.shortest_path) for every single target.
        paths = nx.single_source_shortest_path(self.G, entity_id, cutoff=max_hops)
        connections = [
            {"source": entity_id, "target": target, "hops": len(path) - 1, "path": path}
            for target, path in paths.items()
            if target != entity_id
        ]
        return sorted(connections, key=lambda x: x["hops"])

    def get_suspicious_patterns(self):
        """Flag hub entities (degree >= 4) as high-significance patterns."""
        patterns = []
        for node, degree in self.G.degree():
            if degree >= 4:
                patterns.append({"type": "hub_entity", "node": node, "degree": degree,
                                 "significance": "HIGH",
                                 "description": f"Entity '{node}' has {degree} connections"})
        return patterns

    def to_visualization_data(self):
        """Export nodes/edges as plain dicts suitable for front-end rendering."""
        nodes = [{"id": n, "type": self.G.nodes[n].get("type", "unknown")} for n in self.G.nodes()]
        edges = [{"source": u, "target": v, "type": self.G.edges[u, v].get("type", "")}
                 for u, v in self.G.edges()]
        return {"nodes": nodes, "edges": edges}
# ═══ MULTI-AGENT SYSTEM ═══
class AgentResult:
    """Uniform result envelope produced by every analysis agent.

    Carries the agent's display name, its raw finding dicts, an overall
    confidence value and a one-line human-readable summary.
    """
    def __init__(self, agent_name, findings, confidence, summary):
        self.agent_name = agent_name
        self.findings = findings
        self.confidence = confidence
        self.summary = summary

    def to_dict(self):
        """Return a JSON-serializable view of this result."""
        return dict(agent=self.agent_name, findings=self.findings,
                    confidence=self.confidence, summary=self.summary)
class AutopsyAgent:
    """Extracts structured forensic entities from free-text autopsy reports."""
    NAME = "Autopsy Analysis Agent"
    # Category -> list of extraction regexes. Group 1 (when present) is the entity text.
    PATTERNS = {
        "CAUSE_OF_DEATH": [r"(?i)cause\s+of\s+death[:\s]*([^\n.]{5,150})"],
        "MANNER_OF_DEATH": [r"(?i)manner\s+of\s+death[:\s]*(homicide|suicide|accident(?:al)?|natural|undetermined)"],
        "INJURY": [r"(?i)(blunt\s+force\s+trauma[^\n.,]{0,80})", r"(?i)(gunshot\s+wound[^\n.,]{0,60})",
                   r"(?i)(defensive\s+wounds?[^\n.,]{0,80})", r"(?i)(petechial\s+hemorrhages?[^\n.,]{0,60})",
                   r"(?i)(ligature\s+mark[^\n.,]{0,80})", r"(?i)(subdural\s+hematoma[^\n.,]{0,60})",
                   r"(?i)(contusion[^\n.,]{0,60})", r"(?i)(fracture[^\n.,]{0,60})", r"(?i)(stab\s+wound[^\n.,]{0,60})"],
        "TOXICOLOGY": [r"(?i)(blood\s+alcohol[:\s]*\d+\.\d+\s*g/dL[^\n.,]*)",
                       r"(?i)(benzodiazepines?[:\s]*[^\n.,]{0,60})", r"(?i)(no\s+illicit\s+substances?\s+detected)"],
        "EVIDENCE": [r"(?i)(DNA\s+(?:analysis|collected)[^\n.,]{0,60})", r"(?i)(foreign\s+fibers?[^\n.,]{0,60})"],
    }

    def analyze(self, text):
        """Run every category pattern over *text* and collect the entities found."""
        findings = []
        for category, patterns in self.PATTERNS.items():
            for pattern in patterns:
                for match in re.finditer(pattern, text):
                    captured = match.group(1) if match.lastindex else match.group(0)
                    entity = captured.strip()
                    # Skip trivially short fragments.
                    if len(entity) >= 3:
                        findings.append({"category": category, "text": entity, "confidence": 0.85})
        injury_count = sum(1 for f in findings if f["category"] == "INJURY")
        manners = [f["text"] for f in findings if f["category"] == "MANNER_OF_DEATH"]
        manner = manners[0] if manners else "undetermined"
        summary = f"{len(findings)} entities, {injury_count} injuries, manner: {manner}"
        return AgentResult(self.NAME, findings, 0.85, summary)
class TimelineAgent:
    """Orders events chronologically and flags suspicious time gaps."""
    NAME = "Timeline Reconstruction Agent"

    def analyze(self, events):
        """Sort *events* by ISO timestamp and report gaps longer than 30 minutes.

        Events lacking a parseable "timestamp" contribute no gap but still
        appear among the returned EVENT findings. Gaps over 120 minutes are
        flagged HIGH severity, otherwise MODERATE.
        """
        sorted_ev = sorted(events, key=lambda x: x.get("timestamp", ""))
        gaps = []
        for ev_a, ev_b in zip(sorted_ev, sorted_ev[1:]):
            try:
                t1 = datetime.fromisoformat(ev_a["timestamp"])
                t2 = datetime.fromisoformat(ev_b["timestamp"])
            except (KeyError, TypeError, ValueError):
                # Missing key or malformed ISO string — skip just this pair.
                # (The original bare `except: pass` also hid unrelated bugs.)
                continue
            gap = (t2 - t1).total_seconds() / 60
            if gap > 30:
                gaps.append({"category": "TIMELINE_GAP", "gap_minutes": round(gap),
                             "severity": "HIGH" if gap > 120 else "MODERATE", "confidence": 0.9})
        findings = [{"category": "EVENT", **e} for e in sorted_ev] + gaps
        return AgentResult(self.NAME, findings, 0.88, f"{len(sorted_ev)} events, {len(gaps)} gaps")
class CCTVAgent:
    """Mines CCTV/camera evidence for behavioral red flags."""
    NAME = "CCTV & Surveillance Agent"

    def analyze(self, evidence):
        """Scan camera-sourced *evidence* records for suspicious patterns."""
        def is_camera(rec):
            src = rec.get("source", "").lower()
            return "cctv" in src or "cam" in src

        cctv = [rec for rec in evidence if is_camera(rec)]
        blob = " ".join(rec.get("details", "").lower() for rec in cctv)
        findings = []
        # Head-count mismatch between arrivals and departures.
        if "two" in blob and ("single" in blob or "one" in blob):
            findings.append({"category": "PERSON_DISCREPANCY",
                             "text": "Multiple arrived, fewer departed",
                             "significance": "CRITICAL", "confidence": 0.92})
        # Hasty exit from the scene.
        if "high speed" in blob or "rapidly" in blob:
            findings.append({"category": "RAPID_DEPARTURE",
                             "text": "Rapid departure detected",
                             "significance": "HIGH", "confidence": 0.88})
        findings.extend({"category": "CCTV_EVENT", "text": rec.get("details", ""),
                         "timestamp": rec.get("timestamp", ""), "confidence": 0.95}
                        for rec in cctv)
        pattern_count = sum(1 for f in findings if f.get("significance"))
        return AgentResult(self.NAME, findings, 0.87,
                           f"{len(cctv)} CCTV records, {pattern_count} patterns")
class ToxicologyAgent:
    """Detects known substances in report text and scores their risk."""
    NAME = "Toxicology Intelligence Agent"
    # Substance name -> heuristic risk score (0-100).
    RISK = {"fentanyl": 95, "cyanide": 98, "arsenic": 97, "cocaine": 60, "heroin": 70,
            "benzodiazepine": 45, "diazepam": 45, "alcohol": 25, "methamphetamine": 65}

    def analyze(self, text):
        """Return one SUBSTANCE finding per known substance mentioned in *text*."""
        haystack = text.lower()
        findings = [
            {"category": "SUBSTANCE", "substance": name, "risk_level": level, "confidence": 0.9}
            for name, level in self.RISK.items()
            if name in haystack
        ]
        return AgentResult(self.NAME, findings, 0.9, f"{len(findings)} substances detected")
class CorrelationAgent:
    """Finds temporal and cross-agent correlations across evidence items."""
    NAME = "Cross-Evidence Correlation Agent"

    def analyze(self, all_results, evidence):
        """Correlate *evidence* items in time and fuse earlier agent findings.

        Pairs of differently-sourced items within 15 minutes of each other
        are reported; defensive wounds combined with a CCTV person-count
        discrepancy are escalated as a combined homicide indicator.
        """
        findings = []
        for i in range(len(evidence)):
            # Only compare nearby items (window of 4) to bound the quadratic pass.
            for j in range(i + 1, min(i + 5, len(evidence))):
                try:
                    t1 = datetime.fromisoformat(evidence[i]["timestamp"])
                    t2 = datetime.fromisoformat(evidence[j]["timestamp"])
                except (KeyError, TypeError, ValueError):
                    # Missing or malformed timestamp — skip this pair only.
                    # (The original bare `except: pass` also hid real bugs.)
                    continue
                diff = abs((t2 - t1).total_seconds()) / 60
                if diff <= 15 and evidence[i].get("source") != evidence[j].get("source"):
                    findings.append({"category": "TEMPORAL_CORRELATION", "time_diff_min": round(diff, 1),
                                     "event_1": evidence[i].get("details", "")[:40],
                                     "event_2": evidence[j].get("details", "")[:40],
                                     "significance": "HIGH" if diff <= 5 else "MODERATE", "confidence": 0.85})
        # Cross-agent signal fusion.
        has_defensive = any("defensive" in f.get("text", "").lower()
                            for r in all_results for f in r.findings)
        has_person_disc = any(f.get("category") == "PERSON_DISCREPANCY"
                              for r in all_results for f in r.findings)
        if has_defensive and has_person_disc:
            findings.append({"category": "CROSS_EVIDENCE_CORRELATION", "significance": "CRITICAL", "confidence": 0.94,
                             "text": "Defensive wounds + person count discrepancy = strong homicide indicator"})
        return AgentResult(self.NAME, findings, 0.87, f"{len(findings)} correlations found")
class ExplainabilityAgent:
    """Produces SHAP-style factor attributions for a computed risk score."""
    NAME = "Explainability (SHAP-style) Agent"

    def explain_risk(self, factors, score):
        """Break *score* down into per-factor contributions.

        *factors* maps factor name -> raw value; contributions are reported
        as a percentage of the factor total, largest first.
        """
        ranked = sorted(factors.items(), key=lambda item: item[1], reverse=True)
        denominator = max(sum(factors.values()), 1)  # guard against division by zero
        findings = [
            {"category": "RISK_FACTOR", "factor": name, "value": round(value, 1),
             "contribution_pct": round(value / denominator * 100, 1), "confidence": 0.95}
            for name, value in ranked
        ]
        top_three = ", ".join(f"{name}({value:.0f})" for name, value in ranked[:3])
        explanation = f"Risk {score:.1f}/100 driven by: " + top_three
        findings.append({"category": "EXPLANATION", "text": explanation,
                         "methodology": "SHAP-inspired factor attribution",
                         "legal_note": "Advisory only — requires expert validation"})
        return AgentResult(self.NAME, findings, 0.95, explanation)
class RiskAgent:
    """Fuses all agent findings into a weighted risk score and flags anomalies."""
    NAME = "Risk Scoring & Anomaly Agent"
    # Factor weights; they sum to 1.0 so the weighted score stays on a 0-100 scale.
    WEIGHTS = {"violence_severity": 0.25, "evidence_gaps": 0.12, "toxicology_risk": 0.10,
               "manner_complexity": 0.15, "digital_patterns": 0.13, "temporal_consistency": 0.10, "cross_evidence": 0.15}
    # Violence keyword -> base severity (0-100) when the keyword appears in the report text.
    VIOLENCE = {"homicide": 95, "gunshot": 95, "stab": 90, "defensive wounds": 90, "ligature": 85,
                "blunt force trauma": 85, "subdural hematoma": 80, "asphyxia": 80, "petechial": 75}
    def analyze(self, all_results, text=""):
        """Score the case from *all_results* (list of AgentResult) plus raw report *text*.

        Returns an AgentResult whose findings hold one RISK_SCORE entry (with
        the full per-factor breakdown) followed by zero or more ANOMALY entries.
        """
        factors = {}
        text_lower = text.lower()
        # Strongest violence keyword, amplified up to 1.3x by how many distinct keywords appear.
        max_v = max((s for k,s in self.VIOLENCE.items() if k in text_lower), default=0)
        count = sum(1 for k in self.VIOLENCE if k in text_lower)
        factors["violence_severity"] = min(100, max_v * min(1.3, 1+count*0.05)) if max_v else 0
        # Timeline gaps reported by the TimelineAgent.
        gaps = sum(1 for r in all_results for f in r.findings if f.get("category") == "TIMELINE_GAP")
        factors["evidence_gaps"] = min(100, gaps*25+20)
        # Worst substance risk reported by the ToxicologyAgent.
        tox = [f for r in all_results for f in r.findings if f.get("category") == "SUBSTANCE"]
        factors["toxicology_risk"] = max((f.get("risk_level",0) for f in tox), default=0)
        if "homicide" in text_lower: factors["manner_complexity"] = 95
        elif "undetermined" in text_lower: factors["manner_complexity"] = 70
        else: factors["manner_complexity"] = 30
        # Significance tags accumulated across all agents' findings.
        critical = sum(1 for r in all_results for f in r.findings if f.get("significance") == "CRITICAL")
        high = sum(1 for r in all_results for f in r.findings if f.get("significance") == "HIGH")
        factors["digital_patterns"] = min(100, critical*30+high*15)
        factors["temporal_consistency"] = min(100, gaps*20+30)
        cross = sum(1 for r in all_results for f in r.findings if f.get("category") == "CROSS_EVIDENCE_CORRELATION")
        factors["cross_evidence"] = min(100, cross*25+10)
        # Weighted sum -> overall 0-100 risk score, then bucketed into a level.
        score = sum(factors[k]*v for k,v in self.WEIGHTS.items())
        level = "CRITICAL" if score >= 80 else "HIGH" if score >= 60 else "MODERATE" if score >= 40 else "LOW"
        findings = [{"category": "RISK_SCORE", "score": round(score,1), "level": level, "factors": factors}]
        # Anomalies: contradictions between physical findings and the recorded manner of death.
        if "defensive" in text_lower and "homicide" not in text_lower:
            findings.append({"category": "ANOMALY", "type": "manner_mismatch", "severity": "CRITICAL",
                             "description": "Defensive wounds without homicide classification",
                             "recommendation": "Review manner determination"})
        if ("benzodiazepine" in text_lower or "diazepam" in text_lower) and "overdose" not in text_lower:
            findings.append({"category": "ANOMALY", "type": "sedation_indicator", "severity": "HIGH",
                             "description": "Sedative in non-overdose death",
                             "recommendation": "Consider incapacitation prior to injuries"})
        return AgentResult(self.NAME, findings, 0.9, f"Risk: {score:.1f}/100 ({level})")
# ═══ DUAL-MODE TOD ESTIMATION ═══
class DualModeTODEstimator:
    """Post-mortem interval (PMI) estimator with two regimes.

    Early phase: Henssge double-exponential body-cooling model inverted via
    Brent's method, cross-checked against the rigor window for confidence.
    Late phase (bloating/advanced decomposition): decomposition-stage hour
    ranges, optionally refined with a vitreous potassium reading.
    """
    T_INITIAL = 37.2  # assumed core temperature at death (deg C)
    # Classic post-mortem sign windows, hours since death: state -> (min, max).
    RIGOR = {"absent": (0, 3), "developing": (2, 8), "full": (8, 24), "resolving": (24, 72)}
    LIVIDITY = {"absent": (0, 1), "developing": (0.5, 4), "present_movable": (2, 12), "fixed": (8, 200)}
    DECOMP = {"absent": (0, 24), "early": (24, 72), "bloating": (48, 168), "advanced": (168, 720)}

    def estimate(self, rectal_temp, ambient_temp, body_weight, corrective=1.0,
                 rigor="absent", lividity="absent", decomp="absent", vitreous_potassium=None, **kw):
        """Dispatch to the early- or late-phase estimator based on decomposition.

        Returns a dict carrying at least "phase"; early-phase failures return
        an "error" key instead of a PMI. Extra keyword arguments are ignored.
        """
        phase = "late" if decomp in ["bloating", "advanced"] else "early"
        if phase == "early":
            return self._early(rectal_temp, ambient_temp, body_weight, corrective, rigor, lividity, decomp)
        return self._late(decomp, vitreous_potassium)

    def _early(self, t_rec, t_amb, weight, corr, rigor, lividity, decomp):
        """Henssge (1988) nomogram: invert the cooling curve for PMI in hours."""
        if abs(self.T_INITIAL - t_amb) < 0.1:
            return {"error": "Ambient too close", "phase": "early"}
        # Normalized residual temperature difference (1 at death -> 0 at ambient).
        Q = (t_rec - t_amb) / (self.T_INITIAL - t_amb)
        if Q <= 0 or Q >= 1:
            return {"error": f"Q={Q:.3f} invalid", "phase": "early"}
        # Henssge cooling constant; the corrective factor scales effective body weight.
        B = 1.2815 * ((corr * weight) ** -0.625) + 0.0284
        try:
            pmi = brentq(lambda t: 1.25 * np.exp(-B * t) - 0.25 * np.exp(-5 * B * t) - Q, 0.01, 200)
        except (ValueError, RuntimeError):
            # ValueError: no sign change on [0.01, 200]; RuntimeError: failed to
            # converge. (Was a bare `except`, which also hid unrelated bugs.)
            pmi = None
        std = 2.8 if 50 <= weight <= 100 else 3.2  # wider band outside the calibrated weight range
        curve = [{"time": float(t),
                  "temp": float(t_amb + (self.T_INITIAL - t_amb) * (1.25 * np.exp(-B * t) - 0.25 * np.exp(-5 * B * t)))}
                 for t in np.linspace(0, 48, 80)]
        rigor_r = self.RIGOR.get(rigor, (0, 72))
        livid_r = self.LIVIDITY.get(lividity, (0, 200))
        # Confidence rises when the thermal estimate agrees with the rigor window midpoint.
        conf = "HIGH" if pmi and abs(pmi - (rigor_r[0] + rigor_r[1]) / 2) < pmi * 0.3 else "MODERATE" if pmi else "LOW"
        return {"phase": "early", "method": "Henssge Nomogram (1988)", "pmi_hours": round(pmi, 2) if pmi else None,
                "lower_bound": round(max(0, pmi - std), 1) if pmi else None,
                "upper_bound": round(pmi + std, 1) if pmi else None,
                "confidence": conf,
                "signs": {"rigor": {"state": rigor, "range": rigor_r}, "lividity": {"state": lividity, "range": livid_r}},
                "cooling_curve": curve}

    def _late(self, decomp, vk):
        """Late-phase PMI from decomposition stage, refined by vitreous potassium *vk* (mmol/L)."""
        dr = self.DECOMP.get(decomp, (72, 720))
        # Linear vitreous-K model: K rises ~0.17 mmol/L per hour from ~5.5 at death.
        vk_est = max(0, (vk - 5.5) / 0.17) if vk else None
        est = ((dr[0] + dr[1]) / 2 + vk_est) / 2 if vk_est else (dr[0] + dr[1]) / 2
        return {"phase": "late", "method": "Metabolomic AI + Decomposition", "pmi_hours": round(est, 1),
                "pmi_days": round(est / 24, 1), "lower_bound": dr[0], "upper_bound": dr[1],
                "confidence": "MODERATE" if vk_est else "LOW", "decomposition_stage": decomp}
# ═══ DIGITAL STRATIGRAPHY ENGINE ═══
class DigitalStratigraphyEngine:
    """Layered evidence store ("strata") wired to a property graph and a
    hash chain-of-custody; auto-links items that are close in time."""
    STRATA = ["autopsy", "cctv", "mobile", "gps", "iot", "toxicology", "witness", "forensic_lab"]

    def __init__(self):
        self.strata = {s: [] for s in self.STRATA}
        self.graph = ForensicGraph()
        self.chain = ChainOfCustody()

    def add_evidence(self, stratum, evidence):
        """Register one *evidence* dict under *stratum*.

        NOTE: mutates *evidence* in place (adds "stratum" and a short "id");
        callers may read the assigned id back from the dict they passed in.
        Unknown strata are still hashed/graphed but not stored in a layer.
        """
        evidence["stratum"] = stratum
        evidence["id"] = str(uuid.uuid4())[:8]
        if stratum in self.strata:
            self.strata[stratum].append(evidence)
        # Seal the item into the chain-of-custody and mirror it in the graph.
        self.chain.add_evidence(evidence["id"], stratum, json.dumps(evidence, default=str))
        self.graph.add_entity(evidence["id"], stratum, {"details": evidence.get("details", "")[:50]})
        self._auto_correlate(evidence)

    def _auto_correlate(self, new_ev):
        """Link *new_ev* to every existing item within 30 minutes of it."""
        ts = new_ev.get("timestamp")
        if not ts:
            return
        try:
            new_t = datetime.fromisoformat(ts)
        except (TypeError, ValueError):
            # Unparseable timestamp on the new item — nothing to correlate.
            # (Was a bare `except`, which also hid unrelated bugs.)
            return
        for items in self.strata.values():
            for item in items:
                if item["id"] == new_ev["id"]:
                    continue
                try:
                    existing_t = datetime.fromisoformat(item.get("timestamp", ""))
                except (TypeError, ValueError):
                    continue  # skip items without a usable timestamp
                diff = abs((new_t - existing_t).total_seconds()) / 60
                if diff <= 30:
                    self.graph.add_relationship(new_ev["id"], item["id"], "temporal_proximity",
                                                {"diff_min": round(diff, 1)})

    def build_timeline(self):
        """Flatten all strata into one event list sorted by timestamp string."""
        all_ev = []
        for stratum, items in self.strata.items():
            for item in items:
                all_ev.append({"timestamp": item.get("timestamp", ""), "stratum": stratum,
                               "source": item.get("source", stratum), "details": item.get("details", ""),
                               "id": item["id"]})
        # "z" is a defensive default; entries built above always carry a timestamp key.
        return sorted(all_ev, key=lambda x: x.get("timestamp", "z"))

    def get_summary(self):
        """Aggregate counts across strata, the graph and the custody chain."""
        return {"total": sum(len(v) for v in self.strata.values()),
                "strata": {k: len(v) for k, v in self.strata.items() if v},
                "nodes": self.graph.G.number_of_nodes(), "edges": self.graph.G.number_of_edges(),
                "chain_blocks": len(self.chain.chain), "chain_valid": self.chain.verify_integrity()[0]}
# ═══ CROSS-CASE INTELLIGENCE ═══
class CrossCaseIntelligence:
    """Matches case text against known cross-case modus-operandi patterns."""
    # Each pattern lists regex indicators; a case matches when at least half hit.
    PATTERNS = [
        {"pattern": "sedation_homicide", "indicators": ["benzodiazepine", "ligature", "defensive wounds"],
         "description": "Victim sedated then killed — premeditated homicide pattern"},
        {"pattern": "staged_suicide", "indicators": ["ligature", "defensive wounds", "manner.*suicide"],
         "description": "Scene staged as suicide but injuries inconsistent"},
        {"pattern": "domestic_violence", "indicators": ["blunt force", "multiple contusions", "defensive"],
         "description": "Pattern consistent with domestic violence"},
    ]

    def match(self, text):
        """Return patterns whose indicators hit at least 50%, best match first."""
        haystack = text.lower()
        matches = []
        for candidate in self.PATTERNS:
            indicators = candidate["indicators"]
            hits = sum(1 for marker in indicators if re.search(marker, haystack))
            ratio = hits / len(indicators)
            if ratio >= 0.5:
                matches.append({"pattern": candidate["pattern"],
                                "match_score": round(ratio * 100, 1),
                                "description": candidate["description"]})
        matches.sort(key=lambda m: m["match_score"], reverse=True)
        return matches
# ═══ NL QUERY ENGINE ═══
class NLQueryEngine:
    """Routes free-text investigator questions to canned report renderers.

    A query is classified by the first regex that matches it; anything
    unclassified falls through to a general status summary.
    """
    # Regex -> query type; checked in insertion order, first hit wins.
    PATTERNS = {
        r"(?i)timeline|chronolog|sequence": "timeline",
        r"(?i)risk|score|danger": "risk",
        r"(?i)anomal|suspicious|unusual": "anomaly",
        r"(?i)explain|why|how": "explain",
        r"(?i)chain.*custody|integrity|tamper": "chain",
    }
    def process(self, query, engine, results=None):
        """Answer *query* using *engine* (the stratigraphy engine) and, for
        analysis-dependent questions, the prior list of AgentResult *results*.

        Returns a Markdown-formatted answer string.
        """
        qtype = "general"
        for pat, qt in self.PATTERNS.items():
            if re.search(pat, query): qtype = qt; break
        if qtype == "timeline":
            tl = engine.build_timeline()
            if not tl: return "No timeline data. Add evidence first."
            # Show at most the first 12 events.
            return "📅 **Timeline:**\n" + "\n".join(f"{i}. `{e['timestamp']}` [{e['stratum']}] {e['details']}" for i,e in enumerate(tl[:12],1))
        elif qtype == "risk":
            if results:
                rf = [f for r in results for f in r.findings if f.get("category") == "RISK_SCORE"]
                if rf: return f"⚠️ **Risk: {rf[0]['score']}/100 ({rf[0]['level']})**\n\nFactors:\n" + "\n".join(f"- {k}: {v:.0f}" for k,v in rf[0].get("factors",{}).items())
            return "Run analysis first."
        elif qtype == "anomaly":
            if results:
                anom = [f for r in results for f in r.findings if f.get("category") == "ANOMALY"]
                if anom: return f"🚨 **{len(anom)} Anomalies:**\n" + "\n".join(f"- [{a['severity']}] {a['description']}" for a in anom)
            return "No anomalies found."
        elif qtype == "explain":
            if results:
                exp = [f for r in results for f in r.findings if f.get("category") == "EXPLANATION"]
                if exp: return f"🔍 **Explanation:**\n{exp[0]['text']}\n\n*{exp[0].get('methodology','')}*"
            return "No explanations available."
        elif qtype == "chain":
            s = engine.get_summary()
            return f"🔐 **Chain of Custody:** {'✅ VALID' if s['chain_valid'] else '❌ BROKEN'} ({s['chain_blocks']} blocks)"
        else:
            # Fallback: overall engine status plus usage hints.
            s = engine.get_summary()
            return f"🔬 **Status:** {s['total']} evidence items, {s['nodes']} graph nodes, chain: {'✅' if s['chain_valid'] else '❌'}\n\nTry: timeline, risk, anomalies, explain, chain custody"
# ═══ EVIDENCE PRIORITIZER ═══
class EvidencePrioritizer:
    """Ranks findings by investigative urgency using keyword and tag boosts."""
    # Each keyword hit adds 12 points on top of the base score of 30.
    KEYWORDS = ["weapon", "gun", "knife", "blood", "dna", "fingerprint", "suspect", "deleted",
                "encrypted", "poison", "defensive", "ligature", "high speed", "disconnect"]

    def prioritize(self, findings):
        """Return *findings* decorated with priority_score/priority, highest first."""
        ranked = []
        for finding in findings:
            haystack = (finding.get("text", "") + " " + finding.get("details", "")).lower()
            score = 30 + 12 * sum(1 for kw in self.KEYWORDS if kw in haystack)
            if finding.get("significance") in ["CRITICAL", "HIGH"]:
                score += 25
            if finding.get("category") in ["ANOMALY", "CROSS_EVIDENCE_CORRELATION"]:
                score += 20
            label = "CRITICAL" if score >= 80 else "HIGH" if score >= 60 else "MEDIUM"
            ranked.append({**finding, "priority_score": min(100, score), "priority": label})
        return sorted(ranked, key=lambda item: item["priority_score"], reverse=True)
# ═══ ORCHESTRATOR ═══
class ForensicOrchestrator:
    """Top-level façade: owns the engine, agents and helpers, and exposes
    ingest / analyze / query entry points for the UI layer."""
    def __init__(self):
        self.engine = DigitalStratigraphyEngine()
        self.tod = DualModeTODEstimator()
        # Agent registry; keys are the stable internal agent names.
        self.agents = {"autopsy": AutopsyAgent(), "timeline": TimelineAgent(), "cctv": CCTVAgent(),
                       "toxicology": ToxicologyAgent(), "correlation": CorrelationAgent(),
                       "explainability": ExplainabilityAgent(), "risk": RiskAgent()}
        self.cross_case = CrossCaseIntelligence()
        self.prioritizer = EvidencePrioritizer()
        self.nl = NLQueryEngine()
        self.results = []  # AgentResult list from the most recent run_analysis
    def ingest_report(self, text):
        """Store the (truncated to 200 chars) autopsy report as an evidence item."""
        self.engine.add_evidence("autopsy", {"details": text[:200], "source": "autopsy_report", "timestamp": datetime.now().isoformat()})
    def ingest_evidence(self, evidence_list):
        """Route raw evidence dicts into a stratum inferred from their source string."""
        for e in evidence_list:
            # "cctv"/"cam" -> cctv stratum, "mobile" -> mobile, anything else -> iot.
            stratum = "cctv" if "cctv" in e.get("source","").lower() or "cam" in e.get("source","").lower() else \
                      "mobile" if "mobile" in e.get("source","").lower() else "iot"
            self.engine.add_evidence(stratum, e)
    def run_analysis(self, report, evidence=None):
        """Run every agent over *report* text and the optional *evidence* list.

        Returns one aggregate dict: per-agent outputs, risk score/level,
        anomalies, cross-case matches, prioritized findings (top 15),
        timeline, graph data, chain-of-custody status and strata summary.
        """
        self.results = []
        self.results.append(self.agents["autopsy"].analyze(report))
        self.results.append(self.agents["toxicology"].analyze(report))
        if evidence:
            self.results.append(self.agents["timeline"].analyze(evidence))
            self.results.append(self.agents["cctv"].analyze(evidence))
        # Correlation and risk agents consume the earlier agents' results.
        self.results.append(self.agents["correlation"].analyze(self.results, evidence or []))
        risk_result = self.agents["risk"].analyze(self.results, report)
        self.results.append(risk_result)
        rf = [f for f in risk_result.findings if f.get("category") == "RISK_SCORE"]
        if rf:
            # Attribute the score to its factors only when one was produced.
            self.results.append(self.agents["explainability"].explain_risk(rf[0]["factors"], rf[0]["score"]))
        all_findings = [f for r in self.results for f in r.findings]
        return {
            "agents": [r.to_dict() for r in self.results],
            "risk_score": rf[0]["score"] if rf else 0,
            "risk_level": rf[0]["level"] if rf else "UNKNOWN",
            "anomalies": [f for f in all_findings if f.get("category") == "ANOMALY"],
            "cross_case": self.cross_case.match(report),
            "prioritized": self.prioritizer.prioritize(all_findings)[:15],
            "timeline": self.engine.build_timeline(),
            "graph": self.engine.graph.to_visualization_data(),
            "patterns": self.engine.graph.get_suspicious_patterns(),
            "chain": {"valid": self.engine.chain.verify_integrity()[0], "blocks": len(self.engine.chain.chain)},
            "stratigraphy": self.engine.get_summary(),
        }
    def query(self, question):
        """Answer a natural-language *question* about the current case state."""
        return self.nl.process(question, self.engine, self.results)