# agripredict-analysis / train_catboost.py
# Uploaded by adsurkasur — commit b2c7817 (verified): "Update AgriPredict Analysis Service"
"""
CatBoost Model Training Script for AgriPredict
This script trains the CatBoost model using the existing training dataset.
Includes comprehensive accuracy metrics: MAE, RMSE, MAPE, Bias, MASE, R-Squared
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from catboost import CatBoostRegressor, Pool
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib
import os
from typing import Dict, Any, Tuple, Optional
import logging
import json
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ForecastMetrics:
    """Comprehensive forecast accuracy metrics calculator.

    All formulas are computed directly with numpy (the sklearn helpers the
    module imports are one-line formulas), so this class has no runtime
    dependency on scikit-learn. Results match sklearn's
    mean_absolute_error, mean_squared_error and r2_score, including
    r2_score's conventions for degenerate inputs (constant target or
    fewer than two samples).
    """

    @staticmethod
    def calculate_all_metrics(y_true: np.ndarray, y_pred: np.ndarray,
                              y_train: Optional[np.ndarray] = None) -> Dict[str, float]:
        """
        Calculate all forecast accuracy metrics.

        Args:
            y_true: Actual values
            y_pred: Predicted values
            y_train: Training data (for MASE calculation)

        Returns:
            Dictionary with keys 'mae', 'rmse', 'mape', 'bias', 'mase',
            'r_squared'. MAPE is None when every actual is zero; MASE is
            None when no usable training history is given (or the naive
            error is zero). All values are NaN when no finite
            (y_true, y_pred) pairs remain after filtering.
        """
        y_true = np.array(y_true).flatten()
        y_pred = np.array(y_pred).flatten()
        # Keep only positions where BOTH series are finite (drops NaN/inf pairs).
        mask = np.isfinite(y_true) & np.isfinite(y_pred)
        y_true = y_true[mask]
        y_pred = y_pred[mask]
        if len(y_true) == 0:
            return {
                'mae': np.nan, 'rmse': np.nan, 'mape': np.nan,
                'bias': np.nan, 'mase': np.nan, 'r_squared': np.nan
            }
        # Signed forecast errors; reused by MAE, RMSE, MAPE, bias and R².
        errors = y_pred - y_true
        # MAE - Mean Absolute Error
        mae = float(np.mean(np.abs(errors)))
        # RMSE - Root Mean Squared Error
        rmse = float(np.sqrt(np.mean(errors ** 2)))
        # MAPE - Mean Absolute Percentage Error (undefined when every actual is 0)
        non_zero_mask = y_true != 0
        if np.any(non_zero_mask):
            mape = np.mean(np.abs(errors[non_zero_mask] / y_true[non_zero_mask])) * 100
        else:
            mape = np.nan
        # Bias - Mean Forecast Error (positive => systematic over-forecasting)
        bias = float(np.mean(errors))
        # MASE - MAE scaled by the in-sample naive (lag-1) forecast error
        if y_train is not None and len(y_train) > 1:
            y_train = np.array(y_train).flatten()
            # Naive forecast error (one-step ahead)
            naive_errors = np.abs(np.diff(y_train))
            scaling_factor = np.mean(naive_errors)
            mase = mae / scaling_factor if scaling_factor > 0 else np.nan
        else:
            mase = np.nan
        # R-Squared, following sklearn.metrics.r2_score conventions:
        #   fewer than 2 samples -> NaN; constant target -> 1.0 if the fit
        #   is perfect, else 0.0.
        if len(y_true) < 2:
            r_squared = np.nan
        else:
            ss_res = float(np.sum(errors ** 2))
            ss_tot = float(np.sum((y_true - np.mean(y_true)) ** 2))
            if ss_tot > 0:
                r_squared = 1.0 - ss_res / ss_tot
            else:
                r_squared = 1.0 if ss_res == 0 else 0.0
        return {
            'mae': round(float(mae), 4),
            'rmse': round(float(rmse), 4),
            'mape': round(float(mape), 4) if not np.isnan(mape) else None,
            'bias': round(float(bias), 4),
            'mase': round(float(mase), 4) if not np.isnan(mase) else None,
            'r_squared': round(float(r_squared), 4)
        }
class CatBoostTrainer:
    """CatBoost model trainer for agricultural demand forecasting.

    Typical workflow: load_training_data -> prepare_features ->
    train_model -> evaluate_model -> save_model; for inference later,
    load_model -> predict.
    """

    def __init__(self):
        self.model = None            # trained CatBoostRegressor (set by train_model/load_model)
        self.feature_names = None    # ordered feature columns used at fit time
        self.metrics = None          # last evaluation metrics dict (from ForecastMetrics)
        self.training_config = None  # CatBoost parameters used for the last fit

    def load_training_data(self, filepath: Optional[str] = None) -> pd.DataFrame:
        """
        Load training data from CSV file.

        Args:
            filepath: Path to training data CSV. When None, two default
                repo-relative locations are probed in order.

        Returns:
            DataFrame with training data ('date' parsed to datetime when
            present). Falls back to synthetic data when no file is found.
        """
        if filepath is None:
            # Default to the existing training data; probe two repo layouts.
            filepath = os.path.join(os.path.dirname(__file__), '..', 'data', 'catboost_training_data.csv')
            if not os.path.exists(filepath):
                filepath = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'catboost_training_data.csv')
        if not os.path.exists(filepath):
            logger.warning(f"Training data not found at {filepath}, generating synthetic data")
            return self.generate_artificial_data(n_samples=5000)
        logger.info(f"Loading training data from {filepath}")
        df = pd.read_csv(filepath)
        # Parse date column so downstream time features can be derived.
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'])
        logger.info(f"Loaded {len(df)} samples with {len(df.columns)} features")
        return df

    def generate_artificial_data(self, n_samples: int = 5000) -> pd.DataFrame:
        """
        Generate artificial agricultural data for training.

        Args:
            n_samples: Number of daily samples to generate (from 2023-01-01)

        Returns:
            DataFrame with artificial agricultural data, plus lag, rolling
            and price-change features; warm-up rows containing NaN from the
            lags/windows are dropped.
        """
        logger.info(f"Generating {n_samples} artificial data samples")
        # Consecutive daily dates starting 2023-01-01.
        start_date = datetime(2023, 1, 1)
        dates = [start_date + timedelta(days=i) for i in range(n_samples)]
        np.random.seed(42)  # For reproducible results
        data = []
        for date in dates:
            # Seasonal patterns: ±30% demand swing over the year.
            day_of_year = date.timetuple().tm_yday
            seasonal_factor = 1 + 0.3 * np.sin(2 * np.pi * day_of_year / 365)
            # Base demand with seasonal variation
            base_quantity = np.random.normal(100, 20) * seasonal_factor
            # Price influenced by season and demand
            base_price = 25 + 5 * np.sin(2 * np.pi * day_of_year / 365)
            price_noise = np.random.normal(0, 2)
            price = base_price + price_noise
            # Weak negative price elasticity couples quantity to price.
            quantity_noise = np.random.normal(0, 15)
            quantity = base_quantity + quantity_noise - 0.1 * (price - 25)
            # Ensure positive values
            quantity = max(1, quantity)
            price = max(5, price)
            data.append({
                'date': date,
                'quantity': round(quantity, 2),
                'price': round(price, 2),
                'day_of_week': date.weekday(),
                'month': date.month,
                'day_of_month': date.day,
                'quarter': (date.month - 1) // 3 + 1,
                'is_weekend': 1 if date.weekday() >= 5 else 0,
                'season': self._get_season(date.month)
            })
        df = pd.DataFrame(data)
        # Add lag features
        for lag in [1, 7, 14, 30]:
            df[f'price_lag_{lag}'] = df['price'].shift(lag)
            df[f'quantity_lag_{lag}'] = df['quantity'].shift(lag)
        # Add rolling statistics
        for window in [7, 14, 30]:
            df[f'price_rolling_mean_{window}'] = df['price'].rolling(window).mean()
            df[f'price_rolling_std_{window}'] = df['price'].rolling(window).std()
            df[f'quantity_rolling_mean_{window}'] = df['quantity'].rolling(window).mean()
        # Add price change features
        df['price_change'] = df['price'].pct_change()
        df['price_change_7d'] = df['price'].pct_change(7)
        # Drop warm-up rows made NaN by the lags / rolling windows above.
        df = df.dropna().reset_index(drop=True)
        logger.info(f"Generated dataset with {len(df)} samples and {len(df.columns)} features")
        return df

    def _get_season(self, month: int) -> str:
        """Map a calendar month (1-12) to a meteorological season name."""
        if month in [12, 1, 2]:
            return 'winter'
        elif month in [3, 4, 5]:
            return 'spring'
        elif month in [6, 7, 8]:
            return 'summer'
        else:
            return 'fall'

    def prepare_features(self, df: pd.DataFrame, target_col: str = 'target_quantity') -> tuple:
        """
        Prepare features for training.

        Args:
            df: Input DataFrame
            target_col: Name of target column; falls back to 'price' (or
                the last column) when absent.

        Returns:
            Tuple of (X, y, feature_names, categorical_features)
        """
        # Columns that must never leak into the feature matrix.
        exclude_cols = ['date', 'target_quantity', 'product_name', 'region_name', 'product_category']
        # Check if target column exists
        if target_col not in df.columns:
            # Fallback to price if target_quantity doesn't exist
            target_col = 'price' if 'price' in df.columns else df.columns[-1]
            logger.warning(f"target_quantity not found, using {target_col} as target")
        # Define feature columns
        feature_cols = [col for col in df.columns if col not in exclude_cols and col != target_col]
        # Identify categorical features
        categorical_features = []
        for col in feature_cols:
            if df[col].dtype == 'object' or df[col].dtype.name == 'category':
                categorical_features.append(col)
        # Remove categorical features for now (CatBoost handles them, but we'll use numeric only)
        feature_cols = [col for col in feature_cols if col not in categorical_features]
        # Prepare features and target
        X = df[feature_cols].copy()
        y = df[target_col].copy()
        # Impute any remaining NaNs with per-column medians.
        X = X.fillna(X.median())
        logger.info(f"Prepared {len(feature_cols)} features for training, target: {target_col}")
        return X, y, feature_cols, categorical_features

    def train_model(self, X_train, y_train, X_val=None, y_val=None, **kwargs) -> "CatBoostRegressor":
        """
        Train CatBoost model.

        Args:
            X_train: Training features
            y_train: Training target
            X_val: Validation features (optional)
            y_val: Validation target (optional)
            **kwargs: Additional CatBoost parameters (override the defaults)

        Returns:
            Trained CatBoost model (also stored on self.model)
        """
        # Default parameters optimized for demand forecasting
        default_params = {
            'iterations': 1000,
            'learning_rate': 0.05,
            'depth': 8,
            'loss_function': 'MAE',
            'eval_metric': 'MAE',
            'random_seed': 42,
            'verbose': 100,
            'early_stopping_rounds': 100,
            'l2_leaf_reg': 3,
            'border_count': 128,
            'thread_count': -1
        }
        # Update with custom parameters
        default_params.update(kwargs)
        self.training_config = default_params.copy()
        # Create model
        model = CatBoostRegressor(**default_params)
        # Prepare data
        train_pool = Pool(X_train, y_train)
        if X_val is not None and y_val is not None:
            # A validation pool enables early stopping / best-iteration selection.
            val_pool = Pool(X_val, y_val)
            model.fit(train_pool, eval_set=val_pool, use_best_model=True)
        else:
            model.fit(train_pool)
        self.model = model
        self.feature_names = list(X_train.columns)
        logger.info(f"Trained CatBoost model with {model.tree_count_} trees")
        return model

    def evaluate_model(self, X_test, y_test, y_train=None) -> Dict[str, float]:
        """
        Evaluate model performance with comprehensive metrics.

        Args:
            X_test: Test features
            y_test: Test target
            y_train: Training target (for MASE calculation)

        Returns:
            Dictionary with evaluation metrics (also stored on self.metrics)

        Raises:
            ValueError: If the model has not been trained yet.
        """
        if self.model is None:
            raise ValueError("Model not trained yet")
        # Make predictions
        y_pred = self.model.predict(X_test)
        # Calculate all metrics
        self.metrics = ForecastMetrics.calculate_all_metrics(
            y_true=y_test,
            y_pred=y_pred,
            y_train=y_train
        )
        logger.info("Model Evaluation Metrics:")
        logger.info(f" MAE: {self.metrics['mae']}")
        logger.info(f" RMSE: {self.metrics['rmse']}")
        logger.info(f" MAPE: {self.metrics['mape']}%")
        logger.info(f" Bias: {self.metrics['bias']}")
        logger.info(f" MASE: {self.metrics['mase']}")
        logger.info(f" R²: {self.metrics['r_squared']}")
        return self.metrics

    def save_model(self, filepath: str):
        """
        Save trained model to file with metadata.

        Writes three artifacts: a joblib pickle (model + metadata), the
        CatBoost native .cbm file, and a JSON metrics summary.

        Args:
            filepath: Path to save the model (conventionally *.pkl)

        Raises:
            ValueError: If the model has not been trained yet.
        """
        if self.model is None:
            raise ValueError("Model not trained yet")
        # Create directory if it doesn't exist
        os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else '.', exist_ok=True)
        # Prepare model data with metadata
        model_data = {
            'model': self.model,
            'feature_names': self.feature_names,
            'metrics': self.metrics,
            'training_config': self.training_config,
            'training_date': datetime.now().isoformat(),
            'version': '2.0'
        }
        # Save model using joblib
        joblib.dump(model_data, filepath)
        # Derive sibling paths from the stem. The previous
        # str.replace('.pkl', ...) silently reused — and overwrote — the
        # joblib file whenever `filepath` did not end in '.pkl'.
        base, _ = os.path.splitext(filepath)
        # Also save the CatBoost native format for faster loading
        native_path = base + '.cbm'
        self.model.save_model(native_path)
        # Save metrics to JSON for easy access
        metrics_path = base + '_metrics.json'
        with open(metrics_path, 'w') as f:
            json.dump({
                'metrics': self.metrics,
                'feature_names': self.feature_names,
                'training_date': model_data['training_date'],
                'training_config': {k: str(v) for k, v in (self.training_config or {}).items()}
            }, f, indent=2)
        logger.info(f"Model saved to {filepath}")
        logger.info(f"Native model saved to {native_path}")
        logger.info(f"Metrics saved to {metrics_path}")

    def load_model(self, filepath: str):
        """
        Load trained model from file.

        Args:
            filepath: Path to the saved joblib model

        Raises:
            FileNotFoundError: If the model file does not exist.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Model file not found: {filepath}")
        # Load model
        model_data = joblib.load(filepath)
        self.model = model_data['model']
        self.feature_names = model_data.get('feature_names', [])
        self.metrics = model_data.get('metrics', {})
        self.training_config = model_data.get('training_config', {})
        logger.info(f"Model loaded from {filepath}")
        logger.info(f"Model metrics: {self.metrics}")

    def predict(self, features: pd.DataFrame) -> np.ndarray:
        """
        Make predictions with trained model.

        Args:
            features: Input features. Columns are re-ordered to the
                training feature order; missing columns are filled with 0
                on a copy — the caller's DataFrame is never mutated.

        Returns:
            Predictions array

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained or loaded yet")
        # Ensure features are in correct order
        if self.feature_names:
            missing_features = [f for f in self.feature_names if f not in features.columns]
            if missing_features:
                logger.warning(f"Missing features: {missing_features[:5]}...")
                # Fill defaults on a copy so the caller's frame stays untouched
                # (previously this assigned into the caller's DataFrame).
                features = features.copy()
                for f in missing_features:
                    features[f] = 0
            features = features[self.feature_names]
        return self.model.predict(features)

    def get_feature_importance(self, top_n: int = 20) -> Dict[str, float]:
        """
        Get feature importance from trained model.

        Args:
            top_n: Number of top features to return

        Returns:
            Dictionary of feature names and importance scores, sorted
            descending by importance.

        Raises:
            ValueError: If the model has not been trained yet.
        """
        if self.model is None:
            raise ValueError("Model not trained yet")
        importance = self.model.get_feature_importance()
        feature_importance = dict(zip(self.feature_names, importance))
        # Sort by importance
        sorted_importance = dict(sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)[:top_n])
        return sorted_importance
def main():
    """Run the full training pipeline and return the fitted trainer.

    Loads (or synthesizes) data, builds features, performs a time-ordered
    train/validation/test split, trains CatBoost, evaluates on the held-out
    tail, saves the model artifacts, and prints a console summary.
    """
    logger.info("=" * 60)
    logger.info("Starting CatBoost Model Training for AgriPredict")
    logger.info("=" * 60)
    # Initialize trainer
    trainer = CatBoostTrainer()
    # Load training data (use existing CSV or generate synthetic)
    df = trainer.load_training_data()
    logger.info(f"Dataset shape: {df.shape}")
    logger.info(f"Columns: {list(df.columns)[:10]}...")
    # Prepare features
    X, y, feature_names, categorical_features = trainer.prepare_features(df)
    logger.info(f"Feature matrix shape: {X.shape}")
    logger.info(f"Target shape: {y.shape}")
    logger.info(f"Target statistics: mean={y.mean():.2f}, std={y.std():.2f}, min={y.min():.2f}, max={y.max():.2f}")
    # Time-ordered split (no shuffling — this is time-series data):
    # reserve the last 20% for testing.
    split_idx = int(len(X) * 0.8)
    X_train_full, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train_full, y_test = y.iloc[:split_idx], y.iloc[split_idx:]
    # The last 20% of the remaining training rows becomes the validation set.
    val_split_idx = int(len(X_train_full) * 0.8)
    X_train, X_val = X_train_full.iloc[:val_split_idx], X_train_full.iloc[val_split_idx:]
    y_train, y_val = y_train_full.iloc[:val_split_idx], y_train_full.iloc[val_split_idx:]
    logger.info(f"Train set: {len(X_train)} samples")
    logger.info(f"Validation set: {len(X_val)} samples")
    logger.info(f"Test set: {len(X_test)} samples")
    # Train model
    logger.info("-" * 40)
    logger.info("Training CatBoost model...")
    model = trainer.train_model(X_train, y_train, X_val, y_val)
    # Evaluate model on test set
    logger.info("-" * 40)
    logger.info("Evaluating model on test set...")
    metrics = trainer.evaluate_model(X_test, y_test, y_train=y_train)
    # Get feature importance
    logger.info("-" * 40)
    logger.info("Top 10 Feature Importance:")
    importance = trainer.get_feature_importance(top_n=10)
    for feat, imp in importance.items():
        logger.info(f" {feat}: {imp:.2f}")
    # Save model
    model_dir = os.path.join(os.path.dirname(__file__), 'models')
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, "catboost_model.pkl")
    trainer.save_model(model_path)
    logger.info("=" * 60)
    logger.info("Training completed successfully!")
    logger.info(f"Model saved to: {model_path}")
    logger.info("=" * 60)
    # Print final summary
    print("\n" + "=" * 60)
    print("TRAINING SUMMARY")
    print("=" * 60)
    print(f"Total samples: {len(df)}")
    print(f"Features used: {len(feature_names)}")
    print(f"Trees in model: {model.tree_count_}")
    print("\nTest Set Metrics:")
    print(f" MAE: {metrics['mae']:.4f}")
    print(f" RMSE: {metrics['rmse']:.4f}")
    # BUG FIX: check for None explicitly — a perfect forecast yields
    # MAPE/MASE of 0.0, which is falsy and used to be shown as "N/A".
    print(f" MAPE: {metrics['mape']:.2f}%" if metrics['mape'] is not None else " MAPE: N/A")
    print(f" Bias: {metrics['bias']:.4f}")
    print(f" MASE: {metrics['mase']:.4f}" if metrics['mase'] is not None else " MASE: N/A")
    print(f" R²: {metrics['r_squared']:.4f}")
    print("=" * 60)
    return trainer
if __name__ == "__main__":
trained_trainer = main()