From cd1bec5f2a8c8e394f43cd410267bd8bc56eeadc Mon Sep 17 00:00:00 2001
From: Tong Gao
Date: Thu, 6 Jul 2023 11:58:37 +0800
Subject: [PATCH] Enhance run.py (#7)

* Enhance run.py

* update
---
 run.py           | 153 ++++++++++++++++++++++++++++++---------------
 tools/cfg_run.py | 158 -----------------------------------------------
 2 files changed, 104 insertions(+), 207 deletions(-)
 delete mode 100644 tools/cfg_run.py

diff --git a/run.py b/run.py
index 5f5999e0..081facdd 100644
--- a/run.py
+++ b/run.py
@@ -7,6 +7,7 @@ from datetime import datetime
 from mmengine.config import Config
 
 from opencompass.partitioners import NaivePartitioner, SizePartitioner
+from opencompass.registry import PARTITIONERS, RUNNERS
 from opencompass.runners import DLCRunner, LocalRunner, SlurmRunner
 from opencompass.utils import LarkReporter, Summarizer, get_logger
 
@@ -14,20 +15,21 @@ from opencompass.utils import LarkReporter, Summarizer, get_logger
 def parse_args():
     parser = argparse.ArgumentParser(description='Run an evaluation task')
     parser.add_argument('config', help='Train config file path')
-    # add mutually exclusive args `--slurm` `--dlc`, default to local runner
-    luach_method = parser.add_mutually_exclusive_group()
-    luach_method.add_argument('--slurm',
-                              action='store_true',
-                              default=False,
-                              help='Whether to use srun to launch tasks, if '
-                              'True, `--partition(-p)` must be set. Defaults'
-                              ' to False')
-    luach_method.add_argument('--dlc',
-                              action='store_true',
-                              default=False,
-                              help='Whether to use dlc to launch tasks, if '
-                              'True, `--aliyun-cfg` must be set. Defaults'
-                              ' to False')
+    # add mutually exclusive args `--slurm` `--dlc`; launching defaults to
+    # the local runner if "infer" or "eval" is not specified in the config
+    launch_method = parser.add_mutually_exclusive_group()
+    launch_method.add_argument('--slurm',
+                               action='store_true',
+                               default=False,
+                               help='Whether to force tasks to run with srun. '
+                               'If True, `--partition(-p)` must be set. '
+                               'Defaults to False')
+    launch_method.add_argument('--dlc',
+                               action='store_true',
+                               default=False,
+                               help='Whether to force tasks to run on dlc. If '
+                               'True, `--aliyun-cfg` must be set. Defaults'
+                               ' to False')
     # add general args
     parser.add_argument('--debug',
                         help='Debug mode, in which scheduler will run tasks '
@@ -56,10 +58,11 @@ def parse_args():
                         'also be a specific timestamp, e.g. 20230516_144254'),
     parser.add_argument('-w',
                         '--work-dir',
-                        help='Work path, all the outputs will be saved in '
-                        'this path, including the slurm logs, the evaluation'
-                        ' results, the summary results, etc. If not specified,'
-                        ' the work_dir will be set to None',
+                        help='Work path, all the outputs will be '
+                        'saved in this path, including the slurm logs, '
+                        'the evaluation results, the summary results, etc. '
+                        'If not specified, the work_dir will be set to '
+                        './outputs/default.',
                         default=None,
                         type=str)
     parser.add_argument('-l',
@@ -68,21 +71,26 @@ def parse_args():
                         action='store_true',
                         default=False)
     parser.add_argument('--max-partition-size',
-                        help='The maximum size of a task.',
+                        help='The maximum size of an infer task. Only '
+                        'effective when "infer" is missing from the config.',
                         type=int,
                         default=2000),
     parser.add_argument(
         '--gen-task-coef',
-        help='The dataset cost measurement coefficient for generation tasks',
+        help='The dataset cost measurement coefficient for generation tasks. '
+        'Only effective when "infer" is missing from the config.',
         type=int,
         default=20)
     parser.add_argument('--max-num-workers',
-                        help='Max number of workers to run in parallel.',
+                        help='Max number of workers to run in parallel. '
+                        'Will be overridden by the "max_num_workers" argument '
+                        'in the config.',
                         type=int,
                         default=32)
     parser.add_argument(
         '--retry',
-        help='Number of retries if the job failed when using slurm or dlc.',
+        help='Number of retries if the job failed when using slurm or dlc. '
+        'Will be overridden by the "retry" argument in the config.',
         type=int,
         default=2)
     # set srun args
@@ -97,14 +105,14 @@ def parse_args():
         '--partition(-p) must be set if you want to use slurm')
     if args.dlc:
         assert os.path.exists(args.aliyun_cfg), (
-            'When luaching tasks using dlc, it needs to be configured'
+            'When launching tasks using dlc, it needs to be configured '
             'in "~/.aliyun.cfg", or use "--aliyun-cfg $ALiYun-CFG_Path"'
             ' to specify a new path.')
     return args
 
 
 def parse_slurm_args(slurm_parser):
-    """these args are all for slurm launch."""
+    """These args are all for slurm launch."""
     slurm_parser.add_argument('-p',
                               '--partition',
                               help='Slurm partition name',
@@ -113,12 +121,12 @@ def parse_slurm_args(slurm_parser):
     slurm_parser.add_argument('-q',
                               '--quotatype',
                               help='Slurm quota type',
-                              default='auto',
+                              default=None,
                               type=str)
 
 
 def parse_dlc_args(dlc_parser):
-    """these args are all for dlc launch."""
+    """These args are all for dlc launch."""
     dlc_parser.add_argument('--aliyun-cfg',
                             help='The config path for aliyun config',
                             default='~/.aliyun.cfg',
                             type=str)
@@ -171,22 +179,71 @@ def main():
         LarkReporter(cfg['lark_bot_url']).post(content)
 
     if args.mode in ['all', 'infer']:
-        # Use SizePartitioner to split into subtasks
-        partitioner = SizePartitioner(osp.join(cfg['work_dir'],
-                                               'predictions/'),
-                                      max_task_size=args.max_partition_size,
-                                      gen_task_coef=args.gen_task_coef)
-        tasks = partitioner(cfg)
-        # execute the infer subtasks
-        exec_infer_runner(tasks, args, cfg)
+        if (args.dlc or args.slurm) and cfg.get('infer', None):
+            logger.warning('You have set "infer" in the config, but '
+                           'also specified --slurm or --dlc. '
+                           'The "infer" configuration will be overridden by '
+                           'your runtime arguments.')
+        if args.dlc or args.slurm or cfg.get('infer', None) is None:
+            # Use SizePartitioner to split into subtasks
+            partitioner = SizePartitioner(
+                osp.join(cfg['work_dir'], 'predictions/'),
+                max_task_size=args.max_partition_size,
+                gen_task_coef=args.gen_task_coef)
+            tasks = partitioner(cfg)
+            # execute the infer subtasks
+            exec_infer_runner(tasks, args, cfg)
+        else:
+            if args.partition is not None:
+                if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner:
+                    cfg.infer.runner.partition = args.partition
+                    cfg.infer.runner.quotatype = args.quotatype
+                else:
+                    logger.warning('SlurmRunner is not used, so the partition '
+                                   'argument is ignored.')
+            if args.debug:
+                cfg.infer.runner.debug = True
+            if args.lark:
+                cfg.infer.runner.lark_bot_url = cfg['lark_bot_url']
+            cfg.infer.partitioner['out_dir'] = osp.join(
+                cfg['work_dir'], 'predictions/')
+            partitioner = PARTITIONERS.build(cfg.infer.partitioner)
+            tasks = partitioner(cfg)
+            runner = RUNNERS.build(cfg.infer.runner)
+            runner(tasks)
 
     # evaluate
     if args.mode in ['all', 'eval']:
-        # Use NaivePartitioner,not split
-        partitioner = NaivePartitioner(osp.join(cfg['work_dir'], 'results/'))
-        tasks = partitioner(cfg)
-        # execute the eval tasks
-        exec_eval_runner(tasks, args, cfg)
+        if (args.dlc or args.slurm) and cfg.get('eval', None):
+            logger.warning('You have set "eval" in the config, but '
+                           'also specified --slurm or --dlc. '
+                           'The "eval" configuration will be overridden by '
+                           'your runtime arguments.')
+        if args.dlc or args.slurm or cfg.get('eval', None) is None:
+            # Use NaivePartitioner, which does not split tasks
+            partitioner = NaivePartitioner(
+                osp.join(cfg['work_dir'], 'results/'))
+            tasks = partitioner(cfg)
+            # execute the eval tasks
+            exec_eval_runner(tasks, args, cfg)
+        else:
+            if args.partition is not None:
+                if RUNNERS.get(cfg.eval.runner.type) == SlurmRunner:
+                    cfg.eval.runner.partition = args.partition
+                    cfg.eval.runner.quotatype = args.quotatype
+                else:
+                    logger.warning('SlurmRunner is not used, so the partition '
+                                   'argument is ignored.')
+            if args.debug:
+                cfg.eval.runner.debug = True
+            if args.lark:
+                cfg.eval.runner.lark_bot_url = cfg['lark_bot_url']
+            cfg.eval.partitioner['out_dir'] = osp.join(cfg['work_dir'],
+                                                       'results/')
+            partitioner = PARTITIONERS.build(cfg.eval.partitioner)
+            tasks = partitioner(cfg)
+            runner = RUNNERS.build(cfg.eval.runner)
+            runner(tasks)
 
     # visualize
     if args.mode in ['all', 'eval', 'viz']:
@@ -212,11 +269,10 @@ def exec_infer_runner(tasks, args, cfg):
                             debug=args.debug,
                             lark_bot_url=cfg['lark_bot_url'])
     else:
-        runner = LocalRunner(
-            task=dict(type='OpenICLInferTask'),
-            max_num_workers = args.max_num_workers,
-            debug=args.debug,
-            lark_bot_url=cfg['lark_bot_url'])
+        runner = LocalRunner(task=dict(type='OpenICLInferTask'),
+                             max_num_workers=args.max_num_workers,
+                             debug=args.debug,
+                             lark_bot_url=cfg['lark_bot_url'])
     runner(tasks)
 
 
@@ -238,11 +294,10 @@ def exec_eval_runner(tasks, args, cfg):
                             debug=args.debug,
                             lark_bot_url=cfg['lark_bot_url'])
     else:
-        runner = LocalRunner(
-            task=dict(type='OpenICLEvalTask'),
-            max_num_workers = args.max_num_workers,
-            debug=args.debug,
-            lark_bot_url=cfg['lark_bot_url'])
+        runner = LocalRunner(task=dict(type='OpenICLEvalTask'),
+                             max_num_workers=args.max_num_workers,
+                             debug=args.debug,
+                             lark_bot_url=cfg['lark_bot_url'])
     runner(tasks)
 
 
diff --git a/tools/cfg_run.py b/tools/cfg_run.py
deleted file mode 100644
index 9beab509..00000000
--- a/tools/cfg_run.py
+++ /dev/null
@@ -1,158 +0,0 @@
-import argparse
-import getpass
-import os
-import os.path as osp
-from datetime import datetime
-
-from mmengine.config import Config
-
-from opencompass.registry import PARTITIONERS, RUNNERS
-from opencompass.runners import SlurmRunner
-from opencompass.utils import LarkReporter, Summarizer, get_logger
-
-
-def parse_args():
-    parser = argparse.ArgumentParser(description='Run an evaluation task')
-    parser.add_argument('config', help='Train config file path')
-    parser.add_argument('-p',
-                        '--partition',
-                        help='Slurm partition name',
-                        default=None,
-                        type=str)
-    parser.add_argument('-q',
-                        '--quotatype',
-                        help='Slurm quota type',
-                        default='auto',
-                        type=str)
-    parser.add_argument('--debug',
-                        help='Debug mode, in which scheduler will run tasks '
-                        'in the single process, and output will not be '
-                        'redirected to files',
-                        action='store_true',
-                        default=False)
-    parser.add_argument('-m',
-                        '--mode',
-                        help='Running mode. You can choose "infer" if you '
-                        'only want the inference results, or "eval" if you '
-                        'already have the results and want to evaluate them, '
-                        'or "viz" if you want to visualize the results.',
-                        choices=['all', 'infer', 'eval', 'viz'],
-                        default='all',
-                        type=str)
-    parser.add_argument('-r',
-                        '--reuse',
-                        nargs='?',
-                        type=str,
-                        const='latest',
-                        help='Reuse previous outputs & results, and run any '
-                        'missing jobs presented in the config. If its '
-                        'argument is not specified, the latest results in '
-                        'the work_dir will be reused. The argument should '
-                        'also be a specific timestamp, e.g. 20230516_144254'),
-    parser.add_argument('-w',
-                        '--work-dir',
-                        help='Work path, all the outputs will be '
-                        'saved in this path, including the slurm logs, '
-                        'the evaluation results, the summary results, etc.'
-                        'If not specified, the work_dir will be set to '
-                        './outputs/default.',
-                        default=None,
-                        type=str)
-    parser.add_argument('-l',
-                        '--lark',
-                        help='Report the running status to lark bot',
-                        action='store_true',
-                        default=False)
-    args = parser.parse_args()
-    return args
-
-
-def main():
-    args = parse_args()
-
-    # initialize logger
-    logger = get_logger(log_level='DEBUG' if args.debug else 'INFO')
-
-    cfg = Config.fromfile(args.config)
-    if args.work_dir is not None:
-        cfg['work_dir'] = args.work_dir
-    else:
-        cfg.setdefault('work_dir', './outputs/default/')
-
-    # cfg_time_str defaults to the current time
-    cfg_time_str = dir_time_str = datetime.now().strftime('%Y%m%d_%H%M%S')
-    if args.reuse:
-        if args.reuse == 'latest':
-            dirs = os.listdir(cfg.work_dir)
-            assert len(dirs) > 0, 'No previous results to reuse!'
-            dir_time_str = sorted(dirs)[-1]
-        else:
-            dir_time_str = args.reuse
-        logger.info(f'Reusing experiements from {dir_time_str}')
-    elif args.mode in ['eval', 'viz']:
-        raise ValueError('You must specify -r or --reuse when running in eval '
-                         'or viz mode!')
-    # update "actual" work_dir
-    cfg['work_dir'] = osp.join(cfg.work_dir, dir_time_str)
-    os.makedirs(osp.join(cfg.work_dir, 'configs'), exist_ok=True)
-    # dump config
-    output_config_path = osp.join(cfg.work_dir, 'configs',
-                                  f'{cfg_time_str}.py')
-    cfg.dump(output_config_path)
-    # Config is intentally reloaded here to avoid initialized
-    # types cannot be serialized
-    cfg = Config.fromfile(output_config_path)
-
-    # infer
-    if not args.lark:
-        cfg['lark_bot_url'] = None
-    elif cfg.get('lark_bot_url', None):
-        content = f'New task from {getpass.getuser()} has started!'
- LarkReporter(cfg['lark_bot_url']).post(content) - - if cfg.get('infer', None) is not None and args.mode in ['all', 'infer']: - if args.partition is not None: - if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner: - cfg.infer.runner.partition = args.partition - cfg.infer.runner.quotatype = args.quotatype - else: - logger.warning('SlurmRunner is not used, so the partition ' - 'argument is ignored.') - if args.debug: - cfg.infer.runner.debug = True - if args.lark: - cfg.infer.runner.lark_bot_url = cfg['lark_bot_url'] - cfg.infer.partitioner['out_dir'] = osp.join(cfg['work_dir'], - 'predictions/') - partitioner = PARTITIONERS.build(cfg.infer.partitioner) - tasks = partitioner(cfg) - runner = RUNNERS.build(cfg.infer.runner) - runner(tasks) - - # evaluate - if cfg.get('eval', None) is not None and args.mode in ['all', 'eval']: - if args.partition is not None: - if RUNNERS.get(cfg.infer.runner.type) == SlurmRunner: - cfg.eval.runner.partition = args.partition - cfg.eval.runner.quotatype = args.quotatype - else: - logger.warning('SlurmRunner is not used, so the partition ' - 'argument is ignored.') - if args.debug: - cfg.eval.runner.debug = True - if args.lark: - cfg.eval.runner.lark_bot_url = cfg['lark_bot_url'] - cfg.eval.partitioner['out_dir'] = osp.join(cfg['work_dir'], 'results/') - partitioner = PARTITIONERS.build(cfg.eval.partitioner) - tasks = partitioner(cfg) - runner = RUNNERS.build(cfg.eval.runner) - runner(tasks) - - # visualize - if args.mode in ['all', 'eval', 'viz']: - summarizer = Summarizer(cfg) - summarizer.summarize(time_str=cfg_time_str) - - -if __name__ == '__main__': - main()
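
Usage note: with this change, run.py keeps the old CLI-driven path (--slurm,
--dlc, or neither for a local launch) and additionally honors "infer"/"eval"
sections in the config, building the partitioner and runner through the
PARTITIONERS/RUNNERS registries as tools/cfg_run.py used to. A minimal sketch
of such a section (field names follow the config access in the new code path;
the concrete values are illustrative, not taken from this patch):

    # Hypothetical `infer` section of an evaluation config. run.py fills in
    # partitioner['out_dir'] itself; -p/-q override partition/quotatype when
    # the runner is a SlurmRunner, and --debug/--lark set debug/lark_bot_url.
    infer = dict(
        partitioner=dict(type='SizePartitioner', max_task_size=2000),
        runner=dict(type='SlurmRunner',
                    partition='example_partition',  # placeholder name
                    max_num_workers=32,
                    task=dict(type='OpenICLInferTask')),
    )

Conversely, an invocation such as `python run.py <config> --slurm -p
<partition>` forces the Slurm launch path and, after the warning added above,
ignores any "infer"/"eval" sections in the config.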