# -*- coding: utf-8 -*-
"""FastAPI service that forecasts one worker's daily earnings.

For a given ``worker_id`` the endpoint trains a job/no-job classifier and a
wage regressor on the chronologically first 80% of that worker's rows,
evaluates both on the remaining 20%, renders an actual-vs-forecast plot and
returns a simple earnings profile of the worker.
"""
import base64
import json
import os
import warnings
from io import BytesIO

import matplotlib.pyplot as plt  # noqa: F401 - retained; plotting below uses the thread-safe Figure API
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from matplotlib.figure import Figure
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder, StandardScaler

warnings.filterwarnings("ignore")

# Dataset location; the original hard-coded path stays the default but can be
# overridden without editing code.
DATA_PATH = os.environ.get(
    "WORKER_DATASET_PATH",
    "/content/drive/MyDrive/30_year_crop_data/extended_worker_dataset.csv",
)

TRAIN_FRACTION = 0.8        # first 80% of the (time-sorted) rows train, the rest evaluate
MIN_WAGE = 500              # floor applied to paid days only
WAGE_CAP_QUANTILE = 0.90    # ceiling = this quantile of the worker's paid wages
Z_SCORE = 1.96              # half-width multiplier for the tree-spread band

CALENDAR_FEATURES = ['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend']
CLASSIFIER_FEATURES = CALENDAR_FEATURES + [
    'job_type_encoded', 'feedback_score', 'years_of_experience',
]
REGRESSOR_FEATURES = CALENDAR_FEATURES + [
    'job_type_scaled', 'feedback_score', 'years_exp_scaled', 'job_exp_interaction',
]

app = FastAPI()


def _load_worker_frame(worker_id):
    """Return ``(df, wage_cap)``: ``worker_id``'s rows sorted by time, with derived columns.

    Adds ``has_job`` (1 unless ``job_type == "No Job"``) and ``job_type_encoded``,
    and winsorizes paid wages to ``[MIN_WAGE, wage_cap]``.

    Raises:
        HTTPException: 404 when the dataset has no rows for ``worker_id``.
    """
    df = pd.read_csv(DATA_PATH)
    df = df[df['worker_id'] == worker_id].copy()
    if df.empty:
        raise HTTPException(status_code=404, detail=f"No data found for worker_id {worker_id}")

    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Sort so that the positional 80/20 split is a real past/future split.
    df = df.sort_values('timestamp', kind='stable').reset_index(drop=True)
    df['has_job'] = (df['job_type'] != "No Job").astype(int)

    # Only clip days that actually paid something: clipping every row with
    # lower=MIN_WAGE would turn each zero-wage day into a MIN_WAGE wage.
    paid = df['contracted_wage'] > 0
    wage_cap = df.loc[paid, 'contracted_wage'].quantile(WAGE_CAP_QUANTILE)
    clipped = df['contracted_wage'].clip(lower=MIN_WAGE, upper=wage_cap)
    df['contracted_wage'] = clipped.where(paid, df['contracted_wage'])

    df['job_type_encoded'] = LabelEncoder().fit_transform(df['job_type'])
    return df, wage_cap


def _add_calendar_features(frame, time_col):
    """Add day-of-week, month, year, day-of-year and weekend-flag columns in place."""
    stamps = frame[time_col].dt
    frame['dayofweek'] = stamps.dayofweek
    frame['month'] = stamps.month
    frame['year'] = stamps.year
    frame['dayofyear'] = stamps.dayofyear
    frame['is_weekend'] = frame['dayofweek'].isin([5, 6]).astype(int)


def _add_scaled_features(frame, scaler):
    """Add standardized job-type/experience columns and their product, using a fitted scaler."""
    frame[['job_type_scaled', 'years_exp_scaled']] = scaler.transform(
        frame[['job_type_encoded', 'years_of_experience']]
    )
    frame['job_exp_interaction'] = frame['job_type_scaled'] * frame['years_exp_scaled']


def _forecast_wages(regressor, features, no_job, wage_cap):
    """Return ``(yhat, lower, upper)`` arrays for the forecast rows.

    ``yhat`` is the regressor's prediction capped at ``wage_cap``. The band is
    ``yhat ± Z_SCORE * std`` of the individual trees' predictions on the same
    rows, with the lower edge floored at 0. Every output is 0 where ``no_job``
    is true.
    """
    # fmin ignores a NaN cap (worker with no paid days) instead of propagating it.
    point = np.fmin(regressor.predict(features), wage_cap)
    # Trees inside the forest were fitted on arrays, so pass an array here too.
    raw = features.to_numpy()
    spread = np.std([tree.predict(raw) for tree in regressor.estimators_], axis=0)

    yhat = np.where(no_job, 0.0, point)
    lower = np.where(no_job, 0.0, np.maximum(yhat - Z_SCORE * spread, 0))
    upper = np.where(no_job, 0.0, yhat + Z_SCORE * spread)
    return yhat, lower, upper


def _weighted_errors(actual, predicted):
    """Return ``(mae, mape_percent)`` over rows with ``actual > 0``.

    Each row is weighted by ``actual / mean(actual)``. Returns ``(nan, nan)``
    when no row has a positive actual value.
    """
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    mask = actual > 0
    if not mask.any():
        return np.nan, np.nan
    a, p = actual[mask], predicted[mask]
    weights = a / a.mean()
    abs_err = np.abs(a - p)
    return np.average(abs_err, weights=weights), np.average(abs_err / a * 100, weights=weights)


def _round_or_none(value):
    """Round to 2 decimals, mapping NaN to ``None`` so the JSON stays valid."""
    return None if np.isnan(value) else round(value, 2)


def _render_forecast_plot(comparison_df):
    """Plot actual vs. forecast wages with the uncertainty band; return a PNG data URI."""
    # A standalone Figure avoids pyplot's global state, which is not safe when
    # requests are served concurrently from a threadpool.
    fig = Figure(figsize=(12, 6))
    ax = fig.subplots()
    ax.plot(comparison_df['ds'], comparison_df['y'], 'o-', label='Actual Values', markersize=4)
    ax.plot(comparison_df['ds'], comparison_df['yhat'], '-', label='Forecasted Values')
    ax.fill_between(comparison_df['ds'], comparison_df['yhat_lower'], comparison_df['yhat_upper'],
                    color='gray', alpha=0.2, label='Uncertainty Interval')
    ax.set_title('Actual vs. Forecasted Daily Earnings (Last 20% of Dataset)')
    ax.set_xlabel('Date')
    ax.set_ylabel('Contracted Wage')
    ax.legend()
    ax.grid(True)
    ax.tick_params(axis='x', labelrotation=45)
    fig.tight_layout()

    buffer = BytesIO()
    fig.savefig(buffer, format='png')
    plot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return f'data:image/png;base64,{plot_base64}'


def _worker_profile(df):
    """Summarize the worker's earnings, job mix and feedback for microfinance use."""
    paid_wages = df.loc[df['contracted_wage'] > 0, 'contracted_wage']
    avg_daily_earning = paid_wages.mean()
    avg_monthly_earning = avg_daily_earning * 30 if not np.isnan(avg_daily_earning) else 0
    job_distribution = df['job_type'].value_counts(normalize=True) * 100
    avg_feedback = df['feedback_score'].mean()
    # Fraction of rows on which the worker had any job.
    workholic_index = job_distribution.drop(labels=['No Job'], errors='ignore').sum() / 100
    # Coefficient of variation of paid wages (NaN when there are none).
    earning_stability = paid_wages.std() / avg_daily_earning if avg_daily_earning > 0 else np.nan

    return {
        'average_daily_earning': _round_or_none(avg_daily_earning),
        'estimated_monthly_earning': _round_or_none(avg_monthly_earning),
        'job_distribution': job_distribution.round(2).to_dict(),
        'average_feedback_score': _round_or_none(avg_feedback),
        'workholic_index': _round_or_none(workholic_index),
        'earning_stability': _round_or_none(earning_stability),
    }


def _run_forecast(worker_id):
    """Train, evaluate and summarize for ``worker_id``; return the response payload dict."""
    df, wage_cap = _load_worker_frame(worker_id)
    _add_calendar_features(df, 'timestamp')

    split_point = int(len(df) * TRAIN_FRACTION)
    train_df = df.iloc[:split_point].copy()
    test_df = df.iloc[split_point:].copy()
    if train_df.empty:
        raise HTTPException(status_code=422,
                            detail=f"Not enough data for worker_id {worker_id} to train a model")

    # Fit scaling on training rows only, then apply the same transform to both splits.
    scaler = StandardScaler().fit(train_df[['job_type_encoded', 'years_of_experience']])
    _add_scaled_features(train_df, scaler)
    _add_scaled_features(test_df, scaler)

    classifier = RandomForestClassifier(
        n_estimators=500, max_depth=12, min_samples_split=5, random_state=42
    )
    classifier.fit(train_df[CLASSIFIER_FEATURES], train_df['has_job'])

    # The wage model only learns from days on which the worker had a job.
    train_jobs = train_df[train_df['has_job'] == 1]
    if train_jobs.empty:
        raise HTTPException(status_code=422,
                            detail=f"No job days in the training window for worker_id {worker_id}")
    regressor = RandomForestRegressor(
        n_estimators=300, max_depth=10, min_samples_split=4, random_state=42
    )
    regressor.fit(train_jobs[REGRESSOR_FEATURES], train_jobs['contracted_wage'])

    has_job_predicted = classifier.predict(test_df[CLASSIFIER_FEATURES])
    accuracy = accuracy_score(test_df['has_job'], has_job_predicted)

    yhat, yhat_lower, yhat_upper = _forecast_wages(
        regressor, test_df[REGRESSOR_FEATURES], has_job_predicted == 0, wage_cap
    )
    # Built positionally: merging on timestamps would duplicate rows whenever a
    # timestamp repeats.
    comparison_df = pd.DataFrame({
        'ds': test_df['timestamp'].to_numpy(),
        'y': test_df['contracted_wage'].to_numpy(),
        'yhat': yhat,
        'yhat_lower': yhat_lower,
        'yhat_upper': yhat_upper,
    })
    mae, mape = _weighted_errors(comparison_df['y'], comparison_df['yhat'])

    return {
        'worker_id': worker_id,
        'classification_metrics': {
            'accuracy': round(accuracy * 100, 2),
            'mae': _round_or_none(mae),
            'mape': _round_or_none(mape),
        },
        'worker_profile': _worker_profile(df),
        'plot': _render_forecast_plot(comparison_df),
    }


def _to_serializable(obj):
    """``json.dumps`` fallback converting NumPy scalars/arrays to native Python types."""
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


@app.post("/predict_worker_earnings/")
def predict_worker_earnings(worker_id: int):
    """Forecast earnings for ``worker_id`` and return metrics, profile and a plot.

    Declared as a plain ``def`` so FastAPI runs the CPU-heavy model training in
    its threadpool instead of blocking the event loop.

    Returns:
        JSONResponse with ``worker_id``, ``classification_metrics`` (accuracy %,
        weighted MAE and MAPE), ``worker_profile`` and ``plot`` (PNG data URI).

    Raises:
        HTTPException: 404 if the worker has no rows; 422 if there is too little
        data to train; 500 for any other failure.
    """
    try:
        results = _run_forecast(worker_id)
        payload = json.loads(json.dumps(results, default=_to_serializable))
    except HTTPException:
        # Deliberate client errors (404/422) must not be rewritten as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return JSONResponse(content=payload)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)