modelLoanStatusCode / modelLoanAPI.py
agentsay's picture
Update modelLoanAPI.py
128b4d6 verified
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware # Import CORSMiddleware
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
import matplotlib.pyplot as plt
import json
import base64
from io import BytesIO
from PIL import Image
import warnings
import os
from pydantic import BaseModel
warnings.filterwarnings("ignore")
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins; replace with specific origins in production
allow_credentials=True,
allow_methods=["*"], # Allows all methods (GET, POST, etc.)
allow_headers=["*"], # Allows all headers
)
class WorkerInput(BaseModel):
worker_id: int
@app.post("/worker_forecast/")
async def worker_forecast(input_data: WorkerInput):
worker_id = input_data.worker_id
# Initialize result dictionary
results = {
'worker_id': worker_id,
'metrics': {},
'worker_profile': {},
'plot': ''
}
# Load dataset
try:
df = pd.read_csv('/app/extended_worker_dataset_random_reduced.csv')
except FileNotFoundError:
raise HTTPException(status_code=500, detail="Dataset file not found")
# Filter for one worker_id
df = df[df['worker_id'] == worker_id].copy()
if df.empty:
raise HTTPException(status_code=404, detail=f"No data found for worker_id {worker_id}")
# Data preprocessing
df['timestamp'] = pd.to_datetime(df['timestamp'], dayfirst=True, errors='coerce')
df['has_job'] = (df['job_type'] != "No Job").astype(int)
# Wage capping
wage_cap = df[df['contracted_wage'] > 0]['contracted_wage'].quantile(0.98)
df['contracted_wage'] = df['contracted_wage'].clip(lower=500, upper=wage_cap)
# Encode categorical
le_job = LabelEncoder()
df['job_type_encoded'] = le_job.fit_transform(df['job_type'])
le_labour = LabelEncoder()
df['labour_category_encoded'] = le_labour.fit_transform(df['labour_category'])
# Lagged and rolling features
df['prev_wage'] = df['contracted_wage'].shift(1).fillna(0)
df['prev_wage2'] = df['contracted_wage'].shift(2).fillna(0)
df['prev_wage3'] = df['contracted_wage'].shift(3).fillna(0)
df['rolling_mean_3'] = df['contracted_wage'].rolling(3, min_periods=1).mean().shift(1).fillna(0)
df['rolling_std_3'] = df['contracted_wage'].rolling(3, min_periods=1).std().shift(1).fillna(0)
df['rolling_mean_7'] = df['contracted_wage'].rolling(7, min_periods=1).mean().shift(1).fillna(0)
# Train/test split
split_point = int(len(df) * 0.8)
train_df, test_df = df.iloc[:split_point].copy(), df.iloc[split_point:].copy()
# Scaling
scaler = RobustScaler()
train_df[['job_type_scaled', 'years_exp_scaled', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']] = scaler.fit_transform(
train_df[['job_type_encoded', 'years_of_experience', 'prev_wage', 'prev_wage2', 'prev_wage3',
'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7', 'labour_category_encoded']]
)
train_df['job_exp_interaction'] = train_df['job_type_scaled'] * train_df['years_exp_scaled']
# Date features
for subset in [train_df, test_df]:
subset['dayofweek'] = subset['timestamp'].dt.dayofweek
subset['month'] = subset['timestamp'].dt.month
subset['year'] = subset['timestamp'].dt.year
subset['dayofyear'] = subset['timestamp'].dt.dayofyear
subset['is_weekend'] = subset['dayofweek'].isin([5, 6]).astype(int)
# Classification model
X_train_class = train_df[['dayofweek', 'month', 'year', 'dayofyear',
'is_weekend', 'job_type_encoded', 'feedback_score',
'years_of_experience']]
y_train_class = train_df['has_job']
classifier = RandomForestClassifier(n_estimators=500, max_depth=12, min_samples_split=5, random_state=42)
classifier.fit(X_train_class, y_train_class)
# Regression model (only when has_job=1)
train_df_reg = train_df[train_df['has_job'] == 1].copy()
X_train_reg = train_df_reg[['dayofweek', 'month', 'year', 'dayofyear',
'is_weekend', 'job_type_scaled', 'feedback_score',
'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled',
'prev_wage2_scaled', 'prev_wage3_scaled', 'rolling_mean_3_scaled',
'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']]
y_train_reg = train_df_reg['contracted_wage']
# Hyperparameter tuning
tscv = TimeSeriesSplit(n_splits=5)
param_grid = {
'n_estimators': [200, 300, 400],
'learning_rate': [0.01, 0.05],
'max_depth': [3, 4, 5],
'min_samples_split': [3, 4],
'min_samples_leaf': [2, 3]
}
grid_search = GridSearchCV(GradientBoostingRegressor(random_state=42),
param_grid, cv=tscv, scoring='neg_mean_absolute_error', n_jobs=-1)
grid_search.fit(X_train_reg, y_train_reg)
best_reg = grid_search.best_estimator_
best_params = grid_search.best_params_
# Quantile regressors
reg_lower = GradientBoostingRegressor(loss='quantile', alpha=0.025, **best_params, random_state=42)
reg_upper = GradientBoostingRegressor(loss='quantile', alpha=0.975, **best_params, random_state=42)
reg_lower.fit(X_train_reg, y_train_reg)
reg_upper.fit(X_train_reg, y_train_reg)
# Future dataframe
future_df = test_df[['timestamp', 'job_type', 'job_type_encoded', 'feedback_score', 'years_of_experience',
'prev_wage', 'prev_wage2', 'prev_wage3', 'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7',
'labour_category_encoded']].rename(columns={'timestamp': 'ds'})
future_df['dayofweek'] = future_df['ds'].dt.dayofweek
future_df['month'] = future_df['ds'].dt.month
future_df['year'] = future_df['ds'].dt.year
future_df['dayofyear'] = future_df['ds'].dt.dayofyear
future_df['is_weekend'] = future_df['dayofweek'].isin([5, 6]).astype(int)
future_df[['job_type_scaled', 'years_exp_scaled', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']] = scaler.transform(
future_df[['job_type_encoded', 'years_of_experience', 'prev_wage', 'prev_wage2', 'prev_wage3',
'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7', 'labour_category_encoded']]
)
future_df['job_exp_interaction'] = future_df['job_type_scaled'] * future_df['years_exp_scaled']
# Predictions
future_df['has_job_predicted'] = classifier.predict(
future_df[['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_encoded',
'feedback_score', 'years_of_experience']]
)
future_df['yhat'] = best_reg.predict(
future_df[['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_scaled', 'feedback_score',
'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']]
)
future_df['yhat_lower'] = reg_lower.predict(
future_df[['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_scaled', 'feedback_score',
'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']]
)
future_df['yhat_upper'] = reg_upper.predict(
future_df[['dayofweek', 'month', 'year', 'dayofyear', 'is_weekend', 'job_type_scaled', 'feedback_score',
'years_exp_scaled', 'job_exp_interaction', 'prev_wage_scaled', 'prev_wage2_scaled', 'prev_wage3_scaled',
'rolling_mean_3_scaled', 'rolling_std_3_scaled', 'rolling_mean_7_scaled', 'labour_category_scaled']]
)
# Apply job mask
final_forecast_df = future_df.copy()
final_forecast_df['yhat'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, final_forecast_df['yhat'])
final_forecast_df['yhat'] = np.minimum(final_forecast_df['yhat'], wage_cap)
final_forecast_df['yhat_lower'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, future_df['yhat_lower'])
final_forecast_df['yhat_upper'] = np.where(final_forecast_df['has_job_predicted'] == 0, 0, future_df['yhat_upper'])
final_forecast_df['yhat_lower'] = np.maximum(final_forecast_df['yhat_lower'], 0)
# Evaluation
comparison_df = pd.merge(
test_df[['timestamp', 'contracted_wage']].rename(columns={'timestamp': 'ds', 'contracted_wage': 'y'}),
final_forecast_df[['ds', 'yhat', 'yhat_lower', 'yhat_upper']], on='ds', how='left'
)
comparison_df = comparison_df.set_index(final_forecast_df.index) # Align indices
valid_comparison_df = comparison_df[comparison_df['y'] > 0].copy()
if not valid_comparison_df.empty:
valid_y = valid_comparison_df['y'].values
valid_yhat = valid_comparison_df['yhat'].values
weights = valid_comparison_df['y'].values / valid_comparison_df['y'].mean()
mae = np.average(np.abs(valid_y - valid_yhat), weights=weights, axis=0)
mape = np.average(np.abs((valid_y - valid_yhat) / valid_y) * 100, weights=weights, axis=0)
else:
mae, mape = np.nan, np.nan
results['metrics']['mae'] = round(mae, 2) if not np.isnan(mae) else None
results['metrics']['mape'] = round(mape, 2) if not np.isnan(mape) else None
# Plot results
plt.figure(figsize=(12, 6))
plt.plot(final_forecast_df['ds'], final_forecast_df['yhat'], '-', label='Forecasted', color='blue')
plt.fill_between(final_forecast_df['ds'], final_forecast_df['yhat_lower'], final_forecast_df['yhat_upper'],
color='gray', alpha=0.2, label='Uncertainty')
plt.title('Forecasted Daily Earnings (Last 20%)')
plt.xlabel('Date'); plt.ylabel('Contracted Wage')
plt.legend(); plt.grid(True); plt.xticks(rotation=45); plt.tight_layout()
# Save plot as PNG → JPG for compression
buf_png = BytesIO()
plt.savefig(buf_png, format="png", dpi=80, bbox_inches="tight")
plt.close()
buf_png.seek(0)
img = Image.open(buf_png).convert("RGB")
buf_jpg = BytesIO()
img.save(buf_jpg, format="JPEG", quality=70, optimize=True)
buf_jpg.seek(0)
plot_base64 = base64.b64encode(buf_jpg.getvalue()).decode("utf-8")
results['plot'] = f"data:image/jpeg;base64,{plot_base64}"
# Save plot to file in /tmp
plot_filename = f"/tmp/worker_{worker_id}_forecast.jpg"
try:
with open(plot_filename, "wb") as f:
f.write(base64.b64decode(plot_base64))
except PermissionError as e:
raise HTTPException(status_code=500, detail=f"Failed to write plot file: {str(e)}")
# Worker profile
worker_data = df.copy()
avg_daily = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage'].mean()
avg_monthly = avg_daily * 30 if not np.isnan(avg_daily) else 0
job_dist = worker_data['job_type'].value_counts(normalize=True) * 100
avg_feedback = worker_data['feedback_score'].mean()
work_index = job_dist.drop(labels=['No Job'], errors='ignore').sum() / 100
earn_stability = worker_data[worker_data['contracted_wage'] > 0]['contracted_wage'].std() / avg_daily if avg_daily > 0 else np.nan
results['worker_profile'] = {
'average_daily_earning': round(avg_daily, 2) if not np.isnan(avg_daily) else None,
'estimated_monthly_earning': round(avg_monthly, 2) if not np.isnan(avg_monthly) else None,
'job_distribution': job_dist.round(2).to_dict(),
'average_feedback_score': round(avg_feedback, 2) if not np.isnan(avg_feedback) else None,
'work_index': round(work_index, 2) if not np.isnan(work_index) else None,
'earning_stability': round(earn_stability, 2) if not np.isnan(earn_stability) else None
}
def convert_to_serializable(obj):
if isinstance(obj, (np.floating, np.float32, np.float64)): return float(obj)
if isinstance(obj, (np.integer, np.int32, np.int64)): return int(obj)
if isinstance(obj, np.ndarray): return obj.tolist()
return obj
# Return JSON response with results
return JSONResponse(content=json.loads(json.dumps(results, default=convert_to_serializable)))
@app.get("/worker_forecast/plot/{worker_id}")
async def get_forecast_plot(worker_id: int):
plot_filename = f"/tmp/worker_{worker_id}_forecast.jpg"
if os.path.exists(plot_filename):
return FileResponse(plot_filename, media_type="image/jpeg", filename=f"worker_{worker_id}_forecast.jpg")
else:
raise HTTPException(status_code=404, detail=f"Plot for worker_id {worker_id} not found")