Spaces:
Runtime error
Runtime error
| """ | |
| 水質予測アプリケーション(Supabase × GBDT) | |
| ========================================== | |
| このモジュールは、Supabaseデータベースから水質データを取得し、 | |
| Gradient Boosting Decision Tree (GBDT) を使用して水質指標の予測を行う | |
| Gradioアプリケーションです。 | |
| 主な機能: | |
| - Supabaseのテーブル "Water_Quality_Forecast_Data" からデータを取得 | |
| - JSTの「今日」を基準にN日後のCODcr(S)sinを予測 | |
| - 時系列交差検証による最適なハイパーパラメータの選択 | |
| - 線形外挿による将来特徴量の推定 | |
| - 予測結果の可視化とレポート生成 | |
| 推奨インストール: | |
| pip install gradio pandas numpy scikit-learn matplotlib pillow python-dotenv supabase | |
| 環境変数: | |
| SUPABASE_URL: SupabaseプロジェクトのURL | |
| SUPABASE_KEY: SupabaseプロジェクトのAPIキー | |
| Author: Water Quality Prediction Team | |
| Version: 1.0.0 | |
| """ | |
| # app_gbdt_supabase.py | |
| # ========================================================= | |
| # Gradioアプリ:Supabaseのテーブル "Water_Quality_Forecast_Data" を読み込み | |
| # JSTの「今日」を基準に N日後の CODcr(S)sin を GBDT で予測します。 | |
| # | |
| # 推奨インストール: | |
| # pip install gradio pandas numpy scikit-learn matplotlib pillow python-dotenv supabase | |
| # ========================================================= | |
| from __future__ import annotations | |
| import io | |
| import os | |
| import json | |
| import traceback | |
| from typing import Any, Dict, List, Tuple, Optional | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.model_selection import TimeSeriesSplit, KFold, cross_val_score | |
| from sklearn.linear_model import LinearRegression | |
| from sklearn.ensemble import HistGradientBoostingRegressor | |
| from PIL import Image | |
| import gradio as gr | |
| from datetime import datetime | |
| try: | |
| from zoneinfo import ZoneInfo | |
| JST = ZoneInfo("Asia/Tokyo") | |
| except Exception: | |
| JST = None | |
| # ---------- Supabase ---------- | |
| import supabase | |
| SUPABASE_URL: Optional[str] = os.environ.get("SUPABASE_URL") | |
| SUPABASE_KEY: Optional[str] = os.environ.get("SUPABASE_KEY") | |
| TABLE_NAME: str = "Water_Quality_Forecast_Data" | |
| supabase_client = supabase.create_client(SUPABASE_URL, SUPABASE_KEY) | |
| # ---------- 共通ユーティリティ ---------- | |
| def _now_jst_date_str() -> str: | |
| """ | |
| 現在のJST日付を文字列として取得する。 | |
| Returns: | |
| str: 現在のJST日付を "YYYY-MM-DD" 形式で返す。 | |
| タイムゾーン情報が利用できない場合は、 | |
| システムのローカル時刻を使用する。 | |
| """ | |
| if JST: | |
| return datetime.now(JST).strftime("%Y-%m-%d") | |
| return datetime.now().strftime("%Y-%m-%d") | |
| def _ensure_datetime(df: pd.DataFrame, time_col: str, time_format: Optional[str]) -> pd.DataFrame: | |
| """ | |
| 指定された列を日時型に変換し、時系列データとして整備する。 | |
| Args: | |
| df (pd.DataFrame): 処理対象のデータフレーム | |
| time_col (str): 日時列として使用する列名 | |
| time_format (Optional[str]): 日時フォーマット文字列。 | |
| Noneまたは空文字列の場合は自動判定 | |
| Returns: | |
| pd.DataFrame: 日時列が変換され、時系列順にソートされたデータフレーム | |
| Raises: | |
| KeyError: 指定された時間列が存在しない場合 | |
| ValueError: 時間列を日時に変換できない場合 | |
| """ | |
| if time_col not in df.columns: | |
| raise KeyError(f"時間列 '{time_col}' が見つかりません。列: {list(df.columns)}") | |
| s = df[time_col].copy() | |
| if time_format and len(time_format.strip()) > 0: | |
| try: | |
| df[time_col] = pd.to_datetime(s, format=time_format) | |
| except Exception: | |
| df[time_col] = pd.to_datetime(s, errors="coerce") | |
| else: | |
| df[time_col] = pd.to_datetime(s, errors="coerce") | |
| if df[time_col].isna().all(): | |
| raise ValueError(f"時間列 '{time_col}' を日時に変換できませんでした。") | |
| return df.sort_values(time_col).reset_index(drop=True) | |
| def _linear_extrapolate_feature(dfw, time_col, feat_col, t0, forecast_time) -> Optional[float]: | |
| """ | |
| 線形回帰を使用して特徴量の将来値を外挿する。 | |
| Args: | |
| dfw (pd.DataFrame): 学習用のデータフレーム | |
| time_col (str): 時間列の名前 | |
| feat_col (str): 外挿対象の特徴量列名 | |
| t0 (pd.Timestamp): 基準時刻(通常はデータの開始時刻) | |
| forecast_time (pd.Timestamp): 予測対象の時刻 | |
| Returns: | |
| Optional[float]: 外挿された特徴量の値。 | |
| データが不足している場合(3点未満)はNoneを返す。 | |
| """ | |
| series = dfw[[time_col, feat_col]].dropna() | |
| if len(series) < 3: | |
| return None | |
| x = ((series[time_col] - t0).dt.total_seconds() / 86400).values.reshape(-1, 1) | |
| y = series[feat_col].astype(float).values | |
| model = LinearRegression() | |
| model.fit(x, y) | |
| x_future = (forecast_time - t0).total_seconds() / 86400 | |
| return float(model.predict([[x_future]])[0]) | |
| def _select_features(df: pd.DataFrame, time_col: str, target_col: str) -> List[str]: | |
| """ | |
| 機械学習の説明変数として使用する特徴量列を選択する。 | |
| Args: | |
| df (pd.DataFrame): 対象のデータフレーム | |
| time_col (str): 時間列の名前(除外対象) | |
| target_col (str): ターゲット列の名前(除外対象) | |
| Returns: | |
| List[str]: 選択された特徴量列名のリスト | |
| Raises: | |
| ValueError: 有効な特徴量が見つからない場合 | |
| """ | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() | |
| features = [c for c in numeric_cols if c not in [target_col] and c != time_col] | |
| if len(features) == 0: | |
| raise ValueError("説明変数が見つかりません(時間列/ターゲット以外の数値列が必要)。") | |
| return features | |
| # ---------- GBDT: CV ---------- | |
| def _cv_select_gbdt(X: np.ndarray, y: np.ndarray) -> Tuple[Dict[str, Any], float]: | |
| """ | |
| 交差検証を使用してHistGradientBoostingRegressorの最適なハイパーパラメータを選択する。 | |
| Args: | |
| X (np.ndarray): 学習用特徴量データ | |
| y (np.ndarray): 学習用ターゲットデータ | |
| Returns: | |
| Tuple[Dict[str, Any], float]: 最適なパラメータ辞書とCVスコアのタプル。 | |
| データが不足している場合はデフォルトパラメータとNaNを返す。 | |
| Note: | |
| - サンプル数が8未満の場合はデフォルトパラメータを使用 | |
| - 時系列データの場合はTimeSeriesSplit、それ以外はKFoldを使用 | |
| - グリッドサーチでR2スコアを最大化するパラメータを探索 | |
| """ | |
| if X.shape[0] < 8: | |
| return ( | |
| dict(learning_rate=0.05, max_depth=3, max_leaf_nodes=31, | |
| min_samples_leaf=5, l2_regularization=0.0, early_stopping=False), | |
| np.nan | |
| ) | |
| n_splits = 3 if X.shape[0] >= 12 else 2 | |
| try: | |
| cv = TimeSeriesSplit(n_splits=n_splits) | |
| except Exception: | |
| cv = KFold(n_splits=n_splits, shuffle=False) | |
| grid = [ | |
| dict(learning_rate=lr, max_depth=md, max_leaf_nodes=mln, | |
| min_samples_leaf=msl, l2_regularization=l2, early_stopping=False) | |
| for lr in [0.03, 0.05, 0.1] | |
| for md in [3, 5, None] | |
| for mln in [15, 31, 63] | |
| for msl in [5, 10] | |
| for l2 in [0.0, 0.1] | |
| ] | |
| best_score, best_params = -np.inf, None | |
| for params in grid: | |
| model = HistGradientBoostingRegressor(**params, random_state=42) | |
| pipe = Pipeline([("imputer", SimpleImputer(strategy="median")), ("gbdt", model)]) | |
| try: | |
| scores = cross_val_score(pipe, X, y, cv=cv, scoring="r2") | |
| score = float(np.nanmean(scores)) | |
| except Exception: | |
| score = np.nan | |
| if not np.isnan(score) and score > best_score: | |
| best_score, best_params = score, params | |
| if best_params is None: | |
| best_params = dict(learning_rate=0.05, max_depth=3, max_leaf_nodes=31, | |
| min_samples_leaf=5, l2_regularization=0.0, early_stopping=False) | |
| return best_params, best_score | |
| # ---------- GBDT 予測本体 ---------- | |
| def gbdt_forecast_codcr(df, time_col, target_col, datetime_str, window_days, forecast_days): | |
| """ | |
| GBDTを使用して水質指標の将来値を予測する。 | |
| Args: | |
| df (pd.DataFrame): 学習用のデータフレーム | |
| time_col (str): 時間列の名前 | |
| target_col (str): 予測対象のターゲット列名 | |
| datetime_str (str): 基準日時(学習期間の終了日時) | |
| window_days (int): 学習に使用する過去の日数 | |
| forecast_days (int): 予測先の日数 | |
| Returns: | |
| Tuple[pd.DataFrame, str, str, io.BytesIO, str]: | |
| - 予測結果のデータフレーム | |
| - サマリーメッセージ | |
| - 特徴量情報のJSON文字列 | |
| - 可視化画像のBytesIOオブジェクト | |
| - グラフ座標データのJSON文字列 | |
| Note: | |
| - 交差検証で最適なハイパーパラメータを自動選択 | |
| - 将来特徴量は線形外挿で推定(失敗時は直近値または中央値で代用) | |
| - 予測結果とグラフを生成 | |
| """ | |
| if time_col not in df.columns: | |
| return pd.DataFrame(), f"⚠ '{time_col}' が見つかりません", "[]", None, "[]" | |
| if target_col not in df.columns: | |
| return pd.DataFrame(), f"⚠ ターゲット列 '{target_col}' が見つかりません", "[]", None, "[]" | |
| target_time = pd.to_datetime(datetime_str) | |
| start_time = target_time - pd.Timedelta(days=window_days) | |
| forecast_time = target_time + pd.Timedelta(days=forecast_days) | |
| dfw = df[(df[time_col] >= start_time) & (df[time_col] <= target_time)].copy() | |
| if dfw.empty: | |
| return pd.DataFrame(), f"⚠ データなし({start_time:%Y-%m-%d}~{target_time:%Y-%m-%d})", "[]", None, "[]" | |
| features = _select_features(dfw, time_col, target_col) | |
| X, y = dfw[features].astype(float), dfw[target_col].astype(float).values | |
| best_params, cv_r2 = _cv_select_gbdt(X.values, y) | |
| model = HistGradientBoostingRegressor(**best_params, random_state=42) | |
| pipe = Pipeline([("imputer", SimpleImputer(strategy="median")), ("gbdt", model)]) | |
| pipe.fit(X.values, y) | |
| t0 = dfw[time_col].min() | |
| x_future_vals, feat_info = [], [] | |
| for col in features: | |
| val = _linear_extrapolate_feature(dfw, time_col, col, t0, forecast_time) | |
| if val is None: | |
| last_val = dfw[col].dropna() | |
| if len(last_val) > 0: | |
| val, note = float(last_val.iloc[-1]), "直近値で代用" | |
| else: | |
| med = float(df[col].dropna().median()) if df[col].notna().any() else 0.0 | |
| val, note = med, "中央値で代用" | |
| else: | |
| note = "" | |
| x_future_vals.append(val) | |
| feat_info.append({"Feature": col, "Extrapolated": val, "Note": note}) | |
| y_future = float(pipe.predict(np.array(x_future_vals).reshape(1, -1))[0]) | |
| y_hat_in = pipe.predict(X.values).reshape(-1) | |
| result_df = pd.DataFrame([{ | |
| "ItemName": target_col, | |
| "予測日": forecast_time.strftime("%Y-%m-%d"), | |
| "学習サンプル数": int(len(dfw)), | |
| "特徴量数": int(len(features)), | |
| "GBDT_params": best_params, | |
| "CV_R2(参考)": (None if np.isnan(cv_r2) else round(float(cv_r2), 4)), | |
| "現在値": (None if dfw[target_col].dropna().empty else round(float(dfw[target_col].dropna().iloc[-1]), 6)), | |
| "予測値": round(y_future, 6), | |
| "メモ": "" | |
| }]) | |
| extras_json = json.dumps(feat_info, ensure_ascii=False) | |
| fig, ax = plt.subplots(figsize=(7.2, 4.5)) | |
| ax.plot(dfw[time_col], dfw[target_col], marker="o", linestyle="-", label="Real data") | |
| ax.plot(dfw[time_col], y_hat_in, linestyle="--", label="GBDT Compliance") | |
| ax.scatter([forecast_time], [y_future], marker="*", s=160, label=f"{forecast_days}day Forecast") | |
| ax.set_title(f"{target_col} GBDT Prediction (Record date:{target_time:%Y-%m-%d})") | |
| # 閾値ラインを追加(y=100) | |
| threshold = 200 | |
| ax.axhline(y=threshold, color="red", linestyle=":", linewidth=1.5, alpha=0.8, label=f"Threshold={threshold}") | |
| ax.grid(True, alpha=0.3) | |
| ax.legend() | |
| buf = io.BytesIO() | |
| plt.tight_layout() | |
| fig.savefig(buf, format="png", dpi=150) | |
| plt.close(fig) | |
| buf.seek(0) | |
| # グラフの座標データを準備 | |
| coordinate_data = { | |
| "real_data": [ | |
| {"x": time.strftime("%Y-%m-%d %H:%M:%S"), "y": float(value)} | |
| for time, value in zip(dfw[time_col], dfw[target_col]) | |
| ], | |
| "gbdt_fitted": [ | |
| {"x": time.strftime("%Y-%m-%d %H:%M:%S"), "y": float(value)} | |
| for time, value in zip(dfw[time_col], y_hat_in) | |
| ], | |
| "forecast_point": { | |
| "x": forecast_time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "y": float(y_future) | |
| }, | |
| "time_range": { | |
| "start": start_time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "end": target_time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "forecast": forecast_time.strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| } | |
| coordinate_json = json.dumps(coordinate_data, ensure_ascii=False) | |
| summary = ( | |
| f"✅ GBDT予測完了: {forecast_days}日後 = **{round(y_future, 6)}**\n\n" | |
| f"- 学習期間: {start_time:%Y-%m-%d} ~ {target_time:%Y-%m-%d}\n" | |
| f"- 特徴量数: {len(features)} / CV_R2(参考): {None if np.isnan(cv_r2) else round(float(cv_r2), 4)}\n" | |
| f"- 使用モデル: HistGradientBoostingRegressor\n" | |
| f"- ベストパラメータ: {best_params}" | |
| ) | |
| return result_df, summary, extras_json, buf, coordinate_json | |
| def _format_exception_for_ui(e: Exception) -> str: | |
| """ | |
| 例外をGradio UI用のフォーマットされた文字列に変換する。 | |
| Args: | |
| e (Exception): フォーマット対象の例外オブジェクト | |
| Returns: | |
| str: UI表示用のフォーマットされたエラーメッセージ | |
| """ | |
| return f"❌ エラー: {type(e).__name__}\n\n{e}\n\n```\n{traceback.format_exc(limit=5)}\n```" | |
| # ---------- Gradio コールバック ---------- | |
| def predict_from_supabase_gbdt(time_col, time_format, target_col, window_days, forecast_days): | |
| """ | |
| Supabaseからデータを取得してGBDT予測を実行するGradioコールバック関数。 | |
| Args: | |
| time_col (str): 時間列の名前 | |
| time_format (str): 時間フォーマット文字列(空文字列の場合は自動判定) | |
| target_col (str): 予測対象のターゲット列名 | |
| window_days (int): 学習に使用する過去の日数 | |
| forecast_days (int): 予測先の日数 | |
| Returns: | |
| Tuple[pd.DataFrame, str, PIL.Image, str, str]: | |
| - 予測結果のデータフレーム | |
| - サマリーメッセージまたはエラーメッセージ | |
| - 可視化画像(エラー時はNone) | |
| - 特徴量情報のJSON文字列 | |
| - グラフ座標データのJSON文字列 | |
| Note: | |
| - Supabaseのテーブル "Water_Quality_Forecast_Data" から全データを取得 | |
| - 現在のJST日付を基準日として使用 | |
| - エラーが発生した場合は適切なエラーメッセージを返す | |
| """ | |
| try: | |
| resp = supabase_client.table(TABLE_NAME).select("*").execute() | |
| data = resp.data if hasattr(resp, "data") else resp.get("data") | |
| df = pd.DataFrame(data) | |
| fmt = (time_format or "").strip() or None | |
| df = _ensure_datetime(df, time_col, fmt) | |
| today = _now_jst_date_str() | |
| result_df, summary, extras_json, plot_buf, coordinate_json = gbdt_forecast_codcr( | |
| df, time_col, target_col, today, window_days, forecast_days | |
| ) | |
| img = Image.open(plot_buf) if plot_buf else None | |
| return result_df, summary, img, extras_json, coordinate_json | |
| except Exception as e: | |
| return pd.DataFrame(), _format_exception_for_ui(e), None, "[]", "[]" | |
| # ---------- Gradio UI ---------- | |
| with gr.Blocks(title="N日後予測(Supabase / CODcr(S)sin / GBDT)") as demo: | |
| gr.Markdown( | |
| """ | |
| # 水質予測(Supabase × GBDT) | |
| - Supabase のテーブル **`Water_Quality_Forecast_Data`** からデータを取得し、 | |
| **JSTの今日**を基準に **N日後**の **CODcr(S)sin** を **GBDT** で予測します | |
| """ | |
| ) | |
| with gr.Row(): | |
| time_col_in = gr.Textbox(value="Time", label="時間列名") | |
| time_fmt_in = gr.Textbox(value="", label="時間フォーマット(空欄=自動判定)") | |
| with gr.Row(): | |
| target_col_in = gr.Textbox(value="CODcr(S)sin", label="ターゲット列名") | |
| window_days_in = gr.Slider(7, 180, value=60, step=1, label="学習日数") | |
| forecast_days_in = gr.Slider(1, 30, value=1, step=1, label="予測先日数") | |
| run_btn = gr.Button("Supabaseから予測", variant="primary") | |
| df_out = gr.DataFrame(label="予測結果") | |
| msg_out = gr.Markdown(label="サマリー / エラー表示") | |
| plot_out = gr.Image(label="可視化") | |
| extras_out = gr.JSON(label="将来特徴量") | |
| coordinates_out = gr.JSON(label="グラフ座標データ") | |
| run_btn.click( | |
| fn=predict_from_supabase_gbdt, | |
| inputs=[time_col_in, time_fmt_in, target_col_in, window_days_in, forecast_days_in], | |
| outputs=[df_out, msg_out, plot_out, extras_out, coordinates_out] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(mcp_server=True) |