AniFileBERT / inference.py
ModerRAS's picture
Add parser diagnostics and inference debugging
b57780c
raw
history blame
22.2 kB
"""
Inference script for anime filename parser.
Loads a trained model and tokenizer, parses anime filenames,
and outputs structured metadata.
Usage:
python inference.py "[ANi] 葬送的芙莉莲 S2 - 03 [1080P][WEB-DL]"
python inference.py --input-file filenames.txt --output-file results.jsonl
"""
import argparse
import json
import os
import re
import sys
from typing import Dict, List, Optional, Tuple
import torch
from transformers import BertForTokenClassification
from config import Config
from tokenizer import AnimeTokenizer, load_tokenizer
# Chinese number mapping
CN_NUM_MAP: Dict[str, int] = {
"一": 1, "二": 2, "三": 3, "四": 4, "五": 5,
"六": 6, "七": 7, "八": 8, "九": 9, "十": 10,
}
def extract_season_number(text: str) -> Optional[int]:
"""
Extract season number from various season formats.
Examples:
"S2" → 2, "Season 2" → 2, "第二季" → 2, "1st Season" → 1
"""
# Arabic digits
match = re.search(r'(\d+)', text)
if match:
return int(match.group(1))
# Chinese digits
for cn, num in CN_NUM_MAP.items():
if cn in text:
return num
return None
def extract_episode_number(text: str) -> Optional[int]:
"""
Extract episode number from various episode formats.
Examples:
"03" → 3, "EP21" → 21, "第7话" → 7, "#01" → 1
"""
match = re.search(r'(\d+)', text)
if match:
return int(match.group(1))
return None
def extract_resolution(text: str) -> Optional[str]:
"""Extract resolution string (e.g., '1080P', '4K', '1920x1080')."""
# Strip brackets for matching
clean = text.strip("[]()【】")
return clean if clean else None
def display_token(token: str) -> str:
"""Make whitespace tokens visible in debug output."""
if token == " ":
return "<SPACE>"
if token == "\t":
return "<TAB>"
return token
def trim_decorations(text: str) -> str:
"""Trim outer release brackets from an extracted entity."""
return text.strip().strip("[]()【】《》()").strip()
def join_entity_tokens(tokens: List[str], tokenizer: Optional[AnimeTokenizer] = None) -> str:
"""Join entity tokens according to the tokenizer granularity."""
if tokenizer is not None and getattr(tokenizer, "tokenizer_variant", "regex") == "char":
return "".join(tokens)
text = "".join(tokens)
if " " in tokens:
return text
return text
def labels_to_entities(
tokens: List[str],
labels: List[str],
tokenizer: Optional[AnimeTokenizer] = None,
) -> List[Tuple[str, str]]:
"""
Convert BIO labels into entity spans.
Illegal orphan I-X labels start a new entity so debug output exposes the
model behavior instead of silently dropping tokens.
"""
entities: List[Tuple[str, str]] = []
current_entity: Optional[str] = None
current_tokens: List[str] = []
for token, label in zip(tokens, labels):
if label.startswith("B-"):
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
current_entity = label[2:]
current_tokens = [token]
elif label.startswith("I-"):
entity_type = label[2:]
if current_entity == entity_type:
current_tokens.append(token)
else:
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
current_entity = entity_type
current_tokens = [token]
else:
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
current_entity = None
current_tokens = []
if current_entity:
entities.append((current_entity, join_entity_tokens(current_tokens, tokenizer)))
return entities
def is_allowed_bio_transition(previous_label: str, label: str) -> bool:
"""Return whether previous_label -> label is valid under IOB2."""
if label.startswith("I-"):
entity = label[2:]
return previous_label in {f"B-{entity}", f"I-{entity}"}
return True
def constrained_bio_decode(emissions: torch.Tensor, id2label: Dict[int, str]) -> List[int]:
"""
Decode token logits with hard BIO transition constraints.
This is a lightweight CRF-style Viterbi decoder without learned transition
weights. It prevents impossible orphan I-X spans at inference time.
"""
if emissions.numel() == 0:
return []
num_tokens, num_labels = emissions.shape
scores = emissions.detach().cpu()
backpointers = torch.zeros((num_tokens, num_labels), dtype=torch.long)
dp = torch.full((num_labels,), float("-inf"))
for label_id in range(num_labels):
label = id2label.get(label_id, "O")
if not label.startswith("I-"):
dp[label_id] = scores[0, label_id]
for idx in range(1, num_tokens):
next_dp = torch.full((num_labels,), float("-inf"))
for label_id in range(num_labels):
label = id2label.get(label_id, "O")
best_score = float("-inf")
best_prev = 0
for prev_id in range(num_labels):
prev_label = id2label.get(prev_id, "O")
if not is_allowed_bio_transition(prev_label, label):
continue
candidate = dp[prev_id] + scores[idx, label_id]
if candidate > best_score:
best_score = float(candidate)
best_prev = prev_id
next_dp[label_id] = best_score
backpointers[idx, label_id] = best_prev
dp = next_dp
best_last = int(torch.argmax(dp).item())
decoded = [best_last]
for idx in range(num_tokens - 1, 0, -1):
decoded.append(int(backpointers[idx, decoded[-1]].item()))
decoded.reverse()
return decoded
def postprocess(
tokens: List[str],
labels: List[str],
tokenizer: Optional[AnimeTokenizer] = None,
filename: Optional[str] = None,
use_rules: bool = True,
) -> Dict:
"""
Convert BIO-labeled tokens into structured metadata.
Merges consecutive B- / I- tokens of the same entity type,
then extracts structured fields.
"""
result: Dict = {
"title": None,
"season": None,
"episode": None,
"group": None,
"resolution": None,
"source": None,
"special": None,
}
entities = labels_to_entities(tokens, labels, tokenizer)
# Fill result
for entity_type, text in entities:
if entity_type == "TITLE":
result["title"] = result["title"] or trim_decorations(text)
# If we find multiple title fragments, concatenate them
# (handles "That" + ... + "Time" etc.)
elif entity_type == "SEASON":
season_num = extract_season_number(text)
if season_num is not None:
# Keep the highest/last season number if multiple
result["season"] = season_num
elif entity_type == "EPISODE":
ep_num = extract_episode_number(text)
if ep_num is not None:
if result["episode"] is None:
result["episode"] = ep_num
elif entity_type == "GROUP":
group = text.strip("[]()【】")
if result["group"] is None:
result["group"] = group
elif entity_type == "SPECIAL":
special = text.strip("[]()【】")
result["special"] = special
elif entity_type == "RESOLUTION":
res = extract_resolution(text)
if res:
result["resolution"] = res
elif entity_type == "SOURCE":
src = text.strip("[]()【】")
result["source"] = src
# Handle multi-fragment titles: concatenate all TITLE fragments
# (This is needed because O tokens between words break entity continuity)
title_fragments = [t for e, t in entities if e == "TITLE"]
if title_fragments:
result["title"] = " ".join(
trimmed for f in title_fragments
if (trimmed := trim_decorations(f))
)
if use_rules and filename:
result = apply_rule_assists(filename, result)
return result
BRACKET_RE = re.compile(r"\[([^\]]+)\]|\(([^)]+)\)|【([^】]+)】|《([^》]+)》")
RESOLUTION_RE = re.compile(r"\b(?:\d{3,4}[pP]|\d[Kk]|\d{3,4}[xX×]\d{3,4})\b")
SOURCE_RE = re.compile(
r"\b(?:WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|DVDRip|DVD|TVRip|HDTV|"
r"Netflix|NF|AMZN|Baha|CR|ABEMA|DSNP|U[-_ ]?NEXT|Hulu|AT[-_ ]?X)\b",
re.I,
)
EPISODE_PATTERNS = [
re.compile(r"(?:^|[\s._\-\[\(【《#])(?:EP?|第)?(?P<ep>\d{1,4})(?:v\d+)?(?:[话話集])?(?=$|[\s._\-\]\)】》])", re.I),
re.compile(r"[Ss]\d{1,2}[Ee](?P<ep>\d{1,4})(?:v\d+)?", re.I),
]
SEASON_RE = re.compile(r"(?:^|[\s._\-\[\(【《])(?:[Ss](?P<s1>\d{1,2})|Season\s*(?P<s2>\d{1,2})|第(?P<s3>[一二三四五六七八九十\d]+)[季期部])", re.I)
NOISE_META_RE = re.compile(
r"^(?:\d{3,4}[pP]|\d[Kk]|WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|DVDRip|DVD|TVRip|"
r"HDTV|Netflix|NF|AMZN|Baha|CR|HEVC|AVC|AV1|x26[45]|h\.?26[45]|AAC.*|FLAC|MP3|DTS|"
r"Opus|ASS.*|CHS|CHT|BIG5|GB|JPN?|MP4|MKV|繁中|简中|内封|外挂)$",
re.I,
)
def cn_number_to_int(text: str) -> Optional[int]:
if text.isdigit():
return int(text)
values = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9}
if text == "十":
return 10
if text.startswith("十") and len(text) == 2:
return 10 + values.get(text[1], 0)
if text.endswith("十") and len(text) == 2:
return values.get(text[0], 0) * 10
if "十" in text and len(text) == 3:
return values.get(text[0], 0) * 10 + values.get(text[2], 0)
return values.get(text)
def bracket_parts(filename: str) -> List[Tuple[str, int, int]]:
parts: List[Tuple[str, int, int]] = []
for match in BRACKET_RE.finditer(filename):
text = next(group for group in match.groups() if group is not None)
parts.append((text.strip(), match.start(), match.end()))
return parts
def looks_like_group(text: str) -> bool:
if not text or NOISE_META_RE.search(text):
return False
return bool(
re.search(
r"(?:字幕|字幕组|字幕組|sub|subs|raws?|fansub|studio|house|team|project|"
r"loli|ani|vcb|airota|kiss|dmhy|erai|subsplease)",
text,
re.I,
)
)
def apply_rule_assists(filename: str, result: Dict) -> Dict:
"""
Fill high-confidence structural fields from filename conventions.
The model remains the primary tagger; rules only fill missing obvious fields
or repair common boundary drift around leading group brackets and episodes.
"""
repaired = dict(result)
brackets = bracket_parts(filename)
if (not repaired.get("group") or (repaired.get("title") and repaired["group"] in repaired["title"])) and brackets:
first_text, first_start, _first_end = brackets[0]
if first_start == 0 and looks_like_group(first_text):
repaired["group"] = first_text
if not repaired.get("resolution"):
match = RESOLUTION_RE.search(filename)
if match:
repaired["resolution"] = match.group(0)
if not repaired.get("source"):
match = SOURCE_RE.search(filename)
if match:
repaired["source"] = match.group(0).replace("_", "-")
if repaired.get("season") is None:
match = SEASON_RE.search(filename)
if match:
value = next(group for group in match.groups() if group)
season = cn_number_to_int(value)
if season is not None:
repaired["season"] = season
if repaired.get("episode") is None:
candidates: List[Tuple[int, int, str]] = []
for pattern in EPISODE_PATTERNS:
for match in pattern.finditer(filename):
ep_text = match.group("ep")
ep = int(ep_text)
if ep == 0 or ep > 2000:
continue
score = match.start()
if 1 <= ep <= 200:
score += 10000
if "-" in filename[max(0, match.start() - 3):match.start() + 1]:
score += 1000
if match.start() > len(filename) // 3:
score += 200
candidates.append((score, ep, ep_text))
if candidates:
repaired["episode"] = max(candidates, key=lambda item: item[0])[1]
title = repaired.get("title")
group = repaired.get("group")
if title and group and title.startswith(group):
title = title[len(group):].lstrip("]】)>})》 \t-_.")
repaired["title"] = title or repaired["title"]
if (not repaired.get("title") or (group and repaired["title"].startswith(group))) and repaired.get("episode"):
repaired_title = infer_title_span(filename, group, repaired["episode"])
if repaired_title:
repaired["title"] = repaired_title
return repaired
def infer_title_span(filename: str, group: Optional[str], episode: Optional[int]) -> Optional[str]:
start = 0
if group:
first = BRACKET_RE.match(filename)
if first and group in first.group(0):
start = first.end()
end = None
if episode is not None:
ep_patterns = [
rf"\s[-_]\s*0*{episode}(?:v\d+)?(?=$|[\s\[\(【《._-])",
rf"[\[\(【《]0*{episode}(?:v\d+)?[\]\)】》]",
rf"[Ee]0*{episode}(?:v\d+)?",
]
for pattern in ep_patterns:
match = re.search(pattern, filename[start:], re.I)
if match:
end = start + match.start()
break
if end is None:
for text, bracket_start, _bracket_end in bracket_parts(filename):
if bracket_start <= start:
continue
if NOISE_META_RE.search(text) or RESOLUTION_RE.search(text) or SOURCE_RE.search(text):
end = bracket_start
break
if end is None or end <= start:
return None
title = filename[start:end].strip(" \t-_.[]()【】《》()")
return title or None
def parse_filename(
filename: str,
model: BertForTokenClassification,
tokenizer: AnimeTokenizer,
id2label: Dict[int, str],
max_length: int = 64,
debug: bool = False,
use_rules: bool = True,
constrain_bio: bool = True,
) -> Dict:
"""
Parse an anime filename and extract structured metadata.
Args:
filename: Raw anime filename string.
model: Trained BertForTokenClassification model.
tokenizer: AnimeTokenizer instance.
id2label: Mapping from label ID to label string.
max_length: Maximum sequence length (including special tokens).
Returns:
Dict with parsed fields (title, season, episode, etc.).
"""
# Tokenize
tokens = tokenizer.tokenize(filename)
if not tokens:
return {"title": None, "season": None, "episode": None,
"group": None, "resolution": None, "source": None,
"special": None}
# Convert to input IDs
input_ids = tokenizer.convert_tokens_to_ids(tokens)
unk_token_id = tokenizer.unk_token_id
unk_tokens = [token for token, token_id in zip(tokens, input_ids) if token_id == unk_token_id]
# Add special tokens
input_ids = [tokenizer.cls_token_id] + input_ids + [tokenizer.sep_token_id]
attention_mask = [1] * len(input_ids)
# Truncate if needed
if len(input_ids) > max_length:
input_ids = [input_ids[0]] + input_ids[1:max_length - 1] + [tokenizer.sep_token_id]
attention_mask = [1] * len(input_ids)
# Pad
pad_len = max_length - len(input_ids)
if pad_len > 0:
input_ids += [tokenizer.pad_token_id] * pad_len
attention_mask += [0] * pad_len
# Predict
device = next(model.parameters()).device
input_tensor = torch.tensor([input_ids], device=device)
mask_tensor = torch.tensor([attention_mask], device=device)
# Remove special token predictions
# Count real tokens used (minus CLS/SEP)
real_token_count = len(tokens)
# Truncate real tokens if we had to truncate
available = min(real_token_count, max_length - 2)
if available <= 0:
return {"title": None, "season": None, "episode": None,
"group": None, "resolution": None, "source": None,
"special": None}
with torch.no_grad():
logits = model(input_ids=input_tensor, attention_mask=mask_tensor).logits
token_logits = logits[0, 1:1 + available, :]
probabilities = torch.softmax(token_logits, dim=-1)
scores, greedy_predictions = torch.max(probabilities, dim=-1)
if constrain_bio:
pred_labels = constrained_bio_decode(token_logits, id2label)
selected_scores = [
probabilities[idx, label_id].detach().cpu().item()
for idx, label_id in enumerate(pred_labels)
]
else:
pred_labels = greedy_predictions.detach().cpu().tolist()
selected_scores = scores.detach().cpu().tolist()
label_strings = [id2label.get(p, "O") for p in pred_labels]
# Post-process
result = postprocess(
tokens[:available],
label_strings,
tokenizer=tokenizer,
filename=filename,
use_rules=use_rules,
)
if debug:
result["_debug"] = {
"tokenizer_variant": getattr(tokenizer, "tokenizer_variant", "regex"),
"decoder": "constrained_bio" if constrain_bio else "greedy",
"max_length": max_length,
"token_count": len(tokens),
"available_token_count": available,
"truncated": len(tokens) > available,
"unk_count": len(unk_tokens),
"unk_rate": len(unk_tokens) / len(tokens) if tokens else 0.0,
"unk_tokens": unk_tokens[:50],
"tokens": tokens[:available],
"labels": label_strings,
"scores": [round(float(score), 4) for score in selected_scores],
"token_table": [
{
"i": i,
"token": display_token(token),
"id": int(token_id),
"label": label,
"score": round(float(score), 4),
}
for i, (token, token_id, label, score) in enumerate(
zip(tokens[:available], input_ids[1:1 + available], label_strings, selected_scores)
)
],
"entities": [
{"type": entity_type, "text": text}
for entity_type, text in labels_to_entities(tokens[:available], label_strings, tokenizer)
],
}
return result
def main():
parser = argparse.ArgumentParser(description="Anime filename parser")
parser.add_argument("filename", nargs="?", type=str, help="Anime filename to parse")
parser.add_argument("--input-file", type=str, help="File with filenames (one per line)")
parser.add_argument("--output-file", type=str, help="Output file for results (JSONL)")
parser.add_argument("--model-dir", type=str, default="./checkpoints/final",
help="Path to trained model directory")
parser.add_argument("--tokenizer", choices=["regex", "char"], default=None,
help="Tokenizer variant override. Defaults to checkpoint metadata")
parser.add_argument("--max-length", type=int, default=64,
help="Maximum sequence length")
parser.add_argument("--debug", action="store_true",
help="Include tokenizer, labels, scores, and entity spans in JSON output")
parser.add_argument("--no-rule-assist", action="store_true",
help="Disable high-confidence structural post-processing rules")
parser.add_argument("--no-constrained-bio", action="store_true",
help="Use greedy per-token decoding instead of constrained BIO Viterbi")
args = parser.parse_args()
# Load config
cfg = Config()
# Load tokenizer
print(f"Loading tokenizer from {args.model_dir}...", file=sys.stderr)
tokenizer = load_tokenizer(args.model_dir, args.tokenizer)
# Load model
print(f"Loading model from {args.model_dir}...", file=sys.stderr)
model = BertForTokenClassification.from_pretrained(args.model_dir)
model.eval()
id2label = {int(k): v for k, v in getattr(model.config, "id2label", cfg.id2label).items()}
max_length = args.max_length
if max_length == 64:
max_length = int(getattr(model.config, "max_seq_length", max_length))
# Process filenames
filenames_to_parse: List[str] = []
if args.filename:
filenames_to_parse.append(args.filename)
if args.input_file:
with open(args.input_file, 'r', encoding='utf-8') as f:
filenames_to_parse.extend(line.strip() for line in f if line.strip())
if not filenames_to_parse:
# Read from stdin
filenames_to_parse.extend(sys.stdin.read().strip().splitlines())
# Parse and output
results: List[Dict] = []
for fn in filenames_to_parse:
if not fn.strip():
continue
result = parse_filename(
fn,
model,
tokenizer,
id2label,
max_length,
debug=args.debug,
use_rules=not args.no_rule_assist,
constrain_bio=not args.no_constrained_bio,
)
result["_input"] = fn
results.append(result)
if args.output_file is None:
print(json.dumps(result, ensure_ascii=False))
if args.output_file:
with open(args.output_file, 'w', encoding='utf-8') as f:
for r in results:
f.write(json.dumps(r, ensure_ascii=False) + '\n')
print(f"Results saved to {args.output_file}", file=sys.stderr)
if __name__ == "__main__":
main()