Spaces:
Sleeping
Sleeping
| # ================================================================ | |
| # Self-Sensing Concrete Assistant — Hybrid RAG + XGB + (opt) GPT-5 | |
| # FIXED for Windows/Conda import issues (transformers/quantizers) | |
| # - Pins compatible versions (transformers 4.44.2, sbert 2.7.0, torch 2.x) | |
| # - Disables TF/Flax backends; safe fallbacks if dense fails | |
| # - Hybrid retrieval (BM25 + TF-IDF + Dense*) + MMR sentence selection | |
| # - Optional OA PDF harvest (Crossref + Unpaywall) OR local folder | |
| # - Optional GPT-5 synthesis strictly from selected cited sentences | |
| # - Gradio UI with Prediction + Literature Q&A tabs | |
| # ================================================================ | |
| # ---------------------- MUST RUN THESE FLAGS FIRST ---------------------- | |
| import os | |
| os.environ["TRANSFORMERS_NO_TF"] = "1" # don't import TensorFlow | |
| os.environ["TRANSFORMERS_NO_FLAX"] = "1" # don't import Flax/JAX | |
| os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
| # ------------------------------- Imports ----------------------------------- | |
| import re, json, time, joblib, warnings, math, hashlib, urllib.parse, requests | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.preprocessing import RobustScaler, OneHotEncoder | |
| from sklearn.preprocessing import normalize as sk_normalize | |
| from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error | |
| from sklearn.feature_selection import VarianceThreshold | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from xgboost import XGBRegressor | |
| from pypdf import PdfReader | |
| import fitz # PyMuPDF | |
| import gradio as gr | |
| USE_DENSE = True | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| except Exception as e: | |
| USE_DENSE = False | |
| print("⚠️ sentence-transformers unavailable; continuing with TF-IDF + BM25 only.\n", e) | |
| from rank_bm25 import BM25Okapi | |
| from openai import OpenAI | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| # ============================ Config ======================================= | |
| # --- Data & model paths --- | |
| DATA_PATH = "july3.xlsx" # <- update if needed | |
| LOCAL_PDF_DIR = Path("/papers") # <- optional local folder | |
| # --- RAG artifacts (kept in working dir) --- | |
| ARTIFACT_DIR = Path("rag_artifacts"); ARTIFACT_DIR.mkdir(exist_ok=True) | |
| MODEL_OUT = "stress_gf_xgb.joblib" | |
| TFIDF_VECT_PATH = ARTIFACT_DIR / "tfidf_vectorizer.joblib" | |
| TFIDF_MAT_PATH = ARTIFACT_DIR / "tfidf_matrix.joblib" | |
| BM25_TOK_PATH = ARTIFACT_DIR / "bm25_tokens.joblib" | |
| EMB_NPY_PATH = ARTIFACT_DIR / "chunk_embeddings.npy" | |
| RAG_META_PATH = ARTIFACT_DIR / "chunks.parquet" | |
| # --- Online OA sourcing (set USE_ONLINE_SOURCES=False to rely only on local PDFs) --- | |
| USE_ONLINE_SOURCES = True | |
| DOWNLOAD_DIR = Path("oa_pdfs"); DOWNLOAD_DIR.mkdir(exist_ok=True) | |
| KEYWORDS = [ | |
| "self-sensing concrete gauge factor", | |
| "piezoresistive cementitious composite", | |
| "carbon nanotube cement gauge factor", | |
| "graphene nanoplatelet self-sensing mortar", | |
| ] | |
| YEAR_FROM, YEAR_TO = 2010, 2025 | |
| MAX_DOIS_PER_QUERY = 25 | |
| # Required identifiers for APIs (use institutional email) | |
| UNPAYWALL_EMAIL = os.getenv("UNPAYWALL_EMAIL", "your_email@lsu.edu") | |
| CROSSREF_MAILTO = os.getenv("CROSSREF_MAILTO", "your_email@lsu.edu") | |
| # --- Embedding model (fast CPU) --- | |
| EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2") | |
| # --- OpenAI (optional LLM synthesis) --- | |
| OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5") # e.g., "gpt-5-mini" | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", None) # set env var to enable LLM | |
| # --- Retrieval weights (UI defaults adapt if dense disabled) --- | |
| W_TFIDF_DEFAULT = 0.50 if not USE_DENSE else 0.30 | |
| W_BM25_DEFAULT = 0.50 if not USE_DENSE else 0.30 | |
| W_EMB_DEFAULT = 0.00 if not USE_DENSE else 0.40 | |
| RANDOM_SEED = 42 | |
| # ====================== OA PDF Harvesting (Crossref + Unpaywall) =========== | |
| def crossref_search_dois(query, year_from=YEAR_FROM, year_to=YEAR_TO, rows=MAX_DOIS_PER_QUERY): | |
| base = "https://api.crossref.org/works" | |
| params = { | |
| "query": query, | |
| "filter": f"type:journal-article,from-pub-date:{year_from}-01-01,until-pub-date:{year_to}-12-31", | |
| "rows": rows, | |
| "mailto": CROSSREF_MAILTO | |
| } | |
| r = requests.get(base, params=params, timeout=30) | |
| r.raise_for_status() | |
| items = r.json().get("message", {}).get("items", []) | |
| dois = [] | |
| for it in items: | |
| doi = it.get("DOI") | |
| title = " ".join(it.get("title", [])[:1]).strip() if it.get("title") else "" | |
| if doi: | |
| dois.append((doi, title)) | |
| return dois | |
| def unpaywall_pdf_url(doi, email=UNPAYWALL_EMAIL): | |
| url = f"https://api.unpaywall.org/v2/{urllib.parse.quote(doi)}?email={urllib.parse.quote(email)}" | |
| r = requests.get(url, timeout=30) | |
| if r.status_code != 200: | |
| return None | |
| data = r.json() | |
| loc = data.get("best_oa_location") or {} | |
| return loc.get("url_for_pdf") or loc.get("url") | |
| def _sanitize_filename(s): | |
| return "".join(c for c in s if c.isalnum() or c in "._- ()").strip()[:180] or "paper" | |
| def fetch_oa_pdfs(keywords=KEYWORDS, out_dir=DOWNLOAD_DIR, throttle=1.2): | |
| saved = [] | |
| for q in keywords: | |
| try: | |
| dois = crossref_search_dois(q) | |
| except Exception as e: | |
| print(f"[Crossref] {q}: {e}") | |
| continue | |
| for doi, title in dois: | |
| try: | |
| pdf_url = unpaywall_pdf_url(doi) | |
| except Exception as e: | |
| print(f"[Unpaywall] {doi}: {e}") | |
| pdf_url = None | |
| time.sleep(throttle) | |
| if not pdf_url: | |
| continue | |
| fname = _sanitize_filename(f"{title or doi}.pdf") | |
| fp = out_dir / fname | |
| if fp.exists(): | |
| saved.append(fp); continue | |
| try: | |
| rr = requests.get(pdf_url, timeout=60) | |
| ctype = rr.headers.get("Content-Type", "") | |
| if rr.status_code == 200 and "pdf" in ctype.lower(): | |
| with open(fp, "wb") as f: | |
| f.write(rr.content) | |
| print(f"Saved: {fp.name}") | |
| saved.append(fp) | |
| else: | |
| print(f"Skipped (not pdf): {pdf_url}") | |
| except Exception as e: | |
| print(f"[Download] {pdf_url}: {e}") | |
| print(f"Total OA PDFs cached: {len(saved)}") | |
| return saved | |
| def get_pdf_source_dir(): | |
| if USE_ONLINE_SOURCES: | |
| fetch_oa_pdfs() # idempotent; skips existing | |
| return DOWNLOAD_DIR | |
| else: | |
| return LOCAL_PDF_DIR | |
| # ==================== XGB Pipeline (Prediction) ============================ | |
| def make_onehot(): | |
| try: | |
| return OneHotEncoder(handle_unknown="ignore", sparse_output=False) | |
| except TypeError: | |
| return OneHotEncoder(handle_unknown="ignore", sparse=False) | |
| def rmse(y_true, y_pred): | |
| return mean_squared_error(y_true, y_pred) | |
| def evaluate(m, X, y_log, name="Model"): | |
| y_pred_log = m.predict(X) | |
| y_pred = np.expm1(y_pred_log) | |
| y_true = np.expm1(y_log) | |
| r2 = r2_score(y_true, y_pred) | |
| r = rmse(y_true, y_pred) | |
| mae = mean_absolute_error(y_true, y_pred) | |
| print(f"{name}: R²={r2:.3f}, RMSE={r:.3f}, MAE={mae:.3f}") | |
| return r2, r, mae | |
| # --- Load data | |
| df = pd.read_excel(DATA_PATH) | |
| df.columns = df.columns.str.strip() | |
| drop_cols = [ | |
| 'Loading rate (MPa/s)', 'Voltage (V) AC\\DC', 'Elastic Modulus (GPa)', 'Duration (hrs) of Dying Method' | |
| ] | |
| df = df.drop(columns=[c for c in drop_cols if c in df.columns], errors='ignore') | |
| main_variables = [ | |
| 'Filler1_Type', 'Filler1_Diameter_um', 'Filler1_Length_mm', | |
| 'AvgFiller_Density_g/cm3', 'AvgFiller_weight_%', 'AvgFiller_Volume_%', | |
| 'Filler1_Dimensions', 'Filler2_Type', 'Filler2_Diameter_um', 'Filler2_Length_mm', | |
| 'Filler2_Dimensions', 'Sample_Volume_mm3', 'Electrode/Probe_Count', 'Electrode/Probe_Material', | |
| 'W/B', 'S/B', 'GaugeLength_mm', 'Curing_Conditions', 'Num_ConductiveFillers', | |
| 'DryingTemperature_C', 'DryingDuration_hrs', 'LoadingRate_MPa/s', | |
| 'ElasticModulus_Gpa', 'Voltage_Type', 'Applied_Voltage_V' | |
| ] | |
| target_col = 'Stress_GF_Mpa' | |
| df = df[main_variables + [target_col]].copy() | |
| df = df.dropna(subset=[target_col]) | |
| df = df[df[target_col] > 0] | |
| numeric_cols = [ | |
| 'Filler1_Diameter_um', 'Filler1_Length_mm', 'AvgFiller_Density_g/cm3', | |
| 'AvgFiller_weight_%', 'AvgFiller_Volume_%', 'Filler2_Diameter_um', | |
| 'Filler2_Length_mm', 'Sample_Volume_mm3', 'Electrode/Probe_Count', | |
| 'W/B', 'S/B', 'GaugeLength_mm', 'Num_ConductiveFillers', | |
| 'DryingTemperature_C', 'DryingDuration_hrs', 'LoadingRate_MPa/s', | |
| 'ElasticModulus_Gpa', 'Applied_Voltage_V' | |
| ] | |
| categorical_cols = [ | |
| 'Filler1_Type', 'Filler1_Dimensions', 'Filler2_Type', 'Filler2_Dimensions', | |
| 'Electrode/Probe_Material', 'Curing_Conditions', 'Voltage_Type' | |
| ] | |
| for c in numeric_cols: | |
| df[c] = pd.to_numeric(df[c], errors='coerce') | |
| for c in categorical_cols: | |
| df[c] = df[c].astype(str) | |
| vt = VarianceThreshold(threshold=1e-3) | |
| vt.fit(df[numeric_cols]) | |
| numeric_cols = [c for c in numeric_cols if c not in df[numeric_cols].columns[vt.variances_ < 1e-3]] | |
| corr = df[numeric_cols].corr().abs() | |
| upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool)) | |
| to_drop = [c for c in upper.columns if any(upper[c] > 0.95)] | |
| numeric_cols = [c for c in numeric_cols if c not in to_drop] | |
| X = df[main_variables].copy() | |
| y = np.log1p(df[target_col]) | |
| X_train, X_test, y_train, y_test = train_test_split( | |
| X, y, test_size=0.2, random_state=RANDOM_SEED | |
| ) | |
| BEST_PARAMS = { | |
| "regressor__subsample": 1.0, | |
| "regressor__reg_lambda": 5, | |
| "regressor__reg_alpha": 0.05, | |
| "regressor__n_estimators": 300, | |
| "regressor__max_depth": 6, | |
| "regressor__learning_rate": 0.1, | |
| "regressor__gamma": 0, | |
| "regressor__colsample_bytree": 1.0 | |
| } | |
| def train_and_save_model(): | |
| num_tf = Pipeline([('imputer', SimpleImputer(strategy='median')), | |
| ('scaler', RobustScaler())]) | |
| cat_tf = Pipeline([('imputer', SimpleImputer(strategy='most_frequent')), | |
| ('onehot', make_onehot())]) | |
| preprocessor = ColumnTransformer([ | |
| ('num', num_tf, numeric_cols), | |
| ('cat', cat_tf, categorical_cols) | |
| ]) | |
| xgb_pipe = Pipeline([ | |
| ('preprocessor', preprocessor), | |
| ('regressor', XGBRegressor(random_state=RANDOM_SEED, n_jobs=-1, verbosity=0)) | |
| ]) | |
| xgb_pipe.set_params(**BEST_PARAMS).fit(X_train, y_train) | |
| joblib.dump(xgb_pipe, MODEL_OUT) | |
| print(f"✅ Trained new model and saved → {MODEL_OUT}") | |
| return xgb_pipe | |
| def load_or_train_model(): | |
| if os.path.exists(MODEL_OUT): | |
| print(f"📂 Loading existing model from {MODEL_OUT}") | |
| return joblib.load(MODEL_OUT) | |
| else: | |
| print("⚠️ No saved model found. Training a new one...") | |
| return train_and_save_model() | |
| xgb_pipe = load_or_train_model() | |
| # ======================= Hybrid RAG Indexing ================================ | |
| _SENT_SPLIT_RE = re.compile(r"(?<=[.!?])\s+|\n+") | |
| TOKEN_RE = re.compile(r"[A-Za-z0-9_#+\-/\.%]+") | |
| def sent_split(text: str) -> List[str]: | |
| sents = [s.strip() for s in _SENT_SPLIT_RE.split(text) if s.strip()] | |
| return [s for s in sents if len(s.split()) >= 5] | |
| def tokenize(text: str) -> List[str]: | |
| return [t.lower() for t in TOKEN_RE.findall(text)] | |
| def extract_text_pymupdf(pdf_path: Path) -> str: | |
| try: | |
| doc = fitz.open(pdf_path) | |
| buff = [] | |
| for i, page in enumerate(doc): | |
| txt = page.get_text("text") or "" | |
| buff.append(f"[[PAGE={i+1}]]\n{txt}") | |
| return "\n\n".join(buff) | |
| except Exception: | |
| # Fallback to PyPDF | |
| try: | |
| reader = PdfReader(str(pdf_path)) | |
| buff = [] | |
| for i, p in enumerate(reader.pages): | |
| txt = p.extract_text() or "" | |
| buff.append(f"[[PAGE={i+1}]]\n{txt}") | |
| return "\n\n".join(buff) | |
| except Exception as e: | |
| print(f"PDF read error ({pdf_path}): {e}") | |
| return "" | |
| def chunk_by_sentence_windows(text: str, win_size=8, overlap=2) -> List[str]: | |
| sents = sent_split(text) | |
| chunks = [] | |
| step = max(1, win_size - overlap) | |
| for i in range(0, len(sents), step): | |
| window = sents[i:i+win_size] | |
| if not window: break | |
| chunks.append(" ".join(window)) | |
| return chunks | |
| def _safe_init_st_model(name: str): | |
| """Try to init SentenceTransformer; on failure, disable dense and return None.""" | |
| global USE_DENSE | |
| if not USE_DENSE: | |
| return None | |
| try: | |
| m = SentenceTransformer(name) | |
| return m | |
| except Exception as e: | |
| print("⚠️ Could not initialize SentenceTransformer; disabling dense embeddings.\n", e) | |
| USE_DENSE = False | |
| return None | |
| def build_or_load_hybrid(pdf_dir: Path): | |
| have_cache = (TFIDF_VECT_PATH.exists() and TFIDF_MAT_PATH.exists() | |
| and BM25_TOK_PATH.exists() and RAG_META_PATH.exists() | |
| and (EMB_NPY_PATH.exists() or not USE_DENSE)) | |
| if have_cache: | |
| vectorizer = joblib.load(TFIDF_VECT_PATH) | |
| X_tfidf = joblib.load(TFIDF_MAT_PATH) | |
| meta = pd.read_parquet(RAG_META_PATH) | |
| bm25_toks = joblib.load(BM25_TOK_PATH) | |
| emb = np.load(EMB_NPY_PATH) if (USE_DENSE and EMB_NPY_PATH.exists()) else None | |
| print("Loaded hybrid index.") | |
| return vectorizer, X_tfidf, meta, bm25_toks, emb | |
| rows, all_tokens = [], [] | |
| pdf_paths = list(Path(pdf_dir).glob("**/*.pdf")) | |
| if LOCAL_PDF_DIR.exists() and LOCAL_PDF_DIR != pdf_dir: | |
| pdf_paths += list(LOCAL_PDF_DIR.glob("**/*.pdf")) | |
| print(f"Indexing PDFs from {pdf_dir} (+ local merge if present). Found {len(pdf_paths)} files.") | |
| for pdf in pdf_paths: | |
| raw = extract_text_pymupdf(pdf) | |
| if not raw.strip(): | |
| continue | |
| for i, ch in enumerate(chunk_by_sentence_windows(raw, win_size=8, overlap=2)): | |
| rows.append({"doc_path": str(pdf), "chunk_id": i, "text": ch}) | |
| all_tokens.append(tokenize(ch)) | |
| if not rows: | |
| raise RuntimeError(f"No PDF text found under: {pdf_dir}") | |
| meta = pd.DataFrame(rows) | |
| # TF-IDF | |
| vectorizer = TfidfVectorizer( | |
| ngram_range=(1,2), | |
| min_df=1, max_df=0.95, | |
| sublinear_tf=True, smooth_idf=True, | |
| lowercase=True, | |
| token_pattern=r"(?u)\b\w[\w\-\./%+#]*\b" | |
| ) | |
| X_tfidf = vectorizer.fit_transform(meta["text"].tolist()) | |
| # Dense (optional) | |
| emb = None | |
| if USE_DENSE: | |
| try: | |
| st_model_tmp = _safe_init_st_model(EMB_MODEL_NAME) | |
| if st_model_tmp is not None: | |
| em = st_model_tmp.encode(meta["text"].tolist(), batch_size=64, show_progress_bar=False, convert_to_numpy=True) | |
| emb = sk_normalize(em) | |
| np.save(EMB_NPY_PATH, emb) | |
| except Exception as e: | |
| emb = None | |
| print("⚠️ Dense embeddings failed; continuing without them.\n", e) | |
| # Save artifacts | |
| joblib.dump(vectorizer, TFIDF_VECT_PATH) | |
| joblib.dump(X_tfidf, TFIDF_MAT_PATH) | |
| joblib.dump(all_tokens, BM25_TOK_PATH) | |
| meta.to_parquet(RAG_META_PATH, index=False) | |
| print(f"Indexed {len(meta)} chunks from {meta['doc_path'].nunique()} PDFs.") | |
| return vectorizer, X_tfidf, meta, all_tokens, emb | |
| # Build hybrid index | |
| pdf_source_dir = get_pdf_source_dir() | |
| tfidf_vectorizer, tfidf_matrix, rag_meta, bm25_tokens, emb_matrix = build_or_load_hybrid(pdf_source_dir) | |
| bm25 = BM25Okapi(bm25_tokens) | |
| st_query_model = _safe_init_st_model(EMB_MODEL_NAME) # safe init; may set USE_DENSE=False | |
| # If dense failed at runtime, update default weights in case UI uses them | |
| if not USE_DENSE: | |
| W_TFIDF_DEFAULT, W_BM25_DEFAULT, W_EMB_DEFAULT = 0.50, 0.50, 0.00 | |
| def _extract_page(text_chunk: str) -> str: | |
| m = list(re.finditer(r"\[\[PAGE=(\d+)\]\]", text_chunk)) | |
| return (m[-1].group(1) if m else "?") | |
| # ---------------------- Hybrid search -------------------------------------- | |
| def hybrid_search(query: str, k=8, w_tfidf=W_TFIDF_DEFAULT, w_bm25=W_BM25_DEFAULT, w_emb=W_EMB_DEFAULT): | |
| # Dense (optional) | |
| if USE_DENSE and st_query_model is not None and emb_matrix is not None and w_emb > 0: | |
| try: | |
| q_emb = st_query_model.encode([query], convert_to_numpy=True) | |
| q_emb = sk_normalize(q_emb)[0] | |
| dense_scores = emb_matrix @ q_emb | |
| except Exception as e: | |
| print("⚠️ Dense query encoding failed; ignoring dense this run.\n", e) | |
| dense_scores = np.zeros(len(rag_meta), dtype=float) | |
| w_emb = 0.0 | |
| else: | |
| dense_scores = np.zeros(len(rag_meta), dtype=float) | |
| w_emb = 0.0 # force off | |
| # TF-IDF | |
| q_vec = tfidf_vectorizer.transform([query]) | |
| tfidf_scores = (tfidf_matrix @ q_vec.T).toarray().ravel() | |
| # BM25 | |
| q_tokens = [t.lower() for t in TOKEN_RE.findall(query)] | |
| bm25_scores = np.array(bm25.get_scores(q_tokens), dtype=float) | |
| def _norm(x): | |
| x = np.asarray(x, dtype=float) | |
| if np.allclose(x.max(), x.min()): | |
| return np.zeros_like(x) | |
| return (x - x.min()) / (x.max() - x.min()) | |
| s_dense = _norm(dense_scores) | |
| s_tfidf = _norm(tfidf_scores) | |
| s_bm25 = _norm(bm25_scores) | |
| total_w = (w_tfidf + w_bm25 + w_emb) or 1.0 | |
| w_tfidf, w_bm25, w_emb = w_tfidf/total_w, w_bm25/total_w, w_emb/total_w | |
| combo = w_emb * s_dense + w_tfidf * s_tfidf + w_bm25 * s_bm25 | |
| idx = np.argsort(-combo)[:k] | |
| hits = rag_meta.iloc[idx].copy() | |
| hits["score_dense"] = s_dense[idx] | |
| hits["score_tfidf"] = s_tfidf[idx] | |
| hits["score_bm25"] = s_bm25[idx] | |
| hits["score"] = combo[idx] | |
| return hits.reset_index(drop=True) | |
| # -------------- Sentence selection with MMR (diversity) -------------------- | |
| def split_sentences(text: str) -> List[str]: | |
| sents = sent_split(text) | |
| return [s for s in sents if 6 <= len(s.split()) <= 60] | |
| def mmr_select_sentences(question: str, hits: pd.DataFrame, top_n=4, pool_per_chunk=6, lambda_div=0.7): | |
| pool = [] | |
| for _, row in hits.iterrows(): | |
| doc = Path(row["doc_path"]).name | |
| page = _extract_page(row["text"]) | |
| for s in split_sentences(row["text"])[:pool_per_chunk]: | |
| pool.append({"sent": s, "doc": doc, "page": page}) | |
| if not pool: | |
| return [] | |
| sent_texts = [p["sent"] for p in pool] | |
| if USE_DENSE and st_query_model is not None: | |
| try: | |
| texts = [question] + sent_texts | |
| enc = st_query_model.encode(texts, convert_to_numpy=True) | |
| q_vec = sk_normalize(enc[:1])[0] | |
| S = sk_normalize(enc[1:]) | |
| rel = (S @ q_vec) | |
| def sim_fn(i, j): return float(S[i] @ S[j]) | |
| except Exception as e: | |
| print("⚠️ Dense sentence encoding failed; falling back to TF-IDF for MMR.\n", e) | |
| Q = tfidf_vectorizer.transform([question]) | |
| S = tfidf_vectorizer.transform(sent_texts) | |
| rel = (S @ Q.T).toarray().ravel() | |
| def sim_fn(i, j): return float((S[i] @ S[j].T).toarray()[0, 0]) | |
| else: | |
| Q = tfidf_vectorizer.transform([question]) | |
| S = tfidf_vectorizer.transform(sent_texts) | |
| rel = (S @ Q.T).toarray().ravel() | |
| def sim_fn(i, j): return float((S[i] @ S[j].T).toarray()[0, 0]) | |
| selected, selected_idx = [], [] | |
| remain = list(range(len(pool))) | |
| first = int(np.argmax(rel)) | |
| selected.append(pool[first]); selected_idx.append(first); remain.remove(first) | |
| while len(selected) < top_n and remain: | |
| cand_scores = [] | |
| for i in remain: | |
| sim_to_sel = max(sim_fn(i, j) for j in selected_idx) if selected_idx else 0.0 | |
| score = lambda_div * rel[i] - (1 - lambda_div) * sim_to_sel | |
| cand_scores.append((score, i)) | |
| cand_scores.sort(reverse=True) | |
| best_i = cand_scores[0][1] | |
| selected.append(pool[best_i]); selected_idx.append(best_i); remain.remove(best_i) | |
| return selected | |
| def compose_extractive(selected: List[Dict]) -> str: | |
| if not selected: | |
| return "" | |
| lines = [f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected] | |
| return " ".join(lines) | |
| # ------------------- Optional GPT-5 synthesis ------------------------------ | |
| def synthesize_with_llm(question: str, sentence_lines: List[str], model: str = None, temperature: float = 0.2) -> str: | |
| if OPENAI_API_KEY is None: | |
| return None # not configured → skip synthesis | |
| client = OpenAI(api_key=OPENAI_API_KEY) | |
| if model is None: | |
| model = OPENAI_MODEL | |
| SYSTEM_PROMPT = ( | |
| "You are a scientific assistant for self-sensing cementitious materials.\n" | |
| "Answer STRICTLY using the provided sentences.\n" | |
| "Do not invent facts. Keep it concise (3–6 sentences).\n" | |
| "Retain inline citations like (Doc.pdf, p.X) exactly as given." | |
| ) | |
| user_prompt = ( | |
| f"Question: {question}\n\n" | |
| f"Use ONLY these sentences to answer; keep their inline citations:\n" + | |
| "\n".join(f"- {s}" for s in sentence_lines) | |
| ) | |
| try: | |
| resp = client.responses.create( | |
| model=model, | |
| input=[ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| temperature=temperature, | |
| ) | |
| return getattr(resp, "output_text", None) or str(resp) | |
| except Exception: | |
| return None | |
| # ------------------------ RAG reply ---------------------------------------- | |
| def rag_reply( | |
| question: str, | |
| k: int = 8, | |
| n_sentences: int = 4, | |
| include_passages: bool = False, | |
| use_llm: bool = False, | |
| model: str = None, | |
| temperature: float = 0.2, | |
| strict_quotes_only: bool = False, | |
| w_tfidf: float = W_TFIDF_DEFAULT, | |
| w_bm25: float = W_BM25_DEFAULT, | |
| w_emb: float = W_EMB_DEFAULT | |
| ) -> str: | |
| hits = hybrid_search(question, k=k, w_tfidf=w_tfidf, w_bm25=w_bm25, w_emb=w_emb) | |
| if hits.empty: | |
| return "No relevant passages found. Try rephrasing or adding more PDFs." | |
| selected = mmr_select_sentences(question, hits, top_n=int(n_sentences), pool_per_chunk=6, lambda_div=0.7) | |
| header_cites = "; ".join( | |
| f"{Path(r['doc_path']).name} (p.{_extract_page(r['text'])})" for _, r in hits.head(6).iterrows() | |
| ) | |
| # Coverage note (helps debugging thin answers) | |
| srcs = {Path(r['doc_path']).name for _, r in hits.iterrows()} | |
| coverage_note = "" | |
| if len(srcs) < 3: | |
| coverage_note = f"\n\n> Note: Only {len(srcs)} unique source(s) contributed. Add more PDFs or increase Top-K." | |
| if strict_quotes_only: | |
| if not selected: | |
| return f"**Quoted Passages:**\n\n---\n" + "\n\n".join(hits["text"].tolist()[:2]) + \ | |
| f"\n\n**Citations:** {header_cites}{coverage_note}" | |
| msg = "**Quoted Passages:**\n- " + "\n- ".join(f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected) | |
| msg += f"\n\n**Citations:** {header_cites}{coverage_note}" | |
| if include_passages: | |
| msg += "\n\n---\n" + "\n\n".join(hits["text"].tolist()[:2]) | |
| return msg | |
| # Extractive baseline | |
| extractive = compose_extractive(selected) | |
| # Optional LLM synthesis | |
| if use_llm and selected: | |
| lines = [f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected] | |
| llm_text = synthesize_with_llm(question, lines, model=model, temperature=temperature) | |
| if llm_text: | |
| msg = f"**Answer (GPT-5 synthesis):** {llm_text}\n\n**Citations:** {header_cites}{coverage_note}" | |
| if include_passages: | |
| msg += "\n\n---\n" + "\n\n".join(hits["text"].tolist()[:2]) | |
| return msg | |
| # Fallback: purely extractive | |
| if not extractive: | |
| return f"**Answer:** Here are relevant passages.\n\n**Citations:** {header_cites}{coverage_note}\n\n---\n" + \ | |
| "\n\n".join(hits["text"].tolist()[:2]) | |
| msg = f"**Answer:** {extractive}\n\n**Citations:** {header_cites}{coverage_note}" | |
| if include_passages: | |
| msg += "\n\n---\n" + "\n\n".join(hits["text"].tolist()[:2]) | |
| return msg | |
| # =========================== Gradio UI ===================================== | |
| INPUT_COLS = [ | |
| "Filler1_Type", "Filler1_Dimensions", "Filler1_Diameter_um", "Filler1_Length_mm", | |
| "Filler2_Type", "Filler2_Dimensions", "Filler2_Diameter_um", "Filler2_Length_mm", | |
| "AvgFiller_Density_g/cm3", "AvgFiller_weight_%", "AvgFiller_Volume_%", | |
| "Sample_Volume_mm3", "Electrode/Probe_Count", "Electrode/Probe_Material", | |
| "W/B", "S/B", "GaugeLength_mm", "Curing_Conditions", "Num_ConductiveFillers", | |
| "DryingTemperature_C", "DryingDuration_hrs", "LoadingRate_MPa/s", | |
| "ElasticModulus_Gpa", "Voltage_Type", "Applied_Voltage_V" | |
| ] | |
| NUMERIC_INPUTS = { | |
| "Filler1_Diameter_um","Filler1_Length_mm","Filler2_Diameter_um","Filler2_Length_mm", | |
| "AvgFiller_Density_g/cm3","AvgFiller_weight_%","AvgFiller_Volume_%","Sample_Volume_mm3", | |
| "Electrode/Probe_Count","W/B","S/B","GaugeLength_mm","Num_ConductiveFillers", | |
| "DryingTemperature_C","DryingDuration_hrs","LoadingRate_MPa/s","ElasticModulus_Gpa", | |
| "Applied_Voltage_V" | |
| } | |
| CAT_DIM_CHOICES = ["0D","1D","2D","3D","NA"] | |
| def _coerce_row(args): | |
| row = {c: v for c, v in zip(INPUT_COLS, args)} | |
| clean = {} | |
| for k, v in row.items(): | |
| if k in NUMERIC_INPUTS: | |
| if v in ("", None): clean[k] = None | |
| else: | |
| try: clean[k] = float(v) | |
| except: clean[k] = None | |
| else: | |
| clean[k] = "" if v is None else str(v).strip() | |
| return pd.DataFrame([clean], columns=INPUT_COLS) | |
| def _load_model(): | |
| if not os.path.exists(MODEL_OUT): | |
| raise FileNotFoundError(f"Model file not found at '{MODEL_OUT}'. Retrain above.") | |
| return joblib.load(MODEL_OUT) | |
| def predict_fn(*args): | |
| try: | |
| mdl = _load_model() | |
| X_new = _coerce_row(args) | |
| y_log = mdl.predict(X_new) | |
| y = float(np.expm1(y_log)[0]) | |
| if -1e-8 < y < 0: y = 0.0 | |
| return y | |
| except Exception as e: | |
| return f"Error during prediction: {e}" | |
| def rag_chat_fn(message, history, top_k, n_sentences, include_passages, | |
| use_llm, model_name, temperature, strict_quotes_only, | |
| w_tfidf, w_bm25, w_emb): | |
| if not message or not message.strip(): | |
| return "Ask a literature question (e.g., *How does CNT length affect gauge factor?*)" | |
| try: | |
| return rag_reply( | |
| question=message, | |
| k=int(top_k), | |
| n_sentences=int(n_sentences), | |
| include_passages=bool(include_passages), | |
| use_llm=bool(use_llm), | |
| model=(model_name or None), | |
| temperature=float(temperature), | |
| strict_quotes_only=bool(strict_quotes_only), | |
| w_tfidf=float(w_tfidf), | |
| w_bm25=float(w_bm25), | |
| w_emb=float(w_emb), | |
| ) | |
| except Exception as e: | |
| return f"RAG error: {e}" | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# 🧪 Self-Sensing Concrete Assistant — Hybrid RAG (Accurate Q&A)") | |
| gr.Markdown( | |
| "- **Prediction**: XGBoost pipeline for **Stress Gauge Factor (MPa)**.\n" | |
| "- **Literature (Hybrid RAG)**: BM25 + TF-IDF + Dense embeddings with **MMR** sentence selection.\n" | |
| "- **Strict mode** shows only quoted sentences with citations; **GPT-5** can paraphrase strictly from those quotes.\n" | |
| "- Toggle **online OA harvesting** in config (Crossref + Unpaywall) or use a local PDF folder." | |
| ) | |
| with gr.Tabs(): | |
| with gr.Tab("🔮 Predict Gauge Factor (XGB)"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| inputs = [ | |
| gr.Textbox(label="Filler1_Type", placeholder="e.g., CNT, Graphite, Steel fiber"), | |
| gr.Dropdown(CAT_DIM_CHOICES, label="Filler1_Dimensions", value="NA"), | |
| gr.Number(label="Filler1_Diameter_um"), | |
| gr.Number(label="Filler1_Length_mm"), | |
| gr.Textbox(label="Filler2_Type", placeholder="Optional"), | |
| gr.Dropdown(CAT_DIM_CHOICES, label="Filler2_Dimensions", value="NA"), | |
| gr.Number(label="Filler2_Diameter_um"), | |
| gr.Number(label="Filler2_Length_mm"), | |
| gr.Number(label="AvgFiller_Density_g/cm3"), | |
| gr.Number(label="AvgFiller_weight_%"), | |
| gr.Number(label="AvgFiller_Volume_%"), | |
| gr.Number(label="Sample_Volume_mm3"), | |
| gr.Number(label="Electrode/Probe_Count"), | |
| gr.Textbox(label="Electrode/Probe_Material", placeholder="e.g., Copper, Silver paste"), | |
| gr.Number(label="W/B"), | |
| gr.Number(label="S/B"), | |
| gr.Number(label="GaugeLength_mm"), | |
| gr.Textbox(label="Curing_Conditions", placeholder="e.g., 28d water, 20°C"), | |
| gr.Number(label="Num_ConductiveFillers"), | |
| gr.Number(label="DryingTemperature_C"), | |
| gr.Number(label="DryingDuration_hrs"), | |
| gr.Number(label="LoadingRate_MPa/s"), | |
| gr.Number(label="ElasticModulus_Gpa"), | |
| gr.Textbox(label="Voltage_Type", placeholder="AC / DC"), | |
| gr.Number(label="Applied_Voltage_V"), | |
| ] | |
| with gr.Column(): | |
| out_pred = gr.Number(label="Predicted Stress_GF (MPa)", precision=6) | |
| gr.Button("Predict", variant="primary").click(predict_fn, inputs, out_pred) | |
| with gr.Tab("📚 Ask the Literature (Hybrid RAG + MMR)"): | |
| with gr.Row(): | |
| top_k = gr.Slider(5, 12, value=8, step=1, label="Top-K chunks") | |
| n_sentences = gr.Slider(2, 6, value=4, step=1, label="Answer length (sentences)") | |
| include_passages = gr.Checkbox(value=False, label="Include supporting passages") | |
| with gr.Accordion("Retriever weights (advanced)", open=False): | |
| w_tfidf = gr.Slider(0.0, 1.0, value=W_TFIDF_DEFAULT, step=0.05, label="TF-IDF weight") | |
| w_bm25 = gr.Slider(0.0, 1.0, value=W_BM25_DEFAULT, step=0.05, label="BM25 weight") | |
| w_emb = gr.Slider(0.0, 1.0, value=W_EMB_DEFAULT, step=0.05, label="Dense weight (set 0 if disabled)") | |
| with gr.Accordion("LLM & Controls", open=False): | |
| strict_quotes_only = gr.Checkbox(value=False, label="Strict quotes only (no paraphrasing)") | |
| use_llm = gr.Checkbox(value=False, label="Use GPT-5 to paraphrase selected sentences") | |
| model_name = gr.Textbox(value=os.getenv("OPENAI_MODEL", OPENAI_MODEL), label="LLM model", placeholder="e.g., gpt-5 or gpt-5-mini") | |
| temperature = gr.Slider(0.0, 1.0, value=0.2, step=0.05, label="Temperature") | |
| gr.ChatInterface( | |
| fn=rag_chat_fn, | |
| additional_inputs=[top_k, n_sentences, include_passages, use_llm, model_name, temperature, strict_quotes_only, w_tfidf, w_bm25, w_emb], | |
| title="Literature Q&A", | |
| description="Hybrid retrieval with diversity. Answers carry inline (Doc, p.X) citations. Toggle strict/LLM modes." | |
| ) | |
| # Note: add share=True to expose publicly (for iframe embedding) | |
| demo.queue().launch() | |