# Upstream commit 741c22e (Ken-INOUE): "Enhance detect_trends_with_forecast
# function by adding downsample_factor parameter for improved data handling and
# forecasting accuracy. Adjusted interval calculations and updated Gradio
# interface to allow user input for downsampling. Improved output structure to
# include effective interval in results."
# Integrated Gradio app (threshold diagnosis + trend detection + early-warning analysis)
import gradio as gr
import pandas as pd
import numpy as np
import json
import os
from sklearn.linear_model import LinearRegression
# --- 共通ユーティリティ ---
def convert_value(v):
    """Coerce numpy scalars to plain Python values for JSON serialization."""
    # numpy scalar types expose .item(), which unwraps to the native equivalent.
    if hasattr(v, "item"):
        return v.item()
    for kinds, caster in (((np.integer, int), int), ((np.floating, float), float)):
        if isinstance(v, kinds):
            return caster(v)
    return v
def normalize(s):
    """Normalize a header cell: full-width spaces -> ASCII, drop CR/LF, trim edges."""
    # Single C-level pass instead of three chained .replace() calls.
    table = str.maketrans({"\u3000": " ", "\n": None, "\r": None})
    return str(s).translate(table).strip()
def find_matching_column(df, col_id, item_name, process_name):
    """Return the first string column of *df* containing col_id, process_name,
    and the normalized item_name; None when no column matches."""
    norm_item = normalize(item_name)
    matches = (
        c for c in df.columns
        if isinstance(c, str)
        and col_id in c
        and process_name in c
        and norm_item in normalize(c)
    )
    return next(matches, None)
# --- Module-level state shared by every tab (populated by load_files) ---
df = None             # sensor data: "timestamp" column + flattened header columns
thresholds_df = None  # per-item thresholds (LL/L/H/HH, Important flag) from Excel
lag_matrix = None     # process-to-process lag minutes, indexed by process name
# --- ファイル読み込み ---
def load_files(csv_file, excel_file, lag_file):
    """Load the sensor CSV, threshold Excel, and lag-matrix Excel into module globals.

    Accepts plain path strings (Gradio ``gr.File(type="filepath")`` — what the
    UI below actually passes) as well as file-like objects exposing ``.name``
    (older Gradio tempfile wrappers). The previous code unconditionally read
    ``.name`` and crashed on string paths.

    Returns a status string: success, or a message embedding the error.
    """
    global df, thresholds_df, lag_matrix

    def _path(f):
        # type="filepath" hands the handler a str; legacy Gradio passes a
        # wrapper whose .name attribute is the path.
        return getattr(f, "name", f)

    try:
        df = pd.read_csv(_path(csv_file), header=[0, 1, 2])
        # The first column holds timestamps; pull it out as a flat "timestamp" column.
        timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)

        # Flatten the 3-level MultiIndex header into "lvl0_lvl1_lvl2" strings.
        def col_to_str(col):
            return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)
        df.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in df.columns
        ]

        thresholds_df = pd.read_excel(_path(excel_file))
        # Excel booleans may arrive as "TRUE"/"FALSE" strings; normalize to real bools.
        thresholds_df["Important"] = (
            thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
        for col in ["LL", "L", "H", "HH"]:
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")

        lag_matrix = pd.read_excel(_path(lag_file), index_col=0)
        return "✅ ファイル読み込み成功"
    except Exception as e:
        return f"❌ ファイル読み込み失敗: {e}"
# --- Tab1: 閾値診断 ---
def judge_status(value, ll, l, h, hh):
    """Classify *value* against the four thresholds.

    Bounds are checked in severity order (LL, L, HH, H); a NaN/None bound is
    treated as unset and skipped. Returns one of "LOW-LOW", "LOW",
    "HIGH-HIGH", "HIGH", or "OK".
    """
    checks = (
        (ll, lambda bound: value < bound, "LOW-LOW"),
        (l, lambda bound: value < bound, "LOW"),
        (hh, lambda bound: value > bound, "HIGH-HIGH"),
        (h, lambda bound: value > bound, "HIGH"),
    )
    for bound, breached, label in checks:
        if pd.notna(bound) and breached(bound):
            return label
    return "OK"
def diagnose_process_range(process_name, datetime_str, window_minutes):
    """Tab 1: threshold diagnosis over a look-back window ending at *datetime_str*.

    Every row in the window is judged against every threshold of
    *process_name*; results are aggregated for all items, for important items,
    and per important item.

    Returns a 5-tuple (all-items df, important-items df, per-item df,
    summary text, JSON text). On any failure the non-summary slots are None
    and the summary carries a warning message.
    """
    global df, thresholds_df
    if df is None or thresholds_df is None:
        return None, None, None, "⚠ ファイル未読み込み", None
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None
    proc_thresholds = thresholds_df[thresholds_df["ProcessNo_ProcessName"] == process_name]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None
    all_results = []
    for _, row in df_window.iterrows():
        for _, thr in proc_thresholds.iterrows():
            # Columns were flattened by load_files as "ColumnID_ItemName_Process".
            col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
            if col_name not in df.columns:
                continue
            value = row[col_name]
            status = judge_status(value, thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
            all_results.append({
                "ColumnID": thr["ColumnID"],
                "ItemName": thr["ItemName"],
                "判定": status,
                "重要項目": bool(thr.get("Important", False)),
                "時刻": str(row["timestamp"])
            })
    # BUG FIX: when no threshold column matched, the percentages below divided
    # by zero and produced NaN output; bail out explicitly instead.
    if not all_results:
        return None, None, None, "⚠ 指定した条件に該当する項目データがありません。", None
    # --- Aggregate across all items ---
    total = len(all_results)
    status_counts = pd.Series([r["判定"] for r in all_results]).value_counts().reindex(
        ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
    )
    status_ratio = (status_counts / total * 100).round(1)
    result_df_all = pd.DataFrame({"状態": status_counts.index, "件数": status_counts.values, "割合(%)": status_ratio.values})
    # --- Aggregate across important items only ---
    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        total_imp = len(important_results)
        status_counts_imp = pd.Series([r["判定"] for r in important_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_imp = (status_counts_imp / total_imp * 100).round(1)
        result_df_imp = pd.DataFrame({"状態": status_counts_imp.index, "件数": status_counts_imp.values, "割合(%)": status_ratio_imp.values})
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)
    # --- Per important item ---
    result_per_item = []
    for item in set([r["ItemName"] for r in important_results]):
        item_results = [r for r in important_results if r["ItemName"] == item]
        total_item = len(item_results)
        status_counts_item = pd.Series([r["判定"] for r in item_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_item = (status_counts_item / total_item * 100).round(1)
        for s, c, r in zip(status_counts_item.index, status_counts_item.values, status_ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": int(c), "割合(%)": float(r)})
    result_df_imp_items = pd.DataFrame(result_per_item)
    # --- Human-readable summary ---
    summary = (
        f"✅ {process_name} の診断完了({start_time}{end_time})\n"
        + "[全項目] " + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()]) + "\n"
        + "[重要項目全体] " + (" / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()]) if not result_df_imp.empty else "対象データなし")
    )
    # --- Machine-readable JSON ---
    json_data = {
        "集計結果": {
            "全項目割合": {k: float(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": {k: float(v) for k, v in status_ratio_imp.to_dict().items()} if not result_df_imp.empty else {},
            "重要項目ごと割合": [dict(row) for _, row in result_df_imp_items.iterrows()]
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)
    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json
# --- Tab2: 傾向検出 ---
def detect_trends_with_forecast(process_name, datetime_str, window_minutes, forecast_minutes, downsample_factor):
    """Tab 2: fit a per-item linear trend over the look-back window and
    extrapolate it *forecast_minutes* into the future.

    The window is optionally downsampled (every ``downsample_factor``-th row)
    before regression; the effective sampling interval is reported per item.

    Returns (result df, status message, JSON text); df/JSON are None on failure.
    """
    global df, thresholds_df
    if df is None or thresholds_df is None:
        return None, "⚠ ファイル未読み込み", None
    # BUG FIX: validate the datetime like Tab 1 does instead of raising
    # an unhandled exception into Gradio.
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= target_time)]
    if df_window.empty:
        return None, "⚠ データなし", None
    # Estimate the native sampling interval from the median timestamp delta.
    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔検出失敗", None
    interval_minutes = interval.total_seconds() / 60
    # --- Downsampling: keep every factor-th row. BUG FIX: clamp the factor to
    # >= 1 (a 0 step makes iloc raise ValueError). ---
    factor = max(int(downsample_factor), 1)
    df_window = df_window.iloc[::factor, :].reset_index(drop=True)
    effective_interval = interval_minutes * factor
    # BUG FIX: duplicate timestamps can yield a zero median interval, which
    # would divide by zero when computing forecast_steps below.
    if effective_interval <= 0:
        return None, "⚠ サンプリング間隔検出失敗", None
    proc_thresholds = thresholds_df[(thresholds_df["ProcessNo_ProcessName"] == process_name) &
                                    (thresholds_df["Important"] == True)]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    results = []
    for _, thr in proc_thresholds.iterrows():
        col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
        if col_name not in df.columns:
            continue
        series = df_window[col_name].dropna()
        if len(series) < 3:
            continue  # too few points for a meaningful fit
        # Regress the value against the (downsampled) sample index.
        x = np.arange(len(series)).reshape(-1, 1)
        y = series.values.reshape(-1, 1)
        model = LinearRegression().fit(x, y)
        slope = model.coef_[0][0]
        last_val = series.iloc[-1]
        # Convert the forecast horizon (minutes) into index steps.
        forecast_steps = int(forecast_minutes / effective_interval)
        forecast_val = model.predict([[len(series) + forecast_steps]])[0][0]
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
        risk = "安定"
        if pd.notna(thr.get("LL")) and forecast_val <= thr["LL"]:
            risk = "LL逸脱予測"
        elif pd.notna(thr.get("HH")) and forecast_val >= thr["HH"]:
            risk = "HH逸脱予測"
        results.append({
            "ItemName": thr["ItemName"],
            "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3),
            "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": round(effective_interval, 2)
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出完了", result_json
# --- Tab3: 予兆解析 ---
def forecast_process_with_lag(process_name, datetime_str, forecast_minutes, downsample_factor=3):
    """Tab 3: predict each important item of *process_name* at
    ``datetime_str + forecast_minutes`` using lag-shifted upstream signals.

    For each important item, a multiple linear regression is trained on the
    last 24 h: the target series (resampled by ``downsample_factor``) against
    every column of every upstream process that has a positive lag in
    ``lag_matrix``, with each upstream column shifted forward by its lag.

    Returns (result df, status message, JSON text); df/JSON are None on failure.
    """
    global df, thresholds_df, lag_matrix
    if df is None or thresholds_df is None or lag_matrix is None:
        return None, "⚠ ファイル未読み込み", None
    target_time = pd.to_datetime(datetime_str)
    forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name) &
        (thresholds_df["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    if process_name not in lag_matrix.index:
        return None, f"⚠ {process_name} のラグ行なし", None
    lag_row = lag_matrix.loc[process_name].dropna()
    lag_row = lag_row[lag_row > 0]  # positive lags only
    if lag_row.empty:
        return None, f"⚠ {process_name} に正のラグなし", None
    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(df, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue
        # Training data: the most recent 24 hours before target_time.
        df_window = df[(df["timestamp"] >= target_time - pd.Timedelta(hours=24)) &
                       (df["timestamp"] <= target_time)].copy()
        if df_window.empty:
            continue
        base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})
        # === Downsampling: resample the target to base interval x factor ===
        dt = df_window["timestamp"].diff().median()
        if pd.notna(dt):
            base_min = max(int(dt.total_seconds() // 60), 1)
        else:
            base_min = 1
        new_interval = max(base_min * int(downsample_factor), 1)
        # NOTE(review): the "T" (minute) offset alias is deprecated in recent
        # pandas in favor of "min" — confirm against the pinned pandas version.
        df_down = (base_df.set_index("timestamp")
                   .resample(f"{new_interval}T").mean()
                   .dropna()
                   .reset_index())
        merged_df = df_down.copy()
        # Attach every upstream column, shifted forward by its lag, via
        # nearest-timestamp asof joins.
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                merged_df = pd.merge_asof(
                    merged_df.sort_values("timestamp"),
                    shifted.sort_values("timestamp"),
                    on="timestamp",
                    direction="nearest"
                )
        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue
        # Multiple linear regression of the target on the lagged features.
        model = LinearRegression().fit(X_all, Y_all)
        # Future prediction: sample each upstream column at forecast_time - lag.
        # NOTE(review): this loop iterates lag_row/up_cols in the same order as
        # the training merge above, so X_pred aligns with X_all's columns —
        # keep the two loops structurally in sync.
        X_pred = []
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                idx = (df["timestamp"] - ref_time).abs().idxmin()
                X_pred.append(df.loc[idx, x_col])
        if not X_pred:
            continue
        pred_val = model.predict([X_pred])[0]
        # Threshold risk classification (inclusive bounds, unlike Tab 1's
        # strict comparisons — presumably intentional; verify with the owner).
        ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
        risk = "OK"
        if pd.notna(ll) and pred_val <= ll:
            risk = "LOW-LOW"
        elif pd.notna(l) and pred_val <= l:
            risk = "LOW"
        elif pd.notna(hh) and pred_val >= hh:
            risk = "HIGH-HIGH"
        elif pd.notna(h) and pred_val >= h:
            risk = "HIGH"
        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": new_interval
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, f"✅ {process_name} の予兆解析完了", result_json
# --- Gradio UI ---
# --- Gradio UI: three tabs sharing the files loaded at the top of the page ---
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    gr.Markdown("## 統合トレンド解析アプリ (MCP対応)")
    with gr.Row():
        # type="filepath": the click handlers receive plain path strings.
        csv_input = gr.File(label="CSVファイル", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイル", file_types=[".xlsx"], type="filepath")
        lag_input = gr.File(label="ラグファイル", file_types=[".xlsx"], type="filepath")
    load_btn = gr.Button("ファイル読み込み")
    load_status = gr.Textbox(label="読み込み結果")
    with gr.Tabs():
        # Tab 1: threshold diagnosis over a look-back window.
        with gr.Tab("閾値診断"):
            process_name1 = gr.Textbox(label="プロセス名")
            datetime_str1 = gr.Textbox(label="診断基準日時")
            window_minutes1 = gr.Number(label="さかのぼる時間幅(分)", value=60)
            run_btn1 = gr.Button("診断実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")
            run_btn1.click(
                diagnose_process_range,
                inputs=[process_name1, datetime_str1, window_minutes1],
                outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output, json_output]
            )
        # Tab 2: linear-trend detection with downsampling and extrapolation.
        with gr.Tab("傾向検出"):
            process_name2 = gr.Textbox(label="プロセス名")
            datetime_str2 = gr.Textbox(label="基準日時")
            window_minutes2 = gr.Number(label="過去の時間幅(分)", value=60)
            forecast_minutes2 = gr.Number(label="未来予測時間幅(分)", value=60)
            downsample_factor2 = gr.Slider(label="粗化倍率", minimum=1, maximum=10, step=1, value=3)
            run_btn2 = gr.Button("傾向検出実行")
            result_df2 = gr.Dataframe(label="傾向+予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")
            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[process_name2, datetime_str2, window_minutes2, forecast_minutes2, downsample_factor2],
                outputs=[result_df2, summary_output2, json_output2]
            )
        # Tab 3: early-warning forecast from lag-shifted upstream processes.
        with gr.Tab("予兆解析"):
            process_name3 = gr.Textbox(label="プロセス名")
            datetime_str3 = gr.Textbox(label="基準日時")
            forecast_minutes3 = gr.Number(label="未来予測時間幅(分)", value=60)
            downsample_factor3 = gr.Slider(1, 10, value=3, step=1, label="粗化倍率(サンプリング間引き)")
            run_btn3 = gr.Button("予兆解析実行")
            result_df3 = gr.Dataframe(label="予兆解析結果")
            summary_output3 = gr.Textbox(label="サマリー")
            json_output3 = gr.Json(label="JSON結果")
            run_btn3.click(
                forecast_process_with_lag,
                inputs=[process_name3, datetime_str3, forecast_minutes3, downsample_factor3],
                outputs=[result_df3, summary_output3, json_output3]
            )
    load_btn.click(load_files, inputs=[csv_input, excel_input, lag_input], outputs=[load_status])
if __name__ == "__main__":
    # USE_MCP=1 launches with the MCP server enabled; otherwise serve the
    # plain UI on all interfaces without a public share link.
    use_mcp = os.getenv("USE_MCP", "0") == "1"
    if use_mcp:
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)