# TraceSceneFinal/backend/app/core/scene_analyzer.py
# Siddharth Ravikumar
# Deploy Chatbot, Animation, and Full TraceScene Application
# efaef67
"""
AI Accident Analysis — Scene Analyzer
Orchestrates multi-image accident scene analysis.
Runs LFM inference on each photo and parses the structured output
into components (vehicles, road conditions, evidence, etc.).
"""
import json
import re
import time
from typing import List, Dict, Any, Optional
from PIL import Image
from pathlib import Path
from backend.app.core.inference import inference_engine
from backend.app.db.database import db
from backend.app.utils.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper under the
# name "scene_analyzer"; used by every SceneAnalyzer method below.
logger = get_logger("scene_analyzer")
class SceneAnalyzer:
    """
    Analyzes accident scene photos and extracts structured observations.

    Each photo is processed individually (inference, parsing, persistence);
    the per-photo vehicle observations are then merged into a de-duplicated
    list of parties for the case.
    """

    # (section name, compiled pattern) pairs used by ``_split_sections``.
    # Each pattern captures the text following a bracketed header up to the
    # next expected header (or the end of the response).
    # NOTE(review): no pattern here yields an "environmental" section, so
    # ``_parse_analysis`` always leaves that field empty — confirm whether the
    # model response is expected to carry such a header.
    _SECTION_PATTERNS = tuple(
        (name, re.compile(pattern, re.IGNORECASE | re.DOTALL))
        for name, pattern in (
            ("vehicles", r"\[AI Observation\]\s*(.+?)(?=\[Condition Assessment\]|$)"),
            ("road_conditions", r"\[Condition Assessment\]\s*(.+?)(?=\[Damage Analysis|\[Summary\]|$)"),
            ("evidence", r"\[Damage Analysis.*?\]\s*(.+?)(?=\[Summary\]|$)"),
            ("positions", r"\[Summary\]\s*(.+?)$"),
        )
    )

    # Matches "Vehicle <n> Make/Model: <description> (<Color Type>)" where the
    # parenthesised part is optional. The description is captured greedily up
    # to the first "(" or the end of the line; a lazy, unanchored capture here
    # would stop after a single character.
    _VEHICLE_PATTERN = re.compile(
        r"Vehicle\s+(\d+)\s+Make/Model:[ \t]*([^(\n]*)(?:\(([^)\n]*)\))?",
        re.IGNORECASE,
    )

    # Labels handed out, in order, to the first distinct vehicles found.
    _PARTY_LABELS = ("Vehicle A", "Vehicle B", "Vehicle C", "Vehicle D")

    async def analyze_case(self, case_id: int) -> Dict[str, Any]:
        """
        Analyze all unanalyzed photos in a case.

        Fetches the case's unanalyzed photos from the database, analyzes each
        one, derives the involved parties from the successful analyses and
        stores them via ``db.add_party``.

        Args:
            case_id: Identifier of the case to analyze.

        Returns:
            A summary dict with the keys ``total``, ``analyzed``, ``errors``,
            ``parties_found`` and ``elapsed_ms``.
        """
        logger.info(f"Starting analysis for case {case_id}")
        start = time.perf_counter()

        unanalyzed = await db.get_unanalyzed_photos(case_id)
        total = len(unanalyzed)
        if total == 0:
            logger.info(f"Case {case_id}: no unanalyzed photos found")
            # Same key set as the normal return so callers can rely on it.
            return {
                "total": 0,
                "analyzed": 0,
                "errors": 0,
                "parties_found": 0,
                "elapsed_ms": 0,
            }

        # NOTE(review): the status is not changed again in this method;
        # presumably the caller moves the case on afterwards — confirm.
        await db.update_case_status(case_id, "analyzing")

        all_analyses = []
        analyzed = 0
        errors = 0
        for photo in unanalyzed:
            # One failing photo must not abort the rest of the case.
            try:
                result = await self._analyze_single_photo(photo)
            except Exception as e:
                errors += 1
                logger.error(f"Failed to analyze photo {photo['id']}: {e}")
                continue
            if result:
                all_analyses.append(result)
                analyzed += 1
            else:
                errors += 1

        # Identify parties from all analyses and persist them.
        parties = self._identify_parties(all_analyses)
        for party in parties:
            await db.add_party(
                case_id=case_id,
                label=party["label"],
                vehicle_type=party.get("vehicle_type"),
                vehicle_color=party.get("vehicle_color"),
                vehicle_description=party.get("description"),
            )

        elapsed_ms = round((time.perf_counter() - start) * 1000, 2)
        logger.info(
            f"Case {case_id} analysis complete: "
            f"{analyzed}/{total} photos analyzed, {errors} errors, "
            f"{len(parties)} parties identified, {elapsed_ms}ms"
        )
        return {
            "total": total,
            "analyzed": analyzed,
            "errors": errors,
            "parties_found": len(parties),
            "elapsed_ms": elapsed_ms,
        }

    async def _analyze_single_photo(self, photo: dict) -> Optional[Dict[str, Any]]:
        """
        Analyze a single photo and store the result.

        Args:
            photo: Photo record; the keys ``id`` and ``filepath`` are read.

        Returns:
            A dict with ``photo_id``, ``raw_analysis`` and ``parsed``, or
            ``None`` when the image cannot be opened or inference/storage
            fails (the failure is logged).
        """
        photo_id = photo["id"]
        filepath = photo["filepath"]
        logger.info(f"Analyzing photo {photo_id}: {filepath}")

        try:
            # The context manager releases the underlying file handle once the
            # converted in-memory copy has been produced.
            with Image.open(filepath) as source_image:
                image = source_image.convert("RGB")
        except Exception as e:
            logger.error(f"Cannot open image {filepath}: {e}")
            return None

        try:
            # NOTE(review): called synchronously inside a coroutine; if this
            # call blocks, the event loop is held for its duration — confirm.
            result = inference_engine.analyze_scene(image)
            raw_analysis = result["raw_analysis"]

            # Parse structured sections from the model response.
            parsed = self._parse_analysis(raw_analysis)
            vehicles = parsed.get("vehicles", [])

            await db.add_scene_analysis(
                photo_id=photo_id,
                raw_analysis=raw_analysis,
                vehicles_json=json.dumps(vehicles),
                road_conditions_json=json.dumps(parsed.get("road_conditions", {})),
                evidence_json=json.dumps(parsed.get("evidence", {})),
                environmental_json=json.dumps(parsed.get("environmental", {})),
                # Stored as plain text, not JSON-encoded like the other fields.
                positions_json=parsed.get("positions", ""),
                model_id=result["model_id"],
                inference_time_ms=result["inference_time_ms"],
            )
            logger.info(
                f"Photo {photo_id} analyzed: {len(raw_analysis)} chars, "
                f"{result['inference_time_ms']}ms, "
                f"{len(vehicles)} vehicles detected"
            )
            return {
                "photo_id": photo_id,
                "raw_analysis": raw_analysis,
                "parsed": parsed,
            }
        except Exception as e:
            logger.error(f"Inference failed for photo {photo_id}: {e}")
            return None

    def _parse_analysis(self, raw_text: str) -> Dict[str, Any]:
        """
        Parse the structured model response into components.

        Args:
            raw_text: Raw text returned by the model.

        Returns:
            A dict with the keys ``vehicles`` (list), ``road_conditions``,
            ``evidence``, ``environmental`` (dicts) and ``positions`` (text).
            Sections missing from the response keep their empty default.
        """
        result: Dict[str, Any] = {
            "vehicles": [],
            "road_conditions": {},
            "evidence": {},
            "environmental": {},
            "positions": "",
        }
        sections = self._split_sections(raw_text)

        if "vehicles" in sections:
            result["vehicles"] = self._parse_vehicles(sections["vehicles"])
        for name in ("road_conditions", "evidence", "environmental"):
            if name in sections:
                result[name] = self._parse_key_values(sections[name])
        # Positions are kept as free text.
        if "positions" in sections:
            result["positions"] = sections["positions"].strip()
        return result

    def _split_sections(self, text: str) -> Dict[str, str]:
        """
        Split the model response into named sections based on bracket headers.

        Args:
            text: Raw model response.

        Returns:
            Mapping of section name to its stripped text, containing only the
            sections whose header was found.
        """
        sections = {}
        for name, pattern in self._SECTION_PATTERNS:
            match = pattern.search(text)
            if match:
                sections[name] = match.group(1).strip()
        return sections

    def _parse_vehicles(self, text: str) -> List[Dict[str, str]]:
        """
        Parse vehicle descriptions from the AI Observation section.

        Recognises lines of the form
        ``Vehicle 1 Make/Model: Year Make Model (Color Type)``; the
        parenthesised part is optional.

        Args:
            text: Text of the vehicles section.

        Returns:
            One dict per matched vehicle with the keys ``label``,
            ``description``, ``color`` and ``type``. If nothing matches, a
            single ``{"description": ...}`` entry holding the first 500
            characters of the section; if the section is blank, an empty list.
        """
        vehicles = []
        for match in self._VEHICLE_PATTERN.finditer(text):
            make_model = match.group(2).strip()
            color = ""
            vtype = make_model  # Fallback when no "(Color Type)" is present.

            # E.g. "Blue Sedan" -> color="Blue", type="Sedan"; the first word
            # is taken as the color, the remainder as the type.
            parts = (match.group(3) or "").split()
            if len(parts) > 1:
                color = parts[0]
                vtype = " ".join(parts[1:])
            elif parts:
                vtype = parts[0]

            vehicles.append({
                "label": f"Vehicle {match.group(1)}",
                "description": make_model,
                "color": color,
                "type": vtype,
            })

        if vehicles:
            return vehicles
        # No structured match: keep the raw text (truncated) so the
        # observation is not lost, but do not invent a vehicle from nothing.
        if not text.strip():
            return []
        return [{"description": text[:500]}]

    def _parse_key_values(self, text: str) -> Dict[str, str]:
        """
        Parse key-value pairs from a section (bullet points or plain lines).

        Lines containing ``:`` are split on the first colon; the key is
        lower-cased with spaces replaced by underscores. Pairs with an empty
        key or value are dropped. Non-empty lines without a colon are stored
        under ``note_<n>``, where ``n`` is the number of entries collected so
        far.

        Args:
            text: Section text.

        Returns:
            Mapping of normalised keys to values.
        """
        result: Dict[str, str] = {}
        for raw_line in text.strip().splitlines():
            # Drop leading bullet markers before interpreting the line.
            line = raw_line.strip().lstrip("-*•").strip()
            if ":" in line:
                key, _, value = line.partition(":")
                key = key.strip().lower().replace(" ", "_")
                value = value.strip().strip(".,")
                if key and value:
                    result[key] = value
            elif line:
                result[f"note_{len(result)}"] = line
        return result

    def _identify_parties(
        self, analyses: List[Dict[str, Any]]
    ) -> List[Dict[str, Optional[str]]]:
        """
        Identify distinct parties (Vehicle A, Vehicle B, etc.) across photos.

        Vehicles are de-duplicated on their lower-cased color + type; the
        first occurrence wins.

        Args:
            analyses: Per-photo results as returned by
                ``_analyze_single_photo``; only ``parsed["vehicles"]`` is read.

        Returns:
            One dict per party with the keys ``label``, ``vehicle_type``,
            ``vehicle_color`` and ``description``. When no vehicle was found
            at all, two generic parties ("Vehicle A" and "Vehicle B").
        """
        parties: List[Dict[str, Optional[str]]] = []
        seen_keys = set()
        labels = self._PARTY_LABELS

        for analysis in analyses:
            for vehicle in analysis.get("parsed", {}).get("vehicles", []):
                v_type = vehicle.get("type", "unknown").lower()
                v_color = vehicle.get("color", "").lower()
                v_key = f"{v_color}_{v_type}".strip("_")

                # Simple dedup — skip if the same color+type was already seen.
                # Vehicles with an empty key cannot be compared and are kept.
                if v_key:
                    if v_key in seen_keys:
                        continue
                    seen_keys.add(v_key)

                index = len(parties)
                label = labels[index] if index < len(labels) else f"Vehicle {index + 1}"
                parties.append({
                    "label": label,
                    "vehicle_type": vehicle.get("type"),
                    "vehicle_color": vehicle.get("color"),
                    "description": vehicle.get("description", ""),
                })

        # If no vehicles were found at all, fall back to two generic parties.
        if not parties:
            parties = [
                {"label": "Vehicle A", "vehicle_type": "unknown",
                 "vehicle_color": None, "description": ""},
                {"label": "Vehicle B", "vehicle_type": "unknown",
                 "vehicle_color": None, "description": ""},
            ]
        return parties
# Module-level singleton: one SceneAnalyzer instance created at import time
# and exposed under the name `scene_analyzer`.
scene_analyzer = SceneAnalyzer()