# NOTE(review): the three lines below are web-page residue (a commit header)
# accidentally pasted into the source; preserved as comments so the file parses.
# Ken-INOUE's picture
# Refactor analyze_variability_json to streamline top 10 results extraction by consolidating DataFrame operations. Update UI button click handling to ensure consistent output across UI and MCP functionalities.
# 6e55dc4
# Unified Gradio app (threshold diagnosis + trend detection + early-warning forecast + variability analysis; fixed files on Supabase)
import gradio as gr
import pandas as pd
import numpy as np
import json
from dotenv import load_dotenv
import os
from supabase import create_client, Client
from sklearn.linear_model import LinearRegression
# Load environment variables from a local .env file.
load_dotenv()
# --- Supabase configuration (all values come from the environment) ---
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
SUPABASE_BUCKET = os.getenv("SUPABASE_BUCKET")
CSV_FILE = os.getenv("CSV_PATH")      # measurement CSV (read with a 3-row header)
EXCEL_FILE = os.getenv("EXCEL_PATH")  # thresholds workbook
LAG_FILE = os.getenv("LAG_PATH")      # inter-process lag matrix workbook
TMP_DIR = "./tmp"                     # local scratch directory for downloads
def _create_supabase_client() -> Client | None:
    """Build a Supabase client from env credentials; return None when
    credentials are missing or client creation fails."""
    if SUPABASE_URL and SUPABASE_KEY:
        try:
            return create_client(SUPABASE_URL, SUPABASE_KEY)
        except Exception:
            return None
    return None
supabase: Client | None = _create_supabase_client()
# --- 共通ユーティリティ ---
def convert_value(v) -> int | float | str:
"""
numpy型を Python 標準の型に変換する。
Args:
v: numpyの値またはPythonの値
Returns:
int | float | str: Python標準型に変換した値
"""
if hasattr(v, "item"):
return v.item()
if isinstance(v, (np.integer, int)):
return int(v)
if isinstance(v, (np.floating, float)):
return float(v)
return v
def normalize(s: str) -> str:
    """
    Normalize a string: full-width spaces become ASCII spaces, CR/LF are
    removed, and the result is stripped of surrounding whitespace.

    Args:
        s (str): input value (coerced to str first).

    Returns:
        str: the normalized string.
    """
    table = str.maketrans({"\u3000": " ", "\n": None, "\r": None})
    return str(s).translate(table).strip()
def col_to_str(col) -> str:
    """
    Flatten a (possibly multi-level) column label into one underscore-joined
    string; falsy tuple parts (e.g. empty header levels) are skipped.
    """
    if not isinstance(col, tuple):
        return str(col)
    return "_".join(str(part) for part in col if part)
def find_matching_column(df: pd.DataFrame, col_id: str, item_name: str, process_name: str) -> str | None:
    """
    Find the first DataFrame column whose name contains the given column ID,
    process name, and (normalized) item name.

    Args:
        df (pd.DataFrame): data frame whose columns are searched.
        col_id (str): column ID substring to match.
        item_name (str): item name (matched after normalization).
        process_name (str): process name substring to match.

    Returns:
        str | None: the first matching column name, or None when no column matches.
    """
    wanted_item = normalize(item_name)
    for col in df.columns:
        if not isinstance(col, str):
            continue
        if col_id in col and process_name in col and wanted_item in normalize(col):
            return col
    return None
# --- Global state (populated by load_from_supabase) ---
DF = None             # measurement data as a flattened-column DataFrame
THRESHOLDS_DF = None  # thresholds table loaded from the Excel workbook
LAG_MATRIX = None     # inter-process lag matrix (index = process names)
INITIAL_CHOICES = []  # dropdown choices (ProcessNo_ProcessName values)
# --- Load the fixed files from Supabase Storage ----
def load_from_supabase() -> str:
    """
    Download the CSV, thresholds Excel and lag-matrix files from Supabase
    Storage and refresh the module-level state
    (DF, THRESHOLDS_DF, LAG_MATRIX, INITIAL_CHOICES).

    Returns:
        str: human-readable success/failure message (Japanese, shown in the UI).
    """
    global DF, THRESHOLDS_DF, LAG_MATRIX, INITIAL_CHOICES
    try:
        if supabase is None:
            return "❌ Supabase クライアント未初期化: 環境変数 SUPABASE_URL/SUPABASE_KEY を確認してください"
        if not SUPABASE_BUCKET:
            return "❌ SUPABASE_BUCKET が未設定です"
        # --- CSV (measurement data, 3-row header) ---
        csv_path = CSV_FILE
        if not csv_path:
            return "❌ CSV_PATH が未設定です"
        os.makedirs(TMP_DIR, exist_ok=True)
        csv_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(csv_path)
        with open(os.path.join(TMP_DIR, "data.csv"), "wb") as f:
            f.write(csv_bytes)
        DF = pd.read_csv(os.path.join(TMP_DIR, "data.csv"), header=[0, 1, 2])
        # The first column holds timestamps; re-insert it as a plain "timestamp" column.
        timestamp_col = pd.to_datetime(DF.iloc[:, 0], errors="coerce")
        DF = DF.drop(DF.columns[0], axis=1)
        DF.insert(0, "timestamp", timestamp_col)
        # Flatten the 3-level header into "level0_level1_level2" string names.
        DF.columns = [
            "timestamp" if (isinstance(c, str) and c == "timestamp") else col_to_str(c)
            for c in DF.columns
        ]
        # --- Excel thresholds ---
        excel_path = EXCEL_FILE
        if not excel_path:
            return "❌ EXCEL_PATH が未設定です"
        thr_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(excel_path)
        with open(os.path.join(TMP_DIR, "thresholds.xlsx"), "wb") as f:
            f.write(thr_bytes)
        THRESHOLDS_DF = pd.read_excel(os.path.join(TMP_DIR, "thresholds.xlsx"))
        # Normalize the Important flag ("TRUE"/"FALSE" text) to real booleans.
        THRESHOLDS_DF["Important"] = (
            THRESHOLDS_DF["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
        for col in ["LL", "L", "H", "HH"]:
            if col in THRESHOLDS_DF.columns:
                THRESHOLDS_DF[col] = pd.to_numeric(THRESHOLDS_DF[col], errors="coerce")
        # --- Lag matrix file ---
        lag_path = LAG_FILE
        if not lag_path:
            return "❌ LAG_PATH が未設定です"
        lag_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(lag_path)
        with open(os.path.join(TMP_DIR, "lag.xlsx"), "wb") as f:
            f.write(lag_bytes)
        LAG_MATRIX = pd.read_excel(os.path.join(TMP_DIR, "lag.xlsx"), index_col=0)
        print(50*'=')
        print('INITIAL_CHOICES')
        print(50*'-')
        # Refresh the dropdown choices from the thresholds table.
        if THRESHOLDS_DF is not None and "ProcessNo_ProcessName" in THRESHOLDS_DF.columns:
            INITIAL_CHOICES = sorted(THRESHOLDS_DF["ProcessNo_ProcessName"].dropna().unique().tolist())
        else:
            INITIAL_CHOICES = []
        print('INITIAL_CHOICES: ', INITIAL_CHOICES)
        print(50*'=')
        return "✅ Supabase からファイルを読み込み成功"
    except Exception as e:
        return f"❌ Supabase からの読み込み失敗: {e}"
# --- Tab1: threshold diagnosis ---
def judge_status(value: float, ll: float, l: float, h: float, hh: float) -> str:
    """
    Classify *value* against the LL/L/H/HH limits; NaN/None limits are skipped.

    Returns:
        str: one of "LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH".
    """
    # Checks run in the original precedence: LL, L, HH, H.
    checks = (
        (ll, True, "LOW-LOW"),
        (l, True, "LOW"),
        (hh, False, "HIGH-HIGH"),
        (h, False, "HIGH"),
    )
    for limit, is_low_side, label in checks:
        if pd.notna(limit) and ((value < limit) if is_low_side else (value > limit)):
            return label
    return "OK"
def parse_datetime_safe(s: str) -> pd.Timestamp | None:
"""
/ や - 区切り、ゼロ埋めの有無などを吸収して datetime に変換する。
"""
try:
return pd.to_datetime(s, errors="coerce")
except Exception:
return None
def _diagnose_process_range_core(process_name: str, datetime_str: str, window_minutes: int
                                 ) -> tuple[pd.DataFrame | None, pd.DataFrame | None,
                                            pd.DataFrame | None, str, str]:
    """
    Core threshold diagnosis for one process: build the status tables, a
    summary text and a JSON string for the window ending at *datetime_str*.

    Returns:
        tuple: (all-items table, important-items table, per-important-item
        table, summary text, JSON string).  Error paths return
        (None, None, None, <warning message>, None) — note the last element
        is None there despite the str annotation.
    """
    global DF, THRESHOLDS_DF
    if DF is None or THRESHOLDS_DF is None:
        return None, None, None, "⚠ ファイル未読み込み", None
    # Parse the reference datetime leniently (separator / zero-padding variants).
    target_time = parse_datetime_safe(datetime_str)
    if target_time is None or pd.isna(target_time):
        return None, None, None, f"⚠ 入力した日時 {datetime_str} が無効です。", None
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    df_window = DF[(DF["timestamp"] >= start_time) & (DF["timestamp"] <= end_time)]
    if df_window.empty:
        return None, None, None, "⚠ 指定した時間幅にデータが見つかりません。", None
    # Match the process name after normalization (full-width spaces etc.).
    proc_thresholds = THRESHOLDS_DF[
        THRESHOLDS_DF["ProcessNo_ProcessName"].apply(normalize) == normalize(process_name)
    ]
    if proc_thresholds.empty:
        return None, None, None, f"⚠ プロセス {process_name} の閾値が設定されていません。", None
    all_results = []
    # Judge every (row, threshold) pair within the window.
    for _, row in df_window.iterrows():
        for _, thr in proc_thresholds.iterrows():
            col_name = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
            if col_name not in DF.columns:
                continue
            value = row[col_name]
            status = judge_status(value, thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH"))
            all_results.append({
                "ColumnID": thr["ColumnID"], "ItemName": thr["ItemName"], "判定": status,
                "重要項目": bool(thr.get("Important", False)), "時刻": str(row["timestamp"])
            })
    total = len(all_results)
    # NOTE(review): if no threshold column matched, total is 0 and the ratios
    # below become NaN — the summary then reads "nan%".  Confirm acceptable.
    status_counts = pd.Series([r["判定"] for r in all_results]).value_counts().reindex(
        ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
    )
    status_ratio = (status_counts / total * 100).round(1)
    result_df_all = pd.DataFrame({"状態": status_counts.index, "件数": status_counts.values, "割合(%)": status_ratio.values})
    # Same aggregation restricted to rows flagged as important.
    important_results = [r for r in all_results if r["重要項目"]]
    if important_results:
        total_imp = len(important_results)
        status_counts_imp = pd.Series([r["判定"] for r in important_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_imp = (status_counts_imp / total_imp * 100).round(1)
        result_df_imp = pd.DataFrame({"状態": status_counts_imp.index, "件数": status_counts_imp.values, "割合(%)": status_ratio_imp.values})
    else:
        result_df_imp = pd.DataFrame(columns=["状態", "件数", "割合(%)"])
        status_ratio_imp = pd.Series(dtype=float)
    # Per-item breakdown of the important items only.
    result_per_item = []
    for item in set([r["ItemName"] for r in important_results]):
        item_results = [r for r in important_results if r["ItemName"] == item]
        total_item = len(item_results)
        status_counts_item = pd.Series([r["判定"] for r in item_results]).value_counts().reindex(
            ["LOW-LOW", "LOW", "OK", "HIGH", "HIGH-HIGH"], fill_value=0
        )
        status_ratio_item = (status_counts_item / total_item * 100).round(1)
        for s, c, r in zip(status_counts_item.index, status_counts_item.values, status_ratio_item.values):
            result_per_item.append({"ItemName": item, "状態": s, "件数": int(c), "割合(%)": float(r)})
    result_df_imp_items = pd.DataFrame(result_per_item)
    summary = (
        f"✅ {process_name} の診断完了({start_time}{end_time})\n"
        + "[全項目] " + " / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio.items()]) + "\n"
        + "[重要項目全体] " + (" / ".join([f"{s}:{r:.1f}%" for s, r in status_ratio_imp.items()]) if not result_df_imp.empty else "対象データなし")
    )
    json_data = {
        "集計結果": {
            "全項目割合": {k: float(v) for k, v in status_ratio.to_dict().items()},
            "重要項目全体割合": {k: float(v) for k, v in status_ratio_imp.to_dict().items()} if not result_df_imp.empty else {},
            "重要項目ごと割合": [dict(row) for _, row in result_df_imp_items.iterrows()]
        }
    }
    result_json = json.dumps(json_data, ensure_ascii=False, indent=2)
    return result_df_all, result_df_imp, result_df_imp_items, summary, result_json
def diagnose_process_range_ui(
    process_name: str,
    datetime_str: str,
    window_minutes: int
) -> tuple[pd.DataFrame | None, pd.DataFrame | None, pd.DataFrame | None, str]:
    """
    UI handler: run the threshold diagnosis for the process chosen in the
    dropdown and return the tables plus the summary text (the JSON string
    the core also produces is discarded here).

    Args:
        process_name (str): process name selected in the UI dropdown.
        datetime_str (str): reference datetime, e.g. "2024-06-01 12:00".
        window_minutes (int): minutes to look back from the reference time.

    Returns:
        tuple:
            result_df_all (pd.DataFrame | None): all-items status counts
            result_df_imp (pd.DataFrame | None): important-items status counts
            result_df_imp_items (pd.DataFrame | None): per-important-item counts
            summary (str): summary text
    """
    df_all, df_imp, df_imp_items, summary, _unused_json = _diagnose_process_range_core(
        process_name, datetime_str, window_minutes
    )
    return df_all, df_imp, df_imp_items, summary
def diagnose_process_range_json(
    process_name: str,
    datetime_str: str,
    window_minutes: int
) -> dict:
    """
    Run the threshold diagnosis and return the result as a JSON-ready dict.

    Args:
        process_name (str): process to diagnose (dropdown value or MCP input).
        datetime_str (str): reference datetime, e.g. "2024-06-01 12:00".
        window_minutes (int): minutes to look back from the reference time.

    Returns:
        dict: always shaped as
            {"meta": {process_name, datetime_str, window_minutes},
             "tab1": {"summary": <text>, "results": <aggregates>}}
        On failure, "summary" carries the error message and "results" holds
        empty aggregates.

    Notes:
        - Delegates to _diagnose_process_range_core.  When the core produced a
          JSON string, it is parsed and wrapped in the meta/tab1 envelope
          (previously the bare parsed payload was returned, which contradicted
          this docstring and the other *_json handlers).
        - Intended for MCP / external API consumers as well as the UI.
    """
    def _meta() -> dict:
        # Echo of the inputs, shared by every return path.
        return {
            "process_name": process_name,
            "datetime_str": datetime_str,
            "window_minutes": window_minutes,
        }
    try:
        result_df_all, result_df_imp, result_df_imp_items, summary, result_json = \
            _diagnose_process_range_core(process_name, datetime_str, window_minutes)
        # --- case1: the core produced a JSON payload — wrap it in the
        # documented meta/tab1 envelope for a consistent output shape. ---
        if result_json and isinstance(result_json, str):
            try:
                parsed = json.loads(result_json)
                return {"meta": _meta(), "tab1": {"summary": summary, "results": parsed}}
            except Exception:
                # Malformed JSON → fall back to the DataFrames below.
                pass
        # --- case2: fallback — convert the DataFrames directly. ---
        def df_to_records(df):
            return df.to_dict(orient="records") if df is not None else []
        return {
            "meta": _meta(),
            "tab1": {
                "summary": summary if summary else "診断結果は得られませんでした。",
                "results": {
                    "全項目集計": df_to_records(result_df_all),
                    "重要項目全体集計": df_to_records(result_df_imp),
                    "重要項目ごと集計": df_to_records(result_df_imp_items),
                },
            },
        }
    except Exception as e:
        # --- case3: unexpected failure — same envelope, empty aggregates. ---
        return {
            "meta": _meta(),
            "tab1": {
                "summary": f"診断エラー: {str(e)}",
                "results": {
                    "全項目集計": [],
                    "重要項目全体集計": [],
                    "重要項目ごと集計": [],
                },
            },
        }
# --- Tab2: trend detection ---
def detect_trends_with_forecast(process_name: str, datetime_str: str, window_minutes: int,
                                forecast_minutes: int, downsample_factor: int
                                ) -> tuple[pd.DataFrame | None, str, str]:
    """
    Fit a linear regression per important item to detect trends and
    extrapolate a forecast value.

    Args:
        process_name (str): target process name.
        datetime_str (str): reference datetime (end of the learning window).
        window_minutes (int): minutes of history before the reference time.
        forecast_minutes (int): minutes ahead to forecast.
        downsample_factor (int): keep every Nth sample before fitting.

    Returns:
        tuple: (forecast DataFrame, summary text, JSON string); error paths
        return (None, <warning message>, None).
    """
    global DF, THRESHOLDS_DF
    if DF is None or THRESHOLDS_DF is None:
        return None, "⚠ ファイル未読み込み", None
    target_time = pd.to_datetime(datetime_str)
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    df_window = DF[(DF["timestamp"] >= start_time) & (DF["timestamp"] <= target_time)]
    if df_window.empty:
        return None, "⚠ データなし", None
    # Median sampling interval of the raw data (used for step size in minutes).
    interval = df_window["timestamp"].diff().median()
    if pd.isna(interval):
        return None, "⚠ サンプリング間隔検出失敗", None
    interval_minutes = interval.total_seconds() / 60
    # Downsample by simple striding; the effective interval grows accordingly.
    df_window = df_window.iloc[::int(downsample_factor), :].reset_index(drop=True)
    effective_interval = interval_minutes * int(downsample_factor)
    proc_thresholds = THRESHOLDS_DF[
        (THRESHOLDS_DF["ProcessNo_ProcessName"].apply(normalize) == normalize(process_name)) &
        (THRESHOLDS_DF["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    results = []
    for _, thr in proc_thresholds.iterrows():
        col_tuple = f"{thr['ColumnID']}_{thr['ItemName']}_{thr['ProcessNo_ProcessName']}"
        if col_tuple not in DF.columns:
            continue
        series = df_window[col_tuple].dropna()
        if len(series) < 3:
            continue  # too few points for a meaningful fit
        x = np.arange(len(series)).reshape(-1, 1)
        y = series.values.reshape(-1, 1)
        model = LinearRegression().fit(x, y)
        slope = model.coef_[0][0]
        last_val = series.iloc[-1]
        # Extrapolate forecast_minutes ahead in units of the effective interval.
        forecast_steps = int(forecast_minutes / effective_interval)
        forecast_val = model.predict([[len(series) + forecast_steps]])[0][0]
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
        # Risk label from the hard limits only (LL / HH).
        risk = "安定"
        if pd.notna(thr.get("LL")) and forecast_val <= thr["LL"]:
            risk = "LL逸脱予測"
        elif pd.notna(thr.get("HH")) and forecast_val >= thr["HH"]:
            risk = "HH逸脱予測"
        results.append({
            "ItemName": thr["ItemName"], "傾き": round(float(slope), 4),
            "最終値": round(float(last_val), 3), "予測値": round(float(forecast_val), 3),
            "予測時刻": str(forecast_time), "予測リスク": risk, "粗化間隔(分)": round(effective_interval, 2)
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, "✅ 傾向検出完了", result_json
# --- Tab2: trend detection, JSON variant ---
def detect_trends_with_forecast_json(
    process_name: str,
    datetime_str: str,
    window_minutes: int,
    forecast_minutes: int,
    downsample_factor: int
) -> dict:
    """
    MCP entry point: run the trend detection and package the outcome as a
    dict shaped {"meta": {...}, "tab2": {"summary": ..., "results": ...}}.
    """
    result_df, summary, result_json = detect_trends_with_forecast(
        process_name, datetime_str, window_minutes, forecast_minutes, downsample_factor
    )
    meta = {
        "process_name": process_name,
        "datetime_str": datetime_str,
        "window_minutes": window_minutes,
        "forecast_minutes": forecast_minutes,
    }
    # Prefer the core's JSON string when it parses cleanly.
    if result_json and isinstance(result_json, str):
        parsed = None
        try:
            parsed = json.loads(result_json)
            return {"meta": meta, "tab2": {"summary": summary, "results": parsed}}
        except Exception:
            pass  # fall back to the DataFrame records below
    # Fallback: derive the records from the DataFrame instead.
    records = result_df.to_dict(orient="records") if result_df is not None else []
    return {
        "meta": meta,
        "tab2": {
            "summary": summary if summary else "診断結果なし",
            "results": records,
        },
    }
# --- Tab3: early-warning (lag-based) forecast ---
def forecast_process_with_lag(process_name: str, datetime_str: str, forecast_minutes: int,
                              downsample_factor: int = 3
                              ) -> tuple[pd.DataFrame | None, str, str]:
    """
    Forecast important items of *process_name* from upstream processes
    shifted by the lag matrix, via a linear regression per item.

    Args:
        process_name (str): target process name.
        datetime_str (str): reference datetime.
        forecast_minutes (int): minutes ahead to forecast.
        downsample_factor (int): resampling coarseness multiplier.

    Returns:
        tuple: (result DataFrame, summary text, JSON string); error paths
        return (None, <warning message>, None).
    """
    global DF, THRESHOLDS_DF, LAG_MATRIX
    if DF is None or THRESHOLDS_DF is None or LAG_MATRIX is None:
        return None, "⚠ ファイル未読み込み", None
    target_time = pd.to_datetime(datetime_str)
    forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)
    proc_thresholds = THRESHOLDS_DF[
        (THRESHOLDS_DF["ProcessNo_ProcessName"].apply(normalize) == normalize(process_name)) &
        (THRESHOLDS_DF["Important"] == True)
    ]
    if proc_thresholds.empty:
        return None, f"⚠ {process_name} の重要項目なし", None
    # Locate the lag-matrix row with normalization-tolerant matching.
    matched_idx = None
    for idx in LAG_MATRIX.index:
        if normalize(idx) == normalize(process_name):
            matched_idx = idx
            break
    if matched_idx is None:
        return None, f"⚠ {process_name} のラグ行なし", None
    lag_row = LAG_MATRIX.loc[matched_idx].dropna()
    lag_row = lag_row[lag_row > 0]  # keep positive lags only (true upstream)
    if lag_row.empty:
        return None, f"⚠ {process_name} に正のラグなし", None
    results = []
    for _, thr in proc_thresholds.iterrows():
        y_col = find_matching_column(DF, thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
        if y_col is None:
            continue
        # Training data: the 24 hours up to the reference time.
        df_window = DF[(DF["timestamp"] >= target_time - pd.Timedelta(hours=24)) &
                       (DF["timestamp"] <= target_time)].copy()
        if df_window.empty:
            continue
        base_df = df_window[["timestamp", y_col]].rename(columns={y_col: "y"})
        # === Coarsening: resample to base interval × downsample_factor ===
        dt = df_window["timestamp"].diff().median()
        base_min = max(int(dt.total_seconds() // 60), 1) if pd.notna(dt) else 1
        new_interval = max(base_min * int(downsample_factor), 1)
        df_down = (base_df.set_index("timestamp")
                   .resample(f"{new_interval}min").mean()
                   .dropna()
                   .reset_index())
        merged_df = df_down.copy()
        # Join each upstream column shifted forward by its lag (nearest match).
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in DF.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                shifted = df_window.loc[:, ["timestamp", x_col]].copy()
                shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(minutes=lag_min)
                shifted = shifted.rename(columns={x_col: f"{x_col}_lag{lag_min}"})
                merged_df = pd.merge_asof(
                    merged_df.sort_values("timestamp"),
                    shifted.sort_values("timestamp"),
                    on="timestamp",
                    direction="nearest"
                )
        X_all = merged_df.drop(columns=["timestamp", "y"], errors="ignore").values
        Y_all = merged_df["y"].values
        if X_all.shape[1] == 0 or len(Y_all) < 5:
            continue  # no features or too few samples to fit
        model = LinearRegression().fit(X_all, Y_all)
        # Future prediction: take each upstream value lag_min minutes before
        # the forecast time (nearest available sample in DF).
        X_pred = []
        for up_proc, lag_min in lag_row.items():
            up_cols = [c for c in DF.columns if isinstance(c, str) and up_proc in c]
            for x_col in up_cols:
                ref_time = forecast_time - pd.Timedelta(minutes=lag_min)
                idx = (DF["timestamp"] - ref_time).abs().idxmin()
                X_pred.append(DF.loc[idx, x_col])
        if not X_pred:
            continue
        pred_val = model.predict([X_pred])[0]
        # Threshold risk classification of the predicted value.
        ll, l, h, hh = thr.get("LL"), thr.get("L"), thr.get("H"), thr.get("HH")
        risk = "OK"
        if pd.notna(ll) and pred_val <= ll:
            risk = "LOW-LOW"
        elif pd.notna(l) and pred_val <= l:
            risk = "LOW"
        elif pd.notna(hh) and pred_val >= hh:
            risk = "HIGH-HIGH"
        elif pd.notna(h) and pred_val >= h:
            risk = "HIGH"
        results.append({
            "ItemName": thr["ItemName"],
            "予測値": round(float(pred_val), 3),
            "予測時刻": str(forecast_time),
            "予測リスク": risk,
            "粗化間隔(分)": new_interval
        })
    result_df = pd.DataFrame(results)
    result_json = json.dumps(results, ensure_ascii=False, indent=2)
    return result_df, f"✅ {process_name} の予兆解析完了", result_json
# --- Tab3: early-warning forecast, JSON variant ---
def forecast_process_with_lag_json(
    process_name: str,
    datetime_str: str,
    forecast_minutes: int,
    downsample_factor: int = 3
) -> dict:
    """
    MCP entry point: run the lag-based forecast and return a JSON dict.

    Always returns the same envelope as the other *_json handlers:
        {"meta": {...}, "tab3": {"summary": ..., "results": [...]}}
    (Previously the success path returned the bare parsed list, which was
    inconsistent with the fallback path and with tab1/tab2.)
    """
    result_df, summary, result_json = forecast_process_with_lag(
        process_name, datetime_str, forecast_minutes, downsample_factor
    )
    meta = {
        "process_name": process_name,
        "datetime_str": datetime_str,
        "forecast_minutes": forecast_minutes,
    }
    # Prefer the JSON the core already produced, wrapped in the shared envelope.
    if result_json and isinstance(result_json, str):
        try:
            return {"meta": meta, "tab3": {"summary": summary, "results": json.loads(result_json)}}
        except Exception:
            pass  # malformed JSON → fall back to the DataFrame records
    return {
        "meta": meta,
        "tab3": {
            "summary": summary if summary else "診断結果なし",
            "results": result_df.to_dict(orient="records") if result_df is not None else [],
        },
    }
# --- Tab4: variability analysis ---
def robust_mad(x: pd.Series) -> float:
    """
    Scaled Median Absolute Deviation: 1.4826 × MAD, which approximates the
    standard deviation for normally distributed data.

    Args:
        x (pd.Series): numeric series.

    Returns:
        float: the scaled MAD, or NaN for an empty series.
    """
    if len(x) == 0:
        return np.nan
    center = np.median(x)
    return 1.4826 * np.median(np.abs(x - center))
def analyze_variability(datetime_str: str, window_minutes: int,
                        cv_threshold_pct: float = 10.0,
                        jump_pct_threshold: float = 10.0,
                        mad_sigma: float = 3.0
                        ) -> tuple[pd.DataFrame | None, str, str]:
    """
    Variability analysis: flag unstable items within the time window.

    An item is "unstable" when its coefficient of variation reaches
    *cv_threshold_pct* or at least one spike is detected.  A step counts as
    a spike when |diff| > mad_sigma × MAD(diffs) or the relative change is
    at least *jump_pct_threshold* percent of the window median.

    Args:
        datetime_str (str): reference datetime (end of the window).
        window_minutes (int): minutes of history to analyse.
        cv_threshold_pct (float): CV threshold in percent.
        jump_pct_threshold (float): relative-jump threshold in percent.
        mad_sigma (float): MAD multiplier for the absolute spike test.

    Returns:
        tuple: (result DataFrame, summary text, JSON string); error paths
        return (None, <warning message>, None).
    """
    global DF
    if DF is None:
        return None, "⚠ ファイル未読み込み", None
    target_time = pd.to_datetime(datetime_str)
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    dfw = DF[(DF["timestamp"] >= start_time) & (DF["timestamp"] <= end_time)].copy()
    if dfw.empty:
        return None, f"⚠ 指定時間幅({start_time}{end_time})にデータが見つかりません。", None
    data_cols = [c for c in dfw.columns if c != "timestamp" and pd.api.types.is_numeric_dtype(dfw[c])]
    results = []
    unstable_count = 0
    for col in data_cols:
        s = dfw[col].dropna()
        n = len(s)
        if n < 3:
            continue  # too few samples for dispersion statistics
        mean = float(np.mean(s))
        std = float(np.std(s, ddof=1)) if n >= 2 else 0.0
        cv_pct = abs(std / mean) * 100.0 if mean != 0 else np.nan
        diffs = s.diff().dropna()
        mad_scale = robust_mad(diffs)
        # Reference magnitude for relative jumps (guards against a 0 median).
        ref = max(1e-9, abs(float(np.median(s))))
        rel_jump = diffs.abs() / ref * 100.0
        # The absolute test is disabled (inf threshold) when MAD is 0 or NaN.
        abs_cond = diffs.abs() > (mad_sigma * mad_scale if not np.isnan(mad_scale) and mad_scale > 0 else np.inf)
        pct_cond = rel_jump >= jump_pct_threshold
        spike_mask = abs_cond | pct_cond
        spike_count = int(spike_mask.sum())
        spike_up_count = int((diffs[spike_mask] > 0).sum())
        spike_down_count = spike_count - spike_up_count
        max_step = float(diffs.abs().max()) if len(diffs) else np.nan
        last_val, first_val = float(s.iloc[-1]), float(s.iloc[0])
        unstable = (not np.isnan(cv_pct) and cv_pct >= cv_threshold_pct) or (spike_count > 0)
        if unstable:
            unstable_count += 1
        results.append({
            "項目名": col,
            "サンプル数": n,
            "平均": mean,
            "標準偏差": std,
            "CV(%)": None if np.isnan(cv_pct) else round(cv_pct, 3),
            "スパイク数": spike_count,
            "スパイク上昇数": spike_up_count,
            "スパイク下降数": spike_down_count,
            "最大|ステップ|": None if np.isnan(max_step) else round(max_step, 6),
            "最初の値": first_val,
            "最後の値": last_val,
            "不安定判定": bool(unstable)
        })
    result_df = pd.DataFrame(results)
    if not result_df.empty:
        # Unstable items first, then by CV and spike count, descending.
        result_df = result_df.sort_values(
            by=["不安定判定", "CV(%)", "スパイク数"],
            ascending=[False, False, False],
            na_position="last"
        ).reset_index(drop=True)
    summary = (
        f"✅ 変動解析完了({start_time}{end_time})\n"
        f"- 対象項目数: {len(results)}\n"
        f"- 不安定と判定: {unstable_count} 項目(CV≥{cv_threshold_pct:.1f}% または スパイクあり)\n"
        f"- スパイク条件: |diff| > {mad_sigma:.1f}×MAD または 相対変化 ≥ {jump_pct_threshold:.1f}%"
    )
    # Serialize with NaN/inf mapped to real JSON null.  (The previous
    # fillna("null") emitted the *string* "null" instead of a JSON null.)
    safe_df = result_df.replace([np.inf, -np.inf], np.nan)
    records = safe_df.astype(object).where(safe_df.notna(), None).to_dict(orient="records")
    result_json = json.dumps(records, ensure_ascii=False, indent=2)
    return result_df, summary, result_json
# --- Tab4: variability analysis, JSON variant ---
def analyze_variability_json(
    datetime_str: str,
    window_minutes: int,
    cv_threshold_pct: float = 10.0,
    jump_pct_threshold: float = 10.0,
    mad_sigma: float = 3.0
) -> tuple[pd.DataFrame | None, str, dict]:
    """
    Shared UI/MCP handler: run the variability analysis and return
    (result_df, summary, json_dict).  The dict keeps only 項目名, 平均,
    CV(%), スパイク数, 不安定判定 for the 10 highest-CV items.

    Note:
        This returns a 3-tuple — not a bare dict — because the Gradio click
        handler feeds three output components (table, summary, JSON).  The
        previous `-> dict` annotation and docstring contradicted the actual
        return value; the behavior itself is unchanged.
    """
    result_df, summary, _ = analyze_variability(
        datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma
    )
    if result_df is not None and not result_df.empty:
        # Narrow to the reporting columns and keep the 10 highest-CV items.
        top10 = (
            result_df.dropna(subset=["CV(%)"])
            .sort_values(by="CV(%)", ascending=False)
            .head(10)[["項目名", "平均", "CV(%)", "スパイク数", "不安定判定"]]
        )
        results = top10.to_dict(orient="records")
    else:
        results = []
    payload = {
        "meta": {
            "datetime_str": datetime_str,
            "window_minutes": window_minutes,
            "cv_threshold_pct": cv_threshold_pct,
            "jump_pct_threshold": jump_pct_threshold,
            "mad_sigma": mad_sigma,
        },
        "tab4": {
            "summary": summary if summary else "診断結果なし",
            "top10_by_cv": results,
        },
    }
    return result_df, summary, payload
# Load the fixed files once before the UI is built (fills INITIAL_CHOICES).
load_from_supabase()
# --- Gradio UI ---
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    gr.Markdown("## 統合トレンド解析アプリ (MCP対応, Supabase固定ファイル)")
    # --- Process selector (shared by UI and MCP) ---
    process_selector = gr.Dropdown(
        label="プロセス名 (UI + MCP)",
        choices=INITIAL_CHOICES,  # snapshot of the module-level choices
        allow_custom_value=True,
        interactive=True
    )
    # Inputs shared by every tab
    with gr.Row():
        datetime_str_global = gr.DateTime(label="診断基準日時", value='2025-08-02 12:00:00', type='string')
        window_minutes_global = gr.Number(label="さかのぼる時間幅(分)", value=60)
        forecast_minutes_global = gr.Number(label="未来予測時間幅(分)", value=60)
    # Tabs
    with gr.Tabs():
        with gr.Tab("閾値診断"):
            run_btn1 = gr.Button("診断実行")
            result_df_all = gr.Dataframe(label="全項目の状態集計")
            result_df_imp = gr.Dataframe(label="重要項目全体の状態集計")
            result_df_imp_items = gr.Dataframe(label="重要項目ごとの状態集計")
            summary_output = gr.Textbox(label="サマリー")
            json_output = gr.Json(label="JSON集計結果")
            # UI handler (tables + summary)
            run_btn1.click(
                diagnose_process_range_ui,
                inputs=[process_selector, datetime_str_global, window_minutes_global],
                outputs=[result_df_all, result_df_imp, result_df_imp_items, summary_output]
            )
            # MCP handler (JSON only; second listener on the same click)
            run_btn1.click(
                diagnose_process_range_json,
                inputs=[process_selector, datetime_str_global, window_minutes_global],
                outputs=[json_output]
            )
        with gr.Tab("傾向検出"):
            downsample_factor2 = gr.Slider(label="粗化倍率", minimum=1, maximum=10, step=1, value=3)
            run_btn2 = gr.Button("傾向検出実行")
            result_df2 = gr.Dataframe(label="傾向+予測結果")
            summary_output2 = gr.Textbox(label="サマリー")
            json_output2 = gr.Json(label="JSON結果")
            # UI handler
            run_btn2.click(
                detect_trends_with_forecast,
                inputs=[process_selector, datetime_str_global, window_minutes_global, forecast_minutes_global, downsample_factor2],
                outputs=[result_df2, summary_output2, json_output2]
            )
            # MCP handler
            run_btn2.click(
                detect_trends_with_forecast_json,
                inputs=[process_selector, datetime_str_global, window_minutes_global, forecast_minutes_global, downsample_factor2],
                outputs=[json_output2]
            )
        with gr.Tab("予兆解析"):
            downsample_factor3 = gr.Slider(1, 10, value=3, step=1, label="粗化倍率(サンプリング間引き)")
            run_btn3 = gr.Button("予兆解析実行")
            result_df3 = gr.Dataframe(label="予兆解析結果")
            summary_output3 = gr.Textbox(label="サマリー")
            json_output3 = gr.Json(label="JSON結果")
            # UI handler
            run_btn3.click(
                forecast_process_with_lag,
                inputs=[process_selector, datetime_str_global, forecast_minutes_global, downsample_factor3],
                outputs=[result_df3, summary_output3, json_output3]
            )
            # MCP handler
            run_btn3.click(
                forecast_process_with_lag_json,
                inputs=[process_selector, datetime_str_global, forecast_minutes_global, downsample_factor3],
                outputs=[json_output3]
            )
        with gr.Tab("変動解析"):
            cv_threshold4 = gr.Number(label="CVしきい値(%)", value=10.0)
            jump_threshold4 = gr.Number(label="ジャンプ率しきい値(%)", value=10.0)
            mad_sigma4 = gr.Number(label="MAD倍率", value=3.0)
            run_btn4 = gr.Button("変動解析実行")
            result_df4 = gr.Dataframe(label="変動解析結果")
            summary_output4 = gr.Textbox(label="サマリー")
            json_output4 = gr.Json(label="JSON結果")
            # Shared UI + MCP handler: returns (table, summary, json dict)
            run_btn4.click(
                analyze_variability_json,
                inputs=[datetime_str_global, window_minutes_global, cv_threshold4, jump_threshold4, mad_sigma4],
                outputs=[result_df4, summary_output4, json_output4]
            )
if __name__ == "__main__":
    demo.launch(mcp_server=True)