import os
import gc
import hmac
import json
import time
import random
import threading
import traceback
from pathlib import Path
from datetime import datetime

import gradio as gr
import psutil
from tqdm.auto import tqdm
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import ByteLevel
from tokenizers.decoders import ByteLevel as ByteLevelDecoder
from tokenizers.processors import TemplateProcessing
from huggingface_hub import HfApi, create_repo, upload_folder

# ============================================================
# Env-driven config
# ============================================================

# Values (case-insensitive, whitespace-trimmed) that switch a boolean flag on.
_TRUTHY = {"1", "true", "yes", "y"}


def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default* if unset) is a truthy word."""
    return os.environ.get(name, default).strip().lower() in _TRUTHY


HF_TOKEN = os.environ.get("HF_TOKEN", "").strip()
REPO_ID = os.environ.get("MODEL_REPO_ID", "Efe2898/new-model").strip()
PUSH_TO_HUB = _env_flag("PUSH_TO_HUB", "1")

DATASET_ID = os.environ.get("DATASET_ID", "turkish-nlp-suite/BellaTurca").strip()
# Comma-separated subset names; blank entries are dropped.
SUBSETS = [
    s.strip()
    for s in os.environ.get(
        "SUBSETS",
        "AkademikDerlem,OzenliDerlem,temiz-OSCAR,temiz-mC4",
    ).split(",")
    if s.strip()
]
TEXT_COLUMN = os.environ.get("TEXT_COLUMN", "text").strip()
SPLIT = os.environ.get("SPLIT", "train").strip()

VOCAB_SIZE = int(os.environ.get("VOCAB_SIZE", "65536"))
MODEL_MAX_LENGTH = int(os.environ.get("MODEL_MAX_LENGTH", "262144"))
BPE_MIN_FREQUENCY = int(os.environ.get("BPE_MIN_FREQUENCY", "3"))

# The total budget is split evenly across subsets (integer division).
TARGET_TOTAL_EST_TOKENS = int(os.environ.get("TARGET_TOTAL_EST_TOKENS", "700000000"))
TARGET_PER_SUBSET_EST_TOKENS = TARGET_TOTAL_EST_TOKENS // max(1, len(SUBSETS))
CHARS_PER_EST_TOKEN = float(os.environ.get("CHARS_PER_EST_TOKEN", "4.0"))

CPU_THREADS = int(os.environ.get("CPU_THREADS", "2"))
ROW_BATCH_SIZE = int(os.environ.get("ROW_BATCH_SIZE", "2048"))
MAX_CHARS_PER_BATCH = int(os.environ.get("MAX_CHARS_PER_BATCH", "2000000"))
MAX_TEXT_CHARS = int(os.environ.get("MAX_TEXT_CHARS", "512000"))
MIN_TEXT_CHARS = int(os.environ.get("MIN_TEXT_CHARS", "20"))

ENABLE_CPU_COOLDOWN = _env_flag("ENABLE_CPU_COOLDOWN", "1")
MICRO_SLEEP_AFTER_EACH_BATCH = float(os.environ.get("MICRO_SLEEP_AFTER_EACH_BATCH", "0.25"))
COOLDOWN_EVERY_SECONDS = int(os.environ.get("COOLDOWN_EVERY_SECONDS", "60"))
COOLDOWN_SECONDS = float(os.environ.get("COOLDOWN_SECONDS", "5"))
CPU_CHECK_INTERVAL = float(os.environ.get("CPU_CHECK_INTERVAL", "0.10"))
CPU_SOFT_LIMIT_PERCENT = float(os.environ.get("CPU_SOFT_LIMIT_PERCENT", "80"))
CPU_HARD_LIMIT_PERCENT = float(os.environ.get("CPU_HARD_LIMIT_PERCENT", "95"))
CPU_SOFT_SLEEP_SECONDS = float(os.environ.get("CPU_SOFT_SLEEP_SECONDS", "2"))
CPU_HARD_SLEEP_SECONDS = float(os.environ.get("CPU_HARD_SLEEP_SECONDS", "6"))
RAM_HARD_LIMIT_PERCENT = float(os.environ.get("RAM_HARD_LIMIT_PERCENT", "90"))

SHUFFLE_BATCH = _env_flag("SHUFFLE_BATCH", "0")
RANDOM_SEED = int(os.environ.get("RANDOM_SEED", "42"))
AUTO_START = _env_flag("AUTO_START", "0")

# NOTE(security): the fallback password below is hard-coded in source, so anyone
# who can read this file can log in when SPACE_PASSWORD is not set. It is kept
# only for backward compatibility; set SPACE_PASSWORD as a secret in deployment.
SPACE_PASSWORD = os.environ.get("SPACE_PASSWORD", "738825").strip()


def gradio_auth(username: str, password: str) -> bool:
    """Protect the Space UI. Username is ignored; only the password matters.

    The supplied password is stringified and stripped, then compared with
    SPACE_PASSWORD using a constant-time comparison so that response timing
    does not reveal how many leading characters matched.

    Args:
        username: Ignored.
        password: Password entered in the login form.

    Returns:
        True when the stripped password equals SPACE_PASSWORD.
    """
    supplied = str(password).strip().encode("utf-8")
    expected = SPACE_PASSWORD.encode("utf-8")
    # Compare bytes: compare_digest only accepts ASCII-only str arguments.
    return hmac.compare_digest(supplied, expected)


# Use /data when persistent storage is attached; otherwise Space working dir.
ROOT_DIR = Path(os.environ.get("OUTPUT_ROOT", "/data" if Path("/data").exists() else "."))
OUT_DIR = ROOT_DIR / "rslm_tokenizer_bellaturca_65k"
OUT_DIR.mkdir(parents=True, exist_ok=True)
LOG_PATH = OUT_DIR / "training.log"
STATUS_PATH = OUT_DIR / "status.json"

# NOTE(review): the last two entries are empty strings. They look like tokens
# whose angle-bracket text was stripped somewhere (possibly a think-open /
# think-close pair) -- confirm the intended values before training.
SPECIAL_TOKENS = [
    "<|pad|>",
    "<|bos|>",
    "<|eos|>",
    "<|unk|>",
    "<|system|>",
    "<|user|>",
    "<|assistant|>",
    "<|answer|>",
    "<|end|>",
    "",
    "",
]

# Keep CPU usage sane.
# Cap every native thread pool at CPU_THREADS and disable tokenizers' own
# parallelism flag.
for _thread_var in (
    "RAYON_NUM_THREADS",
    "OMP_NUM_THREADS",
    "MKL_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
    "BLIS_NUM_THREADS",
):
    os.environ[_thread_var] = str(CPU_THREADS)
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0")

_proc = psutil.Process(os.getpid())
# Prime both CPU counters so later interval=None readings have a baseline.
psutil.cpu_percent(interval=None)
_proc.cpu_percent(interval=None)

random.seed(RANDOM_SEED)

# Shared run state: written by the training thread, read by the UI callbacks.
_state = {
    "thread": None,
    "running": False,
    "finished": False,
    "error": None,
    "uploaded": False,
}


def log(msg: str):
    """Print *msg* with a UTC timestamp and append the same line to LOG_PATH."""
    ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts} UTC] {msg}"
    print(line, flush=True)
    with open(LOG_PATH, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def write_status(**kwargs):
    """Persist the current run state (plus *kwargs*) to STATUS_PATH as JSON.

    Adds ``progress_percent`` and, when a ``stats`` dict is supplied,
    ``subset_progress_percent`` (one clamped 0-100 value per subset).

    The file is written to a sibling temp file and then moved into place with
    os.replace, so a UI refresh running concurrently never reads a
    half-written status.json.
    """
    data = {
        "running": _state["running"],
        "finished": _state["finished"],
        "error": _state["error"],
        "uploaded": _state["uploaded"],
        "repo_id": REPO_ID,
        "out_dir": str(OUT_DIR),
        "updated_at_utc": datetime.utcnow().isoformat() + "Z",
    }
    data.update(kwargs)
    data["progress_percent"] = round(calculate_progress_percent(data), 4)

    stats_obj = data.get("stats")
    if isinstance(stats_obj, dict):
        per_subset = {}
        for subset_name, subset_stats in stats_obj.items():
            try:
                subset_est = int(subset_stats.get("est_tokens", 0))
            except (AttributeError, TypeError, ValueError, OverflowError):
                # Malformed entry: leave this subset out of the progress map.
                continue
            ratio = 100.0 * subset_est / max(1, TARGET_PER_SUBSET_EST_TOKENS)
            per_subset[subset_name] = round(max(0.0, min(100.0, ratio)), 4)
        data["subset_progress_percent"] = per_subset

    tmp_path = STATUS_PATH.with_name(STATUS_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, STATUS_PATH)


def calculate_progress_percent(status_data=None):
    """Return overall progress as a float clamped to [0, 100].

    Uses ``status_data["total_est_tokens"]`` when present; otherwise sums
    ``est_tokens`` over ``status_data["stats"]``. Unusable values count as 0.
    """
    status_data = status_data or {}
    total = status_data.get("total_est_tokens")
    stats_obj = status_data.get("stats")

    if total is None and isinstance(stats_obj, dict):
        try:
            total = sum(int(v.get("est_tokens", 0)) for v in stats_obj.values())
        except (AttributeError, TypeError, ValueError, OverflowError):
            total = 0

    try:
        total = int(total or 0)
    except (TypeError, ValueError, OverflowError):
        total = 0

    target = max(1, int(TARGET_TOTAL_EST_TOKENS))
    return max(0.0, min(100.0, 100.0 * total / target))


def load_status_file():
    """Return the parsed status.json as a dict, or {} if missing or unusable."""
    if not STATUS_PATH.exists():
        return {}
    try:
        loaded = json.loads(STATUS_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSON and Unicode decoding errors.
        return {}
    # Callers merge the result with dict.update, so reject other JSON types.
    return loaded if isinstance(loaded, dict) else {}


def tail_log(n=220):
    """Return the last *n* lines of the training log as one string."""
    if not LOG_PATH.exists():
        return "Henüz log yok."
    lines = LOG_PATH.read_text(encoding="utf-8", errors="replace").splitlines()
    return "\n".join(lines[-n:])


def system_snapshot():
    """Sample system/process CPU and memory usage.

    Blocks for CPU_CHECK_INTERVAL seconds while psutil measures system CPU.
    ``proc_cpu_norm`` is the process CPU percentage divided by CPU_THREADS.
    """
    vm = psutil.virtual_memory()
    cpu = psutil.cpu_percent(interval=CPU_CHECK_INTERVAL)
    proc_cpu = _proc.cpu_percent(interval=None)
    return {
        "cpu": cpu,
        "proc_cpu": proc_cpu,
        "proc_cpu_norm": proc_cpu / max(1, CPU_THREADS),
        "ram_percent": vm.percent,
        "rss_gb": _proc.memory_info().rss / (1024 ** 3),
        "available_gb": vm.available / (1024 ** 3),
    }


_last_cooldown_at = time.time()


def maybe_cooldown(reason="batch"):
    """Throttle the calling thread according to current CPU/RAM pressure.

    Order of checks: optional micro-sleep; RAM hard limit (gc + long sleep,
    then return); CPU hard or soft limit (sleep); finally a periodic cooldown
    once COOLDOWN_EVERY_SECONDS have passed since the last sleep of any kind.
    Does nothing when ENABLE_CPU_COOLDOWN is off.
    """
    global _last_cooldown_at

    if not ENABLE_CPU_COOLDOWN:
        return

    if MICRO_SLEEP_AFTER_EACH_BATCH > 0:
        time.sleep(MICRO_SLEEP_AFTER_EACH_BATCH)

    snap = system_snapshot()

    if snap["ram_percent"] >= RAM_HARD_LIMIT_PERCENT:
        log(
            f"[RAM-FREN] ram={snap['ram_percent']:.1f}% rss={snap['rss_gb']:.2f}GB "
            f"available={snap['available_gb']:.2f}GB -> gc + sleep {CPU_HARD_SLEEP_SECONDS}s"
        )
        gc.collect()
        time.sleep(CPU_HARD_SLEEP_SECONDS)
        _last_cooldown_at = time.time()
        return

    # React to whichever is higher: system-wide load or this process's share.
    cpu_load = max(snap["cpu"], snap["proc_cpu_norm"])

    if cpu_load >= CPU_HARD_LIMIT_PERCENT:
        log(
            f"[CPU-HARD] reason={reason} cpu={snap['cpu']:.1f}% "
            f"proc_norm={snap['proc_cpu_norm']:.1f}% -> sleep {CPU_HARD_SLEEP_SECONDS}s"
        )
        time.sleep(CPU_HARD_SLEEP_SECONDS)
        _last_cooldown_at = time.time()
    elif cpu_load >= CPU_SOFT_LIMIT_PERCENT:
        log(
            f"[CPU-SOFT] reason={reason} cpu={snap['cpu']:.1f}% "
            f"proc_norm={snap['proc_cpu_norm']:.1f}% -> sleep {CPU_SOFT_SLEEP_SECONDS}s"
        )
        time.sleep(CPU_SOFT_SLEEP_SECONDS)
        _last_cooldown_at = time.time()

    now = time.time()
    if now - _last_cooldown_at >= COOLDOWN_EVERY_SECONDS:
        log(
            f"[COOLDOWN] reason={reason} regular sleep {COOLDOWN_SECONDS}s | "
            f"cpu={snap['cpu']:.1f}% proc_norm={snap['proc_cpu_norm']:.1f}% "
            f"rss={snap['rss_gb']:.2f}GB ram={snap['ram_percent']:.1f}%"
        )
        gc.collect()
        time.sleep(COOLDOWN_SECONDS)
        _last_cooldown_at = time.time()


def clean_text(value):
    """Normalize one raw text cell.

    Returns None for None or for text shorter than MIN_TEXT_CHARS after
    stripping; otherwise the stripped text truncated to MAX_TEXT_CHARS
    (no truncation when MAX_TEXT_CHARS is 0).
    """
    if value is None:
        return None
    if not isinstance(value, str):
        value = str(value)
    value = value.strip()
    if len(value) < MIN_TEXT_CHARS:
        return None
    if MAX_TEXT_CHARS and len(value) > MAX_TEXT_CHARS:
        value = value[:MAX_TEXT_CHARS]
    return value


def estimate_tokens(text: str) -> int:
    """Estimate the token count of *text* from its length; never below 1."""
    return max(1, int(len(text) / CHARS_PER_EST_TOKEN))


def train_tokenizer():
    """Train the byte-level BPE tokenizer end to end and optionally publish it.

    Streams text from each configured subset until the per-subset or total
    estimated-token budget is reached, trains the tokenizer, writes
    tokenizer.json plus config, report and README files into OUT_DIR, and
    uploads the folder when PUSH_TO_HUB is set.

    Exceptions are not propagated: they are logged, stored in
    ``_state["error"]`` and reflected in status.json. ``_state["running"]`` is
    always reset on exit.
    """
    _state["running"] = True
    _state["finished"] = False
    _state["error"] = None
    _state["uploaded"] = False

    stats = {
        subset: {
            "rows": 0,
            "chars": 0,
            "est_tokens": 0,
            "batches": 0,
            "exhausted": False,
            "started_at": None,
            "finished_at": None,
        }
        for subset in SUBSETS
    }

    def total_est_tokens_seen():
        """Sum of estimated tokens consumed so far across all subsets."""
        return sum(stats[s]["est_tokens"] for s in SUBSETS)

    def print_stats(prefix=""):
        """Log resource usage and per-subset counters, then refresh status.json."""
        snap = system_snapshot()
        total_rows = sum(stats[s]["rows"] for s in SUBSETS)
        total_chars = sum(stats[s]["chars"] for s in SUBSETS)
        total_est = total_est_tokens_seen()

        log(
            f"{prefix} cpu={snap['cpu']:.1f}% proc_norm={snap['proc_cpu_norm']:.1f}% "
            f"rss={snap['rss_gb']:.2f}GB ram={snap['ram_percent']:.1f}%"
        )
        log(f"TOTAL rows={total_rows:,} chars={total_chars:,} est_tokens={total_est:,}")

        for subset in SUBSETS:
            st = stats[subset]
            pct = 100 * st["est_tokens"] / max(1, TARGET_PER_SUBSET_EST_TOKENS)
            log(
                f" {subset:16s} rows={st['rows']:,} batches={st['batches']:,} "
                f"chars={st['chars']:,} est={st['est_tokens']:,} target%={pct:.2f} "
                f"exhausted={st['exhausted']}"
            )

        write_status(phase="training", stats=stats, total_est_tokens=total_est)

    def load_stream(subset: str):
        """Open *subset* in streaming mode and return a row iterator."""
        log(f"Loading stream: {DATASET_ID} / {subset}")
        kwargs = dict(path=DATASET_ID, name=subset, split=SPLIT, streaming=True)
        if HF_TOKEN:
            kwargs["token"] = HF_TOKEN
        return iter(load_dataset(**kwargs))

    def pull_batch(iterator, subset: str):
        """Pull cleaned texts from *iterator* until a batch or budget limit hits.

        Stops at ROW_BATCH_SIZE rows, MAX_CHARS_PER_BATCH characters, either
        token budget, or stream exhaustion. Rows rejected by clean_text are
        skipped and do not count toward the row limit.
        """
        batch = []
        batch_chars = 0
        pulled = 0

        while (
            pulled < ROW_BATCH_SIZE
            and batch_chars < MAX_CHARS_PER_BATCH
            and stats[subset]["est_tokens"] < TARGET_PER_SUBSET_EST_TOKENS
            and total_est_tokens_seen() < TARGET_TOTAL_EST_TOKENS
        ):
            try:
                row = next(iterator)
            except StopIteration:
                stats[subset]["exhausted"] = True
                log(f"[WARN] Subset exhausted before target: {subset}")
                break

            text = clean_text(row.get(TEXT_COLUMN))
            if text is None:
                continue

            batch.append(text)
            stats[subset]["rows"] += 1
            stats[subset]["chars"] += len(text)
            stats[subset]["est_tokens"] += estimate_tokens(text)
            batch_chars += len(text)
            pulled += 1

        if batch:
            stats[subset]["batches"] += 1
        return batch

    def text_batches():
        """Yield text batches subset by subset, throttling after each one."""
        start = time.time()
        last_report_at = start
        last_total = 0
        global_batch_idx = 0

        for subset in SUBSETS:
            if total_est_tokens_seen() >= TARGET_TOTAL_EST_TOKENS:
                break

            stats[subset]["started_at"] = time.time()
            subset_start_est = total_est_tokens_seen()
            stream = load_stream(subset)
            log(f"===== START SUBSET: {subset} =====")

            while (
                stats[subset]["est_tokens"] < TARGET_PER_SUBSET_EST_TOKENS
                and total_est_tokens_seen() < TARGET_TOTAL_EST_TOKENS
                and not stats[subset]["exhausted"]
            ):
                batch = pull_batch(stream, subset)
                if not batch:
                    break

                if SHUFFLE_BATCH:
                    random.shuffle(batch)

                total_est = total_est_tokens_seen()
                global_batch_idx += 1

                if global_batch_idx % 10 == 0:
                    now = time.time()
                    elapsed = max(1e-9, now - start)
                    # Speed covers only the window since the previous report,
                    # so divide the token delta by that window, not by the
                    # total elapsed time.
                    window = max(1e-9, now - last_report_at)
                    speed = (total_est - last_total) / window / 1e6
                    last_total = total_est
                    last_report_at = now
                    print_stats(prefix=f"[batch {global_batch_idx}] elapsed={elapsed/60:.1f}min speed~{speed:.3f}M est_tok/s")

                yield batch
                maybe_cooldown(reason=f"batch_{global_batch_idx}_{subset}")

            stats[subset]["finished_at"] = time.time()
            new_est = total_est_tokens_seen() - subset_start_est
            print_stats(prefix=f"===== FINISH SUBSET: {subset} | new_est={new_est:,} =====")

            # Drop the finished stream before opening the next one.
            del stream
            gc.collect()
            maybe_cooldown(reason=f"finish_{subset}")

        print_stats(prefix="Final iterator stats:")

    try:
        # Inside the try so that a failing status write still resets "running".
        write_status(phase="starting")

        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        log("Tokenizer training started.")
        log(f"Repo target: {REPO_ID}")
        log(f"Dataset: {DATASET_ID} / subsets={SUBSETS}")
        log(f"VOCAB_SIZE={VOCAB_SIZE} target_est_tokens={TARGET_TOTAL_EST_TOKENS:,}")
        log(f"CPU_THREADS={CPU_THREADS} ROW_BATCH_SIZE={ROW_BATCH_SIZE} MAX_CHARS_PER_BATCH={MAX_CHARS_PER_BATCH:,}")

        # Fail before the long training run if the upload could never succeed.
        if PUSH_TO_HUB and not HF_TOKEN:
            raise RuntimeError("HF_TOKEN secret/env yok. Push için Space Settings > Secrets içine HF_TOKEN ekle.")

        try:
            bpe_model = BPE(unk_token="<|unk|>", fuse_unk=True, byte_fallback=True)
        except TypeError:
            # BPE() rejected the byte_fallback keyword; retry without it.
            log("[WARN] tokenizers BPE(byte_fallback=True) desteklemiyor; byte_fallback olmadan devam.")
            bpe_model = BPE(unk_token="<|unk|>", fuse_unk=True)

        tokenizer = Tokenizer(bpe_model)
        tokenizer.pre_tokenizer = ByteLevel(add_prefix_space=False)
        tokenizer.decoder = ByteLevelDecoder()

        trainer = BpeTrainer(
            vocab_size=VOCAB_SIZE,
            min_frequency=BPE_MIN_FREQUENCY,
            special_tokens=SPECIAL_TOKENS,
            initial_alphabet=ByteLevel.alphabet(),
            show_progress=True,
        )

        t0 = time.time()
        # NOTE(review): `length` is documented as the number of sequences in
        # the iterator, but an estimated-token count is passed here, so the
        # library's progress total is only a rough proxy -- confirm intent.
        tokenizer.train_from_iterator(text_batches(), trainer=trainer, length=TARGET_TOTAL_EST_TOKENS)
        elapsed = time.time() - t0

        log(f"Tokenizer training finished in {elapsed/60:.2f} minutes.")
        log(f"Final vocab size: {tokenizer.get_vocab_size()}")

        bos_id = tokenizer.token_to_id("<|bos|>")
        eos_id = tokenizer.token_to_id("<|eos|>")
        # Explicit raise instead of assert: asserts are stripped under -O.
        if bos_id is None or eos_id is None:
            raise RuntimeError("BOS/EOS token id bulunamadı.")

        tokenizer.post_processor = TemplateProcessing(
            single="<|bos|> $A <|eos|>",
            pair="<|bos|> $A <|eos|> $B:1 <|eos|>:1",
            special_tokens=[("<|bos|>", bos_id), ("<|eos|>", eos_id)],
        )

        tokenizer.save(str(OUT_DIR / "tokenizer.json"))

        # NOTE(review): the two trailing empty strings match the empty entries
        # at the end of SPECIAL_TOKENS; they look like stripped tag tokens --
        # confirm the intended values.
        special_tokens_map = {
            "bos_token": "<|bos|>",
            "eos_token": "<|eos|>",
            "unk_token": "<|unk|>",
            "pad_token": "<|pad|>",
            "additional_special_tokens": [
                "<|system|>",
                "<|user|>",
                "<|assistant|>",
                "<|answer|>",
                "<|end|>",
                "",
                "",
            ],
        }

        chat_template = (
            "{% for message in messages %}"
            "{% if message['role'] == 'system' %}<|system|>\n{{ message['content'] }}<|end|>\n"
            "{% elif message['role'] == 'user' %}<|user|>\n{{ message['content'] }}<|end|>\n"
            "{% elif message['role'] == 'assistant' %}<|assistant|>\n{{ message['content'] }}<|end|>\n"
            "{% endif %}"
            "{% endfor %}"
            "{% if add_generation_prompt %}<|assistant|>\n{% endif %}"
        )

        tokenizer_config = {
            "model_max_length": MODEL_MAX_LENGTH,
            "tokenizer_class": "PreTrainedTokenizerFast",
            "clean_up_tokenization_spaces": False,
            "padding_side": "right",
            "truncation_side": "right",
            "bos_token": "<|bos|>",
            "eos_token": "<|eos|>",
            "unk_token": "<|unk|>",
            "pad_token": "<|pad|>",
            "additional_special_tokens": special_tokens_map["additional_special_tokens"],
            "chat_template": chat_template,
        }

        with open(OUT_DIR / "special_tokens_map.json", "w", encoding="utf-8") as f:
            json.dump(special_tokens_map, f, ensure_ascii=False, indent=2)

        with open(OUT_DIR / "tokenizer_config.json", "w", encoding="utf-8") as f:
            json.dump(tokenizer_config, f, ensure_ascii=False, indent=2)

        report = {
            "created_at_utc": datetime.utcnow().isoformat() + "Z",
            "dataset_id": DATASET_ID,
            "subsets": SUBSETS,
            "split": SPLIT,
            "text_column": TEXT_COLUMN,
            "vocab_size": VOCAB_SIZE,
            "actual_vocab_size": tokenizer.get_vocab_size(),
            "bpe_min_frequency": BPE_MIN_FREQUENCY,
            "model_max_length": MODEL_MAX_LENGTH,
            "target_total_est_tokens": TARGET_TOTAL_EST_TOKENS,
            "target_per_subset_est_tokens": TARGET_PER_SUBSET_EST_TOKENS,
            "chars_per_est_token": CHARS_PER_EST_TOKEN,
            "cpu_threads": CPU_THREADS,
            "row_batch_size": ROW_BATCH_SIZE,
            "max_chars_per_batch": MAX_CHARS_PER_BATCH,
            "max_text_chars": MAX_TEXT_CHARS,
            "stats": stats,
            "special_tokens": SPECIAL_TOKENS,
        }

        with open(OUT_DIR / "tokenizer_training_report.json", "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=2)

        subsets_md = "\n".join(f"- `{s}`" for s in SUBSETS)
        specials_md = "\n".join(f"- `{t}`" for t in SPECIAL_TOKENS)

        # Model-card text; kept flush-left so the YAML front matter starts at
        # column 0 in the written README.
        readme = f"""---
library_name: tokenizers
language:
- tr
tags:
- turkish
- tokenizer
- byte-level-bpe
- rslm
- think
---

# RSLM Tokenizer 65K

CPU-safe Byte-Level BPE tokenizer for RSLM.

## Training data

Dataset: `{DATASET_ID}`

Subsets:
{subsets_md}

Column: `{TEXT_COLUMN}`

Target estimated tokens: `{TARGET_TOTAL_EST_TOKENS:,}` total, approximately `{TARGET_PER_SUBSET_EST_TOKENS:,}` per subset.

## Vocab

- Requested vocab size: `{VOCAB_SIZE:,}`
- Actual vocab size: `{tokenizer.get_vocab_size():,}`
- BPE min frequency: `{BPE_MIN_FREQUENCY}`

## Special tokens

{specials_md}
"""

        with open(OUT_DIR / "README.md", "w", encoding="utf-8") as f:
            f.write(readme)

        log("Saved files:")
        for p in sorted(OUT_DIR.iterdir()):
            log(f" - {p.name} ({p.stat().st_size / 1024 / 1024:.3f} MB)")

        if PUSH_TO_HUB:
            log("Creating/updating model repo and uploading tokenizer files...")
            create_repo(repo_id=REPO_ID, token=HF_TOKEN, repo_type="model", exist_ok=True)
            upload_folder(
                repo_id=REPO_ID,
                folder_path=str(OUT_DIR),
                token=HF_TOKEN,
                repo_type="model",
                commit_message="Add 65K RSLM tokenizer trained on BellaTurca from Space",
                # status.json.tmp is the scratch file used by write_status.
                ignore_patterns=["training.log", "status.json", "status.json.tmp"],
            )
            _state["uploaded"] = True
            log(f"Uploaded tokenizer to: https://huggingface.co/{REPO_ID}")

        _state["finished"] = True
        write_status(phase="finished", stats=stats)

    except Exception as e:
        # Thread boundary: record and log the failure instead of letting the
        # worker thread die with an unhandled exception.
        _state["error"] = repr(e)
        log("ERROR: " + repr(e))
        log(traceback.format_exc())
        write_status(phase="error")
    finally:
        _state["running"] = False


def _launch_training_thread():
    """Mark the run as started and spawn the daemon training thread.

    ``running`` is set before the thread starts so that a second start request
    arriving before the worker reaches its first statement is still rejected.
    """
    _state["running"] = True
    worker = threading.Thread(target=train_tokenizer, daemon=True)
    _state["thread"] = worker
    try:
        worker.start()
    except Exception:
        # The thread never ran, so nothing else will clear the flag.
        _state["running"] = False
        raise
    return worker


def start_training():
    """UI callback: start training unless it is running or already finished.

    Returns a status message followed by the current log tail.
    """
    if _state["running"]:
        return "Zaten çalışıyor.\n\n" + tail_log()

    if _state["finished"]:
        return "Bu oturumda eğitim bitmiş görünüyor. Yeniden başlatmak için Space'i restart edip tekrar deneyebilirsin.\n\n" + tail_log()

    _launch_training_thread()
    return "Eğitim başlatıldı. Logları Refresh ile takip et.\n\n" + tail_log()


def refresh():
    """UI callback: return (status JSON text, log tail, progress percent).

    Merges the saved status.json over in-memory defaults; the live in-memory
    ``running`` flag always wins, while ``finished``/``error``/``uploaded`` are
    taken from whichever source reports them.
    """
    saved_status = load_status_file()
    percent = calculate_progress_percent(saved_status)

    status = {
        "running": _state["running"],
        "finished": _state["finished"],
        "error": _state["error"],
        "uploaded": _state["uploaded"],
        "repo_id": REPO_ID,
        "out_dir": str(OUT_DIR),
        "progress_percent": round(percent, 4),
    }

    if saved_status:
        status.update(saved_status)
        status["running"] = _state["running"]
        status["finished"] = _state["finished"] or saved_status.get("phase") == "finished"
        status["error"] = _state["error"] or saved_status.get("error")
        status["uploaded"] = _state["uploaded"] or bool(saved_status.get("uploaded"))
        percent = calculate_progress_percent(status)
        status["progress_percent"] = round(percent, 4)

    return json.dumps(status, ensure_ascii=False, indent=2), tail_log(), round(percent, 4)


if AUTO_START and not _state["running"] and not _state["finished"]:
    _launch_training_thread()


with gr.Blocks(title="RSLM Tokenizer Trainer") as demo:
    gr.Markdown(
        """
# RSLM Tokenizer Trainer

Bu Space, BellaTurca üzerinden 65K Byte-Level BPE tokenizer eğitir ve sonucu Hugging Face model repo'suna push eder.

Tokeni koda yazma. Space Settings → Secrets içine `HF_TOKEN` olarak ekle.
"""
    )

    with gr.Row():
        start_btn = gr.Button("Start training", variant="primary")
        refresh_btn = gr.Button("Refresh logs")

    progress_box = gr.Slider(
        minimum=0,
        maximum=100,
        value=0,
        step=0.01,
        label="Toplam ilerleme (%)",
        interactive=False,
    )
    status_box = gr.Code(label="Status", language="json")
    log_box = gr.Textbox(label="Logs", lines=28, max_lines=50)

    start_btn.click(start_training, outputs=log_box)
    refresh_btn.click(refresh, outputs=[status_box, log_box, progress_box])
    demo.load(refresh, outputs=[status_box, log_box, progress_box])

demo.queue(default_concurrency_limit=1).launch(auth=gradio_auth)