OpenCompass/opencompass/partitioners/size.py

import copy
import math
import os.path as osp
from fnmatch import fnmatch
from typing import Dict, List, Optional, Tuple, Union

import mmengine
from mmengine.config import Config, ConfigDict

from opencompass.registry import PARTITIONERS
from opencompass.utils import (build_dataset_from_cfg, dataset_abbr_from_cfg,
                               get_infer_output_path)

from .base import BasePartitioner


@PARTITIONERS.register_module()
class SizePartitioner(BasePartitioner):
"""Task partitioner based on the size of the dataset (with some rough
expansion as an estimation of computational cost).
Args:
out_dir (str): The output directory of tasks.
max_task_size (int): The maximum size of a task.
gen_task_coef (int): The dataset cost measurement coefficient for
generation tasks.
2023-10-27 20:31:22 +08:00
strategy (str): The partition strategy. Supported strategies are:
'heuristic' and 'split'. Defaults to 'heuristic'.
heuristic: split large datasets into several tasks, merge small
datasets into one task.
split: split large datasets into several tasks only.
2023-07-05 10:33:12 +08:00
dataset_size_path (str): The path to the dataset size cache file.
keep_keys (list[str]): The keys to be kept from the experiment config
to the task config.
2023-07-05 10:33:12 +08:00
"""
def __init__(self,
out_dir: str,
max_task_size: int = 40000,
gen_task_coef: int = 20,
strategy: str = 'heuristic',
dataset_size_path: str = '.cache/dataset_size.json',
keep_keys: Optional[List[str]] = None):
super().__init__(out_dir=out_dir, keep_keys=keep_keys)
self.max_task_size = max_task_size
self.gen_task_coef = gen_task_coef
self.dataset_size_path = dataset_size_path
assert strategy in ('heuristic', 'split'), \
f'Unsupported partition strategy: {strategy}. '\
'Supported strategies are: `heuristic`, `split` .'
        self.strategy = strategy

    def partition(self,
model_dataset_combinations: List[Dict[str,
List[ConfigDict]]],
work_dir: str,
out_dir: str,
add_cfg: Dict = {}) -> List[ConfigDict]:
"""Partition model-dataset pairs into tasks. Each task is defined as a
dict and will run independently as a unit. Its structure is as
follows:
.. code-block:: python
{
'models': [], # a list of model configs
'datasets': [[]], # a nested list of dataset configs, each
list corresponds to a model
'work_dir': '', # the work dir
**add_cfg # other keys to be kept in the config
2023-07-05 10:33:12 +08:00
}
Args:
2023-12-11 17:42:53 +08:00
model_dataset_combinations (List[Dict]): List of
`{models: [...], datasets: [...]}` dicts. Each dict contains
a list of model configs and a list of dataset configs.
2023-07-05 10:33:12 +08:00
work_dir (str): The work dir for the task.
out_dir (str): The full output path for the task, intended for
Partitioners to check whether the task is finished via the
existency of result file in this directory.
add_cfg (dict): Other common keys to be added in the task config,
used to share the same config among tasks. Defaults to {}.
2023-07-05 10:33:12 +08:00
Returns:
List[ConfigDict]: A list of tasks.
"""
tasks = []
for comb in model_dataset_combinations:
comb['datasets'] = sorted(comb['datasets'],
key=lambda x: self.get_cost(x),
reverse=True)
for model in comb['models']:
chunks = [] # elements: tuple(size, dataset_chunk)
for dataset in comb['datasets']:
filename = get_infer_output_path(model, dataset, out_dir)
# skip the task if the task output exists
if osp.exists(filename):
continue
dataset_size = self.get_cost(dataset)
if dataset_size > self.max_task_size:
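                        # Oversized datasets are split up front; each split is
                        # counted as a full max_task_size chunk, and splits
                        # whose output file already exists are skipped.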
root, ext = osp.splitext(filename)
dataset_splits = self.split_dataset(dataset)
for i, dataset_split in enumerate(dataset_splits):
if not osp.exists(f'{root}_{i}{ext}'):
chunks.append(
(self.max_task_size, dataset_split))
else:
chunks.append((dataset_size, dataset))
if self.strategy == 'heuristic':
chunks = sorted(chunks, key=lambda x: x[0], reverse=True)
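                    # Greedy packing: chunks are visited from largest to
                    # smallest and accumulated into one task until adding the
                    # next chunk would push the total cost past max_task_size.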
current_size, current_chunks = 0, []
for index in range(len(chunks)):
current_size += chunks[index][0]
current_chunks.append(chunks[index][1])
if index == len(chunks) - 1 or current_size + chunks[
index + 1][0] > self.max_task_size:
tasks.append(
Config({
'models': [model],
'datasets': [current_chunks],
'work_dir': work_dir,
**add_cfg
}))
current_size, current_chunks = 0, []
elif self.strategy == 'split':
for _, dataset in chunks:
tasks.append(
Config({
'models': [model],
'datasets': [[dataset]],
'work_dir': work_dir,
**add_cfg
}))
return tasks
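    # A minimal usage sketch, as comments only. The variable names
    # (`model_cfgs`, `dataset_cfgs`) and the paths below are illustrative
    # placeholders, not values taken from any real experiment config:
    #
    #   partitioner = SizePartitioner(out_dir='outputs/predictions/',
    #                                 max_task_size=40000,
    #                                 strategy='heuristic')
    #   tasks = partitioner.partition(
    #       model_dataset_combinations=[{
    #           'models': model_cfgs,      # list of model ConfigDicts
    #           'datasets': dataset_cfgs,  # list of dataset ConfigDicts
    #       }],
    #       work_dir='outputs/default/',
    #       out_dir='outputs/default/predictions/')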
@property
def dataset_size(self):
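        """Lazily-loaded mapping from dataset abbreviation to its test set
        size, backed by the cache file at ``dataset_size_path``."""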
if not hasattr(self, '_dataset_size'):
if osp.exists(self.dataset_size_path):
self._dataset_size = mmengine.load(self.dataset_size_path)
else:
self._dataset_size = {}
        return self._dataset_size

    def split_dataset(self, dataset_cfg: ConfigDict) -> List[ConfigDict]:
"""Split dataset into several parts."""
dataset_size, num_repeats = self.get_cost(dataset_cfg,
get_raw_factors=True)
split_configs = []
abbr = dataset_abbr_from_cfg(dataset_cfg)
step = self.max_task_size // num_repeats
        # round the step so the examples are spread evenly over the
        # minimum number of splits
step = math.ceil(dataset_size / math.ceil(dataset_size / step))
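        # e.g. (illustrative numbers) with max_task_size=40000 and
        # num_repeats=4 the initial step is 10000, so a dataset of 25000 rows
        # needs ceil(25000 / 10000) = 3 splits; the step is then rounded to
        # ceil(25000 / 3) = 8334, giving test_ranges [0:8334], [8334:16668]
        # and [16668:25002].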
for part, i in enumerate(range(0, dataset_size, step)):
cfg = copy.deepcopy(dataset_cfg)
cfg['abbr'] = abbr + f'_{part}'
test_range = cfg['reader_cfg'].get('test_range', '')
cfg['reader_cfg']['test_range'] = f'{test_range}[{i}:{i+step}]'
split_configs.append(cfg)
        return split_configs

    def get_factor(self, dataset: ConfigDict) -> int:
infer_cfg = dataset.infer_cfg
template = (infer_cfg.prompt_template.template if 'prompt_template'
in infer_cfg else infer_cfg.ice_template.template)
# If it's the Gen template, the dataset size will be multiplied by the
# self.gen_task_coef
factor = self.gen_task_coef
# If it's the PPL template, the dataset size will be multiplied by the
# number of labels
if isinstance(template, dict):
ctr = sum(key in template for key in ('begin', 'round', 'end'))
if ctr != len(template.keys()):
factor = len(template.keys())
dataset_abbr = dataset_abbr_from_cfg(dataset)
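        # The benchmarks matched below typically require long, multi-step
        # generations, so their cost gets an extra 10x multiplier.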
if any(
fnmatch(dataset_abbr, pattern)
for pattern in ('bbh*', 'gsm8k*', 'math*', 'strategyqa*',
'agieval-jec*', 'agieval-gaokao-mathcloze',
'agieval-math', '*professional_law')):
factor *= 10
        return factor

    def get_cost(self,
dataset: ConfigDict,
get_raw_factors: bool = False) -> Union[int, Tuple[int, int]]:
"""Get the computational cost of inferring on the dataset.
Args:
dataset (ConfigDict): The dataset config.
get_raw_factors (bool): If True, the raw factors of computational
cost will be returned.
Returns:
int or Tuple[int, int]: The size of the dataset. If get_raw_factors
is True, the number of repeats will also be returned.
"""
dataset_abbr = dataset_abbr_from_cfg(dataset)
test_range = dataset.reader_cfg.get('test_range', '')
factor = self.get_factor(dataset)
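        # `test_range` is a slice string such as '[0:1000]'; applying it to a
        # range() of the cached size yields the number of examples that will
        # actually be inferred, without having to load the dataset again.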
if dataset_abbr in self.dataset_size:
actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
f'{test_range})')
if get_raw_factors:
return actual_size, factor
return factor * actual_size
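        # Size not cached yet: build the dataset once, record the length of
        # its test split, and persist the cache for future runs.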
dataset = build_dataset_from_cfg(dataset)
self.dataset_size[dataset_abbr] = len(dataset.test)
mmengine.mkdir_or_exist('.cache/')
mmengine.dump(self.dataset_size,
self.dataset_size_path,
indent=4,
ensure_ascii=False)
actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
f'{test_range})')
if get_raw_factors:
return actual_size, factor
return factor * actual_size
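# Illustrative cost arithmetic (made-up numbers): a gen-style dataset with
# 1000 test examples and the default gen_task_coef of 20 costs 20 * 1000 =
# 20000, which fits into a single task under the default max_task_size of
# 40000, while a 4-label PPL dataset with 15000 examples costs 4 * 15000 =
# 60000 and would be split into two tasks.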