# Threshold diagnosis app: Gradio UI with optional MCP server mode
import json
import os

import gradio as gr
import numpy as np
import pandas as pd

# Status labels in display order; every summary table is reindexed to this
# order so that statuses with zero occurrences still appear.
_STATUS_ORDER = ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"]


# --- Utilities ---
def judge_status(value, ll, l, h, hh):
    """Classify ``value`` against up to four limits.

    Limits are checked in the order LL, L, HH, H and the first violated one
    decides the label. A limit that is NaN/None (``pd.notna`` is false) is
    skipped.

    Returns one of "LOW-LOW", "LOW", "HIGH-HIGH", "HIGH" or "OK".

    NOTE(review): a NaN ``value`` fails every comparison and is therefore
    reported as "OK" -- confirm that this is the intended handling of missing
    readings.
    """
    if pd.notna(ll) and value < ll:
        return "LOW-LOW"
    if pd.notna(l) and value < l:
        return "LOW"
    if pd.notna(hh) and value > hh:
        return "HIGH-HIGH"
    if pd.notna(h) and value > h:
        return "HIGH"
    return "OK"


def convert_value(v):
    """Convert a NumPy/pandas scalar to a plain Python value for JSON output.

    Anything exposing ``.item()`` is unwrapped through it; remaining floats and
    ints are passed through ``float``/``int``; every other value is returned
    unchanged.
    """
    if hasattr(v, "item"):
        return v.item()
    if isinstance(v, (np.floating, float)):
        return float(v)
    if isinstance(v, (np.integer, int)):
        return int(v)
    return v


def _as_path(upload):
    """Return the path of an uploaded file.

    Accepts a plain path string as well as an object that exposes the path as
    its ``.name`` attribute.
    """
    return getattr(upload, "name", upload)


def _summarize_statuses(statuses):
    """Tally a non-empty list of status labels.

    Returns ``(counts, ratio, table)``: the per-status counts, the percentages
    rounded to one decimal, and a DataFrame holding both, all in
    ``_STATUS_ORDER``.
    """
    counts = pd.Series(statuses).value_counts().reindex(_STATUS_ORDER, fill_value=0)
    ratio = (counts / len(statuses) * 100).round(1)
    table = pd.DataFrame({
        "状態": counts.index,
        "件数": counts.values,
        "割合(%)": ratio.values,
    })
    return counts, ratio, table


# --- Diagnosis ---
def diagnose_process_range(csv_file, excel_file, process_name, datetime_str, window_minutes):
    """Judge every measurement of one process inside a time window.

    The CSV is read with a three-row header and its first column is used as
    the timestamp. The Excel sheet supplies one threshold row per column,
    matched on (ColumnID, ItemName, ProcessNo_ProcessName). Rows whose
    timestamp lies in ``[datetime_str - window_minutes, datetime_str]`` are
    classified with ``judge_status``.

    Args:
        csv_file: Uploaded measurement CSV (path string or object with ``.name``).
        excel_file: Uploaded threshold workbook (path string or object with ``.name``).
        process_name: Value to match in the ``ProcessNo_ProcessName`` column.
        datetime_str: End of the window, parsed with ``pd.to_datetime``.
        window_minutes: Length of the window in minutes.

    Returns:
        A 5-tuple ``(all_items_table, important_table, important_per_item_table,
        summary_text, json_text)``. On any input problem the three tables and
        the JSON are ``None`` and ``summary_text`` carries the message.
    """
    try:
        df = pd.read_csv(_as_path(csv_file), header=[0, 1, 2])
        timestamp_col = df.iloc[:, 0]
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")

        thresholds_df = pd.read_excel(_as_path(excel_file))
        if "Important" in thresholds_df.columns:
            # Only an explicit TRUE marks a row as important. Mapping through a
            # dict would leave NaN for blank/unknown cells, and bool(NaN) is
            # True, which silently flagged such rows as important.
            thresholds_df["Important"] = (
                thresholds_df["Important"].astype(str).str.strip().str.upper() == "TRUE"
            )
        for col in ["LL", "L", "H", "HH"]:
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")
    except Exception as e:
        return None, None, None, f"❌ 入力ファイルの読み込みに失敗しました: {e}", None

    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    try:
        window = pd.Timedelta(minutes=window_minutes)
    except (TypeError, ValueError):
        # e.g. the number field was cleared and arrived as None
        return None, None, None, f"⚠ 入力した時間幅 {window_minutes} が無効です。", None

    start_time = target_time - window
    end_time = target_time
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None

    proc_thresholds = thresholds_df[thresholds_df["ProcessNo_ProcessName"] == process_name]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None

    # Resolve each threshold row to its data column once, instead of once per
    # measurement row. Thresholds without a matching CSV column are skipped.
    checks = []
    for _, thr in proc_thresholds.iterrows():
        col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if col_tuple in df.columns:
            checks.append((col_tuple, thr))

    all_results = []
    for _, row in df_window.iterrows():
        for col_tuple, thr in checks:
            status = judge_status(
                row[col_tuple], thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
            )
            all_results.append({
                "ColumnID": thr["ColumnID"],
                "ItemName": thr["ItemName"],
                "判定": status,
                "重要項目": bool(thr.get("Important", False)),
                "時刻": str(row["timestamp"]),
            })

    if not all_results:
        # Without this guard the ratios below would be 0/0 = NaN and the JSON
        # output would contain invalid NaN tokens.
        return None, None, None, f"⚠ プロセス {process_name} の対象列がデータに見つかりません。", None

    # All items
    _, status_ratio, result_df_all = _summarize_statuses([r["判定"] for r in all_results])

    # Important items, overall
    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        _, status_ratio_imp, result_df_imp = _summarize_statuses(
            [r["判定"] for r in important_results]
        )
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)

    # Important items, one group per distinct item name. Grouping first keeps
    # each item's rows from being repeated once per measurement.
    verdicts_by_item = {}
    for r in important_results:
        verdicts_by_item.setdefault(r["ItemName"], []).append(r["判定"])

    result_per_item = []
    for item, verdicts in verdicts_by_item.items():
        counts_item, ratio_item, _ = _summarize_statuses(verdicts)
        for s, c, pct in zip(counts_item.index, counts_item.values, ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": c, "割合(%)": pct})
    result_df_imp_items = pd.DataFrame(
        result_per_item, columns=["ItemName", "状態", "件数", "割合(%)"]
    )

    summary = (
        f"✅ {process_name} の診断完了({start_time} ~ {end_time})\n"
        + "[全項目] "
        + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()])
        + "\n"
        + "[重要項目全体] "
        + (
            " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()])
            if not result_df_imp.empty
            else "対象データなし"
        )
    )

    json_data = {
        "集計結果": {
            "全項目割合": {k: convert_value(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": (
                {k: convert_value(v) for k, v in status_ratio_imp.to_dict().items()}
                if not result_df_imp.empty
                else {}
            ),
            "重要項目ごと割合": [
                {k: convert_value(v) for k, v in row.items()}
                for _, row in result_df_imp_items.iterrows()
            ],
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)

    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json


# --- Gradio UI ---
with gr.Blocks() as demo:
    gr.Markdown("## 閾値診断アプリ (MCP対応)")
    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイルをアップロード", file_types=[".xlsx"], type="filepath")
    process_name = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
    datetime_str = gr.Textbox(label="診断基準日時", value="2025/8/1 1:05")
    window_minutes = gr.Number(label="さかのぼる時間幅(分)", value=60)
    run_btn = gr.Button("診断を実行")

    result_df_all = gr.Dataframe(label="全項目の状態集計結果")
    result_df_imp = gr.Dataframe(label="重要項目全体の状態集計結果")
    result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計結果")
    summary_output = gr.Textbox(label="サマリー")
    json_output = gr.Json(label="JSON集計結果")

    run_btn.click(
        diagnose_process_range,
        inputs=[csv_input, excel_input, process_name, datetime_str, window_minutes],
        outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output, json_output],
    )


if __name__ == "__main__":
    # USE_MCP=1 launches with the MCP server enabled; otherwise serve the
    # plain web UI on all interfaces.
    use_mcp = os.getenv("USE_MCP", "0") == "1"
    if use_mcp:
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)