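"""FastAPI service that forecasts a worker's daily contracted wage.

POST /worker_forecast/ trains per-worker models on the historical CSV
(a RandomForest job/no-job classifier plus gradient-boosted wage regressors
with quantile bounds) and returns evaluation metrics, a worker profile, and
a base64-encoded forecast plot. GET /worker_forecast/plot/{worker_id} serves
the plot image saved under /tmp.
"""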
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware  # Import CORSMiddleware
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
import matplotlib.pyplot as plt
import json
import base64
from io import BytesIO
from PIL import Image
import warnings
import os
from pydantic import BaseModel

warnings.filterwarnings("ignore")

app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins; replace with specific origins in production
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods (GET, POST, etc.)
    allow_headers=["*"],  # Allows all headers
)

class WorkerInput(BaseModel):
    worker_id: int
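
# Example request (a sketch only; the host/port and the worker_id value are placeholders):
#   curl -X POST http://localhost:8000/worker_forecast/ \
#        -H "Content-Type: application/json" \
#        -d '{"worker_id": 1}'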

@app.post("/worker_forecast/")
async def worker_forecast(input_data: WorkerInput):
    worker_id = input_data.worker_id

    # Initialize result dictionary
    results = {
        'worker_id': worker_id,
        'metrics': {},
        'worker_profile': {},
        'plot': ''
    }

    # Load dataset
    try:
        df = pd.read_csv('/app/extended_worker_dataset_random_reduced.csv')
    except FileNotFoundError:
        raise HTTPException(status_code=500, detail="Dataset file not found")

    # Filter for one worker_id
    df = df[df['worker_id'] == worker_id].copy()
    if df.empty:
        raise HTTPException(status_code=404, detail=f"No data found for worker_id {worker_id}")

    # Data preprocessing
    df['timestamp'] = pd.to_datetime(df['timestamp'], dayfirst=True, errors='coerce')
    # Sort chronologically so the lag/rolling features and the time-based split see rows in order
    df = df.sort_values('timestamp').reset_index(drop=True)
    df['has_job'] = (df['job_type'] != "No Job").astype(int)

    # Wage capping: floor at 500 and cap at the 98th percentile of positive wages
    wage_cap = df[df['contracted_wage'] > 0]['contracted_wage'].quantile(0.98)
    df['contracted_wage'] = df['contracted_wage'].clip(lower=500, upper=wage_cap)

    # Encode categorical
    le_job = LabelEncoder()
    df['job_type_encoded'] = le_job.fit_transform(df['job_type'])
    le_labour = LabelEncoder()
    df['labour_category_encoded'] = le_labour.fit_transform(df['labour_category'])

    # Lagged and rolling wage features (shifted by one row so each day only uses past information)
    df['prev_wage'] = df['contracted_wage'].shift(1).fillna(0)
    df['prev_wage2'] = df['contracted_wage'].shift(2).fillna(0)
    df['prev_wage3'] = df['contracted_wage'].shift(3).fillna(0)
    df['rolling_mean_3'] = df['contracted_wage'].rolling(3, min_periods=1).mean().shift(1).fillna(0)
    df['rolling_std_3'] = df['contracted_wage'].rolling(3, min_periods=1).std().shift(1).fillna(0)
    df['rolling_mean_7'] = df['contracted_wage'].rolling(7, min_periods=1).mean().shift(1).fillna(0)

    # Chronological train/test split: first 80% for training, last 20% held out
    split_point = int(len(df) * 0.8)
    train_df, test_df = df.iloc[:split_point].copy(), df.iloc[split_point:].copy()

    # Scale features with RobustScaler, fit on the training split only
    scale_input_cols = ['job_type_encoded', 'years_of_experience', 'prev_wage', 'prev_wage2', 'prev_wage3',
                        'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7', 'labour_category_encoded']
    scale_output_cols = ['job_type_scaled', 'years_exp_scaled', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
                         'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']
    scaler = RobustScaler()
    train_df[scale_output_cols] = scaler.fit_transform(train_df[scale_input_cols])
    train_df['job_exp_interaction'] = train_df['job_type_scaled'] * train_df['years_exp_scaled']

    # Date features
    for subset in [train_df, test_df]:
        subset['dayofweek'] = subset['timestamp'].dt.dayofweek
        subset['month'] = subset['timestamp'].dt.month
        subset['year'] = subset['timestamp'].dt.year
        subset['dayofyear'] = subset['timestamp'].dt.dayofyear
        subset['is_weekend'] = subset['dayofweek'].isin([5, 6]).astype(int)

    # Classification model: predict whether the worker has a job on a given day
    class_features = ['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend',
                      'job_type_encoded', 'feedback_score', 'years_of_experience']
    X_train_class = train_df[class_features]
    y_train_class = train_df['has_job']
    classifier = RandomForestClassifier(n_estimators=500, max_depth=12, min_samples_split=5, random_state=42)
    classifier.fit(X_train_class, y_train_class)

    # Regression model (trained only on days with a job, i.e. has_job == 1)
    reg_features = ['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_scaled', 'feedback_score',
                    'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled', 'prev_wage2_scaled',
                    'prev_wage3_scaled', 'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled',
                    'labour_category_scaled']
    train_df_reg = train_df[train_df['has_job'] == 1].copy()
    X_train_reg = train_df_reg[reg_features]
    y_train_reg = train_df_reg['contracted_wage']

    # Hyperparameter tuning with time-ordered cross-validation folds
    tscv = TimeSeriesSplit(n_splits=5)
    param_grid = {
        'n_estimators': [200, 300, 400],
        'learning_rate': [0.01, 0.05],
        'max_depth': [3, 4, 5],
        'min_samples_split': [3, 4],
        'min_samples_leaf': [2, 3]
    }
    grid_search = GridSearchCV(GradientBoostingRegressor(random_state=42),
                               param_grid, cv=tscv, scoring='neg_mean_absolute_error', n_jobs=-1)
    grid_search.fit(X_train_reg, y_train_reg)
    best_reg = grid_search.best_estimator_
    best_params = grid_search.best_params_

    # Quantile regressors for an approximate 95% prediction interval (2.5th and 97.5th percentiles)
    reg_lower = GradientBoostingRegressor(loss='quantile', alpha=0.025, **best_params, random_state=42)
    reg_upper = GradientBoostingRegressor(loss='quantile', alpha=0.975, **best_params, random_state=42)
    reg_lower.fit(X_train_reg, y_train_reg)
    reg_upper.fit(X_train_reg, y_train_reg)

    # Future dataframe
    future_df = test_df[['timestamp', 'job_type', 'job_type_encoded', 'feedback_score', 'years_of_experience',
                         'prev_wage', 'prev_wage2', 'prev_wage3', 'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7',
                         'labour_category_encoded']].rename(columns={'timestamp': 'ds'})
    future_df['dayofweek'] = future_df['ds'].dt.dayofweek
    future_df['month'] = future_df['ds'].dt.month
    future_df['year'] = future_df['ds'].dt.year
    future_df['dayofyear'] = future_df['ds'].dt.dayofyear
    future_df['is_weekend'] = future_df['dayofweek'].isin([5, 6]).astype(int)
    future_df[scale_output_cols] = scaler.transform(future_df[scale_input_cols])
    future_df['job_exp_interaction'] = future_df['job_type_scaled'] * future_df['years_exp_scaled']

    # Predictions
    future_df['has_job_predicted'] = classifier.predict(future_df[class_features])
    future_df['yhat'] = best_reg.predict(future_df[reg_features])
    future_df['yhat_lower'] = reg_lower.predict(future_df[reg_features])
    future_df['yhat_upper'] = reg_upper.predict(future_df[reg_features])

    # Apply job mask
    final_forecast_df = future_df.copy()
    final_forecast_df['yhat'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, final_forecast_df['yhat'])
    final_forecast_df['yhat'] = np.minimum(final_forecast_df['yhat'], wage_cap)
    final_forecast_df['yhat_lower'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, final_forecast_df['yhat_lower'])
    final_forecast_df['yhat_upper'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, final_forecast_df['yhat_upper'])
    final_forecast_df['yhat_lower'] = np.maximum(final_forecast_df['yhat_lower'], 0)

    # Evaluation: wage-weighted MAE / MAPE on test days with an actual job (y > 0)
    comparison_df = pd.merge(
        test_df[['timestamp', 'contracted_wage']].rename(columns={'timestamp': 'ds', 'contracted_wage': 'y'}),
        final_forecast_df[['ds', 'yhat', 'yhat_lower', 'yhat_upper']], on='ds', how='left'
    )
    comparison_df = comparison_df.set_index(final_forecast_df.index)  # Align indices
    valid_comparison_df = comparison_df[comparison_df['y'] > 0].copy()

    if not valid_comparison_df.empty:
        valid_y = valid_comparison_df['y'].values
        valid_yhat = valid_comparison_df['yhat'].values
        weights = valid_comparison_df['y'].values / valid_comparison_df['y'].mean()
        
        mae = np.average(np.abs(valid_y - valid_yhat), weights=weights, axis=0)
        mape = np.average(np.abs((valid_y - valid_yhat) / valid_y) * 100, weights=weights, axis=0)
    else:
        mae, mape = np.nan, np.nan

    results['metrics']['mae'] = round(mae, 2) if not np.isnan(mae) else None
    results['metrics']['mape'] = round(mape, 2) if not np.isnan(mape) else None

    # Plot results
    plt.figure(figsize=(12, 6))
    plt.plot(final_forecast_df['ds'], final_forecast_df['yhat'], '-', label='Forecasted', color='blue')
    plt.fill_between(final_forecast_df['ds'], final_forecast_df['yhat_lower'], final_forecast_df['yhat_upper'],
                     color='gray', alpha=0.2, label='Uncertainty')
    plt.title('Forecasted Daily Earnings (Last 20%)')
    plt.xlabel('Date'); plt.ylabel('Contracted Wage')
    plt.legend(); plt.grid(True); plt.xticks(rotation=45); plt.tight_layout()

    # Save plot as PNG → JPG for compression
    buf_png = BytesIO()
    plt.savefig(buf_png, format="png", dpi=80, bbox_inches="tight")
    plt.close()
    buf_png.seek(0)
    img = Image.open(buf_png).convert("RGB")
    buf_jpg = BytesIO()
    img.save(buf_jpg, format="JPEG", quality=70, optimize=True)
    buf_jpg.seek(0)
    plot_base64 = base64.b64encode(buf_jpg.getvalue()).decode("utf-8")
    results['plot'] = f"data:image/jpeg;base64,{plot_base64}"

    # Save plot to file in /tmp
    plot_filename = f"/tmp/worker_{worker_id}_forecast.jpg"
    try:
        with open(plot_filename, "wb") as f:
            f.write(base64.b64decode(plot_base64))
    except PermissionError as e:
        raise HTTPException(status_code=500, detail=f"Failed to write plot file: {str(e)}")

    # Worker profile
    worker_data = df.copy()
    avg_daily = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage'].mean()
    avg_monthly = avg_daily * 30 if not np.isnan(avg_daily) else 0
    job_dist = worker_data['job_type'].value_counts(normalize=True) * 100
    avg_feedback = worker_data['feedback_score'].mean()
    work_index = job_dist.drop(labels=['No Job'], errors='ignore').sum() / 100
    earn_stability = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage'].std() / avg_daily if avg_daily > 0 else np.nan

    results['worker_profile'] = {
        'average_daily_earning': round(avg_daily, 2) if not np.isnan(avg_daily) else None,
        'estimated_monthly_earning': round(avg_monthly, 2) if not np.isnan(avg_monthly) else None,
        'job_distribution': job_dist.round(2).to_dict(),
        'average_feedback_score': round(avg_feedback, 2) if not np.isnan(avg_feedback) else None,
        'work_index': round(work_index, 2) if not np.isnan(work_index) else None,
        'earning_stability': round(earn_stability, 2) if not np.isnan(earn_stability) else None
    }

    def convert_to_serializable(obj):
        if isinstance(obj, (np.floating, np.float32, np.float64)): return float(obj)
        if isinstance(obj, (np.integer, np.int32, np.int64)): return int(obj)
        if isinstance(obj, np.ndarray): return obj.tolist()
        return obj

    # Return JSON response with results
    return JSONResponse(content=json.loads(json.dumps(results, default=convert_to_serializable)))

@app.get("/worker_forecast/plot/{worker_id}")
async def get_forecast_plot(worker_id: int):
    plot_filename = f"/tmp/worker_{worker_id}_forecast.jpg"
    if os.path.exists(plot_filename):
        return FileResponse(plot_filename, media_type="image/jpeg", filename=f"worker_{worker_id}_forecast.jpg")
    else:
        raise HTTPException(status_code=404, detail=f"Plot for worker_id {worker_id} not found")
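
# Optional local entry point: a minimal sketch for running the service directly.
# Assumes uvicorn is installed; host and port are placeholders, adjust to match the actual deployment.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)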