diff --git a/opencompass/runners/dlc.py b/opencompass/runners/dlc.py index 87c51a76..42b4b30e 100644 --- a/opencompass/runners/dlc.py +++ b/opencompass/runners/dlc.py @@ -86,65 +86,70 @@ class DLCRunner(BaseRunner): # Dump task config to file mmengine.mkdir_or_exist('tmp/') param_file = f'tmp/{os.getpid()}_params.py' - task_cfg.dump(param_file) + try: + task_cfg.dump(param_file) - # Build up DLC command - pwd = os.getcwd() - shell_cmd = (f'source {self.aliyun_cfg["bashrc_path"]}; ' - f'conda activate {self.aliyun_cfg["conda_env_name"]}; ' - f'cd {pwd}; ' - '{task_cmd}') + # Build up DLC command + pwd = os.getcwd() + shell_cmd = ( + f'source {self.aliyun_cfg["bashrc_path"]}; ' + f'conda activate {self.aliyun_cfg["conda_env_name"]}; ' + f'cd {pwd}; ' + '{task_cmd}') - tmpl = ('dlc create job' - f" --command '{shell_cmd}'" - f' --name {task_name[:512]}' - ' --kind BatchJob' - f" -c {self.aliyun_cfg['dlc_config_path']}" - f" --workspace_id {self.aliyun_cfg['workspace_id']}" - ' --worker_count 1' - f' --worker_cpu {max(num_gpus * 6, 8)}' - f' --worker_gpu {num_gpus}' - f' --worker_memory {max(num_gpus * 32, 48)}' - f" --worker_image {self.aliyun_cfg['worker_image']}" - ' --interactive') - get_cmd = partial(task.get_command, cfg_path=param_file, template=tmpl) - cmd = get_cmd() + tmpl = ('dlc create job' + f" --command '{shell_cmd}'" + f' --name {task_name[:512]}' + ' --kind BatchJob' + f" -c {self.aliyun_cfg['dlc_config_path']}" + f" --workspace_id {self.aliyun_cfg['workspace_id']}" + ' --worker_count 1' + f' --worker_cpu {max(num_gpus * 6, 8)}' + f' --worker_gpu {num_gpus}' + f' --worker_memory {max(num_gpus * 32, 48)}' + f" --worker_image {self.aliyun_cfg['worker_image']}" + ' --interactive') + get_cmd = partial(task.get_command, + cfg_path=param_file, + template=tmpl) + cmd = get_cmd() - logger = get_logger() - logger.debug(f'Running command: {cmd}') + logger = get_logger() + logger.debug(f'Running command: {cmd}') - # Run command with retry - if self.debug: - stdout = None - else: - out_path = task.get_log_path(file_extension='out') - mmengine.mkdir_or_exist(osp.split(out_path)[0]) - stdout = open(out_path, 'w', encoding='utf-8') + # Run command with retry + if self.debug: + stdout = None + else: + out_path = task.get_log_path(file_extension='out') + mmengine.mkdir_or_exist(osp.split(out_path)[0]) + stdout = open(out_path, 'w', encoding='utf-8') - if random_sleep: - time.sleep(random.randint(0, 10)) - result = subprocess.run(cmd, - shell=True, - text=True, - stdout=stdout, - stderr=stdout) - - retry = self.retry - output_paths = task.get_output_paths() - while self._job_failed(result.returncode, output_paths) and retry > 0: - retry -= 1 if random_sleep: time.sleep(random.randint(0, 10)) - # Re-generate command to refresh ports. - cmd = get_cmd() result = subprocess.run(cmd, shell=True, text=True, stdout=stdout, stderr=stdout) - # Clean up - os.remove(param_file) + retry = self.retry + output_paths = task.get_output_paths() + while self._job_failed(result.returncode, + output_paths) and retry > 0: + retry -= 1 + if random_sleep: + time.sleep(random.randint(0, 10)) + # Re-generate command to refresh ports. + cmd = get_cmd() + result = subprocess.run(cmd, + shell=True, + text=True, + stdout=stdout, + stderr=stdout) + finally: + # Clean up + os.remove(param_file) return task_name, result.returncode def _job_failed(self, return_code: int, output_paths: List[str]) -> bool: diff --git a/opencompass/runners/local.py b/opencompass/runners/local.py index 2fa86d46..2f9fed67 100644 --- a/opencompass/runners/local.py +++ b/opencompass/runners/local.py @@ -62,15 +62,17 @@ class LocalRunner(BaseRunner): # get cmd mmengine.mkdir_or_exist('tmp/') param_file = f'tmp/{os.getpid()}_params.py' - task.cfg.dump(param_file) - cmd = task.get_command(cfg_path=param_file, - template='{task_cmd}') - # run in subprocess if starts with torchrun etc. - if cmd.startswith('python'): - task.run() - else: - subprocess.run(cmd, shell=True, text=True) - os.remove(param_file) + try: + task.cfg.dump(param_file) + cmd = task.get_command(cfg_path=param_file, + template='{task_cmd}') + # run in subprocess if starts with torchrun etc. + if cmd.startswith('python'): + task.run() + else: + subprocess.run(cmd, shell=True, text=True) + finally: + os.remove(param_file) status.append((task_name, 0)) else: import torch @@ -141,31 +143,34 @@ class LocalRunner(BaseRunner): # Dump task config to file mmengine.mkdir_or_exist('tmp/') param_file = f'tmp/{os.getpid()}_{index}_params.py' - task.cfg.dump(param_file) + try: + task.cfg.dump(param_file) - # Build up slurm command - tmpl = 'CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids) - tmpl += ' {task_cmd}' - get_cmd = partial(task.get_command, cfg_path=param_file, template=tmpl) - cmd = get_cmd() + # Build up slurm command + tmpl = 'CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids) + tmpl += ' {task_cmd}' + get_cmd = partial(task.get_command, + cfg_path=param_file, + template=tmpl) + cmd = get_cmd() - logger = get_logger() - logger.debug(f'Running command: {cmd}') + logger = get_logger() + logger.debug(f'Running command: {cmd}') - # Run command - out_path = task.get_log_path(file_extension='out') - mmengine.mkdir_or_exist(osp.split(out_path)[0]) - stdout = open(out_path, 'w', encoding='utf-8') + # Run command + out_path = task.get_log_path(file_extension='out') + mmengine.mkdir_or_exist(osp.split(out_path)[0]) + stdout = open(out_path, 'w', encoding='utf-8') - result = subprocess.run(cmd, - shell=True, - text=True, - stdout=stdout, - stderr=stdout) + result = subprocess.run(cmd, + shell=True, + text=True, + stdout=stdout, + stderr=stdout) - if result.returncode != 0: - logger.warning(f'task {task_name} fail, see\n{out_path}') - - # Clean up - os.remove(param_file) + if result.returncode != 0: + logger.warning(f'task {task_name} fail, see\n{out_path}') + finally: + # Clean up + os.remove(param_file) return task_name, result.returncode diff --git a/opencompass/runners/slurm.py b/opencompass/runners/slurm.py index c6efb60c..5ceb5bad 100644 --- a/opencompass/runners/slurm.py +++ b/opencompass/runners/slurm.py @@ -91,60 +91,64 @@ class SlurmRunner(BaseRunner): # Dump task config to file mmengine.mkdir_or_exist('tmp/') param_file = f'tmp/{os.getpid()}_params.py' - task_cfg.dump(param_file) + try: + task_cfg.dump(param_file) - # Build up slurm command - tmpl = 'srun' - if self.partition: - tmpl += f' -p {self.partition}' - if self.quotatype: - tmpl += f' --quotatype={self.quotatype}' - if self.qos: - tmpl += f' --qos={self.qos}' - if num_gpus > 0: - tmpl += f' --gres=gpu:{num_gpus}' - tmpl += f" -N1 -J '{task_name[:512]}'" + ' {task_cmd}' - get_cmd = partial(task.get_command, cfg_path=param_file, template=tmpl) - cmd = get_cmd() + # Build up slurm command + tmpl = 'srun' + if self.partition: + tmpl += f' -p {self.partition}' + if self.quotatype: + tmpl += f' --quotatype={self.quotatype}' + if self.qos: + tmpl += f' --qos={self.qos}' + if num_gpus > 0: + tmpl += f' --gres=gpu:{num_gpus}' + tmpl += f" -N1 -J '{task_name[:512]}'" + ' {task_cmd}' + get_cmd = partial(task.get_command, + cfg_path=param_file, + template=tmpl) + cmd = get_cmd() - logger = get_logger() - logger.debug(f'Running command: {cmd}') + logger = get_logger() + logger.debug(f'Running command: {cmd}') - # Run command with retry - if self.debug: - stdout = None - else: - out_path = task.get_log_path(file_extension='out') - mmengine.mkdir_or_exist(osp.split(out_path)[0]) - stdout = open(out_path, 'w', encoding='utf-8') + # Run command with retry + if self.debug: + stdout = None + else: + out_path = task.get_log_path(file_extension='out') + mmengine.mkdir_or_exist(osp.split(out_path)[0]) + stdout = open(out_path, 'w', encoding='utf-8') - if random_sleep: - time.sleep(random.randint(0, 10)) - result = subprocess.run(cmd, - shell=True, - text=True, - stdout=stdout, - stderr=stdout) - - retry = self.retry - output_paths = task.get_output_paths() - while self._job_failed(result.returncode, output_paths) and retry > 0: - retry -= 1 if random_sleep: time.sleep(random.randint(0, 10)) - # Re-generate command to refresh ports. - cmd = get_cmd() result = subprocess.run(cmd, shell=True, text=True, stdout=stdout, stderr=stdout) - if result.returncode != 0 and not self.debug: - logger.warning(f'task {task_name} fail, see\n{out_path}') + retry = self.retry + output_paths = task.get_output_paths() + while self._job_failed(result.returncode, + output_paths) and retry > 0: + retry -= 1 + if random_sleep: + time.sleep(random.randint(0, 10)) + # Re-generate command to refresh ports. + cmd = get_cmd() + result = subprocess.run(cmd, + shell=True, + text=True, + stdout=stdout, + stderr=stdout) - # Clean up - os.remove(param_file) + if result.returncode != 0 and not self.debug: + logger.warning(f'task {task_name} fail, see\n{out_path}') + finally: + # Clean up + os.remove(param_file) return task_name, result.returncode def _job_failed(self, return_code: int, output_paths: List[str]) -> bool: diff --git a/opencompass/utils/run.py b/opencompass/utils/run.py index c1a3f76e..0d44cff9 100644 --- a/opencompass/utils/run.py +++ b/opencompass/utils/run.py @@ -3,7 +3,9 @@ from typing import List, Union import tabulate from mmengine.config import Config +from opencompass.partitioners import NaivePartitioner, SizePartitioner from opencompass.runners import DLCRunner, LocalRunner, SlurmRunner +from opencompass.tasks import OpenICLEvalTask, OpenICLInferTask from opencompass.utils import get_logger, match_files @@ -118,54 +120,61 @@ def exec_mm_infer_runner(tasks, args, cfg): runner(tasks) -def exec_infer_runner(tasks, args, cfg): - """execute infer runner according to args.""" - if args.slurm: - runner = SlurmRunner(dict(type='OpenICLInferTask'), - max_num_workers=args.max_num_workers, - partition=args.partition, - quotatype=args.quotatype, - qos=args.qos, - retry=args.retry, - debug=args.debug, - lark_bot_url=cfg['lark_bot_url']) - elif args.dlc: - runner = DLCRunner(dict(type='OpenICLInferTask'), - max_num_workers=args.max_num_workers, - aliyun_cfg=Config.fromfile(args.aliyun_cfg), - retry=args.retry, - debug=args.debug, - lark_bot_url=cfg['lark_bot_url']) - else: - runner = LocalRunner(task=dict(type='OpenICLInferTask'), - max_num_workers=args.max_num_workers, - max_workers_per_gpu=args.max_workers_per_gpu, - debug=args.debug, - lark_bot_url=cfg['lark_bot_url']) - runner(tasks) +def get_config_type(obj) -> str: + return f'{obj.__module__}.{obj.__name__}' -def exec_eval_runner(tasks, args, cfg): - """execute infer runner according to args.""" +def fill_infer_cfg(cfg, args): + new_cfg = dict(infer=dict( + partitioner=dict(type=get_config_type(SizePartitioner), + max_task_size=args.max_partition_size, + gen_task_coef=args.gen_task_coef), + runner=dict( + max_num_workers=args.max_num_workers, + debug=args.debug, + task=dict(type=get_config_type(OpenICLInferTask)), + lark_bot_url=cfg['lark_bot_url'], + )), ) if args.slurm: - runner = SlurmRunner(dict(type='OpenICLEvalTask'), - max_num_workers=args.max_num_workers, - partition=args.partition, - quotatype=args.quotatype, - qos=args.qos, - retry=args.retry, - debug=args.debug, - lark_bot_url=cfg['lark_bot_url']) + new_cfg['infer']['runner']['type'] = get_config_type(SlurmRunner) + new_cfg['infer']['runner']['partition'] = args.partition + new_cfg['infer']['runner']['quotatype'] = args.quotatype + new_cfg['infer']['runner']['qos'] = args.qos + new_cfg['infer']['runner']['retry'] = args.retry elif args.dlc: - runner = DLCRunner(dict(type='OpenICLEvalTask'), - max_num_workers=args.max_num_workers, - aliyun_cfg=Config.fromfile(args.aliyun_cfg), - retry=args.retry, - debug=args.debug, - lark_bot_url=cfg['lark_bot_url']) + new_cfg['infer']['runner']['type'] = get_config_type(DLCRunner) + new_cfg['infer']['runner']['aliyun_cfg'] = Config.fromfile( + args.aliyun_cfg) + new_cfg['infer']['runner']['retry'] = args.retry else: - runner = LocalRunner(task=dict(type='OpenICLEvalTask'), - max_num_workers=args.max_num_workers, - debug=args.debug, - lark_bot_url=cfg['lark_bot_url']) - runner(tasks) + new_cfg['infer']['runner']['type'] = get_config_type(LocalRunner) + new_cfg['infer']['runner'][ + 'max_workers_per_gpu'] = args.max_workers_per_gpu + cfg.merge_from_dict(new_cfg) + + +def fill_eval_cfg(cfg, args): + new_cfg = dict( + eval=dict(partitioner=dict(type=get_config_type(NaivePartitioner)), + runner=dict( + max_num_workers=args.max_num_workers, + debug=args.debug, + task=dict(type=get_config_type(OpenICLEvalTask)), + lark_bot_url=cfg['lark_bot_url'], + ))) + if args.slurm: + new_cfg['eval']['runner']['type'] = get_config_type(SlurmRunner) + new_cfg['eval']['runner']['partition'] = args.partition + new_cfg['eval']['runner']['quotatype'] = args.quotatype + new_cfg['eval']['runner']['qos'] = args.qos + new_cfg['eval']['runner']['retry'] = args.retry + elif args.dlc: + new_cfg['eval']['runner']['type'] = get_config_type(DLCRunner) + new_cfg['eval']['runner']['aliyun_cfg'] = Config.fromfile( + args.aliyun_cfg) + new_cfg['eval']['runner']['retry'] = args.retry + else: + new_cfg['eval']['runner']['type'] = get_config_type(LocalRunner) + new_cfg['eval']['runner'][ + 'max_workers_per_gpu'] = args.max_workers_per_gpu + cfg.merge_from_dict(new_cfg) diff --git a/run.py b/run.py index 565a6c9e..3c3c5d1c 100644 --- a/run.py +++ b/run.py @@ -6,13 +6,12 @@ from datetime import datetime from mmengine.config import Config, DictAction -from opencompass.partitioners import (MultimodalNaivePartitioner, - NaivePartitioner, SizePartitioner) +from opencompass.partitioners import MultimodalNaivePartitioner from opencompass.registry import PARTITIONERS, RUNNERS from opencompass.runners import SlurmRunner from opencompass.utils import LarkReporter, Summarizer, get_logger -from opencompass.utils.run import (exec_eval_runner, exec_infer_runner, - exec_mm_infer_runner, get_config_from_arg) +from opencompass.utils.run import (exec_mm_infer_runner, fill_eval_cfg, + fill_infer_cfg, get_config_from_arg) def parse_args(): @@ -245,39 +244,29 @@ def main(): tasks = partitioner(cfg) exec_mm_infer_runner(tasks, args, cfg) return - elif args.dlc or args.slurm or cfg.get('infer', None) is None: - # Use SizePartitioner to split into subtasks - partitioner = SizePartitioner( - osp.join(cfg['work_dir'], 'predictions/'), - max_task_size=args.max_partition_size, - gen_task_coef=args.gen_task_coef) - tasks = partitioner(cfg) - if args.dry_run: - return - # execute the infer subtasks - exec_infer_runner(tasks, args, cfg) - # If they have specified "infer" in config and haven't used --slurm - # or --dlc, just follow the config + + if args.dlc or args.slurm or cfg.get('infer', None) is None: + fill_infer_cfg(cfg, args) + + if args.partition is not None: + if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner: + cfg.infer.runner.partition = args.partition + cfg.infer.runner.quotatype = args.quotatype else: - if args.partition is not None: - if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner: - cfg.infer.runner.partition = args.partition - cfg.infer.runner.quotatype = args.quotatype - else: - logger.warning('SlurmRunner is not used, so the partition ' - 'argument is ignored.') - if args.debug: - cfg.infer.runner.debug = True - if args.lark: - cfg.infer.runner.lark_bot_url = cfg['lark_bot_url'] - cfg.infer.partitioner['out_dir'] = osp.join( - cfg['work_dir'], 'predictions/') - partitioner = PARTITIONERS.build(cfg.infer.partitioner) - tasks = partitioner(cfg) - if args.dry_run: - return - runner = RUNNERS.build(cfg.infer.runner) - runner(tasks) + logger.warning('SlurmRunner is not used, so the partition ' + 'argument is ignored.') + if args.debug: + cfg.infer.runner.debug = True + if args.lark: + cfg.infer.runner.lark_bot_url = cfg['lark_bot_url'] + cfg.infer.partitioner['out_dir'] = osp.join(cfg['work_dir'], + 'predictions/') + partitioner = PARTITIONERS.build(cfg.infer.partitioner) + tasks = partitioner(cfg) + if args.dry_run: + return + runner = RUNNERS.build(cfg.infer.runner) + runner(tasks) # evaluate if args.mode in ['all', 'eval']: @@ -289,37 +278,28 @@ def main(): 'also specified --slurm or --dlc. ' 'The "eval" configuration will be overridden by ' 'your runtime arguments.') + if args.dlc or args.slurm or cfg.get('eval', None) is None: - # Use NaivePartitioner,not split - partitioner = NaivePartitioner( - osp.join(cfg['work_dir'], 'results/')) - tasks = partitioner(cfg) - if args.dry_run: - return - # execute the eval tasks - exec_eval_runner(tasks, args, cfg) - # If they have specified "eval" in config and haven't used --slurm - # or --dlc, just follow the config - else: - if args.partition is not None: - if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner: - cfg.eval.runner.partition = args.partition - cfg.eval.runner.quotatype = args.quotatype - else: - logger.warning('SlurmRunner is not used, so the partition ' - 'argument is ignored.') - if args.debug: - cfg.eval.runner.debug = True - if args.lark: - cfg.eval.runner.lark_bot_url = cfg['lark_bot_url'] - cfg.eval.partitioner['out_dir'] = osp.join(cfg['work_dir'], - 'results/') - partitioner = PARTITIONERS.build(cfg.eval.partitioner) - tasks = partitioner(cfg) - if args.dry_run: - return - runner = RUNNERS.build(cfg.eval.runner) - runner(tasks) + fill_eval_cfg(cfg, args) + + if args.partition is not None: + if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner: + cfg.eval.runner.partition = args.partition + cfg.eval.runner.quotatype = args.quotatype + else: + logger.warning('SlurmRunner is not used, so the partition ' + 'argument is ignored.') + if args.debug: + cfg.eval.runner.debug = True + if args.lark: + cfg.eval.runner.lark_bot_url = cfg['lark_bot_url'] + cfg.eval.partitioner['out_dir'] = osp.join(cfg['work_dir'], 'results/') + partitioner = PARTITIONERS.build(cfg.eval.partitioner) + tasks = partitioner(cfg) + if args.dry_run: + return + runner = RUNNERS.build(cfg.eval.runner) + runner(tasks) # visualize if args.mode in ['all', 'eval', 'viz']: