# 傾向検出アプリ(Gradio版・MCP対応) import gradio as gr import pandas as pd import numpy as np from sklearn.linear_model import LinearRegression import json import tempfile import os # --- 傾向検出+未来予測関数 --- def detect_trends_with_forecast(process_name, datetime_str, window_minutes, forecast_minutes, csv_file, excel_file): try: csv_path = csv_file excel_path = excel_file # CSV読み込み(3行ヘッダー) df = pd.read_csv(csv_path, header=[0, 1, 2]) timestamp_col = df.iloc[:, 0] df = df.drop(df.columns[0], axis=1) df.insert(0, "timestamp", timestamp_col) df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce") # 閾値テーブル thresholds_df = pd.read_excel(excel_path) thresholds_df["Important"] = thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False}) for col in ["LL", "L", "H", "HH"]: if col in thresholds_df.columns: thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce") # --- 時間範囲 --- target_time = pd.to_datetime(datetime_str) start_time = target_time - pd.Timedelta(minutes=window_minutes) end_time = target_time df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)] if df_window.empty: return None, "⚠ 指定時間幅にデータなし", None # サンプリング間隔 interval = df_window["timestamp"].diff().median() if pd.isna(interval) or interval == pd.Timedelta(0): return None, "⚠ サンプリング間隔を検出できません", None interval_minutes = interval.total_seconds() / 60 # 閾値:対象プロセスかつ重要項目 proc_thresholds = thresholds_df[(thresholds_df["ProcessNo_ProcessName"] == process_name) & (thresholds_df["Important"] == True)] if proc_thresholds.empty: return None, f"⚠ プロセス {process_name} の重要項目なし", None results = [] for _, thr in proc_thresholds.iterrows(): col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"]) if col_tuple not in df.columns: continue series = df_window[col_tuple].dropna() if len(series) < 3: continue # 線形回帰 x = np.arange(len(series)).reshape(-1, 1) y = series.values.reshape(-1, 1) model = LinearRegression().fit(x, y) slope = model.coef_[0][0] last_val = series.iloc[-1] n = len(series) # 未来予測 forecast_steps = max(1, int(round(forecast_minutes / interval_minutes))) forecast_index = n + forecast_steps forecast_val = model.predict([[forecast_index]])[0][0] forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes) l, ll, h, hh = thr.get("L"), thr.get("LL"), thr.get("H"), thr.get("HH") # 現在の傾向 status = "安定" if slope < 0 and pd.notna(ll): if last_val > ll: status = "LL接近下降傾向" elif last_val <= ll: status = "LL逸脱下降傾向" if slope > 0 and pd.notna(hh): if last_val < hh: status = "HH接近上昇傾向" elif last_val >= hh: status = "HH逸脱上昇傾向" # 未来予測の逸脱 forecast_status = "安定" if pd.notna(ll) and forecast_val <= ll: forecast_status = "LL逸脱予測" elif pd.notna(hh) and forecast_val >= hh: forecast_status = "HH逸脱予測" results.append({ "ItemName": thr["ItemName"], "傾向": status, "傾き": float(round(slope, 4)), "最終値": float(round(float(last_val), 3)) if pd.notna(last_val) else None, "予測値": float(round(float(forecast_val), 3)), "予測時刻": str(forecast_time), "予測傾向": forecast_status, "サンプリング間隔(分)": float(interval_minutes), "LL": None if pd.isna(ll) else float(ll), "L": None if pd.isna(l) else float(l), "H": None if pd.isna(h) else float(h), "HH": None if pd.isna(hh) else float(hh) }) result_df = pd.DataFrame(results) # JSON保存(ファイルパスを返す) result_json = json.dumps(results, ensure_ascii=False, indent=2) fd, tmp_path = tempfile.mkstemp(suffix=".json", prefix="trend_result_") os.close(fd) with open(tmp_path, "w", encoding="utf-8") as f: f.write(result_json) summary = f"✅ {process_name} の傾向検出+未来予測完了({start_time}~{end_time}, 予測+{forecast_minutes}分)" return result_df, summary, tmp_path except Exception as e: return None, f"❌ エラー: {str(e)}", None # --- Gradio UI --- with gr.Blocks( css=".gradio-container {overflow-y: auto !important; height: 100vh;}" ) as demo: gr.Markdown("## 傾向検出アプリ(重要項目ごとの詳細+閾値予測)") with gr.Row(): csv_input = gr.File(label="CSVファイルをアップロード", type="filepath") excel_input = gr.File(label="閾値テーブル (Excel)", type="filepath") with gr.Row(): process_name = gr.Textbox(label="プロセス名", placeholder="例: E018-A012_除害RO") datetime_str = gr.Textbox(label="日時 (例: 2025/8/1 1:05)") with gr.Row(): window_minutes = gr.Number(label="参照時間幅(分)", value=60) forecast_minutes = gr.Number(label="未来予測時間幅(分)", value=60) run_btn = gr.Button("実行") with gr.Row(): result_table = gr.Dataframe(label="傾向+未来予測結果", wrap=True) summary_output = gr.Textbox(label="サマリー") json_download = gr.File(label="JSON結果ダウンロード") run_btn.click( fn=detect_trends_with_forecast, inputs=[process_name, datetime_str, window_minutes, forecast_minutes, csv_input, excel_input], outputs=[result_table, summary_output, json_download] ) if __name__ == "__main__": # Web UI と MCP を同時に有効化 demo.launch( server_name="0.0.0.0", mcp_server=True, # MCP サーバー起動 share=False, # Hugging Face なら不要 ssr_mode=False # SSR は無効に )