# Core dependencies (always available)
import numpy as np
import pickle
import re
from pathlib import Path
import joblib
import logging

# Training dependencies (only imported when needed)
try:
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.preprocessing import LabelEncoder
    from sklearn.metrics import classification_report, accuracy_score
    from sklearn.pipeline import Pipeline
    TRAINING_DEPENDENCIES_AVAILABLE = True
except ImportError:
    TRAINING_DEPENDENCIES_AVAILABLE = False
    # These will be None if training dependencies are not available
    pd = None
    train_test_split = None
    TfidfVectorizer = None
    RandomForestClassifier = None
    LabelEncoder = None
    classification_report = None
    accuracy_score = None
    Pipeline = None

logger = logging.getLogger(__name__)

# Get the directory where this file is located
BASE_DIR = Path(__file__).resolve().parent.parent.parent
MODEL_DIR = BASE_DIR / "models"
# parents=True: do not crash at import time if an intermediate directory
# is missing (original used exist_ok=True only).
MODEL_DIR.mkdir(parents=True, exist_ok=True)


class IncidentClassifier:
    """Classify incident reports into a threat type and a severity level.

    Two independent scikit-learn pipelines (TF-IDF features feeding a
    random forest) are used: one predicts the ``threat`` label found in
    the training CSV, the other predicts a heuristic ``severity`` label
    derived from keyword counts in the description text.

    Prediction only requires joblib-loadable model files under
    ``MODEL_DIR``; the heavy training dependencies (pandas, sklearn) are
    needed only when :meth:`train_models` is called.
    """

    def __init__(self):
        self.threat_model = None
        self.severity_model = None
        # Encoders are reserved for future use; nothing reads them yet.
        self.threat_encoder = None
        self.severity_encoder = None
        self.is_trained = False
        # Try to load pre-trained models automatically so the instance is
        # usable immediately after construction when artifacts exist.
        try:
            if self.load_models():
                logger.info("Pre-trained models loaded successfully")
            else:
                logger.warning(
                    "No pre-trained models found. "
                    "Classification will use fallback rules."
                )
        except Exception as e:
            logger.warning(f"Failed to load models on initialization: {e}")

    def preprocess_text(self, text):
        """Clean and preprocess text data.

        Lowercases, strips non-alphanumeric characters, and collapses
        whitespace. ``None`` and pandas-missing values become ``""``.
        """
        if text is None or (pd and pd.isna(text)):
            return ""
        # Convert to lowercase
        text = str(text).lower()
        # Remove special characters but keep spaces
        text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
        # Remove extra whitespaces
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def create_severity_labels(self, df):
        """Create severity labels based on description content and threat type.

        Counts high/medium/low keyword hits in each row's description and
        combines them with the ``threat`` column to assign one of
        ``'high'``, ``'medium'``, or ``'low'`` per row.

        Returns a list of labels aligned with ``df``'s row order.
        """
        # High severity indicators
        high_indicators = [
            'major', 'massive', 'large scale', 'explosion', 'fire',
            'fatality', 'death', 'significant', 'extensive', 'severe',
            'critical', 'emergency', 'disaster', 'toxic', 'hazardous',
            'dangerous', 'thousands', 'gallons', 'barrels', 'tons'
        ]
        # Medium severity indicators
        medium_indicators = [
            'moderate', 'contained', 'limited', 'minor leak', 'small spill',
            'hundreds', 'investigation', 'response', 'cleanup'
        ]
        # Low severity indicators
        low_indicators = [
            'minor', 'small', 'trace', 'minimal', 'observation',
            'potential', 'suspected', 'no injuries', 'no damage', 'monitoring'
        ]

        severity_labels = []
        for _, row in df.iterrows():
            description = str(row['description']).lower()
            threat = row['threat']

            # Count indicators present in this description
            high_count = sum(1 for indicator in high_indicators if indicator in description)
            medium_count = sum(1 for indicator in medium_indicators if indicator in description)
            low_count = sum(1 for indicator in low_indicators if indicator in description)

            # Classify based on threat type and indicators; earlier branches
            # take precedence, so Chemical incidents default to high.
            if threat == 'Chemical' or high_count >= 2:
                severity = 'high'
            elif threat == 'Oil' and (high_count >= 1 or medium_count >= 2):
                severity = 'medium'
            elif low_count >= 2 or 'minor' in description:
                severity = 'low'
            elif high_count >= 1:
                severity = 'high'
            elif medium_count >= 1:
                severity = 'medium'
            else:
                severity = 'low'

            severity_labels.append(severity)

        return severity_labels

    def train_models(self, csv_path=None):
        """Train both threat classification and severity assessment models.

        Loads the incident CSV (defaults to ``BASE_DIR / "incidents.csv"``),
        derives severity labels, trains the two pipelines on a stratified
        80/20 split, logs evaluation metrics, and saves the models.

        Returns a dict with test-set accuracies and label distributions.
        Raises ``ImportError`` when pandas/scikit-learn are unavailable.
        """
        if not TRAINING_DEPENDENCIES_AVAILABLE:
            logger.error("Training dependencies (pandas, scikit-learn) not available. Install with: pip install -r requirements-training.txt")
            raise ImportError("Training dependencies not available. This method requires pandas and scikit-learn.")

        try:
            if csv_path is None:
                csv_path = BASE_DIR / "incidents.csv"

            logger.info(f"Loading dataset from {csv_path}")
            df = pd.read_csv(csv_path)

            # Clean the data: rows without a description or threat label
            # cannot be used for supervised training.
            df = df.dropna(subset=['description', 'threat'])

            # Combine name and description for features
            df['combined_text'] = df['name'].fillna('') + ' ' + df['description'].fillna('')
            df['combined_text'] = df['combined_text'].apply(self.preprocess_text)

            # Create severity labels
            df['severity'] = self.create_severity_labels(df)

            # Prepare features
            X = df['combined_text']
            y_threat = df['threat']
            y_severity = df['severity']

            # Split the data once so both models share the same split;
            # stratify on threat to keep class balance in the test set.
            X_train, X_test, y_threat_train, y_threat_test, y_severity_train, y_severity_test = train_test_split(
                X, y_threat, y_severity, test_size=0.2, random_state=42, stratify=y_threat
            )

            # Train threat classification model
            logger.info("Training threat classification model...")
            self.threat_model = Pipeline([
                ('tfidf', TfidfVectorizer(max_features=5000, stop_words='english', ngram_range=(1, 2))),
                ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
            ])
            self.threat_model.fit(X_train, y_threat_train)

            # Train severity assessment model
            logger.info("Training severity assessment model...")
            self.severity_model = Pipeline([
                ('tfidf', TfidfVectorizer(max_features=5000, stop_words='english', ngram_range=(1, 2))),
                ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
            ])
            self.severity_model.fit(X_train, y_severity_train)

            # Evaluate models on the held-out split
            threat_pred = self.threat_model.predict(X_test)
            severity_pred = self.severity_model.predict(X_test)

            logger.info("Threat Classification Results:")
            logger.info(f"Accuracy: {accuracy_score(y_threat_test, threat_pred):.3f}")
            logger.info("\n" + classification_report(y_threat_test, threat_pred))

            logger.info("Severity Assessment Results:")
            logger.info(f"Accuracy: {accuracy_score(y_severity_test, severity_pred):.3f}")
            logger.info("\n" + classification_report(y_severity_test, severity_pred))

            # Save models
            self.save_models()
            self.is_trained = True

            logger.info("Models trained and saved successfully!")

            return {
                'threat_accuracy': accuracy_score(y_threat_test, threat_pred),
                'severity_accuracy': accuracy_score(y_severity_test, severity_pred),
                'threat_distribution': df['threat'].value_counts().to_dict(),
                'severity_distribution': df['severity'].value_counts().to_dict()
            }

        except Exception as e:
            logger.error(f"Error training models: {e}")
            raise

    def save_models(self):
        """Save trained models to disk under ``MODEL_DIR`` via joblib."""
        try:
            joblib.dump(self.threat_model, MODEL_DIR / "threat_model.pkl")
            joblib.dump(self.severity_model, MODEL_DIR / "severity_model.pkl")
            logger.info("Models saved successfully")
        except Exception as e:
            logger.error(f"Error saving models: {e}")
            raise

    def load_models(self):
        """Load trained models from disk.

        Returns ``True`` and sets ``is_trained`` when both model files
        exist and load; returns ``False`` otherwise (never raises).
        """
        try:
            threat_model_path = MODEL_DIR / "threat_model.pkl"
            severity_model_path = MODEL_DIR / "severity_model.pkl"

            if threat_model_path.exists() and severity_model_path.exists():
                self.threat_model = joblib.load(threat_model_path)
                self.severity_model = joblib.load(severity_model_path)
                self.is_trained = True
                logger.info("Models loaded successfully")
                return True
            else:
                logger.warning("Model files not found")
                return False
        except Exception as e:
            logger.error(f"Error loading models: {e}")
            return False

    def predict(self, description, name=""):
        """Predict threat type and severity for a given incident description.

        Returns a dict with ``threat``, ``severity`` and their confidence
        scores (max class probability). Raises ``ValueError`` when no
        trained models are available and none can be loaded from disk.
        """
        if not self.is_trained:
            if not self.load_models():
                raise ValueError("Models not trained or loaded")

        # Preprocess input the same way as training data
        combined_text = self.preprocess_text(f"{name} {description}")

        # Make predictions
        threat_pred = self.threat_model.predict([combined_text])[0]
        severity_pred = self.severity_model.predict([combined_text])[0]

        # Get prediction probabilities for confidence scores
        threat_proba = self.threat_model.predict_proba([combined_text])[0]
        severity_proba = self.severity_model.predict_proba([combined_text])[0]

        threat_confidence = max(threat_proba)
        severity_confidence = max(severity_proba)

        return {
            'threat': threat_pred,
            'severity': severity_pred,
            'threat_confidence': float(threat_confidence),
            'severity_confidence': float(severity_confidence)
        }


# Global instance shared by the module-level convenience functions below.
incident_classifier = IncidentClassifier()


def get_classifier():
    """Get the global classifier instance"""
    return incident_classifier


def train_models():
    """Train the models using the incidents dataset.

    Returns ``False`` when training dependencies are missing; otherwise
    returns the metrics dict from ``IncidentClassifier.train_models``.
    """
    if not TRAINING_DEPENDENCIES_AVAILABLE:
        logger.error("Training dependencies not available. Models should be pre-trained for deployment.")
        return False
    classifier = get_classifier()
    return classifier.train_models()


def predict_incident(description, name=""):
    """Predict threat and severity for an incident"""
    classifier = get_classifier()
    return classifier.predict(description, name)