File size: 6,859 Bytes
1eb5c08 d7bc1e2 f1a3458 d7bc1e2 19040f3 f1a3458 1eb5c08 73596a8 f1a3458 19040f3 73596a8 19040f3 f1a3458 73596a8 f1a3458 1eb5c08 19040f3 73596a8 f1a3458 73596a8 d7bc1e2 73596a8 19040f3 73596a8 1eb5c08 19040f3 73596a8 19040f3 73596a8 19040f3 73596a8 19040f3 1eb5c08 73596a8 19040f3 d7bc1e2 73596a8 d7bc1e2 73596a8 c4bdd99 73596a8 d7bc1e2 73596a8 d7bc1e2 73596a8 d7bc1e2 73596a8 d7bc1e2 73596a8 19040f3 73596a8 d7bc1e2 f1a3458 1eb5c08 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | # 傾向検出アプリ(Gradio版・MCP対応)
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
import json
import tempfile
import os
# --- 傾向検出+未来予測関数 ---
def detect_trends_with_forecast(process_name, datetime_str, window_minutes, forecast_minutes, csv_file, excel_file):
try:
csv_path = csv_file
excel_path = excel_file
# CSV読み込み(3行ヘッダー)
df = pd.read_csv(csv_path, header=[0, 1, 2])
timestamp_col = df.iloc[:, 0]
df = df.drop(df.columns[0], axis=1)
df.insert(0, "timestamp", timestamp_col)
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
# 閾値テーブル
thresholds_df = pd.read_excel(excel_path)
thresholds_df["Important"] = thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
for col in ["LL", "L", "H", "HH"]:
if col in thresholds_df.columns:
thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")
# --- 時間範囲 ---
target_time = pd.to_datetime(datetime_str)
start_time = target_time - pd.Timedelta(minutes=window_minutes)
end_time = target_time
df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
if df_window.empty:
return None, "⚠ 指定時間幅にデータなし", None
# サンプリング間隔
interval = df_window["timestamp"].diff().median()
if pd.isna(interval) or interval == pd.Timedelta(0):
return None, "⚠ サンプリング間隔を検出できません", None
interval_minutes = interval.total_seconds() / 60
# 閾値:対象プロセスかつ重要項目
proc_thresholds = thresholds_df[(thresholds_df["ProcessNo_ProcessName"] == process_name) &
(thresholds_df["Important"] == True)]
if proc_thresholds.empty:
return None, f"⚠ プロセス {process_name} の重要項目なし", None
results = []
for _, thr in proc_thresholds.iterrows():
col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
if col_tuple not in df.columns:
continue
series = df_window[col_tuple].dropna()
if len(series) < 3:
continue
# 線形回帰
x = np.arange(len(series)).reshape(-1, 1)
y = series.values.reshape(-1, 1)
model = LinearRegression().fit(x, y)
slope = model.coef_[0][0]
last_val = series.iloc[-1]
n = len(series)
# 未来予測
forecast_steps = max(1, int(round(forecast_minutes / interval_minutes)))
forecast_index = n + forecast_steps
forecast_val = model.predict([[forecast_index]])[0][0]
forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
l, ll, h, hh = thr.get("L"), thr.get("LL"), thr.get("H"), thr.get("HH")
# 現在の傾向
status = "安定"
if slope < 0 and pd.notna(ll):
if last_val > ll:
status = "LL接近下降傾向"
elif last_val <= ll:
status = "LL逸脱下降傾向"
if slope > 0 and pd.notna(hh):
if last_val < hh:
status = "HH接近上昇傾向"
elif last_val >= hh:
status = "HH逸脱上昇傾向"
# 未来予測の逸脱
forecast_status = "安定"
if pd.notna(ll) and forecast_val <= ll:
forecast_status = "LL逸脱予測"
elif pd.notna(hh) and forecast_val >= hh:
forecast_status = "HH逸脱予測"
results.append({
"ItemName": thr["ItemName"],
"傾向": status,
"傾き": float(round(slope, 4)),
"最終値": float(round(float(last_val), 3)) if pd.notna(last_val) else None,
"予測値": float(round(float(forecast_val), 3)),
"予測時刻": str(forecast_time),
"予測傾向": forecast_status,
"サンプリング間隔(分)": float(interval_minutes),
"LL": None if pd.isna(ll) else float(ll),
"L": None if pd.isna(l) else float(l),
"H": None if pd.isna(h) else float(h),
"HH": None if pd.isna(hh) else float(hh)
})
result_df = pd.DataFrame(results)
# JSON保存(ファイルパスを返す)
result_json = json.dumps(results, ensure_ascii=False, indent=2)
fd, tmp_path = tempfile.mkstemp(suffix=".json", prefix="trend_result_")
os.close(fd)
with open(tmp_path, "w", encoding="utf-8") as f:
f.write(result_json)
summary = f"✅ {process_name} の傾向検出+未来予測完了({start_time}~{end_time}, 予測+{forecast_minutes}分)"
return result_df, summary, tmp_path
except Exception as e:
return None, f"❌ エラー: {str(e)}", None
# --- Gradio UI ---
with gr.Blocks(
css=".gradio-container {overflow-y: auto !important; height: 100vh;}"
) as demo:
gr.Markdown("## 傾向検出アプリ(重要項目ごとの詳細+閾値予測)")
with gr.Row():
csv_input = gr.File(label="CSVファイルをアップロード", type="filepath")
excel_input = gr.File(label="閾値テーブル (Excel)", type="filepath")
with gr.Row():
process_name = gr.Textbox(label="プロセス名", placeholder="例: E018-A012_除害RO")
datetime_str = gr.Textbox(label="日時 (例: 2025/8/1 1:05)")
with gr.Row():
window_minutes = gr.Number(label="参照時間幅(分)", value=60)
forecast_minutes = gr.Number(label="未来予測時間幅(分)", value=60)
run_btn = gr.Button("実行")
with gr.Row():
result_table = gr.Dataframe(label="傾向+未来予測結果", wrap=True)
summary_output = gr.Textbox(label="サマリー")
json_download = gr.File(label="JSON結果ダウンロード")
run_btn.click(
fn=detect_trends_with_forecast,
inputs=[process_name, datetime_str, window_minutes, forecast_minutes, csv_input, excel_input],
outputs=[result_table, summary_output, json_download]
)
if __name__ == "__main__":
# Web UI と MCP を同時に有効化
demo.launch(
server_name="0.0.0.0",
mcp_server=True, # MCP サーバー起動
share=False, # Hugging Face なら不要
ssr_mode=False # SSR は無効に
)
|