gue_save / train.py
nancyH's picture
Upload folder using huggingface_hub
f2c46e8 verified
import wandb
# wandb.login(key="293cdcc20c72cb7e8cc5a077eaacf86b254e46ed")
#Nancy
wandb.login(key="04fa40f46e9b09c72fc2dcb1457767c7ad809037")
import os
import sys
os.environ["DISABLE_TRITON"] = "1"
sys.modules['triton'] = None
sys.modules['flash_attn_triton'] = None
import csv
import copy
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Optional, Dict, Sequence, Tuple, List, Union
import torch
import transformers
import sklearn
import numpy as np
from torch.utils.data import Dataset
import importlib
from pathlib import Path
import itertools
from transformers import BertConfig, BertForSequenceClassification
from transformers import (
WEIGHTS_NAME,
AdamW,
BertConfig,
BertForMaskedLM,
BertTokenizer,
CamembertConfig,
CamembertForMaskedLM,
CamembertTokenizer,
DistilBertConfig,
DistilBertForMaskedLM,
DistilBertTokenizer,
GPT2Config,
GPT2LMHeadModel,
GPT2Tokenizer,
OpenAIGPTConfig,
OpenAIGPTLMHeadModel,
OpenAIGPTTokenizer,
PreTrainedModel,
PreTrainedTokenizer,
RobertaConfig,
RobertaForMaskedLM,
RobertaTokenizer,
get_linear_schedule_with_warmup,
)
@dataclass
class ModelArguments:
model_name_or_path: Optional[str] = field(default="facebook/opt-125m")
trust_remote_code: bool = field(default=False, metadata={"help": "for custom models(has custom code that needs to be executed (e.g., custom architectures, tokenizers, or modeling files)), whether local or from the Hub"})
use_lora: bool = field(default=False, metadata={"help": "whether to use LoRA"})
lora_r: int = field(default=8, metadata={"help": "hidden dimension for LoRA"})
lora_alpha: int = field(default=32, metadata={"help": "alpha for LoRA"})
lora_dropout: float = field(default=0.05, metadata={"help": "dropout rate for LoRA"})
lora_target_modules: str = field(default="query,value", metadata={"help": "where to perform LoRA"})
tokenizer_path: Optional[str] = field(default="facebook/opt-125m")
@dataclass
class DataArguments:
data_path: str = field(default=None, metadata={"help": "Path to the training data."})
kmer: int = field(default=-1, metadata={"help": "k-mer for input sequence. -1 means not using k-mer."})
customized_tokenizer: Optional[str] = field(default=None)
@dataclass
class TrainingArguments(transformers.TrainingArguments):
vocab_file: Optional[str] = field(
default=None,
metadata={"help": "Path to custom vocabulary file (overrides Hugging Face default)"}
)
cache_dir: Optional[str] = field(default=None)
run_name: str = field(default="run")
optim: str = field(default="adamw_torch")
model_max_length: int = field(default=512, metadata={"help": "Maximum sequence length."})
gradient_accumulation_steps: int = field(default=1)
per_device_train_batch_size: int = field(default=1)
per_device_eval_batch_size: int = field(default=1)
num_train_epochs: int = field(default=1)
fp16: bool = field(default=False)
logging_steps: int = field(default=100)
save_steps: int = field(default=100)
eval_steps: int = field(default=100)
evaluation_strategy: str = field(default="steps"),
warmup_steps: int = field(default=50)
weight_decay: float = field(default=0.01)
learning_rate: float = field(default=1e-4)
save_total_limit: int = field(default=3)
load_best_model_at_end: bool = field(default=False)
output_dir: str = field(default="output")
find_unused_parameters: bool = field(default=False)
checkpointing: bool = field(default=False)
dataloader_pin_memory: bool = field(default=False)
eval_and_save_results: bool = field(default=True)
save_model: bool = field(default=False)
seed: int = field(default=42)
project_name: str = field(default=None)
def safe_save_model_for_hf_trainer(trainer: transformers.Trainer, output_dir: str):
"""Collects the state dict and dump to disk."""
state_dict = trainer.model.state_dict()
if trainer.args.should_save:
cpu_state_dict = {key: value.cpu() for key, value in state_dict.items()}
del state_dict
trainer._save(output_dir, state_dict=cpu_state_dict) # noqa
"""
Get the reversed complement of the original DNA sequence.
"""
def get_alter_of_dna_sequence(sequence: str):
MAP = {"A": "T", "T": "A", "C": "G", "G": "C"}
# return "".join([MAP[c] for c in reversed(sequence)])
return "".join([MAP[c] for c in sequence])
"""
Transform a dna sequence to k-mer string
"""
def generate_kmer_str(sequence: str, k: int) -> str:
"""Generate k-mer string from DNA sequence."""
return " ".join([sequence[i:i+k] for i in range(len(sequence) - k + 1)])
"""
Load or generate k-mer string for each DNA sequence. The generated k-mer string will be saved to the same directory as the original data with the same name but with a suffix of "_{k}mer".
"""
def load_or_generate_kmer(data_path: str, texts: List[str], k: int) -> List[str]:
"""Load or generate k-mer string for each DNA sequence."""
kmer_path = data_path.replace(".csv", f"_{k}mer.json")
if os.path.exists(kmer_path):
logging.warning(f"Loading k-mer from {kmer_path}...")
with open(kmer_path, "r") as f:
kmer = json.load(f)
else:
logging.warning(f"Generating k-mer...")
kmer = [generate_kmer_str(text, k) for text in texts]
with open(kmer_path, "w") as f:
logging.warning(f"Saving k-mer to {kmer_path}...")
json.dump(kmer, f)
return kmer
def load_customized_data(data_path: str, texts: List[str], customized_tokenizer: str) -> List[str]:
"""Load or generate k-mer string for each DNA sequence."""
customize_path = data_path.replace(".csv", f"_{customized_tokenizer}.json")
print(customize_path)
if os.path.exists(customize_path):
logging.warning(f"Loading data by customized tokenizer from {customize_path}...")
with open(customize_path, "r") as f:
data = json.load(f)
return data
class SupervisedDataset(Dataset):
"""Dataset for supervised fine-tuning."""
def __init__(self,
data_path: str,
tokenizer: transformers.PreTrainedTokenizer,
kmer: int = -1,
customized_tokenizer = None):
super(SupervisedDataset, self).__init__()
# load data from the disk
with open(data_path, "r") as f:
data = list(csv.reader(f))[1:]
if len(data[0]) == 2:
# data is in the format of [text, label]
logging.warning("Perform single sequence classification...")
texts = [d[0] for d in data]
labels = [int(d[1]) for d in data]
elif len(data[0]) == 3:
# data is in the format of [text1, text2, label]
logging.warning("Perform sequence-pair classification...")
texts = [[d[0], d[1]] for d in data]
labels = [int(d[2]) for d in data]
else:
raise ValueError("Data format not supported.")
if kmer != -1:
logging.warning(f"Using {kmer}-mer as input...")
texts = load_or_generate_kmer(data_path, texts, kmer)
elif kmer == -1 and customized_tokenizer:
logging.warning(f"Using {customized_tokenizer} as input...")
texts = load_customized_data(data_path, texts, customized_tokenizer)
output = tokenizer(
texts,
return_tensors="pt",
padding="longest",
max_length=tokenizer.model_max_length,
truncation=True,
)
# print(texts, output["input_ids"])
self.input_ids = output["input_ids"]
self.attention_mask = output["attention_mask"]
self.labels = labels
self.num_labels = len(set(labels))
def __len__(self):
return len(self.input_ids)
def __getitem__(self, i) -> Dict[str, torch.Tensor]:
return dict(input_ids=self.input_ids[i], labels=self.labels[i])
@dataclass
class DataCollatorForSupervisedDataset(object):
"""Collate examples for supervised fine-tuning."""
tokenizer: transformers.PreTrainedTokenizer
def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
input_ids, labels = tuple([instance[key] for instance in instances] for key in ("input_ids", "labels"))
input_ids = torch.nn.utils.rnn.pad_sequence(
input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
)
labels = torch.Tensor(labels).long()
return dict(
input_ids=input_ids,
labels=labels,
attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
)
"""
Manually calculate the accuracy, f1, matthews_correlation, precision, recall with sklearn.
"""
def calculate_metric_with_sklearn(predictions: np.ndarray, labels: np.ndarray):
valid_mask = labels != -100 # Exclude padding tokens (assuming -100 is the padding token ID)
valid_predictions = predictions[valid_mask]
valid_labels = labels[valid_mask]
return {
"accuracy": sklearn.metrics.accuracy_score(valid_labels, valid_predictions),
"f1": sklearn.metrics.f1_score(
valid_labels, valid_predictions, average="macro", zero_division=0
),
"matthews_correlation": sklearn.metrics.matthews_corrcoef(
valid_labels, valid_predictions
),
"precision": sklearn.metrics.precision_score(
valid_labels, valid_predictions, average="macro", zero_division=0
),
"recall": sklearn.metrics.recall_score(
valid_labels, valid_predictions, average="macro", zero_division=0
),
}
# from: https://discuss.huggingface.co/t/cuda-out-of-memory-when-using-trainer-with-compute-metrics/2941/13
def preprocess_logits_for_metrics(logits:Union[torch.Tensor, Tuple[torch.Tensor, Any]], _):
if isinstance(logits, tuple): # Unpack logits if it's a tuple
logits = logits[0]
if logits.ndim == 3:
# Reshape logits to 2D if needed
logits = logits.reshape(-1, logits.shape[-1])
return torch.argmax(logits, dim=-1)
"""
Compute metrics used for huggingface trainer.
"""
def compute_metrics(eval_pred):
predictions, labels = eval_pred
return calculate_metric_with_sklearn(predictions, labels)
def load_token_v5_1(tokenizer_kwargs):
config_class, model_class, tokenizer_class = MODEL_CLASSES['motifBert']
tokenizer = MotifTokenizer(**tokenizer_kwargs)
bases = ['A', 'T', 'C', 'G']
token_wc = [
f"{operator}_POS_{i}_*_{char}"
for operator, i, char in itertools.product(['WC'], range(12), bases)
]
motif_wildcarded = []
with open(os.path.join('/storage2/fs1/btc/Active/yeli/xiaoxiao.zhou/tokenize/tokenizers/tokenizer_v5.1/hg38_NOOP', "motifs_wildcard.txt"), "r") as file:
for line in file:
seq, operations = line.strip().split(maxsplit=1) # Split only on the first space
motif_wildcarded.append(operations.split()[0]) # Store in dictionary
tokenizer.add_tokens(token_wc + motif_wildcarded)
return tokenizer
def load_token_v4(tokenizer_kwargs):
config_class, model_class, tokenizer_class = MODEL_CLASSES['motifBert']
tokenizer = MotifTokenizer(**tokenizer_kwargs)
bases = ['A', 'T', 'C', 'G']
token_del = [
f"{operator}_POS_{i}_{char}"
for operator, i, char in itertools.product(['DEL'], range(12), bases)
]
token_rep = [
f"{operator}_POS_{i}_{char1}_{char2}"
for operator, i, char1, char2 in itertools.product(['SUB'], range(12), bases, bases)
if char1 != char2
]
token_wc = [
f"{operator}_POS_{i}_*_{char}"
for operator, i, char in itertools.product(['WC'], range(12), bases)
]
token_ins = [
f"{operator}_POS_{i}_{char}"
for operator, i, char in itertools.product(['INS'], range(13), bases)
]
motif_wildcarded = []
with open(os.path.join('/storage2/fs1/btc/Active/yeli/xiaoxiao.zhou/tokenize/tokenizers/tokenizer_v4/hg38', "motifs_wildcard.txt"), "r") as file:
for line in file:
seq, operations = line.strip().split(maxsplit=1) # Split only on the first space
motif_wildcarded.append(operations.split()[0]) # Store in dictionary
tokenizer.add_tokens(token_del + token_rep + token_wc + token_ins + motif_wildcarded)
return tokenizer
def train():
parser = transformers.HfArgumentParser((ModelArguments, DataArguments, TrainingArguments))
model_args, data_args, training_args = parser.parse_args_into_dataclasses()
wandb.init(
project=training_args.project_name,
)
tokenizer_kwargs = {
"cache_dir": training_args.cache_dir,
"model_max_length": training_args.model_max_length,
"padding_side": "right",
"use_fast": True,
"trust_remote_code": model_args.trust_remote_code # 除非必要否则建议保持False
}
if training_args.vocab_file is not None:
if not os.path.exists(training_args.vocab_file):
raise ValueError(f"Vocab file not found at: {training_args.vocab_file}")
tokenizer_kwargs["vocab_file"] = training_args.vocab_file
if data_args.customized_tokenizer == 'token_v4':
tokenizer = load_token_v4(tokenizer_kwargs)
elif data_args.customized_tokenizer == 'token_v5_1':
tokenizer = load_token_v5_1(tokenizer_kwargs)
else:
tokenizer = transformers.PreTrainedTokenizerFast(
tokenizer_file=model_args.tokenizer_path,
**tokenizer_kwargs
)
tokenizer.pad_token = "[PAD]"
tokenizer.unk_token = "[UNK]"
tokenizer.cls_token = "[CLS]"
tokenizer.sep_token = "[SEP]"
tokenizer.mask_token = "[MASK]"
if "InstaDeepAI" in model_args.model_name_or_path:
tokenizer.eos_token = tokenizer.pad_token
# define datasets and data collator
train_dataset = SupervisedDataset(tokenizer=tokenizer,
data_path=os.path.join(data_args.data_path, "train.csv"),
kmer=data_args.kmer,
customized_tokenizer=data_args.customized_tokenizer)
val_dataset = SupervisedDataset(tokenizer=tokenizer,
data_path=os.path.join(data_args.data_path, "dev.csv"),
kmer=data_args.kmer,
customized_tokenizer=data_args.customized_tokenizer)
test_dataset = SupervisedDataset(tokenizer=tokenizer,
data_path=os.path.join(data_args.data_path, "test.csv"),
kmer=data_args.kmer,
customized_tokenizer=data_args.customized_tokenizer)
data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
config = transformers.AutoConfig.from_pretrained(
model_args.model_name_or_path,
num_labels = train_dataset.num_labels,
trust_remote_code=model_args.trust_remote_code
)
model = transformers.AutoModelForSequenceClassification.from_pretrained(
model_args.model_name_or_path,
cache_dir=training_args.cache_dir,
config=config, # pass the adjusted config
trust_remote_code=model_args.trust_remote_code
).to("cuda")
# configure LoRA
if model_args.use_lora:
lora_config = LoraConfig(
r=model_args.lora_r,
lora_alpha=model_args.lora_alpha,
target_modules=list(model_args.lora_target_modules.split(",")),
lora_dropout=model_args.lora_dropout,
bias="none",
task_type="SEQ_CLS",
inference_mode=False,
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()
# define trainer
trainer = transformers.Trainer(model=model,
tokenizer=tokenizer,
args=training_args,
preprocess_logits_for_metrics=preprocess_logits_for_metrics,
compute_metrics=compute_metrics,
train_dataset=train_dataset,
eval_dataset=val_dataset,
data_collator=data_collator)
trainer.train()
if training_args.save_model:
trainer.save_state()
safe_save_model_for_hf_trainer(trainer=trainer, output_dir=training_args.output_dir)
# get the evaluation results from trainer
if training_args.eval_and_save_results:
results_path = os.path.join(training_args.output_dir, "results", training_args.run_name)
results = trainer.evaluate(eval_dataset=test_dataset)
os.makedirs(results_path, exist_ok=True)
with open(os.path.join(results_path, "eval_results.json"), "w") as f:
json.dump(results, f)
if __name__ == "__main__":
train()