Spaces:
Sleeping
Sleeping
import os
import tempfile
from datetime import datetime, timedelta, timezone

import gradio as gr
import hopsworks
import numpy as np
import pandas as pd
from xgboost import XGBRegressor

from functions.util import plot_air_quality_forecast
| # ------------------------- | |
| # CONFIG | |
| # ------------------------- | |
CITY_NAME = "Dundee"

# Internal sensor names (as in Hopsworks) -> pretty labels for the UI.
SENSOR_LABELS = dict(
    whitehall_street="Whitehall Street",
    meadowside="Meadowside",
    lochee_road="Lochee Road",
    seagate="Seagate",
    broughty_ferry_road="Broughty Ferry Road",
    mains_loan="Mains Loan",
)

# Pretty UI label -> internal sensor name (inverse of SENSOR_LABELS).
DISPLAY_TO_INTERNAL = {label: name for name, label in SENSOR_LABELS.items()}

# Alternative spellings accepted for each canonical sensor name. The Mains
# Loan entries are known misspellings.
_SENSOR_ALIASES = {
    "whitehall_street": ("whitehall",),
    "meadowside": (),
    "lochee_road": (),
    "seagate": (),
    "broughty_ferry_road": ("ferry_road",),
    "mains_loan": ("mains_laon", "mains_loa"),
}

# Any accepted spelling (canonical name or alias) -> canonical sensor name.
SENSOR_CANONICAL = {
    spelling: canonical
    for canonical, aliases in _SENSOR_ALIASES.items()
    for spelling in (canonical, *aliases)
}

# Column order used when building the model's input frame.
feature_order = (
    ["date", "pm10", "no2"]
    + [
        "temperature_2m_mean",
        "precipitation_sum",
        "wind_speed_10m_max",
        "wind_direction_10m_dominant",
    ]
    + [f"pm25_lag{lag}" for lag in (1, 2, 3)]
    + [
        "sensor_broughty_ferry_road",
        "sensor_lochee_road",
        "sensor_mains_loan",
        "sensor_meadowside",
        "sensor_seagate",
        "sensor_whitehall_street",
    ]
)
| # ------------------------- | |
| # HOPSWORKS + MODEL LOADING (LAZY) | |
| # ------------------------- | |
# Cache slot for the lazily loaded Hopsworks resources; None until first use.
_resources = None


def _login_hopsworks():
    """Log in to Hopsworks with the API key taken from the environment.

    Returns:
        Whatever ``hopsworks.login`` returns (used as the project handle).

    Raises:
        RuntimeError: If ``HOPSWORKS_API_KEY`` is unset or empty.
    """
    key = os.environ.get("HOPSWORKS_API_KEY")
    if key:
        return hopsworks.login(api_key_value=key)
    raise RuntimeError(
        "HOPSWORKS_API_KEY is not set. "
        "Add it as a secret in your HuggingFace Space settings."
    )
def load_resources():
    """Connect to Hopsworks and load the model, feature view and weather FG.

    This is the SINGLE lag-aware model setup used by the dashboard.

    Returns:
        dict with the keys ``"project"``, ``"fs"`` (feature store), ``"fv"``
        (feature view), ``"weather_fg"`` (weather feature group) and
        ``"model"`` (an ``XGBRegressor`` loaded from the registry download).

    Raises:
        RuntimeError: Propagated from ``_login_hopsworks`` when the API key
            is not configured.
    """
    project = _login_hopsworks()
    fs = project.get_feature_store()

    # Lag-feature FeatureView.
    fv = fs.get_feature_view("dundee_fv", version=3)
    fv.init_batch_scoring(1)

    weather_fg = fs.get_feature_group("dundee_weather_fg", version=1)

    mr = project.get_model_registry()
    model_obj = mr.get_model("dundee_pm25_xgboost", version=2)
    model_dir = model_obj.download()

    model = XGBRegressor()
    # os.path.join copes with a trailing separator on model_dir and with the
    # platform's path separator, unlike manual "/" concatenation.
    model.load_model(os.path.join(model_dir, "model.json"))

    return {
        "project": project,
        "fs": fs,
        "fv": fv,
        "weather_fg": weather_fg,
        "model": model,
    }
def get_resources():
    """Return the shared resource dict, loading it on the first call only."""
    global _resources
    if _resources is not None:
        return _resources
    _resources = load_resources()
    return _resources
| # ------------------------- | |
| # HELPER: AQI CATEGORY (optional, not yet used in UI) | |
| # ------------------------- | |
def pm25_to_aqi_category(pm25: float) -> str:
    """Map a PM2.5 reading (µg/m3) to a rough AQI-style category label.

    Each band is inclusive of its upper bound. Anything that matches no band
    (values above 250.4, and NaN, for which every comparison is false) is
    reported as hazardous.
    """
    bands = (
        (12, "Good 😊"),
        (35.4, "Moderate 🙂"),
        (55.4, "Unhealthy for Sensitive Groups 😐"),
        (150.4, "Unhealthy 😷"),
        (250.4, "Very Unhealthy 🤢"),
    )
    return next(
        (label for upper_bound, label in bands if pm25 <= upper_bound),
        "Hazardous ☠️",
    )
def sensor_one_hot(sensor_internal: str):
    """Build the one-hot sensor flags for the model's ``sensor_*`` columns.

    Returns a dict keyed by every ``sensor_*`` column name. The column named
    ``sensor_<sensor_internal>`` is set to 1 and all others to 0; an
    unrecognised sensor name yields all zeros.
    """
    columns = (
        "sensor_broughty_ferry_road",
        "sensor_lochee_road",
        "sensor_mains_loan",
        "sensor_meadowside",
        "sensor_seagate",
        "sensor_whitehall_street",
    )
    active = f"sensor_{sensor_internal}"
    return {column: int(column == active) for column in columns}
| # ------------------------- | |
| # FORECAST LOGIC (lag-aware model) | |
| # ------------------------- | |
def generate_forecast(sensor_internal: str, days: int) -> str | None:
    """Generate a PM2.5 forecast plot for one sensor and return its PNG path.

    Predictions are made one day at a time for ``today + 1`` through
    ``today + days`` (UTC date). Each prediction is appended to the PM2.5
    history so it feeds the lag features of the following day. There are no
    future PM10/NO2 values, so the last observed reading is held constant.
    Days with no matching weather row are skipped.

    Args:
        sensor_internal: Internal sensor name; spellings listed in
            ``SENSOR_CANONICAL`` are resolved to their canonical form.
        days: Number of future days to forecast.

    Returns:
        Path to the saved PNG, or ``None`` when the sensor has fewer than
        three air-quality rows or no weather row matches a requested date.
    """
    # UI sliders may hand over a float; range() below needs an int.
    days = int(days)

    # Resolve aliases once so filtering, one-hot flags and the plot label all
    # refer to the same sensor.
    canonical = SENSOR_CANONICAL.get(sensor_internal, sensor_internal)
    print("DEBUG: Sensor internal =", sensor_internal)
    print("DEBUG: Canonical =", canonical)

    resources = get_resources()
    model = resources["model"]
    weather_fg = resources["weather_fg"]
    project = resources["project"]

    today = datetime.now(timezone.utc).date()

    # Future weather (city-level, same for all sensors).
    df_future = weather_fg.read().sort_values("date")
    df_future["date"] = pd.to_datetime(df_future["date"], unit="ms").dt.date
    print("DEBUG: WEATHER FUTURE DATES:", df_future["date"].tail(15).tolist())
    print("DEBUG: Today:", today)

    # Air-quality history for this sensor, oldest first.
    aq_fg = project.get_feature_store().get_feature_group("dundee_air_quality", version=1)
    aq_df = aq_fg.read()
    aq_df["date"] = pd.to_datetime(aq_df["date"], unit="ms").dt.date
    aq_df = aq_df[aq_df["sensor"] == canonical].sort_values("date")
    print("DEBUG: AQ DF HEAD\n", aq_df.head())
    print("DEBUG: AQ DF UNIQUE SENSORS:", aq_df["sensor"].unique())

    if len(aq_df) < 3:
        # Three lag values are required; bail gracefully without them.
        return None

    pm25_history = aq_df["pm25"].tail(3).tolist()
    # No future pm10/no2 exists, so the last known value is reused every day.
    last_pm10 = aq_df["pm10"].iloc[-1]
    last_no2 = aq_df["no2"].iloc[-1]

    # Loop-invariant: the sensor does not change between forecast days.
    sensor_flags = sensor_one_hot(canonical)

    preds = []
    for offset in range(1, days + 1):
        target_date = today + timedelta(days=offset)
        row = df_future[df_future["date"] == target_date]
        if row.empty:
            continue
        weather = row.iloc[0]

        # Feature row laid out to match the columns in ``feature_order``.
        features = {
            "date": target_date.toordinal(),  # date is fed to the model as an integer
            "pm10": last_pm10,
            "no2": last_no2,
            "temperature_2m_mean": weather["temperature_2m_mean"],
            "precipitation_sum": weather["precipitation_sum"],
            "wind_speed_10m_max": weather["wind_speed_10m_max"],
            "wind_direction_10m_dominant": weather["wind_direction_10m_dominant"],
            "pm25_lag1": pm25_history[-1],
            "pm25_lag2": pm25_history[-2],
            "pm25_lag3": pm25_history[-3],
            **sensor_flags,
        }
        X_df = pd.DataFrame([features])[feature_order]

        pred = float(model.predict(X_df)[0])
        preds.append({"date": target_date, "predicted_pm25": pred})

        # The prediction becomes lag1 for the next iteration.
        pm25_history.append(pred)

    if not preds:
        return None

    df_preds = pd.DataFrame(preds)

    # Reserve a persistent path but close our handle right away so the
    # descriptor is not leaked and the plotter can reopen the file.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        tmp_path = tmp.name

    plot_air_quality_forecast(
        CITY_NAME,
        SENSOR_LABELS.get(canonical, canonical),
        df_preds,
        tmp_path,
        hindcast=False,
    )
    return tmp_path
| # ------------------------- | |
| # HINDCAST LOGIC (lag-aware model) | |
| # ------------------------- | |
def generate_hindcast(sensor_internal: str, days: int) -> str | None:
    """Generate a PM2.5 hindcast plot for one sensor and return its PNG path.

    Weather rows from the last ``days + 3`` days (UTC dates, up to today) are
    joined with the sensor's observed readings, lag features are derived from
    the observed PM2.5 values, and the model is run on the most recent
    ``days`` complete rows.

    Args:
        sensor_internal: Internal sensor name; spellings listed in
            ``SENSOR_CANONICAL`` are resolved to their canonical form.
        days: Number of past days to hindcast.

    Returns:
        Path to the saved PNG, or ``None`` when there is no air-quality data
        for the sensor, no overlap with the weather window, or no row left
        after dropping incomplete rows.
    """
    # UI sliders may hand over a float; DataFrame.tail() needs an int.
    days = int(days)

    # Resolve aliases once so filtering, one-hot flags and the plot label all
    # refer to the same sensor.
    canonical = SENSOR_CANONICAL.get(sensor_internal, sensor_internal)
    print("DEBUG: Sensor internal =", sensor_internal)
    print("DEBUG: Canonical =", canonical)

    resources = get_resources()
    model = resources["model"]
    weather_fg = resources["weather_fg"]
    project = resources["project"]

    today = datetime.now(timezone.utc).date()
    start_date = today - timedelta(days=days + 3)  # extra rows for the 3 lags
    end_date = today

    # Weather history restricted to the hindcast window.
    weather_df = weather_fg.read()
    weather_df["date"] = pd.to_datetime(weather_df["date"], unit="ms").dt.date
    weather_df = weather_df[
        (weather_df["date"] >= start_date) & (weather_df["date"] <= end_date)
    ].sort_values("date")

    # Observed air quality for this sensor.
    aq_fg = project.get_feature_store().get_feature_group("dundee_air_quality", version=1)
    aq_df = aq_fg.read()
    aq_df["date"] = pd.to_datetime(aq_df["date"], unit="ms").dt.date
    aq_df = aq_df[aq_df["sensor"] == canonical].sort_values("date")
    if aq_df.empty:
        return None

    # Merge weather with the actual pm25 / pm10 / no2 readings.
    df = pd.merge(weather_df, aq_df[["date", "pm25", "pm10", "no2"]], on="date")
    if df.empty:
        return None

    # NOTE: shift() lags by row, so these equal "previous day" values only
    # when the merged rows are consecutive days.
    for lag in (1, 2, 3):
        df[f"pm25_lag{lag}"] = df["pm25"].shift(lag)

    # .copy() so the column assignments below write to an independent frame
    # instead of a slice (avoids SettingWithCopyWarning).
    df = df.dropna().tail(days).copy()
    if df.empty:
        return None

    # Same one-hot sensor flags on every row.
    for column, flag in sensor_one_hot(canonical).items():
        df[column] = flag

    # The model is fed the date as an ordinal integer; the real dates stay in
    # df["date"] for plotting.
    df["date_ordinal"] = df["date"].map(lambda d: d.toordinal())
    X_df = df.assign(date=df["date_ordinal"])[feature_order]

    df["predicted_pm25"] = model.predict(X_df)

    # Reserve a persistent path but close our handle right away so the
    # descriptor is not leaked and the plotter can reopen the file.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        tmp_path = tmp.name

    plot_air_quality_forecast(
        CITY_NAME,
        SENSOR_LABELS.get(canonical, canonical),
        df,
        tmp_path,
        hindcast=True,
    )
    return tmp_path
| # ------------------------- | |
| # GRADIO UI (MAX VIBES, SINGLE MODEL) | |
| # ------------------------- | |
def run_dashboard(sensor_display: str, forecast_days: int, hindcast_days: int):
    """Gradio callback: build the forecast and hindcast plots for one sensor.

    Args:
        sensor_display: Sensor label as shown in the dropdown.
        forecast_days: Number of future days to forecast.
        hindcast_days: Number of past days to hindcast.

    Returns:
        Tuple ``(forecast_path, hindcast_path, summary_markdown)``. Either
        path may be ``None`` when that plot could not be produced; both are
        ``None`` on an unknown sensor, on an error, or when no data exists.
    """
    sensor_internal = DISPLAY_TO_INTERNAL.get(sensor_display)
    if sensor_internal is None:
        return None, None, f"Unknown sensor: {sensor_display}"

    try:
        forecast_path = generate_forecast(sensor_internal, forecast_days)
        hindcast_path = generate_hindcast(sensor_internal, hindcast_days)
    except Exception as e:
        # UI boundary: don't explode the page; show the error text and
        # empty images instead.
        return None, None, f"⚠️ Something went wrong: {e}"

    if forecast_path is None and hindcast_path is None:
        return None, None, "No data available for this sensor/time range yet."

    if forecast_path is not None:
        headline = (
            f"✅ Forecast generated for **{sensor_display}** "
            "using the lag-aware Dundee PM2.5 model."
        )
    else:
        # Previously this case returned an empty summary, leaving the user
        # with a hindcast image and no explanation for the missing forecast.
        headline = (
            f"ℹ️ No forecast data available for **{sensor_display}** yet; "
            "showing the hindcast only."
        )

    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
    # Two trailing spaces before "\n" make a Markdown hard line break.
    summary_text = (
        f"{headline}\n\n"
        f"City: **{CITY_NAME}**  \n"
        f"Last updated: **{timestamp}**"
    )
    return forecast_path, hindcast_path, summary_text
# Both horizon sliders share the same range, default and step.
_horizon_slider_config = dict(minimum=3, maximum=10, value=7, step=1)

with gr.Blocks(theme="soft") as demo:
    # Page header.
    gr.Markdown(
        """
# 🌤️ Dundee Air Quality Dashboard
Lag-aware PM2.5 forecasts and hindcasts for Dundee’s air quality sensors.
Select a **sensor**, set your horizons, and hit **Update**.
"""
    )

    # Sensor selection.
    with gr.Row():
        sensor_dropdown = gr.Dropdown(
            label="Sensor",
            info="Choose which sensor in Dundee to analyze.",
            choices=list(DISPLAY_TO_INTERNAL),
            value="Meadowside",
        )

    # Forecast / hindcast horizons.
    with gr.Row():
        forecast_days_slider = gr.Slider(
            label="Forecast days (future)",
            **_horizon_slider_config,
        )
        hindcast_days_slider = gr.Slider(
            label="Hindcast days (past)",
            **_horizon_slider_config,
        )

    update_btn = gr.Button("🚀 Update dashboard", variant="primary")

    # Output plots side by side, with the summary text underneath.
    with gr.Row():
        forecast_img = gr.Image(label="Forecast (PM2.5)", show_label=True)
        hindcast_img = gr.Image(label="Hindcast (PM2.5)", show_label=True)
    summary_box = gr.Markdown()

    # Wire the button to the dashboard callback.
    dashboard_inputs = [sensor_dropdown, forecast_days_slider, hindcast_days_slider]
    dashboard_outputs = [forecast_img, hindcast_img, summary_box]
    update_btn.click(
        fn=run_dashboard,
        inputs=dashboard_inputs,
        outputs=dashboard_outputs,
    )

if __name__ == "__main__":
    demo.launch()