""" MindSpore Model Loader ====================== Loads the trained MindSpore model for accident prediction. Supports both MindSpore (.ckpt) and NumPy fallback (.npz). """ import numpy as np import json from pathlib import Path from typing import Dict, Tuple, Optional # Try to import MindSpore MINDSPORE_AVAILABLE = False try: import mindspore as ms import mindspore.nn as nn from mindspore import Tensor, context, load_checkpoint, load_param_into_net import mindspore.ops as ops MINDSPORE_AVAILABLE = True print(f"✅ MindSpore {ms.__version__} available") except ImportError: print("⚠️ MindSpore not available, using NumPy fallback") # ============================================================ # ENCODING MAPS (must match training) # ============================================================ DIRECTION_MAP = { 'north': 0, 'northeast': 1, 'east': 2, 'southeast': 3, 'south': 4, 'southwest': 5, 'west': 6, 'northwest': 7 } ACTION_MAP = { 'going_straight': 0, 'turning_left': 1, 'turning_right': 2, 'entering_roundabout': 3, 'exiting_roundabout': 4, 'changing_lane_left': 5, 'changing_lane_right': 6, 'slowing_down': 7, 'accelerating': 8, 'stopped': 9 } VEHICLE_MAP = {'sedan': 0, 'suv': 1, 'truck': 2, 'motorcycle': 3, 'bus': 4} WEATHER_MAP = {'clear': 0, 'cloudy': 1, 'rainy': 2, 'foggy': 3, 'sandstorm': 4} ROAD_COND_MAP = {'dry': 0, 'wet': 1, 'sandy': 2, 'oily': 3} ROAD_TYPE_MAP = { 'roundabout': 0, 'crossroad': 1, 't_junction': 2, 'highway_merge': 3, 'parking': 4, 'highway': 5, 'urban_road': 6, 'other': 7 } LIGHTING_MAP = {'daylight': 0, 'dusk': 1, 'dawn': 2, 'night_lit': 3, 'night_dark': 4} ACCIDENT_MAP = { 'rear_end_collision': 0, 'side_impact': 1, 'head_on_collision': 2, 'sideswipe': 3, 'roundabout_entry_collision': 4, 'lane_change_collision': 5, 'intersection_collision': 6 } ACCIDENT_NAMES = {v: k for k, v in ACCIDENT_MAP.items()} # ============================================================ # MINDSPORE MODEL DEFINITION # ============================================================ if MINDSPORE_AVAILABLE: class AccidentClassifier(nn.Cell): """ MindSpore Neural Network for Traffic Accident Classification. Architecture: 31 → 128 → 64 → 32 → 7 """ def __init__(self, input_dim=31, num_classes=7): super(AccidentClassifier, self).__init__() # Layer 1: Input → 128 self.fc1 = nn.Dense(input_dim, 128) self.bn1 = nn.BatchNorm1d(128) self.relu1 = nn.ReLU() self.dropout1 = nn.Dropout(p=0.3) # Layer 2: 128 → 64 self.fc2 = nn.Dense(128, 64) self.bn2 = nn.BatchNorm1d(64) self.relu2 = nn.ReLU() self.dropout2 = nn.Dropout(p=0.3) # Layer 3: 64 → 32 self.fc3 = nn.Dense(64, 32) self.bn3 = nn.BatchNorm1d(32) self.relu3 = nn.ReLU() self.dropout3 = nn.Dropout(p=0.2) # Output: 32 → 7 self.fc4 = nn.Dense(32, num_classes) def construct(self, x): x = self.fc1(x) x = self.bn1(x) x = self.relu1(x) x = self.dropout1(x) x = self.fc2(x) x = self.bn2(x) x = self.relu2(x) x = self.dropout2(x) x = self.fc3(x) x = self.bn3(x) x = self.relu3(x) x = self.dropout3(x) x = self.fc4(x) return x # ============================================================ # NUMPY FALLBACK MODEL # ============================================================ class NumpyAccidentModel: """NumPy fallback when MindSpore is not available.""" def __init__(self, input_dim=31, num_classes=7): self.input_dim = input_dim self.num_classes = num_classes self.trained = False # Initialize weights np.random.seed(42) self.W1 = np.random.randn(input_dim, 128) * np.sqrt(2.0 / input_dim) self.b1 = np.zeros(128) self.W2 = np.random.randn(128, 64) * np.sqrt(2.0 / 128) self.b2 = np.zeros(64) self.W3 = np.random.randn(64, 32) * np.sqrt(2.0 / 64) self.b3 = np.zeros(32) self.W4 = np.random.randn(32, num_classes) * np.sqrt(2.0 / 32) self.b4 = np.zeros(num_classes) def relu(self, x): return np.maximum(0, x) def softmax(self, x): exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True)) return exp_x / np.sum(exp_x, axis=-1, keepdims=True) def forward(self, x): x = self.relu(x @ self.W1 + self.b1) x = self.relu(x @ self.W2 + self.b2) x = self.relu(x @ self.W3 + self.b3) x = x @ self.W4 + self.b4 return self.softmax(x) def predict(self, x): if x.ndim == 1: x = x.reshape(1, -1) probs = self.forward(x) classes = np.argmax(probs, axis=1) return classes, probs def load(self, filepath): data = np.load(filepath) self.W1 = data['W1'] self.b1 = data['b1'] self.W2 = data['W2'] self.b2 = data['b2'] self.W3 = data['W3'] self.b3 = data['b3'] self.W4 = data.get('W4', data.get('weight_4', np.random.randn(32, 7) * 0.1)) self.b4 = data.get('b4', data.get('bias_4', np.zeros(7))) self.trained = True print(f"✅ NumPy model loaded from {filepath}") # ============================================================ # FEATURE EXTRACTION # ============================================================ def extract_features(accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> np.ndarray: """ Extract 31 features from accident data. Must match the training feature extraction exactly! """ # Vehicle 1 features (7) v1_type = VEHICLE_MAP.get(vehicle_1.get('type', 'sedan'), 0) / 5 v1_speed = vehicle_1.get('speed', 50) / 200 v1_dir = DIRECTION_MAP.get(vehicle_1.get('direction', 'north'), 0) / 8 v1_angle = v1_dir * 45 / 360 v1_action = ACTION_MAP.get(vehicle_1.get('action', 'going_straight'), 0) / 10 v1_braking = 1.0 if vehicle_1.get('braking', False) else 0.0 v1_signaling = 1.0 if vehicle_1.get('signaling', False) else 0.0 # Vehicle 2 features (7) v2_type = VEHICLE_MAP.get(vehicle_2.get('type', 'sedan'), 0) / 5 v2_speed = vehicle_2.get('speed', 50) / 200 v2_dir = DIRECTION_MAP.get(vehicle_2.get('direction', 'east'), 2) / 8 v2_angle = v2_dir * 45 / 360 v2_action = ACTION_MAP.get(vehicle_2.get('action', 'going_straight'), 0) / 10 v2_braking = 1.0 if vehicle_2.get('braking', False) else 0.0 v2_signaling = 1.0 if vehicle_2.get('signaling', False) else 0.0 # Environment features (5) weather = WEATHER_MAP.get(accident_info.get('weather', 'clear'), 0) / 5 road_cond = ROAD_COND_MAP.get(accident_info.get('road_condition', 'dry'), 0) / 4 visibility = accident_info.get('visibility', 1.0) lighting = LIGHTING_MAP.get(accident_info.get('lighting', 'daylight'), 0) / 5 road_type = ROAD_TYPE_MAP.get(accident_info.get('road_type', 'roundabout'), 0) / 8 # Derived features (12) angle1 = DIRECTION_MAP.get(vehicle_1.get('direction', 'north'), 0) * 45 angle2 = DIRECTION_MAP.get(vehicle_2.get('direction', 'east'), 2) * 45 collision_angle = abs(angle1 - angle2) if collision_angle > 180: collision_angle = 360 - collision_angle collision_angle_norm = collision_angle / 180 speed1 = vehicle_1.get('speed', 50) speed2 = vehicle_2.get('speed', 50) speed_diff = abs(speed1 - speed2) / 200 combined_speed = (speed1 + speed2) / 400 same_direction = 1.0 if vehicle_1.get('direction') == vehicle_2.get('direction') else 0.0 speed_product = (speed1 * speed2) / 40000 weather_risk = [0.1, 0.2, 0.5, 0.7, 0.8][WEATHER_MAP.get(accident_info.get('weather', 'clear'), 0)] road_risk = [0.1, 0.5, 0.6, 0.8][ROAD_COND_MAP.get(accident_info.get('road_condition', 'dry'), 0)] base_risk = (weather_risk + road_risk) / 2 action_risk = { 'going_straight': 0.3, 'turning_left': 0.5, 'turning_right': 0.4, 'entering_roundabout': 0.6, 'exiting_roundabout': 0.5, 'changing_lane_left': 0.7, 'changing_lane_right': 0.7, 'slowing_down': 0.4, 'accelerating': 0.6, 'stopped': 0.2 } v1_action_risk = action_risk.get(vehicle_1.get('action', 'going_straight'), 0.5) v2_action_risk = action_risk.get(vehicle_2.get('action', 'going_straight'), 0.5) relative_speed = (speed1 + speed2) / 400 if collision_angle > 90 else abs(speed1 - speed2) / 200 approach_rate = min(relative_speed * (1 + base_risk), 1.0) time_factor = 0.5 # Default noon # Build feature vector (31 features) features = np.array([ # Vehicle 1 (7) v1_type, v1_speed, v1_dir, v1_angle, v1_action, v1_braking, v1_signaling, # Vehicle 2 (7) v2_type, v2_speed, v2_dir, v2_angle, v2_action, v2_braking, v2_signaling, # Environment (5) weather, road_cond, visibility, lighting, road_type, # Derived (12) collision_angle_norm, speed_diff, combined_speed, same_direction, speed_product, collision_angle_norm, time_factor, base_risk, v1_action_risk, v2_action_risk, relative_speed, approach_rate ], dtype=np.float32) return features # ============================================================ # MODEL MANAGER CLASS # ============================================================ class AccidentModelManager: """ Manages loading and inference for accident prediction model. Automatically uses MindSpore if available, otherwise NumPy fallback. """ def __init__(self, model_dir: str = None): self.model_dir = Path(model_dir) if model_dir else Path(__file__).parent / "trained" self.model = None self.metadata = None self.backend = None self._loaded = False def load(self, ckpt_path: str = None, npz_path: str = None): """Load the model from checkpoint.""" # Try MindSpore first if MINDSPORE_AVAILABLE and ckpt_path: ckpt_file = Path(ckpt_path) if ckpt_file.exists(): try: context.set_context(mode=context.GRAPH_MODE, device_target="CPU") self.model = AccidentClassifier(input_dim=31, num_classes=7) param_dict = load_checkpoint(str(ckpt_file)) load_param_into_net(self.model, param_dict) self.model.set_train(False) self.backend = "MindSpore" self._loaded = True print(f"✅ MindSpore model loaded from {ckpt_file}") return True except Exception as e: print(f"⚠️ Failed to load MindSpore model: {e}") # Fallback to NumPy if npz_path: npz_file = Path(npz_path) if npz_file.exists(): try: self.model = NumpyAccidentModel(input_dim=31, num_classes=7) self.model.load(str(npz_file)) self.backend = "NumPy" self._loaded = True return True except Exception as e: print(f"⚠️ Failed to load NumPy model: {e}") # Try default paths default_ckpt = self.model_dir / "best_accident_model.ckpt" default_npz = self.model_dir / "accident_model.npz" if MINDSPORE_AVAILABLE and default_ckpt.exists(): return self.load(ckpt_path=str(default_ckpt)) elif default_npz.exists(): return self.load(npz_path=str(default_npz)) print("⚠️ No model found. Using untrained model.") self.model = NumpyAccidentModel(input_dim=31, num_classes=7) self.backend = "NumPy (untrained)" self._loaded = True return False def load_metadata(self, metadata_path: str = None): """Load model metadata.""" if metadata_path: meta_file = Path(metadata_path) else: meta_file = self.model_dir / "model_metadata.json" if meta_file.exists(): with open(meta_file, 'r') as f: self.metadata = json.load(f) print(f"✅ Metadata loaded from {meta_file}") return self.metadata return None def predict(self, accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> Dict: """ Predict accident type from input data. Returns: Dict with predicted_class, class_name, probabilities, confidence """ if not self._loaded: self.load() # Extract features features = extract_features(accident_info, vehicle_1, vehicle_2) # Get prediction if self.backend == "MindSpore" and MINDSPORE_AVAILABLE: self.model.set_train(False) x = Tensor(features.reshape(1, -1), ms.float32) logits = self.model(x) softmax = ops.Softmax(axis=1) probs = softmax(logits)[0].asnumpy() pred_class = int(np.argmax(probs)) else: pred_class, probs = self.model.predict(features) pred_class = int(pred_class[0]) if isinstance(pred_class, np.ndarray) else int(pred_class) probs = probs[0] if probs.ndim > 1 else probs # Build result result = { 'predicted_class': pred_class, 'class_name': ACCIDENT_NAMES[pred_class], 'confidence': float(probs[pred_class]), 'probabilities': { ACCIDENT_NAMES[i]: float(probs[i]) for i in range(len(probs)) }, 'backend': self.backend } return result def get_all_predictions(self, accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> list: """Get all accident types sorted by probability.""" result = self.predict(accident_info, vehicle_1, vehicle_2) sorted_predictions = sorted( result['probabilities'].items(), key=lambda x: x[1], reverse=True ) return [ {'type': name, 'probability': prob} for name, prob in sorted_predictions ] # ============================================================ # GLOBAL MODEL INSTANCE # ============================================================ _model_manager = None def get_model_manager(model_dir: str = None) -> AccidentModelManager: """Get or create global model manager instance.""" global _model_manager if _model_manager is None: _model_manager = AccidentModelManager(model_dir) return _model_manager def predict_accident(accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> Dict: """Convenience function for prediction.""" manager = get_model_manager() if not manager._loaded: manager.load() return manager.predict(accident_info, vehicle_1, vehicle_2) # ============================================================ # TEST # ============================================================ if __name__ == "__main__": print("\n" + "="*60) print("🧪 Testing MindSpore Model Loader") print("="*60) # Create manager manager = AccidentModelManager() manager.load() # Test prediction test_accident = { 'weather': 'clear', 'road_condition': 'dry', 'visibility': 1.0, 'lighting': 'daylight', 'road_type': 'roundabout' } test_v1 = { 'type': 'sedan', 'speed': 45, 'direction': 'north', 'action': 'going_straight', 'braking': False, 'signaling': False } test_v2 = { 'type': 'suv', 'speed': 55, 'direction': 'east', 'action': 'entering_roundabout', 'braking': False, 'signaling': True } result = manager.predict(test_accident, test_v1, test_v2) print(f"\n🔮 Prediction Result:") print(f" Backend: {result['backend']}") print(f" Predicted: {result['class_name']}") print(f" Confidence: {result['confidence']*100:.1f}%") print(f"\n📊 All Probabilities:") for name, prob in sorted(result['probabilities'].items(), key=lambda x: -x[1]): bar = '█' * int(prob * 20) print(f" {name:30}: {prob*100:5.1f}% {bar}")