# ================================================================
# Self-Sensing Concrete Assistant — Predictor (XGB) + Hybrid RAG
# - Predictor tab: identical behavior to your "second code"
# - Literature tab: from your "first code" (Hybrid RAG + MMR)
# - Hugging Face friendly: online PDF fetching OFF by default
# ================================================================

# ---------------------- Runtime flags (HF-safe) ----------------------
import os
os.environ["TRANSFORMERS_NO_TF"] = "1"
os.environ["TRANSFORMERS_NO_FLAX"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# ------------------------------- Imports ------------------------------
import re, time, joblib, warnings, json
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
import pandas as pd
import gradio as gr

warnings.filterwarnings("ignore", category=UserWarning)

# Optional deps (handled gracefully if missing)
USE_DENSE = True
try:
    from sentence_transformers import SentenceTransformer
except Exception:
    USE_DENSE = False

try:
    from rank_bm25 import BM25Okapi
except Exception:
    BM25Okapi = None
    print("rank_bm25 not installed; BM25 disabled (TF-IDF still works).")

# Optional OpenAI (for LLM paraphrase)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5")
try:
    from openai import OpenAI
except Exception:
    OpenAI = None

# ========================= Predictor (kept same as 2nd) =========================
CF_COL = "Conductive Filler Conc. (wt%)"
TARGET_COL = "Stress GF (MPa-1)"

MAIN_VARIABLES = [
    "Filler 1 Type", "Filler 1 Diameter (µm)", "Filler 1 Length (mm)", CF_COL,
    "Filler 1 Dimensionality", "Filler 2 Type", "Filler 2 Diameter (µm)",
    "Filler 2 Length (mm)", "Filler 2 Dimensionality", "Specimen Volume (mm3)",
    "Probe Count", "Probe Material", "W/B", "S/B", "Gauge Length (mm)",
    "Curing Condition", "Number of Fillers", "Drying Temperature (°C)",
    "Drying Duration (hr)", "Loading Rate (MPa/s)", "Modulus of Elasticity (GPa)",
    "Current Type", "Applied Voltage (V)",
]

NUMERIC_COLS = {
    "Filler 1 Diameter (µm)", "Filler 1 Length (mm)", CF_COL,
    "Filler 2 Diameter (µm)", "Filler 2 Length (mm)", "Specimen Volume (mm3)",
    "Probe Count", "W/B", "S/B", "Gauge Length (mm)", "Number of Fillers",
    "Drying Temperature (°C)", "Drying Duration (hr)", "Loading Rate (MPa/s)",
    "Modulus of Elasticity (GPa)", "Applied Voltage (V)",
}

CATEGORICAL_COLS = {
    "Filler 1 Type", "Filler 1 Dimensionality", "Filler 2 Type",
    "Filler 2 Dimensionality", "Probe Material", "Curing Condition", "Current Type",
}

DIM_CHOICES = ["0D", "1D", "2D", "3D", "NA"]
CURRENT_CHOICES = ["DC", "AC", "NA"]

MODEL_CANDIDATES = [
    "stress_gf_xgb.joblib",
    "models/stress_gf_xgb.joblib",
    "/home/user/app/stress_gf_xgb.joblib",
]


def _load_model_or_error():
    for p in MODEL_CANDIDATES:
        if os.path.exists(p):
            try:
                return joblib.load(p)
            except Exception as e:
                return f"Could not load model from {p}: {e}"
    return ("Model file not found. Upload your trained pipeline as "
            "stress_gf_xgb.joblib (or put it in models/).")


def _coerce_to_row(form_dict: dict) -> pd.DataFrame:
    row = {}
    for col in MAIN_VARIABLES:
        v = form_dict.get(col, None)
        if col in NUMERIC_COLS:
            if v in ("", None):
                row[col] = np.nan
            else:
                try:
                    row[col] = float(v)
                except Exception:
                    row[col] = np.nan
        else:
            row[col] = "" if v in (None, "NA") else str(v).strip()
    return pd.DataFrame([row], columns=MAIN_VARIABLES)


def predict_fn(**kwargs):
    mdl = _load_model_or_error()
    if isinstance(mdl, str):
        return mdl
    X_new = _coerce_to_row(kwargs)
    try:
        y_log = mdl.predict(X_new)     # model predicts log1p(target)
        y = float(np.expm1(y_log)[0])  # back to original scale (MPa^-1)
        if -1e-10 < y < 0:
            y = 0.0
        return y
    except Exception as e:
        return f"Prediction error: {e}"


EXAMPLE = {
    "Filler 1 Type": "CNT",
    "Filler 1 Dimensionality": "1D",
    "Filler 1 Diameter (µm)": 0.02,
    "Filler 1 Length (mm)": 1.2,
    CF_COL: 0.5,
    "Filler 2 Type": "",
    "Filler 2 Dimensionality": "NA",
    "Filler 2 Diameter (µm)": None,
    "Filler 2 Length (mm)": None,
    "Specimen Volume (mm3)": 1000,
    "Probe Count": 2,
    "Probe Material": "Copper",
    "W/B": 0.4,
    "S/B": 2.5,
    "Gauge Length (mm)": 20,
    "Curing Condition": "28d water, 20°C",
    "Number of Fillers": 1,
    "Drying Temperature (°C)": 60,
    "Drying Duration (hr)": 24,
    "Loading Rate (MPa/s)": 0.1,
    "Modulus of Elasticity (GPa)": 25,
    "Current Type": "DC",
    "Applied Voltage (V)": 5.0,
}


def _fill_example():
    return [EXAMPLE.get(k, None) for k in MAIN_VARIABLES]


def _clear_all():
    cleared = []
    for col in MAIN_VARIABLES:
        if col in NUMERIC_COLS:
            cleared.append(None)
        elif col in {"Filler 1 Dimensionality", "Filler 2 Dimensionality"}:
            cleared.append("NA")
        elif col == "Current Type":
            cleared.append("NA")
        else:
            cleared.append("")
    return cleared
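
# --- Illustrative sketch (assumption: helper not in the original app) --------
# Shows the intended round trip: a raw form dict -> _coerce_to_row schema
# coercion -> predict_fn, which back-transforms the model's log1p output via
# np.expm1. Returns a float (MPa^-1) or an error string if the model artifact
# is missing.
def _demo_predict():
    return predict_fn(**EXAMPLE)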
# ========================= Hybrid RAG (from 1st code) =========================
# Configuration
ARTIFACT_DIR = Path("rag_artifacts"); ARTIFACT_DIR.mkdir(exist_ok=True)
TFIDF_VECT_PATH = ARTIFACT_DIR / "tfidf_vectorizer.joblib"
TFIDF_MAT_PATH = ARTIFACT_DIR / "tfidf_matrix.joblib"
BM25_TOK_PATH = ARTIFACT_DIR / "bm25_tokens.joblib"
EMB_NPY_PATH = ARTIFACT_DIR / "chunk_embeddings.npy"
RAG_META_PATH = ARTIFACT_DIR / "chunks.parquet"

# PDF source (HF-safe: rely on local papers/ by default)
LOCAL_PDF_DIR = Path("papers"); LOCAL_PDF_DIR.mkdir(exist_ok=True)
USE_ONLINE_SOURCES = os.getenv("USE_ONLINE_SOURCES", "false").lower() == "true"

# Retrieval weights
W_TFIDF_DEFAULT = 0.50 if not USE_DENSE else 0.30
W_BM25_DEFAULT = 0.50 if not USE_DENSE else 0.30
W_EMB_DEFAULT = 0.00 if not USE_DENSE else 0.40

# Simple text processing
_SENT_SPLIT_RE = re.compile(r"(?<=[.!?])\s+|\n+")
TOKEN_RE = re.compile(r"[A-Za-z0-9_#+\-/\.%]+")


def sent_split(text: str) -> List[str]:
    sents = [s.strip() for s in _SENT_SPLIT_RE.split(text) if s.strip()]
    return [s for s in sents if len(s.split()) >= 5]


def tokenize(text: str) -> List[str]:
    return [t.lower() for t in TOKEN_RE.findall(text)]


# PDF text extraction (PyMuPDF preferred; pypdf fallback)
def _extract_pdf_text(pdf_path: Path) -> str:
    try:
        import fitz  # PyMuPDF
        doc = fitz.open(pdf_path)
        out = []
        for i, page in enumerate(doc):
            out.append(f"[[PAGE={i+1}]]\n{page.get_text('text') or ''}")
        return "\n\n".join(out)
    except Exception:
        try:
            from pypdf import PdfReader
            reader = PdfReader(str(pdf_path))
            out = []
            for i, p in enumerate(reader.pages):
                txt = p.extract_text() or ""
                out.append(f"[[PAGE={i+1}]]\n{txt}")
            return "\n\n".join(out)
        except Exception as e:
            print(f"PDF read error ({pdf_path}): {e}")
            return ""


def chunk_by_sentence_windows(text: str, win_size=8, overlap=2) -> List[str]:
    sents = sent_split(text)
    chunks, step = [], max(1, win_size - overlap)
    for i in range(0, len(sents), step):
        window = sents[i:i+win_size]
        if not window:
            break
        chunks.append(" ".join(window))
    return chunks


def _safe_init_st_model(name: str):
    global USE_DENSE
    if not USE_DENSE:
        return None
    try:
        return SentenceTransformer(name)
    except Exception as e:
        print("Dense embeddings unavailable:", e)
        USE_DENSE = False
        return None


# Build or load index
def build_or_load_hybrid(pdf_dir: Path):
    have_cache = (TFIDF_VECT_PATH.exists() and TFIDF_MAT_PATH.exists()
                  and RAG_META_PATH.exists()
                  and (BM25_TOK_PATH.exists() or BM25Okapi is None)
                  and (EMB_NPY_PATH.exists() or not USE_DENSE))
    if have_cache:
        vectorizer = joblib.load(TFIDF_VECT_PATH)
        X_tfidf = joblib.load(TFIDF_MAT_PATH)
        meta = pd.read_parquet(RAG_META_PATH)
        bm25_toks = joblib.load(BM25_TOK_PATH) if BM25Okapi is not None else None
        emb = np.load(EMB_NPY_PATH) if (USE_DENSE and EMB_NPY_PATH.exists()) else None
        return vectorizer, X_tfidf, meta, bm25_toks, emb

    rows, all_tokens = [], []
    pdf_paths = list(Path(pdf_dir).glob("**/*.pdf"))
    print(f"Indexing PDFs in {pdf_dir} — found {len(pdf_paths)} files.")
    for pdf in pdf_paths:
        raw = _extract_pdf_text(pdf)
        if not raw.strip():
            continue
        for i, ch in enumerate(chunk_by_sentence_windows(raw, win_size=8, overlap=2)):
            rows.append({"doc_path": str(pdf), "chunk_id": i, "text": ch})
            all_tokens.append(tokenize(ch))

    if not rows:
        # Create an empty stub to avoid crashes; the UI will ask the user to upload PDFs.
        meta = pd.DataFrame(columns=["doc_path", "chunk_id", "text"])
        vectorizer = None; X_tfidf = None; emb = None; all_tokens = None
        return vectorizer, X_tfidf, meta, all_tokens, emb

    meta = pd.DataFrame(rows)
    from sklearn.feature_extraction.text import TfidfVectorizer
    vectorizer = TfidfVectorizer(
        ngram_range=(1, 2), min_df=1, max_df=0.95,
        sublinear_tf=True, smooth_idf=True, lowercase=True,
        token_pattern=r"(?u)\b\w[\w\-\./%+#]*\b",
    )
    X_tfidf = vectorizer.fit_transform(meta["text"].tolist())

    emb = None
    if USE_DENSE:
        try:
            st_model = _safe_init_st_model(os.getenv("EMB_MODEL_NAME",
                                                     "sentence-transformers/all-MiniLM-L6-v2"))
            if st_model is not None:
                from sklearn.preprocessing import normalize as sk_normalize
                em = st_model.encode(meta["text"].tolist(), batch_size=64,
                                     show_progress_bar=False, convert_to_numpy=True)
                emb = sk_normalize(em)
                np.save(EMB_NPY_PATH, emb)
        except Exception as e:
            print("Dense embedding failed:", e)
            emb = None

    # Save artifacts
    joblib.dump(vectorizer, TFIDF_VECT_PATH)
    joblib.dump(X_tfidf, TFIDF_MAT_PATH)
    if BM25Okapi is not None:
        joblib.dump(all_tokens, BM25_TOK_PATH)
    meta.to_parquet(RAG_META_PATH, index=False)
    return vectorizer, X_tfidf, meta, all_tokens, emb


tfidf_vectorizer, tfidf_matrix, rag_meta, bm25_tokens, emb_matrix = build_or_load_hybrid(LOCAL_PDF_DIR)
bm25 = BM25Okapi(bm25_tokens) if (BM25Okapi is not None and bm25_tokens is not None) else None
st_query_model = _safe_init_st_model(os.getenv("EMB_MODEL_NAME",
                                               "sentence-transformers/all-MiniLM-L6-v2"))


def _extract_page(text_chunk: str) -> str:
    m = list(re.finditer(r"\[\[PAGE=(\d+)\]\]", text_chunk or ""))
    return m[-1].group(1) if m else "?"
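
# --- Illustrative sketch (assumption: helper not in the original app) --------
# Demonstrates the sliding-window arithmetic in chunk_by_sentence_windows:
# with win_size=8 and overlap=2 the window advances step = 8 - 2 = 6 sentences,
# so consecutive chunks share two sentences. The toy text is made up.
def _demo_chunking():
    toy = " ".join(f"Toy sentence number {i} contains exactly eight words." for i in range(20))
    chunks = chunk_by_sentence_windows(toy, win_size=8, overlap=2)
    return len(chunks)  # 20 sentences, step 6 -> windows at 0, 6, 12, 18 -> 4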
def hybrid_search(query: str, k=8, w_tfidf=W_TFIDF_DEFAULT,
                  w_bm25=W_BM25_DEFAULT, w_emb=W_EMB_DEFAULT):
    if rag_meta is None or rag_meta.empty:
        return pd.DataFrame()

    # Dense scores
    if USE_DENSE and st_query_model is not None and emb_matrix is not None and w_emb > 0:
        try:
            from sklearn.preprocessing import normalize as sk_normalize
            q_emb = st_query_model.encode([query], convert_to_numpy=True)
            q_emb = sk_normalize(q_emb)[0]
            dense_scores = emb_matrix @ q_emb
        except Exception as e:
            print("Dense query encoding failed:", e)
            dense_scores = np.zeros(len(rag_meta), dtype=float); w_emb = 0.0
    else:
        dense_scores = np.zeros(len(rag_meta), dtype=float); w_emb = 0.0

    # TF-IDF scores
    if tfidf_vectorizer is not None and tfidf_matrix is not None:
        q_vec = tfidf_vectorizer.transform([query])
        tfidf_scores = (tfidf_matrix @ q_vec.T).toarray().ravel()
    else:
        tfidf_scores = np.zeros(len(rag_meta), dtype=float); w_tfidf = 0.0

    # BM25 scores
    if bm25 is not None:
        q_tokens = [t.lower() for t in re.findall(r"[A-Za-z0-9_#+\-/\.%]+", query)]
        bm25_scores = np.array(bm25.get_scores(q_tokens), dtype=float)
    else:
        bm25_scores = np.zeros(len(rag_meta), dtype=float); w_bm25 = 0.0

    def _norm(x):
        x = np.asarray(x, dtype=float)
        if np.allclose(x.max(), x.min()):
            return np.zeros_like(x)
        return (x - x.min()) / (x.max() - x.min())

    s_dense = _norm(dense_scores)
    s_tfidf = _norm(tfidf_scores)
    s_bm25 = _norm(bm25_scores)

    total_w = (w_tfidf + w_bm25 + w_emb) or 1.0
    w_tfidf, w_bm25, w_emb = w_tfidf/total_w, w_bm25/total_w, w_emb/total_w
    combo = w_emb * s_dense + w_tfidf * s_tfidf + w_bm25 * s_bm25

    idx = np.argsort(-combo)[:k]
    hits = rag_meta.iloc[idx].copy()
    hits["score_dense"] = s_dense[idx]
    hits["score_tfidf"] = s_tfidf[idx]
    hits["score_bm25"] = s_bm25[idx]
    hits["score"] = combo[idx]
    return hits.reset_index(drop=True)


def split_sentences(text: str) -> List[str]:
    sents = sent_split(text)
    return [s for s in sents if 6 <= len(s.split()) <= 60]


def mmr_select_sentences(question: str, hits: pd.DataFrame, top_n=4,
                         pool_per_chunk=6, lambda_div=0.7):
    pool = []
    for _, row in hits.iterrows():
        doc = Path(row["doc_path"]).name
        page = _extract_page(row["text"])
        for s in split_sentences(row["text"])[:pool_per_chunk]:
            pool.append({"sent": s, "doc": doc, "page": page})
    if not pool:
        return []

    sent_texts = [p["sent"] for p in pool]

    # Embedding-based relevance if available, else TF-IDF
    use_dense = USE_DENSE and st_query_model is not None
    if use_dense:
        try:
            from sklearn.preprocessing import normalize as sk_normalize
            texts = [question] + sent_texts
            enc = st_query_model.encode(texts, convert_to_numpy=True)
            q_vec = sk_normalize(enc[:1])[0]
            S = sk_normalize(enc[1:])
            rel = S @ q_vec
            def sim_fn(i, j):
                return float(S[i] @ S[j])
        except Exception:
            use_dense = False
    if not use_dense:
        from sklearn.feature_extraction.text import TfidfVectorizer
        vect = TfidfVectorizer().fit(sent_texts + [question])
        Q = vect.transform([question]); S = vect.transform(sent_texts)
        rel = (S @ Q.T).toarray().ravel()
        def sim_fn(i, j):
            return float((S[i] @ S[j].T).toarray()[0, 0])

    selected, selected_idx = [], []
    remain = list(range(len(pool)))
    first = int(np.argmax(rel))
    selected.append(pool[first]); selected_idx.append(first); remain.remove(first)

    while len(selected) < top_n and remain:
        cand_scores = []
        for i in remain:
            sim_to_sel = max(sim_fn(i, j) for j in selected_idx) if selected_idx else 0.0
            score = lambda_div * rel[i] - (1 - lambda_div) * sim_to_sel
            cand_scores.append((score, i))
        cand_scores.sort(reverse=True)
        best_i = cand_scores[0][1]
        selected.append(pool[best_i]); selected_idx.append(best_i); remain.remove(best_i)
    return selected


def compose_extractive(selected: List[Dict[str, Any]]) -> str:
    if not selected:
        return ""
    return " ".join(f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected)
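
# --- Illustrative sketch (assumption: helper not in the original app) --------
# Spells out the MMR trade-off used in mmr_select_sentences: each candidate is
# scored as lambda_div * relevance - (1 - lambda_div) * (max similarity to the
# sentences already selected), so higher lambda_div favors relevance and lower
# values favor diversity. The numbers below are hypothetical.
def _demo_mmr_score(rel=0.8, max_sim=0.6, lambda_div=0.7):
    return lambda_div * rel - (1 - lambda_div) * max_sim  # 0.56 - 0.18 = 0.38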
def synthesize_with_llm(question: str, sentence_lines: List[str],
                        model: str = None, temperature: float = 0.2) -> str:
    if OPENAI_API_KEY is None or OpenAI is None:
        return None
    client = OpenAI(api_key=OPENAI_API_KEY)
    model = model or OPENAI_MODEL
    SYSTEM_PROMPT = (
        "You are a scientific assistant for self-sensing cementitious materials.\n"
        "Answer STRICTLY using the provided sentences.\n"
        "Do not invent facts. Keep it concise (3–6 sentences).\n"
        "Retain inline citations like (Doc.pdf, p.X) exactly as given."
    )
    user_prompt = (
        f"Question: {question}\n\n"
        f"Use ONLY these sentences to answer; keep their inline citations:\n"
        + "\n".join(f"- {s}" for s in sentence_lines)
    )
    try:
        resp = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=temperature,
        )
        return getattr(resp, "output_text", None) or str(resp)
    except Exception:
        return None


def rag_reply(
    question: str,
    k: int = 8,
    n_sentences: int = 4,
    include_passages: bool = False,
    use_llm: bool = False,
    model: str = None,
    temperature: float = 0.2,
    strict_quotes_only: bool = False,
    w_tfidf: float = W_TFIDF_DEFAULT,
    w_bm25: float = W_BM25_DEFAULT,
    w_emb: float = W_EMB_DEFAULT,
) -> str:
    hits = hybrid_search(question, k=k, w_tfidf=w_tfidf, w_bm25=w_bm25, w_emb=w_emb)
    if hits is None or hits.empty:
        return "No indexed PDFs found. Upload PDFs to the 'papers/' folder and reload the Space."

    selected = mmr_select_sentences(question, hits, top_n=int(n_sentences),
                                    pool_per_chunk=6, lambda_div=0.7)
    header_cites = "; ".join(f"{Path(r['doc_path']).name} (p.{_extract_page(r['text'])})"
                             for _, r in hits.head(6).iterrows())
    srcs = {Path(r['doc_path']).name for _, r in hits.iterrows()}
    coverage_note = "" if len(srcs) >= 3 else (
        f"\n\n> Note: Only {len(srcs)} unique source(s) contributed. "
        "Add more PDFs or increase Top-K."
    )

    if strict_quotes_only:
        if not selected:
            return ("**Quoted Passages:**\n\n---\n" + "\n\n".join(hits['text'].tolist()[:2])
                    + f"\n\n**Citations:** {header_cites}{coverage_note}")
        msg = "**Quoted Passages:**\n- " + "\n- ".join(
            f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected)
        msg += f"\n\n**Citations:** {header_cites}{coverage_note}"
        if include_passages:
            msg += "\n\n---\n" + "\n\n".join(hits['text'].tolist()[:2])
        return msg

    extractive = compose_extractive(selected)

    if use_llm and selected:
        lines = [f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected]
        llm_text = synthesize_with_llm(question, lines, model=model, temperature=temperature)
        if llm_text:
            msg = f"**Answer (LLM synthesis):** {llm_text}\n\n**Citations:** {header_cites}{coverage_note}"
            if include_passages:
                msg += "\n\n---\n" + "\n\n".join(hits['text'].tolist()[:2])
            return msg

    if not extractive:
        return (f"**Answer:** Here are relevant passages.\n\n**Citations:** {header_cites}{coverage_note}"
                "\n\n---\n" + "\n\n".join(hits['text'].tolist()[:2]))

    msg = f"**Answer:** {extractive}\n\n**Citations:** {header_cites}{coverage_note}"
    if include_passages:
        msg += "\n\n---\n" + "\n\n".join(hits['text'].tolist()[:2])
    return msg


def rag_chat_fn(message, history, top_k, n_sentences, include_passages, use_llm,
                model_name, temperature, strict_quotes_only, w_tfidf, w_bm25, w_emb):
    if not message or not message.strip():
        return "Ask a literature question (e.g., *How does CNT length affect gauge factor?*)"
    try:
        return rag_reply(
            question=message,
            k=int(top_k),
            n_sentences=int(n_sentences),
            include_passages=bool(include_passages),
            use_llm=bool(use_llm),
            model=(model_name or None),
            temperature=float(temperature),
            strict_quotes_only=bool(strict_quotes_only),
            w_tfidf=float(w_tfidf),
            w_bm25=float(w_bm25),
            w_emb=float(w_emb),
        )
    except Exception as e:
        return f"RAG error: {e}"
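
# --- Illustrative sketch (assumption: helper not in the original app) --------
# Calls the RAG pipeline directly, bypassing the chat UI; handy for smoke tests
# in a console. The question is hypothetical, and rag_reply returns an
# informative message if no PDFs have been indexed from papers/.
def _demo_rag_query():
    return rag_reply(
        "How does CNT content affect the stress gauge factor?",
        k=6,
        n_sentences=3,
        strict_quotes_only=True,  # quote verbatim; no paraphrasing
    )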
{e}" # ========================= UI (predictor styling kept) ========================= CSS = """ /* Blue to green gradient background */ .gradio-container { background: linear-gradient(135deg, #1e3a8a 0%, #166534 60%, #15803d 100%) !important; } * {font-family: ui-sans-serif, system-ui, -apple-system, 'Segoe UI', Roboto, 'Helvetica Neue', Arial;} .card {background: rgba(255,255,255,0.07) !important; border: 1px solid rgba(255,255,255,0.12);} label.svelte-1ipelgc {color: #e0f2fe !important;} """ theme = gr.themes.Soft( primary_hue="blue", neutral_hue="green" ).set( body_background_fill="#1e3a8a", body_text_color="#e0f2fe", input_background_fill="#172554", input_border_color="#1e40af", button_primary_background_fill="#2563eb", button_primary_text_color="#ffffff", button_secondary_background_fill="#14532d", button_secondary_text_color="#ecfdf5", ) with gr.Blocks(css=CSS, theme=theme, fill_height=True) as demo: gr.Markdown( "
"
"Left tab: ML prediction for Stress Gauge Factor (kept identical to your deployed predictor). "
"Right tab: Literature Q&A via Hybrid RAG (BM25 + TF-IDF + optional dense) with MMR sentence selection. "
"Upload PDFs into papers/ in your Space repo."
"
papers/ then reload the Space. "
"Answers cite (Doc.pdf, p.X). Toggle strict quotes or optional LLM paraphrasing."
)
    with gr.Row():
        top_k = gr.Slider(5, 12, value=8, step=1, label="Top-K chunks")
        n_sentences = gr.Slider(2, 6, value=4, step=1, label="Answer length (sentences)")
        include_passages = gr.Checkbox(value=False, label="Include supporting passages")

    with gr.Accordion("Retriever weights (advanced)", open=False):
        w_tfidf = gr.Slider(0.0, 1.0, value=W_TFIDF_DEFAULT, step=0.05, label="TF-IDF weight")
        w_bm25 = gr.Slider(0.0, 1.0, value=W_BM25_DEFAULT, step=0.05, label="BM25 weight")
        w_emb = gr.Slider(0.0, 1.0, value=W_EMB_DEFAULT, step=0.05, label="Dense weight (set 0 if disabled)")

    with gr.Accordion("LLM & Controls", open=False):
        strict_quotes_only = gr.Checkbox(value=False, label="Strict quotes only (no paraphrasing)")
        use_llm = gr.Checkbox(value=False, label="Use LLM to paraphrase selected sentences")
        model_name = gr.Textbox(value=os.getenv("OPENAI_MODEL", OPENAI_MODEL),
                                label="LLM model", placeholder="e.g., gpt-5 or gpt-5-mini")
        temperature = gr.Slider(0.0, 1.0, value=0.2, step=0.05, label="Temperature")

    gr.ChatInterface(
        fn=rag_chat_fn,
        additional_inputs=[top_k, n_sentences, include_passages, use_llm, model_name,
                           temperature, strict_quotes_only, w_tfidf, w_bm25, w_emb],
        title="Literature Q&A",
        description="Hybrid retrieval with diversity. Answers carry inline (Doc, p.X) citations. Toggle strict/LLM modes.",
    )
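
# --- Illustrative sketch (assumption: helper not in the original app) --------
# Reports the environment-driven settings read at import time; useful when
# debugging a Space's configuration (e.g. whether dense retrieval or the LLM
# paraphrase path is actually enabled).
def _demo_show_config():
    return {
        "USE_DENSE": USE_DENSE,
        "USE_ONLINE_SOURCES": USE_ONLINE_SOURCES,
        "EMB_MODEL_NAME": os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2"),
        "OPENAI_MODEL": OPENAI_MODEL,
        "OPENAI_KEY_SET": OPENAI_API_KEY is not None,
        "indexed_chunks": 0 if rag_meta is None else len(rag_meta),
    }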
# ------------- Launch -------------
if __name__ == "__main__":
    # queue() helps HF Spaces handle concurrent requests; if no PDFs are
    # indexed, the chat replies asking the user to upload PDFs to papers/.
    demo.queue().launch()