Spaces:
Sleeping
Sleeping
| # rag_core.py — RAG core + logging + grid evaluation (no UI) | |
| import os | |
| import re | |
| import json | |
| import time | |
| import uuid | |
| import traceback | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
# ---------------------- Optional deps ---------------------- #
# Dense retrieval needs sentence-transformers. USE_DENSE records whether the
# import worked; _safe_init_st_model may still switch it off later.
try:
    from sentence_transformers import SentenceTransformer
except Exception:
    USE_DENSE = False
else:
    USE_DENSE = True

# BM25 is optional: when rank_bm25 is missing, lexical retrieval relies on TF-IDF.
try:
    from rank_bm25 import BM25Okapi
except Exception:
    BM25Okapi = None
    print("rank_bm25 not installed; BM25 disabled (TF-IDF still works).")

# Optional OpenAI client, used only for LLM synthesis (not for retrieval eval).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5")
try:
    from openai import OpenAI
except Exception:
    OpenAI = None

# Synthesis is possible only with an importable client AND a non-blank key.
LLM_AVAILABLE = OpenAI is not None and bool((OPENAI_API_KEY or "").strip())
# -------------------------- Paths & artifacts --------------------------- #
# Cached index artifacts and the JSONL run log live under ARTIFACT_DIR; PDFs
# are read from LOCAL_PDF_DIR. Both directories are relative to the current
# working directory and are created at import time when missing.
ARTIFACT_DIR = Path("rag_artifacts")
LOCAL_PDF_DIR = Path("papers")
ARTIFACT_DIR.mkdir(exist_ok=True)
LOCAL_PDF_DIR.mkdir(exist_ok=True)

TFIDF_VECT_PATH = ARTIFACT_DIR / "tfidf_vectorizer.joblib"
TFIDF_MAT_PATH = ARTIFACT_DIR / "tfidf_matrix.joblib"
BM25_TOK_PATH = ARTIFACT_DIR / "bm25_tokens.joblib"
EMB_NPY_PATH = ARTIFACT_DIR / "chunk_embeddings.npy"
RAG_META_PATH = ARTIFACT_DIR / "chunks.parquet"
LOG_PATH = ARTIFACT_DIR / "rag_logs.jsonl"

# Enabled only when the env var equals "true" (case-insensitive).
USE_ONLINE_SOURCES = os.getenv("USE_ONLINE_SOURCES", "false").lower() == "true"

# Default hybrid weights: an even lexical split when dense retrieval is off,
# otherwise the dense encoder gets the largest share.
if USE_DENSE:
    W_TFIDF_DEFAULT, W_BM25_DEFAULT, W_EMB_DEFAULT = 0.30, 0.30, 0.40
else:
    W_TFIDF_DEFAULT, W_BM25_DEFAULT, W_EMB_DEFAULT = 0.50, 0.50, 0.00
# -------------------------- basic text helpers -------------------------- #
# Sentence boundary: whitespace after . ! or ?, or any run of newlines.
_SENT_SPLIT_RE = re.compile(r"(?<=[.!?])\s+|\n+")
# Token = run of ASCII alphanumerics plus symbols common in technical text.
TOKEN_RE = re.compile(r"[A-Za-z0-9_#+\-/\.%]+")


def sent_split(text: str) -> List[str]:
    """Split *text* into stripped sentences, keeping those with at least 5 words."""
    kept: List[str] = []
    for piece in _SENT_SPLIT_RE.split(text):
        piece = piece.strip()
        if piece and len(piece.split()) >= 5:
            kept.append(piece)
    return kept


def tokenize(text: str) -> List[str]:
    """Return every TOKEN_RE match in *text*, lower-cased."""
    return list(map(str.lower, TOKEN_RE.findall(text)))
| # -------------------------- PDF text extraction ------------------------ # | |
| def _extract_pdf_text(pdf_path: Path) -> str: | |
| try: | |
| import fitz # PyMuPDF | |
| doc = fitz.open(pdf_path) | |
| out = [] | |
| for i, page in enumerate(doc): | |
| out.append(f"[[PAGE={i+1}]]\n{page.get_text('text') or ''}") | |
| return "\n\n".join(out) | |
| except Exception: | |
| try: | |
| from pypdf import PdfReader | |
| reader = PdfReader(str(pdf_path)) | |
| out = [] | |
| for i, p in enumerate(reader.pages): | |
| txt = p.extract_text() or "" | |
| out.append(f"[[PAGE={i+1}]]\n{txt}") | |
| return "\n\n".join(out) | |
| except Exception as e: | |
| print(f"PDF read error ({pdf_path}): {e}") | |
| return "" | |
def chunk_by_sentence_windows(text: str, win_size: int = 8, overlap: int = 2) -> List[str]:
    """Split *text* into overlapping windows of sentences.

    Sentences come from ``sent_split``. Each window holds up to ``win_size``
    sentences, and consecutive windows start ``max(1, win_size - overlap)``
    sentences apart. Iteration stops once a window reaches the final
    sentence, so no trailing window is emitted whose sentences the previous
    window already covers.

    ``[[PAGE=N]]`` markers (as emitted by ``_extract_pdf_text``) are treated
    as page boundaries and never become part of a sentence. A chunk whose
    first sentence follows a marker is prefixed with ``"[[PAGE=N]]"`` plus a
    newline, for that sentence's page, so ``_extract_page`` can recover a
    citation page. The newline keeps the marker as its own one-word line, which
    ``sent_split`` discards when the chunk is re-split into sentences. Text
    without markers yields chunks with no prefix.

    Args:
        text: Raw document text, optionally containing page markers.
        win_size: Sentences per window.
        overlap: Sentences shared by consecutive windows.

    Returns:
        The chunk strings, in document order.
    """
    page_marker = re.compile(r"\[\[PAGE=(\d+)\]\]")

    # Pair each stretch of text with the page it belongs to (None before any marker).
    segments = []
    page: Optional[str] = None
    cursor = 0
    for m in page_marker.finditer(text):
        segments.append((page, text[cursor:m.start()]))
        page, cursor = m.group(1), m.end()
    segments.append((page, text[cursor:]))

    sents: List[str] = []
    pages: List[Optional[str]] = []
    for seg_page, seg_text in segments:
        for s in sent_split(seg_text):
            sents.append(s)
            pages.append(seg_page)

    chunks: List[str] = []
    step = max(1, win_size - overlap)
    for start in range(0, len(sents), step):
        window = sents[start:start + win_size]
        if not window:
            break
        body = " ".join(window)
        first_page = pages[start]
        chunks.append(body if first_page is None else f"[[PAGE={first_page}]]\n{body}")
        if start + win_size >= len(sents):
            break  # this window already reaches the last sentence
    return chunks
# -------------------------- dense encoder -------------------------- #
def _safe_init_st_model(name: str):
    """Load the SentenceTransformer *name*, or return None.

    Returns None right away when dense retrieval is disabled. If loading
    raises, the error is printed and the module-level ``USE_DENSE`` flag is
    switched off so that later code skips the dense path.
    """
    global USE_DENSE
    if USE_DENSE:
        try:
            return SentenceTransformer(name)
        except Exception as err:
            print("Dense embeddings unavailable:", err)
            USE_DENSE = False
    return None
# --------------------- build / load hybrid index --------------------- #
def build_or_load_hybrid(pdf_dir: Path):
    """Load the cached hybrid index from ARTIFACT_DIR, or build it from PDFs.

    The cache is used when every required artifact exists (BM25 tokens only
    if rank_bm25 is importable, embeddings only if dense retrieval is on)
    and all of them describe the same number of chunks; otherwise the index
    is rebuilt from every ``*.pdf`` under *pdf_dir* (recursively, in sorted
    path order) and the artifacts are rewritten.

    Args:
        pdf_dir: Directory searched for PDFs when (re)building.

    Returns:
        ``(vectorizer, X_tfidf, meta, bm25_tokens, emb)`` where ``meta`` is a
        DataFrame with columns ``doc_path``, ``chunk_id`` and ``text``,
        ``bm25_tokens`` is one token list per chunk, and ``emb`` is the
        L2-normalised chunk-embedding matrix or None when dense embeddings
        are off or failed. If no PDF yields text, everything except an
        empty ``meta`` is None.
    """
    from sklearn.feature_extraction.text import TfidfVectorizer
    import joblib

    have_cache = (
        TFIDF_VECT_PATH.exists()
        and TFIDF_MAT_PATH.exists()
        and RAG_META_PATH.exists()
        and (BM25_TOK_PATH.exists() or BM25Okapi is None)
        and (EMB_NPY_PATH.exists() or not USE_DENSE)
    )
    if have_cache:
        # NOTE: joblib.load unpickles; only load artifacts this app wrote itself.
        vectorizer = joblib.load(TFIDF_VECT_PATH)
        X_tfidf = joblib.load(TFIDF_MAT_PATH)
        meta = pd.read_parquet(RAG_META_PATH)
        bm25_toks = joblib.load(BM25_TOK_PATH) if BM25Okapi is not None else None
        emb = np.load(EMB_NPY_PATH) if (USE_DENSE and EMB_NPY_PATH.exists()) else None
        # Artifacts are written one by one, so an interrupted or partial rebuild
        # can leave them describing different chunk sets; mismatched row counts
        # would break score fusion in hybrid_search. Rebuild in that case.
        n_rows = len(meta)
        consistent = (
            X_tfidf.shape[0] == n_rows
            and (bm25_toks is None or len(bm25_toks) == n_rows)
            and (emb is None or emb.shape[0] == n_rows)
        )
        if consistent:
            return vectorizer, X_tfidf, meta, bm25_toks, emb
        print("Cached RAG artifacts are inconsistent; rebuilding the index.")

    rows, all_tokens = [], []
    pdf_paths = sorted(pdf_dir.glob("**/*.pdf"))  # sorted: deterministic chunk order
    print(f"Indexing PDFs in {pdf_dir} — found {len(pdf_paths)} file(s).")
    for pdf in pdf_paths:
        raw = _extract_pdf_text(pdf)
        if not raw.strip():
            continue
        for i, ch in enumerate(chunk_by_sentence_windows(raw, win_size=8, overlap=2)):
            rows.append({"doc_path": str(pdf), "chunk_id": i, "text": ch})
            all_tokens.append(tokenize(ch))
    if not rows:
        meta = pd.DataFrame(columns=["doc_path", "chunk_id", "text"])
        return None, None, meta, None, None

    meta = pd.DataFrame(rows)
    texts = meta["text"].tolist()

    def _fit_tfidf(max_df):
        """Fit the word/bigram TF-IDF model on all chunk texts."""
        vec = TfidfVectorizer(
            ngram_range=(1, 2),
            min_df=1,
            max_df=max_df,
            sublinear_tf=True,
            smooth_idf=True,
            lowercase=True,
            token_pattern=r"(?u)\b\w[\w\-\./%+#]*\b",
        )
        return vec, vec.fit_transform(texts)

    try:
        vectorizer, X_tfidf = _fit_tfidf(0.95)
    except ValueError:
        # With very few chunks, max_df=0.95 is below one document (single
        # chunk) or prunes every term, and sklearn raises ValueError.
        vectorizer, X_tfidf = _fit_tfidf(1.0)

    emb = None
    if USE_DENSE:
        try:
            st_model = _safe_init_st_model(
                os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")
            )
            if st_model is not None:
                from sklearn.preprocessing import normalize as sk_normalize

                em = st_model.encode(
                    texts,
                    batch_size=64,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                )
                emb = sk_normalize(em)
                np.save(EMB_NPY_PATH, emb)
        except Exception as e:
            print("Dense embedding failed:", e)
            emb = None
    if emb is None and EMB_NPY_PATH.exists():
        # Remove embeddings left by an older build so a later cached load
        # can never pair them with these chunks.
        EMB_NPY_PATH.unlink()

    joblib.dump(vectorizer, TFIDF_VECT_PATH)
    joblib.dump(X_tfidf, TFIDF_MAT_PATH)
    if BM25Okapi is not None:
        joblib.dump(all_tokens, BM25_TOK_PATH)
    meta.to_parquet(RAG_META_PATH, index=False)
    return vectorizer, X_tfidf, meta, all_tokens, emb
# Build (or load from cache) the hybrid index once, at import time.
tfidf_vectorizer, tfidf_matrix, rag_meta, bm25_tokens, emb_matrix = build_or_load_hybrid(LOCAL_PDF_DIR)

# BM25 model over the per-chunk tokens; None when rank_bm25 or tokens are missing.
if BM25Okapi is None or bm25_tokens is None:
    bm25 = None
else:
    bm25 = BM25Okapi(bm25_tokens)

# Query-time encoder used by hybrid_search and mmr_select_sentences
# (None when dense retrieval is disabled or the model fails to load).
st_query_model = _safe_init_st_model(
    os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")
)
| # -------------------------- hybrid retrieval -------------------------- # | |
| def _extract_page(text_chunk: str) -> str: | |
| m = list(re.finditer(r"\[\[PAGE=(\d+)\]\]", text_chunk or "")) | |
| return m[-1].group(1) if m else "?" | |
def hybrid_search(
    query: str,
    k: int = 8,
    w_tfidf: float = W_TFIDF_DEFAULT,
    w_bm25: float = W_BM25_DEFAULT,
    w_emb: float = W_EMB_DEFAULT,
) -> pd.DataFrame:
    """Rank indexed chunks by a weighted blend of dense, TF-IDF and BM25 scores.

    Each signal is min-max normalised to [0, 1] over all chunks (a constant
    signal becomes all zeros). Negative weights are clipped to 0, any signal
    that is unavailable gets weight 0, and the remaining weights are
    rescaled to sum to 1 (if all are 0, every fused score is 0).

    Args:
        query: Free-text query.
        k: Maximum number of chunks to return; values <= 0 return no rows.
        w_tfidf: Weight of the TF-IDF cosine score.
        w_bm25: Weight of the BM25 score.
        w_emb: Weight of the dense-embedding cosine score.

    Returns:
        The best ``k`` rows of ``rag_meta`` (highest fused score first, ties
        kept in index order) with added columns ``score_dense``,
        ``score_tfidf``, ``score_bm25`` (normalised) and ``score`` (fused),
        re-indexed from 0. An empty DataFrame when nothing is indexed.
    """
    if rag_meta is None or rag_meta.empty:
        return pd.DataFrame()
    n_chunks = len(rag_meta)
    # Clamp k: a negative value would otherwise slice as "all but the last |k|".
    k = max(0, min(int(k), n_chunks))
    # A negative weight would invert its signal's ranking; treat it as off.
    w_tfidf, w_bm25, w_emb = (max(0.0, float(w)) for w in (w_tfidf, w_bm25, w_emb))

    # dense scores: dot product of L2-normalised query and chunk embeddings
    dense_scores = np.zeros(n_chunks)
    if USE_DENSE and st_query_model is not None and emb_matrix is not None and w_emb > 0:
        try:
            from sklearn.preprocessing import normalize as sk_normalize

            q_emb = sk_normalize(st_query_model.encode([query], convert_to_numpy=True))[0]
            dense_scores = emb_matrix @ q_emb
        except Exception as e:
            print("Dense query encoding failed:", e)
            dense_scores = np.zeros(n_chunks)
            w_emb = 0.0
    else:
        w_emb = 0.0

    # tf-idf
    if tfidf_vectorizer is not None and tfidf_matrix is not None:
        q_vec = tfidf_vectorizer.transform([query])
        tfidf_scores = (tfidf_matrix @ q_vec.T).toarray().ravel()
    else:
        tfidf_scores = np.zeros(n_chunks)
        w_tfidf = 0.0

    # bm25 (query tokenised exactly like the indexed chunks)
    if bm25 is not None:
        bm25_scores = np.asarray(bm25.get_scores(tokenize(query)), dtype=float)
    else:
        bm25_scores = np.zeros(n_chunks)
        w_bm25 = 0.0

    def _norm(x):
        """Min-max scale to [0, 1]; a (near-)constant vector maps to zeros."""
        x = np.asarray(x, dtype=float)
        lo, hi = x.min(), x.max()
        if np.allclose(hi, lo):
            return np.zeros_like(x)
        return (x - lo) / (hi - lo)

    s_dense = _norm(dense_scores)
    s_tfidf = _norm(tfidf_scores)
    s_bm25 = _norm(bm25_scores)

    total_w = (w_tfidf + w_bm25 + w_emb) or 1.0
    w_tfidf, w_bm25, w_emb = w_tfidf / total_w, w_bm25 / total_w, w_emb / total_w
    combo = w_emb * s_dense + w_tfidf * s_tfidf + w_bm25 * s_bm25

    # Stable sort: equal fused scores keep chunk order, so ties are reproducible.
    idx = np.argsort(-combo, kind="stable")[:k]
    hits = rag_meta.iloc[idx].copy()
    hits["score_dense"] = s_dense[idx]
    hits["score_tfidf"] = s_tfidf[idx]
    hits["score_bm25"] = s_bm25[idx]
    hits["score"] = combo[idx]
    return hits.reset_index(drop=True)
# --------------------- MMR sentence selection --------------------- #
def split_sentences(text: str) -> List[str]:
    """Return the ``sent_split`` sentences of *text* having 6 to 60 words inclusive."""
    out: List[str] = []
    for sent in sent_split(text):
        n_words = len(sent.split())
        if 6 <= n_words <= 60:
            out.append(sent)
    return out
def mmr_select_sentences(
    question: str,
    hits: pd.DataFrame,
    top_n: int = 4,
    pool_per_chunk: int = 6,
    lambda_div: float = 0.7,
) -> List[Dict[str, Any]]:
    """Pick up to ``top_n`` relevant yet non-redundant sentences from the hits.

    Candidate pool: the first ``pool_per_chunk`` (at least 1) sentences of
    each hit, from ``split_sentences``, tagged with the hit's file name and
    page (``_extract_page``). A sentence already in the pool is skipped, so
    overlapping chunks cannot contribute the same sentence twice.

    Relevance/similarity use L2-normalised dense embeddings when the query
    encoder is available, otherwise a TF-IDF model fitted on the pool plus
    the question. If scoring raises, every candidate gets relevance 1 and
    similarity 0. Selection is Maximal Marginal Relevance: the most relevant
    sentence first, then repeatedly the candidate maximising
    ``lambda_div * rel - (1 - lambda_div) * max_sim_to_selected``
    (``lambda_div`` clipped to [0, 1]; ties go to the later pool entry).

    Returns:
        ``{"sent", "doc", "page"}`` dicts in selection order; empty when
        ``top_n <= 0`` or no sentence qualifies.
    """
    max_pick = int(top_n)
    if max_pick <= 0:
        return []

    pool: List[Dict[str, Any]] = []
    seen = set()
    per_chunk = max(1, int(pool_per_chunk))
    for _, row in hits.iterrows():
        doc = Path(row["doc_path"]).name
        page = _extract_page(row["text"])
        for s in split_sentences(row["text"])[:per_chunk]:
            if s in seen:
                continue  # adjacent chunks overlap; keep the higher-ranked copy
            seen.add(s)
            pool.append({"sent": s, "doc": doc, "page": page})
    if not pool:
        return []

    sent_texts = [p["sent"] for p in pool]
    use_dense = USE_DENSE and st_query_model is not None
    try:
        if use_dense:
            from sklearn.preprocessing import normalize as sk_normalize

            enc = st_query_model.encode([question] + sent_texts, convert_to_numpy=True)
            q_vec = sk_normalize(enc[:1])[0]
            S = sk_normalize(enc[1:])
            rel = S @ q_vec

            def sim_fn(i, j):
                return float(S[i] @ S[j])
        else:
            from sklearn.feature_extraction.text import TfidfVectorizer

            vect = TfidfVectorizer().fit(sent_texts + [question])
            Q = vect.transform([question])
            S = vect.transform(sent_texts)
            rel = (S @ Q.T).toarray().ravel()

            def sim_fn(i, j):
                num = S[i] @ S[j].T
                return float(num.toarray()[0, 0]) if hasattr(num, "toarray") else float(num)
    except Exception:
        # Best effort: no scorer, so all candidates are equally relevant
        # and no redundancy penalty applies.
        rel = np.ones(len(sent_texts))

        def sim_fn(i, j):
            return 0.0

    lambda_div = float(np.clip(lambda_div, 0.0, 1.0))
    max_pick = min(max_pick, len(pool))
    first = int(np.argmax(rel))
    selected_idx = [first]
    remain = [i for i in range(len(pool)) if i != first]
    while len(selected_idx) < max_pick and remain:
        cand_scores: List[Tuple[float, int]] = []
        for i in remain:
            # Redundancy = similarity to the closest already-selected sentence.
            div_i = max(sim_fn(i, j) for j in selected_idx)
            score = lambda_div * float(rel[i]) - (1.0 - lambda_div) * div_i
            cand_scores.append((score, i))
        _, best_i = max(cand_scores)
        selected_idx.append(best_i)
        remain.remove(best_i)
    return [pool[i] for i in selected_idx]
def compose_extractive(selected: List[Dict[str, Any]]) -> str:
    """Join selected sentences into one paragraph with inline citations.

    Each sentence is followed by ``(<doc>, p.<page>)`` and entries are
    separated by single spaces; an empty (or None) selection gives ``""``.
    """
    cited = [
        f"{item['sent']} ({item['doc']}, p.{item['page']})"
        for item in (selected or ())
    ]
    return " ".join(cited)
# --------------------------- logging helpers --------------------------- #
# USD price per 1,000 input / output tokens, read from the environment
# (default "0"). A non-numeric value raises ValueError at import time.
OPENAI_IN_COST_PER_1K, OPENAI_OUT_COST_PER_1K = (
    float(os.getenv(var_name, "0"))
    for var_name in ("OPENAI_COST_IN_PER_1K", "OPENAI_COST_OUT_PER_1K")
)
| def _safe_write_jsonl(path: Path, record: dict): | |
| try: | |
| with open(path, "a", encoding="utf-8") as f: | |
| f.write(json.dumps(record, ensure_ascii=False) + "\n") | |
| except Exception as e: | |
| print("[Log] write failed:", e) | |
| def _calc_cost_usd(prompt_toks, completion_toks): | |
| if prompt_toks is None or completion_toks is None: | |
| return None | |
| return (prompt_toks / 1000.0) * OPENAI_IN_COST_PER_1K + ( | |
| completion_toks / 1000.0 | |
| ) * OPENAI_OUT_COST_PER_1K | |
# ------------------------ optional LLM synthesis ------------------------ #
def _extract_llm_usage(resp) -> Optional[Dict[str, Any]]:
    """Return ``{"prompt_tokens", "completion_tokens"}`` from ``resp.usage``, or None."""
    usage = getattr(resp, "usage", None)
    if not usage:
        return None

    def _first(*names):
        # usage may be an SDK object or a plain dict; take the first non-None field.
        for name in names:
            value = usage.get(name) if isinstance(usage, dict) else getattr(usage, name, None)
            if value is not None:
                return value
        return None

    # The Responses API reports input_tokens/output_tokens; Chat-Completions
    # style payloads use prompt_tokens/completion_tokens. Accept either and
    # report them under the prompt/completion names used by the run log.
    return {
        "prompt_tokens": _first("input_tokens", "prompt_tokens"),
        "completion_tokens": _first("output_tokens", "completion_tokens"),
    }


def synthesize_with_llm(
    question: str,
    sentence_lines: List[str],
    model: Optional[str] = None,
    temperature: float = 0.2,
):
    """Ask an OpenAI model to answer *question* using only the given sentences.

    Sends a system prompt restricting the answer to ``sentence_lines`` and
    asking it to keep their inline citations, via ``client.responses.create``.

    Args:
        question: The user question.
        sentence_lines: Evidence sentences, each already carrying an inline
            ``(Doc.pdf, p.X)`` citation.
        model: Model name; defaults to ``OPENAI_MODEL``.
        temperature: Sampling temperature. If the request fails with an error
            mentioning ``temperature`` (some models may not accept it), it is
            retried once without that parameter.

    Returns:
        ``(text, usage)`` where ``usage`` is ``{"prompt_tokens",
        "completion_tokens"}`` (values may be None) or None. ``text`` is None
        when the response has no output text. ``(None, None)`` when the LLM
        is unavailable or the request fails (the error is printed).
    """
    if not LLM_AVAILABLE:
        return None, None
    model = model or OPENAI_MODEL
    SYSTEM_PROMPT = (
        "You are a scientific assistant for self-sensing cementitious materials.\n"
        "Answer STRICTLY using the provided sentences.\n"
        "Do not invent facts. Keep it concise (3–6 sentences).\n"
        "Retain inline citations like (Doc.pdf, p.X) exactly as given."
    )
    user_prompt = (
        f"Question: {question}\n\n"
        "Use ONLY these sentences to answer; keep their inline citations:\n"
        + "\n".join(f"- {s}" for s in sentence_lines)
    )
    request = {
        "model": model,
        "input": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
    }
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        try:
            resp = client.responses.create(**request, temperature=temperature)
        except Exception as err:
            if "temperature" not in str(err):
                raise
            resp = client.responses.create(**request)
    except Exception as err:
        # Best effort: the caller falls back to the extractive answer.
        print("[LLM] synthesis failed:", err)
        return None, None

    usage = _extract_llm_usage(resp)
    out_text = getattr(resp, "output_text", None)
    if not out_text or not str(out_text).strip():
        # No usable text: let the caller fall back, but keep usage for cost logs.
        return None, usage
    return out_text, usage
# ------------------- main RAG reply (with config_id) ------------------- #
def rag_reply(
    question: str,
    k: int = 8,
    n_sentences: int = 4,
    include_passages: bool = False,
    use_llm: bool = False,
    model: Optional[str] = None,
    temperature: float = 0.2,
    strict_quotes_only: bool = False,
    w_tfidf: float = W_TFIDF_DEFAULT,
    w_bm25: float = W_BM25_DEFAULT,
    w_emb: float = W_EMB_DEFAULT,
    config_id: Optional[str] = None,
) -> str:
    """Answer *question* from the indexed PDFs and append a run record to LOG_PATH.

    Pipeline: ``hybrid_search`` -> ``mmr_select_sentences`` -> one of:

    * ``strict_quotes_only``: a bullet list of the selected sentences with
      citations (never calls the LLM; logged with ``use_llm=False`` and
      ``model=None``, which is what grid evaluation uses).
    * ``use_llm``: LLM synthesis over the selected sentences, falling back
      to the extractive answer when no LLM text comes back.
    * otherwise: the selected sentences joined into an extractive answer.

    Every answer carries a ``**Citations:**`` line for the top 6 hits plus a
    note when fewer than 3 distinct source files were retrieved. When no
    sentence qualifies, the first two retrieved passages are shown instead;
    otherwise ``include_passages`` appends them after a ``---`` rule.

    The log record holds the inputs (``config_id`` tags grid-eval runs),
    per-hit scores, the answer, the used sentences, latencies and, for LLM
    runs, token counts and estimated cost. For LLM runs the logged model is
    the one actually requested (``model`` or ``OPENAI_MODEL``).

    Returns:
        The Markdown answer, or ``"No indexed PDFs found."`` when retrieval
        returns nothing.
    """
    run_id = str(uuid.uuid4())
    t0_total = time.time()
    t0_retr = time.time()
    hits = hybrid_search(
        question,
        k=int(k),
        w_tfidf=float(w_tfidf),
        w_bm25=float(w_bm25),
        w_emb=float(w_emb),
    )
    latency_ms_retriever = int((time.time() - t0_retr) * 1000)

    def _write_log(final, retr_hits, used, logged_use_llm, logged_model, llm_fields=None):
        """Append one run record; every path shares the same field layout."""
        record = {
            "run_id": run_id,
            "ts": int(time.time() * 1000),
            "inputs": {
                "question": question,
                "top_k": int(k),
                "n_sentences": int(n_sentences),
                "w_tfidf": float(w_tfidf),
                "w_bm25": float(w_bm25),
                "w_emb": float(w_emb),
                "use_llm": bool(logged_use_llm),
                "model": logged_model,
                "temperature": float(temperature),
                "config_id": config_id,
            },
            "retrieval": {"hits": retr_hits, "latency_ms_retriever": latency_ms_retriever},
            "output": {
                "final_answer": final,
                "used_sentences": [
                    {"sent": s["sent"], "doc": s["doc"], "page": s["page"]}
                    for s in used
                ],
            },
            "latency_ms_total": int((time.time() - t0_total) * 1000),
        }
        # Only the main path adds "latency_ms_llm"; "openai" always comes last.
        record.update(llm_fields or {"openai": None})
        _safe_write_jsonl(LOG_PATH, record)

    if hits is None or hits.empty:
        final = "No indexed PDFs found."
        _write_log(final, [], [], use_llm, model)
        return final

    selected = mmr_select_sentences(
        question, hits, top_n=int(n_sentences), pool_per_chunk=6, lambda_div=0.7
    )
    header_cites = "; ".join(
        f"{Path(r['doc_path']).name} (p.{_extract_page(r['text'])})"
        for _, r in hits.head(6).iterrows()
    )
    srcs = {Path(r["doc_path"]).name for _, r in hits.iterrows()}
    coverage_note = (
        ""
        if len(srcs) >= 3
        else f"\n\n> Note: Only {len(srcs)} unique source(s). Add more PDFs or increase Top-K."
    )
    citations = f"\n\n**Citations:** {header_cites}{coverage_note}"
    passages = "\n\n".join(hits["text"].tolist()[:2])
    retr_list = [
        {
            "doc": Path(r["doc_path"]).name,
            "page": _extract_page(r["text"]),
            "score_tfidf": float(r.get("score_tfidf", 0.0)),
            "score_bm25": float(r.get("score_bm25", 0.0)),
            "score_dense": float(r.get("score_dense", 0.0)),
            "combo_score": float(r.get("score", 0.0)),
        }
        for _, r in hits.iterrows()
    ]
    cited_lines = [f"{s['sent']} ({s['doc']}, p.{s['page']})" for s in selected]

    # retrieval-only / strict quotations (useful for grid eval)
    if strict_quotes_only:
        if not selected:
            final = "**Quoted Passages:**\n\n---\n" + passages + citations
        else:
            final = "**Quoted Passages:**\n- " + "\n- ".join(cited_lines) + citations
            if include_passages:
                final += "\n\n---\n" + passages
        _write_log(final, retr_list, selected, False, None)
        return final

    # extractive / LLM synthesis
    extractive = compose_extractive(selected)
    llm_text, llm_usage, llm_latency_ms = None, None, None
    if use_llm and selected:
        t0_llm = time.time()
        llm_text, llm_usage = synthesize_with_llm(
            question, cited_lines, model=model, temperature=temperature
        )
        llm_latency_ms = int((time.time() - t0_llm) * 1000)

    if llm_text:
        final = f"**Answer (LLM synthesis):** {llm_text}" + citations
    elif extractive:
        final = f"**Answer:** {extractive}" + citations
    else:
        # Nothing to quote: show the raw passages (so include_passages is moot).
        final = "**Answer:** Here are relevant passages." + citations + "\n\n---\n" + passages
    if include_passages and (llm_text or extractive):
        final += "\n\n---\n" + passages

    prompt_toks = llm_usage.get("prompt_tokens") if llm_usage else None
    completion_toks = llm_usage.get("completion_tokens") if llm_usage else None
    openai_info = (
        {
            "prompt_tokens": prompt_toks,
            "completion_tokens": completion_toks,
            "cost_usd": _calc_cost_usd(prompt_toks, completion_toks),
        }
        if use_llm
        else None
    )
    _write_log(
        final,
        retr_list,
        selected,
        use_llm,
        (model or OPENAI_MODEL) if use_llm else model,
        {"latency_ms_llm": llm_latency_ms, "openai": openai_info},
    )
    return final
# --------------- automated grid evaluation over weights --------------- #
def run_weight_grid_eval(
    gold_csv: str,
    weight_grid: List[Dict[str, float]],
    k: int = 8,
    n_sentences: int = 4,
) -> None:
    """
    Automatically evaluate many (w_tfidf, w_bm25, w_emb) combinations
    on the full gold question set.
    - Reads questions from gold_csv (column 'question'); empty or
      whitespace-only cells are skipped (a count is printed).
    - For each configuration in weight_grid (missing weights default to
      0.0; 'id' defaults to a name built from the weights), calls
      rag_reply(...) with use_llm=False and strict_quotes_only=True
    - All runs are logged into rag_logs.jsonl with a 'config_id'
      and the exact weights.

    Raises:
        ValueError: if gold_csv has no 'question' column.
    """
    gold_df = pd.read_csv(gold_csv)
    if "question" not in gold_df.columns:
        raise ValueError("gold_csv must contain a 'question' column.")
    raw_questions = gold_df["question"]
    # Blank cells load as NaN; astype(str) alone would turn them into the
    # literal query "nan" and pollute the evaluation logs.
    questions = [q for q in raw_questions.dropna().astype(str) if q.strip()]
    skipped = len(raw_questions) - len(questions)
    if skipped:
        print(f"[GridEval] Skipping {skipped} blank question(s).")

    for cfg in weight_grid:
        wt = float(cfg.get("w_tfidf", 0.0))
        wb = float(cfg.get("w_bm25", 0.0))
        we = float(cfg.get("w_emb", 0.0))
        cid = cfg.get("id") or f"tfidf{wt}_bm25{wb}_emb{we}"
        print(
            f"\n[GridEval] Running config {cid} "
            f"(w_tfidf={wt}, w_bm25={wb}, w_emb={we}, k={k})"
        )
        for q in questions:
            rag_reply(
                question=q,
                k=int(k),
                n_sentences=int(n_sentences),
                include_passages=False,
                use_llm=False,
                model=None,
                temperature=0.0,
                strict_quotes_only=True,
                w_tfidf=wt,
                w_bm25=wb,
                w_emb=we,
                config_id=cid,
            )
# Import-time confirmation that every definition above loaded.
print("✅ RAG core + grid evaluation helpers loaded.", end="\n")