Token Classification
Transformers
ONNX
Safetensors
English
Japanese
Chinese
bert
anime
filename-parsing
Eval Results (legacy)
Instructions to use ModerRAS/AniFileBERT with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use ModerRAS/AniFileBERT with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("token-classification", model="ModerRAS/AniFileBERT")

# Load model directly
from transformers import AutoTokenizer, AutoModelForTokenClassification

tokenizer = AutoTokenizer.from_pretrained("ModerRAS/AniFileBERT")
model = AutoModelForTokenClassification.from_pretrained("ModerRAS/AniFileBERT")
- Notebooks
- Google Colab
- Kaggle
| """Annotate DMHY prefix graph terminals and emit weak-label dataset rows. | |
| The graph producer intentionally leaves terminal.annotations empty. This tool | |
| adds a deterministic suffix-format layer without depending on network access: | |
| - classify suffix examples into episode-title text vs media/hash metadata | |
| - optionally ask an OpenAI-compatible Responses API for a second opinion | |
| - write dmhy_weak-compatible JSONL records: filename, tokens, labels | |
| - optionally write graph annotation patch JSONL and/or a merged graph JSON | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import urllib.error | |
| import urllib.request | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Iterable | |
| from anifilebert.tokenizer import AnimeTokenizer | |
| from tools.dmhy_dataset import weak_label_filename | |
# Default input/output locations used by the command-line options.
DEFAULT_GRAPH = Path("datasets/AnimeName/dmhy_prefix_graph.json")
DEFAULT_SOURCE_LIST = Path("datasets/AnimeName/dmhy_list.jsonl")
DEFAULT_OUTPUT = Path("datasets/AnimeName/dmhy_weak.generated.jsonl")
DEFAULT_PATCH_OUTPUT = Path("datasets/AnimeName/dmhy_prefix_graph.annotations.jsonl")

# Default Responses API model name for --model.
DEFAULT_MODEL = "gpt-5.4-mini"

# Values stored in a patch's "source" field: heuristic-only vs. LLM-refined.
SOURCE = "heuristic-v1"
LLM_SOURCE = "responses-v1"

# A fragment made only of 8+ hex digits (e.g. a CRC/hash tag).
TRAILING_HASH_RE = re.compile(r"^[A-Fa-f0-9]{8,}$")

# Resolution markers: "1080p", "4k", "1920x1080" (also with "×").
RESOLUTION_RE = re.compile(r"(?i)(?:\b\d{3,4}p\b|\b\dk\b|\b\d{3,4}[xX×]\d{3,4}\b)")

# Whole-word media/release vocabulary: source, codec, audio, bit depth,
# subtitle/language tags, and streaming-platform abbreviations.
MEDIA_WORD_RE = re.compile(
    r"(?i)\b(?:"
    r"web[-_. ]?dl|web[-_. ]?rip|bdrip|blu[-_. ]?ray|bdmv|bd|dvd[-_. ]?rip|dvd|"
    r"hdtv|tvrip|remux|x26[45]|h\.?26[45]|hevc|avc|av1|aac\d*(?:\.\d+)?|"
    r"flac|mp3|dts|opus|10[-_. ]?bit|8[-_. ]?bit|hi10p|ma10p|yuv\d+p?\d*|"
    r"chs|cht|gb|big5|jpn?|eng|m?subs?|assx?\d*|srtx?\d*|vfr|cfr|"
    r"nf|netflix|amzn|baha|cr|abema|dsnp|hulu"
    r")\b"
)

# CJK subtitle/language markers.
LANG_CJK_RE = re.compile(r"(?:字幕|简体|繁体|简中|繁中|日语|英语|双语|内封|外挂)")

# Text enclosed in CJK corner brackets or straight/curly double quotes.
QUOTED_RE = re.compile(r"[「『\"“](.+?)[」』\"”]")

# One bracketed segment: [...], (...), 【...】 or 《...》.
BRACKET_SEGMENT_RE = re.compile(r"(\[[^\]]+\]|\([^)]+\)|【[^】]+】|《[^》]+》)")

# Within a single path component: an episode marker (S01E02, 1x02, EP 3,
# ACT.4, 第5話 ...) followed by a separator and free text, captured as "title"
# up to the next path separator or opening bracket.
PATH_EPISODE_TITLE_RE = re.compile(
    r"(?i)(?:^|[/\\])[^/\\]*?(?:"
    r"S\d{1,2}E\d{1,4}|\d{1,2}x\d{1,4}|EP?\.?\s*\d{1,4}|ACT\.?\d{1,4}|第\s*\d{1,4}\s*[話话集回]"
    r")\s*[-_ ]+(?P<title>[^/\\\[\(【《]+)"
)
| class Args: | |
| graph: Path | |
| source_list: Path | |
| output: Path | |
| patch_output: Path | None | |
| merge_output: Path | None | |
| limit: int | None | |
| min_weight: int | None | |
| only_needs_review: bool | |
| llm: bool | |
| base_url: str | |
| api_key: str | None | |
| model: str | |
| max_requests: int | None | |
| http_timeout: int | |
| preserve_i_labels: bool | |
| examples_only: bool | |
def parse_args() -> Args:
    """Parse ``sys.argv`` and return the options as an ``Args`` instance.

    ``--patch-output`` is taken as a plain string so that an empty (or
    whitespace-only) value can disable the patch file; it is converted to a
    ``Path`` here, or to ``None`` when disabled.
    """
    cli = argparse.ArgumentParser(
        description="Annotate DMHY prefix graph terminals and write dmhy_weak-compatible rows"
    )

    # Inputs and outputs.
    cli.add_argument("--graph", type=Path, default=DEFAULT_GRAPH, help="Input dmhy_prefix_graph.json")
    cli.add_argument(
        "--source-list",
        type=Path,
        default=DEFAULT_SOURCE_LIST,
        help="Input dmhy_list.jsonl with full raw values; each line must contain a JSON object with value",
    )
    cli.add_argument(
        "--output",
        type=Path,
        default=DEFAULT_OUTPUT,
        help="Output dataset JSONL records compatible with dmhy_weak.jsonl",
    )
    cli.add_argument(
        "--patch-output",
        default=str(DEFAULT_PATCH_OUTPUT),
        help="Optional JSONL terminal annotation patches; use empty string to disable",
    )
    cli.add_argument(
        "--merge-output",
        type=Path,
        default=None,
        help="Optional full graph JSON with terminal.annotations merged",
    )

    # Terminal selection.
    cli.add_argument("--limit", type=int, default=None, help="Maximum selected terminals to process")
    cli.add_argument(
        "--min-weight",
        type=int,
        default=None,
        help="Only process terminals with weight >= this value",
    )
    cli.add_argument(
        "--only-needs-review",
        action="store_true",
        help="Only process terminals with ambiguous suffix examples",
    )

    # Optional LLM second opinion.
    cli.add_argument("--llm", action="store_true", help="Opt in to Responses API annotation")
    cli.add_argument(
        "--base-url",
        default=os.environ.get("ANIFILEBERT_LLM_BASE_URL", "http://10.137.32.209:8317/v1"),
        help="OpenAI-compatible API base URL; used only with --llm",
    )
    cli.add_argument(
        "--api-key",
        default=os.environ.get("ANIFILEBERT_LLM_API_KEY"),
        help="API key; falls back to ANIFILEBERT_LLM_API_KEY",
    )
    cli.add_argument("--model", default=DEFAULT_MODEL, help="Responses API model")
    cli.add_argument(
        "--max-requests",
        type=int,
        default=None,
        help="Maximum LLM requests; omitted means no cap",
    )
    cli.add_argument(
        "--http-timeout",
        type=int,
        default=120,
        help="HTTP timeout in seconds per LLM request",
    )

    # Dataset generation switches.
    cli.add_argument(
        "--preserve-i-labels",
        action="store_true",
        help="Keep I-* labels from weak labeling instead of normalizing generated token labels to B/O only",
    )
    cli.add_argument(
        "--examples-only",
        action="store_true",
        help="Use terminal.value_examples only; preserves the old small-sample behavior",
    )

    parsed = cli.parse_args()

    # A non-empty stripped string always yields a Path with a non-empty
    # string form, so a single emptiness check is sufficient.
    patch_text = str(parsed.patch_output).strip()
    patch_path = Path(patch_text) if patch_text else None

    return Args(
        graph=parsed.graph,
        source_list=parsed.source_list,
        output=parsed.output,
        patch_output=patch_path,
        merge_output=parsed.merge_output,
        limit=parsed.limit,
        min_weight=parsed.min_weight,
        only_needs_review=parsed.only_needs_review,
        llm=parsed.llm,
        base_url=parsed.base_url,
        api_key=parsed.api_key,
        model=parsed.model,
        max_requests=parsed.max_requests,
        http_timeout=parsed.http_timeout,
        preserve_i_labels=parsed.preserve_i_labels,
        examples_only=parsed.examples_only,
    )
def load_graph(path: Path) -> dict[str, Any]:
    """Read and validate the prefix graph JSON at *path*.

    Returns the decoded root object. Exits via ``SystemExit`` when the file
    is missing, is not valid JSON, is not a JSON object, or has no non-empty
    ``terminals`` list.
    """
    if not path.exists():
        raise SystemExit(f"graph not found: {path}")

    raw_text = path.read_text(encoding="utf-8")
    try:
        payload = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"invalid graph JSON in {path}: {exc}") from exc

    if not isinstance(payload, dict):
        raise SystemExit(f"invalid graph schema in {path}: root must be an object")

    terminal_rows = payload.get("terminals")
    if not isinstance(terminal_rows, list):
        raise SystemExit(f"invalid graph schema in {path}: missing terminals list")
    if len(terminal_rows) == 0:
        raise SystemExit(f"graph has no terminals: {path}")
    return payload
def terminal_id(terminal: dict[str, Any], index: int) -> str:
    """Return a string identifier for *terminal*.

    The first non-``None`` value among the ``terminal_id``, ``id`` and
    ``node_id`` keys is used; otherwise the positional *index* is the id.
    """
    candidates = (terminal.get(key) for key in ("terminal_id", "id", "node_id"))
    found = next((value for value in candidates if value is not None), None)
    return str(index) if found is None else str(found)
def string_list(value: Any) -> list[str]:
    """Coerce *value* to a list of strings, dropping blank entries.

    Anything that is not a ``list`` yields an empty list. Items are passed
    through ``str()``; those that are empty or whitespace-only are skipped.
    """
    if isinstance(value, list):
        return [text for text in map(str, value) if text.strip()]
    return []
def unique_keep_order(values: Iterable[str]) -> list[str]:
    """Return whitespace-normalized *values* without duplicates or blanks.

    The first occurrence of each normalized string keeps its position.
    """
    # A dict preserves insertion order, so its keys act as an ordered set.
    ordered: dict[str, None] = {}
    for raw in values:
        text = normalize_space(raw)
        if text:
            ordered.setdefault(text, None)
    return list(ordered)
def normalize_space(value: str) -> str:
    """Collapse every whitespace run in *value* to one space and trim the ends."""
    # str.split() with no separator splits on whitespace runs and discards
    # leading/trailing whitespace, so re-joining gives the normalized form.
    return " ".join(value.split())
def clean_candidate(value: str) -> str:
    """Strip separator and bracket/quote characters from the ends of *value*.

    Whitespace is normalized first, then separator punctuation and then
    enclosing brackets/quotes are trimmed from both ends; finally underscores
    become spaces and whitespace is normalized again.
    """
    text = normalize_space(value).strip("-_ .~/\\|")
    text = text.strip("[]()【】《》「」『』\"“”")
    return normalize_space(text.replace("_", " "))
def is_media_fragment(value: str) -> bool:
    """Return True when *value* looks like media/release metadata.

    After cleaning, a fragment counts as media when it is a bare hex hash,
    contains a resolution marker, a media vocabulary word or a CJK language
    marker, or is a short (<= 16 chars) hex hash optionally followed by "rev".
    """
    text = clean_candidate(value)
    if not text:
        return False
    if TRAILING_HASH_RE.fullmatch(text):
        return True
    if any(pattern.search(text) for pattern in (RESOLUTION_RE, MEDIA_WORD_RE, LANG_CJK_RE)):
        return True
    # Covers hashes with a trailing revision marker, e.g. "ABCDEF12 rev".
    return len(text) <= 16 and re.fullmatch(r"[A-Fa-f0-9]{8,}(?:\s*rev)?", text) is not None
def split_suffix_fragments(suffix: str) -> tuple[list[str], list[str]]:
    """Split one suffix example into episode-title and media fragments.

    Returns ``(episode_titles, media)``, each de-duplicated in first-seen
    order. Titles come from quoted text, from text following an episode
    marker, and from plain path components; media fragments come from
    bracketed segments and path components that look like release metadata.
    """
    titles: list[str] = []
    media_tags: list[str] = []

    def keep_title(raw: str) -> None:
        # Record a cleaned candidate unless it is empty or media metadata.
        candidate = clean_candidate(raw)
        if candidate and not is_media_fragment(candidate):
            titles.append(candidate)

    for quoted in QUOTED_RE.finditer(suffix):
        keep_title(quoted.group(1))

    # Media-looking bracket segments are recorded verbatim and blanked out of
    # the text that is later split into path components.
    leftover = suffix
    for bracketed in BRACKET_SEGMENT_RE.findall(suffix):
        if not is_media_fragment(clean_candidate(bracketed)):
            continue
        media_tags.append(bracketed.strip())
        leftover = leftover.replace(bracketed, " ", 1)

    for path_match in PATH_EPISODE_TITLE_RE.finditer(suffix):
        keep_title(path_match.group("title"))

    for chunk in re.split(r"[/\\]", leftover):
        text = clean_candidate(chunk)
        if not text:
            continue
        if is_media_fragment(text):
            media_tags.append(text)
        # Components containing quoted text were already handled above.
        elif QUOTED_RE.search(chunk) is None and looks_like_plain_episode_title(text):
            titles.append(text)

    return unique_keep_order(titles), unique_keep_order(media_tags)
def looks_like_plain_episode_title(value: str) -> bool:
    """Return True when *value* could be an unquoted episode title.

    Rejects strings shorter than three characters, media fragments, numbered
    markers such as "part 2" / "OVA 1", and strings made only of digits and
    separators. What remains must contain a Latin letter, kana, or CJK
    ideograph.
    """
    if len(value) < 3 or is_media_fragment(value):
        return False
    numbered_marker = re.fullmatch(r"(?i)(?:part|ova|special|season|stage|act)\s*\d+", value)
    digits_only = re.fullmatch(r"[\d\s._-]+", value)
    if numbered_marker or digits_only:
        return False
    return re.search(r"[A-Za-z\u3040-\u30ff\u3400-\u9fff]", value) is not None
def heuristic_patch(terminal: dict[str, Any], index: int) -> dict[str, Any]:
    """Build the heuristic annotation patch for one graph terminal.

    Suffix examples are split into episode-title and media fragments. When no
    title is found there, the terminal's value examples are scanned for text
    following an episode marker. The returned dict carries the fragments, a
    review flag, a notes summary and ``source`` set to ``SOURCE``.
    """
    suffixes = string_list(terminal.get("suffix_examples"))
    values = string_list(terminal.get("value_examples"))

    titles: list[str] = []
    media: list[str] = []
    for suffix in suffixes:
        found_titles, found_media = split_suffix_fragments(suffix)
        titles += found_titles
        media += found_media

    if not titles:
        # Fallback: look for "<episode marker> - <title>" in the full values.
        fallback = (
            clean_candidate(match.group("title"))
            for value in values
            for match in PATH_EPISODE_TITLE_RE.finditer(value)
        )
        titles.extend(
            candidate for candidate in fallback if candidate and not is_media_fragment(candidate)
        )

    titles = unique_keep_order(titles)
    media = unique_keep_order(media)
    review = needs_llm_review(terminal, titles, media)

    return {
        "terminal_id": terminal_id(terminal, index),
        "needs_llm_review": review,
        "episode_title_suffixes": titles,
        "media_suffixes": media,
        "title_candidates": unique_keep_order(clean_candidate(item) for item in titles),
        "llm_label": None,
        "notes": summarize_notes(suffixes, titles, media, review),
        "source": SOURCE,
    }
def needs_llm_review(
    terminal: dict[str, Any],
    episode_titles: list[str],
    media_suffixes: list[str],
) -> bool:
    """Decide whether a terminal's suffix layer is ambiguous.

    A terminal without suffix examples never needs review. Otherwise review
    is requested when both title and media fragments were found, when nothing
    was classified at all, or when any suffix example contains a path
    separator.
    """
    suffix_examples = string_list(terminal.get("suffix_examples"))
    if not suffix_examples:
        return False
    # Both lists populated (mixed) or both empty (nothing classified).
    if bool(episode_titles) == bool(media_suffixes):
        return True
    joined = " ".join(suffix_examples)
    return "/" in joined or "\\" in joined
def summarize_notes(
    suffix_examples: list[str],
    episode_titles: list[str],
    media_suffixes: list[str],
    needs_review: bool,
) -> str:
    """Render a "; "-separated summary of fragment counts for a patch.

    ``ambiguous_suffix_layer`` is appended when *needs_review* is true.
    """
    counted = (
        ("suffix_examples", suffix_examples),
        ("episode_title_suffixes", episode_titles),
        ("media_suffixes", media_suffixes),
    )
    parts = [f"{label}={len(items)}" for label, items in counted]
    if needs_review:
        parts.append("ambiguous_suffix_layer")
    return "; ".join(parts)
def selected_terminals(graph: dict[str, Any], args: Args) -> list[tuple[int, dict[str, Any], dict[str, Any]]]:
    """Pick the terminals to process, each with its heuristic patch.

    Returns ``(index, terminal, patch)`` tuples in graph order. Non-dict
    entries are skipped; ``--min-weight`` and ``--only-needs-review`` filter
    the rest; iteration stops once ``--limit`` terminals have been collected.
    """
    picked: list[tuple[int, dict[str, Any], dict[str, Any]]] = []
    threshold = args.min_weight
    for index, terminal in enumerate(graph["terminals"]):
        if not isinstance(terminal, dict):
            continue
        # "weight" wins over "count"; missing or falsy values count as 0.
        weight = int(terminal.get("weight") or terminal.get("count") or 0)
        if threshold is not None and weight < threshold:
            continue
        patch = heuristic_patch(terminal, index)
        if args.only_needs_review and not patch["needs_llm_review"]:
            continue
        picked.append((index, terminal, patch))
        if args.limit is not None and len(picked) >= args.limit:
            break
    return picked
def responses_url(base_url: str) -> str:
    """Return the ``/responses`` endpoint URL under *base_url*."""
    trimmed = base_url.rstrip("/")
    return f"{trimmed}/responses"
def extract_response_text(data: dict[str, Any]) -> str:
    """Pull the generated text out of a decoded Responses API payload.

    A non-blank top-level ``output_text`` string is returned unchanged.
    Otherwise every string ``text`` found under ``output[*].content[*]`` is
    joined with newlines and stripped.
    """
    direct = data.get("output_text")
    if isinstance(direct, str) and direct.strip():
        return direct

    collected = [
        part["text"]
        for item in data.get("output") or []
        if isinstance(item, dict)
        for part in item.get("content") or []
        if isinstance(part, dict) and isinstance(part.get("text"), str)
    ]
    return "\n".join(collected).strip()
def call_llm(terminal: dict[str, Any], patch: dict[str, Any], args: Args) -> dict[str, Any] | None:
    """Ask the Responses API to refine *patch* and return the merged result.

    The terminal's prefix, digit skeleton, examples and the heuristic patch
    are sent as a JSON string. The model must answer with a JSON object; its
    ``episode_title_suffixes``, ``media_suffixes``, ``title_candidates``,
    ``llm_label`` and ``notes`` keys override the heuristic values, and
    ``source`` becomes ``LLM_SOURCE``. *patch* itself is not modified.

    Raises:
        RuntimeError: when no API key is configured, the request fails at the
            HTTP or transport level (including read timeouts), or the reply is
            not the expected JSON. Every failure is reported as
            ``RuntimeError`` so the caller can fall back to the heuristic
            patch with a single ``except`` clause.
    """
    if not args.api_key:
        raise RuntimeError("--llm requires --api-key or ANIFILEBERT_LLM_API_KEY")

    instructions = (
        "You annotate anime filename suffix examples. Return strict JSON only with keys "
        "episode_title_suffixes, media_suffixes, title_candidates, llm_label, notes. "
        "Classify quoted human episode titles separately from media tags such as resolution, "
        "codec, source, language, subtitle markers, hashes, and release metadata."
    )
    payload = {
        "model": args.model,
        "instructions": instructions,
        "input": json.dumps(
            {
                "terminal_id": patch["terminal_id"],
                "prefix": terminal.get("prefix"),
                "digit_skeleton": terminal.get("digit_skeleton"),
                "suffix_examples": string_list(terminal.get("suffix_examples")),
                "value_examples": string_list(terminal.get("value_examples")),
                "heuristic_patch": patch,
            },
            ensure_ascii=False,
        ),
    }
    request = urllib.request.Request(
        responses_url(args.base_url),
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {args.api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(request, timeout=args.http_timeout) as response:
            # Decode leniently: a malformed byte should surface as a JSON
            # error below, not as an uncaught UnicodeDecodeError.
            raw = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Responses API HTTP {exc.code}: {body[:500]}") from exc
    except OSError as exc:
        # URLError is an OSError subclass. Catching OSError also covers socket
        # timeouts and connection resets raised while reading the body, which
        # are not wrapped in URLError.
        raise RuntimeError(f"Responses API request failed: {exc}") from exc

    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Responses API returned non-JSON annotation: {raw[:500]}") from exc
    if not isinstance(data, dict):
        # A list/string/number root has no ``.get``; reject it explicitly
        # instead of letting an AttributeError escape.
        raise RuntimeError(f"Responses API returned non-JSON annotation: {raw[:500]}")

    try:
        annotation = json.loads(strip_json_fence(extract_response_text(data)))
    except (json.JSONDecodeError, TypeError) as exc:
        raise RuntimeError(f"Responses API returned non-JSON annotation: {raw[:500]}") from exc
    if not isinstance(annotation, dict):
        raise RuntimeError("Responses API annotation must be a JSON object")

    merged = dict(patch)
    for key in ("episode_title_suffixes", "media_suffixes", "title_candidates"):
        if key not in annotation:
            continue
        items = annotation[key]
        if isinstance(items, str):
            # A bare string is one entry, not a sequence of characters.
            items = [items]
        elif not isinstance(items, list):
            # null or any other non-list value means "no entries".
            items = []
        merged[key] = unique_keep_order(str(item) for item in items if item is not None)
    if "llm_label" in annotation:
        merged["llm_label"] = annotation["llm_label"]
    if "notes" in annotation:
        merged["notes"] = str(annotation["notes"])
    merged["source"] = LLM_SOURCE
    return merged
def strip_json_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence from *text*.

    Strips an opening fence (optionally tagged ``json``) at the start and a
    closing fence at the end, then trims surrounding whitespace.
    """
    without_opening = re.sub(r"^```(?:json)?\s*", "", text.strip())
    without_closing = re.sub(r"\s*```$", "", without_opening)
    return without_closing.strip()
# Characters treated as a boundary between a terminal prefix and what follows.
PREFIX_BOUNDARY_CHARS = set(" \t\r\n-_.~/\\|::[]()【】《》「」『』\"'")


def prefix_boundary_ok(value: str, prefix: str) -> bool:
    """Return True when *prefix* matches *value* up to a boundary.

    *value* must start with a non-empty *prefix*. The match is accepted when
    the two are equal, when the character right after the prefix is a
    boundary character, or when the prefix itself ends with one.
    """
    if not prefix or not value.startswith(prefix):
        return False
    tail = value[len(prefix):]
    if not tail:
        return True
    return tail[0] in PREFIX_BOUNDARY_CHARS or prefix[-1] in PREFIX_BOUNDARY_CHARS
class PrefixTrieNode:
    """One node of the character trie built from terminal prefixes."""

    __slots__ = ("children", "terminal_ordinals")

    def __init__(self) -> None:
        # Ordinals of the selected terminals whose prefix ends at this node.
        self.terminal_ordinals: list[int] = []
        # Child nodes keyed by the next character.
        self.children: dict[str, PrefixTrieNode] = {}
def build_prefix_trie(selected: list[tuple[int, dict[str, Any], dict[str, Any]]]) -> PrefixTrieNode:
    """Index the selected terminals by their ``prefix`` in a character trie.

    Each terminal's position in *selected* (its ordinal) is stored on the
    node where its prefix ends. Terminals without a prefix are left out.
    """
    root = PrefixTrieNode()
    for ordinal, (_index, terminal, _patch) in enumerate(selected):
        prefix = str(terminal.get("prefix") or "")
        if prefix == "":
            continue
        cursor = root
        for char in prefix:
            child = cursor.children.get(char)
            if child is None:
                child = PrefixTrieNode()
                cursor.children[char] = child
            cursor = child
        cursor.terminal_ordinals.append(ordinal)
    return root
def matching_terminal_ordinal(value: str, trie: PrefixTrieNode, selected: list[tuple[int, dict[str, Any], dict[str, Any]]]) -> int | None:
    """Find the selected terminal whose prefix best matches *value*.

    The trie is walked along the characters of *value*. Every terminal whose
    prefix ends on the path and passes ``prefix_boundary_ok`` replaces the
    previous candidate, so the deepest (longest) acceptable prefix wins.
    Returns the terminal's ordinal in *selected*, or ``None``.
    """
    cursor = trie
    longest: int | None = None
    for char in value:
        cursor = cursor.children.get(char)
        if cursor is None:
            return longest
        for ordinal in cursor.terminal_ordinals:
            candidate_prefix = str(selected[ordinal][1].get("prefix") or "")
            if prefix_boundary_ok(value, candidate_prefix):
                longest = ordinal
    return longest
def source_list_matches(
    source_list: Path,
    selected: list[tuple[int, dict[str, Any], dict[str, Any]]],
) -> dict[int, list[tuple[int, str]]]:
    """Assign every raw value in *source_list* to its matching terminal.

    The file is read as JSONL. Blank lines, non-object rows and rows without
    a non-blank string ``value`` are ignored. Returns a mapping from each
    terminal ordinal to its ``(line_number, value)`` matches; every ordinal
    is present even when it has no match.

    Exits via ``SystemExit`` when the file is missing or a line is not valid
    JSON.
    """
    if not source_list.exists():
        raise SystemExit(f"source list not found: {source_list}")

    trie = build_prefix_trie(selected)
    matches: dict[int, list[tuple[int, str]]] = {ordinal: [] for ordinal, _ in enumerate(selected)}

    with source_list.open("r", encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                raise SystemExit(f"invalid JSON in {source_list}:{line_number}: {exc}") from exc
            value = row.get("value") if isinstance(row, dict) else None
            if not isinstance(value, str) or not value.strip():
                continue
            ordinal = matching_terminal_ordinal(value, trie, selected)
            if ordinal is not None:
                matches[ordinal].append((line_number, value))
    return matches
def dataset_records(
    terminal: dict[str, Any],
    index: int,
    patch: dict[str, Any],
    tokenizer: AnimeTokenizer,
    *,
    filenames: Iterable[tuple[int, str]] | None = None,
    preserve_i_labels: bool = False,
) -> list[dict[str, Any]]:
    """Build weak-label dataset rows for one terminal.

    *filenames* yields ``(source_index, filename)`` pairs; when it is
    ``None`` the terminal's ``value_examples`` are enumerated instead. Each
    distinct filename is weak-labelled, its tokens are normalized, and the
    patch fields are attached both at the top level and under
    ``annotations``. Filenames for which ``weak_label_filename`` returns
    ``None`` are skipped.
    """
    entries = filenames if filenames is not None else enumerate(string_list(terminal.get("value_examples")))

    rows: list[dict[str, Any]] = []
    already_seen: set[str] = set()
    for source_index, filename in entries:
        if filename in already_seen:
            continue
        already_seen.add(filename)

        sample = weak_label_filename(filename, tokenizer)
        if sample is None:
            continue
        tokens, labels = normalize_generated_tokens(
            sample["tokens"],
            sample["labels"],
            preserve_i_labels=preserve_i_labels,
        )

        # Fields repeated at the top level and inside "annotations".
        shared = {
            "terminal_id": patch["terminal_id"],
            "terminal_index": index,
            "source": patch["source"],
            "needs_llm_review": patch["needs_llm_review"],
            "episode_title_suffixes": patch["episode_title_suffixes"],
            "media_suffixes": patch["media_suffixes"],
            "title_candidates": patch["title_candidates"],
        }
        rows.append(
            {
                "file_id": f"prefix-graph:{patch['terminal_id']}:{source_index}",
                "filename": filename,
                "tokens": tokens,
                "labels": labels,
                **shared,
                "annotations": {
                    **shared,
                    "llm_label": patch["llm_label"],
                    "notes": patch["notes"],
                },
            }
        )
    return rows
def is_standalone_separator(token: str) -> bool:
    """Return True for a single character that is not alphanumeric.

    Whitespace characters are never alphanumeric, so they are covered too.
    """
    return len(token) == 1 and not token.isalnum()
def split_generated_token(token: str) -> list[str]:
    """Split *token* into alphanumeric runs and single separator characters.

    Each non-alphanumeric character (whitespace included) becomes its own
    piece; consecutive alphanumeric characters stay together.
    """
    pieces: list[str] = []
    run_start: int | None = None
    for position, char in enumerate(token):
        if char.isalnum():
            if run_start is None:
                run_start = position
            continue
        # Separator: flush the pending alphanumeric run, then emit the char.
        if run_start is not None:
            pieces.append(token[run_start:position])
            run_start = None
        pieces.append(char)
    if run_start is not None:
        pieces.append(token[run_start:])
    return pieces
def b_only_label(label: str) -> str:
    """Rewrite an ``I-X`` label to ``B-X``; return other labels unchanged."""
    scheme, dash, entity = label.partition("-")
    if dash and scheme in ("B", "I"):
        return "B-" + entity
    return "O" if label == "O" else str(label)
def normalize_generated_tokens(
    tokens: list[str],
    labels: list[str],
    *,
    preserve_i_labels: bool = False,
) -> tuple[list[str], list[str]]:
    """Re-split weak-label tokens and spread their labels over the pieces.

    Every token is split into alphanumeric runs and single separators.
    Separator pieces are labelled ``O``; the other pieces inherit the token's
    label, with ``I-*`` rewritten to ``B-*`` unless *preserve_i_labels* is
    set. Tokens and labels are paired with ``zip``, so surplus items in the
    longer list are ignored.
    """
    out_tokens: list[str] = []
    out_labels: list[str] = []
    for token, label in zip(tokens, labels):
        raw_label = str(label)
        entity_label = raw_label if preserve_i_labels else b_only_label(raw_label)
        for piece in split_generated_token(str(token)):
            out_tokens.append(piece)
            keeps_entity = entity_label != "O" and not is_standalone_separator(piece)
            out_labels.append(entity_label if keeps_entity else "O")
    return out_tokens, out_labels
def write_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> int:
    """Write *rows* to *path* as compact UTF-8 JSONL and return the row count.

    Parent directories are created as needed and an existing file is
    overwritten. Lines always end with "\\n", regardless of platform.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    written = 0
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        for written, row in enumerate(rows, start=1):
            handle.write(json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n")
    return written
def merge_annotations(graph: dict[str, Any], patches_by_id: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Return a copy of *graph* with ``annotations`` set on patched terminals.

    The copy is made through a JSON round trip, so *graph* is left untouched.
    Each terminal with a patch under its id gets an ``annotations`` object
    holding the patch fields plus a UTC ``annotated_at`` timestamp.
    """
    copied_fields = (
        "episode_title_suffixes",
        "media_suffixes",
        "title_candidates",
        "needs_llm_review",
        "llm_label",
        "notes",
        "source",
    )
    merged = json.loads(json.dumps(graph, ensure_ascii=False))
    for index, terminal in enumerate(merged.get("terminals") or []):
        if not isinstance(terminal, dict):
            continue
        patch = patches_by_id.get(terminal_id(terminal, index))
        if patch is None:
            continue
        annotations = {field: patch[field] for field in copied_fields}
        annotations["annotated_at"] = datetime.now(timezone.utc).isoformat()
        terminal["annotations"] = annotations
    return merged
def main() -> None:
    """Run the annotator: select terminals, annotate them, write the outputs.

    Steps: parse options, load the graph, pick terminals with their heuristic
    patches, optionally refine ambiguous ones through the Responses API,
    build dataset rows, write the dataset JSONL (plus the optional patch
    JSONL and merged graph), and print a JSON summary to stdout.

    ``--max-requests`` caps LLM *attempts*: a request is counted before it is
    sent, so failing calls use up the budget too. Without that, an
    unreachable endpoint would be retried for every selected terminal, each
    waiting up to ``--http-timeout`` seconds. The ``llm_requests`` summary
    field therefore reports attempts.
    """
    args = parse_args()
    graph = load_graph(args.graph)
    selected = selected_terminals(graph, args)
    if not selected:
        raise SystemExit("no terminals selected; adjust --limit/--min-weight/--only-needs-review")

    tokenizer = AnimeTokenizer()
    llm_requests = 0
    patches: list[dict[str, Any]] = []
    records: list[dict[str, Any]] = []
    # In examples-only mode the source list is never read.
    source_matches = None if args.examples_only else source_list_matches(args.source_list, selected)

    for ordinal, (index, terminal, patch) in enumerate(selected):
        budget_left = args.max_requests is None or llm_requests < args.max_requests
        if args.llm and patch["needs_llm_review"] and budget_left:
            # Count the attempt up front so failures also consume the budget.
            llm_requests += 1
            try:
                llm_patch = call_llm(terminal, patch, args)
            except RuntimeError as exc:
                # Best effort: keep the heuristic patch and record the error.
                print(f"warning: terminal {patch['terminal_id']}: {exc}; using heuristic patch", file=sys.stderr)
                patch["notes"] = f"{patch['notes']}; llm_error={exc}"
            else:
                if llm_patch is not None:
                    patch = llm_patch
        patches.append(patch)

        filenames = None if source_matches is None else source_matches.get(ordinal, [])
        records.extend(
            dataset_records(
                terminal,
                index,
                patch,
                tokenizer,
                filenames=filenames,
                preserve_i_labels=args.preserve_i_labels,
            )
        )

    record_count = write_jsonl(args.output, records)
    patch_count = 0
    if args.patch_output is not None:
        patch_count = write_jsonl(args.patch_output, patches)
    if args.merge_output is not None:
        args.merge_output.parent.mkdir(parents=True, exist_ok=True)
        merged = merge_annotations(graph, {patch["terminal_id"]: patch for patch in patches})
        args.merge_output.write_text(json.dumps(merged, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")

    summary = {
        "graph": str(args.graph),
        "source_list": None if args.examples_only else str(args.source_list),
        "output": str(args.output),
        "patch_output": str(args.patch_output) if args.patch_output is not None else None,
        "merge_output": str(args.merge_output) if args.merge_output is not None else None,
        "selected_terminals": len(selected),
        "examples_only": args.examples_only,
        "dataset_records": record_count,
        "patches": patch_count,
        "llm_enabled": args.llm,
        "llm_requests": llm_requests,
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()