| from langchain_core.output_parsers import PydanticOutputParser | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_openai import ChatOpenAI | |
| from pydantic import BaseModel, Field | |
| import pandas as pd | |
| class AnswerEvaluation(BaseModel): | |
| is_valid: bool = Field( | |
| description="Является ли ответ валидным и корректным относительно вопроса и оригинального текста" | |
| ) | |
| relevance_score: float = Field( | |
| description="Оценка релевантности ответа вопросу от 0.0 до 1.0", | |
| ge=0.0, | |
| le=1.0 | |
| ) | |
| completeness_score: float = Field( | |
| description="Оценка полноты ответа от 0.0 до 1.0 (насколько ответ покрывает всю необходимую информацию)", | |
| ge=0.0, | |
| le=1.0 | |
| ) | |
| factual_accuracy_score: float = Field( | |
| description="Оценка фактической точности ответа от 0.0 до 1.0 (соответствие фактам из оригинального текста)", | |
| ge=0.0, | |
| le=1.0 | |
| ) | |
| class QuestionBatchIterator: | |
| def __init__(self, questions, batch_size): | |
| self.questions = questions | |
| self.batch_size = batch_size | |
| self.current_idx = 0 | |
| def __iter__(self): | |
| return self | |
| def __next__(self): | |
| if self.current_idx >= len(self.questions): | |
| raise StopIteration | |
| batch = self.questions[self.current_idx:self.current_idx + self.batch_size] | |
| self.current_idx += self.batch_size | |
| return batch | |
| def __len__(self): | |
| return (len(self.questions) + self.batch_size - 1) // self.batch_size | |
| def reset(self): | |
| self.current_idx = 0 | |
| class QAEvaluator: | |
| def __init__( | |
| self, | |
| df, | |
| text_column="original_text", | |
| model="qwen/qwen3-next-80b-a3b-instruct", | |
| temperature=0.0, | |
| api_key=None, | |
| api_base="https://api.proxyapi.ru/openrouter/v1" | |
| ): | |
| self.df = df.copy() | |
| self.original_text_column = text_column | |
| self.api_key = api_key | |
| self.llm = ChatOpenAI( | |
| model=model, | |
| temperature=temperature, | |
| openai_api_key=self.api_key, | |
| openai_api_base=api_base, | |
| ) | |
| self._setup_evaluation_agent() | |
| self._current_question_column = None | |
| self._questions_data = None | |
| def _setup_evaluation_agent(self): | |
| self.parser = PydanticOutputParser(pydantic_object=AnswerEvaluation) | |
| self.prompt = ChatPromptTemplate.from_messages([ | |
| ("system", """Ты - эксперт по оценке качества ответов на вопросы по новостным текстам. | |
| Твоя задача - оценить, насколько ответ корректен и полон относительно заданного вопроса и оригинального текста. | |
| ## Критерии оценки: | |
| ### is_valid (валидность): | |
| - True: ответ корректно отвечает на вопрос и соответствует фактам из текста | |
| - False: ответ неверный, не по теме, или содержит фактические ошибки | |
| ### relevance_score (релевантность, 0.0-1.0): | |
| - 1.0: ответ полностью по теме вопроса | |
| - 0.5: ответ частично по теме | |
| - 0.0: ответ не имеет отношения к вопросу | |
| ### completeness_score (полнота, 0.0-1.0): | |
| - 1.0: ответ содержит всю необходимую информацию | |
| - 0.5: ответ содержит часть информации | |
| - 0.0: ответ пустой или не содержит нужной информации | |
| ### factual_accuracy_score (фактическая точность, 0.0-1.0): | |
| - 1.0: все факты в ответе соответствуют оригинальному тексту | |
| - 0.5: есть небольшие неточности | |
| - 0.0: факты в ответе противоречат оригинальному тексту | |
| {format_instructions}"""), | |
| ("human", """Оцени следующий ответ: | |
| ## Оригинальный текст поста: | |
| {original_text} | |
| ## Вопрос: | |
| {question} | |
| ## Ответ для оценки: | |
| {answer} | |
| Проанализируй и выдай оценку.""") | |
| ]) | |
| self.evaluation_chain = self.prompt | self.llm | self.parser | |
| def get_questions(self, question_column, batch_size=10): | |
| if question_column not in self.df.columns: | |
| raise ValueError(f"Колонка '{question_column}' не найдена в DataFrame. " | |
| f"Доступные колонки: {list(self.df.columns)}") | |
| self._current_question_column = question_column | |
| self._questions_data = [] | |
| for idx, row in self.df.iterrows(): | |
| self._questions_data.append({ | |
| "index": idx, | |
| "question": row[question_column], | |
| "original_text": row[self.original_text_column] | |
| }) | |
| questions = [item["question"] for item in self._questions_data] | |
| return QuestionBatchIterator(questions, batch_size) | |
| def evaluate_answers(self, answers, show_progress=True): | |
| if self._questions_data is None: | |
| raise ValueError("Сначала вызовите get_questions() для получения вопросов") | |
| if len(answers) != len(self._questions_data): | |
| raise ValueError( | |
| f"Количество ответов ({len(answers)}) не совпадает с количеством " | |
| f"вопросов ({len(self._questions_data)})" | |
| ) | |
| total_questions = len(answers) | |
| valid_answers = 0 | |
| invalid_answers = 0 | |
| detailed_results = [] | |
| relevance_scores = [] | |
| completeness_scores = [] | |
| factual_accuracy_scores = [] | |
| if show_progress: | |
| from tqdm import tqdm | |
| iterator = tqdm( | |
| zip(self._questions_data, answers), | |
| total=len(answers), | |
| desc="Оценка ответов" | |
| ) | |
| else: | |
| iterator = zip(self._questions_data, answers) | |
| for qa_data, answer in iterator: | |
| try: | |
| evaluation = self._evaluate_single_answer( | |
| original_text=qa_data["original_text"], | |
| question=qa_data["question"], | |
| answer=answer | |
| ) | |
| if evaluation.is_valid: | |
| valid_answers += 1 | |
| else: | |
| invalid_answers += 1 | |
| relevance_scores.append(evaluation.relevance_score) | |
| completeness_scores.append(evaluation.completeness_score) | |
| factual_accuracy_scores.append(evaluation.factual_accuracy_score) | |
| detailed_results.append({ | |
| "index": qa_data["index"], | |
| "question": qa_data["question"], | |
| "answer": answer, | |
| "is_valid": evaluation.is_valid, | |
| "relevance_score": evaluation.relevance_score, | |
| "completeness_score": evaluation.completeness_score, | |
| "factual_accuracy_score": evaluation.factual_accuracy_score, | |
| }) | |
| except Exception as e: | |
| print(f"Ошибка при оценке ответа: {e}") | |
| invalid_answers += 1 | |
| relevance_scores.append(0.0) | |
| completeness_scores.append(0.0) | |
| factual_accuracy_scores.append(0.0) | |
| detailed_results.append({ | |
| "index": qa_data["index"], | |
| "question": qa_data["question"], | |
| "answer": answer, | |
| "is_valid": False, | |
| "relevance_score": 0.0, | |
| "completeness_score": 0.0, | |
| "factual_accuracy_score": 0.0 | |
| }) | |
| avg_relevance = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0.0 | |
| avg_completeness = sum(completeness_scores) / len(completeness_scores) if completeness_scores else 0.0 | |
| avg_factual_accuracy = sum(factual_accuracy_scores) / len(factual_accuracy_scores) if factual_accuracy_scores else 0.0 | |
| accuracy = valid_answers / total_questions if total_questions > 0 else 0.0 | |
| combined_score = (avg_relevance + avg_completeness + avg_factual_accuracy) / 3 | |
| return { | |
| "total_questions": total_questions, | |
| "valid_answers": valid_answers, | |
| "invalid_answers": invalid_answers, | |
| "accuracy": accuracy, | |
| "avg_relevance": avg_relevance, | |
| "avg_completeness": avg_completeness, | |
| "avg_factual_accuracy": avg_factual_accuracy, | |
| "combined_score": combined_score, | |
| "detailed_results": detailed_results, | |
| } | |
| def _evaluate_single_answer(self, original_text, question, answer): | |
| if answer is None or (isinstance(answer, str) and answer.strip() == ""): | |
| return AnswerEvaluation( | |
| is_valid=False, | |
| relevance_score=0.0, | |
| completeness_score=0.0, | |
| factual_accuracy_score=0.0, | |
| ) | |
| result = self.evaluation_chain.invoke({ | |
| "original_text": original_text, | |
| "question": question, | |
| "answer": answer, | |
| "format_instructions": self.parser.get_format_instructions() | |
| }) | |
| return result | |
| def get_detailed_results_df(self, metrics): | |
| return pd.DataFrame(metrics["detailed_results"]) | |