# Ken-INOUE — Refactor app.py to integrate threshold diagnosis and trend
# detection features, enhancing the Gradio UI for an improved user
# experience. (commit 46ca6fa; scrape residue converted to a comment so the
# file remains valid Python.)
# 統合版: 閾値診断 + 傾向検出 (CSV/Excel共通入力, MCP対応)
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
import json
import os
# ===== 共通ユーティリティ =====
def judge_status(value, ll, l, h, hh):
    """Classify *value* against the four alarm thresholds.

    Checks run in priority order LOW-LOW, LOW, HIGH-HIGH, HIGH; a NaN
    threshold disables that particular check. Returns one of
    "LOW-LOW", "LOW", "HIGH-HIGH", "HIGH" or "OK".
    """
    checks = (
        ("LOW-LOW", ll, lambda v, t: v < t),
        ("LOW", l, lambda v, t: v < t),
        ("HIGH-HIGH", hh, lambda v, t: v > t),
        ("HIGH", h, lambda v, t: v > t),
    )
    for label, threshold, violates in checks:
        if pd.notna(threshold) and violates(value, threshold):
            return label
    return "OK"
def convert_value(v):
    """Coerce NumPy scalars to native Python types for JSON serialization.

    Anything exposing ``.item()`` (NumPy scalars) is unwrapped; plain
    floats/ints are normalized via ``float``/``int``; everything else is
    returned unchanged.
    """
    if hasattr(v, "item"):
        return v.item()
    if isinstance(v, (np.floating, float)):
        return float(v)
    if isinstance(v, (np.integer, int)):
        return int(v)
    return v
# ===== 閾値診断処理 =====
def diagnose_process_range(df, thresholds_df, process_name, datetime_str, window_minutes):
    """Evaluate every thresholded column of *process_name* over a time window.

    Looks back *window_minutes* from *datetime_str*, judges each sample of
    each configured column against its LL/L/H/HH thresholds, and aggregates
    the status distribution overall, over important items, and per item.

    Returns ``(all_df, important_df, per_item_df, summary, json_str)``.
    On any validation failure the DataFrames and json are ``None`` and the
    4th element carries the warning message.
    """
    status_order = ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"]

    def _summarize(results):
        # Build (counts, ratio, table) over the fixed status order.
        counts = pd.Series([r["判定"] for r in results]).value_counts().reindex(
            status_order, fill_value=0
        )
        ratio = (counts / len(results) * 100).round(1)
        table = pd.DataFrame({
            "状態": counts.index,
            "件数": counts.values,
            "割合(%)": ratio.values
        })
        return counts, ratio, table

    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None
    proc_thresholds = thresholds_df[thresholds_df["ProcessNo_ProcessName"] == process_name]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None

    all_results = []
    for _, row in df_window.iterrows():
        for _, thr in proc_thresholds.iterrows():
            # Data columns are keyed by the (ColumnID, ItemName, Process) header triple.
            col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
            if col_tuple not in df.columns:
                continue
            value = row[col_tuple]
            status = judge_status(value, thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
            all_results.append({
                "ColumnID": thr["ColumnID"],
                "ItemName": thr["ItemName"],
                "判定": status,
                "重要項目": bool(thr.get("Important", False)),
                "時刻": str(row["timestamp"])
            })

    # Guard: if no configured column matched the data, the ratio computation
    # below would divide by zero.
    if not all_results:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None

    _, status_ratio, result_df_all = _summarize(all_results)

    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        _, status_ratio_imp, result_df_imp = _summarize(important_results)
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)

    # Per-item breakdown. Iterate each item name exactly once (the original
    # looped over every occurrence, duplicating the rows of any item that
    # appeared in more than one sample); dict.fromkeys keeps first-seen order.
    result_per_item = []
    for item in dict.fromkeys(r["ItemName"] for r in important_results):
        item_results = [r for r in important_results if r["ItemName"] == item]
        counts_item, ratio_item, _ = _summarize(item_results)
        for s, c, r in zip(counts_item.index, counts_item.values, ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": c, "割合(%)": r})
    result_df_imp_items = pd.DataFrame(result_per_item)

    # Range separator 〜 added between the two timestamps (they ran together
    # in the original message).
    summary = (
        f"✅ {process_name} の診断完了({start_time}{end_time})\n"
        + "[全項目] " + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()]) + "\n"
        + "[重要項目全体] " + (
            " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()])
            if not result_df_imp.empty else "対象データなし"
        )
    )

    # JSON mirror of the three tables; numpy scalars are converted so that
    # json.dumps does not choke on them.
    json_data = {
        "集計結果": {
            "全項目割合": {k: convert_value(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": {k: convert_value(v) for k, v in status_ratio_imp.to_dict().items()} if not result_df_imp.empty else {},
            "重要項目ごと割合": [
                {k: convert_value(v) for k, v in row.items()} for _, row in result_df_imp_items.iterrows()
            ]
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)
    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json
# ===== 傾向検出処理 =====
def detect_trends_with_forecast(df, thresholds_df, process_name, datetime_str, window_minutes, forecast_minutes):
    """Fit a linear trend to each important item and extrapolate it forward.

    Looks back *window_minutes* from *datetime_str*, fits an ordinary
    least-squares line per important column of *process_name*, classifies
    the current trend against LL/HH, and predicts the value
    *forecast_minutes* ahead.

    Returns ``(result_df, message, json_str)``; on validation failure the
    DataFrame and json are ``None`` and the message explains why.
    """
    try:
        target_time = pd.to_datetime(datetime_str)
    except Exception:
        # Consistent with diagnose_process_range: report rather than raise.
        return None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]
    if df_window.empty:
        return None, "⚠ 指定時間幅にデータなし", None
    # Sampling interval = median timestamp delta inside the window.
    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔を検出できません", None
    interval_minutes = interval.total_seconds() / 60
    if interval_minutes <= 0:
        # Duplicate timestamps would otherwise cause a ZeroDivisionError below.
        return None, "⚠ サンプリング間隔を検出できません", None
    proc_thresholds = thresholds_df[(thresholds_df["ProcessNo_ProcessName"] == process_name) &
                                    (thresholds_df["Important"] == True)]
    if proc_thresholds.empty:
        return None, f"⚠ プロセス {process_name} の重要項目なし", None

    def _json_safe(v):
        # numpy scalars / NaN are not JSON-safe; NaN becomes None (-> null).
        return float(v) if pd.notna(v) else None

    results = []
    for _, thr in proc_thresholds.iterrows():
        col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if col_tuple not in df.columns:
            continue
        series = df_window[col_tuple].dropna()
        if len(series) < 3:
            continue  # need at least 3 points for a meaningful fit
        # Ordinary least-squares line fit; numerically identical to
        # sklearn.LinearRegression but without the heavyweight dependency.
        x = np.arange(len(series), dtype=float)
        slope, intercept = np.polyfit(x, series.values.astype(float), 1)
        last_val = series.iloc[-1]
        # Extrapolate forecast_minutes ahead of the window in sample steps.
        forecast_steps = int(forecast_minutes / interval_minutes)
        forecast_index = len(series) + forecast_steps
        forecast_val = slope * forecast_index + intercept
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
        l, ll, h, hh = thr.get("L"), thr.get("LL"), thr.get("H"), thr.get("HH")
        # Current-trend classification: a falling series is judged against
        # LL, a rising one against HH (dropna above guarantees last_val is
        # not NaN).
        status = "安定"
        if slope < 0 and pd.notna(ll):
            status = "LL接近下降傾向" if last_val > ll else "LL逸脱下降傾向"
        if slope > 0 and pd.notna(hh):
            status = "HH接近上昇傾向" if last_val < hh else "HH逸脱上昇傾向"
        forecast_status = "安定"
        if pd.notna(ll) and forecast_val <= ll:
            forecast_status = "LL逸脱予測"
        elif pd.notna(hh) and forecast_val >= hh:
            forecast_status = "HH逸脱予測"
        results.append({
            "ItemName": thr["ItemName"],
            "傾向": status,
            "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3) if pd.notna(last_val) else None,
            "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time),
            "予測傾向": forecast_status,
            "サンプリング間隔(分)": interval_minutes,
            "LL": _json_safe(ll), "L": _json_safe(l), "H": _json_safe(h), "HH": _json_safe(hh)
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出+未来予測完了", result_json
# ===== Gradio UI =====
with gr.Blocks(css=".gradio-container {overflow:auto !important}") as demo:
    gr.Markdown("## 統合版アプリ (閾値診断 + 傾向検出)")
    with gr.Row():
        csv_input = gr.File(label="CSVファイルをアップロード", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel閾値ファイルをアップロード", file_types=[".xlsx"], type="filepath")
    # Loaded frames live in session state so both tabs share them.
    state_df = gr.State()
    state_thresholds = gr.State()

    def load_files(csv_file, excel_file):
        """Load the measurement CSV and the threshold Excel into DataFrames.

        Returns ``(data_df, thresholds_df, status_message)``; both frames
        are None when loading fails.
        """
        try:
            # gr.File(type="filepath") passes a plain str path; older Gradio
            # versions passed a tempfile wrapper with a .name attribute.
            # Accept both — the original always read .name, which raises
            # AttributeError on str paths and made every load fail.
            csv_path = csv_file if isinstance(csv_file, str) else csv_file.name
            excel_path = excel_file if isinstance(excel_file, str) else excel_file.name
            # Three header rows: (ColumnID, ItemName, ProcessNo_ProcessName).
            df = pd.read_csv(csv_path, header=[0, 1, 2])
            # The first column is the timestamp; re-insert it under a flat name.
            timestamp_col = df.iloc[:, 0]
            df = df.drop(df.columns[0], axis=1)
            df.insert(0, "timestamp", timestamp_col)
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
            thresholds_df = pd.read_excel(excel_path)
            # Normalize Important ("TRUE"/"FALSE" text in Excel) to booleans;
            # tolerate a missing column instead of failing the whole load.
            if "Important" in thresholds_df.columns:
                thresholds_df["Important"] = (
                    thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
                )
            else:
                thresholds_df["Important"] = False
            for col in ["LL", "L", "H", "HH"]:
                if col in thresholds_df.columns:
                    thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")
            return df, thresholds_df, "✅ ファイルを読み込みました"
        except Exception as e:
            return None, None, f"❌ 読み込み失敗: {e}"

    load_btn = gr.Button("ファイルを読み込み")
    load_status = gr.Textbox(label="ロード状態")
    load_btn.click(load_files, inputs=[csv_input, excel_input],
                   outputs=[state_df, state_thresholds, load_status])

    with gr.Tabs():
        with gr.Tab("閾値診断"):
            process_name = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
            datetime_str = gr.Textbox(label="診断基準日時", value="2025/8/1 1:05")
            window_minutes = gr.Number(label="さかのぼる時間幅(分)", value=60)
            run_btn = gr.Button("診断を実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計結果")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計結果")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計結果")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")
            run_btn.click(
                diagnose_process_range,
                inputs=[state_df, state_thresholds, process_name, datetime_str, window_minutes],
                outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output, json_output]
            )
        with gr.Tab("傾向検出"):
            process_name2 = gr.Textbox(label="プロセス名", value="E018-A012_除害RO")
            datetime_str2 = gr.Textbox(label="基準日時", value="2025/8/1 1:05")
            window_minutes2 = gr.Number(label="さかのぼる時間幅(分)", value=60)
            forecast_minutes2 = gr.Number(label="未来予測時間幅(分)", value=60)
            run_btn2 = gr.Button("傾向検出を実行")
            result_df2 = gr.Dataframe(label="傾向+未来予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")
            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[state_df, state_thresholds, process_name2, datetime_str2, window_minutes2, forecast_minutes2],
                outputs=[result_df2, summary_output2, json_output2]
            )
if __name__ == "__main__":
    # USE_MCP=1 launches the app as an MCP server; anything else serves the
    # plain UI bound to all interfaces without a public share link.
    if os.getenv("USE_MCP", "0") == "1":
        demo.launch(mcp_server=True)
    else:
        demo.launch(server_name="0.0.0.0", share=False)