| """ |
| Synthetic Accident Dataset Generator |
| ===================================== |
| Generates realistic synthetic traffic accident data for training |
| the MindSpore AI model. |
| |
| This dataset simulates various accident scenarios at roundabouts |
| with different vehicle types, speeds, directions, and conditions. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| import json |
| import random |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from typing import Dict, List, Tuple, Any |
|
|
| import sys |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| from config import ( |
| CASE_STUDY_LOCATION, |
| VEHICLE_TYPES, |
| ACCIDENT_TYPES, |
| CONTRIBUTING_FACTORS, |
| ROAD_TYPES, |
| DATA_DIR, |
| PROCESSED_DATA_DIR |
| ) |
|
|
|
|
| |
| |
| |
|
|
| |
# Compass headings mapped to their bearing in degrees (45-degree steps,
# starting at north).  Key order matters: the feature encoder enumerates
# these keys to derive integer codes.
DIRECTIONS = {
    heading: 45 * step
    for step, heading in enumerate((
        'north', 'northeast', 'east', 'southeast',
        'south', 'southwest', 'west', 'northwest',
    ))
}


# Manoeuvres a vehicle may be performing when the accident happens.
# List order matters for the same reason as DIRECTIONS.
VEHICLE_ACTIONS = [
    'going_straight', 'turning_left', 'turning_right',
    'entering_roundabout', 'exiting_roundabout',
    'changing_lane_left', 'changing_lane_right',
    'slowing_down', 'accelerating', 'stopped',
]


# Weather condition -> sampling weight (weights sum to 1.0).
WEATHER_CONDITIONS = dict(
    clear=0.55,
    cloudy=0.20,
    rainy=0.12,
    foggy=0.07,
    sandstorm=0.06,
)


# Road surface condition -> sampling weight (weights sum to 1.0).
ROAD_CONDITIONS = dict(
    dry=0.65,
    wet=0.18,
    sandy=0.12,
    oily=0.05,
)


# Road layout -> sampling weight (weights sum to 1.0).
ROAD_TYPE_WEIGHTS = dict(
    roundabout=0.30,
    crossroad=0.25,
    t_junction=0.15,
    highway_merge=0.10,
    parking=0.05,
    highway=0.08,
    urban_road=0.05,
    other=0.02,
)


# Period of day -> (first hour, last hour, sampling weight).
# Both hour bounds are inclusive; together the periods cover hours 0-23.
TIME_DISTRIBUTION = dict(
    morning_rush=(7, 9, 0.25),
    midday=(10, 15, 0.20),
    evening_rush=(16, 19, 0.30),
    night=(20, 23, 0.15),
    late_night=(0, 6, 0.10),
)


# All lighting labels a record may carry.
LIGHTING_CONDITIONS = [
    'daylight',
    'dusk',
    'dawn',
    'night_lit',
    'night_dark',
]
|
|
|
|
| |
| |
| |
|
|
# Field name -> type description shared by both vehicle records.
_VEHICLE_FIELDS = {
    "type": "string",
    "speed_kmh": "float",
    "direction": "string",
    "direction_angle": "float",
    "action": "string",
    "braking": "boolean",
    "signaling": "boolean",
    "path": "list[tuple]",
}

# Human-readable description of one nested accident record.  The values are
# type names (plain strings); the schema is written out as JSON next to the
# dataset and is not used for validation in this module.
ACCIDENT_SCHEMA = {
    "accident_id": "string",
    "timestamp": "datetime",
    "location": {
        "name": "string",
        "latitude": "float",
        "longitude": "float",
        "road_type": "string",
    },
    "conditions": {
        "weather": "string",
        "road_condition": "string",
        "visibility": "float",
        "lighting": "string",
    },
    # Independent copies so that editing one vehicle entry never affects
    # the other.
    "vehicle_1": dict(_VEHICLE_FIELDS),
    "vehicle_2": dict(_VEHICLE_FIELDS),
    "accident_details": {
        "type": "string",
        "severity": "string",
        "collision_angle": "float",
        "collision_point": "tuple",
        "contributing_factors": "list[string]",
        "fault_vehicle": "int",
    },
    "outcomes": {
        "scenario_probability": "float",
        "damage_estimate": "string",
        "injuries": "boolean",
    },
}
|
|
|
|
| |
| |
| |
|
|
def generate_accident_id() -> str:
    """Build an accident identifier of the form ``ACC-<YYYYmmddHHMMSS>-<XXXX>``.

    The middle part is the current local time at one-second resolution and
    the suffix is four characters drawn from A-Z / 0-9.  IDs created within
    the same second are therefore distinguished only by the random suffix,
    so uniqueness is very likely but not guaranteed.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    stamp = format(datetime.now(), "%Y%m%d%H%M%S")
    suffix = ''.join(random.choices(alphabet, k=4))
    return "ACC-" + stamp + "-" + suffix
|
|
|
|
def generate_timestamp() -> datetime:
    """Generate a realistic accident timestamp within the past year.

    A period of day is drawn according to the weights in TIME_DISTRIBUTION,
    then an hour inside that period (both bounds inclusive) and a minute are
    drawn uniformly.  The calendar day is chosen uniformly from 1 to 365 days
    before today.

    Returns:
        A naive ``datetime`` with seconds and microseconds set to zero.
    """
    period_names = list(TIME_DISTRIBUTION)
    period_weights = [window[2] for window in TIME_DISTRIBUTION.values()]
    period = random.choices(period_names, weights=period_weights)[0]
    start_hour, end_hour, _ = TIME_DISTRIBUTION[period]

    # Start at yesterday rather than today: the hour is replaced below, so a
    # zero-day offset could yield a time later today, i.e. an accident that
    # has not happened yet.
    days_ago = random.randint(1, 365)
    base_date = datetime.now() - timedelta(days=days_ago)

    hour = random.randint(start_hour, end_hour)
    minute = random.randint(0, 59)

    return base_date.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
|
|
def select_weather() -> Tuple[str, float]:
    """Pick a weather condition and a matching visibility value.

    The condition is drawn using the weights in WEATHER_CONDITIONS; the
    visibility is then drawn uniformly from the range associated with that
    condition.

    Returns:
        ``(weather, visibility)`` where visibility lies between 0.1 and 1.0.

    Raises:
        KeyError: if WEATHER_CONDITIONS contains a condition that has no
            visibility range defined here.
    """
    # (low, high) visibility bounds per condition.  Only the range for the
    # selected condition is sampled, so a call makes one visibility draw
    # instead of building five random values and discarding four.
    visibility_ranges = {
        'clear': (0.9, 1.0),
        'cloudy': (0.8, 0.95),
        'rainy': (0.5, 0.8),
        'foggy': (0.2, 0.5),
        'sandstorm': (0.1, 0.4),
    }

    weather = random.choices(
        list(WEATHER_CONDITIONS),
        weights=list(WEATHER_CONDITIONS.values()),
    )[0]

    low, high = visibility_ranges[weather]
    return weather, random.uniform(low, high)
|
|
|
|
def select_road_condition(weather: str) -> str:
    """Choose a road surface condition that is plausible for *weather*.

    Rain always gives a wet road, a sandstorm gives either a sandy or a dry
    road with equal chance, and any other weather falls back to the weighted
    ROAD_CONDITIONS distribution.
    """
    if weather == 'rainy':
        return 'wet'

    if weather == 'sandstorm':
        return random.choice(['sandy', 'dry'])

    surfaces = list(ROAD_CONDITIONS.keys())
    surface_weights = list(ROAD_CONDITIONS.values())
    return random.choices(surfaces, weights=surface_weights)[0]
|
|
|
|
def generate_vehicle_data(vehicle_num: int, accident_type: str, road_type: str = 'roundabout') -> Dict:
    """Generate one vehicle record for an accident scenario.

    Args:
        vehicle_num: 1 or 2.  Only matters for rear-end collisions, where
            vehicle 1 is given the lower speed range and vehicle 2 the higher.
        accident_type: Intended accident type; selects the speed range and,
            for road types without a dedicated action list, the action.
        road_type: Road layout; scales the speed and selects the candidate
            actions.  Unknown values use a 0.6 speed factor and the generic
            action rules.

    Returns:
        Dict with keys ``type``, ``speed_kmh``, ``direction``,
        ``direction_angle``, ``action``, ``braking``, ``signaling``, ``path``.

    Note:
        The vehicle-type weights are hard-coded for five entries, so this
        presumably relies on VEHICLE_TYPES having exactly five keys
        (``random.choices`` raises ``ValueError`` on a length mismatch).
    """
    type_names = list(VEHICLE_TYPES.keys())
    vehicle_type = random.choices(type_names, weights=[0.50, 0.30, 0.10, 0.05, 0.05])[0]
    specs = VEHICLE_TYPES[vehicle_type]

    # Slower layouts (parking, urban roads) scale the raw speed down.
    road_speed_factors = {
        'roundabout': 0.6,
        'crossroad': 0.7,
        't_junction': 0.65,
        'highway_merge': 0.9,
        'parking': 0.2,
        'highway': 1.0,
        'urban_road': 0.5,
        'other': 0.6,
    }
    factor = road_speed_factors.get(road_type, 0.6)

    # Raw speed range (km/h) before the road-type factor is applied.
    if accident_type == 'rear_end_collision':
        low, high = (20, 50) if vehicle_num == 1 else (40, 80)
    elif accident_type == 'head_on_collision':
        low, high = 50, 100
    elif accident_type in ('roundabout_entry_collision', 'intersection_collision'):
        low, high = 30, 60
    else:
        low, high = 30, specs['max_speed'] * 0.7

    # Never exceed what the vehicle type can physically do.
    speed = min(random.uniform(low, high) * factor, specs['max_speed'])

    direction = random.choice(list(DIRECTIONS))

    # Road types whose candidate actions do not depend on the accident type.
    actions_by_road = {
        'crossroad': ['going_straight', 'turning_left', 'turning_right', 'stopped'],
        't_junction': ['going_straight', 'turning_left', 'turning_right', 'stopped'],
        'highway_merge': ['going_straight', 'changing_lane_left', 'changing_lane_right', 'accelerating'],
        'parking': ['slowing_down', 'stopped', 'going_straight'],
        'highway': ['going_straight', 'changing_lane_left', 'changing_lane_right'],
    }

    if road_type == 'roundabout':
        if accident_type == 'roundabout_entry_collision':
            action = random.choice(['entering_roundabout', 'going_straight'])
        else:
            action = random.choice(['entering_roundabout', 'exiting_roundabout', 'going_straight'])
    elif road_type in actions_by_road:
        action = random.choice(actions_by_road[road_type])
    elif accident_type == 'lane_change_collision':
        action = random.choice(['changing_lane_left', 'changing_lane_right'])
    elif accident_type == 'rear_end_collision':
        # Vehicle 2 is the faster one driving into vehicle 1; no random draw
        # is made for it.
        if vehicle_num == 2:
            action = 'going_straight'
        else:
            action = random.choice(['slowing_down', 'stopped'])
    else:
        action = random.choice(VEHICLE_ACTIONS)

    # 40% of drivers were braking, 30% were signaling.
    braking = random.random() < 0.4
    signaling = random.random() < 0.3

    path = generate_vehicle_path(direction, accident_type)

    return {
        'type': vehicle_type,
        'speed_kmh': round(speed, 1),
        'direction': direction,
        'direction_angle': DIRECTIONS[direction],
        'action': action,
        'braking': braking,
        'signaling': signaling,
        'path': path,
    }
|
|
|
|
def generate_vehicle_path(direction: str, accident_type: str) -> List[List[float]]:
    """Build a three-point approach path ending near the case-study location.

    The path starts offset from the location along *direction*, passes the
    halfway point back toward the centre, and ends at the centre plus a small
    random jitter (up to 0.0003 degrees on each axis).

    Args:
        direction: Compass heading name; unknown names are treated as north.
        accident_type: Accepted for interface compatibility; not used.

    Returns:
        A list of three ``[latitude, longitude]`` pairs.
    """
    centre_lat = CASE_STUDY_LOCATION['latitude']
    centre_lng = CASE_STUDY_LOCATION['longitude']

    # (latitude delta, longitude delta) in degrees for each heading.
    offsets_by_heading = {
        'north': (0.002, 0),
        'south': (-0.002, 0),
        'east': (0, 0.002),
        'west': (0, -0.002),
        'northeast': (0.0015, 0.0015),
        'northwest': (0.0015, -0.0015),
        'southeast': (-0.0015, 0.0015),
        'southwest': (-0.0015, -0.0015),
    }
    d_lat, d_lng = offsets_by_heading.get(direction, (0.002, 0))

    start_lat = centre_lat + d_lat
    start_lng = centre_lng + d_lng

    entry_point = [start_lat, start_lng]
    halfway_point = [start_lat - d_lat * 0.5, start_lng - d_lng * 0.5]
    # Latitude jitter is drawn before longitude jitter.
    end_lat = centre_lat + random.uniform(-0.0003, 0.0003)
    end_lng = centre_lng + random.uniform(-0.0003, 0.0003)

    return [entry_point, halfway_point, [end_lat, end_lng]]
|
|
|
|
def calculate_collision_angle(v1_direction: str, v2_direction: str) -> float:
    """Return the smallest angle, in degrees, between two compass headings.

    The result is always in the range 0-180: 0 for identical headings and
    180 for opposite ones.

    Raises:
        KeyError: if either heading is not a key of DIRECTIONS.
    """
    separation = abs(DIRECTIONS[v1_direction] - DIRECTIONS[v2_direction])
    # Measure the short way round the compass.
    return 360 - separation if separation > 180 else separation
|
|
|
|
def determine_accident_type(v1_direction: str, v2_direction: str,
                            v1_action: str, v2_action: str,
                            road_type: str = 'roundabout') -> str:
    """Classify an accident from headings, actions and road layout.

    Rules are applied in priority order; the first match wins:

    1. angle > 150            -> ``head_on_collision``
    2. angle < 30             -> ``rear_end_collision``
    3. 60 < angle < 120       -> ``side_impact``
    4. roundabout road and either action mentions "roundabout"
                              -> ``roundabout_entry_collision``
    5. either action is a lane change -> ``lane_change_collision``
    6. crossroad / T-junction -> ``intersection_collision``
    7. 30 <= angle <= 60      -> ``sideswipe``
    8. anything else          -> ``intersection_collision``
    """
    angle = calculate_collision_angle(v1_direction, v2_direction)

    # Purely geometric cases take precedence over everything else.
    if angle > 150:
        return 'head_on_collision'
    if angle < 30:
        return 'rear_end_collision'
    if 60 < angle < 120:
        return 'side_impact'

    actions = (v1_action, v2_action)

    if road_type == 'roundabout' and any('roundabout' in act for act in actions):
        return 'roundabout_entry_collision'

    if any('changing_lane' in act for act in actions):
        return 'lane_change_collision'

    if road_type in ('crossroad', 't_junction'):
        return 'intersection_collision'

    return 'sideswipe' if 30 <= angle <= 60 else 'intersection_collision'
|
|
|
|
def determine_contributing_factors(
    v1_data: Dict,
    v2_data: Dict,
    weather: str,
    road_condition: str,
    road_type: str = 'roundabout'
) -> List[str]:
    """Derive the contributing factors of an accident from its data.

    Args:
        v1_data: Vehicle 1 record; reads ``speed_kmh``, ``direction``,
            ``action`` and ``signaling``.
        v2_data: Vehicle 2 record; same keys as ``v1_data``.
        weather: Weather condition name.
        road_condition: Road surface condition name.
        road_type: Road layout; selects the speed limit (60 km/h if unknown).

    Returns:
        Up to four factor names, in the order the checks below run.  Later
        factors are dropped when more than four apply.
    """
    factors = []
    vehicles = (v1_data, v2_data)

    # Speed limit in km/h for each road layout.
    speed_limits = {
        'roundabout': 50, 'crossroad': 60, 't_junction': 50,
        'highway_merge': 80, 'parking': 20, 'highway': 120,
        'urban_road': 50, 'other': 60,
    }
    speed_limit = speed_limits.get(road_type, 60)

    if any(vehicle['speed_kmh'] > speed_limit for vehicle in vehicles):
        factors.append('speeding')

    # Same heading with a large speed gap suggests tailgating.
    collision_angle = calculate_collision_angle(v1_data['direction'], v2_data['direction'])
    if collision_angle < 30 and abs(v1_data['speed_kmh'] - v2_data['speed_kmh']) > 20:
        factors.append('following_too_closely')

    if road_type == 'roundabout' and any('roundabout' in vehicle['action'] for vehicle in vehicles):
        factors.append('failure_to_yield')
    elif road_type in ('crossroad', 't_junction'):
        # At junctions a yield failure is only assumed 40% of the time.
        if random.random() < 0.4:
            factors.append('failure_to_yield')

    if any('changing_lane' in vehicle['action'] for vehicle in vehicles):
        factors.append('improper_lane_change')

    # Either driver turning or changing lane without signaling counts.
    # Previously only vehicle 1 was inspected, unlike every other check here.
    def _manoeuvred_without_signal(vehicle: Dict) -> bool:
        manoeuvre = vehicle['action']
        return not vehicle['signaling'] and ('turn' in manoeuvre or 'changing' in manoeuvre)

    if any(_manoeuvred_without_signal(vehicle) for vehicle in vehicles):
        factors.append('failure_to_signal')

    if weather in ('rainy', 'foggy', 'sandstorm'):
        factors.append('weather_conditions')

    if road_condition != 'dry':
        factors.append('road_conditions')

    # 30% of accidents get one extra factor that the data cannot explain.
    random_factors = ['distracted_driving', 'improper_turn', 'running_red_light', 'fatigue']
    if random.random() < 0.3:
        factors.append(random.choice(random_factors))

    return factors[:4]
|
|
|
|
def determine_fault(v1_data: Dict, v2_data: Dict, accident_type: str) -> int:
    """Decide which vehicle (1 or 2) is primarily at fault.

    Each vehicle collects one fault point for: not signaling, not braking,
    and performing a risky action (accelerating or changing lane).  One more
    point goes to vehicle 1 if it is strictly faster, otherwise to vehicle 2.
    In a rear-end collision vehicle 2 gets two additional points.

    Returns:
        1 if vehicle 1 has strictly more points, otherwise 2 (ties go to
        vehicle 2).
    """
    risky_actions = ('accelerating', 'changing_lane_left', 'changing_lane_right')

    def _behaviour_points(vehicle: Dict) -> int:
        # One point per unsafe behaviour shown by this vehicle.
        points = 0
        if not vehicle['signaling']:
            points += 1
        if not vehicle['braking']:
            points += 1
        if vehicle['action'] in risky_actions:
            points += 1
        return points

    v1_is_faster = v1_data['speed_kmh'] > v2_data['speed_kmh']
    v1_total = _behaviour_points(v1_data) + (1 if v1_is_faster else 0)
    v2_total = _behaviour_points(v2_data) + (0 if v1_is_faster else 1)

    if accident_type == 'rear_end_collision':
        v2_total += 2

    if v1_total > v2_total:
        return 1
    return 2
|
|
|
|
def calculate_scenario_probability(
    v1_data: Dict,
    v2_data: Dict,
    weather: str,
    road_condition: str,
    accident_type: str,
    road_type: str = 'roundabout'
) -> float:
    """Score how likely this accident scenario is.

    Starts from 0.5 and adds adjustments for road type, collision angle,
    combined speed, weather, road surface, risky actions and absence of
    braking, plus uniform noise in [-0.1, 0.1].

    Args:
        v1_data: Vehicle 1 record; reads ``direction``, ``speed_kmh``,
            ``action`` and ``braking``.
        v2_data: Vehicle 2 record; same keys as ``v1_data``.
        weather: Weather condition name (unknown names add nothing).
        road_condition: Road surface name (unknown names add nothing).
        accident_type: Accepted for interface compatibility; not used.
        road_type: Road layout (unknown layouts add 0.05).

    Returns:
        The score clamped to the range [0.1, 0.95].
    """
    score = 0.5

    # Baseline risk of the road layout; parking areas lower the score.
    layout_adjustment = {
        'roundabout': 0.05, 'crossroad': 0.1, 't_junction': 0.08,
        'highway_merge': 0.12, 'parking': -0.1, 'highway': 0.15,
        'urban_road': 0.03, 'other': 0.05,
    }
    score += layout_adjustment.get(road_type, 0.05)

    # Perpendicular impacts weigh most, same-direction impacts next.
    angle = calculate_collision_angle(v1_data['direction'], v2_data['direction'])
    if 60 < angle < 120:
        score += 0.15
    elif angle < 30:
        score += 0.1

    # Each speed threshold crossed adds another 0.1.
    total_speed = v1_data['speed_kmh'] + v2_data['speed_kmh']
    for threshold in (100, 150):
        if total_speed > threshold:
            score += 0.1

    weather_adjustment = {
        'clear': 0, 'cloudy': 0.02, 'rainy': 0.08,
        'foggy': 0.1, 'sandstorm': 0.12,
    }
    score += weather_adjustment.get(weather, 0)

    surface_adjustment = {'dry': 0, 'wet': 0.08, 'sandy': 0.1, 'oily': 0.15}
    score += surface_adjustment.get(road_condition, 0)

    risky_actions = ('changing_lane_left', 'changing_lane_right', 'accelerating', 'entering_roundabout')
    for vehicle in (v1_data, v2_data):
        if vehicle['action'] in risky_actions:
            score += 0.05

    # Nobody braked at all.
    if not (v1_data['braking'] or v2_data['braking']):
        score += 0.05

    score += random.uniform(-0.1, 0.1)

    return max(0.1, min(0.95, score))
|
|
|
|
| |
| |
| |
|
|
def _lighting_for_hour(hour: int) -> str:
    """Map an hour of day (0-23) to a lighting label."""
    if 7 <= hour < 17:
        return 'daylight'
    if hour == 6:
        return 'dawn'
    if hour in (17, 18):
        return 'dusk'
    # Remaining hours (19-23 and 0-5) are night, lit or unlit at random.
    return random.choice(['night_lit', 'night_dark'])


def generate_single_accident() -> Dict:
    """Generate one nested accident record.

    Draws the environment (time, weather, road surface, road type, lighting),
    generates two vehicles, then derives the accident type, collision angle,
    contributing factors, fault, severity and scenario probability from them.

    Returns:
        Dict with the top-level keys ``accident_id``, ``timestamp`` (ISO
        string), ``location``, ``conditions``, ``vehicle_1``, ``vehicle_2``,
        ``accident_details`` and ``outcomes``.
    """
    accident_id = generate_accident_id()
    timestamp = generate_timestamp()
    weather, visibility = select_weather()
    road_condition = select_road_condition(weather)

    road_type = random.choices(
        list(ROAD_TYPE_WEIGHTS),
        weights=list(ROAD_TYPE_WEIGHTS.values()),
    )[0]

    # Lighting follows the hour.  The morning twilight hour is labelled
    # 'dawn' and the evening ones 'dusk'; previously the label was picked at
    # random from both, so a 06:00 record could be tagged 'dusk'.
    lighting = _lighting_for_hour(timestamp.hour)

    # Poorer lighting reduces the weather-based visibility; daylight keeps it.
    lighting_visibility = {
        'night_dark': 0.6,
        'night_lit': 0.8,
        'dusk': 0.9,
        'dawn': 0.9,
    }
    if lighting in lighting_visibility:
        visibility = visibility * lighting_visibility[lighting]

    # The intended type only steers vehicle generation (speeds, actions); the
    # type stored in the record is re-derived from the generated vehicles.
    accident_type = random.choice(ACCIDENT_TYPES)

    vehicle_1 = generate_vehicle_data(1, accident_type, road_type)
    vehicle_2 = generate_vehicle_data(2, accident_type, road_type)

    actual_accident_type = determine_accident_type(
        vehicle_1['direction'], vehicle_2['direction'],
        vehicle_1['action'], vehicle_2['action'],
        road_type
    )

    collision_angle = calculate_collision_angle(
        vehicle_1['direction'], vehicle_2['direction']
    )

    # Impact point: the case-study location with a small random jitter.
    collision_point = [
        CASE_STUDY_LOCATION['latitude'] + random.uniform(-0.0005, 0.0005),
        CASE_STUDY_LOCATION['longitude'] + random.uniform(-0.0005, 0.0005)
    ]

    factors = determine_contributing_factors(
        vehicle_1, vehicle_2, weather, road_condition, road_type
    )

    fault_vehicle = determine_fault(vehicle_1, vehicle_2, actual_accident_type)

    # Severity depends only on the combined speed of both vehicles.
    combined_speed = vehicle_1['speed_kmh'] + vehicle_2['speed_kmh']
    if combined_speed > 120:
        severity = 'severe'
    elif combined_speed > 80:
        severity = 'moderate'
    else:
        severity = 'minor'

    probability = calculate_scenario_probability(
        vehicle_1, vehicle_2, weather, road_condition, actual_accident_type, road_type
    )

    return {
        'accident_id': accident_id,
        'timestamp': timestamp.isoformat(),
        'location': {
            'name': CASE_STUDY_LOCATION['name'],
            'latitude': CASE_STUDY_LOCATION['latitude'],
            'longitude': CASE_STUDY_LOCATION['longitude'],
            'road_type': road_type
        },
        'conditions': {
            'weather': weather,
            'road_condition': road_condition,
            'visibility': round(visibility, 2),
            'lighting': lighting
        },
        'vehicle_1': vehicle_1,
        'vehicle_2': vehicle_2,
        'accident_details': {
            'type': actual_accident_type,
            'severity': severity,
            'collision_angle': collision_angle,
            'collision_point': collision_point,
            'contributing_factors': factors,
            'fault_vehicle': fault_vehicle
        },
        'outcomes': {
            'scenario_probability': round(probability, 3),
            # The damage estimate simply mirrors the severity label.
            'damage_estimate': severity,
            # Minor accidents never injure; others do 60% of the time.
            'injuries': severity in ['moderate', 'severe'] and random.random() > 0.4
        }
    }
|
|
|
|
def _flatten_accident(accident: Dict) -> Dict:
    """Turn one nested accident record into a flat, column-oriented dict."""
    location = accident['location']
    conditions = accident['conditions']
    details = accident['accident_details']
    outcomes = accident['outcomes']

    record = {
        'accident_id': accident['accident_id'],
        'timestamp': accident['timestamp'],
        'location_name': location['name'],
        'latitude': location['latitude'],
        'longitude': location['longitude'],
        'road_type': location['road_type'],
        'weather': conditions['weather'],
        'road_condition': conditions['road_condition'],
        'visibility': conditions['visibility'],
        'lighting': conditions['lighting'],
    }

    # Vehicle columns are prefixed v1_/v2_; the path is not exported.
    for prefix, vehicle_key in (('v1', 'vehicle_1'), ('v2', 'vehicle_2')):
        vehicle = accident[vehicle_key]
        record[f'{prefix}_type'] = vehicle['type']
        record[f'{prefix}_speed'] = vehicle['speed_kmh']
        record[f'{prefix}_direction'] = vehicle['direction']
        record[f'{prefix}_direction_angle'] = vehicle['direction_angle']
        record[f'{prefix}_action'] = vehicle['action']
        record[f'{prefix}_braking'] = vehicle['braking']
        record[f'{prefix}_signaling'] = vehicle['signaling']

    record['accident_type'] = details['type']
    record['severity'] = details['severity']
    record['collision_angle'] = details['collision_angle']
    # Factors are stored as one comma-separated string.
    record['contributing_factors'] = ','.join(details['contributing_factors'])
    record['fault_vehicle'] = details['fault_vehicle']

    record['scenario_probability'] = outcomes['scenario_probability']
    record['injuries'] = outcomes['injuries']

    return record


def generate_dataset(num_samples: int = 1000) -> pd.DataFrame:
    """Generate a flat DataFrame of synthetic accident records.

    Args:
        num_samples: Number of records to generate.

    Returns:
        DataFrame with one row per accident.  Progress is printed once per
        1000 records.
    """
    print(f"Generating {num_samples} synthetic accident records...")

    rows = []
    for count in range(1, num_samples + 1):
        if count % 1000 == 0:
            print(f" Generated {count}/{num_samples} records...")
        rows.append(_flatten_accident(generate_single_accident()))

    df = pd.DataFrame(rows)
    print(f"Dataset generated with {len(df)} records.")

    return df
|
|
|
|
def generate_training_features(df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
    """
    Convert dataset to feature vectors for MindSpore training.

    Features (31 total):
    - Vehicle 1: type, speed, direction, angle, action, braking, signaling (7)
    - Vehicle 2: type, speed, direction, angle, action, braking, signaling (7)
    - Environment: weather, road_condition, visibility, lighting, road_type (5)
    - Derived: collision_angle, speed_diff, combined_speed, same_direction,
      speed_product, angle_diff, time_of_day, risk_score,
      v1_action_risk, v2_action_risk, relative_speed, approach_rate (12)

    Categorical values are mapped to integer codes and divided by a fixed
    constant; values missing from an encoding table fall back to code 0.

    Args:
        df: Flat accident DataFrame with the v1_*/v2_* vehicle columns plus
            weather, road_condition, visibility, collision_angle and
            accident_type.  ``timestamp``, ``lighting`` and ``road_type`` are
            optional and default to hour 12, 'daylight' and 'roundabout'.

    Returns:
        X: Feature matrix (N x 31), float32; shape (0, 31) for an empty frame
        y: Labels - accident type encoded (N,), int32
    """
    n_features = 31

    direction_encoding = {d: i for i, d in enumerate(DIRECTIONS)}
    action_encoding = {a: i for i, a in enumerate(VEHICLE_ACTIONS)}
    vehicle_encoding = {v: i for i, v in enumerate(VEHICLE_TYPES)}
    weather_encoding = {'clear': 0, 'cloudy': 1, 'rainy': 2, 'foggy': 3, 'sandstorm': 4}
    road_encoding = {'dry': 0, 'wet': 1, 'sandy': 2, 'oily': 3}
    road_type_encoding = {
        'roundabout': 0, 'crossroad': 1, 't_junction': 2, 'highway_merge': 3,
        'parking': 4, 'highway': 5, 'urban_road': 6, 'other': 7
    }
    lighting_encoding = {'daylight': 0, 'dusk': 1, 'dawn': 2, 'night_lit': 3, 'night_dark': 4}
    accident_encoding = {a: i for i, a in enumerate(ACCIDENT_TYPES)}

    # Risk weight of each vehicle action (0.5 when the action is unknown).
    action_risk = {
        'going_straight': 0.3, 'turning_left': 0.5, 'turning_right': 0.4,
        'entering_roundabout': 0.6, 'exiting_roundabout': 0.5,
        'changing_lane_left': 0.7, 'changing_lane_right': 0.7,
        'slowing_down': 0.4, 'accelerating': 0.6, 'stopped': 0.2
    }

    # Loop-invariant risk tables (0.3 when the condition is unknown).
    weather_risk = {'clear': 0.1, 'cloudy': 0.2, 'rainy': 0.5, 'foggy': 0.7, 'sandstorm': 0.8}
    road_risk = {'dry': 0.1, 'wet': 0.5, 'sandy': 0.6, 'oily': 0.8}

    features = []
    labels = []

    for _, row in df.iterrows():
        # Hour of day, defaulting to noon when the timestamp is missing or
        # cannot be parsed.  Only parsing errors are caught, so unrelated
        # failures (and KeyboardInterrupt) are no longer silently swallowed.
        try:
            parsed = pd.to_datetime(row.get('timestamp'))
        except (ValueError, TypeError, OverflowError):
            parsed = None
        # None / NaT would otherwise leak a NaN into the feature matrix.
        hour = 12 if pd.isna(parsed) else parsed.hour

        v1_speed = row['v1_speed']
        v2_speed = row['v2_speed']
        v1_angle = row['v1_direction_angle']
        v2_angle = row['v2_direction_angle']
        collision_angle = row['collision_angle']

        speed_diff = abs(v1_speed - v2_speed)
        combined_speed = v1_speed + v2_speed
        same_direction = 1 if row['v1_direction'] == row['v2_direction'] else 0
        speed_product = v1_speed * v2_speed
        # Smallest angle between the two headings (0-180).
        angle_diff = (v1_angle - v2_angle) % 360
        if angle_diff > 180:
            angle_diff = 360 - angle_diff

        # NOTE: the sum of the two risks can reach 1.6, so this feature is
        # not confined to [0, 1] like most of the others.
        base_risk = weather_risk.get(row['weather'], 0.3) + road_risk.get(row['road_condition'], 0.3)

        # Opposing headings close at the sum of the speeds, otherwise at
        # their difference.
        if angle_diff > 90:
            relative_speed = v1_speed + v2_speed
        else:
            relative_speed = abs(v1_speed - v2_speed)

        approach_rate = relative_speed * (1 - row['visibility']) * (1 + base_risk)

        feature_vector = [
            # Vehicle 1
            vehicle_encoding.get(row['v1_type'], 0) / 5,
            v1_speed / 200,
            direction_encoding.get(row['v1_direction'], 0) / 8,
            v1_angle / 360,
            action_encoding.get(row['v1_action'], 0) / 10,
            1 if row['v1_braking'] else 0,
            1 if row['v1_signaling'] else 0,

            # Vehicle 2
            vehicle_encoding.get(row['v2_type'], 0) / 5,
            v2_speed / 200,
            direction_encoding.get(row['v2_direction'], 0) / 8,
            v2_angle / 360,
            action_encoding.get(row['v2_action'], 0) / 10,
            1 if row['v2_braking'] else 0,
            1 if row['v2_signaling'] else 0,

            # Environment
            weather_encoding.get(row['weather'], 0) / 5,
            road_encoding.get(row['road_condition'], 0) / 4,
            row['visibility'],
            lighting_encoding.get(row.get('lighting', 'daylight'), 0) / 5,
            road_type_encoding.get(row.get('road_type', 'roundabout'), 0) / 8,

            # Derived
            collision_angle / 180,
            speed_diff / 200,
            combined_speed / 400,
            same_direction,
            speed_product / 40000,
            angle_diff / 180,
            hour / 24,
            base_risk,
            action_risk.get(row['v1_action'], 0.5),
            action_risk.get(row['v2_action'], 0.5),
            relative_speed / 400,
            min(approach_rate / 200, 1.0),
        ]

        features.append(feature_vector)
        # NOTE(review): accident types missing from ACCIDENT_TYPES are
        # labelled as class 0 - confirm every generated type is listed there.
        labels.append(accident_encoding.get(row['accident_type'], 0))

    if features:
        X = np.array(features, dtype=np.float32)
    else:
        # Keep the documented two-dimensional shape for an empty dataset.
        X = np.empty((0, n_features), dtype=np.float32)
    y = np.array(labels, dtype=np.int32)

    return X, y
|
|
|
|
def generate_training_features_extended(df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build training features together with regression targets.

    Delegates to ``generate_training_features`` for the feature matrix and
    class labels, and reads the ``scenario_probability`` column as float32
    for the probability targets.

    Returns:
        X: Feature matrix
        y_class: Classification labels (accident type)
        y_prob: Probability targets (for regression)
    """
    feature_matrix, class_labels = generate_training_features(df)
    probability_targets = df['scenario_probability'].values.astype(np.float32)
    return feature_matrix, class_labels, probability_targets
|
|
|
|
def save_dataset(df: pd.DataFrame, filename: str = "synthetic_accidents"):
    """Write the dataset to disk as CSV, JSON and NumPy features.

    Creates DATA_DIR and PROCESSED_DATA_DIR if needed, then writes
    ``<filename>.csv``, ``<filename>.json`` and ``<filename>_features.npz``
    into PROCESSED_DATA_DIR and ``accident_schema.json`` into DATA_DIR.
    Each written path is printed.

    Returns:
        ``(csv_path, json_path, np_path)``; the schema path is not returned.
    """
    for directory in (DATA_DIR, PROCESSED_DATA_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    # Tabular exports.
    csv_path = PROCESSED_DATA_DIR / f"{filename}.csv"
    df.to_csv(csv_path, index=False)
    print(f"Saved CSV: {csv_path}")

    json_path = PROCESSED_DATA_DIR / f"{filename}.json"
    df.to_json(json_path, orient='records', indent=2)
    print(f"Saved JSON: {json_path}")

    # Encoded features and labels for model training.
    feature_matrix, labels = generate_training_features(df)
    np_path = PROCESSED_DATA_DIR / f"{filename}_features.npz"
    np.savez(np_path, X=feature_matrix, y=labels)
    print(f"Saved NumPy features: {np_path}")

    # Schema description of the nested record format.
    schema_path = DATA_DIR / "accident_schema.json"
    schema_text = json.dumps(ACCIDENT_SCHEMA, indent=2)
    with open(schema_path, 'w') as schema_file:
        schema_file.write(schema_text)
    print(f"Saved schema: {schema_path}")

    return csv_path, json_path, np_path
|
|
|
|
def print_dataset_statistics(df: pd.DataFrame):
    """Print a summary of the dataset to standard output.

    Shows the record count, value counts for the main categorical columns,
    and the mean and standard deviation of both vehicle speeds.
    """
    rule = "=" * 60

    def _show_counts(title: str, column: str) -> None:
        # Section header followed by the value counts of one column.
        print(f"\n--- {title} ---")
        print(df[column].value_counts())

    print("\n" + rule)
    print("DATASET STATISTICS")
    print(rule)

    print(f"\nTotal records: {len(df)}")

    for title, column in (
        ("Accident Types", 'accident_type'),
        ("Weather Conditions", 'weather'),
        ("Road Conditions", 'road_condition'),
        ("Severity Distribution", 'severity'),
        ("Vehicle Types (V1)", 'v1_type'),
    ):
        _show_counts(title, column)

    print("\n--- Speed Statistics ---")
    v1_speed = df['v1_speed']
    v2_speed = df['v2_speed']
    print(f"V1 Speed: Mean={v1_speed.mean():.1f}, Std={v1_speed.std():.1f}")
    print(f"V2 Speed: Mean={v2_speed.mean():.1f}, Std={v2_speed.std():.1f}")

    for title, column in (
        ("Fault Distribution", 'fault_vehicle'),
        ("Injuries", 'injuries'),
    ):
        _show_counts(title, column)

    print("\n" + rule)
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Script entry point: generate 1000 records, summarise them, save them.
    banner = "=" * 60

    print(banner)
    print("SYNTHETIC ACCIDENT DATASET GENERATOR")
    print("Huawei AI Innovation Challenge 2026")
    print(banner)

    df = generate_dataset(num_samples=1000)
    print_dataset_statistics(df)
    csv_path, json_path, np_path = save_dataset(df)

    print("\n" + banner)
    print("DATASET GENERATION COMPLETE!")
    print(banner)

    print("\nFiles saved:")
    for saved_path in (csv_path, json_path, np_path):
        print(f" - {saved_path}")
|
|