Token Classification
Transformers
ONNX
Safetensors
English
Japanese
Chinese
bert
anime
filename-parsing
Eval Results (legacy)
Instructions to use ModerRAS/AniFileBERT with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use ModerRAS/AniFileBERT with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("token-classification", model="ModerRAS/AniFileBERT")
```

```python
# Load model directly
from transformers import AutoTokenizer, AutoModelForTokenClassification

tokenizer = AutoTokenizer.from_pretrained("ModerRAS/AniFileBERT")
model = AutoModelForTokenClassification.from_pretrained("ModerRAS/AniFileBERT")
```
- Notebooks
- Google Colab
- Kaggle
| """ | |
| Inference script for anime filename parser. | |
| Loads a trained model and tokenizer, parses anime filenames, | |
| and outputs structured metadata. | |
| Usage: | |
| python inference.py "[ANi] 葬送的芙莉莲 S2 - 03 [1080P][WEB-DL]" | |
| python inference.py --input-file filenames.txt --output-file results.jsonl | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from typing import Dict, List, Optional, Tuple | |
| import torch | |
| from transformers import BertForTokenClassification | |
| from config import Config | |
| from label_repairs import season_marker_number | |
| from tokenizer import AnimeTokenizer, load_tokenizer | |
# Single Chinese numerals 一..十 mapped to 1..10, in ascending order.
# Iteration order matters to callers that scan the map for the first hit.
CN_NUM_MAP: Dict[str, int] = dict(zip("一二三四五六七八九十", range(1, 11)))
def _compound_cn_number(numeral: str) -> Optional[int]:
    """Convert a compound numeral built around one 十 (十, 十二, 二十, 二十三) to an int.

    Returns None for anything else, including single numerals without 十,
    so the caller can fall back to its single-character lookup.
    """
    if numeral.count("十") != 1:
        return None
    tens_part, _, units_part = numeral.partition("十")
    if len(tens_part) > 1 or len(units_part) > 1:
        return None
    tens = CN_NUM_MAP[tens_part] if tens_part else 1
    units = CN_NUM_MAP[units_part] if units_part else 0
    return tens * 10 + units


def extract_season_number(text: str) -> Optional[int]:
    """
    Extract season number from various season formats.

    Resolution order:
      1. ``season_marker_number`` (imported helper) if it recognises the text.
      2. The first run of Arabic digits.
      3. A compound Chinese numeral containing 十 (e.g. "十二" -> 12).
      4. The first single Chinese numeral found, scanning ``CN_NUM_MAP`` in order.

    Examples:
        "S2" → 2, "Season 2" → 2, "第二季" → 2, "第十二季" → 12

    Returns:
        The season number, or None when nothing numeric is found.
    """
    marker_value = season_marker_number(text)
    if marker_value is not None:
        return marker_value

    # Arabic digits
    match = re.search(r'(\d+)', text)
    if match:
        return int(match.group(1))

    # Compound Chinese numerals must be read as a whole run; scanning for
    # single characters would turn "十二" into 2 and "十一" into 1.
    numeral_run = re.search(r"[一二三四五六七八九十]+", text)
    if numeral_run:
        compound = _compound_cn_number(numeral_run.group(0))
        if compound is not None:
            return compound

    # Single Chinese digits
    for cn, num in CN_NUM_MAP.items():
        if cn in text:
            return num
    return None
def extract_episode_number(text: str) -> Optional[int]:
    """
    Return the first run of digits in ``text`` as an int.

    Examples:
        "03" → 3, "EP21" → 21, "第7话" → 7, "#01" → 1

    Returns:
        The parsed number, or None when ``text`` contains no digits.
    """
    digit_runs = re.findall(r'(\d+)', text)
    return int(digit_runs[0]) if digit_runs else None
def extract_resolution(text: str) -> Optional[str]:
    """Return the resolution text (e.g., '1080P', '4K', '1920x1080') without brackets.

    Only surrounding bracket characters are removed; the remaining text is
    not validated. Returns None when nothing is left after stripping.
    """
    stripped = text.strip("[]()【】")
    if not stripped:
        return None
    return stripped
def display_token(token: str) -> str:
    """Render a lone space or tab token as a visible placeholder for debug output.

    Any other token is returned unchanged.
    """
    placeholders = {" ": "<SPACE>", "\t": "<TAB>"}
    return placeholders.get(token, token)
def trim_decorations(text: str) -> str:
    """Strip whitespace, then outer bracket characters, then whitespace again."""
    without_outer_space = text.strip()
    without_brackets = without_outer_space.strip("[]()【】《》()")
    return without_brackets.strip()
def join_entity_tokens(tokens: List[str], tokenizer: "Optional[AnimeTokenizer]" = None) -> str:
    """Join entity tokens back into the entity's surface text.

    Tokens are concatenated without a separator for every tokenizer variant
    (whitespace is carried by the tokens themselves), so ``tokenizer`` does
    not affect the result. The parameter is kept so existing callers that
    pass it keep working.

    Args:
        tokens: Tokens belonging to one entity, in order.
        tokenizer: Accepted for interface compatibility; unused.

    Returns:
        The concatenated token text ("" for an empty token list).
    """
    return "".join(tokens)
def labels_to_entities(
    tokens: List[str],
    labels: List[str],
    tokenizer: Optional[AnimeTokenizer] = None,
) -> List[Tuple[str, str]]:
    """
    Group BIO-labelled tokens into ``(entity_type, text)`` spans.

    A ``B-X`` label always opens a new span. An ``I-X`` label extends the
    open span when its type matches; otherwise (orphan or mismatched
    ``I-X``) it opens a new span, so debug output exposes the model's
    behaviour instead of silently dropping tokens. Any other label closes
    the open span.

    Args:
        tokens: Tokens of the filename.
        labels: One BIO label per token; extra items on either side are
            ignored because the two lists are zipped.
        tokenizer: Passed through to ``join_entity_tokens``.

    Returns:
        Spans in order of appearance.
    """
    spans: List[Tuple[str, str]] = []
    open_type: Optional[str] = None
    open_tokens: List[str] = []

    def close_span() -> None:
        # Emit the span currently being built, if there is one.
        if open_type:
            spans.append((open_type, join_entity_tokens(open_tokens, tokenizer)))

    for token, label in zip(tokens, labels):
        prefix, entity_type = label[:2], label[2:]
        if prefix == "I-" and open_type == entity_type:
            open_tokens.append(token)
            continue
        close_span()
        if prefix in ("B-", "I-"):
            open_type, open_tokens = entity_type, [token]
        else:
            open_type, open_tokens = None, []

    close_span()
    return spans
def is_allowed_bio_transition(previous_label: str, label: str) -> bool:
    """Return whether ``previous_label -> label`` is a valid IOB2 transition.

    Only ``I-X`` is restricted: it must follow ``B-X`` or ``I-X`` of the same
    entity type. Every other label may follow anything.
    """
    if not label.startswith("I-"):
        return True
    entity = label[2:]
    return previous_label == f"B-{entity}" or previous_label == f"I-{entity}"
def constrained_bio_decode(emissions: torch.Tensor, id2label: Dict[int, str]) -> List[int]:
    """
    Viterbi-decode token logits under hard BIO transition constraints.

    This is a CRF-style decoder without learned transition weights: a path's
    score is the sum of its emission scores, and only transitions accepted by
    ``is_allowed_bio_transition`` are considered. A sequence may not start
    with an ``I-X`` label. Label ids missing from ``id2label`` are treated
    as "O".

    Args:
        emissions: Logits unpacked as ``(num_tokens, num_labels)``.
        id2label: Mapping from label id to label string.

    Returns:
        One label id per token, or an empty list for empty emissions.
    """
    if emissions.numel() == 0:
        return []

    num_tokens, num_labels = emissions.shape
    scores = emissions.detach().cpu()
    neg_inf = float("-inf")

    # Resolve label names and the legal predecessors of each label once,
    # instead of re-checking every pair at every time step.
    label_names = [id2label.get(label_id, "O") for label_id in range(num_labels)]
    allowed_prev = [
        [
            prev_id
            for prev_id in range(num_labels)
            if is_allowed_bio_transition(label_names[prev_id], name)
        ]
        for name in label_names
    ]

    backpointers = torch.zeros((num_tokens, num_labels), dtype=torch.long)
    dp = torch.full((num_labels,), neg_inf)
    for label_id, name in enumerate(label_names):
        if not name.startswith("I-"):
            dp[label_id] = scores[0, label_id]

    for idx in range(1, num_tokens):
        next_dp = torch.full((num_labels,), neg_inf)
        for label_id in range(num_labels):
            best_score, best_prev = neg_inf, 0
            for prev_id in allowed_prev[label_id]:
                candidate = dp[prev_id] + scores[idx, label_id]
                if candidate > best_score:
                    best_score, best_prev = float(candidate), prev_id
            next_dp[label_id] = best_score
            backpointers[idx, label_id] = best_prev
        dp = next_dp

    # Walk the backpointers from the best final label to the first token.
    path = [int(torch.argmax(dp).item())]
    for idx in range(num_tokens - 1, 0, -1):
        path.append(int(backpointers[idx, path[-1]].item()))
    path.reverse()
    return path
def postprocess(
    tokens: List[str],
    labels: List[str],
    tokenizer: Optional[AnimeTokenizer] = None,
    filename: Optional[str] = None,
    use_rules: bool = True,
) -> Dict:
    """
    Turn BIO-labelled tokens into the structured metadata dict.

    Field policy when an entity type occurs more than once:
      * title      - all non-empty trimmed fragments joined with a space
      * season     - last parseable value wins
      * episode    - first parseable value wins
      * group      - first occurrence wins
      * special, source, resolution - last occurrence wins

    Args:
        tokens: Tokens of the filename.
        labels: One BIO label per token.
        tokenizer: Passed through to ``labels_to_entities``.
        filename: Original filename; required for rule assists to run.
        use_rules: When true and ``filename`` is given, the result is passed
            through ``apply_rule_assists``.

    Returns:
        Dict with keys title, season, episode, group, resolution, source,
        special (None when not found).
    """
    bracket_chars = "[]()【】"
    result: Dict = dict.fromkeys(
        ("title", "season", "episode", "group", "resolution", "source", "special")
    )

    entities = labels_to_entities(tokens, labels, tokenizer)

    for entity_type, text in entities:
        if entity_type == "SEASON":
            season_num = extract_season_number(text)
            if season_num is not None:
                result["season"] = season_num
        elif entity_type == "EPISODE":
            ep_num = extract_episode_number(text)
            if ep_num is not None and result["episode"] is None:
                result["episode"] = ep_num
        elif entity_type == "GROUP":
            group = text.strip(bracket_chars)
            if result["group"] is None:
                result["group"] = group
        elif entity_type == "SPECIAL":
            result["special"] = text.strip(bracket_chars)
        elif entity_type == "RESOLUTION":
            res = extract_resolution(text)
            if res:
                result["resolution"] = res
        elif entity_type == "SOURCE":
            result["source"] = text.strip(bracket_chars)

    # O-labelled tokens between words split a title into several spans,
    # so the title is rebuilt from every TITLE fragment.
    trimmed_titles = [
        trim_decorations(fragment) for kind, fragment in entities if kind == "TITLE"
    ]
    if trimmed_titles:
        result["title"] = " ".join(piece for piece in trimmed_titles if piece)

    if use_rules and filename:
        result = apply_rule_assists(filename, result)
    return result
# One bracketed segment in any supported bracket style; exactly one of the
# four capture groups holds the inner text.
BRACKET_RE = re.compile(r"\[([^\]]+)\]|\(([^)]+)\)|【([^】]+)】|《([^》]+)》")

# Resolution tokens such as 1080p, 4K or 1920x1080, not embedded in a longer
# alphanumeric word.
RESOLUTION_RE = re.compile(r"(?<![A-Za-z0-9])(?:\d{3,4}[pP]|\d[Kk]|\d{3,4}[xX×]\d{3,4})(?![A-Za-z0-9])")

# Alternation of source / platform / subtitle-language tokens, shared by the
# two SOURCE regexes below.
SOURCE_TOKEN_PATTERN = (
    r"WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|HDTV|"
    r"Netflix|NF|AMZN|Baha|CR|ABEMA|DSNP|U[-_ ]?NEXT|Hulu|AT[-_ ]?X|"
    r"CHS|CHT|GB|BIG5|JPN?|繁中|简中"
)
# A single source token anywhere in a string.
SOURCE_RE = re.compile(rf"\b(?:{SOURCE_TOKEN_PATTERN})\b", re.I)
# A whole string consisting only of source tokens joined by & + / or commas.
SOURCE_TAG_RE = re.compile(
    rf"^(?:{SOURCE_TOKEN_PATTERN})(?:\s*(?:[&+/]|,\s*)\s*(?:{SOURCE_TOKEN_PATTERN}))*$",
    re.I,
)

# "search:/alias:" style tags that are reported in the `special` field.
SPECIAL_TAG_RE = re.compile(
    r"^(?:檢索|检索|搜索|搜寻|搜尋|别名|別名|alias|search|keyword)\s*[::].+",
    re.I,
)

# Named episode patterns; each exposes the number as group "ep". The names
# are the keys used to rank candidates in best_structural_episode.
EPISODE_PATTERNS = [
    ("season_episode", re.compile(r"[Ss]\d{1,2}[Ee](?P<ep>\d{1,4})(?:v\d+)?", re.I)),
    ("dash_episode", re.compile(r"(?:^|[\s._])[-_]\s*(?P<ep>\d{1,4})(?:v\d+)?(?=$|[\s._\-\]\)】》\[])")),
    ("bracket_episode", re.compile(r"[\[\(【《](?:EP?|#)?(?P<ep>\d{1,4})(?:v\d+)?[\]\)】》]", re.I)),
    ("explicit_episode", re.compile(r"(?:^|[\s._\-\[\(【《#])(?:EP?|第|#)(?P<ep>\d{1,4})(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])", re.I)),
    (
        "long_episode",
        re.compile(
            r"(?:^|[\s._\-\[\(【《])(?P<ep>\d{3,4})(?:v\d+)?"
            r"(?=[\s._\-\]\)】》\[]+(?:\d{3,4}[pP]|WEB|BD|BluRay|HDTV|NF|AMZN|CR|Baha))",
            re.I,
        ),
    ),
    ("generic_episode", re.compile(r"(?:^|[\s._\-\[\(【《#])(?P<ep>\d{1,3})(?:v\d+)?(?=$|[\s._\-\]\)】》])", re.I)),
]

# Explicit season markers: S2, Season 2, 第二季 / 第2期 / 第三部.
SEASON_RE = re.compile(r"(?:^|[\s._\-\[\(【《])(?:[Ss](?P<s1>\d{1,2})|Season\s*(?P<s2>\d{1,2})|第(?P<s3>[一二三四五六七八九十\d]+)[季期部])", re.I)

# Sequel markers (romanised Japanese ordinals, Roman numerals, CJK numerals).
# Both sequel regexes use exactly this alternation, so it is written once.
_SEQUEL_MARKER_ALTERNATION = (
    r"Ni\s+no\s+(?:Sara|Shou|Sho|Syo|Shō)|"
    r"San\s+no\s+(?:Sara|Shou|Sho|Syo)|"
    r"(?:Yon|Shi|Shin)\s+no\s+Sara|"
    r"(?:Go|Gou)\s+no\s+Sara|"
    r"Ni\s+Gakki|Sono\s+Ni|Ni|"
    r"II|III|IV|V|VI|VII|VIII|IX|[ⅡⅢⅣⅤⅥⅦⅧⅨ]|"
    r"[一二三四五六七八九十兩两貳贰弐弍參叁参肆伍陸陆柒捌玖](?:\s*(?:ノ|の|之)\s*(?:章|期|季|部))?"
)
# A sequel marker standing alone (not glued to ASCII letters or digits).
SEQUEL_MARKER_RE = re.compile(
    rf"(?<![A-Za-z0-9])(?P<marker>{_SEQUEL_MARKER_ALTERNATION})(?![A-Za-z0-9])",
    re.I,
)
# A sequel marker at the very end of a string, after a separator or at start.
TRAILING_SEQUEL_MARKER_RE = re.compile(
    rf"(?:^|[\s._-])(?P<marker>{_SEQUEL_MARKER_ALTERNATION})$",
    re.I,
)

# Whole-string technical tags (resolution, source, codec, audio, subtitle,
# container) that must not be taken for a group or title.
NOISE_META_RE = re.compile(
    r"^(?:\d{3,4}[pP]|\d[Kk]|WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|"
    r"HDTV|Netflix|NF|AMZN|Baha|CR|HEVC|AVC|AV1|x26[45]|h\.?26[45]|AAC.*|FLAC|MP3|DTS|"
    r"Opus|ASS.*|CHS|CHT|BIG5|GB|JPN?|MP4|MKV|繁中|简中|内封|外挂)$",
    re.I,
)
def cn_number_to_int(text: str) -> Optional[int]:
    """Convert a decimal string or a Chinese numeral up to 99 to an int.

    Supported Chinese forms: a single numeral (三), 十, 十X, X十 and X十Y.

    Args:
        text: The numeral text, e.g. "2", "三", "十二", "二十三".

    Returns:
        The integer value, or None when ``text`` is not a supported numeral.
        Malformed input such as "十a" or "十十" yields None rather than a
        value computed from the valid part.
    """
    # isdecimal(), unlike isdigit(), rejects characters such as "²" that
    # int() cannot parse.
    if text.isdecimal():
        return int(text)

    units = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9}
    tens_text, ten_marker, ones_text = text.partition("十")
    if not ten_marker:
        return units.get(text)
    if len(tens_text) > 1 or len(ones_text) > 1:
        return None

    # A missing tens part means "one ten" (十二 = 12); a missing ones part
    # contributes nothing (二十 = 20).
    tens = units.get(tens_text) if tens_text else 1
    ones = units.get(ones_text) if ones_text else 0
    if tens is None or ones is None:
        return None
    return tens * 10 + ones
def bracket_parts(filename: str) -> List[Tuple[str, int, int]]:
    """List every bracketed segment of ``filename``.

    Returns:
        ``(inner_text, start, end)`` tuples in order of appearance, where
        ``inner_text`` is stripped of surrounding whitespace and
        ``start``/``end`` are the offsets of the whole bracket match.
    """
    return [
        (
            next(inner for inner in found.groups() if inner is not None).strip(),
            found.start(),
            found.end(),
        )
        for found in BRACKET_RE.finditer(filename)
    ]
def looks_like_group(text: str) -> bool:
    """Return True when ``text`` contains a known release-group keyword.

    Empty text and text that is entirely a technical tag (per
    ``NOISE_META_RE``) are never treated as a group.
    """
    if not text:
        return False
    if NOISE_META_RE.search(text):
        return False
    group_hint = re.search(
        r"(?:字幕|字幕组|字幕組|sub|subs|raws?|fansub|studio|house|team|project|"
        r"loli|ani|vcb|airota|kiss|dmhy|erai|subsplease)",
        text,
        re.I,
    )
    return group_hint is not None
def looks_like_episode_or_meta(text: str) -> bool:
    """Return True when ``text`` reads as an episode number or technical tag.

    The stripped text qualifies if it is a bare episode number (optionally
    prefixed by E/EP/# and suffixed by a version), or if it matches any of
    the resolution, source, special-tag or noise-metadata patterns.
    """
    if not text:
        return False
    clean = text.strip()
    if re.fullmatch(r"(?:EP?|#)?\d{1,4}(?:v\d+)?", clean, re.I):
        return True
    matchers = (
        RESOLUTION_RE.search,
        SOURCE_TAG_RE.fullmatch,
        SOURCE_RE.search,
        SPECIAL_TAG_RE.search,
        NOISE_META_RE.search,
    )
    return any(matcher(clean) for matcher in matchers)
def looks_like_structural_group(text: str, filename: str, bracket_end: int) -> bool:
    """Heuristic for short leading release-group brackets not in the name list.

    Args:
        text: Inner text of the bracket under test.
        filename: Full filename the bracket came from.
        bracket_end: Offset just past the bracket in ``filename``.

    Returns:
        True when the bracket plausibly names a release group.
    """
    if looks_like_group(text):
        return True
    if not text or looks_like_episode_or_meta(text):
        return False

    # What follows the bracket decides whether it can be a group at all.
    remainder = filename[bracket_end:].lstrip(" \t._")
    if remainder.startswith("-"):
        return False
    following = BRACKET_RE.match(remainder)
    if following:
        following_text = next(inner for inner in following.groups() if inner is not None)
        if looks_like_episode_or_meta(following_text):
            return False

    # Long bracket text is rejected on every remaining path.
    if len(text) > 32:
        return False

    words = re.findall(r"[A-Za-z0-9]+", text)
    if not words:
        # No ASCII words: accept only text containing CJK ideographs.
        return re.search(r"[\u3400-\u9fff]", text) is not None

    return (
        len(words) == 1
        or any(sep in text for sep in "-_")
        or (words[0].isupper() and len(words[0]) <= 4 and len(words) <= 3)
    )
def apply_rule_assists(filename: str, result: Dict) -> Dict:
    """
    Fill high-confidence structural fields from filename conventions.

    The model remains the primary tagger; rules only fill missing obvious
    fields or repair common boundary drift around leading group brackets
    and episodes.

    Args:
        filename: The raw filename.
        result: Model-derived metadata dict; it is not modified.

    Returns:
        A shallow copy of ``result`` with repaired fields.
    """
    repaired = dict(result)
    brackets = bracket_parts(filename)

    # Group: take the leading bracket when the model found no group, or when
    # the model's group text also appears inside its title.
    needs_group = not repaired.get("group") or (
        repaired.get("title") and repaired["group"] in repaired["title"]
    )
    if needs_group and brackets:
        first_text, first_start, first_end = brackets[0]
        if first_start == 0 and looks_like_structural_group(first_text, filename, first_end):
            repaired["group"] = first_text

    # Resolution: only filled when missing.
    if not repaired.get("resolution"):
        resolution_match = RESOLUTION_RE.search(filename)
        if resolution_match:
            repaired["resolution"] = resolution_match.group(0)

    # Source: replace a missing, unrecognised, very short, or generic WEB
    # value with the best-ranked candidate found in the filename.
    source_matches = source_candidates(filename)
    current_source = repaired.get("source")
    preferred_source = source_matches[0] if source_matches else None
    generic_web = {"web-dl", "webdl", "webrip", "web-rip"}
    if source_matches and (
        not current_source
        or not SOURCE_RE.fullmatch(str(current_source))
        or (len(str(current_source)) <= 3 and str(current_source).lower() not in {"nf", "cr"})
        or (
            preferred_source
            and str(current_source).lower().replace("_", "-") in generic_web
            and preferred_source.lower().replace("_", "-") not in generic_web
        )
    ):
        repaired["source"] = preferred_source

    # Special: first bracket that looks like a search/alias tag.
    if not repaired.get("special"):
        for bracket_text, _start, _end in brackets:
            clean = bracket_text.strip()
            if SPECIAL_TAG_RE.search(clean):
                repaired["special"] = clean
                break

    # Episode: the structural value wins when the model has none, or when
    # the model's number does not occur in an episode-like position.
    episode = best_structural_episode(filename)
    if episode is not None:
        model_episode = repaired.get("episode")
        if model_episode is None or not plausible_episode_context(filename, int(model_episode)):
            repaired["episode"] = episode

    # Season: explicit marker first, then a sequel marker closing the title.
    if repaired.get("season") is None:
        season_match = SEASON_RE.search(filename)
        if season_match:
            captured = next(value for value in season_match.groups() if value)
            season = cn_number_to_int(captured)
            if season is not None:
                repaired["season"] = season

    if repaired.get("season") is None and repaired.get("episode") is not None:
        sequel = structural_sequel_marker(filename, repaired.get("group"), repaired.get("episode"))
        if sequel is not None:
            repaired["season"] = sequel[1]
    elif repaired.get("episode") == repaired.get("season") and not SEASON_RE.search(filename):
        # Season equal to the episode with no explicit season marker in the
        # filename: discard the season.
        repaired["season"] = None

    title = repaired.get("title")
    group = repaired.get("group")

    # A "group" that is really a technical tag is dropped.
    if group and (
        NOISE_META_RE.search(str(group))
        or SOURCE_RE.fullmatch(str(group))
        or RESOLUTION_RE.fullmatch(str(group))
    ):
        repaired["group"] = None
        group = None

    # Remove a group name that leaked into the start of the title.
    if title and group and title.startswith(group):
        title = title[len(group):].lstrip("]】)>})》 \t-_.")
        repaired["title"] = title or repaired["title"]

    # With an episode known, the span between group and episode is
    # preferred as the title.
    if repaired.get("episode"):
        repaired_title = infer_title_span(filename, group, repaired["episode"])
        if repaired_title:
            repaired["title"] = repaired_title

    if repaired.get("title") and repaired.get("season") is not None:
        repaired["title"] = strip_trailing_season_from_title(repaired["title"], repaired["season"])
    return repaired
def structural_sequel_marker(
    filename: str,
    group: Optional[str],
    episode: Optional[int],
) -> Optional[Tuple[str, int]]:
    """Find a sequel marker that closes the title, just before the episode.

    The title is taken to end where the first episode pattern for
    ``episode`` matches (searching after the leading group bracket when
    ``group`` is inside it). A marker counts only if nothing but separators
    lies between it and that point.

    Args:
        filename: The raw filename.
        group: Release group, if known.
        episode: Episode number; None disables the search.

    Returns:
        ``(marker_text, season_number)`` or None.
    """
    if episode is None:
        return None

    ep_patterns = [
        rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?",
        rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
        rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]",
        rf"#\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
        rf"(?:^|[\s._\-\[\(【《])第0*{episode}(?:[话話集])?(?=$|[\s._\-\]\)】》])",
    ]

    # Skip the leading group bracket so its contents cannot match as an episode.
    search_from = 0
    if group:
        leading = BRACKET_RE.match(filename)
        if leading and group in leading.group(0):
            search_from = leading.end()

    remainder = filename[search_from:]
    title_end = None
    for pattern in ep_patterns:
        hit = re.search(pattern, remainder, re.I)
        if hit:
            title_end = search_from + hit.start()
            break
    if title_end is None:
        return None

    prefix = filename[:title_end].rstrip(" \t-_.")
    # Scan markers right-to-left; only one at the very end of the prefix counts.
    for candidate in reversed(list(SEQUEL_MARKER_RE.finditer(prefix))):
        marker = candidate.group("marker")
        value = season_marker_number(marker)
        if value is None:
            continue
        if prefix[candidate.end():].strip(" \t-_."):
            continue
        # A bare "Ni" is accepted only for this one known title.
        if marker.lower() == "ni" and "Kakuriyo no Yadomeshi Ni" not in prefix:
            continue
        return marker, value
    return None
def normalize_source_text(text: str) -> str:
    """Canonicalise a source tag.

    All whitespace is removed, a few known tokens written without a
    separator or with an underscore get their canonical spelling
    (WEB-DL, WebRip, U-NEXT, AT-X), and remaining underscores become hyphens.
    """
    normalized = re.sub(r"\s+", "", text.strip())
    canonical_forms = (
        (r"(?i)WEB[_ ]?DL", "WEB-DL"),
        (r"(?i)WEB[_ ]?Rip", "WebRip"),
        (r"(?i)U[_ ]?NEXT", "U-NEXT"),
        (r"(?i)AT[_ ]?X", "AT-X"),
    )
    for pattern, canonical in canonical_forms:
        normalized = re.sub(pattern, canonical, normalized)
    return normalized.replace("_", "-")
def source_priority(source: str) -> int:
    """Rank a source tag; higher means more specific.

    Returns:
        90 if any part names a streaming platform or channel, 60 if any part
        names a rip / disc / broadcast type, 40 for any other multi-part tag,
        20 otherwise.
    """
    platforms = {"nf", "netflix", "amzn", "baha", "cr", "abema", "dsnp", "u-next", "hulu", "at-x"}
    media_types = {
        "web-dl", "webdl", "webrip", "web-rip", "bdrip", "bluray",
        "bdmv", "bd", "dvdrip", "dvd", "tvrip", "hdtv",
    }
    normalized = source.lower().replace("_", "-").replace(" ", "")
    parts = re.split(r"[&+/,]", normalized)
    if not platforms.isdisjoint(parts):
        return 90
    if not media_types.isdisjoint(parts):
        return 60
    return 40 if len(parts) > 1 else 20
def source_candidates(filename: str) -> List[str]:
    """Collect normalised source tags found in ``filename``, best first.

    Candidates are whole brackets made up only of source tokens, plus every
    individual source token anywhere in the filename. Duplicates (compared
    case-insensitively) keep their best-ranked occurrence. The result is
    ordered by priority, then by earliest position.
    """
    # (start offset, raw text) of every candidate occurrence.
    occurrences: List[Tuple[int, str]] = []
    for text, start, _end in bracket_parts(filename):
        clean = text.strip()
        if SOURCE_TAG_RE.fullmatch(clean):
            occurrences.append((start, clean))
    occurrences.extend((found.start(), found.group(0)) for found in SOURCE_RE.finditer(filename))

    best: Dict[str, Tuple[int, int, str]] = {}
    for start, raw in occurrences:
        value = normalize_source_text(raw)
        # The offset is negated so an earlier position sorts higher.
        entry = (source_priority(value), -start, value)
        key = value.lower()
        if key not in best or entry[:2] > best[key][:2]:
            best[key] = entry

    return [value for _priority, _neg_start, value in sorted(best.values(), reverse=True)]
def best_structural_episode(filename: str) -> Optional[int]:
    """Pick the most convincing episode number from filename structure.

    Every match of every pattern in ``EPISODE_PATTERNS`` is a candidate
    unless the number is 0 or above 2000, or its surrounding text (5
    characters either side) contains a resolution or codec/audio marker.
    Candidates are ranked by pattern priority (+20 for numbers up to 200),
    with the later position winning ties.

    Returns:
        The chosen episode number, or None when there is no candidate.
    """
    pattern_rank = {
        "season_episode": 1000,
        "dash_episode": 900,
        "bracket_episode": 850,
        "explicit_episode": 800,
        "long_episode": 750,
        "generic_episode": 100,
    }
    ranked: List[Tuple[int, int, int]] = []
    for name, pattern in EPISODE_PATTERNS:
        for found in pattern.finditer(filename):
            number = int(found.group("ep"))
            if number == 0 or number > 2000:
                continue
            window = filename[max(0, found.start() - 5):found.end() + 5]
            if RESOLUTION_RE.search(window):
                continue
            if re.search(r"AAC|DDP|AC3|H\.?26[45]|x26[45]", window, re.I):
                continue
            rank = pattern_rank[name]
            if 1 <= number <= 200:
                rank += 20
            ranked.append((rank, found.start(), number))

    if not ranked:
        return None
    return max(ranked, key=lambda candidate: candidate[:2])[2]
def plausible_episode_context(filename: str, episode: int) -> bool:
    """Check that ``episode`` occurs in ``filename`` in an episode-like position.

    Returns False outright when the number appears as an H/x codec suffix
    (e.g. x264). Otherwise returns True if any structural episode pattern
    matches the number, or if it appears as a separator-delimited bare
    number, either unpadded or zero-padded to two digits.
    """
    ep_text = str(episode)
    padded = f"{episode:02d}"

    codec_like = re.search(
        rf"(?<![A-Za-z0-9])(?:H|x)\.?0*{re.escape(ep_text)}(?!\d)", filename, re.I
    )
    if codec_like:
        return False

    structural_patterns = [
        rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?",
        rf"(?:^|[\s._])[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s._\-\]\)】》\[])",
        rf"[\[\(【《](?:EP?|#)?0*{episode}(?:v\d+)?[\]\)】》]",
        rf"(?:^|[\s._\-\[\(【《#])(?:EP?|第|#)0*{episode}(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])",
        rf"(?:^|[\s._\-\[\(【《])0*{episode}(?:v\d+)?(?=[\s._\-\]\)】》\[]+(?:\d{{3,4}}[pP]|WEB|BD|BluRay|HDTV|NF|AMZN|CR|Baha))",
    ]
    for pattern in structural_patterns:
        if re.search(pattern, filename, re.I):
            return True

    bare_number = re.search(
        rf"(?:^|[\s._\-\[\(【《])(?:{re.escape(ep_text)}|{re.escape(padded)})(?=$|[\s._\-\]\)】》])",
        filename,
    )
    return bare_number is not None
def strip_trailing_season_from_title(title: str, season: int) -> str:
    """Remove a trailing season indicator for ``season`` from ``title``.

    Handles "S2", "Season 2" and a bare trailing number, then a trailing
    sequel marker whose value equals ``season``.

    Returns:
        The cleaned title, or the original title if cleaning would leave
        nothing.
    """
    separators = " \t-_."
    season_text = str(season)
    suffix_patterns = (
        rf"\s+[Ss]0*{season_text}$",
        rf"\s+Season\s*0*{season_text}$",
        rf"\s+0*{season_text}$",
    )

    cleaned = title
    for suffix in suffix_patterns:
        cleaned = re.sub(suffix, "", cleaned, flags=re.I).strip(separators)

    trailing = TRAILING_SEQUEL_MARKER_RE.search(cleaned)
    if trailing and season_marker_number(trailing.group("marker")) == season:
        cleaned = cleaned[:trailing.start()].strip(separators)

    return cleaned if cleaned else title
def clean_inferred_title(title: str) -> str:
    """Tidy a raw title span cut out of a filename.

    If the span contains a bracket whose text is not episode/metadata, and
    the text before that bracket is empty or looks like a release banner,
    the bracket's inner text is returned as the title. Otherwise the span
    is returned with outer separators and bracket characters stripped.
    """
    raw_title = title.strip(" \t-_.")

    first = BRACKET_RE.search(raw_title)
    if first:
        banner = raw_title[:first.start()].strip(" \t-_.★☆")
        inner = next(part for part in first.groups() if part is not None).strip()
        if inner and not looks_like_episode_or_meta(inner):
            if not banner or re.search(r"(?:新番|月|合集|繁|简|字幕|先行|合集|★|☆)", banner, re.I):
                return inner

    return raw_title.strip("[]()【】《》()")
def infer_title_span(filename: str, group: Optional[str], episode: Optional[int]) -> Optional[str]:
    """Infer the title as the text between the leading wrapper and the episode.

    Start of the span: just after the leading bracket when ``group`` is
    inside it; with no group, after any run of leading brackets that hold
    only episode/metadata text. End of the span: the first episode pattern
    match for ``episode``, else the first later bracket that holds
    technical metadata.

    Returns:
        The cleaned title, or None when no usable span is found.
    """
    separators = " \t._-"
    start = 0
    if group:
        first = BRACKET_RE.match(filename)
        if first and group in first.group(0):
            start = first.end()
    else:
        # Some releases put leading metadata before the actual title, e.g.
        # `[1080p] Title - 01`. Do not keep that wrapper as title text.
        while True:
            rest = filename[start:]
            trimmed = rest.lstrip(separators)
            leading = BRACKET_RE.match(trimmed)
            if not leading:
                break
            inner = next(part for part in leading.groups() if part is not None)
            if not looks_like_episode_or_meta(inner):
                break
            # Advance past the skipped separators and the bracket itself.
            start += (len(rest) - len(trimmed)) + leading.end()

    end = None
    if episode is not None:
        ep_patterns = [
            rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?",
            rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
            rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]",
            rf"#\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
            rf"(?:^|[\s._\-\[\(【《])第0*{episode}(?:[话話集])?(?=$|[\s._\-\]\)】》])",
            rf"[Ee]0*{episode}(?:v\d+)?",
        ]
        remainder = filename[start:]
        for pattern in ep_patterns:
            hit = re.search(pattern, remainder, re.I)
            if hit:
                end = start + hit.start()
                break

    if end is None:
        # No episode anchor: stop at the first later bracket holding metadata.
        for bracket_text, bracket_start, _bracket_end in bracket_parts(filename):
            if bracket_start <= start:
                continue
            if (
                NOISE_META_RE.search(bracket_text)
                or RESOLUTION_RE.search(bracket_text)
                or SOURCE_RE.search(bracket_text)
            ):
                end = bracket_start
                break

    if end is None or end <= start:
        return None
    return clean_inferred_title(filename[start:end]) or None
def parse_filename(
    filename: str,
    model: BertForTokenClassification,
    tokenizer: AnimeTokenizer,
    id2label: Dict[int, str],
    max_length: int = 64,
    debug: bool = False,
    use_rules: bool = True,
    constrain_bio: bool = True,
) -> Dict:
    """
    Parse an anime filename and extract structured metadata.

    Args:
        filename: Raw anime filename string.
        model: Trained BertForTokenClassification model.
        tokenizer: AnimeTokenizer instance.
        id2label: Mapping from label ID to label string.
        max_length: Maximum sequence length (including special tokens).
            Longer inputs are truncated; shorter ones are padded to it.
        debug: When true, add a ``_debug`` entry with tokens, labels,
            scores and vocabulary diagnostics.
        use_rules: Forwarded to ``postprocess`` to enable rule assists.
        constrain_bio: Use constrained BIO Viterbi decoding instead of
            greedy per-token argmax.

    Returns:
        Dict with parsed fields (title, season, episode, etc.). All fields
        are None when the filename yields no usable tokens.
    """
    empty_result = {
        "title": None, "season": None, "episode": None,
        "group": None, "resolution": None, "source": None,
        "special": None,
    }

    tokens = tokenizer.tokenize(filename)
    if not tokens:
        return empty_result

    # Map tokens to ids; ids beyond the model's embedding table are replaced
    # by UNK so a tokenizer/model vocabulary mismatch cannot index out of range.
    input_ids = tokenizer.convert_tokens_to_ids(tokens)
    embedding_size = model.get_input_embeddings().weight.shape[0]
    out_of_range_tokens = [
        token for token, token_id in zip(tokens, input_ids)
        if token_id >= embedding_size
    ]
    if out_of_range_tokens:
        input_ids = [
            token_id if token_id < embedding_size else tokenizer.unk_token_id
            for token_id in input_ids
        ]
    unk_token_id = tokenizer.unk_token_id
    unk_tokens = [token for token, token_id in zip(tokens, input_ids) if token_id == unk_token_id]

    # [CLS] tokens [SEP], truncated to max_length with the final SEP kept.
    input_ids = [tokenizer.cls_token_id] + input_ids + [tokenizer.sep_token_id]
    if len(input_ids) > max_length:
        input_ids = [input_ids[0]] + input_ids[1:max_length - 1] + [tokenizer.sep_token_id]
    attention_mask = [1] * len(input_ids)

    # Pad to max_length; padding positions are masked out.
    pad_len = max_length - len(input_ids)
    if pad_len > 0:
        input_ids += [tokenizer.pad_token_id] * pad_len
        attention_mask += [0] * pad_len

    device = next(model.parameters()).device
    input_tensor = torch.tensor([input_ids], device=device)
    mask_tensor = torch.tensor([attention_mask], device=device)

    # Number of real tokens that survived truncation (CLS/SEP excluded).
    available = min(len(tokens), max_length - 2)
    if available <= 0:
        return empty_result

    with torch.no_grad():
        logits = model(input_ids=input_tensor, attention_mask=mask_tensor).logits

    # Keep only the rows of the real tokens (position 0 is CLS).
    token_logits = logits[0, 1:1 + available, :]
    probabilities = torch.softmax(token_logits, dim=-1)
    scores, greedy_predictions = torch.max(probabilities, dim=-1)

    if constrain_bio:
        pred_labels = constrained_bio_decode(token_logits, id2label)
        # Report the probability of the label actually chosen by the decoder.
        selected_scores = [
            probabilities[idx, label_id].detach().cpu().item()
            for idx, label_id in enumerate(pred_labels)
        ]
    else:
        pred_labels = greedy_predictions.detach().cpu().tolist()
        selected_scores = scores.detach().cpu().tolist()

    label_strings = [id2label.get(p, "O") for p in pred_labels]
    used_tokens = tokens[:available]

    result = postprocess(
        used_tokens,
        label_strings,
        tokenizer=tokenizer,
        filename=filename,
        use_rules=use_rules,
    )

    if debug:
        token_rows = zip(used_tokens, input_ids[1:1 + available], label_strings, selected_scores)
        result["_debug"] = {
            "tokenizer_variant": getattr(tokenizer, "tokenizer_variant", "regex"),
            "decoder": "constrained_bio" if constrain_bio else "greedy",
            "max_length": max_length,
            "token_count": len(tokens),
            "available_token_count": available,
            "truncated": len(tokens) > available,
            "unk_count": len(unk_tokens),
            "unk_rate": len(unk_tokens) / len(tokens) if tokens else 0.0,
            "unk_tokens": unk_tokens[:50],
            "vocab_mismatch": bool(out_of_range_tokens),
            "model_embedding_size": int(embedding_size),
            "tokenizer_vocab_size": int(tokenizer.vocab_size),
            "out_of_range_tokens": out_of_range_tokens[:50],
            "tokens": used_tokens,
            "labels": label_strings,
            "scores": [round(float(score), 4) for score in selected_scores],
            "token_table": [
                {
                    "i": i,
                    "token": display_token(token),
                    "id": int(token_id),
                    "label": label,
                    "score": round(float(score), 4),
                }
                for i, (token, token_id, label, score) in enumerate(token_rows)
            ],
            "entities": [
                {"type": entity_type, "text": text}
                for entity_type, text in labels_to_entities(used_tokens, label_strings, tokenizer)
            ],
        }
    return result
def main():
    """Command-line entry point: parse filenames and emit one JSON object each.

    Filenames come from the positional argument and/or ``--input-file``;
    when neither supplies any, they are read from stdin, one per line.
    Results go to stdout as JSON lines, or to ``--output-file`` as JSONL
    when given. Progress messages go to stderr.
    """
    parser = argparse.ArgumentParser(description="Anime filename parser")
    parser.add_argument("filename", nargs="?", type=str, help="Anime filename to parse")
    parser.add_argument("--input-file", type=str, help="File with filenames (one per line)")
    parser.add_argument("--output-file", type=str, help="Output file for results (JSONL)")
    parser.add_argument("--model-dir", type=str, default=".",
                        help="Path to trained model directory")
    parser.add_argument("--tokenizer", choices=["regex", "char"], default=None,
                        help="Tokenizer variant override. Defaults to checkpoint metadata")
    # None means "not given". Using 64 as both default and sentinel made an
    # explicit `--max-length 64` indistinguishable from the default.
    parser.add_argument("--max-length", type=int, default=None,
                        help="Maximum sequence length (default: checkpoint max_seq_length, else 64)")
    parser.add_argument("--debug", action="store_true",
                        help="Include tokenizer, labels, scores, and entity spans in JSON output")
    parser.add_argument("--no-rule-assist", action="store_true",
                        help="Disable high-confidence structural post-processing rules")
    parser.add_argument("--no-constrained-bio", action="store_true",
                        help="Use greedy per-token decoding instead of constrained BIO Viterbi")
    args = parser.parse_args()

    # Load config
    cfg = Config()

    # Load tokenizer
    print(f"Loading tokenizer from {args.model_dir}...", file=sys.stderr)
    tokenizer = load_tokenizer(args.model_dir, args.tokenizer)

    # Load model
    print(f"Loading model from {args.model_dir}...", file=sys.stderr)
    model = BertForTokenClassification.from_pretrained(args.model_dir)
    model.eval()

    # Prefer the label map stored with the checkpoint; keys may be strings.
    id2label = {int(k): v for k, v in getattr(model.config, "id2label", cfg.id2label).items()}

    # An explicit --max-length always wins; otherwise use checkpoint metadata.
    max_length = args.max_length
    if max_length is None:
        max_length = int(getattr(model.config, "max_seq_length", 64))

    # Collect filenames
    filenames_to_parse: List[str] = []
    if args.filename:
        filenames_to_parse.append(args.filename)
    if args.input_file:
        with open(args.input_file, 'r', encoding='utf-8') as f:
            filenames_to_parse.extend(line.strip() for line in f if line.strip())
    if not filenames_to_parse:
        # Read from stdin, stripping each line the same way as --input-file
        # so stray whitespace or "\r" never reaches the tokenizer.
        filenames_to_parse.extend(
            line.strip() for line in sys.stdin.read().splitlines() if line.strip()
        )

    # Parse and output
    results: List[Dict] = []
    for fn in filenames_to_parse:
        if not fn.strip():
            continue
        result = parse_filename(
            fn,
            model,
            tokenizer,
            id2label,
            max_length,
            debug=args.debug,
            use_rules=not args.no_rule_assist,
            constrain_bio=not args.no_constrained_bio,
        )
        result["_input"] = fn
        results.append(result)
        if args.output_file is None:
            print(json.dumps(result, ensure_ascii=False))

    if args.output_file:
        with open(args.output_file, 'w', encoding='utf-8') as f:
            for r in results:
                f.write(json.dumps(r, ensure_ascii=False) + '\n')
        print(f"Results saved to {args.output_file}", file=sys.stderr)


if __name__ == "__main__":
    main()