from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware  # Import CORSMiddleware
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
import matplotlib.pyplot as plt
import json
import base64
from io import BytesIO
from PIL import Image
import warnings
import os
from pydantic import BaseModel

warnings.filterwarnings("ignore")

app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins; replace with specific origins in production
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods (GET, POST, etc.)
    allow_headers=["*"],  # Allows all headers
)

class WorkerInput(BaseModel):
    worker_id: int


@app.post("/forecast")  # route path is an assumption; the original listing registers no route explicitly
async def worker_forecast(input_data: WorkerInput):
    worker_id = input_data.worker_id
    # Initialize result dictionary
    results = {
        'worker_id': worker_id,
        'metrics': {},
        'worker_profile': {},
        'plot': ''
    }

    # Load dataset
    try:
        df = pd.read_csv('/app/extended_worker_dataset_random_reduced.csv')
    except FileNotFoundError:
        raise HTTPException(status_code=500, detail="Dataset file not found")

    # Filter for one worker_id
    df = df[df['worker_id'] == worker_id].copy()
    if df.empty:
        raise HTTPException(status_code=404, detail=f"No data found for worker_id {worker_id}")

    # Data preprocessing
    df['timestamp'] = pd.to_datetime(df['timestamp'], dayfirst=True, errors='coerce')
    df['has_job'] = (df['job_type'] != "No Job").astype(int)

    # Wage capping
    wage_cap = df[df['contracted_wage'] > 0]['contracted_wage'].quantile(0.98)
    df['contracted_wage'] = df['contracted_wage'].clip(lower=500, upper=wage_cap)

    # Encode categorical
    le_job = LabelEncoder()
    df['job_type_encoded'] = le_job.fit_transform(df['job_type'])
    le_labour = LabelEncoder()
    df['labour_category_encoded'] = le_labour.fit_transform(df['labour_category'])

    # Lagged and rolling features
    df['prev_wage'] = df['contracted_wage'].shift(1).fillna(0)
    df['prev_wage2'] = df['contracted_wage'].shift(2).fillna(0)
    df['prev_wage3'] = df['contracted_wage'].shift(3).fillna(0)
    df['rolling_mean_3'] = df['contracted_wage'].rolling(3, min_periods=1).mean().shift(1).fillna(0)
    df['rolling_std_3'] = df['contracted_wage'].rolling(3, min_periods=1).std().shift(1).fillna(0)
    df['rolling_mean_7'] = df['contracted_wage'].rolling(7, min_periods=1).mean().shift(1).fillna(0)

    # Train/test split
    split_point = int(len(df) * 0.8)
    train_df, test_df = df.iloc[:split_point].copy(), df.iloc[split_point:].copy()

    # Scaling
    scaler = RobustScaler()
    train_df[['job_type_scaled', 'years_exp_scaled', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
              'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']] = scaler.fit_transform(
        train_df[['job_type_encoded', 'years_of_experience', 'prev_wage', 'prev_wage2', 'prev_wage3',
                  'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7', 'labour_category_encoded']]
    )
    train_df['job_exp_interaction'] = train_df['job_type_scaled'] * train_df['years_exp_scaled']

    # Date features
    for subset in [train_df, test_df]:
        subset['dayofweek'] = subset['timestamp'].dt.dayofweek
        subset['month'] = subset['timestamp'].dt.month
        subset['year'] = subset['timestamp'].dt.year
        subset['dayofyear'] = subset['timestamp'].dt.dayofyear
        subset['is_weekend'] = subset['dayofweek'].isin([5, 6]).astype(int)
    # Classification model: predicts whether the worker has a job on a given day
    X_train_class = train_df[['dayofweek', 'month', 'year', 'dayofyear',
                              'is_weekend', 'job_type_encoded', 'feedback_score',
                              'years_of_experience']]
    y_train_class = train_df['has_job']
    classifier = RandomForestClassifier(n_estimators=500, max_depth=12, min_samples_split=5, random_state=42)
    classifier.fit(X_train_class, y_train_class)

    # Regression model (trained only on rows where has_job == 1)
    train_df_reg = train_df[train_df['has_job'] == 1].copy()
    X_train_reg = train_df_reg[['dayofweek', 'month', 'year', 'dayofyear',
                                'is_weekend', 'job_type_scaled', 'feedback_score',
                                'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled',
                                'prev_wage2_scaled', 'prev_wage3_scaled', 'rolling_mean_3_scaled',
                                'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']]
    y_train_reg = train_df_reg['contracted_wage']

    # Hyperparameter tuning with time-series cross-validation
    tscv = TimeSeriesSplit(n_splits=5)
    param_grid = {
        'n_estimators': [200, 300, 400],
        'learning_rate': [0.01, 0.05],
        'max_depth': [3, 4, 5],
        'min_samples_split': [3, 4],
        'min_samples_leaf': [2, 3]
    }
    grid_search = GridSearchCV(GradientBoostingRegressor(random_state=42),
                               param_grid, cv=tscv, scoring='neg_mean_absolute_error', n_jobs=-1)
    grid_search.fit(X_train_reg, y_train_reg)
    best_reg = grid_search.best_estimator_
    best_params = grid_search.best_params_

    # Quantile regressors for the lower/upper bounds of a 95% prediction interval
    reg_lower = GradientBoostingRegressor(loss='quantile', alpha=0.025, **best_params, random_state=42)
    reg_upper = GradientBoostingRegressor(loss='quantile', alpha=0.975, **best_params, random_state=42)
    reg_lower.fit(X_train_reg, y_train_reg)
    reg_upper.fit(X_train_reg, y_train_reg)
    # Future dataframe
    future_df = test_df[['timestamp', 'job_type', 'job_type_encoded', 'feedback_score', 'years_of_experience',
                         'prev_wage', 'prev_wage2', 'prev_wage3', 'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7',
                         'labour_category_encoded']].rename(columns={'timestamp': 'ds'})
    future_df['dayofweek'] = future_df['ds'].dt.dayofweek
    future_df['month'] = future_df['ds'].dt.month
    future_df['year'] = future_df['ds'].dt.year
    future_df['dayofyear'] = future_df['ds'].dt.dayofyear
    future_df['is_weekend'] = future_df['dayofweek'].isin([5, 6]).astype(int)
    future_df[['job_type_scaled', 'years_exp_scaled', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
               'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']] = scaler.transform(
        future_df[['job_type_encoded', 'years_of_experience', 'prev_wage', 'prev_wage2', 'prev_wage3',
                   'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7', 'labour_category_encoded']]
    )
    future_df['job_exp_interaction'] = future_df['job_type_scaled'] * future_df['years_exp_scaled']

    # Predictions
    reg_features = ['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_scaled', 'feedback_score',
                    'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
                    'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']
    future_df['has_job_predicted'] = classifier.predict(
        future_df[['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_encoded',
                   'feedback_score', 'years_of_experience']]
    )
    future_df['yhat'] = best_reg.predict(future_df[reg_features])
    future_df['yhat_lower'] = reg_lower.predict(future_df[reg_features])
    future_df['yhat_upper'] = reg_upper.predict(future_df[reg_features])

    # Apply job mask
    final_forecast_df = future_df.copy()
    final_forecast_df['yhat'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, final_forecast_df['yhat'])
    final_forecast_df['yhat'] = np.minimum(final_forecast_df['yhat'], wage_cap)
    final_forecast_df['yhat_lower'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, future_df['yhat_lower'])
    final_forecast_df['yhat_upper'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, future_df['yhat_upper'])
    final_forecast_df['yhat_lower'] = np.maximum(final_forecast_df['yhat_lower'], 0)
    # Evaluation
    comparison_df = pd.merge(
        test_df[['timestamp', 'contracted_wage']].rename(columns={'timestamp': 'ds', 'contracted_wage': 'y'}),
        final_forecast_df[['ds', 'yhat', 'yhat_lower', 'yhat_upper']], on='ds', how='left'
    )
    comparison_df = comparison_df.set_index(final_forecast_df.index)  # Align indices
    valid_comparison_df = comparison_df[comparison_df['y'] > 0].copy()
    if not valid_comparison_df.empty:
        valid_y = valid_comparison_df['y'].values
        valid_yhat = valid_comparison_df['yhat'].values
        weights = valid_comparison_df['y'].values / valid_comparison_df['y'].mean()
        mae = np.average(np.abs(valid_y - valid_yhat), weights=weights, axis=0)
        mape = np.average(np.abs((valid_y - valid_yhat) / valid_y) * 100, weights=weights, axis=0)
    else:
        mae, mape = np.nan, np.nan
    results['metrics']['mae'] = round(mae, 2) if not np.isnan(mae) else None
    results['metrics']['mape'] = round(mape, 2) if not np.isnan(mape) else None
    # Plot results
    plt.figure(figsize=(12, 6))
    plt.plot(final_forecast_df['ds'], final_forecast_df['yhat'], '-', label='Forecasted', color='blue')
    plt.fill_between(final_forecast_df['ds'], final_forecast_df['yhat_lower'], final_forecast_df['yhat_upper'],
                     color='gray', alpha=0.2, label='Uncertainty')
    plt.title('Forecasted Daily Earnings (Last 20%)')
    plt.xlabel('Date')
    plt.ylabel('Contracted Wage')
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Save plot as PNG → JPG for compression
    buf_png = BytesIO()
    plt.savefig(buf_png, format="png", dpi=80, bbox_inches="tight")
    plt.close()
    buf_png.seek(0)
    img = Image.open(buf_png).convert("RGB")
    buf_jpg = BytesIO()
    img.save(buf_jpg, format="JPEG", quality=70, optimize=True)
    buf_jpg.seek(0)
    plot_base64 = base64.b64encode(buf_jpg.getvalue()).decode("utf-8")
    results['plot'] = f"data:image/jpeg;base64,{plot_base64}"
    # Save plot to file in /tmp
    plot_filename = f"/tmp/worker_{worker_id}_forecast.jpg"
    try:
        with open(plot_filename, "wb") as f:
            f.write(base64.b64decode(plot_base64))
    except PermissionError as e:
        raise HTTPException(status_code=500, detail=f"Failed to write plot file: {str(e)}")

    # Worker profile
    worker_data = df.copy()
    avg_daily = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage'].mean()
    avg_monthly = avg_daily * 30 if not np.isnan(avg_daily) else 0
    job_dist = worker_data['job_type'].value_counts(normalize=True) * 100
    avg_feedback = worker_data['feedback_score'].mean()
    work_index = job_dist.drop(labels=['No Job'], errors='ignore').sum() / 100
    earn_stability = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage'].std() / avg_daily if avg_daily > 0 else np.nan
    results['worker_profile'] = {
        'average_daily_earning': round(avg_daily, 2) if not np.isnan(avg_daily) else None,
        'estimated_monthly_earning': round(avg_monthly, 2) if not np.isnan(avg_monthly) else None,
        'job_distribution': job_dist.round(2).to_dict(),
        'average_feedback_score': round(avg_feedback, 2) if not np.isnan(avg_feedback) else None,
        'work_index': round(work_index, 2) if not np.isnan(work_index) else None,
        'earning_stability': round(earn_stability, 2) if not np.isnan(earn_stability) else None
    }
    def convert_to_serializable(obj):
        if isinstance(obj, (np.floating, np.float32, np.float64)): return float(obj)
        if isinstance(obj, (np.integer, np.int32, np.int64)): return int(obj)
        if isinstance(obj, np.ndarray): return obj.tolist()
        return obj

    # Return JSON response with results
    return JSONResponse(content=json.loads(json.dumps(results, default=convert_to_serializable)))
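
# Shape of the JSON body returned by worker_forecast, derived from the `results` dict above
# (reference only; field types reflect how each value is computed):
# {
#   "worker_id": <int>,
#   "metrics": {"mae": <float | null>, "mape": <float | null>},
#   "worker_profile": {
#     "average_daily_earning": <float | null>, "estimated_monthly_earning": <float | null>,
#     "job_distribution": {"<job_type>": <percent>, ...}, "average_feedback_score": <float | null>,
#     "work_index": <float | null>, "earning_stability": <float | null>
#   },
#   "plot": "data:image/jpeg;base64,..."
# }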

@app.get("/forecast-plot/{worker_id}")  # route path is an assumption; the original listing registers no route explicitly
async def get_forecast_plot(worker_id: int):
    plot_filename = f"/tmp/worker_{worker_id}_forecast.jpg"
    if os.path.exists(plot_filename):
        return FileResponse(plot_filename, media_type="image/jpeg", filename=f"worker_{worker_id}_forecast.jpg")
    else:
        raise HTTPException(status_code=404, detail=f"Plot for worker_id {worker_id} not found")
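
# Minimal local-run sketch, assuming uvicorn is available and the route paths assumed above
# ("/forecast", "/forecast-plot/{worker_id}") are used. Example request once the server is up:
#   curl -X POST http://127.0.0.1:8000/forecast -H "Content-Type: application/json" -d '{"worker_id": 1}'
if __name__ == "__main__":
    import uvicorn  # assumed dependency for serving the FastAPI app

    # Host and port are assumptions, not taken from the original listing.
    uvicorn.run(app, host="0.0.0.0", port=8000)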