# Integrated Gradio app (threshold diagnosis + trend detection + predictive
# analysis + variability analysis; fixed input files hosted on Supabase).
import gradio as gr
import pandas as pd
import numpy as np
import json
from dotenv import load_dotenv
import os
from supabase import create_client, Client
from sklearn.linear_model import LinearRegression

# Load .env so the Supabase settings below can come from environment variables.
load_dotenv()

# --- Supabase settings ---
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
SUPABASE_BUCKET = os.getenv("SUPABASE_BUCKET")
CSV_FILE = os.getenv("CSV_PATH")
EXCEL_FILE = os.getenv("EXCEL_PATH")
LAG_FILE = os.getenv("LAG_PATH")
TMP_DIR = "./tmp"


def _create_supabase_client() -> Client | None:
    """Create a Supabase client if credentials are available; return None otherwise."""
    if not SUPABASE_URL or not SUPABASE_KEY:
        return None
    try:
        return create_client(SUPABASE_URL, SUPABASE_KEY)
    except Exception:
        # Deliberate best-effort: a failed client creation degrades to None and
        # every entry point reports a readable error instead of crashing.
        return None


supabase: Client | None = _create_supabase_client()


# --- Shared utilities ---
def convert_value(v) -> int | float | str:
    """
    Convert numpy scalar types to plain Python types.

    Args:
        v: A numpy scalar or a plain Python value.

    Returns:
        int | float | str: The value as a standard Python type (values that are
        neither numeric nor numpy scalars are returned unchanged).
    """
    if hasattr(v, "item"):
        return v.item()
    if isinstance(v, (np.integer, int)):
        return int(v)
    if isinstance(v, (np.floating, float)):
        return float(v)
    return v


def normalize(s: str) -> str:
    """
    Normalize a string: replace full-width spaces, strip newlines and
    surrounding whitespace.

    Args:
        s (str): Input string (non-strings are coerced via str()).

    Returns:
        str: Normalized string.
    """
    return str(s).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()


def col_to_str(col) -> str:
    """Convert a MultiIndex column tuple to a single "A_B_C" string identifier."""
    return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)


def find_matching_column(df: pd.DataFrame, col_id: str, item_name: str, process_name: str) -> str | None:
    """
    Find the DataFrame column matching the given column ID, item name and
    process name.

    Args:
        df (pd.DataFrame): Data frame with flattened string column names.
        col_id (str): Column ID substring to match.
        item_name (str): Item name (matched after normalization).
        process_name (str): Process name substring to match.

    Returns:
        str | None: The first matching column name, or None when not found.
    """
    norm_item = normalize(item_name)
    candidates = [
        c for c in df.columns
        if isinstance(c, str)
        and col_id in c
        and process_name in c
        and norm_item in normalize(c)
    ]
    return candidates[0] if candidates else None


# --- Module-level state (populated by load_from_supabase) ---
DF = None
THRESHOLDS_DF = None
LAG_MATRIX = None
INITIAL_CHOICES = []


# --- Load the fixed files from Supabase ----
def load_from_supabase() -> str:
    """
    Download the CSV, threshold Excel and lag-matrix files from Supabase
    Storage and refresh the module-level state (DF, THRESHOLDS_DF, LAG_MATRIX,
    INITIAL_CHOICES).

    Returns:
        str: Human-readable status message.
    """
    global DF, THRESHOLDS_DF, LAG_MATRIX, INITIAL_CHOICES
    try:
        if supabase is None:
            return "❌ Supabase クライアント未初期化: 環境変数 SUPABASE_URL/SUPABASE_KEY を確認してください"
        if not SUPABASE_BUCKET:
            return "❌ SUPABASE_BUCKET が未設定です"

        # --- CSV ---
        csv_path = CSV_FILE
        if not csv_path:
            return "❌ CSV_PATH が未設定です"
        os.makedirs(TMP_DIR, exist_ok=True)
        csv_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(csv_path)
        with open(os.path.join(TMP_DIR, "data.csv"), "wb") as f:
            f.write(csv_bytes)
        # Three header rows -> MultiIndex columns; the first column holds timestamps.
        DF = pd.read_csv(os.path.join(TMP_DIR, "data.csv"), header=[0, 1, 2])
        timestamp_col = pd.to_datetime(DF.iloc[:, 0], errors="coerce")
        DF = DF.drop(DF.columns[0], axis=1)
        DF.insert(0, "timestamp", timestamp_col)
        # Flatten MultiIndex columns into "A_B_C" string identifiers.
        DF.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in DF.columns
        ]

        # --- Threshold Excel ---
        excel_path = EXCEL_FILE
        if not excel_path:
            return "❌ EXCEL_PATH が未設定です"
        thr_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(excel_path)
        with open(os.path.join(TMP_DIR, "thresholds.xlsx"), "wb") as f:
            f.write(thr_bytes)
        THRESHOLDS_DF = pd.read_excel(os.path.join(TMP_DIR, "thresholds.xlsx"))
        # The Important flag arrives as "TRUE"/"FALSE" text; map it to real booleans
        # (any other value becomes NaN, which the `== True` filters treat as False).
        THRESHOLDS_DF["Important"] = (
            THRESHOLDS_DF["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
        for col in ["LL", "L", "H", "HH"]:
            if col in THRESHOLDS_DF.columns:
                THRESHOLDS_DF[col] = pd.to_numeric(THRESHOLDS_DF[col], errors="coerce")

        # --- Lag matrix ---
        lag_path = LAG_FILE
        if not lag_path:
            return "❌ LAG_PATH が未設定です"
        lag_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(lag_path)
        with open(os.path.join(TMP_DIR, "lag.xlsx"), "wb") as f:
            f.write(lag_bytes)
        LAG_MATRIX = pd.read_excel(os.path.join(TMP_DIR, "lag.xlsx"), index_col=0)

        print(50 * '=')
        print('INITIAL_CHOICES')
        print(50 * '-')
        # Refresh the dropdown choices from the threshold sheet.
        if THRESHOLDS_DF is not None and "ProcessNo_ProcessName" in THRESHOLDS_DF.columns:
            INITIAL_CHOICES = sorted(THRESHOLDS_DF["ProcessNo_ProcessName"].dropna().unique().tolist())
        else:
            INITIAL_CHOICES = []
        print('INITIAL_CHOICES: ', INITIAL_CHOICES)
        print(50 * '=')

        return "✅ Supabase からファイルを読み込み成功"
    except Exception as e:
        return f"❌ Supabase からの読み込み失敗: {e}"


# --- Tab1: threshold diagnosis ---
def judge_status(value: float, ll: float, l: float, h: float, hh: float) -> str:
    """
    Compare a measured value against its thresholds and classify it.

    NaN thresholds are skipped; a NaN value fails every comparison and is
    therefore classified "OK".

    Returns:
        str: One of "LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH".
    """
    if pd.notna(ll) and value < ll:
        return "LOW-LOW"
    elif pd.notna(l) and value < l:
        return "LOW"
    elif pd.notna(hh) and value > hh:
        return "HIGH-HIGH"
    elif pd.notna(h) and value > h:
        return "HIGH"
    else:
        return "OK"


def parse_datetime_safe(s: str) -> pd.Timestamp | None:
    """
    Parse a datetime string tolerantly ("/" or "-" separators, optional
    zero-padding). Returns NaT/None for unparseable input instead of raising.
    """
    try:
        return pd.to_datetime(s, errors="coerce")
    except Exception:
        return None


def _diagnose_process_range_core(process_name: str,
                                 datetime_str: str,
                                 window_minutes: int
                                 ) -> tuple[pd.DataFrame | None,
                                            pd.DataFrame | None,
                                            pd.DataFrame | None,
                                            str,
                                            str | None]:
    """
    Core threshold-diagnosis routine for one process: builds the aggregate
    tables, the summary text and the JSON payload.

    Args:
        process_name (str): Process name ("ProcessNo_ProcessName").
        datetime_str (str): Reference datetime (e.g. "2024-06-01 12:00").
        window_minutes (int): Minutes to look back from the reference time.

    Returns:
        tuple: (all-items table, important-items table, per-important-item
        table, summary text, JSON string). On error the tables and the JSON
        string are None and the summary carries the error message.
    """
    global DF, THRESHOLDS_DF
    if DF is None or THRESHOLDS_DF is None:
        return None, None, None, "⚠ ファイル未読み込み", None

    # Parse the reference time defensively.
    target_time = parse_datetime_safe(datetime_str)
    if target_time is None or pd.isna(target_time):
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = DF[(DF["timestamp"] >= start_time) & (DF["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None

    # Match the process name after normalization on both sides.
    proc_thresholds = THRESHOLDS_DF[
        THRESHOLDS_DF["ProcessNo_ProcessName"].apply(normalize) == normalize(process_name)
    ]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None

    all_results = []
    for _, row in df_window.iterrows():
        for _, thr in proc_thresholds.iterrows():
            col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
            if col_name not in DF.columns:
                continue
            value = row[col_name]
            status = judge_status(value, thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
            all_results.append({
                "ColumnID": thr["ColumnID"],
                "ItemName": thr["ItemName"],
                "判定": status,
                "重要項目": bool(thr.get("Important", False)),
                "時刻": str(row["timestamp"])
            })

    total = len(all_results)
    # Guard: no threshold row matched an actual data column -> avoid the
    # division by zero that would otherwise yield inf/NaN percentages.
    if total == 0:
        return None, None, None, f"⚠ プロセス {process_name} の対象列がデータに見つかりません。", None

    status_counts = pd.Series([r["判定"] for r in all_results]).value_counts().reindex(
        ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
    )
    status_ratio = (status_counts / total * 100).round(1)
    result_df_all = pd.DataFrame({"状態": status_counts.index,
                                  "件数": status_counts.values,
                                  "割合(%)": status_ratio.values})

    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        total_imp = len(important_results)
        status_counts_imp = pd.Series([r["判定"] for r in important_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_imp = (status_counts_imp / total_imp * 100).round(1)
        result_df_imp = pd.DataFrame({"状態": status_counts_imp.index,
                                      "件数": status_counts_imp.values,
                                      "割合(%)": status_ratio_imp.values})
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)

    # Per-item breakdown for important items only.
    result_per_item = []
    for item in set([r["ItemName"] for r in important_results]):
        item_results = [r for r in important_results if r["ItemName"] == item]
        total_item = len(item_results)
        status_counts_item = pd.Series([r["判定"] for r in item_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_item = (status_counts_item / total_item * 100).round(1)
        for s, c, r in zip(status_counts_item.index, status_counts_item.values, status_ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": int(c), "割合(%)": float(r)})
    result_df_imp_items = pd.DataFrame(result_per_item)

    summary = (
        f"✅ {process_name} の診断完了({start_time} ~ {end_time})\n"
        + "[全項目] " + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()]) + "\n"
        + "[重要項目全体] "
        + (" / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()])
           if not result_df_imp.empty else "対象データなし")
    )

    json_data = {
        "集計結果": {
            "全項目割合": {k: float(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": {k: float(v) for k, v in status_ratio_imp.to_dict().items()}
                                if not result_df_imp.empty else {},
            "重要項目ごと割合": [dict(row) for _, row in result_df_imp_items.iterrows()]
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)
    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json


def diagnose_process_range_ui(
        process_name: str,
        datetime_str: str,
        window_minutes: int
) -> tuple[pd.DataFrame | None, pd.DataFrame | None, pd.DataFrame | None, str]:
    """
    UI entry point: run the threshold diagnosis for the process selected in
    the dropdown and return the tables plus the summary text.

    Args:
        process_name (str): Process name selected in the UI dropdown.
        datetime_str (str): Reference datetime (e.g. "2024-06-01 12:00").
        window_minutes (int): Minutes to look back.

    Returns:
        tuple:
            result_df_all (pd.DataFrame | None): aggregate over all items
            result_df_imp (pd.DataFrame | None): aggregate over important items
            result_df_imp_items (pd.DataFrame | None): per-important-item aggregate
            summary (str): summary text
    """
    result_df_all, result_df_imp, result_df_imp_items, summary, _ = _diagnose_process_range_core(
        process_name,
        datetime_str,
        window_minutes
    )
    return result_df_all, result_df_imp, result_df_imp_items, summary


def diagnose_process_range_json(
        process_name: str,
        datetime_str: str,
        window_minutes: int
) -> dict:
    """
    MCP/API entry point: run the threshold diagnosis and return the result as
    a JSON-serializable dict.

    Args:
        process_name (str): Process to diagnose.
        datetime_str (str): Reference datetime (e.g. "2024-06-01 12:00").
        window_minutes (int): Minutes to look back from the reference time.

    Returns:
        dict: On success the parsed core JSON (the "集計結果" payload).
        NOTE(review): on the fallback/error paths the shape instead carries
        "meta" and "tab1" keys — the two success/fallback shapes differ;
        confirm which one downstream consumers expect.
    """
    print(50 * '=')
    print('in diagnose_process_range_json()')
    print('process_name: ', process_name)
    print('datetime_str: ', datetime_str)
    print('window_minutes: ', window_minutes)
    print(50 * '=')
    try:
        result_df_all, result_df_imp, result_df_imp_items, summary, result_json = _diagnose_process_range_core(
            process_name,
            datetime_str,
            window_minutes
        )

        # --- case 1: the core produced a JSON string ---
        if result_json and isinstance(result_json, str):
            try:
                parsed = json.loads(result_json)
                return parsed
            except Exception:
                # Corrupted JSON -> fall through to the DataFrame fallback.
                pass

        # --- case 2: fallback (convert the DataFrames to records) ---
        def df_to_records(df):
            return df.to_dict(orient="records") if df is not None else []

        return {
            "meta": {
                "process_name": process_name,
                "datetime_str": datetime_str,
                "window_minutes": window_minutes,
            },
            "tab1": {
                "summary": summary if summary else "診断結果は得られませんでした。",
                "results": {
                    "全項目集計": df_to_records(result_df_all),
                    "重要項目全体集計": df_to_records(result_df_imp),
                    "重要項目ごと集計": df_to_records(result_df_imp_items),
                },
            },
        }
    except Exception as e:
        # --- case 3: unexpected failure ---
        return {
            "meta": {
                "process_name": process_name,
                "datetime_str": datetime_str,
                "window_minutes": window_minutes,
            },
            "tab1": {
                "summary": f"診断エラー: {str(e)}",
                "results": {
                    "全項目集計": [],
                    "重要項目全体集計": [],
                    "重要項目ごと集計": [],
                },
            },
        }


# --- Tab2: trend detection ---
def detect_trends_with_forecast(process_name: str,
                                datetime_str: str,
                                window_minutes: int,
                                forecast_minutes: int,
                                downsample_factor: int
                                ) -> tuple[pd.DataFrame | None, str, str | None]:
    """
    Detect trends with linear regression and extrapolate each important item
    into the future.

    Args:
        process_name (str): Process name.
        datetime_str (str): Reference datetime.
        window_minutes (int): Minutes to look back.
        forecast_minutes (int): Minutes to forecast ahead.
        downsample_factor (int): Sampling coarsening factor.

    Returns:
        tuple: (forecast result DataFrame, summary text, JSON string);
        on error the DataFrame and JSON string are None.
    """
    global DF, THRESHOLDS_DF
    if DF is None or THRESHOLDS_DF is None:
        return None, "⚠ ファイル未読み込み", None

    target_time = pd.to_datetime(datetime_str)
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    df_window = DF[(DF["timestamp"] >= start_time) & (DF["timestamp"] <= target_time)]
    if df_window.empty:
        return None, "⚠ データなし", None

    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔検出失敗", None
    interval_minutes = interval.total_seconds() / 60

    df_window = df_window.iloc[::int(downsample_factor), :].reset_index(drop=True)
    effective_interval = interval_minutes * int(downsample_factor)
    # Guard: a zero interval (duplicate timestamps) would divide by zero below.
    if effective_interval <= 0:
        return None, "⚠ サンプリング間隔検出失敗", None

    proc_thresholds = THRESHOLDS_DF[
        (THRESHOLDS_DF["ProcessNo_ProcessName"].apply(normalize) == normalize(process_name))
        & (THRESHOLDS_DF["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None

    results = []
    for _, thr in proc_thresholds.iterrows():
        col_tuple = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
        if col_tuple not in DF.columns:
            continue
        series = df_window[col_tuple].dropna()
        if len(series) < 3:
            # Too few points for a meaningful fit.
            continue

        x = np.arange(len(series)).reshape(-1, 1)
        y = series.values.reshape(-1, 1)
        model = LinearRegression().fit(x, y)
        slope = model.coef_[0][0]
        last_val = series.iloc[-1]

        forecast_steps = int(forecast_minutes / effective_interval)
        forecast_val = model.predict([[len(series) + forecast_steps]])[0][0]
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

        risk = "安定"
        if pd.notna(thr.get("LL")) and forecast_val <= thr["LL"]:
            risk = "LL逸脱予測"
        elif pd.notna(thr.get("HH")) and forecast_val >= thr["HH"]:
            risk = "HH逸脱予測"

        results.append({
            "ItemName": thr["ItemName"],
            "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3),
            "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": round(effective_interval, 2)
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出完了", result_json


# --- Tab2: trend detection (JSON variant) ---
def detect_trends_with_forecast_json(
        process_name: str,
        datetime_str: str,
        window_minutes: int,
        forecast_minutes: int,
        downsample_factor: int
) -> dict:
    """MCP entry point: run trend detection and return a JSON-serializable dict."""
    result_df, summary, result_json = detect_trends_with_forecast(
        process_name, datetime_str, window_minutes, forecast_minutes, downsample_factor
    )

    # Prefer the ready-made JSON string from the core routine.
    if result_json and isinstance(result_json, str):
        try:
            parsed = json.loads(result_json)
            return {
                "meta": {
                    "process_name": process_name,
                    "datetime_str": datetime_str,
                    "window_minutes": window_minutes,
                    "forecast_minutes": forecast_minutes,
                },
                "tab2": {
                    "summary": summary,
                    "results": parsed
                }
            }
        except Exception:
            pass

    # Fallback: serialize the DataFrame directly.
    return {
        "meta": {
            "process_name": process_name,
            "datetime_str": datetime_str,
            "window_minutes": window_minutes,
            "forecast_minutes": forecast_minutes,
        },
        "tab2": {
            "summary": summary if summary else "診断結果なし",
            "results": result_df.to_dict(orient="records") if result_df is not None else [],
        },
    }


# --- Tab3: predictive (lag-based) analysis ---
def forecast_process_with_lag(process_name: str,
                              datetime_str: str,
                              forecast_minutes: int,
                              downsample_factor: int = 3
                              ) -> tuple[pd.DataFrame | None, str, str | None]:
    """
    Forecast a process's important items using the lag matrix of upstream
    processes as predictors.

    Args:
        process_name (str): Process name.
        datetime_str (str): Reference datetime.
        forecast_minutes (int): Minutes to forecast ahead.
        downsample_factor (int): Sampling coarsening factor.

    Returns:
        tuple: (result DataFrame, summary text, JSON string);
        on error the DataFrame and JSON string are None.
    """
    global DF, THRESHOLDS_DF, LAG_MATRIX
    if DF is None or THRESHOLDS_DF is None or LAG_MATRIX is None:
        return None, "⚠ ファイル未読み込み", None

    target_time = pd.to_datetime(datetime_str)
    forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

    proc_thresholds = THRESHOLDS_DF[
        (THRESHOLDS_DF["ProcessNo_ProcessName"].apply(normalize) == normalize(process_name))
        & (THRESHOLDS_DF["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None

    # Locate the process row in the lag matrix, tolerating formatting noise.
    matched_idx = None
    for idx in LAG_MATRIX.index:
        if normalize(idx) == normalize(process_name):
            matched_idx = idx
            break
    if matched_idx is None:
        return None, f"⚠ {process_name} のラグ行なし", None

    lag_row = LAG_MATRIX.loc[matched_idx].dropna()
    lag_row = lag_row[lag_row > 0]  # only positive lags are usable as predictors
    if lag_row.empty:
        return None, f"⚠ {process_name} に正のラグなし", None

    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(DF, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue

        # Training window: the most recent 24 hours.
        df_window = DF[(DF["timestamp"] >= target_time - pd.Timedelta(hours=24))
                       & (DF["timestamp"] <= target_time)].copy()
        if df_window.empty:
            continue
        base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})

        # === Downsample the target series to a coarser, regular grid ===
        dt = df_window["timestamp"].diff().median()
        base_min = max(int(dt.total_seconds() // 60), 1) if pd.notna(dt) else 1
        new_interval = max(base_min * int(downsample_factor), 1)
        df_down = (base_df.set_index("timestamp")
                   .resample(f"{new_interval}min").mean()
                   .dropna()
                   .reset_index())

        # Join each upstream column shifted forward by its lag.
        merged_df = df_down.copy()
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in DF.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                merged_df = pd.merge_asof(
                    merged_df.sort_values("timestamp"),
                    shifted.sort_values("timestamp"),
                    on="timestamp",
                    direction="nearest"
                )

        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue
        model = LinearRegression().fit(X_all, Y_all)

        # Build the prediction row: for each predictor take the value at
        # forecast_time minus its lag (nearest available sample).
        X_pred = []
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in DF.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                idx = (DF["timestamp"] - ref_time).abs().idxmin()
                X_pred.append(DF.loc[idx, x_col])
        if not X_pred:
            continue
        pred_val = model.predict([X_pred])[0]

        # Threshold risk classification of the predicted value.
        ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
        risk = "OK"
        if pd.notna(ll) and pred_val <= ll:
            risk = "LOW-LOW"
        elif pd.notna(l) and pred_val <= l:
            risk = "LOW"
        elif pd.notna(hh) and pred_val >= hh:
            risk = "HIGH-HIGH"
        elif pd.notna(h) and pred_val >= h:
            risk = "HIGH"

        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": new_interval
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, f"✅ {process_name} の予兆解析完了", result_json


# --- Tab3: predictive analysis (JSON variant) ---
def forecast_process_with_lag_json(
        process_name: str,
        datetime_str: str,
        forecast_minutes: int,
        downsample_factor: int = 3
) -> dict:
    """
    MCP entry point: run the predictive analysis and return a JSON dict.

    NOTE(review): on success this returns the raw parsed results list/dict,
    while the fallback path returns a "meta"/"tab3" wrapper — confirm which
    shape downstream consumers expect.
    """
    result_df, summary, result_json = forecast_process_with_lag(
        process_name, datetime_str, forecast_minutes, downsample_factor
    )
    if result_json and isinstance(result_json, str):
        try:
            return json.loads(result_json)
        except Exception:
            pass
    return {
        "meta": {
            "process_name": process_name,
            "datetime_str": datetime_str,
            "forecast_minutes": forecast_minutes,
        },
        "tab3": {
            "summary": summary if summary else "診断結果なし",
            "results": result_df.to_dict(orient="records") if result_df is not None else [],
        },
    }


# --- Tab4: variability analysis ---
def robust_mad(x: pd.Series) -> float:
    """
    Compute the scaled Median Absolute Deviation (MAD) of a series.

    Args:
        x (pd.Series): Numeric series.

    Returns:
        float: 1.4826 * MAD (the scale factor makes MAD comparable to the
        standard deviation of a normal distribution); NaN for an empty series.
    """
    if len(x) == 0:
        return np.nan
    med = np.median(x)
    mad = np.median(np.abs(x - med))
    return 1.4826 * mad


def analyze_variability(datetime_str: str,
                        window_minutes: int,
                        cv_threshold_pct: float = 10.0,
                        jump_pct_threshold: float = 10.0,
                        mad_sigma: float = 3.0
                        ) -> tuple[pd.DataFrame | None, str, str | None]:
    """
    Run the variability analysis and flag unstable items.

    Args:
        datetime_str (str): Reference datetime.
        window_minutes (int): Minutes to look back.
        cv_threshold_pct (float): Coefficient-of-variation threshold (%).
        jump_pct_threshold (float): Relative jump threshold (%).
        mad_sigma (float): MAD multiplier for the absolute spike condition.

    Returns:
        tuple: (result DataFrame, summary text, JSON string);
        on error the DataFrame and JSON string are None.
    """
    global DF
    if DF is None:
        return None, "⚠ ファイル未読み込み", None

    target_time = pd.to_datetime(datetime_str)
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    dfw = DF[(DF["timestamp"] >= start_time) & (DF["timestamp"] <= end_time)].copy()
    if dfw.empty:
        return None, f"⚠ 指定時間幅({start_time}~{end_time})にデータが見つかりません。", None

    data_cols = [c for c in dfw.columns if c != "timestamp" and pd.api.types.is_numeric_dtype(dfw[c])]
    results = []
    unstable_count = 0
    for col in data_cols:
        s = dfw[col].dropna()
        n = len(s)
        if n < 3:
            # Too few samples for dispersion statistics.
            continue
        mean = float(np.mean(s))
        std = float(np.std(s, ddof=1)) if n >= 2 else 0.0
        cv_pct = abs(std / mean) * 100.0 if mean != 0 else np.nan

        diffs = s.diff().dropna()
        mad_scale = robust_mad(diffs)
        ref = max(1e-9, abs(float(np.median(s))))
        rel_jump = diffs.abs() / ref * 100.0
        # A step counts as a spike if it is large in absolute terms
        # (> mad_sigma x MAD) OR large relative to the series median.
        abs_cond = diffs.abs() > (mad_sigma * mad_scale
                                  if not np.isnan(mad_scale) and mad_scale > 0 else np.inf)
        pct_cond = rel_jump >= jump_pct_threshold
        spike_mask = abs_cond | pct_cond
        spike_count = int(spike_mask.sum())
        spike_up_count = int((diffs[spike_mask] > 0).sum())
        spike_down_count = spike_count - spike_up_count
        max_step = float(diffs.abs().max()) if len(diffs) else np.nan
        last_val, first_val = float(s.iloc[-1]), float(s.iloc[0])

        unstable = (not np.isnan(cv_pct) and cv_pct >= cv_threshold_pct) or (spike_count > 0)
        if unstable:
            unstable_count += 1
        results.append({
            "項目名": col,
            "サンプル数": n,
            "平均": mean,
            "標準偏差": std,
            "CV(%)": None if np.isnan(cv_pct) else round(cv_pct, 3),
            "スパイク数": spike_count,
            "スパイク上昇数": spike_up_count,
            "スパイク下降数": spike_down_count,
            "最大|ステップ|": None if np.isnan(max_step) else round(max_step, 6),
            "最初の値": first_val,
            "最後の値": last_val,
            "不安定判定": bool(unstable)
        })

    result_df = pd.DataFrame(results)
    if not result_df.empty:
        result_df = result_df.sort_values(
            by=["不安定判定", "CV(%)", "スパイク数"],
            ascending=[False, False, False],
            na_position="last"
        ).reset_index(drop=True)

    summary = (
        f"✅ 変動解析完了({start_time} ~ {end_time})\n"
        f"- 対象項目数: {len(results)}\n"
        f"- 不安定と判定: {unstable_count} 項目(CV≥{cv_threshold_pct:.1f}% または スパイクあり)\n"
        f"- スパイク条件: |diff| > {mad_sigma:.1f}×MAD または 相対変化 ≥ {jump_pct_threshold:.1f}%"
    )

    # Serialize with real JSON nulls for NaN/inf. (The previous
    # fillna("null") emitted the literal string "null" instead of null.)
    safe_records = json.loads(
        result_df.replace([np.inf, -np.inf], np.nan)
                 .to_json(orient="records", force_ascii=False)
    )
    result_json = json.dumps(safe_records, ensure_ascii=False, indent=2)
    return result_df, summary, result_json


# --- Tab4: variability analysis (UI + MCP variant) ---
def analyze_variability_json(
        datetime_str: str,
        window_minutes: int,
        cv_threshold_pct: float = 10.0,
        jump_pct_threshold: float = 10.0,
        mad_sigma: float = 3.0
) -> tuple[pd.DataFrame | None, str, dict]:
    """
    Run the variability analysis for the UI and MCP at once.

    Returns a 3-tuple (table, summary, JSON dict) matching the three Gradio
    outputs; the dict carries only 項目名 / 平均 / CV(%) / スパイク数 /
    不安定判定 for the top 10 items by CV.
    (The original annotation claimed `-> dict`, which contradicted both the
    actual return value and the UI wiring.)
    """
    result_df, summary, _ = analyze_variability(
        datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma
    )

    if result_df is not None and not result_df.empty:
        # Keep only the essential columns, top 10 by CV.
        top10 = (
            result_df.dropna(subset=["CV(%)"])
            .sort_values(by="CV(%)", ascending=False)
            .head(10)[["項目名", "平均", "CV(%)", "スパイク数", "不安定判定"]]
        )
        results = top10.to_dict(orient="records")
    else:
        results = []

    return result_df, summary, {
        "meta": {
            "datetime_str": datetime_str,
            "window_minutes": window_minutes,
            "cv_threshold_pct": cv_threshold_pct,
            "jump_pct_threshold": jump_pct_threshold,
            "mad_sigma": mad_sigma,
        },
        "tab4": {
            "summary": summary if summary else "診断結果なし",
            "top10_by_cv": results,
        },
    }


# Load the fixed files before the UI is built so that INITIAL_CHOICES is
# already populated when the dropdown is constructed.
load_from_supabase()

# --- Gradio UI ---
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    gr.Markdown("## 統合トレンド解析アプリ (MCP対応, Supabase固定ファイル)")

    # --- Process selector (shared by all tabs) ---
    process_selector = gr.Dropdown(
        label="プロセス名 (UI + MCP)",
        choices=INITIAL_CHOICES,  # populated by load_from_supabase() above
        allow_custom_value=True,
        interactive=True
    )

    # Shared inputs
    with gr.Row():
        datetime_str_global = gr.DateTime(label="診断基準日時", value='2025-08-02 12:00:00', type='string')
        window_minutes_global = gr.Number(label="さかのぼる時間幅(分)", value=60)
        forecast_minutes_global = gr.Number(label="未来予測時間幅(分)", value=60)

    # Tabs
    with gr.Tabs():
        with gr.Tab("閾値診断"):
            run_btn1 = gr.Button("診断実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")
            # UI handler
            run_btn1.click(
                diagnose_process_range_ui,
                inputs=[process_selector, datetime_str_global, window_minutes_global],
                outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output]
            )
            # MCP handler (only the JSON output is refreshed)
            run_btn1.click(
                diagnose_process_range_json,
                inputs=[process_selector, datetime_str_global, window_minutes_global],
                outputs=[json_output]
            )

        with gr.Tab("傾向検出"):
            downsample_factor2 = gr.Slider(label="粗化倍率", minimum=1, maximum=10, step=1, value=3)
            run_btn2 = gr.Button("傾向検出実行")
            result_df2 = gr.Dataframe(label="傾向+予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")
            # UI handler
            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[process_selector, datetime_str_global, window_minutes_global,
                        forecast_minutes_global, downsample_factor2],
                outputs=[result_df2, summary_output2, json_output2]
            )
            # MCP handler
            run_btn2.click(
                detect_trends_with_forecast_json,
                inputs=[process_selector, datetime_str_global, window_minutes_global,
                        forecast_minutes_global, downsample_factor2],
                outputs=[json_output2]
            )

        with gr.Tab("予兆解析"):
            downsample_factor3 = gr.Slider(1, 10, value=3, step=1, label="粗化倍率(サンプリング間引き)")
            run_btn3 = gr.Button("予兆解析実行")
            result_df3 = gr.Dataframe(label="予兆解析結果")
            summary_output3 = gr.Textbox(label="サマリー")
            json_output3 = gr.Json(label="JSON結果")
            # UI handler
            run_btn3.click(
                forecast_process_with_lag,
                inputs=[process_selector, datetime_str_global, forecast_minutes_global, downsample_factor3],
                outputs=[result_df3, summary_output3, json_output3]
            )
            # MCP handler
            run_btn3.click(
                forecast_process_with_lag_json,
                inputs=[process_selector, datetime_str_global, forecast_minutes_global, downsample_factor3],
                outputs=[json_output3]
            )

        with gr.Tab("変動解析"):
            cv_threshold4 = gr.Number(label="CVしきい値(%)", value=10.0)
            jump_threshold4 = gr.Number(label="ジャンプ率しきい値(%)", value=10.0)
            mad_sigma4 = gr.Number(label="MAD倍率", value=3.0)
            run_btn4 = gr.Button("変動解析実行")
            result_df4 = gr.Dataframe(label="変動解析結果")
            summary_output4 = gr.Textbox(label="サマリー")
            json_output4 = gr.Json(label="JSON結果")
            # Shared UI + MCP handler (returns table, summary, JSON dict)
            run_btn4.click(
                analyze_variability_json,
                inputs=[datetime_str_global, window_minutes_global, cv_threshold4, jump_threshold4, mad_sigma4],
                outputs=[result_df4, summary_output4, json_output4]
            )

if __name__ == "__main__":
    demo.launch(mcp_server=True)