#!/usr/bin/env python3
"""
LLM-based semantic annotator for anime filenames.

Replaces regex heuristics with a subagent that "reads" filenames like a human.
Extracts filenames from the DMHY SQLite DB and writes one prompt file per
batch. A subagent, invoked externally, annotates each batch and the results
are saved as JSONL.

Usage:
  python -m tools.llm_labeler --max-files 100       # annotate 100 files
  python -m tools.llm_labeler --min-id 689305       # resume from file ID
  python -m tools.llm_labeler --batch-size 15       # 15 files per subagent call
"""
import argparse
import json
import os
import re
import sqlite3
import tempfile

DB_PATH = r"D:\WorkSpace\Python\dmhy-parser\dmhy_anime.db"
OUTPUT_DIR = r"D:\WorkSpace\Android\MiruPlay\tools\anime_parser\data\dmhy"
BATCH_DIR = os.path.join(OUTPUT_DIR, "llm_batches")

VIDEO_EXTENSIONS = {".mkv", ".mp4", ".avi", ".mov", ".wmv", ".flv", ".rmvb",
                    ".ts", ".m2ts", ".webm", ".mpg", ".mpeg", ".m4v"}

# Build the few-shot prompt template
PROMPT_TEMPLATE = """You are an anime filename annotator. Read each filename and assign BIO labels token-by-token.

LABEL SCHEME:
- B-TITLE / I-TITLE: Anime title words (e.g. Sousou, no, Frieren, 葬送的, 芙莉莲)
- B-SEASON: Season marker (S2, S02, Season 2, 第二季, 第N季, 第N部, 2nd Season, II when it means season 2)
- B-EPISODE: Episode number (01, 06, EP01, 第01话, 第01話, #01)
- B-GROUP / I-GROUP: Release group name [ANi], [SubsPlease], [LoliHouse], 【桜都字幕组】
- B-RESOLUTION: Resolution (1080p, 720P, 4K, 2160p, 1920x1080)
- B-SOURCE: Source/format tag (WEB-DL, BDRip, HEVC, AAC, FLAC, CHT, CHS, GB, BIG5)
- B-SPECIAL: Special type (OVA, OAD, Movie, SP, OP, ED, PV, CM)
- O: Separators (space, -, _, |, ~, .) and noise

IMPORTANT RULES:
1. Roman numerals (II, III, IV) at the end of a title often mean SEASON, not part of the title.
   Example: "Sword Art Online II" → "II" is B-SEASON, not I-TITLE
   Example: "Chibi Maruko-chan II" → "II" is B-SEASON (it's season 2)
   Exception: when the Roman numeral is part of the franchise name (e.g. "Final Fantasy X", "Kingdom Hearts III"), label it I-TITLE.

2. "Season" followed by a number is a season marker. "3rd Season", "4th Season" are season markers.

3. Numbers that appear between the title and episode number are likely season numbers.
   Example: "Isekai Nonbiri Nouka 2 - 05" → "2" is B-SEASON

4. Bracketed items at the START are usually GROUP names.
   Bracketed items at the END are usually metadata (SOURCE, RESOLUTION).

5. Chinese markers like 第2季, 第二季, 第二部 are SEASON markers.
   第01话, 第01話 are EPISODE markers.

6. Read the filename holistically - use your understanding of what the anime is about
   to determine if something is a title word or a technical marker.

Return your answer as a JSON object with a "results" array. Each result has:
  "file_id": integer,
  "filename": string,
  "tokens": list of strings (the tokenized filename),
  "labels": list of strings (one BIO label per token)

Tokenize carefully:
- Keep bracket content as single tokens: [ANi], [1080P], [WEB-DL]
- Chinese/Japanese characters: each character is its own token
- English words: keep as whole words
- Numbers: keep as single tokens
- Separators (space, -, _, |, ~, ., etc.): each is its own token with label O

FILENAMES TO ANNOTATE:
{filenames_json}

Return ONLY valid JSON. No markdown. No explanation. Just the JSON object.
"""

def get_basename_stem(filename: str) -> tuple[str, str]:
    """Return (stem, lowercased extension) for the basename of a path."""
    basename = re.split(r"[\\/]", filename)[-1].strip()
    stem, ext = os.path.splitext(basename)
    return stem.strip(), ext.lower()

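def get_skipped_filenames(min_id: int, max_count: int) -> list[tuple[int, str]]:
    """
    Get (file_id, stem) pairs for video files in the DB.

    Scans the ID window [min_id, min_id + 3 * max_count) in ID order and
    stops once max_count video files are collected. Files with Roman
    numerals or ordinal season patterns, which regex handled poorly, are
    placed first, followed by the remaining files.
    """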
    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30)
    conn.execute("PRAGMA query_only=ON")
    try:
        cursor = conn.execute(
            "SELECT id, filename FROM files WHERE id >= ? AND id < ? + ? AND filename IS NOT NULL ORDER BY id",
            (min_id, min_id, max_count * 3)
        )
        results = []
        roman_results = []
        for row in cursor:
            stem, ext = get_basename_stem(row[1])
            if ext not in VIDEO_EXTENSIONS:
                continue
            if re.search(r'\b(ii|iii|iv)\b', stem, re.I) or re.search(r'\d+(?:st|nd|rd|th)\s+[Ss]eason', stem):
                roman_results.append((row[0], stem))
            else:
                results.append((row[0], stem))
            if len(results) + len(roman_results) >= max_count:
                break
        
        # Prioritize Roman numeral cases, fill rest with normal cases
        final = roman_results[:max_count]
        remaining = max_count - len(final)
        if remaining > 0:
            final.extend(results[:remaining])
        return final
    finally:
        conn.close()

def format_batch_prompt(files: list[tuple]) -> str:
    """Format filenames for the subagent prompt."""
    entries = []
    for fid, stem in files:
        entries.append({"file_id": fid, "filename": stem})
    return json.dumps(entries, ensure_ascii=False, indent=2)

def parse_subagent_output(raw_output: str) -> "list[dict] | None":
    """Parse and validate subagent output; return None if nothing usable is found."""
    text = raw_output.strip()

    # Remove markdown code fences if present, then try a direct JSON parse
    text = re.sub(r'^```(?:json)?\s*', '', text)
    text = re.sub(r'\s*```$', '', text)
    
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Try to find JSON block
        match = re.search(r'\{[\s\S]*"results"[\s\S]*\}', text)
        if match:
            try:
                data = json.loads(match.group())
            except json.JSONDecodeError:
                return None
        else:
            return None
    
    if not isinstance(data, dict) or "results" not in data:
        return None
    
    results = data["results"]
    if not isinstance(results, list):
        return None
    
    # Validate each result
    valid = []
    for r in results:
        if not isinstance(r, dict):
            continue
        if "file_id" not in r or "tokens" not in r or "labels" not in r:
            continue
        if len(r["tokens"]) != len(r["labels"]):
            continue
        if not r["tokens"]:
            continue
        valid.append(r)
    
    return valid if valid else None

def save_batch_results(results: list[dict], batch_num: int) -> str:
    """Save batch results to a JSONL file in BATCH_DIR and return its path."""
    os.makedirs(BATCH_DIR, exist_ok=True)
    path = os.path.join(BATCH_DIR, f"batch_{batch_num:05d}.jsonl")
    with open(path, "w", encoding="utf-8") as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    return path

def main():
    parser = argparse.ArgumentParser(description="LLM anime filename annotator")
    parser.add_argument("--max-files", type=int, default=100, help="Max files to annotate")
    parser.add_argument("--min-id", type=int, default=1, help="Starting file ID")
    parser.add_argument("--batch-size", type=int, default=15, help="Files per subagent call")
    parser.add_argument("--output", default=os.path.join(OUTPUT_DIR, "dmhy_weak_llm.jsonl"),
                        help="Output JSONL path")
    args = parser.parse_args()
    
    print("LLM Anime Filename Annotator")
    print(f"  max-files: {args.max_files}")
    print(f"  batch-size: {args.batch_size}")
    print(f"  output: {args.output}")
    print()
    
    all_files = get_skipped_filenames(args.min_id, args.max_files)
    print(f"Got {len(all_files)} filenames to annotate (video files)")
    
    if not all_files:
        print("No files to annotate. Exiting.")
        return
    
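    # Make sure the batch directory exists before any prompt file is written;
    # otherwise the open() calls below fail on a fresh checkout.
    os.makedirs(BATCH_DIR, exist_ok=True)

    # Split the files into batches and write one prompt file per batch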
    batch_count = (len(all_files) + args.batch_size - 1) // args.batch_size
    
    for batch_idx in range(batch_count):
        start = batch_idx * args.batch_size
        end = min(start + args.batch_size, len(all_files))
        batch = all_files[start:end]
        
        prompt = PROMPT_TEMPLATE.format(
            filenames_json=format_batch_prompt(batch)
        )
        
        # Write prompt to temp file for subagent reference
        prompt_path = os.path.join(tempfile.gettempdir(), f"llm_labeler_batch_{batch_idx:05d}.txt")
        with open(prompt_path, "w", encoding="utf-8") as f:
            f.write(prompt)
            f.write(f"\n\nReturn ONLY valid JSON output for these {len(batch)} filenames.")
        
        print(f"\nBatch {batch_idx + 1}/{batch_count} ({len(batch)} files)")
        print(f"  Prompt saved to {prompt_path}")
        print(f"  Files: {', '.join(fn for _, fn in batch)}")
        
        # The subagent is invoked externally; this script only saves the
        # prompt into BATCH_DIR for the orchestrator to pick up.
        with open(os.path.join(BATCH_DIR, f"prompt_{batch_idx:05d}.txt"), "w", encoding="utf-8") as f:
            f.write(prompt)
    
    # Create a summary for the orchestrator
    summary_path = os.path.join(BATCH_DIR, "_summary.json")
    summary = {
        "total_files": len(all_files),
        "batches": batch_count,
        "batch_size": args.batch_size,
        "min_id": args.min_id,
        "prompt_file_prefix": "prompt_",
        "output_file": args.output,
        "instructions": "For each prompt_NNNNN.txt file, call task(category='deep', load_skills=[], prompt=contents_of_file) and save the JSON result to batch_NNNNN.jsonl",
    }
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
    
    print(f"\n{'='*60}")
    print(f"PROMPTS GENERATED: {batch_count} batches")
    print(f"Total files: {len(all_files)}")
    print(f"Batch directory: {BATCH_DIR}")
    print(f"{'='*60}")
    print()
    print("NEXT: For each prompt file, invoke a subagent with the prompt,")
    print("validate the JSON output, and save to batch_NNNNN.jsonl.")
    print(f"Then merge the batch_NNNNN.jsonl files into {args.output}")
    print()

if __name__ == "__main__":
    main()