import re from typing import List TIME_RE = re.compile(r'^\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}$') def parse_srt(text: str) -> List[list]: blocks = [] for raw in text.strip().split("\n\n"): lines = raw.splitlines() if lines: blocks.append(lines) return blocks def blocks_to_srt(blocks: List[list]) -> str: return "\n\n".join("\n".join(b) for b in blocks) + "\n" def last_end_time_ms(blocks: List[list]) -> int | None: if not blocks: return None end = blocks[-1][1].split(" --> ")[1] hh, mm, ss_ms = end.split(":") ss, ms = ss_ms.split(",") return (int(hh)*3600 + int(mm)*60 + int(ss))*1000 + int(ms) def validate_srt_batch(in_blocks: List[list], out_blocks: List[list], prev_last_end: int | None = None): """ Returns (ok: bool, report_lines: list[str]) """ report, ok = [], True if len(in_blocks) != len(out_blocks): ok = False report.append(f"[STRUCT] Block count mismatch: in={len(in_blocks)} out={len(out_blocks)}") for i, (ib, ob) in enumerate(zip(in_blocks, out_blocks)): if len(ob) < 3: ok = False; report.append(f"[STRUCT] Output block too short @{i}") continue if ib[0].strip() != ob[0].strip(): ok = False; report.append(f"[INDEX] Changed @{i}: {ib[0]} → {ob[0]}") if ib[1].strip() != ob[1].strip(): ok = False; report.append(f"[TIMECODE] Changed @{i}: {ib[1]} ≠ {ob[1]}") if not TIME_RE.match(ob[1].strip()): ok = False; report.append(f"[TIMECODE] Invalid format @{i}: {ob[1]}") if len(ib) != len(ob): ok = False; report.append(f"[LINES] Line-count changed @{i}: in={len(ib)} out={len(ob)}") if prev_last_end and out_blocks: def to_ms(tc: str) -> int: hh, mm, ss_ms = tc.split(":"); ss, ms = ss_ms.split(",") return (int(hh)*3600 + int(mm)*60 + int(ss))*1000 + int(ms) if to_ms(out_blocks[0][1].split(" --> ")[0]) < prev_last_end: ok = False; report.append("[OVERLAP] First block overlaps previous batch end time.") return ok, report def split_batches(blocks: List[list], approx_blocks: int = 10): for i in range(0, len(blocks), approx_blocks): yield blocks[i:i+approx_blocks]