# flake8: noqa # yapf: disable from typing import Dict, List, Optional import numpy as np from opencompass.models.base import BaseModel from opencompass.utils import get_logger from .huggingface_above_v4_33 import (_convert_chat_messages, _format_with_fast_chat_template, _get_meta_template, _get_possible_max_seq_len) try: from vllm import LLM, SamplingParams except ImportError: LLM, SamplingParams = None, None class VLLMwithChatTemplate(BaseModel): def __init__( self, path: str, model_kwargs: dict = dict(), tokenizer_only: bool = False, generation_kwargs: dict = dict(), max_seq_len: int = None, meta_template: Optional[Dict] = None, fastchat_template: Optional[str] = None, stop_words: List[str] = [], ): assert LLM, ('Please install VLLM with `pip install vllm`. note: torch==2.1.2 is required.') self.logger = get_logger() self.path = path self.tokenizer_only = tokenizer_only self.template_parser = _get_meta_template(meta_template) self.max_seq_len = _get_possible_max_seq_len(max_seq_len, path) if tokenizer_only: from transformers import AutoTokenizer self.tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True) else: self._load_model(path, model_kwargs) self.tokenizer = self.model.get_tokenizer() self.generation_kwargs = generation_kwargs self.generation_kwargs.pop('do_sample', None) self.fastchat_template = fastchat_template self.stop_words = list(set(stop_words + self._get_potential_stop_words(path))) def _load_model(self, path: str, added_model_kwargs: dict = dict()): import ray if ray.is_initialized(): self.logger.info('shutdown ray instance to avoid "Calling ray.init() again" error.') ray.shutdown() DEFAULT_MODEL_KWARGS = dict(trust_remote_code=True) model_kwargs = DEFAULT_MODEL_KWARGS.copy() model_kwargs.update(added_model_kwargs) self.model = LLM(path, **model_kwargs) def _get_potential_stop_words(self, path: Optional[str]): from transformers import GenerationConfig potential_stop_words = [] try: generation_config = GenerationConfig.from_pretrained(path) for token_id in generation_config.eos_token_id: potential_stop_words.append(self.tokenizer.decode(token_id)) except: pass potential_stop_words.append(self.tokenizer.eos_token) potential_stop_words = list(set(potential_stop_words)) return potential_stop_words def generate(self, inputs: List[str], max_out_len: int, stopping_criteria: List[str] = [], **kwargs) -> List[str]: """Generate results given a list of inputs. Args: inputs (List[str]): A list of strings. max_out_len (int): The maximum length of the output. Returns: List[str]: A list of generated strings. """ messages = _convert_chat_messages(inputs) if self.fastchat_template: messages = _format_with_fast_chat_template(messages, self.fastchat_template) else: messages = [self.tokenizer.apply_chat_template(m, add_generation_prompt=True, tokenize=False) for m in messages] DEFAULT_GENERATION_KWARGS = { 'temperature': 0, 'max_tokens': max_out_len, 'stop': list(set(self.stop_words + stopping_criteria)) } sampling_kwargs = DEFAULT_GENERATION_KWARGS.copy() sampling_kwargs.update(self.generation_kwargs) sampling_kwargs.update(kwargs) sampling_kwargs = SamplingParams(**sampling_kwargs) outputs = self.model.generate(messages, sampling_kwargs) prompt_list, output_strs = [], [] for output in outputs: prompt = output.prompt generated_text = output.outputs[0].text prompt_list.append(prompt) output_strs.append(generated_text) return output_strs def get_token_len(self, prompt: str) -> int: """Get lengths of the tokenized strings. Args: prompt (str): Input string. Returns: int: Length of the input tokens """ m = _convert_chat_messages([prompt])[0] t = self.tokenizer.apply_chat_template(m, add_generation_prompt=True, return_dict=True) return len(t['input_ids'])