# Muthukumarank's picture
# Add modules/tod_estimator.py
# 97fe0fa verified
"""
Module 2: Time-of-Death Estimation
====================================
Implements Henssge nomogram for PMI estimation + postmortem signs.
"""
import numpy as np
from scipy.optimize import brentq
from typing import Dict, Any
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class TimeOfDeathEstimator:
    """Estimate the post-mortem interval (PMI) from several forensic indicators.

    Combines a Henssge-style rectal-temperature cooling model with interval
    tables for rigor mortis, lividity and decomposition, plus a simple
    humidity/wind correction factor. Times are in hours and temperatures in
    degrees Celsius.
    """

    # Assumed rectal temperature at the moment of death.
    T_BODY_INITIAL = 37.2

    # The Henssge model uses a different curve shape above this ambient
    # temperature.
    WARM_AMBIENT_THRESHOLD = 23.2

    # (amplitude A, fast-term multiplier k) for
    #   Q(t) = A * exp(-B*t) - (A - 1) * exp(-k*B*t)
    _COOLING_SHAPE_STANDARD = (1.25, 5.0)
    _COOLING_SHAPE_WARM = (1.11, 10.0)

    # Root-search window (hours) for the cooling equation.
    _PMI_SEARCH_BRACKET = (0.01, 200)

    # Sentinel "no upper limit" value used in the timing tables below.
    _OPEN_ENDED_HOURS = 999
    # Upper cap applied to the combined signs range.
    _MAX_SIGNS_HOURS = 168

    # Each table maps an observed state to (low_hours, high_hours, description).
    RIGOR_TIMING = {
        "absent": (0, 3, "Rigor not yet developed — suggests <3 hours PMI"),
        "developing": (2, 8, "Rigor developing — suggests 2-8 hours PMI"),
        "full": (8, 24, "Rigor fully developed — suggests 8-24 hours PMI"),
        "resolving": (24, 72, "Rigor resolving — suggests 24-72 hours PMI"),
    }
    LIVIDITY_TIMING = {
        "absent": (0, 1, "No lividity — very early postmortem (<1 hour)"),
        "developing": (0.5, 4, "Lividity developing — suggests 30min-4 hours"),
        "present_movable": (2, 12, "Lividity present but movable — suggests 2-12 hours"),
        "fixed": (8, 999, "Lividity fixed — suggests >8 hours PMI"),
    }
    DECOMP_TIMING = {
        "absent": (0, 24, "No decomposition — suggests <24 hours"),
        "early_discoloration": (24, 72, "Early discoloration — suggests 24-72 hours"),
        "bloating": (48, 168, "Bloating stage — suggests 2-7 days"),
        "advanced": (168, 999, "Advanced decomposition — suggests >1 week"),
    }

    def estimate(self, t_rectal, t_ambient, body_weight, corrective_factor=1.0,
                 rigor="absent", lividity="absent", decomp="absent",
                 humidity=50, wind_speed=0) -> Dict[str, Any]:
        """Run every estimation method and merge the results.

        Args:
            t_rectal: Measured rectal temperature.
            t_ambient: Ambient temperature.
            body_weight: Body weight in kg.
            corrective_factor: Multiplier applied to the body weight before
                the cooling constant is computed.
            rigor: Key of ``RIGOR_TIMING``; unknown keys are ignored.
            lividity: Key of ``LIVIDITY_TIMING``; unknown keys are ignored.
            decomp: Key of ``DECOMP_TIMING``; unknown keys are ignored.
            humidity: Humidity value; 50 is the neutral point of the
                correction.
            wind_speed: Wind speed; 0 is the neutral point of the correction.

        Returns:
            Dict with ``summary`` (combined estimate), ``detail_markdown``
            (report text) and ``methods`` (raw per-method results).
        """
        henssge_result = self._henssge_estimation(t_rectal, t_ambient, body_weight, corrective_factor)
        signs_result = self._postmortem_signs_estimation(rigor, lividity, decomp)
        env_correction = self._environmental_correction(humidity, wind_speed, corrective_factor)
        combined = self._combine_estimates(henssge_result, signs_result, env_correction)
        detail_md = self._generate_detail_markdown(
            henssge_result, signs_result, env_correction, combined,
            t_rectal, t_ambient, body_weight, corrective_factor,
        )
        return {
            "summary": combined,
            "detail_markdown": detail_md,
            "methods": {"henssge": henssge_result, "signs": signs_result, "env": env_correction},
        }

    def _cooling_constant(self, effective_weight):
        """Return the positive cooling rate B (per hour) for a body weight.

        The published Henssge constant is ``-1.2815 * w**-0.625 + 0.0284``;
        this method returns its magnitude so callers can write ``exp(-B*t)``.

        Raises:
            ValueError: If the weight is not positive, or is so large that
                the resulting rate would not describe cooling.
        """
        # "not > 0" also rejects NaN.
        if not effective_weight > 0:
            raise ValueError(
                f"Effective body weight must be positive (got {effective_weight})"
            )
        rate = 1.2815 * (effective_weight ** -0.625) - 0.0284
        if rate <= 0:
            raise ValueError(
                f"Effective body weight {effective_weight} kg is outside the model's valid range"
            )
        return rate

    def _cooling_ratio(self, rate, hours, t_ambient):
        """Evaluate the normalised temperature ratio Q at the given time(s).

        ``hours`` may be a scalar or a NumPy array. The curve shape depends
        on whether ``t_ambient`` exceeds ``WARM_AMBIENT_THRESHOLD``.
        """
        if t_ambient > self.WARM_AMBIENT_THRESHOLD:
            amplitude, fast_multiplier = self._COOLING_SHAPE_WARM
        else:
            amplitude, fast_multiplier = self._COOLING_SHAPE_STANDARD
        slow_term = amplitude * np.exp(-rate * hours)
        fast_term = (amplitude - 1.0) * np.exp(-fast_multiplier * rate * hours)
        return slow_term - fast_term

    def _henssge_estimation(self, t_rectal, t_ambient, body_weight, corrective):
        """Solve the Henssge double-exponential cooling model for the PMI.

        Returns:
            On success, a dict with ``pmi_hours``, ``lower_95ci``,
            ``upper_95ci``, ``std_error_hours``, ``Q_ratio``, ``B_constant``,
            ``method`` and ``reliability``. On failure, a dict with an
            ``error`` message and ``pmi_hours`` set to ``None``.
        """
        if abs(self.T_BODY_INITIAL - t_ambient) < 0.1:
            return {"error": "Ambient temp too close to body initial temp", "pmi_hours": None}

        # Fraction of the initial body/ambient temperature gap still present.
        Q = (t_rectal - t_ambient) / (self.T_BODY_INITIAL - t_ambient)
        if Q <= 0 or Q >= 1.0:
            return {"error": f"Temperature ratio Q={Q:.3f} out of valid range", "pmi_hours": None}

        try:
            B = self._cooling_constant(corrective * body_weight)
        except ValueError as e:
            return {"error": str(e), "pmi_hours": None, "Q_ratio": round(Q, 4)}

        def cooling_equation(t):
            return self._cooling_ratio(B, t, t_ambient) - Q

        search_low, search_high = self._PMI_SEARCH_BRACKET
        try:
            # brentq raises ValueError when the root is outside the bracket.
            pmi = brentq(cooling_equation, search_low, search_high, xtol=0.01)
        except ValueError as e:
            return {"error": str(e), "pmi_hours": None, "Q_ratio": round(Q, 4)}

        std_error = 2.8 if 50 <= body_weight <= 100 else 3.2
        if corrective < 0.7:
            std_error *= 1.5
        return {
            "pmi_hours": round(pmi, 2),
            "lower_95ci": round(max(0, pmi - std_error), 1),
            "upper_95ci": round(pmi + std_error, 1),
            "std_error_hours": std_error,
            "Q_ratio": round(Q, 4),
            "B_constant": round(B, 5),
            "method": "Henssge Nomogram (1988)",
            "reliability": "HIGH" if 0.2 < Q < 0.8 else "MODERATE",
        }

    def _postmortem_signs_estimation(self, rigor, lividity, decomp):
        """Estimate a PMI range from rigor, lividity and decomposition states.

        The range is the intersection of the individual windows. If the
        windows do not overlap, their union is used instead and the result
        is flagged ``INCONSISTENT``. The upper end is capped at
        ``_MAX_SIGNS_HOURS``.
        """
        observations = (
            ("Rigor Mortis", rigor, self.RIGOR_TIMING),
            ("Lividity", lividity, self.LIVIDITY_TIMING),
            ("Decomposition", decomp, self.DECOMP_TIMING),
        )
        estimates = []
        for sign_name, state, table in observations:
            if state in table:
                low, high, desc = table[state]
                estimates.append({"sign": sign_name, "low": low, "high": high,
                                  "description": desc, "state": state})
        if not estimates:
            return {"pmi_range_low": None, "pmi_range_high": None, "signs": []}

        lows = [e["low"] for e in estimates]
        highs = [e["high"] for e in estimates]
        overall_low, overall_high = max(lows), min(highs)
        consistency = "CONSISTENT"
        if overall_low > overall_high:
            # No common window: fall back to the widest span and flag it.
            overall_low, overall_high = min(lows), max(highs)
            consistency = "INCONSISTENT"
        return {
            "pmi_range_low": overall_low,
            "pmi_range_high": min(overall_high, self._MAX_SIGNS_HOURS),
            "consistency": consistency,
            "signs": estimates,
        }

    def _environmental_correction(self, humidity, wind_speed, corrective):
        """Derive a multiplicative PMI correction from humidity and wind.

        ``corrective`` is accepted for signature compatibility but is not
        used in the calculation.
        """
        humidity_factor = 1.0 + (humidity - 50) * 0.002
        # The wind contribution is capped at a 30% reduction.
        wind_factor = 1.0 - min(wind_speed * 0.01, 0.3)
        combined_factor = humidity_factor * wind_factor
        return {
            "humidity_effect": round(humidity_factor, 3),
            "wind_effect": round(wind_factor, 3),
            "combined_correction": round(combined_factor, 3),
            "note": f"Environmental factors {'accelerate' if combined_factor < 1 else 'decelerate'} cooling by {abs(1-combined_factor)*100:.1f}%"
        }

    def _combine_estimates(self, henssge, signs, env):
        """Merge the temperature and signs estimates into one summary dict.

        The environmental factor scales the Henssge PMI, and the reported
        bounds are centred on that corrected value so the interval always
        contains the headline estimate. Agreement and confidence are graded
        from the spread between the corrected Henssge PMI and the midpoint
        of the signs range.
        """
        combined = {"method_agreement": "N/A", "confidence_level": "LOW"}
        estimates = []

        if henssge.get("pmi_hours") is not None:
            pmi = henssge["pmi_hours"]
            env_corrected = pmi * env.get("combined_correction", 1.0)
            combined["estimated_pmi_hours"] = round(env_corrected, 1)
            combined["henssge_pmi"] = round(pmi, 1)
            half_width = henssge.get("std_error_hours")
            if half_width is not None:
                combined["lower_bound"] = round(max(0, env_corrected - half_width), 1)
                combined["upper_bound"] = round(env_corrected + half_width, 1)
            else:
                # No error margin supplied: fall back to a +/-30% band.
                combined["lower_bound"] = round(env_corrected * 0.7, 1)
                combined["upper_bound"] = round(env_corrected * 1.3, 1)
            estimates.append(env_corrected)

        signs_mid = None
        if signs.get("pmi_range_low") is not None:
            signs_mid = (signs["pmi_range_low"] + signs["pmi_range_high"]) / 2
            estimates.append(signs_mid)
            combined["signs_pmi_range"] = f"{signs['pmi_range_low']}-{signs['pmi_range_high']}h"

        if len(estimates) >= 2:
            spread = max(estimates) - min(estimates)
            mean_est = np.mean(estimates)
            if mean_est > 0 and spread < mean_est * 0.3:
                combined["method_agreement"] = "STRONG"
                combined["confidence_level"] = "HIGH"
            elif mean_est > 0 and spread < mean_est * 0.6:
                combined["method_agreement"] = "MODERATE"
                combined["confidence_level"] = "MODERATE"
            else:
                combined["method_agreement"] = "WEAK"
                combined["confidence_level"] = "LOW"
        elif len(estimates) == 1:
            combined["confidence_level"] = "MODERATE"
            combined["method_agreement"] = "SINGLE_METHOD"

        # Use the signs midpoint only when no temperature estimate exists;
        # a key test keeps a legitimate 0.0 estimate from being overwritten.
        if "estimated_pmi_hours" not in combined and signs_mid is not None:
            combined["estimated_pmi_hours"] = round(signs_mid, 1)
        return combined

    def _generate_detail_markdown(self, henssge, signs, env, combined, t_rectal, t_ambient, body_weight, corrective):
        """Render the per-method results and combined summary as Markdown."""
        parts = ["## ⏱️ Time-of-Death Analysis Report\n\n"]

        # "is not None" so that a value of 0.0 is still reported.
        if combined.get("estimated_pmi_hours") is not None:
            parts.append(f"### 🎯 Estimated PMI: **{combined['estimated_pmi_hours']} hours**\n")
            if combined.get("lower_bound") is not None:
                parts.append(f"**95% CI:** {combined['lower_bound']}-{combined['upper_bound']} hours\n")
            parts.append(f"**Agreement:** {combined['method_agreement']} | **Confidence:** {combined['confidence_level']}\n\n")

        parts.append("---\n### 🌡️ Henssge Nomogram\n\n")
        parts.append("| Parameter | Value |\n|-----------|-------|\n")
        parts.append(f"| Rectal Temp | {t_rectal}°C |\n| Ambient Temp | {t_ambient}°C |\n")
        parts.append(f"| Body Weight | {body_weight} kg |\n| Corrective | {corrective} |\n")
        if henssge.get("pmi_hours") is not None:
            parts.append(f"| **PMI** | **{henssge['pmi_hours']} hours** |\n")
            parts.append(f"| Reliability | {henssge.get('reliability', 'N/A')} |\n")
        elif henssge.get("error"):
            parts.append(f"\n⚠️ {henssge['error']}\n")

        parts.append("\n---\n### 🔬 Postmortem Signs\n\n")
        if signs.get("signs"):
            parts.append("| Sign | State | Range |\n|------|-------|-------|\n")
            for s in signs["signs"]:
                if s["high"] >= self._OPEN_ENDED_HOURS:
                    # Open-ended window: show it as "greater than low bound".
                    window = f">{s['low']}h"
                else:
                    window = f"{s['low']}-{s['high']}h"
                parts.append(f"| {s['sign']} | {s['state']} | {window} |\n")
            parts.append(f"\n**Consistency:** {signs.get('consistency', 'N/A')}\n")

        parts.append(f"\n---\n### 🌤️ Environmental Correction: {env.get('combined_correction', 'N/A')}×\n")
        parts.append(f"{env.get('note', '')}\n")
        parts.append("\n---\n*All estimates are approximations requiring expert corroboration.*\n")
        return "".join(parts)

    def plot_cooling_curve(self, t_rectal, t_ambient, body_weight, corrective):
        """Build a Plotly figure of the modelled cooling curve over 48 hours.

        The measured temperature is marked at the estimated PMI, with a
        shaded uncertainty band, whenever ``_henssge_estimation`` yields a
        value; otherwise only the curve and reference lines are drawn.

        Raises:
            ValueError: If ``corrective * body_weight`` is not a usable
                weight for the cooling model.
        """
        rate = self._cooling_constant(corrective * body_weight)
        time_range = np.linspace(0, 48, 200)
        temp_curve = t_ambient + (self.T_BODY_INITIAL - t_ambient) * self._cooling_ratio(
            rate, time_range, t_ambient
        )

        # Reuse the estimator so the marker and band match the reported
        # values; it returns pmi_hours=None instead of raising on bad input.
        estimate = self._henssge_estimation(t_rectal, t_ambient, body_weight, corrective)
        estimated_pmi = estimate.get("pmi_hours")

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=time_range, y=temp_curve, mode='lines',
            name='Cooling Curve', line=dict(color='#79c0ff', width=3),
            hovertemplate='PMI: %{x:.1f}h<br>Temp: %{y:.1f}°C<extra></extra>'
        ))
        fig.add_hline(y=t_ambient, line_dash="dash", line_color="#56d364",
                      annotation_text=f"Ambient: {t_ambient}°C")
        fig.add_hline(y=self.T_BODY_INITIAL, line_dash="dot", line_color="#ffa657",
                      annotation_text=f"Initial: {self.T_BODY_INITIAL}°C")

        if estimated_pmi is not None:
            half_width = estimate["std_error_hours"]
            fig.add_trace(go.Scatter(
                x=[estimated_pmi], y=[t_rectal], mode='markers+text',
                name=f'Measured ({t_rectal}°C)',
                marker=dict(color='#f85149', size=15, symbol='x'),
                text=[f"PMI ≈ {estimated_pmi:.1f}h"], textposition="top center",
                textfont=dict(color='#f85149', size=12)
            ))
            fig.add_vrect(x0=max(0, estimated_pmi - half_width), x1=estimated_pmi + half_width,
                          fillcolor="rgba(248, 81, 73, 0.1)", layer="below", line_width=0)

        fig.update_layout(
            title="Body Cooling Curve (Henssge Model)",
            xaxis_title="Post-Mortem Interval (hours)",
            yaxis_title="Body Temperature (°C)",
            template="plotly_dark", paper_bgcolor="#0d1117", plot_bgcolor="#161b22",
            font=dict(color="#e6edf3"), height=450, showlegend=True,
        )
        fig.update_xaxes(gridcolor="#30363d", range=[0, 48])
        fig.update_yaxes(gridcolor="#30363d", range=[t_ambient - 2, 38])
        return fig