"""QLoRA fine-tuning of Mistral-7B-Instruct on chess checkmate puzzles.

Loads board/mate pairs from CSV, builds prompt/response pairs, tokenizes them
with prompt masking, and trains LoRA adapters on a 4-bit quantized base model.
"""

import os
from datetime import datetime

import torch
import transformers
from datasets import load_dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Pin the run to a single GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Fixed sequence length used for padding, truncation, and label alignment.
MAX_LENGTH = 200


def apply_chat_template(example):
    """Build one plain-text training string from a board/mate pair.

    Note: despite the name, this does not call the tokenizer's chat template;
    it simply concatenates the message contents into a single string.
    """
    messages = [
        {
            "role": "system",
            "content": "You are a chess grandmaster specializing in finding checkmate moves in any chess position.",
        },
        {
            "role": "user",
            "content": f"Given the following chessboard, identify the move that delivers checkmate:\n\n{example['board']}\n\n",
        },
        {
            "role": "assistant",
            "content": f"The move to achieve checkmate is: {example['mate']}",
        },
    ]

    formatted_text = " ".join(msg["content"] for msg in messages)
    example["text"] = formatted_text.strip()
    return example
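
# If the model's own instruction format is preferred over plain concatenation,
# the tokenizer's built-in chat template could be used instead. A minimal
# sketch (assuming a tokenizer object is available at mapping time, unlike in
# main() below, and noting that some Mistral chat templates do not accept a
# separate "system" role, so that text may need folding into the user turn):
#
#     def apply_chat_template_native(example):
#         messages = [...]  # same three messages as above
#         example["text"] = tokenizer.apply_chat_template(messages, tokenize=False)
#         return example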


def main():
    data_files = {
        'train': '/home/luciano/Documents/Tesis Ezequiel/Tesis/data_boards/high_train.csv',
        'test': '/home/luciano/Documents/Tesis Ezequiel/Tesis/data_boards/high_test.csv',
    }

    # The csv builder forwards these keyword arguments to pandas.read_csv,
    # so only the two needed columns are read and malformed rows are skipped.
    dataset = load_dataset(
        'csv',
        data_files=data_files,
        delimiter=',',
        usecols=['board', 'mate'],
        on_bad_lines='skip',
    )

    train_dataset = dataset['train']
    eval_dataset = dataset['test']

    print('Train Dataset:', train_dataset, '\nTest Dataset:', eval_dataset)

    train_dataset = train_dataset.map(
        apply_chat_template,
        num_proc=2,
        desc="Applying chat template",
    )

    eval_dataset = eval_dataset.map(
        apply_chat_template,
        num_proc=2,
        desc="Applying chat template",
    )

    print("\nFirst Training Example Text:\n", train_dataset[0]['text'])
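
    # Note: the 'text' column is only used for the printout above; the
    # tokenization step below rebuilds the prompt and response from the raw
    # 'board' and 'mate' columns so the label boundary is known exactly.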

    # Load the base model with 4-bit NF4 weights (QLoRA-style setup); compute
    # runs in bfloat16.
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )

    model_id = 'mistralai/Mistral-7B-Instruct-v0.3'

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        attn_implementation='eager',
        trust_remote_code=True,
        quantization_config=quantization_config,
        device_map="auto",
    )

    # Standard k-bit preparation: casts layer norms to float32 and enables
    # input gradients so LoRA adapters can train on the quantized base.
    model = prepare_model_for_kbit_training(model)

    print("Model is loaded on device:", next(model.parameters()).device)
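
    # Optional rough check that 4-bit loading worked (~4 GB expected for a
    # 7B model in NF4):
    #
    #     print(f"Memory footprint: {model.get_memory_footprint() / 1e9:.2f} GB")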

    tokenizer = AutoTokenizer.from_pretrained(
        model_id,
        padding_side="right",
        use_fast=False,
    )
    # Mistral ships without a pad token; reuse EOS for padding.
    tokenizer.pad_token = tokenizer.eos_token

    print("\nTokenizer Special Tokens:")
    print("EOS Token:", tokenizer.eos_token)
    print("BOS Token:", tokenizer.bos_token)
    print("PAD Token:", tokenizer.pad_token)
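
    # Because the pad token reuses EOS, padded positions are indistinguishable
    # from a real end-of-sequence token in the input ids; the label masking in
    # generate_and_tokenize_prompt() below handles this by excluding every
    # padded position from the loss.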

    def generate_and_tokenize_prompt(data_point):
        """Tokenize one example, supervising only the response tokens."""
        prompt = (
            "You are a chess grandmaster specializing in finding checkmate moves in any chess position. "
            "Given the following chessboard, identify the move that delivers checkmate:\n\n"
            f"{data_point['board']}\n\n"
        )
        response = f"The move to achieve checkmate is: {data_point['mate']}"

        tokenized = tokenizer(
            prompt + response,
            padding='max_length',
            truncation=True,
            max_length=MAX_LENGTH,
        )
        input_ids = tokenized['input_ids']
        attention_mask = tokenized['attention_mask']

        # Tokenize the prompt alone with the same special-token settings as
        # the full text, so prompt_length counts the leading BOS token too and
        # the label boundary is not off by one.
        prompt_length = len(tokenizer(prompt)['input_ids'])

        # Mask the prompt out of the loss; only response tokens are supervised.
        labels = [-100] * prompt_length + input_ids[prompt_length:]

        # Also mask the right-padding (pad == EOS here), otherwise the model
        # would be trained to emit an endless stream of EOS tokens.
        labels = [
            label if mask == 1 else -100
            for label, mask in zip(labels, attention_mask)
        ]

        input_ids = input_ids[:MAX_LENGTH]
        attention_mask = attention_mask[:MAX_LENGTH]
        labels = labels[:MAX_LENGTH]

        # For a one-off debug pass, decode the full sequence:
        # print(tokenizer.decode(input_ids, skip_special_tokens=False))

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels,
        }
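
    # Optional sanity check (a one-off sketch, not part of the training flow):
    # decode only the supervised positions of one example to confirm the loss
    # covers exactly the response text.
    #
    #     ex = generate_and_tokenize_prompt(train_dataset[0])
    #     supervised = [t for t, l in zip(ex['input_ids'], ex['labels']) if l != -100]
    #     print(tokenizer.decode(supervised))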

    tokenized_train_dataset = train_dataset.map(
        generate_and_tokenize_prompt,
        remove_columns=['text'],
        batched=False,
    )

    tokenized_val_dataset = eval_dataset.map(
        generate_and_tokenize_prompt,
        remove_columns=['text'],
        batched=False,
    )

    sample = tokenized_train_dataset[0]
    print("\n--- Tokenized Sample ---")
    print("Input IDs:", sample['input_ids'])
    print("Attention Mask:", sample['attention_mask'])
    print("Labels:", sample['labels'])
    print("Decoded Input IDs:\n", tokenizer.decode(sample['input_ids'], skip_special_tokens=False))
    print("--- End of Sample ---\n")

    # LoRA adapters on the attention projections; rank 64 with alpha 16.
    lora_config = LoraConfig(
        r=64,
        lora_alpha=16,
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
    )

    model = get_peft_model(model, lora_config)
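
    # Report how many parameters the adapters actually train; with r=64 on
    # four projection matrices this is a small fraction of the 7B base model.
    model.print_trainable_parameters()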

    project = "tesis"
    base_model_name = "med"
    run_name = f"{base_model_name}-{project}"
    output_dir = f"./{run_name}"

    # Effective batch size: 11 sequences per device x 4 accumulation steps =
    # 44 sequences per optimizer update on the single visible GPU.
    training_args = transformers.TrainingArguments(
        output_dir=output_dir,
        max_grad_norm=1.0,
        warmup_steps=100,
        num_train_epochs=1,
        per_device_train_batch_size=11,
        per_device_eval_batch_size=10,
        gradient_accumulation_steps=4,
        evaluation_strategy="epoch",  # step-based eval_steps would be ignored here
        save_steps=1000,
        logging_steps=10,
        learning_rate=1e-5,
        bf16=True,  # match bnb_4bit_compute_dtype above (needs a bf16-capable GPU)
        logging_dir=r"/home/luciano/Documents/Tesis Ezequiel/Tesis/med/logs_med",
        report_to="tensorboard",
        run_name=f"{run_name}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}",
    )

    # DataCollatorForLanguageModeling(mlm=False) would rebuild the labels from
    # input_ids and silently discard the prompt masking done above. Every
    # example is already padded to MAX_LENGTH, so the default collator (which
    # keeps the precomputed labels) is sufficient.
    trainer = transformers.Trainer(
        model=model,
        train_dataset=tokenized_train_dataset,
        eval_dataset=tokenized_val_dataset,
        args=training_args,
        data_collator=transformers.default_data_collator,
    )

    # The KV cache is useless during training and conflicts with gradient
    # checkpointing (enabled by prepare_model_for_kbit_training above).
    model.config.use_cache = False

    trainer.train(resume_from_checkpoint=r'/home/luciano/Documents/Tesis Ezequiel/Tesis/med/med_checkpoint')
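
    # Note: with a PEFT-wrapped model, save_model() writes only the LoRA
    # adapter weights and config, not the full base model.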
    trainer.save_model("./fine-tuned-model_high")
    tokenizer.save_pretrained("./fine-tuned-model_high")


if __name__ == "__main__":
    main()