Training / train.py
amarorn's picture
fix: corrigir crash de treinamento e adicionar salvamento de resultados
22dfad5
#!/usr/bin/env python3
"""
Script de treinamento gerado para HuggingFace Training Platform.
Execute este script no HuggingFace Training ou localmente.
"""
from datasets import load_dataset
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TrainingArguments,
Trainer,
DataCollatorForLanguageModeling,
)
from peft import LoraConfig, get_peft_model
import torch
import os
import json
from datetime import datetime
from pathlib import Path
# Configuração
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
DATASET_REPO = "beAnalytic/eda-training-dataset"
OUTPUT_REPO = "beAnalytic/eda-llm-model"
# Carregar dataset
print(f"Carregando dataset: {DATASET_REPO}")
dataset = load_dataset(DATASET_REPO)
# Configurar variáveis de ambiente para evitar problemas de memória
os.environ["OMP_NUM_THREADS"] = "1"
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")
# Carregar modelo e tokenizer
print(f"Carregando modelo: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
# Verificar se há GPU disponível
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
# Carregar modelo sem quantização (LoRA é suficiente para reduzir memória)
# Quantização 4-bit está causando problemas de GPU RAM no HuggingFace Space
print("Carregando modelo (sem quantização, usando LoRA para eficiência)...")
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
torch_dtype=torch.float16 if device == "cuda" else torch.float32,
device_map="auto" if device == "cuda" else None,
trust_remote_code=True,
use_cache=False,
)
if device == "cpu":
print("⚠️ Modelo carregado em CPU - treinamento será mais lento")
# Configurar LoRA
peft_config = LoraConfig(
r=16,
lora_alpha=32,
target_modules=['q_proj', 'v_proj', 'k_proj', 'o_proj'],
lora_dropout=0.1,
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, peft_config)
# Formatar prompts
EDA_SYSTEM_PROMPT = (
"Você é um analista de dados experiente, focado em gerar INSIGHTS e não em descrever processos técnicos.\n\n"
"Sua tarefa é realizar uma Análise Exploratória de Dados (EDA) extraindo padrões, tendências e comportamentos relevantes dos dados.\n\n"
"REGRAS OBRIGATÓRIAS:\n\n"
"1. NÃO descreva etapas técnicas, bibliotecas, código ou ferramentas (pandas, Python, gráficos, etc.).\n"
"2. NÃO explique \"como fazer\" a análise.\n"
"3. Extraia padrões, tendências e comportamentos relevantes dos dados.\n"
"4. Diferencie claramente:\n"
" • Observação (o que é visível nos dados)\n"
" • Interpretação (o que isso pode significar)\n"
" • Insight (qual a implicação prática ou de negócio)\n"
"5. Declare explicitamente o nível de confiança de cada insight (alto / médio / baixo).\n"
"6. Quando não houver dados suficientes, diga claramente \"não é possível afirmar\".\n\n"
"FORMATO OBRIGATÓRIO DA RESPOSTA:\n\n"
"Observações:\n"
"- …\n\n"
"Interpretações:\n"
"- …\n\n"
"Insights:\n"
"- …\n\n"
"Nível de confiança:\n"
"- …\n\n"
"OBJETIVO: Entregar conclusões úteis, claras e acionáveis, como um analista humano experiente faria."
)
def format_prompt(example):
input_text = example.get("input", "")
output_text = example.get("output", "")
prompt = f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n{input_text}\n<|assistant|>\n{output_text}"
return {"text": prompt}
# Aplicar formatação
train_dataset = dataset["train"].map(format_prompt, remove_columns=dataset["train"].column_names)
# Verificar se existe dataset de validação, caso contrário criar a partir do train
if "validation" in dataset:
eval_dataset = dataset["validation"].map(format_prompt, remove_columns=dataset["validation"].column_names)
else:
print("⚠️ Dataset de validação não encontrado. Criando divisão a partir do dataset de treinamento...")
# Dividir o dataset de treinamento em train e validation (80/20)
split_dataset = dataset["train"].train_test_split(test_size=0.2, seed=42)
train_dataset = split_dataset["train"].map(format_prompt, remove_columns=split_dataset["train"].column_names)
eval_dataset = split_dataset["test"].map(format_prompt, remove_columns=split_dataset["test"].column_names)
print(f"✅ Dataset dividido: {len(train_dataset)} exemplos de treino, {len(eval_dataset)} exemplos de validação")
# Tokenizar
def tokenize_function(examples):
return tokenizer(
examples["text"],
truncation=True,
max_length=1024,
padding="max_length",
)
train_dataset = train_dataset.map(tokenize_function, batched=True, remove_columns=["text"])
eval_dataset = eval_dataset.map(tokenize_function, batched=True, remove_columns=["text"])
# Criar diretório de logs
logs_dir = Path("./logs")
logs_dir.mkdir(exist_ok=True)
# Configurar argumentos de treinamento
training_args = TrainingArguments(
output_dir="./results",
num_train_epochs=3,
per_device_train_batch_size=2,
per_device_eval_batch_size=2,
learning_rate=3e-05,
warmup_steps=100,
logging_steps=10,
save_steps=500,
eval_strategy="steps",
eval_steps=500,
save_total_limit=3,
load_best_model_at_end=True,
fp16=device == "cuda",
gradient_accumulation_steps=4,
dataloader_pin_memory=False,
push_to_hub=True,
hub_model_id=OUTPUT_REPO,
hub_strategy="checkpoint",
)
# Data collator
data_collator = DataCollatorForLanguageModeling(
tokenizer=tokenizer,
mlm=False,
)
# Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
data_collator=data_collator,
)
# Treinar
print("Iniciando treinamento...")
try:
train_output = trainer.train()
except Exception as e:
print(f"❌ Erro durante treinamento: {e}")
# Tentar salvar resultados mesmo em caso de erro
train_output = None
# Coletar estado atual se possível
try:
state = trainer.state
final_log_history = state.log_history if hasattr(state, 'log_history') and state.log_history else []
except:
final_log_history = []
# Salvar log de erro
error_info = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"error": str(e),
"model_name": MODEL_NAME,
"dataset_repo": DATASET_REPO,
"status": "failed"
}
error_file = logs_dir / f"training_error_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
with open(error_file, 'w', encoding='utf-8') as f:
json.dump(error_info, f, indent=2, ensure_ascii=False)
print(f"✅ Informações de erro salvas em: {error_file}")
raise
# Coletar métricas finais do estado do trainer
state = trainer.state
final_log_history = state.log_history if hasattr(state, 'log_history') and state.log_history else []
# Tentar obter loss final de diferentes fontes
final_train_loss = None
if train_output and hasattr(train_output, 'training_loss'):
final_train_loss = train_output.training_loss
elif final_log_history:
for log_entry in reversed(final_log_history):
if 'loss' in log_entry and 'eval_loss' not in log_entry:
final_train_loss = log_entry.get('loss')
break
# Buscar últimas métricas de validação
last_eval_metrics = {}
if final_log_history:
for log_entry in reversed(final_log_history):
if 'eval_loss' in log_entry:
last_eval_metrics = {k: v for k, v in log_entry.items() if k.startswith('eval_')}
break
# Coletar informações do treinamento
training_info = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"model_name": MODEL_NAME,
"dataset_repo": DATASET_REPO,
"output_repo": OUTPUT_REPO,
"training_config": {
"num_train_epochs": training_args.num_train_epochs,
"per_device_train_batch_size": training_args.per_device_train_batch_size,
"per_device_eval_batch_size": training_args.per_device_eval_batch_size,
"gradient_accumulation_steps": training_args.gradient_accumulation_steps,
"learning_rate": training_args.learning_rate,
"warmup_steps": training_args.warmup_steps,
"fp16": training_args.fp16,
},
"dataset_info": {
"train_samples": len(train_dataset),
"eval_samples": len(eval_dataset) if eval_dataset else 0,
},
"training_results": {
"final_train_loss": final_train_loss,
"final_eval_metrics": last_eval_metrics,
"total_steps": len(final_log_history) if final_log_history else 0,
"log_history": final_log_history[-50:],
},
"status": "completed",
}
# Salvar resultados em JSON
results_file = logs_dir / f"training_results_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(training_info, f, indent=2, ensure_ascii=False)
print(f"✅ Resultados salvos em: {results_file}")
# Criar resumo em texto legível
summary_file = logs_dir / f"training_summary_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write("=" * 80 + "\n")
f.write("RESUMO DO TREINAMENTO\n")
f.write("=" * 80 + "\n\n")
f.write(f"Data/Hora: {training_info['timestamp']}\n")
f.write(f"Modelo: {MODEL_NAME}\n")
f.write(f"Dataset: {DATASET_REPO}\n")
f.write(f"Output: {OUTPUT_REPO}\n\n")
f.write("CONFIGURAÇÃO DE TREINAMENTO:\n")
f.write("-" * 80 + "\n")
config = training_info['training_config']
f.write(f"Épocas: {config['num_train_epochs']}\n")
f.write(f"Batch Size (train): {config['per_device_train_batch_size']}\n")
f.write(f"Batch Size (eval): {config['per_device_eval_batch_size']}\n")
f.write(f"Gradient Accumulation Steps: {config['gradient_accumulation_steps']}\n")
f.write(f"Learning Rate: {config['learning_rate']}\n")
f.write(f"Warmup Steps: {config['warmup_steps']}\n")
f.write(f"FP16: {config['fp16']}\n\n")
f.write("DATASET:\n")
f.write("-" * 80 + "\n")
dataset_info = training_info['dataset_info']
f.write(f"Amostras de Treino: {dataset_info['train_samples']}\n")
f.write(f"Amostras de Validação: {dataset_info['eval_samples']}\n\n")
f.write("RESULTADOS:\n")
f.write("-" * 80 + "\n")
results = training_info['training_results']
if results['final_train_loss'] is not None:
f.write(f"Loss Final (Treino): {results['final_train_loss']:.6f}\n")
if results['final_eval_metrics']:
f.write("\nMétricas Finais de Validação:\n")
for key, value in results['final_eval_metrics'].items():
if isinstance(value, float):
f.write(f" {key}: {value:.6f}\n")
else:
f.write(f" {key}: {value}\n")
f.write(f"\nTotal de Steps: {results['total_steps']}\n")
f.write(f"Status: {training_info['status']}\n")
print(f"✅ Resumo salvo em: {summary_file}")
# Fazer push final
print(f"Fazendo push do modelo final para {OUTPUT_REPO}")
try:
trainer.push_to_hub()
print("✅ Push para Hub concluído!")
except Exception as e:
print(f"⚠️ Aviso: Erro ao fazer push para Hub: {e}")
print("Os checkpoints estão salvos localmente em ./results")
print("✅ Treinamento concluído!")
def analyze_schema(csv_description: str, model_path: str = None):
"""
Função de inferência - modelo já 'obrigado' a pensar certo.
Args:
csv_description: Descrição do dataset CSV para análise
model_path: Caminho para o modelo treinado (opcional, usa modelo atual se None)
Returns:
Análise EDA gerada pelo modelo treinado
"""
# Se model_path for fornecido, carregar modelo treinado
inference_model = model
inference_tokenizer = tokenizer
if model_path:
print(f"Carregando modelo treinado de: {model_path}")
inference_tokenizer = AutoTokenizer.from_pretrained(model_path)
inference_model = AutoModelForCausalLM.from_pretrained(
model_path,
device_map="auto",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
prompt = f"""<|system|>
{EDA_SYSTEM_PROMPT}
<|user|>
Analise o seguinte dataset:
{csv_description}
<|assistant|>
"""
inputs = inference_tokenizer(prompt, return_tensors="pt").to(inference_model.device)
output = inference_model.generate(
**inputs,
max_new_tokens=1200,
temperature=0.2,
do_sample=False
)
return inference_tokenizer.decode(output[0], skip_special_tokens=True)
# Exemplo de uso após o treinamento:
# resultado = analyze_schema("Descrição do seu dataset aqui...")
# print(resultado)