Spaces:
Sleeping
Sleeping
| # ================================================================ | |
| # Self-Sensing Concrete Assistant — Hybrid RAG + XGB + (opt) GPT-5 | |
| # FIXED for Windows/Conda import issues (transformers/quantizers) | |
| # - Pins compatible versions (transformers 4.44.2, sbert 2.7.0, torch 2.x) | |
| # - Disables TF/Flax backends; safe fallbacks if dense fails | |
| # - Hybrid retrieval (BM25 + TF-IDF + Dense*) + MMR sentence selection | |
| # - Local folder only (RAG reads from ./literature_pdfs); no online indexing | |
| # - Optional GPT-5 synthesis strictly from selected cited sentences | |
| # - Gradio UI with Prediction + Literature Q&A tabs | |
| # ================================================================ | |
| # ---------------------- MUST RUN THESE FLAGS FIRST ---------------------- | |
| import os | |
# Backend switches for the Hugging Face stack; the section header above asks
# for these to be set before anything else is imported.
os.environ.update(
    {
        "TRANSFORMERS_NO_TF": "1",        # don't import TensorFlow
        "TRANSFORMERS_NO_FLAX": "1",      # don't import Flax/JAX
        "TOKENIZERS_PARALLELISM": "false",
    }
)
| # ------------------------------- Imports ----------------------------------- | |
| import re, json, time, joblib, warnings, math, hashlib | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.preprocessing import RobustScaler, OneHotEncoder | |
| from sklearn.preprocessing import normalize as sk_normalize | |
| from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error | |
| from sklearn.feature_selection import VarianceThreshold | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from xgboost import XGBRegressor | |
| from pypdf import PdfReader | |
| import fitz # PyMuPDF | |
| import gradio as gr | |
| USE_DENSE = True | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| except Exception as e: | |
| USE_DENSE = False | |
| print("⚠️ sentence-transformers unavailable; continuing with TF-IDF + BM25 only.\n", e) | |
| from rank_bm25 import BM25Okapi | |
| from openai import OpenAI | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| # ============================ Config ======================================= | |
| # --- Data & model paths --- | |
# --- Data & model paths ---
DATA_PATH = "july3.xlsx"  # <- update if needed

# --- Local PDF folder for RAG (no online indexing) ---
LOCAL_PDF_DIR = Path("./literature_pdfs")  # <- your local folder
LOCAL_PDF_DIR.mkdir(exist_ok=True)

# --- RAG artifacts (kept in working dir) ---
ARTIFACT_DIR = Path("rag_artifacts")
ARTIFACT_DIR.mkdir(exist_ok=True)
MODEL_OUT = "stress_gf_xgb.joblib"
TFIDF_VECT_PATH = ARTIFACT_DIR / "tfidf_vectorizer.joblib"
TFIDF_MAT_PATH = ARTIFACT_DIR / "tfidf_matrix.joblib"
BM25_TOK_PATH = ARTIFACT_DIR / "bm25_tokens.joblib"
EMB_NPY_PATH = ARTIFACT_DIR / "chunk_embeddings.npy"
RAG_META_PATH = ARTIFACT_DIR / "chunks.parquet"

# --- Embedding model (fast CPU); overridable through the environment ---
EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")

# --- OpenAI (optional LLM synthesis) ---
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")  # e.g., "gpt-5-mini"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  # set env var to enable LLM

# --- Retrieval weights (UI defaults adapt if dense disabled) ---
if USE_DENSE:
    W_TFIDF_DEFAULT, W_BM25_DEFAULT, W_EMB_DEFAULT = 0.30, 0.30, 0.40
else:
    W_TFIDF_DEFAULT, W_BM25_DEFAULT, W_EMB_DEFAULT = 0.50, 0.50, 0.00

RANDOM_SEED = 42
| # ==================== XGB Pipeline (Prediction) ============================ | |
def make_onehot():
    """Build a dense-output OneHotEncoder that ignores unseen categories.

    The dense-output flag is first passed as ``sparse_output``; if the
    installed scikit-learn rejects that keyword with a ``TypeError`` the
    encoder is built with ``sparse`` instead.
    """
    shared = {"handle_unknown": "ignore"}
    try:
        encoder = OneHotEncoder(sparse_output=False, **shared)
    except TypeError:
        encoder = OneHotEncoder(sparse=False, **shared)
    return encoder
def rmse(y_true, y_pred):
    """Return the root-mean-squared error between two equally shaped arrays.

    The previous implementation returned ``mean_squared_error`` unchanged,
    i.e. the *squared* error, while callers report the value as "RMSE".

    Args:
        y_true: Array-like of reference values.
        y_pred: Array-like of predicted values, same shape as ``y_true``.

    Returns:
        The RMSE as a Python float.

    Raises:
        ValueError: If the two inputs do not have the same shape.
    """
    truth = np.asarray(y_true, dtype=float)
    pred = np.asarray(y_pred, dtype=float)
    if truth.shape != pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same shape, got {truth.shape} and {pred.shape}"
        )
    return float(np.sqrt(np.mean(np.square(truth - pred))))
def evaluate(m, X, y_log, name="Model"):
    """Score a fitted model on the original (non-log) target scale.

    Both the predictions and ``y_log`` are passed through ``np.expm1`` before
    the metrics are computed. One summary line is printed.

    Args:
        m: Fitted estimator exposing ``predict``.
        X: Feature matrix handed to ``m.predict``.
        y_log: Targets on the scale the model was trained on.
        name: Label used in the printed summary.

    Returns:
        Tuple ``(r2, r, mae)`` where ``r`` is whatever ``rmse`` returns.
    """
    predicted = np.expm1(m.predict(X))
    actual = np.expm1(y_log)
    scores = (
        r2_score(actual, predicted),
        rmse(actual, predicted),
        mean_absolute_error(actual, predicted),
    )
    r2, r, mae = scores
    print(f"{name}: R²={r2:.3f}, RMSE={r:.3f}, MAE={mae:.3f}")
    return scores
| # --- Load data | |
# --- Load data
df = pd.read_excel(DATA_PATH)
df.columns = df.columns.str.strip()

# Legacy column names that are dropped when present.
drop_cols = [
    'Loading rate (MPa/s)',
    'Voltage (V) AC\\DC',
    'Elastic Modulus (GPa)',
    'Duration (hrs) of Dying Method',
]
present_drop_cols = [c for c in drop_cols if c in df.columns]
df = df.drop(columns=present_drop_cols, errors='ignore')

# Full feature set fed to the pipeline (column order matters for X).
main_variables = [
    'Filler1_Type', 'Filler1_Diameter_um', 'Filler1_Length_mm',
    'AvgFiller_Density_g/cm3', 'AvgFiller_weight_%', 'AvgFiller_Volume_%',
    'Filler1_Dimensions', 'Filler2_Type', 'Filler2_Diameter_um', 'Filler2_Length_mm',
    'Filler2_Dimensions', 'Sample_Volume_mm3', 'Electrode/Probe_Count', 'Electrode/Probe_Material',
    'W/B', 'S/B', 'GaugeLength_mm', 'Curing_Conditions', 'Num_ConductiveFillers',
    'DryingTemperature_C', 'DryingDuration_hrs', 'LoadingRate_MPa/s',
    'ElasticModulus_Gpa', 'Voltage_Type', 'Applied_Voltage_V',
]
target_col = 'Stress_GF_Mpa'

# Keep only modelled columns and rows with a strictly positive target.
df = df[main_variables + [target_col]].copy()
df = df.dropna(subset=[target_col])
df = df[df[target_col] > 0]

numeric_cols = [
    'Filler1_Diameter_um', 'Filler1_Length_mm', 'AvgFiller_Density_g/cm3',
    'AvgFiller_weight_%', 'AvgFiller_Volume_%', 'Filler2_Diameter_um',
    'Filler2_Length_mm', 'Sample_Volume_mm3', 'Electrode/Probe_Count',
    'W/B', 'S/B', 'GaugeLength_mm', 'Num_ConductiveFillers',
    'DryingTemperature_C', 'DryingDuration_hrs', 'LoadingRate_MPa/s',
    'ElasticModulus_Gpa', 'Applied_Voltage_V',
]
categorical_cols = [
    'Filler1_Type', 'Filler1_Dimensions', 'Filler2_Type', 'Filler2_Dimensions',
    'Electrode/Probe_Material', 'Curing_Conditions', 'Voltage_Type',
]

# Unparseable numeric cells become NaN; categoricals are stringified.
for col in numeric_cols:
    df[col] = pd.to_numeric(df[col], errors='coerce')
for col in categorical_cols:
    df[col] = df[col].astype(str)

# Drop numeric features whose variance falls below the threshold.
vt = VarianceThreshold(threshold=1e-3)
vt.fit(df[numeric_cols])
low_variance = {col for col, var in zip(numeric_cols, vt.variances_) if var < 1e-3}
numeric_cols = [col for col in numeric_cols if col not in low_variance]

# Drop one feature of every pair with |correlation| > 0.95 (upper triangle only).
corr = df[numeric_cols].corr().abs()
upper_mask = np.triu(np.ones(corr.shape), k=1).astype(bool)
upper = corr.where(upper_mask)
to_drop = [col for col in upper.columns if (upper[col] > 0.95).any()]
numeric_cols = [col for col in numeric_cols if col not in to_drop]

# The target is modelled on a log1p scale.
X = df[main_variables].copy()
y = np.log1p(df[target_col])
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_SEED,
)
# Hyper-parameters for the 'regressor' pipeline step; keys carry the
# `regressor__` prefix so they can be passed straight to Pipeline.set_params.
BEST_PARAMS = {
    f"regressor__{param}": value
    for param, value in (
        ("subsample", 1.0),
        ("reg_lambda", 5),
        ("reg_alpha", 0.05),
        ("n_estimators", 300),
        ("max_depth", 6),
        ("learning_rate", 0.1),
        ("gamma", 0),
        ("colsample_bytree", 1.0),
    )
}
def train_and_save_model():
    """Fit the preprocessing + XGBoost pipeline and persist it with joblib.

    Numeric columns are median-imputed and robust-scaled; categorical columns
    are imputed with the most frequent value and one-hot encoded. The pipeline
    is configured with ``BEST_PARAMS``, fitted on ``X_train``/``y_train`` and
    dumped to ``MODEL_OUT``.

    Returns:
        The fitted pipeline.
    """
    numeric_branch = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler()),
    ])
    categorical_branch = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', make_onehot()),
    ])
    preprocessor = ColumnTransformer([
        ('num', numeric_branch, numeric_cols),
        ('cat', categorical_branch, categorical_cols),
    ])
    regressor = XGBRegressor(random_state=RANDOM_SEED, n_jobs=-1, verbosity=0)
    xgb_pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('regressor', regressor),
    ])
    xgb_pipe.set_params(**BEST_PARAMS)
    xgb_pipe.fit(X_train, y_train)
    joblib.dump(xgb_pipe, MODEL_OUT)
    print(f"✅ Trained new model and saved → {MODEL_OUT}")
    return xgb_pipe
def load_or_train_model():
    """Return the model stored at ``MODEL_OUT``, training one if it is absent."""
    if not os.path.exists(MODEL_OUT):
        print("⚠️ No saved model found. Training a new one...")
        return train_and_save_model()
    print(f"📂 Loading existing model from {MODEL_OUT}")
    return joblib.load(MODEL_OUT)
| xgb_pipe = load_or_train_model() | |
| # ======================= Hybrid RAG Indexing ================================ | |
# Sentence boundary: whitespace that follows '.', '!' or '?', or any run of newlines.
_SENT_SPLIT_RE = re.compile(r"(?<=[.!?])\s+|\n+")
# Token: a run of ASCII letters, digits and the symbols _ # + - / . %
TOKEN_RE = re.compile(r"[A-Za-z0-9_#+\-/\.%]+")
def sent_split(text: str) -> List[str]:
    """Split ``text`` on ``_SENT_SPLIT_RE`` and keep pieces of five or more words.

    Pieces are stripped first; empty pieces and pieces with fewer than five
    whitespace-separated words are discarded.
    """
    stripped = (piece.strip() for piece in _SENT_SPLIT_RE.split(text))
    return [piece for piece in stripped if piece and len(piece.split()) >= 5]
def tokenize(text: str) -> List[str]:
    """Return the lower-cased ``TOKEN_RE`` matches found in ``text``, in order."""
    return [match.group(0).lower() for match in TOKEN_RE.finditer(text)]
def extract_text_pymupdf(pdf_path: Path) -> str:
    """Extract the text of a PDF, prefixing every page with a ``[[PAGE=n]]`` line.

    PyMuPDF is tried first and pypdf is the fallback. Compared with the
    previous version the PyMuPDF document is closed deterministically (it is
    opened in a ``with`` block), the reason for falling back is printed
    instead of being swallowed, and the page-tagging logic exists once.

    Args:
        pdf_path: Location of the PDF file.

    Returns:
        The tagged page texts joined by blank lines, or ``""`` when both
        readers fail.
    """

    def _tag_pages(page_texts) -> str:
        # 1-based page numbers; a reader returning None for a page yields "".
        return "\n\n".join(
            f"[[PAGE={number}]]\n{text or ''}"
            for number, text in enumerate(page_texts, start=1)
        )

    try:
        with fitz.open(pdf_path) as doc:
            # Materialise the texts while the document is still open.
            return _tag_pages([page.get_text("text") for page in doc])
    except Exception as primary_err:
        print(f"PyMuPDF could not read {pdf_path} ({primary_err}); falling back to pypdf.")

    # Fallback to PyPDF
    try:
        reader = PdfReader(str(pdf_path))
        return _tag_pages([page.extract_text() for page in reader.pages])
    except Exception as e:
        print(f"PDF read error ({pdf_path}): {e}")
        return ""
def chunk_by_sentence_windows(text: str, win_size=8, overlap=2) -> List[str]:
    """Cut ``text`` into overlapping windows of sentences, keeping page tags.

    Two defects of the previous version are fixed:

    * ``[[PAGE=n]]`` markers sit on a line of their own, so ``sent_split``
      discarded them as sentences shorter than five words. No chunk ever
      contained a marker and ``_extract_page`` always reported ``"?"``. The
      page of each sentence is now tracked, and every chunk starts with the
      marker of its first sentence on a separate line.
    * A trailing window that only repeats sentences already covered by the
      previous window is no longer emitted.

    Text without markers produces chunks without a marker line.

    Args:
        text: Raw document text, optionally containing ``[[PAGE=n]]`` markers.
        win_size: Maximum number of sentences per chunk.
        overlap: Number of sentences shared by consecutive chunks.

    Returns:
        List of chunk strings in document order.
    """
    # With one capture group, re.split yields [before, page, body, page, body, ...].
    parts = re.split(r"\[\[PAGE=(\d+)\]\]", text)
    tagged = [(None, sentence) for sentence in sent_split(parts[0])]
    for page, body in zip(parts[1::2], parts[2::2]):
        tagged.extend((page, sentence) for sentence in sent_split(body))

    chunks = []
    step = max(1, win_size - overlap)
    for start in range(0, len(tagged), step):
        window = tagged[start:start + win_size]
        if not window:
            break
        body = " ".join(sentence for _, sentence in window)
        first_page = window[0][0]
        # The newline keeps the marker out of the first sentence when the
        # chunk is split into sentences again later on.
        chunks.append(body if first_page is None else f"[[PAGE={first_page}]]\n{body}")
        if start + win_size >= len(tagged):
            # This window reached the end; any further window would only
            # repeat its tail.
            break
    return chunks
def _safe_init_st_model(name: str):
    """Instantiate ``SentenceTransformer(name)`` or return ``None``.

    Returns ``None`` straight away when ``USE_DENSE`` is already false. If
    construction raises, the error is printed, the module-level ``USE_DENSE``
    flag is switched off and ``None`` is returned.
    """
    global USE_DENSE
    if USE_DENSE:
        try:
            return SentenceTransformer(name)
        except Exception as e:
            print("⚠️ Could not initialize SentenceTransformer; disabling dense embeddings.\n", e)
            USE_DENSE = False
    return None
def _collect_pdf_paths(pdf_dir: Path) -> List[Path]:
    """Return every PDF file below ``pdf_dir`` (recursive), sorted by path.

    Differences from the former ``glob("**/*.pdf")``:

    * the extension is matched case-insensitively, so ``Paper.PDF`` is found
      on case-sensitive file systems too;
    * directories whose name ends in ``.pdf`` are skipped;
    * the result is sorted, so the indexing order does not depend on the
      directory-listing order.

    Args:
        pdf_dir: Root folder to scan.

    Returns:
        Sorted list of paths; empty when nothing matches or the folder is
        missing.
    """
    return sorted(
        path
        for path in Path(pdf_dir).glob("**/*")
        if path.suffix.lower() == ".pdf" and path.is_file()
    )
def build_or_load_hybrid(pdf_dir: Path):
    """Load the cached hybrid index, or build and persist it from ``pdf_dir``.

    The cache counts as complete when the TF-IDF vectorizer, TF-IDF matrix,
    BM25 tokens and chunk metadata files all exist and, while ``USE_DENSE`` is
    on, the embedding file exists as well.

    Args:
        pdf_dir: Folder scanned for PDFs when a fresh index is needed.

    Returns:
        Tuple ``(vectorizer, tfidf_matrix, meta, bm25_tokens, embeddings)``;
        ``embeddings`` is ``None`` when dense retrieval is unavailable.

    Raises:
        RuntimeError: If no text chunk could be produced from ``pdf_dir``.
    """
    # If artifacts exist, load them
    core_artifacts = (TFIDF_VECT_PATH, TFIDF_MAT_PATH, BM25_TOK_PATH, RAG_META_PATH)
    cache_ready = all(p.exists() for p in core_artifacts) and (
        EMB_NPY_PATH.exists() or not USE_DENSE
    )
    if cache_ready:
        vectorizer = joblib.load(TFIDF_VECT_PATH)
        X_tfidf = joblib.load(TFIDF_MAT_PATH)
        meta = pd.read_parquet(RAG_META_PATH)
        bm25_toks = joblib.load(BM25_TOK_PATH)
        emb = None
        if USE_DENSE and EMB_NPY_PATH.exists():
            emb = np.load(EMB_NPY_PATH)
        print("Loaded hybrid index.")
        return vectorizer, X_tfidf, meta, bm25_toks, emb

    # Fresh index
    records, token_lists = [], []
    pdf_paths = _collect_pdf_paths(pdf_dir)
    print(f"Indexing PDFs from {pdf_dir}. Found {len(pdf_paths)} files.")
    for pdf in pdf_paths:
        raw = extract_text_pymupdf(pdf)
        if not raw.strip():
            continue  # unreadable or empty document
        pieces = chunk_by_sentence_windows(raw, win_size=8, overlap=2)
        for chunk_no, piece in enumerate(pieces):
            records.append({"doc_path": str(pdf), "chunk_id": chunk_no, "text": piece})
            token_lists.append(tokenize(piece))
    if not records:
        raise RuntimeError(f"No PDF text found under: {pdf_dir}")
    meta = pd.DataFrame(records)
    chunk_texts = meta["text"].tolist()

    # TF-IDF
    vectorizer = TfidfVectorizer(
        ngram_range=(1, 2),
        min_df=1,
        max_df=0.95,
        sublinear_tf=True,
        smooth_idf=True,
        lowercase=True,
        token_pattern=r"(?u)\b\w[\w\-\./%+#]*\b",
    )
    X_tfidf = vectorizer.fit_transform(chunk_texts)

    # Dense (optional)
    emb = None
    if USE_DENSE:
        try:
            encoder = _safe_init_st_model(EMB_MODEL_NAME)
            if encoder is not None:
                raw_vectors = encoder.encode(
                    meta["text"].tolist(),
                    batch_size=64,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                )
                emb = sk_normalize(raw_vectors)
                np.save(EMB_NPY_PATH, emb)
        except Exception as e:
            emb = None
            print("⚠️ Dense embeddings failed; continuing without them.\n", e)

    # Save artifacts
    joblib.dump(vectorizer, TFIDF_VECT_PATH)
    joblib.dump(X_tfidf, TFIDF_MAT_PATH)
    joblib.dump(token_lists, BM25_TOK_PATH)
    meta.to_parquet(RAG_META_PATH, index=False)
    print(f"Indexed {len(meta)} chunks from {meta['doc_path'].nunique()} PDFs.")
    return vectorizer, X_tfidf, meta, token_lists, emb
| # ---------- Auto reindex if new/modified PDFs are detected ---------- | |
| from datetime import datetime | |
def auto_reindex_if_needed(pdf_dir: Path):
    """Delete cached RAG artifacts when they are stale so the index is rebuilt.

    * No metadata file: any leftover partial artifacts are removed.
    * A PDF under ``pdf_dir`` is newer than the metadata file: all artifacts,
      including the metadata file, are removed.

    Artifact deletion used to be wrapped in ``except Exception: pass``; a file
    that could not be removed is now reported, because a half-cleared cache is
    otherwise invisible. The function only clears files; it never builds the
    index itself.

    Args:
        pdf_dir: Folder whose PDFs are compared with the index timestamp.
    """

    def _discard(paths):
        # Best-effort removal: a missing file is fine, anything else is reported.
        for artifact in paths:
            try:
                artifact.unlink()
            except FileNotFoundError:
                pass
            except OSError as err:
                print(f"⚠️ Could not remove stale artifact {artifact}: {err}")

    derived = [TFIDF_VECT_PATH, TFIDF_MAT_PATH, BM25_TOK_PATH, EMB_NPY_PATH]
    meta_path = RAG_META_PATH
    pdfs = _collect_pdf_paths(pdf_dir)

    if not meta_path.exists():
        print("No existing index found — indexing now...")
        # Remove stale artifacts if any partial set exists
        _discard(derived)
        return

    # Modification times are compared as raw POSIX timestamps.
    indexed_at = meta_path.stat().st_mtime
    recent = [p for p in pdfs if p.stat().st_mtime > indexed_at]
    if recent:
        print(f"Found {len(recent)} new/updated PDFs — rebuilding index...")
        # Clear artifacts to force rebuild
        _discard(derived + [meta_path])
| # Build hybrid index (local only) | |
# Drop stale artifacts first, then load or build the hybrid index (local only).
auto_reindex_if_needed(LOCAL_PDF_DIR)
(
    tfidf_vectorizer,
    tfidf_matrix,
    rag_meta,
    bm25_tokens,
    emb_matrix,
) = build_or_load_hybrid(LOCAL_PDF_DIR)
bm25 = BM25Okapi(bm25_tokens)
st_query_model = _safe_init_st_model(EMB_MODEL_NAME)  # safe init; may set USE_DENSE=False

# If dense failed at runtime, update default weights in case UI uses them
if not USE_DENSE:
    W_TFIDF_DEFAULT = 0.50
    W_BM25_DEFAULT = 0.50
    W_EMB_DEFAULT = 0.00
def _extract_page(text_chunk: str) -> str:
    """Return the number of the last ``[[PAGE=n]]`` marker in ``text_chunk``.

    ``"?"`` is returned when the chunk carries no marker.
    """
    page_numbers = re.findall(r"\[\[PAGE=(\d+)\]\]", text_chunk)
    if not page_numbers:
        return "?"
    return page_numbers[-1]
| # ---------------------- Hybrid search -------------------------------------- | |
def hybrid_search(query: str, k=8, w_tfidf=W_TFIDF_DEFAULT, w_bm25=W_BM25_DEFAULT, w_emb=W_EMB_DEFAULT):
    """Rank indexed chunks by a weighted blend of dense, TF-IDF and BM25 scores.

    Each score vector is min-max normalised (a constant vector becomes all
    zeros) and the three weights are rescaled to sum to one. Dense scoring is
    skipped, and its weight forced to zero, when it is disabled, not
    initialised, given zero weight, or when encoding the query fails.

    Args:
        query: Free-text question.
        k: Number of chunks to return.
        w_tfidf: Weight of the TF-IDF score.
        w_bm25: Weight of the BM25 score.
        w_emb: Weight of the dense-embedding score.

    Returns:
        DataFrame with the top ``k`` rows of ``rag_meta`` plus the columns
        ``score_dense``, ``score_tfidf``, ``score_bm25`` and ``score``,
        re-indexed from zero.
    """

    def _minmax(values):
        arr = np.asarray(values, dtype=float)
        hi, lo = arr.max(), arr.min()
        if np.allclose(hi, lo):
            return np.zeros_like(arr)
        return (arr - lo) / (hi - lo)

    # Dense (optional)
    dense_scores = None
    dense_usable = (
        USE_DENSE
        and st_query_model is not None
        and emb_matrix is not None
        and w_emb > 0
    )
    if dense_usable:
        try:
            q_emb = st_query_model.encode([query], convert_to_numpy=True)
            q_emb = sk_normalize(q_emb)[0]
            dense_scores = emb_matrix @ q_emb
        except Exception as e:
            print("⚠️ Dense query encoding failed; ignoring dense this run.\n", e)
    if dense_scores is None:
        dense_scores = np.zeros(len(rag_meta), dtype=float)
        w_emb = 0.0  # force off

    # TF-IDF
    query_vec = tfidf_vectorizer.transform([query])
    tfidf_scores = (tfidf_matrix @ query_vec.T).toarray().ravel()

    # BM25
    query_tokens = [tok.lower() for tok in TOKEN_RE.findall(query)]
    bm25_scores = np.array(bm25.get_scores(query_tokens), dtype=float)

    normalised = {
        "score_dense": _minmax(dense_scores),
        "score_tfidf": _minmax(tfidf_scores),
        "score_bm25": _minmax(bm25_scores),
    }

    # Rescale the weights to sum to one; an all-zero weight set divides by 1.0.
    weight_sum = (w_tfidf + w_bm25 + w_emb) or 1.0
    w_tfidf, w_bm25, w_emb = w_tfidf / weight_sum, w_bm25 / weight_sum, w_emb / weight_sum
    combo = (
        w_emb * normalised["score_dense"]
        + w_tfidf * normalised["score_tfidf"]
        + w_bm25 * normalised["score_bm25"]
    )

    top = np.argsort(-combo)[:k]
    hits = rag_meta.iloc[top].copy()
    for column, scores in normalised.items():
        hits[column] = scores[top]
    hits["score"] = combo[top]
    return hits.reset_index(drop=True)
| # -------------- Sentence selection with MMR (diversity) -------------------- | |
def split_sentences(text: str) -> List[str]:
    """Return the sentences of ``text`` that are 6 to 60 words long.

    Splitting is delegated to ``sent_split``; the word count is based on
    whitespace separation.
    """
    return [
        sentence
        for sentence in sent_split(text)
        if 6 <= len(sentence.split()) <= 60
    ]
def mmr_select_sentences(question: str, hits: pd.DataFrame, top_n=4, pool_per_chunk=6, lambda_div=0.7):
    """Pick up to ``top_n`` relevant, mutually diverse sentences via MMR.

    Candidates are the first ``pool_per_chunk`` usable sentences of every hit.
    Relevance and sentence-to-sentence similarity come from the dense encoder
    when it is available, otherwise (or when encoding fails) from TF-IDF.

    Changes from the previous version:

    * the TF-IDF fallback, which was duplicated, exists once;
    * the pairwise similarity matrix is computed once rather than pair by
      pair inside the selection loop;
    * ``top_n < 1`` returns an empty list instead of one sentence.

    Args:
        question: The user question.
        hits: Retrieved chunks; the ``doc_path`` and ``text`` columns are read.
        top_n: Maximum number of sentences to return.
        pool_per_chunk: Maximum number of candidate sentences per chunk.
        lambda_div: Relevance weight; ``1 - lambda_div`` weights the
            similarity penalty.

    Returns:
        List of dicts with keys ``sent``, ``doc`` and ``page``, in selection
        order.
    """
    pool = []
    for _, row in hits.iterrows():
        doc = Path(row["doc_path"]).name
        page = _extract_page(row["text"])
        pool.extend(
            {"sent": sentence, "doc": doc, "page": page}
            for sentence in split_sentences(row["text"])[:pool_per_chunk]
        )
    if not pool or top_n < 1:
        return []
    sent_texts = [item["sent"] for item in pool]

    def _tfidf_relevance_and_similarity():
        q_mat = tfidf_vectorizer.transform([question])
        s_mat = tfidf_vectorizer.transform(sent_texts)
        return (s_mat @ q_mat.T).toarray().ravel(), (s_mat @ s_mat.T).toarray()

    rel = sim = None
    if USE_DENSE and st_query_model is not None:
        try:
            enc = st_query_model.encode([question] + sent_texts, convert_to_numpy=True)
            q_vec = sk_normalize(enc[:1])[0]
            s_mat = sk_normalize(enc[1:])
            rel, sim = s_mat @ q_vec, s_mat @ s_mat.T
        except Exception as e:
            print("⚠️ Dense sentence encoding failed; falling back to TF-IDF for MMR.\n", e)
            rel = sim = None
    if rel is None:
        rel, sim = _tfidf_relevance_and_similarity()

    # Seed with the most relevant sentence, then add greedily.
    selected_idx = [int(np.argmax(rel))]
    remain = [i for i in range(len(pool)) if i != selected_idx[0]]
    while len(selected_idx) < top_n and remain:
        # max() over (score, index) tuples breaks ties toward the larger
        # index, matching the reverse sort used previously.
        _, best = max(
            (lambda_div * rel[i] - (1 - lambda_div) * sim[i, selected_idx].max(), i)
            for i in remain
        )
        selected_idx.append(best)
        remain.remove(best)
    return [pool[i] for i in selected_idx]
def compose_extractive(selected: List[Dict]) -> str:
    """Join the selected sentences into one string with inline citations.

    Each entry is rendered as ``"<sent> (<doc>, p.<page>)"`` and entries are
    separated by single spaces. An empty selection yields ``""``.
    """
    cited = []
    for entry in selected or ():
        cited.append(f"{entry['sent']} ({entry['doc']}, p.{entry['page']})")
    return " ".join(cited)
| # ------------------- Optional LLM synthesis (GPT-4o / GPT-5) ------------------- | |
def synthesize_with_llm(question: str, sentence_lines: List[str], model: str = None, temperature: float = 0.2) -> "str | None":
    """Ask the OpenAI chat API to paraphrase the cited evidence into an answer.

    Changes from the previous version:

    * the return annotation admits ``None``, which was already returned;
    * an empty-string API key is treated as "not configured";
    * nothing is sent when there is no evidence, since the prompt requires
      an answer built only from the evidence;
    * client construction is inside the ``try``, so a client error degrades
      to ``None`` like any other API failure;
    * a missing or blank completion returns ``None`` explicitly.

    Args:
        question: The user question.
        sentence_lines: Evidence sentences, each already carrying its citation.
        model: Chat model name; defaults to ``OPENAI_MODEL``.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The synthesized paragraph, or ``None`` when synthesis is skipped or
        fails.
    """
    if not OPENAI_API_KEY:
        print("Skipping ChatGPT")
        return None  # not configured → skip synthesis
    if not sentence_lines:
        return None  # no evidence → nothing to synthesize from

    if model is None:
        model = OPENAI_MODEL

    # --- Stronger, clean academic prompt ---
    SYSTEM_PROMPT = (
        "You are a scientific writing assistant specializing in self-sensing cementitious materials.\n"
        "Write a short, fluent, and informative paragraph (3–6 sentences) answering the question using ONLY the provided evidence.\n"
        "Rephrase and synthesize ideas; do not copy sentences verbatim.\n"
        "Include parenthetical citations exactly as given (e.g., '(Paper.pdf, p.4)')."
    )
    user_prompt = (
        f"Question: {question}\n\n"
        "Evidence:\n" +
        "\n".join(f"- {s}" for s in sentence_lines)
    )

    try:
        from openai import OpenAI

        client = OpenAI(api_key=OPENAI_API_KEY)
        print("🔍 Calling GPT synthesis...")
        response = client.chat.completions.create(
            model=model,
            temperature=temperature,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
        )
        content = response.choices[0].message.content
    except Exception as e:
        print(f"❌ LLM synthesis error: {e}")
        return None

    answer = (content or "").strip()
    return answer or None
| # ------------------------ RAG reply ---------------------------------------- | |
def rag_reply(
    question: str,
    k: int = 8,
    n_sentences: int = 4,
    include_passages: bool = False,
    use_llm: bool = False,
    model: str = None,
    temperature: float = 0.2,
    strict_quotes_only: bool = False,
    w_tfidf: float = W_TFIDF_DEFAULT,
    w_bm25: float = W_BM25_DEFAULT,
    w_emb: float = W_EMB_DEFAULT
) -> str:
    """Answer ``question`` from the indexed literature as a Markdown string.

    Chunks are retrieved with ``hybrid_search`` and sentences are chosen with
    ``mmr_select_sentences``. The reply is then one of:

    * strict mode: the quoted sentences (or the top two chunks if no sentence
      qualified);
    * LLM mode: a synthesized paragraph, when synthesis returns text;
    * otherwise: the extractive answer (or the top two chunks if it is empty).

    Every reply ends with a citation list built from the first six hits and,
    when fewer than three distinct sources were retrieved, a coverage note.

    Args:
        question: The user question.
        k: Number of chunks to retrieve.
        n_sentences: Maximum number of sentences in the answer.
        include_passages: Append the top two raw chunks to the reply.
        use_llm: Try LLM synthesis before the extractive answer.
        model: LLM model name forwarded to ``synthesize_with_llm``.
        temperature: LLM sampling temperature.
        strict_quotes_only: Return quotes only, no paraphrasing.
        w_tfidf: TF-IDF retrieval weight.
        w_bm25: BM25 retrieval weight.
        w_emb: Dense retrieval weight.

    Returns:
        The Markdown reply.
    """
    hits = hybrid_search(question, k=k, w_tfidf=w_tfidf, w_bm25=w_bm25, w_emb=w_emb)
    if hits.empty:
        return "No relevant passages found. Add more PDFs in literature_pdfs/ or adjust your query."

    selected = mmr_select_sentences(question, hits, top_n=int(n_sentences), pool_per_chunk=6, lambda_div=0.7)

    header_cites = "; ".join(
        f"{Path(r['doc_path']).name} (p.{_extract_page(r['text'])})"
        for _, r in hits.head(6).iterrows()
    )

    # Coverage note (helps debugging thin answers)
    srcs = {Path(r['doc_path']).name for _, r in hits.iterrows()}
    coverage_note = ""
    if len(srcs) < 3:
        coverage_note = f"\n\n> Note: Only {len(srcs)} unique source(s) contributed. Add more PDFs or increase Top-K."

    # Pieces shared by all reply variants.
    citations = f"\n\n**Citations:** {header_cites}{coverage_note}"
    top_passages = "\n\n".join(hits["text"].tolist()[:2])
    appendix = ("\n\n---\n" + top_passages) if include_passages else ""
    cited_lines = [f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected]

    if strict_quotes_only:
        if not selected:
            return "**Quoted Passages:**\n\n---\n" + top_passages + citations
        return "**Quoted Passages:**\n- " + "\n- ".join(cited_lines) + citations + appendix

    # Extractive baseline
    extractive = compose_extractive(selected)

    # Optional LLM synthesis
    if use_llm and selected:
        llm_text = synthesize_with_llm(question, cited_lines, model=model, temperature=temperature)
        if llm_text:
            return f"**Answer (GPT-5 synthesis):** {llm_text}" + citations + appendix

    # Fallback: purely extractive
    if not extractive:
        return "**Answer:** Here are relevant passages." + citations + "\n\n---\n" + top_passages
    return f"**Answer:** {extractive}" + citations + appendix
| # =========================== Gradio UI ===================================== | |
# Column order of a prediction row; it is zipped positionally with the values
# passed to `_coerce_row`, so the order here is significant.
INPUT_COLS = [
    # filler 1
    "Filler1_Type", "Filler1_Dimensions", "Filler1_Diameter_um", "Filler1_Length_mm",
    # filler 2
    "Filler2_Type", "Filler2_Dimensions", "Filler2_Diameter_um", "Filler2_Length_mm",
    # averaged filler properties
    "AvgFiller_Density_g/cm3", "AvgFiller_weight_%", "AvgFiller_Volume_%",
    # specimen and electrodes
    "Sample_Volume_mm3", "Electrode/Probe_Count", "Electrode/Probe_Material",
    # mix and curing
    "W/B", "S/B", "GaugeLength_mm", "Curing_Conditions", "Num_ConductiveFillers",
    # drying and loading
    "DryingTemperature_C", "DryingDuration_hrs", "LoadingRate_MPa/s",
    # mechanical and electrical
    "ElasticModulus_Gpa", "Voltage_Type", "Applied_Voltage_V",
]

# Columns coerced to float; every other column is handled as text.
NUMERIC_INPUTS = {
    "Filler1_Diameter_um",
    "Filler1_Length_mm",
    "Filler2_Diameter_um",
    "Filler2_Length_mm",
    "AvgFiller_Density_g/cm3",
    "AvgFiller_weight_%",
    "AvgFiller_Volume_%",
    "Sample_Volume_mm3",
    "Electrode/Probe_Count",
    "W/B",
    "S/B",
    "GaugeLength_mm",
    "Num_ConductiveFillers",
    "DryingTemperature_C",
    "DryingDuration_hrs",
    "LoadingRate_MPa/s",
    "ElasticModulus_Gpa",
    "Applied_Voltage_V",
}

# Choices offered by the filler-dimension dropdowns.
CAT_DIM_CHOICES = ["0D", "1D", "2D", "3D", "NA"]
def _coerce_row(args):
    """Turn positional UI values into a one-row DataFrame in ``INPUT_COLS`` order.

    Numeric columns become ``float``; blank, missing or unparseable values
    become ``None``. All other columns become stripped strings, with ``None``
    mapped to ``""``.

    The former bare ``except:`` is narrowed to the errors ``float()`` can
    raise, so ``KeyboardInterrupt``/``SystemExit`` are no longer swallowed.

    Args:
        args: Values matching ``INPUT_COLS`` position by position.

    Returns:
        DataFrame with exactly one row and the columns ``INPUT_COLS``.
    """
    clean = {}
    for col, raw in zip(INPUT_COLS, args):
        if col not in NUMERIC_INPUTS:
            clean[col] = "" if raw is None else str(raw).strip()
        elif raw in ("", None):
            clean[col] = None
        else:
            try:
                clean[col] = float(raw)
            except (TypeError, ValueError, OverflowError):
                clean[col] = None
    return pd.DataFrame([clean], columns=INPUT_COLS)
def _load_model():
    """Load the pipeline stored at ``MODEL_OUT``.

    Raises:
        FileNotFoundError: If ``MODEL_OUT`` does not exist.
    """
    if os.path.exists(MODEL_OUT):
        return joblib.load(MODEL_OUT)
    raise FileNotFoundError(f"Model file not found at '{MODEL_OUT}'. Retrain above.")
def predict_fn(*args):
    """Predict the target for one set of UI inputs.

    The model output is mapped back with ``np.expm1``; a result in
    ``(-1e-8, 0)`` is clamped to ``0.0``.

    Failures used to be returned as a string. The result is wired to a
    ``gr.Number`` output, which cannot display text, so the message was lost.
    Failures now raise ``gr.Error``, Gradio's mechanism for showing an error
    message to the user.

    Args:
        *args: Input values in ``INPUT_COLS`` order.

    Returns:
        The prediction as a float.

    Raises:
        gr.Error: If loading the model, coercing the inputs or predicting fails.
    """
    try:
        mdl = _load_model()
        X_new = _coerce_row(args)
        y_log = mdl.predict(X_new)
        y = float(np.expm1(y_log)[0])
    except Exception as e:
        raise gr.Error(f"Error during prediction: {e}") from e
    if -1e-8 < y < 0:
        y = 0.0  # tiny negative values are treated as zero
    return y
def rag_chat_fn(message, history, top_k, n_sentences, include_passages,
                use_llm, model_name, temperature, strict_quotes_only,
                w_tfidf, w_bm25, w_emb):
    """Chat callback: coerce the UI controls and delegate to ``rag_reply``.

    A blank message returns a usage hint. Any exception raised while coercing
    the settings or answering is returned as a ``"RAG error: ..."`` string.
    ``history`` is accepted but not used.
    """
    if not (message and message.strip()):
        return "Ask a literature question (e.g., *How does CNT length affect gauge factor?*)"
    try:
        settings = dict(
            question=message,
            k=int(top_k),
            n_sentences=int(n_sentences),
            include_passages=bool(include_passages),
            use_llm=bool(use_llm),
            model=(model_name or None),
            temperature=float(temperature),
            strict_quotes_only=bool(strict_quotes_only),
            w_tfidf=float(w_tfidf),
            w_bm25=float(w_bm25),
            w_emb=float(w_emb),
        )
        return rag_reply(**settings)
    except Exception as e:
        return f"RAG error: {e}"
# NOTE(review): the original indentation was lost, so the nesting of the
# layout containers below is reconstructed from statement order — confirm
# against the deployed app.
with gr.Blocks() as demo:
    gr.Markdown("# 🧪 Self-Sensing Concrete Assistant — Hybrid RAG (Accurate Q&A)")
    gr.Markdown(
        "- **Prediction**: XGBoost pipeline for **Stress Gauge Factor (MPa)**.\n"
        "- **Literature (Hybrid RAG)**: BM25 + TF-IDF + Dense embeddings with **MMR** sentence selection.\n"
        "- **Strict mode** shows only quoted sentences with citations; **GPT-5** can paraphrase strictly from those quotes.\n"
        "- **Local-only RAG**: drop PDFs into `literature_pdfs/` and the index will auto-refresh on restart."
    )

    with gr.Tabs():
        # ---- Tab 1: tabular prediction ----
        with gr.Tab("🔮 Predict Gauge Factor (XGB)"):
            with gr.Row():
                with gr.Column():
                    # Widget order must match INPUT_COLS: values reach
                    # predict_fn positionally.
                    inputs = [
                        gr.Textbox(label="Filler1_Type", placeholder="e.g., CNT, Graphite, Steel fiber"),
                        gr.Dropdown(CAT_DIM_CHOICES, label="Filler1_Dimensions", value="NA"),
                        gr.Number(label="Filler1_Diameter_um"),
                        gr.Number(label="Filler1_Length_mm"),
                        gr.Textbox(label="Filler2_Type", placeholder="Optional"),
                        gr.Dropdown(CAT_DIM_CHOICES, label="Filler2_Dimensions", value="NA"),
                        gr.Number(label="Filler2_Diameter_um"),
                        gr.Number(label="Filler2_Length_mm"),
                        gr.Number(label="AvgFiller_Density_g/cm3"),
                        gr.Number(label="AvgFiller_weight_%"),
                        gr.Number(label="AvgFiller_Volume_%"),
                        gr.Number(label="Sample_Volume_mm3"),
                        gr.Number(label="Electrode/Probe_Count"),
                        gr.Textbox(label="Electrode/Probe_Material", placeholder="e.g., Copper, Silver paste"),
                        gr.Number(label="W/B"),
                        gr.Number(label="S/B"),
                        gr.Number(label="GaugeLength_mm"),
                        gr.Textbox(label="Curing_Conditions", placeholder="e.g., 28d water, 20°C"),
                        gr.Number(label="Num_ConductiveFillers"),
                        gr.Number(label="DryingTemperature_C"),
                        gr.Number(label="DryingDuration_hrs"),
                        gr.Number(label="LoadingRate_MPa/s"),
                        gr.Number(label="ElasticModulus_Gpa"),
                        gr.Textbox(label="Voltage_Type", placeholder="AC / DC"),
                        gr.Number(label="Applied_Voltage_V"),
                    ]
                with gr.Column():
                    out_pred = gr.Number(label="Predicted Stress_GF (MPa)", precision=6)
                    gr.Button("Predict", variant="primary").click(predict_fn, inputs, out_pred)

        # ---- Tab 2: literature Q&A ----
        with gr.Tab("📚 Ask the Literature (Hybrid RAG + MMR)"):
            with gr.Row():
                top_k = gr.Slider(5, 12, value=8, step=1, label="Top-K chunks")
                n_sentences = gr.Slider(2, 6, value=4, step=1, label="Answer length (sentences)")
                include_passages = gr.Checkbox(value=False, label="Include supporting passages")
            with gr.Accordion("Retriever weights (advanced)", open=False):
                w_tfidf = gr.Slider(0.0, 1.0, value=W_TFIDF_DEFAULT, step=0.05, label="TF-IDF weight")
                w_bm25 = gr.Slider(0.0, 1.0, value=W_BM25_DEFAULT, step=0.05, label="BM25 weight")
                w_emb = gr.Slider(0.0, 1.0, value=W_EMB_DEFAULT, step=0.05, label="Dense weight (set 0 if disabled)")
            with gr.Accordion("LLM & Controls", open=False):
                strict_quotes_only = gr.Checkbox(value=False, label="Strict quotes only (no paraphrasing)")
                use_llm = gr.Checkbox(value=False, label="Use GPT-5 to paraphrase selected sentences")
                model_name = gr.Textbox(value=os.getenv("OPENAI_MODEL", OPENAI_MODEL), label="LLM model", placeholder="e.g., gpt-5 or gpt-5-mini")
                temperature = gr.Slider(0.0, 1.0, value=0.2, step=0.05, label="Temperature")
            # The additional inputs follow rag_chat_fn's parameter order after
            # (message, history).
            chat_controls = [
                top_k, n_sentences, include_passages,
                use_llm, model_name, temperature, strict_quotes_only,
                w_tfidf, w_bm25, w_emb,
            ]
            gr.ChatInterface(
                fn=rag_chat_fn,
                additional_inputs=chat_controls,
                title="Literature Q&A",
                description="Hybrid retrieval with diversity. Answers carry inline (Doc, p.X) citations. Toggle strict/LLM modes."
            )

# Note: add share=True to expose publicly (for iframe embedding)
demo.queue().launch()