# Ken-INOUE — "Implement initial project structure and setup" (commit 7b56cc8)
# Variability analysis app (standalone Gradio version, no coarsening)
import gradio as gr
import pandas as pd
import numpy as np
import json
import os
import time
from typing import Dict, Optional
# ---------- ユーティリティ ----------
def _np_to_py(x):
if hasattr(x, "item"):
try:
return x.item()
except Exception:
pass
if isinstance(x, (np.integer,)):
return int(x)
if isinstance(x, (np.floating,)):
return float(x)
return x
def robust_mad(x: pd.Series) -> float:
    """Robust scale estimate of a difference series: 1.4826 x MAD.

    The 1.4826 factor makes the MAD a consistent estimator of the standard
    deviation under normality. Returns NaN for an empty series.
    """
    if len(x) == 0:
        return np.nan
    deviations = np.abs(x - np.median(x))
    return 1.4826 * np.median(deviations)
def load_thresholds(excel_path: Optional[str]) -> Dict[tuple, bool]:
    """Load the per-item Important flag from a thresholds Excel file.

    Keys are ``(ColumnID, ItemName, ProcessNo_ProcessName)`` tuples matching
    the 3-row CSV header; values are booleans. Returns an empty dict when the
    path is falsy, the file cannot be read, or required columns are missing
    (the thresholds file is optional, so failures are deliberately silent).
    """
    if not excel_path:
        return {}
    try:
        thresholds_df = pd.read_excel(excel_path)
        if "Important" in thresholds_df.columns:
            # Normalize textual TRUE/FALSE. Any other value maps to NaN,
            # and bool(NaN) is True — so fill NaN with False to avoid
            # accidentally flagging unrecognized cells as important.
            thresholds_df["Important"] = (
                thresholds_df["Important"]
                .astype(str)
                .str.upper()
                .map({"TRUE": True, "FALSE": False})
                .fillna(False)
            )
        else:
            thresholds_df["Important"] = False
        need = {"ColumnID", "ItemName", "ProcessNo_ProcessName", "Important"}
        if not need.issubset(set(thresholds_df.columns)):
            return {}
        return {
            (row["ColumnID"], row["ItemName"], row["ProcessNo_ProcessName"]): bool(row["Important"])
            for _, row in thresholds_df.iterrows()
        }
    except Exception:
        # Best-effort: thresholds are optional; any read error yields {}.
        return {}
# ---------- 変動解析ロジック ----------
def analyze_variability_core(
    df: pd.DataFrame,
    important_lookup: Dict[tuple, bool],
    datetime_str: str,
    window_minutes: int,
    cv_threshold_pct: float = 10.0,
    jump_pct_threshold: float = 10.0,
    mad_sigma: float = 3.0,
):
    """Analyze per-column variability over a time window ending at ``datetime_str``.

    For every numeric column in the window, computes the coefficient of
    variation (CV%) and counts "spikes": one-step differences that exceed
    either ``mad_sigma`` x MAD (absolute criterion) or ``jump_pct_threshold``
    percent of the median (relative criterion). A column is flagged unstable
    when CV >= ``cv_threshold_pct`` or any spike occurred.

    Args:
        df: Frame with a ``timestamp`` column plus data columns (column labels
            may be 3-tuples from a multi-row CSV header).
        important_lookup: (ColumnID, ItemName, Process) -> Important flag.
        datetime_str: End of the analysis window (parsed by pd.to_datetime).
        window_minutes: How many minutes to look back from ``datetime_str``.
        cv_threshold_pct: CV% at or above which a column is unstable.
        jump_pct_threshold: Relative one-step jump (%) counted as a spike.
        mad_sigma: Multiplier on the MAD for the absolute spike threshold.

    Returns:
        ``(result_df, summary, json_obj, json_text)``; ``result_df`` is None
        with a warning summary when the window contains no rows.
    """
    target_time = pd.to_datetime(datetime_str)
    start_time = target_time - pd.Timedelta(minutes=window_minutes)
    end_time = target_time
    # Restrict to the inclusive [start_time, end_time] window.
    dfw = df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)].copy()
    if dfw.empty:
        return None, f"⚠ 指定時間幅({start_time}{end_time})にデータが見つかりません。", None, None
    # Only numeric columns (timestamp excluded) are analyzed.
    data_cols = [
        c for c in dfw.columns
        if c != "timestamp" and pd.api.types.is_numeric_dtype(dfw[c])
    ]
    results = []
    unstable_count = 0
    for col in data_cols:
        s = dfw[col].dropna()
        n = len(s)
        # Need at least 3 points for a meaningful std/diff analysis.
        if n < 3:
            continue
        mean = float(np.mean(s))
        std = float(np.std(s, ddof=1)) if n >= 2 else 0.0
        # CV is undefined for a zero mean; keep NaN to mark that case.
        cv_pct = np.nan if mean == 0 else abs(std / mean) * 100.0
        diffs = s.diff().dropna()
        mad_scale = robust_mad(diffs)
        # Relative jumps are measured against |median|, floored at 1e-9
        # to avoid division by zero.
        ref = max(1e-9, abs(float(np.median(s))))
        rel_jump = diffs.abs() / ref * 100.0
        # Absolute spike threshold: mad_sigma x MAD; disabled (inf) when the
        # MAD is zero or NaN (constant or too-short series).
        abs_thr = (mad_sigma * mad_scale) if (not np.isnan(mad_scale) and mad_scale > 0) else np.inf
        abs_cond = diffs.abs() > abs_thr
        pct_cond = rel_jump >= jump_pct_threshold
        # A step is a spike if EITHER criterion fires.
        spike_mask = abs_cond | pct_cond
        spike_count = int(spike_mask.sum())
        spike_up_count = int((diffs[spike_mask] > 0).sum())
        spike_down_count = spike_count - spike_up_count
        max_step = float(diffs.abs().max()) if len(diffs) else np.nan
        last_val = float(s.iloc[-1])
        first_val = float(s.iloc[0])
        important = False
        # Important flag only applies to 3-tuple column labels
        # (ColumnID, ItemName, Process) from the 3-row CSV header.
        if isinstance(col, tuple) and len(col) == 3:
            important = important_lookup.get(col, False)
        unstable = (not np.isnan(cv_pct) and cv_pct >= cv_threshold_pct) or (spike_count > 0)
        if unstable:
            unstable_count += 1
        colid, itemname, proc = (col if isinstance(col, tuple) else ("", str(col), ""))
        results.append({
            "ColumnID": colid,
            "ItemName": itemname,
            "Process": proc,
            "サンプル数": n,
            "平均": _np_to_py(round(mean, 6)),
            "標準偏差": _np_to_py(round(std, 6)),
            "CV(%)": None if np.isnan(cv_pct) else float(round(cv_pct, 3)),
            "スパイク数": spike_count,
            "スパイク上昇数": spike_up_count,
            "スパイク下降数": spike_down_count,
            "最大|ステップ|": None if np.isnan(max_step) else float(round(max_step, 6)),
            "最初の値": _np_to_py(round(first_val, 6)),
            "最後の値": _np_to_py(round(last_val, 6)),
            "重要項目": bool(important),
            "不安定判定": bool(unstable),
        })
    result_df = pd.DataFrame(results)
    if not result_df.empty:
        # Unstable items first, then highest CV, then most spikes.
        result_df = result_df.sort_values(
            by=["不安定判定", "CV(%)", "スパイク数"],
            ascending=[False, False, False],
            na_position="last"
        ).reset_index(drop=True)
    total_cols = len(results)
    summary = (
        f"✅ 変動解析完了({start_time}{end_time})\n"
        f"- 対象項目数: {total_cols}\n"
        f"- 不安定と判定: {unstable_count} 項目(CV≥{cv_threshold_pct:.1f}% または スパイクあり)\n"
        f"- スパイク条件: |diff| > {mad_sigma:.1f}×MAD または 1ステップ相対変化 ≥ {jump_pct_threshold:.1f}%"
    )
    # Convert NumPy scalars so the records are JSON-serializable.
    records = result_df.to_dict(orient="records") if result_df is not None else []
    records = [{k: _np_to_py(v) for k, v in row.items()} for row in records]
    json_obj = records
    json_text = json.dumps(json_obj, ensure_ascii=False, indent=2)
    return result_df, summary, json_obj, json_text
# ---------- Gradio ラッパ ----------
def run_variability(csv_file, excel_file, datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma):
    """Gradio callback: load inputs, run the variability analysis, save JSON.

    Returns ``(result_df, summary, json_obj, json_filename)``; on failure the
    first element is None and the summary carries the error message.
    """
    def _path(f):
        # gr.File(..., type="filepath") yields a plain str; older Gradio
        # versions yield a tempfile-like object with a .name attribute.
        # Accept both so `.name` is never dereferenced on a str.
        return f if isinstance(f, str) else getattr(f, "name", f)

    try:
        df = pd.read_csv(_path(csv_file), header=[0, 1, 2])
        # The first column holds the timestamps; coerce bad values to NaT.
        timestamp_col = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        df = df.drop(df.columns[0], axis=1)
        df.insert(0, "timestamp", timestamp_col)
    except Exception as e:
        return None, f"❌ CSV 読み込み失敗: {e}", None, None
    important_lookup = {}
    if excel_file is not None:
        important_lookup = load_thresholds(_path(excel_file))
    result_df, summary, json_obj, json_text = analyze_variability_core(
        df=df,
        important_lookup=important_lookup,
        datetime_str=datetime_str,
        window_minutes=int(window_minutes),
        cv_threshold_pct=float(cv_threshold_pct),
        jump_pct_threshold=float(jump_pct_threshold),
        mad_sigma=float(mad_sigma),
    )
    if result_df is None:
        return None, summary, None, None
    # Persist the JSON so Gradio can offer it as a downloadable file.
    fname = f"variability_result_{int(time.time())}.json"
    with open(fname, "w", encoding="utf-8") as f:
        f.write(json_text)
    return result_df, summary, json_obj, fname
# ---------- Gradio UI ----------
# ---- UI layout: file inputs, analysis parameters, run button, outputs ----
with gr.Blocks(css=".gradio-container {overflow: auto !important;}") as demo:
    gr.Markdown("## 変動解析アプリ(単独 / Hugging Face 対応)")
    with gr.Row():
        # NOTE(review): type="filepath" passes str paths to the callback —
        # confirm run_variability handles str inputs accordingly.
        csv_input = gr.File(label="CSVファイル(3行ヘッダー)", file_types=[".csv"], type="filepath")
        excel_input = gr.File(label="Excel(任意: Important参照)", file_types=[".xlsx"], type="filepath")
    with gr.Row():
        datetime_str = gr.Textbox(label="基準日時", value="2025/8/1 1:05")
        window_minutes = gr.Number(label="さかのぼる時間幅(分)", value=60)
    with gr.Row():
        cv_threshold_pct = gr.Number(label="CV(%) しきい値", value=10.0)
        jump_pct_threshold = gr.Number(label="1ステップ相対ジャンプ率しきい値(%)", value=10.0)
        mad_sigma = gr.Number(label="MAD倍率(スパイク閾値)", value=3.0)
    run_btn = gr.Button("変動解析を実行")
    result_table = gr.Dataframe(label="変動解析結果")
    summary_out = gr.Textbox(label="サマリー", lines=6)
    json_out = gr.Json(label="JSONプレビュー")
    json_file = gr.File(label="JSONダウンロード", type="filepath")
    # Wire the button to the analysis callback; outputs map 1:1 to the
    # (table, summary, json, filename) tuple returned by run_variability.
    run_btn.click(
        run_variability,
        inputs=[csv_input, excel_input, datetime_str, window_minutes, cv_threshold_pct, jump_pct_threshold, mad_sigma],
        outputs=[result_table, summary_out, json_out, json_file]
    )
# Bind to 0.0.0.0 so the app is reachable inside a container (e.g. HF Spaces).
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", share=False)