# car_web2/models/mindspore_loader.py
# wesam0099
# Back to original working version
# 1512569
"""
MindSpore Model Loader
======================
Loads the trained MindSpore model for accident prediction.
Supports both MindSpore (.ckpt) and NumPy fallback (.npz).
"""
import numpy as np
import json
from pathlib import Path
from typing import Dict, Tuple, Optional
# MindSpore is optional. Probe for it once at import time and record the
# outcome in MINDSPORE_AVAILABLE so the rest of the module can choose a backend.
try:
    import mindspore as ms
    import mindspore.nn as nn
    from mindspore import Tensor, context, load_checkpoint, load_param_into_net
    import mindspore.ops as ops
except ImportError:
    # Any of the imports above failing means the NumPy code path is used.
    MINDSPORE_AVAILABLE = False
    print("โš ๏ธ MindSpore not available, using NumPy fallback")
else:
    MINDSPORE_AVAILABLE = True
    print(f"โœ… MindSpore {ms.__version__} available")
# ============================================================
# ENCODING MAPS (must match training)
# ============================================================
# Each vocabulary maps a label to its position in the tuple it is built from,
# so the ORDER of the labels below is what defines the integer codes.
DIRECTION_MAP = {
    label: code
    for code, label in enumerate((
        'north', 'northeast', 'east', 'southeast',
        'south', 'southwest', 'west', 'northwest',
    ))
}

ACTION_MAP = {
    label: code
    for code, label in enumerate((
        'going_straight', 'turning_left', 'turning_right',
        'entering_roundabout', 'exiting_roundabout',
        'changing_lane_left', 'changing_lane_right',
        'slowing_down', 'accelerating', 'stopped',
    ))
}

VEHICLE_MAP = {
    label: code
    for code, label in enumerate(('sedan', 'suv', 'truck', 'motorcycle', 'bus'))
}

WEATHER_MAP = {
    label: code
    for code, label in enumerate(('clear', 'cloudy', 'rainy', 'foggy', 'sandstorm'))
}

ROAD_COND_MAP = {
    label: code
    for code, label in enumerate(('dry', 'wet', 'sandy', 'oily'))
}

ROAD_TYPE_MAP = {
    label: code
    for code, label in enumerate((
        'roundabout', 'crossroad', 't_junction', 'highway_merge',
        'parking', 'highway', 'urban_road', 'other',
    ))
}

LIGHTING_MAP = {
    label: code
    for code, label in enumerate(('daylight', 'dusk', 'dawn', 'night_lit', 'night_dark'))
}

ACCIDENT_MAP = {
    label: code
    for code, label in enumerate((
        'rear_end_collision',
        'side_impact',
        'head_on_collision',
        'sideswipe',
        'roundabout_entry_collision',
        'lane_change_collision',
        'intersection_collision',
    ))
}

# Reverse lookup: class index -> accident label.
ACCIDENT_NAMES = dict((code, label) for label, code in ACCIDENT_MAP.items())
# ============================================================
# MINDSPORE MODEL DEFINITION
# ============================================================
if MINDSPORE_AVAILABLE:
    class AccidentClassifier(nn.Cell):
        """
        Feed-forward MindSpore network for traffic accident classification.

        Layout: input_dim -> 128 -> 64 -> 32 -> num_classes. Each of the
        three hidden stages is Dense -> BatchNorm1d -> ReLU -> Dropout; the
        final Dense layer's output is returned as-is (no softmax is applied
        inside the network).
        """

        def __init__(self, input_dim=31, num_classes=7):
            super().__init__()
            # NOTE(review): checkpoint parameter names presumably follow these
            # attribute names - confirm before renaming any of them.

            # Stage 1: input_dim -> 128
            self.fc1 = nn.Dense(input_dim, 128)
            self.bn1 = nn.BatchNorm1d(128)
            self.relu1 = nn.ReLU()
            self.dropout1 = nn.Dropout(p=0.3)

            # Stage 2: 128 -> 64
            self.fc2 = nn.Dense(128, 64)
            self.bn2 = nn.BatchNorm1d(64)
            self.relu2 = nn.ReLU()
            self.dropout2 = nn.Dropout(p=0.3)

            # Stage 3: 64 -> 32
            self.fc3 = nn.Dense(64, 32)
            self.bn3 = nn.BatchNorm1d(32)
            self.relu3 = nn.ReLU()
            self.dropout3 = nn.Dropout(p=0.2)

            # Output head: 32 -> num_classes
            self.fc4 = nn.Dense(32, num_classes)

        def construct(self, x):
            """Run the three hidden stages in order, then the output head."""
            hidden = self.dropout1(self.relu1(self.bn1(self.fc1(x))))
            hidden = self.dropout2(self.relu2(self.bn2(self.fc2(hidden))))
            hidden = self.dropout3(self.relu3(self.bn3(self.fc3(hidden))))
            return self.fc4(hidden)
# ============================================================
# NUMPY FALLBACK MODEL
# ============================================================
class NumpyAccidentModel:
    """NumPy fallback classifier used when MindSpore is not available.

    The network is ``input_dim -> 128 -> 64 -> 32 -> num_classes`` with ReLU
    after each of the three hidden layers and a softmax on the output.

    NOTE(review): the forward pass here applies no normalization layers,
    whereas the MindSpore network in this file uses BatchNorm1d. Presumably
    the exported ``.npz`` weights account for that - confirm with the
    training/export code.
    """

    def __init__(self, input_dim=31, num_classes=7):
        """Create an untrained model with deterministic initial weights.

        Args:
            input_dim: Number of input features.
            num_classes: Number of output classes.
        """
        self.input_dim = input_dim
        self.num_classes = num_classes
        self.trained = False

        # A private generator seeded with 42 produces exactly the values that
        # seeding the global NumPy RNG with 42 would, without resetting the
        # global random state for every other user of np.random.
        rng = np.random.RandomState(42)
        self.W1 = rng.randn(input_dim, 128) * np.sqrt(2.0 / input_dim)
        self.b1 = np.zeros(128)
        self.W2 = rng.randn(128, 64) * np.sqrt(2.0 / 128)
        self.b2 = np.zeros(64)
        self.W3 = rng.randn(64, 32) * np.sqrt(2.0 / 64)
        self.b3 = np.zeros(32)
        self.W4 = rng.randn(32, num_classes) * np.sqrt(2.0 / 32)
        self.b4 = np.zeros(num_classes)

    def relu(self, x):
        """Element-wise max(0, x)."""
        return np.maximum(0, x)

    def softmax(self, x):
        """Softmax over the last axis, shifted by the row max for stability."""
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward(self, x):
        """Return class probabilities for a 2-D batch ``x``."""
        x = self.relu(x @ self.W1 + self.b1)
        x = self.relu(x @ self.W2 + self.b2)
        x = self.relu(x @ self.W3 + self.b3)
        x = x @ self.W4 + self.b4
        return self.softmax(x)

    def predict(self, x):
        """Classify one sample (1-D) or a batch (2-D).

        Returns:
            ``(classes, probs)``: ``classes`` holds the argmax index per row
            and ``probs`` is the 2-D probability array.
        """
        x = np.asarray(x)
        if x.ndim == 1:
            x = x.reshape(1, -1)
        probs = self.forward(x)
        classes = np.argmax(probs, axis=1)
        return classes, probs

    @staticmethod
    def _first_present(data, names):
        """Return the first array in ``data`` stored under one of ``names``, else None."""
        for name in names:
            if name in data.files:
                return data[name]
        return None

    def load(self, filepath):
        """Load weights from an ``.npz`` archive and mark the model as trained.

        ``W1``..``W3`` and ``b1``..``b3`` are required. The output layer is
        read from ``W4``/``b4`` or, failing that, ``weight_4``/``bias_4``.
        If neither is present the output layer falls back to small random
        weights / zero biases and a warning is printed.

        Raises:
            KeyError: If a required array is missing. Nothing on the model
                is modified in that case.
        """
        # The context manager closes the archive's file handle; every array
        # is read into memory before the block exits.
        with np.load(filepath) as data:
            W1, b1 = data['W1'], data['b1']
            W2, b2 = data['W2'], data['b2']
            W3, b3 = data['W3'], data['b3']
            W4 = self._first_present(data, ('W4', 'weight_4'))
            b4 = self._first_present(data, ('b4', 'bias_4'))

        # Fallbacks are built only when needed and sized from this model
        # rather than from hard-coded dimensions.
        if W4 is None:
            print(f"WARNING: no output weights in {filepath}; using random output weights")
            W4 = np.random.randn(W3.shape[1], self.num_classes) * 0.1
        if b4 is None:
            print(f"WARNING: no output bias in {filepath}; using zero output bias")
            b4 = np.zeros(self.num_classes)

        # Assign only after everything was read, so a failed load leaves the
        # previous weights intact.
        self.W1, self.b1 = W1, b1
        self.W2, self.b2 = W2, b2
        self.W3, self.b3 = W3, b3
        self.W4, self.b4 = W4, b4
        self.trained = True
        print(f"โœ… NumPy model loaded from {filepath}")
# ============================================================
# FEATURE EXTRACTION
# ============================================================
def extract_features(accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> np.ndarray:
    """
    Encode one two-vehicle scenario as a 31-element float32 vector.

    Layout: 7 features for vehicle 1, 7 for vehicle 2, 5 environment
    features, then 12 derived features. Missing keys fall back to the
    defaults used below (e.g. speed 50, 'sedan', 'going_straight').
    The order and scaling must match the training feature extraction exactly!
    """

    def encode_vehicle(vehicle, fallback_direction, fallback_code):
        """Return (7 per-vehicle features, heading in degrees, raw speed)."""
        direction_code = DIRECTION_MAP.get(
            vehicle.get('direction', fallback_direction), fallback_code
        )
        raw_speed = vehicle.get('speed', 50)
        direction_norm = direction_code / 8
        encoded = [
            VEHICLE_MAP.get(vehicle.get('type', 'sedan'), 0) / 5,
            raw_speed / 200,
            direction_norm,
            direction_norm * 45 / 360,
            ACTION_MAP.get(vehicle.get('action', 'going_straight'), 0) / 10,
            1.0 if vehicle.get('braking', False) else 0.0,
            1.0 if vehicle.get('signaling', False) else 0.0,
        ]
        return encoded, direction_code * 45, raw_speed

    # Vehicle 1 defaults to heading north, vehicle 2 to heading east.
    v1_features, heading_1, speed_1 = encode_vehicle(vehicle_1, 'north', 0)
    v2_features, heading_2, speed_2 = encode_vehicle(vehicle_2, 'east', 2)

    # Environment (5)
    weather_code = WEATHER_MAP.get(accident_info.get('weather', 'clear'), 0)
    road_cond_code = ROAD_COND_MAP.get(accident_info.get('road_condition', 'dry'), 0)
    environment = [
        weather_code / 5,
        road_cond_code / 4,
        accident_info.get('visibility', 1.0),
        LIGHTING_MAP.get(accident_info.get('lighting', 'daylight'), 0) / 5,
        ROAD_TYPE_MAP.get(accident_info.get('road_type', 'roundabout'), 0) / 8,
    ]

    # Smallest angle between the two headings, folded into [0, 180].
    heading_gap = abs(heading_1 - heading_2)
    if heading_gap > 180:
        heading_gap = 360 - heading_gap
    heading_gap_norm = heading_gap / 180

    speed_gap = abs(speed_1 - speed_2) / 200
    speed_sum = (speed_1 + speed_2) / 400
    # Compares the raw 'direction' entries, so two missing entries count as equal.
    headings_match = 1.0 if vehicle_1.get('direction') == vehicle_2.get('direction') else 0.0
    speed_product = (speed_1 * speed_2) / 40000

    # Risk tables indexed by the weather / road-condition codes.
    weather_risk = (0.1, 0.2, 0.5, 0.7, 0.8)[weather_code]
    road_risk = (0.1, 0.5, 0.6, 0.8)[road_cond_code]
    base_risk = (weather_risk + road_risk) / 2

    risk_by_action = {
        'going_straight': 0.3, 'turning_left': 0.5, 'turning_right': 0.4,
        'entering_roundabout': 0.6, 'exiting_roundabout': 0.5,
        'changing_lane_left': 0.7, 'changing_lane_right': 0.7,
        'slowing_down': 0.4, 'accelerating': 0.6, 'stopped': 0.2,
    }
    v1_action_risk = risk_by_action.get(vehicle_1.get('action', 'going_straight'), 0.5)
    v2_action_risk = risk_by_action.get(vehicle_2.get('action', 'going_straight'), 0.5)

    # Headings more than 90 degrees apart use the summed speed, otherwise
    # the speed difference.
    relative_speed = speed_sum if heading_gap > 90 else speed_gap
    approach_rate = min(relative_speed * (1 + base_risk), 1.0)
    time_factor = 0.5  # Default noon

    # Derived (12).
    # NOTE(review): the normalized heading gap fills two slots (first and
    # sixth); presumably this mirrors the training pipeline - confirm.
    derived = [
        heading_gap_norm, speed_gap, speed_sum, headings_match,
        speed_product, heading_gap_norm, time_factor, base_risk,
        v1_action_risk, v2_action_risk, relative_speed, approach_rate,
    ]

    return np.array(v1_features + v2_features + environment + derived, dtype=np.float32)
# ============================================================
# MODEL MANAGER CLASS
# ============================================================
class AccidentModelManager:
    """
    Manages loading and inference for the accident prediction model.

    Uses a MindSpore checkpoint when MindSpore is importable and a checkpoint
    can be loaded, otherwise a NumPy ``.npz`` model, otherwise an untrained
    NumPy model.
    """

    def __init__(self, model_dir: Optional[str] = None):
        """
        Args:
            model_dir: Directory searched for default model files. Defaults
                to a ``trained`` directory next to this file.
        """
        self.model_dir = Path(model_dir) if model_dir else Path(__file__).parent / "trained"
        self.model = None
        self.metadata = None
        self.backend = None
        self._loaded = False

    def _load_mindspore(self, ckpt_file: Path) -> bool:
        """Try to load a MindSpore checkpoint; return True on success."""
        if not (MINDSPORE_AVAILABLE and ckpt_file.exists()):
            return False
        try:
            context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
            net = AccidentClassifier(input_dim=31, num_classes=7)
            param_dict = load_checkpoint(str(ckpt_file))
            load_param_into_net(net, param_dict)
            net.set_train(False)
        except Exception as e:
            print(f"โš ๏ธ Failed to load MindSpore model: {e}")
            return False
        # Publish the network only once it loaded completely, so a failed
        # attempt never leaves a half-initialized model on the manager.
        self.model = net
        self.backend = "MindSpore"
        self._loaded = True
        print(f"โœ… MindSpore model loaded from {ckpt_file}")
        return True

    def _load_numpy(self, npz_file: Path) -> bool:
        """Try to load NumPy weights from an ``.npz`` file; return True on success."""
        if not npz_file.exists():
            return False
        try:
            fallback = NumpyAccidentModel(input_dim=31, num_classes=7)
            fallback.load(str(npz_file))
        except Exception as e:
            print(f"โš ๏ธ Failed to load NumPy model: {e}")
            return False
        self.model = fallback
        self.backend = "NumPy"
        self._loaded = True
        return True

    def load(self, ckpt_path: Optional[str] = None, npz_path: Optional[str] = None) -> bool:
        """
        Load the model, trying each candidate file at most once.

        Order: explicit ``ckpt_path``, explicit ``npz_path``, then the
        default checkpoint and default ``.npz`` inside ``model_dir``. If
        none loads, an untrained NumPy model is installed.

        Returns:
            True if trained weights were loaded, False if the untrained
            fallback is in use.
        """
        candidates = []
        if ckpt_path:
            candidates.append(("ckpt", Path(ckpt_path)))
        if npz_path:
            candidates.append(("npz", Path(npz_path)))
        candidates.append(("ckpt", self.model_dir / "best_accident_model.ckpt"))
        candidates.append(("npz", self.model_dir / "accident_model.npz"))

        # A flat loop instead of re-entering load(): a default file that
        # exists but cannot be loaded must fall through to the next
        # candidate rather than being retried forever.
        attempted = set()
        for kind, path in candidates:
            if (kind, path) in attempted:
                continue
            attempted.add((kind, path))
            loader = self._load_mindspore if kind == "ckpt" else self._load_numpy
            if loader(path):
                return True

        print("โš ๏ธ No model found. Using untrained model.")
        self.model = NumpyAccidentModel(input_dim=31, num_classes=7)
        self.backend = "NumPy (untrained)"
        self._loaded = True
        return False

    def load_metadata(self, metadata_path: Optional[str] = None):
        """
        Load model metadata from JSON.

        Args:
            metadata_path: File to read; defaults to ``model_metadata.json``
                inside ``model_dir``.

        Returns:
            The parsed JSON (also stored on ``self.metadata``), or None if
            the file does not exist.
        """
        if metadata_path:
            meta_file = Path(metadata_path)
        else:
            meta_file = self.model_dir / "model_metadata.json"
        if meta_file.exists():
            # Explicit encoding so parsing does not depend on the platform locale.
            with open(meta_file, 'r', encoding='utf-8') as f:
                self.metadata = json.load(f)
            print(f"โœ… Metadata loaded from {meta_file}")
            return self.metadata
        return None

    def predict(self, accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> Dict:
        """
        Predict the accident type for one scenario.

        Loads the model first if that has not happened yet.

        Returns:
            Dict with predicted_class, class_name, confidence, probabilities
            (label -> probability) and backend.
        """
        if not self._loaded:
            self.load()

        # Extract features
        features = extract_features(accident_info, vehicle_1, vehicle_2)

        # Get prediction
        if self.backend == "MindSpore" and MINDSPORE_AVAILABLE:
            self.model.set_train(False)
            x = Tensor(features.reshape(1, -1), ms.float32)
            logits = self.model(x)
            softmax = ops.Softmax(axis=1)
            probs = softmax(logits)[0].asnumpy()
            pred_class = int(np.argmax(probs))
        else:
            pred_class, probs = self.model.predict(features)
            # The NumPy model returns batched arrays; unwrap the single row.
            pred_class = int(pred_class[0]) if isinstance(pred_class, np.ndarray) else int(pred_class)
            probs = probs[0] if probs.ndim > 1 else probs

        # Build result
        return {
            'predicted_class': pred_class,
            'class_name': ACCIDENT_NAMES[pred_class],
            'confidence': float(probs[pred_class]),
            'probabilities': {
                ACCIDENT_NAMES[i]: float(p)
                for i, p in enumerate(probs)
            },
            'backend': self.backend
        }

    def get_all_predictions(self, accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> list:
        """Return ``{'type', 'probability'}`` dicts sorted by descending probability."""
        result = self.predict(accident_info, vehicle_1, vehicle_2)
        sorted_predictions = sorted(
            result['probabilities'].items(),
            key=lambda x: x[1],
            reverse=True
        )
        return [
            {'type': name, 'probability': prob}
            for name, prob in sorted_predictions
        ]
# ============================================================
# GLOBAL MODEL INSTANCE
# ============================================================
# Process-wide manager, created lazily by get_model_manager().
_model_manager = None


def get_model_manager(model_dir: str = None) -> AccidentModelManager:
    """Return the shared manager, creating it on first use.

    ``model_dir`` is only used by the call that creates the instance; later
    calls return the existing manager regardless of the argument.
    """
    global _model_manager
    shared = _model_manager
    if shared is None:
        shared = _model_manager = AccidentModelManager(model_dir)
    return shared
def predict_accident(accident_info: Dict, vehicle_1: Dict, vehicle_2: Dict) -> Dict:
    """Predict with the shared model manager, loading the model on first use."""
    shared_manager = get_model_manager()
    needs_loading = not shared_manager._loaded
    if needs_loading:
        shared_manager.load()
    prediction = shared_manager.predict(accident_info, vehicle_1, vehicle_2)
    return prediction
# ============================================================
# TEST
# ============================================================
if __name__ == "__main__":
    # Smoke test: load whichever model is available and print one prediction.
    banner = "=" * 60
    print("\n" + banner)
    print("๐Ÿงช Testing MindSpore Model Loader")
    print(banner)

    # Build a manager and load the model
    demo_manager = AccidentModelManager()
    demo_manager.load()

    # Sample scenario: sedan heading north, SUV entering a roundabout from the east
    scenario = dict(
        weather='clear',
        road_condition='dry',
        visibility=1.0,
        lighting='daylight',
        road_type='roundabout',
    )
    first_vehicle = dict(
        type='sedan',
        speed=45,
        direction='north',
        action='going_straight',
        braking=False,
        signaling=False,
    )
    second_vehicle = dict(
        type='suv',
        speed=55,
        direction='east',
        action='entering_roundabout',
        braking=False,
        signaling=True,
    )

    outcome = demo_manager.predict(scenario, first_vehicle, second_vehicle)

    print(f"\n๐Ÿ”ฎ Prediction Result:")
    print(f" Backend: {outcome['backend']}")
    print(f" Predicted: {outcome['class_name']}")
    print(f" Confidence: {outcome['confidence']*100:.1f}%")

    print(f"\n๐Ÿ“Š All Probabilities:")
    # Most likely class first; the bar has one cell per 5% of probability.
    ranked = sorted(outcome['probabilities'].items(), key=lambda entry: -entry[1])
    for name, prob in ranked:
        bar = 'โ–ˆ' * int(prob * 20)
        print(f" {name:30}: {prob*100:5.1f}% {bar}")