# 変動解析アプリ(単独 Gradio 版・粗化なし) import gradio as gr import pandas as pd import numpy as np import json import os import time from typing import Dict, Optional # ---------- ユーティリティ ---------- def _np_to_py(x): if hasattr(x, "item"): try: return x.item() except Exception: pass if isinstance(x, (np.integer,)): return int(x) if isinstance(x, (np.floating,)): return float(x) return x def robust_mad(x: pd.Series) -> float: """差分系列のロバストなスケール推定量(1.4826×MAD)。""" if len(x) == 0: return np.nan med = np.median(x) mad = np.median(np.abs(x - med)) return 1.4826 * mad def load_thresholds(excel_path: Optional[str]) -> Dict[tuple, bool]: """閾値Excelから Important フラグを辞書に。""" if not excel_path: return {} try: thresholds_df = pd.read_excel(excel_path) if "Important" in thresholds_df.columns: thresholds_df["Important"] = ( thresholds_df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False}) ) else: thresholds_df["Important"] = False need = {"ColumnID", "ItemName", "ProcessNo_ProcessName", "Important"} if not need.issubset(set(thresholds_df.columns)): return {} return { (row["ColumnID"], row["ItemName"], row["ProcessNo_ProcessName"]): bool(row["Important"]) for _, row in thresholds_df.iterrows() } except Exception: return {} # ---------- 変動解析ロジック ---------- def analyze_variability_core( df: pd.DataFrame, important_lookup: Dict[tuple, bool], datetime_str: str, window_minutes: int, cv_threshold_pct: float = 10.0, jump_pct_threshold: float = 10.0, mad_sigma: float = 3.0, ): target_time = pd.to_datetime(datetime_str) start_time = target_time - pd.Timedelta(minutes=window_minutes) end_time = target_time dfw = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)].copy() if dfw.empty: return None, f"⚠ 指定時間幅({start_time}~{end_time})にデータが見つかりません。", None, None data_cols = [ c for c in dfw.columns if c != "timestamp" and pd.api.types.is_numeric_dtype(dfw[c]) ] results = [] unstable_count = 0 for col in data_cols: s = dfw[col].dropna() n = len(s) if n < 3: continue mean = float(np.mean(s)) std = float(np.std(s, ddof=1)) if n >= 2 else 0.0 cv_pct = np.nan if mean == 0 else abs(std / mean) * 100.0 diffs = s.diff().dropna() mad_scale = robust_mad(diffs) ref = max(1e-9, abs(float(np.median(s)))) rel_jump = diffs.abs() / ref * 100.0 abs_thr = (mad_sigma * mad_scale) if (not np.isnan(mad_scale) and mad_scale > 0) else np.inf abs_cond = diffs.abs() > abs_thr pct_cond = rel_jump >= jump_pct_threshold spike_mask = abs_cond | pct_cond spike_count = int(spike_mask.sum()) spike_up_count = int((diffs[spike_mask] > 0).sum()) spike_down_count = spike_count - spike_up_count max_step = float(diffs.abs().max()) if len(diffs) else np.nan last_val = float(s.iloc[-1]) first_val = float(s.iloc[0]) important = False if isinstance(col, tuple) and len(col) == 3: important = important_lookup.get(col, False) unstable = (not np.isnan(cv_pct) and cv_pct >= cv_threshold_pct) or (spike_count > 0) if unstable: unstable_count += 1 colid, itemname, proc = (col if isinstance(col, tuple) else ("", str(col), "")) results.append({ "ColumnID": colid, "ItemName": itemname, "Process": proc, "サンプル数": n, "平均": _np_to_py(round(mean, 6)), "標準偏差": _np_to_py(round(std, 6)), "CV(%)": None if np.isnan(cv_pct) else float(round(cv_pct, 3)), "スパイク数": spike_count, "スパイク上昇数": spike_up_count, "スパイク下降数": spike_down_count, "最大|ステップ|": None if np.isnan(max_step) else float(round(max_step, 6)), "最初の値": _np_to_py(round(first_val, 6)), "最後の値": _np_to_py(round(last_val, 6)), "重要項目": bool(important), "不安定判定": bool(unstable), }) result_df = pd.DataFrame(results) if not result_df.empty: result_df = result_df.sort_values( by=["不安定判定", "CV(%)", "スパイク数"], ascending=[False, False, False], na_position="last" ).reset_index(drop=True) total_cols = len(results) summary = ( f"✅ 変動解析完了({start_time} ~ {end_time})\n" f"- 対象項目数: {total_cols}\n" f"- 不安定と判定: {unstable_count} 項目(CV≥{cv_threshold_pct:.1f}% または スパイクあり)\n" f"- スパイク条件: |diff| > {mad_sigma:.1f}×MAD または 1ステップ相対変化 ≥ {jump_pct_threshold:.1f}%" ) records = result_df.to_dict(orient="records") if result_df is not None else [] records = [{k: _np_to_py(v) for k, v in row.items()} for row in records] json_obj = records json_text = json.dumps(json_obj, ensure_ascii=False, indent=2) return result_df, summary, json_obj, json_text # ---------- Gradio ラッパ ---------- def run_variability(csv_file, excel_file, datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma): try: df = pd.read_csv(csv_file.name, header=[0, 1, 2]) timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce") df = df.drop(df.columns[0], axis=1) df.insert(0, "timestamp", timestamp_col) except Exception as e: return None, f"❌ CSV 読み込み失敗: {e}", None, None important_lookup = {} if excel_file is not None: important_lookup = load_thresholds(excel_file.name) result_df, summary, json_obj, json_text = analyze_variability_core( df=df, important_lookup=important_lookup, datetime_str=datetime_str, window_minutes=int(window_minutes), cv_threshold_pct=float(cv_threshold_pct), jump_pct_threshold=float(jump_pct_threshold), mad_sigma=float(mad_sigma), ) if result_df is None: return None, summary, None, None fname = f"variability_result_{int(time.time())}.json" with open(fname, "w", encoding="utf-8") as f: f.write(json_text) return result_df, summary, json_obj, fname # ---------- Gradio UI ---------- with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo: gr.Markdown("## 変動解析アプリ(単独 / Hugging Face 対応)") with gr.Row(): csv_input = gr.File(label="CSVファイル(3行ヘッダー)", file_types=[".csv"], type="filepath") excel_input = gr.File(label="Excel(任意: Important参照)", file_types=[".xlsx"], type="filepath") with gr.Row(): datetime_str = gr.Textbox(label="基準日時", value="2025/8/1 1:05") window_minutes = gr.Number(label="さかのぼる時間幅(分)", value=60) with gr.Row(): cv_threshold_pct = gr.Number(label="CV(%) しきい値", value=10.0) jump_pct_threshold = gr.Number(label="1ステップ相対ジャンプ率しきい値(%)", value=10.0) mad_sigma = gr.Number(label="MAD倍率(スパイク閾値)", value=3.0) run_btn = gr.Button("変動解析を実行") result_table = gr.Dataframe(label="変動解析結果") summary_out = gr.Textbox(label="サマリー", lines=6) json_out = gr.Json(label="JSONプレビュー") json_file = gr.File(label="JSONダウンロード", type="filepath") run_btn.click( run_variability, inputs=[csv_input, excel_input, datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma], outputs=[result_table, summary_out, json_out, json_file] ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", share=False)