import logging
import os
import sys
import time
from pathlib import Path

import pandas as pd
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add src directory to Python path
src_path = Path(__file__).parent.parent
sys.path.append(str(src_path))

# Load environment variables
root_dir = Path(__file__).parent.parent.parent
env_path = root_dir / ".env"
load_dotenv(env_path)
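
# The project-local modules (embedding, rag) resolve via the src path appended
# above, so these imports must stay below the sys.path and dotenv setup.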
from embedding.model import EmbeddingModel
from langchain.chat_models import init_chat_model
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from rag.chain import RAGChain
from rag.document_loader import GridCodeLoader
from rag.vectorstore import VectorStore
from ragas import EvaluationDataset, RunConfig, evaluate
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import AnswerRelevancy, ContextPrecision, ContextRecall, Faithfulness
from ragas.testset import TestsetGenerator

def setup_rag(embedding_model_type):
    """Initialize RAG system for evaluation with the specified embedding model."""
    logger.info("Setting up RAG system...")

    # Load documents
    data_path = root_dir / "data" / "raw" / "grid_code.pdf"
    if not data_path.exists():
        raise FileNotFoundError(f"PDF not found: {data_path}")

    loader = GridCodeLoader(str(data_path), pages=17)
    documents = loader.load_and_split()
    logger.info(f"Loaded {len(documents)} document chunks")

    # Initialize embedding model and vectorstore
    embedding_model = EmbeddingModel(model_type=embedding_model_type)
    vectorstore = VectorStore(embedding_model)
    vectorstore = vectorstore.create_vectorstore(documents)
    return RAGChain(vectorstore), documents

def generate_test_dataset(documents, n_questions=30):
    """Generate a synthetic test dataset using RAGAS, or load it if it already exists."""
    # Anchor the output path to the project root rather than the working directory
    dataset_path = root_dir / "data" / "processed" / "synthetic_test_dataset.csv"

    # Check if the dataset already exists
    if os.path.exists(dataset_path):
        logger.info(f"Loading existing synthetic test dataset from {dataset_path}...")
        return pd.read_csv(dataset_path)

    logger.info("Generating synthetic test dataset...")

    # Initialize the rate limiter
    rate_limiter = InMemoryRateLimiter(
        requests_per_second=1,  # Make a request once every 1 second
        check_every_n_seconds=0.1,  # Check every 100 ms to see if allowed to make a request
        max_bucket_size=10,  # Controls the maximum burst size
    )

    # Initialize the chat model with the rate limiter
    model = init_chat_model("gpt-4o", temperature=0, rate_limiter=rate_limiter)
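    # init_chat_model infers the provider (OpenAI for "gpt-4o") from the model name.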

    # Initialize generator models
    generator_llm = LangchainLLMWrapper(model)
    generator_embeddings = LangchainEmbeddingsWrapper(OpenAIEmbeddings())

    # Create test set generator
    generator = TestsetGenerator(
        llm=generator_llm, embedding_model=generator_embeddings
    )

    # Generate synthetic test dataset
    dataset = generator.generate_with_langchain_docs(
        documents, testset_size=n_questions
    )

    df = dataset.to_pandas()
    dataset_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure the output directory exists
    df.to_csv(dataset_path, index=False)  # Save as CSV
    logger.info(
        f"Generated synthetic dataset with {len(df)} test cases and saved to '{dataset_path}'."
    )
    return df
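
# Retry transient API failures with exponential backoff (this is what the tenacity
# imports above are for); the attempt count and wait bounds are illustrative defaults.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))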
def get_rag_response(rag_chain, question):
    """Get RAG response with retry logic."""
    return rag_chain.invoke(question)

def evaluate_rag_system(rag_chain, test_dataset):
    """Evaluate the RAG system using RAGAS metrics."""
    logger.info("Starting RAGAS evaluation...")

    # Get RAG responses for each question
    eval_data = []

    # Iterate through DataFrame rows
    for _, row in test_dataset.iterrows():
        # Add delay between requests
        time.sleep(3)  # Wait 3 seconds between requests
        response = get_rag_response(rag_chain, row["user_input"])
        eval_data.append(
            {
                "user_input": row["user_input"],
                "response": response["answer"],
                "retrieved_contexts": [doc.page_content for doc in response["context"]],
| "ground_truth": row["reference"], # Keep for faithfulness | |
| "reference": row["reference"], # Keep for context_recall | |
| } | |
| ) | |
| logger.info(f"Processed question: {row['user_input'][:50]}...") | |

    # Convert to pandas, then to an EvaluationDataset
    eval_df = pd.DataFrame(eval_data)
    logger.info("Sample evaluation data:")
    logger.info(eval_df.iloc[0].to_dict())
    eval_dataset = EvaluationDataset.from_pandas(eval_df)

    # Initialize RAGAS evaluator
    evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o"))
    custom_run_config = RunConfig(timeout=360, max_workers=32)

    # Run evaluation
    results = evaluate(
        eval_dataset,
        metrics=[
            Faithfulness(),  # Checks that the generated answer is grounded in the retrieved contexts.
            AnswerRelevancy(),  # Assesses how relevant the answer is to the user's question.
            ContextRecall(),  # Measures how much of the reference answer the retrieved contexts cover.
            ContextPrecision(),  # Measures whether relevant retrieved contexts are ranked highly.
        ],
        llm=evaluator_llm,
        run_config=custom_run_config,
    )
    return results

def run_evaluation_with_model(rag_chain, test_dataset, embedding_model_type):
    """Run evaluation with the specified embedding model type."""
    logger.info(f"Running evaluation with {embedding_model_type} embeddings...")

    # Run evaluation
    results = evaluate_rag_system(rag_chain, test_dataset)
    logger.info(f"Evaluation Results for {embedding_model_type}:")
    logger.info(results)

    # Save results to CSV, anchored to the project root
    results_path = root_dir / "data" / "processed"
    results_path.mkdir(parents=True, exist_ok=True)

    # Convert results to a DataFrame of per-sample metric scores
    results_df = results.to_pandas()
    results_df.to_csv(
        results_path / f"evaluation_results_{embedding_model_type}.csv", index=False
    )
    logger.info(
        f"Saved evaluation results to evaluation_results_{embedding_model_type}.csv"
    )

def main():
    """Main evaluation script."""
    logger.info("Starting RAG evaluation")
    try:
        # Set up the RAG system with the fine-tuned embedding model
        rag_chain_finetuned, documents = setup_rag("finetuned")

        # Generate (or load) the synthetic test dataset
        test_dataset = generate_test_dataset(documents)

        # Run evaluation with the fine-tuned embeddings
        run_evaluation_with_model(rag_chain_finetuned, test_dataset, "finetuned")

        # # Setup RAG system with the OpenAI embedding model
        # rag_chain_openai, _ = setup_rag("openai")
        # # Run evaluation with OpenAI embeddings
        # run_evaluation_with_model(rag_chain_openai, test_dataset, "openai")
    except Exception as e:
        logger.error(f"Evaluation failed: {e}")
        raise


if __name__ == "__main__":
    main()
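
# Assumed invocation (the exact path depends on the repo layout, e.g. src/evaluation/):
#   python src/evaluation/evaluate.py
# Expects OPENAI_API_KEY to be available via the .env file loaded above.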