# app/services/ml_model_training.py
# ML model training and inference for incident threat/severity classification.
# Core dependencies (always available)
import numpy as np
import pickle  # NOTE(review): imported but unused in this module — confirm before removing
import re
from pathlib import Path
import joblib
import logging

# Training dependencies (only imported when needed).
# The service can serve predictions from pre-trained model files without
# pandas/scikit-learn installed; training is the only path that needs them.
try:
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.preprocessing import LabelEncoder
    from sklearn.metrics import classification_report, accuracy_score
    from sklearn.pipeline import Pipeline
    TRAINING_DEPENDENCIES_AVAILABLE = True
except ImportError:
    TRAINING_DEPENDENCIES_AVAILABLE = False
    # These will be None if training dependencies are not available
    pd = None
    train_test_split = None
    TfidfVectorizer = None
    RandomForestClassifier = None
    LabelEncoder = None
    classification_report = None
    accuracy_score = None
    Pipeline = None

logger = logging.getLogger(__name__)

# Get the directory where this file is located.
# BASE_DIR resolves three levels up from this file (presumably the project
# root, given the app/services/ layout — TODO confirm).
BASE_DIR = Path(__file__).resolve().parent.parent.parent
MODEL_DIR = BASE_DIR / "models"
MODEL_DIR.mkdir(exist_ok=True)
class IncidentClassifier:
    """Classifies incident reports into a threat type and a severity level.

    Wraps two scikit-learn pipelines (TF-IDF text features feeding a random
    forest): ``threat_model`` predicts the threat category and
    ``severity_model`` predicts a heuristic 'low'/'medium'/'high' severity.
    Pre-trained models are loaded from ``MODEL_DIR`` at construction time when
    the pickle files exist; training itself requires the optional
    pandas/scikit-learn stack (``TRAINING_DEPENDENCIES_AVAILABLE``).
    """

    # Keyword lists used to derive heuristic severity labels from incident
    # descriptions. Defined once at class level instead of being rebuilt for
    # every DataFrame row inside create_severity_labels().
    HIGH_INDICATORS = [
        'major', 'massive', 'large scale', 'explosion', 'fire', 'fatality',
        'death', 'significant', 'extensive', 'severe', 'critical',
        'emergency', 'disaster', 'toxic', 'hazardous', 'dangerous',
        'thousands', 'gallons', 'barrels', 'tons'
    ]
    MEDIUM_INDICATORS = [
        'moderate', 'contained', 'limited', 'minor leak', 'small spill',
        'hundreds', 'investigation', 'response', 'cleanup'
    ]
    LOW_INDICATORS = [
        'minor', 'small', 'trace', 'minimal', 'observation', 'potential',
        'suspected', 'no injuries', 'no damage', 'monitoring'
    ]

    def __init__(self):
        self.threat_model = None
        self.severity_model = None
        # NOTE(review): these two encoders are never assigned or read anywhere
        # in this class; kept only for backward compatibility with external
        # code that may inspect them.
        self.threat_encoder = None
        self.severity_encoder = None
        self.is_trained = False
        # Eagerly load pre-trained models so the service can answer
        # predictions without an explicit training step.
        try:
            if self.load_models():
                logger.info("Pre-trained models loaded successfully")
            else:
                logger.warning("No pre-trained models found. Classification will use fallback rules.")
        except Exception as e:
            logger.warning(f"Failed to load models on initialization: {e}")

    def preprocess_text(self, text):
        """Normalize raw text: lowercase, strip punctuation, collapse spaces.

        Returns "" for None/NaN input so downstream vectorizers never see
        missing values.
        """
        # pd may be None when training deps are absent; pd.isna also catches
        # NaN cells coming from pandas DataFrames.
        if text is None or (pd and pd.isna(text)):
            return ""
        # Convert to lowercase
        text = str(text).lower()
        # Remove special characters but keep spaces
        text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
        # Remove extra whitespaces
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def create_severity_labels(self, df):
        """Derive heuristic severity labels from description text and threat type.

        Expects a DataFrame with 'description' and 'threat' columns; returns
        a list of 'low'/'medium'/'high' labels aligned with the rows.
        """
        severity_labels = []
        for _, row in df.iterrows():
            description = str(row['description']).lower()
            threat = row['threat']
            # Count how many indicator phrases of each tier appear.
            high_count = sum(1 for indicator in self.HIGH_INDICATORS if indicator in description)
            medium_count = sum(1 for indicator in self.MEDIUM_INDICATORS if indicator in description)
            low_count = sum(1 for indicator in self.LOW_INDICATORS if indicator in description)
            # Classify based on threat type and indicator counts. Order matters:
            # chemical incidents and strongly-worded reports trump everything.
            if threat == 'Chemical' or high_count >= 2:
                severity = 'high'
            elif threat == 'Oil' and (high_count >= 1 or medium_count >= 2):
                severity = 'medium'
            elif low_count >= 2 or 'minor' in description:
                severity = 'low'
            elif high_count >= 1:
                severity = 'high'
            elif medium_count >= 1:
                severity = 'medium'
            else:
                severity = 'low'
            severity_labels.append(severity)
        return severity_labels

    @staticmethod
    def _build_text_pipeline():
        """Build a TF-IDF (top 5000 uni/bi-gram features) + random forest pipeline."""
        return Pipeline([
            ('tfidf', TfidfVectorizer(max_features=5000, stop_words='english', ngram_range=(1, 2))),
            ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
        ])

    def train_models(self, csv_path=None):
        """Train both threat classification and severity assessment models.

        Reads the incidents CSV (defaults to ``BASE_DIR/incidents.csv``),
        derives severity labels, fits both pipelines, logs evaluation metrics,
        and persists the models.

        Returns a dict with both accuracies and label distributions.
        Raises ImportError when training dependencies are missing; re-raises
        any error encountered during training after logging it.
        """
        if not TRAINING_DEPENDENCIES_AVAILABLE:
            logger.error("Training dependencies (pandas, scikit-learn) not available. Install with: pip install -r requirements-training.txt")
            raise ImportError("Training dependencies not available. This method requires pandas and scikit-learn.")
        try:
            if csv_path is None:
                csv_path = BASE_DIR / "incidents.csv"
            logger.info(f"Loading dataset from {csv_path}")
            df = pd.read_csv(csv_path)
            # Drop rows missing the fields both models depend on.
            df = df.dropna(subset=['description', 'threat'])
            # Combine name and description into a single text feature.
            df['combined_text'] = df['name'].fillna('') + ' ' + df['description'].fillna('')
            df['combined_text'] = df['combined_text'].apply(self.preprocess_text)
            # Derive heuristic severity labels (no ground truth in the CSV).
            df['severity'] = self.create_severity_labels(df)
            X = df['combined_text']
            y_threat = df['threat']
            y_severity = df['severity']
            # Single split shared by both models; stratify on threat so rare
            # threat classes appear in both train and test sets.
            X_train, X_test, y_threat_train, y_threat_test, y_severity_train, y_severity_test = train_test_split(
                X, y_threat, y_severity, test_size=0.2, random_state=42, stratify=y_threat
            )
            logger.info("Training threat classification model...")
            self.threat_model = self._build_text_pipeline()
            self.threat_model.fit(X_train, y_threat_train)
            logger.info("Training severity assessment model...")
            self.severity_model = self._build_text_pipeline()
            self.severity_model.fit(X_train, y_severity_train)
            # Evaluate on the held-out split; compute each accuracy once and
            # reuse it for both logging and the return value.
            threat_pred = self.threat_model.predict(X_test)
            severity_pred = self.severity_model.predict(X_test)
            threat_accuracy = accuracy_score(y_threat_test, threat_pred)
            severity_accuracy = accuracy_score(y_severity_test, severity_pred)
            logger.info("Threat Classification Results:")
            logger.info(f"Accuracy: {threat_accuracy:.3f}")
            logger.info("\n" + classification_report(y_threat_test, threat_pred))
            logger.info("Severity Assessment Results:")
            logger.info(f"Accuracy: {severity_accuracy:.3f}")
            logger.info("\n" + classification_report(y_severity_test, severity_pred))
            self.save_models()
            self.is_trained = True
            logger.info("Models trained and saved successfully!")
            return {
                'threat_accuracy': threat_accuracy,
                'severity_accuracy': severity_accuracy,
                'threat_distribution': df['threat'].value_counts().to_dict(),
                'severity_distribution': df['severity'].value_counts().to_dict()
            }
        except Exception as e:
            logger.error(f"Error training models: {e}")
            raise

    def save_models(self):
        """Persist both fitted pipelines to MODEL_DIR as pickle files (via joblib)."""
        try:
            joblib.dump(self.threat_model, MODEL_DIR / "threat_model.pkl")
            joblib.dump(self.severity_model, MODEL_DIR / "severity_model.pkl")
            logger.info("Models saved successfully")
        except Exception as e:
            logger.error(f"Error saving models: {e}")
            raise

    def load_models(self):
        """Load previously saved pipelines from MODEL_DIR.

        Returns True on success; returns False (never raises) when files are
        missing or unreadable, so callers can fall back to rule-based behavior.
        """
        try:
            threat_model_path = MODEL_DIR / "threat_model.pkl"
            severity_model_path = MODEL_DIR / "severity_model.pkl"
            if threat_model_path.exists() and severity_model_path.exists():
                self.threat_model = joblib.load(threat_model_path)
                self.severity_model = joblib.load(severity_model_path)
                self.is_trained = True
                logger.info("Models loaded successfully")
                return True
            else:
                logger.warning("Model files not found")
                return False
        except Exception as e:
            logger.error(f"Error loading models: {e}")
            return False

    def predict(self, description, name=""):
        """Predict threat type and severity for a given incident description.

        Returns a dict with 'threat', 'severity', and float confidence scores
        (the max class probability of each model). Raises ValueError when no
        trained models are available.
        """
        if not self.is_trained:
            # Last-chance lazy load in case model files appeared after __init__.
            if not self.load_models():
                raise ValueError("Models not trained or loaded")
        combined_text = self.preprocess_text(f"{name} {description}")
        threat_pred = self.threat_model.predict([combined_text])[0]
        severity_pred = self.severity_model.predict([combined_text])[0]
        # Max class probability doubles as a confidence score.
        threat_proba = self.threat_model.predict_proba([combined_text])[0]
        severity_proba = self.severity_model.predict_proba([combined_text])[0]
        return {
            'threat': threat_pred,
            'severity': severity_pred,
            'threat_confidence': float(max(threat_proba)),
            'severity_confidence': float(max(severity_proba))
        }
# Global instance.
# Module-level singleton: importing this module constructs the classifier,
# which also attempts to load any pre-trained model files from disk.
incident_classifier = IncidentClassifier()
def get_classifier():
    """Return the shared module-level IncidentClassifier singleton."""
    return incident_classifier
def train_models():
    """Train the models using the incidents dataset.

    Returns the training-metrics dict on success, or False when the optional
    training dependencies (pandas/scikit-learn) are not installed.
    """
    if TRAINING_DEPENDENCIES_AVAILABLE:
        return get_classifier().train_models()
    logger.error("Training dependencies not available. Models should be pre-trained for deployment.")
    return False
def predict_incident(description, name=""):
    """Classify one incident's threat and severity via the shared classifier."""
    return get_classifier().predict(description, name)