""" CatBoost Model Training Script for AgriPredict This script trains the CatBoost model using the existing training dataset. Includes comprehensive accuracy metrics: MAE, RMSE, MAPE, Bias, MASE, R-Squared """ import pandas as pd import numpy as np from datetime import datetime, timedelta from catboost import CatBoostRegressor, Pool from sklearn.model_selection import train_test_split, TimeSeriesSplit from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score import joblib import os from typing import Dict, Any, Tuple, Optional import logging import json # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ForecastMetrics: """Comprehensive forecast accuracy metrics calculator""" @staticmethod def calculate_all_metrics(y_true: np.ndarray, y_pred: np.ndarray, y_train: Optional[np.ndarray] = None) -> Dict[str, float]: """ Calculate all forecast accuracy metrics Args: y_true: Actual values y_pred: Predicted values y_train: Training data (for MASE calculation) Returns: Dictionary with all metrics """ y_true = np.array(y_true).flatten() y_pred = np.array(y_pred).flatten() # Remove any NaN or infinite values mask = np.isfinite(y_true) & np.isfinite(y_pred) y_true = y_true[mask] y_pred = y_pred[mask] if len(y_true) == 0: return { 'mae': np.nan, 'rmse': np.nan, 'mape': np.nan, 'bias': np.nan, 'mase': np.nan, 'r_squared': np.nan } # MAE - Mean Absolute Error mae = mean_absolute_error(y_true, y_pred) # RMSE - Root Mean Squared Error rmse = np.sqrt(mean_squared_error(y_true, y_pred)) # MAPE - Mean Absolute Percentage Error (handle zero values) non_zero_mask = y_true != 0 if np.any(non_zero_mask): mape = np.mean(np.abs((y_true[non_zero_mask] - y_pred[non_zero_mask]) / y_true[non_zero_mask])) * 100 else: mape = np.nan # Bias - Mean Forecast Error (MFE) bias = np.mean(y_pred - y_true) # MASE - Mean Absolute Scaled Error if y_train is not None and len(y_train) > 1: y_train = np.array(y_train).flatten() # Naive forecast error (one-step ahead) naive_errors = np.abs(np.diff(y_train)) scaling_factor = np.mean(naive_errors) if scaling_factor > 0: mase = mae / scaling_factor else: mase = np.nan else: mase = np.nan # R-Squared r_squared = r2_score(y_true, y_pred) return { 'mae': round(float(mae), 4), 'rmse': round(float(rmse), 4), 'mape': round(float(mape), 4) if not np.isnan(mape) else None, 'bias': round(float(bias), 4), 'mase': round(float(mase), 4) if not np.isnan(mase) else None, 'r_squared': round(float(r_squared), 4) } class CatBoostTrainer: """CatBoost model trainer for agricultural demand forecasting""" def __init__(self): self.model = None self.feature_names = None self.metrics = None self.training_config = None def load_training_data(self, filepath: str = None) -> pd.DataFrame: """ Load training data from CSV file Args: filepath: Path to training data CSV Returns: DataFrame with training data """ if filepath is None: # Default to the existing training data filepath = os.path.join(os.path.dirname(__file__), '..', 'data', 'catboost_training_data.csv') if not os.path.exists(filepath): filepath = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'catboost_training_data.csv') if not os.path.exists(filepath): logger.warning(f"Training data not found at {filepath}, generating synthetic data") return self.generate_artificial_data(n_samples=5000) logger.info(f"Loading training data from {filepath}") df = pd.read_csv(filepath) # Parse date column if 'date' in df.columns: df['date'] = pd.to_datetime(df['date']) logger.info(f"Loaded {len(df)} samples with {len(df.columns)} features") return df def generate_artificial_data(self, n_samples: int = 5000) -> pd.DataFrame: """ Generate artificial agricultural data for training Args: n_samples: Number of samples to generate Returns: DataFrame with artificial agricultural data """ logger.info(f"Generating {n_samples} artificial data samples") # Generate date range start_date = datetime(2023, 1, 1) dates = [start_date + timedelta(days=i) for i in range(n_samples)] np.random.seed(42) # For reproducible results data = [] for date in dates: # Seasonal patterns day_of_year = date.timetuple().tm_yday seasonal_factor = 1 + 0.3 * np.sin(2 * np.pi * day_of_year / 365) # Base demand with seasonal variation base_quantity = np.random.normal(100, 20) * seasonal_factor # Price influenced by season and demand base_price = 25 + 5 * np.sin(2 * np.pi * day_of_year / 365) price_noise = np.random.normal(0, 2) price = base_price + price_noise # Add some correlation between price and quantity quantity_noise = np.random.normal(0, 15) quantity = base_quantity + quantity_noise - 0.1 * (price - 25) # Ensure positive values quantity = max(1, quantity) price = max(5, price) data.append({ 'date': date, 'quantity': round(quantity, 2), 'price': round(price, 2), 'day_of_week': date.weekday(), 'month': date.month, 'day_of_month': date.day, 'quarter': (date.month - 1) // 3 + 1, 'is_weekend': 1 if date.weekday() >= 5 else 0, 'season': self._get_season(date.month) }) df = pd.DataFrame(data) # Add lag features for lag in [1, 7, 14, 30]: df[f'price_lag_{lag}'] = df['price'].shift(lag) df[f'quantity_lag_{lag}'] = df['quantity'].shift(lag) # Add rolling statistics for window in [7, 14, 30]: df[f'price_rolling_mean_{window}'] = df['price'].rolling(window).mean() df[f'price_rolling_std_{window}'] = df['price'].rolling(window).std() df[f'quantity_rolling_mean_{window}'] = df['quantity'].rolling(window).mean() # Add price change features df['price_change'] = df['price'].pct_change() df['price_change_7d'] = df['price'].pct_change(7) # Drop rows with NaN values df = df.dropna().reset_index(drop=True) logger.info(f"Generated dataset with {len(df)} samples and {len(df.columns)} features") return df def _get_season(self, month: int) -> str: """Get season based on month""" if month in [12, 1, 2]: return 'winter' elif month in [3, 4, 5]: return 'spring' elif month in [6, 7, 8]: return 'summer' else: return 'fall' def prepare_features(self, df: pd.DataFrame, target_col: str = 'target_quantity') -> tuple: """ Prepare features for training Args: df: Input DataFrame target_col: Name of target column Returns: Tuple of (X, y, feature_names, categorical_features) """ # Define columns to exclude from features exclude_cols = ['date', 'target_quantity', 'product_name', 'region_name', 'product_category'] # Check if target column exists if target_col not in df.columns: # Fallback to price if target_quantity doesn't exist target_col = 'price' if 'price' in df.columns else df.columns[-1] logger.warning(f"target_quantity not found, using {target_col} as target") # Define feature columns feature_cols = [col for col in df.columns if col not in exclude_cols and col != target_col] # Identify categorical features categorical_features = [] for col in feature_cols: if df[col].dtype == 'object' or df[col].dtype.name == 'category': categorical_features.append(col) # Remove categorical features for now (CatBoost handles them, but we'll use numeric only) feature_cols = [col for col in feature_cols if col not in categorical_features] # Prepare features and target X = df[feature_cols].copy() y = df[target_col].copy() # Handle any remaining NaN values X = X.fillna(X.median()) logger.info(f"Prepared {len(feature_cols)} features for training, target: {target_col}") return X, y, feature_cols, categorical_features def train_model(self, X_train, y_train, X_val=None, y_val=None, **kwargs) -> CatBoostRegressor: """ Train CatBoost model Args: X_train: Training features y_train: Training target X_val: Validation features (optional) y_val: Validation target (optional) **kwargs: Additional CatBoost parameters Returns: Trained CatBoost model """ # Default parameters optimized for demand forecasting default_params = { 'iterations': 1000, 'learning_rate': 0.05, 'depth': 8, 'loss_function': 'MAE', 'eval_metric': 'MAE', 'random_seed': 42, 'verbose': 100, 'early_stopping_rounds': 100, 'l2_leaf_reg': 3, 'border_count': 128, 'thread_count': -1 } # Update with custom parameters default_params.update(kwargs) self.training_config = default_params.copy() # Create model model = CatBoostRegressor(**default_params) # Prepare data train_pool = Pool(X_train, y_train) if X_val is not None and y_val is not None: val_pool = Pool(X_val, y_val) model.fit(train_pool, eval_set=val_pool, use_best_model=True) else: model.fit(train_pool) self.model = model self.feature_names = list(X_train.columns) logger.info(f"Trained CatBoost model with {model.tree_count_} trees") return model def evaluate_model(self, X_test, y_test, y_train=None) -> Dict[str, float]: """ Evaluate model performance with comprehensive metrics Args: X_test: Test features y_test: Test target y_train: Training target (for MASE calculation) Returns: Dictionary with evaluation metrics """ if self.model is None: raise ValueError("Model not trained yet") # Make predictions y_pred = self.model.predict(X_test) # Calculate all metrics self.metrics = ForecastMetrics.calculate_all_metrics( y_true=y_test, y_pred=y_pred, y_train=y_train ) logger.info(f"Model Evaluation Metrics:") logger.info(f" MAE: {self.metrics['mae']}") logger.info(f" RMSE: {self.metrics['rmse']}") logger.info(f" MAPE: {self.metrics['mape']}%") logger.info(f" Bias: {self.metrics['bias']}") logger.info(f" MASE: {self.metrics['mase']}") logger.info(f" R²: {self.metrics['r_squared']}") return self.metrics def save_model(self, filepath: str): """ Save trained model to file with metadata Args: filepath: Path to save the model """ if self.model is None: raise ValueError("Model not trained yet") # Create directory if it doesn't exist os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else '.', exist_ok=True) # Prepare model data with metadata model_data = { 'model': self.model, 'feature_names': self.feature_names, 'metrics': self.metrics, 'training_config': self.training_config, 'training_date': datetime.now().isoformat(), 'version': '2.0' } # Save model using joblib joblib.dump(model_data, filepath) # Also save the CatBoost native format for faster loading native_path = filepath.replace('.pkl', '.cbm') self.model.save_model(native_path) # Save metrics to JSON for easy access metrics_path = filepath.replace('.pkl', '_metrics.json') with open(metrics_path, 'w') as f: json.dump({ 'metrics': self.metrics, 'feature_names': self.feature_names, 'training_date': model_data['training_date'], 'training_config': {k: str(v) for k, v in (self.training_config or {}).items()} }, f, indent=2) logger.info(f"Model saved to {filepath}") logger.info(f"Native model saved to {native_path}") logger.info(f"Metrics saved to {metrics_path}") def load_model(self, filepath: str): """ Load trained model from file Args: filepath: Path to the saved model """ if not os.path.exists(filepath): raise FileNotFoundError(f"Model file not found: {filepath}") # Load model model_data = joblib.load(filepath) self.model = model_data['model'] self.feature_names = model_data.get('feature_names', []) self.metrics = model_data.get('metrics', {}) self.training_config = model_data.get('training_config', {}) logger.info(f"Model loaded from {filepath}") logger.info(f"Model metrics: {self.metrics}") def predict(self, features: pd.DataFrame) -> np.ndarray: """ Make predictions with trained model Args: features: Input features Returns: Predictions array """ if self.model is None: raise ValueError("Model not trained or loaded yet") # Ensure features are in correct order if self.feature_names: # Only use features that exist in both available_features = [f for f in self.feature_names if f in features.columns] missing_features = [f for f in self.feature_names if f not in features.columns] if missing_features: logger.warning(f"Missing features: {missing_features[:5]}...") # Add missing features with default values for f in missing_features: features[f] = 0 features = features[self.feature_names] return self.model.predict(features) def get_feature_importance(self, top_n: int = 20) -> Dict[str, float]: """ Get feature importance from trained model Args: top_n: Number of top features to return Returns: Dictionary of feature names and their importance scores """ if self.model is None: raise ValueError("Model not trained yet") importance = self.model.get_feature_importance() feature_importance = dict(zip(self.feature_names, importance)) # Sort by importance sorted_importance = dict(sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)[:top_n]) return sorted_importance def main(): """Main training function""" logger.info("=" * 60) logger.info("Starting CatBoost Model Training for AgriPredict") logger.info("=" * 60) # Initialize trainer trainer = CatBoostTrainer() # Load training data (use existing CSV or generate synthetic) df = trainer.load_training_data() logger.info(f"Dataset shape: {df.shape}") logger.info(f"Columns: {list(df.columns)[:10]}...") # Prepare features X, y, feature_names, categorical_features = trainer.prepare_features(df) logger.info(f"Feature matrix shape: {X.shape}") logger.info(f"Target shape: {y.shape}") logger.info(f"Target statistics: mean={y.mean():.2f}, std={y.std():.2f}, min={y.min():.2f}, max={y.max():.2f}") # Split data (use time-based split for time series data) # Reserve last 20% for testing split_idx = int(len(X) * 0.8) X_train_full, X_test = X.iloc[:split_idx], X.iloc[split_idx:] y_train_full, y_test = y.iloc[:split_idx], y.iloc[split_idx:] # Further split training data for validation val_split_idx = int(len(X_train_full) * 0.8) X_train, X_val = X_train_full.iloc[:val_split_idx], X_train_full.iloc[val_split_idx:] y_train, y_val = y_train_full.iloc[:val_split_idx], y_train_full.iloc[val_split_idx:] logger.info(f"Train set: {len(X_train)} samples") logger.info(f"Validation set: {len(X_val)} samples") logger.info(f"Test set: {len(X_test)} samples") # Train model logger.info("-" * 40) logger.info("Training CatBoost model...") model = trainer.train_model(X_train, y_train, X_val, y_val) # Evaluate model on test set logger.info("-" * 40) logger.info("Evaluating model on test set...") metrics = trainer.evaluate_model(X_test, y_test, y_train=y_train) # Get feature importance logger.info("-" * 40) logger.info("Top 10 Feature Importance:") importance = trainer.get_feature_importance(top_n=10) for feat, imp in importance.items(): logger.info(f" {feat}: {imp:.2f}") # Save model model_dir = os.path.join(os.path.dirname(__file__), 'models') os.makedirs(model_dir, exist_ok=True) model_path = os.path.join(model_dir, "catboost_model.pkl") trainer.save_model(model_path) logger.info("=" * 60) logger.info("Training completed successfully!") logger.info(f"Model saved to: {model_path}") logger.info("=" * 60) # Print final summary print("\n" + "=" * 60) print("TRAINING SUMMARY") print("=" * 60) print(f"Total samples: {len(df)}") print(f"Features used: {len(feature_names)}") print(f"Trees in model: {model.tree_count_}") print("\nTest Set Metrics:") print(f" MAE: {metrics['mae']:.4f}") print(f" RMSE: {metrics['rmse']:.4f}") print(f" MAPE: {metrics['mape']:.2f}%" if metrics['mape'] else " MAPE: N/A") print(f" Bias: {metrics['bias']:.4f}") print(f" MASE: {metrics['mase']:.4f}" if metrics['mase'] else " MASE: N/A") print(f" R²: {metrics['r_squared']:.4f}") print("=" * 60) return trainer if __name__ == "__main__": trained_trainer = main()