# -*- coding: utf-8 -*- """ COMPREHENSIVE EVALUATION SYSTEM FOR AMAZON MULTIMODAL RAG ---------------------------------------------------------- Evaluates: 1. Retrieval Quality (Accuracy, Recall, MRR, MAP) 2. Response Relevance (Semantic Similarity, Product Mention, Category Match) 3. System Performance (Response Time, Success Rate) Outputs results to Excel file with detailed metrics. """ import os import time import logging import argparse import numpy as np import pandas as pd from typing import List, Dict, Optional, Tuple from collections import defaultdict import warnings warnings.filterwarnings('ignore') # Import from your project from rag import CLIPEmbedder, ChromaVectorStore, clean_text from llm import generate_answer, LLMClient, OpenAILLMClient import config # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # =============================================================== # 1. RETRIEVAL EVALUATION METRICS # =============================================================== class RetrievalEvaluator: """Evaluates retrieval quality using multiple metrics.""" def __init__(self, persist_dir="chromadb_store"): self.embedder = CLIPEmbedder() self.vectorstore = ChromaVectorStore(persist_dir) def evaluate_single_query( self, query_text: str, ground_truth_category: str, top_k: int = 10 ) -> Dict: """ Evaluate a single query against ground truth. Returns metrics for this query. """ # Get query embedding query_emb = self.embedder.embed_text(query_text) # Retrieve top-k results results = self.vectorstore.query(query_emb, top_k=top_k) retrieved_categories = [ meta.get("category", "") for meta in results["metadatas"][0] ] retrieved_distances = results["distances"][0] # Calculate metrics metrics = {} # Accuracy@K (is top-1 correct?) metrics["accuracy_at_1"] = 1.0 if retrieved_categories[0] == ground_truth_category else 0.0 # Recall@K (is ground truth in top K?) for k in [1, 5, 10]: if k <= len(retrieved_categories): metrics[f"recall_at_{k}"] = 1.0 if ground_truth_category in retrieved_categories[:k] else 0.0 else: metrics[f"recall_at_{k}"] = 0.0 # Mean Reciprocal Rank (MRR) try: rank = retrieved_categories.index(ground_truth_category) + 1 metrics["reciprocal_rank"] = 1.0 / rank except ValueError: metrics["reciprocal_rank"] = 0.0 # Average Precision (AP) relevant_positions = [ i + 1 for i, cat in enumerate(retrieved_categories[:top_k]) if cat == ground_truth_category ] if relevant_positions: precisions = [pos_idx / pos for pos_idx, pos in enumerate(relevant_positions, 1)] metrics["average_precision"] = sum(precisions) / len(relevant_positions) else: metrics["average_precision"] = 0.0 # Average distance of retrieved results (lower is better) metrics["avg_distance"] = float(np.mean(retrieved_distances[:5])) metrics["top1_distance"] = float(retrieved_distances[0]) return metrics def evaluate_dataset( self, csv_path: str, max_queries: int = 100, top_k: int = 10 ) -> Tuple[pd.DataFrame, Dict]: """ Evaluate retrieval on a dataset. Returns: (detailed_results_df, aggregate_metrics) """ logger.info(f"šŸ“Š Starting retrieval evaluation on {max_queries} queries...") # Load queries from CSV df = pd.read_csv(csv_path, nrows=max_queries) all_results = [] for idx, row in df.iterrows(): query_id = row.get("uniq_id", f"query_{idx}") product_name = row.get("product_name", "") product_text = row.get("product_text", "") ground_truth_category = row.get("main_category", "") # Create query text query_text = clean_text(f"{product_name} {product_text}") try: # Evaluate single query metrics = self.evaluate_single_query( query_text=query_text, ground_truth_category=ground_truth_category, top_k=top_k ) # Store results result = { "query_id": query_id, "query_text": query_text[:100], # Truncate for display "ground_truth_category": ground_truth_category, **metrics } all_results.append(result) if (idx + 1) % 10 == 0: logger.info(f"Evaluated {idx + 1}/{len(df)} queries...") except Exception as e: logger.error(f"Error evaluating query {query_id}: {e}") continue # Create DataFrame with detailed results results_df = pd.DataFrame(all_results) # Calculate aggregate metrics aggregate_metrics = { "total_queries": len(results_df), "accuracy_at_1": results_df["accuracy_at_1"].mean(), "recall_at_1": results_df["recall_at_1"].mean(), "recall_at_5": results_df["recall_at_5"].mean(), "recall_at_10": results_df["recall_at_10"].mean(), "mean_reciprocal_rank": results_df["reciprocal_rank"].mean(), "mean_average_precision": results_df["average_precision"].mean(), "avg_top1_distance": results_df["top1_distance"].mean(), "avg_distance_top5": results_df["avg_distance"].mean(), } logger.info("āœ… Retrieval evaluation complete!") return results_df, aggregate_metrics # =============================================================== # 2. RESPONSE RELEVANCE EVALUATION # =============================================================== class ResponseEvaluator: """Evaluates LLM response quality and relevance.""" def __init__(self, llm_client=None): self.embedder = CLIPEmbedder() self.llm_client = llm_client def evaluate_single_response( self, query: str, response: str, retrieved_products: List[Dict], ground_truth_category: str, image_path: Optional[str] = None ) -> Dict: """ Evaluate a single LLM response. """ metrics = {} # 1. Response Length metrics["response_length"] = len(response) metrics["response_word_count"] = len(response.split()) # 2. Product Mention Rate # Check if product names are mentioned in response mentioned_products = 0 for product in retrieved_products[:3]: # Check top-3 products product_name = product.get("name", "").lower() if product_name and product_name in response.lower(): mentioned_products += 1 metrics["product_mention_rate"] = mentioned_products / min(3, len(retrieved_products)) if retrieved_products else 0.0 # 3. Category Mention metrics["category_mentioned"] = 1.0 if ground_truth_category.lower() in response.lower() else 0.0 # 4. Response Quality Indicators # Check for hedging language (uncertainty) hedging_phrases = ["not sure", "don't know", "cannot", "can't tell", "unclear", "unsure"] metrics["has_hedging"] = 1.0 if any(phrase in response.lower() for phrase in hedging_phrases) else 0.0 # Check for comparison (indicates analytical response) comparison_words = ["compare", "comparison", "both", "versus", "vs", "while", "whereas"] metrics["has_comparison"] = 1.0 if any(word in response.lower() for word in comparison_words) else 0.0 # 5. Semantic Similarity (query-response relevance) try: query_emb = self.embedder.embed_text(query) response_emb = self.embedder.embed_text(response) # Cosine similarity (1 - distance) dot_product = np.dot(query_emb, response_emb) metrics["semantic_similarity"] = float(dot_product) except Exception as e: logger.warning(f"Could not compute semantic similarity: {e}") metrics["semantic_similarity"] = 0.0 # 6. Relevance to Retrieved Products # Check if response aligns with top retrieved product category if retrieved_products: top_product_category = retrieved_products[0].get("category", "") metrics["matches_top_product_category"] = 1.0 if top_product_category == ground_truth_category else 0.0 else: metrics["matches_top_product_category"] = 0.0 return metrics def evaluate_end_to_end( self, csv_path: str, max_queries: int = 50, mode: str = "zero-shot", persist_dir: str = "chromadb_store" ) -> Tuple[pd.DataFrame, Dict]: """ End-to-end evaluation: retrieval + LLM response. """ logger.info(f"šŸš€ Starting end-to-end evaluation on {max_queries} queries...") # Load queries df = pd.read_csv(csv_path, nrows=max_queries) all_results = [] for idx, row in df.iterrows(): query_id = row.get("uniq_id", f"query_{idx}") product_name = row.get("product_name", "") product_text = row.get("product_text", "") ground_truth_category = row.get("main_category", "") # Create query query = f"Tell me about this product: {product_name}" try: # Measure response time start_time = time.time() # Generate answer result = generate_answer( user_question=query, mode=mode, persist_dir=persist_dir, llm_client=self.llm_client ) response_time = time.time() - start_time response = result.get("answer", "") retrieved_products = result.get("products", []) # Evaluate response response_metrics = self.evaluate_single_response( query=query, response=response, retrieved_products=retrieved_products, ground_truth_category=ground_truth_category ) # Store results result_data = { "query_id": query_id, "query": query[:100], "response": response[:200], # Truncated for Excel "ground_truth_category": ground_truth_category, "response_time_seconds": response_time, "num_products_retrieved": len(retrieved_products), **response_metrics } all_results.append(result_data) if (idx + 1) % 5 == 0: logger.info(f"Evaluated {idx + 1}/{len(df)} queries... (avg time: {response_time:.2f}s)") except Exception as e: logger.error(f"Error evaluating query {query_id}: {e}") all_results.append({ "query_id": query_id, "query": query[:100], "response": f"ERROR: {str(e)}", "ground_truth_category": ground_truth_category, "response_time_seconds": 0, "num_products_retrieved": 0, }) continue # Create DataFrame results_df = pd.DataFrame(all_results) # Calculate aggregate metrics aggregate_metrics = { "total_queries": len(results_df), "avg_response_time": results_df["response_time_seconds"].mean(), "avg_response_length": results_df["response_length"].mean() if "response_length" in results_df else 0, "avg_word_count": results_df["response_word_count"].mean() if "response_word_count" in results_df else 0, "avg_product_mention_rate": results_df["product_mention_rate"].mean() if "product_mention_rate" in results_df else 0, "category_mention_rate": results_df["category_mentioned"].mean() if "category_mentioned" in results_df else 0, "avg_semantic_similarity": results_df["semantic_similarity"].mean() if "semantic_similarity" in results_df else 0, "hedging_rate": results_df["has_hedging"].mean() if "has_hedging" in results_df else 0, "comparison_rate": results_df["has_comparison"].mean() if "has_comparison" in results_df else 0, "top_product_match_rate": results_df["matches_top_product_category"].mean() if "matches_top_product_category" in results_df else 0, } logger.info("āœ… End-to-end evaluation complete!") return results_df, aggregate_metrics # =============================================================== # 3. EXCEL EXPORT FUNCTIONALITY # =============================================================== def export_to_excel( retrieval_results: Optional[pd.DataFrame] = None, retrieval_metrics: Optional[Dict] = None, response_results: Optional[pd.DataFrame] = None, response_metrics: Optional[Dict] = None, output_path: str = "evaluation_results.xlsx" ): """ Export evaluation results to Excel file with multiple sheets. """ logger.info(f"šŸ’¾ Exporting results to {output_path}...") with pd.ExcelWriter(output_path, engine='openpyxl') as writer: # Sheet 1: Summary summary_data = [] if retrieval_metrics: summary_data.append({"Category": "RETRIEVAL METRICS", "Metric": "", "Value": ""}) for key, value in retrieval_metrics.items(): summary_data.append({ "Category": "Retrieval", "Metric": key, "Value": f"{value:.4f}" if isinstance(value, (int, float)) else value }) if response_metrics: summary_data.append({"Category": "", "Metric": "", "Value": ""}) summary_data.append({"Category": "RESPONSE METRICS", "Metric": "", "Value": ""}) for key, value in response_metrics.items(): summary_data.append({ "Category": "Response", "Metric": key, "Value": f"{value:.4f}" if isinstance(value, (int, float)) else value }) if summary_data: summary_df = pd.DataFrame(summary_data) summary_df.to_excel(writer, sheet_name="Summary", index=False) # Sheet 2: Retrieval Details if retrieval_results is not None and not retrieval_results.empty: retrieval_results.to_excel(writer, sheet_name="Retrieval_Details", index=False) # Sheet 3: Response Details if response_results is not None and not response_results.empty: response_results.to_excel(writer, sheet_name="Response_Details", index=False) # Sheet 4: Visualizations Data (for charts in Excel) if retrieval_metrics: viz_data = { "Metric": [ "Accuracy@1", "Recall@5", "Recall@10", "MRR", "MAP" ], "Value": [ retrieval_metrics.get("accuracy_at_1", 0), retrieval_metrics.get("recall_at_5", 0), retrieval_metrics.get("recall_at_10", 0), retrieval_metrics.get("mean_reciprocal_rank", 0), retrieval_metrics.get("mean_average_precision", 0), ] } viz_df = pd.DataFrame(viz_data) viz_df.to_excel(writer, sheet_name="Chart_Data", index=False) logger.info(f"āœ… Results exported to {output_path}") # Print summary to console print("\n" + "="*60) print("šŸ“Š EVALUATION SUMMARY") print("="*60) if retrieval_metrics: print("\nšŸ” RETRIEVAL METRICS:") print(f" • Accuracy@1: {retrieval_metrics.get('accuracy_at_1', 0):.3f}") print(f" • Recall@5: {retrieval_metrics.get('recall_at_5', 0):.3f}") print(f" • Recall@10: {retrieval_metrics.get('recall_at_10', 0):.3f}") print(f" • MRR: {retrieval_metrics.get('mean_reciprocal_rank', 0):.3f}") print(f" • MAP: {retrieval_metrics.get('mean_average_precision', 0):.3f}") if response_metrics: print("\nšŸ’¬ RESPONSE METRICS:") print(f" • Avg Response Time: {response_metrics.get('avg_response_time', 0):.2f}s") print(f" • Avg Word Count: {response_metrics.get('avg_word_count', 0):.1f}") print(f" • Product Mention Rate: {response_metrics.get('avg_product_mention_rate', 0):.3f}") print(f" • Semantic Similarity: {response_metrics.get('avg_semantic_similarity', 0):.3f}") print(f" • Category Match Rate: {response_metrics.get('top_product_match_rate', 0):.3f}") print("\n" + "="*60) print(f"šŸ“ Full results saved to: {output_path}") print("="*60 + "\n") # =============================================================== # 4. MAIN EVALUATION PIPELINE # =============================================================== def run_full_evaluation( csv_path: str, persist_dir: str = "chromadb_store", max_retrieval_queries: int = 100, max_response_queries: int = 50, output_path: str = "evaluation_results.xlsx", mode: str = "zero-shot" ): """ Run complete evaluation pipeline: 1. Retrieval evaluation 2. Response evaluation 3. Export to Excel """ print("\nšŸš€ Starting Full Evaluation Pipeline...\n") # Initialize LLM client (reuse for all queries) logger.info("Initializing LLM client...") try: if config.USE_OPENAI: llm_client = OpenAILLMClient( api_key=config.OPENAI_API_KEY, model=config.OPENAI_MODEL ) else: llm_client = LLMClient(model_name=config.LLM_MODEL) except Exception as e: logger.error(f"Failed to initialize LLM: {e}") llm_client = None # 1. Retrieval Evaluation retrieval_evaluator = RetrievalEvaluator(persist_dir) retrieval_results, retrieval_metrics = retrieval_evaluator.evaluate_dataset( csv_path=csv_path, max_queries=max_retrieval_queries ) # 2. Response Evaluation (only if LLM is available) response_results = None response_metrics = None if llm_client: response_evaluator = ResponseEvaluator(llm_client=llm_client) response_results, response_metrics = response_evaluator.evaluate_end_to_end( csv_path=csv_path, max_queries=max_response_queries, mode=mode, persist_dir=persist_dir ) else: logger.warning("āš ļø Skipping response evaluation (LLM not available)") # 3. Export to Excel export_to_excel( retrieval_results=retrieval_results, retrieval_metrics=retrieval_metrics, response_results=response_results, response_metrics=response_metrics, output_path=output_path ) print("\nāœ… Full evaluation pipeline complete!\n") # =============================================================== # 5. CLI INTERFACE # =============================================================== def main(): parser = argparse.ArgumentParser( description="Comprehensive Evaluation for Amazon Multimodal RAG" ) parser.add_argument( "--csv", type=str, required=True, help="Path to CSV dataset" ) parser.add_argument( "--db", type=str, default="chromadb_store", help="Path to ChromaDB directory" ) parser.add_argument( "--output", type=str, default="evaluation_results.xlsx", help="Output Excel file path" ) parser.add_argument( "--mode", type=str, default="zero-shot", choices=["zero-shot", "few-shot", "multi-shot"], help="Prompt mode for LLM" ) parser.add_argument( "--max-retrieval", type=int, default=100, help="Max queries for retrieval evaluation" ) parser.add_argument( "--max-response", type=int, default=50, help="Max queries for response evaluation (slower)" ) parser.add_argument( "--retrieval-only", action="store_true", help="Run only retrieval evaluation (faster)" ) args = parser.parse_args() if args.retrieval_only: # Quick retrieval-only evaluation evaluator = RetrievalEvaluator(args.db) results_df, metrics = evaluator.evaluate_dataset( csv_path=args.csv, max_queries=args.max_retrieval ) export_to_excel( retrieval_results=results_df, retrieval_metrics=metrics, output_path=args.output ) else: # Full evaluation run_full_evaluation( csv_path=args.csv, persist_dir=args.db, max_retrieval_queries=args.max_retrieval, max_response_queries=args.max_response, output_path=args.output, mode=args.mode ) if __name__ == "__main__": main()