# OpenCompass/opencompass/datasets/needlebench/parallel.py

import json
import random
from pathlib import Path

import tiktoken
from datasets import Dataset

from opencompass.datasets.base import BaseDataset
from opencompass.openicl import BaseEvaluator
from opencompass.registry import LOAD_DATASET


def get_unique_entries(file_path,
                       n,
                       language,
                       unique_arg1=False,
                       unique_arg2=False,
                       unique_combination=False):
    """Sample up to ``n`` entries of the given language from a JSONL file,
    optionally enforcing uniqueness on ``arg1``, ``arg2`` and/or the
    ``(arg1, arg2)`` combination."""
    seen_arg1 = set()
    seen_arg2 = set()
    seen_combinations = set()
    results = []

    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    # Shuffle so that repeated runs sample different entries.
    random.shuffle(lines)

    for line in lines:
        try:
            entry = json.loads(line.strip())
        except json.JSONDecodeError:
            continue

        if entry.get('language') != language:
            continue

        key1 = entry.get('arg1', '') if unique_arg1 else ''
        key2 = entry.get('arg2', '') if unique_arg2 else ''
        combination = (key1, key2) if unique_combination else ''

        # Keep the entry only if every requested uniqueness constraint holds.
        if (key1 not in seen_arg1 or not unique_arg1) and \
           (key2 not in seen_arg2 or not unique_arg2) and \
           (combination not in seen_combinations or not unique_combination):
            seen_arg1.add(key1)
            seen_arg2.add(key2)
            seen_combinations.add(combination)
            results.append(entry)

        if len(results) == n:
            break

    return results
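

# Illustrative usage (hypothetical file name): sample three needles of one
# language whose arg1 values, arg2 values and (arg1, arg2) pairs are all
# distinct.
#
#     needles = get_unique_entries('needles.jsonl', 3, 'English',
#                                  unique_arg1=True, unique_arg2=True,
#                                  unique_combination=True)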


@LOAD_DATASET.register_module()
class NeedleBenchParallelDataset(BaseDataset):

    @staticmethod
    def load(
        path: str,
        needle_file_name: str,
        length: int,
        depths: list[int],
        tokenizer_model: str,
        file_list: list[str],
        num_repeats_per_file: int,
        length_buffer: int,
        guide: bool,
        language: str,
    ):
        data = {'prompt': [], 'answer': []}
        tokenizer = tiktoken.encoding_for_model(tokenizer_model)

        # Locate the needle file among the JSONL files under ``path``.
        files = Path(path).glob('*.jsonl')
        for file in files:
            if file.name == needle_file_name:
                needle_file_path = file

        # One unique needle per requested depth.
        predefined_needles_bak = get_unique_entries(
            needle_file_path,
            len(depths),
            language,
            unique_arg1=True,
            unique_arg2=True,
            unique_combination=True)

        def _generate_context(tokens_context, depths, needles):
            # Insert each needle at its relative depth (percentage of the
            # context length), shifting later insertion points by the
            # tokens already inserted.
            insertion_points = [
                int(len(tokens_context) * (depth / 100)) for depth in depths
            ]

            cumulative_inserted_length = 0
            for i, needle in enumerate(needles):
                needle_tokens = _get_tokens_from_context(needle)
                current_insertion_point = min(
                    insertion_points[i] + cumulative_inserted_length,
                    len(tokens_context))

                tokens_context = tokens_context[:current_insertion_point] + \
                    needle_tokens + tokens_context[current_insertion_point:]
                cumulative_inserted_length += len(needle_tokens)

            new_context = _decode_tokens(tokens_context)
            return new_context

        def _get_tokens_from_context(context):
            # Accepts either a single string or a list of strings.
            if isinstance(context, list):
                return [tokenizer.encode(item) for item in context]
            else:
                return tokenizer.encode(context)

        def _decode_tokens(tokens):
            return tokenizer.decode(tokens)
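
        # Illustrative depth arithmetic (assuming a 1000-token context and
        # depths=[10, 50, 90]): _generate_context picks insertion points at
        # tokens 100, 500 and 900, each shifted right by the needle tokens
        # already spliced in.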

        def _modify_retrieval_question(retrieval_question):
            # Prepend a short "think before you answer" hint while keeping
            # the original answer-format instruction intact.
            if language == 'Chinese':
                parts = retrieval_question.split('请按照')
                guide_retrieval_question = (parts[0] + '在回答之前,请思考文档中与此问题'
                                            '最相关的内容是什么。请按照' + parts[1])
                return guide_retrieval_question
            elif language == 'English':
                parts = retrieval_question.split(
                    'Please answer in the format')
                guide_retrieval_question = (
                    parts[0] + 'Before answering, please consider'
                    ' what in the document is most relevant to this question.'
                    ' Please answer in the format' + parts[1])
                return guide_retrieval_question
            else:
                raise ValueError(f"Language '{language}' is not supported.")

        def _generate_prompt(context, retrieval_question):
            if guide:
                retrieval_question = _modify_retrieval_question(
                    retrieval_question)

            if language == 'Chinese':
                prompt = ('你是一个善于回答用户问题的智能AI助手\n'
                          '请保持你的回答简洁清楚。不要说和下面文档中的无关的话'
                          ',或重复你的回答\n请先仔细阅读下面的文档再依次回答'
                          f'最后提出的问题\n用户现在给你的文档是{context}\n\n'
                          f'现在请问:{retrieval_question}\n')
            elif language == 'English':
                prompt = (
                    'You are an intelligent AI assistant skilled in '
                    'answering user questions.\n'
                    'Please keep your answers concise and clear. Do not'
                    ' talk about irrelevant topics or repeat your '
                    'answers.\n'
                    f'The document given to you by the user is {context}'
                    f'\n\nNow, the questions are: {retrieval_question}\n')
            else:
                raise ValueError(f"Language '{language}' is not supported.")

            return prompt

        files = Path(path).glob('*.jsonl')
        for file in files:
            if file.name not in file_list:
                continue

            with open(file, 'r', encoding='utf-8') as f:
                lines_bak = [json.loads(line.strip()) for line in f]
            lines = lines_bak.copy()

            for counter in range(num_repeats_per_file):
                # A fixed seed per repeat keeps the shuffles reproducible
                # across runs.
                random.seed(counter)
                random.shuffle(lines)

                predefined_needles = predefined_needles_bak.copy()
                random.shuffle(predefined_needles)

                needles = [
                    '\n' + item['needle'] + '\n'
                    for item in predefined_needles
                ]
                keywords = [item['arg2'] for item in predefined_needles]

                # Concatenate the per-needle questions and expected answer
                # formats into a single combined retrieval question.
                if language == 'Chinese':
                    questions = ''.join([
                        item['retrieval_question'].split('？')[0] + '？'
                        for item in predefined_needles
                    ])
                    answers_format = ''.join([
                        item['retrieval_question'].split("'")[1].split('。')[0]
                        for item in predefined_needles
                    ])
                    retrieval_question = questions + "请按照'" + \
                        answers_format + "'的格式回答。"
                elif language == 'English':
                    questions = ''.join([
                        item['retrieval_question'].split('?')[0] + '?'
                        for item in predefined_needles
                    ])
                    answers_format = ''.join([
                        item['retrieval_question'].split("'")[1].split('.')[0]
                        for item in predefined_needles
                    ])
                    retrieval_question = questions + \
                        "Please answer in the format of '" + \
                        answers_format + "'"

                # Reserve room for the needles (and the buffer) and fill the
                # rest of the context window with shuffled haystack text.
                context_length = length - length_buffer
                target_length_per_record = context_length - \
                    sum(len(tokens) for tokens
                        in _get_tokens_from_context(needles))
                target_length_per_record = max(target_length_per_record, 0)

                accumulated_tokens = []
                for line in lines:
                    tokens_current_line = _get_tokens_from_context(
                        line['text'])
                    accumulated_tokens.extend(tokens_current_line)
                    if len(accumulated_tokens) >= target_length_per_record:
                        break

                processed_text = _generate_context(
                    accumulated_tokens[:target_length_per_record], depths,
                    needles)
                processed_prompt = _generate_prompt(
                    processed_text, retrieval_question)

                data['prompt'].append(processed_prompt)
                data['answer'].append(
                    '*'.join(keywords) + '#' + '*'.join(map(str, depths)))

        dataset = Dataset.from_dict({
            'prompt': data['prompt'],
            'answer': data['answer'],
        })
        return dataset
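
# Each 'answer' string packs the expected keywords and their depths, e.g.
# 'keyword1*keyword2#25*75' (illustrative values); the evaluator below splits
# on '#' and '*' to recover both lists.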


class NeedleBenchParallelEvaluator(BaseEvaluator):

    def levenshtein_distance(self, s1, s2):
        # Classic dynamic-programming edit distance; kept as a helper even
        # though score() below only checks for exact keyword containment.
        if len(s1) < len(s2):
            return self.levenshtein_distance(s2, s1)

        if len(s2) == 0:
            return len(s1)

        previous_row = range(len(s2) + 1)
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]
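
    # For instance, levenshtein_distance('kitten', 'sitting') == 3
    # (substitute k->s, substitute e->i, append g).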

    def score(self, predictions, gold):
        if len(predictions) != len(gold):
            return {'error': 'predictions and gold have different lengths'}

        print('predictions:', predictions)
        print('gold:', gold)

        details = []
        depths = [int(i) for i in gold[0].split('#')[1].split('*')]
        scores_by_depth = {depth: 0 for depth in depths}

        for prediction, reference in zip(predictions, gold):
            print(reference)
            keywords = reference.split('#')[0].split('*')
            print(keywords)
            for keyword, depth in zip(keywords, depths):
                print('iterating:', keyword, depth)
                if keyword in prediction:
                    print(f'{keyword} at depth {depth} is in {prediction}')
                    scores_by_depth[depth] += 100 / len(predictions)

        average_score = sum(scores_by_depth.values()) / len(scores_by_depth)

        flattened_scores = {
            'Depth' + str(depth): score
            for depth, score in scores_by_depth.items()
        }

        result = {
            **flattened_scores, 'details': details,
            'average_score': average_score
        }
        return result
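

# Illustrative scoring (hypothetical values): with
#     gold = ['Alice*Bob#25*75', 'Alice*Bob#25*75']
#     predictions = ['... Alice ...', '... Alice ... Bob ...']
# 'Alice' is found in both predictions and 'Bob' in one, so scores_by_depth
# becomes {25: 100.0, 75: 50.0} and average_score is 75.0.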