from fastapi import FastAPI
from fastapi.responses import JSONResponse
import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error

app = FastAPI(
    title="Displacement Prediction API",
    description="API for predicting displacement using LightGBM quantile regression",
)

# Quantile levels: lower bound, median, and upper bound of an 80% prediction interval.
QUANTILES = (0.1, 0.5, 0.9)

# Model inputs: static point attributes plus three autoregressive lag features.
FEATURE_COLS = ["lat", "lon", "velocity_mm_yr", "lag1", "lag2", "lag3"]


def _pinball_loss(y_true, y_pred, alpha):
    """Return the mean pinball (quantile) loss at quantile level *alpha*.

    The pinball loss penalizes under-prediction by ``alpha`` and
    over-prediction by ``1 - alpha`` per unit of error.
    """
    error = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    return float(np.mean(np.maximum(alpha * error, (alpha - 1) * error)))


def _build_long_frame(df):
    """Reshape wide per-date displacement columns into a long frame with lag features.

    Expects one ``disp_mm_YYYYMMDD`` column per acquisition date plus the
    static columns ``ps_id``, ``lat``, ``lon``, ``velocity_mm_yr``, ``risk``.
    Returns a frame sorted by (ps_id, date) with ``lag1``..``lag3`` columns;
    rows whose lags are incomplete (the first three epochs of each point)
    are dropped.
    """
    disp_cols = [c for c in df.columns if c.startswith("disp_mm_")]
    # Sort columns chronologically so melt + shift produce correct lags.
    disp_cols = sorted(
        disp_cols,
        key=lambda c: pd.to_datetime(c.replace("disp_mm_", ""), format="%Y%m%d"),
    )

    long_df = df.melt(
        id_vars=["ps_id", "lat", "lon", "velocity_mm_yr", "risk"],
        value_vars=disp_cols,
        var_name="date",
        value_name="disp_mm",
    )
    long_df["date"] = pd.to_datetime(
        long_df["date"].str.replace("disp_mm_", ""), format="%Y%m%d"
    )
    long_df = long_df.sort_values(["ps_id", "date"])

    # Autoregressive features: the three most recent displacements per point.
    grouped = long_df.groupby("ps_id")["disp_mm"]
    for k in (1, 2, 3):
        long_df[f"lag{k}"] = grouped.shift(k)

    return long_df.dropna()


def _train_quantile_models(X_train, y_train):
    """Fit one LightGBM quantile regressor per level in QUANTILES.

    Returns a dict mapping quantile level -> fitted LGBMRegressor.
    """
    models = {}
    for q in QUANTILES:
        model = lgb.LGBMRegressor(
            objective="quantile",
            alpha=q,
            learning_rate=0.05,
            n_estimators=500,
            max_depth=6,
            verbose=-1,
        )
        model.fit(X_train, y_train)
        models[q] = model
    return models


# NOTE: declared as a plain ``def`` (not ``async def``) on purpose: this
# endpoint does heavy blocking work (CSV parsing, training three gradient
# boosting models). FastAPI runs sync path functions in a threadpool, so the
# event loop stays responsive; an ``async def`` here would block the whole
# server for the duration of training.
@app.get("/predict", response_class=JSONResponse)
def predict_displacement():
    """
    Predict displacement using LightGBM quantile regression.

    Trains three quantile models (0.1 / 0.5 / 0.9) on lag features derived
    from ``synthetic_ps_points.csv`` and returns JSON with evaluation
    metrics plus chart data for the first 100 test samples.

    Returns 404 if the dataset file is missing from the working directory.
    """
    try:
        df = pd.read_csv("synthetic_ps_points.csv")
    except FileNotFoundError:
        return JSONResponse(
            status_code=404,
            content={"error": "Dataset file 'synthetic_ps_points.csv' not found in the working directory."},
        )

    long_df = _build_long_frame(df)
    X = long_df[FEATURE_COLS]
    y = long_df["disp_mm"]

    # shuffle=False preserves row order — but note rows are sorted by
    # (ps_id, date), so this holds out the *later ps_ids* entirely rather
    # than a strictly temporal tail across all points.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False
    )

    models = _train_quantile_models(X_train, y_train)
    preds = {q: models[q].predict(X_test) for q in QUANTILES}

    # Point-forecast metrics use the median (0.5) model.
    mse = mean_squared_error(y_test, preds[0.5])
    rmse = float(np.sqrt(mse))
    mae = mean_absolute_error(y_test, preds[0.5])

    pinball = {q: _pinball_loss(y_test, preds[q], q) for q in QUANTILES}

    # Empirical coverage and mean width of the nominal 80% interval.
    coverage = float(np.mean((y_test >= preds[0.1]) & (y_test <= preds[0.9])))
    interval_width = float(np.mean(preds[0.9] - preds[0.1]))

    # Velocity sanity check on the final test sample: compare the implied
    # per-epoch velocity (predicted median minus that row's lag1, over one
    # acquisition interval) against the point's reported velocity_mm_yr.
    last_disp_test = X_test["lag1"].iloc[-1]
    last_pred_disp = preds[0.5][-1]
    time_diff_days = (long_df["date"].iloc[-1] - long_df["date"].iloc[-2]).days
    if time_diff_days > 0:
        time_diff_years = time_diff_days / 365.25
        predicted_velocity = (last_pred_disp - last_disp_test) / time_diff_years
    else:
        # Duplicate final timestamps would otherwise raise ZeroDivisionError
        # and turn the whole request into an unhandled 500.
        predicted_velocity = 0.0
    actual_velocity = long_df["velocity_mm_yr"].iloc[-1]

    # Chart-friendly records for the first 100 test samples.
    chart_data = [
        {
            "index": i,
            "actual": float(y_test.iloc[i]),
            "predicted_median": float(preds[0.5][i]),
            "lower_bound": float(preds[0.1][i]),
            "upper_bound": float(preds[0.9][i]),
        }
        for i in range(min(100, len(y_test)))
    ]

    response = {
        "metrics": {
            "mse": float(mse),
            "rmse": rmse,
            "mae": float(mae),
            "pinball_loss_0.1": pinball[0.1],
            "pinball_loss_0.5": pinball[0.5],
            "pinball_loss_0.9": pinball[0.9],
            "coverage_80_percent": coverage * 100,
            "interval_width": interval_width,
            "actual_velocity": float(actual_velocity),
            "predicted_velocity": float(predicted_velocity),
            "velocity_error": float(abs(actual_velocity - predicted_velocity)),
        },
        "chart_data": chart_data,
    }
    return JSONResponse(content=response)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)