# car_web2 / data / synthetic_dataset_generator.py
# wesam0099's picture
# Deploy REAL original app
# f158aab
"""
Synthetic Accident Dataset Generator
=====================================
Generates realistic synthetic traffic accident data for training
the MindSpore AI model.
This dataset simulates various accident scenarios at roundabouts
with different vehicle types, speeds, directions, and conditions.
"""
import numpy as np
import pandas as pd
import json
import random
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple, Any
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import (
CASE_STUDY_LOCATION,
VEHICLE_TYPES,
ACCIDENT_TYPES,
CONTRIBUTING_FACTORS,
ROAD_TYPES,
DATA_DIR,
PROCESSED_DATA_DIR
)
# ============================================================
# CONSTANTS FOR DATA GENERATION
# ============================================================
# Compass directions mapped to their angle in degrees.
# calculate_collision_angle() subtracts these angles to get the angle between
# two vehicles, and the key order doubles as the categorical encoding built in
# generate_training_features().
DIRECTIONS = {
    'north': 0,
    'northeast': 45,
    'east': 90,
    'southeast': 135,
    'south': 180,
    'southwest': 225,
    'west': 270,
    'northwest': 315
}
# Actions vehicles can take.
# The list position is used as the action's categorical encoding in
# generate_training_features(), so reordering changes the encoded features.
VEHICLE_ACTIONS = [
    'going_straight',
    'turning_left',
    'turning_right',
    'entering_roundabout',
    'exiting_roundabout',
    'changing_lane_left',
    'changing_lane_right',
    'slowing_down',
    'accelerating',
    'stopped'
]
# Weather conditions with probability weights (passed to random.choices in
# select_weather(); the weights below sum to 1.0).
WEATHER_CONDITIONS = {
    'clear': 0.55,
    'cloudy': 0.20,
    'rainy': 0.12,
    'foggy': 0.07,
    'sandstorm': 0.06
}
# Road conditions with probability weights. select_road_condition() only
# samples from these when the weather is neither 'rainy' nor 'sandstorm'.
ROAD_CONDITIONS = {
    'dry': 0.65,
    'wet': 0.18,
    'sandy': 0.12,
    'oily': 0.05
}
# Road types with probability weights (expanded); sampled in
# generate_single_accident().
ROAD_TYPE_WEIGHTS = {
    'roundabout': 0.30,
    'crossroad': 0.25,
    't_junction': 0.15,
    'highway_merge': 0.10,
    'parking': 0.05,
    'highway': 0.08,
    'urban_road': 0.05,
    'other': 0.02
}
# Time of day distribution: period name -> (start_hour, end_hour, weight).
# Both hours are inclusive because generate_timestamp() draws the hour with
# random.randint(start_hour, end_hour); the weight is the period's share of
# generated accidents.
TIME_DISTRIBUTION = {
    'morning_rush': (7, 9, 0.25),  # 7-9 AM, 25% of accidents
    'midday': (10, 15, 0.20),  # 10 AM - 3 PM, 20%
    'evening_rush': (16, 19, 0.30),  # 4-7 PM, 30%
    'night': (20, 23, 0.15),  # 8-11 PM, 15%
    'late_night': (0, 6, 0.10)  # Midnight - 6 AM, 10%
}
# Lighting conditions; these are the labels generate_single_accident() assigns
# and generate_training_features() encodes.
LIGHTING_CONDITIONS = ['daylight', 'dusk', 'dawn', 'night_lit', 'night_dark']
# ============================================================
# DATA SCHEMA DEFINITION
# ============================================================
# Descriptive schema of the nested record returned by
# generate_single_accident(). The values are type names (plain strings), not
# validators; save_dataset() dumps this dict to accident_schema.json as-is.
ACCIDENT_SCHEMA = {
    "accident_id": "string",
    "timestamp": "datetime",  # emitted as an ISO-8601 string
    "location": {
        "name": "string",
        "latitude": "float",
        "longitude": "float",
        "road_type": "string"
    },
    "conditions": {
        "weather": "string",
        "road_condition": "string",
        "visibility": "float",  # 0-1 scale
        "lighting": "string"  # daylight, dusk, dawn, night_lit, night_dark
    },
    "vehicle_1": {
        "type": "string",
        "speed_kmh": "float",
        "direction": "string",
        "direction_angle": "float",
        "action": "string",
        "braking": "boolean",
        "signaling": "boolean",
        "path": "list[tuple]"  # generated as a list of [lat, lng] lists
    },
    "vehicle_2": {
        "type": "string",
        "speed_kmh": "float",
        "direction": "string",
        "direction_angle": "float",
        "action": "string",
        "braking": "boolean",
        "signaling": "boolean",
        "path": "list[tuple]"  # generated as a list of [lat, lng] lists
    },
    "accident_details": {
        "type": "string",
        "severity": "string",  # minor, moderate, severe
        "collision_angle": "float",
        "collision_point": "tuple",  # generated as a [lat, lng] list
        "contributing_factors": "list[string]",
        "fault_vehicle": "int"  # 1 or 2
    },
    "outcomes": {
        "scenario_probability": "float",
        "damage_estimate": "string",  # currently mirrors the severity label
        "injuries": "boolean"
    }
}
# ============================================================
# HELPER FUNCTIONS
# ============================================================
def generate_accident_id() -> str:
    """Generate an accident ID of the form ``ACC-<YYYYmmddHHMMSS>-<suffix>``.

    The timestamp part only has one-second resolution and a whole dataset is
    usually generated within the same second, so uniqueness rests on the
    random suffix. Eight characters from a 36-symbol alphabet keep the
    chance of a duplicate negligible even for large batches, whereas a
    four-character suffix (36**4 values) collides with noticeable
    probability after about a thousand IDs.

    Returns:
        The generated identifier string.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    random_suffix = ''.join(random.choices(alphabet, k=8))
    return f"ACC-{timestamp}-{random_suffix}"
def generate_timestamp() -> datetime:
    """Generate an accident timestamp following TIME_DISTRIBUTION.

    A time-of-day period is drawn using the period weights, then a date up to
    365 days back and an hour/minute inside the period are drawn uniformly.
    Seconds and microseconds are zeroed.

    Returns:
        A naive ``datetime`` that is never later than the current time.
    """
    # Select time period based on weights
    periods = list(TIME_DISTRIBUTION.keys())
    period = random.choices(
        periods,
        weights=[TIME_DISTRIBUTION[name][2] for name in periods]
    )[0]
    start_hour, end_hour, _ = TIME_DISTRIBUTION[period]

    # Generate random date within last year
    now = datetime.now()
    base_date = now - timedelta(days=random.randint(0, 365))

    # Generate random time within period (both bounds inclusive)
    hour = random.randint(start_hour, end_hour)
    minute = random.randint(0, 59)
    stamp = base_date.replace(hour=hour, minute=minute, second=0, microsecond=0)

    # When the drawn date is today, the drawn hour may still lie ahead of the
    # clock; move such a timestamp back one day so it is never in the future.
    if stamp > now:
        stamp -= timedelta(days=1)
    return stamp
def select_weather() -> Tuple[str, float]:
    """Select a weather condition and a matching visibility value.

    The weather is drawn from WEATHER_CONDITIONS using its weights. The
    visibility is then drawn uniformly from the range associated with that
    weather; only this single draw is made, rather than one per weather type.

    Returns:
        ``(weather, visibility)`` where visibility lies between 0.1 and 1.0.
    """
    # (low, high) visibility bounds per weather condition
    visibility_ranges = {
        'clear': (0.9, 1.0),
        'cloudy': (0.8, 0.95),
        'rainy': (0.5, 0.8),
        'foggy': (0.2, 0.5),
        'sandstorm': (0.1, 0.4)
    }
    weather = random.choices(
        list(WEATHER_CONDITIONS.keys()),
        weights=list(WEATHER_CONDITIONS.values())
    )[0]
    low, high = visibility_ranges[weather]
    return weather, random.uniform(low, high)
def select_road_condition(weather: str) -> str:
    """Pick a road surface condition that is consistent with the weather.

    Rain always gives a wet road, a sandstorm gives sandy or dry with equal
    chance, and any other weather samples from ROAD_CONDITIONS by weight.
    """
    if weather == 'rainy':
        return 'wet'
    if weather == 'sandstorm':
        return random.choice(['sandy', 'dry'])

    # No weather-specific rule: fall back to the weighted distribution.
    condition_names = list(ROAD_CONDITIONS.keys())
    condition_weights = list(ROAD_CONDITIONS.values())
    drawn = random.choices(condition_names, weights=condition_weights)
    return drawn[0]
def generate_vehicle_data(vehicle_num: int, accident_type: str, road_type: str = 'roundabout') -> Dict:
    """Build the record for one vehicle involved in a simulated accident.

    Args:
        vehicle_num: 1 or 2; only matters for rear-end collisions, where the
            two vehicles get different speed ranges and actions.
        accident_type: Pre-selected accident type that steers the speed range
            and, for road types without their own rule, the action.
        road_type: Road layout; scales the speed and narrows the actions.

    Returns:
        Dict with keys type, speed_kmh, direction, direction_angle, action,
        braking, signaling and path.
    """
    # Vehicle type: the five weights are matched positionally to the keys of
    # VEHICLE_TYPES, so this assumes exactly five configured types
    # (random.choices raises ValueError otherwise) -- TODO confirm in config.
    type_names = list(VEHICLE_TYPES.keys())
    vehicle_type = random.choices(
        type_names,
        weights=[0.50, 0.30, 0.10, 0.05, 0.05]
    )[0]
    top_speed = VEHICLE_TYPES[vehicle_type]['max_speed']

    # Road-type scaling applied to the raw speed draw
    road_speed_scale = {
        'roundabout': 0.6,
        'crossroad': 0.7,
        't_junction': 0.65,
        'highway_merge': 0.9,
        'parking': 0.2,
        'highway': 1.0,
        'urban_road': 0.5,
        'other': 0.6
    }
    scale = road_speed_scale.get(road_type, 0.6)

    # Raw speed range depends on the accident type
    if accident_type == 'rear_end_collision':
        low, high = (20, 50) if vehicle_num == 1 else (40, 80)
    elif accident_type == 'head_on_collision':
        low, high = 50, 100
    elif accident_type in ('roundabout_entry_collision', 'intersection_collision'):
        low, high = 30, 60
    else:
        low, high = 30, top_speed * 0.7
    # Never exceed what the vehicle type can do
    speed = min(random.uniform(low, high) * scale, top_speed)

    direction = random.choice(list(DIRECTIONS.keys()))

    # Candidate actions: road-type rules first, accident-type rules otherwise
    actions_by_road = {
        'crossroad': ['going_straight', 'turning_left', 'turning_right', 'stopped'],
        't_junction': ['going_straight', 'turning_left', 'turning_right', 'stopped'],
        'highway_merge': ['going_straight', 'changing_lane_left', 'changing_lane_right', 'accelerating'],
        'parking': ['slowing_down', 'stopped', 'going_straight'],
        'highway': ['going_straight', 'changing_lane_left', 'changing_lane_right'],
    }
    action = None
    if road_type == 'roundabout':
        if accident_type == 'roundabout_entry_collision':
            candidates = ['entering_roundabout', 'going_straight']
        else:
            candidates = ['entering_roundabout', 'exiting_roundabout', 'going_straight']
    elif road_type in actions_by_road:
        candidates = actions_by_road[road_type]
    elif accident_type == 'lane_change_collision':
        candidates = ['changing_lane_left', 'changing_lane_right']
    elif accident_type == 'rear_end_collision':
        if vehicle_num == 2:
            # Fixed action, deliberately without a random draw
            candidates = []
            action = 'going_straight'
        else:
            candidates = ['slowing_down', 'stopped']
    else:
        candidates = VEHICLE_ACTIONS
    if action is None:
        action = random.choice(candidates)

    # 40% chance of braking, 30% chance of signaling
    is_braking = random.random() < 0.4
    is_signaling = random.random() < 0.3

    # Simplified path (entry point, intermediate, collision area)
    route = generate_vehicle_path(direction, accident_type)

    return {
        'type': vehicle_type,
        'speed_kmh': round(speed, 1),
        'direction': direction,
        'direction_angle': DIRECTIONS[direction],
        'action': action,
        'braking': is_braking,
        'signaling': is_signaling,
        'path': route
    }
def generate_vehicle_path(direction: str, accident_type: str) -> List[List[float]]:
    """Build a three-point path from an entry point toward the case-study center.

    The points are: the entry point for ``direction``, the point halfway
    between it and the center, and a randomly jittered point near the center.
    ``accident_type`` is accepted but not used.

    Returns:
        A list of three ``[latitude, longitude]`` pairs.
    """
    center_lat = CASE_STUDY_LOCATION['latitude']
    center_lng = CASE_STUDY_LOCATION['longitude']

    # (lat, lng) offset of the entry point for each approach direction
    entry_offsets = {
        'north': (0.002, 0),
        'south': (-0.002, 0),
        'east': (0, 0.002),
        'west': (0, -0.002),
        'northeast': (0.0015, 0.0015),
        'northwest': (0.0015, -0.0015),
        'southeast': (-0.0015, 0.0015),
        'southwest': (-0.0015, -0.0015)
    }
    # Unknown directions fall back to the northern entry
    lat_offset, lng_offset = entry_offsets.get(direction, (0.002, 0))

    entry_lat = center_lat + lat_offset
    entry_lng = center_lng + lng_offset
    entry_point = [entry_lat, entry_lng]
    halfway_point = [entry_lat - lat_offset * 0.5, entry_lng - lng_offset * 0.5]

    # Final point: collision zone, jittered around the center
    impact_lat = center_lat + random.uniform(-0.0003, 0.0003)
    impact_lng = center_lng + random.uniform(-0.0003, 0.0003)

    return [entry_point, halfway_point, [impact_lat, impact_lng]]
def calculate_collision_angle(v1_direction: str, v2_direction: str) -> float:
    """Return the smaller angle (0-180 degrees) between two compass directions.

    Both arguments must be keys of DIRECTIONS.
    """
    separation = abs(DIRECTIONS[v1_direction] - DIRECTIONS[v2_direction])
    # Fold reflex angles back so the result never exceeds 180 degrees
    return 360 - separation if separation > 180 else separation
def determine_accident_type(v1_direction: str, v2_direction: str,
                            v1_action: str, v2_action: str,
                            road_type: str = 'roundabout') -> str:
    """Classify the accident from the vehicles' directions, actions and road type.

    Angle-based rules take priority (head-on, rear-end, side impact); the
    remaining cases are decided by roundabout actions, lane changes and the
    road type, in that order.
    """
    angle = calculate_collision_angle(v1_direction, v2_direction)
    both_actions = (v1_action, v2_action)

    # Purely geometric cases first
    if angle > 150:
        return 'head_on_collision'      # roughly opposite directions
    if angle < 30:
        return 'rear_end_collision'     # roughly the same direction
    if 60 < angle < 120:
        return 'side_impact'            # roughly perpendicular

    # Roundabout specific
    if road_type == 'roundabout' and any('roundabout' in act for act in both_actions):
        return 'roundabout_entry_collision'

    # Lane change
    if any('changing_lane' in act for act in both_actions):
        return 'lane_change_collision'

    # Intersection/crossroad collision
    if road_type in ('crossroad', 't_junction'):
        return 'intersection_collision'

    # Shallow angles become a sideswipe, everything else an intersection collision
    return 'sideswipe' if 30 <= angle <= 60 else 'intersection_collision'
def determine_contributing_factors(
    v1_data: Dict,
    v2_data: Dict,
    weather: str,
    road_condition: str,
    road_type: str = 'roundabout'
) -> List[str]:
    """Determine contributing factors based on accident data.

    Args:
        v1_data: Vehicle 1 record (reads speed_kmh, direction, action, signaling).
        v2_data: Vehicle 2 record (same keys as ``v1_data``).
        weather: Weather label for the accident.
        road_condition: Road surface label; anything but 'dry' counts as a factor.
        road_type: Road layout; selects the speed limit and the yield rule.

    Returns:
        Up to four factor names, in the order the rules below are evaluated.
    """
    factors = []

    # Speed-related: either vehicle above the limit for this road type
    speed_limits = {
        'roundabout': 50, 'crossroad': 60, 't_junction': 50,
        'highway_merge': 80, 'parking': 20, 'highway': 120, 'urban_road': 50, 'other': 60
    }
    speed_limit = speed_limits.get(road_type, 60)
    if v1_data['speed_kmh'] > speed_limit or v2_data['speed_kmh'] > speed_limit:
        factors.append('speeding')

    # Following distance (for similar directions)
    collision_angle = calculate_collision_angle(v1_data['direction'], v2_data['direction'])
    if collision_angle < 30 and abs(v1_data['speed_kmh'] - v2_data['speed_kmh']) > 20:
        factors.append('following_too_closely')

    # Failure to yield
    if road_type == 'roundabout' and ('roundabout' in v1_data['action'] or 'roundabout' in v2_data['action']):
        factors.append('failure_to_yield')
    elif road_type in ['crossroad', 't_junction']:
        if random.random() < 0.4:
            factors.append('failure_to_yield')

    # Improper lane change
    if 'changing_lane' in v1_data['action'] or 'changing_lane' in v2_data['action']:
        factors.append('improper_lane_change')

    # Signaling: a turn or lane change without signaling counts for EITHER
    # vehicle, not just vehicle 1.
    def _missed_signal(vehicle: Dict) -> bool:
        action = vehicle['action']
        return not vehicle['signaling'] and ('turn' in action or 'changing' in action)

    if _missed_signal(v1_data) or _missed_signal(v2_data):
        factors.append('failure_to_signal')

    # Weather conditions
    if weather in ['rainy', 'foggy', 'sandstorm']:
        factors.append('weather_conditions')

    # Road conditions
    if road_condition != 'dry':
        factors.append('road_conditions')

    # Add some randomness: 30% of accidents get one extra human factor
    random_factors = ['distracted_driving', 'improper_turn', 'running_red_light', 'fatigue']
    if random.random() < 0.3:
        factors.append(random.choice(random_factors))

    return factors[:4]  # Limit to 4 factors
def determine_fault(v1_data: Dict, v2_data: Dict, accident_type: str) -> int:
    """Return 1 or 2 for the vehicle judged primarily at fault.

    Each vehicle collects one point for not signaling, not braking and for a
    risky action. The faster vehicle gets one more point (vehicle 2 on equal
    speed), and vehicle 2 gets two extra points in a rear-end collision.
    Vehicle 1 is blamed only when its score is strictly higher.
    """
    risky_actions = ('accelerating', 'changing_lane_left', 'changing_lane_right')

    def _behaviour_points(vehicle: Dict) -> int:
        # One point per risky behaviour; booleans add up as 0/1
        return (
            (not vehicle['signaling'])
            + (not vehicle['braking'])
            + (vehicle['action'] in risky_actions)
        )

    points_v1 = _behaviour_points(v1_data)
    points_v2 = _behaviour_points(v2_data)

    # Speed factor
    if v1_data['speed_kmh'] > v2_data['speed_kmh']:
        points_v1 += 1
    else:
        points_v2 += 1

    # Rear-end: usually rear vehicle at fault
    if accident_type == 'rear_end_collision':
        points_v2 += 2

    if points_v1 > points_v2:
        return 1
    return 2
def calculate_scenario_probability(
    v1_data: Dict,
    v2_data: Dict,
    weather: str,
    road_condition: str,
    accident_type: str,
    road_type: str = 'roundabout'
) -> float:
    """Score how likely this accident scenario is, clamped to [0.1, 0.95].

    Starts from 0.5 and adds adjustments for road type, collision angle,
    combined speed, weather, road surface, risky actions and missing braking,
    plus uniform noise in [-0.1, 0.1]. ``accident_type`` is accepted but not
    used in the calculation.
    """
    road_type_adjustment = {
        'roundabout': 0.05, 'crossroad': 0.1, 't_junction': 0.08,
        'highway_merge': 0.12, 'parking': -0.1, 'highway': 0.15,
        'urban_road': 0.03, 'other': 0.05
    }
    weather_adjustment = {
        'clear': 0, 'cloudy': 0.02, 'rainy': 0.08,
        'foggy': 0.1, 'sandstorm': 0.12
    }
    surface_adjustment = {'dry': 0, 'wet': 0.08, 'sandy': 0.1, 'oily': 0.15}
    risky_actions = ('changing_lane_left', 'changing_lane_right', 'accelerating', 'entering_roundabout')

    # The adjustments are added one by one, in a fixed order.
    score = 0.5
    score += road_type_adjustment.get(road_type, 0.05)

    # Collision angle: side impacts weigh most, then rear-end
    angle = calculate_collision_angle(v1_data['direction'], v2_data['direction'])
    if 60 < angle < 120:
        score += 0.15
    elif angle < 30:
        score += 0.1

    # Combined speed: one step above 100 km/h, a second step above 150 km/h
    total_speed = v1_data['speed_kmh'] + v2_data['speed_kmh']
    for threshold in (100, 150):
        if total_speed > threshold:
            score += 0.1

    score += weather_adjustment.get(weather, 0)
    score += surface_adjustment.get(road_condition, 0)

    # Each vehicle performing a risky action adds its own increment
    for vehicle in (v1_data, v2_data):
        if vehicle['action'] in risky_actions:
            score += 0.05

    # Neither vehicle braking increases risk
    if not (v1_data['braking'] or v2_data['braking']):
        score += 0.05

    # Add some randomness
    score += random.uniform(-0.1, 0.1)

    return max(0.1, min(0.95, score))
# ============================================================
# MAIN DATASET GENERATION
# ============================================================
def generate_single_accident() -> Dict:
    """Generate a single accident record.

    Draws the conditions, road type and two vehicles, then derives the
    accident type, collision details, contributing factors, fault, severity
    and scenario probability from them.

    Returns:
        A nested dict with the keys accident_id, timestamp (ISO string),
        location, conditions, vehicle_1, vehicle_2, accident_details and
        outcomes.
    """
    # Generate basic info
    accident_id = generate_accident_id()
    timestamp = generate_timestamp()
    weather, visibility = select_weather()
    road_condition = select_road_condition(weather)

    # Select road type
    road_type = random.choices(
        list(ROAD_TYPE_WEIGHTS.keys()),
        weights=list(ROAD_TYPE_WEIGHTS.values())
    )[0]

    # Determine lighting based on time, then reduce visibility accordingly
    # (daylight leaves it unchanged)
    lighting = _select_lighting(timestamp.hour)
    visibility_factor = {
        'night_dark': 0.6,
        'night_lit': 0.8,
        'dusk': 0.9,
        'dawn': 0.9
    }.get(lighting, 1.0)
    visibility = visibility * visibility_factor

    # Pre-select accident type; it only steers vehicle speeds and actions
    accident_type = random.choice(ACCIDENT_TYPES)

    # Generate vehicle data
    vehicle_1 = generate_vehicle_data(1, accident_type, road_type)
    vehicle_2 = generate_vehicle_data(2, accident_type, road_type)

    # The recorded accident type is recalculated from the actual vehicle data
    actual_accident_type = determine_accident_type(
        vehicle_1['direction'], vehicle_2['direction'],
        vehicle_1['action'], vehicle_2['action'],
        road_type
    )

    # Calculate collision details
    collision_angle = calculate_collision_angle(
        vehicle_1['direction'], vehicle_2['direction']
    )

    # Collision point (jittered around the case-study location)
    collision_point = [
        CASE_STUDY_LOCATION['latitude'] + random.uniform(-0.0005, 0.0005),
        CASE_STUDY_LOCATION['longitude'] + random.uniform(-0.0005, 0.0005)
    ]

    # Determine contributing factors
    factors = determine_contributing_factors(
        vehicle_1, vehicle_2, weather, road_condition, road_type
    )

    # Determine fault
    fault_vehicle = determine_fault(vehicle_1, vehicle_2, actual_accident_type)

    # Determine severity from the combined speed of both vehicles
    combined_speed = vehicle_1['speed_kmh'] + vehicle_2['speed_kmh']
    if combined_speed > 120:
        severity = 'severe'
    elif combined_speed > 80:
        severity = 'moderate'
    else:
        severity = 'minor'

    # Calculate probability
    probability = calculate_scenario_probability(
        vehicle_1, vehicle_2, weather, road_condition, actual_accident_type, road_type
    )

    return {
        'accident_id': accident_id,
        'timestamp': timestamp.isoformat(),
        'location': {
            'name': CASE_STUDY_LOCATION['name'],
            'latitude': CASE_STUDY_LOCATION['latitude'],
            'longitude': CASE_STUDY_LOCATION['longitude'],
            'road_type': road_type
        },
        'conditions': {
            'weather': weather,
            'road_condition': road_condition,
            'visibility': round(visibility, 2),
            'lighting': lighting
        },
        'vehicle_1': vehicle_1,
        'vehicle_2': vehicle_2,
        'accident_details': {
            'type': actual_accident_type,
            'severity': severity,
            'collision_angle': collision_angle,
            'collision_point': collision_point,
            'contributing_factors': factors,
            'fault_vehicle': fault_vehicle
        },
        'outcomes': {
            'scenario_probability': round(probability, 3),
            'damage_estimate': severity,
            # Only moderate/severe accidents can involve injuries (60% chance)
            'injuries': severity in ['moderate', 'severe'] and random.random() > 0.4
        }
    }


def _select_lighting(hour: int) -> str:
    """Map an hour of the day (0-23) to a lighting label.

    06:00 is dawn and 17:00-18:00 is dusk, so a morning hour can never be
    labelled 'dusk' nor an evening hour 'dawn'. Night hours choose randomly
    between lit and dark roads.
    """
    if 7 <= hour < 17:
        return 'daylight'
    if hour == 6:
        return 'dawn'
    if hour in (17, 18):
        return 'dusk'
    # Remaining hours: 19-23 and 0-5
    return random.choice(['night_lit', 'night_dark'])
def generate_dataset(num_samples: int = 1000) -> pd.DataFrame:
    """Generate ``num_samples`` accidents and return them as a flat DataFrame.

    Each nested record from generate_single_accident() is flattened into one
    row; vehicle paths, the collision point and the damage estimate are not
    carried over. Progress is printed every 1000 records.
    """
    print(f"Generating {num_samples} synthetic accident records...")

    # (column suffix, key in the vehicle record) in output column order
    vehicle_columns = (
        ('type', 'type'),
        ('speed', 'speed_kmh'),
        ('direction', 'direction'),
        ('direction_angle', 'direction_angle'),
        ('action', 'action'),
        ('braking', 'braking'),
        ('signaling', 'signaling'),
    )

    rows = []
    for count in range(1, num_samples + 1):
        if count % 1000 == 0:
            print(f" Generated {count}/{num_samples} records...")

        accident = generate_single_accident()
        location = accident['location']
        conditions = accident['conditions']
        details = accident['accident_details']
        outcomes = accident['outcomes']

        # Flatten for DataFrame: identification, location and conditions
        row = {
            'accident_id': accident['accident_id'],
            'timestamp': accident['timestamp'],
            'location_name': location['name'],
            'latitude': location['latitude'],
            'longitude': location['longitude'],
            'road_type': location['road_type'],
            'weather': conditions['weather'],
            'road_condition': conditions['road_condition'],
            'visibility': conditions['visibility'],
            'lighting': conditions['lighting'],
        }

        # Vehicle 1 columns first, then vehicle 2
        for prefix, vehicle_key in (('v1', 'vehicle_1'), ('v2', 'vehicle_2')):
            vehicle = accident[vehicle_key]
            for suffix, source_key in vehicle_columns:
                row[f'{prefix}_{suffix}'] = vehicle[source_key]

        # Accident details (factors are joined into one comma-separated string)
        row['accident_type'] = details['type']
        row['severity'] = details['severity']
        row['collision_angle'] = details['collision_angle']
        row['contributing_factors'] = ','.join(details['contributing_factors'])
        row['fault_vehicle'] = details['fault_vehicle']

        # Outcomes
        row['scenario_probability'] = outcomes['scenario_probability']
        row['injuries'] = outcomes['injuries']

        rows.append(row)

    df = pd.DataFrame(rows)
    print(f"Dataset generated with {len(df)} records.")
    return df
def generate_training_features(df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
    """
    Convert dataset to feature vectors for MindSpore training.

    Features (31 total):
    - Vehicle 1: type, speed, direction, angle, action, braking, signaling (7)
    - Vehicle 2: type, speed, direction, angle, action, braking, signaling (7)
    - Environment: weather, road_condition, visibility, lighting, road_type (5)
    - Derived: collision_angle, speed_diff, combined_speed, same_direction,
      speed_product, angle_diff, time_of_day, risk_score,
      v1_action_risk, v2_action_risk, relative_speed, approach_rate (12)

    Args:
        df: Flat accident table as produced by generate_dataset(). The
            'timestamp', 'lighting' and 'road_type' columns are optional;
            a missing or unparseable timestamp is treated as noon.

    Returns:
        X: float32 feature matrix (N x 31); (0 x 31) for an empty frame
        y: int32 labels - accident type encoded (N,)
    """
    num_features = 31

    # Encode categorical variables
    direction_encoding = {d: i for i, d in enumerate(DIRECTIONS.keys())}
    action_encoding = {a: i for i, a in enumerate(VEHICLE_ACTIONS)}
    vehicle_encoding = {v: i for i, v in enumerate(VEHICLE_TYPES.keys())}
    weather_encoding = {'clear': 0, 'cloudy': 1, 'rainy': 2, 'foggy': 3, 'sandstorm': 4}
    road_encoding = {'dry': 0, 'wet': 1, 'sandy': 2, 'oily': 3}
    road_type_encoding = {
        'roundabout': 0, 'crossroad': 1, 't_junction': 2, 'highway_merge': 3,
        'parking': 4, 'highway': 5, 'urban_road': 6, 'other': 7
    }
    lighting_encoding = {'daylight': 0, 'dusk': 1, 'dawn': 2, 'night_lit': 3, 'night_dark': 4}
    accident_encoding = {a: i for i, a in enumerate(ACCIDENT_TYPES)}

    # Action risk scores
    action_risk = {
        'going_straight': 0.3, 'turning_left': 0.5, 'turning_right': 0.4,
        'entering_roundabout': 0.6, 'exiting_roundabout': 0.5,
        'changing_lane_left': 0.7, 'changing_lane_right': 0.7,
        'slowing_down': 0.4, 'accelerating': 0.6, 'stopped': 0.2
    }

    # Condition risk tables (loop-invariant, so built once)
    weather_risk = {'clear': 0.1, 'cloudy': 0.2, 'rainy': 0.5, 'foggy': 0.7, 'sandstorm': 0.8}
    road_risk = {'dry': 0.1, 'wet': 0.5, 'sandy': 0.6, 'oily': 0.8}

    features = []
    labels = []

    # Plain dict records are much cheaper to iterate than DataFrame.iterrows()
    for row in df.to_dict('records'):
        # Extract time of day (hour) from timestamp if available
        try:
            parsed_time = pd.to_datetime(row['timestamp'])
        except (KeyError, ValueError, TypeError, OverflowError):
            parsed_time = None
        if parsed_time is None or pd.isna(parsed_time):
            hour = 12  # Default to noon (also for NaT, whose .hour is NaN)
        else:
            hour = parsed_time.hour

        # Calculate derived features
        v1_speed = row['v1_speed']
        v2_speed = row['v2_speed']
        v1_angle = row['v1_direction_angle']
        v2_angle = row['v2_direction_angle']
        collision_angle = row['collision_angle']
        speed_diff = abs(v1_speed - v2_speed)
        combined_speed = v1_speed + v2_speed
        same_direction = 1 if row['v1_direction'] == row['v2_direction'] else 0
        speed_product = v1_speed * v2_speed
        angle_diff = (v1_angle - v2_angle) % 360
        if angle_diff > 180:
            angle_diff = 360 - angle_diff

        # Risk score based on conditions
        base_risk = weather_risk.get(row['weather'], 0.3) + road_risk.get(row['road_condition'], 0.3)

        # Relative speed (closing speed)
        if angle_diff > 90:  # Approaching
            relative_speed = v1_speed + v2_speed
        else:  # Same direction
            relative_speed = abs(v1_speed - v2_speed)

        # Approach rate (how quickly vehicles are approaching collision)
        approach_rate = relative_speed * (1 - row['visibility']) * (1 + base_risk)

        feature_vector = [
            # Vehicle 1 features (7)
            vehicle_encoding.get(row['v1_type'], 0) / 5,
            v1_speed / 200,  # Normalize speed
            direction_encoding.get(row['v1_direction'], 0) / 8,
            v1_angle / 360,
            action_encoding.get(row['v1_action'], 0) / 10,
            1 if row['v1_braking'] else 0,
            1 if row['v1_signaling'] else 0,
            # Vehicle 2 features (7)
            vehicle_encoding.get(row['v2_type'], 0) / 5,
            v2_speed / 200,
            direction_encoding.get(row['v2_direction'], 0) / 8,
            v2_angle / 360,
            action_encoding.get(row['v2_action'], 0) / 10,
            1 if row['v2_braking'] else 0,
            1 if row['v2_signaling'] else 0,
            # Environmental features (5)
            weather_encoding.get(row['weather'], 0) / 5,
            road_encoding.get(row['road_condition'], 0) / 4,
            row['visibility'],
            lighting_encoding.get(row.get('lighting', 'daylight'), 0) / 5,
            road_type_encoding.get(row.get('road_type', 'roundabout'), 0) / 8,
            # Derived features (12)
            collision_angle / 180,
            speed_diff / 200,
            combined_speed / 400,
            same_direction,
            speed_product / 40000,
            angle_diff / 180,
            hour / 24,  # Time of day
            base_risk,  # Risk score
            action_risk.get(row['v1_action'], 0.5),  # V1 action risk
            action_risk.get(row['v2_action'], 0.5),  # V2 action risk
            relative_speed / 400,  # Relative/closing speed
            min(approach_rate / 200, 1.0),  # Approach rate (capped)
        ]
        features.append(feature_vector)
        # NOTE(review): accident types missing from ACCIDENT_TYPES silently
        # become class 0 -- confirm that config lists every type that
        # determine_accident_type() can return.
        labels.append(accident_encoding.get(row['accident_type'], 0))

    # reshape keeps the (N x 31) contract even when there are no rows
    X = np.array(features, dtype=np.float32).reshape(-1, num_features)
    y = np.array(labels, dtype=np.int32)
    return X, y
def generate_training_features_extended(df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build training features together with regression targets.

    Delegates to generate_training_features() and additionally extracts the
    'scenario_probability' column as float32.

    Returns:
        X: Feature matrix
        y_class: Classification labels (accident type)
        y_prob: Probability targets (for regression)
    """
    feature_matrix, class_labels = generate_training_features(df)
    probability_column = df['scenario_probability'].values
    probability_targets = probability_column.astype(np.float32)
    return feature_matrix, class_labels, probability_targets
def save_dataset(df: pd.DataFrame, filename: str = "synthetic_accidents"):
    """Write the dataset as CSV, JSON and NumPy features, plus the schema file.

    The three dataset files go to PROCESSED_DATA_DIR under ``filename``; the
    schema is written to DATA_DIR as accident_schema.json. Both directories
    are created when missing.

    Returns:
        Tuple ``(csv_path, json_path, np_path)`` of the written dataset files.
    """
    # Create directories if needed
    for directory in (DATA_DIR, PROCESSED_DATA_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    # CSV export
    csv_path = PROCESSED_DATA_DIR / f"{filename}.csv"
    df.to_csv(csv_path, index=False)
    print(f"Saved CSV: {csv_path}")

    # JSON export (one object per record)
    json_path = PROCESSED_DATA_DIR / f"{filename}.json"
    df.to_json(json_path, orient='records', indent=2)
    print(f"Saved JSON: {json_path}")

    # Training features derived from the same frame
    feature_matrix, label_vector = generate_training_features(df)
    np_path = PROCESSED_DATA_DIR / f"{filename}_features.npz"
    np.savez(np_path, X=feature_matrix, y=label_vector)
    print(f"Saved NumPy features: {np_path}")

    # Schema description
    schema_path = DATA_DIR / "accident_schema.json"
    schema_text = json.dumps(ACCIDENT_SCHEMA, indent=2)
    with open(schema_path, 'w') as schema_file:
        schema_file.write(schema_text)
    print(f"Saved schema: {schema_path}")

    return csv_path, json_path, np_path
def print_dataset_statistics(df: pd.DataFrame):
    """Print a summary of the generated dataset to stdout.

    Shows the record count, value counts for the main categorical columns,
    mean and standard deviation of both vehicle speeds, and the fault and
    injury distributions.
    """
    rule = "=" * 60

    def _print_counts(title: str, column: str) -> None:
        # Section header followed by the column's value counts
        print(f"\n--- {title} ---")
        print(df[column].value_counts())

    print("\n" + rule)
    print("DATASET STATISTICS")
    print(rule)
    print(f"\nTotal records: {len(df)}")

    for title, column in (
        ("Accident Types", 'accident_type'),
        ("Weather Conditions", 'weather'),
        ("Road Conditions", 'road_condition'),
        ("Severity Distribution", 'severity'),
        ("Vehicle Types (V1)", 'v1_type'),
    ):
        _print_counts(title, column)

    print("\n--- Speed Statistics ---")
    for label, column in (("V1", 'v1_speed'), ("V2", 'v2_speed')):
        speeds = df[column]
        print(f"{label} Speed: Mean={speeds.mean():.1f}, Std={speeds.std():.1f}")

    _print_counts("Fault Distribution", 'fault_vehicle')
    _print_counts("Injuries", 'injuries')
    print("\n" + rule)
# ============================================================
# MAIN EXECUTION
# ============================================================
if __name__ == "__main__":
    # Script entry point: generate 1000 records, report on them, save them.
    banner = "=" * 60

    print(banner)
    print("SYNTHETIC ACCIDENT DATASET GENERATOR")
    print("Huawei AI Innovation Challenge 2026")
    print(banner)

    # Build the dataset and show its statistics
    df = generate_dataset(num_samples=1000)
    print_dataset_statistics(df)

    # Persist every output format before announcing completion
    saved_paths = save_dataset(df)

    print("\n" + banner)
    print("DATASET GENERATION COMPLETE!")
    print(banner)
    print("\nFiles saved:")
    for saved_path in saved_paths:
        print(f" - {saved_path}")