AniFileBERT / tools /llm_labeler.py
ModerRAS's picture
Organize parser modules and tools
8c50d16
#!/usr/bin/env python3
"""
LLM-based semantic annotator for anime filenames.
Replaces regex heuristics with a subagent that "reads" filenames like a human.
Extracts filenames from the DMHY SQLite DB, sends batches to a subagent for
annotation, and writes JSONL.
Usage:
python -m tools.llm_labeler --max-files 100 # annotate 100 files
python -m tools.llm_labeler --min-id 689305 # resume from file ID
python -m tools.llm_labeler --batch-size 15 # 15 files per subagent call
"""
import argparse
import json
import os
import re
import sqlite3
import subprocess
import sys
import tempfile
import time
from pathlib import Path
DB_PATH = r"D:\WorkSpace\Python\dmhy-parser\dmhy_anime.db"
OUTPUT_DIR = r"D:\WorkSpace\Android\MiruPlay\tools\anime_parser\data\dmhy"
BATCH_DIR = os.path.join(OUTPUT_DIR, "llm_batches")
VIDEO_EXTENSIONS = {".mkv", ".mp4", ".avi", ".mov", ".wmv", ".flv", ".rmvb",
".ts", ".m2ts", ".webm", ".mpg", ".mpeg", ".m4v"}
# Build the few-shot prompt template
PROMPT_TEMPLATE = """You are an anime filename annotator. Read each filename and assign BIO labels token-by-token.
LABEL SCHEME:
- B-TITLE / I-TITLE: Anime title words (e.g. Sousou, no, Frieren, 葬送的, 芙莉莲)
- B-SEASON: Season marker (S2, S02, Season 2, 第二季, 第N季, 第N部, 2nd Season, II when it means season 2)
- B-EPISODE: Episode number (01, 06, EP01, 第01话, 第01話, #01)
- B-GROUP / I-GROUP: Release group name [ANi], [SubsPlease], [LoliHouse], 【桜都字幕组】
- B-RESOLUTION: Resolution (1080p, 720P, 4K, 2160p, 1920x1080)
- B-SOURCE: Source/format tag (WEB-DL, BDRip, HEVC, AAC, FLAC, CHT, CHS, GB, BIG5)
- B-SPECIAL: Special type (OVA, OAD, Movie, SP, OP, ED, PV, CM)
- O: Separators (space, -, _, |, ~, .) and noise
IMPORTANT RULES:
1. Roman numerals (II, III, IV) at the end of a title often mean SEASON, not part of the title.
Example: "Sword Art Online II" → "II" is B-SEASON, not I-TITLE
Example: "Chibi Maruko-chan II" → "II" is B-SEASON (it's season 2)
Exception: When the Roman numeral is PART of the franchise name (e.g. "Final Fantasy X", "Kingdom Hearts III")
2. "Season" followed by a number is a season marker. "3rd Season", "4th Season" are season markers.
3. Numbers that appear between the title and episode number are likely season numbers.
Example: "Isekai Nonbiri Nouka 2 - 05" → "2" is B-SEASON
4. Bracketed items at the START are usually GROUP names.
Bracketed items at the END are usually metadata (SOURCE, RESOLUTION).
5. Chinese markers like 第2季, 第二季, 第二部 are SEASON markers.
第01话, 第01話 are EPISODE markers.
6. Read the filename holistically - use your understanding of what the anime is about
to determine if something is a title word or a technical marker.
Return your answer as a JSON object with a "results" array. Each result has:
"file_id": integer,
"filename": string,
"tokens": list of strings (the tokenized filename),
"labels": list of strings (one BIO label per token)
Tokenize carefully:
- Keep bracket content as single tokens: [ANi], [1080P], [WEB-DL]
- Chinese/Japanese characters: each character is its own token
- English words: keep as whole words
- Numbers: keep as single tokens
- Separators (space, -, _, |, ~, ., etc.): each is its own token with label O
FILENAMES TO ANNOTATE:
{filenames_json}
Return ONLY valid JSON. No markdown. No explanation. Just the JSON object.
"""
def get_basename_stem(filename: str) -> str:
"""Extract filename stem without extension."""
basename = re.split(r"[\\/]", filename)[-1].strip()
stem, ext = os.path.splitext(basename)
return stem.strip(), ext.lower()
def get_skipped_filenames(min_id: int, max_count: int) -> list[tuple]:
"""
Get filenames from DB that are video files.
Prioritizes files with Roman numerals or ordinal season patterns
that regex handled poorly, then takes random samples.
"""
conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30)
conn.execute("PRAGMA query_only=ON")
try:
cursor = conn.execute(
"SELECT id, filename FROM files WHERE id >= ? AND id < ? + ? AND filename IS NOT NULL ORDER BY id",
(min_id, min_id, max_count * 3)
)
results = []
roman_results = []
for row in cursor:
stem, ext = get_basename_stem(row[1])
if ext not in VIDEO_EXTENSIONS:
continue
if re.search(r'\b(ii|iii|iv)\b', stem, re.I) or re.search(r'\d+(?:st|nd|rd|th)\s+[Ss]eason', stem):
roman_results.append((row[0], stem))
else:
results.append((row[0], stem))
if len(results) + len(roman_results) >= max_count:
break
# Prioritize Roman numeral cases, fill rest with normal cases
final = roman_results[:max_count]
remaining = max_count - len(final)
if remaining > 0:
final.extend(results[:remaining])
return final
finally:
conn.close()
def format_batch_prompt(files: list[tuple]) -> str:
"""Format filenames for the subagent prompt."""
entries = []
for fid, stem in files:
entries.append({"file_id": fid, "filename": stem})
return json.dumps(entries, ensure_ascii=False, indent=2)
def parse_subagent_output(raw_output: str) -> list[dict]:
"""Parse and validate subagent output."""
# Try to extract JSON from the output
# First try direct JSON parse
text = raw_output.strip()
# Remove markdown code fences if present
text = re.sub(r'^```(?:json)?\s*', '', text)
text = re.sub(r'\s*```$', '', text)
try:
data = json.loads(text)
except json.JSONDecodeError:
# Try to find JSON block
match = re.search(r'\{[\s\S]*"results"[\s\S]*\}', text)
if match:
try:
data = json.loads(match.group())
except json.JSONDecodeError:
return None
else:
return None
if not isinstance(data, dict) or "results" not in data:
return None
results = data["results"]
if not isinstance(results, list):
return None
# Validate each result
valid = []
for r in results:
if not isinstance(r, dict):
continue
if "file_id" not in r or "tokens" not in r or "labels" not in r:
continue
if len(r["tokens"]) != len(r["labels"]):
continue
if not r["tokens"]:
continue
valid.append(r)
return valid if valid else None
def save_batch_results(results: list[dict], batch_num: int):
"""Save batch results to a temp JSONL file."""
os.makedirs(BATCH_DIR, exist_ok=True)
path = os.path.join(BATCH_DIR, f"batch_{batch_num:05d}.jsonl")
with open(path, "w", encoding="utf-8") as f:
for r in results:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
return path
def main():
parser = argparse.ArgumentParser(description="LLM anime filename annotator")
parser.add_argument("--max-files", type=int, default=100, help="Max files to annotate")
parser.add_argument("--min-id", type=int, default=1, help="Starting file ID")
parser.add_argument("--batch-size", type=int, default=15, help="Files per subagent call")
parser.add_argument("--output", default=os.path.join(OUTPUT_DIR, "dmhy_weak_llm.jsonl"),
help="Output JSONL path")
args = parser.parse_args()
print(f"LLM Anime Filename Annotator")
print(f" max-files: {args.max_files}")
print(f" batch-size: {args.batch_size}")
print(f" output: {args.output}")
print()
all_files = get_skipped_filenames(args.min_id, args.max_files)
print(f"Got {len(all_files)} filenames to annotate (video files)")
if not all_files:
print("No files to annotate. Exiting.")
return
# Process in batches
all_results = []
batch_count = (len(all_files) + args.batch_size - 1) // args.batch_size
for batch_idx in range(batch_count):
start = batch_idx * args.batch_size
end = min(start + args.batch_size, len(all_files))
batch = all_files[start:end]
prompt = PROMPT_TEMPLATE.format(
filenames_json=format_batch_prompt(batch)
)
# Write prompt to temp file for subagent reference
prompt_path = os.path.join(tempfile.gettempdir(), f"llm_labeler_batch_{batch_idx:05d}.txt")
with open(prompt_path, "w", encoding="utf-8") as f:
f.write(prompt)
f.write(f"\n\nReturn ONLY valid JSON output for these {len(batch)} filenames.")
print(f"\nBatch {batch_idx + 1}/{batch_count} ({len(batch)} files)")
print(f" Prompt saved to {prompt_path}")
print(f" Files: {', '.join(fn for _, fn in batch)}")
# The subagent will need to be invoked externally
# For now, save the prompt and create a run script
with open(os.path.join(BATCH_DIR, f"prompt_{batch_idx:05d}.txt"), "w", encoding="utf-8") as f:
f.write(prompt)
# Create a summary for the orchestrator
summary_path = os.path.join(BATCH_DIR, "_summary.json")
summary = {
"total_files": len(all_files),
"batches": batch_count,
"batch_size": args.batch_size,
"min_id": args.min_id,
"prompt_file_prefix": "prompt_",
"output_file": args.output,
"instructions": "For each prompt_NNNNN.txt file, call task(category='deep', load_skills=[], prompt=contents_of_file) and save the JSON result to batch_NNNNN.jsonl",
}
with open(summary_path, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
print(f"\n{'='*60}")
print(f"PROMPTS GENERATED: {batch_count} batches")
print(f"Total files: {len(all_files)}")
print(f"Batch directory: {BATCH_DIR}")
print(f"{'='*60}")
print()
print("NEXT: For each prompt file, invoke a subagent with the prompt,")
print("validate the JSON output, and save to batch_NNNNN.jsonl.")
print("Then run: python -m tools.llm_labeler --merge")
print()
if __name__ == "__main__":
main()