"""
AI Accident Analysis — Scene Analyzer

Orchestrates multi-image accident scene analysis.
Runs LFM inference on each photo and parses the structured output
into components (vehicles, road conditions, evidence, etc.).
"""
import json
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from PIL import Image

from backend.app.core.inference import inference_engine
from backend.app.db.database import db
from backend.app.utils.logger import get_logger

logger = get_logger("scene_analyzer")
class SceneAnalyzer:
    """
    Analyzes accident scene photos and extracts structured observations.

    Processes each photo individually (one inference call per image),
    persists per-photo results, then merges vehicle observations across
    images to identify the distinct parties involved in the accident.
    """

    async def analyze_case(self, case_id: int) -> Dict[str, Any]:
        """
        Analyze all unanalyzed photos in a case.

        Runs inference on every photo still marked unanalyzed, stores each
        photo's parsed analysis, derives the distinct parties (vehicles)
        from the combined observations, and records them on the case.

        Args:
            case_id: Database id of the case to process.

        Returns:
            Summary dict with ``total``, ``analyzed``, ``errors`` and
            ``elapsed_ms``; ``parties_found`` is included only when there
            was at least one photo to process.
        """
        logger.info(f"Starting analysis for case {case_id}")
        start = time.perf_counter()

        # Get unanalyzed photos
        unanalyzed = await db.get_unanalyzed_photos(case_id)
        total = len(unanalyzed)
        analyzed = 0
        errors = 0

        if total == 0:
            logger.info(f"Case {case_id}: no unanalyzed photos found")
            return {"total": 0, "analyzed": 0, "errors": 0, "elapsed_ms": 0}

        await db.update_case_status(case_id, "analyzing")
        # NOTE(review): the status is never advanced past "analyzing" in this
        # method — confirm a later pipeline stage (or the caller) finalizes it.

        all_analyses = []
        for photo in unanalyzed:
            # A photo that fails (returns None or raises) counts as an error
            # but does not abort the rest of the batch.
            try:
                result = await self._analyze_single_photo(photo)
                if result:
                    all_analyses.append(result)
                    analyzed += 1
                else:
                    errors += 1
            except Exception as e:
                errors += 1
                logger.error(f"Failed to analyze photo {photo['id']}: {e}")

        # Identify parties from all analyses
        parties = self._identify_parties(all_analyses)
        for party in parties:
            await db.add_party(
                case_id=case_id,
                label=party["label"],
                vehicle_type=party.get("vehicle_type"),
                vehicle_color=party.get("vehicle_color"),
                vehicle_description=party.get("description"),
            )

        elapsed_ms = round((time.perf_counter() - start) * 1000, 2)
        logger.info(
            f"Case {case_id} analysis complete: "
            f"{analyzed}/{total} photos analyzed, {errors} errors, "
            f"{len(parties)} parties identified, {elapsed_ms}ms"
        )
        return {
            "total": total,
            "analyzed": analyzed,
            "errors": errors,
            "parties_found": len(parties),
            "elapsed_ms": elapsed_ms,
        }

    async def _analyze_single_photo(self, photo: dict) -> Optional[Dict[str, Any]]:
        """Analyze a single photo and store results.

        Args:
            photo: Row dict containing at least ``id`` and ``filepath``.

        Returns:
            ``{"photo_id", "raw_analysis", "parsed"}`` on success, or
            ``None`` when the image cannot be opened or inference fails
            (the caller counts ``None`` as one error).
        """
        photo_id = photo["id"]
        filepath = photo["filepath"]
        logger.info(f"Analyzing photo {photo_id}: {filepath}")
        try:
            # Force RGB so downstream inference sees a consistent mode.
            image = Image.open(filepath).convert("RGB")
        except Exception as e:
            logger.error(f"Cannot open image {filepath}: {e}")
            return None
        try:
            result = inference_engine.analyze_scene(image)
            raw_analysis = result["raw_analysis"]
            # Parse structured sections from the model response
            parsed = self._parse_analysis(raw_analysis)
            await db.add_scene_analysis(
                photo_id=photo_id,
                raw_analysis=raw_analysis,
                vehicles_json=json.dumps(parsed.get("vehicles", [])),
                road_conditions_json=json.dumps(parsed.get("road_conditions", {})),
                evidence_json=json.dumps(parsed.get("evidence", {})),
                environmental_json=json.dumps(parsed.get("environmental", {})),
                # NOTE(review): despite the "_json" suffix this argument is
                # plain text, not a JSON dump — verify against the DB schema.
                positions_json=parsed.get("positions", ""),
                model_id=result["model_id"],
                inference_time_ms=result["inference_time_ms"],
            )
            logger.info(
                f"Photo {photo_id} analyzed: {len(raw_analysis)} chars, "
                f"{result['inference_time_ms']}ms, "
                f"{len(parsed.get('vehicles', []))} vehicles detected"
            )
            return {
                "photo_id": photo_id,
                "raw_analysis": raw_analysis,
                "parsed": parsed,
            }
        except Exception as e:
            logger.error(f"Inference failed for photo {photo_id}: {e}")
            return None

    def _parse_analysis(self, raw_text: str) -> Dict[str, Any]:
        """
        Parse the structured model response into components based on static UI format.

        Returns a dict with keys ``vehicles`` (list), ``road_conditions``,
        ``evidence``, ``environmental`` (dicts) and ``positions`` (text);
        missing sections keep their empty defaults.
        """
        result = {
            "vehicles": [],
            "road_conditions": {},
            "evidence": {},
            "environmental": {},
            "positions": "",
        }
        sections = self._split_sections(raw_text)
        # Parse vehicles section
        if "vehicles" in sections:
            result["vehicles"] = self._parse_vehicles(sections["vehicles"])
        # Parse road conditions
        if "road_conditions" in sections:
            result["road_conditions"] = self._parse_key_values(sections["road_conditions"])
        # Parse evidence
        if "evidence" in sections:
            result["evidence"] = self._parse_key_values(sections["evidence"])
        # Parse environmental
        # NOTE(review): _split_sections never produces an "environmental" key,
        # so this branch is currently dead; kept for forward compatibility.
        if "environmental" in sections:
            result["environmental"] = self._parse_key_values(sections["environmental"])
        # Parse positions (keep as text)
        if "positions" in sections:
            result["positions"] = sections["positions"].strip()
        return result

    def _split_sections(self, text: str) -> Dict[str, str]:
        """Split the model response into named sections based on brackets.

        Each section runs from its ``[Header]`` up to the next known header
        (or end of text). Sections whose header is absent are simply omitted
        from the returned dict.
        """
        sections = {}
        section_patterns = [
            ("vehicles", r"\[AI Observation\]\s*(.+?)(?=\[Condition Assessment\]|$)"),
            ("road_conditions", r"\[Condition Assessment\]\s*(.+?)(?=\[Damage Analysis|\[Summary\]|$)"),
            ("evidence", r"\[Damage Analysis.*?\]\s*(.+?)(?=\[Summary\]|$)"),
            ("positions", r"\[Summary\]\s*(.+?)$"),
        ]
        for name, pattern in section_patterns:
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                sections[name] = match.group(1).strip()
        return sections

    def _parse_vehicles(self, text: str) -> List[Dict[str, str]]:
        """Parse vehicle descriptions from the AI Observation section.

        Expects lines like ``Vehicle 1 Make/Model: 2019 Toyota Camry
        (Blue Sedan)``; the parenthesised ``(Color Type)`` suffix is
        optional. Falls back to one free-text entry when nothing matches.
        """
        vehicles = []
        # Look for "Vehicle 1 Make/Model: Year Make Model (Color Type)".
        # BUG FIX: the previous pattern ended with lazy `(.+?)` followed only
        # by optional groups, so the make/model capture always collapsed to a
        # single character and the color/type group never matched. Anchoring
        # to end-of-line (MULTILINE) and excluding "(" from the description
        # makes both captures span their full text.
        pattern = r"Vehicle\s+(\d+)\s+Make/Model:\s*([^(\n]+?)\s*(?:\(([^)\n]+)\))?\s*$"
        matches = re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE)
        for match in matches:
            label = match.group(1)
            make_model = match.group(2).strip()
            color_type = match.group(3)
            color = ""
            vtype = make_model
            if color_type:
                # E.g. "Blue Sedan" -> color="Blue", type="Sedan"
                parts = color_type.strip().split()
                if len(parts) > 1:
                    color = parts[0]
                    vtype = " ".join(parts[1:])
                else:
                    vtype = parts[0]
            vehicles.append({
                "label": f"Vehicle {label}",
                "description": make_model,
                "color": color,
                "type": vtype
            })
        return vehicles if vehicles else [{"description": text[:500]}]

    def _parse_key_values(self, text: str) -> Dict[str, str]:
        """Parse key-value pairs from a section (bullet points or lines).

        Lines with a colon become ``snake_case_key: value`` entries; other
        non-empty lines are stored under generated ``note_<i>`` keys.
        """
        result = {}
        lines = text.strip().split("\n")
        for line in lines:
            line = line.strip().lstrip("-*•").strip()
            if ":" in line:
                key, _, value = line.partition(":")
                key = key.strip().lower().replace(" ", "_")
                value = value.strip().strip(".,")
                if key and value:
                    result[key] = value
            elif line:
                # Store as a numbered or generic entry
                result[f"note_{len(result)}"] = line
        return result

    def _identify_parties(self, analyses: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """
        Identify distinct parties (Vehicle A, Vehicle B, etc.)
        from all photo analyses.

        Deduplicates vehicles by (color, type) across photos and assigns
        letter labels in order of first appearance. Produces two generic
        "unknown" parties when no vehicle was detected anywhere.
        """
        parties = []
        seen_types = set()
        vehicle_counter = 0
        labels = ["Vehicle A", "Vehicle B", "Vehicle C", "Vehicle D"]
        for analysis in analyses:
            parsed = analysis.get("parsed", {})
            for vehicle in parsed.get("vehicles", []):
                v_type = vehicle.get("type", "unknown").lower()
                v_color = vehicle.get("color", "").lower()
                v_key = f"{v_color}_{v_type}".strip("_")
                # Simple dedup — don't add if we already have same color+type
                if v_key and v_key in seen_types:
                    continue
                if v_key:
                    seen_types.add(v_key)
                # Beyond four parties, fall back to numbered labels.
                if vehicle_counter < len(labels):
                    label = labels[vehicle_counter]
                else:
                    label = f"Vehicle {vehicle_counter + 1}"
                parties.append({
                    "label": label,
                    "vehicle_type": vehicle.get("type"),
                    "vehicle_color": vehicle.get("color"),
                    "description": vehicle.get("description", ""),
                })
                vehicle_counter += 1
        # If no vehicles found at all, add two generic parties
        if not parties:
            parties = [
                {"label": "Vehicle A", "vehicle_type": "unknown",
                 "vehicle_color": None, "description": ""},
                {"label": "Vehicle B", "vehicle_type": "unknown",
                 "vehicle_color": None, "description": ""},
            ]
        return parties
# Singleton — module-level instance shared by the rest of the application.
scene_analyzer = SceneAnalyzer()