""" |
|
|
qa.py β Phi-2 FAST + ReRank (stable) β Prefer semantic ranking, neighbor-fill last-resort |
|
|
--------------------------------------------------------------------------------------- |
|
|
- Uses intfloat/e5-small-v2 for embeddings |
|
|
- Uses microsoft/phi-2 for generation |
|
|
- Re-ranks candidate pool from FAISS then picks top_k by true cosine similarity |
|
|
- Neighbor expansion only if not enough high-sim items |
|
|
- Logs chunk indices + similarity scores for debugging |
|
|
""" |
|
|
|
|
|
import os

import numpy as np
import torch
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

print("✅ qa.py (Phi-2 FAST + ReRank stable) loaded from:", __file__)

CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HF_MODULES_CACHE": CACHE_DIR,
})
print(f"✅ Using Hugging Face cache at {CACHE_DIR}")

try:
    _query_model = SentenceTransformer("intfloat/e5-small-v2", cache_folder=CACHE_DIR)
    print("✅ Loaded embedding model: intfloat/e5-small-v2")
except Exception as e:
    print(f"⚠️ Embedding load failed ({e}), falling back to MiniLM")
    _query_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", cache_folder=CACHE_DIR)

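# Note: the MiniLM fallback keeps the module importable, but its vectors live in a
# different embedding space than e5-small-v2, so a FAISS index built with e5 embeddings
# would need to be rebuilt before retrieval results are meaningful.
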
MODEL_NAME = "microsoft/phi-2"
print(f"✅ Loading LLM: {MODEL_NAME}")

_tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, cache_dir=CACHE_DIR)
_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    cache_dir=CACHE_DIR,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.bfloat16,
    low_cpu_mem_usage=True,
).to("cpu")

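# The pipeline is pinned to CPU (device=-1) to keep this "stable" path predictable;
# the half-precision weights chosen above roughly halve memory at the cost of CPU speed.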
_answer_model = pipeline(
    "text-generation",
    model=_model,
    tokenizer=_tokenizer,
    device=-1,
    model_kwargs={"torch_dtype": torch.bfloat16, "low_cpu_mem_usage": True},
)
print("✅ Phi-2 text-generation pipeline ready (optimized).")

STRICT_PROMPT = (
    "You are an enterprise documentation assistant.\n"
    "Use ONLY the CONTEXT chunks below to answer the QUESTION.\n"
    "Cite the chunk number(s) you used, e.g. [Chunk 3].\n"
    "If the document does not contain the answer, reply exactly:\n"
    "\"I don't know based on the provided document.\"\n\n"
    "Context:\n{context}\n\nQuestion: {query}\nAnswer:"
)

REASONING_PROMPT = (
    "You are an expert enterprise assistant with reasoning capacity.\n"
    "Prefer the provided CONTEXT but you may cautiously infer when reasonable.\n"
    "If you infer, say so and prefer facts from the document.\n"
    "If the document lacks the answer, say:\n"
    "\"I don't know based on the provided document.\"\n\n"
    "Context:\n{context}\n\nQuestion: {query}\nAnswer:"
)

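# Assumption: the FAISS index passed to retrieve_chunks() was built (e.g. via
# vectorstore.build_faiss_index, as in the demo at the bottom of this file) from
# L2-normalized "passage: ..." embeddings of `chunks`, so that index positions
# line up with chunk indices.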
def retrieve_chunks(query: str, index, chunks: list, top_k: int = 3, min_similarity: float = 0.55, candidate_multiplier: int = 4):
    """
    Steps:
      1. Encode the query (E5 style, with the "query: " prefix).
      2. Run a FAISS search for top_k * candidate_multiplier candidates.
      3. Re-embed those candidate texts and compute cosine similarity with the query embedding.
      4. Sort by similarity and pick top_k where similarity >= min_similarity.
      5. If fewer than top_k passed the threshold, fill the remaining slots by
         selecting neighboring chunks around the *highest-scoring* chunk(s),
         but only if absolutely necessary (keeps noise low).

    Returns: ordered list of chunks (strings).
    Also prints chunk indices + similarity scores for debugging.
    """
    if not index or not chunks:
        return []

    try:
        q_emb = _query_model.encode(
            [f"query: {query.strip()}"],
            convert_to_numpy=True,
            normalize_embeddings=True
        )[0]

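        # Pull more candidates than we finally need so the cosine re-ranking below
        # has headroom to discard weak FAISS hits.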
        num_candidates = max(top_k * candidate_multiplier, top_k + 2)
        distances, indices = index.search(np.array([q_emb]).astype("float32"), num_candidates)
        candidate_indices = [int(i) for i in indices[0] if i >= 0]
        candidate_indices = list(dict.fromkeys(candidate_indices))

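        # Re-embed the candidate texts (E5 "passage: " prefix) and score them against
        # the query by true cosine similarity, rather than trusting raw FAISS distances.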
        candidate_texts = [chunks[i] for i in candidate_indices]

        doc_embs = _query_model.encode(
            [f"passage: {c}" for c in candidate_texts],
            convert_to_numpy=True,
            normalize_embeddings=True
        )
        sims = cosine_similarity([q_emb], doc_embs)[0]

        paired = [(candidate_indices[i], float(sims[i])) for i in range(len(candidate_indices))]
        paired_sorted = sorted(paired, key=lambda x: x[1], reverse=True)

        print("🔍 Candidate ranking (index : sim):")
        for idx, sim in paired_sorted[: min(len(paired_sorted), top_k * 3)]:
            print(f" - Chunk {idx} : {sim:.4f}")

        selected = [idx for idx, sim in paired_sorted if sim >= min_similarity]
        selected = selected[:top_k]

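        # Last-resort neighbor fill: if the threshold left us short of top_k, take the
        # highest-scoring chunks as anchors and pull in their adjacent chunks for context.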
        if len(selected) < top_k:
            anchors = [idx for idx, _ in paired_sorted[:3]]
            expanded = []
            for a in anchors:
                if a not in expanded:
                    expanded.append(a)
                offset = 1
                while len(expanded) < top_k and offset < 5:
                    for cand in (a - offset, a + offset):
                        if 0 <= cand < len(chunks) and cand not in expanded:
                            expanded.append(cand)
                            if len(expanded) >= top_k:
                                break
                    offset += 1
                if len(expanded) >= top_k:
                    break

            final_order = []
            for idx, _sim in paired_sorted:
                if idx in selected and idx not in final_order:
                    final_order.append(idx)
            for idx in expanded:
                if idx not in final_order:
                    final_order.append(idx)
            selected = final_order[:top_k]

        final_chunks = [chunks[i] for i in selected]

        print(f"✅ retrieve_chunks: returning {len(final_chunks)} chunks (top_k={top_k}, min_sim={min_similarity})")
        print(f" chunk indices: {selected}")

        return final_chunks

    except Exception as e:
        print(f"⚠️ Retrieval error: {e}")
        return []


def generate_answer(query: str, retrieved_chunks: list, reasoning_mode: bool = False):
    """
    - reasoning_mode=False => strict factual, deterministic
    - reasoning_mode=True  => allow cautious inference (slower / longer)
    """
    if not retrieved_chunks:
        return "Sorry, I couldn't find relevant information in the document."

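    # Number each chunk as [Chunk i] so the model can cite sources the way the prompt asks.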
    context_lines = []
    for i, chunk in enumerate(retrieved_chunks, start=1):
        context_lines.append(f"[Chunk {i}]: {chunk.strip()}")
    context = "\n".join(context_lines)

    prompt = (REASONING_PROMPT if reasoning_mode else STRICT_PROMPT).format(
        context=context, query=query
    )

    try:
        if reasoning_mode:
            max_new_tokens = 220
            temp = 0.6
            do_sample = True
        else:
            max_new_tokens = 140
            temp = 0.0
            do_sample = False

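        # Strict mode decodes greedily (do_sample=False); reasoning mode samples at a
        # moderate temperature and allows a longer answer.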
        result = _answer_model(
            prompt,
            max_new_tokens=max_new_tokens,
            temperature=temp,
            do_sample=do_sample,
            early_stopping=True,
            pad_token_id=_tokenizer.eos_token_id,
        )

        text = result[0].get("generated_text", "").strip()

        if "Answer:" in text:
            out = text.split("Answer:")[-1].strip()
        else:
            out = text

        if not reasoning_mode and ("i don't know" in out.lower() or "not present" in out.lower()):
            return "I don't know based on the provided document."

        return out

    except Exception as e:
        print(f"⚠️ Generation failed: {e}")
        return "⚠️ Error: Could not generate an answer."


if __name__ == "__main__": |
|
|
from vectorstore import build_faiss_index |
|
|
|
|
|
dummy_chunks = [ |
|
|
"Step 1: Open the dashboard and navigate to reports.", |
|
|
"Step 2: Click 'Export' to download a CSV summary.", |
|
|
"Step 3: Review the generated report in your downloads folder.", |
|
|
"Appendix: Communication user creation steps are explained later in this guide." |
|
|
] |
|
|
embeddings = [ |
|
|
_query_model.encode([f"passage: {c}"], convert_to_numpy=True, normalize_embeddings=True)[0] |
|
|
for c in dummy_chunks |
|
|
] |
|
|
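    # Assumes vectorstore.build_faiss_index accepts this list of normalized passage vectors.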
    index = build_faiss_index(embeddings)

    query = "How do I create a communication user?"
    retrieved = retrieve_chunks(query, index, dummy_chunks, top_k=3, min_similarity=0.55)
    print("📄 Retrieved:", retrieved)
    print("💬 Answer:", generate_answer(query, retrieved, reasoning_mode=False))