"""Generate path-prefix noisy char BIO rows from real DMHY samples. The generated rows look like: noise/noise/TITLE/Season 01/03 [1080P][WEB-DL].mkv Prefix directories are always labeled ``O``. The title directory, season directory, episode/special filename stem, and optional meta tags keep their BIO labels so the model learns to ignore library paths without relying on runtime path stripping. """ from __future__ import annotations import argparse import json import random from collections import Counter from datetime import datetime, timezone from pathlib import Path from statistics import mean from typing import Iterable, Optional ENTITY_NAMES = { "TITLE", "SEASON", "EPISODE", "SPECIAL", "RESOLUTION", "SOURCE", "GROUP", } PREFIX_COMPONENTS = { "windows": [ ("O:", "115open", "Anime"), ("D:", "Media", "Anime"), ("E:", "Downloads", "Bangumi"), ("Z:", "Library", "Anime"), ("C:", "Archive", "completed"), ], "unix": [ ("", "mnt", "media", "anime"), ("", "volume1", "anime"), ("home", "media", "Bangumi"), ("library", "anime"), ("srv", "downloads", "anime"), ], } EXTRA_NOISE_DIRS = [ "整理中", "completed", "old", "temp", "115", "Bangumi", "Library", "_archive", "2024", "misc", ] EXTENSIONS = [".mkv", ".mp4", ".avi"] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--input", required=True, help="Authoritative char JSONL input") parser.add_argument("--output", required=True, help="Generated char JSONL output") parser.add_argument("--manifest-output", default=None, help="Manifest JSON path") parser.add_argument("--samples-per-source", type=int, default=2) parser.add_argument("--max-length", type=int, default=128) parser.add_argument("--limit", type=int, default=None, help="Read at most N source rows") parser.add_argument("--max-rows", type=int, default=None, help="Write at most N rows") parser.add_argument("--seed", type=int, default=105) parser.add_argument("--source", default="path_prefix_noise") parser.add_argument("--path-styles", default="windows,unix") parser.add_argument("--group-prefix-prob", type=float, default=0.70) parser.add_argument("--basename-title-prob", type=float, default=0.85) parser.add_argument("--require-group", action="store_true") parser.add_argument("--max-group-length", type=int, default=None) parser.add_argument("--progress", type=int, default=50_000) return parser.parse_args() def iter_jsonl(path: Path) -> Iterable[dict]: with path.open("r", encoding="utf-8") as handle: for line_no, line in enumerate(handle, 1): line = line.strip() if not line: continue try: yield json.loads(line) except json.JSONDecodeError as exc: raise ValueError(f"{path}:{line_no}: invalid JSON") from exc def extract_entities(tokens: list[str], labels: list[str]) -> dict[str, list[str]]: entities: dict[str, list[str]] = {name: [] for name in ENTITY_NAMES} active_entity: Optional[str] = None active_tokens: list[str] = [] def flush() -> None: nonlocal active_entity, active_tokens if active_entity and active_tokens: entities.setdefault(active_entity, []).append("".join(active_tokens).strip()) active_entity = None active_tokens = [] for token, label in zip(tokens, labels): label = str(label) token = str(token) if label.startswith("B-"): flush() active_entity = label.split("-", 1)[1] active_tokens = [token] elif label.startswith("I-"): entity = label.split("-", 1)[1] if active_entity == entity: active_tokens.append(token) else: flush() active_entity = entity active_tokens = [token] else: flush() flush() return { entity: [value for value in values if value] for entity, values in entities.items() if values } def choose_entity(entities: dict[str, list[str]], name: str, rng: random.Random) -> Optional[str]: values = [value.strip() for value in entities.get(name, []) if value.strip()] if not values: return None return rng.choice(values) def choose_group( entities: dict[str, list[str]], rng: random.Random, max_group_length: Optional[int], ) -> Optional[str]: values = [value.strip() for value in entities.get("GROUP", []) if value.strip()] if max_group_length is not None: values = [value for value in values if len(value) <= max_group_length] if not values: return None return rng.choice(values) def first_ascii_number(value: str) -> Optional[int]: current = [] for ch in value: if ch.isascii() and ch.isdigit(): current.append(ch) elif current: break if not current: return None return int("".join(current)) def season_text(value: Optional[str], rng: random.Random) -> str: if value: number = first_ascii_number(value) variants = [value.strip()] if number is not None: variants.extend([f"Season {number}", f"Season {number:02}", f"S{number:02}", f"第{number}季"]) return rng.choice(variants) number = rng.choice([1, 1, 1, 2]) return rng.choice([f"Season {number}", f"Season {number:02}", f"S{number:02}", f"第{number}季"]) def episode_text(value: str, rng: random.Random) -> str: number = first_ascii_number(value) variants = [value.strip()] if number is not None: variants.extend([f"{number:02}", f"E{number:02}", f"EP{number:02}"]) return rng.choice(variants) def special_text(value: str, rng: random.Random) -> str: number = first_ascii_number(value) variants = [value.strip()] if number is not None: variants.extend([f"SP{number:02}", f"Special {number:02}"]) return rng.choice(variants) def prefix_components(style: str, rng: random.Random) -> list[list[tuple[str, Optional[str]]]]: templates = PREFIX_COMPONENTS[style] selected = list(rng.choice(templates)) extra_count = rng.randint(0, 2) insert_at = max(1, len(selected) - 1) for _ in range(extra_count): selected.insert(insert_at, rng.choice(EXTRA_NOISE_DIRS)) insert_at += 1 return [[(component, None)] for component in selected] def append_meta( pieces: list[tuple[str, Optional[str]]], entities: dict[str, list[str]], rng: random.Random, ) -> None: resolution = choose_entity(entities, "RESOLUTION", rng) if resolution and rng.random() < 0.85: pieces.extend([(" [", None), (resolution, "RESOLUTION"), ("]", None)]) source_values = list(entities.get("SOURCE", [])) rng.shuffle(source_values) for source in source_values[: 2 if rng.random() < 0.35 else 1]: if source and rng.random() < 0.75: pieces.extend([("[", None), (source.strip(), "SOURCE"), ("]", None)]) def build_path_row( record: dict, source: str, rng: random.Random, styles: list[str], max_length: int, group_prefix_prob: float, basename_title_prob: float, require_group: bool, max_group_length: Optional[int], ) -> Optional[dict]: tokens = [str(token) for token in record.get("tokens", [])] labels = [str(label) for label in record.get("labels", [])] if len(tokens) != len(labels): return None entities = extract_entities(tokens, labels) title = choose_entity(entities, "TITLE", rng) if not title: return None group = choose_group(entities, rng, max_group_length) if require_group and not group: return None episode = choose_entity(entities, "EPISODE", rng) special = choose_entity(entities, "SPECIAL", rng) if not episode and not special: return None style = rng.choice(styles) separator = "\\" if style == "windows" else "/" components = prefix_components(style, rng) components.append([(title, "TITLE")]) components.append([(season_text(choose_entity(entities, "SEASON", rng), rng), "SEASON")]) endpoint_pieces: list[tuple[str, Optional[str]]] = [] if group and rng.random() < group_prefix_prob: endpoint_pieces.extend([("[", None), (group, "GROUP"), ("] ", None)]) if rng.random() < basename_title_prob: endpoint_pieces.extend([(title, None), (" - ", None)]) if episode and (not special or rng.random() >= 0.18): endpoint_pieces.append((episode_text(episode, rng), "EPISODE")) else: endpoint_pieces.append((special_text(str(special), rng), "SPECIAL")) append_meta(endpoint_pieces, entities, rng) endpoint_pieces.append((rng.choice(EXTENSIONS), None)) components.append(endpoint_pieces) text_parts: list[str] = [] char_labels: list[str] = [] first_component = True for component in components: if not first_component: text_parts.append(separator) char_labels.append("O") first_component = False for text, entity in component: if not text: continue text_parts.append(text) if entity is None: char_labels.extend(["O"] * len(text)) continue char_labels.append(f"B-{entity}") char_labels.extend([f"I-{entity}"] * (len(text) - 1)) filename = "".join(text_parts) if len(filename) + 2 > max_length: return None char_tokens = list(filename) if len(char_tokens) != len(char_labels): raise ValueError(f"token/label mismatch for generated path: {filename}") return { "filename": filename, "tokens": char_tokens, "labels": char_labels, "tokenizer_variant": "char", "source": source, "base_filename": record.get("filename"), "char_token_count": len(char_tokens), } def percentile(values: list[int], pct: float) -> int: if not values: return 0 ordered = sorted(values) index = min(len(ordered) - 1, round((pct / 100) * (len(ordered) - 1))) return ordered[index] def main() -> None: args = parse_args() if args.samples_per_source < 0: raise ValueError("--samples-per-source must be non-negative") if not 0.0 <= args.group_prefix_prob <= 1.0: raise ValueError("--group-prefix-prob must be between 0 and 1") if not 0.0 <= args.basename_title_prob <= 1.0: raise ValueError("--basename-title-prob must be between 0 and 1") if args.max_group_length is not None and args.max_group_length < 1: raise ValueError("--max-group-length must be positive") styles = [style.strip().lower() for style in args.path_styles.split(",") if style.strip()] unknown_styles = [style for style in styles if style not in PREFIX_COMPONENTS] if unknown_styles: raise ValueError(f"Unsupported path styles: {unknown_styles}") if not styles: raise ValueError("--path-styles must include at least one style") input_path = Path(args.input) output_path = Path(args.output) manifest_path = Path(args.manifest_output) if args.manifest_output else output_path.with_suffix(".manifest.json") output_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.parent.mkdir(parents=True, exist_ok=True) rng = random.Random(args.seed) source_rows = 0 eligible_rows = 0 written_rows = 0 skipped_too_long = 0 label_counts: Counter[str] = Counter() char_counter: Counter[str] = Counter() lengths: list[int] = [] examples: list[dict] = [] with output_path.open("w", encoding="utf-8", newline="\n") as out: for record in iter_jsonl(input_path): source_rows += 1 if args.limit is not None and source_rows > args.limit: break per_source_written = 0 per_source_attempts = 0 while per_source_written < args.samples_per_source and per_source_attempts < args.samples_per_source * 8 + 8: per_source_attempts += 1 row = build_path_row( record, args.source, rng, styles, args.max_length, args.group_prefix_prob, args.basename_title_prob, args.require_group, args.max_group_length, ) if row is None: skipped_too_long += 1 continue if per_source_written == 0: eligible_rows += 1 out.write(json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n") written_rows += 1 per_source_written += 1 length = int(row["char_token_count"]) lengths.append(length) char_counter.update(row["tokens"]) label_counts.update(row["labels"]) if len(examples) < 5: examples.append(row) if args.max_rows is not None and written_rows >= args.max_rows: break if args.max_rows is not None and written_rows >= args.max_rows: break if args.progress and source_rows % args.progress == 0: print(f"processed {source_rows:,} rows; wrote {written_rows:,} path rows") manifest = { "created_at": datetime.now(timezone.utc).isoformat(), "input": str(input_path), "output": str(output_path), "source": args.source, "seed": args.seed, "samples_per_source": args.samples_per_source, "max_length": args.max_length, "path_styles": styles, "group_prefix_prob": args.group_prefix_prob, "basename_title_prob": args.basename_title_prob, "require_group": args.require_group, "max_group_length": args.max_group_length, "source_rows": source_rows if args.limit is None else min(source_rows, args.limit), "eligible_rows": eligible_rows, "written_rows": written_rows, "skipped_attempts": skipped_too_long, "unique_char_count": len(char_counter), "label_counts": dict(label_counts), "char_length": { "min": min(lengths) if lengths else 0, "mean": mean(lengths) if lengths else 0, "p50": percentile(lengths, 50), "p90": percentile(lengths, 90), "p95": percentile(lengths, 95), "p99": percentile(lengths, 99), "max": max(lengths) if lengths else 0, }, "examples": examples, } manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") print(json.dumps({k: v for k, v in manifest.items() if k != "examples"}, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()