from typing import Any, Optional

import pandas as pd

from graphgen.bases import BaseLLMWrapper, BaseOperator, QAPair
from graphgen.common import init_llm, init_storage
from graphgen.utils import run_concurrent


class EvaluateService(BaseOperator):
    """
    Evaluate the quality of generated artifacts along two axes:

    1. KG quality evaluation (kg_accuracy, kg_consistency, kg_structure)
    2. QA quality evaluation (qa_length, qa_mtld, qa_reward_score, qa_uni_score)
    """

    def __init__(
        self,
        working_dir: str = "cache",
        metrics: Optional[list[str]] = None,
        graph_backend: str = "kuzu",
        kv_backend: str = "rocksdb",
        **kwargs,
    ):
        super().__init__(working_dir=working_dir, op_name="evaluate_service")
        self.llm_client: BaseLLMWrapper = init_llm("synthesizer")
        self.metrics = metrics or []
        self.kwargs = kwargs

        self.graph_storage = init_storage(
            backend=graph_backend, working_dir=working_dir, namespace="graph"
        )
        self.chunk_storage = init_storage(
            backend=kv_backend, working_dir=working_dir, namespace="chunk"
        )

        # Initialize evaluators
        self.qa_evaluators = {}
        self.kg_evaluators = {}
        self._init_evaluators()

    def _init_evaluators(self):
        """Initialize QA and KG evaluators based on the requested metrics."""
        for metric in self.metrics:
            if metric == "qa_length":
                from graphgen.models import LengthEvaluator

                self.qa_evaluators[metric] = LengthEvaluator()
            elif metric == "qa_mtld":
                from graphgen.models import MTLDEvaluator

                self.qa_evaluators[metric] = MTLDEvaluator(
                    **self.kwargs.get("mtld_params", {})
                )
            elif metric == "qa_reward_score":
                from graphgen.models import RewardEvaluator

                self.qa_evaluators[metric] = RewardEvaluator(
                    **self.kwargs.get("reward_params", {})
                )
            elif metric == "qa_uni_score":
                from graphgen.models import UniEvaluator

                self.qa_evaluators[metric] = UniEvaluator(
                    **self.kwargs.get("uni_params", {})
                )
            elif metric == "kg_accuracy":
                from graphgen.models import AccuracyEvaluator

                self.kg_evaluators[metric] = AccuracyEvaluator(
                    graph_storage=self.graph_storage,
                    chunk_storage=self.chunk_storage,
                    llm_client=self.llm_client,
                )
            elif metric == "kg_consistency":
                from graphgen.models import ConsistencyEvaluator

                self.kg_evaluators[metric] = ConsistencyEvaluator(
                    graph_storage=self.graph_storage,
                    chunk_storage=self.chunk_storage,
                    llm_client=self.llm_client,
                )
            elif metric == "kg_structure":
                from graphgen.models import StructureEvaluator

                self.kg_evaluators[metric] = StructureEvaluator(
                    graph_storage=self.graph_storage,
                    **self.kwargs.get("structure_params", {}),
                )
            else:
                raise ValueError(f"Unknown metric: {metric}")
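
    # Evaluator-specific options are read from **kwargs at construction time
    # under the keys "mtld_params", "reward_params", "uni_params", and
    # "structure_params"; any metric name outside the set above raises
    # ValueError.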

    async def _process_single_qa(self, item: dict[str, Any]) -> dict[str, Any]:
        """Score one QA item with every configured QA evaluator."""
        try:
            qa_pair = QAPair(
                question=str(item.get("question", "")),
                answer=str(item.get("answer", "")),
            )
            if not qa_pair.question or not qa_pair.answer:
                self.logger.error("Empty question or answer, skipping.")
                return {}
        except Exception as e:
            self.logger.error("Error in QAPair creation: %s", str(e))
            return {}

        for metric, evaluator in self.qa_evaluators.items():
            try:
                score = evaluator.evaluate(qa_pair)
                if isinstance(score, dict):
                    # Evaluators may return several sub-scores; flatten them
                    # into "<metric>_<sub_metric>" columns.
                    for sub_metric, sub_score in score.items():
                        item[f"{metric}_{sub_metric}"] = float(sub_score)
                else:
                    item[metric] = float(score)
            except Exception as e:
                self.logger.error("Error in %s evaluation: %s", metric, str(e))
                item[metric] = None
        return item
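
    # A successfully scored item keeps its original fields and gains one float
    # column per metric (or per flattened sub-metric), e.g. (illustrative):
    #   {"question": "...", "answer": "...", "qa_length": 128.0}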

    def _evaluate_qa(self, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
        def transform_messages_format(items: list[dict]) -> list[dict]:
            """
            Transform [{'messages': [...]}, ...] into
            [{'question': '...', 'answer': '...'}, ...] by taking the first
            user message as the question and the first assistant message as
            the answer.
            """
            transformed = []
            for item in items:
                messages = item.get("messages", [])
                question = next(
                    (m["content"] for m in messages if m.get("role") == "user"), ""
                )
                answer = next(
                    (m["content"] for m in messages if m.get("role") == "assistant"),
                    "",
                )
                transformed.append({"question": question, "answer": answer})
            return transformed
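
        # Example of the transformation above (illustrative data):
        #   [{"messages": [{"role": "user", "content": "What is X?"},
        #                  {"role": "assistant", "content": "X is ..."}]}]
        #   -> [{"question": "What is X?", "answer": "X is ..."}]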

        if not items:
            return []
        if not self.qa_evaluators:
            self.logger.warning("No QA evaluators initialized, skipping QA evaluation")
            return []

        items = transform_messages_format(items)
        results = run_concurrent(
            self._process_single_qa,
            items,
            desc="Evaluating QA items",
            unit="item",
        )
        # Drop items that failed QAPair construction (returned as {}).
        results = [item for item in results if item]
        return results

    def _evaluate_kg(self) -> dict[str, Any]:
        """Run every configured KG evaluator and collect the scores."""
        results = {}
        for metric, evaluator in self.kg_evaluators.items():
            try:
                self.logger.info("Running %s evaluation...", metric)
                score = evaluator.evaluate()
                results[metric] = score
            except Exception as e:
                self.logger.error("Error in %s evaluation: %s", metric, str(e))
                results[metric] = {"error": str(e)}
        return results

    def process(self, batch: pd.DataFrame) -> pd.DataFrame:
        # QA evaluation takes precedence: if any QA evaluators are configured,
        # KG evaluation is skipped for this batch.
        if len(self.qa_evaluators) > 0:
            items = batch.to_dict(orient="records")
            results = self._evaluate_qa(items)
            return pd.DataFrame(results)

        # KG evaluation
        if len(self.kg_evaluators) > 0:
            results = self._evaluate_kg()
            # Convert the metric -> score dict to a single-row DataFrame
            return pd.DataFrame([results])

        # No metrics specified
        self.logger.warning("No metrics specified, returning empty DataFrame")
        return pd.DataFrame()
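

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes a GraphGen environment
# where init_llm("synthesizer") and the kuzu/rocksdb storage backends can be
# constructed; the sample row follows the OpenAI-style "messages" schema that
# _evaluate_qa expects.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    service = EvaluateService(working_dir="cache", metrics=["qa_length"])
    batch = pd.DataFrame(
        [
            {
                "messages": [
                    {"role": "user", "content": "What does GraphGen do?"},
                    {
                        "role": "assistant",
                        "content": "It synthesizes QA pairs from a knowledge graph.",
                    },
                ]
            }
        ]
    )
    print(service.process(batch))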