| | |
| |
|
| | import gradio as gr |
| | import pandas as pd |
| | import numpy as np |
| | from sklearn.linear_model import LinearRegression |
| | import json |
| | import tempfile |
| | import os |
| |
|
| | |
def _load_measurements(csv_path):
    """Read the measurement CSV and return ``(timestamps, values)``.

    The CSV is read with a three-row header, so every value column is keyed
    by a 3-tuple. The first column is parsed as timestamps (unparseable
    entries become NaT); the remaining columns are returned unchanged.
    """
    raw = pd.read_csv(csv_path, header=[0, 1, 2])
    timestamps = pd.to_datetime(raw.iloc[:, 0], errors="coerce")
    values = raw.iloc[:, 1:]
    return timestamps, values


def _load_thresholds(excel_path):
    """Read the threshold table and normalise its flag and limit columns."""
    thresholds = pd.read_excel(excel_path)
    # "Important" may arrive as bool or text; compare case-insensitively and
    # ignore surrounding whitespace. Anything else maps to NaN (= not important).
    thresholds["Important"] = (
        thresholds["Important"]
        .astype(str)
        .str.strip()
        .str.upper()
        .map({"TRUE": True, "FALSE": False})
    )
    for limit_col in ("LL", "L", "H", "HH"):
        if limit_col in thresholds.columns:
            thresholds[limit_col] = pd.to_numeric(thresholds[limit_col], errors="coerce")
    return thresholds


def _classify_trend(slope, last_val, ll, hh):
    """Label the current trend from the slope sign and the LL/HH limits."""
    if slope < 0 and pd.notna(ll):
        return "LL接近下降傾向" if last_val > ll else "LL逸脱下降傾向"
    if slope > 0 and pd.notna(hh):
        return "HH接近上昇傾向" if last_val < hh else "HH逸脱上昇傾向"
    return "安定"


def _classify_forecast(forecast_val, ll, hh):
    """Label the forecast value against the LL/HH limits (LL is checked first)."""
    if pd.notna(ll) and forecast_val <= ll:
        return "LL逸脱予測"
    if pd.notna(hh) and forecast_val >= hh:
        return "HH逸脱予測"
    return "安定"


def detect_trends_with_forecast(process_name: str, datetime_str: str, window_minutes: float,
                                forecast_minutes: float, csv_file: str, excel_file: str):
    """Detect trends for a process's important items and forecast future values.

    For every threshold row of the given process that is flagged as important,
    a straight line is fitted to the samples inside the look-back window and
    extrapolated to ``forecast_minutes`` after the target time. The current
    trend and the forecast are each classified against the LL/HH limits.

    Args:
        process_name: Value matched against the "ProcessNo_ProcessName" column
            of the threshold table.
        datetime_str: Target date/time (end of the look-back window), in any
            format ``pandas.to_datetime`` accepts.
        window_minutes: Length of the look-back window in minutes.
        forecast_minutes: How far past the target time to forecast, in minutes.
        csv_file: Path to the measurement CSV (three header rows, timestamps in
            the first column).
        excel_file: Path to the threshold table (Excel).

    Returns:
        A 3-tuple ``(result_table, summary, json_path)``. ``result_table`` is a
        DataFrame with one row per analysed item, ``summary`` is a status
        message, and ``json_path`` is the path of a temporary JSON file with the
        same results. On a warning or error the first and last elements are
        ``None`` and ``summary`` carries the message.
    """
    try:
        if not csv_file or not excel_file:
            return None, "⚠ CSVファイルと閾値テーブル (Excel) をアップロードしてください", None

        timestamps, values = _load_measurements(csv_file)
        thresholds_df = _load_thresholds(excel_file)

        # Look-back window: [target - window, target], both ends inclusive.
        target_time = pd.to_datetime(datetime_str)
        start_time = target_time - pd.Timedelta(minutes=window_minutes)
        end_time = target_time
        in_window = (timestamps >= start_time) & (timestamps <= end_time)
        if not in_window.any():
            return None, "⚠ 指定時間幅にデータなし", None

        # Sort chronologically so the sampling interval and the "last value"
        # are meaningful even if the CSV rows are out of order.
        window_ts = timestamps[in_window].sort_values(kind="mergesort")
        window_values = values.loc[window_ts.index]

        # Sampling interval = median gap between consecutive samples.
        interval = window_ts.diff().median()
        if pd.isna(interval) or interval <= pd.Timedelta(0):
            return None, "⚠ サンプリング間隔を検出できません", None
        interval_minutes = interval.total_seconds() / 60

        proc_thresholds = thresholds_df[
            (thresholds_df["ProcessNo_ProcessName"] == process_name)
            & thresholds_df["Important"].eq(True)
        ]
        if proc_thresholds.empty:
            return None, f"⚠ プロセス {process_name} の重要項目なし", None

        # Identical for every item, so compute once outside the loop.
        forecast_steps = max(1, int(round(forecast_minutes / interval_minutes)))
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

        results = []
        for _, thr in proc_thresholds.iterrows():
            # CSV columns are keyed by the three header rows.
            col_key = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
            if col_key not in window_values.columns:
                continue

            # Non-numeric cells are treated as missing instead of aborting the run.
            series = pd.to_numeric(window_values[col_key], errors="coerce").dropna()
            n = len(series)
            if n < 3:
                continue

            # Fit value vs. sample position (assumes evenly spaced samples).
            x = np.arange(n).reshape(-1, 1)
            y = series.to_numpy(dtype=float).reshape(-1, 1)
            model = LinearRegression().fit(x, y)
            slope = model.coef_[0][0]
            last_val = series.iloc[-1]

            # The last sample sits at position n - 1, so the point
            # `forecast_steps` intervals later is (n - 1) + forecast_steps.
            forecast_index = (n - 1) + forecast_steps
            forecast_val = model.predict([[forecast_index]])[0][0]

            l, ll, h, hh = thr.get("L"), thr.get("LL"), thr.get("H"), thr.get("HH")

            results.append({
                "ItemName": thr["ItemName"],
                "傾向": _classify_trend(slope, last_val, ll, hh),
                "傾き": float(round(slope, 4)),
                "最終値": float(round(float(last_val), 3)),
                "予測値": float(round(float(forecast_val), 3)),
                "予測時刻": str(forecast_time),
                "予測傾向": _classify_forecast(forecast_val, ll, hh),
                "サンプリング間隔(分)": float(interval_minutes),
                "LL": None if pd.isna(ll) else float(ll),
                "L": None if pd.isna(l) else float(l),
                "H": None if pd.isna(h) else float(h),
                "HH": None if pd.isna(hh) else float(hh),
            })

        result_df = pd.DataFrame(results)

        # Write the results to a temp file for the caller; the file is
        # intentionally not deleted here because its path is returned.
        fd, tmp_path = tempfile.mkstemp(suffix=".json", prefix="trend_result_")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        summary = f"✅ {process_name} の傾向検出+未来予測完了({start_time}~{end_time}, 予測+{forecast_minutes}分)"
        return result_df, summary, tmp_path

    except Exception as e:
        # UI boundary: report the failure as the summary text instead of raising.
        return None, f"❌ エラー: {str(e)}", None
| |
|
| |
|
| | |
# Page-level CSS passed to gr.Blocks for the app container.
_CONTAINER_CSS = ".gradio-container {overflow-y: auto !important; height: 100vh;}"

with gr.Blocks(css=_CONTAINER_CSS) as demo:
    gr.Markdown("## 傾向検出アプリ(重要項目ごとの詳細+閾値予測)")

    # --- Inputs: data files ---
    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", type="filepath")
        excel_input = gr.File(label="閾値テーブル (Excel)", type="filepath")

    # --- Inputs: process and target time ---
    with gr.Row():
        process_name = gr.Textbox(label="プロセス名", placeholder="例: E018-A012_除害RO")
        datetime_str = gr.Textbox(label="日時 (例: 2025/8/1 1:05)")

    # --- Inputs: look-back and forecast horizons (minutes) ---
    with gr.Row():
        window_minutes = gr.Number(label="参照時間幅(分)", value=60)
        forecast_minutes = gr.Number(label="未来予測時間幅(分)", value=60)

    run_btn = gr.Button("実行")

    # --- Outputs ---
    with gr.Row():
        result_table = gr.Dataframe(label="傾向+未来予測結果", wrap=True)
        summary_output = gr.Textbox(label="サマリー")
        json_download = gr.File(label="JSON結果ダウンロード")

    # Component order must match the parameter order / return order of
    # detect_trends_with_forecast.
    analysis_inputs = [
        process_name,
        datetime_str,
        window_minutes,
        forecast_minutes,
        csv_input,
        excel_input,
    ]
    analysis_outputs = [result_table, summary_output, json_download]

    run_btn.click(
        fn=detect_trends_with_forecast,
        inputs=analysis_inputs,
        outputs=analysis_outputs,
    )
| |
|
if __name__ == "__main__":
    # Launch settings for the Gradio app, gathered in one place.
    launch_options = {
        "server_name": "0.0.0.0",  # listen on all network interfaces
        "mcp_server": True,
        "share": False,
        "ssr_mode": False,
    }
    demo.launch(**launch_options)
| |
|