import json
import os
import shutil
import subprocess
import time
from collections import deque
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional

import torch  # noqa: F401  -- kept from the original import list

from swift.llm import ExportArguments
from swift.utils import find_free_port, get_device_count, get_logger

logger = get_logger()


@dataclass
class Experiment:
    """One experiment definition: a `swift` sub-command plus its resources.

    An experiment is launched as a shell subprocess (see ``ExpManager.run``);
    after a successful training run it may be re-queued with ``do_eval=True``
    to run ``swift eval`` on its best checkpoint.
    """

    name: str
    cmd: str
    group: str
    requirements: Dict = field(default_factory=dict)
    eval_requirements: Dict = field(default_factory=dict)
    eval_dataset: List = field(default_factory=list)
    args: Dict = field(default_factory=dict)
    env: Dict = field(default_factory=dict)
    record: Dict = field(default_factory=dict)
    create_time: Optional[float] = None
    runtime: Dict = field(default_factory=dict)
    input_args: Any = None

    # Deliberately *not* an annotated dataclass field, so `asdict` (used by
    # `to_dict`) does not serialize it.
    do_eval = False

    def __init__(self,
                 name,
                 cmd,
                 group,
                 requirements=None,
                 eval_requirements=None,
                 eval_dataset=None,
                 args=None,
                 input_args=None,
                 **kwargs):
        self.name = name
        self.cmd = cmd
        self.group = group
        self.requirements = requirements or {}
        self.args = args or {}
        self.record = {}
        # Fix: the `env` passed by `ExpManager.prepare_experiments` used to be
        # silently swallowed by **kwargs; capture it instead of dropping it.
        self.env = kwargs.get('env') or {}
        self.runtime = {}
        self.input_args = input_args
        self.eval_requirements = eval_requirements or {}
        self.eval_dataset = eval_dataset or []
        if self.cmd == 'eval':
            self.do_eval = True

    def load(self, _json):
        """Restore state from a record written by `ExpManager.write_record`."""
        self.name = _json['name']
        self.cmd = _json['cmd']
        self.requirements = _json['requirements']
        self.args = _json['args']
        self.record = _json['record']
        self.env = _json['env']
        self.create_time = _json['create_time']

    @property
    def priority(self):
        # Experiments requiring fewer GPUs run first (see `ExpManager.begin`).
        return self.requirements.get('gpu', 0)

    def to_dict(self):
        """Serializable view of this experiment (transient fields removed)."""
        _dict = asdict(self)
        _dict.pop('runtime')
        _dict.pop('input_args')
        return _dict


class ExpManager:
    """Schedules experiments onto free GPUs and polls their subprocesses."""

    RESULT_FILE = 'result.jsonl'

    def __init__(self):
        self.exps = []  # currently-running experiments

    def assert_gpu_not_overlap(self):
        """Sanity check: no two running experiments share a GPU."""
        seen = set()
        for exp in self.exps:
            gpus = exp.runtime['env']['CUDA_VISIBLE_DEVICES'].split(',')
            if seen & set(gpus):
                raise ValueError(f'GPU overlap: {self.exps}!')
            seen.update(gpus)

    def _launch(self, exp: Experiment, runtime: Dict, log_file: str):
        """Start `runtime['running_cmd']` as a shell subprocess and track it."""
        exp.runtime = runtime
        envs = deepcopy(runtime.get('env', {}))
        # NOTE(review): os.environ is applied *after* the allocated env, so an
        # externally-set CUDA_VISIBLE_DEVICES would override the allocation —
        # confirm this precedence is intended.
        envs.update(os.environ)
        logger.info(f'Running cmd: {runtime["running_cmd"]}, env: {runtime.get("env", {})}')
        os.makedirs('exp', exist_ok=True)
        exp.handler = subprocess.Popen(
            runtime['running_cmd'] + f' > {log_file} 2>&1', env=envs, shell=True)
        self.exps.append(exp)
        self.assert_gpu_not_overlap()

    def run(self, exp: Experiment):
        """Launch `exp` (or its eval follow-up), skipping already-finished runs."""
        saved_file = os.path.join(exp.input_args.save_dir, exp.name + '.json')
        if os.path.exists(saved_file):
            with open(saved_file, 'r', encoding='utf-8') as f:
                _json = json.load(f)
            if exp.eval_dataset and 'eval_result' not in _json['record']:
                # Training is done but the evaluation is still missing.
                if not exp.do_eval:
                    logger.info(f'Experiment {exp.name} need eval, load from file.')
                    exp.load(_json)
                    exp.do_eval = True
            else:
                # Fix: `logger.warn` is a deprecated alias of `logger.warning`.
                logger.warning(f'Experiment {exp.name} already done, skip')
                return

        if exp.do_eval:
            self._launch(exp, self._build_eval_cmd(exp),
                         os.path.join('exp', f'{exp.name}.eval.log'))
            return

        if any(exp.name == e.name for e in self.exps):
            raise ValueError(f'Why exp name duplicate? {exp.name}')
        elif exp.cmd == 'export' and any(e.cmd == 'export' for e in self.exps):
            # Export tasks are exclusive; AssertionError is treated as a
            # retryable condition by `begin`.
            raise AssertionError('Cannot run parallel export task.')
        else:
            exp.create_time = time.time()
            self._launch(exp, self._build_cmd(exp),
                         os.path.join('exp', f'{exp.name}.{exp.cmd}.log'))

    def _build_eval_cmd(self, exp: Experiment):
        """Build the `swift eval` command line and allocate GPUs for it."""
        gpu = exp.eval_requirements.get('gpu', None)
        env = {}
        allocated = []
        if gpu:
            allocated = self._find_free_gpu(int(gpu))
            assert allocated, 'No free gpu for now!'
            allocated = [str(g) for g in allocated]
            env['CUDA_VISIBLE_DEVICES'] = ','.join(allocated)
        best_model_checkpoint = exp.record.get('best_model_checkpoint')
        eval_dataset = exp.eval_dataset
        # NOTE(review): if best_model_checkpoint is None, `cmd` is never bound
        # and the return below raises NameError — confirm callers guarantee it
        # is present before do_eval is scheduled.
        if best_model_checkpoint is not None:
            if not os.path.exists(os.path.join(best_model_checkpoint, 'args.json')):
                cmd = f'swift eval --ckpt_dir {best_model_checkpoint} ' \
                    + f'--infer_backend pt --train_type full --eval_dataset {" ".join(eval_dataset)}'
            else:
                cmd = f'swift eval --model {exp.args.get("model")} --infer_backend pt ' \
                    f'--eval_dataset {" ".join(eval_dataset)}'
        return {
            'running_cmd': cmd,
            'gpu': allocated,
            'env': env,
        }

    def _build_cmd(self, exp: Experiment):
        """Build the train/export command line, allocate GPUs and DDP settings.

        Raises:
            ValueError: if `exp.cmd` is not one of sft/rlhf/export.
        """
        gpu = exp.requirements.get('gpu', None)
        env = {}
        allocated = []
        if gpu:
            allocated = self._find_free_gpu(int(gpu))
            assert allocated, 'No free gpu for now!'
            allocated = [str(g) for g in allocated]
            env['CUDA_VISIBLE_DEVICES'] = ','.join(allocated)
        if int(exp.requirements.get('ddp', 1)) > 1:
            env['NPROC_PER_NODE'] = exp.requirements.get('ddp')
            env['MASTER_PORT'] = str(find_free_port())

        args = exp.args
        if exp.cmd in ('sft', 'rlhf'):
            # The sft/rlhf branches were duplicated in the original; the only
            # variance is the argument class used to resolve the output dirs.
            if exp.cmd == 'sft':
                from swift.llm import TrainArguments as args_cls
            else:
                from swift.llm import RLHFArguments as args_cls
            parsed = args_cls(**args)
            args['output_dir'] = parsed.output_dir
            args['logging_dir'] = parsed.logging_dir
            args['add_version'] = False
            os.makedirs(parsed.output_dir, exist_ok=True)
            os.makedirs(parsed.logging_dir, exist_ok=True)
            cmd = f'swift {exp.cmd} '
        elif exp.cmd == 'export':
            cmd = 'swift export '
        else:
            raise ValueError(f'Unsupported cmd type: {exp.cmd}')
        for key, value in args.items():
            cmd += f' --{key} {value}'
        return {
            'running_cmd': cmd,
            'gpu': allocated,
            'env': env,
            'logging_dir': args.get('logging_dir'),
            'output_dir': args.get('output_dir', args.get('ckpt_dir')),
        }

    def _find_free_gpu(self, n):
        """Return `n` free GPU indices, or None if fewer than `n` are free."""
        used = set()
        for exp in self.exps:
            used.update(exp.runtime.get('gpu', set()))
        used = {int(g) for g in used}
        free_gpu = set(range(get_device_count())) - used
        if len(free_gpu) < n:
            return None
        return list(free_gpu)[:n]

    def prepare_experiments(self, args: Any):
        """Expand every config file in `args.config` into Experiment objects.

        Per-experiment `args`/`requirements`/`env` entries override the
        file-level defaults.
        """
        experiments = []
        for config_file in args.config:
            with open(config_file, 'r', encoding='utf-8') as f:
                group = os.path.basename(config_file)
                group = group[:-5]  # strip the '.json' suffix
                content = json.load(f)
                exps = content['experiment']
                for exp in exps:
                    main_cfg = deepcopy(content)
                    name = exp['name']
                    cmd = main_cfg['cmd']
                    run_args = main_cfg['args']
                    env = main_cfg.get('env', {})
                    requirements = main_cfg.get('requirements', {})
                    eval_requirements = main_cfg.get('eval_requirements', {})
                    # Fix: default was `{}` although eval_dataset is a list.
                    eval_dataset = main_cfg.get('eval_dataset', [])
                    if 'args' in exp:
                        run_args.update(exp['args'])
                    if 'requirements' in exp:
                        requirements.update(exp['requirements'])
                    if 'env' in exp:
                        env.update(exp['env'])
                    experiments.append(
                        Experiment(
                            group=group,
                            name=name,
                            cmd=cmd,
                            args=run_args,
                            env=env,
                            requirements=requirements,
                            eval_requirements=eval_requirements,
                            eval_dataset=eval_dataset,
                            input_args=args))
        return experiments

    @staticmethod
    def _get_metric(exp: Experiment):
        """Parse a finished experiment's output; return a metrics dict or None."""
        if exp.do_eval:
            log_file = os.path.join('exp', f'{exp.name}.eval.log')
            if os.path.isfile(log_file):
                with open(log_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        if 'Final report:' in line:
                            # The report is printed as a Python dict literal;
                            # swap quotes so it parses as JSON.
                            return json.loads(line.split('Final report:')[1].replace('\'', '"'))
        elif exp.cmd == 'export':
            exp_args = ExportArguments(**exp.args)
            if exp_args.quant_bits > 0:
                if exp_args.ckpt_dir is None:
                    path = f'{exp_args.model_type}-{exp_args.quant_method}-int{exp_args.quant_bits}'
                else:
                    ckpt_dir, ckpt_name = os.path.split(exp_args.ckpt_dir)
                    path = os.path.join(ckpt_dir, f'{ckpt_name}-{exp_args.quant_method}-int{exp_args.quant_bits}')
            else:
                ckpt_dir, ckpt_name = os.path.split(exp_args.ckpt_dir)
                path = os.path.join(ckpt_dir, f'{ckpt_name}-merged')
            if os.path.exists(path):
                # Move the exported artifact under a folder named after the
                # experiment so parallel exports do not collide.
                shutil.rmtree(exp.name, ignore_errors=True)
                os.makedirs(exp.name, exist_ok=True)
                shutil.move(path, os.path.join(exp.name, path))
                return {
                    'best_model_checkpoint': os.path.join(exp.name, path),
                }
        else:
            # NOTE(review): a missing 'logging_dir' in runtime makes the join
            # below raise TypeError — presumably _build_cmd always set it.
            logging_dir = exp.runtime.get('logging_dir')
            logging_file = os.path.join(logging_dir, '..', 'logging.jsonl')
            if os.path.isfile(logging_file):
                with open(logging_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        if 'model_info' in line:
                            return json.loads(line)
        return None

    @staticmethod
    def write_record(exp: Experiment):
        """Persist the experiment's serializable state to `<save_dir>/<name>.json`."""
        target_dir = exp.input_args.save_dir
        file = os.path.join(target_dir, exp.name + '.json')
        with open(file, 'w', encoding='utf-8') as f:
            f.write(json.dumps(exp.to_dict()) + '\n')

    def _poll(self):
        """Block until at least one running experiment finishes, then harvest it."""
        while True:
            time.sleep(5)
            has_finished = False
            for exp in self.exps:
                rt = exp.handler.poll()
                if rt is None:
                    continue
                has_finished = True
                if rt == 0:
                    if not exp.do_eval:
                        all_metric = self._get_metric(exp)
                        if all_metric:
                            exp.record.update(all_metric)
                            if exp.eval_dataset:
                                # Re-queue the follow-up evaluation run.
                                exp.do_eval = True
                                self.exp_queue.appendleft(exp)
                            self.write_record(exp)
                        else:
                            logger.error(f'Running {exp.name} task, but no result found')
                    else:
                        all_metric = self._get_metric(exp)
                        exp.record['eval_result'] = all_metric
                        if all_metric:
                            self.write_record(exp)
                        else:
                            logger.error(f'Running {exp.name} eval task, but no eval result found')
                logger.info(f'Running {exp.name} finished with return code: {rt}')
            if has_finished:
                # Keep only the experiments that are still running.
                self.exps = [exp for exp in self.exps if exp.handler.poll() is None]
                break

    def begin(self, args: Any):
        """Main loop: prepare experiments, schedule by GPU demand, run to completion."""
        exps = self.prepare_experiments(args)
        logger.info(f'all exps: {exps}')
        exps.sort(key=lambda e: e.priority)
        self.exp_queue = deque(exps)

        while len(self.exp_queue) or len(self.exps) > 0:
            while len(self.exp_queue):
                try:
                    logger.info(f'Running exp: {self.exp_queue[0].name}')
                    self.run(self.exp_queue[0])
                except Exception as e:
                    if not isinstance(e, AssertionError):
                        logger.error(f'Adding exp {self.exp_queue[0].name} error because of:')
                        logger.error(e)
                        self.exp_queue.popleft()
                    else:
                        # Fix: original passed `str(e)` as a positional logging
                        # argument next to an f-string with no placeholder, so
                        # the reason was never rendered.
                        logger.info(f'Adding exp {self.exp_queue[0].name} error because of: {e}')
                        if 'no free gpu' in str(e).lower():
                            # Wait for a running experiment to release GPUs.
                            break
                        else:
                            continue
                else:
                    self.exp_queue.popleft()
            self._poll()
        logger.info(f'Run task finished because of exp queue: {self.exp_queue} and exps: {self.exps}')


def find_all_config(dir_or_file: str):
    """Return `dir_or_file` itself if it is a file, otherwise every .json file
    found recursively under it (skipping ipynb checkpoint directories)."""
    if os.path.isfile(dir_or_file):
        return [dir_or_file]
    configs = []
    for dirpath, _dirnames, filenames in os.walk(dir_or_file):
        for name in filenames:
            if name.endswith('.json') and 'ipynb' not in dirpath:
                configs.append(os.path.join(dirpath, name))
    return configs