mirror of
https://github.com/open-compass/opencompass.git
synced 2025-05-30 16:03:24 +08:00

* add judgellm prompts * add judgelm prompts * update import info * fix situation that no abbr in config * fix situation that no abbr in config * add summarizer for other judgellm * change config name * add maxlen * add maxlen * dict assert * dict assert * fix strings * fix strings
226 lines
10 KiB
Python
# flake8: noqa: E501
import csv
import os
import os.path as osp
import re
from collections import defaultdict
from datetime import datetime
from itertools import product

import mmengine
from mmengine import ConfigDict

try:
    from prettytable import from_csv
except ImportError:
    from_csv = None

from opencompass.partitioners.sub_naive import remove_duplicate_pairs
from opencompass.utils import dataset_abbr_from_cfg, model_abbr_from_cfg

def match_general_answer(s):
    """Fallback matcher: use the first character of the judge output as the
    verdict if it is one of A/B/C/D."""
    if not s:  # guard against empty judge outputs
        return None
    temp = s[0]
    if temp in ['A', 'B', 'C', 'D']:
        return temp
    else:
        return None


def match_GPT4_answer(s):
    """Strict matcher: extract the verdict from a '选择:'/'Choice: ' pattern
    in the judge output."""
    if result := re.findall('(?:选择:|Choice: )([ABCD])', s):
        return result[0]
    else:
        return None


judge_map = {'smart': match_GPT4_answer, 'other': match_general_answer}

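# Example behaviour of the matchers above (illustrative; derived from the
# regular expression and the first-character check, not from upstream docs):
#   match_GPT4_answer('Both are fine, but Choice: A')  -> 'A'
#   match_GPT4_answer('I cannot decide.')              -> None
#   match_general_answer('B is clearly better')        -> 'B'
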
def call_function(name, arg):
    """Dispatch ``arg`` to the matcher registered under ``name`` in
    ``judge_map``; return None if the name is unknown."""
    if name in judge_map:
        return judge_map[name](arg)
    else:
        print('Function not found in the map.')
        return None

class Corev2Summarizer:
    """Do the subjectivity analysis based on evaluation results.

    Args:
        config (ConfigDict): The configuration object of the evaluation task.
            It's expected to be filled out at runtime.
        match_method (str): Which matcher in ``judge_map`` to use for parsing
            the judge model's verdicts. Defaults to 'smart'.
    """

    def __init__(self, config: ConfigDict, match_method='smart') -> None:
        self.tasks = []
        self.cfg = config
        self.match_method = match_method
        self.base_models = self.cfg['eval']['partitioner']['base_models']
        self.compare_models = self.cfg['eval']['partitioner']['compare_models']
        self.judge_abbr = model_abbr_from_cfg(self.cfg['judge_model'])

    def summarize(self,
                  time_str: str = datetime.now().strftime('%Y%m%d_%H%M%S')):
        """Summarize the subjectivity analysis based on evaluation results.

        Args:
            time_str (str): Timestamp used for file naming.

        Returns:
            None: The summary is appended to a per-judge CSV report and
            printed as a table.
        """
        dataset_cfgs = self.cfg['datasets']
        work_dir = self.cfg['work_dir']
        self.work_dir = work_dir

        self.time_str = time_str
        output_path = osp.join(self.work_dir, 'summary',
                               f'summary_{self.time_str}.txt')
        output_dir = osp.join(osp.split(output_path)[0], f'{self.time_str}')
        mmengine.mkdir_or_exist(output_dir)
        results_folder = osp.join(work_dir, 'results')

        # Build every (base_model, compare_model) pair to be summarized,
        # dropping self-pairs and duplicate pairings.
        model_combinations = list(
            product(self.base_models, self.compare_models))
        unique_combinations = remove_duplicate_pairs(
            [combo for combo in model_combinations if combo[0] != combo[1]])
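        # Illustrative example (model names below are hypothetical): with
        #   base_models    = [{'abbr': 'model_a'}, {'abbr': 'model_b'}]
        #   compare_models = [{'abbr': 'model_a'}, {'abbr': 'model_c'}]
        # the product gives four pairs; the (model_a, model_a) self-pair is
        # filtered out above, and remove_duplicate_pairs is expected to drop
        # any order-reversed duplicates, leaving (model_a, model_c),
        # (model_b, model_a) and (model_b, model_c).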
        for model_pair in unique_combinations:
            model1 = model_pair[0]['abbr']
            model2 = model_pair[1]['abbr']
            judge_model = self.judge_abbr
            subdir = model1 + '_' + model2 + '_judged-by--' + self.judge_abbr
            subdir_path = os.path.join(results_folder, subdir)
            if os.path.isdir(subdir_path):
                fout = osp.join(output_dir,
                                'judged-by--' + judge_model + '-report.csv')
                for dataset in dataset_cfgs:
                    dataset_abbr = dataset_abbr_from_cfg(dataset)
                    filename = os.path.join(subdir_path,
                                            dataset_abbr + '.json')
                    partial_filename = os.path.join(subdir_path,
                                                    dataset_abbr + '_0.json')
                    if osp.exists(osp.realpath(filename)):
                        # Single result file for this dataset.
                        result = mmengine.load(filename)
                    elif osp.exists(osp.realpath(partial_filename)):
                        # Sharded results: merge <abbr>_0.json, <abbr>_1.json,
                        # ... into one dict with consecutive integer keys.
                        filename = partial_filename
                        result = {}
                        i = 1
                        partial_dict_flag = 0
                        while osp.exists(osp.realpath(filename)):
                            res = mmengine.load(filename)
                            for v in res.values():
                                result[partial_dict_flag] = v
                                partial_dict_flag += 1
                            filename = os.path.join(
                                subdir_path,
                                dataset_abbr + '_' + str(i) + '.json')
                            i += 1
                    else:
                        result = {}

                    if len(result) == 0:
                        print('*' * 100)
                        print('There are no results for ' + filename +
                              ' or ' + partial_filename)
                        print('*' * 100)
                    assert len(result) > 0
                    # Parse every judgement with the configured matcher and
                    # keep the accompanying reference (gold) information.
                    judged_answers = []
                    references = []
                    for v in result.values():
                        judged_answers.append(
                            call_function(self.match_method, v['prediction']))
                        references.append(v['gold'])
                    successful_judged_answers = len(
                        judged_answers) - judged_answers.count(None)
                    print(
                        f'Among {len(judged_answers)} judgements, successfully extracted {successful_judged_answers} judgements.'
                    )
                    if successful_judged_answers == 0:
                        print('*' * 100)
                        print(
                            'There are no extracted judgements, please change your judge model or check your prompt!!!'
                        )
                        print('*' * 100)
                    assert successful_judged_answers > 0
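                    # How verdicts are scored below (inferred from this code):
                    # 'A' -> answer1 wins, 'B' -> answer2 wins, 'C' -> both
                    # answers are credited ("win_both"), anything else
                    # (e.g. 'D') credits neither model.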
                    # Per-capability counters: "win_both" counts outright wins
                    # plus ties judged good for both answers, "half_draw"
                    # counts outright wins only, and "categories" counts all
                    # valid judgements.
                    win_both_model1 = defaultdict(float)
                    win_both_model2 = defaultdict(float)
                    half_draw_model1 = defaultdict(float)
                    half_draw_model2 = defaultdict(float)
                    categories = defaultdict(float)
                    model1 = references[0]['answer1']
                    model2 = references[0]['answer2']
                    for prediction, reference in zip(judged_answers,
                                                     references):
                        if prediction is not None:
                            # Tally both the top-level capability (prefix
                            # before '-') and the full capability label.
                            capability = reference['capability']
                            top_capability = capability.split('-')[0]
                            categories[top_capability] += 1
                            categories[capability] += 1
                            winner = ''
                            if prediction == 'A':
                                winner = reference['answer1']
                            elif prediction == 'B':
                                winner = reference['answer2']
                            elif prediction == 'C':
                                # Both answers judged good: credit both models.
                                win_both_model1[top_capability] += 1
                                win_both_model2[top_capability] += 1
                                win_both_model1[capability] += 1
                                win_both_model2[capability] += 1
                            if model1 == winner:
                                half_draw_model1[top_capability] += 1
                                win_both_model1[top_capability] += 1
                                half_draw_model1[capability] += 1
                                win_both_model1[capability] += 1
                            elif model2 == winner:
                                half_draw_model2[top_capability] += 1
                                win_both_model2[top_capability] += 1
                                half_draw_model2[capability] += 1
                                win_both_model2[capability] += 1
                    # Convert raw counts into percentages of the judged
                    # samples for each capability.
                    for capability in categories:
                        if capability not in half_draw_model1:
                            win_both_model1[capability] = 0.0
                            half_draw_model1[capability] = 0.0
                        else:
                            win_both_model1[capability] = round(
                                (win_both_model1[capability] /
                                 categories[capability]) * 100, 2)
                            half_draw_model1[capability] = round(
                                (half_draw_model1[capability] /
                                 categories[capability]) * 100, 2)
                        if capability not in half_draw_model2:
                            win_both_model2[capability] = 0.0
                            half_draw_model2[capability] = 0.0
                        else:
                            win_both_model2[capability] = round(
                                (win_both_model2[capability] /
                                 categories[capability]) * 100, 2)
                            half_draw_model2[capability] = round(
                                (half_draw_model2[capability] /
                                 categories[capability]) * 100, 2)
                    scores = {
                        'win_both_' + model1: win_both_model1,
                        'half_draw_' + model1: half_draw_model1,
                        'win_both_' + model2: win_both_model2,
                        'half_draw_' + model2: half_draw_model2
                    }
                    rows = list(scores.keys())
                    columns = list(scores[rows[0]].keys())
                    # Append one block per dataset to the per-judge report.
                    with open(fout, 'a+', newline='') as csvfile:
                        writer = csv.writer(csvfile)
                        writer.writerow([model1 + '_vs_' + model2] + columns)
                        for row in rows:
                            writer.writerow(
                                [row] +
                                [scores[row][column] for column in columns])
            else:
                print(subdir_path + ' does not exist! Please check!')
                # No results for this pair, so there is no report to print.
                continue
            # Pretty-print the accumulated CSV report for this model pair.
            if from_csv is None:
                print('prettytable is not installed; skipping table output.')
            else:
                with open(fout, 'r') as f:
                    x = from_csv(f)
                print(x)
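

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; not part of the original module).
# The config layout is an assumption inferred from the keys accessed in
# __init__ and summarize() ('eval.partitioner.base_models',
# 'eval.partitioner.compare_models', 'judge_model', 'datasets', 'work_dir');
# real configs come from an OpenCompass evaluation run.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    example_cfg = ConfigDict(
        eval=dict(partitioner=dict(
            base_models=[dict(abbr='model_a')],  # hypothetical model abbrs
            compare_models=[dict(abbr='model_b')],
        )),
        judge_model=dict(abbr='judge_model'),  # hypothetical judge config
        datasets=[],  # dataset configs would normally be filled at runtime
        work_dir='outputs/subjective_demo/',  # hypothetical work dir
    )
    summarizer = Corev2Summarizer(example_cfg, match_method='smart')
    summarizer.summarize()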