| |
| |
| |
| |
| |
| import pandas as pd |
| import numpy as np |
|
|
|
|
| def _ce_fdh(x: np.ndarray, y: np.ndarray): |
| """CE-FDH 천장선: 각 x에 대해 가능한 최대 y를 반환""" |
| order = np.argsort(x) |
| xs, ys = x[order], y[order] |
| ceiling_x, ceiling_y = [xs[0]], [ys[0]] |
| for i in range(1, len(xs)): |
| if ys[i] > ceiling_y[-1]: |
| ceiling_x.append(xs[i]) |
| ceiling_y.append(ys[i]) |
| return np.array(ceiling_x), np.array(ceiling_y) |
|
|
|
|
| def _cr_fdh(x: np.ndarray, y: np.ndarray): |
| """CR-FDH: CE-FDH 천장점에 OLS 적합""" |
| cx, cy = _ce_fdh(x, y) |
| if len(cx) < 2: |
| return None, None, None |
| coeffs = np.polyfit(cx, cy, 1) |
| slope, intercept = coeffs |
| y_pred = slope * cx + intercept |
| ss_res = np.sum((cy - y_pred)**2) |
| ss_tot = np.sum((cy - np.mean(cy))**2) |
| r2 = 1 - ss_res / ss_tot if ss_tot > 0 else np.nan |
| return slope, intercept, r2 |
|
|
|
|
| def _effect_size(x, y, slope, intercept, method="cr_fdh"): |
| """NCA 효과 크기 d = 천장 영역 / 전체 범위""" |
| x_range = x.max() - x.min() |
| y_range = y.max() - y.min() |
| scope = x_range * y_range |
| if scope == 0: return 0.0 |
|
|
| |
| _trapz = getattr(np, "trapezoid", None) or getattr(np, "trapz", None) |
| xs = np.linspace(x.min(), x.max(), 500) |
| y_ceiling = np.clip(slope * xs + intercept, y.min(), y.max()) |
| ceiling_area = _trapz(y_ceiling - y.min(), xs) |
| d = ceiling_area / scope |
| return round(float(np.clip(d, 0, 1)), 4) |
|
|
|
|
| def run_nca(df: pd.DataFrame, x_cols: list, y_col: str, |
| p_threshold: float = 0.05): |
| """ |
| Parameters |
| ---------- |
| x_cols : 필요조건 후보 독립변수 목록 |
| y_col : 결과변수 |
| """ |
| data = df[[y_col] + x_cols].dropna() |
| y = data[y_col].values |
| results = [] |
| ceiling_data = {} |
|
|
| for xcol in x_cols: |
| x = data[xcol].values |
| slope, intercept, r2_ceil = _cr_fdh(x, y) |
| if slope is None: |
| results.append({"변수": xcol, "효과크기(d)": np.nan, |
| "CR-FDH 기울기": np.nan, "절편": np.nan, |
| "R²(ceiling)": np.nan, "해석": "데이터 부족"}) |
| continue |
|
|
| d = _effect_size(x, y, slope, intercept) |
|
|
| |
| if d < 0.1: interp = "매우 작음" |
| elif d < 0.3: interp = "작음" |
| elif d < 0.5: interp = "중간" |
| else: interp = "큼" |
|
|
| results.append({ |
| "변수(X)": xcol, |
| "결과변수(Y)": y_col, |
| "효과크기(d)": d, |
| "CR-FDH 기울기": round(slope, 4), |
| "절편": round(intercept, 4), |
| "R²(ceiling)": round(r2_ceil, 3) if r2_ceil is not None else np.nan, |
| "해석": interp, |
| "필요조건 판단": "필요조건 ✓" if d >= 0.1 else "필요조건 아님" |
| }) |
|
|
| |
| xs_bn = np.arange(0, 110, 10) |
| x_min, x_max = x.min(), x.max() |
| y_min, y_max = y.min(), y.max() |
| bn_rows = [] |
| for xpct in xs_bn: |
| x_val = x_min + xpct / 100 * (x_max - x_min) |
| y_val = slope * x_val + intercept |
| y_pct = (y_val - y_min) / (y_max - y_min) * 100 if y_max > y_min else np.nan |
| bn_rows.append({"X(%)": xpct, f"{xcol}_최소필요Y(%)": round(float(np.clip(y_pct, 0, 100)), 1)}) |
| ceiling_data[xcol] = pd.DataFrame(bn_rows) |
|
|
| result_df = pd.DataFrame(results) |
|
|
| |
| if ceiling_data: |
| bottleneck = ceiling_data[x_cols[0]][["X(%)"]] |
| for xcol, bdf in ceiling_data.items(): |
| bottleneck = bottleneck.merge(bdf, on="X(%)") |
| else: |
| bottleneck = pd.DataFrame() |
|
|
| return result_df, bottleneck |
|
|