"""
Module 4: Case Risk Scoring & Anomaly Detection
=================================================
Multi-factor weighted risk scoring with anomaly detection.
"""

from typing import Any, Dict, List

import numpy as np
import pandas as pd

try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
except ImportError:
    # Only `_build_plot` needs plotly. Deferring the failure keeps the pure
    # scoring helpers importable; `_build_plot` raises ImportError itself.
    go = None
    make_subplots = None


class CaseRiskScorer:
    """Multi-dimensional case risk assessment.

    Seven factor scores (each on a 0-100 scale) are derived from the case
    state, combined with ``FACTOR_WEIGHTS`` into one risk score, and checked
    against a small set of rule-based anomaly detectors.
    """

    # Weight of each factor in the overall score; the weights sum to 1.0.
    FACTOR_WEIGHTS: Dict[str, float] = {
        "violence_severity": 0.25,
        "evidence_gaps": 0.15,
        "toxicology_risk": 0.10,
        "manner_complexity": 0.15,
        "digital_patterns": 0.15,
        "temporal_consistency": 0.10,
        "evidence_quantity": 0.10,
    }

    # Substring -> severity, matched against lower-cased injury/cause text.
    VIOLENCE_KEYWORDS: Dict[str, int] = {
        "homicide": 95,
        "gunshot": 95,
        "stab": 90,
        "defensive wounds": 90,
        "ligature": 85,
        "strangulation": 90,
        "blunt force trauma": 85,
        "subdural hematoma": 80,
        "hemorrhage": 75,
        "fracture": 70,
        "contusion": 65,
        "laceration": 70,
        "asphyxia": 80,
        "petechial": 75,
        "abrasion": 40,
        "drowning": 60,
        "poisoning": 70,
        "overdose": 50,
        "natural": 10,
        "cardiac": 15,
    }

    # Substring -> risk, matched against lower-cased toxicology text.
    TOX_RISK: Dict[str, int] = {
        "fentanyl": 85,
        "cyanide": 95,
        "arsenic": 95,
        "cocaine": 55,
        "heroin": 65,
        "benzodiazepine": 40,
        "diazepam": 40,
        "alcohol": 30,
        "no substances": 5,
        "trace levels": 25,
    }

    # Checked in order; the first substring found in the manner/cause text wins.
    MANNER_SCORES = (
        ("homicide", 95),
        ("undetermined", 70),
        ("suicide", 60),
        ("accident", 40),
        ("natural", 15),
    )

    # Entity label -> feature list that collects the entity text.
    _ENTITY_BUCKETS: Dict[str, str] = {
        "INJURY": "injuries",
        "TOXICOLOGY": "toxicology",
        "CAUSE_OF_DEATH": "causes",
        "MANNER_OF_DEATH": "manner",
    }

    @staticmethod
    def _field(item: Any, name: str, default: Any = None) -> Any:
        """Read *name* from an attribute, else via ``.get``, else *default*."""
        if hasattr(item, name):
            return getattr(item, name)
        if hasattr(item, "get"):
            return item.get(name, default)
        return default

    def compute_risk(self, state: Dict) -> Dict[str, Any]:
        """Score a case and return the score, label, plot, report and anomalies.

        Args:
            state: Case state. The keys read are ``entities``,
                ``correlations``, ``tod_estimate``, ``digital_evidence`` and
                ``report_text``; missing or ``None`` values are treated as
                empty.

        Returns:
            Dict with ``risk_score`` (rounded to one decimal),
            ``risk_classification`` (``{label: score / 100}``),
            ``anomaly_plot`` (plotly figure), ``explanation_markdown`` and
            ``anomalies``.

        Raises:
            ImportError: If plotly is not installed.
        """
        features = self._extract_features(state)
        factor_scores = self._compute_factor_scores(features)
        weighted = sum(
            factor_scores[name] * weight
            for name, weight in self.FACTOR_WEIGHTS.items()
        )
        risk_score = round(weighted, 1)
        risk_classification = self._classify(risk_score)
        anomalies = self._detect_anomalies(features, factor_scores)
        anomaly_plot = self._build_plot(factor_scores, risk_score)
        explanation_md = self._explain(
            factor_scores, anomalies, risk_score, risk_classification
        )
        return {
            "risk_score": risk_score,
            "risk_classification": risk_classification,
            "anomaly_plot": anomaly_plot,
            "explanation_markdown": explanation_md,
            "anomalies": anomalies,
        }

    def _extract_features(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten the case state into the feature dict used for scoring.

        Entities may be objects with ``label``/``text`` attributes or dicts
        with those keys. Entities whose text is ``None`` are skipped.
        """
        # `or` rather than a .get default: a key that is present but None
        # must still fall back to an empty container.
        features: Dict[str, Any] = {
            "report_text": state.get("report_text") or "",
            "injuries": [],
            "toxicology": [],
            "causes": [],
            "manner": [],
            "digital_evidence": state.get("digital_evidence") or [],
            "correlations": state.get("correlations") or [],
            "tod_estimate": state.get("tod_estimate") or {},
        }
        for entity in state.get("entities") or []:
            label = self._field(entity, "label", "")
            text = self._field(entity, "text", "")
            # Non-string labels can never match and may be unhashable.
            bucket = (
                self._ENTITY_BUCKETS.get(label) if isinstance(label, str) else None
            )
            if bucket is None or text is None:
                continue
            features[bucket].append(str(text))
        return features

    def _compute_factor_scores(self, features: Dict[str, Any]) -> Dict[str, float]:
        """Compute the seven 0-100 factor scores from extracted features."""
        scores: Dict[str, float] = {}
        injuries = features["injuries"]
        causes = features["causes"]
        corrs = features.get("correlations") or []
        tod = features.get("tod_estimate") or {}

        # Violence: strongest keyword hit, boosted 5% per distinct hit
        # (boost capped at 1.3x, result capped at 100).
        violence_text = " ".join(injuries + causes).lower()
        hits = [
            severity
            for keyword, severity in self.VIOLENCE_KEYWORDS.items()
            if keyword in violence_text
        ]
        if hits:
            boost = min(1.3, 1.0 + len(hits) * 0.05)
            scores["violence_severity"] = min(100, max(hits) * boost)
        else:
            scores["violence_severity"] = 0

        # Correlations may be dicts or objects, like entities.
        significances = [self._field(c, "significance") for c in corrs]
        critical = significances.count("CRITICAL")
        high = significances.count("HIGH")

        # Evidence gaps
        scores["evidence_gaps"] = min(100, high * 20 + 30) if corrs else 50

        # Toxicology
        tox_text = " ".join(features["toxicology"]).lower()
        scores["toxicology_risk"] = max(
            (risk for name, risk in self.TOX_RISK.items() if name in tox_text),
            default=0,
        )

        # Manner: first matching substring wins, 50 when nothing matches.
        manner_text = " ".join(features["manner"] + causes).lower()
        scores["manner_complexity"] = next(
            (score for word, score in self.MANNER_SCORES if word in manner_text),
            50,
        )

        # Digital patterns
        scores["digital_patterns"] = (
            min(100, critical * 30 + high * 15 + 10) if corrs else 30
        )

        # Temporal: weaker agreement between methods scores higher.
        agree = self._field(tod, "method_agreement", "N/A")
        scores["temporal_consistency"] = {
            "STRONG": 20,
            "MODERATE": 50,
            "WEAK": 80,
        }.get(agree, 40)

        # Evidence quantity: how many of the three evidence sources exist.
        has = sum(
            [bool(injuries), bool(tod), bool(features["digital_evidence"])]
        )
        scores["evidence_quantity"] = {3: 70, 2: 50, 1: 30}.get(has, 10)
        return scores

    def _classify(self, score: float) -> Dict[str, float]:
        """Return ``{label: score / 100}`` for the band containing *score*."""
        if score >= 75:
            return {"🔴 HIGH RISK": score / 100}
        elif score >= 50:
            return {"🟡 MODERATE RISK": score / 100}
        elif score >= 25:
            return {"🟢 LOW RISK": score / 100}
        return {"⚪ MINIMAL RISK": score / 100}

    def _detect_anomalies(
        self, features: Dict[str, Any], scores: Dict[str, float]
    ) -> List[Dict[str, str]]:
        """Apply the rule-based consistency checks.

        Returns:
            One dict per triggered rule with ``type``, ``severity`` and
            ``recommendation`` keys, in rule order.
        """
        anomalies: List[Dict[str, str]] = []
        injuries_text = " ".join(features["injuries"]).lower()
        manner_text = " ".join(features["manner"]).lower()
        causes_text = " ".join(features["causes"]).lower()
        tox_text = " ".join(features["toxicology"]).lower()

        # Only meaningful when a manner was stated at all.
        if (
            "defensive" in injuries_text
            and "homicide" not in manner_text
            and manner_text
        ):
            anomalies.append({
                "type": "Defensive wounds without homicide classification",
                "severity": "CRITICAL",
                "recommendation": "Review manner — defensive wounds suggest interpersonal violence",
            })
        if scores["violence_severity"] > 70 and "accident" in manner_text:
            anomalies.append({
                "type": "Violence-manner mismatch",
                "severity": "HIGH",
                "recommendation": "High violence inconsistent with accidental manner",
            })
        if (
            any(drug in tox_text for drug in ["benzodiazepine", "diazepam"])
            and "overdose" not in causes_text
        ):
            anomalies.append({
                "type": "Sedative in non-overdose death",
                "severity": "HIGH",
                "recommendation": "Consider incapacitation prior to injuries",
            })
        if len(features["causes"]) > 2:
            anomalies.append({
                "type": "Multiple causes of death",
                "severity": "MODERATE",
                "recommendation": "Review for staged scene or complex mechanism",
            })
        return anomalies

    def _build_plot(self, factor_scores: Dict[str, float], risk_score: float):
        """Build a two-panel figure: factor radar and horizontal score bars.

        Raises:
            ImportError: If plotly is not installed.
        """
        if go is None or make_subplots is None:
            raise ImportError("plotly is required to build the risk plot")

        fig = make_subplots(
            rows=1,
            cols=2,
            specs=[[{"type": "polar"}, {"type": "xy"}]],
            subplot_titles=("Risk Radar", "Factor Scores"),
        )
        cats = [k.replace("_", " ").title() for k in factor_scores.keys()]
        vals = list(factor_scores.values())
        # Repeat the first point so the radar polygon closes.
        fig.add_trace(
            go.Scatterpolar(
                r=vals + [vals[0]],
                theta=cats + [cats[0]],
                fill="toself",
                fillcolor="rgba(248, 81, 73, 0.3)",
                line=dict(color="#f85149", width=2),
                name="Risk",
            ),
            row=1,
            col=1,
        )
        colors = [
            "#f85149" if v >= 70 else "#ffa657" if v >= 40 else "#56d364"
            for v in vals
        ]
        fig.add_trace(
            go.Bar(
                x=vals,
                y=cats,
                orientation="h",
                marker_color=colors,
                text=[f"{v:.0f}" for v in vals],
                textposition="inside",
                name="Score",
            ),
            row=1,
            col=2,
        )
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="#0d1117",
            plot_bgcolor="#161b22",
            font=dict(color="#e6edf3"),
            height=420,
            showlegend=False,
            polar=dict(
                radialaxis=dict(range=[0, 100], gridcolor="#30363d"),
                bgcolor="#161b22",
            ),
        )
        fig.update_xaxes(gridcolor="#30363d", range=[0, 100], row=1, col=2)
        fig.update_yaxes(gridcolor="#30363d", row=1, col=2)
        return fig

    def _explain(
        self,
        factor_scores: Dict[str, float],
        anomalies: List[Dict[str, str]],
        risk_score: float,
        classification: Dict[str, float],
    ) -> str:
        """Render the assessment as Markdown.

        The report has a headline, a factor table sorted by descending score,
        an anomaly section (only when anomalies exist) and recommendations
        chosen by the score band.
        """
        label = next(iter(classification))
        md = f"## ⚠️ Risk Assessment: **{risk_score:.1f}/100** — {label}\n\n"
        md += "### Factor Breakdown\n| Factor | Score | Level |\n|--------|-------|-------|\n"
        for f, s in sorted(factor_scores.items(), key=lambda x: -x[1]):
            lvl = "🔴 HIGH" if s >= 70 else "🟡 MOD" if s >= 40 else "🟢 LOW"
            md += f"| {f.replace('_', ' ').title()} | {s:.0f} | {lvl} |\n"
        md += "\n"
        if anomalies:
            md += f"### 🚨 Anomalies ({len(anomalies)})\n\n"
            for a in anomalies:
                icon = "🚨" if a["severity"] == "CRITICAL" else "⚠️"
                md += f"{icon} **{a['type']}** [{a['severity']}]\n- 💡 {a['recommendation']}\n\n"
        md += "---\n### Recommendations\n"
        if risk_score >= 75:
            md += "1. 🚨 **PRIORITY CASE** — Assign senior investigative team\n2. Preserve all evidence\n3. Cross-reference with case database\n"
        elif risk_score >= 50:
            md += "1. ⚠️ Thorough investigation warranted\n2. Follow up on anomalies\n3. Consider additional forensic tests\n"
        else:
            md += "1. ✅ Standard procedures recommended\n2. Monitor for new evidence\n"
        md += "\n*Risk scores are advisory — final determination by qualified investigators.*\n"
        return md