# -*- coding: utf-8 -*-
"""
COMPREHENSIVE EVALUATION SYSTEM FOR AMAZON MULTIMODAL RAG
----------------------------------------------------------
Evaluates:
1. Retrieval Quality (Accuracy, Recall, MRR, MAP)
2. Response Relevance (Semantic Similarity, Product Mention, Category Match)
3. System Performance (Response Time, Success Rate)
Outputs results to Excel file with detailed metrics.
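
Usage (sketch; assumes this file is saved as ``evaluation.py`` next to the
project's rag.py, llm.py, and config.py):
    python evaluation.py --csv products.csv --db chromadb_store
    python evaluation.py --csv products.csv --retrieval-only --max-retrieval 200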
"""
import time
import logging
import argparse
import numpy as np
import pandas as pd
from typing import List, Dict, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')
# Import from your project
from rag import CLIPEmbedder, ChromaVectorStore, clean_text
from llm import generate_answer, LLMClient, OpenAILLMClient
import config
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ===============================================================
# 1. RETRIEVAL EVALUATION METRICS
# ===============================================================
class RetrievalEvaluator:
    """Evaluates retrieval quality using multiple metrics."""

    def __init__(self, persist_dir="chromadb_store"):
        self.embedder = CLIPEmbedder()
        self.vectorstore = ChromaVectorStore(persist_dir)
    def evaluate_single_query(
        self,
        query_text: str,
        ground_truth_category: str,
        top_k: int = 10
    ) -> Dict:
        """
        Evaluate a single query against ground truth.
        Returns metrics for this query.
        """
        # Get query embedding
        query_emb = self.embedder.embed_text(query_text)
        # Retrieve top-k results
        results = self.vectorstore.query(query_emb, top_k=top_k)
        retrieved_categories = [
            meta.get("category", "")
            for meta in results["metadatas"][0]
        ]
        retrieved_distances = results["distances"][0]
        # Calculate metrics
        metrics = {}
        # Accuracy@1 (is the top-1 result's category correct?)
        metrics["accuracy_at_1"] = 1.0 if retrieved_categories[0] == ground_truth_category else 0.0
        # Recall@K (does the ground-truth category appear in the top K?)
        for k in [1, 5, 10]:
            if k <= len(retrieved_categories):
                metrics[f"recall_at_{k}"] = 1.0 if ground_truth_category in retrieved_categories[:k] else 0.0
            else:
                metrics[f"recall_at_{k}"] = 0.0
        # Reciprocal rank: 1 / rank of the first relevant result (0 if absent);
        # averaged across queries this yields the MRR
        try:
            rank = retrieved_categories.index(ground_truth_category) + 1
            metrics["reciprocal_rank"] = 1.0 / rank
        except ValueError:
            metrics["reciprocal_rank"] = 0.0
        # Average Precision (AP): mean of precision@rank over the ranks where a
        # relevant result appears, normalized by the number of relevant results
        # retrieved. E.g. if the ground-truth category appears at ranks 2 and 4,
        # AP = (1/2 + 2/4) / 2 = 0.5.
        relevant_positions = [
            i + 1 for i, cat in enumerate(retrieved_categories[:top_k])
            if cat == ground_truth_category
        ]
        if relevant_positions:
            # At the n-th relevant hit (rank `pos`), precision@pos = n / pos
            precisions = [n / pos for n, pos in enumerate(relevant_positions, 1)]
            metrics["average_precision"] = sum(precisions) / len(relevant_positions)
        else:
            metrics["average_precision"] = 0.0
        # Distance diagnostics (lower is better); avg over the top-5 results
        metrics["avg_distance"] = float(np.mean(retrieved_distances[:5]))
        metrics["top1_distance"] = float(retrieved_distances[0])
        return metrics
    def evaluate_dataset(
        self,
        csv_path: str,
        max_queries: int = 100,
        top_k: int = 10
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        Evaluate retrieval on a dataset.
        Returns: (detailed_results_df, aggregate_metrics)
        """
        logger.info(f"📊 Starting retrieval evaluation on up to {max_queries} queries...")
        # Load queries from CSV
        df = pd.read_csv(csv_path, nrows=max_queries)
        all_results = []
        for idx, row in df.iterrows():
            query_id = row.get("uniq_id", f"query_{idx}")
            product_name = row.get("product_name", "")
            product_text = row.get("product_text", "")
            ground_truth_category = row.get("main_category", "")
            # Build the query text from the product fields
            query_text = clean_text(f"{product_name} {product_text}")
            try:
                # Evaluate single query
                metrics = self.evaluate_single_query(
                    query_text=query_text,
                    ground_truth_category=ground_truth_category,
                    top_k=top_k
                )
                # Store results
                result = {
                    "query_id": query_id,
                    "query_text": query_text[:100],  # Truncate for display
                    "ground_truth_category": ground_truth_category,
                    **metrics
                }
                all_results.append(result)
                if (idx + 1) % 10 == 0:
                    logger.info(f"Evaluated {idx + 1}/{len(df)} queries...")
            except Exception as e:
                logger.error(f"Error evaluating query {query_id}: {e}")
                continue
        # Create DataFrame with detailed results
        results_df = pd.DataFrame(all_results)
        if results_df.empty:
            logger.warning("No queries were evaluated successfully.")
            return results_df, {"total_queries": 0}
        # Calculate aggregate metrics
        aggregate_metrics = {
            "total_queries": len(results_df),
            "accuracy_at_1": results_df["accuracy_at_1"].mean(),
            "recall_at_1": results_df["recall_at_1"].mean(),
            "recall_at_5": results_df["recall_at_5"].mean(),
            "recall_at_10": results_df["recall_at_10"].mean(),
            "mean_reciprocal_rank": results_df["reciprocal_rank"].mean(),
            "mean_average_precision": results_df["average_precision"].mean(),
            "avg_top1_distance": results_df["top1_distance"].mean(),
            "avg_distance_top5": results_df["avg_distance"].mean(),
        }
        logger.info("✅ Retrieval evaluation complete!")
        return results_df, aggregate_metrics
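
# Example (sketch, assuming an existing ChromaDB store and a CSV with the
# columns used above: uniq_id, product_name, product_text, main_category):
#   evaluator = RetrievalEvaluator("chromadb_store")
#   details_df, summary = evaluator.evaluate_dataset("products.csv", max_queries=50)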
# ===============================================================
# 2. RESPONSE RELEVANCE EVALUATION
# ===============================================================
class ResponseEvaluator:
    """Evaluates LLM response quality and relevance."""

    def __init__(self, llm_client=None):
        self.embedder = CLIPEmbedder()
        self.llm_client = llm_client
    def evaluate_single_response(
        self,
        query: str,
        response: str,
        retrieved_products: List[Dict],
        ground_truth_category: str,
        image_path: Optional[str] = None
    ) -> Dict:
        """
        Evaluate a single LLM response.
        Note: image_path is accepted but not currently used.
        """
        metrics = {}
        # 1. Response Length
        metrics["response_length"] = len(response)
        metrics["response_word_count"] = len(response.split())
        # 2. Product Mention Rate
        # Check whether the top-3 retrieved product names appear in the response
        mentioned_products = 0
        for product in retrieved_products[:3]:
            product_name = product.get("name", "").lower()
            if product_name and product_name in response.lower():
                mentioned_products += 1
        metrics["product_mention_rate"] = mentioned_products / min(3, len(retrieved_products)) if retrieved_products else 0.0
        # 3. Category Mention
        metrics["category_mentioned"] = 1.0 if ground_truth_category.lower() in response.lower() else 0.0
        # 4. Response Quality Indicators
        # Check for hedging language (uncertainty)
        hedging_phrases = ["not sure", "don't know", "cannot", "can't tell", "unclear", "unsure"]
        metrics["has_hedging"] = 1.0 if any(phrase in response.lower() for phrase in hedging_phrases) else 0.0
        # Check for comparison language (indicates an analytical response)
        comparison_words = ["compare", "comparison", "both", "versus", "vs", "while", "whereas"]
        metrics["has_comparison"] = 1.0 if any(word in response.lower() for word in comparison_words) else 0.0
        # 5. Semantic Similarity (query-response relevance)
        try:
            query_emb = self.embedder.embed_text(query)
            response_emb = self.embedder.embed_text(response)
            # Dot product of the CLIP embeddings; this equals cosine similarity
            # when the embedder returns L2-normalized vectors
            dot_product = np.dot(query_emb, response_emb)
            metrics["semantic_similarity"] = float(dot_product)
        except Exception as e:
            logger.warning(f"Could not compute semantic similarity: {e}")
            metrics["semantic_similarity"] = 0.0
        # 6. Retrieval/ground-truth agreement: does the top retrieved
        # product's category match the ground-truth category?
        if retrieved_products:
            top_product_category = retrieved_products[0].get("category", "")
            metrics["matches_top_product_category"] = 1.0 if top_product_category == ground_truth_category else 0.0
        else:
            metrics["matches_top_product_category"] = 0.0
        return metrics
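
    # Example (sketch, with hypothetical values): scoring a canned response
    # without calling an LLM:
    #   ev = ResponseEvaluator()
    #   m = ev.evaluate_single_response(
    #       query="Tell me about this product: Acme Mug",
    #       response="The Acme Mug is a ceramic mug in the Kitchen category.",
    #       retrieved_products=[{"name": "Acme Mug", "category": "Kitchen"}],
    #       ground_truth_category="Kitchen",
    #   )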
    def evaluate_end_to_end(
        self,
        csv_path: str,
        max_queries: int = 50,
        mode: str = "zero-shot",
        persist_dir: str = "chromadb_store"
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        End-to-end evaluation: retrieval + LLM response.
        """
        logger.info(f"🚀 Starting end-to-end evaluation on up to {max_queries} queries...")
        # Load queries
        df = pd.read_csv(csv_path, nrows=max_queries)
        all_results = []
        for idx, row in df.iterrows():
            query_id = row.get("uniq_id", f"query_{idx}")
            product_name = row.get("product_name", "")
            product_text = row.get("product_text", "")
            ground_truth_category = row.get("main_category", "")
            # Create query
            query = f"Tell me about this product: {product_name}"
            try:
                # Measure response time
                start_time = time.time()
                # Generate answer
                result = generate_answer(
                    user_question=query,
                    mode=mode,
                    persist_dir=persist_dir,
                    llm_client=self.llm_client
                )
                response_time = time.time() - start_time
                response = result.get("answer", "")
                retrieved_products = result.get("products", [])
                # Evaluate response
                response_metrics = self.evaluate_single_response(
                    query=query,
                    response=response,
                    retrieved_products=retrieved_products,
                    ground_truth_category=ground_truth_category
                )
                # Store results
                result_data = {
                    "query_id": query_id,
                    "query": query[:100],
                    "response": response[:200],  # Truncated for Excel
                    "ground_truth_category": ground_truth_category,
                    "response_time_seconds": response_time,
                    "num_products_retrieved": len(retrieved_products),
                    **response_metrics
                }
                all_results.append(result_data)
                if (idx + 1) % 5 == 0:
                    logger.info(f"Evaluated {idx + 1}/{len(df)} queries... (last query: {response_time:.2f}s)")
            except Exception as e:
                logger.error(f"Error evaluating query {query_id}: {e}")
                all_results.append({
                    "query_id": query_id,
                    "query": query[:100],
                    "response": f"ERROR: {str(e)}",
                    "ground_truth_category": ground_truth_category,
                    "response_time_seconds": 0,
                    "num_products_retrieved": 0,
                })
                continue
        # Create DataFrame
        results_df = pd.DataFrame(all_results)
        # Calculate aggregate metrics (the guards handle runs where every query errored)
        aggregate_metrics = {
            "total_queries": len(results_df),
            "avg_response_time": results_df["response_time_seconds"].mean(),
            "avg_response_length": results_df["response_length"].mean() if "response_length" in results_df else 0,
            "avg_word_count": results_df["response_word_count"].mean() if "response_word_count" in results_df else 0,
            "avg_product_mention_rate": results_df["product_mention_rate"].mean() if "product_mention_rate" in results_df else 0,
            "category_mention_rate": results_df["category_mentioned"].mean() if "category_mentioned" in results_df else 0,
            "avg_semantic_similarity": results_df["semantic_similarity"].mean() if "semantic_similarity" in results_df else 0,
            "hedging_rate": results_df["has_hedging"].mean() if "has_hedging" in results_df else 0,
            "comparison_rate": results_df["has_comparison"].mean() if "has_comparison" in results_df else 0,
            "top_product_match_rate": results_df["matches_top_product_category"].mean() if "matches_top_product_category" in results_df else 0,
        }
        logger.info("✅ End-to-end evaluation complete!")
        return results_df, aggregate_metrics
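
# Example (sketch, assuming an OpenAI-backed client is configured in config.py):
#   ev = ResponseEvaluator(llm_client=OpenAILLMClient(api_key=config.OPENAI_API_KEY,
#                                                     model=config.OPENAI_MODEL))
#   details_df, summary = ev.evaluate_end_to_end("products.csv", max_queries=10)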
# ===============================================================
# 3. EXCEL EXPORT FUNCTIONALITY
# ===============================================================
def export_to_excel(
    retrieval_results: Optional[pd.DataFrame] = None,
    retrieval_metrics: Optional[Dict] = None,
    response_results: Optional[pd.DataFrame] = None,
    response_metrics: Optional[Dict] = None,
    output_path: str = "evaluation_results.xlsx"
):
    """
    Export evaluation results to an Excel file with multiple sheets.
    Requires the openpyxl package.
    """
    logger.info(f"💾 Exporting results to {output_path}...")
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        # Sheet 1: Summary
        summary_data = []
        if retrieval_metrics:
            summary_data.append({"Category": "RETRIEVAL METRICS", "Metric": "", "Value": ""})
            for key, value in retrieval_metrics.items():
                summary_data.append({
                    "Category": "Retrieval",
                    "Metric": key,
                    # Format floats only, so integer counts stay as plain integers
                    "Value": f"{value:.4f}" if isinstance(value, float) else value
                })
        if response_metrics:
            summary_data.append({"Category": "", "Metric": "", "Value": ""})
            summary_data.append({"Category": "RESPONSE METRICS", "Metric": "", "Value": ""})
            for key, value in response_metrics.items():
                summary_data.append({
                    "Category": "Response",
                    "Metric": key,
                    "Value": f"{value:.4f}" if isinstance(value, float) else value
                })
        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_df.to_excel(writer, sheet_name="Summary", index=False)
        # Sheet 2: Retrieval Details
        if retrieval_results is not None and not retrieval_results.empty:
            retrieval_results.to_excel(writer, sheet_name="Retrieval_Details", index=False)
        # Sheet 3: Response Details
        if response_results is not None and not response_results.empty:
            response_results.to_excel(writer, sheet_name="Response_Details", index=False)
        # Sheet 4: Visualizations Data (for charts in Excel)
        if retrieval_metrics:
            viz_data = {
                "Metric": [
                    "Accuracy@1",
                    "Recall@5",
                    "Recall@10",
                    "MRR",
                    "MAP"
                ],
                "Value": [
                    retrieval_metrics.get("accuracy_at_1", 0),
                    retrieval_metrics.get("recall_at_5", 0),
                    retrieval_metrics.get("recall_at_10", 0),
                    retrieval_metrics.get("mean_reciprocal_rank", 0),
                    retrieval_metrics.get("mean_average_precision", 0),
                ]
            }
            viz_df = pd.DataFrame(viz_data)
            viz_df.to_excel(writer, sheet_name="Chart_Data", index=False)
    logger.info(f"✅ Results exported to {output_path}")
    # Print summary to console
    print("\n" + "=" * 60)
    print("📊 EVALUATION SUMMARY")
    print("=" * 60)
    if retrieval_metrics:
        print("\n🔍 RETRIEVAL METRICS:")
        print(f"  • Accuracy@1: {retrieval_metrics.get('accuracy_at_1', 0):.3f}")
        print(f"  • Recall@5: {retrieval_metrics.get('recall_at_5', 0):.3f}")
        print(f"  • Recall@10: {retrieval_metrics.get('recall_at_10', 0):.3f}")
        print(f"  • MRR: {retrieval_metrics.get('mean_reciprocal_rank', 0):.3f}")
        print(f"  • MAP: {retrieval_metrics.get('mean_average_precision', 0):.3f}")
    if response_metrics:
        print("\n💬 RESPONSE METRICS:")
        print(f"  • Avg Response Time: {response_metrics.get('avg_response_time', 0):.2f}s")
        print(f"  • Avg Word Count: {response_metrics.get('avg_word_count', 0):.1f}")
        print(f"  • Product Mention Rate: {response_metrics.get('avg_product_mention_rate', 0):.3f}")
        print(f"  • Semantic Similarity: {response_metrics.get('avg_semantic_similarity', 0):.3f}")
        print(f"  • Category Match Rate: {response_metrics.get('top_product_match_rate', 0):.3f}")
    print("\n" + "=" * 60)
    print(f"📁 Full results saved to: {output_path}")
    print("=" * 60 + "\n")
# ===============================================================
# 4. MAIN EVALUATION PIPELINE
# ===============================================================
def run_full_evaluation(
    csv_path: str,
    persist_dir: str = "chromadb_store",
    max_retrieval_queries: int = 100,
    max_response_queries: int = 50,
    output_path: str = "evaluation_results.xlsx",
    mode: str = "zero-shot"
):
    """
    Run the complete evaluation pipeline:
    1. Retrieval evaluation
    2. Response evaluation
    3. Export to Excel
    """
    print("\n🚀 Starting Full Evaluation Pipeline...\n")
    # Initialize LLM client (reuse for all queries)
    logger.info("Initializing LLM client...")
    try:
        if config.USE_OPENAI:
            llm_client = OpenAILLMClient(
                api_key=config.OPENAI_API_KEY,
                model=config.OPENAI_MODEL
            )
        else:
            llm_client = LLMClient(model_name=config.LLM_MODEL)
    except Exception as e:
        logger.error(f"Failed to initialize LLM: {e}")
        llm_client = None
    # 1. Retrieval Evaluation
    retrieval_evaluator = RetrievalEvaluator(persist_dir)
    retrieval_results, retrieval_metrics = retrieval_evaluator.evaluate_dataset(
        csv_path=csv_path,
        max_queries=max_retrieval_queries
    )
    # 2. Response Evaluation (only if LLM is available)
    response_results = None
    response_metrics = None
    if llm_client:
        response_evaluator = ResponseEvaluator(llm_client=llm_client)
        response_results, response_metrics = response_evaluator.evaluate_end_to_end(
            csv_path=csv_path,
            max_queries=max_response_queries,
            mode=mode,
            persist_dir=persist_dir
        )
    else:
        logger.warning("⚠️ Skipping response evaluation (LLM not available)")
    # 3. Export to Excel
    export_to_excel(
        retrieval_results=retrieval_results,
        retrieval_metrics=retrieval_metrics,
        response_results=response_results,
        response_metrics=response_metrics,
        output_path=output_path
    )
    print("\n✅ Full evaluation pipeline complete!\n")
# ===============================================================
# 5. CLI INTERFACE
# ===============================================================
def main():
    parser = argparse.ArgumentParser(
        description="Comprehensive Evaluation for Amazon Multimodal RAG"
    )
    parser.add_argument(
        "--csv",
        type=str,
        required=True,
        help="Path to CSV dataset"
    )
    parser.add_argument(
        "--db",
        type=str,
        default="chromadb_store",
        help="Path to ChromaDB directory"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="evaluation_results.xlsx",
        help="Output Excel file path"
    )
    parser.add_argument(
        "--mode",
        type=str,
        default="zero-shot",
        choices=["zero-shot", "few-shot", "multi-shot"],
        help="Prompt mode for LLM"
    )
    parser.add_argument(
        "--max-retrieval",
        type=int,
        default=100,
        help="Max queries for retrieval evaluation"
    )
    parser.add_argument(
        "--max-response",
        type=int,
        default=50,
        help="Max queries for response evaluation (slower)"
    )
    parser.add_argument(
        "--retrieval-only",
        action="store_true",
        help="Run only retrieval evaluation (faster)"
    )
    args = parser.parse_args()
    if args.retrieval_only:
        # Quick retrieval-only evaluation
        evaluator = RetrievalEvaluator(args.db)
        results_df, metrics = evaluator.evaluate_dataset(
            csv_path=args.csv,
            max_queries=args.max_retrieval
        )
        export_to_excel(
            retrieval_results=results_df,
            retrieval_metrics=metrics,
            output_path=args.output
        )
    else:
        # Full evaluation
        run_full_evaluation(
            csv_path=args.csv,
            persist_dir=args.db,
            max_retrieval_queries=args.max_retrieval,
            max_response_queries=args.max_response,
            output_path=args.output,
            mode=args.mode
        )


if __name__ == "__main__":
    main()