#!/usr/bin/env python3
"""
Training script generated for the HuggingFace Training Platform.
Run it on HuggingFace Training or locally.
"""
import os
import sys
def _sanitize_thread_env() -> None:
"""Evita libgomp: Invalid value quando OMP_NUM_THREADS vem vazio ou invalido (ex.: Space/parent)."""
raw = (os.environ.get("OMP_NUM_THREADS") or "").strip()
if not raw.isdigit() or int(raw) < 1:
os.environ["OMP_NUM_THREADS"] = "1"
_sanitize_thread_env()
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
import json
import tempfile
from datetime import datetime
from pathlib import Path
import torch
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
from reporting import build_report_backends
from training_env import parse_env_int
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
DataCollatorForLanguageModeling,
Trainer,
TrainerCallback,
TrainingArguments,
)
try:
from bi_backend.application.training.evaluators import FunctionalEvalRunner
from bi_backend.application.training.observability import (
CompositePublisher,
MlflowPublisher,
RealtimeMetricsCallback,
TensorboardPublisher,
WandbPublisher,
)
except Exception:
FunctionalEvalRunner = None
CompositePublisher = None
MlflowPublisher = None
RealtimeMetricsCallback = None
TensorboardPublisher = None
WandbPublisher = None
# Configuration (defaults target the Qwen 2.5 family; see hf_registry.json)
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-1.5B-Instruct")
DATASET_REPO = os.getenv("DATASET_REPO", "beAnalytic/eda-training-dataset")
OUTPUT_REPO = os.getenv("OUTPUT_REPO", "beAnalytic/eda-llm-qwen2.5-lora")
SPACE_BUILD_MARKER = "space-train-build-20260327-01"
# T4 ~15 GB: conservative defaults; logits.float() in the loss gets heavy with long sequences
# Aliases: MAX_SEQ_LEN, PER_DEVICE_BATCH, GRAD_ACCUM (Space UI)
MAX_SEQ_LENGTH = min(
4096,
max(128, parse_env_int("MAX_SEQ_LENGTH", "MAX_SEQ_LEN", default=256, minimum=128)),
)
PER_DEVICE_TRAIN_BATCH_SIZE = parse_env_int(
"PER_DEVICE_TRAIN_BATCH_SIZE", "PER_DEVICE_BATCH", default=1
)
PER_DEVICE_EVAL_BATCH_SIZE = parse_env_int(
"PER_DEVICE_EVAL_BATCH_SIZE", "PER_DEVICE_BATCH", default=1
)
GRADIENT_ACCUMULATION_STEPS = parse_env_int(
"GRADIENT_ACCUMULATION_STEPS", "GRAD_ACCUM", default=8
)
LORA_R = parse_env_int("LORA_R", default=16, minimum=1)
LORA_ALPHA = parse_env_int("LORA_ALPHA", default=32, minimum=1)
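# A quick budget check (using the defaults above; override via env as needed):
# effective batch = PER_DEVICE_TRAIN_BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS = 1 * 8 = 8
# sequences of up to MAX_SEQ_LENGTH=256 tokens per optimizer step.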
def _normalize_hf_token(raw: str) -> str:
t = raw.strip()
if len(t) >= 2 and ((t[0] == t[-1] == '"') or (t[0] == t[-1] == "'")):
t = t[1:-1].strip()
return t.replace("\n", "").replace("\r", "")
def _hub_login_from_env() -> None:
raw = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") or ""
token = _normalize_hf_token(raw)
if not token:
print(
"AVISO: HF_TOKEN nao definido. Datasets/modelos privados e push_to_hub podem falhar. "
"No Hugging Face Space: Settings > Repository secrets > HF_TOKEN (token com Write)."
)
return
from huggingface_hub import login
try:
login(token=token, add_to_git_credential=False)
except Exception as exc:
print(
"ERRO: o Hub rejeitou o HF_TOKEN (401 / token invalido).\n"
" 1) Em https://huggingface.co/settings/tokens cria um token novo com permissao Write.\n"
" 2) No Space: Settings > Repository secrets > edita HF_TOKEN (sem aspas, sem espacos, uma linha).\n"
" 3) Factory reboot do Space apos guardar.\n"
f" Detalhe: {exc}",
file=sys.stderr,
)
raise SystemExit(1) from exc
_hub_login_from_env()
# Load the dataset
print(f"Build marker: {SPACE_BUILD_MARKER}")
print(f"Carregando dataset: {DATASET_REPO}")
DATASET_TRAIN_FILE = (os.environ.get("DATASET_TRAIN_FILE") or "train.jsonl").strip()
DATASET_VALIDATION_FILE = (os.environ.get("DATASET_VALIDATION_FILE") or "validation.jsonl").strip()
DATASET_TEST_FILE = (os.environ.get("DATASET_TEST_FILE") or "test.jsonl").strip()
DATASET_FORCE_CANONICAL_SPLITS = (
(os.environ.get("DATASET_FORCE_CANONICAL_SPLITS") or "true").strip().lower()
in ("1", "true", "yes")
)
if DATASET_FORCE_CANONICAL_SPLITS:
try:
data_files = {"train": DATASET_TRAIN_FILE}
if DATASET_VALIDATION_FILE:
data_files["validation"] = DATASET_VALIDATION_FILE
if DATASET_TEST_FILE:
data_files["test"] = DATASET_TEST_FILE
from huggingface_hub import hf_hub_download
def _coerce_input_text(value: object) -> str:
if isinstance(value, str):
return value
if isinstance(value, dict):
user_prompt = value.get("user_prompt") or value.get("prompt") or value.get("input")
data_context = value.get("data_context")
if isinstance(user_prompt, str) and user_prompt.strip():
if data_context in (None, "", {}, []):
return user_prompt.strip()
return (
f"{user_prompt.strip()}\n\nContexto tabular:\n"
f"{json.dumps(data_context, ensure_ascii=False)}"
)
return json.dumps(value, ensure_ascii=False)
if value is None:
return ""
return str(value)
def _coerce_output_text(value: object) -> str:
if isinstance(value, str):
return value
if isinstance(value, dict):
answer = value.get("answer") or value.get("output") or value.get("text")
if isinstance(answer, str) and answer.strip():
return answer.strip()
return json.dumps(value, ensure_ascii=False)
if value is None:
return ""
return str(value)
def _normalize_split_file(src_path: Path, dst_path: Path) -> int:
total = 0
dst_path.parent.mkdir(parents=True, exist_ok=True)
with src_path.open(encoding="utf-8") as src, dst_path.open("w", encoding="utf-8") as dst:
for line_num, line in enumerate(src, 1):
line = line.strip()
if not line:
continue
obj = json.loads(line)
input_text = _coerce_input_text(obj.get("input"))
output_text = _coerce_output_text(obj.get("output"))
if not input_text.strip() or not output_text.strip():
raise ValueError(
f"{src_path.name}:{line_num} sem conteudo textual em 'input'/'output'"
)
dst.write(
json.dumps({"input": input_text, "output": output_text}, ensure_ascii=False)
+ "\n"
)
total += 1
return total
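        # Illustrative example (hypothetical values): a raw line such as
        #   {"input": {"user_prompt": "Resuma as vendas", "data_context": {"linhas": 10}},
        #    "output": {"answer": "Receita cresceu 12%..."}}
        # is rewritten by the coercers above as
        #   {"input": "Resuma as vendas\n\nContexto tabular:\n{\"linhas\": 10}",
        #    "output": "Receita cresceu 12%..."}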
workdir = Path(tempfile.mkdtemp(prefix="eda_ds_"))
local_data_files: dict[str, str] = {}
split_sizes: dict[str, int] = {}
for split_name, file_name in data_files.items():
downloaded = Path(
hf_hub_download(
repo_id=DATASET_REPO,
filename=file_name,
repo_type="dataset",
)
)
normalized = workdir / f"{split_name}.jsonl"
split_sizes[split_name] = _normalize_split_file(downloaded, normalized)
local_data_files[split_name] = str(normalized)
print(f"Tentando carregar splits canônicos: {data_files}")
print(f"Splits normalizados localmente: {split_sizes}")
dataset = load_dataset("json", data_files=local_data_files)
except Exception as exc:
print(
"ERRO: falha ao carregar splits canônicos obrigatórios por data_files.\n"
"O treino foi interrompido para evitar fallback silencioso para dataset incorreto.\n"
f"Detalhe: {exc}",
file=sys.stderr,
)
raise SystemExit(1) from exc
else:
dataset = load_dataset(DATASET_REPO)
if "test" in dataset:
print(
f"INFO: split 'test' presente ({len(dataset['test'])} exemplos) — reservado para benchmark "
"offline; nao entra no Trainer (train/validation apenas)."
)
MIN_TRAIN_SAMPLES = parse_env_int("MIN_TRAIN_SAMPLES", default=100, minimum=1)
ALLOW_TINY_DATASET = (os.environ.get("ALLOW_TINY_DATASET") or "").strip().lower() in (
"1",
"true",
"yes",
)
train_size = len(dataset["train"]) if "train" in dataset else 0
validation_size = len(dataset["validation"]) if "validation" in dataset else 0
test_size = len(dataset["test"]) if "test" in dataset else 0
print(
f"Dataset sizes: train={train_size}, validation={validation_size}, test={test_size}. "
f"MIN_TRAIN_SAMPLES={MIN_TRAIN_SAMPLES}"
)
if train_size < MIN_TRAIN_SAMPLES and not ALLOW_TINY_DATASET:
print(
"ERRO: dataset pequeno demais para treino util.\n"
f" train={train_size} < MIN_TRAIN_SAMPLES={MIN_TRAIN_SAMPLES}\n"
" Ajuste o dataset no Hub (recomendado >= 300 para teste serio; >= 1000 para evolucao real).\n"
" Se for apenas smoke-test, rode com ALLOW_TINY_DATASET=true.",
file=sys.stderr,
)
raise SystemExit(1)
# Environment tweaks to avoid CUDA memory fragmentation
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")
# Load model and tokenizer
print(f"Loading model: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# Some tokenizers ship without a pad token; fall back to EOS only in that case
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Check whether a GPU is available
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
# Load the model without quantization (LoRA alone already cuts trainable memory)
# device_map="auto" on a single GPU can cause offload/meta-device placement; with one GPU we pin everything to {"": 0}
print("Loading model (no quantization; LoRA for efficiency)...")
_dt = torch.float16 if device == "cuda" else torch.float32
_device_map = None
if device == "cuda":
_device_map = "auto" if torch.cuda.device_count() > 1 else {"": 0}
# Newer transformers releases accept `dtype=`; older ones only know `torch_dtype=`.
try:
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
dtype=_dt,
device_map=_device_map,
trust_remote_code=True,
use_cache=False,
low_cpu_mem_usage=True,
)
except TypeError:
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
torch_dtype=_dt,
device_map=_device_map,
trust_remote_code=True,
use_cache=False,
low_cpu_mem_usage=True,
)
if device == "cuda":
torch.cuda.empty_cache()
if device == "cpu":
print("⚠️ Modelo carregado em CPU - treinamento será mais lento")
# Configure LoRA (LORA_R / LORA_ALPHA can be overridden in the Space)
peft_config = LoraConfig(
r=LORA_R,
lora_alpha=LORA_ALPHA,
target_modules=['q_proj', 'v_proj', 'k_proj', 'o_proj'],
lora_dropout=0.1,
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, peft_config)
if device == "cuda":
model.gradient_checkpointing_enable()
model.enable_input_require_grads()
# Prompt formatting
# The system prompt below is intentionally kept in Portuguese: it is part of the
# training payload and fixes the language and answer format of the fine-tuned model.
EDA_SYSTEM_PROMPT = (
"Você é um analista de dados experiente, focado em gerar INSIGHTS e não em descrever processos técnicos.\n\n"
"Sua tarefa é realizar uma Análise Exploratória de Dados (EDA) extraindo padrões, tendências e comportamentos relevantes dos dados.\n\n"
"REGRAS OBRIGATÓRIAS:\n\n"
"1. NÃO descreva etapas técnicas, bibliotecas, código ou ferramentas (pandas, Python, gráficos, etc.).\n"
"2. NÃO explique \"como fazer\" a análise.\n"
"3. Extraia padrões, tendências e comportamentos relevantes dos dados.\n"
"4. Diferencie claramente:\n"
" • Observação (o que é visível nos dados)\n"
" • Interpretação (o que isso pode significar)\n"
" • Insight (qual a implicação prática ou de negócio)\n"
"5. Declare explicitamente o nível de confiança de cada insight (alto / médio / baixo).\n"
"6. Quando não houver dados suficientes, diga claramente \"não é possível afirmar\".\n\n"
"FORMATO OBRIGATÓRIO DA RESPOSTA:\n\n"
"Observações:\n"
"- …\n\n"
"Interpretações:\n"
"- …\n\n"
"Insights:\n"
"- …\n\n"
"Nível de confiança:\n"
"- …\n\n"
"OBJETIVO: Entregar conclusões úteis, claras e acionáveis, como um analista humano experiente faria."
)
def format_prompt(example):
existing = (example.get("text") or "").strip()
if existing:
return {"text": existing}
input_text = example.get("input", "")
output_text = example.get("output", "")
if getattr(tokenizer, "chat_template", None):
messages = [
{"role": "system", "content": EDA_SYSTEM_PROMPT},
{"role": "user", "content": input_text},
{"role": "assistant", "content": output_text},
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=False,
)
return {"text": text}
legacy = (
f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n{input_text}\n<|assistant|>\n{output_text}"
)
return {"text": legacy}
# Apply formatting
train_dataset = dataset["train"].map(
format_prompt,
remove_columns=dataset["train"].column_names,
num_proc=1,
)
eval_dataset = None
# Use the validation split when present; otherwise carve one out of train
if "validation" in dataset:
if len(dataset["validation"]) == 0:
print("AVISO: split 'validation' existe, mas esta vazio. Treino seguira sem avaliacao.")
else:
eval_dataset = dataset["validation"].map(
format_prompt,
remove_columns=dataset["validation"].column_names,
num_proc=1,
)
else:
n_train_raw = len(dataset["train"])
if n_train_raw < 2:
print(
"AVISO: Dataset de validacao ausente e split 'train' com menos de 2 exemplos. "
"Treino seguira sem avaliacao (nao e possivel fazer split 80/20)."
)
else:
print(
"AVISO: Dataset de validacao nao encontrado no Hub. "
"Criando divisao aleatoria 80/20 a partir de 'train' (preferir split 'validation' "
"e/ou split estratificado por familia via scripts/eda_ft_split_stratified.py)."
)
split_dataset = dataset["train"].train_test_split(test_size=0.2, seed=42)
train_dataset = split_dataset["train"].map(
format_prompt,
remove_columns=split_dataset["train"].column_names,
num_proc=1,
)
eval_dataset = split_dataset["test"].map(
format_prompt,
remove_columns=split_dataset["test"].column_names,
num_proc=1,
)
print(
f"✅ Dataset dividido: {len(train_dataset)} exemplos de treino, "
f"{len(eval_dataset)} exemplos de validação"
)
# Tokenize
def tokenize_function(examples):
return tokenizer(
examples["text"],
truncation=True,
max_length=MAX_SEQ_LENGTH,
padding="max_length",
)
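# NOTE: padding="max_length" fixes every example at MAX_SEQ_LENGTH tokens, so batch
# memory is constant but short examples are mostly padding; dynamic padding via the
# collator would be leaner. Kept as-is to preserve the generated script's behavior.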
train_dataset = train_dataset.map(
tokenize_function,
batched=True,
remove_columns=["text"],
num_proc=1,
)
if eval_dataset is not None and len(eval_dataset) > 0:
eval_dataset = eval_dataset.map(
tokenize_function,
batched=True,
remove_columns=["text"],
num_proc=1,
)
else:
eval_dataset = None
# Create the logs directory
logs_dir = Path("./logs")
logs_dir.mkdir(exist_ok=True)
# TensorBoard: same root as the checkpoints; run_name distinguishes runs sharing a directory
_output_dir = os.environ.get("TRAINING_OUTPUT_DIR", "./results")
_logging_dir = os.environ.get("TENSORBOARD_LOGDIR") or os.environ.get("LOGGING_DIR") or _output_dir
_run_name = os.environ.get("TRAINING_RUN_NAME") or f"eda-lora-{datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')}"
os.environ["TENSORBOARD_LOGGING_DIR"] = str(Path(_logging_dir).resolve())
print(
f"Memoria VRAM: max_seq_length={MAX_SEQ_LENGTH}, train_bs={PER_DEVICE_TRAIN_BATCH_SIZE}, "
f"eval_bs={PER_DEVICE_EVAL_BATCH_SIZE}, grad_accum={GRADIENT_ACCUMULATION_STEPS}, "
f"lora_r={LORA_R}, lora_alpha={LORA_ALPHA}, gradient_checkpointing=True"
)
if (os.environ.get("GPU_MAX_MEMORY") or "").strip():
print(
"AVISO: GPU_MAX_MEMORY nao e aplicado por este train.py; "
"ajusta batch/seq/grad_accum ou o hardware no Space."
)
if device == "cuda":
print(
"Se o treino falhar com OOM e o erro do CUDA mostrar varios processos na mesma GPU, "
"faz Factory reboot do Space (reinicios sem libertar VRAM podem acumular processos)."
)
_report_to = build_report_backends()
_has_eval = eval_dataset is not None and len(eval_dataset) > 0
# Training arguments
training_args = TrainingArguments(
output_dir=_output_dir,
run_name=_run_name,
num_train_epochs=300,
per_device_train_batch_size=PER_DEVICE_TRAIN_BATCH_SIZE,
per_device_eval_batch_size=PER_DEVICE_EVAL_BATCH_SIZE,
learning_rate=3e-05,
warmup_steps=100,
logging_steps=10,
logging_strategy="steps",
logging_first_step=True,
save_steps=500,
eval_strategy="steps" if _has_eval else "no",
eval_steps=500 if _has_eval else None,
save_total_limit=3,
load_best_model_at_end=_has_eval,
metric_for_best_model="eval_loss" if _has_eval else None,
greater_is_better=False if _has_eval else None,
fp16=device == "cuda",
gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
gradient_checkpointing=True,
dataloader_pin_memory=False,
dataloader_num_workers=0,
push_to_hub=True,
hub_model_id=OUTPUT_REPO,
hub_strategy="checkpoint",
report_to=_report_to,
)
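# Rough step budget, assuming a hypothetical 1000-example dataset and the 80/20
# fallback split: ceil(800 / (1 * 8)) = 100 optimizer steps per epoch, so the
# eval/save interval of 500 steps fires about once every 5 epochs.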
print(
f"TensorBoard: logs em {Path(_logging_dir).resolve()} (run_name={_run_name}). "
"Apos o treino, analise com: python scripts/launch_tensorboard.py "
f"--logdir {Path(_logging_dir).as_posix()}"
)
if "wandb" in _report_to:
print(
f"Weights & Biases: project={os.environ.get('WANDB_PROJECT')} "
f"entity={os.environ.get('WANDB_ENTITY')} run_name={_run_name}. "
"Credenciais: WANDB_API_KEY (ou WANDB_MODE=offline para so local)."
)
if not _has_eval:
print(
"AVISO: sem dataset de validacao. "
"eval_strategy=no e load_best_model_at_end=False para evitar falha em datasets pequenos."
)
_ds_ver = (os.environ.get("DATASET_VERSION") or "").strip()
if _ds_ver:
_prev_tags = (os.environ.get("WANDB_TAGS") or "").strip()
_tag = f"dataset_version:{_ds_ver}"
os.environ["WANDB_TAGS"] = f"{_prev_tags},{_tag}" if _prev_tags else _tag
class WandbDatasetEnvCallback(TrainerCallback):
"""Envia DATASET_* ao wandb.config quando o run ja existir."""
def on_train_begin(self, args, state, control, **kwargs):
try:
import wandb
if wandb.run is None:
return
extra: dict[str, str] = {}
for key in ("DATASET_VERSION", "DATASET_MANIFEST_HASH", "DATASET_MANIFEST_PATH"):
v = (os.environ.get(key) or "").strip()
if v:
extra[key.lower()] = v
if extra:
wandb.config.update(extra, allow_val_change=True)
except Exception:
return
# Data collator
data_collator = DataCollatorForLanguageModeling(
tokenizer=tokenizer,
mlm=False,
)
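# NOTE: with mlm=False this collator copies input_ids to labels and masks pad tokens
# to -100. For tokenizers where pad_token fell back to eos_token, EOS positions are
# masked as well, which can make the model reluctant to stop generating; worth
# validating on sampled outputs.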
# Trainer
callbacks: list[TrainerCallback] = [WandbDatasetEnvCallback()]
if RealtimeMetricsCallback is not None and CompositePublisher is not None:
publishers = []
if WandbPublisher is not None:
publishers.append(WandbPublisher())
if MlflowPublisher is not None and (os.environ.get("MLFLOW_TRACKING_URI") or "").strip():
publishers.append(MlflowPublisher())
if TensorboardPublisher is not None and (os.environ.get("ENABLE_TENSORBOARD_PUBLISHER") or "").strip().lower() in (
"1",
"true",
"yes",
):
publishers.append(TensorboardPublisher())
if publishers:
composite = CompositePublisher(publishers)
composite.log_params(
{
"base_model": MODEL_NAME,
"dataset_repo": DATASET_REPO,
"output_repo": OUTPUT_REPO,
"lora_r": LORA_R,
"lora_alpha": LORA_ALPHA,
"lora_dropout": 0.1,
"learning_rate": training_args.learning_rate,
"batch_size": PER_DEVICE_TRAIN_BATCH_SIZE,
"gradient_accumulation_steps": GRADIENT_ACCUMULATION_STEPS,
}
)
dataset_version_tag = (os.environ.get("DATASET_VERSION") or "").strip()
git_commit_tag = (os.environ.get("GIT_COMMIT") or "").strip()
tags = {}
if dataset_version_tag:
tags["dataset_version"] = dataset_version_tag
if git_commit_tag:
tags["git_commit"] = git_commit_tag
if tags:
composite.log_tags(tags)
functional_runner = None
benchmark_path = (
os.environ.get("FUNCTIONAL_BENCHMARK_PATH")
or "/app/benchmarks/gold_sample.json"
)
benchmark_cases = parse_env_int("FUNCTIONAL_EVAL_MAX_CASES", default=2, minimum=1)
if FunctionalEvalRunner is not None and Path(benchmark_path).is_file():
def _predict_for_functional_eval(user_prompt: str, data_context: dict) -> str:
if getattr(tokenizer, "chat_template", None):
messages = [
{"role": "system", "content": EDA_SYSTEM_PROMPT},
{"role": "user", "content": f"{user_prompt}\n\nContexto: {json.dumps(data_context, ensure_ascii=False)}"},
]
prompt = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
)
else:
prompt = (
f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n"
f"{user_prompt}\n\nContexto: {json.dumps(data_context, ensure_ascii=False)}\n"
"<|assistant|>\n"
)
inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=MAX_SEQ_LENGTH)
if torch.cuda.is_available():
inputs = {k: v.to(model.device) for k, v in inputs.items()}
output = model.generate(
**inputs,
max_new_tokens=220,
                    do_sample=False,  # greedy decoding; temperature is ignored when sampling is off
)
return tokenizer.decode(output[0], skip_special_tokens=True)
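            # NOTE: the decode above returns prompt + completion; FunctionalEvalRunner is
            # assumed to strip or tolerate the echoed prompt when scoring.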
functional_runner = FunctionalEvalRunner(
benchmark_path=str(benchmark_path),
predict_fn=_predict_for_functional_eval,
max_cases=benchmark_cases,
)
else:
print(
f"AVISO: benchmark funcional não encontrado em {benchmark_path}; "
"métricas eval/hallucination|grounding|limitation|actionability não serão publicadas."
)
callbacks.append(RealtimeMetricsCallback(composite, functional_eval_runner=functional_runner))
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
data_collator=data_collator,
callbacks=callbacks,
)
# Train
print("Starting training...")
try:
train_output = trainer.train()
except Exception as e:
print(f"❌ Erro durante treinamento: {e}")
# Tentar salvar resultados mesmo em caso de erro
train_output = None
# Coletar estado atual se possível
try:
state = trainer.state
final_log_history = state.log_history if hasattr(state, 'log_history') and state.log_history else []
    except Exception:
final_log_history = []
    # Save an error log
error_info = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"error": str(e),
"model_name": MODEL_NAME,
"dataset_repo": DATASET_REPO,
"status": "failed"
}
error_file = logs_dir / f"training_error_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
with open(error_file, 'w', encoding='utf-8') as f:
json.dump(error_info, f, indent=2, ensure_ascii=False)
print(f"✅ Informações de erro salvas em: {error_file}")
raise
# Collect final metrics from the trainer state
state = trainer.state
final_log_history = state.log_history if hasattr(state, 'log_history') and state.log_history else []
# Try several sources for the final training loss
final_train_loss = None
if train_output and hasattr(train_output, 'training_loss'):
final_train_loss = train_output.training_loss
elif final_log_history:
for log_entry in reversed(final_log_history):
if 'loss' in log_entry and 'eval_loss' not in log_entry:
final_train_loss = log_entry.get('loss')
break
# Fetch the most recent validation metrics
last_eval_metrics = {}
if final_log_history:
for log_entry in reversed(final_log_history):
if 'eval_loss' in log_entry:
last_eval_metrics = {k: v for k, v in log_entry.items() if k.startswith('eval_')}
break
# Assemble training run info
training_info = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"model_name": MODEL_NAME,
"dataset_repo": DATASET_REPO,
"output_repo": OUTPUT_REPO,
"training_config": {
"num_train_epochs": training_args.num_train_epochs,
"per_device_train_batch_size": training_args.per_device_train_batch_size,
"per_device_eval_batch_size": training_args.per_device_eval_batch_size,
"gradient_accumulation_steps": training_args.gradient_accumulation_steps,
"learning_rate": training_args.learning_rate,
"warmup_steps": training_args.warmup_steps,
"fp16": training_args.fp16,
"max_seq_length": MAX_SEQ_LENGTH,
"gradient_checkpointing": True,
"tensorboard_logging_dir": _logging_dir,
"tensorboard_run_name": _run_name,
"report_to": _report_to,
"wandb_project": os.environ.get("WANDB_PROJECT") if "wandb" in _report_to else None,
"wandb_entity": os.environ.get("WANDB_ENTITY") if "wandb" in _report_to else None,
},
"dataset_info": {
"train_samples": len(train_dataset),
"eval_samples": len(eval_dataset) if eval_dataset else 0,
"dataset_version": os.environ.get("DATASET_VERSION"),
"dataset_manifest_hash": os.environ.get("DATASET_MANIFEST_HASH"),
"dataset_manifest_path": os.environ.get("DATASET_MANIFEST_PATH"),
},
"training_results": {
"final_train_loss": final_train_loss,
"final_eval_metrics": last_eval_metrics,
"total_steps": len(final_log_history) if final_log_history else 0,
"log_history": final_log_history[-50:],
},
"status": "completed",
}
# Save results as JSON
results_file = logs_dir / f"training_results_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(training_info, f, indent=2, ensure_ascii=False)
print(f"✅ Resultados salvos em: {results_file}")
# Write a human-readable summary
summary_file = logs_dir / f"training_summary_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write("=" * 80 + "\n")
f.write("RESUMO DO TREINAMENTO\n")
f.write("=" * 80 + "\n\n")
f.write(f"Data/Hora: {training_info['timestamp']}\n")
f.write(f"Modelo: {MODEL_NAME}\n")
f.write(f"Dataset: {DATASET_REPO}\n")
f.write(f"Output: {OUTPUT_REPO}\n\n")
f.write("CONFIGURAÇÃO DE TREINAMENTO:\n")
f.write("-" * 80 + "\n")
config = training_info['training_config']
f.write(f"Épocas: {config['num_train_epochs']}\n")
f.write(f"Batch Size (train): {config['per_device_train_batch_size']}\n")
f.write(f"Batch Size (eval): {config['per_device_eval_batch_size']}\n")
f.write(f"Gradient Accumulation Steps: {config['gradient_accumulation_steps']}\n")
f.write(f"Learning Rate: {config['learning_rate']}\n")
f.write(f"Warmup Steps: {config['warmup_steps']}\n")
f.write(f"FP16: {config['fp16']}\n\n")
f.write("DATASET:\n")
f.write("-" * 80 + "\n")
dataset_info = training_info['dataset_info']
f.write(f"Amostras de Treino: {dataset_info['train_samples']}\n")
f.write(f"Amostras de Validação: {dataset_info['eval_samples']}\n\n")
f.write("RESULTADOS:\n")
f.write("-" * 80 + "\n")
results = training_info['training_results']
if results['final_train_loss'] is not None:
f.write(f"Loss Final (Treino): {results['final_train_loss']:.6f}\n")
if results['final_eval_metrics']:
f.write("\nMétricas Finais de Validação:\n")
for key, value in results['final_eval_metrics'].items():
if isinstance(value, float):
f.write(f" {key}: {value:.6f}\n")
else:
f.write(f" {key}: {value}\n")
f.write(f"\nTotal de Steps: {results['total_steps']}\n")
f.write(f"Status: {training_info['status']}\n")
print(f"✅ Resumo salvo em: {summary_file}")
if RealtimeMetricsCallback is not None and CompositePublisher is not None:
try:
for cb in callbacks:
publisher = getattr(cb, "_publisher", None)
if publisher is not None:
publisher.log_artifact(str(results_file), "run-results")
publisher.log_artifact(str(summary_file), "run-summary")
break
except Exception as _artifact_exc:
print(f"AVISO: falha ao registrar artifacts de observabilidade: {_artifact_exc}")
# Final push to the Hub
print(f"Pushing the final model to {OUTPUT_REPO}")
try:
trainer.push_to_hub()
print("✅ Push para Hub concluído!")
except Exception as e:
print(f"⚠️ Aviso: Erro ao fazer push para Hub: {e}")
print("Os checkpoints estão salvos localmente em ./results")
print("✅ Treinamento concluído!")
def analyze_schema(csv_description: str, model_path: str | None = None):
    """
    Inference helper: the fine-tuned model is already constrained to the required answer format.
    Args:
        csv_description: Description of the CSV dataset to analyze
        model_path: Path to a trained model (optional; uses the in-memory model when None)
    Returns:
        The EDA analysis generated by the trained model
    """
    # If model_path is given, load that trained model instead
inference_model = model
inference_tokenizer = tokenizer
if model_path:
print(f"Carregando modelo treinado de: {model_path}")
inference_tokenizer = AutoTokenizer.from_pretrained(model_path)
inference_model = AutoModelForCausalLM.from_pretrained(
model_path,
device_map="auto",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
user_block = f"Analise o seguinte dataset:\n{csv_description}"
if getattr(inference_tokenizer, "chat_template", None):
messages = [
{"role": "system", "content": EDA_SYSTEM_PROMPT},
{"role": "user", "content": user_block},
]
prompt = inference_tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
)
else:
prompt = f"""<|system|>
{EDA_SYSTEM_PROMPT}
<|user|>
{user_block}
<|assistant|>
"""
inputs = inference_tokenizer(prompt, return_tensors="pt").to(inference_model.device)
output = inference_model.generate(
**inputs,
        max_new_tokens=1200,
        do_sample=False  # greedy decoding; temperature is ignored when sampling is off
)
return inference_tokenizer.decode(output[0], skip_special_tokens=True)
# Example usage after training:
# resultado = analyze_schema("Description of your dataset here...")
# print(resultado)
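# A slightly fuller sketch (hypothetical dataset description; assumes transformers
# can load the LoRA adapter repo directly, which requires peft to be installed):
# resultado = analyze_schema(
#     "Tabela de vendas: colunas data, regiao, receita; 12.000 linhas (2023-2024).",
#     model_path="beAnalytic/eda-llm-qwen2.5-lora",
# )
# print(resultado)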