diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,61 +1,2986 @@
- # Row 2: 스테이지 리프트 + 포레스트 + 버블
+# -*- coding: utf-8 -*-
+
+# app.py — Bayesian Journey Dashboard (Colab-friendly, robust Excel + plots)
+# Fixes:
+# - ✅ 정규화 유틸(_as_all, _ensure_key_cols 등) 포함
+# - ✅ pick_row_for 포함
+# - ✅ Plotly 축 그리드 속성 정리(유효하지 않은 prop 제거)
+# - ✅ 포트 충돌 시 자동 대체 포트로 재시도
+
+import os, json, re, traceback
+import numpy as np
+import pandas as pd
+import plotly.graph_objects as go
+import plotly.express as px
+from dash import Dash, html, dcc, dash_table, Input, Output, State
+from dash.dash_table import FormatTemplate
+from dash.dash_table.Format import Format, Scheme
+import dash # (NEW) 인터랙션 로그용
+
+# (파일 상단 import 근처에 추가)
+import io
+
+import hashlib
+
+FLOW_SALT = os.getenv("FLOW_SALT", "phi-v1-2025-01") # 필요시 환경변수로 바꿔치기 가능
+FLOW_SALT = os.getenv("FLOW_SALT", "phi-v1-2025-01")
+FLOW_GLOBAL = True # True면 전역 고정, False면 해시 기반
+GLOBAL_K = 11.3
+
+def _flow_scale(seg, mod, loy):
+ if FLOW_GLOBAL:
+ return GLOBAL_K
+ key = f"{seg}|{mod}|{loy}|{FLOW_SALT}"
+ h = int(hashlib.sha256(key.encode("utf-8")).hexdigest()[:8], 16)
+ return 7.5 + (h % 1100) / 100.0
+
+# ======== 인터랙션 공용 설정 ========
+GRAPH_CONFIG = {
+ "displayModeBar": True,
+ "scrollZoom": True, # 휠로 줌
+ "doubleClick": "reset", # 더블클릭 리셋
+ "modeBarButtonsToAdd": ["lasso2d", "select2d"],
+ "showTips": True,
+}
+
+# ===================== 기본 경로 =====================
+from pathlib import Path
+ROOT = Path(__file__).resolve().parent
+DEFAULT_PATH = os.getenv("EXCEL_PATH", str(ROOT / "bayesian_analysis_total_v1.xlsx"))
+EXCEL_PATH = DEFAULT_PATH
+# (load_excel 호출은 DEFAULT_PATH 그대로여도 동작, 명시하려면 EXCEL_PATH로)
+
+# ===================== 레벨 상수 =====================
+LEVEL_OVERALL="전체"; LEVEL_SEGMENT="세그먼트"; LEVEL_MODEL="모델"
+LEVEL_LOYALTY="충성도"; LEVEL_SEG_X_LOY="세그×충성도"
+LEVEL_SEG_X_MODEL="세그×모델"; LEVEL_MODEL_X_LOY="모델×충성도"
+LEVEL_MOD_X_SEG_X_LOY="모델×세그×충성도"
+
+# === 정규화 ===
+ALL_ALIASES = {"ALL","all","All","", " ", " ", "전체", "NONE","None","none","nan","NaN", None}
+LVL_ALIASES = {
+ "모델전체×세그×충성도": "모델×세그×충성도",
+ "세그x모델": "세그×모델",
+ "모델x충성도": "모델×충성도",
+ "세그x충성도": "세그×충성도",
+}
+
+def _as_all(v) -> str:
+ s = "ALL" if v is None else str(v).strip()
+ return "ALL" if s in ALL_ALIASES else s
+
+def _ensure_key_cols(df: pd.DataFrame) -> pd.DataFrame:
+ df = df.copy()
+ for c in ["analysis_level","segment","model","loyalty"]:
+ if c not in df.columns:
+ df[c] = "ALL"
+ df[c] = (
+ df[c].astype(str).str.strip()
+ .replace({
+ "": "ALL", "전체":"ALL",
+ "NONE":"ALL","None":"ALL","none":"ALL",
+ "nan":"ALL","NaN":"ALL",
+ "ALL":"ALL","All":"ALL","all":"ALL"
+ })
+ .fillna("ALL")
+ )
+ if "level" not in df.columns:
+ df["level"] = df["analysis_level"] if "analysis_level" in df.columns else "전체"
+ df["level"] = (
+ df["level"].astype(str).str.strip()
+ .replace({"ALL":"전체","All":"전체","all":"전체"})
+ .replace(LVL_ALIASES)
+ )
+ if "analysis_level" in df.columns:
+ df["analysis_level"] = df["analysis_level"].replace(LVL_ALIASES)
+ return df
+
+# ---- Store JSON 로더 & 스왑 감지 유틸 ----
+def _looks_split_df_json(s: str) -> bool:
+ try:
+ o = json.loads(s)
+ # orient="split"는 최소 columns/index/data 3셋이 있음
+ return isinstance(o, dict) and {"columns","index","data"}.issubset(set(o.keys()))
+ except Exception:
+ return False
+
+def _looks_overall_json(s: str) -> bool:
+ try:
+ o = json.loads(s)
+ return isinstance(o, dict) and any(k in o for k in ("pref_mean","rec_mean","intent_mean","buy_mean"))
+ except Exception:
+ return False
+
+def _safe_read_df_split(js: str | dict | None) -> pd.DataFrame:
+ if js is None:
+ return pd.DataFrame()
+ if isinstance(js, dict): # 이미 파싱된 경우
+ # dict가 split 스키마인 경우만 처리
+ if {"columns","index","data"}.issubset(set(js.keys())):
+ return pd.read_json(io.StringIO(json.dumps(js)), orient="split")
+ return pd.DataFrame()
+ # str
+ try:
+ return pd.read_json(io.StringIO(js), orient="split")
+ except Exception:
+ return pd.DataFrame()
+
+def _safe_read_overall(js: str | dict | None) -> dict:
+ if js is None:
+ return {}
+ if isinstance(js, dict):
+ return js
+ try:
+ o = json.loads(js)
+ return o if isinstance(o, dict) else {}
+ except Exception:
+ return {}
+
+def _maybe_swap_sankey_overall(js_sankey, js_overall):
+ """
+ sankey 캐시와 overall이 뒤바뀌어 들어온 경우 자동 교정.
+ (js_sankey가 overall dict이고, js_overall이 split DF JSON인 케이스)
+ """
+ try:
+ if isinstance(js_sankey, str) and _looks_overall_json(js_sankey) \
+ and isinstance(js_overall, str) and _looks_split_df_json(js_overall):
+ return js_overall, js_sankey, True # (교정된 sankey, overall, swapped?)
+ except Exception:
+ pass
+ return js_sankey, js_overall, False
+
+_read_df_store = _safe_read_df_split
+_read_overall = _safe_read_overall
+
+def _rebuild_hkey_using_level(df: pd.DataFrame) -> pd.DataFrame:
+ df = _ensure_key_cols(df).copy()
+ if "level" in df.columns and df["level"].notna().any():
+ pass
+ elif "analysis_level" in df.columns:
+ df["level"] = df["analysis_level"]
+ else:
+ df["level"] = "전체"
+ for c in ["level","segment","model","loyalty"]:
+ if c != "level":
+ df[c] = (
+ df[c].astype(str).str.strip()
+ .replace({"": "ALL","전체":"ALL","NONE":"ALL","None":"ALL","none":"ALL","nan":"ALL","NaN":"ALL"})
+ .fillna("ALL")
+ )
+ df["level"] = df["level"].replace(LVL_ALIASES)
+ df["hierarchy_key"] = df["level"] + "|" + df["segment"] + "|" + df["model"] + "|" + df["loyalty"]
+ return df
+
+def sample_col_in_df(df) -> str | None:
+ for c in ["pref_sample_size","sample_size","n","N","base","베이스수","표본수"]:
+ if c in df.columns: return c
+ return None
+
+# ==== 비공개 유량 스케일 ====
+FLOW_GLOBAL = True
+GLOBAL_K = 11.3
+
+# ==== 공용: Shape-safe helpers (가짜 키 자동 차단 + 보정) ====
+
+_ALLOWED_SHAPE_KEYS = {
+ "editable","fillcolor","fillrule","label","layer","legend","legendgroup","legendgrouptitle",
+ "legendrank","legendwidth","line","name","opacity","path","showlegend","templateitemname",
+ "type","visible","x0","x1","xanchor","xref","xsizemode","y0","y1","yanchor","yref","ysizemode",
+}
+
+# 여기가 문제의 가짜 키들
+_SHIFT_KEYS = ("x0shift", "x1shift", "y0shift", "y1shift")
+
+def _line_from_kwargs(kwargs: dict):
+ line = {}
+ if "line_color" in kwargs: line["color"] = kwargs.pop("line_color")
+ if "line_width" in kwargs: line["width"] = kwargs.pop("line_width")
+ if "line_dash" in kwargs: line["dash"] = kwargs.pop("line_dash")
+ return {k: v for k, v in line.items() if v is not None}
+
+def _clean_shape_kwargs(kwargs: dict):
+ """
+ 1) *_shift 키 제거
+ 2) line_* → line 병합
+ 3) 허용 키만 남기기
+ """
+ kwargs = dict(kwargs) # shallow copy
+ # 1) 가짜 shift 키 모두 제거
+ for k in _SHIFT_KEYS:
+ kwargs.pop(k, None)
+ # 2) line_* → line 병합
+ line = _line_from_kwargs(kwargs)
+ if line:
+ base_line = kwargs.get("line") or {}
+ kwargs["line"] = {**base_line, **line}
+ # 3) 허용 키만 통과
+ return {k: v for k, v in kwargs.items() if (k in _ALLOWED_SHAPE_KEYS and v is not None)}
+
+def add_vline_safe(fig, x, **kwargs):
+ """세로 기준선(가짜 키 차단, line_* 병합)"""
+ base = dict(
+ type="line", xref="x", x0=float(x), x1=float(x),
+ yref="paper", y0=0, y1=1,
+ layer=kwargs.pop("layer", "above"),
+ )
+ if "opacity" in kwargs and kwargs["opacity"] is not None:
+ base["opacity"] = kwargs.pop("opacity")
+ base.update(_clean_shape_kwargs(kwargs))
+ return fig.add_shape(**base)
+
+def add_hline_safe(fig, y, **kwargs):
+ """가로 기준선(가짜 키 차단, line_* 병합)"""
+ base = dict(
+ type="line", yref="y", y0=float(y), y1=float(y),
+ xref="paper", x0=0, x1=1,
+ layer=kwargs.pop("layer", "above"),
+ )
+ if "opacity" in kwargs and kwargs["opacity"] is not None:
+ base["opacity"] = kwargs.pop("opacity")
+ base.update(_clean_shape_kwargs(kwargs))
+ return fig.add_shape(**base)
+
+def _pad_top(fig, px=40):
+ # 기존 margin 유지 + top만 늘림
+ m = fig.layout.margin or {}
+ fig.update_layout(margin=dict(
+ l=int(getattr(m, "l", 10) or 10),
+ r=int(getattr(m, "r", 10) or 10),
+ b=int(getattr(m, "b", 10) or 10),
+ t=int(getattr(m, "t", 0) or 0) + int(px),
+ ))
+ return fig
+
+def add_vrect_safe(fig, x0, x1, **kwargs):
+ """
+ add_vrect 대체: x0shift/x1shift를 값에 반영 후 제거하고,
+ 나머지 키는 안전하게 정리해서 rect shape로 추가.
+ """
+ # ── shift 보정 ──
+ dx0 = float(kwargs.pop("x0shift", 0) or 0)
+ dx1 = float(kwargs.pop("x1shift", 0) or 0)
+ x0 = float(x0) + dx0
+ x1 = float(x1) + dx1
+
+ # yref 자동 판정(명시가 있으면 존중)
+ yref = kwargs.pop("yref", None)
+ has_y = ("y0" in kwargs) or ("y1" in kwargs)
+ if yref is None:
+ yref = "y" if has_y else "paper"
+
+ # paper 좌표 기본값
+ y0_default, y1_default = (0, 1) if yref == "paper" else (None, None)
+
+ base = dict(
+ type="rect", xref="x", x0=x0, x1=x1,
+ yref=yref, y0=kwargs.pop("y0", y0_default), y1=kwargs.pop("y1", y1_default),
+ layer=kwargs.pop("layer", "below"),
+ fillcolor=kwargs.pop("fillcolor", "rgba(0,0,0,0.06)"),
+ )
+ if base["yref"] == "y":
+ # 데이터 축이면 None인 y0/y1 제거
+ if base.get("y0") is None: base.pop("y0", None)
+ if base.get("y1") is None: base.pop("y1", None)
+
+ if "opacity" in kwargs and kwargs["opacity"] is not None:
+ base["opacity"] = kwargs.pop("opacity")
+
+ base.update(_clean_shape_kwargs(kwargs))
+ return fig.add_shape(**base)
+
+# (선택) 만약 어딘가에서 layout.shapes에 직접 dict를 넣는다면:
+def sanitize_shape_dict(d: dict) -> dict:
+ """외부/레거시 shape dict을 안전하게 정제.
+ - x0shift/x1shift/y0shift/y1shift 값을 좌표에 반영하고 키 제거
+ - line_* 키 병합
+ - 허용되지 않는 키 삭제
+ """
+ d = dict(d or {})
+
+ # 1) shift -> 좌표 반영
+ for sh_key, coord_key in (("x0shift","x0"),("x1shift","x1"),("y0shift","y0"),("y1shift","y1")):
+ if sh_key in d:
+ try:
+ if coord_key in d and d[coord_key] is not None:
+ d[coord_key] = float(d[coord_key]) + float(d.pop(sh_key) or 0.0)
+ else:
+ d.pop(sh_key, None)
+ except Exception:
+ d.pop(sh_key, None)
+
+ # 2) line_* -> line 병합
+ line = {}
+ if "line_color" in d: line["color"] = d.pop("line_color")
+ if "line_width" in d: line["width"] = d.pop("line_width")
+ if "line_dash" in d: line["dash"] = d.pop("line_dash")
+ if line:
+ base_line = d.get("line") or {}
+ d["line"] = {**base_line, **{k:v for k,v in line.items() if v is not None}}
+
+ # 3) 허용 키만 남기기
+ return {k: v for k, v in d.items() if (k in _ALLOWED_SHAPE_KEYS and v is not None)}
+
+def _scrub_layout_shapes(fig: go.Figure) -> go.Figure:
+ """
+ layout.shapes에 남아있는 비정상 키(x0shift 같은 잔재)를 일괄 제거.
+ """
+ try:
+ shapes = list(fig.layout.shapes) if fig.layout.shapes is not None else []
+ cleaned = []
+ for sh in shapes:
+ try:
+ sd = sh.to_plotly_json() if hasattr(sh, "to_plotly_json") else dict(sh)
+ cleaned.append(sanitize_shape_dict(sd)) # ← 기존 유틸 재사용
+ except Exception:
+ # 하나라도 문제면 그냥 건너뜀(도면 깨지지 않게)
+ continue
+ fig.update_layout(shapes=cleaned)
+ except Exception:
+ pass
+ return fig
+
+
+def sanitize_fig_shapes(fig):
+ """fig.layout.shapes 전부 sanitize."""
+ try:
+ shapes = list(fig.layout.shapes) if fig.layout.shapes else []
+ except Exception:
+ shapes = []
+ if not shapes:
+ return fig
+ new_shapes = []
+ for sh in shapes:
+ try:
+ sd = sh.to_plotly_json() if hasattr(sh, "to_plotly_json") else dict(sh)
+ new_shapes.append(sanitize_shape_dict(sd))
+ except Exception:
+ # 망가진 건 버림
+ pass
+ fig.update_layout(shapes=new_shapes)
+ return fig
+
+# ===================== 팔레트 =====================
+COL_RED = "#C32C2C" # 빨강
+COL_ORANGE = "#D24D3E" # 주황
+COL_YELLOW = "#DE937A" # 노랑
+COL_BEIGE = "#D49442" # 베이지
+COL_GREEN_LITE = "#2B8E81" # 초록(기본)
+COL_GREEN_DARK = "#21786E" # 초록 진한톤(필요시)
+COL_GRAY = "#D3D3D3"
+
+def _hex_to_rgb_tuple(h): # 유틸
+ h = h.lstrip("#")
+ return [int(h[i:i+2], 16) for i in (0,2,4)]
+
+def royg_color_for(values: np.ndarray) -> list:
+ v = np.asarray(values, dtype=float)
+ if v.size == 0: return []
+ if not np.isfinite(v).any():
+ return [COL_GREEN_DARK] * len(v)
+
+ lo = np.nanmin(v); hi = np.nanmax(v)
+ t = np.zeros_like(v) if (not np.isfinite(lo) or not np.isfinite(hi) or hi-lo < 1e-12) else (v-lo)/(hi-lo)
+
+ # 낮은값(좋음) → 높은값(나쁨): 초 → 베 → 노 → 주 → 빨
+ cols = np.array([
+ _hex_to_rgb_tuple(COL_GREEN_LITE),
+ _hex_to_rgb_tuple(COL_BEIGE),
+ _hex_to_rgb_tuple(COL_YELLOW),
+ _hex_to_rgb_tuple(COL_ORANGE),
+ _hex_to_rgb_tuple(COL_RED),
+ ], dtype=float)
+ stops = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
+
+ r = np.interp(t, stops, cols[:,0]); g = np.interp(t, stops, cols[:,1]); b = np.interp(t, stops, cols[:,2])
+ out = []
+ for rr, gg, bb in zip(r,g,b):
+ if not (np.isfinite(rr) and np.isfinite(gg) and np.isfinite(bb)):
+ out.append('rgb(140,140,140)')
+ else:
+ out.append(f'rgb({int(round(rr))},{int(round(gg))},{int(round(bb))})')
+ return out
+
+
+# ==== DESIGN CONSTANTS (tiers & neutrals) ====
+COL_BLUE_DEEP = "#1E3A8A" # 진파랑(하이엔드)
+COL_BLUE_SKY = "#60A5FA" # 하늘(미드)
+COL_GRAY_MED = "#9CA3AF" # 회색(로우/중립)
+COL_BLACK = "#111111" # 포레스트 플롯용
+
+# 세그/티어 → 색 매핑 (모든 키는 소문자 기준으로 저장)
+_SEG_TIER_COLOR = {
+ # High/Premium 계열
+ "highend": COL_BLUE_DEEP, "high": COL_BLUE_DEEP, "premium": COL_BLUE_DEEP,
+ "하이엔드": COL_BLUE_DEEP, "프리미엄": COL_BLUE_DEEP,
+ # Mid 계열
+ "midend": COL_BLUE_SKY, "mid": COL_BLUE_SKY, "midrange": COL_BLUE_SKY,
+ "미드": COL_BLUE_SKY, "중간": COL_BLUE_SKY,
+ # Low/Entry 계열
+ "lowend": COL_GRAY_MED, "low": COL_GRAY_MED, "entry": COL_GRAY_MED,
+ "로우엔드": COL_GRAY_MED, "저가": COL_GRAY_MED,
+}
+
+def _norm_key(x) -> str:
+ return "" if x is None else str(x).strip().lower()
+
+def _tier_color_for_segment(seg: str) -> str:
+ """세그 이름을 느슨하게 받아 컬러로 매핑(대소문자/공백/한글 허용)."""
+ return _SEG_TIER_COLOR.get(_norm_key(seg), COL_GRAY_MED)
+
+def _model_dominant_segment(df_scope: pd.DataFrame) -> dict:
+ """
+ 모델별 '표본수 가중' 우세 세그. segment가 ALL/전체인 행은 제외.
+ 반환: {model(str): segment(str)}
+ """
+ if df_scope is None or df_scope.empty or "model" not in df_scope.columns or "segment" not in df_scope.columns:
+ return {}
+
+ s = df_scope.copy()
+ # ALL/전체 drop
+ seg_norm = s["segment"].astype(str).str.strip()
+ m_valid = ~seg_norm.isin(["ALL", "전체"]) & seg_norm.notna()
+ s = s[m_valid]
+ if s.empty:
+ return {}
+
+ w = pd.to_numeric(s.get("pref_sample_size", 1), errors="coerce").replace([np.inf, -np.inf], np.nan).fillna(1.0)
+ s["__w__"] = w
+
+ grp = s.groupby(["model", "segment"], as_index=False)["__w__"].sum()
+ # 각 모델에서 가중치 최대인 세그 1개 선택
+ dom = grp.sort_values(["model", "__w__"], ascending=[True, False]).drop_duplicates("model")
+ return {str(r["model"]): str(r["segment"]) for _, r in dom.iterrows()}
+
+
+# ===================== 앱 =====================
+app = Dash(__name__)
+app.title = "Bayesian Journey Dashboard"
+px.defaults.template = "plotly_white"
+
+def _safe_num(x, default=np.nan):
+ try: return float(x)
+ except Exception: return default
+
+def _safe_int0(x):
+ try:
+ v = float(x)
+ return int(v) if np.isfinite(v) else 0
+ except Exception:
+ return 0
+
+def _norm_cols(df: pd.DataFrame) -> pd.DataFrame:
+ if df is None or df.empty: return pd.DataFrame()
+ df = df.copy()
+ df.columns = [str(c).strip() for c in df.columns]
+ for c in df.columns:
+ if df[c].dtype == "O":
+ ser = pd.to_numeric(df[c], errors="coerce")
+ if ser.notna().mean() >= 0.5: df[c] = ser
+ return df
+
+def _ci_to_sd(lo, hi):
+ lo = np.asarray(lo, dtype=float); hi = np.asarray(hi, dtype=float)
+ return (hi - lo)/(2*1.96)
+
+def _grade_from_p(p):
+ if not np.isfinite(p): return "N/A"
+ if p >= 0.70: return "A"
+ if p >= 0.55: return "B"
+ if p >= 0.45: return "C"
+ return "D"
+
+def _auto_dtick(span):
+ # 0~1 퍼센트 축 span 기준
+ if span <= 0.30: return 0.05 # 5%
+ if span >= 0.80: return 0.20 # 20%
+ return 0.10 # 10%
+
+def apply_dense_grid(fig: go.Figure, x_prob: bool = False, y_prob: bool = False) -> go.Figure:
+ # 1) 기존 높이 보존(없을 때만 360 지정)
+ cur_h = getattr(fig.layout, "height", None)
+ fig.update_layout(
+ height=(cur_h if cur_h is not None else 360),
+ showlegend=True,
+ paper_bgcolor="#fff",
+ plot_bgcolor="#fff",
+ font=dict(color="#111"),
+ margin=dict(l=10, r=10, t=30, b=10),
+ )
+
+ # 2) 기본 격자
+ fig.update_xaxes(showline=False, mirror=False, linewidth=0)
+ fig.update_yaxes(showline=False, mirror=False, linewidth=0)
+
+ # 3) plotly 버전별 minor 옵션 안전 처리
+ try:
+ fig.update_xaxes(minor=dict(showgrid=False))
+ fig.update_yaxes(minor=dict(showgrid=False))
+ except Exception:
+ pass
+
+ # 4) 확률축(0~1) 포맷
+ if x_prob:
+ xr = (getattr(fig.layout.xaxis, "range", None) or [0, 1])
+ span = (xr[1] - xr[0]) if isinstance(xr, (list, tuple)) and len(xr) == 2 else 1.0
+ fig.update_xaxes(tick0=0, dtick=_auto_dtick(span), tickformat=".0%")
+ if y_prob:
+ yr = (getattr(fig.layout.yaxis, "range", None) or [0, 1])
+ span = (yr[1] - yr[0]) if isinstance(yr, (list, tuple)) and len(yr) == 2 else 1.0
+ fig.update_yaxes(tick0=0, dtick=_auto_dtick(span), tickformat=".0%")
+
+ # 5) 인터랙션 상태 유지
+ fig.update_layout(uirevision="keep")
+
+ # 6) 레이아웃 shape 잔재(x0shift 등) 전역 스크럽
+ try:
+ fig = _scrub_layout_shapes(fig) # sanitize_shape_dict를 내부에서 활용
+ except Exception:
+ pass
+
+ return fig
+
+
+ # ★ 여기 추가: 모든 shape 정제
+ try:
+ sanitize_fig_shapes(fig)
+ except Exception:
+ pass
+
+ return fig
+
+
+# ---- Excel 오픈(엔진 폴백 + 디버그 수집) ----
+def _open_excel_with_fallback(path: str):
+ errs = []
+ for eng in ["openpyxl", None, "xlrd"]:
+ try:
+ xls = pd.ExcelFile(path, engine=eng) if eng else pd.ExcelFile(path)
+ return xls, (eng or "auto")
+ except Exception as e:
+ errs.append(f"{(eng or 'auto')}: {type(e).__name__}::{e}")
+ raise RuntimeError("Excel open failed | " + " | ".join(errs))
+
+def _find_sheet(xls: pd.ExcelFile, candidates):
+ names = xls.sheet_names
+ norm = lambda s: re.sub(r"\s+", "", str(s)).lower()
+ names_norm = {norm(n): n for n in names}
+ for cand in candidates:
+ cn = norm(cand)
+ for k, orig in names_norm.items():
+ if cn in k:
+ return orig
+ return None
+
+def load_excel(path: str):
+ if not os.path.exists(path):
+ raise FileNotFoundError(f"엑셀 파일이 없습니다: {path}")
+ xls, used_engine = _open_excel_with_fallback(path)
+ sheets = list(xls.sheet_names)
+
+ sh_master = _find_sheet(xls, ["VBA마스터테이블", "마스터", "master", "mastertable", "마스터테이블"])
+ sh_tm = _find_sheet(xls, ["베이지안전이확률매트릭스", "전이확률", "transition", "matrix"])
+ sh_sankey = _find_sheet(xls, ["베이지안생키다이어그램", "생키", "sankey", "flow"])
+
+ dbg = {"engine": used_engine, "sheets": sheets,
+ "matched": {"master": sh_master, "tm": sh_tm, "sankey": sh_sankey}}
+
+ if not sh_master:
+ raise ValueError(f"필수 시트(마스터) 미발견 | sheets={sheets}")
+
+ df_master = _norm_cols(pd.read_excel(xls, sh_master))
+ df_tm = _norm_cols(pd.read_excel(xls, sh_tm)) if sh_tm else pd.DataFrame()
+ df_sankey = _norm_cols(pd.read_excel(xls, sh_sankey)) if sh_sankey else pd.DataFrame()
+
+ df_master = _rebuild_hkey_using_level(df_master)
+ if not df_tm.empty: df_tm = _rebuild_hkey_using_level(df_tm)
+ if not df_sankey.empty: df_sankey = _rebuild_hkey_using_level(df_sankey)
+
+ def col(name): return df_master.get(name, pd.Series(np.nan, index=df_master.index))
+ overall = {
+ "pref_mean": float(np.nanmean(col("pref_success_rate"))),
+ "rec_mean": float(np.nanmean(col("rec_success_rate"))),
+ "intent_mean": float(np.nanmean(col("intent_success_rate"))),
+ "buy_mean": float(np.nanmean(col("buy_success_rate"))),
+ "pref_sd": float(np.nanmean(_ci_to_sd(col("pref_ci_lower"), col("pref_ci_upper")))),
+ "rec_sd": float(np.nanmean(_ci_to_sd(col("rec_ci_lower"), col("rec_ci_upper")))),
+ "intent_sd": float(np.nanmean(_ci_to_sd(col("intent_ci_lower"), col("intent_ci_upper")))),
+ "buy_sd": float(np.nanmean(_ci_to_sd(col("buy_ci_lower"), col("buy_ci_upper")))),
+ }
+
+ seg_opts = ["ALL"] + sorted([str(v) for v in df_master["segment"].dropna().unique() if str(v)!="ALL"])
+ loy_opts = ["ALL"] + sorted([str(v) for v in df_master["loyalty"].dropna().unique() if str(v)!="ALL"])
+ mod_opts_all = ["ALL"] + sorted([str(v) for v in df_master["model"].dropna().unique() if str(v)!="ALL"])
+
+ return df_master, df_tm, df_sankey, overall, seg_opts, mod_opts_all, loy_opts, dbg
+
+# ===================== 선택/집계 로직 =====================
+def pick_row_for(df_master: pd.DataFrame, seg, mod, loy):
+ seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy)
+ df = _ensure_key_cols(df_master)
+
+ sort_col = sample_col_in_df(df)
+ if sort_col is None:
+ sort_col = "__tmp_n__"; df[sort_col] = 1
+
+ def add_pref_score(sub: pd.DataFrame) -> pd.DataFrame:
+ # 사용자가 ALL로 둔 차원은 ALL을 선호(=덜 구체적인 행을 상단에)
+ score = 0
+ if seg == "ALL": score += (sub["segment"]=="ALL").astype(int)
+ if mod == "ALL": score += (sub["model"]=="ALL").astype(int)
+ if loy == "ALL": score += (sub["loyalty"]=="ALL").astype(int)
+ sub = sub.copy(); sub["__score__"] = score
+ return sub
+
+ chosen = (seg!="ALL") + (mod!="ALL") + (loy!="ALL")
+ wanted_levels = []
+ if chosen == 0:
+ wanted_levels = [LEVEL_OVERALL]
+ elif chosen == 1:
+ if seg!="ALL": wanted_levels = [LEVEL_SEGMENT, LEVEL_OVERALL]
+ if mod!="ALL": wanted_levels = [LEVEL_MODEL, LEVEL_OVERALL]
+ if loy!="ALL": wanted_levels = [LEVEL_LOYALTY, LEVEL_OVERALL]
+ elif chosen == 2:
+ if seg!="ALL" and mod!="ALL":
+ wanted_levels = [LEVEL_SEG_X_MODEL, LEVEL_SEGMENT, LEVEL_MODEL, LEVEL_OVERALL]
+ elif seg!="ALL" and loy!="ALL":
+ wanted_levels = [LEVEL_SEG_X_LOY, LEVEL_SEGMENT, LEVEL_LOYALTY, LEVEL_OVERALL]
+ elif mod!="ALL" and loy!="ALL":
+ wanted_levels = [LEVEL_MODEL_X_LOY, LEVEL_MODEL, LEVEL_LOYALTY, LEVEL_OVERALL]
+ else:
+ wanted_levels = [
+ LEVEL_MOD_X_SEG_X_LOY, LEVEL_SEG_X_LOY, LEVEL_SEG_X_MODEL, LEVEL_MODEL_X_LOY,
+ LEVEL_MODEL, LEVEL_SEGMENT, LEVEL_LOYALTY, LEVEL_OVERALL
+ ]
+
+ # 1) 레벨 우선 매칭
+ for lvl in wanted_levels:
+ sub = df[df["level"] == lvl]
+ if seg!="ALL": sub = sub[sub["segment"] == seg]
+ if mod!="ALL": sub = sub[sub["model"] == mod]
+ if loy!="ALL": sub = sub[sub["loyalty"] == loy]
+ if not sub.empty:
+ sub = add_pref_score(sub).sort_values(["__score__", sort_col], ascending=[False, False])
+ row = sub.iloc[0]
+ return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index])
+
+ # 2) 정확 조합 실패 시, 부분조합 매칭
+ sub = df.copy()
+ if seg!="ALL": sub = sub[sub["segment"] == seg]
+ if mod!="ALL": sub = sub[sub["model"] == mod]
+ if loy!="ALL": sub = sub[sub["loyalty"] == loy]
+ if not sub.empty:
+ sub = add_pref_score(sub).sort_values(["__score__", sort_col], ascending=[False, False])
+ row = sub.iloc[0]
+ return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index])
+
+ # 3) 단일 컬럼만 맞는 행이라도
+ for col, val in [("segment", seg), ("model", mod), ("loyalty", loy)]:
+ if val != "ALL":
+ sub = df[df[col]==val]
+ if not sub.empty:
+ sub = add_pref_score(sub).sort_values(["__score__", sort_col], ascending=[False, False])
+ row = sub.iloc[0]
+ return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index])
+
+ # 4) 완전 실패 시 표본수 최대
+ row = df.sort_values(sort_col, ascending=False).iloc[0]
+ return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index])
+
+# ===================== 차트/표 유틸 =====================
+def _pick_sample_for_stage(r, stage_prefix: str) -> int:
+ for c in [f"{stage_prefix}_sample_size", "sample_size", "n", "N", "base", "베이스수", "표본수"]:
+ if c in r and pd.notna(r.get(c)):
+ return _safe_int0(r.get(c))
+ return _safe_int0(r.get("pref_sample_size"))
+
+def metrics_table_row(r):
+ def sd_from_ci(lo, hi):
+ if pd.isna(lo) or pd.isna(hi): return np.nan
+ return (hi - lo)/(2*1.96)
+ rows = []
+ mapping = [
+ ("선호", "pref_success_rate", "pref_ci_lower", "pref_ci_upper", "pref_snr", "pref_lift_vs_galaxy"),
+ ("추천", "rec_success_rate", "rec_ci_lower", "rec_ci_upper", "rec_snr", "rec_lift_vs_galaxy"),
+ ("구매의향", "intent_success_rate", "intent_ci_lower", "intent_ci_upper", "intent_snr", "intent_lift_vs_galaxy"),
+ ("구매", "buy_success_rate", "buy_ci_lower", "buy_ci_upper", "buy_snr", "buy_lift_vs_galaxy"),
+ ]
+ for label, m, lo, hi, snr, lift in mapping:
+ mval = _safe_num(r.get(m))
+ loval = _safe_num(r.get(lo))
+ hival = _safe_num(r.get(hi))
+ snrval = _safe_num(r.get(snr))
+ liftval= _safe_num(r.get(lift))
+ stage_prefix = m.split("_")[0]
+ rows.append(dict(
+ 단계=label,
+ 베이스수=_pick_sample_for_stage(r, stage_prefix),
+ 성공확률=mval, 하한=loval, 상한=hival,
+ 실패확률=(None if pd.isna(mval) else 1-mval),
+ 판정=("성공" if (np.isfinite(mval) and mval>=0.5) else ("실패" if np.isfinite(mval) else "N/A")),
+ 평가등급=("N/A" if not np.isfinite(mval) else ("A" if mval>=0.70 else "B" if mval>=0.55 else "C" if mval>=0.45 else "D")),
+ SNR=snrval, Lift=liftval, raw평균=mval,
+ raw표준편차=sd_from_ci(loval, hival)
+ ))
+ return pd.DataFrame(rows)
+
+def drops_from_anywhere(row, df_tm, seg, mod, loy):
+ seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy)
+ d1 = _safe_num(row.get("bayesian_dropout_pref_to_rec"))
+ d2 = _safe_num(row.get("bayesian_dropout_rec_to_intent"))
+ d3 = _safe_num(row.get("bayesian_dropout_intent_to_buy"))
+ full = _safe_num(row.get("bayesian_full_conversion"))
+ if df_tm is None or df_tm.empty:
+ return d1, d2, d3, full
+ need = [np.isfinite(d1), np.isfinite(d2), np.isfinite(d3), np.isfinite(full)]
+ if all(need): return d1, d2, d3, full
+ m = pd.Series(True, index=df_tm.index)
+ if "segment" in df_tm and seg!="ALL": m &= (df_tm["segment"].astype(str)==seg)
+ if "model" in df_tm and mod!="ALL": m &= (df_tm["model"].astype(str)==mod)
+ if "loyalty" in df_tm and loy!="ALL": m &= (df_tm["loyalty"].astype(str)==loy)
+ sub = df_tm[m].copy()
+ if sub.empty: sub = df_tm.copy()
+ w = pd.to_numeric(sub.get("pref_sample_size", pd.Series(1, index=sub.index)), errors="coerce").fillna(1)
+ def wmean(col):
+ v = pd.to_numeric(sub.get(col, pd.Series(np.nan, index=sub.index)), errors="coerce")
+ if v.notna().any(): return float(np.nansum(v*w)/np.nansum(w))
+ return np.nan
+ d1 = d1 if np.isfinite(d1) else wmean("bayesian_dropout_pref_to_rec")
+ d2 = d2 if np.isfinite(d2) else wmean("bayesian_dropout_rec_to_intent")
+ d3 = d3 if np.isfinite(d3) else wmean("bayesian_dropout_intent_to_buy")
+ full = full if np.isfinite(full) else wmean("bayesian_full_conversion")
+ return d1, d2, d3, full
+
+def biggest_drop_text_by_sources(row, df_tm, seg, mod, loy):
+ d1, d2, d3, _ = drops_from_anywhere(row, df_tm, seg, mod, loy)
+ pairs = [("선호→추천", d1), ("추천→구매의향", d2), ("구매의향→구매", d3)]
+ pairs = [(n, v) for n, v in pairs if np.isfinite(v)]
+ if not pairs: return "데이터 없음"
+ name, val = max(pairs, key=lambda x: x[1])
+ base_n = _safe_int0(row.get("pref_sample_size"))
+ return f"{name}에서 {val*100:.1f}%p 손실 (샘플 {base_n:,})"
+
+def compose_composite_row(df_scope: pd.DataFrame) -> pd.Series:
+ if df_scope is None or df_scope.empty:
+ return pd.Series(dtype=float)
+ s = df_scope.copy()
+ w = pd.to_numeric(s.get("pref_sample_size", pd.Series(1, index=s.index)), errors="coerce").fillna(1.0)
+ w_sum = float(np.nansum(w)) if np.isfinite(np.nansum(w)) and np.nansum(w) > 0 else 1.0
+ w_norm = w / w_sum
+ def wmean(col):
+ v = pd.to_numeric(s.get(col, pd.Series(np.nan, index=s.index)), errors="coerce")
+ if v.notna().any(): return float(np.nansum(v * w_norm))
+ return np.nan
+ def combine_ci(lo_col, hi_col, mean_col):
+ m = pd.to_numeric(s.get(mean_col, pd.Series(np.nan, index=s.index)), errors="coerce")
+ lo = pd.to_numeric(s.get(lo_col, pd.Series(np.nan, index=s.index)), errors="coerce")
+ hi = pd.to_numeric(s.get(hi_col, pd.Series(np.nan, index=s.index)), errors="coerce")
+ if not (m.notna().any() and lo.notna().any() and hi.notna().any()):
+ return np.nan, np.nan
+ m_bar = float(np.nansum(m * w_norm))
+ sd = (hi - lo) / (2 * 1.96)
+ sd = pd.to_numeric(sd, errors="coerce")
+ var = np.nansum(w_norm * (sd**2 + (m - m_bar)**2))
+ sd_c = float(np.sqrt(var)) if np.isfinite(var) else np.nan
+ if not np.isfinite(sd_c): return np.nan, np.nan
+ return (m_bar - 1.96 * sd_c), (m_bar + 1.96 * sd_c)
+ pref_m = wmean("pref_success_rate")
+ rec_m = wmean("rec_success_rate")
+ intent_m = wmean("intent_success_rate")
+ buy_m = wmean("buy_success_rate")
+ pref_lo, pref_hi = combine_ci("pref_ci_lower", "pref_ci_upper", "pref_success_rate")
+ rec_lo, rec_hi = combine_ci("rec_ci_lower", "rec_ci_upper", "rec_success_rate")
+ intent_lo, intent_hi = combine_ci("intent_ci_lower", "intent_ci_upper", "intent_success_rate")
+ buy_lo, buy_hi = combine_ci("buy_ci_lower", "buy_ci_upper", "buy_success_rate")
+ d1 = wmean("bayesian_dropout_pref_to_rec")
+ d2 = wmean("bayesian_dropout_rec_to_intent")
+ d3 = wmean("bayesian_dropout_intent_to_buy")
+ full = wmean("bayesian_full_conversion")
+ pref_snr = wmean("pref_snr"); rec_snr = wmean("rec_snr")
+ intent_snr = wmean("intent_snr"); buy_snr = wmean("buy_snr")
+ pref_lift = wmean("pref_lift_vs_galaxy"); rec_lift = wmean("rec_lift_vs_galaxy")
+ intent_lift = wmean("intent_lift_vs_galaxy"); buy_lift = wmean("buy_lift_vs_galaxy")
+ out = {
+ "pref_sample_size": float(np.nansum(w)),
+ "pref_success_rate": pref_m, "pref_ci_lower": pref_lo, "pref_ci_upper": pref_hi,
+ "rec_success_rate": rec_m, "rec_ci_lower": rec_lo, "rec_ci_upper": rec_hi,
+ "intent_success_rate": intent_m, "intent_ci_lower": intent_lo, "intent_ci_upper": intent_hi,
+ "buy_success_rate": buy_m, "buy_ci_lower": buy_lo, "buy_ci_upper": buy_hi,
+ "bayesian_dropout_pref_to_rec": d1,
+ "bayesian_dropout_rec_to_intent": d2,
+ "bayesian_dropout_intent_to_buy": d3,
+ "bayesian_full_conversion": full,
+ "pref_snr": pref_snr, "rec_snr": rec_snr, "intent_snr": intent_snr, "buy_snr": buy_snr,
+ "pref_lift_vs_galaxy": pref_lift, "rec_lift_vs_galaxy": rec_lift,
+ "intent_lift_vs_galaxy": intent_lift, "buy_lift_vs_galaxy": buy_lift,
+ }
+ return pd.Series(out)
+
+# ===================== 차트 =====================
+def _empty_fig(msg="Load data first", height=360, hide_axes=False):
+ fig = go.Figure()
+ fig.add_annotation(text=msg, x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False)
+ fig.update_layout(
+ height=height,
+ margin=dict(l=10, r=10, t=30, b=10),
+ paper_bgcolor="#ffffff",
+ plot_bgcolor="#ffffff",
+ uirevision="keep",
+ )
+ fig = apply_dense_grid(fig) # 기존 스타일 유지
+
+ if hide_axes: # Sankey 등 카테시안 축이 불필요한 경우
+ fig.update_xaxes(visible=False, showgrid=False, zeroline=False)
+ fig.update_yaxes(visible=False, showgrid=False, zeroline=False)
+
+ return fig
+
+def hex_to_rgba(hex_color: str, a: float | None = None) -> str:
+ s = hex_color.strip().lstrip("#")
+ if len(s) in (3, 4):
+ s = "".join(ch * 2 for ch in s)
+ if len(s) == 6:
+ r = int(s[0:2], 16); g = int(s[2:4], 16); b = int(s[4:6], 16)
+ alpha = 1.0 if a is None else float(a)
+ elif len(s) == 8:
+ r = int(s[0:2], 16); g = int(s[2:4], 16); b = int(s[4:6], 16)
+ hex_alpha = int(s[6:8], 16) / 255.0
+ alpha = hex_alpha if a is None else float(a)
+ else:
+ raise ValueError("hex must be #RGB, #RRGGBB, or #RRGGBBAA")
+ alpha = max(0.0, min(1.0, alpha))
+ return f"rgba({r},{g},{b},{alpha:.3g})"
+
+
+def _normalize_stage_label(v: str) -> str | None:
+ if v is None:
+ return None
+ s = str(v).strip().lower()
+ s = re.sub(r'[\s\-\_]+', ' ', s) # 공백/-,_ 정리
+ joined = s.replace(' ', '')
+
+ # 전체
+ if any(k in (s, joined) for k in [
+ "overall","total","all","전체","전체사용자","모든사용자","allusers","all user","all-user"
+ ]):
+ return "전체"
+
+ # 미선호(비선호/탈락/드랍/No preference 등)
+ if any(k in (s, joined) for k in [
+ "미선호","비선호","선호아님","선호 아님",
+ "nopref","no preference","dislike","탈락","drop","dropped"
+ ]):
+ return "미선호"
+
+ # 구매의향(의향/의도/의사/intent 계열)
+ if ("의향" in s) or ("의도" in s) or ("의사" in s) \
+ or ("intent" in s) or ("intention" in s) \
+ or ("purchaseintent" in joined) or ("purchase-intent" in s):
+ return "구매의향"
+
+ # 구매(실제구매/구매완료/구매확정/구입/결제/매출/buy/purchase)
+ if ("구매" in s) or ("구입" in s) or ("결제" in s) or ("결재" in s) or ("매출" in s) \
+ or (s == "buy") or ("purchase" in s):
+ return "구매"
+
+ # 선호
+ if ("선호" in s) or ("호감" in s) or ("preference" in s) or (s == "pref"):
+ return "선호"
+
+ # 추천
+ if (s == "rec") or ("recommend" in s) or ("추천" in s):
+ return "추천"
+
+ return None
+
+# ==== STAGES & ORDER (기존 것을 교체) ====
+STAGES = ["전체", "미선호", "선호", "추천", "구매의향", "구매"]
+ORDER = {v:i for i,v in enumerate(STAGES)}
+
+# 색상 하나 추가(은은한 회색 계열 권장)
+COL_STAGE_DROP = "#CBD5E1" # 미선호
+
+def _group_forward_flows(df_sankey, seg, mod, loy):
+ if df_sankey is None or df_sankey.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"])
+ seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy)
+ s = df_sankey.copy()
+ m = pd.Series(True, index=s.index)
+ if "segment" in s and seg!="ALL": m &= (s["segment"].astype(str)==seg)
+ if "model" in s and mod!="ALL": m &= (s["model"].astype(str)==mod)
+ if "loyalty" in s and loy!="ALL": m &= (s["loyalty"].astype(str)==loy)
+ s = s[m].copy()
+ if s.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"])
+
+ alias = {
+ "all":"전체","ALL":"전체","전체":"전체",
+ "pref":"선호","preference":"선호","선호도":"선호",
+ "rec":"추천","recommend":"추천","추천도":"추천",
+ "intent":"구매의향","intention":"구매의향","구매의도":"구매의향",
+ "purchase":"구매","buy":"구매","실제구매":"구매"
+ }
+ s["from_stage"] = s.get("from_stage", s.get("from", s.get("source"))).astype(str).str.strip().replace(alias)
+ s["to_stage"] = s.get("to_stage", s.get("to", s.get("target"))).astype(str).str.strip().replace(alias)
+
+ # 🔑 count 별칭 허용
+ cnt_cands = ["bayesian_flow_count","count","value","weight","n","freq"]
+ cnt_col = next((c for c in cnt_cands if c in s.columns), None)
+ if cnt_col is None:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"])
+
+ s[cnt_col] = pd.to_numeric(s[cnt_col], errors="coerce")
+ s = s[np.isfinite(s[cnt_col]) & (s[cnt_col]>0)]
+ s = s[s["from_stage"].isin(STAGES) & s["to_stage"].isin(STAGES)]
+ s = s[s.apply(lambda r: ORDER[r["from_stage"]] < ORDER[r["to_stage"]], axis=1)]
+ if s.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"])
+
+ g = (s.groupby(["from_stage","to_stage"], as_index=False)[cnt_col]
+ .sum().rename(columns={cnt_col:"count"}))
+
+ # [유입 없는 단계 보강] 전체→단계 링크 자동 추가
+ pairs = set(zip(g["from_stage"], g["to_stage"]))
+ def _has_incoming(stage):
+ k = ORDER[stage]
+ return any((prev, stage) in pairs for prev in STAGES[:k])
+
+ add_rows = []
+ for st in STAGES[1:]:
+ if not _has_incoming(st):
+ out_sum = float(g.loc[g["from_stage"] == st, "count"].sum())
+ if out_sum > 0:
+ add_rows.append({"from_stage": "전체", "to_stage": st, "count": out_sum})
+ if add_rows:
+ g = pd.concat([g, pd.DataFrame(add_rows)], ignore_index=True)
+
+ # φ 스케일 적용
+ k = _flow_scale(seg, mod, loy)
+ g["flow_phi"] = g["count"].astype(float) * k
+ return g
+
+# ===== Sankey 내부용 테이블 빌더(간접 포함, 구매로 접기 옵션) =====
+
+# 노드(베이지) & 링크(회색) 팔레트
+COL_STAGE_OVERALL = "#B68E5C" # 전체
+COL_STAGE_PREF = "#C6955E" # 선호
+COL_STAGE_REC = "#D5A86D" # 추천
+COL_STAGE_INTENT = "#BE8F4E" # 의향
+COL_STAGE_BUY = "#A97F45" # 구매
+COL_LINK_DIRECT = "#4B5563" # 직접(짙은 회색)
+COL_LINK_INDIRECT = "#D1D5DB" # 간접(연한 회색)
+
+def _sankey_build_table(df_sankey, seg="ALL", mod="ALL", loy="ALL",
+ collapse_to_buy=True, collapse_from=("선호","추천","구매의향")) -> pd.DataFrame:
+ if df_sankey is None or df_sankey.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"])
+
+ s = df_sankey.copy()
+
+ # --- [NEW] 호환 가드: 열 별칭을 표준 이름으로 통일 ---
+ # 1) from/to 별칭 → from_stage/to_stage
+ from_col = next((c for c in ["from_stage","from","source","src"] if c in s.columns), None)
+ to_col = next((c for c in ["to_stage","to","target","dst"] if c in s.columns), None)
+ if from_col and from_col != "from_stage":
+ s = s.rename(columns={from_col: "from_stage"})
+ if to_col and to_col != "to_stage":
+ s = s.rename(columns={to_col: "to_stage"})
+
+ # 필수 열 없으면 빈 테이블 반환 (안전 가드)
+ if "from_stage" not in s.columns or "to_stage" not in s.columns:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"])
+
+ # 2) 수치 열 별칭 → bayesian_flow_count
+ alt_cnt = next((c for c in ["bayesian_flow_count","count","value","flow","weight","n","freq"]
+ if c in s.columns), None)
+ if alt_cnt and alt_cnt != "bayesian_flow_count":
+ s = s.rename(columns={alt_cnt: "bayesian_flow_count"})
+
+
+ # 필터
+ for col, val in (("segment", seg), ("model", mod), ("loyalty", loy)):
+ if col in s.columns and str(val) != "ALL":
+ s = s[s[col].astype(str) == str(val)]
+ if s.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"])
+
+ # 라벨 정규화 → 순방향만
+ s["from_stage"] = s.get("from_stage", s.get("from", s.get("source"))).map(_normalize_stage_label)
+ s["to_stage"] = s.get("to_stage", s.get("to", s.get("target"))).map(_normalize_stage_label)
+ s = s.dropna(subset=["from_stage","to_stage"])
+ s = s[s["from_stage"].isin(STAGES) & s["to_stage"].isin(STAGES)]
+ s = s[s.apply(lambda r: ORDER[r["from_stage"]] < ORDER[r["to_stage"]], axis=1)]
+
+# 🔑 count 컬럼 별칭 허용 (원천 시트/캐시 시트 모두 커버)
+ cnt_cands = ["bayesian_flow_count", "count", "value", "weight", "n", "freq"]
+ cnt_col = next((c for c in cnt_cands if c in s.columns), None)
+ if cnt_col is None:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"])
+
+ s[cnt_col] = pd.to_numeric(s[cnt_col], errors="coerce")
+ s = s[np.isfinite(s[cnt_col]) & (s[cnt_col] > 0)]
+ if s.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"])
+
+ # 기본 집계
+ g = (s.groupby(["from_stage","to_stage"], as_index=False)[cnt_col]
+ .sum().rename(columns={cnt_col:"count"}))
+
+ # 유입 없는 단계 보강(전체→단계)
+ pairs = set(zip(g["from_stage"], g["to_stage"]))
+ def _has_incoming(stage):
+ k = ORDER[stage]
+ return any((prev, stage) in pairs for prev in STAGES[:k])
+ add_rows = []
+ for st in STAGES[1:]:
+ if not _has_incoming(st):
+ out_sum = float(g.loc[g["from_stage"]==st, "count"].sum())
+ if out_sum > 0:
+ add_rows.append({"from_stage":"전체","to_stage":st,"count":out_sum})
+ if add_rows:
+ g = pd.concat([g, pd.DataFrame(add_rows)], ignore_index=True)
+
+ # (옵션) 구매로 접은 간접 링크 추가: 선호/추천/구매의향 → 구매
+ if collapse_to_buy:
+ buy_in = float(pd.to_numeric(g.loc[g["to_stage"]=="구매","count"], errors="coerce").fillna(0).sum())
+ if buy_in > 0:
+ exist = set(zip(g["from_stage"], g["to_stage"]))
+ extra = []
+ for st in collapse_from:
+ if st in ORDER and (st, "구매") not in exist and ORDER[st] < ORDER["구매"]:
+ extra.append({"from_stage": st, "to_stage": "구매", "count": buy_in})
+ if extra:
+ g = pd.concat([g, pd.DataFrame(extra)], ignore_index=True)
+
+ # 메타 칼럼
+ kphi = _flow_scale(seg, mod, loy) # 비공개 스케일
+ g["flow_phi"] = g["count"].astype(float) * kphi
+ g["dist"] = g["to_stage"].map(ORDER) - g["from_stage"].map(ORDER)
+ g["kind"] = np.where(g["dist"]==1, "직접", "간접")
+ g["to_buy"] = (g["to_stage"] == "구매")
+
+ cols = ["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]
+ return g[cols].sort_values(["dist","from_stage","to_stage"]).reset_index(drop=True)
+
+
+# ====== Sankey 색/스테이지 ======
+STAGES = ["전체","미선호","선호","추천","구매의향","구매"]
+ORDER = {v:i for i,v in enumerate(STAGES)}
+
+COL_STAGE_OVERALL = "#B68E5C"
+COL_STAGE_NONPREF = "#9CA3AF" # ← 미선호(회색)
+COL_STAGE_PREF = "#C6955E"
+COL_STAGE_REC = "#D5A86D"
+COL_STAGE_INTENT = "#BE8F4E"
+COL_STAGE_BUY = "#A97F45"
+
+COL_LINK_DIRECT = "#4B5563" # 짙은 회색 (직접)
+COL_LINK_INDIRECT = "#D1D5DB" # 연한 회색 (간접)
+# ─────────────────────────────────────────────────────────
+# (유틸) 간접 "→구매" 접기 보강
+def add_collapsed_to_buy(tbl: pd.DataFrame, add_from=("선호","추천","구매의향")) -> pd.DataFrame:
+ if tbl is None or tbl.empty:
+ return tbl
+
+ # ── 기준 단계/순서(미선호 포함 6단계)
+ stages = ["전체","미선호","선호","추천","구매의향","구매"]
+ order = {v:i for i,v in enumerate(stages)}
+ t = tbl.copy()
+
+ # ── 구매 유입 총량
+ buy_in = float(pd.to_numeric(t.loc[t["to_stage"]=="구매","count"], errors="coerce").fillna(0).sum())
+
+ # ── φ 스케일(k) 추정
+ kphi = 1.0
+ if "flow_phi" in t.columns and "count" in t.columns:
+ r = pd.to_numeric(t["flow_phi"], errors="coerce") / pd.to_numeric(t["count"], errors="coerce")
+ r = r.replace([np.inf,-np.inf], np.nan).dropna()
+ if not r.empty:
+ kphi = float(np.median(r))
+
+ # ── 그룹 메타(snapshot): 단일값이면 그 값, 아니면 "ALL"
+ meta_cols = [c for c in ["segment","model","loyalty","level"] if c in t.columns]
+ meta = {c: (t[c].dropna().iloc[0] if t[c].nunique(dropna=True)==1 else "ALL") for c in meta_cols}
+
+ extra = []
+ for s in add_from:
+ if s not in order or order[s] >= order["구매"]:
+ continue
+ # 이미 존재하면 중복 추가 금지
+ if ((t["from_stage"]==s) & (t["to_stage"]=="구매")).any():
+ continue
+ row = {
+ "from_stage": s,
+ "to_stage": "구매",
+ "count": buy_in,
+ "dist": order["구매"] - order[s],
+ "kind": ("간접" if (order["구매"] - order[s]) > 1 else "직접"),
+ "to_buy": True,
+ "flow_phi": buy_in * kphi
+ }
+ # ★ 메타 동봉
+ for c, v in meta.items():
+ row[c] = v
+ extra.append(row)
+
+ if extra:
+ t = pd.concat([t, pd.DataFrame(extra)], ignore_index=True)
+
+ return t.sort_values(["dist","from_stage","to_stage"]).reset_index(drop=True)
+# ─────────────────────────────────────────────────────────
+
+# ⬇⬇ 핵심 수정: 라벨을 먼저 느슨한 별칭으로 치환 후, 정규화 함수에 태움
+def _normalize_stage_soft(series: pd.Series) -> pd.Series:
+ if series.empty:
+ return series
+ s = series.astype(str).str.strip()
+
+ # 1) 강제 별칭(정확치환) — 의향/의도/의사/intent, 구매완료/실제구매, 전체사용자 등
+ alias_exact = {
+ # 전체
+ "전체사용자": "전체", "모든 사용자": "전체", "all": "전체", "ALL": "전체",
+ # 선호
+ "선호도": "선호", "선호도높음": "선호", "호감도": "선호", "호감도높음": "선호",
+ # 추천
+ "추천도": "추천", "추천도높음": "추천",
+ # 의향/의도/의사/intent (다양형)
+ "구매의향": "구매의향", "구매 의향": "구매의향", "구매의향높음": "구매의향", "구매의향 높음": "구매의향",
+ "구매의도": "구매의향", "구매 의도": "구매의향", "구매의도높음": "구매의향", "구매의도 높음": "구매의향",
+ "구매의사": "구매의향", "의사 있음": "구매의향",
+ "intent": "구매의향", "Intent": "구매의향", "Intention": "구매의향",
+ "Purchase Intent": "구매의향", "PURCHASE_INTENT": "구매의향",
+ # 구매
+ "실제구매": "구매", "구매 확정": "구매", "구매확정": "구매", "구매 완료": "구매", "구매완료": "구매",
+ "결제": "구매", "결재": "구매", "매출": "구매",
+ #미선호
+ "미선호": "미선호", "비선호": "미선호", "선호 아님": "미선호", "탈락": "미선호",
+ }
+ s = s.replace(alias_exact)
+
+ # 2) 토큰/부분일치 기반 정규화(전역 함수가 있으면 재사용)
+ def _norm_one(x: str) -> str | None:
+ try:
+ return _normalize_stage_label(x) # 전역 정의 존재 시 활용
+ except Exception:
+ pass
+ # 폴백: 부분일치
+ xl = x.lower().replace(" ", "")
+ if any(k in xl for k in ["all","전체"]): return "전체"
+ if any(k in xl for k in ["선호","호감"]): return "선호"
+ if "추천" in xl or "rec" in xl: return "추천"
+ if any(k in xl for k in ["의향","의도","의사","intent"]): return "구매의향"
+ if any(k in xl for k in ["구매","구입","결제","결재","완료","확정","매출","purch","buy"]): return "구매"
+ if any(k in xl for k in ["미선호","비선호","선호��님","nopref","npreference","탈락","drop"]): return "미선호"
+ return None
+
+ return s.map(_norm_one)
+
+
+# 파일 상단 어딘가(상수들 근처)에 추가
+LVL_PRIORITY = [
+ "모델×세그×충성도","세그×모델","모델×충성도","세그×충성도",
+ "모델","세그먼트","충성도","전체"
+]
+
+def _sanitize_sankey_table(
+ tbl: pd.DataFrame,
+ seg="ALL", mod="ALL", loy="ALL",
+ enforce_single_level: bool = True,
+ drop_overall_if_mixed: bool = True
+) -> pd.DataFrame:
+ cols = ["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]
+ if tbl is None or tbl.empty:
+ return pd.DataFrame(columns=cols)
+
+ t = tbl.copy()
+
+ # (1) 선택값 필터 (있을 때만)
+ for col, val in (("segment", seg), ("model", mod), ("loyalty", loy)):
+ if col in t.columns and str(val) != "ALL":
+ t = t[t[col].astype(str).str.strip() == str(val)]
+ if t.empty:
+ return pd.DataFrame(columns=cols)
+
+ # (2) 레벨 단일화 (혼입 방지) + 과잉 드랍 완화
+ original = t
+ if enforce_single_level and "level" in t.columns:
+ picked = None
+ for lv in LVL_PRIORITY:
+ cand = t[t["level"].astype(str) == lv]
+ if not cand.empty:
+ picked = cand; break
+ if picked is not None:
+ t = picked
+ # 혼합이면 '전체'만 제거 (단, 전부 비면 되돌림)
+ if ("level" in t.columns) and (t["level"].astype(str).nunique() > 1) and drop_overall_if_mixed:
+ t2 = t[t["level"].astype(str) != "전체"]
+ if not t2.empty:
+ t = t2
+
+ if t.empty:
+ # 과잉 필터/드랍으로 비었으면 원본으로 되돌려 계속
+ t = original.copy()
+
+ # (3) 컬럼 별칭
+ alias = {
+ "from_stage": ["from_stage","from","source","src"],
+ "to_stage": ["to_stage","to","target","dst"],
+ "count": ["count","bayesian_flow_count","flow","value","weight","n","freq"],
+ }
+ def pick(name):
+ keys = {str(c).strip().lower(): c for c in t.columns}
+ for a in alias[name]:
+ if a in keys: return keys[a]
+ return None
+ c_from = pick("from_stage"); c_to = pick("to_stage"); c_cnt = pick("count")
+ if not all([c_from, c_to, c_cnt]):
+ return pd.DataFrame(columns=cols)
+
+ t = t.rename(columns={c_from:"from_stage", c_to:"to_stage", c_cnt:"count"})
+
+ # (4) 라벨 정규화 + 순방향만
+ t["from_stage"] = _normalize_stage_soft(t["from_stage"])
+ t["to_stage"] = _normalize_stage_soft(t["to_stage"])
+ t = t.dropna(subset=["from_stage","to_stage"])
+ t = t[t["from_stage"].isin(STAGES) & t["to_stage"].isin(STAGES)]
+ t = t[t.apply(lambda r: ORDER[r["from_stage"]] < ORDER[r["to_stage"]], axis=1)]
+
+ # (5) 수치 변환
+ t["count"] = pd.to_numeric(t["count"], errors="coerce")
+ t = t[np.isfinite(t["count"]) & (t["count"] > 0)]
+
+ # (5-보강) 과도 필터로 비면 완화 모드: 단계 조건만 적용하고 수치만 보정
+ if t.empty:
+ t = original.rename(columns={c_from:"from_stage", c_to:"to_stage", c_cnt:"count"}).copy()
+ t["from_stage"] = _normalize_stage_soft(t["from_stage"])
+ t["to_stage"] = _normalize_stage_soft(t["to_stage"])
+ t = t.dropna(subset=["from_stage","to_stage"])
+ t = t[t["from_stage"].isin(STAGES) & t["to_stage"].isin(STAGES)]
+ t["count"] = pd.to_numeric(t["count"], errors="coerce").fillna(0)
+ t = t[t["count"] > 0]
+ if t.empty:
+ return pd.DataFrame(columns=cols)
+
+ # (6) 메타 보강
+ t["dist"] = (t["to_stage"].map(ORDER) - t["from_stage"].map(ORDER)).astype(int)
+ if "kind" not in t.columns:
+ t["kind"] = np.where(t["dist"]==1, "직접", "간접")
+ else:
+ miss = ~t["kind"].astype(str).isin(["직접","간접"])
+ t.loc[miss,"kind"] = np.where(t.loc[miss,"dist"]==1, "직접","간접")
+ t["to_buy"] = (t["to_stage"]=="구매")
+
+ # (7) φ
+ kphi = _flow_scale(seg, mod, loy)
+ if "flow_phi" not in t.columns:
+ t["flow_phi"] = t["count"].astype(float) * kphi
+ else:
+ t["flow_phi"] = pd.to_numeric(t["flow_phi"], errors="coerce")
+ miss = ~np.isfinite(t["flow_phi"])
+ t.loc[miss, "flow_phi"] = t.loc[miss, "count"].astype(float) * kphi
+
+ return t[cols].sort_values(["dist","from_stage","to_stage"]).reset_index(drop=True)
+
+
+def _sankey_from_master_row(row: pd.Series, seg, mod, loy) -> pd.DataFrame:
+ n = _safe_int0(row.get("pref_sample_size"))
+ if n <= 0:
+ return pd.DataFrame(columns=[
+ "from_stage","to_stage","count","dist","kind","to_buy","flow_phi",
+ "segment","model","loyalty"
+ ])
+
+ def P(x):
+ v = _safe_num(x)
+ if not np.isfinite(v): return np.nan
+ return v/100.0 if v > 1.5 else v
+
+ # (A) 확률 안전화: NaN이면 0, 0~1로 클립
+ def P01(x):
+ v = P(x)
+ return np.nan if not np.isfinite(v) else float(min(1.0, max(0.0, v)))
+
+ p_pref = P(row.get("pref_success_rate"))
+ p_rec = P(row.get("rec_success_rate"))
+ p_intent = P(row.get("intent_success_rate"))
+ p_buy = P(row.get("buy_success_rate"))
+ d1 = P(row.get("bayesian_dropout_pref_to_rec"))
+ d2 = P(row.get("bayesian_dropout_rec_to_intent"))
+ d3 = P(row.get("bayesian_dropout_intent_to_buy"))
+
+ pref = n * (p_pref if np.isfinite(p_pref) else 0.0)
+ rec = pref * (1 - d1) if np.isfinite(pref) and np.isfinite(d1) else n * (p_rec if np.isfinite(p_rec) else 0.0)
+ intent = rec * (1 - d2) if np.isfinite(rec) and np.isfinite(d2) else n * (p_intent if np.isfinite(p_intent) else 0.0)
+ buy = intent*(1 - d3) if np.isfinite(intent) and np.isfinite(d3) else n * (p_buy if np.isfinite(p_buy) else 0.0)
+
+ drop0 = max(0.0, float(n) - float(pref))
+
+ rows = [
+ {"from_stage":"전체","to_stage":"미선호", "count": drop0},
+ {"from_stage":"전체","to_stage":"선호", "count": pref},
+ {"from_stage":"선호","to_stage":"추천", "count":max(0.0, rec)},
+ {"from_stage":"추천","to_stage":"구매의향", "count":max(0.0, intent)},
+ {"from_stage":"구매의향","to_stage":"구매", "count":max(0.0, buy)},
+ ]
+
+ g = pd.DataFrame(rows).dropna()
+ g["count"] = pd.to_numeric(g["count"], errors="coerce").fillna(0)
+ g = g[g["count"] > 0]
+ g["dist"] = g["to_stage"].map(ORDER) - g["from_stage"].map(ORDER)
+ g["kind"] = np.where(g["dist"]==1, "직접", "간접")
+ g["to_buy"] = (g["to_stage"]=="구매")
+ kphi = _flow_scale(seg, mod, loy)
+ g["flow_phi"] = g["count"].astype(float) * kphi
+ g["segment"] = seg; g["model"] = mod; g["loyalty"] = loy
+ return g[[
+ "from_stage","to_stage","count","dist","kind","to_buy","flow_phi",
+ "segment","model","loyalty"
+ ]]
+
+LEVELS_FOR_SANKEY = [
+ ("전체", []),
+ ("세그먼트", ["segment"]),
+ ("모델", ["model"]),
+ ("충성도", ["loyalty"]),
+ ("세그×모델", ["segment","model"]),
+ ("세그×충성도", ["segment","loyalty"]),
+ ("모델×충성도", ["model","loyalty"]),
+ ("모델×세그×충성도", ["segment","model","loyalty"]),
+]
+
+def build_sankey_cache_from_master(df_master: pd.DataFrame,
+ collapse_to_buy=True,
+ collapse_from=("선호","추천","구매의향")) -> pd.DataFrame:
+ dfm = _ensure_key_cols(df_master).copy()
+ out = []
+ for _lvl, keys in LEVELS_FOR_SANKEY:
+ if not keys:
+ seg, mod, loy = "ALL","ALL","ALL"
+ row = compose_composite_row(dfm)
+ if not row.empty:
+ part = _sankey_from_master_row(row, seg, mod, loy)
+ part["level"] = _lvl
+ out.append(part)
+ continue
+
+ for vals, grp in dfm.groupby(keys, dropna=False):
+ if not isinstance(vals, tuple): vals = (vals,)
+ seg = vals[keys.index("segment")] if "segment" in keys else "ALL"
+ mod = vals[keys.index("model")] if "model" in keys else "ALL"
+ loy = vals[keys.index("loyalty")] if "loyalty" in keys else "ALL"
+ row = compose_composite_row(grp)
+ if row.empty:
+ continue
+ part = _sankey_from_master_row(row, seg, mod, loy)
+ part["level"] = _lvl
+ out.append(part)
+
+ if not out:
+ return pd.DataFrame(columns=[
+ "from_stage","to_stage","count","dist","kind","to_buy","flow_phi",
+ "segment","model","loyalty","level"
+ ])
+
+ full = pd.concat(out, ignore_index=True)
+ if collapse_to_buy and not full.empty:
+ full = (full.groupby(["level","segment","model","loyalty"], group_keys=False)
+ .apply(lambda g: add_collapsed_to_buy(g, add_from=collapse_from))
+ .reset_index(drop=True))
+ return full
+
+def build_sankey_flow_table(
+ df_or_tbl: pd.DataFrame | None,
+ seg="ALL", mod="ALL", loy="ALL",
+ collapse_to_buy=True,
+ collapse_from=("선호","추천","구매의향")
+):
+ if df_or_tbl is None or df_or_tbl.empty:
+ return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"])
+
+ s = df_or_tbl.copy()
+ low = {str(c).strip().lower(): c for c in s.columns}
+
+ looks_table = (("from_stage" in low and "to_stage" in low) and
+ (("count" in low) or ("flow_phi" in low) or ("bayesian_flow_count" in low)))
+
+ if looks_table:
+ t = _sanitize_sankey_table(
+ s, seg=seg, mod=mod, loy=loy,
+ enforce_single_level=True, drop_overall_if_mixed=True
+ )
+ if collapse_to_buy:
+ t = add_collapsed_to_buy(t, add_from=collapse_from)
+ return t
+
+ return _sankey_build_table(
+ s, seg=seg, mod=mod, loy=loy,
+ collapse_to_buy=collapse_to_buy, collapse_from=collapse_from
+ )
+
+
+def sankey_figure(
+ df_sankey: pd.DataFrame | None,
+ seg, mod, loy,
+ normalize=False, base_stage="전체",
+ drag=False, show_kind=True,
+ table_override: pd.DataFrame | None = None,
+):
+ # ── 0) 레거시/실수 호환: normalize 자리에 DataFrame이 들어온 경우 보정
+ # (스모크 테스트에서 positional로 override가 들어오는 패턴 방지)
+ if isinstance(normalize, pd.DataFrame) and table_override is None:
+ table_override = normalize
+ normalize = False # 의미 없는 값이었으므로 안전 기본값
+
+ # ── 1) 테이블 소스 선택
+ if table_override is not None:
+ # override가 raw여도 안전하게 정규화/보강
+ g = _sanitize_sankey_table(table_override, seg=seg, mod=mod, loy=loy)
+ else:
+ g = build_sankey_flow_table(df_sankey, seg=seg, mod=mod, loy=loy, collapse_to_buy=True)
+
+ if g is None or g.empty:
+ return _empty_fig("No Sankey data")
+
+ # ── 2) 색/인덱스 준비
+ idx = {v:i for i,v in enumerate(STAGES)}
+
+ STAGE_COLOR = {
+ "전체": COL_STAGE_OVERALL,
+ "미선호": COL_STAGE_NONPREF,
+ "선호": COL_STAGE_PREF,
+ "추천": COL_STAGE_REC,
+ "구매의향": COL_STAGE_INTENT,
+ "구매": COL_STAGE_BUY,
+ }
+
+ # ★ 여기 한 줄: Sankey에서 '전체'만 검정으로
+ STAGE_COLOR["전체"] = "#000000" # 또는 COL_BLACK
+ node_colors = [STAGE_COLOR[s] for s in STAGES]
+
+ # ✅ 노드 x 좌표도 6개로
+ xs = [0.00, 0.18, 0.34, 0.54, 0.74, 0.94]
+
+ # ── 3) 그림
+ fig = go.Figure()
+ fig.add_trace(go.Sankey(
+ arrangement=("freeform" if drag else "fixed"),
+ valueformat=",.1f", valuesuffix=" φ",
+ node=dict(
+ pad=14, thickness=18, label=STAGES,
+ x=xs, y=[0.50]*len(STAGES),
+ color=node_colors, line=dict(color="#9aa0a6", width=0.7),
+ ),
+ link=dict(
+ source=[idx[a] for a in g["from_stage"]],
+ target=[idx[b] for b in g["to_stage"]],
+ value=g["flow_phi"].astype(float).tolist(),
+ color=(
+ np.where(g["kind"].astype(str)=="직접",
+ hex_to_rgba(COL_LINK_DIRECT, 0.90),
+ hex_to_rgba(COL_LINK_INDIRECT, 0.70))
+ if show_kind else [hex_to_rgba(COL_LINK_DIRECT, 0.85)] * len(g)
+ ).tolist(),
+ customdata=np.stack([
+ g["kind"].astype(str).to_numpy(),
+ g["dist"].astype(int).to_numpy(),
+ g["count"].astype(float).to_numpy(),
+ ], axis=-1),
+ hovertemplate=(
+ "%{customdata[0]} | %{source.label} → %{target.label}"
+ "
점프: %{customdata[1]}단계"
+ "
실제유량: %{customdata[2]:,} (표시 %{value:,.1f} φ)"
+ ""
+ ),
+ ),
+ ))
+
+ if show_kind:
+ fig.add_trace(go.Scatter(x=[None], y=[None], mode="markers",
+ marker=dict(size=10, color=hex_to_rgba(COL_LINK_DIRECT, 0.90)), name="직접(인접)"))
+ fig.add_trace(go.Scatter(x=[None], y=[None], mode="markers",
+ marker=dict(size=10, color=hex_to_rgba(COL_LINK_INDIRECT, 0.70)), name="간접(스킵)"))
+
+ base = base_stage if base_stage in STAGES else "전체"
+ tot_dir = float(g.loc[g["kind"]=="직접", "flow_phi"].sum())
+ tot_ind = float(g.loc[g["kind"]=="간접", "flow_phi"].sum())
+ # sankey_figure 끝부분
+ fig.update_layout(
+ title=f"Journey Sankey · 모든 순방향(스킵 포함) · 기준={base}",
+ height=390, showlegend=True,
+ paper_bgcolor="#fff", plot_bgcolor="#fff",
+ font=dict(color="#111"),
+ margin=dict(l=10, r=10, t=32, b=64),
+ )
+ fig.add_annotation(
+ x=0, y=-0.20, xref="paper", yref="paper",
+ showarrow=False, align="left",
+ text=f"직접 {tot_dir:,.1f} φ · 간접 {tot_ind:,.1f} φ",
+ font=dict(size=11, color="#444")
+ )
+
+ # ↓↓↓ 이 네 줄은 반드시 함수 안쪽(같은 들여쓰기 레벨)이어야 함
+ fig = apply_dense_grid(fig) # 공통 스타일
+
+ # Sankey 전용: 축 감추기(카테시안 축 없음)
+ fig.update_xaxes(visible=False, showgrid=False, zeroline=False, fixedrange=True)
+ fig.update_yaxes(visible=False, showgrid=False, zeroline=False, fixedrange=True)
+
+ return fig
+
+
+# ==== STAGE COLORS (전체→선호→추천→의향→구매) ====
+COL_STAGE_OVERALL = "#C32C2C" # 빨
+COL_STAGE_PREF = "#D24D3E" # 주
+COL_STAGE_REC = "#DE937A" # 노
+COL_STAGE_INTENT = "#D49442" # 베(골드톤)
+COL_STAGE_BUY = "#2B8E81" # 초록 ← 오타 수정
+COL_STAGE_NONPREF = "#9CA3AF" # 미선호(회색)
+
+
+def matrix_funnel_figure(row, df_tm, seg, mod, loy, **kwargs):
+ """
+ 누적 퍼널:
+ - 퍼센트 문자열(예: '45.5%')/공백 섞여도 robust parsing
+ - 값이 비어도(drop/success 둘 다 NaN) 최소 2단계 이상 강제로 그려줌
+ - 기본 높이 420 (FUNNEL_H가 있으면 그 값 따름)
+ """
+ # --- Robust percent parser -------------------------------------------------
+ def _p(x):
+ if x is None:
+ return np.nan
+ if isinstance(x, str):
+ s = x.strip()
+ if not s:
+ return np.nan
+ if s.endswith("%"):
+ try:
+ return float(s[:-1].strip()) / 100.0
+ except Exception:
+ return np.nan
+ try:
+ return float(s)
+ except Exception:
+ return np.nan
+ try:
+ x = float(x)
+ except Exception:
+ return np.nan
+ # 1.5 초과면 퍼센트로 간주(23 => 0.23)
+ return x / 100.0 if x > 1.5 else x
+
+ def _clip01(v):
+ return np.nan if not np.isfinite(v) else float(min(1.0, max(0.0, v)))
+
+ # --- 1) 드롭/최종율 확보 ---------------------------------------------------
+ d1_raw, d2_raw, d3_raw, full_raw = drops_from_anywhere(row, df_tm, seg, mod, loy)
+ d1, d2, d3 = map(_clip01, map(_p, (d1_raw, d2_raw, d3_raw)))
+ full_conv = _p(full_raw)
+
+ # --- 2) 단계별 성공률 ------------------------------------------------------
+ pref_sr = _p(row.get("pref_success_rate"))
+ rec_sr = _p(row.get("rec_success_rate"))
+ intent_sr = _p(row.get("intent_success_rate"))
+ buy_sr = _p(row.get("buy_success_rate"))
+
+ # --- 3) 누적율 계산(드롭우선, 결측 폴백) -----------------------------------
+ overall = 1.0
+ pref = pref_sr
+ rec = pref * (1 - d1) if np.isfinite(pref) and np.isfinite(d1) else rec_sr
+ intent = rec * (1 - d2) if np.isfinite(rec) and np.isfinite(d2) else intent_sr
+
+ if np.isfinite(intent) and np.isfinite(d3):
+ buy = intent * (1 - d3)
+ elif np.isfinite(buy_sr):
+ buy = buy_sr
+ elif np.isfinite(full_conv):
+ buy = full_conv
+ else:
+ buy = intent
+
+ # 단조감소 보장 + [0,1] 클리핑
+ seq = [overall, _clip01(pref), _clip01(rec), _clip01(intent), _clip01(buy)]
+ for i in range(1, len(seq)):
+ if np.isfinite(seq[i]) and np.isfinite(seq[i-1]) and seq[i] > seq[i-1]:
+ seq[i] = seq[i-1]
+ overall, pref, rec, intent, buy = seq
+
+ # --- 4) 라벨/값 구성(비어도 항상 그리기) -----------------------------------
+ labels, values = ["전체"], [overall]
+ if np.isfinite(pref): labels.append("선호"); values.append(pref)
+ if np.isfinite(rec): labels.append("추천"); values.append(rec)
+ if np.isfinite(intent): labels.append("구매의향"); values.append(intent)
+ if np.isfinite(buy): labels.append("구매"); values.append(buy)
+
+ if len(labels) <= 1:
+ # 드롭률 기반으로 최소 2단계라도 구성
+ v = [1.0]
+ if np.isfinite(d1): v.append(v[-1]*(1-d1))
+ if np.isfinite(d2): v.append(v[-1]*(1-d2))
+ if np.isfinite(d3): v.append(v[-1]*(1-d3))
+ if len(v) == 1:
+ est = _clip01(buy_sr if np.isfinite(buy_sr) else full_conv)
+ v.append(0.0 if not np.isfinite(est) else est)
+ names = ["전체","선호","추천","구매의향","구매"][:len(v)]
+ labels, values = names, v
+
+ txtpos = ["inside" if v >= 0.07 else "outside" for v in values]
+
+ color_map = {
+ "전체": hex_to_rgba(COL_STAGE_OVERALL, 0.85),
+ "선호": hex_to_rgba(COL_STAGE_PREF, 0.85),
+ "추천": hex_to_rgba(COL_STAGE_REC, 0.85),
+ "구매의향": hex_to_rgba(COL_STAGE_INTENT, 0.85),
+ "구매": hex_to_rgba(COL_STAGE_BUY, 0.85),
+ }
+ colors = [color_map.get(l, hex_to_rgba(COL_GRAY, 0.85)) for l in labels]
+
+ fig = go.Figure(go.Funnel(
+ y=labels,
+ x=values,
+ name="누적율",
+ customdata=values,
+ textinfo="none",
+ texttemplate="%{customdata:.1%}",
+ textposition=txtpos,
+ hovertemplate="%{label}: %{customdata:.1%}",
+ marker=dict(color=colors, line=dict(width=0.6, color="rgba(0,0,0,0.25)")),
+ connector=dict(line=dict(color="rgba(0,0,0,0.25)", width=0.6)),
+ ))
+
+ # — 높이 확장 & 여백 다이어트
+ fig.update_layout(
+ title="Funnel (누적율)",
+ height=FUNNEL_H if 'FUNNEL_H' in globals() else 420,
+ margin=dict(l=6, r=6, t=26, b=14),
+ paper_bgcolor="#ffffff",
+ plot_bgcolor="#ffffff",
+ )
+ fig.update_xaxes(dtick=_auto_dtick(1.0), tickformat=".0%")
+
+ return apply_dense_grid(fig, x_prob=True)
+
+def survival_curve_figure(row, df_tm, seg, mod, loy):
+ d1, d2, d3, _ = drops_from_anywhere(row, df_tm, seg, mod, loy)
+ vals = [1.0]
+ if np.isfinite(d1): vals.append(vals[-1]*(1-d1))
+ if np.isfinite(d2): vals.append(vals[-1]*(1-d2))
+ if np.isfinite(d3): vals.append(vals[-1]*(1-d3))
+ if len(vals) == 1: return _empty_fig("No Survival data")
+ stages = ["Start","선호","추천","��매의향","구매"][:len(vals)]
+ xs = list(range(len(vals)))
+ fig = go.Figure()
+ fig.add_trace(go.Scatter(
+ x=xs, y=vals, mode="lines+markers",
+ line=dict(width=3, color=COL_GRAY), marker=dict(color=COL_GREEN_LITE),
+ hovertemplate="단계=%{text}
생존=%{y:.1%}", text=stages, name="생존확률"
+ ))
+ drops = [d1,d2,d3]
+ for i, dv in enumerate(drops, start=1):
+ if i < len(vals) and np.isfinite(dv):
+ fig.add_annotation(x=i-0.5, y=(vals[i-1]+vals[i])/2,
+ text=f"실패 {dv:.1%}", showarrow=False,
+ font=dict(size=11, color=COL_ORANGE))
+ fig.update_layout(height=320, title="스테이지 생존 커브",
+ xaxis=dict(tickmode="array", tickvals=xs, ticktext=stages),
+ yaxis=dict(range=[0,1], tickformat=".1%"))
+ return apply_dense_grid(fig, y_prob=True)
+
+def waterfall_figure(row, df_tm, seg, mod, loy):
+ d1, d2, d3, full = drops_from_anywhere(row, df_tm, seg, mod, loy)
+
+ def _as_prob(p):
+ p = _safe_num(p)
+ if not np.isfinite(p): return np.nan
+ return p/100.0 if p > 1.5 else p
+
+ d1, d2, d3 = map(_as_prob, [d1, d2, d3])
+ buy_sr = _as_prob(row.get("buy_success_rate"))
+ intent = _as_prob(row.get("intent_success_rate"))
+ full_in = _as_prob(full)
+
+ # 최종 구매율 보정
+ full = full_in
+ if not np.isfinite(full):
+ if np.isfinite(buy_sr): full = buy_sr
+ elif np.isfinite(intent) and np.isfinite(d3): full = intent * (1.0 - d3)
+ elif all(np.isfinite([d1, d2, d3])): full = (1.0 - d1) * (1.0 - d2) * (1.0 - d3)
+
+ # 절대 드롭
+ if all(np.isfinite([d1, d2, d3])):
+ drop1 = 1.0 * d1
+ drop2 = (1.0 - d1) * d2
+ drop3 = (1.0 - d1) * (1.0 - d2) * d3
+ else:
+ drop1 = d1 if np.isfinite(d1) else 0.0
+ drop2 = d2 if np.isfinite(d2) else 0.0
+ drop3 = d3 if np.isfinite(d3) else 0.0
+
+ # 최종율 미지정이면 드롭 합으로 보정
+ final_rate = float(full) if np.isfinite(full) else max(0.0, 1.0 - drop1 - drop2 - drop3)
+ if not any(np.isfinite(v) for v in [drop1, drop2, drop3]) and not np.isfinite(final_rate):
+ return _empty_fig("No Waterfall data")
+
+ def _fmt_drop(v):
+ return "" if not np.isfinite(v) else (f"-{v:.1%}" if v >= 1e-6 else "-0.0%")
+
+ # ★ 여기부터: '전체 100%' 막대 제거 버전
+ measures = ["relative", "relative", "relative", "total"]
+ x = ["선호→추천
Drop", "추천→구매의향
Drop", "구매의향→구매
Drop", "구매율"]
+ y = [-drop1, -drop2, -drop3, final_rate]
+ texts = [_fmt_drop(drop1), _fmt_drop(drop2), _fmt_drop(drop3), f"{final_rate:.1%}"]
+ positions = ["inside", "inside", "inside", "outside"]
+
+ fig = go.Figure(go.Waterfall(
+ measure=measures, x=x, y=y,
+ name="drop-off",
+ text=texts, textposition=positions,
+ insidetextfont=dict(color="white"),
+ outsidetextfont=dict(color="#111"),
+ decreasing={"marker":{"color": COL_GRAY_MED}},
+ increasing={"marker":{"color": COL_GRAY_MED}},
+ totals={"marker":{"color": COL_BLUE_DEEP}},
+ connector={"line":{"color":"rgba(0,0,0,0.25)", "width":0.6}},
+ cliponaxis=False, constraintext="both"
+ ))
+
+ fig.update_layout(
+ height=320,
+ title="드롭오프 워터폴",
+ yaxis_tickformat=".1%",
+ xaxis=dict(tickangle=0, automargin=True),
+ margin=dict(l=8, r=8, t=30, b=14), # 좌우 여백 살짝 더 줄임
+ uniformtext_minsize=9, uniformtext_mode="hide",
+ )
+
+ # 공통 스타일 먼저
+ fig = apply_dense_grid(fig, y_prob=True)
+
+ # ── 워터폴 가독성 튜닝(Apply 후 다시 덮어쓰기)
+ fig.update_layout(
+ showlegend=False, # 범례 숨겨 상단 공간 확보
+ bargap=0.15, # 바 사이 간격 축소 → 막대가 두툼하게
+ margin=dict(l=8, r=8, t=30, b=14),
+ )
+ fig.update_xaxes(automargin=True)
+
+ return fig
+
+
+def stacked_funnel_figure(row):
+ stages = [("선호", "pref_success_rate"), ("추천", "rec_success_rate"),
+ ("구매의향", "intent_success_rate"), ("구매", "buy_success_rate")]
+ succ = []; fail = []; labs=[]
+ for lab, col in stages:
+ p = _safe_num(row.get(col))
+ if np.isfinite(p):
+ succ.append(p); fail.append(1-p); labs.append(lab)
+ if not succ: return _empty_fig("No Funnel data")
+ fig = go.Figure()
+ fig.add_bar(x=labs, y=succ, name="성공", text=[f"{v:.1%}" for v in succ], textposition="inside",
+ marker_color=COL_GREEN_LITE)
+ fig.add_bar(x=labs, y=fail, name="실패", text=[f"{v:.1%}" for v in fail], textposition="inside",
+ marker_color=COL_RED)
+ fig.update_layout(barmode="stack", yaxis=dict(range=[0,1], tickformat=".1%"),
+ height=320, title="100% 스택 퍼널 (성공/실패)")
+ return apply_dense_grid(fig, y_prob=True)
+
+def forest_figure(df_scope: pd.DataFrame):
+ if df_scope is None or df_scope.empty:
+ return _empty_fig("No Forest data")
+
+ if not {"model", "segment"}.issubset(set(df_scope.columns)):
+ return _empty_fig("Need 'model' and 'segment'")
+
+ s = df_scope.copy()
+
+ # ----- 1) 사용할 단계(성공률) 선택: buy → intent → rec → pref → success_rate → rate
+ stage_order = [
+ ("buy", "buy_success_rate"),
+ ("intent", "intent_success_rate"),
+ ("rec", "rec_success_rate"),
+ ("pref", "pref_success_rate"),
+ ("", "success_rate"),
+ ("", "rate"),
+ ]
+ stage = ""
+ rate_col = None
+ for st, col in stage_order:
+ if col in s.columns:
+ stage, rate_col = st, col
+ break
+ if rate_col is None:
+ return _empty_fig("No rate column")
+
+ # ----- 2) 표본(n) 컬럼 찾기(단계별 우선, 없으면 일반 표본명으로 폴백)
+ def _find_n_col(stage_name: str) -> str | None:
+ cands = []
+ if stage_name:
+ cands += [f"{stage_name}_sample_size", f"{stage_name}_n", f"{stage_name}_total"]
+ cands += ["sample_size", "n", "N", "total", "count", "nobs", "베이스수", "표본수", "pref_sample_size"]
+ for c in cands:
+ if c in s.columns:
+ return c
+ return None
+
+ n_col = _find_n_col(stage)
+ if n_col is None:
+ return _empty_fig("No sample size column")
+
+ # ----- 3) 숫자화 + 비율 정규화
+ s[rate_col] = pd.to_numeric(s[rate_col], errors="coerce")
+ s[n_col] = pd.to_numeric(s[n_col], errors="coerce")
+ s = s.dropna(subset=[rate_col, n_col])
+ if s.empty:
+ return _empty_fig("No Forest values")
+
+ r = np.where(s[rate_col] > 1.5, s[rate_col] / 100.0, s[rate_col]) # % → 비율
+ r = np.clip(r, 0.0, 1.0)
+ n = np.clip(s[n_col].to_numpy().astype(float), 0.0, np.inf)
+ k = np.clip(np.round(r * n), 0.0, n) # 성공 수 추정
+
+ # ----- 4) 모델 단위로 집계(중복 y축 제거)
+ agg = (pd.DataFrame({
+ "model": s["model"].astype(str),
+ "segment": s["segment"].astype(str),
+ "k": k, "n": n
+ })
+ .groupby("model", as_index=False)
+ .agg(k=("k","sum"), n=("n","sum"), seg=("segment", lambda x: x.iloc[0])))
+
+ if agg.empty or not np.isfinite(agg["n"]).any():
+ return _empty_fig("No Forest values")
+
+ # ----- 5) Jeffreys 95% CI
+ alpha = 0.05
+ try:
+ from scipy.stats import beta as _beta
+ agg["p"] = (agg["k"] + 0.5) / (agg["n"] + 1.0)
+ agg["lo"] = _beta.ppf(alpha/2, agg["k"] + 0.5, agg["n"] - agg["k"] + 0.5)
+ agg["hi"] = _beta.ppf(1 - alpha/2, agg["k"] + 0.5, agg["n"] - agg["k"] + 0.5)
+ except Exception:
+ try:
+ from statsmodels.stats.proportion import proportion_confint
+ agg["p"] = (agg["k"] + 0.5) / (agg["n"] + 1.0)
+ lo, hi = proportion_confint(agg["k"], agg["n"], alpha=alpha, method="beta")
+ agg["lo"], agg["hi"] = lo, hi
+ except Exception:
+ # Wilson 폴백
+ z = 1.959963984540054
+ p = agg["k"] / agg["n"]
+ denom = 1 + z*z/agg["n"]
+ center = (p + z*z/(2*agg["n"])) / denom
+ half = z*np.sqrt((p*(1-p) + z*z/(4*agg["n"])) / agg["n"]) / denom
+ agg["p"] = p
+ agg["lo"] = np.maximum(0.0, center - half)
+ agg["hi"] = np.minimum(1.0, center + half)
+
+ use = agg.sort_values("p").reset_index(drop=True)
+
+ # ----- 6) 색(모델의 우세 세그먼트) 지정
+ dom_seg = _model_dominant_segment(df_scope)
+ mapped_seg = use["model"].map(dom_seg).fillna(use["seg"])
+ colors = mapped_seg.apply(_tier_color_for_segment).tolist()
+
+ err_plus = (use["hi"] - use["p"]).to_numpy()
+ err_minus = (use["p"] - use["lo"]).to_numpy()
+
+ # ----- 7) 플롯
+ fig = go.Figure()
+ fig.add_trace(go.Scatter(
+ x=use["p"].astype(float),
+ y=use["model"].astype(str),
+ mode="markers",
+ name="모델", # ← trace 이름 지정 (trace 0 제거)
+ hovertemplate="%{y}: %{x:.1%}",
+ marker=dict(size=10, color=colors, line=dict(color=COL_BLACK, width=1.6)),
+ ))
+ fig.update_traces(error_x=dict(
+ type="data", symmetric=False,
+ array=err_plus, arrayminus=err_minus,
+ color=COL_BLACK, thickness=1.2, width=3
+ ))
+ add_vline_safe(fig, 0.5, line_dash="dot", line_color=COL_BLACK, opacity=0.4)
+ fig.update_layout(
+ height=320,
+ title="포레스트 플롯 (모델 비교) — 95% CI",
+ xaxis=dict(range=[0, 1], dtick=0.1, tickformat=".0%", title="성공률"),
+ margin=dict(l=10, r=10, t=54, b=18),
+ showlegend=False,
+)
+ fig = apply_dense_grid(fig, x_prob=True)
+ fig.update_layout(margin=dict(l=10, r=10, t=78, b=24)) # 상단 여백 키움
+ fig.update_yaxes(domain=[0.12, 1.00]) # 위쪽 12% 비워서 아래로 내림
+ return fig
+
+def compare_distribution_figure(df_master, seg, mod, loy, stage_label):
+ if df_master is None or df_master.empty:
+ return _empty_fig("No Ranking data")
+
+ seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy)
+
+ stage2lift = {
+ "선호": "pref_lift_vs_galaxy",
+ "추천": "rec_lift_vs_galaxy",
+ "구매의향": "intent_lift_vs_galaxy",
+ "구매": "buy_lift_vs_galaxy",
+ }
+ lift_col = stage2lift.get(stage_label, "buy_lift_vs_galaxy")
+ if lift_col not in df_master.columns:
+ return _empty_fig("No lift column")
+
+ # 1) 비교 축 고르기
+ candidates = []
+ if mod == "ALL": candidates.append("model")
+ if seg == "ALL": candidates.append("segment")
+ if loy == "ALL": candidates.append("loyalty")
+
+ key = None
+ for k in candidates:
+ if k in df_master.columns and df_master[k].astype(str).nunique(dropna=True) > 1:
+ key = k
+ break
+ if key is None:
+ # fallback: 유니크 가장 많은 축
+ avail = [c for c in ["model","segment","loyalty"] if c in df_master.columns]
+ if not avail:
+ return _empty_fig("No grouping key")
+ key = max(avail, key=lambda c: df_master[c].astype(str).nunique(dropna=True))
+
+ # 2) 전체/선택 집계
+ overall = (df_master.groupby(key, as_index=False)
+ .agg({lift_col: "mean"})
+ .rename(columns={lift_col: "전체"}))
+
+ scope = df_master.copy()
+ if seg != "ALL": scope = scope[scope["segment"].astype(str) == seg]
+ if mod != "ALL": scope = scope[scope["model"].astype(str) == mod]
+ if loy != "ALL": scope = scope[scope["loyalty"].astype(str) == loy]
+
+ if scope.empty:
+ return _empty_fig("No values")
+
+ selected = (scope.groupby(key, as_index=False)
+ .agg({lift_col: "mean"})
+ .rename(columns={lift_col: "선택"}))
+
+ merged = pd.merge(overall, selected, on=key, how="outer")
+ if merged.empty:
+ return _empty_fig("No values")
+
+ # 3) 정리: 키는 문자열로, 결측 수치만 0.0으로
+ merged[key] = merged[key].astype(str)
+ for col in ["전체", "선택"]:
+ if col in merged.columns:
+ merged[col] = pd.to_numeric(merged[col], errors="coerce")
+ merged[["전체","선택"]] = merged[["전체","선택"]].fillna(0.0)
+
+ # 정렬 순서(선택 오름차순이 기본, 전부 0이면 전체 기준)
+ if (merged["선택"] != 0).any():
+ order = merged.sort_values("선택", ascending=True)[key].tolist()
+ else:
+ order = merged.sort_values("전체", ascending=True)[key].tolist()
+
+ base = merged.set_index(key).loc[order]
+
+ # 4) 색상
+ vals_sel = base["선택"].to_numpy()
+ if key == "model":
+ dom_seg = _model_dominant_segment(df_master)
+ bar_colors = [_tier_color_for_segment(dom_seg.get(k, "LowEnd")) for k in order]
+ else:
+ bar_colors = royg_color_for(vals_sel)
+
+ # 5) 그림
+ fig = go.Figure()
+ fig.add_trace(go.Bar(
+ x=base["전체"], y=order, orientation="h", name="전체",
+ marker_color="rgba(150,150,150,0.35)"
+ ))
+ fig.add_trace(go.Bar(
+ x=vals_sel, y=order, orientation="h", name="선택",
+ marker=dict(color=bar_colors, line=dict(color=COL_GRAY, width=0.5)),
+ text=[f"{v:+.1f}" for v in vals_sel], textposition="outside"
+ ))
+ add_vline_safe(fig, 0, line_dash="dot", line_color=COL_GRAY)
+
+ fig.update_layout(
+ barmode="group",
+ title=f"{stage_label} Lift ({key})",
+ xaxis_title="Lift vs Galaxy",
+ height=320,
+ margin=dict(l=10, r=10, t=54, b=18),
+ paper_bgcolor="#ffffff", plot_bgcolor="#ffffff"
+)
+
+ # 공통 스타일 먼저
+ fig = apply_dense_grid(fig)
+
+ # 3) 위로 들러붙는 것 방지용으로 '위 여백+도메인' 덮어쓰기
+ fig.update_layout(margin=dict(l=10, r=10, t=68, b=28))
+ fig.update_yaxes(domain=[0.0, 0.86]) # 위쪽 14% 비워서 아래로 내림
+
+
+ # 4) 리턴
+ return fig
+
+def bubble_figure(
+ df_scope: pd.DataFrame,
+ lift_col: str,
+ snr_col: str,
+ label_top_n: int = 4,
+ label_inside: bool = False,
+ textfont_size: int = 11
+) -> go.Figure:
+ # --- 가드 ---
+ if df_scope is None or df_scope.empty:
+ return _empty_fig("No Bubble data")
+ if lift_col not in df_scope.columns or snr_col not in df_scope.columns:
+ return _empty_fig("No Bubble data")
+
+ s = df_scope.copy()
+ s[lift_col] = pd.to_numeric(s[lift_col], errors="coerce")
+ s[snr_col] = pd.to_numeric(s[snr_col], errors="coerce")
+ s["pref_sample_size"] = pd.to_numeric(
+ s.get("pref_sample_size", pd.Series(1, index=s.index)),
+ errors="coerce"
+ ).fillna(1.0)
+
+ key = "model" if ("model" in s.columns and s["model"].notna().any()) else (
+ "segment" if "segment" in s.columns else None)
+ if key is None:
+ return _empty_fig("No Bubble key")
+
+ need_cols = [key, lift_col, snr_col, "pref_sample_size"]
+ if "segment" in s.columns and "segment" not in need_cols:
+ need_cols.append("segment")
+
+ use = s[need_cols].dropna(subset=[lift_col, snr_col])
+ if use.empty:
+ return _empty_fig("No Bubble values")
+
+ # ---- 집계 ----
+ if "segment" in use.columns:
+ grp = (use.groupby(key, as_index=False)
+ .agg(x=(lift_col, "mean"),
+ y=(snr_col, "mean"),
+ n=("pref_sample_size", "sum"),
+ seg=("segment", "first")))
+ else:
+ grp = (use.groupby(key, as_index=False)
+ .agg(x=(lift_col, "mean"),
+ y=(snr_col, "mean"),
+ n=("pref_sample_size", "sum")))
+ grp["seg"] = np.nan
+
+ # ---- 색상 ----
+ dom_seg = _model_dominant_segment(df_scope)
+ def _color_for(row):
+ if key == "model":
+ base_seg = dom_seg.get(str(row[key]), row["seg"])
+ else:
+ base_seg = row["seg"] if pd.notna(row["seg"]) else row[key]
+ return _tier_color_for_segment(base_seg)
+ grp["color"] = grp.apply(_color_for, axis=1)
+
+ # ---- 버블 크기(√스케일) ----
+ n = grp["n"].astype(float).to_numpy()
+ if np.isfinite(n).any():
+ r = np.sqrt(np.maximum(n, 0))
+ r0, r1 = float(np.nanmin(r)), float(np.nanmax(r))
+ size = 24.0 if abs(r1 - r0) < 1e-9 else 12 + (r - r0)/(r1 - r0) * 48
+ else:
+ size = np.full(len(grp), 24.0)
+
+ # ---- 라벨 ----
+ labels_all = grp[key].astype(str).tolist()
+ if label_top_n is None or label_top_n == -1:
+ text = labels_all
+ elif label_top_n <= 0:
+ text = [""] * len(labels_all)
+ else:
+ top_idx = np.argsort(-grp["n"].to_numpy())[:label_top_n]
+ show = set(top_idx.tolist())
+ text = [labels_all[i] if i in show else "" for i in range(len(labels_all))]
+ hovertext = grp[key].astype(str)
+
+ # ===== 승/패 분할 경계 & 음영 =====
+ x_vals = grp["x"].astype(float).to_numpy()
+ y_vals = grp["y"].astype(float).to_numpy()
+ x_thr = 0.0 if (np.nanmin(x_vals) < 0 < np.nanmax(x_vals)) else float(np.nanmedian(x_vals))
+ y_thr = 2.0 if (np.nanmin(y_vals) <= 2.0 <= np.nanmax(y_vals)) else float(np.nanmedian(y_vals))
+
+ x_min, x_max = float(np.nanmin(x_vals)), float(np.nanmax(x_vals))
+ y_min, y_max = float(np.nanmin(y_vals)), float(np.nanmax(y_vals))
+ x_pad = (x_max - x_min) * 0.03 if np.isfinite(x_max - x_min) else 0.0
+ y_pad = (y_max - y_min) * 0.03 if np.isfinite(y_max - y_min) else 0.0
+ x0, x1 = x_min - x_pad, x_max + x_pad
+ y0, y1 = y_min - y_pad, y_max + y_pad
+
+ winner_fill = hex_to_rgba("#FDE68A", 0.16)
+ loser_fill = hex_to_rgba("#9CA3AF", 0.14)
+
+ fig = go.Figure()
+
+ add_vrect_safe(fig, x0, x_thr, y0=y_thr, y1=y1, fillcolor=loser_fill, layer="below")
+ add_vrect_safe(fig, x_thr, x1, y0=y_thr, y1=y1, fillcolor=winner_fill, layer="below")
+ add_vline_safe(fig, x_thr, line_dash="dot", line_color="#888", opacity=0.6)
+ add_hline_safe(fig, y_thr, line_dash="dot", line_color="#888", opacity=0.6)
+
+ fig.add_trace(go.Scatter(
+ x=grp["x"], y=grp["y"],
+ mode="markers+text",
+ text=text,
+ hovertext=hovertext,
+ textposition=("middle center" if label_inside else "top center"),
+ textfont=dict(size=textfont_size),
+ cliponaxis=False,
+ marker=dict(size=size, color=grp["color"], line=dict(color="#111", width=0.7)),
+ customdata=grp["n"].astype(float),
+ hovertemplate=(f"{key}=%{{hovertext}}
"
+ "Lift=%{x:.1f}
"
+ "SNR=%{y:.1f}
"
+ "표본=%{customdata:,}"),
+ name="모델/세그"
+ ))
+
+ # 기본 레이아웃
+ fig.update_layout(
+ title="Lift vs SNR (버블=표본수)",
+ xaxis_title=None,
+ yaxis_title="SNR",
+ height=320,
+ showlegend=False,
+ paper_bgcolor="#fff", plot_bgcolor="#fff",
+ margin=dict(l=10, r=10, t=26, b=48)
+ )
+ fig.update_xaxes(title_standoff=18, automargin=True)
+ fig.update_yaxes(title_standoff=8, automargin=True)
+
+ # 각주
+ foot_y = -0.20
+ fig.add_annotation(xref="paper", yref="paper", x=0.00, y=foot_y,
+ text="■", showarrow=False, font=dict(size=11, color="#FDE68A"))
+ fig.add_annotation(xref="paper", yref="paper", x=0.035, y=foot_y,
+ text="승자 영역 (Lift↑, SNR↑)", showarrow=False, font=dict(size=10, color="#555"), xanchor="left")
+ fig.add_annotation(xref="paper", yref="paper", x=0.32, y=foot_y,
+ text="■", showarrow=False, font=dict(size=11, color="#9CA3AF"))
+ fig.add_annotation(xref="paper", yref="paper", x=0.355, y=foot_y,
+ text="패자 영역 (Lift↓, SNR↑)", showarrow=False, font=dict(size=10, color="#555"), xanchor="left")
+ fig.add_annotation(xref="paper", yref="paper", x=0.67, y=foot_y,
+ text="○ 원 크기 = 표본수(√스케일)", showarrow=False, font=dict(size=10, color="#666"), xanchor="left")
+
+ # 공통 스타일 적용 후 '위로 들러붙음' 해소용 덮어쓰기
+ fig = apply_dense_grid(fig)
+ fig.update_layout(
+ height=320,
+ margin=dict(l=10, r=10, t=84, b=52), # ↑ 상단 여백 크게
+ title=dict(y=0.98, pad=dict(t=18, b=0)) # 타이틀도 살짝 내려줌
+ )
+ fig.update_yaxes(domain=[0.12, 1.00], automargin=True) # ↑ 플롯 영역 자체를 아래로
+ # x축 제목은 annotation으로
+ fig.add_annotation(
+ text="Lift vs Galaxy",
+ xref="x domain", yref="paper",
+ x=1, y=-0.10,
+ xanchor="right", yanchor="top",
+ showarrow=False,
+ font=dict(size=12)
+ )
+ return fig
+
+def ppc_purchase_overlay_figure(row: pd.Series, m: int | None = None, draws: int = 6000) -> go.Figure:
+ """관측 구매율과 Posterior(베타) & Posterior Predictive(베타-이항) 오버레이."""
+ # 관측치
+ n = _pick_sample_for_stage(row, "buy")
+ if n <= 0:
+ n = _safe_int0(row.get("pref_sample_size"))
+ p_obs = _safe_num(row.get("buy_success_rate"))
+ if not np.isfinite(p_obs):
+ return _empty_fig("No PPC data")
+ p_obs = float(np.clip(p_obs/100.0 if p_obs > 1.5 else p_obs, 0.0, 1.0))
+ k_obs = int(np.clip(round(p_obs * max(n, 1)), 0, max(n, 1)))
+ if m is None:
+ m = n
+
+ # Posterior (Jeffreys prior: Beta(0.5,0.5))
+ a, b = k_obs + 0.5, (n - k_obs) + 0.5
+ p = np.random.beta(a, b, size=draws)
+
+ # Posterior predictive (새 표본 m개 관측 시 비율)
+ m = max(int(m), 1)
+ k_pred = np.random.binomial(m, p)
+ rate_pred = k_pred / m
+
+ # 95% HDI
+ lo, hi = np.quantile(p, [0.025, 0.975])
+
+ fig = go.Figure()
+ fig.add_histogram(
+ x=p, nbinsx=60, histnorm="probability density",
+ name="Posterior p", marker_color=hex_to_rgba("#9CA3AF", 0.45), opacity=0.55
+ )
+ fig.add_histogram(
+ x=rate_pred, nbinsx=60, histnorm="probability density",
+ name=f"PPC n={m:,}", marker_color=hex_to_rgba(COL_STAGE_BUY, 0.55), opacity=0.55
+ )
+
+ # 관측치/구간 표시
+ add_vline_safe(fig, p_obs, line_color="#111", line_width=2, opacity=0.9)
+ fig.add_vrect(x0=lo, x1=hi, fillcolor=hex_to_rgba("#60A5FA", 0.18), line_width=0)
+
+ # ← 핵심: 범례를 아래로(도면 밖) 보내고 아주 작게
+ fig.update_layout(
+ barmode="overlay",
+ title="PPC(구매율) — Posterior & Posterior Predictive",
+ height=320,
+ margin=dict(l=10, r=10, t=30, b=64), # 바닥 여백 확보
+ showlegend=True,
+ legend=dict(
+ orientation="h",
+ y=-0.22, yanchor="top", # 플롯 아래쪽, 도면 밖
+ x=0.0, xanchor="left",
+ font=dict(size=9),
+ itemsizing="constant",
+ itemwidth=30
+ )
+ )
+ fig.update_xaxes(range=[0, 1], tickformat=".0%", title="구매율")
+ fig.update_yaxes(title="밀도")
+ return apply_dense_grid(fig, x_prob=True)
+
+percent1 = FormatTemplate.percentage(1)
+num1 = Format(precision=1, scheme=Scheme.fixed)
+
+CARD_STYLE = {
+ "background": "white",
+ "border": "none", # ← 보더 제거
+ "borderRadius": "14px",
+ "padding": "14px",
+ "boxShadow": "none", # ← 그림자도 제거(원하면 유지)
+}
+# (추가) KPI 전용 카드 — 하늘색 배경
+KPI_CARD_STYLE = {
+ **CARD_STYLE,
+ "background": "#EAF2FF",
+ "border": "1px solid #d6e4ff"
+}
+
+ROW2_CARD_H = 360
+ROW2_GRAPH_H = 320
+
+# ───────────── spacing knobs (한 곳에서 조절) ─────────────
+ROW_GAP = "16px" # 카드 사이 간격
+PAGE_PAD = "24px 28px 24px" # 행 안쪽 패딩
+CARD_H = "430px" # 카드(박스) 높이
+GRAPH_H = "390px" # 카드 안 그래프 높이 (CARD_H보다 40px 작게)
+KPI_GAP = "12px" # KPI 카드 간격
+
+ROW1_COLS = "1fr 1fr 1fr" # 상단 3카드 동일 너비
+ROW2_COLS = "1fr 1fr 1fr" # 하단 3카드 동일 너비
+
+# ───────────────── app.layout 교체 ─────────────────
+# ───────────── spacing & sizing knobs ─────────────
+TOP_CARD_H = "430px" # 맨 위 3개 카드 박스 높이
+TOP_GRAPH_H = "390px" # 박스 안 그래프 높이 (탭/제목 여백 고려해 TOP_CARD_H - 40)
+ROW_CARD_H = "420px" # 아래 행 카드 높이
+ROW_GRAPH_H = "380px"
+
+PAGE_PAD = "24px 28px 24px" # 각 행 내부 패딩
+ROW_GAP = "16px" # 카드 사이 간격
+KPI_GAP = "12px"
+
+# 카드 공통 스타일: flex column으로 그래프가 꽉 차도록
+CARD_STYLE = {
+ "background": "#fff",
+ "borderRadius": "12px",
+ "padding": "12px",
+ "boxShadow": "0 1px 3px rgba(0,0,0,0.06)",
+ "display": "flex",
+ "flexDirection": "column",
+}
+
+# 그래프 내부 여백/레전드/텍스트를 통일해 보이는 영역을 맞춤
+def standardize_top_fig(fig):
+ fig.update_layout(
+ margin=dict(l=28, r=16, t=36, b=28),
+ title_x=0.02,
+ title_pad=dict(t=4, b=4),
+ uniformtext=dict(minsize=10, mode="hide"),
+ legend=dict(orientation="h", x=0, y=-0.2), # 하단 가로배치 → 높이 편차 제거
+ )
+ # 축이 있는 차트는 automargin
+ for ax in ("xaxis", "yaxis"):
+ if ax in fig.layout:
+ fig.layout[ax].update(automargin=True, title_standoff=6)
+ return fig
+
+# ───────────────── app.layout 교체 ─────────────────
+# ───────────── spacing & sizing knobs ─────────────
+TOP_CARD_H = "430px" # 맨 위 3개 카드 박스 높이
+TOP_GRAPH_H = "390px" # 박스 안 그래프 높이 (탭/제목 여백 고려해 TOP_CARD_H - 40)
+ROW_CARD_H = "420px" # 아래 행 카드 높이
+ROW_GRAPH_H = "380px"
+
+PAGE_PAD = "24px 28px 24px" # 각 행 내부 패딩
+ROW_GAP = "16px" # 카드 사이 간격
+KPI_GAP = "12px"
+
+# 카드 공통 스타일: flex column으로 그래프가 꽉 차도록
+CARD_STYLE = {
+ "background": "#fff",
+ "borderRadius": "12px",
+ "padding": "12px",
+ "boxShadow": "0 1px 3px rgba(0,0,0,0.06)",
+ "display": "flex",
+ "flexDirection": "column",
+}
+
+# 그래프 내부 여백/레전드/텍스트를 통일해 보이는 영역을 맞춤
+def standardize_top_fig(fig):
+ fig.update_layout(
+ margin=dict(l=28, r=16, t=36, b=28),
+ title_x=0.02,
+ title_pad=dict(t=4, b=4),
+ uniformtext=dict(minsize=10, mode="hide"),
+ legend=dict(orientation="h", x=0, y=-0.2), # 하단 가로배치 → 높이 편차 제거
+ )
+ # 축이 있는 차트는 automargin
+ for ax in ("xaxis", "yaxis"):
+ if ax in fig.layout:
+ fig.layout[ax].update(automargin=True, title_standoff=6)
+ return fig
+
+# ───────────────── app.layout 교체 ─────────────────
+ROW_GAP = "16px" # 카드 사이 간격
+PAGE_PAD = "24px 28px 24px" # 행 안쪽 패딩
+CARD_H = "430px" # 카드(박스) 높이
+GRAPH_H = "390px" # 카드 안 그래프 높이 (CARD_H보다 40px 작게)
+KPI_GAP = "12px" # KPI 카드 간격
+
+ROW1_COLS = "1fr 1fr 1fr" # 상단 3카드 동일 너비
+ROW2_COLS = "1fr 1fr 1fr" # 하단 3카드 동일 너비
+
+app.layout = html.Div(
+ [
+ dcc.Store(id="store-master"),
+ dcc.Store(id="store-tm"),
+ dcc.Store(id="store-sankey"),
+ dcc.Store(id="store-overall"),
+ dcc.Store(id="store-mod-opts"),
+
+ # Sankey 드래그 토글 + 인터랙션 로그
html.Div(
[
+ dcc.Checklist(
+ id="sankey-drag",
+ options=[{"label": " Sankey 드래그 허용", "value": "drag"}],
+ value=[],
+ inputStyle={"marginRight": "6px"},
+ style={"fontSize": "12px", "color": "#555"},
+ ),
+ html.Div(id="interact-msg", style={"marginTop": "6px","fontSize": "12px","color": "#444"}),
+ ],
+ style={"display":"flex","justifyContent":"space-between","alignItems":"center","padding":"0 16px 8px"},
+ ),
+
+ # 상단 바
+ html.Div(
+ [
+ html.Div("Bayesian Journey Dashboard", style={"fontWeight":"700","fontSize":"18px"}),
html.Div(
[
- html.Div(
- [
- html.Span(
- "Stage",
- style={"fontSize": "12px", "color": "#666", "marginRight": "8px"},
- ),
- dcc.Dropdown(
- id="dd-stage-rank",
- options=[{"label": v, "value": v} for v in ["선호", "추천", "구매의향", "구매"]],
- value="구매",
- clearable=False,
- style={"width": "140px", "fontSize": "12px"},
- ),
- ],
- style={
- "display": "flex",
- "justifyContent": "flex-end",
- "alignItems": "center",
- "marginBottom": "6px",
- },
- ),
- dcc.Graph(
- id="fig-stage-rank",
- config=GRAPH_CONFIG | {"responsive": True},
- style={"height": GRAPH_H, "width": "100%"},
- ),
+ dcc.Input(id="excel-path", value=DEFAULT_PATH, placeholder="Excel 경로",
+ style={"width":"520px","marginRight":"8px"}),
+ html.Button("Load", id="load-btn", n_clicks=0, className="btn", style={"marginRight":"8px"}),
],
- style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"},
+ style={"display":"flex","alignItems":"center"},
),
+ ],
+ style={"display":"flex","justifyContent":"space-between","alignItems":"center",
+ "padding":"12px 16px","borderBottom":"1px solid #eee","position":"sticky",
+ "top":"0","background":"#fafafa","zIndex":10},
+ ),
+
+ html.Div(id="status-msg", style={"padding":"8px 16px","color":"#555","fontSize":"12px"}),
+
+ # 필터
+ html.Div(
+ [
+ html.Div([html.Label("Segment", style={"fontWeight":"600"}),
+ dcc.Dropdown(id="dd-seg", options=[], value="ALL", clearable=True)],
+ style={"flex":"1","minWidth":"220px","marginRight":"8px"}),
+ html.Div([html.Label("Model", style={"fontWeight":"600"}),
+ dcc.Dropdown(id="dd-mod", options=[], value="ALL", clearable=True)],
+ style={"flex":"1","minWidth":"220px","marginRight":"8px"}),
+
+ html.Div([html.Label("Loyalty", style={"fontWeight":"600"}),
+ dcc.Dropdown(id="dd-loy", options=[], value="ALL", clearable=True)],
+ style={"flex":"1","minWidth":"220px"}),
+ ],
+ style={"display":"flex","gap":"8px","padding":"12px 16px"},
+ ),
+
+ # KPI
+ html.Div(
+ [
+ html.Div([html.Div("표본 수", style={"color":"#888","fontSize":"12px"}),
+ html.H3(id="kpi-sample", style={"margin":"4px 0 0"})], style=KPI_CARD_STYLE),
+ html.Div([html.Div("최종 구매율 (Δ 포함)", style={"color":"#888","fontSize":"12px"}),
+ html.H3(id="ins-final", style={"margin":"4px 0 0"})], style=KPI_CARD_STYLE),
+ html.Div([html.Div("최대 드롭", style={"color":"#888","fontSize":"12px"}),
+ html.H3(id="ins-drop", style={"margin":"4px 0 0","fontSize":"18px"})], style=KPI_CARD_STYLE),
+ html.Div([html.Div("불확실성 (95% HDI 폭)", style={"color":"#888","fontSize":"12px"}),
+ html.H3(id="ins-uncert", style={"margin":"4px 0 0"})], style=KPI_CARD_STYLE),
+ ],
+ style={
+ "display":"grid",
+ "gridTemplateColumns":"repeat(4, minmax(0,1fr))",
+ "gap": KPI_GAP,
+ "padding":"0 16px 12px"
+ },
+ ),
+
+ # 숨김 KPI(호환)
+ html.Div([html.H3(id="kpi-buy-success"), html.H3(id="kpi-buy-fail")], style={"display":"none"}),
+
+ # Row 1: Sankey + 전이 퍼널(누적율) + (워터폴/PPC 탭)
+ html.Div(
+ [
html.Div(
dcc.Graph(
- id="fig-forest",
+ id="fig-sankey",
config=GRAPH_CONFIG | {"responsive": True},
- style={"height": GRAPH_H, "width": "100%"},
+ style={"height": GRAPH_H, "width": "100%"}
),
- style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"},
+ style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"} # ← 고정/클립
),
html.Div(
dcc.Graph(
- id="fig-bubble",
+ id="fig-matrix",
config=GRAPH_CONFIG | {"responsive": True},
- style={"height": GRAPH_H, "width": "100%"},
+ style={"height": GRAPH_H, "width": "100%"}
),
+ style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"}
+ ),
+
+ html.Div(
+ [
+ dcc.Tabs(
+ id="tab-right", value="waterfall",
+ children=[
+ dcc.Tab(label="워터폴", value="waterfall"),
+ dcc.Tab(label="PPC(구매율)", value="ppc"),
+ ],
+ style={"marginBottom":"6px"},
+ ),
+ dcc.Graph(
+ id="fig-right",
+ config=GRAPH_CONFIG | {"responsive": True},
+ style={"height": GRAPH_H, "width": "100%"}
+ ),
+ ],
style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"},
),
],
style={
- "display": "grid",
- "gridTemplateColumns": ROW2_COLS,
+ "display":"grid",
+ "gridTemplateColumns": ROW1_COLS,
"gap": ROW_GAP,
"padding": PAGE_PAD,
- "marginTop": "4px",
+ "marginBottom":"22px",
},
),
+
+# Row 2: 스테이지 리프트 + 포레스트 + 버블
+html.Div(
+ [
+ html.Div(
+ [
+ html.Div(
+ [
+ html.Span(
+ "Stage",
+ style={"fontSize": "12px", "color": "#666", "marginRight": "8px"},
+ ),
+ dcc.Dropdown(
+ id="dd-stage-rank",
+ options=[{"label": v, "value": v} for v in ["선호", "추천", "구매의향", "구매"]],
+ value="구매",
+ clearable=False,
+ style={"width": "140px", "fontSize": "12px"},
+ ),
+ ],
+ style={
+ "display": "flex",
+ "justifyContent": "flex-end",
+ "alignItems": "center",
+ "marginBottom": "6px",
+ },
+ ),
+ dcc.Graph(
+ id="fig-stage-rank",
+ config={**GRAPH_CONFIG, "responsive": True},
+ style={"height": GRAPH_H, "width": "100%"},
+ ),
+ ],
+ style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"},
+ ),
+
+ html.Div(
+ dcc.Graph(
+ id="fig-forest",
+ config={**GRAPH_CONFIG, "responsive": True},
+ style={"height": GRAPH_H, "width": "100%"},
+ ),
+ style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"},
+ ),
+
+ html.Div(
+ dcc.Graph(
+ id="fig-bubble",
+ config={**GRAPH_CONFIG, "responsive": True},
+ style={"height": GRAPH_H, "width": "100%"},
+ ),
+ style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"},
+ ),
+ ],
+ style={
+ "display": "grid",
+ "gridTemplateColumns": ROW2_COLS,
+ "gap": ROW_GAP,
+ "padding": PAGE_PAD,
+ "marginTop": "4px",
+ },
+),
+
+ # 숨김 그래프
+ html.Div(
+ [
+ dcc.Graph(id="fig-survival", config=GRAPH_CONFIG, style={"height": GRAPH_H}),
+ dcc.Graph(id="fig-funnel", config=GRAPH_CONFIG, style={"height": GRAPH_H}),
+ ],
+ style={"display":"none"},
+ ),
+
+ # 상세 테이블
+ html.Div(
+ [
+ html.H4("상세 메트릭", style={"margin":"0 0 8px 0"}),
+ dash_table.DataTable(
+ id="metrics-table",
+ columns=[
+ {"name": "단계", "id": "단계"},
+ {"name": "베이스수", "id": "베이스수", "type": "numeric",
+ "format": Format(precision=0, scheme=Scheme.fixed)},
+ {"name": "성공확률", "id": "성공확률", "type": "numeric", "format": percent1},
+ {"name": "실패확률", "id": "실패확률", "type": "numeric", "format": percent1},
+ {"name": "하한", "id": "하한", "type": "numeric", "format": percent1},
+ {"name": "상한", "id": "상한", "type": "numeric", "format": percent1},
+ {"name": "판정", "id": "판정"},
+ {"name": "평가등급", "id": "평가등급"},
+ {"name": "SNR", "id": "SNR", "type": "numeric", "format": num1},
+ {"name": "Lift", "id": "Lift", "type": "numeric", "format": num1},
+ {"name": "raw평균", "id": "raw평균", "type": "numeric", "format": percent1},
+ {"name": "raw표준편차", "id": "raw표준편차", "type": "numeric", "format": percent1},
+ ],
+ data=[],
+ page_size=10,
+ style_table={"overflowX":"auto"},
+ style_cell={"fontFamily":"Noto Sans KR, Arial, sans-serif","fontSize":"12px","padding":"6px"},
+ style_header={"fontWeight":"bold"},
+ style_data_conditional=[
+ {"if": {"column_id": "베이스수"}, "textAlign": "right"},
+ {"if": {"column_id": "성공확률"}, "textAlign": "right"},
+ {"if": {"column_id": "실패확률"}, "textAlign": "right"},
+ {"if": {"column_id": "하한"}, "textAlign": "right"},
+ {"if": {"column_id": "상한"}, "textAlign": "right"},
+ {"if": {"column_id": "SNR"}, "textAlign": "right"},
+ {"if": {"column_id": "Lift"}, "textAlign": "right"},
+ {"if": {"column_id": "raw평균"}, "textAlign": "right"},
+ {"if": {"column_id": "raw표준편차"}, "textAlign": "right"},
+ {"if": {"row_index": "odd"}, "backgroundColor": "#fafafa"},
+ ],
+ ),
+ ],
+ style={**CARD_STYLE, "margin":"18px 16px 24px"},
+ ),
+ ],
+ style={"background":"#f6f7fb","minHeight":"100vh"},
+)
+# ───────────────── app.layout 교체 끝 ─────────────────
+
+# ===================== 콜백: Load =====================
+@app.callback(
+ Output("store-master","data"),
+ Output("store-tm","data"),
+ Output("store-sankey","data"),
+ Output("store-overall","data"),
+ Output("dd-seg","options"),
+ Output("dd-seg","value"),
+ Output("store-mod-opts","data"),
+ Output("dd-loy","options"),
+ Output("dd-loy","value"),
+ Output("status-msg","children"),
+ Input("load-btn","n_clicks"),
+ State("excel-path","value"),
+ prevent_initial_call=True
+)
+def on_load(n, path):
+ try:
+ exists = os.path.exists(path)
+ size = (os.path.getsize(path) if exists else 0)
+
+ # 1) 엑셀 로드
+ df_master, df_tm, df_sankey, overall, seg_opts, mod_opts_all, loy_opts, dbg = load_excel(path)
+
+ # 2) 마스터로부터 모든 조합 Sankey 캐시 합성
+ df_sankey_syn = build_sankey_cache_from_master(df_master, collapse_to_buy=True)
+
+ # 3) 상태 메시지(캐시 행수 포함)
+ status = (f"✅ 로드 완료 | path={path} (exists={exists}, size={size:,} bytes) | "
+ f"engine={dbg.get('engine')} | sheets={dbg.get('sheets')} | matched={dbg.get('matched')} | "
+ f"sankey_cache={len(df_sankey_syn):,} rows")
+
+ # 4) 리턴: 세 번째(store-sankey)에 캐시를 넣는다
+ return (
+ df_master.to_json(date_format="iso", orient="split"),
+ df_tm.to_json(date_format="iso", orient="split"),
+ df_sankey_syn.to_json(date_format="iso", orient="split"), # ⬅ 여기!
+ json.dumps(overall),
+ [{"label":v, "value":v} for v in seg_opts], "ALL",
+ json.dumps(mod_opts_all),
+ [{"label":v, "value":v} for v in loy_opts], "ALL",
+ status
+ )
+ except Exception as e:
+ err = f"❌ LOAD ERROR: {type(e).__name__}: {e}"
+ print("LOAD ERROR TRACE:\n", traceback.format_exc())
+ return None, None, None, None, [], None, None, [], None, err
+
+
+# 세그먼트 변경 시 모델 옵션 업데이트
+@app.callback(
+ Output("dd-mod","options"),
+ Output("dd-mod","value"),
+ Input("dd-seg","value"),
+ State("store-master","data"),
+ State("store-mod-opts","data"),
+)
+def on_seg_change(seg, js_master, js_allmods):
+ if not js_master or not js_allmods:
+ return [], None
+ df_master = pd.read_json(js_master, orient="split")
+ seg_val = _as_all(seg)
+ if seg_val!="ALL":
+ mods = ["ALL"] + sorted([str(v) for v in df_master[df_master["segment"].astype(str)==seg_val]["model"].dropna().astype(str).unique().tolist() if str(v)!="ALL"])
+ else:
+ mods = json.loads(js_allmods)
+ return [{"label":v,"value":v} for v in mods], "ALL"
+
+@app.callback(
+ Output("interact-msg","children"),
+ Input("fig-sankey","clickData"),
+ Input("fig-matrix","relayoutData"),
+ Input("fig-right","relayoutData"),
+ Input("fig-stage-rank","selectedData"),
+ Input("fig-forest","selectedData"),
+ Input("fig-bubble","selectedData"),
+ prevent_initial_call=True
+)
+def on_interact(sankey_click, matrix_relayout, wf_relayout, rank_sel, forest_sel, bubble_sel):
+ ctx = dash.callback_context
+ if not ctx.triggered:
+ return dash.no_update
+
+ tid = ctx.triggered[0]["prop_id"] # e.g. "fig-bubble.selectedData"
+ comp, prop = tid.split(".")
+ payload = ctx.triggered[0]["value"]
+
+ if prop == "clickData" and payload:
+ pt = (payload.get("points") or [{}])[0]
+ label = pt.get("label") or f"{pt.get('sourceLabel','?')}→{pt.get('targetLabel','?')}"
+ return f"🖱 {comp}: {label} 클릭"
+ if prop == "selectedData" and payload:
+ n = len(payload.get("points", []))
+ return f"🔎 {comp}: {n}개 선택"
+ if prop == "relayoutData" and payload:
+ keys = ", ".join(list(payload.keys())[:3])
+ return f"🧭 {comp}: 뷰 변경({keys}...)"
+
+ return dash.no_update
+
+# update_all 위쪽(같은 파일)에 추가
+def _slice_sankey_cache_by_choice(df, seg, mod, loy):
+ if df is None or df.empty:
+ return pd.DataFrame()
+ sub = df.copy()
+ if "segment" in sub.columns and seg != "ALL":
+ sub = sub[(sub["segment"].astype(str) == seg) | sub["segment"].isna() | (sub["segment"].astype(str) == "ALL")]
+ if "model" in sub.columns and mod != "ALL":
+ sub = sub[(sub["model"].astype(str) == mod) | sub["model"].isna() | (sub["model"].astype(str) == "ALL")]
+ if "loyalty" in sub.columns and loy != "ALL":
+ sub = sub[(sub["loyalty"].astype(str) == loy) | sub["loyalty"].isna() | (sub["loyalty"].astype(str) == "ALL")]
+ if "level" in sub.columns:
+ for lv in LVL_PRIORITY:
+ cand = sub[sub["level"].astype(str) == lv]
+ if not cand.empty:
+ return cand.copy()
+ return sub
+
+
+ # 레벨 우선순위(가장 세분화된 것부터)로 하나만 남기기
+ if "level" in sub.columns:
+ for lv in LVL_PRIORITY:
+ cand = sub[sub["level"].astype(str) == lv]
+ if not cand.empty:
+ return cand.copy()
+ return sub
+
+def _read_df_store(js):
+ if not js:
+ return pd.DataFrame()
+ # 이미 dict/object로 들어오면 시도
+ if isinstance(js, dict):
+ if {"columns","data"}.issubset(js.keys()):
+ return pd.DataFrame(js["data"], columns=js["columns"])
+ try:
+ return pd.DataFrame(js)
+ except Exception:
+ return pd.DataFrame()
+ # 문자열이면 우선 split → 실패 시 일반 json 해석
+ if isinstance(js, str):
+ try:
+ return pd.read_json(io.StringIO(js), orient="split")
+ except Exception:
+ try:
+ obj = json.loads(js)
+ if isinstance(obj, dict) and {"columns","data"}.issubset(obj.keys()):
+ return pd.DataFrame(obj["data"], columns=obj["columns"])
+ elif isinstance(obj, list):
+ return pd.DataFrame(obj)
+ elif isinstance(obj, dict):
+ # overall 같은 dict가 오면 DF로 만들지 않고 빈 DF 반환
+ return pd.DataFrame()
+ except Exception:
+ return pd.DataFrame()
+ return pd.DataFrame()
+
+def _read_overall(js_overall):
+ if not js_overall:
+ return {}
+ if isinstance(js_overall, dict):
+ return js_overall
+ try:
+ return json.loads(js_overall)
+ except Exception:
+ return {}
+
+# ===================== 콜백: 대시보드 계산 =====================
+@app.callback(
+ Output("kpi-sample","children"),
+ Output("kpi-buy-success","children"),
+ Output("kpi-buy-fail","children"),
+ Output("ins-final","children"),
+ Output("ins-drop","children"),
+ Output("ins-uncert","children"),
+ Output("metrics-table","data"),
+ Output("fig-sankey","figure"),
+ Output("fig-matrix","figure"),
+ #Output("fig-simfan","figure"),
+ Output("fig-bubble","figure"),
+ Output("fig-stage-rank","figure"),
+ Output("fig-survival","figure"),
+ Output("fig-right","figure"),
+ # Output("fig-waterfall","figure"),
+ Output("fig-funnel","figure"),
+ Output("fig-forest","figure"),
+ Input("dd-seg","value"),
+ Input("dd-mod","value"),
+ Input("dd-loy","value"),
+ Input("sankey-drag","value"),
+ Input("dd-stage-rank","value"),
+ Input("tab-right","value"), # ← 추가
+ Input("store-master","data"),
+ Input("store-tm","data"),
+ Input("store-sankey","data"),
+ Input("store-overall","data"),
+)
+
+def update_all(seg, mod, loy, drag_val, stage_label, tab_right,
+ js_master, js_tm, js_sankey, js_overall=None):
+ # 기본값 보정
+ seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy)
+ if not isinstance(stage_label, str) or not stage_label:
+ stage_label = "구매"
+ empty = _empty_fig("Load data first")
+
+ # 가드: 마스터 없으면 15개 템플릿 리턴
+ if not js_master:
+ return (
+ "–", "–", "–", # kpi-sample, kpi-buy-success, kpi-buy-fail
+ "–", "–", "–", # ins-final, ins-drop, ins-uncert
+ [], # metrics-table.data
+ empty, empty, # fig-sankey, fig-matrix
+ empty, empty, # fig-bubble, fig-stage-rank
+ empty, empty, # fig-survival, fig-right
+ empty, # fig-funnel
+ empty # fig-forest
+ )
+
+ js_sankey, js_overall, _ = _maybe_swap_sankey_overall(js_sankey, js_overall)
+
+ try:
+ # 0) sankey/overall 뒤바뀜 자동 교정
+ js_sankey, js_overall, _ = _maybe_swap_sankey_overall(js_sankey, js_overall)
+
+ # 1) 스토어 읽기(안전)
+ df_master = _read_df_store(js_master)
+ df_tm = _read_df_store(js_tm)
+ df_sankey = _read_df_store(js_sankey)
+ overall = _read_overall(js_overall)
+
+ # 2) 선택/스코프
+ row_pick = pick_row_for(df_master, seg, mod, loy)
+ scope = df_master.copy()
+ if seg!="ALL": scope = scope[scope["segment"].astype(str)==seg]
+ if mod!="ALL": scope = scope[scope["model"].astype(str)==mod]
+ if loy!="ALL": scope = scope[scope["loyalty"].astype(str)==loy]
+
+ # 집계행으로 결측 보강
+ row_agg = compose_composite_row(scope)
+ rowd = {k: row_pick[k] for k in row_pick.index}
+
+ def _safe_num_or_nan(x):
+ try:
+ fx = float(x)
+ return fx if np.isfinite(fx) else np.nan
+ except Exception:
+ return np.nan
+
+ def coalesce_into(dst_dict, src_series, cols):
+ for c in cols:
+ va = _safe_num_or_nan(dst_dict.get(c))
+ if np.isnan(va):
+ dst_dict[c] = (src_series.get(c) if isinstance(src_series, pd.Series) else np.nan)
+
+ core_cols = [
+ "pref_sample_size",
+ "pref_success_rate","pref_ci_lower","pref_ci_upper",
+ "rec_success_rate","rec_ci_lower","rec_ci_upper",
+ "intent_success_rate","intent_ci_lower","intent_ci_upper",
+ "buy_success_rate","buy_ci_lower","buy_ci_upper",
+ "bayesian_dropout_pref_to_rec","bayesian_dropout_rec_to_intent","bayesian_dropout_intent_to_buy",
+ "bayesian_full_conversion",
+ "pref_snr","rec_snr","intent_snr","buy_snr",
+ "pref_lift_vs_galaxy","rec_lift_vs_galaxy","intent_lift_vs_galaxy","buy_lift_vs_galaxy",
+ ]
+ coalesce_into(rowd, row_agg, core_cols)
+ row = pd.Series(rowd)
+
+ # 3) KPI/테이블
+ tbl = metrics_table_row(row)
+
+ def _face(val, good, soso, reverse=False):
+ if not np.isfinite(val): return "❔"
+ v = (1 - val) if reverse else val
+ return "🟢" if v >= good else ("🟡" if v >= soso else "🔴")
+
+ GOOD_P, SOSO_P = 0.55, 0.45
+ GOOD_DROP, SOSO_DROP = 0.20, 0.35
+ GOOD_W, SOSO_W = 0.08, 0.12
+
+ sample = _safe_int0(row.get("pref_sample_size"))
+ kpi_sample_text = f"📊 {sample:,}"
+
+ buy_p = _safe_num(row.get("buy_success_rate"))
+ buy_s = (f"{buy_p:.1%}" if np.isfinite(buy_p) else "N/A")
+ buy_f = (f"{(1-buy_p):.1%}" if np.isfinite(buy_p) else "N/A")
+
+ overall_buy = _safe_num(overall.get("buy_mean"))
+ delta = (buy_p - overall_buy) if (np.isfinite(buy_p) and np.isfinite(overall_buy)) else np.nan
+ face_final = _face(buy_p, GOOD_P, SOSO_P, reverse=False)
+ ins_final = (f"{face_final} 성공 {buy_s} / 실패 {buy_f} (vs 전체 {delta:+.1%}p)"
+ if np.isfinite(delta) else f"{face_final} 성공 {buy_s} / 실패 {buy_f}")
+
+ d1, d2, d3, _ = drops_from_anywhere(row, df_tm, seg, mod, loy)
+ drops = [v for v in [d1, d2, d3] if np.isfinite(v)]
+ dmax = max(drops) if drops else np.nan
+ face_drop = _face(dmax, GOOD_DROP, SOSO_DROP, reverse=True)
+ ins_drop = f"{face_drop} " + biggest_drop_text_by_sources(row, df_tm, seg, mod, loy)
+
+ def _widest_hdi(r):
+ pick = []
+ for stage, lo_col, hi_col in [("선호","pref_ci_lower","pref_ci_upper"),
+ ("추천","rec_ci_lower","rec_ci_upper"),
+ ("구매의향","intent_ci_lower","intent_ci_upper"),
+ ("구매","buy_ci_lower","buy_ci_upper")]:
+ lo = _safe_num(r.get(lo_col)); hi = _safe_num(r.get(hi_col))
+ if np.isfinite(lo) and np.isfinite(hi):
+ pick.append((stage, max(0.0, hi - lo)))
+ return max(pick, key=lambda x: x[1]) if pick else (None, np.nan)
+
+ stage_w, width_w = _widest_hdi(row)
+ face_unc = _face(width_w, GOOD_W, SOSO_W, reverse=True)
+ ins_uncert = "데이터 없음" if stage_w is None else f"{face_unc} {stage_w} 단계 {width_w*100:.1f}%p"
+
+ # 4) Sankey (캐시 정규화 → 보강)
+ g_for_sankey = build_sankey_flow_table(df_sankey, seg=seg, mod=mod, loy=loy, collapse_to_buy=True)
+ if g_for_sankey is None or g_for_sankey.empty:
+ # 완전 비면 현재 row로 즉석 합성
+ g_for_sankey = _sankey_from_master_row(row, seg, mod, loy)
+ g_for_sankey = add_collapsed_to_buy(g_for_sankey, add_from=("선호","추천","구매의향"))
+
+ fig_sankey = sankey_figure(
+ df_sankey=None,
+ seg=seg, mod=mod, loy=loy,
+ drag=("drag" in (drag_val or [])),
+ table_override=g_for_sankey
+ )
+
+ # 5) 나머지 그래프
+ fig_matrix = matrix_funnel_figure(row, df_tm, seg, mod, loy)
+ lift_col = "buy_lift_vs_galaxy" if "buy_lift_vs_galaxy" in scope.columns else "pref_lift_vs_galaxy"
+ snr_col = "buy_snr" if "buy_snr" in scope.columns else "pref_snr"
+ fig_bubble = bubble_figure(scope, lift_col, snr_col)
+ fig_stage_rank = compare_distribution_figure(df_master, seg, mod, loy, stage_label)
+ fig_survival = survival_curve_figure(row, df_tm, seg, mod, loy)
+ fig_funnel = stacked_funnel_figure(row)
+ fig_forest = forest_figure(scope)
+
+ fig_right = (ppc_purchase_overlay_figure(row)
+ if (tab_right or "waterfall") == "ppc"
+ else waterfall_figure(row, df_tm, seg, mod, loy))
+
+ # 6) 최종 15개 리턴(콜백 Output 순서대로)
+ return (
+ kpi_sample_text, buy_s, buy_f, # kpi-sample, kpi-buy-success, kpi-buy-fail
+ ins_final, ins_drop, ins_uncert, # 인사이트 3개
+ tbl.to_dict("records"), # metrics-table.data
+ fig_sankey, fig_matrix, # sankey, matrix
+ fig_bubble, fig_stage_rank, # bubble, stage-rank
+ fig_survival, fig_right, # survival, right-panel(waterfall/ppc)
+ fig_funnel, # funnel
+ fig_forest # forest
+ )
+
+ except Exception:
+ print("UPDATE ERROR:\n", traceback.format_exc())
+ return (
+ "–","–","–","–","–","–",
+ [],
+ empty, empty, empty, empty, empty, empty, empty, empty
+ )
+
+# ===================== 실행 =====================
+if __name__ == "__main__":
+ base_port = int(os.getenv("PORT", "8059"))
+ for i in range(5):
+ try:
+ app.run_server(host="0.0.0.0", port=base_port + i, debug=False, use_reloader=False)
+ break
+ except (OSError, SystemExit) as e:
+ if "Address already in use" in str(e) or getattr(e, "code", None) == 1:
+ continue
+ raise
+