"""
qa.py - Phi-2 FAST + ReRank (stable) - prefer semantic ranking, neighbor-fill as a last resort
---------------------------------------------------------------------------------------
- Uses intfloat/e5-small-v2 for embeddings
- Uses microsoft/phi-2 for generation
- Re-ranks candidate pool from FAISS then picks top_k by true cosine similarity
- Neighbor expansion only if not enough high-sim items
- Logs chunk indices + similarity scores for debugging
"""
import os
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
print("βœ… qa.py (Phi-2 FAST + ReRank stable) loaded from:", __file__)
# ---------------------------
# Cache
# ---------------------------
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update({
"HF_HOME": CACHE_DIR,
"TRANSFORMERS_CACHE": CACHE_DIR,
"HF_DATASETS_CACHE": CACHE_DIR,
"HF_MODULES_CACHE": CACHE_DIR
})
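# Note: recent transformers releases prefer HF_HOME and may warn that TRANSFORMERS_CACHE is
# deprecated; it is kept here for compatibility with older versions.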
print(f"βœ… Using Hugging Face cache at {CACHE_DIR}")
# ---------------------------
# Embeddings
# ---------------------------
try:
_query_model = SentenceTransformer("intfloat/e5-small-v2", cache_folder=CACHE_DIR)
print("βœ… Loaded embedding model: intfloat/e5-small-v2")
except Exception as e:
print(f"⚠️ Embedding load failed ({e}), falling back to MiniLM")
_query_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", cache_folder=CACHE_DIR)
# ---------------------------
# Phi-2 model
# ---------------------------
MODEL_NAME = "microsoft/phi-2"
print(f"βœ… Loading LLM: {MODEL_NAME}")
_tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, cache_dir=CACHE_DIR)
_model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
cache_dir=CACHE_DIR,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.bfloat16,
low_cpu_mem_usage=True,
).to("cpu")
_answer_model = pipeline(
    "text-generation",
    model=_model,
    tokenizer=_tokenizer,
    device=-1,  # CPU; dtype and low_cpu_mem_usage were already applied when _model was loaded above
)
print("βœ… Phi-2 text-generation pipeline ready (optimized).")
# ---------------------------
# Prompts
# ---------------------------
STRICT_PROMPT = (
"You are an enterprise documentation assistant.\n"
"Use ONLY the CONTEXT chunks below to answer the QUESTION.\n"
"Cite the chunk number(s) you used, e.g. [Chunk 3].\n"
"If the document does not contain the answer, reply exactly:\n"
"\"I don't know based on the provided document.\"\n\n"
"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
)
REASONING_PROMPT = (
"You are an expert enterprise assistant with reasoning capacity.\n"
"Prefer the provided CONTEXT but you may cautiously infer when reasonable.\n"
"If you infer, say so and prefer facts from the document.\n"
"If the document lacks the answer, say:\n"
"\"I don't know based on the provided document.\"\n\n"
"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
)
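# Note: phi-2 is a base model (not instruction-tuned), so adherence to the exact fallback phrase is
# best-effort; generate_answer() normalizes "don't know"-style replies in strict mode as a backstop.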
# ---------------------------
# Retrieval: FAISS -> rerank -> neighbor fill (last resort)
# ---------------------------
def retrieve_chunks(query: str, index, chunks: list, top_k: int = 3, min_similarity: float = 0.55, candidate_multiplier: int = 4):
"""
Steps:
1. Encode query (E5 style).
2. Run FAISS search for k*candidate_multiplier candidates.
3. Re-embed those candidate texts and compute cosine similarity with query embedding.
4. Sort by similarity and pick top_k where similarity >= min_similarity.
5. If fewer than top_k passed threshold, fill remaining slots by:
- selecting neighboring chunks around the *highest-scoring* chunk(s),
but only if absolutely necessary (keeps noise low).
Returns: ordered list of chunks (strings)
Also prints indices + similarity scores for debugging.
"""
if not index or not chunks:
return []
try:
# 1. encode query
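        # E5-family models are trained with "query: " / "passage: " prefixes; the same convention
        # is assumed for the passages that were embedded into the FAISS index.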
q_emb = _query_model.encode(
[f"query: {query.strip()}"],
convert_to_numpy=True,
normalize_embeddings=True
)[0]
# 2. FAISS initial retrieval (get a larger candidate pool)
num_candidates = max(top_k * candidate_multiplier, top_k + 2)
distances, indices = index.search(np.array([q_emb]).astype("float32"), num_candidates)
        candidate_indices = [int(i) for i in indices[0] if 0 <= i < len(chunks)]
        # protective dedupe and clamp to the valid chunk range
        candidate_indices = list(dict.fromkeys(candidate_indices))  # preserve order, unique
        if not candidate_indices:
            return []
        # 3. Re-embed candidate texts and compute true cosine similarity
        candidate_texts = [chunks[i] for i in candidate_indices]
# Encode passages (passage prefix helps alignment)
doc_embs = _query_model.encode(
[f"passage: {c}" for c in candidate_texts],
convert_to_numpy=True,
normalize_embeddings=True
)
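        # With normalize_embeddings=True, cosine similarity reduces to a dot product, and this rerank
        # score is independent of whatever distance metric the FAISS index itself uses.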
sims = cosine_similarity([q_emb], doc_embs)[0]
# Pair up indices and sims and sort descending
paired = [(candidate_indices[i], float(sims[i])) for i in range(len(candidate_indices))]
paired_sorted = sorted(paired, key=lambda x: x[1], reverse=True)
# Debug print: top candidates and their similarity
print("πŸ”Ž Candidate ranking (index : sim):")
for idx, sim in paired_sorted[: min(len(paired_sorted), top_k * 3)]:
print(f" - Chunk {idx} : {sim:.4f}")
# 4. Pick those meeting threshold
selected = [idx for idx, sim in paired_sorted if sim >= min_similarity]
# Preserve order by similarity
selected = selected[:top_k]
# 5. If not enough, fill by neighbors around highest-scoring items
if len(selected) < top_k:
            # pick the highest-scoring indices as anchor(s) for neighbor expansion
            anchors = [idx for idx, _ in paired_sorted[:3]]  # top 3 anchors
expanded = []
for a in anchors:
# neighbors ordered by proximity: a, a-1, a+1, a-2, a+2 ...
if a not in expanded:
expanded.append(a)
offset = 1
while len(expanded) < top_k and offset < 5:
for cand in (a - offset, a + offset):
if 0 <= cand < len(chunks) and cand not in expanded:
expanded.append(cand)
if len(expanded) >= top_k:
break
offset += 1
if len(expanded) >= top_k:
break
# final selected: first maintain previously selected, then add neighbors from expanded preserving order
final_order = []
for idx, _sim in paired_sorted:
if idx in selected and idx not in final_order:
final_order.append(idx)
for idx in expanded:
if idx not in final_order:
final_order.append(idx)
selected = final_order[:top_k]
# final chunk strings (ordered by selected list)
final_chunks = [chunks[i] for i in selected]
print(f"βœ… retrieve_chunks: returning {len(final_chunks)} chunks (top_k={top_k}, min_sim={min_similarity})")
print(f" chunk indices: {selected}")
        # The selected indices are available here if the caller wants to display chunk numbers in the UI.
return final_chunks
except Exception as e:
print(f"⚠️ Retrieval error: {e}")
return []
# ---------------------------
# Answer generation
# ---------------------------
def generate_answer(query: str, retrieved_chunks: list, reasoning_mode: bool = False):
"""
- reasoning_mode=False => strict factual, deterministic
- reasoning_mode=True => allow cautious inference (slower / longer)
"""
if not retrieved_chunks:
return "Sorry, I couldn’t find relevant information in the document."
# Add chunk headings so model can cite them if needed
context_lines = []
for i, chunk in enumerate(retrieved_chunks, start=1):
        # Use [Chunk i] markers; the LLM will echo them when asked to cite sources
context_lines.append(f"[Chunk {i}]: {chunk.strip()}")
context = "\n".join(context_lines)
prompt = (REASONING_PROMPT if reasoning_mode else STRICT_PROMPT).format(
context=context, query=query
)
try:
        # Strict mode decodes greedily (deterministic); reasoning mode samples with a moderate temperature.
        if reasoning_mode:
            gen_kwargs = {"max_new_tokens": 220, "do_sample": True, "temperature": 0.6}
        else:
            # Greedy decoding ignores temperature, so it is omitted to avoid generation-config warnings;
            # early_stopping is dropped too, since it only applies to beam search.
            gen_kwargs = {"max_new_tokens": 140, "do_sample": False}
        result = _answer_model(
            prompt,
            pad_token_id=_tokenizer.eos_token_id,
            **gen_kwargs,
        )
text = result[0].get("generated_text", "").strip()
# remove the prompt echo if present
if "Answer:" in text:
out = text.split("Answer:")[-1].strip()
else:
out = text
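        # (Alternatively, passing return_full_text=False to the pipeline call skips the prompt echo entirely.)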
# Enforce exact fallback phrase if model tries to paraphrase missing-answer
if not reasoning_mode and ("i don't know" in out.lower() or "not present" in out.lower()):
return "I don't know based on the provided document."
return out
except Exception as e:
print(f"⚠️ Generation failed: {e}")
return "⚠️ Error: Could not generate an answer."
# ---------------------------
# Local debug main
# ---------------------------
if __name__ == "__main__":
from vectorstore import build_faiss_index
dummy_chunks = [
"Step 1: Open the dashboard and navigate to reports.",
"Step 2: Click 'Export' to download a CSV summary.",
"Step 3: Review the generated report in your downloads folder.",
"Appendix: Communication user creation steps are explained later in this guide."
]
embeddings = [
_query_model.encode([f"passage: {c}"], convert_to_numpy=True, normalize_embeddings=True)[0]
for c in dummy_chunks
]
index = build_faiss_index(embeddings)
query = "How do I create a communication user?"
retrieved = retrieve_chunks(query, index, dummy_chunks, top_k=3, min_similarity=0.55)
print("πŸ” Retrieved:", retrieved)
print("πŸ’¬ Answer:", generate_answer(query, retrieved, reasoning_mode=False))