Token Classification
Transformers
ONNX
Safetensors
English
Japanese
Chinese
bert
anime
filename-parsing
Eval Results (legacy)
Instructions to use ModerRAS/AniFileBERT with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use ModerRAS/AniFileBERT with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("token-classification", model="ModerRAS/AniFileBERT")

# Load model directly
from transformers import AutoTokenizer, AutoModelForTokenClassification

tokenizer = AutoTokenizer.from_pretrained("ModerRAS/AniFileBERT")
model = AutoModelForTokenClassification.from_pretrained("ModerRAS/AniFileBERT")
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """ | |
| LLM-based semantic annotator for anime filenames. | |
| Replaces regex heuristics with a subagent that "reads" filenames like a human. | |
| Extracts filenames from the DMHY SQLite DB, sends batches to a subagent for | |
| annotation, and writes JSONL. | |
| Usage: | |
| python -m tools.llm_labeler --max-files 100 # annotate 100 files | |
| python -m tools.llm_labeler --min-id 689305 # resume from file ID | |
| python -m tools.llm_labeler --batch-size 15 # 15 files per subagent call | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sqlite3 | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
# SQLite database of DMHY file listings that filenames are read from.
DB_PATH = r"D:\WorkSpace\Python\dmhy-parser\dmhy_anime.db"

# Root data directory; the default --output JSONL is placed directly inside it.
OUTPUT_DIR = r"D:\WorkSpace\Android\MiruPlay\tools\anime_parser\data\dmhy"

# Working directory for generated prompt files, per-batch results and the
# orchestrator summary.
BATCH_DIR = os.path.join(OUTPUT_DIR, "llm_batches")

# File extensions (lower-case, leading dot included) that count as video files.
VIDEO_EXTENSIONS = {
    ".avi",
    ".flv",
    ".m2ts",
    ".m4v",
    ".mkv",
    ".mov",
    ".mp4",
    ".mpeg",
    ".mpg",
    ".rmvb",
    ".ts",
    ".webm",
    ".wmv",
}
# Few-shot prompt template sent to the annotating subagent.
# It describes the BIO label scheme, the disambiguation rules, the expected
# JSON reply shape and the tokenization conventions. The only str.format()
# placeholder is {filenames_json}; keep any other literal braces out of this
# text, or escape them as {{ }}, because the template is filled with .format().
PROMPT_TEMPLATE = """You are an anime filename annotator. Read each filename and assign BIO labels token-by-token.
LABEL SCHEME:
- B-TITLE / I-TITLE: Anime title words (e.g. Sousou, no, Frieren, 葬送的, 芙莉莲)
- B-SEASON: Season marker (S2, S02, Season 2, 第二季, 第N季, 第N部, 2nd Season, II when it means season 2)
- B-EPISODE: Episode number (01, 06, EP01, 第01话, 第01話, #01)
- B-GROUP / I-GROUP: Release group name [ANi], [SubsPlease], [LoliHouse], 【桜都字幕组】
- B-RESOLUTION: Resolution (1080p, 720P, 4K, 2160p, 1920x1080)
- B-SOURCE: Source/format tag (WEB-DL, BDRip, HEVC, AAC, FLAC, CHT, CHS, GB, BIG5)
- B-SPECIAL: Special type (OVA, OAD, Movie, SP, OP, ED, PV, CM)
- O: Separators (space, -, _, |, ~, .) and noise
IMPORTANT RULES:
1. Roman numerals (II, III, IV) at the end of a title often mean SEASON, not part of the title.
Example: "Sword Art Online II" → "II" is B-SEASON, not I-TITLE
Example: "Chibi Maruko-chan II" → "II" is B-SEASON (it's season 2)
Exception: When the Roman numeral is PART of the franchise name (e.g. "Final Fantasy X", "Kingdom Hearts III")
2. "Season" followed by a number is a season marker. "3rd Season", "4th Season" are season markers.
3. Numbers that appear between the title and episode number are likely season numbers.
Example: "Isekai Nonbiri Nouka 2 - 05" → "2" is B-SEASON
4. Bracketed items at the START are usually GROUP names.
Bracketed items at the END are usually metadata (SOURCE, RESOLUTION).
5. Chinese markers like 第2季, 第二季, 第二部 are SEASON markers.
第01话, 第01話 are EPISODE markers.
6. Read the filename holistically - use your understanding of what the anime is about
to determine if something is a title word or a technical marker.
Return your answer as a JSON object with a "results" array. Each result has:
"file_id": integer,
"filename": string,
"tokens": list of strings (the tokenized filename),
"labels": list of strings (one BIO label per token)
Tokenize carefully:
- Keep bracket content as single tokens: [ANi], [1080P], [WEB-DL]
- Chinese/Japanese characters: each character is its own token
- English words: keep as whole words
- Numbers: keep as single tokens
- Separators (space, -, _, |, ~, ., etc.): each is its own token with label O
FILENAMES TO ANNOTATE:
{filenames_json}
Return ONLY valid JSON. No markdown. No explanation. Just the JSON object.
"""
def get_basename_stem(filename: str) -> tuple[str, str]:
    """Split a file path into its basename stem and lower-cased extension.

    Forward slashes and backslashes are both treated as directory
    separators, so the result does not depend on the platform the script
    runs on.

    Args:
        filename: A bare file name or a path ending in one.

    Returns:
        A ``(stem, ext)`` pair. ``stem`` is the final path component without
        its extension and with surrounding whitespace removed. ``ext`` is the
        extension in lower case including the leading dot, or ``""`` when the
        name has no extension.
    """
    # Split on either separator by hand: os.path.basename only understands
    # the separator of the platform it runs on.
    basename = re.split(r"[\\/]", filename)[-1].strip()
    stem, ext = os.path.splitext(basename)
    return stem.strip(), ext.lower()
def get_skipped_filenames(min_id: int, max_count: int) -> list[tuple]:
    """Collect up to ``max_count`` video filename stems from the DMHY database.

    Rows of the ``files`` table are read in ascending ``id`` order from the
    window ``min_id <= id < min_id + 3 * max_count``. Rows whose extension is
    not in ``VIDEO_EXTENSIONS`` are ignored, and scanning stops as soon as
    ``max_count`` video files have been collected.

    Stems containing a standalone ``II``/``III``/``IV`` (case-insensitive) or
    an ordinal followed by "Season" (e.g. "2nd Season") are placed ahead of
    all other stems in the returned list.

    Args:
        min_id: Lowest file id to consider.
        max_count: Maximum number of entries to return.

    Returns:
        A list of ``(file_id, stem)`` tuples.
    """
    query = (
        "SELECT id, filename FROM files WHERE id >= ? AND id < ? + ? AND filename IS NOT NULL ORDER BY id"
    )
    roman_numeral = re.compile(r'\b(ii|iii|iv)\b', re.I)
    ordinal_season = re.compile(r'\d+(?:st|nd|rd|th)\s+[Ss]eason')

    # Open the database read-only so annotation runs can never modify it.
    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30)
    conn.execute("PRAGMA query_only=ON")
    try:
        season_like = []
        ordinary = []
        for file_id, raw_name in conn.execute(query, (min_id, min_id, max_count * 3)):
            stem, ext = get_basename_stem(raw_name)
            if ext not in VIDEO_EXTENSIONS:
                continue
            is_season_like = roman_numeral.search(stem) or ordinal_season.search(stem)
            bucket = season_like if is_season_like else ordinary
            bucket.append((file_id, stem))
            if len(season_like) + len(ordinary) >= max_count:
                break

        # Season-like names first, then top up with the remaining names.
        selected = season_like[:max_count]
        shortfall = max_count - len(selected)
        if shortfall > 0:
            selected.extend(ordinary[:shortfall])
        return selected
    finally:
        conn.close()
def format_batch_prompt(files: list[tuple]) -> str:
    """Serialize a batch of ``(file_id, stem)`` pairs for the prompt.

    Returns:
        A pretty-printed JSON array of ``{"file_id", "filename"}`` objects,
        with non-ASCII characters left unescaped.
    """
    payload = [{"file_id": file_id, "filename": name} for file_id, name in files]
    return json.dumps(payload, ensure_ascii=False, indent=2)
def parse_subagent_output(raw_output: str) -> "list[dict] | None":
    """Parse a subagent reply and keep only the well-formed annotations.

    The reply is expected to be a JSON object with a ``"results"`` array.
    Markdown code fences around the JSON are stripped, and when the text as a
    whole is not valid JSON the widest ``{...}`` span mentioning ``"results"``
    is tried instead.

    An entry is kept only when it is an object that has a ``file_id`` key and
    whose ``tokens`` and ``labels`` are non-empty lists of equal length.
    Entries failing any of these checks are dropped rather than raising.

    Args:
        raw_output: Raw text returned by the subagent.

    Returns:
        The list of valid result dicts, or ``None`` when the reply cannot be
        parsed, has the wrong shape, or contains no valid entry.
    """
    if not isinstance(raw_output, str):
        return None

    text = raw_output.strip()
    # Remove markdown code fences if present.
    text = re.sub(r'^```(?:json)?\s*', '', text)
    text = re.sub(r'\s*```$', '', text)

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the JSON object embedded in surrounding text.
        match = re.search(r'\{[\s\S]*"results"[\s\S]*\}', text)
        if match is None:
            return None
        try:
            data = json.loads(match.group())
        except json.JSONDecodeError:
            return None

    if not isinstance(data, dict):
        return None
    results = data.get("results")
    if not isinstance(results, list):
        return None

    valid = []
    for entry in results:
        if not isinstance(entry, dict) or "file_id" not in entry:
            continue
        tokens = entry.get("tokens")
        labels = entry.get("labels")
        # Check the types before measuring: len() on a number or null raises
        # TypeError, and two equal-length strings would otherwise pass as a
        # valid token/label pair.
        if not isinstance(tokens, list) or not isinstance(labels, list):
            continue
        if not tokens or len(tokens) != len(labels):
            continue
        valid.append(entry)
    return valid or None
def save_batch_results(results: list[dict], batch_num: int):
    """Write one batch of annotations as JSON Lines inside ``BATCH_DIR``.

    The directory is created when missing. The file is named
    ``batch_NNNNN.jsonl`` (zero-padded batch number) and any existing file of
    that name is overwritten. Each result becomes one UTF-8 line with
    non-ASCII characters left unescaped.

    Returns:
        The path of the file that was written.
    """
    os.makedirs(BATCH_DIR, exist_ok=True)
    target = os.path.join(BATCH_DIR, f"batch_{batch_num:05d}.jsonl")
    lines = (json.dumps(record, ensure_ascii=False) + "\n" for record in results)
    with open(target, "w", encoding="utf-8") as handle:
        handle.writelines(lines)
    return target
def main():
    """Generate per-batch prompt files for external subagent annotation.

    Reads the command-line options, pulls candidate video filenames from the
    database, splits them into batches and writes, for every batch:

    * a prompt file in the system temp directory (with an extra trailing
      reminder line), and
    * ``prompt_NNNNN.txt`` in ``BATCH_DIR`` containing the bare prompt.

    Finally writes ``_summary.json`` to ``BATCH_DIR`` describing the run for
    whatever orchestrator invokes the subagent. This function does not call
    the subagent itself.
    """
    parser = argparse.ArgumentParser(description="LLM anime filename annotator")
    parser.add_argument("--max-files", type=int, default=100, help="Max files to annotate")
    parser.add_argument("--min-id", type=int, default=1, help="Starting file ID")
    parser.add_argument("--batch-size", type=int, default=15, help="Files per subagent call")
    parser.add_argument("--output", default=os.path.join(OUTPUT_DIR, "dmhy_weak_llm.jsonl"),
                        help="Output JSONL path")
    args = parser.parse_args()

    # A zero batch size would divide by zero below and a negative one would
    # yield nonsensical batches, so reject both with a usage error.
    if args.batch_size < 1:
        parser.error("--batch-size must be at least 1")

    print("LLM Anime Filename Annotator")
    print(f" max-files: {args.max_files}")
    print(f" batch-size: {args.batch_size}")
    print(f" output: {args.output}")
    print()

    all_files = get_skipped_filenames(args.min_id, args.max_files)
    print(f"Got {len(all_files)} filenames to annotate (video files)")
    if not all_files:
        print("No files to annotate. Exiting.")
        return

    # The prompt files and the summary are written into BATCH_DIR, so it must
    # exist before the first open(); nothing else on this path creates it.
    os.makedirs(BATCH_DIR, exist_ok=True)

    # Process in batches (ceiling division so a partial last batch is kept).
    batch_count = (len(all_files) + args.batch_size - 1) // args.batch_size
    for batch_idx in range(batch_count):
        start = batch_idx * args.batch_size
        batch = all_files[start:start + args.batch_size]
        prompt = PROMPT_TEMPLATE.format(
            filenames_json=format_batch_prompt(batch)
        )

        # Write prompt to temp file for subagent reference.
        prompt_path = os.path.join(tempfile.gettempdir(), f"llm_labeler_batch_{batch_idx:05d}.txt")
        with open(prompt_path, "w", encoding="utf-8") as f:
            f.write(prompt)
            f.write(f"\n\nReturn ONLY valid JSON output for these {len(batch)} filenames.")

        print(f"\nBatch {batch_idx + 1}/{batch_count} ({len(batch)} files)")
        print(f" Prompt saved to {prompt_path}")
        print(f" Files: {', '.join(fn for _, fn in batch)}")

        # The subagent is invoked externally; keep a copy of the bare prompt
        # next to where the batch results are expected.
        with open(os.path.join(BATCH_DIR, f"prompt_{batch_idx:05d}.txt"), "w", encoding="utf-8") as f:
            f.write(prompt)

    # Create a summary for the orchestrator.
    summary_path = os.path.join(BATCH_DIR, "_summary.json")
    summary = {
        "total_files": len(all_files),
        "batches": batch_count,
        "batch_size": args.batch_size,
        "min_id": args.min_id,
        "prompt_file_prefix": "prompt_",
        "output_file": args.output,
        "instructions": "For each prompt_NNNNN.txt file, call task(category='deep', load_skills=[], prompt=contents_of_file) and save the JSON result to batch_NNNNN.jsonl",
    }
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    print(f"\n{'='*60}")
    print(f"PROMPTS GENERATED: {batch_count} batches")
    print(f"Total files: {len(all_files)}")
    print(f"Batch directory: {BATCH_DIR}")
    print(f"{'='*60}")
    print()
    print("NEXT: For each prompt file, invoke a subagent with the prompt,")
    print("validate the JSON output, and save to batch_NNNNN.jsonl.")
    # NOTE(review): the parser above defines no --merge option, so the command
    # printed below fails with a usage error as written - confirm whether the
    # merge step lives elsewhere or still needs to be implemented.
    print("Then run: python -m tools.llm_labeler --merge")
    print()


if __name__ == "__main__":
    main()