MTeguri's picture
Update threshold value in GBDT forecast plot in app.py from 100 to 200 to reflect new compliance standards, enhancing the interpretability of forecast results.
d8d5467
"""
水質予測アプリケーション(Supabase × GBDT)
==========================================
このモジュールは、Supabaseデータベースから水質データを取得し、
Gradient Boosting Decision Tree (GBDT) を使用して水質指標の予測を行う
Gradioアプリケーションです。
主な機能:
- Supabaseのテーブル "Water_Quality_Forecast_Data" からデータを取得
- JSTの「今日」を基準にN日後のCODcr(S)sinを予測
- 時系列交差検証による最適なハイパーパラメータの選択
- 線形外挿による将来特徴量の推定
- 予測結果の可視化とレポート生成
推奨インストール:
pip install gradio pandas numpy scikit-learn matplotlib pillow python-dotenv supabase
環境変数:
SUPABASE_URL: SupabaseプロジェクトのURL
SUPABASE_KEY: SupabaseプロジェクトのAPIキー
Author: Water Quality Prediction Team
Version: 1.0.0
"""
# app_gbdt_supabase.py
# =========================================================
# Gradioアプリ:Supabaseのテーブル "Water_Quality_Forecast_Data" を読み込み
# JSTの「今日」を基準に N日後の CODcr(S)sin を GBDT で予測します。
#
# 推奨インストール:
# pip install gradio pandas numpy scikit-learn matplotlib pillow python-dotenv supabase
# =========================================================
from __future__ import annotations
import io
import os
import json
import traceback
from typing import Any, Dict, List, Tuple, Optional
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import TimeSeriesSplit, KFold, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import HistGradientBoostingRegressor
from PIL import Image
import gradio as gr
from datetime import datetime
try:
from zoneinfo import ZoneInfo
JST = ZoneInfo("Asia/Tokyo")
except Exception:
JST = None
# ---------- Supabase ----------
import supabase
SUPABASE_URL: Optional[str] = os.environ.get("SUPABASE_URL")
SUPABASE_KEY: Optional[str] = os.environ.get("SUPABASE_KEY")
TABLE_NAME: str = "Water_Quality_Forecast_Data"
supabase_client = supabase.create_client(SUPABASE_URL, SUPABASE_KEY)
# ---------- 共通ユーティリティ ----------
def _now_jst_date_str() -> str:
"""
現在のJST日付を文字列として取得する。
Returns:
str: 現在のJST日付を "YYYY-MM-DD" 形式で返す。
タイムゾーン情報が利用できない場合は、
システムのローカル時刻を使用する。
"""
if JST:
return datetime.now(JST).strftime("%Y-%m-%d")
return datetime.now().strftime("%Y-%m-%d")
def _ensure_datetime(df: pd.DataFrame, time_col: str, time_format: Optional[str]) -> pd.DataFrame:
"""
指定された列を日時型に変換し、時系列データとして整備する。
Args:
df (pd.DataFrame): 処理対象のデータフレーム
time_col (str): 日時列として使用する列名
time_format (Optional[str]): 日時フォーマット文字列。
Noneまたは空文字列の場合は自動判定
Returns:
pd.DataFrame: 日時列が変換され、時系列順にソートされたデータフレーム
Raises:
KeyError: 指定された時間列が存在しない場合
ValueError: 時間列を日時に変換できない場合
"""
if time_col not in df.columns:
raise KeyError(f"時間列 '{time_col}' が見つかりません。列: {list(df.columns)}")
s = df[time_col].copy()
if time_format and len(time_format.strip()) > 0:
try:
df[time_col] = pd.to_datetime(s, format=time_format)
except Exception:
df[time_col] = pd.to_datetime(s, errors="coerce")
else:
df[time_col] = pd.to_datetime(s, errors="coerce")
if df[time_col].isna().all():
raise ValueError(f"時間列 '{time_col}' を日時に変換できませんでした。")
return df.sort_values(time_col).reset_index(drop=True)
def _linear_extrapolate_feature(dfw, time_col, feat_col, t0, forecast_time) -> Optional[float]:
"""
線形回帰を使用して特徴量の将来値を外挿する。
Args:
dfw (pd.DataFrame): 学習用のデータフレーム
time_col (str): 時間列の名前
feat_col (str): 外挿対象の特徴量列名
t0 (pd.Timestamp): 基準時刻(通常はデータの開始時刻)
forecast_time (pd.Timestamp): 予測対象の時刻
Returns:
Optional[float]: 外挿された特徴量の値。
データが不足している場合(3点未満)はNoneを返す。
"""
series = dfw[[time_col, feat_col]].dropna()
if len(series) < 3:
return None
x = ((series[time_col] - t0).dt.total_seconds() / 86400).values.reshape(-1, 1)
y = series[feat_col].astype(float).values
model = LinearRegression()
model.fit(x, y)
x_future = (forecast_time - t0).total_seconds() / 86400
return float(model.predict([[x_future]])[0])
def _select_features(df: pd.DataFrame, time_col: str, target_col: str) -> List[str]:
"""
機械学習の説明変数として使用する特徴量列を選択する。
Args:
df (pd.DataFrame): 対象のデータフレーム
time_col (str): 時間列の名前(除外対象)
target_col (str): ターゲット列の名前(除外対象)
Returns:
List[str]: 選択された特徴量列名のリスト
Raises:
ValueError: 有効な特徴量が見つからない場合
"""
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
features = [c for c in numeric_cols if c not in [target_col] and c != time_col]
if len(features) == 0:
raise ValueError("説明変数が見つかりません(時間列/ターゲット以外の数値列が必要)。")
return features
# ---------- GBDT: CV ----------
def _cv_select_gbdt(X: np.ndarray, y: np.ndarray) -> Tuple[Dict[str, Any], float]:
"""
交差検証を使用してHistGradientBoostingRegressorの最適なハイパーパラメータを選択する。
Args:
X (np.ndarray): 学習用特徴量データ
y (np.ndarray): 学習用ターゲットデータ
Returns:
Tuple[Dict[str, Any], float]: 最適なパラメータ辞書とCVスコアのタプル。
データが不足している場合はデフォルトパラメータとNaNを返す。
Note:
- サンプル数が8未満の場合はデフォルトパラメータを使用
- 時系列データの場合はTimeSeriesSplit、それ以外はKFoldを使用
- グリッドサーチでR2スコアを最大化するパラメータを探索
"""
if X.shape[0] < 8:
return (
dict(learning_rate=0.05, max_depth=3, max_leaf_nodes=31,
min_samples_leaf=5, l2_regularization=0.0, early_stopping=False),
np.nan
)
n_splits = 3 if X.shape[0] >= 12 else 2
try:
cv = TimeSeriesSplit(n_splits=n_splits)
except Exception:
cv = KFold(n_splits=n_splits, shuffle=False)
grid = [
dict(learning_rate=lr, max_depth=md, max_leaf_nodes=mln,
min_samples_leaf=msl, l2_regularization=l2, early_stopping=False)
for lr in [0.03, 0.05, 0.1]
for md in [3, 5, None]
for mln in [15, 31, 63]
for msl in [5, 10]
for l2 in [0.0, 0.1]
]
best_score, best_params = -np.inf, None
for params in grid:
model = HistGradientBoostingRegressor(**params, random_state=42)
pipe = Pipeline([("imputer", SimpleImputer(strategy="median")), ("gbdt", model)])
try:
scores = cross_val_score(pipe, X, y, cv=cv, scoring="r2")
score = float(np.nanmean(scores))
except Exception:
score = np.nan
if not np.isnan(score) and score > best_score:
best_score, best_params = score, params
if best_params is None:
best_params = dict(learning_rate=0.05, max_depth=3, max_leaf_nodes=31,
min_samples_leaf=5, l2_regularization=0.0, early_stopping=False)
return best_params, best_score
# ---------- GBDT 予測本体 ----------
def gbdt_forecast_codcr(df, time_col, target_col, datetime_str, window_days, forecast_days):
"""
GBDTを使用して水質指標の将来値を予測する。
Args:
df (pd.DataFrame): 学習用のデータフレーム
time_col (str): 時間列の名前
target_col (str): 予測対象のターゲット列名
datetime_str (str): 基準日時(学習期間の終了日時)
window_days (int): 学習に使用する過去の日数
forecast_days (int): 予測先の日数
Returns:
Tuple[pd.DataFrame, str, str, io.BytesIO, str]:
- 予測結果のデータフレーム
- サマリーメッセージ
- 特徴量情報のJSON文字列
- 可視化画像のBytesIOオブジェクト
- グラフ座標データのJSON文字列
Note:
- 交差検証で最適なハイパーパラメータを自動選択
- 将来特徴量は線形外挿で推定(失敗時は直近値または中央値で代用)
- 予測結果とグラフを生成
"""
if time_col not in df.columns:
return pd.DataFrame(), f"⚠ '{time_col}' が見つかりません", "[]", None, "[]"
if target_col not in df.columns:
return pd.DataFrame(), f"⚠ ターゲット列 '{target_col}' が見つかりません", "[]", None, "[]"
target_time = pd.to_datetime(datetime_str)
start_time = target_time - pd.Timedelta(days=window_days)
forecast_time = target_time + pd.Timedelta(days=forecast_days)
dfw = df[(df[time_col] >= start_time) & (df[time_col] <= target_time)].copy()
if dfw.empty:
return pd.DataFrame(), f"⚠ データなし({start_time:%Y-%m-%d}{target_time:%Y-%m-%d})", "[]", None, "[]"
features = _select_features(dfw, time_col, target_col)
X, y = dfw[features].astype(float), dfw[target_col].astype(float).values
best_params, cv_r2 = _cv_select_gbdt(X.values, y)
model = HistGradientBoostingRegressor(**best_params, random_state=42)
pipe = Pipeline([("imputer", SimpleImputer(strategy="median")), ("gbdt", model)])
pipe.fit(X.values, y)
t0 = dfw[time_col].min()
x_future_vals, feat_info = [], []
for col in features:
val = _linear_extrapolate_feature(dfw, time_col, col, t0, forecast_time)
if val is None:
last_val = dfw[col].dropna()
if len(last_val) > 0:
val, note = float(last_val.iloc[-1]), "直近値で代用"
else:
med = float(df[col].dropna().median()) if df[col].notna().any() else 0.0
val, note = med, "中央値で代用"
else:
note = ""
x_future_vals.append(val)
feat_info.append({"Feature": col, "Extrapolated": val, "Note": note})
y_future = float(pipe.predict(np.array(x_future_vals).reshape(1, -1))[0])
y_hat_in = pipe.predict(X.values).reshape(-1)
result_df = pd.DataFrame([{
"ItemName": target_col,
"予測日": forecast_time.strftime("%Y-%m-%d"),
"学習サンプル数": int(len(dfw)),
"特徴量数": int(len(features)),
"GBDT_params": best_params,
"CV_R2(参考)": (None if np.isnan(cv_r2) else round(float(cv_r2), 4)),
"現在値": (None if dfw[target_col].dropna().empty else round(float(dfw[target_col].dropna().iloc[-1]), 6)),
"予測値": round(y_future, 6),
"メモ": ""
}])
extras_json = json.dumps(feat_info, ensure_ascii=False)
fig, ax = plt.subplots(figsize=(7.2, 4.5))
ax.plot(dfw[time_col], dfw[target_col], marker="o", linestyle="-", label="Real data")
ax.plot(dfw[time_col], y_hat_in, linestyle="--", label="GBDT Compliance")
ax.scatter([forecast_time], [y_future], marker="*", s=160, label=f"{forecast_days}day Forecast")
ax.set_title(f"{target_col} GBDT Prediction (Record date:{target_time:%Y-%m-%d})")
# 閾値ラインを追加(y=100)
threshold = 200
ax.axhline(y=threshold, color="red", linestyle=":", linewidth=1.5, alpha=0.8, label=f"Threshold={threshold}")
ax.grid(True, alpha=0.3)
ax.legend()
buf = io.BytesIO()
plt.tight_layout()
fig.savefig(buf, format="png", dpi=150)
plt.close(fig)
buf.seek(0)
# グラフの座標データを準備
coordinate_data = {
"real_data": [
{"x": time.strftime("%Y-%m-%d %H:%M:%S"), "y": float(value)}
for time, value in zip(dfw[time_col], dfw[target_col])
],
"gbdt_fitted": [
{"x": time.strftime("%Y-%m-%d %H:%M:%S"), "y": float(value)}
for time, value in zip(dfw[time_col], y_hat_in)
],
"forecast_point": {
"x": forecast_time.strftime("%Y-%m-%d %H:%M:%S"),
"y": float(y_future)
},
"time_range": {
"start": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end": target_time.strftime("%Y-%m-%d %H:%M:%S"),
"forecast": forecast_time.strftime("%Y-%m-%d %H:%M:%S")
}
}
coordinate_json = json.dumps(coordinate_data, ensure_ascii=False)
summary = (
f"✅ GBDT予測完了: {forecast_days}日後 = **{round(y_future, 6)}**\n\n"
f"- 学習期間: {start_time:%Y-%m-%d}{target_time:%Y-%m-%d}\n"
f"- 特徴量数: {len(features)} / CV_R2(参考): {None if np.isnan(cv_r2) else round(float(cv_r2), 4)}\n"
f"- 使用モデル: HistGradientBoostingRegressor\n"
f"- ベストパラメータ: {best_params}"
)
return result_df, summary, extras_json, buf, coordinate_json
def _format_exception_for_ui(e: Exception) -> str:
"""
例外をGradio UI用のフォーマットされた文字列に変換する。
Args:
e (Exception): フォーマット対象の例外オブジェクト
Returns:
str: UI表示用のフォーマットされたエラーメッセージ
"""
return f"❌ エラー: {type(e).__name__}\n\n{e}\n\n```\n{traceback.format_exc(limit=5)}\n```"
# ---------- Gradio コールバック ----------
def predict_from_supabase_gbdt(time_col, time_format, target_col, window_days, forecast_days):
"""
Supabaseからデータを取得してGBDT予測を実行するGradioコールバック関数。
Args:
time_col (str): 時間列の名前
time_format (str): 時間フォーマット文字列(空文字列の場合は自動判定)
target_col (str): 予測対象のターゲット列名
window_days (int): 学習に使用する過去の日数
forecast_days (int): 予測先の日数
Returns:
Tuple[pd.DataFrame, str, PIL.Image, str, str]:
- 予測結果のデータフレーム
- サマリーメッセージまたはエラーメッセージ
- 可視化画像(エラー時はNone)
- 特徴量情報のJSON文字列
- グラフ座標データのJSON文字列
Note:
- Supabaseのテーブル "Water_Quality_Forecast_Data" から全データを取得
- 現在のJST日付を基準日として使用
- エラーが発生した場合は適切なエラーメッセージを返す
"""
try:
resp = supabase_client.table(TABLE_NAME).select("*").execute()
data = resp.data if hasattr(resp, "data") else resp.get("data")
df = pd.DataFrame(data)
fmt = (time_format or "").strip() or None
df = _ensure_datetime(df, time_col, fmt)
today = _now_jst_date_str()
result_df, summary, extras_json, plot_buf, coordinate_json = gbdt_forecast_codcr(
df, time_col, target_col, today, window_days, forecast_days
)
img = Image.open(plot_buf) if plot_buf else None
return result_df, summary, img, extras_json, coordinate_json
except Exception as e:
return pd.DataFrame(), _format_exception_for_ui(e), None, "[]", "[]"
# ---------- Gradio UI ----------
with gr.Blocks(title="N日後予測(Supabase / CODcr(S)sin / GBDT)") as demo:
gr.Markdown(
"""
# 水質予測(Supabase × GBDT)
- Supabase のテーブル **`Water_Quality_Forecast_Data`** からデータを取得し、
**JSTの今日**を基準に **N日後**の **CODcr(S)sin** を **GBDT** で予測します
"""
)
with gr.Row():
time_col_in = gr.Textbox(value="Time", label="時間列名")
time_fmt_in = gr.Textbox(value="", label="時間フォーマット(空欄=自動判定)")
with gr.Row():
target_col_in = gr.Textbox(value="CODcr(S)sin", label="ターゲット列名")
window_days_in = gr.Slider(7, 180, value=60, step=1, label="学習日数")
forecast_days_in = gr.Slider(1, 30, value=1, step=1, label="予測先日数")
run_btn = gr.Button("Supabaseから予測", variant="primary")
df_out = gr.DataFrame(label="予測結果")
msg_out = gr.Markdown(label="サマリー / エラー表示")
plot_out = gr.Image(label="可視化")
extras_out = gr.JSON(label="将来特徴量")
coordinates_out = gr.JSON(label="グラフ座標データ")
run_btn.click(
fn=predict_from_supabase_gbdt,
inputs=[time_col_in, time_fmt_in, target_col_in, window_days_in, forecast_days_in],
outputs=[df_out, msg_out, plot_out, extras_out, coordinates_out]
)
if __name__ == "__main__":
demo.launch(mcp_server=True)