Token Classification
Transformers
ONNX
Safetensors
English
Japanese
Chinese
bert
anime
filename-parsing
Eval Results (legacy)
Instructions to use ModerRAS/AniFileBERT with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use ModerRAS/AniFileBERT with Transformers:
# Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("token-classification", model="ModerRAS/AniFileBERT")# Load model directly from transformers import AutoTokenizer, AutoModelForTokenClassification tokenizer = AutoTokenizer.from_pretrained("ModerRAS/AniFileBERT") model = AutoModelForTokenClassification.from_pretrained("ModerRAS/AniFileBERT") - Notebooks
- Google Colab
- Kaggle
| # -*- coding: utf-8 -*- | |
| """Local client for controlling an active AniFileBERT Colab worker. | |
| The worker still has to be started manually in Colab, but once it prints a | |
| public URL and token this client lets Codex submit training jobs, tail logs, and | |
| inspect status from the local workspace. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| from pathlib import Path | |
| import sys | |
| import time | |
| from typing import Any | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| TERMINAL_STATES = {"success", "failed", "cancelled"} | |
| def load_json(path: str) -> Any: | |
| return json.loads(Path(path).read_text(encoding="utf-8")) | |
| class ColabClient: | |
| def __init__(self, base_url: str, token: str, timeout: int = 30): | |
| self.base_url = base_url.rstrip("/") | |
| self.token = token | |
| self.timeout = timeout | |
| def request(self, method: str, path: str, payload: Any | None = None) -> Any: | |
| url = self.base_url + path | |
| data = None | |
| headers = {"Authorization": f"Bearer {self.token}"} | |
| if payload is not None: | |
| data = json.dumps(payload, ensure_ascii=False).encode("utf-8") | |
| headers["Content-Type"] = "application/json; charset=utf-8" | |
| req = urllib.request.Request(url, data=data, headers=headers, method=method) | |
| try: | |
| with urllib.request.urlopen(req, timeout=self.timeout) as response: | |
| return json.loads(response.read().decode("utf-8")) | |
| except urllib.error.HTTPError as exc: | |
| body = exc.read().decode("utf-8", errors="replace") | |
| raise RuntimeError(f"{method} {url} failed: HTTP {exc.code}: {body}") from exc | |
| def health(self) -> Any: | |
| return self.request("GET", "/health") | |
| def submit(self, payload: dict[str, Any]) -> Any: | |
| return self.request("POST", "/jobs", payload) | |
| def jobs(self) -> Any: | |
| return self.request("GET", "/jobs") | |
| def status(self, job_id: str) -> Any: | |
| return self.request("GET", f"/jobs/{job_id}") | |
| def logs(self, job_id: str, tail: int) -> Any: | |
| query = urllib.parse.urlencode({"tail": tail}) | |
| return self.request("GET", f"/jobs/{job_id}/logs?{query}") | |
| def manifest(self, job_id: str) -> Any: | |
| return self.request("GET", f"/jobs/{job_id}/manifest") | |
| def cancel(self, job_id: str) -> Any: | |
| return self.request("POST", f"/jobs/{job_id}/cancel", {}) | |
| def print_json(data: Any) -> None: | |
| print(json.dumps(data, ensure_ascii=False, indent=2)) | |
| def require_connection(args: argparse.Namespace) -> ColabClient: | |
| url = args.url or os.environ.get("ANIFILEBERT_COLAB_URL") | |
| token = args.token or os.environ.get("ANIFILEBERT_COLAB_TOKEN") | |
| if not url or not token: | |
| raise SystemExit( | |
| "Set ANIFILEBERT_COLAB_URL and ANIFILEBERT_COLAB_TOKEN, " | |
| "or pass --url and --token." | |
| ) | |
| return ColabClient(url, token, timeout=args.timeout) | |
| def build_submit_payload(args: argparse.Namespace) -> dict[str, Any]: | |
| payload: dict[str, Any] = {} | |
| if args.config: | |
| payload["config"] = load_json(args.config) | |
| if args.profile: | |
| payload["profile"] = args.profile | |
| extra_args = list(args.args or []) + list(args.extra_args or []) | |
| if extra_args: | |
| payload["args"] = extra_args | |
| if not payload: | |
| payload["profile"] = "dmhy_regex_finetune" | |
| return payload | |
| def wait_for_job(client: ColabClient, job_id: str, poll: int, tail: int) -> dict[str, Any]: | |
| last_status = None | |
| while True: | |
| status = client.status(job_id) | |
| if status.get("status") != last_status: | |
| print_json(status) | |
| last_status = status.get("status") | |
| logs = client.logs(job_id, tail=tail) | |
| log_text = logs.get("log", "") | |
| if log_text: | |
| print("\n--- log tail ---") | |
| print(log_text.rstrip()) | |
| if status.get("status") in TERMINAL_STATES: | |
| return status | |
| time.sleep(poll) | |
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description="Control an active AniFileBERT Colab worker") | |
| parser.add_argument("--url", help="Worker URL, or ANIFILEBERT_COLAB_URL") | |
| parser.add_argument("--token", help="Worker token, or ANIFILEBERT_COLAB_TOKEN") | |
| parser.add_argument("--timeout", type=int, default=30) | |
| subparsers = parser.add_subparsers(dest="command", required=True) | |
| subparsers.add_parser("health", help="Check worker health") | |
| subparsers.add_parser("jobs", help="List known jobs") | |
| submit = subparsers.add_parser("submit", help="Submit a training job") | |
| submit.add_argument("--config", help="Local JSON config to send to the worker") | |
| submit.add_argument("--profile", help="Remote profile name under colab/configs") | |
| submit.add_argument("--arg", dest="args", action="append", default=[], help="Extra arg for colab_train.py") | |
| submit.add_argument("--wait", action="store_true", help="Poll until the job finishes") | |
| submit.add_argument("--poll", type=int, default=60, help="Polling interval in seconds") | |
| submit.add_argument("--tail", type=int, default=80, help="Log lines to show while waiting") | |
| submit.add_argument("extra_args", nargs=argparse.REMAINDER, | |
| help="Arguments after -- are passed to colab_train.py") | |
| status = subparsers.add_parser("status", help="Show job status") | |
| status.add_argument("job_id") | |
| logs = subparsers.add_parser("logs", help="Show job logs") | |
| logs.add_argument("job_id") | |
| logs.add_argument("--tail", type=int, default=200) | |
| manifest = subparsers.add_parser("manifest", help="Show job manifest") | |
| manifest.add_argument("job_id") | |
| cancel = subparsers.add_parser("cancel", help="Cancel a running job") | |
| cancel.add_argument("job_id") | |
| return parser.parse_args() | |
| def main() -> None: | |
| args = parse_args() | |
| client = require_connection(args) | |
| if args.command == "health": | |
| print_json(client.health()) | |
| elif args.command == "jobs": | |
| print_json(client.jobs()) | |
| elif args.command == "submit": | |
| job = client.submit(build_submit_payload(args)) | |
| print_json(job) | |
| if args.wait: | |
| final_status = wait_for_job(client, job["job_id"], poll=args.poll, tail=args.tail) | |
| if final_status.get("status") != "success": | |
| sys.exit(1) | |
| elif args.command == "status": | |
| print_json(client.status(args.job_id)) | |
| elif args.command == "logs": | |
| print(client.logs(args.job_id, args.tail).get("log", ""), end="") | |
| elif args.command == "manifest": | |
| print_json(client.manifest(args.job_id)) | |
| elif args.command == "cancel": | |
| print_json(client.cancel(args.job_id)) | |
| if __name__ == "__main__": | |
| main() | |