import json
import os
import shutil
import subprocess
import time
from collections import deque
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List

from swift.arguments import ExportArguments
from swift.utils import find_free_port, get_device_count, get_logger

logger = get_logger()


@dataclass
class Experiment:
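    """A single benchmark experiment defined in a config file.

    `cmd` is the swift sub-command to run ('sft', 'rlhf', 'export' or 'eval'),
    `args` are its command-line arguments, and `requirements`/`eval_requirements`
    describe the GPUs needed for training and evaluation. `record`, `runtime`
    and `create_time` are bookkeeping fields filled in by ExpManager.
    """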

    name: str

    cmd: str

    group: str

    requirements: Dict = field(default_factory=dict)

    eval_requirements: Dict = field(default_factory=dict)

    eval_dataset: List = field(default_factory=list)

    args: Dict = field(default_factory=dict)

    env: Dict = field(default_factory=dict)

    record: Dict = field(default_factory=dict)

    create_time: float = None

    runtime: Dict = field(default_factory=dict)

    input_args: Any = None

    do_eval = False

    def __init__(self,
                 name,
                 cmd,
                 group,
                 requirements=None,
                 eval_requirements=None,
                 eval_dataset=None,
                 args=None,
                 env=None,
                 input_args=None,
                 **kwargs):
        self.name = name
        self.cmd = cmd
        self.group = group
        self.requirements = requirements or {}
        self.args = args or {}
        self.record = {}
        # Environment variables declared in the experiment config.
        self.env = env or {}
        self.runtime = {}
        self.input_args = input_args
        self.eval_requirements = eval_requirements or {}
        self.eval_dataset = eval_dataset or []
        if self.cmd == 'eval':
            self.do_eval = True

    def load(self, _json):
        self.name = _json['name']
        self.cmd = _json['cmd']
        self.requirements = _json['requirements']
        self.args = _json['args']
        self.record = _json['record']
        self.env = _json['env']
        self.create_time = _json['create_time']

    @property
    def priority(self):
        return self.requirements.get('gpu', 0)

    def to_dict(self):
        _dict = asdict(self)
        _dict.pop('runtime')
        _dict.pop('input_args')
        return _dict


class ExpManager:
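    """Schedules experiments on the free GPUs of the local machine.

    Experiments are queued by `begin`, launched as `swift` subprocesses by `run`,
    and polled until they finish; collected metrics are written to
    `<save_dir>/<experiment name>.json`.
    """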

    RESULT_FILE = 'result.jsonl'

    def __init__(self):
        self.exps = []

    def assert_gpu_not_overlap(self):
        all_gpus = set()
        for exp in self.exps:
            gpus = exp.runtime['env'].get('CUDA_VISIBLE_DEVICES', '')
            gpus = set(gpus.split(',')) if gpus else set()
            if all_gpus & gpus:
                raise ValueError(f'GPU overlap: {self.exps}!')
            all_gpus.update(gpus)

    def run(self, exp: Experiment):
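        """Launch a single experiment as a `swift` subprocess.

        If a result file for this experiment already exists, the experiment is
        skipped unless its eval result is still missing, in which case it is
        reloaded and re-run in eval mode.
        """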
        if os.path.exists(os.path.join(exp.input_args.save_dir, exp.name + '.json')):
            with open(os.path.join(exp.input_args.save_dir, exp.name + '.json'), 'r', encoding='utf-8') as f:
                _json = json.load(f)
            if exp.eval_dataset and 'eval_result' not in _json['record']:
                if not exp.do_eval:
                    logger.info(f'Experiment {exp.name} still needs eval, loading it from file.')
                    exp.load(_json)
                    exp.do_eval = True
            else:
                logger.warning(f'Experiment {exp.name} already done, skipping.')
                return

        if exp.do_eval:
            runtime = self._build_eval_cmd(exp)
            exp.runtime = runtime
            envs = deepcopy(runtime.get('env', {}))
            envs.update(os.environ)
            logger.info(f'Running cmd: {runtime["running_cmd"]}, env: {runtime.get("env", {})}')
            os.makedirs('exp', exist_ok=True)
            log_file = os.path.join('exp', f'{exp.name}.eval.log')
            exp.handler = subprocess.Popen(runtime['running_cmd'] + f' > {log_file} 2>&1', env=envs, shell=True)
            self.exps.append(exp)
            self.assert_gpu_not_overlap()
            return

        if any(exp.name == e.name for e in self.exps):
            raise ValueError(f'Duplicate experiment name: {exp.name}')
        elif exp.cmd == 'export' and any(e.cmd == 'export' for e in self.exps):
            raise AssertionError('Cannot run parallel export tasks.')
        else:
            exp.create_time = time.time()
            runtime = self._build_cmd(exp)
            exp.runtime = runtime
            envs = deepcopy(runtime.get('env', {}))
            envs.update(os.environ)
            logger.info(f'Running cmd: {runtime["running_cmd"]}, env: {runtime.get("env", {})}')
            os.makedirs('exp', exist_ok=True)
            log_file = os.path.join('exp', f'{exp.name}.{exp.cmd}.log')
            exp.handler = subprocess.Popen(runtime['running_cmd'] + f' > {log_file} 2>&1', env=envs, shell=True)
            self.exps.append(exp)
            self.assert_gpu_not_overlap()

    def _build_eval_cmd(self, exp: Experiment):
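        """Compose the `swift eval` command and its GPU environment for an experiment."""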
        gpu = exp.eval_requirements.get('gpu', None)
        env = {}
        allocated = []
        if gpu:
            allocated = self._find_free_gpu(int(gpu))
            assert allocated, 'No free gpu for now!'
            allocated = [str(g) for g in allocated]
            env['CUDA_VISIBLE_DEVICES'] = ','.join(allocated)

        best_model_checkpoint = exp.record.get('best_model_checkpoint')
        eval_dataset = exp.eval_dataset
        if best_model_checkpoint is not None and not os.path.exists(
                os.path.join(best_model_checkpoint, 'args.json')):
            # The recorded checkpoint carries no swift metadata (args.json), so
            # evaluate it directly as a full-parameter checkpoint.
            cmd = f'swift eval --ckpt_dir {best_model_checkpoint} ' \
                  f'--infer_backend transformers --tuner_type full --eval_dataset {" ".join(eval_dataset)}'
        else:
            # Otherwise evaluate the model named in the experiment args.
            cmd = f'swift eval --model {exp.args.get("model")} --infer_backend transformers ' \
                  f'--eval_dataset {" ".join(eval_dataset)}'

        return {
            'running_cmd': cmd,
            'gpu': allocated,
            'env': env,
        }

    def _build_cmd(self, exp: Experiment):
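        """Compose the training/export command, GPU assignment and env for an experiment."""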
        gpu = exp.requirements.get('gpu', None)
        env = {}
        allocated = []
        if gpu:
            allocated = self._find_free_gpu(int(gpu))
            assert allocated, 'No free gpu for now!'
            allocated = [str(g) for g in allocated]
            env['CUDA_VISIBLE_DEVICES'] = ','.join(allocated)
        if int(exp.requirements.get('ddp', 1)) > 1:
            env['NPROC_PER_NODE'] = str(exp.requirements.get('ddp'))
            env['MASTER_PORT'] = str(find_free_port())

        if exp.cmd == 'sft':
            from swift import SftArguments
            args = exp.args
            sft_args = SftArguments(**args)
            args['output_dir'] = sft_args.output_dir
            args['logging_dir'] = sft_args.logging_dir
            args['add_version'] = False
            os.makedirs(sft_args.output_dir, exist_ok=True)
            os.makedirs(sft_args.logging_dir, exist_ok=True)
            cmd = 'swift sft '
            for key, value in args.items():
                cmd += f' --{key} {value}'
        elif exp.cmd == 'rlhf':
            from swift import RLHFArguments
            args = exp.args
            rlhf_args = RLHFArguments(**args)
            args['output_dir'] = rlhf_args.output_dir
            args['logging_dir'] = rlhf_args.logging_dir
            args['add_version'] = False
            os.makedirs(rlhf_args.output_dir, exist_ok=True)
            os.makedirs(rlhf_args.logging_dir, exist_ok=True)
            cmd = 'swift rlhf '
            for key, value in args.items():
                cmd += f' --{key} {value}'
        elif exp.cmd == 'export':
            args = exp.args
            cmd = 'swift export '
            for key, value in args.items():
                cmd += f' --{key} {value}'
        else:
            raise ValueError(f'Unsupported cmd type: {exp.cmd}')
        return {
            'running_cmd': cmd,
            'gpu': allocated,
            'env': env,
            'logging_dir': args.get('logging_dir'),
            'output_dir': args.get('output_dir', args.get('ckpt_dir'))
        }

    def _find_free_gpu(self, n):
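        """Return `n` GPU ids not used by any running experiment, or None if not enough are free."""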
        all_gpus = set()
        for exp in self.exps:
            all_gpus.update(exp.runtime.get('gpu', set()))
        all_gpus = {int(g) for g in all_gpus}
        free_gpu = set(range(get_device_count())) - all_gpus
        if len(free_gpu) < n:
            return None
        return sorted(free_gpu)[:n]

    def prepare_experiments(self, args: Any):
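        """Build Experiment objects from the JSON config files listed in `args.config`.

        Each config file provides group-level defaults (`cmd`, `args`, `env`,
        `requirements`, `eval_requirements`, `eval_dataset`) and an `experiment`
        list whose entries may override them per experiment. An illustrative
        (not authoritative) config shape:

            {
                "cmd": "sft",
                "args": {"model": "...", "dataset": "..."},
                "requirements": {"gpu": "1"},
                "experiment": [{"name": "exp1", "args": {"learning_rate": "1e-4"}}]
            }
        """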
        experiments = []
        for config_file in args.config:
            with open(config_file, 'r', encoding='utf-8') as f:
                group = os.path.basename(config_file)
                group = group[:-5]  # strip the '.json' suffix
                content = json.load(f)
                exps = content['experiment']
                for exp in exps:
                    main_cfg = deepcopy(content)
                    name = exp['name']
                    cmd = main_cfg['cmd']
                    run_args = main_cfg['args']
                    env = main_cfg.get('env', {})
                    requirements = main_cfg.get('requirements', {})
                    eval_requirements = main_cfg.get('eval_requirements', {})
                    eval_dataset = main_cfg.get('eval_dataset', [])
                    if 'args' in exp:
                        run_args.update(exp['args'])
                    if 'requirements' in exp:
                        requirements.update(exp['requirements'])
                    if 'env' in exp:
                        env.update(exp['env'])
                    experiments.append(
                        Experiment(
                            group=group,
                            name=name,
                            cmd=cmd,
                            args=run_args,
                            env=env,
                            requirements=requirements,
                            eval_requirements=eval_requirements,
                            eval_dataset=eval_dataset,
                            input_args=args))
        return experiments

    @staticmethod
    def _get_metric(exp: Experiment):
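        """Extract the result of a finished experiment from its logs or output directories."""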
        if exp.do_eval:
            if os.path.isfile(os.path.join('exp', f'{exp.name}.eval.log')):
                with open(os.path.join('exp', f'{exp.name}.eval.log'), 'r', encoding='utf-8') as f:
                    for line in f.readlines():
                        if 'Final report:' in line:
                            return json.loads(line.split('Final report:')[1].replace('\'', '"'))
        elif exp.cmd == 'export':
            exp_args = ExportArguments(**exp.args)
            if exp_args.quant_bits > 0:
                if exp_args.ckpt_dir is None:
                    path = f'{exp_args.model_type}-{exp_args.quant_method}-int{exp_args.quant_bits}'
                else:
                    ckpt_dir, ckpt_name = os.path.split(exp_args.ckpt_dir)
                    path = os.path.join(ckpt_dir, f'{ckpt_name}-{exp_args.quant_method}-int{exp_args.quant_bits}')
            else:
                ckpt_dir, ckpt_name = os.path.split(exp_args.ckpt_dir)
                path = os.path.join(ckpt_dir, f'{ckpt_name}-merged')
            if os.path.exists(path):
                shutil.rmtree(exp.name, ignore_errors=True)
                os.makedirs(exp.name, exist_ok=True)
                shutil.move(path, os.path.join(exp.name, path))
                return {
                    'best_model_checkpoint': os.path.join(exp.name, path),
                }
        else:
            logging_dir = exp.runtime.get('logging_dir')
            logging_file = os.path.join(logging_dir, '..', 'logging.jsonl')
            if os.path.isfile(logging_file):
                with open(logging_file, 'r', encoding='utf-8') as f:
                    for line in f.readlines():
                        if 'model_info' in line:
                            return json.loads(line)
        return None

    @staticmethod
    def write_record(exp: Experiment):
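        """Persist the experiment record as a JSON file under `input_args.save_dir`."""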
        target_dir = exp.input_args.save_dir
        file = os.path.join(target_dir, exp.name + '.json')
        with open(file, 'w', encoding='utf-8') as f:
            f.write(json.dumps(exp.to_dict()) + '\n')

    def _poll(self):
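        """Block until at least one running experiment finishes, then collect its metrics."""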
        while True:
            time.sleep(5)

            has_finished = False
            for exp in self.exps:
                rt = exp.handler.poll()
                if rt is None:
                    continue

                has_finished = True
                if rt == 0:
                    if not exp.do_eval:
                        all_metric = self._get_metric(exp)
                        if all_metric:
                            exp.record.update(all_metric)
                            if exp.eval_dataset:
                                exp.do_eval = True
                                self.exp_queue.appendleft(exp)
                            self.write_record(exp)
                        else:
                            logger.error(f'Task {exp.name} finished, but no result was found')
                    else:
                        all_metric = self._get_metric(exp)
                        exp.record['eval_result'] = all_metric
                        if all_metric:
                            self.write_record(exp)
                        else:
                            logger.error(f'Eval task {exp.name} finished, but no eval result was found')
                logger.info(f'Experiment {exp.name} finished with return code: {rt}')

            if has_finished:
                self.exps = [exp for exp in self.exps if exp.handler.poll() is None]
                break

    def begin(self, args: Any):
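        """Main loop: queue every configured experiment and run them until the queue drains."""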
        exps = self.prepare_experiments(args)
        logger.info(f'all exps: {exps}')
        exps.sort(key=lambda e: e.priority)
        self.exp_queue = deque()
        for exp in exps:
            self.exp_queue.append(exp)

        while len(self.exp_queue) or len(self.exps) > 0:
            while len(self.exp_queue):
                try:
                    logger.info(f'Running exp: {self.exp_queue[0].name}')
                    self.run(self.exp_queue[0])
                except Exception as e:
                    if not isinstance(e, AssertionError):
                        logger.error(f'Adding exp {self.exp_queue[0].name} failed because of:')
                        logger.error(e)
                        self.exp_queue.popleft()
                    else:
                        # Resource conflicts (no free GPU, parallel export) raise
                        # AssertionError: wait for a running experiment to finish
                        # before retrying.
                        logger.info(f'Cannot start exp {self.exp_queue[0].name} yet: {e}')
                        break
                else:
                    self.exp_queue.popleft()
            self._poll()
        logger.info(f'All experiments finished, queue: {self.exp_queue}, running: {self.exps}')


def find_all_config(dir_or_file: str):
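    """Collect experiment configs: a single JSON file, or every JSON file under a directory."""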
    if os.path.isfile(dir_or_file):
        return [dir_or_file]
    else:
        configs = []
        for dirpath, dirnames, filenames in os.walk(dir_or_file):
            for name in filenames:
                if name.endswith('.json') and 'ipynb' not in dirpath:
                    configs.append(os.path.join(dirpath, name))
        return configs