Token Classification
Transformers
ONNX
Safetensors
English
Japanese
Chinese
bert
anime
filename-parsing
Eval Results (legacy)
Instructions to use ModerRAS/AniFileBERT with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use ModerRAS/AniFileBERT with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("token-classification", model="ModerRAS/AniFileBERT")

# Load model directly
from transformers import AutoTokenizer, AutoModelForTokenClassification

tokenizer = AutoTokenizer.from_pretrained("ModerRAS/AniFileBERT")
model = AutoModelForTokenClassification.from_pretrained("ModerRAS/AniFileBERT")
- Notebooks
- Google Colab
- Kaggle
| """ | |
| Export weakly-labeled anime filename samples from a DMHY crawler SQLite DB. | |
| The crawler database is append-only while it runs, so this script snapshots a | |
| high-water mark (`files.id <= last_file_id`) and writes that value to a manifest. | |
| Future exports can pass `--min-id last_file_id + 1` to label only newly crawled | |
| rows. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import random | |
| import re | |
| import sqlite3 | |
| from collections import Counter | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Iterable, List, Optional, Sequence | |
| from tools.data_generator import LABEL_MAP, categorize_meta_token | |
| from anifilebert.label_repairs import season_marker_number | |
| from anifilebert.tokenizer import AnimeTokenizer | |
# Lower-case, dot-prefixed container extensions. find_episode_index consults
# this set when a numeric token directly follows a ".".
VIDEO_EXTENSIONS = {
    ".mkv", ".mp4", ".avi", ".mov", ".wmv", ".flv", ".rmvb",
    ".ts", ".m2ts", ".webm", ".mpg", ".mpeg", ".m4v",
}
# Bracket contents that is_noise_bracket treats as noise. Compared against the
# bracket text after removing whitespace/"."/"_"/"-" and lower-casing.
NOISE_BRACKETS = {
    "mp4", "mkv", "avi", "webm", "mov", "wmv", "flv", "rmvb", "ts", "m2ts",
    "raw", "raws", "rip", "10bit", "8bit", "hi10p", "ma10p", "ass", "assx2",
    "tc", "sc", "gb", "big5", "cht", "chs", "jpn", "jp", "jap", "eng",
    "sdr", "hdr", "hdr10", "uhd", "remux", "tvb", "srt", "srtx2",
    "繁中", "简中", "繁日", "简日", "日语", "日文", "外挂", "内封", "字幕",
}
# Bracket contents recognised by is_category_bracket (compared after removing
# whitespace/"."/"_"/"-", without lower-casing).
CATEGORY_BRACKETS = {
    "国漫", "國漫", "国产", "國產", "国产动漫", "國產動漫", "国产动画", "國產動畫",
    "国创", "國創", "中国动漫", "中國動漫", "中国动画", "中國動畫",
}
# Whole-token match for special-content markers: OVA/OAD/SP/OP/ED/PV/CM/NCOP/
# NCED with an optional number, IV with a required number, "movie", and the
# listed CJK terms. Case-insensitive.
SPECIAL_RE = re.compile(
    r"^(?:ova\d*|oad\d*|sp\d*|movie|the\s*movie|op\d*|ed\d*|pv\d*|cm\d*|"
    r"ncop\d*|nced\d*|iv\d+|剧场版|劇場版|特别篇|特別篇)$",
    re.I,
)
# A bare special-index code with no number attached (e.g. "NCOP").
SPECIAL_INDEX_BASE_RE = re.compile(r"^(?:NCOP|NCED|OP|ED|PV|CM|IV)$", re.I)
# A special-index code with an optional number; IV requires the number.
SPECIAL_INDEX_RE = re.compile(r"^(?:NCOP|NCED|OP|ED|PV|CM)\d*$|^IV\d+$", re.I)
# A special-index code optionally followed by a separator run and an
# episode-like number; exposes the named groups "special", "sep", "episode".
SPECIAL_COMPOSITE_RE = re.compile(
    r"^(?P<special>(?:(?:NCOP|NCED|OP|ED|PV|CM)\d*|IV\d+))"
    r"(?:(?P<sep>[\s._-]+)(?P<episode>(?:EP?|#)?\d{1,4}))?$",
    re.I,
)
# Text starting with a search/alias keyword followed by a colon and content.
# NOTE(review): the character class lists ":" twice; one of them may originally
# have been a full-width colon — confirm against the upstream file.
SPECIAL_SEARCH_RE = re.compile(r"^(?:檢索|检索|搜索|搜寻|搜尋|别名|別名|alias|search|keyword)\s*[::].+", re.I)
# Optional E/EP/# prefix, 1-4 digits (group 1), optional "vN" or "END" suffix.
EPISODE_RE = re.compile(r"^(?:[Ee][Pp]?|#)?(\d{1,4})(?:v\d+|END)?$", re.I)
# Explicit season syntax: S2, "Season 2", 第N季/期/部, "2nd Season". Exactly one
# capture group is populated per match; season_number reads that group.
SEASON_RE = re.compile(
    r"^(?:"
    r"[Ss](\d{1,2})|"
    r"Seasons?\s*(\d{1,2})|"
    r"第([一二三四五六七八九十\d]+)[季期部]|"
    r"(\d+)(?:st|nd|rd|th)\s+[Ss]eason"
    r")$", re.I
)
# Romanised phrases such as "Ni no Sara" / "San no Shou"; season_number
# resolves matches through season_marker_number.
READING_SEASON_RE = re.compile(
    r"^(?:Ni\s+no\s+(?:Sara|Shou|Sho|Syo|Shō)|Ni\s+Gakki|Sono\s+Ni|"
    r"San\s+no\s+(?:Sara|Shou|Sho|Syo)|(?:Yon|Shi|Shin)\s+no\s+Sara|"
    r"(?:Go|Gou)\s+no\s+Sara)$",
    re.I,
)
# A single CJK numeral (optionally followed by ノ/の/之 + 章/期/季/部) or a
# Roman numeral; season_number resolves matches through season_marker_number.
CJK_SEQUEL_SEASON_RE = re.compile(
    r"^(?:[一二三四五六七八九十兩两貳贰弐弍參叁参肆伍陸陆柒捌玖](?:\s*(?:ノ|の|之)\s*(?:章|期|季|部))?|"
    r"[ⅡⅢⅣⅤⅥⅦⅧⅨ]|II|III|IV|V|VI|VII|VIII|IX)$",
    re.I,
)
# Combined season+episode token such as S01E02, with an optional "vN" suffix.
SXE_RE = re.compile(r"^([Ss]\d{1,2})([Ee]\d{1,4})(?:v\d+)?$")
# Year 19xx/20xx with optional month and day and optional . - _ 年 月 日
# separators. A bare four-digit year also matches.
DATE_RE = re.compile(r"^(?:19|20)\d{2}[.\-_年]?(?:0?[1-9]|1[0-2])?[.\-_月]?(?:0?[1-9]|[12]\d|3[01])?日?$")
# Eight or more hexadecimal characters.
HASH_RE = re.compile(r"^[A-Fa-f0-9]{8,}$")
# WIDTHxHEIGHT with 3-4 digits on each side.
DIMENSION_RE = re.compile(r"^\d{3,4}[xX×]\d{3,4}$")
# Whole-token resolution: 1080p-style, 4K-style, or WIDTHxHEIGHT.
RESOLUTION_RE = re.compile(r"^(?:\d{3,4}[pP]|\d[Kk]|\d{3,4}[xX×]\d{3,4})$")
# Same alternatives as RESOLUTION_RE, found anywhere in a string as long as
# they are not adjacent to ASCII letters or digits.
RESOLUTION_SEARCH_RE = re.compile(r"(?<![A-Za-z0-9])(?:\d{3,4}[pP]|\d[Kk]|\d{3,4}[xX×]\d{3,4})(?![A-Za-z0-9])")
# Whole-token match for release source, platform, codec, audio, bit-depth,
# subtitle and language tags. Case-insensitive.
SOURCE_RE = re.compile(
    r"^(?:WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|HDTV|"
    r"Netflix|NF|AMZN|Baha|CR|ABEMA|DSNP|U[-_ ]?NEXT|Hulu|AT[-_ ]?X|"
    r"x26[45]|h\.?26[45]|HEVC|AVC|AV1|AAC\d*(?:\.\d+)?|AAC|FLAC|MP3|DTS|Opus|"
    r"SDR|HDR10?|UHD|REMUX|10bit|8bit|Hi10p|Ma10p|ASSx?\d*|SRTx?\d*|"
    r"CHS|CHT|BIG5|GB|JPN?|JPSC|JPTC|简[体體]?|繁[体體]?|简日双语|繁日双语|内封|外挂|MSubs?)$",
    re.I,
)
# Unanchored media-metadata pattern; embedded_bracket_episode uses it with
# search() to reject bracket tokens that contain codec/source text.
MEDIA_META_RE = re.compile(
    r"(?:WEB[-_ ]?DL|WEB[-_ ]?Rip|BDRip|BluRay|BDMV|BD|DVDRip|DVD|TVRip|HDTV|"
    r"x26[45]|h\.?26[45]|HEVC|AVC|AV1|AAC\d*(?:\.\d+)?|FLAC|MP3|DTS|Opus|"
    r"10bit|8bit|Hi10p|Ma10p|YUV\d+P?\d*)",
    re.I,
)
# Unanchored substrings that is_group_bracket takes as evidence that a bracket
# near the start of the name is a release group.
GROUP_HINT_RE = re.compile(
    r"(?:字幕|字幕组|字幕組|sub|subs|raws?|fansub|studio|house|team|project|"
    r"loli|ani|baha|vcb|airota|kiss|dmhy|mabors|lilith|ohys|erai|subsplease)",
    re.I,
)
# Unanchored, case-sensitive substrings that mark a token as release decoration
# rather than title text (used by is_title_token and trim_title_span).
TRAILING_DECORATION_RE = re.compile(
    r"(?:新番|月番|合集|合輯|全集|完结|完結|检索|檢索|招募|字幕|内封|內封|"
    r"年齡|年龄|限制|版本|版|"
    r"简中|繁中|GB|BIG5|CHS|CHT|JPN?|MP4|MKV|HEVC|AVC|AAC|FLAC|WEB-DL|1080[Pp]|720[Pp])"
)
@dataclass
class ExportStats:
    """Counters describing one export run.

    Declared as a dataclass so instances get a keyword constructor, a
    readable ``repr``, equality, and ``dataclasses.asdict`` support. Every
    counter defaults to zero. The code that updates the counters is outside
    this view; field meanings below are inferred from their names.
    """

    # Row-level counters.
    scanned_rows: int = 0
    video_rows: int = 0
    duplicate_basenames: int = 0
    # Outcome counters.
    labeled_samples: int = 0
    skipped_no_episode: int = 0
    skipped_no_title: int = 0
    skipped_too_short: int = 0
    skipped_too_long: int = 0
def normalize_path_basename(filename: str) -> str:
    """Return the last path component of *filename* with whitespace trimmed.

    Both forward slashes and backslashes count as path separators.
    """
    segments = re.split(r"[\\/]", filename)
    last_segment = segments[-1]
    return last_segment.strip()
def strip_video_extension(basename: str) -> tuple[str, str]:
    """Split *basename* into ``(stem, extension)``.

    The stem is whitespace-trimmed and the extension (including its leading
    dot) is lower-cased. Any extension is split off, not only video ones; no
    check against VIDEO_EXTENSIONS happens here.
    """
    parts = os.path.splitext(basename)
    return parts[0].strip(), parts[1].lower()
def clean_bracket(token: str) -> str:
    """Strip surrounding whitespace and bracket characters from *token*.

    Bracket characters are removed from both ends regardless of pairing, then
    whitespace left inside the brackets is trimmed as well.
    """
    # NOTE(review): "()" is listed twice; the second pair may originally have
    # been full-width parentheses — confirm against the upstream file.
    trimmed = token.strip()
    unwrapped = trimmed.strip("[]()【】《》()")
    return unwrapped.strip()
def cn_number_to_int(text: str) -> Optional[int]:
    """Convert a decimal string or a Chinese numeral from 1 to 99 to an int.

    Accepted numeral shapes are a single digit character (``三``), ``十``,
    ``十X``, ``X十`` and ``X十Y``. Anything else returns ``None``; malformed
    numerals such as ``十十`` or ``十二三`` are rejected rather than mapped to
    a misleading number.
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse.
    if text.isdecimal():
        return int(text)
    units = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9}
    if text == "十":
        return 10
    length = len(text)
    if length == 2:
        if text[0] == "十":
            ones = units.get(text[1])
            return None if ones is None else 10 + ones
        if text[1] == "十":
            tens = units.get(text[0])
            return None if tens is None else tens * 10
        return None
    if length == 3 and text[1] == "十":
        tens = units.get(text[0])
        ones = units.get(text[2])
        if tens is None or ones is None:
            return None
        return tens * 10 + ones
    # A single unit character, or anything unrecognised (-> None).
    return units.get(text)
def season_number(token: str) -> Optional[int]:
    """Return the season number encoded by *token*, or ``None``.

    Explicit syntax (SEASON_RE) is resolved through ``cn_number_to_int``.
    Reading-style and compact sequel markers are resolved through
    ``season_marker_number``.
    """
    clean = clean_bracket(token)
    explicit = SEASON_RE.match(clean)
    if explicit is None:
        if READING_SEASON_RE.match(clean) or CJK_SEQUEL_SEASON_RE.match(clean):
            return season_marker_number(clean)
        return None
    # Only one alternative of SEASON_RE captures; use the first non-empty group.
    for captured in explicit.groups():
        if captured:
            return cn_number_to_int(captured)
    return None
def is_explicit_season(token: str) -> bool:
    """Return True for unambiguous season syntax such as S02 or 第2季."""
    return SEASON_RE.match(clean_bracket(token)) is not None
def episode_number(token: str) -> Optional[int]:
    """Return the episode number encoded by *token*, or ``None``.

    Special-index codes, season markers, dimensions, dates and hashes are
    rejected first. The forms 第N话/話/集, OVA/OAD/SP + N, "N END" and "N.M"
    return their leading number with no range check. Plain EPISODE_RE matches
    are accepted only when the number is between 1 and 2000.
    """
    clean = clean_bracket(token)
    rejected = (
        SPECIAL_INDEX_RE.match(clean)
        or season_number(clean) is not None
        or DIMENSION_RE.match(clean)
        or DATE_RE.match(clean)
        or HASH_RE.match(clean)
    )
    if rejected:
        return None

    # Forms whose first digit run is the episode number.
    leading_number_forms = (
        (r"^第\d{1,4}(?:\(\d{1,4}\))?[话話集]$", 0),
        (r"^(?:OVA|OAD|SP)\d{1,4}$", re.I),
        (r"^\d{1,4}\s*END$", re.I),
        (r"^\d{1,4}[._]\d+$", 0),
    )
    for pattern, flags in leading_number_forms:
        if re.match(pattern, clean, flags):
            return int(re.search(r"\d+", clean).group())

    plain = EPISODE_RE.match(clean)
    if plain is None:
        return None
    value = int(plain.group(1))
    return None if value == 0 or value > 2000 else value
def has_wrapping_brackets(token: str) -> bool:
    """Return True when *token* starts with an opening and ends with a closing bracket.

    The two brackets are not required to be of the same kind.
    """
    if len(token) < 2:
        return False
    opens = token[0] in "[【(《"
    closes = token[-1] in "]】)》"
    return opens and closes
def is_resolution(token: str) -> bool:
    """Return True when *token* is a resolution tag.

    A bare token must match RESOLUTION_RE as a whole. A bracket-wrapped token
    also qualifies when a resolution appears anywhere inside it.
    """
    clean = clean_bracket(token)
    if RESOLUTION_RE.match(clean):
        return True
    if not has_wrapping_brackets(token):
        return False
    return RESOLUTION_SEARCH_RE.search(clean) is not None
def is_source(token: str) -> bool:
    """Return True when *token* is a source/codec/subtitle metadata tag.

    Accepted when the cleaned token matches SOURCE_RE, when
    ``categorize_meta_token`` reports RESOLUTION or SOURCE and the cleaned
    token is a resolution, or when a bracket-wrapped token splits into parts
    that are all source or noise tags with at least one source tag.
    """
    clean = clean_bracket(token)
    if not clean:
        return False
    # categorize_meta_token comes from tools.data_generator; its rules are not
    # visible in this file.
    meta_kind = categorize_meta_token(token)
    if SOURCE_RE.match(clean):
        return True
    if meta_kind in {"RESOLUTION", "SOURCE"} and is_resolution(clean):
        return True
    if not has_wrapping_brackets(token):
        return False
    saw_source = False
    for part in re.split(r"[\s&+/,._-]+", clean):
        if not part:
            continue
        if SOURCE_RE.match(part):
            saw_source = True
        elif not is_noise_bracket(part):
            # One unrecognised part disqualifies the whole bracket.
            return False
    return saw_source
def is_special(token: str) -> bool:
    """Return True when *token* is a special marker, a search/alias note, or a special composite."""
    clean = clean_bracket(token)
    matchers = (
        SPECIAL_RE.match,
        SPECIAL_SEARCH_RE.match,
        SPECIAL_COMPOSITE_RE.fullmatch,
    )
    for matcher in matchers:
        if matcher(clean):
            return True
    return False
def is_special_index_base(token: str) -> bool:
    """Return True when *token* is a bare special-index code (NCOP, NCED, OP, ED, PV, CM, IV)."""
    clean = clean_bracket(token)
    return SPECIAL_INDEX_BASE_RE.match(clean) is not None
def previous_significant_index(tokens: Sequence[str], idx: int) -> Optional[int]:
    """Return the index of the nearest non-separator token before *idx*, or ``None``."""
    for cursor in range(idx - 1, -1, -1):
        if not is_separator_token(tokens[cursor]):
            return cursor
    return None
def next_significant_index(tokens: Sequence[str], idx: int) -> Optional[int]:
    """Return the index of the nearest non-separator token after *idx*, or ``None``."""
    for cursor in range(idx + 1, len(tokens)):
        if not is_separator_token(tokens[cursor]):
            return cursor
    return None
def previous_non_space_index(tokens: Sequence[str], idx: int) -> Optional[int]:
    """Return the index of the nearest token before *idx* that is not empty or whitespace-only.

    Unlike ``previous_significant_index``, punctuation separators such as
    ``-`` count as a hit here.
    """
    return next(
        (cursor for cursor in range(idx - 1, -1, -1) if tokens[cursor].strip()),
        None,
    )
def is_special_index_continuation(tokens: Sequence[str], idx: int) -> bool:
    """Return True when ``tokens[idx]`` is a 1-4 digit number whose nearest
    preceding non-separator token is a bare special-index code."""
    if re.fullmatch(r"\d{1,4}", clean_bracket(tokens[idx])) is None:
        return False
    anchor = previous_significant_index(tokens, idx)
    if anchor is None:
        return False
    return is_special_index_base(tokens[anchor])
def has_special_index_continuation_after(tokens: Sequence[str], idx: int) -> bool:
    """Return True when the next non-separator token after *idx* is a special-index continuation."""
    follower = next_significant_index(tokens, idx)
    if follower is None:
        return False
    return is_special_index_continuation(tokens, follower)
def is_special_index_sequence_token(tokens: Sequence[str], idx: int) -> bool:
    """Return True when ``tokens[idx]`` is either half of a "code + number" special-index pair."""
    # The numeric half, e.g. the "01" in "NCOP 01".
    if is_special_index_continuation(tokens, idx):
        return True
    # The code half counts only when a number actually follows it.
    if not is_special_index_base(tokens[idx]):
        return False
    return has_special_index_continuation_after(tokens, idx)
def is_episode_after_special_index(tokens: Sequence[str], idx: int) -> bool:
    """Return True when ``tokens[idx]`` is an episode number that directly
    follows a special-index code or the number continuing one."""
    if episode_number(clean_bracket(tokens[idx])) is None:
        return False
    anchor = previous_significant_index(tokens, idx)
    if anchor is None:
        return False
    return (
        is_special_index_continuation(tokens, anchor)
        or SPECIAL_INDEX_RE.match(clean_bracket(tokens[anchor])) is not None
    )
def is_numeric_media_fragment(tokens: Sequence[str], idx: int) -> bool:
    """Return True when the number at *idx* is a piece of a split media tag.

    Recognised shapes, judged from the immediately adjacent tokens: ``N bit``,
    ``Ma N p``, an audio codec followed by ``N .``, and the fractional part of
    a ``digits . N`` sequence.
    """
    if re.fullmatch(r"\d{1,4}", clean_bracket(tokens[idx])) is None:
        return False
    before = clean_bracket(tokens[idx - 1]).lower() if idx > 0 else ""
    after = clean_bracket(tokens[idx + 1]).lower() if idx + 1 < len(tokens) else ""
    if after in {"bit", "bits"}:
        return True
    if before == "ma" and after == "p":
        return True
    if after == "." and before in {"aac", "flac", "dts", "ddp", "ac3", "mp"}:
        return True
    if before != ".":
        return False
    # "digits . N": N is the fractional part of the preceding number.
    two_back = clean_bracket(tokens[idx - 2]).lower() if idx >= 2 else ""
    return re.fullmatch(r"\d+", two_back) is not None
def is_special_index_suffix(tokens: Sequence[str], idx: int) -> bool:
    """Return True when ``tokens[idx]`` is a 1-4 digit number directly following
    a bare special-index code (NCOP, NCED, OP, ED, PV, CM, IV).

    This is the same test as ``is_special_index_continuation``, so it delegates
    to that function. The previous body also re-checked the preceding token
    against a regex that is a strict subset of SPECIAL_INDEX_BASE_RE; that
    check could never change the result and has been removed.
    """
    return is_special_index_continuation(tokens, idx)
def is_structural_episode_candidate(tokens: Sequence[str], idx: int, number: int) -> bool:
    """Return True when the token at *idx* is positioned like an episode number.

    A token qualifies when any of these holds:
      * it carries an episode prefix (E/EP/#/第/OVA/OAD/SP);
      * it is digits followed by ``vN`` or ``END``;
      * it is wrapped in brackets;
      * the nearest preceding non-blank token is ``-``, ``_`` or ``|``;
      * the immediately preceding token is ``#``;
      * *number* is at least 100.

    Args:
        tokens: tokenized filename.
        idx: position of the candidate token.
        number: episode number already parsed from ``tokens[idx]``.

    The previous body had a further branch for tokens followed by
    resolution/source/noise tags. Each of its ``True`` paths repeated a
    condition that had already returned above, so it always produced
    ``False``. It has been removed with no change in results.
    """
    token = tokens[idx]
    clean = clean_bracket(token)
    if re.match(r"^(?:[Ee][Pp]?|#|第|OVA|OAD|SP)", clean, re.I):
        return True
    if re.match(r"^\d{1,4}(?:v\d+|END)$", clean, re.I):
        return True
    if has_wrapping_brackets(token):
        return True
    prev_idx = previous_non_space_index(tokens, idx)
    if prev_idx is not None and tokens[prev_idx] in {"-", "_", "|"}:
        return True
    if idx > 0 and tokens[idx - 1] == "#":
        return True
    return number >= 100
def is_category_bracket(token: str) -> bool:
    """Return True when *token* is a bracket whose content is a known category tag."""
    if not has_wrapping_brackets(token):
        return False
    compact = re.sub(r"[\s._-]+", "", clean_bracket(token))
    return compact in CATEGORY_BRACKETS
def is_noise_bracket(token: str) -> bool:
    """Return True when *token* carries no title information.

    Noise means: empty after cleaning, a known noise tag, a category bracket,
    a date, or a hash.
    """
    clean = clean_bracket(token)
    if not clean:
        return True
    normalized = re.sub(r"[\s._-]+", "", clean).lower()
    return bool(
        normalized in NOISE_BRACKETS
        or is_category_bracket(token)
        or DATE_RE.match(clean)
        or HASH_RE.match(clean)
    )
def is_group_bracket(token: str, index: int, tokens: Sequence[str]) -> bool:
    """Return True when the bracket *token* at *index* looks like a release-group tag.

    The token must start with an opening bracket and must not be noise,
    resolution, source, special or an episode. It is accepted when it is the
    first non-filler token, or when it lies within two positions of that token
    and contains a group hint.
    """
    if not token.startswith(("[", "(", "【", "《")):
        return False
    clean = clean_bracket(token)
    if not clean or is_noise_bracket(token):
        return False
    looks_like_metadata = (
        is_resolution(clean)
        or is_source(clean)
        or is_special(clean)
        or episode_number(clean) is not None
    )
    if looks_like_metadata:
        return False
    # NOTE(review): "~" is listed twice; one may originally have been a
    # full-width tilde — confirm against the upstream file.
    filler = {" ", "-", "_", "|", "~", "~", "."}
    first_content_index = 0
    for position, candidate in enumerate(tokens):
        if candidate not in filler:
            first_content_index = position
            break
    if index == first_content_index:
        return True
    return index <= first_content_index + 2 and GROUP_HINT_RE.search(clean) is not None
def is_title_token(token: str) -> bool:
    """Return True when *token* may belong to a title span.

    Blank tokens, filler punctuation, metadata (resolution, source, special,
    explicit season, episode, date, hash) and bracketed release decoration are
    rejected.
    """
    if not token.strip():
        return False
    # NOTE(review): "~" is listed twice; one may originally have been a
    # full-width tilde — confirm against the upstream file.
    if token in {" ", "-", "_", "|", "~", "~", "."}:
        return False
    clean = clean_bracket(token)
    if not clean:
        return False
    is_metadata = (
        is_resolution(clean)
        or is_source(clean)
        or is_special(clean)
        or is_explicit_season(clean)
        or episode_number(clean) is not None
        or DATE_RE.match(clean)
        or HASH_RE.match(clean)
    )
    if is_metadata:
        return False
    opens_bracket = token.startswith(("[", "(", "【", "《"))
    if opens_bracket and TRAILING_DECORATION_RE.search(clean):
        return False
    return True
def is_title_start_token(tokens: Sequence[str], idx: int, end: int) -> bool:
    """Allow numeric title starts like `86 Eighty Six` without allowing episode tails."""
    if is_title_token(tokens[idx]):
        return True
    if re.fullmatch(r"\d{1,4}", clean_bracket(tokens[idx])) is None:
        return False
    # A bare number starts a title only if the next non-separator token before
    # `end` is itself title material.
    for follower in range(idx + 1, end):
        if not is_separator_token(tokens[follower]):
            return is_title_token(tokens[follower])
    return False
def skip_leading_title_decoration(tokens: Sequence[str], start: int, end: int) -> int:
    """Drop decorative release prefixes such as `★04月新番★` from title spans.

    Returns the new start index. A prefix is skipped only when it opens with
    ★ or ☆, closes with the same mark within the next 11 tokens, and the
    enclosed text contains one of the release keywords. Separators after a
    skipped prefix are skipped too.
    """
    star_marks = {"★", "☆"}
    while start < end:
        marker = clean_bracket(tokens[start])
        if marker not in star_marks:
            return start
        search_limit = min(end, start + 12)
        closing = next(
            (
                position
                for position in range(start + 1, search_limit)
                if clean_bracket(tokens[position]) == marker
            ),
            None,
        )
        if closing is None:
            return start
        wrapped_text = "".join(clean_bracket(piece) for piece in tokens[start:closing + 1])
        if re.search(r"(?:新番|月番|合集|合輯|全集|完结|完結)", wrapped_text) is None:
            return start
        start = closing + 1
        while start < end and is_separator_token(tokens[start]):
            start += 1
    return start
def trim_title_span(tokens: Sequence[str], start: int, end: int) -> tuple[int, int]:
    """Shrink the half-open span ``[start, end)`` to the tokens that are title text.

    Leading decoration and non-title tokens are dropped from the front.
    Non-title tokens, trailing decoration and filler punctuation are dropped
    from the back. Returns the trimmed ``(start, end)``.
    """
    # NOTE(review): "~" is listed twice; one may originally have been a
    # full-width tilde — confirm against the upstream file.
    filler = {" ", "-", "_", "|", "~", "~", "."}
    lo = skip_leading_title_decoration(tokens, start, end)
    hi = end
    while lo < hi and not is_title_start_token(tokens, lo, hi):
        lo += 1
    while hi > lo and not is_title_token(tokens[hi - 1]):
        hi -= 1
    while lo < hi and TRAILING_DECORATION_RE.search(clean_bracket(tokens[hi - 1])):
        hi -= 1
    while hi > lo and tokens[hi - 1] in filler:
        hi -= 1
    return lo, hi
def find_episode_index(tokens: Sequence[str]) -> Optional[int]:
    """Return the index of the token most likely to be the episode number.

    Every token that parses as an episode number and is structurally plausible
    is scored; the highest score wins and ties go to the later token. Returns
    ``None`` when no token qualifies.

    Scoring:
      * +4 for an episode prefix (E/EP/#/第/OVA/OAD/SP);
      * +3 for a token starting with ``[``, ``(`` or ``【``;
      * +2 when the nearest preceding non-blank token is ``-``, ``_`` or ``|``;
      * +1 when the token is in the second half of the token list;
      * +1 when the number is between 1 and 200.
    """
    candidates: list[tuple[int, int]] = []
    for idx, token in enumerate(tokens):
        if is_special_index_continuation(tokens, idx) or is_numeric_media_fragment(tokens, idx):
            continue
        number = episode_number(token)
        if number is None:
            continue
        if not is_structural_episode_candidate(tokens, idx, number):
            continue
        clean = clean_bracket(token)
        # Skip a bare number that follows "<video extension>." (for example
        # "mkv.001"). VIDEO_EXTENSIONS entries carry a leading dot, so the
        # preceding token is checked both as-is and with a dot prepended. The
        # earlier check compared the dot-less token, and ".<digits>", against
        # the dotted set and therefore never matched.
        if idx >= 2 and tokens[idx - 1] == "." and re.fullmatch(r"\d+", clean):
            previous_ext = clean_bracket(tokens[idx - 2]).lower()
            if previous_ext in VIDEO_EXTENSIONS or f".{previous_ext}" in VIDEO_EXTENSIONS:
                continue
        score = 0
        if re.match(r"^(?:[Ee][Pp]?|#|第|OVA|OAD|SP)", clean, re.I):
            score += 4
        if token.startswith(("[", "(", "【")):
            score += 3
        prev_idx = previous_non_space_index(tokens, idx)
        if prev_idx is not None and tokens[prev_idx] in {"-", "_", "|"}:
            score += 2
        if idx >= len(tokens) // 2:
            score += 1
        if 1 <= number <= 200:
            score += 1
        candidates.append((score, idx))
    if not candidates:
        return None
    # Tuples compare by score first, then by index.
    return max(candidates)[1]
# Single-character tokens treated as separators between filename fields.
# NOTE(review): "~" is listed twice; one may originally have been a full-width
# tilde — confirm against the upstream file.
_SEPARATOR_TOKENS = frozenset({" ", "-", "_", "|", "~", "~", ".", "+", "&", "/", ","})


def is_separator_token(token: str) -> bool:
    """Return True when *token* is exactly one of the separator characters."""
    return token in _SEPARATOR_TOKENS
def has_only_separators_between(tokens: Sequence[str], start: int, end: int) -> bool:
    """Return True when every token in ``tokens[start:end]`` is a separator.

    An empty slice yields True.
    """
    for token in tokens[start:end]:
        if not is_separator_token(token):
            return False
    return True
def is_context_season_token(tokens: Sequence[str], idx: int, episode_idx: int) -> bool:
    """Detect compact season markers only when they structurally lead into an episode."""
    if idx >= episode_idx:
        return False
    clean = clean_bracket(tokens[idx])
    if not clean:
        return False
    # Explicit syntax (S02, 第2季, ...) needs no positional evidence.
    if is_explicit_season(clean):
        return True
    if season_number(clean) is None:
        return False
    leads_into_episode = has_only_separators_between(tokens, idx + 1, episode_idx)
    # A bare V is often the volume prefix in V02E01, not season five.
    return leads_into_episode and clean.upper() != "V"
def split_special_composite(clean: str) -> Optional[tuple[str, Optional[str]]]:
    """Split text such as ``NCOP1 - 03`` into ``(special, episode)``.

    Returns ``None`` when *clean* is not a special composite. ``episode`` is
    ``None`` when only the special code is present.
    """
    parsed = SPECIAL_COMPOSITE_RE.fullmatch(clean)
    if parsed is None:
        return None
    return parsed.group("special"), parsed.group("episode")
def label_special_composite_contents(token: str, tokenizer: AnimeTokenizer) -> tuple[List[str], List[str]]:
    """Tokenize a special-composite token and assign a category to each piece.

    Returns ``(tokens, categories)``. Wrapping brackets and separator pieces
    are labeled "sep", pieces of the special code "special", and pieces of the
    trailing episode (when present) "episode". Tokens that are not special
    composites are handed to ``label_bracket_contents`` with category
    "special".
    """
    composite = split_special_composite(clean_bracket(token))
    if composite is None:
        return label_bracket_contents(token, "special", tokenizer)
    special, episode = composite
    open_char, close_char = bracket_delimiters(token)
    pieces: List[str] = []
    labels: List[str] = []

    def emit(text: str, category: str) -> None:
        # Tokenize `text`; separator pieces become "sep", the rest `category`.
        for piece in tokenizer.tokenize(text):
            pieces.append(piece)
            labels.append("sep" if is_separator_token(piece) else category)

    if open_char:
        pieces.append(open_char)
        labels.append("sep")
    emit(special, "special")
    # NOTE(review): the separator run matched between the special code and the
    # episode (the regex's "sep" group) is not emitted — confirm this is intended.
    if episode:
        emit(episode, "episode")
    if close_char:
        pieces.append(close_char)
        labels.append("sep")
    return pieces, labels
def clear_trailing_title_separators(tokens: Sequence[str], categories: List[str]) -> None:
    """Relabel, in place, the trailing run of "title" separators in *categories* as "sep".

    *categories* may be shorter than *tokens*; only positions that already have
    a category are examined, walking backwards from the last one.
    """
    for position in range(len(categories) - 1, -1, -1):
        if not (is_separator_token(tokens[position]) and categories[position] == "title"):
            break
        categories[position] = "sep"
def label_context_season_tokens(
    tokens: Sequence[str],
    categories: List[str],
    episode_idx: int,
) -> None:
    """Mark season tokens that precede the episode; *categories* is edited in place.

    A ``V``, digits, ``E..`` triple ending at *episode_idx* is handled first:
    the ``V`` and the digits become "season" and nothing else is touched.
    Otherwise every token before the episode that passes
    ``is_context_season_token`` becomes "season", and "title" separators
    immediately before it are demoted to "sep". Tokens already labeled group,
    episode, resolution, source or special are left alone.
    """
    volume_prefixed = (
        episode_idx >= 2
        and clean_bracket(tokens[episode_idx]).upper().startswith("E")
        and clean_bracket(tokens[episode_idx - 2]).upper() == "V"
        and clean_bracket(tokens[episode_idx - 1]).isdigit()
    )
    if volume_prefixed:
        for position in (episode_idx - 2, episode_idx - 1):
            categories[position] = "season"
        return

    protected = {"group", "episode", "resolution", "source", "special"}
    for idx in range(episode_idx):
        if categories[idx] in protected:
            continue
        if not is_context_season_token(tokens, idx, episode_idx):
            continue
        categories[idx] = "season"
        # Separators that trailed the title now sit between title and season.
        for back in range(idx - 1, -1, -1):
            if not is_separator_token(tokens[back]) or categories[back] != "title":
                break
            categories[back] = "sep"
def label_special_index_sequences(tokens: Sequence[str], categories: List[str]) -> None:
    """Keep NCOP_01 / NCED 16 / IV05 style codes as a single SPECIAL span."""
    total = len(tokens)
    idx = 0
    while idx < total:
        follower = next_significant_index(tokens, idx) if is_special_index_base(tokens[idx]) else None
        if follower is None or not is_special_index_continuation(tokens, follower):
            idx += 1
            continue
        # Label the code, the number, and the separators between them.
        categories[idx] = "special"
        categories[follower] = "special"
        for between in range(idx + 1, follower):
            if is_separator_token(tokens[between]):
                categories[between] = "special"
        idx = follower + 1
def repair_structured_bracket_title_aliases(
    tokens: Sequence[str],
    categories: List[str],
    episode_idx: int,
) -> None:
    """Keep the primary title in category-prefixed bracket series.

    GM-Team-style rows often look like:
    [GROUP][国漫][中文标题 第2季][English Alias Ⅱ][2026][04][meta]
    The category, alias, and year brackets are metadata for parsing purposes;
    the first real title bracket after the category is the canonical title.

    *categories* is edited in place. Nothing happens unless a category bracket
    appears before the episode.
    """
    scan_limit = min(episode_idx, len(tokens))
    has_category = False
    for idx in range(scan_limit):
        if is_category_bracket(tokens[idx]):
            has_category = True
            break
    if not has_category:
        return

    title_candidates: List[int] = []
    for idx in range(episode_idx):
        if categories[idx] != "title":
            continue
        if has_wrapping_brackets(tokens[idx]) and is_title_token(tokens[idx]):
            title_candidates.append(idx)
    if not title_candidates:
        return

    primary_idx, *alias_indices = title_candidates
    for alias_idx in alias_indices:
        categories[alias_idx] = "sep"
    # Category and date brackets before the episode are demoted as well.
    for idx in range(episode_idx):
        if idx == primary_idx:
            continue
        token = tokens[idx]
        if is_category_bracket(token) or DATE_RE.match(clean_bracket(token)):
            categories[idx] = "sep"
def embedded_bracket_episode(token: str) -> Optional[tuple[str, str, str]]:
    """Split malformed tokens such as '[Group}Title[658]' into title + episode.

    Returns ``(prefix, episode, close)``, where ``close`` is the closing
    bracket or an empty string, or ``None`` when the token has no embedded
    episode. Special codes, bracketed hash/resolution/media tokens, tokens that
    are already an episode, and numbers outside 1..2000 are rejected.
    """
    clean_token = clean_bracket(token)
    if is_special(token) or SPECIAL_INDEX_RE.match(clean_token) or SPECIAL_COMPOSITE_RE.fullmatch(clean_token):
        return None
    wrapped = has_wrapping_brackets(token)
    if wrapped and (
        HASH_RE.match(clean_token)
        or RESOLUTION_SEARCH_RE.search(clean_token)
        or MEDIA_META_RE.search(clean_token)
    ):
        return None
    if episode_number(token) is not None:
        return None

    # Preferred shape: "...[123]" (closing bracket optional).
    found = re.match(r"^(?P<prefix>.+?)\[(?P<episode>\d{1,4}(?:v\d+)?)(?P<close>\])?$", token, re.I)
    if found is None and wrapped:
        # Fallback for wrapped tokens: a trailing number just before the closer.
        found = re.match(r"^(?P<prefix>.+?)(?P<episode>\d{2,4})(?P<close>[\]\)】》])$", token, re.I)
    if not found:
        return None

    prefix, episode, close = found.group("prefix", "episode", "close")
    prefix_clean = clean_bracket(prefix)
    if not prefix_clean or SPECIAL_INDEX_BASE_RE.match(prefix_clean):
        return None
    number = int(re.search(r"\d+", episode).group())
    if number == 0 or number > 2000:
        return None
    return prefix, episode, close or ""
def append_tokenized_category(
    tokens: List[str],
    categories: List[str],
    text: str,
    category: str,
    tokenizer: AnimeTokenizer,
) -> None:
    """Tokenize *text* and append its pieces to *tokens* / *categories* in place.

    Separator and bracket pieces are labeled "sep"; every other piece receives
    *category*. Empty pieces are skipped.
    """
    bracket_chars = {"[", "]", "(", ")", "【", "】", "《", "》"}
    for piece in tokenizer.tokenize(text):
        if not piece:
            continue
        structural = is_separator_token(piece) or piece in bracket_chars
        tokens.append(piece)
        categories.append("sep" if structural else category)
def finalize_weak_sample(
    tokens: Sequence[str],
    categories: Sequence[str],
    tokenizer: AnimeTokenizer,
    require_episode: bool = True,
    require_title: bool = True,
) -> Optional[dict]:
    """Expand tokens, promote explicit seasons, and convert categories to IOB2 labels.

    Returns ``{"tokens": ..., "labels": ...}``, or ``None`` when the label
    count does not match the token count or a required TITLE / EPISODE label
    is missing.
    """
    expanded_tokens, expanded_categories = expand_tokens_and_categories(tokens, categories, tokenizer)
    # Only unambiguous season forms are promoted here. Compact sequel markers
    # such as 貳, II, or Ni no Sara need episode context and are repaired by
    # label_repairs from character spans; treating every single CJK numeral as
    # season would corrupt titles like 魯邦三世.
    settled = {"sep", "episode", "group", "source", "resolution", "special", "season"}
    for idx, token in enumerate(expanded_tokens):
        if expanded_categories[idx] in settled or not is_explicit_season(token):
            continue
        expanded_categories[idx] = "season"
        for back in range(idx - 1, -1, -1):
            if not is_separator_token(expanded_tokens[back]) or expanded_categories[back] != "title":
                break
            expanded_categories[back] = "sep"

    labels = assign_iob2(expanded_categories)
    if len(expanded_tokens) != len(labels):
        return None

    def has_entity(suffix: str) -> bool:
        # True when any label ends with the given entity name.
        return any(label.endswith(suffix) for label in labels)

    if require_title and not has_entity("TITLE"):
        return None
    if require_episode and not has_entity("EPISODE"):
        return None
    return {"tokens": expanded_tokens, "labels": labels}
def assign_iob2(categories: Sequence[str]) -> List[str]:
    """Convert per-token categories into IOB2 labels.

    Categories are mapped through LABEL_MAP; unmapped ones become "O". The
    first token of an entity run gets a ``B-`` prefix and each following token
    of the same entity gets ``I-``. An "O" token ends the current run.
    """
    labels: List[str] = []
    open_entity: Optional[str] = None
    for category in categories:
        entity = LABEL_MAP.get(category, "O")
        if entity != "O":
            labels.append(f"{'I' if entity == open_entity else 'B'}-{entity}")
            open_entity = entity
        else:
            labels.append("O")
            open_entity = None
    return labels
def fallback_embedded_episode_sample(
    tokens: Sequence[str],
    tokenizer: AnimeTokenizer,
) -> Optional[dict]:
    """Build a sample from a row whose episode number is fused into another token.

    The first token with an embedded episode is split into title pieces, the
    episode, and an optional closing bracket. Tokens before it are title or
    separator. Tokens after it are resolution, source, special, or "sep".
    Returns ``None`` when no token contains an embedded episode.
    """
    rebuilt_tokens: List[str] = []
    rebuilt_categories: List[str] = []
    used_episode = False
    for token in tokens:
        # Only the first embedded episode is split out.
        embedded = None if used_episode else embedded_bracket_episode(token)
        if embedded:
            prefix, episode, close = embedded
            append_tokenized_category(rebuilt_tokens, rebuilt_categories, prefix, "title", tokenizer)
            rebuilt_tokens.append(episode)
            rebuilt_categories.append("episode")
            if close:
                rebuilt_tokens.append(close)
                rebuilt_categories.append("sep")
            used_episode = True
            continue

        rebuilt_tokens.append(token)
        if not used_episode:
            rebuilt_categories.append("sep" if is_separator_token(token) else "title")
        elif is_resolution(token):
            rebuilt_categories.append("resolution")
        elif is_source(token):
            rebuilt_categories.append("source")
        elif is_special(token):
            rebuilt_categories.append("special")
        else:
            rebuilt_categories.append("sep")

    if not used_episode:
        return None
    return finalize_weak_sample(rebuilt_tokens, rebuilt_categories, tokenizer)
def has_embedded_episode_candidate(tokens: Sequence[str]) -> bool:
    """Return True when at least one token contains an embedded bracket episode."""
    for token in tokens:
        if embedded_bracket_episode(token) is not None:
            return True
    return False
def fallback_episode_first_sample(
    tokens: Sequence[str],
    categories: Sequence[str],
    episode_idx: int,
    tokenizer: AnimeTokenizer,
) -> Optional[dict]:
    """Label rows where the episode comes before the title.

    Args:
        tokens: tokenized filename.
        categories: categories from the primary labeling pass. Only read to
            find a leading "group" bracket to reuse as the title.
        episode_idx: index of the episode token.
        tokenizer: forwarded to ``finalize_weak_sample``.

    Returns:
        The finalized sample dict, or ``None`` when finalization rejects it.
    """
    fallback_categories = ["sep"] * len(tokens)
    # V02E01-style catalog rows are episode-first. The tokenizer currently
    # exposes them as V, 02, E01; label_context_season_tokens keeps V02
    # together as a season span before its general scan, so one call covers
    # both that case and ordinary season markers.
    label_context_season_tokens(tokens, fallback_categories, episode_idx)
    fallback_categories[episode_idx] = "episode"

    title_indices: List[int] = []
    for idx in range(episode_idx + 1, len(tokens)):
        token = tokens[idx]
        if is_separator_token(token):
            continue
        if is_resolution(token):
            fallback_categories[idx] = "resolution"
        elif is_source(token):
            fallback_categories[idx] = "source"
        elif is_special(token):
            fallback_categories[idx] = "special"
        elif is_noise_bracket(token):
            fallback_categories[idx] = "sep"
        else:
            title_indices.append(idx)

    if not title_indices:
        # Some rows are title-only brackets followed by season/episode,
        # e.g. [伊蘇] II-01. If the leading bracket was guessed as GROUP but
        # no real title exists, use it as TITLE to keep the row useful.
        for idx in range(episode_idx):
            if categories[idx] == "group" and clean_bracket(tokens[idx]):
                title_indices.append(idx)
                break

    for idx in title_indices:
        fallback_categories[idx] = "title"
    if title_indices:
        # Separators inside the title span belong to the title.
        for idx in range(title_indices[0], title_indices[-1] + 1):
            if is_separator_token(tokens[idx]):
                fallback_categories[idx] = "title"
    return finalize_weak_sample(tokens, fallback_categories, tokenizer)
def fallback_minimal_sample(
    tokens: Sequence[str],
    episode_idx: int,
    tokenizer: AnimeTokenizer,
) -> Optional[dict]:
    """Keep malformed low-information rows instead of silently dropping them."""
    categories: List[str] = []
    title_idx: Optional[int] = None
    for idx, token in enumerate(tokens):
        if idx == episode_idx:
            category = "episode"
        elif is_resolution(token):
            category = "resolution"
        elif is_source(token):
            category = "source"
        elif is_special(token):
            category = "special"
            # The first special token doubles as the title candidate; it is
            # relabeled "title" below.
            if title_idx is None:
                title_idx = idx
        else:
            category = "sep"
        categories.append(category)

    if title_idx is None:
        # Otherwise take the first token that is not the episode, a separator,
        # a resolution or a source.
        title_idx = next(
            (
                idx
                for idx, token in enumerate(tokens)
                if idx != episode_idx
                and not is_separator_token(token)
                and categories[idx] not in {"resolution", "source"}
            ),
            None,
        )
    if title_idx is None:
        return None
    categories[title_idx] = "title"
    return finalize_weak_sample(tokens, categories, tokenizer)
def fallback_no_episode_sample(tokens: Sequence[str], tokenizer: AnimeTokenizer) -> Optional[dict]:
    """Label a row that has no episode token (movies, OP/ED/SP, malformed names).

    Tokens are classified left to right:

    * separators count as ``title`` only while a title has started and no
      resolution/source/special token has been seen yet, otherwise ``sep``;
    * a group bracket is only recognized at index 0;
    * resolution, source and special tokens keep their own category and close
      the title (before a special token is appended,
      ``clear_trailing_title_separators`` is run on the labels so far);
    * noise brackets, and any other token seen after the title was both
      started and closed, become ``sep``;
    * every remaining token is ``title``.

    The sample is finalized with ``require_episode=False``; a title is required
    only when at least one token ended up labeled ``title``.
    """
    labels: List[str] = []
    title_started = False
    title_open = True
    for position, piece in enumerate(tokens):
        if is_separator_token(piece):
            labels.append("title" if title_started and title_open else "sep")
        elif position == 0 and is_group_bracket(piece, position, tokens):
            labels.append("group")
        elif is_resolution(piece):
            labels.append("resolution")
            title_open = False
        elif is_source(piece):
            labels.append("source")
            title_open = False
        elif is_special_index_sequence_token(tokens, position) or is_special(piece):
            clear_trailing_title_separators(tokens, labels)
            labels.append("special")
            title_open = False
        elif is_noise_bracket(piece) or (title_started and not title_open):
            labels.append("sep")
        else:
            labels.append("title")
            title_started = True

    label_special_index_sequences(tokens, labels)
    return finalize_weak_sample(
        tokens,
        labels,
        tokenizer,
        require_episode=False,
        require_title="title" in labels,
    )
def bracket_delimiters(token: str) -> tuple[str, str]:
    """Return the ``(opening, closing)`` bracket characters wrapping *token*.

    Each side is checked independently: the first character must be one of
    the recognized openers and the last character one of the recognized
    closers. A side that does not match (or an empty token) yields ``""``.
    """
    if not token:
        return "", ""
    first, last = token[0], token[-1]
    opener = first if first in "[【(《" else ""
    closer = last if last in "]】)》" else ""
    return opener, closer
def label_bracket_contents(token: str, category: str, tokenizer: AnimeTokenizer) -> tuple[List[str], List[str]]:
    """Split a bracketed token into its delimiters and inner tokens.

    The text returned by ``clean_bracket`` is re-tokenized and every inner
    token receives *category*; the surrounding bracket characters, when
    present, are emitted as separate ``sep`` tokens. If ``clean_bracket``
    yields nothing, the token is returned unsplit with *category*.

    Returns:
        Two parallel lists: the expanded tokens and their categories.
    """
    inner = clean_bracket(token)
    if not inner:
        return [token], [category]

    opener, closer = bracket_delimiters(token)
    pieces = tokenizer.tokenize(inner)

    leading = [opener] if opener else []
    trailing = [closer] if closer else []
    out_tokens: List[str] = [*leading, *pieces, *trailing]
    out_cats: List[str] = (
        ["sep"] * len(leading) + [category] * len(pieces) + ["sep"] * len(trailing)
    )
    return out_tokens, out_cats
def label_meta_bracket_contents(token: str, tokenizer: AnimeTokenizer) -> tuple[List[str], List[str]]:
    """Split a metadata bracket and classify each inner token individually.

    The text returned by ``clean_bracket`` is re-tokenized. Each inner token
    is labeled ``resolution``, ``source`` or ``special`` when the matching
    predicate recognizes it; punctuation separators and everything else
    (including noise words) are labeled ``sep``. Bracket delimiters, when
    present, are emitted as their own ``sep`` tokens. If ``clean_bracket``
    yields nothing, the token is returned unsplit as ``sep``.

    Returns:
        Two parallel lists: the expanded tokens and their categories.
    """
    inner = clean_bracket(token)
    if not inner:
        return [token], ["sep"]

    # Built once instead of on every loop iteration.
    inner_separators = {" ", "-", "_", "|", "~", "~", ".", "+", "&", "/", ","}

    open_char, close_char = bracket_delimiters(token)
    inner_tokens = tokenizer.tokenize(inner)

    tokens: List[str] = []
    cats: List[str] = []
    if open_char:
        tokens.append(open_char)
        cats.append("sep")
    for inner_token in inner_tokens:
        if inner_token in inner_separators:
            cat = "sep"
        elif is_resolution(inner_token) or RESOLUTION_SEARCH_RE.fullmatch(inner_token):
            cat = "resolution"
        elif is_source(inner_token):
            cat = "source"
        elif is_special(inner_token):
            cat = "special"
        else:
            # Noise words and unrecognized text are both boundary noise; the
            # former separate noise-bracket check produced the same label.
            cat = "sep"
        tokens.append(inner_token)
        cats.append(cat)
    if close_char:
        tokens.append(close_char)
        cats.append("sep")
    return tokens, cats
def expand_tokens_and_categories(
    tokens: Sequence[str],
    categories: Sequence[str],
    tokenizer: AnimeTokenizer,
) -> tuple[List[str], List[str]]:
    """Expand coarse tokens into finer tokens with per-token categories.

    For each ``(token, category)`` pair:

    * a ``season`` token whose cleaned text matches ``SXE_RE`` is split into
      the two regex groups, labeled ``season`` and ``episode``;
    * a bracketed ``special`` token is split by
      ``label_special_composite_contents``;
    * a bracketed ``group``/``title`` token is split by
      ``label_bracket_contents``;
    * a bracketed ``source``/``resolution``/``sep`` token is split by
      ``label_meta_bracket_contents``, but only if that yields at least one
      non-``sep`` label;
    * anything else passes through unchanged.

    Returns:
        Two parallel lists: the expanded tokens and their categories.
    """
    openers = ("[", "(", "【", "《")
    out_tokens: List[str] = []
    out_categories: List[str] = []

    for piece, label in zip(tokens, categories):
        inner = clean_bracket(piece)
        if label == "season":
            sxe = SXE_RE.match(inner)
            if sxe:
                out_tokens += [sxe.group(1), sxe.group(2)]
                out_categories += ["season", "episode"]
                continue

        bracketed = piece.startswith(openers)
        if bracketed and label == "special":
            parts, part_labels = label_special_composite_contents(piece, tokenizer)
        elif bracketed and label in {"group", "title"}:
            parts, part_labels = label_bracket_contents(piece, label, tokenizer)
        elif bracketed and label in {"source", "resolution", "special", "sep"}:
            parts, part_labels = label_meta_bracket_contents(piece, tokenizer)
            if not any(part_label != "sep" for part_label in part_labels):
                # Nothing recognizable inside: keep the bracket as one token.
                parts, part_labels = [piece], [label]
        else:
            parts, part_labels = [piece], [label]

        out_tokens.extend(parts)
        out_categories.extend(part_labels)

    return out_tokens, out_categories
def weak_label_filename(filename: str, tokenizer: AnimeTokenizer) -> Optional[dict]:
    """Produce a weakly-labeled sample for one filename, or ``None``.

    Steps, in order:

    1. Reduce the input to its basename and drop a known video extension.
    2. Tokenize; an empty token list yields ``None``.
    3. If the tokens contain an embedded-episode candidate, try
       ``fallback_embedded_episode_sample`` first and return its result when
       it is not ``None``.
    4. Assign first-pass categories (group, resolution, source, special,
       season, sep; everything else title).
    5. Locate the episode token. Without one, defer to the embedded-episode
       and no-episode fallbacks.
    6. Infer a season marker, compute the title span between the leading
       metadata and the episode, and relabel tokens accordingly.
    7. If the title span is empty, or no title/episode label survives, defer
       to the episode fallbacks; otherwise finalize the sample.
    """
    basename = normalize_path_basename(str(filename))
    stem, ext = strip_video_extension(basename)
    text = stem if ext in VIDEO_EXTENSIONS else basename

    tokens = tokenizer.tokenize(text)
    if not tokens:
        return None

    if has_embedded_episode_candidate(tokens):
        embedded = fallback_embedded_episode_sample(tokens, tokenizer)
        if embedded is not None:
            return embedded

    plain_separators = {" ", "-", "_", "|", "~", "~", "."}
    categories = ["sep" if tok in plain_separators else "title" for tok in tokens]

    for idx, tok in enumerate(tokens):
        if is_group_bracket(tok, idx, tokens):
            categories[idx] = "group"

    def first_pass_category(idx: int, tok: str) -> Optional[str]:
        # Predicate order matters: the first match wins.
        if is_category_bracket(tok):
            return "sep"
        if is_resolution(tok):
            return "resolution"
        if is_source(tok):
            return "source"
        if is_special_index_sequence_token(tokens, idx) or is_special(tok):
            return "special"
        if is_explicit_season(tok):
            return "season"
        if is_noise_bracket(tok):
            return "sep"
        return None

    for idx, tok in enumerate(tokens):
        if categories[idx] == "group":
            continue
        recognized = first_pass_category(idx, tok)
        if recognized is not None:
            categories[idx] = recognized

    episode_idx = find_episode_index(tokens)
    if episode_idx is None:
        label_special_index_sequences(tokens, categories)
        return fallback_embedded_episode_sample(tokens, tokenizer) or fallback_no_episode_sample(tokens, tokenizer)

    def episode_fallbacks() -> Optional[dict]:
        # Fallback chain shared by both bail-out points below.
        return (
            fallback_embedded_episode_sample(tokens, tokenizer)
            or fallback_episode_first_sample(tokens, categories, episode_idx, tokenizer)
            or fallback_minimal_sample(tokens, episode_idx, tokenizer)
        )

    categories[episode_idx] = "episode"
    label_special_index_sequences(tokens, categories)
    label_context_season_tokens(tokens, categories, episode_idx)
    repair_structured_bracket_title_aliases(tokens, categories, episode_idx)

    # S01E07 is tokenized as S01 + E07 after tokenizer changes. If an older
    # combined token slips through, expand_tokens_and_categories will split it.
    # NOTE(review): relabeling the episode token as "season" here means the
    # final "episode" check below sends the row to the fallbacks unless some
    # other token carries the "episode" label -- confirm this is intended.
    if SXE_RE.match(clean_bracket(tokens[episode_idx])):
        categories[episode_idx] = "season"
    elif "season" not in categories[:episode_idx]:
        # Only the nearest non-separator token before the episode is inspected:
        # a bare (unbracketed) number from 1 to 20 is taken as the season.
        nearest = next(
            (idx for idx in reversed(range(episode_idx)) if categories[idx] != "sep"),
            None,
        )
        if nearest is not None:
            candidate = tokens[nearest]
            digits = clean_bracket(candidate)
            if (
                re.fullmatch(r"[0-9]+", digits)
                and 1 <= int(digits) <= 20
                and not candidate.startswith(("[", "(", "【"))
            ):
                categories[nearest] = "season"

    # The title ends before any season/separator run that precedes the episode
    # and starts after the leading group/metadata tokens.
    title_end = episode_idx
    while title_end > 0 and categories[title_end - 1] in {"season", "sep"}:
        title_end -= 1
    title_start = 0
    while title_start < title_end and categories[title_start] in {"group", "sep", "source", "resolution", "special"}:
        title_start += 1
    title_start, title_end = trim_title_span(tokens, title_start, title_end)
    if title_start >= title_end:
        return episode_fallbacks()

    protected = {"group", "season", "episode", "resolution", "source", "special"}
    for idx, current in enumerate(categories):
        if title_start <= idx < title_end:
            if current not in protected:
                categories[idx] = "title"
        elif current == "title":
            # Default "title" labels outside the span are demoted to noise.
            categories[idx] = "sep"

    if "title" not in categories or "episode" not in categories:
        return episode_fallbacks()
    return finalize_weak_sample(tokens, categories, tokenizer)
def iter_db_rows(db_path: Path, min_id: int, max_id: int) -> Iterable[tuple[int, str]]:
    """Yield ``(id, filename)`` rows with ``min_id <= id <= max_id``, ordered by id.

    The database is opened read-only through an SQLite URI. The URI is built
    with ``Path.as_uri()`` so characters that are special in URIs (``#``,
    ``?``, ``%``, spaces) are percent-encoded instead of being misread as a
    fragment, query string or escape sequence.

    This is a generator: the connection is opened when iteration starts and
    closed when the generator is exhausted or closed.

    Args:
        db_path: Location of the SQLite database containing a ``files`` table.
        min_id: Lowest ``files.id`` to return (inclusive).
        max_id: Highest ``files.id`` to return (inclusive).
    """
    uri = f"{Path(db_path).resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True, timeout=30)
    try:
        # Inside the try so the connection is released even if the PRAGMA fails.
        conn.execute("PRAGMA query_only=ON")
        query = "SELECT id, filename FROM files WHERE id >= ? AND id <= ? ORDER BY id"
        yield from conn.execute(query, (min_id, max_id))
    finally:
        conn.close()
def export_dataset(args: argparse.Namespace) -> None:
    """Export weakly-labeled samples to JSONL and write a manifest beside it.

    Reads ``files`` rows with ``args.min_id <= id <= max_id`` (``max_id`` is
    ``args.max_id`` capped at the database maximum read at start-up), keeps
    rows with a known video extension, de-duplicates by stem, applies the
    length limits, weak-labels each stem and writes one JSON record per kept
    sample. Afterwards the vocabulary is built from the exported tokens and
    saved to the output directory, and a ``.manifest.json`` file with
    statistics and examples is written and echoed (without examples) to
    stdout.

    The manifest's ``last_file_id`` is the high-water mark for incremental
    exports. When ``args.limit`` stops the scan early, it is the id of the
    last row actually scanned rather than ``max_id``, so a follow-up run with
    ``--min-id last_file_id+1`` does not skip rows that were never examined.

    Args:
        args: Parsed command-line options; this function reads ``db``,
            ``output``, ``min_id``, ``max_id``, ``limit``, ``min_chars``,
            ``max_chars``, ``example_count``, ``base_vocab`` and
            ``max_vocab_size``.
    """
    db_path = Path(args.db)
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # as_uri() percent-encodes characters such as '#', '?', '%' and spaces that
    # would otherwise corrupt the SQLite URI.
    db_uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(db_uri, uri=True, timeout=30)
    try:
        conn.execute("PRAGMA query_only=ON")
        db_max_id = conn.execute("SELECT MAX(id) FROM files").fetchone()[0] or 0
        max_id = min(args.max_id if args.max_id is not None else db_max_id, db_max_id)
    finally:
        conn.close()

    base_vocab = None
    if args.base_vocab:
        base_tokenizer = AnimeTokenizer(vocab_file=args.base_vocab)
        base_vocab = base_tokenizer.get_vocab()

    tokenizer = AnimeTokenizer()
    stats = ExportStats()
    seen_basenames: set[str] = set()
    token_lists: List[List[str]] = []
    label_counter: Counter[str] = Counter()
    examples: List[dict] = []
    last_scanned_id: Optional[int] = None
    stopped_at_limit = False

    with output_path.open("w", encoding="utf-8") as out:
        for file_id, raw_filename in iter_db_rows(db_path, args.min_id, max_id):
            last_scanned_id = file_id
            stats.scanned_rows += 1
            basename = normalize_path_basename(raw_filename)
            stem, ext = strip_video_extension(basename)
            if ext not in VIDEO_EXTENSIONS:
                continue
            stats.video_rows += 1
            if stem in seen_basenames:
                stats.duplicate_basenames += 1
                continue
            seen_basenames.add(stem)
            if len(stem) < args.min_chars:
                stats.skipped_too_short += 1
                continue
            if len(stem) > args.max_chars:
                stats.skipped_too_long += 1
                continue

            sample = weak_label_filename(stem, tokenizer)
            if sample is None:
                # Most failures are no confident episode or no title; keep the
                # manifest aggregate conservative instead of over-classifying.
                stats.skipped_no_episode += 1
                continue
            labels = sample["labels"]
            if not any(label.endswith("TITLE") for label in labels):
                stats.skipped_no_title += 1
                continue
            if not any(label.endswith("EPISODE") for label in labels):
                stats.skipped_no_episode += 1
                continue

            record = {
                "file_id": file_id,
                "filename": stem,
                "tokens": sample["tokens"],
                "labels": labels,
            }
            out.write(json.dumps(record, ensure_ascii=False) + "\n")
            stats.labeled_samples += 1
            token_lists.append(sample["tokens"])
            label_counter.update(labels)
            if len(examples) < args.example_count:
                examples.append(record)
            if args.limit and stats.labeled_samples >= args.limit:
                stopped_at_limit = True
                break

    tokenizer.build_vocab(token_lists, max_size=args.max_vocab_size, base_vocab=base_vocab)
    tokenizer.save_vocabulary(output_path.parent)

    # Rows after the last scanned id were never examined when the limit cut the
    # scan short, so they must stay eligible for the next incremental export.
    last_file_id = last_scanned_id if stopped_at_limit else max_id

    manifest = {
        "created_at": datetime.now(timezone.utc).isoformat(),
        "source_db": str(db_path),
        "output": str(output_path),
        "min_file_id": args.min_id,
        "last_file_id": last_file_id,
        "db_max_file_id_at_export_start": db_max_id,
        "limit": args.limit,
        "stats": stats.__dict__,
        "label_counts": dict(label_counter),
        "vocab_size": tokenizer.vocab_size,
        "notes": [
            "Rows are a snapshot of files.id <= last_file_id.",
            "Future incremental export can use --min-id last_file_id+1.",
            "Weak labels target GROUP, TITLE, SEASON, and EPISODE; media tags are boundary labels/noise.",
        ],
        "examples": examples,
    }
    manifest_path = output_path.with_suffix(".manifest.json")
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps({k: v for k, v in manifest.items() if k != "examples"}, ensure_ascii=False, indent=2))
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the export script.

    Args:
        argv: Argument strings to parse. With the default ``None``, argparse
            reads ``sys.argv[1:]``, which is what the script entry point
            relies on; passing a list allows programmatic use.

    Returns:
        The populated namespace. Note that the ``--db`` default is an
        absolute Windows path, so on other machines the option normally has
        to be given explicitly.
    """
    parser = argparse.ArgumentParser(description="Export weakly-labeled DMHY filename dataset")
    parser.add_argument("--db", default=r"D:\WorkSpace\Python\dmhy-parser\dmhy_anime.db", help="DMHY SQLite database")
    parser.add_argument("--output", default="data/dmhy_weak.jsonl", help="Output JSONL path")
    parser.add_argument("--min-id", type=int, default=1, help="Minimum files.id to export")
    parser.add_argument("--max-id", type=int, default=None, help="Maximum files.id to export; defaults to current DB max")
    parser.add_argument("--limit", type=int, default=None, help="Maximum labeled samples to write")
    parser.add_argument("--min-chars", type=int, default=4, help="Minimum stem length")
    parser.add_argument("--max-chars", type=int, default=180, help="Maximum stem length")
    parser.add_argument("--example-count", type=int, default=20, help="Examples to include in manifest")
    parser.add_argument("--base-vocab", default=None, help="Optional vocab whose IDs should be preserved")
    parser.add_argument("--max-vocab-size", type=int, default=3000, help="Maximum vocab size including special tokens")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: seed Python's global RNG from --seed, then export.
    cli_options = parse_args()
    random.seed(cli_options.seed)
    export_dataset(cli_options)