import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, r2_score
import pickle
import os


class GDPPredictor:
    """Train a family of linear models on macro-economic indicators and
    predict Real GDP via an R^2-weighted ensemble.

    Workflow: load_data() -> train_models() -> predict_gdp(...), with
    optional save_models()/load_models() for persistence.
    """

    def __init__(self):
        self.df = None                 # raw dataset as read from CSV
        self.cleaned_df = None         # working copy used for training
        self.features_to_use = []      # predictor columns (all except target and 'Year')
        self.target = 'Real GDP (USD bn)'
        self.scaler = StandardScaler()
        self.all_results = {}          # model name -> {'model', 'rmse', 'r2'}
        self.ensemble_weights = {}     # model name -> normalized ensemble weight
        self.feature_ranges = {}       # feature -> (min, max, mean, latest-year value)
        self.is_trained = False

    def load_data(self, file_path='Consolidated.csv'):
        """Load the dataset and precompute per-feature summary ranges.

        Args:
            file_path: Path to the consolidated CSV; must contain a 'Year'
                column and the target column 'Real GDP (USD bn)'.

        Returns:
            The number of predictor features detected.
        """
        self.df = pd.read_csv(file_path)
        self.cleaned_df = self.df.copy()

        # All columns except the target and Year are candidate predictors.
        self.features_to_use = [col for col in self.cleaned_df.columns
                                if col != self.target and col != 'Year']

        # Hoist the latest-year row lookup out of the loop: it is the same
        # for every feature, so computing it per-feature was wasted work.
        latest_row = self.cleaned_df[
            self.cleaned_df['Year'] == self.cleaned_df['Year'].max()
        ]

        # (min, max, mean, most-recent value) per feature, used by the UI
        # and get_feature_info() to suggest sensible input ranges.
        for feature in self.features_to_use:
            col = self.cleaned_df[feature]
            self.feature_ranges[feature] = (
                col.min(),
                col.max(),
                col.mean(),
                latest_row[feature].values[0],
            )

        return len(self.features_to_use)

    def _fit_and_score(self, model, X, y):
        """Fit *model* on (X, y) and return its in-sample (rmse, r2).

        NOTE: metrics are computed on the training data itself (no
        train/test split), so they are optimistic estimates.
        """
        model.fit(X, y)
        y_pred = model.predict(X)
        rmse = np.sqrt(mean_squared_error(y, y_pred))
        r2 = r2_score(y, y_pred)
        return rmse, r2

    def train_models(self):
        """Train Linear, Ridge and Lasso regressors and derive ensemble weights.

        Returns:
            Dict mapping model name -> {'model', 'rmse', 'r2'}.

        Raises:
            ValueError: If load_data() has not been called.
        """
        if self.cleaned_df is None:
            raise ValueError("Data not loaded. Call load_data() first.")

        X = self.cleaned_df[self.features_to_use]
        y = self.cleaned_df[self.target]

        # Standardize features; the fitted scaler is reused in predict_gdp().
        X_scaled = self.scaler.fit_transform(X)

        # Regularization strengths to sweep for Ridge and Lasso.
        ridge_alphas = [0.001, 0.01, 0.1, 1.0]
        lasso_alphas = [0.001, 0.01, 0.1, 1.0]

        lr_model = LinearRegression()
        rmse, r2 = self._fit_and_score(lr_model, X_scaled, y)
        self.all_results['Linear Regression'] = {
            'model': lr_model, 'rmse': rmse, 'r2': r2
        }

        for alpha in ridge_alphas:
            model_name = f"Ridge (alpha={alpha})"
            ridge_model = Ridge(alpha=alpha)
            rmse, r2 = self._fit_and_score(ridge_model, X_scaled, y)
            self.all_results[model_name] = {
                'model': ridge_model, 'rmse': rmse, 'r2': r2
            }

        for alpha in lasso_alphas:
            model_name = f"Lasso (alpha={alpha})"
            # High max_iter because small alphas may converge slowly.
            lasso_model = Lasso(alpha=alpha, max_iter=10000)
            try:
                rmse, r2 = self._fit_and_score(lasso_model, X_scaled, y)
                self.all_results[model_name] = {
                    'model': lasso_model, 'rmse': rmse, 'r2': r2
                }
            except Exception as e:
                # Best-effort: skip a Lasso variant that fails to converge
                # rather than aborting the whole training run.
                print(f"Error training {model_name}: {e}")

        # Ensemble weights proportional to R^2, floored at 0.01 so that a
        # model with a negative R^2 still gets a tiny positive weight.
        total_r2 = sum(max(0.01, self.all_results[m]['r2'])
                       for m in self.all_results)
        for name in self.all_results:
            weight = max(0.01, self.all_results[name]['r2']) / total_r2
            self.ensemble_weights[name] = weight

        self.is_trained = True
        return self.all_results

    def get_feature_info(self):
        """Group features into user-facing categories with their ranges.

        Returns:
            Dict of category name -> {feature: (min, max, mean, current)}.
            Only features present in the loaded dataset are included;
            features not matching any category go under "Other Indicators".

        Raises:
            ValueError: If load_data() has not been called.
        """
        if not self.features_to_use:
            raise ValueError("Data not loaded. Call load_data() first.")

        # Hand-curated grouping for a friendlier input form.
        feature_categories = {
            "Foreign Investment & Aid": ['Foreign Aid (USD bn)', 'FDI (bn USD)'],
            "Economic Indicators": ['BOP (UDS bn)',
                                    'Forex Reserves (in US $ Billion)',
                                    'Money Supply (M3) in billion rupees',
                                    'Inflation (in % of GDP)', 'CPI',
                                    'Debt to GDP ratio'],
            "Financial Markets": ['Sensex', 'IIP Index (Base year 1980)'],
            "Resources & Savings": ['Savings (billion dollars)', 'Oil Prices',
                                    'Bank Rate'],
            "Agriculture": ['Agriculture Production(Food Grains) in Lakhs Tonnes'],
            "Trade": ['With US(in USD Bn)', 'With China(in USD bn)']
        }

        # Anything in the dataset that is not explicitly categorized above.
        categorized = []
        for category, feat_list in feature_categories.items():
            categorized.extend(feat_list)
        uncategorized = [f for f in self.features_to_use if f not in categorized]
        if uncategorized:
            feature_categories["Other Indicators"] = uncategorized

        # Keep only features that actually exist in this dataset.
        result = {}
        for category, category_features in feature_categories.items():
            valid_features = [f for f in category_features
                              if f in self.features_to_use]
            if valid_features:
                result[category] = {
                    feature: self.feature_ranges[feature]
                    for feature in valid_features
                }
        return result

    def predict_gdp(self, input_values):
        """Predict GDP with every trained model plus the weighted ensemble.

        Args:
            input_values: Dict mapping each feature name in
                ``features_to_use`` to its numeric value.

        Returns:
            Dict of model name -> predicted GDP, including an 'Ensemble' key.

        Raises:
            ValueError: If models are untrained, features are missing, or
                any model fails to predict.
        """
        if not self.is_trained:
            raise ValueError("Models not trained. Call train_models() first.")
        try:
            # Fail fast with an explicit list of missing inputs.
            missing_features = [f for f in self.features_to_use
                                if f not in input_values]
            if missing_features:
                raise ValueError(
                    f"Missing values for features: {missing_features}")

            # Order values exactly as the scaler/models were fitted.
            input_data = np.array(
                [input_values[feature] for feature in self.features_to_use]
            ).reshape(1, -1)
            input_scaled = self.scaler.transform(input_data)

            predictions = {}
            for name, model_info in self.all_results.items():
                predictions[name] = model_info['model'].predict(input_scaled)[0]

            # Weighted average across all models (weights sum to 1).
            ensemble_pred = sum(pred * self.ensemble_weights[name]
                                for name, pred in predictions.items())
            predictions['Ensemble'] = ensemble_pred

            return predictions
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise ValueError(f"Error predicting GDP: {e}") from e

    def get_latest_gdp(self):
        """Return (latest_year, gdp_value) for the most recent year in the data.

        Raises:
            ValueError: If load_data() has not been called.
        """
        if self.cleaned_df is None:
            raise ValueError("Data not loaded. Call load_data() first.")
        latest_year = self.cleaned_df['Year'].max()
        latest_gdp = self.cleaned_df[
            self.cleaned_df['Year'] == latest_year
        ][self.target].values[0]
        return latest_year, latest_gdp

    def save_models(self, filepath='gdp_models.pkl'):
        """Pickle the trained models, weights, scaler and feature metadata.

        Raises:
            ValueError: If train_models() has not been called.
        """
        if not self.is_trained:
            raise ValueError("Models not trained. Call train_models() first.")

        # Everything load_models() needs to restore a working predictor.
        save_data = {
            'all_results': self.all_results,
            'ensemble_weights': self.ensemble_weights,
            'features_to_use': self.features_to_use,
            'feature_ranges': self.feature_ranges,
            'scaler': self.scaler
        }
        with open(filepath, 'wb') as f:
            pickle.dump(save_data, f)

    def load_models(self, filepath='gdp_models.pkl'):
        """Restore models previously saved with save_models().

        Raises:
            FileNotFoundError: If *filepath* does not exist.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Model file {filepath} not found.")

        # SECURITY: pickle.load executes arbitrary code from the file —
        # only load model files from trusted sources.
        with open(filepath, 'rb') as f:
            save_data = pickle.load(f)

        self.all_results = save_data['all_results']
        self.ensemble_weights = save_data['ensemble_weights']
        self.features_to_use = save_data['features_to_use']
        self.feature_ranges = save_data['feature_ranges']
        self.scaler = save_data['scaler']
        self.is_trained = True