"""
COMPREHENSIVE EVALUATION SYSTEM FOR AMAZON MULTIMODAL RAG
----------------------------------------------------------

Evaluates:
1. Retrieval Quality (Accuracy@1, Recall@k, MRR, MAP)
2. Response Relevance (Semantic Similarity, Product Mention, Category Match)
3. System Performance (Response Time, Success Rate)

Outputs per-query results and aggregate metrics to an Excel file.
"""
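
# Example usage (illustrative; assumes this module is saved as evaluate.py and that the
# ChromaDB store was already built by the indexing step -- adjust paths as needed):
#
#   python evaluate.py --csv data/amazon_products.csv --output evaluation_results.xlsx
#   python evaluate.py --csv data/amazon_products.csv --retrieval-only --max-retrieval 200
#
# The CSV is expected to provide `uniq_id`, `product_name`, `product_text`, and
# `main_category` columns, matching the fields read below.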

import os
import time
import logging
import argparse
import warnings
from collections import defaultdict
from typing import List, Dict, Optional, Tuple

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')

from rag import CLIPEmbedder, ChromaVectorStore, clean_text
from llm import generate_answer, LLMClient, OpenAILLMClient
import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class RetrievalEvaluator:
    """Evaluates retrieval quality using multiple metrics."""

    def __init__(self, persist_dir="chromadb_store"):
        self.embedder = CLIPEmbedder()
        self.vectorstore = ChromaVectorStore(persist_dir)

    def evaluate_single_query(
        self,
        query_text: str,
        ground_truth_category: str,
        top_k: int = 10
    ) -> Dict:
        """
        Evaluate a single query against its ground-truth category.
        Returns a dict of metrics for this query.
        """
        # Embed the query and retrieve the top_k nearest products.
        query_emb = self.embedder.embed_text(query_text)
        results = self.vectorstore.query(query_emb, top_k=top_k)

        retrieved_categories = [
            meta.get("category", "")
            for meta in results["metadatas"][0]
        ]
        retrieved_distances = results["distances"][0]

        metrics = {}

        # Accuracy@1: is the top result from the correct category?
        metrics["accuracy_at_1"] = 1.0 if retrieved_categories[0] == ground_truth_category else 0.0

        # Recall@k: does the correct category appear anywhere in the top k?
        for k in [1, 5, 10]:
            metrics[f"recall_at_{k}"] = 1.0 if ground_truth_category in retrieved_categories[:k] else 0.0

        # Reciprocal rank: 1 / rank of the first correct result (0 if absent).
        try:
            rank = retrieved_categories.index(ground_truth_category) + 1
            metrics["reciprocal_rank"] = 1.0 / rank
        except ValueError:
            metrics["reciprocal_rank"] = 0.0

        # Average precision: mean of precision@position over the relevant positions,
        # normalized by the number of relevant results retrieved within top_k.
        relevant_positions = [
            i + 1 for i, cat in enumerate(retrieved_categories[:top_k])
            if cat == ground_truth_category
        ]
        if relevant_positions:
            precisions = [num_seen / pos for num_seen, pos in enumerate(relevant_positions, 1)]
            metrics["average_precision"] = sum(precisions) / len(relevant_positions)
        else:
            metrics["average_precision"] = 0.0

        # Embedding-distance statistics (lower means closer).
        metrics["avg_distance"] = float(np.mean(retrieved_distances[:5]))
        metrics["top1_distance"] = float(retrieved_distances[0])

        return metrics
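
    # Worked example (illustrative numbers): if the ground-truth category is "Toys" and
    # the retrieved categories are ["Games", "Toys", "Toys", "Books"] (only these four
    # results returned), then:
    #   accuracy_at_1     = 0.0              (top result is "Games")
    #   recall_at_1/5/10  = 0.0 / 1.0 / 1.0
    #   reciprocal_rank   = 1/2 = 0.5        (first "Toys" at rank 2)
    #   average_precision = (1/2 + 2/3) / 2 ≈ 0.583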

    def evaluate_dataset(
        self,
        csv_path: str,
        max_queries: int = 100,
        top_k: int = 10
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        Evaluate retrieval on a dataset.
        Returns: (detailed_results_df, aggregate_metrics)
        """
        logger.info(f"📊 Starting retrieval evaluation on {max_queries} queries...")

        df = pd.read_csv(csv_path, nrows=max_queries)

        all_results = []

        for idx, row in df.iterrows():
            query_id = row.get("uniq_id", f"query_{idx}")
            product_name = row.get("product_name", "")
            product_text = row.get("product_text", "")
            ground_truth_category = row.get("main_category", "")

            # Build the query from the product's name and description text.
            query_text = clean_text(f"{product_name} {product_text}")

            try:
                metrics = self.evaluate_single_query(
                    query_text=query_text,
                    ground_truth_category=ground_truth_category,
                    top_k=top_k
                )

                result = {
                    "query_id": query_id,
                    "query_text": query_text[:100],
                    "ground_truth_category": ground_truth_category,
                    **metrics
                }
                all_results.append(result)

                if (idx + 1) % 10 == 0:
                    logger.info(f"Evaluated {idx + 1}/{len(df)} queries...")

            except Exception as e:
                logger.error(f"Error evaluating query {query_id}: {e}")
                continue

        results_df = pd.DataFrame(all_results)

        # Aggregate per-query metrics into dataset-level scores.
        aggregate_metrics = {
            "total_queries": len(results_df),
            "accuracy_at_1": results_df["accuracy_at_1"].mean(),
            "recall_at_1": results_df["recall_at_1"].mean(),
            "recall_at_5": results_df["recall_at_5"].mean(),
            "recall_at_10": results_df["recall_at_10"].mean(),
            "mean_reciprocal_rank": results_df["reciprocal_rank"].mean(),
            "mean_average_precision": results_df["average_precision"].mean(),
            "avg_top1_distance": results_df["top1_distance"].mean(),
            "avg_distance_top5": results_df["avg_distance"].mean(),
        }

        logger.info("✅ Retrieval evaluation complete!")

        return results_df, aggregate_metrics
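
# Programmatic usage of RetrievalEvaluator (a minimal sketch; the CSV path is
# illustrative and must match the column layout described in the module docstring):
#
#   evaluator = RetrievalEvaluator(persist_dir="chromadb_store")
#   per_query_df, aggregates = evaluator.evaluate_dataset(
#       "data/amazon_products.csv", max_queries=100, top_k=10
#   )
#   print(aggregates["mean_reciprocal_rank"], aggregates["mean_average_precision"])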


class ResponseEvaluator:
    """Evaluates LLM response quality and relevance."""

    def __init__(self, llm_client=None):
        self.embedder = CLIPEmbedder()
        self.llm_client = llm_client

    def evaluate_single_response(
        self,
        query: str,
        response: str,
        retrieved_products: List[Dict],
        ground_truth_category: str,
        image_path: Optional[str] = None  # accepted for API symmetry; not used in the checks below
    ) -> Dict:
        """
        Evaluate a single LLM response.
        """
        metrics = {}

        # Basic length statistics.
        metrics["response_length"] = len(response)
        metrics["response_word_count"] = len(response.split())

        # Product mention rate: how many of the top-3 retrieved products are named
        # verbatim in the response?
        mentioned_products = 0
        for product in retrieved_products[:3]:
            product_name = product.get("name", "").lower()
            if product_name and product_name in response.lower():
                mentioned_products += 1

        metrics["product_mention_rate"] = mentioned_products / min(3, len(retrieved_products)) if retrieved_products else 0.0

        # Does the response mention the ground-truth category?
        metrics["category_mentioned"] = 1.0 if ground_truth_category.lower() in response.lower() else 0.0

        # Hedging: does the response express uncertainty?
        hedging_phrases = ["not sure", "don't know", "cannot", "can't tell", "unclear", "unsure"]
        metrics["has_hedging"] = 1.0 if any(phrase in response.lower() for phrase in hedging_phrases) else 0.0

        # Comparison: does the response compare products?
        comparison_words = ["compare", "comparison", "both", "versus", "vs", "while", "whereas"]
        metrics["has_comparison"] = 1.0 if any(word in response.lower() for word in comparison_words) else 0.0

        # Semantic similarity between query and response embeddings.
        # The CLIP embeddings from CLIPEmbedder are assumed to be L2-normalized,
        # in which case the dot product equals the cosine similarity.
        try:
            query_emb = self.embedder.embed_text(query)
            response_emb = self.embedder.embed_text(response)
            dot_product = np.dot(query_emb, response_emb)
            metrics["semantic_similarity"] = float(dot_product)
        except Exception as e:
            logger.warning(f"Could not compute semantic similarity: {e}")
            metrics["semantic_similarity"] = 0.0

        # Does the top retrieved product share the ground-truth category?
        if retrieved_products:
            top_product_category = retrieved_products[0].get("category", "")
            metrics["matches_top_product_category"] = 1.0 if top_product_category == ground_truth_category else 0.0
        else:
            metrics["matches_top_product_category"] = 0.0

        return metrics
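
    # Illustrative check (hypothetical inputs): for the response "The UNO Card Game is a
    # classic Toys & Games pick", retrieved_products = [{"name": "UNO Card Game",
    # "category": "Toys & Games"}], and ground truth "Toys & Games", this would yield
    # product_mention_rate = 1.0, category_mentioned = 1.0, has_hedging = 0.0, and
    # matches_top_product_category = 1.0.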

    def evaluate_end_to_end(
        self,
        csv_path: str,
        max_queries: int = 50,
        mode: str = "zero-shot",
        persist_dir: str = "chromadb_store"
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        End-to-end evaluation: retrieval + LLM response.
        """
        logger.info(f"🚀 Starting end-to-end evaluation on {max_queries} queries...")

        df = pd.read_csv(csv_path, nrows=max_queries)

        all_results = []

        for idx, row in df.iterrows():
            query_id = row.get("uniq_id", f"query_{idx}")
            product_name = row.get("product_name", "")
            product_text = row.get("product_text", "")
            ground_truth_category = row.get("main_category", "")

            query = f"Tell me about this product: {product_name}"

            try:
                start_time = time.time()

                # Run the full RAG pipeline: retrieve products, then generate an answer.
                result = generate_answer(
                    user_question=query,
                    mode=mode,
                    persist_dir=persist_dir,
                    llm_client=self.llm_client
                )

                response_time = time.time() - start_time

                response = result.get("answer", "")
                retrieved_products = result.get("products", [])

                # Score the generated answer against the retrieved products.
                response_metrics = self.evaluate_single_response(
                    query=query,
                    response=response,
                    retrieved_products=retrieved_products,
                    ground_truth_category=ground_truth_category
                )

                result_data = {
                    "query_id": query_id,
                    "query": query[:100],
                    "response": response[:200],
                    "ground_truth_category": ground_truth_category,
                    "response_time_seconds": response_time,
                    "num_products_retrieved": len(retrieved_products),
                    **response_metrics
                }

                all_results.append(result_data)

                if (idx + 1) % 5 == 0:
                    logger.info(f"Evaluated {idx + 1}/{len(df)} queries... (last query: {response_time:.2f}s)")

            except Exception as e:
                logger.error(f"Error evaluating query {query_id}: {e}")
                all_results.append({
                    "query_id": query_id,
                    "query": query[:100],
                    "response": f"ERROR: {str(e)}",
                    "ground_truth_category": ground_truth_category,
                    "response_time_seconds": 0,
                    "num_products_retrieved": 0,
                })
                continue

        results_df = pd.DataFrame(all_results)

        # Aggregate per-query metrics; guard against columns missing when every query errored.
        aggregate_metrics = {
            "total_queries": len(results_df),
            "avg_response_time": results_df["response_time_seconds"].mean(),
            "avg_response_length": results_df["response_length"].mean() if "response_length" in results_df else 0,
            "avg_word_count": results_df["response_word_count"].mean() if "response_word_count" in results_df else 0,
            "avg_product_mention_rate": results_df["product_mention_rate"].mean() if "product_mention_rate" in results_df else 0,
            "category_mention_rate": results_df["category_mentioned"].mean() if "category_mentioned" in results_df else 0,
            "avg_semantic_similarity": results_df["semantic_similarity"].mean() if "semantic_similarity" in results_df else 0,
            "hedging_rate": results_df["has_hedging"].mean() if "has_hedging" in results_df else 0,
            "comparison_rate": results_df["has_comparison"].mean() if "has_comparison" in results_df else 0,
            "top_product_match_rate": results_df["matches_top_product_category"].mean() if "matches_top_product_category" in results_df else 0,
        }

        logger.info("✅ End-to-end evaluation complete!")

        return results_df, aggregate_metrics
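
# Running the end-to-end evaluation on its own (a sketch; uses the same local-model
# client that run_full_evaluation() falls back to when OpenAI is not configured):
#
#   client = LLMClient(model_name=config.LLM_MODEL)
#   evaluator = ResponseEvaluator(llm_client=client)
#   details_df, summary = evaluator.evaluate_end_to_end(
#       "data/amazon_products.csv", max_queries=20, mode="few-shot"
#   )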


def export_to_excel(
    retrieval_results: Optional[pd.DataFrame] = None,
    retrieval_metrics: Optional[Dict] = None,
    response_results: Optional[pd.DataFrame] = None,
    response_metrics: Optional[Dict] = None,
    output_path: str = "evaluation_results.xlsx"
):
    """
    Export evaluation results to an Excel file with multiple sheets.
    """
    logger.info(f"💾 Exporting results to {output_path}...")

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:

        # Sheet 1: summary of aggregate metrics.
        summary_data = []

        if retrieval_metrics:
            summary_data.append({"Category": "RETRIEVAL METRICS", "Metric": "", "Value": ""})
            for key, value in retrieval_metrics.items():
                summary_data.append({
                    "Category": "Retrieval",
                    "Metric": key,
                    "Value": f"{value:.4f}" if isinstance(value, (int, float)) else value
                })

        if response_metrics:
            summary_data.append({"Category": "", "Metric": "", "Value": ""})
            summary_data.append({"Category": "RESPONSE METRICS", "Metric": "", "Value": ""})
            for key, value in response_metrics.items():
                summary_data.append({
                    "Category": "Response",
                    "Metric": key,
                    "Value": f"{value:.4f}" if isinstance(value, (int, float)) else value
                })

        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_df.to_excel(writer, sheet_name="Summary", index=False)

        # Sheet 2: per-query retrieval details.
        if retrieval_results is not None and not retrieval_results.empty:
            retrieval_results.to_excel(writer, sheet_name="Retrieval_Details", index=False)

        # Sheet 3: per-query response details.
        if response_results is not None and not response_results.empty:
            response_results.to_excel(writer, sheet_name="Response_Details", index=False)

        # Sheet 4: key retrieval metrics laid out for charting.
        if retrieval_metrics:
            viz_data = {
                "Metric": [
                    "Accuracy@1",
                    "Recall@5",
                    "Recall@10",
                    "MRR",
                    "MAP"
                ],
                "Value": [
                    retrieval_metrics.get("accuracy_at_1", 0),
                    retrieval_metrics.get("recall_at_5", 0),
                    retrieval_metrics.get("recall_at_10", 0),
                    retrieval_metrics.get("mean_reciprocal_rank", 0),
                    retrieval_metrics.get("mean_average_precision", 0),
                ]
            }
            viz_df = pd.DataFrame(viz_data)
            viz_df.to_excel(writer, sheet_name="Chart_Data", index=False)

    logger.info(f"✅ Results exported to {output_path}")

    # Console summary.
    print("\n" + "="*60)
    print("📊 EVALUATION SUMMARY")
    print("="*60)

    if retrieval_metrics:
        print("\n🔍 RETRIEVAL METRICS:")
        print(f"  • Accuracy@1: {retrieval_metrics.get('accuracy_at_1', 0):.3f}")
        print(f"  • Recall@5: {retrieval_metrics.get('recall_at_5', 0):.3f}")
        print(f"  • Recall@10: {retrieval_metrics.get('recall_at_10', 0):.3f}")
        print(f"  • MRR: {retrieval_metrics.get('mean_reciprocal_rank', 0):.3f}")
        print(f"  • MAP: {retrieval_metrics.get('mean_average_precision', 0):.3f}")

    if response_metrics:
        print("\n💬 RESPONSE METRICS:")
        print(f"  • Avg Response Time: {response_metrics.get('avg_response_time', 0):.2f}s")
        print(f"  • Avg Word Count: {response_metrics.get('avg_word_count', 0):.1f}")
        print(f"  • Product Mention Rate: {response_metrics.get('avg_product_mention_rate', 0):.3f}")
        print(f"  • Semantic Similarity: {response_metrics.get('avg_semantic_similarity', 0):.3f}")
        print(f"  • Category Match Rate: {response_metrics.get('top_product_match_rate', 0):.3f}")

    print("\n" + "="*60)
    print(f"📁 Full results saved to: {output_path}")
    print("="*60 + "\n")
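
# Example (illustrative; requires the openpyxl package): export retrieval-only results
# without the response sheets.
#
#   export_to_excel(
#       retrieval_results=per_query_df,
#       retrieval_metrics=aggregates,
#       output_path="retrieval_only.xlsx",
#   )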


def run_full_evaluation(
    csv_path: str,
    persist_dir: str = "chromadb_store",
    max_retrieval_queries: int = 100,
    max_response_queries: int = 50,
    output_path: str = "evaluation_results.xlsx",
    mode: str = "zero-shot"
):
    """
    Run the complete evaluation pipeline:
    1. Retrieval evaluation
    2. Response evaluation
    3. Export to Excel
    """
    print("\n🚀 Starting Full Evaluation Pipeline...\n")

    # 1. Initialize the LLM client (OpenAI or local model, depending on config).
    logger.info("Initializing LLM client...")
    try:
        if config.USE_OPENAI:
            llm_client = OpenAILLMClient(
                api_key=config.OPENAI_API_KEY,
                model=config.OPENAI_MODEL
            )
        else:
            llm_client = LLMClient(model_name=config.LLM_MODEL)
    except Exception as e:
        logger.error(f"Failed to initialize LLM: {e}")
        llm_client = None

    # 2. Retrieval evaluation.
    retrieval_evaluator = RetrievalEvaluator(persist_dir)
    retrieval_results, retrieval_metrics = retrieval_evaluator.evaluate_dataset(
        csv_path=csv_path,
        max_queries=max_retrieval_queries
    )

    # 3. Response evaluation (skipped if no LLM client is available).
    response_results = None
    response_metrics = None

    if llm_client:
        response_evaluator = ResponseEvaluator(llm_client=llm_client)
        response_results, response_metrics = response_evaluator.evaluate_end_to_end(
            csv_path=csv_path,
            max_queries=max_response_queries,
            mode=mode,
            persist_dir=persist_dir
        )
    else:
        logger.warning("⚠️ Skipping response evaluation (LLM not available)")

    # 4. Export everything to Excel.
    export_to_excel(
        retrieval_results=retrieval_results,
        retrieval_metrics=retrieval_metrics,
        response_results=response_results,
        response_metrics=response_metrics,
        output_path=output_path
    )

    print("\n✅ Full evaluation pipeline complete!\n")
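
# Programmatic entry point (a sketch; same assumptions as the CLI in main() below):
#
#   run_full_evaluation(
#       csv_path="data/amazon_products.csv",
#       persist_dir="chromadb_store",
#       max_retrieval_queries=100,
#       max_response_queries=25,
#       output_path="evaluation_results.xlsx",
#       mode="zero-shot",
#   )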


def main():
    parser = argparse.ArgumentParser(
        description="Comprehensive Evaluation for Amazon Multimodal RAG"
    )

    parser.add_argument(
        "--csv",
        type=str,
        required=True,
        help="Path to CSV dataset"
    )

    parser.add_argument(
        "--db",
        type=str,
        default="chromadb_store",
        help="Path to ChromaDB directory"
    )

    parser.add_argument(
        "--output",
        type=str,
        default="evaluation_results.xlsx",
        help="Output Excel file path"
    )

    parser.add_argument(
        "--mode",
        type=str,
        default="zero-shot",
        choices=["zero-shot", "few-shot", "multi-shot"],
        help="Prompt mode for LLM"
    )

    parser.add_argument(
        "--max-retrieval",
        type=int,
        default=100,
        help="Max queries for retrieval evaluation"
    )

    parser.add_argument(
        "--max-response",
        type=int,
        default=50,
        help="Max queries for response evaluation (slower)"
    )

    parser.add_argument(
        "--retrieval-only",
        action="store_true",
        help="Run only retrieval evaluation (faster)"
    )

    args = parser.parse_args()

    if args.retrieval_only:
        # Fast path: retrieval metrics only, no LLM calls.
        evaluator = RetrievalEvaluator(args.db)
        results_df, metrics = evaluator.evaluate_dataset(
            csv_path=args.csv,
            max_queries=args.max_retrieval
        )
        export_to_excel(
            retrieval_results=results_df,
            retrieval_metrics=metrics,
            output_path=args.output
        )
    else:
        # Full pipeline: retrieval + response evaluation + Excel export.
        run_full_evaluation(
            csv_path=args.csv,
            persist_dir=args.db,
            max_retrieval_queries=args.max_retrieval,
            max_response_queries=args.max_response,
            output_path=args.output,
            mode=args.mode
        )


if __name__ == "__main__":
    main()