sem / sem.py
cyj-26's picture
Upload 2 files
2052219 verified
# =============================================================================
# 📊 SEM Analyzer (frequency analysis / descriptive statistics / reliability / CFA / correlation / SEM)
# Run: python -m streamlit run sem.py
# =============================================================================
import streamlit as st
import pandas as pd
import numpy as np
import re
from datetime import datetime
from collections import defaultdict
# Configure the Streamlit page: page title, page icon and wide layout.
st.set_page_config(page_title="πŸ“Š SEM 뢄석기",
                   page_icon="πŸ“Š", layout="wide")
# ── λΉ„λ°€λ²ˆν˜Έ 잠금 ─────────────────────────────────────────────────────────────
def _check_password():
    """Gate the app behind a shared password.

    The expected password is read from ``st.secrets["PASSWORD"]``; when the
    secrets store is unavailable or has no such key, a built-in default is
    used instead.

    Returns:
        True when the current session has already been unlocked. Otherwise
        the password prompt is rendered (plus an error message after a failed
        attempt) and ``st.stop()`` is called.
    """
    import hmac

    # NOTE(security): the hard-coded fallback password is kept only for
    # backward compatibility; deployments should always set PASSWORD in the
    # Streamlit secrets instead of relying on it.
    try:
        correct_pw = st.secrets.get("PASSWORD", "9400")
    except Exception:
        # Secrets store could not be read: use the built-in default.
        correct_pw = "9400"
    # A secret written without quotes (e.g. PASSWORD = 9400) is not a str and
    # would never equal the typed text, so normalise it before comparing.
    expected = str(correct_pw).encode("utf-8")

    def _submit():
        # on_change callback: record whether the typed password matches.
        entered = st.session_state.get("_pw_input")
        st.session_state["_pw_ok"] = (
            entered is not None
            # Constant-time comparison on bytes (also works for non-ASCII).
            and hmac.compare_digest(str(entered).encode("utf-8"), expected)
        )

    if st.session_state.get("_pw_ok"):
        return True
    st.markdown("## πŸ”’ SEM 뢄석기")
    st.text_input("λΉ„λ°€λ²ˆν˜Έλ₯Ό μž…λ ₯ν•˜μ„Έμš”", type="password",
                  key="_pw_input", on_change=_submit)
    # "_pw_ok" only exists after at least one attempt, so the error is not
    # shown on the very first render.
    if "_pw_ok" in st.session_state and not st.session_state["_pw_ok"]:
        st.error("λΉ„λ°€λ²ˆν˜Έκ°€ ν‹€λ ΈμŠ΅λ‹ˆλ‹€.")
    st.stop()
# Run the password gate before the rest of the script.
_check_password()
# Inject the CSS classes main-title, sub-title and card.
st.markdown("""
<style>
.main-title{font-size:2rem;font-weight:700;color:#2F5496}
.sub-title{font-size:.95rem;color:#666;margin-bottom:1rem}
.card{background:#F3F6FB;border-left:4px solid #2F5496;
padding:8px 14px;border-radius:4px;margin:3px 0}
</style>""", unsafe_allow_html=True)
# ── Module imports ────────────────────────────────────────────────────────────
# Project helpers live in modules/utils; if the import fails, show the error
# in the page and call st.stop().
try:
    from modules.utils import detect_scale, cronbach_alpha, calc_ave_cr, fmt_p, sig_stars, build_excel
except Exception as e:
    st.error(f"λͺ¨λ“ˆ λ‘œλ“œ 였λ₯˜: {e}\n\nμ•±κ³Ό 같은 폴더에 modules/ 폴더가 μžˆλŠ”μ§€ ν™•μΈν•˜μ„Έμš”.")
    st.stop()
# ══════════════════════════════════════════════════════════════════════════════
# μœ ν‹Έ ν•¨μˆ˜
# ══════════════════════════════════════════════════════════════════════════════
def detect_outliers_mahalanobis(df, cols, p_threshold=0.001):
    """Flag multivariate outliers in ``df[cols]`` by squared Mahalanobis distance.

    Rows with a missing value in any of ``cols`` are dropped first. Each
    remaining row's squared distance from the column means is compared with
    the chi-square critical value (``len(cols)`` degrees of freedom) at
    upper-tail probability ``p_threshold``.

    Args:
        df: Source DataFrame.
        cols: Column labels to analyse.
        p_threshold: Upper-tail probability used for the cutoff.

    Returns:
        ``(result, cutoff, outlier_idx)`` where ``result`` is a DataFrame with
        one row per complete case (index label + 1, distance, p-value, outlier
        flag), ``cutoff`` is the critical value rounded to 3 decimals and
        ``outlier_idx`` lists the index labels of flagged rows.
        ``(None, None, [])`` when no columns are given, fewer than
        ``len(cols) + 2`` complete rows exist, or the computation fails.
    """
    from scipy import stats as sc

    data = df[cols].dropna()
    n, k = data.shape
    if k == 0 or n < k + 2:
        return None, None, []
    try:
        values = data.values.astype(float)
        # np.cov collapses a single variable to a 0-d array, which
        # np.linalg.inv/pinv reject; keep the matrix 2-D so k == 1 works.
        cov_mat = np.atleast_2d(np.cov(values.T, ddof=1))
        try:
            cov_inv = np.linalg.inv(cov_mat)
        except np.linalg.LinAlgError:
            # Singular covariance: fall back to the pseudo-inverse.
            cov_inv = np.linalg.pinv(cov_mat)
        diffs = values - values.mean(axis=0)
        # Row-wise quadratic form d' * cov_inv * d, computed for all rows at once.
        mahal = np.einsum("ij,jk,ik->i", diffs, cov_inv, diffs)
        pvals = sc.chi2.sf(mahal, df=k)
        cutoff = sc.chi2.ppf(1 - p_threshold, df=k)
        is_outlier = mahal > cutoff
        result = pd.DataFrame({
            # NOTE(review): "+ 1" presumably turns a 0-based RangeIndex into a
            # 1-based row number — confirm the caller's index is numeric.
            "원본 ν–‰λ²ˆν˜Έ": data.index + 1,
            "λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리": np.round(mahal, 3),
            "pκ°’": np.round(pvals, 4),
            "μ΄μƒμΉ˜": is_outlier,
        }).reset_index(drop=True)
        outlier_idx = data.index[is_outlier].tolist()
        return result, round(cutoff, 3), outlier_idx
    except Exception:
        # Best-effort helper: any numerical failure is reported as "no result".
        return None, None, []
def compute_smc(df, cols):
    """Compute the squared multiple correlation (SMC) of each column.

    Each column in ``cols`` is regressed (ordinary least squares with an
    intercept) on all the other columns, using only rows that are complete
    across ``cols``; the regression R-squared is reported as the SMC.

    Args:
        df: Source DataFrame.
        cols: Column labels to evaluate.

    Returns:
        DataFrame with one row per column: variable name, SMC rounded to
        3 decimals (NaN when undefined) and a verdict string. An SMC below
        0.20 gets the "consider removing" verdict; an undefined SMC gets "-".
        When fewer than ``len(cols) + 2`` complete rows exist, every row
        carries NaN and the "too few cases" verdict.
    """
    data = df[cols].dropna()
    if len(data) < len(cols) + 2:
        return pd.DataFrame({"λ³€μˆ˜": cols, "SMC": [np.nan]*len(cols),
                             "νŒμ •": ["사둀 수 λΆ€μ‘±"]*len(cols)})
    rows = []
    for col in cols:
        others = [c for c in cols if c != col]
        if not others:
            # A single variable has no predictors, so SMC is undefined.
            rows.append({"λ³€μˆ˜": col, "SMC": np.nan, "νŒμ •": "-"})
            continue
        try:
            X = data[others].values.astype(float)
            y = data[col].values.astype(float)
            Xc = np.column_stack([np.ones(len(X)), X])  # prepend the intercept
            b, _, _, _ = np.linalg.lstsq(Xc, y, rcond=None)
            ss_res = float(np.sum((y - Xc @ b) ** 2))
            ss_tot = float(np.sum((y - y.mean()) ** 2))
            r2 = (1.0 - ss_res / ss_tot) if ss_tot > 0 else np.nan
            if np.isnan(r2):
                # Zero-variance item: SMC is undefined, so do not label it as
                # normal; reuse the placeholder used for undefined SMC above.
                verdict = "-"
            elif r2 < 0.2:
                verdict = "⚠️ 제거 κ³ λ € (SMC < .20)"
            else:
                verdict = "βœ… 정상"
            rows.append({"λ³€μˆ˜": col, "SMC": round(float(r2), 3), "νŒμ •": verdict})
        except Exception:
            rows.append({"λ³€μˆ˜": col, "SMC": np.nan, "νŒμ •": "계산 였λ₯˜"})
    return pd.DataFrame(rows)
def auto_detect_constructs(df):
    """Group Likert-type columns into candidate constructs by name prefix.

    A column counts as Likert when ``detect_scale`` returns ``"likert"`` for
    it. A name made of letters, optional digits and one final digit is grouped
    under everything before that final digit; any other name forms its own
    group. Only groups with at least two columns are returned.

    Returns:
        dict mapping group key -> list of column names, in column order.
    """
    item_pattern = r'^([A-Za-z]+\d*)(\d)$'
    grouped = defaultdict(list)
    for name in df.columns:
        if detect_scale(df[name]) != "likert":
            continue
        matched = re.match(item_pattern, name)
        key = matched.group(1) if matched else name
        grouped[key].append(name)
    return {prefix: members for prefix, members in grouped.items()
            if len(members) >= 2}
def _adequate_check(fit):
return (fit.get("NFI",0)>=.90 and fit.get("RFI",0)>=.90 and
fit.get("IFI",0)>=.90 and fit.get("TLI",0)>=.90 and
fit.get("CFI",0)>=.90 and fit.get("RMSEA",1)<.049)
def get_composite_scores(df, constructs):
    """Build one composite score column per construct.

    Each construct's score is the row-wise mean of its item columns.

    Args:
        df: Source DataFrame holding the item columns.
        constructs: Mapping of construct name -> list of item column names.

    Returns:
        DataFrame with one column per construct, in mapping order.
    """
    scores = {}
    for lv, items in constructs.items():
        scores[lv] = df[items].mean(axis=1)
    return pd.DataFrame(scores)
# ══════════════════════════════════════════════════════════════════════════════
# semopy 기반 CFA (lavaan 폴백용)
# ══════════════════════════════════════════════════════════════════════════════
def calc_mi_approx(model, obs_df, obs_vars):
    """Approximate modification indices for covariances between observed variables.

    For every pair (i, j) of ``obs_vars`` the index is computed from the
    sample covariance ``S`` and the matrix ``Sig`` returned by
    ``model.calc_sigma()``:
    ``MI = (n - 1) * G[i, j]**2 / (2*Si[i,i]*Si[j,j] + 2*Si[i,j]**2)`` with
    ``Si = inv(Sig)`` and ``G = Si @ (S - Sig) @ Si``.

    Args:
        model: Fitted model exposing ``calc_sigma()`` whose first return value
            is a square matrix with one row per entry of ``obs_vars``.
            NOTE(review): presumably the model-implied covariance in the same
            variable order as ``obs_vars`` — confirm against the semopy
            version in use.
        obs_df: DataFrame holding the observed data.
        obs_vars: Observed variable names (columns of ``obs_df``).

    Returns:
        DataFrame with the two variable names and the MI (rounded to 3
        decimals), sorted by MI descending. When there are fewer than two
        variables or the computation fails, an empty DataFrame that still has
        those three columns, so callers can index the "MI" column safely.
    """
    empty = pd.DataFrame(columns=["λ³€μˆ˜1", "λ³€μˆ˜2", "MI"])
    try:
        n = len(obs_df)
        S = np.cov(obs_df[obs_vars].values.T, ddof=1)
        Sig, _ = model.calc_sigma()
        # Tiny ridge on the diagonal before inverting.
        Sig = Sig + np.eye(len(obs_vars)) * 1e-8
        Si = np.linalg.inv(Sig)
        G = Si @ (S - Sig) @ Si
        rows = []
        p = len(obs_vars)
        for i in range(p):
            for j in range(i + 1, p):
                denom = 2 * Si[i, i] * Si[j, j] + 2 * Si[i, j] ** 2
                # Floor the denominator to avoid division by ~0.
                mi = (n - 1) * G[i, j] ** 2 / max(denom, 1e-8)
                rows.append({"λ³€μˆ˜1": obs_vars[i], "λ³€μˆ˜2": obs_vars[j],
                             "MI": round(float(mi), 3)})
        if not rows:
            return empty
        return pd.DataFrame(rows).sort_values("MI", ascending=False).reset_index(drop=True)
    except Exception:
        # Best-effort: MI is advisory, so a failure yields "no suggestions".
        return empty
def run_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200):
    """Fit a CFA with semopy and greedily add covariances suggested by MIs.

    The base model has one ``=~`` line per construct plus covariances between
    all pairs of latent variables; when semopy accepts one of the attempted
    syntaxes, latent variances are additionally fixed to 1. Then, for at most
    ``max_mods`` steps and while ``_adequate_check`` fails, the untried item
    pair with the largest approximate MI above ``mi_threshold`` is added as a
    ``~~`` line and the model is refitted.

    Args:
        df: Source DataFrame; rows with missing items are dropped.
        constructs: Mapping of latent variable name -> list of item columns.
        mi_threshold: Minimum MI for a pair to be considered.
        max_mods: Maximum number of modification steps.

    Returns:
        ``(result, None)`` on success, where ``result`` is a dict holding the
        initial and modified models, their fit dicts, the initial and final MI
        tables, the modification log, the added covariance pairs, the
        ``inspect`` table with the name of its standardized column, the data
        used and the base model string. ``(None, message)`` on failure.
    """
    try:
        from semopy import Model, calc_stats
    except ImportError:
        return None, "semopy νŒ¨ν‚€μ§€κ°€ ν•„μš”ν•©λ‹ˆλ‹€."
    try:
        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [i for items in constructs.values() for i in items]
        data = df[all_items].dropna()
        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        meas_str = "\n".join(meas_lines)
        lv_keys = list(constructs.keys())
        lv_cov_lines = [f" {lv_keys[i]} ~~ {lv_keys[j]}"
                        for i in range(len(lv_keys))
                        for j in range(i + 1, len(lv_keys))]
        meas_str = meas_str + ("\n" + "\n".join(lv_cov_lines) if lv_cov_lines else "")

        def _fit(model_str):
            # Fit a model description; None signals any failure.
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        # Try several spellings for "fix the latent variance to 1" and keep
        # the first one that fits; otherwise keep the unconstrained model.
        base_str = meas_str
        for _var_syntax in [
            "\n".join(f" {lv} ~~ 1*{lv}" for lv in constructs.keys()),
            "\n".join(f" {lv} ~~ 1 * {lv}" for lv in constructs.keys()),
            "\n".join(f" {lv} ~~ 1@{lv}" for lv in constructs.keys()),
        ]:
            _candidate = meas_str + "\n" + _var_syntax
            if _fit(_candidate) is not None:
                base_str = _candidate
                break

        def _extract_fit(m):
            # Collect fit indices from calc_stats; RFI and IFI are derived
            # here from the model and baseline chi-square values.
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                rfi = ((chi2_bl/dof_bl - chi2/dof) / (chi2_bl/dof_bl)
                       if chi2_bl > 0 and dof_bl > 0 and dof > 0 else 0.0)
                ifi = ((chi2_bl - chi2) / (chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"χ²": round(chi2, 3), "df": int(dof),
                        "χ²/df": round(chi2/dof, 3) if dof > 0 else "-",
                        "p": round(pval, 3),
                        "NFI": round(nfi, 3), "RFI": round(rfi, 3),
                        "IFI": round(ifi, 3), "TLI": round(tli, 3),
                        "CFI": round(cfi, 3), "RMSEA": round(rmsea, 3)}
            except Exception:
                return {}

        m0 = _fit(base_str)
        if m0 is None:
            return None, "초기 CFA μΆ”μ • μ‹€νŒ¨"
        fit0 = _extract_fit(m0)
        mi_df0 = calc_mi_approx(m0, data, all_items)
        extra_cov = []
        added_pairs = set()   # pairs already tried (accepted or failed)
        mod_log = []
        m_cur, fit_cur, mi_cur = m0, fit0.copy(), mi_df0.copy()
        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            # calc_mi_approx may hand back an empty, column-less table when
            # it fails; without this guard the lookup below raises KeyError
            # and the successfully fitted model would be thrown away.
            if mi_cur.empty or "MI" not in mi_cur.columns:
                break
            high_mi = mi_cur[mi_cur["MI"] > mi_threshold]
            if high_mi.empty:
                break
            avail = high_mi[
                high_mi.apply(
                    lambda r: tuple(sorted([r["λ³€μˆ˜1"], r["λ³€μˆ˜2"]])) not in added_pairs,
                    axis=1)]
            if avail.empty:
                break
            row = avail.iloc[0]   # MI table is sorted descending
            v1, v2 = row["λ³€μˆ˜1"], row["λ³€μˆ˜2"]
            pair = tuple(sorted([v1, v2]))
            new_cov = extra_cov + [(v1, v2)]
            new_str = (base_str + "\n" +
                       "\n".join(f" {a} ~~ {b}" for a, b in new_cov))
            m_new = _fit(new_str)
            if m_new is None:
                # Remember the failed pair so it is not retried.
                added_pairs.add(pair)
                continue
            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "단계": step + 1,
                "μˆ˜μ • 경둜": f"{v1} ~~ {v2}",
                "μ†Œμ† μš”μΈ": lv1 if lv1 == lv2 else f"{lv1}↔{lv2}",
                "MI": round(row["MI"], 3),
                "CFI μ „β†’ν›„": f"{fit_cur.get('CFI','-')} β†’ {fit_new.get('CFI','-')}",
                "RMSEA μ „β†’ν›„": f"{fit_cur.get('RMSEA','-')} β†’ {fit_new.get('RMSEA','-')}",
            })
            extra_cov = new_cov
            m_cur, fit_cur = m_new, fit_new
            added_pairs.add(pair)
            mi_cur = calc_mi_approx(m_cur, data, all_items)
        ins = m_cur.inspect(std_est=True)
        std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
        # mi_cur always belongs to m_cur, so it is the final MI table.
        final_mi = mi_cur
        return {
            "init_model": m0, "mod_model": m_cur,
            "init_fit": fit0, "mod_fit": fit_cur,
            "init_mi": mi_df0, "final_mi": final_mi,
            "mod_log": pd.DataFrame(mod_log),
            "extra_cov": extra_cov,
            "was_modified": len(extra_cov) > 0,
            "ins": ins, "std_col": std_col,
            "data": data, "all_items": all_items,
            "base_str": base_str,
        }, None
    except Exception as e:
        return None, f"CFA MI 였λ₯˜: {e}"
def run_cfa_sem(df, constructs, hypotheses=None):
    """Fit a measurement (and optionally structural) model with semopy.

    Args:
        df: Source DataFrame; rows with missing items are dropped.
        constructs: Mapping of latent variable name -> list of item columns.
        hypotheses: Optional iterable of ``(source, target)`` pairs; each
            target is regressed on all of its sources.

    Returns:
        ``(ins, std_col, stats_dict, fit)``: the ``inspect`` table, the name
        of its standardized-estimate column, the raw statistics as a dict and
        a dict of rounded fit indices (empty when they cannot be derived).
        ``(None, None, None, None)`` when semopy is missing or fitting fails.
    """
    failure = (None, None, None, None)
    try:
        from semopy import Model, calc_stats
    except ImportError:
        return failure
    try:
        spec = "\n".join(
            f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items())
        if hypotheses:
            predictors = defaultdict(list)
            for src, tgt in hypotheses:
                predictors[tgt].append(src)
            regressions = [f" {tgt} ~ {' + '.join(srcs)}"
                           for tgt, srcs in predictors.items()]
            spec += "\n" + "\n".join(regressions)

        indicators = []
        for items in constructs.values():
            indicators.extend(items)
        complete = df[indicators].dropna()

        model = Model(spec)
        model.fit(complete)
        ins = model.inspect(std_est=True)
        std_col = ins.columns[-1]
        for candidate in ins.columns:
            if "std" in candidate.lower():
                std_col = candidate
                break

        stats = calc_stats(model)
        try:
            if hasattr(stats, "iloc"):
                stats_dict = stats.iloc[0].to_dict()
            else:
                stats_dict = dict(stats)
        except Exception:
            stats_dict = {}

        try:
            def _num(key, default):
                return float(stats_dict.get(key, default))

            chi2, dof = _num("chi2", 0), _num("DoF", 1)
            chi2_bl, dof_bl = _num("chi2 Baseline", 0), _num("DoF Baseline", 1)
            pval = _num("chi2 p-value", 1)
            cfi, tli, nfi = _num("CFI", 0), _num("TLI", 0), _num("NFI", 0)
            rmsea = _num("RMSEA", 1)
            # RFI and IFI are derived from model and baseline chi-square.
            if chi2_bl > 0 and dof_bl > 0 and dof > 0:
                rfi = (chi2_bl/dof_bl - chi2/dof)/(chi2_bl/dof_bl)
            else:
                rfi = 0.0
            if (chi2_bl - dof) > 0:
                ifi = (chi2_bl - chi2)/(chi2_bl - dof)
            else:
                ifi = 0.0
            fit = {
                "χ²": round(chi2, 3),
                "df": int(dof),
                "χ²/df": round(chi2/dof, 3) if dof > 0 else "-",
                "p": round(pval, 3),
                "NFI": round(nfi, 3),
                "RFI": round(rfi, 3),
                "IFI": round(ifi, 3),
                "TLI": round(tli, 3),
                "CFI": round(cfi, 3),
                "RMSEA": round(rmsea, 3),
            }
        except Exception:
            fit = {}
        return ins, std_col, stats_dict, fit
    except Exception:
        return failure
def _extract_load_df(ins, std_col, constructs):
lv_names = set(constructs.keys())
all_items = set(i for v in constructs.values() for i in v)
if (ins["op"] == "=~").any():
load_df = ins[ins["op"] == "=~"].copy()
lv_col, ind_col = "lval", "rval"
else:
load_df = ins[ins["lval"].isin(all_items) & ins["rval"].isin(lv_names)].copy()
lv_col, ind_col = "rval", "lval"
cols_needed = [c for c in [lv_col, ind_col, std_col, "Std. Err", "z-value", "p-value"]
if c in load_df.columns]
load_df = load_df[cols_needed].copy()
rename_map = {lv_col: "μž μž¬λ³€μˆ˜", ind_col: "μΈ‘μ •λ³€μˆ˜", std_col: "ν‘œμ€€ν™”κ³„μˆ˜",
"Std. Err": "SE", "z-value": "tκ°’", "p-value": "pκ°’_raw"}
load_df = load_df.rename(columns=rename_map)
if "pκ°’_raw" in load_df.columns:
load_df["pκ°’"] = load_df["pκ°’_raw"].apply(
lambda p: fmt_p(p) if str(p).strip() not in ("-", "") else "-")
load_df = load_df.drop(columns=["pκ°’_raw"])
load_df["ν‘œμ€€ν™”κ³„μˆ˜"] = pd.to_numeric(load_df["ν‘œμ€€ν™”κ³„μˆ˜"], errors="coerce").round(3)
for col in ["SE", "tκ°’"]:
if col in load_df.columns:
load_df[col] = load_df[col].apply(
lambda v: round(float(v), 3)
if pd.notna(v) and str(v).strip() not in ("None", "", "nan", "-") else "-")
return load_df.reset_index(drop=True), lv_col
# ══════════════════════════════════════════════════════════════════════════════
# lavaan (R) 기반 CFA / SEM
# ══════════════════════════════════════════════════════════════════════════════
def _get_rscript_path():
import subprocess, os, glob
try:
r = subprocess.run(['Rscript', '--version'], capture_output=True, timeout=10)
if r.returncode == 0:
return 'Rscript'
except Exception:
pass
candidates = glob.glob(r'C:\Program Files\R\R-*\bin\Rscript.exe')
candidates += glob.glob(r'C:\Program Files (x86)\R\R-*\bin\Rscript.exe')
if candidates:
candidates.sort(reverse=True)
return candidates[0]
return None
def _rscript_available():
    """Return True when ``_get_rscript_path()`` finds an Rscript executable."""
    located = _get_rscript_path()
    return located is not None
def _parse_fit_csv(path):
df = pd.read_csv(path)
if df.empty:
return {}
row = df.iloc[0]
chi2 = float(row.get('chisq', 0))
dof = float(row.get('df', 1))
return {
'χ²': round(chi2, 3),
'df': int(dof),
'χ²/df': round(chi2 / dof, 3) if dof > 0 else '-',
'p': round(float(row.get('pvalue', 1)), 6),
'NFI': round(float(row.get('nfi', 0)), 3),
'RFI': round(float(row.get('rfi', 0)), 3),
'IFI': round(float(row.get('ifi', 0)), 3),
'TLI': round(float(row.get('tli', 0)), 3),
'CFI': round(float(row.get('cfi', 0)), 3),
'RMSEA': round(float(row.get('rmsea', 1)), 3),
}
def _fmt_mi_df(df):
if df is None or df.empty:
return pd.DataFrame(columns=['λ³€μˆ˜1', 'λ³€μˆ˜2', 'MI'])
df = df.copy()
df.columns = ['λ³€μˆ˜1', 'λ³€μˆ˜2', 'MI']
df['MI'] = pd.to_numeric(df['MI'], errors='coerce').round(3)
return df.sort_values('MI', ascending=False).reset_index(drop=True)
def run_lavaan_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200,
                           manual_extra_cov=None):
    """Run a CFA in R/lavaan with MI-driven covariance additions.

    The data, model syntax and latent-variable names are written to a
    temporary directory, a generated R script is executed with Rscript, and
    the CSV/text files it produces are read back. The R script fits the base
    model, then (for at most ``max_mods`` steps, while CFI < 0.95 or
    RMSEA >= 0.049) adds the ``~~`` parameter with the largest modification
    index above ``mi_threshold`` that is not between two latent variables.

    Args:
        df: Source DataFrame; rows with missing items are dropped.
        constructs: Mapping of latent variable name -> list of item columns.
        mi_threshold: Minimum MI for a covariance to be added.
        max_mods: Maximum number of modification steps.
        manual_extra_cov: Optional iterable of ``(v1, v2)`` pairs that are
            part of the base model as ``v1 ~~ v2``.

    Returns:
        ``(result, None)`` on success, where ``result`` holds the initial and
        modified fit dicts, the initial and final MI tables, the modification
        log, all extra covariance pairs (manual first), the standardized
        solution, the data used, the item list and the latent names.
        ``(None, message)`` when Rscript is missing, the run fails or times
        out, R reports an error status, or the outputs cannot be parsed.
    """
    import subprocess, tempfile, os
    # Resolve the executable once; each lookup may spawn `Rscript --version`.
    rscript = _get_rscript_path()
    if rscript is None:
        return None, "Rscriptλ₯Ό 찾을 수 μ—†μŒ. R μ„€μΉ˜ ν›„ PATH 등둝 ν•„μš”."
    all_items = [i for items in constructs.values() for i in items]
    item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
    lv_names = list(constructs.keys())
    data = df[all_items].dropna()
    meas_lines = [f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
    manual_pairs = list(manual_extra_cov) if manual_extra_cov else []
    extra_lines = [f"{v1} ~~ {v2}" for v1, v2 in manual_pairs]
    model_base = "\n".join(meas_lines + extra_lines)
    with tempfile.TemporaryDirectory() as tmpdir:
        def p(name):
            # Forward slashes so the path can be embedded in the R script.
            return os.path.join(tmpdir, name).replace('\\', '/')
        data.to_csv(p('data.csv'), index=False)
        with open(p('model.txt'), 'w', encoding='utf-8') as f:
            f.write(model_base)
        with open(p('lvnames.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lv_names))
        r_script = f'''
suppressPackageStartupMessages(library(lavaan))
options(warn=-1)
out <- "{p("")}"
data <- read.csv(file.path(out,"data.csv"))
mdl_base <- paste(readLines(file.path(out,"model.txt"), encoding="UTF-8"), collapse="\\n")
lv_names <- readLines(file.path(out,"lvnames.txt"), encoding="UTF-8")
mi_thr <- {mi_threshold}
max_mods <- {max_mods}
get_fit <- function(fit) {{
fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea"))
data.frame(as.list(fm))
}}
is_ok <- function(fit) {{
fm <- fitMeasures(fit, c("cfi","rmsea"))
unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049
}}
get_mi_cov <- function(fit) {{
m <- tryCatch(modindices(fit, sort.=TRUE, maximum.number=500), error=function(e) NULL)
if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric()))
m <- m[m$op=="~~", c("lhs","rhs","mi")]
m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ]
}}
fit0 <- tryCatch(cfa(mdl_base, data=data, estimator="ML"), error=function(e) NULL)
if (is.null(fit0)) {{
writeLines("ERROR: CFA failed", file.path(out,"status.txt")); quit(status=1)
}}
write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE)
mi0 <- get_mi_cov(fit0)
write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE)
added <- character(0); extra <- character(0); log_rows <- list()
fit_cur <- fit0; mdl_cur <- mdl_base
for (step in seq_len(max_mods)) {{
if (is_ok(fit_cur)) break
mi_c <- get_mi_cov(fit_cur)
mi_c <- mi_c[mi_c$mi > mi_thr, ]
if (nrow(mi_c)==0) break
ok <- sapply(seq_len(nrow(mi_c)), function(i) {{
pk <- paste(sort(c(mi_c$lhs[i], mi_c$rhs[i])), collapse="~~")
!(pk %in% added)
}})
mi_c <- mi_c[ok, ]
if (nrow(mi_c)==0) break
v1 <- mi_c$lhs[1]; v2 <- mi_c$rhs[1]; mv <- mi_c$mi[1]
pk <- paste(sort(c(v1,v2)), collapse="~~")
mdl_new <- paste0(mdl_cur,"\\n ",v1," ~~ ",v2)
fit_new <- tryCatch(cfa(mdl_new, data=data, estimator="ML"), error=function(e) NULL)
added <- c(added, pk)
if (is.null(fit_new)) next
fm_b <- fitMeasures(fit_cur, c("cfi","rmsea","rfi","ifi"))
fm_a <- fitMeasures(fit_new, c("cfi","rmsea","rfi","ifi"))
log_rows[[length(log_rows)+1]] <- data.frame(
step=step, v1=v1, v2=v2, mi=round(mv,3),
cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3),
rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3),
rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3),
ifi_b=round(unname(fm_b["ifi"]),3), ifi_a=round(unname(fm_a["ifi"]),3),
stringsAsFactors=FALSE)
extra <- c(extra, paste(v1, v2, sep=","))
fit_cur <- fit_new; mdl_cur <- mdl_new
}}
write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE)
write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE)
write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE)
if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt"))
if (length(log_rows)>0) {{
write.csv(do.call(rbind, log_rows), file.path(out,"mod_log.csv"), row.names=FALSE)
}}
writeLines("SUCCESS", file.path(out,"status.txt"))
'''
        with open(p('cfa.R'), 'w', encoding='utf-8') as f:
            f.write(r_script)
        try:
            res = subprocess.run([rscript, '--vanilla', p('cfa.R')],
                                 capture_output=True, text=True, timeout=300)
        except subprocess.TimeoutExpired:
            return None, "R μ‹€ν–‰ μ‹œκ°„ 초과 (5λΆ„)"
        except Exception as ex:
            return None, f"Rscript μ‹€ν–‰ 였λ₯˜: {ex}"
        status_path = os.path.join(tmpdir, 'status.txt')
        if not os.path.exists(status_path):
            # The script died before writing a status: report R's stderr tail.
            err = res.stderr[-2000:] if res.stderr else "μ•Œ 수 μ—†λŠ” 였λ₯˜"
            return None, f"R 였λ₯˜:\n{err}"
        # The R script also writes status.txt when the initial fit fails
        # ("ERROR: ..."), so the file's existence alone does not mean success.
        with open(status_path, encoding='utf-8') as f:
            status = f.read().strip()
        if status != "SUCCESS":
            return None, f"R 였λ₯˜:\n{status}"
        try:
            fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv'))
            fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv'))
            std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv'))
            mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv')))
            mf_path = os.path.join(tmpdir, 'mi_final.csv')
            mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame()
        except Exception as ex:
            return None, f"κ²°κ³Ό νŒŒμ‹± 였λ₯˜: {ex}"
        # Manual pairs first, then the pairs R added (skipping duplicates).
        extra_cov = list(manual_pairs)
        ec_path = os.path.join(tmpdir, 'extra_cov.txt')
        if os.path.exists(ec_path):
            with open(ec_path, encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(',')
                    if len(parts) == 2:
                        t = tuple(parts)
                        if t not in extra_cov:
                            extra_cov.append(t)
        mod_log_records = []
        ml_path = os.path.join(tmpdir, 'mod_log.csv')
        if os.path.exists(ml_path):
            log_df = pd.read_csv(ml_path)
            for _, row in log_df.iterrows():
                # Map each variable to its construct; unknown names map to themselves.
                lv1 = item_to_lv.get(str(row['v1']), str(row['v1']))
                lv2 = item_to_lv.get(str(row['v2']), str(row['v2']))
                mod_log_records.append({
                    '단계': int(row['step']),
                    'μˆ˜μ • 경둜': f"{row['v1']} ~~ {row['v2']}",
                    'μ†Œμ† μš”μΈ': lv1 if lv1 == lv2 else f'{lv1}↔{lv2}',
                    'MI': row['mi'],
                    'CFI μ „β†’ν›„': f"{row['cfi_b']} β†’ {row['cfi_a']}",
                    'RMSEA μ „β†’ν›„': f"{row['rmsea_b']} β†’ {row['rmsea_a']}",
                    'RFI μ „β†’ν›„': f"{row.get('rfi_b', '-')} β†’ {row.get('rfi_a', '-')}",
                    'IFI μ „β†’ν›„': f"{row.get('ifi_b', '-')} β†’ {row.get('ifi_a', '-')}",
                })
        return {
            'init_fit': fit0,
            'mod_fit': fit_mod,
            'init_mi': mi_init,
            'final_mi': mi_final,
            'mod_log': pd.DataFrame(mod_log_records),
            'extra_cov': extra_cov,
            'was_modified': len(extra_cov) > 0,
            'std_sol': std_sol,
            'data': data,
            'all_items': all_items,
            'lv_names': lv_names,
        }, None
def _extract_load_df_lavaan(std_sol, constructs):
try:
loads = std_sol[std_sol['op'] == '=~'].copy()
cols = [c for c in ['lhs', 'rhs', 'est.std', 'se', 'z', 'pvalue'] if c in loads.columns]
loads = loads[cols].copy()
rename = {'lhs': 'μž μž¬λ³€μˆ˜', 'rhs': 'μΈ‘μ •λ³€μˆ˜',
'est.std': 'ν‘œμ€€ν™”κ³„μˆ˜', 'se': 'SE', 'z': 'tκ°’', 'pvalue': 'pκ°’_raw'}
loads = loads.rename(columns=rename)
loads['ν‘œμ€€ν™”κ³„μˆ˜'] = pd.to_numeric(loads['ν‘œμ€€ν™”κ³„μˆ˜'], errors='coerce').round(3)
loads['SE'] = pd.to_numeric(loads['SE'], errors='coerce').round(3)
loads['tκ°’'] = pd.to_numeric(loads['tκ°’'], errors='coerce').round(3)
if 'pκ°’_raw' in loads.columns:
loads['pκ°’'] = loads['pκ°’_raw'].apply(
lambda p: fmt_p(float(p))
if pd.notna(p) and str(p).strip() not in ('NA', '', 'nan') else '-')
loads = loads.drop(columns=['pκ°’_raw'])
return loads.reset_index(drop=True)
except Exception:
return pd.DataFrame()
def build_cfa_tables(df, constructs, mi_threshold=3.84, max_mods=200,
                     manual_extra_cov=None):
    """Run the CFA (lavaan first, semopy as fallback) and build result tables.

    Args:
        df: Source DataFrame.
        constructs: Mapping of latent variable name -> list of item columns.
        mi_threshold: Minimum MI for a covariance to be added.
        max_mods: Maximum number of modification steps.
        manual_extra_cov: Optional ``(v1, v2)`` pairs passed to the lavaan
            run; the semopy fallback does not receive them.

    Returns:
        9-tuple ``(load_df, reliability_df, init_fit, mod_fit, init_mi,
        final_mi, mod_log, was_modified, extra_cov)``. On failure an error is
        shown with ``st.error`` and
        ``(None, None, None, None, None, None, None, False, [])`` is returned.
    """
    result, err = run_lavaan_cfa_with_mi(df, constructs, mi_threshold, max_mods,
                                         manual_extra_cov=manual_extra_cov)
    use_lavaan = (result is not None)
    if not use_lavaan:
        st.warning(f"⚠️ lavaan μ‹€νŒ¨ β†’ semopy μ‚¬μš©\n\n원인: {err}")
        result, err = run_cfa_with_mi(df, constructs, mi_threshold, max_mods)
    if result is None:
        st.error(err or "CFA μ‹€ν–‰ μ‹€νŒ¨")
        return None, None, None, None, None, None, None, False, []
    try:
        rel_rows = []
        if use_lavaan:
            std_sol = result["std_sol"]
            load_df = _extract_load_df_lavaan(std_sol, constructs)
            for lv, items in constructs.items():
                lv_loads = std_sol[(std_sol['op'] == '=~') & (std_sol['lhs'] == lv)]
                lam = pd.to_numeric(lv_loads['est.std'], errors='coerce').dropna().values
                ave, cr = calc_ave_cr(lam)
                alpha = cronbach_alpha(df[items])
                rel_rows.append({"μž μž¬λ³€μˆ˜": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha})
        else:
            ins = result["ins"]
            std_col = result["std_col"]
            load_df, lv_col = _extract_load_df(ins, std_col, constructs)
            # The indicator sits in whichever of lval/rval is not the latent column.
            ind_col = "rval" if lv_col == "lval" else "lval"
            for lv, items in constructs.items():
                # Restrict to this construct's own loading rows. Matching on
                # the latent column alone would also pick up variance and
                # covariance rows that mention the latent variable and would
                # contaminate AVE/CR.
                is_loading = (ins[lv_col] == lv) & ins[ind_col].isin(items)
                raw = ins[is_loading][std_col].values
                lam = np.array([v for v in raw
                                if str(v).strip() not in ("-", "", "nan", "None")], dtype=float)
                ave, cr = calc_ave_cr(lam)
                alpha = cronbach_alpha(df[items])
                rel_rows.append({"μž μž¬λ³€μˆ˜": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha})
        return (load_df, pd.DataFrame(rel_rows),
                result["init_fit"], result["mod_fit"],
                result["init_mi"], result.get("final_mi", pd.DataFrame()),
                result["mod_log"],
                result["was_modified"], result["extra_cov"])
    except Exception as e:
        st.error(f"CFA κ²°κ³Ό 처리 였λ₯˜: {e}")
        return None, None, None, None, None, None, None, False, []
def build_lavaan_sem_table(df, constructs, hypotheses,
                           cfa_extra_cov=None, mi_threshold=3.84, max_mods=200):
    """Fit the structural model in R/lavaan and build the path table.

    Inputs are written to a temporary directory, a generated R script is run
    with Rscript, and its output files are read back. Whenever Rscript is
    unavailable, the run raises, no status.txt is produced, or the outputs
    cannot be parsed, the call is delegated to ``_build_sem_semopy`` with the
    same arguments.

    Args:
        df: Source DataFrame; rows with missing items are dropped.
        constructs: Mapping of latent variable name -> list of item columns.
        hypotheses: Iterable of ``(source, target)`` pairs; pair ``i`` is
            labelled ``H{i+1}`` in the path table.
        cfa_extra_cov: Covariance pairs carried over from the CFA; used only
            when it is a list that is empty or whose first element is a tuple.
        mi_threshold: Minimum MI for a covariance to be added by the R loop.
        max_mods: Maximum number of modification steps in the R loop.

    Returns:
        8-tuple ``(path_df, init_fit, mod_fit, mod_log_df, extra_cov,
        was_modified, mi_init, mi_final)`` for the lavaan path; otherwise
        whatever ``_build_sem_semopy`` returns.
    """
    import subprocess, tempfile, os
    if not _rscript_available():
        return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods)
    all_items = [i for items in constructs.values() for i in items]
    lv_names = list(constructs.keys())
    lv_set = set(lv_names)
    item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
    data = df[all_items].dropna()
    # Accept cfa_extra_cov only in the expected "list of tuples" shape.
    cfa_cov_pairs = (cfa_extra_cov
                     if isinstance(cfa_extra_cov, list) and
                     (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
                     else [])
    # Group hypothesis sources by target: one regression line per target.
    deps = defaultdict(list)
    for s, t in hypotheses:
        deps[t].append(s)
    with tempfile.TemporaryDirectory() as tmpdir:
        def p(name):
            # Forward slashes so the path can be embedded in the R script.
            return os.path.join(tmpdir, name).replace('\\', '/')
        data.to_csv(p('data.csv'), index=False)
        with open(p('meas.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items()))
        with open(p('struct.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(f"{t} ~ {' + '.join(ss)}" for t, ss in deps.items()))
        with open(p('lvnames.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lv_names))
        with open(p('cfa_cov.txt'), 'w', encoding='utf-8') as f:
            for v1, v2 in cfa_cov_pairs:
                f.write(f'{v1},{v2}\n')
        # R script: fits the SEM, then, while CFI < 0.95 or RMSEA >= 0.049,
        # adds the highest-MI "~~" parameter above the threshold that is not
        # between two latent variables (at most max_mods steps).
        r_script = f'''
suppressPackageStartupMessages(library(lavaan))
options(warn=-1)
out <- "{p("")}"
data <- read.csv(file.path(out,"data.csv"))
meas_lines <- readLines(file.path(out,"meas.txt"), encoding="UTF-8")
struct_lines <- readLines(file.path(out,"struct.txt"), encoding="UTF-8")
lv_names <- readLines(file.path(out,"lvnames.txt"),encoding="UTF-8")
cfa_cov_raw <- tryCatch(readLines(file.path(out,"cfa_cov.txt"),encoding="UTF-8"),
error=function(e) character(0))
mi_thr <- {mi_threshold}
max_mods <- {max_mods}
build_model <- function(extra_str=character(0)) {{
cov_lines <- character(0)
all_covs <- c(cfa_cov_raw, extra_str)
for (cv in all_covs) {{
pts <- strsplit(trimws(cv),",")[[1]]
if (length(pts)==2) {{
ln <- paste0(" ",pts[1]," ~~ ",pts[2])
if (!(ln %in% cov_lines)) cov_lines <- c(cov_lines, ln)
}}
}}
paste(c(paste0(" ",meas_lines), cov_lines, paste0(" ",struct_lines)), collapse="\\n")
}}
get_fit <- function(fit) {{
fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea"))
data.frame(as.list(fm))
}}
is_ok <- function(fit) {{
fm <- fitMeasures(fit, c("cfi","rmsea"))
unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049
}}
get_mi_cov <- function(fit) {{
m <- tryCatch(modindices(fit,sort.=TRUE,maximum.number=500),error=function(e) NULL)
if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric()))
m <- m[m$op=="~~", c("lhs","rhs","mi")]
m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ]
}}
fit0 <- tryCatch(sem(build_model(), data=data, estimator="ML"), error=function(e) NULL)
if (is.null(fit0)) {{
writeLines("ERROR: SEM initial fit failed", file.path(out,"status.txt")); quit(status=1)
}}
write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE)
mi0 <- get_mi_cov(fit0)
write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE)
added <- character(0); extra <- character(0); log_rows <- list()
fit_cur <- fit0
for (step in seq_len(max_mods)) {{
if (is_ok(fit_cur)) break
mi_c <- get_mi_cov(fit_cur)
mi_c <- mi_c[mi_c$mi > mi_thr, ]
if (nrow(mi_c)==0) break
ok <- sapply(seq_len(nrow(mi_c)), function(i) {{
pk <- paste(sort(c(mi_c$lhs[i],mi_c$rhs[i])),collapse="~~")
!(pk %in% added)
}})
mi_c <- mi_c[ok,]
if (nrow(mi_c)==0) break
v1<-mi_c$lhs[1]; v2<-mi_c$rhs[1]; mv<-mi_c$mi[1]
pk <- paste(sort(c(v1,v2)),collapse="~~")
extra_new <- c(extra, paste(v1,v2,sep=","))
fit_new <- tryCatch(sem(build_model(extra_new),data=data,estimator="ML"),error=function(e) NULL)
added <- c(added, pk)
if (is.null(fit_new)) next
fm_b <- fitMeasures(fit_cur,c("cfi","rmsea","rfi","ifi"))
fm_a <- fitMeasures(fit_new,c("cfi","rmsea","rfi","ifi"))
log_rows[[length(log_rows)+1]] <- data.frame(
step=step, v1=v1, v2=v2, mi=round(mv,3),
cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3),
rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3),
rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3),
stringsAsFactors=FALSE)
extra <- extra_new
fit_cur <- fit_new
}}
write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE)
write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE)
write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE)
if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt"))
if (length(log_rows)>0) write.csv(do.call(rbind,log_rows), file.path(out,"mod_log.csv"), row.names=FALSE)
writeLines("SUCCESS", file.path(out,"status.txt"))
'''
        with open(p('sem.R'), 'w', encoding='utf-8') as f:
            f.write(r_script)
        rscript = _get_rscript_path()
        try:
            res = subprocess.run([rscript, '--vanilla', p('sem.R')],
                                 capture_output=True, text=True, timeout=300)
        except Exception:
            # Includes the 300 s timeout: fall back to semopy.
            return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods)
        # No status file means the script did not reach a status write.
        if not os.path.exists(os.path.join(tmpdir, 'status.txt')):
            return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods)
        try:
            fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv'))
            fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv'))
            std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv'))
            mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv')))
            mf_path = os.path.join(tmpdir, 'mi_final.csv')
            mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame()
        except Exception:
            # Also reached when R wrote an ERROR status and skipped the CSV outputs.
            return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods)
        # CFA pairs first, then the pairs R added (skipping duplicates).
        extra_cov = list(cfa_cov_pairs)
        ec_path = os.path.join(tmpdir, 'extra_cov.txt')
        if os.path.exists(ec_path):
            with open(ec_path, encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(',')
                    if len(parts) == 2:
                        t = tuple(parts)
                        if t not in extra_cov:
                            extra_cov.append(t)
        mod_log_records = []
        ml_path = os.path.join(tmpdir, 'mod_log.csv')
        if os.path.exists(ml_path):
            log_df = pd.read_csv(ml_path)
            for _, row in log_df.iterrows():
                # Map each variable to its construct; unknown names map to themselves.
                lv1 = item_to_lv.get(str(row['v1']), str(row['v1']))
                lv2 = item_to_lv.get(str(row['v2']), str(row['v2']))
                mod_log_records.append({
                    '단계': int(row['step']),
                    'μˆ˜μ • 경둜': f"{row['v1']} ~~ {row['v2']}",
                    'μ†Œμ† μš”μΈ': lv1 if lv1 == lv2 else f'{lv1}↔{lv2}',
                    'MI': row['mi'],
                    'CFI μ „β†’ν›„': f"{row['cfi_b']} β†’ {row['cfi_a']}",
                    'RMSEA μ „β†’ν›„': f"{row['rmsea_b']} β†’ {row['rmsea_a']}",
                })
    def _sp(v):
        # Parse a p-value cell to float; NA-like or unparsable values become NaN.
        try:
            return float(v) if str(v).strip() not in ('NA', '', 'nan') else np.nan
        except Exception:
            return np.nan
    # Structural paths: regressions whose both sides are latent variables.
    struct = std_sol[(std_sol['op'] == '~') &
                     std_sol['lhs'].isin(lv_set) &
                     std_sol['rhs'].isin(lv_set)].copy()
    hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in enumerate(hypotheses)}
    cols_need = [c for c in ['lhs','rhs','est.std','se','z','pvalue'] if c in struct.columns]
    struct = struct[cols_need].copy()
    # Positional rename: the labels line up only when all six columns exist.
    struct.columns = ['μ’…μ†λ³€μˆ˜','λ…λ¦½λ³€μˆ˜','ν‘œμ€€ν™”Ξ²','SE','tκ°’','pκ°’_raw'][:len(cols_need)]
    struct['κ°€μ„€'] = struct.apply(lambda r: hyp_map.get((r['λ…λ¦½λ³€μˆ˜'], r['μ’…μ†λ³€μˆ˜']), '-'), axis=1)
    struct['경둜'] = struct['λ…λ¦½λ³€μˆ˜'] + ' β†’ ' + struct['μ’…μ†λ³€μˆ˜']
    struct['μœ μ˜μ„±'] = struct['pκ°’_raw'].apply(lambda v: sig_stars(_sp(v)))
    # The accept/reject label uses p < 0.05; an unparsable p-value gives '-'.
    struct['채택여뢀'] = struct['pκ°’_raw'].apply(
        lambda v: ('채택' if _sp(v) < 0.05 else '기각') if not np.isnan(_sp(v)) else '-')
    struct['pκ°’'] = struct['pκ°’_raw'].apply(
        lambda v: fmt_p(_sp(v)) if not np.isnan(_sp(v)) else '-')
    for col in ['ν‘œμ€€ν™”Ξ²', 'SE', 'tκ°’']:
        struct[col] = pd.to_numeric(struct[col], errors='coerce').round(3)
    struct = struct.drop(columns=['pκ°’_raw'])
    out_cols = ['κ°€μ„€','경둜','ν‘œμ€€ν™”Ξ²','SE','tκ°’','pκ°’','μœ μ˜μ„±','채택여뢀']
    path_df = struct[[c for c in out_cols if c in struct.columns]].reset_index(drop=True)
    # "Modified" here means the R loop logged at least one accepted step.
    was_mod = len(mod_log_records) > 0
    return (path_df, fit0, fit_mod,
            pd.DataFrame(mod_log_records), extra_cov, was_mod,
            mi_init, mi_final)
def _build_sem_semopy(df, constructs, hypotheses,
                      cfa_extra_cov=None, mi_threshold=3.84, max_mods=200):
    """Fit the structural model with semopy (the original note calls this the lavaan fallback).

    The model text combines the measurement part (``LV =~ items``), every
    pairwise latent covariance, unit latent variances, any covariances passed
    in via ``cfa_extra_cov`` and one regression per dependent latent variable
    built from ``hypotheses``.  If the model with unit variances cannot be
    fitted, the same model without them is tried.  While ``_adequate_check``
    rejects the fit, covariances suggested by ``calc_mi_approx`` are added one
    at a time, at most ``max_mods`` times.

    Args:
        df: DataFrame holding the indicator columns named in ``constructs``.
        constructs: Mapping of latent-variable name -> list of indicator columns.
        hypotheses: Iterable of ``(source, target)`` latent-variable pairs.
        cfa_extra_cov: Optional list of ``(var1, var2)`` tuples included as
            covariances from the start; any other value is ignored.
        mi_threshold: Minimum modification index for a pair to be tried.
        max_mods: Upper bound on the number of modification steps.

    Returns:
        ``(paths, fit_initial, fit_final, mod_log, extra_cov, was_modified,
        mi_initial, mi_final)``.  When semopy is missing, no model can be
        fitted, or an unexpected error occurs, the tuple is
        ``(None, None, None, None, [], False, <empty DataFrame>, <empty DataFrame>)``.
    """
    def _failure():
        # Every early exit returns the same 8-slot shape so callers can unpack it.
        return None, None, None, None, [], False, pd.DataFrame(), pd.DataFrame()

    try:
        from semopy import Model, calc_stats
    except ImportError:
        st.error("semopy νŒ¨ν‚€μ§€κ°€ ν•„μš”ν•©λ‹ˆλ‹€.")
        return _failure()

    try:
        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [i for items in constructs.values() for i in items]
        lv_names = set(constructs)
        data = df[all_items].dropna()

        # Only a list of tuples is accepted as pre-existing covariances.
        cfa_cov_pairs = (cfa_extra_cov
                         if isinstance(cfa_extra_cov, list) and
                         (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
                         else [])

        # Measurement lines followed by all pairwise latent covariances.
        lv_keys = list(constructs)
        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        lv_cov_lines = [f" {a} ~~ {b}"
                        for i, a in enumerate(lv_keys) for b in lv_keys[i + 1:]]
        meas_str = "\n".join(meas_lines + lv_cov_lines)
        var_str = "\n".join(f" {lv} ~~ 1*{lv}" for lv in lv_keys)

        # One regression line per dependent latent variable.
        deps = defaultdict(list)
        for s, t in hypotheses:
            deps[t].append(s)
        struct_str = "\n".join(f" {t} ~ {' + '.join(ss)}" for t, ss in deps.items())

        def _compose(cov_pairs, with_var):
            # Assemble the model text; the variance block and covariance block are optional.
            parts = [meas_str]
            if with_var:
                parts.append(var_str)
            if cov_pairs:
                parts.append("\n".join(f" {a} ~~ {b}" for a, b in cov_pairs))
            parts.append(struct_str)
            return "\n".join(parts)

        def _fit_model(model_str):
            # Best effort: a model that cannot be built or fitted is reported as None.
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        def _extract_fit(m):
            # Collect fit indices from calc_stats; RFI and IFI are derived from the
            # model and baseline chi-square values.  Returns {} if anything fails.
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                rfi = ((chi2_bl / dof_bl - chi2 / dof) / (chi2_bl / dof_bl)
                       if chi2_bl > 0 and dof_bl > 0 and dof > 0 else 0.0)
                ifi = ((chi2_bl - chi2) / (chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"χ²": round(chi2, 3), "df": int(dof),
                        "χ²/df": round(chi2 / dof, 3) if dof > 0 else "-", "p": round(pval, 3),
                        "NFI": round(nfi, 3), "RFI": round(rfi, 3),
                        "IFI": round(ifi, 3), "TLI": round(tli, 3),
                        "CFI": round(cfi, 3), "RMSEA": round(rmsea, 3)}
            except Exception:
                return {}

        def _sp(p):
            # Parse a p-value that may arrive as a number, "-", "" or "nan".
            try:
                return float(p) if str(p).strip() not in ("-", "", "nan") else np.nan
            except (TypeError, ValueError):
                return np.nan

        def _extract_paths(m):
            # Convert the parameter table into the hypothesis table.
            ins = m.inspect(std_est=True)
            # First column whose name mentions "std"; falls back to the last column.
            std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
            is_path = ins["lval"].isin(lv_names) & ins["rval"].isin(lv_names)
            if "op" in ins.columns:
                # Latent variances/covariances ("~~") also have latent names on both
                # sides; only regressions ("~") are hypothesis paths.
                is_path &= ins["op"] == "~"
            struct = ins[is_path].copy()
            cols = [c for c in ["lval", "rval", std_col, "Std. Err", "z-value", "p-value"]
                    if c in struct.columns]
            struct = struct[cols].copy()
            rm = {"lval": "μ’…μ†λ³€μˆ˜", "rval": "λ…λ¦½λ³€μˆ˜", std_col: "ν‘œμ€€ν™”Ξ²",
                  "Std. Err": "SE", "z-value": "tκ°’", "p-value": "pκ°’_raw"}
            struct = struct.rename(columns=rm)
            hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in enumerate(hypotheses)}
            # zip-based construction also works when no path rows are present.
            struct["κ°€μ„€"] = [hyp_map.get((src, dst), "-")
                            for src, dst in zip(struct["λ…λ¦½λ³€μˆ˜"], struct["μ’…μ†λ³€μˆ˜"])]
            struct["경둜"] = struct["λ…λ¦½λ³€μˆ˜"] + " β†’ " + struct["μ’…μ†λ³€μˆ˜"]
            struct["μœ μ˜μ„±"] = struct["pκ°’_raw"].apply(lambda p: sig_stars(_sp(p)))
            struct["채택여뢀"] = struct["pκ°’_raw"].apply(
                lambda p: ("채택" if _sp(p) < 0.05 else "기각") if not np.isnan(_sp(p)) else "-")
            struct["pκ°’"] = struct["pκ°’_raw"].apply(
                lambda p: fmt_p(_sp(p)) if not np.isnan(_sp(p)) else "-")
            for col in [c for c in ["ν‘œμ€€ν™”Ξ²", "SE", "tκ°’"] if c in struct.columns]:
                struct[col] = pd.to_numeric(struct[col], errors="coerce").round(3)
            out = ["κ°€μ„€", "경둜", "ν‘œμ€€ν™”Ξ²", "SE", "tκ°’", "pκ°’", "μœ μ˜μ„±", "채택여뢀"]
            return struct[[c for c in out if c in struct.columns]].reset_index(drop=True)

        # Initial fit: prefer the model with unit latent variances, else drop them.
        # Remember which form worked so later refits use the same form.
        use_var = True
        m0 = _fit_model(_compose(cfa_cov_pairs, True))
        if m0 is None:
            use_var = False
            m0 = _fit_model(_compose(cfa_cov_pairs, False))
        if m0 is None:
            return _failure()

        fit0 = _extract_fit(m0)
        extra_cov = list(cfa_cov_pairs)
        added_pairs = {tuple(sorted(p)) for p in cfa_cov_pairs}
        mod_log = []
        m_cur, fit_cur = m0, fit0.copy()

        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            mi_cur = calc_mi_approx(m_cur, data, all_items)
            if mi_cur is None or mi_cur.empty:
                break
            untried = mi_cur.apply(
                lambda r: tuple(sorted([r["λ³€μˆ˜1"], r["λ³€μˆ˜2"]])) not in added_pairs,
                axis=1)
            avail = mi_cur[(mi_cur["MI"] > mi_threshold) & untried]
            if avail.empty:
                break
            row = avail.iloc[0]
            v1, v2 = row["λ³€μˆ˜1"], row["λ³€μˆ˜2"]
            pair = tuple(sorted([v1, v2]))
            # Whatever the outcome, this pair is not tried again.
            added_pairs.add(pair)
            new_cov = extra_cov + [(v1, v2)]
            m_new = _fit_model(_compose(new_cov, use_var))
            if m_new is None:
                continue
            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "단계": step + 1,
                "μˆ˜μ • 경둜": f"{v1} ~~ {v2}",
                "μ†Œμ† μš”μΈ": lv1 if lv1 == lv2 else f"{lv1}↔{lv2}",
                "MI": round(row["MI"], 3),
                "CFI μ „β†’ν›„": f"{fit_cur.get('CFI','-')}β†’{fit_new.get('CFI','-')}",
                "RMSEA μ „β†’ν›„": f"{fit_cur.get('RMSEA','-')}β†’{fit_new.get('RMSEA','-')}",
            })
            extra_cov, m_cur, fit_cur = new_cov, m_new, fit_new

        path_final = _extract_paths(m_cur)
        was_mod = len(mod_log) > 0
        init_mi_sem = calc_mi_approx(m0, data, all_items)
        final_mi_sem = calc_mi_approx(m_cur, data, all_items)
        return (path_final, fit0, fit_cur,
                pd.DataFrame(mod_log), extra_cov, was_mod,
                init_mi_sem, final_mi_sem)
    except Exception as e:
        st.error(f"SEM 였λ₯˜: {e}")
        return _failure()
def run_correlation(df, constructs):
    """Build the construct correlation table with sqrt(AVE) on the diagonal.

    Each construct is scored as the row-wise mean of its items, and the
    correlations between those scores are rounded to three decimals.  The
    diagonal is replaced by the square root of the construct's AVE, computed
    from the standardized estimates returned by ``run_cfa_sem``; if that step
    fails or yields nothing usable for a construct, its diagonal shows "-".
    Cells above the diagonal are blanked.

    Args:
        df: DataFrame containing the item columns.
        constructs: Mapping of construct name -> list of item columns.

    Returns:
        DataFrame whose first column holds the construct names, followed by
        one column per construct.  An empty DataFrame is returned (and the
        error shown via ``st.error``) if the table cannot be built.
    """
    try:
        comp = {lv: df[items].mean(axis=1) for lv, items in constructs.items()}
        corr = pd.DataFrame(comp).corr().round(3)

        sqrt_ave = {}
        try:
            ins, std_col, _, _ = run_cfa_sem(df, constructs)
            if ins is not None:
                # Without "=~" rows the latent name is looked up in "rval",
                # otherwise in "lval"; the items are then on the opposite side.
                use_rval = not (ins["op"] == "=~").any()
                lv_col = "rval" if use_rval else "lval"
                item_col = "lval" if use_rval else "rval"
                for lv, items in constructs.items():
                    mask = ins[lv_col] == lv
                    if item_col in ins.columns:
                        # Keep only rows linking the construct to its own items so
                        # that other rows carrying the latent name stay out of the AVE.
                        mask &= ins[item_col].isin(items)
                    raw = ins[mask][std_col].values
                    lam = np.array([v for v in raw
                                    if str(v).strip() not in ("-", "", "nan", "None")],
                                   dtype=float)
                    if lam.size == 0:
                        continue
                    ave, _ = calc_ave_cr(lam)
                    root = float(np.sqrt(ave))
                    if np.isfinite(root):
                        sqrt_ave[lv] = round(root, 3)
        except Exception:
            # Deliberate best effort: the correlation table is still useful
            # without sqrt(AVE); missing entries are rendered as "-".
            pass

        tbl = corr.copy().astype(object)
        for lv in constructs:
            if lv in tbl.columns:
                tbl.loc[lv, lv] = sqrt_ave.get(lv, "-")
        # Blank the upper triangle so only the lower triangle and diagonal remain.
        n_cols = len(tbl.columns)
        for i in range(n_cols):
            for j in range(i + 1, n_cols):
                tbl.iloc[i, j] = ""
        return tbl.reset_index().rename(columns={"index": "λ³€μˆ˜"})
    except Exception as e:
        st.error(f"상관관계 뢄석 였λ₯˜: {e}")
        return pd.DataFrame()
# ══════════════════════════════════════════════════════════════════════════════
# Excel 생성
# ══════════════════════════════════════════════════════════════════════════════
def build_single_sheet_excel(sheets: dict) -> bytes:
    """Render every result table onto one worksheet and return the .xlsx bytes.

    Each value of ``sheets`` is a sequence ``(title, dataframe[, note[, adopt_col]])``.
    For every non-empty dataframe the function writes a merged title row, a
    styled header row, the data rows and an optional note, followed by two
    blank rows.  When ``adopt_col`` names a column, its cells are coloured
    green if the cell text equals the "adopted" label and red otherwise.
    A section that fails to render is skipped.

    Args:
        sheets: Mapping of section key -> payload sequence as described above.
            The keys themselves are not written to the workbook.

    Returns:
        The workbook serialized as bytes.  If building the sheet fails as a
        whole, an empty workbook is returned instead.
    """
    try:
        import io as _io
        from openpyxl import Workbook
        from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
        from openpyxl.utils import get_column_letter

        wb = Workbook()
        ws = wb.active
        ws.title = "뢄석결과"

        _hdr_font = Font(name="Arial", bold=True, color="FFFFFF", size=10)
        _hdr_fill = PatternFill("solid", fgColor="2F5496")
        _ttl_font = Font(name="Arial", bold=True, size=12, color="2F5496")
        _note_font = Font(name="Arial", size=9, color="595959", italic=True)
        _ctr = Alignment(horizontal="center", vertical="center", wrap_text=True)
        _lft = Alignment(horizontal="left", vertical="center")
        _bd = Border(left=Side(style="thin"), right=Side(style="thin"),
                     top=Side(style="thin"), bottom=Side(style="thin"))
        _adopt_fill_y = PatternFill("solid", fgColor="E2EFDA")
        _adopt_fill_n = PatternFill("solid", fgColor="FFC7CE")
        _adopt_font_y = Font(name="Arial", size=10, color="375623")
        _adopt_font_n = Font(name="Arial", size=10, color="9C0006")

        cur_row = 1
        for payload in sheets.values():
            try:
                title = payload[0]
                df_data = payload[1]
                note = payload[2] if len(payload) > 2 else ""
                adopt_col = payload[3] if len(payload) > 3 else None
                if df_data is None or len(df_data) == 0:
                    continue

                cols = list(df_data.columns)
                nc = max(len(cols), 1)
                adopt_idx = (cols.index(adopt_col) + 1
                             if adopt_col and adopt_col in cols else None)

                # Title row, merged across the width of the table.
                ws.merge_cells(start_row=cur_row, start_column=1,
                               end_row=cur_row, end_column=nc)
                cell = ws.cell(row=cur_row, column=1, value=title)
                cell.font = _ttl_font
                cur_row += 1

                # Header row.
                for ci, col in enumerate(cols, 1):
                    c = ws.cell(row=cur_row, column=ci, value=str(col))
                    c.font = _hdr_font
                    c.fill = _hdr_fill
                    c.alignment = _ctr
                    c.border = _bd
                cur_row += 1

                # Data rows; NaN floats are written as empty strings.
                for row in df_data.itertuples(index=False):
                    for ci, val in enumerate(row, 1):
                        safe_val = ("" if isinstance(val, float) and (val != val) else val)
                        cell = ws.cell(row=cur_row, column=ci, value=safe_val)
                        cell.alignment = _lft if ci <= 2 else _ctr
                        cell.border = _bd
                        if adopt_idx and ci == adopt_idx:
                            is_y = str(val) == "채택"
                            cell.fill = _adopt_fill_y if is_y else _adopt_fill_n
                            cell.font = _adopt_font_y if is_y else _adopt_font_n
                    cur_row += 1

                if note:
                    nc_cell = ws.cell(row=cur_row, column=1, value=note)
                    nc_cell.font = _note_font
                    cur_row += 1
                cur_row += 2
            except Exception:
                # Best effort: drop the broken section, but move the cursor below
                # anything it already wrote so the next section cannot overlap it.
                cur_row = max(cur_row, ws.max_row + 2)

        # Auto-size columns.  The first cell of a column may be a MergedCell (from
        # a merged title row), which has no ``column_letter`` attribute, so the
        # letter is derived from the numeric column index instead.
        for col_cells in ws.columns:
            try:
                w = max((len(str(c.value)) if c.value else 0) for c in col_cells)
                letter = get_column_letter(col_cells[0].column)
                ws.column_dimensions[letter].width = min(w + 4, 45)
            except (ValueError, TypeError, AttributeError):
                continue

        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
    except Exception:
        # Last resort: hand back an empty workbook rather than failing the download.
        from openpyxl import Workbook
        import io as _io
        wb = Workbook()
        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
# ══════════════════════════════════════════════════════════════════════════════
# UI
# ══════════════════════════════════════════════════════════════════════════════
# Page heading and sub-title, styled by the CSS classes injected at the top of the file.
st.markdown('<p class="main-title">πŸ“Š SEM 뢄석기</p>', unsafe_allow_html=True)
st.markdown('<p class="sub-title">λΉˆλ„λΆ„μ„ Β· κΈ°μˆ ν†΅κ³„ Β· 신뒰도 Β· CFA Β· 상관관계 Β· SEM</p>',
            unsafe_allow_html=True)
# -- STEP 1: file upload ------------------------------------------------------
st.markdown("### πŸ“ STEP 1. 데이터 μ—…λ‘œλ“œ")
uploaded = st.file_uploader("Excel(.xlsx/.xls) λ˜λŠ” CSV 파일", type=["xlsx","xls","csv"])
# Nothing below can run without data, so halt this script run until a file exists.
if not uploaded:
    st.info("νŒŒμΌμ„ μ—…λ‘œλ“œν•˜λ©΄ 뢄석이 μ‹œμž‘λ©λ‹ˆλ‹€.")
    st.stop()
# Choose the reader from the file extension; any read failure stops the run.
try:
    if uploaded.name.lower().endswith(".csv"):
        df = pd.read_csv(uploaded)
    else:
        df = pd.read_excel(uploaded)
except Exception as e:
    st.error(f"파일 읽기 였λ₯˜: {e}")
    st.stop()
# Convert a column to numeric only when every non-null value converts cleanly
# (and the column is not entirely null); otherwise the column is left as read.
for c in df.columns:
    try:
        converted = pd.to_numeric(df[c], errors="coerce")
        if converted.notna().sum() == df[c].notna().sum() and df[c].notna().sum() > 0:
            df[c] = converted
    except Exception:
        pass
st.success(f"βœ… 파일 λ‘œλ“œ μ™„λ£Œ β€” {len(df)}ν–‰ Γ— {len(df.columns)}μ—΄")
# Preview of the first ten rows.
with st.expander("πŸ“‹ 데이터 미리보기 (μƒμœ„ 10ν–‰)"):
    st.dataframe(df.head(10), use_container_width=True)
# ── STEP 1-5: μ΄μƒμΉ˜ 탐지 ────────────────────────────────────────────────────
st.markdown("---")
st.markdown("### πŸ”Ž STEP 1-5. μ΄μƒμΉ˜ 탐지 (λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리 + SMC)")
num_cols_all = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
tab_maha, tab_smc = st.tabs(["πŸ“ λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리", "πŸ“ SMC (λ‹€μ€‘μƒκ΄€μžμŠΉ)"])
with tab_maha:
st.caption("χ² 뢄포 κΈ°μ€€μœΌλ‘œ λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리가 μž„κ³„κ°’μ„ μ΄ˆκ³Όν•˜λŠ” **ν‘œλ³Έ(ν–‰)**을 μ΄μƒμΉ˜λ‘œ νƒμ§€ν•©λ‹ˆλ‹€.")
col_a, col_b = st.columns([3, 1])
outlier_cols = col_a.multiselect("뢄석 λ³€μˆ˜ 선택", num_cols_all, default=num_cols_all, key="oc_cols")
p_thr = col_b.selectbox("μœ μ˜μˆ˜μ€€", [0.001, 0.005, 0.01, 0.05], index=0, key="oc_pthr")
if outlier_cols and st.button("πŸ” λ§ˆν• λΌλ…ΈλΉ„μŠ€ 탐지 μ‹€ν–‰", key="oc_run"):
oc_result, oc_cutoff, oc_idx = detect_outliers_mahalanobis(df, outlier_cols, p_threshold=p_thr)
st.session_state["oc_result"] = oc_result
st.session_state["oc_cutoff"] = oc_cutoff
st.session_state["oc_idx"] = oc_idx
st.session_state.pop("oc_remove_rows", None)
if "oc_result" in st.session_state and st.session_state["oc_result"] is not None:
oc_result = st.session_state["oc_result"]
oc_cutoff = st.session_state["oc_cutoff"]
oc_idx = st.session_state["oc_idx"]
n_out = len(oc_idx)
st.markdown(f"**χ² μž„κ³„κ°’:** `{oc_cutoff}`")
col_r1, col_r2, col_r3 = st.columns(3)
col_r1.metric("전체 ν‘œλ³Έ", len(df))
col_r2.metric("νƒμ§€λœ μ΄μƒμΉ˜", n_out, delta=f"-{n_out}" if n_out > 0 else "μ—†μŒ", delta_color="inverse")
col_r3.metric("제거 ν›„ ν‘œλ³Έ", len(df) - n_out)
disp_all = oc_result.sort_values("λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리", ascending=False).copy()
disp_all["νŒμ •"] = disp_all["μ΄μƒμΉ˜"].map({True: "⚠️ μ΄μƒμΉ˜", False: "βœ… 정상"})
disp_all = disp_all.drop(columns=["μ΄μƒμΉ˜"])
with st.expander("πŸ“‹ λ§ˆν• λΌλ…ΈλΉ„μŠ€ 전체 κ²°κ³Ό", expanded=(n_out > 0)):
st.dataframe(disp_all, use_container_width=True, hide_index=True)
if n_out > 0:
outlier_row_nums = (oc_result[oc_result["μ΄μƒμΉ˜"] == True]["원본 ν–‰λ²ˆν˜Έ"].tolist()
if "μ΄μƒμΉ˜" in oc_result.columns else [])
_default_rows = [r for r in st.session_state.get("oc_remove_rows", outlier_row_nums)
if r in outlier_row_nums]
st.multiselect(f"πŸ—‘οΈ μ œκ±°ν•  ν‘œλ³Έ 선택 (총 {n_out}개 탐지)", options=outlier_row_nums,
default=_default_rows, key="oc_remove_rows")
else:
st.success("βœ… μ΄μƒμΉ˜κ°€ νƒμ§€λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€.")
with tab_smc:
st.caption("각 λ³€μˆ˜λ₯Ό λ‚˜λ¨Έμ§€ λ³€μˆ˜λ‘œ νšŒκ·€λΆ„μ„ν•œ RΒ²(SMC) < .20 이면 λ³€μˆ˜ 제거λ₯Ό κ³ λ €ν•©λ‹ˆλ‹€.")
smc_cols = st.multiselect("SMC 뢄석 λ³€μˆ˜ 선택", num_cols_all, default=num_cols_all, key="smc_cols")
if smc_cols and st.button("πŸ” SMC 탐지 μ‹€ν–‰", key="smc_run"):
with st.spinner("SMC 계산 쀑..."):
smc_result = compute_smc(df, smc_cols)
st.session_state["smc_result"] = smc_result
st.session_state.pop("smc_drop_vars", None)
if "smc_result" in st.session_state and st.session_state["smc_result"] is not None:
smc_result = st.session_state["smc_result"]
low_smc = smc_result[smc_result["SMC"].notna() & (smc_result["SMC"] < 0.2)]
n_low = len(low_smc)
col_s1, col_s2 = st.columns(2)
col_s1.metric("뢄석 λ³€μˆ˜ 수", len(smc_result))
col_s2.metric("SMC < .20 λ³€μˆ˜", n_low)
with st.expander("πŸ“‹ SMC 뢄석 κ²°κ³Ό", expanded=(n_low > 0)):
st.dataframe(smc_result.sort_values("SMC").reset_index(drop=True),
use_container_width=True, hide_index=True)
if n_low > 0:
low_vars = low_smc["λ³€μˆ˜"].tolist()
_default_vars = [v for v in st.session_state.get("smc_drop_vars", low_vars)
if v in smc_result["λ³€μˆ˜"].tolist()]
st.multiselect(f"πŸ—‘οΈ μ œκ±°ν•  λ³€μˆ˜ 선택 (SMC < .20, 총 {n_low}개)",
options=smc_result["λ³€μˆ˜"].tolist(),
default=_default_vars, key="smc_drop_vars")
else:
st.success("βœ… SMC < .20 λ³€μˆ˜κ°€ μ—†μŠ΅λ‹ˆλ‹€.")
# ── μ΄μƒμΉ˜/λ³€μˆ˜ 제거 적용 ─────────────────────────────────────────────────────
_removed_rows = st.session_state.get("oc_remove_rows", [])
_dropped_vars = st.session_state.get("smc_drop_vars", [])
if _removed_rows:
_remove_idx = [int(r) - 1 for r in _removed_rows]
_valid_idx = [i for i in _remove_idx if i in df.index]
if _valid_idx:
df = df.drop(index=_valid_idx).reset_index(drop=True)
st.info(f"πŸ—‘οΈ μ΄μƒμΉ˜ **{len(_valid_idx)}개 ν‘œλ³Έ** 제거 β†’ 뢄석 ν‘œλ³Έ: **{len(df)}개**")
if _dropped_vars:
_actual_drop = [v for v in _dropped_vars if v in df.columns]
if _actual_drop:
df = df.drop(columns=_actual_drop)
st.info(f"πŸ—‘οΈ SMC λ³€μˆ˜ **{len(_actual_drop)}개** 제거: `{', '.join(_actual_drop)}`")
# ── STEP 2: κ΅¬μ„±κ°œλ… 탐지 ─────────────────────────────────────────────────────
st.markdown("---")
st.markdown("### πŸ” STEP 2. κ΅¬μ„±κ°œλ… μžλ™ 탐지")
constructs_auto = auto_detect_constructs(df)
c1, c2, c3 = st.columns(3)
c1.metric("전체 λ³€μˆ˜", len(df.columns))
c2.metric("유효 사둀", len(df.dropna()))
c3.metric("νƒμ§€λœ κ΅¬μ„±κ°œλ…", len(constructs_auto))
if constructs_auto:
for lv, items in constructs_auto.items():
st.markdown(f"- **{lv}** ({len(items)}λ¬Έν•­): {', '.join(items)}")
else:
st.warning("μžλ™ νƒμ§€λœ κ΅¬μ„±κ°œλ…μ΄ μ—†μŠ΅λ‹ˆλ‹€. μ•„λž˜μ—μ„œ 직접 μ„€μ •ν•΄ μ£Όμ„Έμš”.")
with st.expander("✏️ κ΅¬μ„±κ°œλ… 직접 νŽΈμ§‘ (ν•„μš” μ‹œ)"):
n_c = st.number_input("κ΅¬μ„±κ°œλ… 수", min_value=1, max_value=20,
value=max(len(constructs_auto), 1), key="n_constructs")
auto_k = list(constructs_auto.keys())
auto_v = list(constructs_auto.values())
constructs_edit = {}
for i in range(int(n_c)):
ca, cb = st.columns([1, 3])
default_name = auto_k[i] if i < len(auto_k) else f"LV{i+1}"
default_items = auto_v[i] if i < len(auto_v) else []
nm = ca.text_input(f"이름 {i+1}", value=default_name, key=f"cn_{i}")
it = cb.multiselect(f"λ¬Έν•­ {i+1}", df.columns.tolist(), default=default_items, key=f"ci_{i}")
if nm and len(it) >= 2:
constructs_edit[nm] = it
constructs = constructs_edit if constructs_edit else constructs_auto
if not constructs:
constructs = constructs_auto
# ── STEP 3: 뢄석 방법 선택 ────────────────────────────────────────────────────
st.markdown("---")
st.markdown("### βœ… STEP 3. 뢄석 방법 선택")
num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
cat_cols = [c for c in df.columns if detect_scale(df[c]) in ("binary","categorical")]
lv_list = list(constructs.keys())
n_valid = len(df.dropna())
suggestions = {
"λΉˆλ„λΆ„μ„": {"ok": len(cat_cols) > 0,
"reason": f"λ²”μ£Όν˜• λ³€μˆ˜ {len(cat_cols)}개 감지"},
"κΈ°μˆ ν†΅κ³„": {"ok": len(num_cols) > 0,
"reason": f"μˆ˜μΉ˜ν˜• λ³€μˆ˜ {len(num_cols)}개"},
"신뒰도 (Cronbach's Ξ±)": {"ok": len(constructs) > 0,
"reason": f"κ΅¬μ„±κ°œλ… {len(constructs)}개 탐지"},
"확인적 μš”μΈλΆ„μ„ (CFA)": {"ok": len(constructs) >= 2 and n_valid >= 100,
"reason": f"κ΅¬μ„±κ°œλ… {len(constructs)}개, N={n_valid}"},
"상관관계 뢄석": {"ok": len(constructs) >= 2,
"reason": f"μž μž¬λ³€μˆ˜ {len(constructs)}개"},
"ꡬ쑰방정식 (SEM)": {"ok": len(constructs) >= 3 and n_valid >= 200,
"reason": f"κ΅¬μ„±κ°œλ… {len(constructs)}개, N={n_valid}"},
}
selected = {}
for method, info in suggestions.items():
icon = "βœ…" if info["ok"] else "⚠️"
col_a, col_b = st.columns([1, 10])
selected[method] = col_a.checkbox(f"{icon}", value=info["ok"], key=f"sel_{method}")
col_b.markdown(
f'<div class="card"><b>{method}</b>'
f'&nbsp;<span style="color:#888;font-size:.83rem">{info["reason"]}</span></div>',
unsafe_allow_html=True)
# ── SEM κ°€μ„€ μ„€μ • ─────────────────────────────────────────────────────────────
hypotheses = []
if selected.get("ꡬ쑰방정식 (SEM)") and len(lv_list) >= 2:
st.markdown("---\n#### πŸ”— SEM κ°€μ„€ μ„€μ •")
if "hyps" not in st.session_state:
st.session_state.hyps = [("", "")]
if st.button("οΌ‹ κ°€μ„€ μΆ”κ°€"):
st.session_state.hyps.append(("", ""))
to_delete = None
for i, (sd, td) in enumerate(st.session_state.hyps):
ca, cb, cc = st.columns([3, 3, 1])
s = ca.selectbox(f"H{i+1} λ…λ¦½λ³€μˆ˜", lv_list,
index=lv_list.index(sd) if sd in lv_list else 0, key=f"hs_{i}")
t = cb.selectbox(f"H{i+1} μ’…μ†λ³€μˆ˜", lv_list,
index=lv_list.index(td) if td in lv_list else 0, key=f"ht_{i}")
if cc.button("μ‚­μ œ", key=f"hd_{i}"):
to_delete = i
if s != t:
hypotheses.append((s, t))
st.session_state.hyps[i] = (s, t)
if to_delete is not None and len(st.session_state.hyps) > 1:
st.session_state.hyps.pop(to_delete)
st.rerun()
# ── STEP 4: μ‹€ν–‰ ──────────────────────────────────────────────────────────────
st.markdown("---")
_run_btn = st.button("πŸš€ 뢄석 μ‹€ν–‰", type="primary", use_container_width=True)
if _run_btn:
for _k in ["_sem_applied", "_sem_init", "_sem_manual_pairs",
"_sem_df", "_sem_constructs", "_sem_hypotheses", "_sem_cfa_cov",
"_cfa_applied", "_cfa_init", "_cfa_df", "_cfa_constructs",
"_cfa_manual_pairs"]:
st.session_state.pop(_k, None)
if not _run_btn and "_tab_data" not in st.session_state:
st.stop()
if not _run_btn and "_tab_data" in st.session_state:
tab_data = st.session_state["_tab_data"]
tab_labels = st.session_state["_tab_labels"]
xlsx_sheets = st.session_state.get("_xlsx_sheets", {})
if "_cfa_applied" in st.session_state and "CFA" in tab_data:
tab_data["CFA"] = st.session_state["_cfa_applied"]
_lds, _rd, _fi, _fm, _im, _fmi, _ml, _wm, _ec = tab_data["CFA"]
if _lds is not None:
xlsx_sheets["ν‘œ2_CFA"] = ("CFA β€” μš”μΈλΆ€ν•˜λŸ‰", _lds, "β€» ν‘œμ€€ν™”κ³„μˆ˜β‰₯.50, AVEβ‰₯.50, CRβ‰₯.70")
xlsx_sheets["ν‘œ2_신뒰도"] = ("CFA β€” AVE/CR/Ξ±", _rd, "")
if _wm and not _ml.empty:
xlsx_sheets["CFA_μˆ˜μ •κ³Όμ •"] = ("CFA β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€")
if "_sem_applied" in st.session_state and "SEM" in tab_data:
tab_data["SEM"] = st.session_state["_sem_applied"]
_sp, _fi, _fm, _ml, _ec, _wm, _im, _fm2 = tab_data["SEM"]
if _sp is not None:
xlsx_sheets["ν‘œ4_가섀검증"] = (
"SEM κ°€μ„€ 검증", _sp,
"β€» ***p<.001 **p<.01 *p<.05 n.s.=μœ μ˜ν•˜μ§€μ•ŠμŒ", "채택여뢀")
if _wm and not _ml.empty:
xlsx_sheets["SEM_μˆ˜μ •κ³Όμ •"] = ("SEM β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€")
else:
xlsx_sheets = {}
tab_labels = []
tab_data = {}
cfa_extra_cov = []
# λΉˆλ„λΆ„μ„
if selected.get("λΉˆλ„λΆ„μ„") and cat_cols:
try:
freq_res = {}
for c in cat_cols[:10]:
f = df[c].value_counts().sort_index()
pct = (f / f.sum() * 100).round(1)
fdf = pd.DataFrame({"λΉˆλ„": f, "λ°±λΆ„μœ¨(%)": pct, "λˆ„μ (%)": pct.cumsum().round(1)})
fdf.index.name = c
freq_res[c] = fdf
tab_data["λΉˆλ„λΆ„μ„"] = freq_res
tab_labels.append("λΉˆλ„λΆ„μ„")
for cn, fdf in freq_res.items():
xlsx_sheets[f"λΉˆλ„_{cn}"[:31]] = (f"λΉˆλ„λΆ„μ„ β€” {cn}", fdf.reset_index(), "")
except Exception as e:
st.warning(f"λΉˆλ„λΆ„μ„ 였λ₯˜: {e}")
# κΈ°μˆ ν†΅κ³„
if selected.get("κΈ°μˆ ν†΅κ³„") and num_cols:
try:
desc = df[num_cols].describe().T[["count","mean","std","min","max"]].copy()
desc.columns = ["N","평균","ν‘œμ€€νŽΈμ°¨","μ΅œμ†Ÿκ°’","μ΅œλŒ“κ°’"]
desc["μ™œλ„"] = df[num_cols].skew()
desc["첨도"] = df[num_cols].kurt()
desc = desc.round(3)
tab_data["κΈ°μˆ ν†΅κ³„"] = desc
tab_labels.append("κΈ°μˆ ν†΅κ³„")
xlsx_sheets["κΈ°μˆ ν†΅κ³„"] = ("κΈ°μˆ ν†΅κ³„", desc.reset_index().rename(columns={"index":"λ³€μˆ˜"}), "")
except Exception as e:
st.warning(f"κΈ°μˆ ν†΅κ³„ 였λ₯˜: {e}")
# 신뒰도
if selected.get("신뒰도 (Cronbach's Ξ±)") and constructs:
try:
rel_rows = []
for lv, items in constructs.items():
alpha = cronbach_alpha(df[items])
rel_rows.append({"κ΅¬μ„±κ°œλ…": lv, "λ¬Έν•­ 수": len(items),
"Cronbach Ξ±": alpha,
"νŒμ •": "μΆ©μ‘± βœ“" if (not np.isnan(alpha) and alpha >= 0.7) else "λ―ΈμΆ©μ‘± βœ—"})
rel_df = pd.DataFrame(rel_rows)
tab_data["신뒰도"] = rel_df
tab_labels.append("신뒰도")
xlsx_sheets["신뒰도뢄석"] = ("신뒰도 뢄석 (Cronbach's Ξ±)", rel_df, "β€» Ξ± β‰₯ .70 ꢌμž₯")
except Exception as e:
st.warning(f"신뒰도 뢄석 였λ₯˜: {e}")
# CFA
if selected.get("확인적 μš”μΈλΆ„μ„ (CFA)") and constructs:
with st.spinner("CFA 초기 뢄석 쀑..."):
try:
loads, rel_df, fit_init, fit_mod, init_mi, final_mi, mod_log, was_mod, extra_cov = \
build_cfa_tables(df, constructs, max_mods=0)
if loads is not None:
st.session_state["_cfa_init"] = (loads, rel_df, fit_init, fit_mod,
init_mi, final_mi, mod_log, was_mod, extra_cov)
st.session_state["_cfa_df"] = df
st.session_state["_cfa_constructs"] = constructs
_cfa_display = st.session_state.get("_cfa_applied", st.session_state["_cfa_init"])
tab_data["CFA"] = _cfa_display
tab_labels.append("CFA")
_lds, _rd, _fi, _fm, _im, _fmi, _ml, _wm, _ec = _cfa_display
cfa_extra_cov = list(_ec) if _ec else []
if _lds is not None:
xlsx_sheets["ν‘œ2_CFA"] = ("CFA β€” μš”μΈλΆ€ν•˜λŸ‰", _lds,
"β€» ν‘œμ€€ν™”κ³„μˆ˜β‰₯.50, AVEβ‰₯.50, CRβ‰₯.70")
xlsx_sheets["ν‘œ2_신뒰도"] = ("CFA β€” AVE/CR/Ξ±", _rd, "")
if _wm and not _ml.empty:
xlsx_sheets["CFA_μˆ˜μ •κ³Όμ •"] = ("CFA β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€")
if not _im.empty:
xlsx_sheets["CFA_MI초기"] = ("CFA β€” 초기 μˆ˜μ •μ§€μˆ˜(MI)", _im.head(20), "")
if not _fmi.empty:
xlsx_sheets["CFA_MIμ΅œμ’…"] = ("CFA β€” μ΅œμ’… μˆ˜μ •μ§€μˆ˜(MI)", _fmi.head(20), "")
else:
st.warning("⚠️ CFA κ²°κ³Ό 생성 μ‹€νŒ¨.")
except Exception as e:
st.warning(f"CFA 였λ₯˜: {e}")
# 상관관계
if selected.get("상관관계 뢄석") and constructs:
try:
corr_tbl = run_correlation(df, constructs)
if not corr_tbl.empty:
tab_data["상관관계"] = corr_tbl
tab_labels.append("상관관계")
xlsx_sheets["ν‘œ3_상관관계"] = (
"상관관계 뢄석 (λŒ€κ°μ„ =√AVE)", corr_tbl,
"β€» λŒ€κ°μ„ : √AVE | νŒλ³„νƒ€λ‹Ήλ„ κΈ°μ€€: √AVE > 타 λ³€μˆ˜ μƒκ΄€κ³„μˆ˜")
except Exception as e:
st.warning(f"상관관계 였λ₯˜: {e}")
# SEM
if selected.get("ꡬ쑰방정식 (SEM)") and constructs and hypotheses:
with st.spinner("SEM 초기 뢄석 쀑..."):
try:
sem_paths, fit_init_sem, fit_mod_sem, mod_log_sem, extra_cov_sem, was_mod_sem, \
init_mi_sem, final_mi_sem = \
build_lavaan_sem_table(df, constructs, hypotheses,
cfa_extra_cov=cfa_extra_cov, max_mods=0)
if sem_paths is not None:
st.session_state["_sem_init"] = (
sem_paths, fit_init_sem, fit_mod_sem,
mod_log_sem, extra_cov_sem, was_mod_sem,
init_mi_sem, final_mi_sem)
st.session_state["_sem_df"] = df
st.session_state["_sem_constructs"] = constructs
st.session_state["_sem_hypotheses"] = hypotheses
st.session_state["_sem_cfa_cov"] = cfa_extra_cov
_sem_display = st.session_state.get("_sem_applied", st.session_state["_sem_init"])
tab_data["SEM"] = _sem_display
tab_labels.append("SEM")
_sp, _fi, _fm, _ml, _ec, _wm, _im, _fm2 = _sem_display
if _sp is not None:
xlsx_sheets["ν‘œ4_가섀검증"] = (
"SEM κ°€μ„€ 검증", _sp,
"β€» ***p<.001 **p<.01 *p<.05 n.s.=μœ μ˜ν•˜μ§€μ•ŠμŒ", "채택여뢀")
if _wm and not _ml.empty:
xlsx_sheets["SEM_μˆ˜μ •κ³Όμ •"] = (
"SEM β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€")
else:
st.warning("⚠️ SEM κ²°κ³Όλ₯Ό μƒμ„±ν•˜μ§€ λͺ»ν–ˆμŠ΅λ‹ˆλ‹€.")
except Exception as e:
st.warning(f"SEM 였λ₯˜: {e}")
st.session_state["_tab_data"] = tab_data
st.session_state["_tab_labels"] = tab_labels
st.session_state["_xlsx_sheets"] = xlsx_sheets
# ── κ²°κ³Ό 좜λ ₯ ──────────────────────────────────────────────────────────────────
st.markdown("---")
st.markdown("## πŸ“‹ 뢄석 κ²°κ³Ό")
if not tab_labels:
st.warning("μ‹€ν–‰λœ 뢄석이 μ—†μŠ΅λ‹ˆλ‹€.")
st.stop()
FIT_NOTE = "ꢌμž₯ κΈ°μ€€: NFIΒ·RFIΒ·IFIΒ·TLIΒ·CFI β‰₯ .90 | RMSEA < .049 | SRMR ≀ .08"
tabs = st.tabs(tab_labels)
for tab, lbl in zip(tabs, tab_labels):
with tab:
c = tab_data[lbl]
if lbl == "λΉˆλ„λΆ„μ„":
for col, fdf in c.items():
st.markdown(f"**{col}**")
st.dataframe(fdf, use_container_width=True)
elif lbl == "κΈ°μˆ ν†΅κ³„":
st.dataframe(c, use_container_width=True)
elif lbl == "신뒰도":
st.dataframe(c, use_container_width=True)
st.caption("Ξ± β‰₯ .70: 내적 일관성 확보")
elif lbl == "CFA":
loads, rel_df, fit_init, fit_mod, init_mi, final_mi, mod_log, was_mod, extra_cov = c
crit_df = pd.DataFrame([
{"μ§€μˆ˜": "NFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"},
{"μ§€μˆ˜": "RFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"},
{"μ§€μˆ˜": "IFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"},
{"μ§€μˆ˜": "TLI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"},
{"μ§€μˆ˜": "CFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"},
{"μ§€μˆ˜": "RMSEA", "ꢌμž₯κΈ°μ€€": "< .049"},
{"μ§€μˆ˜": "SRMR", "ꢌμž₯κΈ°μ€€": "≀ .08"},
])
with st.expander("πŸ“Œ 적합도 ꢌμž₯ κΈ°μ€€"):
st.dataframe(crit_df, use_container_width=True, hide_index=True)
st.markdown("**λͺ¨λΈ 적합도 비ꡐ**")
fit_col_order = ["χ²", "df", "χ²/df", "p", "NFI", "RFI", "IFI", "TLI", "CFI", "RMSEA"]
def _fit_row(label, fit):
row = {"λͺ¨ν˜•": label}
for k in fit_col_order:
row[k] = fit.get(k, "-")
return row
if was_mod:
fit_cmp = pd.DataFrame([_fit_row("초기 λͺ¨ν˜•", fit_init), _fit_row("μˆ˜μ • λͺ¨ν˜•", fit_mod)])
else:
fit_cmp = pd.DataFrame([_fit_row("초기 λͺ¨ν˜•", fit_init)])
def _hl_fit(row):
colors = [""] * len(row)
checks = {"NFI":.90,"RFI":.90,"IFI":.90,"TLI":.90,"CFI":.90}
cols = list(row.index)
for idx, col in enumerate(cols):
v = row[col]
try:
fv = float(v)
if col in checks and fv < checks[col]:
colors[idx] = "background-color:#FFE0E0"
elif col == "RMSEA" and fv >= 0.049:
colors[idx] = "background-color:#FFE0E0"
elif col in checks and fv >= checks[col]:
colors[idx] = "background-color:#E8F5E9"
elif col == "RMSEA" and fv < 0.049:
colors[idx] = "background-color:#E8F5E9"
except Exception:
pass
return colors
st.dataframe(fit_cmp.style.apply(_hl_fit, axis=1), use_container_width=True)
st.caption("🟒 κΈ°μ€€ μΆ©μ‘± | πŸ”΄ κΈ°μ€€ λ―ΈμΆ©μ‘± | RMSEA < .049 ꢌμž₯")
if was_mod and extra_cov:
st.success(f"βœ… MI 기반 λͺ¨ν˜•μˆ˜μ • {len(extra_cov)}회 적용")
paths_str = " / ".join(f"{a} ~~ {b}" for a, b in extra_cov)
st.markdown(f"**μˆ˜μ •λœ 경둜:** `{paths_str}`")
with st.expander("πŸ“‹ μˆ˜μ • κ³Όμ • 단계별 상세"):
st.dataframe(mod_log, use_container_width=True)
elif not was_mod:
all_ok = _adequate_check(fit_init)
if all_ok:
st.success("βœ… 초기 λͺ¨ν˜• 적합도 μ–‘ν˜Έ β€” μˆ˜μ • λΆˆν•„μš”")
else:
st.info("ℹ️ 적합도 λ―ΈμΆ©μ‘±μ΄λ‚˜ MI > 3.84 쌍이 μ—†μ–΄ 더 이상 μˆ˜μ • λΆˆκ°€")
# CFA MI μˆ˜λ™ 선택
_cfa_init_mi = st.session_state.get("_cfa_init", (None,)*9)[4]
if _cfa_init_mi is not None and not _cfa_init_mi.empty:
_cfa_mi_over = _cfa_init_mi[_cfa_init_mi["MI"] > 3.84]
if not _cfa_mi_over.empty:
st.markdown("---")
st.markdown("#### πŸ› οΈ μž”μ°¨κ³΅λΆ„μ‚° μˆ˜λ™ μΆ”κ°€")
st.caption("MI > 3.84 쌍 쀑 μ μš©ν•  ν•­λͺ©μ„ μ„ νƒν•œ λ’€ **μž¬μ‹€ν–‰** λ²„νŠΌμ„ λˆŒλŸ¬μ£Όμ„Έμš”.")
_cfa_opts = [f"{row['λ³€μˆ˜1']} ~~ {row['λ³€μˆ˜2']} (MI = {row['MI']})"
for _, row in _cfa_mi_over.iterrows()]
_cfa_cur_pairs = st.session_state.get("_cfa_manual_pairs", [])
_cfa_cur_opts = [o for o in _cfa_opts
if any(f"{v1} ~~ {v2}" in o for v1, v2 in _cfa_cur_pairs)]
_cfa_sel = st.multiselect("μ μš©ν•  μž”μ°¨κ³΅λΆ„μ‚° 경둜 선택",
options=_cfa_opts, default=_cfa_cur_opts,
key="cfa_mi_multisel")
if st.button("▢️ 선택 경둜 μ μš©ν•˜μ—¬ CFA μž¬μ‹€ν–‰", key="cfa_rerun_btn"):
_cfa_new_pairs = []
for s in _cfa_sel:
part = s.split(" (MI")[0].strip()
v1, v2 = [x.strip() for x in part.split("~~")]
_cfa_new_pairs.append((v1, v2))
st.session_state["_cfa_manual_pairs"] = _cfa_new_pairs
with st.spinner("CFA μž¬λΆ„μ„ 쀑..."):
try:
_cr = build_cfa_tables(
st.session_state["_cfa_df"],
st.session_state["_cfa_constructs"],
max_mods=0, manual_extra_cov=_cfa_new_pairs)
if _cr[0] is not None:
st.session_state["_cfa_applied"] = _cr
st.rerun()
except Exception as _ce:
st.warning(f"CFA μž¬μ‹€ν–‰ 였λ₯˜: {_ce}")
col_mi1, col_mi2 = st.columns(2)
with col_mi1:
if init_mi is not None and not init_mi.empty:
with st.expander("πŸ” 초기 μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"):
d = init_mi.head(20).copy()
d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μˆ˜μ • κ³ λ €" if v > 3.84 else "")
st.dataframe(d, use_container_width=True)
with col_mi2:
if final_mi is not None and not final_mi.empty:
with st.expander("πŸ” μ΅œμ’… μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"):
d = final_mi.head(20).copy()
d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μΆ”κ°€ μˆ˜μ • κ°€λŠ₯" if v > 3.84 else "βœ…")
st.dataframe(d, use_container_width=True)
st.markdown("---")
st.markdown("**CFA κ²°κ³Ό** (μˆ˜μ •λͺ¨ν˜• κΈ°μ€€)")
try:
merged = loads.copy()
merged["AVE"] = ""; merged["CR"] = ""; merged["Cronbach Ξ±"] = ""
rel_map = rel_df.set_index("μž μž¬λ³€μˆ˜")
seen = set()
for idx, row in merged.iterrows():
lv = row["μž μž¬λ³€μˆ˜"]
if lv not in seen:
seen.add(lv)
if lv in rel_map.index:
merged.at[idx, "AVE"] = rel_map.loc[lv, "AVE"]
merged.at[idx, "CR"] = rel_map.loc[lv, "CR"]
merged.at[idx, "Cronbach Ξ±"] = rel_map.loc[lv, "Cronbach Ξ±"]
disp_cols = ["μž μž¬λ³€μˆ˜", "μΈ‘μ •λ³€μˆ˜", "ν‘œμ€€ν™”κ³„μˆ˜", "SE", "tκ°’", "pκ°’", "AVE", "CR", "Cronbach Ξ±"]
merged = merged[[c for c in disp_cols if c in merged.columns]]
st.dataframe(merged, use_container_width=True, hide_index=True)
except Exception:
st.dataframe(loads, use_container_width=True)
st.dataframe(rel_df, use_container_width=True)
st.caption("ꢌμž₯: ν‘œμ€€ν™”κ³„μˆ˜ β‰₯ .50 | AVE β‰₯ .50 | CR β‰₯ .70 | Cronbach Ξ± β‰₯ .70")
elif lbl == "상관관계":
st.dataframe(c, use_container_width=True)
st.caption("λŒ€κ°μ„  = √AVE | ν•˜μ‚Όκ° = μž μž¬λ³€μˆ˜ κ°„ μƒκ΄€κ³„μˆ˜")
elif lbl == "SEM":
paths, fit_init, fit_mod, mod_log, extra_cov, was_mod, init_mi_sem, final_mi_sem = c
fit_col_order = ["χ²","df","χ²/df","p","NFI","RFI","IFI","TLI","CFI","RMSEA"]
def _hl_sem(row):
checks = {"NFI":.90,"RFI":.90,"IFI":.90,"TLI":.90,"CFI":.90}
colors = []
for col in row.index:
try:
fv = float(row[col])
if col in checks:
colors.append("background-color:#E8F5E9" if fv>=checks[col]
else "background-color:#FFE0E0")
elif col == "RMSEA":
colors.append("background-color:#E8F5E9" if fv<0.049
else "background-color:#FFE0E0")
else:
colors.append("")
except Exception:
colors.append("")
return colors
st.markdown("**λͺ¨λΈ 적합도**")
if was_mod:
fit_cmp = pd.DataFrame([
{"λͺ¨ν˜•":"초기 λͺ¨ν˜•", **{k: fit_init.get(k,"-") for k in fit_col_order}},
{"λͺ¨ν˜•":"μˆ˜μ • λͺ¨ν˜•", **{k: fit_mod.get(k,"-") for k in fit_col_order}},
])
else:
fit_cmp = pd.DataFrame([
{"λͺ¨ν˜•":"초기 λͺ¨ν˜•", **{k: fit_init.get(k,"-") for k in fit_col_order}}
])
st.dataframe(fit_cmp.style.apply(_hl_sem, axis=1), use_container_width=True)
st.caption(FIT_NOTE)
if was_mod and extra_cov:
st.success(f"βœ… MI 기반 μˆ˜μ • {len(mod_log)}회 적용")
paths_str = " / ".join(f"{a} ~~ {b}" for a, b in extra_cov if isinstance(a, str))
st.markdown(f"**μˆ˜μ •λœ 경둜:** `{paths_str}`")
with st.expander("πŸ“‹ μˆ˜μ • κ³Όμ • 상세"):
st.dataframe(mod_log, use_container_width=True)
elif not was_mod:
if _adequate_check(fit_init):
st.success("βœ… 초기 λͺ¨ν˜• 적합도 μ–‘ν˜Έ β€” μˆ˜μ • λΆˆν•„μš”")
else:
st.info("ℹ️ 적합도 λ―ΈμΆ©μ‘±μ΄λ‚˜ MI > 3.84 쌍이 μ—†μ–΄ 더 이상 μˆ˜μ • λΆˆκ°€")
# SEM MI μˆ˜λ™ 선택
_init_mi_for_sel = st.session_state.get("_sem_init", (None,)*8)[6]
if _init_mi_for_sel is not None and not _init_mi_for_sel.empty:
mi_over = _init_mi_for_sel[_init_mi_for_sel["MI"] > 3.84]
if not mi_over.empty:
st.markdown("---")
st.markdown("#### πŸ› οΈ μž”μ°¨κ³΅λΆ„μ‚° μˆ˜λ™ μΆ”κ°€")
st.caption("MI > 3.84 쌍 쀑 μ μš©ν•  ν•­λͺ©μ„ μ„ νƒν•œ λ’€ **μž¬μ‹€ν–‰** λ²„νŠΌμ„ λˆŒλŸ¬μ£Όμ„Έμš”.")
_opts = [f"{row['λ³€μˆ˜1']} ~~ {row['λ³€μˆ˜2']} (MI = {row['MI']})"
for _, row in mi_over.iterrows()]
_cur_pairs = st.session_state.get("_sem_manual_pairs", [])
_cur_opts = [o for o in _opts
if any(f"{v1} ~~ {v2}" in o for v1, v2 in _cur_pairs)]
_sel = st.multiselect("μ μš©ν•  μž”μ°¨κ³΅λΆ„μ‚° 경둜 선택",
options=_opts, default=_cur_opts,
key="sem_mi_multisel")
if st.button("▢️ 선택 경둜 μ μš©ν•˜μ—¬ SEM μž¬μ‹€ν–‰", key="sem_rerun_btn"):
_new_pairs = []
for s in _sel:
part = s.split(" (MI")[0].strip()
v1, v2 = [x.strip() for x in part.split("~~")]
_new_pairs.append((v1, v2))
st.session_state["_sem_manual_pairs"] = _new_pairs
_cfa_cov = st.session_state.get("_sem_cfa_cov", [])
_all_covs = list(_cfa_cov) + _new_pairs
with st.spinner("SEM μž¬λΆ„μ„ 쀑..."):
try:
_r = build_lavaan_sem_table(
st.session_state["_sem_df"],
st.session_state["_sem_constructs"],
st.session_state["_sem_hypotheses"],
cfa_extra_cov=_all_covs, max_mods=0)
if _r[0] is not None:
_r2 = list(_r)
_r2[4] = _all_covs
_r2[5] = len(_new_pairs) > 0
st.session_state["_sem_applied"] = tuple(_r2)
st.rerun()
except Exception as _e:
st.warning(f"SEM μž¬μ‹€ν–‰ 였λ₯˜: {_e}")
col_mi1, col_mi2 = st.columns(2)
with col_mi1:
if init_mi_sem is not None and not init_mi_sem.empty:
with st.expander("πŸ” 초기 μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"):
d = init_mi_sem.head(20).copy()
d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μˆ˜μ • κ³ λ €" if v > 3.84 else "")
st.dataframe(d, use_container_width=True)
with col_mi2:
if final_mi_sem is not None and not final_mi_sem.empty:
with st.expander("πŸ” μ΅œμ’… μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"):
d = final_mi_sem.head(20).copy()
d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μΆ”κ°€ μˆ˜μ • κ°€λŠ₯" if v > 3.84 else "βœ…")
st.dataframe(d, use_container_width=True)
st.markdown("---")
st.markdown("**κ°€μ„€ 검증 κ²°κ³Ό** (μˆ˜μ •λͺ¨ν˜• κΈ°μ€€)")
def hl(row):
color = "#E8F5E9" if row.get("채택여뢀") == "채택" else "#FFF3F3"
return [f"background-color:{color}"] * len(row)
st.dataframe(paths.style.apply(hl, axis=1), use_container_width=True)
n_adopt = (paths["채택여뢀"] == "채택").sum()
st.markdown(f"**채택: {n_adopt}개 / 기각: {len(paths)-n_adopt}개 / 전체: {len(paths)}개**")
# ── Excel λ‹€μš΄λ‘œλ“œ ─────────────────────────────────────────────────────────────
st.markdown("---")
xlsx_sheets = st.session_state.get("_xlsx_sheets", xlsx_sheets)
if xlsx_sheets:
try:
_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
_fname = f"SEM_RESULT_{_ts}.xlsx"
xlsx_bytes = build_single_sheet_excel(xlsx_sheets)
st.download_button(
label=f"πŸ“₯ {_fname} λ‹€μš΄λ‘œλ“œ (λͺ¨λ“  κ²°κ³Ό ν•œ μ‹œνŠΈ)",
data=xlsx_bytes,
file_name=_fname,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
use_container_width=True,
type="primary"
)
st.caption("β€» λͺ¨λ“  뢄석 κ²°κ³Όκ°€ '뢄석결과' μ‹œνŠΈ ν•˜λ‚˜μ— μ„Ήμ…˜λ³„λ‘œ ν†΅ν•©λ˜μ–΄ μ €μž₯λ©λ‹ˆλ‹€.")
except Exception as e:
st.error(f"Excel 생성 였λ₯˜: {e}")