# adaptive_rag/retrieval_evaluation.py
"""
检索效果评估模块
提供多种评估指标和方法,用于评估RAG系统中检索结果的质量
"""
import time
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
try:
    from langchain_core.documents import Document
except ImportError:
    # Older LangChain versions expose Document under langchain.schema
    from langchain.schema import Document
from sklearn.metrics import ndcg_score
from sentence_transformers import SentenceTransformer, util
import matplotlib.pyplot as plt
import pandas as pd
import torch
@dataclass
class RetrievalResult:
"""检索结果数据类"""
query: str
retrieved_docs: List[Document]
relevant_docs: List[Document] # 真实相关的文档
retrieval_time: float
scores: Optional[List[float]] = None # 检索分数
@dataclass
class EvaluationMetrics:
"""评估指标数据类"""
precision_at_k: Dict[int, float]
recall_at_k: Dict[int, float]
f1_at_k: Dict[int, float]
map_score: float # 平均精度均值
mrr: float # 平均倒数排名
ndcg_at_k: Dict[int, float]
coverage: float # 覆盖率
diversity: float # 多样性
novelty: float # 新颖性
latency: float # 平均检索延迟
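# Metric definitions used below (binary relevance per query, then averaged):
#   Precision@K = |relevant ∩ top-K| / K
#   Recall@K    = |relevant ∩ top-K| / |relevant|
#   F1@K        = harmonic mean of Precision@K and Recall@K
#   AP          = (1 / |relevant|) * sum_i Precision@i * rel(i);  MAP = mean AP
#   RR          = 1 / rank of the first relevant document;        MRR = mean RR
#   DCG@K       = sum_i rel(i) / log2(i + 1);  NDCG@K = DCG@K / IDCG@K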
class RetrievalEvaluator:
"""检索效果评估器"""
def __init__(self, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
"""
初始化评估器
Args:
embedding_model: 用于计算语义相似度的嵌入模型
"""
self.embedding_model = SentenceTransformer(embedding_model)
def evaluate_retrieval(self, results: List[RetrievalResult], k_values: List[int] = [1, 3, 5, 10]) -> EvaluationMetrics:
"""
评估检索结果
Args:
results: 检索结果列表
k_values: 要计算的k值列表
Returns:
评估指标
"""
precision_at_k = {}
recall_at_k = {}
f1_at_k = {}
ndcg_at_k = {}
total_precision = {k: 0 for k in k_values}
total_recall = {k: 0 for k in k_values}
total_f1 = {k: 0 for k in k_values}
total_ndcg = {k: 0 for k in k_values}
all_precisions = []
all_reciprocal_ranks = []
all_latencies = []
for result in results:
query = result.query
retrieved_docs = result.retrieved_docs
relevant_docs = result.relevant_docs
retrieval_time = result.retrieval_time
all_latencies.append(retrieval_time)
            # Build the set of relevant document IDs. The first 50 characters
            # of page_content serve as a surrogate ID; real systems should use
            # a stable document ID from metadata instead.
            relevant_ids = set()
            for doc in relevant_docs:
                doc_id = doc.page_content[:50]
                relevant_ids.add(doc_id)
            # Compute the @K metrics for every requested cutoff
            for k in k_values:
                retrieved_k = retrieved_docs[:k]
                retrieved_k_ids = set()
                for doc in retrieved_k:
                    doc_id = doc.page_content[:50]
                    retrieved_k_ids.add(doc_id)
                # Size of the overlap between retrieved and relevant sets
                intersection = len(relevant_ids.intersection(retrieved_k_ids))
                # Precision@K
                precision_k = intersection / k if k > 0 else 0
                total_precision[k] += precision_k
                # Recall@K
                recall_k = intersection / len(relevant_ids) if len(relevant_ids) > 0 else 0
                total_recall[k] += recall_k
                # F1@K
                if precision_k + recall_k > 0:
                    f1_k = 2 * (precision_k * recall_k) / (precision_k + recall_k)
                else:
                    f1_k = 0
                total_f1[k] += f1_k
                # NDCG@K with binary relevance; the stored retrieval scores are
                # not needed, since gains come from whether each retrieved
                # position is relevant versus the ideal (sorted) ordering
                relevance_scores = []
                for doc in retrieved_k:
                    doc_id = doc.page_content[:50]
                    relevance = 1 if doc_id in relevant_ids else 0
                    relevance_scores.append(relevance)
                # Ideal ordering: all relevant documents ranked first
                ideal_relevance = sorted(relevance_scores, reverse=True)
                if len(relevance_scores) > 1 and sum(ideal_relevance) > 0:
                    try:
                        # sklearn's ndcg_score expects y_true = true gains and
                        # y_score = ranking scores; descending position scores
                        # reproduce the retrieved order
                        position_scores = list(range(len(relevance_scores), 0, -1))
                        ndcg_k = ndcg_score([relevance_scores], [position_scores], k=k)
                        total_ndcg[k] += ndcg_k
                    except ValueError:
                        # Fall back to a direct DCG/IDCG computation
                        dcg = 0
                        idcg = 0
                        for i, rel in enumerate(relevance_scores):
                            dcg += rel / np.log2(i + 2) if rel > 0 else 0
                        for i, rel in enumerate(ideal_relevance):
                            idcg += rel / np.log2(i + 2) if rel > 0 else 0
                        ndcg_k = dcg / idcg if idcg > 0 else 0
                        total_ndcg[k] += ndcg_k
                else:
                    # Degenerate cases (a single result, or no relevant
                    # document in the top K) are scored 1.0 by convention here
                    total_ndcg[k] += 1.0
            # Average precision (AP): mean of Precision@i at each relevant hit,
            # normalized by the total number of relevant documents
            precisions = []
            for i, doc in enumerate(retrieved_docs):
                doc_id = doc.page_content[:50]
                if doc_id in relevant_ids:
                    precision_at_i = len(relevant_ids.intersection(set(
                        d.page_content[:50] for d in retrieved_docs[:i+1]
                    ))) / (i + 1)
                    precisions.append(precision_at_i)
            ap = sum(precisions) / len(relevant_ids) if precisions else 0
            all_precisions.append(ap)
            # Reciprocal rank (RR): 1 / rank of the first relevant document
            for i, doc in enumerate(retrieved_docs):
                doc_id = doc.page_content[:50]
                if doc_id in relevant_ids:
                    rr = 1 / (i + 1)
                    all_reciprocal_ranks.append(rr)
                    break
            else:
                all_reciprocal_ranks.append(0)
        # Average the accumulated metrics over all queries
num_results = len(results)
for k in k_values:
precision_at_k[k] = total_precision[k] / num_results
recall_at_k[k] = total_recall[k] / num_results
f1_at_k[k] = total_f1[k] / num_results
ndcg_at_k[k] = total_ndcg[k] / num_results
map_score = sum(all_precisions) / num_results if all_precisions else 0
mrr = sum(all_reciprocal_ranks) / num_results if all_reciprocal_ranks else 0
latency = sum(all_latencies) / num_results if all_latencies else 0
        # Corpus-level coverage, diversity, and novelty
coverage = self._calculate_coverage(results)
diversity = self._calculate_diversity(results)
novelty = self._calculate_novelty(results)
return EvaluationMetrics(
precision_at_k=precision_at_k,
recall_at_k=recall_at_k,
f1_at_k=f1_at_k,
map_score=map_score,
mrr=mrr,
ndcg_at_k=ndcg_at_k,
coverage=coverage,
diversity=diversity,
novelty=novelty,
latency=latency
)
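    # Example (hedged, names are illustrative): evaluating one hand-built result
    #   docs = [Document(page_content="alpha"), Document(page_content="beta")]
    #   r = RetrievalResult(query="q", retrieved_docs=docs,
    #                       relevant_docs=docs[:1], retrieval_time=0.01)
    #   m = RetrievalEvaluator().evaluate_retrieval([r], k_values=[1, 2])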
    def _calculate_coverage(self, results: List[RetrievalResult]) -> float:
        """Coverage: fraction of unique relevant documents retrieved at least once."""
        all_retrieved = set()
        all_relevant = set()
        for result in results:
            for doc in result.retrieved_docs:
                doc_id = doc.page_content[:50]
                all_retrieved.add(doc_id)
            for doc in result.relevant_docs:
                doc_id = doc.page_content[:50]
                all_relevant.add(doc_id)
        # Only relevant documents that were actually retrieved count toward
        # coverage; otherwise the ratio could exceed 1
        covered = all_retrieved.intersection(all_relevant)
        coverage = len(covered) / len(all_relevant) if all_relevant else 0
        return coverage
    def _calculate_diversity(self, results: List[RetrievalResult]) -> float:
        """Diversity: average pairwise semantic dissimilarity within each result list."""
        all_diversities = []
        for result in results:
            if len(result.retrieved_docs) < 2:
                continue
            # Embed the retrieved documents
            doc_texts = [doc.page_content for doc in result.retrieved_docs]
            embeddings = self.embedding_model.encode(doc_texts, convert_to_tensor=True)
            # Pairwise cosine similarities between all documents
            cos_sim = util.pytorch_cos_sim(embeddings, embeddings)
            # Keep the upper triangle (each pair once, excluding the diagonal)
            upper_triangle_indices = torch.triu_indices(len(cos_sim), len(cos_sim), offset=1)
            similarities = cos_sim[upper_triangle_indices[0], upper_triangle_indices[1]]
            # Diversity = 1 - mean similarity: values near 1 indicate varied
            # results, values near 0 indicate near-duplicates
            diversity = 1 - similarities.mean().item()
            all_diversities.append(diversity)
        return sum(all_diversities) / len(all_diversities) if all_diversities else 0
def _calculate_novelty(self, results: List[RetrievalResult]) -> float:
"""计算新颖性 - 检索结果中不重复内容的比例"""
total_docs = 0
unique_docs = set()
for result in results:
for doc in result.retrieved_docs:
total_docs += 1
doc_id = doc.page_content[:50]
unique_docs.add(doc_id)
novelty = len(unique_docs) / total_docs if total_docs > 0 else 0
return novelty
def compare_retrievers(self, retriever_results: Dict[str, List[RetrievalResult]],
k_values: List[int] = [1, 3, 5, 10]) -> Dict[str, EvaluationMetrics]:
"""
比较多个检索器的性能
Args:
retriever_results: 检索器名称到检索结果的映射
k_values: 要计算的k值列表
Returns:
检索器名称到评估指标的映射
"""
metrics = {}
for name, results in retriever_results.items():
print(f"评估检索器: {name}")
metrics[name] = self.evaluate_retrieval(results, k_values)
return metrics
def generate_report(self, metrics: Dict[str, EvaluationMetrics],
save_path: Optional[str] = None) -> str:
"""
生成评估报告
Args:
metrics: 检索器名称到评估指标的映射
save_path: 报告保存路径
Returns:
报告文本
"""
        report = []
        report.append("# Retrieval Evaluation Report\n")
        # Build the comparison table
        df_data = []
        for name, metric in metrics.items():
            row = {"Retriever": name}
row.update({
f"Precision@{k}": f"{metric.precision_at_k[k]:.4f}"
for k in sorted(metric.precision_at_k.keys())
})
row.update({
f"Recall@{k}": f"{metric.recall_at_k[k]:.4f}"
for k in sorted(metric.recall_at_k.keys())
})
row.update({
f"F1@{k}": f"{metric.f1_at_k[k]:.4f}"
for k in sorted(metric.f1_at_k.keys())
})
row.update({
f"NDCG@{k}": f"{metric.ndcg_at_k[k]:.4f}"
for k in sorted(metric.ndcg_at_k.keys())
})
            row.update({
                "MAP": f"{metric.map_score:.4f}",
                "MRR": f"{metric.mrr:.4f}",
                "Coverage": f"{metric.coverage:.4f}",
                "Diversity": f"{metric.diversity:.4f}",
                "Novelty": f"{metric.novelty:.4f}",
                "Latency (ms)": f"{metric.latency*1000:.2f}"
            })
            df_data.append(row)
        df = pd.DataFrame(df_data)
        report.append("## Metric Comparison\n")
        report.append(df.to_string(index=False))
        report.append("\n\n")
        # Explain each metric
        report.append("## Metric Definitions\n")
        report.append("- **Precision@K**: fraction of the top K results that are relevant\n")
        report.append("- **Recall@K**: fraction of all relevant documents found in the top K results\n")
        report.append("- **F1@K**: harmonic mean of Precision@K and Recall@K\n")
        report.append("- **NDCG@K**: normalized discounted cumulative gain, which rewards ranking relevant documents higher\n")
        report.append("- **MAP**: mean average precision across all queries\n")
        report.append("- **MRR**: mean reciprocal rank of the first relevant document\n")
        report.append("- **Coverage**: fraction of unique relevant documents retrieved at least once\n")
        report.append("- **Diversity**: average semantic dissimilarity among retrieved documents\n")
        report.append("- **Novelty**: fraction of non-duplicate documents across all results\n")
        report.append("- **Latency**: average retrieval time\n")
        # Identify the best retriever per headline metric
        report.append("## Best Retriever per Metric\n")
        # precision_at_k and friends are dicts keyed by K, so they need
        # dedicated accessors; map_score and mrr are plain attributes
        metric_getters = {
            "Precision@5": lambda m: m.precision_at_k.get(5, 0),
            "Recall@5": lambda m: m.recall_at_k.get(5, 0),
            "F1@5": lambda m: m.f1_at_k.get(5, 0),
            "NDCG@5": lambda m: m.ndcg_at_k.get(5, 0),
            "MAP": lambda m: m.map_score,
            "MRR": lambda m: m.mrr,
        }
        for metric_name, getter in metric_getters.items():
            best_name = max(metrics.keys(), key=lambda x: getter(metrics[x]))
            report.append(f"- **{metric_name}**: {best_name}\n")
report_text = "".join(report)
        # Optionally persist the report
        if save_path:
            with open(save_path, "w", encoding="utf-8") as f:
                f.write(report_text)
            print(f"Report saved to: {save_path}")
return report_text
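    # Typical reporting flow (retriever names and paths are illustrative):
    #   metrics = evaluator.compare_retrievers({"bm25": bm25_results, "dense": dense_results})
    #   evaluator.generate_report(metrics, save_path="report.md")
    #   evaluator.plot_metrics_comparison(metrics, save_path="comparison.png")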
def plot_metrics_comparison(self, metrics: Dict[str, EvaluationMetrics],
save_path: Optional[str] = None):
"""
绘制指标比较图
Args:
metrics: 检索器名称到评估指标的映射
save_path: 图表保存路径
"""
        # Prepare the data
        retriever_names = list(metrics.keys())
        # Lay out a 2x3 grid of subplots
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle("Retriever Performance Comparison", fontsize=16)
        # Precision@K
        ax = axes[0, 0]
        k_values = sorted(list(metrics[retriever_names[0]].precision_at_k.keys()))
for name in retriever_names:
precision_values = [metrics[name].precision_at_k[k] for k in k_values]
ax.plot(k_values, precision_values, marker='o', label=name)
ax.set_title("Precision@K")
ax.set_xlabel("K")
ax.set_ylabel("Precision")
ax.legend()
ax.grid(True)
# Recall@K
ax = axes[0, 1]
for name in retriever_names:
recall_values = [metrics[name].recall_at_k[k] for k in k_values]
ax.plot(k_values, recall_values, marker='o', label=name)
ax.set_title("Recall@K")
ax.set_xlabel("K")
ax.set_ylabel("Recall")
ax.legend()
ax.grid(True)
# F1@K
ax = axes[0, 2]
for name in retriever_names:
f1_values = [metrics[name].f1_at_k[k] for k in k_values]
ax.plot(k_values, f1_values, marker='o', label=name)
ax.set_title("F1@K")
ax.set_xlabel("K")
ax.set_ylabel("F1")
ax.legend()
ax.grid(True)
# NDCG@K
ax = axes[1, 0]
for name in retriever_names:
ndcg_values = [metrics[name].ndcg_at_k[k] for k in k_values]
ax.plot(k_values, ndcg_values, marker='o', label=name)
ax.set_title("NDCG@K")
ax.set_xlabel("K")
ax.set_ylabel("NDCG")
ax.legend()
ax.grid(True)
        # MAP and MRR
ax = axes[1, 1]
map_values = [metrics[name].map_score for name in retriever_names]
mrr_values = [metrics[name].mrr for name in retriever_names]
x = np.arange(len(retriever_names))
width = 0.35
ax.bar(x - width/2, map_values, width, label='MAP')
ax.bar(x + width/2, mrr_values, width, label='MRR')
ax.set_title("MAP和MRR")
ax.set_xticks(x)
ax.set_xticklabels(retriever_names)
ax.legend()
ax.grid(True)
        # Corpus-level metrics (coverage, diversity, novelty)
ax = axes[1, 2]
other_metrics = ['coverage', 'diversity', 'novelty']
metric_values = {metric: [] for metric in other_metrics}
for name in retriever_names:
for metric in other_metrics:
metric_values[metric].append(getattr(metrics[name], metric))
x = np.arange(len(retriever_names))
width = 0.25
for i, metric in enumerate(other_metrics):
ax.bar(x + i*width, metric_values[metric], width, label=metric)
ax.set_title("其他指标")
ax.set_xticks(x + width)
ax.set_xticklabels(retriever_names)
ax.legend()
ax.grid(True)
plt.tight_layout()
        # Optionally save the figure
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Figure saved to: {save_path}")
plt.show()
class RetrievalTestSet:
"""检索测试集"""
def __init__(self, queries_file: str, documents_file: str, qrels_file: str):
"""
初始化测试集
Args:
queries_file: 查询文件路径,每行一个查询
documents_file: 文档文件路径,每行一个文档
qrels_file: 相关性标注文件路径,格式为: query_id,doc_id,relevance
"""
self.queries = self._load_queries(queries_file)
self.documents = self._load_documents(documents_file)
self.qrels = self._load_qrels(qrels_file)
def _load_queries(self, file_path: str) -> Dict[str, str]:
"""加载查询"""
queries = {}
with open(file_path, 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
queries[str(i)] = line.strip()
return queries
def _load_documents(self, file_path: str) -> Dict[str, Document]:
"""加载文档"""
documents = {}
with open(file_path, 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
doc = Document(page_content=line.strip(), metadata={"doc_id": str(i)})
documents[str(i)] = doc
return documents
def _load_qrels(self, file_path: str) -> Dict[str, Dict[str, int]]:
"""加载相关性标注"""
qrels = {}
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split(',')
if len(parts) >= 3:
query_id, doc_id, relevance = parts[0], parts[1], int(parts[2])
if query_id not in qrels:
qrels[query_id] = {}
qrels[query_id][doc_id] = relevance
return qrels
def get_retrieval_results(self, retriever, top_k: int = 10) -> List[RetrievalResult]:
"""
使用指定检索器获取检索结果
Args:
retriever: 检索器,需要有一个retrieve(query, top_k)方法
top_k: 返回的文档数量
Returns:
检索结果列表
"""
results = []
for query_id, query_text in self.queries.items():
start_time = time.time()
retrieved_docs = retriever.retrieve(query_text, top_k)
retrieval_time = time.time() - start_time
            # Collect the ground-truth relevant documents for this query
relevant_docs = []
if query_id in self.qrels:
for doc_id, relevance in self.qrels[query_id].items():
if relevance > 0 and doc_id in self.documents:
relevant_docs.append(self.documents[doc_id])
result = RetrievalResult(
query=query_text,
retrieved_docs=retrieved_docs,
relevant_docs=relevant_docs,
retrieval_time=retrieval_time
)
results.append(result)
return results
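# A minimal adapter sketch for the retriever interface expected above. This is
# an assumption: `vectorstore` stands for any LangChain-style vector store that
# exposes similarity_search(query, k); adapt it to your own stack.
#
#   class VectorStoreRetriever:
#       def __init__(self, vectorstore):
#           self.vectorstore = vectorstore
#
#       def retrieve(self, query, top_k=10):
#           return self.vectorstore.similarity_search(query, k=top_k)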
def create_sample_test_set():
"""创建示例测试集"""
# 创建示例查询
queries = [
"什么是机器学习?",
"深度学习和机器学习的区别是什么?",
"如何评估机器学习模型的性能?",
"自然语言处理有哪些应用?",
"计算机视觉的基本任务是什么?"
]
    # Sample documents
    documents = [
        "Machine learning is a branch of artificial intelligence that enables computers to learn and improve without being explicitly programmed.",
        "Deep learning is a subset of machine learning that uses multi-layer neural networks to mimic how the human brain works.",
        "Common metrics for evaluating machine learning models include accuracy, precision, recall, and the F1 score.",
        "Natural language processing is a branch of computer science and artificial intelligence focused on the interaction between computers and human language.",
        "Computer vision is a field of artificial intelligence that trains computers to interpret and understand the visual world.",
        "Reinforcement learning is a type of machine learning concerned with how software agents should act in an environment to maximize cumulative reward.",
        "Data preprocessing is an important step in the machine learning pipeline, including data cleaning, feature selection, and feature engineering.",
        "Overfitting is a common problem in machine learning where a model performs well on training data but poorly on new data.",
        "Convolutional neural networks (CNNs) are a class of deep neural networks most commonly applied to analyzing visual imagery.",
        "Recurrent neural networks (RNNs) are a class of artificial neural networks where connections between nodes form a directed graph along a temporal sequence."
    ]
    # Relevance judgments: query_id -> {doc_id: relevance grade}
    qrels = {
        "0": {"0": 2, "1": 1, "6": 1, "7": 1},  # What is machine learning?
        "1": {"0": 1, "1": 2, "8": 1, "9": 1},  # Deep learning vs. machine learning
        "2": {"2": 2, "7": 1},                  # Evaluating model performance
        "3": {"3": 2, "9": 1},                  # Applications of NLP
        "4": {"4": 2, "8": 1}                   # Basic tasks of computer vision
    }
    # Write the files to the working directory
with open("sample_queries.txt", "w", encoding="utf-8") as f:
for query in queries:
f.write(query + "\n")
with open("sample_documents.txt", "w", encoding="utf-8") as f:
for doc in documents:
f.write(doc + "\n")
with open("sample_qrels.csv", "w", encoding="utf-8") as f:
for query_id, doc_relevance in qrels.items():
for doc_id, relevance in doc_relevance.items():
f.write(f"{query_id},{doc_id},{relevance}\n")
print("示例测试集已创建:")
print("- sample_queries.txt: 查询文件")
print("- sample_documents.txt: 文档文件")
print("- sample_qrels.csv: 相关性标注文件")
return RetrievalTestSet("sample_queries.txt", "sample_documents.txt", "sample_qrels.csv")
if __name__ == "__main__":
    # Create the sample test set
test_set = create_sample_test_set()
    # Create the evaluator
evaluator = RetrievalEvaluator()
    # Substitute your actual retriever here; the mock below simulates
    # retrievers of different quality for demonstration purposes
class MockRetriever:
def __init__(self, name):
self.name = name
def retrieve(self, query, top_k=10):
            # Simulate retrieval results
import random
all_docs = list(test_set.documents.values())
            # Simulate retrievers of varying quality
            if self.name == "good":
                # Good retriever: more likely to return relevant documents
                relevant_docs = [doc for doc in all_docs
                                 if any(keyword in doc.page_content.lower()
                                        for keyword in query.lower().split()[:2])]
if relevant_docs:
results = relevant_docs[:min(top_k//2, len(relevant_docs))]
results += random.sample(all_docs, min(top_k-len(results), len(all_docs)))
else:
results = random.sample(all_docs, min(top_k, len(all_docs)))
elif self.name == "medium":
# 中等检索器
relevant_docs = [doc for doc in all_docs if any(keyword in doc.page_content.lower()
for keyword in [query.lower().split()[0]])]
if relevant_docs:
results = relevant_docs[:min(top_k//3, len(relevant_docs))]
results += random.sample(all_docs, min(top_k-len(results), len(all_docs)))
else:
results = random.sample(all_docs, min(top_k, len(all_docs)))
            else:
                # Poor retriever: returns random documents
results = random.sample(all_docs, min(top_k, len(all_docs)))
return results
    # Create retrievers of different quality
good_retriever = MockRetriever("good")
medium_retriever = MockRetriever("medium")
poor_retriever = MockRetriever("poor")
    # Run each retriever over the test set
good_results = test_set.get_retrieval_results(good_retriever)
medium_results = test_set.get_retrieval_results(medium_retriever)
poor_results = test_set.get_retrieval_results(poor_retriever)
    # Compare the retrievers
    retriever_results = {
        "good retriever": good_results,
        "medium retriever": medium_results,
        "poor retriever": poor_results
    }
    # Evaluate the retrievers
metrics = evaluator.compare_retrievers(retriever_results)
    # Generate the report
report = evaluator.generate_report(metrics, "retrieval_evaluation_report.md")
print(report)
    # Plot the comparison
evaluator.plot_metrics_comparison(metrics, "retrieval_evaluation_comparison.png")