| | |
| |
|
| | import gradio as gr |
| | import pandas as pd |
| | import numpy as np |
| | from sklearn.linear_model import LinearRegression |
| | import json |
| | import os |
| |
|
| | |
def judge_status(value, ll, l, h, hh):
    """Classify *value* against the LL / L / H / HH alarm limits.

    Limits that are missing (``None`` or NaN) are skipped.  The checks run
    in a fixed order -- LL, L, HH, H -- and the first breached limit wins.
    A NaN *value* compares false against every limit and so yields "OK".

    Returns:
        One of "LOW-LOW", "LOW", "HIGH-HIGH", "HIGH" or "OK".
    """
    # (label, limit, True when the limit is a lower bound), in check order.
    ordered_checks = (
        ("LOW-LOW", ll, True),
        ("LOW", l, True),
        ("HIGH-HIGH", hh, False),
        ("HIGH", h, False),
    )
    for label, limit, is_lower_bound in ordered_checks:
        if pd.isna(limit):
            continue
        breached = value < limit if is_lower_bound else value > limit
        if breached:
            return label
    return "OK"
| |
|
def convert_value(v):
    """Turn *v* into a plain Python scalar suitable for ``json.dumps``.

    Anything exposing ``.item()`` (e.g. NumPy scalars) is unwrapped through
    it.  Otherwise floats go through ``float()`` and ints (including bools)
    through ``int()``; every other object is returned unchanged.
    """
    if hasattr(v, "item"):
        return v.item()
    if isinstance(v, (np.floating, float)):
        return float(v)
    if isinstance(v, (np.integer, int)):
        return int(v)
    return v
| |
|
| | |
_STATUS_ORDER = ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"]


def _tally_statuses(statuses):
    """Count *statuses* per category and return ``(table, ratios)``.

    ``table`` is a DataFrame with one row per entry of ``_STATUS_ORDER``
    (columns 状態 / 件数 / 割合(%)); ``ratios`` is the percentage Series,
    rounded to one decimal and indexed by status.  *statuses* must be
    non-empty, otherwise the percentages would be 0/0.
    """
    counts = pd.Series(statuses).value_counts().reindex(_STATUS_ORDER, fill_value=0)
    ratios = (counts / len(statuses) * 100).round(1)
    table = pd.DataFrame({
        "状態": counts.index,
        "件数": counts.values,
        "割合(%)": ratios.values,
    })
    return table, ratios


def diagnose_process_range(df, thresholds_df, process_name, datetime_str, window_minutes):
    """Judge every thresholded column of one process over a look-back window.

    Args:
        df: Measurement table with a ``"timestamp"`` column and data columns
            keyed by ``(ColumnID, ItemName, ProcessNo_ProcessName)``.
            ``None`` when no file has been loaded yet.
        thresholds_df: Threshold table with ``ProcessNo_ProcessName``,
            ``ColumnID`` and ``ItemName`` columns and optional ``LL`` / ``L``
            / ``H`` / ``HH`` / ``Important`` columns.  ``None`` when not loaded.
        process_name: Value to match against ``ProcessNo_ProcessName``.
        datetime_str: End of the window, parsed with ``pd.to_datetime``.
        window_minutes: Length of the window, in minutes, ending at
            *datetime_str* (both ends inclusive).

    Returns:
        ``(all_table, important_table, per_item_table, summary, json_text)``.
        When the inputs cannot be diagnosed, the three tables and
        ``json_text`` are ``None`` and ``summary`` carries the warning.
    """
    # Both inputs are still None when the user runs a diagnosis before loading.
    if df is None or thresholds_df is None:
        return None, None, None, "⚠ 先にCSVファイルと閾値ファイルを読み込んでください。", None

    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:  # UI boundary: any parse failure becomes a message
        target_time = pd.NaT
    # Empty/None input parses to NaT/None without raising; report it as invalid too.
    if pd.isna(target_time):
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    timestamps = df["timestamp"]
    df_window = df[(timestamps >= start_time) & (timestamps <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None

    proc_thresholds = thresholds_df[thresholds_df["ProcessNo_ProcessName"] == process_name]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None

    all_results = []
    for _, thr in proc_thresholds.iterrows():
        col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if col_tuple not in df.columns:
            continue
        limits = (thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
        # A missing flag can arrive as NaN, and bool(NaN) is True; treat it
        # as "not important" instead.
        important_flag = thr.get("Important", False)
        is_important = bool(important_flag) if pd.notna(important_flag) else False
        for value in df_window[col_tuple]:
            all_results.append({
                "ItemName": thr["ItemName"],
                "判定": judge_status(value, *limits),
                "重要項目": is_important,
            })

    # Without any matching column every ratio would be 0/0 (NaN).
    if not all_results:
        return (
            None, None, None,
            f"⚠ プロセス {process_name} の閾値項目に対応する列がデータにありません。",
            None,
        )

    result_df_all, status_ratio = _tally_statuses([r["判定"] for r in all_results])

    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        result_df_imp, status_ratio_imp = _tally_statuses(
            [r["判定"] for r in important_results]
        )
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)

    # Group once per unique item (first-seen order) so each item contributes
    # exactly one block of rows, however many samples the window holds.
    statuses_by_item = {}
    for r in important_results:
        statuses_by_item.setdefault(r["ItemName"], []).append(r["判定"])
    per_item_rows = []
    for item, statuses in statuses_by_item.items():
        item_table, _ = _tally_statuses(statuses)
        per_item_rows.extend(
            {"ItemName": item, **rec} for rec in item_table.to_dict("records")
        )
    result_df_imp_items = pd.DataFrame(
        per_item_rows, columns=["ItemName", "状態", "件数", "割合(%)"]
    )

    summary = (
        f"✅ {process_name} の診断完了({start_time} ~ {end_time})\n"
        + "[全項目] " + " / ".join(f"{s}:{r:.1f}%" for s, r in status_ratio.items()) + "\n"
        + "[重要項目全体] " + (
            " / ".join(f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items())
            if important_results else "対象データなし"
        )
    )

    json_data = {
        "集計結果": {
            "全項目割合": {k: convert_value(v) for k, v in status_ratio.items()},
            # An empty ratio Series naturally yields {} when nothing is important.
            "重要項目全体割合": {k: convert_value(v) for k, v in status_ratio_imp.items()},
            "重要項目ごと割合": [
                {k: convert_value(v) for k, v in rec.items()} for rec in per_item_rows
            ],
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)

    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json
| |
|
| | |
def _none_if_missing(limit):
    """Return *limit* unchanged, or None when it is missing (None/NaN).

    ``json.dumps`` would otherwise emit the token ``NaN``, which is not
    valid JSON.
    """
    return None if pd.isna(limit) else limit


def detect_trends_with_forecast(df, thresholds_df, process_name, datetime_str, window_minutes, forecast_minutes):
    """Fit a linear trend per important item and extrapolate it forward.

    For every threshold row of *process_name* flagged ``Important`` whose
    column exists in *df*, the non-NaN samples of the look-back window are
    fitted with a straight line over their sample index.  The line is then
    evaluated ``forecast_minutes`` past the newest sample, using the median
    sampling interval of the window to convert minutes into steps.

    Args:
        df: Measurement table with a ``"timestamp"`` column and data columns
            keyed by ``(ColumnID, ItemName, ProcessNo_ProcessName)``.
            ``None`` when no file has been loaded yet.
        thresholds_df: Threshold table with ``ProcessNo_ProcessName``,
            ``ColumnID``, ``ItemName`` and ``Important`` columns and
            optional ``LL`` / ``L`` / ``H`` / ``HH`` columns.  ``None`` when
            not loaded.
        process_name: Value to match against ``ProcessNo_ProcessName``.
        datetime_str: End of the window, parsed with ``pd.to_datetime``.
        window_minutes: Length of the look-back window in minutes.
        forecast_minutes: How far past *datetime_str* to extrapolate.

    Returns:
        ``(result_df, message, json_text)``.  When the inputs cannot be
        analysed, ``result_df`` and ``json_text`` are ``None`` and
        ``message`` carries the warning.
    """
    # Both inputs are still None when the user runs this before loading.
    if df is None or thresholds_df is None:
        return None, "⚠ 先にCSVファイルと閾値ファイルを読み込んでください。", None

    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:  # UI boundary: any parse failure becomes a message
        target_time = pd.NaT
    if pd.isna(target_time):
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None

    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    timestamps = df["timestamp"]
    df_window = df[(timestamps >= start_time) & (timestamps <= target_time)]
    if df_window.empty:
        return None, "⚠ 指定時間幅にデータなし", None

    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔を検出できません", None
    interval_minutes = interval.total_seconds() / 60
    # Duplicate timestamps can give a zero median; dividing by it would raise.
    if interval_minutes <= 0:
        return None, "⚠ サンプリング間隔を検出できません", None

    proc_thresholds = thresholds_df[
        (thresholds_df["ProcessNo_ProcessName"] == process_name)
        & thresholds_df["Important"].eq(True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ プロセス {process_name} の重要項目なし", None

    # Identical for every item, so computed once.
    forecast_steps = int(forecast_minutes / interval_minutes)
    forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

    results = []
    for _, thr in proc_thresholds.iterrows():
        col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if col_tuple not in df.columns:
            continue
        series = df_window[col_tuple].dropna()
        n = len(series)
        if n < 3:
            continue

        x = np.arange(n).reshape(-1, 1)
        y = series.values.reshape(-1, 1)
        model = LinearRegression().fit(x, y)
        slope = model.coef_[0][0]
        last_val = series.iloc[-1]

        # Samples sit at indices 0..n-1, so the newest one is n-1 and the
        # forecast point lies forecast_steps beyond it.
        forecast_index = n - 1 + forecast_steps
        forecast_val = model.predict([[forecast_index]])[0][0]

        l, ll, h, hh = thr.get("L"), thr.get("LL"), thr.get("H"), thr.get("HH")

        status = "安定"
        if slope < 0 and pd.notna(ll):
            status = "LL接近下降傾向" if last_val > ll else "LL逸脱下降傾向"
        if slope > 0 and pd.notna(hh):
            status = "HH接近上昇傾向" if last_val < hh else "HH逸脱上昇傾向"

        forecast_status = "安定"
        if pd.notna(ll) and forecast_val <= ll:
            forecast_status = "LL逸脱予測"
        elif pd.notna(hh) and forecast_val >= hh:
            forecast_status = "HH逸脱予測"

        results.append({
            "ItemName": thr["ItemName"],
            "傾向": status,
            "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3) if pd.notna(last_val) else None,
            "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time),
            "予測傾向": forecast_status,
            "サンプリング間隔(分)": interval_minutes,
            "LL": _none_if_missing(ll),
            "L": _none_if_missing(l),
            "H": _none_if_missing(h),
            "HH": _none_if_missing(hh),
        })

    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出+未来予測完了", result_json
| |
|
| | |
# UI definition: file upload + load button, then one tab per analysis.
with gr.Blocks(css=".gradio-container {overflow:auto !important}") as demo:
    gr.Markdown("## 統合版アプリ (閾値診断 + 傾向検出)")

    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイルをアップロード", file_types=[".xlsx"], type="filepath")

    # Holders for the parsed inputs; created without an initial value and
    # filled by load_files, then passed to both analysis callbacks.
    state_df = gr.State()
    state_thresholds = gr.State()

    def load_files(csv_file, excel_file):
        """Parse the uploaded CSV and Excel threshold file.

        The CSV is read with a three-row header; its first column is
        re-inserted as a ``"timestamp"`` column and parsed as datetimes
        (unparseable cells become NaT).  In the threshold sheet the
        ``Important`` column is mapped from TRUE/FALSE text to booleans
        (anything else becomes NaN) and LL/L/H/HH are coerced to numbers.

        Returns:
            ``(df, thresholds_df, status_message)``; both frames are
            ``None`` when loading fails.
        """
        if csv_file is None or excel_file is None:
            return None, None, "❌ 読み込み失敗: CSVファイルとExcel閾値ファイルの両方をアップロードしてください"
        # Accept either a plain path string or an object exposing the path
        # through ``.name``.
        csv_path = getattr(csv_file, "name", csv_file)
        excel_path = getattr(excel_file, "name", excel_file)
        try:
            df = pd.read_csv(csv_path, header=[0, 1, 2])
            timestamp_col = df.iloc[:, 0]
            df = df.drop(df.columns[0], axis=1)
            df.insert(0, "timestamp", timestamp_col)
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
            thresholds_df = pd.read_excel(excel_path)
            thresholds_df["Important"] = (
                thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
            )
            for col in ["LL", "L", "H", "HH"]:
                if col in thresholds_df.columns:
                    thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")
            return df, thresholds_df, "✅ ファイルを読み込みました"
        except Exception as e:  # UI boundary: surface any read/parse error as text
            return None, None, f"❌ 読み込み失敗: {e}"

    load_btn = gr.Button("ファイルを読み込み")
    load_status = gr.Textbox(label="ロード状態")

    load_btn.click(load_files, inputs=[csv_input, excel_input], outputs=[state_df, state_thresholds, load_status])

    with gr.Tabs():
        with gr.Tab("閾値診断"):
            process_name = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
            datetime_str = gr.Textbox(label="診断基準日時", value="2025/8/1 1:05")
            window_minutes = gr.Number(label="さかのぼる時間幅(分)", value=60)
            run_btn = gr.Button("診断を実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計結果")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計結果")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計結果")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")

            run_btn.click(
                diagnose_process_range,
                inputs=[state_df, state_thresholds, process_name, datetime_str, window_minutes],
                outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output, json_output],
            )

        with gr.Tab("傾向検出"):
            process_name2 = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
            datetime_str2 = gr.Textbox(label="基準日時", value="2025/8/1 1:05")
            window_minutes2 = gr.Number(label="さかのぼる時間幅(分)", value=60)
            forecast_minutes2 = gr.Number(label="未来予測時間幅(分)", value=60)
            run_btn2 = gr.Button("傾向検出を実行")
            result_df2 = gr.Dataframe(label="傾向+未来予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")

            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[state_df, state_thresholds, process_name2, datetime_str2, window_minutes2, forecast_minutes2],
                outputs=[result_df2, summary_output2, json_output2],
            )
| |
|
if __name__ == "__main__":
    # USE_MCP=1 launches with the MCP server enabled; any other value (or
    # an unset variable) launches the web UI bound to 0.0.0.0 without a
    # share link.
    mcp_requested = os.getenv("USE_MCP", "0") == "1"
    launch_options = (
        {"mcp_server": True}
        if mcp_requested
        else {"server_name": "0.0.0.0", "share": False}
    )
    demo.launch(**launch_options)
| |
|