| | import os |
| | import asyncio |
| | import uuid |
| | from dotenv import load_dotenv |
| | from datasets import Dataset |
| | import pandas as pd |
| | from typing import Sequence, Any, List |
| |
|
| | |
| | from ragas import evaluate |
| | from ragas.metrics import ( |
| | faithfulness, |
| | answer_relevancy, |
| | context_recall, |
| | context_precision, |
| | ) |
| | from ragas.testset import TestsetGenerator |
| | |
| |
|
| | |
| | from langchain_groq import ChatGroq |
| | from langchain_community.document_loaders import PyMuPDFLoader |
| | from langchain.text_splitter import RecursiveCharacterTextSplitter |
| | from langchain_huggingface import HuggingFaceEmbeddings |
| | from langchain_community.vectorstores import FAISS |
| | from langchain.storage import InMemoryStore |
| | from langchain_community.retrievers import BM25Retriever |
| | from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever |
| | from langchain.retrievers.document_compressors.base import BaseDocumentCompressor |
| | from langchain_core.documents import Document |
| | from sentence_transformers.cross_encoder import CrossEncoder |
| | from rag_processor import create_rag_chain |
| | from langchain_community.chat_message_histories import ChatMessageHistory |
| | import fitz |
| |
|
| | |
| | load_dotenv() |
| |
|
| | |
class LocalReranker(BaseDocumentCompressor):
    """Document compressor that keeps only the best-scoring documents.

    Every (query, document text) pair is scored by ``model`` and the
    ``top_n`` highest-scoring documents are returned, best first.
    """

    # Scoring model; it is only ever called as
    # ``model.predict(pairs, show_progress_bar=False)``.
    model: Any
    # Maximum number of documents handed back by compress_documents().
    top_n: int = 3

    class Config:
        # ``model`` is an arbitrary (non-pydantic) object.
        arbitrary_types_allowed = True

    def compress_documents(self, documents: Sequence[Document], query: str, callbacks=None) -> Sequence[Document]:
        """Return the ``top_n`` documents most relevant to ``query``.

        Args:
            documents: Candidate documents to rank.
            query: Query text each document is scored against.
            callbacks: Accepted for interface compatibility; not used here.

        Returns:
            A list of at most ``top_n`` of the input documents, ordered by
            descending score. Each returned document gets its score stored
            in ``metadata['rerank_score']`` (the input objects are updated
            in place). An empty list is returned for empty input.
        """
        if not documents:
            return []

        query_doc_pairs = [[query, candidate.page_content] for candidate in documents]
        relevance = self.model.predict(query_doc_pairs, show_progress_bar=False)

        # Highest score first, then cut down to the requested number.
        ranked = sorted(zip(documents, relevance), key=lambda pair: pair[1], reverse=True)
        best = ranked[:self.top_n]

        for candidate, relevance_score in best:
            candidate.metadata['rerank_score'] = float(relevance_score)
        return [candidate for candidate, _ in best]
| |
|
| | |
def load_pdf_with_fallback(filepath):
    """Read a PDF with PyMuPDF and return one Document per non-blank page.

    Despite the name, no alternative loader is tried: any failure is
    reported on stdout and then re-raised.

    Args:
        filepath: Path of the PDF file to open.

    Returns:
        A list of ``Document`` objects. Each holds the text of one page and
        metadata with ``source`` (the file's base name) and ``page``
        (1-based page number). Pages whose text is blank are skipped.

    Raises:
        ValueError: If no page of the PDF contains any text.
        Exception: Whatever error PyMuPDF raised while opening or reading
            the file, re-raised unchanged.
    """
    try:
        pages = []
        with fitz.open(filepath) as pdf:
            for number, page in enumerate(pdf, start=1):
                page_text = page.get_text()
                if not page_text.strip():
                    continue  # nothing extractable on this page
                page_info = {"source": os.path.basename(filepath), "page": number}
                pages.append(Document(page_content=page_text, metadata=page_info))

        if not pages:
            raise ValueError("No text content found in PDF.")

        print(f"✓ Successfully loaded PDF: {filepath}")
        return pages
    except Exception as e:
        print(f"✗ PyMuPDF failed for {filepath}: {e}")
        raise
| |
|
async def main():
    """Run the end-to-end Ragas evaluation of the RAG pipeline.

    Steps:
        1. Initialise the Groq LLMs, the embedding model and the reranker.
        2. Build a parent/child hybrid (BM25 + FAISS) retriever with
           cross-encoder reranking over a single PDF and wrap it in the
           RAG chain.
        3. Generate a synthetic test set from the PDF.
        4. Answer every generated question with the RAG chain.
        5. Score the answers with Ragas.
        6. Write the scores to ``evaluation_results.csv``.

    Returns:
        None. Returns early, after printing an error, when the PDF is
        missing. Any exception raised inside the pipeline is caught,
        printed together with its traceback, and not re-raised.
    """
    print("\n" + "="*60 + "\nSTARTING RAGAS EVALUATION\n" + "="*60)

    pdf_path = "uploads/Unit_-_1_Introduction.pdf"
    if not os.path.exists(pdf_path):
        print(f"✗ Error: PDF not found at {pdf_path}")
        return

    try:
        print("\n--- 1. Initializing Models ---")
        groq_api_key = os.getenv("GROQ_API_KEY")
        if not groq_api_key or groq_api_key == "your_groq_api_key_here":
            raise ValueError("GROQ_API_KEY not found or is a placeholder.")

        generator_llm = ChatGroq(model="llama-3.1-8b-instant", api_key=groq_api_key)
        # NOTE(review): confirm this model id is still served by Groq; hosted
        # model ids get retired over time.
        critic_llm = ChatGroq(model="llama-3.1-70b-versatile", api_key=groq_api_key)
        embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        reranker_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device='cpu')
        print("✓ Models initialized.")

        print("\n--- 2. Setting up RAG Pipeline ---")
        documents = load_pdf_with_fallback(pdf_path)

        # Small child chunks are indexed for retrieval; the larger parent
        # chunk each child came from is what is finally returned as context.
        parent_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=400)
        child_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=50)
        parent_docs = parent_splitter.split_documents(documents)
        doc_ids = [str(uuid.uuid4()) for _ in parent_docs]

        child_docs = []
        for parent_id, parent_doc in zip(doc_ids, parent_docs):
            sub_docs = child_splitter.split_documents([parent_doc])
            for child in sub_docs:
                # Back-reference used by get_parents() below.
                child.metadata["doc_id"] = parent_id
            child_docs.extend(sub_docs)

        store = InMemoryStore()
        store.mset(list(zip(doc_ids, parent_docs)))
        vectorstore = FAISS.from_documents(child_docs, embedding_model)

        bm25_retriever = BM25Retriever.from_documents(child_docs, k=10)
        faiss_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, faiss_retriever], weights=[0.4, 0.6])

        reranker = LocalReranker(model=reranker_model, top_n=5)
        compression_retriever = ContextualCompressionRetriever(base_compressor=reranker, base_retriever=ensemble_retriever)

        def get_parents(docs: List[Document]) -> List[Document]:
            """Map reranked child chunks to their parent chunks, best first."""
            # dict.fromkeys de-duplicates while keeping the reranker's order.
            # A set would return the parents in arbitrary order, which makes
            # the contexts non-reproducible and skews rank-aware metrics.
            parent_ids = list(dict.fromkeys(d.metadata["doc_id"] for d in docs))
            # mget() returns None for keys it does not hold; drop those so
            # callers can rely on every item being a Document.
            return [parent for parent in store.mget(parent_ids) if parent is not None]

        final_retriever = compression_retriever | get_parents

        message_histories = {}

        def get_session_history(session_id: str):
            """Return the chat history for ``session_id``, creating it on first use."""
            if session_id not in message_histories:
                message_histories[session_id] = ChatMessageHistory()
            return message_histories[session_id]

        rag_chain = create_rag_chain(final_retriever, get_session_history)
        print("✓ RAG chain created successfully.")

        print("\n--- 3. Generating Test Questions ---")
        generator = TestsetGenerator.from_langchain(generator_llm, critic_llm, embedding_model)

        # The size is passed positionally on purpose: the keyword is
        # ``test_size`` in the ragas 0.1 API this script otherwise uses
        # (three-argument from_langchain, 'question'/'ground_truth' columns)
        # and ``testset_size`` in later releases.
        testset = generator.generate_with_langchain_docs(documents, 5)
        print("✓ Testset generated.")

        print("\n--- 4. Running RAG Chain to Generate Answers ---")
        # Convert the test set once and read both columns from the same rows.
        testset_records = testset.to_pandas().to_dict('records')
        test_questions = [record['question'] for record in testset_records]
        ground_truths = [record['ground_truth'] for record in testset_records]

        answers = []
        contexts = []

        for i, question in enumerate(test_questions):
            print(f"  Processing question {i+1}/{len(test_questions)}...")

            retrieved_docs = final_retriever.invoke(question)
            contexts.append([doc.page_content for doc in retrieved_docs])

            # A fresh session id per question keeps chat history from leaking
            # between test questions.
            config = {"configurable": {"session_id": str(uuid.uuid4())}}
            # NOTE(review): assumes the chain returns a plain string answer;
            # confirm against create_rag_chain in rag_processor.
            answer = await rag_chain.ainvoke({"question": question}, config=config)
            answers.append(answer)

        print("\n--- 5. Evaluating Results with Ragas ---")
        eval_data = {
            'question': test_questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truth': ground_truths
        }
        eval_dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            eval_dataset,
            metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
            llm=critic_llm,
            embeddings=embedding_model
        )

        print("\n" + "="*60 + "\nEVALUATION RESULTS\n" + "="*60)
        print(result)

        print("\n--- 6. Saving Results ---")
        results_df = result.to_pandas()
        results_df.to_csv("evaluation_results.csv", index=False)
        print("✓ Evaluation results saved to evaluation_results.csv")

        print("\n" + "="*60 + "\nEVALUATION COMPLETE!\n" + "="*60)

    except Exception as e:
        # Top-level boundary of the script: report the failure in full
        # instead of letting the run die with a bare stack trace.
        print(f"\n✗ An error occurred during the process: {e}")
        import traceback
        traceback.print_exc()
| |
|
# Script entry point: run the async evaluation pipeline to completion when
# this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())