# =============================================================================
# 📊 SEM Analyzer (frequency analysis / descriptive statistics / reliability / CFA / correlation / SEM)
# Run: python -m streamlit run sem.py
# =============================================================================
import streamlit as st
import pandas as pd
import numpy as np
import re
from datetime import datetime
from collections import defaultdict

st.set_page_config(page_title="📊 SEM Analyzer", page_icon="📊", layout="wide")


# ── Password lock ────────────────────────────────────────────────────────────
def _check_password():
    # Prefer the password from Streamlit secrets; fall back to the built-in default.
    try:
        correct_pw = st.secrets.get("PASSWORD", "9400")
    except Exception:
        correct_pw = "9400"

    def _submit():
        if st.session_state.get("_pw_input") == correct_pw:
            st.session_state["_pw_ok"] = True
        else:
            st.session_state["_pw_ok"] = False

    if st.session_state.get("_pw_ok"):
        return True
    st.markdown("## 🔒 SEM Analyzer")
    st.text_input("Enter the password", type="password",
                  key="_pw_input", on_change=_submit)
    if "_pw_ok" in st.session_state and not st.session_state["_pw_ok"]:
        st.error("Incorrect password.")
    # Halt the script until the correct password has been entered.
    st.stop()


_check_password()

st.markdown(""" """, unsafe_allow_html=True)

# ── Module imports ───────────────────────────────────────────────────────────
try:
    from modules.utils import detect_scale, cronbach_alpha, calc_ave_cr, fmt_p, sig_stars, build_excel
except Exception as e:
    st.error(f"Module load error: {e}\n\nCheck that the modules/ folder is in the same folder as the app.")
    st.stop()


# ══════════════════════════════════════════════════════════════════════════════
# Utility functions
# ══════════════════════════════════════════════════════════════════════════════
def detect_outliers_mahalanobis(df, cols, p_threshold=0.001):
    from scipy import stats as sc
    data = df[cols].dropna()
    n, k = data.shape
    if n < k + 2:
        return None, None, []
    try:
        mean = data.mean().values
        cov_mat = np.cov(data.values.T, ddof=1)
        try:
            cov_inv = np.linalg.inv(cov_mat)
        except np.linalg.LinAlgError:
            cov_inv = np.linalg.pinv(cov_mat)
        diffs = data.values - mean
        mahal = np.array([float(d @ cov_inv @ d) for d in diffs])
        pvals = sc.chi2.sf(mahal, df=k)
        cutoff = sc.chi2.ppf(1 - p_threshold, df=k)
        result = pd.DataFrame({
            "원본 ν–‰λ²ˆν˜Έ": data.index + 1,
            "λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리": np.round(mahal, 3),
            "pκ°’": np.round(pvals, 4),
            "μ΄μƒμΉ˜": mahal > cutoff,
        }).reset_index(drop=True)
        outlier_idx = data.index[mahal > cutoff].tolist()
        return result, round(cutoff, 3), outlier_idx
    except Exception:
        return None, None, []


def compute_smc(df, cols):
    data = df[cols].dropna()
    if len(data) < len(cols) + 2:
        return pd.DataFrame({"λ³€μˆ˜": cols,
                             "SMC": [np.nan]*len(cols),
                             "νŒμ •": ["사둀 수 λΆ€μ‘±"]*len(cols)})
    rows = []
    for col in cols:
        others = [c for c in cols if c != col]
        if not others:
            rows.append({"λ³€μˆ˜": col, "SMC": np.nan, "νŒμ •": "-"})
            continue
        try:
            X = data[others].values.astype(float)
            y = data[col].values.astype(float)
            Xc = np.column_stack([np.ones(len(X)), X])
            b, _, _, _ = np.linalg.lstsq(Xc, y, rcond=None)
            yh = Xc @ b
            ss_res = float(np.sum((y - yh) ** 2))
            ss_tot = float(np.sum((y - y.mean()) ** 2))
            r2 = (1.0 - ss_res / ss_tot) if ss_tot > 0 else np.nan
            rows.append({
                "λ³€μˆ˜": col,
                "SMC": round(float(r2), 3),
                "νŒμ •": "⚠️ 제거 κ³ λ € (SMC < .20)"
                if (not np.isnan(r2) and r2 < 0.2) else "βœ… 정상"
            })
        except Exception:
            rows.append({"λ³€μˆ˜": col, "SMC": np.nan, "νŒμ •": "계산 였λ₯˜"})
    return pd.DataFrame(rows)


def auto_detect_constructs(df):
    likert = [c for c in df.columns if detect_scale(df[c]) == "likert"]
    groups = defaultdict(list)
    for col in likert:
        m = re.match(r'^([A-Za-z]+\d*)(\d)$', col)
        groups[m.group(1) if m else col].append(col)
    return {k: v for k, v in groups.items() if len(v) >= 2}


def _adequate_check(fit):
    return (fit.get("NFI",0)>=.90 and fit.get("RFI",0)>=.90 and
            fit.get("IFI",0)>=.90 and fit.get("TLI",0)>=.90 and
            fit.get("CFI",0)>=.90 and fit.get("RMSEA",1)<.049)


def get_composite_scores(df, constructs):
    return pd.DataFrame(
        {lv: df[items].mean(axis=1) for lv,
         items in constructs.items()})


# ══════════════════════════════════════════════════════════════════════════════
# semopy-based CFA (fallback for lavaan)
# ══════════════════════════════════════════════════════════════════════════════
def calc_mi_approx(model, obs_df, obs_vars):
    try:
        n = len(obs_df)
        S = np.cov(obs_df[obs_vars].values.T, ddof=1)
        Sig, _ = model.calc_sigma()
        Sig = Sig + np.eye(len(obs_vars)) * 1e-8
        Si = np.linalg.inv(Sig)
        G = Si @ (S - Sig) @ Si
        rows = []
        p = len(obs_vars)
        for i in range(p):
            for j in range(i + 1, p):
                denom = 2 * Si[i, i] * Si[j, j] + 2 * Si[i, j] ** 2
                mi = (n - 1) * G[i, j] ** 2 / max(denom, 1e-8)
                rows.append({"λ³€μˆ˜1": obs_vars[i], "λ³€μˆ˜2": obs_vars[j],
                             "MI": round(float(mi), 3)})
        return pd.DataFrame(rows).sort_values("MI", ascending=False).reset_index(drop=True)
    except Exception:
        return pd.DataFrame()


def run_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200):
    try:
        from semopy import Model, calc_stats
    except ImportError:
        return None, "The semopy package is required."
    try:
        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [i for items in constructs.values() for i in items]
        data = df[all_items].dropna()
        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        meas_str = "\n".join(meas_lines)
        lv_keys = list(constructs.keys())
        lv_cov_lines = [f" {lv_keys[i]} ~~ {lv_keys[j]}"
                        for i in range(len(lv_keys))
                        for j in range(i + 1, len(lv_keys))]
        meas_str = meas_str + ("\n" + "\n".join(lv_cov_lines) if lv_cov_lines else "")

        def _fit(model_str):
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        # Try several spellings for fixing latent variances to 1;
        # keep the first one that semopy can fit.
        base_str = meas_str
        for _var_syntax in [
            "\n".join(f" {lv} ~~ 1*{lv}" for lv in constructs.keys()),
            "\n".join(f" {lv} ~~ 1 * {lv}" for lv in constructs.keys()),
            "\n".join(f" {lv} ~~ 1@{lv}" for lv in constructs.keys()),
        ]:
            _candidate = meas_str + "\n" + _var_syntax
            if _fit(_candidate) is not None:
                base_str = _candidate
                break

        def _extract_fit(m):
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                rfi = ((chi2_bl/dof_bl - chi2/dof) / (chi2_bl/dof_bl)
                       if chi2_bl>0 and dof_bl>0 and dof>0 else 0.0)
                ifi = ((chi2_bl - chi2) / (chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"χ²": round(chi2,3), "df": int(dof),
                        "χ²/df": round(chi2/dof,3) if dof>0 else "-",
                        "p": round(pval,3),
                        "NFI": round(nfi,3), "RFI": round(rfi,3),
                        "IFI": round(ifi,3), "TLI": round(tli,3),
                        "CFI": round(cfi,3), "RMSEA": round(rmsea,3)}
            except Exception:
                return {}

        m0 = _fit(base_str)
        if m0 is None:
            return None, "Initial CFA estimation failed"
        fit0 = _extract_fit(m0)
        mi_df0 = calc_mi_approx(m0, data, all_items)
        extra_cov = []
        added_pairs = set()
        mod_log = []
        m_cur, fit_cur, mi_cur = m0, fit0.copy(), mi_df0.copy()

        # Greedy modification loop: free the covariance with the largest MI
        # until the fit is adequate or no candidate remains.
        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            # calc_mi_approx returns an empty frame (no "MI" column) on failure.
            if mi_cur.empty:
                break
            high_mi = mi_cur[mi_cur["MI"] > mi_threshold]
            if high_mi.empty:
                break
            avail = high_mi[
                high_mi.apply(
                    lambda r: tuple(sorted([r["λ³€μˆ˜1"], r["λ³€μˆ˜2"]])) not in added_pairs,
                    axis=1)]
            if avail.empty:
                break
            row = avail.iloc[0]
            v1, v2 = row["λ³€μˆ˜1"], row["λ³€μˆ˜2"]
            pair = tuple(sorted([v1, v2]))
            new_cov = extra_cov + [(v1, v2)]
            new_str = (base_str + "\n" +
                       "\n".join(f" {a} ~~ {b}" for a, b in new_cov))
            m_new = _fit(new_str)
            if m_new is None:
                added_pairs.add(pair)
                continue
            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "단계": step + 1,
                "μˆ˜μ • 경둜": f"{v1} ~~ {v2}",
                "μ†Œμ† μš”μΈ": lv1 if lv1 == lv2 else f"{lv1}↔{lv2}",
                "MI": round(row["MI"], 3),
                "CFI μ „β†’ν›„": f"{fit_cur.get('CFI','-')} β†’ {fit_new.get('CFI','-')}",
                "RMSEA μ „β†’ν›„": f"{fit_cur.get('RMSEA','-')} β†’ {fit_new.get('RMSEA','-')}",
            })
            extra_cov = new_cov
            m_cur, fit_cur = m_new, fit_new
            added_pairs.add(pair)
            mi_cur = calc_mi_approx(m_cur, data, all_items)

        ins = m_cur.inspect(std_est=True)
        std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
        final_mi = calc_mi_approx(m_cur, data, all_items)
        return {
            "init_model": m0, "mod_model": m_cur,
            "init_fit": fit0, "mod_fit": fit_cur,
            "init_mi": mi_df0, "final_mi": final_mi,
            "mod_log": pd.DataFrame(mod_log),
            "extra_cov": extra_cov,
            "was_modified": len(extra_cov) > 0,
            "ins": ins, "std_col": std_col,
            "data": data, "all_items": all_items,
            "base_str": base_str,
        }, None
    except Exception as e:
        return None, f"CFA MI error: {e}"


def run_cfa_sem(df, constructs, hypotheses=None):
    try:
        from semopy import Model, calc_stats
    except ImportError:
        return None, None, None, None
    try:
        mm = "\n".join(f" {lv} =~ {' + '.join(items)}"
                       for lv, items in constructs.items())
        if hypotheses:
            deps = defaultdict(list)
            for s, t in hypotheses:
                deps[t].append(s)
            mm += "\n" + "\n".join(f" {t} ~ {' + '.join(ss)}" for t, ss in
                                   deps.items())
        all_items = [i for items in constructs.values() for i in items]
        data = df[all_items].dropna()
        model = Model(mm)
        model.fit(data)
        ins = model.inspect(std_est=True)
        std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
        stats = calc_stats(model)
        try:
            stats_dict = stats.iloc[0].to_dict() if hasattr(stats, "iloc") else dict(stats)
        except Exception:
            stats_dict = {}
        try:
            chi2 = float(stats_dict.get("chi2", 0))
            dof = float(stats_dict.get("DoF", 1))
            chi2_bl = float(stats_dict.get("chi2 Baseline", 0))
            dof_bl = float(stats_dict.get("DoF Baseline", 1))
            pval = float(stats_dict.get("chi2 p-value", 1))
            cfi = float(stats_dict.get("CFI", 0))
            tli = float(stats_dict.get("TLI", 0))
            nfi = float(stats_dict.get("NFI", 0))
            rmsea = float(stats_dict.get("RMSEA", 1))
            rfi = ((chi2_bl/dof_bl - chi2/dof)/(chi2_bl/dof_bl)
                   if chi2_bl>0 and dof_bl>0 and dof>0 else 0.0)
            ifi = ((chi2_bl - chi2)/(chi2_bl - dof)
                   if (chi2_bl - dof) > 0 else 0.0)
            fit = {"χ²": round(chi2,3), "df": int(dof),
                   "χ²/df": round(chi2/dof,3) if dof>0 else "-",
                   "p": round(pval,3),
                   "NFI": round(nfi,3), "RFI": round(rfi,3),
                   "IFI": round(ifi,3), "TLI": round(tli,3),
                   "CFI": round(cfi,3), "RMSEA": round(rmsea,3)}
        except Exception:
            fit = {}
        return ins, std_col, stats_dict, fit
    except Exception:
        return None, None, None, None


def _extract_load_df(ins, std_col, constructs):
    lv_names = set(constructs.keys())
    all_items = set(i for v in constructs.values() for i in v)
    # semopy versions differ: loadings appear either as "=~" rows or as item ~ factor rows.
    if (ins["op"] == "=~").any():
        load_df = ins[ins["op"] == "=~"].copy()
        lv_col, ind_col = "lval", "rval"
    else:
        load_df = ins[ins["lval"].isin(all_items) & ins["rval"].isin(lv_names)].copy()
        lv_col, ind_col = "rval", "lval"
    cols_needed = [c for c in [lv_col, ind_col, std_col, "Std. Err", "z-value", "p-value"]
                   if c in load_df.columns]
    load_df = load_df[cols_needed].copy()
    rename_map = {lv_col: "μž μž¬λ³€μˆ˜", ind_col: "μΈ‘μ •λ³€μˆ˜", std_col: "ν‘œμ€€ν™”κ³„μˆ˜",
                  "Std. Err": "SE", "z-value": "tκ°’", "p-value": "pκ°’_raw"}
    load_df = load_df.rename(columns=rename_map)
    if "pκ°’_raw" in load_df.columns:
        load_df["pκ°’"] = load_df["pκ°’_raw"].apply(
            lambda p: fmt_p(p) if str(p).strip() not in ("-", "") else "-")
        load_df = load_df.drop(columns=["pκ°’_raw"])
    load_df["ν‘œμ€€ν™”κ³„μˆ˜"] = pd.to_numeric(load_df["ν‘œμ€€ν™”κ³„μˆ˜"], errors="coerce").round(3)
    for col in ["SE", "tκ°’"]:
        if col in load_df.columns:
            load_df[col] = load_df[col].apply(
                lambda v: round(float(v), 3)
                if pd.notna(v) and str(v).strip() not in ("None", "", "nan", "-")
                else "-")
    return load_df.reset_index(drop=True), lv_col


# ══════════════════════════════════════════════════════════════════════════════
# lavaan (R)-based CFA / SEM
# ══════════════════════════════════════════════════════════════════════════════
def _get_rscript_path():
    import subprocess, os, glob
    # First try Rscript on PATH, then the default Windows install locations.
    try:
        r = subprocess.run(['Rscript', '--version'], capture_output=True, timeout=10)
        if r.returncode == 0:
            return 'Rscript'
    except Exception:
        pass
    candidates = glob.glob(r'C:\Program Files\R\R-*\bin\Rscript.exe')
    candidates += glob.glob(r'C:\Program Files (x86)\R\R-*\bin\Rscript.exe')
    if candidates:
        candidates.sort(reverse=True)
        return candidates[0]
    return None


def _rscript_available():
    return _get_rscript_path() is not None


def _parse_fit_csv(path):
    df = pd.read_csv(path)
    if df.empty:
        return {}
    row = df.iloc[0]
    chi2 = float(row.get('chisq', 0))
    dof = float(row.get('df', 1))
    return {
        'χ²': round(chi2, 3),
        'df': int(dof),
        'χ²/df': round(chi2 / dof, 3) if dof > 0 else '-',
        'p': round(float(row.get('pvalue', 1)), 6),
        'NFI': round(float(row.get('nfi', 0)), 3),
        'RFI': round(float(row.get('rfi', 0)), 3),
        'IFI': round(float(row.get('ifi', 0)), 3),
        'TLI': round(float(row.get('tli', 0)), 3),
        'CFI': round(float(row.get('cfi', 0)), 3),
        'RMSEA': round(float(row.get('rmsea', 1)), 3),
    }


def _fmt_mi_df(df):
    if df is None or df.empty:
        return pd.DataFrame(columns=['λ³€μˆ˜1', 'λ³€μˆ˜2', 'MI'])
    df = df.copy()
    df.columns = ['λ³€μˆ˜1',
                  'λ³€μˆ˜2', 'MI']
    df['MI'] = pd.to_numeric(df['MI'], errors='coerce').round(3)
    return df.sort_values('MI', ascending=False).reset_index(drop=True)


def run_lavaan_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200,
                           manual_extra_cov=None):
    import subprocess, tempfile, os
    if not _rscript_available():
        return None, "Rscript not found. Install R and add it to PATH."
    all_items = [i for items in constructs.values() for i in items]
    item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
    lv_names = list(constructs.keys())
    data = df[all_items].dropna()
    meas_lines = [f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
    manual_pairs = list(manual_extra_cov) if manual_extra_cov else []
    extra_lines = [f"{v1} ~~ {v2}" for v1, v2 in manual_pairs]
    model_base = "\n".join(meas_lines + extra_lines)

    with tempfile.TemporaryDirectory() as tmpdir:
        def p(name):
            # Forward slashes keep the path valid inside the R script on Windows.
            return os.path.join(tmpdir, name).replace('\\', '/')

        data.to_csv(p('data.csv'), index=False)
        with open(p('model.txt'), 'w', encoding='utf-8') as f:
            f.write(model_base)
        with open(p('lvnames.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lv_names))

        r_script = f'''
suppressPackageStartupMessages(library(lavaan))
options(warn=-1)
out <- "{p("")}"
data <- read.csv(file.path(out,"data.csv"))
mdl_base <- paste(readLines(file.path(out,"model.txt"), encoding="UTF-8"), collapse="\\n")
lv_names <- readLines(file.path(out,"lvnames.txt"), encoding="UTF-8")
mi_thr <- {mi_threshold}
max_mods <- {max_mods}

get_fit <- function(fit) {{
  fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea"))
  data.frame(as.list(fm))
}}
is_ok <- function(fit) {{
  fm <- fitMeasures(fit, c("cfi","rmsea"))
  unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049
}}
get_mi_cov <- function(fit) {{
  m <- tryCatch(modindices(fit, sort.=TRUE, maximum.number=500), error=function(e) NULL)
  if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric()))
  m <- m[m$op=="~~",
         c("lhs","rhs","mi")]
  m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ]
}}

fit0 <- tryCatch(cfa(mdl_base, data=data, estimator="ML"), error=function(e) NULL)
if (is.null(fit0)) {{
  writeLines("ERROR: CFA failed", file.path(out,"status.txt"))
  quit(status=1)
}}
write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE)
mi0 <- get_mi_cov(fit0)
write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE)

added <- character(0); extra <- character(0); log_rows <- list()
fit_cur <- fit0; mdl_cur <- mdl_base

for (step in seq_len(max_mods)) {{
  if (is_ok(fit_cur)) break
  mi_c <- get_mi_cov(fit_cur)
  mi_c <- mi_c[mi_c$mi > mi_thr, ]
  if (nrow(mi_c)==0) break
  ok <- sapply(seq_len(nrow(mi_c)), function(i) {{
    pk <- paste(sort(c(mi_c$lhs[i], mi_c$rhs[i])), collapse="~~")
    !(pk %in% added)
  }})
  mi_c <- mi_c[ok, ]
  if (nrow(mi_c)==0) break
  v1 <- mi_c$lhs[1]; v2 <- mi_c$rhs[1]; mv <- mi_c$mi[1]
  pk <- paste(sort(c(v1,v2)), collapse="~~")
  mdl_new <- paste0(mdl_cur,"\\n ",v1," ~~ ",v2)
  fit_new <- tryCatch(cfa(mdl_new, data=data, estimator="ML"), error=function(e) NULL)
  added <- c(added, pk)
  if (is.null(fit_new)) next
  fm_b <- fitMeasures(fit_cur, c("cfi","rmsea","rfi","ifi"))
  fm_a <- fitMeasures(fit_new, c("cfi","rmsea","rfi","ifi"))
  log_rows[[length(log_rows)+1]] <- data.frame(
    step=step, v1=v1, v2=v2, mi=round(mv,3),
    cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3),
    rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3),
    rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3),
    ifi_b=round(unname(fm_b["ifi"]),3), ifi_a=round(unname(fm_a["ifi"]),3),
    stringsAsFactors=FALSE)
  extra <- c(extra, paste(v1, v2, sep=","))
  fit_cur <- fit_new; mdl_cur <- mdl_new
}}

write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE)
write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE)
write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE)
if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt"))
if (length(log_rows)>0) {{
  write.csv(do.call(rbind, log_rows), file.path(out,"mod_log.csv"), row.names=FALSE)
}}
writeLines("SUCCESS", file.path(out,"status.txt"))
'''
        with open(p('cfa.R'), 'w', encoding='utf-8') as f:
            f.write(r_script)

        rscript = _get_rscript_path()
        try:
            res = subprocess.run([rscript, '--vanilla', p('cfa.R')],
                                 capture_output=True, text=True, timeout=300)
        except subprocess.TimeoutExpired:
            return None, "R execution timed out (5 minutes)"
        except Exception as ex:
            return None, f"Rscript execution error: {ex}"

        status_path = os.path.join(tmpdir, 'status.txt')
        if not os.path.exists(status_path):
            err = res.stderr[-2000:] if res.stderr else "Unknown error"
            return None, f"R error:\n{err}"
        # The R script also writes status.txt when the initial fit fails,
        # so check its content rather than only its existence.
        with open(status_path, encoding='utf-8') as f:
            status = f.read().strip()
        if status != "SUCCESS":
            return None, f"R error:\n{status}"

        try:
            fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv'))
            fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv'))
            std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv'))
            mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv')))
            mf_path = os.path.join(tmpdir, 'mi_final.csv')
            mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame()
        except Exception as ex:
            return None, f"Result parsing error: {ex}"

        extra_cov = list(manual_pairs)
        ec_path = os.path.join(tmpdir, 'extra_cov.txt')
        if os.path.exists(ec_path):
            with open(ec_path, encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(',')
                    if len(parts) == 2:
                        t = tuple(parts)
                        if t not in extra_cov:
                            extra_cov.append(t)

        mod_log_records = []
        ml_path = os.path.join(tmpdir, 'mod_log.csv')
        if os.path.exists(ml_path):
            log_df = pd.read_csv(ml_path)
            for _, row in log_df.iterrows():
                lv1 = item_to_lv.get(str(row['v1']), str(row['v1']))
                lv2 = item_to_lv.get(str(row['v2']), str(row['v2']))
                mod_log_records.append({
                    '단계': int(row['step']),
                    'μˆ˜μ • 경둜': f"{row['v1']} ~~ {row['v2']}",
                    'μ†Œμ† μš”μΈ': lv1 if lv1 == lv2 else f'{lv1}↔{lv2}',
                    'MI': row['mi'],
                    'CFI μ „β†’ν›„': f"{row['cfi_b']} β†’ {row['cfi_a']}",
                    'RMSEA μ „β†’ν›„': f"{row['rmsea_b']} β†’ {row['rmsea_a']}",
                    'RFI μ „β†’ν›„': f"{row.get('rfi_b', '-')} β†’ {row.get('rfi_a', '-')}",
                    'IFI μ „β†’ν›„': f"{row.get('ifi_b', '-')} β†’ {row.get('ifi_a', '-')}",
                })

        return {
            'init_fit': fit0, 'mod_fit': fit_mod,
            'init_mi': mi_init, 'final_mi': mi_final,
            'mod_log': pd.DataFrame(mod_log_records),
            'extra_cov': extra_cov,
            'was_modified': len(extra_cov) > 0,
            'std_sol': std_sol,
            'data': data, 'all_items': all_items, 'lv_names': lv_names,
        }, None


def _extract_load_df_lavaan(std_sol, constructs):
    try:
        loads = std_sol[std_sol['op'] == '=~'].copy()
        cols = [c for c in ['lhs', 'rhs', 'est.std', 'se', 'z', 'pvalue'] if c in loads.columns]
        loads = loads[cols].copy()
        rename = {'lhs': 'μž μž¬λ³€μˆ˜', 'rhs': 'μΈ‘μ •λ³€μˆ˜', 'est.std': 'ν‘œμ€€ν™”κ³„μˆ˜',
                  'se': 'SE', 'z': 'tκ°’', 'pvalue': 'pκ°’_raw'}
        loads = loads.rename(columns=rename)
        loads['ν‘œμ€€ν™”κ³„μˆ˜'] = pd.to_numeric(loads['ν‘œμ€€ν™”κ³„μˆ˜'], errors='coerce').round(3)
        loads['SE'] = pd.to_numeric(loads['SE'], errors='coerce').round(3)
        loads['tκ°’'] = pd.to_numeric(loads['tκ°’'], errors='coerce').round(3)
        if 'pκ°’_raw' in loads.columns:
            loads['pκ°’'] = loads['pκ°’_raw'].apply(
                lambda p: fmt_p(float(p))
                if pd.notna(p) and str(p).strip() not in ('NA', '', 'nan') else '-')
            loads = loads.drop(columns=['pκ°’_raw'])
        return loads.reset_index(drop=True)
    except Exception:
        return pd.DataFrame()


def build_cfa_tables(df, constructs, mi_threshold=3.84, max_mods=200, manual_extra_cov=None):
    # Prefer lavaan (R); fall back to semopy when lavaan is unavailable or fails.
    result, err = run_lavaan_cfa_with_mi(df, constructs, mi_threshold, max_mods,
                                         manual_extra_cov=manual_extra_cov)
    use_lavaan = (result is not None)
    if not use_lavaan:
        st.warning(f"⚠️ lavaan failed → using semopy\n\nCause: {err}")
        result, err = run_cfa_with_mi(df, constructs, mi_threshold, max_mods)
    if result is None:
        st.error(err or "CFA execution failed")
        return None, None, None, None, None, None, None, False, []
    try:
        if use_lavaan:
            std_sol = result["std_sol"]
            load_df = _extract_load_df_lavaan(std_sol, constructs)
            rel_rows = []
            for lv, items in constructs.items():
                lv_loads = std_sol[(std_sol['op'] == '=~') & (std_sol['lhs'] == lv)]
                lam = pd.to_numeric(lv_loads['est.std'], errors='coerce').dropna().values
                ave, cr = calc_ave_cr(lam)
                alpha = cronbach_alpha(df[items])
                rel_rows.append({"μž μž¬λ³€μˆ˜": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha})
        else:
            ins = result["ins"]
            std_col = result["std_col"]
            load_df, lv_col = _extract_load_df(ins, std_col, constructs)
            rel_rows = []
            for lv, items in constructs.items():
                raw = ins[ins[lv_col] == lv][std_col].values
                lam = np.array([v for v in raw
                                if str(v).strip() not in ("-","","nan","None")], dtype=float)
                ave, cr = calc_ave_cr(lam)
                alpha = cronbach_alpha(df[items])
                rel_rows.append({"μž μž¬λ³€μˆ˜": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha})
        return (load_df, pd.DataFrame(rel_rows),
                result["init_fit"], result["mod_fit"],
                result["init_mi"], result.get("final_mi", pd.DataFrame()),
                result["mod_log"], result["was_modified"], result["extra_cov"])
    except Exception as e:
        st.error(f"CFA result processing error: {e}")
        return None, None, None, None, None, None, None, False, []


def build_lavaan_sem_table(df, constructs, hypotheses, cfa_extra_cov=None,
                           mi_threshold=3.84, max_mods=200):
    import subprocess, tempfile, os
    if not _rscript_available():
        return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov,
                                 mi_threshold, max_mods)
    all_items = [i for items in constructs.values() for i in items]
    lv_names = list(constructs.keys())
    lv_set = set(lv_names)
    item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
    data = df[all_items].dropna()
    cfa_cov_pairs = (cfa_extra_cov
                     if isinstance(cfa_extra_cov, list) and
                     (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
                     else [])
    deps = defaultdict(list)
    for s, t in hypotheses:
        deps[t].append(s)

    with tempfile.TemporaryDirectory() as tmpdir:
        def p(name):
            return os.path.join(tmpdir, name).replace('\\', '/')

        data.to_csv(p('data.csv'), index=False)
        with open(p('meas.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(f"{lv} =~ {' + '.join(items)}"
                              for lv, items in constructs.items()))
        with open(p('struct.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(f"{t} ~ {' + '.join(ss)}" for t, ss in deps.items()))
        with open(p('lvnames.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lv_names))
        with open(p('cfa_cov.txt'), 'w', encoding='utf-8') as f:
            for v1, v2 in cfa_cov_pairs:
                f.write(f'{v1},{v2}\n')

        r_script = f'''
suppressPackageStartupMessages(library(lavaan))
options(warn=-1)
out <- "{p("")}"
data <- read.csv(file.path(out,"data.csv"))
meas_lines <- readLines(file.path(out,"meas.txt"), encoding="UTF-8")
struct_lines <- readLines(file.path(out,"struct.txt"), encoding="UTF-8")
lv_names <- readLines(file.path(out,"lvnames.txt"),encoding="UTF-8")
cfa_cov_raw <- tryCatch(readLines(file.path(out,"cfa_cov.txt"),encoding="UTF-8"),
                        error=function(e) character(0))
mi_thr <- {mi_threshold}
max_mods <- {max_mods}

build_model <- function(extra_str=character(0)) {{
  cov_lines <- character(0)
  all_covs <- c(cfa_cov_raw, extra_str)
  for (cv in all_covs) {{
    pts <- strsplit(trimws(cv),",")[[1]]
    if (length(pts)==2) {{
      ln <- paste0(" ",pts[1]," ~~ ",pts[2])
      if (!(ln %in% cov_lines)) cov_lines <- c(cov_lines, ln)
    }}
  }}
  paste(c(paste0(" ",meas_lines), cov_lines, paste0(" ",struct_lines)), collapse="\\n")
}}
get_fit <- function(fit) {{
  fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea"))
  data.frame(as.list(fm))
}}
is_ok <- function(fit) {{
  fm <- fitMeasures(fit, c("cfi","rmsea"))
  unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049
}}
get_mi_cov <- function(fit) {{
  m <- tryCatch(modindices(fit,sort.=TRUE,maximum.number=500),error=function(e) NULL)
  if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric()))
  m <- m[m$op=="~~", c("lhs","rhs","mi")]
  m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ]
}}

fit0 <- tryCatch(sem(build_model(), data=data, estimator="ML"), error=function(e) NULL)
if (is.null(fit0)) {{
  writeLines("ERROR: SEM initial fit failed", file.path(out,"status.txt"))
  quit(status=1)
}}
write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE)
mi0 <- get_mi_cov(fit0)
write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE)

added <- character(0); extra <- character(0); log_rows <- list()
fit_cur <- fit0

for (step in seq_len(max_mods)) {{
  if (is_ok(fit_cur)) break
  mi_c <- get_mi_cov(fit_cur)
  mi_c <- mi_c[mi_c$mi > mi_thr, ]
  if (nrow(mi_c)==0) break
  ok <- sapply(seq_len(nrow(mi_c)), function(i) {{
    pk <- paste(sort(c(mi_c$lhs[i],mi_c$rhs[i])),collapse="~~")
    !(pk %in% added)
  }})
  mi_c <- mi_c[ok,]
  if (nrow(mi_c)==0) break
  v1<-mi_c$lhs[1]; v2<-mi_c$rhs[1]; mv<-mi_c$mi[1]
  pk <- paste(sort(c(v1,v2)),collapse="~~")
  extra_new <- c(extra, paste(v1,v2,sep=","))
  fit_new <- tryCatch(sem(build_model(extra_new),data=data,estimator="ML"),error=function(e) NULL)
  added <- c(added, pk)
  if (is.null(fit_new)) next
  fm_b <- fitMeasures(fit_cur,c("cfi","rmsea","rfi","ifi"))
  fm_a <- fitMeasures(fit_new,c("cfi","rmsea","rfi","ifi"))
  log_rows[[length(log_rows)+1]] <- data.frame(
    step=step, v1=v1, v2=v2, mi=round(mv,3),
    cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3),
    rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3),
    rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3),
    stringsAsFactors=FALSE)
  extra <- extra_new
  fit_cur <- fit_new
}}

write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE)
write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE)
write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE)
if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt"))
if (length(log_rows)>0) write.csv(do.call(rbind,log_rows), file.path(out,"mod_log.csv"), row.names=FALSE)
writeLines("SUCCESS", file.path(out,"status.txt"))
'''
        with open(p('sem.R'), 'w', encoding='utf-8') as f:
            f.write(r_script)

        rscript = _get_rscript_path()
        try:
            res = subprocess.run([rscript, '--vanilla', p('sem.R')],
                                 capture_output=True, text=True, timeout=300)
        except Exception:
            return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov,
                                     mi_threshold, max_mods)

        if not os.path.exists(os.path.join(tmpdir, 'status.txt')):
            return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov,
                                     mi_threshold, max_mods)

        try:
            fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv'))
            fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv'))
            std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv'))
            mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv')))
            mf_path = os.path.join(tmpdir, 'mi_final.csv')
            mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame()
        except Exception:
            return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov,
                                     mi_threshold, max_mods)

        extra_cov = list(cfa_cov_pairs)
        ec_path = os.path.join(tmpdir, 'extra_cov.txt')
        if os.path.exists(ec_path):
            with open(ec_path, encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(',')
                    if len(parts) == 2:
                        t = tuple(parts)
                        if t not in extra_cov:
                            extra_cov.append(t)

        mod_log_records = []
        ml_path = os.path.join(tmpdir, 'mod_log.csv')
        if os.path.exists(ml_path):
            log_df = pd.read_csv(ml_path)
            for _, row in log_df.iterrows():
                lv1 = item_to_lv.get(str(row['v1']), str(row['v1']))
                lv2 = item_to_lv.get(str(row['v2']), str(row['v2']))
                mod_log_records.append({
                    '단계': int(row['step']),
                    'μˆ˜μ • 경둜': f"{row['v1']} ~~ {row['v2']}",
                    'μ†Œμ† μš”μΈ': lv1 if lv1 == lv2 else f'{lv1}↔{lv2}',
                    'MI': row['mi'],
                    'CFI μ „β†’ν›„': f"{row['cfi_b']} β†’ {row['cfi_a']}",
                    'RMSEA μ „β†’ν›„': f"{row['rmsea_b']} β†’ {row['rmsea_a']}",
                })

        def _sp(v):
            try:
                return float(v) if str(v).strip() not in ('NA', '', 'nan') else np.nan
            except Exception:
                return np.nan

        struct = std_sol[(std_sol['op'] == '~') &
                         std_sol['lhs'].isin(lv_set) &
                         std_sol['rhs'].isin(lv_set)].copy()
        hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in
                   enumerate(hypotheses)}
        cols_need = [c for c in ['lhs','rhs','est.std','se','z','pvalue'] if c in struct.columns]
        struct = struct[cols_need].copy()
        struct.columns = ['μ’…μ†λ³€μˆ˜','λ…λ¦½λ³€μˆ˜','ν‘œμ€€ν™”Ξ²','SE','tκ°’','pκ°’_raw'][:len(cols_need)]
        struct['κ°€μ„€'] = struct.apply(lambda r: hyp_map.get((r['λ…λ¦½λ³€μˆ˜'], r['μ’…μ†λ³€μˆ˜']), '-'), axis=1)
        struct['경둜'] = struct['λ…λ¦½λ³€μˆ˜'] + ' β†’ ' + struct['μ’…μ†λ³€μˆ˜']
        struct['μœ μ˜μ„±'] = struct['pκ°’_raw'].apply(lambda v: sig_stars(_sp(v)))
        struct['채택여뢀'] = struct['pκ°’_raw'].apply(
            lambda v: ('채택' if _sp(v) < 0.05 else '기각') if not np.isnan(_sp(v)) else '-')
        struct['pκ°’'] = struct['pκ°’_raw'].apply(
            lambda v: fmt_p(_sp(v)) if not np.isnan(_sp(v)) else '-')
        for col in ['ν‘œμ€€ν™”Ξ²', 'SE', 'tκ°’']:
            struct[col] = pd.to_numeric(struct[col], errors='coerce').round(3)
        struct = struct.drop(columns=['pκ°’_raw'])
        out_cols = ['κ°€μ„€','경둜','ν‘œμ€€ν™”Ξ²','SE','tκ°’','pκ°’','μœ μ˜μ„±','채택여뢀']
        path_df = struct[[c for c in out_cols if c in struct.columns]].reset_index(drop=True)
        was_mod = len(mod_log_records) > 0
        return (path_df, fit0, fit_mod, pd.DataFrame(mod_log_records),
                extra_cov, was_mod, mi_init, mi_final)


def _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov=None,
                      mi_threshold=3.84, max_mods=200):
    """semopy-based SEM (fallback for lavaan)."""
    try:
        from semopy import Model, calc_stats
    except ImportError:
        st.error("The semopy package is required.")
        return None, None, None, None, [], False, pd.DataFrame(), pd.DataFrame()
    try:
        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [i for items in constructs.values() for i in items]
        lv_names = set(constructs.keys())
        data = df[all_items].dropna()
        cfa_cov_pairs = (cfa_extra_cov
                         if isinstance(cfa_extra_cov, list) and
                         (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
                         else [])
        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        meas_str = "\n".join(meas_lines)
        lv_keys = list(constructs.keys())
        lv_cov_lines = [f" {lv_keys[i]} ~~ {lv_keys[j]}"
                        for i in range(len(lv_keys))
                        for j in range(i+1, len(lv_keys))]
        meas_str = meas_str + ("\n" + "\n".join(lv_cov_lines) if lv_cov_lines else "")
        var_str = "\n".join(f" {lv} ~~ 1*{lv}" for lv in constructs.keys())
        deps = defaultdict(list)
        for s, t in hypotheses:
            deps[t].append(s)
        struct_str = "\n".join(f" {t} ~ {' + '.join(ss)}" for t, ss in deps.items())
        cov_part = ("\n" + "\n".join(f" {a} ~~ {b}" for a, b in cfa_cov_pairs)
                    if cfa_cov_pairs else "")

        def _fit_model(model_str):
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        def _extract_fit(m):
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                rfi = ((chi2_bl/dof_bl - chi2/dof)/(chi2_bl/dof_bl)
                       if chi2_bl>0 and dof_bl>0 and dof>0 else 0.0)
                ifi = ((chi2_bl - chi2)/(chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"χ²": round(chi2,3), "df": int(dof),
                        "χ²/df": round(chi2/dof,3) if dof>0 else "-",
                        "p": round(pval,3),
                        "NFI": round(nfi,3), "RFI": round(rfi,3),
                        "IFI": round(ifi,3), "TLI": round(tli,3),
                        "CFI": round(cfi,3), "RMSEA": round(rmsea,3)}
            except Exception:
                return {}

        def _extract_paths(m):
            ins = m.inspect(std_est=True)
            std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
            struct = ins[ins["lval"].isin(lv_names) & ins["rval"].isin(lv_names)].copy()
            cols = [c for c in ["lval","rval",std_col,"Std. Err","z-value","p-value"]
                    if c in struct.columns]
            struct = struct[cols].copy()
            rm = {"lval":"μ’…μ†λ³€μˆ˜","rval":"λ…λ¦½λ³€μˆ˜", std_col:"ν‘œμ€€ν™”Ξ²",
                  "Std. Err":"SE", "z-value":"tκ°’", "p-value":"pκ°’_raw"}
            struct = struct.rename(columns=rm)
            hyp_map = {(s,t): f"H{i+1}" for i,(s,t) in enumerate(hypotheses)}
            struct["κ°€μ„€"] = struct.apply(
                lambda r: hyp_map.get((r["λ…λ¦½λ³€μˆ˜"],r["μ’…μ†λ³€μˆ˜"]),"-"), axis=1)
            struct["경둜"] = struct["λ…λ¦½λ³€μˆ˜"] + " β†’ " + struct["μ’…μ†λ³€μˆ˜"]

            def _sp(p):
                try:
                    return float(p) if str(p).strip() not in("-","","nan") else np.nan
                except Exception:
                    return np.nan

            struct["μœ μ˜μ„±"] = struct["pκ°’_raw"].apply(lambda p: sig_stars(_sp(p)))
            struct["채택여뢀"] = struct["pκ°’_raw"].apply(
                lambda p: ("채택" if _sp(p)<0.05 else "기각") if not np.isnan(_sp(p)) else "-")
            struct["pκ°’"] = struct["pκ°’_raw"].apply(
                lambda p: fmt_p(_sp(p)) if not np.isnan(_sp(p)) else "-")
            for col in [c for c in ["ν‘œμ€€ν™”Ξ²","SE","tκ°’"] if c in struct.columns]:
                struct[col] = pd.to_numeric(struct[col], errors="coerce").round(3)
            out = ["κ°€μ„€","경둜","ν‘œμ€€ν™”Ξ²","SE","tκ°’","pκ°’","μœ μ˜μ„±","채택여뢀"]
            return struct[[c for c in out if c in struct.columns]].reset_index(drop=True)

        base_str = meas_str + "\n" + var_str + cov_part + "\n" + struct_str
        base_no_var = meas_str + cov_part + "\n" + struct_str
        # Remember which variant fitted so that modified models reuse the same syntax.
        m0 = _fit_model(base_str)
        use_var = m0 is not None
        if m0 is None:
            m0 = _fit_model(base_no_var)
        if m0 is None:
            return None, None, None, None, [], False, pd.DataFrame(), pd.DataFrame()
        fit0 = _extract_fit(m0)
        path0 = _extract_paths(m0)
        extra_cov = list(cfa_cov_pairs)
        added_pairs = {tuple(sorted(p)) for p in cfa_cov_pairs}
        mod_log = []
        m_cur, fit_cur = m0, fit0.copy()

        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            mi_cur = calc_mi_approx(m_cur, data, all_items)
            # calc_mi_approx returns an empty frame (no "MI" column) on failure.
            if mi_cur.empty:
                break
            avail = mi_cur[(mi_cur["MI"] > mi_threshold) &
                           (~mi_cur.apply(
                               lambda r: tuple(sorted([r["λ³€μˆ˜1"], r["λ³€μˆ˜2"]])) in added_pairs,
                               axis=1))]
            if avail.empty:
                break
            row = avail.iloc[0]
            v1, v2 = row["λ³€μˆ˜1"], row["λ³€μˆ˜2"]
            pair = tuple(sorted([v1, v2]))
            new_cov = extra_cov + [(v1, v2)]
            new_cov_str = "\n".join(f" {a} ~~ {b}" for a, b in new_cov)
            new_str = (meas_str + ("\n" + var_str if use_var else "") +
                       "\n" + new_cov_str + "\n" + struct_str)
            m_new = _fit_model(new_str)
            if m_new is None:
                added_pairs.add(pair)
                continue
            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "단계": step+1,
                "μˆ˜μ • 경둜": f"{v1} ~~ {v2}",
                "μ†Œμ† μš”μΈ": lv1 if lv1==lv2 else f"{lv1}↔{lv2}",
                "MI": round(row["MI"],3),
                "CFI μ „β†’ν›„": f"{fit_cur.get('CFI','-')}β†’{fit_new.get('CFI','-')}",
                "RMSEA μ „β†’ν›„": f"{fit_cur.get('RMSEA','-')}β†’{fit_new.get('RMSEA','-')}",
            })
            extra_cov = new_cov
            m_cur = m_new
            fit_cur = fit_new
            added_pairs.add(pair)

        path_final = _extract_paths(m_cur)
        was_mod = len(mod_log) > 0
        init_mi_sem = calc_mi_approx(m0, data, all_items)
        final_mi_sem = calc_mi_approx(m_cur, data, all_items)
        return (path_final, fit0, fit_cur, pd.DataFrame(mod_log),
                extra_cov, was_mod, init_mi_sem, final_mi_sem)
    except Exception as e:
        st.error(f"SEM error: {e}")
        return None, None, None, None, [], False, pd.DataFrame(), pd.DataFrame()


def run_correlation(df, constructs):
    try:
        comp = {lv: df[items].mean(axis=1) for lv, items in constructs.items()}
        corr = pd.DataFrame(comp).corr().round(3)
        # Square root of AVE per construct, shown on the diagonal.
        sqrt_ave = {}
        try:
            ins, std_col, _, _ = run_cfa_sem(df, constructs)
            if ins is not None:
                use_rval = not (ins["op"] == "=~").any()
                lv_col = "rval" if use_rval else "lval"
                for lv, items in constructs.items():
                    raw = ins[ins[lv_col] == lv][std_col].values
                    lam = np.array([v for v in raw
                                    if str(v).strip() not in ("-","","nan","None")], dtype=float)
                    ave, _ = calc_ave_cr(lam)
                    sqrt_ave[lv] = round(float(np.sqrt(ave)), 3)
        except Exception:
            pass
        tbl = corr.copy().astype(object)
        for lv in constructs:
            if lv in tbl.columns:
                tbl.loc[lv, lv] = sqrt_ave.get(lv, "-")
        # Blank out the upper triangle.
        for i in range(len(tbl.columns)):
            for j in range(i + 1, len(tbl.columns)):
                tbl.iloc[i, j] = ""
        return tbl.reset_index().rename(columns={"index": "λ³€μˆ˜"})
    except Exception as e:
        st.error(f"Correlation analysis error: {e}")
        return pd.DataFrame()


# ══════════════════════════════════════════════════════════════════════════════
# Excel export
# ══════════════════════════════════════════════════════════════════════════════

def build_single_sheet_excel(sheets: dict) -> bytes:
    """Write every result table into one worksheet, stacked section by section.

    Each value in `sheets` is a tuple (title, DataFrame, note, adopt_col);
    the last two entries are optional.
    """
    try:
        from openpyxl import Workbook
        from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
        from openpyxl.utils import get_column_letter
        import io as _io

        wb = Workbook()
        ws = wb.active
        ws.title = "분석결과"  # "Analysis results"

        _hdr_font = Font(name="Arial", bold=True, color="FFFFFF", size=10)
        _hdr_fill = PatternFill("solid", fgColor="2F5496")
        _ttl_font = Font(name="Arial", bold=True, size=12, color="2F5496")
        _note_font = Font(name="Arial", size=9, color="595959", italic=True)
        _ctr = Alignment(horizontal="center", vertical="center", wrap_text=True)
        _lft = Alignment(horizontal="left", vertical="center")
        _bd = Border(left=Side(style="thin"), right=Side(style="thin"),
                     top=Side(style="thin"), bottom=Side(style="thin"))
        _adopt_fill_y = PatternFill("solid", fgColor="E2EFDA")
        _adopt_fill_n = PatternFill("solid", fgColor="FFC7CE")
        _adopt_font_y = Font(name="Arial", size=10, color="375623")
        _adopt_font_n = Font(name="Arial", size=10, color="9C0006")

        cur_row = 1
        for name, payload in sheets.items():
            try:
                title = payload[0]
                df_data = payload[1]
                note = payload[2] if len(payload) > 2 else ""
                adopt_col = payload[3] if len(payload) > 3 else None
                if df_data is None or len(df_data) == 0:
                    continue

                cols = list(df_data.columns)
                nc = max(len(cols), 1)
                adopt_idx = (cols.index(adopt_col) + 1
                             if adopt_col and adopt_col in cols else None)

                # Section title, merged across the width of the table
                ws.merge_cells(start_row=cur_row, start_column=1,
                               end_row=cur_row, end_column=nc)
                cell = ws.cell(row=cur_row, column=1, value=title)
                cell.font = _ttl_font
                cur_row += 1

                # Header row
                for ci, col in enumerate(cols, 1):
                    c = ws.cell(row=cur_row, column=ci, value=str(col))
                    c.font = _hdr_font
                    c.fill = _hdr_fill
                    c.alignment = _ctr
                    c.border = _bd
                cur_row += 1

                # Data rows
                for row in df_data.itertuples(index=False):
                    for ci, val in enumerate(row, 1):
                        # NaN is the only float that is not equal to itself
                        safe_val = ("" if isinstance(val, float) and (val != val) else val)
                        cell = ws.cell(row=cur_row, column=ci, value=safe_val)
                        cell.alignment = _lft if ci <= 2 else _ctr
                        cell.border = _bd
                        if adopt_idx and ci == adopt_idx:
                            # "채택" is the "supported" label written by the SEM path table
                            is_y = str(val) == "채택"
                            cell.fill = _adopt_fill_y if is_y else _adopt_fill_n
                            cell.font = _adopt_font_y if is_y else _adopt_font_n
                    cur_row += 1

                if note:
                    nc_cell = ws.cell(row=cur_row, column=1, value=note)
                    nc_cell.font = _note_font
                    cur_row += 1
                cur_row += 2
            except Exception:
                pass

        # Auto-size columns. The letter is looked up by index because the merged
        # title cells at the top of columns 2+ have no `column_letter` attribute,
        # which previously made the width of those columns silently stay unset.
        for ci, col_cells in enumerate(ws.columns, 1):
            try:
                w = max((len(str(c.value)) if c.value else 0) for c in col_cells)
                ws.column_dimensions[get_column_letter(ci)].width = min(w + 4, 45)
            except Exception:
                pass

        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
    except Exception:
        # Fallback: return an empty workbook rather than failing the download
        from openpyxl import Workbook
        import io as _io
        wb = Workbook()
        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()


# ══════════════════════════════════════════════════════════════════════════════
# UI
# ══════════════════════════════════════════════════════════════════════════════

st.markdown('

📊 SEM Analyzer

', unsafe_allow_html=True) st.markdown('

λΉˆλ„λΆ„μ„ Β· κΈ°μˆ ν†΅κ³„ Β· 신뒰도 Β· CFA Β· 상관관계 Β· SEM

', unsafe_allow_html=True) # ── STEP 1: 파일 μ—…λ‘œλ“œ ─────────────────────────────────────────────────────── st.markdown("### πŸ“ STEP 1. 데이터 μ—…λ‘œλ“œ") uploaded = st.file_uploader("Excel(.xlsx/.xls) λ˜λŠ” CSV 파일", type=["xlsx","xls","csv"]) if not uploaded: st.info("νŒŒμΌμ„ μ—…λ‘œλ“œν•˜λ©΄ 뢄석이 μ‹œμž‘λ©λ‹ˆλ‹€.") st.stop() try: if uploaded.name.lower().endswith(".csv"): df = pd.read_csv(uploaded) else: df = pd.read_excel(uploaded) except Exception as e: st.error(f"파일 읽기 였λ₯˜: {e}") st.stop() for c in df.columns: try: converted = pd.to_numeric(df[c], errors="coerce") if converted.notna().sum() == df[c].notna().sum() and df[c].notna().sum() > 0: df[c] = converted except Exception: pass st.success(f"βœ… 파일 λ‘œλ“œ μ™„λ£Œ β€” {len(df)}ν–‰ Γ— {len(df.columns)}μ—΄") with st.expander("πŸ“‹ 데이터 미리보기 (μƒμœ„ 10ν–‰)"): st.dataframe(df.head(10), use_container_width=True) # ── STEP 1-5: μ΄μƒμΉ˜ 탐지 ──────────────────────────────────────────────────── st.markdown("---") st.markdown("### πŸ”Ž STEP 1-5. 
μ΄μƒμΉ˜ 탐지 (λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리 + SMC)") num_cols_all = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] tab_maha, tab_smc = st.tabs(["πŸ“ λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리", "πŸ“ SMC (λ‹€μ€‘μƒκ΄€μžμŠΉ)"]) with tab_maha: st.caption("χ² 뢄포 κΈ°μ€€μœΌλ‘œ λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리가 μž„κ³„κ°’μ„ μ΄ˆκ³Όν•˜λŠ” **ν‘œλ³Έ(ν–‰)**을 μ΄μƒμΉ˜λ‘œ νƒμ§€ν•©λ‹ˆλ‹€.") col_a, col_b = st.columns([3, 1]) outlier_cols = col_a.multiselect("뢄석 λ³€μˆ˜ 선택", num_cols_all, default=num_cols_all, key="oc_cols") p_thr = col_b.selectbox("μœ μ˜μˆ˜μ€€", [0.001, 0.005, 0.01, 0.05], index=0, key="oc_pthr") if outlier_cols and st.button("πŸ” λ§ˆν• λΌλ…ΈλΉ„μŠ€ 탐지 μ‹€ν–‰", key="oc_run"): oc_result, oc_cutoff, oc_idx = detect_outliers_mahalanobis(df, outlier_cols, p_threshold=p_thr) st.session_state["oc_result"] = oc_result st.session_state["oc_cutoff"] = oc_cutoff st.session_state["oc_idx"] = oc_idx st.session_state.pop("oc_remove_rows", None) if "oc_result" in st.session_state and st.session_state["oc_result"] is not None: oc_result = st.session_state["oc_result"] oc_cutoff = st.session_state["oc_cutoff"] oc_idx = st.session_state["oc_idx"] n_out = len(oc_idx) st.markdown(f"**χ² μž„κ³„κ°’:** `{oc_cutoff}`") col_r1, col_r2, col_r3 = st.columns(3) col_r1.metric("전체 ν‘œλ³Έ", len(df)) col_r2.metric("νƒμ§€λœ μ΄μƒμΉ˜", n_out, delta=f"-{n_out}" if n_out > 0 else "μ—†μŒ", delta_color="inverse") col_r3.metric("제거 ν›„ ν‘œλ³Έ", len(df) - n_out) disp_all = oc_result.sort_values("λ§ˆν• λΌλ…ΈλΉ„μŠ€ 거리", ascending=False).copy() disp_all["νŒμ •"] = disp_all["μ΄μƒμΉ˜"].map({True: "⚠️ μ΄μƒμΉ˜", False: "βœ… 정상"}) disp_all = disp_all.drop(columns=["μ΄μƒμΉ˜"]) with st.expander("πŸ“‹ λ§ˆν• λΌλ…ΈλΉ„μŠ€ 전체 κ²°κ³Ό", expanded=(n_out > 0)): st.dataframe(disp_all, use_container_width=True, hide_index=True) if n_out > 0: outlier_row_nums = (oc_result[oc_result["μ΄μƒμΉ˜"] == True]["원본 ν–‰λ²ˆν˜Έ"].tolist() if "μ΄μƒμΉ˜" in oc_result.columns else []) _default_rows = [r for r in 
st.session_state.get("oc_remove_rows", outlier_row_nums) if r in outlier_row_nums] st.multiselect(f"πŸ—‘οΈ μ œκ±°ν•  ν‘œλ³Έ 선택 (총 {n_out}개 탐지)", options=outlier_row_nums, default=_default_rows, key="oc_remove_rows") else: st.success("βœ… μ΄μƒμΉ˜κ°€ νƒμ§€λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€.") with tab_smc: st.caption("각 λ³€μˆ˜λ₯Ό λ‚˜λ¨Έμ§€ λ³€μˆ˜λ‘œ νšŒκ·€λΆ„μ„ν•œ RΒ²(SMC) < .20 이면 λ³€μˆ˜ 제거λ₯Ό κ³ λ €ν•©λ‹ˆλ‹€.") smc_cols = st.multiselect("SMC 뢄석 λ³€μˆ˜ 선택", num_cols_all, default=num_cols_all, key="smc_cols") if smc_cols and st.button("πŸ” SMC 탐지 μ‹€ν–‰", key="smc_run"): with st.spinner("SMC 계산 쀑..."): smc_result = compute_smc(df, smc_cols) st.session_state["smc_result"] = smc_result st.session_state.pop("smc_drop_vars", None) if "smc_result" in st.session_state and st.session_state["smc_result"] is not None: smc_result = st.session_state["smc_result"] low_smc = smc_result[smc_result["SMC"].notna() & (smc_result["SMC"] < 0.2)] n_low = len(low_smc) col_s1, col_s2 = st.columns(2) col_s1.metric("뢄석 λ³€μˆ˜ 수", len(smc_result)) col_s2.metric("SMC < .20 λ³€μˆ˜", n_low) with st.expander("πŸ“‹ SMC 뢄석 κ²°κ³Ό", expanded=(n_low > 0)): st.dataframe(smc_result.sort_values("SMC").reset_index(drop=True), use_container_width=True, hide_index=True) if n_low > 0: low_vars = low_smc["λ³€μˆ˜"].tolist() _default_vars = [v for v in st.session_state.get("smc_drop_vars", low_vars) if v in smc_result["λ³€μˆ˜"].tolist()] st.multiselect(f"πŸ—‘οΈ μ œκ±°ν•  λ³€μˆ˜ 선택 (SMC < .20, 총 {n_low}개)", options=smc_result["λ³€μˆ˜"].tolist(), default=_default_vars, key="smc_drop_vars") else: st.success("βœ… SMC < .20 λ³€μˆ˜κ°€ μ—†μŠ΅λ‹ˆλ‹€.") # ── μ΄μƒμΉ˜/λ³€μˆ˜ 제거 적용 ───────────────────────────────────────────────────── _removed_rows = st.session_state.get("oc_remove_rows", []) _dropped_vars = st.session_state.get("smc_drop_vars", []) if _removed_rows: _remove_idx = [int(r) - 1 for r in _removed_rows] _valid_idx = [i for i in _remove_idx if i in df.index] if _valid_idx: df = 
df.drop(index=_valid_idx).reset_index(drop=True) st.info(f"πŸ—‘οΈ μ΄μƒμΉ˜ **{len(_valid_idx)}개 ν‘œλ³Έ** 제거 β†’ 뢄석 ν‘œλ³Έ: **{len(df)}개**") if _dropped_vars: _actual_drop = [v for v in _dropped_vars if v in df.columns] if _actual_drop: df = df.drop(columns=_actual_drop) st.info(f"πŸ—‘οΈ SMC λ³€μˆ˜ **{len(_actual_drop)}개** 제거: `{', '.join(_actual_drop)}`") # ── STEP 2: κ΅¬μ„±κ°œλ… 탐지 ───────────────────────────────────────────────────── st.markdown("---") st.markdown("### πŸ” STEP 2. κ΅¬μ„±κ°œλ… μžλ™ 탐지") constructs_auto = auto_detect_constructs(df) c1, c2, c3 = st.columns(3) c1.metric("전체 λ³€μˆ˜", len(df.columns)) c2.metric("유효 사둀", len(df.dropna())) c3.metric("νƒμ§€λœ κ΅¬μ„±κ°œλ…", len(constructs_auto)) if constructs_auto: for lv, items in constructs_auto.items(): st.markdown(f"- **{lv}** ({len(items)}λ¬Έν•­): {', '.join(items)}") else: st.warning("μžλ™ νƒμ§€λœ κ΅¬μ„±κ°œλ…μ΄ μ—†μŠ΅λ‹ˆλ‹€. μ•„λž˜μ—μ„œ 직접 μ„€μ •ν•΄ μ£Όμ„Έμš”.") with st.expander("✏️ κ΅¬μ„±κ°œλ… 직접 νŽΈμ§‘ (ν•„μš” μ‹œ)"): n_c = st.number_input("κ΅¬μ„±κ°œλ… 수", min_value=1, max_value=20, value=max(len(constructs_auto), 1), key="n_constructs") auto_k = list(constructs_auto.keys()) auto_v = list(constructs_auto.values()) constructs_edit = {} for i in range(int(n_c)): ca, cb = st.columns([1, 3]) default_name = auto_k[i] if i < len(auto_k) else f"LV{i+1}" default_items = auto_v[i] if i < len(auto_v) else [] nm = ca.text_input(f"이름 {i+1}", value=default_name, key=f"cn_{i}") it = cb.multiselect(f"λ¬Έν•­ {i+1}", df.columns.tolist(), default=default_items, key=f"ci_{i}") if nm and len(it) >= 2: constructs_edit[nm] = it constructs = constructs_edit if constructs_edit else constructs_auto if not constructs: constructs = constructs_auto # ── STEP 3: 뢄석 방법 선택 ──────────────────────────────────────────────────── st.markdown("---") st.markdown("### βœ… STEP 3. 
뢄석 방법 선택") num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] cat_cols = [c for c in df.columns if detect_scale(df[c]) in ("binary","categorical")] lv_list = list(constructs.keys()) n_valid = len(df.dropna()) suggestions = { "λΉˆλ„λΆ„μ„": {"ok": len(cat_cols) > 0, "reason": f"λ²”μ£Όν˜• λ³€μˆ˜ {len(cat_cols)}개 감지"}, "κΈ°μˆ ν†΅κ³„": {"ok": len(num_cols) > 0, "reason": f"μˆ˜μΉ˜ν˜• λ³€μˆ˜ {len(num_cols)}개"}, "신뒰도 (Cronbach's Ξ±)": {"ok": len(constructs) > 0, "reason": f"κ΅¬μ„±κ°œλ… {len(constructs)}개 탐지"}, "확인적 μš”μΈλΆ„μ„ (CFA)": {"ok": len(constructs) >= 2 and n_valid >= 100, "reason": f"κ΅¬μ„±κ°œλ… {len(constructs)}개, N={n_valid}"}, "상관관계 뢄석": {"ok": len(constructs) >= 2, "reason": f"μž μž¬λ³€μˆ˜ {len(constructs)}개"}, "ꡬ쑰방정식 (SEM)": {"ok": len(constructs) >= 3 and n_valid >= 200, "reason": f"κ΅¬μ„±κ°œλ… {len(constructs)}개, N={n_valid}"}, } selected = {} for method, info in suggestions.items(): icon = "βœ…" if info["ok"] else "⚠️" col_a, col_b = st.columns([1, 10]) selected[method] = col_a.checkbox(f"{icon}", value=info["ok"], key=f"sel_{method}") col_b.markdown( f'
{method}' f' {info["reason"]}
', unsafe_allow_html=True) # ── SEM κ°€μ„€ μ„€μ • ───────────────────────────────────────────────────────────── hypotheses = [] if selected.get("ꡬ쑰방정식 (SEM)") and len(lv_list) >= 2: st.markdown("---\n#### πŸ”— SEM κ°€μ„€ μ„€μ •") if "hyps" not in st.session_state: st.session_state.hyps = [("", "")] if st.button("οΌ‹ κ°€μ„€ μΆ”κ°€"): st.session_state.hyps.append(("", "")) to_delete = None for i, (sd, td) in enumerate(st.session_state.hyps): ca, cb, cc = st.columns([3, 3, 1]) s = ca.selectbox(f"H{i+1} λ…λ¦½λ³€μˆ˜", lv_list, index=lv_list.index(sd) if sd in lv_list else 0, key=f"hs_{i}") t = cb.selectbox(f"H{i+1} μ’…μ†λ³€μˆ˜", lv_list, index=lv_list.index(td) if td in lv_list else 0, key=f"ht_{i}") if cc.button("μ‚­μ œ", key=f"hd_{i}"): to_delete = i if s != t: hypotheses.append((s, t)) st.session_state.hyps[i] = (s, t) if to_delete is not None and len(st.session_state.hyps) > 1: st.session_state.hyps.pop(to_delete) st.rerun() # ── STEP 4: μ‹€ν–‰ ────────────────────────────────────────────────────────────── st.markdown("---") _run_btn = st.button("πŸš€ 뢄석 μ‹€ν–‰", type="primary", use_container_width=True) if _run_btn: for _k in ["_sem_applied", "_sem_init", "_sem_manual_pairs", "_sem_df", "_sem_constructs", "_sem_hypotheses", "_sem_cfa_cov", "_cfa_applied", "_cfa_init", "_cfa_df", "_cfa_constructs", "_cfa_manual_pairs"]: st.session_state.pop(_k, None) if not _run_btn and "_tab_data" not in st.session_state: st.stop() if not _run_btn and "_tab_data" in st.session_state: tab_data = st.session_state["_tab_data"] tab_labels = st.session_state["_tab_labels"] xlsx_sheets = st.session_state.get("_xlsx_sheets", {}) if "_cfa_applied" in st.session_state and "CFA" in tab_data: tab_data["CFA"] = st.session_state["_cfa_applied"] _lds, _rd, _fi, _fm, _im, _fmi, _ml, _wm, _ec = tab_data["CFA"] if _lds is not None: xlsx_sheets["ν‘œ2_CFA"] = ("CFA β€” μš”μΈλΆ€ν•˜λŸ‰", _lds, "β€» ν‘œμ€€ν™”κ³„μˆ˜β‰₯.50, AVEβ‰₯.50, CRβ‰₯.70") xlsx_sheets["ν‘œ2_신뒰도"] = ("CFA β€” AVE/CR/Ξ±", _rd, "") 
if _wm and not _ml.empty: xlsx_sheets["CFA_μˆ˜μ •κ³Όμ •"] = ("CFA β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€") if "_sem_applied" in st.session_state and "SEM" in tab_data: tab_data["SEM"] = st.session_state["_sem_applied"] _sp, _fi, _fm, _ml, _ec, _wm, _im, _fm2 = tab_data["SEM"] if _sp is not None: xlsx_sheets["ν‘œ4_가섀검증"] = ( "SEM κ°€μ„€ 검증", _sp, "β€» ***p<.001 **p<.01 *p<.05 n.s.=μœ μ˜ν•˜μ§€μ•ŠμŒ", "채택여뢀") if _wm and not _ml.empty: xlsx_sheets["SEM_μˆ˜μ •κ³Όμ •"] = ("SEM β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€") else: xlsx_sheets = {} tab_labels = [] tab_data = {} cfa_extra_cov = [] # λΉˆλ„λΆ„μ„ if selected.get("λΉˆλ„λΆ„μ„") and cat_cols: try: freq_res = {} for c in cat_cols[:10]: f = df[c].value_counts().sort_index() pct = (f / f.sum() * 100).round(1) fdf = pd.DataFrame({"λΉˆλ„": f, "λ°±λΆ„μœ¨(%)": pct, "λˆ„μ (%)": pct.cumsum().round(1)}) fdf.index.name = c freq_res[c] = fdf tab_data["λΉˆλ„λΆ„μ„"] = freq_res tab_labels.append("λΉˆλ„λΆ„μ„") for cn, fdf in freq_res.items(): xlsx_sheets[f"λΉˆλ„_{cn}"[:31]] = (f"λΉˆλ„λΆ„μ„ β€” {cn}", fdf.reset_index(), "") except Exception as e: st.warning(f"λΉˆλ„λΆ„μ„ 였λ₯˜: {e}") # κΈ°μˆ ν†΅κ³„ if selected.get("κΈ°μˆ ν†΅κ³„") and num_cols: try: desc = df[num_cols].describe().T[["count","mean","std","min","max"]].copy() desc.columns = ["N","평균","ν‘œμ€€νŽΈμ°¨","μ΅œμ†Ÿκ°’","μ΅œλŒ“κ°’"] desc["μ™œλ„"] = df[num_cols].skew() desc["첨도"] = df[num_cols].kurt() desc = desc.round(3) tab_data["κΈ°μˆ ν†΅κ³„"] = desc tab_labels.append("κΈ°μˆ ν†΅κ³„") xlsx_sheets["κΈ°μˆ ν†΅κ³„"] = ("κΈ°μˆ ν†΅κ³„", desc.reset_index().rename(columns={"index":"λ³€μˆ˜"}), "") except Exception as e: st.warning(f"κΈ°μˆ ν†΅κ³„ 였λ₯˜: {e}") # 신뒰도 if selected.get("신뒰도 (Cronbach's Ξ±)") and constructs: try: rel_rows = [] for lv, items in constructs.items(): alpha = cronbach_alpha(df[items]) rel_rows.append({"κ΅¬μ„±κ°œλ…": lv, "λ¬Έν•­ 수": len(items), "Cronbach Ξ±": alpha, "νŒμ •": "μΆ©μ‘± βœ“" if (not np.isnan(alpha) and alpha >= 0.7) 
else "λ―ΈμΆ©μ‘± βœ—"}) rel_df = pd.DataFrame(rel_rows) tab_data["신뒰도"] = rel_df tab_labels.append("신뒰도") xlsx_sheets["신뒰도뢄석"] = ("신뒰도 뢄석 (Cronbach's Ξ±)", rel_df, "β€» Ξ± β‰₯ .70 ꢌμž₯") except Exception as e: st.warning(f"신뒰도 뢄석 였λ₯˜: {e}") # CFA if selected.get("확인적 μš”μΈλΆ„μ„ (CFA)") and constructs: with st.spinner("CFA 초기 뢄석 쀑..."): try: loads, rel_df, fit_init, fit_mod, init_mi, final_mi, mod_log, was_mod, extra_cov = \ build_cfa_tables(df, constructs, max_mods=0) if loads is not None: st.session_state["_cfa_init"] = (loads, rel_df, fit_init, fit_mod, init_mi, final_mi, mod_log, was_mod, extra_cov) st.session_state["_cfa_df"] = df st.session_state["_cfa_constructs"] = constructs _cfa_display = st.session_state.get("_cfa_applied", st.session_state["_cfa_init"]) tab_data["CFA"] = _cfa_display tab_labels.append("CFA") _lds, _rd, _fi, _fm, _im, _fmi, _ml, _wm, _ec = _cfa_display cfa_extra_cov = list(_ec) if _ec else [] if _lds is not None: xlsx_sheets["ν‘œ2_CFA"] = ("CFA β€” μš”μΈλΆ€ν•˜λŸ‰", _lds, "β€» ν‘œμ€€ν™”κ³„μˆ˜β‰₯.50, AVEβ‰₯.50, CRβ‰₯.70") xlsx_sheets["ν‘œ2_신뒰도"] = ("CFA β€” AVE/CR/Ξ±", _rd, "") if _wm and not _ml.empty: xlsx_sheets["CFA_μˆ˜μ •κ³Όμ •"] = ("CFA β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€") if not _im.empty: xlsx_sheets["CFA_MI초기"] = ("CFA β€” 초기 μˆ˜μ •μ§€μˆ˜(MI)", _im.head(20), "") if not _fmi.empty: xlsx_sheets["CFA_MIμ΅œμ’…"] = ("CFA β€” μ΅œμ’… μˆ˜μ •μ§€μˆ˜(MI)", _fmi.head(20), "") else: st.warning("⚠️ CFA κ²°κ³Ό 생성 μ‹€νŒ¨.") except Exception as e: st.warning(f"CFA 였λ₯˜: {e}") # 상관관계 if selected.get("상관관계 뢄석") and constructs: try: corr_tbl = run_correlation(df, constructs) if not corr_tbl.empty: tab_data["상관관계"] = corr_tbl tab_labels.append("상관관계") xlsx_sheets["ν‘œ3_상관관계"] = ( "상관관계 뢄석 (λŒ€κ°μ„ =√AVE)", corr_tbl, "β€» λŒ€κ°μ„ : √AVE | νŒλ³„νƒ€λ‹Ήλ„ κΈ°μ€€: √AVE > 타 λ³€μˆ˜ μƒκ΄€κ³„μˆ˜") except Exception as e: st.warning(f"상관관계 였λ₯˜: {e}") # SEM if selected.get("ꡬ쑰방정식 (SEM)") and constructs and hypotheses: with st.spinner("SEM 
초기 뢄석 쀑..."): try: sem_paths, fit_init_sem, fit_mod_sem, mod_log_sem, extra_cov_sem, was_mod_sem, \ init_mi_sem, final_mi_sem = \ build_lavaan_sem_table(df, constructs, hypotheses, cfa_extra_cov=cfa_extra_cov, max_mods=0) if sem_paths is not None: st.session_state["_sem_init"] = ( sem_paths, fit_init_sem, fit_mod_sem, mod_log_sem, extra_cov_sem, was_mod_sem, init_mi_sem, final_mi_sem) st.session_state["_sem_df"] = df st.session_state["_sem_constructs"] = constructs st.session_state["_sem_hypotheses"] = hypotheses st.session_state["_sem_cfa_cov"] = cfa_extra_cov _sem_display = st.session_state.get("_sem_applied", st.session_state["_sem_init"]) tab_data["SEM"] = _sem_display tab_labels.append("SEM") _sp, _fi, _fm, _ml, _ec, _wm, _im, _fm2 = _sem_display if _sp is not None: xlsx_sheets["ν‘œ4_가섀검증"] = ( "SEM κ°€μ„€ 검증", _sp, "β€» ***p<.001 **p<.01 *p<.05 n.s.=μœ μ˜ν•˜μ§€μ•ŠμŒ", "채택여뢀") if _wm and not _ml.empty: xlsx_sheets["SEM_μˆ˜μ •κ³Όμ •"] = ( "SEM β€” MI 기반 μˆ˜μ • κ³Όμ •", _ml, "β€» MI>3.84 κΈ°μ€€") else: st.warning("⚠️ SEM κ²°κ³Όλ₯Ό μƒμ„±ν•˜μ§€ λͺ»ν–ˆμŠ΅λ‹ˆλ‹€.") except Exception as e: st.warning(f"SEM 였λ₯˜: {e}") st.session_state["_tab_data"] = tab_data st.session_state["_tab_labels"] = tab_labels st.session_state["_xlsx_sheets"] = xlsx_sheets # ── κ²°κ³Ό 좜λ ₯ ────────────────────────────────────────────────────────────────── st.markdown("---") st.markdown("## πŸ“‹ 뢄석 κ²°κ³Ό") if not tab_labels: st.warning("μ‹€ν–‰λœ 뢄석이 μ—†μŠ΅λ‹ˆλ‹€.") st.stop() FIT_NOTE = "ꢌμž₯ κΈ°μ€€: NFIΒ·RFIΒ·IFIΒ·TLIΒ·CFI β‰₯ .90 | RMSEA < .049 | SRMR ≀ .08" tabs = st.tabs(tab_labels) for tab, lbl in zip(tabs, tab_labels): with tab: c = tab_data[lbl] if lbl == "λΉˆλ„λΆ„μ„": for col, fdf in c.items(): st.markdown(f"**{col}**") st.dataframe(fdf, use_container_width=True) elif lbl == "κΈ°μˆ ν†΅κ³„": st.dataframe(c, use_container_width=True) elif lbl == "신뒰도": st.dataframe(c, use_container_width=True) st.caption("Ξ± β‰₯ .70: 내적 일관성 확보") elif lbl == "CFA": loads, rel_df, fit_init, fit_mod, 
init_mi, final_mi, mod_log, was_mod, extra_cov = c crit_df = pd.DataFrame([ {"μ§€μˆ˜": "NFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"}, {"μ§€μˆ˜": "RFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"}, {"μ§€μˆ˜": "IFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"}, {"μ§€μˆ˜": "TLI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"}, {"μ§€μˆ˜": "CFI", "ꢌμž₯κΈ°μ€€": "β‰₯ .90"}, {"μ§€μˆ˜": "RMSEA", "ꢌμž₯κΈ°μ€€": "< .049"}, {"μ§€μˆ˜": "SRMR", "ꢌμž₯κΈ°μ€€": "≀ .08"}, ]) with st.expander("πŸ“Œ 적합도 ꢌμž₯ κΈ°μ€€"): st.dataframe(crit_df, use_container_width=True, hide_index=True) st.markdown("**λͺ¨λΈ 적합도 비ꡐ**") fit_col_order = ["χ²", "df", "χ²/df", "p", "NFI", "RFI", "IFI", "TLI", "CFI", "RMSEA"] def _fit_row(label, fit): row = {"λͺ¨ν˜•": label} for k in fit_col_order: row[k] = fit.get(k, "-") return row if was_mod: fit_cmp = pd.DataFrame([_fit_row("초기 λͺ¨ν˜•", fit_init), _fit_row("μˆ˜μ • λͺ¨ν˜•", fit_mod)]) else: fit_cmp = pd.DataFrame([_fit_row("초기 λͺ¨ν˜•", fit_init)]) def _hl_fit(row): colors = [""] * len(row) checks = {"NFI":.90,"RFI":.90,"IFI":.90,"TLI":.90,"CFI":.90} cols = list(row.index) for idx, col in enumerate(cols): v = row[col] try: fv = float(v) if col in checks and fv < checks[col]: colors[idx] = "background-color:#FFE0E0" elif col == "RMSEA" and fv >= 0.049: colors[idx] = "background-color:#FFE0E0" elif col in checks and fv >= checks[col]: colors[idx] = "background-color:#E8F5E9" elif col == "RMSEA" and fv < 0.049: colors[idx] = "background-color:#E8F5E9" except Exception: pass return colors st.dataframe(fit_cmp.style.apply(_hl_fit, axis=1), use_container_width=True) st.caption("🟒 κΈ°μ€€ μΆ©μ‘± | πŸ”΄ κΈ°μ€€ λ―ΈμΆ©μ‘± | RMSEA < .049 ꢌμž₯") if was_mod and extra_cov: st.success(f"βœ… MI 기반 λͺ¨ν˜•μˆ˜μ • {len(extra_cov)}회 적용") paths_str = " / ".join(f"{a} ~~ {b}" for a, b in extra_cov) st.markdown(f"**μˆ˜μ •λœ 경둜:** `{paths_str}`") with st.expander("πŸ“‹ μˆ˜μ • κ³Όμ • 단계별 상세"): st.dataframe(mod_log, use_container_width=True) elif not was_mod: all_ok = _adequate_check(fit_init) if all_ok: st.success("βœ… 초기 λͺ¨ν˜• 적합도 μ–‘ν˜Έ β€” μˆ˜μ • 
λΆˆν•„μš”") else: st.info("ℹ️ 적합도 λ―ΈμΆ©μ‘±μ΄λ‚˜ MI > 3.84 쌍이 μ—†μ–΄ 더 이상 μˆ˜μ • λΆˆκ°€") # CFA MI μˆ˜λ™ 선택 _cfa_init_mi = st.session_state.get("_cfa_init", (None,)*9)[4] if _cfa_init_mi is not None and not _cfa_init_mi.empty: _cfa_mi_over = _cfa_init_mi[_cfa_init_mi["MI"] > 3.84] if not _cfa_mi_over.empty: st.markdown("---") st.markdown("#### πŸ› οΈ μž”μ°¨κ³΅λΆ„μ‚° μˆ˜λ™ μΆ”κ°€") st.caption("MI > 3.84 쌍 쀑 μ μš©ν•  ν•­λͺ©μ„ μ„ νƒν•œ λ’€ **μž¬μ‹€ν–‰** λ²„νŠΌμ„ λˆŒλŸ¬μ£Όμ„Έμš”.") _cfa_opts = [f"{row['λ³€μˆ˜1']} ~~ {row['λ³€μˆ˜2']} (MI = {row['MI']})" for _, row in _cfa_mi_over.iterrows()] _cfa_cur_pairs = st.session_state.get("_cfa_manual_pairs", []) _cfa_cur_opts = [o for o in _cfa_opts if any(f"{v1} ~~ {v2}" in o for v1, v2 in _cfa_cur_pairs)] _cfa_sel = st.multiselect("μ μš©ν•  μž”μ°¨κ³΅λΆ„μ‚° 경둜 선택", options=_cfa_opts, default=_cfa_cur_opts, key="cfa_mi_multisel") if st.button("▢️ 선택 경둜 μ μš©ν•˜μ—¬ CFA μž¬μ‹€ν–‰", key="cfa_rerun_btn"): _cfa_new_pairs = [] for s in _cfa_sel: part = s.split(" (MI")[0].strip() v1, v2 = [x.strip() for x in part.split("~~")] _cfa_new_pairs.append((v1, v2)) st.session_state["_cfa_manual_pairs"] = _cfa_new_pairs with st.spinner("CFA μž¬λΆ„μ„ 쀑..."): try: _cr = build_cfa_tables( st.session_state["_cfa_df"], st.session_state["_cfa_constructs"], max_mods=0, manual_extra_cov=_cfa_new_pairs) if _cr[0] is not None: st.session_state["_cfa_applied"] = _cr st.rerun() except Exception as _ce: st.warning(f"CFA μž¬μ‹€ν–‰ 였λ₯˜: {_ce}") col_mi1, col_mi2 = st.columns(2) with col_mi1: if init_mi is not None and not init_mi.empty: with st.expander("πŸ” 초기 μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"): d = init_mi.head(20).copy() d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μˆ˜μ • κ³ λ €" if v > 3.84 else "") st.dataframe(d, use_container_width=True) with col_mi2: if final_mi is not None and not final_mi.empty: with st.expander("πŸ” μ΅œμ’… μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"): d = final_mi.head(20).copy() d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μΆ”κ°€ μˆ˜μ • 
κ°€λŠ₯" if v > 3.84 else "βœ…") st.dataframe(d, use_container_width=True) st.markdown("---") st.markdown("**CFA κ²°κ³Ό** (μˆ˜μ •λͺ¨ν˜• κΈ°μ€€)") try: merged = loads.copy() merged["AVE"] = ""; merged["CR"] = ""; merged["Cronbach Ξ±"] = "" rel_map = rel_df.set_index("μž μž¬λ³€μˆ˜") seen = set() for idx, row in merged.iterrows(): lv = row["μž μž¬λ³€μˆ˜"] if lv not in seen: seen.add(lv) if lv in rel_map.index: merged.at[idx, "AVE"] = rel_map.loc[lv, "AVE"] merged.at[idx, "CR"] = rel_map.loc[lv, "CR"] merged.at[idx, "Cronbach Ξ±"] = rel_map.loc[lv, "Cronbach Ξ±"] disp_cols = ["μž μž¬λ³€μˆ˜", "μΈ‘μ •λ³€μˆ˜", "ν‘œμ€€ν™”κ³„μˆ˜", "SE", "tκ°’", "pκ°’", "AVE", "CR", "Cronbach Ξ±"] merged = merged[[c for c in disp_cols if c in merged.columns]] st.dataframe(merged, use_container_width=True, hide_index=True) except Exception: st.dataframe(loads, use_container_width=True) st.dataframe(rel_df, use_container_width=True) st.caption("ꢌμž₯: ν‘œμ€€ν™”κ³„μˆ˜ β‰₯ .50 | AVE β‰₯ .50 | CR β‰₯ .70 | Cronbach Ξ± β‰₯ .70") elif lbl == "상관관계": st.dataframe(c, use_container_width=True) st.caption("λŒ€κ°μ„  = √AVE | ν•˜μ‚Όκ° = μž μž¬λ³€μˆ˜ κ°„ μƒκ΄€κ³„μˆ˜") elif lbl == "SEM": paths, fit_init, fit_mod, mod_log, extra_cov, was_mod, init_mi_sem, final_mi_sem = c fit_col_order = ["χ²","df","χ²/df","p","NFI","RFI","IFI","TLI","CFI","RMSEA"] def _hl_sem(row): checks = {"NFI":.90,"RFI":.90,"IFI":.90,"TLI":.90,"CFI":.90} colors = [] for col in row.index: try: fv = float(row[col]) if col in checks: colors.append("background-color:#E8F5E9" if fv>=checks[col] else "background-color:#FFE0E0") elif col == "RMSEA": colors.append("background-color:#E8F5E9" if fv<0.049 else "background-color:#FFE0E0") else: colors.append("") except Exception: colors.append("") return colors st.markdown("**λͺ¨λΈ 적합도**") if was_mod: fit_cmp = pd.DataFrame([ {"λͺ¨ν˜•":"초기 λͺ¨ν˜•", **{k: fit_init.get(k,"-") for k in fit_col_order}}, {"λͺ¨ν˜•":"μˆ˜μ • λͺ¨ν˜•", **{k: fit_mod.get(k,"-") for k in fit_col_order}}, ]) else: fit_cmp = 
pd.DataFrame([ {"λͺ¨ν˜•":"초기 λͺ¨ν˜•", **{k: fit_init.get(k,"-") for k in fit_col_order}} ]) st.dataframe(fit_cmp.style.apply(_hl_sem, axis=1), use_container_width=True) st.caption(FIT_NOTE) if was_mod and extra_cov: st.success(f"βœ… MI 기반 μˆ˜μ • {len(mod_log)}회 적용") paths_str = " / ".join(f"{a} ~~ {b}" for a, b in extra_cov if isinstance(a, str)) st.markdown(f"**μˆ˜μ •λœ 경둜:** `{paths_str}`") with st.expander("πŸ“‹ μˆ˜μ • κ³Όμ • 상세"): st.dataframe(mod_log, use_container_width=True) elif not was_mod: if _adequate_check(fit_init): st.success("βœ… 초기 λͺ¨ν˜• 적합도 μ–‘ν˜Έ β€” μˆ˜μ • λΆˆν•„μš”") else: st.info("ℹ️ 적합도 λ―ΈμΆ©μ‘±μ΄λ‚˜ MI > 3.84 쌍이 μ—†μ–΄ 더 이상 μˆ˜μ • λΆˆκ°€") # SEM MI μˆ˜λ™ 선택 _init_mi_for_sel = st.session_state.get("_sem_init", (None,)*8)[6] if _init_mi_for_sel is not None and not _init_mi_for_sel.empty: mi_over = _init_mi_for_sel[_init_mi_for_sel["MI"] > 3.84] if not mi_over.empty: st.markdown("---") st.markdown("#### πŸ› οΈ μž”μ°¨κ³΅λΆ„μ‚° μˆ˜λ™ μΆ”κ°€") st.caption("MI > 3.84 쌍 쀑 μ μš©ν•  ν•­λͺ©μ„ μ„ νƒν•œ λ’€ **μž¬μ‹€ν–‰** λ²„νŠΌμ„ λˆŒλŸ¬μ£Όμ„Έμš”.") _opts = [f"{row['λ³€μˆ˜1']} ~~ {row['λ³€μˆ˜2']} (MI = {row['MI']})" for _, row in mi_over.iterrows()] _cur_pairs = st.session_state.get("_sem_manual_pairs", []) _cur_opts = [o for o in _opts if any(f"{v1} ~~ {v2}" in o for v1, v2 in _cur_pairs)] _sel = st.multiselect("μ μš©ν•  μž”μ°¨κ³΅λΆ„μ‚° 경둜 선택", options=_opts, default=_cur_opts, key="sem_mi_multisel") if st.button("▢️ 선택 경둜 μ μš©ν•˜μ—¬ SEM μž¬μ‹€ν–‰", key="sem_rerun_btn"): _new_pairs = [] for s in _sel: part = s.split(" (MI")[0].strip() v1, v2 = [x.strip() for x in part.split("~~")] _new_pairs.append((v1, v2)) st.session_state["_sem_manual_pairs"] = _new_pairs _cfa_cov = st.session_state.get("_sem_cfa_cov", []) _all_covs = list(_cfa_cov) + _new_pairs with st.spinner("SEM μž¬λΆ„μ„ 쀑..."): try: _r = build_lavaan_sem_table( st.session_state["_sem_df"], st.session_state["_sem_constructs"], st.session_state["_sem_hypotheses"], 
cfa_extra_cov=_all_covs, max_mods=0) if _r[0] is not None: _r2 = list(_r) _r2[4] = _all_covs _r2[5] = len(_new_pairs) > 0 st.session_state["_sem_applied"] = tuple(_r2) st.rerun() except Exception as _e: st.warning(f"SEM μž¬μ‹€ν–‰ 였λ₯˜: {_e}") col_mi1, col_mi2 = st.columns(2) with col_mi1: if init_mi_sem is not None and not init_mi_sem.empty: with st.expander("πŸ” 초기 μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"): d = init_mi_sem.head(20).copy() d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μˆ˜μ • κ³ λ €" if v > 3.84 else "") st.dataframe(d, use_container_width=True) with col_mi2: if final_mi_sem is not None and not final_mi_sem.empty: with st.expander("πŸ” μ΅œμ’… μˆ˜μ •μ§€μˆ˜(MI) μƒμœ„ 20개"): d = final_mi_sem.head(20).copy() d["νŒμ •"] = d["MI"].apply(lambda v: "⚠️ μΆ”κ°€ μˆ˜μ • κ°€λŠ₯" if v > 3.84 else "βœ…") st.dataframe(d, use_container_width=True) st.markdown("---") st.markdown("**κ°€μ„€ 검증 κ²°κ³Ό** (μˆ˜μ •λͺ¨ν˜• κΈ°μ€€)") def hl(row): color = "#E8F5E9" if row.get("채택여뢀") == "채택" else "#FFF3F3" return [f"background-color:{color}"] * len(row) st.dataframe(paths.style.apply(hl, axis=1), use_container_width=True) n_adopt = (paths["채택여뢀"] == "채택").sum() st.markdown(f"**채택: {n_adopt}개 / 기각: {len(paths)-n_adopt}개 / 전체: {len(paths)}개**") # ── Excel λ‹€μš΄λ‘œλ“œ ───────────────────────────────────────────────────────────── st.markdown("---") xlsx_sheets = st.session_state.get("_xlsx_sheets", xlsx_sheets) if xlsx_sheets: try: _ts = datetime.now().strftime("%Y%m%d_%H%M%S") _fname = f"SEM_RESULT_{_ts}.xlsx" xlsx_bytes = build_single_sheet_excel(xlsx_sheets) st.download_button( label=f"πŸ“₯ {_fname} λ‹€μš΄λ‘œλ“œ (λͺ¨λ“  κ²°κ³Ό ν•œ μ‹œνŠΈ)", data=xlsx_bytes, file_name=_fname, mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", use_container_width=True, type="primary" ) st.caption("β€» λͺ¨λ“  뢄석 κ²°κ³Όκ°€ '뢄석결과' μ‹œνŠΈ ν•˜λ‚˜μ— μ„Ήμ…˜λ³„λ‘œ ν†΅ν•©λ˜μ–΄ μ €μž₯λ©λ‹ˆλ‹€.") except Exception as e: st.error(f"Excel 생성 였λ₯˜: {e}")
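

# ── Reference sketch: incremental fit indices ─────────────────────────────────
# A minimal, standalone sketch of the RFI and IFI formulas that `_extract_fit`
# derives from the model and baseline chi-square statistics, plus the standard
# NFI for comparison. The function name `incremental_fit_indices` and the sample
# numbers in the usage comment are illustrative assumptions; nothing in the app
# calls this helper.
def incremental_fit_indices(chi2, dof, chi2_bl, dof_bl):
    """Return (NFI, RFI, IFI), each rounded to 3 decimals.

    chi2, dof       : chi-square and degrees of freedom of the fitted model
    chi2_bl, dof_bl : the same statistics for the baseline (independence) model
    """
    # NFI: proportional drop in chi-square relative to the baseline model
    nfi = (chi2_bl - chi2) / chi2_bl if chi2_bl > 0 else 0.0
    # RFI: same idea, using chi-square per degree of freedom
    if chi2_bl > 0 and dof_bl > 0 and dof > 0:
        rfi = (chi2_bl / dof_bl - chi2 / dof) / (chi2_bl / dof_bl)
    else:
        rfi = 0.0
    # IFI: the denominator subtracts the model's degrees of freedom
    ifi = (chi2_bl - chi2) / (chi2_bl - dof) if (chi2_bl - dof) > 0 else 0.0
    return round(nfi, 3), round(rfi, 3), round(ifi, 3)

# Usage with made-up numbers:
#   incremental_fit_indices(100.0, 50, 1000.0, 100)  ->  (0.9, 0.8, 0.947)
# With these values only IFI reaches the .90 criterion used by `_adequate_check`.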