# 統合版 Gradio アプリ (閾値診断 + 傾向検出 + 予兆解析)
# 741c22e: Enhance detect_trends_with_forecast by adding a downsample_factor
# parameter for improved data handling and forecasting accuracy. Adjusted the
# interval calculations, updated the Gradio interface to accept a downsampling
# factor, and extended the output structure to include the effective interval.
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| import json | |
| import os | |
| from sklearn.linear_model import LinearRegression | |
| # --- 共通ユーティリティ --- | |
def convert_value(v):
    """Coerce a numpy scalar (or anything exposing ``.item()``) to the
    equivalent native Python value; other objects pass through unchanged."""
    if hasattr(v, "item"):
        # numpy scalars expose .item(), which unwraps to the native type.
        return v.item()
    for accepted, cast in (((np.integer, int), int), ((np.floating, float), float)):
        if isinstance(v, accepted):
            return cast(v)
    return v
def normalize(s):
    """Normalise a column label: turn full-width spaces into ASCII spaces,
    drop newlines/carriage returns, and trim surrounding whitespace."""
    cleaned = str(s)
    for old, new in (("\u3000", " "), ("\n", ""), ("\r", "")):
        cleaned = cleaned.replace(old, new)
    return cleaned.strip()
def find_matching_column(df, col_id, item_name, process_name):
    """Return the first string column of *df* that contains both *col_id* and
    *process_name* verbatim and the normalised *item_name* after normalising
    the column label itself; ``None`` when no column matches."""
    norm_item = normalize(item_name)
    for col in df.columns:
        if not isinstance(col, str):
            continue
        if col_id in col and process_name in col and norm_item in normalize(col):
            return col
    return None
# --- Global state shared by every tab (populated by load_files) ---
df = None            # time-series DataFrame loaded from the CSV (flattened headers)
thresholds_df = None # per-item threshold table (LL/L/H/HH + Important flag) from Excel
lag_matrix = None    # process-to-process lag matrix (minutes) from Excel
| # --- ファイル読み込み --- | |
# --- File loading (shared by all tabs) ---
def load_files(csv_file, excel_file, lag_file):
    """Load the CSV time-series, the Excel threshold table and the Excel lag
    matrix into the module-level globals shared by every tab.

    Each argument may be either a Gradio file wrapper (exposing ``.name``) or
    a plain path string: ``gr.File(type="filepath")`` — as used by this app's
    UI — passes ``str``, while older Gradio versions pass a tempfile wrapper.
    The original ``csv_file.name`` therefore failed with AttributeError on
    string paths; ``getattr(f, "name", f)`` accepts both.

    Returns a human-readable status message (✅ on success, ❌ on failure).
    """
    global df, thresholds_df, lag_matrix

    def _path(f):
        # Accept both a file wrapper (has .name) and a bare path string.
        return getattr(f, "name", f)

    try:
        df = pd.read_csv(_path(csv_file), header=[0, 1, 2])
        timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)

        # Flatten the 3-level MultiIndex header into "level1_level2_level3".
        def col_to_str(col):
            return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)

        df.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in df.columns
        ]

        thresholds_df = pd.read_excel(_path(excel_file))
        # Normalise the Important flag ("TRUE"/"FALSE" text in Excel) to real booleans.
        thresholds_df["Important"] = thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        for col in ["LL", "L", "H", "HH"]:
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")

        lag_matrix = pd.read_excel(_path(lag_file), index_col=0)
        return "✅ ファイル読み込み成功"
    except Exception as e:
        return f"❌ ファイル読み込み失敗: {e}"
| # --- Tab1: 閾値診断 --- | |
def judge_status(value, ll, l, h, hh):
    """Classify *value* against the four thresholds.

    Priority order mirrors severity: LOW-LOW, LOW, HIGH-HIGH, HIGH; a
    threshold that is NaN/None is skipped.  Anything else is "OK".
    """
    rules = (
        (ll, lambda v, t: v < t, "LOW-LOW"),
        (l, lambda v, t: v < t, "LOW"),
        (hh, lambda v, t: v > t, "HIGH-HIGH"),
        (h, lambda v, t: v > t, "HIGH"),
    )
    for limit, breached, label in rules:
        if pd.notna(limit) and breached(value, limit):
            return label
    return "OK"
def diagnose_process_range(process_name, datetime_str, window_minutes):
    """Tab 1: threshold diagnosis.

    Judge every configured item of *process_name* against its LL/L/H/HH
    thresholds for each sample in the window
    [datetime_str - window_minutes, datetime_str] and aggregate the verdicts.

    Returns a 5-tuple:
        (all-items summary DataFrame,
         important-items summary DataFrame,
         per-important-item summary DataFrame,
         human-readable summary string,
         JSON string of the aggregates).
    On validation failure the DataFrames and JSON are None and the 4th
    element carries the warning message.
    """
    global df, thresholds_df
    if df is None or thresholds_df is None:
        return None, None, None, "⚠ ファイル未読み込み", None
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None
    proc_thresholds = thresholds_df[thresholds_df["ProcessNo_ProcessName"] == process_name]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None
    all_results = []
    for _, row in df_window.iterrows():
        for _, thr in proc_thresholds.iterrows():
            col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
            if col_name not in df.columns:
                continue
            value = row[col_name]
            status = judge_status(value, thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
            all_results.append({
                "ColumnID": thr["ColumnID"],
                "ItemName": thr["ItemName"],
                "判定": status,
                "重要項目": bool(thr.get("Important", False)),
                "時刻": str(row["timestamp"])
            })
    # Guard: if no threshold row matched an existing column, the division by
    # len(all_results) below would be 0/0 and yield all-NaN percentages.
    if not all_results:
        return None, None, None, "⚠ 対象項目のデータが見つかりません。", None
    # --- Aggregate over all items ---
    total = len(all_results)
    status_counts = pd.Series([r["判定"] for r in all_results]).value_counts().reindex(
        ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
    )
    status_ratio = (status_counts / total * 100).round(1)
    result_df_all = pd.DataFrame({"状態": status_counts.index, "件数": status_counts.values, "割合(%)": status_ratio.values})
    # --- Aggregate over important items only ---
    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        total_imp = len(important_results)
        status_counts_imp = pd.Series([r["判定"] for r in important_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_imp = (status_counts_imp / total_imp * 100).round(1)
        result_df_imp = pd.DataFrame({"状態": status_counts_imp.index, "件数": status_counts_imp.values, "割合(%)": status_ratio_imp.values})
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)
    # --- Per important item ---
    result_per_item = []
    for item in set([r["ItemName"] for r in important_results]):
        item_results = [r for r in important_results if r["ItemName"] == item]
        total_item = len(item_results)
        status_counts_item = pd.Series([r["判定"] for r in item_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_item = (status_counts_item / total_item * 100).round(1)
        for s, c, r in zip(status_counts_item.index, status_counts_item.values, status_ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": int(c), "割合(%)": float(r)})
    result_df_imp_items = pd.DataFrame(result_per_item)
    # --- Human-readable summary ---
    summary = (
        f"✅ {process_name} の診断完了({start_time} ~ {end_time})\n"
        + "[全項目] " + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()]) + "\n"
        + "[重要項目全体] " + (" / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()]) if not result_df_imp.empty else "対象データなし")
    )
    # --- JSON payload for the MCP/JSON output component ---
    json_data = {
        "集計結果": {
            "全項目割合": {k: float(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": {k: float(v) for k, v in status_ratio_imp.to_dict().items()} if not result_df_imp.empty else {},
            "重要項目ごと割合": [dict(row) for _, row in result_df_imp_items.iterrows()]
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)
    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json
| # --- Tab2: 傾向検出 --- | |
# --- Tab2: trend detection ---
def detect_trends_with_forecast(process_name, datetime_str, window_minutes, forecast_minutes, downsample_factor):
    """Tab 2: fit a linear trend to each important item of *process_name*
    over the past *window_minutes* and extrapolate *forecast_minutes* ahead.

    *downsample_factor* thins the window to every Nth sample before fitting;
    the resulting effective sampling interval (minutes) is reported per item.

    Returns (result DataFrame | None, status message, JSON string | None).
    """
    global df, thresholds_df
    if df is None or thresholds_df is None:
        return None, "⚠ ファイル未読み込み", None
    target_time = pd.to_datetime(datetime_str)
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= target_time)]
    if df_window.empty:
        return None, "⚠ データなし", None
    # Estimate the native sampling interval from the median timestamp delta.
    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔検出失敗", None
    interval_minutes = interval.total_seconds() / 60
    # --- Downsampling (clamp to >=1: a slice step of 0 would raise) ---
    factor = max(int(downsample_factor), 1)
    df_window = df_window.iloc[::factor, :].reset_index(drop=True)
    effective_interval = interval_minutes * factor
    if effective_interval <= 0:
        # Duplicate timestamps yield a zero interval; the forecast-step
        # division below would raise ZeroDivisionError.
        return None, "⚠ サンプリング間隔検出失敗", None
    proc_thresholds = thresholds_df[(thresholds_df["ProcessNo_ProcessName"] == process_name) &
                                    (thresholds_df["Important"] == True)]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    results = []
    for _, thr in proc_thresholds.iterrows():
        col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
        if col_name not in df.columns:
            continue
        series = df_window[col_name].dropna()
        if len(series) < 3:
            continue
        x = np.arange(len(series)).reshape(-1, 1)
        y = series.values.reshape(-1, 1)
        model = LinearRegression().fit(x, y)
        slope = model.coef_[0][0]
        last_val = series.iloc[-1]
        forecast_steps = int(forecast_minutes / effective_interval)
        # The last observed sample sits at x = len(series) - 1, so the value
        # forecast_minutes ahead lies at that index plus forecast_steps
        # (the previous len(series) + steps overshot by one sample).
        forecast_val = model.predict([[len(series) - 1 + forecast_steps]])[0][0]
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
        risk = "安定"
        if pd.notna(thr.get("LL")) and forecast_val <= thr["LL"]:
            risk = "LL逸脱予測"
        elif pd.notna(thr.get("HH")) and forecast_val >= thr["HH"]:
            risk = "HH逸脱予測"
        results.append({
            "ItemName": thr["ItemName"],
            "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3),
            "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": round(effective_interval, 2)
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出完了", result_json
| # --- Tab3: 予兆解析 --- | |
# --- Tab3: predictive analysis with upstream lags ---
def forecast_process_with_lag(process_name, datetime_str, forecast_minutes, downsample_factor=3):
    """Tab 3: forecast each important item of *process_name* via multiple
    linear regression on lag-shifted upstream process signals.

    Trains on the 24 h window ending at *datetime_str* (resampled by
    *downsample_factor* × the native interval), then predicts the value at
    datetime_str + forecast_minutes and classifies it against the thresholds.

    Returns (result DataFrame | None, status message, JSON string | None).
    """
    global df, thresholds_df, lag_matrix
    if df is None or thresholds_df is None or lag_matrix is None:
        return None, "⚠ ファイル未読み込み", None
    # Validate the input timestamp like Tab 1 does instead of raising.
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name) &
        (thresholds_df["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    if process_name not in lag_matrix.index:
        return None, f"⚠ {process_name} のラグ行なし", None
    lag_row = lag_matrix.loc[process_name].dropna()
    lag_row = lag_row[lag_row > 0]  # only upstream (positive) lags are usable
    if lag_row.empty:
        return None, f"⚠ {process_name} に正のラグなし", None
    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(df, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue
        # Training data: the most recent 24 hours.
        df_window = df[(df["timestamp"] >= target_time - pd.Timedelta(hours=24)) &
                       (df["timestamp"] <= target_time)].copy()
        if df_window.empty:
            continue
        base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})
        # === Downsample the target to a coarser, regular grid ===
        dt = df_window["timestamp"].diff().median()
        if pd.notna(dt):
            base_min = max(int(dt.total_seconds() // 60), 1)
        else:
            base_min = 1
        new_interval = max(base_min * int(downsample_factor), 1)
        # "min" replaces the "T" alias deprecated in pandas 2.x.
        df_down = (base_df.set_index("timestamp")
                          .resample(f"{new_interval}min").mean()
                          .dropna()
                          .reset_index())
        merged_df = df_down.copy()
        # Attach each upstream column shifted forward by its lag, matched to
        # the nearest downsampled timestamp.
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                merged_df = pd.merge_asof(
                    merged_df.sort_values("timestamp"),
                    shifted.sort_values("timestamp"),
                    on="timestamp",
                    direction="nearest"
                )
        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue
        # Multiple linear regression on the lagged upstream features.
        model = LinearRegression().fit(X_all, Y_all)
        # Build the prediction row in the same feature order as training.
        X_pred = []
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                idx = (df["timestamp"] - ref_time).abs().idxmin()
                X_pred.append(df.loc[idx, x_col])
        if not X_pred:
            continue
        pred_val = model.predict([X_pred])[0]
        # Classify the forecast against the thresholds.
        ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
        risk = "OK"
        if pd.notna(ll) and pred_val <= ll:
            risk = "LOW-LOW"
        elif pd.notna(l) and pred_val <= l:
            risk = "LOW"
        elif pd.notna(hh) and pred_val >= hh:
            risk = "HIGH-HIGH"
        elif pd.notna(h) and pred_val >= h:
            risk = "HIGH"
        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": new_interval
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, f"✅ {process_name} の予兆解析完了", result_json
| # --- Gradio UI --- | |
# --- Gradio UI: three tabs sharing the files loaded at the top ---
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    gr.Markdown("## 統合トレンド解析アプリ (MCP対応)")
    # Shared inputs: data CSV, threshold Excel, lag-matrix Excel.
    # NOTE(review): type="filepath" makes Gradio pass plain path strings, so
    # load_files must accept str (not only objects with .name) — confirm
    # against the installed Gradio version.
    with gr.Row():
        csv_input = gr.File(label="CSVファイル", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイル", file_types=[".xlsx"], type="filepath")
        lag_input = gr.File(label="ラグファイル", file_types=[".xlsx"], type="filepath")
    load_btn = gr.Button("ファイル読み込み")
    load_status = gr.Textbox(label="読み込み結果")
    with gr.Tabs():
        # Tab 1: threshold diagnosis over a look-back window.
        with gr.Tab("閾値診断"):
            process_name1 = gr.Textbox(label="プロセス名")
            datetime_str1 = gr.Textbox(label="診断基準日時")
            window_minutes1 = gr.Number(label="さかのぼる時間幅(分)", value=60)
            run_btn1 = gr.Button("診断実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")
            run_btn1.click(
                diagnose_process_range,
                inputs=[process_name1, datetime_str1, window_minutes1],
                outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output, json_output]
            )
        # Tab 2: linear-trend detection + extrapolation, with downsampling.
        with gr.Tab("傾向検出"):
            process_name2 = gr.Textbox(label="プロセス名")
            datetime_str2 = gr.Textbox(label="基準日時")
            window_minutes2 = gr.Number(label="過去の時間幅(分)", value=60)
            forecast_minutes2 = gr.Number(label="未来予測時間幅(分)", value=60)
            downsample_factor2 = gr.Slider(label="粗化倍率", minimum=1, maximum=10, step=1, value=3)
            run_btn2 = gr.Button("傾向検出実行")
            result_df2 = gr.Dataframe(label="傾向+予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")
            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[process_name2, datetime_str2, window_minutes2, forecast_minutes2, downsample_factor2],
                outputs=[result_df2, summary_output2, json_output2]
            )
        # Tab 3: lag-based predictive analysis from upstream processes.
        with gr.Tab("予兆解析"):
            process_name3 = gr.Textbox(label="プロセス名")
            datetime_str3 = gr.Textbox(label="基準日時")
            forecast_minutes3 = gr.Number(label="未来予測時間幅(分)", value=60)
            downsample_factor3 = gr.Slider(1, 10, value=3, step=1, label="粗化倍率(サンプリング間引き)")
            run_btn3 = gr.Button("予兆解析実行")
            result_df3 = gr.Dataframe(label="予兆解析結果")
            summary_output3 = gr.Textbox(label="サマリー")
            json_output3 = gr.Json(label="JSON結果")
            run_btn3.click(
                forecast_process_with_lag,
                inputs=[process_name3, datetime_str3, forecast_minutes3, downsample_factor3],
                outputs=[result_df3, summary_output3, json_output3]
            )
    load_btn.click(load_files, inputs=[csv_input, excel_input, lag_input], outputs=[load_status])
if __name__ == "__main__":
    # USE_MCP=1 launches the app as an MCP server; otherwise serve the plain
    # web UI on all interfaces without a public share link.
    use_mcp = os.getenv("USE_MCP", "0") == "1"
    if use_mcp:
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)