# Predictive-sign ("予兆") analysis app — Gradio + MCP-compatible version
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
import json
import os


# --- Utilities ---
def normalize(s):
    """Normalize a label for fuzzy matching: collapse full-width spaces, strip newlines/CR and edges."""
    return str(s).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()


def find_matching_column(df, col_id, item_name, process_name):
    """Return the first DataFrame column whose name contains ``col_id``,
    ``process_name`` and the normalized ``item_name``; ``None`` if no match."""
    norm_item = normalize(item_name)
    candidates = [
        c for c in df.columns
        if isinstance(c, str)
        and col_id in c
        and process_name in c
        and norm_item in normalize(c)
    ]
    return candidates[0] if candidates else None


def _as_path(uploaded):
    """Return a filesystem path for a Gradio upload.

    With ``gr.File(type="filepath")`` the handler receives a plain ``str``;
    older Gradio versions (or ``type="file"``) pass a file-like object with a
    ``.name`` attribute.  The original code only handled the latter and
    crashed with AttributeError on current Gradio — accept both here.
    """
    return uploaded if isinstance(uploaded, str) else uploaded.name


def _risk_level(pred_val, thr):
    """Classify ``pred_val`` against the LL/L/H/HH thresholds of row ``thr``.

    Low-side checks take precedence over high-side checks, mirroring the
    original elif chain; a NaN threshold disables that band.
    """
    ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
    if pd.notna(ll) and pred_val <= ll:
        return "LOW-LOW"
    if pd.notna(l) and pred_val <= l:
        return "LOW"
    if pd.notna(hh) and pred_val >= hh:
        return "HIGH-HIGH"
    if pd.notna(h) and pred_val >= h:
        return "HIGH"
    return "OK"


# --- Forecasting ---
def forecast_process_with_lag(csv_file, excel_file, lag_file, process_name, datetime_str, forecast_minutes):
    """Forecast each important item of ``process_name`` at ``target_time +
    forecast_minutes`` using lagged upstream-process signals as regressors.

    Parameters
    ----------
    csv_file, excel_file, lag_file : str or file-like
        Uploaded sensor CSV (3-row header), threshold workbook, and
        process-to-process time-lag matrix.
    process_name : str
        Value matched against the ``ProcessNo_ProcessName`` threshold column.
    datetime_str : str
        Base timestamp (anything ``pd.to_datetime`` accepts).
    forecast_minutes : int or float
        Horizon in minutes ahead of the base timestamp.

    Returns
    -------
    (DataFrame or None, str, str or None)
        Result table, human-readable summary/error message, and JSON results.
    """
    try:
        # Sensor CSV: 3 header rows form a MultiIndex; first column is the timestamp.
        df = pd.read_csv(_as_path(csv_file), header=[0, 1, 2])
        timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)

        # Flatten the MultiIndex header into "lvl0_lvl1_lvl2" strings.
        def col_to_str(col):
            return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)

        df.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in df.columns
        ]

        # Threshold workbook: normalize the Important flag and coerce limits to numeric.
        thresholds_df = pd.read_excel(_as_path(excel_file))
        thresholds_df["Important"] = (
            thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
        for col in ["LL", "L", "H", "HH"]:
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")

        # Lag matrix: rows = downstream process, columns = upstream process, values = lag minutes.
        lag_matrix = pd.read_excel(_as_path(lag_file), index_col=0)
    except Exception as e:
        return None, f"❌ 入力ファイルの読み込みに失敗しました: {e}", None

    try:
        target_time = pd.to_datetime(datetime_str)
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
    except Exception:
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name)
        & (thresholds_df["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ プロセス {process_name} に重要項目なし", None
    if process_name not in lag_matrix.index:
        return None, f"⚠ タイムラグ表に {process_name} の行がありません", None

    # Keep only upstream processes with a positive lag toward this process.
    lag_row = lag_matrix.loc[process_name].dropna()
    lag_row = lag_row[lag_row > 0]
    if lag_row.empty:
        return None, f"⚠ プロセス {process_name} に正のラグを持つ上流工程がありません", None

    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(df, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue

        # Training window: the 24 hours ending at the base timestamp.
        df_window = df[df["timestamp"] <= target_time].copy()
        df_window = df_window[df_window["timestamp"] >= target_time - pd.Timedelta(hours=24)]
        if df_window.empty:
            continue

        try:
            base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})
        except KeyError:
            continue

        # Join each upstream signal shifted forward by its lag, aligned to the
        # nearest target timestamp.
        merged_df = base_df.copy()
        for up_proc, lag_min in lag_row.items():
            try:
                up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
                for x_col in up_cols:
                    shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                    shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                    shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                    merged_df = pd.merge_asof(
                        merged_df.sort_values("timestamp"),
                        shifted.sort_values("timestamp"),
                        on="timestamp",
                        direction="nearest",
                    )
            except Exception:
                # Best-effort: a failed upstream join should not abort the item.
                continue

        # NaN rows (coerced timestamps, missing sensor values) make
        # LinearRegression raise — drop them before fitting.
        merged_df = merged_df.dropna()
        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue

        try:
            model = LinearRegression().fit(X_all, Y_all)
        except Exception:
            # Non-numeric or degenerate data for this item — skip, don't crash the app.
            continue

        # Build the prediction vector: for each lagged upstream column, take the
        # sample nearest to (forecast_time - lag).
        X_pred = []
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                try:
                    ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                    idx = (df["timestamp"] - ref_time).abs().idxmin()
                    X_pred.append(df.loc[idx, x_col])
                except Exception:
                    continue
        # The join loop and this loop can each skip columns independently; a
        # feature-count mismatch would make predict() raise, so guard it.
        if not X_pred or len(X_pred) != X_all.shape[1]:
            continue

        try:
            pred_val = model.predict(np.asarray([X_pred], dtype=float))[0]
        except Exception:
            continue

        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": _risk_level(pred_val, thr),
            "使用上流工程数": len(lag_row),
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    summary = f"✅ {process_name} の予兆解析完了 ({target_time} → {forecast_time})"
    return result_df, summary, result_json


# --- Gradio UI ---
with gr.Blocks(css="body {overflow-y: scroll;}") as demo:
    gr.Markdown("## 予兆解析アプリ (MCP対応)")
    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイルをアップロード", file_types=[".xlsx"], type="filepath")
        lag_input = gr.File(label="タイムラグファイルをアップロード", file_types=[".xlsx"], type="filepath")
    process_name = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
    datetime_str = gr.Textbox(label="基準日時", value="2025/8/2 0:05")
    forecast_minutes = gr.Number(label="予測時間幅(分)", value=60)
    run_btn = gr.Button("予兆解析を実行")
    result_df = gr.Dataframe(label="予兆解析結果", wrap=True, interactive=False)
    summary_output = gr.Textbox(label="サマリー")
    json_output = gr.Json(label="JSON結果")
    run_btn.click(
        forecast_process_with_lag,
        inputs=[csv_input, excel_input, lag_input, process_name, datetime_str, forecast_minutes],
        outputs=[result_df, summary_output, json_output],
    )

if __name__ == "__main__":
    # USE_MCP=1 launches the app as an MCP server; otherwise serve on all interfaces.
    use_mcp = os.getenv("USE_MCP", "0") == "1"
    if use_mcp:
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)