# AsyncRAG — rag.py
import os
import torch
from transformers import pipeline
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from config import EMBEDDING_MODEL, LLM_MODEL, CHROMA_DIR, LLM_TASK
# 1. Initialize Embeddings
# Loads the sentence-embedding model named in config.EMBEDDING_MODEL;
# this downloads/loads model weights as a module-import side effect.
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
# 2. Load Vector DB
# Only open the Chroma store if the persist directory exists AND contains
# at least one entry — an empty directory would yield an unusable DB.
if os.path.exists(CHROMA_DIR) and any(os.scandir(CHROMA_DIR)):
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embeddings)
print("✅ Vector DB loaded successfully")
else:
# Sentinel checked by ask_rag_with_status before answering queries.
vectordb = None
print("⚠️ Vector DB folder missing or empty")
# 3. LLM Pipeline - Optimized for CPU stability
# Builds the text-generation pipeline once at import time (loading the
# model weights); LLM_TASK/LLM_MODEL come from config.
qa_pipeline = pipeline(
LLM_TASK,
model=LLM_MODEL,
device_map="cpu",
max_new_tokens=256, # Sufficient for detailed answers
trust_remote_code=True,  # NOTE(review): executes model repo code — only safe for trusted models
model_kwargs={"torch_dtype": torch.float32} # Safer for CPU
)
def ask_rag_with_status(question: str):
    """Answer *question* using context retrieved from the vector DB.

    Returns a tuple ``(answer_text, status)`` where ``status`` is the
    string ``"ERROR"`` / ``"TIMEOUT"`` on failure, or a list of progress
    steps on success (shape kept as-is for existing callers).
    """
    if vectordb is None:
        return "Knowledge base not ready.", "ERROR"

    # Retrieve the top-3 most similar chunks and join them into one context.
    docs = vectordb.similarity_search(question, k=3)
    context = "\n".join(d.page_content for d in docs)

    # Simple, clear prompt for Qwen; the trailing "Answer:" marker lets us
    # strip the echoed prompt from the generated text below.
    prompt = f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"
    try:
        # Greedy decoding for deterministic, fast answers.
        # FIX: temperature is deliberately NOT passed — with do_sample=False
        # it is unused, and recent transformers versions warn on (or reject)
        # temperature=0.0, which must be strictly positive when sampling.
        result = qa_pipeline(
            prompt,
            do_sample=False,  # Use greedy decoding for faster, consistent answers
            pad_token_id=qa_pipeline.tokenizer.eos_token_id,
        )
        full_output = result[0]["generated_text"]

        # The pipeline echoes the prompt by default; keep only the text
        # after the LAST "Answer:" marker (ours), in case the retrieved
        # context itself happens to contain the word.
        if "Answer:" in full_output:
            answer = full_output.split("Answer:")[-1].strip()
        else:
            answer = full_output.strip()

        if not answer:
            answer = "I found context in the documents but could not generate a coherent summary. Please rephrase."
        return answer, ["Context retrieved", "Qwen generated answer"]
    except Exception as e:
        # NOTE(review): every generation failure is surfaced as a timeout;
        # the "TIMEOUT" status string is kept for backward compatibility
        # even though the underlying error may be something else.
        print(f"❌ Generation error: {e}")
        return "The model timed out while thinking. Try a shorter question.", "TIMEOUT"