"""ML-based incident classifier: predicts threat type and severity from incident text."""
| # Core dependencies (always available) | |
| import numpy as np | |
| import pickle | |
| import re | |
| from pathlib import Path | |
| import joblib | |
| import logging | |
| # Training dependencies (only imported when needed) | |
| try: | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.preprocessing import LabelEncoder | |
| from sklearn.metrics import classification_report, accuracy_score | |
| from sklearn.pipeline import Pipeline | |
| TRAINING_DEPENDENCIES_AVAILABLE = True | |
| except ImportError: | |
| TRAINING_DEPENDENCIES_AVAILABLE = False | |
| # These will be None if training dependencies are not available | |
| pd = None | |
| train_test_split = None | |
| TfidfVectorizer = None | |
| RandomForestClassifier = None | |
| LabelEncoder = None | |
| classification_report = None | |
| accuracy_score = None | |
| Pipeline = None | |
logger = logging.getLogger(__name__)

# Project root is three levels above this file; serialized model artifacts
# live in <root>/models. parents=True makes setup robust when intermediate
# directories do not exist yet (plain exist_ok=True would raise then).
BASE_DIR = Path(__file__).resolve().parent.parent.parent
MODEL_DIR = BASE_DIR / "models"
MODEL_DIR.mkdir(parents=True, exist_ok=True)
class IncidentClassifier:
    """Text classifier for incident reports.

    Wraps two scikit-learn pipelines (TF-IDF + random forest): one predicts
    the threat type, the other a severity level derived from keyword
    heuristics. Trained models are persisted under MODEL_DIR via joblib.
    """

    # Keyword lists used to derive weak severity labels from free text.
    # Hoisted to class level so they are not rebuilt for every DataFrame row.
    _HIGH_INDICATORS = [
        'major', 'massive', 'large scale', 'explosion', 'fire', 'fatality',
        'death', 'significant', 'extensive', 'severe', 'critical',
        'emergency', 'disaster', 'toxic', 'hazardous', 'dangerous',
        'thousands', 'gallons', 'barrels', 'tons'
    ]
    _MEDIUM_INDICATORS = [
        'moderate', 'contained', 'limited', 'minor leak', 'small spill',
        'hundreds', 'investigation', 'response', 'cleanup'
    ]
    _LOW_INDICATORS = [
        'minor', 'small', 'trace', 'minimal', 'observation', 'potential',
        'suspected', 'no injuries', 'no damage', 'monitoring'
    ]

    def __init__(self):
        self.threat_model = None
        self.severity_model = None
        # NOTE(review): the encoder attributes are never assigned by
        # training or loading; kept for backward compatibility with any
        # external readers — confirm before removing.
        self.threat_encoder = None
        self.severity_encoder = None
        self.is_trained = False
        # Load pre-trained models eagerly so deployments that ship model
        # artifacts get a usable instance without an explicit setup call.
        try:
            if self.load_models():
                logger.info("Pre-trained models loaded successfully")
            else:
                logger.warning("No pre-trained models found. Classification will use fallback rules.")
        except Exception as e:
            logger.warning("Failed to load models on initialization: %s", e)

    def preprocess_text(self, text):
        """Lowercase *text* and reduce it to space-separated alphanumeric words.

        None (and, when pandas is available, NaN) inputs become "".
        """
        if text is None or (pd and pd.isna(text)):
            return ""
        text = str(text).lower()
        # Replace punctuation/symbols with spaces, then squeeze whitespace.
        text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
        return re.sub(r'\s+', ' ', text).strip()

    def create_severity_labels(self, df):
        """Derive a 'low'/'medium'/'high' label for each row of *df*.

        Heuristic: counts indicator keywords in the 'description' column and
        combines the counts with the 'threat' column ('Chemical' incidents
        are always high severity).
        """
        severity_labels = []
        for _, row in df.iterrows():
            description = str(row['description']).lower()
            threat = row['threat']
            high_count = sum(1 for ind in self._HIGH_INDICATORS if ind in description)
            medium_count = sum(1 for ind in self._MEDIUM_INDICATORS if ind in description)
            low_count = sum(1 for ind in self._LOW_INDICATORS if ind in description)
            # Branch order matters: chemical/strong-high signals win first,
            # then the oil-specific rule, then low/high/medium fallbacks.
            if threat == 'Chemical' or high_count >= 2:
                severity = 'high'
            elif threat == 'Oil' and (high_count >= 1 or medium_count >= 2):
                severity = 'medium'
            elif low_count >= 2 or 'minor' in description:
                severity = 'low'
            elif high_count >= 1:
                severity = 'high'
            elif medium_count >= 1:
                severity = 'medium'
            else:
                severity = 'low'
            severity_labels.append(severity)
        return severity_labels

    @staticmethod
    def _build_pipeline():
        """Build a fresh TF-IDF (uni+bigram) -> random-forest pipeline."""
        return Pipeline([
            ('tfidf', TfidfVectorizer(max_features=5000, stop_words='english', ngram_range=(1, 2))),
            ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
        ])

    def train_models(self, csv_path=None):
        """Train both the threat and the severity model from a CSV dataset.

        Args:
            csv_path: CSV with 'name', 'description' and 'threat' columns;
                defaults to BASE_DIR / "incidents.csv".

        Returns:
            dict with test-set accuracies and label distributions.

        Raises:
            ImportError: when pandas/scikit-learn are not installed.
        """
        if not TRAINING_DEPENDENCIES_AVAILABLE:
            logger.error("Training dependencies (pandas, scikit-learn) not available. Install with: pip install -r requirements-training.txt")
            raise ImportError("Training dependencies not available. This method requires pandas and scikit-learn.")
        try:
            if csv_path is None:
                csv_path = BASE_DIR / "incidents.csv"
            logger.info("Loading dataset from %s", csv_path)
            df = pd.read_csv(csv_path)
            df = df.dropna(subset=['description', 'threat'])
            # Name + description together form the text features for both models.
            df['combined_text'] = df['name'].fillna('') + ' ' + df['description'].fillna('')
            df['combined_text'] = df['combined_text'].apply(self.preprocess_text)
            df['severity'] = self.create_severity_labels(df)

            X = df['combined_text']
            y_threat = df['threat']
            y_severity = df['severity']
            # Stratify on threat so rare threat classes appear in both splits.
            X_train, X_test, y_threat_train, y_threat_test, y_severity_train, y_severity_test = train_test_split(
                X, y_threat, y_severity, test_size=0.2, random_state=42, stratify=y_threat
            )

            logger.info("Training threat classification model...")
            self.threat_model = self._build_pipeline()
            self.threat_model.fit(X_train, y_threat_train)

            logger.info("Training severity assessment model...")
            self.severity_model = self._build_pipeline()
            self.severity_model.fit(X_train, y_severity_train)

            threat_pred = self.threat_model.predict(X_test)
            severity_pred = self.severity_model.predict(X_test)
            # Score each model once; the originals re-ran accuracy_score for
            # the logs and again for the return value.
            threat_accuracy = accuracy_score(y_threat_test, threat_pred)
            severity_accuracy = accuracy_score(y_severity_test, severity_pred)

            logger.info("Threat Classification Results:")
            logger.info("Accuracy: %.3f", threat_accuracy)
            logger.info("\n" + classification_report(y_threat_test, threat_pred))
            logger.info("Severity Assessment Results:")
            logger.info("Accuracy: %.3f", severity_accuracy)
            logger.info("\n" + classification_report(y_severity_test, severity_pred))

            self.save_models()
            self.is_trained = True
            logger.info("Models trained and saved successfully!")
            return {
                'threat_accuracy': threat_accuracy,
                'severity_accuracy': severity_accuracy,
                'threat_distribution': df['threat'].value_counts().to_dict(),
                'severity_distribution': df['severity'].value_counts().to_dict()
            }
        except Exception as e:
            logger.error(f"Error training models: {e}")
            raise

    def save_models(self):
        """Persist both trained pipelines to MODEL_DIR with joblib."""
        try:
            joblib.dump(self.threat_model, MODEL_DIR / "threat_model.pkl")
            joblib.dump(self.severity_model, MODEL_DIR / "severity_model.pkl")
            logger.info("Models saved successfully")
        except Exception as e:
            logger.error(f"Error saving models: {e}")
            raise

    def load_models(self):
        """Load both pipelines from MODEL_DIR.

        Returns True and sets is_trained on success; False when the files
        are missing or loading fails (errors are logged, not raised).
        """
        try:
            threat_model_path = MODEL_DIR / "threat_model.pkl"
            severity_model_path = MODEL_DIR / "severity_model.pkl"
            if threat_model_path.exists() and severity_model_path.exists():
                self.threat_model = joblib.load(threat_model_path)
                self.severity_model = joblib.load(severity_model_path)
                self.is_trained = True
                logger.info("Models loaded successfully")
                return True
            else:
                logger.warning("Model files not found")
                return False
        except Exception as e:
            logger.error(f"Error loading models: {e}")
            return False

    def predict(self, description, name=""):
        """Predict threat type and severity for one incident.

        Args:
            description: free-text incident description.
            name: optional incident title, prepended to the features.

        Returns:
            dict with 'threat', 'severity' and the corresponding confidence
            scores (max class probability from each model).

        Raises:
            ValueError: when no trained models are available.
        """
        if not self.is_trained:
            if not self.load_models():
                raise ValueError("Models not trained or loaded")
        combined_text = self.preprocess_text(f"{name} {description}")
        threat_pred = self.threat_model.predict([combined_text])[0]
        severity_pred = self.severity_model.predict([combined_text])[0]
        # Max class probability serves as a simple confidence score.
        threat_proba = self.threat_model.predict_proba([combined_text])[0]
        severity_proba = self.severity_model.predict_proba([combined_text])[0]
        return {
            'threat': threat_pred,
            'severity': severity_pred,
            'threat_confidence': float(max(threat_proba)),
            'severity_confidence': float(max(severity_proba))
        }
# Global instance
# Module-level singleton; constructing it attempts to load the pre-trained
# models from disk (a warning is logged if none are found).
incident_classifier = IncidentClassifier()
def get_classifier():
    """Return the module-wide IncidentClassifier singleton."""
    shared = incident_classifier
    return shared
def train_models():
    """Train the models on the bundled incidents dataset.

    Returns the metrics dict from IncidentClassifier.train_models, or
    False when the training dependencies are not installed.
    """
    if TRAINING_DEPENDENCIES_AVAILABLE:
        return get_classifier().train_models()
    logger.error("Training dependencies not available. Models should be pre-trained for deployment.")
    return False
def predict_incident(description, name=""):
    """Predict threat type and severity for an incident via the shared classifier."""
    return get_classifier().predict(description, name)