sem / modules /nca.py
cyj-26's picture
Upload 25 files
26c3195 verified
# =============================================================================
# NCA (Necessary Condition Analysis) 모듈
# CE-FDH (Ceiling Envelopment - Free Disposal Hull) 방식 직접 구현
# 참고: Dul (2016), Journal of Business Logistics
# =============================================================================
import pandas as pd
import numpy as np
def _ce_fdh(x: np.ndarray, y: np.ndarray):
"""CE-FDH 천장선: 각 x에 대해 가능한 최대 y를 반환"""
order = np.argsort(x)
xs, ys = x[order], y[order]
ceiling_x, ceiling_y = [xs[0]], [ys[0]]
for i in range(1, len(xs)):
if ys[i] > ceiling_y[-1]:
ceiling_x.append(xs[i])
ceiling_y.append(ys[i])
return np.array(ceiling_x), np.array(ceiling_y)
def _cr_fdh(x: np.ndarray, y: np.ndarray):
"""CR-FDH: CE-FDH 천장점에 OLS 적합"""
cx, cy = _ce_fdh(x, y)
if len(cx) < 2:
return None, None, None
coeffs = np.polyfit(cx, cy, 1)
slope, intercept = coeffs
y_pred = slope * cx + intercept
ss_res = np.sum((cy - y_pred)**2)
ss_tot = np.sum((cy - np.mean(cy))**2)
r2 = 1 - ss_res / ss_tot if ss_tot > 0 else np.nan
return slope, intercept, r2
def _effect_size(x, y, slope, intercept, method="cr_fdh"):
"""NCA 효과 크기 d = 천장 영역 / 전체 범위"""
x_range = x.max() - x.min()
y_range = y.max() - y.min()
scope = x_range * y_range
if scope == 0: return 0.0
# 천장선 위쪽 면적 (조건 공간) — NumPy 1.x: trapz / 2.x: trapezoid
_trapz = getattr(np, "trapezoid", None) or getattr(np, "trapz", None)
xs = np.linspace(x.min(), x.max(), 500)
y_ceiling = np.clip(slope * xs + intercept, y.min(), y.max())
ceiling_area = _trapz(y_ceiling - y.min(), xs)
d = ceiling_area / scope
return round(float(np.clip(d, 0, 1)), 4)
def run_nca(df: pd.DataFrame, x_cols: list, y_col: str,
p_threshold: float = 0.05):
"""
Parameters
----------
x_cols : 필요조건 후보 독립변수 목록
y_col : 결과변수
"""
data = df[[y_col] + x_cols].dropna()
y = data[y_col].values
results = []
ceiling_data = {}
for xcol in x_cols:
x = data[xcol].values
slope, intercept, r2_ceil = _cr_fdh(x, y)
if slope is None:
results.append({"변수": xcol, "효과크기(d)": np.nan,
"CR-FDH 기울기": np.nan, "절편": np.nan,
"R²(ceiling)": np.nan, "해석": "데이터 부족"})
continue
d = _effect_size(x, y, slope, intercept)
# 효과크기 해석 (Dul 2016 기준)
if d < 0.1: interp = "매우 작음"
elif d < 0.3: interp = "작음"
elif d < 0.5: interp = "중간"
else: interp = "큼"
results.append({
"변수(X)": xcol,
"결과변수(Y)": y_col,
"효과크기(d)": d,
"CR-FDH 기울기": round(slope, 4),
"절편": round(intercept, 4),
"R²(ceiling)": round(r2_ceil, 3) if r2_ceil is not None else np.nan,
"해석": interp,
"필요조건 판단": "필요조건 ✓" if d >= 0.1 else "필요조건 아님"
})
# 병목 분석 데이터 저장
xs_bn = np.arange(0, 110, 10) # 0~100% 수준
x_min, x_max = x.min(), x.max()
y_min, y_max = y.min(), y.max()
bn_rows = []
for xpct in xs_bn:
x_val = x_min + xpct / 100 * (x_max - x_min)
y_val = slope * x_val + intercept
y_pct = (y_val - y_min) / (y_max - y_min) * 100 if y_max > y_min else np.nan
bn_rows.append({"X(%)": xpct, f"{xcol}_최소필요Y(%)": round(float(np.clip(y_pct, 0, 100)), 1)})
ceiling_data[xcol] = pd.DataFrame(bn_rows)
result_df = pd.DataFrame(results)
# 병목 테이블 통합
if ceiling_data:
bottleneck = ceiling_data[x_cols[0]][["X(%)"]]
for xcol, bdf in ceiling_data.items():
bottleneck = bottleneck.merge(bdf, on="X(%)")
else:
bottleneck = pd.DataFrame()
return result_df, bottleneck