Test / src /evaluation /qa_evaluator.py
Архипов Дмитрий
test
565e754
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
import pandas as pd
class AnswerEvaluation(BaseModel):
is_valid: bool = Field(
description="Является ли ответ валидным и корректным относительно вопроса и оригинального текста"
)
relevance_score: float = Field(
description="Оценка релевантности ответа вопросу от 0.0 до 1.0",
ge=0.0,
le=1.0
)
completeness_score: float = Field(
description="Оценка полноты ответа от 0.0 до 1.0 (насколько ответ покрывает всю необходимую информацию)",
ge=0.0,
le=1.0
)
factual_accuracy_score: float = Field(
description="Оценка фактической точности ответа от 0.0 до 1.0 (соответствие фактам из оригинального текста)",
ge=0.0,
le=1.0
)
class QuestionBatchIterator:
def __init__(self, questions, batch_size):
self.questions = questions
self.batch_size = batch_size
self.current_idx = 0
def __iter__(self):
return self
def __next__(self):
if self.current_idx >= len(self.questions):
raise StopIteration
batch = self.questions[self.current_idx:self.current_idx + self.batch_size]
self.current_idx += self.batch_size
return batch
def __len__(self):
return (len(self.questions) + self.batch_size - 1) // self.batch_size
def reset(self):
self.current_idx = 0
class QAEvaluator:
def __init__(
self,
df,
text_column="original_text",
model="qwen/qwen3-next-80b-a3b-instruct",
temperature=0.0,
api_key=None,
api_base="https://api.proxyapi.ru/openrouter/v1"
):
self.df = df.copy()
self.original_text_column = text_column
self.api_key = api_key
self.llm = ChatOpenAI(
model=model,
temperature=temperature,
openai_api_key=self.api_key,
openai_api_base=api_base,
)
self._setup_evaluation_agent()
self._current_question_column = None
self._questions_data = None
def _setup_evaluation_agent(self):
self.parser = PydanticOutputParser(pydantic_object=AnswerEvaluation)
self.prompt = ChatPromptTemplate.from_messages([
("system", """Ты - эксперт по оценке качества ответов на вопросы по новостным текстам.
Твоя задача - оценить, насколько ответ корректен и полон относительно заданного вопроса и оригинального текста.
## Критерии оценки:
### is_valid (валидность):
- True: ответ корректно отвечает на вопрос и соответствует фактам из текста
- False: ответ неверный, не по теме, или содержит фактические ошибки
### relevance_score (релевантность, 0.0-1.0):
- 1.0: ответ полностью по теме вопроса
- 0.5: ответ частично по теме
- 0.0: ответ не имеет отношения к вопросу
### completeness_score (полнота, 0.0-1.0):
- 1.0: ответ содержит всю необходимую информацию
- 0.5: ответ содержит часть информации
- 0.0: ответ пустой или не содержит нужной информации
### factual_accuracy_score (фактическая точность, 0.0-1.0):
- 1.0: все факты в ответе соответствуют оригинальному тексту
- 0.5: есть небольшие неточности
- 0.0: факты в ответе противоречат оригинальному тексту
{format_instructions}"""),
("human", """Оцени следующий ответ:
## Оригинальный текст поста:
{original_text}
## Вопрос:
{question}
## Ответ для оценки:
{answer}
Проанализируй и выдай оценку.""")
])
self.evaluation_chain = self.prompt | self.llm | self.parser
def get_questions(self, question_column, batch_size=10):
if question_column not in self.df.columns:
raise ValueError(f"Колонка '{question_column}' не найдена в DataFrame. "
f"Доступные колонки: {list(self.df.columns)}")
self._current_question_column = question_column
self._questions_data = []
for idx, row in self.df.iterrows():
self._questions_data.append({
"index": idx,
"question": row[question_column],
"original_text": row[self.original_text_column]
})
questions = [item["question"] for item in self._questions_data]
return QuestionBatchIterator(questions, batch_size)
def evaluate_answers(self, answers, show_progress=True):
if self._questions_data is None:
raise ValueError("Сначала вызовите get_questions() для получения вопросов")
if len(answers) != len(self._questions_data):
raise ValueError(
f"Количество ответов ({len(answers)}) не совпадает с количеством "
f"вопросов ({len(self._questions_data)})"
)
total_questions = len(answers)
valid_answers = 0
invalid_answers = 0
detailed_results = []
relevance_scores = []
completeness_scores = []
factual_accuracy_scores = []
if show_progress:
from tqdm import tqdm
iterator = tqdm(
zip(self._questions_data, answers),
total=len(answers),
desc="Оценка ответов"
)
else:
iterator = zip(self._questions_data, answers)
for qa_data, answer in iterator:
try:
evaluation = self._evaluate_single_answer(
original_text=qa_data["original_text"],
question=qa_data["question"],
answer=answer
)
if evaluation.is_valid:
valid_answers += 1
else:
invalid_answers += 1
relevance_scores.append(evaluation.relevance_score)
completeness_scores.append(evaluation.completeness_score)
factual_accuracy_scores.append(evaluation.factual_accuracy_score)
detailed_results.append({
"index": qa_data["index"],
"question": qa_data["question"],
"answer": answer,
"is_valid": evaluation.is_valid,
"relevance_score": evaluation.relevance_score,
"completeness_score": evaluation.completeness_score,
"factual_accuracy_score": evaluation.factual_accuracy_score,
})
except Exception as e:
print(f"Ошибка при оценке ответа: {e}")
invalid_answers += 1
relevance_scores.append(0.0)
completeness_scores.append(0.0)
factual_accuracy_scores.append(0.0)
detailed_results.append({
"index": qa_data["index"],
"question": qa_data["question"],
"answer": answer,
"is_valid": False,
"relevance_score": 0.0,
"completeness_score": 0.0,
"factual_accuracy_score": 0.0
})
avg_relevance = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0.0
avg_completeness = sum(completeness_scores) / len(completeness_scores) if completeness_scores else 0.0
avg_factual_accuracy = sum(factual_accuracy_scores) / len(factual_accuracy_scores) if factual_accuracy_scores else 0.0
accuracy = valid_answers / total_questions if total_questions > 0 else 0.0
combined_score = (avg_relevance + avg_completeness + avg_factual_accuracy) / 3
return {
"total_questions": total_questions,
"valid_answers": valid_answers,
"invalid_answers": invalid_answers,
"accuracy": accuracy,
"avg_relevance": avg_relevance,
"avg_completeness": avg_completeness,
"avg_factual_accuracy": avg_factual_accuracy,
"combined_score": combined_score,
"detailed_results": detailed_results,
}
def _evaluate_single_answer(self, original_text, question, answer):
if answer is None or (isinstance(answer, str) and answer.strip() == ""):
return AnswerEvaluation(
is_valid=False,
relevance_score=0.0,
completeness_score=0.0,
factual_accuracy_score=0.0,
)
result = self.evaluation_chain.invoke({
"original_text": original_text,
"question": question,
"answer": answer,
"format_instructions": self.parser.get_format_instructions()
})
return result
def get_detailed_results_df(self, metrics):
return pd.DataFrame(metrics["detailed_results"])