# ================================================================
# Self-Sensing Concrete Assistant — Predictor (XGB) + Hybrid RAG
# - Uses local 'papers/' folder for literature
# - Robust MMR sentence selection (no list index errors)
# - Predictor: safe model caching + safe feature alignment
# - Stable categoricals ("NA"); no over-strict completeness gate
# - Fixed [[PAGE=...]] regex
# - NEW: Lightweight instrumentation (JSONL logs per RAG turn)
# - UPDATED THEME: Dark-blue tabs + Evaluate tab + k-slider styling
# - PATCH: Per-question/aggregate File + JSON outputs now dark-themed via elem_id hooks
# - OPTIONAL JS: Adds .eval-active class when Evaluate tab is selected
# ================================================================

# ---------------------- Runtime flags (HF-safe) ----------------------
import os

# Set before the optional sentence-transformers import further down so the
# flags are already in the environment when that stack is loaded.
os.environ["TRANSFORMERS_NO_TF"] = "1"
os.environ["TRANSFORMERS_NO_FLAX"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# ------------------------------- Imports ------------------------------
import json
import re
import subprocess
import sys
import time
import traceback
import uuid
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional

import gradio as gr
import joblib
import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=UserWarning)

# Optional deps (handled gracefully if missing)
USE_DENSE = True
try:
    from sentence_transformers import SentenceTransformer
except Exception:
    # Any import-time failure disables dense retrieval; the name is still
    # bound so later references cannot raise NameError.
    SentenceTransformer = None
    USE_DENSE = False
    print("sentence-transformers not available; dense retrieval disabled.")

try:
    from rank_bm25 import BM25Okapi
except Exception:
    BM25Okapi = None
    print("rank_bm25 not installed; BM25 disabled (TF-IDF still works).")

# Optional OpenAI (for LLM paraphrase)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5")
try:
    from openai import OpenAI
except Exception:
    OpenAI = None

# LLM availability flag — used internally; UI remains hidden.
# True only when a non-blank API key is set AND the openai client imported.
LLM_AVAILABLE = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip()) and OpenAI is not None

# ========================= Predictor (kept) =========================
CF_COL = "Conductive Filler Conc. (wt%)"
TARGET_COL = "Stress GF (MPa-1)"
CANON_NA = "NA"  # canonical placeholder for categoricals

# Ordered list of predictor inputs; the form helpers and the one-row
# DataFrame built for inference both follow this order.
MAIN_VARIABLES = [
    "Filler 1 Type",
    "Filler 1 Diameter (µm)",
    "Filler 1 Length (mm)",
    CF_COL,
    "Filler 1 Dimensionality",
    "Filler 2 Type",
    "Filler 2 Diameter (µm)",
    "Filler 2 Length (mm)",
    "Filler 2 Dimensionality",
    "Specimen Volume (mm3)",
    "Probe Count",
    "Probe Material",
    "W/B",
    "S/B",
    "Gauge Length (mm)",
    "Curing Condition",
    "Number of Fillers",
    "Drying Temperature (°C)",
    "Drying Duration (hr)",
    "Loading Rate (MPa/s)",
    "Modulus of Elasticity (GPa)",
    "Current Type",
    "Applied Voltage (V)",
]

# Columns coerced to float (NaN when missing).
NUMERIC_COLS = {
    "Filler 1 Diameter (µm)",
    "Filler 1 Length (mm)",
    CF_COL,
    "Filler 2 Diameter (µm)",
    "Filler 2 Length (mm)",
    "Specimen Volume (mm3)",
    "Probe Count",
    "W/B",
    "S/B",
    "Gauge Length (mm)",
    "Number of Fillers",
    "Drying Temperature (°C)",
    "Drying Duration (hr)",
    "Loading Rate (MPa/s)",
    "Modulus of Elasticity (GPa)",
    "Applied Voltage (V)",
}

# Columns kept as strings (CANON_NA when missing).
CATEGORICAL_COLS = {
    "Filler 1 Type",
    "Filler 1 Dimensionality",
    "Filler 2 Type",
    "Filler 2 Dimensionality",
    "Probe Material",
    "Curing Condition",
    "Current Type",
}

DIM_CHOICES = ["0D", "1D", "2D", "3D", CANON_NA]
CURRENT_CHOICES = ["DC", "AC", CANON_NA]

# Paths probed in order; empty entries (e.g. MODEL_PATH unset) are skipped.
MODEL_CANDIDATES = [
    "stress_gf_xgb.joblib",
    "models/stress_gf_xgb.joblib",
    "/home/user/app/stress_gf_xgb.joblib",
    os.getenv("MODEL_PATH", ""),
]

# ---------- Model caching + status ----------
MODEL = None
MODEL_STATUS = "🔴 Model not loaded"


def _try_load_model():
    """Load the first usable model from MODEL_CANDIDATES into MODEL.

    Updates the module globals MODEL and MODEL_STATUS and returns None.
    Candidates that do not exist are skipped; candidates that exist but fail
    to load are logged and remembered so the final status can distinguish
    "no file anywhere" from "file present but unreadable".
    """
    global MODEL, MODEL_STATUS
    failed = []
    for p in (x for x in MODEL_CANDIDATES if x):
        if not os.path.exists(p):
            continue
        try:
            # NOTE(security): joblib.load unpickles arbitrary objects; only
            # point MODEL_PATH at files you trust.
            MODEL = joblib.load(p)
        except Exception as e:
            print(f"[ModelLoad] Error from {p}: {e}")
            traceback.print_exc()
            MODEL = None
            failed.append(p)
            continue
        MODEL_STATUS = f"🟢 Loaded model: {Path(p).name}"
        print("[ModelLoad] Loaded:", p)
        return

    if MODEL is None:
        if failed:
            names = ", ".join(Path(p).name for p in failed)
            MODEL_STATUS = f"🔴 Model file found but failed to load ({names}); see logs"
        else:
            MODEL_STATUS = "🔴 Model not found (place stress_gf_xgb.joblib at repo root or models/, or set MODEL_PATH)"
        print("[ModelLoad]", MODEL_STATUS)
_try_load_model()  # load at import time

# Spellings (compared upper-cased) that all mean "no category given".
_NA_ALIASES = frozenset({"NA", "N/A", "NAN", "NONE", "NULL"})


def _canon_cat(v: Any) -> str:
    """Stable, canonical category placeholder normalization.

    Returns CANON_NA for None, float NaN, blank strings and any
    case-insensitive alias in _NA_ALIASES; otherwise the stripped string
    form of ``v``.
    """
    if v is None:
        return CANON_NA
    if isinstance(v, float) and np.isnan(v):
        return CANON_NA
    s = str(v).strip()
    if not s or s.upper() in _NA_ALIASES:
        return CANON_NA
    return s


def _to_float_or_nan(v):
    """Convert a form value to float, returning NaN when it cannot be parsed.

    Commas are removed before parsing, i.e. they are treated as thousands
    separators ("1,200" -> 1200.0).
    """
    if v is None:
        return np.nan
    try:
        return float(str(v).replace(",", ""))
    except (TypeError, ValueError):
        # Covers "" and any other non-numeric text.
        return np.nan


def _coerce_to_row(form_dict: dict) -> pd.DataFrame:
    """Build a one-row DataFrame (columns = MAIN_VARIABLES) from form values.

    Numeric columns become floats/NaN, categorical columns become canonical
    strings; keys missing from ``form_dict`` are treated as None.
    """
    row = {}
    for col in MAIN_VARIABLES:
        v = form_dict.get(col, None)
        if col in NUMERIC_COLS:
            row[col] = _to_float_or_nan(v)
        elif col in CATEGORICAL_COLS:
            row[col] = _canon_cat(v)
        else:
            # Defensive default for a column in neither set.
            s = str(v).strip() if v is not None else ""
            row[col] = s if s else CANON_NA
    return pd.DataFrame([row], columns=MAIN_VARIABLES)


def _usable_feature_names(obj, df: pd.DataFrame) -> Optional[list]:
    """Return obj.feature_names_in_ as a list if every name is a df column.

    Returns None when the attribute is absent, not list-like, empty, or
    mentions a column that ``df`` does not have.
    """
    names = getattr(obj, "feature_names_in_", None)
    if not isinstance(names, (list, np.ndarray, pd.Index)):
        return None
    names = list(names)
    if not names or not all(c in df.columns for c in names):
        return None
    return names


def _align_columns_to_model(df: pd.DataFrame, mdl) -> pd.DataFrame:
    """
    SAFE alignment:
    - If mdl.feature_names_in_ exists AND is a subset of df.columns (raw names), reorder to it.
    - Else, try a Pipeline step ('preprocessor', 'columntransformer', then the
      first step) with feature_names_in_ subset of df.columns.
    - Else, DO NOT align (let the pipeline handle columns by name).

    Never raises: any error is logged and the input frame is returned as-is.
    """
    try:
        names = _usable_feature_names(mdl, df)
        if names is not None:
            return df[names]

        steps = getattr(mdl, "named_steps", None)
        if steps is not None:
            candidates = [steps[key] for key in ("preprocessor", "columntransformer") if key in steps]
            # Fallback: the first pipeline step, if it exposes input names.
            first_key = next(iter(steps.keys()), None)
            if first_key is not None:
                candidates.append(steps[first_key])
            for step in candidates:
                names = _usable_feature_names(step, df)
                if names is not None:
                    return df[names]
        return df
    except Exception as e:
        print(f"[Align] Skip aligning due to: {e}")
        traceback.print_exc()
        return df


def predict_fn(**kwargs):
    """
    Always attempt prediction.
    - Missing numerics -> NaN (imputer handles)
    - Categoricals -> 'NA'
    - If model missing or inference error -> 0.0 (keeps UI stable)
    - Non-finite or negative predictions are reported as 0.0

    Keyword arguments are keyed by the column names in MAIN_VARIABLES.
    Returns a non-negative float.
    """
    if MODEL is None:
        return 0.0
    X_new = _align_columns_to_model(_coerce_to_row(kwargs), MODEL)
    try:
        y_raw = MODEL.predict(X_new)
        # log1p or original scale depending on training
        y = np.expm1(y_raw) if getattr(MODEL, "target_is_log1p_", False) else y_raw
        y = float(np.asarray(y).ravel()[0])
    except Exception as e:
        print(f"[Predict] {e}")
        traceback.print_exc()
        return 0.0
    # max(nan, 0.0) would return nan, so screen non-finite values explicitly.
    if not np.isfinite(y):
        return 0.0
    return max(y, 0.0)


# Example form values used by the "fill example" helper.
EXAMPLE = {
    "Filler 1 Type": "CNT",
    "Filler 1 Dimensionality": "1D",
    "Filler 1 Diameter (µm)": 0.02,
    "Filler 1 Length (mm)": 1.2,
    CF_COL: 0.5,
    "Filler 2 Type": "",
    "Filler 2 Dimensionality": CANON_NA,
    "Filler 2 Diameter (µm)": None,
    "Filler 2 Length (mm)": None,
    "Specimen Volume (mm3)": 1000,
    "Probe Count": 2,
    "Probe Material": "Copper",
    "W/B": 0.4,
    "S/B": 2.5,
    "Gauge Length (mm)": 20,
    "Curing Condition": "28d water, 20°C",
    "Number of Fillers": 1,
    "Drying Temperature (°C)": 60,
    "Drying Duration (hr)": 24,
    "Loading Rate (MPa/s)": 0.1,
    "Modulus of Elasticity (GPa)": 25,
    "Current Type": "DC",
    "Applied Voltage (V)": 5.0,
}
def _fill_example():
    """Return EXAMPLE's values ordered like MAIN_VARIABLES (None if absent)."""
    return [EXAMPLE.get(k, None) for k in MAIN_VARIABLES]


def _clear_all():
    """Return blank form values ordered like MAIN_VARIABLES.

    Numeric fields reset to None, the dimensionality and current-type fields
    to CANON_NA, and every other field to an empty string.
    """
    na_fields = {"Filler 1 Dimensionality", "Filler 2 Dimensionality", "Current Type"}
    cleared = []
    for col in MAIN_VARIABLES:
        if col in NUMERIC_COLS:
            cleared.append(None)
        elif col in na_fields:
            cleared.append(CANON_NA)
        else:
            cleared.append("")
    return cleared


# ========================= Hybrid RAG =========================
ARTIFACT_DIR = Path("rag_artifacts")
ARTIFACT_DIR.mkdir(exist_ok=True)
TFIDF_VECT_PATH = ARTIFACT_DIR / "tfidf_vectorizer.joblib"
TFIDF_MAT_PATH = ARTIFACT_DIR / "tfidf_matrix.joblib"
BM25_TOK_PATH = ARTIFACT_DIR / "bm25_tokens.joblib"
EMB_NPY_PATH = ARTIFACT_DIR / "chunk_embeddings.npy"
RAG_META_PATH = ARTIFACT_DIR / "chunks.parquet"

LOCAL_PDF_DIR = Path("papers")
LOCAL_PDF_DIR.mkdir(exist_ok=True)
USE_ONLINE_SOURCES = os.getenv("USE_ONLINE_SOURCES", "false").lower() == "true"

# Default fusion weights: split evenly between the lexical scorers when dense
# retrieval is unavailable, otherwise 0.30 / 0.30 / 0.40.
W_TFIDF_DEFAULT = 0.50 if not USE_DENSE else 0.30
W_BM25_DEFAULT = 0.50 if not USE_DENSE else 0.30
W_EMB_DEFAULT = 0.00 if USE_DENSE is False else 0.40

_SENT_SPLIT_RE = re.compile(r"(?<=[.!?])\s+|\n+")
TOKEN_RE = re.compile(r"[A-Za-z0-9_#+\-/\.%]+")
_PAGE_MARK_RE = re.compile(r"\[\[PAGE=(\d+)\]\]")


def sent_split(text: str) -> List[str]:
    """Split text after ., ! or ? and at newlines; keep pieces of >= 5 words."""
    pieces = (s.strip() for s in _SENT_SPLIT_RE.split(text))
    return [s for s in pieces if s and len(s.split()) >= 5]


def tokenize(text: str) -> List[str]:
    """Return the lower-cased TOKEN_RE matches found in ``text``."""
    return [t.lower() for t in TOKEN_RE.findall(text)]


def _extract_pdf_text(pdf_path: Path) -> str:
    """Extract a PDF's text with a ``[[PAGE=n]]`` marker line before each page.

    Tries PyMuPDF (``fitz``) first and falls back to ``pypdf`` on any error.
    Returns an empty string when both readers fail.
    """
    try:
        import fitz

        doc = fitz.open(pdf_path)
        try:
            pages = [
                f"[[PAGE={i}]]\n{page.get_text('text') or ''}"
                for i, page in enumerate(doc, start=1)
            ]
        finally:
            # Release the file handle even if a page fails to extract.
            doc.close()
        return "\n\n".join(pages)
    except Exception:
        try:
            from pypdf import PdfReader

            reader = PdfReader(str(pdf_path))
            pages = [
                f"[[PAGE={i}]]\n{p.extract_text() or ''}"
                for i, p in enumerate(reader.pages, start=1)
            ]
            return "\n\n".join(pages)
        except Exception as e:
            print(f"PDF read error ({pdf_path}): {e}")
            return ""


def chunk_by_sentence_windows(text: str, win_size=8, overlap=2) -> List[str]:
    """Split text into overlapping windows of ``win_size`` sentences.

    Consecutive windows start ``max(1, win_size - overlap)`` sentences apart.
    ``[[PAGE=n]]`` markers never survive ``sent_split`` (they are shorter than
    five words), so the page of each sentence is tracked here and every chunk
    whose first sentence has a known page is prefixed with its own
    ``[[PAGE=n]]`` line. Text without markers yields plain joined sentences.
    """
    tagged = []  # (page number as str or None, sentence)
    page = None
    # re.split with one capture group alternates: text, page, text, page, ...
    for idx, part in enumerate(_PAGE_MARK_RE.split(text)):
        if idx % 2:
            page = part
        else:
            tagged.extend([(page, s) for s in sent_split(part)])

    chunks = []
    step = max(1, win_size - overlap)
    for i in range(0, len(tagged), step):
        window = tagged[i:i + win_size]
        if not window:
            break
        body = " ".join(s for _, s in window)
        first_page = window[0][0]
        # The marker sits on its own line so sentence splitting drops it again
        # when quotes are extracted from the chunk.
        chunks.append(f"[[PAGE={first_page}]]\n{body}" if first_page is not None else body)
    return chunks


def _safe_init_st_model(name: str):
    """Return a SentenceTransformer for ``name``, or None if unavailable.

    On failure the module-level USE_DENSE flag is switched off.
    """
    global USE_DENSE
    if not USE_DENSE:
        return None
    try:
        return SentenceTransformer(name)
    except Exception as e:
        print("Dense embeddings unavailable:", e)
        USE_DENSE = False
        return None


def _load_hybrid_cache():
    """Load cached retriever artifacts, or return None if they are unusable.

    The cache is considered unusable when a required file is missing, cannot
    be read, or its row count disagrees with the chunk table. Embeddings with
    a mismatching row count are dropped rather than forcing a rebuild.
    """
    required = [TFIDF_VECT_PATH, TFIDF_MAT_PATH, RAG_META_PATH]
    if BM25Okapi is not None:
        required.append(BM25_TOK_PATH)
    if USE_DENSE:
        required.append(EMB_NPY_PATH)
    if not all(p.exists() for p in required):
        return None
    try:
        # NOTE(security): joblib.load unpickles; the cache dir must be trusted.
        vectorizer = joblib.load(TFIDF_VECT_PATH)
        X_tfidf = joblib.load(TFIDF_MAT_PATH)
        meta = pd.read_parquet(RAG_META_PATH)
        bm25_toks = joblib.load(BM25_TOK_PATH) if BM25Okapi is not None else None
        emb = np.load(EMB_NPY_PATH) if (USE_DENSE and EMB_NPY_PATH.exists()) else None
    except Exception as e:
        print("RAG cache unreadable; rebuilding index:", e)
        return None

    n = len(meta)
    if X_tfidf.shape[0] != n or (bm25_toks is not None and len(bm25_toks) != n):
        print("RAG cache inconsistent; rebuilding index.")
        return None
    if emb is not None and emb.shape[0] != n:
        print("Cached embeddings do not match chunks; dense scores disabled.")
        emb = None
    return vectorizer, X_tfidf, meta, bm25_toks, emb


def build_or_load_hybrid(pdf_dir: Path):
    """Build or load the hybrid retriever cache.

    Returns ``(vectorizer, tfidf_matrix, meta, bm25_tokens, embeddings)``.
    ``meta`` has columns doc_path / chunk_id / text. When no PDF yields text
    every element except the (empty) ``meta`` frame is None. Freshly built
    artifacts are written under ARTIFACT_DIR.
    """
    cached = _load_hybrid_cache()
    if cached is not None:
        return cached

    rows, all_tokens = [], []
    # Sorted so chunk order is reproducible between runs.
    pdf_paths = sorted(Path(pdf_dir).glob("**/*.pdf"))
    print(f"Indexing PDFs in {pdf_dir} — found {len(pdf_paths)} files.")
    for pdf in pdf_paths:
        raw = _extract_pdf_text(pdf)
        if not raw.strip():
            continue
        for i, ch in enumerate(chunk_by_sentence_windows(raw, win_size=8, overlap=2)):
            rows.append({"doc_path": str(pdf), "chunk_id": i, "text": ch})
            all_tokens.append(tokenize(ch))

    if not rows:
        meta = pd.DataFrame(columns=["doc_path", "chunk_id", "text"])
        return None, None, meta, None, None

    meta = pd.DataFrame(rows)
    texts = meta["text"].tolist()

    from sklearn.feature_extraction.text import TfidfVectorizer

    vectorizer = TfidfVectorizer(
        ngram_range=(1, 2),
        min_df=1,
        # With a single chunk, 0.95 * 1 < min_df and scikit-learn raises.
        max_df=0.95 if len(texts) > 1 else 1.0,
        sublinear_tf=True,
        smooth_idf=True,
        lowercase=True,
        token_pattern=r"(?u)\b\w[\w\-\./%+#]*\b",
    )
    X_tfidf = vectorizer.fit_transform(texts)

    emb = None
    if USE_DENSE:
        try:
            st_model = _safe_init_st_model(os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2"))
            if st_model is not None:
                from sklearn.preprocessing import normalize as sk_normalize

                em = st_model.encode(texts, batch_size=64, show_progress_bar=False, convert_to_numpy=True)
                emb = sk_normalize(em)
                np.save(EMB_NPY_PATH, emb)
        except Exception as e:
            print("Dense embedding failed:", e)
            emb = None
    if emb is None and EMB_NPY_PATH.exists():
        # Do not leave embeddings from an older index next to new chunks.
        try:
            EMB_NPY_PATH.unlink()
        except OSError as e:
            print("Could not remove stale embeddings:", e)

    joblib.dump(vectorizer, TFIDF_VECT_PATH)
    joblib.dump(X_tfidf, TFIDF_MAT_PATH)
    if BM25Okapi is not None:
        joblib.dump(all_tokens, BM25_TOK_PATH)
    meta.to_parquet(RAG_META_PATH, index=False)
    return vectorizer, X_tfidf, meta, all_tokens, emb
tfidf_vectorizer, tfidf_matrix, rag_meta, bm25_tokens, emb_matrix = build_or_load_hybrid(LOCAL_PDF_DIR)
# Truthiness check (not just "is not None") so an empty token list does not
# reach BM25Okapi.
bm25 = BM25Okapi(bm25_tokens) if (BM25Okapi is not None and bm25_tokens) else None
st_query_model = _safe_init_st_model(os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2"))


def _extract_page(text_chunk: str) -> str:
    """Return the number from the last ``[[PAGE=n]]`` marker, or "?" if none."""
    # Correct: [[PAGE=123]]
    m = list(re.finditer(r"\[\[PAGE=(\d+)\]\]", text_chunk or ""))
    return m[-1].group(1) if m else "?"


def hybrid_search(query: str, k=8, w_tfidf=W_TFIDF_DEFAULT, w_bm25=W_BM25_DEFAULT, w_emb=W_EMB_DEFAULT):
    """Rank indexed chunks by a weighted mix of dense, TF-IDF and BM25 scores.

    Each scorer's raw scores are min-max normalised to [0, 1]; a scorer that
    is unavailable contributes zeros and its weight is set to 0 before the
    weights are renormalised to sum to 1.

    Args:
        query: Free-text question.
        k: Number of chunks to return; clamped to ``[0, number of chunks]``.
        w_tfidf, w_bm25, w_emb: Relative weights of the three scorers.

    Returns:
        A DataFrame with the top-k rows of ``rag_meta`` plus the columns
        score_dense, score_tfidf, score_bm25 and score, best first. An empty
        DataFrame is returned when nothing is indexed.
    """
    if rag_meta is None or rag_meta.empty:
        return pd.DataFrame()
    n_chunks = len(rag_meta)

    # Dense scores (only when the matrix lines up with the chunk table)
    dense_scores = None
    dense_ready = (
        USE_DENSE
        and st_query_model is not None
        and emb_matrix is not None
        and w_emb > 0
        and len(emb_matrix) == n_chunks
    )
    if dense_ready:
        try:
            from sklearn.preprocessing import normalize as sk_normalize

            q_emb = st_query_model.encode([query], convert_to_numpy=True)
            q_emb = sk_normalize(q_emb)[0]
            dense_scores = emb_matrix @ q_emb
        except Exception as e:
            print("Dense query encoding failed:", e)
            dense_scores = None
    if dense_scores is None:
        dense_scores = np.zeros(n_chunks, dtype=float)
        w_emb = 0.0

    # TF-IDF scores
    if tfidf_vectorizer is not None and tfidf_matrix is not None:
        q_vec = tfidf_vectorizer.transform([query])
        tfidf_scores = (tfidf_matrix @ q_vec.T).toarray().ravel()
    else:
        tfidf_scores = np.zeros(n_chunks, dtype=float)
        w_tfidf = 0.0

    # BM25 scores (same tokenizer that was used to index the chunks)
    if bm25 is not None:
        bm25_scores = np.array(bm25.get_scores(tokenize(query)), dtype=float)
    else:
        bm25_scores = np.zeros(n_chunks, dtype=float)
        w_bm25 = 0.0

    def _norm(x):
        """Min-max normalise; a constant vector maps to all zeros."""
        x = np.asarray(x, dtype=float)
        lo, hi = x.min(), x.max()
        if np.allclose(hi, lo):
            return np.zeros_like(x)
        return (x - lo) / (hi - lo)

    s_dense = _norm(dense_scores)
    s_tfidf = _norm(tfidf_scores)
    s_bm25 = _norm(bm25_scores)

    total_w = (w_tfidf + w_bm25 + w_emb) or 1.0
    w_tfidf, w_bm25, w_emb = w_tfidf / total_w, w_bm25 / total_w, w_emb / total_w
    combo = w_emb * s_dense + w_tfidf * s_tfidf + w_bm25 * s_bm25

    # A negative k would otherwise slice from the wrong end of the ranking.
    k = max(0, min(int(k), n_chunks))
    idx = np.argsort(-combo)[:k]
    hits = rag_meta.iloc[idx].copy()
    hits["score_dense"] = s_dense[idx]
    hits["score_tfidf"] = s_tfidf[idx]
    hits["score_bm25"] = s_bm25[idx]
    hits["score"] = combo[idx]
    return hits.reset_index(drop=True)
def split_sentences(text: str) -> List[str]:
    """Return the sentences of ``text`` that are 6 to 60 words long."""
    return [s for s in sent_split(text) if 6 <= len(s.split()) <= 60]


def mmr_select_sentences(question: str, hits: pd.DataFrame, top_n=4, pool_per_chunk=6, lambda_div=0.7):
    """
    Robust MMR sentence picker:
    - Handles empty pools
    - Clamps top_n to pool size (top_n <= 0 selects nothing)
    - Avoids 'list index out of range'
    - Skips a sentence already pooled from the same document

    Args:
        question: The user question the sentences are scored against.
        hits: Retrieved chunks with ``doc_path`` and ``text`` columns.
        top_n: Maximum number of sentences to return.
        pool_per_chunk: Leading sentences taken from each chunk (at least 1).
        lambda_div: Relevance/diversity trade-off, clipped to [0, 1].

    Returns:
        A list of ``{"sent", "doc", "page"}`` dicts in selection order.
    """
    max_pick = int(top_n)
    if max_pick <= 0 or hits is None or hits.empty:
        return []

    # Build pool
    per_chunk = max(1, int(pool_per_chunk))
    pool, seen = [], set()
    for _, row in hits.iterrows():
        sents = split_sentences(row["text"])
        if not sents:
            continue
        doc = Path(row["doc_path"]).name
        page = _extract_page(row["text"])
        for s in sents[:per_chunk]:
            # Overlapping chunk windows can surface the same sentence twice.
            if (doc, s) in seen:
                continue
            seen.add((doc, s))
            pool.append({"sent": s, "doc": doc, "page": page})
    if not pool:
        return []

    # Relevance vectors
    sent_texts = [p["sent"] for p in pool]
    use_dense = USE_DENSE and st_query_model is not None
    try:
        if use_dense:
            from sklearn.preprocessing import normalize as sk_normalize

            enc = st_query_model.encode([question] + sent_texts, convert_to_numpy=True)
            q_vec = sk_normalize(enc[:1])[0]
            S = sk_normalize(enc[1:])
            rel = S @ q_vec

            def sim_fn(i, j):
                return float(S[i] @ S[j])
        else:
            from sklearn.feature_extraction.text import TfidfVectorizer

            vect = TfidfVectorizer().fit(sent_texts + [question])
            Q = vect.transform([question])
            S = vect.transform(sent_texts)
            rel = (S @ Q.T).toarray().ravel()

            def sim_fn(i, j):
                num = S[i] @ S[j].T
                return float(num.toarray()[0, 0]) if hasattr(num, "toarray") else float(num)
    except Exception:
        # Fallback: uniform relevance if vectorization fails
        rel = np.ones(len(sent_texts), dtype=float)

        def sim_fn(i, j):
            return 0.0

    # Normalize lambda_div
    lambda_div = float(np.clip(lambda_div, 0.0, 1.0))

    # Select first by highest relevance
    first = int(np.argmax(rel))
    selected_idx = [first]
    remain = [i for i in range(len(pool)) if i != first]

    # Clamp top_n
    max_pick = min(max_pick, len(pool))
    while len(selected_idx) < max_pick and remain:
        # MMR score: relevance minus similarity to the closest chosen sentence.
        # Ties resolve to the larger index, as (score, index) tuples compare.
        _, best_i = max(
            (
                lambda_div * float(rel[i])
                - (1.0 - lambda_div) * max(sim_fn(i, j) for j in selected_idx),
                i,
            )
            for i in remain
        )
        selected_idx.append(best_i)
        remain.remove(best_i)
    return [pool[i] for i in selected_idx]
def compose_extractive(selected: List[Dict[str, Any]]) -> str:
    """Join selected sentences, each followed by its ``(doc, p.page)`` citation.

    Returns an empty string when nothing was selected.
    """
    if not selected:
        return ""
    return " ".join(f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected)


# ========================= NEW: Instrumentation helpers =========================
LOG_PATH = ARTIFACT_DIR / "rag_logs.jsonl"


def _env_float(name: str, default: float = 0.0) -> float:
    """Read a float from the environment; fall back to ``default`` if unset or invalid."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError:
        print(f"[Config] Ignoring non-numeric {name}={raw!r}; using {default}")
        return default


# Price per 1000 tokens used for the logged cost estimate (0 when unset).
OPENAI_IN_COST_PER_1K = _env_float("OPENAI_COST_IN_PER_1K")
OPENAI_OUT_COST_PER_1K = _env_float("OPENAI_COST_OUT_PER_1K")


def _safe_write_jsonl(path: Path, record: dict):
    """Append ``record`` as one JSON line to ``path``; never raises.

    Serialization and I/O failures are printed and otherwise ignored so that
    logging cannot break a chat turn.
    """
    try:
        line = json.dumps(record, ensure_ascii=False) + "\n"
        with open(path, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception as e:
        print("[Log] write failed:", e)


def _calc_cost_usd(prompt_toks, completion_toks):
    """Estimate the call cost from token counts and the per-1K prices.

    Returns None when either token count is unknown.
    """
    if prompt_toks is None or completion_toks is None:
        return None
    return (prompt_toks / 1000.0) * OPENAI_IN_COST_PER_1K + (completion_toks / 1000.0) * OPENAI_OUT_COST_PER_1K
def _usage_tokens(usage) -> Optional[Dict[str, Any]]:
    """Normalise an API usage object/dict to prompt/completion token counts.

    Looks for ``input_tokens``/``output_tokens`` first and falls back to
    ``prompt_tokens``/``completion_tokens``. Returns None when ``usage`` is
    empty.
    """
    if not usage:
        return None

    def pick(*names):
        for name in names:
            value = usage.get(name) if isinstance(usage, dict) else getattr(usage, name, None)
            if value is not None:
                return value
        return None

    return {
        "prompt_tokens": pick("input_tokens", "prompt_tokens"),
        "completion_tokens": pick("output_tokens", "completion_tokens"),
    }


# ----------------- Modified to return (text, usage_dict) -----------------
def synthesize_with_llm(question: str, sentence_lines: List[str], model: str = None, temperature: float = 0.2):
    """Ask the LLM to answer ``question`` using only ``sentence_lines``.

    Args:
        question: The user question.
        sentence_lines: Sentences with inline citations to ground the answer.
        model: Model name; defaults to OPENAI_MODEL.
        temperature: Sampling temperature passed to the API.

    Returns:
        ``(text, usage)`` where ``usage`` is a dict with ``prompt_tokens`` and
        ``completion_tokens`` (or None). ``text`` is None when the LLM is not
        configured, the call fails, or the response carries no text, so the
        caller can fall back to the extractive answer.
    """
    if not LLM_AVAILABLE:
        return None, None

    model = model or OPENAI_MODEL
    SYSTEM_PROMPT = (
        "You are a scientific assistant for self-sensing cementitious materials.\n"
        "Answer STRICTLY using the provided sentences.\n"
        "Do not invent facts. Keep it concise (3–6 sentences).\n"
        "Retain inline citations like (Doc.pdf, p.X) exactly as given."
    )
    user_prompt = (
        f"Question: {question}\n\n"
        f"Use ONLY these sentences to answer; keep their inline citations:\n"
        + "\n".join(f"- {s}" for s in sentence_lines)
    )
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        resp = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=temperature,
        )
    except Exception as e:
        print("[LLM] synthesis failed:", e)
        return None, None

    try:
        usage = _usage_tokens(getattr(resp, "usage", None))
    except Exception:
        usage = None
    # No str(resp) fallback: an object repr must never become the answer.
    out_text = getattr(resp, "output_text", None) or None
    return out_text, usage


def _cited(sentence: Dict[str, Any]) -> str:
    """Format one selected sentence with its inline citation."""
    return f"{sentence['sent']} ({sentence['doc']}, p.{sentence['page']})"


def _top_passages_block(hits: pd.DataFrame, limit: int = 2) -> str:
    """Return a rule followed by the text of the first ``limit`` hits."""
    return "\n\n---\n" + "\n\n".join(hits["text"].tolist()[:limit])


def _rag_log_record(run_id, inputs, retr_hits, latency_ms_retriever, final, selected,
                    latency_ms_total, openai_info=None) -> dict:
    """Assemble the JSONL log record shared by every exit path of rag_reply."""
    return {
        "run_id": run_id,
        "ts": int(time.time() * 1000),
        "inputs": inputs,
        "retrieval": {"hits": retr_hits, "latency_ms_retriever": latency_ms_retriever},
        "output": {
            "final_answer": final,
            "used_sentences": [{"sent": s["sent"], "doc": s["doc"], "page": s["page"]} for s in selected],
        },
        "latency_ms_total": latency_ms_total,
        "openai": openai_info,
    }


def rag_reply(
    question: str,
    k: int = 8,
    n_sentences: int = 4,
    include_passages: bool = False,
    use_llm: bool = False,
    model: str = None,
    temperature: float = 0.2,
    strict_quotes_only: bool = False,
    w_tfidf: float = W_TFIDF_DEFAULT,
    w_bm25: float = W_BM25_DEFAULT,
    w_emb: float = W_EMB_DEFAULT
) -> str:
    """Answer a literature question from the indexed PDFs and log the turn.

    Retrieves ``k`` chunks, picks up to ``n_sentences`` sentences with MMR and
    formats them as quoted passages (``strict_quotes_only``), an LLM synthesis
    (``use_llm`` and the LLM returned text) or an extractive answer. When no
    sentence could be selected, the two top passages are shown instead;
    ``include_passages`` appends them to the other answer forms.

    Every call appends one record to LOG_PATH. Returns the Markdown answer.
    """
    run_id = str(uuid.uuid4())
    t0_total = time.perf_counter()

    def elapsed_ms(since: float) -> int:
        return int((time.perf_counter() - since) * 1000)

    def log_inputs(llm_flag, model_name) -> dict:
        return {
            "question": question,
            "top_k": int(k),
            "n_sentences": int(n_sentences),
            "w_tfidf": float(w_tfidf),
            "w_bm25": float(w_bm25),
            "w_emb": float(w_emb),
            "use_llm": llm_flag,
            "model": model_name,
            "temperature": float(temperature),
        }

    # --- Retrieval ---
    hits = hybrid_search(question, k=k, w_tfidf=w_tfidf, w_bm25=w_bm25, w_emb=w_emb)
    latency_ms_retriever = elapsed_ms(t0_total)

    if hits is None or hits.empty:
        final = "No indexed PDFs found. Upload PDFs to the 'papers/' folder and reload the Space."
        _safe_write_jsonl(LOG_PATH, _rag_log_record(
            run_id, log_inputs(bool(use_llm), model), [], latency_ms_retriever,
            final, [], elapsed_ms(t0_total),
        ))
        return final

    # Select sentences
    selected = mmr_select_sentences(question, hits, top_n=int(n_sentences), pool_per_chunk=6, lambda_div=0.7)
    header_cites = "; ".join(
        f"{Path(r['doc_path']).name} (p.{_extract_page(r['text'])})" for _, r in hits.head(6).iterrows()
    )
    srcs = {Path(r['doc_path']).name for _, r in hits.iterrows()}
    coverage_note = "" if len(srcs) >= 3 else f"\n\n> Note: Only {len(srcs)} unique source(s) contributed. Add more PDFs or increase Top-K."
    cite_line = f"\n\n**Citations:** {header_cites}{coverage_note}"

    # Prepare retrieval list for logging
    retr_list = [
        {
            "doc": Path(r["doc_path"]).name,
            "page": _extract_page(r["text"]),
            "score_tfidf": float(r.get("score_tfidf", 0.0)),
            "score_bm25": float(r.get("score_bm25", 0.0)),
            "score_dense": float(r.get("score_dense", 0.0)),
            "combo_score": float(r.get("score", 0.0)),
        }
        for _, r in hits.iterrows()
    ]

    # Strict quotes only (no LLM)
    if strict_quotes_only:
        if not selected:
            final = "**Quoted Passages:**" + _top_passages_block(hits) + cite_line
        else:
            final = "**Quoted Passages:**\n- " + "\n- ".join(_cited(s) for s in selected) + cite_line
            if include_passages:
                final += _top_passages_block(hits)
        _safe_write_jsonl(LOG_PATH, _rag_log_record(
            run_id, log_inputs(False, None), retr_list, latency_ms_retriever,
            final, selected, elapsed_ms(t0_total),
        ))
        return final

    # Extractive or LLM synthesis
    extractive = compose_extractive(selected)
    llm_text = None
    llm_usage = None
    llm_latency_ms = None
    if use_llm and selected:
        t0_llm = time.perf_counter()
        llm_text, llm_usage = synthesize_with_llm(
            question, [_cited(s) for s in selected], model=model, temperature=temperature
        )
        llm_latency_ms = elapsed_ms(t0_llm)

    if llm_text:
        final = f"**Answer (LLM synthesis):** {llm_text}" + cite_line
        if include_passages:
            final += _top_passages_block(hits)
    elif not extractive:
        # Nothing quotable: the passages themselves are the answer.
        final = "**Answer:** Here are relevant passages." + cite_line + _top_passages_block(hits)
    else:
        final = f"**Answer:** {extractive}" + cite_line
        if include_passages:
            final += _top_passages_block(hits)

    # --------- Log full run ---------
    prompt_toks = llm_usage.get("prompt_tokens") if llm_usage else None
    completion_toks = llm_usage.get("completion_tokens") if llm_usage else None
    openai_info = {
        "prompt_tokens": prompt_toks,
        "completion_tokens": completion_toks,
        "cost_usd": _calc_cost_usd(prompt_toks, completion_toks),
    } if use_llm else None
    record = _rag_log_record(
        run_id, log_inputs(bool(use_llm), model), retr_list, latency_ms_retriever,
        final, selected, elapsed_ms(t0_total), openai_info,
    )
    record["latency_ms_llm"] = llm_latency_ms
    _safe_write_jsonl(LOG_PATH, record)
    return final


def rag_chat_fn(message, history, top_k, n_sentences, include_passages, use_llm,
                model_name, temperature, strict_quotes_only, w_tfidf, w_bm25, w_emb):
    """Chat callback: coerce the UI values and delegate to rag_reply.

    ``history`` is accepted but not used. Blank messages get a usage hint; any
    exception is logged with its traceback and reported as a "RAG error" string
    instead of propagating.
    """
    if not message or not message.strip():
        return "Ask a literature question (e.g., *How does CNT length affect gauge factor?*)"
    try:
        return rag_reply(
            question=message,
            k=int(top_k),
            n_sentences=int(n_sentences),
            include_passages=bool(include_passages),
            use_llm=bool(use_llm),
            model=(model_name or None),
            temperature=float(temperature),
            strict_quotes_only=bool(strict_quotes_only),
            w_tfidf=float(w_tfidf),
            w_bm25=float(w_bm25),
            w_emb=float(w_emb),
        )
    except Exception as e:
        traceback.print_exc()
        return f"RAG error: {e}"
does CNT length affect gauge factor?*)" try: return rag_reply( question=message, k=int(top_k), n_sentences=int(n_sentences), include_passages=bool(include_passages), use_llm=bool(use_llm), model=(model_name or None), temperature=float(temperature), strict_quotes_only=bool(strict_quotes_only), w_tfidf=float(w_tfidf), w_bm25=float(w_bm25), w_emb=float(w_emb), ) except Exception as e: return f"RAG error: {e}" # ========================= UI (science-oriented styling) ========================= CSS = """ /* Science-oriented: crisp contrast + readable numerics */ * {font-family: ui-sans-serif, system-ui, -apple-system, 'Segoe UI', Roboto, 'Helvetica Neue', Arial;} .gradio-container { background: linear-gradient(135deg, #0b1020 0%, #0c2b1a 60%, #0a2b4d 100%) !important; } .card {background: rgba(255,255,255,0.06) !important; border: 1px solid rgba(255,255,255,0.14); border-radius: 12px;} label {color: #e8f7ff !important; text-shadow: 0 1px 0 rgba(0,0,0,0.35); cursor: pointer;} input[type="number"] {font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace;} /* Checkbox clickability fixes */ input[type="checkbox"], .gr-checkbox, .gr-checkbox > * { pointer-events: auto !important; } .gr-checkbox label, .gr-check-radio label { pointer-events: auto !important; cursor: pointer; } #rag-tab input[type="checkbox"] { accent-color: #60a5fa !important; } /* RAG tab styling */ #rag-tab .block, #rag-tab .group, #rag-tab .accordion { background: linear-gradient(160deg, #1f2937 0%, #14532d 55%, #0b3b68 100%) !important; border-radius: 12px; border: 1px solid rgba(255,255,255,0.14); } #rag-tab input, #rag-tab textarea, #rag-tab select, #rag-tab .scroll-hide, #rag-tab .chatbot textarea { background: rgba(17, 24, 39, 0.85) !important; border: 1px solid #60a5fa !important; color: #e5f2ff !important; } #rag-tab input[type="range"] { accent-color: #22c55e !important; } #rag-tab button { border-radius: 10px !important; font-weight: 600 !important; } 
#rag-tab .chatbot { background: rgba(15, 23, 42, 0.6) !important; border: 1px solid rgba(148, 163, 184, 0.35) !important; } #rag-tab .message.user { background: rgba(34, 197, 94, 0.15) !important; border-left: 3px solid #22c55e !important; } #rag-tab .message.bot { background: rgba(59, 130, 246, 0.15) !important; border-left: 3px solid #60a5fa !important; color: #eef6ff !important; } /* Evaluate tab dark/high-contrast styling */ #eval-tab .block, #eval-tab .group, #eval-tab .accordion { background: linear-gradient(165deg, #0a0f1f 0%, #0d1a31 60%, #0a1c2e 100%) !important; border-radius: 12px; border: 1px solid rgba(139, 197, 255, 0.28); } #eval-tab label, #eval-tab .markdown, #eval-tab .prose, #eval-tab p, #eval-tab span { color: #e6f2ff !important; } #eval-tab input, #eval-tab .gr-file, #eval-tab .scroll-hide, #eval-tab textarea, #eval-tab select { background: rgba(8, 13, 26, 0.9) !important; border: 1px solid #3b82f6 !important; color: #dbeafe !important; } #eval-tab input[type="range"] { accent-color: #22c55e !important; } #eval-tab button { border-radius: 10px !important; font-weight: 700 !important; background: #0ea5e9 !important; color: #001321 !important; border: 1px solid #7dd3fc !important; } #eval-tab .gr-json, #eval-tab .markdown pre, #eval-tab .markdown code { background: rgba(2, 6, 23, 0.85) !important; color: #e2e8f0 !important; border: 1px solid rgba(148, 163, 184, 0.3) !important; border-radius: 10px !important; } /* Predictor output emphasis */ #pred-out .wrap { font-size: 20px; font-weight: 700; color: #ecfdf5; } /* Tab header: darker blue theme for all tabs */ .gradio-container .tab-nav button[role="tab"] { background: #0b1b34 !important; color: #cfe6ff !important; border: 1px solid #1e3a8a !important; } .gradio-container .tab-nav button[role="tab"][aria-selected="true"] { background: #0e2a57 !important; color: #e0f2fe !important; border-color: #3b82f6 !important; } /* Evaluate tab: enforce dark-blue text for labels/marks */ #eval-tab .label, 
#eval-tab label, #eval-tab .gr-slider .label, #eval-tab .wrap .label, #eval-tab .prose, #eval-tab .markdown, #eval-tab p, #eval-tab span { color: #cfe6ff !important; /* softer than pure white */ } /* Target the specific k-slider label strongly */ #k-slider .label, #k-slider label, #k-slider .wrap .label { color: #cfe6ff !important; text-shadow: 0 1px 0 rgba(0,0,0,0.35); } /* Slider track/thumb (dark blue gradient + blue thumb) */ #eval-tab input[type="range"] { accent-color: #3b82f6 !important; /* fallback */ } /* WebKit */ #eval-tab input[type="range"]::-webkit-slider-runnable-track { height: 6px; background: linear-gradient(90deg, #0b3b68, #1e3a8a); border-radius: 4px; } #eval-tab input[type="range"]::-webkit-slider-thumb { -webkit-appearance: none; appearance: none; margin-top: -6px; /* centers thumb on 6px track */ width: 18px; height: 18px; background: #1d4ed8; border: 1px solid #60a5fa; border-radius: 50%; } /* Firefox */ #eval-tab input[type="range"]::-moz-range-track { height: 6px; background: linear-gradient(90deg, #0b3b68, #1e3a8a); border-radius: 4px; } #eval-tab input[type="range"]::-moz-range-thumb { width: 18px; height: 18px; background: #1d4ed8; border: 1px solid #60a5fa; border-radius: 50%; } /* ======== PATCH: Style the File + JSON outputs by ID ======== */ #perq-file, #agg-file { background: rgba(8, 13, 26, 0.9) !important; border: 1px solid #3b82f6 !important; border-radius: 12px !important; padding: 8px !important; } #perq-file * , #agg-file * { color: #dbeafe !important; } #perq-file a, #agg-file a { background: #0e2a57 !important; color: #e0f2fe !important; border: 1px solid #60a5fa !important; border-radius: 8px !important; padding: 6px 10px !important; text-decoration: none !important; } #perq-file a:hover, #agg-file a:hover { background: #10356f !important; border-color: #93c5fd !important; } /* File preview wrappers (covers multiple Gradio render modes) */ #perq-file .file-preview, #agg-file .file-preview, #perq-file .wrap, #agg-file .wrap 
{ background: rgba(2, 6, 23, 0.85) !important; border-radius: 10px !important; border: 1px solid rgba(148,163,184,.3) !important; } /* JSON output: dark panel + readable text */ #agg-json { background: rgba(2, 6, 23, 0.85) !important; border: 1px solid rgba(148,163,184,.35) !important; border-radius: 12px !important; padding: 8px !important; } #agg-json *, #agg-json .json, #agg-json .wrap { color: #e6f2ff !important; } #agg-json pre, #agg-json code { background: rgba(4, 10, 24, 0.9) !important; color: #e2e8f0 !important; border: 1px solid rgba(148,163,184,.35) !important; border-radius: 10px !important; } /* Tree/overflow modes */ #agg-json [data-testid="json-tree"], #agg-json [role="tree"], #agg-json .overflow-auto { background: rgba(4, 10, 24, 0.9) !important; color: #e6f2ff !important; border-radius: 10px !important; border: 1px solid rgba(148,163,184,.35) !important; } /* Eval log markdown */ #eval-log, #eval-log * { color: #cfe6ff !important; } #eval-log pre, #eval-log code { background: rgba(2, 6, 23, 0.85) !important; color: #e2e8f0 !important; border: 1px solid rgba(148,163,184,.3) !important; border-radius: 10px !important; } /* When Evaluate tab is active and JS has added .eval-active, bump contrast subtly */ #eval-tab.eval-active .block, #eval-tab.eval-active .group { border-color: #60a5fa !important; } #eval-tab.eval-active .label { color: #e6f2ff !important; } """ theme = gr.themes.Soft( primary_hue="blue", neutral_hue="green" ).set( body_background_fill="#0b1020", body_text_color="#e0f2fe", input_background_fill="#0f172a", input_border_color="#1e40af", button_primary_background_fill="#2563eb", button_primary_text_color="#ffffff", button_secondary_background_fill="#14532d", button_secondary_text_color="#ecfdf5", ) with gr.Blocks(css=CSS, theme=theme, fill_height=True) as demo: # Optional: JS to toggle .eval-active when Evaluate tab selected gr.HTML(""" """) gr.Markdown( "
" "Left: ML prediction for Stress Gauge Factor (original scale, MPa-1). " "Right: Literature Q&A via Hybrid RAG (BM25 + TF-IDF + optional dense) with MMR sentence selection." "
" ) with gr.Tabs(): # ------------------------- Predictor Tab ------------------------- with gr.Tab("🔮 Predict Gauge Factor (XGB)"): with gr.Row(): with gr.Column(scale=7): with gr.Accordion("Primary conductive filler", open=True, elem_classes=["card"]): f1_type = gr.Textbox(label="Filler 1 Type *", placeholder="e.g., CNT, Graphite, Steel fiber") f1_diam = gr.Number(label="Filler 1 Diameter (µm) *") f1_len = gr.Number(label="Filler 1 Length (mm) *") cf_conc = gr.Number(label=f"{CF_COL} *", info="Weight percent of total binder") f1_dim = gr.Dropdown(DIM_CHOICES, value=CANON_NA, label="Filler 1 Dimensionality *") with gr.Accordion("Secondary filler (optional)", open=False, elem_classes=["card"]): f2_type = gr.Textbox(label="Filler 2 Type", placeholder="Optional") f2_diam = gr.Number(label="Filler 2 Diameter (µm)") f2_len = gr.Number(label="Filler 2 Length (mm)") f2_dim = gr.Dropdown(DIM_CHOICES, value=CANON_NA, label="Filler 2 Dimensionality") with gr.Accordion("Mix design & specimen", open=False, elem_classes=["card"]): spec_vol = gr.Number(label="Specimen Volume (mm3) *") probe_cnt = gr.Number(label="Probe Count *") probe_mat = gr.Textbox(label="Probe Material *", placeholder="e.g., Copper, Silver paste") wb = gr.Number(label="W/B *") sb = gr.Number(label="S/B *") gauge_len = gr.Number(label="Gauge Length (mm) *") curing = gr.Textbox(label="Curing Condition *", placeholder="e.g., 28d water, 20°C") n_fillers = gr.Number(label="Number of Fillers *") with gr.Accordion("Processing", open=False, elem_classes=["card"]): dry_temp = gr.Number(label="Drying Temperature (°C)") dry_hrs = gr.Number(label="Drying Duration (hr)") with gr.Accordion("Mechanical & electrical loading", open=False, elem_classes=["card"]): load_rate = gr.Number(label="Loading Rate (MPa/s)") E_mod = gr.Number(label="Modulus of Elasticity (GPa) *") current = gr.Dropdown(CURRENT_CHOICES, value=CANON_NA, label="Current Type") voltage = gr.Number(label="Applied Voltage (V)") with gr.Column(scale=5): with 
gr.Group(elem_classes=["card"]): out_pred = gr.Number(label="Predicted Stress GF (MPa-1)", value=0.0, precision=6, elem_id="pred-out") gr.Markdown(f"{MODEL_STATUS}") with gr.Row(): btn_pred = gr.Button("Predict", variant="primary") btn_clear = gr.Button("Clear") btn_demo = gr.Button("Fill Example") with gr.Accordion("About this model", open=False, elem_classes=["card"]): gr.Markdown( "- Pipeline: ColumnTransformer → (RobustScaler + OneHot) → XGBoost\n" "- Target: Stress GF (MPa-1) on original scale (model may train on log1p; saved flag used at inference).\n" "- Missing values are safely imputed per-feature.\n" "- Trained columns:\n" f" `{', '.join(MAIN_VARIABLES)}`", elem_classes=["prose"] ) inputs_in_order = [ f1_type, f1_diam, f1_len, cf_conc, f1_dim, f2_type, f2_diam, f2_len, f2_dim, spec_vol, probe_cnt, probe_mat, wb, sb, gauge_len, curing, n_fillers, dry_temp, dry_hrs, load_rate, E_mod, current, voltage ] def _predict_wrapper(*vals): data = {k: v for k, v in zip(MAIN_VARIABLES, vals)} return predict_fn(**data) btn_pred.click(_predict_wrapper, inputs=inputs_in_order, outputs=out_pred) btn_clear.click(lambda: _clear_all(), inputs=None, outputs=inputs_in_order).then(lambda: 0.0, outputs=out_pred) btn_demo.click(lambda: _fill_example(), inputs=None, outputs=inputs_in_order) # ------------------------- Literature Tab ------------------------- with gr.Tab("📚 Ask the Literature (Hybrid RAG + MMR)", elem_id="rag-tab"): pdf_count = len(list(LOCAL_PDF_DIR.glob("**/*.pdf"))) gr.Markdown( f"Using local folderpapers/ — **{pdf_count} PDF(s)** indexed. "
"Upload more PDFs and reload the Space to expand coverage. Answers cite (Doc.pdf, p.X)."
)
# Literature-tab controls. Every component created here is handed to the chat
# callback through `additional_inputs` of the ChatInterface at the end of this
# section.
# NOTE(review): leading indentation is not visible in this view, so whether the
# Accordion/State/ChatInterface statements sit inside or after the Row cannot
# be confirmed here — verify the nesting against the original layout.
with gr.Row():
# Number of retrieved chunks; rag_chat_fn forwards it to rag_reply as `k`.
top_k = gr.Slider(5, 12, value=8, step=1, label="Top-K chunks")
# Number of sentences requested for the composed answer.
n_sentences = gr.Slider(2, 6, value=4, step=1, label="Answer length (sentences)")
include_passages = gr.Checkbox(value=False, label="Include supporting passages", interactive=True)
with gr.Accordion("Retriever weights (advanced)", open=False):
w_tfidf = gr.Slider(0.0, 1.0, value=W_TFIDF_DEFAULT, step=0.05, label="TF-IDF weight")
w_bm25 = gr.Slider(0.0, 1.0, value=W_BM25_DEFAULT, step=0.05, label="BM25 weight")
# Dense weight starts at 0.0 when USE_DENSE is False (sentence_transformers
# failed to import at module load), otherwise at 0.40.
w_emb = gr.Slider(0.0, 1.0, value=(0.0 if not USE_DENSE else 0.40), step=0.05, label="Dense weight (set 0 if disabled)")
# Hidden states (unchanged): LLM settings are carried as gr.State values, so
# they reach the callback without any visible widget.
state_use_llm = gr.State(LLM_AVAILABLE)
# Re-reads OPENAI_MODEL from the environment, falling back to the module-level
# OPENAI_MODEL constant.
state_model_name = gr.State(os.getenv("OPENAI_MODEL", OPENAI_MODEL))
state_temperature = gr.State(0.2)
state_strict = gr.State(False)
# Chat UI backed by rag_chat_fn.
# NOTE(review): presumably the order of `additional_inputs` must match the
# extra positional parameters of rag_chat_fn — confirm against its signature.
gr.ChatInterface(
fn=rag_chat_fn,
additional_inputs=[
top_k, n_sentences, include_passages,
state_use_llm, state_model_name, state_temperature, state_strict,
w_tfidf, w_bm25, w_emb
],
title="Literature Q&A",
description="Hybrid retrieval with diversity. Answers carry inline (Doc, p.X) citations."
)
# ====== Evaluate (Gold vs Logs) — darker, higher-contrast ======
# Widgets of the Evaluate tab. The elem_id values used here ("eval-tab",
# "k-slider", "perq-file", "agg-file", "agg-json", "eval-log") are the hooks
# targeted by the matching #id selectors in the CSS string, so renaming one
# requires updating the stylesheet as well.
# NOTE(review): leading indentation is not visible in this view — confirm the
# nesting of the rows under the tab against the original layout.
with gr.Tab("📏 Evaluate (Gold vs Logs)", elem_id="eval-tab"):
gr.Markdown("Upload your **gold.csv** and compute metrics against the app logs.")
with gr.Row():
# Optional upload; when nothing is uploaded the click handler looks for a
# gold.csv in the current working directory instead.
gold_file = gr.File(label="gold.csv", file_types=[".csv"], interactive=True)
k_slider = gr.Slider(3, 12, value=8, step=1, label="k for Hit/Recall/nDCG", elem_id="k-slider")
with gr.Row():
btn_eval = gr.Button("Compute Metrics", variant="primary")
with gr.Row():
# Output slots, filled from the 4-tuple returned by the evaluation handler.
out_perq = gr.File(label="Per-question metrics (CSV)", elem_id="perq-file")
out_agg = gr.File(label="Aggregate metrics (JSON)", elem_id="agg-file")
out_json = gr.JSON(label="Aggregate summary", elem_id="agg-json")
out_log = gr.Markdown(label="Run log", elem_id="eval-log")
def _run_eval_inproc(gold_path: str, k: int = 8):
    """Run ``rag_eval_metrics.py`` and collect the artifacts it writes.

    Despite the name, the evaluation runs in a child Python process
    (``sys.executable``), not inside the app process.

    Args:
        gold_path: Path of the gold CSV, passed to the script as ``--gold_csv``.
        k: Cut-off passed to the script as ``--k``.

    Returns:
        A 4-tuple ``(per_question_csv, aggregate_json_file, aggregate, report)``:
        the two artifact paths as strings (``None`` when the file was not
        produced by this run), the parsed aggregate JSON (``{}`` when missing
        or unparsable) and a fenced Markdown block with the script's
        stdout/stderr. Any exception is reported as
        ``(None, None, {}, "**Eval error:** ...")`` instead of being raised.
    """
    perq = ARTIFACT_DIR / "metrics_per_question.csv"
    agg = ARTIFACT_DIR / "metrics_aggregate.json"
    cmd = [
        sys.executable, "rag_eval_metrics.py",
        "--gold_csv", gold_path,
        "--logs_jsonl", str(LOG_PATH),
        "--k", str(k),
        "--out_dir", str(ARTIFACT_DIR),
    ]
    try:
        # Drop artifacts left by an earlier run. Without this, a failed run
        # would hand the previous run's files back as if they were current.
        for stale in (perq, agg):
            if stale.exists():
                stale.unlink()

        # check=False: a non-zero exit is reported in the log, not raised.
        proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
        stdout = proc.stdout or ""
        stderr = proc.stderr or ""

        agg_json = {}
        notes = []
        if agg.exists():
            try:
                agg_json = json.loads(agg.read_text(encoding="utf-8"))
            except ValueError as e:  # json.JSONDecodeError is a ValueError
                # Keep the captured output visible rather than discarding the
                # whole run because the summary file is malformed.
                notes.append(f"(could not parse {agg.name}: {e})")

        sections = [stdout.strip() or "(no stdout)"]
        if stderr:
            sections.append(stderr.strip())
        if proc.returncode != 0:
            sections.append(f"(rag_eval_metrics.py exited with code {proc.returncode})")
        sections.extend(notes)
        report = "```\n" + "\n".join(sections) + "\n```"

        return (str(perq) if perq.exists() else None,
                str(agg) if agg.exists() else None,
                agg_json,
                report)
    except Exception as e:
        # UI boundary: surface the failure in the run log instead of crashing
        # the click handler.
        return (None, None, {}, f"**Eval error:** {e}")
def _eval_wrapper(gf, k):
    """Resolve the gold CSV path and run the evaluation.

    Args:
        gf: Value delivered by the ``gr.File`` input. It may be ``None``
            (nothing uploaded), a path string / path-like object, or a
            tempfile-like object exposing the path as ``.name``.
        k: Slider value; converted to ``int`` before being passed on.

    Returns:
        The 4-tuple produced by ``_run_eval_inproc``, or
        ``(None, None, {}, message)`` when no file was uploaded and no
        ``gold.csv`` exists in the current working directory.
    """
    if gf is None:
        # Fall back to a gold.csv shipped next to the app.
        default_gold = Path("gold.csv")
        if not default_gold.exists():
            return None, None, {}, "**No gold.csv provided or found in repo root.**"
        gold_path = str(default_gold)
    elif isinstance(gf, (str, os.PathLike)):
        # gr.File can deliver the upload as a plain path string, which has no
        # `.name` attribute.
        gold_path = str(gf)
    else:
        # Tempfile-like wrapper: the path is stored in `.name`.
        gold_path = gf.name
    return _run_eval_inproc(gold_path, int(k))
# Wire the button: the handler's 4-tuple (per-question CSV path, aggregate JSON
# path, aggregate dict, Markdown run log) maps positionally onto `outputs`.
btn_eval.click(_eval_wrapper, inputs=[gold_file, k_slider],
outputs=[out_perq, out_agg, out_json, out_log])
# ------------- Launch -------------
# Start the Gradio app (with its request queue enabled) only when this file is
# executed as a script.
if __name__ == "__main__":
demo.queue().launch()
import os
import pandas as pd

# Export an inventory of the RAG corpus folder to paper_list.csv (one row per
# directory entry, column "doc").
# NOTE(review): this sits after the launch guard; if it is module-level code it
# also runs on import, and when run as a script only after launch() returns —
# confirm that this placement is intended.

# Folder where your RAG files are stored
folder = "papers"  # change if needed

if os.path.isdir(folder):
    # List all entries in the folder (sorted for a stable CSV)
    files = sorted(os.listdir(folder))
    # Save them to a CSV file
    pd.DataFrame({"doc": files}).to_csv("paper_list.csv", index=False)
    print("✅ Saved paper_list.csv with", len(files), "papers")
else:
    # A missing folder previously raised FileNotFoundError from os.listdir();
    # report it and leave any existing paper_list.csv untouched.
    files = []
    print(f"⚠️ Folder '{folder}' not found; paper_list.csv was not written")