Spaces:
Sleeping
Sleeping
| # ```python | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor | |
| from sklearn.preprocessing import LabelEncoder, StandardScaler | |
| from sklearn.metrics import accuracy_score | |
| import matplotlib.pyplot as plt | |
| import json | |
| import base64 | |
| from io import BytesIO | |
| import warnings | |
# Install a blanket "ignore" filter so no warning of any category is shown
# for the lifetime of the process.
warnings.filterwarnings(action="ignore", category=Warning)

# Application object handed to uvicorn by the ``__main__`` guard at the end
# of this file.
app = FastAPI()
# Request body accepted by ``predict_worker_earnings``.
class WorkerIdRequest(BaseModel):
    # Identifier compared against the dataset's ``worker_id`` column.
    worker_id: int
# Inputs of the job / no-job classifier (raw, unscaled columns).
_CLASSIFIER_FEATURES = [
    'dayofweek', 'month', 'year', 'dayofyear',
    'is_weekend', 'job_type_encoded', 'feedback_score',
    'years_of_experience',
]

# Inputs of the wage regressor (scaled columns plus their interaction term).
_REGRESSOR_FEATURES = [
    'dayofweek', 'month', 'year', 'dayofyear',
    'is_weekend', 'job_type_scaled', 'feedback_score',
    'years_exp_scaled', 'job_exp_interaction',
]


def _add_calendar_features(frame, source_column):
    """Add dayofweek/month/year/dayofyear/is_weekend columns to *frame* in place."""
    stamps = frame[source_column].dt
    frame['dayofweek'] = stamps.dayofweek
    frame['month'] = stamps.month
    frame['year'] = stamps.year
    frame['dayofyear'] = stamps.dayofyear
    frame['is_weekend'] = frame['dayofweek'].isin([5, 6]).astype(int)


def _round_or_none(value, ndigits=2):
    """Round *value* to *ndigits*, mapping NaN to ``None``."""
    return None if np.isnan(value) else round(value, ndigits)


def _to_serializable(obj):
    """``json.dumps`` fallback that converts NumPy scalars and arrays to builtins."""
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj


async def predict_worker_earnings(request: WorkerIdRequest):
    """Train per-worker models and return a wage forecast, metrics and a profile.

    The rows of ``/app/data/extended_worker_dataset.csv`` whose ``worker_id``
    equals ``request.worker_id`` are split 80/20 by position.  A random-forest
    classifier (job / no job) and a random-forest regressor (contracted wage,
    fitted on job rows only) are trained on the first part and evaluated on
    the second part.

    Returns:
        A JSON-compatible dict with the keys ``worker_id``,
        ``classification_metrics`` (``accuracy`` in percent, ``mae``,
        ``mape``), ``worker_profile`` and ``plot`` (a base64 PNG data URI).

    Raises:
        HTTPException: 404 when the worker has no rows or the training part
            has no job rows, 400 when fewer than two rows exist, 500 for any
            other failure.

    NOTE(review): everything below is synchronous and trains two forests per
    call inside an ``async def``; consider whether that is acceptable for the
    event loop.
    NOTE(review): no route decorator is visible on this function in the
    reviewed source -- confirm it is registered with ``app``.
    """
    try:
        worker_id = request.worker_id

        # Initialize result dictionary
        results = {
            'worker_id': worker_id,
            'classification_metrics': {},
            'worker_profile': {},
            'plot': ''
        }

        # Load data
        try:
            df = pd.read_csv('/app/data/extended_worker_dataset.csv')
        except FileNotFoundError:
            raise HTTPException(status_code=500, detail="CSV file not found at /app/data/extended_worker_dataset.csv")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error reading CSV file: {str(e)}") from e

        # Filter for one worker_id
        df = df[df['worker_id'] == worker_id].copy()
        if df.empty:
            raise HTTPException(status_code=404, detail=f"No data found for worker_id {worker_id}")

        # Data preprocessing
        try:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error converting timestamp: {str(e)}") from e

        df['has_job'] = (df['job_type'] != "No Job").astype(int)

        try:
            wage_cap = df[df['contracted_wage'] > 0]['contracted_wage'].quantile(0.90)
            if np.isnan(wage_cap) or wage_cap <= 500:
                raise ValueError("Invalid wage cap calculated")
            # NOTE(review): the clip applies to every row, so any wage below
            # 500 (including 0, if present) becomes 500 and the later
            # ``> 0`` filters keep all rows -- confirm this is intended.
            df['contracted_wage'] = df['contracted_wage'].clip(lower=500, upper=wage_cap)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error processing wage data: {str(e)}") from e

        # Encode job_type
        le = LabelEncoder()
        try:
            df['job_type_encoded'] = le.fit_transform(df['job_type'])
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error encoding job_type: {str(e)}") from e

        # Split data
        if len(df) < 2:
            raise HTTPException(status_code=400, detail="Insufficient data points for training and testing")
        # NOTE(review): the split is positional and the rows are not sorted by
        # timestamp here, so "last 20%" relies on the CSV row order.
        split_point = int(len(df) * 0.8)
        train_df = df.iloc[:split_point].copy()
        test_df = df.iloc[split_point:].copy()

        # Scale features (the scaler is fitted on the training part only)
        scaler = StandardScaler()
        try:
            train_df[['job_type_scaled', 'years_exp_scaled']] = scaler.fit_transform(
                train_df[['job_type_encoded', 'years_of_experience']]
            )
            train_df['job_exp_interaction'] = train_df['job_type_scaled'] * train_df['years_exp_scaled']
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error scaling features: {str(e)}") from e

        for subset in (train_df, test_df):
            _add_calendar_features(subset, 'timestamp')

        # Train classifier
        X_train_class = train_df[_CLASSIFIER_FEATURES]
        y_train_class = train_df['has_job']
        try:
            classifier = RandomForestClassifier(
                n_estimators=500, max_depth=12, min_samples_split=5, random_state=42
            )
            classifier.fit(X_train_class, y_train_class)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error training classifier: {str(e)}") from e

        # Train regressor on rows that actually have a job
        train_df_reg = train_df[train_df['has_job'] == 1].copy()
        if train_df_reg.empty:
            raise HTTPException(status_code=404, detail="No data available for regression (all has_job == 0)")
        X_train_reg = train_df_reg[_REGRESSOR_FEATURES]
        y_train_reg = train_df_reg['contracted_wage']
        try:
            regressor = RandomForestRegressor(
                n_estimators=300, max_depth=10, min_samples_split=4, random_state=42
            )
            regressor.fit(X_train_reg, y_train_reg)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error training regressor: {str(e)}") from e

        # Prepare future dataframe from the held-out part
        future_df = test_df[['timestamp', 'job_type', 'job_type_encoded',
                             'feedback_score', 'years_of_experience']].rename(columns={'timestamp': 'ds'})
        _add_calendar_features(future_df, 'ds')
        try:
            future_df[['job_type_scaled', 'years_exp_scaled']] = scaler.transform(
                future_df[['job_type_encoded', 'years_of_experience']]
            )
            future_df['job_exp_interaction'] = future_df['job_type_scaled'] * future_df['years_exp_scaled']
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error transforming future dataframe: {str(e)}") from e

        # Predict job/no-job
        try:
            future_df['has_job_predicted'] = classifier.predict(future_df[_CLASSIFIER_FEATURES])
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error predicting has_job: {str(e)}") from e

        # Evaluate classifier accuracy; ``has_job`` was derived before the
        # split, so the held-out part already carries it.
        try:
            acc = accuracy_score(test_df['has_job'], future_df['has_job_predicted'])
            results['classification_metrics']['accuracy'] = round(acc * 100, 2)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error calculating accuracy: {str(e)}") from e

        # Predict wages
        try:
            future_df['yhat'] = regressor.predict(future_df[_REGRESSOR_FEATURES])
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error predicting wages: {str(e)}") from e

        # Apply job prediction mask: no predicted job means zero wage, and
        # no forecast may exceed the cap used to clip the training wages.
        final_forecast_df = future_df.copy()
        no_job_predicted = final_forecast_df['has_job_predicted'] == 0
        final_forecast_df['yhat'] = np.where(no_job_predicted, 0, final_forecast_df['yhat'])
        final_forecast_df['yhat'] = np.minimum(final_forecast_df['yhat'], wage_cap)

        # Uncertainty intervals: one constant half-width taken from the mean
        # spread of the individual trees' predictions on the training rows.
        try:
            tree_predictions = [tree.predict(X_train_reg) for tree in regressor.estimators_]
            std_dev = np.std(tree_predictions, axis=0)
            half_width = 1.96 * std_dev.mean()
            lower_bound = np.maximum(final_forecast_df['yhat'] - half_width, 0)
            upper_bound = final_forecast_df['yhat'] + half_width
            final_forecast_df['yhat_lower'] = np.where(no_job_predicted, 0, lower_bound)
            final_forecast_df['yhat_upper'] = np.where(no_job_predicted, 0, upper_bound)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error calculating uncertainty intervals: {str(e)}") from e

        # Evaluation: error metrics weighted by the actual wage
        try:
            comparison_df = pd.merge(
                test_df[['timestamp', 'contracted_wage']].rename(columns={'timestamp': 'ds', 'contracted_wage': 'y'}),
                final_forecast_df[['ds', 'yhat', 'yhat_lower', 'yhat_upper']], on='ds'
            )
            valid_comparison_df = comparison_df[comparison_df['y'] > 0]
            if not valid_comparison_df.empty:
                actual = valid_comparison_df['y']
                predicted = valid_comparison_df['yhat']
                weights = actual / actual.mean()
                mae = np.average((actual - predicted).abs(), weights=weights)
                mape = np.average(((actual - predicted) / actual).abs() * 100, weights=weights)
            else:
                mae = np.nan
                mape = np.nan
            results['classification_metrics']['mae'] = _round_or_none(mae)
            results['classification_metrics']['mape'] = _round_or_none(mape)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error evaluating predictions: {str(e)}") from e

        # Plot results
        try:
            fig = plt.figure(figsize=(12, 6))
            try:
                plt.plot(comparison_df['ds'], comparison_df['y'], 'o-', label='Actual Values', markersize=4)
                plt.plot(comparison_df['ds'], comparison_df['yhat'], '-', label='Forecasted Values')
                plt.fill_between(comparison_df['ds'], comparison_df['yhat_lower'], comparison_df['yhat_upper'],
                                 color='gray', alpha=0.2, label='Uncertainty Interval')
                plt.title('Actual vs. Forecasted Daily Earnings (Last 20% of Dataset)')
                plt.xlabel('Date')
                plt.ylabel('Contracted Wage')
                plt.legend()
                plt.grid(True)
                plt.xticks(rotation=45)
                plt.tight_layout()
                buffer = BytesIO()
                plt.savefig(buffer, format='png')
                plot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
                results['plot'] = f'data:image/png;base64,{plot_base64}'
            finally:
                # Always release the figure, even when drawing or saving
                # fails, so figures do not pile up across requests.
                plt.close(fig)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error generating plot: {str(e)}") from e

        # Worker Profile for Microfinance
        try:
            worker_data = df.copy()
            paid_wages = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage']
            avg_daily_earning = paid_wages.mean()
            avg_monthly_earning = avg_daily_earning * 30 if not np.isnan(avg_daily_earning) else 0
            job_distribution = worker_data['job_type'].value_counts(normalize=True) * 100
            avg_feedback = worker_data['feedback_score'].mean()
            # Fraction of rows whose job_type is anything other than "No Job".
            workholic_index = job_distribution.drop(labels=['No Job'], errors='ignore').sum() / 100
            if avg_daily_earning > 0:
                # Coefficient of variation of the wages.
                earning_stability = paid_wages.std() / avg_daily_earning
            else:
                earning_stability = np.nan
            results['worker_profile'] = {
                'average_daily_earning': _round_or_none(avg_daily_earning),
                'estimated_monthly_earning': _round_or_none(avg_monthly_earning),
                'job_distribution': job_distribution.round(2).to_dict(),
                'average_feedback_score': _round_or_none(avg_feedback),
                'workholic_index': _round_or_none(workholic_index),
                'earning_stability': _round_or_none(earning_stability)
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error generating worker profile: {str(e)}") from e

        # Round-trip through JSON so only builtin types are returned.
        return json.loads(json.dumps(results, default=_to_serializable))
    except HTTPException:
        # Deliberate HTTP errors (404 / 400 / specific 500s) must reach the
        # client unchanged instead of being re-wrapped as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}") from e
if __name__ == "__main__":
    # Script entry point: serve ``app`` on all interfaces, port 8000.
    # uvicorn is imported here so importing this module does not require it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)