| """ | |
| RAG Evaluation Script using LangSmith | |
| This script fetches and analyzes RAG runs from LangSmith to evaluate | |
| context relevance and response quality. | |
| Usage: | |
| python scripts/eval_rag.py --project pythonic-rag --limit 100 | |
| """ | |
import os
import sys
import argparse
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

import pandas as pd
from langsmith import Client
from langsmith.schemas import Run
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Check that a LangSmith API key is set (the client also accepts the
# older LANGCHAIN_API_KEY variable)
if not os.getenv("LANGSMITH_API_KEY") and not os.getenv("LANGCHAIN_API_KEY"):
    print("Error: LANGSMITH_API_KEY environment variable not set.")
    print("Please set it in your .env file or environment.")
    sys.exit(1)


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Evaluate RAG performance using LangSmith")
    parser.add_argument("--project", type=str, default="pythonic-rag",
                        help="LangSmith project name")
    parser.add_argument("--limit", type=int, default=100,
                        help="Maximum number of runs to evaluate")
    parser.add_argument("--days", type=int, default=7,
                        help="Number of days to look back")
    parser.add_argument("--output", type=str, default="rag_evaluation.csv",
                        help="Output file for evaluation results")
    return parser.parse_args()


def get_runs(client: Client, project_name: str, limit: int, days: int):
    """Fetch RAG runs from LangSmith."""
    # Calculate start time (LangSmith stores timestamps in UTC)
    start_time = datetime.now(timezone.utc) - timedelta(days=days)

    # Fetch retrieval runs; list_runs filters on tags via LangSmith's
    # filter query syntax rather than a tags= keyword
    retrieval_runs = list(client.list_runs(
        project_name=project_name,
        start_time=start_time,
        filter='has(tags, "retrieval")',
        limit=limit,
    ))
    print(f"Found {len(retrieval_runs)} retrieval runs")

    # Fetch generation runs
    generation_runs = list(client.list_runs(
        project_name=project_name,
        start_time=start_time,
        filter='has(tags, "generation")',
        limit=limit,
    ))
    print(f"Found {len(generation_runs)} generation runs")

    return retrieval_runs, generation_runs
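
# For reference, the application side is assumed to attach these tags and
# metadata when invoking its chains, roughly like this (hypothetical chain
# and variable names, standard LangChain RunnableConfig keys):
#
#   retriever_chain.invoke(
#       {"query": question},
#       config={"tags": ["retrieval"],
#               "metadata": {"user_id": uid, "session_id": sid}},
#   )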


def analyze_retrieval_run(run: Run) -> Dict[str, Any]:
    """Analyze a retrieval run."""
    run_id = run.id
    query = (run.inputs or {}).get("query", "")

    # Get retrieved documents; outputs may be None if the run errored
    documents = (run.outputs or {}).get("retrieved_documents", [])
    doc_count = len(documents)

    # Get metadata
    metadata = run.metadata or {}
    user_id = metadata.get("user_id", "unknown")
    session_id = metadata.get("session_id", "unknown")

    return {
        "run_id": run_id,
        "query": query,
        "doc_count": doc_count,
        "user_id": user_id,
        "session_id": session_id,
        "timestamp": run.start_time,
    }


def analyze_generation_run(run: Run) -> Dict[str, Any]:
    """Analyze a generation run."""
    run_id = run.id
    inputs = run.inputs or {}
    outputs = run.outputs or {}
    query = inputs.get("query", "")
    context = inputs.get("context", "")
    response = outputs.get("response", "")

    # Get metadata
    metadata = run.metadata or {}
    user_id = metadata.get("user_id", "unknown")
    session_id = metadata.get("session_id", "unknown")
    parent_run_id = metadata.get("parent_run_id")

    # Calculate simple length-based metrics
    context_length = len(context)
    response_length = len(response)

    return {
        "run_id": run_id,
        "parent_run_id": parent_run_id,
        "query": query,
        "context_length": context_length,
        "response_length": response_length,
        "user_id": user_id,
        "session_id": session_id,
        "timestamp": run.start_time,
    }
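
# Note: the matching below assumes the application stores the retrieval run's
# ID in the generation run's metadata under "parent_run_id"; if it does not,
# has_retrieval simply stays False for every row.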


def evaluate_rag_performance(retrieval_runs, generation_runs):
    """Evaluate RAG performance using the fetched runs."""
    # Analyze retrieval runs
    retrieval_data = [analyze_retrieval_run(run) for run in retrieval_runs]
    retrieval_df = pd.DataFrame(retrieval_data)

    # Analyze generation runs
    generation_data = [analyze_generation_run(run) for run in generation_runs]
    generation_df = pd.DataFrame(generation_data)

    # Match retrieval and generation runs
    if not generation_df.empty and not retrieval_df.empty and 'parent_run_id' in generation_df.columns:
        # Only membership matters here, so a set of retrieval run IDs suffices
        retrieval_ids = set(retrieval_df['run_id'])
        # Flag each generation run whose parent is a fetched retrieval run
        generation_df['has_retrieval'] = generation_df['parent_run_id'].isin(retrieval_ids)
        # Add more metrics here as needed
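        # Example of an additional metric (hypothetical): flag generation runs
        # with an empty context, which usually points at a retrieval miss.
        # generation_df['empty_context'] = generation_df['context_length'] == 0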

    # Print summary statistics
    print("\n--- RAG Performance Summary ---")
    print(f"Total Retrieval Runs: {len(retrieval_df)}")
    print(f"Total Generation Runs: {len(generation_df)}")

    if not retrieval_df.empty and 'doc_count' in retrieval_df.columns:
        print(f"Average Documents Retrieved: {retrieval_df['doc_count'].mean():.2f}")

    if not generation_df.empty:
        if 'context_length' in generation_df.columns:
            print(f"Average Context Length: {generation_df['context_length'].mean():.2f} characters")
        if 'response_length' in generation_df.columns:
            print(f"Average Response Length: {generation_df['response_length'].mean():.2f} characters")
        if 'has_retrieval' in generation_df.columns:
            matched = generation_df['has_retrieval'].sum()
            print(f"Generation Runs with Matched Retrieval: {matched} ({matched / len(generation_df) * 100:.2f}%)")

    return retrieval_df, generation_df


def main():
    """Main function."""
    args = parse_args()

    # Initialize LangSmith client
    client = Client()

    print(f"Fetching runs from project '{args.project}' (limit: {args.limit}, days: {args.days})...")
    retrieval_runs, generation_runs = get_runs(client, args.project, args.limit, args.days)

    if not retrieval_runs and not generation_runs:
        print("No runs found. Check your project name and timeframe.")
        sys.exit(0)

    # Evaluate RAG performance
    retrieval_df, generation_df = evaluate_rag_performance(retrieval_runs, generation_runs)

    # Save to CSV
    if not retrieval_df.empty:
        retrieval_df.to_csv(f"retrieval_{args.output}", index=False)
        print(f"Saved retrieval data to retrieval_{args.output}")
    if not generation_df.empty:
        generation_df.to_csv(f"generation_{args.output}", index=False)
        print(f"Saved generation data to generation_{args.output}")

    print("\nEvaluation complete!")
    print("\nNext steps:")
    print("1. Review the CSV files for detailed analysis")
    print("2. Set up LangSmith feedback datasets for systematic evaluation")
    print("3. Use the data to improve prompts and retrieval performance")


if __name__ == "__main__":
    main()