| |
| |
| |
| |
| import streamlit as st |
| import pandas as pd |
| import numpy as np |
| import re |
| from datetime import datetime |
| from collections import defaultdict |
|
|
# Page-level Streamlit settings (browser tab title/icon and wide layout).
st.set_page_config(
    page_title="π SEM λΆμκΈ°",
    page_icon="π",
    layout="wide",
)
|
|
| |
def _check_password():
    """Show a password prompt and halt the script until it is answered correctly.

    The expected password is read from ``st.secrets["PASSWORD"]``; when no
    secret is available the built-in default is used.  On success the flag
    ``st.session_state["_pw_ok"]`` is set and later reruns return ``True``
    immediately.  Otherwise the prompt is rendered and ``st.stop()`` ends the
    current run.

    Returns:
        ``True`` when the session is already authenticated.
    """
    import hmac

    try:
        correct_pw = st.secrets.get("PASSWORD", "9400")
    except Exception:
        # Reading secrets failed (e.g. none configured): use the default.
        correct_pw = "9400"
    # NOTE(review): the hard-coded fallback password is kept for backward
    # compatibility; set PASSWORD in the secrets for any real deployment.
    # A secret written as a number would never equal the typed text, so
    # both sides are compared as UTF-8 encoded strings.
    expected = str(correct_pw).encode("utf-8")

    def _submit():
        typed = st.session_state.get("_pw_input")
        supplied = ("" if typed is None else str(typed)).encode("utf-8")
        # Constant-time comparison avoids leaking match length via timing.
        st.session_state["_pw_ok"] = hmac.compare_digest(supplied, expected)

    if st.session_state.get("_pw_ok"):
        return True

    st.markdown("## π SEM λΆμκΈ°")
    st.text_input("λΉλ°λ²νΈλ₯Ό μ\x85λ ₯νμΈμ", type="password",
                  key="_pw_input", on_change=_submit)
    # Only complain after at least one failed attempt has been recorded.
    if "_pw_ok" in st.session_state and not st.session_state["_pw_ok"]:
        st.error("λΉλ°λ²νΈκ° νλ Έμ΅λλ€.")
    st.stop()
|
|
# Gate everything below behind the password prompt: _check_password() calls
# st.stop() unless the session has already been authenticated.
_check_password()
|
|
# Inject the CSS classes used for the page title, subtitle and info cards.
st.markdown("""
<style>
.main-title{font-size:2rem;font-weight:700;color:#2F5496}
.sub-title{font-size:.95rem;color:#666;margin-bottom:1rem}
.card{background:#F3F6FB;border-left:4px solid #2F5496;
padding:8px 14px;border-radius:4px;margin:3px 0}
</style>""", unsafe_allow_html=True)
|
|
| |
# Load the project helpers; without them the app cannot run, so show the
# error in the page and stop the script.
try:
    from modules.utils import (
        detect_scale,
        cronbach_alpha,
        calc_ave_cr,
        fmt_p,
        sig_stars,
        build_excel,
    )
except Exception as load_err:
    st.error(f"λͺ¨λ λ‘λ μ€λ₯: {load_err}\n\nμ±κ³Ό κ°μ ν΄λμ modules/ ν΄λκ° μλμ§ νμΈνμΈμ.")
    st.stop()
|
|
|
|
| |
| |
| |
|
|
def detect_outliers_mahalanobis(df, cols, p_threshold=0.001):
    """Flag multivariate outliers in ``df[cols]`` by squared Mahalanobis distance.

    Rows with a missing value in any of ``cols`` are dropped first.  Each
    remaining row's squared distance from the column means is compared with
    the chi-square critical value at ``1 - p_threshold`` with ``len(cols)``
    degrees of freedom.

    Args:
        df: Source DataFrame.
        cols: List of column names to analyse.
        p_threshold: Upper-tail probability used for the cutoff.

    Returns:
        ``(table, cutoff, outlier_index)``: ``table`` has one row per complete
        case (index label + 1, rounded distance, rounded p-value, outlier
        flag), ``cutoff`` is the critical value rounded to 3 decimals and
        ``outlier_index`` lists the index labels of the flagged rows.
        ``(None, None, [])`` is returned when fewer than ``len(cols) + 2``
        complete cases exist or the computation fails.
    """
    from scipy import stats as sc

    data = df[cols].dropna()
    n, k = data.shape
    if n < k + 2:
        return None, None, []
    try:
        values = data.values
        # np.cov yields a 0-d array for a single variable; promote it to 2-D
        # so the inversion also works when only one column is analysed.
        cov_mat = np.atleast_2d(np.cov(values.T, ddof=1))
        try:
            cov_inv = np.linalg.inv(cov_mat)
        except np.linalg.LinAlgError:
            # Singular covariance: fall back to the pseudo-inverse.
            cov_inv = np.linalg.pinv(cov_mat)
        diffs = values - data.mean().values
        # Row-wise quadratic form d' * cov_inv * d.
        mahal = np.einsum("ij,jk,ik->i", diffs, cov_inv, diffs)
        pvals = sc.chi2.sf(mahal, df=k)
        cutoff = sc.chi2.ppf(1 - p_threshold, df=k)
        is_outlier = mahal > cutoff
        result = pd.DataFrame({
            "μλ³Έ νλ²νΈ": data.index + 1,
            "λ§ν λΌλ\x85ΈλΉμ€ 거리": np.round(mahal, 3),
            "pκ°": np.round(pvals, 4),
            "μ΄μμΉ": is_outlier,
        }).reset_index(drop=True)
        outlier_idx = data.index[is_outlier].tolist()
        return result, round(cutoff, 3), outlier_idx
    except Exception:
        # Best-effort diagnostic: any numerical failure means "no result".
        return None, None, []
|
|
|
|
def compute_smc(df, cols):
    """Compute the squared multiple correlation (SMC) of every variable.

    Each column in ``cols`` is regressed by least squares (with an intercept)
    on all the other columns, using only rows without missing values; the
    resulting R-squared is reported as that column's SMC.

    Args:
        df: Source DataFrame.
        cols: List of column names to evaluate.

    Returns:
        DataFrame with one row per column and three columns: variable name,
        SMC rounded to 3 decimals (NaN when it cannot be computed) and a
        verdict.  The verdict warns when SMC < .20, is "-" when the SMC is
        undefined (no other variables, or a constant column), and reports
        the failure when there are too few cases or the regression raises.
    """
    data = df[cols].dropna()
    if len(data) < len(cols) + 2:
        return pd.DataFrame({"λ³μ": cols, "SMC": [np.nan] * len(cols),
                             "νμ ": ["μ¬λ‘ μ λΆμ‘±"] * len(cols)})
    rows = []
    for col in cols:
        others = [c for c in cols if c != col]
        if not others:
            rows.append({"λ³μ": col, "SMC": np.nan, "νμ ": "-"})
            continue
        try:
            X = data[others].values.astype(float)
            y = data[col].values.astype(float)
            Xc = np.column_stack([np.ones(len(X)), X])
            coef, _, _, _ = np.linalg.lstsq(Xc, y, rcond=None)
            fitted = Xc @ coef
            ss_res = float(np.sum((y - fitted) ** 2))
            ss_tot = float(np.sum((y - y.mean()) ** 2))
            # A constant column has no variance to explain: SMC is undefined.
            r2 = (1.0 - ss_res / ss_tot) if ss_tot > 0 else np.nan
            if np.isnan(r2):
                # Previously an undefined SMC was labelled as adequate.
                verdict = "-"
            elif r2 < 0.2:
                verdict = "β οΈ μ κ±° κ³ λ € (SMC < .20)"
            else:
                verdict = "β\x85 μ μ"
            rows.append({"λ³μ": col, "SMC": round(float(r2), 3), "νμ ": verdict})
        except Exception:
            rows.append({"λ³μ": col, "SMC": np.nan, "νμ ": "κ³μ° μ€λ₯"})
    return pd.DataFrame(rows)
|
|
|
|
def auto_detect_constructs(df):
    """Group Likert-type columns into candidate constructs by name prefix.

    A column counts as Likert when ``detect_scale`` returns ``"likert"`` for
    it.  A name made of letters, optional digits and one final digit is
    grouped under everything before that final digit; any other name forms
    its own group.  Only groups with at least two columns are returned.

    Returns:
        Dict mapping each prefix to the list of its column names.
    """
    item_pattern = r'^([A-Za-z]+\d*)(\d)$'
    likert_cols = []
    for name in df.columns:
        if detect_scale(df[name]) == "likert":
            likert_cols.append(name)

    grouped = defaultdict(list)
    for name in likert_cols:
        hit = re.match(item_pattern, name)
        if hit:
            prefix = hit.group(1)
        else:
            prefix = name
        grouped[prefix].append(name)

    constructs = {}
    for prefix, members in grouped.items():
        if len(members) >= 2:
            constructs[prefix] = members
    return constructs
|
|
|
|
def _adequate_check(fit):
    """Tell whether a fit-index dict meets every acceptance threshold.

    NFI, RFI, IFI, TLI and CFI must each be at least .90 (a missing index
    counts as 0) and RMSEA must be below .049 (a missing RMSEA counts as 1).
    """
    for index_name in ("NFI", "RFI", "IFI", "TLI", "CFI"):
        if not fit.get(index_name, 0) >= .90:
            return False
    return fit.get("RMSEA", 1) < .049
|
|
|
|
def get_composite_scores(df, constructs):
    """Return one column per construct holding the row-wise mean of its items.

    Args:
        df: DataFrame containing every item column.
        constructs: Dict mapping a construct name to its list of item columns.
    """
    scores = {}
    for name, members in constructs.items():
        scores[name] = df[members].mean(axis=1)
    return pd.DataFrame(scores)
|
|
|
|
| |
| |
| |
|
|
def calc_mi_approx(model, obs_df, obs_vars):
    """Approximate a modification index for every pair of observed variables.

    The sample covariance ``S`` of ``obs_df[obs_vars]`` is compared with the
    model-implied covariance returned by ``model.calc_sigma()`` (plus a small
    ridge on the diagonal).  For each pair the index is
    ``(n - 1) * G[i, j]**2 / (2*Si[i,i]*Si[j,j] + 2*Si[i,j]**2)`` where ``Si``
    is the inverse implied covariance and ``G = Si (S - Sigma) Si``.

    Returns:
        DataFrame of (variable 1, variable 2, "MI") sorted by descending MI,
        or an empty DataFrame when anything fails.
    """
    try:
        sample_size = len(obs_df)
        n_vars = len(obs_vars)
        sample_cov = np.cov(obs_df[obs_vars].values.T, ddof=1)
        implied, _ = model.calc_sigma()
        # Tiny ridge keeps the implied matrix invertible.
        implied = implied + np.eye(n_vars) * 1e-8
        implied_inv = np.linalg.inv(implied)
        grad = implied_inv @ (sample_cov - implied) @ implied_inv

        records = []
        # Upper-triangle index pairs, in the same order as a nested i < j loop.
        upper_i, upper_j = np.triu_indices(n_vars, k=1)
        for i, j in zip(upper_i, upper_j):
            scale = 2 * implied_inv[i, i] * implied_inv[j, j] + 2 * implied_inv[i, j] ** 2
            index_value = (sample_size - 1) * grad[i, j] ** 2 / max(scale, 1e-8)
            records.append({"λ³μ1": obs_vars[i], "λ³μ2": obs_vars[j],
                            "MI": round(float(index_value), 3)})
        table = pd.DataFrame(records)
        table = table.sort_values("MI", ascending=False)
        return table.reset_index(drop=True)
    except Exception:
        return pd.DataFrame()
|
|
|
|
def run_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200):
    """Fit a CFA with semopy and free error covariances guided by MI.

    The base model has one ``=~`` line per construct plus covariances between
    all pairs of latent variables; a latent-variance constraint is appended
    using the first syntax variant that semopy manages to fit.  While the fit
    is not adequate (see ``_adequate_check``), the pair with the largest
    approximate modification index above ``mi_threshold`` is added as a
    ``~~`` covariance and the model is refitted, at most ``max_mods`` times.

    Args:
        df: DataFrame holding every indicator column.
        constructs: Dict mapping latent variable name to its indicator list.
        mi_threshold: Minimum MI for a pair to be considered.
        max_mods: Maximum number of modification attempts.

    Returns:
        ``(result, None)`` on success, where ``result`` is a dict with the
        initial/modified models, fit dicts, MI tables, modification log,
        added covariances, parameter table and the analysed data; otherwise
        ``(None, message)``.
    """
    try:
        from semopy import Model, calc_stats
    except ImportError:
        return None, "semopy ν¨ν€μ§κ° νμν©λλ€."
    try:
        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [i for items in constructs.values() for i in items]
        data = df[all_items].dropna()

        lv_keys = list(constructs)
        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        lv_cov_lines = [f" {a} ~~ {b}"
                        for i, a in enumerate(lv_keys)
                        for b in lv_keys[i + 1:]]
        meas_str = "\n".join(meas_lines + lv_cov_lines)

        def _fit(model_str):
            """Fit one model description; return None when semopy fails."""
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        def _extract_fit(m):
            """Collect the fit indices of a fitted model ({} on failure)."""
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                # RFI and IFI are derived here from the chi-square values.
                rfi = ((chi2_bl / dof_bl - chi2 / dof) / (chi2_bl / dof_bl)
                       if chi2_bl > 0 and dof_bl > 0 and dof > 0 else 0.0)
                ifi = ((chi2_bl - chi2) / (chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"ΟΒ²": round(chi2, 3), "df": int(dof),
                        "ΟΒ²/df": round(chi2 / dof, 3) if dof > 0 else "-",
                        "p": round(pval, 3),
                        "NFI": round(nfi, 3), "RFI": round(rfi, 3),
                        "IFI": round(ifi, 3), "TLI": round(tli, 3),
                        "CFI": round(cfi, 3), "RMSEA": round(rmsea, 3)}
            except Exception:
                return {}

        # Try each variance-constraint spelling; keep the first model that
        # fits so the accepted base model does not have to be fitted again.
        base_str, m0 = meas_str, None
        for template in (" {lv} ~~ 1*{lv}", " {lv} ~~ 1 * {lv}", " {lv} ~~ 1@{lv}"):
            candidate = meas_str + "\n" + "\n".join(
                template.format(lv=lv) for lv in constructs)
            fitted = _fit(candidate)
            if fitted is not None:
                base_str, m0 = candidate, fitted
                break
        if m0 is None:
            m0 = _fit(base_str)
        if m0 is None:
            return None, "μ΄κΈ° CFA μΆμ μ€ν¨"
        fit0 = _extract_fit(m0)
        mi_df0 = calc_mi_approx(m0, data, all_items)

        extra_cov = []
        added_pairs = set()
        mod_log = []
        m_cur, fit_cur, mi_cur = m0, fit0.copy(), mi_df0.copy()

        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            # calc_mi_approx returns an empty frame on failure; stop modifying
            # instead of losing the already fitted model to a KeyError.
            if mi_cur.empty or "MI" not in mi_cur.columns:
                break
            high_mi = mi_cur[mi_cur["MI"] > mi_threshold]
            # The first two columns of the MI table hold the variable names.
            pairs = list(zip(high_mi.iloc[:, 0], high_mi.iloc[:, 1]))
            pick = next((i for i, pr in enumerate(pairs)
                         if tuple(sorted(pr)) not in added_pairs), None)
            if pick is None:
                break
            v1, v2 = pairs[pick]
            mi_value = high_mi["MI"].iloc[pick]
            pair = tuple(sorted([v1, v2]))
            added_pairs.add(pair)

            new_cov = extra_cov + [(v1, v2)]
            new_str = (base_str + "\n" +
                       "\n".join(f" {a} ~~ {b}" for a, b in new_cov))
            m_new = _fit(new_str)
            if m_new is None:
                # Pair stays blacklisted; try the next candidate.
                continue
            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "λ¨κ³": step + 1,
                "μμ κ²½λ‘": f"{v1} ~~ {v2}",
                "μμ μμΈ": lv1 if lv1 == lv2 else f"{lv1}β{lv2}",
                "MI": round(mi_value, 3),
                "CFI μ βν": f"{fit_cur.get('CFI','-')} β {fit_new.get('CFI','-')}",
                "RMSEA μ βν": f"{fit_cur.get('RMSEA','-')} β {fit_new.get('RMSEA','-')}",
            })
            extra_cov = new_cov
            m_cur, fit_cur = m_new, fit_new
            mi_cur = calc_mi_approx(m_cur, data, all_items)

        ins = m_cur.inspect(std_est=True)
        std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
        # mi_cur always belongs to m_cur, so it is the final MI table.
        final_mi = mi_cur
        return {
            "init_model": m0, "mod_model": m_cur,
            "init_fit": fit0, "mod_fit": fit_cur,
            "init_mi": mi_df0, "final_mi": final_mi,
            "mod_log": pd.DataFrame(mod_log),
            "extra_cov": extra_cov,
            "was_modified": len(extra_cov) > 0,
            "ins": ins, "std_col": std_col,
            "data": data, "all_items": all_items,
            "base_str": base_str,
        }, None
    except Exception as e:
        return None, f"CFA MI μ€λ₯: {e}"
|
|
|
|
def run_cfa_sem(df, constructs, hypotheses=None):
    """Fit a measurement model, optionally with structural paths, via semopy.

    Args:
        df: DataFrame holding every indicator column.
        constructs: Dict mapping latent variable name to its indicator list.
        hypotheses: Optional iterable of ``(source, target)`` latent pairs;
            each target is regressed on all of its sources.

    Returns:
        ``(ins, std_col, stats_dict, fit)``: the semopy parameter table, the
        name of its standardized-estimate column, the raw statistics dict and
        a dict of rounded fit indices ({} if they cannot be derived).  Four
        ``None`` values are returned when semopy is missing or fitting fails.
    """
    failure = (None, None, None, None)
    try:
        from semopy import Model, calc_stats
    except ImportError:
        return failure
    try:
        spec = "\n".join(f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items())
        if hypotheses:
            predictors = defaultdict(list)
            for source, target in hypotheses:
                predictors[target].append(source)
            regressions = [f" {t} ~ {' + '.join(ss)}" for t, ss in predictors.items()]
            spec += "\n" + "\n".join(regressions)

        indicators = [i for items in constructs.values() for i in items]
        complete = df[indicators].dropna()
        model = Model(spec)
        model.fit(complete)

        ins = model.inspect(std_est=True)
        std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])

        stats = calc_stats(model)
        try:
            if hasattr(stats, "iloc"):
                stats_dict = stats.iloc[0].to_dict()
            else:
                stats_dict = dict(stats)
        except Exception:
            stats_dict = {}

        try:
            chi2 = float(stats_dict.get("chi2", 0))
            dof = float(stats_dict.get("DoF", 1))
            chi2_bl = float(stats_dict.get("chi2 Baseline", 0))
            dof_bl = float(stats_dict.get("DoF Baseline", 1))
            pval = float(stats_dict.get("chi2 p-value", 1))
            cfi = float(stats_dict.get("CFI", 0))
            tli = float(stats_dict.get("TLI", 0))
            nfi = float(stats_dict.get("NFI", 0))
            rmsea = float(stats_dict.get("RMSEA", 1))

            # RFI and IFI are derived from the chi-square values.
            if chi2_bl > 0 and dof_bl > 0 and dof > 0:
                rfi = (chi2_bl/dof_bl - chi2/dof)/(chi2_bl/dof_bl)
            else:
                rfi = 0.0
            if (chi2_bl - dof) > 0:
                ifi = (chi2_bl - chi2)/(chi2_bl - dof)
            else:
                ifi = 0.0
            ratio = round(chi2/dof, 3) if dof > 0 else "-"

            fit = {"ΟΒ²": round(chi2, 3), "df": int(dof),
                   "ΟΒ²/df": ratio, "p": round(pval, 3),
                   "NFI": round(nfi, 3), "RFI": round(rfi, 3),
                   "IFI": round(ifi, 3), "TLI": round(tli, 3),
                   "CFI": round(cfi, 3), "RMSEA": round(rmsea, 3)}
        except Exception:
            fit = {}
        return ins, std_col, stats_dict, fit
    except Exception:
        return failure
|
|
|
|
def _extract_load_df(ins, std_col, constructs):
    """Build the factor-loading table from a semopy parameter table.

    Rows with operator ``=~`` are used when present (latent in ``lval``);
    otherwise rows whose ``lval`` is an indicator and whose ``rval`` is a
    latent variable are used.  The kept columns are renamed, the standardized
    coefficient is rounded to 3 decimals, SE and z are rounded or replaced by
    "-" when not numeric, and p-values are formatted with ``fmt_p``.

    Returns:
        ``(loading_table, lv_col)`` where ``lv_col`` is the name of the
        ``ins`` column that holds the latent variable.
    """
    latent_names = set(constructs.keys())
    indicator_names = set(i for v in constructs.values() for i in v)

    explicit = ins["op"] == "=~"
    if explicit.any():
        loadings = ins[explicit].copy()
        lv_col, ind_col = "lval", "rval"
    else:
        selector = ins["lval"].isin(indicator_names) & ins["rval"].isin(latent_names)
        loadings = ins[selector].copy()
        lv_col, ind_col = "rval", "lval"

    wanted = [lv_col, ind_col, std_col, "Std. Err", "z-value", "p-value"]
    present = [c for c in wanted if c in loadings.columns]
    loadings = loadings[present].copy()
    loadings = loadings.rename(columns={
        lv_col: "μ μ¬λ³μ", ind_col: "μΈ‘μ λ³μ", std_col: "νμ€νκ³μ",
        "Std. Err": "SE", "z-value": "tκ°", "p-value": "pκ°_raw"})

    def _show_p(p):
        if str(p).strip() in ("-", ""):
            return "-"
        return fmt_p(p)

    def _show_number(v):
        if pd.notna(v) and str(v).strip() not in ("None", "", "nan", "-"):
            return round(float(v), 3)
        return "-"

    if "pκ°_raw" in loadings.columns:
        loadings["pκ°"] = loadings["pκ°_raw"].apply(_show_p)
        loadings = loadings.drop(columns=["pκ°_raw"])
    loadings["νμ€νκ³μ"] = pd.to_numeric(loadings["νμ€νκ³μ"], errors="coerce").round(3)
    for name in ("SE", "tκ°"):
        if name in loadings.columns:
            loadings[name] = loadings[name].apply(_show_number)
    return loadings.reset_index(drop=True), lv_col
|
|
|
|
| |
| |
| |
|
|
def _get_rscript_path():
    """Locate an Rscript executable.

    ``Rscript --version`` is tried first; when it runs successfully the bare
    command name ``'Rscript'`` is returned.  Otherwise the default Windows
    installation folders are searched and the installation with the highest
    version number is chosen.

    Returns:
        The command or path to use, or ``None`` when nothing was found.
    """
    import glob
    import re
    import subprocess

    try:
        probe = subprocess.run(['Rscript', '--version'], capture_output=True, timeout=10)
        if probe.returncode == 0:
            return 'Rscript'
    except (OSError, subprocess.SubprocessError):
        # Not on PATH, not executable, or timed out: try known folders.
        pass

    candidates = glob.glob(r'C:\Program Files\R\R-*\bin\Rscript.exe')
    candidates += glob.glob(r'C:\Program Files (x86)\R\R-*\bin\Rscript.exe')
    if not candidates:
        return None

    def _version_key(path):
        # Compare versions numerically so that R-4.10 ranks above R-4.9;
        # the path itself breaks ties between equal versions.
        found = re.search(r'R-(\d+(?:\.\d+)*)', path)
        numbers = tuple(int(part) for part in found.group(1).split('.')) if found else ()
        return numbers, path

    return max(candidates, key=_version_key)
|
|
|
|
def _rscript_available():
    """Return True when ``_get_rscript_path`` finds an Rscript executable."""
    located = _get_rscript_path()
    return located is not None
|
|
|
|
def _parse_fit_csv(path):
    """Read the fit-measure CSV at ``path`` into a dict of rounded indices.

    Only the first data row is used.  Missing columns fall back to 0 (1 for
    ``df``, ``pvalue`` and ``rmsea``).  An empty table yields ``{}``.
    """
    table = pd.read_csv(path)
    if table.empty:
        return {}
    first = table.iloc[0]

    def _num(column, default):
        return float(first.get(column, default))

    chi2 = _num('chisq', 0)
    dof = _num('df', 1)
    summary = {
        'ΟΒ²': round(chi2, 3),
        'df': int(dof),
    }
    # The ratio is undefined for zero degrees of freedom.
    summary['ΟΒ²/df'] = round(chi2 / dof, 3) if dof > 0 else '-'
    summary['p'] = round(_num('pvalue', 1), 6)
    for label in ('NFI', 'RFI', 'IFI', 'TLI', 'CFI'):
        summary[label] = round(_num(label.lower(), 0), 3)
    summary['RMSEA'] = round(_num('rmsea', 1), 3)
    return summary
|
|
|
|
def _fmt_mi_df(df):
    """Normalise a three-column modification-index table.

    The columns are relabelled (variable 1, variable 2, "MI"), MI is coerced
    to numeric and rounded to 3 decimals, and rows are sorted by descending
    MI.  ``None`` or an empty frame yields an empty table with those columns.
    """
    labels = ['λ³μ1', 'λ³μ2', 'MI']
    if df is None:
        return pd.DataFrame(columns=labels)
    if df.empty:
        return pd.DataFrame(columns=labels)

    table = df.copy()
    table.columns = labels
    numeric_mi = pd.to_numeric(table['MI'], errors='coerce')
    table['MI'] = numeric_mi.round(3)
    ordered = table.sort_values('MI', ascending=False)
    return ordered.reset_index(drop=True)
|
|
|
|
def run_lavaan_cfa_with_mi(df, constructs, mi_threshold=3.84, max_mods=200,
                           manual_extra_cov=None):
    """Run a CFA in R/lavaan and free covariances guided by modification indices.

    The data, the model text and the latent names are written to a temporary
    folder together with a generated R script.  The script fits the model,
    then repeatedly adds the ``~~`` covariance with the largest MI above
    ``mi_threshold`` (pairs of two latent variables excluded) until CFI >= .95
    and RMSEA < .049, no candidate remains or ``max_mods`` steps were tried.
    Its CSV/text outputs are read back into Python objects.

    Args:
        df: DataFrame holding every indicator column.
        constructs: Dict mapping latent variable name to its indicator list.
        mi_threshold: Minimum MI for a covariance to be considered.
        max_mods: Maximum number of modification steps.
        manual_extra_cov: Optional iterable of ``(v1, v2)`` pairs that are
            added to the base model as covariances.

    Returns:
        ``(result, None)`` on success, where ``result`` is a dict with the
        initial/modified fit, MI tables, modification log, covariance pairs,
        standardized solution and the analysed data; otherwise
        ``(None, message)``.
    """
    import os
    import subprocess
    import tempfile

    # Resolve the interpreter once: each lookup may start an Rscript process.
    rscript = _get_rscript_path()
    if rscript is None:
        return None, "Rscriptλ₯Ό μ°Ύμ μ μμ. R μ€μΉ ν PATH λ±λ‘ νμ."

    all_items = [i for items in constructs.values() for i in items]
    item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
    lv_names = list(constructs.keys())
    data = df[all_items].dropna()

    meas_lines = [f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
    manual_pairs = list(manual_extra_cov) if manual_extra_cov else []
    extra_lines = [f"{v1} ~~ {v2}" for v1, v2 in manual_pairs]
    model_base = "\n".join(meas_lines + extra_lines)

    with tempfile.TemporaryDirectory() as tmpdir:
        def p(name):
            # R accepts forward slashes on every platform.
            return os.path.join(tmpdir, name).replace('\\', '/')

        data.to_csv(p('data.csv'), index=False)
        with open(p('model.txt'), 'w', encoding='utf-8') as f:
            f.write(model_base)
        with open(p('lvnames.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lv_names))

        r_script = f'''
suppressPackageStartupMessages(library(lavaan))
options(warn=-1)

out <- "{p("")}"
data <- read.csv(file.path(out,"data.csv"))
mdl_base <- paste(readLines(file.path(out,"model.txt"), encoding="UTF-8"), collapse="\\n")
lv_names <- readLines(file.path(out,"lvnames.txt"), encoding="UTF-8")
mi_thr <- {mi_threshold}
max_mods <- {max_mods}

get_fit <- function(fit) {{
  fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea"))
  data.frame(as.list(fm))
}}
is_ok <- function(fit) {{
  fm <- fitMeasures(fit, c("cfi","rmsea"))
  unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049
}}
get_mi_cov <- function(fit) {{
  m <- tryCatch(modindices(fit, sort.=TRUE, maximum.number=500), error=function(e) NULL)
  if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric()))
  m <- m[m$op=="~~", c("lhs","rhs","mi")]
  m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ]
}}

fit0 <- tryCatch(cfa(mdl_base, data=data, estimator="ML"), error=function(e) NULL)
if (is.null(fit0)) {{
  writeLines("ERROR: CFA failed", file.path(out,"status.txt")); quit(status=1)
}}
write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE)
mi0 <- get_mi_cov(fit0)
write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE)

added <- character(0); extra <- character(0); log_rows <- list()
fit_cur <- fit0; mdl_cur <- mdl_base

for (step in seq_len(max_mods)) {{
  if (is_ok(fit_cur)) break
  mi_c <- get_mi_cov(fit_cur)
  mi_c <- mi_c[mi_c$mi > mi_thr, ]
  if (nrow(mi_c)==0) break
  ok <- sapply(seq_len(nrow(mi_c)), function(i) {{
    pk <- paste(sort(c(mi_c$lhs[i], mi_c$rhs[i])), collapse="~~")
    !(pk %in% added)
  }})
  mi_c <- mi_c[ok, ]
  if (nrow(mi_c)==0) break

  v1 <- mi_c$lhs[1]; v2 <- mi_c$rhs[1]; mv <- mi_c$mi[1]
  pk <- paste(sort(c(v1,v2)), collapse="~~")
  mdl_new <- paste0(mdl_cur,"\\n ",v1," ~~ ",v2)
  fit_new <- tryCatch(cfa(mdl_new, data=data, estimator="ML"), error=function(e) NULL)
  added <- c(added, pk)
  if (is.null(fit_new)) next

  fm_b <- fitMeasures(fit_cur, c("cfi","rmsea","rfi","ifi"))
  fm_a <- fitMeasures(fit_new, c("cfi","rmsea","rfi","ifi"))
  log_rows[[length(log_rows)+1]] <- data.frame(
    step=step, v1=v1, v2=v2, mi=round(mv,3),
    cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3),
    rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3),
    rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3),
    ifi_b=round(unname(fm_b["ifi"]),3), ifi_a=round(unname(fm_a["ifi"]),3),
    stringsAsFactors=FALSE)
  extra <- c(extra, paste(v1, v2, sep=","))
  fit_cur <- fit_new; mdl_cur <- mdl_new
}}

write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE)
write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE)
write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE)
if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt"))
if (length(log_rows)>0) {{
  write.csv(do.call(rbind, log_rows), file.path(out,"mod_log.csv"), row.names=FALSE)
}}
writeLines("SUCCESS", file.path(out,"status.txt"))
'''
        with open(p('cfa.R'), 'w', encoding='utf-8') as f:
            f.write(r_script)

        try:
            res = subprocess.run([rscript, '--vanilla', p('cfa.R')],
                                 capture_output=True, text=True, timeout=300)
        except subprocess.TimeoutExpired:
            return None, "R μ€ν μκ° μ΄κ³Ό (5λΆ)"
        except Exception as ex:
            return None, f"Rscript μ€ν μ€λ₯: {ex}"

        status_path = os.path.join(tmpdir, 'status.txt')
        if not os.path.exists(status_path):
            # The script died before writing a status: report R's stderr.
            err = res.stderr[-2000:] if res.stderr else "μ μ μλ μ€λ₯"
            return None, f"R μ€λ₯:\n{err}"
        with open(status_path, encoding='utf-8', errors='replace') as f:
            status = f.read().strip()
        if status != "SUCCESS":
            # The script reported its own failure; the result files are
            # missing, so surface that message instead of a parse error.
            return None, f"R μ€λ₯:\n{status}"

        try:
            fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv'))
            fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv'))
            std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv'))
            mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv')))
            mf_path = os.path.join(tmpdir, 'mi_final.csv')
            mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame()
        except Exception as ex:
            return None, f"κ²°κ³Ό νμ± μ€λ₯: {ex}"

        # Manual pairs first, then the pairs the R loop added (no duplicates).
        extra_cov = list(manual_pairs)
        ec_path = os.path.join(tmpdir, 'extra_cov.txt')
        if os.path.exists(ec_path):
            with open(ec_path, encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(',')
                    if len(parts) == 2:
                        t = tuple(parts)
                        if t not in extra_cov:
                            extra_cov.append(t)

        mod_log_records = []
        ml_path = os.path.join(tmpdir, 'mod_log.csv')
        if os.path.exists(ml_path):
            log_df = pd.read_csv(ml_path)
            for _, row in log_df.iterrows():
                lv1 = item_to_lv.get(str(row['v1']), str(row['v1']))
                lv2 = item_to_lv.get(str(row['v2']), str(row['v2']))
                mod_log_records.append({
                    'λ¨κ³': int(row['step']),
                    'μμ κ²½λ‘': f"{row['v1']} ~~ {row['v2']}",
                    'μμ μμΈ': lv1 if lv1 == lv2 else f'{lv1}β{lv2}',
                    'MI': row['mi'],
                    'CFI μ βν': f"{row['cfi_b']} β {row['cfi_a']}",
                    'RMSEA μ βν': f"{row['rmsea_b']} β {row['rmsea_a']}",
                    'RFI μ βν': f"{row.get('rfi_b', '-')} β {row.get('rfi_a', '-')}",
                    'IFI μ βν': f"{row.get('ifi_b', '-')} β {row.get('ifi_a', '-')}",
                })

        return {
            'init_fit': fit0,
            'mod_fit': fit_mod,
            'init_mi': mi_init,
            'final_mi': mi_final,
            'mod_log': pd.DataFrame(mod_log_records),
            'extra_cov': extra_cov,
            'was_modified': len(extra_cov) > 0,
            'std_sol': std_sol,
            'data': data,
            'all_items': all_items,
            'lv_names': lv_names,
        }, None
|
|
|
|
def _extract_load_df_lavaan(std_sol, constructs):
    """Build the factor-loading table from lavaan's standardized solution.

    Keeps the ``=~`` rows, renames the lavaan columns, rounds the
    standardized coefficient, SE and z to 3 decimals and formats p-values
    with ``fmt_p`` ("-" when missing).  ``constructs`` is accepted but not
    used.  Any failure yields an empty DataFrame.
    """
    try:
        is_loading = std_sol['op'] == '=~'
        table = std_sol[is_loading].copy()
        keep = []
        for name in ('lhs', 'rhs', 'est.std', 'se', 'z', 'pvalue'):
            if name in table.columns:
                keep.append(name)
        table = table[keep].copy()
        table = table.rename(columns={
            'lhs': 'μ μ¬λ³μ', 'rhs': 'μΈ‘μ λ³μ',
            'est.std': 'νμ€νκ³μ', 'se': 'SE', 'z': 'tκ°', 'pvalue': 'pκ°_raw'})
        for name in ('νμ€νκ³μ', 'SE', 'tκ°'):
            table[name] = pd.to_numeric(table[name], errors='coerce').round(3)

        def _show_p(p):
            if pd.notna(p) and str(p).strip() not in ('NA', '', 'nan'):
                return fmt_p(float(p))
            return '-'

        if 'pκ°_raw' in table.columns:
            table['pκ°'] = table['pκ°_raw'].apply(_show_p)
            table = table.drop(columns=['pκ°_raw'])
        return table.reset_index(drop=True)
    except Exception:
        return pd.DataFrame()
|
|
|
|
def build_cfa_tables(df, constructs, mi_threshold=3.84, max_mods=200,
                     manual_extra_cov=None):
    """Run the CFA (lavaan first, semopy as fallback) and build result tables.

    Args:
        df: DataFrame holding every indicator column.
        constructs: Dict mapping latent variable name to its indicator list.
        mi_threshold: Minimum MI for a covariance to be considered.
        max_mods: Maximum number of modification steps.
        manual_extra_cov: Optional covariance pairs; only passed on to the
            lavaan runner.

    Returns:
        A 9-tuple ``(loadings, reliability, init_fit, mod_fit, init_mi,
        final_mi, mod_log, was_modified, extra_cov)``.  On failure an error is
        shown in the page and ``(None,)*7 + (False, [])`` is returned.
    """
    failure = (None, None, None, None, None, None, None, False, [])

    result, err = run_lavaan_cfa_with_mi(df, constructs, mi_threshold, max_mods,
                                         manual_extra_cov=manual_extra_cov)
    use_lavaan = (result is not None)

    if not use_lavaan:
        st.warning(f"β οΈ lavaan μ€ν¨ β semopy μ¬μ©\n\nμμΈ: {err}")
        result, err = run_cfa_with_mi(df, constructs, mi_threshold, max_mods)

    if result is None:
        st.error(err or "CFA μ€ν μ€ν¨")
        return failure

    try:
        if use_lavaan:
            std_sol = result["std_sol"]
            load_df = _extract_load_df_lavaan(std_sol, constructs)

            def _loadings(lv, items):
                rows = std_sol[(std_sol['op'] == '=~') & (std_sol['lhs'] == lv)]
                return pd.to_numeric(rows['est.std'], errors='coerce').dropna().values
        else:
            ins = result["ins"]
            std_col = result["std_col"]
            load_df, lv_col = _extract_load_df(ins, std_col, constructs)
            ind_col = "rval" if lv_col == "lval" else "lval"

            def _loadings(lv, items):
                # Keep only this construct's own indicator rows.  Matching on
                # the latent column alone would also pick up latent variance
                # and covariance rows and distort AVE/CR.
                rows = ins[(ins[lv_col] == lv) & ins[ind_col].isin(items)]
                return np.array(
                    [v for v in rows[std_col].values
                     if str(v).strip() not in ("-", "", "nan", "None")],
                    dtype=float)

        rel_rows = []
        for lv, items in constructs.items():
            ave, cr = calc_ave_cr(_loadings(lv, items))
            alpha = cronbach_alpha(df[items])
            rel_rows.append({"μ μ¬λ³μ": lv, "AVE": ave, "CR": cr, "Cronbach Ξ±": alpha})

        return (load_df, pd.DataFrame(rel_rows),
                result["init_fit"], result["mod_fit"],
                result["init_mi"], result.get("final_mi", pd.DataFrame()),
                result["mod_log"],
                result["was_modified"], result["extra_cov"])
    except Exception as e:
        st.error(f"CFA κ²°κ³Ό μ²λ¦¬ μ€λ₯: {e}")
        return failure
|
|
|
|
def build_lavaan_sem_table(df, constructs, hypotheses,
                           cfa_extra_cov=None, mi_threshold=3.84, max_mods=200):
    """Fit the structural model in R/lavaan and build the hypothesis table.

    The measurement lines, structural regressions, latent names and the
    covariances carried over from the CFA are written to a temporary folder
    together with a generated R script.  The script fits the SEM and then
    adds MI-guided ``~~`` covariances (pairs of two latent variables excluded)
    until CFI >= .95 and RMSEA < .049, no candidate remains or ``max_mods``
    steps were tried.  Whenever R is unavailable, fails, or its output cannot
    be read, the call is delegated to ``_build_sem_semopy``.

    Args:
        df: DataFrame holding every indicator column.
        constructs: Dict mapping latent variable name to its indicator list.
        hypotheses: Sequence of ``(source, target)`` latent pairs; pair ``i``
            is labelled ``H{i+1}``.
        cfa_extra_cov: List of ``(v1, v2)`` tuples to include as covariances;
            anything else is ignored.
        mi_threshold: Minimum MI for a covariance to be considered.
        max_mods: Maximum number of modification steps.

    Returns:
        ``(path_df, init_fit, mod_fit, mod_log, extra_cov, was_modified,
        mi_init, mi_final)``.
    """
    import os
    import subprocess
    import tempfile

    def _fallback():
        return _build_sem_semopy(df, constructs, hypotheses, cfa_extra_cov,
                                 mi_threshold, max_mods)

    # Resolve the interpreter once: each lookup may start an Rscript process.
    rscript = _get_rscript_path()
    if rscript is None:
        return _fallback()

    all_items = [i for items in constructs.values() for i in items]
    lv_names = list(constructs.keys())
    lv_set = set(lv_names)
    item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
    data = df[all_items].dropna()

    cfa_cov_pairs = (cfa_extra_cov
                     if isinstance(cfa_extra_cov, list) and
                     (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
                     else [])

    # Group the sources of every target so each target gets one regression.
    deps = defaultdict(list)
    for s, t in hypotheses:
        deps[t].append(s)

    with tempfile.TemporaryDirectory() as tmpdir:
        def p(name):
            # R accepts forward slashes on every platform.
            return os.path.join(tmpdir, name).replace('\\', '/')

        data.to_csv(p('data.csv'), index=False)
        with open(p('meas.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(f"{lv} =~ {' + '.join(items)}" for lv, items in constructs.items()))
        with open(p('struct.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(f"{t} ~ {' + '.join(ss)}" for t, ss in deps.items()))
        with open(p('lvnames.txt'), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lv_names))
        with open(p('cfa_cov.txt'), 'w', encoding='utf-8') as f:
            for v1, v2 in cfa_cov_pairs:
                f.write(f'{v1},{v2}\n')

        r_script = f'''
suppressPackageStartupMessages(library(lavaan))
options(warn=-1)

out <- "{p("")}"
data <- read.csv(file.path(out,"data.csv"))
meas_lines <- readLines(file.path(out,"meas.txt"), encoding="UTF-8")
struct_lines <- readLines(file.path(out,"struct.txt"), encoding="UTF-8")
lv_names <- readLines(file.path(out,"lvnames.txt"),encoding="UTF-8")
cfa_cov_raw <- tryCatch(readLines(file.path(out,"cfa_cov.txt"),encoding="UTF-8"),
  error=function(e) character(0))
mi_thr <- {mi_threshold}
max_mods <- {max_mods}

build_model <- function(extra_str=character(0)) {{
  cov_lines <- character(0)
  all_covs <- c(cfa_cov_raw, extra_str)
  for (cv in all_covs) {{
    pts <- strsplit(trimws(cv),",")[[1]]
    if (length(pts)==2) {{
      ln <- paste0(" ",pts[1]," ~~ ",pts[2])
      if (!(ln %in% cov_lines)) cov_lines <- c(cov_lines, ln)
    }}
  }}
  paste(c(paste0(" ",meas_lines), cov_lines, paste0(" ",struct_lines)), collapse="\\n")
}}

get_fit <- function(fit) {{
  fm <- fitMeasures(fit, c("chisq","df","pvalue","cfi","tli","nfi","rfi","ifi","rmsea"))
  data.frame(as.list(fm))
}}
is_ok <- function(fit) {{
  fm <- fitMeasures(fit, c("cfi","rmsea"))
  unname(fm["cfi"]) >= 0.95 && unname(fm["rmsea"]) < 0.049
}}
get_mi_cov <- function(fit) {{
  m <- tryCatch(modindices(fit,sort.=TRUE,maximum.number=500),error=function(e) NULL)
  if (is.null(m)) return(data.frame(lhs=character(),rhs=character(),mi=numeric()))
  m <- m[m$op=="~~", c("lhs","rhs","mi")]
  m[!(m$lhs %in% lv_names & m$rhs %in% lv_names), ]
}}

fit0 <- tryCatch(sem(build_model(), data=data, estimator="ML"), error=function(e) NULL)
if (is.null(fit0)) {{
  writeLines("ERROR: SEM initial fit failed", file.path(out,"status.txt")); quit(status=1)
}}
write.csv(get_fit(fit0), file.path(out,"fit_init.csv"), row.names=FALSE)
mi0 <- get_mi_cov(fit0)
write.csv(mi0, file.path(out,"mi_init.csv"), row.names=FALSE)

added <- character(0); extra <- character(0); log_rows <- list()
fit_cur <- fit0

for (step in seq_len(max_mods)) {{
  if (is_ok(fit_cur)) break
  mi_c <- get_mi_cov(fit_cur)
  mi_c <- mi_c[mi_c$mi > mi_thr, ]
  if (nrow(mi_c)==0) break
  ok <- sapply(seq_len(nrow(mi_c)), function(i) {{
    pk <- paste(sort(c(mi_c$lhs[i],mi_c$rhs[i])),collapse="~~")
    !(pk %in% added)
  }})
  mi_c <- mi_c[ok,]
  if (nrow(mi_c)==0) break

  v1<-mi_c$lhs[1]; v2<-mi_c$rhs[1]; mv<-mi_c$mi[1]
  pk <- paste(sort(c(v1,v2)),collapse="~~")
  extra_new <- c(extra, paste(v1,v2,sep=","))
  fit_new <- tryCatch(sem(build_model(extra_new),data=data,estimator="ML"),error=function(e) NULL)
  added <- c(added, pk)
  if (is.null(fit_new)) next

  fm_b <- fitMeasures(fit_cur,c("cfi","rmsea","rfi","ifi"))
  fm_a <- fitMeasures(fit_new,c("cfi","rmsea","rfi","ifi"))
  log_rows[[length(log_rows)+1]] <- data.frame(
    step=step, v1=v1, v2=v2, mi=round(mv,3),
    cfi_b=round(unname(fm_b["cfi"]),3), cfi_a=round(unname(fm_a["cfi"]),3),
    rmsea_b=round(unname(fm_b["rmsea"]),3),rmsea_a=round(unname(fm_a["rmsea"]),3),
    rfi_b=round(unname(fm_b["rfi"]),3), rfi_a=round(unname(fm_a["rfi"]),3),
    stringsAsFactors=FALSE)
  extra <- extra_new
  fit_cur <- fit_new
}}

write.csv(get_fit(fit_cur), file.path(out,"fit_mod.csv"), row.names=FALSE)
write.csv(as.data.frame(standardizedSolution(fit_cur)), file.path(out,"std_sol.csv"), row.names=FALSE)
write.csv(get_mi_cov(fit_cur), file.path(out,"mi_final.csv"), row.names=FALSE)
if (length(extra)>0) writeLines(extra, file.path(out,"extra_cov.txt"))
if (length(log_rows)>0) write.csv(do.call(rbind,log_rows), file.path(out,"mod_log.csv"), row.names=FALSE)
writeLines("SUCCESS", file.path(out,"status.txt"))
'''
        with open(p('sem.R'), 'w', encoding='utf-8') as f:
            f.write(r_script)

        try:
            subprocess.run([rscript, '--vanilla', p('sem.R')],
                           capture_output=True, text=True, timeout=300)
        except Exception:
            # Includes the 5-minute timeout.
            return _fallback()

        status_path = os.path.join(tmpdir, 'status.txt')
        if not os.path.exists(status_path):
            return _fallback()
        with open(status_path, encoding='utf-8', errors='replace') as f:
            if f.read().strip() != "SUCCESS":
                # The script reported a failed fit; no result files exist.
                return _fallback()

        try:
            fit0 = _parse_fit_csv(os.path.join(tmpdir, 'fit_init.csv'))
            fit_mod = _parse_fit_csv(os.path.join(tmpdir, 'fit_mod.csv'))
            std_sol = pd.read_csv(os.path.join(tmpdir, 'std_sol.csv'))
            mi_init = _fmt_mi_df(pd.read_csv(os.path.join(tmpdir, 'mi_init.csv')))
            mf_path = os.path.join(tmpdir, 'mi_final.csv')
            mi_final = _fmt_mi_df(pd.read_csv(mf_path)) if os.path.exists(mf_path) else pd.DataFrame()
        except Exception:
            return _fallback()

        # CFA pairs first, then the pairs the R loop added (no duplicates).
        extra_cov = list(cfa_cov_pairs)
        ec_path = os.path.join(tmpdir, 'extra_cov.txt')
        if os.path.exists(ec_path):
            with open(ec_path, encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(',')
                    if len(parts) == 2:
                        t = tuple(parts)
                        if t not in extra_cov:
                            extra_cov.append(t)

        mod_log_records = []
        ml_path = os.path.join(tmpdir, 'mod_log.csv')
        if os.path.exists(ml_path):
            log_df = pd.read_csv(ml_path)
            for _, row in log_df.iterrows():
                lv1 = item_to_lv.get(str(row['v1']), str(row['v1']))
                lv2 = item_to_lv.get(str(row['v2']), str(row['v2']))
                mod_log_records.append({
                    'λ¨κ³': int(row['step']),
                    'μμ κ²½λ‘': f"{row['v1']} ~~ {row['v2']}",
                    'μμ μμΈ': lv1 if lv1 == lv2 else f'{lv1}β{lv2}',
                    'MI': row['mi'],
                    'CFI μ βν': f"{row['cfi_b']} β {row['cfi_a']}",
                    'RMSEA μ βν': f"{row['rmsea_b']} β {row['rmsea_a']}",
                })

    def _sp(v):
        """Parse a p-value cell; NaN when it is missing or not numeric."""
        try:
            return float(v) if str(v).strip() not in ('NA', '', 'nan') else np.nan
        except Exception:
            return np.nan

    # Regressions between latent variables: lhs is the outcome, rhs the predictor.
    structural = std_sol[(std_sol['op'] == '~') &
                         std_sol['lhs'].isin(lv_set) &
                         std_sol['rhs'].isin(lv_set)]
    hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in enumerate(hypotheses)}
    sources = structural['rhs'].tolist()
    targets = structural['lhs'].tolist()

    def _rounded(column):
        # Columns are addressed by name so a missing lavaan column cannot
        # shift the labels of the remaining ones.
        if column not in structural.columns:
            return [np.nan] * len(structural)
        return pd.to_numeric(structural[column], errors='coerce').round(3).tolist()

    if 'pvalue' in structural.columns:
        p_values = [_sp(v) for v in structural['pvalue']]
    else:
        p_values = [np.nan] * len(structural)

    # Plain lists keep this working when no structural path exists.
    path_df = pd.DataFrame({
        'κ°μ€': [hyp_map.get((s, t), '-') for s, t in zip(sources, targets)],
        'κ²½λ‘': [f"{s} β {t}" for s, t in zip(sources, targets)],
        'νμ€νΞ²': _rounded('est.std'),
        'SE': _rounded('se'),
        'tκ°': _rounded('z'),
        'pκ°': [fmt_p(pv) if not np.isnan(pv) else '-' for pv in p_values],
        'μ μμ±': [sig_stars(pv) for pv in p_values],
        'μ±νμ¬λΆ': [('μ±ν' if pv < 0.05 else 'κΈ°κ°') if not np.isnan(pv) else '-'
                     for pv in p_values],
    })

    was_mod = len(mod_log_records) > 0
    return (path_df, fit0, fit_mod,
            pd.DataFrame(mod_log_records), extra_cov, was_mod,
            mi_init, mi_final)
|
|
|
|
def _build_sem_semopy(df, constructs, hypotheses,
                      cfa_extra_cov=None, mi_threshold=3.84, max_mods=200):
    """Fit the structural model with semopy (fallback for the lavaan route).

    A semopy model string is assembled from the measurement model, all
    pairwise latent covariances, optional carried-over residual covariances
    and one regression per dependent latent variable. While
    ``_adequate_check`` rejects the current fit, the first candidate returned
    by ``calc_mi_approx`` whose MI exceeds ``mi_threshold`` (and that has not
    been tried yet) is added as a residual covariance and the model is refit,
    for at most ``max_mods`` rounds.

    Args:
        df: Data frame holding every indicator column named in ``constructs``.
        constructs: Mapping ``latent name -> list of indicator columns``.
        hypotheses: Sequence of ``(source, target)`` latent-variable pairs.
        cfa_extra_cov: Optional list of ``(a, b)`` tuples to include as
            residual covariances from the start; anything else is ignored.
        mi_threshold: Minimum modification index for a candidate covariance.
        max_mods: Upper bound on modification rounds (0 disables them).

    Returns:
        ``(path_df, fit_initial, fit_final, mod_log_df, extra_cov,
        was_modified, mi_initial, mi_final)``. On any failure the tuple is
        ``(None, None, None, None, [], False, <empty df>, <empty df>)``.
    """
    def _failure():
        return None, None, None, None, [], False, pd.DataFrame(), pd.DataFrame()

    try:
        from semopy import Model, calc_stats
    except ImportError:
        st.error("semopy ν¨ν€μ§κ° νμν©λλ€.")
        return _failure()

    try:
        item_to_lv = {item: lv for lv, items in constructs.items() for item in items}
        all_items = [item for items in constructs.values() for item in items]
        lv_names = set(constructs)
        data = df[all_items].dropna()

        # Only a list of tuples is accepted as carried-over covariances.
        cfa_cov_pairs = (cfa_extra_cov
                         if isinstance(cfa_extra_cov, list) and
                         (not cfa_extra_cov or isinstance(cfa_extra_cov[0], tuple))
                         else [])

        lv_keys = list(constructs)
        meas_lines = [f" {lv} =~ {' + '.join(items)}" for lv, items in constructs.items()]
        lv_cov_lines = [f" {lv_keys[i]} ~~ {lv_keys[j]}"
                        for i in range(len(lv_keys)) for j in range(i + 1, len(lv_keys))]
        meas_str = "\n".join(meas_lines + lv_cov_lines)
        var_str = "\n".join(f" {lv} ~~ 1*{lv}" for lv in constructs)

        # One regression line per dependent latent variable.
        deps = defaultdict(list)
        for s, t in hypotheses:
            deps[t].append(s)
        struct_str = "\n".join(f" {t} ~ {' + '.join(ss)}" for t, ss in deps.items())

        cov_part = ("\n" + "\n".join(f" {a} ~~ {b}" for a, b in cfa_cov_pairs)
                    if cfa_cov_pairs else "")

        def _fit_model(model_str):
            """Return a fitted semopy model, or None when fitting raises."""
            try:
                m = Model(model_str)
                m.fit(data)
                return m
            except Exception:
                return None

        def _extract_fit(m):
            """Collect fit indices from calc_stats; {} when unavailable."""
            try:
                sd = calc_stats(m).iloc[0].to_dict()
                chi2 = float(sd.get("chi2", 0))
                dof = float(sd.get("DoF", 1))
                chi2_bl = float(sd.get("chi2 Baseline", 0))
                dof_bl = float(sd.get("DoF Baseline", 1))
                pval = float(sd.get("chi2 p-value", 1))
                cfi = float(sd.get("CFI", 0))
                tli = float(sd.get("TLI", 0))
                nfi = float(sd.get("NFI", 0))
                rmsea = float(sd.get("RMSEA", 1))
                # RFI and IFI are derived here from the baseline chi-square.
                rfi = ((chi2_bl / dof_bl - chi2 / dof) / (chi2_bl / dof_bl)
                       if chi2_bl > 0 and dof_bl > 0 and dof > 0 else 0.0)
                ifi = ((chi2_bl - chi2) / (chi2_bl - dof)
                       if (chi2_bl - dof) > 0 else 0.0)
                return {"ΟΒ²": round(chi2, 3), "df": int(dof),
                        "ΟΒ²/df": round(chi2 / dof, 3) if dof > 0 else "-",
                        "p": round(pval, 3),
                        "NFI": round(nfi, 3), "RFI": round(rfi, 3),
                        "IFI": round(ifi, 3), "TLI": round(tli, 3),
                        "CFI": round(cfi, 3), "RMSEA": round(rmsea, 3)}
            except Exception:
                return {}

        def _to_float(p):
            """Parse a p-value cell; placeholders and junk become NaN."""
            try:
                return float(p) if str(p).strip() not in ("-", "", "nan") else np.nan
            except (TypeError, ValueError):
                return np.nan

        def _extract_paths(m):
            """Build the hypothesis table from the latent-to-latent regressions."""
            ins = m.inspect(std_est=True)
            std_col = next((c for c in ins.columns if "std" in c.lower()), ins.columns[-1])
            is_path = ins["lval"].isin(lv_names) & ins["rval"].isin(lv_names)
            # Keep regressions only: latent covariances/variances ("~~") are
            # not structural paths (the lavaan-based table filters the same way).
            if "op" in ins.columns:
                is_path &= ins["op"] == "~"
            struct = ins[is_path]

            hyp_map = {(s, t): f"H{i+1}" for i, (s, t) in enumerate(hypotheses)}
            p_num = [_to_float(p) for p in struct["p-value"]]

            table = pd.DataFrame(index=struct.index)
            # semopy reports "lval ~ rval": lval is the dependent variable.
            table["κ°μ€"] = [hyp_map.get((src, dst), "-")
                           for dst, src in zip(struct["lval"], struct["rval"])]
            table["κ²½λ‘"] = struct["rval"] + " β " + struct["lval"]
            for src_col, dst_col in ((std_col, "νμ€νΞ²"), ("Std. Err", "SE"), ("z-value", "tκ°")):
                if src_col in struct.columns:
                    table[dst_col] = pd.to_numeric(struct[src_col], errors="coerce").round(3)
            table["pκ°"] = [fmt_p(p) if not np.isnan(p) else "-" for p in p_num]
            table["μ μμ±"] = [sig_stars(p) for p in p_num]
            table["μ±νμ¬λΆ"] = [("μ±ν" if p < 0.05 else "κΈ°κ°") if not np.isnan(p) else "-"
                              for p in p_num]
            return table.reset_index(drop=True)

        base_str = meas_str + "\n" + var_str + cov_part + "\n" + struct_str
        base_no_var = meas_str + cov_part + "\n" + struct_str
        m0 = _fit_model(base_str)
        # Remember which variant converged so refits use the same one.
        use_var = m0 is not None
        if m0 is None:
            m0 = _fit_model(base_no_var)
        if m0 is None:
            return _failure()

        fit0 = _extract_fit(m0)
        extra_cov = list(cfa_cov_pairs)
        added_pairs = {tuple(sorted(p)) for p in cfa_cov_pairs}
        mod_log = []
        m_cur, fit_cur = m0, fit0.copy()

        for step in range(max_mods):
            if _adequate_check(fit_cur):
                break
            mi_cur = calc_mi_approx(m_cur, data, all_items)
            if mi_cur is None or mi_cur.empty:
                break
            tried = np.array([tuple(sorted((a, b))) in added_pairs
                              for a, b in zip(mi_cur["λ³μ1"], mi_cur["λ³μ2"])], dtype=bool)
            avail = mi_cur[(mi_cur["MI"] > mi_threshold) & ~tried]
            if avail.empty:
                break
            row = avail.iloc[0]
            v1, v2 = row["λ³μ1"], row["λ³μ2"]
            pair = tuple(sorted([v1, v2]))
            # Mark the pair as tried whether or not the refit succeeds.
            added_pairs.add(pair)

            new_cov = extra_cov + [(v1, v2)]
            new_cov_str = "\n".join(f" {a} ~~ {b}" for a, b in new_cov)
            parts = [meas_str] + ([var_str] if use_var else []) + [new_cov_str, struct_str]
            m_new = _fit_model("\n".join(parts))
            if m_new is None:
                continue

            fit_new = _extract_fit(m_new)
            lv1 = item_to_lv.get(v1, v1)
            lv2 = item_to_lv.get(v2, v2)
            mod_log.append({
                "λ¨κ³": step + 1,
                "μμ κ²½λ‘": f"{v1} ~~ {v2}",
                "μμ μμΈ": lv1 if lv1 == lv2 else f"{lv1}β{lv2}",
                "MI": round(row["MI"], 3),
                "CFI μ βν": f"{fit_cur.get('CFI','-')}β{fit_new.get('CFI','-')}",
                "RMSEA μ βν": f"{fit_cur.get('RMSEA','-')}β{fit_new.get('RMSEA','-')}",
            })
            extra_cov, m_cur, fit_cur = new_cov, m_new, fit_new

        path_final = _extract_paths(m_cur)
        was_mod = len(mod_log) > 0
        init_mi_sem = calc_mi_approx(m0, data, all_items)
        final_mi_sem = calc_mi_approx(m_cur, data, all_items)
        return (path_final, fit0, fit_cur,
                pd.DataFrame(mod_log), extra_cov, was_mod,
                init_mi_sem, final_mi_sem)
    except Exception as e:
        st.error(f"SEM μ€λ₯: {e}")
        return _failure()
|
|
|
|
def run_correlation(df, constructs):
    """Build the construct correlation table with sqrt(AVE) on the diagonal.

    Each construct is scored as the row mean of its items and the scores are
    correlated (rounded to 3 decimals). The diagonal is replaced by the square
    root of the AVE computed from the standardized loadings returned by
    ``run_cfa_sem``; when those are unavailable the diagonal shows "-".
    Cells above the diagonal are blanked.

    Args:
        df: Data frame containing every item column named in ``constructs``.
        constructs: Mapping ``latent name -> list of item columns``.

    Returns:
        The table as a DataFrame whose first column holds the construct
        names, or an empty DataFrame if the computation fails (the error is
        shown through ``st.error``).
    """
    try:
        composites = pd.DataFrame(
            {lv: df[items].mean(axis=1) for lv, items in constructs.items()})
        corr = composites.corr().round(3)

        sqrt_ave = {}
        try:
            ins, std_col, _, _ = run_cfa_sem(df, constructs)
            if ins is not None:
                # "LV =~ item" rows keep the latent variable in lval; without
                # such rows loadings are read as "item ~ LV" (latent in rval).
                if (ins["op"] == "=~").any():
                    lv_col, item_col = "lval", "rval"
                else:
                    lv_col, item_col = "rval", "lval"
                for lv, items in constructs.items():
                    rows = ins[ins[lv_col] == lv]
                    if item_col in ins.columns:
                        # Prefer rows that pair the latent variable with one
                        # of its own items, so covariance/regression rows
                        # naming the same latent variable cannot leak into
                        # AVE. Fall back to the old selection if none match.
                        own = rows[rows[item_col].isin(items)]
                        if not own.empty:
                            rows = own
                    lam = pd.to_numeric(rows[std_col], errors="coerce").dropna()
                    if lam.empty:
                        continue
                    ave, _ = calc_ave_cr(lam.to_numpy(dtype=float))
                    sqrt_ave[lv] = round(float(np.sqrt(ave)), 3)
        except Exception:
            # Deliberate best effort: the correlation table is still useful
            # without sqrt(AVE); missing entries are rendered as "-".
            pass

        tbl = corr.copy().astype(object)
        for lv in constructs:
            if lv in tbl.columns:
                tbl.loc[lv, lv] = sqrt_ave.get(lv, "-")
        n_cols = len(tbl.columns)
        for i in range(n_cols):
            for j in range(i + 1, n_cols):
                tbl.iloc[i, j] = ""
        return tbl.reset_index().rename(columns={"index": "λ³μ"})
    except Exception as e:
        st.error(f"μκ΄κ΄κ³ λΆμ μ€λ₯: {e}")
        return pd.DataFrame()
|
|
|
|
| |
| |
| |
|
|
def build_single_sheet_excel(sheets: dict) -> bytes:
    """Write every result table onto one worksheet and return the .xlsx bytes.

    Tables are stacked vertically: a merged title row, a styled header row,
    the data rows, an optional note row, then two blank rows.

    Args:
        sheets: Mapping whose values are sequences
            ``(title, dataframe[, note[, adopt_col]])``. Entries with a
            missing or empty dataframe are skipped. When ``adopt_col`` names
            a column, its cells are coloured green if their text equals the
            "adopted" label and red otherwise.

    Returns:
        The serialized workbook. If building fails as a whole, the bytes of
        an empty workbook are returned instead.

    Raises:
        ImportError: If openpyxl is not installed.
    """
    import io as _io

    from openpyxl import Workbook
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter

    try:
        wb = Workbook()
        ws = wb.active
        ws.title = "λΆμκ²°κ³Ό"

        _hdr_font = Font(name="Arial", bold=True, color="FFFFFF", size=10)
        _hdr_fill = PatternFill("solid", fgColor="2F5496")
        _ttl_font = Font(name="Arial", bold=True, size=12, color="2F5496")
        _note_font = Font(name="Arial", size=9, color="595959", italic=True)
        _ctr = Alignment(horizontal="center", vertical="center", wrap_text=True)
        _lft = Alignment(horizontal="left", vertical="center")
        _thin = Side(style="thin")
        _bd = Border(left=_thin, right=_thin, top=_thin, bottom=_thin)
        _adopt_fill_y = PatternFill("solid", fgColor="E2EFDA")
        _adopt_fill_n = PatternFill("solid", fgColor="FFC7CE")
        _adopt_font_y = Font(name="Arial", size=10, color="375623")
        _adopt_font_n = Font(name="Arial", size=10, color="9C0006")

        cur_row = 1
        for payload in sheets.values():
            try:
                title, df_data = payload[0], payload[1]
                note = payload[2] if len(payload) > 2 else ""
                adopt_col = payload[3] if len(payload) > 3 else None
                if df_data is None or len(df_data) == 0:
                    continue
                cols = list(df_data.columns)
                nc = max(len(cols), 1)
                adopt_idx = (cols.index(adopt_col) + 1
                             if adopt_col and adopt_col in cols else None)

                # Title row, merged across the table width.
                if nc > 1:
                    ws.merge_cells(start_row=cur_row, start_column=1,
                                   end_row=cur_row, end_column=nc)
                ws.cell(row=cur_row, column=1, value=title).font = _ttl_font
                cur_row += 1

                # Header row.
                for ci, col in enumerate(cols, 1):
                    c = ws.cell(row=cur_row, column=ci, value=str(col))
                    c.font = _hdr_font
                    c.fill = _hdr_fill
                    c.alignment = _ctr
                    c.border = _bd
                cur_row += 1

                # Data rows.
                for row in df_data.itertuples(index=False):
                    for ci, val in enumerate(row, 1):
                        # NaN is the only float that differs from itself.
                        safe_val = "" if isinstance(val, float) and (val != val) else val
                        cell = ws.cell(row=cur_row, column=ci)
                        try:
                            cell.value = safe_val
                        except ValueError:
                            # openpyxl rejects types it cannot convert; store
                            # the text form rather than dropping the table.
                            cell.value = str(safe_val)
                        cell.alignment = _lft if ci <= 2 else _ctr
                        cell.border = _bd
                        if adopt_idx and ci == adopt_idx:
                            is_y = str(val) == "μ±ν"
                            cell.fill = _adopt_fill_y if is_y else _adopt_fill_n
                            cell.font = _adopt_font_y if is_y else _adopt_font_n
                    cur_row += 1

                if note:
                    ws.cell(row=cur_row, column=1, value=note).font = _note_font
                    cur_row += 1
                cur_row += 2
            except Exception:
                # Best-effort export: a malformed payload must not block the
                # other tables. Continue below anything already written so
                # the next table cannot overwrite a partial one.
                if ws.max_row >= cur_row:
                    cur_row = ws.max_row + 3

        for col_cells in ws.columns:
            width = max((len(str(c.value)) for c in col_cells if c.value is not None),
                        default=0)
            # The first cell of columns 2+ belongs to a merged title row and
            # is a MergedCell without `column_letter`; derive the letter from
            # the column index, which every cell type carries.
            letter = get_column_letter(col_cells[0].column)
            ws.column_dimensions[letter].width = min(width + 4, 45)

        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
    except Exception:
        # Last resort: hand back a valid, empty workbook.
        wb = Workbook()
        buf = _io.BytesIO()
        wb.save(buf)
        return buf.getvalue()
|
|
|
|
| |
| |
| |
| # Page header: title and subtitle rendered as raw HTML using the CSS classes defined earlier. |
| st.markdown('<p class="main-title">π SEM λΆμκΈ°</p>', unsafe_allow_html=True) |
| st.markdown('<p class="sub-title">λΉλλΆμ Β· κΈ°μ ν΅κ³ Β· μ λ’°λ Β· CFA Β· μκ΄κ΄κ³ Β· SEM</p>', |
| unsafe_allow_html=True) |


| # STEP 1: file upload. The script stops here until a file has been provided. |
| st.markdown("### π STEP 1. λ°μ΄ν° μ
λ‘λ") |
| uploaded = st.file_uploader("Excel(.xlsx/.xls) λλ CSV νμΌ", type=["xlsx","xls","csv"]) |
| if not uploaded: |
| st.info("νμΌμ μ
λ‘λνλ©΄ λΆμμ΄ μμλ©λλ€.") |
| st.stop() |


| # The reader is chosen from the file extension; any read error is shown and ends the run. |
| try: |
| if uploaded.name.lower().endswith(".csv"): |
| df = pd.read_csv(uploaded) |
| else: |
| df = pd.read_excel(uploaded) |
| except Exception as e: |
| st.error(f"νμΌ μ½κΈ° μ€λ₯: {e}") |
| st.stop() |


| # Convert a column to numeric only when no non-null value would be lost by the coercion. |
| for c in df.columns: |
| try: |
| converted = pd.to_numeric(df[c], errors="coerce") |
| if converted.notna().sum() == df[c].notna().sum() and df[c].notna().sum() > 0: |
| df[c] = converted |
| except Exception: |
| pass |


| # Report the loaded shape and offer a preview of the first 10 rows. |
| st.success(f"β
 νμΌ λ‘λ μλ£ β {len(df)}ν Γ {len(df.columns)}μ΄") |
| with st.expander("π λ°μ΄ν° 미리보기 (μμ 10ν)"): |
| st.dataframe(df.head(10), use_container_width=True) |
|
|
| |
| # Outlier section: one tab for Mahalanobis-distance row screening, one for SMC variable screening. |
| st.markdown("---") |
| st.markdown("### π STEP 1-5. μ΄μμΉ νμ§ (λ§ν λΌλ
ΈλΉμ€ 거리 + SMC)") |


| num_cols_all = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] |
| tab_maha, tab_smc = st.tabs(["π λ§ν λΌλ
ΈλΉμ€ 거리", "π SMC (λ€μ€μκ΄μμΉ)"]) |


| # Mahalanobis tab: detection runs on button press; results are kept in session state |
| # (oc_result / oc_cutoff / oc_idx) so they survive Streamlit reruns. |
| with tab_maha: |
| st.caption("ΟΒ² λΆν¬ κΈ°μ€μΌλ‘ λ§ν λΌλ
ΈλΉμ€ κ±°λ¦¬κ° μκ³κ°μ μ΄κ³Όνλ **νλ³Έ(ν)**μ μ΄μμΉλ‘ νμ§ν©λλ€.") |
| col_a, col_b = st.columns([3, 1]) |
| outlier_cols = col_a.multiselect("λΆμ λ³μ μ ν", num_cols_all, default=num_cols_all, key="oc_cols") |
| p_thr = col_b.selectbox("μ μμμ€", [0.001, 0.005, 0.01, 0.05], index=0, key="oc_pthr") |
| if outlier_cols and st.button("π λ§ν λΌλ
ΈλΉμ€ νμ§ μ€ν", key="oc_run"): |
| oc_result, oc_cutoff, oc_idx = detect_outliers_mahalanobis(df, outlier_cols, p_threshold=p_thr) |
| st.session_state["oc_result"] = oc_result |
| st.session_state["oc_cutoff"] = oc_cutoff |
| st.session_state["oc_idx"] = oc_idx |
| # A fresh detection discards the previous row-removal selection. |
| st.session_state.pop("oc_remove_rows", None) |
| if "oc_result" in st.session_state and st.session_state["oc_result"] is not None: |
| oc_result = st.session_state["oc_result"] |
| oc_cutoff = st.session_state["oc_cutoff"] |
| oc_idx = st.session_state["oc_idx"] |
| n_out = len(oc_idx) |
| st.markdown(f"**ΟΒ² μκ³κ°:** `{oc_cutoff}`") |
| col_r1, col_r2, col_r3 = st.columns(3) |
| col_r1.metric("μ 체 νλ³Έ", len(df)) |
| col_r2.metric("νμ§λ μ΄μμΉ", n_out, delta=f"-{n_out}" if n_out > 0 else "μμ", delta_color="inverse") |
| col_r3.metric("μ κ±° ν νλ³Έ", len(df) - n_out) |
| # Full result table, sorted by distance, with the boolean flag column replaced by a label column. |
| disp_all = oc_result.sort_values("λ§ν λΌλ
ΈλΉμ€ 거리", ascending=False).copy() |
| disp_all["νμ "] = disp_all["μ΄μμΉ"].map({True: "β οΈ μ΄μμΉ", False: "β
 μ μ"}) |
| disp_all = disp_all.drop(columns=["μ΄μμΉ"]) |
| with st.expander("π λ§ν λΌλ
ΈλΉμ€ μ 체 κ²°κ³Ό", expanded=(n_out > 0)): |
| st.dataframe(disp_all, use_container_width=True, hide_index=True) |
| if n_out > 0: |
| outlier_row_nums = (oc_result[oc_result["μ΄μμΉ"] == True]["μλ³Έ νλ²νΈ"].tolist() |
| if "μ΄μμΉ" in oc_result.columns else []) |
| # Defaults to all flagged rows, or the previous selection restricted to rows still flagged. |
| _default_rows = [r for r in st.session_state.get("oc_remove_rows", outlier_row_nums) |
| if r in outlier_row_nums] |
| st.multiselect(f"ποΈ μ κ±°ν νλ³Έ μ ν (μ΄ {n_out}κ° νμ§)", options=outlier_row_nums, |
| default=_default_rows, key="oc_remove_rows") |
| else: |
| st.success("β
 μ΄μμΉκ° νμ§λμ§ μμμ΅λλ€.") |
|
|
| # SMC tab: compute_smc runs on button press; variables with SMC below 0.2 are offered for removal. |
| with tab_smc: |
| st.caption("κ° λ³μλ₯Ό λλ¨Έμ§ λ³μλ‘ νκ·λΆμν RΒ²(SMC) < .20 μ΄λ©΄ λ³μ μ κ±°λ₯Ό κ³ λ €ν©λλ€.") |
| smc_cols = st.multiselect("SMC λΆμ λ³μ μ ν", num_cols_all, default=num_cols_all, key="smc_cols") |
| if smc_cols and st.button("π SMC νμ§ μ€ν", key="smc_run"): |
| with st.spinner("SMC κ³μ° μ€..."): |
| smc_result = compute_smc(df, smc_cols) |
| st.session_state["smc_result"] = smc_result |
| # A fresh computation discards the previous variable-drop selection. |
| st.session_state.pop("smc_drop_vars", None) |
| if "smc_result" in st.session_state and st.session_state["smc_result"] is not None: |
| smc_result = st.session_state["smc_result"] |
| low_smc = smc_result[smc_result["SMC"].notna() & (smc_result["SMC"] < 0.2)] |
| n_low = len(low_smc) |
| col_s1, col_s2 = st.columns(2) |
| col_s1.metric("λΆμ λ³μ μ", len(smc_result)) |
| col_s2.metric("SMC < .20 λ³μ", n_low) |
| with st.expander("π SMC λΆμ κ²°κ³Ό", expanded=(n_low > 0)): |
| st.dataframe(smc_result.sort_values("SMC").reset_index(drop=True), |
| use_container_width=True, hide_index=True) |
| if n_low > 0: |
| low_vars = low_smc["λ³μ"].tolist() |
| # Options list every analysed variable; only the low-SMC ones are preselected by default. |
| _default_vars = [v for v in st.session_state.get("smc_drop_vars", low_vars) |
| if v in smc_result["λ³μ"].tolist()] |
| st.multiselect(f"ποΈ μ κ±°ν λ³μ μ ν (SMC < .20, μ΄ {n_low}κ°)", |
| options=smc_result["λ³μ"].tolist(), |
| default=_default_vars, key="smc_drop_vars") |
| else: |
| st.success("β
 SMC < .20 λ³μκ° μμ΅λλ€.") |
|
|
| |
| # Apply the selections made in the two outlier tabs to the working data frame. |
| _removed_rows = st.session_state.get("oc_remove_rows", []) |
| _dropped_vars = st.session_state.get("smc_drop_vars", []) |
| if _removed_rows: |
| # Selected values are shifted by -1 before being matched against df.index |
| # (presumably 1-based row numbers produced by the detector; confirm there). |
| _remove_idx = [int(r) - 1 for r in _removed_rows] |
| _valid_idx = [i for i in _remove_idx if i in df.index] |
| if _valid_idx: |
| df = df.drop(index=_valid_idx).reset_index(drop=True) |
| st.info(f"ποΈ μ΄μμΉ **{len(_valid_idx)}κ° νλ³Έ** μ κ±° β λΆμ νλ³Έ: **{len(df)}κ°**") |
| if _dropped_vars: |
| # Only columns still present in df are dropped. |
| _actual_drop = [v for v in _dropped_vars if v in df.columns] |
| if _actual_drop: |
| df = df.drop(columns=_actual_drop) |
| st.info(f"ποΈ SMC λ³μ **{len(_actual_drop)}κ°** μ κ±°: `{', '.join(_actual_drop)}`") |
|
|
| |
| # STEP 2: constructs are proposed by auto_detect_constructs and can be edited by hand below. |
| st.markdown("---") |
| st.markdown("### π STEP 2. ꡬμ±κ°λ
 μλ νμ§") |
| constructs_auto = auto_detect_constructs(df) |
| c1, c2, c3 = st.columns(3) |
| c1.metric("μ 체 λ³μ", len(df.columns)) |
| c2.metric("μ ν¨ μ¬λ‘", len(df.dropna())) |
| c3.metric("νμ§λ ꡬμ±κ°λ
", len(constructs_auto)) |


| if constructs_auto: |
| for lv, items in constructs_auto.items(): |
| st.markdown(f"- **{lv}** ({len(items)}λ¬Έν): {', '.join(items)}") |
| else: |
| st.warning("μλ νμ§λ ꡬμ±κ°λ
μ΄ μμ΅λλ€. μλμμ μ§μ μ€μ ν΄ μ£ΌμΈμ.") |


| # Manual editor: one name input and one item multiselect per construct, prefilled from the |
| # auto-detected result. A construct is kept only with a non-empty name and at least two items; |
| # a repeated name overwrites the earlier entry because the result is a dict. |
| with st.expander("βοΈ κ΅¬μ±κ°λ
 μ§μ νΈμ§ (νμ μ)"): |
| n_c = st.number_input("ꡬμ±κ°λ
 μ", min_value=1, max_value=20, |
| value=max(len(constructs_auto), 1), key="n_constructs") |
| auto_k = list(constructs_auto.keys()) |
| auto_v = list(constructs_auto.values()) |
| constructs_edit = {} |
| for i in range(int(n_c)): |
| ca, cb = st.columns([1, 3]) |
| default_name = auto_k[i] if i < len(auto_k) else f"LV{i+1}" |
| default_items = auto_v[i] if i < len(auto_v) else [] |
| nm = ca.text_input(f"μ΄λ¦ {i+1}", value=default_name, key=f"cn_{i}") |
| it = cb.multiselect(f"λ¬Έν {i+1}", df.columns.tolist(), default=default_items, key=f"ci_{i}") |
| if nm and len(it) >= 2: |
| constructs_edit[nm] = it |
| constructs = constructs_edit if constructs_edit else constructs_auto |


| # Falls back to the auto-detected constructs when the edited result is empty. |
| if not constructs: |
| constructs = constructs_auto |
|
|
| |
| # STEP 3: each analysis gets a checkbox whose default comes from simple data checks. |
| st.markdown("---") |
| st.markdown("### β
 STEP 3. λΆμ λ°©λ² μ ν") |


| num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] |
| cat_cols = [c for c in df.columns if detect_scale(df[c]) in ("binary","categorical")] |
| lv_list = list(constructs.keys()) |
| # Number of rows without any missing value. |
| n_valid = len(df.dropna()) |


| # "ok" is the default checkbox state; "reason" is the text shown next to the method name. |
| # The CFA default requires >= 2 constructs and N >= 100; the SEM default >= 3 constructs and N >= 200. |
| suggestions = { |
| "λΉλλΆμ": {"ok": len(cat_cols) > 0, |
| "reason": f"λ²μ£Όν λ³μ {len(cat_cols)}κ° κ°μ§"}, |
| "κΈ°μ ν΅κ³": {"ok": len(num_cols) > 0, |
| "reason": f"μμΉν λ³μ {len(num_cols)}κ°"}, |
| "μ λ’°λ (Cronbach's Ξ±)": {"ok": len(constructs) > 0, |
| "reason": f"ꡬμ±κ°λ
 {len(constructs)}κ° νμ§"}, |
| "νμΈμ μμΈλΆμ (CFA)": {"ok": len(constructs) >= 2 and n_valid >= 100, |
| "reason": f"ꡬμ±κ°λ
 {len(constructs)}κ°, N={n_valid}"}, |
| "μκ΄κ΄κ³ λΆμ": {"ok": len(constructs) >= 2, |
| "reason": f"μ μ¬λ³μ {len(constructs)}κ°"}, |
| "ꡬ쑰방μ μ (SEM)": {"ok": len(constructs) >= 3 and n_valid >= 200, |
| "reason": f"ꡬμ±κ°λ
 {len(constructs)}κ°, N={n_valid}"}, |
| } |


| # The user may tick any method regardless of the suggestion; `selected` maps method name to bool. |
| selected = {} |
| for method, info in suggestions.items(): |
| icon = "β
" if info["ok"] else "β οΈ" |
| col_a, col_b = st.columns([1, 10]) |
| selected[method] = col_a.checkbox(f"{icon}", value=info["ok"], key=f"sel_{method}") |
| col_b.markdown( |
| f'<div class="card"><b>{method}</b>' |
| f' <span style="color:#888;font-size:.83rem">{info["reason"]}</span></div>', |
| unsafe_allow_html=True) |
|
|
| |
| # SEM hypothesis editor, shown only when SEM is ticked and at least two latent variables exist. |
| # The rows live in st.session_state.hyps as (source, target) tuples. |
| hypotheses = [] |
| if selected.get("ꡬ쑰방μ μ (SEM)") and len(lv_list) >= 2: |
| st.markdown("---\n#### π SEM κ°μ€ μ€μ ") |
| if "hyps" not in st.session_state: |
| st.session_state.hyps = [("", "")] |
| if st.button("οΌ κ°μ€ μΆκ°"): |
| st.session_state.hyps.append(("", "")) |
| to_delete = None |
| for i, (sd, td) in enumerate(st.session_state.hyps): |
| ca, cb, cc = st.columns([3, 3, 1]) |
| # Stored values that are no longer valid latent names fall back to the first option. |
| s = ca.selectbox(f"H{i+1} λ
립λ³μ", lv_list, |
| index=lv_list.index(sd) if sd in lv_list else 0, key=f"hs_{i}") |
| t = cb.selectbox(f"H{i+1} μ’
μλ³μ", lv_list, |
| index=lv_list.index(td) if td in lv_list else 0, key=f"ht_{i}") |
| if cc.button("μμ ", key=f"hd_{i}"): |
| to_delete = i |
| # A row whose source equals its target is kept in the editor but not used as a hypothesis. |
| if s != t: |
| hypotheses.append((s, t)) |
| st.session_state.hyps[i] = (s, t) |
| # The last remaining row cannot be deleted; after a deletion the script reruns. |
| if to_delete is not None and len(st.session_state.hyps) > 1: |
| st.session_state.hyps.pop(to_delete) |
| st.rerun() |
|
|
| |
| # Run button: a press clears the cached CFA/SEM state so that the analyses start fresh. |
| st.markdown("---") |
| _run_btn = st.button("π λΆμ μ€ν", type="primary", use_container_width=True) |


| if _run_btn: |
| for _k in ["_sem_applied", "_sem_init", "_sem_manual_pairs", |
| "_sem_df", "_sem_constructs", "_sem_hypotheses", "_sem_cfa_cov", |
| "_cfa_applied", "_cfa_init", "_cfa_df", "_cfa_constructs", |
| "_cfa_manual_pairs"]: |
| st.session_state.pop(_k, None) |


| # Nothing to display: the button was not pressed and no earlier results are cached. |
| if not _run_btn and "_tab_data" not in st.session_state: |
| st.stop() |
|
|
| # Without a fresh run, the cached tabs are restored from session state, and results from a |
| # user-triggered CFA/SEM rerun ("_cfa_applied" / "_sem_applied") replace the stored ones. |
| # Otherwise the result containers are reset. |
| # NOTE(review): indentation is lost in this copy; the analysis sections after the reset appear |
| # to belong to the else branch (cfa_extra_cov is only defined there) - confirm against the file. |
| if not _run_btn and "_tab_data" in st.session_state: |
| tab_data = st.session_state["_tab_data"] |
| tab_labels = st.session_state["_tab_labels"] |
| xlsx_sheets = st.session_state.get("_xlsx_sheets", {}) |
| if "_cfa_applied" in st.session_state and "CFA" in tab_data: |
| tab_data["CFA"] = st.session_state["_cfa_applied"] |
| _lds, _rd, _fi, _fm, _im, _fmi, _ml, _wm, _ec = tab_data["CFA"] |
| if _lds is not None: |
| xlsx_sheets["ν2_CFA"] = ("CFA β μμΈλΆνλ", _lds, "β» νμ€νκ³μβ₯.50, AVEβ₯.50, CRβ₯.70") |
| xlsx_sheets["ν2_μ λ’°λ"] = ("CFA β AVE/CR/Ξ±", _rd, "") |
| if _wm and not _ml.empty: |
| xlsx_sheets["CFA_μμ κ³Όμ "] = ("CFA β MI κΈ°λ° μμ κ³Όμ ", _ml, "β» MI>3.84 κΈ°μ€") |
| if "_sem_applied" in st.session_state and "SEM" in tab_data: |
| tab_data["SEM"] = st.session_state["_sem_applied"] |
| _sp, _fi, _fm, _ml, _ec, _wm, _im, _fm2 = tab_data["SEM"] |
| if _sp is not None: |
| xlsx_sheets["ν4_κ°μ€κ²μ¦"] = ( |
| "SEM κ°μ€ κ²μ¦", _sp, |
| "β» ***p<.001 **p<.01 *p<.05 n.s.=μ μνμ§μμ", "μ±νμ¬λΆ") |
| if _wm and not _ml.empty: |
| xlsx_sheets["SEM_μμ κ³Όμ "] = ("SEM β MI κΈ°λ° μμ κ³Όμ ", _ml, "β» MI>3.84 κΈ°μ€") |
| else: |
| xlsx_sheets = {} |
| tab_labels = [] |
| tab_data = {} |
| cfa_extra_cov = [] |


| # Frequency tables (count, percent, cumulative percent) for at most the first 10 categorical columns. |
| if selected.get("λΉλλΆμ") and cat_cols: |
| try: |
| freq_res = {} |
| for c in cat_cols[:10]: |
| f = df[c].value_counts().sort_index() |
| pct = (f / f.sum() * 100).round(1) |
| fdf = pd.DataFrame({"λΉλ": f, "λ°±λΆμ¨(%)": pct, "λμ (%)": pct.cumsum().round(1)}) |
| fdf.index.name = c |
| freq_res[c] = fdf |
| tab_data["λΉλλΆμ"] = freq_res |
| tab_labels.append("λΉλλΆμ") |
| for cn, fdf in freq_res.items(): |
| # Keys are cut to 31 characters. |
| xlsx_sheets[f"λΉλ_{cn}"[:31]] = (f"λΉλλΆμ β {cn}", fdf.reset_index(), "") |
| except Exception as e: |
| st.warning(f"λΉλλΆμ μ€λ₯: {e}") |


| # Descriptive statistics for the numeric columns: describe() plus skewness and kurtosis. |
| if selected.get("κΈ°μ ν΅κ³") and num_cols: |
| try: |
| desc = df[num_cols].describe().T[["count","mean","std","min","max"]].copy() |
| desc.columns = ["N","νκ· ","νμ€νΈμ°¨","μ΅μκ°","μ΅λκ°"] |
| desc["μλ"] = df[num_cols].skew() |
| desc["첨λ"] = df[num_cols].kurt() |
| desc = desc.round(3) |
| tab_data["κΈ°μ ν΅κ³"] = desc |
| tab_labels.append("κΈ°μ ν΅κ³") |
| xlsx_sheets["κΈ°μ ν΅κ³"] = ("κΈ°μ ν΅κ³", desc.reset_index().rename(columns={"index":"λ³μ"}), "") |
| except Exception as e: |
| st.warning(f"κΈ°μ ν΅κ³ μ€λ₯: {e}") |


| # Reliability: Cronbach's alpha per construct, judged against the 0.7 threshold. |
| if selected.get("μ λ’°λ (Cronbach's Ξ±)") and constructs: |
| try: |
| rel_rows = [] |
| for lv, items in constructs.items(): |
| alpha = cronbach_alpha(df[items]) |
| rel_rows.append({"ꡬμ±κ°λ
": lv, "λ¬Έν μ": len(items), |
| "Cronbach Ξ±": alpha, |
| "νμ ": "μΆ©μ‘± β" if (not np.isnan(alpha) and alpha >= 0.7) else "λ―ΈμΆ©μ‘± β"}) |
| rel_df = pd.DataFrame(rel_rows) |
| tab_data["μ λ’°λ"] = rel_df |
| tab_labels.append("μ λ’°λ") |
| xlsx_sheets["μ λ’°λλΆμ"] = ("μ λ’°λ λΆμ (Cronbach's Ξ±)", rel_df, "β» Ξ± β₯ .70 κΆμ₯") |
| except Exception as e: |
| st.warning(f"μ λ’°λ λΆμ μ€λ₯: {e}") |


| # CFA: initial model only (max_mods=0). The result, data and constructs are cached in session |
| # state, where the results view reads them back when the user asks for a CFA rerun. |
| if selected.get("νμΈμ μμΈλΆμ (CFA)") and constructs: |
| with st.spinner("CFA μ΄κΈ° λΆμ μ€..."): |
| try: |
| loads, rel_df, fit_init, fit_mod, init_mi, final_mi, mod_log, was_mod, extra_cov = \ |
| build_cfa_tables(df, constructs, max_mods=0) |
| if loads is not None: |
| st.session_state["_cfa_init"] = (loads, rel_df, fit_init, fit_mod, |
| init_mi, final_mi, mod_log, was_mod, extra_cov) |
| st.session_state["_cfa_df"] = df |
| st.session_state["_cfa_constructs"] = constructs |
| _cfa_display = st.session_state.get("_cfa_applied", st.session_state["_cfa_init"]) |
| tab_data["CFA"] = _cfa_display |
| tab_labels.append("CFA") |
| _lds, _rd, _fi, _fm, _im, _fmi, _ml, _wm, _ec = _cfa_display |
| # Covariances added in the CFA are handed on to the SEM step below. |
| cfa_extra_cov = list(_ec) if _ec else [] |
| if _lds is not None: |
| xlsx_sheets["ν2_CFA"] = ("CFA β μμΈλΆνλ", _lds, |
| "β» νμ€νκ³μβ₯.50, AVEβ₯.50, CRβ₯.70") |
| xlsx_sheets["ν2_μ λ’°λ"] = ("CFA β AVE/CR/Ξ±", _rd, "") |
| if _wm and not _ml.empty: |
| xlsx_sheets["CFA_μμ κ³Όμ "] = ("CFA β MI κΈ°λ° μμ κ³Όμ ", _ml, "β» MI>3.84 κΈ°μ€") |
| if not _im.empty: |
| xlsx_sheets["CFA_MIμ΄κΈ°"] = ("CFA β μ΄κΈ° μμ μ§μ(MI)", _im.head(20), "") |
| if not _fmi.empty: |
| xlsx_sheets["CFA_MIμ΅μ’
"] = ("CFA β μ΅μ’
 μμ μ§μ(MI)", _fmi.head(20), "") |
| else: |
| st.warning("β οΈ CFA κ²°κ³Ό μμ± μ€ν¨.") |
| except Exception as e: |
| st.warning(f"CFA μ€λ₯: {e}") |


| # Correlation table from run_correlation; an empty result adds no tab. |
| if selected.get("μκ΄κ΄κ³ λΆμ") and constructs: |
| try: |
| corr_tbl = run_correlation(df, constructs) |
| if not corr_tbl.empty: |
| tab_data["μκ΄κ΄κ³"] = corr_tbl |
| tab_labels.append("μκ΄κ΄κ³") |
| xlsx_sheets["ν3_μκ΄κ΄κ³"] = ( |
| "μκ΄κ΄κ³ λΆμ (λκ°μ =βAVE)", corr_tbl, |
| "β» λκ°μ : βAVE | νλ³νλΉλ κΈ°μ€: βAVE > ν λ³μ μκ΄κ³μ") |
| except Exception as e: |
| st.warning(f"μκ΄κ΄κ³ μ€λ₯: {e}") |


| # SEM: needs at least one hypothesis. Initial model only (max_mods=0); inputs and result are |
| # cached in session state under the "_sem_*" keys. |
| if selected.get("ꡬ쑰방μ μ (SEM)") and constructs and hypotheses: |
| with st.spinner("SEM μ΄κΈ° λΆμ μ€..."): |
| try: |
| sem_paths, fit_init_sem, fit_mod_sem, mod_log_sem, extra_cov_sem, was_mod_sem, \ |
| init_mi_sem, final_mi_sem = \ |
| build_lavaan_sem_table(df, constructs, hypotheses, |
| cfa_extra_cov=cfa_extra_cov, max_mods=0) |
| if sem_paths is not None: |
| st.session_state["_sem_init"] = ( |
| sem_paths, fit_init_sem, fit_mod_sem, |
| mod_log_sem, extra_cov_sem, was_mod_sem, |
| init_mi_sem, final_mi_sem) |
| st.session_state["_sem_df"] = df |
| st.session_state["_sem_constructs"] = constructs |
| st.session_state["_sem_hypotheses"] = hypotheses |
| st.session_state["_sem_cfa_cov"] = cfa_extra_cov |
| _sem_display = st.session_state.get("_sem_applied", st.session_state["_sem_init"]) |
| tab_data["SEM"] = _sem_display |
| tab_labels.append("SEM") |
| _sp, _fi, _fm, _ml, _ec, _wm, _im, _fm2 = _sem_display |
| if _sp is not None: |
| # The fourth payload element names the column that build_single_sheet_excel colour-codes. |
| xlsx_sheets["ν4_κ°μ€κ²μ¦"] = ( |
| "SEM κ°μ€ κ²μ¦", _sp, |
| "β» ***p<.001 **p<.01 *p<.05 n.s.=μ μνμ§μμ", "μ±νμ¬λΆ") |
| if _wm and not _ml.empty: |
| xlsx_sheets["SEM_μμ κ³Όμ "] = ( |
| "SEM β MI κΈ°λ° μμ κ³Όμ ", _ml, "β» MI>3.84 κΈ°μ€") |
| else: |
| st.warning("β οΈ SEM κ²°κ³Όλ₯Ό μμ±νμ§ λͺ»νμ΅λλ€.") |
| except Exception as e: |
| st.warning(f"SEM μ€λ₯: {e}") |


| # Cache everything so later reruns can redraw the results without recomputing. |
| st.session_state["_tab_data"] = tab_data |
| st.session_state["_tab_labels"] = tab_labels |
| st.session_state["_xlsx_sheets"] = xlsx_sheets |
|
|
| |
| # Results area: one tab per analysis that produced output; stop if there is none. |
| st.markdown("---") |
| st.markdown("## π λΆμ κ²°κ³Ό") |


| if not tab_labels: |
| st.warning("μ€νλ λΆμμ΄ μμ΅λλ€.") |
| st.stop() |


| # Caption text listing the recommended fit-index thresholds (shown under the SEM fit table). |
| FIT_NOTE = "κΆμ₯ κΈ°μ€: NFIΒ·RFIΒ·IFIΒ·TLIΒ·CFI β₯ .90 | RMSEA < .049 | SRMR β€ .08" |
| tabs = st.tabs(tab_labels) |
|
|
| for tab, lbl in zip(tabs, tab_labels): |
| with tab: |
| c = tab_data[lbl] |
|
|
| if lbl == "λΉλλΆμ": |
| for col, fdf in c.items(): |
| st.markdown(f"**{col}**") |
| st.dataframe(fdf, use_container_width=True) |
|
|
| elif lbl == "κΈ°μ ν΅κ³": |
| st.dataframe(c, use_container_width=True) |
|
|
| elif lbl == "μ λ’°λ": |
| st.dataframe(c, use_container_width=True) |
| st.caption("Ξ± β₯ .70: λ΄μ μΌκ΄μ± ν보") |
|
|
| elif lbl == "CFA": |
| loads, rel_df, fit_init, fit_mod, init_mi, final_mi, mod_log, was_mod, extra_cov = c |
|
|
| crit_df = pd.DataFrame([ |
| {"μ§μ": "NFI", "κΆμ₯κΈ°μ€": "β₯ .90"}, |
| {"μ§μ": "RFI", "κΆμ₯κΈ°μ€": "β₯ .90"}, |
| {"μ§μ": "IFI", "κΆμ₯κΈ°μ€": "β₯ .90"}, |
| {"μ§μ": "TLI", "κΆμ₯κΈ°μ€": "β₯ .90"}, |
| {"μ§μ": "CFI", "κΆμ₯κΈ°μ€": "β₯ .90"}, |
| {"μ§μ": "RMSEA", "κΆμ₯κΈ°μ€": "< .049"}, |
| {"μ§μ": "SRMR", "κΆμ₯κΈ°μ€": "β€ .08"}, |
| ]) |
| with st.expander("π μ ν©λ κΆμ₯ κΈ°μ€"): |
| st.dataframe(crit_df, use_container_width=True, hide_index=True) |
|
|
| st.markdown("**λͺ¨λΈ μ ν©λ λΉκ΅**") |
| fit_col_order = ["ΟΒ²", "df", "ΟΒ²/df", "p", "NFI", "RFI", "IFI", "TLI", "CFI", "RMSEA"] |
|
|
| def _fit_row(label, fit): |
| row = {"λͺ¨ν": label} |
| for k in fit_col_order: |
| row[k] = fit.get(k, "-") |
| return row |
|
|
| if was_mod: |
| fit_cmp = pd.DataFrame([_fit_row("μ΄κΈ° λͺ¨ν", fit_init), _fit_row("μμ λͺ¨ν", fit_mod)]) |
| else: |
| fit_cmp = pd.DataFrame([_fit_row("μ΄κΈ° λͺ¨ν", fit_init)]) |
|
|
| def _hl_fit(row): |
| colors = [""] * len(row) |
| checks = {"NFI":.90,"RFI":.90,"IFI":.90,"TLI":.90,"CFI":.90} |
| cols = list(row.index) |
| for idx, col in enumerate(cols): |
| v = row[col] |
| try: |
| fv = float(v) |
| if col in checks and fv < checks[col]: |
| colors[idx] = "background-color:#FFE0E0" |
| elif col == "RMSEA" and fv >= 0.049: |
| colors[idx] = "background-color:#FFE0E0" |
| elif col in checks and fv >= checks[col]: |
| colors[idx] = "background-color:#E8F5E9" |
| elif col == "RMSEA" and fv < 0.049: |
| colors[idx] = "background-color:#E8F5E9" |
| except Exception: |
| pass |
| return colors |
|
|
| st.dataframe(fit_cmp.style.apply(_hl_fit, axis=1), use_container_width=True) |
| st.caption("π’ κΈ°μ€ μΆ©μ‘± | π΄ κΈ°μ€ λ―ΈμΆ©μ‘± | RMSEA < .049 κΆμ₯") |
|
|
| if was_mod and extra_cov: |
| st.success(f"β
MI κΈ°λ° λͺ¨νμμ {len(extra_cov)}ν μ μ©") |
| paths_str = " / ".join(f"{a} ~~ {b}" for a, b in extra_cov) |
| st.markdown(f"**μμ λ κ²½λ‘:** `{paths_str}`") |
| with st.expander("π μμ κ³Όμ λ¨κ³λ³ μμΈ"): |
| st.dataframe(mod_log, use_container_width=True) |
| elif not was_mod: |
| all_ok = _adequate_check(fit_init) |
| if all_ok: |
| st.success("β
μ΄κΈ° λͺ¨ν μ ν©λ μνΈ β μμ λΆνμ") |
| else: |
| st.info("βΉοΈ μ ν©λ λ―ΈμΆ©μ‘±μ΄λ MI > 3.84 μμ΄ μμ΄ λ μ΄μ μμ λΆκ°") |
|
|
| |
| _cfa_init_mi = st.session_state.get("_cfa_init", (None,)*9)[4] |
| if _cfa_init_mi is not None and not _cfa_init_mi.empty: |
| _cfa_mi_over = _cfa_init_mi[_cfa_init_mi["MI"] > 3.84] |
| if not _cfa_mi_over.empty: |
| st.markdown("---") |
| st.markdown("#### π οΈ μ차곡λΆμ° μλ μΆκ°") |
| st.caption("MI > 3.84 μ μ€ μ μ©ν νλͺ©μ μ νν λ€ **μ¬μ€ν** λ²νΌμ λλ¬μ£ΌμΈμ.") |
| _cfa_opts = [f"{row['λ³μ1']} ~~ {row['λ³μ2']} (MI = {row['MI']})" |
| for _, row in _cfa_mi_over.iterrows()] |
| _cfa_cur_pairs = st.session_state.get("_cfa_manual_pairs", []) |
| _cfa_cur_opts = [o for o in _cfa_opts |
| if any(f"{v1} ~~ {v2}" in o for v1, v2 in _cfa_cur_pairs)] |
| _cfa_sel = st.multiselect("μ μ©ν μ차곡λΆμ° κ²½λ‘ μ ν", |
| options=_cfa_opts, default=_cfa_cur_opts, |
| key="cfa_mi_multisel") |
| if st.button("βΆοΈ μ ν κ²½λ‘ μ μ©νμ¬ CFA μ¬μ€ν", key="cfa_rerun_btn"): |
| _cfa_new_pairs = [] |
| for s in _cfa_sel: |
| part = s.split(" (MI")[0].strip() |
| v1, v2 = [x.strip() for x in part.split("~~")] |
| _cfa_new_pairs.append((v1, v2)) |
| st.session_state["_cfa_manual_pairs"] = _cfa_new_pairs |
| with st.spinner("CFA μ¬λΆμ μ€..."): |
| try: |
| _cr = build_cfa_tables( |
| st.session_state["_cfa_df"], |
| st.session_state["_cfa_constructs"], |
| max_mods=0, manual_extra_cov=_cfa_new_pairs) |
| if _cr[0] is not None: |
| st.session_state["_cfa_applied"] = _cr |
| st.rerun() |
| except Exception as _ce: |
| st.warning(f"CFA μ¬μ€ν μ€λ₯: {_ce}") |
|
|
| col_mi1, col_mi2 = st.columns(2) |
| with col_mi1: |
| if init_mi is not None and not init_mi.empty: |
| with st.expander("π μ΄κΈ° μμ μ§μ(MI) μμ 20κ°"): |
| d = init_mi.head(20).copy() |
| d["νμ "] = d["MI"].apply(lambda v: "β οΈ μμ κ³ λ €" if v > 3.84 else "") |
| st.dataframe(d, use_container_width=True) |
| with col_mi2: |
| if final_mi is not None and not final_mi.empty: |
| with st.expander("π μ΅μ’
μμ μ§μ(MI) μμ 20κ°"): |
| d = final_mi.head(20).copy() |
| d["νμ "] = d["MI"].apply(lambda v: "β οΈ μΆκ° μμ κ°λ₯" if v > 3.84 else "β
") |
| st.dataframe(d, use_container_width=True) |
|
|
| st.markdown("---") |
| st.markdown("**CFA κ²°κ³Ό** (μμ λͺ¨ν κΈ°μ€)") |
| try: |
| merged = loads.copy() |
| merged["AVE"] = ""; merged["CR"] = ""; merged["Cronbach Ξ±"] = "" |
| rel_map = rel_df.set_index("μ μ¬λ³μ") |
| seen = set() |
| for idx, row in merged.iterrows(): |
| lv = row["μ μ¬λ³μ"] |
| if lv not in seen: |
| seen.add(lv) |
| if lv in rel_map.index: |
| merged.at[idx, "AVE"] = rel_map.loc[lv, "AVE"] |
| merged.at[idx, "CR"] = rel_map.loc[lv, "CR"] |
| merged.at[idx, "Cronbach Ξ±"] = rel_map.loc[lv, "Cronbach Ξ±"] |
| disp_cols = ["μ μ¬λ³μ", "μΈ‘μ λ³μ", "νμ€νκ³μ", "SE", "tκ°", "pκ°", "AVE", "CR", "Cronbach Ξ±"] |
| merged = merged[[c for c in disp_cols if c in merged.columns]] |
| st.dataframe(merged, use_container_width=True, hide_index=True) |
| except Exception: |
| st.dataframe(loads, use_container_width=True) |
| st.dataframe(rel_df, use_container_width=True) |
| st.caption("κΆμ₯: νμ€νκ³μ β₯ .50 | AVE β₯ .50 | CR β₯ .70 | Cronbach Ξ± β₯ .70") |
|
|
| elif lbl == "μκ΄κ΄κ³": |
| st.dataframe(c, use_container_width=True) |
| st.caption("λκ°μ = βAVE | νμΌκ° = μ μ¬λ³μ κ° μκ΄κ³μ") |
|
|
| elif lbl == "SEM": |
| paths, fit_init, fit_mod, mod_log, extra_cov, was_mod, init_mi_sem, final_mi_sem = c |
| fit_col_order = ["ΟΒ²","df","ΟΒ²/df","p","NFI","RFI","IFI","TLI","CFI","RMSEA"] |
|
|
def _hl_sem(row):
    """Return one CSS background declaration per cell of a fit-index row.

    Intended as a row-wise callback for ``Styler.apply(..., axis=1)``.

    Args:
        row: A labelled row (e.g. ``pandas.Series``) whose index holds the
            column names and whose values are the fit statistics.

    Returns:
        A list with one entry per label in ``row.index``:
        green (``#E8F5E9``) when the criterion is met, red (``#FFE0E0``)
        when it is not, and ``""`` for columns without a criterion or for
        cells that cannot be converted to ``float`` (labels, ``"-"``).
    """
    met = "background-color:#E8F5E9"
    unmet = "background-color:#FFE0E0"
    # NFI, RFI, IFI, TLI and CFI all share the same lower bound.
    floors = dict.fromkeys(("NFI", "RFI", "IFI", "TLI", "CFI"), 0.90)

    styles = []
    for col in row.index:
        # Only the conversion is guarded: non-numeric cells stay unstyled,
        # while any unrelated error is no longer silently swallowed.
        try:
            value = float(row[col])
        except (TypeError, ValueError, OverflowError):
            styles.append("")
            continue

        if col in floors:
            styles.append(met if value >= floors[col] else unmet)
        elif col == "RMSEA":
            # RMSEA is "lower is better": green only strictly below 0.049.
            styles.append(met if value < 0.049 else unmet)
        else:
            styles.append("")
    return styles
|
|
| st.markdown("**λͺ¨λΈ μ ν©λ**") |
| if was_mod: |
| fit_cmp = pd.DataFrame([ |
| {"λͺ¨ν":"μ΄κΈ° λͺ¨ν", **{k: fit_init.get(k,"-") for k in fit_col_order}}, |
| {"λͺ¨ν":"μμ λͺ¨ν", **{k: fit_mod.get(k,"-") for k in fit_col_order}}, |
| ]) |
| else: |
| fit_cmp = pd.DataFrame([ |
| {"λͺ¨ν":"μ΄κΈ° λͺ¨ν", **{k: fit_init.get(k,"-") for k in fit_col_order}} |
| ]) |
| st.dataframe(fit_cmp.style.apply(_hl_sem, axis=1), use_container_width=True) |
| st.caption(FIT_NOTE) |
|
|
| if was_mod and extra_cov: |
| st.success(f"β
MI κΈ°λ° μμ {len(mod_log)}ν μ μ©") |
| paths_str = " / ".join(f"{a} ~~ {b}" for a, b in extra_cov if isinstance(a, str)) |
| st.markdown(f"**μμ λ κ²½λ‘:** `{paths_str}`") |
| with st.expander("π μμ κ³Όμ μμΈ"): |
| st.dataframe(mod_log, use_container_width=True) |
| elif not was_mod: |
| if _adequate_check(fit_init): |
| st.success("β
μ΄κΈ° λͺ¨ν μ ν©λ μνΈ β μμ λΆνμ") |
| else: |
| st.info("βΉοΈ μ ν©λ λ―ΈμΆ©μ‘±μ΄λ MI > 3.84 μμ΄ μμ΄ λ μ΄μ μμ λΆκ°") |
|
|
| |
| _init_mi_for_sel = st.session_state.get("_sem_init", (None,)*8)[6] |
| if _init_mi_for_sel is not None and not _init_mi_for_sel.empty: |
| mi_over = _init_mi_for_sel[_init_mi_for_sel["MI"] > 3.84] |
| if not mi_over.empty: |
| st.markdown("---") |
| st.markdown("#### π οΈ μ차곡λΆμ° μλ μΆκ°") |
| st.caption("MI > 3.84 μ μ€ μ μ©ν νλͺ©μ μ νν λ€ **μ¬μ€ν** λ²νΌμ λλ¬μ£ΌμΈμ.") |
| _opts = [f"{row['λ³μ1']} ~~ {row['λ³μ2']} (MI = {row['MI']})" |
| for _, row in mi_over.iterrows()] |
# Residual-covariance pairs applied on the previous SEM rerun; the rerun
# handler stores them in session state as (var1, var2) tuples.
_cur_pairs = st.session_state.get("_sem_manual_pairs", [])
# Options are formatted "v1 ~~ v2 (MI = ...)". Pre-select an option only when
# its "v1 ~~ v2" part equals a stored pair exactly; a substring test would
# also pre-select look-alikes such as "x2 ~~ x10 (MI ...)" or
# "x12 ~~ x1 (MI ...)" for the pair ("x2", "x1").
_cur_labels = {f"{v1} ~~ {v2}" for v1, v2 in _cur_pairs}
_cur_opts = [o for o in _opts
             if o.split(" (MI")[0].strip() in _cur_labels]
| _sel = st.multiselect("μ μ©ν μ차곡λΆμ° κ²½λ‘ μ ν", |
| options=_opts, default=_cur_opts, |
| key="sem_mi_multisel") |
| if st.button("βΆοΈ μ ν κ²½λ‘ μ μ©νμ¬ SEM μ¬μ€ν", key="sem_rerun_btn"): |
| _new_pairs = [] |
| for s in _sel: |
| part = s.split(" (MI")[0].strip() |
| v1, v2 = [x.strip() for x in part.split("~~")] |
| _new_pairs.append((v1, v2)) |
| st.session_state["_sem_manual_pairs"] = _new_pairs |
| _cfa_cov = st.session_state.get("_sem_cfa_cov", []) |
| _all_covs = list(_cfa_cov) + _new_pairs |
| with st.spinner("SEM μ¬λΆμ μ€..."): |
| try: |
| _r = build_lavaan_sem_table( |
| st.session_state["_sem_df"], |
| st.session_state["_sem_constructs"], |
| st.session_state["_sem_hypotheses"], |
| cfa_extra_cov=_all_covs, max_mods=0) |
| if _r[0] is not None: |
| _r2 = list(_r) |
| _r2[4] = _all_covs |
| _r2[5] = len(_new_pairs) > 0 |
| st.session_state["_sem_applied"] = tuple(_r2) |
| st.rerun() |
| except Exception as _e: |
| st.warning(f"SEM μ¬μ€ν μ€λ₯: {_e}") |
|
|
| col_mi1, col_mi2 = st.columns(2) |
| with col_mi1: |
| if init_mi_sem is not None and not init_mi_sem.empty: |
| with st.expander("π μ΄κΈ° μμ μ§μ(MI) μμ 20κ°"): |
| d = init_mi_sem.head(20).copy() |
| d["νμ "] = d["MI"].apply(lambda v: "β οΈ μμ κ³ λ €" if v > 3.84 else "") |
| st.dataframe(d, use_container_width=True) |
| with col_mi2: |
| if final_mi_sem is not None and not final_mi_sem.empty: |
| with st.expander("π μ΅μ’
μμ μ§μ(MI) μμ 20κ°"): |
| d = final_mi_sem.head(20).copy() |
| d["νμ "] = d["MI"].apply(lambda v: "β οΈ μΆκ° μμ κ°λ₯" if v > 3.84 else "β
") |
| st.dataframe(d, use_container_width=True) |
|
|
| st.markdown("---") |
| st.markdown("**κ°μ€ κ²μ¦ κ²°κ³Ό** (μμ λͺ¨ν κΈ°μ€)") |
| def hl(row): |
| color = "#E8F5E9" if row.get("μ±νμ¬λΆ") == "μ±ν" else "#FFF3F3" |
| return [f"background-color:{color}"] * len(row) |
| st.dataframe(paths.style.apply(hl, axis=1), use_container_width=True) |
| n_adopt = (paths["μ±νμ¬λΆ"] == "μ±ν").sum() |
| st.markdown(f"**μ±ν: {n_adopt}κ° / κΈ°κ°: {len(paths)-n_adopt}κ° / μ 체: {len(paths)}κ°**") |
|
|
| |
| st.markdown("---") |
| xlsx_sheets = st.session_state.get("_xlsx_sheets", xlsx_sheets) |
| if xlsx_sheets: |
| try: |
| _ts = datetime.now().strftime("%Y%m%d_%H%M%S") |
| _fname = f"SEM_RESULT_{_ts}.xlsx" |
| xlsx_bytes = build_single_sheet_excel(xlsx_sheets) |
| st.download_button( |
| label=f"π₯ {_fname} λ€μ΄λ‘λ (λͺ¨λ κ²°κ³Ό ν μνΈ)", |
| data=xlsx_bytes, |
| file_name=_fname, |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
| use_container_width=True, |
| type="primary" |
| ) |
| st.caption("β» λͺ¨λ λΆμ κ²°κ³Όκ° 'λΆμκ²°κ³Ό' μνΈ νλμ μΉμ
λ³λ‘ ν΅ν©λμ΄ μ μ₯λ©λλ€.") |
| except Exception as e: |
| st.error(f"Excel μμ± μ€λ₯: {e}") |
|
|