# forensic-triage-intelligence/modules/autopsy_analyzer.py
# Commit c0e4f03 (verified) - "Add modules/autopsy_analyzer.py" - Muthukumarank
"""
Module 1: AI-Based Autopsy Report Analysis
==========================================
Extracts forensic entities from unstructured autopsy reports using
pattern-based NLP with forensic-specific rules.
"""
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class ForensicEntity:
    """A single entity extracted from an autopsy report.

    Attributes:
        text: The matched entity text with surrounding whitespace removed.
        label: Category key from ``AutopsyAnalyzer.patterns``
            (e.g. ``"CAUSE_OF_DEATH"``).
        start: Character offset of ``text`` in the analyzed report.
        end: Exclusive end offset, so ``report[start:end] == text``.
        confidence: Heuristic score assigned by the extractor (0.75 to 0.95).
        context: Up to 30 characters of report text on each side of the
            entity, stripped of leading/trailing whitespace.
    """

    text: str
    label: str
    start: int
    end: int
    confidence: float
    context: str = ""


class AutopsyAnalyzer:
    """NLP-based forensic entity extractor for autopsy reports."""

    def __init__(self):
        self._compile_patterns()

    def _compile_patterns(self):
        """Define the regex patterns used for forensic entity extraction.

        Populates ``self.patterns``: a mapping of entity label to a list of
        regex source strings. The strings are not pre-compiled here; they are
        handed to ``re.finditer`` at extraction time. Every pattern has one
        capturing group that delimits the entity text. Label order and pattern
        order matter, because earlier matches win when spans overlap.
        """
        self.patterns = {
            "CAUSE_OF_DEATH": [
                r"(?i)cause\s+of\s+death[:\s]*([^\n.]{5,120})",
                r"(?i)died\s+(?:of|from|due\s+to)\s+([^\n.]+)",
                r"(?i)fatal\s+([^\n.,]+(?:injury|trauma|hemorrhage|asphyxia|poisoning|failure))",
                r"(?i)(?:primary|immediate|underlying)\s+cause[:\s]*([^\n.]+)",
                r"(?i)(asphyxia\s+due\s+to\s+[^\n.,]+)",
                r"(?i)(blunt force (?:head )?trauma[^\n.,]{0,50})",
            ],
            "MANNER_OF_DEATH": [
                r"(?i)manner\s+of\s+death[:\s]*(homicide|suicide|accident(?:al)?|natural|undetermined|pending)",
            ],
            "INJURY": [
                r"(?i)(blunt\s+force\s+trauma[^\n.,]{0,80})",
                r"(?i)(sharp\s+force\s+(?:injury|trauma|wound)[^\n.,]{0,60})",
                r"(?i)(gunshot\s+wound[^\n.,]{0,60})",
                r"(?i)(stab\s+wound[^\n.,]{0,60})",
                r"(?i)(laceration[s]?[^\n.,]{0,60})",
                r"(?i)(contusion[s]?\s+(?:on|of|to)\s+[^\n.,]{0,60})",
                r"(?i)(abrasion[s]?[^\n.,]{0,60})",
                r"(?i)(fracture[s]?[^\n.,]{0,60})",
                r"(?i)(subdural\s+hematoma[^\n.,]{0,60})",
                r"(?i)(defensive\s+wounds?[^\n.,]{0,80})",
                r"(?i)(petechial\s+hemorrhages?[^\n.,]{0,60})",
                r"(?i)(ligature\s+mark[^\n.,]{0,80})",
            ],
            "TOXICOLOGY": [
                r"(?i)(blood\s+alcohol[:\s]*\d+\.\d+\s*g/dL[^\n.,]*)",
                r"(?i)(benzodiazepines?[:\s]*[^\n.,]{0,60})",
                r"(?i)((?:cocaine|heroin|methamphetamine|fentanyl|morphine|diazepam)[^\n.,]{0,60})",
                r"(?i)(no\s+illicit\s+substances?\s+detected)",
                r"(?i)(trace\s+levels?\s*[-–]?\s*[^\n.,]{0,40})",
            ],
            "TIME_INDICATOR": [
                r"(?i)((?:approximately|about|estimated)\s+\d+[-–]\d+\s+hours?\s+(?:prior|before|after)[^\n.,]*)",
                r"(?i)((?:March|April|May|June|July|August|September|October|November|December|January|February)\s+\d{1,2},?\s*\d{4}[,\s]*\d{1,2}:\d{2}\s*(?:AM|PM)?)",
                r"(?i)(rigor\s+mortis\s+is\s+fully\s+developed[^\n.,]{0,40})",
                r"(?i)(lividity\s+is\s+fixed[^\n.,]{0,40})",
                r"(?i)(time\s+of\s+death\s+estimated[^\n.,]{0,80})",
            ],
            "ANATOMICAL": [
                r"(?i)((?:right|left)\s+temporal\s+region[^\n.,]{0,30})",
                r"(?i)((?:right|left)\s+(?:forearm|shoulder|arm|leg)[^\n.,]{0,30})",
                r"(?i)(conjunctivae\s+bilaterally)",
                r"(?i)((?:right|left)\s+hemisphere[^\n.,]{0,30})",
                r"(?i)(hyoid\s+bone)",
            ],
            "MEDICAL_FINDING": [
                r"(?i)(cerebral\s+edema[^\n.,]*)",
                r"(?i)(lungs?\s+show\s+[^\n.,]+)",
                r"(?i)((?:heart|liver|brain)[:\s]*\d+\s*g[^\n.,]{0,30})",
                r"(?i)(stomach\s+contents[:\s]*[^\n.,]+)",
                r"(?i)(mild\s+fatty\s+changes)",
                r"(?i)(hyoid\s+bone\s+intact)",
            ],
            "DEMOGRAPHIC": [
                r"(?i)((?:well|poorly)[-\s]nourished\s+adult\s+(?:male|female)[^\n.,]{0,40})",
                r"(?i)(approximately\s+\d+[-–]\d+\s+years)",
                r"(?i)(weighing\s+approximately\s+\d+\s*kg)",
                r"(?i)(approximately\s+\d['\u2019]\d+[\"″\u201d]?\s*tall)",
            ],
            "LOCATION": [
                r"(?i)(abandoned\s+warehouse[^\n.,]{0,40})",
                r"(?i)(industrial\s+district[^\n.,]{0,30})",
                r"(?i)(block\s+\d+)",
            ],
            "EVIDENCE": [
                r"(?i)(skin\s+under\s+fingernails\s+collected\s+for\s+DNA\s+analysis)",
                r"(?i)(foreign\s+fibers?\s+recovered[^\n.,]{0,60})",
                r"(?i)(fingerprints?\s+(?:collected|recovered|found|lifted)[^\n.,]{0,40})",
                r"(?i)(synthetic,?\s*(?:blue|red|black|white)[^\n.,]{0,40})",
            ],
        }

    def extract_text_from_file(self, filepath: str) -> str:
        """Extract text from an uploaded file.

        Args:
            filepath: Path to a ``.pdf`` or ``.txt`` file, or ``None``. The
                extension is matched case-insensitively.

        Returns:
            The extracted text; ``""`` when ``filepath`` is ``None``; the
            literal ``"[Unsupported file format]"`` for any other extension.
        """
        if filepath is None:
            return ""

        # Compare on a lower-cased copy so "REPORT.PDF" / "notes.TXT" are accepted.
        lowered = filepath.lower()

        if lowered.endswith(".pdf"):
            try:
                import pdfplumber

                with pdfplumber.open(filepath) as pdf:
                    return "\n".join(page.extract_text() or "" for page in pdf.pages)
            except ImportError:
                # Best-effort fallback when pdfplumber is not installed:
                # decode the raw bytes and keep only printable ASCII and
                # basic whitespace.
                with open(filepath, "rb") as f:
                    content = f.read()
                text = content.decode("utf-8", errors="ignore")
                text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text)
                return text
        elif lowered.endswith(".txt"):
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                return f.read()

        return "[Unsupported file format]"

    def analyze(self, text: Optional[str]) -> Dict[str, Any]:
        """Perform full NLP analysis on autopsy report text.

        Args:
            text: Report text. ``None`` is treated as an empty report.

        Returns:
            A dict with keys ``entities``, ``highlighted_text``,
            ``entities_table``, ``summary``, ``summary_markdown`` and
            ``time_events``.
        """
        if text is None:
            text = ""

        entities = self._extract_entities(text)
        summary = self._generate_summary(entities, text)
        return {
            "entities": entities,
            "highlighted_text": self._build_highlighted_text(text, entities),
            "entities_table": self._build_entities_table(entities),
            "summary": summary,
            "summary_markdown": self._format_summary_markdown(summary),
            "time_events": self._extract_time_events(entities),
        }

    def _extract_entities(self, text: str) -> List[ForensicEntity]:
        """Extract all forensic entities from text.

        Patterns are tried in ``self.patterns`` order. A match is dropped if
        its stripped text is shorter than 3 characters or if its span overlaps
        a span that was already accepted, so earlier labels/patterns take
        precedence.

        Returns:
            Non-overlapping entities sorted by start offset.
        """
        entities: List[ForensicEntity] = []
        claimed_spans: List[Tuple[int, int]] = []

        for label, patterns in self.patterns.items():
            for pattern in patterns:
                for match in re.finditer(pattern, text):
                    # Prefer the capturing group; fall back to the whole match.
                    group = 1 if match.lastindex else 0
                    raw = match.group(group)
                    entity_text = raw.strip()
                    if len(entity_text) < 3:
                        continue

                    # Tighten the span to the stripped text so that
                    # text[start:end] == entity_text.
                    start = match.start(group) + (len(raw) - len(raw.lstrip()))
                    end = start + len(entity_text)

                    if any(
                        start < taken_end and end > taken_start
                        for taken_start, taken_end in claimed_spans
                    ):
                        continue
                    claimed_spans.append((start, end))

                    # Heuristic: longer matches score slightly higher, capped at 0.95.
                    confidence = min(0.95, 0.75 + len(entity_text) * 0.002)
                    ctx_start = max(0, start - 30)
                    ctx_end = min(len(text), end + 30)
                    entities.append(ForensicEntity(
                        text=entity_text,
                        label=label,
                        start=start,
                        end=end,
                        confidence=confidence,
                        context=text[ctx_start:ctx_end].strip(),
                    ))

        entities.sort(key=lambda e: e.start)
        return entities

    def _build_highlighted_text(
        self, text: str, entities: List[ForensicEntity]
    ) -> List[Tuple[str, Optional[str]]]:
        """Build highlighted text for Gradio.

        Args:
            text: The analyzed report.
            entities: Non-overlapping entities sorted by ``start``.

        Returns:
            ``(segment, label)`` tuples covering ``text`` in order; segments
            between entities carry a ``None`` label.
        """
        if not entities:
            return [(text, None)]

        segments: List[Tuple[str, Optional[str]]] = []
        cursor = 0
        for entity in entities:
            if entity.start > cursor:
                segments.append((text[cursor:entity.start], None))
            segments.append((text[entity.start:entity.end], entity.label))
            cursor = entity.end
        if cursor < len(text):
            segments.append((text[cursor:], None))
        return segments

    def _build_entities_table(self, entities: List[ForensicEntity]) -> List[Dict]:
        """Build structured entities table.

        Returns:
            One row per entity with the text truncated to 100 characters, the
            confidence rounded to 3 decimals and the context truncated to 80
            characters.
        """
        return [
            {
                "Entity": e.text[:100],
                "Category": e.label,
                "Confidence": round(e.confidence, 3),
                "Context": e.context[:80],
            }
            for e in entities
        ]

    def _generate_summary(self, entities: List[ForensicEntity], text: str) -> Dict:
        """Generate analysis summary.

        Args:
            entities: Extracted entities.
            text: The analyzed report (currently unused; kept for interface
                compatibility).

        Returns:
            A dict with the total entity count, a per-label count under
            ``categories``, list buckets for selected labels, and
            ``manner_of_death`` (the last matching entity's text, or ``None``).
        """
        summary = {
            "total_entities": len(entities),
            "categories": {},
            "cause_of_death": [],
            "manner_of_death": None,
            "injuries": [],
            "toxicology_findings": [],
            "key_time_indicators": [],
            "evidence_collected": [],
        }
        # Labels whose entity text is collected into a list bucket.
        list_buckets = {
            "CAUSE_OF_DEATH": "cause_of_death",
            "INJURY": "injuries",
            "TOXICOLOGY": "toxicology_findings",
            "TIME_INDICATOR": "key_time_indicators",
            "EVIDENCE": "evidence_collected",
        }

        categories = summary["categories"]
        for entity in entities:
            categories[entity.label] = categories.get(entity.label, 0) + 1
            if entity.label == "MANNER_OF_DEATH":
                summary["manner_of_death"] = entity.text
            elif entity.label in list_buckets:
                summary[list_buckets[entity.label]].append(entity.text)
        return summary

    def _format_summary_markdown(self, summary: Dict) -> str:
        """Format summary as markdown.

        Args:
            summary: A dict shaped like the return value of
                ``_generate_summary``.

        Returns:
            Markdown with a category-count table (highest count first) and a
            key-findings section; at most the first 10 injuries are listed.
        """

        def bullets(items):
            return [f"- {item}\n" for item in items]

        parts = [
            "## 📋 Autopsy Analysis Summary\n\n",
            f"**Total Entities Extracted:** {summary['total_entities']}\n\n",
            "### Entity Categories\n",
            "| Category | Count |\n|----------|-------|\n",
        ]
        for cat, count in sorted(summary["categories"].items(), key=lambda x: -x[1]):
            parts.append(f"| {cat} | {count} |\n")
        parts.append("\n")

        parts.append("### 🔑 Key Findings\n\n")
        if summary["manner_of_death"]:
            parts.append(f"**Manner of Death:** `{summary['manner_of_death']}`\n\n")
        if summary["cause_of_death"]:
            parts.append("**Cause(s) of Death:**\n")
            parts.extend(bullets(summary["cause_of_death"]))
            parts.append("\n")
        if summary["injuries"]:
            parts.append(f"**Injuries ({len(summary['injuries'])}):**\n")
            parts.extend(bullets(summary["injuries"][:10]))
            parts.append("\n")
        if summary["toxicology_findings"]:
            parts.append("**Toxicology:**\n")
            parts.extend(bullets(summary["toxicology_findings"]))
            parts.append("\n")
        if summary["evidence_collected"]:
            parts.append("**Evidence Collected:**\n")
            parts.extend(bullets(summary["evidence_collected"]))
            parts.append("\n")
        if summary["key_time_indicators"]:
            # Final section: no trailing blank line is emitted after it.
            parts.append("**Time Indicators:**\n")
            parts.extend(bullets(summary["key_time_indicators"]))
        return "".join(parts)

    def _extract_time_events(self, entities: List[ForensicEntity]) -> List[Dict]:
        """Extract timeline events from entities.

        Returns:
            One dict per ``TIME_INDICATOR`` entity; the entity text is used
            verbatim for both ``event`` and ``timestamp``.
        """
        return [
            {
                "event": e.text,
                "category": "Physical Evidence",
                "source": "Autopsy Report",
                "timestamp": e.text,
            }
            for e in entities
            if e.label == "TIME_INDICATOR"
        ]