File size: 3,742 Bytes
80b7188
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import argparse
import json
import os

import pandas as pd
from datasets import load_dataset
from verl.utils.hdfs_io import copy, makedirs

import rllm
from rllm.agents.system_prompts import SWE_SYSTEM_PROMPT, SWE_USER_PROMPT

# Get the directory for rLLM repo (rllm.__file__)
RLLM_DIR = os.path.dirname(os.path.dirname(os.path.abspath(rllm.__file__)))

SWE_DATASETS = [
    "R2E-Gym/R2E-Gym-Subset",
    "R2E-Gym/R2E-Gym-Lite",
    "R2E-Gym/R2E-Gym-V1",
    "R2E-Gym/SWE-Bench-Lite",
    "R2E-Gym/SWE-Bench-Verified",
    "r2e-edits/SweSmith-RL-Dataset",
]


def main():
    parser = argparse.ArgumentParser(description="Generate trajectories using specified environment and policy.")
    parser.add_argument("--local_dir", default=os.path.join(RLLM_DIR, "data/swe"))
    parser.add_argument("--hdfs_dir", default=None)

    args = parser.parse_args()

    local_dir = os.path.expanduser(args.local_dir)
    os.makedirs(local_dir, exist_ok=True)

    hdfs_dir = args.hdfs_dir

    def make_map_fn():
        def process_fn(row):
            row_dict = dict(row)
            problem_statement = row_dict.get("problem_statement", "")
            return {
                "data_source": "swe",
                "prompt": [{"role": "system", "content": SWE_SYSTEM_PROMPT}, {"role": "user", "content": SWE_USER_PROMPT.format(problem_statement=problem_statement)}],
                "ability": "swe",
                "reward_model": {"style": "rule", "ground_truth": ""},
                "extra_info": json.dumps(row_dict),
            }

        return process_fn

    process_fn = make_map_fn()

    for dataset_name in SWE_DATASETS:
        print(f"Processing dataset: {dataset_name}")
        try:
            # Load the dataset dictionary (which contains splits like 'train' or 'test')
            dataset_splits = load_dataset(dataset_name)
        except Exception as e:
            print(f"Failed to load dataset {dataset_name}: {e}")
            continue

        output_name_base = dataset_name.split("/")[-1].replace("-", "_")  # Use underscore for consistency

        # Determine which split exists ('train' or 'test')
        if "train" in dataset_splits:
            split_name = "train"
            split_data = dataset_splits["train"]
        elif "test" in dataset_splits:
            split_name = "test"
            split_data = dataset_splits["test"]
        else:
            print(f"Skipping {dataset_name} as it contains neither 'train' nor 'test' split.")
            continue

        print(f"Using '{split_name}' split for {dataset_name}")

        # Process the data from the identified split
        processed_data = [process_fn(row) for row in split_data]

        # Create DataFrame and save to a single parquet file
        df = pd.DataFrame(processed_data)
        output_filepath = os.path.join(local_dir, f"{output_name_base}.parquet")
        df.to_parquet(output_filepath)
        print(f"Saved {len(df)} records from '{split_name}' split to {output_filepath}")

        # Copy to HDFS if needed
        if hdfs_dir is not None:
            hdfs_filepath = os.path.join(hdfs_dir, f"{output_name_base}.parquet")
            # Ensure HDFS directory exists before copying
            # Assuming makedirs handles potential race conditions or existing dirs
            try:
                makedirs(hdfs_dir)
                copy(src=output_filepath, dst=hdfs_filepath)
                print(f"Copied {output_name_base}.parquet to HDFS: {hdfs_filepath}")
            except Exception as e:
                print(f"Failed to copy {output_filepath} to HDFS {hdfs_filepath}: {e}")


if __name__ == "__main__":
    main()