"""Implements a Hugging Face Causal LM with a reward head wrapped inside a :class:`.ComposerModel`."""

import logging
import os
import warnings
from copy import deepcopy
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    Optional,
    Tuple,
    Union,
)

import numpy as np
import torch
import torch.nn as nn
from composer.models.huggingface import HuggingFaceModel, peft_installed
from composer.utils import dist
from torchmetrics import Metric
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    PretrainedConfig,
    PreTrainedModel,
    PreTrainedTokenizer,
    PreTrainedTokenizerBase,
    PreTrainedTokenizerFast,
)

from llmfoundry.models.hf.hf_fsdp import hf_get_init_device
from llmfoundry.models.layers.attention import is_flash_v2_installed
from llmfoundry.models.utils import init_empty_weights
from llmfoundry.utils.config_utils import get_hf_config_value

from compose_rl.reward_learning.utils import (
    SequenceClassifierOutput,
    prepare_hf_sequence_classification_model_for_fsdp,
)

if TYPE_CHECKING:
    from peft import PeftModel

__all__ = ['ComposerHFSequenceClassification']

log = logging.getLogger(__name__)

Tokenizer = Union[PreTrainedTokenizer, PreTrainedTokenizerFast]


def layer_init(layer: nn.Module, std: float = np.sqrt(2), bias_const: float = 0.0):
    """Initializes a linear layer with normally distributed weights and a constant bias."""
    torch.nn.init.normal_(layer.weight, std=std)
    torch.nn.init.constant_(layer.bias, val=bias_const)
    return layer


class RewardModelConfig(PretrainedConfig):
    model_type = "pairwise_rm"

    def __init__(
        self,
        base_model: str = "meta-llama/Meta-Llama-3-70B-Instruct",
        base_config: Optional[PretrainedConfig] = None,
        p_dropout: float = 0.0,
        n_labels: int = 1,
        bias: float = 0.0,
        return_logits: bool = False,
        pretrain_cfg: Optional[Dict[str, Any]] = None,
        pretrained: bool = False,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.base_model = base_model
        # Resolve the backbone config lazily instead of evaluating
        # `AutoConfig.from_pretrained` as a default argument at import time.
        if base_config is None:
            base_config = AutoConfig.from_pretrained(base_model)
        self.base_config = base_config
        # Copy the backbone config's fields (e.g. hidden_size, vocab_size) onto
        # this config so downstream code can read them directly.
        temp_config = deepcopy(base_config)
        if not isinstance(base_config, dict):
            temp_config = base_config.__dict__
        for key, value in temp_config.items():
            if key not in ["_name_or_path", "architectures"]:
                setattr(self, key, value)
        self.p_dropout = p_dropout
        self.n_labels = n_labels
        self.bias = bias
        self.return_logits = return_logits
        self.pretrain_cfg = pretrain_cfg if pretrain_cfg is not None else {}
        self.pretrained = pretrained

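
# A minimal construction sketch (the checkpoint name is just the class default;
# any causal LM config can serve as the backbone):
#
#   base_cfg = AutoConfig.from_pretrained('meta-llama/Meta-Llama-3-70B-Instruct')
#   rm_config = RewardModelConfig(
#       base_model='meta-llama/Meta-Llama-3-70B-Instruct',
#       base_config=base_cfg,
#   )
#   rm_config.hidden_size  # backbone fields such as hidden_size are copied over
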

class ValueHead(nn.Module):
    """Scalar value/reward head applied on top of the backbone's hidden states."""

    def __init__(self, config: RewardModelConfig):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.p_dropout)
        self.score = layer_init(
            nn.Linear(config.hidden_size, config.n_labels),
            std=1 / np.sqrt(config.hidden_size + 1),
        )

    def forward(self, hidden_states: torch.Tensor, **kwargs: Any):
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.dense(hidden_states)
        hidden_states = torch.tanh(hidden_states)
        hidden_states = self.dropout(hidden_states)
        output = self.score(hidden_states)
        return output


class AutoModelForCausalLMWithRM(PreTrainedModel):
    """A causal LM backbone with a :class:`ValueHead` that produces per-token scores."""

    config_class = RewardModelConfig

    def __init__(self, config: RewardModelConfig):
        super().__init__(config)
        self.config = config
        pretrain_cfg = config.pretrain_cfg
        pretrained = config.pretrained
        if pretrained:
            self.lm_backbone = AutoModelForCausalLM.from_pretrained(
                config.base_model,
                config=config.base_config,
                **pretrain_cfg,
            )
        else:
            # When loaded from a serialized config, `base_config` arrives as a
            # dict and must be rehydrated into a `PretrainedConfig`.
            if isinstance(config.base_config, dict):
                config.base_config = AutoConfig.from_pretrained(
                    config.base_model,
                    **config.base_config,
                )
            self.lm_backbone = AutoModelForCausalLM.from_config(
                config.base_config,
                trust_remote_code=True,
            )
        self.value_head = ValueHead(config)

    def generate(self, *args: Any, **kwargs: Any):
        return self.lm_backbone.generate(*args, **kwargs)

    def resize_token_embeddings(
        self,
        new_num_tokens: Optional[int] = None,
        pad_to_multiple_of: Optional[int] = None,
    ) -> nn.Embedding:
        # Keep the backbone config in sync with the resized embedding table.
        self.config.base_config.vocab_size = new_num_tokens
        model_embeds = super().resize_token_embeddings(
            new_num_tokens=new_num_tokens,
            pad_to_multiple_of=pad_to_multiple_of,
        )
        return model_embeds

    def set_input_embeddings(self, new_embeddings):
        return self.lm_backbone.set_input_embeddings(new_embeddings)

    def get_input_embeddings(self):
        return self.lm_backbone.get_input_embeddings()

    def set_output_embeddings(self, new_embeddings):
        return self.lm_backbone.set_output_embeddings(new_embeddings)

    def get_output_embeddings(self):
        return self.lm_backbone.get_output_embeddings()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Any] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Any,
    ):
        output = self.lm_backbone(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            labels=labels,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=True,
            return_dict=True,
            cache_position=cache_position,
        )
        # Per-token scores from the final hidden states, shifted by the configured bias.
        scores = self.value_head(
            output.hidden_states[-1],
        ).squeeze(-1) - self.config.bias

        logits = None
        if self.config.return_logits:
            logits = output.logits

        return SequenceClassifierOutput(
            loss=output.loss,
            scores=scores,
            logits=logits,
            past_key_values=output.past_key_values,
            hidden_states=output.hidden_states,
            attentions=output.attentions,
        )

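
# A minimal forward-pass sketch (hypothetical names; pooling the score at the
# final token is an assumption about how callers typically reduce `scores`,
# which has shape [batch_size, seq_len]):
#
#   config = RewardModelConfig(base_model=model_name, base_config=AutoConfig.from_pretrained(model_name))
#   reward_model = AutoModelForCausalLMWithRM(config)
#   out = reward_model(**tokenizer('prompt and response', return_tensors='pt'))
#   reward = out.scores[:, -1]
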

class ComposerHFSequenceClassification(HuggingFaceModel):
    """Configures a :class:`.HuggingFaceModel` around a causal LM with a reward-model value head.

    Args:
        tokenizer (PreTrainedTokenizer): The tokenizer that the model will use.
        pretrained_model_name_or_path (str): The name of or local path to
            the HF causal LM backbone (e.g., ``gpt2`` to instantiate a GPT2LMHeadModel).
        pretrained (bool): Whether to instantiate the model with pre-trained
            weights coming from cfg.pretrained_model_name_or_path. If ``True``,
            cfg.config_overrides must be compatible with the pre-trained weights.
        pretrained_lora_id_or_path (str, optional): The name of or local path to pretrained
            LoRA weights. Currently not supported. Default: ``None``.
        trust_remote_code (bool, optional): Whether to trust remote code when loading from Hugging Face
            Hub. Default: ``True``.
        use_auth_token (bool, optional): Whether to use the Hugging Face authentication token when
            loading from Hugging Face Hub. Default: ``False``.
        use_flash_attention_2 (bool, optional): Whether to use flash-attention 2. Default: ``False``.
        load_in_8bit (bool, optional): Whether to load the model in 8-bit mode. Default: ``False``.
        init_device ('cpu' | 'meta'): Which device, 'cpu' or 'meta', to
            initialize the model on. Currently, `meta` is only supported when
            cfg.pretrained is ``False``. Default: ``'cpu'``.
        config_overrides (dict, optional): An optional dictionary of keyword
            arguments that override the default configuration associated with
            cfg.pretrained_model_name_or_path.
        peft_config (dict, optional): An optional dictionary of keyword arguments to be
            passed to the PeftConfig constructor. If provided, the model will be wrapped in a PeftModel.
        use_train_metrics (bool, optional): Whether to use training metrics. Default: ``True``.
        additional_train_metrics (list, optional): Additional training metric names to build. Default: ``None``.
        additional_eval_metrics (list, optional): Additional evaluation metric names to build. Default: ``None``.
        return_lm_logits (bool, optional): Whether the wrapped model should also return the LM logits
            alongside the reward scores. Default: ``False``.
    """

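    # A minimal construction sketch (hypothetical checkpoint name; assumes the
    # tokenizer matches the backbone):
    #
    #   tokenizer = AutoTokenizer.from_pretrained('meta-llama/Meta-Llama-3-70B-Instruct')
    #   model = ComposerHFSequenceClassification(
    #       tokenizer=tokenizer,
    #       pretrained_model_name_or_path='meta-llama/Meta-Llama-3-70B-Instruct',
    #       use_flash_attention_2=True,
    #   )
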
    def __init__(
        self,
        tokenizer: PreTrainedTokenizerBase,
        pretrained_model_name_or_path: str,
        pretrained: bool = True,
        pretrained_lora_id_or_path: Optional[str] = None,
        trust_remote_code: bool = True,
        use_auth_token: bool = False,
        use_flash_attention_2: bool = False,
        load_in_8bit: bool = False,
        init_device: str = 'cpu',
        config_overrides: Optional[Dict[str, Any]] = None,
        peft_config: Optional[Dict[str, Any]] = None,
        use_train_metrics: bool = True,
        additional_train_metrics: Optional[List[str]] = None,
        additional_eval_metrics: Optional[List[str]] = None,
        return_lm_logits: bool = False,
    ):
        config_overrides = config_overrides or {}

        model = ComposerHFSequenceClassification.build_inner_model(
            pretrained_model_name_or_path=pretrained_model_name_or_path,
            pretrained_lora_id_or_path=pretrained_lora_id_or_path,
            trust_remote_code=trust_remote_code,
            init_device=init_device,
            use_flash_attention_2=use_flash_attention_2,
            use_auth_token=use_auth_token,
            config_overrides=config_overrides,
            load_in_8bit=load_in_8bit,
            pretrained=pretrained,
            prepare_for_fsdp=True,
            return_lm_logits=return_lm_logits,
        )

        train_metrics, eval_metrics = ComposerHFSequenceClassification.build_metrics(
            use_train_metrics=use_train_metrics,
            additional_train_metrics=additional_train_metrics,
            additional_eval_metrics=additional_eval_metrics,
        )

        if peft_config is not None and not peft_installed:
            raise NotImplementedError('PEFT is not supported')

        peft_config_object = None
        if peft_config is not None:
            peft_config_object = self._get_peft_config(peft_config)

        super().__init__(
            model=model,
            shift_labels=True,
            tokenizer=tokenizer,
            metrics=train_metrics,
            eval_metrics=eval_metrics,
            peft_config=peft_config_object,
            allow_embedding_resizing=True,
        )

        # The weights have been loaded at this point, so mark the config as not
        # pretrained; re-instantiating from a saved config should not try to
        # reload the backbone weights.
        self.model.config.pretrained = False

    @staticmethod
    def build_metrics(
        use_train_metrics: bool,
        additional_train_metrics: Optional[List[str]] = None,
        additional_eval_metrics: Optional[List[str]] = None,
    ) -> Tuple[List[Metric], List[Metric]]:
        """Builds the training and evaluation metrics for the model.

        Args:
            use_train_metrics (bool): Whether to use training metrics.
            additional_train_metrics (Optional[List[str]]): Additional training metrics to include.
            additional_eval_metrics (Optional[List[str]]): Additional evaluation metrics to include.

        Returns:
            Tuple[List[Metric], List[Metric]]: A tuple containing the list of training metrics and evaluation metrics.
        """
        from llmfoundry.utils.builders import build_metric

        train_metric_names = additional_train_metrics if additional_train_metrics is not None else []
        eval_metric_names = additional_eval_metrics if additional_eval_metrics is not None else []
        train_metrics = [
            build_metric(metric, {}) for metric in train_metric_names
        ] if use_train_metrics else []
        eval_metrics = [
            build_metric(metric, {}) for metric in eval_metric_names
        ]
        return train_metrics, eval_metrics

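    # Example (hypothetical metric name; any metric registered with llm-foundry's
    # `build_metric` can be used):
    #
    #   train_metrics, eval_metrics = ComposerHFSequenceClassification.build_metrics(
    #       use_train_metrics=True,
    #       additional_train_metrics=['pairwise_accuracy'],
    #   )
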
    @staticmethod
    def build_inner_model(
        pretrained_model_name_or_path: str,
        pretrained_lora_id_or_path: Optional[str],
        trust_remote_code: bool,
        init_device: str,
        use_flash_attention_2: bool,
        use_auth_token: bool,
        config_overrides: Dict[str, Any],
        load_in_8bit: bool,
        pretrained: bool,
        prepare_for_fsdp: bool = False,
        return_lm_logits: bool = False,
    ) -> Union[PreTrainedModel, 'PeftModel']:
        """Builds the inner model for the ComposerHFSequenceClassification.

        Args:
            pretrained_model_name_or_path (str): The pretrained model name or path.
            pretrained_lora_id_or_path (Optional[str]): The pretrained LoRA ID or path.
            trust_remote_code (bool): Whether to trust remote code.
            init_device (str): The initialization device.
            use_flash_attention_2 (bool): Whether to use flash attention 2.
            use_auth_token (bool): Whether to use an authentication token.
            config_overrides (Dict[str, Any]): The configuration overrides.
            load_in_8bit (bool): Whether to load in 8-bit.
            pretrained (bool): Whether to load the pretrained weights.
            prepare_for_fsdp (bool, optional): Whether to prepare the model for FSDP wrapping. Default: ``False``.
            return_lm_logits (bool, optional): Whether to return the LM logits alongside the reward scores. Default: ``False``.

        Returns:
            Union[PreTrainedModel, 'PeftModel']: The built inner model.
        """
        if not trust_remote_code and pretrained_model_name_or_path.startswith(
            'mosaicml/mpt',
        ):
            raise ValueError(
                'trust_remote_code must be set to True for MPT models. Without this, the MPT model code will come from the transformers library, '
                +
                'which is significantly slower and not compatible with the LLM foundry training code, rather than the code release by MosaicML.',
            )

        resolved_init_device = hf_get_init_device(init_device)
        requested_attention_implementation = 'flash_attention_2' if use_flash_attention_2 else 'eager'

        if use_flash_attention_2 and not is_flash_v2_installed():
            raise ValueError(
                'use_flash_attention_2 is set to True, but flash-attention 2 is not installed. '
                + 'Please `pip install llm-foundry[gpu]`.',
            )

        base_config = AutoConfig.from_pretrained(
            pretrained_model_name_or_path,
            trust_remote_code=trust_remote_code,
            token=True,
            attn_implementation=requested_attention_implementation,
            use_cache=False,
        )

        config = RewardModelConfig(
            base_model=pretrained_model_name_or_path,
            base_config=base_config,
            hidden_size=base_config.hidden_size,
            torch_dtype=base_config.torch_dtype,
            return_logits=return_lm_logits,
            vocab_size=base_config.vocab_size,
        )

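        # Hugging Face's `_autoset_attn_implementation` can reject or silently
        # change the requested attention implementation (for example when the
        # model is initialized on the meta device), so pin the requested
        # implementation here. This mirrors the pattern used by llm-foundry.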
        def _autoset_attn_implementation_monkeypatch(
            cls,
            config,
            *args,
            **kwargs,
        ):
            config._attn_implementation = requested_attention_implementation
            return config

        PreTrainedModel._autoset_attn_implementation = classmethod(
            _autoset_attn_implementation_monkeypatch,
        )

        # Apply user-provided config overrides, validating them against the
        # attributes that already exist on the config.
        for k, v in config_overrides.items():
            if not hasattr(config, k):
                raise ValueError(
                    f'config does not have attribute "{k}" to override ({k}: {v}).',
                )

            attr = getattr(config, k)
            # Disallow typos in nested dict overrides.
            if isinstance(attr, Mapping):
                extra_keys = [_k for _k in v.keys() if _k not in attr.keys()]
                if extra_keys:
                    raise ValueError(
                        f'Config dict override got unknown keys. ' +
                        f'Extra keys: {extra_keys}. ' +
                        f'Expected (a subset of) keys: {list(attr.keys())}.',
                    )
                getattr(config, k).update(v)
            # Allow a dict override for attributes that default to None.
            elif attr is None and isinstance(v, Mapping):
                setattr(config, k, {})
                getattr(config, k).update(v)
            elif isinstance(attr, PretrainedConfig):
                if not isinstance(v, Mapping):
                    raise ValueError(
                        f'Expected a dictionary for config override {k}, but got {v}.',
                    )

                for _k, _v in v.items():
                    if not hasattr(attr, _k):
                        raise ValueError(
                            f'config does not have attribute "{_k}" to override ({k}: {_k}: {_v}).',
                        )
                    setattr(attr, _k, _v)
            else:
                setattr(config, k, v)

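        # For example, a hypothetical override dict might look like:
        #
        #   config_overrides = {
        #       'p_dropout': 0.1,                                  # plain attribute on RewardModelConfig
        #       'base_config': {'max_position_embeddings': 8192},  # nested PretrainedConfig override
        #   }
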
        if hasattr(config, 'attn_config') and get_hf_config_value(
            config.attn_config,
            'seq_parallel_world_size',
        ) is not None:
            raise NotImplementedError(
                'Sequence Parallelism is not supported for HuggingFace models.',
            )

        # Under 'mixed' initialization, only local rank zero loads the pretrained
        # weights; all other local ranks build the model without them.
        if dist.get_local_rank() != 0 and init_device == 'mixed':
            pretrained = False

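        # Build the model once (with empty weights) on local rank zero first. This
        # populates the Hugging Face cache (downloads and/or remote code) so the
        # remaining ranks do not race each other doing the same work.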
        if dist.get_local_rank() == 0:
            if os.path.isdir(pretrained_model_name_or_path):
                with init_empty_weights(include_buffers=False):
                    with warnings.catch_warnings():
                        warnings.simplefilter('ignore', UserWarning)
                        AutoModelForCausalLM.from_pretrained(
                            pretrained_model_name_or_path,
                            trust_remote_code=trust_remote_code,
                            token=True,
                            config=base_config,
                        )
            else:
                with init_empty_weights(include_buffers=False):
                    AutoModelForCausalLM.from_config(
                        base_config,
                        trust_remote_code=trust_remote_code,
                    )

        dist.barrier()

        config.pretrained = pretrained
        if resolved_init_device == 'cpu':
            if pretrained:
                config.pretrain_cfg = {
                    "trust_remote_code": trust_remote_code,
                    "token": True,
                    "load_in_8bit": load_in_8bit,
                }
                model = AutoModelForCausalLMWithRM(config)
            else:
                config.pretrain_cfg = {
                    "trust_remote_code": trust_remote_code,
                }
                model = AutoModelForCausalLMWithRM(config)
        elif resolved_init_device == 'meta':
            if pretrained:
                raise ValueError(
                    'Setting cfg.pretrained=True is not supported when init_device="meta".',
                )
            with init_empty_weights(include_buffers=False):
                config.pretrain_cfg = {
                    "trust_remote_code": trust_remote_code,
                }
                model = AutoModelForCausalLMWithRM(config)
        else:
            raise ValueError(
                f'init_device="{init_device}" must be either "cpu" or "meta".',
            )

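        # Signal to the other ranks on this node that local rank zero has finished
        # its (potential) download, then wait for everyone before cleaning up the
        # signal file.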
        signal_file_path = f'.node_{dist.get_node_rank()}_local_rank0_completed'
        if dist.get_local_rank() == 0:
            with open(signal_file_path, 'wb') as f:
                f.write(b'local_rank0_completed_download')

        with dist.local_rank_zero_download_and_wait(signal_file_path):
            dist.barrier()

        if dist.get_local_rank() == 0:
            os.remove(signal_file_path)

        # Tied embeddings need to be re-tied after a meta-device initialization.
        if model.config.tie_word_embeddings and resolved_init_device == 'meta':
            model.tie_weights()

        if pretrained_lora_id_or_path is not None:
            # TODO: loading pretrained LoRA weights is not supported yet.
            raise NotImplementedError('PEFT is not supported')

        if prepare_for_fsdp:
            prepare_hf_sequence_classification_model_for_fsdp(model, init_device)

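        # When parameters are initialized on the meta device, FSDP materializes
        # them via `param_init_fn`; route that through the model's own
        # `_init_weights` so modules are re-initialized consistently.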
        model.param_init_fn = lambda module: model._init_weights(module)
        return model