import os
import shutil
import subprocess
import time
from typing import List, Optional

from swift.utils import get_device_count

# NOTE: this script supports at most 8 GPUs in a single node; for multi-node runs, please add your own launch logic.

# Prefix that activates your conda env before each command (leave empty if not needed), e.g.:
# conda_prefix = 'source /root/miniconda3/etc/profile.d/conda.sh && conda activate py311 && '
conda_prefix = ''


def do_sample(model: str, model_type: str, dataset: List[str], iter: int):
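    """Two-stage sampling for one RFT iteration.

    Stage 1 generates num_return_sequences responses per prompt with vllm on each GPU
    and caches them to jsonl; stage 2 reloads the cache with `--sampler_engine no` and
    keeps only the responses that pass the ORM and PRM filters.
    Returns the list of filtered jsonl files.
    """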
    device_count = get_device_count()
    handlers = []
    datasets = []
    # Sampling cache: generate and cache samples first, so that the inference engine and
    # the PRM do not run at the same time.
    # Why lmdeploy rather than vllm? We found that the responses generated by lmdeploy are
    # more similar to each other than those of vllm.
    for device in range(device_count):
        sample_cmd = (f'{conda_prefix} USE_OPENCOMPASS_EVALUATOR=True CUDA_VISIBLE_DEVICES={device} swift sample '
                      f'--model {model} --model_type {model_type} '
                      f'--dataset {" ".join(dataset)} '
                      f'--data_range {device} {device_count} '
                      f'--max_length 2048 '
                      f'--system "You are a math model, you should **think step by step** carefully, '
                      f'and always consider the basic math principles to avoid calculation mistakes. '
                      f'Give the final answer wrapped with \\boxed{{}}" '
                      f'--load_args false '
                      f'--sampler_engine vllm '
                      f'--max_new_tokens 768 '
                      f'--override_exist_file true '
                      f'--num_sampling_per_gpu_batch_size 1 '
                      f'--num_return_sequences 64 '
                      f'--cache_files sample_output/iter_{iter}_proc_{device}_cache.jsonl '
                      f'--output_file iter_{iter}_proc_{device}_cache.jsonl '
                      f'--top_p 1.0 '
                      f'--temperature 1.0 ')
        print(f'Sampling caches of iter {iter}, part {device}.', flush=True)
        env = os.environ.copy()
        env['CUDA_VISIBLE_DEVICES'] = str(device)
        handler = subprocess.Popen(
            f'{sample_cmd}' + f' > logs/sample_iter_{iter}_proc_{device}_cache.log 2>&1',
            env=env,
            shell=True,
            executable='/bin/bash')
        handlers.append(handler)

    for proc, handler in enumerate(handlers):
        handler.wait()
        assert os.path.exists(os.path.join('sample_output', f'iter_{iter}_proc_{proc}_cache.jsonl'))

    handlers = []
    # Sample again, this time to filter with ORM & PRM
    # Provide your PRM model or PRM name(add PRM in plugin/prm.py first)
    # You can define your custom PRM logic in the plugin
    # (like, split your steps, use the worst score/last score/avg score)
    for device in range(device_count):
        sample_cmd = (
            f'{conda_prefix} USE_OPENCOMPASS_EVALUATOR=True CUDA_VISIBLE_DEVICES={device} swift sample '
            f'--model {model} --model_type {model_type} '  # change to --resume_from_checkpoint to use the latest optimizer state # noqa
            f'--dataset {" ".join(dataset)} '
            f'--data_range {device} {device_count} '
            f'--max_length 2048 '
            f'--system "You are a math model, you should **think step by step** carefully, '
            f'and always consider the basic math principles to avoid calculation mistakes. '
            f'Give the final answer wrapped with \\boxed{{}}" '
            f'--load_args false '
            f'--sampler_engine no '
            f'--orm_model math '  # 'math' is defined in plugin/orm.py
            f'--prm_model Qwen/Qwen2.5-Math-PRM-7B '
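            # the PRM acceptance threshold ramps up by 0.1 per iteration, capped at 0.9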
            f'--prm_threshold {min(0.7 + 0.1*iter, 0.9)} '
            f'--max_new_tokens 768 '
            f'--override_exist_file true '  # overwrite existing sample files
            f'--num_sampling_per_gpu_batch_size 1 '
            f'--num_return_sequences 64 '
            f'--output_file iter_{iter}_proc_{device}_sampling.jsonl '
            f'--cache_files sample_output/iter_{iter}_proc_{device}_cache.jsonl ')
        print(f'Sampling iter {iter}, part {device}.', flush=True)
        env = os.environ.copy()
        env['CUDA_VISIBLE_DEVICES'] = str(device)
        handler = subprocess.Popen(
            f'{sample_cmd}' + f' > logs/sample_iter_{iter}_proc_{device}.log 2>&1',
            env=env,
            shell=True,
            executable='/bin/bash')
        handlers.append(handler)

    for proc, handler in enumerate(handlers):
        handler.wait()
        assert os.path.exists(os.path.join('sample_output', f'iter_{iter}_proc_{proc}_sampling.jsonl')), (
            f'{os.path.join("sample_output", f"iter_{iter}_proc_{proc}_sampling.jsonl")} not exists, '
            'please check the sample logs to get the detail error.')
        datasets.append(os.path.join('sample_output', f'iter_{iter}_proc_{proc}_sampling.jsonl'))
    print(f'Sampling done, files: {datasets}', flush=True)
    return datasets


def do_train(model: str, model_type: str, datasets: List[str], iter: int, cmd: str = 'sft'):
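    """Fine-tune `model` on the filtered sample files for one epoch.

    Uses full-parameter SFT by default; pass cmd='rlhf' to switch to DPO.
    Parses the checkpoint path from the training log and returns it.
    """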
    gpu_prefix = ''
    ds_config = ''
    if get_device_count() > 1:
        gpu_prefix = f'NPROC_PER_NODE={get_device_count()} '
        ds_config = '--deepspeed zero3 '
    extra_args = ''
    if cmd == 'rlhf':
        extra_args = '--rlhf_type dpo --beta 0.3 '  # or use another reinforcement learning method supported by swift
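    # gradient accumulation targets a global batch of 128: 2 (per-device) * device_count * ga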
    ga = 128 // get_device_count() // 2
    train_cmd = (f'{conda_prefix} {gpu_prefix} swift {cmd} '
                 f'--model {model} --model_type {model_type} '
                 f'--dataset {" ".join(datasets)} '
                 f'--max_length 2048 '
                 f'--num_train_epochs 1 '
                 f'--load_args false '
                 f'--train_type full '
                 f'{extra_args} '
                 f'--eval_strategy no '
                 f'--split_dataset_ratio 0 '
                 f'--per_device_train_batch_size 2 '
                 f'--gradient_accumulation_steps {ga} '
                 f'--save_steps 1 '
                 f'--save_strategy epoch '
                 f'{ds_config} '
                 f'--learning_rate 4e-6 ')

    print(f'Training iter {iter}.', flush=True)
    handler = subprocess.Popen(
        f'{train_cmd}' + f' > logs/train_iter_{iter}.log 2>&1',
        shell=True,
        env=os.environ.copy(),
        executable='/bin/bash')
    handler.wait()
    ckpt = None
    with open(f'logs/train_iter_{iter}.log', 'r') as f:
        for line in f.readlines():
            if 'last_model_checkpoint: ' in line:
                ckpt = line.split('last_model_checkpoint: ')[1]
                break
    assert ckpt is not None
    print(f'Training done, ckpt: {ckpt.strip()}.', flush=True)
    return ckpt.strip()


def do_eval(model: str, model_type: str, iter: Optional[int]):
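    """Evaluate `model` on competition_math and return the parsed accuracy.

    `iter` is only used to name the log file; pass None to evaluate the original model.
    """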
    eval_cmd = (
        f'{conda_prefix} swift eval '
        '--eval_dataset competition_math '  # change this to eval another dataset
        '--infer_backend vllm --eval_limit 500 '
        f'--model {model} --model_type {model_type} '
        '--system "You are a math model, you should **think step by step** carefully, '
        'and always consider the basic math principles to avoid calculation mistakes. '
        'Give the final answer wrapped with \\boxed{}"')
    print('Evaluating.', flush=True)
    # Replace the original dataset with math.json; this is only for testing, comment it out if not needed
    replace_math_dataset()

    if iter is None:
        iter = 'origin'
    env = os.environ.copy()
    env['CUDA_VISIBLE_DEVICES'] = '0'
    handler = subprocess.Popen(
        f'{eval_cmd}' + f' > logs/eval_iter_{iter}.log 2>&1', shell=True, env=env, executable='/bin/bash')
    handler.wait()

    acc = None
    # Parse the accuracy from the opencompass result table in the log, e.g.:
    # | math | 393424 | accuracy | gen | 39.00 |
    with open(f'logs/eval_iter_{iter}.log', 'r') as f:
        for line in f.readlines():
            if 'Level 5' in line and 'AveragePass@1' in line:
                parts = [p for p in line.split('|') if p.strip()]
                acc = float(parts[-2])
                break

    print(f'Iter {iter} eval done with acc: {acc}.', flush=True)
    return acc


def replace_math_dataset():
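    """Overwrite opencompass' cached math.json with the subset shipped in examples/train/rft."""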
    # Note: this may fail because it is specific to this math test; you must run
    # `swift eval --eval_dataset math` once beforehand so that opencompass creates
    # the folder.
    # You can also keep the original math dataset: just comment out this call.
    user_dir = os.path.expanduser('~')
    if os.path.exists(os.path.join(user_dir, '.cache', 'opencompass', 'data', 'math', 'math.json')):
        os.remove(os.path.join(user_dir, '.cache', 'opencompass', 'data', 'math', 'math.json'))
    shutil.copy(
        os.path.join('examples', 'train', 'rft', 'math.json'),
        os.path.join(user_dir, '.cache', 'opencompass', 'data', 'math', 'math.json'))


def main():
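    """Run 5 RFT iterations: sample with ORM/PRM rejection filtering, train, then evaluate."""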
    os.makedirs('logs', exist_ok=True)
    max_acc = 0.
    first_model = 'Qwen/Qwen2.5-Math-7B-Instruct'
    model_type = 'qwen2_5_math'

    eval_original = False
    if eval_original:
        # eval the original model first to get a baseline accuracy
        do_eval(first_model, model_type, None)

    model = first_model
    for i in range(5):
        ts = time.time()
        datasets = do_sample(model, model_type, ['tastelikefeet/competition_math'], i)
        # add custom data filter here, for example: length or diversity control
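        # A minimal sketch of such a filter (hypothetical: assumes each jsonl row carries
        # a 'messages' list whose last entry is the sampled response):
        # import json
        # for file in datasets:
        #     with open(file, 'r') as f:
        #         rows = [json.loads(line) for line in f]
        #     rows = [r for r in rows if len(r['messages'][-1]['content']) < 4096]
        #     with open(file, 'w') as f:
        #         f.writelines(json.dumps(r) + '\n' for r in rows)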
        print(f'do sample cost: {(time.time()-ts) / 60:.1f} minutes.', flush=True)
        ts = time.time()
        # to also train on the original dataset, append it to `datasets` here
        # to retrain from the original model every time, pass first_model instead of model
        ckpt = do_train(model, model_type, datasets, i)
        print(f'do train cost: {(time.time() - ts) / 60:.1f} minutes.', flush=True)
        ts = time.time()
        acc = do_eval(ckpt, model_type, i)
        print(f'do eval cost: {(time.time() - ts) / 60:.1f} minutes.', flush=True)
        if acc > max_acc:
            max_acc = acc
        model = ckpt
        print(f'acc: {acc}, updating model to: {model}', flush=True)


if __name__ == '__main__':
    main()