# ============================================================================= # NCA (Necessary Condition Analysis) 모듈 # CE-FDH (Ceiling Envelopment - Free Disposal Hull) 방식 직접 구현 # 참고: Dul (2016), Journal of Business Logistics # ============================================================================= import pandas as pd import numpy as np def _ce_fdh(x: np.ndarray, y: np.ndarray): """CE-FDH 천장선: 각 x에 대해 가능한 최대 y를 반환""" order = np.argsort(x) xs, ys = x[order], y[order] ceiling_x, ceiling_y = [xs[0]], [ys[0]] for i in range(1, len(xs)): if ys[i] > ceiling_y[-1]: ceiling_x.append(xs[i]) ceiling_y.append(ys[i]) return np.array(ceiling_x), np.array(ceiling_y) def _cr_fdh(x: np.ndarray, y: np.ndarray): """CR-FDH: CE-FDH 천장점에 OLS 적합""" cx, cy = _ce_fdh(x, y) if len(cx) < 2: return None, None, None coeffs = np.polyfit(cx, cy, 1) slope, intercept = coeffs y_pred = slope * cx + intercept ss_res = np.sum((cy - y_pred)**2) ss_tot = np.sum((cy - np.mean(cy))**2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else np.nan return slope, intercept, r2 def _effect_size(x, y, slope, intercept, method="cr_fdh"): """NCA 효과 크기 d = 천장 영역 / 전체 범위""" x_range = x.max() - x.min() y_range = y.max() - y.min() scope = x_range * y_range if scope == 0: return 0.0 # 천장선 위쪽 면적 (조건 공간) — NumPy 1.x: trapz / 2.x: trapezoid _trapz = getattr(np, "trapezoid", None) or getattr(np, "trapz", None) xs = np.linspace(x.min(), x.max(), 500) y_ceiling = np.clip(slope * xs + intercept, y.min(), y.max()) ceiling_area = _trapz(y_ceiling - y.min(), xs) d = ceiling_area / scope return round(float(np.clip(d, 0, 1)), 4) def run_nca(df: pd.DataFrame, x_cols: list, y_col: str, p_threshold: float = 0.05): """ Parameters ---------- x_cols : 필요조건 후보 독립변수 목록 y_col : 결과변수 """ data = df[[y_col] + x_cols].dropna() y = data[y_col].values results = [] ceiling_data = {} for xcol in x_cols: x = data[xcol].values slope, intercept, r2_ceil = _cr_fdh(x, y) if slope is None: results.append({"변수": xcol, "효과크기(d)": np.nan, "CR-FDH 기울기": np.nan, "절편": np.nan, "R²(ceiling)": np.nan, "해석": "데이터 부족"}) continue d = _effect_size(x, y, slope, intercept) # 효과크기 해석 (Dul 2016 기준) if d < 0.1: interp = "매우 작음" elif d < 0.3: interp = "작음" elif d < 0.5: interp = "중간" else: interp = "큼" results.append({ "변수(X)": xcol, "결과변수(Y)": y_col, "효과크기(d)": d, "CR-FDH 기울기": round(slope, 4), "절편": round(intercept, 4), "R²(ceiling)": round(r2_ceil, 3) if r2_ceil is not None else np.nan, "해석": interp, "필요조건 판단": "필요조건 ✓" if d >= 0.1 else "필요조건 아님" }) # 병목 분석 데이터 저장 xs_bn = np.arange(0, 110, 10) # 0~100% 수준 x_min, x_max = x.min(), x.max() y_min, y_max = y.min(), y.max() bn_rows = [] for xpct in xs_bn: x_val = x_min + xpct / 100 * (x_max - x_min) y_val = slope * x_val + intercept y_pct = (y_val - y_min) / (y_max - y_min) * 100 if y_max > y_min else np.nan bn_rows.append({"X(%)": xpct, f"{xcol}_최소필요Y(%)": round(float(np.clip(y_pct, 0, 100)), 1)}) ceiling_data[xcol] = pd.DataFrame(bn_rows) result_df = pd.DataFrame(results) # 병목 테이블 통합 if ceiling_data: bottleneck = ceiling_data[x_cols[0]][["X(%)"]] for xcol, bdf in ceiling_data.items(): bottleneck = bottleneck.merge(bdf, on="X(%)") else: bottleneck = pd.DataFrame() return result_df, bottleneck