# =============================================================================
# SEM analyzer (frequency analysis / descriptive statistics / reliability /
# CFA / correlation / SEM)
# Run with: python -m streamlit run sem.py
# =============================================================================
import streamlit as st
import pandas as pd
import numpy as np
import re
from datetime import datetime
from collections import defaultdict

st.set_page_config(page_title="π SEM λΆμκΈ°", page_icon="π", layout="wide")


# -- Password lock ------------------------------------------------------------
def _check_password():
    """Gate the app behind a password prompt.

    Returns ``True`` when ``st.session_state["_pw_ok"]`` is already truthy.
    Otherwise renders the password input (and an error message after a failed
    attempt) and calls ``st.stop()`` so the rest of the script is not executed
    for this run.
    """
    import hmac  # function-scope import: keeps the file-level import list as is

    # NOTE(review): "9400" is a hard-coded fallback password used whenever the
    # "PASSWORD" secret is missing or secrets cannot be read -- consider
    # requiring the secret instead of shipping a default.
    try:
        correct_pw = st.secrets.get("PASSWORD", "9400")
    except Exception:
        correct_pw = "9400"
    # A secret declared as a number (e.g. PASSWORD = 9400 in TOML) would never
    # compare equal to the typed text; normalise it to text once.
    expected = str(correct_pw).encode("utf-8")

    def _submit():
        # on_change callback of the text input: record whether the entry matches.
        entered = st.session_state.get("_pw_input")
        st.session_state["_pw_ok"] = (
            isinstance(entered, str)
            # compare_digest avoids leaking the match length through timing.
            and hmac.compare_digest(entered.encode("utf-8"), expected)
        )

    if st.session_state.get("_pw_ok"):
        return True

    st.markdown("## π SEM λΆμκΈ°")
    st.text_input("λΉλ°λ²νΈλ₯Ό μ λ ₯νμΈμ", type="password",
                  key="_pw_input", on_change=_submit)
    # "_pw_ok" only exists after at least one submission, so the error is not
    # shown on the very first render.
    if "_pw_ok" in st.session_state and not st.session_state["_pw_ok"]:
        st.error("λΉλ°λ²νΈκ° νλ Έμ΅λλ€.")
    st.stop()


_check_password()

st.markdown(""" """, unsafe_allow_html=True)

# -- Module imports -----------------------------------------------------------
try:
    from modules.utils import detect_scale, cronbach_alpha, calc_ave_cr, fmt_p, sig_stars, build_excel
except Exception as e:
    st.error(f"λͺ¨λ λ‘λ μ€λ₯: {e}\n\nμ±κ³Ό κ°μ ν΄λμ modules/ ν΄λκ° μλμ§ νμΈνμΈμ.")
    st.stop()


# -----------------------------------------------------------------------------
# Utility functions
# -----------------------------------------------------------------------------
def detect_outliers_mahalanobis(df, cols, p_threshold=0.001):
    """Flag multivariate outliers using squared Mahalanobis distances.

    Rows with a missing value in any of ``cols`` are dropped first. Each
    remaining row's squared distance from the column means is compared with
    the chi-square quantile ``ppf(1 - p_threshold, df=k)``, where ``k`` is the
    number of columns.

    Args:
        df: Source DataFrame.
        cols: List of column labels to use.
        p_threshold: Upper-tail probability defining the cutoff.

    Returns:
        ``(result, cutoff, outlier_idx)`` where ``result`` is a DataFrame with
        one row per complete case (index label + 1, distance rounded to 3
        places, p-value rounded to 4 places, outlier flag), ``cutoff`` is the
        critical value rounded to 3 places and ``outlier_idx`` lists the
        original index labels of flagged rows. ``(None, None, [])`` is
        returned when fewer than ``k + 2`` complete rows exist or when the
        computation fails.
    """
    from scipy import stats as sc

    data = df[cols].dropna()
    n, k = data.shape
    if n < k + 2:
        return None, None, []
    try:
        mean = data.mean().values
        # np.cov collapses to a 0-d array for a single variable, which neither
        # inv nor pinv accepts; force a (k, k) matrix so k == 1 works too.
        cov_mat = np.atleast_2d(np.cov(data.values.T, ddof=1))
        try:
            cov_inv = np.linalg.inv(cov_mat)
        except np.linalg.LinAlgError:
            # Singular covariance: fall back to the pseudo-inverse.
            cov_inv = np.linalg.pinv(cov_mat)
        diffs = data.values - mean
        mahal = np.array([float(d @ cov_inv @ d) for d in diffs])
        pvals = sc.chi2.sf(mahal, df=k)
        cutoff = sc.chi2.ppf(1 - p_threshold, df=k)
        is_outlier = mahal > cutoff
        result = pd.DataFrame({
            "μλ³Έ νλ²νΈ": data.index + 1,
            "λ§ν λΌλ ΈλΉμ€ 거리": np.round(mahal, 3),
            "pκ°": np.round(pvals, 4),
            "μ΄μμΉ": is_outlier,
        }).reset_index(drop=True)
        outlier_idx = data.index[is_outlier].tolist()
        return result, round(cutoff, 3), outlier_idx
    except Exception:
        # Deliberate best-effort: callers receive the "no result" triple.
        return None, None, []


def compute_smc(df, cols):
    """Compute the squared multiple correlation (SMC) of each item.

    Every column in ``cols`` is regressed (ordinary least squares with an
    intercept) on all the other columns, using rows without missing values;
    the resulting R-squared is reported as that column's SMC.

    Args:
        df: Source DataFrame.
        cols: List of column labels.

    Returns:
        DataFrame with one row per column: the variable name, ``SMC`` rounded
        to 3 places (NaN when it cannot be computed) and a verdict string.
        When fewer than ``len(cols) + 2`` complete rows exist, every row gets
        NaN and the "not enough cases" verdict.
    """
    data = df[cols].dropna()
    if len(data) < len(cols) + 2:
        return pd.DataFrame({"λ³μ": cols,
                             "SMC": [np.nan]*len(cols),
                             "νμ ": ["μ¬λ‘ μ λΆμ‘±"]*len(cols)})
    rows = []
    for col in cols:
        others = [c for c in cols if c != col]
        if not others:
            # A single item has nothing to be regressed on.
            rows.append({"λ³μ": col, "SMC": np.nan, "νμ ": "-"})
            continue
        try:
            X = data[others].values.astype(float)
            y = data[col].values.astype(float)
            Xc = np.column_stack([np.ones(len(X)), X])
            b, _, _, _ = np.linalg.lstsq(Xc, y, rcond=None)
            yh = Xc @ b
            ss_res = float(np.sum((y - yh) ** 2))
            ss_tot = float(np.sum((y - y.mean()) ** 2))
            r2 = (1.0 - ss_res / ss_tot) if ss_tot > 0 else np.nan
            if np.isnan(r2):
                # Zero-variance item: SMC is undefined, so do not report it as
                # adequate -- use the same neutral verdict as the 1-item case.
                verdict = "-"
            elif r2 < 0.2:
                verdict = "β οΈ μ κ±° κ³ λ € (SMC < .20)"
            else:
                verdict = "β μ μ"
            rows.append({"λ³μ": col, "SMC": round(float(r2), 3), "νμ ": verdict})
        except Exception:
            # Best-effort per item: one failing column must not drop the table.
            rows.append({"λ³μ": col, "SMC": np.nan, "νμ ": "κ³μ° μ€λ₯"})
    return pd.DataFrame(rows)


def auto_detect_constructs(df):
    """Group Likert-type columns into candidate constructs by name prefix.

    Columns for which ``detect_scale`` returns ``"likert"`` are grouped by the
    part of their name that precedes the final digit (letters, optionally
    followed by digits, then one trailing digit -- e.g. ``AB1``/``AB2`` share
    the prefix ``AB``). Names that do not match the pattern form their own
    group.

    Args:
        df: Source DataFrame.

    Returns:
        Dict mapping prefix -> list of column labels, keeping only groups
        with at least two columns.
    """
    item_pattern = re.compile(r'^([A-Za-z]+\d*)(\d)$')
    likert = [c for c in df.columns if detect_scale(df[c]) == "likert"]
    groups = defaultdict(list)
    for col in likert:
        # str() guards against non-string column labels (e.g. integers),
        # on which re.match would raise TypeError.
        m = item_pattern.match(str(col))
        groups[m.group(1) if m else col].append(col)
    return {k: v for k, v in groups.items() if len(v) >= 2}


def _adequate_check(fit):
    """Return whether a fit-index dict meets every adequacy threshold.

    NFI, RFI, IFI, TLI and CFI must each be >= .90 (a missing key counts as
    0) and RMSEA must be < .049 (a missing key counts as 1).
    """
    incremental = ("NFI", "RFI", "IFI", "TLI", "CFI")
    return (all(fit.get(name, 0) >= .90 for name in incremental)
            and fit.get("RMSEA", 1) < .049)


def get_composite_scores(df, constructs):
    """Return one column per construct holding the row-wise mean of its items.

    Args:
        df: Source DataFrame.
        constructs: Mapping of construct name -> list of item column labels.
    """
    return pd.DataFrame(
        {lv: df[items].mean(axis=1) for lv, items in constructs.items()})


# 
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # semopy κΈ°λ° CFA (lavaan ν΄λ°±μ©) # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ def calc_mi_approx(model, obs_df, obs_vars): try: n = len(obs_df) S = np.cov(obs_df[obs_vars].values.T, ddof=1) Sig, _ = model.calc_sigma() Sig = Sig + np.eye(len(obs_vars)) * 1e-8 Si = np.linalg.inv(Sig) G = Si @ (S - Sig) @ Si rows = [] p = len(obs_vars) for i in range(p): for j in range(i + 1, p): denom = 2 * Si[i, i] * Si[j, j] + 2 * Si[i, j] ** 2 mi = (n - 1) * G[i, j] ** 2 / max(denom, 1e-8) rows.append({"λ³μ1": obs_vars[i], "λ³μ2": obs_vars[j], "MI": round(float(mi), 3)}) return pd.DataFrame(rows).sort_values("MI", ascending=False).reset_index(drop=True) except Exception: return pd.DataFrame() def run_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200): try: from semopy import Model, calc_stats except ImportError: return None, "semopy ν¨ν€μ§κ° νμν©λλ€." try: item_to_lv = {item: lv for lv, items in constructs.items() for item in items} all_items = [i for items in constructs.values() for i in items] data = df[all_items].dropna() meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()] meas_str = "\n".join(meas_lines) lv_keys = list(constructs.keys()) lv_cov_lines = [f" {lv_keys[i]} ~~ {lv_keys[j]}" for i in range(len(lv_keys)) for j in range(i + 1, len(lv_keys))] meas_str = meas_str + ("\n" + "\n".join(lv_cov_lines) if lv_cov_lines else "") def _fit(model_str): try: m = Model(model_str); m.fit(data); return m except Exception: return None base_str = meas_str for _var_syntax in [ "\n".join(f" {lv} ~~ 1*{lv}" for lv in constructs.keys()), "\n".join(f" {lv} ~~ 1 * {lv}" for lv in constructs.keys()), "\n".join(f" {lv} ~~ 1@{lv}" for lv in constructs.keys()), ]: _candidate = meas_str + "\n" + _var_syntax if _fit(_candidate) is not None: base_str = _candidate break def _extract_fit(m): try: sd = calc_stats(m).iloc[0].to_dict() chi2 = 
float(sd.get("chi2", 0)) dof = float(sd.get("DoF", 1)) chi2_bl = float(sd.get("chi2 Baseline", 0)) dof_bl = float(sd.get("DoF Baseline", 1)) pval = float(sd.get("chi2 p-value", 1)) cfi = float(sd.get("CFI", 0)) tli = float(sd.get("TLI", 0)) nfi = float(sd.get("NFI", 0)) rmsea = float(sd.get("RMSEA", 1)) rfi = ((chi2_bl/dof_bl - chi2/dof) / (chi2_bl/dof_bl) if chi2_bl>0 and dof_bl>0 and dof>0 else 0.0) ifi = ((chi2_bl - chi2) / (chi2_bl - dof) if (chi2_bl - dof) > 0 else 0.0) return {"ΟΒ²": round(chi2,3), "df": int(dof), "ΟΒ²/df": round(chi2/dof,3) if dof>0 else "-", "p": round(pval,3), "NFI": round(nfi,3), "RFI": round(rfi,3), "IFI": round(ifi,3), "TLI": round(tli,3), "CFI": round(cfi,3), "RMSEA": round(rmsea,3)} except Exception: return {} m0 = _fit(base_str) if m0 is None: return None, "μ΄κΈ° CFA μΆμ μ€ν¨" fit0 = _extract_fit(m0) mi_df0 = calc_mi_approx(m0, data, all_items) extra_cov = [] added_pairs = set() mod_log = [] m_cur, fit_cur, mi_cur = m0, fit0.copy(), mi_df0.copy() for step in range(max_mods): if _adequate_check(fit_cur): break high_mi = mi_cur[mi_cur["MI"] > mi_threshold] if high_mi.empty: break avail = high_mi[ high_mi.apply( lambda r: tuple(sorted([r["λ³μ1"], r["λ³μ2"]])) not in added_pairs, axis=1)] if avail.empty: break row = avail.iloc[0] v1, v2 = row["λ³μ1"], row["λ³μ2"] pair = tuple(sorted([v1, v2])) new_cov = extra_cov + [(v1, v2)] new_str = (base_str + "\n" + "\n".join(f" {a} ~~ {b}" for a, b in new_cov)) m_new = _fit(new_str) if m_new is None: added_pairs.add(pair); continue fit_new = _extract_fit(m_new) lv1 = item_to_lv.get(v1, v1) lv2 = item_to_lv.get(v2, v2) mod_log.append({ "λ¨κ³": step + 1, "μμ κ²½λ‘": f"{v1} ~~ {v2}", "μμ μμΈ": lv1 if lv1 == lv2 else f"{lv1}β{lv2}", "MI": round(row["MI"], 3), "CFI μ βν": f"{fit_cur.get('CFI','-')} β {fit_new.get('CFI','-')}", "RMSEA μ βν": f"{fit_cur.get('RMSEA','-')} β {fit_new.get('RMSEA','-')}", }) extra_cov = new_cov m_cur, fit_cur = m_new, fit_new added_pairs.add(pair) mi_cur = 
calc_mi_approx(m_cur, data, all_items) ins = m_cur.inspect(std_est=True) std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1]) final_mi = calc_mi_approx(m_cur, data, all_items) return { "init_model": m0, "mod_model": m_cur, "init_fit": fit0, "mod_fit": fit_cur, "init_mi": mi_df0, "final_mi": final_mi, "mod_log": pd.DataFrame(mod_log), "extra_cov": extra_cov, "was_modified": len(extra_cov) > 0, "ins": ins, "std_col": std_col, "data": data, "all_items": all_items, "base_str": base_str, }, None except Exception as e: return None, f"CFA MI μ€λ₯: {e}" def run_cfa_sem(df, constructs, hypotheses=None): try: from semopy import Model, calc_stats except ImportError: return None, None, None, None try: mm = "\n".join(f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()) if hypotheses: deps = defaultdict(list) for s, t in hypotheses: deps[t].append(s) mm += "\n" + "\n".join(f" {t} ~ {' + '.join(ss)}" for t, ss in deps.items()) all_items = [i for items in constructs.values() for i in items] data = df[all_items].dropna() model = Model(mm); model.fit(data) ins = model.inspect(std_est=True) std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1]) stats = calc_stats(model) try: stats_dict = stats.iloc[0].to_dict() if hasattr(stats, "iloc") else dict(stats) except Exception: stats_dict = {} try: chi2 = float(stats_dict.get("chi2", 0)) dof = float(stats_dict.get("DoF", 1)) chi2_bl = float(stats_dict.get("chi2 Baseline", 0)) dof_bl = float(stats_dict.get("DoF Baseline", 1)) pval = float(stats_dict.get("chi2 p-value", 1)) cfi = float(stats_dict.get("CFI", 0)) tli = float(stats_dict.get("TLI", 0)) nfi = float(stats_dict.get("NFI", 0)) rmsea = float(stats_dict.get("RMSEA", 1)) rfi = ((chi2_bl/dof_bl - chi2/dof)/(chi2_bl/dof_bl) if chi2_bl>0 and dof_bl>0 and dof>0 else 0.0) ifi = ((chi2_bl - chi2)/(chi2_bl - dof) if (chi2_bl - dof) > 0 else 0.0) fit = {"ΟΒ²": round(chi2,3), "df": int(dof), "ΟΒ²/df": round(chi2/dof,3) if dof>0 
else "-", "p": round(pval,3), "NFI": round(nfi,3), "RFI": round(rfi,3), "IFI": round(ifi,3), "TLI": round(tli,3), "CFI": round(cfi,3), "RMSEA": round(rmsea,3)} except Exception: fit = {} return ins, std_col, stats_dict, fit except Exception: return None, None, None, None def _extract_load_df(ins, std_col, constructs): lv_names = set(constructs.keys()) all_items = set(i for v in constructs.values() for i in v) if (ins["op"] == "=~").any(): load_df = ins[ins["op"] == "=~"].copy() lv_col, ind_col = "lval", "rval" else: load_df = ins[ins["lval"].isin(all_items) & ins["rval"].isin(lv_names)].copy() lv_col, ind_col = "rval", "lval" cols_needed = [c for c in [lv_col, ind_col, std_col, "Std. Err", "z-value", "p-value"] if c in load_df.columns] load_df = load_df[cols_needed].copy() rename_map = {lv_col: "μ μ¬λ³μ", ind_col: "μΈ‘μ λ³μ", std_col: "νμ€νκ³μ", "Std. Err": "SE", "z-value": "tκ°", "p-value": "pκ°_raw"} load_df = load_df.rename(columns=rename_map) if "pκ°_raw" in load_df.columns: load_df["pκ°"] = load_df["pκ°_raw"].apply( lambda p: fmt_p(p) if str(p).strip() not in ("-", "") else "-") load_df = load_df.drop(columns=["pκ°_raw"]) load_df["νμ€νκ³μ"] = pd.to_numeric(load_df["νμ€νκ³μ"], errors="coerce").round(3) for col in ["SE", "tκ°"]: if col in load_df.columns: load_df[col] = load_df[col].apply( lambda v: round(float(v), 3) if pd.notna(v) and str(v).strip() not in ("None", "", "nan", "-") else "-") return load_df.reset_index(drop=True), lv_col # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ # lavaan (R) κΈ°λ° CFA / SEM # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ def _get_rscript_path(): import subprocess, os, glob try: r = subprocess.run(['Rscript', '--version'], capture_output=True, timeout=10) if r.returncode == 0: return 'Rscript' except Exception: pass candidates = glob.glob(r'C:\Program Files\R\R-*\bin\Rscript.exe') candidates += glob.glob(r'C:\Program Files (x86)\R\R-*\bin\Rscript.exe') if 
candidates: candidates.sort(reverse=True) return candidates[0] return None def _rscript_available(): return _get_rscript_path() is not None def _parse_fit_csv(path): df = pd.read_csv(path) if df.empty: return {} row = df.iloc[0] chi2 = float(row.get('chisq', 0)) dof = float(row.get('df', 1)) return { 'ΟΒ²': round(chi2, 3), 'df': int(dof), 'ΟΒ²/df': round(chi2 / dof, 3) if dof > 0 else '-', 'p': round(float(row.get('pvalue', 1)), 6), 'NFI': round(float(row.get('nfi', 0)), 3), 'RFI': round(float(row.get('rfi', 0)), 3), 'IFI': round(float(row.get('ifi', 0)), 3), 'TLI': round(float(row.get('tli', 0)), 3), 'CFI': round(float(row.get('cfi', 0)), 3), 'RMSEA': round(float(row.get('rmsea', 1)), 3), } def _fmt_mi_df(df): if df is None or df.empty: return pd.DataFrame(columns=['λ³μ1', 'λ³μ2', 'MI']) df = df.copy() df.columns = ['λ³μ1', 'λ³μ2', 'MI'] df['MI'] = pd.to_numeric(df['MI'], errors='coerce').round(3) return df.sort_values('MI', ascending=False).reset_index(drop=True) def run_lavaan_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200, manual_extra_cov=None): import subprocess, tempfile, os if not _rscript_available(): return None, "Rscriptλ₯Ό μ°Ύμ μ μμ. R μ€μΉ ν PATH λ±λ‘ νμ." 
all_items = [i for items in constructs.values() for i in items] item_to_lv = {item: lv for lv, items in constructs.items() for item in items} lv_names = list(constructs.keys()) data = df[all_items].dropna() meas_lines = [f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items()] manual_pairs = list(manual_extra_cov) if manual_extra_cov else [] extra_lines = [f"{v1} ~~ {v2}" for v1, v2 in manual_pairs] model_base = "\n".join(meas_lines + extra_lines) with tempfile.TemporaryDirectory() as tmpdir: def p(name): return os.path.join(tmpdir, name).replace('\\', '/') data.to_csv(p('data.csv'), index=False) with open(p('model.txt'), 'w', encoding='utf-8') as f: f.write(model_base) with open(p('lvnames.txt'), 'w', encoding='utf-8') as f: f.write('\n'.join(lv_names)) r_script = f''' suppressPackageStartupMessages(library(lavaan)) options(warn=-1) out <- "{p("")}" data <- read.csv(file.path(out,"data.csv")) mdl_base <- paste(readLines(file.path(out,"model.txt"), encoding="UTF-8"), collapse="\\n") lv_names <- readLines(file.path(out,"lvnames.txt"), encoding="UTF-8") mi_thr <- {mi_threshold} max_mods <- {max_mods} get_fit <- function(fit) {{ fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea")) data.frame(as.list(fm)) }} is_ok <- function(fit) {{ fm <- fitMeasures(fit, c("cfi","rmsea")) unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049 }} get_mi_cov <- function(fit) {{ m <- tryCatch(modindices(fit, sort.=TRUE, maximum.number=500), error=function(e) NULL) if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric())) m <- m[m$op=="~~", c("lhs","rhs","mi")] m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ] }} fit0 <- tryCatch(cfa(mdl_base, data=data, estimator="ML"), error=function(e) NULL) if (is.null(fit0)) {{ writeLines("ERROR: CFA failed", file.path(out,"status.txt")); quit(status=1) }} write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE) mi0 <- get_mi_cov(fit0) write.csv(mi0, 
file.path(out,"mi_init.csv"), row.names=FALSE) added <- character(0); extra <- character(0); log_rows <- list() fit_cur <- fit0; mdl_cur <- mdl_base for (step in seq_len(max_mods)) {{ if (is_ok(fit_cur)) break mi_c <- get_mi_cov(fit_cur) mi_c <- mi_c[mi_c$mi > mi_thr, ] if (nrow(mi_c)==0) break ok <- sapply(seq_len(nrow(mi_c)), function(i) {{ pk <- paste(sort(c(mi_c$lhs[i], mi_c$rhs[i])), collapse="~~") !(pk %in% added) }}) mi_c <- mi_c[ok, ] if (nrow(mi_c)==0) break v1 <- mi_c$lhs[1]; v2 <- mi_c$rhs[1]; mv <- mi_c$mi[1] pk <- paste(sort(c(v1,v2)), collapse="~~") mdl_new <- paste0(mdl_cur,"\\n ",v1," ~~ ",v2) fit_new <- tryCatch(cfa(mdl_new, data=data, estimator="ML"), error=function(e) NULL) added <- c(added, pk) if (is.null(fit_new)) next fm_b <- fitMeasures(fit_cur, c("cfi","rmsea","rfi","ifi")) fm_a <- fitMeasures(fit_new, c("cfi","rmsea","rfi","ifi")) log_rows[[length(log_rows)+1]] <- data.frame( step=step, v1=v1, v2=v2, mi=round(mv,3), cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3), rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3), rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3), ifi_b=round(unname(fm_b["ifi"]),3), ifi_a=round(unname(fm_a["ifi"]),3), stringsAsFactors=FALSE) extra <- c(extra, paste(v1, v2, sep=",")) fit_cur <- fit_new; mdl_cur <- mdl_new }} write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE) write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE) write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE) if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt")) if (length(log_rows)>0) {{ write.csv(do.call(rbind, log_rows), file.path(out,"mod_log.csv"), row.names=FALSE) }} writeLines("SUCCESS", file.path(out,"status.txt")) ''' with open(p('cfa.R'), 'w', encoding='utf-8') as f: f.write(r_script) rscript = _get_rscript_path() try: res = subprocess.run([rscript, 
'--vanilla', p('cfa.R')], capture_output=True, text=True, timeout=300) except subprocess.TimeoutExpired: return None, "R μ€ν μκ° μ΄κ³Ό (5λΆ)" except Exception as ex: return None, f"Rscript μ€ν μ€λ₯: {ex}" if not os.path.exists(os.path.join(tmpdir, 'status.txt')): err = res.stderr[-2000:] if res.stderr else "μ μ μλ μ€λ₯" return None, f"R μ€λ₯:\n{err}" try: fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv')) fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv')) std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv')) mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv'))) mf_path = os.path.join(tmpdir, 'mi_final.csv') mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame() except Exception as ex: return None, f"κ²°κ³Ό νμ± μ€λ₯: {ex}" extra_cov = list(manual_pairs) ec_path = os.path.join(tmpdir, 'extra_cov.txt') if os.path.exists(ec_path): with open(ec_path, encoding='utf-8') as f: for line in f: parts = line.strip().split(',') if len(parts) == 2: t = tuple(parts) if t not in extra_cov: extra_cov.append(t) mod_log_records = [] ml_path = os.path.join(tmpdir, 'mod_log.csv') if os.path.exists(ml_path): log_df = pd.read_csv(ml_path) for _, row in log_df.iterrows(): lv1 = item_to_lv.get(str(row['v1']), str(row['v1'])) lv2 = item_to_lv.get(str(row['v2']), str(row['v2'])) mod_log_records.append({ 'λ¨κ³': int(row['step']), 'μμ κ²½λ‘': f"{row['v1']} ~~ {row['v2']}", 'μμ μμΈ': lv1 if lv1 == lv2 else f'{lv1}β{lv2}', 'MI': row['mi'], 'CFI μ βν': f"{row['cfi_b']} β {row['cfi_a']}", 'RMSEA μ βν': f"{row['rmsea_b']} β {row['rmsea_a']}", 'RFI μ βν': f"{row.get('rfi_b', '-')} β {row.get('rfi_a', '-')}", 'IFI μ βν': f"{row.get('ifi_b', '-')} β {row.get('ifi_a', '-')}", }) return { 'init_fit': fit0, 'mod_fit': fit_mod, 'init_mi': mi_init, 'final_mi': mi_final, 'mod_log': pd.DataFrame(mod_log_records), 'extra_cov': extra_cov, 'was_modified': len(extra_cov) > 0, 'std_sol': std_sol, 'data': data, 'all_items': all_items, 
'lv_names': lv_names, }, None def _extract_load_df_lavaan(std_sol, constructs): try: loads = std_sol[std_sol['op'] == '=~'].copy() cols = [c for c in ['lhs', 'rhs', 'est.std', 'se', 'z', 'pvalue'] if c in loads.columns] loads = loads[cols].copy() rename = {'lhs': 'μ μ¬λ³μ', 'rhs': 'μΈ‘μ λ³μ', 'est.std': 'νμ€νκ³μ', 'se': 'SE', 'z': 'tκ°', 'pvalue': 'pκ°_raw'} loads = loads.rename(columns=rename) loads['νμ€νκ³μ'] = pd.to_numeric(loads['νμ€νκ³μ'], errors='coerce').round(3) loads['SE'] = pd.to_numeric(loads['SE'], errors='coerce').round(3) loads['tκ°'] = pd.to_numeric(loads['tκ°'], errors='coerce').round(3) if 'pκ°_raw' in loads.columns: loads['pκ°'] = loads['pκ°_raw'].apply( lambda p: fmt_p(float(p)) if pd.notna(p) and str(p).strip() not in ('NA', '', 'nan') else '-') loads = loads.drop(columns=['pκ°_raw']) return loads.reset_index(drop=True) except Exception: return pd.DataFrame() def build_cfa_tables(df, constructs, mi_threshold=3.84, max_mods=200, manual_extra_cov=None): result, err = run_lavaan_cfa_with_mi(df, constructs, mi_threshold, max_mods, manual_extra_cov=manual_extra_cov) use_lavaan = (result is not None) if not use_lavaan: st.warning(f"β οΈ lavaan μ€ν¨ β semopy μ¬μ©\n\nμμΈ: {err}") result, err = run_cfa_with_mi(df, constructs, mi_threshold, max_mods) if result is None: st.error(err or "CFA μ€ν μ€ν¨") return None, None, None, None, None, None, None, False, [] try: if use_lavaan: std_sol = result["std_sol"] load_df = _extract_load_df_lavaan(std_sol, constructs) rel_rows = [] for lv, items in constructs.items(): lv_loads = std_sol[(std_sol['op'] == '=~') & (std_sol['lhs'] == lv)] lam = pd.to_numeric(lv_loads['est.std'], errors='coerce').dropna().values ave, cr = calc_ave_cr(lam) alpha = cronbach_alpha(df[items]) rel_rows.append({"μ μ¬λ³μ": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha}) else: ins = result["ins"] std_col = result["std_col"] load_df, lv_col = _extract_load_df(ins, std_col, constructs) rel_rows = [] for lv, items in constructs.items(): raw = 
ins[ins[lv_col] == lv][std_col].values lam = np.array([v for v in raw if str(v).strip() not in ("-","","nan","None")], dtype=float) ave, cr = calc_ave_cr(lam) alpha = cronbach_alpha(df[items]) rel_rows.append({"μ μ¬λ³μ": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha}) return (load_df, pd.DataFrame(rel_rows), result["init_fit"], result["mod_fit"], result["init_mi"], result.get("final_mi", pd.DataFrame()), result["mod_log"], result["was_modified"], result["extra_cov"]) except Exception as e: st.error(f"CFA κ²°κ³Ό μ²λ¦¬ μ€λ₯: {e}") return None, None, None, None, None, None, None, False, [] def build_lavaan_sem_table(df, constructs, hypotheses, cfa_extra_cov=None, mi_threshold=3.84, max_mods=200): import subprocess, tempfile, os if not _rscript_available(): return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods) all_items = [i for items in constructs.values() for i in items] lv_names = list(constructs.keys()) lv_set = set(lv_names) item_to_lv = {item: lv for lv, items in constructs.items() for item in items} data = df[all_items].dropna() cfa_cov_pairs = (cfa_extra_cov if isinstance(cfa_extra_cov, list) and (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple)) else []) deps = defaultdict(list) for s, t in hypotheses: deps[t].append(s) with tempfile.TemporaryDirectory() as tmpdir: def p(name): return os.path.join(tmpdir, name).replace('\\', '/') data.to_csv(p('data.csv'), index=False) with open(p('meas.txt'), 'w', encoding='utf-8') as f: f.write('\n'.join(f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items())) with open(p('struct.txt'), 'w', encoding='utf-8') as f: f.write('\n'.join(f"{t} ~ {' + '.join(ss)}" for t, ss in deps.items())) with open(p('lvnames.txt'), 'w', encoding='utf-8') as f: f.write('\n'.join(lv_names)) with open(p('cfa_cov.txt'), 'w', encoding='utf-8') as f: for v1, v2 in cfa_cov_pairs: f.write(f'{v1},{v2}\n') r_script = f''' suppressPackageStartupMessages(library(lavaan)) options(warn=-1) out <- 
"{p("")}" data <- read.csv(file.path(out,"data.csv")) meas_lines <- readLines(file.path(out,"meas.txt"), encoding="UTF-8") struct_lines <- readLines(file.path(out,"struct.txt"), encoding="UTF-8") lv_names <- readLines(file.path(out,"lvnames.txt"),encoding="UTF-8") cfa_cov_raw <- tryCatch(readLines(file.path(out,"cfa_cov.txt"),encoding="UTF-8"), error=function(e) character(0)) mi_thr <- {mi_threshold} max_mods <- {max_mods} build_model <- function(extra_str=character(0)) {{ cov_lines <- character(0) all_covs <- c(cfa_cov_raw, extra_str) for (cv in all_covs) {{ pts <- strsplit(trimws(cv),",")[[1]] if (length(pts)==2) {{ ln <- paste0(" ",pts[1]," ~~ ",pts[2]) if (!(ln %in% cov_lines)) cov_lines <- c(cov_lines, ln) }} }} paste(c(paste0(" ",meas_lines), cov_lines, paste0(" ",struct_lines)), collapse="\\n") }} get_fit <- function(fit) {{ fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea")) data.frame(as.list(fm)) }} is_ok <- function(fit) {{ fm <- fitMeasures(fit, c("cfi","rmsea")) unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049 }} get_mi_cov <- function(fit) {{ m <- tryCatch(modindices(fit,sort.=TRUE,maximum.number=500),error=function(e) NULL) if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric())) m <- m[m$op=="~~", c("lhs","rhs","mi")] m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ] }} fit0 <- tryCatch(sem(build_model(), data=data, estimator="ML"), error=function(e) NULL) if (is.null(fit0)) {{ writeLines("ERROR: SEM initial fit failed", file.path(out,"status.txt")); quit(status=1) }} write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE) mi0 <- get_mi_cov(fit0) write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE) added <- character(0); extra <- character(0); log_rows <- list() fit_cur <- fit0 for (step in seq_len(max_mods)) {{ if (is_ok(fit_cur)) break mi_c <- get_mi_cov(fit_cur) mi_c <- mi_c[mi_c$mi > mi_thr, ] if (nrow(mi_c)==0) break ok <- sapply(seq_len(nrow(mi_c)), 
function(i) {{ pk <- paste(sort(c(mi_c$lhs[i],mi_c$rhs[i])),collapse="~~") !(pk %in% added) }}) mi_c <- mi_c[ok,] if (nrow(mi_c)==0) break v1<-mi_c$lhs[1]; v2<-mi_c$rhs[1]; mv<-mi_c$mi[1] pk <- paste(sort(c(v1,v2)),collapse="~~") extra_new <- c(extra, paste(v1,v2,sep=",")) fit_new <- tryCatch(sem(build_model(extra_new),data=data,estimator="ML"),error=function(e) NULL) added <- c(added, pk) if (is.null(fit_new)) next fm_b <- fitMeasures(fit_cur,c("cfi","rmsea","rfi","ifi")) fm_a <- fitMeasures(fit_new,c("cfi","rmsea","rfi","ifi")) log_rows[[length(log_rows)+1]] <- data.frame( step=step, v1=v1, v2=v2, mi=round(mv,3), cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3), rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3), rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3), stringsAsFactors=FALSE) extra <- extra_new fit_cur <- fit_new }} write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE) write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE) write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE) if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt")) if (length(log_rows)>0) write.csv(do.call(rbind,log_rows), file.path(out,"mod_log.csv"), row.names=FALSE) writeLines("SUCCESS", file.path(out,"status.txt")) ''' with open(p('sem.R'), 'w', encoding='utf-8') as f: f.write(r_script) rscript = _get_rscript_path() try: res = subprocess.run([rscript, '--vanilla', p('sem.R')], capture_output=True, text=True, timeout=300) except Exception: return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods) if not os.path.exists(os.path.join(tmpdir, 'status.txt')): return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods) try: fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv')) fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv')) 
std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv')) mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv'))) mf_path = os.path.join(tmpdir, 'mi_final.csv') mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame() except Exception: return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov, mi_threshold, max_mods) extra_cov = list(cfa_cov_pairs) ec_path = os.path.join(tmpdir, 'extra_cov.txt') if os.path.exists(ec_path): with open(ec_path, encoding='utf-8') as f: for line in f: parts = line.strip().split(',') if len(parts) == 2: t = tuple(parts) if t not in extra_cov: extra_cov.append(t) mod_log_records = [] ml_path = os.path.join(tmpdir, 'mod_log.csv') if os.path.exists(ml_path): log_df = pd.read_csv(ml_path) for _, row in log_df.iterrows(): lv1 = item_to_lv.get(str(row['v1']), str(row['v1'])) lv2 = item_to_lv.get(str(row['v2']), str(row['v2'])) mod_log_records.append({ 'λ¨κ³': int(row['step']), 'μμ κ²½λ‘': f"{row['v1']} ~~ {row['v2']}", 'μμ μμΈ': lv1 if lv1 == lv2 else f'{lv1}β{lv2}', 'MI': row['mi'], 'CFI μ βν': f"{row['cfi_b']} β {row['cfi_a']}", 'RMSEA μ βν': f"{row['rmsea_b']} β {row['rmsea_a']}", }) def _sp(v): try: return float(v) if str(v).strip() not in ('NA', '', 'nan') else np.nan except Exception: return np.nan struct = std_sol[(std_sol['op'] == '~') & std_sol['lhs'].isin(lv_set) & std_sol['rhs'].isin(lv_set)].copy() hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in enumerate(hypotheses)} cols_need = [c for c in ['lhs','rhs','est.std','se','z','pvalue'] if c in struct.columns] struct = struct[cols_need].copy() struct.columns = ['μ’ μλ³μ','λ 립λ³μ','νμ€νΞ²','SE','tκ°','pκ°_raw'][:len(cols_need)] struct['κ°μ€'] = struct.apply(lambda r: hyp_map.get((r['λ 립λ³μ'], r['μ’ μλ³μ']), '-'), axis=1) struct['κ²½λ‘'] = struct['λ 립λ³μ'] + ' β ' + struct['μ’ μλ³μ'] struct['μ μμ±'] = struct['pκ°_raw'].apply(lambda v: sig_stars(_sp(v))) struct['μ±νμ¬λΆ'] = struct['pκ°_raw'].apply( lambda v: ('μ±ν' if _sp(v) < 0.05 
def _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov=None,
                      mi_threshold=3.84, max_mods=200):
    """Fit the structural model with semopy and iteratively add covariances.

    semopy-based SEM (the original docstring marks it as the lavaan fallback).
    A model description is assembled from the measurement lines, pairwise
    latent covariances, ``lv ~~ 1*lv`` lines, optional extra covariances and
    the hypothesised regressions. If that description cannot be fitted, the
    same model without the ``lv ~~ 1*lv`` lines is tried. Afterwards the first
    not-yet-tried covariance whose modification index exceeds
    ``mi_threshold`` is added repeatedly until ``_adequate_check`` accepts the
    fit, no candidate is left, or ``max_mods`` iterations have run.

    Args:
        df: DataFrame holding the indicator columns.
        constructs: Mapping of latent-variable name -> list of item columns.
        hypotheses: Iterable of ``(source, target)`` latent-variable pairs.
        cfa_extra_cov: Optional list of ``(a, b)`` tuples included as
            covariances from the start; any other value is ignored.
        mi_threshold: Minimum modification index for adding a covariance.
        max_mods: Upper bound on modification iterations.

    Returns:
        The 8-tuple ``(path_table, initial_fit, final_fit, modification_log,
        extra_covariances, was_modified, initial_mi, final_mi)``. On failure
        the first four entries are ``None``, followed by ``[]``, ``False``
        and two empty DataFrames.
    """
    def _failure():
        # Fresh objects on every call so callers never share a DataFrame.
        return None, None, None, None, [], False, pd.DataFrame(), pd.DataFrame()

    try:
        from semopy import Model, calc_stats
    except ImportError:
        st.error("semopy ν¨ν€μ§κ° νμν©λλ€.")
        return _failure()

    try:
        from itertools import combinations

        # Iterated twice below; materialise so a one-shot iterable also works.
        hypotheses = list(hypotheses)

        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [item for items in constructs.values() for item in items]
        lv_names = set(constructs)
        data = df[all_items].dropna()

        is_pair_list = isinstance(cfa_extra_cov, list) and (
            not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
        cfa_cov_pairs = cfa_extra_cov if is_pair_list else []

        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        lv_cov_lines = [f" {a} ~~ {b}" for a, b in combinations(constructs, 2)]
        meas_str = "\n".join(meas_lines + lv_cov_lines)
        var_str = "\n".join(f" {lv} ~~ 1*{lv}" for lv in constructs)

        deps = defaultdict(list)
        for src, tgt in hypotheses:
            deps[tgt].append(src)
        struct_str = "\n".join(f" {tgt} ~ {' + '.join(srcs)}" for tgt, srcs in deps.items())
        cov_part = ("\n" + "\n".join(f" {a} ~~ {b}" for a, b in cfa_cov_pairs)
                    if cfa_cov_pairs else "")

        def _fit_model(model_str):
            # Any failure while building/fitting is reported as "no model".
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        def _extract_fit(m):
            # Collect fit indices; RFI and IFI are derived from the baseline chi-square.
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                rfi = ((chi2_bl/dof_bl - chi2/dof)/(chi2_bl/dof_bl)
                       if chi2_bl > 0 and dof_bl > 0 and dof > 0 else 0.0)
                ifi = ((chi2_bl - chi2)/(chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"ΟΒ²": round(chi2, 3), "df": int(dof),
                        "ΟΒ²/df": round(chi2/dof, 3) if dof > 0 else "-",
                        "p": round(pval, 3), "NFI": round(nfi, 3),
                        "RFI": round(rfi, 3), "IFI": round(ifi, 3),
                        "TLI": round(tli, 3), "CFI": round(cfi, 3),
                        "RMSEA": round(rmsea, 3)}
            except Exception:
                return {}

        def _sp(p):
            # Parse a p-value cell; placeholders and unparsable values become NaN.
            try:
                return float(p) if str(p).strip() not in ("-", "", "nan") else np.nan
            except (TypeError, ValueError):
                return np.nan

        def _extract_paths(m):
            ins = m.inspect(std_est=True)
            # First column whose name contains "std"; falls back to the last column.
            std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
            # NOTE(review): this keeps every row whose both sides are latent
            # variables, which appears to include `~~` rows as well as `~`
            # regressions - confirm whether the path table should be limited
            # to op == "~".
            struct = ins[ins["lval"].isin(lv_names) & ins["rval"].isin(lv_names)].copy()
            cols = [c for c in ["lval", "rval", std_col, "Std. Err", "z-value", "p-value"]
                    if c in struct.columns]
            struct = struct[cols].copy()
            rm = {"lval": "μ’ μλ³μ", "rval": "λ 립λ³μ", std_col: "νμ€νΞ²",
                  "Std. Err": "SE", "z-value": "tκ°", "p-value": "pκ°_raw"}
            struct = struct.rename(columns=rm)
            hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in enumerate(hypotheses)}
            struct["κ°μ€"] = struct.apply(
                lambda r: hyp_map.get((r["λ 립λ³μ"], r["μ’ μλ³μ"]), "-"), axis=1)
            struct["κ²½λ‘"] = struct["λ 립λ³μ"] + " β " + struct["μ’ μλ³μ"]
            struct["μ μμ±"] = struct["pκ°_raw"].apply(lambda p: sig_stars(_sp(p)))
            struct["μ±νμ¬λΆ"] = struct["pκ°_raw"].apply(
                lambda p: ("μ±ν" if _sp(p) < 0.05 else "κΈ°κ°") if not np.isnan(_sp(p)) else "-")
            struct["pκ°"] = struct["pκ°_raw"].apply(
                lambda p: fmt_p(_sp(p)) if not np.isnan(_sp(p)) else "-")
            for col in [c for c in ["νμ€νΞ²", "SE", "tκ°"] if c in struct.columns]:
                struct[col] = pd.to_numeric(struct[col], errors="coerce").round(3)
            out = ["κ°μ€", "κ²½λ‘", "νμ€νΞ²", "SE", "tκ°", "pκ°", "μ μμ±", "μ±νμ¬λΆ"]
            return struct[[c for c in out if c in struct.columns]].reset_index(drop=True)

        base_str = meas_str + "\n" + var_str + cov_part + "\n" + struct_str
        base_no_var = meas_str + cov_part + "\n" + struct_str

        # Remember which variant converged so every refit in the loop uses the
        # same one (the old substring test on base_str was always true).
        m0 = _fit_model(base_str)
        with_var = m0 is not None
        if m0 is None:
            m0 = _fit_model(base_no_var)
        if m0 is None:
            return _failure()

        fit0 = _extract_fit(m0)
        extra_cov = list(cfa_cov_pairs)
        added_pairs = {tuple(sorted(p)) for p in cfa_cov_pairs}
        mod_log = []
        m_cur, fit_cur = m0, fit0.copy()

        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            mi_cur = calc_mi_approx(m_cur, data, all_items)
            # An empty/column-less MI table means "nothing to try"; without
            # this guard it aborted the whole run via the outer except.
            if mi_cur is None or mi_cur.empty or "MI" not in mi_cur.columns:
                break
            is_new = pd.Series(
                [tuple(sorted(p)) not in added_pairs
                 for p in zip(mi_cur["λ³μ1"], mi_cur["λ³μ2"])],
                index=mi_cur.index)
            avail = mi_cur[(mi_cur["MI"] > mi_threshold) & is_new]
            if avail.empty:
                break
            # Takes the first candidate; presumably calc_mi_approx returns rows
            # ordered by MI - verify against that helper.
            row = avail.iloc[0]
            v1, v2 = row["λ³μ1"], row["λ³μ2"]
            pair = tuple(sorted([v1, v2]))
            new_cov = extra_cov + [(v1, v2)]
            new_cov_str = "\n".join(f" {a} ~~ {b}" for a, b in new_cov)
            parts = [meas_str] + ([var_str] if with_var else []) + [new_cov_str, struct_str]
            m_new = _fit_model("\n".join(parts))
            if m_new is None:
                # Never retry a covariance that made the model unfittable.
                added_pairs.add(pair)
                continue
            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "λ¨κ³": step+1,
                "μμ κ²½λ‘": f"{v1} ~~ {v2}",
                "μμ μμΈ": lv1 if lv1 == lv2 else f"{lv1}β{lv2}",
                "MI": round(row["MI"], 3),
                "CFI μ βν": f"{fit_cur.get('CFI','-')}β{fit_new.get('CFI','-')}",
                "RMSEA μ βν": f"{fit_cur.get('RMSEA','-')}β{fit_new.get('RMSEA','-')}",
            })
            extra_cov = new_cov
            m_cur = m_new
            fit_cur = fit_new
            added_pairs.add(pair)

        path_final = _extract_paths(m_cur)
        was_mod = len(mod_log) > 0
        init_mi_sem = calc_mi_approx(m0, data, all_items)
        final_mi_sem = calc_mi_approx(m_cur, data, all_items)
        return (path_final, fit0, fit_cur, pd.DataFrame(mod_log),
                extra_cov, was_mod, init_mi_sem, final_mi_sem)
    except Exception as e:
        st.error(f"SEM μ€λ₯: {e}")
        return _failure()
def run_correlation(df, constructs):
    """Build the lower-triangular construct correlation table.

    Each construct is scored as the row-wise mean of its items and the scores
    are correlated (rounded to 3 decimals). The diagonal is replaced by the
    square root of the construct's AVE, taken from the standardized loadings
    reported by ``run_cfa_sem``; if that step fails for any reason the
    diagonal shows ``"-"``. Cells above the diagonal are blanked.

    Args:
        df: DataFrame holding the item columns.
        constructs: Mapping of construct name -> list of item columns.

    Returns:
        DataFrame whose first column holds the construct names followed by
        one column per construct; an empty DataFrame when the analysis fails
        (the error is reported through ``st.error``).
    """
    try:
        composites = {lv: df[items].mean(axis=1) for lv, items in constructs.items()}
        corr = pd.DataFrame(composites).corr().round(3)

        sqrt_ave = {}
        try:
            ins, std_col, _, _ = run_cfa_sem(df, constructs)
            if ins is not None:
                # Without any "=~" rows the latent variable is looked up in
                # rval (items in lval); otherwise it is in lval.
                use_rval = not (ins["op"] == "=~").any()
                lv_col, item_col = ("rval", "lval") if use_rval else ("lval", "rval")
                for lv, items in constructs.items():
                    rows = ins[ins[lv_col] == lv]
                    # Keep only rows that pair the construct with one of its
                    # own items, so other rows naming the construct do not
                    # leak into the AVE. Falls back to the unfiltered rows if
                    # nothing matches.
                    if item_col in rows.columns:
                        loadings_only = rows[rows[item_col].isin(items)]
                        if not loadings_only.empty:
                            rows = loadings_only
                    lam = np.array(
                        [v for v in rows[std_col].values
                         if str(v).strip() not in ("-", "", "nan", "None")],
                        dtype=float)
                    ave, _ = calc_ave_cr(lam)
                    sqrt_ave[lv] = round(float(np.sqrt(ave)), 3)
        except Exception:
            # Best effort: constructs without a computed value show "-".
            pass

        tbl = corr.copy().astype(object)
        for lv in constructs:
            if lv in tbl.columns:
                tbl.loc[lv, lv] = sqrt_ave.get(lv, "-")
        n_cols = len(tbl.columns)
        for i in range(n_cols):
            for j in range(i + 1, n_cols):
                tbl.iloc[i, j] = ""
        return tbl.reset_index().rename(columns={"index": "λ³μ"})
    except Exception as e:
        st.error(f"μκ΄κ΄κ³ λΆμ μ€λ₯: {e}")
        return pd.DataFrame()
def build_single_sheet_excel(sheets: dict) -> bytes:
    """Write several result tables one below another on a single worksheet.

    Args:
        sheets: Mapping whose values are sequences
            ``(title, dataframe[, note[, adopt_col]])``. The keys are not
            used. ``note`` is written under the table when non-empty;
            ``adopt_col`` names a column whose cells are coloured by whether
            their text equals the "adopted" label.

    Returns:
        The ``.xlsx`` file content. Sections whose DataFrame is ``None`` or
        empty are skipped, a section that fails is abandoned without stopping
        the others, and if the workbook cannot be built at all an empty
        workbook is returned instead.

    Raises:
        ImportError: If openpyxl is not installed.
    """
    import io as _io
    from openpyxl import Workbook

    try:
        from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
        from openpyxl.utils import get_column_letter

        wb = Workbook()
        ws = wb.active
        ws.title = "λΆμκ²°κ³Ό"

        _hdr_font = Font(name="Arial", bold=True, color="FFFFFF", size=10)
        _hdr_fill = PatternFill("solid", fgColor="2F5496")
        _ttl_font = Font(name="Arial", bold=True, size=12, color="2F5496")
        _note_font = Font(name="Arial", size=9, color="595959", italic=True)
        _ctr = Alignment(horizontal="center", vertical="center", wrap_text=True)
        _lft = Alignment(horizontal="left", vertical="center")
        _thin = Side(style="thin")
        _bd = Border(left=_thin, right=_thin, top=_thin, bottom=_thin)
        _adopt_fill_y = PatternFill("solid", fgColor="E2EFDA")
        _adopt_fill_n = PatternFill("solid", fgColor="FFC7CE")
        _adopt_font_y = Font(name="Arial", size=10, color="375623")
        _adopt_font_n = Font(name="Arial", size=10, color="9C0006")

        def _cell_value(val):
            # Missing markers become an empty string so they neither break
            # the write nor show up as "nan" in the sheet.
            if isinstance(val, float) and val != val:
                return ""
            if val is pd.NA or val is pd.NaT:
                return ""
            return val

        cur_row = 1
        for payload in sheets.values():
            try:
                title = payload[0]
                df_data = payload[1]
                note = payload[2] if len(payload) > 2 else ""
                adopt_col = payload[3] if len(payload) > 3 else None
                if df_data is None or len(df_data) == 0:
                    continue

                cols = list(df_data.columns)
                nc = max(len(cols), 1)
                adopt_idx = (cols.index(adopt_col) + 1
                             if adopt_col and adopt_col in cols else None)

                # Title row, merged across the table width.
                ws.merge_cells(start_row=cur_row, start_column=1,
                               end_row=cur_row, end_column=nc)
                ws.cell(row=cur_row, column=1, value=title).font = _ttl_font
                cur_row += 1

                # Header row.
                for ci, col in enumerate(cols, 1):
                    c = ws.cell(row=cur_row, column=ci, value=str(col))
                    c.font = _hdr_font
                    c.fill = _hdr_fill
                    c.alignment = _ctr
                    c.border = _bd
                cur_row += 1

                # Data rows.
                for row in df_data.itertuples(index=False):
                    for ci, val in enumerate(row, 1):
                        cell = ws.cell(row=cur_row, column=ci)
                        try:
                            cell.value = _cell_value(val)
                        except ValueError:
                            # openpyxl rejects types it cannot serialise;
                            # keep the table going with the text form.
                            cell.value = str(val)
                        cell.alignment = _lft if ci <= 2 else _ctr
                        cell.border = _bd
                        if adopt_idx and ci == adopt_idx:
                            is_y = str(val) == "μ±ν"
                            cell.fill = _adopt_fill_y if is_y else _adopt_fill_n
                            cell.font = _adopt_font_y if is_y else _adopt_font_n
                    cur_row += 1

                if note:
                    ws.cell(row=cur_row, column=1, value=note).font = _note_font
                    cur_row += 1
                cur_row += 2
            except Exception:
                # Best effort: a malformed section must not lose the others.
                continue

        for col_cells in ws.columns:
            try:
                width = max((len(str(c.value)) if c.value is not None else 0)
                            for c in col_cells)
                # The first cell of a column under a merged title is a
                # MergedCell, which has no `column_letter`; derive the letter
                # from the numeric column index instead.
                letter = get_column_letter(col_cells[0].column)
                ws.column_dimensions[letter].width = min(width + 4, 45)
            except Exception:
                # Column sizing is cosmetic; never fail the export over it.
                continue

        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
    except Exception:
        # Last resort: hand back a valid (empty) workbook rather than raising.
        wb = Workbook()
        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
π SEM λΆμκΈ°
', unsafe_allow_html=True) st.markdown('λΉλλΆμ Β· κΈ°μ ν΅κ³ Β· μ λ’°λ Β· CFA Β· μκ΄κ΄κ³ Β· SEM
', unsafe_allow_html=True) # ββ STEP 1: νμΌ μ λ‘λ βββββββββββββββββββββββββββββββββββββββββββββββββββββββ st.markdown("### π STEP 1. λ°μ΄ν° μ λ‘λ") uploaded = st.file_uploader("Excel(.xlsx/.xls) λλ CSV νμΌ", type=["xlsx","xls","csv"]) if not uploaded: st.info("νμΌμ μ λ‘λνλ©΄ λΆμμ΄ μμλ©λλ€.") st.stop() try: if uploaded.name.lower().endswith(".csv"): df = pd.read_csv(uploaded) else: df = pd.read_excel(uploaded) except Exception as e: st.error(f"νμΌ μ½κΈ° μ€λ₯: {e}") st.stop() for c in df.columns: try: converted = pd.to_numeric(df[c], errors="coerce") if converted.notna().sum() == df[c].notna().sum() and df[c].notna().sum() > 0: df[c] = converted except Exception: pass st.success(f"β νμΌ λ‘λ μλ£ β {len(df)}ν Γ {len(df.columns)}μ΄") with st.expander("π λ°μ΄ν° 미리보기 (μμ 10ν)"): st.dataframe(df.head(10), use_container_width=True) # ββ STEP 1-5: μ΄μμΉ νμ§ ββββββββββββββββββββββββββββββββββββββββββββββββββββ st.markdown("---") st.markdown("### π STEP 1-5. μ΄μμΉ νμ§ (λ§ν λΌλ ΈλΉμ€ 거리 + SMC)") num_cols_all = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] tab_maha, tab_smc = st.tabs(["π λ§ν λΌλ ΈλΉμ€ 거리", "π SMC (λ€μ€μκ΄μμΉ)"]) with tab_maha: st.caption("ΟΒ² λΆν¬ κΈ°μ€μΌλ‘ λ§ν λΌλ ΈλΉμ€ κ±°λ¦¬κ° μκ³κ°μ μ΄κ³Όνλ **νλ³Έ(ν)**μ μ΄μμΉλ‘ νμ§ν©λλ€.") col_a, col_b = st.columns([3, 1]) outlier_cols = col_a.multiselect("λΆμ λ³μ μ ν", num_cols_all, default=num_cols_all, key="oc_cols") p_thr = col_b.selectbox("μ μμμ€", [0.001, 0.005, 0.01, 0.05], index=0, key="oc_pthr") if outlier_cols and st.button("π λ§ν λΌλ ΈλΉμ€ νμ§ μ€ν", key="oc_run"): oc_result, oc_cutoff, oc_idx = detect_outliers_mahalanobis(df, outlier_cols, p_threshold=p_thr) st.session_state["oc_result"] = oc_result st.session_state["oc_cutoff"] = oc_cutoff st.session_state["oc_idx"] = oc_idx st.session_state.pop("oc_remove_rows", None) if "oc_result" in st.session_state and st.session_state["oc_result"] is not None: oc_result = st.session_state["oc_result"] oc_cutoff = st.session_state["oc_cutoff"] oc_idx = 
st.session_state["oc_idx"] n_out = len(oc_idx) st.markdown(f"**ΟΒ² μκ³κ°:** `{oc_cutoff}`") col_r1, col_r2, col_r3 = st.columns(3) col_r1.metric("μ 체 νλ³Έ", len(df)) col_r2.metric("νμ§λ μ΄μμΉ", n_out, delta=f"-{n_out}" if n_out > 0 else "μμ", delta_color="inverse") col_r3.metric("μ κ±° ν νλ³Έ", len(df) - n_out) disp_all = oc_result.sort_values("λ§ν λΌλ ΈλΉμ€ 거리", ascending=False).copy() disp_all["νμ "] = disp_all["μ΄μμΉ"].map({True: "β οΈ μ΄μμΉ", False: "β μ μ"}) disp_all = disp_all.drop(columns=["μ΄μμΉ"]) with st.expander("π λ§ν λΌλ ΈλΉμ€ μ 체 κ²°κ³Ό", expanded=(n_out > 0)): st.dataframe(disp_all, use_container_width=True, hide_index=True) if n_out > 0: outlier_row_nums = (oc_result[oc_result["μ΄μμΉ"] == True]["μλ³Έ νλ²νΈ"].tolist() if "μ΄μμΉ" in oc_result.columns else []) _default_rows = [r for r in st.session_state.get("oc_remove_rows", outlier_row_nums) if r in outlier_row_nums] st.multiselect(f"ποΈ μ κ±°ν νλ³Έ μ ν (μ΄ {n_out}κ° νμ§)", options=outlier_row_nums, default=_default_rows, key="oc_remove_rows") else: st.success("β μ΄μμΉκ° νμ§λμ§ μμμ΅λλ€.") with tab_smc: st.caption("κ° λ³μλ₯Ό λλ¨Έμ§ λ³μλ‘ νκ·λΆμν RΒ²(SMC) < .20 μ΄λ©΄ λ³μ μ κ±°λ₯Ό κ³ λ €ν©λλ€.") smc_cols = st.multiselect("SMC λΆμ λ³μ μ ν", num_cols_all, default=num_cols_all, key="smc_cols") if smc_cols and st.button("π SMC νμ§ μ€ν", key="smc_run"): with st.spinner("SMC κ³μ° μ€..."): smc_result = compute_smc(df, smc_cols) st.session_state["smc_result"] = smc_result st.session_state.pop("smc_drop_vars", None) if "smc_result" in st.session_state and st.session_state["smc_result"] is not None: smc_result = st.session_state["smc_result"] low_smc = smc_result[smc_result["SMC"].notna() & (smc_result["SMC"] < 0.2)] n_low = len(low_smc) col_s1, col_s2 = st.columns(2) col_s1.metric("λΆμ λ³μ μ", len(smc_result)) col_s2.metric("SMC < .20 λ³μ", n_low) with st.expander("π SMC λΆμ κ²°κ³Ό", expanded=(n_low > 0)): st.dataframe(smc_result.sort_values("SMC").reset_index(drop=True), use_container_width=True, 
hide_index=True) if n_low > 0: low_vars = low_smc["λ³μ"].tolist() _default_vars = [v for v in st.session_state.get("smc_drop_vars", low_vars) if v in smc_result["λ³μ"].tolist()] st.multiselect(f"ποΈ μ κ±°ν λ³μ μ ν (SMC < .20, μ΄ {n_low}κ°)", options=smc_result["λ³μ"].tolist(), default=_default_vars, key="smc_drop_vars") else: st.success("β SMC < .20 λ³μκ° μμ΅λλ€.") # ββ μ΄μμΉ/λ³μ μ κ±° μ μ© βββββββββββββββββββββββββββββββββββββββββββββββββββββ _removed_rows = st.session_state.get("oc_remove_rows", []) _dropped_vars = st.session_state.get("smc_drop_vars", []) if _removed_rows: _remove_idx = [int(r) - 1 for r in _removed_rows] _valid_idx = [i for i in _remove_idx if i in df.index] if _valid_idx: df = df.drop(index=_valid_idx).reset_index(drop=True) st.info(f"ποΈ μ΄μμΉ **{len(_valid_idx)}κ° νλ³Έ** μ κ±° β λΆμ νλ³Έ: **{len(df)}κ°**") if _dropped_vars: _actual_drop = [v for v in _dropped_vars if v in df.columns] if _actual_drop: df = df.drop(columns=_actual_drop) st.info(f"ποΈ SMC λ³μ **{len(_actual_drop)}κ°** μ κ±°: `{', '.join(_actual_drop)}`") # ββ STEP 2: ꡬμ±κ°λ νμ§ βββββββββββββββββββββββββββββββββββββββββββββββββββββ st.markdown("---") st.markdown("### π STEP 2. ꡬμ±κ°λ μλ νμ§") constructs_auto = auto_detect_constructs(df) c1, c2, c3 = st.columns(3) c1.metric("μ 체 λ³μ", len(df.columns)) c2.metric("μ ν¨ μ¬λ‘", len(df.dropna())) c3.metric("νμ§λ ꡬμ±κ°λ ", len(constructs_auto)) if constructs_auto: for lv, items in constructs_auto.items(): st.markdown(f"- **{lv}** ({len(items)}λ¬Έν): {', '.join(items)}") else: st.warning("μλ νμ§λ ꡬμ±κ°λ μ΄ μμ΅λλ€. 
μλμμ μ§μ μ€μ ν΄ μ£ΌμΈμ.") with st.expander("βοΈ κ΅¬μ±κ°λ μ§μ νΈμ§ (νμ μ)"): n_c = st.number_input("ꡬμ±κ°λ μ", min_value=1, max_value=20, value=max(len(constructs_auto), 1), key="n_constructs") auto_k = list(constructs_auto.keys()) auto_v = list(constructs_auto.values()) constructs_edit = {} for i in range(int(n_c)): ca, cb = st.columns([1, 3]) default_name = auto_k[i] if i < len(auto_k) else f"LV{i+1}" default_items = auto_v[i] if i < len(auto_v) else [] nm = ca.text_input(f"μ΄λ¦ {i+1}", value=default_name, key=f"cn_{i}") it = cb.multiselect(f"λ¬Έν {i+1}", df.columns.tolist(), default=default_items, key=f"ci_{i}") if nm and len(it) >= 2: constructs_edit[nm] = it constructs = constructs_edit if constructs_edit else constructs_auto if not constructs: constructs = constructs_auto # ββ STEP 3: λΆμ λ°©λ² μ ν ββββββββββββββββββββββββββββββββββββββββββββββββββββ st.markdown("---") st.markdown("### β STEP 3. λΆμ λ°©λ² μ ν") num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] cat_cols = [c for c in df.columns if detect_scale(df[c]) in ("binary","categorical")] lv_list = list(constructs.keys()) n_valid = len(df.dropna()) suggestions = { "λΉλλΆμ": {"ok": len(cat_cols) > 0, "reason": f"λ²μ£Όν λ³μ {len(cat_cols)}κ° κ°μ§"}, "κΈ°μ ν΅κ³": {"ok": len(num_cols) > 0, "reason": f"μμΉν λ³μ {len(num_cols)}κ°"}, "μ λ’°λ (Cronbach's Ξ±)": {"ok": len(constructs) > 0, "reason": f"ꡬμ±κ°λ {len(constructs)}κ° νμ§"}, "νμΈμ μμΈλΆμ (CFA)": {"ok": len(constructs) >= 2 and n_valid >= 100, "reason": f"ꡬμ±κ°λ {len(constructs)}κ°, N={n_valid}"}, "μκ΄κ΄κ³ λΆμ": {"ok": len(constructs) >= 2, "reason": f"μ μ¬λ³μ {len(constructs)}κ°"}, "ꡬ쑰방μ μ (SEM)": {"ok": len(constructs) >= 3 and n_valid >= 200, "reason": f"ꡬμ±κ°λ {len(constructs)}κ°, N={n_valid}"}, } selected = {} for method, info in suggestions.items(): icon = "β " if info["ok"] else "β οΈ" col_a, col_b = st.columns([1, 10]) selected[method] = col_a.checkbox(f"{icon}", value=info["ok"], key=f"sel_{method}") col_b.markdown( f'