JSCPPProgrammer's picture
Initial: GenSearcher workflow + FireRed /generate adapter + Gradio
80b7188 verified
import argparse
import concurrent.futures
import json
import os
import pandas as pd
from openai import OpenAI
from tqdm import tqdm
from rllm.data.utils import TestDataset, TrainDataset, fetch_live_code_bench_system_prompt, load_dataset
from rllm.rewards.code_reward import RewardCodeFn, extract_code_from_model
from rllm.rewards.reward_types import RewardConfig, RewardInput, RewardType
# Instruction prefix placed in front of the problem text when the dataset is
# "humanevalplus"; the trailing blank line separates it from the problem.
HUMANEVALPLUS_PROMPT = (
    "Think step by step: please provide an efficient and self-contained "
    "Python script that solves the following problem in a markdown code block:"
    "\n\n"
)
def generate_response(client, prompt, model="o3-mini", reasoning_effort="low"):
    """Send a single-turn chat completion request and return the reply text.

    Args:
        client: Object exposing ``chat.completions.create(...)`` (an OpenAI
            client in this script).
        prompt: Text sent as the only user message.
        model: Model name forwarded to the API.
        reasoning_effort: Reasoning effort setting forwarded to the API.

    Returns:
        The first choice's message content with surrounding whitespace
        stripped, or ``None`` if the request raised, returned no choices, or
        returned a message without text content.
    """
    messages = [{"role": "user", "content": prompt}]
    try:
        response = client.chat.completions.create(
            model=model,
            reasoning_effort=reasoning_effort,
            messages=messages,
        )
    except Exception as e:
        # Best-effort: a failed request is reported and signalled with None so
        # the caller can keep processing the remaining prompts.
        print(f"Error generating response: {e}")
        print(f"Prompt: {prompt}")
        return None

    # The reply itself can be unusable (no choices, or content is None);
    # treat that like any other failed generation instead of raising.
    choices = getattr(response, "choices", None)
    if not choices:
        print("Error generating response: no choices returned")
        print(f"Prompt: {prompt}")
        return None
    content = choices[0].message.content
    if content is None:
        print("Error generating response: empty message content")
        print(f"Prompt: {prompt}")
        return None
    return content.strip()
def preload_data(dataset_name):
    """Resolve a code dataset by name and load it.

    The upper-cased name is looked up first among ``TestDataset.Code`` members
    and then among ``TrainDataset.Code`` members; the first match is passed to
    ``load_dataset`` and its result returned.

    Raises:
        ValueError: If neither enum has a member with that name.
    """
    member_name = dataset_name.upper()
    print(f"Loading dataset {member_name}...")
    # Test datasets take precedence over train datasets on a name clash.
    for holder in (TestDataset, TrainDataset):
        if member_name in holder.Code.__members__:
            return load_dataset(holder.Code[member_name])
    raise ValueError(f"Dataset {dataset_name} not found.")
def generation_loop(client, dataset_name, model, reasoning_effort, output_dir, n=1, skip_rewards=False, max_workers=32):
    """Generate (or reload) ``n`` responses per problem, score them, and save the results.

    If ``<output_dir>/responses.parquet`` already exists, its rows are used as
    the dataset and the stored ``responses`` column is reused instead of
    calling the model. Otherwise the dataset is loaded with ``preload_data``
    and every problem is sent to the model ``n`` times.

    Args:
        client: Client passed through to ``generate_response``.
        dataset_name: Dataset identifier; ``"humanevalplus"`` selects the
            HumanEval+ prompt prefix and test handling, anything else uses
            ``fetch_live_code_bench_system_prompt``.
        model: Model name passed to ``generate_response``.
        reasoning_effort: Reasoning effort passed to ``generate_response``.
        output_dir: Directory where ``responses.parquet`` and
            ``results.json`` are read/written (created if missing).
        n: Number of responses per problem.
        skip_rewards: When true, no scoring is done and every score is ``None``.
        max_workers: Size of the thread pool used to process problems.

    Side effects:
        Prints progress and pass@1 / pass@n, writes ``responses.parquet``
        (dataset plus ``responses`` and ``scores`` columns) and
        ``results.json`` (the per-problem score lists) into ``output_dir``.
    """
    responses_path = os.path.join(output_dir, "responses.parquet")
    skip_generation = os.path.exists(responses_path)
    if skip_generation:
        print(f"Loading existing responses from {responses_path}")
        df = pd.read_parquet(responses_path)
        dataset = df.to_dict(orient="records")
    else:
        dataset = preload_data(dataset_name)
        df = pd.json_normalize(dataset)

    is_humanevalplus = dataset_name == "humanevalplus"
    # NOTE(review): the config *class* is passed here, not an instance —
    # kept as in the original; confirm this is what RewardCodeFn expects.
    reward = RewardCodeFn(RewardConfig)

    def process_item(args):
        """Produce the responses and scores for one ``(index, problem)`` pair."""
        idx, item = args

        prompt = None
        if not skip_generation:
            # Build the final prompt exactly once per problem. Prepending the
            # HumanEval+ prefix inside the sampling loop would stack another
            # copy of the prefix onto the prompt for every sample when n > 1.
            if is_humanevalplus:
                prompt = HUMANEVALPLUS_PROMPT + item["problem"]
            else:
                prompt = fetch_live_code_bench_system_prompt(item["problem"])

        response_lst = []
        scores_lst = []
        for i in range(n):
            if skip_generation:
                response = item["responses"][i]
            else:
                response = generate_response(client, prompt, model=model, reasoning_effort=reasoning_effort)
                if response and "def solve():" in response:
                    extracted_code = extract_code_from_model(response)
                    # The model defined solve() but never invoked it: append
                    # the call so the script does something when executed.
                    if extracted_code and not extracted_code.endswith("solve()"):
                        extracted_code += "\nsolve()\n"
                        response = f"```python\n{extracted_code}```"
            response_lst.append(response)

            score = None
            if not skip_rewards:
                if response is None:
                    # Generation failed, so there is nothing to evaluate;
                    # count it as a failed attempt without calling the reward.
                    score = 0.0
                else:
                    tests = item["tests"]
                    if not is_humanevalplus and not isinstance(tests, list):
                        tests = tests.tolist()
                    input_obj = RewardInput(problem="", problem_type=RewardType.CODE, model_response=response, metadata=tests, data_source=dataset_name)
                    score = reward(input_obj).reward
            scores_lst.append(score)
        return idx, response_lst, scores_lst

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of its inputs, so the
        # collected lists already line up with the rows of df.
        results = list(tqdm(executor.map(process_item, enumerate(dataset)), total=len(dataset)))

    all_responses = [response_lst for _, response_lst, _ in results]
    all_scores = [scores_lst for _, _, scores_lst in results]

    # Calculate and display pass@1 and pass@n accuracy
    if not skip_rewards:
        total = len(all_scores)
        if total:
            pass_at_1 = sum(any(score > 0 for score in scores[:1]) for scores in all_scores) / total
            pass_at_n = sum(any(score > 0 for score in scores) for scores in all_scores) / total
            print(f"Pass@1: {pass_at_1:.4f}")
            print(f"Pass@{n}: {pass_at_n:.4f}")
        else:
            # Avoid dividing by zero when the dataset has no problems.
            print("No problems to evaluate; skipping pass@k.")

    df["responses"] = all_responses
    df["scores"] = all_scores
    os.makedirs(output_dir, exist_ok=True)
    df.to_parquet(responses_path)
    results_path = os.path.join(output_dir, "results.json")
    with open(results_path, "w") as f:
        json.dump(all_scores, f)
def main():
    """Command-line entry point: parse arguments and run ``generation_loop``.

    ``--dataset-name`` and ``--output-dir`` are required. The remaining
    options are optional and default to the values this script previously
    hard-coded (model ``o3-mini``, reasoning effort ``low``, one response per
    problem, rewards computed).

    Raises:
        SystemExit: With status 1 if ``generation_loop`` fails, after the
            error and its stack trace have been printed.
    """
    parser = argparse.ArgumentParser(description="Generate a response from the OpenAI reasoning model given a prompt.")
    # arg for specifying dataset name
    parser.add_argument("--dataset-name", type=str, required=True, help="Name of the dataset to use.")
    # arg for specifying output dir
    parser.add_argument("--output-dir", type=str, required=True, help="Output directory to save the results.")
    # optional knobs; defaults reproduce the previous fixed behaviour
    parser.add_argument("--model", type=str, default="o3-mini", help="Model name to query.")
    parser.add_argument("--reasoning-effort", type=str, default="low", help="Reasoning effort passed to the model.")
    parser.add_argument("--n", type=int, default=1, help="Number of responses to generate per problem.")
    parser.add_argument("--skip-rewards", action="store_true", help="Only generate responses; do not score them.")
    args = parser.parse_args()

    client = OpenAI()
    try:
        generation_loop(
            client,
            args.dataset_name,
            args.model,
            args.reasoning_effort,
            args.output_dir,
            n=args.n,
            skip_rewards=args.skip_rewards,
        )
    except Exception as e:
        print(f"An error occurred: {e}")
        # print stack trace
        import traceback

        traceback.print_exc()
        # Report the failure through the exit status so calling shells and
        # pipelines do not mistake a failed run for a successful one.
        raise SystemExit(1) from e


if __name__ == "__main__":
    main()