""" Inference script for anime filename parser. Loads a trained model and tokenizer, parses anime filenames, and outputs structured metadata. Usage: python inference.py "[ANi] 葬送的芙莉莲 S2 - 03 [1080P][WEB-DL]" python inference.py --input-file filenames.txt --output-file results.jsonl """ import argparse import json import os import re import sys from typing import Dict, List, Optional, Tuple import torch from transformers import BertForTokenClassification from config import Config from tokenizer import AnimeTokenizer, load_tokenizer # Chinese number mapping CN_NUM_MAP: Dict[str, int] = { "一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9, "十": 10, } def extract_season_number(text: str) -> Optional[int]: """ Extract season number from various season formats. Examples: "S2" → 2, "Season 2" → 2, "第二季" → 2, "1st Season" → 1 """ # Arabic digits match = re.search(r'(\d+)', text) if match: return int(match.group(1)) # Chinese digits for cn, num in CN_NUM_MAP.items(): if cn in text: return num return None def extract_episode_number(text: str) -> Optional[int]: """ Extract episode number from various episode formats. Examples: "03" → 3, "EP21" → 21, "第7话" → 7, "#01" → 1 """ match = re.search(r'(\d+)', text) if match: return int(match.group(1)) return None def extract_resolution(text: str) -> Optional[str]: """Extract resolution string (e.g., '1080P', '4K', '1920x1080').""" # Strip brackets for matching clean = text.strip("[]()【】") return clean if clean else None def display_token(token: str) -> str: """Make whitespace tokens visible in debug output.""" if token == " ": return "" if token == "\t": return "" return token def trim_decorations(text: str) -> str: """Trim outer release brackets from an extracted entity.""" return text.strip().strip("[]()【】《》()").strip() def join_entity_tokens(tokens: List[str], tokenizer: Optional[AnimeTokenizer] = None) -> str: """Join entity tokens according to the tokenizer granularity.""" if tokenizer is not None and getattr(tokenizer, "tokenizer_variant", "regex") == "char": return "".join(tokens) text = "".join(tokens) if " " in tokens: return text return text def labels_to_entities( tokens: List[str], labels: List[str], tokenizer: Optional[AnimeTokenizer] = None, ) -> List[Tuple[str, str]]: """ Convert BIO labels into entity spans. Illegal orphan I-X labels start a new entity so debug output exposes the model behavior instead of silently dropping tokens. """ entities: List[Tuple[str, str]] = [] current_entity: Optional[str] = None current_tokens: List[str] = [] for token, label in zip(tokens, labels): if label.startswith("B-"): if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) current_entity = label[2:] current_tokens = [token] elif label.startswith("I-"): entity_type = label[2:] if current_entity == entity_type: current_tokens.append(token) else: if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) current_entity = entity_type current_tokens = [token] else: if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) current_entity = None current_tokens = [] if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) return entities def is_allowed_bio_transition(previous_label: str, label: str) -> bool: """Return whether previous_label -> label is valid under IOB2.""" if label.startswith("I-"): entity = label[2:] return previous_label in {f"B-{entity}", f"I-{entity}"} return True def constrained_bio_decode(emissions: torch.Tensor, id2label: Dict[int, str]) -> List[int]: """ Decode token logits with hard BIO transition constraints. This is a lightweight CRF-style Viterbi decoder without learned transition weights. It prevents impossible orphan I-X spans at inference time. """ if emissions.numel() == 0: return [] num_tokens, num_labels = emissions.shape scores = emissions.detach().cpu() backpointers = torch.zeros((num_tokens, num_labels), dtype=torch.long) dp = torch.full((num_labels,), float("-inf")) for label_id in range(num_labels): label = id2label.get(label_id, "O") if not label.startswith("I-"): dp[label_id] = scores[0, label_id] for idx in range(1, num_tokens): next_dp = torch.full((num_labels,), float("-inf")) for label_id in range(num_labels): label = id2label.get(label_id, "O") best_score = float("-inf") best_prev = 0 for prev_id in range(num_labels): prev_label = id2label.get(prev_id, "O") if not is_allowed_bio_transition(prev_label, label): continue candidate = dp[prev_id] + scores[idx, label_id] if candidate > best_score: best_score = float(candidate) best_prev = prev_id next_dp[label_id] = best_score backpointers[idx, label_id] = best_prev dp = next_dp best_last = int(torch.argmax(dp).item()) decoded = [best_last] for idx in range(num_tokens - 1, 0, -1): decoded.append(int(backpointers[idx, decoded[-1]].item())) decoded.reverse() return decoded def postprocess( tokens: List[str], labels: List[str], tokenizer: Optional[AnimeTokenizer] = None, filename: Optional[str] = None, use_rules: bool = True, ) -> Dict: """ Convert BIO-labeled tokens into structured metadata. Merges consecutive B- / I- tokens of the same entity type, then extracts structured fields. """ result: Dict = { "title": None, "season": None, "episode": None, "group": None, "resolution": None, "source": None, "special": None, } entities = labels_to_entities(tokens, labels, tokenizer) # Fill result for entity_type, text in entities: if entity_type == "TITLE": result["title"] = result["title"] or trim_decorations(text) # If we find multiple title fragments, concatenate them # (handles "That" + ... + "Time" etc.) elif entity_type == "SEASON": season_num = extract_season_number(text) if season_num is not None: # Keep the highest/last season number if multiple result["season"] = season_num elif entity_type == "EPISODE": ep_num = extract_episode_number(text) if ep_num is not None: if result["episode"] is None: result["episode"] = ep_num elif entity_type == "GROUP": group = text.strip("[]()【】") if result["group"] is None: result["group"] = group elif entity_type == "SPECIAL": special = text.strip("[]()【】") result["special"] = special elif entity_type == "RESOLUTION": res = extract_resolution(text) if res: result["resolution"] = res elif entity_type == "SOURCE": src = text.strip("[]()【】") result["source"] = src # Handle multi-fragment titles: concatenate all TITLE fragments # (This is needed because O tokens between words break entity continuity) title_fragments = [t for e, t in entities if e == "TITLE"] if title_fragments: result["title"] = " ".join( trimmed for f in title_fragments if (trimmed := trim_decorations(f)) ) if use_rules and filename: result = apply_rule_assists(filename, result) return result BRACKET_RE = re.compile(r"\[([^\]]+)\]|\(([^)]+)\)|【([^】]+)】|《([^》]+)》") RESOLUTION_RE = re.compile(r"\b(?:\d{3,4}[pP]|\d[Kk]|\d{3,4}[xX×]\d{3,4})\b") SOURCE_RE = re.compile( r"\b(?:WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|DVDRip|DVD|TVRip|HDTV|" r"Netflix|NF|AMZN|Baha|CR|ABEMA|DSNP|U[-_ ]?NEXT|Hulu|AT[-_ ]?X)\b", re.I, ) EPISODE_PATTERNS = [ re.compile(r"(?:^|[\s._\-\[\(【《#])(?:EP?|第)?(?P\d{1,4})(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])", re.I), re.compile(r"[Ss]\d{1,2}[Ee](?P\d{1,4})(?:v\d+)?", re.I), ] SEASON_RE = re.compile(r"(?:^|[\s._\-\[\(【《])(?:[Ss](?P\d{1,2})|Season\s*(?P\d{1,2})|第(?P[一二三四五六七八九十\d]+)[季期部])", re.I) NOISE_META_RE = re.compile( r"^(?:\d{3,4}[pP]|\d[Kk]|WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|DVDRip|DVD|TVRip|" r"HDTV|Netflix|NF|AMZN|Baha|CR|HEVC|AVC|AV1|x26[45]|h\.?26[45]|AAC.*|FLAC|MP3|DTS|" r"Opus|ASS.*|CHS|CHT|BIG5|GB|JPN?|MP4|MKV|繁中|简中|内封|外挂)$", re.I, ) def cn_number_to_int(text: str) -> Optional[int]: if text.isdigit(): return int(text) values = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9} if text == "十": return 10 if text.startswith("十") and len(text) == 2: return 10 + values.get(text[1], 0) if text.endswith("十") and len(text) == 2: return values.get(text[0], 0) * 10 if "十" in text and len(text) == 3: return values.get(text[0], 0) * 10 + values.get(text[2], 0) return values.get(text) def bracket_parts(filename: str) -> List[Tuple[str, int, int]]: parts: List[Tuple[str, int, int]] = [] for match in BRACKET_RE.finditer(filename): text = next(group for group in match.groups() if group is not None) parts.append((text.strip(), match.start(), match.end())) return parts def looks_like_group(text: str) -> bool: if not text or NOISE_META_RE.search(text): return False return bool( re.search( r"(?:字幕|字幕组|字幕組|sub|subs|raws?|fansub|studio|house|team|project|" r"loli|ani|vcb|airota|kiss|dmhy|erai|subsplease)", text, re.I, ) ) def apply_rule_assists(filename: str, result: Dict) -> Dict: """ Fill high-confidence structural fields from filename conventions. The model remains the primary tagger; rules only fill missing obvious fields or repair common boundary drift around leading group brackets and episodes. """ repaired = dict(result) brackets = bracket_parts(filename) if (not repaired.get("group") or (repaired.get("title") and repaired["group"] in repaired["title"])) and brackets: first_text, first_start, _first_end = brackets[0] if first_start == 0 and looks_like_group(first_text): repaired["group"] = first_text if not repaired.get("resolution"): match = RESOLUTION_RE.search(filename) if match: repaired["resolution"] = match.group(0) if not repaired.get("source"): match = SOURCE_RE.search(filename) if match: repaired["source"] = match.group(0).replace("_", "-") if repaired.get("season") is None: match = SEASON_RE.search(filename) if match: value = next(group for group in match.groups() if group) season = cn_number_to_int(value) if season is not None: repaired["season"] = season if repaired.get("episode") is None: candidates: List[Tuple[int, int, str]] = [] for pattern in EPISODE_PATTERNS: for match in pattern.finditer(filename): ep_text = match.group("ep") ep = int(ep_text) if ep == 0 or ep > 2000: continue score = match.start() if 1 <= ep <= 200: score += 10000 if "-" in filename[max(0, match.start() - 3):match.start() + 1]: score += 1000 if match.start() > len(filename) // 3: score += 200 candidates.append((score, ep, ep_text)) if candidates: repaired["episode"] = max(candidates, key=lambda item: item[0])[1] title = repaired.get("title") group = repaired.get("group") if title and group and title.startswith(group): title = title[len(group):].lstrip("]】)>})》 \t-_.") repaired["title"] = title or repaired["title"] if (not repaired.get("title") or (group and repaired["title"].startswith(group))) and repaired.get("episode"): repaired_title = infer_title_span(filename, group, repaired["episode"]) if repaired_title: repaired["title"] = repaired_title return repaired def infer_title_span(filename: str, group: Optional[str], episode: Optional[int]) -> Optional[str]: start = 0 if group: first = BRACKET_RE.match(filename) if first and group in first.group(0): start = first.end() end = None if episode is not None: ep_patterns = [ rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])", rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]", rf"[Ee]0*{episode}(?:v\d+)?", ] for pattern in ep_patterns: match = re.search(pattern, filename[start:], re.I) if match: end = start + match.start() break if end is None: for text, bracket_start, _bracket_end in bracket_parts(filename): if bracket_start <= start: continue if NOISE_META_RE.search(text) or RESOLUTION_RE.search(text) or SOURCE_RE.search(text): end = bracket_start break if end is None or end <= start: return None title = filename[start:end].strip(" \t-_.[]()【】《》()") return title or None def parse_filename( filename: str, model: BertForTokenClassification, tokenizer: AnimeTokenizer, id2label: Dict[int, str], max_length: int = 64, debug: bool = False, use_rules: bool = True, constrain_bio: bool = True, ) -> Dict: """ Parse an anime filename and extract structured metadata. Args: filename: Raw anime filename string. model: Trained BertForTokenClassification model. tokenizer: AnimeTokenizer instance. id2label: Mapping from label ID to label string. max_length: Maximum sequence length (including special tokens). Returns: Dict with parsed fields (title, season, episode, etc.). """ # Tokenize tokens = tokenizer.tokenize(filename) if not tokens: return {"title": None, "season": None, "episode": None, "group": None, "resolution": None, "source": None, "special": None} # Convert to input IDs input_ids = tokenizer.convert_tokens_to_ids(tokens) unk_token_id = tokenizer.unk_token_id unk_tokens = [token for token, token_id in zip(tokens, input_ids) if token_id == unk_token_id] # Add special tokens input_ids = [tokenizer.cls_token_id] + input_ids + [tokenizer.sep_token_id] attention_mask = [1] * len(input_ids) # Truncate if needed if len(input_ids) > max_length: input_ids = [input_ids[0]] + input_ids[1:max_length - 1] + [tokenizer.sep_token_id] attention_mask = [1] * len(input_ids) # Pad pad_len = max_length - len(input_ids) if pad_len > 0: input_ids += [tokenizer.pad_token_id] * pad_len attention_mask += [0] * pad_len # Predict device = next(model.parameters()).device input_tensor = torch.tensor([input_ids], device=device) mask_tensor = torch.tensor([attention_mask], device=device) # Remove special token predictions # Count real tokens used (minus CLS/SEP) real_token_count = len(tokens) # Truncate real tokens if we had to truncate available = min(real_token_count, max_length - 2) if available <= 0: return {"title": None, "season": None, "episode": None, "group": None, "resolution": None, "source": None, "special": None} with torch.no_grad(): logits = model(input_ids=input_tensor, attention_mask=mask_tensor).logits token_logits = logits[0, 1:1 + available, :] probabilities = torch.softmax(token_logits, dim=-1) scores, greedy_predictions = torch.max(probabilities, dim=-1) if constrain_bio: pred_labels = constrained_bio_decode(token_logits, id2label) selected_scores = [ probabilities[idx, label_id].detach().cpu().item() for idx, label_id in enumerate(pred_labels) ] else: pred_labels = greedy_predictions.detach().cpu().tolist() selected_scores = scores.detach().cpu().tolist() label_strings = [id2label.get(p, "O") for p in pred_labels] # Post-process result = postprocess( tokens[:available], label_strings, tokenizer=tokenizer, filename=filename, use_rules=use_rules, ) if debug: result["_debug"] = { "tokenizer_variant": getattr(tokenizer, "tokenizer_variant", "regex"), "decoder": "constrained_bio" if constrain_bio else "greedy", "max_length": max_length, "token_count": len(tokens), "available_token_count": available, "truncated": len(tokens) > available, "unk_count": len(unk_tokens), "unk_rate": len(unk_tokens) / len(tokens) if tokens else 0.0, "unk_tokens": unk_tokens[:50], "tokens": tokens[:available], "labels": label_strings, "scores": [round(float(score), 4) for score in selected_scores], "token_table": [ { "i": i, "token": display_token(token), "id": int(token_id), "label": label, "score": round(float(score), 4), } for i, (token, token_id, label, score) in enumerate( zip(tokens[:available], input_ids[1:1 + available], label_strings, selected_scores) ) ], "entities": [ {"type": entity_type, "text": text} for entity_type, text in labels_to_entities(tokens[:available], label_strings, tokenizer) ], } return result def main(): parser = argparse.ArgumentParser(description="Anime filename parser") parser.add_argument("filename", nargs="?", type=str, help="Anime filename to parse") parser.add_argument("--input-file", type=str, help="File with filenames (one per line)") parser.add_argument("--output-file", type=str, help="Output file for results (JSONL)") parser.add_argument("--model-dir", type=str, default="./checkpoints/final", help="Path to trained model directory") parser.add_argument("--tokenizer", choices=["regex", "char"], default=None, help="Tokenizer variant override. Defaults to checkpoint metadata") parser.add_argument("--max-length", type=int, default=64, help="Maximum sequence length") parser.add_argument("--debug", action="store_true", help="Include tokenizer, labels, scores, and entity spans in JSON output") parser.add_argument("--no-rule-assist", action="store_true", help="Disable high-confidence structural post-processing rules") parser.add_argument("--no-constrained-bio", action="store_true", help="Use greedy per-token decoding instead of constrained BIO Viterbi") args = parser.parse_args() # Load config cfg = Config() # Load tokenizer print(f"Loading tokenizer from {args.model_dir}...", file=sys.stderr) tokenizer = load_tokenizer(args.model_dir, args.tokenizer) # Load model print(f"Loading model from {args.model_dir}...", file=sys.stderr) model = BertForTokenClassification.from_pretrained(args.model_dir) model.eval() id2label = {int(k): v for k, v in getattr(model.config, "id2label", cfg.id2label).items()} max_length = args.max_length if max_length == 64: max_length = int(getattr(model.config, "max_seq_length", max_length)) # Process filenames filenames_to_parse: List[str] = [] if args.filename: filenames_to_parse.append(args.filename) if args.input_file: with open(args.input_file, 'r', encoding='utf-8') as f: filenames_to_parse.extend(line.strip() for line in f if line.strip()) if not filenames_to_parse: # Read from stdin filenames_to_parse.extend(sys.stdin.read().strip().splitlines()) # Parse and output results: List[Dict] = [] for fn in filenames_to_parse: if not fn.strip(): continue result = parse_filename( fn, model, tokenizer, id2label, max_length, debug=args.debug, use_rules=not args.no_rule_assist, constrain_bio=not args.no_constrained_bio, ) result["_input"] = fn results.append(result) if args.output_file is None: print(json.dumps(result, ensure_ascii=False)) if args.output_file: with open(args.output_file, 'w', encoding='utf-8') as f: for r in results: f.write(json.dumps(r, ensure_ascii=False) + '\n') print(f"Results saved to {args.output_file}", file=sys.stderr) if __name__ == "__main__": main()