| | import json |
| | import re |
| | from pathlib import Path |
| | from typing import Optional, Tuple |
| | from config import OUT_ROOT, FRAME_INTERVAL_SEC, MAX_FRAMES, FRAMES_SUBDIR |
| | from video import prepare_dirs, extract_audio_ffmpeg, extract_frames |
| | from transcribe import transcribe |
| | from vision import caption_folder, dump_json |
| | from packager import write_text, write_manifest, make_zip |
| |
|
TITLE_PATTERNS = [
    # Matches phrases like "In Blade Runner (1982)": the word "In", a title
    # built from letters, digits, spaces and the characters : ' - & . and then
    # a four-digit 19xx/20xx year in parentheses. Group 1 is the title,
    # group 2 the year.
    re.compile(r"\bIn\s+([A-Za-z0-9:'\-&\. ]+)\s*\((20\d{2}|19\d{2})\)", re.IGNORECASE),
]


def infer_title_year(run_dir: Path) -> Tuple[str, str]:
    """
    Return (title, year) if explicitly detectable in explanations.json metadata
    or transcription text, else ('', '').

    Deterministic. No guessing.

    Lookup order:
      1. ``run_dir / "explanations.json"``: a top-level ``metadata`` dict whose
         ``title`` and ``year`` are both non-empty after stripping whitespace.
      2. ``run_dir / "transcription.txt"``: the first match of any pattern in
         ``TITLE_PATTERNS`` (tried in list order) with a non-empty title.

    Missing, unreadable, undecodable or malformed files are treated as
    "nothing detected" rather than raised, since inference is best-effort.
    """
    # --- 1) Explicit metadata ------------------------------------------------
    # OSError covers missing/unreadable files; ValueError covers both
    # json.JSONDecodeError and UnicodeDecodeError (both are subclasses).
    try:
        payload = json.loads(
            (run_dir / "explanations.json").read_text(encoding="utf-8")
        )
    except (OSError, ValueError):
        payload = None

    metadata = payload.get("metadata") if isinstance(payload, dict) else None
    if isinstance(metadata, dict):
        # Normalise before testing so a whitespace-only value does not count
        # as a detected title/year.
        title = str(metadata.get("title") or "").strip()
        year = str(metadata.get("year") or "").strip()
        if title and year:
            return title, year

    # --- 2) Transcript text --------------------------------------------------
    try:
        text = (run_dir / "transcription.txt").read_text(encoding="utf-8")
    except (OSError, ValueError):
        return "", ""

    for pattern in TITLE_PATTERNS:
        for match in pattern.finditer(text):
            title = match.group(1).strip()
            # The title character class admits spaces, so a match can be
            # blank once stripped; skip those instead of returning ''.
            if title:
                return title, match.group(2)

    return "", ""
| |
|
def process_video(
    video_file: Path,
    interval_sec: Optional[float] = None,
    max_frames: Optional[int] = None,
    movie_title: str = "",
    movie_year: str = "",
) -> Path:
    """
    Run the full pipeline for one video and return the result of make_zip.

    Steps, in order: prepare the run/frames directories, extract frames,
    extract audio to ``audio.wav``, transcribe it into ``transcription.txt``,
    caption the frames folder into ``explanations.json``, write the manifest,
    and zip the run directory.

    Args:
        video_file: Video to process.
        interval_sec: Frame interval passed to extract_frames;
            FRAME_INTERVAL_SEC is used when None.
        max_frames: Frame cap passed to extract_frames; MAX_FRAMES is used
            when None.
        movie_title: Title recorded under "movie" in the manifest. When empty,
            infer_title_year is consulted.
        movie_year: Year recorded under "year" in the manifest. When empty,
            infer_title_year is consulted.

    Returns:
        Whatever make_zip(run_dir) returns (annotated as a Path).
    """
    # Config defaults apply only when the caller passed None explicitly.
    if interval_sec is None:
        step = FRAME_INTERVAL_SEC
    else:
        step = interval_sec
    frame_cap = MAX_FRAMES if max_frames is None else max_frames

    run_dir, frames_dir = prepare_dirs(OUT_ROOT, video_file)

    # Frames
    frame_count = extract_frames(video_file, frames_dir, step, frame_cap)

    # Audio -> transcript
    audio_path = run_dir / "audio.wav"
    extract_audio_ffmpeg(video_file, audio_path)
    write_text(transcribe(audio_path), run_dir / "transcription.txt")

    # Frame captions
    dump_json(caption_folder(frames_dir), run_dir / "explanations.json")

    # Fill in whichever of title/year the caller left empty; values the
    # caller supplied always win over inferred ones.
    if not (movie_title and movie_year):
        inferred_title, inferred_year = infer_title_year(run_dir)
        if not movie_title:
            movie_title = inferred_title
        if not movie_year:
            movie_year = inferred_year

    manifest = {
        "video": str(video_file.name),
        "frames_dir": FRAMES_SUBDIR,
        "num_frames": frame_count,
        "files": [
            "transcription.txt",
            "explanations.json",
            "audio.wav",
        ],
        "interval_sec": float(step),
    }

    # Optional keys are written only when a non-empty value is known.
    for key, value in (("movie", movie_title), ("year", movie_year)):
        if value:
            manifest[key] = value

    write_manifest(run_dir, manifest)

    # Package the run directory
    return make_zip(run_dir)
| |
|
if __name__ == "__main__":
    # Command-line entry point: parse the options, run the pipeline once and
    # print the value returned by process_video.
    import argparse

    parser = argparse.ArgumentParser()
    # Declared as (flag, keyword-options) pairs; registration order is kept.
    for flag, options in (
        ("--video", {"required": True, "type": Path}),
        ("--interval", {"type": float, "default": FRAME_INTERVAL_SEC}),
        ("--max_frames", {"type": int, "default": MAX_FRAMES}),
        ("--movie_title", {"type": str, "default": ""}),
        ("--movie_year", {"type": str, "default": ""}),
    ):
        parser.add_argument(flag, **options)
    cli = parser.parse_args()

    archive = process_video(
        cli.video,
        interval_sec=cli.interval,
        max_frames=cli.max_frames,
        movie_title=cli.movie_title,
        movie_year=cli.movie_year,
    )
    print(archive)
| |
|