# Source: Hugging Face Space by Ken-INOUE.
# "Initial implementation of the project structure and core functionality." (commit bac52f7)
# 予兆解析アプリ Gradio + MCP対応版
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
import json
import os
# --- Utilities ---
def normalize(s):
    """Return *s* as a whitespace-normalized string.

    Full-width (ideographic) spaces become ASCII spaces, CR/LF characters
    are removed, and leading/trailing whitespace is stripped.
    """
    text = str(s)
    for old, new in (("\u3000", " "), ("\n", ""), ("\r", "")):
        text = text.replace(old, new)
    return text.strip()
def find_matching_column(df, col_id, item_name, process_name):
    """Return the first column of *df* whose (string) name contains *col_id*,
    *process_name*, and the whitespace-normalized *item_name*.

    Returns None when no column matches.
    """
    def _norm(value):
        # Same normalization the rest of the app applies to item names.
        return str(value).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()

    wanted_item = _norm(item_name)
    for col in df.columns:
        if not isinstance(col, str):
            continue
        if col_id in col and process_name in col and wanted_item in _norm(col):
            return col
    return None
# --- 予兆解析関数 ---
def forecast_process_with_lag(csv_file, excel_file, lag_file, process_name, datetime_str, forecast_minutes):
try:
# CSV 読み込み(3行ヘッダー)
df = pd.read_csv(csv_file.name, header=[0, 1, 2])
timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
df = df.drop(df.columns[0], axis=1)
df.insert(0, "timestamp", timestamp_col)
# MultiIndex → 文字列化
def col_to_str(col):
return "_".join([str(c) for c in col if c]) if isinstance(col, tuple) else str(col)
df.columns = [
"timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
for c in df.columns
]
# 閾値テーブル
thresholds_df = pd.read_excel(excel_file.name)
thresholds_df["Important"] = thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
for col in ["LL", "L", "H", "HH"]:
if col in thresholds_df.columns:
thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")
# ラグテーブル
lag_matrix = pd.read_excel(lag_file.name, index_col=0)
except Exception as e:
return None, f"❌ 入力ファイルの読み込みに失敗しました: {e}", None
try:
target_time = pd.to_datetime(datetime_str)
forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
except Exception:
return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
proc_thresholds = thresholds_df[(thresholds_df["ProcessNo_ProcessName"] == process_name) & (thresholds_df["Important"] == True)]
if proc_thresholds.empty:
return None, f"⚠ プロセス {process_name} に重要項目なし", None
if process_name not in lag_matrix.index:
return None, f"⚠ タイムラグ表に {process_name} の行がありません", None
lag_row = lag_matrix.loc[process_name].dropna()
lag_row = lag_row[lag_row > 0] # 正のラグのみ
if lag_row.empty:
return None, f"⚠ プロセス {process_name} に正のラグを持つ上流工程がありません", None
results = []
for _, thr in proc_thresholds.iterrows():
y_col = find_matching_column(df, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
if y_col is None:
continue
# 学習データ(直近24時間)
df_window = df[df["timestamp"] <= target_time].copy()
df_window = df_window[df_window["timestamp"] >= target_time - pd.Timedelta(hours=24)]
if df_window.empty:
continue
try:
base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})
except KeyError:
continue
merged_df = base_df.copy()
for up_proc, lag_min in lag_row.items():
try:
up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
for x_col in up_cols:
shifted = df_window.loc[:, ["timestamp", x_col]].copy()
shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
merged_df = pd.merge_asof(
merged_df.sort_values("timestamp"),
shifted.sort_values("timestamp"),
on="timestamp",
direction="nearest"
)
except Exception:
continue
X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
Y_all = merged_df["y"].values
if X_all.shape[1] == 0 or len(Y_all) < 5:
continue
# モデル学習
model = LinearRegression().fit(X_all, Y_all)
# 未来予測
X_pred = []
for up_proc, lag_min in lag_row.items():
up_cols = [c for c in df.columns if isinstance(c, str) and up_proc in c]
for x_col in up_cols:
try:
ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
idx = (df["timestamp"] - ref_time).abs().idxmin()
X_pred.append(df.loc[idx, x_col])
except Exception:
continue
if not X_pred:
continue
pred_val = model.predict([X_pred])[0]
# 閾値リスク判定
ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
risk = "OK"
if pd.notna(ll) and pred_val <= ll:
risk = "LOW-LOW"
elif pd.notna(l) and pred_val <= l:
risk = "LOW"
elif pd.notna(hh) and pred_val >= hh:
risk = "HIGH-HIGH"
elif pd.notna(h) and pred_val >= h:
risk = "HIGH"
results.append({
"ItemName": thr["ItemName"],
"予測値": round(float(pred_val), 3),
"予測時刻": str(forecast_time),
"予測リスク": risk,
"使用上流工程数": len(lag_row)
})
result_df = pd.DataFrame(results)
result_json = json.dumps(results, ensure_ascii=False, indent=2)
summary = f"✅ {process_name} の予兆解析完了 ({target_time}{forecast_time})"
return result_df, summary, result_json
# --- Gradio UI ---
# Layout: three file inputs in a row, then the text/number parameters, a run
# button, and three result views (table, summary text, raw JSON).
# NOTE(review): the file components use type="filepath", so the handler
# receives plain str paths — confirm forecast_process_with_lag accepts them.
with gr.Blocks(css="body {overflow-y: scroll;}") as demo:
    gr.Markdown("## 予兆解析アプリ (MCP対応)")
    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイルをアップロード", file_types=[".xlsx"], type="filepath")
        lag_input = gr.File(label="タイムラグファイルをアップロード", file_types=[".xlsx"], type="filepath")
    process_name = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
    datetime_str = gr.Textbox(label="基準日時", value="2025/8/2 0:05")
    forecast_minutes = gr.Number(label="予測時間幅(分)", value=60)
    run_btn = gr.Button("予兆解析を実行")
    result_df = gr.Dataframe(label="予兆解析結果", wrap=True, interactive=False)
    summary_output = gr.Textbox(label="サマリー")
    json_output = gr.Json(label="JSON結果")
    # Wire the button to the analysis function; outputs map 1:1 to the views above.
    run_btn.click(
        forecast_process_with_lag,
        inputs=[csv_input, excel_input, lag_input, process_name, datetime_str, forecast_minutes],
        outputs=[result_df, summary_output, json_output]
    )
if __name__ == "__main__":
    # USE_MCP=1 launches the app as an MCP server; any other value (or the
    # default "0") serves the plain web UI on all interfaces without sharing.
    if os.getenv("USE_MCP", "0") == "1":
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)