# core/variable_loader.py
import os
import glob
import json
import time
import logging

logger = logging.getLogger(__name__)

try:
    import pandas as pd
except Exception:
    pd = None

# cache path (temporary)
CACHE_PATH = "/tmp/ct_var_cache.json"
CACHE_TTL_SECONDS = 60 * 60  # 1 hour; adjust as needed

# Filename patterns for relevant Excel files. The wildcard forms also cover
# the IG variants ("*SDTM*.xls*" matches SDTMIG, SDTM_*, etc.).
DEFAULT_PATTERNS = [
    "*SDTM*.xls*",
    "*ADaM*.xls*",
    "*CDASH*.xls*",
]

# Typical column-name candidates
VAR_COL_CANDIDATES = ["variable", "variable name", "varname", "var", "column", "fieldname"]
LABEL_COL_CANDIDATES = ["label", "variable label", "var label", "column label"]
DESC_COL_CANDIDATES = ["description", "definition", "long name", "comments", "notes"]
ROLE_COL_CANDIDATES = ["role", "type", "datatype", "origin"]


def _first_existing(columns, candidates):
    """Return the first column whose lowercased name equals or contains a candidate."""
    if not columns:
        return None
    low = {str(c).strip().lower(): c for c in columns}
    for cand in candidates:
        for k, orig in low.items():
            if cand == k or cand in k:
                return orig
    return None


def _discover_files(search_paths=None, patterns=None):
    """Glob `search_paths` recursively for Excel files matching `patterns`."""
    patterns = patterns or DEFAULT_PATTERNS
    search_paths = search_paths or [
        ".",
        "/workspace/data",
        "/mnt/data",
        os.getcwd(),
        "/root/.cache/huggingface/hub",
        "/home/user/.cache/huggingface/hub",
        "/root/.cache/huggingface/hub/datasets--essprasad--CT-Chat-Docs",
        "/home/user/.cache/huggingface/hub/datasets--essprasad--CT-Chat-Docs",
    ]
    found = []
    for base in search_paths:
        if not base or not os.path.exists(base):
            continue
        for pat in patterns:
            try:
                # "**" is required for recursive=True to take effect; it also
                # matches zero directories, so files directly under `base` match.
                matches = glob.glob(os.path.join(base, "**", pat), recursive=True)
                for m in matches:
                    if os.path.isfile(m) and m.lower().endswith((".xls", ".xlsx")):
                        found.append(os.path.abspath(m))
            except Exception:
                continue
    # dedupe but keep order
    seen = set()
    unique = []
    for p in found:
        if p not in seen:
            seen.add(p)
            unique.append(p)
    return unique


def _cell(row, col):
    """Stripped string value of row[col]; missing columns and NaN/None become ''."""
    try:
        if col is None or col not in row:
            return ""
        val = row[col]
        if val is None or pd.isna(val):
            return ""
        return str(val).strip()
    except Exception:
        return ""


def _extract_from_df(df, filename):
    """Find likely variable/label/description columns in a dataframe and
    extract one dict per row. Returns a list of dicts."""
    out = []
    if df is None or df.shape[0] == 0:
        return out
    cols = list(df.columns)
    term_col = _first_existing(cols, VAR_COL_CANDIDATES)
    label_col = _first_existing(cols, LABEL_COL_CANDIDATES)
    desc_col = _first_existing(cols, DESC_COL_CANDIDATES)
    role_col = _first_existing(cols, ROLE_COL_CANDIDATES)
    # If we cannot find a term column, fall back to the first column
    if not term_col:
        term_col = cols[0] if cols else None
    # If there are no usable columns at all, give up
    if not term_col:
        return out
    for _, row in df.iterrows():
        term = _cell(row, term_col)
        if not term:
            continue
        label = _cell(row, label_col)
        desc = _cell(row, desc_col)
        role = _cell(row, role_col)
        # Compose a clean definition
        parts = []
        if label:
            parts.append(f"Label: {label}")
        if desc:
            parts.append(f"Description: {desc}")
        if role:
            parts.append(f"Role/Origin: {role}")
        definition = "\n".join(parts)
        out.append({
            "term": term,
            "definition": definition,
            "file": os.path.basename(filename),
            "type": "variable",
            "sources": [os.path.basename(filename)],
        })
    return out


def load_variable_metadata(search_paths=None, use_cache=True, verbose=True):
    """Discover SDTM/ADaM/CDASH Excel files and extract variable metadata.

    Returns a list of dicts: {'term', 'definition', 'file', 'type', 'sources'}.
    """
    # quick fail if pandas is not installed
    if pd is None:
        logger.warning("pandas not available; variable metadata loading skipped.")
        return []

    # serve from the cache while it is fresh
    try:
        if use_cache and os.path.exists(CACHE_PATH):
            mtime = os.path.getmtime(CACHE_PATH)
            if time.time() - mtime < CACHE_TTL_SECONDS:
                if verbose:
                    logger.info("Loading variable metadata from cache: %s", CACHE_PATH)
                with open(CACHE_PATH, "r", encoding="utf-8") as f:
                    return json.load(f)
    except Exception:
        pass  # an unreadable cache is not fatal; rebuild below

    files = _discover_files(search_paths=search_paths)
    if verbose:
        logger.info("Variable loader discovered %d candidate Excel files.", len(files))

    all_entries = []
    for fx in files:
        try:
            # open once, then parse per sheet (ExcelFile is faster for many sheets)
            xls = pd.ExcelFile(fx)
            for sheet in xls.sheet_names:
                try:
                    df = xls.parse(sheet).dropna(how="all")  # drop fully empty rows
                    entries = _extract_from_df(df, fx)
                    if entries:
                        # annotate with the sheet name to improve provenance
                        for e in entries:
                            e["sources"].append(f"{os.path.basename(fx)}::{sheet}")
                        all_entries.extend(entries)
                except Exception:
                    continue  # try the next sheet
        except Exception:
            # fallback: try a single-sheet read
            try:
                df = pd.read_excel(fx).dropna(how="all")
                all_entries.extend(_extract_from_df(df, fx))
            except Exception as e:
                logger.debug("Failed reading excel %s: %s", fx, e)
                continue

    # dedupe by term (keep first occurrence)
    seen = set()
    deduped = []
    for e in all_entries:
        key = e["term"].strip().lower()
        if key and key not in seen:
            seen.add(key)
            deduped.append(e)

    # best-effort cache write
    try:
        with open(CACHE_PATH, "w", encoding="utf-8") as f:
            json.dump(deduped, f, ensure_ascii=False, indent=2)
    except Exception:
        pass

    if verbose:
        logger.info("Variable loader extracted %d unique variables.", len(deduped))
    return deduped
if __name__ == "__main__":
    # quick CLI for debugging
    items = load_variable_metadata(verbose=True)
    print(f"[variable_loader] extracted {len(items)} items")
    if items:
        print("Sample:", items[:5])