# -*- coding: utf-8 -*-
"""Codex-friendly Google Colab runner for AniFileBERT training.

Typical Colab usage:

    python colab_train.py --config colab/configs/dmhy_regex_finetune.json

This script keeps the Colab side reproducible by putting run parameters in JSON
profiles. It can clone/update the repo, mount Drive, install dependencies,
train, optionally export ONNX, run an inference smoke check, and write a run
manifest that Codex can inspect later.
"""

from __future__ import annotations

import argparse
import copy
import datetime as dt
import json
import os
from pathlib import Path
import shlex
import shutil
import subprocess
import sys
import traceback
from typing import Any, Mapping, Sequence
import urllib.request


# Baseline profile. JSON profiles are deep-merged on top of it and CLI flags
# are applied last. String values may contain "{key}" placeholders, which
# resolve_config() expands.
DEFAULT_CONFIG: dict[str, Any] = {
    "name": "dmhy-regex-finetune",
    "repo_url": "https://huggingface.co/ModerRAS/AniFileBERT",
    "repo_ref": "main",
    "repo_dir": "/content/AniFileBERT",
    "drive_root": "/content/drive/MyDrive/AniFileBERT",
    "mount_drive": True,
    "pull": True,
    "install": {
        "requirements": True,
        "git_lfs": True,
        "extra_packages": [],
    },
    "training": {
        "tokenizer": "regex",
        "data_file": "datasets/AnimeName/dmhy_weak.jsonl",
        "vocab_file": "datasets/AnimeName/vocab.json",
        "save_dir": "{drive_root}/checkpoints/{name}",
        "init_model_dir": ".",
        "epochs": 1,
        "batch_size": 128,
        "learning_rate": 0.0003,
        "warmup_steps": 300,
        "train_split": 0.9,
        "max_seq_length": 64,
        "seed": 42,
        "limit_samples": None,
        "rebuild_vocab": False,
        "max_vocab_size": None,
        "resume_from_checkpoint": "auto",
        "checkpoint_steps": 1000,
        "save_total_limit": 3,
        "cpu": False,
        "no_shuffle": False,
        "extra_args": [],
    },
    "export": {
        "enabled": True,
        "required": False,
        "output": "{save_dir}/exports/anime_filename_parser.onnx",
        "max_length": "{max_seq_length}",
        "sample": "Witch.Hat.Atelier.S01E07.1080p.NF.WEB-DL.JPN.AAC2.0.H.264.MSubs-ToonsHub",
        "android_assets_dir": None,
    },
    "smoke": {
        "enabled": True,
        "required": True,
        "sample": "Witch.Hat.Atelier.S01E07.1080p.NF.WEB-DL.JPN.AAC2.0.H.264.MSubs-ToonsHub",
    },
    "artifacts": {
        "manifest": "{save_dir}/colab_run_manifest.json",
        "latest_manifest": "{drive_root}/last_run_manifest.json",
    },
}

# Every command passed to run() is recorded here and later embedded in the
# run manifest.
COMMAND_LOG: list[dict[str, Any]] = []


class SafeFormatDict(dict):
    """Mapping for ``str.format_map`` that leaves unknown placeholders intact."""

    def __missing__(self, key: str) -> str:
        # Re-emit the placeholder so a later rendering pass can still fill it.
        return "{" + key + "}"


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    return now.isoformat().replace("+00:00", "Z")


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> dict[str, Any]:
    """Return a deep copy of *base* with *override* merged on top.

    Nested mappings present on both sides are merged recursively; any other
    value in *override* replaces the one in *base*. Neither input is mutated.
    """
    merged = copy.deepcopy(dict(base))
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def render_templates(value: Any, context: Mapping[str, Any]) -> Any:
    """Expand ``{key}`` placeholders in every string found inside *value*.

    Lists and dicts are traversed recursively and rebuilt; values of any other
    type are returned unchanged. Placeholders missing from *context* are kept
    verbatim (see ``SafeFormatDict``).
    """
    if isinstance(value, str):
        return value.format_map(SafeFormatDict(context))
    if isinstance(value, list):
        return [render_templates(item, context) for item in value]
    if isinstance(value, dict):
        return {key: render_templates(item, context) for key, item in value.items()}
    return value


def command_text(args: str | Sequence[Any]) -> str:
    """Return a shell-quoted, printable form of a command."""
    if isinstance(args, str):
        return args
    return " ".join(shlex.quote(str(arg)) for arg in args)


def run(
    args: str | Sequence[Any],
    *,
    cwd: str | os.PathLike[str] | None = None,
    check: bool = True,
    dry_run: bool = False,
) -> int:
    """Run a command, stream its combined output, and record it in COMMAND_LOG.

    Args:
        args: A shell string (executed with ``shell=True``) or an argument
            sequence. Sequence items that are not ``str``/``bytes``/path-like
            are converted with ``str()`` before being executed.
        cwd: Working directory for the child process.
        check: Raise ``RuntimeError`` when the command exits non-zero.
        dry_run: Only print and log the command; do not execute it.

    Returns:
        The process exit code (always 0 for a dry run).

    Raises:
        RuntimeError: If *check* is true and the command exits non-zero.
    """
    text = command_text(args)
    entry: dict[str, Any] = {
        "cmd": text,
        "cwd": os.fspath(cwd) if cwd is not None else None,
        "started_at": utc_now(),
        "dry_run": dry_run,
    }
    COMMAND_LOG.append(entry)
    print(f"\n$ {text}")

    if dry_run:
        entry["returncode"] = 0
        entry["finished_at"] = utc_now()
        return 0

    use_shell = isinstance(args, str)
    if use_shell:
        popen_args: Any = args
    else:
        # Popen rejects ints/floats; stringify them the same way the logged
        # command text does, leaving already-valid argument types untouched.
        popen_args = [
            arg if isinstance(arg, (str, bytes, os.PathLike)) else str(arg)
            for arg in args
        ]

    # The context manager closes the pipe and waits for the child even if
    # printing a line raises.
    with subprocess.Popen(
        popen_args,
        cwd=cwd,
        shell=use_shell,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
        bufsize=1,
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            print(line, end="")

    entry["returncode"] = proc.returncode
    entry["finished_at"] = utc_now()
    if check and proc.returncode != 0:
        raise RuntimeError(f"Command failed with exit code {proc.returncode}: {text}")
    return proc.returncode


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the Colab runner."""
    parser = argparse.ArgumentParser(description="Run AniFileBERT training in Colab")
    parser.add_argument("--config", help="JSON profile path or URL")
    parser.add_argument("--profile", help="Profile name under colab/configs without .json")
    parser.add_argument("--repo-url", help="Override repository URL")
    parser.add_argument("--repo-ref", help="Override branch, tag, or commit to checkout")
    parser.add_argument("--repo-dir", help="Override Colab repository directory")
    parser.add_argument("--drive-root", help="Override Google Drive output root")
    parser.add_argument("--save-dir", help="Override checkpoint output directory")
    parser.add_argument("--epochs", type=float, help="Override training epochs")
    parser.add_argument("--batch-size", type=int, help="Override per-device batch size")
    parser.add_argument("--learning-rate", type=float, help="Override learning rate")
    parser.add_argument("--warmup-steps", type=int, help="Override warmup steps")
    parser.add_argument("--limit-samples", type=int, help="Use only the first N dataset rows")
    parser.add_argument("--skip-install", action="store_true", help="Do not install pip or git-lfs dependencies")
    parser.add_argument("--skip-export", action="store_true", help="Do not run ONNX export")
    parser.add_argument("--skip-smoke", action="store_true", help="Do not run inference smoke check")
    parser.add_argument("--no-mount-drive", action="store_true", help="Do not mount Google Drive")
    parser.add_argument("--no-pull", action="store_true", help="Do not pull an existing checkout")
    parser.add_argument("--dry-run", action="store_true", help="Print commands and write no training outputs")
    parser.add_argument("--print-config", action="store_true", help="Print resolved config before running")
    return parser.parse_args()


def load_json_source(source: str | None, *, required: bool) -> dict[str, Any]:
    """Load a JSON profile from a URL or a local path.

    Local paths are tried as given and then relative to this script's
    directory. Returns an empty dict when *source* is empty, or when the file
    is missing and *required* is false.

    Raises:
        FileNotFoundError: If *required* is true and no candidate file exists.
    """
    if not source:
        return {}
    if source.startswith(("http://", "https://")):
        with urllib.request.urlopen(source) as response:
            return json.loads(response.read().decode("utf-8"))

    candidates = [Path(source), Path(__file__).resolve().parent / source]
    for candidate in candidates:
        if candidate.is_file():
            return json.loads(candidate.read_text(encoding="utf-8"))
    if required:
        raise FileNotFoundError(f"Config file not found: {source}")
    return {}


def load_config(args: argparse.Namespace) -> dict[str, Any]:
    """Build the resolved run configuration.

    Precedence, lowest to highest: ``DEFAULT_CONFIG``, the JSON profile given
    by ``--config`` (or ``--profile``), then individual CLI overrides. The
    result has its templates expanded by ``resolve_config``.
    """
    config_source = args.config
    required = bool(args.config)
    if config_source is None and args.profile:
        config_source = os.fspath(Path("colab") / "configs" / f"{args.profile}.json")
        required = True

    profile_config = load_json_source(config_source, required=required)
    config = deep_merge(DEFAULT_CONFIG, profile_config)

    if args.repo_url:
        config["repo_url"] = args.repo_url
    if args.repo_ref:
        config["repo_ref"] = args.repo_ref
    if args.repo_dir:
        config["repo_dir"] = args.repo_dir
    if args.drive_root:
        config["drive_root"] = args.drive_root
    if args.no_mount_drive:
        config["mount_drive"] = False
    if args.no_pull:
        config["pull"] = False
    if args.skip_install:
        config["install"]["requirements"] = False
        config["install"]["git_lfs"] = False
        config["install"]["extra_packages"] = []
    if args.skip_export:
        config["export"]["enabled"] = False
    if args.skip_smoke:
        config["smoke"]["enabled"] = False

    training = config["training"]
    for arg_name, key in [
        ("save_dir", "save_dir"),
        ("epochs", "epochs"),
        ("batch_size", "batch_size"),
        ("learning_rate", "learning_rate"),
        ("warmup_steps", "warmup_steps"),
        ("limit_samples", "limit_samples"),
    ]:
        value = getattr(args, arg_name)
        # "is not None" so that an explicit 0 still overrides the profile.
        if value is not None:
            training[key] = value

    return resolve_config(config)


def resolve_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *config* with ``{placeholder}`` templates expanded.

    The training section is rendered twice: first against the top-level values
    (name, repo_*, drive_root), then against those plus the training values
    themselves, so training strings can refer to other training keys. The
    export, smoke and artifacts sections are rendered afterwards and can
    additionally use ``{save_dir}`` and ``{final_model_dir}``.
    """
    context: dict[str, Any] = {
        "name": config["name"],
        "repo_url": config["repo_url"],
        "repo_ref": config.get("repo_ref") or "",
        "repo_dir": config["repo_dir"],
        "drive_root": config["drive_root"],
    }
    training = render_templates(config["training"], context)
    context.update(training)
    if not training.get("save_dir"):
        training["save_dir"] = os.path.join(config["drive_root"], "checkpoints", config["name"])
    training = render_templates(training, {**context, **training})
    context.update(training)
    context["save_dir"] = training["save_dir"]
    context["final_model_dir"] = os.path.join(training["save_dir"], "final")

    resolved = copy.deepcopy(config)
    resolved["training"] = training
    resolved["export"] = render_templates(config["export"], context)
    resolved["smoke"] = render_templates(config["smoke"], context)
    resolved["artifacts"] = render_templates(config["artifacts"], context)
    return resolved


def maybe_mount_drive(config: Mapping[str, Any]) -> None:
    """Mount Google Drive at ``/content/drive`` unless disabled or unavailable."""
    if not config.get("mount_drive", True):
        print("Google Drive mount disabled.")
        return
    try:
        from google.colab import drive  # type: ignore
    except Exception:
        # Best effort: outside Colab the module does not exist.
        print("[WARN] google.colab is unavailable; skipping Drive mount.")
        return
    print("Mounting Google Drive...")
    drive.mount("/content/drive")


def install_git_lfs_if_needed(config: Mapping[str, Any], *, dry_run: bool) -> None:
    """Ensure git-lfs is set up, installing it via apt when ``/content`` exists."""
    if not config.get("install", {}).get("git_lfs", True):
        return
    if shutil.which("git-lfs"):
        run(["git", "lfs", "install"], check=False, dry_run=dry_run)
        return
    # The presence of /content is used as the signal that apt-get may be used.
    if Path("/content").exists():
        print("Installing git-lfs for Hugging Face model artifacts...")
        run(["apt-get", "update"], check=False, dry_run=dry_run)
        run(["apt-get", "install", "-y", "git-lfs"], dry_run=dry_run)
        run(["git", "lfs", "install"], check=False, dry_run=dry_run)
    else:
        print("[WARN] git-lfs not found. Existing LFS pointers may not contain model weights.")


def is_git_repo(path: Path) -> bool:
    """Return True when *path* contains a ``.git`` entry."""
    return (path / ".git").exists()


def prepare_repo(config: Mapping[str, Any], *, dry_run: bool) -> Path:
    """Clone the repository if needed, then check out, pull and sync it.

    Returns:
        The repository directory.

    Raises:
        RuntimeError: If the target directory is non-empty but not a git
            checkout, or a checked command fails.
    """
    repo_dir = Path(config["repo_dir"])
    repo_url = config["repo_url"]
    repo_ref = config.get("repo_ref")

    if not is_git_repo(repo_dir):
        if repo_dir.exists() and any(repo_dir.iterdir()):
            raise RuntimeError(f"{repo_dir} exists but is not a git checkout")
        # A dry run must not touch the filesystem.
        if not dry_run:
            repo_dir.parent.mkdir(parents=True, exist_ok=True)
        run(["git", "clone", "--recursive", repo_url, os.fspath(repo_dir)], dry_run=dry_run)
    else:
        print(f"Using existing repository checkout: {repo_dir}")

    if repo_ref:
        run(["git", "fetch", "--all", "--tags"], cwd=repo_dir, check=False, dry_run=dry_run)
        run(["git", "checkout", str(repo_ref)], cwd=repo_dir, dry_run=dry_run)
    if config.get("pull", True):
        # Not checked: the pull result does not abort the run.
        run(["git", "pull", "--ff-only"], cwd=repo_dir, check=False, dry_run=dry_run)

    run(["git", "submodule", "update", "--init", "--recursive"], cwd=repo_dir, dry_run=dry_run)
    if shutil.which("git-lfs"):
        run(["git", "lfs", "pull"], cwd=repo_dir, check=False, dry_run=dry_run)
    return repo_dir


def install_python_deps(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Install ``requirements.txt`` and any configured extra packages with pip."""
    install = config.get("install", {})
    if install.get("requirements", True):
        run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"], cwd=repo_dir, dry_run=dry_run)
    for package in install.get("extra_packages", []):
        run([sys.executable, "-m", "pip", "install", str(package)], cwd=repo_dir, dry_run=dry_run)


def verify_runtime(repo_dir: Path, *, dry_run: bool) -> None:
    """Print GPU and PyTorch information; failures here are not fatal."""
    run(["nvidia-smi"], cwd=repo_dir, check=False, dry_run=dry_run)
    run(
        [
            sys.executable,
            "-c",
            "import torch; print(f'PyTorch {torch.__version__}, CUDA available: {torch.cuda.is_available()}')",
        ],
        cwd=repo_dir,
        check=False,
        dry_run=dry_run,
    )


def add_arg(cmd: list[str], flag: str, value: Any) -> None:
    """Append *flag* to *cmd* according to *value*.

    ``None`` and ``False`` add nothing, ``True`` adds the bare flag, and any
    other value adds the flag followed by ``str(value)``.
    """
    if value is None or value is False:
        return
    if value is True:
        cmd.append(flag)
    else:
        cmd.extend([flag, str(value)])


def build_train_command(training: Mapping[str, Any]) -> list[str]:
    """Translate the training section of the config into a ``train.py`` command."""
    cmd = [sys.executable, "train.py"]
    for key, flag in [
        ("tokenizer", "--tokenizer"),
        ("data_file", "--data-file"),
        ("vocab_file", "--vocab-file"),
        ("save_dir", "--save-dir"),
        ("init_model_dir", "--init-model-dir"),
        ("epochs", "--epochs"),
        ("batch_size", "--batch-size"),
        ("learning_rate", "--learning-rate"),
        ("warmup_steps", "--warmup-steps"),
        ("train_split", "--train-split"),
        ("max_seq_length", "--max-seq-length"),
        ("seed", "--seed"),
        ("limit_samples", "--limit-samples"),
        ("max_vocab_size", "--max-vocab-size"),
        ("resume_from_checkpoint", "--resume-from-checkpoint"),
        ("checkpoint_steps", "--checkpoint-steps"),
        ("save_total_limit", "--save-total-limit"),
    ]:
        add_arg(cmd, flag, training.get(key))
    add_arg(cmd, "--rebuild-vocab", training.get("rebuild_vocab"))
    add_arg(cmd, "--cpu", training.get("cpu"))
    add_arg(cmd, "--no-shuffle", training.get("no_shuffle"))
    cmd.extend(str(arg) for arg in training.get("extra_args", []))
    return cmd


def run_training(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Create the save directory (unless dry-running) and launch ``train.py``."""
    training = config["training"]
    if not dry_run:
        Path(training["save_dir"]).mkdir(parents=True, exist_ok=True)
    run(build_train_command(training), cwd=repo_dir, dry_run=dry_run)


def run_export(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Run ``export_onnx.py`` on the final model if export is enabled.

    A failure is re-raised only when ``export.required`` is true; otherwise it
    is reported as a warning with a traceback.
    """
    export = config["export"]
    if not export.get("enabled", True):
        print("ONNX export disabled.")
        return
    cmd = [
        sys.executable,
        "export_onnx.py",
        "--model-dir",
        os.path.join(config["training"]["save_dir"], "final"),
        "--output",
        export["output"],
        "--max-length",
        str(export["max_length"]),
    ]
    add_arg(cmd, "--sample", export.get("sample"))
    add_arg(cmd, "--android-assets-dir", export.get("android_assets_dir"))
    try:
        run(cmd, cwd=repo_dir, dry_run=dry_run)
    except Exception:
        if export.get("required", False):
            raise
        print("[WARN] ONNX export failed, but export.required is false.")
        traceback.print_exc()


def run_smoke(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Run ``inference.py`` on the sample filename if the smoke check is enabled.

    A failure is re-raised unless ``smoke.required`` is false, in which case it
    is reported as a warning with a traceback.
    """
    smoke = config["smoke"]
    if not smoke.get("enabled", True):
        print("Inference smoke check disabled.")
        return
    cmd = [
        sys.executable,
        "inference.py",
        "--model-dir",
        os.path.join(config["training"]["save_dir"], "final"),
        smoke["sample"],
    ]
    try:
        run(cmd, cwd=repo_dir, dry_run=dry_run)
    except Exception:
        if smoke.get("required", True):
            raise
        print("[WARN] Smoke check failed, but smoke.required is false.")
        traceback.print_exc()


def git_commit(repo_dir: Path, *, dry_run: bool) -> str | None:
    """Return the checked-out commit hash, or None for a dry run or on failure."""
    if dry_run:
        return None
    try:
        return subprocess.check_output(
            ["git", "rev-parse", "HEAD"],
            cwd=repo_dir,
            text=True,
            encoding="utf-8",
            errors="replace",
        ).strip()
    except Exception:
        # Best effort: the manifest simply records no commit.
        return None


def write_json(path: str | os.PathLike[str], data: Mapping[str, Any], *, dry_run: bool) -> None:
    """Write *data* as indented UTF-8 JSON to *path*, creating parent directories.

    In dry-run mode only the target path is printed.
    """
    print(f"Writing manifest: {path}")
    if dry_run:
        return
    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")


def write_manifests(
    config: Mapping[str, Any],
    repo_dir: Path,
    *,
    status: str,
    started_at: str,
    error: str | None,
    dry_run: bool,
) -> None:
    """Write the run manifest and, if configured, the "latest" manifest copy.

    The manifest records the run status, timestamps, repository details, output
    locations, the resolved config, the command log and any error text.
    """
    save_dir = config["training"]["save_dir"]
    manifest = {
        "status": status,
        "name": config["name"],
        "started_at": started_at,
        "finished_at": utc_now(),
        "repo_url": config["repo_url"],
        "repo_ref": config.get("repo_ref"),
        "repo_commit": git_commit(repo_dir, dry_run=dry_run),
        "repo_dir": os.fspath(repo_dir),
        "save_dir": save_dir,
        "final_model_dir": os.path.join(save_dir, "final"),
        "onnx_output": config["export"].get("output") if config["export"].get("enabled") else None,
        "config": config,
        "commands": COMMAND_LOG,
        "error": error,
    }
    artifacts = config["artifacts"]
    write_json(artifacts["manifest"], manifest, dry_run=dry_run)
    if artifacts.get("latest_manifest"):
        write_json(artifacts["latest_manifest"], manifest, dry_run=dry_run)


def main() -> None:
    """Run the full pipeline and always attempt to write a run manifest."""
    args = parse_args()
    started_at = utc_now()
    config = load_config(args)
    if args.print_config:
        print(json.dumps(config, ensure_ascii=False, indent=2))

    repo_dir = Path(config["repo_dir"])
    status = "failed"
    error: str | None = None
    try:
        maybe_mount_drive(config)
        install_git_lfs_if_needed(config, dry_run=args.dry_run)
        repo_dir = prepare_repo(config, dry_run=args.dry_run)
        install_python_deps(config, repo_dir, dry_run=args.dry_run)
        verify_runtime(repo_dir, dry_run=args.dry_run)
        run_training(config, repo_dir, dry_run=args.dry_run)
        run_export(config, repo_dir, dry_run=args.dry_run)
        run_smoke(config, repo_dir, dry_run=args.dry_run)
        status = "success"
    except Exception as exc:
        error = f"{type(exc).__name__}: {exc}"
        raise
    finally:
        try:
            write_manifests(
                config,
                repo_dir,
                status=status,
                started_at=started_at,
                error=error,
                dry_run=args.dry_run,
            )
        except Exception:
            # When the pipeline already failed, a secondary manifest failure
            # must not replace the real error; otherwise it is the error.
            if error is None:
                raise
            print("[WARN] Could not write run manifest after a failed run.")
            traceback.print_exc()

    print("\nDone.")
    print(f"Final model: {os.path.join(config['training']['save_dir'], 'final')}")
    print(f"Manifest: {config['artifacts']['manifest']}")


if __name__ == "__main__":
    main()