OpenCompass/opencompass/models/turbomind.py

import copy
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Union
import numpy as np
from opencompass.models.base import BaseModel
from opencompass.utils.logging import get_logger
from opencompass.utils.prompt import PromptList
PromptType = Union[PromptList, str]
def valid_str(string, coding='utf-8'):
"""decode text according to its encoding type."""
invalid_chars = [b'\xef\xbf\xbd']
bstr = bytes(string, coding)
for invalid_char in invalid_chars:
bstr = bstr.replace(invalid_char, b'')
ret = bstr.decode(encoding=coding, errors='ignore')
return ret
class TurboMindModel(BaseModel):
"""Model wrapper for TurboMind Python API.
Args:
path (str): path of the turbomind model
concurrency (int): the maximum allowed concurrency of turbomind.
max_seq_len (int): The maximum allowed sequence length of a model.
Note that the length of prompt + generated tokens shall not exceed
this value. Defaults to 2048.
        meta_template (Dict, optional): The model's meta prompt
            template if needed, in case there is a requirement to inject or
            wrap any meta instructions.
engine_config (Dict, optional): The engine config to set
arguments like session_len, max_batch_size for TurboMind.
gen_config (Dict, optional): Generation config to set
arguments like top_k, top_p, temperature.
        end_str (str, optional): A string used to trim generated output,
            for models whose special ending strings are not handled well.
            Defaults to None.
"""
def __init__(self,
path: str,
concurrency: int = 8,
max_seq_len: int = 2048,
meta_template: Optional[Dict] = None,
engine_config: Dict = {},
gen_config: Dict = {},
end_str: Optional[str] = None):
super().__init__(path=path,
max_seq_len=max_seq_len,
meta_template=meta_template)
from lmdeploy.turbomind import TurboMind
from lmdeploy.version import version_info
if engine_config is not None:
from lmdeploy.messages import TurbomindEngineConfig
engine_config = TurbomindEngineConfig(**engine_config)
self.logger = get_logger()
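        # build the engine once, then create one generator instance (and a
        # matching session id) per concurrency slot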
tm_model = TurboMind.from_pretrained(path, engine_config=engine_config)
self.tokenizer = tm_model.tokenizer
self.generators = [
            tm_model.create_instance() for _ in range(concurrency)
]
self.generator_ids = [i + 1 for i in range(concurrency)]
self.gen_config = gen_config
self.major_version, self.minor_version, _ = version_info
self.end_str = end_str
def generate(self,
inputs: List[str],
max_out_len: int = 512,
stopping_criteria: List[str] = [],
do_sample: Optional[bool] = None,
                 temperature: float = 1.0,
**kwargs) -> List[str]:
"""Generate results given a list of inputs.
Args:
inputs (List[str]): A list of prompts
max_out_len (int): The maximum length of the output.
Returns:
List[str]: A list of generated strings.
"""
assert isinstance(
inputs, List), f'List(str) is expected, but got {type(inputs)}'
        # split inputs into batches of at most `concurrency` prompts each
        batch_size = len(self.generators)
batch_inputs = [
inputs[i:i + batch_size] for i in range(0, len(inputs), batch_size)
]
gen_config = copy.deepcopy(self.gen_config)
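        # map HuggingFace-style do_sample/temperature switches onto
        # TurboMind's top_k/temperature sampling parameters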
if do_sample is not None:
if do_sample:
gen_config['top_k'] = 1000
gen_config['temperature'] = temperature
else:
gen_config['top_k'] = 1
if stopping_criteria:
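            # register the first token id of each stopping string as an
            # engine-level stop word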
stop_words = gen_config.get('stop_words', [])
for t in stopping_criteria:
t = self.tokenizer.encode(t, add_bos=False)
stop_words.append(t[0])
gen_config['stop_words'] = list(set(stop_words))
gen_config.setdefault('min_new_tokens', 1)
from lmdeploy.messages import EngineGenerationConfig
gen_config = EngineGenerationConfig(**gen_config)
results = []
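        # run each batch concurrently, pairing every prompt with its own
        # generator instance and session id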
for batch_input in batch_inputs:
with ThreadPoolExecutor() as executor:
_results = list(
executor.map(
self._generate,
self.generators[:len(batch_input)],
self.generator_ids[:len(batch_input)],
batch_input,
[max_out_len] * len(batch_input),
[gen_config] * len(batch_input),
[self.end_str] * len(batch_input),
))
results += _results
if stopping_criteria:
for s in stopping_criteria:
results = [r.split(s)[0] for r in results]
return results
def get_token_len(self, prompt: str) -> int:
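        """Return the number of tokens ``prompt`` is encoded into."""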
input_ids = self.tokenizer.encode(prompt)
return len(input_ids)
def wait(self):
"""Wait till the next query can be sent.
Applicable in both single-thread and multi-thread environments.
"""
return self.token_bucket.get_token()
def _generate(self,
generator,
session_id,
prompt: PromptType,
max_out_len: int,
gen_config=None,
end_str: Optional[str] = None) -> str:
"""Generate results given a list of inputs.
Args:
prompt (PromptType): A string or PromptDict.
The PromptDict should be organized in OpenCompass'
API format.
max_out_len (int): The maximum length of the output.
gen_config (EngineGenerationConfig, optional): Generation
config to set arguments like top_k, top_p, temperature.
            end_str (str, optional): A string used to trim the generated
                output, for models whose special ending strings are not
                handled well.
                Defaults to None.
Returns:
str: The generated string.
"""
assert type(
prompt) is str, 'We only support string for TurboMind Python API'
input_ids = self.tokenizer.encode(prompt)
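        # stream_output is disabled below, so the loop keeps the final
        # (complete) output yielded by the engine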
for outputs in generator.stream_infer(session_id=session_id,
input_ids=[input_ids],
gen_config=gen_config,
request_output_len=max_out_len,
sequence_start=True,
sequence_end=True,
step=0,
stream_output=False):
            # lmdeploy >= 0.4 yields an output object carrying token_ids;
            # earlier versions yield a tuple whose second element is the ids
            if (self.major_version, self.minor_version) >= (0, 4):
output_ids = outputs.token_ids
else:
_, output_ids, _ = outputs
response = self.tokenizer.decode(output_ids)
response = valid_str(response)
        # trim the response at the model's special ending string, if any
if end_str:
response = response.split(end_str)[0]
return response
def get_ppl(self,
inputs: List[str],
mask_length: Optional[List[int]] = None) -> List[float]:
"""Get perplexity scores given a list of inputs.
Args:
inputs (List[str]): A list of strings.
            mask_length (Optional[List[int]]): A list of mask lengths. If
                provided, the perplexity scores will be calculated with the
                first mask_length[i] tokens masked out. It's okay to skip
                its implementation if advanced features in PPLInferencer
                are not needed.
Returns:
np.ndarray: The perplexity scores in shape of (N,)
"""
assert isinstance(
inputs, List), f'List(str) is expected, but got {type(inputs)}'
results = []
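        # score every text on the first generator instance; the per-text
        # results are concatenated into a single array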
for text in inputs:
input_ids = self.tokenizer.encode(text)
res = self.generators[0].get_ppl(input_ids)
results.append(res)
results = np.concatenate(results)
return results
def get_loglikelihood(
self,
inputs: List[str],
conts: List[str],
mask_length: Optional[List[int]] = None) -> List[float]:
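        """Get the log-likelihood of each continuation given its context.

        The score is the total loss of the full text minus the total loss of
        the text with the continuation removed, negated.
        """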
assert isinstance(
inputs, List), f'List(str) is expected, but got {type(inputs)}'
results = []
for text, cont in zip(inputs, conts):
input_ids = self.tokenizer.encode(text)
res = self.generators[0].get_ppl(input_ids)
logit_sum = res * len(input_ids)
input_ids = self.tokenizer.encode(text.replace(cont, ''))
res = self.generators[0].get_ppl(input_ids)
logit_part = res * len(input_ids)
results.append(-(logit_sum - logit_part))
results = np.concatenate(results)
return results
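
# A minimal usage sketch (not part of the original module): it assumes
# lmdeploy is installed and that a chat model such as the hypothetical
# repo id below is available for TurboMind. The engine and generation
# settings are illustrative, not prescriptive.
if __name__ == '__main__':
    model = TurboMindModel(
        path='internlm/internlm2-chat-7b',  # hypothetical model path/repo id
        concurrency=2,
        max_seq_len=2048,
        engine_config={'session_len': 2048, 'max_batch_size': 2},
        gen_config={'top_k': 1, 'temperature': 1.0},
    )
    prompts = ['What is the capital of France?', 'Briefly explain perplexity.']
    outputs = model.generate(prompts, max_out_len=64)
    for prompt, output in zip(prompts, outputs):
        print(f'>>> {prompt}\n{output}\n')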