import os
from typing import Dict, List, Optional, Union

import numpy as np
import torch
import transformers

from opencompass.models.base import BaseModel
from opencompass.models.base_api import APITemplateParser
from opencompass.registry import MODELS
from opencompass.utils.logging import get_logger
from opencompass.utils.prompt import PromptList

PromptType = Union[PromptList, str]


class MultiTokenEOSCriteria(transformers.StoppingCriteria):
    """Criteria to stop on the specified multi-token sequence."""

    def __init__(
        self,
        sequence: str,
        tokenizer: transformers.PreTrainedTokenizer,
        batch_size: int,
    ):
        self.done_tracker = [False] * batch_size
        self.sequence = sequence
        self.sequence_ids = tokenizer.encode(sequence,
                                             add_special_tokens=False)
        self.sequence_id_len = len(self.sequence_ids)
        self.tokenizer = tokenizer

    def __call__(self, input_ids, scores, **kwargs) -> bool:
        # Compare the last len(sequence_ids) tokens of each sample in the
        # batch against the stop sequence.
        lookback_ids_batch = input_ids[:, -self.sequence_id_len:]
        lookback_tokens_batch = self.tokenizer.batch_decode(lookback_ids_batch)
        for i, done in enumerate(self.done_tracker):
            if done:
                continue
            self.done_tracker[i] = self.sequence in lookback_tokens_batch[i]
        return False not in self.done_tracker

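# Illustrative sketch (not part of the original module): how the criteria
# above is typically wired into HF generation, mirroring the usage inside
# `_batch_generate`/`_single_generate` below. `model`, `tokenizer` and
# `input_ids` are assumed to be an already-loaded causal LM, its tokenizer
# and a batch of tokenized prompts; the stop strings are placeholders.
#
#   stop_words = ['\n\n', '###']
#   criteria = transformers.StoppingCriteriaList([
#       MultiTokenEOSCriteria(seq, tokenizer, input_ids.shape[0])
#       for seq in stop_words
#   ])
#   outputs = model.generate(input_ids=input_ids,
#                            max_new_tokens=64,
#                            stopping_criteria=criteria)

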
@MODELS.register_module()
class HuggingFace(BaseModel):
    """Model wrapper around HuggingFace models.

    Args:
        path (str): The name or path to HuggingFace's model.
        hf_cache_dir: Set the cache dir to HF model cache dir. If None, it
            will use the env variable HF_MODEL_HUB. Defaults to None.
        max_seq_len (int): The maximum length of the input sequence. Defaults
            to 2048.
        tokenizer_path (str): The path to the tokenizer. Defaults to None.
        tokenizer_kwargs (dict): Keyword arguments for the tokenizer.
            Defaults to {}.
        peft_path (str, optional): The name or path to the HuggingFace's PEFT
            model. If None, the original model will not be converted to PEFT.
            Defaults to None.
        tokenizer_only (bool): If True, only the tokenizer will be
            initialized. Defaults to False.
        model_kwargs (dict): Keyword arguments for the model, used in the
            loader. Defaults to dict(device_map='auto').
        generation_kwargs (dict): Default keyword arguments passed to
            ``model.generate``. Defaults to {}.
        meta_template (Dict, optional): The model's meta prompt template if
            needed, in case there is a requirement of injecting or wrapping
            any meta instructions.
        extract_pred_after_decode (bool): Whether to extract the prediction
            string from the decoded output string, instead of extracting the
            prediction tokens before decoding. Defaults to False.
        batch_padding (bool): If False, inference will be performed in a
            for-loop without batch padding. Defaults to False.
        pad_token_id (int): The id of the padding token. Defaults to None. If
            a negative value is given, (#vocab + pad_token_id) is used.
        mode (str, optional): The method of input truncation when the input
            length exceeds max_seq_len. 'mid' truncates the middle part of
            the input. Defaults to 'none'.
        use_fastchat_template (bool, optional): Whether to use fastchat to
            build the conversation template. If True, fastchat needs to be
            installed first. Defaults to False.
        end_str (str, optional): If provided, generated strings are trimmed
            at the first occurrence of end_str, for models whose special
            ending strings are not handled well. Defaults to None.

    Note:
        About ``extract_pred_after_decode``: Commonly, we should extract the
        prediction tokens before decoding. But for some tokenizers using
        ``sentencepiece``, like LLaMA, this behavior may change the number of
        whitespaces, which is harmful for Python programming tasks.
    """

    def __init__(self,
                 path: str,
                 hf_cache_dir: Optional[str] = None,
                 max_seq_len: int = 2048,
                 tokenizer_path: Optional[str] = None,
                 tokenizer_kwargs: dict = dict(),
                 peft_path: Optional[str] = None,
                 tokenizer_only: bool = False,
                 model_kwargs: dict = dict(device_map='auto'),
                 generation_kwargs: dict = dict(),
                 meta_template: Optional[Dict] = None,
                 extract_pred_after_decode: bool = False,
                 batch_padding: bool = False,
                 pad_token_id: Optional[int] = None,
                 mode: str = 'none',
                 use_fastchat_template: bool = False,
                 end_str: Optional[str] = None):
        super().__init__(path=path,
                         max_seq_len=max_seq_len,
                         tokenizer_only=tokenizer_only,
                         meta_template=meta_template)
        if hf_cache_dir is None:
            hf_cache_dir = os.getenv('HF_MODEL_HUB', None)
        self.logger = get_logger()
        self.pad_token_id = pad_token_id
        assert mode in ['none', 'mid']
        self.mode = mode
        self._load_tokenizer(path=path,
                             tokenizer_path=tokenizer_path,
                             tokenizer_kwargs=tokenizer_kwargs)
        self.batch_padding = batch_padding
        self.extract_pred_after_decode = extract_pred_after_decode
        if not tokenizer_only:
            self._load_model(path=path,
                             model_kwargs=model_kwargs,
                             peft_path=peft_path)
        self.generation_kwargs = generation_kwargs
        self.use_fastchat_template = use_fastchat_template
        self.end_str = end_str

    def _load_tokenizer(self, path: str, tokenizer_path: Optional[str],
                        tokenizer_kwargs: dict):
        from transformers import AutoTokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            tokenizer_path if tokenizer_path else path, **tokenizer_kwargs)

        # A patch for tokenizers that do not set pad_token_id
        if self.pad_token_id is not None:
            if self.pad_token_id < 0:
                self.pad_token_id += self.tokenizer.vocab_size
            if self.tokenizer.pad_token_id is None:
                self.logger.debug(f'Using {self.pad_token_id} as pad_token_id')
            elif self.tokenizer.pad_token_id != self.pad_token_id:
                self.logger.warning(
                    'pad_token_id is not consistent with the tokenizer. Using '
                    f'{self.pad_token_id} as pad_token_id')
            self.tokenizer.pad_token_id = self.pad_token_id
        elif self.tokenizer.pad_token_id is None:
            self.logger.warning('pad_token_id is not set for the tokenizer.')
            if self.tokenizer.eos_token is not None:
                self.logger.warning(
                    f'Using eos_token {self.tokenizer.eos_token} '
                    'as pad_token.')
                self.tokenizer.pad_token = self.tokenizer.eos_token
            else:
                from transformers.generation import GenerationConfig
                gcfg = GenerationConfig.from_pretrained(path)

                if gcfg.pad_token_id is not None:
                    self.logger.warning(
                        f'Using pad_token_id {gcfg.pad_token_id} '
                        'as pad_token_id.')
                    self.tokenizer.pad_token_id = gcfg.pad_token_id
                else:
                    raise ValueError(
                        'pad_token_id is not set for this tokenizer. Try to '
                        'set pad_token_id via passing '
                        '`pad_token_id={PAD_TOKEN_ID}` in model_cfg.')

        # A patch for LLaMA tokenizers when batch_padding = True
        if 'decapoda-research/llama' in path or \
                (tokenizer_path and
                 'decapoda-research/llama' in tokenizer_path):
            self.logger.warning('We set new pad_token_id for LLaMA model')
            # Keep the special tokens consistent with the official
            # LLaMA/sentencepiece conventions.
            self.tokenizer.bos_token = '<s>'
            self.tokenizer.eos_token = '</s>'
            self.tokenizer.pad_token_id = 0

    def _set_model_kwargs_torch_dtype(self, model_kwargs):
        if 'torch_dtype' not in model_kwargs:
            torch_dtype = torch.float16
        else:
            torch_dtype = {
                'torch.float16': torch.float16,
                'torch.bfloat16': torch.bfloat16,
                'torch.float': torch.float,
                'auto': 'auto',
                'None': None
            }.get(model_kwargs['torch_dtype'])
        self.logger.debug(f'HF using torch_dtype: {torch_dtype}')
        if torch_dtype is not None:
            model_kwargs['torch_dtype'] = torch_dtype

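    # Illustrative sketch (not part of the original module): model configs
    # pass torch_dtype as a string, and `_set_model_kwargs_torch_dtype`
    # resolves it to a real torch dtype before `from_pretrained` is called.
    # The values below are assumptions for the example only.
    #
    #   model_kwargs = dict(device_map='auto', torch_dtype='torch.bfloat16')
    #   self._set_model_kwargs_torch_dtype(model_kwargs)
    #   # model_kwargs['torch_dtype'] is now torch.bfloat16
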
    def _load_model(self,
                    path: str,
                    model_kwargs: dict,
                    peft_path: Optional[str] = None):
        from transformers import AutoModel, AutoModelForCausalLM

        self._set_model_kwargs_torch_dtype(model_kwargs)
        try:
            self.model = AutoModelForCausalLM.from_pretrained(
                path, **model_kwargs)
        except ValueError:
            self.model = AutoModel.from_pretrained(path, **model_kwargs)

        if peft_path is not None:
            from peft import PeftModel
            self.model = PeftModel.from_pretrained(self.model,
                                                   peft_path,
                                                   is_trainable=False)
        self.model.eval()
        self.model.generation_config.do_sample = False

        # A patch for the decapoda-research LLaMA checkpoints, whose config
        # misses the special token ids.
        if 'decapoda-research/llama' in path:
            self.model.config.bos_token_id = 1
            self.model.config.eos_token_id = 2
            self.model.config.pad_token_id = self.tokenizer.pad_token_id

    def generate(self,
                 inputs: List[str],
                 max_out_len: int,
                 min_out_len: Optional[int] = None,
                 stopping_criteria: List[str] = [],
                 **kwargs) -> List[str]:
        """Generate results given a list of inputs.

        Args:
            inputs (List[str]): A list of strings.
            max_out_len (int): The maximum length of the output.
            min_out_len (Optional[int]): The minimum length of the output.
            stopping_criteria (List[str]): A list of strings that stop the
                generation once they are produced.

        Returns:
            List[str]: A list of generated strings.
        """
        generation_kwargs = kwargs.copy()
        generation_kwargs.update(self.generation_kwargs)
        if self.batch_padding and len(inputs) > 1:
            return self._batch_generate(inputs=inputs,
                                        max_out_len=max_out_len,
                                        min_out_len=min_out_len,
                                        stopping_criteria=stopping_criteria,
                                        **generation_kwargs)
        else:
            return sum(
                (self._single_generate(inputs=[input_],
                                       max_out_len=max_out_len,
                                       min_out_len=min_out_len,
                                       stopping_criteria=stopping_criteria,
                                       **generation_kwargs)
                 for input_ in inputs), [])

    def _batch_generate(self,
                        inputs: List[str],
                        max_out_len: int,
                        min_out_len: Optional[int] = None,
                        stopping_criteria: List[str] = [],
                        **kwargs) -> List[str]:
        """Support for batch prompts inference.

        Args:
            inputs (List[str]): A list of strings.
            max_out_len (int): The maximum length of the output.

        Returns:
            List[str]: A list of generated strings.
        """
        if self.extract_pred_after_decode:
            prompt_lens = [len(input_) for input_ in inputs]

        if self.use_fastchat_template:
            try:
                from fastchat.model import get_conversation_template
            except ModuleNotFoundError:
                raise ModuleNotFoundError(
                    'Fastchat is not installed. You can use '
                    '\'pip install "fschat[model_worker,webui]"\' '
                    'to install fastchat.')
            for i in range(len(inputs)):
                conv = get_conversation_template('vicuna')
                conv.append_message(conv.roles[0], inputs[i])
                conv.append_message(conv.roles[1], None)
                inputs[i] = conv.get_prompt()

        # Step 1: tokenize the whole batch with padding and truncation.
        tokens = self.tokenizer.batch_encode_plus(inputs,
                                                  padding=True,
                                                  truncation=True,
                                                  max_length=self.max_seq_len -
                                                  max_out_len)
        tokens = {
            k: torch.tensor(np.array(tokens[k]), device=self.model.device)
            for k in tokens if k in ['input_ids', 'attention_mask']
        }

        if stopping_criteria:
            # Construct HuggingFace stopping criteria; the eos token is
            # always appended as an extra stop sequence.
            if self.tokenizer.eos_token is not None:
                stopping_criteria = stopping_criteria + [
                    self.tokenizer.eos_token
                ]
            stopping_criteria = transformers.StoppingCriteriaList([
                *[
                    MultiTokenEOSCriteria(sequence, self.tokenizer,
                                          tokens['input_ids'].shape[0])
                    for sequence in stopping_criteria
                ],
            ])
            kwargs['stopping_criteria'] = stopping_criteria

        if min_out_len is not None:
            kwargs['min_new_tokens'] = min_out_len

        # Step 2: run the model forward pass to generate outputs.
        outputs = self.model.generate(**tokens,
                                      max_new_tokens=max_out_len,
                                      **kwargs)

        if not self.extract_pred_after_decode:
            outputs = outputs[:, tokens['input_ids'].shape[1]:]

        decodeds = self.tokenizer.batch_decode(outputs,
                                               skip_special_tokens=True)

        if self.extract_pred_after_decode:
            decodeds = [
                token[len_:] for token, len_ in zip(decodeds, prompt_lens)
            ]

        if self.end_str:
            decodeds = [token.split(self.end_str)[0] for token in decodeds]
        return decodeds

    def _single_generate(self,
                         inputs: List[str],
                         max_out_len: int,
                         min_out_len: Optional[int] = None,
                         stopping_criteria: List[str] = [],
                         **kwargs) -> List[str]:
        """Support for single prompt inference.

        Args:
            inputs (List[str]): A list of strings.
            max_out_len (int): The maximum length of the output.

        Returns:
            List[str]: A list of generated strings.
        """
        if self.extract_pred_after_decode:
            prompt_lens = [len(input_) for input_ in inputs]

        if self.use_fastchat_template:
            try:
                from fastchat.model import get_conversation_template
            except ModuleNotFoundError:
                raise ModuleNotFoundError(
                    'Fastchat is not installed. You can use '
                    '\'pip install "fschat[model_worker,webui]"\' '
                    'to install fastchat.')
            conv = get_conversation_template('vicuna')
            conv.append_message(conv.roles[0], inputs[0])
            conv.append_message(conv.roles[1], None)
            inputs = [conv.get_prompt()]

        if self.mode == 'mid':
            # If the prompt is too long, keep its head and tail and drop the
            # middle so that it fits into max_seq_len - max_out_len tokens.
            input_ids = self.tokenizer(inputs, truncation=False)['input_ids']
            input_ids = torch.tensor(input_ids, device=self.model.device)
            if len(input_ids[0]) > self.max_seq_len - max_out_len:
                half = int((self.max_seq_len - max_out_len) / 2)
                inputs = [
                    self.tokenizer.decode(input_ids[0][:half],
                                          skip_special_tokens=True) +
                    self.tokenizer.decode(input_ids[0][-half:],
                                          skip_special_tokens=True)
                ]

        input_ids = self.tokenizer(inputs,
                                   truncation=True,
                                   max_length=self.max_seq_len -
                                   max_out_len)['input_ids']
        input_ids = torch.tensor(input_ids, device=self.model.device)
        if stopping_criteria:
            # Construct HuggingFace stopping criteria; the eos token is
            # always appended as an extra stop sequence.
            if self.tokenizer.eos_token is not None:
                stopping_criteria = stopping_criteria + [
                    self.tokenizer.eos_token
                ]
            stopping_criteria = transformers.StoppingCriteriaList([
                *[
                    MultiTokenEOSCriteria(sequence, self.tokenizer,
                                          input_ids.shape[0])
                    for sequence in stopping_criteria
                ],
            ])
            kwargs['stopping_criteria'] = stopping_criteria

        if min_out_len is not None:
            kwargs['min_new_tokens'] = min_out_len

        # Pass input_ids as a keyword argument so that PEFT-wrapped models
        # also dispatch generate correctly.
        outputs = self.model.generate(input_ids=input_ids,
                                      max_new_tokens=max_out_len,
                                      **kwargs)

        if not self.extract_pred_after_decode:
            outputs = outputs[:, input_ids.shape[1]:]

        decodeds = self.tokenizer.batch_decode(outputs,
                                               skip_special_tokens=True)

        if self.extract_pred_after_decode:
            decodeds = [
                token[len_:] for token, len_ in zip(decodeds, prompt_lens)
            ]

        if self.end_str:
            decodeds = [token.split(self.end_str)[0] for token in decodeds]
        return decodeds

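    # Illustrative sketch (not part of the original module) of the 'mid'
    # truncation above, with assumed numbers: for max_seq_len = 10 and
    # max_out_len = 4, a 12-token prompt keeps only its first 3 and last 3
    # tokens.
    #
    #   half = int((10 - 4) / 2)          # -> 3
    #   kept = tokens[:half] + tokens[-half:]   # 6 tokens in total
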
    def get_logits(self, inputs: List[str]):

        if self.batch_padding and len(inputs) > 1:
            # batch inference
            tokens = self.tokenizer(inputs,
                                    padding=True,
                                    truncation=True,
                                    max_length=self.max_seq_len)

            tokens = {
                k: torch.tensor(np.array(tokens[k]), device=self.model.device)
                for k in tokens if k in ['input_ids', 'attention_mask']
            }
            outputs = self.model(**tokens)

        else:
            input_ids = self.tokenizer(
                inputs,
                padding=False,
                truncation=True,
                max_length=self.max_seq_len)['input_ids']
            input_ids = torch.tensor(input_ids, device=self.model.device)
            tokens = {'input_ids': input_ids}

            outputs = self.model(input_ids)
        return outputs[0], {'tokens': tokens}

    def get_ppl(self,
                inputs: List[str],
                mask_length: Optional[List[int]] = None) -> List[float]:
        """Get perplexity scores given a list of inputs.

        Args:
            inputs (List[str]): A list of strings.
            mask_length (Optional[List[int]]): A list of mask lengths. If
                provided, the perplexity scores will be calculated with the
                first mask_length[i] tokens masked out. It's okay to skip
                its implementation if advanced features in PPLInferencer are
                not needed.

        Returns:
            List[float]: A list of perplexity scores.
        """

        if self.batch_padding and len(inputs) > 1:
            assert self.tokenizer.pad_token
            return self._get_ppl(inputs, mask_length=mask_length)
        else:
            return np.concatenate([
                self._get_ppl(inputs=[text], mask_length=mask_length)
                for text in inputs
            ])

    def _get_ppl(self,
                 inputs: List[str],
                 mask_length: Optional[List[int]] = None) -> List[float]:
        """Get perplexity scores given a list of inputs.

        Args:
            inputs (List[str]): A list of strings.
            mask_length (Optional[List[int]]): A list of mask lengths. If
                provided, the perplexity scores will be calculated with the
                first mask_length[i] tokens masked out. It's okay to skip
                its implementation if advanced features in PPLInferencer are
                not needed.

        Returns:
            List[float]: A list of perplexity scores.
        """

        outputs, inputs = self.get_logits(inputs)
        shift_logits = outputs[..., :-1, :].contiguous().float()

        shift_labels = inputs['tokens']['input_ids'][..., 1:].contiguous()

        loss_fct = torch.nn.CrossEntropyLoss(
            reduction='none', ignore_index=self.tokenizer.pad_token_id)
        loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)),
                        shift_labels.view(-1)).view(shift_labels.size())

        if mask_length is not None:
            # Zero out the loss of the first mask_length[i] tokens.
            mask = torch.zeros_like(shift_labels)
            for i in range(len(mask)):
                for j in range(mask_length[i] - 1, len(mask[i])):
                    mask[i][j] = 1
            loss = loss * mask

        lens = (inputs['tokens']['input_ids'] !=
                self.tokenizer.pad_token_id).sum(-1).cpu().numpy()
        if mask_length is not None:
            lens -= np.array(mask_length)
        ce_loss = loss.float().sum(-1).cpu().detach().numpy() / lens
        return ce_loss

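    # Illustrative note (not part of the original module): `_get_ppl` returns
    # the mean cross-entropy per non-padding token,
    #   loss_i = sum_t CE(logits[i, t], labels[i, t + 1]) / len_i,
    # so the conventional perplexity of sample i would be exp(loss_i).
    # Presumably the exp is omitted because only the relative ordering across
    # candidate answers matters to the PPL-based inferencers.
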
    def get_loglikelihood(
            self,
            inputs: List[str],
            conts: List[str],
            mask_length: Optional[List[int]] = None) -> List[float]:
        """Get loglikelihood scores given a list of inputs.

        Args:
            inputs (List[str]): A list of strings.
            conts (List[str]): A list of strings: the continuation slices of
                ``inputs``. mask_length is NOT supported yet.
            mask_length (Optional[List[int]]): A list of mask lengths. If
                provided, the perplexity scores will be calculated with the
                first mask_length[i] tokens masked out. It's okay to skip
                its implementation if advanced features in PPLInferencer are
                not needed.

        Returns:
            List[float]: A list of loglikelihood scores.
        """
        assert mask_length is None, 'mask_length is not supported yet.'
        if self.batch_padding and len(inputs) > 1:
            assert self.tokenizer.pad_token
            return self._get_loglikelihood(inputs, conts)
        else:
            return np.concatenate([
                self._get_loglikelihood(inputs=[inputs[idx]],
                                        conts=[conts[idx]])
                for idx in range(len(inputs))
            ])

    def _get_loglikelihood(self, inputs: List[str],
                           conts: List[str]) -> np.ndarray:
        """Get loglikelihood scores given input strings and their
        continuations.

        Args:
            inputs (List[str]): Full strings (context + continuation).
            conts (List[str]): Continuation strings, i.e. the slices of
                ``inputs`` after the context.

        Returns:
            np.ndarray: The loglikelihood score of each continuation.
        """
        input_tokenizer_out = self.tokenizer(inputs,
                                             padding=True,
                                             truncation=False,
                                             return_length=True,
                                             return_tensors='pt').to(
                                                 self.model.device)

        input_ids = input_tokenizer_out['input_ids'][:, :self.max_seq_len]
        input_length = input_tokenizer_out['length']
        context_ids = [
            self.tokenizer(inputs[i].replace(conts[i], ''),
                           padding=False,
                           truncation=True,
                           max_length=self.max_seq_len)['input_ids']
            for i in range(len(inputs))
        ]
        # Forward pass and log-softmax over the vocabulary.
        outputs = self.model(input_ids)['logits']
        outputs = torch.nn.functional.log_softmax(outputs, dim=-1)
        # Sum the log-probabilities of the continuation tokens.
        answer = np.zeros(len(inputs))
        for i in range(len(inputs)):
            if self.tokenizer.padding_side == 'right':
                cont_ids = input_ids[i, len(context_ids[i]):input_length[i]]
                logits = outputs[i,
                                 len(context_ids[i]) - 1:input_length[i] -
                                 1, :]
            else:
                cont_ids = input_ids[i, len(context_ids[i]) - input_length[i]:]
                logits = outputs[i,
                                 len(context_ids[i]) - input_length[i] - 1:-1]
            # Gather the log-probability assigned to each continuation token.
            logits_gather = torch.gather(
                logits.unsqueeze(0), 2,
                cont_ids.unsqueeze(0).unsqueeze(-1))
            answer[i] = float(logits_gather.detach().cpu().sum())
        return answer

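    # Illustrative sketch (not part of the original module) of the gather
    # step above, with assumed toy shapes: if `logits` holds log-probs of
    # shape [cont_len, vocab] and `cont_ids` the continuation token ids of
    # shape [cont_len], then
    #
    #   torch.gather(logits.unsqueeze(0), 2,
    #                cont_ids.unsqueeze(0).unsqueeze(-1))
    #
    # has shape [1, cont_len, 1] and contains log P(token_t | prefix_<t) for
    # each continuation token; summing it yields the continuation's
    # loglikelihood.
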
    def get_mink_percent(self, inputs: List[str], k: int = 20) -> List[float]:
        """Get the Min-K% Prob score of each input, following
        https://swj0419.github.io/detect-pretrain.github.io/."""

        if self.batch_padding and len(inputs) > 1:
            assert self.tokenizer.pad_token
            return self._get_mink_percent(inputs, k=k)
        else:
            return np.concatenate([
                self._get_mink_percent(inputs=[text], k=k) for text in inputs
            ])

    def _get_mink_percent(self, inputs: List[str], k: int = 20) -> List[float]:
        outputs, inputs = self.get_logits(inputs)
        shift_logits = outputs[:, :-1, :].contiguous().float()
        shift_labels = inputs['tokens']['input_ids'][:, 1:].contiguous()

        loss_fct = torch.nn.CrossEntropyLoss(
            reduction='none', ignore_index=self.tokenizer.pad_token_id)
        loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)),
                        shift_labels.view(-1)).view(shift_labels.size())
        lens = (inputs['tokens']['input_ids'] !=
                self.tokenizer.pad_token_id).sum(-1).cpu().numpy()
        mink_percent = []
        for nloss, nlen in zip(loss, lens):
            nlen = int(nlen)
            minklen = max(nlen * k // 100, 1)
            # Take the minklen largest per-token losses of this sample, i.e.
            # the k% least likely tokens.
            nloss = torch.topk(nloss[-nlen:], minklen, dim=-1)[0]
            nloss = -nloss.float().mean().cpu().detach().numpy()
            mink_percent.append(nloss)
        return np.array(mink_percent)

    def get_token_len(self, prompt: str) -> int:
        """Get lengths of the tokenized strings.

        Args:
            prompt (str): Input string.

        Returns:
            int: Length of the input tokens
        """
        return len(self.tokenizer.encode(prompt))

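# Illustrative usage sketch (not part of the original module). The model id
# 'gpt2' and the prompts are placeholders; any checkpoint loadable by
# AutoModelForCausalLM/AutoTokenizer should work the same way.
#
#   model = HuggingFace(path='gpt2',
#                       max_seq_len=2048,
#                       model_kwargs=dict(device_map='auto'),
#                       batch_padding=False)
#   print(model.generate(['The capital of France is'], max_out_len=16))
#   print(model.get_ppl(['The capital of France is Paris.']))

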
@MODELS.register_module()
class HuggingFaceCausalLM(HuggingFace):
    """Model wrapper around HuggingFace CausalLM.

    Args:
        path (str): The name or path to HuggingFace's model.
        hf_cache_dir: Set the cache dir to HF model cache dir. If None, it
            will use the env variable HF_MODEL_HUB. Defaults to None.
        max_seq_len (int): The maximum length of the input sequence. Defaults
            to 2048.
        tokenizer_path (str): The path to the tokenizer. Defaults to None.
        tokenizer_kwargs (dict): Keyword arguments for the tokenizer.
            Defaults to {}.
        peft_path (str, optional): The name or path to the HuggingFace's PEFT
            model. If None, the original model will not be converted to PEFT.
            Defaults to None.
        tokenizer_only (bool): If True, only the tokenizer will be
            initialized. Defaults to False.
        model_kwargs (dict): Keyword arguments for the model, used in the
            loader. Defaults to dict(device_map='auto').
        meta_template (Dict, optional): The model's meta prompt template if
            needed, in case there is a requirement of injecting or wrapping
            any meta instructions.
        batch_padding (bool): If False, inference will be performed in a
            for-loop without batch padding. Defaults to False.
    """

    def _load_model(self,
                    path: str,
                    model_kwargs: dict,
                    peft_path: Optional[str] = None):
        from transformers import AutoModelForCausalLM

        self._set_model_kwargs_torch_dtype(model_kwargs)
        self.model = AutoModelForCausalLM.from_pretrained(path, **model_kwargs)
        if peft_path is not None:
            from peft import PeftModel
            self.model = PeftModel.from_pretrained(self.model,
                                                   peft_path,
                                                   is_trainable=False)
        self.model.eval()
        self.model.generation_config.do_sample = False


class HuggingFaceChatGLM3(HuggingFace):
    """Model wrapper around HuggingFace's ChatGLM3. Details available in
    `https://huggingface.co/THUDM/chatglm3-6b`.

    model.chat() is used for inference.
    """

    def __init__(self,
                 path: str,
                 hf_cache_dir: Optional[str] = None,
                 max_seq_len: int = 2048,
                 tokenizer_path: Optional[str] = None,
                 tokenizer_kwargs: dict = dict(),
                 peft_path: Optional[str] = None,
                 tokenizer_only: bool = False,
                 model_kwargs: dict = dict(device_map='auto'),
                 generation_kwargs: dict = dict(),
                 meta_template: Optional[Dict] = None,
                 extract_pred_after_decode: bool = False,
                 batch_padding: bool = False,
                 pad_token_id: Optional[int] = None,
                 mode: str = 'none',
                 num_extra_tokens: int = 50):
        super().__init__(path=path,
                         hf_cache_dir=hf_cache_dir,
                         max_seq_len=max_seq_len,
                         tokenizer_path=tokenizer_path,
                         tokenizer_kwargs=tokenizer_kwargs,
                         peft_path=peft_path,
                         tokenizer_only=tokenizer_only,
                         generation_kwargs=generation_kwargs,
                         model_kwargs=model_kwargs,
                         meta_template=meta_template,
                         extract_pred_after_decode=extract_pred_after_decode,
                         batch_padding=batch_padding,
                         pad_token_id=pad_token_id,
                         mode=mode)
        self.template_parser = APITemplateParser(meta_template)
        # Extra token budget reserved for the chat template (system prompt,
        # role tags, etc.) when estimating prompt length.
        self.num_extra_tokens = num_extra_tokens

    def generate(self,
                 inputs: List[PromptType],
                 max_out_len: int = 512,
                 skip_overlength=False,
                 **kwargs) -> List[str]:
        """Generate responses from input prompts.

        Args:
            inputs (List[PromptType]): A list of input prompts.
            max_out_len (int): The maximum output length.
        """
        generation_kwargs = kwargs.copy()
        generation_kwargs.update(self.generation_kwargs)

        responses = []
        for _input in inputs:
            assert isinstance(_input, (str, PromptList))
            if isinstance(_input, str):
                history = [{'role': 'user', 'content': _input}]
            else:
                history = []
                for item in _input:
                    msg = {
                        'content': item['prompt'],
                        'role': {
                            'HUMAN': 'user',
                            'BOT': 'assistant',
                            'SYSTEM': 'system',
                        }[item['role'].upper()]
                    }
                    history.append(msg)
            user_content = history[-1]['content']
            history = history[:-1]

            if skip_overlength:
                # ChatGLM3 raises an error when the input exceeds its maximum
                # context length, so overlong inputs are skipped and an empty
                # prediction is returned instead.
                len_user_content = len(self.tokenizer.encode(user_content))
                if len_user_content > 8192:
                    responses.append('')
                    continue

            response, history = self.model.chat(self.tokenizer,
                                                user_content,
                                                history=history,
                                                max_new_tokens=max_out_len,
                                                **generation_kwargs)
            # The response is sometimes a dict; keep only its content.
            if isinstance(response, dict):
                response = response.get('content', '')
            responses.append(response)
        return responses

    def get_token_len(self, prompt: str) -> int:
        return len(self.tokenizer.encode(prompt)) + self.num_extra_tokens
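
# Illustrative sketch (not part of the original module): how a PromptList is
# mapped onto ChatGLM3's chat history by `HuggingFaceChatGLM3.generate`. The
# prompts below are placeholders.
#
#   prompt = PromptList([
#       dict(role='SYSTEM', prompt='You are a helpful assistant.'),
#       dict(role='HUMAN', prompt='What is 2 + 2?'),
#   ])
#   # -> history = [{'role': 'system', 'content': 'You are a helpful ...'}]
#   # -> user_content = 'What is 2 + 2?'
#   # -> model.chat(tokenizer, user_content, history=history, ...)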