|
|
import os |
|
|
import regex |
|
|
from typing import Any, List, Callable |
|
|
from ..core.logging import logger |
|
|
from .benchmark import Benchmark |
|
|
from ..utils.utils import download_file |
|
|
from ..core.module_utils import load_json |
|
|
from ..utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data |
|
|
|
|
|
|
|
|
GSM8K_FILES_MAP = {"train": "train.jsonl", "dev": None, "test": "test.jsonl"} |
|
|
VALID_RAW_GSM8K_FILES = [file for file in list(GSM8K_FILES_MAP.values()) if file is not None] |
|
|
|
|
|
def download_raw_gsm8k_data(name: str, save_folder: str): |
|
|
|
|
|
assert name in VALID_RAW_GSM8K_FILES, f"'{name}' is an invalid GSM8K file name. Available file names: {VALID_RAW_GSM8K_FILES}" |
|
|
url = f"https://raw.githubusercontent.com/openai/grade-school-math/master/grade_school_math/data/{name}" |
|
|
typ = "train" if "train" in name else "test" |
|
|
logger.info(f"Downloading GSM8K {typ} data from: {url}") |
|
|
download_file(url=url, save_file=os.path.join(save_folder, name)) |
|
|
|
|
|
|
|
|
def load_gsm8k_data(file_path: str) -> List[dict]: |
|
|
|
|
|
base_name = os.path.basename(file_path) |
|
|
file_type_map = {file_name: typ for typ, file_name in GSM8K_FILES_MAP.items()} |
|
|
assert base_name in file_type_map, f"'{base_name}' is an invalid gsm8k file name. Available file names: {VALID_RAW_GSM8K_FILES}" |
|
|
|
|
|
typ = file_type_map[base_name] |
|
|
data = load_json(path=file_path, type="jsonl") |
|
|
new_data = [] |
|
|
for i, example in enumerate(data): |
|
|
item = {"id": f"{typ}-{i+1}"} |
|
|
item.update(example) |
|
|
new_data.append(item) |
|
|
return new_data |
|
|
|
|
|
|
|
|
class GSM8K(Benchmark): |
|
|
|
|
|
"""Benchmark class for evaluating math reasoning on GSM8K dataset. |
|
|
|
|
|
GSM8K (Grade School Math 8K) is a dataset of math word problems that |
|
|
test a model's ability to solve grade school level math problems requiring |
|
|
multi-step reasoning. This class handles loading the dataset, evaluating |
|
|
solutions, and computing metrics based on answer accuracy. |
|
|
|
|
|
Each GSM8K example has the following structure: |
|
|
{ |
|
|
"id": "test-1", |
|
|
"question": "the question", |
|
|
"answer": "the answer" |
|
|
} |
|
|
|
|
|
The benchmark evaluates answers by extracting the final numerical value |
|
|
and comparing it to the ground truth answer. |
|
|
""" |
|
|
|
|
|
def __init__(self, path: str = None, mode: str = "all", **kwargs): |
|
|
path = os.path.expanduser(path or "~/.evoagentx/data/gsm8k") |
|
|
super().__init__(name=type(self).__name__, path=path, mode=mode, **kwargs) |
|
|
|
|
|
def _load_data_from_file(self, file_name: str): |
|
|
if file_name is None: |
|
|
return None |
|
|
file_path = os.path.join(self.path, file_name) |
|
|
if not os.path.exists(file_path): |
|
|
download_raw_gsm8k_data(name=file_name, save_folder=self.path) |
|
|
|
|
|
logger.info(f"loading GSM8K data from {file_path} ...") |
|
|
return load_gsm8k_data(file_path=file_path) |
|
|
|
|
|
def _load_data(self): |
|
|
if self.mode == "train" or self.mode == "all": |
|
|
self._train_data = self._load_data_from_file(file_name=GSM8K_FILES_MAP["train"]) |
|
|
if self.mode == "dev" or self.mode == "all": |
|
|
self._dev_data = self._load_data_from_file(file_name=GSM8K_FILES_MAP["dev"]) |
|
|
if self.mode == "test" or self.mode == "all": |
|
|
self._test_data = self._load_data_from_file(file_name=GSM8K_FILES_MAP["test"]) |
|
|
|
|
|
def _get_label(self, example: Any) -> Any: |
|
|
return example["answer"] |
|
|
|
|
|
def _get_id(self, example: Any) -> Any: |
|
|
return example["id"] |
|
|
|
|
|
def extract_last_number(self, text: str) -> float: |
|
|
""" |
|
|
Extract the last number from a text. |
|
|
""" |
|
|
matches = regex.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text)) |
|
|
if matches: |
|
|
last_number = matches[-1].replace(",", "").strip() |
|
|
try: |
|
|
last_number = float(last_number) |
|
|
return last_number |
|
|
except ValueError: |
|
|
return None |
|
|
return None |
|
|
|
|
|
def evaluate(self, prediction: Any, label: Any) -> dict: |
|
|
ground_truth_answer = self.extract_last_number(label) |
|
|
predicted_answer = self.extract_last_number(prediction) |
|
|
if predicted_answer is None: |
|
|
return {"solve_rate": 0.0} |
|
|
solve_rate = 1.0 if abs(predicted_answer - ground_truth_answer) < 1e-6 else 0.0 |
|
|
return {"solve_rate": solve_rate} |
|
|
|
|
|
|
|
|
class AFlowGSM8K(GSM8K): |
|
|
|
|
|
"""AFlow-specific implementation of GSM8K benchmark. |
|
|
|
|
|
This class extends the GSM8K benchmark with features specific to the |
|
|
AFlow framework, including loading from AFlow-formatted data files and |
|
|
supporting asynchronous evaluation for workflows. |
|
|
|
|
|
Attributes: |
|
|
path: Path to the directory containing AFlow-formatted GSM8K files. |
|
|
mode: Data loading mode ("train", "dev", "test", or "all"). |
|
|
_train_data: Training dataset loaded from AFlow format. |
|
|
_dev_data: Development dataset loaded from AFlow format. |
|
|
_test_data: Test dataset loaded from AFlow format. |
|
|
""" |
|
|
|
|
|
def __init__(self, path: str = None, mode: str = "all", **kwargs): |
|
|
path = os.path.expanduser(path or "~/.evoagentx/data/aflow/gsm8k") |
|
|
super().__init__(path=path, mode=mode, **kwargs) |
|
|
|
|
|
def _load_data_from_file(self, file_name: str): |
|
|
if file_name is None: |
|
|
return None |
|
|
file_path = os.path.join(self.path, file_name) |
|
|
if not os.path.exists(file_path): |
|
|
download_aflow_benchmark_data(dataset="gsm8k", save_folder=self.path) |
|
|
return load_json(path=file_path, type="jsonl") |
|
|
|
|
|
def _load_data(self): |
|
|
|
|
|
if self.mode == "train" or self.mode == "all": |
|
|
logger.info(f"Loading train data from {AFLOW_DATASET_FILES_MAP['gsm8k']['train']}") |
|
|
self._train_data = self._load_data_from_file(file_name=AFLOW_DATASET_FILES_MAP["gsm8k"]["train"]) |
|
|
if self.mode == "dev" or self.mode == "all": |
|
|
logger.info(f"Loading dev data from {AFLOW_DATASET_FILES_MAP['gsm8k']['dev']}") |
|
|
self._dev_data = self._load_data_from_file(file_name=AFLOW_DATASET_FILES_MAP["gsm8k"]["dev"]) |
|
|
if self.mode == "test" or self.mode == "all": |
|
|
logger.info(f"Loading test data from {AFLOW_DATASET_FILES_MAP['gsm8k']['test']}") |
|
|
self._test_data = self._load_data_from_file(file_name=AFLOW_DATASET_FILES_MAP["gsm8k"]["test"]) |
|
|
|
|
|
async def async_evaluate(self, graph: Callable, example: Any) -> float: |
|
|
|
|
|
input_text = example["question"] |
|
|
label = self._get_label(example) |
|
|
output = await graph(input_text) |
|
|
metrics = await super().async_evaluate(prediction=output, label=label) |
|
|
return metrics["solve_rate"] |