# -*- coding: utf-8 -*- # app.py — Bayesian Journey Dashboard (Colab-friendly, robust Excel + plots) # Fixes: # - ✅ 정규화 유틸(_as_all, _ensure_key_cols 등) 포함 # - ✅ pick_row_for 포함 # - ✅ Plotly 축 그리드 속성 정리(유효하지 않은 prop 제거) # - ✅ 포트 충돌 시 자동 대체 포트로 재시도 import os, json, re, traceback import numpy as np import pandas as pd import plotly.graph_objects as go import plotly.express as px from dash import Dash, html, dcc, dash_table, Input, Output, State from dash.dash_table import FormatTemplate from dash.dash_table.Format import Format, Scheme import dash # (NEW) 인터랙션 로그용 # (파일 상단 import 근처에 추가) import io import hashlib FLOW_SALT = os.getenv("FLOW_SALT", "phi-v1-2025-01") # 필요시 환경변수로 바꿔치기 가능 FLOW_SALT = os.getenv("FLOW_SALT", "phi-v1-2025-01") FLOW_GLOBAL = True # True면 전역 고정, False면 해시 기반 GLOBAL_K = 11.3 def _flow_scale(seg, mod, loy): if FLOW_GLOBAL: return GLOBAL_K key = f"{seg}|{mod}|{loy}|{FLOW_SALT}" h = int(hashlib.sha256(key.encode("utf-8")).hexdigest()[:8], 16) return 7.5 + (h % 1100) / 100.0 # ======== 인터랙션 공용 설정 ======== GRAPH_CONFIG = { "displayModeBar": True, "scrollZoom": True, # 휠로 줌 "doubleClick": "reset", # 더블클릭 리셋 "modeBarButtonsToAdd": ["lasso2d", "select2d"], "showTips": True, } # ===================== 기본 경로 ===================== from pathlib import Path ROOT = Path(__file__).resolve().parent DEFAULT_PATH = os.getenv("EXCEL_PATH", str(ROOT / "bayesian_analysis_total_v1.xlsx")) EXCEL_PATH = DEFAULT_PATH # (load_excel 호출은 DEFAULT_PATH 그대로여도 동작, 명시하려면 EXCEL_PATH로) # ===================== 레벨 상수 ===================== LEVEL_OVERALL="전체"; LEVEL_SEGMENT="세그먼트"; LEVEL_MODEL="모델" LEVEL_LOYALTY="충성도"; LEVEL_SEG_X_LOY="세그×충성도" LEVEL_SEG_X_MODEL="세그×모델"; LEVEL_MODEL_X_LOY="모델×충성도" LEVEL_MOD_X_SEG_X_LOY="모델×세그×충성도" # === 정규화 === ALL_ALIASES = {"ALL","all","All","", " ", " ", "전체", "NONE","None","none","nan","NaN", None} LVL_ALIASES = { "모델전체×세그×충성도": "모델×세그×충성도", "세그x모델": "세그×모델", "모델x충성도": "모델×충성도", "세그x충성도": "세그×충성도", } def _as_all(v) -> str: s = "ALL" if v is None else str(v).strip() return "ALL" if s in ALL_ALIASES else s def _ensure_key_cols(df: pd.DataFrame) -> pd.DataFrame: df = df.copy() for c in ["analysis_level","segment","model","loyalty"]: if c not in df.columns: df[c] = "ALL" df[c] = ( df[c].astype(str).str.strip() .replace({ "": "ALL", "전체":"ALL", "NONE":"ALL","None":"ALL","none":"ALL", "nan":"ALL","NaN":"ALL", "ALL":"ALL","All":"ALL","all":"ALL" }) .fillna("ALL") ) if "level" not in df.columns: df["level"] = df["analysis_level"] if "analysis_level" in df.columns else "전체" df["level"] = ( df["level"].astype(str).str.strip() .replace({"ALL":"전체","All":"전체","all":"전체"}) .replace(LVL_ALIASES) ) if "analysis_level" in df.columns: df["analysis_level"] = df["analysis_level"].replace(LVL_ALIASES) return df # ---- Store JSON 로더 & 스왑 감지 유틸 ---- def _looks_split_df_json(s: str) -> bool: try: o = json.loads(s) # orient="split"는 최소 columns/index/data 3셋이 있음 return isinstance(o, dict) and {"columns","index","data"}.issubset(set(o.keys())) except Exception: return False def _looks_overall_json(s: str) -> bool: try: o = json.loads(s) return isinstance(o, dict) and any(k in o for k in ("pref_mean","rec_mean","intent_mean","buy_mean")) except Exception: return False def _safe_read_df_split(js: str | dict | None) -> pd.DataFrame: if js is None: return pd.DataFrame() if isinstance(js, dict): # 이미 파싱된 경우 # dict가 split 스키마인 경우만 처리 if {"columns","index","data"}.issubset(set(js.keys())): return pd.read_json(io.StringIO(json.dumps(js)), orient="split") return pd.DataFrame() # str try: return pd.read_json(io.StringIO(js), orient="split") except Exception: return pd.DataFrame() def _safe_read_overall(js: str | dict | None) -> dict: if js is None: return {} if isinstance(js, dict): return js try: o = json.loads(js) return o if isinstance(o, dict) else {} except Exception: return {} def _maybe_swap_sankey_overall(js_sankey, js_overall): """ sankey 캐시와 overall이 뒤바뀌어 들어온 경우 자동 교정. (js_sankey가 overall dict이고, js_overall이 split DF JSON인 케이스) """ try: if isinstance(js_sankey, str) and _looks_overall_json(js_sankey) \ and isinstance(js_overall, str) and _looks_split_df_json(js_overall): return js_overall, js_sankey, True # (교정된 sankey, overall, swapped?) except Exception: pass return js_sankey, js_overall, False _read_df_store = _safe_read_df_split _read_overall = _safe_read_overall def _rebuild_hkey_using_level(df: pd.DataFrame) -> pd.DataFrame: df = _ensure_key_cols(df).copy() if "level" in df.columns and df["level"].notna().any(): pass elif "analysis_level" in df.columns: df["level"] = df["analysis_level"] else: df["level"] = "전체" for c in ["level","segment","model","loyalty"]: if c != "level": df[c] = ( df[c].astype(str).str.strip() .replace({"": "ALL","전체":"ALL","NONE":"ALL","None":"ALL","none":"ALL","nan":"ALL","NaN":"ALL"}) .fillna("ALL") ) df["level"] = df["level"].replace(LVL_ALIASES) df["hierarchy_key"] = df["level"] + "|" + df["segment"] + "|" + df["model"] + "|" + df["loyalty"] return df def sample_col_in_df(df) -> str | None: for c in ["pref_sample_size","sample_size","n","N","base","베이스수","표본수"]: if c in df.columns: return c return None # ==== 비공개 유량 스케일 ==== FLOW_GLOBAL = True GLOBAL_K = 11.3 # ==== 공용: Shape-safe helpers (가짜 키 자동 차단 + 보정) ==== _ALLOWED_SHAPE_KEYS = { "editable","fillcolor","fillrule","label","layer","legend","legendgroup","legendgrouptitle", "legendrank","legendwidth","line","name","opacity","path","showlegend","templateitemname", "type","visible","x0","x1","xanchor","xref","xsizemode","y0","y1","yanchor","yref","ysizemode", } # 여기가 문제의 가짜 키들 _SHIFT_KEYS = ("x0shift", "x1shift", "y0shift", "y1shift") def _line_from_kwargs(kwargs: dict): line = {} if "line_color" in kwargs: line["color"] = kwargs.pop("line_color") if "line_width" in kwargs: line["width"] = kwargs.pop("line_width") if "line_dash" in kwargs: line["dash"] = kwargs.pop("line_dash") return {k: v for k, v in line.items() if v is not None} def _clean_shape_kwargs(kwargs: dict): """ 1) *_shift 키 제거 2) line_* → line 병합 3) 허용 키만 남기기 """ kwargs = dict(kwargs) # shallow copy # 1) 가짜 shift 키 모두 제거 for k in _SHIFT_KEYS: kwargs.pop(k, None) # 2) line_* → line 병합 line = _line_from_kwargs(kwargs) if line: base_line = kwargs.get("line") or {} kwargs["line"] = {**base_line, **line} # 3) 허용 키만 통과 return {k: v for k, v in kwargs.items() if (k in _ALLOWED_SHAPE_KEYS and v is not None)} def add_vline_safe(fig, x, **kwargs): """세로 기준선(가짜 키 차단, line_* 병합)""" base = dict( type="line", xref="x", x0=float(x), x1=float(x), yref="paper", y0=0, y1=1, layer=kwargs.pop("layer", "above"), ) if "opacity" in kwargs and kwargs["opacity"] is not None: base["opacity"] = kwargs.pop("opacity") base.update(_clean_shape_kwargs(kwargs)) return fig.add_shape(**base) def add_hline_safe(fig, y, **kwargs): """가로 기준선(가짜 키 차단, line_* 병합)""" base = dict( type="line", yref="y", y0=float(y), y1=float(y), xref="paper", x0=0, x1=1, layer=kwargs.pop("layer", "above"), ) if "opacity" in kwargs and kwargs["opacity"] is not None: base["opacity"] = kwargs.pop("opacity") base.update(_clean_shape_kwargs(kwargs)) return fig.add_shape(**base) def _pad_top(fig, px=40): # 기존 margin 유지 + top만 늘림 m = fig.layout.margin or {} fig.update_layout(margin=dict( l=int(getattr(m, "l", 10) or 10), r=int(getattr(m, "r", 10) or 10), b=int(getattr(m, "b", 10) or 10), t=int(getattr(m, "t", 0) or 0) + int(px), )) return fig def add_vrect_safe(fig, x0, x1, **kwargs): """ add_vrect 대체: x0shift/x1shift를 값에 반영 후 제거하고, 나머지 키는 안전하게 정리해서 rect shape로 추가. """ # ── shift 보정 ── dx0 = float(kwargs.pop("x0shift", 0) or 0) dx1 = float(kwargs.pop("x1shift", 0) or 0) x0 = float(x0) + dx0 x1 = float(x1) + dx1 # yref 자동 판정(명시가 있으면 존중) yref = kwargs.pop("yref", None) has_y = ("y0" in kwargs) or ("y1" in kwargs) if yref is None: yref = "y" if has_y else "paper" # paper 좌표 기본값 y0_default, y1_default = (0, 1) if yref == "paper" else (None, None) base = dict( type="rect", xref="x", x0=x0, x1=x1, yref=yref, y0=kwargs.pop("y0", y0_default), y1=kwargs.pop("y1", y1_default), layer=kwargs.pop("layer", "below"), fillcolor=kwargs.pop("fillcolor", "rgba(0,0,0,0.06)"), ) if base["yref"] == "y": # 데이터 축이면 None인 y0/y1 제거 if base.get("y0") is None: base.pop("y0", None) if base.get("y1") is None: base.pop("y1", None) if "opacity" in kwargs and kwargs["opacity"] is not None: base["opacity"] = kwargs.pop("opacity") base.update(_clean_shape_kwargs(kwargs)) return fig.add_shape(**base) # (선택) 만약 어딘가에서 layout.shapes에 직접 dict를 넣는다면: def sanitize_shape_dict(d: dict) -> dict: """외부/레거시 shape dict을 안전하게 정제. - x0shift/x1shift/y0shift/y1shift 값을 좌표에 반영하고 키 제거 - line_* 키 병합 - 허용되지 않는 키 삭제 """ d = dict(d or {}) # 1) shift -> 좌표 반영 for sh_key, coord_key in (("x0shift","x0"),("x1shift","x1"),("y0shift","y0"),("y1shift","y1")): if sh_key in d: try: if coord_key in d and d[coord_key] is not None: d[coord_key] = float(d[coord_key]) + float(d.pop(sh_key) or 0.0) else: d.pop(sh_key, None) except Exception: d.pop(sh_key, None) # 2) line_* -> line 병합 line = {} if "line_color" in d: line["color"] = d.pop("line_color") if "line_width" in d: line["width"] = d.pop("line_width") if "line_dash" in d: line["dash"] = d.pop("line_dash") if line: base_line = d.get("line") or {} d["line"] = {**base_line, **{k:v for k,v in line.items() if v is not None}} # 3) 허용 키만 남기기 return {k: v for k, v in d.items() if (k in _ALLOWED_SHAPE_KEYS and v is not None)} def _scrub_layout_shapes(fig: go.Figure) -> go.Figure: """ layout.shapes에 남아있는 비정상 키(x0shift 같은 잔재)를 일괄 제거. """ try: shapes = list(fig.layout.shapes) if fig.layout.shapes is not None else [] cleaned = [] for sh in shapes: try: sd = sh.to_plotly_json() if hasattr(sh, "to_plotly_json") else dict(sh) cleaned.append(sanitize_shape_dict(sd)) # ← 기존 유틸 재사용 except Exception: # 하나라도 문제면 그냥 건너뜀(도면 깨지지 않게) continue fig.update_layout(shapes=cleaned) except Exception: pass return fig def sanitize_fig_shapes(fig): """fig.layout.shapes 전부 sanitize.""" try: shapes = list(fig.layout.shapes) if fig.layout.shapes else [] except Exception: shapes = [] if not shapes: return fig new_shapes = [] for sh in shapes: try: sd = sh.to_plotly_json() if hasattr(sh, "to_plotly_json") else dict(sh) new_shapes.append(sanitize_shape_dict(sd)) except Exception: # 망가진 건 버림 pass fig.update_layout(shapes=new_shapes) return fig # ===================== 팔레트 ===================== COL_RED = "#C32C2C" # 빨강 COL_ORANGE = "#D24D3E" # 주황 COL_YELLOW = "#DE937A" # 노랑 COL_BEIGE = "#D49442" # 베이지 COL_GREEN_LITE = "#2B8E81" # 초록(기본) COL_GREEN_DARK = "#21786E" # 초록 진한톤(필요시) COL_GRAY = "#D3D3D3" def _hex_to_rgb_tuple(h): # 유틸 h = h.lstrip("#") return [int(h[i:i+2], 16) for i in (0,2,4)] def royg_color_for(values: np.ndarray) -> list: v = np.asarray(values, dtype=float) if v.size == 0: return [] if not np.isfinite(v).any(): return [COL_GREEN_DARK] * len(v) lo = np.nanmin(v); hi = np.nanmax(v) t = np.zeros_like(v) if (not np.isfinite(lo) or not np.isfinite(hi) or hi-lo < 1e-12) else (v-lo)/(hi-lo) # 낮은값(좋음) → 높은값(나쁨): 초 → 베 → 노 → 주 → 빨 cols = np.array([ _hex_to_rgb_tuple(COL_GREEN_LITE), _hex_to_rgb_tuple(COL_BEIGE), _hex_to_rgb_tuple(COL_YELLOW), _hex_to_rgb_tuple(COL_ORANGE), _hex_to_rgb_tuple(COL_RED), ], dtype=float) stops = np.array([0.0, 0.25, 0.5, 0.75, 1.0]) r = np.interp(t, stops, cols[:,0]); g = np.interp(t, stops, cols[:,1]); b = np.interp(t, stops, cols[:,2]) out = [] for rr, gg, bb in zip(r,g,b): if not (np.isfinite(rr) and np.isfinite(gg) and np.isfinite(bb)): out.append('rgb(140,140,140)') else: out.append(f'rgb({int(round(rr))},{int(round(gg))},{int(round(bb))})') return out # ==== DESIGN CONSTANTS (tiers & neutrals) ==== COL_BLUE_DEEP = "#1E3A8A" # 진파랑(하이엔드) COL_BLUE_SKY = "#60A5FA" # 하늘(미드) COL_GRAY_MED = "#9CA3AF" # 회색(로우/중립) COL_BLACK = "#111111" # 포레스트 플롯용 # 세그/티어 → 색 매핑 (모든 키는 소문자 기준으로 저장) _SEG_TIER_COLOR = { # High/Premium 계열 "highend": COL_BLUE_DEEP, "high": COL_BLUE_DEEP, "premium": COL_BLUE_DEEP, "하이엔드": COL_BLUE_DEEP, "프리미엄": COL_BLUE_DEEP, # Mid 계열 "midend": COL_BLUE_SKY, "mid": COL_BLUE_SKY, "midrange": COL_BLUE_SKY, "미드": COL_BLUE_SKY, "중간": COL_BLUE_SKY, # Low/Entry 계열 "lowend": COL_GRAY_MED, "low": COL_GRAY_MED, "entry": COL_GRAY_MED, "로우엔드": COL_GRAY_MED, "저가": COL_GRAY_MED, } def _norm_key(x) -> str: return "" if x is None else str(x).strip().lower() def _tier_color_for_segment(seg: str) -> str: """세그 이름을 느슨하게 받아 컬러로 매핑(대소문자/공백/한글 허용).""" return _SEG_TIER_COLOR.get(_norm_key(seg), COL_GRAY_MED) def _model_dominant_segment(df_scope: pd.DataFrame) -> dict: """ 모델별 '표본수 가중' 우세 세그. segment가 ALL/전체인 행은 제외. 반환: {model(str): segment(str)} """ if df_scope is None or df_scope.empty or "model" not in df_scope.columns or "segment" not in df_scope.columns: return {} s = df_scope.copy() # ALL/전체 drop seg_norm = s["segment"].astype(str).str.strip() m_valid = ~seg_norm.isin(["ALL", "전체"]) & seg_norm.notna() s = s[m_valid] if s.empty: return {} w = pd.to_numeric(s.get("pref_sample_size", 1), errors="coerce").replace([np.inf, -np.inf], np.nan).fillna(1.0) s["__w__"] = w grp = s.groupby(["model", "segment"], as_index=False)["__w__"].sum() # 각 모델에서 가중치 최대인 세그 1개 선택 dom = grp.sort_values(["model", "__w__"], ascending=[True, False]).drop_duplicates("model") return {str(r["model"]): str(r["segment"]) for _, r in dom.iterrows()} # ===================== 앱 ===================== app = Dash(__name__) app.title = "Bayesian Journey Dashboard" px.defaults.template = "plotly_white" def _safe_num(x, default=np.nan): try: return float(x) except Exception: return default def _safe_int0(x): try: v = float(x) return int(v) if np.isfinite(v) else 0 except Exception: return 0 def _norm_cols(df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: return pd.DataFrame() df = df.copy() df.columns = [str(c).strip() for c in df.columns] for c in df.columns: if df[c].dtype == "O": ser = pd.to_numeric(df[c], errors="coerce") if ser.notna().mean() >= 0.5: df[c] = ser return df def _ci_to_sd(lo, hi): lo = np.asarray(lo, dtype=float); hi = np.asarray(hi, dtype=float) return (hi - lo)/(2*1.96) def _grade_from_p(p): if not np.isfinite(p): return "N/A" if p >= 0.70: return "A" if p >= 0.55: return "B" if p >= 0.45: return "C" return "D" def _auto_dtick(span): # 0~1 퍼센트 축 span 기준 if span <= 0.30: return 0.05 # 5% if span >= 0.80: return 0.20 # 20% return 0.10 # 10% def apply_dense_grid(fig: go.Figure, x_prob: bool = False, y_prob: bool = False) -> go.Figure: # 1) 기존 높이 보존(없을 때만 360 지정) cur_h = getattr(fig.layout, "height", None) fig.update_layout( height=(cur_h if cur_h is not None else 360), showlegend=True, paper_bgcolor="#fff", plot_bgcolor="#fff", font=dict(color="#111"), margin=dict(l=10, r=10, t=30, b=10), ) # 2) 기본 격자 fig.update_xaxes(showline=False, mirror=False, linewidth=0) fig.update_yaxes(showline=False, mirror=False, linewidth=0) # 3) plotly 버전별 minor 옵션 안전 처리 try: fig.update_xaxes(minor=dict(showgrid=False)) fig.update_yaxes(minor=dict(showgrid=False)) except Exception: pass # 4) 확률축(0~1) 포맷 if x_prob: xr = (getattr(fig.layout.xaxis, "range", None) or [0, 1]) span = (xr[1] - xr[0]) if isinstance(xr, (list, tuple)) and len(xr) == 2 else 1.0 fig.update_xaxes(tick0=0, dtick=_auto_dtick(span), tickformat=".0%") if y_prob: yr = (getattr(fig.layout.yaxis, "range", None) or [0, 1]) span = (yr[1] - yr[0]) if isinstance(yr, (list, tuple)) and len(yr) == 2 else 1.0 fig.update_yaxes(tick0=0, dtick=_auto_dtick(span), tickformat=".0%") # 5) 인터랙션 상태 유지 fig.update_layout(uirevision="keep") # 6) 레이아웃 shape 잔재(x0shift 등) 전역 스크럽 try: fig = _scrub_layout_shapes(fig) # sanitize_shape_dict를 내부에서 활용 except Exception: pass return fig # ★ 여기 추가: 모든 shape 정제 try: sanitize_fig_shapes(fig) except Exception: pass return fig # ---- Excel 오픈(엔진 폴백 + 디버그 수집) ---- def _open_excel_with_fallback(path: str): errs = [] for eng in ["openpyxl", None, "xlrd"]: try: xls = pd.ExcelFile(path, engine=eng) if eng else pd.ExcelFile(path) return xls, (eng or "auto") except Exception as e: errs.append(f"{(eng or 'auto')}: {type(e).__name__}::{e}") raise RuntimeError("Excel open failed | " + " | ".join(errs)) def _find_sheet(xls: pd.ExcelFile, candidates): names = xls.sheet_names norm = lambda s: re.sub(r"\s+", "", str(s)).lower() names_norm = {norm(n): n for n in names} for cand in candidates: cn = norm(cand) for k, orig in names_norm.items(): if cn in k: return orig return None def load_excel(path: str): if not os.path.exists(path): raise FileNotFoundError(f"엑셀 파일이 없습니다: {path}") xls, used_engine = _open_excel_with_fallback(path) sheets = list(xls.sheet_names) sh_master = _find_sheet(xls, ["VBA마스터테이블", "마스터", "master", "mastertable", "마스터테이블"]) sh_tm = _find_sheet(xls, ["베이지안전이확률매트릭스", "전이확률", "transition", "matrix"]) sh_sankey = _find_sheet(xls, ["베이지안생키다이어그램", "생키", "sankey", "flow"]) dbg = {"engine": used_engine, "sheets": sheets, "matched": {"master": sh_master, "tm": sh_tm, "sankey": sh_sankey}} if not sh_master: raise ValueError(f"필수 시트(마스터) 미발견 | sheets={sheets}") df_master = _norm_cols(pd.read_excel(xls, sh_master)) df_tm = _norm_cols(pd.read_excel(xls, sh_tm)) if sh_tm else pd.DataFrame() df_sankey = _norm_cols(pd.read_excel(xls, sh_sankey)) if sh_sankey else pd.DataFrame() df_master = _rebuild_hkey_using_level(df_master) if not df_tm.empty: df_tm = _rebuild_hkey_using_level(df_tm) if not df_sankey.empty: df_sankey = _rebuild_hkey_using_level(df_sankey) def col(name): return df_master.get(name, pd.Series(np.nan, index=df_master.index)) overall = { "pref_mean": float(np.nanmean(col("pref_success_rate"))), "rec_mean": float(np.nanmean(col("rec_success_rate"))), "intent_mean": float(np.nanmean(col("intent_success_rate"))), "buy_mean": float(np.nanmean(col("buy_success_rate"))), "pref_sd": float(np.nanmean(_ci_to_sd(col("pref_ci_lower"), col("pref_ci_upper")))), "rec_sd": float(np.nanmean(_ci_to_sd(col("rec_ci_lower"), col("rec_ci_upper")))), "intent_sd": float(np.nanmean(_ci_to_sd(col("intent_ci_lower"), col("intent_ci_upper")))), "buy_sd": float(np.nanmean(_ci_to_sd(col("buy_ci_lower"), col("buy_ci_upper")))), } seg_opts = ["ALL"] + sorted([str(v) for v in df_master["segment"].dropna().unique() if str(v)!="ALL"]) loy_opts = ["ALL"] + sorted([str(v) for v in df_master["loyalty"].dropna().unique() if str(v)!="ALL"]) mod_opts_all = ["ALL"] + sorted([str(v) for v in df_master["model"].dropna().unique() if str(v)!="ALL"]) return df_master, df_tm, df_sankey, overall, seg_opts, mod_opts_all, loy_opts, dbg # ===================== 선택/집계 로직 ===================== def pick_row_for(df_master: pd.DataFrame, seg, mod, loy): seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy) df = _ensure_key_cols(df_master) sort_col = sample_col_in_df(df) if sort_col is None: sort_col = "__tmp_n__"; df[sort_col] = 1 def add_pref_score(sub: pd.DataFrame) -> pd.DataFrame: # 사용자가 ALL로 둔 차원은 ALL을 선호(=덜 구체적인 행을 상단에) score = 0 if seg == "ALL": score += (sub["segment"]=="ALL").astype(int) if mod == "ALL": score += (sub["model"]=="ALL").astype(int) if loy == "ALL": score += (sub["loyalty"]=="ALL").astype(int) sub = sub.copy(); sub["__score__"] = score return sub chosen = (seg!="ALL") + (mod!="ALL") + (loy!="ALL") wanted_levels = [] if chosen == 0: wanted_levels = [LEVEL_OVERALL] elif chosen == 1: if seg!="ALL": wanted_levels = [LEVEL_SEGMENT, LEVEL_OVERALL] if mod!="ALL": wanted_levels = [LEVEL_MODEL, LEVEL_OVERALL] if loy!="ALL": wanted_levels = [LEVEL_LOYALTY, LEVEL_OVERALL] elif chosen == 2: if seg!="ALL" and mod!="ALL": wanted_levels = [LEVEL_SEG_X_MODEL, LEVEL_SEGMENT, LEVEL_MODEL, LEVEL_OVERALL] elif seg!="ALL" and loy!="ALL": wanted_levels = [LEVEL_SEG_X_LOY, LEVEL_SEGMENT, LEVEL_LOYALTY, LEVEL_OVERALL] elif mod!="ALL" and loy!="ALL": wanted_levels = [LEVEL_MODEL_X_LOY, LEVEL_MODEL, LEVEL_LOYALTY, LEVEL_OVERALL] else: wanted_levels = [ LEVEL_MOD_X_SEG_X_LOY, LEVEL_SEG_X_LOY, LEVEL_SEG_X_MODEL, LEVEL_MODEL_X_LOY, LEVEL_MODEL, LEVEL_SEGMENT, LEVEL_LOYALTY, LEVEL_OVERALL ] # 1) 레벨 우선 매칭 for lvl in wanted_levels: sub = df[df["level"] == lvl] if seg!="ALL": sub = sub[sub["segment"] == seg] if mod!="ALL": sub = sub[sub["model"] == mod] if loy!="ALL": sub = sub[sub["loyalty"] == loy] if not sub.empty: sub = add_pref_score(sub).sort_values(["__score__", sort_col], ascending=[False, False]) row = sub.iloc[0] return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index]) # 2) 정확 조합 실패 시, 부분조합 매칭 sub = df.copy() if seg!="ALL": sub = sub[sub["segment"] == seg] if mod!="ALL": sub = sub[sub["model"] == mod] if loy!="ALL": sub = sub[sub["loyalty"] == loy] if not sub.empty: sub = add_pref_score(sub).sort_values(["__score__", sort_col], ascending=[False, False]) row = sub.iloc[0] return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index]) # 3) 단일 컬럼만 맞는 행이라도 for col, val in [("segment", seg), ("model", mod), ("loyalty", loy)]: if val != "ALL": sub = df[df[col]==val] if not sub.empty: sub = add_pref_score(sub).sort_values(["__score__", sort_col], ascending=[False, False]) row = sub.iloc[0] return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index]) # 4) 완전 실패 시 표본수 최대 row = df.sort_values(sort_col, ascending=False).iloc[0] return row.drop(labels=[c for c in ["__score__","__tmp_n__"] if c in row.index]) # ===================== 차트/표 유틸 ===================== def _pick_sample_for_stage(r, stage_prefix: str) -> int: for c in [f"{stage_prefix}_sample_size", "sample_size", "n", "N", "base", "베이스수", "표본수"]: if c in r and pd.notna(r.get(c)): return _safe_int0(r.get(c)) return _safe_int0(r.get("pref_sample_size")) def metrics_table_row(r): def sd_from_ci(lo, hi): if pd.isna(lo) or pd.isna(hi): return np.nan return (hi - lo)/(2*1.96) rows = [] mapping = [ ("선호", "pref_success_rate", "pref_ci_lower", "pref_ci_upper", "pref_snr", "pref_lift_vs_galaxy"), ("추천", "rec_success_rate", "rec_ci_lower", "rec_ci_upper", "rec_snr", "rec_lift_vs_galaxy"), ("구매의향", "intent_success_rate", "intent_ci_lower", "intent_ci_upper", "intent_snr", "intent_lift_vs_galaxy"), ("구매", "buy_success_rate", "buy_ci_lower", "buy_ci_upper", "buy_snr", "buy_lift_vs_galaxy"), ] for label, m, lo, hi, snr, lift in mapping: mval = _safe_num(r.get(m)) loval = _safe_num(r.get(lo)) hival = _safe_num(r.get(hi)) snrval = _safe_num(r.get(snr)) liftval= _safe_num(r.get(lift)) stage_prefix = m.split("_")[0] rows.append(dict( 단계=label, 베이스수=_pick_sample_for_stage(r, stage_prefix), 성공확률=mval, 하한=loval, 상한=hival, 실패확률=(None if pd.isna(mval) else 1-mval), 판정=("성공" if (np.isfinite(mval) and mval>=0.5) else ("실패" if np.isfinite(mval) else "N/A")), 평가등급=("N/A" if not np.isfinite(mval) else ("A" if mval>=0.70 else "B" if mval>=0.55 else "C" if mval>=0.45 else "D")), SNR=snrval, Lift=liftval, raw평균=mval, raw표준편차=sd_from_ci(loval, hival) )) return pd.DataFrame(rows) def drops_from_anywhere(row, df_tm, seg, mod, loy): seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy) d1 = _safe_num(row.get("bayesian_dropout_pref_to_rec")) d2 = _safe_num(row.get("bayesian_dropout_rec_to_intent")) d3 = _safe_num(row.get("bayesian_dropout_intent_to_buy")) full = _safe_num(row.get("bayesian_full_conversion")) if df_tm is None or df_tm.empty: return d1, d2, d3, full need = [np.isfinite(d1), np.isfinite(d2), np.isfinite(d3), np.isfinite(full)] if all(need): return d1, d2, d3, full m = pd.Series(True, index=df_tm.index) if "segment" in df_tm and seg!="ALL": m &= (df_tm["segment"].astype(str)==seg) if "model" in df_tm and mod!="ALL": m &= (df_tm["model"].astype(str)==mod) if "loyalty" in df_tm and loy!="ALL": m &= (df_tm["loyalty"].astype(str)==loy) sub = df_tm[m].copy() if sub.empty: sub = df_tm.copy() w = pd.to_numeric(sub.get("pref_sample_size", pd.Series(1, index=sub.index)), errors="coerce").fillna(1) def wmean(col): v = pd.to_numeric(sub.get(col, pd.Series(np.nan, index=sub.index)), errors="coerce") if v.notna().any(): return float(np.nansum(v*w)/np.nansum(w)) return np.nan d1 = d1 if np.isfinite(d1) else wmean("bayesian_dropout_pref_to_rec") d2 = d2 if np.isfinite(d2) else wmean("bayesian_dropout_rec_to_intent") d3 = d3 if np.isfinite(d3) else wmean("bayesian_dropout_intent_to_buy") full = full if np.isfinite(full) else wmean("bayesian_full_conversion") return d1, d2, d3, full def biggest_drop_text_by_sources(row, df_tm, seg, mod, loy): d1, d2, d3, _ = drops_from_anywhere(row, df_tm, seg, mod, loy) pairs = [("선호→추천", d1), ("추천→구매의향", d2), ("구매의향→구매", d3)] pairs = [(n, v) for n, v in pairs if np.isfinite(v)] if not pairs: return "데이터 없음" name, val = max(pairs, key=lambda x: x[1]) base_n = _safe_int0(row.get("pref_sample_size")) return f"{name}에서 {val*100:.1f}%p 손실 (샘플 {base_n:,})" def compose_composite_row(df_scope: pd.DataFrame) -> pd.Series: if df_scope is None or df_scope.empty: return pd.Series(dtype=float) s = df_scope.copy() w = pd.to_numeric(s.get("pref_sample_size", pd.Series(1, index=s.index)), errors="coerce").fillna(1.0) w_sum = float(np.nansum(w)) if np.isfinite(np.nansum(w)) and np.nansum(w) > 0 else 1.0 w_norm = w / w_sum def wmean(col): v = pd.to_numeric(s.get(col, pd.Series(np.nan, index=s.index)), errors="coerce") if v.notna().any(): return float(np.nansum(v * w_norm)) return np.nan def combine_ci(lo_col, hi_col, mean_col): m = pd.to_numeric(s.get(mean_col, pd.Series(np.nan, index=s.index)), errors="coerce") lo = pd.to_numeric(s.get(lo_col, pd.Series(np.nan, index=s.index)), errors="coerce") hi = pd.to_numeric(s.get(hi_col, pd.Series(np.nan, index=s.index)), errors="coerce") if not (m.notna().any() and lo.notna().any() and hi.notna().any()): return np.nan, np.nan m_bar = float(np.nansum(m * w_norm)) sd = (hi - lo) / (2 * 1.96) sd = pd.to_numeric(sd, errors="coerce") var = np.nansum(w_norm * (sd**2 + (m - m_bar)**2)) sd_c = float(np.sqrt(var)) if np.isfinite(var) else np.nan if not np.isfinite(sd_c): return np.nan, np.nan return (m_bar - 1.96 * sd_c), (m_bar + 1.96 * sd_c) pref_m = wmean("pref_success_rate") rec_m = wmean("rec_success_rate") intent_m = wmean("intent_success_rate") buy_m = wmean("buy_success_rate") pref_lo, pref_hi = combine_ci("pref_ci_lower", "pref_ci_upper", "pref_success_rate") rec_lo, rec_hi = combine_ci("rec_ci_lower", "rec_ci_upper", "rec_success_rate") intent_lo, intent_hi = combine_ci("intent_ci_lower", "intent_ci_upper", "intent_success_rate") buy_lo, buy_hi = combine_ci("buy_ci_lower", "buy_ci_upper", "buy_success_rate") d1 = wmean("bayesian_dropout_pref_to_rec") d2 = wmean("bayesian_dropout_rec_to_intent") d3 = wmean("bayesian_dropout_intent_to_buy") full = wmean("bayesian_full_conversion") pref_snr = wmean("pref_snr"); rec_snr = wmean("rec_snr") intent_snr = wmean("intent_snr"); buy_snr = wmean("buy_snr") pref_lift = wmean("pref_lift_vs_galaxy"); rec_lift = wmean("rec_lift_vs_galaxy") intent_lift = wmean("intent_lift_vs_galaxy"); buy_lift = wmean("buy_lift_vs_galaxy") out = { "pref_sample_size": float(np.nansum(w)), "pref_success_rate": pref_m, "pref_ci_lower": pref_lo, "pref_ci_upper": pref_hi, "rec_success_rate": rec_m, "rec_ci_lower": rec_lo, "rec_ci_upper": rec_hi, "intent_success_rate": intent_m, "intent_ci_lower": intent_lo, "intent_ci_upper": intent_hi, "buy_success_rate": buy_m, "buy_ci_lower": buy_lo, "buy_ci_upper": buy_hi, "bayesian_dropout_pref_to_rec": d1, "bayesian_dropout_rec_to_intent": d2, "bayesian_dropout_intent_to_buy": d3, "bayesian_full_conversion": full, "pref_snr": pref_snr, "rec_snr": rec_snr, "intent_snr": intent_snr, "buy_snr": buy_snr, "pref_lift_vs_galaxy": pref_lift, "rec_lift_vs_galaxy": rec_lift, "intent_lift_vs_galaxy": intent_lift, "buy_lift_vs_galaxy": buy_lift, } return pd.Series(out) # ===================== 차트 ===================== def _empty_fig(msg="Load data first", height=360, hide_axes=False): fig = go.Figure() fig.add_annotation(text=msg, x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False) fig.update_layout( height=height, margin=dict(l=10, r=10, t=30, b=10), paper_bgcolor="#ffffff", plot_bgcolor="#ffffff", uirevision="keep", ) fig = apply_dense_grid(fig) # 기존 스타일 유지 if hide_axes: # Sankey 등 카테시안 축이 불필요한 경우 fig.update_xaxes(visible=False, showgrid=False, zeroline=False) fig.update_yaxes(visible=False, showgrid=False, zeroline=False) return fig def hex_to_rgba(hex_color: str, a: float | None = None) -> str: s = hex_color.strip().lstrip("#") if len(s) in (3, 4): s = "".join(ch * 2 for ch in s) if len(s) == 6: r = int(s[0:2], 16); g = int(s[2:4], 16); b = int(s[4:6], 16) alpha = 1.0 if a is None else float(a) elif len(s) == 8: r = int(s[0:2], 16); g = int(s[2:4], 16); b = int(s[4:6], 16) hex_alpha = int(s[6:8], 16) / 255.0 alpha = hex_alpha if a is None else float(a) else: raise ValueError("hex must be #RGB, #RRGGBB, or #RRGGBBAA") alpha = max(0.0, min(1.0, alpha)) return f"rgba({r},{g},{b},{alpha:.3g})" def _normalize_stage_label(v: str) -> str | None: if v is None: return None s = str(v).strip().lower() s = re.sub(r'[\s\-\_]+', ' ', s) # 공백/-,_ 정리 joined = s.replace(' ', '') # 전체 if any(k in (s, joined) for k in [ "overall","total","all","전체","전체사용자","모든사용자","allusers","all user","all-user" ]): return "전체" # 미선호(비선호/탈락/드랍/No preference 등) if any(k in (s, joined) for k in [ "미선호","비선호","선호아님","선호 아님", "nopref","no preference","dislike","탈락","drop","dropped" ]): return "미선호" # 구매의향(의향/의도/의사/intent 계열) if ("의향" in s) or ("의도" in s) or ("의사" in s) \ or ("intent" in s) or ("intention" in s) \ or ("purchaseintent" in joined) or ("purchase-intent" in s): return "구매의향" # 구매(실제구매/구매완료/구매확정/구입/결제/매출/buy/purchase) if ("구매" in s) or ("구입" in s) or ("결제" in s) or ("결재" in s) or ("매출" in s) \ or (s == "buy") or ("purchase" in s): return "구매" # 선호 if ("선호" in s) or ("호감" in s) or ("preference" in s) or (s == "pref"): return "선호" # 추천 if (s == "rec") or ("recommend" in s) or ("추천" in s): return "추천" return None # ==== STAGES & ORDER (기존 것을 교체) ==== STAGES = ["전체", "미선호", "선호", "추천", "구매의향", "구매"] ORDER = {v:i for i,v in enumerate(STAGES)} # 색상 하나 추가(은은한 회색 계열 권장) COL_STAGE_DROP = "#CBD5E1" # 미선호 def _group_forward_flows(df_sankey, seg, mod, loy): if df_sankey is None or df_sankey.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"]) seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy) s = df_sankey.copy() m = pd.Series(True, index=s.index) if "segment" in s and seg!="ALL": m &= (s["segment"].astype(str)==seg) if "model" in s and mod!="ALL": m &= (s["model"].astype(str)==mod) if "loyalty" in s and loy!="ALL": m &= (s["loyalty"].astype(str)==loy) s = s[m].copy() if s.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"]) alias = { "all":"전체","ALL":"전체","전체":"전체", "pref":"선호","preference":"선호","선호도":"선호", "rec":"추천","recommend":"추천","추천도":"추천", "intent":"구매의향","intention":"구매의향","구매의도":"구매의향", "purchase":"구매","buy":"구매","실제구매":"구매" } s["from_stage"] = s.get("from_stage", s.get("from", s.get("source"))).astype(str).str.strip().replace(alias) s["to_stage"] = s.get("to_stage", s.get("to", s.get("target"))).astype(str).str.strip().replace(alias) # 🔑 count 별칭 허용 cnt_cands = ["bayesian_flow_count","count","value","weight","n","freq"] cnt_col = next((c for c in cnt_cands if c in s.columns), None) if cnt_col is None: return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"]) s[cnt_col] = pd.to_numeric(s[cnt_col], errors="coerce") s = s[np.isfinite(s[cnt_col]) & (s[cnt_col]>0)] s = s[s["from_stage"].isin(STAGES) & s["to_stage"].isin(STAGES)] s = s[s.apply(lambda r: ORDER[r["from_stage"]] < ORDER[r["to_stage"]], axis=1)] if s.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","flow_phi"]) g = (s.groupby(["from_stage","to_stage"], as_index=False)[cnt_col] .sum().rename(columns={cnt_col:"count"})) # [유입 없는 단계 보강] 전체→단계 링크 자동 추가 pairs = set(zip(g["from_stage"], g["to_stage"])) def _has_incoming(stage): k = ORDER[stage] return any((prev, stage) in pairs for prev in STAGES[:k]) add_rows = [] for st in STAGES[1:]: if not _has_incoming(st): out_sum = float(g.loc[g["from_stage"] == st, "count"].sum()) if out_sum > 0: add_rows.append({"from_stage": "전체", "to_stage": st, "count": out_sum}) if add_rows: g = pd.concat([g, pd.DataFrame(add_rows)], ignore_index=True) # φ 스케일 적용 k = _flow_scale(seg, mod, loy) g["flow_phi"] = g["count"].astype(float) * k return g # ===== Sankey 내부용 테이블 빌더(간접 포함, 구매로 접기 옵션) ===== # 노드(베이지) & 링크(회색) 팔레트 COL_STAGE_OVERALL = "#B68E5C" # 전체 COL_STAGE_PREF = "#C6955E" # 선호 COL_STAGE_REC = "#D5A86D" # 추천 COL_STAGE_INTENT = "#BE8F4E" # 의향 COL_STAGE_BUY = "#A97F45" # 구매 COL_LINK_DIRECT = "#4B5563" # 직접(짙은 회색) COL_LINK_INDIRECT = "#D1D5DB" # 간접(연한 회색) def _sankey_build_table(df_sankey, seg="ALL", mod="ALL", loy="ALL", collapse_to_buy=True, collapse_from=("선호","추천","구매의향")) -> pd.DataFrame: if df_sankey is None or df_sankey.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]) s = df_sankey.copy() # --- [NEW] 호환 가드: 열 별칭을 표준 이름으로 통일 --- # 1) from/to 별칭 → from_stage/to_stage from_col = next((c for c in ["from_stage","from","source","src"] if c in s.columns), None) to_col = next((c for c in ["to_stage","to","target","dst"] if c in s.columns), None) if from_col and from_col != "from_stage": s = s.rename(columns={from_col: "from_stage"}) if to_col and to_col != "to_stage": s = s.rename(columns={to_col: "to_stage"}) # 필수 열 없으면 빈 테이블 반환 (안전 가드) if "from_stage" not in s.columns or "to_stage" not in s.columns: return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]) # 2) 수치 열 별칭 → bayesian_flow_count alt_cnt = next((c for c in ["bayesian_flow_count","count","value","flow","weight","n","freq"] if c in s.columns), None) if alt_cnt and alt_cnt != "bayesian_flow_count": s = s.rename(columns={alt_cnt: "bayesian_flow_count"}) # 필터 for col, val in (("segment", seg), ("model", mod), ("loyalty", loy)): if col in s.columns and str(val) != "ALL": s = s[s[col].astype(str) == str(val)] if s.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]) # 라벨 정규화 → 순방향만 s["from_stage"] = s.get("from_stage", s.get("from", s.get("source"))).map(_normalize_stage_label) s["to_stage"] = s.get("to_stage", s.get("to", s.get("target"))).map(_normalize_stage_label) s = s.dropna(subset=["from_stage","to_stage"]) s = s[s["from_stage"].isin(STAGES) & s["to_stage"].isin(STAGES)] s = s[s.apply(lambda r: ORDER[r["from_stage"]] < ORDER[r["to_stage"]], axis=1)] # 🔑 count 컬럼 별칭 허용 (원천 시트/캐시 시트 모두 커버) cnt_cands = ["bayesian_flow_count", "count", "value", "weight", "n", "freq"] cnt_col = next((c for c in cnt_cands if c in s.columns), None) if cnt_col is None: return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]) s[cnt_col] = pd.to_numeric(s[cnt_col], errors="coerce") s = s[np.isfinite(s[cnt_col]) & (s[cnt_col] > 0)] if s.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]) # 기본 집계 g = (s.groupby(["from_stage","to_stage"], as_index=False)[cnt_col] .sum().rename(columns={cnt_col:"count"})) # 유입 없는 단계 보강(전체→단계) pairs = set(zip(g["from_stage"], g["to_stage"])) def _has_incoming(stage): k = ORDER[stage] return any((prev, stage) in pairs for prev in STAGES[:k]) add_rows = [] for st in STAGES[1:]: if not _has_incoming(st): out_sum = float(g.loc[g["from_stage"]==st, "count"].sum()) if out_sum > 0: add_rows.append({"from_stage":"전체","to_stage":st,"count":out_sum}) if add_rows: g = pd.concat([g, pd.DataFrame(add_rows)], ignore_index=True) # (옵션) 구매로 접은 간접 링크 추가: 선호/추천/구매의향 → 구매 if collapse_to_buy: buy_in = float(pd.to_numeric(g.loc[g["to_stage"]=="구매","count"], errors="coerce").fillna(0).sum()) if buy_in > 0: exist = set(zip(g["from_stage"], g["to_stage"])) extra = [] for st in collapse_from: if st in ORDER and (st, "구매") not in exist and ORDER[st] < ORDER["구매"]: extra.append({"from_stage": st, "to_stage": "구매", "count": buy_in}) if extra: g = pd.concat([g, pd.DataFrame(extra)], ignore_index=True) # 메타 칼럼 kphi = _flow_scale(seg, mod, loy) # 비공개 스케일 g["flow_phi"] = g["count"].astype(float) * kphi g["dist"] = g["to_stage"].map(ORDER) - g["from_stage"].map(ORDER) g["kind"] = np.where(g["dist"]==1, "직접", "간접") g["to_buy"] = (g["to_stage"] == "구매") cols = ["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"] return g[cols].sort_values(["dist","from_stage","to_stage"]).reset_index(drop=True) # ====== Sankey 색/스테이지 ====== STAGES = ["전체","미선호","선호","추천","구매의향","구매"] ORDER = {v:i for i,v in enumerate(STAGES)} COL_STAGE_OVERALL = "#B68E5C" COL_STAGE_NONPREF = "#9CA3AF" # ← 미선호(회색) COL_STAGE_PREF = "#C6955E" COL_STAGE_REC = "#D5A86D" COL_STAGE_INTENT = "#BE8F4E" COL_STAGE_BUY = "#A97F45" COL_LINK_DIRECT = "#4B5563" # 짙은 회색 (직접) COL_LINK_INDIRECT = "#D1D5DB" # 연한 회색 (간접) # ───────────────────────────────────────────────────────── # (유틸) 간접 "→구매" 접기 보강 def add_collapsed_to_buy(tbl: pd.DataFrame, add_from=("선호","추천","구매의향")) -> pd.DataFrame: if tbl is None or tbl.empty: return tbl # ── 기준 단계/순서(미선호 포함 6단계) stages = ["전체","미선호","선호","추천","구매의향","구매"] order = {v:i for i,v in enumerate(stages)} t = tbl.copy() # ── 구매 유입 총량 buy_in = float(pd.to_numeric(t.loc[t["to_stage"]=="구매","count"], errors="coerce").fillna(0).sum()) # ── φ 스케일(k) 추정 kphi = 1.0 if "flow_phi" in t.columns and "count" in t.columns: r = pd.to_numeric(t["flow_phi"], errors="coerce") / pd.to_numeric(t["count"], errors="coerce") r = r.replace([np.inf,-np.inf], np.nan).dropna() if not r.empty: kphi = float(np.median(r)) # ── 그룹 메타(snapshot): 단일값이면 그 값, 아니면 "ALL" meta_cols = [c for c in ["segment","model","loyalty","level"] if c in t.columns] meta = {c: (t[c].dropna().iloc[0] if t[c].nunique(dropna=True)==1 else "ALL") for c in meta_cols} extra = [] for s in add_from: if s not in order or order[s] >= order["구매"]: continue # 이미 존재하면 중복 추가 금지 if ((t["from_stage"]==s) & (t["to_stage"]=="구매")).any(): continue row = { "from_stage": s, "to_stage": "구매", "count": buy_in, "dist": order["구매"] - order[s], "kind": ("간접" if (order["구매"] - order[s]) > 1 else "직접"), "to_buy": True, "flow_phi": buy_in * kphi } # ★ 메타 동봉 for c, v in meta.items(): row[c] = v extra.append(row) if extra: t = pd.concat([t, pd.DataFrame(extra)], ignore_index=True) return t.sort_values(["dist","from_stage","to_stage"]).reset_index(drop=True) # ───────────────────────────────────────────────────────── # ⬇⬇ 핵심 수정: 라벨을 먼저 느슨한 별칭으로 치환 후, 정규화 함수에 태움 def _normalize_stage_soft(series: pd.Series) -> pd.Series: if series.empty: return series s = series.astype(str).str.strip() # 1) 강제 별칭(정확치환) — 의향/의도/의사/intent, 구매완료/실제구매, 전체사용자 등 alias_exact = { # 전체 "전체사용자": "전체", "모든 사용자": "전체", "all": "전체", "ALL": "전체", # 선호 "선호도": "선호", "선호도높음": "선호", "호감도": "선호", "호감도높음": "선호", # 추천 "추천도": "추천", "추천도높음": "추천", # 의향/의도/의사/intent (다양형) "구매의향": "구매의향", "구매 의향": "구매의향", "구매의향높음": "구매의향", "구매의향 높음": "구매의향", "구매의도": "구매의향", "구매 의도": "구매의향", "구매의도높음": "구매의향", "구매의도 높음": "구매의향", "구매의사": "구매의향", "의사 있음": "구매의향", "intent": "구매의향", "Intent": "구매의향", "Intention": "구매의향", "Purchase Intent": "구매의향", "PURCHASE_INTENT": "구매의향", # 구매 "실제구매": "구매", "구매 확정": "구매", "구매확정": "구매", "구매 완료": "구매", "구매완료": "구매", "결제": "구매", "결재": "구매", "매출": "구매", #미선호 "미선호": "미선호", "비선호": "미선호", "선호 아님": "미선호", "탈락": "미선호", } s = s.replace(alias_exact) # 2) 토큰/부분일치 기반 정규화(전역 함수가 있으면 재사용) def _norm_one(x: str) -> str | None: try: return _normalize_stage_label(x) # 전역 정의 존재 시 활용 except Exception: pass # 폴백: 부분일치 xl = x.lower().replace(" ", "") if any(k in xl for k in ["all","전체"]): return "전체" if any(k in xl for k in ["선호","호감"]): return "선호" if "추천" in xl or "rec" in xl: return "추천" if any(k in xl for k in ["의향","의도","의사","intent"]): return "구매의향" if any(k in xl for k in ["구매","구입","결제","결재","완료","확정","매출","purch","buy"]): return "구매" if any(k in xl for k in ["미선호","비선호","선호아님","nopref","npreference","탈락","drop"]): return "미선호" return None return s.map(_norm_one) # 파일 상단 어딘가(상수들 근처)에 추가 LVL_PRIORITY = [ "모델×세그×충성도","세그×모델","모델×충성도","세그×충성도", "모델","세그먼트","충성도","전체" ] def _sanitize_sankey_table( tbl: pd.DataFrame, seg="ALL", mod="ALL", loy="ALL", enforce_single_level: bool = True, drop_overall_if_mixed: bool = True ) -> pd.DataFrame: cols = ["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"] if tbl is None or tbl.empty: return pd.DataFrame(columns=cols) t = tbl.copy() # (1) 선택값 필터 (있을 때만) for col, val in (("segment", seg), ("model", mod), ("loyalty", loy)): if col in t.columns and str(val) != "ALL": t = t[t[col].astype(str).str.strip() == str(val)] if t.empty: return pd.DataFrame(columns=cols) # (2) 레벨 단일화 (혼입 방지) + 과잉 드랍 완화 original = t if enforce_single_level and "level" in t.columns: picked = None for lv in LVL_PRIORITY: cand = t[t["level"].astype(str) == lv] if not cand.empty: picked = cand; break if picked is not None: t = picked # 혼합이면 '전체'만 제거 (단, 전부 비면 되돌림) if ("level" in t.columns) and (t["level"].astype(str).nunique() > 1) and drop_overall_if_mixed: t2 = t[t["level"].astype(str) != "전체"] if not t2.empty: t = t2 if t.empty: # 과잉 필터/드랍으로 비었으면 원본으로 되돌려 계속 t = original.copy() # (3) 컬럼 별칭 alias = { "from_stage": ["from_stage","from","source","src"], "to_stage": ["to_stage","to","target","dst"], "count": ["count","bayesian_flow_count","flow","value","weight","n","freq"], } def pick(name): keys = {str(c).strip().lower(): c for c in t.columns} for a in alias[name]: if a in keys: return keys[a] return None c_from = pick("from_stage"); c_to = pick("to_stage"); c_cnt = pick("count") if not all([c_from, c_to, c_cnt]): return pd.DataFrame(columns=cols) t = t.rename(columns={c_from:"from_stage", c_to:"to_stage", c_cnt:"count"}) # (4) 라벨 정규화 + 순방향만 t["from_stage"] = _normalize_stage_soft(t["from_stage"]) t["to_stage"] = _normalize_stage_soft(t["to_stage"]) t = t.dropna(subset=["from_stage","to_stage"]) t = t[t["from_stage"].isin(STAGES) & t["to_stage"].isin(STAGES)] t = t[t.apply(lambda r: ORDER[r["from_stage"]] < ORDER[r["to_stage"]], axis=1)] # (5) 수치 변환 t["count"] = pd.to_numeric(t["count"], errors="coerce") t = t[np.isfinite(t["count"]) & (t["count"] > 0)] # (5-보강) 과도 필터로 비면 완화 모드: 단계 조건만 적용하고 수치만 보정 if t.empty: t = original.rename(columns={c_from:"from_stage", c_to:"to_stage", c_cnt:"count"}).copy() t["from_stage"] = _normalize_stage_soft(t["from_stage"]) t["to_stage"] = _normalize_stage_soft(t["to_stage"]) t = t.dropna(subset=["from_stage","to_stage"]) t = t[t["from_stage"].isin(STAGES) & t["to_stage"].isin(STAGES)] t["count"] = pd.to_numeric(t["count"], errors="coerce").fillna(0) t = t[t["count"] > 0] if t.empty: return pd.DataFrame(columns=cols) # (6) 메타 보강 t["dist"] = (t["to_stage"].map(ORDER) - t["from_stage"].map(ORDER)).astype(int) if "kind" not in t.columns: t["kind"] = np.where(t["dist"]==1, "직접", "간접") else: miss = ~t["kind"].astype(str).isin(["직접","간접"]) t.loc[miss,"kind"] = np.where(t.loc[miss,"dist"]==1, "직접","간접") t["to_buy"] = (t["to_stage"]=="구매") # (7) φ kphi = _flow_scale(seg, mod, loy) if "flow_phi" not in t.columns: t["flow_phi"] = t["count"].astype(float) * kphi else: t["flow_phi"] = pd.to_numeric(t["flow_phi"], errors="coerce") miss = ~np.isfinite(t["flow_phi"]) t.loc[miss, "flow_phi"] = t.loc[miss, "count"].astype(float) * kphi return t[cols].sort_values(["dist","from_stage","to_stage"]).reset_index(drop=True) def _sankey_from_master_row(row: pd.Series, seg, mod, loy) -> pd.DataFrame: n = _safe_int0(row.get("pref_sample_size")) if n <= 0: return pd.DataFrame(columns=[ "from_stage","to_stage","count","dist","kind","to_buy","flow_phi", "segment","model","loyalty" ]) def P(x): v = _safe_num(x) if not np.isfinite(v): return np.nan return v/100.0 if v > 1.5 else v # (A) 확률 안전화: NaN이면 0, 0~1로 클립 def P01(x): v = P(x) return np.nan if not np.isfinite(v) else float(min(1.0, max(0.0, v))) p_pref = P(row.get("pref_success_rate")) p_rec = P(row.get("rec_success_rate")) p_intent = P(row.get("intent_success_rate")) p_buy = P(row.get("buy_success_rate")) d1 = P(row.get("bayesian_dropout_pref_to_rec")) d2 = P(row.get("bayesian_dropout_rec_to_intent")) d3 = P(row.get("bayesian_dropout_intent_to_buy")) pref = n * (p_pref if np.isfinite(p_pref) else 0.0) rec = pref * (1 - d1) if np.isfinite(pref) and np.isfinite(d1) else n * (p_rec if np.isfinite(p_rec) else 0.0) intent = rec * (1 - d2) if np.isfinite(rec) and np.isfinite(d2) else n * (p_intent if np.isfinite(p_intent) else 0.0) buy = intent*(1 - d3) if np.isfinite(intent) and np.isfinite(d3) else n * (p_buy if np.isfinite(p_buy) else 0.0) drop0 = max(0.0, float(n) - float(pref)) rows = [ {"from_stage":"전체","to_stage":"미선호", "count": drop0}, {"from_stage":"전체","to_stage":"선호", "count": pref}, {"from_stage":"선호","to_stage":"추천", "count":max(0.0, rec)}, {"from_stage":"추천","to_stage":"구매의향", "count":max(0.0, intent)}, {"from_stage":"구매의향","to_stage":"구매", "count":max(0.0, buy)}, ] g = pd.DataFrame(rows).dropna() g["count"] = pd.to_numeric(g["count"], errors="coerce").fillna(0) g = g[g["count"] > 0] g["dist"] = g["to_stage"].map(ORDER) - g["from_stage"].map(ORDER) g["kind"] = np.where(g["dist"]==1, "직접", "간접") g["to_buy"] = (g["to_stage"]=="구매") kphi = _flow_scale(seg, mod, loy) g["flow_phi"] = g["count"].astype(float) * kphi g["segment"] = seg; g["model"] = mod; g["loyalty"] = loy return g[[ "from_stage","to_stage","count","dist","kind","to_buy","flow_phi", "segment","model","loyalty" ]] LEVELS_FOR_SANKEY = [ ("전체", []), ("세그먼트", ["segment"]), ("모델", ["model"]), ("충성도", ["loyalty"]), ("세그×모델", ["segment","model"]), ("세그×충성도", ["segment","loyalty"]), ("모델×충성도", ["model","loyalty"]), ("모델×세그×충성도", ["segment","model","loyalty"]), ] def build_sankey_cache_from_master(df_master: pd.DataFrame, collapse_to_buy=True, collapse_from=("선호","추천","구매의향")) -> pd.DataFrame: dfm = _ensure_key_cols(df_master).copy() out = [] for _lvl, keys in LEVELS_FOR_SANKEY: if not keys: seg, mod, loy = "ALL","ALL","ALL" row = compose_composite_row(dfm) if not row.empty: part = _sankey_from_master_row(row, seg, mod, loy) part["level"] = _lvl out.append(part) continue for vals, grp in dfm.groupby(keys, dropna=False): if not isinstance(vals, tuple): vals = (vals,) seg = vals[keys.index("segment")] if "segment" in keys else "ALL" mod = vals[keys.index("model")] if "model" in keys else "ALL" loy = vals[keys.index("loyalty")] if "loyalty" in keys else "ALL" row = compose_composite_row(grp) if row.empty: continue part = _sankey_from_master_row(row, seg, mod, loy) part["level"] = _lvl out.append(part) if not out: return pd.DataFrame(columns=[ "from_stage","to_stage","count","dist","kind","to_buy","flow_phi", "segment","model","loyalty","level" ]) full = pd.concat(out, ignore_index=True) if collapse_to_buy and not full.empty: full = (full.groupby(["level","segment","model","loyalty"], group_keys=False) .apply(lambda g: add_collapsed_to_buy(g, add_from=collapse_from)) .reset_index(drop=True)) return full def build_sankey_flow_table( df_or_tbl: pd.DataFrame | None, seg="ALL", mod="ALL", loy="ALL", collapse_to_buy=True, collapse_from=("선호","추천","구매의향") ): if df_or_tbl is None or df_or_tbl.empty: return pd.DataFrame(columns=["from_stage","to_stage","count","dist","kind","to_buy","flow_phi"]) s = df_or_tbl.copy() low = {str(c).strip().lower(): c for c in s.columns} looks_table = (("from_stage" in low and "to_stage" in low) and (("count" in low) or ("flow_phi" in low) or ("bayesian_flow_count" in low))) if looks_table: t = _sanitize_sankey_table( s, seg=seg, mod=mod, loy=loy, enforce_single_level=True, drop_overall_if_mixed=True ) if collapse_to_buy: t = add_collapsed_to_buy(t, add_from=collapse_from) return t return _sankey_build_table( s, seg=seg, mod=mod, loy=loy, collapse_to_buy=collapse_to_buy, collapse_from=collapse_from ) def sankey_figure( df_sankey: pd.DataFrame | None, seg, mod, loy, normalize=False, base_stage="전체", drag=False, show_kind=True, table_override: pd.DataFrame | None = None, ): # ── 0) 레거시/실수 호환: normalize 자리에 DataFrame이 들어온 경우 보정 # (스모크 테스트에서 positional로 override가 들어오는 패턴 방지) if isinstance(normalize, pd.DataFrame) and table_override is None: table_override = normalize normalize = False # 의미 없는 값이었으므로 안전 기본값 # ── 1) 테이블 소스 선택 if table_override is not None: # override가 raw여도 안전하게 정규화/보강 g = _sanitize_sankey_table(table_override, seg=seg, mod=mod, loy=loy) else: g = build_sankey_flow_table(df_sankey, seg=seg, mod=mod, loy=loy, collapse_to_buy=True) if g is None or g.empty: return _empty_fig("No Sankey data") # ── 2) 색/인덱스 준비 idx = {v:i for i,v in enumerate(STAGES)} STAGE_COLOR = { "전체": COL_STAGE_OVERALL, "미선호": COL_STAGE_NONPREF, "선호": COL_STAGE_PREF, "추천": COL_STAGE_REC, "구매의향": COL_STAGE_INTENT, "구매": COL_STAGE_BUY, } # ★ 여기 한 줄: Sankey에서 '전체'만 검정으로 STAGE_COLOR["전체"] = "#000000" # 또는 COL_BLACK node_colors = [STAGE_COLOR[s] for s in STAGES] # ✅ 노드 x 좌표도 6개로 xs = [0.00, 0.18, 0.34, 0.54, 0.74, 0.94] # ── 3) 그림 fig = go.Figure() fig.add_trace(go.Sankey( arrangement=("freeform" if drag else "fixed"), valueformat=",.1f", valuesuffix=" φ", node=dict( pad=14, thickness=18, label=STAGES, x=xs, y=[0.50]*len(STAGES), color=node_colors, line=dict(color="#9aa0a6", width=0.7), ), link=dict( source=[idx[a] for a in g["from_stage"]], target=[idx[b] for b in g["to_stage"]], value=g["flow_phi"].astype(float).tolist(), color=( np.where(g["kind"].astype(str)=="직접", hex_to_rgba(COL_LINK_DIRECT, 0.90), hex_to_rgba(COL_LINK_INDIRECT, 0.70)) if show_kind else [hex_to_rgba(COL_LINK_DIRECT, 0.85)] * len(g) ).tolist(), customdata=np.stack([ g["kind"].astype(str).to_numpy(), g["dist"].astype(int).to_numpy(), g["count"].astype(float).to_numpy(), ], axis=-1), hovertemplate=( "%{customdata[0]} | %{source.label} → %{target.label}" "
점프: %{customdata[1]}단계" "
실제유량: %{customdata[2]:,} (표시 %{value:,.1f} φ)" "" ), ), )) if show_kind: fig.add_trace(go.Scatter(x=[None], y=[None], mode="markers", marker=dict(size=10, color=hex_to_rgba(COL_LINK_DIRECT, 0.90)), name="직접(인접)")) fig.add_trace(go.Scatter(x=[None], y=[None], mode="markers", marker=dict(size=10, color=hex_to_rgba(COL_LINK_INDIRECT, 0.70)), name="간접(스킵)")) base = base_stage if base_stage in STAGES else "전체" tot_dir = float(g.loc[g["kind"]=="직접", "flow_phi"].sum()) tot_ind = float(g.loc[g["kind"]=="간접", "flow_phi"].sum()) # sankey_figure 끝부분 fig.update_layout( title=f"Journey Sankey · 모든 순방향(스킵 포함) · 기준={base}", height=390, showlegend=True, paper_bgcolor="#fff", plot_bgcolor="#fff", font=dict(color="#111"), margin=dict(l=10, r=10, t=32, b=64), ) fig.add_annotation( x=0, y=-0.20, xref="paper", yref="paper", showarrow=False, align="left", text=f"직접 {tot_dir:,.1f} φ · 간접 {tot_ind:,.1f} φ", font=dict(size=11, color="#444") ) # ↓↓↓ 이 네 줄은 반드시 함수 안쪽(같은 들여쓰기 레벨)이어야 함 fig = apply_dense_grid(fig) # 공통 스타일 # Sankey 전용: 축 감추기(카테시안 축 없음) fig.update_xaxes(visible=False, showgrid=False, zeroline=False, fixedrange=True) fig.update_yaxes(visible=False, showgrid=False, zeroline=False, fixedrange=True) return fig # ==== STAGE COLORS (전체→선호→추천→의향→구매) ==== COL_STAGE_OVERALL = "#C32C2C" # 빨 COL_STAGE_PREF = "#D24D3E" # 주 COL_STAGE_REC = "#DE937A" # 노 COL_STAGE_INTENT = "#D49442" # 베(골드톤) COL_STAGE_BUY = "#2B8E81" # 초록 ← 오타 수정 COL_STAGE_NONPREF = "#9CA3AF" # 미선호(회색) def matrix_funnel_figure(row, df_tm, seg, mod, loy, **kwargs): """ 누적 퍼널: - 퍼센트 문자열(예: '45.5%')/공백 섞여도 robust parsing - 값이 비어도(drop/success 둘 다 NaN) 최소 2단계 이상 강제로 그려줌 - 기본 높이 420 (FUNNEL_H가 있으면 그 값 따름) """ # --- Robust percent parser ------------------------------------------------- def _p(x): if x is None: return np.nan if isinstance(x, str): s = x.strip() if not s: return np.nan if s.endswith("%"): try: return float(s[:-1].strip()) / 100.0 except Exception: return np.nan try: return float(s) except Exception: return np.nan try: x = float(x) except Exception: return np.nan # 1.5 초과면 퍼센트로 간주(23 => 0.23) return x / 100.0 if x > 1.5 else x def _clip01(v): return np.nan if not np.isfinite(v) else float(min(1.0, max(0.0, v))) # --- 1) 드롭/최종율 확보 --------------------------------------------------- d1_raw, d2_raw, d3_raw, full_raw = drops_from_anywhere(row, df_tm, seg, mod, loy) d1, d2, d3 = map(_clip01, map(_p, (d1_raw, d2_raw, d3_raw))) full_conv = _p(full_raw) # --- 2) 단계별 성공률 ------------------------------------------------------ pref_sr = _p(row.get("pref_success_rate")) rec_sr = _p(row.get("rec_success_rate")) intent_sr = _p(row.get("intent_success_rate")) buy_sr = _p(row.get("buy_success_rate")) # --- 3) 누적율 계산(드롭우선, 결측 폴백) ----------------------------------- overall = 1.0 pref = pref_sr rec = pref * (1 - d1) if np.isfinite(pref) and np.isfinite(d1) else rec_sr intent = rec * (1 - d2) if np.isfinite(rec) and np.isfinite(d2) else intent_sr if np.isfinite(intent) and np.isfinite(d3): buy = intent * (1 - d3) elif np.isfinite(buy_sr): buy = buy_sr elif np.isfinite(full_conv): buy = full_conv else: buy = intent # 단조감소 보장 + [0,1] 클리핑 seq = [overall, _clip01(pref), _clip01(rec), _clip01(intent), _clip01(buy)] for i in range(1, len(seq)): if np.isfinite(seq[i]) and np.isfinite(seq[i-1]) and seq[i] > seq[i-1]: seq[i] = seq[i-1] overall, pref, rec, intent, buy = seq # --- 4) 라벨/값 구성(비어도 항상 그리기) ----------------------------------- labels, values = ["전체"], [overall] if np.isfinite(pref): labels.append("선호"); values.append(pref) if np.isfinite(rec): labels.append("추천"); values.append(rec) if np.isfinite(intent): labels.append("구매의향"); values.append(intent) if np.isfinite(buy): labels.append("구매"); values.append(buy) if len(labels) <= 1: # 드롭률 기반으로 최소 2단계라도 구성 v = [1.0] if np.isfinite(d1): v.append(v[-1]*(1-d1)) if np.isfinite(d2): v.append(v[-1]*(1-d2)) if np.isfinite(d3): v.append(v[-1]*(1-d3)) if len(v) == 1: est = _clip01(buy_sr if np.isfinite(buy_sr) else full_conv) v.append(0.0 if not np.isfinite(est) else est) names = ["전체","선호","추천","구매의향","구매"][:len(v)] labels, values = names, v txtpos = ["inside" if v >= 0.07 else "outside" for v in values] color_map = { "전체": hex_to_rgba(COL_STAGE_OVERALL, 0.85), "선호": hex_to_rgba(COL_STAGE_PREF, 0.85), "추천": hex_to_rgba(COL_STAGE_REC, 0.85), "구매의향": hex_to_rgba(COL_STAGE_INTENT, 0.85), "구매": hex_to_rgba(COL_STAGE_BUY, 0.85), } colors = [color_map.get(l, hex_to_rgba(COL_GRAY, 0.85)) for l in labels] fig = go.Figure(go.Funnel( y=labels, x=values, name="누적율", customdata=values, textinfo="none", texttemplate="%{customdata:.1%}", textposition=txtpos, hovertemplate="%{label}: %{customdata:.1%}", marker=dict(color=colors, line=dict(width=0.6, color="rgba(0,0,0,0.25)")), connector=dict(line=dict(color="rgba(0,0,0,0.25)", width=0.6)), )) # — 높이 확장 & 여백 다이어트 fig.update_layout( title="Funnel (누적율)", height=FUNNEL_H if 'FUNNEL_H' in globals() else 420, margin=dict(l=6, r=6, t=26, b=14), paper_bgcolor="#ffffff", plot_bgcolor="#ffffff", ) fig.update_xaxes(dtick=_auto_dtick(1.0), tickformat=".0%") return apply_dense_grid(fig, x_prob=True) def survival_curve_figure(row, df_tm, seg, mod, loy): d1, d2, d3, _ = drops_from_anywhere(row, df_tm, seg, mod, loy) vals = [1.0] if np.isfinite(d1): vals.append(vals[-1]*(1-d1)) if np.isfinite(d2): vals.append(vals[-1]*(1-d2)) if np.isfinite(d3): vals.append(vals[-1]*(1-d3)) if len(vals) == 1: return _empty_fig("No Survival data") stages = ["Start","선호","추천","구매의향","구매"][:len(vals)] xs = list(range(len(vals))) fig = go.Figure() fig.add_trace(go.Scatter( x=xs, y=vals, mode="lines+markers", line=dict(width=3, color=COL_GRAY), marker=dict(color=COL_GREEN_LITE), hovertemplate="단계=%{text}
생존=%{y:.1%}", text=stages, name="생존확률" )) drops = [d1,d2,d3] for i, dv in enumerate(drops, start=1): if i < len(vals) and np.isfinite(dv): fig.add_annotation(x=i-0.5, y=(vals[i-1]+vals[i])/2, text=f"실패 {dv:.1%}", showarrow=False, font=dict(size=11, color=COL_ORANGE)) fig.update_layout(height=320, title="스테이지 생존 커브", xaxis=dict(tickmode="array", tickvals=xs, ticktext=stages), yaxis=dict(range=[0,1], tickformat=".1%")) return apply_dense_grid(fig, y_prob=True) def waterfall_figure(row, df_tm, seg, mod, loy): d1, d2, d3, full = drops_from_anywhere(row, df_tm, seg, mod, loy) def _as_prob(p): p = _safe_num(p) if not np.isfinite(p): return np.nan return p/100.0 if p > 1.5 else p d1, d2, d3 = map(_as_prob, [d1, d2, d3]) buy_sr = _as_prob(row.get("buy_success_rate")) intent = _as_prob(row.get("intent_success_rate")) full_in = _as_prob(full) # 최종 구매율 보정 full = full_in if not np.isfinite(full): if np.isfinite(buy_sr): full = buy_sr elif np.isfinite(intent) and np.isfinite(d3): full = intent * (1.0 - d3) elif all(np.isfinite([d1, d2, d3])): full = (1.0 - d1) * (1.0 - d2) * (1.0 - d3) # 절대 드롭 if all(np.isfinite([d1, d2, d3])): drop1 = 1.0 * d1 drop2 = (1.0 - d1) * d2 drop3 = (1.0 - d1) * (1.0 - d2) * d3 else: drop1 = d1 if np.isfinite(d1) else 0.0 drop2 = d2 if np.isfinite(d2) else 0.0 drop3 = d3 if np.isfinite(d3) else 0.0 # 최종율 미지정이면 드롭 합으로 보정 final_rate = float(full) if np.isfinite(full) else max(0.0, 1.0 - drop1 - drop2 - drop3) if not any(np.isfinite(v) for v in [drop1, drop2, drop3]) and not np.isfinite(final_rate): return _empty_fig("No Waterfall data") def _fmt_drop(v): return "" if not np.isfinite(v) else (f"-{v:.1%}" if v >= 1e-6 else "-0.0%") # ★ 여기부터: '전체 100%' 막대 제거 버전 measures = ["relative", "relative", "relative", "total"] x = ["선호→추천
Drop", "추천→구매의향
Drop", "구매의향→구매
Drop", "구매율"] y = [-drop1, -drop2, -drop3, final_rate] texts = [_fmt_drop(drop1), _fmt_drop(drop2), _fmt_drop(drop3), f"{final_rate:.1%}"] positions = ["inside", "inside", "inside", "outside"] fig = go.Figure(go.Waterfall( measure=measures, x=x, y=y, name="drop-off", text=texts, textposition=positions, insidetextfont=dict(color="white"), outsidetextfont=dict(color="#111"), decreasing={"marker":{"color": COL_GRAY_MED}}, increasing={"marker":{"color": COL_GRAY_MED}}, totals={"marker":{"color": COL_BLUE_DEEP}}, connector={"line":{"color":"rgba(0,0,0,0.25)", "width":0.6}}, cliponaxis=False, constraintext="both" )) fig.update_layout( height=320, title="드롭오프 워터폴", yaxis_tickformat=".1%", xaxis=dict(tickangle=0, automargin=True), margin=dict(l=8, r=8, t=30, b=14), # 좌우 여백 살짝 더 줄임 uniformtext_minsize=9, uniformtext_mode="hide", ) # 공통 스타일 먼저 fig = apply_dense_grid(fig, y_prob=True) # ── 워터폴 가독성 튜닝(Apply 후 다시 덮어쓰기) fig.update_layout( showlegend=False, # 범례 숨겨 상단 공간 확보 bargap=0.15, # 바 사이 간격 축소 → 막대가 두툼하게 margin=dict(l=8, r=8, t=30, b=14), ) fig.update_xaxes(automargin=True) return fig def stacked_funnel_figure(row): stages = [("선호", "pref_success_rate"), ("추천", "rec_success_rate"), ("구매의향", "intent_success_rate"), ("구매", "buy_success_rate")] succ = []; fail = []; labs=[] for lab, col in stages: p = _safe_num(row.get(col)) if np.isfinite(p): succ.append(p); fail.append(1-p); labs.append(lab) if not succ: return _empty_fig("No Funnel data") fig = go.Figure() fig.add_bar(x=labs, y=succ, name="성공", text=[f"{v:.1%}" for v in succ], textposition="inside", marker_color=COL_GREEN_LITE) fig.add_bar(x=labs, y=fail, name="실패", text=[f"{v:.1%}" for v in fail], textposition="inside", marker_color=COL_RED) fig.update_layout(barmode="stack", yaxis=dict(range=[0,1], tickformat=".1%"), height=320, title="100% 스택 퍼널 (성공/실패)") return apply_dense_grid(fig, y_prob=True) def forest_figure(df_scope: pd.DataFrame): if df_scope is None or df_scope.empty: return _empty_fig("No Forest data") if not {"model", "segment"}.issubset(set(df_scope.columns)): return _empty_fig("Need 'model' and 'segment'") s = df_scope.copy() # ----- 1) 사용할 단계(성공률) 선택: buy → intent → rec → pref → success_rate → rate stage_order = [ ("buy", "buy_success_rate"), ("intent", "intent_success_rate"), ("rec", "rec_success_rate"), ("pref", "pref_success_rate"), ("", "success_rate"), ("", "rate"), ] stage = "" rate_col = None for st, col in stage_order: if col in s.columns: stage, rate_col = st, col break if rate_col is None: return _empty_fig("No rate column") # ----- 2) 표본(n) 컬럼 찾기(단계별 우선, 없으면 일반 표본명으로 폴백) def _find_n_col(stage_name: str) -> str | None: cands = [] if stage_name: cands += [f"{stage_name}_sample_size", f"{stage_name}_n", f"{stage_name}_total"] cands += ["sample_size", "n", "N", "total", "count", "nobs", "베이스수", "표본수", "pref_sample_size"] for c in cands: if c in s.columns: return c return None n_col = _find_n_col(stage) if n_col is None: return _empty_fig("No sample size column") # ----- 3) 숫자화 + 비율 정규화 s[rate_col] = pd.to_numeric(s[rate_col], errors="coerce") s[n_col] = pd.to_numeric(s[n_col], errors="coerce") s = s.dropna(subset=[rate_col, n_col]) if s.empty: return _empty_fig("No Forest values") r = np.where(s[rate_col] > 1.5, s[rate_col] / 100.0, s[rate_col]) # % → 비율 r = np.clip(r, 0.0, 1.0) n = np.clip(s[n_col].to_numpy().astype(float), 0.0, np.inf) k = np.clip(np.round(r * n), 0.0, n) # 성공 수 추정 # ----- 4) 모델 단위로 집계(중복 y축 제거) agg = (pd.DataFrame({ "model": s["model"].astype(str), "segment": s["segment"].astype(str), "k": k, "n": n }) .groupby("model", as_index=False) .agg(k=("k","sum"), n=("n","sum"), seg=("segment", lambda x: x.iloc[0]))) if agg.empty or not np.isfinite(agg["n"]).any(): return _empty_fig("No Forest values") # ----- 5) Jeffreys 95% CI alpha = 0.05 try: from scipy.stats import beta as _beta agg["p"] = (agg["k"] + 0.5) / (agg["n"] + 1.0) agg["lo"] = _beta.ppf(alpha/2, agg["k"] + 0.5, agg["n"] - agg["k"] + 0.5) agg["hi"] = _beta.ppf(1 - alpha/2, agg["k"] + 0.5, agg["n"] - agg["k"] + 0.5) except Exception: try: from statsmodels.stats.proportion import proportion_confint agg["p"] = (agg["k"] + 0.5) / (agg["n"] + 1.0) lo, hi = proportion_confint(agg["k"], agg["n"], alpha=alpha, method="beta") agg["lo"], agg["hi"] = lo, hi except Exception: # Wilson 폴백 z = 1.959963984540054 p = agg["k"] / agg["n"] denom = 1 + z*z/agg["n"] center = (p + z*z/(2*agg["n"])) / denom half = z*np.sqrt((p*(1-p) + z*z/(4*agg["n"])) / agg["n"]) / denom agg["p"] = p agg["lo"] = np.maximum(0.0, center - half) agg["hi"] = np.minimum(1.0, center + half) use = agg.sort_values("p").reset_index(drop=True) # ----- 6) 색(모델의 우세 세그먼트) 지정 dom_seg = _model_dominant_segment(df_scope) mapped_seg = use["model"].map(dom_seg).fillna(use["seg"]) colors = mapped_seg.apply(_tier_color_for_segment).tolist() err_plus = (use["hi"] - use["p"]).to_numpy() err_minus = (use["p"] - use["lo"]).to_numpy() # ----- 7) 플롯 fig = go.Figure() fig.add_trace(go.Scatter( x=use["p"].astype(float), y=use["model"].astype(str), mode="markers", name="모델", # ← trace 이름 지정 (trace 0 제거) hovertemplate="%{y}: %{x:.1%}", marker=dict(size=10, color=colors, line=dict(color=COL_BLACK, width=1.6)), )) fig.update_traces(error_x=dict( type="data", symmetric=False, array=err_plus, arrayminus=err_minus, color=COL_BLACK, thickness=1.2, width=3 )) add_vline_safe(fig, 0.5, line_dash="dot", line_color=COL_BLACK, opacity=0.4) fig.update_layout( height=320, title="포레스트 플롯 (모델 비교) — 95% CI", xaxis=dict(range=[0, 1], dtick=0.1, tickformat=".0%", title="성공률"), margin=dict(l=10, r=10, t=54, b=18), showlegend=False, ) fig = apply_dense_grid(fig, x_prob=True) fig.update_layout(margin=dict(l=10, r=10, t=78, b=24)) # 상단 여백 키움 fig.update_yaxes(domain=[0.12, 1.00]) # 위쪽 12% 비워서 아래로 내림 return fig def compare_distribution_figure(df_master, seg, mod, loy, stage_label): if df_master is None or df_master.empty: return _empty_fig("No Ranking data") seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy) stage2lift = { "선호": "pref_lift_vs_galaxy", "추천": "rec_lift_vs_galaxy", "구매의향": "intent_lift_vs_galaxy", "구매": "buy_lift_vs_galaxy", } lift_col = stage2lift.get(stage_label, "buy_lift_vs_galaxy") if lift_col not in df_master.columns: return _empty_fig("No lift column") # 1) 비교 축 고르기 candidates = [] if mod == "ALL": candidates.append("model") if seg == "ALL": candidates.append("segment") if loy == "ALL": candidates.append("loyalty") key = None for k in candidates: if k in df_master.columns and df_master[k].astype(str).nunique(dropna=True) > 1: key = k break if key is None: # fallback: 유니크 가장 많은 축 avail = [c for c in ["model","segment","loyalty"] if c in df_master.columns] if not avail: return _empty_fig("No grouping key") key = max(avail, key=lambda c: df_master[c].astype(str).nunique(dropna=True)) # 2) 전체/선택 집계 overall = (df_master.groupby(key, as_index=False) .agg({lift_col: "mean"}) .rename(columns={lift_col: "전체"})) scope = df_master.copy() if seg != "ALL": scope = scope[scope["segment"].astype(str) == seg] if mod != "ALL": scope = scope[scope["model"].astype(str) == mod] if loy != "ALL": scope = scope[scope["loyalty"].astype(str) == loy] if scope.empty: return _empty_fig("No values") selected = (scope.groupby(key, as_index=False) .agg({lift_col: "mean"}) .rename(columns={lift_col: "선택"})) merged = pd.merge(overall, selected, on=key, how="outer") if merged.empty: return _empty_fig("No values") # 3) 정리: 키는 문자열로, 결측 수치만 0.0으로 merged[key] = merged[key].astype(str) for col in ["전체", "선택"]: if col in merged.columns: merged[col] = pd.to_numeric(merged[col], errors="coerce") merged[["전체","선택"]] = merged[["전체","선택"]].fillna(0.0) # 정렬 순서(선택 오름차순이 기본, 전부 0이면 전체 기준) if (merged["선택"] != 0).any(): order = merged.sort_values("선택", ascending=True)[key].tolist() else: order = merged.sort_values("전체", ascending=True)[key].tolist() base = merged.set_index(key).loc[order] # 4) 색상 vals_sel = base["선택"].to_numpy() if key == "model": dom_seg = _model_dominant_segment(df_master) bar_colors = [_tier_color_for_segment(dom_seg.get(k, "LowEnd")) for k in order] else: bar_colors = royg_color_for(vals_sel) # 5) 그림 fig = go.Figure() fig.add_trace(go.Bar( x=base["전체"], y=order, orientation="h", name="전체", marker_color="rgba(150,150,150,0.35)" )) fig.add_trace(go.Bar( x=vals_sel, y=order, orientation="h", name="선택", marker=dict(color=bar_colors, line=dict(color=COL_GRAY, width=0.5)), text=[f"{v:+.1f}" for v in vals_sel], textposition="outside" )) add_vline_safe(fig, 0, line_dash="dot", line_color=COL_GRAY) fig.update_layout( barmode="group", title=f"{stage_label} Lift ({key})", height=320, margin=dict(l=10, r=10, t=54, b=18), paper_bgcolor="#ffffff", plot_bgcolor="#ffffff" ) # 공통 스타일 먼저 fig = apply_dense_grid(fig) # 3) 위로 들러붙는 것 방지용으로 '위 여백+도메인' 덮어쓰기 fig.update_layout(margin=dict(l=10, r=10, t=68, b=28)) fig.update_yaxes(domain=[0.0, 0.86]) # 위쪽 14% 비워서 아래로 내림 # 4) 리턴 return fig def bubble_figure( df_scope: pd.DataFrame, lift_col: str, snr_col: str, label_top_n: int = 4, label_inside: bool = False, textfont_size: int = 11 ) -> go.Figure: # --- 가드 --- if df_scope is None or df_scope.empty: return _empty_fig("No Bubble data") if lift_col not in df_scope.columns or snr_col not in df_scope.columns: return _empty_fig("No Bubble data") s = df_scope.copy() s[lift_col] = pd.to_numeric(s[lift_col], errors="coerce") s[snr_col] = pd.to_numeric(s[snr_col], errors="coerce") s["pref_sample_size"] = pd.to_numeric( s.get("pref_sample_size", pd.Series(1, index=s.index)), errors="coerce" ).fillna(1.0) key = "model" if ("model" in s.columns and s["model"].notna().any()) else ( "segment" if "segment" in s.columns else None) if key is None: return _empty_fig("No Bubble key") need_cols = [key, lift_col, snr_col, "pref_sample_size"] if "segment" in s.columns and "segment" not in need_cols: need_cols.append("segment") use = s[need_cols].dropna(subset=[lift_col, snr_col]) if use.empty: return _empty_fig("No Bubble values") # ---- 집계 ---- if "segment" in use.columns: grp = (use.groupby(key, as_index=False) .agg(x=(lift_col, "mean"), y=(snr_col, "mean"), n=("pref_sample_size", "sum"), seg=("segment", "first"))) else: grp = (use.groupby(key, as_index=False) .agg(x=(lift_col, "mean"), y=(snr_col, "mean"), n=("pref_sample_size", "sum"))) grp["seg"] = np.nan # ---- 색상 ---- dom_seg = _model_dominant_segment(df_scope) def _color_for(row): if key == "model": base_seg = dom_seg.get(str(row[key]), row["seg"]) else: base_seg = row["seg"] if pd.notna(row["seg"]) else row[key] return _tier_color_for_segment(base_seg) grp["color"] = grp.apply(_color_for, axis=1) # ---- 버블 크기(√스케일) ---- n = grp["n"].astype(float).to_numpy() if np.isfinite(n).any(): r = np.sqrt(np.maximum(n, 0)) r0, r1 = float(np.nanmin(r)), float(np.nanmax(r)) size = 24.0 if abs(r1 - r0) < 1e-9 else 12 + (r - r0)/(r1 - r0) * 48 else: size = np.full(len(grp), 24.0) # ---- 라벨 ---- labels_all = grp[key].astype(str).tolist() if label_top_n is None or label_top_n == -1: text = labels_all elif label_top_n <= 0: text = [""] * len(labels_all) else: top_idx = np.argsort(-grp["n"].to_numpy())[:label_top_n] show = set(top_idx.tolist()) text = [labels_all[i] if i in show else "" for i in range(len(labels_all))] hovertext = grp[key].astype(str) # ===== 승/패 분할 경계 & 음영 ===== x_vals = grp["x"].astype(float).to_numpy() y_vals = grp["y"].astype(float).to_numpy() x_thr = 0.0 if (np.nanmin(x_vals) < 0 < np.nanmax(x_vals)) else float(np.nanmedian(x_vals)) y_thr = 2.0 if (np.nanmin(y_vals) <= 2.0 <= np.nanmax(y_vals)) else float(np.nanmedian(y_vals)) x_min, x_max = float(np.nanmin(x_vals)), float(np.nanmax(x_vals)) y_min, y_max = float(np.nanmin(y_vals)), float(np.nanmax(y_vals)) x_pad = (x_max - x_min) * 0.03 if np.isfinite(x_max - x_min) else 0.0 y_pad = (y_max - y_min) * 0.03 if np.isfinite(y_max - y_min) else 0.0 x0, x1 = x_min - x_pad, x_max + x_pad y0, y1 = y_min - y_pad, y_max + y_pad winner_fill = hex_to_rgba("#FDE68A", 0.16) loser_fill = hex_to_rgba("#9CA3AF", 0.14) fig = go.Figure() add_vrect_safe(fig, x0, x_thr, y0=y_thr, y1=y1, fillcolor=loser_fill, layer="below") add_vrect_safe(fig, x_thr, x1, y0=y_thr, y1=y1, fillcolor=winner_fill, layer="below") add_vline_safe(fig, x_thr, line_dash="dot", line_color="#888", opacity=0.6) add_hline_safe(fig, y_thr, line_dash="dot", line_color="#888", opacity=0.6) fig.add_trace(go.Scatter( x=grp["x"], y=grp["y"], mode="markers+text", text=text, hovertext=hovertext, textposition=("middle center" if label_inside else "top center"), textfont=dict(size=textfont_size), cliponaxis=False, marker=dict(size=size, color=grp["color"], line=dict(color="#111", width=0.7)), customdata=grp["n"].astype(float), hovertemplate=(f"{key}=%{{hovertext}}
" "Lift=%{x:.1f}
" "SNR=%{y:.1f}
" "표본=%{customdata:,}"), name="모델/세그" )) # 기본 레이아웃 fig.update_layout( xaxis_title=None, yaxis_title="SNR", height=320, showlegend=False, paper_bgcolor="#fff", plot_bgcolor="#fff", margin=dict(l=10, r=10, t=26, b=48) ) fig.update_xaxes(title_standoff=18, automargin=True) fig.update_yaxes(title_standoff=8, automargin=True) # 각주 foot_y = -0.20 fig.add_annotation(xref="paper", yref="paper", x=0.00, y=foot_y, text="", showarrow=False, font=dict(size=11, color="#FDE68A")) fig.add_annotation(xref="paper", yref="paper", x=0.035, y=foot_y, text="승자 영역 (Lift↑, SNR↑)", showarrow=False, font=dict(size=10, color="#555"), xanchor="left") fig.add_annotation(xref="paper", yref="paper", x=0.32, y=foot_y, text="", showarrow=False, font=dict(size=11, color="#9CA3AF")) fig.add_annotation(xref="paper", yref="paper", x=0.355, y=foot_y, text="패자 영역 (Lift↓, SNR↑)", showarrow=False, font=dict(size=10, color="#555"), xanchor="left") fig.add_annotation(xref="paper", yref="paper", x=0.67, y=foot_y, text="○ 원 크기 = 표본수(√스케일)", showarrow=False, font=dict(size=10, color="#666"), xanchor="left") # 공통 스타일 적용 후 '위로 들러붙음' 해소용 덮어쓰기 fig = apply_dense_grid(fig) fig.update_layout( height=320, margin=dict(l=10, r=10, t=84, b=52), # ↑ 상단 여백 크게 title=dict(y=0.98, pad=dict(t=18, b=0)) # 타이틀도 살짝 내려줌 ) fig.update_yaxes(domain=[0.12, 1.00], automargin=True) # ↑ 플롯 영역 자체를 아래로 return fig def ppc_purchase_overlay_figure(row: pd.Series, m: int | None = None, draws: int = 6000) -> go.Figure: """관측 구매율과 Posterior(베타) & Posterior Predictive(베타-이항) 오버레이.""" # 관측치 n = _pick_sample_for_stage(row, "buy") if n <= 0: n = _safe_int0(row.get("pref_sample_size")) p_obs = _safe_num(row.get("buy_success_rate")) if not np.isfinite(p_obs): return _empty_fig("No PPC data") p_obs = float(np.clip(p_obs/100.0 if p_obs > 1.5 else p_obs, 0.0, 1.0)) k_obs = int(np.clip(round(p_obs * max(n, 1)), 0, max(n, 1))) if m is None: m = n # Posterior (Jeffreys prior: Beta(0.5,0.5)) a, b = k_obs + 0.5, (n - k_obs) + 0.5 p = np.random.beta(a, b, size=draws) # Posterior predictive (새 표본 m개 관측 시 비율) m = max(int(m), 1) k_pred = np.random.binomial(m, p) rate_pred = k_pred / m # 95% HDI lo, hi = np.quantile(p, [0.025, 0.975]) fig = go.Figure() fig.add_histogram( x=p, nbinsx=60, histnorm="probability density", name="Posterior p", marker_color=hex_to_rgba("#9CA3AF", 0.45), opacity=0.55 ) fig.add_histogram( x=rate_pred, nbinsx=60, histnorm="probability density", name=f"PPC n={m:,}", marker_color=hex_to_rgba(COL_STAGE_BUY, 0.55), opacity=0.55 ) # 관측치/구간 표시 add_vline_safe(fig, p_obs, line_color="#111", line_width=2, opacity=0.9) fig.add_vrect(x0=lo, x1=hi, fillcolor=hex_to_rgba("#60A5FA", 0.18), line_width=0) # ← 핵심: 범례를 아래로(도면 밖) 보내고 아주 작게 fig.update_layout( barmode="overlay", title="PPC(구매율) — Posterior & Posterior Predictive", height=320, margin=dict(l=10, r=10, t=30, b=64), # 바닥 여백 확보 showlegend=True, legend=dict( orientation="h", y=-0.22, yanchor="top", # 플롯 아래쪽, 도면 밖 x=0.0, xanchor="left", font=dict(size=9), itemsizing="constant", itemwidth=30 ) ) fig.update_xaxes(range=[0, 1], tickformat=".0%", title="구매율") fig.update_yaxes(title="밀도") return apply_dense_grid(fig, x_prob=True) percent1 = FormatTemplate.percentage(1) num1 = Format(precision=1, scheme=Scheme.fixed) CARD_STYLE = { "background": "white", "border": "none", # ← 보더 제거 "borderRadius": "14px", "padding": "14px", "boxShadow": "none", # ← 그림자도 제거(원하면 유지) } # (추가) KPI 전용 카드 — 하늘색 배경 KPI_CARD_STYLE = { **CARD_STYLE, "background": "#EAF2FF", "border": "1px solid #d6e4ff" } ROW2_CARD_H = 360 ROW2_GRAPH_H = 320 # ───────────── spacing knobs (한 곳에서 조절) ───────────── ROW_GAP = "16px" # 카드 사이 간격 PAGE_PAD = "24px 28px 24px" # 행 안쪽 패딩 CARD_H = "430px" # 카드(박스) 높이 GRAPH_H = "390px" # 카드 안 그래프 높이 (CARD_H보다 40px 작게) KPI_GAP = "12px" # KPI 카드 간격 ROW1_COLS = "1fr 1fr 1fr" # 상단 3카드 동일 너비 ROW2_COLS = "1fr 1fr 1fr" # 하단 3카드 동일 너비 # ───────────────── app.layout 교체 ───────────────── # ───────────── spacing & sizing knobs ───────────── TOP_CARD_H = "430px" # 맨 위 3개 카드 박스 높이 TOP_GRAPH_H = "390px" # 박스 안 그래프 높이 (탭/제목 여백 고려해 TOP_CARD_H - 40) ROW_CARD_H = "420px" # 아래 행 카드 높이 ROW_GRAPH_H = "380px" PAGE_PAD = "24px 28px 24px" # 각 행 내부 패딩 ROW_GAP = "16px" # 카드 사이 간격 KPI_GAP = "12px" # 카드 공통 스타일: flex column으로 그래프가 꽉 차도록 CARD_STYLE = { "background": "#fff", "borderRadius": "12px", "padding": "12px", "boxShadow": "0 1px 3px rgba(0,0,0,0.06)", "display": "flex", "flexDirection": "column", } # 그래프 내부 여백/레전드/텍스트를 통일해 보이는 영역을 맞춤 def standardize_top_fig(fig): fig.update_layout( margin=dict(l=28, r=16, t=36, b=28), title_x=0.02, title_pad=dict(t=4, b=4), uniformtext=dict(minsize=10, mode="hide"), legend=dict(orientation="h", x=0, y=-0.2), # 하단 가로배치 → 높이 편차 제거 ) # 축이 있는 차트는 automargin for ax in ("xaxis", "yaxis"): if ax in fig.layout: fig.layout[ax].update(automargin=True, title_standoff=6) return fig # ───────────────── app.layout 교체 ───────────────── # ───────────── spacing & sizing knobs ───────────── TOP_CARD_H = "430px" # 맨 위 3개 카드 박스 높이 TOP_GRAPH_H = "390px" # 박스 안 그래프 높이 (탭/제목 여백 고려해 TOP_CARD_H - 40) ROW_CARD_H = "420px" # 아래 행 카드 높이 ROW_GRAPH_H = "380px" PAGE_PAD = "24px 28px 24px" # 각 행 내부 패딩 ROW_GAP = "16px" # 카드 사이 간격 KPI_GAP = "12px" # 카드 공통 스타일: flex column으로 그래프가 꽉 차도록 CARD_STYLE = { "background": "#fff", "borderRadius": "12px", "padding": "12px", "boxShadow": "0 1px 3px rgba(0,0,0,0.06)", "display": "flex", "flexDirection": "column", } # 그래프 내부 여백/레전드/텍스트를 통일해 보이는 영역을 맞춤 def standardize_top_fig(fig): fig.update_layout( margin=dict(l=28, r=16, t=36, b=28), title_x=0.02, title_pad=dict(t=4, b=4), uniformtext=dict(minsize=10, mode="hide"), legend=dict(orientation="h", x=0, y=-0.2), # 하단 가로배치 → 높이 편차 제거 ) # 축이 있는 차트는 automargin for ax in ("xaxis", "yaxis"): if ax in fig.layout: fig.layout[ax].update(automargin=True, title_standoff=6) return fig # ───────────────── app.layout 교체 ───────────────── ROW_GAP = "16px" # 카드 사이 간격 PAGE_PAD = "24px 28px 24px" # 행 안쪽 패딩 CARD_H = "430px" # 카드(박스) 높이 GRAPH_H = "390px" # 카드 안 그래프 높이 (CARD_H보다 40px 작게) KPI_GAP = "12px" # KPI 카드 간격 ROW1_COLS = "1fr 1fr 1fr" # 상단 3카드 동일 너비 ROW2_COLS = "1fr 1fr 1fr" # 하단 3카드 동일 너비 app.layout = html.Div( [ dcc.Store(id="store-master"), dcc.Store(id="store-tm"), dcc.Store(id="store-sankey"), dcc.Store(id="store-overall"), dcc.Store(id="store-mod-opts"), # Sankey 드래그 토글 + 인터랙션 로그 html.Div( [ dcc.Checklist( id="sankey-drag", options=[{"label": " Sankey 드래그 허용", "value": "drag"}], value=[], inputStyle={"marginRight": "6px"}, style={"fontSize": "12px", "color": "#555"}, ), html.Div(id="interact-msg", style={"marginTop": "6px","fontSize": "12px","color": "#444"}), ], style={"display":"flex","justifyContent":"space-between","alignItems":"center","padding":"0 16px 8px"}, ), # 상단 바 html.Div( [ html.Div("Bayesian Journey Dashboard", style={"fontWeight":"700","fontSize":"18px"}), html.Div( [ dcc.Input(id="excel-path", value=DEFAULT_PATH, placeholder="Excel 경로", style={"width":"520px","marginRight":"8px"}), html.Button("Load", id="load-btn", n_clicks=0, className="btn", style={"marginRight":"8px"}), ], style={"display":"flex","alignItems":"center"}, ), ], style={"display":"flex","justifyContent":"space-between","alignItems":"center", "padding":"12px 16px","borderBottom":"1px solid #eee","position":"sticky", "top":"0","background":"#fafafa","zIndex":10}, ), html.Div(id="status-msg", style={"padding":"8px 16px","color":"#555","fontSize":"12px"}), # 필터 html.Div( [ html.Div([html.Label("Segment", style={"fontWeight":"600"}), dcc.Dropdown(id="dd-seg", options=[], value="ALL", clearable=True)], style={"flex":"1","minWidth":"220px","marginRight":"8px"}), html.Div([html.Label("Model", style={"fontWeight":"600"}), dcc.Dropdown(id="dd-mod", options=[], value="ALL", clearable=True)], style={"flex":"1","minWidth":"220px","marginRight":"8px"}), html.Div([html.Label("Loyalty", style={"fontWeight":"600"}), dcc.Dropdown(id="dd-loy", options=[], value="ALL", clearable=True)], style={"flex":"1","minWidth":"220px"}), ], style={"display":"flex","gap":"8px","padding":"12px 16px"}, ), # KPI html.Div( [ html.Div([html.Div("표본 수", style={"color":"#888","fontSize":"12px"}), html.H3(id="kpi-sample", style={"margin":"4px 0 0"})], style=KPI_CARD_STYLE), html.Div([html.Div("최종 구매율 (Δ 포함)", style={"color":"#888","fontSize":"12px"}), html.H3(id="ins-final", style={"margin":"4px 0 0"})], style=KPI_CARD_STYLE), html.Div([html.Div("최대 드롭", style={"color":"#888","fontSize":"12px"}), html.H3(id="ins-drop", style={"margin":"4px 0 0","fontSize":"18px"})], style=KPI_CARD_STYLE), html.Div([html.Div("불확실성 (95% HDI 폭)", style={"color":"#888","fontSize":"12px"}), html.H3(id="ins-uncert", style={"margin":"4px 0 0"})], style=KPI_CARD_STYLE), ], style={ "display":"grid", "gridTemplateColumns":"repeat(4, minmax(0,1fr))", "gap": KPI_GAP, "padding":"0 16px 12px" }, ), # 숨김 KPI(호환) html.Div([html.H3(id="kpi-buy-success"), html.H3(id="kpi-buy-fail")], style={"display":"none"}), # Row 1: Sankey + 전이 퍼널(누적율) + (워터폴/PPC 탭) html.Div( [ html.Div( dcc.Graph( id="fig-sankey", config=GRAPH_CONFIG | {"responsive": True}, style={"height": GRAPH_H, "width": "100%"} ), style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"} # ← 고정/클립 ), html.Div( dcc.Graph( id="fig-matrix", config=GRAPH_CONFIG | {"responsive": True}, style={"height": GRAPH_H, "width": "100%"} ), style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"} ), html.Div( [ dcc.Tabs( id="tab-right", value="waterfall", children=[ dcc.Tab(label="워터폴", value="waterfall"), dcc.Tab(label="PPC(구매율)", value="ppc"), ], style={"marginBottom":"6px"}, ), dcc.Graph( id="fig-right", config=GRAPH_CONFIG | {"responsive": True}, style={"height": GRAPH_H, "width": "100%"} ), ], style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"}, ), ], style={ "display":"grid", "gridTemplateColumns": ROW1_COLS, "gap": ROW_GAP, "padding": PAGE_PAD, "marginBottom":"22px", }, ), # Row 2: 스테이지 리프트 + 포레스트 + 버블 html.Div( [ html.Div( [ html.Div( [ html.Span( "Stage", style={"fontSize": "12px", "color": "#666", "marginRight": "8px"}, ), dcc.Dropdown( id="dd-stage-rank", options=[{"label": v, "value": v} for v in ["선호", "추천", "구매의향", "구매"]], value="구매", clearable=False, style={"width": "140px", "fontSize": "12px"}, ), ], style={ "display": "flex", "justifyContent": "flex-end", "alignItems": "center", "marginBottom": "6px", }, ), dcc.Graph( id="fig-stage-rank", config={**GRAPH_CONFIG, "responsive": True}, style={"height": GRAPH_H, "width": "100%"}, ), ], style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"}, ), html.Div( dcc.Graph( id="fig-forest", config={**GRAPH_CONFIG, "responsive": True}, style={"height": GRAPH_H, "width": "100%"}, ), style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"}, ), html.Div( dcc.Graph( id="fig-bubble", config={**GRAPH_CONFIG, "responsive": True}, style={"height": GRAPH_H, "width": "100%"}, ), style={**CARD_STYLE, "height": CARD_H, "overflow": "hidden"}, ), ], style={ "display": "grid", "gridTemplateColumns": ROW2_COLS, "gap": ROW_GAP, "padding": PAGE_PAD, "marginTop": "4px", }, ), # 숨김 그래프 html.Div( [ dcc.Graph(id="fig-survival", config=GRAPH_CONFIG, style={"height": GRAPH_H}), dcc.Graph(id="fig-funnel", config=GRAPH_CONFIG, style={"height": GRAPH_H}), ], style={"display":"none"}, ), # 상세 테이블 html.Div( [ html.H4("상세 메트릭", style={"margin":"0 0 8px 0"}), dash_table.DataTable( id="metrics-table", columns=[ {"name": "단계", "id": "단계"}, {"name": "베이스수", "id": "베이스수", "type": "numeric", "format": Format(precision=0, scheme=Scheme.fixed)}, {"name": "성공확률", "id": "성공확률", "type": "numeric", "format": percent1}, {"name": "실패확률", "id": "실패확률", "type": "numeric", "format": percent1}, {"name": "하한", "id": "하한", "type": "numeric", "format": percent1}, {"name": "상한", "id": "상한", "type": "numeric", "format": percent1}, {"name": "판정", "id": "판정"}, {"name": "평가등급", "id": "평가등급"}, {"name": "SNR", "id": "SNR", "type": "numeric", "format": num1}, {"name": "Lift", "id": "Lift", "type": "numeric", "format": num1}, {"name": "raw평균", "id": "raw평균", "type": "numeric", "format": percent1}, {"name": "raw표준편차", "id": "raw표준편차", "type": "numeric", "format": percent1}, ], data=[], page_size=10, style_table={"overflowX":"auto"}, style_cell={"fontFamily":"Noto Sans KR, Arial, sans-serif","fontSize":"12px","padding":"6px"}, style_header={"fontWeight":"bold"}, style_data_conditional=[ {"if": {"column_id": "베이스수"}, "textAlign": "right"}, {"if": {"column_id": "성공확률"}, "textAlign": "right"}, {"if": {"column_id": "실패확률"}, "textAlign": "right"}, {"if": {"column_id": "하한"}, "textAlign": "right"}, {"if": {"column_id": "상한"}, "textAlign": "right"}, {"if": {"column_id": "SNR"}, "textAlign": "right"}, {"if": {"column_id": "Lift"}, "textAlign": "right"}, {"if": {"column_id": "raw평균"}, "textAlign": "right"}, {"if": {"column_id": "raw표준편차"}, "textAlign": "right"}, {"if": {"row_index": "odd"}, "backgroundColor": "#fafafa"}, ], ), ], style={**CARD_STYLE, "margin":"18px 16px 24px"}, ), ], style={"background":"#f6f7fb","minHeight":"100vh"}, ) # ───────────────── app.layout 교체 끝 ───────────────── # ===================== 콜백: Load ===================== @app.callback( Output("store-master","data"), Output("store-tm","data"), Output("store-sankey","data"), Output("store-overall","data"), Output("dd-seg","options"), Output("dd-seg","value"), Output("store-mod-opts","data"), Output("dd-loy","options"), Output("dd-loy","value"), Output("status-msg","children"), Input("load-btn","n_clicks"), State("excel-path","value"), prevent_initial_call=True ) def on_load(n, path): try: exists = os.path.exists(path) size = (os.path.getsize(path) if exists else 0) # 1) 엑셀 로드 df_master, df_tm, df_sankey, overall, seg_opts, mod_opts_all, loy_opts, dbg = load_excel(path) # 2) 마스터로부터 모든 조합 Sankey 캐시 합성 df_sankey_syn = build_sankey_cache_from_master(df_master, collapse_to_buy=True) # 3) 상태 메시지(캐시 행수 포함) status = (f"✅ 로드 완료 | path={path} (exists={exists}, size={size:,} bytes) | " f"engine={dbg.get('engine')} | sheets={dbg.get('sheets')} | matched={dbg.get('matched')} | " f"sankey_cache={len(df_sankey_syn):,} rows") # 4) 리턴: 세 번째(store-sankey)에 캐시를 넣는다 return ( df_master.to_json(date_format="iso", orient="split"), df_tm.to_json(date_format="iso", orient="split"), df_sankey_syn.to_json(date_format="iso", orient="split"), # ⬅ 여기! json.dumps(overall), [{"label":v, "value":v} for v in seg_opts], "ALL", json.dumps(mod_opts_all), [{"label":v, "value":v} for v in loy_opts], "ALL", status ) except Exception as e: err = f"❌ LOAD ERROR: {type(e).__name__}: {e}" print("LOAD ERROR TRACE:\n", traceback.format_exc()) return None, None, None, None, [], None, None, [], None, err # 세그먼트 변경 시 모델 옵션 업데이트 @app.callback( Output("dd-mod","options"), Output("dd-mod","value"), Input("dd-seg","value"), State("store-master","data"), State("store-mod-opts","data"), ) def on_seg_change(seg, js_master, js_allmods): if not js_master or not js_allmods: return [], None df_master = pd.read_json(js_master, orient="split") seg_val = _as_all(seg) if seg_val!="ALL": mods = ["ALL"] + sorted([str(v) for v in df_master[df_master["segment"].astype(str)==seg_val]["model"].dropna().astype(str).unique().tolist() if str(v)!="ALL"]) else: mods = json.loads(js_allmods) return [{"label":v,"value":v} for v in mods], "ALL" @app.callback( Output("interact-msg","children"), Input("fig-sankey","clickData"), Input("fig-matrix","relayoutData"), Input("fig-right","relayoutData"), Input("fig-stage-rank","selectedData"), Input("fig-forest","selectedData"), Input("fig-bubble","selectedData"), prevent_initial_call=True ) def on_interact(sankey_click, matrix_relayout, wf_relayout, rank_sel, forest_sel, bubble_sel): ctx = dash.callback_context if not ctx.triggered: return dash.no_update tid = ctx.triggered[0]["prop_id"] # e.g. "fig-bubble.selectedData" comp, prop = tid.split(".") payload = ctx.triggered[0]["value"] if prop == "clickData" and payload: pt = (payload.get("points") or [{}])[0] label = pt.get("label") or f"{pt.get('sourceLabel','?')}→{pt.get('targetLabel','?')}" return f"🖱 {comp}: {label} 클릭" if prop == "selectedData" and payload: n = len(payload.get("points", [])) return f"🔎 {comp}: {n}개 선택" if prop == "relayoutData" and payload: keys = ", ".join(list(payload.keys())[:3]) return f"🧭 {comp}: 뷰 변경({keys}...)" return dash.no_update # update_all 위쪽(같은 파일)에 추가 def _slice_sankey_cache_by_choice(df, seg, mod, loy): if df is None or df.empty: return pd.DataFrame() sub = df.copy() if "segment" in sub.columns and seg != "ALL": sub = sub[(sub["segment"].astype(str) == seg) | sub["segment"].isna() | (sub["segment"].astype(str) == "ALL")] if "model" in sub.columns and mod != "ALL": sub = sub[(sub["model"].astype(str) == mod) | sub["model"].isna() | (sub["model"].astype(str) == "ALL")] if "loyalty" in sub.columns and loy != "ALL": sub = sub[(sub["loyalty"].astype(str) == loy) | sub["loyalty"].isna() | (sub["loyalty"].astype(str) == "ALL")] if "level" in sub.columns: for lv in LVL_PRIORITY: cand = sub[sub["level"].astype(str) == lv] if not cand.empty: return cand.copy() return sub # 레벨 우선순위(가장 세분화된 것부터)로 하나만 남기기 if "level" in sub.columns: for lv in LVL_PRIORITY: cand = sub[sub["level"].astype(str) == lv] if not cand.empty: return cand.copy() return sub def _read_df_store(js): if not js: return pd.DataFrame() # 이미 dict/object로 들어오면 시도 if isinstance(js, dict): if {"columns","data"}.issubset(js.keys()): return pd.DataFrame(js["data"], columns=js["columns"]) try: return pd.DataFrame(js) except Exception: return pd.DataFrame() # 문자열이면 우선 split → 실패 시 일반 json 해석 if isinstance(js, str): try: return pd.read_json(io.StringIO(js), orient="split") except Exception: try: obj = json.loads(js) if isinstance(obj, dict) and {"columns","data"}.issubset(obj.keys()): return pd.DataFrame(obj["data"], columns=obj["columns"]) elif isinstance(obj, list): return pd.DataFrame(obj) elif isinstance(obj, dict): # overall 같은 dict가 오면 DF로 만들지 않고 빈 DF 반환 return pd.DataFrame() except Exception: return pd.DataFrame() return pd.DataFrame() def _read_overall(js_overall): if not js_overall: return {} if isinstance(js_overall, dict): return js_overall try: return json.loads(js_overall) except Exception: return {} # ===================== 콜백: 대시보드 계산 ===================== @app.callback( Output("kpi-sample","children"), Output("kpi-buy-success","children"), Output("kpi-buy-fail","children"), Output("ins-final","children"), Output("ins-drop","children"), Output("ins-uncert","children"), Output("metrics-table","data"), Output("fig-sankey","figure"), Output("fig-matrix","figure"), #Output("fig-simfan","figure"), Output("fig-bubble","figure"), Output("fig-stage-rank","figure"), Output("fig-survival","figure"), Output("fig-right","figure"), # Output("fig-waterfall","figure"), Output("fig-funnel","figure"), Output("fig-forest","figure"), Input("dd-seg","value"), Input("dd-mod","value"), Input("dd-loy","value"), Input("sankey-drag","value"), Input("dd-stage-rank","value"), Input("tab-right","value"), # ← 추가 Input("store-master","data"), Input("store-tm","data"), Input("store-sankey","data"), Input("store-overall","data"), ) def update_all(seg, mod, loy, drag_val, stage_label, tab_right, js_master, js_tm, js_sankey, js_overall=None): # 기본값 보정 seg = _as_all(seg); mod = _as_all(mod); loy = _as_all(loy) if not isinstance(stage_label, str) or not stage_label: stage_label = "구매" empty = _empty_fig("Load data first") # 가드: 마스터 없으면 15개 템플릿 리턴 if not js_master: return ( "–", "–", "–", # kpi-sample, kpi-buy-success, kpi-buy-fail "–", "–", "–", # ins-final, ins-drop, ins-uncert [], # metrics-table.data empty, empty, # fig-sankey, fig-matrix empty, empty, # fig-bubble, fig-stage-rank empty, empty, # fig-survival, fig-right empty, # fig-funnel empty # fig-forest ) js_sankey, js_overall, _ = _maybe_swap_sankey_overall(js_sankey, js_overall) try: # 0) sankey/overall 뒤바뀜 자동 교정 js_sankey, js_overall, _ = _maybe_swap_sankey_overall(js_sankey, js_overall) # 1) 스토어 읽기(안전) df_master = _read_df_store(js_master) df_tm = _read_df_store(js_tm) df_sankey = _read_df_store(js_sankey) overall = _read_overall(js_overall) # 2) 선택/스코프 row_pick = pick_row_for(df_master, seg, mod, loy) scope = df_master.copy() if seg!="ALL": scope = scope[scope["segment"].astype(str)==seg] if mod!="ALL": scope = scope[scope["model"].astype(str)==mod] if loy!="ALL": scope = scope[scope["loyalty"].astype(str)==loy] # 집계행으로 결측 보강 row_agg = compose_composite_row(scope) rowd = {k: row_pick[k] for k in row_pick.index} def _safe_num_or_nan(x): try: fx = float(x) return fx if np.isfinite(fx) else np.nan except Exception: return np.nan def coalesce_into(dst_dict, src_series, cols): for c in cols: va = _safe_num_or_nan(dst_dict.get(c)) if np.isnan(va): dst_dict[c] = (src_series.get(c) if isinstance(src_series, pd.Series) else np.nan) core_cols = [ "pref_sample_size", "pref_success_rate","pref_ci_lower","pref_ci_upper", "rec_success_rate","rec_ci_lower","rec_ci_upper", "intent_success_rate","intent_ci_lower","intent_ci_upper", "buy_success_rate","buy_ci_lower","buy_ci_upper", "bayesian_dropout_pref_to_rec","bayesian_dropout_rec_to_intent","bayesian_dropout_intent_to_buy", "bayesian_full_conversion", "pref_snr","rec_snr","intent_snr","buy_snr", "pref_lift_vs_galaxy","rec_lift_vs_galaxy","intent_lift_vs_galaxy","buy_lift_vs_galaxy", ] coalesce_into(rowd, row_agg, core_cols) row = pd.Series(rowd) # 3) KPI/테이블 tbl = metrics_table_row(row) def _face(val, good, soso, reverse=False): if not np.isfinite(val): return "❔" v = (1 - val) if reverse else val return "🟢" if v >= good else ("🟡" if v >= soso else "🔴") GOOD_P, SOSO_P = 0.55, 0.45 GOOD_DROP, SOSO_DROP = 0.20, 0.35 GOOD_W, SOSO_W = 0.08, 0.12 sample = _safe_int0(row.get("pref_sample_size")) kpi_sample_text = f"📊 {sample:,}" buy_p = _safe_num(row.get("buy_success_rate")) buy_s = (f"{buy_p:.1%}" if np.isfinite(buy_p) else "N/A") buy_f = (f"{(1-buy_p):.1%}" if np.isfinite(buy_p) else "N/A") overall_buy = _safe_num(overall.get("buy_mean")) delta = (buy_p - overall_buy) if (np.isfinite(buy_p) and np.isfinite(overall_buy)) else np.nan face_final = _face(buy_p, GOOD_P, SOSO_P, reverse=False) ins_final = (f"{face_final} 성공 {buy_s} / 실패 {buy_f} (vs 전체 {delta:+.1%}p)" if np.isfinite(delta) else f"{face_final} 성공 {buy_s} / 실패 {buy_f}") d1, d2, d3, _ = drops_from_anywhere(row, df_tm, seg, mod, loy) drops = [v for v in [d1, d2, d3] if np.isfinite(v)] dmax = max(drops) if drops else np.nan face_drop = _face(dmax, GOOD_DROP, SOSO_DROP, reverse=True) ins_drop = f"{face_drop} " + biggest_drop_text_by_sources(row, df_tm, seg, mod, loy) def _widest_hdi(r): pick = [] for stage, lo_col, hi_col in [("선호","pref_ci_lower","pref_ci_upper"), ("추천","rec_ci_lower","rec_ci_upper"), ("구매의향","intent_ci_lower","intent_ci_upper"), ("구매","buy_ci_lower","buy_ci_upper")]: lo = _safe_num(r.get(lo_col)); hi = _safe_num(r.get(hi_col)) if np.isfinite(lo) and np.isfinite(hi): pick.append((stage, max(0.0, hi - lo))) return max(pick, key=lambda x: x[1]) if pick else (None, np.nan) stage_w, width_w = _widest_hdi(row) face_unc = _face(width_w, GOOD_W, SOSO_W, reverse=True) ins_uncert = "데이터 없음" if stage_w is None else f"{face_unc} {stage_w} 단계 {width_w*100:.1f}%p" # 4) Sankey (캐시 정규화 → 보강) g_for_sankey = build_sankey_flow_table(df_sankey, seg=seg, mod=mod, loy=loy, collapse_to_buy=True) if g_for_sankey is None or g_for_sankey.empty: # 완전 비면 현재 row로 즉석 합성 g_for_sankey = _sankey_from_master_row(row, seg, mod, loy) g_for_sankey = add_collapsed_to_buy(g_for_sankey, add_from=("선호","추천","구매의향")) fig_sankey = sankey_figure( df_sankey=None, seg=seg, mod=mod, loy=loy, drag=("drag" in (drag_val or [])), table_override=g_for_sankey ) # 5) 나머지 그래프 fig_matrix = matrix_funnel_figure(row, df_tm, seg, mod, loy) lift_col = "buy_lift_vs_galaxy" if "buy_lift_vs_galaxy" in scope.columns else "pref_lift_vs_galaxy" snr_col = "buy_snr" if "buy_snr" in scope.columns else "pref_snr" fig_bubble = bubble_figure(scope, lift_col, snr_col) fig_stage_rank = compare_distribution_figure(df_master, seg, mod, loy, stage_label) fig_survival = survival_curve_figure(row, df_tm, seg, mod, loy) fig_funnel = stacked_funnel_figure(row) fig_forest = forest_figure(scope) fig_right = (ppc_purchase_overlay_figure(row) if (tab_right or "waterfall") == "ppc" else waterfall_figure(row, df_tm, seg, mod, loy)) # 6) 최종 15개 리턴(콜백 Output 순서대로) return ( kpi_sample_text, buy_s, buy_f, # kpi-sample, kpi-buy-success, kpi-buy-fail ins_final, ins_drop, ins_uncert, # 인사이트 3개 tbl.to_dict("records"), # metrics-table.data fig_sankey, fig_matrix, # sankey, matrix fig_bubble, fig_stage_rank, # bubble, stage-rank fig_survival, fig_right, # survival, right-panel(waterfall/ppc) fig_funnel, # funnel fig_forest # forest ) except Exception: print("UPDATE ERROR:\n", traceback.format_exc()) return ( "–","–","–","–","–","–", [], empty, empty, empty, empty, empty, empty, empty, empty ) # ===================== 실행 ===================== if __name__ == "__main__": base_port = int(os.getenv("PORT", "8059")) for i in range(5): try: app.run_server(host="0.0.0.0", port=base_port + i, debug=False, use_reloader=False) break except (OSError, SystemExit) as e: if "Address already in use" in str(e) or getattr(e, "code", None) == 1: continue raise