""" Data Schema Definitions ======================= Defines the structure of data used throughout the Traffic Accident Reconstruction System. """ from dataclasses import dataclass, field from typing import List, Tuple, Optional, Dict, Any from enum import Enum from datetime import datetime # ============================================================ # ENUMERATIONS # ============================================================ class VehicleType(Enum): """Types of vehicles.""" SEDAN = "sedan" SUV = "suv" TRUCK = "truck" MOTORCYCLE = "motorcycle" BUS = "bus" class Direction(Enum): """Cardinal and intercardinal directions.""" NORTH = "north" NORTHEAST = "northeast" EAST = "east" SOUTHEAST = "southeast" SOUTH = "south" SOUTHWEST = "southwest" WEST = "west" NORTHWEST = "northwest" class VehicleAction(Enum): """Possible vehicle actions.""" GOING_STRAIGHT = "going_straight" TURNING_LEFT = "turning_left" TURNING_RIGHT = "turning_right" ENTERING_ROUNDABOUT = "entering_roundabout" EXITING_ROUNDABOUT = "exiting_roundabout" CHANGING_LANE_LEFT = "changing_lane_left" CHANGING_LANE_RIGHT = "changing_lane_right" SLOWING_DOWN = "slowing_down" ACCELERATING = "accelerating" STOPPED = "stopped" class WeatherCondition(Enum): """Weather conditions.""" CLEAR = "clear" CLOUDY = "cloudy" RAINY = "rainy" FOGGY = "foggy" SANDSTORM = "sandstorm" class RoadCondition(Enum): """Road surface conditions.""" DRY = "dry" WET = "wet" SANDY = "sandy" OILY = "oily" class AccidentType(Enum): """Types of traffic accidents.""" REAR_END_COLLISION = "rear_end_collision" SIDE_IMPACT = "side_impact" HEAD_ON_COLLISION = "head_on_collision" SIDESWIPE = "sideswipe" ROUNDABOUT_ENTRY_COLLISION = "roundabout_entry_collision" LANE_CHANGE_COLLISION = "lane_change_collision" INTERSECTION_COLLISION = "intersection_collision" class Severity(Enum): """Accident severity levels.""" MINOR = "minor" MODERATE = "moderate" SEVERE = "severe" class RoadType(Enum): """Types of roads.""" ROUNDABOUT = "roundabout" INTERSECTION = "intersection" HIGHWAY = "highway" URBAN_ROAD = "urban_road" # ============================================================ # DATA CLASSES # ============================================================ @dataclass class Location: """Geographic location data.""" name: str name_arabic: str = "" latitude: float = 0.0 longitude: float = 0.0 radius_meters: int = 150 city: str = "" country: str = "" road_type: RoadType = RoadType.ROUNDABOUT def to_dict(self) -> Dict[str, Any]: return { "name": self.name, "name_arabic": self.name_arabic, "latitude": self.latitude, "longitude": self.longitude, "radius_meters": self.radius_meters, "city": self.city, "country": self.country, "road_type": self.road_type.value } @dataclass class Conditions: """Environmental conditions at time of accident.""" weather: WeatherCondition = WeatherCondition.CLEAR road_condition: RoadCondition = RoadCondition.DRY visibility: float = 1.0 # 0.0 to 1.0 lighting: str = "daylight" # daylight, dusk, night, artificial def to_dict(self) -> Dict[str, Any]: return { "weather": self.weather.value, "road_condition": self.road_condition.value, "visibility": self.visibility, "lighting": self.lighting } @dataclass class VehicleData: """Data for a single vehicle.""" vehicle_type: VehicleType = VehicleType.SEDAN speed_kmh: float = 50.0 direction: Direction = Direction.NORTH action: VehicleAction = VehicleAction.GOING_STRAIGHT braking: bool = False signaling: bool = False lights_on: bool = True horn_used: bool = False path: List[Tuple[float, float]] = field(default_factory=list) description: str = "" @property def direction_angle(self) -> float: """Convert direction to angle in degrees.""" angles = { Direction.NORTH: 0, Direction.NORTHEAST: 45, Direction.EAST: 90, Direction.SOUTHEAST: 135, Direction.SOUTH: 180, Direction.SOUTHWEST: 225, Direction.WEST: 270, Direction.NORTHWEST: 315 } return angles.get(self.direction, 0) def to_dict(self) -> Dict[str, Any]: return { "type": self.vehicle_type.value, "speed_kmh": self.speed_kmh, "direction": self.direction.value, "direction_angle": self.direction_angle, "action": self.action.value, "braking": self.braking, "signaling": self.signaling, "lights_on": self.lights_on, "horn_used": self.horn_used, "path": self.path, "description": self.description } @dataclass class AccidentDetails: """Details about the accident.""" accident_type: AccidentType = AccidentType.SIDE_IMPACT severity: Severity = Severity.MODERATE collision_angle: float = 90.0 collision_point: Tuple[float, float] = (0.0, 0.0) contributing_factors: List[str] = field(default_factory=list) fault_vehicle: int = 0 # 0 = undetermined, 1 or 2 def to_dict(self) -> Dict[str, Any]: return { "type": self.accident_type.value, "severity": self.severity.value, "collision_angle": self.collision_angle, "collision_point": list(self.collision_point), "contributing_factors": self.contributing_factors, "fault_vehicle": self.fault_vehicle } @dataclass class ScenarioMetrics: """Metrics for an accident scenario.""" collision_probability: float = 0.5 path_overlap: float = 0.5 speed_differential: float = 0.0 time_to_collision: float = 0.0 impact_force_estimate: float = 0.0 def to_dict(self) -> Dict[str, Any]: return { "collision_probability": self.collision_probability, "path_overlap": self.path_overlap, "speed_differential": self.speed_differential, "time_to_collision": self.time_to_collision, "impact_force_estimate": self.impact_force_estimate } @dataclass class Scenario: """A single accident scenario.""" scenario_id: int = 0 accident_type: AccidentType = AccidentType.SIDE_IMPACT probability: float = 0.5 description: str = "" contributing_factors: List[str] = field(default_factory=list) metrics: ScenarioMetrics = field(default_factory=ScenarioMetrics) vehicle_1_path: List[Tuple[float, float]] = field(default_factory=list) vehicle_2_path: List[Tuple[float, float]] = field(default_factory=list) collision_point: Tuple[float, float] = (0.0, 0.0) def to_dict(self) -> Dict[str, Any]: return { "id": self.scenario_id, "accident_type": self.accident_type.value, "probability": self.probability, "description": self.description, "contributing_factors": self.contributing_factors, "metrics": self.metrics.to_dict(), "vehicle_1_path": self.vehicle_1_path, "vehicle_2_path": self.vehicle_2_path, "collision_point": list(self.collision_point) } @dataclass class FaultAssessment: """Preliminary fault assessment.""" vehicle_1_contribution: float = 50.0 # Percentage vehicle_2_contribution: float = 50.0 likely_at_fault: int = 0 # 0 = undetermined, 1 or 2 primary_factor: str = "" confidence: float = 0.5 def to_dict(self) -> Dict[str, Any]: return { "vehicle_1_contribution": self.vehicle_1_contribution, "vehicle_2_contribution": self.vehicle_2_contribution, "likely_at_fault": self.likely_at_fault, "primary_factor": self.primary_factor, "confidence": self.confidence } @dataclass class TimelineEvent: """An event in the accident timeline.""" time_offset: float = 0.0 # Seconds relative to collision (negative = before) event: str = "" def to_dict(self) -> Dict[str, Any]: return { "time": self.time_offset, "event": self.event } @dataclass class AccidentRecord: """Complete accident record.""" accident_id: str = "" timestamp: datetime = field(default_factory=datetime.now) location: Location = field(default_factory=Location) conditions: Conditions = field(default_factory=Conditions) vehicle_1: VehicleData = field(default_factory=VehicleData) vehicle_2: VehicleData = field(default_factory=VehicleData) accident_details: AccidentDetails = field(default_factory=AccidentDetails) def to_dict(self) -> Dict[str, Any]: return { "accident_id": self.accident_id, "timestamp": self.timestamp.isoformat(), "location": self.location.to_dict(), "conditions": self.conditions.to_dict(), "vehicle_1": self.vehicle_1.to_dict(), "vehicle_2": self.vehicle_2.to_dict(), "accident_details": self.accident_details.to_dict() } @dataclass class AnalysisResult: """Results from AI analysis.""" scenarios: List[Scenario] = field(default_factory=list) most_likely_scenario_id: int = 0 overall_collision_probability: float = 0.5 fault_assessment: FaultAssessment = field(default_factory=FaultAssessment) timeline: List[TimelineEvent] = field(default_factory=list) analysis_timestamp: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: return { "scenarios": [s.to_dict() for s in self.scenarios], "most_likely_scenario": { "id": self.most_likely_scenario_id, "probability": self.scenarios[self.most_likely_scenario_id - 1].probability if self.scenarios else 0 }, "overall_collision_probability": self.overall_collision_probability, "preliminary_fault_assessment": self.fault_assessment.to_dict(), "timeline": [e.to_dict() for e in self.timeline], "analysis_timestamp": self.analysis_timestamp.isoformat() } # ============================================================ # FEATURE VECTOR SCHEMA (for MindSpore) # ============================================================ FEATURE_SCHEMA = { "input_features": [ # Vehicle 1 features (7) "v1_type_encoded", # 0-4 normalized "v1_speed_normalized", # 0-1 (speed/200) "v1_direction_encoded", # 0-1 (direction/8) "v1_angle_normalized", # 0-1 (angle/360) "v1_action_encoded", # 0-1 (action/10) "v1_braking", # 0 or 1 "v1_signaling", # 0 or 1 # Vehicle 2 features (7) "v2_type_encoded", "v2_speed_normalized", "v2_direction_encoded", "v2_angle_normalized", "v2_action_encoded", "v2_braking", "v2_signaling", # Environmental features (3) "weather_encoded", # 0-1 (weather/5) "road_condition_encoded",# 0-1 (condition/4) "visibility", # 0-1 # Derived features (6) "collision_angle_normalized", # 0-1 (angle/180) "speed_differential", # 0-1 (diff/200) "combined_speed_normalized", # 0-1 (sum/400) "same_direction", # 0 or 1 "speed_product_normalized", # 0-1 "angle_difference_normalized" # -1 to 1 ], "total_input_features": 23, "output_labels": [ "rear_end_collision", "side_impact", "head_on_collision", "sideswipe", "roundabout_entry_collision", "lane_change_collision", "intersection_collision" ], "total_output_classes": 7 } # ============================================================ # VALIDATION FUNCTIONS # ============================================================ def validate_speed(speed: float, vehicle_type: VehicleType) -> bool: """Validate speed is within acceptable range for vehicle type.""" max_speeds = { VehicleType.SEDAN: 180, VehicleType.SUV: 160, VehicleType.TRUCK: 120, VehicleType.MOTORCYCLE: 200, VehicleType.BUS: 100 } return 0 <= speed <= max_speeds.get(vehicle_type, 200) def validate_coordinates(lat: float, lng: float) -> bool: """Validate geographic coordinates.""" return -90 <= lat <= 90 and -180 <= lng <= 180 def validate_path(path: List[Tuple[float, float]]) -> bool: """Validate vehicle path has at least 2 points.""" if len(path) < 2: return False return all(validate_coordinates(p[0], p[1]) for p in path) def validate_probability(prob: float) -> bool: """Validate probability is between 0 and 1.""" return 0 <= prob <= 1