Spaces:
Running on Zero
Running on Zero
| """ | |
| AI Accident Analysis — Database Layer | |
| SQLite database for accident cases, photos, scene analyses, | |
| traffic violations, parties, and fault analysis. | |
| Uses aiosqlite for async compatibility with FastAPI. | |
| """ | |
| import aiosqlite | |
| from pathlib import Path | |
| from typing import Optional, List, Dict, Any | |
| from backend.app.config import settings | |
| from backend.app.utils.logger import get_logger | |
# Module-level logger shared by everything in this file, registered under the name "database".
logger = get_logger("database")
# DDL for the whole database: cases, their photos, per-photo scene analyses,
# involved parties, detected violations, and one fault analysis per case.
#
# Every statement is idempotent (IF NOT EXISTS), so the script can be run on
# each startup. The script is split on ";" before execution, so no statement
# may contain a semicolon inside a literal or comment.
#
# Rows owned by a case are removed with it (ON DELETE CASCADE). Optional
# references to a party or photo are cleared instead (ON DELETE SET NULL), so
# deleting a party or photo is not blocked by rows that merely point at it
# when foreign-key enforcement is on.
SCHEMA = """
CREATE TABLE IF NOT EXISTS cases (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    case_number TEXT UNIQUE NOT NULL,
    officer_name TEXT,
    location TEXT,
    incident_date TEXT,
    notes TEXT,
    status TEXT DEFAULT 'pending',
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS photos (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    case_id INTEGER NOT NULL,
    filename TEXT NOT NULL,
    filepath TEXT NOT NULL,
    file_size INTEGER,
    width INTEGER,
    height INTEGER,
    photo_type TEXT DEFAULT 'general',
    uploaded_at TEXT NOT NULL DEFAULT (datetime('now')),
    FOREIGN KEY (case_id) REFERENCES cases(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS scene_analyses (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    photo_id INTEGER NOT NULL UNIQUE,
    raw_analysis TEXT NOT NULL,
    vehicles_json TEXT,
    road_conditions_json TEXT,
    evidence_json TEXT,
    environmental_json TEXT,
    positions_json TEXT,
    model_id TEXT,
    inference_time_ms REAL,
    analyzed_at TEXT NOT NULL DEFAULT (datetime('now')),
    FOREIGN KEY (photo_id) REFERENCES photos(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS parties (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    case_id INTEGER NOT NULL,
    label TEXT NOT NULL,
    vehicle_type TEXT,
    vehicle_color TEXT,
    vehicle_description TEXT,
    FOREIGN KEY (case_id) REFERENCES cases(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS violations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    case_id INTEGER NOT NULL,
    party_id INTEGER,
    rule_id TEXT NOT NULL,
    rule_title TEXT NOT NULL,
    rule_category TEXT,
    severity TEXT,
    confidence REAL NOT NULL,
    evidence_summary TEXT,
    photo_id INTEGER,
    FOREIGN KEY (case_id) REFERENCES cases(id) ON DELETE CASCADE,
    FOREIGN KEY (party_id) REFERENCES parties(id) ON DELETE SET NULL,
    FOREIGN KEY (photo_id) REFERENCES photos(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS fault_analyses (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    case_id INTEGER NOT NULL UNIQUE,
    primary_fault_party_id INTEGER,
    fault_distribution_json TEXT,
    probable_cause TEXT,
    overall_confidence REAL,
    analysis_summary TEXT,
    generated_at TEXT NOT NULL DEFAULT (datetime('now')),
    FOREIGN KEY (case_id) REFERENCES cases(id) ON DELETE CASCADE,
    FOREIGN KEY (primary_fault_party_id) REFERENCES parties(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_photos_case ON photos(case_id);
CREATE INDEX IF NOT EXISTS idx_analyses_photo ON scene_analyses(photo_id);
CREATE INDEX IF NOT EXISTS idx_violations_case ON violations(case_id);
CREATE INDEX IF NOT EXISTS idx_violations_party ON violations(party_id);
CREATE INDEX IF NOT EXISTS idx_parties_case ON parties(case_id);
"""
class Database:
    """Async SQLite wrapper for accident cases and their related records.

    One ``aiosqlite`` connection is opened by :meth:`connect` and shared by
    every query method. Read methods return rows as plain ``dict`` objects
    (or ``None`` / an empty list when nothing matches); write methods commit
    before returning.
    """

    def __init__(self):
        self.db_path = str(settings.db_path)
        self._connection: Optional[aiosqlite.Connection] = None

    async def connect(self):
        """Open the connection, enable WAL and foreign keys, and create the schema."""
        logger.info(f"Connecting to database: {self.db_path}")
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        self._connection = await aiosqlite.connect(self.db_path)
        # Row objects support dict(row), which every read method relies on.
        self._connection.row_factory = aiosqlite.Row
        await self._connection.execute("PRAGMA journal_mode=WAL")
        await self._connection.execute("PRAGMA foreign_keys=ON")
        # The schema is run one statement at a time, split on ";".
        for statement in SCHEMA.split(";"):
            statement = statement.strip()
            if statement:
                await self._connection.execute(statement)
        await self._connection.commit()
        logger.info("Database schema initialized")

    async def disconnect(self):
        """Close the connection, if any, and forget it.

        Clearing the reference makes :attr:`conn` raise after shutdown
        instead of handing out a closed connection.
        """
        connection, self._connection = self._connection, None
        if connection is not None:
            await connection.close()

    @property
    def conn(self) -> "aiosqlite.Connection":
        """Return the live connection.

        Raises:
            RuntimeError: if :meth:`connect` has not been called (or the
                database has been disconnected).
        """
        if not self._connection:
            raise RuntimeError("Database not connected. Call connect() first.")
        return self._connection

    # ── Internal helpers ──────────────────────────────────────────────

    async def _insert(self, sql: str, params: tuple) -> int:
        """Run an INSERT, commit, and return the new row id."""
        cursor = await self.conn.execute(sql, params)
        await self.conn.commit()
        return cursor.lastrowid

    async def _write(self, sql: str, params: tuple) -> None:
        """Run a single UPDATE/DELETE statement and commit."""
        await self.conn.execute(sql, params)
        await self.conn.commit()

    async def _fetch_one(self, sql: str, params: tuple) -> Optional[dict]:
        """Return the first matching row as a dict, or None."""
        cursor = await self.conn.execute(sql, params)
        row = await cursor.fetchone()
        return dict(row) if row is not None else None

    async def _fetch_all(self, sql: str, params: tuple = ()) -> List[dict]:
        """Return every matching row as a list of dicts."""
        cursor = await self.conn.execute(sql, params)
        return [dict(row) for row in await cursor.fetchall()]

    # ── Cases ─────────────────────────────────────────────────────────

    async def create_case(self, case_number: str, officer_name: Optional[str] = None,
                          location: Optional[str] = None,
                          incident_date: Optional[str] = None,
                          notes: Optional[str] = None) -> int:
        """Insert a new case and return its id."""
        return await self._insert(
            """INSERT INTO cases (case_number, officer_name, location, incident_date, notes)
               VALUES (?, ?, ?, ?, ?)""",
            (case_number, officer_name, location, incident_date, notes),
        )

    async def get_case(self, case_id: int) -> Optional[dict]:
        """Return the case with the given id, or None."""
        return await self._fetch_one("SELECT * FROM cases WHERE id = ?", (case_id,))

    async def list_cases(self) -> List[dict]:
        """Return all cases, newest first."""
        return await self._fetch_all("SELECT * FROM cases ORDER BY created_at DESC")

    async def update_case_status(self, case_id: int, status: str):
        """Set the status column of one case."""
        await self._write("UPDATE cases SET status = ? WHERE id = ?", (status, case_id))

    async def delete_case(self, case_id: int):
        """Delete one case by id."""
        await self._write("DELETE FROM cases WHERE id = ?", (case_id,))

    async def update_case(self, case_id: int, officer_name: Optional[str] = None,
                          location: Optional[str] = None,
                          incident_date: Optional[str] = None,
                          notes: Optional[str] = None) -> bool:
        """Update the case fields that were supplied (i.e. are not None).

        Returns:
            False when no field was supplied and nothing was executed; True
            once an UPDATE has been issued (whether or not a row matched).
        """
        supplied = {
            "officer_name": officer_name,
            "location": location,
            "incident_date": incident_date,
            "notes": notes,
        }
        changes = {column: value for column, value in supplied.items() if value is not None}
        if not changes:
            return False
        # Column names come from the fixed dict above, never from the caller,
        # so interpolating them into the statement is safe.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        await self._write(
            f"UPDATE cases SET {assignments} WHERE id = ?",
            (*changes.values(), case_id),
        )
        return True

    # ── Photos ────────────────────────────────────────────────────────

    async def add_photo(self, case_id: int, filename: str, filepath: str,
                        file_size: int, width: Optional[int] = None,
                        height: Optional[int] = None,
                        photo_type: str = "general") -> int:
        """Insert a photo row for a case and return its id."""
        return await self._insert(
            """INSERT INTO photos (case_id, filename, filepath, file_size, width, height, photo_type)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (case_id, filename, filepath, file_size, width, height, photo_type),
        )

    async def get_photos_by_case(self, case_id: int) -> List[dict]:
        """Return a case's photos ordered by upload time."""
        return await self._fetch_all(
            "SELECT * FROM photos WHERE case_id = ? ORDER BY uploaded_at", (case_id,)
        )

    async def get_photo(self, photo_id: int) -> Optional[dict]:
        """Return the photo with the given id, or None."""
        return await self._fetch_one("SELECT * FROM photos WHERE id = ?", (photo_id,))

    async def get_unanalyzed_photos(self, case_id: int) -> List[dict]:
        """Return a case's photos that have no scene_analyses row yet."""
        return await self._fetch_all(
            """SELECT p.* FROM photos p
               LEFT JOIN scene_analyses sa ON p.id = sa.photo_id
               WHERE p.case_id = ? AND sa.id IS NULL""",
            (case_id,),
        )

    # ── Scene Analyses ────────────────────────────────────────────────

    async def add_scene_analysis(self, photo_id: int, raw_analysis: str,
                                 vehicles_json: Optional[str] = None,
                                 road_conditions_json: Optional[str] = None,
                                 evidence_json: Optional[str] = None,
                                 environmental_json: Optional[str] = None,
                                 positions_json: Optional[str] = None,
                                 model_id: Optional[str] = None,
                                 inference_time_ms: Optional[float] = None) -> int:
        """Insert the analysis for a photo and return its id."""
        return await self._insert(
            """INSERT INTO scene_analyses
               (photo_id, raw_analysis, vehicles_json, road_conditions_json,
                evidence_json, environmental_json, positions_json, model_id, inference_time_ms)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (photo_id, raw_analysis, vehicles_json, road_conditions_json,
             evidence_json, environmental_json, positions_json, model_id, inference_time_ms),
        )

    async def get_analyses_by_case(self, case_id: int) -> List[dict]:
        """Return a case's analyses, each joined with its photo's filename, filepath and photo_type."""
        return await self._fetch_all(
            """SELECT sa.*, p.filename, p.filepath, p.photo_type
               FROM scene_analyses sa
               JOIN photos p ON sa.photo_id = p.id
               WHERE p.case_id = ?""",
            (case_id,),
        )

    # ── Parties ───────────────────────────────────────────────────────

    async def add_party(self, case_id: int, label: str,
                        vehicle_type: Optional[str] = None,
                        vehicle_color: Optional[str] = None,
                        vehicle_description: Optional[str] = None) -> int:
        """Insert a party for a case and return its id."""
        return await self._insert(
            """INSERT INTO parties (case_id, label, vehicle_type, vehicle_color, vehicle_description)
               VALUES (?, ?, ?, ?, ?)""",
            (case_id, label, vehicle_type, vehicle_color, vehicle_description),
        )

    async def get_parties_by_case(self, case_id: int) -> List[dict]:
        """Return all parties recorded for a case."""
        return await self._fetch_all(
            "SELECT * FROM parties WHERE case_id = ?", (case_id,)
        )

    async def clear_parties(self, case_id: int):
        """Delete every party belonging to a case."""
        await self._write("DELETE FROM parties WHERE case_id = ?", (case_id,))

    # ── Violations ────────────────────────────────────────────────────

    async def add_violation(self, case_id: int, rule_id: str, rule_title: str,
                            confidence: float, party_id: Optional[int] = None,
                            rule_category: Optional[str] = None,
                            severity: Optional[str] = None,
                            evidence_summary: Optional[str] = None,
                            photo_id: Optional[int] = None) -> int:
        """Insert a violation for a case and return its id."""
        return await self._insert(
            """INSERT INTO violations
               (case_id, party_id, rule_id, rule_title, rule_category, severity,
                confidence, evidence_summary, photo_id)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (case_id, party_id, rule_id, rule_title, rule_category, severity,
             confidence, evidence_summary, photo_id),
        )

    async def get_violations_by_case(self, case_id: int) -> List[dict]:
        """Return a case's violations, highest confidence first, with ``party_label`` added."""
        return await self._fetch_all(
            """SELECT v.*, p.label as party_label
               FROM violations v
               LEFT JOIN parties p ON v.party_id = p.id
               WHERE v.case_id = ?
               ORDER BY v.confidence DESC""",
            (case_id,),
        )

    async def clear_violations(self, case_id: int):
        """Delete every violation belonging to a case."""
        await self._write("DELETE FROM violations WHERE case_id = ?", (case_id,))

    # ── Fault Analysis ────────────────────────────────────────────────

    async def save_fault_analysis(self, case_id: int,
                                  primary_fault_party_id: Optional[int] = None,
                                  fault_distribution_json: Optional[str] = None,
                                  probable_cause: Optional[str] = None,
                                  overall_confidence: Optional[float] = None,
                                  analysis_summary: Optional[str] = None) -> int:
        """Replace the case's fault analysis and return the new row id.

        Any existing row for the case is deleted and a fresh one inserted.
        If either statement fails the transaction is rolled back and the
        error re-raised, so the previous analysis is not lost.
        """
        connection = self.conn
        try:
            await connection.execute(
                "DELETE FROM fault_analyses WHERE case_id = ?", (case_id,)
            )
            cursor = await connection.execute(
                """INSERT INTO fault_analyses
                   (case_id, primary_fault_party_id, fault_distribution_json,
                    probable_cause, overall_confidence, analysis_summary)
                   VALUES (?, ?, ?, ?, ?, ?)""",
                (case_id, primary_fault_party_id, fault_distribution_json,
                 probable_cause, overall_confidence, analysis_summary),
            )
        except Exception:
            # Without this, the pending DELETE would be committed by the next
            # unrelated commit() on the shared connection.
            await connection.rollback()
            raise
        await connection.commit()
        return cursor.lastrowid

    async def get_fault_analysis(self, case_id: int) -> Optional[dict]:
        """Return the case's fault analysis with ``fault_party_label`` added, or None."""
        return await self._fetch_one(
            """SELECT fa.*, p.label as fault_party_label
               FROM fault_analyses fa
               LEFT JOIN parties p ON fa.primary_fault_party_id = p.id
               WHERE fa.case_id = ?""",
            (case_id,),
        )
# Module-level singleton Database instance.
# Constructing it only records the database path; no connection exists until
# connect() has been awaited.
db = Database()