Muthukumarank's picture
Add modules/risk_scorer.py
b6ab230 verified
"""
Module 4: Case Risk Scoring & Anomaly Detection
=================================================
Multi-factor weighted risk scoring with anomaly detection.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Any
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class CaseRiskScorer:
"""Multi-dimensional case risk assessment."""
FACTOR_WEIGHTS = {
"violence_severity": 0.25,
"evidence_gaps": 0.15,
"toxicology_risk": 0.10,
"manner_complexity": 0.15,
"digital_patterns": 0.15,
"temporal_consistency": 0.10,
"evidence_quantity": 0.10,
}
VIOLENCE_KEYWORDS = {
"homicide": 95, "gunshot": 95, "stab": 90, "defensive wounds": 90,
"ligature": 85, "strangulation": 90, "blunt force trauma": 85,
"subdural hematoma": 80, "hemorrhage": 75, "fracture": 70,
"contusion": 65, "laceration": 70, "asphyxia": 80, "petechial": 75,
"abrasion": 40, "drowning": 60, "poisoning": 70, "overdose": 50,
"natural": 10, "cardiac": 15,
}
TOX_RISK = {
"fentanyl": 85, "cyanide": 95, "arsenic": 95, "cocaine": 55,
"heroin": 65, "benzodiazepine": 40, "diazepam": 40,
"alcohol": 30, "no substances": 5, "trace levels": 25,
}
def compute_risk(self, state: Dict) -> Dict[str, Any]:
features = self._extract_features(state)
factor_scores = self._compute_factor_scores(features)
risk_score = sum(factor_scores[k] * v for k, v in self.FACTOR_WEIGHTS.items())
risk_score = round(risk_score, 1)
risk_classification = self._classify(risk_score)
anomalies = self._detect_anomalies(features, factor_scores)
anomaly_plot = self._build_plot(factor_scores, risk_score)
explanation_md = self._explain(factor_scores, anomalies, risk_score, risk_classification)
return {
"risk_score": risk_score,
"risk_classification": risk_classification,
"anomaly_plot": anomaly_plot,
"explanation_markdown": explanation_md,
"anomalies": anomalies,
}
def _extract_features(self, state):
features = {"report_text": state.get("report_text", ""), "injuries": [], "toxicology": [],
"causes": [], "manner": [], "digital_evidence": state.get("digital_evidence", []),
"correlations": state.get("correlations", []), "tod_estimate": state.get("tod_estimate", {})}
entities = state.get("entities", [])
for e in entities:
label = e.label if hasattr(e, "label") else e.get("label", "")
text = e.text if hasattr(e, "text") else e.get("text", "")
if label == "INJURY": features["injuries"].append(text)
elif label == "TOXICOLOGY": features["toxicology"].append(text)
elif label == "CAUSE_OF_DEATH": features["causes"].append(text)
elif label == "MANNER_OF_DEATH": features["manner"].append(text)
return features
def _compute_factor_scores(self, features):
scores = {}
# Violence
all_text = " ".join(features["injuries"] + features["causes"]).lower()
max_v = max((s for k, s in self.VIOLENCE_KEYWORDS.items() if k in all_text), default=0)
count = sum(1 for k in self.VIOLENCE_KEYWORDS if k in all_text)
scores["violence_severity"] = min(100, max_v * min(1.3, 1.0 + count * 0.05)) if max_v else 0
# Evidence gaps
corrs = features.get("correlations", [])
high_sig = sum(1 for c in corrs if c.get("significance") == "HIGH")
scores["evidence_gaps"] = min(100, high_sig * 20 + 30) if corrs else 50
# Toxicology
tox_text = " ".join(features["toxicology"]).lower()
scores["toxicology_risk"] = max((s for k, s in self.TOX_RISK.items() if k in tox_text), default=0)
# Manner
manner_text = " ".join(features["manner"] + features["causes"]).lower()
if "homicide" in manner_text: scores["manner_complexity"] = 95
elif "undetermined" in manner_text: scores["manner_complexity"] = 70
elif "suicide" in manner_text: scores["manner_complexity"] = 60
elif "accident" in manner_text: scores["manner_complexity"] = 40
elif "natural" in manner_text: scores["manner_complexity"] = 15
else: scores["manner_complexity"] = 50
# Digital patterns
critical = sum(1 for c in corrs if c.get("significance") == "CRITICAL")
high = sum(1 for c in corrs if c.get("significance") == "HIGH")
scores["digital_patterns"] = min(100, critical * 30 + high * 15 + 10) if corrs else 30
# Temporal
tod = features.get("tod_estimate", {})
agree = tod.get("method_agreement", "N/A")
scores["temporal_consistency"] = {"STRONG": 20, "MODERATE": 50, "WEAK": 80}.get(agree, 40)
# Evidence quantity
has = sum([bool(features["injuries"]), bool(tod), bool(features["digital_evidence"])])
scores["evidence_quantity"] = {3: 70, 2: 50, 1: 30}.get(has, 10)
return scores
def _classify(self, score):
if score >= 75: return {"πŸ”΄ HIGH RISK": score / 100}
elif score >= 50: return {"🟑 MODERATE RISK": score / 100}
elif score >= 25: return {"🟒 LOW RISK": score / 100}
return {"βšͺ MINIMAL RISK": score / 100}
def _detect_anomalies(self, features, scores):
anomalies = []
injuries_text = " ".join(features["injuries"]).lower()
manner_text = " ".join(features["manner"]).lower()
if "defensive" in injuries_text and "homicide" not in manner_text and manner_text:
anomalies.append({"type": "Defensive wounds without homicide classification",
"severity": "CRITICAL", "recommendation": "Review manner β€” defensive wounds suggest interpersonal violence"})
if scores["violence_severity"] > 70 and "accident" in manner_text:
anomalies.append({"type": "Violence-manner mismatch",
"severity": "HIGH", "recommendation": "High violence inconsistent with accidental manner"})
tox_text = " ".join(features["toxicology"]).lower()
if any(d in tox_text for d in ["benzodiazepine", "diazepam"]) and "overdose" not in " ".join(features["causes"]).lower():
anomalies.append({"type": "Sedative in non-overdose death",
"severity": "HIGH", "recommendation": "Consider incapacitation prior to injuries"})
if len(features["causes"]) > 2:
anomalies.append({"type": "Multiple causes of death",
"severity": "MODERATE", "recommendation": "Review for staged scene or complex mechanism"})
return anomalies
def _build_plot(self, factor_scores, risk_score):
fig = make_subplots(rows=1, cols=2,
specs=[[{"type": "polar"}, {"type": "xy"}]],
subplot_titles=("Risk Radar", "Factor Scores"))
cats = [k.replace("_", " ").title() for k in factor_scores.keys()]
vals = list(factor_scores.values())
fig.add_trace(go.Scatterpolar(r=vals + [vals[0]], theta=cats + [cats[0]],
fill="toself", fillcolor="rgba(248, 81, 73, 0.3)",
line=dict(color="#f85149", width=2), name="Risk"), row=1, col=1)
colors = ["#f85149" if v >= 70 else "#ffa657" if v >= 40 else "#56d364" for v in vals]
fig.add_trace(go.Bar(x=vals, y=cats, orientation="h", marker_color=colors,
text=[f"{v:.0f}" for v in vals], textposition="inside", name="Score"), row=1, col=2)
fig.update_layout(template="plotly_dark", paper_bgcolor="#0d1117",
plot_bgcolor="#161b22", font=dict(color="#e6edf3"), height=420, showlegend=False,
polar=dict(radialaxis=dict(range=[0, 100], gridcolor="#30363d"), bgcolor="#161b22"))
fig.update_xaxes(gridcolor="#30363d", range=[0, 100], row=1, col=2)
fig.update_yaxes(gridcolor="#30363d", row=1, col=2)
return fig
def _explain(self, factor_scores, anomalies, risk_score, classification):
label = list(classification.keys())[0]
md = f"## ⚠️ Risk Assessment: **{risk_score:.1f}/100** β€” {label}\n\n"
md += "### Factor Breakdown\n| Factor | Score | Level |\n|--------|-------|-------|\n"
for f, s in sorted(factor_scores.items(), key=lambda x: -x[1]):
lvl = "πŸ”΄ HIGH" if s >= 70 else "🟑 MOD" if s >= 40 else "🟒 LOW"
md += f"| {f.replace('_', ' ').title()} | {s:.0f} | {lvl} |\n"
md += "\n"
if anomalies:
md += f"### 🚨 Anomalies ({len(anomalies)})\n\n"
for a in anomalies:
icon = "🚨" if a["severity"] == "CRITICAL" else "⚠️"
md += f"{icon} **{a['type']}** [{a['severity']}]\n- πŸ’‘ {a['recommendation']}\n\n"
md += "---\n### Recommendations\n"
if risk_score >= 75:
md += "1. 🚨 **PRIORITY CASE** β€” Assign senior investigative team\n2. Preserve all evidence\n3. Cross-reference with case database\n"
elif risk_score >= 50:
md += "1. ⚠️ Thorough investigation warranted\n2. Follow up on anomalies\n3. Consider additional forensic tests\n"
else:
md += "1. βœ… Standard procedures recommended\n2. Monitor for new evidence\n"
md += "\n*Risk scores are advisory β€” final determination by qualified investigators.*\n"
return md