Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import uuid | |
| from dotenv import load_dotenv | |
| from datasets import Dataset | |
| import pandas as pd | |
| from typing import Sequence, Any, List | |
| # Ragas and LangChain components | |
| from ragas import evaluate | |
| from ragas.metrics import ( | |
| faithfulness, | |
| answer_relevancy, | |
| context_recall, | |
| context_precision, | |
| ) | |
| from ragas.testset import TestsetGenerator | |
| # NOTE: The 'evolutions' import has been completely removed. | |
| # Your specific RAG components from app.py | |
| from langchain_groq import ChatGroq | |
| from langchain_community.document_loaders import PyMuPDFLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.storage import InMemoryStore | |
| from langchain_community.retrievers import BM25Retriever | |
| from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever | |
| from langchain.retrievers.document_compressors.base import BaseDocumentCompressor | |
| from langchain_core.documents import Document | |
| from sentence_transformers.cross_encoder import CrossEncoder | |
| from rag_processor import create_rag_chain | |
| from langchain_community.chat_message_histories import ChatMessageHistory | |
| import fitz | |
# Load environment variables (e.g. GROQ_API_KEY) from a local .env file, if present.
load_dotenv()
| # --- Re-implementing LocalReranker from app.py --- | |
class LocalReranker(BaseDocumentCompressor):
    """Rerank retrieved documents with a local cross-encoder, keeping the best top_n."""

    model: Any  # expected to expose .predict(pairs) -> scores (e.g. a CrossEncoder)
    top_n: int = 3

    class Config:
        arbitrary_types_allowed = True

    def compress_documents(self, documents: Sequence[Document], query: str, callbacks=None) -> Sequence[Document]:
        """Score every document against the query and return the top_n, best first.

        Each kept document gets its score recorded under metadata['rerank_score'].
        """
        if not documents:
            return []
        query_doc_pairs = [[query, candidate.page_content] for candidate in documents]
        relevance = self.model.predict(query_doc_pairs, show_progress_bar=False)
        ranked = sorted(zip(documents, relevance), key=lambda pair: pair[1], reverse=True)
        selected = []
        for candidate, relevance_score in ranked[: self.top_n]:
            candidate.metadata['rerank_score'] = float(relevance_score)
            selected.append(candidate)
        return selected
| # --- Helper Functions --- | |
def load_pdf_with_fallback(filepath):
    """Load a PDF with PyMuPDF, yielding one Document per page that has text.

    Raises on failure (including a PDF with no extractable text) after printing
    a diagnostic, so the caller can decide how to proceed.
    """
    try:
        with fitz.open(filepath) as pdf_doc:
            pages = [
                Document(
                    page_content=page_text,
                    metadata={"source": os.path.basename(filepath), "page": index + 1},
                )
                for index, page in enumerate(pdf_doc)
                if (page_text := page.get_text()).strip()
            ]
        if not pages:
            # Treated like any other load failure: reported below, then re-raised.
            raise ValueError("No text content found in PDF.")
        print(f"✓ Successfully loaded PDF: {filepath}")
        return pages
    except Exception as e:
        print(f"✗ PyMuPDF failed for {filepath}: {e}")
        raise
async def main():
    """Run the full Ragas evaluation pipeline end-to-end.

    Steps: initialize LLMs/embeddings/reranker, build the parent-child hybrid
    retrieval RAG chain, generate a synthetic testset from the PDF, answer each
    generated question with the chain, score the results with Ragas, and save
    them to evaluation_results.csv. Any failure is printed with a traceback.
    """
    print("\n" + "="*60 + "\nSTARTING RAGAS EVALUATION\n" + "="*60)
    pdf_path = "uploads/Unit_-_1_Introduction.pdf"
    if not os.path.exists(pdf_path):
        print(f"✗ Error: PDF not found at {pdf_path}")
        return
    try:
        # --- 1. Setup Models ---
        print("\n--- 1. Initializing Models ---")
        groq_api_key = os.getenv("GROQ_API_KEY")
        if not groq_api_key or groq_api_key == "your_groq_api_key_here":
            raise ValueError("GROQ_API_KEY not found or is a placeholder.")
        # Smaller model generates testset questions; larger model acts as critic/judge.
        generator_llm = ChatGroq(model="llama-3.1-8b-instant", api_key=groq_api_key)
        critic_llm = ChatGroq(model="llama-3.1-70b-versatile", api_key=groq_api_key)
        embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        reranker_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device='cpu')
        print("✓ Models initialized.")

        # --- 2. Setup RAG Pipeline ---
        print("\n--- 2. Setting up RAG Pipeline ---")
        documents = load_pdf_with_fallback(pdf_path)
        # Parent chunks are what we feed the LLM; child chunks are what we index/search.
        parent_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=400)
        child_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=50)
        parent_docs = parent_splitter.split_documents(documents)
        doc_ids = [str(uuid.uuid4()) for _ in parent_docs]
        child_docs = []
        for i, doc in enumerate(parent_docs):
            _id = doc_ids[i]
            sub_docs = child_splitter.split_documents([doc])
            # Tag each child with its parent's id so retrieval can map back up.
            for child in sub_docs:
                child.metadata["doc_id"] = _id
            child_docs.extend(sub_docs)
        store = InMemoryStore()
        store.mset(list(zip(doc_ids, parent_docs)))
        vectorstore = FAISS.from_documents(child_docs, embedding_model)
        # Hybrid retrieval: lexical BM25 + dense FAISS, weighted toward the dense side.
        bm25_retriever = BM25Retriever.from_documents(child_docs, k=10)
        faiss_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, faiss_retriever], weights=[0.4, 0.6])
        reranker = LocalReranker(model=reranker_model, top_n=5)
        compression_retriever = ContextualCompressionRetriever(base_compressor=reranker, base_retriever=ensemble_retriever)

        def get_parents(docs: List[Document]) -> List[Document]:
            """Swap retrieved child chunks for their (deduplicated) parent chunks."""
            parent_ids = {d.metadata["doc_id"] for d in docs}
            # InMemoryStore.mget returns None for ids it does not hold; drop those
            # so downstream .page_content access cannot blow up on a None entry.
            return [doc for doc in store.mget(list(parent_ids)) if doc is not None]

        final_retriever = compression_retriever | get_parents
        message_histories = {}

        def get_session_history(session_id: str):
            """Return (creating if needed) the chat history for a session id."""
            if session_id not in message_histories:
                message_histories[session_id] = ChatMessageHistory()
            return message_histories[session_id]

        rag_chain = create_rag_chain(final_retriever, get_session_history)
        print("✓ RAG chain created successfully.")

        # --- 3. Generate Testset ---
        print("\n--- 3. Generating Test Questions ---")
        generator = TestsetGenerator.from_langchain(generator_llm, critic_llm, embedding_model)
        # Generate a simple test set without complex distributions
        testset = generator.generate_with_langchain_docs(documents, testset_size=5)
        print("✓ Testset generated.")

        # --- 4. Run RAG Chain on Testset ---
        print("\n--- 4. Running RAG Chain to Generate Answers ---")
        # Convert to pandas once; the previous version did this twice.
        testset_records = testset.to_pandas().to_dict('records')
        test_questions = [item['question'] for item in testset_records]
        ground_truths = [item['ground_truth'] for item in testset_records]
        answers = []
        contexts = []
        for i, question in enumerate(test_questions):
            print(f"  Processing question {i+1}/{len(test_questions)}...")
            # Retrieve contexts
            retrieved_docs = final_retriever.invoke(question)
            contexts.append([doc.page_content for doc in retrieved_docs])
            # Get answer from chain; fresh session id keeps each question history-free.
            config = {"configurable": {"session_id": str(uuid.uuid4())}}
            answer = await rag_chain.ainvoke({"question": question}, config=config)
            answers.append(answer)

        # --- 5. Evaluate with Ragas ---
        print("\n--- 5. Evaluating Results with Ragas ---")
        eval_data = {
            'question': test_questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truth': ground_truths
        }
        eval_dataset = Dataset.from_dict(eval_data)
        result = evaluate(
            eval_dataset,
            metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
            llm=critic_llm,
            embeddings=embedding_model
        )
        print("\n" + "="*60 + "\nEVALUATION RESULTS\n" + "="*60)
        print(result)

        # --- 6. Save Results ---
        print("\n--- 6. Saving Results ---")
        results_df = result.to_pandas()
        results_df.to_csv("evaluation_results.csv", index=False)
        print("✓ Evaluation results saved to evaluation_results.csv")
        print("\n" + "="*60 + "\nEVALUATION COMPLETE!\n" + "="*60)
    except Exception as e:
        print(f"\n✗ An error occurred during the process: {e}")
        import traceback
        traceback.print_exc()
| if __name__ == "__main__": | |
| asyncio.run(main()) |