CPU_Fine_tuning / app.py
Fedir-Ilina's picture
Rename CPU_Fine_tuning.py to app.py
ef13e68 verified
# app.py
# =========================================================================
# 0. Bootstrap — pip upgrades antes de qualquer import sensível
# =========================================================================
import os
import sys
import subprocess
subprocess.check_call([
sys.executable, "-m", "pip", "install", "--quiet", "--upgrade",
"urllib3<2.0", "charset_normalizer<3.4", "huggingface_hub"
])
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
# ================================================
# HF Persistent Storage — bucket montada em /data
# Em local, cai back para BASE_PATH
# ================================================
HF_DATASET_REPO = "Fedir-Ilina/CPU_Fine_tuning" # só para referência/documentação
_STORAGE_ROOT = "/data" if os.path.isdir("/data") else BASE_PATH
FINAL_OUTPUT_DIR = os.path.join(_STORAGE_ROOT, "trained_model_output")
DATASET_OUT_DIR = os.path.join(_STORAGE_ROOT, "dataset_output")
GGUF_OUT_DIR = os.path.join(_STORAGE_ROOT, "gguf_output")
os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
os.makedirs(DATASET_OUT_DIR, exist_ok=True)
os.makedirs(GGUF_OUT_DIR, exist_ok=True)
# ================================================
# Suprimir warnings antes de qualquer outro import
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
try:
from requests.packages.urllib3.exceptions import DependencyWarning
warnings.simplefilter("ignore", DependencyWarning)
except ImportError:
pass
# Silenciar oneDNN / TF (mesmo sem TensorFlow instalado)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "0"
# =========================================================================
# 1. Stdlib
# =========================================================================
import glob
import inspect
import json
import logging
import math
import platform
import statistics
import threading
import time
import traceback
from contextlib import contextmanager, suppress
from datetime import datetime, timedelta
from threading import Thread
from typing import Any, Dict, Union
from warnings import warn as log_warning
# =========================================================================
# 2. Third-party — sistema / hardware
# =========================================================================
import chardet
import cpuinfo
import ctypes as ct
import psutil
# =========================================================================
# 3. PyTorch
# =========================================================================
import torch
import torch.nn as nn
# =========================================================================
# 4. Flask
# =========================================================================
from flask import Flask, jsonify, render_template, request, send_file, send_from_directory
# =========================================================================
# 5. HuggingFace / PEFT
# =========================================================================
from datasets import Dataset, load_from_disk
from peft import LoraConfig, PeftModel, TaskType, get_peft_model
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
DataCollatorForLanguageModeling,
Trainer,
TrainerCallback,
TrainerControl,
TrainerState,
TrainingArguments,
)
# =========================================================================
# 6. Local
# =========================================================================
from config_manager import (
_load_constants_from_file,
map_backend_to_frontend,
update_python_constants,
)
# =========================================================================
# 7. Logging
# =========================================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
#---------------------
app = Flask(__name__)
all_data = []
#--------------------
print("-" * 80)
#-----------------------------------------------------------
def _ts():
return datetime.now().strftime("%H:%M:%S")
# -------------------------
# Deteção P/E cross-platform
# -------------------------
def detect_core_types():
"""
Tenta:
1) Windows: EfficiencyClass (Windows 11 expõe isto; em máquinas homogéneas virá tudo Efficiency=0).
2) Linux: sysfs core_type (1=Atom/E, 2=Core/P; 0=Unknown).
3) Fallback: assume homogéneo → todos P, E vazio.
Retorna (P_IDS, E_IDS, meta_dict).
"""
logical = psutil.cpu_count(logical=True) or os.cpu_count() or 1
# 1) Windows EfficiencyClass
if platform.system() == "Windows":
try:
RELATION_PROCESSOR_CORE = 0
class GROUP_AFFINITY(ct.Structure):
_fields_ = [("Mask", ct.c_ulonglong),
("Group", ct.c_ushort),
("Reserved", ct.c_ushort * 3)]
class PROCESSOR_RELATIONSHIP(ct.Structure):
_fields_ = [("Flags", ct.c_ubyte),
("EfficiencyClass", ct.c_ubyte),
("Reserved", ct.c_ubyte * 20),
("GroupCount", ct.c_ushort)]
class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_HEADER(ct.Structure):
_fields_ = [("Relationship", ct.c_int),
("Size", ct.c_ulong)]
GetLPIEx = ct.windll.kernel32.GetLogicalProcessorInformationEx
GetLPIEx.restype = ct.c_bool
GetLPIEx.argtypes = [ct.c_int, ct.c_void_p, ct.POINTER(ct.c_ulong)]
buf_size = ct.c_ulong(0)
GetLPIEx(RELATION_PROCESSOR_CORE, None, ct.byref(buf_size))
if buf_size.value:
buf = (ct.c_byte * buf_size.value)()
if GetLPIEx(RELATION_PROCESSOR_CORE, ct.byref(buf), ct.byref(buf_size)):
eff_by_logical = {}
offset = 0
single_group = (logical <= 64)
while offset < buf_size.value:
header = SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_HEADER.from_buffer(buf, offset)
size = header.Size
if header.Relationship == RELATION_PROCESSOR_CORE:
pr = PROCESSOR_RELATIONSHIP.from_buffer(buf, offset + ct.sizeof(header))
eff = pr.EfficiencyClass # 0 = mais performante (P), >0 = mais eficiente (E)
ga_offset = offset + ct.sizeof(header) + ct.sizeof(PROCESSOR_RELATIONSHIP)
for i in range(pr.GroupCount):
ga = GROUP_AFFINITY.from_buffer(buf, ga_offset + i * ct.sizeof(GROUP_AFFINITY))
mask = ga.Mask
if single_group:
for bit in range(64):
if (mask >> bit) & 1:
eff_by_logical[bit] = eff
else:
# Multi-grupo (>64 lógicos): mapping global complexo, cai para fallback.
pass
offset += size
if eff_by_logical:
p_ids = sorted([i for i in range(logical) if eff_by_logical.get(i, 0) == 0])
e_ids = sorted([i for i in range(logical) if eff_by_logical.get(i, 0) > 0])
return p_ids, e_ids, {"method": "windows_efficiencyclass", "notes": []}
except Exception:
pass # continua para Linux/fallback
# 2) Linux sysfs core_type
if platform.system() == "Linux":
try:
p_ids, e_ids, unknown = [], [], []
for cpu in range(logical):
path = f"/sys/devices/system/cpu/cpu{cpu}/topology/core_type"
try:
with open(path) as f:
val = f.read().strip()
except FileNotFoundError:
val = None
if val is None:
p_ids = e_ids = []
break
try:
t = int(val)
except ValueError:
t = 0
# Kernel moderno: 1=Atom(E), 2=Core(P), 0=Unknown
if t == 2:
p_ids.append(cpu)
elif t == 1:
e_ids.append(cpu)
else:
unknown.append(cpu)
if p_ids or e_ids:
notes = []
if unknown:
notes.append(f"{len(unknown)} CPUs com core_type=Unknown (tratados como P).")
p_ids = sorted(p_ids + unknown) # conservador: desconhecidos como P
return sorted(p_ids), sorted(e_ids), {"method": "linux_core_type", "notes": notes}
except Exception:
pass
# 3) Fallback: homogéneo
return list(range(logical)), [], {"method": "homogeneous_fallback", "notes": []}
# -------------------------
# Mapa de siblings (HT) por core físico
# -------------------------
def get_core_siblings():
"""
Retorna lista de listas: cada sublista são os logical IDs que pertencem ao mesmo core físico.
"""
logical = psutil.cpu_count(logical=True) or 1
if platform.system() == "Windows":
try:
RELATION_PROCESSOR_CORE = 0
class GROUP_AFFINITY(ct.Structure):
_fields_ = [("Mask", ct.c_ulonglong),
("Group", ct.c_ushort),
("Reserved", ct.c_ushort * 3)]
class PROCESSOR_RELATIONSHIP(ct.Structure):
_fields_ = [("Flags", ct.c_ubyte),
("EfficiencyClass", ct.c_ubyte),
("Reserved", ct.c_ubyte * 20),
("GroupCount", ct.c_ushort)]
class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_HEADER(ct.Structure):
_fields_ = [("Relationship", ct.c_int),
("Size", ct.c_ulong)]
GetLPIEx = ct.windll.kernel32.GetLogicalProcessorInformationEx
GetLPIEx.restype = ct.c_bool
GetLPIEx.argtypes = [ct.c_int, ct.c_void_p, ct.POINTER(ct.c_ulong)]
buf_size = ct.c_ulong(0)
GetLPIEx(RELATION_PROCESSOR_CORE, None, ct.byref(buf_size))
if not buf_size.value:
return []
buf = (ct.c_byte * buf_size.value)()
if not GetLPIEx(RELATION_PROCESSOR_CORE, ct.byref(buf), ct.byref(buf_size)):
return []
siblings = []
offset = 0
single_group = (logical <= 64)
while offset < buf_size.value:
header = SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_HEADER.from_buffer(buf, offset)
size = header.Size
if header.Relationship == RELATION_PROCESSOR_CORE:
pr = PROCESSOR_RELATIONSHIP.from_buffer(buf, offset + ct.sizeof(header))
ga_offset = offset + ct.sizeof(header) + ct.sizeof(PROCESSOR_RELATIONSHIP)
core_logicals = []
for i in range(pr.GroupCount):
ga = GROUP_AFFINITY.from_buffer(buf, ga_offset + i * ct.sizeof(GROUP_AFFINITY))
mask = ga.Mask
for bit in range(64):
if (mask >> bit) & 1:
core_logicals.append(bit) # single/multi-grupo: aproximação razoável
if core_logicals:
siblings.append(sorted(set(core_logicals)))
offset += size
return sorted(siblings, key=lambda s: min(s) if s else 1e9)
except Exception:
return []
elif platform.system() == "Linux":
try:
sibs = []
for cpu in range(logical):
path = f"/sys/devices/system/cpu/cpu{cpu}/topology/thread_siblings_list"
try:
with open(path) as f:
txt = f.read().strip()
except FileNotFoundError:
return []
items = []
for part in txt.split(","):
if "-" in part:
a, b = part.split("-")
items.extend(range(int(a), int(b) + 1))
else:
items.append(int(part))
sibs.append(sorted(set(items)))
# Deduplica sublistas iguais
seen = set()
uniq = []
for s in sibs:
t = tuple(s)
if t not in seen:
seen.add(t)
uniq.append(s)
return sorted(uniq, key=lambda s: min(s))
except Exception:
return []
return []
def order_by_physical_first(candidates, siblings_map):
"""
Reordena 'candidates' para usar primeiro 1 logical por core físico (evita siblings logo de início).
Se 'siblings_map' estiver vazio, retorna candidatos ordenados naturalmente.
"""
if not siblings_map:
return sorted(candidates)
cand_set = set(candidates)
first_pass = [next((x for x in g if x in cand_set), None) for g in siblings_map]
first_pass = [x for x in first_pass if x is not None]
others = [x for g in siblings_map for x in g if x in cand_set and x not in first_pass]
leftovers = [x for x in sorted(candidates) if x not in first_pass and x not in others]
return first_pass + others + leftovers
# =========================================================================
# 1. Configuração base de hardware, Otimização e Hiperparâmetros
# =========================================================================
# --- 1.1 Hardware ---
LOGICAL_CPUS = psutil.cpu_count(logical=True) or os.cpu_count() or 16
PHYSICAL_CPUS = psutil.cpu_count(logical=False) or max(1, LOGICAL_CPUS // 2)
TOTAL_RAM_GB = psutil.virtual_memory().total / (1024 ** 3)
# ===============================================================
# Ajuste de performance (auto/manual)
# ===============================================================
OMP_THREADS_UTILIZATION = 0.95
DETECTION_PERFORMANCE = "auto" # "auto" | "manual"
CORES_UTILIZATION = 1.0
def compute_cores(cpu_count: int, utilization: float) -> int:
"""Aplica uma utilização fracional a um contagem de CPUs com clamp [1, cpu_count]."""
return max(1, min(int(math.floor(cpu_count * utilization)), cpu_count))
# NOTA: compute_effective_cores e compute_omp_threads faziam exactamente o mesmo;
# ambos substituídos por compute_cores acima. Usos em baixo são drop-in equivalentes.
EFFECTIVE_LOGICAL_CPUS = compute_cores(LOGICAL_CPUS, CORES_UTILIZATION)
# --- Detectar núcleos P/E e filtrar para o limite efectivo ---
_DET_P_full, _DET_E_full, _META = detect_core_types()
_DET_P = [i for i in _DET_P_full if i < EFFECTIVE_LOGICAL_CPUS]
_DET_E = [i for i in _DET_E_full if i < EFFECTIVE_LOGICAL_CPUS]
# ===============================================================
# Meta-informação de arranque (modo auto/manual)
# ===============================================================
OMP_THREADS = compute_cores(EFFECTIVE_LOGICAL_CPUS, OMP_THREADS_UTILIZATION)
if DETECTION_PERFORMANCE.lower() == "auto":
total_det = len(_DET_P) + len(_DET_E)
if total_det == 0:
_AUTO_META = "auto (fallback homogéneo)"
elif len(_DET_P) / total_det >= 0.5:
_AUTO_META = f"auto (balanceado P/E → OMP={OMP_THREADS})"
else:
_AUTO_META = f"auto (balanceado E-heavy → OMP={OMP_THREADS})"
else:
_AUTO_META = "manual"
print(f"[INFO] [{_ts()}] Modo de desempenho: {DETECTION_PERFORMANCE} ({_AUTO_META})")
# --- 1.2 Alocação de cores para tensores (OMP) e DataLoader ---
siblings = get_core_siblings()
ordered_P = order_by_physical_first(_DET_P, siblings)
ordered_E = order_by_physical_first(_DET_E, siblings)
needed = OMP_THREADS
P_CORE_IDS = []
if ordered_P:
take = ordered_P[:min(needed, len(ordered_P))]
P_CORE_IDS.extend(take)
needed -= len(take)
if needed > 0 and ordered_E:
take = [i for i in ordered_E if i not in P_CORE_IDS][:needed]
P_CORE_IDS.extend(take)
needed -= len(take)
# Fallback homogéneo
if not P_CORE_IDS:
P_CORE_IDS = list(range(min(OMP_THREADS, EFFECTIVE_LOGICAL_CPUS)))
# Secundários (DataLoader): E sobrantes → P sobrantes → restantes
used = set(P_CORE_IDS)
available = set(range(EFFECTIVE_LOGICAL_CPUS))
secondary = (
[i for i in _DET_E if i not in used and i in available] +
[i for i in _DET_P if i not in used and i in available] +
[i for i in available if i not in used and i not in _DET_P and i not in _DET_E]
)
_seen = set()
REMAINING_CORE_IDS = [x for x in secondary if not (x in _seen or _seen.add(x))]
DATALOADER_WORKERS = min(len(REMAINING_CORE_IDS), max(0, EFFECTIVE_LOGICAL_CPUS - len(P_CORE_IDS)))
# Variáveis de ambiente
os.environ["OMP_NUM_THREADS"] = str(OMP_THREADS)
os.environ["MKL_NUM_THREADS"] = str(OMP_THREADS)
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Afinidade
#-----------------------------------
def set_affinity(core_ids):
try:
if platform.system() == "Linux":
os.sched_setaffinity(0, set(core_ids))
elif platform.system() == "Windows":
psutil.Process().cpu_affinity(core_ids)
except Exception as e:
print(f"[WARN] [{_ts()}] Não foi possível definir afinidade: {e}")
final_affinity_cores = sorted(i for i in set(P_CORE_IDS + REMAINING_CORE_IDS) if i < EFFECTIVE_LOGICAL_CPUS)
set_affinity(final_affinity_cores)
#-------------------------
# ---------------------------------------------------------------
# Logs de arranque (apenas uma vez, evita duplicação no reload do Flask)
# ---------------------------------------------------------------
if os.environ.get("WERKZEUG_RUN_MAIN") != "true" and not globals().get("_LOGS_PRINTED", False):
_LOGS_PRINTED = True
has_e = bool(_DET_E)
print(f"[INFO] [{_ts()}] Método de deteção: {_META.get('method')}")
for note in _META.get("notes", []):
print(f"[INFO] [{_ts()}] Nota: {note}")
print(f"[INFO] [{_ts()}] Detetados (filtrados) → P: {len(_DET_P)} | E: {len(_DET_E)}")
# Tensores
P_from_P = [i for i in P_CORE_IDS if i in _DET_P]
P_from_E = [i for i in P_CORE_IDS if i in _DET_E]
if has_e:
e_str = f", +E: {len(P_from_E)} {sorted(P_from_E)}" if P_from_E else ""
print(f"[INFO] [{_ts()}] Tensores (OMP={OMP_THREADS}) → P: {len(P_from_P)} {sorted(P_from_P)}{e_str}")
else:
print(f"[INFO] [{_ts()}] Tensores (OMP={OMP_THREADS}) [homogéneo] → {len(P_CORE_IDS)} {sorted(P_CORE_IDS)}")
# DataLoader
SEC_E = [i for i in REMAINING_CORE_IDS if i in _DET_E]
SEC_P = [i for i in REMAINING_CORE_IDS if i in _DET_P]
SEC_OTHER = [i for i in REMAINING_CORE_IDS if i not in _DET_P and i not in _DET_E]
if has_e:
if SEC_E: print(f"[INFO] [{_ts()}] Secundário/DataLoader → E: {len(SEC_E)} {sorted(SEC_E)}")
if SEC_P: print(f"[INFO] [{_ts()}] Secundário/DataLoader → P: {len(SEC_P)} {sorted(SEC_P)}")
if SEC_OTHER: print(f"[INFO] [{_ts()}] Secundário/DataLoader → Outros: {len(SEC_OTHER)} {sorted(SEC_OTHER)}")
else:
print(f"[INFO] [{_ts()}] Cores restantes (homogéneo) → {len(REMAINING_CORE_IDS)} {sorted(REMAINING_CORE_IDS)}")
print("-" * 80)
print(f"[INFO] [{_ts()}] LOGICAL_CPUS: {LOGICAL_CPUS} | PHYSICAL_CPUS: {PHYSICAL_CPUS} | TOTAL_RAM_GB: {TOTAL_RAM_GB:.2f}")
print(f"[INFO] [{_ts()}] Cores Utilization: {CORES_UTILIZATION:.2%} → efetivos: {EFFECTIVE_LOGICAL_CPUS}")
print(f"[INFO] [{_ts()}] OMP Threads Utilization: {OMP_THREADS_UTILIZATION:.2%}")
print(f"[INFO] [{_ts()}] OMP Threads: {OMP_THREADS} | Dataloader Workers: {DATALOADER_WORKERS}")
if DATALOADER_WORKERS == 0:
print(f"[INFO] [{_ts()}] Sem cores sobrantes para DataLoader (todos {EFFECTIVE_LOGICAL_CPUS} alocados a OMP).")
total_allocated = len(set(P_CORE_IDS + REMAINING_CORE_IDS))
print(f"[INFO] [{_ts()}] Total alocados (OMP + DataLoader): {total_allocated} (de {EFFECTIVE_LOGICAL_CPUS} efetivos)")
print("-" * 80)
print(f"[INFO] [{_ts()}] Servidor pronto.\n")
print("-" * 80)
#-----------------------------------------------------------------------------------------------------------------------------
# =========================================================================
# 1.3 Parâmetros base do treino
# =========================================================================
BASE_BATCH_SIZE = 4
INITIAL_ACCUMULATION_MIN_STEPS = 4
INITIAL_ACCUMULATION_MAX_STEPS = 6
BASE_EVAL_SIZE = 10
BASE_LEARNING_RATE = 1e-4
LR_SCHEDULER_TYPE = "constant_with_warmup"
WARMUP_RATIO = 0.03
LOGGIN_STEPS = 10
SAVE_STRATEGY = "steps"
SAVE_STEPS = 500
EVAL_STEPS = 10
WEIGHT_DECAY = 0.1
OPTIM = "adamw_torch"
# =========================================================================
# 1.4 – 1.6 RAM e MAX_LEN
# =========================================================================
TARGET_RAM_UTILIZATION = 0.95 # Limite máximo de uso da RAM total
# Alias mantido por compatibilidade com ajustar_accumulation_steps() e outros usos
TARGET_ACCUMULATION_RAM_UTILIZATION = TARGET_RAM_UTILIZATION
# Custo empírico de RAM por batch (ajustar conforme modelo)
ESTIMATED_BATCH_GB = 0.35 # 0.4 para LLaMA
BASE_MAX_LEN = 256
TARGET_MAX_LEN_UTILIZATION = 0.75
MAX_LEN_INCREMENT = 256
MAX_LEN_CAP = 256
DEFAULT_FALLBACK_CAP = 8192
TOKENIZER_SENTINEL_CAP = 10_000_000
# Custo empírico de RAM para o dataset (ajustar conforme modelo)
ESTIMATED_BASE_DATASET_RAM_GB = 1.50
COST_PER_INCREMENT_GB = 0.45 # 0.45 para LLaMA
# =========================================================================
# 1.7 DynamicAccumulationCallback
# =========================================================================
DYNAMIC_ACCUMULATION_MAX_STEPS = 256
DYNAMIC_ACCUMULATION_TARGET_UTIL = 0.99 # mantido para referência futura
DYNAMIC_ACCUMULATION_HIGH_RAM_LIMIT = 98.5
DYNAMIC_ACCUMULATION_LOW_RAM_LIMIT = 45.0
# =========================================================================
# 1.8 Ganchos opcionais para PyTorch
# =========================================================================
from contextlib import suppress
with suppress(ImportError):
import torch
if hasattr(torch, "set_num_threads"):
torch.set_num_threads(max(1, OMP_THREADS))
if hasattr(torch, "set_num_interop_threads"):
# Interop baixo ajuda a estabilidade — normalmente 1 ou 2 é suficiente
torch.set_num_interop_threads(max(1, min(2, DATALOADER_WORKERS)))
# =========================================================================
# 1.9 Outros parâmetros
# =========================================================================
OPTIMIZER_REFRESH_INTERVAL = 600 # segundos
# =========================================================================
# 2️.Configuração Inicial e Cálculos
# =========================================================================
# --- Funções de Ajuste Dinâmico ---
def ajustar_accumulation_steps(
base_batch: int = BASE_BATCH_SIZE,
target_utilization: float = TARGET_ACCUMULATION_RAM_UTILIZATION,
estimated_batch_gb: float = ESTIMATED_BATCH_GB,
min_steps: int = INITIAL_ACCUMULATION_MIN_STEPS,
max_steps: int = INITIAL_ACCUMULATION_MAX_STEPS
) -> int:
"""
Ajusta gradient_accumulation_steps com base na RAM disponível (somente no arranque).
Respeita limites próprios e separados da fase dinâmica.
"""
mem = psutil.virtual_memory()
free_gb = mem.available / (1024 ** 3)
available_ram_for_batch_gb = TOTAL_RAM_GB * target_utilization
max_batches_fit = int(available_ram_for_batch_gb / estimated_batch_gb)
steps = max(min_steps, min(max_batches_fit // base_batch, max_steps))
logging.info(f"Memória usada: {mem.percent:.1f}% ({free_gb:.1f} GB livres)")
logging.info(f"Ajustando gradient_accumulation_steps (inicial) → {steps}")
return steps
#----------------------------------------------------------
def ajustar_max_len(
tokenizer_model_max_length: int,
target_utilization: float = TARGET_MAX_LEN_UTILIZATION,
base_max_len: int = BASE_MAX_LEN,
increment: int = MAX_LEN_INCREMENT,
max_cap: int = MAX_LEN_CAP,
estimated_base_dataset_ram_gb: float = ESTIMATED_BASE_DATASET_RAM_GB,
cost_per_increment_gb: float = COST_PER_INCREMENT_GB
) -> int:
"""
Ajusta automaticamente MAX_LEN conforme RAM disponível e limite do modelo.
"""
# RAM que pode ser usada para o dataset tokenizado
target_dataset_ram_gb = TOTAL_RAM_GB * target_utilization
current_max_len = base_max_len
if target_dataset_ram_gb < estimated_base_dataset_ram_gb:
log_warning(f"RAM disponível ({target_dataset_ram_gb:.1f} GB) é menor que a RAM estimada para MAX_LEN base ({estimated_base_dataset_ram_gb:.1f} GB). Usando MAX_LEN={base_max_len}.")
return min(base_max_len, tokenizer_model_max_length)
# Quantidade de RAM que podemos usar para expandir o MAX_LEN além do custo base
expandable_ram_gb = target_dataset_ram_gb - estimated_base_dataset_ram_gb
if expandable_ram_gb > 0:
num_increments = int(expandable_ram_gb / cost_per_increment_gb)
current_max_len += num_increments * increment
# Limitar pelo MAX_LEN_CAP e pelo limite do modelo
final_max_len = min(current_max_len, max_cap, tokenizer_model_max_length)
# logging.info(f"RAM total: {TOTAL_RAM_GB:.1f} GB. RAM alvo para dataset: {target_dataset_ram_gb:.1f} GB.")
# logging.info(f"Ajustando MAX_LEN dinamicamente -> {final_max_len} (limite do modelo: {tokenizer_model_max_length})")
logging.info(
f"RAM total: {TOTAL_RAM_GB:.1f} GB. RAM alvo para dataset: {target_dataset_ram_gb:.1f} GB.")
logging.info(
f"Ajustando MAX_LEN dinamicamente -> {final_max_len} "
f"(limite efetivo do modelo/tokenizer: {tokenizer_model_max_length})"
)
return final_max_len
# ==============================
# 3.Optimizer Refresh Callback
# ==============================
class OptimizerRefreshCallback(TrainerCallback):
def __init__(self, refresh_interval_sec: int = OPTIMIZER_REFRESH_INTERVAL):
self.refresh_interval = refresh_interval_sec
self.last_refresh_time = time.time()
def on_step_end(self, args, state, control, **kwargs):
trainer = kwargs.get("trainer", None)
if trainer is None:
return control
now = time.time()
elapsed = now - self.last_refresh_time
if elapsed >= self.refresh_interval:
logging.info(f"🔄 Refresh do otimizador após {elapsed:.1f}s (passo {state.global_step})")
# Faz o refresh
if hasattr(trainer, "scaler") and trainer.scaler is not None:
trainer.scaler.step(trainer.optimizer)
trainer.scaler.update()
else:
trainer.optimizer.step()
trainer.optimizer.zero_grad()
self.last_refresh_time = now
return control
# ========================================================
# 4.CALLBACK DINÂMICO PARA AJUSTE DE GRADIENT ACCUMULATION
# ========================================================
class DynamicAccumulationCallback(TrainerCallback):
def __init__(
self,
base_batch: int = BASE_BATCH_SIZE,
max_steps: int = DYNAMIC_ACCUMULATION_MAX_STEPS,
high_ram_limit: int = DYNAMIC_ACCUMULATION_HIGH_RAM_LIMIT,
low_ram_limit: int = DYNAMIC_ACCUMULATION_LOW_RAM_LIMIT,
step_interval: int = 5, # nº de steps entre verificações
time_interval: int = 240 # intervalo mínimo em segundos (10 min)
):
self.base_batch = base_batch
self.max_steps = max_steps
self.high_ram_limit = high_ram_limit
self.low_ram_limit = low_ram_limit
self.step_interval = step_interval
self.time_interval = time_interval
# controle interno
self.last_check_step = 0
self.last_check_time = time.time()
def on_step_end(self, args, state, control, **kwargs):
global CURRENT_ACCUM_STEPS
current_step = state.global_step
now = time.time()
# --- 1️⃣ Verifica se há override manual ativo ---
if CURRENT_ACCUM_STEPS is not None:
adjusted_steps = max(1, min(self.max_steps, CURRENT_ACCUM_STEPS))
args.gradient_accumulation_steps = adjusted_steps
effective_batch = adjusted_steps * self.base_batch
train_progress["current_accum_steps"] = adjusted_steps
logging.info(
f"🧭 Ajuste manual aplicado → gradient_accumulation_steps = {adjusted_steps} "
f"(Effective Batch: {effective_batch})"
)
CURRENT_ACCUM_STEPS = None
return control
# --- 2️⃣ Verifica se já passou o intervalo mínimo ---
step_diff = current_step - self.last_check_step
time_diff = now - self.last_check_time
if step_diff < self.step_interval and time_diff < self.time_interval:
# ainda não é hora de checar novamente
return control
# Atualiza controle de tempo e step
self.last_check_step = current_step
self.last_check_time = now
# --- 3️⃣ Ajuste automático baseado na RAM ---
mem = psutil.virtual_memory()
used = mem.percent
current_steps = args.gradient_accumulation_steps
# RAM alta → reduzir steps
if used > self.high_ram_limit and current_steps > 1:
new_steps = max(1, math.floor(current_steps / 1.6))
args.gradient_accumulation_steps = new_steps
new_effective_batch = new_steps * self.base_batch # ✅ novo valor após ajuste
train_progress["current_accum_steps"] = new_steps
logging.warning(
f"⚠️ RAM alta ({used:.1f}%) → diminuindo accumulation_steps "
f"de {current_steps} para {new_steps} "
f"(Effective Batch: {new_effective_batch})"
)
# RAM ociosa → aumentar steps
elif used < self.low_ram_limit and current_steps < self.max_steps:
new_steps = current_steps + 1
args.gradient_accumulation_steps = new_steps
new_effective_batch = new_steps * self.base_batch # ✅ recalculado após aumento
train_progress["current_accum_steps"] = new_steps
logging.info(
f"💡 RAM ociosa ({used:.1f}%) → aumentando accumulation_steps "
f"de {current_steps} para {new_steps} "
f"(Effective Batch: {new_effective_batch})"
)
return control
# ================================================================
# Sistema de Logs Coloridos
# ================================================================
class Colors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
def _timestamp():
return datetime.now().strftime("%H:%M:%S")
def log_info(msg):
ts = _timestamp()
print(f"{Colors.OKCYAN}[INFO]{Colors.ENDC} [{ts}] {msg}")
training_logs.append(f"[INFO] [{ts}] {msg}")
def log_success(msg):
ts = _timestamp()
print(f"{Colors.OKGREEN}[SUCESSO]{Colors.ENDC} [{ts}] {msg}")
training_logs.append(f"[SUCESSO] [{ts}] {msg}")
def log_warning(msg):
ts = _timestamp()
print(f"{Colors.WARNING}[AVISO]{Colors.ENDC} [{ts}] {msg}")
training_logs.append(f"[AVISO] [{ts}] {msg}")
def log_error(msg):
ts = _timestamp()
print(f"{Colors.FAIL}[ERRO]{Colors.ENDC} [{ts}] {msg}")
training_logs.append(f"[ERRO] [{ts}] {msg}")
def log_step(msg, step=None, percent=None):
ts = _timestamp()
progress = f" | Passo {step}" if step is not None else ""
pct = f" ({percent:.1f}%)" if percent is not None else ""
print(f"{Colors.OKBLUE}[{ts}]{Colors.ENDC} {msg}{progress}{pct}")
training_logs.append(f"[{ts}] {msg}{progress}{pct}")
# ================================================================
# Estado partilhado entre callbacks
# (anteriormente espalhado em múltiplas globais)
# ================================================================
_step_state = {
"last_step_time": time.time(), # usado por SimpleStepTimerCallback
"last_log_time": time.time(), # usado por LogCallback
"last_logged_step": -1, # usado por LogCallback (evita log duplicado)
}
# ================================================================
# 1. SimpleStepTimerCallback
# Loga o tempo de cada passo individual em segundos.
# ================================================================
class SimpleStepTimerCallback(TrainerCallback):
def on_step_begin(self, args, state, control, **kwargs):
if state.is_local_process_zero:
_step_state["last_step_time"] = time.time()
def on_step_end(self, args, state, control, **kwargs):
if not state.is_local_process_zero or state.global_step == 0:
return
elapsed = time.time() - _step_state["last_step_time"]
msg = (
f"PASSO: - - | {state.global_step}/{state.max_steps} |"
f" Tempo: | {elapsed:.3f}s |"
)
logging.info(f">> {msg}")
log_info(msg) # regista em training_logs com timestamp
# ================================================================
# 2. LogCallback
# Loga métricas de treino/avaliação e actualiza train_progress.
# ================================================================
class LogCallback(TrainerCallback):
def on_log(self, args, state, control, logs=None, **kwargs):
global train_progress
if not state.is_local_process_zero or not logs:
return
step = state.global_step
total_steps = state.max_steps
# Evita repetir o log para o mesmo step
if step == _step_state["last_logged_step"]:
return
_step_state["last_logged_step"] = step
now = time.time()
elapsed = now - _step_state["last_log_time"]
_step_state["last_log_time"] = now
loss = logs.get("loss")
eval_loss = logs.get("eval_loss")
lr = logs.get("learning_rate")
accum = train_progress.get("current_accum_steps",
args.gradient_accumulation_steps)
mem = psutil.virtual_memory()
if loss is not None and total_steps > 0:
percent = step / total_steps * 100
msg = (
f"Treino: step {step}/{total_steps} | Perda: {loss:.4f} | "
f"Tempo/step: {elapsed:.2f}s | Memória usada: {mem.percent:.1f}% | "
f"LR: {lr:.2e} | Accum: {accum} |"
)
log_step(msg, step=step, percent=percent)
train_progress.update({
"current": step,
"total": total_steps,
"percent": int(percent),
"status": "training",
"message": msg,
})
elif eval_loss is not None:
msg = f"Avaliação: step {step}/{total_steps} | Eval Loss: {eval_loss:.4f}"
try:
perplexity = math.exp(eval_loss) if eval_loss < 20 else float("inf")
msg += f" | Perplexidade: {perplexity:.2f}"
except Exception:
pass
log_info(msg)
train_progress["message"] = msg
def on_epoch_end(self, args, state, control, **kwargs):
global epoch_losses
if not state.is_local_process_zero:
return
epoch = int(state.epoch)
# Último log desta época com loss de treino (sem eval_loss)
train_log = next(
(l for l in reversed(state.log_history)
if "loss" in l and "epoch" in l
and int(l["epoch"]) == epoch
and "eval_loss" not in l),
None
)
# Último log desta época com eval_loss
eval_log = next(
(l for l in reversed(state.log_history)
if "eval_loss" in l and "epoch" in l
and int(l["epoch"]) == epoch),
None
)
parts = []
if train_log:
loss = train_log["loss"]
parts.append(f"Perda Média de Treino: {loss:.4f}")
epoch_losses.append({"epoch": epoch, "loss": loss})
if eval_log:
eval_loss = eval_log["eval_loss"]
parts.append(f"Perda de Avaliação: {eval_loss:.4f}")
suffix = " | ".join(parts) if parts else ""
log_success(f"Fim da Época {epoch}" + (f" | {suffix}" if suffix else ""))
# ================================================================
# 3. DynamicAccumulationCallback
# Ajuste dinâmico de gradient_accumulation_steps via RAM.
# ================================================================
class DynamicAccumulationCallback(TrainerCallback):
def __init__(
self,
base_batch: int = BASE_BATCH_SIZE,
max_steps: int = DYNAMIC_ACCUMULATION_MAX_STEPS,
high_ram_limit: int = DYNAMIC_ACCUMULATION_HIGH_RAM_LIMIT,
low_ram_limit: int = DYNAMIC_ACCUMULATION_LOW_RAM_LIMIT,
step_interval: int = 5, # nº de steps entre verificações automáticas
time_interval: int = 240, # intervalo mínimo em segundos
):
self.base_batch = base_batch
self.max_steps = max_steps
self.high_ram_limit = high_ram_limit
self.low_ram_limit = low_ram_limit
self.step_interval = step_interval
self.time_interval = time_interval
self._last_check_step = 0
self._last_check_time = time.time()
def on_step_end(self, args, state, control, **kwargs):
global CURRENT_ACCUM_STEPS
# 1. Override manual activo → aplica e limpa
if CURRENT_ACCUM_STEPS is not None:
new = max(1, min(self.max_steps, CURRENT_ACCUM_STEPS))
args.gradient_accumulation_steps = new
train_progress["current_accum_steps"] = new
logging.info(
f"🧭 Ajuste manual aplicado → gradient_accumulation_steps = {new} "
f"(Effective Batch: {new * self.base_batch})"
)
CURRENT_ACCUM_STEPS = None
return control
# 2. Ainda não é hora de verificar?
step_diff = state.global_step - self._last_check_step
time_diff = time.time() - self._last_check_time
if step_diff < self.step_interval and time_diff < self.time_interval:
return control
self._last_check_step = state.global_step
self._last_check_time = time.time()
# 3. Ajuste automático baseado na RAM
used = psutil.virtual_memory().percent
current = args.gradient_accumulation_steps
if used > self.high_ram_limit and current > 1:
new = max(1, math.floor(current / 1.6))
args.gradient_accumulation_steps = new
train_progress["current_accum_steps"] = new
logging.warning(
f"⚠️ RAM alta ({used:.1f}%) → diminuindo accumulation_steps "
f"de {current} para {new} "
f"(Effective Batch: {new * self.base_batch})"
)
elif used < self.low_ram_limit and current < self.max_steps:
new = current + 1
args.gradient_accumulation_steps = new
train_progress["current_accum_steps"] = new
logging.info(
f"💡 RAM ociosa ({used:.1f}%) → aumentando accumulation_steps "
f"de {current} para {new} "
f"(Effective Batch: {new * self.base_batch})"
)
return control
# =========================================================================
# Variáveis Globais
# =========================================================================
model = None
tokenizer = None
chat_model = None
chat_tokenizer = None
gguf_logs = []
gguf_status = {"status": "idle", "message": ""}
train_progress = {
"current": 0,
"total": 1,
"percent": 0,
"status": "not started",
"message": "Aguardando início do treino.",
}
training_logs = [] # linhas de log acumuladas durante o treino
epoch_losses = []
all_data = [] # dados do último treino (permite continuar)
dataset_logs = []
dataset_status = {"status": "idle", "records": 0, "message": ""}
CURRENT_ACCUM_STEPS = None # None = sem override manual activo
LAST_ACCUM_ORIGIN = None # "manual" | "auto" | None
TOTAL_TRAIN_STEPS = None # preenchido após tokenização/Trainer
BASE_BATCH_SIZE_EFFECTIVE = 4 # usado em /api/train_status para Effective Batch
# =========================================================================
# Cálculos iniciais de acumulação
# =========================================================================
ACCUMULATION_STEPS = ajustar_accumulation_steps()
EFFECTIVE_BATCH_SIZE = BASE_BATCH_SIZE * ACCUMULATION_STEPS
LAST_ACCUM_ORIGIN = "auto"
BASE_BATCH_SIZE_EFFECTIVE = EFFECTIVE_BATCH_SIZE
# Log após o cálculo — agora ACCUMULATION_STEPS tem valor real
training_logs.append(f"[INFO] accumulation inicial = {ACCUMULATION_STEPS} (auto)")
print(f"torch.get_num_threads(): {torch.get_num_threads()}")
print(f"torch.get_num_interop_threads(): {torch.get_num_interop_threads()}")
#----------------------------
# Função principal de treino
#----------------------------
def train_model_lora(
file_data, epochs, model_path, output_dir, accumulation_steps,
dataloader_workers, mode, lora_adapter_to_load=None,
initial_epochs_completed=0, train_mode="new_train"
):
"""
Função principal de treino LoRA (Low-Rank Adaptation).
CPU-only: XPU/CUDA removidos por decisão de arquitectura.
"""
# ---------------- VARIÁVEIS GLOBAIS ----------------
global training_logs, train_progress, epoch_losses, all_data
training_logs.clear()
all_data = file_data
# ---------------- ESTADO INICIAL ----------------
initial_status_message = (
"Retomando treino a partir de adaptador LoRA salvo..."
if lora_adapter_to_load else
"Iniciando processo de treino..."
)
train_progress.update({
"current": 0,
"total": 1,
"percent": 0,
"status": "starting",
"message": initial_status_message,
})
log_info(initial_status_message)
log_info(f"OMP Threads: {OMP_THREADS}, Dataloader Workers: {DATALOADER_WORKERS}")
log_info(f"Batch base: {BASE_BATCH_SIZE}, Steps iniciais: {ACCUMULATION_STEPS}")
log_info(f"Batch efetivo inicial: {EFFECTIVE_BATCH_SIZE}")
# ---------------- AUXILIAR: detetar camadas LoRA ----------------
def guess_lora_targets(model):
names = {name for name, _ in model.named_modules()}
if any("q_proj" in n and "v_proj" in n for n in names):
return ["q_proj", "v_proj"]
if any("query_key_value" in n for n in names):
return ["query_key_value"]
if any("Wqkv" in n for n in names):
return ["Wqkv"]
if any("c_attn" in n for n in names):
return ["c_attn"]
log_warning("Não foi possível detetar os target_modules. Usando ['q_proj', 'v_proj'] por defeito.")
return ["q_proj", "v_proj"]
# ---------------- BLOCO 1: Carregar Modelo e Tokenizer ----------------
try:
log_step("A carregar modelo e tokenizer base...")
os.makedirs("offload", exist_ok=True)
global tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
log_info(
f"Tokens antes do ajuste: PAD={tokenizer.pad_token}, "
f"EOS={tokenizer.eos_token}, BOS={tokenizer.bos_token}"
)
if tokenizer.pad_token is None:
tokenizer.add_special_tokens({'pad_token': '<PAD>'})
if tokenizer.eos_token is None:
tokenizer.add_special_tokens({'eos_token': '</s>'})
if tokenizer.bos_token is None:
tokenizer.add_special_tokens({'bos_token': '<s>'})
if tokenizer.unk_token is None:
tokenizer.add_special_tokens({'unk_token': '<unk>'})
log_info(
f"Tokens após ajuste: PAD={tokenizer.pad_token}, "
f"EOS={tokenizer.eos_token}, BOS={tokenizer.bos_token}, "
f"UNK={tokenizer.unk_token}"
)
# CPU-only: bf16 é ~2× mais rápido que float32 em CPU moderno
model = AutoModelForCausalLM.from_pretrained(
model_path,
device_map="cpu",
offload_folder="offload",
dtype=torch.bfloat16,
low_cpu_mem_usage=True,
)
# model.resize_token_embeddings(len(tokenizer))
model.resize_token_embeddings(len(tokenizer), mean_resizing=False)
model.config.use_cache = False
model.gradient_checkpointing_enable()
log_info("Modelo e tokenizer carregados (CPU / bfloat16).")
# --- Variáveis de controlo de fluxo (usadas mais abaixo) ---
resume_from_trainer_checkpoint = None
lora_adapter_path_final = os.path.join(output_dir, "lora_model")
lora_adapter_to_load_for_training = None
lora_model_loaded_for_initial_eval = False
#----------------------------------------------------------------------------------
# --- Lógica de 4 opções ---
# Prioridade:
#1. Checkpoint do Trainer
#2. LoRA Final Salvo
#3. LoRA passado como parâmetro (para continuar)
#4. Novo LoRA
# 1. Tentar encontrar o checkpoint mais recente do Trainer
checkpoint_dirs = [d for d in os.listdir(output_dir) if d.startswith("checkpoint-")]
if checkpoint_dirs:
checkpoint_dirs.sort(key=lambda x: int(x.split('-')[1]), reverse=True)
latest_candidate_checkpoint_dir = os.path.join(output_dir, checkpoint_dirs[0])
trainer_state_exists = os.path.exists(os.path.join(latest_candidate_checkpoint_dir, "trainer_state.json"))
lora_adapter_exists_in_checkpoint = (
os.path.exists(os.path.join(latest_candidate_checkpoint_dir, "adapter_model.safetensors")) or
os.path.exists(os.path.join(latest_candidate_checkpoint_dir, "adapter_model.bin"))
)
if trainer_state_exists and lora_adapter_exists_in_checkpoint:
resume_from_trainer_checkpoint = latest_candidate_checkpoint_dir
lora_adapter_to_load_for_training = latest_candidate_checkpoint_dir # Usar este LoRA para o treino
log_info(f"Opção 1: Encontrado checkpoint COMPLETO do Trainer em: {resume_from_trainer_checkpoint}. O treino será retomado daqui.")
# Extrair initial_epochs_completed do trainer_state.json
trainer_state_path = os.path.join(resume_from_trainer_checkpoint, "trainer_state.json")
if os.path.exists(trainer_state_path):
with open(trainer_state_path, 'r') as f:
trainer_state = json.load(f)
initial_epochs_completed = int(trainer_state.get('epoch', 0))
log_info(f"Checkpoint do Trainer encontrado. Épocas já completadas: {initial_epochs_completed}")
else:
log_warning(f"trainer_state.json não encontrado em {resume_from_trainer_checkpoint}. Assumindo 0 épocas completadas.")
else:
log_info(f"Diretório '{latest_candidate_checkpoint_dir}' encontrado, mas NÃO é um checkpoint COMPLETO do Trainer (faltam trainer_state.json ou arquivos do adaptador LoRA).")
#------------------------------------------------------------------------------------------------------------------------------------------------
# 2. Se não encontrou checkpoint do Trainer, tentar carregar um modelo LoRA final salvo (para avaliação/decisão)
# Esta opção deve ter prioridade sobre o LoRA final salvo, se o usuário explicitamente pediu para continuar com um LoRA.
if not resume_from_trainer_checkpoint and os.path.exists(lora_adapter_path_final) and any(os.scandir(lora_adapter_path_final)):
try:
# Carrega o LoRA final para avaliação/decisão, mas não para retomar o treino do Trainer
model = PeftModel.from_pretrained(model, lora_adapter_path_final, device_map=model_device_map)
for name, param in model.named_parameters():
if "lora" in name:
param.requires_grad = True
lora_adapter_to_load_for_training = lora_adapter_to_load # Usar este LoRA para o treino
log_info(f"Opção 2: Carregado adaptador LoRA de '{lora_adapter_to_load}' para continuar o treino.")
# Se estamos continuando, precisamos saber as épocas já completadas.
# Se lora_adapter_to_load é um checkpoint, podemos tentar ler o trainer_state.json
# Caso contrário, assumimos que é um LoRA final e o treino começa "do zero" em termos de Trainer state.
if "checkpoint-" in lora_adapter_to_load: # Se o LoRA passado é um checkpoint
trainer_state_path = os.path.join(lora_adapter_to_load, "trainer_state.json")
if os.path.exists(trainer_state_path):
with open(trainer_state_path, 'r') as f:
trainer_state = json.load(f)
initial_epochs_completed = int(trainer_state.get('epoch', 0))
log_info(f"LoRA de checkpoint encontrado. Épocas já completadas: {initial_epochs_completed}")
else:
log_warning(f"trainer_state.json não encontrado em {lora_adapter_to_load}. Assumindo 0 épocas completadas para continuação.")
else:
# Se é um LoRA final, não há estado de Trainer para retomar, então initial_epochs_completed permanece 0
log_info("LoRA final carregado para continuação. Treino do Trainer começará do zero.")
except Exception as e:
log_error(f"Falha ao carregar LoRA de '{lora_adapter_to_load}' para continuar o treino: {e}. Iniciando um novo LoRA.")
lora_adapter_to_load = None
lora_adapter_to_load_for_training = None # Resetar para criar um novo LoRA
# 3. Se não há checkpoint do Trainer nem LoRA final, mas um LoRA foi passado para continuar
# Esta opção só deve ser considerada se NENHUMA das opções anteriores foi ativada.
if not resume_from_trainer_checkpoint and not lora_adapter_to_load_for_training and os.path.exists(lora_adapter_path_final) and any(os.scandir(lora_adapter_path_final)):
try:
# Carrega o LoRA final para avaliação/decisão, mas não para retomar o treino do Trainer
model = PeftModel.from_pretrained(model, lora_adapter_path_final, device_map=model_device_map)
for name, param in model.named_parameters():
if "lora" in name:
param.requires_grad = True
lora_model_loaded_for_initial_eval = True
lora_adapter_to_load_for_training = lora_adapter_path_final # Usar este LoRA para o treino
log_info(f"Opção 3: Carregado adaptador LoRA final de: {lora_adapter_path_final}. Será avaliado e aguardará decisão do usuário.")
# Se carregamos um LoRA final, não há estado de Trainer para retomar, então initial_epochs_completed permanece 0
initial_epochs_completed = 0 # Garante que é 0 se não veio de um checkpoint completo
except Exception as e:
log_warning(f"Falha ao carregar LoRA final de {lora_adapter_path_final}: {e}. Prosseguindo sem ele.")
lora_adapter_path_final = None
lora_model_loaded_for_initial_eval = False
lora_adapter_to_load_for_training = None # Resetar para criar um novo LoRA
# --- Carregar o LoRA no modelo base ANTES de inicializar o Trainer ---
# Este bloco garante que o modelo base tenha o LoRA aplicado antes do Trainer ser instanciado.
# Ele só deve ser executado se um LoRA foi identificado para ser carregado
# E se o modelo ainda não é um PeftModel (ou seja, não foi carregado nas opções 1, 2 ou 3 acima)
if lora_adapter_to_load_for_training and not isinstance(model, PeftModel):
try:
model = PeftModel.from_pretrained(model, lora_adapter_to_load_for_training, device_map=model_device_map)
for name, param in model.named_parameters():
if "lora" in name:
param.requires_grad = True
log_info(f"Aplicado adaptador LoRA '{lora_adapter_to_load_for_training}' ao modelo base.")
except Exception as e:
log_error(f"Falha crítica ao aplicar LoRA '{lora_adapter_to_load_for_training}' ao modelo base: {e}. Iniciando um novo LoRA.")
lora_adapter_to_load_for_training = None
resume_from_trainer_checkpoint = None # Não podemos retomar o Trainer se o LoRA falhou
initial_epochs_completed = 0 # Resetar se falhou
# 4. Se nenhuma das opções acima, criar um novo adaptador LoRA
if not lora_adapter_to_load_for_training: # Se nenhum LoRA foi carregado/selecionado
targets = guess_lora_targets(model)
lora_config = LoraConfig(
r=16, lora_alpha=32, target_modules=targets, lora_dropout=0.05, bias="none", task_type=TaskType.CAUSAL_LM
)
model = get_peft_model(model, lora_config)
log_info(f"Opção 1: Nenhum checkpoint ou LoRA anterior encontrado. Novo adaptador LoRA criado (targets={targets}).")
resume_from_trainer_checkpoint = None # Se é um novo LoRA, não há estado de Trainer para retomar
if hasattr(model, "print_trainable_parameters"):
model.print_trainable_parameters()
else:
log_warning("print_trainable_parameters() não disponível neste modelo base.")
#-----------------------
# Antes de calcular total_target_epochs, ajuste initial_epochs_completed
# Determine o número de épocas que o Trainer deve executar
# Este é o valor que será passado para num_train_epochs do TrainingArguments
if train_mode == "new_train":
# Para um novo treino, o Trainer deve executar 'epochs' vezes.
# initial_epochs_completed é 0, o que é handled pelo Trainer.
num_epochs_for_trainer = float(epochs)
log_info(f"Modo 'new_train' detectado. O treino será executado por {num_epochs_for_trainer} épocas.")
elif train_mode == "continue_train":
# Para continuar, o Trainer deve executar até o 'initial_epochs_completed + epochs' total.
# Se você quer fazer mais 'X' épocas, o total final será initial_epochs_completed + X.
num_epochs_for_trainer = float(initial_epochs_completed + epochs)
log_info(f"Modo 'continue_train' detectado. Treino retomará da época {initial_epochs_completed} e visa um total de {num_epochs_for_trainer} épocas.")
else: # Default para new_train
log_warning(f"Modo de treino desconhecido: {train_mode}. Assumindo 'new_train'.")
num_epochs_for_trainer = float(epochs)
#--------------------------------------------------------------------------------------------
model.train()
total_target_epochs = initial_epochs_completed + epochs
# ── Limites de contexto do modelo e tokenizer ──────────────────────
max_len_model = getattr(model.config, "max_position_embeddings", None)
if not (isinstance(max_len_model, int) and max_len_model > 0):
max_len_model = DEFAULT_FALLBACK_CAP
tok_max_len_raw = getattr(tokenizer, "model_max_length", None)
if (tok_max_len_raw is None
or not isinstance(tok_max_len_raw, int)
or tok_max_len_raw <= 0
or tok_max_len_raw > TOKENIZER_SENTINEL_CAP):
tok_max_len_raw = DEFAULT_FALLBACK_CAP
effective_model_max_len = min(max_len_model, tok_max_len_raw)
DYNAMIC_MAX_LEN = ajustar_max_len(
tokenizer_model_max_length=effective_model_max_len,
target_utilization=TARGET_MAX_LEN_UTILIZATION,
base_max_len=BASE_MAX_LEN,
increment=MAX_LEN_INCREMENT,
max_cap=MAX_LEN_CAP,
estimated_base_dataset_ram_gb=ESTIMATED_BASE_DATASET_RAM_GB,
cost_per_increment_gb=COST_PER_INCREMENT_GB,
)
logging.info(f"MAX_LEN dinâmico definido para: {DYNAMIC_MAX_LEN}")
# ── Dataset: carregar do disco ou tokenizar ────────────────────────
tokenized_path = os.path.join(output_dir, "tokenized_dataset")
#----------------------------------------------------------------------------
# Depois de tokenizar o dataset
# enviar para bucket
#----------------------------------------------------------------------------
if os.path.exists(tokenized_path):
log_info("Dataset tokenizado encontrado no disco. A carregar...")
tokenized_datasets = load_from_disk(tokenized_path)
else:
training_data = []
for item in file_data:
if "prompt" in item and "completion" in item:
prompt = item.get("prompt", "")
completion = item.get("completion", "")
if isinstance(prompt, str) and isinstance(completion, str) and prompt and completion:
training_data.append({"text": f"<s>[INST] {prompt.strip()} [/INST] {completion.strip()}</s>"})
elif "text" in item and isinstance(item["text"], str) and item["text"].strip():
training_data.append({"text": f"<s>{item['text'].strip()}</s>"})
if not training_data:
log_error("Nenhum dado válido encontrado para treino após formatação.")
train_progress.update({"status": "error", "message": "Nenhum dado válido para treino."})
return
def tokenize_function(examples):
return tokenizer(examples["text"], truncation=True, max_length=DYNAMIC_MAX_LEN, padding=False)
tokenized_datasets = Dataset.from_list(training_data).map(
tokenize_function,
batched=True,
num_proc=DATALOADER_WORKERS,
remove_columns=["text"],
)
tokenized_datasets.save_to_disk(tokenized_path)
log_success(f"Dataset tokenizado salvo em {tokenized_path} com MAX_LEN={DYNAMIC_MAX_LEN}")
split_dataset = tokenized_datasets.train_test_split(test_size=0.1, seed=42)
train_dataset = split_dataset["train"]
eval_dataset = split_dataset["test"]
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
# ── TrainingArguments ──────────────────────────────────────────────
# ATENÇÃO: num_epochs_for_trainer deve ser definido antes desta função.
# Se não existir, usa `epochs` do argumento como fallback seguro.
_epochs_for_trainer = num_epochs_for_trainer if 'num_epochs_for_trainer' in dir() else epochs
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=_epochs_for_trainer,
per_device_train_batch_size=BASE_BATCH_SIZE,
per_device_eval_batch_size=BASE_EVAL_SIZE,
gradient_accumulation_steps=ACCUMULATION_STEPS,
learning_rate=BASE_LEARNING_RATE,
warmup_ratio=WARMUP_RATIO,
lr_scheduler_type=LR_SCHEDULER_TYPE,
weight_decay=WEIGHT_DECAY,
logging_steps=LOGGIN_STEPS,
save_strategy=SAVE_STRATEGY,
save_steps=SAVE_STEPS,
eval_steps=EVAL_STEPS,
optim=OPTIM,
dataloader_num_workers=DATALOADER_WORKERS,
bf16=False,
max_grad_norm=1.0,
report_to="none",
save_total_limit=0,
seed=42,
)
# ── Trainer ────────────────────────────────────────────────────────
# NOTA: DynamicAccumulationCallback(**dynacc_kwargs) — instância com ()
# O bloco inspect.signature foi removido: os parâmetros do callback são
# conhecidos e fixos; passar kwargs directamente é mais claro e seguro.
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
data_collator=data_collator,
callbacks=[
LogCallback(),
DynamicAccumulationCallback(
base_batch=BASE_BATCH_SIZE,
max_steps=DYNAMIC_ACCUMULATION_MAX_STEPS,
),
SimpleStepTimerCallback(),
OptimizerRefreshCallback(refresh_interval_sec=OPTIMIZER_REFRESH_INTERVAL),
],
)
# --- ADD: resumo final com steps e accumulation/EB
try:
global TOTAL_TRAIN_STEPS
# tentar ler steps efetivos do Trainer ao final
steps_final = None
if hasattr(trainer, "state") and getattr(trainer.state, "global_step", None) is not None:
steps_final = trainer.state.global_step
elif hasattr(trainer, "state") and getattr(trainer.state, "max_steps", None):
steps_final = trainer.state.max_steps
if steps_final is not None and isinstance(steps_final, int) and steps_final > 0:
TOTAL_TRAIN_STEPS = steps_final # ajusta se necessário
origem = LAST_ACCUM_ORIGIN or "desconhecido"
base_bsz = BASE_BATCH_SIZE_EFFECTIVE if 'BASE_BATCH_SIZE_EFFECTIVE' in globals() and BASE_BATCH_SIZE_EFFECTIVE else BASE_BATCH_SIZE
eb = (base_bsz * CURRENT_ACCUM_STEPS) if CURRENT_ACCUM_STEPS else None
training_logs.append("[SUCESSO] Treino concluído (resumo):")
training_logs.append(f" - total_train_steps = {TOTAL_TRAIN_STEPS}")
if CURRENT_ACCUM_STEPS is not None:
training_logs.append(f" - gradient_accumulation_steps final = {CURRENT_ACCUM_STEPS} ({origem})")
if eb is not None:
training_logs.append(f" - Effective Batch final (base={base_bsz}) = {eb}")
except Exception as _e:
training_logs.append(f"[AVISO] não foi possível gerar resumo final detalhado ({_e})")
# --- END ADD
# Estimar total de steps antes do treino
try:
steps_estimados = trainer.args.max_steps
if steps_estimados <= 0:
steps_estimados = len(split_dataset["train"]) // (BASE_BATCH_SIZE * ACCUMULATION_STEPS) * int(epochs)
training_logs.append(f"[INFO] [{datetime.now().strftime('%H:%M:%S')}] A iniciar iteração do treino (0/{steps_estimados} passos estimados)...")
except Exception:
training_logs.append(f"[INFO] [{datetime.now().strftime('%H:%M:%S')}] A iniciar iteração do treino...")
# ── Log de acumulação pré-treino (se override manual activo) ───────
if CURRENT_ACCUM_STEPS is not None:
origem = LAST_ACCUM_ORIGIN or "desconhecido"
training_logs.append(
f"[INFO] gradient_accumulation_steps atual (pré-train): {CURRENT_ACCUM_STEPS} ({origem})"
)
eb = BASE_BATCH_SIZE_EFFECTIVE * CURRENT_ACCUM_STEPS
training_logs.append(f"[INFO] Effective Batch inicial (base={BASE_BATCH_SIZE_EFFECTIVE}) = {eb}")
#------------------------------------------------------------------------------------------------------------------
#INICIO DO TREINO E SALVAMENTO
#-----------------------------------------------------------------------------
try:
log_success("Configuração do Trainer concluída. A iniciar o treino...")
train_progress.update({
"status": "training",
"message": "Iniciando o treinamento do modelo..."
})
trainer_output = trainer.train(resume_from_checkpoint=resume_from_trainer_checkpoint)
# -------------------------------
# Salvar adapter LoRA localmente
# -------------------------------
lora_model_path = os.path.join(output_dir, "lora_model")
os.makedirs(lora_model_path, exist_ok=True)
model.save_pretrained(lora_model_path)
tokenizer.save_pretrained(lora_model_path)
training_logs.append("Adapter LoRA salvo com sucesso!")
# ================================================
# ENVIAR LoRA PARA BUCKET
# ================================================
# ---------------------------------
# Avaliação final do modelo
# ---------------------------------
log_info("A avaliar o modelo final no dataset de validação...")
final_metrics = trainer.evaluate()
eval_loss = final_metrics.get("eval_loss")
if eval_loss is not None:
perplexity = math.exp(eval_loss)
log_success("Avaliação Final Concluída:")
log_info(f" -> Eval Loss: {eval_loss:.4f}")
log_info(f" -> Perplexity: {perplexity:.4f}")
with open(os.path.join(output_dir, "final_metrics.txt"), "w") as f:
f.write(f"Eval Loss: {eval_loss}\nPerplexity: {perplexity}\n")
else:
log_warning("Não foi possível obter 'eval_loss' das métricas finais.")
train_progress.update({
"status": "awaiting_merge",
"percent": 100,
"message": "Treino concluído. LoRA salvo. Decida a próxima ação."
})
training_logs.append("Treino concluído. Adapter LoRA salvo em " + lora_model_path)
log_success("Treino concluído. Adapter LoRA salvo. Aguardando decisão (merge ou continuar).")
except RuntimeError as e:
msg = str(e)
training_logs.append(f"Erro durante treino: {msg}")
train_progress["status"] = "error"
log_error(f"Erro de Runtime durante o treino: {e}")
traceback.print_exc()
except Exception as e:
log_error(f"Erro crítico durante o treino: {e}")
train_progress.update({"status": "error", "message": f"Erro crítico no treino: {e}"})
traceback.print_exc()
return
#----------------------------------------------------
TENSORBOARD_LOGDIR = "trained_model_output\logs"
TENSORBOARD_PORT = 6006
def run_tensorboard():
# Lança o TensorBoard em thread separada para não bloquear
subprocess.Popen([
"tensorboard",
f"--logdir={TENSORBOARD_LOGDIR}",
f"--port={TENSORBOARD_PORT}"
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
#-----------------------------------------------------------
#----------------------
# Rotas FLASK
#----------------------
@app.route('/')
def home():
""" Rota principal que serve o ficheiro index.html. """
return render_template('index-6.html')
#---------------------------------------
def guess_lora_targets(model):
"""
Função auxiliar para identificar automaticamente os módulos alvo para LoRA.
Analisa camadas lineares no modelo e retorna uma lista de nomes de camadas candidatas.
"""
targets = []
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear) and module.weight.requires_grad:
targets.append(name)
if not targets:
targets = ["q_proj", "v_proj"] # fallback padrão usado por PEFT
return targets
#-------------------------------------
# ROTA DE TREINAMENTO
#-------------------------------------
@app.route('/api/train', methods=['POST'])
def handle_train_request():
global all_data
try:
model_path = request.form.get('model_path')
epochs = int(request.form.get('epochs'))
uploaded_files = request.files.getlist('files')
logging.info(f"🚀 Novo treino iniciado")
logging.info(f"Modelo: {model_path}")
logging.info(f"Épocas: {epochs}")
if not all([model_path, epochs, uploaded_files]):
return jsonify({"status": "error", "message": "Faltam parâmetros: modelo, épocas ou ficheiros."}), 400
# Limpa os dados de treinos anteriores
all_data = []
for file in uploaded_files:
try:
raw_content = file.read()
encoding = chardet.detect(raw_content)['encoding'] or 'utf-8'
content = raw_content.decode(encoding)
for line in content.splitlines():
if not line.strip(): continue
if file.filename.endswith('.jsonl'):
all_data.append(json.loads(line))
else: # Assume .txt ou outros formatos de texto
all_data.append({"text": line.strip()})
except Exception as e:
return jsonify({"status": "error", "message": f"Erro ao ler ficheiro {file.filename}: {e}"}), 400
if not all_data:
return jsonify({"status": "error", "message": "Nenhum dado válido encontrado nos ficheiros."}), 400
output_dir = FINAL_OUTPUT_DIR
os.makedirs(output_dir, exist_ok=True)
training_logs.append(f"[DEBUG] CWD: {os.getcwd()}")
training_logs.append(f"[DEBUG] OUTPUT_DIR: {output_dir}")
training_logs.append(f"[DEBUG] FILES: {os.listdir(os.getcwd())}")
# Passar um flag indicando que é um "novo" treino (ou re-treino)
thread = Thread(target=train_model_lora, args=(all_data, epochs, model_path, output_dir, ACCUMULATION_STEPS, DATALOADER_WORKERS, "new_train"))
thread.start()
return jsonify({"status": "started", "message": "Requisição de treino recebida. O processo foi iniciado."})
except Exception as e:
return jsonify({"status": "error", "message": f"Erro no servidor: {e}"}), 500
#-------------------------------------------
@app.route("/api/dataset/process", methods=["POST"])
def dataset_process():
global dataset_logs, dataset_status
file = request.files.get("file")
if not file or file.filename == "":
return jsonify({"error": "Nenhum ficheiro enviado."}), 400
# Parâmetros do formulário
schema = request.form.get("schema", "lite")
fmt = request.form.get("format", "jsonl")
chunk_size = int(request.form.get("chunk_size", 1200))
overlap = int(request.form.get("overlap", 150))
chunk_mode = request.form.get("chunk_mode", "chars")
clean_level = request.form.get("clean_level", "basic")
dedup = request.form.get("dedup", "0") == "1"
auto_pc = request.form.get("auto_pc", "0") == "1"
min_chars = int(request.form.get("min_chars", 0))
# Guardar ficheiro recebido em temp
import tempfile, threading
suffix = os.path.splitext(file.filename)[1].lower()
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
file.save(tmp.name)
tmp.close()
dataset_logs.clear()
dataset_status.update({"status": "processing", "records": 0, "message": "A iniciar..."})
def _run():
global dataset_logs, dataset_status
try:
from pdf_txt_to_dataset import (
process_pdf_to_records, process_txt_to_records,
process_docx_to_records, process_md_to_records,
process_jsonl_passthrough, process_mediawiki_xml_to_records,
write_jsonl, write_csv,
ensure_outdir, is_pdf, is_txt, is_docx, is_md, is_jsonl
)
import json
ensure_outdir(DATASET_OUT_DIR)
dataset_logs.append(f"[INFO] Ficheiro recebido: {file.filename}")
dataset_logs.append(f"[INFO] Schema: {schema} | Formato: {fmt} | Chunk: {chunk_size} | Overlap: {overlap}")
common = dict(
chunk_size=chunk_size, overlap=overlap, chunk_mode=chunk_mode,
schema=schema, clean_level=clean_level, dedup=dedup, min_chars=min_chars
)
fp = tmp.name
if is_pdf(fp):
recs, report, _ = process_pdf_to_records(fp, **common)
elif is_txt(fp):
recs, report, _ = process_txt_to_records(fp, **common)
elif is_docx(fp):
recs, report, _ = process_docx_to_records(fp, **common)
elif is_md(fp):
recs, report, _ = process_md_to_records(fp, **common)
elif is_jsonl(fp):
recs, report, _ = process_jsonl_passthrough(
fp, schema=schema, clean_level=clean_level,
dedup=dedup, min_chars=min_chars, auto_prompt_completion=auto_pc
)
elif fp.endswith(".xml") or file.filename.lower().endswith(".xml"):
max_wiki = int(request.form.get("max_wiki_articles", 0))
dataset_logs.append(f"[INFO] Wiki XML detectado — max_articles={max_wiki or 'sem limite'}")
recs, report, _ = process_mediawiki_xml_to_records(
fp, max_articles=max_wiki, log_fn=dataset_logs.append, **common
)
else:
dataset_logs.append(f"[ERRO] Formato não suportado: {suffix}")
dataset_status.update({"status": "error", "message": "Formato não suportado."})
return
if not recs:
dataset_logs.append("[WARN] Nenhum registo extraído. Verifica o ficheiro ou parâmetros.")
dataset_status.update({"status": "error", "message": "Sem registos."})
return
# Guardar
base_name = os.path.splitext(file.filename)[0]
out_path = os.path.join(DATASET_OUT_DIR, f"{base_name}.{fmt}")
if fmt == "jsonl":
write_jsonl(recs, out_path)
else:
write_csv(recs, out_path)
# Relatório
report_path = os.path.join(DATASET_OUT_DIR, f"{base_name}_report.json")
with open(report_path, "w", encoding="utf-8") as f_rep:
json.dump(report, f_rep, ensure_ascii=False, indent=2)
dataset_logs.append(f"[OK] {len(recs)} registos extraídos.")
dataset_logs.append(f"[OK] Dataset gravado em: {out_path}")
dataset_logs.append(f"[OK] Relatório gravado em: {report_path}")
dataset_status.update({"status": "done", "records": len(recs), "message": "Concluído."})
except Exception as e:
import traceback
dataset_logs.append(f"[ERRO] {e}")
dataset_logs.append(traceback.format_exc())
dataset_status.update({"status": "error", "message": str(e)})
finally:
try:
os.remove(tmp.name)
except Exception:
pass
threading.Thread(target=_run, daemon=True).start()
return jsonify({"ok": True, "message": "A processar em background..."})
#-------------------------------------------
@app.route("/api/dataset/status")
def dataset_status_route():
logs_tail = dataset_logs[-200:] if len(dataset_logs) > 200 else dataset_logs[:]
return jsonify({
"status": dataset_status.get("status", "idle"),
"records": dataset_status.get("records", 0),
"message": dataset_status.get("message", ""),
"logs": logs_tail
})
#-------------------------------------------
@app.route("/api/dataset_dir")
def dataset_dir():
listing = []
if os.path.isdir(DATASET_OUT_DIR):
for fname in sorted(os.listdir(DATASET_OUT_DIR)):
fpath = os.path.join(DATASET_OUT_DIR, fname)
size_kb = os.path.getsize(fpath) // 1024
listing.append(f"{fname} ({size_kb} KB)")
else:
listing.append("(pasta ainda não criada)")
return jsonify({"listing": listing})
#-------------------------------------------
@app.route("/api/dataset/download/<path:filename>")
def dataset_download(filename):
"""Serve ficheiro do directório de output do dataset para download."""
import re
# Segurança: só permite nomes de ficheiro simples sem path traversal
safe_name = os.path.basename(filename)
file_path = os.path.join(DATASET_OUT_DIR, safe_name)
if not os.path.isfile(file_path):
return jsonify({"error": "File not found."}), 404
return send_file(file_path, as_attachment=True, download_name=safe_name)
#--------------------------------------------
@app.route("/api/train_status")
def train_status():
progress = train_progress.get("percent", 0)
status = train_progress.get("status", "")
epoch_list = epoch_losses
logs_tail = training_logs[-200:] if len(training_logs) > 200 else training_logs[:]
# Acrescentar as linhas pedidas (apenas texto; simples)
if CURRENT_ACCUM_STEPS is not None:
origem = LAST_ACCUM_ORIGIN or "desconhecido"
logs_tail.append(f"[INFO] gradient_accumulation_steps atual: {CURRENT_ACCUM_STEPS} ({origem})")
# opcional: mostrar Effective Batch
if BASE_BATCH_SIZE_EFFECTIVE:
eb = BASE_BATCH_SIZE_EFFECTIVE * CURRENT_ACCUM_STEPS
logs_tail.append(f"[INFO] Effective Batch (base={BASE_BATCH_SIZE_EFFECTIVE}) = {eb}")
if TOTAL_TRAIN_STEPS is not None:
logs_tail.append(f"[INFO] total_train_steps desta execução: {TOTAL_TRAIN_STEPS}")
return jsonify({
"progress": progress,
"status": status,
"epoch_losses": epoch_list,
"logs": logs_tail
})
#---------------------------------------
@app.route('/api/load_lora_chat', methods=['POST'])
def load_lora_chat():
try:
data = request.get_json()
model_path = data.get("model_path")
if not model_path or "C:/" in model_path:
return jsonify({"status": "error", "message": "Caminho do modelo inválido para Linux (HF Spaces). Use o ID do Hugging Face."}), 400
output_dir = FINAL_OUTPUT_DIR
lora_model_path = os.path.join(output_dir, "lora_model")
if not os.path.exists(lora_model_path):
return jsonify({"status": "error", "message": f"LoRA não encontrado em: {lora_model_path}"}), 404
# Determinar dtype e device_map
if torch.cuda.is_available():
model_dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
model_device_map = "auto"
else:
model_dtype = torch.bfloat16
model_device_map = "cpu"
# Carregar o tokenizer do LoRA primeiro para obter o tamanho de vocabulário correto
tokenizer = AutoTokenizer.from_pretrained(lora_model_path, use_fast=True)
base_model = AutoModelForCausalLM.from_pretrained(
model_path,
dtype=model_dtype,
device_map=model_device_map,
low_cpu_mem_usage=True
)
# CORREÇÃO: Redimensionar o modelo base para o tamanho do vocabulário do tokenizer do LoRA
base_model.resize_token_embeddings(len(tokenizer))
lora_model = PeftModel.from_pretrained(base_model, lora_model_path, device_map=model_device_map)
chat_model = lora_model
chat_tokenizer = tokenizer
log_success("Modelo LoRA carregado com sucesso para chat (sem merge).")
train_progress.update({"status": "awaiting_merge", "message": "Modelo LoRA carregado para chat. Decida a próxima ação."})
return jsonify({"status": "ok", "message": "Modelo LoRA carregado para chat (sem merge)."})
except Exception as e:
log_error(f"Erro ao carregar LoRA para chat: {e}")
train_progress.update({"status": "error", "message": f"Erro ao carregar LoRA para chat: {e}"})
return jsonify({"status": "error", "message": str(e)}), 500
#--------------------------------------------
@app.route('/api/chat', methods=['POST'])
def handle_chat():
global chat_model, chat_tokenizer
if not chat_model or not chat_tokenizer:
return jsonify({
"response": "ERRO: O modelo ainda não está carregado. Faça o 'merge' ou 'load_lora_chat' primeiro."
}), 400
try:
data = request.get_json()
prompt = data.get('prompt')
if not prompt:
return jsonify({"response": "Erro: 'prompt' não foi enviado."}), 400
formatted_prompt = f"<s>[INST] {prompt} [/INST]"
inputs = chat_tokenizer(formatted_prompt, return_tensors="pt").to(chat_model.device)
outputs = chat_model.generate(
**inputs,
max_new_tokens=256,
num_return_sequences=1,
eos_token_id=chat_tokenizer.eos_token_id,
pad_token_id=chat_tokenizer.eos_token_id,
do_sample=True,
temperature=0.01, # <--- Sugestão: Reduzir temperatura para respostas mais determinísticas
top_p=0.9,
)
response_full = chat_tokenizer.decode(outputs[0], skip_special_tokens=True)
response_only = response_full.split("[/INST]")[-1].strip()
return jsonify({"response": response_only})
except Exception as e:
return jsonify({"response": f"Erro interno no chat: {str(e)}"}), 500
#--------------------------------------------
@app.route('/api/decide_merge', methods=['POST'])
def decide_merge():
global chat_model, chat_tokenizer, train_progress
try:
decision = request.form.get('decision')
model_path = request.form.get('model_path')
output_dir = os.path.join(os.getcwd(), "trained_model_output")
lora_model_path = os.path.join(output_dir, "lora_model")
if torch.cuda.is_available():
model_dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
model_device_map = "auto"
else:
model_dtype = torch.bfloat16
model_device_map = "cpu"
if decision == "merge":
log_info("Iniciando o merge do adapter LoRA com o modelo base...")
train_progress.update({"status": "merging", "message": "Iniciando a fusão do adapter LoRA..."})
# Carregar o tokenizer do LoRA primeiro para obter o tamanho de vocabulário correto
tokenizer = AutoTokenizer.from_pretrained(lora_model_path, use_fast=True)
base_model = AutoModelForCausalLM.from_pretrained(
model_path,
dtype=model_dtype,
device_map=model_device_map,
low_cpu_mem_usage=True
)
# CORREÇÃO: Redimensionar o modelo base para o tamanho do vocabulário do tokenizer do LoRA
base_model.resize_token_embeddings(len(tokenizer))
lora_model = PeftModel.from_pretrained(base_model, lora_model_path)
merged_model = lora_model.merge_and_unload()
final_model_path = os.path.join(output_dir, "merged_model")
os.makedirs(final_model_path, exist_ok=True)
merged_model.save_pretrained(final_model_path, safe_serialization=True)
# O tokenizer já foi carregado acima
tokenizer.save_pretrained(final_model_path)
chat_model = merged_model
chat_tokenizer = tokenizer
log_success("Modelo fundido com sucesso e pronto para o chat!")
train_progress.update({"status": "finished", "percent": 100, "message": "Modelo LoRA fundido e salvo."})
return jsonify({"status": "merged", "message": "Modelo fundido com sucesso."})
#-----------------------------------------------------------------------------------------
elif decision == "continue":
log_info("A continuar o treino por mais épocas a partir do adapter salvo...")
epochs_to_add = int(request.form.get('epochs', 1)) # Renomeado para clareza
if not os.path.exists(lora_model_path):
log_error("Nenhum adapter LoRA encontrado para retomar o treino.")
train_progress.update({"status": "continuing_training", "message": f"Continuando treino por mais {epochs_to_add} épocas..."})
return jsonify({"status": "error", "message": "Nenhum adapter LoRA encontrado para retomar o treino."}), 400
# --- Determinar épocas já completadas ---
initial_epochs_completed = 0
output_dir = FINAL_OUTPUT_DIR
checkpoint_dirs = [d for d in os.listdir(output_dir) if d.startswith("checkpoint-")]
if checkpoint_dirs:
checkpoint_dirs.sort(key=lambda x: int(x.split('-')[1]), reverse=True)
latest_candidate_checkpoint_dir = os.path.join(output_dir, checkpoint_dirs[0])
trainer_state_path = os.path.join(latest_candidate_checkpoint_dir, "trainer_state.json")
if os.path.exists(trainer_state_path):
try:
with open(trainer_state_path, 'r') as f:
trainer_state = json.load(f)
initial_epochs_completed = int(trainer_state.get('epoch', 0))
log_info(f"Detectado que {initial_epochs_completed} épocas foram completadas no último checkpoint.")
except Exception as e:
log_warning(f"Erro ao ler trainer_state.json do checkpoint: {e}. Assumindo 0 épocas completadas.")
# Atualiza o status de progresso para indicar que o treino vai continuar
train_progress.update({"status": "continuing_training", "message": f"Continuando treino por mais {epochs_to_add} épocas..."})
thread = Thread(
target=train_model_lora,
args=(
all_data,
epochs_to_add,
model_path,
output_dir,
ACCUMULATION_STEPS,
DATALOADER_WORKERS,
"continue_train",
lora_model_path,
initial_epochs_completed,
"continue_train"
)
)
thread.start()
return jsonify({
"status": "continued",
"message": f"Treino adicional de {epochs_to_add} épocas iniciado."
})
else:
log_warning(f"Decisão inválida para /decide_merge: {decision}")
train_progress.update({"status": "error", "message": f"Decisão inválida: {decision}"})
return jsonify({"status": "error", "message": "Decisão inválida."}), 400
except Exception as e:
log_error(f"Erro durante o merge/continuação: {e}")
train_progress.update({"status": "error", "message": f"Erro durante o merge/continuação: {e}"})
return jsonify({"status": "error", "message": str(e)}), 500
#--------------------------------------------------------------------
@app.post("/api/tensorboard/start")
def start_tensorboard():
threading.Thread(target=run_tensorboard, daemon=True).start()
return jsonify({"status": "ok", "url": f"http://localhost:{TENSORBOARD_PORT}"}), 200
# =========================================================================
# ROTAS DE DIRETORIO DO OUTPUT
# =========================================================================
@app.route('/api/model_dir', methods=['GET'])
def list_model_dir():
try:
# Tenta vários caminhos possíveis no Hugging Face
paths_to_check = [
os.path.join(os.getcwd(), "trained_model_output"),
"/app/trained_model_output",
"./trained_model_output"
]
target = paths_to_check[0]
for p in paths_to_check:
if os.path.exists(p):
target = p
break
# Se não existir, cria agora para o botão não falhar
if not os.path.exists(target):
os.makedirs(target, exist_ok=True)
return jsonify({"files": ["Pasta criada agora. Está vazia."]}), 200
files_list = []
for root, dirs, filenames in os.walk(target):
for f in filenames:
rel_path = os.path.relpath(os.path.join(root, f), target)
files_list.append(rel_path)
if not files_list:
return jsonify({"files": ["Pasta encontrada, mas não tem ficheiros lá dentro."]}), 200
return jsonify({"files": sorted(files_list)}), 200
except Exception as e:
return jsonify({"files": [f"Erro ao ler pasta: {str(e)}"]}), 200 # Retornamos 200 para o JS não dar erro
#-------------------------------------------------------------------------
@app.route('/api/download/<path:filename>')
def download_file(filename):
# Caminho absoluto para a pasta de output
directory = os.path.join(os.getcwd(), "trained_model_output")
# send_from_directory é seguro e evita que utilizadores acedam a pastas do sistema
try:
return send_from_directory(directory, filename, as_attachment=True)
except Exception as e:
return str(e), 404
# =========================================================================
# ROTAS DE CONFIGURAÇÃO (GET para Ler, POST para Guardar)
# =========================================================================
@app.route('/api/config', methods=['GET', 'POST'])
def api_config():
from config_manager import set_runtime_globals, _load_constants_from_file, map_backend_to_frontend
set_runtime_globals({
'logical_cpus': LOGICAL_CPUS,
'omp_threads': OMP_THREADS,
'dataloader_workers': DATALOADER_WORKERS,
'total_ram_gb': TOTAL_RAM_GB,
})
constants = _load_constants_from_file()
return jsonify(map_backend_to_frontend(constants))
# =========================================================================
# Endpoints
# =========================================================================
@app.route("/api/gguf/convert", methods=["POST"])
def gguf_convert():
"""
Inicia conversão do modelo merged para GGUF em background.
Body JSON: { "model_path": "...", "format": "f16" | "q8_0" }
"""
global gguf_logs, gguf_status
import threading
data = request.get_json(force=True) or {}
model_path = data.get("model_path", "").strip()
fmt = data.get("format", "f16").lower() # "f16" ou "q8_0"
if not model_path:
return jsonify({"error": "model_path é obrigatório."}), 400
if not os.path.isdir(model_path):
return jsonify({"error": f"Diretório não encontrado: {model_path}"}), 400
if fmt not in ("f16", "q8_0"):
return jsonify({"error": "format deve ser 'f16' ou 'q8_0'."}), 400
gguf_logs.clear()
gguf_status.update({"status": "processing", "message": "A iniciar conversão..."})
def _run():
global gguf_logs, gguf_status
import subprocess, sys
os.makedirs(GGUF_OUT_DIR, exist_ok=True)
model_name = os.path.basename(model_path.rstrip("/\\")) or "model"
out_filename = f"{model_name}_{fmt}.gguf"
out_path = os.path.join(GGUF_OUT_DIR, out_filename)
gguf_logs.append(f"[INFO] Modelo: {model_path}")
gguf_logs.append(f"[INFO] Formato: {fmt.upper()}")
gguf_logs.append(f"[INFO] Output: {out_path}")
try:
# ── Método 1: convert_hf_to_gguf.py do llama.cpp ──────────────
# Procura o script em localizações comuns
convert_script = _find_convert_script()
if convert_script:
gguf_logs.append(f"[INFO] Usando: {convert_script}")
cmd = [
sys.executable, convert_script,
model_path,
"--outfile", out_path,
"--outtype", fmt,
]
gguf_logs.append(f"[CMD] {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True, text=True, timeout=3600
)
if result.stdout:
for line in result.stdout.splitlines():
gguf_logs.append(line)
if result.stderr:
for line in result.stderr.splitlines():
gguf_logs.append(f"[STDERR] {line}")
if result.returncode != 0:
raise RuntimeError(f"Conversão falhou (exit {result.returncode})")
else:
# ── Método 2: llama-cpp-python (se instalado) ──────────────
gguf_logs.append("[INFO] convert_hf_to_gguf.py não encontrado.")
gguf_logs.append("[INFO] A tentar llama_cpp...")
try:
from llama_cpp import llama_model_quantize_params, llama_model_quantize
gguf_logs.append("[WARN] llama_cpp encontrado mas API de quantização não suportada directamente.")
gguf_logs.append("[WARN] Instala o llama.cpp e coloca convert_hf_to_gguf.py no PATH.")
raise RuntimeError("Sem método de conversão disponível.")
except ImportError:
gguf_logs.append("[ERRO] llama-cpp-python não está instalado.")
gguf_logs.append("[INFO] Instala com: pip install llama-cpp-python")
gguf_logs.append("[INFO] Ou clona: https://github.com/ggerganov/llama.cpp")
raise RuntimeError("llama-cpp-python não instalado.")
if not os.path.isfile(out_path):
raise RuntimeError(f"Ficheiro GGUF não foi criado: {out_path}")
size_mb = os.path.getsize(out_path) / (1024 * 1024)
gguf_logs.append(f"[OK] Conversão concluída: {out_filename} ({size_mb:.1f} MB)")
gguf_status.update({"status": "done", "message": "Concluído."})
except Exception as e:
import traceback
gguf_logs.append(f"[ERRO] {e}")
gguf_logs.append(traceback.format_exc())
gguf_status.update({"status": "error", "message": str(e)})
threading.Thread(target=_run, daemon=True).start()
return jsonify({"ok": True, "message": "Conversão iniciada em background..."})
def _find_convert_script():
"""Procura convert_hf_to_gguf.py em localizações comuns."""
import shutil
candidates = [
"convert_hf_to_gguf.py", # mesmo diretório
os.path.join(os.path.dirname(__file__), "convert_hf_to_gguf.py"),
os.path.expanduser("~/llama.cpp/convert_hf_to_gguf.py"),
"/app/convert_hf_to_gguf.py", # HuggingFace Space
"/llama.cpp/convert_hf_to_gguf.py",
]
for path in candidates:
if os.path.isfile(path):
return path
# Tenta encontrar no PATH
found = shutil.which("convert_hf_to_gguf.py")
if found:
return found
return None
#-------------------------------------------
@app.route("/api/gguf/status")
def gguf_status_route():
logs_tail = gguf_logs[-200:] if len(gguf_logs) > 200 else gguf_logs[:]
return jsonify({
"status": gguf_status.get("status", "idle"),
"message": gguf_status.get("message", ""),
"logs": logs_tail,
})
#-------------------------------------------
@app.route("/api/gguf/dir")
def gguf_dir():
listing = []
if os.path.isdir(GGUF_OUT_DIR):
for fname in sorted(os.listdir(GGUF_OUT_DIR)):
fpath = os.path.join(GGUF_OUT_DIR, fname)
if os.path.isfile(fpath):
size_mb = os.path.getsize(fpath) / (1024 * 1024)
listing.append(f"{fname} ({size_mb:.1f} MB)")
else:
listing.append("(pasta ainda não criada)")
return jsonify({"listing": listing})
#-------------------------------------------
@app.route("/api/gguf/download/<path:filename>")
def gguf_download(filename):
"""Serve ficheiro GGUF para download."""
safe_name = os.path.basename(filename)
file_path = os.path.join(GGUF_OUT_DIR, safe_name)
if not os.path.isfile(file_path):
return jsonify({"error": "File not found."}), 404
return send_file(file_path, as_attachment=True, download_name=safe_name)
#------------------------------------
@app.post("/api/adjust_accumulation")
def adjust_accumulation():
"""
Ajuste manual relativo do gradient_accumulation_steps.
Usa o valor atual real do treino se CURRENT_ACCUM_STEPS não estiver definido.
Envia { "action": "auto" } para voltar ao modo automático.
"""
global CURRENT_ACCUM_STEPS
data = request.get_json(force=True)
delta = int(data.get("delta", 0))
action = data.get("action")
# Modo automático
if action == "auto":
CURRENT_ACCUM_STEPS = None
logging.info("🔄 Modo automático de accumulation reativado.")
return jsonify({"status": "ok", "mode": "auto", "message": "Modo automático reativado."})
# Se CURRENT_ACCUM_STEPS ainda não definido, usa valor atual do treino
if CURRENT_ACCUM_STEPS is None:
CURRENT_ACCUM_STEPS = train_progress.get("current_accum_steps", 16) # valor padrão caso não exista
# Aplica delta e limita
CURRENT_ACCUM_STEPS = max(1, min(DYNAMIC_ACCUMULATION_MAX_STEPS, CURRENT_ACCUM_STEPS + delta))
logging.info(f"🎚 Ajuste manual recebido → CURRENT_ACCUM_STEPS = {CURRENT_ACCUM_STEPS} (delta={delta})")
return jsonify({
"status": "ok",
"current_steps": CURRENT_ACCUM_STEPS,
"mode": "manual"
})
#----------------------------------------------------
@app.route('/api/create_model', methods=['POST'])
def create_model():
global model
data = request.json
model_name = data.get("model_name")
if not model_name:
return jsonify({"status": "error", "message": "model_name não fornecido."}), 400
# O modelo real é carregado no início do treino via train_model_lora
return jsonify({"status": "ok", "message": f"Modelo '{model_name}' será carregado ao iniciar o treino."})
#---------------------------------------------------------------
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=False)