import json
import re
from pathlib import Path
from typing import Optional, Tuple
from config import OUT_ROOT, FRAME_INTERVAL_SEC, MAX_FRAMES, FRAMES_SUBDIR
from video import prepare_dirs, extract_audio_ffmpeg, extract_frames
from transcribe import transcribe
from vision import caption_folder, dump_json
from packager import write_text, write_manifest, make_zip
# Compiled regexes that pick an explicit "In <Title> (<Year>)" mention out of
# free text. Group 1 is the title (a space is part of its character class, so
# the capture can carry trailing whitespace); group 2 is a four-digit year
# beginning with 19 or 20. Every pattern is compiled case-insensitively.
TITLE_PATTERNS = [
    re.compile(source, re.IGNORECASE)
    for source in (
        r"\bIn\s+([A-Za-z0-9:'\-&\. ]+)\s*\((20\d{2}|19\d{2})\)",
    )
]
def infer_title_year(run_dir: Path) -> Tuple[str, str]:
    """
    Return (title, year) if explicitly detectable in a run directory, else ('', '').

    Two sources are consulted, in this order:

    1. ``explanations.json`` -- a top-level ``metadata`` object that carries
       both ``title`` and ``year``.
    2. ``transcription.txt`` -- the first match of each regex in
       ``TITLE_PATTERNS``, tried in list order.

    Deterministic. No guessing: a source only counts when it yields both a
    non-empty title and a non-empty year once surrounding whitespace has been
    stripped. Missing, unreadable or malformed files are skipped instead of
    raising.

    Args:
        run_dir: Directory expected to hold the two files named above.

    Returns:
        A ``(title, year)`` tuple of stripped strings, or ``("", "")`` when
        neither source provides both values.
    """
    # Source 1: explanations.json -> metadata.title / metadata.year
    try:
        payload = json.loads(
            (run_dir / "explanations.json").read_text(encoding="utf-8")
        )
    except (OSError, ValueError):
        # OSError: file missing or unreadable.
        # ValueError: undecodable bytes or invalid JSON (both subclass it).
        payload = None

    metadata = payload.get("metadata") if isinstance(payload, dict) else None
    if isinstance(metadata, dict):
        raw_title = metadata.get("title")
        raw_year = metadata.get("year")
        if raw_title and raw_year:
            title = str(raw_title).strip()
            year = str(raw_year).strip()
            # Re-check after stripping so a whitespace-only value cannot
            # produce a half-empty result such as ("", "1999").
            if title and year:
                return title, year

    # Source 2: transcription.txt matched against the known title patterns
    try:
        text = (run_dir / "transcription.txt").read_text(encoding="utf-8")
    except (OSError, ValueError):
        return "", ""

    for pattern in TITLE_PATTERNS:
        match = pattern.search(text)
        if match is None:
            continue
        title = match.group(1).strip()
        # The title character class admits spaces, so a match may be blank.
        if title:
            return title, match.group(2)

    return "", ""
def process_video(
    video_file: Path,
    interval_sec: Optional[float] = None,
    max_frames: Optional[int] = None,
    movie_title: str = "",
    movie_year: str = "",
) -> Path:
    """
    Run the whole pipeline for one video and return the result of make_zip.

    Steps, in order: prepare the run and frames directories, extract frames,
    extract the audio track and write its transcript to transcription.txt,
    caption the frames folder into explanations.json, write the manifest, and
    finally zip the run directory.

    Args:
        video_file: Video to process.
        interval_sec: Interval passed to extract_frames; FRAME_INTERVAL_SEC
            is used when this is None.
        max_frames: Frame cap passed to extract_frames; MAX_FRAMES is used
            when this is None.
        movie_title: Title recorded under "movie" in the manifest. When empty,
            the value from infer_title_year is used instead (if any).
        movie_year: Year recorded under "year" in the manifest. When empty,
            the value from infer_title_year is used instead (if any).

    Returns:
        The value returned by make_zip(run_dir).
    """
    step = FRAME_INTERVAL_SEC if interval_sec is None else interval_sec
    frame_limit = MAX_FRAMES if max_frames is None else max_frames
    run_dir, frames_dir = prepare_dirs(OUT_ROOT, video_file)

    # 1) Frames
    frame_count = extract_frames(video_file, frames_dir, step, frame_limit)

    # 2) Audio + ASR
    audio_file = run_dir / "audio.wav"
    extract_audio_ffmpeg(video_file, audio_file)
    write_text(transcribe(audio_file), run_dir / "transcription.txt")

    # 3) Vision captions per frame
    dump_json(caption_folder(frames_dir), run_dir / "explanations.json")

    # 4) Manifest; caller-supplied metadata wins, inference only fills gaps
    if not (movie_title and movie_year):
        found_title, found_year = infer_title_year(run_dir)
        if not movie_title:
            movie_title = found_title
        if not movie_year:
            movie_year = found_year

    manifest = {
        "video": str(video_file.name),
        "frames_dir": FRAMES_SUBDIR,
        "num_frames": frame_count,
        "files": [
            "transcription.txt",
            "explanations.json",
            "audio.wav",
        ],
        "interval_sec": float(step),
    }
    # Optional keys are appended only when a non-empty value is available.
    for key, value in (("movie", movie_title), ("year", movie_year)):
        if value:
            manifest[key] = value
    write_manifest(run_dir, manifest)

    # 5) Zip
    return make_zip(run_dir)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse the options, run the pipeline once and
    # print whatever process_video returns.
    parser = argparse.ArgumentParser()
    parser.add_argument("--video", required=True, type=Path)
    # Optional flags share one shape: (flag, converter, default).
    for flag, converter, fallback in (
        ("--interval", float, FRAME_INTERVAL_SEC),
        ("--max_frames", int, MAX_FRAMES),
        ("--movie_title", str, ""),
        ("--movie_year", str, ""),
    ):
        parser.add_argument(flag, type=converter, default=fallback)
    cli = parser.parse_args()

    print(
        process_video(
            cli.video,
            interval_sec=cli.interval,
            max_frames=cli.max_frames,
            movie_title=cli.movie_title,
            movie_year=cli.movie_year,
        )
    )
|