# graphgen/operators/evaluate/evaluate_service.py
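"""Evaluation operator for scoring synthesized QA pairs and the underlying knowledge graph."""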
from typing import Any, Dict, Optional
import pandas as pd
from graphgen.bases import BaseLLMWrapper, BaseOperator, QAPair
from graphgen.common import init_llm, init_storage
from graphgen.utils import logger, run_concurrent
class EvaluateService(BaseOperator):
"""
1. KG Quality Evaluation
2. QA Quality Evaluation
"""
def __init__(
self,
working_dir: str = "cache",
        metrics: Optional[list[str]] = None,
graph_backend: str = "kuzu",
kv_backend: str = "rocksdb",
**kwargs,
):
super().__init__(working_dir=working_dir, op_name="evaluate_service")
self.llm_client: BaseLLMWrapper = init_llm("synthesizer")
self.metrics = metrics or []
self.kwargs = kwargs
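        # Graph and chunk stores back the KG-level evaluators below; the
        # QA-level evaluators only need the QA text itself.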
self.graph_storage = init_storage(
backend=graph_backend, working_dir=working_dir, namespace="graph"
)
self.chunk_storage = init_storage(
backend=kv_backend, working_dir=working_dir, namespace="chunk"
)
# Initialize evaluators
self.qa_evaluators = {}
self.kg_evaluators = {}
self._init_evaluators()
def _init_evaluators(self):
"""Initialize QA and KG evaluators based on metrics."""
for metric in self.metrics:
if metric == "qa_length":
from graphgen.models import LengthEvaluator
self.qa_evaluators[metric] = LengthEvaluator()
elif metric == "qa_mtld":
from graphgen.models import MTLDEvaluator
self.qa_evaluators[metric] = MTLDEvaluator(
**self.kwargs.get("mtld_params", {})
)
elif metric == "qa_reward_score":
from graphgen.models import RewardEvaluator
self.qa_evaluators[metric] = RewardEvaluator(
**self.kwargs.get("reward_params", {})
)
elif metric == "qa_uni_score":
from graphgen.models import UniEvaluator
self.qa_evaluators[metric] = UniEvaluator(
**self.kwargs.get("uni_params", {})
)
elif metric == "kg_accuracy":
from graphgen.models import AccuracyEvaluator
self.kg_evaluators[metric] = AccuracyEvaluator(
graph_storage=self.graph_storage,
chunk_storage=self.chunk_storage,
llm_client=self.llm_client,
)
elif metric == "kg_consistency":
from graphgen.models import ConsistencyEvaluator
self.kg_evaluators[metric] = ConsistencyEvaluator(
graph_storage=self.graph_storage,
chunk_storage=self.chunk_storage,
llm_client=self.llm_client,
)
elif metric == "kg_structure":
from graphgen.models import StructureEvaluator
self.kg_evaluators[metric] = StructureEvaluator(
graph_storage=self.graph_storage,
**self.kwargs.get("structure_params", {}),
)
            else:
                raise ValueError(f"Unknown metric: {metric}")
async def _process_single_qa(self, item: dict[str, Any]) -> dict[str, Any]:
try:
qa_pair = QAPair(
question=str(item.get("question", "")),
answer=str(item.get("answer", "")),
)
if not qa_pair.question or not qa_pair.answer:
self.logger.error("Empty question or answer, skipping.")
return {}
except Exception as e:
self.logger.error("Error in QAPair creation: %s", str(e))
return {}
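        # Each evaluator may return either a single score or a dict of
        # sub-scores; dict results are flattened into "<metric>_<sub_metric>"
        # keys so they survive the DataFrame conversion in process().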
for metric, evaluator in self.qa_evaluators.items():
try:
score = evaluator.evaluate(qa_pair)
if isinstance(score, dict):
for sub_metric, sub_score in score.items():
item[f"{metric}_{sub_metric}"] = float(sub_score)
else:
item[metric] = float(score)
except Exception as e:
self.logger.error("Error in %s evaluation: %s", metric, str(e))
item[metric] = None
return item
def _evaluate_qa(self, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
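        """Score each QA record with every configured QA evaluator."""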
def transform_messages_format(items: list[dict]) -> list[dict]:
"""
Transform from [{'messages': [...]}, ...] to [{'question': '...', 'answer': '...'}, ...]
"""
transformed = []
for item in items:
messages = item.get("messages", [])
question = next(
(m["content"] for m in messages if m.get("role") == "user"), ""
)
answer = next(
(m["content"] for m in messages if m.get("role") == "assistant"), ""
)
transformed.append({"question": question, "answer": answer})
return transformed
if not items:
return []
if not self.qa_evaluators:
self.logger.warning("No QA evaluators initialized, skipping QA evaluation")
return []
items = transform_messages_format(items)
results = run_concurrent(
self._process_single_qa,
items,
desc="Evaluating QA items",
unit="item",
)
results = [item for item in results if item]
return results
def _evaluate_kg(self) -> Dict[str, Any]:
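        """Run each configured KG evaluator once over the stored graph."""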
results = {}
for metric, evaluator in self.kg_evaluators.items():
try:
self.logger.info("Running %s evaluation...", metric)
score = evaluator.evaluate()
results[metric] = score
except Exception as e:
self.logger.error("Error in %s evaluation: %s", metric, str(e))
results[metric] = {"error": str(e)}
return results
def process(self, batch: pd.DataFrame) -> pd.DataFrame:
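        # QA evaluation takes precedence: if any QA evaluators are configured,
        # the batch is scored per QA pair; otherwise KG metrics are computed
        # once over the stored graph.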
# QA evaluation
if len(self.qa_evaluators) > 0:
items = batch.to_dict(orient="records")
results = self._evaluate_qa(items)
return pd.DataFrame(results)
# KG evaluation
if len(self.kg_evaluators) > 0:
results = self._evaluate_kg()
# Convert dict to DataFrame (single row)
return pd.DataFrame([results])
# No metrics specified
logger.warning("No metrics specified, returning empty DataFrame")
return pd.DataFrame()
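

# Usage sketch (illustrative only; assumes the upstream pipeline has already
# populated the graph/chunk storages under `working_dir`, and that QA records
# arrive in the {"messages": [...]} chat format this operator expects):
#
#     service = EvaluateService(
#         working_dir="cache",
#         metrics=["qa_length", "qa_mtld"],
#     )
#     qa_batch = pd.DataFrame(
#         [
#             {
#                 "messages": [
#                     {"role": "user", "content": "What is a knowledge graph?"},
#                     {"role": "assistant", "content": "A graph of entities and relations."},
#                 ]
#             }
#         ]
#     )
#     scored = service.process(qa_batch)  # one row per QA pair, score columns added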