OpenCompass/opencompass/partitioners/sub_size.py
bittersweet1999 68ca48496b
[Refactor] Reorganize subjective eval (#1284)
* fix pip version

* fix pip version

* reorganize subjective eval

* reorg sub

* reorg subeval

* reorg subeval

* update subjective doc

* reorg subeval

* reorg subeval
2024-07-05 22:11:37 +08:00

288 lines
12 KiB
Python

# flake8: noqa: E501
import copy
import math
import os.path as osp
from fnmatch import fnmatch
from typing import Dict, List, Optional, Tuple, Union
import mmengine
from mmengine.config import Config, ConfigDict
from opencompass.registry import PARTITIONERS
from opencompass.utils import (build_dataset_from_cfg, dataset_abbr_from_cfg,
get_infer_output_path)
from .sub_naive import (SubjectiveNaivePartitioner, get_model_combinations,
remove_already_tasks,
replicate_tasks_with_judge_models)
@PARTITIONERS.register_module()
class SubjectiveSizePartitioner(SubjectiveNaivePartitioner):
"""Task partitioner based on the size of the dataset (with some rough
expansion as an estimation of computational cost).
Args:
out_dir (str): The output directory of tasks.
max_task_size (int): The maximum size of a task.
gen_task_coef (int): The dataset cost measurement coefficient for
generation tasks.
strategy (str): The partition strategy. Supported strategies are:
'heuristic' and 'split'. Defaults to 'heuristic'.
heuristic: split large datasets into several tasks, merge small
datasets into one task.
split: split large datasets into several tasks only.
dataset_size_path (str): The path to the dataset size cache file.
keep_keys (list[str]): The keys to be kept from the experiment config
to the task config.
"""
def __init__(
self,
out_dir: str,
models: Optional[List[ConfigDict]] = [],
base_models: Optional[List[ConfigDict]] = [],
compare_models: Optional[List[ConfigDict]] = [],
judge_models: Optional[List[ConfigDict]] = [],
meta_judge_model: Optional[ConfigDict] = None,
model_pairs: Optional[List[Tuple]] = None,
max_task_size: int = 40000,
gen_task_coef: int = 20,
strategy: str = 'heuristic',
dataset_size_path: str = '.cache/dataset_size.json',
keep_keys: Optional[List[str]] = None,
):
super().__init__(
out_dir=out_dir,
keep_keys=keep_keys,
models=models,
base_models=base_models,
compare_models=compare_models,
judge_models=judge_models,
meta_judge_model=meta_judge_model,
model_pairs=model_pairs,
)
self.max_task_size = max_task_size
self.gen_task_coef = gen_task_coef
self.dataset_size_path = dataset_size_path
assert strategy in ('heuristic', 'split'), \
f'Unsupported partition strategy: {strategy}. '\
'Supported strategies are: `heuristic`, `split` .'
self.strategy = strategy
def partition(self,
models: List[ConfigDict],
datasets: List[ConfigDict],
work_dir: str,
out_dir: str,
add_cfg: Dict = {}) -> List[ConfigDict]:
"""Partition model-dataset pairs into tasks. Each task is defined as a
dict and will run independently as a unit. Its structure is as
follows:
.. code-block:: python
{
'models': [], # a list of model configs
'datasets': [[]], # a nested list of dataset configs, each
list corresponds to a model
'work_dir': '', # the work dir
**add_cfg # other keys to be kept in the config
}
Args:
models (List[ConfigDict]): A list of model configs.
datasets (List[ConfigDict]): A list of dataset configs.
work_dir (str): The work dir for the task.
out_dir (str): The full output path for the task, intended for
Partitioners to check whether the task is finished via the
existency of result file in this directory.
add_cfg (dict): Other common keys to be added in the task config,
used to share the same config among tasks. Defaults to {}.
Returns:
List[ConfigDict]: A list of tasks.
"""
models = self.models if self.models != [] else models
base_models, compare_models = self.base_models, self.compare_models
judge_models, meta_judge_model = self.judge_models, self.meta_judge_model
self.max_task_size *= len(datasets)
all_tasks = []
for dataset in datasets:
mode = dataset['mode']
infer_order = dataset.get('infer_order', None)
assert mode in ['singlescore', 'allpair', 'm2n', 'fixed']
assert infer_order in ['random', 'double', None]
if mode == 'singlescore':
temp_models = models
else:
temp_models = get_model_combinations(mode, models,
dataset['base_models'],
models)
model_dataset_combinations = [{
'models': temp_models,
'datasets': [dataset]
}]
tasks = []
for comb in model_dataset_combinations:
comb['datasets'] = sorted(comb['datasets'],
key=lambda x: self.get_cost(x),
reverse=True)
for model in comb['models']:
chunks = [] # elements: tuple(size, dataset_chunk)
for dataset in comb['datasets']:
filename = get_infer_output_path(
model, dataset, out_dir)
# skip the task if the task output exists
# if osp.exists(filename):
# continue
dataset_size = self.get_cost(dataset)
if dataset_size > self.max_task_size:
root, ext = osp.splitext(filename)
dataset_splits = self.split_dataset(dataset)
for i, dataset_split in enumerate(dataset_splits):
if not osp.exists(f'{root}_{i}{ext}'):
chunks.append(
(self.max_task_size, dataset_split))
else:
chunks.append((dataset_size, dataset))
if self.strategy == 'heuristic':
chunks = sorted(chunks,
key=lambda x: x[0],
reverse=True)
current_size, current_chunks = 0, []
for index in range(len(chunks)):
current_size += chunks[index][0]
current_chunks.append(chunks[index][1])
if index == len(
chunks) - 1 or current_size + chunks[
index + 1][0] > self.max_task_size:
tasks.append(
Config({
'models': [model],
'datasets': [current_chunks],
'work_dir': work_dir,
**add_cfg
}))
current_size, current_chunks = 0, []
elif self.strategy == 'split':
for _, dataset in chunks:
tasks.append(
Config({
'models': [model],
'datasets': [[dataset]],
'work_dir': work_dir,
**add_cfg
}))
tasks = replicate_tasks_with_judge_models(tasks, judge_models,
meta_judge_model)
tasks = remove_already_tasks(tasks, work_dir, meta_judge_model)
if isinstance(tasks, list) and len(tasks) != 0 and isinstance(
tasks[0], list):
# Refer to meta review judge
for task_stage in tasks:
for task in task_stage:
task['infer_order'] = infer_order
else:
# Refer to just have review judge
for task in tasks:
task['infer_order'] = infer_order
all_tasks += tasks
return all_tasks
@property
def dataset_size(self):
if not hasattr(self, '_dataset_size'):
if osp.exists(self.dataset_size_path):
self._dataset_size = mmengine.load(self.dataset_size_path)
else:
self._dataset_size = {}
return self._dataset_size
def split_dataset(self, dataset_cfg: ConfigDict) -> List[ConfigDict]:
"""Split dataset into several parts."""
dataset_size, num_repeats = self.get_cost(dataset_cfg,
get_raw_factors=True)
split_configs = []
abbr = dataset_abbr_from_cfg(dataset_cfg)
step = self.max_task_size // num_repeats
# evenly distribute the task
step = math.ceil(dataset_size / math.ceil(dataset_size / step))
for part, i in enumerate(range(0, dataset_size, step)):
cfg = copy.deepcopy(dataset_cfg)
cfg['abbr'] = abbr + f'_{part}'
test_range = cfg['reader_cfg'].get('test_range', '')
cfg['reader_cfg']['test_range'] = f'{test_range}[{i}:{i+step}]'
split_configs.append(cfg)
return split_configs
def get_factor(self, dataset: ConfigDict) -> int:
infer_cfg = dataset.infer_cfg
template = (infer_cfg.prompt_template.template if 'prompt_template'
in infer_cfg else infer_cfg.ice_template.template)
# If it's the Gen template, the dataset size will be multiplied by the
# self.gen_task_coef
factor = self.gen_task_coef
# If it's the PPL template, the dataset size will be multiplied by the
# number of labels
if isinstance(template, dict):
ctr = sum(key in template for key in ('begin', 'round', 'end'))
if ctr != len(template.keys()):
factor = len(template.keys())
dataset_abbr = dataset_abbr_from_cfg(dataset)
if any(
fnmatch(dataset_abbr, pattern)
for pattern in ('bbh*', 'gsm8k*', 'math*', 'strategyqa*',
'agieval-jec*', 'agieval-gaokao-mathcloze',
'agieval-math', '*professional_law')):
factor *= 10
return factor
def get_cost(self,
dataset: ConfigDict,
get_raw_factors: bool = False) -> Union[int, Tuple[int, int]]:
"""Get the computational cost of inferring on the dataset.
Args:
dataset (ConfigDict): The dataset config.
get_raw_factors (bool): If True, the raw factors of computational
cost will be returned.
Returns:
int or Tuple[int, int]: The size of the dataset. If get_raw_factors
is True, the number of repeats will also be returned.
"""
dataset_abbr = dataset_abbr_from_cfg(dataset)
test_range = dataset.reader_cfg.get('test_range', '')
factor = self.get_factor(dataset)
if dataset_abbr in self.dataset_size:
actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
f'{test_range})')
if get_raw_factors:
return actual_size, factor
return factor * actual_size
dataset = build_dataset_from_cfg(dataset)
self.dataset_size[dataset_abbr] = len(dataset.test)
mmengine.mkdir_or_exist('.cache/')
mmengine.dump(self.dataset_size,
self.dataset_size_path,
indent=4,
ensure_ascii=False)
actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
f'{test_range})')
if get_raw_factors:
return actual_size, factor
return factor * actual_size