OpenCompass/opencompass/datasets/livemathbench/livemathbench.py

import concurrent.futures
import os
import re
from copy import deepcopy
from itertools import product
from typing import Any, Dict, List

import jsonlines
import numpy as np
from datasets import Dataset

from opencompass.models import OpenAISDK
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET, MODELS

from ..base import BaseDataset
from .prompts import (EXTRACT_PROMPT_CN, EXTRACT_PROMPT_EN, JUDGE_PROMPT_CN,
                      JUDGE_PROMPT_EN, PROMPT_CN, PROMPT_EN)


@LOAD_DATASET.register_module()
class LiveMathBenchDataset(BaseDataset):
    dataset_splits = ['AIMC', 'CEE', 'CMO', 'MATH500']
    dataset_languages = ['cn', 'en']

    @staticmethod
    def load(
        path: str,
        k: int,
        n: int,
    ) -> List[Dict[str, Any]]:
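        """Load LiveMathBench examples.

        For every available ``{split}_{language}.jsonl`` file under ``path``,
        format the question into a generation prompt and duplicate the example
        ``k * n`` times so that the evaluator can later regroup the sampled
        responses into ``n`` runs of ``k`` samples per question.
        """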
        dataset = []
        dataset_info = {}
        for split, language in product(LiveMathBenchDataset.dataset_splits,
                                       LiveMathBenchDataset.dataset_languages):
            file_path = os.path.join(path, f'{split}_{language}.jsonl')
            if not os.path.exists(file_path):
                continue

            dataset_info[f'{split}_{language}'] = {
                'single-choice': 0,
                'multiple-choice': 0,
                'fill-in-the-blank': 0,
                'problem-solving': 0
            }
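            # Chinese splits label question types in Chinese; map them onto
            # the English keys used in dataset_info.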
            question_type_mapping = {
                '单选': 'single-choice',
                '多选': 'multiple-choice',
                '填空': 'fill-in-the-blank',
                '问答': 'problem-solving'
            }
            with jsonlines.open(file_path, 'r') as file:
                for example_idx, example in enumerate(file):
                    dataset_info[f'{split}_{language}'][
                        example['question_type'] if language == 'en' else
                        question_type_mapping[example['question_type']]] += 1

                    prompt = PROMPT_EN if language == 'en' else PROMPT_CN
                    example.update({
                        'dataset_key':
                        f'{split}_{language}_{example_idx}',
                        'prompt':
                        prompt.format(question_type=example['question_type'],
                                      question=example['question'] +
                                      ('' if 'options' not in example else
                                       ' '.join(example['options']))),
                        'k':
                        k,
                        'n':
                        n
                    })
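                    # Duplicate each example k * n times; the evaluator later
                    # regroups the copies into n runs of k samples.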
                    for idx in range(k * n):
                        duplicated_example = deepcopy(example)
                        duplicated_example.update({'duplicated_idx': idx})
                        dataset.append(duplicated_example)

        return Dataset.from_list(dataset)


@ICL_EVALUATORS.register_module()
class LiveMathBenchEvaluator(BaseEvaluator):
    api_meta_template = dict(round=[
        dict(role='HUMAN', api_role='HUMAN'),
        dict(role='BOT', api_role='BOT', generate=True),
    ])

    def __init__(self,
                 model_name,
                 url,
                 with_postprocess=True,
                 use_extract_model=False,
                 post_url=[],
                 post_model_name='',
                 **kwargs):
        if isinstance(url, str):
            url = [url]
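
        # Build one judge-model client per endpoint URL so that judging
        # requests can be sharded across the endpoints in batch_response().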
        self.model = [
            MODELS.build(
                dict(
                    type=OpenAISDK,
                    path=model_name,
                    openai_api_base=url,
                    key='EMPTY',
                    query_per_second=2,
                    meta_template=self.api_meta_template,
                    temperature=kwargs.get('temperature', 0.01),
                    max_seq_len=kwargs.get('max_tokens', 2048),
                )) for url in url
        ]
        self.with_postprocess = with_postprocess
        self.use_extract_model = use_extract_model
        self.post_url = post_url
        self.post_model_name = post_model_name

    def batch_response(self, models: List[OpenAISDK],
                       inputs: List[str]) -> List[str]:
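        # Shard the inputs evenly across the available endpoints and query
        # them in parallel threads; responses are collected in input order.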
        batch_num = len(models)
        batch_size = (len(inputs) + batch_num - 1) // batch_num
        result_responses = []

        with concurrent.futures.ThreadPoolExecutor(
                max_workers=batch_num) as executor:
            futures = [
                executor.submit(models[i].generate,
                                inputs[i * batch_size:(i + 1) * batch_size])
                for i in range(batch_num)
            ]
            for response in executor.map(lambda f: f.result(), futures):
                result_responses.extend(response)

        return result_responses

    def postprocess(self, questions: List[str], predictions: List[str],
                    question_types: List[str],
                    languages: List[str]) -> List[str]:
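        # Two answer-extraction modes: either ask a dedicated extraction model
        # to pull the final answer out of each response, or fall back to
        # rule-based extraction of the last \boxed{...} expression.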
        if self.use_extract_model:
            assert len(self.post_url) > 0 and self.post_model_name != ''
            post_model = [
                MODELS.build(
                    dict(
                        type=OpenAISDK,
                        path=self.post_model_name,
                        openai_api_base=url,
                        key='EMPTY',
                        query_per_second=2,
                        meta_template=self.api_meta_template,
                        temperature=0.01,
                        max_seq_len=1024,
                    )) for url in self.post_url
            ]

            input_prompts = []
            for question, prediction, question_type, language in zip(
                    questions, predictions, question_types, languages):
                prompt = (EXTRACT_PROMPT_EN
                          if language == 'en' else EXTRACT_PROMPT_CN)
                input_prompts.append(
                    prompt.format(question=question,
                                  response=prediction,
                                  question_type=question_type))

            result_responses = self.batch_response(post_model, input_prompts)

            return result_responses
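
        # Rule-based fallback: locate and strip the last \boxed{...} (or
        # \fbox{...}) expression in each prediction.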
        def last_boxed_only_string(string):
            idx = string.rfind('\\boxed')
            if idx < 0:
                idx = string.rfind('\\fbox')
                if idx < 0:
                    return None

            i = idx
            right_brace_idx = None
            num_left_braces_open = 0
            while i < len(string):
                if string[i] == '{':
                    num_left_braces_open += 1
                if string[i] == '}':
                    num_left_braces_open -= 1
                    if num_left_braces_open == 0:
                        right_brace_idx = i
                        break
                i += 1

            if right_brace_idx is None:
                retval = None
            else:
                retval = string[idx:right_brace_idx + 1]

            return retval

        def remove_boxed(s):
            left = '\\boxed{'
            try:
                assert s[:len(left)] == left
                assert s[-1] == '}'
                return s[len(left):-1]
            except Exception:
                return None

        def extract_boxed_answer(pred_str, strip_double_curly_brace=False):
            boxed_str = last_boxed_only_string(pred_str)
            if boxed_str is None:
                return None
            answer = remove_boxed(boxed_str)
            if answer is None:
                return None
            if strip_double_curly_brace:
                match = re.match(r'^\{(.*)\}$', answer)
                if match:
                    answer = match.group(1)
            return answer

        predictions = [
            extract_boxed_answer(prediction) for prediction in predictions
        ]

        return predictions
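
    # Extract the last \boxed{...} value from a judge response; score()
    # treats a value of 'yes' as marking the prediction correct.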
    def extract_boxed_answer(self, text):
        match = re.findall(r'\\boxed{(.+?)}', text)
        if match:
            return match[-1]
        return None

    def score(self, predictions, references, origin_prompt, test_set):
        if len(predictions) != len(references):
            return {'error': 'preds and refs have different lengths'}

        questions = test_set['question']
        question_types = test_set['question_type']
        languages = [key.split('_')[1] for key in test_set['dataset_key']]

        if self.with_postprocess:
            predictions = self.postprocess(questions, predictions,
                                           question_types, languages)
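
        # Build one judge prompt per prediction, in the same language as the
        # source question, then query the judge model(s) in parallel.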
        inputs = []
        for prediction, reference, question, language in zip(
                predictions, references, questions, languages):
            prompt = JUDGE_PROMPT_EN if language == 'en' else JUDGE_PROMPT_CN
            inputs.append(
                prompt.format(answer=prediction,
                              gold_answer=reference,
                              question=question))
        result_responses = self.batch_response(self.model, inputs)
        results = [
            self.extract_boxed_answer(result) == 'yes'
            for result in result_responses
        ]
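
        # k and n are constant across the dataset; regroup the judged samples
        # by question into n runs of k samples each.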
        K = test_set['k'][0]
        N = test_set['n'][0]
        key2example = {}

        for example, result_response, result, prediction in zip(
                test_set, result_responses, results, predictions):
            if example['dataset_key'] not in key2example:
                key2example[example['dataset_key']] = []
            example.update({
                'eval_response': result_response,
                'prediction': prediction,
                'correct': result
            })
            key2example[example['dataset_key']].append(example)
        for key in key2example:
            key2example[key] = [
                key2example[key][i * K:(i + 1) * K] for i in range(N)
            ]

        count = []
        total_pass_num = []
        details = []
        all_dataset = set()
        for key, examples in key2example.items():
            detail = {
                'question': examples[0][0]['question'],
                'answer': examples[0][0]['answer'],
                'responses': [],
                'dataset': '_'.join(key.split('_')[:-1])
            }
            all_dataset.add('_'.join(key.split('_')[:-1]))
            if_pass_list = []
            for single_run_examples in examples:
                detail['responses'].append([])
                if_pass_list.append([])
                for example in single_run_examples:
                    detail['responses'][-1].append({
                        'prediction':
                        example['prediction'],
                        'eval_response':
                        example['eval_response']
                    })
                    if_pass_list[-1].append(1.0 if example['correct'] else 0.0)

            if_pass_list = [
                sorted(if_pass, reverse=True) for if_pass in if_pass_list
            ]
            if_pass_list = np.array(if_pass_list)
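
            # Per-question metrics at k = 1, 2, 4, ..., K: each value is the
            # mean over the n runs, with the spread across runs as .../std.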
            i = 1
            while i <= K:
                detail.update({
                    f'pass-rate@{i}':
                    if_pass_list[:, :i].mean(axis=1).mean(axis=0).item(),
                    f'pass-rate@{i}/std':
                    if_pass_list[:, :i].mean(axis=1).std(axis=0).item(),
                    f'pass@{i}':
                    if_pass_list[:, :1].mean(axis=1).mean(axis=0).item(),
                    f'pass@{i}/std':
                    if_pass_list[:, :1].mean(axis=1).std(axis=0).item(),
                })
                i = i * 2
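
            # The '{K}-pass@{threshold}' keys are 1.0 for a question only if
            # every one of the n runs reaches the per-run pass rate threshold.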
            for threshold in [0.5, 0.75, 1.0]:
                detail.update({
                    f'{K}-pass@{threshold}':
                    np.floor(
                        np.where(
                            if_pass_list.mean(axis=1) >= threshold, 1.0,
                            0.0).mean(axis=0))
                })

            count.append(np.ones_like(if_pass_list).sum(axis=1))
            total_pass_num.append(if_pass_list.sum(axis=1))

            details.append(detail)

        detailed_result = {'details': details}
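
        # Aggregate the per-question metrics into overall and per-dataset
        # results, reported as percentages.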
        i = 1
        while i <= K:
            detailed_result.update({
                f'pass-rate@{i}':
                100. *
                np.mean([detail[f'pass-rate@{i}'] for detail in details]),
                f'pass-rate@{i}/std':
                100. *
                np.mean([detail[f'pass-rate@{i}/std'] for detail in details]),
                f'pass@{i}':
                100. * np.mean([detail[f'pass@{i}'] for detail in details]),
                f'pass@{i}/std':
                100. * np.mean([detail[f'pass@{i}/std'] for detail in details])
            })
            for d in sorted(list(all_dataset)):
                detailed_result.update({
                    f'{d}/pass-rate@{i}':
                    100. * np.mean([
                        detail[f'pass-rate@{i}']
                        for detail in details if detail['dataset'] == d
                    ]),
                    f'{d}/pass-rate@{i}/std':
                    100. * np.mean([
                        detail[f'pass-rate@{i}/std']
                        for detail in details if detail['dataset'] == d
                    ]),
                    f'{d}/pass@{i}':
                    100. * np.mean([
                        detail[f'pass@{i}']
                        for detail in details if detail['dataset'] == d
                    ]),
                    f'{d}/pass@{i}/std':
                    100. * np.mean([
                        detail[f'pass@{i}/std']
                        for detail in details if detail['dataset'] == d
                    ])
                })
            i = i * 2

        for threshold in [0.5, 0.75, 1.0]:
            detailed_result.update({
                f'{K}-pass@{threshold}':
                100. * np.mean([
                    detail[f'{K}-pass@{threshold}'] for detail in details
                ])
            })
            detailed_result.update({
                f'{K}-pass@{threshold}/std':
                100. * np.std([
                    detail[f'{K}-pass@{threshold}'] for detail in details
                ])
            })
            for d in sorted(list(all_dataset)):
                detailed_result.update({
                    f'{d}/{K}-pass@{threshold}':
                    100. * np.mean([
                        detail[f'{K}-pass@{threshold}']
                        for detail in details if detail['dataset'] == d
                    ])
                })
                detailed_result.update({
                    f'{d}/{K}-pass@{threshold}/std':
                    100. * np.std([
                        detail[f'{K}-pass@{threshold}']
                        for detail in details if detail['dataset'] == d
                    ])
                })

        return detailed_result
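
# Usage sketch (illustrative only; the dataset path, judge model name and
# endpoint URL below are placeholders, not values shipped with OpenCompass):
#
#   data = LiveMathBenchDataset.load(path='/path/to/LiveMathBench', k=4, n=1)
#   evaluator = LiveMathBenchEvaluator(
#       model_name='judge-model', url=['http://localhost:23333/v1'])
#   # evaluator.score(predictions, references, origin_prompt, test_set)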