| | import gradio as gr |
| | import pandas as pd |
| | import numpy as np |
| | import os |
| | import re |
| | from typing import Dict, Tuple, List, Optional, Callable |
| | import plotly.graph_objects as go |
| | from plotly.subplots import make_subplots |
| | import plotly.io as pio |
| |
|
| | |
| | |
| | |
# Default CSV loaded at startup when present (see initialize_default_csv()).
DEFAULT_CSV_PATH = "/mnt/data/mock_data_id_9999.csv"
| |
|
| | |
| | |
| | |
def normalize(s: str) -> str:
    """Return *s* as a string with full-width spaces collapsed to ASCII
    spaces, CR/LF removed, and surrounding whitespace stripped."""
    text = str(s)
    for old, new in (("\u3000", " "), ("\n", ""), ("\r", "")):
        text = text.replace(old, new)
    return text.strip()
| |
|
def try_read_csv_3header(path_or_file) -> pd.DataFrame:
    """
    Read a CSV with a 3-row header, trying cp932 / utf-8-sig / utf-8 in turn.

    The first column is converted to a datetime "timestamp" column; the
    remaining columns keep their (ID, ItemName, ProcessName) 3-level header.

    Raises the last parsing/decoding error when every encoding fails.
    """
    df = None
    last_err: Optional[Exception] = None
    for enc in ("cp932", "utf-8-sig", "utf-8"):
        # Fix: file-like objects must be rewound before each retry,
        # otherwise the second attempt reads from a consumed stream.
        if hasattr(path_or_file, "seek"):
            try:
                path_or_file.seek(0)
            except (OSError, ValueError):
                pass  # non-seekable stream: fall through and let read_csv report
        try:
            df = pd.read_csv(path_or_file, header=[0, 1, 2], encoding=enc)
            break
        except Exception as e:
            last_err = e
            df = None
    if df is None:
        # df is None only after at least one exception, so last_err is set.
        raise last_err

    # Replace the raw first column with a proper "timestamp" datetime column.
    ts = pd.to_datetime(df.iloc[:, 0], errors="coerce")
    df = df.drop(df.columns[0], axis=1)
    df.insert(0, "timestamp", ts)
    return df
| |
|
def col_tuple_to_str(col) -> str:
    """Flatten a header tuple to 'a_b_c' (empty parts dropped);
    non-tuple labels are passed through str()."""
    if not isinstance(col, tuple):
        return str(col)
    return "_".join(str(part) for part in col if part)
| |
|
def build_index_maps(df: pd.DataFrame):
    """
    Build {process name (3rd header row) -> list of column records}.

    Each record holds: col_tuple (original column label), id, item,
    process, and col_str (flattened "id_item_process" label).

    Returns (process_map, processes) with processes sorted by their
    normalized name.
    """
    process_map: Dict[str, List[dict]] = {}
    for col in df.columns:
        # Fix: after try_read_csv_3header re-inserts "timestamp" into a
        # MultiIndex frame, the label is the tuple ("timestamp", "", "");
        # the old `col == "timestamp"` check missed it and leaked a bogus
        # "" process. Skip the timestamp column in both shapes.
        head = col[0] if isinstance(col, tuple) else col
        if head == "timestamp":
            continue
        if isinstance(col, tuple) and len(col) >= 3:
            col_id, item_name, process_name = str(col[0]), str(col[1]), str(col[2])
        else:
            parts = str(col).split("_")
            if len(parts) < 3:
                continue  # cannot recover (id, item, process) from this label
            col_id, item_name, process_name = parts[0], "_".join(parts[1:-1]), parts[-1]
        rec = {
            "col_tuple": col,
            "id": col_id,
            "item": item_name,
            "process": process_name,
            "col_str": col_tuple_to_str(col),
        }
        process_map.setdefault(process_name, []).append(rec)
    processes = sorted(process_map.keys(), key=normalize)
    return process_map, processes
| |
|
def extract_measure_tag(item_name: str) -> str:
    """
    Extract the measurement tag from an item name: a trailing "[...]"
    bracket wins, otherwise the last whitespace-separated token.
    """
    cleaned = str(item_name).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()
    bracket = re.search(r"\[([^\[\]]+)\]\s*$", cleaned)
    if bracket is not None:
        return bracket.group(1).strip()
    pieces = re.split(r"\s+", cleaned)
    return pieces[-1] if pieces else cleaned
| |
|
def extract_category(item_name: str) -> str:
    """
    Return the category of an item name: the text after the last '_'
    (e.g. '除害RO_A処理水_導電率' -> '導電率').  Without '_', fall back to
    the text following '処理水', then to the last whitespace token.
    """
    cleaned = str(item_name).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()
    if "_" in cleaned:
        return cleaned.rsplit("_", 1)[-1].strip()
    tail = re.search(r"処理水[_\s]*(.+)$", cleaned)
    if tail is not None:
        return tail.group(1).strip()
    words = re.split(r"\s+", cleaned)
    return words[-1] if words else cleaned
| |
|
| | |
| | |
| | |
def try_read_thresholds_excel(file) -> Optional[pd.DataFrame]:
    """
    Load the optional thresholds Excel file.

    Expected columns: ColumnID, ItemName, ProcessNo_ProcessName,
    LL, L, H, HH, and optionally Important.  Common column-name aliases
    are renamed to the canonical names; LL/L/H/HH are coerced to numeric.
    Returns None when no file is given.
    """
    if file is None:
        return None
    thr = pd.read_excel(file)
    thr.columns = [normalize(c) for c in thr.columns]
    required = {"ColumnID", "ItemName", "ProcessNo_ProcessName"}
    if not required.issubset(set(thr.columns)):
        # Map lowercase aliases onto the canonical column names.
        aliases = {
            "ColumnID": ("columnid", "colid", "id"),
            "ItemName": ("itemname", "item", "name"),
            "ProcessNo_ProcessName": ("processno_processname", "process", "processname"),
        }
        renames = {}
        for col in list(thr.columns):
            lowered = normalize(str(col)).lower()
            for canonical, names in aliases.items():
                if lowered in names:
                    renames[col] = canonical
                    break
        if renames:
            thr = thr.rename(columns=renames)
    for col in ("LL", "L", "H", "HH"):
        if col in thr.columns:
            thr[col] = pd.to_numeric(thr[col], errors="coerce")
    if "Important" in thr.columns:
        # Only literal TRUE/FALSE strings map to booleans; others become NaN.
        thr["Important"] = (
            thr["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
        )
    return thr
| |
|
def build_threshold_lookup(thr_df: Optional[pd.DataFrame]) -> Dict[Tuple[str, str, str], Tuple[float, float, float, float]]:
    """
    Build {(ColumnID, ItemName, ProcessNo_ProcessName) -> (LL, L, H, HH)}.

    Keys are normalized strings; missing threshold cells come back as NaN.
    Returns an empty dict for a missing/empty frame.
    """
    def _clean(value) -> str:
        # Same normalization as the module-level normalize() helper.
        return str(value).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()

    lookup: Dict[Tuple[str, str, str], Tuple[float, float, float, float]] = {}
    if thr_df is None or thr_df.empty:
        return lookup
    for _, row in thr_df.iterrows():
        key = (
            _clean(row.get("ColumnID", "")),
            _clean(row.get("ItemName", "")),
            _clean(row.get("ProcessNo_ProcessName", "")),
        )
        lookup[key] = (
            row.get("LL", np.nan),
            row.get("L", np.nan),
            row.get("H", np.nan),
            row.get("HH", np.nan),
        )
    return lookup
| |
|
def auto_threshold(series: pd.Series) -> Tuple[float, float, float, float]:
    """
    Automatic thresholds: (mean-2sd, mean-sd, mean+sd, mean+2sd).

    Returns all-NaN when fewer than 5 non-NaN samples are available.
    """
    clean = series.dropna()
    if len(clean) < 5:
        return (np.nan, np.nan, np.nan, np.nan)
    center = float(clean.mean())
    # len(clean) >= 5 here, so the sample std (ddof=1) is always defined.
    spread = float(clean.std(ddof=1))
    return (center - 2 * spread, center - spread, center + spread, center + 2 * spread)
| |
|
def judge_status(value, LL, L, H, HH) -> str:
    """Classify *value* against the four thresholds.

    Low bounds are checked before high bounds, and the extreme bound of
    each side (LL / HH) before the mild one; NaN bounds are skipped.
    """
    checks = (
        (LL, "LL", True),
        (L, "L", True),
        (HH, "HH", False),
        (H, "H", False),
    )
    for bound, label, is_low in checks:
        if pd.isna(bound):
            continue
        if (is_low and value <= bound) or (not is_low and value >= bound):
            return label
    return "OK"
| |
|
| | |
# Marker colors keyed by judge_status() result: cool blues for low-side
# violations, gray for in-range, warm orange/red for high-side violations.
STATUS_COLOR = {
    "LL": "#2b6cb0",
    "L": "#63b3ed",
    "OK": "#a0aec0",
    "H": "#f6ad55",
    "HH": "#e53e3e",
}
# Color of the base line trace drawn under the status markers.
LINE_COLOR = "#4a5568"
| |
|
| | |
| | |
| | |
| | |
| | def _group_key_func(group_by: str) -> Callable[[dict], str]: |
| | if group_by == "item": |
| | return lambda rr: normalize(rr["item"]) |
| | if group_by == "category": |
| | return lambda rr: extract_category(rr["item"]) |
| | |
| | return lambda rr: "ALL" |
| |
|
def make_grouped_figure(
    df: pd.DataFrame,
    process_map: Dict[str, List[dict]],
    process_name: str,
    selected_items: List[str],
    thr_df: Optional[pd.DataFrame],
    thr_mode: str,
    date_min: Optional[str],
    date_max: Optional[str],
    group_by: str,
    _force_groups: Optional[List[str]] = None,
) -> Optional[go.Figure]:
    """Build a stacked-subplot trend figure for one process.

    group_by: "all" (single row), "category" (one row per trailing
    category) or "item" (one row per item).  _force_groups fixes the
    subplot rows to the given group names (used by the paging wrapper).
    Returns None whenever no data matches the filters.
    """
    if df is None or not process_name:
        return None
    recs = process_map.get(process_name, [])
    if not recs:
        return None
    # Keep only columns whose (normalized) item name is selected in the UI.
    selected = set([normalize(x) for x in (selected_items or [])])
    recs = [r for r in recs if normalize(r["item"]) in selected]
    if not recs:
        return None

    # Optional inclusive date-range filter on the shared timestamp column.
    dfw = df.copy()
    if date_min:
        dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)]
    if date_max:
        dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)]
    if dfw.empty:
        return None

    # Excel thresholds only apply in "excel" mode; otherwise auto bands.
    thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {}
    keyfunc = _group_key_func(group_by)

    # Bucket the column records into subplot rows.
    groups: Dict[str, List[dict]] = {}
    for r in recs:
        groups.setdefault(keyfunc(r), []).append(r)

    group_names = list(groups.keys()) if _force_groups is None else _force_groups
    if not group_names:
        return None

    # Plotly requires vertical_spacing < 1/(rows-1); clamp to stay valid.
    rows = len(group_names)
    if rows <= 1:
        vspace = 0.03
    else:
        max_vs = (1.0 / (rows - 1)) - 1e-4
        vspace = max(0.0, min(0.03, max_vs))

    # Subplot titles depend on the grouping mode.
    if group_by == "all":
        subtitles = [f"{process_name} | すべての項目"]
    elif group_by == "category":
        subtitles = [f"{process_name} | 分類: {g}" for g in group_names]
    else:
        subtitles = [f"{process_name} | 項目: {g}" for g in group_names]

    fig = make_subplots(
        rows=rows, cols=1, shared_xaxes=True,
        vertical_spacing=vspace,
        subplot_titles=subtitles
    )

    # One subplot row per group; each column contributes two traces.
    row_idx = 1
    for gname in group_names:
        cols = groups.get(gname, [])
        for r in cols:
            col = r["col_tuple"]
            col_str = r["col_str"]
            # The column may exist under its original tuple label or its
            # flattened "id_item_process" string, depending on parsing.
            if col in dfw.columns:
                series = dfw[col]
            elif col_str in dfw.columns:
                series = dfw[col_str]
            else:
                continue

            x = dfw["timestamp"]
            y = pd.to_numeric(series, errors="coerce")

            # Thresholds: Excel lookup first, falling back to the
            # automatic mean±sd bands when the key has no usable values.
            if thr_mode == "excel":
                key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"]))
                LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan))
                if all(pd.isna(v) for v in [LL, L, H, HH]):
                    LL, L, H, HH = auto_threshold(y)
            else:
                LL, L, H, HH = auto_threshold(y)

            # Base line trace for the series.
            fig.add_trace(
                go.Scatter(
                    x=x, y=y, mode="lines",
                    name=f"{r['item']} ({r['id']})",
                    line=dict(color=LINE_COLOR, width=1.5),
                    hovertemplate="%{x}<br>%{y}<extra>"+f"{r['item']} ({r['id']})"+"</extra>"
                ),
                row=row_idx, col=1
            )
            # Marker overlay colored by threshold status; NaN points get a
            # fully transparent color so gaps stay invisible.
            colors = []
            for v in y:
                if pd.isna(v):
                    colors.append("rgba(0,0,0,0)")
                else:
                    st = judge_status(v, LL, L, H, HH)
                    colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"]))
            fig.add_trace(
                go.Scatter(
                    x=x, y=y, mode="markers",
                    name=f"{r['item']} markers",
                    marker=dict(size=6, color=colors),
                    showlegend=False,
                    hovertemplate="%{x}<br>%{y}<extra></extra>"
                ),
                row=row_idx, col=1
            )
        row_idx += 1

    fig.update_layout(
        title=(
            f"{process_name} | "
            + ("一括表示" if group_by == "all"
               else "分類別表示(カテゴリ)" if group_by == "category"
               else "個別表示(項目)")
        ),
        xaxis_title="timestamp",
        showlegend=True,
        margin=dict(l=10, r=10, t=40, b=10),
        hovermode="x unified",
        height=max(420, 260 * rows),
    )
    return fig
| |
|
| | |
def make_grouped_figure_paged(
    df: pd.DataFrame,
    process_map: Dict[str, List[dict]],
    process_name: str,
    selected_items: List[str],
    thr_df: Optional[pd.DataFrame],
    thr_mode: str,
    date_min: Optional[str],
    date_max: Optional[str],
    page: int,
    per_page: int,
    group_by: str,
) -> Tuple[Optional[go.Figure], int, List[str]]:
    """Paged variant of make_grouped_figure.

    Groups the selected columns, slices the group names for the requested
    page, and delegates drawing via _force_groups.

    Returns (figure or None, total page count, all group names).
    """
    wanted = {normalize(name) for name in (selected_items or [])}
    recs = [rec for rec in process_map.get(process_name, []) if normalize(rec["item"]) in wanted]
    if not recs:
        return None, 0, []

    keyfunc = _group_key_func(group_by)
    grouped: Dict[str, List[dict]] = {}
    for rec in recs:
        grouped.setdefault(keyfunc(rec), []).append(rec)
    all_names = list(grouped)

    # Clamp the requested page into [1, total_pages] before slicing.
    total_pages = max(1, int(np.ceil(len(all_names) / max(1, per_page))))
    page = int(max(1, min(page, total_pages)))
    start = (page - 1) * per_page
    window = all_names[start:start + per_page]

    fig = make_grouped_figure(
        df, process_map, process_name, selected_items, thr_df, thr_mode,
        date_min, date_max, group_by=group_by, _force_groups=window,
    )
    return fig, total_pages, all_names
| |
|
| | |
| | |
| | |
# Module-level state shared by the Gradio callbacks (single-process app).
G_DF: Optional[pd.DataFrame] = None                 # parsed time-series CSV
G_PROCESS_MAP: Dict[str, List[dict]] = {}           # process name -> column records
G_PROCESSES: List[str] = []                         # sorted process names
G_THRESHOLDS_DF: Optional[pd.DataFrame] = None      # uploaded thresholds Excel (optional)
| |
|
| | |
| | |
| | |
def initialize_default_csv():
    """
    Load DEFAULT_CSV_PATH at startup when it exists.

    Returns (status message, dropdown update, process list), matching the
    shape of on_csv_upload's outputs.
    """
    global G_DF, G_PROCESS_MAP, G_PROCESSES
    if not os.path.exists(DEFAULT_CSV_PATH):
        return "ℹ CSVをアップロードしてください。", gr.update(), []
    try:
        loaded = try_read_csv_3header(DEFAULT_CSV_PATH)
        G_DF = loaded
        G_PROCESS_MAP, G_PROCESSES = build_index_maps(loaded)
        first_proc = G_PROCESSES[0] if G_PROCESSES else None
        return (
            f"✅ 既定CSVを読み込みました: {DEFAULT_CSV_PATH}",
            gr.update(choices=G_PROCESSES, value=first_proc),
            G_PROCESSES,
        )
    except Exception as e:
        return f"⚠ 既定CSV読み込み失敗: {e}", gr.update(), []
| |
|
def on_csv_upload(file):
    """
    Handle a CSV upload: parse it and refresh the process dropdown choices.
    """
    global G_DF, G_PROCESS_MAP, G_PROCESSES
    if file is None:
        return "⚠ ファイルが選択されていません。", gr.update(choices=[]), []
    # Gradio file objects expose .name (a temp path); plain paths pass through.
    source = getattr(file, "name", file)
    try:
        parsed = try_read_csv_3header(source)
        G_DF = parsed
        G_PROCESS_MAP, G_PROCESSES = build_index_maps(parsed)
        return (
            f"✅ CSV読み込み: {parsed.shape[0]}行 × {parsed.shape[1]}列",
            gr.update(choices=G_PROCESSES, value=(G_PROCESSES[0] if G_PROCESSES else None)),
            G_PROCESSES,
        )
    except Exception as e:
        return f"❌ 読み込みエラー: {e}", gr.update(choices=[]), []
| |
|
def on_thr_upload(file):
    """
    Handle a thresholds-Excel upload and cache the frame in module state.
    """
    global G_THRESHOLDS_DF
    if file is None:
        G_THRESHOLDS_DF = None
        return "ℹ しきい値ファイルなし(自動しきい値が使われます)"
    try:
        parsed = try_read_thresholds_excel(getattr(file, "name", file))
        G_THRESHOLDS_DF = parsed
        return f"✅ しきい値を読み込みました({parsed.shape[0]}件)"
    except Exception as e:
        G_THRESHOLDS_DF = None
        return f"❌ しきい値読み込みエラー: {e}"
| |
|
def update_items(process_name: str):
    """
    Refresh the item checkbox choices (2nd header row) for the chosen
    process; every item starts selected.
    """
    recs = G_PROCESS_MAP.get(process_name) if process_name else None
    if not recs:
        return gr.update(choices=[], value=[])
    items = sorted({rec["item"] for rec in recs}, key=normalize)
    return gr.update(choices=items, value=items)
| |
|
def render_any(process_name: str, items: List[str], display_mode: str, thr_mode_label: str,
               date_min, date_max, page: int, per_page: int):
    """
    Dispatch rendering according to the selected display mode.

    Returns a 4-tuple feeding (msg, plot, page_info, page_no):
    - 一括表示: all selected items on one subplot row, pager hidden.
    - 分類別表示: one row per trailing category, paged.
    - 個別表示: one row per item, paged.
    """
    def _warn(message: str):
        # Uniform failure shape: show the message, hide plot/pager widgets.
        return message, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)

    if G_DF is None:
        return _warn("⚠ データ未読み込み")
    if not process_name:
        return _warn("⚠ プロセスを選択してください")
    if not items:
        return _warn("⚠ 項目を選択してください")

    # The radio label starts with "excel" when the Excel-first mode is chosen.
    mode = "excel" if str(thr_mode_label).startswith("excel") else "auto"

    if str(display_mode).startswith("一括"):
        fig = make_grouped_figure(
            G_DF, G_PROCESS_MAP, process_name, items, G_THRESHOLDS_DF, mode, date_min, date_max, group_by="all"
        )
        if fig is None:
            return _warn("⚠ 図を生成できませんでした(データ無し or 条件不一致)")
        return "✅ 一括表示を描画しました", gr.update(value=fig, visible=True), gr.update(visible=False), gr.update(visible=False)

    def _paged(group_by: str, count_label: str, done_msg: str):
        # Shared paged-rendering path for the category and item modes.
        fig, total_pages, all_names = make_grouped_figure_paged(
            G_DF, G_PROCESS_MAP, process_name, items, G_THRESHOLDS_DF, mode,
            date_min, date_max, page=int(page), per_page=int(per_page), group_by=group_by
        )
        if fig is None:
            return _warn("⚠ 図を生成できませんでした(データ無し or 条件不一致)")
        info = f"{count_label}: {len(all_names)} | ページ {int(max(1,min(page, total_pages)))} / {total_pages} | 件/ページ={int(per_page)}"
        return done_msg, gr.update(value=fig, visible=True), gr.update(value=info, visible=True), gr.update(visible=True)

    if str(display_mode).startswith("分類"):
        return _paged("category", "分類(カテゴリ)数", "✅ 分類別表示(末尾語カテゴリ)を描画しました")
    return _paged("item", "項目数", "✅ 個別表示(項目)を描画しました")
| |
|
| | |
| | |
| | |
# Eagerly try the default CSV so the UI starts pre-populated.
init_msg, init_proc_update, _ = initialize_default_csv()
# gr.update(...) behaves like a plain dict here; extract the dropdown seed
# values, falling back to empty when no default CSV was loaded.
init_value = init_proc_update.get("value") if isinstance(init_proc_update, dict) else None
init_choices = init_proc_update.get("choices") if isinstance(init_proc_update, dict) else []
| |
|
# UI layout and event wiring for the trend-graph app.
with gr.Blocks(css="""
.gradio-container {overflow: auto !important;}
""") as demo:
    gr.Markdown("## トレンドグラフ専用アプリ(3行ヘッダー対応・プロセス別・分類/個別・閾値色分け)")

    # Input files: time-series CSV (3-row header) and optional thresholds Excel.
    with gr.Row():
        csv_uploader = gr.File(label="① 時系列CSV(3行ヘッダー)", file_count="single", file_types=[".csv"])
        thr_uploader = gr.File(label="② 閾値Excel(任意: LL/L/H/HH)", file_count="single", file_types=[".xlsx", ".xls"])

    # Threshold source and optional date-range filter (free-text datetimes).
    with gr.Row():
        thr_mode = gr.Radio(
            ["excel(アップロード優先・無ければ自動)", "自動(平均±標準偏差)"],
            value="excel(アップロード優先・無ければ自動)",
            label="しきい値モード"
        )
        date_min = gr.Textbox(label="抽出開始日時(任意)例: 2024-07-01 00:00")
        date_max = gr.Textbox(label="抽出終了日時(任意)例: 2024-07-31 23:59")

    # Display mode drives how render_any groups the subplots.
    display_mode = gr.Radio(
        ["一括表示", "分類別表示(カテゴリ)", "個別表示(項目)"],
        value="一括表示",
        label="表示形式"
    )

    status_csv = gr.Markdown(init_msg)
    status_thr = gr.Markdown()

    # Seeded from the default CSV when it was found at startup.
    process_dd = gr.Dropdown(label="対象プロセス(3行ヘッダーの3行目)",
                             choices=init_choices, value=init_value)
    items_cb = gr.CheckboxGroup(label="表示する項目(3行ヘッダーの2行目)", choices=[], value=[])

    with gr.Row():
        btn_render = gr.Button("トレンド図を生成", variant="primary")

    msg = gr.Markdown()
    plot = gr.Plot(label="トレンド図", visible=True)

    # Pager controls — hidden until a paged display mode is chosen.
    with gr.Row():
        per_page = gr.Slider(1, 12, value=8, step=1, label="件/ページ(分類別・個別)", visible=False)
        page_no = gr.Number(value=1, label="ページ(1〜)", precision=0, visible=False)
        page_info = gr.Markdown(visible=False)

    # CSV upload refreshes the process dropdown.
    # NOTE(review): the inline gr.State() only absorbs the third return
    # value of on_csv_upload; it is never read anywhere — confirm intent.
    csv_uploader.change(
        on_csv_upload,
        inputs=[csv_uploader],
        outputs=[status_csv, process_dd, gr.State()],
    )

    # Thresholds upload only updates the status text.
    thr_uploader.change(
        on_thr_upload,
        inputs=[thr_uploader],
        outputs=[status_thr],
    )

    # Changing the process repopulates (and pre-selects) the item checkboxes.
    process_dd.change(
        update_items,
        inputs=[process_dd],
        outputs=[items_cb],
    )

    # Main render action.
    # NOTE(review): the lambda is a pure pass-through; fn=render_any would
    # behave identically.
    btn_render.click(
        fn=lambda proc, items, disp_mode, mode, dmin, dmax, p, pp:
            render_any(proc, items, disp_mode, mode, dmin, dmax, p, pp),
        inputs=[process_dd, items_cb, display_mode, thr_mode, date_min, date_max, page_no, per_page],
        outputs=[msg, plot, page_info, page_no],
    )

    # Pager widgets are only relevant for the paged display modes.
    def _toggle_page_controls(mode):
        """Show the pager controls unless 一括表示 is selected."""
        show = not str(mode).startswith("一括")
        return gr.update(visible=show), gr.update(visible=show), gr.update(visible=show)
    display_mode.change(
        _toggle_page_controls,
        inputs=[display_mode],
        outputs=[per_page, page_no, page_info],
    )
| |
|
if __name__ == "__main__":
    # NOTE(review): ssr_mode=False presumably avoids Gradio's server-side
    # rendering path — confirm why it was disabled before changing.
    demo.launch(ssr_mode=False)
| |
|