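"""CIBench dataset loading and evaluation for OpenCompass.

This module converts Jupyter-notebook experiments into evaluation samples
(`CIBenchDataset`) and scores code-interpreter agent trajectories against the
reference notebook outputs (`CIBenchEvaluator`).
"""
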
import json
import os
import os.path as osp
import re
from typing import List, Optional

import numpy as np
from datasets import Dataset

from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import LOAD_DATASET

from .base import BaseDataset


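# A notebook experiment is parsed into parallel lists: markdown cells become
# `questions`, while code cells contribute reference outputs tagged as
# 'vis' (image output), 'general' (text output) or 'executable' (no output).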
def load_experiment(file: str) -> dict:
    """Load a single experiment notebook with its reference solutions."""
    with open(file, 'r') as f:
        notebook = json.load(f)
    example = notebook['cells']

    questions = []
    outputs = []
    tags = []
    for cell in example:
        if cell['cell_type'] == 'markdown':
            # markdown cells carry the instructions for the following step
            text = ''.join(cell['source'])
            questions.append(text)
        elif cell['cell_type'] == 'code':
            if cell['outputs'] and 'data' in cell['outputs'][-1]:
                if 'image/png' in cell['outputs'][-1]['data']:
                    # visualization cell: keep the base64 png as reference
                    tags.append('vis')
                    outputs.append(
                        cell['outputs'][-1]['data']['image/png'])
                elif 'text/plain' in cell['outputs'][-1]['data']:
                    # text output cell: keep the plain-text result
                    tags.append('general')
                    outputs.append(''.join(
                        cell['outputs'][-1]['data']['text/plain']))
            else:
                # no captured output: only executability can be checked
                tags.append('executable')
                outputs.append(None)
    return dict(
        experiment=file,
        questions=questions,
        references=dict(outputs=outputs, tags=tags, experiment=file),
    )


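# The dataset loader walks `path` recursively, parses every notebook it finds
# with `load_experiment`, and wraps the results in a HuggingFace `Dataset`.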
@LOAD_DATASET.register_module()
class CIBenchDataset(BaseDataset):
    """Code Interpreter dataset."""

    @staticmethod
    def load(path: str) -> Dataset:
        """Load the whole dataset from a directory of notebook experiments."""
        data_list = []
        for cwd, dirs, files in os.walk(path):
            dirs.sort()
            files.sort()
            for f in files:
                if f.endswith('.ipynb'):
                    try:
                        data = load_experiment(os.path.join(cwd, f))
                    except Exception:
                        print(f'Error with file {os.path.join(cwd, f)}')
                        continue
                    data_list.append(data)

        dataset = Dataset.from_list(data_list)
        return dataset


class CIBenchEvaluator(BaseEvaluator):
    """Evaluator for the code interpreter (CI) dataset.

    Args:
        output_dir (str, optional): The directory to save experiment
            files in a markdown or notebook format.
        user_data_dir (str): The directory to load local files from.
            Defaults to 'ENV', which means the data dir is read from
            the environment variable `USER_DATA_DIR`.
    """

    def __init__(self,
                 output_dir: Optional[str] = None,
                 user_data_dir: str = 'ENV') -> None:
        # TODO: should use work dir for this task.
        self.output_dir = output_dir
        if user_data_dir == 'ENV':
            user_data_dir = os.environ.get('USER_DATA_DIR', '')
        self.user_data_dir = user_data_dir

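    # Each prediction "step" is expected to be the list of agent actions taken
    # for one cell; the checks below always inspect the most recent
    # 'IPythonInterpreter' action in that list (the exact action schema comes
    # from the agent framework and is assumed here).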
    @staticmethod
    def valid_step(step):
        """Whether the step is executable and valid."""
        # Find the latest code interpreter action to judge validity.
        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                # valid only if execution produced no error message
                return not action['errmsg']
        # No code interpreter action in this step: treat as invalid.
        return False

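    # Text answers are read from the last fenced ``` block of the interpreter
    # result; an exact match or a substring of the reference counts as correct.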
    @staticmethod
    def correct_step(step, target):
        """Whether the step output is correct."""
        # Find the latest code interpreter action to judge correctness.
        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                if action['result']:
                    try:
                        pred = action['result']['text']
                        match = re.search('```\n(.*?)\n```', pred, re.DOTALL)
                        if match:
                            out = match.group(1)
                            return out == target or out in target
                    except Exception:
                        return False
        # Fall back to False
        return False

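    # Visualization answers are compared by structural similarity (SSIM): the
    # predicted image referenced by the markdown link is resized to the shape
    # of the base64-decoded reference image and scored (higher is better).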
    @staticmethod
    def vis_similarity_step(step, target):
        """Structural similarity between the step's output image and the
        reference image."""
        # Find the latest code interpreter action to judge the output image.
        import base64

        import skimage

        for action in step[::-1]:
            if action['type'] == 'IPythonInterpreter':
                if action['result']:
                    try:
                        pred = action['result']['text']
                        match = re.search(r'!\[fig-[0-9]*\]\((.*?)\)', pred,
                                          re.DOTALL)
                        if match:
                            img_pred = match.group(1)
                            img2 = base64.b64decode(target)
                            img2 = skimage.io.imread(img2, plugin='imageio')
                            img1 = skimage.io.imread(img_pred,
                                                     plugin='imageio')
                            img1 = skimage.transform.resize(
                                img1, img2.shape[:2])
                            img1 = 255 * img1
                            # Convert to integer pixel values.
                            img1 = img1.astype(np.uint8)
                            ssim = skimage.metrics.structural_similarity(
                                img1, img2, channel_axis=-1)
                            # mse = skimage.metrics.mean_squared_error(img1, img2)
                            # ssim: greater is better
                            # mse: smaller is better but has no upper bound
                            return ssim
                    except Exception:
                        return 0
        # Fall back to 0
        return 0

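    # For bookkeeping, every scored experiment is also dumped as a markdown
    # file (prompt plus extracted code per step) that jupytext could convert
    # into an executable notebook; the conversion itself is left as a TODO.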
    def save_results(self, origin_prompt, steps):
        """Save the prediction results in markdown and notebook format."""

        def check_jupytext():
            """Check that the jupytext requirement is available."""
            from shutil import which

            assert which('jupytext'), (
                "Please install jupytext ('pip install jupytext') to enable "
                'the conversion process.')

        check_jupytext()

        from opencompass.lagent.actions.ipython_interpreter import extract_code

        for idx, (example_origin_prompt,
                  example_steps) in enumerate(zip(origin_prompt, steps)):
            markdown_lines = []
            for prompt, step in zip(example_origin_prompt, example_steps):
                for action in step[::-1]:
                    if action['type'] == 'IPythonInterpreter':
                        valid_action = action
                        break
                else:
                    # fall back to the final action
                    valid_action = step[-1]
                markdown_lines.append(prompt)
                markdown_lines.append('\n')
                code_text = valid_action['args']['text']
                code_text = extract_code(code_text)
                code_text = '```python\n' + code_text + '\n```'
                markdown_lines.append(code_text)
                markdown_lines.append('\n')

            md_file = f'experiment{idx}.md'
            with open(md_file, 'w') as f:
                f.writelines(markdown_lines)

            # TODO: be careful with this
            # convert markdown to ipynb and execute with error tolerance
            # subprocess.Popen(
            #     "jupytext --to ipynb --pipe-fmt ipynb "
            #     "--pipe 'jupyter nbconvert --to ipynb --execute "
            #     f"--allow-errors --stdin --stdout' {md_file}",
            #     shell=True)

    def set_data_dir(self, work_dir):
        """Set the working directory and link data files for the saved
        notebook results."""
        if self.user_data_dir:
            if self.user_data_dir.endswith('/'):
                basename = osp.basename(osp.split(self.user_data_dir)[0])
            else:
                basename = osp.basename(self.user_data_dir)
            if not osp.exists(osp.join(self.output_dir, basename)):
                os.symlink(self.user_data_dir,
                           osp.join(self.output_dir, basename))
        os.chdir(work_dir)

    def unset_data_dir(self, work_dir):
        """Restore the working directory, keeping the symlink."""
        os.chdir(work_dir)

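    # The final report contains three headline metrics: `executable_rate`
    # (fraction of steps that ran without error), `general_accuracy`
    # (text-output matches over 'general' cells) and `visualize_similarity`
    # (mean SSIM over 'vis' cells), all reported as percentages; a negative
    # value means there were no applicable cells for that metric.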
    def score(self, predictions: List, references: List, steps: List,
              origin_prompt: List):
        """Calculate accuracy."""
        cwd = os.getcwd()
        if self.output_dir:
            if not osp.exists(self.output_dir):
                os.makedirs(self.output_dir)
            self.set_data_dir(self.output_dir)
            self.save_results(origin_prompt, steps)
            self.unset_data_dir(cwd)

        num_cells_list = []
        num_general_list = []
        passed_list = []
        correct_list = []
        vis_list = []
        for gold, single_steps in zip(references, steps):
            tags = gold['tags']
            outputs = gold['outputs']
            num_cells = len(tags)
            num_general = sum([tag == 'general' for tag in tags])

            passed = sum([self.valid_step(step) for step in single_steps])
            correct = 0
            vis_sim = []
            for tag, step, output in zip(tags, single_steps, outputs):
                if tag == 'general':
                    correct += self.correct_step(step, output)
                elif tag == 'vis':
                    vis_sim.append(self.vis_similarity_step(step, output))

            num_cells_list.append(num_cells)
            num_general_list.append(num_general)
            passed_list.append(passed)
            correct_list.append(correct)
            if vis_sim:
                vis_list.append(sum(vis_sim) / len(vis_sim))
            else:
                vis_list.append(-1)

        valid_vis = [v for v in vis_list if v >= 0]
        if valid_vis:
            visualize_similarity = sum(valid_vis) / len(valid_vis)
        else:
            # no visualization cells in the references
            visualize_similarity = -1

        if sum(num_general_list) > 0:
            general_accuracy = sum(correct_list) / sum(num_general_list)
        else:
            # no general (text output) cells in the references
            general_accuracy = -1

        result = dict(
            executable_rate=sum(passed_list) / sum(num_cells_list) * 100,
            general_accuracy=general_accuracy * 100,
            visualize_similarity=visualize_similarity * 100,
            num_cells_list=num_cells_list,
            num_general_list=num_general_list,
            passed_list=passed_list,
            correct_list=correct_list,
            vis_list=vis_list,
        )
        return result