#!/usr/bin/env python3 """ LLM-based semantic annotator for anime filenames. Replaces regex heuristics with a subagent that "reads" filenames like a human. Extracts filenames from the DMHY SQLite DB, sends batches to a subagent for annotation, and writes JSONL. Usage: python -m tools.llm_labeler --max-files 100 # annotate 100 files python -m tools.llm_labeler --min-id 689305 # resume from file ID python -m tools.llm_labeler --batch-size 15 # 15 files per subagent call """ import argparse import json import os import re import sqlite3 import subprocess import sys import tempfile import time from pathlib import Path DB_PATH = r"D:\WorkSpace\Python\dmhy-parser\dmhy_anime.db" OUTPUT_DIR = r"D:\WorkSpace\Android\MiruPlay\tools\anime_parser\data\dmhy" BATCH_DIR = os.path.join(OUTPUT_DIR, "llm_batches") VIDEO_EXTENSIONS = {".mkv", ".mp4", ".avi", ".mov", ".wmv", ".flv", ".rmvb", ".ts", ".m2ts", ".webm", ".mpg", ".mpeg", ".m4v"} # Build the few-shot prompt template PROMPT_TEMPLATE = """You are an anime filename annotator. Read each filename and assign BIO labels token-by-token. LABEL SCHEME: - B-TITLE / I-TITLE: Anime title words (e.g. Sousou, no, Frieren, 葬送的, 芙莉莲) - B-SEASON: Season marker (S2, S02, Season 2, 第二季, 第N季, 第N部, 2nd Season, II when it means season 2) - B-EPISODE: Episode number (01, 06, EP01, 第01话, 第01話, #01) - B-GROUP / I-GROUP: Release group name [ANi], [SubsPlease], [LoliHouse], 【桜都字幕组】 - B-RESOLUTION: Resolution (1080p, 720P, 4K, 2160p, 1920x1080) - B-SOURCE: Source/format tag (WEB-DL, BDRip, HEVC, AAC, FLAC, CHT, CHS, GB, BIG5) - B-SPECIAL: Special type (OVA, OAD, Movie, SP, OP, ED, PV, CM) - O: Separators (space, -, _, |, ~, .) and noise IMPORTANT RULES: 1. Roman numerals (II, III, IV) at the end of a title often mean SEASON, not part of the title. Example: "Sword Art Online II" → "II" is B-SEASON, not I-TITLE Example: "Chibi Maruko-chan II" → "II" is B-SEASON (it's season 2) Exception: When the Roman numeral is PART of the franchise name (e.g. "Final Fantasy X", "Kingdom Hearts III") 2. "Season" followed by a number is a season marker. "3rd Season", "4th Season" are season markers. 3. Numbers that appear between the title and episode number are likely season numbers. Example: "Isekai Nonbiri Nouka 2 - 05" → "2" is B-SEASON 4. Bracketed items at the START are usually GROUP names. Bracketed items at the END are usually metadata (SOURCE, RESOLUTION). 5. Chinese markers like 第2季, 第二季, 第二部 are SEASON markers. 第01话, 第01話 are EPISODE markers. 6. Read the filename holistically - use your understanding of what the anime is about to determine if something is a title word or a technical marker. Return your answer as a JSON object with a "results" array. Each result has: "file_id": integer, "filename": string, "tokens": list of strings (the tokenized filename), "labels": list of strings (one BIO label per token) Tokenize carefully: - Keep bracket content as single tokens: [ANi], [1080P], [WEB-DL] - Chinese/Japanese characters: each character is its own token - English words: keep as whole words - Numbers: keep as single tokens - Separators (space, -, _, |, ~, ., etc.): each is its own token with label O FILENAMES TO ANNOTATE: {filenames_json} Return ONLY valid JSON. No markdown. No explanation. Just the JSON object. """ def get_basename_stem(filename: str) -> str: """Extract filename stem without extension.""" basename = re.split(r"[\\/]", filename)[-1].strip() stem, ext = os.path.splitext(basename) return stem.strip(), ext.lower() def get_skipped_filenames(min_id: int, max_count: int) -> list[tuple]: """ Get filenames from DB that are video files. Prioritizes files with Roman numerals or ordinal season patterns that regex handled poorly, then takes random samples. """ conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30) conn.execute("PRAGMA query_only=ON") try: cursor = conn.execute( "SELECT id, filename FROM files WHERE id >= ? AND id < ? + ? AND filename IS NOT NULL ORDER BY id", (min_id, min_id, max_count * 3) ) results = [] roman_results = [] for row in cursor: stem, ext = get_basename_stem(row[1]) if ext not in VIDEO_EXTENSIONS: continue if re.search(r'\b(ii|iii|iv)\b', stem, re.I) or re.search(r'\d+(?:st|nd|rd|th)\s+[Ss]eason', stem): roman_results.append((row[0], stem)) else: results.append((row[0], stem)) if len(results) + len(roman_results) >= max_count: break # Prioritize Roman numeral cases, fill rest with normal cases final = roman_results[:max_count] remaining = max_count - len(final) if remaining > 0: final.extend(results[:remaining]) return final finally: conn.close() def format_batch_prompt(files: list[tuple]) -> str: """Format filenames for the subagent prompt.""" entries = [] for fid, stem in files: entries.append({"file_id": fid, "filename": stem}) return json.dumps(entries, ensure_ascii=False, indent=2) def parse_subagent_output(raw_output: str) -> list[dict]: """Parse and validate subagent output.""" # Try to extract JSON from the output # First try direct JSON parse text = raw_output.strip() # Remove markdown code fences if present text = re.sub(r'^```(?:json)?\s*', '', text) text = re.sub(r'\s*```$', '', text) try: data = json.loads(text) except json.JSONDecodeError: # Try to find JSON block match = re.search(r'\{[\s\S]*"results"[\s\S]*\}', text) if match: try: data = json.loads(match.group()) except json.JSONDecodeError: return None else: return None if not isinstance(data, dict) or "results" not in data: return None results = data["results"] if not isinstance(results, list): return None # Validate each result valid = [] for r in results: if not isinstance(r, dict): continue if "file_id" not in r or "tokens" not in r or "labels" not in r: continue if len(r["tokens"]) != len(r["labels"]): continue if not r["tokens"]: continue valid.append(r) return valid if valid else None def save_batch_results(results: list[dict], batch_num: int): """Save batch results to a temp JSONL file.""" os.makedirs(BATCH_DIR, exist_ok=True) path = os.path.join(BATCH_DIR, f"batch_{batch_num:05d}.jsonl") with open(path, "w", encoding="utf-8") as f: for r in results: f.write(json.dumps(r, ensure_ascii=False) + "\n") return path def main(): parser = argparse.ArgumentParser(description="LLM anime filename annotator") parser.add_argument("--max-files", type=int, default=100, help="Max files to annotate") parser.add_argument("--min-id", type=int, default=1, help="Starting file ID") parser.add_argument("--batch-size", type=int, default=15, help="Files per subagent call") parser.add_argument("--output", default=os.path.join(OUTPUT_DIR, "dmhy_weak_llm.jsonl"), help="Output JSONL path") args = parser.parse_args() print(f"LLM Anime Filename Annotator") print(f" max-files: {args.max_files}") print(f" batch-size: {args.batch_size}") print(f" output: {args.output}") print() all_files = get_skipped_filenames(args.min_id, args.max_files) print(f"Got {len(all_files)} filenames to annotate (video files)") if not all_files: print("No files to annotate. Exiting.") return # Process in batches all_results = [] batch_count = (len(all_files) + args.batch_size - 1) // args.batch_size for batch_idx in range(batch_count): start = batch_idx * args.batch_size end = min(start + args.batch_size, len(all_files)) batch = all_files[start:end] prompt = PROMPT_TEMPLATE.format( filenames_json=format_batch_prompt(batch) ) # Write prompt to temp file for subagent reference prompt_path = os.path.join(tempfile.gettempdir(), f"llm_labeler_batch_{batch_idx:05d}.txt") with open(prompt_path, "w", encoding="utf-8") as f: f.write(prompt) f.write(f"\n\nReturn ONLY valid JSON output for these {len(batch)} filenames.") print(f"\nBatch {batch_idx + 1}/{batch_count} ({len(batch)} files)") print(f" Prompt saved to {prompt_path}") print(f" Files: {', '.join(fn for _, fn in batch)}") # The subagent will need to be invoked externally # For now, save the prompt and create a run script with open(os.path.join(BATCH_DIR, f"prompt_{batch_idx:05d}.txt"), "w", encoding="utf-8") as f: f.write(prompt) # Create a summary for the orchestrator summary_path = os.path.join(BATCH_DIR, "_summary.json") summary = { "total_files": len(all_files), "batches": batch_count, "batch_size": args.batch_size, "min_id": args.min_id, "prompt_file_prefix": "prompt_", "output_file": args.output, "instructions": "For each prompt_NNNNN.txt file, call task(category='deep', load_skills=[], prompt=contents_of_file) and save the JSON result to batch_NNNNN.jsonl", } with open(summary_path, "w", encoding="utf-8") as f: json.dump(summary, f, ensure_ascii=False, indent=2) print(f"\n{'='*60}") print(f"PROMPTS GENERATED: {batch_count} batches") print(f"Total files: {len(all_files)}") print(f"Batch directory: {BATCH_DIR}") print(f"{'='*60}") print() print("NEXT: For each prompt file, invoke a subagent with the prompt,") print("validate the JSON output, and save to batch_NNNNN.jsonl.") print("Then run: python -m tools.llm_labeler --merge") print() if __name__ == "__main__": main()