Support NPHardEval (#835)

* support NPHardEval

* add .md file and fix minor bugs

* refactor and minor fix

---------

Co-authored-by: Leymore <zfz-960727@163.com>
This commit is contained in:
Skyfall-xzz 2024-02-05 15:52:28 +08:00 committed by GitHub
parent b4a9acd7be
commit 7ad1168062
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1902 additions and 0 deletions

View File

@ -7,6 +7,7 @@ exclude: |
opencompass/datasets/lawbench/utils|
opencompass/datasets/lawbench/evaluation_functions/|
opencompass/datasets/medbench/|
opencompass/datasets/NPHardEval/|
docs/zh_cn/advanced_guides/compassbench_intro.md
)
repos:

View File

@ -7,6 +7,7 @@ exclude: |
opencompass/datasets/lawbench/utils|
opencompass/datasets/lawbench/evaluation_functions/|
opencompass/datasets/medbench/|
opencompass/datasets/NPHardEval/|
docs/zh_cn/advanced_guides/compassbench_intro.md
)
repos:

View File

@ -0,0 +1,4 @@
from mmengine.config import read_base
with read_base():
from .NPHardEval_gen_22aac5 import NPHardEval_datasets # noqa: F401, F403

View File

@ -0,0 +1,59 @@
from opencompass.openicl.icl_prompt_template import PromptTemplate
from opencompass.openicl.icl_retriever import ZeroRetriever
from opencompass.openicl.icl_inferencer import GenInferencer
from opencompass.datasets.NPHardEval import (
hard_GCP_Dataset, hard_GCP_Evaluator,
hard_TSP_Dataset, hard_TSP_Evaluator,
hard_MSP_Dataset, hard_MSP_Evaluator,
cmp_GCP_D_Dataset, cmp_GCP_D_Evaluator,
cmp_TSP_D_Dataset, cmp_TSP_D_Evaluator,
cmp_KSP_Dataset, cmp_KSP_Evaluator,
p_BSP_Dataset, p_BSP_Evaluator,
p_EDP_Dataset, p_EDP_Evaluator,
p_SPP_Dataset, p_SPP_Evaluator,
)
NPHardEval_tasks = [
["hard_GCP", "GCP", hard_GCP_Dataset, hard_GCP_Evaluator],
["hard_TSP", "TSP", hard_TSP_Dataset, hard_TSP_Evaluator],
["hard_MSP", "MSP", hard_MSP_Dataset, hard_MSP_Evaluator],
["cmp_GCP_D", "GCP_Decision", cmp_GCP_D_Dataset, cmp_GCP_D_Evaluator],
["cmp_TSP_D", "TSP_Decision", cmp_TSP_D_Dataset, cmp_TSP_D_Evaluator],
["cmp_KSP", "KSP", cmp_KSP_Dataset, cmp_KSP_Evaluator],
["p_BSP", "BSP", p_BSP_Dataset, p_BSP_Evaluator],
["p_EDP", "EDP", p_EDP_Dataset, p_EDP_Evaluator],
["p_SPP", "SPP", p_SPP_Dataset, p_SPP_Evaluator],
]
NPHardEval_datasets = []
for name, path_name, dataset, evaluator in NPHardEval_tasks:
NPHardEval_reader_cfg = dict(input_columns=["prompt", "level"], output_column="q")
NPHardEval_infer_cfg = dict(
ice_template=dict(
type=PromptTemplate,
template=dict(
begin="</E>",
round=[
dict(role="HUMAN", prompt="</E>{prompt}"),
dict(role="BOT", prompt=""),
],
),
ice_token="</E>",
),
retriever=dict(type=ZeroRetriever),
inferencer=dict(type=GenInferencer),
)
NPHardEval_eval_cfg = dict(evaluator=dict(type=evaluator), pred_role="BOT")
NPHardEval_datasets.append(
dict(
type=dataset,
abbr=name,
path=f"./data/NPHardEval/{path_name}/",
reader_cfg=NPHardEval_reader_cfg,
infer_cfg=NPHardEval_infer_cfg,
eval_cfg=NPHardEval_eval_cfg,
)
)

View File

@ -0,0 +1,126 @@
# NPHardEval
## Introduction
The following introduction comes from the abstract in [NPHardEval: Dynamic Benchmark on Reasoning Ability of Large Language Models via Complexity Classes](https://arxiv.org/abs/2312.14890)
```
This benchmark is designed to evaluate the reasoning abilities of LLMs across a broad spectrum of 900 algorithmic questions, extending up to the NP-Hard complexity class. These questions are meticulously chosen to represent a wide range of complexity class below the NP-hard complexity class, offering a rigorous measure of the reasoning ability of LLMs.
```
## Official link
### Paper
[NPHardEval: Dynamic Benchmark on Reasoning Ability of Large Language Models via Complexity Classes](https://arxiv.org/abs/2312.14890)
### Repository
[NPHardEval](https://github.com/casmlab/NPHardEval)
## Examples
Input example I:
```
The Graph Coloring Problem (Decision Version, GCP-D) involves determining if it is possible to color the vertices of a graph using a given number of colors, ensuring no two adjacent vertices have the same color.
Find out if the vertices of a graph with 6 vertices can be colored using only 3 colors, such that no adjacent vertices share the same color.
Provide a yes or no answer, along with a concise explanation of your reasoning. Keep your explanation focused and brief.
Enclose your output in <root></root> tags. Include your reasoning in <reasoning></reasoning> tags and the final yes/no answer in <final_answer></final_answer> tags, like <final_answer>{'Feasible': 'YES_OR_NO'}</final_answer>.
The graph is below:
Vertex 2 is connected to vertex 4.
Vertex 2 is connected to vertex 5.
Vertex 3 is connected to vertex 5.
Vertex 4 is connected to vertex 5.
Vertex 5 is connected to vertex 6.
```
Output example I (from GPT-4):
```
<root>
<final_answer>{'Feasible': 'YES'}</final_answer>
<reasoning>Yes, it is possible to color the vertices of the given graph with 6 vertices using only 3 colors, such that no adjacent vertices share the same color. The graph does not contain any triangles or cycles of length greater than 3, which means it is a planar graph. By the Four Color Theorem, any planar graph can be colored using at most 4 colors. Since we have only 3 colors available, it is feasible to color the vertices without any adjacent vertices sharing the same color.</reasoning>
</root>
```
Input example II:
```
The Traveling Salesman Problem (Decision Version, TSP-D) focuses on determining if a salesman can complete a route, visiting each city at least once, with the total travel distance being less than a specified value.
Check if it's possible for a salesman to visit each of the 10 cities at least once and return to the starting city with the total distance less than 3481.5. The distances between each pair of cities are given.
Provide a yes or no answer, with a succinct explanation of your decision process. Focus on clarity and brevity in your response.
Enclose your output in <root></root> tags. Present your reasoning in <reasoning></reasoning> tags and the final yes/no answer in <final_answer></final_answer> tags, like <final_answer>{'Feasible': 'YES_OR_NO'}</final_answer>.
The distances between cities are below:
The distance between City 0 and City 1 is 58.0.
The distance between City 0 and City 2 is 171.0.
The distance between City 0 and City 3 is 122.0.
The distance between City 0 and City 4 is 109.0.
The distance between City 0 and City 5 is 93.0.
The distance between City 0 and City 6 is 106.0.
The distance between City 0 and City 7 is 52.0.
The distance between City 0 and City 8 is 115.0.
The distance between City 0 and City 9 is 148.0.
The distance between City 1 and City 2 is 145.0.
The distance between City 1 and City 3 is 71.0.
The distance between City 1 and City 4 is 114.0.
The distance between City 1 and City 5 is 69.0.
The distance between City 1 and City 6 is 163.0.
The distance between City 1 and City 7 is 132.0.
The distance between City 1 and City 8 is 183.0.
The distance between City 1 and City 9 is 115.0.
The distance between City 2 and City 3 is 107.0.
The distance between City 2 and City 4 is 163.0.
The distance between City 2 and City 5 is 63.0.
The distance between City 2 and City 6 is 91.0.
The distance between City 2 and City 7 is 108.0.
The distance between City 2 and City 8 is 109.0.
The distance between City 2 and City 9 is 77.0.
The distance between City 3 and City 4 is 119.0.
The distance between City 3 and City 5 is 131.0.
The distance between City 3 and City 6 is 79.0.
The distance between City 3 and City 7 is 159.0.
The distance between City 3 and City 8 is 27.0.
The distance between City 3 and City 9 is 64.0.
The distance between City 4 and City 5 is 85.0.
The distance between City 4 and City 6 is 110.0.
The distance between City 4 and City 7 is 71.0.
The distance between City 4 and City 8 is 61.0.
The distance between City 4 and City 9 is 61.0.
The distance between City 5 and City 6 is 73.0.
The distance between City 5 and City 7 is 132.0.
The distance between City 5 and City 8 is 82.0.
The distance between City 5 and City 9 is 96.0.
The distance between City 6 and City 7 is 156.0.
The distance between City 6 and City 8 is 103.0.
The distance between City 6 and City 9 is 69.0.
The distance between City 7 and City 8 is 108.0.
The distance between City 7 and City 9 is 97.0.
The distance between City 8 and City 9 is 75.0.
```
Output example II (from GPT-4):
```
<root> <reasoning> The Traveling Salesman Problem is a well-known problem in the field of computer science and operations research. It is a combinatorial optimization problem, which means it involves finding the best solution from a finite set of possible solutions. The problem is NP-hard, which means there is no known algorithm that can solve it in polynomial time. Therefore, to solve this problem, we would need to use an algorithm that can find the optimal solution, such as a brute force algorithm, a genetic algorithm, or a simulated annealing algorithm. However, these algorithms can be computationally expensive, especially for large problem sizes. In this case, we have 10 cities, which means there are 10! = 3,628,800 possible routes. Therefore, without the use of a computer, it would be impractical to manually check all possible routes to find the one with the total distance less than 3481.5. </reasoning> <final_answer>{'Feasible': 'UNKNOWN'}</final_answer> </root>
```
## Evaluation results
```
dataset version metric mode internlm2-chat-7b-hf
--------- --------- ----------------- ------ ----------------------
hard_GCP 144a59 Weighted Accuracy gen 1.64
hard_TSP 144a59 Weighted Accuracy gen 0
hard_MSP 144a59 Weighted Accuracy gen 0
cmp_GCP_D 144a59 Weighted Accuracy gen 43.82
cmp_TSP_D 144a59 Weighted Accuracy gen 40.18
cmp_KSP 144a59 Weighted Accuracy gen 0
p_BSP 144a59 Weighted Accuracy gen 40.36
p_EDP 144a59 Weighted Accuracy gen 0
p_SPP 144a59 Weighted Accuracy gen 0
```
## Reference
```
@article{fan2023nphardeval,
title={NPHardEval: Dynamic Benchmark on Reasoning Ability of Large Language Models via Complexity Classes},
author={Fan, Lizhou and Hua, Wenyue and Li, Lingyao and Ling, Haoyang and Zhang, Yongfeng and Hemphill, Libby},
journal={arXiv preprint arXiv:2312.14890},
year={2023}
}
```

View File

@ -0,0 +1,9 @@
from .cmp_GCP_D import * # noqa: F401, F403
from .cmp_KSP import * # noqa: F401, F403
from .cmp_TSP_D import * # noqa: F401, F403
from .hard_GCP import * # noqa: F401, F403
from .hard_MSP import * # noqa: F401, F403
from .hard_TSP import * # noqa: F401, F403
from .p_BSP import * # noqa: F401, F403
from .p_EDP import * # noqa: F401, F403
from .p_SPP import * # noqa: F401, F403

View File

@ -0,0 +1,161 @@
import ast
import networkx as nx
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import gcp_dPrompts
def q2text(q, p=gcp_dPrompts):
number_of_colors = q.split('\n')[0].split()[-2] # last character of the first line
number_of_vertices = q.split('\n')[1].split(' ')[2] # third word of the second line
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(total_vertices=number_of_vertices, number_of_colors=number_of_colors) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format'] + '\n' + \
'\n The graph is below: \n'
for line in q.split('\n')[2:]:
vertex_list = line.split(' ')
this_line = 'Vertex {} is connected to vertex {}.'.format(
vertex_list[1], vertex_list[2])
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class cmp_GCP_D_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
for file_num in range(10):
with open(data_path + 'decision_data_GCP_{}.txt'.format(file_num)) as f:
data = f.read()
sample = data.split('\n\n')[:-1]
all_data += zip([file_num + 1] * len(sample), sample)
for (level, q) in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + q,
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class cmp_GCP_D_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
details = {}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = q.split('####\n')[-1]
try:
number_of_colors = int(q.split('\n')[0].split()[-2])
output, reasoning = self.parse_xml_to_dict(output)
output_dict['output'] = output
output_dict['correctness'], _ = self.gcp_decision_check(q, output, number_of_colors)
except Exception as e:
print(f'Attempt failed: {e}')
output_dict['correctness'] = False
output_dict['reasoning'] = reasoning
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
details[str(index)] = {'q': q, 'output': output, 'result': r}
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
result['details'] = details
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
except Exception:
final_answer_element = ''
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def read_dimacs_format(self, dimacs_str):
lines = dimacs_str.strip().split('\n')
p_line = next(line for line in lines if line.startswith('p'))
_, _, num_vertices, num_edges = p_line.split()
num_vertices, num_edges = int(num_vertices), int(num_edges)
adjacency_list = {i: set() for i in range(1, num_vertices + 1)}
for line in lines:
if line.startswith('e'):
_, vertex1, vertex2 = line.split()
vertex1, vertex2 = int(vertex1), int(vertex2)
if vertex1 in adjacency_list and vertex2 in adjacency_list:
adjacency_list[vertex1].add(vertex2)
adjacency_list[vertex2].add(vertex1)
return num_vertices, adjacency_list
def gcp_greedy_solution(self, adjacency_list):
"""Provides a greedy solution to the GCP problem.
:param adjacency_list: A dictionary of the adjacency list.
:return: A tuple of (num_colors, coloring).
"""
G = nx.Graph()
G.add_nodes_from(adjacency_list.keys())
for vertex, neighbors in adjacency_list.items():
for neighbor in neighbors:
G.add_edge(vertex, neighbor)
coloring = nx.coloring.greedy_color(G, strategy='largest_first')
num_colors = max(coloring.values()) + 1
return num_colors, coloring
def gcp_decision_check(self, dimacs_str, answer, k_colors):
"""Check if the given GCP instance is feasible with k_colors.
:param dimacs_str: The DIMACS format string of the GCP instance.
:param answer: The answer returned by the model.
:param k_colors: The target number of colors.
:return: A tuple of (is_correct, message).
"""
num_vertices, adjacency_list = self.read_dimacs_format(dimacs_str)
try:
is_feasible = answer.get('Feasible', 'no').lower() == 'yes'
except Exception:
return False, 'Feasible key not found'
num_colors, coloring = self.gcp_greedy_solution(adjacency_list)
exist_optimal = num_colors <= k_colors
if is_feasible != exist_optimal:
if exist_optimal:
return False, f'Feasibility mismatch: {coloring}'
else:
return False, f'Feasibility mismatch: {is_feasible} vs {exist_optimal}'
return True, 'Feasible' if is_feasible else 'Infeasible'

View File

@ -0,0 +1,183 @@
import ast
import json
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import kspPrompts
def q2text(q, p=kspPrompts):
knapsack_capacity = q['knapsack_capacity']
items = q['items']
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(knapsack_capacity=knapsack_capacity) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format'] + \
'\n The items details are as below: \n'
for item in items:
this_line = f"Item {item['id']} has weight {item['weight']} and value {item['value']}."
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class cmp_KSP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
with open(data_path + 'ksp_instances.json', 'r') as f:
data = json.load(f)
for sample in data:
level = len(sample['items']) - 3
all_data.append((level, sample))
for (level, q) in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class cmp_KSP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
details = {}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
try:
llm_string = q
output, reasoning = self.parse_xml_to_dict(llm_string)
output_dict['output'] = output
output_dict['correctness'], _ = self.kspCheck(q, output)
output_dict['reasoning'] = reasoning
output_dict['level'] = level
except Exception as e:
print(f'Attempt failed: {e}')
if output_dict:
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
else:
print(f'Failed to run {q}')
r = 'fail'
result[r] += level
details[str(index)] = {'q': q, 'output': output, 'result': r}
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
result['details'] = details
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
except Exception:
final_answer_element = ''
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def ksp_optimal_solution(self, knapsacks, capacity):
"""Provides the optimal solution for the KSP instance with dynamic
programming.
:param knapsacks: A dictionary of the knapsacks.
:param capacity: The capacity of the knapsack.
:return: The optimal value.
"""
# num_knapsacks = len(knapsacks)
# Create a one-dimensional array to store intermediate solutions
dp = [0] * (capacity + 1)
for itemId, (weight, value) in knapsacks.items():
for w in range(capacity, weight - 1, -1):
dp[w] = max(dp[w], value + dp[w - weight])
return dp[capacity]
# KSP
def kspCheck(self, instance, solution):
"""Validates the solution for the KSP instance.
:param instance: A dictionary of the KSP instance.
:param solution: A dictionary of the solution.
:return: A tuple of (is_correct, message).
"""
# Change string key to integer key and value to boolean
items = instance.get('items', [])
knapsacks = {
item['id']: (item['weight'], item['value'])
for item in items
}
ksp_optimal_value = self.ksp_optimal_solution(
knapsacks, instance['knapsack_capacity'])
try:
is_feasible = (solution.get('Feasible', '').lower() == 'yes')
except Exception:
return False, f'Output format is incorrect.'
if is_feasible != (ksp_optimal_value > 0):
return False, f'The solution is {is_feasible} but the optimal solution is {ksp_optimal_value > 0}.'
total_value = int(solution.get('TotalValue', -1))
selectedItems = list(map(int, solution.get('SelectedItemIds', [])))
if len(set(selectedItems)) != len(selectedItems):
return False, f'Duplicate items are selected.'
total_weight = 0
cum_value = 0
# Calculate total weight and value of selected items
for item in selectedItems:
if knapsacks.get(item, False):
weight, value = knapsacks[item]
total_weight += weight
cum_value += value
else:
return False, f'Item {item} does not exist.'
# Check if the item weight exceeds the knapsack capacity
if total_weight > instance['knapsack_capacity']:
return False, f"Total weight {total_weight} exceeds knapsack capacity {instance['knapsack_capacity']}."
if total_value != cum_value:
return False, f'The total value {total_value} does not match the cumulative value {cum_value} of the selected items.'
if total_value != ksp_optimal_value:
return False, f'The total value {total_value} does not match the optimal value {ksp_optimal_value}.'
return True, f'The solution is valid with total weight {total_weight} and total value {total_value}.'

View File

@ -0,0 +1,150 @@
import ast
import json
import networkx as nx
import pandas as pd
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import tsp_dPrompts
def q2text(adj_matrix, distance_limit, p=tsp_dPrompts):
total_cities = adj_matrix.shape[0] # exclude the last row
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(total_cities=total_cities, distance_limit=distance_limit) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format'] + '\n' + \
'The distances between cities are below: \n'
for i in range(adj_matrix.shape[0]):
for j in range(adj_matrix.shape[1]):
if i < j: # only use the upper triangle
this_line = 'The distance between City {} and City {} is {}.'.format(i, j, adj_matrix[i, j])
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class cmp_TSP_D_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
for level in range(10):
for file_num in range(10):
df = pd.read_csv(data_path + 'decision_data_TSP_level_{}_instance_{}.csv'.format(level, file_num + 1),
header=None,
index_col=False)
all_data.append((level + 1, df))
for (level, q) in all_data:
threshold = q.iloc[-1, 0] # therashold is the last row
distance_matrix = q.iloc[:
-1].values # distance matrix is the rest of the rows
prompt = q2text(distance_matrix, threshold)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q.to_json()),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class cmp_TSP_D_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
details = {}
tsp_d_Results = []
for index, (q, llm_string) in enumerate(zip(references, predictions)):
output_dict = {}
output, reasoning = self.parse_xml_to_dict(llm_string)
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
q = pd.DataFrame(eval(q))
threshold = q.iloc[-1, 0] # therashold is the last row
distance_matrix = q.iloc[:-1].values # distance matrix is the rest of the rows
output_dict['output'] = output
try:
output_dict['correctness'], _ = self.tsp_decision_check(distance_matrix, threshold, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
output_dict['reasoning'] = reasoning
output_dict['level'] = level
if output_dict:
tsp_d_Results.append(output_dict)
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
details[str(index)] = {'q': q, 'output': output, 'result': r}
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
result['details'] = details
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
except Exception:
final_answer_element = ''
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def tsp_approx(self, distance_matrix):
"""Returns an approximate solution to the TSP problem.
:param distance_matrix: A 2D numpy array representing the distance matrix.
:return: A list of the cities in the order they were visited.
"""
G = nx.from_numpy_array(distance_matrix)
return nx.approximation.traveling_salesman_problem(G)
def tsp_decision_check(self, distance_matrix, threshold, tour):
"""Checks if a given TSP tour is valid and within the threshold
distance.
:param distance_matrix: A 2D numpy array representing the distance matrix.
:param threshold: The maximum distance allowed.
:param tour: A dictionary containing the feasibility.
"""
try:
is_feasible = tour.get('Feasible', 'no').lower() == 'yes'
except Exception:
return False, 'Output format incorrect'
# Calculate the approxed distance of the tour
tours = self.tsp_approx(distance_matrix)
tour_distance = sum(distance_matrix[tours[i], tours[i + 1]] for i in range(len(tours) - 1)) + distance_matrix[tours[-1], tours[0]]
if is_feasible != (tour_distance <= threshold):
return False, f'Feasibility mismatch: {is_feasible} vs {tour_distance} > {threshold}'
return True, 'Feasible: {} <= {}'.format(tour_distance, threshold)

View File

@ -0,0 +1,189 @@
import ast
import xml.etree.ElementTree as ET
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import gcpPrompts
def q2text(q, p=gcpPrompts): # q is the data for the HP-hard question, p is the prompt
# print(q)
chromatic_number = q.split('\n')[0][-1] # last character of the first line
number_of_vertices = q.split('\n')[1].split(' ')[2] # third word of the second line
prompt_text = p['Intro'] + '\n' \
+ p['Initial_question'].format(max_vertices=number_of_vertices,max_colors=chromatic_number) + '\n' \
+ p['Output_content'] + '\n' \
+ p['Output_format'] + \
'\n The graph is below: \n'
for line in q.split('\n')[2:]:
vertex_list = line.split(' ')
this_line = 'Vertex {} is connected to vertex {}.'.format(vertex_list[1], vertex_list[2])
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class hard_GCP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
for file_num in range(10):
with open(data_path + 'synthesized_data_GCP_{}.txt'.format(file_num)) as f:
data = f.read()
sample = data.split('\n\n')[:-1]
all_data += zip([file_num + 1] * len(sample), sample)
for (level, q) in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + q,
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class hard_GCP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
details = {}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = q.split('####\n')[-1]
output_dict['output'] = output
try:
output_dict['correctness'] = self.gcpCheck(q, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
output_dict['level'] = level
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
details[str(index)] = {'q': q, 'output': output, 'result': r}
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
result['details'] = details
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
# Parse the XML string
root = ET.fromstring(xml_string)
# Find the 'final_answer' tag
final_answer_element = root.find('final_answer')
# Find the 'reasoning' tag
reasoning_element = root.find('reasoning')
except Exception:
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end]
reasoning_element = xml_string[reasoning_start:reasoning_end]
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def gcpCheck(self, dimacs_str, answer_str):
num_vertices, adjacency_list = self.read_dimacs_format(dimacs_str)
answer_colors = self.parse_answer(answer_str)
# print(adjacency_list)
# print(answer_colors)
# Check if all colors in the answer are valid
for vertex, neighbors in adjacency_list.items():
for neighbor in neighbors:
try:
if answer_colors[vertex] == answer_colors[neighbor]:
print(f'Invalid coloring: Vertex {vertex} and {neighbor} have the same color.')
return False
except:
print(f'Invalid input.') # dealing with hullucination
return False
print(f'Valid coloring found with {len(set(answer_colors.values()))} colors: {answer_colors}')
return True
def read_dimacs_format(self, dimacs_str):
lines = dimacs_str.strip().split('\n')
# Read the number of vertices and edges
p_line = next(line for line in lines if line.startswith('p'))
_, _, num_vertices, num_edges = p_line.split()
num_vertices, num_edges = int(num_vertices), int(num_edges)
# Create adjacency list
adjacency_list = {i: set() for i in range(1, num_vertices + 1)}
# Read the edges and ignore those that reference non-existing vertices
for line in lines:
if line.startswith('e'):
_, vertex1, vertex2 = line.split()
vertex1, vertex2 = int(vertex1), int(vertex2)
if vertex1 in adjacency_list and vertex2 in adjacency_list:
adjacency_list[vertex1].add(vertex2)
adjacency_list[vertex2].add(vertex1)
return num_vertices, adjacency_list
def parse_answer(self, llm_string):
# # Convert the answer string to a dictionary
# answer_dict = {}
# # Remove the braces and split the string by commas
# entries = answer_str.strip("}{").split(', ')
# for entry in entries:
# vertex, color = entry.split(':')
# answer_dict[int(vertex)] = color
# return answer_dict
all_answers, reasoning_element = self.parse_xml_to_dict(llm_string)
if all_answers == '':
return {}
elif all_answers is None:
return {}
else:
if isinstance(all_answers, str):
try:
all_answers = ast.literal_eval(all_answers)
except Exception:
try:
all_answers = ast.literal_eval('{' + all_answers + '}')
except Exception:
return {}
else:
all_answers = ast.literal_eval(all_answers.text)
# answer_dict = {}
# for pair in all_answers:
# vertex, color = pair.split(":")
# answer_dict[int(vertex)] = color
# convert key type to int
all_answers = {int(k): v for k, v in all_answers.items()}
return all_answers # answer_dict

View File

@ -0,0 +1,203 @@
import ast
import json
import xml.etree.ElementTree as ET
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import mspPrompts
def q2text(q, p=mspPrompts): # q is the data for the HP-hard question, p is the prompt
total_participants = q['participants']
total_timeslots = q['time_slots']
prompt_text = p['Intro'] + '\n' \
+ p['Initial_question'].format(total_participants=total_participants,total_timeslots=total_timeslots) + '\n' \
+ p['Output_content'] + '\n' \
+ p['Output_format'] + \
'\n The meetings and participants details are as below: \n'
meetings = q['meetings']
participants = q['participants']
for meeting in meetings:
this_line = 'Meeting {} is with duration {}.'.format(meeting['id'], meeting['duration'])
prompt_text += this_line + '\n'
for j in participants.keys():
this_line = 'Participant {} is available at time slots {} and has meetings {}.'.format(j, participants[j]['available_slots'], participants[j]['meetings'])
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class hard_MSP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
with open(data_path + 'msp_instances.json', 'r') as f:
data = json.load(f)
all_data = zip([int(d['complexity_level']) for d in data], data)
for (level, q) in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class hard_MSP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
output_dict['output'] = output
output_dict['level'] = level
try:
output_dict['correctness'], _ = self.mspCheck(q, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
final_result = {'Weighted Accuracy': result['score']}
return final_result
def mspCheck(self, instance, llm_string):
"""Validate the MSP solution.
Parameters:
- instance: The MSP instance as a dictionary.
- solution: A dictionary with meeting ids as keys and lists of scheduled time slots as values.
Returns:
- A tuple (is_valid, message). is_valid is True if the solution is valid, False otherwise.
message contains information about the validity of the solution.
"""
# print(llm_string)
solution, reasoning_element = self.parse_xml_to_dict(llm_string)
# print(solution.text)
# convert solution to dictionary
if solution == '':
return False, None
elif solution is None:
return False, None
else:
if isinstance(solution, str):
try:
solution = ast.literal_eval(solution)
if solution is None:
return False, None
except Exception:
try:
solution = ast.literal_eval('{' + solution + '}')
if solution is None:
return False, None
except Exception:
return False, None
else:
try:
solution = ast.literal_eval(solution.text)
if solution is None:
return False, None
except Exception:
return False, None
# convert key type to int
if isinstance(solution, dict):
print(solution)
solution = {int(k): v for k, v in solution.items()}
else:
return False, None
# Check if all meetings are scheduled within the available time slots
for meeting in instance['meetings']:
m_id = meeting['id']
duration = meeting['duration']
scheduled_slots = solution.get(m_id, None)
# Check if the meeting is scheduled
if scheduled_slots is None:
return False, f'Meeting {m_id} is not scheduled.'
# Check if the meeting fits within the number of total time slots
if any(slot >= instance['time_slots'] for slot in scheduled_slots):
return False, f'Meeting {m_id} does not fit within the available time slots.'
# Check if the scheduled slots are contiguous and fit the meeting duration
if len(scheduled_slots) != duration or not all(scheduled_slots[i] + 1 == scheduled_slots[i + 1]
for i in range(len(scheduled_slots) - 1)):
return False, f'Meeting {m_id} is not scheduled in contiguous time slots fitting its duration.'
# Check if all participants are available at the scheduled time
for p_id, participant in instance['participants'].items():
if m_id in participant['meetings']:
if not all(slot in participant['available_slots'] for slot in scheduled_slots):
return False, f'Participant {p_id} is not available for meeting {m_id} at the scheduled time.'
# Check if any participant is double-booked
participants_schedule = {p_id: [] for p_id in instance['participants']}
for m_id, time_slots in solution.items():
try:
duration = next(meeting['duration'] for meeting in instance['meetings'] if meeting['id'] == m_id)
if len(time_slots) != duration:
return False, f'Meeting {m_id} duration does not match the number of scheduled time slots.'
for p_id, participant in instance['participants'].items():
if m_id in participant['meetings']:
participants_schedule[p_id].extend(time_slots)
except Exception:
return False, f'Meeting {m_id} is not in the instance or program error.'
for p_id, slots in participants_schedule.items():
if len(slots) != len(set(slots)):
return False, f'Participant {p_id} is double-booked.'
return True, 'The solution is valid.'
def parse_xml_to_dict(self, xml_string):
try:
# Parse the XML string
root = ET.fromstring(xml_string)
# Find the 'final_answer' tag
final_answer_element = root.find('final_answer')
# Find the 'reasoning' tag
reasoning_element = root.find('reasoning')
except:
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end]
reasoning_element = xml_string[reasoning_start:reasoning_end]
except:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element

View File

@ -0,0 +1,211 @@
import ast
import json
import xml.etree.ElementTree as ET
import numpy as np
import pandas as pd
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import tspPrompts
def q2text(q, p=tspPrompts): # q is the data for the HP-hard question, p is the prompt
total_cities = q.shape[0]
prompt_text = p['Intro'] + '\n' \
+ p['Initial_question'].format(total_cities=total_cities) + '\n' \
+ p['Output_content'] + '\n' \
+ p['Output_format'] + \
'\n The distances between cities are below: \n'
for i in range(q.shape[0]):
for j in range(q.shape[1]):
if i < j: # only use the upper triangle
this_line = 'The path between City {} and City {} is with distance {}.'.format(i, j, q.iloc[i, j])
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class hard_TSP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
for level in range(10):
for file_num in range(10):
# read np array
df = pd.read_csv(data_path + 'synthesized_data_TSP_level_{}_instance_{}.csv'.format(level, file_num + 1),
header=None,
index_col=False)
# transform df to
all_data.append((level + 1, df))
for (level, q) in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q.to_json()),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class hard_TSP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
q = pd.DataFrame(eval(q))
output_dict['output'] = output
try:
output_dict['correctness'], _ = self.tspCheck(q, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
output_dict['level'] = level
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
# Parse the XML string
root = ET.fromstring(xml_string)
# Find the 'final_answer' tag
final_answer_element = root.find('final_answer')
# Find the 'reasoning' tag
reasoning_element = root.find('reasoning')
except:
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end]
reasoning_element = xml_string[reasoning_start:reasoning_end]
except:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def tspCheck(self, distance_matrix, llm_string):
"""Check if the TSP solution is complete and if the distance matches
the greedy solution.
:param tour_string: String representing the TSP tour in the format "0->1->2->...->N->0"
:param distance_matrix: 2D numpy array representing the distances between cities
:return: Boolean indicating whether the tour is complete and matches the greedy distance
"""
# convert distance_matrix to numpy array
distance_matrix = np.array(distance_matrix)
# Convert the tour string to a list of integers
# print(llm_string)
final_answer_element, reasoning_element = self.parse_xml_to_dict(llm_string)
# convert solution to dictionary
if final_answer_element == '':
return False, ''
elif final_answer_element is None:
return False, ''
else:
if isinstance(final_answer_element, str):
try:
tour_string = ast.literal_eval(final_answer_element)['Path']
if tour_string is None:
return False, ''
except Exception:
try:
tour_string = ast.literal_eval('{' + final_answer_element + '}')['Path']
if tour_string is None:
return False, ''
except Exception:
return False, ''
else:
try:
tour_string = ast.literal_eval(final_answer_element.text)['Path']
if tour_string is None:
return False, ''
except Exception:
return False, ''
try:
tour = list(map(int, tour_string.split('->')))
except Exception:
return False, ''
# we could also prinpt `reasoning_element` to see the reasoning of the answer
# we could also print the final distance of the tour by `final_answer_element['Distance']`
# Check if tour is a cycle
if tour[0] != tour[-1]:
return False, 'The tour must start and end at the same city.'
# Check if all cities are visited
if len(tour) != len(distance_matrix) + 1:
return False, 'The tour does not visit all cities exactly once.'
# Calculate the distance of the provided tour
tour_distance = sum(distance_matrix[tour[i]][tour[i + 1]]
for i in range(len(tour) - 1))
# Find the greedy tour distance for comparison
greedy_tour, greedy_distance = self.greedy_tsp(distance_matrix)
# Check if the provided tour distance is equal to the greedy tour distance
if tour_distance != greedy_distance:
return False, f'The tour distance ({tour_distance}) does not match the greedy solution ({greedy_distance}).'
return True, 'The solution is complete and matches the greedy solution distance.'
def greedy_tsp(self, distance_matrix):
"""Solve the Traveling Salesman Problem using a greedy algorithm.
:param distance_matrix: 2D numpy array where the element at [i, j] is the distance between city i and j
:return: A tuple containing a list of the cities in the order they were visited and the total distance
"""
num_cities = distance_matrix.shape[0]
unvisited_cities = set(range(num_cities))
current_city = np.random.choice(list(unvisited_cities))
tour = [current_city]
total_distance = 0
while unvisited_cities:
unvisited_cities.remove(current_city)
if unvisited_cities:
# Find the nearest unvisited city
distances_to_unvisited = distance_matrix[current_city][list(unvisited_cities)]
nearest_city = list(unvisited_cities)[np.argmin(distances_to_unvisited)]
tour.append(nearest_city)
# Update the total distance
total_distance += distance_matrix[current_city, nearest_city]
current_city = nearest_city
# Return to start
total_distance += distance_matrix[current_city, tour[0]]
tour.append(tour[0])
return tour, total_distance

View File

@ -0,0 +1,124 @@
import ast
import json
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import bspPrompts
def q2text(q, p=bspPrompts):
target_value = q['target']
# TO-DO: fix data not being sorted
array = sorted(q['array'])
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(target_value=target_value) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format'] + \
'\n The sorted array elements are: ' + ', '.join(map(str, array)) + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class p_BSP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data, newdata = [], []
with open(data_path + 'bsp_instances.json', 'r') as f:
data = json.load(f)
for sample in data:
level = len(sample['array']) - 2
all_data.append((level, sample))
for level, q in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class p_BSP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
output, reasoning = self.parse_xml_to_dict(output)
output_dict['output'] = output
try:
output_dict['correctness'], _ = self.bsp_check(q, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
output_dict['reasoning'] = reasoning
output_dict['level'] = level
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
except Exception:
final_answer_element = ''
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def bsp_check(self, instance, solution):
"""Check if the binary search solution is valid.
:param instance: The instance dictionary with array and target value.
:param solution: The solution dictionary with the position of the target value.
:return: A tuple of (is_correct, message).
"""
array = sorted(instance['array'])
target_value = instance['target']
solution, reasoning = self.parse_xml_to_dict(solution)
if isinstance(solution, str):
return False, f'The solution is invalid.'
try:
position = int(solution['Position'])
except Exception:
return False, f'The solution is invalid.'
if position == -1 or position >= len(array):
return False, f'The solution is invalid.'
elif array[position] != target_value:
return False, f'The target index is incorrect.'
return True, 'The solution is valid.'

View File

@ -0,0 +1,145 @@
import ast
import json
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import edpPrompts
def q2text(q, p=edpPrompts):
string_a = q['string_a']
string_b = q['string_b']
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(string_a=string_a, string_b=string_b) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format']
return prompt_text
@LOAD_DATASET.register_module(force=True)
class p_EDP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
with open(data_path + 'edp_instances.json', 'r') as f:
data = json.load(f)
for sample in data:
level = len(sample['string_a']) - 2
all_data.append((level, sample))
for level, q in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class p_EDP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
output, reasoning = self.parse_xml_to_dict(output)
output_dict['output'] = output
try:
output_dict['correctness'], _ = self.edp_check(q, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
output_dict['reasoning'] = reasoning
output_dict['level'] = level
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
final_result = {'Weighted Accuracy': result['score']}
return final_result
def compute_min_edit_distance(self, string_a, string_b):
"""Computes the minimum edit distance between two strings using dynamic
programming."""
m, n = len(string_a), len(string_b)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(m + 1):
for j in range(n + 1):
if i == 0:
dp[i][j] = j
elif j == 0:
dp[i][j] = i
elif string_a[i - 1] == string_b[j - 1]:
dp[i][j] = dp[i - 1][j - 1]
else:
dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])
return dp[m][n]
def edp_check(self, instance, solution):
"""Check if the edit distance solution is valid.
:param instance: The instance dictionary with 'string_a' and 'string_b'.
:param solution: The solution dictionary with the reported 'edit_distance'.
:return: A tuple of (is_correct, message).
"""
string_a = instance['string_a']
string_b = instance['string_b']
try:
reported_distance = int(solution.get('Operations', -1))
except Exception:
reported_distance = -1
actual_distance = self.compute_min_edit_distance(string_a, string_b)
if reported_distance == -1:
return False, 'No solution provided.'
elif reported_distance != actual_distance:
return False, f'The reported edit distance ({reported_distance}) is incorrect. Actual distance: {actual_distance}.'
return True, 'The solution is valid.'
def parse_xml_to_dict(self, xml_string):
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
# assert '<reasoning>' in xml_string
# assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
# reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
# reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
assert '{' in final_answer_element
assert '}' in final_answer_element
dic_start = final_answer_element.index('{')
dic_end = final_answer_element.index('}')
final_answer_element = final_answer_element[dic_start:dic_end + 1].rstrip().strip().rstrip()
reasoning_element = xml_string
# reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
except Exception:
final_answer_element = ''
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element

View File

@ -0,0 +1,196 @@
import ast
import json
import networkx as nx
from datasets import Dataset
from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET
from ..base import BaseDataset
from .prompts import sppPrompts
def q2text(q, p=sppPrompts):
# start_node = q['start_node']
# end_node = q['end_node']
# TO-DO: fix later
start_node = q['nodes'][0]
end_node = q['nodes'][-1]
edges = q['edges']
prompt_text = p['Intro'] + '\n' + \
p['Initial_question'].format(start_node=start_node, end_node=end_node) + '\n' + \
p['Output_content'] + '\n' + \
p['Output_format'] + \
"\n The graph's edges and weights are as follows: \n"
for edge in edges:
this_line = f"Edge from {edge['from']} to {edge['to']} has a weight of {edge['weight']}."
prompt_text += this_line + '\n'
return prompt_text
@LOAD_DATASET.register_module(force=True)
class p_SPP_Dataset(BaseDataset):
@staticmethod
def load(path: str):
raw_data = []
data_path = path
all_data = []
with open(data_path + 'spp_instances.json', 'r') as f:
data = json.load(f)
all_data = zip([int(d['complexity_level']) for d in data], data)
for level, q in all_data:
prompt = q2text(q)
raw_data.append({
'prompt': prompt,
'q': str(level) + '####\n' + json.dumps(q),
'level': level
})
dataset = Dataset.from_list(raw_data)
return dataset
@ICL_EVALUATORS.register_module(force=True)
class p_SPP_Evaluator(BaseEvaluator):
def score(self, predictions, references):
assert len(predictions) == len(references)
result = {'pass': 0, 'fail': 0}
for index, (q, output) in enumerate(zip(references, predictions)):
output_dict = {}
level = int(q.split('####\n')[0])
q = json.loads(q.split('####\n')[-1])
output, reasoning = self.parse_xml_to_dict(output)
output_dict['output'] = output
try:
output_dict['correctness'], _ = self.spp_check(q, output)
except Exception as e:
print(f'Check failed: {e}')
output_dict['correctness'] = False
output_dict['reasoning'] = reasoning
output_dict['level'] = level
if output_dict['correctness']:
r = 'pass'
else:
r = 'fail'
result[r] += level
result['score'] = result['pass'] / (result['pass'] + result['fail']) * 100
final_result = {'Weighted Accuracy': result['score']}
return final_result
def parse_xml_to_dict(self, xml_string):
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
# assert '<reasoning>' in xml_string
# assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
# reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
# reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end].rstrip().strip().rstrip()
assert '{' in final_answer_element
assert '}' in final_answer_element
dic_start = final_answer_element.index('{')
dic_end = final_answer_element.index('}')
final_answer_element = final_answer_element[dic_start:dic_end + 1].rstrip().strip().rstrip()
# reasoning_element = xml_string[reasoning_start:reasoning_end].rstrip().strip().rstrip()
try:
final_answer_element = ast.literal_eval(final_answer_element)
reasoning_element = xml_string
except Exception:
final_answer_element = ''
reasoning_element = xml_string
except Exception:
final_answer_element = ''
reasoning_element = ''
return final_answer_element, reasoning_element
def ssp_optimal_solution(self, instance, source, target):
"""Provides the optimal solution for the SSP instance.
:param instance: The SSP instance as a dictionary with 'nodes' and 'edges'.
:param source: The source node.
:param target: The destination node.
:return: The optimal shortest path length and path.
"""
G = nx.Graph()
G.add_nodes_from(instance['nodes'])
G.add_weighted_edges_from([(edge['from'], edge['to'], edge['weight'])
for edge in instance['edges']])
shortest_path_length = None
shortest_path = None
if nx.has_path(G, source=source, target=target):
shortest_path_length = nx.shortest_path_length(G, source=source, target=target, weight='weight')
shortest_path = nx.shortest_path(G, source=source, target=target, weight='weight')
return shortest_path_length, shortest_path
# SPP
def spp_check(self, instance, solution, start_node=None, end_node=None):
"""Validate the solution of the SPP problem.
:param instance: The instance dictionary with nodes and edges.
:param solution: The solution dictionary with the path and total distance.
:param start_node: The start node.
:param end_node: The end node.
:return: A tuple of (is_correct, message).
"""
# Get the start and end nodes
# Currently, the start and end nodes are the first and last nodes in the instance
if start_node is None:
start_node = instance['nodes'][0]
if end_node is None:
end_node = instance['nodes'][-1]
# Convert solution to dictionary
try:
path_string = solution.get('Path', '')
cost_string = solution.get('TotalDistance', '')
except Exception:
return False, 'The solution is not a dictionary.'
# Calculate the optimal solution
ssp_optimal_length, ssp_optimal_path = self.ssp_optimal_solution(
instance, start_node, end_node)
if ssp_optimal_length is None:
if isinstance(cost_string, int) or cost_string.isdigit():
return False, f'No path between from node {start_node} to node {end_node}.'
else:
return True, 'No path found from node {start_node} to node {end_node}.'
try:
path = list(map(int, path_string.split('->')))
total_cost = int(cost_string)
except Exception:
return False, 'The solution is not a valid dictionary.'
# Check if path starts and ends with the correct nodes
if not path or path[0] != start_node or path[-1] != end_node:
return False, 'The path does not start or end at the correct nodes.'
# Check if the path is continuous and calculate the cost
calculated_cost = 0
is_in_edge = lambda edge, from_node, to_node: (edge['from'] == from_node and edge['to'] == to_node) or (edge['from'] == to_node and edge['to'] == from_node)
for i in range(len(path) - 1):
from_node, to_node = path[i], path[i + 1]
edge = next((edge for edge in instance['edges'] if is_in_edge(edge, from_node, to_node)), None)
if not edge:
return False, f'No edge found from node {from_node} to node {to_node}.'
calculated_cost += edge['weight']
# Check if the calculated cost matches the total cost provided in the solution
if calculated_cost != total_cost:
return False, f'The calculated cost ({calculated_cost}) does not match the provided total cost ({total_cost}).'
if calculated_cost != ssp_optimal_length:
# spp_optimal_path = '->'.join(map(str, ssp_optimal_path))
return False, f'The calculated cost ({calculated_cost}) does not match the optimal solution ({ssp_optimal_length}): {ssp_optimal_path}.'
return True, 'The solution is valid.'

View File

@ -0,0 +1,96 @@
# Overall fewshot prompts
FEW_SHOT_SELF = 'Please refer to a few examples of this problem and the corresponding reasoning process. The examples are:'
FEW_SHOT_OTHERS = 'Please refer to a few examples of another problem and the corresponding reasoning process. The problem is {initial_question}. {output_content}. The examples are:'
# P problems
sppPrompts = {
'Intro': 'The Shortest Path Problem (SPP) involves finding the shortest path between two nodes in a weighted graph.',
'Initial_question': "You need to find the shortest path between node {start_node} and node {end_node} in a graph. The graph's edges and their weights are given.",
'Output_content': 'Please provide the shortest path from {start_node} to {end_node} and its total distance. Offer a concise step-by-step explanation of your reasoning process. Aim for brevity and clarity in your response.',
'Output_format': "Your output should be enclosed within <root></root> tags. Include your reasoning in <reasoning></reasoning> tags and the final path and total distance in <final_answer></final_answer> tags, like <final_answer>{'Path': 'START->...->END', 'TotalDistance': 'INT_TOTAL_DISTANCE'}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
mfpPrompts = {
'Intro': 'The Maximum Flow Problem (MFP) seeks to find the maximum possible flow from a source node to a sink node in a flow network, subject to capacity constraints on the edges.',
'Initial_question': 'Determine the maximum flow from the source node {source_node} to the sink node {sink_node} in the given flow network. The capacities of the edges are provided.',
'Output_content': 'Please indicate the maximum flow value and the flow for each edge. Provide a brief explanation of your methodology. Keep your response concise and focused.',
'Output_format': "Enclose your output within <root></root> tags. Present your reasoning in <reasoning></reasoning> tags and the final maximum flow and edge flows in <final_answer></final_answer> tags, like <final_answer>{'MaxFlow': 'MAX_FLOW_VALUE', 'Flows': {'NODE_1->NODE_2': FLOW, ...}}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
bspPrompts = {
'Intro': 'The Binary Search Problem (BSP) deals with finding the position of a target value within a sorted array using a binary search algorithm, which efficiently narrows down the search range.',
'Initial_question': 'Find the position of the target value {target_value} in the sorted array. The index begins with 0. The array elements are provided.',
'Output_content': 'Please identify the position of the target value in the array. Offer a brief, step-by-step account of your search process. Aim for conciseness in your response.',
'Output_format': "Your output should be enclosed in <root></root> tags. Include your search process in <reasoning></reasoning> tags and the final position of the target value in <final_answer></final_answer> tags, like <final_answer>{'Position': 'TARGET_POSITION'}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
edpPrompts = {
'Intro': 'The Edit Distance Problem (EDP) involves finding the minimum number of operations required to transform one string into another, where each operation is either an insertion, deletion, or substitution of a single character.',
'Initial_question': 'Find the minimum number of operations required to transform the first string {string_a} into the second string {string_b}. The operations are insertion, deletion, and substitution of a single character, each requiring 1 edit operation.',
'Output_content': 'Please provide the minimum number of operations required to transform the first string into the second string. Offer a brief explanation of your methodology. Keep your response concise and focused.',
'Output_format': "Enclose your output within <root></root> tags. Present your reasoning in <reasoning></reasoning> tags and the final minimum number of operations in <final_answer></final_answer> tags, like <final_answer>{'Operations': 'MINIMUM_NUMBER_OF_OPERATIONS'}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
# NP-complete problems
tsp_dPrompts = {
'Intro': 'The Traveling Salesman Problem (Decision Version, TSP-D) focuses on determining if a salesman can complete a route, visiting each city at least once, with the total travel distance being less than a specified value.',
'Initial_question': "Check if it's possible for a salesman to visit each of the {total_cities} cities at least once and return to the starting city with the total distance less than {distance_limit}. The distances between each pair of cities are given.",
'Output_content': 'Provide a yes or no answer, with a succinct explanation of your decision process. Focus on clarity and brevity in your response.',
'Output_format': "Enclose your output in <root></root> tags. Present your reasoning in <reasoning></reasoning> tags and the final yes/no answer in <final_answer></final_answer> tags, like <final_answer>{'Feasible': 'YES_OR_NO'}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
gcp_dPrompts = {
'Intro': 'The Graph Coloring Problem (Decision Version, GCP-D) involves determining if it is possible to color the vertices of a graph using a given number of colors, ensuring no two adjacent vertices have the same color.',
'Initial_question': 'Find out if the vertices of a graph with {total_vertices} vertices can be colored using only {number_of_colors} colors, such that no adjacent vertices share the same color.',
'Output_content': 'Provide a yes or no answer, along with a concise explanation of your reasoning. Keep your explanation focused and brief.',
'Output_format': "Enclose your output in <root></root> tags. Include your reasoning in <reasoning></reasoning> tags and the final yes/no answer in <final_answer></final_answer> tags, like <final_answer>{'Feasible': 'YES_OR_NO'}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
kspPrompts = {
'Intro': 'The 0-1 Knapsack Problem (KSP) asks whether a subset of items, each with a given weight and value, can be chosen to fit into a knapsack of fixed capacity, maximizing the total value without exceeding the capacity.',
'Initial_question': 'Determine if a subset of items can be selected to fit into a knapsack with a capacity of {knapsack_capacity}, maximizing value without exceeding the capacity. Item weights and values are provided.',
'Output_content': 'Indicate if an optimal subset exists and its total value. Offer a concise explanation of your selection process. Aim for clarity and brevity in your response.',
'Output_format': "Your output should be enclosed within <root></root> tags. Include your selection process in <reasoning></reasoning> tags and the final decision and total value in <final_answer></final_answer> tags, like <final_answer>{'Feasible': 'YES_OR_NO', 'TotalValue': 'TOTAL_VALUE', 'SelectedItemIds': [0, 1]}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
# NP-hard problems
tspPrompts = {
'Intro': 'The traveling salesman problem (TSP) is a classic optimization problem that aims to find the shortest possible route that visits a set of cities, with each city being visited exactly once and the route returning to the original city.',
'Initial_question': 'You must find the shortest path that visits all {total_cities} cities, labelled from 1 to {total_cities}. The distances between each pair of cities are provided.',
'Output_content': 'Please list each city in the order they are visited. Provide the total distance of the trip. You should also provide very short step by step reasoning. Do not use multiple lines and try your best to save output tokens.',
'Output_format': "Your output should contain two parts enclosed by <root></root>. First, your step by step reasoning like <reasoning>The reasoning process</reasoning>. Second, the final output of the result path and total distance wrapped by final_answer tag, like <final_answer>{'Path': '0->1->2->...->N->0', 'TotalDistance': 'INT_TOTAL_DISTANCE'}</final_answer>",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
gcpPrompts = {
'Intro': 'Graph coloring refers to the problem of coloring vertices of a graph in such a way that no two adjacent vertices have the same color. ',
'Initial_question': 'There are {max_vertices} vertices 1 to {max_vertices} in a graph. You may use {max_colors} colors with alphabats from A, B, C,... to color the graph.',
'Output_content': "Please label every vertex, even if it is disconnected from the rest of the graph. Please provide each vertex's color. Do not skip any vertices. You should also provide very short step by step reasoning. Do not use multiple lines and try your best to save output tokens.",
'Output_format': "Your output should contain two parts enclosed by <root></root>. First, your step by step reasoning wrapped by <reasoning></reasoning>. Second, the final output of all vertex numbers and their associated colors, wrapped by final_answer tag, like <final_answer>{0:'COLOR_1', 1:'COLOR_2', ...}</final_answer>.",
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}
mspPrompts = {
'Intro': 'The meeting scheduling problem (MSP) is a type of constraint satisfaction problem where the goal is to find a suitable time slot for a meeting that all participants can attend without conflicts in their schedules.',
'Initial_question': "There are {total_participants} participants with their available time slots. There are {total_timeslots} consecutive non-overlapping time slots. Let's assume all meetings has duration of 1.",
'Output_content': 'Please provide a time slot where all participants can attend the meeting. You should also provide very short step by step reasoning. Do not use multiple lines and try your best to save output tokens.',
'Output_format': 'Your output should contain two parts enclosed by <root></root>. First, your step by step reasoning wrapped by <reasoning></reasoning>. Second, the final output of meeting numbers followed by a list of slots, like <final_answer>{0:[1,2], 1:[4], ...}</final_answer>.',
'Few_shot_self': FEW_SHOT_SELF,
'Few_shot_others': FEW_SHOT_OTHERS
}

View File

@ -0,0 +1,43 @@
import ast
import xml.etree.ElementTree as ET
def append_root_tags(string):
if not string.strip().startswith('<root>'):
string = '<root>\n' + string
if not string.strip().endswith('</root>'):
string += '\n</root>'
return string
def parse_xml_to_dict(xml_string):
final_answer_element = ''
reasoning_element = ''
try:
# Parse the XML string
root = ET.fromstring(xml_string)
# Find the 'final_answer' tag
final_answer_element = root.find('final_answer').text
# Find the 'reasoning' tag
reasoning_element = root.find('reasoning').text
except Exception:
try:
assert '<final_answer>' in xml_string
assert '</final_answer>' in xml_string
assert '<reasoning>' in xml_string
assert '</reasoning>' in xml_string
final_answer_start = xml_string.index('<final_answer>') + len('<final_answer>')
final_answer_end = xml_string.index('</final_answer>')
reasoning_start = xml_string.index('<reasoning>') + len('<reasoning>')
reasoning_end = xml_string.index('</reasoning>')
final_answer_element = xml_string[final_answer_start:final_answer_end]
reasoning_element = xml_string[reasoning_start:reasoning_end]
except Exception:
final_answer_element = ''
reasoning_element = ''
final_answer_element = ast.literal_eval(final_answer_element.strip())
return final_answer_element, reasoning_element

View File

@ -70,6 +70,7 @@ from .multirc import * # noqa: F401, F403
from .narrativeqa import * # noqa: F401, F403
from .natural_question import * # noqa: F401, F403
from .natural_question_cn import * # noqa: F401, F403
from .NPHardEval import * # noqa: F401, F403
from .obqa import * # noqa: F401, F403
from .piqa import * # noqa: F401, F403
from .py150 import * # noqa: F401, F403