#!/usr/bin/env python3 """ Script de treinamento gerado para HuggingFace Training Platform. Execute este script no HuggingFace Training ou localmente. """ import os import sys def _sanitize_thread_env() -> None: """Evita libgomp: Invalid value quando OMP_NUM_THREADS vem vazio ou invalido (ex.: Space/parent).""" raw = (os.environ.get("OMP_NUM_THREADS") or "").strip() if not raw.isdigit() or int(raw) < 1: os.environ["OMP_NUM_THREADS"] = "1" _sanitize_thread_env() os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") import json import tempfile from datetime import datetime from pathlib import Path import torch from datasets import load_dataset from peft import LoraConfig, get_peft_model from reporting import build_report_backends from training_env import parse_env_int from transformers import ( AutoModelForCausalLM, AutoTokenizer, DataCollatorForLanguageModeling, Trainer, TrainerCallback, TrainingArguments, ) try: from bi_backend.application.training.evaluators import FunctionalEvalRunner from bi_backend.application.training.observability import ( CompositePublisher, MlflowPublisher, RealtimeMetricsCallback, TensorboardPublisher, WandbPublisher, ) except Exception: FunctionalEvalRunner = None CompositePublisher = None MlflowPublisher = None RealtimeMetricsCallback = None TensorboardPublisher = None WandbPublisher = None # Configuração (padrão = linha Qwen 2.5; ver hf_registry.json) MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-1.5B-Instruct") DATASET_REPO = os.getenv("DATASET_REPO", "beAnalytic/eda-training-dataset") OUTPUT_REPO = os.getenv("OUTPUT_REPO", "beAnalytic/eda-llm-qwen2.5-lora") SPACE_BUILD_MARKER = "space-train-build-20260327-01" # T4 ~15GB: padroes conservadores; logits.float() na loss pesa com seq longa # Aliases: MAX_SEQ_LEN, PER_DEVICE_BATCH, GRAD_ACCUM (UI do Space) MAX_SEQ_LENGTH = min( 4096, max(128, parse_env_int("MAX_SEQ_LENGTH", "MAX_SEQ_LEN", default=256, minimum=128)), ) PER_DEVICE_TRAIN_BATCH_SIZE = parse_env_int( "PER_DEVICE_TRAIN_BATCH_SIZE", "PER_DEVICE_BATCH", default=1 ) PER_DEVICE_EVAL_BATCH_SIZE = parse_env_int( "PER_DEVICE_EVAL_BATCH_SIZE", "PER_DEVICE_BATCH", default=1 ) GRADIENT_ACCUMULATION_STEPS = parse_env_int( "GRADIENT_ACCUMULATION_STEPS", "GRAD_ACCUM", default=8 ) LORA_R = parse_env_int("LORA_R", default=16, minimum=1) LORA_ALPHA = parse_env_int("LORA_ALPHA", default=32, minimum=1) def _normalize_hf_token(raw: str) -> str: t = raw.strip() if len(t) >= 2 and ((t[0] == t[-1] == '"') or (t[0] == t[-1] == "'")): t = t[1:-1].strip() return t.replace("\n", "").replace("\r", "") def _hub_login_from_env() -> None: raw = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") or "" token = _normalize_hf_token(raw) if not token: print( "AVISO: HF_TOKEN nao definido. Datasets/modelos privados e push_to_hub podem falhar. " "No Hugging Face Space: Settings > Repository secrets > HF_TOKEN (token com Write)." ) return from huggingface_hub import login try: login(token=token, add_to_git_credential=False) except Exception as exc: print( "ERRO: o Hub rejeitou o HF_TOKEN (401 / token invalido).\n" " 1) Em https://huggingface.co/settings/tokens cria um token novo com permissao Write.\n" " 2) No Space: Settings > Repository secrets > edita HF_TOKEN (sem aspas, sem espacos, uma linha).\n" " 3) Factory reboot do Space apos guardar.\n" f" Detalhe: {exc}", file=sys.stderr, ) raise SystemExit(1) from exc _hub_login_from_env() # Carregar dataset print(f"Build marker: {SPACE_BUILD_MARKER}") print(f"Carregando dataset: {DATASET_REPO}") DATASET_TRAIN_FILE = (os.environ.get("DATASET_TRAIN_FILE") or "train.jsonl").strip() DATASET_VALIDATION_FILE = (os.environ.get("DATASET_VALIDATION_FILE") or "validation.jsonl").strip() DATASET_TEST_FILE = (os.environ.get("DATASET_TEST_FILE") or "test.jsonl").strip() DATASET_FORCE_CANONICAL_SPLITS = ( (os.environ.get("DATASET_FORCE_CANONICAL_SPLITS") or "true").strip().lower() in ("1", "true", "yes") ) if DATASET_FORCE_CANONICAL_SPLITS: try: data_files = {"train": DATASET_TRAIN_FILE} if DATASET_VALIDATION_FILE: data_files["validation"] = DATASET_VALIDATION_FILE if DATASET_TEST_FILE: data_files["test"] = DATASET_TEST_FILE from huggingface_hub import hf_hub_download def _coerce_input_text(value: object) -> str: if isinstance(value, str): return value if isinstance(value, dict): user_prompt = value.get("user_prompt") or value.get("prompt") or value.get("input") data_context = value.get("data_context") if isinstance(user_prompt, str) and user_prompt.strip(): if data_context in (None, "", {}, []): return user_prompt.strip() return ( f"{user_prompt.strip()}\n\nContexto tabular:\n" f"{json.dumps(data_context, ensure_ascii=False)}" ) return json.dumps(value, ensure_ascii=False) if value is None: return "" return str(value) def _coerce_output_text(value: object) -> str: if isinstance(value, str): return value if isinstance(value, dict): answer = value.get("answer") or value.get("output") or value.get("text") if isinstance(answer, str) and answer.strip(): return answer.strip() return json.dumps(value, ensure_ascii=False) if value is None: return "" return str(value) def _normalize_split_file(src_path: Path, dst_path: Path) -> int: total = 0 dst_path.parent.mkdir(parents=True, exist_ok=True) with src_path.open(encoding="utf-8") as src, dst_path.open("w", encoding="utf-8") as dst: for line_num, line in enumerate(src, 1): line = line.strip() if not line: continue obj = json.loads(line) input_text = _coerce_input_text(obj.get("input")) output_text = _coerce_output_text(obj.get("output")) if not input_text.strip() or not output_text.strip(): raise ValueError( f"{src_path.name}:{line_num} sem conteudo textual em 'input'/'output'" ) dst.write( json.dumps({"input": input_text, "output": output_text}, ensure_ascii=False) + "\n" ) total += 1 return total workdir = Path(tempfile.mkdtemp(prefix="eda_ds_")) local_data_files: dict[str, str] = {} split_sizes: dict[str, int] = {} for split_name, file_name in data_files.items(): downloaded = Path( hf_hub_download( repo_id=DATASET_REPO, filename=file_name, repo_type="dataset", ) ) normalized = workdir / f"{split_name}.jsonl" split_sizes[split_name] = _normalize_split_file(downloaded, normalized) local_data_files[split_name] = str(normalized) print(f"Tentando carregar splits canônicos: {data_files}") print(f"Splits normalizados localmente: {split_sizes}") dataset = load_dataset("json", data_files=local_data_files) except Exception as exc: print( "ERRO: falha ao carregar splits canônicos obrigatórios por data_files.\n" "O treino foi interrompido para evitar fallback silencioso para dataset incorreto.\n" f"Detalhe: {exc}", file=sys.stderr, ) raise SystemExit(1) from exc else: dataset = load_dataset(DATASET_REPO) if "test" in dataset: print( f"INFO: split 'test' presente ({len(dataset['test'])} exemplos) — reservado para benchmark " "offline; nao entra no Trainer (train/validation apenas)." ) MIN_TRAIN_SAMPLES = parse_env_int("MIN_TRAIN_SAMPLES", default=100, minimum=1) ALLOW_TINY_DATASET = (os.environ.get("ALLOW_TINY_DATASET") or "").strip().lower() in ( "1", "true", "yes", ) train_size = len(dataset["train"]) if "train" in dataset else 0 validation_size = len(dataset["validation"]) if "validation" in dataset else 0 test_size = len(dataset["test"]) if "test" in dataset else 0 print( f"Dataset sizes: train={train_size}, validation={validation_size}, test={test_size}. " f"MIN_TRAIN_SAMPLES={MIN_TRAIN_SAMPLES}" ) if train_size < MIN_TRAIN_SAMPLES and not ALLOW_TINY_DATASET: print( "ERRO: dataset pequeno demais para treino util.\n" f" train={train_size} < MIN_TRAIN_SAMPLES={MIN_TRAIN_SAMPLES}\n" " Ajuste o dataset no Hub (recomendado >= 300 para teste serio; >= 1000 para evolucao real).\n" " Se for apenas smoke-test, rode com ALLOW_TINY_DATASET=true.", file=sys.stderr, ) raise SystemExit(1) # Configurar variáveis de ambiente para evitar problemas de memória os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True") # Carregar modelo e tokenizer print(f"Carregando modelo: {MODEL_NAME}") tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) tokenizer.pad_token = tokenizer.eos_token # Verificar se há GPU disponível device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Usando dispositivo: {device}") # Carregar modelo sem quantização (LoRA é suficiente para reduzir memória) # device_map="auto" numa so GPU pode fazer offload/meta device; com uma GPU usamos {"": 0} print("Carregando modelo (sem quantização, usando LoRA para eficiência)...") _dt = torch.float16 if device == "cuda" else torch.float32 _device_map = None if device == "cuda": _device_map = "auto" if torch.cuda.device_count() > 1 else {"": 0} try: model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, dtype=_dt, device_map=_device_map, trust_remote_code=True, use_cache=False, low_cpu_mem_usage=True, ) except TypeError: model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, torch_dtype=_dt, device_map=_device_map, trust_remote_code=True, use_cache=False, low_cpu_mem_usage=True, ) if device == "cuda": torch.cuda.empty_cache() if device == "cpu": print("⚠️ Modelo carregado em CPU - treinamento será mais lento") # Configurar LoRA (LORA_R / LORA_ALPHA opcionais no Space) peft_config = LoraConfig( r=LORA_R, lora_alpha=LORA_ALPHA, target_modules=['q_proj', 'v_proj', 'k_proj', 'o_proj'], lora_dropout=0.1, bias="none", task_type="CAUSAL_LM", ) model = get_peft_model(model, peft_config) if device == "cuda": model.gradient_checkpointing_enable() model.enable_input_require_grads() # Formatar prompts EDA_SYSTEM_PROMPT = ( "Você é um analista de dados experiente, focado em gerar INSIGHTS e não em descrever processos técnicos.\n\n" "Sua tarefa é realizar uma Análise Exploratória de Dados (EDA) extraindo padrões, tendências e comportamentos relevantes dos dados.\n\n" "REGRAS OBRIGATÓRIAS:\n\n" "1. NÃO descreva etapas técnicas, bibliotecas, código ou ferramentas (pandas, Python, gráficos, etc.).\n" "2. NÃO explique \"como fazer\" a análise.\n" "3. Extraia padrões, tendências e comportamentos relevantes dos dados.\n" "4. Diferencie claramente:\n" " • Observação (o que é visível nos dados)\n" " • Interpretação (o que isso pode significar)\n" " • Insight (qual a implicação prática ou de negócio)\n" "5. Declare explicitamente o nível de confiança de cada insight (alto / médio / baixo).\n" "6. Quando não houver dados suficientes, diga claramente \"não é possível afirmar\".\n\n" "FORMATO OBRIGATÓRIO DA RESPOSTA:\n\n" "Observações:\n" "- …\n\n" "Interpretações:\n" "- …\n\n" "Insights:\n" "- …\n\n" "Nível de confiança:\n" "- …\n\n" "OBJETIVO: Entregar conclusões úteis, claras e acionáveis, como um analista humano experiente faria." ) def format_prompt(example): existing = (example.get("text") or "").strip() if existing: return {"text": existing} input_text = example.get("input", "") output_text = example.get("output", "") if getattr(tokenizer, "chat_template", None): messages = [ {"role": "system", "content": EDA_SYSTEM_PROMPT}, {"role": "user", "content": input_text}, {"role": "assistant", "content": output_text}, ] text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=False, ) return {"text": text} legacy = ( f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n{input_text}\n<|assistant|>\n{output_text}" ) return {"text": legacy} # Aplicar formatação train_dataset = dataset["train"].map( format_prompt, remove_columns=dataset["train"].column_names, num_proc=1, ) eval_dataset = None # Verificar se existe dataset de validação, caso contrário criar a partir do train if "validation" in dataset: if len(dataset["validation"]) == 0: print("AVISO: split 'validation' existe, mas esta vazio. Treino seguira sem avaliacao.") else: eval_dataset = dataset["validation"].map( format_prompt, remove_columns=dataset["validation"].column_names, num_proc=1, ) else: n_train_raw = len(dataset["train"]) if n_train_raw < 2: print( "AVISO: Dataset de validacao ausente e split 'train' com menos de 2 exemplos. " "Treino seguira sem avaliacao (nao e possivel fazer split 80/20)." ) else: print( "AVISO: Dataset de validacao nao encontrado no Hub. " "Criando divisao aleatoria 80/20 a partir de 'train' (preferir split 'validation' " "e/ou split estratificado por familia via scripts/eda_ft_split_stratified.py)." ) split_dataset = dataset["train"].train_test_split(test_size=0.2, seed=42) train_dataset = split_dataset["train"].map( format_prompt, remove_columns=split_dataset["train"].column_names, num_proc=1, ) eval_dataset = split_dataset["test"].map( format_prompt, remove_columns=split_dataset["test"].column_names, num_proc=1, ) print( f"✅ Dataset dividido: {len(train_dataset)} exemplos de treino, " f"{len(eval_dataset)} exemplos de validação" ) # Tokenizar def tokenize_function(examples): return tokenizer( examples["text"], truncation=True, max_length=MAX_SEQ_LENGTH, padding="max_length", ) train_dataset = train_dataset.map( tokenize_function, batched=True, remove_columns=["text"], num_proc=1, ) if eval_dataset is not None and len(eval_dataset) > 0: eval_dataset = eval_dataset.map( tokenize_function, batched=True, remove_columns=["text"], num_proc=1, ) else: eval_dataset = None # Criar diretório de logs logs_dir = Path("./logs") logs_dir.mkdir(exist_ok=True) # TensorBoard: mesma raiz que checkpoints; run_name distingue execucoes no mesmo diretorio _output_dir = os.environ.get("TRAINING_OUTPUT_DIR", "./results") _logging_dir = os.environ.get("TENSORBOARD_LOGDIR") or os.environ.get("LOGGING_DIR") or _output_dir _run_name = os.environ.get("TRAINING_RUN_NAME") or f"eda-lora-{datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')}" os.environ["TENSORBOARD_LOGGING_DIR"] = str(Path(_logging_dir).resolve()) print( f"Memoria VRAM: max_seq_length={MAX_SEQ_LENGTH}, train_bs={PER_DEVICE_TRAIN_BATCH_SIZE}, " f"eval_bs={PER_DEVICE_EVAL_BATCH_SIZE}, grad_accum={GRADIENT_ACCUMULATION_STEPS}, " f"lora_r={LORA_R}, lora_alpha={LORA_ALPHA}, gradient_checkpointing=True" ) if (os.environ.get("GPU_MAX_MEMORY") or "").strip(): print( "AVISO: GPU_MAX_MEMORY nao e aplicado por este train.py; " "ajusta batch/seq/grad_accum ou o hardware no Space." ) if device == "cuda": print( "Se o treino falhar com OOM e o erro do CUDA mostrar varios processos na mesma GPU, " "faz Factory reboot do Space (reinicios sem libertar VRAM podem acumular processos)." ) _report_to = build_report_backends() _has_eval = eval_dataset is not None and len(eval_dataset) > 0 # Configurar argumentos de treinamento training_args = TrainingArguments( output_dir=_output_dir, run_name=_run_name, num_train_epochs=300, per_device_train_batch_size=PER_DEVICE_TRAIN_BATCH_SIZE, per_device_eval_batch_size=PER_DEVICE_EVAL_BATCH_SIZE, learning_rate=3e-05, warmup_steps=100, logging_steps=10, logging_strategy="steps", logging_first_step=True, save_steps=500, eval_strategy="steps" if _has_eval else "no", eval_steps=500 if _has_eval else None, save_total_limit=3, load_best_model_at_end=_has_eval, metric_for_best_model="eval_loss" if _has_eval else None, greater_is_better=False if _has_eval else None, fp16=device == "cuda", gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS, gradient_checkpointing=True, dataloader_pin_memory=False, dataloader_num_workers=0, push_to_hub=True, hub_model_id=OUTPUT_REPO, hub_strategy="checkpoint", report_to=_report_to, ) print( f"TensorBoard: logs em {Path(_logging_dir).resolve()} (run_name={_run_name}). " "Apos o treino, analise com: python scripts/launch_tensorboard.py " f"--logdir {Path(_logging_dir).as_posix()}" ) if "wandb" in _report_to: print( f"Weights & Biases: project={os.environ.get('WANDB_PROJECT')} " f"entity={os.environ.get('WANDB_ENTITY')} run_name={_run_name}. " "Credenciais: WANDB_API_KEY (ou WANDB_MODE=offline para so local)." ) if not _has_eval: print( "AVISO: sem dataset de validacao. " "eval_strategy=no e load_best_model_at_end=False para evitar falha em datasets pequenos." ) _ds_ver = (os.environ.get("DATASET_VERSION") or "").strip() if _ds_ver: _prev_tags = (os.environ.get("WANDB_TAGS") or "").strip() _tag = f"dataset_version:{_ds_ver}" os.environ["WANDB_TAGS"] = f"{_prev_tags},{_tag}" if _prev_tags else _tag class WandbDatasetEnvCallback(TrainerCallback): """Envia DATASET_* ao wandb.config quando o run ja existir.""" def on_train_begin(self, args, state, control, **kwargs): try: import wandb if wandb.run is None: return extra: dict[str, str] = {} for key in ("DATASET_VERSION", "DATASET_MANIFEST_HASH", "DATASET_MANIFEST_PATH"): v = (os.environ.get(key) or "").strip() if v: extra[key.lower()] = v if extra: wandb.config.update(extra, allow_val_change=True) except Exception: return # Data collator data_collator = DataCollatorForLanguageModeling( tokenizer=tokenizer, mlm=False, ) # Trainer callbacks: list[TrainerCallback] = [WandbDatasetEnvCallback()] if RealtimeMetricsCallback is not None and CompositePublisher is not None: publishers = [] if WandbPublisher is not None: publishers.append(WandbPublisher()) if MlflowPublisher is not None and (os.environ.get("MLFLOW_TRACKING_URI") or "").strip(): publishers.append(MlflowPublisher()) if TensorboardPublisher is not None and (os.environ.get("ENABLE_TENSORBOARD_PUBLISHER") or "").strip().lower() in ( "1", "true", "yes", ): publishers.append(TensorboardPublisher()) if publishers: composite = CompositePublisher(publishers) composite.log_params( { "base_model": MODEL_NAME, "dataset_repo": DATASET_REPO, "output_repo": OUTPUT_REPO, "lora_r": LORA_R, "lora_alpha": LORA_ALPHA, "lora_dropout": 0.1, "learning_rate": training_args.learning_rate, "batch_size": PER_DEVICE_TRAIN_BATCH_SIZE, "gradient_accumulation_steps": GRADIENT_ACCUMULATION_STEPS, } ) dataset_version_tag = (os.environ.get("DATASET_VERSION") or "").strip() git_commit_tag = (os.environ.get("GIT_COMMIT") or "").strip() tags = {} if dataset_version_tag: tags["dataset_version"] = dataset_version_tag if git_commit_tag: tags["git_commit"] = git_commit_tag if tags: composite.log_tags(tags) functional_runner = None benchmark_path = ( os.environ.get("FUNCTIONAL_BENCHMARK_PATH") or "/app/benchmarks/gold_sample.json" ) benchmark_cases = parse_env_int("FUNCTIONAL_EVAL_MAX_CASES", default=2, minimum=1) if FunctionalEvalRunner is not None and Path(benchmark_path).is_file(): def _predict_for_functional_eval(user_prompt: str, data_context: dict) -> str: if getattr(tokenizer, "chat_template", None): messages = [ {"role": "system", "content": EDA_SYSTEM_PROMPT}, {"role": "user", "content": f"{user_prompt}\n\nContexto: {json.dumps(data_context, ensure_ascii=False)}"}, ] prompt = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, ) else: prompt = ( f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n" f"{user_prompt}\n\nContexto: {json.dumps(data_context, ensure_ascii=False)}\n" "<|assistant|>\n" ) inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=MAX_SEQ_LENGTH) if torch.cuda.is_available(): inputs = {k: v.to(model.device) for k, v in inputs.items()} output = model.generate( **inputs, max_new_tokens=220, do_sample=False, temperature=0.0, ) return tokenizer.decode(output[0], skip_special_tokens=True) functional_runner = FunctionalEvalRunner( benchmark_path=str(benchmark_path), predict_fn=_predict_for_functional_eval, max_cases=benchmark_cases, ) else: print( f"AVISO: benchmark funcional não encontrado em {benchmark_path}; " "métricas eval/hallucination|grounding|limitation|actionability não serão publicadas." ) callbacks.append(RealtimeMetricsCallback(composite, functional_eval_runner=functional_runner)) trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=eval_dataset, data_collator=data_collator, callbacks=callbacks, ) # Treinar print("Iniciando treinamento...") try: train_output = trainer.train() except Exception as e: print(f"❌ Erro durante treinamento: {e}") # Tentar salvar resultados mesmo em caso de erro train_output = None # Coletar estado atual se possível try: state = trainer.state final_log_history = state.log_history if hasattr(state, 'log_history') and state.log_history else [] except: final_log_history = [] # Salvar log de erro error_info = { "timestamp": datetime.utcnow().isoformat() + "Z", "error": str(e), "model_name": MODEL_NAME, "dataset_repo": DATASET_REPO, "status": "failed" } error_file = logs_dir / f"training_error_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json" with open(error_file, 'w', encoding='utf-8') as f: json.dump(error_info, f, indent=2, ensure_ascii=False) print(f"✅ Informações de erro salvas em: {error_file}") raise # Coletar métricas finais do estado do trainer state = trainer.state final_log_history = state.log_history if hasattr(state, 'log_history') and state.log_history else [] # Tentar obter loss final de diferentes fontes final_train_loss = None if train_output and hasattr(train_output, 'training_loss'): final_train_loss = train_output.training_loss elif final_log_history: for log_entry in reversed(final_log_history): if 'loss' in log_entry and 'eval_loss' not in log_entry: final_train_loss = log_entry.get('loss') break # Buscar últimas métricas de validação last_eval_metrics = {} if final_log_history: for log_entry in reversed(final_log_history): if 'eval_loss' in log_entry: last_eval_metrics = {k: v for k, v in log_entry.items() if k.startswith('eval_')} break # Coletar informações do treinamento training_info = { "timestamp": datetime.utcnow().isoformat() + "Z", "model_name": MODEL_NAME, "dataset_repo": DATASET_REPO, "output_repo": OUTPUT_REPO, "training_config": { "num_train_epochs": training_args.num_train_epochs, "per_device_train_batch_size": training_args.per_device_train_batch_size, "per_device_eval_batch_size": training_args.per_device_eval_batch_size, "gradient_accumulation_steps": training_args.gradient_accumulation_steps, "learning_rate": training_args.learning_rate, "warmup_steps": training_args.warmup_steps, "fp16": training_args.fp16, "max_seq_length": MAX_SEQ_LENGTH, "gradient_checkpointing": True, "tensorboard_logging_dir": _logging_dir, "tensorboard_run_name": _run_name, "report_to": _report_to, "wandb_project": os.environ.get("WANDB_PROJECT") if "wandb" in _report_to else None, "wandb_entity": os.environ.get("WANDB_ENTITY") if "wandb" in _report_to else None, }, "dataset_info": { "train_samples": len(train_dataset), "eval_samples": len(eval_dataset) if eval_dataset else 0, "dataset_version": os.environ.get("DATASET_VERSION"), "dataset_manifest_hash": os.environ.get("DATASET_MANIFEST_HASH"), "dataset_manifest_path": os.environ.get("DATASET_MANIFEST_PATH"), }, "training_results": { "final_train_loss": final_train_loss, "final_eval_metrics": last_eval_metrics, "total_steps": len(final_log_history) if final_log_history else 0, "log_history": final_log_history[-50:], }, "status": "completed", } # Salvar resultados em JSON results_file = logs_dir / f"training_results_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json" with open(results_file, 'w', encoding='utf-8') as f: json.dump(training_info, f, indent=2, ensure_ascii=False) print(f"✅ Resultados salvos em: {results_file}") # Criar resumo em texto legível summary_file = logs_dir / f"training_summary_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.txt" with open(summary_file, 'w', encoding='utf-8') as f: f.write("=" * 80 + "\n") f.write("RESUMO DO TREINAMENTO\n") f.write("=" * 80 + "\n\n") f.write(f"Data/Hora: {training_info['timestamp']}\n") f.write(f"Modelo: {MODEL_NAME}\n") f.write(f"Dataset: {DATASET_REPO}\n") f.write(f"Output: {OUTPUT_REPO}\n\n") f.write("CONFIGURAÇÃO DE TREINAMENTO:\n") f.write("-" * 80 + "\n") config = training_info['training_config'] f.write(f"Épocas: {config['num_train_epochs']}\n") f.write(f"Batch Size (train): {config['per_device_train_batch_size']}\n") f.write(f"Batch Size (eval): {config['per_device_eval_batch_size']}\n") f.write(f"Gradient Accumulation Steps: {config['gradient_accumulation_steps']}\n") f.write(f"Learning Rate: {config['learning_rate']}\n") f.write(f"Warmup Steps: {config['warmup_steps']}\n") f.write(f"FP16: {config['fp16']}\n\n") f.write("DATASET:\n") f.write("-" * 80 + "\n") dataset_info = training_info['dataset_info'] f.write(f"Amostras de Treino: {dataset_info['train_samples']}\n") f.write(f"Amostras de Validação: {dataset_info['eval_samples']}\n\n") f.write("RESULTADOS:\n") f.write("-" * 80 + "\n") results = training_info['training_results'] if results['final_train_loss'] is not None: f.write(f"Loss Final (Treino): {results['final_train_loss']:.6f}\n") if results['final_eval_metrics']: f.write("\nMétricas Finais de Validação:\n") for key, value in results['final_eval_metrics'].items(): if isinstance(value, float): f.write(f" {key}: {value:.6f}\n") else: f.write(f" {key}: {value}\n") f.write(f"\nTotal de Steps: {results['total_steps']}\n") f.write(f"Status: {training_info['status']}\n") print(f"✅ Resumo salvo em: {summary_file}") if RealtimeMetricsCallback is not None and CompositePublisher is not None: try: for cb in callbacks: publisher = getattr(cb, "_publisher", None) if publisher is not None: publisher.log_artifact(str(results_file), "run-results") publisher.log_artifact(str(summary_file), "run-summary") break except Exception as _artifact_exc: print(f"AVISO: falha ao registrar artifacts de observabilidade: {_artifact_exc}") # Fazer push final print(f"Fazendo push do modelo final para {OUTPUT_REPO}") try: trainer.push_to_hub() print("✅ Push para Hub concluído!") except Exception as e: print(f"⚠️ Aviso: Erro ao fazer push para Hub: {e}") print("Os checkpoints estão salvos localmente em ./results") print("✅ Treinamento concluído!") def analyze_schema(csv_description: str, model_path: str = None): """ Função de inferência - modelo já 'obrigado' a pensar certo. Args: csv_description: Descrição do dataset CSV para análise model_path: Caminho para o modelo treinado (opcional, usa modelo atual se None) Returns: Análise EDA gerada pelo modelo treinado """ # Se model_path for fornecido, carregar modelo treinado inference_model = model inference_tokenizer = tokenizer if model_path: print(f"Carregando modelo treinado de: {model_path}") inference_tokenizer = AutoTokenizer.from_pretrained(model_path) inference_model = AutoModelForCausalLM.from_pretrained( model_path, device_map="auto", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 ) user_block = f"Analise o seguinte dataset:\n{csv_description}" if getattr(inference_tokenizer, "chat_template", None): messages = [ {"role": "system", "content": EDA_SYSTEM_PROMPT}, {"role": "user", "content": user_block}, ] prompt = inference_tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, ) else: prompt = f"""<|system|> {EDA_SYSTEM_PROMPT} <|user|> {user_block} <|assistant|> """ inputs = inference_tokenizer(prompt, return_tensors="pt").to(inference_model.device) output = inference_model.generate( **inputs, max_new_tokens=1200, temperature=0.2, do_sample=False ) return inference_tokenizer.decode(output[0], skip_special_tokens=True) # Exemplo de uso após o treinamento: # resultado = analyze_schema("Descrição do seu dataset aqui...") # print(resultado)