# 予兆解析アプリ Gradio + MCP対応版 (Predictive-sign analysis app, Gradio + MCP-enabled)
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
import json
import os
# --- ユーティリティ ---
def normalize(s):
    """Coerce *s* to str, convert ideographic (U+3000) spaces to ASCII spaces,
    delete CR/LF characters, and trim surrounding whitespace."""
    text = str(s)
    for src, dst in (("\u3000", " "), ("\n", ""), ("\r", "")):
        text = text.replace(src, dst)
    return text.strip()
def find_matching_column(df, col_id, item_name, process_name):
    """Return the first DataFrame column whose name contains *col_id* and
    *process_name* verbatim and the normalized *item_name* (ideographic spaces
    converted, CR/LF removed, trimmed); ``None`` when nothing matches."""
    wanted_item = str(item_name).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()
    for column in df.columns:
        # Non-string column labels (e.g. leftover MultiIndex tuples) never match.
        if not isinstance(column, str):
            continue
        if col_id not in column or process_name not in column:
            continue
        cleaned = str(column).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()
        if wanted_item in cleaned:
            return column
    return None
# --- Forecast analysis ---
def forecast_process_with_lag(csv_file, excel_file, lag_file, process_name, datetime_str, forecast_minutes):
    """Forecast every "important" monitored item of *process_name* via a
    lag-aware linear regression on upstream-process signals.

    Parameters
    ----------
    csv_file, excel_file, lag_file : str or file-like
        Sensor CSV (3-row header), threshold Excel, and time-lag Excel.
        ``gr.File(type="filepath")`` passes plain path strings; a tempfile
        wrapper exposing ``.name`` (older Gradio) is also accepted.
    process_name : str
        Target process; must appear in the threshold table ("ProcessNo_ProcessName"
        column) and as a row of the lag matrix.
    datetime_str : str
        Reference time; the model trains on the preceding 24 hours of data.
    forecast_minutes : int or float
        Horizon in minutes past the reference time to predict.

    Returns
    -------
    tuple
        (result DataFrame or None, status message str, JSON str or None).
    """
    def _as_path(f):
        # BUG FIX: type="filepath" delivers a plain str, which has no .name.
        return f if isinstance(f, str) else f.name

    try:
        # Sensor CSV: 3-row header -> MultiIndex columns; first column is the timestamp.
        df = pd.read_csv(_as_path(csv_file), header=[0, 1, 2])
        timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)

        # Flatten the MultiIndex into "level0_level1_level2" strings (empty levels skipped).
        def col_to_str(col):
            return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)

        df.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in df.columns
        ]

        # Threshold table: normalize the Important flag to bool, limits to numeric.
        thresholds_df = pd.read_excel(_as_path(excel_file))
        thresholds_df["Important"] = (
            thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
        for col in ["LL", "L", "H", "HH"]:
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")

        # Lag matrix: rows = downstream process, columns = upstream processes, values = minutes.
        lag_matrix = pd.read_excel(_as_path(lag_file), index_col=0)
    except Exception as e:
        return None, f"❌ 入力ファイルの読み込みに失敗しました: {e}", None

    try:
        target_time = pd.to_datetime(datetime_str)
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
    except Exception:
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    # Only "important" threshold rows of the requested process are forecast.
    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name)
        & (thresholds_df["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ プロセス {process_name} に重要項目なし", None
    if process_name not in lag_matrix.index:
        return None, f"⚠ タイムラグ表に {process_name} の行がありません", None

    lag_row = lag_matrix.loc[process_name].dropna()
    lag_row = lag_row[lag_row > 0]  # keep only upstream processes with a positive lag
    if lag_row.empty:
        return None, f"⚠ プロセス {process_name} に正のラグを持つ上流工程がありません", None

    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(df, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue

        # Training window: the 24 hours leading up to target_time.
        df_window = df[df["timestamp"] <= target_time].copy()
        df_window = df_window[df_window["timestamp"] >= target_time - pd.Timedelta(hours=24)]
        if df_window.empty:
            continue
        try:
            base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})
        except KeyError:
            continue

        # Merge each upstream signal, shifted forward by its lag, onto the target series.
        # BUG FIX: remember the exact (column, lag) pairs that made it into training
        # so the prediction vector is built in the same order and length.
        merged_df = base_df.copy()
        feature_specs = []
        for up_proc, lag_min in lag_row.items():
            try:
                up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
                for x_col in up_cols:
                    shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                    shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                    shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                    merged_df = pd.merge_asof(
                        merged_df.sort_values("timestamp"),
                        shifted.sort_values("timestamp"),
                        on="timestamp",
                        direction="nearest"
                    )
                    feature_specs.append((x_col, lag_min))
            except Exception:
                continue

        # BUG FIX: NaNs (from coerced timestamps or sparse signals) crashed fit() uncaught.
        merged_df = merged_df.dropna()

        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue

        model = LinearRegression().fit(X_all, Y_all)

        # Prediction row: for each training feature, the upstream value observed
        # lag_min minutes before the forecast time (nearest sample).
        X_pred = []
        prediction_ok = True
        for x_col, lag_min in feature_specs:
            try:
                ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                idx = (df["timestamp"] - ref_time).abs().idxmin()
                X_pred.append(df.loc[idx, x_col])
            except Exception:
                prediction_ok = False
                break
        if not prediction_ok or not X_pred:
            continue
        try:
            # BUG FIX: pass a proper 2-D float array instead of a nested list.
            pred_val = float(model.predict(np.asarray([X_pred], dtype=float))[0])
        except Exception:
            continue

        # Threshold risk classification (low limits checked before high limits).
        ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
        risk = "OK"
        if pd.notna(ll) and pred_val <= ll:
            risk = "LOW-LOW"
        elif pd.notna(l) and pred_val <= l:
            risk = "LOW"
        elif pd.notna(hh) and pred_val >= hh:
            risk = "HIGH-HIGH"
        elif pd.notna(h) and pred_val >= h:
            risk = "HIGH"

        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "使用上流工程数": len(lag_row)
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    summary = f"✅ {process_name} の予兆解析完了 ({target_time} → {forecast_time})"
    return result_df, summary, result_json
# --- Gradio UI ---
# The CSS forces a permanent vertical scrollbar so the page width does not jump.
with gr.Blocks(css="body {overflow-y: scroll;}") as demo:
    gr.Markdown("## 予兆解析アプリ (MCP対応)")
    # Three input files side by side: sensor CSV, threshold Excel, time-lag Excel.
    # NOTE(review): type="filepath" hands the handler plain path strings — confirm
    # forecast_process_with_lag accepts them.
    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイルをアップロード", file_types=[".xlsx"], type="filepath")
        lag_input = gr.File(label="タイムラグファイルをアップロード", file_types=[".xlsx"], type="filepath")
    # Analysis parameters, prefilled with defaults for the demo dataset.
    process_name = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
    datetime_str = gr.Textbox(label="基準日時", value="2025/8/2 0:05")
    forecast_minutes = gr.Number(label="予測時間幅(分)", value=60)
    run_btn = gr.Button("予兆解析を実行")
    # Outputs: result table, human-readable summary, machine-readable JSON copy.
    result_df = gr.Dataframe(label="予兆解析結果", wrap=True, interactive=False)
    summary_output = gr.Textbox(label="サマリー")
    json_output = gr.Json(label="JSON結果")
    # Wire the button to the forecast function; inputs/outputs are positional.
    run_btn.click(
        forecast_process_with_lag,
        inputs=[csv_input, excel_input, lag_input, process_name, datetime_str, forecast_minutes],
        outputs=[result_df, summary_output, json_output]
    )
if __name__ == "__main__":
    # USE_MCP=1 enables the MCP server endpoint; any other value (or unset)
    # serves the plain UI on all interfaces without a public share link.
    mcp_enabled = os.getenv("USE_MCP", "0") == "1"
    launch_options = {"mcp_server": True} if mcp_enabled else {"server_name": "0.0.0.0", "share": False}
    demo.launch(**launch_options)