# 統合版 Gradio アプリ (閾値診断 + 傾向検出 + 予兆解析)
# Unified Gradio app: threshold diagnosis + trend detection + predictive analysis.
import gradio as gr
import pandas as pd
import numpy as np
import json
import os
from sklearn.linear_model import LinearRegression


# --- 共通ユーティリティ ---

def convert_value(v):
    """Coerce numpy scalars to native Python scalars (JSON-serializable)."""
    if hasattr(v, "item"):
        return v.item()
    if isinstance(v, (np.integer, int)):
        return int(v)
    if isinstance(v, (np.floating, float)):
        return float(v)
    return v


def normalize(s):
    """Normalize a label: full-width space -> ASCII space, drop newlines, strip edges."""
    return str(s).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()


def find_matching_column(df, col_id, item_name, process_name):
    """Return the first column whose name contains col_id, process_name and the
    normalized item name; None when nothing matches."""
    norm_item = normalize(item_name)
    candidates = [
        c for c in df.columns
        if isinstance(c, str)
        and col_id in c
        and process_name in c
        and norm_item in normalize(c)
    ]
    return candidates[0] if candidates else None


# --- グローバル変数(全タブで共有) ---
df = None             # sensor time series; first column is "timestamp"
thresholds_df = None  # per-item LL/L/H/HH thresholds + "Important" flag
lag_matrix = None     # process-to-process lag minutes (row = downstream process)


def _file_path(f):
    """Return a filesystem path for a Gradio file input.

    BUGFIX: gr.File(type="filepath") passes a plain str, which has no
    ``.name`` — the original ``f.name`` raised AttributeError and made every
    load fail. Accept both the str form and the legacy tempfile-wrapper form.
    """
    return getattr(f, "name", f)


# --- ファイル読み込み ---

def load_files(csv_file, excel_file, lag_file):
    """Load the CSV time series (3-row header), the Excel threshold table and
    the lag matrix into the shared globals. Returns a status message."""
    global df, thresholds_df, lag_matrix
    try:
        df = pd.read_csv(_file_path(csv_file), header=[0, 1, 2])
        timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)

        # MultiIndex header -> flat "level0_level1_level2" string columns.
        def col_to_str(col):
            return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)

        df.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in df.columns
        ]

        thresholds_df = pd.read_excel(_file_path(excel_file))
        # Excel booleans may arrive as text; map case-insensitively to bool.
        thresholds_df["Important"] = (
            thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
        for col in ["LL", "L", "H", "HH"]:
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")

        lag_matrix = pd.read_excel(_file_path(lag_file), index_col=0)
        return "✅ ファイル読み込み成功"
    except Exception as e:
        return f"❌ ファイル読み込み失敗: {e}"


# --- Tab1: 閾値診断 ---

def judge_status(value, ll, l, h, hh):
    """Classify value against LL/L/H/HH thresholds; NaN thresholds are skipped."""
    if pd.notna(ll) and value < ll:
        return "LOW-LOW"
    elif pd.notna(l) and value < l:
        return "LOW"
    elif pd.notna(hh) and value > hh:
        return "HIGH-HIGH"
    elif pd.notna(h) and value > h:
        return "HIGH"
    else:
        return "OK"


def diagnose_process_range(process_name, datetime_str, window_minutes):
    """Judge every threshold-configured item of *process_name* over the
    *window_minutes* leading up to *datetime_str* and aggregate status counts.

    Returns (all-items df, important-items df, per-important-item df,
    summary text, JSON string); Nones plus a warning message on failure."""
    global df, thresholds_df
    if df is None or thresholds_df is None:
        return None, None, None, "⚠ ファイル未読み込み", None
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None

    proc_thresholds = thresholds_df[thresholds_df["ProcessNo_ProcessName"] == process_name]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None

    status_order = ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"]

    all_results = []
    for _, row in df_window.iterrows():
        for _, thr in proc_thresholds.iterrows():
            col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
            if col_name not in df.columns:
                continue
            value = row[col_name]
            status = judge_status(value, thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
            all_results.append({
                "ColumnID": thr["ColumnID"],
                "ItemName": thr["ItemName"],
                "判定": status,
                "重要項目": bool(thr.get("Important", False)),
                "時刻": str(row["timestamp"]),
            })

    # Guard: with no matching columns, total == 0 would propagate NaN ratios
    # into every table and the summary.
    if not all_results:
        return None, None, None, f"⚠ プロセス {process_name} の該当データが見つかりません。", None

    def _tally(records):
        """Count statuses in *records* and return (counts, percentage) Series."""
        counts = (pd.Series([r["判定"] for r in records])
                  .value_counts()
                  .reindex(status_order, fill_value=0))
        ratios = (counts / len(records) * 100).round(1)
        return counts, ratios

    # --- 全項目集計 ---
    status_counts, status_ratio = _tally(all_results)
    result_df_all = pd.DataFrame({
        "状態": status_counts.index,
        "件数": status_counts.values,
        "割合(%)": status_ratio.values,
    })

    # --- 重要項目全体 ---
    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        status_counts_imp, status_ratio_imp = _tally(important_results)
        result_df_imp = pd.DataFrame({
            "状態": status_counts_imp.index,
            "件数": status_counts_imp.values,
            "割合(%)": status_ratio_imp.values,
        })
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)

    # --- 重要項目ごと ---
    result_per_item = []
    for item in {r["ItemName"] for r in important_results}:
        item_results = [r for r in important_results if r["ItemName"] == item]
        counts_item, ratio_item = _tally(item_results)
        for s, c, r in zip(counts_item.index, counts_item.values, ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": int(c), "割合(%)": float(r)})
    result_df_imp_items = pd.DataFrame(result_per_item)

    # --- サマリー ---
    summary = (
        f"✅ {process_name} の診断完了({start_time} ~ {end_time})\n"
        + "[全項目] " + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()]) + "\n"
        + "[重要項目全体] "
        + (" / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()])
           if not result_df_imp.empty else "対象データなし")
    )

    # --- JSON ---
    json_data = {
        "集計結果": {
            "全項目割合": {k: float(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": ({k: float(v) for k, v in status_ratio_imp.to_dict().items()}
                                if not result_df_imp.empty else {}),
            "重要項目ごと割合": [dict(row) for _, row in result_df_imp_items.iterrows()],
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)

    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json


# --- Tab2: 傾向検出 ---

def detect_trends_with_forecast(process_name, datetime_str, window_minutes,
                                forecast_minutes, downsample_factor):
    """Fit a linear trend to each important item over the past window and
    extrapolate *forecast_minutes* ahead, flagging predicted LL/HH excursions.

    Returns (result DataFrame, status message, JSON string)."""
    global df, thresholds_df
    if df is None or thresholds_df is None:
        return None, "⚠ ファイル未読み込み", None
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= target_time)]
    if df_window.empty:
        return None, "⚠ データなし", None

    # サンプリング間隔を推定 (median timestamp diff)
    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔検出失敗", None
    interval_minutes = interval.total_seconds() / 60

    # --- 粗化処理 (downsample by row striding) ---
    factor = max(int(downsample_factor), 1)
    df_window = df_window.iloc[::factor, :].reset_index(drop=True)
    effective_interval = interval_minutes * factor
    # BUGFIX: duplicate timestamps give a zero median interval; the step
    # division below would raise ZeroDivisionError.
    if effective_interval <= 0:
        return None, "⚠ サンプリング間隔検出失敗", None

    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name)
        & (thresholds_df["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None

    results = []
    for _, thr in proc_thresholds.iterrows():
        col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
        if col_name not in df.columns:
            continue
        series = df_window[col_name].dropna()
        if len(series) < 3:  # need at least 3 points for a meaningful fit
            continue

        x = np.arange(len(series)).reshape(-1, 1)
        y = series.values.reshape(-1, 1)
        model = LinearRegression().fit(x, y)
        slope = model.coef_[0][0]
        last_val = series.iloc[-1]

        forecast_steps = int(forecast_minutes / effective_interval)
        # BUGFIX: the last observed sample sits at index len(series)-1, so the
        # point forecast_minutes ahead is len(series)-1+forecast_steps (the
        # original used len(series)+forecast_steps, overshooting by one step).
        forecast_val = model.predict([[len(series) - 1 + forecast_steps]])[0][0]
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

        risk = "安定"
        if pd.notna(thr.get("LL")) and forecast_val <= thr["LL"]:
            risk = "LL逸脱予測"
        elif pd.notna(thr.get("HH")) and forecast_val >= thr["HH"]:
            risk = "HH逸脱予測"

        results.append({
            "ItemName": thr["ItemName"],
            "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3),
            "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": round(effective_interval, 2),
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出完了", result_json


# --- Tab3: 予兆解析 ---

def forecast_process_with_lag(process_name, datetime_str, forecast_minutes, downsample_factor=3):
    """Predict each important item of *process_name* at target_time +
    *forecast_minutes* via multiple linear regression on lag-shifted upstream
    signals (lags taken from the lag matrix), then judge threshold risk.

    Returns (result DataFrame, status message, JSON string)."""
    global df, thresholds_df, lag_matrix
    if df is None or thresholds_df is None or lag_matrix is None:
        return None, "⚠ ファイル未読み込み", None
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name)
        & (thresholds_df["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    if process_name not in lag_matrix.index:
        return None, f"⚠ {process_name} のラグ行なし", None

    lag_row = lag_matrix.loc[process_name].dropna()
    lag_row = lag_row[lag_row > 0]  # 正のラグのみ (only upstream processes that lead this one)
    if lag_row.empty:
        return None, f"⚠ {process_name} に正のラグなし", None

    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(df, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue

        # 学習データ(直近24h)
        df_window = df[(df["timestamp"] >= target_time - pd.Timedelta(hours=24))
                       & (df["timestamp"] <= target_time)].copy()
        if df_window.empty:
            continue
        base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})

        # === 粗化処理 (resample y onto a coarser, regular grid) ===
        dt = df_window["timestamp"].diff().median()
        base_min = max(int(dt.total_seconds() // 60), 1) if pd.notna(dt) else 1
        new_interval = max(base_min * int(downsample_factor), 1)
        # "min" replaces the "T" offset alias deprecated in pandas 2.x.
        df_down = (base_df.set_index("timestamp")
                   .resample(f"{new_interval}min").mean()
                   .dropna()
                   .reset_index())

        merged_df = df_down.copy()
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                # Shift the upstream signal forward by its lag so its rows
                # align with the downstream y time base.
                shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                merged_df = pd.merge_asof(
                    merged_df.sort_values("timestamp"),
                    shifted.sort_values("timestamp"),
                    on="timestamp",
                    direction="nearest",
                )

        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue

        # 重回帰 (multiple linear regression on lag-aligned upstream signals)
        model = LinearRegression().fit(X_all, Y_all)

        # 未来予測 — the feature vector must follow the same (up_proc, x_col)
        # iteration order used when training above.
        X_pred = []
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                idx = (df["timestamp"] - ref_time).abs().idxmin()
                X_pred.append(df.loc[idx, x_col])
        if not X_pred:
            continue
        pred_val = model.predict([X_pred])[0]

        # 閾値リスク判定
        ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
        risk = "OK"
        if pd.notna(ll) and pred_val <= ll:
            risk = "LOW-LOW"
        elif pd.notna(l) and pred_val <= l:
            risk = "LOW"
        elif pd.notna(hh) and pred_val >= hh:
            risk = "HIGH-HIGH"
        elif pd.notna(h) and pred_val >= h:
            risk = "HIGH"

        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": new_interval,
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, f"✅ {process_name} の予兆解析完了", result_json


# --- Gradio UI ---
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    gr.Markdown("## 統合トレンド解析アプリ (MCP対応)")

    with gr.Row():
        csv_input = gr.File(label="CSVファイル", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイル", file_types=[".xlsx"], type="filepath")
        lag_input = gr.File(label="ラグファイル", file_types=[".xlsx"], type="filepath")
    load_btn = gr.Button("ファイル読み込み")
    load_status = gr.Textbox(label="読み込み結果")

    with gr.Tabs():
        with gr.Tab("閾値診断"):
            process_name1 = gr.Textbox(label="プロセス名")
            datetime_str1 = gr.Textbox(label="診断基準日時")
            window_minutes1 = gr.Number(label="さかのぼる時間幅(分)", value=60)
            run_btn1 = gr.Button("診断実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")
            run_btn1.click(
                diagnose_process_range,
                inputs=[process_name1, datetime_str1, window_minutes1],
                outputs=[result_df_all, result_df_imp, result_df_imp_items,
                         summary_output, json_output],
            )

        with gr.Tab("傾向検出"):
            process_name2 = gr.Textbox(label="プロセス名")
            datetime_str2 = gr.Textbox(label="基準日時")
            window_minutes2 = gr.Number(label="過去の時間幅(分)", value=60)
            forecast_minutes2 = gr.Number(label="未来予測時間幅(分)", value=60)
            downsample_factor2 = gr.Slider(label="粗化倍率", minimum=1, maximum=10, step=1, value=3)
            run_btn2 = gr.Button("傾向検出実行")
            result_df2 = gr.Dataframe(label="傾向+予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")
            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[process_name2, datetime_str2, window_minutes2,
                        forecast_minutes2, downsample_factor2],
                outputs=[result_df2, summary_output2, json_output2],
            )

        with gr.Tab("予兆解析"):
            process_name3 = gr.Textbox(label="プロセス名")
            datetime_str3 = gr.Textbox(label="基準日時")
            forecast_minutes3 = gr.Number(label="未来予測時間幅(分)", value=60)
            downsample_factor3 = gr.Slider(1, 10, value=3, step=1, label="粗化倍率(サンプリング間引き)")
            run_btn3 = gr.Button("予兆解析実行")
            result_df3 = gr.Dataframe(label="予兆解析結果")
            summary_output3 = gr.Textbox(label="サマリー")
            json_output3 = gr.Json(label="JSON結果")
            run_btn3.click(
                forecast_process_with_lag,
                inputs=[process_name3, datetime_str3, forecast_minutes3, downsample_factor3],
                outputs=[result_df3, summary_output3, json_output3],
            )

    load_btn.click(load_files, inputs=[csv_input, excel_input, lag_input], outputs=[load_status])


if __name__ == "__main__":
    # USE_MCP=1 switches Gradio into MCP-server mode; otherwise serve normally.
    use_mcp = os.getenv("USE_MCP", "0") == "1"
    if use_mcp:
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)