Spaces:
Build error
Build error
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import HTMLResponse | |
| from pydantic import BaseModel | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import sys | |
| import pickle | |
| # Add project root to path for imports if running from src | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) | |
| from src.config import global_config | |
| from src.pipeline import RossmannPipeline | |
| from src.frontend import FRONTEND_HTML | |
| from src.core import setup_logger, settings | |
| logger = setup_logger(__name__) | |
# FastAPI application; title/description come from the project-wide config.
app = FastAPI(
    title=global_config.model.name,
    description=global_config.model.description,
    version="2.0.0"
)

# Global singletons populated by startup_event():
# - pipeline: RossmannPipeline with the trained model attached (None until startup)
# - store_metadata: optional per-store DataFrame loaded from the configured CSV
pipeline = None
store_metadata = None
@app.on_event("startup")  # NOTE(review): decorator restored — handler was never registered in the pasted copy; confirm against deployment
def startup_event():
    """Initialize the serving pipeline at application start.

    Populates the module-level globals ``pipeline`` and ``store_metadata``.
    If the serialized model file is missing the app still starts, but
    /predict will return 503 until a model is available.
    """
    global pipeline, store_metadata
    logger.info("Starting up application...")

    # 1. Locate the serialized model (path convention kept from the old main.py).
    model_path = os.path.abspath("models/rossmann_production_model.pkl")
    if not os.path.exists(model_path):
        logger.warning("Model not found at %s. Application may not work until trained.", model_path)

    # 2. Initialize pipeline components from the configured data archive
    #    (feature engineering only — no training happens here).
    pipeline = RossmannPipeline(global_config.data.archive_path)

    # 3. Attach the trained model if present.
    #    SECURITY: pickle.load is only acceptable because this file is produced
    #    by our own training job; never point it at untrusted input.
    if os.path.exists(model_path):
        with open(model_path, 'rb') as f:
            pipeline.model = pickle.load(f)
        logger.info("Model loaded successfully.")

    # 4. Optional store metadata (for Open/Promo2 checks or merging downstream).
    store_path = global_config.data.store_path
    if store_path and os.path.exists(store_path):
        store_metadata = pd.read_csv(store_path)
        logger.info("Store metadata loaded from %s", store_path)
class PredictionRequest(BaseModel):
    """Input payload for /predict.

    Field names mirror the raw Rossmann dataset columns so the request can be
    fed straight into the feature-engineering pipeline.
    """
    Store: int                 # store id as used in training data
    Date: str                  # forecast start date, parseable by pd.to_datetime
    Promo: int                 # promo flag (0/1)
    StateHoliday: str          # dataset-style holiday code — presumably '0'/'a'/'b'/'c'; verify
    SchoolHoliday: int         # school holiday flag (0/1)
    Assortment: str            # 'a'..'c' (ordinal-encoded downstream)
    StoreType: str             # 'a'..'d' (ordinal-encoded downstream)
    CompetitionDistance: int   # distance to nearest competitor
    ForecastDays: int = 1      # forecast horizon in days (>= 1)
class ExplanationItem(BaseModel):
    """One feature-attribution row returned alongside a prediction."""
    feature: str        # human-readable feature name
    impact: float       # estimated sales uplift in percent (signed)
    formatted_val: str  # impact pre-formatted for display, e.g. '+4.2%'
class PredictionResponse(BaseModel):
    """Response payload for /predict.

    Note: mutable defaults ([] / {}) are safe on pydantic models — pydantic
    deep-copies defaults per instance, unlike plain Python defaults.
    """
    Store: int                 # echo of the requested store id
    Date: str                  # echo of the requested start date
    PredictedSales: float      # point forecast for the first day
    ConfidenceInterval: list   # [lower, upper] band for the first day
    Explanation: list[ExplanationItem] = []  # top feature attributions
    Forecast: list = []        # one {date, sales, lb, ub} dict per horizon day
    Status: str                # "success" on the happy path
    DebugInfo: dict = {}       # raw log-prediction and feature row (internal use)
@app.get("/", response_class=HTMLResponse)  # NOTE(review): decorator restored — route was unregistered in the pasted copy; confirm path
def read_root():
    """Serve the embedded single-page frontend.

    ``response_class=HTMLResponse`` makes FastAPI return the string as
    text/html instead of JSON-encoding it (HTMLResponse is imported at the
    top of the file but was otherwise unused).
    """
    return FRONTEND_HTML
@app.get("/health")  # NOTE(review): decorator restored — confirm route path against frontend/monitoring
def health_check():
    """Liveness/readiness probe: reports whether a trained model is loaded."""
    return {
        "status": "healthy",
        "model_loaded": pipeline is not None and pipeline.model is not None,
        "config_name": global_config.model.name
    }
@app.post("/predict")  # NOTE(review): decorator restored — handler was unrouted in the pasted copy
def predict(request: PredictionRequest):
    """Forecast sales for one store over ``request.ForecastDays`` days.

    Returns a PredictionResponse with the first-day point forecast, a
    heuristic +/-15% confidence band, per-feature explanations derived from
    XGBoost prediction contributions, and the full daily forecast series.

    Raises:
        HTTPException 503: no model has been loaded yet.
        HTTPException 500: any error during feature building or prediction.
    """
    if not pipeline or not pipeline.model:
        raise HTTPException(status_code=503, detail="Model not loaded")
    try:
        # 1. Generate the forecast date range (batch prediction).
        start_date = pd.to_datetime(request.Date)
        dates = [start_date + pd.Timedelta(days=i) for i in range(request.ForecastDays)]

        # 2. One input row per forecast day; Open=1 — we only forecast trading days.
        input_data = pd.DataFrame([
            {
                'Store': request.Store,
                'Date': d,
                'Promo': request.Promo,
                'StateHoliday': request.StateHoliday,
                'SchoolHoliday': request.SchoolHoliday,
                'Assortment': request.Assortment,
                'StoreType': request.StoreType,
                'CompetitionDistance': request.CompetitionDistance,
                'Open': 1
            }
            for d in dates
        ])

        # 3. Feature engineering (date parts, Fourier terms, Easter features, ...).
        processed_df = pipeline.run_feature_engineering(input_data)

        # 4. Ordinal-encode categoricals the same way as at training time.
        if 'StoreType' in processed_df.columns:
            processed_df['StoreType'] = processed_df['StoreType'].astype(str).map(
                {'a': 1, 'b': 2, 'c': 3, 'd': 4}).fillna(0)
        if 'Assortment' in processed_df.columns:
            processed_df['Assortment'] = processed_df['Assortment'].astype(str).map(
                {'a': 1, 'b': 2, 'c': 3}).fillna(0)

        # 5. Assemble the model's feature matrix; missing columns default to 0.
        feature_cols = [
            'Store', 'DayOfWeek', 'Promo', 'StateHoliday', 'SchoolHoliday',
            'Year', 'Month', 'Day', 'IsWeekend', 'DayOfMonth',
            'CompetitionDistance', 'StoreType', 'Assortment'
        ]
        for i in range(1, 6):
            feature_cols.extend([f'fourier_sin_{i}', f'fourier_cos_{i}'])
        feature_cols.extend(['days_to_easter', 'easter_effect'])

        X = pd.DataFrame()
        for c in feature_cols:
            if c in processed_df.columns:
                val = processed_df[c]
                # Robustness: cap Year to the training range (2013-2015) so the
                # tree model is not asked to extrapolate on unseen year values.
                if c == 'Year':
                    val = val.clip(upper=2015)
                X[c] = val
            else:
                X[c] = 0
        # Ensure everything is numeric for the model.
        X = X.apply(pd.to_numeric, errors='coerce').fillna(0)

        # 6. Predict: model works in log1p space, invert with expm1.
        y_log = pipeline.model.predict(X)
        y_sales = np.expm1(y_log)
        # Debug output goes through the logger (lazy %-args), not bare print,
        # so it respects the logging configuration.
        logger.debug("X shape=%s first_row=%s raw_log_pred=%.4f",
                     X.shape, X.iloc[0].to_dict(), y_log[0])
        logger.info("Target Pred: %.2f", y_sales[0])

        # Explanations for the first forecast day via XGBoost prediction
        # contributions; the last column is the bias term and is dropped.
        import xgboost as xgb
        dmat = xgb.DMatrix(X.iloc[[0]], feature_names=feature_cols)
        contribs = pipeline.model.get_booster().predict(dmat, pred_contribs=True)[0]
        feature_impacts = contribs[:-1]

        # Friendly display names, hoisted out of the loop (was rebuilt per
        # iteration). Any fourier_* term collapses to plain "Seasonality".
        display_names = {
            "Promo": "Promotion Lift",
            "CompetitionDistance": "Local Competition",
            "IsWeekend": "Weekend Traffic",
            "Month": "Seasonal Factor",
            "StateHoliday": "Holiday Impact",
            "SchoolHoliday": "School Schedule",
            "Year": "Annual Growth",
            "fourier_sin_1": "Core Seasonality"  # typo fixed (was "Seaonality")
        }
        indicators = sorted(zip(feature_cols, feature_impacts),
                            key=lambda item: abs(item[1]), reverse=True)
        explanation_items = []
        for name, log_impact in indicators[:6]:
            # Contributions are additive in log space -> convert to % uplift.
            uplift_pct = (np.exp(log_impact) - 1) * 100
            display_name = "Seasonality" if "fourier" in name else display_names.get(name, name)
            explanation_items.append(ExplanationItem(
                feature=display_name,
                impact=uplift_pct,
                formatted_val=f"{uplift_pct:+.1f}%"
            ))

        # 7. Forecast series with heuristic +/-15% confidence bands.
        #    (UI may round "sales" again; we round to 2dp server-side.)
        forecast_result = [
            {
                "date": d.strftime('%Y-%m-%d'),
                "sales": float(round(s, 2)),
                "lb": float(round(s * 0.85, 2)),
                "ub": float(round(s * 1.15, 2))
            }
            for d, s in zip(dates, y_sales)
        ]

        return PredictionResponse(
            Store=request.Store,
            Date=request.Date,
            PredictedSales=float(y_sales[0]),
            ConfidenceInterval=[float(y_sales[0] * 0.85), float(y_sales[0] * 1.15)],
            Explanation=explanation_items,
            Forecast=forecast_result,
            Status="success",
            DebugInfo={
                "y_log": float(y_log[0]),
                "X_row0": X.iloc[0].to_dict()
            }
        )
    except Exception as e:
        # Boundary handler: log the full traceback, surface a 500 to the client.
        logger.exception("Prediction error")
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Local/dev entry point. Port 7860 is the Hugging Face Spaces convention;
    # in production the platform typically launches the server itself.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)