JSCPPProgrammer's picture
Initial: GenSearcher workflow + FireRed /generate adapter + Gradio
80b7188 verified
import argparse
import json
import os
import pandas as pd
from datasets import load_dataset
from verl.utils.hdfs_io import copy, makedirs
import rllm
from rllm.agents.system_prompts import SWE_SYSTEM_PROMPT, SWE_USER_PROMPT
# Get the directory for rLLM repo (rllm.__file__)
RLLM_DIR = os.path.dirname(os.path.dirname(os.path.abspath(rllm.__file__)))
SWE_DATASETS = [
"R2E-Gym/R2E-Gym-Subset",
"R2E-Gym/R2E-Gym-Lite",
"R2E-Gym/R2E-Gym-V1",
"R2E-Gym/SWE-Bench-Lite",
"R2E-Gym/SWE-Bench-Verified",
"r2e-edits/SweSmith-RL-Dataset",
]
def main():
parser = argparse.ArgumentParser(description="Generate trajectories using specified environment and policy.")
parser.add_argument("--local_dir", default=os.path.join(RLLM_DIR, "data/swe"))
parser.add_argument("--hdfs_dir", default=None)
args = parser.parse_args()
local_dir = os.path.expanduser(args.local_dir)
os.makedirs(local_dir, exist_ok=True)
hdfs_dir = args.hdfs_dir
def make_map_fn():
def process_fn(row):
row_dict = dict(row)
problem_statement = row_dict.get("problem_statement", "")
return {
"data_source": "swe",
"prompt": [{"role": "system", "content": SWE_SYSTEM_PROMPT}, {"role": "user", "content": SWE_USER_PROMPT.format(problem_statement=problem_statement)}],
"ability": "swe",
"reward_model": {"style": "rule", "ground_truth": ""},
"extra_info": json.dumps(row_dict),
}
return process_fn
process_fn = make_map_fn()
for dataset_name in SWE_DATASETS:
print(f"Processing dataset: {dataset_name}")
try:
# Load the dataset dictionary (which contains splits like 'train' or 'test')
dataset_splits = load_dataset(dataset_name)
except Exception as e:
print(f"Failed to load dataset {dataset_name}: {e}")
continue
output_name_base = dataset_name.split("/")[-1].replace("-", "_") # Use underscore for consistency
# Determine which split exists ('train' or 'test')
if "train" in dataset_splits:
split_name = "train"
split_data = dataset_splits["train"]
elif "test" in dataset_splits:
split_name = "test"
split_data = dataset_splits["test"]
else:
print(f"Skipping {dataset_name} as it contains neither 'train' nor 'test' split.")
continue
print(f"Using '{split_name}' split for {dataset_name}")
# Process the data from the identified split
processed_data = [process_fn(row) for row in split_data]
# Create DataFrame and save to a single parquet file
df = pd.DataFrame(processed_data)
output_filepath = os.path.join(local_dir, f"{output_name_base}.parquet")
df.to_parquet(output_filepath)
print(f"Saved {len(df)} records from '{split_name}' split to {output_filepath}")
# Copy to HDFS if needed
if hdfs_dir is not None:
hdfs_filepath = os.path.join(hdfs_dir, f"{output_name_base}.parquet")
# Ensure HDFS directory exists before copying
# Assuming makedirs handles potential race conditions or existing dirs
try:
makedirs(hdfs_dir)
copy(src=output_filepath, dst=hdfs_filepath)
print(f"Copied {output_name_base}.parquet to HDFS: {hdfs_filepath}")
except Exception as e:
print(f"Failed to copy {output_filepath} to HDFS {hdfs_filepath}: {e}")
if __name__ == "__main__":
main()