| | |
| | import gradio as gr |
| | import pandas as pd |
| | import numpy as np |
| | import json |
| | import os |
| | import time |
| | from typing import Dict, Optional |
| |
|
| | |
| | def _np_to_py(x): |
| | if hasattr(x, "item"): |
| | try: |
| | return x.item() |
| | except Exception: |
| | pass |
| | if isinstance(x, (np.integer,)): |
| | return int(x) |
| | if isinstance(x, (np.floating,)): |
| | return float(x) |
| | return x |
| |
|
def robust_mad(x: pd.Series) -> float:
    """Robust scale estimate of a series: 1.4826 x median absolute deviation.

    The 1.4826 factor makes the MAD consistent with the standard deviation
    under a normal distribution. Returns NaN for an empty input.
    """
    if len(x) == 0:
        return np.nan
    center = np.median(x)
    deviations = np.abs(x - center)
    return 1.4826 * np.median(deviations)
|
def load_thresholds(excel_path: Optional[str]) -> Dict[tuple, bool]:
    """Load per-item "Important" flags from a thresholds Excel workbook.

    Returns a mapping (ColumnID, ItemName, ProcessNo_ProcessName) -> bool.
    Loading is deliberately best-effort: a falsy path, an unreadable file,
    or missing required columns all yield an empty dict so the caller can
    proceed without importance metadata.
    """
    if not excel_path:
        return {}
    try:
        thresholds_df = pd.read_excel(excel_path)
        if "Important" in thresholds_df.columns:
            # Normalize TRUE/FALSE text to booleans. Unrecognized or missing
            # cells map to NaN — and bool(NaN) is True — so fill with False:
            # only an explicit TRUE marks an item as important.
            thresholds_df["Important"] = (
                thresholds_df["Important"]
                .astype(str)
                .str.upper()
                .map({"TRUE": True, "FALSE": False})
                .fillna(False)
            )
        else:
            thresholds_df["Important"] = False
        need = {"ColumnID", "ItemName", "ProcessNo_ProcessName", "Important"}
        if not need.issubset(set(thresholds_df.columns)):
            return {}
        return {
            (row["ColumnID"], row["ItemName"], row["ProcessNo_ProcessName"]): bool(row["Important"])
            for _, row in thresholds_df.iterrows()
        }
    except Exception:
        # Best-effort by design: importance flags are optional metadata.
        return {}
|
| | |
def _analyze_column(
    s: pd.Series,
    col,
    important_lookup: Dict[tuple, bool],
    cv_threshold_pct: float,
    jump_pct_threshold: float,
    mad_sigma: float,
):
    """Build one result record for a single numeric column.

    Returns (record, unstable); record is None when fewer than 3 samples
    are available, in which case the column is skipped.
    """
    n = len(s)
    if n < 3:
        return None, False

    mean = float(np.mean(s))
    std = float(np.std(s, ddof=1))  # sample std; n >= 3 is guaranteed here
    # CV is undefined for a zero mean; reported as missing (None) downstream.
    cv_pct = np.nan if mean == 0 else abs(std / mean) * 100.0

    diffs = s.diff().dropna()
    mad_scale = robust_mad(diffs)
    # Relative jump uses |median| as reference, floored to avoid divide-by-zero.
    ref = max(1e-9, abs(float(np.median(s))))
    rel_jump = diffs.abs() / ref * 100.0

    # Spike: |step| beyond mad_sigma x MAD of the diffs, OR a one-step change
    # of at least jump_pct_threshold percent of the reference level. When the
    # MAD is zero/NaN the absolute criterion is disabled (threshold = inf).
    abs_thr = (mad_sigma * mad_scale) if (not np.isnan(mad_scale) and mad_scale > 0) else np.inf
    spike_mask = (diffs.abs() > abs_thr) | (rel_jump >= jump_pct_threshold)

    spike_count = int(spike_mask.sum())
    spike_up_count = int((diffs[spike_mask] > 0).sum())
    spike_down_count = spike_count - spike_up_count
    max_step = float(diffs.abs().max()) if len(diffs) else np.nan
    first_val = float(s.iloc[0])
    last_val = float(s.iloc[-1])

    # Important flags only resolve for 3-level MultiIndex column keys.
    important = False
    if isinstance(col, tuple) and len(col) == 3:
        important = important_lookup.get(col, False)

    unstable = (not np.isnan(cv_pct) and cv_pct >= cv_threshold_pct) or (spike_count > 0)

    colid, itemname, proc = (col if isinstance(col, tuple) else ("", str(col), ""))

    record = {
        "ColumnID": colid,
        "ItemName": itemname,
        "Process": proc,
        "サンプル数": n,
        "平均": round(mean, 6),
        "標準偏差": round(std, 6),
        "CV(%)": None if np.isnan(cv_pct) else float(round(cv_pct, 3)),
        "スパイク数": spike_count,
        "スパイク上昇数": spike_up_count,
        "スパイク下降数": spike_down_count,
        "最大|ステップ|": None if np.isnan(max_step) else float(round(max_step, 6)),
        "最初の値": round(first_val, 6),
        "最後の値": round(last_val, 6),
        "重要項目": bool(important),
        "不安定判定": bool(unstable),
    }
    return record, unstable


def analyze_variability_core(
    df: pd.DataFrame,
    important_lookup: Dict[tuple, bool],
    datetime_str: str,
    window_minutes: int,
    cv_threshold_pct: float = 10.0,
    jump_pct_threshold: float = 10.0,
    mad_sigma: float = 3.0,
):
    """Analyze the variability of every numeric column in a lookback window.

    Parameters
    ----------
    df : DataFrame with a "timestamp" column plus numeric data columns
        (column keys may be 3-tuples from a 3-row MultiIndex header).
    important_lookup : (ColumnID, ItemName, Process) -> Important flag.
    datetime_str : window end time, any pandas-parsable datetime string.
    window_minutes : minutes to look back from the end time.
    cv_threshold_pct : CV(%) at or above which a column is flagged unstable.
    jump_pct_threshold : one-step relative change (%) that counts as a spike.
    mad_sigma : multiple of the diff MAD that counts as a spike.

    Returns
    -------
    (result_df, summary_text, json_object, json_text). When the window
    contains no rows, result_df/json_object/json_text are None and the
    summary carries a warning message.
    """
    end_time = pd.to_datetime(datetime_str)
    start_time = end_time - pd.Timedelta(minutes=window_minutes)

    dfw = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)].copy()
    if dfw.empty:
        return None, f"⚠ 指定時間幅({start_time}~{end_time})にデータが見つかりません。", None, None

    data_cols = [
        c for c in dfw.columns
        if c != "timestamp" and pd.api.types.is_numeric_dtype(dfw[c])
    ]

    results = []
    unstable_count = 0
    for col in data_cols:
        record, unstable = _analyze_column(
            dfw[col].dropna(), col, important_lookup,
            cv_threshold_pct, jump_pct_threshold, mad_sigma,
        )
        if record is None:
            continue
        results.append(record)
        if unstable:
            unstable_count += 1

    result_df = pd.DataFrame(results)
    if not result_df.empty:
        # Most problematic items first: unstable flag, then CV, then spikes.
        result_df = result_df.sort_values(
            by=["不安定判定", "CV(%)", "スパイク数"],
            ascending=[False, False, False],
            na_position="last"
        ).reset_index(drop=True)

    summary = (
        f"✅ 変動解析完了({start_time} ~ {end_time})\n"
        f"- 対象項目数: {len(results)}\n"
        f"- 不安定と判定: {unstable_count} 項目(CV≥{cv_threshold_pct:.1f}% または スパイクあり)\n"
        f"- スパイク条件: |diff| > {mad_sigma:.1f}×MAD または 1ステップ相対変化 ≥ {jump_pct_threshold:.1f}%"
    )

    # Sanitize NumPy scalars so the payload is plain-JSON serializable.
    records = [
        {k: _np_to_py(v) for k, v in row.items()}
        for row in result_df.to_dict(orient="records")
    ]
    json_text = json.dumps(records, ensure_ascii=False, indent=2)

    return result_df, summary, records, json_text
| |
|
| | |
| | def run_variability(csv_file, excel_file, datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma): |
| | try: |
| | df = pd.read_csv(csv_file.name, header=[0, 1, 2]) |
| | timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce") |
| | df = df.drop(df.columns[0], axis=1) |
| | df.insert(0, "timestamp", timestamp_col) |
| | except Exception as e: |
| | return None, f"❌ CSV 読み込み失敗: {e}", None, None |
| |
|
| | important_lookup = {} |
| | if excel_file is not None: |
| | important_lookup = load_thresholds(excel_file.name) |
| |
|
| | result_df, summary, json_obj, json_text = analyze_variability_core( |
| | df=df, |
| | important_lookup=important_lookup, |
| | datetime_str=datetime_str, |
| | window_minutes=int(window_minutes), |
| | cv_threshold_pct=float(cv_threshold_pct), |
| | jump_pct_threshold=float(jump_pct_threshold), |
| | mad_sigma=float(mad_sigma), |
| | ) |
| |
|
| | if result_df is None: |
| | return None, summary, None, None |
| |
|
| | fname = f"variability_result_{int(time.time())}.json" |
| | with open(fname, "w", encoding="utf-8") as f: |
| | f.write(json_text) |
| |
|
| | return result_df, summary, json_obj, fname |
| |
|
| | |
# --- Gradio UI: wires the file inputs and tuning knobs to run_variability ---
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    # Application title.
    gr.Markdown("## 変動解析アプリ(単独 / Hugging Face 対応)")

    # Input files: a 3-row-header CSV (required) and an optional Excel sheet
    # carrying per-item "Important" flags.
    with gr.Row():
        csv_input = gr.File(label="CSVファイル(3行ヘッダー)", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel(任意: Important参照)", file_types=[".xlsx"], type="filepath")

    # Analysis window: end time and how many minutes to look back from it.
    with gr.Row():
        datetime_str = gr.Textbox(label="基準日時", value="2025/8/1 1:05")
        window_minutes = gr.Number(label="さかのぼる時間幅(分)", value=60)

    # Tuning knobs for the instability / spike detection.
    with gr.Row():
        cv_threshold_pct = gr.Number(label="CV(%) しきい値", value=10.0)
        jump_pct_threshold = gr.Number(label="1ステップ相対ジャンプ率しきい値(%)", value=10.0)
        mad_sigma = gr.Number(label="MAD倍率(スパイク閾値)", value=3.0)

    run_btn = gr.Button("変動解析を実行")

    # Outputs: result table, text summary, JSON preview, downloadable JSON file.
    result_table = gr.Dataframe(label="変動解析結果")
    summary_out = gr.Textbox(label="サマリー", lines=6)
    json_out = gr.Json(label="JSONプレビュー")
    json_file = gr.File(label="JSONダウンロード", type="filepath")

    run_btn.click(
        run_variability,
        inputs=[csv_input, excel_input, datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma],
        outputs=[result_table, summary_out, json_out, json_file]
    )

if __name__ == "__main__":
    # Bind to all interfaces so the app is reachable from inside a container
    # (e.g. Hugging Face Spaces); public share link disabled.
    demo.launch(server_name="0.0.0.0", share=False)
| |
|