""" Inference script for anime filename parser. Loads a trained model and tokenizer, parses anime filenames, and outputs structured metadata. Usage: python inference.py "[ANi] 葬送的芙莉莲 S2 - 03 [1080P][WEB-DL]" python inference.py --input-file filenames.txt --output-file results.jsonl """ import argparse import json import os import re import sys from typing import Dict, List, Optional, Tuple import torch from transformers import BertForTokenClassification from config import Config from label_repairs import season_marker_number from tokenizer import AnimeTokenizer, load_tokenizer # Chinese number mapping CN_NUM_MAP: Dict[str, int] = { "一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9, "十": 10, } def extract_season_number(text: str) -> Optional[int]: """ Extract season number from various season formats. Examples: "S2" → 2, "Season 2" → 2, "第二季" → 2, "1st Season" → 1 """ marker_value = season_marker_number(text) if marker_value is not None: return marker_value # Arabic digits match = re.search(r'(\d+)', text) if match: return int(match.group(1)) # Chinese digits for cn, num in CN_NUM_MAP.items(): if cn in text: return num return None def extract_episode_number(text: str) -> Optional[int]: """ Extract episode number from various episode formats. Examples: "03" → 3, "EP21" → 21, "第7话" → 7, "#01" → 1 """ match = re.search(r'(\d+)', text) if match: return int(match.group(1)) return None def extract_resolution(text: str) -> Optional[str]: """Extract resolution string (e.g., '1080P', '4K', '1920x1080').""" # Strip brackets for matching clean = text.strip("[]()【】") return clean if clean else None def display_token(token: str) -> str: """Make whitespace tokens visible in debug output.""" if token == " ": return "" if token == "\t": return "" return token def trim_decorations(text: str) -> str: """Trim outer release brackets from an extracted entity.""" return text.strip().strip("[]()【】《》()").strip() def join_entity_tokens(tokens: List[str], tokenizer: Optional[AnimeTokenizer] = None) -> str: """Join entity tokens according to the tokenizer granularity.""" if tokenizer is not None and getattr(tokenizer, "tokenizer_variant", "regex") == "char": return "".join(tokens) text = "".join(tokens) if " " in tokens: return text return text def labels_to_entities( tokens: List[str], labels: List[str], tokenizer: Optional[AnimeTokenizer] = None, ) -> List[Tuple[str, str]]: """ Convert BIO labels into entity spans. Illegal orphan I-X labels start a new entity so debug output exposes the model behavior instead of silently dropping tokens. """ entities: List[Tuple[str, str]] = [] current_entity: Optional[str] = None current_tokens: List[str] = [] for token, label in zip(tokens, labels): if label.startswith("B-"): if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) current_entity = label[2:] current_tokens = [token] elif label.startswith("I-"): entity_type = label[2:] if current_entity == entity_type: current_tokens.append(token) else: if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) current_entity = entity_type current_tokens = [token] else: if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) current_entity = None current_tokens = [] if current_entity: entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer))) return entities def is_allowed_bio_transition(previous_label: str, label: str) -> bool: """Return whether previous_label -> label is valid under IOB2.""" if label.startswith("I-"): entity = label[2:] return previous_label in {f"B-{entity}", f"I-{entity}"} return True def constrained_bio_decode(emissions: torch.Tensor, id2label: Dict[int, str]) -> List[int]: """ Decode token logits with hard BIO transition constraints. This is a lightweight CRF-style Viterbi decoder without learned transition weights. It prevents impossible orphan I-X spans at inference time. """ if emissions.numel() == 0: return [] num_tokens, num_labels = emissions.shape scores = emissions.detach().cpu() backpointers = torch.zeros((num_tokens, num_labels), dtype=torch.long) dp = torch.full((num_labels,), float("-inf")) for label_id in range(num_labels): label = id2label.get(label_id, "O") if not label.startswith("I-"): dp[label_id] = scores[0, label_id] for idx in range(1, num_tokens): next_dp = torch.full((num_labels,), float("-inf")) for label_id in range(num_labels): label = id2label.get(label_id, "O") best_score = float("-inf") best_prev = 0 for prev_id in range(num_labels): prev_label = id2label.get(prev_id, "O") if not is_allowed_bio_transition(prev_label, label): continue candidate = dp[prev_id] + scores[idx, label_id] if candidate > best_score: best_score = float(candidate) best_prev = prev_id next_dp[label_id] = best_score backpointers[idx, label_id] = best_prev dp = next_dp best_last = int(torch.argmax(dp).item()) decoded = [best_last] for idx in range(num_tokens - 1, 0, -1): decoded.append(int(backpointers[idx, decoded[-1]].item())) decoded.reverse() return decoded def postprocess( tokens: List[str], labels: List[str], tokenizer: Optional[AnimeTokenizer] = None, filename: Optional[str] = None, use_rules: bool = True, ) -> Dict: """ Convert BIO-labeled tokens into structured metadata. Merges consecutive B- / I- tokens of the same entity type, then extracts structured fields. """ result: Dict = { "title": None, "season": None, "episode": None, "group": None, "resolution": None, "source": None, "special": None, } entities = labels_to_entities(tokens, labels, tokenizer) # Fill result for entity_type, text in entities: if entity_type == "TITLE": result["title"] = result["title"] or trim_decorations(text) # If we find multiple title fragments, concatenate them # (handles "That" + ... + "Time" etc.) elif entity_type == "SEASON": season_num = extract_season_number(text) if season_num is not None: # Keep the highest/last season number if multiple result["season"] = season_num elif entity_type == "EPISODE": ep_num = extract_episode_number(text) if ep_num is not None: if result["episode"] is None: result["episode"] = ep_num elif entity_type == "GROUP": group = text.strip("[]()【】") if result["group"] is None: result["group"] = group elif entity_type == "SPECIAL": special = text.strip("[]()【】") result["special"] = special elif entity_type == "RESOLUTION": res = extract_resolution(text) if res: result["resolution"] = res elif entity_type == "SOURCE": src = text.strip("[]()【】") result["source"] = src # Handle multi-fragment titles: concatenate all TITLE fragments # (This is needed because O tokens between words break entity continuity) title_fragments = [t for e, t in entities if e == "TITLE"] if title_fragments: result["title"] = " ".join( trimmed for f in title_fragments if (trimmed := trim_decorations(f)) ) if use_rules and filename: result = apply_rule_assists(filename, result) return result BRACKET_RE = re.compile(r"\[([^\]]+)\]|\(([^)]+)\)|【([^】]+)】|《([^》]+)》") RESOLUTION_RE = re.compile(r"(?\d{1,4})(?:v\d+)?", re.I)), ("dash_episode", re.compile(r"(?:^|[\s._])[-_]\s*(?P\d{1,4})(?:v\d+)?(?=$|[\s._\-\]\)】》\[])")), ("bracket_episode", re.compile(r"[\[\(【《](?:EP?|#)?(?P\d{1,4})(?:v\d+)?[\]\)】》]", re.I)), ("explicit_episode", re.compile(r"(?:^|[\s._\-\[\(【《#])(?:EP?|第|#)(?P\d{1,4})(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])", re.I)), ( "long_episode", re.compile( r"(?:^|[\s._\-\[\(【《])(?P\d{3,4})(?:v\d+)?" r"(?=[\s._\-\]\)】》\[]+(?:\d{3,4}[pP]|WEB|BD|BluRay|HDTV|NF|AMZN|CR|Baha))", re.I, ), ), ("generic_episode", re.compile(r"(?:^|[\s._\-\[\(【《#])(?P\d{1,3})(?:v\d+)?(?=$|[\s._\-\]\)】》])", re.I)), ] SEASON_RE = re.compile(r"(?:^|[\s._\-\[\(【《])(?:[Ss](?P\d{1,2})|Season\s*(?P\d{1,2})|第(?P[一二三四五六七八九十\d]+)[季期部])", re.I) SEQUEL_MARKER_RE = re.compile( r"(?" r"Ni\s+no\s+(?:Sara|Shou|Sho|Syo|Shō)|" r"San\s+no\s+(?:Sara|Shou|Sho|Syo)|" r"(?:Yon|Shi|Shin)\s+no\s+Sara|" r"(?:Go|Gou)\s+no\s+Sara|" r"Ni\s+Gakki|Sono\s+Ni|Ni|" r"II|III|IV|V|VI|VII|VIII|IX|[ⅡⅢⅣⅤⅥⅦⅧⅨ]|" r"[一二三四五六七八九十兩两貳贰弐弍參叁参肆伍陸陆柒捌玖](?:\s*(?:ノ|の|之)\s*(?:章|期|季|部))?" r")" r"(?![A-Za-z0-9])", re.I, ) TRAILING_SEQUEL_MARKER_RE = re.compile( r"(?:^|[\s._-])" r"(?P" r"Ni\s+no\s+(?:Sara|Shou|Sho|Syo|Shō)|" r"San\s+no\s+(?:Sara|Shou|Sho|Syo)|" r"(?:Yon|Shi|Shin)\s+no\s+Sara|" r"(?:Go|Gou)\s+no\s+Sara|" r"Ni\s+Gakki|Sono\s+Ni|Ni|" r"II|III|IV|V|VI|VII|VIII|IX|[ⅡⅢⅣⅤⅥⅦⅧⅨ]|" r"[一二三四五六七八九十兩两貳贰弐弍參叁参肆伍陸陆柒捌玖](?:\s*(?:ノ|の|之)\s*(?:章|期|季|部))?" r")$", re.I, ) NOISE_META_RE = re.compile( r"^(?:\d{3,4}[pP]|\d[Kk]|WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|" r"HDTV|Netflix|NF|AMZN|Baha|CR|HEVC|AVC|AV1|x26[45]|h\.?26[45]|AAC.*|FLAC|MP3|DTS|" r"Opus|ASS.*|CHS|CHT|BIG5|GB|JPN?|MP4|MKV|繁中|简中|内封|外挂)$", re.I, ) def cn_number_to_int(text: str) -> Optional[int]: if text.isdigit(): return int(text) values = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9} if text == "十": return 10 if text.startswith("十") and len(text) == 2: return 10 + values.get(text[1], 0) if text.endswith("十") and len(text) == 2: return values.get(text[0], 0) * 10 if "十" in text and len(text) == 3: return values.get(text[0], 0) * 10 + values.get(text[2], 0) return values.get(text) def bracket_parts(filename: str) -> List[Tuple[str, int, int]]: parts: List[Tuple[str, int, int]] = [] for match in BRACKET_RE.finditer(filename): text = next(group for group in match.groups() if group is not None) parts.append((text.strip(), match.start(), match.end())) return parts def looks_like_group(text: str) -> bool: if not text or NOISE_META_RE.search(text): return False return bool( re.search( r"(?:字幕|字幕组|字幕組|sub|subs|raws?|fansub|studio|house|team|project|" r"loli|ani|vcb|airota|kiss|dmhy|erai|subsplease)", text, re.I, ) ) def looks_like_episode_or_meta(text: str) -> bool: if not text: return False clean = text.strip() return bool( re.fullmatch(r"(?:EP?|#)?\d{1,4}(?:v\d+)?", clean, re.I) or RESOLUTION_RE.search(clean) or SOURCE_TAG_RE.fullmatch(clean) or SOURCE_RE.search(clean) or SPECIAL_TAG_RE.search(clean) or NOISE_META_RE.search(clean) ) def looks_like_structural_group(text: str, filename: str, bracket_end: int) -> bool: """Heuristic for short leading release-group brackets not in the name list.""" if looks_like_group(text): return True if not text or looks_like_episode_or_meta(text): return False after = filename[bracket_end:].lstrip(" \t._") if after.startswith("-"): return False next_bracket = BRACKET_RE.match(after) if next_bracket: next_text = next(group for group in next_bracket.groups() if group is not None) if looks_like_episode_or_meta(next_text): return False words = re.findall(r"[A-Za-z0-9]+", text) if not words: if re.search(r"[\u3400-\u9fff]", text) and len(text) <= 32: return True return False if len(text) > 32: return False if len(words) == 1: return True if any(sep in text for sep in "-_"): return True if words[0].isupper() and len(words[0]) <= 4 and len(words) <= 3: return True return False def apply_rule_assists(filename: str, result: Dict) -> Dict: """ Fill high-confidence structural fields from filename conventions. The model remains the primary tagger; rules only fill missing obvious fields or repair common boundary drift around leading group brackets and episodes. """ repaired = dict(result) brackets = bracket_parts(filename) if (not repaired.get("group") or (repaired.get("title") and repaired["group"] in repaired["title"])) and brackets: first_text, first_start, first_end = brackets[0] if first_start == 0 and looks_like_structural_group(first_text, filename, first_end): repaired["group"] = first_text if not repaired.get("resolution"): match = RESOLUTION_RE.search(filename) if match: repaired["resolution"] = match.group(0) source_matches = source_candidates(filename) current_source = repaired.get("source") preferred_source = source_matches[0] if source_matches else None if source_matches and ( not current_source or not SOURCE_RE.fullmatch(str(current_source)) or len(str(current_source)) <= 3 and str(current_source).lower() not in {"nf", "cr"} or ( preferred_source and str(current_source).lower().replace("_", "-") in {"web-dl", "webdl", "webrip", "web-rip"} and preferred_source.lower().replace("_", "-") not in {"web-dl", "webdl", "webrip", "web-rip"} ) ): repaired["source"] = preferred_source if not repaired.get("special"): for text, _start, _end in brackets: clean = text.strip() if SPECIAL_TAG_RE.search(clean): repaired["special"] = clean break episode = best_structural_episode(filename) if episode is not None and ( repaired.get("episode") is None or not plausible_episode_context(filename, int(repaired["episode"])) ): repaired["episode"] = episode if repaired.get("season") is None: match = SEASON_RE.search(filename) if match: value = next(group for group in match.groups() if group) season = cn_number_to_int(value) if season is not None: repaired["season"] = season if repaired.get("season") is None and repaired.get("episode") is not None: sequel = structural_sequel_marker(filename, repaired.get("group"), repaired.get("episode")) if sequel is not None: repaired["season"] = sequel[1] elif repaired.get("episode") == repaired.get("season") and not SEASON_RE.search(filename): repaired["season"] = None title = repaired.get("title") group = repaired.get("group") if group and (NOISE_META_RE.search(str(group)) or SOURCE_RE.fullmatch(str(group)) or RESOLUTION_RE.fullmatch(str(group))): repaired["group"] = None group = None if title and group and title.startswith(group): title = title[len(group):].lstrip("]】)>})》 \t-_.") repaired["title"] = title or repaired["title"] if repaired.get("episode"): repaired_title = infer_title_span(filename, group, repaired["episode"]) if repaired_title: repaired["title"] = repaired_title if repaired.get("title") and repaired.get("season") is not None: repaired["title"] = strip_trailing_season_from_title(repaired["title"], repaired["season"]) return repaired def structural_sequel_marker( filename: str, group: Optional[str], episode: Optional[int], ) -> Optional[Tuple[str, int]]: if episode is None: return None title_end = None if episode is not None: ep_patterns = [ rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?", rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])", rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]", rf"#\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])", rf"(?:^|[\s._\-\[\(【《])第0*{episode}(?:[话話集])?(?=$|[\s._\-\]\)】》])", ] start = 0 if group: first = BRACKET_RE.match(filename) if first and group in first.group(0): start = first.end() for pattern in ep_patterns: match = re.search(pattern, filename[start:], re.I) if match: title_end = start + match.start() break if title_end is None: return None prefix = filename[:title_end].rstrip(" \t-_.") for match in reversed(list(SEQUEL_MARKER_RE.finditer(prefix))): marker = match.group("marker") value = season_marker_number(marker) if value is None: continue tail = prefix[match.end():].strip(" \t-_.") if tail: continue if marker.lower() == "ni" and "Kakuriyo no Yadomeshi Ni" not in prefix: continue return marker, value return None def normalize_source_text(text: str) -> str: text = re.sub(r"\s+", "", text.strip()) text = re.sub(r"(?i)WEB[_ ]?DL", "WEB-DL", text) text = re.sub(r"(?i)WEB[_ ]?Rip", "WebRip", text) text = re.sub(r"(?i)U[_ ]?NEXT", "U-NEXT", text) text = re.sub(r"(?i)AT[_ ]?X", "AT-X", text) return text.replace("_", "-") def source_priority(source: str) -> int: normalized = source.lower().replace("_", "-").replace(" ", "") parts = re.split(r"[&+/,]", normalized) if any(part in {"nf", "netflix", "amzn", "baha", "cr", "abema", "dsnp", "u-next", "hulu", "at-x"} for part in parts): return 90 if any(part in {"web-dl", "webdl", "webrip", "web-rip", "bdrip", "bluray", "bdmv", "bd", "dvdrip", "dvd", "tvrip", "hdtv"} for part in parts): return 60 if len(parts) > 1: return 40 return 20 def source_candidates(filename: str) -> List[str]: candidates: List[Tuple[int, int, str]] = [] for text, start, _end in bracket_parts(filename): clean = text.strip() if SOURCE_TAG_RE.fullmatch(clean): normalized = normalize_source_text(clean) candidates.append((source_priority(normalized), -start, normalized)) for match in SOURCE_RE.finditer(filename): normalized = normalize_source_text(match.group(0)) candidates.append((source_priority(normalized), -match.start(), normalized)) deduped: Dict[str, Tuple[int, int, str]] = {} for priority, neg_start, value in candidates: key = value.lower() if key not in deduped or (priority, neg_start) > (deduped[key][0], deduped[key][1]): deduped[key] = (priority, neg_start, value) return [value for _priority, _neg_start, value in sorted(deduped.values(), reverse=True)] def best_structural_episode(filename: str) -> Optional[int]: priorities = { "season_episode": 1000, "dash_episode": 900, "bracket_episode": 850, "explicit_episode": 800, "long_episode": 750, "generic_episode": 100, } candidates: List[Tuple[int, int, int]] = [] for name, pattern in EPISODE_PATTERNS: for match in pattern.finditer(filename): ep_text = match.group("ep") ep = int(ep_text) if ep == 0 or ep > 2000: continue context = filename[max(0, match.start() - 5):match.end() + 5] if RESOLUTION_RE.search(context) or re.search(r"AAC|DDP|AC3|H\.?26[45]|x26[45]", context, re.I): continue priority = priorities[name] if 1 <= ep <= 200: priority += 20 candidates.append((priority, match.start(), ep)) if not candidates: return None return max(candidates, key=lambda item: (item[0], item[1]))[2] def plausible_episode_context(filename: str, episode: int) -> bool: ep_text = str(episode) padded = f"{episode:02d}" if re.search(rf"(? str: season_text = str(season) patterns = [ rf"\s+[Ss]0*{season_text}$", rf"\s+Season\s*0*{season_text}$", rf"\s+0*{season_text}$", ] cleaned = title for pattern in patterns: cleaned = re.sub(pattern, "", cleaned, flags=re.I).strip(" \t-_.") match = TRAILING_SEQUEL_MARKER_RE.search(cleaned) if match and season_marker_number(match.group("marker")) == season: cleaned = cleaned[:match.start()].strip(" \t-_.") return cleaned or title def clean_inferred_title(title: str) -> str: raw_title = title.strip(" \t-_.") bracket_matches = list(BRACKET_RE.finditer(raw_title)) if bracket_matches: first = bracket_matches[0] prefix = raw_title[:first.start()].strip(" \t-_.★☆") text = next(group for group in first.groups() if group is not None).strip() if text and not looks_like_episode_or_meta(text) and ( not prefix or re.search(r"(?:新番|月|合集|繁|简|字幕|先行|合集|★|☆)", prefix, re.I) ): return text return raw_title.strip("[]()【】《》()") def infer_title_span(filename: str, group: Optional[str], episode: Optional[int]) -> Optional[str]: start = 0 if group: first = BRACKET_RE.match(filename) if first and group in first.group(0): start = first.end() else: # Some releases put leading metadata before the actual title, e.g. # `[1080p] Title - 01`. Do not keep that wrapper as title text. while True: leading = BRACKET_RE.match(filename[start:].lstrip(" \t._-")) if not leading: break skipped_ws = len(filename[start:]) - len(filename[start:].lstrip(" \t._-")) text = next(group for group in leading.groups() if group is not None) if not looks_like_episode_or_meta(text): break start += skipped_ws + leading.end() end = None if episode is not None: ep_patterns = [ rf"[Ss]\d{{1,2}}[Ee]0*{episode}(?:v\d+)?", rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])", rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]", rf"#\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])", rf"(?:^|[\s._\-\[\(【《])第0*{episode}(?:[话話集])?(?=$|[\s._\-\]\)】》])", rf"[Ee]0*{episode}(?:v\d+)?", ] for pattern in ep_patterns: match = re.search(pattern, filename[start:], re.I) if match: end = start + match.start() break if end is None: for text, bracket_start, _bracket_end in bracket_parts(filename): if bracket_start <= start: continue if NOISE_META_RE.search(text) or RESOLUTION_RE.search(text) or SOURCE_RE.search(text): end = bracket_start break if end is None or end <= start: return None title = clean_inferred_title(filename[start:end]) return title or None def parse_filename( filename: str, model: BertForTokenClassification, tokenizer: AnimeTokenizer, id2label: Dict[int, str], max_length: int = 64, debug: bool = False, use_rules: bool = True, constrain_bio: bool = True, ) -> Dict: """ Parse an anime filename and extract structured metadata. Args: filename: Raw anime filename string. model: Trained BertForTokenClassification model. tokenizer: AnimeTokenizer instance. id2label: Mapping from label ID to label string. max_length: Maximum sequence length (including special tokens). Returns: Dict with parsed fields (title, season, episode, etc.). """ # Tokenize tokens = tokenizer.tokenize(filename) if not tokens: return {"title": None, "season": None, "episode": None, "group": None, "resolution": None, "source": None, "special": None} # Convert to input IDs input_ids = tokenizer.convert_tokens_to_ids(tokens) embedding_size = model.get_input_embeddings().weight.shape[0] out_of_range_tokens = [ token for token, token_id in zip(tokens, input_ids) if token_id >= embedding_size ] if out_of_range_tokens: input_ids = [ token_id if token_id < embedding_size else tokenizer.unk_token_id for token_id in input_ids ] unk_token_id = tokenizer.unk_token_id unk_tokens = [token for token, token_id in zip(tokens, input_ids) if token_id == unk_token_id] # Add special tokens input_ids = [tokenizer.cls_token_id] + input_ids + [tokenizer.sep_token_id] attention_mask = [1] * len(input_ids) # Truncate if needed if len(input_ids) > max_length: input_ids = [input_ids[0]] + input_ids[1:max_length - 1] + [tokenizer.sep_token_id] attention_mask = [1] * len(input_ids) # Pad pad_len = max_length - len(input_ids) if pad_len > 0: input_ids += [tokenizer.pad_token_id] * pad_len attention_mask += [0] * pad_len # Predict device = next(model.parameters()).device input_tensor = torch.tensor([input_ids], device=device) mask_tensor = torch.tensor([attention_mask], device=device) # Remove special token predictions # Count real tokens used (minus CLS/SEP) real_token_count = len(tokens) # Truncate real tokens if we had to truncate available = min(real_token_count, max_length - 2) if available <= 0: return {"title": None, "season": None, "episode": None, "group": None, "resolution": None, "source": None, "special": None} with torch.no_grad(): logits = model(input_ids=input_tensor, attention_mask=mask_tensor).logits token_logits = logits[0, 1:1 + available, :] probabilities = torch.softmax(token_logits, dim=-1) scores, greedy_predictions = torch.max(probabilities, dim=-1) if constrain_bio: pred_labels = constrained_bio_decode(token_logits, id2label) selected_scores = [ probabilities[idx, label_id].detach().cpu().item() for idx, label_id in enumerate(pred_labels) ] else: pred_labels = greedy_predictions.detach().cpu().tolist() selected_scores = scores.detach().cpu().tolist() label_strings = [id2label.get(p, "O") for p in pred_labels] # Post-process result = postprocess( tokens[:available], label_strings, tokenizer=tokenizer, filename=filename, use_rules=use_rules, ) if debug: result["_debug"] = { "tokenizer_variant": getattr(tokenizer, "tokenizer_variant", "regex"), "decoder": "constrained_bio" if constrain_bio else "greedy", "max_length": max_length, "token_count": len(tokens), "available_token_count": available, "truncated": len(tokens) > available, "unk_count": len(unk_tokens), "unk_rate": len(unk_tokens) / len(tokens) if tokens else 0.0, "unk_tokens": unk_tokens[:50], "vocab_mismatch": bool(out_of_range_tokens), "model_embedding_size": int(embedding_size), "tokenizer_vocab_size": int(tokenizer.vocab_size), "out_of_range_tokens": out_of_range_tokens[:50], "tokens": tokens[:available], "labels": label_strings, "scores": [round(float(score), 4) for score in selected_scores], "token_table": [ { "i": i, "token": display_token(token), "id": int(token_id), "label": label, "score": round(float(score), 4), } for i, (token, token_id, label, score) in enumerate( zip(tokens[:available], input_ids[1:1 + available], label_strings, selected_scores) ) ], "entities": [ {"type": entity_type, "text": text} for entity_type, text in labels_to_entities(tokens[:available], label_strings, tokenizer) ], } return result def main(): parser = argparse.ArgumentParser(description="Anime filename parser") parser.add_argument("filename", nargs="?", type=str, help="Anime filename to parse") parser.add_argument("--input-file", type=str, help="File with filenames (one per line)") parser.add_argument("--output-file", type=str, help="Output file for results (JSONL)") parser.add_argument("--model-dir", type=str, default=".", help="Path to trained model directory") parser.add_argument("--tokenizer", choices=["regex", "char"], default=None, help="Tokenizer variant override. Defaults to checkpoint metadata") parser.add_argument("--max-length", type=int, default=64, help="Maximum sequence length") parser.add_argument("--debug", action="store_true", help="Include tokenizer, labels, scores, and entity spans in JSON output") parser.add_argument("--no-rule-assist", action="store_true", help="Disable high-confidence structural post-processing rules") parser.add_argument("--no-constrained-bio", action="store_true", help="Use greedy per-token decoding instead of constrained BIO Viterbi") args = parser.parse_args() # Load config cfg = Config() # Load tokenizer print(f"Loading tokenizer from {args.model_dir}...", file=sys.stderr) tokenizer = load_tokenizer(args.model_dir, args.tokenizer) # Load model print(f"Loading model from {args.model_dir}...", file=sys.stderr) model = BertForTokenClassification.from_pretrained(args.model_dir) model.eval() id2label = {int(k): v for k, v in getattr(model.config, "id2label", cfg.id2label).items()} max_length = args.max_length if max_length == 64: max_length = int(getattr(model.config, "max_seq_length", max_length)) # Process filenames filenames_to_parse: List[str] = [] if args.filename: filenames_to_parse.append(args.filename) if args.input_file: with open(args.input_file, 'r', encoding='utf-8') as f: filenames_to_parse.extend(line.strip() for line in f if line.strip()) if not filenames_to_parse: # Read from stdin filenames_to_parse.extend(sys.stdin.read().strip().splitlines()) # Parse and output results: List[Dict] = [] for fn in filenames_to_parse: if not fn.strip(): continue result = parse_filename( fn, model, tokenizer, id2label, max_length, debug=args.debug, use_rules=not args.no_rule_assist, constrain_bio=not args.no_constrained_bio, ) result["_input"] = fn results.append(result) if args.output_file is None: print(json.dumps(result, ensure_ascii=False)) if args.output_file: with open(args.output_file, 'w', encoding='utf-8') as f: for r in results: f.write(json.dumps(r, ensure_ascii=False) + '\n') print(f"Results saved to {args.output_file}", file=sys.stderr) if __name__ == "__main__": main()