mirror of
https://github.com/open-compass/opencompass.git
synced 2025-05-30 16:03:24 +08:00
118 lines
4.1 KiB
Python
118 lines
4.1 KiB
Python
# flake8: noqa: E501
|
|
import re
|
|
from collections import defaultdict
|
|
|
|
from datasets import Dataset
|
|
|
|
from opencompass.registry import DICT_POSTPROCESSORS, LOAD_DATASET
|
|
|
|
from .subjective_cmp import SubjectiveCmpDataset
|
|
from .utils import get_judgeanswer_and_reference
|
|
|
|
|
|
@LOAD_DATASET.register_module()
class CompassArenaDataset(SubjectiveCmpDataset):
    """Subjective comparison dataset that adds a ``ref`` field to every row."""

    def load(self, path: str, name: str, *args, **kwargs):
        """Load the parent dataset and attach a ``ref`` entry to each record.

        ``ref`` is taken from ``record['others']['reference']`` when that key
        exists and its value is not ``None``; otherwise a fixed fallback
        string is used.

        Args:
            path: Forwarded unchanged to the parent ``load``.
            name: Forwarded unchanged to the parent ``load``.
            *args: Accepted but not used.
            **kwargs: Accepted but not used.

        Returns:
            A ``Dataset`` built from the augmented records.
        """
        fallback_ref = '满足用户需求,言之有理即可'
        records = list(super().load(path, name))
        augmented = []
        for record in records:
            others = record['others']
            reference = others['reference'] if 'reference' in others else None
            record['ref'] = fallback_ref if reference is None else reference
            augmented.append(record)
        return Dataset.from_list(augmented)
|
|
|
|
|
|
def check_position_bias(judged_answers, references, banned_choice=('C',)):
    """Count judgements that repeat the first verdict given for a question.

    Each question is expected to appear more than once in ``references``
    (once per answer ordering). The first verdict seen for a question is
    remembered; every later verdict for that question which equals it, and
    is not listed in ``banned_choice``, is counted as position bias.

    Args:
        judged_answers: The successfully extracted judgements.
        references: The references containing the original question under
            the ``'question'`` key, used to locate the same question across
            different position judgements. Paired positionally with
            ``judged_answers``.
        banned_choice: Verdicts that never count as position bias.

    Returns:
        int: The number of judgements flagged as position-biased.
    """
    # Key on the question itself rather than on hash(question): two different
    # questions can share a hash value and would then be conflated.
    first_judgements = {}
    bias_count = 0
    for judge, ref in zip(judged_answers, references):
        question = ref['question']
        if question not in first_judgements:
            first_judgements[question] = judge
            continue
        # If a later choice is the same as the first choice, there is
        # position bias (unless that choice is banned).
        if judge == first_judgements[question] and judge not in banned_choice:
            bias_count += 1
    return bias_count
|
|
|
|
|
|
def post_process_compassarena(item):
    """Extract the judge's verdict letter from ``item['prediction']``.

    Returns the letter (``'A'``, ``'B'`` or ``'C'``) following the first
    verdict marker found in the prediction text, or ``None`` when no marker
    followed by one of those letters is present.
    """
    verdict = re.search('(?:选择:|Choice: )([ABC])', item['prediction'])
    if verdict is None:
        return None
    return verdict.group(1)
|
|
|
|
|
|
@DICT_POSTPROCESSORS.register_module('compassarena')
def compassarena_postprocess(output: dict,
                             output_path: str,
                             summary_type='half_add',
                             check_pos_bias=True) -> dict:
    """Summarise pairwise judge verdicts into per-capability win rates.

    The model named in ``references[0]['answer1']`` is treated as "model 1";
    the returned win rates belong to the other model ("model 2").
    NOTE(review): this assumes ``answer1`` names the model shown as option A
    for that comparison -- confirm against the reference schema.

    Args:
        output: Raw judge output, passed to ``get_judgeanswer_and_reference``
            and returned unchanged under the ``'details'`` key.
        output_path: Passed through to ``get_judgeanswer_and_reference``.
        summary_type: With ``'half_add'`` a ``'C'`` verdict scores 0.5;
            with any other value it scores 0.
        check_pos_bias: When true, the ``'position_bias'`` entry is computed
            with ``check_position_bias``; otherwise it is 0.

    Returns:
        A mapping from each capability to model 2's win rate as a percentage
        rounded to two decimals, plus ``'position_bias'`` and ``'details'``.
    """
    judged_answers, references = get_judgeanswer_and_reference(
        output, output_path, post_process_compassarena)

    if check_pos_bias:
        bias_num = check_position_bias(judged_answers, references)
    else:
        bias_num = 0

    win_model2 = defaultdict(float)
    categories = defaultdict(float)
    # Guard against an empty reference list (e.g. no judgement could be
    # extracted): the loops below are then no-ops instead of an IndexError.
    model1 = references[0]['answer1'] if references else None
    tie_score = 0.5 if summary_type == 'half_add' else 0

    for prediction, reference in zip(judged_answers, references):
        model1_is_first = reference['answer1'] == model1
        if prediction == 'A':
            score_2 = 0 if model1_is_first else 1
        elif prediction == 'B':
            score_2 = 1 if model1_is_first else 0
        elif prediction == 'C':
            score_2 = tie_score
        else:
            # Unrecognised verdict: skip it rather than reusing the score of
            # the previous iteration (or hitting an unbound local).
            continue

        capability = reference['capability']
        categories[capability] += 1
        win_model2[capability] += score_2

    # Convert accumulated scores into percentages of judged comparisons.
    for capability, total in categories.items():
        win_model2[capability] = round(win_model2[capability] / total * 100, 2)

    win_model2['position_bias'] = bias_num

    results = win_model2
    results['details'] = output
    return results
|