AniFileBERT / semantic_labeler.py
ModerRAS's picture
完成整个数据集的整理
f4f4e0e
raw
history blame
12 kB
#!/usr/bin/env python3
"""
Semantic anime filename annotator.
Uses position-based heuristics (bracket structure and token order) rather than whole-filename regex patterns to assign BIO labels.
Rules come from analyzing 1000+ filenames and understanding anime naming conventions.
"""
import json, re, sqlite3, os, random
from collections import Counter
# --- Input / output locations -------------------------------------------------
# SQLite database that main() opens read-only to sample file names from.
DB_PATH = "D:\\WorkSpace\\Python\\dmhy-parser\\dmhy_anime.db"
# JSONL file that main() (re)writes with one annotated record per line.
OUTPUT = "D:\\WorkSpace\\Android\\MiruPlay\\tools\\anime_parser\\data\\dmhy\\dmhy_weak_llm.jsonl"

# --- Sampling parameters ------------------------------------------------------
# Number of annotated records main() tries to collect in one run.
BATCH_SIZE = 500
# Lower-cased extensions (leading dot included) accepted as video files.
VIDEO_EXTS = {
    ".avi",
    ".flv",
    ".m2ts",
    ".m4v",
    ".mkv",
    ".mov",
    ".mp4",
    ".mpeg",
    ".mpg",
    ".rmvb",
    ".ts",
    ".webm",
    ".wmv",
}
def is_cjk(ch):
    """Return True if the single character *ch* falls in one of the ranges below.

    Args:
        ch: A one-character string (``ord`` raises ``TypeError`` otherwise).

    Returns:
        bool: whether the code point of *ch* lies inside any listed range.
    """
    code_point = ord(ch)
    inclusive_ranges = (
        (0x3040, 0x309F),  # Hiragana
        (0x30A0, 0x30FF),  # Katakana
        (0x3400, 0x4DBF),  # CJK Unified Ideographs Extension A
        (0x4E00, 0x9FFF),  # CJK Unified Ideographs
        (0xFF00, 0xFFEF),  # Halfwidth and Fullwidth Forms
    )
    return any(low <= code_point <= high for low, high in inclusive_ranges)
# Release-group names (case-sensitive) that analyze_filename accepts as GROUP
# tokens inside the first bracket of a filename.
KNOWN_GROUPS = {
    "ANi",
    "Airota",
    "Anime",
    "Baha",
    "BeanSub",
    "DBD-Raws",
    "Erai-raws",
    "FZ",
    "Feibanyama",
    "FreesiaSub",
    "GM-Team",
    "Haruhana",
    "Kamigami",
    "KissSub",
    "Leopard-Raws",
    "Lilith-Raws",
    "LoliHouse",
    "Lv.1",
    "Moozzi2",
    "Nekomoe",
    "Ohys-Raws",
    "Raws",
    "ReinForce",
    "Skymoon",
    "Skymoon-Raws",
    "SubsPlease",
    "SweetSub",
    "TSDM",
    "Time",
    "VCB-Studio",
    "jibaketa",
    "kissaten",
    "origin",
    "orion",
}

# Literal season markers. Not referenced by the code visible in this file.
EXPLICIT_SEASONS = {
    # unpadded
    "S1", "S2", "S3", "S4", "S5", "S6", "S7", "S8", "S9", "S10",
    # zero-padded
    "S01", "S02", "S03", "S04", "S05", "S06", "S07", "S08", "S09",
    # with a "Season" suffix
    "S1Season", "S2Season",
}
def tokenize_filename(filename):
    """Split an anime filename into the token sequence used for BIO labeling.

    Rules, applied left to right:

    * each of ``[ ] ( )``, a space, a character accepted by ``is_cjk`` and
      each of ``, _ ~ | ! ? + : ; & ' " # =`` becomes a one-character token;
    * three consecutive dots become the single token ``...``;
    * a run of digits becomes one token;
    * a word starts at a letter and continues over letters and digits;
      ``-``, ``.`` and ``'`` stay inside the word only when an alphanumeric
      character follows (e.g. ``Erai-raws``, ``h.264``);
    * a ``-`` outside a word is kept only when the previous token is neither
      a space nor a bracket; every other character is dropped.

    Digit and word runs stop at characters accepted by ``is_cjk``.  Without
    that stop ``str.isalpha`` / ``str.isdigit`` would swallow them, so
    ``OVA<cjk>`` became a single token while ``<cjk>OVA`` became two.

    Args:
        filename: File name to split (main() passes the stem without
            directory or extension).

    Returns:
        list[str]: tokens in order of appearance.
    """
    single_chars = '[]() ,_~|!?+:;&\'"#='
    connectors = '-.\''
    no_dash_after = (' ', '[', ']', '(', ')')

    tokens = []
    i = 0
    n = len(filename)
    while i < n:
        c = filename[i]
        if c in single_chars or is_cjk(c):
            tokens.append(c)
            i += 1
        elif filename.startswith('...', i):
            tokens.append('...')
            i += 3
        elif c.isdigit():
            j = i + 1
            # Full-width digits satisfy isdigit(); they are is_cjk characters
            # and must remain separate tokens.
            while j < n and filename[j].isdigit() and not is_cjk(filename[j]):
                j += 1
            tokens.append(filename[i:j])
            i = j
        elif c.isalpha():
            j = i + 1
            while j < n:
                ch = filename[j]
                if is_cjk(ch):
                    # Kana / ideographs satisfy isalpha(); end the word here.
                    break
                if ch.isalpha() or ch.isdigit():
                    j += 1
                elif (ch in connectors and j + 1 < n
                      and filename[j + 1].isalnum()
                      and not is_cjk(filename[j + 1])):
                    j += 1
                else:
                    break
            # A connector can still end the run when the character after it
            # is alphanumeric but neither a letter nor a digit; drop it.
            tokens.append(filename[i:j].rstrip(connectors))
            i = j
        else:
            if c == '-' and tokens and tokens[-1] not in no_dash_after:
                tokens.append(c)
            i += 1
    return tokens
# Characters that delimit bracket blocks; they never receive an entity label
# when they occur inside a bracket block.
_BRACKET_CHARS = frozenset('[]()')
# Tokens that do not count as title content in text outside brackets.
_TEXT_SEPARATORS = frozenset(" -_~|.,!?+:;&'")
# Tokens whose presence between two GROUP tokens starts a new group span.
_GROUP_BREAKS = frozenset(' []()')
# Lower-cased resolution tags (shared by single- and multi-token brackets).
_RESOLUTION_TAGS = frozenset({
    '1080p', '720p', '2160p', '4k', '1920x1080', '1280x720',
})
# Lower-cased tags labelled SOURCE inside a multi-token bracket even when
# that bracket is the first one (where other tokens may become GROUP).
_INLINE_SOURCE_TAGS = frozenset({
    'cht', 'chs', 'big5', 'gb', 'jpn', 'jp',
    'web-dl', 'webrip', 'bdrip', 'aac', 'flac',
    'hevc', 'avc', 'x264', 'x265', 'h.264', 'opus',
    'srt', 'ass', 'assx2', 'aacx2', 'hevc-10bit',
    'baha', 'viutv', 'iqiyi', 'cr', 'netflix',
    'jptc', 'chs_jp', 'cht_jp', 'multiple', 'subtitle',
    'ani-one', 'srviutv', 'pgs',
})
_SEASON_RE = re.compile(r'^[Ss]\d+$')


def _is_episode_number(tok):
    """Return True for a decimal token whose value is between 1 and 2000."""
    # isdecimal() rather than isdigit(): int() raises on characters such as
    # superscript digits, which isdigit() accepts.
    return tok.isdecimal() and 1 <= int(tok) <= 2000


def _split_blocks(tokens):
    """Partition token indices into ('text'|'bracket', start, end) blocks.

    Bracket blocks span the opening and closing token (both inclusive);
    text blocks are half-open ``[start, end)``.  Pairs nested inside another
    pair are folded into the enclosing block so that blocks never overlap.
    """
    open_stack = []
    pairs = []
    for i, tok in enumerate(tokens):
        if tok in ('[', '('):
            open_stack.append(i)
        elif tok in (']', ')') and open_stack:
            pairs.append((open_stack.pop(), i))

    blocks = []
    covered = -1  # index of the last token already assigned to a block
    for open_i, close_i in sorted(pairs):
        if open_i <= covered:
            continue  # nested inside the bracket block emitted before
        if open_i > covered + 1:
            blocks.append(('text', covered + 1, open_i))
        blocks.append(('bracket', open_i, close_i))
        covered = close_i
    if covered < len(tokens) - 1:
        blocks.append(('text', covered + 1, len(tokens)))
    return blocks


def _assign_bracket_roles(tokens, start, end, roles):
    """Record a role in *roles* for each content token of one bracket block."""
    positions = [
        j for j in range(start + 1, end)
        if tokens[j].strip() and tokens[j] not in _BRACKET_CHARS
    ]
    if not positions:
        return
    # "First bracket" means nothing earlier in the filename has a role yet.
    is_first_bracket = not roles

    if len(positions) == 1:
        j = positions[0]
        tok = tokens[j]
        if is_first_bracket:
            roles[j] = 'GROUP'
        elif _is_episode_number(tok):
            roles[j] = 'EPISODE'
        elif tok.lower() in _RESOLUTION_TAGS:
            roles[j] = 'RESOLUTION'
        elif _SEASON_RE.match(tok):
            roles[j] = 'SEASON'
        else:
            roles[j] = 'SOURCE'
        return

    short_bracket = len(positions) <= 3
    for j in positions:
        tok = tokens[j]
        lowered = tok.lower()
        if lowered in _RESOLUTION_TAGS:
            roles[j] = 'RESOLUTION'
        elif lowered in _INLINE_SOURCE_TAGS:
            roles[j] = 'SOURCE'
        elif is_first_bracket and (tok in KNOWN_GROUPS or short_bracket):
            roles[j] = 'GROUP'
        elif _SEASON_RE.match(tok):
            roles[j] = 'SEASON'
        elif _is_episode_number(tok):
            roles[j] = 'EPISODE'
        elif is_cjk(tok[0]):
            roles[j] = 'TITLE'
        else:
            roles[j] = 'SOURCE'


def analyze_filename(filename, tokens):
    """Assign one BIO label to every token of a tokenized filename.

    Steps:

    1. Pair up bracket tokens and split the sequence into bracket blocks and
       the text blocks between them.
    2. Give content tokens a role.  Every non-separator token in a text block
       is TITLE.  Inside a bracket the role depends on whether it is the
       first labelled block of the filename (GROUP) and on the token itself
       (EPISODE for 1..2000, RESOLUTION, SEASON for ``S<digits>``, otherwise
       SOURCE; CJK tokens in multi-token brackets are TITLE).
    3. Turn roles into labels: the first TITLE token is ``B-TITLE`` and all
       later ones ``I-TITLE``; a GROUP token is ``I-GROUP`` only when no
       space or bracket separates it from the previous GROUP token; every
       other role is emitted as ``B-<ROLE>``.

    Roles are written to the index of the token they describe.  Earlier
    index arithmetic tagged the opening bracket or a padding space instead,
    and nested brackets produced overlapping blocks.

    NOTE(review): tokenize_filename ends a digit run at the first non-digit,
    so ``1080p`` arrives here as ``1080`` + ``p`` and the numeric part is
    classified as an episode number; resolution tags only match when the
    caller supplies them as a single token.

    Args:
        filename: Original filename; currently unused, kept for callers.
        tokens: Token list as produced by ``tokenize_filename``.

    Returns:
        list[str]: labels, same length as *tokens*; ``'O'`` for tokens
        without a role.
    """
    roles = {}  # token index -> role name
    for kind, start, end in _split_blocks(tokens):
        if kind == 'text':
            for j in range(start, end):
                tok = tokens[j]
                if tok.strip() and tok not in _TEXT_SEPARATORS:
                    roles[j] = 'TITLE'
        else:
            _assign_bracket_roles(tokens, start, end, roles)

    labels = ['O'] * len(tokens)
    title_started = False
    prev_group = None
    for idx in sorted(roles):
        role = roles[idx]
        if role == 'TITLE':
            labels[idx] = 'I-TITLE' if title_started else 'B-TITLE'
            title_started = True
        elif role == 'GROUP':
            starts_new_span = prev_group is None or any(
                tokens[j] in _GROUP_BREAKS for j in range(prev_group + 1, idx)
            )
            labels[idx] = 'B-GROUP' if starts_new_span else 'I-GROUP'
            prev_group = idx
        else:
            labels[idx] = 'B-' + role
    return labels
def main():
    """Sample filenames from the database, annotate them and write a JSONL file.

    Opens ``DB_PATH`` read-only, draws ``BATCH_SIZE * 3`` random rows from the
    ``files`` table and keeps a row when its name has an extension listed in
    ``VIDEO_EXTS``, its stem has not been seen before, it tokenizes into at
    least three tokens, and its labels contain ``B-EPISODE`` plus at least
    one of ``B-TITLE`` / ``B-GROUP``.  Collection stops after ``BATCH_SIZE``
    records.  The records are written to ``OUTPUT`` (overwriting it) and a
    short summary is printed.
    """
    results = []
    seen_stems = set()

    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30)
    try:
        conn.execute("PRAGMA query_only=ON")
        # Oversample: many rows are rejected by the filters below.
        cursor = conn.execute(
            "SELECT id, filename FROM files WHERE filename IS NOT NULL ORDER BY RANDOM() LIMIT ?",
            (BATCH_SIZE * 3,)
        )
        for fid, raw in cursor:
            # Keep only the last path component, then split off the extension.
            stem = re.split(r"[\\/]", raw.strip())[-1].strip()
            stem, ext = os.path.splitext(stem)
            if ext.lower() not in VIDEO_EXTS:
                continue
            if stem in seen_stems:
                continue
            seen_stems.add(stem)

            tokens = tokenize_filename(stem)
            if len(tokens) < 3:
                continue
            labels = analyze_filename(stem, tokens)
            if len(tokens) != len(labels):
                continue
            if 'B-EPISODE' not in labels:
                continue
            if 'B-TITLE' not in labels and 'B-GROUP' not in labels:
                continue

            results.append({
                "file_id": fid,
                "filename": stem,
                "tokens": tokens,
                "labels": labels
            })
            if len(results) >= BATCH_SIZE:
                break
    finally:
        # Release the database handle even when sampling or labeling raises.
        conn.close()

    # Write output
    with open(OUTPUT, "w", encoding="utf-8") as f:
        for record in results:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    # Stats
    b_season = sum(1 for record in results if "B-SEASON" in record["labels"])
    b_title = sum(1 for record in results if "B-TITLE" in record["labels"])
    print(f"Wrote {len(results)} annotations to {OUTPUT}")
    print(f" B-TITLE: {b_title}")
    print(f" B-SEASON: {b_season}")
    # Every kept record contains B-EPISODE (see the filter above).
    print(f" B-EPISODE: {len(results)}")


# Run the sampler only when the file is executed as a script.
if __name__ == "__main__":
    main()