Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.linear_model import LinearRegression, Ridge, Lasso | |
| from sklearn.metrics import mean_squared_error, r2_score | |
| import pickle | |
| import os | |
class GDPPredictor:
    """Train a suite of linear models on macroeconomic indicators and
    predict Real GDP, both per-model and as an R2-weighted ensemble.

    Typical workflow:
        predictor = GDPPredictor()
        predictor.load_data('Consolidated.csv')
        predictor.train_models()
        predictor.predict_gdp({...feature: value...})
    """

    def __init__(self):
        self.df = None                  # raw data as loaded from CSV
        self.cleaned_df = None          # working copy used for training
        self.features_to_use = []       # every column except Year and target
        self.target = 'Real GDP (USD bn)'
        self.scaler = StandardScaler()  # fitted on training features
        self.all_results = {}           # model name -> {'model','rmse','r2'}
        self.ensemble_weights = {}      # model name -> normalized R2 weight
        self.feature_ranges = {}        # feature -> (min, max, mean, latest)
        self.is_trained = False

    def load_data(self, file_path='Consolidated.csv'):
        """Load the dataset and compute per-feature reference ranges.

        Args:
            file_path: Path to the CSV file to load.

        Returns:
            The number of usable feature columns.
        """
        self.df = pd.read_csv(file_path)
        self.cleaned_df = self.df.copy()
        # Every column other than the target and Year is a candidate predictor.
        self.features_to_use = [
            col for col in self.cleaned_df.columns
            if col not in (self.target, 'Year')
        ]
        # Record (min, max, mean, most-recent) for each feature so callers
        # can show sensible input ranges/defaults.
        latest_row = self.cleaned_df[
            self.cleaned_df['Year'] == self.cleaned_df['Year'].max()
        ].iloc[0]
        for feature in self.features_to_use:
            column = self.cleaned_df[feature]
            self.feature_ranges[feature] = (
                column.min(), column.max(), column.mean(), latest_row[feature]
            )
        return len(self.features_to_use)

    def _fit_and_score(self, model, X, y):
        """Fit *model* on (X, y) and return its training-set metrics.

        Returns:
            Dict with keys 'model', 'rmse' and 'r2' (all on training data —
            these are in-sample scores, not a held-out evaluation).
        """
        model.fit(X, y)
        y_pred = model.predict(X)
        return {
            'model': model,
            'rmse': np.sqrt(mean_squared_error(y, y_pred)),
            'r2': r2_score(y, y_pred),
        }

    def train_models(self):
        """Train Linear, Ridge and Lasso regressions; build ensemble weights.

        Returns:
            The ``all_results`` dict mapping model name to metrics.

        Raises:
            ValueError: If load_data() has not been called yet.
        """
        if self.cleaned_df is None:
            raise ValueError("Data not loaded. Call load_data() first.")
        X = self.cleaned_df[self.features_to_use]
        y = self.cleaned_df[self.target]
        # Fit the scaler here so predict_gdp can reuse the same transform.
        X_scaled = self.scaler.fit_transform(X)

        # Same regularization grid for both penalized families.
        alphas = [0.001, 0.01, 0.1, 1.0]

        self.all_results['Linear Regression'] = self._fit_and_score(
            LinearRegression(), X_scaled, y
        )
        for alpha in alphas:
            self.all_results[f"Ridge (alpha={alpha})"] = self._fit_and_score(
                Ridge(alpha=alpha), X_scaled, y
            )
        for alpha in alphas:
            model_name = f"Lasso (alpha={alpha})"
            # Lasso may fail to converge at small alphas; skip that model
            # rather than aborting the whole training run.
            try:
                self.all_results[model_name] = self._fit_and_score(
                    Lasso(alpha=alpha, max_iter=10000), X_scaled, y
                )
            except Exception as e:
                print(f"Error training {model_name}: {e}")

        # Weight each model by its R2, floored at 0.01 so a negative R2
        # cannot yield a negative weight; weights sum to 1.
        total_r2 = sum(
            max(0.01, res['r2']) for res in self.all_results.values()
        )
        for name, res in self.all_results.items():
            self.ensemble_weights[name] = max(0.01, res['r2']) / total_r2

        self.is_trained = True
        return self.all_results

    def get_feature_info(self):
        """Return features grouped into UI-friendly categories.

        Returns:
            Dict mapping category name -> {feature: (min, max, mean, latest)}.
            Only features actually present in the dataset are included;
            features not in any predefined category land in
            "Other Indicators".

        Raises:
            ValueError: If load_data() has not been called yet.
        """
        if not self.features_to_use:
            raise ValueError("Data not loaded. Call load_data() first.")
        # Predefined groupings; names must match the CSV column headers
        # exactly (including the 'BOP (UDS bn)' spelling in the data).
        feature_categories = {
            "Foreign Investment & Aid": ['Foreign Aid (USD bn)', 'FDI (bn USD)'],
            "Economic Indicators": ['BOP (UDS bn)', 'Forex Reserves (in US $ Billion)',
                                    'Money Supply (M3) in billion rupees', 'Inflation (in % of GDP)',
                                    'CPI', 'Debt to GDP ratio'],
            "Financial Markets": ['Sensex', 'IIP Index (Base year 1980)'],
            "Resources & Savings": ['Savings (billion dollars)', 'Oil Prices', 'Bank Rate'],
            "Agriculture": ['Agriculture Production(Food Grains) in Lakhs Tonnes'],
            "Trade": ['With US(in USD Bn)', 'With China(in USD bn)']
        }
        # Anything in the dataset but not categorized above goes to a
        # catch-all bucket so no feature is hidden from the caller.
        categorized = {
            feat for feats in feature_categories.values() for feat in feats
        }
        uncategorized = [f for f in self.features_to_use if f not in categorized]
        if uncategorized:
            feature_categories["Other Indicators"] = uncategorized
        # Keep only features that exist in this dataset.
        result = {}
        for category, category_features in feature_categories.items():
            valid_features = [f for f in category_features if f in self.features_to_use]
            if valid_features:
                result[category] = {
                    feature: self.feature_ranges[feature] for feature in valid_features
                }
        return result

    def predict_gdp(self, input_values):
        """Predict GDP with every trained model plus the weighted ensemble.

        Args:
            input_values: Mapping of feature name -> numeric value; must
                contain every feature in ``features_to_use``.

        Returns:
            Dict of model name -> predicted GDP, including an 'Ensemble' key.

        Raises:
            ValueError: If models are untrained, features are missing, or
                prediction fails.
        """
        if not self.is_trained:
            raise ValueError("Models not trained. Call train_models() first.")
        # Validate BEFORE the try block so a missing feature raises a clear
        # error instead of being re-wrapped as "Error predicting GDP: ...".
        missing_features = [f for f in self.features_to_use if f not in input_values]
        if missing_features:
            raise ValueError(f"Missing values for features: {missing_features}")
        try:
            # Build a DataFrame (not a bare ndarray) so the scaler sees the
            # same feature names it was fitted with — avoids sklearn's
            # "X does not have valid feature names" warning.
            input_df = pd.DataFrame([input_values], columns=self.features_to_use)
            input_scaled = self.scaler.transform(input_df)
            predictions = {
                name: info['model'].predict(input_scaled)[0]
                for name, info in self.all_results.items()
            }
            # Ensemble = weighted average over the individual predictions.
            ensemble_pred = sum(
                pred * self.ensemble_weights[name]
                for name, pred in predictions.items()
            )
            predictions['Ensemble'] = ensemble_pred
            return predictions
        except Exception as e:
            raise ValueError(f"Error predicting GDP: {e}")

    def get_latest_gdp(self):
        """Return (year, gdp) for the most recent year in the dataset.

        Raises:
            ValueError: If load_data() has not been called yet.
        """
        if self.cleaned_df is None:
            raise ValueError("Data not loaded. Call load_data() first.")
        latest_year = self.cleaned_df['Year'].max()
        latest_gdp = self.cleaned_df[
            self.cleaned_df['Year'] == latest_year
        ][self.target].values[0]
        return latest_year, latest_gdp

    def save_models(self, filepath='gdp_models.pkl'):
        """Pickle the trained models, weights, scaler and feature metadata.

        Raises:
            ValueError: If train_models() has not been called yet.
        """
        if not self.is_trained:
            raise ValueError("Models not trained. Call train_models() first.")
        save_data = {
            'all_results': self.all_results,
            'ensemble_weights': self.ensemble_weights,
            'features_to_use': self.features_to_use,
            'feature_ranges': self.feature_ranges,
            'scaler': self.scaler
        }
        with open(filepath, 'wb') as f:
            pickle.dump(save_data, f)

    def load_models(self, filepath='gdp_models.pkl'):
        """Restore a previously saved model bundle from disk.

        Raises:
            FileNotFoundError: If *filepath* does not exist.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Model file {filepath} not found.")
        # SECURITY: pickle.load executes arbitrary code from the file —
        # only load model files you produced yourself / fully trust.
        with open(filepath, 'rb') as f:
            save_data = pickle.load(f)
        self.all_results = save_data['all_results']
        self.ensemble_weights = save_data['ensemble_weights']
        self.features_to_use = save_data['features_to_use']
        self.feature_ranges = save_data['feature_ranges']
        self.scaler = save_data['scaler']
        self.is_trained = True