| | |
| | |
| | |
| |
|
| | import io |
| | import os |
| | from typing import Any, Dict, List, Optional, Sequence, Tuple, Union |
| |
|
| | import pandas as pd |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | from scipy.stats import ttest_ind, pointbiserialr |
| | from sklearn.linear_model import LogisticRegression |
| | from sklearn.impute import SimpleImputer |
| | from sklearn.preprocessing import StandardScaler |
| | from sklearn.pipeline import Pipeline |
| | import gradio as gr |
| | from PIL import Image |
| | from dotenv import load_dotenv |
| |
|
| | |
| | try: |
| | from supabase import create_client |
| | except Exception: |
| | |
| | create_client = None |
| | import supabase as supabase_v1 |
| |
|
| | plt.switch_backend("Agg") |
| |
|
| | |
| | import matplotlib |
| | matplotlib.rcParams['font.family'] = ['DejaVu Sans', 'Hiragino Sans', 'Yu Gothic', 'Meiryo', |
| | 'Takao', 'IPAexGothic', 'IPAPGothic', 'VL PGothic', 'Noto Sans CJK JP'] |
| |
|
| | |
| | load_dotenv() |
| | SUPABASE_URL: Optional[str] = os.environ.get("SUPABASE_URL") |
| | SUPABASE_KEY: Optional[str] = os.environ.get("SUPABASE_KEY") |
| | TABLE_NAME: str = "estimated_cause_mocdata" |
| |
|
| |
|
| | def _get_supabase_client() -> Any: |
| | """ |
| | Supabase クライアントを生成して返す(v2 を優先、なければ v1 にフォールバック)。 |
| | |
| | 環境変数: |
| | SUPABASE_URL (str): Supabase の URL |
| | SUPABASE_KEY (str): Supabase の API キー |
| | |
| | Raises: |
| | RuntimeError: 必要な環境変数が未設定、またはクライアント生成に失敗した場合。 |
| | |
| | Returns: |
| | Any: Supabase クライアントオブジェクト(v2 または v1)。 |
| | """ |
| | if not SUPABASE_URL or not SUPABASE_KEY: |
| | raise RuntimeError("環境変数 SUPABASE_URL または SUPABASE_KEY が設定されていません。") |
| | if create_client is not None: |
| | return create_client(SUPABASE_URL, SUPABASE_KEY) |
| | |
| | return supabase_v1.create_client(SUPABASE_URL, SUPABASE_KEY) |
| |
|
| |
|
| | def _fetch_supabase_df() -> pd.DataFrame: |
| | """ |
| | Supabase の指定テーブルから全件取得し、pandas DataFrame に変換して返す。 |
| | |
| | テーブル: |
| | TABLE_NAME: 既定では 'estimated_cause_mocdata' |
| | |
| | Raises: |
| | RuntimeError: 通信エラー、期待した形でデータが得られない、またはレコードが空の場合。 |
| | |
| | Returns: |
| | pd.DataFrame: 取得したレコードの DataFrame。 |
| | """ |
| | client = _get_supabase_client() |
| | |
| | try: |
| | resp: Any = client.table(TABLE_NAME).select("*").execute() |
| | data: Optional[List[Dict[str, Any]]] = getattr(resp, "data", None) if hasattr(resp, "data") else None |
| | if data is None: |
| | |
| | if isinstance(resp, dict) and "data" in resp: |
| | data = resp["data"] |
| | if not data: |
| | raise RuntimeError(f"Supabase テーブル '{TABLE_NAME}' からデータを取得できませんでした。") |
| | df = pd.DataFrame(data) |
| | if df.empty: |
| | raise RuntimeError(f"Supabase テーブル '{TABLE_NAME}' にレコードがありません。") |
| | return df |
| | except Exception as e: |
| | raise RuntimeError(f"Supabase 取得エラー: {e}") from e |
| |
|
| |
|
| | def _boxplot_image( |
| | a: Union[pd.Series, Sequence[float], np.ndarray], |
| | b: Union[pd.Series, Sequence[float], np.ndarray], |
| | feature_name: str |
| | ) -> np.ndarray: |
| | """ |
| | 2群(正常/悪化)の値から箱ひげ図を描画し、画像(ndarray)を返す。 |
| | |
| | Args: |
| | a: 正常(0) 群の値(Series, list, ndarray など数値配列) |
| | b: 悪化(1) 群の値(Series, list, ndarray など数値配列) |
| | feature_name (str): グラフタイトルや y 軸ラベルに用いる特徴量名 |
| | |
| | Returns: |
| | np.ndarray: 生成した箱ひげ図の画像配列(RGB)。 |
| | """ |
| | fig = plt.figure() |
| | plt.boxplot([a, b], labels=["正常(0)", "悪化(1)"]) |
| | plt.title(f"Boxplot: {feature_name}") |
| | plt.ylabel(feature_name) |
| | buf = io.BytesIO() |
| | fig.savefig(buf, format="png", bbox_inches="tight") |
| | plt.close(fig) |
| | buf.seek(0) |
| | img = Image.open(buf) |
| | return np.array(img) |
| |
|
| |
|
| | |
| | AnalysisResult = Tuple[ |
| | str, |
| | Optional[pd.DataFrame], |
| | Optional[pd.DataFrame], |
| | Optional[pd.DataFrame], |
| | Optional[pd.DataFrame], |
| | List[Tuple[np.ndarray, str]], |
| | Optional[pd.DataFrame], |
| | ] |
| |
|
| |
|
| | def analyze_from_supabase(threshold: Union[int, float], top_k: Union[int, float]) -> AnalysisResult: |
| | """ |
| | Supabase から水質データを取得し、閾値によるラベリングを行った上で |
| | 相関(point-biserial)、t検定、箱ひげ図、ロジスティック回帰による特徴量重要度を算出する。 |
| | |
| | Args: |
| | threshold (int | float): 目的変数列 'CODcr(S)sin' を悪化(1) と判定する閾値。 |
| | top_k (int | float): ロジスティック回帰の係数上位として表示する特徴量数。 |
| | |
| | Returns: |
| | AnalysisResult: 以下の7要素タプル |
| | - status_md (str): 解析の要約(マークダウン) |
| | - head_df (pd.DataFrame | None): 先頭行のプレビュー |
| | - label_counts (pd.DataFrame | None): 目的変数の分布 |
| | - corr_df (pd.DataFrame | None): point-biserial 相関表 |
| | - ttest_df (pd.DataFrame | None): t検定の結果表 |
| | - gallery_imgs (List[Tuple[np.ndarray, str]]): 箱ひげ図(画像配列, タイトル)のリスト |
| | - coef_df (pd.DataFrame | None): ロジスティック回帰の係数ランキング |
| | """ |
| | |
| | try: |
| | df = _fetch_supabase_df() |
| | except Exception as e: |
| | msg = ( |
| | f"❌ データ取得に失敗:{e}\n" |
| | f"- .env に SUPABASE_URL / SUPABASE_KEY を設定してください\n" |
| | f"- テーブル名: {TABLE_NAME}" |
| | ) |
| | return (msg, None, None, None, None, [], None) |
| |
|
| | status_md: str = f"**テーブル:** `{TABLE_NAME}`\n\n**データ形状:** {df.shape[0]} 行 × {df.shape[1]} 列\n\n" |
| | head_df: pd.DataFrame = df.head() |
| |
|
| | |
| | target_col = "CODcr(S)sin" |
| | if target_col not in df.columns: |
| | return ( |
| | f"❌ 必須列 '{target_col}' が見つかりません。現在の列: {list(df.columns)}", |
| | None, None, None, None, [], None |
| | ) |
| |
|
| | df = df.copy() |
| | |
| | df[target_col] = pd.to_numeric(df[target_col], errors="coerce") |
| | df["label"] = (df[target_col] > float(threshold)).astype(int) |
| |
|
| | label_counts: pd.DataFrame = df["label"].value_counts(dropna=False).rename_axis("label").to_frame("count") |
| | status_md += ( |
| | f"**閾値:** {threshold}\n\n" |
| | f"**目的変数の分布:**\n" |
| | f"- 正常(0): {int(label_counts.loc[0,'count']) if 0 in label_counts.index else 0}\n" |
| | f"- 悪化(1): {int(label_counts.loc[1,'count']) if 1 in label_counts.index else 0}\n" |
| | ) |
| |
|
| | |
| | X: pd.DataFrame = df.drop(columns=[target_col, "label"]) |
| | y: pd.Series = df["label"] |
| |
|
| | |
| | if "分散菌槽DO" in X.columns: |
| | X["分散菌槽DO"] = X["分散菌槽DO"].astype(str).str.replace(",", ".", regex=False) |
| | X["分散菌槽DO"] = pd.to_numeric(X["分散菌槽DO"], errors="coerce") |
| |
|
| | |
| | rows: List[Tuple[str, float, float]] = [] |
| | for col in X.columns: |
| | try: |
| | col_num = pd.to_numeric(X[col], errors="coerce") |
| | r, p = pointbiserialr(y, col_num) |
| | rows.append((col, float(r), float(p))) |
| | except Exception: |
| | rows.append((col, float("nan"), float("nan"))) |
| | corr_df: pd.DataFrame = ( |
| | pd.DataFrame(rows, columns=["feature", "r_pb", "pval"]) |
| | .set_index("feature") |
| | .sort_values(by="r_pb", key=lambda s: s.abs(), ascending=False) |
| | ) |
| |
|
| | |
| | ttest_rows: List[Dict[str, Union[str, float, int]]] = [] |
| | for col in X.columns: |
| | col_num = pd.to_numeric(X[col], errors="coerce") |
| | a = col_num[y == 0].dropna() |
| | b = col_num[y == 1].dropna() |
| | if len(a) > 1 and len(b) > 1: |
| | try: |
| | t, p = ttest_ind(a, b, equal_var=False) |
| | ttest_rows.append( |
| | { |
| | "feature": col, |
| | "mean_normal": float(a.mean()), |
| | "mean_bad": float(b.mean()), |
| | "pval": float(p), |
| | "n_normal": int(len(a)), |
| | "n_bad": int(len(b)), |
| | } |
| | ) |
| | except Exception: |
| | pass |
| | ttest_df: pd.DataFrame = ( |
| | pd.DataFrame(ttest_rows).set_index("feature").sort_values(by="pval", ascending=True) |
| | if ttest_rows else pd.DataFrame() |
| | ) |
| |
|
| | |
| | gallery_imgs: List[Tuple[np.ndarray, str]] = [] |
| | for col in X.columns: |
| | col_num = pd.to_numeric(X[col], errors="coerce") |
| | a_plot = col_num[y == 0].dropna() |
| | b_plot = col_num[y == 1].dropna() |
| | if len(a_plot) > 0 and len(b_plot) > 0: |
| | img_array = _boxplot_image(a_plot, b_plot, col) |
| | gallery_imgs.append((img_array, f"Boxplot: {col}")) |
| |
|
| | |
| | X_num = X.apply(pd.to_numeric, errors="coerce").select_dtypes(include=np.number) |
| | X_num = X_num.loc[:, X_num.notna().sum() > 0] |
| |
|
| | if X_num.shape[1] == 0: |
| | coef_df: pd.DataFrame = pd.DataFrame(columns=["feature", "coef", "sign", "rank"]).set_index("feature") |
| | status_md += "\n⚠️ 数値説明変数がありませんでした。係数は空です。" |
| | else: |
| | pipe: Pipeline = Pipeline( |
| | steps=[ |
| | ("imputer", SimpleImputer(strategy="median")), |
| | ("scaler", StandardScaler()), |
| | ("clf", LogisticRegression(max_iter=500, class_weight="balanced")), |
| | ] |
| | ) |
| | try: |
| | pipe.fit(X_num, y) |
| | coef = pd.Series(pipe.named_steps["clf"].coef_[0], index=X_num.columns) |
| | coef_abs_sorted = coef.abs().sort_values(ascending=False) |
| | top_features = coef_abs_sorted.head(int(top_k)).index.tolist() |
| |
|
| | coef_df = ( |
| | pd.DataFrame( |
| | { |
| | "coef": coef, |
| | "abs_coef": coef.abs(), |
| | "sign": np.where(coef > 0, "↑ (増加で悪化リスク上昇)", "↓ (増加で悪化リスク低下)"), |
| | } |
| | ) |
| | .sort_values(by="abs_coef", ascending=False) |
| | .drop(columns=["abs_coef"]) |
| | ) |
| | coef_df["rank"] = np.arange(1, len(coef_df) + 1) |
| | status_md += "\n\n**悪化原因の候補(上位{}項目)**:\n- ".format(int(top_k)) + "\n- ".join( |
| | [f"{f}: 係数={coef[f]:.3f} {('↑' if coef[f]>0 else '↓')}" for f in top_features] |
| | ) |
| | except Exception as e: |
| | status_md += f"\n❗ ロジスティック回帰の学習に失敗しました: {e}" |
| | coef_df = pd.DataFrame(columns=["feature", "coef", "sign", "rank"]).set_index("feature") |
| |
|
| | status_md += "\n\n✅ 解析完了:Supabase データに対して ロジスティック回帰 を実行しました。" |
| |
|
| | return status_md, head_df, label_counts, corr_df, ttest_df, gallery_imgs, coef_df |
| |
|
| |
|
| | |
| | with gr.Blocks(title="水質悪化原因分析") as demo: |
| | gr.Markdown( |
| | """ |
| | # 水質悪化原因分析 |
| | `.env` の **SUPABASE_URL** / **SUPABASE_KEY** を用意し、テーブル **estimated_cause_mocdata** からデータを取得して解析します。 |
| | 解析対象列は **CODcr(S)sin**(悪化=1 判定用)を想定しています。 |
| | """ |
| | ) |
| | with gr.Row(): |
| | threshold_in = gr.Number(value=100, precision=0, label="CODcr(S)sin の閾値(悪化=1)") |
| | topk_in = gr.Slider(1, 10, value=4, step=1, label="ロジスティック回帰の上位特徴量 数") |
| | run_btn = gr.Button("Supabase から取得して解析", variant="primary") |
| |
|
| | status_out = gr.Markdown() |
| | head_out = gr.Dataframe(label="データ先頭", interactive=False) |
| | label_out = gr.Dataframe(label="目的変数の分布", interactive=False) |
| | corr_out = gr.Dataframe(label="相関 (point-biserial)", interactive=False) |
| | ttest_out = gr.Dataframe(label="t検定結果(p値の小さい順)", interactive=False) |
| | gallery_out = gr.Gallery(label="箱ひげ図(正常 vs 悪化)", columns=2, height="auto") |
| | coef_out = gr.Dataframe(label="ロジスティック回帰 係数ランキング", interactive=False) |
| |
|
| | run_btn.click( |
| | analyze_from_supabase, |
| | inputs=[threshold_in, topk_in], |
| | outputs=[status_out, head_out, label_out, corr_out, ttest_out, gallery_out, coef_out], |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | |
| | demo.launch(mcp_server=True) |