import copy
import logging
import os
import io
import json
import torch
import transformers
import random

from tqdm import tqdm
from typing import Optional, Sequence, Dict
from dataclasses import dataclass, field
from torch.utils.data import Dataset
from transformers import Trainer


# -100 is the default ignore_index of torch.nn.CrossEntropyLoss: label
# positions set to this value (here, the prompt tokens) are excluded from the
# loss computation.
IGNORE_INDEX = -100


@dataclass
class ModelArguments:
    model_name_or_path: Optional[str] = field(default="davidkim205/komt-llama2-7b-v1")


@dataclass
class DataArguments:
    data_path: str = field(default='datasets/komt_squad.json', metadata={"help": "Path to the training data."})
    complex_data: Optional[str] = field(default=None, metadata={"help": "Optional extra data path (unused in this script)."})


@dataclass
class TrainingArguments(transformers.TrainingArguments):
    cache_dir: Optional[str] = field(default=None)
    optim: str = field(default="adamw_torch")
    output_dir: str = field(default="output/")
    model_max_length: int = field(
        default=2048,
        metadata={"help": "Maximum sequence length. Sequences will be right padded (and possibly truncated)."},
    )
    per_device_train_batch_size: int = field(
        default=32, metadata={"help": "Batch size per GPU/TPU/MPS/NPU core/CPU for training."}
    )
    per_device_eval_batch_size: int = field(
        default=32, metadata={"help": "Batch size per GPU/TPU/MPS/NPU core/CPU for evaluation."}
    )
    num_train_epochs: float = field(default=3.0, metadata={"help": "Total number of training epochs to perform."})
    warmup_steps: int = field(default=2, metadata={"help": "Linear warmup over warmup_steps."})
    logging_steps: float = field(
        default=1,
        metadata={
            "help": (
                "Log every X update steps. Should be an integer or a float in range `[0,1)`. "
                "If smaller than 1, will be interpreted as a ratio of total training steps."
            )
        },
    )
    lr_scheduler_type: Optional[str] = field(default='cosine')
    fp16: bool = field(
        default=True,
        metadata={"help": "Whether to use fp16 (mixed) precision instead of 32-bit."},
    )
    learning_rate: float = field(default=1e-5, metadata={"help": "The initial learning rate for AdamW."})
    report_to: Optional[str] = field(default='tensorboard')
    gradient_checkpointing: bool = field(
        default=True,
        metadata={
            "help": "If True, use gradient checkpointing to save memory at the expense of slower backward pass."
        },
    )
    deepspeed: Optional[str] = field(default='configs/deepspeed_config.json')
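    # For reference, a minimal ZeRO-2 sketch of what configs/deepspeed_config.json
    # might contain (an assumption; the actual file ships with the repo):
    # {
    #     "train_micro_batch_size_per_gpu": "auto",
    #     "gradient_accumulation_steps": "auto",
    #     "fp16": {"enabled": "auto"},
    #     "zero_optimization": {"stage": 2}
    # }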


def _tokenize_fn(strings: Sequence[str], tokenizer: transformers.PreTrainedTokenizer) -> Dict:
    """Tokenize a list of strings."""
    tokenized_list = [
        tokenizer(
            text,
            return_tensors="pt",
            padding="longest",
            max_length=tokenizer.model_max_length,
            truncation=True,
        )
        for text in strings
    ]
    # Each text is tokenized on its own, so padding="longest" is a no-op and
    # input_ids may safely alias labels here; `preprocess` deep-copies later.
    input_ids = labels = [tokenized.input_ids[0] for tokenized in tokenized_list]
    input_ids_lens = labels_lens = [
        tokenized.input_ids.ne(tokenizer.pad_token_id).sum().item() for tokenized in tokenized_list
    ]
    return dict(
        input_ids=input_ids,
        labels=labels,
        input_ids_lens=input_ids_lens,
        labels_lens=labels_lens,
    )


def preprocess(
    sources: Sequence[str],
    targets: Sequence[str],
    tokenizer: transformers.PreTrainedTokenizer,
) -> Dict:
    """Preprocess the data by tokenizing."""
    examples = [s + t for s, t in zip(sources, targets)]
    # Tokenize the full prompt+response strings (with a progress bar) and the
    # prompts alone, so each prompt's length in tokens is known for masking.
    examples_tokenized, sources_tokenized = [_tokenize_fn(strings, tokenizer) for strings in (tqdm(examples), sources)]
    input_ids = examples_tokenized["input_ids"]
    labels = copy.deepcopy(input_ids)
    # Mask the prompt portion of each example so the loss is computed only on
    # the response tokens.
    for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
        label[:source_len] = IGNORE_INDEX
    return dict(input_ids=input_ids, labels=labels)
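
# Example: for the source "What is 2+2?\n\n### Response:" and the target
# "4</s>", the label positions covering the source tokens are set to
# IGNORE_INDEX, so only the response tokens contribute to the loss.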


class SupervisedDataset(Dataset):
    """Dataset for supervised fine-tuning."""

    def __init__(self, data_path: str, tokenizer: transformers.PreTrainedTokenizer):
        super(SupervisedDataset, self).__init__()
        logging.warning("Loading data...")
        list_data_dict = jload(data_path)
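        # Expected schema: a JSON list of dicts, each with at least an
        # "instruction" and an "output" key, e.g.
        # [{"instruction": "...", "output": "..."}, ...]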
        random.shuffle(list_data_dict)
        logging.warning("Formatting inputs...")

        prompt_input = "{instruction}\n\n### Response:"
        sources = [
            prompt_input.format_map(example) for example in list_data_dict
        ]
        targets = [f"{example['output']}{tokenizer.eos_token}" for example in list_data_dict]

        # Log the first few formatted examples as a sanity check.
        logging.warning("sample data")
        for source, target in zip(sources[:3], targets[:3]):
            logging.warning(source)
            logging.warning(target)
            logging.warning('------------------------')

        logging.warning("Tokenizing inputs... This may take some time...")
        data_dict = preprocess(sources, targets, tokenizer)

        self.input_ids = data_dict["input_ids"]
        self.labels = data_dict["labels"]

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        return dict(input_ids=self.input_ids[i], labels=self.labels[i])


@dataclass
class DataCollatorForSupervisedDataset(object):
    """Collate examples for supervised fine-tuning."""

    tokenizer: transformers.PreTrainedTokenizer

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        input_ids, labels = tuple([instance[key] for instance in instances] for key in ("input_ids", "labels"))
        # Right-pad inputs with the pad token and labels with IGNORE_INDEX so
        # padded positions never contribute to the loss.
        input_ids = torch.nn.utils.rnn.pad_sequence(
            input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
        )
        labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value=IGNORE_INDEX)
        return dict(
            input_ids=input_ids,
            labels=labels,
            # The mask is derived from pad_token_id, so the pad token must be
            # distinct from tokens that occur in real sequences (e.g. EOS).
            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
        )


def make_supervised_data_module(tokenizer: transformers.PreTrainedTokenizer, data_args) -> Dict:
    """Make dataset and collator for supervised fine-tuning."""
    train_dataset = SupervisedDataset(tokenizer=tokenizer, data_path=data_args.data_path)
    data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
    return dict(train_dataset=train_dataset, eval_dataset=None, data_collator=data_collator)
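
# The returned dict unpacks directly into transformers.Trainer:
#   trainer = Trainer(model=model, tokenizer=tokenizer, args=training_args, **data_module)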


def _make_w_io_base(f, mode: str):
    if not isinstance(f, io.IOBase):
        f_dirname = os.path.dirname(f)
        if f_dirname != "":
            os.makedirs(f_dirname, exist_ok=True)
        f = open(f, mode=mode)
    return f


def _make_r_io_base(f, mode: str):
    if not isinstance(f, io.IOBase):
        f = open(f, mode=mode)
    return f


def jdump(obj, f, mode="w", indent=4, default=str):
    """Dump a str or dictionary to a file in json format.

    Args:
        obj: An object to be written.
        f: A string path to the location on disk.
        mode: Mode for opening the file.
        indent: Indent for storing json dictionaries.
        default: A function to handle non-serializable entries; defaults to `str`.
    """
    f = _make_w_io_base(f, mode)
    if isinstance(obj, (dict, list)):
        json.dump(obj, f, indent=indent, default=default)
    elif isinstance(obj, str):
        f.write(obj)
    else:
        raise ValueError(f"Unexpected type: {type(obj)}")
    f.close()


def jload(f, mode="r"):
    """Load a .json file into a dictionary."""
    f = _make_r_io_base(f, mode)
    jdict = json.load(f)
    f.close()
    return jdict


def train():
    parser = transformers.HfArgumentParser((ModelArguments, DataArguments, TrainingArguments))
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    model = transformers.AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
    )

    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=False,
    )

    # Llama's standard special tokens. A pad token is also required: both
    # _tokenize_fn and the collator rely on pad_token_id, and padding must be
    # distinct from EOS or the attention mask would hide real EOS tokens.
    tokenizer.add_special_tokens(
        {
            "eos_token": "</s>",
            "bos_token": "<s>",
            "unk_token": "<unk>",
        }
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.unk_token

    data_module = make_supervised_data_module(tokenizer=tokenizer, data_args=data_args)

    # Hint to the Trainer that the model may already be split across devices,
    # so it is not wrapped in DataParallel.
    model.is_parallelizable = True
    model.model_parallel = True

    trainer = Trainer(model=model, tokenizer=tokenizer, args=training_args, **data_module)
    model.config.use_cache = False  # incompatible with gradient checkpointing

    trainer.train()
    trainer.save_model(training_args.output_dir)
    trainer.save_state()
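

# Example launch (an illustrative sketch; the script filename and GPU count
# are assumptions):
#   deepspeed --num_gpus=2 train.py \
#       --model_name_or_path davidkim205/komt-llama2-7b-v1 \
#       --data_path datasets/komt_squad.json \
#       --deepspeed configs/deepspeed_config.json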


if __name__ == "__main__":
    train()