File size: 7,838 Bytes
5374a2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# Acknowledgement:
# This file is modified from the original AFlow repository: https://github.com/geekan/MetaGPT/blob/main/metagpt/ext/aflow/scripts/optimizer_utils/data_utils.py
import os
import json
import random
import tarfile
import requests
import datetime
import numpy as np
import pandas as pd
from ..utils import make_parent_folder
from ...core.logging import logger
from ...core.module_utils import load_json, save_json
# Maps each supported AFlow benchmark dataset to the file names shipped in the
# downloadable archive. A value of None means that split is not distributed;
# the "test_cases" entry (code-generation datasets only) names the public
# test-case file.
AFLOW_DATASET_FILES_MAP = {
    "hotpotqa": {"train": None, "dev": "hotpotqa_validate.jsonl", "test": "hotpotqa_test.jsonl"},
    "humaneval": {"train": None, "dev": "humaneval_validate.jsonl", "test": "humaneval_test.jsonl", "test_cases": "humaneval_public_test.jsonl"},
    "mbpp": {"train": None, "dev": "mbpp_validate.jsonl", "test": "mbpp_test.jsonl", "test_cases": "mbpp_public_test.jsonl"},
    "gsm8k": {"train": None, "dev": "gsm8k_validate.jsonl", "test": "gsm8k_test.jsonl"},
    "math": {"train": None, "dev": "math_validate.jsonl", "test": "math_test.jsonl"},
}
def extract_tar_gz(filename: str, extract_path: str) -> None:
    """Unpack the gzip-compressed tarball ``filename`` into ``extract_path``."""
    archive = tarfile.open(filename, "r:gz")
    try:
        archive.extractall(path=extract_path)
    finally:
        archive.close()
def download_aflow_benchmark_data(dataset: str, save_folder: str):
    """Download the AFlow benchmark archive and keep the files for ``dataset``.

    Args:
        dataset: A key of ``AFLOW_DATASET_FILES_MAP`` (case-insensitive), or
            "all" to keep every dataset's files.
        save_folder: Directory into which the benchmark files are extracted.

    Raises:
        ValueError: If ``dataset`` is not a supported dataset name.
        requests.HTTPError: If the download request fails.
    """
    candidate_datasets = list(AFLOW_DATASET_FILES_MAP.keys()) + ["all"]
    lower_candidate_datasets = [name.lower() for name in candidate_datasets]
    if dataset.lower() not in lower_candidate_datasets:
        raise ValueError(f"Invalid value for dataset: {dataset}. Available choices: {candidate_datasets}")
    # Normalize once so the map lookup and the "all" comparison below match the
    # lower-case keys even when the caller passes e.g. "HotpotQA". Previously
    # a mixed-case name passed validation but raised KeyError at the lookup.
    dataset = dataset.lower()
    url = "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e"
    logger.info(f"Downloading AFlow benchmark data from {url} ...")
    aflow_data_save_file = os.path.join(save_folder, "aflow_data.tar.gz")
    make_parent_folder(aflow_data_save_file)
    # Stream the download to disk so the whole archive is never held in memory.
    response = requests.get(url, stream=True)
    response.raise_for_status()
    with open(aflow_data_save_file, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive chunks
                file.write(chunk)
    logger.info(f"Extracting data for {dataset} dataset(s) from {aflow_data_save_file} ...")
    extract_tar_gz(aflow_data_save_file, extract_path=save_folder)
    if dataset != "all":
        # Prune files that belong to other datasets.
        dataset_files = [file for file in list(AFLOW_DATASET_FILES_MAP[dataset].values()) if file is not None]
        for file in os.listdir(save_folder):
            if file not in dataset_files:
                os.remove(os.path.join(save_folder, file))
    # Clean up the downloaded archive itself.
    if os.path.exists(aflow_data_save_file):
        logger.info(f"Remove {aflow_data_save_file}")
        os.remove(aflow_data_save_file)
class DataUtils:
    """Helpers for reading/writing AFlow optimization round results and logs.

    ``root_path`` is expected to contain ``round_{n}`` subfolders with
    ``log.json`` files and a top-level ``results.json``.
    """

    def __init__(self, root_path: str):
        # Root directory of the optimization workspace.
        self.root_path = root_path
        # Cached [{"round": int, "score": float}, ...] sorted by score
        # descending; populated by _load_scores().
        self.top_scores = []

    def load_results(self, path: str) -> list:
        """Return the list stored in ``path/results.json``; [] if missing or corrupt."""
        result_path = os.path.join(path, "results.json")
        if os.path.exists(result_path):
            with open(result_path, "r") as json_file:
                try:
                    return json.load(json_file)
                except json.JSONDecodeError:
                    # A corrupt results file is treated as empty rather than fatal.
                    return []
        return []

    def get_top_rounds(self, sample: int, path=None, mode="Graph"):
        """Return up to ``sample`` distinct top-scoring rounds.

        Round 0 (the initial workflow) is always included first when present;
        the remainder are filled in descending score order.
        """
        self._load_scores(path, mode)
        unique_rounds = set()
        unique_top_scores = []
        # Seed with round 0 if it exists so the baseline is always considered.
        first_round = next((item for item in self.top_scores if item["round"] == 0), None)
        if first_round:
            unique_top_scores.append(first_round)
            unique_rounds.add(0)
        for item in self.top_scores:  # already sorted best-first
            if item["round"] not in unique_rounds:
                unique_top_scores.append(item)
                unique_rounds.add(item["round"])
                if len(unique_top_scores) >= sample:
                    break
        return unique_top_scores

    def select_round(self, items):
        """Sample one round dict from ``items``, biased toward higher scores.

        Raises:
            ValueError: If ``items`` is empty.
        """
        if not items:
            raise ValueError("Item list is empty.")
        sorted_items = sorted(items, key=lambda x: x["score"], reverse=True)
        # Scale to percentage points so the softmax temperature behaves sensibly.
        scores = [item["score"] * 100 for item in sorted_items]
        probabilities = self._compute_probabilities(scores)
        logger.info(f"\nMixed probability distribution: {probabilities}")
        logger.info(f"\nSorted rounds: {sorted_items}")
        selected_index = np.random.choice(len(sorted_items), p=probabilities)
        logger.info(f"\nSelected index: {selected_index}, Selected item: {sorted_items[selected_index]}")
        return sorted_items[selected_index]

    def _compute_probabilities(self, scores, alpha=0.2, lambda_=0.3):
        """Mix a uniform distribution with a softmax over ``scores``.

        Args:
            scores: Sequence of numeric scores.
            alpha: Softmax temperature applied to the max-shifted scores.
            lambda_: Weight of the uniform (exploration) component.

        Returns:
            np.ndarray of probabilities summing to 1.

        Raises:
            ValueError: If ``scores`` is empty or the softmax underflows to 0.
        """
        scores = np.array(scores, dtype=np.float64)
        n = len(scores)
        if n == 0:
            raise ValueError("Score list is empty.")
        uniform_prob = np.full(n, 1.0 / n, dtype=np.float64)
        # Shift by the max before exponentiating for numerical stability.
        max_score = np.max(scores)
        shifted_scores = scores - max_score
        exp_weights = np.exp(alpha * shifted_scores)
        sum_exp_weights = np.sum(exp_weights)
        if sum_exp_weights == 0:
            raise ValueError("Sum of exponential weights is 0, cannot normalize.")
        score_prob = exp_weights / sum_exp_weights
        # Blend exploration (uniform) with exploitation (softmax).
        mixed_prob = lambda_ * uniform_prob + (1 - lambda_) * score_prob
        # Re-normalize to guard against floating-point drift.
        total_prob = np.sum(mixed_prob)
        if not np.isclose(total_prob, 1.0):
            mixed_prob = mixed_prob / total_prob
        return mixed_prob

    def load_log(self, cur_round, path=None, mode: str = "Graph"):
        """Return up to three random log entries (pretty-printed JSON) for a round.

        In "Graph" mode the log is read from
        ``root_path/round_{cur_round}/log.json``; otherwise ``path`` is used
        directly. Returns "" when the log is missing or empty.
        """
        if mode == "Graph":
            log_dir = os.path.join(self.root_path, f"round_{cur_round}", "log.json")
        else:
            log_dir = path
        # Guard against a missing/None path: os.path.exists(None) raises
        # TypeError, so check log_dir first and fall back to "" gracefully.
        if not log_dir or not os.path.exists(log_dir):
            return ""
        logger.info(log_dir)
        data = load_json(log_dir, type="json")
        # Normalize the payload to a list of entries.
        if isinstance(data, dict):
            data = [data]
        elif not isinstance(data, list):
            data = list(data)
        if not data:
            return ""
        sample_size = min(3, len(data))
        random_samples = random.sample(data, sample_size)
        log = ""
        for sample in random_samples:
            log += json.dumps(sample, indent=4, ensure_ascii=False) + "\n\n"
        return log

    def get_results_file_path(self, graph_path: str) -> str:
        """Return the path of ``results.json`` under ``graph_path``."""
        return os.path.join(graph_path, "results.json")

    def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
        """Build one results.json entry, timestamped with the current local time."""
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now}

    def save_results(self, json_file_path: str, data: list):
        """Write ``data`` as indented JSON to ``json_file_path``."""
        save_json(data, json_file_path, type="json", use_indent=True)

    def _load_scores(self, path=None, mode="Graph"):
        """Populate ``self.top_scores`` with per-round mean scores, best first.

        Reads ``results.json`` from ``root_path`` ("Graph" mode) or ``path``.
        """
        if mode == "Graph":
            rounds_dir = self.root_path
        else:
            rounds_dir = path
        result_file = os.path.join(rounds_dir, "results.json")
        self.top_scores = []
        data = load_json(result_file, type="json")
        df = pd.DataFrame(data)
        # Average the score across all entries recorded for the same round.
        scores_per_round = df.groupby("round")["score"].mean().to_dict()
        for round_number, average_score in scores_per_round.items():
            self.top_scores.append({"round": round_number, "score": average_score})
        self.top_scores.sort(key=lambda x: x["score"], reverse=True)
        return self.top_scores
def test_case_2_test_function(solution: str, test_case: str, entry_point: str):
tester_function = f"""
{solution}
def check(candidate):
{test_case}
def test_check():
check({entry_point})
test_check()
"""
return tester_function
|