| """ |
| Module 4: Case Risk Scoring & Anomaly Detection |
| ================================================= |
| Multi-factor weighted risk scoring with anomaly detection. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| from typing import Dict, List, Any |
| import plotly.graph_objects as go |
| from plotly.subplots import make_subplots |
|
|
|
|
| class CaseRiskScorer: |
| """Multi-dimensional case risk assessment.""" |
|
|
| FACTOR_WEIGHTS = { |
| "violence_severity": 0.25, |
| "evidence_gaps": 0.15, |
| "toxicology_risk": 0.10, |
| "manner_complexity": 0.15, |
| "digital_patterns": 0.15, |
| "temporal_consistency": 0.10, |
| "evidence_quantity": 0.10, |
| } |
|
|
| VIOLENCE_KEYWORDS = { |
| "homicide": 95, "gunshot": 95, "stab": 90, "defensive wounds": 90, |
| "ligature": 85, "strangulation": 90, "blunt force trauma": 85, |
| "subdural hematoma": 80, "hemorrhage": 75, "fracture": 70, |
| "contusion": 65, "laceration": 70, "asphyxia": 80, "petechial": 75, |
| "abrasion": 40, "drowning": 60, "poisoning": 70, "overdose": 50, |
| "natural": 10, "cardiac": 15, |
| } |
|
|
| TOX_RISK = { |
| "fentanyl": 85, "cyanide": 95, "arsenic": 95, "cocaine": 55, |
| "heroin": 65, "benzodiazepine": 40, "diazepam": 40, |
| "alcohol": 30, "no substances": 5, "trace levels": 25, |
| } |
|
|
| def compute_risk(self, state: Dict) -> Dict[str, Any]: |
| features = self._extract_features(state) |
| factor_scores = self._compute_factor_scores(features) |
| risk_score = sum(factor_scores[k] * v for k, v in self.FACTOR_WEIGHTS.items()) |
| risk_score = round(risk_score, 1) |
| risk_classification = self._classify(risk_score) |
| anomalies = self._detect_anomalies(features, factor_scores) |
| anomaly_plot = self._build_plot(factor_scores, risk_score) |
| explanation_md = self._explain(factor_scores, anomalies, risk_score, risk_classification) |
|
|
| return { |
| "risk_score": risk_score, |
| "risk_classification": risk_classification, |
| "anomaly_plot": anomaly_plot, |
| "explanation_markdown": explanation_md, |
| "anomalies": anomalies, |
| } |
|
|
| def _extract_features(self, state): |
| features = {"report_text": state.get("report_text", ""), "injuries": [], "toxicology": [], |
| "causes": [], "manner": [], "digital_evidence": state.get("digital_evidence", []), |
| "correlations": state.get("correlations", []), "tod_estimate": state.get("tod_estimate", {})} |
|
|
| entities = state.get("entities", []) |
| for e in entities: |
| label = e.label if hasattr(e, "label") else e.get("label", "") |
| text = e.text if hasattr(e, "text") else e.get("text", "") |
| if label == "INJURY": features["injuries"].append(text) |
| elif label == "TOXICOLOGY": features["toxicology"].append(text) |
| elif label == "CAUSE_OF_DEATH": features["causes"].append(text) |
| elif label == "MANNER_OF_DEATH": features["manner"].append(text) |
| return features |
|
|
| def _compute_factor_scores(self, features): |
| scores = {} |
| |
| all_text = " ".join(features["injuries"] + features["causes"]).lower() |
| max_v = max((s for k, s in self.VIOLENCE_KEYWORDS.items() if k in all_text), default=0) |
| count = sum(1 for k in self.VIOLENCE_KEYWORDS if k in all_text) |
| scores["violence_severity"] = min(100, max_v * min(1.3, 1.0 + count * 0.05)) if max_v else 0 |
|
|
| |
| corrs = features.get("correlations", []) |
| high_sig = sum(1 for c in corrs if c.get("significance") == "HIGH") |
| scores["evidence_gaps"] = min(100, high_sig * 20 + 30) if corrs else 50 |
|
|
| |
| tox_text = " ".join(features["toxicology"]).lower() |
| scores["toxicology_risk"] = max((s for k, s in self.TOX_RISK.items() if k in tox_text), default=0) |
|
|
| |
| manner_text = " ".join(features["manner"] + features["causes"]).lower() |
| if "homicide" in manner_text: scores["manner_complexity"] = 95 |
| elif "undetermined" in manner_text: scores["manner_complexity"] = 70 |
| elif "suicide" in manner_text: scores["manner_complexity"] = 60 |
| elif "accident" in manner_text: scores["manner_complexity"] = 40 |
| elif "natural" in manner_text: scores["manner_complexity"] = 15 |
| else: scores["manner_complexity"] = 50 |
|
|
| |
| critical = sum(1 for c in corrs if c.get("significance") == "CRITICAL") |
| high = sum(1 for c in corrs if c.get("significance") == "HIGH") |
| scores["digital_patterns"] = min(100, critical * 30 + high * 15 + 10) if corrs else 30 |
|
|
| |
| tod = features.get("tod_estimate", {}) |
| agree = tod.get("method_agreement", "N/A") |
| scores["temporal_consistency"] = {"STRONG": 20, "MODERATE": 50, "WEAK": 80}.get(agree, 40) |
|
|
| |
| has = sum([bool(features["injuries"]), bool(tod), bool(features["digital_evidence"])]) |
| scores["evidence_quantity"] = {3: 70, 2: 50, 1: 30}.get(has, 10) |
|
|
| return scores |
|
|
| def _classify(self, score): |
| if score >= 75: return {"π΄ HIGH RISK": score / 100} |
| elif score >= 50: return {"π‘ MODERATE RISK": score / 100} |
| elif score >= 25: return {"π’ LOW RISK": score / 100} |
| return {"βͺ MINIMAL RISK": score / 100} |
|
|
| def _detect_anomalies(self, features, scores): |
| anomalies = [] |
| injuries_text = " ".join(features["injuries"]).lower() |
| manner_text = " ".join(features["manner"]).lower() |
|
|
| if "defensive" in injuries_text and "homicide" not in manner_text and manner_text: |
| anomalies.append({"type": "Defensive wounds without homicide classification", |
| "severity": "CRITICAL", "recommendation": "Review manner β defensive wounds suggest interpersonal violence"}) |
|
|
| if scores["violence_severity"] > 70 and "accident" in manner_text: |
| anomalies.append({"type": "Violence-manner mismatch", |
| "severity": "HIGH", "recommendation": "High violence inconsistent with accidental manner"}) |
|
|
| tox_text = " ".join(features["toxicology"]).lower() |
| if any(d in tox_text for d in ["benzodiazepine", "diazepam"]) and "overdose" not in " ".join(features["causes"]).lower(): |
| anomalies.append({"type": "Sedative in non-overdose death", |
| "severity": "HIGH", "recommendation": "Consider incapacitation prior to injuries"}) |
|
|
| if len(features["causes"]) > 2: |
| anomalies.append({"type": "Multiple causes of death", |
| "severity": "MODERATE", "recommendation": "Review for staged scene or complex mechanism"}) |
| return anomalies |
|
|
| def _build_plot(self, factor_scores, risk_score): |
| fig = make_subplots(rows=1, cols=2, |
| specs=[[{"type": "polar"}, {"type": "xy"}]], |
| subplot_titles=("Risk Radar", "Factor Scores")) |
|
|
| cats = [k.replace("_", " ").title() for k in factor_scores.keys()] |
| vals = list(factor_scores.values()) |
|
|
| fig.add_trace(go.Scatterpolar(r=vals + [vals[0]], theta=cats + [cats[0]], |
| fill="toself", fillcolor="rgba(248, 81, 73, 0.3)", |
| line=dict(color="#f85149", width=2), name="Risk"), row=1, col=1) |
|
|
| colors = ["#f85149" if v >= 70 else "#ffa657" if v >= 40 else "#56d364" for v in vals] |
| fig.add_trace(go.Bar(x=vals, y=cats, orientation="h", marker_color=colors, |
| text=[f"{v:.0f}" for v in vals], textposition="inside", name="Score"), row=1, col=2) |
|
|
| fig.update_layout(template="plotly_dark", paper_bgcolor="#0d1117", |
| plot_bgcolor="#161b22", font=dict(color="#e6edf3"), height=420, showlegend=False, |
| polar=dict(radialaxis=dict(range=[0, 100], gridcolor="#30363d"), bgcolor="#161b22")) |
| fig.update_xaxes(gridcolor="#30363d", range=[0, 100], row=1, col=2) |
| fig.update_yaxes(gridcolor="#30363d", row=1, col=2) |
| return fig |
|
|
| def _explain(self, factor_scores, anomalies, risk_score, classification): |
| label = list(classification.keys())[0] |
| md = f"## β οΈ Risk Assessment: **{risk_score:.1f}/100** β {label}\n\n" |
| md += "### Factor Breakdown\n| Factor | Score | Level |\n|--------|-------|-------|\n" |
| for f, s in sorted(factor_scores.items(), key=lambda x: -x[1]): |
| lvl = "π΄ HIGH" if s >= 70 else "π‘ MOD" if s >= 40 else "π’ LOW" |
| md += f"| {f.replace('_', ' ').title()} | {s:.0f} | {lvl} |\n" |
| md += "\n" |
|
|
| if anomalies: |
| md += f"### π¨ Anomalies ({len(anomalies)})\n\n" |
| for a in anomalies: |
| icon = "π¨" if a["severity"] == "CRITICAL" else "β οΈ" |
| md += f"{icon} **{a['type']}** [{a['severity']}]\n- π‘ {a['recommendation']}\n\n" |
|
|
| md += "---\n### Recommendations\n" |
| if risk_score >= 75: |
| md += "1. π¨ **PRIORITY CASE** β Assign senior investigative team\n2. Preserve all evidence\n3. Cross-reference with case database\n" |
| elif risk_score >= 50: |
| md += "1. β οΈ Thorough investigation warranted\n2. Follow up on anomalies\n3. Consider additional forensic tests\n" |
| else: |
| md += "1. β
Standard procedures recommended\n2. Monitor for new evidence\n" |
| md += "\n*Risk scores are advisory β final determination by qualified investigators.*\n" |
| return md |
|
|