| |
| """ |
| Script de treinamento gerado para HuggingFace Training Platform. |
| Execute este script no HuggingFace Training ou localmente. |
| """ |
|
|
| import os |
| import sys |
|
|
|
|
| def _sanitize_thread_env() -> None: |
| """Evita libgomp: Invalid value quando OMP_NUM_THREADS vem vazio ou invalido (ex.: Space/parent).""" |
| raw = (os.environ.get("OMP_NUM_THREADS") or "").strip() |
| if not raw.isdigit() or int(raw) < 1: |
| os.environ["OMP_NUM_THREADS"] = "1" |
|
|
|
|
| _sanitize_thread_env() |
| os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") |
|
|
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path
|
|
| import torch |
| from datasets import load_dataset |
| from peft import LoraConfig, get_peft_model |
| from reporting import build_report_backends |
| from training_env import parse_env_int |
| from transformers import ( |
| AutoModelForCausalLM, |
| AutoTokenizer, |
| DataCollatorForLanguageModeling, |
| Trainer, |
| TrainerCallback, |
| TrainingArguments, |
| ) |
|
|
| try: |
| from bi_backend.application.training.evaluators import FunctionalEvalRunner |
| from bi_backend.application.training.observability import ( |
| CompositePublisher, |
| MlflowPublisher, |
| RealtimeMetricsCallback, |
| TensorboardPublisher, |
| WandbPublisher, |
| ) |
| except Exception: |
| FunctionalEvalRunner = None |
| CompositePublisher = None |
| MlflowPublisher = None |
| RealtimeMetricsCallback = None |
| TensorboardPublisher = None |
| WandbPublisher = None |
|
|
| |
# Hub identifiers; each one can be overridden through the environment.
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-1.5B-Instruct")
DATASET_REPO = os.environ.get("DATASET_REPO", "beAnalytic/eda-training-dataset")
OUTPUT_REPO = os.environ.get("OUTPUT_REPO", "beAnalytic/eda-llm-qwen2.5-lora")
# Printed at start-up so the logs show which build of this script ran.
SPACE_BUILD_MARKER = "space-train-build-20260327-01"

# Sequence length: read from MAX_SEQ_LENGTH / MAX_SEQ_LEN, then clamped to
# the inclusive range [128, 4096].
# NOTE(review): presumably parse_env_int tries the names in order - confirm.
_requested_seq_length = parse_env_int(
    "MAX_SEQ_LENGTH", "MAX_SEQ_LEN", default=256, minimum=128
)
MAX_SEQ_LENGTH = min(4096, max(128, _requested_seq_length))

# Batch sizes share the PER_DEVICE_BATCH alias as a secondary variable name.
PER_DEVICE_TRAIN_BATCH_SIZE = parse_env_int(
    "PER_DEVICE_TRAIN_BATCH_SIZE", "PER_DEVICE_BATCH", default=1
)
PER_DEVICE_EVAL_BATCH_SIZE = parse_env_int(
    "PER_DEVICE_EVAL_BATCH_SIZE", "PER_DEVICE_BATCH", default=1
)
GRADIENT_ACCUMULATION_STEPS = parse_env_int(
    "GRADIENT_ACCUMULATION_STEPS", "GRAD_ACCUM", default=8
)

# LoRA hyper-parameters (rank and scaling numerator).
LORA_R = parse_env_int("LORA_R", default=16, minimum=1)
LORA_ALPHA = parse_env_int("LORA_ALPHA", default=32, minimum=1)
|
|
|
|
| def _normalize_hf_token(raw: str) -> str: |
| t = raw.strip() |
| if len(t) >= 2 and ((t[0] == t[-1] == '"') or (t[0] == t[-1] == "'")): |
| t = t[1:-1].strip() |
| return t.replace("\n", "").replace("\r", "") |
|
|
|
|
def _hub_login_from_env() -> None:
    """Log in to the Hugging Face Hub with a token taken from the environment.

    ``HF_TOKEN`` is preferred, ``HUGGING_FACE_HUB_TOKEN`` is the fallback. The
    value is normalized before use. Without a token only a warning is printed
    and the function returns; when ``login`` raises, guidance is written to
    stderr and the process exits.

    Raises:
        SystemExit: With code 1 when ``huggingface_hub.login`` raises.
    """
    env_value = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") or ""
    token = _normalize_hf_token(env_value)

    if token:
        # Imported lazily: only needed when there is a token to submit.
        from huggingface_hub import login

        try:
            login(token=token, add_to_git_credential=False)
        except Exception as exc:
            guidance = (
                "ERRO: o Hub rejeitou o HF_TOKEN (401 / token invalido).\n"
                " 1) Em https://huggingface.co/settings/tokens cria um token novo com permissao Write.\n"
                " 2) No Space: Settings > Repository secrets > edita HF_TOKEN (sem aspas, sem espacos, uma linha).\n"
                " 3) Factory reboot do Space apos guardar.\n"
                f" Detalhe: {exc}"
            )
            print(guidance, file=sys.stderr)
            raise SystemExit(1) from exc
        return

    print(
        "AVISO: HF_TOKEN nao definido. Datasets/modelos privados e push_to_hub podem falhar. "
        "No Hugging Face Space: Settings > Repository secrets > HF_TOKEN (token com Write)."
    )
|
|
|
|
_hub_login_from_env()


def _env_text(name: str, default: str) -> str:
    """Return the trimmed value of ``name``, using ``default`` when unset or empty."""
    return (os.environ.get(name) or default).strip()


print(f"Build marker: {SPACE_BUILD_MARKER}")
print(f"Carregando dataset: {DATASET_REPO}")

# File names of the canonical splits inside the dataset repository.
DATASET_TRAIN_FILE = _env_text("DATASET_TRAIN_FILE", "train.jsonl")
DATASET_VALIDATION_FILE = _env_text("DATASET_VALIDATION_FILE", "validation.jsonl")
DATASET_TEST_FILE = _env_text("DATASET_TEST_FILE", "test.jsonl")

# Enabled by default; only "1", "true" or "yes" (any case) keep it on.
DATASET_FORCE_CANONICAL_SPLITS = _env_text(
    "DATASET_FORCE_CANONICAL_SPLITS", "true"
).lower() in ("1", "true", "yes")
|
|
| if DATASET_FORCE_CANONICAL_SPLITS: |
| try: |
| data_files = {"train": DATASET_TRAIN_FILE} |
| if DATASET_VALIDATION_FILE: |
| data_files["validation"] = DATASET_VALIDATION_FILE |
| if DATASET_TEST_FILE: |
| data_files["test"] = DATASET_TEST_FILE |
| from huggingface_hub import hf_hub_download |
|
|
| def _coerce_input_text(value: object) -> str: |
| if isinstance(value, str): |
| return value |
| if isinstance(value, dict): |
| user_prompt = value.get("user_prompt") or value.get("prompt") or value.get("input") |
| data_context = value.get("data_context") |
| if isinstance(user_prompt, str) and user_prompt.strip(): |
| if data_context in (None, "", {}, []): |
| return user_prompt.strip() |
| return ( |
| f"{user_prompt.strip()}\n\nContexto tabular:\n" |
| f"{json.dumps(data_context, ensure_ascii=False)}" |
| ) |
| return json.dumps(value, ensure_ascii=False) |
| if value is None: |
| return "" |
| return str(value) |
|
|
| def _coerce_output_text(value: object) -> str: |
| if isinstance(value, str): |
| return value |
| if isinstance(value, dict): |
| answer = value.get("answer") or value.get("output") or value.get("text") |
| if isinstance(answer, str) and answer.strip(): |
| return answer.strip() |
| return json.dumps(value, ensure_ascii=False) |
| if value is None: |
| return "" |
| return str(value) |
|
|
| def _normalize_split_file(src_path: Path, dst_path: Path) -> int: |
| total = 0 |
| dst_path.parent.mkdir(parents=True, exist_ok=True) |
| with src_path.open(encoding="utf-8") as src, dst_path.open("w", encoding="utf-8") as dst: |
| for line_num, line in enumerate(src, 1): |
| line = line.strip() |
| if not line: |
| continue |
| obj = json.loads(line) |
| input_text = _coerce_input_text(obj.get("input")) |
| output_text = _coerce_output_text(obj.get("output")) |
| if not input_text.strip() or not output_text.strip(): |
| raise ValueError( |
| f"{src_path.name}:{line_num} sem conteudo textual em 'input'/'output'" |
| ) |
| dst.write( |
| json.dumps({"input": input_text, "output": output_text}, ensure_ascii=False) |
| + "\n" |
| ) |
| total += 1 |
| return total |
|
|
| workdir = Path(tempfile.mkdtemp(prefix="eda_ds_")) |
| local_data_files: dict[str, str] = {} |
| split_sizes: dict[str, int] = {} |
| for split_name, file_name in data_files.items(): |
| downloaded = Path( |
| hf_hub_download( |
| repo_id=DATASET_REPO, |
| filename=file_name, |
| repo_type="dataset", |
| ) |
| ) |
| normalized = workdir / f"{split_name}.jsonl" |
| split_sizes[split_name] = _normalize_split_file(downloaded, normalized) |
| local_data_files[split_name] = str(normalized) |
|
|
| print(f"Tentando carregar splits canônicos: {data_files}") |
| print(f"Splits normalizados localmente: {split_sizes}") |
| dataset = load_dataset("json", data_files=local_data_files) |
| except Exception as exc: |
| print( |
| "ERRO: falha ao carregar splits canônicos obrigatórios por data_files.\n" |
| "O treino foi interrompido para evitar fallback silencioso para dataset incorreto.\n" |
| f"Detalhe: {exc}", |
| file=sys.stderr, |
| ) |
| raise SystemExit(1) from exc |
| else: |
| dataset = load_dataset(DATASET_REPO) |
| if "test" in dataset: |
| print( |
| f"INFO: split 'test' presente ({len(dataset['test'])} exemplos) — reservado para benchmark " |
| "offline; nao entra no Trainer (train/validation apenas)." |
| ) |
|
|
# Guard against launching a run on a dataset too small to be useful.
MIN_TRAIN_SAMPLES = parse_env_int("MIN_TRAIN_SAMPLES", default=100, minimum=1)
_allow_tiny_raw = os.environ.get("ALLOW_TINY_DATASET") or ""
ALLOW_TINY_DATASET = _allow_tiny_raw.strip().lower() in ("1", "true", "yes")

# A split that is absent from the loaded dataset counts as zero rows.
train_size, validation_size, test_size = (
    len(dataset[_name]) if _name in dataset else 0
    for _name in ("train", "validation", "test")
)
print(
    f"Dataset sizes: train={train_size}, validation={validation_size}, test={test_size}. "
    f"MIN_TRAIN_SAMPLES={MIN_TRAIN_SAMPLES}"
)

_too_small = train_size < MIN_TRAIN_SAMPLES
if _too_small and not ALLOW_TINY_DATASET:
    _abort_message = (
        "ERRO: dataset pequeno demais para treino util.\n"
        f" train={train_size} < MIN_TRAIN_SAMPLES={MIN_TRAIN_SAMPLES}\n"
        " Ajuste o dataset no Hub (recomendado >= 300 para teste serio; >= 1000 para evolucao real).\n"
        " Se for apenas smoke-test, rode com ALLOW_TINY_DATASET=true."
    )
    print(_abort_message, file=sys.stderr)
    raise SystemExit(1)
|
|
| |
# Opt in to PyTorch's expandable-segments allocator mode unless the caller
# already configured PYTORCH_CUDA_ALLOC_CONF.
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")

print(f"Carregando modelo: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# Fall back to EOS only when the tokenizer ships without a pad token.
# DataCollatorForLanguageModeling replaces every pad-token label with -100,
# so aliasing pad to EOS unconditionally also masks the genuine EOS tokens
# and the model is never trained to stop generating.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")

print("Carregando modelo (sem quantização, usando LoRA para eficiência)...")
# Half precision on GPU, full precision on CPU.
_dt = torch.float16 if device == "cuda" else torch.float32
_device_map = None
if device == "cuda":
    # Several GPUs: let accelerate shard the model; one GPU: pin to device 0.
    _device_map = "auto" if torch.cuda.device_count() > 1 else {"": 0}

# NOTE(review): trust_remote_code=True executes code shipped with the model
# repo; MODEL_NAME comes from the environment, so only point it at trusted
# repositories.
_model_kwargs = {
    "device_map": _device_map,
    "trust_remote_code": True,
    "use_cache": False,
    "low_cpu_mem_usage": True,
}
try:
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, dtype=_dt, **_model_kwargs)
except TypeError:
    # Retry with the `torch_dtype` spelling when `dtype` is not accepted.
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=_dt, **_model_kwargs)

if device == "cuda":
    torch.cuda.empty_cache()

if device == "cpu":
    print("⚠️ Modelo carregado em CPU - treinamento será mais lento")

# LoRA adapters on the attention projections.
peft_config = LoraConfig(
    r=LORA_R,
    lora_alpha=LORA_ALPHA,
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
    lora_dropout=0.1,
    bias="none",
    task_type="CAUSAL_LM",
)

model = get_peft_model(model, peft_config)

if device == "cuda":
    model.gradient_checkpointing_enable()
    model.enable_input_require_grads()
|
|
| |
| EDA_SYSTEM_PROMPT = ( |
| "Você é um analista de dados experiente, focado em gerar INSIGHTS e não em descrever processos técnicos.\n\n" |
| "Sua tarefa é realizar uma Análise Exploratória de Dados (EDA) extraindo padrões, tendências e comportamentos relevantes dos dados.\n\n" |
| "REGRAS OBRIGATÓRIAS:\n\n" |
| "1. NÃO descreva etapas técnicas, bibliotecas, código ou ferramentas (pandas, Python, gráficos, etc.).\n" |
| "2. NÃO explique \"como fazer\" a análise.\n" |
| "3. Extraia padrões, tendências e comportamentos relevantes dos dados.\n" |
| "4. Diferencie claramente:\n" |
| " • Observação (o que é visível nos dados)\n" |
| " • Interpretação (o que isso pode significar)\n" |
| " • Insight (qual a implicação prática ou de negócio)\n" |
| "5. Declare explicitamente o nível de confiança de cada insight (alto / médio / baixo).\n" |
| "6. Quando não houver dados suficientes, diga claramente \"não é possível afirmar\".\n\n" |
| "FORMATO OBRIGATÓRIO DA RESPOSTA:\n\n" |
| "Observações:\n" |
| "- …\n\n" |
| "Interpretações:\n" |
| "- …\n\n" |
| "Insights:\n" |
| "- …\n\n" |
| "Nível de confiança:\n" |
| "- …\n\n" |
| "OBJETIVO: Entregar conclusões úteis, claras e acionáveis, como um analista humano experiente faria." |
| ) |
|
|
def format_prompt(example):
    """Render one dataset row into the single ``text`` field used for training.

    A row that already has a non-blank ``text`` is returned with that text
    trimmed. Otherwise ``input``/``output`` are combined with the EDA system
    prompt: through the tokenizer's chat template when it has one, or with
    the plain ``<|system|>/<|user|>/<|assistant|>`` layout when it does not.

    Args:
        example: Mapping with ``text`` and/or ``input`` and ``output`` keys.

    Returns:
        ``{"text": <rendered training text>}``.
    """
    prebuilt = (example.get("text") or "").strip()
    if prebuilt:
        return {"text": prebuilt}

    user_text = example.get("input", "")
    answer_text = example.get("output", "")

    if not getattr(tokenizer, "chat_template", None):
        # Tokenizer without a chat template: fall back to the legacy layout.
        return {
            "text": (
                f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n{user_text}\n<|assistant|>\n{answer_text}"
            )
        }

    conversation = [
        {"role": "system", "content": EDA_SYSTEM_PROMPT},
        {"role": "user", "content": user_text},
        {"role": "assistant", "content": answer_text},
    ]
    rendered = tokenizer.apply_chat_template(
        conversation,
        tokenize=False,
        add_generation_prompt=False,
    )
    return {"text": rendered}
|
|
| |
def _format_split(split):
    """Render every row of ``split`` into a single ``text`` column."""
    return split.map(
        format_prompt,
        remove_columns=split.column_names,
        num_proc=1,
    )


# Each split is formatted exactly once. In particular the full train split is
# no longer formatted and then thrown away when the 80/20 fallback applies.
eval_dataset = None

if "validation" in dataset:
    train_dataset = _format_split(dataset["train"])
    if len(dataset["validation"]) == 0:
        print("AVISO: split 'validation' existe, mas esta vazio. Treino seguira sem avaliacao.")
    else:
        eval_dataset = _format_split(dataset["validation"])
else:
    n_train_raw = len(dataset["train"])
    if n_train_raw < 2:
        train_dataset = _format_split(dataset["train"])
        print(
            "AVISO: Dataset de validacao ausente e split 'train' com menos de 2 exemplos. "
            "Treino seguira sem avaliacao (nao e possivel fazer split 80/20)."
        )
    else:
        print(
            "AVISO: Dataset de validacao nao encontrado no Hub. "
            "Criando divisao aleatoria 80/20 a partir de 'train' (preferir split 'validation' "
            "e/ou split estratificado por familia via scripts/eda_ft_split_stratified.py)."
        )
        # Fixed seed keeps the fallback split reproducible between runs.
        split_dataset = dataset["train"].train_test_split(test_size=0.2, seed=42)
        train_dataset = _format_split(split_dataset["train"])
        eval_dataset = _format_split(split_dataset["test"])
        print(
            f"✅ Dataset dividido: {len(train_dataset)} exemplos de treino, "
            f"{len(eval_dataset)} exemplos de validação"
        )
|
|
| |
def tokenize_function(examples):
    """Tokenize a batch of rendered prompts, truncated to ``MAX_SEQ_LENGTH``.

    No padding is applied here. The script's DataCollatorForLanguageModeling
    pads each batch to its longest member and masks pad labels, so padding
    every row to ``MAX_SEQ_LENGTH`` up front only added compute and memory
    without changing the loss.

    Args:
        examples: Batched mapping with a ``text`` list.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(
        examples["text"],
        truncation=True,
        max_length=MAX_SEQ_LENGTH,
    )


train_dataset = train_dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=["text"],
    num_proc=1,
)
if eval_dataset is not None and len(eval_dataset) > 0:
    eval_dataset = eval_dataset.map(
        tokenize_function,
        batched=True,
        remove_columns=["text"],
        num_proc=1,
    )
else:
    # Normalize "missing" and "empty" to None for the code that follows.
    eval_dataset = None
|
|
| |
# Directory for the JSON/TXT run reports written at the end of the script.
logs_dir = Path("./logs")
logs_dir.mkdir(exist_ok=True)

_output_dir = os.environ.get("TRAINING_OUTPUT_DIR", "./results")
_logging_dir = os.environ.get("TENSORBOARD_LOGDIR") or os.environ.get("LOGGING_DIR") or _output_dir
# datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
# formats to exactly the same string.
_run_name = os.environ.get("TRAINING_RUN_NAME") or (
    f"eda-lora-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}"
)

# Exported as an absolute path for the TensorBoard integration.
os.environ["TENSORBOARD_LOGGING_DIR"] = str(Path(_logging_dir).resolve())

print(
    f"Memoria VRAM: max_seq_length={MAX_SEQ_LENGTH}, train_bs={PER_DEVICE_TRAIN_BATCH_SIZE}, "
    f"eval_bs={PER_DEVICE_EVAL_BATCH_SIZE}, grad_accum={GRADIENT_ACCUMULATION_STEPS}, "
    f"lora_r={LORA_R}, lora_alpha={LORA_ALPHA}, gradient_checkpointing=True"
)
if (os.environ.get("GPU_MAX_MEMORY") or "").strip():
    # The variable is only detected, never applied; tell the operator.
    print(
        "AVISO: GPU_MAX_MEMORY nao e aplicado por este train.py; "
        "ajusta batch/seq/grad_accum ou o hardware no Space."
    )
if device == "cuda":
    print(
        "Se o treino falhar com OOM e o erro do CUDA mostrar varios processos na mesma GPU, "
        "faz Factory reboot do Space (reinicios sem libertar VRAM podem acumular processos)."
    )
|
|
_report_to = build_report_backends()
_has_eval = eval_dataset is not None and len(eval_dataset) > 0

# Options that only make sense with a validation split are grouped so the
# two configurations can be compared side by side.
if _has_eval:
    _eval_options = {
        "eval_strategy": "steps",
        "eval_steps": 500,
        "load_best_model_at_end": _has_eval,
        "metric_for_best_model": "eval_loss",
        "greater_is_better": False,
    }
else:
    _eval_options = {
        "eval_strategy": "no",
        "eval_steps": None,
        "load_best_model_at_end": _has_eval,
        "metric_for_best_model": None,
        "greater_is_better": None,
    }

training_args = TrainingArguments(
    # Run identity and schedule.
    output_dir=_output_dir,
    run_name=_run_name,
    num_train_epochs=300,
    learning_rate=3e-05,
    warmup_steps=100,
    # Batch geometry.
    per_device_train_batch_size=PER_DEVICE_TRAIN_BATCH_SIZE,
    per_device_eval_batch_size=PER_DEVICE_EVAL_BATCH_SIZE,
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    # Logging and checkpointing.
    logging_steps=10,
    logging_strategy="steps",
    logging_first_step=True,
    save_steps=500,
    save_total_limit=3,
    # Precision and memory.
    fp16=device == "cuda",
    gradient_checkpointing=True,
    dataloader_pin_memory=False,
    dataloader_num_workers=0,
    # Hub upload and metric reporting.
    push_to_hub=True,
    hub_model_id=OUTPUT_REPO,
    hub_strategy="checkpoint",
    report_to=_report_to,
    **_eval_options,
)
|
|
# Operator-facing notices about where metrics are being written.
_tb_dir = Path(_logging_dir)
print(
    f"TensorBoard: logs em {_tb_dir.resolve()} (run_name={_run_name}). "
    "Apos o treino, analise com: python scripts/launch_tensorboard.py "
    f"--logdir {_tb_dir.as_posix()}"
)

if "wandb" in _report_to:
    _wandb_project = os.environ.get("WANDB_PROJECT")
    _wandb_entity = os.environ.get("WANDB_ENTITY")
    print(
        f"Weights & Biases: project={_wandb_project} "
        f"entity={_wandb_entity} run_name={_run_name}. "
        "Credenciais: WANDB_API_KEY (ou WANDB_MODE=offline para so local)."
    )

if not _has_eval:
    print(
        "AVISO: sem dataset de validacao. "
        "eval_strategy=no e load_best_model_at_end=False para evitar falha em datasets pequenos."
    )

# Append a dataset-version tag to WANDB_TAGS, keeping any tags already set.
_ds_ver = (os.environ.get("DATASET_VERSION") or "").strip()
if _ds_ver:
    _prev_tags = (os.environ.get("WANDB_TAGS") or "").strip()
    _tag = f"dataset_version:{_ds_ver}"
    os.environ["WANDB_TAGS"] = ",".join(part for part in (_prev_tags, _tag) if part)
|
|
|
|
class WandbDatasetEnvCallback(TrainerCallback):
    """Copy the DATASET_* environment values into ``wandb.config``.

    Strictly best-effort: when wandb is not importable, no run is active, or
    anything else fails, the callback does nothing.
    """

    def on_train_begin(self, args, state, control, **kwargs):
        """Push the non-empty DATASET_* variables to the active wandb run."""
        tracked = ("DATASET_VERSION", "DATASET_MANIFEST_HASH", "DATASET_MANIFEST_PATH")
        try:
            import wandb

            if wandb.run is None:
                return
            # Keys are lower-cased; unset or blank variables are skipped.
            extra: dict[str, str] = {
                name.lower(): text
                for name in tracked
                if (text := (os.environ.get(name) or "").strip())
            }
            if extra:
                wandb.config.update(extra, allow_val_change=True)
        except Exception:
            # Observability must never interrupt training.
            return
|
|
|
|
| |
# Causal-LM collator (mlm=False).
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=False,
)

callbacks: list[TrainerCallback] = [WandbDatasetEnvCallback()]
# The observability classes are None when the optional bi_backend package
# could not be imported; real-time publishing is then skipped entirely.
if RealtimeMetricsCallback is not None and CompositePublisher is not None:
    publishers = []
    if WandbPublisher is not None:
        publishers.append(WandbPublisher())
    if MlflowPublisher is not None and (os.environ.get("MLFLOW_TRACKING_URI") or "").strip():
        publishers.append(MlflowPublisher())
    if TensorboardPublisher is not None and (
        os.environ.get("ENABLE_TENSORBOARD_PUBLISHER") or ""
    ).strip().lower() in ("1", "true", "yes"):
        publishers.append(TensorboardPublisher())

    if publishers:
        composite = CompositePublisher(publishers)
        composite.log_params(
            {
                "base_model": MODEL_NAME,
                "dataset_repo": DATASET_REPO,
                "output_repo": OUTPUT_REPO,
                "lora_r": LORA_R,
                "lora_alpha": LORA_ALPHA,
                "lora_dropout": 0.1,
                "learning_rate": training_args.learning_rate,
                "batch_size": PER_DEVICE_TRAIN_BATCH_SIZE,
                "gradient_accumulation_steps": GRADIENT_ACCUMULATION_STEPS,
            }
        )
        dataset_version_tag = (os.environ.get("DATASET_VERSION") or "").strip()
        git_commit_tag = (os.environ.get("GIT_COMMIT") or "").strip()
        tags = {}
        if dataset_version_tag:
            tags["dataset_version"] = dataset_version_tag
        if git_commit_tag:
            tags["git_commit"] = git_commit_tag
        if tags:
            composite.log_tags(tags)

        functional_runner = None
        benchmark_path = (
            os.environ.get("FUNCTIONAL_BENCHMARK_PATH")
            or "/app/benchmarks/gold_sample.json"
        )
        benchmark_cases = parse_env_int("FUNCTIONAL_EVAL_MAX_CASES", default=2, minimum=1)
        if FunctionalEvalRunner is not None and Path(benchmark_path).is_file():

            def _predict_for_functional_eval(user_prompt: str, data_context: dict) -> str:
                """Greedy-decode the model's answer for one benchmark case.

                Returns only the newly generated text. The prompt (system
                prompt plus data context) is not echoed back, so the metrics
                score the answer rather than the prompt.
                """
                user_block = (
                    f"{user_prompt}\n\nContexto: {json.dumps(data_context, ensure_ascii=False)}"
                )
                if getattr(tokenizer, "chat_template", None):
                    messages = [
                        {"role": "system", "content": EDA_SYSTEM_PROMPT},
                        {"role": "user", "content": user_block},
                    ]
                    prompt = tokenizer.apply_chat_template(
                        messages,
                        tokenize=False,
                        add_generation_prompt=True,
                    )
                else:
                    prompt = (
                        f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n"
                        f"{user_block}\n"
                        "<|assistant|>\n"
                    )
                # NOTE(review): truncating to MAX_SEQ_LENGTH can cut the end
                # of a long prompt (including the assistant header) - confirm
                # the limit is large enough for the benchmark cases.
                inputs = tokenizer(
                    prompt, return_tensors="pt", truncation=True, max_length=MAX_SEQ_LENGTH
                )
                inputs = {k: v.to(model.device) for k, v in inputs.items()}
                # Generate in eval mode so dropout does not perturb the greedy
                # decode, then restore the previous mode.
                was_training = model.training
                model.eval()
                try:
                    # Greedy decoding ignores temperature, so it is not passed.
                    output = model.generate(
                        **inputs,
                        max_new_tokens=220,
                        do_sample=False,
                    )
                finally:
                    if was_training:
                        model.train()
                prompt_len = inputs["input_ids"].shape[1]
                return tokenizer.decode(output[0][prompt_len:], skip_special_tokens=True)

            functional_runner = FunctionalEvalRunner(
                benchmark_path=str(benchmark_path),
                predict_fn=_predict_for_functional_eval,
                max_cases=benchmark_cases,
            )
        else:
            print(
                f"AVISO: benchmark funcional não encontrado em {benchmark_path}; "
                "métricas eval/hallucination|grounding|limitation|actionability não serão publicadas."
            )
        callbacks.append(
            RealtimeMetricsCallback(composite, functional_eval_runner=functional_runner)
        )
|
|
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    callbacks=callbacks,
)

print("Iniciando treinamento...")
try:
    train_output = trainer.train()
except Exception as e:
    print(f"❌ Erro durante treinamento: {e}")

    # Record a small failure report, then re-raise the training error.
    _failed_at = datetime.now(timezone.utc)
    error_info = {
        # Same "...Z" shape the naive-UTC timestamp used to produce.
        "timestamp": _failed_at.isoformat().replace("+00:00", "Z"),
        "error": str(e),
        "model_name": MODEL_NAME,
        "dataset_repo": DATASET_REPO,
        "status": "failed",
    }
    error_file = logs_dir / f"training_error_{_failed_at.strftime('%Y%m%d_%H%M%S')}.json"
    try:
        with open(error_file, "w", encoding="utf-8") as f:
            json.dump(error_info, f, indent=2, ensure_ascii=False)
    except OSError as write_exc:
        # A failing report must not replace the real training exception.
        print(f"AVISO: nao foi possivel salvar {error_file}: {write_exc}", file=sys.stderr)
    else:
        print(f"✅ Informações de erro salvas em: {error_file}")

    raise
|
|
| |
state = trainer.state
final_log_history = state.log_history if hasattr(state, "log_history") and state.log_history else []

# Final training loss: use the value returned by train() when it has one,
# otherwise the most recent training (non-eval) log entry.
final_train_loss = None
if train_output and hasattr(train_output, "training_loss"):
    final_train_loss = train_output.training_loss
elif final_log_history:
    for log_entry in reversed(final_log_history):
        if "loss" in log_entry and "eval_loss" not in log_entry:
            final_train_loss = log_entry.get("loss")
            break

# Most recent evaluation entry, reduced to its eval_* keys.
last_eval_metrics = {}
for log_entry in reversed(final_log_history):
    if "eval_loss" in log_entry:
        last_eval_metrics = {k: v for k, v in log_entry.items() if k.startswith("eval_")}
        break

training_info = {
    # Aware UTC "now" rendered in the same "...Z" shape as before.
    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    "model_name": MODEL_NAME,
    "dataset_repo": DATASET_REPO,
    "output_repo": OUTPUT_REPO,
    "training_config": {
        "num_train_epochs": training_args.num_train_epochs,
        "per_device_train_batch_size": training_args.per_device_train_batch_size,
        "per_device_eval_batch_size": training_args.per_device_eval_batch_size,
        "gradient_accumulation_steps": training_args.gradient_accumulation_steps,
        "learning_rate": training_args.learning_rate,
        "warmup_steps": training_args.warmup_steps,
        "fp16": training_args.fp16,
        "max_seq_length": MAX_SEQ_LENGTH,
        "gradient_checkpointing": True,
        "tensorboard_logging_dir": _logging_dir,
        "tensorboard_run_name": _run_name,
        "report_to": _report_to,
        "wandb_project": os.environ.get("WANDB_PROJECT") if "wandb" in _report_to else None,
        "wandb_entity": os.environ.get("WANDB_ENTITY") if "wandb" in _report_to else None,
    },
    "dataset_info": {
        "train_samples": len(train_dataset),
        "eval_samples": len(eval_dataset) if eval_dataset else 0,
        "dataset_version": os.environ.get("DATASET_VERSION"),
        "dataset_manifest_hash": os.environ.get("DATASET_MANIFEST_HASH"),
        "dataset_manifest_path": os.environ.get("DATASET_MANIFEST_PATH"),
    },
    "training_results": {
        "final_train_loss": final_train_loss,
        "final_eval_metrics": last_eval_metrics,
        # Optimizer steps actually taken. The log history has one entry per
        # logging/eval event, so its length is not a step count.
        "total_steps": getattr(state, "global_step", 0) or 0,
        # Only the tail of the history is kept.
        "log_history": final_log_history[-50:],
    },
    "status": "completed",
}
|
|
| |
# One clock read for both file names so the two reports always share the
# same stamp (aware UTC "now" replaces the deprecated datetime.utcnow()).
_report_stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

# Machine-readable report.
results_file = logs_dir / f"training_results_{_report_stamp}.json"
with open(results_file, "w", encoding="utf-8") as f:
    json.dump(training_info, f, indent=2, ensure_ascii=False)
print(f"✅ Resultados salvos em: {results_file}")

# Human-readable summary of the same information.
summary_file = logs_dir / f"training_summary_{_report_stamp}.txt"
with open(summary_file, "w", encoding="utf-8") as f:
    f.write("=" * 80 + "\n")
    f.write("RESUMO DO TREINAMENTO\n")
    f.write("=" * 80 + "\n\n")
    f.write(f"Data/Hora: {training_info['timestamp']}\n")
    f.write(f"Modelo: {MODEL_NAME}\n")
    f.write(f"Dataset: {DATASET_REPO}\n")
    f.write(f"Output: {OUTPUT_REPO}\n\n")

    f.write("CONFIGURAÇÃO DE TREINAMENTO:\n")
    f.write("-" * 80 + "\n")
    config = training_info["training_config"]
    f.write(f"Épocas: {config['num_train_epochs']}\n")
    f.write(f"Batch Size (train): {config['per_device_train_batch_size']}\n")
    f.write(f"Batch Size (eval): {config['per_device_eval_batch_size']}\n")
    f.write(f"Gradient Accumulation Steps: {config['gradient_accumulation_steps']}\n")
    f.write(f"Learning Rate: {config['learning_rate']}\n")
    f.write(f"Warmup Steps: {config['warmup_steps']}\n")
    f.write(f"FP16: {config['fp16']}\n\n")

    f.write("DATASET:\n")
    f.write("-" * 80 + "\n")
    dataset_info = training_info["dataset_info"]
    f.write(f"Amostras de Treino: {dataset_info['train_samples']}\n")
    f.write(f"Amostras de Validação: {dataset_info['eval_samples']}\n\n")

    f.write("RESULTADOS:\n")
    f.write("-" * 80 + "\n")
    results = training_info["training_results"]
    if results["final_train_loss"] is not None:
        f.write(f"Loss Final (Treino): {results['final_train_loss']:.6f}\n")

    if results["final_eval_metrics"]:
        f.write("\nMétricas Finais de Validação:\n")
        for key, value in results["final_eval_metrics"].items():
            # Floats get fixed precision; everything else is printed as-is.
            if isinstance(value, float):
                f.write(f" {key}: {value:.6f}\n")
            else:
                f.write(f" {key}: {value}\n")

    f.write(f"\nTotal de Steps: {results['total_steps']}\n")
    f.write(f"Status: {training_info['status']}\n")

print(f"✅ Resumo salvo em: {summary_file}")
|
|
# Attach both report files to the first callback that exposes a publisher.
# Best-effort: a failure here only produces a warning.
if RealtimeMetricsCallback is not None and CompositePublisher is not None:
    try:
        for cb in callbacks:
            publisher = getattr(cb, "_publisher", None)
            if publisher is not None:
                publisher.log_artifact(str(results_file), "run-results")
                publisher.log_artifact(str(summary_file), "run-summary")
                break
    except Exception as _artifact_exc:
        print(f"AVISO: falha ao registrar artifacts de observabilidade: {_artifact_exc}")

print(f"Fazendo push do modelo final para {OUTPUT_REPO}")
try:
    trainer.push_to_hub()
    print("✅ Push para Hub concluído!")
except Exception as e:
    # A failed upload is reported but does not fail the run: the checkpoints
    # are still on disk.
    print(f"⚠️ Aviso: Erro ao fazer push para Hub: {e}")
    # Report the directory actually used; TRAINING_OUTPUT_DIR may override
    # the "./results" default.
    print(f"Os checkpoints estão salvos localmente em {_output_dir}")

print("✅ Treinamento concluído!")
|
|
|
|
def analyze_schema(csv_description: str, model_path: str = None):
    """Generate an EDA write-up for a dataset description.

    Args:
        csv_description: Text describing the CSV dataset to analyse.
        model_path: Optional path or repo id of a trained model to load. When
            omitted, the model and tokenizer already loaded by this script
            are used.

    Returns:
        The analysis text generated by the model. Only the newly generated
        tokens are decoded, so the system and user prompt are not echoed
        back.
    """
    inference_model = model
    inference_tokenizer = tokenizer

    if model_path:
        print(f"Carregando modelo treinado de: {model_path}")
        inference_tokenizer = AutoTokenizer.from_pretrained(model_path)
        inference_model = AutoModelForCausalLM.from_pretrained(
            model_path,
            device_map="auto",
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        )

    user_block = f"Analise o seguinte dataset:\n{csv_description}"
    if getattr(inference_tokenizer, "chat_template", None):
        messages = [
            {"role": "system", "content": EDA_SYSTEM_PROMPT},
            {"role": "user", "content": user_block},
        ]
        prompt = inference_tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )
    else:
        # Legacy layout for tokenizers without a chat template.
        prompt = f"<|system|>\n{EDA_SYSTEM_PROMPT}\n<|user|>\n{user_block}\n<|assistant|>\n"

    inputs = inference_tokenizer(prompt, return_tensors="pt").to(inference_model.device)

    # Greedy decoding; a temperature has no effect without sampling, so it
    # is not passed.
    output = inference_model.generate(
        **inputs,
        max_new_tokens=1200,
        do_sample=False,
    )

    # Drop the prompt tokens so only the model's answer is returned.
    prompt_len = inputs["input_ids"].shape[1]
    return inference_tokenizer.decode(output[0][prompt_len:], skip_special_tokens=True)
|
|
|
|
| |
| |
| |
|
|