AniFileBERT / inference.py
ModerRAS's picture
Improve anime filename parser model
e63569d
raw
history blame
34.5 kB
"""
Inference script for anime filename parser.
Loads a trained model and tokenizer, parses anime filenames,
and outputs structured metadata.
Usage:
python inference.py "[ANi] 葬送的芙莉莲 S2 - 03 [1080P][WEB-DL]"
python inference.py --input-file filenames.txt --output-file results.jsonl
"""
import argparse
import json
import os
import re
import sys
from typing import Dict, List, Optional, Tuple
import torch
from transformers import BertForTokenClassification
from config import Config
from label_repairs import season_marker_number
from tokenizer import AnimeTokenizer, load_tokenizer
# Chinese number mapping
CN_NUM_MAP: Dict[str, int] = {
"一": 1, "二": 2, "三": 3, "四": 4, "五": 5,
"六": 6, "七": 7, "八": 8, "九": 9, "十": 10,
}
def extract_season_number(text: str) -> Optional[int]:
"""
Extract season number from various season formats.
Examples:
"S2" → 2, "Season 2" → 2, "第二季" → 2, "1st Season" → 1
"""
marker_value = season_marker_number(text)
if marker_value is not None:
return marker_value
# Arabic digits
match = re.search(r'(\d+)', text)
if match:
return int(match.group(1))
# Chinese digits
for cn, num in CN_NUM_MAP.items():
if cn in text:
return num
return None
def extract_episode_number(text: str) -> Optional[int]:
"""
Extract episode number from various episode formats.
Examples:
"03" → 3, "EP21" → 21, "第7话" → 7, "#01" → 1
"""
match = re.search(r'(\d+)', text)
if match:
return int(match.group(1))
return None
def extract_resolution(text: str) -> Optional[str]:
"""Extract resolution string (e.g., '1080P', '4K', '1920x1080')."""
# Strip brackets for matching
clean = text.strip("[]()【】")
return clean if clean else None
def display_token(token: str) -> str:
"""Make whitespace tokens visible in debug output."""
if token == " ":
return "<SPACE>"
if token == "\t":
return "<TAB>"
return token
def trim_decorations(text: str) -> str:
"""Trim outer release brackets from an extracted entity."""
return text.strip().strip("[]()【】《》()").strip()
def join_entity_tokens(tokens: List[str], tokenizer: Optional[AnimeTokenizer] = None) -> str:
"""Join entity tokens according to the tokenizer granularity."""
if tokenizer is not None and getattr(tokenizer, "tokenizer_variant", "regex") == "char":
return "".join(tokens)
text = "".join(tokens)
if " " in tokens:
return text
return text
def labels_to_entities(
tokens: List[str],
labels: List[str],
tokenizer: Optional[AnimeTokenizer] = None,
) -> List[Tuple[str, str]]:
"""
Convert BIO labels into entity spans.
Illegal orphan I-X labels start a new entity so debug output exposes the
model behavior instead of silently dropping tokens.
"""
entities: List[Tuple[str, str]] = []
current_entity: Optional[str] = None
current_tokens: List[str] = []
for token, label in zip(tokens, labels):
if label.startswith("B-"):
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
current_entity = label[2:]
current_tokens = [token]
elif label.startswith("I-"):
entity_type = label[2:]
if current_entity == entity_type:
current_tokens.append(token)
else:
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
current_entity = entity_type
current_tokens = [token]
else:
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
current_entity = None
current_tokens = []
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
return entities
def is_allowed_bio_transition(previous_label: str, label: str) -> bool:
"""Return whether previous_label -> label is valid under IOB2."""
if label.startswith("I-"):
entity = label[2:]
return previous_label in {f"B-{entity}", f"I-{entity}"}
return True
def constrained_bio_decode(emissions: torch.Tensor, id2label: Dict[int, str]) -> List[int]:
"""
Decode token logits with hard BIO transition constraints.
This is a lightweight CRF-style Viterbi decoder without learned transition
weights. It prevents impossible orphan I-X spans at inference time.
"""
if emissions.numel() == 0:
return []
num_tokens, num_labels = emissions.shape
scores = emissions.detach().cpu()
backpointers = torch.zeros((num_tokens, num_labels), dtype=torch.long)
dp = torch.full((num_labels,), float("-inf"))
for label_id in range(num_labels):
label = id2label.get(label_id, "O")
if not label.startswith("I-"):
dp[label_id] = scores[0, label_id]
for idx in range(1, num_tokens):
next_dp = torch.full((num_labels,), float("-inf"))
for label_id in range(num_labels):
label = id2label.get(label_id, "O")
best_score = float("-inf")
best_prev = 0
for prev_id in range(num_labels):
prev_label = id2label.get(prev_id, "O")
if not is_allowed_bio_transition(prev_label, label):
continue
candidate = dp[prev_id] + scores[idx, label_id]
if candidate > best_score:
best_score = float(candidate)
best_prev = prev_id
next_dp[label_id] = best_score
backpointers[idx, label_id] = best_prev
dp = next_dp
best_last = int(torch.argmax(dp).item())
decoded = [best_last]
for idx in range(num_tokens - 1, 0, -1):
decoded.append(int(backpointers[idx, decoded[-1]].item()))
decoded.reverse()
return decoded
def postprocess(
tokens: List[str],
labels: List[str],
tokenizer: Optional[AnimeTokenizer] = None,
filename: Optional[str] = None,
use_rules: bool = True,
) -> Dict:
"""
Convert BIO-labeled tokens into structured metadata.
Merges consecutive B- / I- tokens of the same entity type,
then extracts structured fields.
"""
result: Dict = {
"title": None,
"season": None,
"episode": None,
"group": None,
"resolution": None,
"source": None,
"special": None,
}
entities = labels_to_entities(tokens, labels, tokenizer)
# Fill result
for entity_type, text in entities:
if entity_type == "TITLE":
result["title"] = result["title"] or trim_decorations(text)
# If we find multiple title fragments, concatenate them
# (handles "That" + ... + "Time" etc.)
elif entity_type == "SEASON":
season_num = extract_season_number(text)
if season_num is not None:
# Keep the highest/last season number if multiple
result["season"] = season_num
elif entity_type == "EPISODE":
ep_num = extract_episode_number(text)
if ep_num is not None:
if result["episode"] is None:
result["episode"] = ep_num
elif entity_type == "GROUP":
group = text.strip("[]()【】")
if result["group"] is None:
result["group"] = group
elif entity_type == "SPECIAL":
special = text.strip("[]()【】")
result["special"] = special
elif entity_type == "RESOLUTION":
res = extract_resolution(text)
if res:
result["resolution"] = res
elif entity_type == "SOURCE":
src = text.strip("[]()【】")
result["source"] = src
# Handle multi-fragment titles: concatenate all TITLE fragments
# (This is needed because O tokens between words break entity continuity)
title_fragments = [t for e, t in entities if e == "TITLE"]
if title_fragments:
result["title"] = " ".join(
trimmed for f in title_fragments
if (trimmed := trim_decorations(f))
)
if use_rules and filename:
result = apply_rule_assists(filename, result)
return result
BRACKET_RE = re.compile(r"\[([^\]]+)\]|\(([^)]+)\)|【([^】]+)】|《([^》]+)》")
RESOLUTION_RE = re.compile(r"(?<![A-Za-z0-9])(?:\d{3,4}[pP]|\d[Kk]|\d{3,4}[xX×]\d{3,4})(?![A-Za-z0-9])")
SOURCE_TOKEN_PATTERN = (
r"WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|HDTV|"
r"Netflix|NF|AMZN|Baha|CR|ABEMA|DSNP|U[-_ ]?NEXT|Hulu|AT[-_ ]?X|"
r"CHS|CHT|GB|BIG5|JPN?|繁中|简中"
)
SOURCE_RE = re.compile(rf"\b(?:{SOURCE_TOKEN_PATTERN})\b", re.I)
SOURCE_TAG_RE = re.compile(
rf"^(?:{SOURCE_TOKEN_PATTERN})(?:\s*(?:[&+/]|,\s*)\s*(?:{SOURCE_TOKEN_PATTERN}))*$",
re.I,
)
SPECIAL_TAG_RE = re.compile(
r"^(?:檢索|检索|搜索|搜寻|搜尋|别名|別名|alias|search|keyword)\s*[::].+",
re.I,
)
EPISODE_PATTERNS = [
("season_episode", re.compile(r"[Ss]\d{1,2}[Ee](?P<ep>\d{1,4})(?:v\d+)?", re.I)),
("dash_episode", re.compile(r"(?:^|[\s._])[-_]\s*(?P<ep>\d{1,4})(?:v\d+)?(?=$|[\s._\-\]\)】》\[])")),
("bracket_episode", re.compile(r"[\[\(【《](?:EP?|#)?(?P<ep>\d{1,4})(?:v\d+)?[\]\)】》]", re.I)),
("explicit_episode", re.compile(r"(?:^|[\s._\-\[\(【《#])(?:EP?|第|#)(?P<ep>\d{1,4})(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])", re.I)),
(
"long_episode",
re.compile(
r"(?:^|[\s._\-\[\(【《])(?P<ep>\d{3,4})(?:v\d+)?"
r"(?=[\s._\-\]\)】》\[]+(?:\d{3,4}[pP]|WEB|BD|BluRay|HDTV|NF|AMZN|CR|Baha))",
re.I,
),
),
("generic_episode", re.compile(r"(?:^|[\s._\-\[\(【《#])(?P<ep>\d{1,3})(?:v\d+)?(?=$|[\s._\-\]\)】》])", re.I)),
]
SEASON_RE = re.compile(r"(?:^|[\s._\-\[\(【《])(?:[Ss](?P<s1>\d{1,2})|Season\s*(?P<s2>\d{1,2})|第(?P<s3>[一二三四五六七八九十\d]+)[季期部])", re.I)
SEQUEL_MARKER_RE = re.compile(
r"(?<![A-Za-z0-9])"
r"(?P<marker>"
r"Ni\s+no\s+(?:Sara|Shou|Sho|Syo|Shō)|"
r"San\s+no\s+(?:Sara|Shou|Sho|Syo)|"
r"(?:Yon|Shi|Shin)\s+no\s+Sara|"
r"(?:Go|Gou)\s+no\s+Sara|"
r"Ni\s+Gakki|Sono\s+Ni|Ni|"
r"II|III|IV|V|VI|VII|VIII|IX|[ⅡⅢⅣⅤⅥⅦⅧⅨ]|"
r"[一二三四五六七八九十兩两貳贰弐弍參叁参肆伍陸陆柒捌玖](?:\s*(?:ノ|の|之)\s*(?:章|期|季|部))?"
r")"
r"(?![A-Za-z0-9])",
re.I,
)
TRAILING_SEQUEL_MARKER_RE = re.compile(
r"(?:^|[\s._-])"
r"(?P<marker>"
r"Ni\s+no\s+(?:Sara|Shou|Sho|Syo|Shō)|"
r"San\s+no\s+(?:Sara|Shou|Sho|Syo)|"
r"(?:Yon|Shi|Shin)\s+no\s+Sara|"
r"(?:Go|Gou)\s+no\s+Sara|"
r"Ni\s+Gakki|Sono\s+Ni|Ni|"
r"II|III|IV|V|VI|VII|VIII|IX|[ⅡⅢⅣⅤⅥⅦⅧⅨ]|"
r"[一二三四五六七八九十兩两貳贰弐弍參叁参肆伍陸陆柒捌玖](?:\s*(?:ノ|の|之)\s*(?:章|期|季|部))?"
r")$",
re.I,
)
NOISE_META_RE = re.compile(
r"^(?:\d{3,4}[pP]|\d[Kk]|WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|"
r"HDTV|Netflix|NF|AMZN|Baha|CR|HEVC|AVC|AV1|x26[45]|h\.?26[45]|AAC.*|FLAC|MP3|DTS|"
r"Opus|ASS.*|CHS|CHT|BIG5|GB|JPN?|MP4|MKV|繁中|简中|内封|外挂)$",
re.I,
)
def cn_number_to_int(text: str) -> Optional[int]:
if text.isdigit():
return int(text)
values = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9}
if text == "十":
return 10
if text.startswith("十") and len(text) == 2:
return 10 + values.get(text[1], 0)
if text.endswith("十") and len(text) == 2:
return values.get(text[0], 0) * 10
if "十" in text and len(text) == 3:
return values.get(text[0], 0) * 10 + values.get(text[2], 0)
return values.get(text)
def bracket_parts(filename: str) -> List[Tuple[str, int, int]]:
parts: List[Tuple[str, int, int]] = []
for match in BRACKET_RE.finditer(filename):
text = next(group for group in match.groups() if group is not None)
parts.append((text.strip(), match.start(), match.end()))
return parts
def looks_like_group(text: str) -> bool:
if not text or NOISE_META_RE.search(text):
return False
return bool(
re.search(
r"(?:字幕|字幕组|字幕組|sub|subs|raws?|fansub|studio|house|team|project|"
r"loli|ani|vcb|airota|kiss|dmhy|erai|subsplease)",
text,
re.I,
)
)
def looks_like_episode_or_meta(text: str) -> bool:
if not text:
return False
clean = text.strip()
return bool(
re.fullmatch(r"(?:EP?|#)?\d{1,4}(?:v\d+)?", clean, re.I)
or RESOLUTION_RE.search(clean)
or SOURCE_TAG_RE.fullmatch(clean)
or SOURCE_RE.search(clean)
or SPECIAL_TAG_RE.search(clean)
or NOISE_META_RE.search(clean)
)
def looks_like_structural_group(text: str, filename: str, bracket_end: int) -> bool:
"""Heuristic for short leading release-group brackets not in the name list."""
if looks_like_group(text):
return True
if not text or looks_like_episode_or_meta(text):
return False
after = filename[bracket_end:].lstrip(" \t._")
if after.startswith("-"):
return False
next_bracket = BRACKET_RE.match(after)
if next_bracket:
next_text = next(group for group in next_bracket.groups() if group is not None)
if looks_like_episode_or_meta(next_text):
return False
words = re.findall(r"[A-Za-z0-9]+", text)
if not words:
if re.search(r"[\u3400-\u9fff]", text) and len(text) <= 32:
return True
return False
if len(text) > 32:
return False
if len(words) == 1:
return True
if any(sep in text for sep in "-_"):
return True
if words[0].isupper() and len(words[0]) <= 4 and len(words) <= 3:
return True
return False
def apply_rule_assists(filename: str, result: Dict) -> Dict:
"""
Fill high-confidence structural fields from filename conventions.
The model remains the primary tagger; rules only fill missing obvious fields
or repair common boundary drift around leading group brackets and episodes.
"""
repaired = dict(result)
brackets = bracket_parts(filename)
if (not repaired.get("group") or (repaired.get("title") and repaired["group"] in repaired["title"])) and brackets:
first_text, first_start, first_end = brackets[0]
if first_start == 0 and looks_like_structural_group(first_text, filename, first_end):
repaired["group"] = first_text
if not repaired.get("resolution"):
match = RESOLUTION_RE.search(filename)
if match:
repaired["resolution"] = match.group(0)
source_matches = source_candidates(filename)
current_source = repaired.get("source")
preferred_source = source_matches[0] if source_matches else None
if source_matches and (
not current_source
or not SOURCE_RE.fullmatch(str(current_source))
or len(str(current_source)) <= 3 and str(current_source).lower() not in {"nf", "cr"}
or (
preferred_source
and str(current_source).lower().replace("_", "-") in {"web-dl", "webdl", "webrip", "web-rip"}
and preferred_source.lower().replace("_", "-") not in {"web-dl", "webdl", "webrip", "web-rip"}
)
):
repaired["source"] = preferred_source
if not repaired.get("special"):
for text, _start, _end in brackets:
clean = text.strip()
if SPECIAL_TAG_RE.search(clean):
repaired["special"] = clean
break
episode = best_structural_episode(filename)
if episode is not None and (
repaired.get("episode") is None
or not plausible_episode_context(filename, int(repaired["episode"]))
):
repaired["episode"] = episode
if repaired.get("season") is None:
match = SEASON_RE.search(filename)
if match:
value = next(group for group in match.groups() if group)
season = cn_number_to_int(value)
if season is not None:
repaired["season"] = season
if repaired.get("season") is None and repaired.get("episode") is not None:
sequel = structural_sequel_marker(filename, repaired.get("group"), repaired.get("episode"))
if sequel is not None:
repaired["season"] = sequel[1]
elif repaired.get("episode") == repaired.get("season") and not SEASON_RE.search(filename):
repaired["season"] = None
title = repaired.get("title")
group = repaired.get("group")
if group and (NOISE_META_RE.search(str(group)) or SOURCE_RE.fullmatch(str(group)) or RESOLUTION_RE.fullmatch(str(group))):
repaired["group"] = None
group = None
if title and group and title.startswith(group):
title = title[len(group):].lstrip("]】)>})》 \t-_.")
repaired["title"] = title or repaired["title"]
if repaired.get("episode"):
repaired_title = infer_title_span(filename, group, repaired["episode"])
if repaired_title:
repaired["title"] = repaired_title
if repaired.get("title") and repaired.get("season") is not None:
repaired["title"] = strip_trailing_season_from_title(repaired["title"], repaired["season"])
return repaired
def structural_sequel_marker(
filename: str,
group: Optional[str],
episode: Optional[int],
) -> Optional[Tuple[str, int]]:
if episode is None:
return None
title_end = None
if episode is not None:
ep_patterns = [
rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?",
rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]",
rf"#\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
rf"(?:^|[\s._\-\[\(【《])第0*{episode}(?:[话話集])?(?=$|[\s._\-\]\)】》])",
]
start = 0
if group:
first = BRACKET_RE.match(filename)
if first and group in first.group(0):
start = first.end()
for pattern in ep_patterns:
match = re.search(pattern, filename[start:], re.I)
if match:
title_end = start + match.start()
break
if title_end is None:
return None
prefix = filename[:title_end].rstrip(" \t-_.")
for match in reversed(list(SEQUEL_MARKER_RE.finditer(prefix))):
marker = match.group("marker")
value = season_marker_number(marker)
if value is None:
continue
tail = prefix[match.end():].strip(" \t-_.")
if tail:
continue
if marker.lower() == "ni" and "Kakuriyo no Yadomeshi Ni" not in prefix:
continue
return marker, value
return None
def normalize_source_text(text: str) -> str:
text = re.sub(r"\s+", "", text.strip())
text = re.sub(r"(?i)WEB[_ ]?DL", "WEB-DL", text)
text = re.sub(r"(?i)WEB[_ ]?Rip", "WebRip", text)
text = re.sub(r"(?i)U[_ ]?NEXT", "U-NEXT", text)
text = re.sub(r"(?i)AT[_ ]?X", "AT-X", text)
return text.replace("_", "-")
def source_priority(source: str) -> int:
normalized = source.lower().replace("_", "-").replace(" ", "")
parts = re.split(r"[&+/,]", normalized)
if any(part in {"nf", "netflix", "amzn", "baha", "cr", "abema", "dsnp", "u-next", "hulu", "at-x"} for part in parts):
return 90
if any(part in {"web-dl", "webdl", "webrip", "web-rip", "bdrip", "bluray", "bdmv", "bd", "dvdrip", "dvd", "tvrip", "hdtv"} for part in parts):
return 60
if len(parts) > 1:
return 40
return 20
def source_candidates(filename: str) -> List[str]:
candidates: List[Tuple[int, int, str]] = []
for text, start, _end in bracket_parts(filename):
clean = text.strip()
if SOURCE_TAG_RE.fullmatch(clean):
normalized = normalize_source_text(clean)
candidates.append((source_priority(normalized), -start, normalized))
for match in SOURCE_RE.finditer(filename):
normalized = normalize_source_text(match.group(0))
candidates.append((source_priority(normalized), -match.start(), normalized))
deduped: Dict[str, Tuple[int, int, str]] = {}
for priority, neg_start, value in candidates:
key = value.lower()
if key not in deduped or (priority, neg_start) > (deduped[key][0], deduped[key][1]):
deduped[key] = (priority, neg_start, value)
return [value for _priority, _neg_start, value in sorted(deduped.values(), reverse=True)]
def best_structural_episode(filename: str) -> Optional[int]:
priorities = {
"season_episode": 1000,
"dash_episode": 900,
"bracket_episode": 850,
"explicit_episode": 800,
"long_episode": 750,
"generic_episode": 100,
}
candidates: List[Tuple[int, int, int]] = []
for name, pattern in EPISODE_PATTERNS:
for match in pattern.finditer(filename):
ep_text = match.group("ep")
ep = int(ep_text)
if ep == 0 or ep > 2000:
continue
context = filename[max(0, match.start() - 5):match.end() + 5]
if RESOLUTION_RE.search(context) or re.search(r"AAC|DDP|AC3|H\.?26[45]|x26[45]", context, re.I):
continue
priority = priorities[name]
if 1 <= ep <= 200:
priority += 20
candidates.append((priority, match.start(), ep))
if not candidates:
return None
return max(candidates, key=lambda item: (item[0], item[1]))[2]
def plausible_episode_context(filename: str, episode: int) -> bool:
ep_text = str(episode)
padded = f"{episode:02d}"
if re.search(rf"(?<![A-Za-z0-9])(?:H|x)\.?0*{re.escape(ep_text)}(?!\d)", filename, re.I):
return False
patterns = [
rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?",
rf"(?:^|[\s._])[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s._\-\]\)】》\[])",
rf"[\[\(【《](?:EP?|#)?0*{episode}(?:v\d+)?[\]\)】》]",
rf"(?:^|[\s._\-\[\(【《#])(?:EP?|第|#)0*{episode}(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])",
rf"(?:^|[\s._\-\[\(【《])0*{episode}(?:v\d+)?(?=[\s._\-\]\)】》\[]+(?:\d{{3,4}}[pP]|WEB|BD|BluRay|HDTV|NF|AMZN|CR|Baha))",
]
return any(re.search(pattern, filename, re.I) for pattern in patterns) or bool(
re.search(rf"(?:^|[\s._\-\[\(【《])(?:{re.escape(ep_text)}|{re.escape(padded)})(?=$|[\s._\-\]\)】》])", filename)
)
def strip_trailing_season_from_title(title: str, season: int) -> str:
season_text = str(season)
patterns = [
rf"\s+[Ss]0*{season_text}$",
rf"\s+Season\s*0*{season_text}$",
rf"\s+0*{season_text}$",
]
cleaned = title
for pattern in patterns:
cleaned = re.sub(pattern, "", cleaned, flags=re.I).strip(" \t-_.")
match = TRAILING_SEQUEL_MARKER_RE.search(cleaned)
if match and season_marker_number(match.group("marker")) == season:
cleaned = cleaned[:match.start()].strip(" \t-_.")
return cleaned or title
def clean_inferred_title(title: str) -> str:
raw_title = title.strip(" \t-_.")
bracket_matches = list(BRACKET_RE.finditer(raw_title))
if bracket_matches:
first = bracket_matches[0]
prefix = raw_title[:first.start()].strip(" \t-_.★☆")
text = next(group for group in first.groups() if group is not None).strip()
if text and not looks_like_episode_or_meta(text) and (
not prefix
or re.search(r"(?:新番|月|合集|繁|简|字幕|先行|合集|★|☆)", prefix, re.I)
):
return text
return raw_title.strip("[]()【】《》()")
def infer_title_span(filename: str, group: Optional[str], episode: Optional[int]) -> Optional[str]:
start = 0
if group:
first = BRACKET_RE.match(filename)
if first and group in first.group(0):
start = first.end()
else:
# Some releases put leading metadata before the actual title, e.g.
# `[1080p] Title - 01`. Do not keep that wrapper as title text.
while True:
leading = BRACKET_RE.match(filename[start:].lstrip(" \t._-"))
if not leading:
break
skipped_ws = len(filename[start:]) - len(filename[start:].lstrip(" \t._-"))
text = next(group for group in leading.groups() if group is not None)
if not looks_like_episode_or_meta(text):
break
start += skipped_ws + leading.end()
end = None
if episode is not None:
ep_patterns = [
rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?",
rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]",
rf"#\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
rf"(?:^|[\s._\-\[\(【《])第0*{episode}(?:[话話集])?(?=$|[\s._\-\]\)】》])",
rf"[Ee]0*{episode}(?:v\d+)?",
]
for pattern in ep_patterns:
match = re.search(pattern, filename[start:], re.I)
if match:
end = start + match.start()
break
if end is None:
for text, bracket_start, _bracket_end in bracket_parts(filename):
if bracket_start <= start:
continue
if NOISE_META_RE.search(text) or RESOLUTION_RE.search(text) or SOURCE_RE.search(text):
end = bracket_start
break
if end is None or end <= start:
return None
title = clean_inferred_title(filename[start:end])
return title or None
def parse_filename(
filename: str,
model: BertForTokenClassification,
tokenizer: AnimeTokenizer,
id2label: Dict[int, str],
max_length: int = 64,
debug: bool = False,
use_rules: bool = True,
constrain_bio: bool = True,
) -> Dict:
"""
Parse an anime filename and extract structured metadata.
Args:
filename: Raw anime filename string.
model: Trained BertForTokenClassification model.
tokenizer: AnimeTokenizer instance.
id2label: Mapping from label ID to label string.
max_length: Maximum sequence length (including special tokens).
Returns:
Dict with parsed fields (title, season, episode, etc.).
"""
# Tokenize
tokens = tokenizer.tokenize(filename)
if not tokens:
return {"title": None, "season": None, "episode": None,
"group": None, "resolution": None, "source": None,
"special": None}
# Convert to input IDs
input_ids = tokenizer.convert_tokens_to_ids(tokens)
embedding_size = model.get_input_embeddings().weight.shape[0]
out_of_range_tokens = [
token for token, token_id in zip(tokens, input_ids)
if token_id >= embedding_size
]
if out_of_range_tokens:
input_ids = [
token_id if token_id < embedding_size else tokenizer.unk_token_id
for token_id in input_ids
]
unk_token_id = tokenizer.unk_token_id
unk_tokens = [token for token, token_id in zip(tokens, input_ids) if token_id == unk_token_id]
# Add special tokens
input_ids = [tokenizer.cls_token_id] + input_ids + [tokenizer.sep_token_id]
attention_mask = [1] * len(input_ids)
# Truncate if needed
if len(input_ids) > max_length:
input_ids = [input_ids[0]] + input_ids[1:max_length - 1] + [tokenizer.sep_token_id]
attention_mask = [1] * len(input_ids)
# Pad
pad_len = max_length - len(input_ids)
if pad_len > 0:
input_ids += [tokenizer.pad_token_id] * pad_len
attention_mask += [0] * pad_len
# Predict
device = next(model.parameters()).device
input_tensor = torch.tensor([input_ids], device=device)
mask_tensor = torch.tensor([attention_mask], device=device)
# Remove special token predictions
# Count real tokens used (minus CLS/SEP)
real_token_count = len(tokens)
# Truncate real tokens if we had to truncate
available = min(real_token_count, max_length - 2)
if available <= 0:
return {"title": None, "season": None, "episode": None,
"group": None, "resolution": None, "source": None,
"special": None}
with torch.no_grad():
logits = model(input_ids=input_tensor, attention_mask=mask_tensor).logits
token_logits = logits[0, 1:1 + available, :]
probabilities = torch.softmax(token_logits, dim=-1)
scores, greedy_predictions = torch.max(probabilities, dim=-1)
if constrain_bio:
pred_labels = constrained_bio_decode(token_logits, id2label)
selected_scores = [
probabilities[idx, label_id].detach().cpu().item()
for idx, label_id in enumerate(pred_labels)
]
else:
pred_labels = greedy_predictions.detach().cpu().tolist()
selected_scores = scores.detach().cpu().tolist()
label_strings = [id2label.get(p, "O") for p in pred_labels]
# Post-process
result = postprocess(
tokens[:available],
label_strings,
tokenizer=tokenizer,
filename=filename,
use_rules=use_rules,
)
if debug:
result["_debug"] = {
"tokenizer_variant": getattr(tokenizer, "tokenizer_variant", "regex"),
"decoder": "constrained_bio" if constrain_bio else "greedy",
"max_length": max_length,
"token_count": len(tokens),
"available_token_count": available,
"truncated": len(tokens) > available,
"unk_count": len(unk_tokens),
"unk_rate": len(unk_tokens) / len(tokens) if tokens else 0.0,
"unk_tokens": unk_tokens[:50],
"vocab_mismatch": bool(out_of_range_tokens),
"model_embedding_size": int(embedding_size),
"tokenizer_vocab_size": int(tokenizer.vocab_size),
"out_of_range_tokens": out_of_range_tokens[:50],
"tokens": tokens[:available],
"labels": label_strings,
"scores": [round(float(score), 4) for score in selected_scores],
"token_table": [
{
"i": i,
"token": display_token(token),
"id": int(token_id),
"label": label,
"score": round(float(score), 4),
}
for i, (token, token_id, label, score) in enumerate(
zip(tokens[:available], input_ids[1:1 + available], label_strings, selected_scores)
)
],
"entities": [
{"type": entity_type, "text": text}
for entity_type, text in labels_to_entities(tokens[:available], label_strings, tokenizer)
],
}
return result
def main():
parser = argparse.ArgumentParser(description="Anime filename parser")
parser.add_argument("filename", nargs="?", type=str, help="Anime filename to parse")
parser.add_argument("--input-file", type=str, help="File with filenames (one per line)")
parser.add_argument("--output-file", type=str, help="Output file for results (JSONL)")
parser.add_argument("--model-dir", type=str, default=".",
help="Path to trained model directory")
parser.add_argument("--tokenizer", choices=["regex", "char"], default=None,
help="Tokenizer variant override. Defaults to checkpoint metadata")
parser.add_argument("--max-length", type=int, default=64,
help="Maximum sequence length")
parser.add_argument("--debug", action="store_true",
help="Include tokenizer, labels, scores, and entity spans in JSON output")
parser.add_argument("--no-rule-assist", action="store_true",
help="Disable high-confidence structural post-processing rules")
parser.add_argument("--no-constrained-bio", action="store_true",
help="Use greedy per-token decoding instead of constrained BIO Viterbi")
args = parser.parse_args()
# Load config
cfg = Config()
# Load tokenizer
print(f"Loading tokenizer from {args.model_dir}...", file=sys.stderr)
tokenizer = load_tokenizer(args.model_dir, args.tokenizer)
# Load model
print(f"Loading model from {args.model_dir}...", file=sys.stderr)
model = BertForTokenClassification.from_pretrained(args.model_dir)
model.eval()
id2label = {int(k): v for k, v in getattr(model.config, "id2label", cfg.id2label).items()}
max_length = args.max_length
if max_length == 64:
max_length = int(getattr(model.config, "max_seq_length", max_length))
# Process filenames
filenames_to_parse: List[str] = []
if args.filename:
filenames_to_parse.append(args.filename)
if args.input_file:
with open(args.input_file, 'r', encoding='utf-8') as f:
filenames_to_parse.extend(line.strip() for line in f if line.strip())
if not filenames_to_parse:
# Read from stdin
filenames_to_parse.extend(sys.stdin.read().strip().splitlines())
# Parse and output
results: List[Dict] = []
for fn in filenames_to_parse:
if not fn.strip():
continue
result = parse_filename(
fn,
model,
tokenizer,
id2label,
max_length,
debug=args.debug,
use_rules=not args.no_rule_assist,
constrain_bio=not args.no_constrained_bio,
)
result["_input"] = fn
results.append(result)
if args.output_file is None:
print(json.dumps(result, ensure_ascii=False))
if args.output_file:
with open(args.output_file, 'w', encoding='utf-8') as f:
for r in results:
f.write(json.dumps(r, ensure_ascii=False) + '\n')
print(f"Results saved to {args.output_file}", file=sys.stderr)
if __name__ == "__main__":
main()