# schema_profiler.py
from __future__ import annotations

from typing import Dict, Any, List, Tuple, Optional
import pandas as pd
import numpy as np
import re, math, os

# Optional embeddings for soft matching; falls back to lexical if missing.
try:
    from sentence_transformers import SentenceTransformer
    _EMB = SentenceTransformer("all-MiniLM-L6-v2")
except Exception:
    _EMB = None


def profile_csv(path: str, max_rows: int = 10000) -> Dict[str, Any]:
    """Profile up to *max_rows* rows of a CSV file.

    Args:
        path: Filesystem path of the CSV to read.
        max_rows: Maximum number of rows to sample (passed as ``nrows``).

    Returns:
        Dict with keys ``kind`` ("csv"), ``name`` (basename of *path*),
        ``rows`` (sampled row count), ``columns`` (per-column summaries:
        raw header, pandas dtype, non-null count, up to 3 string sample
        values), and ``df`` (the sampled DataFrame itself).
    """
    df = pd.read_csv(path, nrows=max_rows, low_memory=False)
    cols = []
    for c in df.columns:
        s = df[c]
        cols.append({
            "raw": str(c),
            "dtype": str(s.dtype),
            "nonnull": int(s.notna().sum()),
            "samples": s.dropna().astype(str).head(3).tolist(),
        })
    return {
        "kind": "csv",
        "name": os.path.basename(path),
        "rows": len(df),
        "columns": cols,
        "df": df,
    }


def build_dynamic_label_space(scenario_text: str) -> List[str]:
    """
    Create a candidate label space from the scenario itself:
      - Nounish/metric-like phrases (very permissive)
      - Units hints (%, hours, days, rate, cost, capacity)
      - Also include frequent bigrams from scenario
    """
    t = (scenario_text or "").lower()
    # Crude noun-ish grabs: a word followed by up to 3 more words.
    phrases = re.findall(r"[a-z][a-z0-9_./%-]*(?:\s+[a-z0-9_./%-]+){0,3}", t)
    phrases = [p.strip() for p in phrases if len(p.split()) <= 4 and len(p) >= 3]
    # Keep likely metric-ish tokens only.
    keepers = []
    for p in phrases:
        if any(k in p for k in [
            "median", "mean", "p90", "p95", "rate", "cost", "capacity",
            "clients", "visits", "screen", "a1c", "bmi", "bp", "wait",
            "throughput", "budget", "per day", "per client", "percent",
            "%", "hours", "days", "delta", "change", "outcome",
        ]):
            keepers.append(p)
    # Dedupe (order-preserving) and cap the label space at 128 entries.
    seen = set()
    out = []
    for x in keepers:
        x = re.sub(r"\s+", " ", x).strip()
        if x not in seen:
            seen.add(x)
            out.append(x)
            if len(out) >= 128:
                break
    return out or ["value", "count", "rate", "cost", "capacity"]


def soft_bind_inputs_to_columns(
    required_inputs: List[str],
    column_bag: List[str],
    scenario_labels: List[str],
    min_score: float = 0.46,
) -> Dict[str, Dict[str, Any]]:
    """For each required input name, find the best candidate column.

    Candidates are drawn from the union of uploaded headers (*column_bag*)
    and the scenario-derived label space (*scenario_labels*).

    Args:
        required_inputs: Input names to bind; blanks are ignored.
        column_bag: Raw column headers from uploaded data.
        scenario_labels: Labels mined from the scenario text.
        min_score: Minimum cosine similarity for a confident match.

    Returns:
        ``{input_name: {"match": raw_col_or_label_or_None,
                        "score": float,
                        "source": "header" | "scenario"}}``
        ``match`` is None when no candidate clears *min_score*.
    """
    req = [r.strip() for r in required_inputs if r and r.strip()]
    if not req:
        return {}

    # Order-preserving dedupe of the candidate pool.
    combined_pool = list(dict.fromkeys(column_bag + scenario_labels))

    if _EMB is not None and combined_pool:
        pool_vecs = np.asarray(_EMB.encode(combined_pool), dtype=np.float64)
        req_vecs = np.asarray(_EMB.encode(req), dtype=np.float64)
        # L2-normalize so the dot product is a true cosine similarity.
        # SentenceTransformer.encode does NOT normalize by default
        # (normalize_embeddings=False), so a raw dot product would make
        # min_score meaningless.
        pool_vecs /= np.maximum(
            np.linalg.norm(pool_vecs, axis=1, keepdims=True), 1e-12)
        req_vecs /= np.maximum(
            np.linalg.norm(req_vecs, axis=1, keepdims=True), 1e-12)
        sims = np.matmul(req_vecs, pool_vecs.T)
        mapping: Dict[str, Dict[str, Any]] = {}
        for i, name in enumerate(req):
            j = int(np.argmax(sims[i]))
            score = float(sims[i][j])
            cand = combined_pool[j]
            src = "header" if cand in column_bag else "scenario"
            mapping[name] = {
                "match": cand if score >= min_score else None,
                "score": score,
                "source": src,
            }
        return mapping

    # Fallback: lexical overlap (very conservative).
    def _lex_overlap(a: str, b: str) -> float:
        # Normalized token-set overlap (cosine-style: |A∩B| / sqrt(|A||B|)).
        A = set(re.findall(r"[a-z0-9]+", a.lower()))
        B = set(re.findall(r"[a-z0-9]+", b.lower()))
        if not A or not B:
            return 0.0
        return len(A & B) / math.sqrt(len(A) * len(B))

    mapping: Dict[str, Dict[str, Any]] = {}
    for name in req:
        best = ("", 0.0, "")
        for cand in combined_pool:
            s = _lex_overlap(name, cand)
            if s > best[1]:
                best = (cand, s, "header" if cand in column_bag else "scenario")
        mapping[name] = {
            "match": best[0] if best[1] >= 0.34 else None,
            "score": best[1],
            "source": best[2],
        }
    return mapping