[Feature] Add heuristic size partitioner (#63)

* [Feature] Add heuristic size partitioner

* update
This commit is contained in:
Leymore 2023-07-20 11:53:24 +08:00 committed by GitHub
parent eea8b04417
commit 3fe5ee096c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 12 deletions

View File

@ -57,3 +57,15 @@ Running method:
```bash
python tools/test_api_model.py [CONFIG_PATH] -n
```
## Prediction Merger
This tool can merge patitioned predictions.
Running method:
```bash
python tools/prediction_merger.py CONFIG_PATH [-w WORK_DIR]
```
- `-w`: Work path, default is `'./outputs/default'`.

View File

@ -66,3 +66,15 @@ python tools/case_analyzer.py CONFIG_PATH [-w WORK_DIR]
```bash
python tools/test_api_model.py [CONFIG_PATH] -n
```
## Prediction Merger
本工具可以合并由于 `partitioner` 而产生的分片推理结果。
运行方式:
```bash
python tools/prediction_merger.py CONFIG_PATH [-w WORK_DIR]
```
- `-w`:工作路径,默认为 `'./outputs/default'`

View File

@ -1,6 +1,7 @@
import copy
import math
import os.path as osp
from fnmatch import fnmatch
from typing import List, Tuple, Union
import mmengine
@ -134,6 +135,30 @@ class SizePartitioner(BasePartitioner):
split_configs.append(cfg)
return split_configs
def get_factor(self, dataset: ConfigDict) -> int:
infer_cfg = dataset.infer_cfg
template = (infer_cfg.prompt_template.template if 'prompt_template'
in infer_cfg else infer_cfg.ice_template.template)
# If it's the Gen template, the dataset size will be multiplied by the
# self.gen_task_coef
factor = self.gen_task_coef
# If it's the PPL template, the dataset size will be multiplied by the
# number of labels
if isinstance(template, dict):
ctr = sum(key in template for key in ('begin', 'round', 'end'))
if ctr != len(template.keys()):
factor = len(template.keys())
dataset_abbr = dataset_abbr_from_cfg(dataset)
if any(
fnmatch(dataset_abbr, pattern)
for pattern in ('bbh*', 'gsm8k*', 'math*', 'strategyqa*',
'agieval-jec*', 'agieval-gaokao-mathcloze',
'agieval-math')):
factor *= 10
return factor
def get_cost(self,
dataset: ConfigDict,
get_raw_factors: bool = False) -> Union[int, Tuple[int, int]]:
@ -150,19 +175,8 @@ class SizePartitioner(BasePartitioner):
"""
dataset_abbr = dataset_abbr_from_cfg(dataset)
# If it's the PPL template, the dataset size will be multiplied by the
# number of labels
infer_cfg = dataset.infer_cfg
test_range = dataset.reader_cfg.get('test_range', '')
template = (infer_cfg.prompt_template.template if 'prompt_template'
in infer_cfg else infer_cfg.ice_template.template)
# If it's the Gen template, the dataset size will be multiplied by the
# self.gen_task_coef
factor = self.gen_task_coef
if isinstance(template, dict):
ctr = sum(key in template for key in ('begin', 'round', 'end'))
if ctr != len(template.keys()):
factor = len(template.keys())
factor = self.get_factor(dataset)
if dataset_abbr in self.dataset_size:
actual_size = eval('len(range(self.dataset_size[dataset_abbr])'

100
tools/prediction_merger.py Normal file
View File

@ -0,0 +1,100 @@
import argparse
import copy
import json
import os.path as osp
import mmengine
from mmengine.config import Config, ConfigDict
from opencompass.utils import build_dataset_from_cfg, get_infer_output_path
def parse_args():
parser = argparse.ArgumentParser(description='Run an evaluation task')
parser.add_argument('config', help='Train config file path')
parser.add_argument('-w',
'--work-dir',
help='Work path, all the outputs will be '
'saved in this path, including the slurm logs, '
'the evaluation results, the summary results, etc.'
'If not specified, the work_dir will be set to '
'./outputs/default.',
default=None,
type=str)
args = parser.parse_args()
return args
class PredictionMerger:
""""""
def __init__(self, cfg: ConfigDict) -> None:
self.cfg = cfg
self.model_cfg = copy.deepcopy(self.cfg['model'])
self.dataset_cfg = copy.deepcopy(self.cfg['dataset'])
self.work_dir = self.cfg.get('work_dir')
def run(self):
filename = get_infer_output_path(
self.model_cfg, self.dataset_cfg,
osp.join(self.work_dir, 'predictions'))
root, ext = osp.splitext(filename)
partial_filename = root + '_0' + ext
if osp.exists(osp.realpath(filename)):
return
if not osp.exists(osp.realpath(partial_filename)):
print(f'{filename} not found')
return
# Load predictions
partial_filenames = []
if osp.exists(osp.realpath(filename)):
preds = mmengine.load(filename)
else:
preds, offset = {}, 0
i = 1
while osp.exists(osp.realpath(partial_filename)):
partial_filenames.append(osp.realpath(partial_filename))
_preds = mmengine.load(partial_filename)
partial_filename = root + f'_{i}' + ext
i += 1
for _o in range(len(_preds)):
preds[str(offset)] = _preds[str(_o)]
offset += 1
dataset = build_dataset_from_cfg(self.dataset_cfg)
if len(preds) != len(dataset.test):
print('length mismatch')
return
print(f'Merge {partial_filenames} to {filename}')
with open(filename, 'w', encoding='utf-8') as f:
json.dump(preds, f, indent=4, ensure_ascii=False)
def dispatch_tasks(cfg):
for model in cfg['models']:
for dataset in cfg['datasets']:
PredictionMerger({
'model': model,
'dataset': dataset,
'work_dir': cfg['work_dir']
}).run()
def main():
args = parse_args()
cfg = Config.fromfile(args.config)
# set work_dir
if args.work_dir is not None:
cfg['work_dir'] = args.work_dir
else:
cfg.setdefault('work_dir', './outputs/default')
dispatch_tasks(cfg)
if __name__ == '__main__':
main()