# AniFileBERT / colab_train.py
# Source: Hugging Face repository ModerRAS/AniFileBERT, commit e458112
# ("Add Codex Colab training workflow"), 19.2 kB.
# -*- coding: utf-8 -*-
"""Codex-friendly Google Colab runner for AniFileBERT training.

Typical Colab usage:

    python colab_train.py --config colab/configs/dmhy_regex_finetune.json

This script keeps the Colab side reproducible by putting run parameters in JSON
profiles. It can clone/update the repo, mount Drive, install dependencies,
train, optionally export ONNX, run an inference smoke check, and write a run
manifest that Codex can inspect later.
"""
from __future__ import annotations
import argparse
import copy
import datetime as dt
import json
import os
from pathlib import Path
import shlex
import shutil
import subprocess
import sys
import traceback
from typing import Any, Mapping, Sequence
import urllib.request
# Baseline configuration. load_config() deep-merges a JSON profile over this
# dict and then applies command-line overrides. String values may contain
# "{placeholder}" templates, which resolve_config() fills in.
DEFAULT_CONFIG: dict[str, Any] = {
    "name": "dmhy-regex-finetune",
    "repo_url": "https://huggingface.co/ModerRAS/AniFileBERT",
    "repo_ref": "main",
    "repo_dir": "/content/AniFileBERT",
    "drive_root": "/content/drive/MyDrive/AniFileBERT",
    "mount_drive": True,
    "pull": True,
    # Read by install_git_lfs_if_needed() and install_python_deps().
    "install": {
        "requirements": True,
        "git_lfs": True,
        "extra_packages": [],
    },
    # build_train_command() turns these keys into train.py command-line flags;
    # values that are None or False are left off the command line.
    "training": {
        "tokenizer": "regex",
        "data_file": "datasets/AnimeName/dmhy_weak.jsonl",
        "vocab_file": "datasets/AnimeName/vocab.json",
        "save_dir": "{drive_root}/checkpoints/{name}",
        "init_model_dir": ".",
        "epochs": 1,
        "batch_size": 128,
        "learning_rate": 0.0003,
        "warmup_steps": 300,
        "train_split": 0.9,
        "max_seq_length": 64,
        "seed": 42,
        "limit_samples": None,
        "rebuild_vocab": False,
        "max_vocab_size": None,
        "resume_from_checkpoint": "auto",
        "checkpoint_steps": 1000,
        "save_total_limit": 3,
        "cpu": False,
        "no_shuffle": False,
        # Appended verbatim (after str()) to the end of the train.py command.
        "extra_args": [],
    },
    # Read by run_export(); when "required" is false a failed export only warns.
    "export": {
        "enabled": True,
        "required": False,
        "output": "{save_dir}/exports/anime_filename_parser.onnx",
        # Rendered from the training section's max_seq_length value.
        "max_length": "{max_seq_length}",
        "sample": "Witch.Hat.Atelier.S01E07.1080p.NF.WEB-DL.JPN.AAC2.0.H.264.MSubs-ToonsHub",
        "android_assets_dir": None,
    },
    # Read by run_smoke(); when "required" is true a failed check aborts the run.
    "smoke": {
        "enabled": True,
        "required": True,
        "sample": "Witch.Hat.Atelier.S01E07.1080p.NF.WEB-DL.JPN.AAC2.0.H.264.MSubs-ToonsHub",
    },
    # Manifest locations written by write_manifests().
    "artifacts": {
        "manifest": "{save_dir}/colab_run_manifest.json",
        "latest_manifest": "{drive_root}/last_run_manifest.json",
    },
}
# Every command passed to run() is appended here; write_manifests() embeds the
# list in the run manifest under "commands".
COMMAND_LOG: list[dict[str, Any]] = []
class SafeFormatDict(dict):
    """Dictionary for ``str.format_map`` that keeps unknown placeholders as-is."""

    def __missing__(self, key: str) -> str:
        # Rebuild the placeholder text so an unresolved field survives rendering.
        return "".join(("{", key, "}"))
def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    Microseconds are dropped, so the result has one-second resolution.
    """
    now = dt.datetime.now(dt.timezone.utc)
    stamp = now.replace(microsecond=0).isoformat()
    # isoformat() spells UTC as "+00:00"; the manifest uses the shorter "Z".
    return stamp.replace("+00:00", "Z")
def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> dict[str, Any]:
    """Return a new dict with ``override`` recursively layered over ``base``.

    Where both sides hold a mapping under the same key the two are merged
    recursively; otherwise the override value replaces the base value. Neither
    input is modified, and all values in the result are deep copies.
    """
    result = copy.deepcopy(dict(base))
    for name, incoming in override.items():
        existing = result.get(name)
        both_mappings = isinstance(incoming, Mapping) and isinstance(existing, Mapping)
        if both_mappings:
            result[name] = deep_merge(existing, incoming)
        else:
            result[name] = copy.deepcopy(incoming)
    return result
def render_templates(value: Any, context: Mapping[str, Any]) -> Any:
    """Fill ``{placeholder}`` fields in every string nested inside ``value``.

    Dicts and lists are rebuilt recursively, strings are formatted against
    ``context`` (unknown placeholders are left untouched), and any other value
    is returned unchanged.
    """
    if isinstance(value, dict):
        return {name: render_templates(child, context) for name, child in value.items()}
    if isinstance(value, list):
        return [render_templates(child, context) for child in value]
    if isinstance(value, str):
        return value.format_map(SafeFormatDict(context))
    return value
def command_text(args: str | Sequence[Any]) -> str:
if isinstance(args, str):
return args
return " ".join(shlex.quote(str(arg)) for arg in args)
def run(
    args: str | Sequence[Any],
    *,
    cwd: str | os.PathLike[str] | None = None,
    check: bool = True,
    dry_run: bool = False,
) -> int:
    """Run a command, stream its output, and record it in ``COMMAND_LOG``.

    Args:
        args: Command to run. A string is executed through the shell; a
            sequence is executed directly.
        cwd: Working directory for the command, or ``None`` for the current one.
        check: When true, a failure to start or a non-zero exit code raises.
        dry_run: When true, the command is only printed and logged.

    Returns:
        The command's exit code; ``0`` for a dry run; ``127`` when the command
        could not be started and ``check`` is false.

    Raises:
        OSError: The command could not be started and ``check`` is true.
        RuntimeError: The command exited non-zero and ``check`` is true.
    """
    text = command_text(args)
    entry: dict[str, Any] = {
        "cmd": text,
        "cwd": os.fspath(cwd) if cwd is not None else None,
        "started_at": utc_now(),
        "dry_run": dry_run,
    }
    COMMAND_LOG.append(entry)
    print(f"\n$ {text}")
    if dry_run:
        entry["returncode"] = 0
        entry["finished_at"] = utc_now()
        return 0

    try:
        proc = subprocess.Popen(
            args,
            cwd=cwd,
            shell=isinstance(args, str),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
        )
    except OSError as exc:
        # A missing executable or working directory surfaces here. Complete the
        # log entry, and only propagate when the caller asked for strictness so
        # best-effort probes (check=False) do not abort the whole run.
        entry["returncode"] = 127
        entry["error"] = f"{type(exc).__name__}: {exc}"
        entry["finished_at"] = utc_now()
        if check:
            raise
        print(f"[WARN] Could not start command: {exc}")
        return 127

    # The context manager closes the pipe and waits for the process on exit,
    # including when streaming is interrupted.
    with proc:
        if proc.stdout is not None:
            for line in proc.stdout:
                print(line, end="")
    entry["returncode"] = proc.returncode
    entry["finished_at"] = utc_now()
    if check and proc.returncode != 0:
        raise RuntimeError(f"Command failed with exit code {proc.returncode}: {text}")
    return proc.returncode
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the Colab runner from ``sys.argv``."""
    parser = argparse.ArgumentParser(description="Run AniFileBERT training in Colab")

    # (flag, add_argument keyword arguments), kept in --help display order.
    option_specs = [
        ("--config", {"help": "JSON profile path or URL"}),
        ("--profile", {"help": "Profile name under colab/configs without .json"}),
        ("--repo-url", {"help": "Override repository URL"}),
        ("--repo-ref", {"help": "Override branch, tag, or commit to checkout"}),
        ("--repo-dir", {"help": "Override Colab repository directory"}),
        ("--drive-root", {"help": "Override Google Drive output root"}),
        ("--save-dir", {"help": "Override checkpoint output directory"}),
        ("--epochs", {"type": float, "help": "Override training epochs"}),
        ("--batch-size", {"type": int, "help": "Override per-device batch size"}),
        ("--learning-rate", {"type": float, "help": "Override learning rate"}),
        ("--warmup-steps", {"type": int, "help": "Override warmup steps"}),
        ("--limit-samples", {"type": int, "help": "Use only the first N dataset rows"}),
        ("--skip-install", {"action": "store_true", "help": "Do not install pip or git-lfs dependencies"}),
        ("--skip-export", {"action": "store_true", "help": "Do not run ONNX export"}),
        ("--skip-smoke", {"action": "store_true", "help": "Do not run inference smoke check"}),
        ("--no-mount-drive", {"action": "store_true", "help": "Do not mount Google Drive"}),
        ("--no-pull", {"action": "store_true", "help": "Do not pull an existing checkout"}),
        ("--dry-run", {"action": "store_true", "help": "Print commands and write no training outputs"}),
        ("--print-config", {"action": "store_true", "help": "Print resolved config before running"}),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def load_json_source(source: str | None, *, required: bool) -> dict[str, Any]:
if not source:
return {}
if source.startswith(("http://", "https://")):
with urllib.request.urlopen(source) as response:
return json.loads(response.read().decode("utf-8"))
candidates = [Path(source), Path(__file__).resolve().parent / source]
for candidate in candidates:
if candidate.is_file():
return json.loads(candidate.read_text(encoding="utf-8"))
if required:
raise FileNotFoundError(f"Config file not found: {source}")
return {}
def load_config(args: argparse.Namespace) -> dict[str, Any]:
    """Build the resolved run configuration from defaults, a profile and CLI flags.

    The profile named by ``--config`` (or else ``--profile``) is deep-merged
    over ``DEFAULT_CONFIG``, command-line overrides are applied on top, and the
    result is passed through ``resolve_config``.
    """
    if args.config is None and args.profile:
        # A named profile lives under colab/configs and must exist.
        source = os.fspath(Path("colab") / "configs" / f"{args.profile}.json")
        must_exist = True
    else:
        source = args.config
        must_exist = bool(args.config)

    config = deep_merge(DEFAULT_CONFIG, load_json_source(source, required=must_exist))

    # Top-level string overrides; empty values leave the profile setting alone.
    for field in ("repo_url", "repo_ref", "repo_dir", "drive_root"):
        supplied = getattr(args, field)
        if supplied:
            config[field] = supplied

    if args.no_mount_drive:
        config["mount_drive"] = False
    if args.no_pull:
        config["pull"] = False
    if args.skip_install:
        config["install"].update({"requirements": False, "git_lfs": False, "extra_packages": []})
    if args.skip_export:
        config["export"]["enabled"] = False
    if args.skip_smoke:
        config["smoke"]["enabled"] = False

    # Training overrides share their name with the training key they replace.
    training = config["training"]
    for field in (
        "save_dir",
        "epochs",
        "batch_size",
        "learning_rate",
        "warmup_steps",
        "limit_samples",
    ):
        supplied = getattr(args, field)
        if supplied is not None:
            training[field] = supplied

    return resolve_config(config)
def resolve_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``config`` with all ``{placeholder}`` templates filled in.

    The training section is rendered first (twice, so training values may refer
    to each other), then the export, smoke and artifacts sections are rendered
    against the top-level settings plus the resolved training values,
    ``save_dir`` and ``final_model_dir``.
    """
    context: dict[str, Any] = {
        key: config[key] for key in ("name", "repo_url", "repo_dir", "drive_root")
    }
    context["repo_ref"] = config.get("repo_ref") or ""

    first_pass = render_templates(config["training"], context)
    context.update(first_pass)
    if not first_pass.get("save_dir"):
        # No save_dir configured: fall back to <drive_root>/checkpoints/<name>.
        first_pass["save_dir"] = os.path.join(config["drive_root"], "checkpoints", config["name"])

    training = render_templates(first_pass, {**context, **first_pass})
    context.update(training)
    context["save_dir"] = training["save_dir"]
    context["final_model_dir"] = os.path.join(training["save_dir"], "final")

    resolved = copy.deepcopy(config)
    resolved["training"] = training
    for section in ("export", "smoke", "artifacts"):
        resolved[section] = render_templates(config[section], context)
    return resolved
def maybe_mount_drive(config: Mapping[str, Any]) -> None:
    """Mount Google Drive at ``/content/drive`` unless disabled or unavailable.

    Nothing is mounted when ``config["mount_drive"]`` is false or when the
    ``google.colab`` package cannot be imported.
    """
    if config.get("mount_drive", True):
        try:
            from google.colab import drive  # type: ignore
        except Exception:
            print("[WARN] google.colab is unavailable; skipping Drive mount.")
        else:
            print("Mounting Google Drive...")
            drive.mount("/content/drive")
    else:
        print("Google Drive mount disabled.")
def install_git_lfs_if_needed(config: Mapping[str, Any], *, dry_run: bool) -> None:
    """Make git-lfs usable when ``install.git_lfs`` is enabled.

    If git-lfs is already on PATH only ``git lfs install`` is run. Otherwise it
    is installed through apt-get when ``/content`` exists; elsewhere a warning
    is printed.
    """
    wanted = config.get("install", {}).get("git_lfs", True)
    if not wanted:
        return

    lfs_init = ["git", "lfs", "install"]
    if shutil.which("git-lfs"):
        run(lfs_init, check=False, dry_run=dry_run)
    elif Path("/content").exists():
        print("Installing git-lfs for Hugging Face model artifacts...")
        # The index refresh is best-effort; the install itself must succeed.
        run(["apt-get", "update"], check=False, dry_run=dry_run)
        run(["apt-get", "install", "-y", "git-lfs"], dry_run=dry_run)
        run(lfs_init, check=False, dry_run=dry_run)
    else:
        print("[WARN] git-lfs not found. Existing LFS pointers may not contain model weights.")
def is_git_repo(path: Path) -> bool:
    """Return whether ``path`` contains a ``.git`` entry."""
    git_marker = path.joinpath(".git")
    return git_marker.exists()
def prepare_repo(config: Mapping[str, Any], *, dry_run: bool) -> Path:
    """Clone or update the repository checkout and return its directory.

    A missing checkout is cloned with submodules. The configured ``repo_ref``
    (if any) is then fetched and checked out, the branch is optionally
    fast-forwarded, submodules are updated, and LFS objects are pulled when
    git-lfs is available.

    Args:
        config: Resolved configuration; reads ``repo_dir``, ``repo_url``,
            ``repo_ref`` and ``pull``.
        dry_run: When true, commands are only logged and nothing is created.

    Returns:
        The repository directory.

    Raises:
        RuntimeError: ``repo_dir`` exists, is not a git checkout, and is either
            a non-empty directory or not a directory at all; or a required git
            command failed.
    """
    repo_dir = Path(config["repo_dir"])
    repo_url = config["repo_url"]
    repo_ref = config.get("repo_ref")
    if not is_git_repo(repo_dir):
        # An empty directory is an acceptable clone target; a regular file or a
        # populated directory is not.
        occupied = repo_dir.exists() and (not repo_dir.is_dir() or any(repo_dir.iterdir()))
        if occupied:
            raise RuntimeError(f"{repo_dir} exists but is not a git checkout")
        if not dry_run:
            # Match the other steps: a dry run must not touch the filesystem.
            repo_dir.parent.mkdir(parents=True, exist_ok=True)
        run(["git", "clone", "--recursive", repo_url, os.fspath(repo_dir)], dry_run=dry_run)
    else:
        print(f"Using existing repository checkout: {repo_dir}")
    if repo_ref:
        # Fetching is best-effort; the checkout itself must succeed.
        run(["git", "fetch", "--all", "--tags"], cwd=repo_dir, check=False, dry_run=dry_run)
        run(["git", "checkout", str(repo_ref)], cwd=repo_dir, dry_run=dry_run)
    if config.get("pull", True):
        # Tolerated failure: a detached HEAD (tag or commit) cannot be pulled.
        run(["git", "pull", "--ff-only"], cwd=repo_dir, check=False, dry_run=dry_run)
    run(["git", "submodule", "update", "--init", "--recursive"], cwd=repo_dir, dry_run=dry_run)
    if shutil.which("git-lfs"):
        run(["git", "lfs", "pull"], cwd=repo_dir, check=False, dry_run=dry_run)
    return repo_dir
def install_python_deps(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Install ``requirements.txt`` and any extra packages with pip.

    Both steps are driven by the ``install`` section of ``config`` and run from
    inside ``repo_dir`` using the current interpreter.
    """
    settings = config.get("install", {})
    pip_install = [sys.executable, "-m", "pip", "install"]
    if settings.get("requirements", True):
        run([*pip_install, "-r", "requirements.txt"], cwd=repo_dir, dry_run=dry_run)
    # Extra packages are installed one at a time, in the configured order.
    for package in settings.get("extra_packages", []):
        run([*pip_install, str(package)], cwd=repo_dir, dry_run=dry_run)
def verify_runtime(repo_dir: Path, *, dry_run: bool) -> None:
    """Print GPU and PyTorch diagnostics; non-zero exit codes are ignored."""
    torch_probe = (
        "import torch; print(f'PyTorch {torch.__version__}, CUDA available: {torch.cuda.is_available()}')"
    )
    probes = (
        ["nvidia-smi"],
        [sys.executable, "-c", torch_probe],
    )
    for probe in probes:
        run(probe, cwd=repo_dir, check=False, dry_run=dry_run)
def add_arg(cmd: list[str], flag: str, value: Any) -> None:
    """Append a command-line option to ``cmd`` in place.

    ``True`` adds the bare flag, ``None`` and ``False`` add nothing, and any
    other value (including ``0`` and ``""``) adds the flag followed by
    ``str(value)``.
    """
    if value is True:
        cmd.append(flag)
    elif value is not None and value is not False:
        cmd.extend((flag, str(value)))
def build_train_command(training: Mapping[str, Any]) -> list[str]:
    """Build the ``train.py`` command line from the training settings.

    Each listed key maps to the flag of the same name with underscores replaced
    by dashes. Omission and boolean handling is delegated to ``add_arg``, and
    ``extra_args`` entries are appended last.
    """
    # Value options first, then the boolean switches; the order fixes argv.
    option_keys = (
        "tokenizer",
        "data_file",
        "vocab_file",
        "save_dir",
        "init_model_dir",
        "epochs",
        "batch_size",
        "learning_rate",
        "warmup_steps",
        "train_split",
        "max_seq_length",
        "seed",
        "limit_samples",
        "max_vocab_size",
        "resume_from_checkpoint",
        "checkpoint_steps",
        "save_total_limit",
        "rebuild_vocab",
        "cpu",
        "no_shuffle",
    )
    cmd = [sys.executable, "train.py"]
    for key in option_keys:
        add_arg(cmd, "--" + key.replace("_", "-"), training.get(key))
    cmd.extend(str(extra) for extra in training.get("extra_args", []))
    return cmd
def run_training(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Run ``train.py`` from ``repo_dir`` with the configured training settings.

    The checkpoint directory is created first, except in dry-run mode.
    """
    settings = config["training"]
    if not dry_run:
        checkpoint_dir = Path(settings["save_dir"])
        checkpoint_dir.mkdir(parents=True, exist_ok=True)
    command = build_train_command(settings)
    run(command, cwd=repo_dir, dry_run=dry_run)
def run_export(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Run ``export_onnx.py`` on the final model when export is enabled.

    A failure is re-raised only when ``export.required`` is true; otherwise a
    warning and the traceback are printed and the run continues.
    """
    export = config["export"]
    if not export.get("enabled", True):
        print("ONNX export disabled.")
        return

    model_dir = os.path.join(config["training"]["save_dir"], "final")
    cmd = [sys.executable, "export_onnx.py"]
    cmd += ["--model-dir", model_dir]
    cmd += ["--output", export["output"]]
    cmd += ["--max-length", str(export["max_length"])]
    for flag, key in (("--sample", "sample"), ("--android-assets-dir", "android_assets_dir")):
        add_arg(cmd, flag, export.get(key))

    try:
        run(cmd, cwd=repo_dir, dry_run=dry_run)
    except Exception:
        must_succeed = export.get("required", False)
        if must_succeed:
            raise
        print("[WARN] ONNX export failed, but export.required is false.")
        traceback.print_exc()
def run_smoke(config: Mapping[str, Any], repo_dir: Path, *, dry_run: bool) -> None:
    """Run ``inference.py`` on the sample filename when the smoke check is enabled.

    A failure is re-raised when ``smoke.required`` is true (the default);
    otherwise a warning and the traceback are printed.
    """
    smoke = config["smoke"]
    if not smoke.get("enabled", True):
        print("Inference smoke check disabled.")
        return

    model_dir = os.path.join(config["training"]["save_dir"], "final")
    cmd = [sys.executable, "inference.py", "--model-dir", model_dir, smoke["sample"]]
    try:
        run(cmd, cwd=repo_dir, dry_run=dry_run)
    except Exception:
        must_succeed = smoke.get("required", True)
        if must_succeed:
            raise
        print("[WARN] Smoke check failed, but smoke.required is false.")
        traceback.print_exc()
def git_commit(repo_dir: Path, *, dry_run: bool) -> str | None:
if dry_run:
return None
try:
return subprocess.check_output(
["git", "rev-parse", "HEAD"],
cwd=repo_dir,
text=True,
encoding="utf-8",
errors="replace",
).strip()
except Exception:
return None
def write_json(path: str | os.PathLike[str], data: Mapping[str, Any], *, dry_run: bool) -> None:
print(f"Writing manifest: {path}")
if dry_run:
return
output_path = Path(path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def write_manifests(
    config: Mapping[str, Any],
    repo_dir: Path,
    *,
    status: str,
    started_at: str,
    error: str | None,
    dry_run: bool,
) -> None:
    """Write the run manifest, plus the "latest" copy when one is configured.

    The manifest records the outcome, timing, repository state, output
    locations, the full resolved config and every logged command.
    """
    training_dir = config["training"]["save_dir"]
    export_cfg = config["export"]
    # The ONNX path is reported only when the export step was enabled.
    onnx_output = export_cfg.get("output") if export_cfg.get("enabled") else None

    manifest = {
        "status": status,
        "name": config["name"],
        "started_at": started_at,
        "finished_at": utc_now(),
        "repo_url": config["repo_url"],
        "repo_ref": config.get("repo_ref"),
        "repo_commit": git_commit(repo_dir, dry_run=dry_run),
        "repo_dir": os.fspath(repo_dir),
        "save_dir": training_dir,
        "final_model_dir": os.path.join(training_dir, "final"),
        "onnx_output": onnx_output,
        "config": config,
        "commands": COMMAND_LOG,
        "error": error,
    }

    artifacts = config["artifacts"]
    destinations = [artifacts["manifest"]]
    if artifacts.get("latest_manifest"):
        destinations.append(artifacts["latest_manifest"])
    for destination in destinations:
        write_json(destination, manifest, dry_run=dry_run)
def main() -> None:
    """Run the whole Colab workflow and always write a run manifest.

    Steps: mount Drive, set up git-lfs, prepare the repository, install
    dependencies, report the runtime, train, export, and smoke-test. Any
    exception is recorded in the manifest and then re-raised.
    """
    args = parse_args()
    started_at = utc_now()
    config = load_config(args)
    if args.print_config:
        print(json.dumps(config, ensure_ascii=False, indent=2))

    dry_run = args.dry_run
    # Fallback path for the manifest in case the run fails before prepare_repo.
    repo_dir = Path(config["repo_dir"])
    status, error = "failed", None
    try:
        maybe_mount_drive(config)
        install_git_lfs_if_needed(config, dry_run=dry_run)
        repo_dir = prepare_repo(config, dry_run=dry_run)
        install_python_deps(config, repo_dir, dry_run=dry_run)
        verify_runtime(repo_dir, dry_run=dry_run)
        run_training(config, repo_dir, dry_run=dry_run)
        run_export(config, repo_dir, dry_run=dry_run)
        run_smoke(config, repo_dir, dry_run=dry_run)
        status = "success"
    except Exception as exc:
        error = f"{type(exc).__name__}: {exc}"
        raise
    finally:
        # Runs on success and failure alike so every run leaves a manifest.
        write_manifests(
            config,
            repo_dir,
            status=status,
            started_at=started_at,
            error=error,
            dry_run=dry_run,
        )

    print("\nDone.")
    print(f"Final model: {os.path.join(config['training']['save_dir'], 'final')}")
    print(f"Manifest: {config['artifacts']['manifest']}")


if __name__ == "__main__":
    main()