| | """ |
| | MindSpore Model Loader |
| | ====================== |
| | Loads the trained MindSpore model for accident prediction. |
| | Supports both MindSpore (.ckpt) and NumPy fallback (.npz). |
| | """ |
| |
|
| | import numpy as np |
| | import json |
| | from pathlib import Path |
| | from typing import Dict, Tuple, Optional |
| |
|
| | |
# MindSpore is an optional dependency. Probe for it once at import time and
# record the outcome so the rest of the module can choose a backend.
MINDSPORE_AVAILABLE = False
try:
    import mindspore as ms
    import mindspore.nn as nn
    from mindspore import Tensor, context, load_checkpoint, load_param_into_net
    import mindspore.ops as ops
except ImportError:
    print("⚠️ MindSpore not available, using NumPy fallback")
else:
    # Only reached when every import above succeeded.
    MINDSPORE_AVAILABLE = True
    print(f"✅ MindSpore {ms.__version__} available")
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _index_map(*names):
    """Return a dict mapping each name to its 0-based position in *names*."""
    return {name: index for index, name in enumerate(names)}


# Label -> integer code lookup tables. Codes are simply the position of the
# label in the listing, so the order below is significant.

# Compass headings in 45-degree steps, starting at north.
DIRECTION_MAP = _index_map(
    'north', 'northeast', 'east', 'southeast',
    'south', 'southwest', 'west', 'northwest',
)

# What a vehicle was doing at the time of the incident.
ACTION_MAP = _index_map(
    'going_straight', 'turning_left', 'turning_right',
    'entering_roundabout', 'exiting_roundabout',
    'changing_lane_left', 'changing_lane_right',
    'slowing_down', 'accelerating', 'stopped',
)

VEHICLE_MAP = _index_map('sedan', 'suv', 'truck', 'motorcycle', 'bus')
WEATHER_MAP = _index_map('clear', 'cloudy', 'rainy', 'foggy', 'sandstorm')
ROAD_COND_MAP = _index_map('dry', 'wet', 'sandy', 'oily')
ROAD_TYPE_MAP = _index_map(
    'roundabout', 'crossroad', 't_junction', 'highway_merge',
    'parking', 'highway', 'urban_road', 'other',
)
LIGHTING_MAP = _index_map('daylight', 'dusk', 'dawn', 'night_lit', 'night_dark')

# Accident type label -> class index.
ACCIDENT_MAP = _index_map(
    'rear_end_collision',
    'side_impact',
    'head_on_collision',
    'sideswipe',
    'roundabout_entry_collision',
    'lane_change_collision',
    'intersection_collision',
)

# Reverse lookup: class index -> accident type label.
ACCIDENT_NAMES = dict(zip(ACCIDENT_MAP.values(), ACCIDENT_MAP.keys()))
| |
|
| |
|
| | |
| | |
| | |
| |
|
if MINDSPORE_AVAILABLE:
    class AccidentClassifier(nn.Cell):
        """
        MindSpore feed-forward network for traffic accident classification.

        Layer widths: input_dim -> 128 -> 64 -> 32 -> num_classes
        (31 -> 128 -> 64 -> 32 -> 7 with the defaults). Every hidden stage
        is Dense -> BatchNorm1d -> ReLU -> Dropout. The output stage is a
        bare Dense layer, so ``construct`` returns raw scores without a
        softmax applied.
        """

        def __init__(self, input_dim=31, num_classes=7):
            super().__init__()

            # NOTE(review): attribute names presumably determine the
            # parameter names stored in checkpoints -- do not rename them
            # without verifying against existing .ckpt files.

            # Hidden stage 1: input_dim -> 128
            self.fc1 = nn.Dense(input_dim, 128)
            self.bn1 = nn.BatchNorm1d(128)
            self.relu1 = nn.ReLU()
            self.dropout1 = nn.Dropout(p=0.3)

            # Hidden stage 2: 128 -> 64
            self.fc2 = nn.Dense(128, 64)
            self.bn2 = nn.BatchNorm1d(64)
            self.relu2 = nn.ReLU()
            self.dropout2 = nn.Dropout(p=0.3)

            # Hidden stage 3: 64 -> 32 (lighter dropout than stages 1-2)
            self.fc3 = nn.Dense(64, 32)
            self.bn3 = nn.BatchNorm1d(32)
            self.relu3 = nn.ReLU()
            self.dropout3 = nn.Dropout(p=0.2)

            # Output stage: 32 -> num_classes
            self.fc4 = nn.Dense(32, num_classes)

        def construct(self, x):
            """Run the three hidden stages in order and return the output scores."""
            hidden = self.dropout1(self.relu1(self.bn1(self.fc1(x))))
            hidden = self.dropout2(self.relu2(self.bn2(self.fc2(hidden))))
            hidden = self.dropout3(self.relu3(self.bn3(self.fc3(hidden))))
            return self.fc4(hidden)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class NumpyAccidentModel:
    """NumPy fallback when MindSpore is not available.

    A four-layer perceptron (input_dim -> 128 -> 64 -> 32 -> num_classes)
    with ReLU activations and a softmax output, evaluated with plain matrix
    products.

    NOTE(review): ``forward`` applies no batch-norm step, so weights exported
    from a network that uses batch norm would presumably need it folded into
    W/b beforehand -- verify against whatever writes the .npz file.
    """

    def __init__(self, input_dim=31, num_classes=7):
        """Create the model with deterministic He-style random weights.

        Args:
            input_dim: Number of input features.
            num_classes: Number of output classes.
        """
        self.input_dim = input_dim
        self.num_classes = num_classes
        self.trained = False

        # A private generator seeded with 42 draws the same values that
        # np.random.seed(42) followed by np.random.randn would, but without
        # reseeding the process-wide RNG as a side effect of construction.
        rng = np.random.RandomState(42)
        self.W1 = rng.randn(input_dim, 128) * np.sqrt(2.0 / input_dim)
        self.b1 = np.zeros(128)
        self.W2 = rng.randn(128, 64) * np.sqrt(2.0 / 128)
        self.b2 = np.zeros(64)
        self.W3 = rng.randn(64, 32) * np.sqrt(2.0 / 64)
        self.b3 = np.zeros(32)
        self.W4 = rng.randn(32, num_classes) * np.sqrt(2.0 / 32)
        self.b4 = np.zeros(num_classes)

    def relu(self, x):
        """Element-wise max(0, x)."""
        return np.maximum(0, x)

    def softmax(self, x):
        """Softmax over the last axis."""
        # Subtract the row maximum first so exp() cannot overflow.
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward(self, x):
        """Return class probabilities for a batch ``x`` of feature rows."""
        x = self.relu(x @ self.W1 + self.b1)
        x = self.relu(x @ self.W2 + self.b2)
        x = self.relu(x @ self.W3 + self.b3)
        x = x @ self.W4 + self.b4
        return self.softmax(x)

    def predict(self, x):
        """Classify one sample or a batch.

        Args:
            x: A 1-D feature vector or a 2-D batch; array-likes such as
                lists are accepted.

        Returns:
            Tuple ``(classes, probs)``: the arg-max class index per row and
            the full probability matrix (one row per sample).
        """
        x = np.asarray(x)
        if x.ndim == 1:
            x = x.reshape(1, -1)
        probs = self.forward(x)
        classes = np.argmax(probs, axis=1)
        return classes, probs

    def load(self, filepath):
        """Load weights from an ``.npz`` archive.

        Required arrays: ``W1, b1, W2, b2, W3, b3``. The output layer is
        read from ``W4``/``b4`` or, failing that, ``weight_4``/``bias_4``.
        If neither name is present the output layer is filled in best-effort
        (small random weights / zero bias) and a warning is printed.

        Raises:
            KeyError: If one of the required arrays is missing. The model's
                current weights are left untouched in that case.
        """
        # The context manager closes the archive's file handle; arrays are
        # fully read into memory when indexed, so they outlive the block.
        with np.load(filepath) as data:
            stored = set(data.files)

            def first_present(*names):
                # `data.files` is used instead of NpzFile.get(), which is
                # believed to be unavailable on older NumPy releases.
                for name in names:
                    if name in stored:
                        return data[name]
                return None

            W1, b1 = data['W1'], data['b1']
            W2, b2 = data['W2'], data['b2']
            W3, b3 = data['W3'], data['b3']
            W4 = first_present('W4', 'weight_4')
            b4 = first_present('b4', 'bias_4')

        if W4 is None:
            # Best-effort fallback kept from the original behaviour, but
            # built lazily, sized from the loaded layers, and announced.
            print(f"⚠️ No output-layer weights in {filepath}; using random initialisation")
            W4 = np.random.randn(W3.shape[1], self.num_classes) * 0.1
        if b4 is None:
            b4 = np.zeros(W4.shape[1])

        # Commit only after everything was read, so a missing key cannot
        # leave the model half-overwritten.
        self.W1, self.b1 = W1, b1
        self.W2, self.b2 = W2, b2
        self.W3, self.b3 = W3, b3
        self.W4, self.b4 = W4, b4
        self.trained = True
        print(f"✅ NumPy model loaded from {filepath}")
| |
|
| |
|
| | |
| | |
| | |
| |
|
def extract_features(accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> np.ndarray:
    """
    Build the 31-element float32 feature vector for one two-vehicle scenario.

    Layout: 7 values for vehicle 1, 7 for vehicle 2, 5 environment values,
    then 12 derived interaction values. Missing keys fall back to defaults
    (speed 50, type 'sedan', action 'going_straight', direction 'north' for
    vehicle 1 and 'east' for vehicle 2, clear/dry/daylight/roundabout).

    The arithmetic must stay exactly as it is: it has to match the feature
    extraction used at training time.
    """
    risk_by_action = {
        'going_straight': 0.3, 'turning_left': 0.5, 'turning_right': 0.4,
        'entering_roundabout': 0.6, 'exiting_roundabout': 0.5,
        'changing_lane_left': 0.7, 'changing_lane_right': 0.7,
        'slowing_down': 0.4, 'accelerating': 0.6, 'stopped': 0.2
    }

    def encode_vehicle(vehicle, default_direction, default_index):
        # Returns (7 per-vehicle features, heading index, raw speed, action risk).
        speed = vehicle.get('speed', 50)
        heading = DIRECTION_MAP.get(vehicle.get('direction', default_direction), default_index)
        heading_norm = heading / 8
        action_name = vehicle.get('action', 'going_straight')
        block = [
            VEHICLE_MAP.get(vehicle.get('type', 'sedan'), 0) / 5,
            speed / 200,
            heading_norm,
            # NOTE(review): scales the already-normalised heading again;
            # looks unintended, but is kept to match training.
            heading_norm * 45 / 360,
            ACTION_MAP.get(action_name, 0) / 10,
            1.0 if vehicle.get('braking', False) else 0.0,
            1.0 if vehicle.get('signaling', False) else 0.0,
        ]
        # Unknown action names get a mid-range risk of 0.5.
        return block, heading, speed, risk_by_action.get(action_name, 0.5)

    v1_block, v1_heading, speed1, v1_action_risk = encode_vehicle(vehicle_1, 'north', 0)
    v2_block, v2_heading, speed2, v2_action_risk = encode_vehicle(vehicle_2, 'east', 2)

    # Environment
    weather_code = WEATHER_MAP.get(accident_info.get('weather', 'clear'), 0)
    surface_code = ROAD_COND_MAP.get(accident_info.get('road_condition', 'dry'), 0)
    environment = [
        weather_code / 5,
        surface_code / 4,
        accident_info.get('visibility', 1.0),
        LIGHTING_MAP.get(accident_info.get('lighting', 'daylight'), 0) / 5,
        ROAD_TYPE_MAP.get(accident_info.get('road_type', 'roundabout'), 0) / 8,
    ]

    # Smallest angle between the two headings, in degrees (0..180).
    collision_angle = abs(v1_heading * 45 - v2_heading * 45)
    if collision_angle > 180:
        collision_angle = 360 - collision_angle
    collision_angle_norm = collision_angle / 180

    speed_gap = abs(speed1 - speed2) / 200
    speed_mean = (speed1 + speed2) / 400
    # NOTE(review): compares the raw 'direction' values, so two vehicles
    # that both omit the key count as "same direction" even though their
    # defaults differ elsewhere -- kept to match training.
    same_direction = 1.0 if vehicle_1.get('direction') == vehicle_2.get('direction') else 0.0
    speed_product = (speed1 * speed2) / 40000

    # Per-category risk scores, indexed by the weather / road-condition code.
    weather_risk = [0.1, 0.2, 0.5, 0.7, 0.8][weather_code]
    road_risk = [0.1, 0.5, 0.6, 0.8][surface_code]
    base_risk = (weather_risk + road_risk) / 2

    # Headings more than 90 degrees apart use the summed speeds, otherwise
    # the speed difference.
    if collision_angle > 90:
        relative_speed = (speed1 + speed2) / 400
    else:
        relative_speed = abs(speed1 - speed2) / 200
    approach_rate = min(relative_speed * (1 + base_risk), 1.0)
    time_factor = 0.5  # constant placeholder feature

    interaction = [
        collision_angle_norm, speed_gap, speed_mean, same_direction,
        # collision_angle_norm appears a second time here; the slot is kept
        # so the vector stays 31 long and aligned with the trained weights.
        speed_product, collision_angle_norm, time_factor, base_risk,
        v1_action_risk, v2_action_risk, relative_speed, approach_rate,
    ]

    return np.array(v1_block + v2_block + environment + interaction, dtype=np.float32)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class AccidentModelManager:
    """
    Manages loading and inference for the accident prediction model.

    Uses the MindSpore backend when MindSpore is importable and a checkpoint
    loads successfully; otherwise falls back to the NumPy model, and finally
    to an untrained NumPy model if no weights can be loaded at all.
    """

    def __init__(self, model_dir: Optional[str] = None):
        """
        Args:
            model_dir: Directory holding the default model files. Defaults
                to a ``trained`` folder next to this module.
        """
        self.model_dir = Path(model_dir) if model_dir else Path(__file__).parent / "trained"
        self.model = None
        self.metadata = None
        self.backend = None
        self._loaded = False

    def _load_mindspore(self, ckpt_file: Path) -> bool:
        """Try to load a MindSpore checkpoint; return True on success."""
        if not (MINDSPORE_AVAILABLE and ckpt_file.exists()):
            return False
        try:
            context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
            net = AccidentClassifier(input_dim=31, num_classes=7)
            load_param_into_net(net, load_checkpoint(str(ckpt_file)))
            net.set_train(False)
        except Exception as e:
            # Deliberately broad: any failure here just means "try the next
            # candidate", it must not abort loading altogether.
            print(f"⚠️ Failed to load MindSpore model: {e}")
            return False
        # Commit only once the network is fully built, so a failed attempt
        # never leaves a half-initialised model on the manager.
        self.model = net
        self.backend = "MindSpore"
        self._loaded = True
        print(f"✅ MindSpore model loaded from {ckpt_file}")
        return True

    def _load_numpy(self, npz_file: Path) -> bool:
        """Try to load NumPy weights from an .npz file; return True on success."""
        if not npz_file.exists():
            return False
        try:
            candidate = NumpyAccidentModel(input_dim=31, num_classes=7)
            candidate.load(str(npz_file))
        except Exception as e:
            # Same best-effort policy as for the MindSpore checkpoint.
            print(f"⚠️ Failed to load NumPy model: {e}")
            return False
        self.model = candidate
        self.backend = "NumPy"
        self._loaded = True
        return True

    def load(self, ckpt_path: Optional[str] = None, npz_path: Optional[str] = None):
        """
        Load the model, trying candidates in a fixed order.

        Order: explicit ``ckpt_path`` (MindSpore), explicit ``npz_path``
        (NumPy), the default checkpoint in ``model_dir``, the default
        ``.npz`` in ``model_dir``. Each candidate is attempted at most once,
        so a default file that exists but cannot be loaded falls through to
        the next option instead of being retried forever.

        Returns:
            True if trained weights were loaded; False if the manager fell
            back to an untrained NumPy model.
        """
        if ckpt_path and self._load_mindspore(Path(ckpt_path)):
            return True
        if npz_path and self._load_numpy(Path(npz_path)):
            return True

        if self._load_mindspore(self.model_dir / "best_accident_model.ckpt"):
            return True
        if self._load_numpy(self.model_dir / "accident_model.npz"):
            return True

        print("⚠️ No model found. Using untrained model.")
        self.model = NumpyAccidentModel(input_dim=31, num_classes=7)
        self.backend = "NumPy (untrained)"
        self._loaded = True
        return False

    def load_metadata(self, metadata_path: Optional[str] = None):
        """
        Load model metadata from JSON.

        Args:
            metadata_path: File to read; defaults to ``model_metadata.json``
                inside ``model_dir``.

        Returns:
            The parsed metadata (also stored on ``self.metadata``), or None
            if the file does not exist.
        """
        if metadata_path:
            meta_file = Path(metadata_path)
        else:
            meta_file = self.model_dir / "model_metadata.json"

        if not meta_file.exists():
            return None

        # Decode as UTF-8 explicitly instead of relying on the platform's
        # locale default.
        with open(meta_file, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        print(f"✅ Metadata loaded from {meta_file}")
        return self.metadata

    def predict(self, accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> Dict:
        """
        Predict accident type from input data. Loads the model on first use.

        Returns:
            Dict with predicted_class, class_name, confidence, probabilities
            (one entry per accident type) and the backend that produced them.
        """
        if not self._loaded:
            self.load()

        features = extract_features(accident_info, vehicle_1, vehicle_2)

        if self.backend == "MindSpore" and MINDSPORE_AVAILABLE:
            self.model.set_train(False)
            x = Tensor(features.reshape(1, -1), ms.float32)
            logits = self.model(x)
            # The network returns raw scores; normalise them here.
            softmax = ops.Softmax(axis=1)
            probs = softmax(logits)[0].asnumpy()
            pred_class = int(np.argmax(probs))
        else:
            pred_class, probs = self.model.predict(features)
            # The NumPy model answers in batch form; unwrap the single row.
            pred_class = int(pred_class[0]) if isinstance(pred_class, np.ndarray) else int(pred_class)
            probs = probs[0] if probs.ndim > 1 else probs

        return {
            'predicted_class': pred_class,
            'class_name': ACCIDENT_NAMES[pred_class],
            'confidence': float(probs[pred_class]),
            'probabilities': {
                ACCIDENT_NAMES[index]: float(prob)
                for index, prob in enumerate(probs)
            },
            'backend': self.backend
        }

    def get_all_predictions(self, accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> list:
        """Get all accident types sorted by probability, most likely first."""
        result = self.predict(accident_info, vehicle_1, vehicle_2)

        sorted_predictions = sorted(
            result['probabilities'].items(),
            key=lambda x: x[1],
            reverse=True
        )

        return [
            {'type': name, 'probability': prob}
            for name, prob in sorted_predictions
        ]
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Process-wide manager instance, created lazily by get_model_manager().
_model_manager = None


def get_model_manager(model_dir: str = None) -> AccidentModelManager:
    """Return the shared AccidentModelManager, creating it on first call.

    ``model_dir`` is only used when the instance is created; once a manager
    exists, later calls return it unchanged and ignore the argument.
    """
    global _model_manager
    if _model_manager is not None:
        return _model_manager
    _model_manager = AccidentModelManager(model_dir)
    return _model_manager
| |
|
| |
|
def predict_accident(accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> Dict:
    """Predict with the shared model manager, loading the model if needed."""
    shared = get_model_manager()
    needs_loading = not shared._loaded
    if needs_loading:
        shared.load()
    return shared.predict(accident_info, vehicle_1, vehicle_2)
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # Smoke test: load whichever backend is available and classify one
    # hand-written roundabout scenario.
    print("\n" + "=" * 60)
    print("🧪 Testing MindSpore Model Loader")
    print("=" * 60)

    manager = AccidentModelManager()
    manager.load()

    test_accident = {
        'weather': 'clear',
        'road_condition': 'dry',
        'visibility': 1.0,
        'lighting': 'daylight',
        'road_type': 'roundabout'
    }

    test_v1 = {
        'type': 'sedan',
        'speed': 45,
        'direction': 'north',
        'action': 'going_straight',
        'braking': False,
        'signaling': False
    }

    test_v2 = {
        'type': 'suv',
        'speed': 55,
        'direction': 'east',
        'action': 'entering_roundabout',
        'braking': False,
        'signaling': True
    }

    result = manager.predict(test_accident, test_v1, test_v2)

    print("\n🔮 Prediction Result:")
    print(f"   Backend: {result['backend']}")
    print(f"   Predicted: {result['class_name']}")
    print(f"   Confidence: {result['confidence']*100:.1f}%")
    print("\n📊 All Probabilities:")
    for name, prob in sorted(result['probabilities'].items(), key=lambda x: -x[1]):
        # Text bar chart: one block character per 5% of probability.
        bar = '█' * int(prob * 20)
        print(f"   {name:30}: {prob*100:5.1f}% {bar}")
| |
|