#!/usr/bin/env python3
"""Video Processing Service.

Downloads movies and their hook-segment JSON descriptors from a Hugging Face
dataset repo, cuts each segment, colour-grades it, burns in Whisper-generated
captions, and uploads the rendered clips back to the dataset. A small FastAPI
app exposes health/status endpoints and a manual processing trigger.
"""
import os
import sys
import json
import re
import shutil
import asyncio
import functools
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from typing import Callable, List, Dict, Optional, Set, Tuple
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import uvicorn

try:
    from huggingface_hub import list_repo_files, hf_hub_download, upload_file
    import cv2
    import numpy as np
    from PIL import Image, ImageDraw, ImageFont
    from faster_whisper import WhisperModel
except ImportError as e:
    print(f"Missing dependency: {e}")
    # List every third-party package imported above, not just faster-whisper,
    # so the hint is correct whichever import failed.
    print("Install with: pip install huggingface_hub opencv-python numpy pillow faster-whisper")
    sys.exit(1)

# Load environment variables
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: Missing HF_TOKEN in .env")
    sys.exit(1)


# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it finishes.
_background_tasks: Set[asyncio.Task] = set()


def _spawn_background(coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def _run_blocking(func: Callable, *args, **kwargs):
    """Run a blocking call in the default thread pool and await its result.

    Keeps the event loop (and therefore the HTTP endpoints) responsive while
    downloads, uploads and frame processing run.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load Whisper in a worker thread before serving, then start processing.

    The model load is awaited (startup completes only once it is loaded);
    the movie scan itself runs as a background task.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _load_whisper_model)
    _spawn_background(scan_and_process_videos())
    yield


app = FastAPI(title="Video Processing Service", lifespan=lifespan)

# Global state (mutated only from the event-loop thread)
processing_state = {
    "is_running": False,
    "total_processed": 0,
    "current_file": None,
    "error_count": 0,
    "last_error": None,
    "processed_files": [],
    "whisper_ready": False
}

# Whisper model — loaded at startup by lifespan(), not at import time
whisper_model = None

HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"
READY_VIDEOS_FOLDER = "ready_videos"
TRANSCRIPTION_FOLDER = "transcriptions"


def _load_whisper_model():
    """Blocking model load — runs in thread executor."""
    global whisper_model
    print("Loading Whisper small model...")
    whisper_model = WhisperModel("small", device="auto", compute_type="int8")
    processing_state["whisper_ready"] = True
    print("✓ Whisper model loaded")


def timestamp_to_seconds(timestamp: str) -> float:
    """Convert an ``[HH:]MM:SS[.fff]`` timestamp to seconds.

    ``HH:MM:SS`` is the format the segment JSON files use; the shorter
    ``MM:SS`` / ``SS`` forms and fractional seconds are also accepted.
    Returns 0.0 (and logs the problem) if the value cannot be parsed.
    """
    try:
        parts = [float(part) for part in str(timestamp).strip().split(":")]
        if not 1 <= len(parts) <= 3:
            raise ValueError("expected [HH:]MM:SS")
        seconds = 0.0
        for part in parts:
            seconds = seconds * 60 + part
        return seconds
    except Exception as e:
        print(f"Error converting timestamp {timestamp}: {e}")
        return 0.0


def extract_audio_segment(video_path: str, start_seconds: float, end_seconds: float, output_wav: str) -> bool:
    """Extract audio segment from video as 16 kHz mono PCM WAV for Whisper.

    Returns True if FFmpeg succeeded and ``output_wav`` exists.
    """
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(start_seconds),
        "-to", str(end_seconds),
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        output_wav
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f" ✗ FFmpeg audio extraction failed: {result.stderr}")
        return False
    if not os.path.exists(output_wav):
        print(f" ✗ Output WAV file not created: {output_wav}")
        return False
    print(" ✓ Audio extracted successfully")
    return True


def transcribe_segment(audio_path: str) -> List[Tuple[float, float, str]]:
    """
    Transcribe audio with Whisper small.
    Returns list of (start_sec, end_sec, text) relative to segment start.
    Segments whose text is empty after stripping are dropped.
    """
    print(" Transcribing audio with Whisper small...")
    segments, info = whisper_model.transcribe(
        audio_path,
        beam_size=5,
        language=None,
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=500)
    )
    captions = []
    for seg in segments:
        text = seg.text.strip()
        if text:
            captions.append((seg.start, seg.end, text))
            print(f" [{seg.start:.1f}s → {seg.end:.1f}s] {text}")
    print(f" ✓ Transcribed {len(captions)} caption segments")
    return captions


@functools.lru_cache(maxsize=8)
def _vignette_mask(rows: int, cols: int) -> np.ndarray:
    """Return a cached (rows, cols) vignette mask, 1.0 at the centre.

    The mask only depends on the frame size, so it is computed once per
    resolution instead of once per frame. It is shared between calls and
    therefore marked read-only.
    """
    x_kernel = cv2.getGaussianKernel(cols, cols / 2)
    y_kernel = cv2.getGaussianKernel(rows, rows / 2)
    mask = y_kernel * x_kernel.T
    mask = (mask / mask.max()) ** 0.4
    mask.setflags(write=False)
    return mask


def apply_color_grading_wedding_retro(frame: np.ndarray) -> np.ndarray:
    """Apply cinematic wedding LUT + retro style with high sharpening.

    Expects a 3-channel BGR uint8 frame; returns a new BGR uint8 frame.
    Steps: warm LAB shift + CLAHE, saturation boost, contrast/brightness lift,
    sharpening blend, and a radial vignette.
    """
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)
    a_channel = cv2.add(a_channel, 5)
    b_channel = cv2.add(b_channel, 8)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    l_channel = clahe.apply(l_channel)
    lab_enhanced = cv2.merge([l_channel, a_channel, b_channel])
    frame = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)

    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * 1.3, 0, 255)
    frame = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)

    frame = cv2.convertScaleAbs(frame, alpha=1.15, beta=10)

    kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) / 1.2
    sharpened = cv2.filter2D(frame, -1, kernel)
    frame = cv2.addWeighted(frame, 0.4, sharpened, 0.6, 0)

    rows, cols = frame.shape[:2]
    mask = _vignette_mask(rows, cols)
    # astype(uint8) truncates exactly like the previous per-channel
    # assignment into the uint8 array did; mask <= 1 keeps values in range.
    frame = (frame * mask[:, :, np.newaxis]).astype(np.uint8)

    return np.clip(frame, 0, 255).astype(np.uint8)


@functools.lru_cache(maxsize=16)
def _load_caption_font(font_size: int):
    """Load (and cache) the caption font, falling back to PIL's default font."""
    try:
        return ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
    except Exception:
        return ImageFont.load_default()


def burn_captions_to_frame(frame: np.ndarray, text: str, font_size: int = 36) -> np.ndarray:
    """Burn caption text onto frame — shadow only, no background, positioned near bottom.

    Text is word-wrapped to the frame width minus an 80 px margin, centred
    horizontally, and vertically centred around 80% of the frame height.
    Returns a new BGR frame.
    """
    height, width = frame.shape[:2]
    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert('RGBA')
    overlay = Image.new('RGBA', frame_pil.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)

    # Cached: loading the TTF from disk on every captioned frame is wasted I/O.
    font = _load_caption_font(font_size)

    max_width = width - 80
    wrapped_lines = []
    words = text.split()
    current_line = []
    for word in words:
        test_line = ' '.join(current_line + [word])
        bbox = draw.textbbox((0, 0), test_line, font=font)
        if bbox[2] - bbox[0] > max_width:
            if current_line:
                wrapped_lines.append(' '.join(current_line))
            current_line = [word]
        else:
            current_line.append(word)
    if current_line:
        wrapped_lines.append(' '.join(current_line))

    line_height = font_size + 12
    total_text_height = len(wrapped_lines) * line_height
    y_start = int(height * 0.80) - total_text_height // 2
    shadow_offset = 3

    for i, line in enumerate(wrapped_lines):
        bbox = draw.textbbox((0, 0), line, font=font)
        line_width = bbox[2] - bbox[0]
        x = (width - line_width) // 2
        y = y_start + i * line_height
        draw.text((x + shadow_offset, y + shadow_offset), line, font=font, fill=(0, 0, 0, 200))
        draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))

    frame_pil = Image.alpha_composite(frame_pil, overlay).convert('RGB')
    return cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)


def build_frame_caption_map(captions: List[Tuple[float, float, str]], fps: float) -> Dict[int, str]:
    """Convert Whisper segments into a per-frame caption lookup.

    Frame indices are relative to the segment start (same origin as the
    caption timestamps). End frames are inclusive; where captions overlap,
    the later caption in the list wins.
    """
    frame_map = {}
    for start_sec, end_sec, text in captions:
        start_frame = int(start_sec * fps)
        end_frame = int(end_sec * fps)
        for f in range(start_frame, end_frame + 1):
            frame_map[f] = text
    return frame_map


def _center_crop_slices(src_width: int, src_height: int,
                        target_width: int, target_height: int) -> Tuple[slice, slice]:
    """Return (row_slice, col_slice) that centre-crops a frame to the target aspect ratio."""
    aspect_ratio = target_width / target_height
    if src_width / src_height > aspect_ratio:
        # Source is wider than the target: trim left/right.
        new_width = int(src_height * aspect_ratio)
        x_offset = (src_width - new_width) // 2
        return slice(None), slice(x_offset, x_offset + new_width)
    # Source is taller (or equal): trim top/bottom.
    new_height = int(src_width / aspect_ratio)
    y_offset = (src_height - new_height) // 2
    return slice(y_offset, y_offset + new_height), slice(None)


def process_video_segment(
    video_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    target_width: int = 1080,
    target_height: int = 1350
) -> bool:
    """
    Cut, grade, caption and re-encode one segment of ``video_path``.

    Full pipeline:
    1. Extract audio segment → WAV
    2. Transcribe with Whisper small
    3. Process frames (centre-crop, resize, colour grading, caption burn-in)
       and pipe them to FFmpeg
    4. Mux processed video with original audio for the same time range

    Args:
        video_path: Local path of the source video.
        output_path: Destination MP4. Intermediate files are written next to
            it (``<base>_audio.wav`` / ``<base>_noaudio.mp4``) and removed.
        start_time / end_time: Segment bounds as ``HH:MM:SS`` strings.
        target_width / target_height: Output frame size in pixels.

    Returns:
        True if ``output_path`` was produced; False on any failure (the
        reason is printed).
    """
    ffmpeg_video_proc = None
    cap = None  # Declared here so finally block can always release it
    # splitext rather than str.replace(".mp4", ...): a path without ".mp4"
    # (or with it elsewhere) must never yield a temp path equal to
    # output_path, which the finally block would then delete.
    base_path = os.path.splitext(output_path)[0]
    temp_wav = base_path + "_audio.wav"
    temp_video_path = base_path + "_noaudio.mp4"

    try:
        print(f"Opening video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        start_seconds = timestamp_to_seconds(start_time)
        end_seconds = timestamp_to_seconds(end_time)
        duration = end_seconds - start_seconds

        print(f"Video info: {fps} fps, {original_width}x{original_height}")
        print(f"Extracting segment: {start_time} to {end_time} ({duration:.1f}s)")

        # Guard against metadata that would otherwise cause division by zero
        # in the crop maths or an empty FFmpeg input.
        if fps <= 0 or original_width <= 0 or original_height <= 0:
            print(f"Error: Invalid video metadata for {video_path}")
            return False
        if duration <= 0:
            print(f"Error: Empty or inverted segment {start_time} → {end_time}")
            return False

        # ── Step 1: Extract audio → WAV ───────────────────────────────────────
        print(" Extracting audio segment...")
        audio_ok = extract_audio_segment(video_path, start_seconds, end_seconds, temp_wav)

        # ── Step 2: Transcribe with Whisper ───────────────────────────────────
        if audio_ok and whisper_model is not None:
            captions = transcribe_segment(temp_wav)
        else:
            if not audio_ok:
                print(" ✗ Skipping transcription: audio extraction failed")
            elif whisper_model is None:
                print(" ✗ Skipping transcription: Whisper model not ready")
            captions = []

        frame_caption_map = build_frame_caption_map(captions, fps)

        # ── Step 3: Process frames → pipe to FFmpeg ───────────────────────────
        # The crop depends only on the source/target geometry: compute once.
        row_slice, col_slice = _center_crop_slices(
            original_width, original_height, target_width, target_height
        )

        ffmpeg_video_cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{target_width}x{target_height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "pipe:0",
            "-vcodec", "libx264",
            "-preset", "fast",
            "-crf", "23",
            "-pix_fmt", "yuv420p",
            temp_video_path
        ]
        ffmpeg_video_proc = subprocess.Popen(
            ffmpeg_video_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        start_frame = int(start_seconds * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        processed_frames = 0
        target_frames = int(duration * fps)
        progress_step = max(1, target_frames // 10)
        print(f"Processing {target_frames} frames...")

        while processed_frames < target_frames:
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame at position {processed_frames}")
                break

            frame = frame[row_slice, col_slice]
            frame = cv2.resize(frame, (target_width, target_height), interpolation=cv2.INTER_LANCZOS4)
            frame = apply_color_grading_wedding_retro(frame)

            # Caption for this frame (frame index and caption times are both
            # relative to the segment start); empty if none.
            current_caption = frame_caption_map.get(processed_frames, "")
            if current_caption:
                frame = burn_captions_to_frame(frame, current_caption)

            ffmpeg_video_proc.stdin.write(frame.tobytes())
            processed_frames += 1

            if processed_frames % progress_step == 0:
                progress = (processed_frames / target_frames) * 100
                print(f"Progress: {progress:.1f}%")

        # Close stdin and wait for FFmpeg to finish encoding
        ffmpeg_video_proc.stdin.close()
        ffmpeg_video_proc.wait()

        if ffmpeg_video_proc.returncode != 0:
            print(f"✗ FFmpeg video encoding failed (code {ffmpeg_video_proc.returncode})")
            return False

        print("✓ Frames encoded, muxing audio...")

        # ── Step 4: Mux processed video + original audio ──────────────────────
        ffmpeg_mux_cmd = [
            "ffmpeg", "-y",
            "-i", temp_video_path,
            "-ss", str(start_seconds),
            "-to", str(end_seconds),
            "-i", video_path,
            "-map", "0:v:0",
            # Trailing "?" makes the audio map optional, so sources without an
            # audio stream still produce a (silent) segment instead of failing.
            "-map", "1:a:0?",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-shortest",
            "-movflags", "+faststart",
            output_path
        ]
        mux_result = subprocess.run(
            ffmpeg_mux_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        if mux_result.returncode != 0:
            print(f"✗ FFmpeg audio mux failed (code {mux_result.returncode})")
            return False

        print(f"✓ Segment complete: {output_path}")
        return True

    except Exception as e:
        print(f"✗ Error processing video segment: {e}")
        if ffmpeg_video_proc is not None:
            try:
                ffmpeg_video_proc.stdin.close()
            except Exception:
                pass  # stdin may already be closed or the pipe broken
            ffmpeg_video_proc.wait()
        return False

    finally:
        # Always release VideoCapture regardless of success or failure
        if cap is not None:
            cap.release()
        # Always clean up temp files (best effort)
        for tmp in (temp_video_path, temp_wav):
            if tmp and os.path.exists(tmp):
                try:
                    os.remove(tmp)
                except OSError as cleanup_err:
                    print(f"Warning: could not remove temp file {tmp}: {cleanup_err}")


async def process_movie_segments(movie_name: str) -> bool:
    """Process and upload every hook segment defined for one movie.

    Downloads ``{movie_name}.mkv`` and every ``hooks/{movie_name}/...json``
    descriptor from the dataset repo, renders each segment with
    ``process_video_segment`` and uploads it to
    ``ready_videos/{movie_name}/segment-NN.mp4``. A failing segment is
    logged, counted in ``error_count`` and skipped.

    All blocking work (hub downloads/uploads, frame processing) runs in a
    worker thread so the event loop — and the HTTP endpoints — stay
    responsive during processing.

    Returns:
        False if the video could not be downloaded, no segments exist, or an
        unexpected error occurred; True otherwise (even if some individual
        segments failed — see the printed summary).
    """
    try:
        processing_state["current_file"] = movie_name
        print(f"\n{'='*80}")
        print(f"Processing movie: {movie_name}")
        print(f"{'='*80}")

        video_file = f"{movie_name}.mkv"
        print(f"Downloading video: {video_file}")
        try:
            video_path = await _run_blocking(
                hf_hub_download,
                repo_id=HF_DATASET_REPO,
                filename=video_file,
                repo_type="dataset",
                token=HF_TOKEN,
                cache_dir="/tmp/video_processor_cache"
            )
            # The hub cache exposes snapshots as symlinks into the blob store;
            # resolve so OpenCV/FFmpeg get the real file.
            if os.path.islink(video_path):
                video_path = os.path.realpath(video_path)
        except Exception as e:
            print(f"Error: Could not download video: {e}")
            return False

        hooks_folder = f"{HOOKS_FOLDER}/{movie_name}"
        print(f"Listing segments from: {hooks_folder}")
        files = await _run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )
        segment_files = sorted(
            f for f in files
            if f.startswith(f"{hooks_folder}/") and f.endswith(".json")
        )

        if not segment_files:
            print(f"No segment JSON files found for {movie_name}")
            return False

        print(f"Found {len(segment_files)} segments: {segment_files}")

        failed_segments = 0
        temp_dir = tempfile.mkdtemp()
        try:
            for segment_file in segment_files:
                print(f"\n── Processing file: {segment_file}")
                try:
                    segment_path = await _run_blocking(
                        hf_hub_download,
                        repo_id=HF_DATASET_REPO,
                        filename=segment_file,
                        repo_type="dataset",
                        token=HF_TOKEN,
                        cache_dir="/tmp/video_processor_cache"
                    )
                    with open(segment_path, 'r', encoding='utf-8') as f:
                        segment_data = json.load(f)

                    # int() so a numeric string such as "3" still formats
                    # with :02d below.
                    segment_number = int(segment_data.get("segment_number", 1))
                    start_time = segment_data.get("start_time", "00:00:00")
                    end_time = segment_data.get("end_time", "00:10:00")

                    print(f"Processing segment {segment_number}: {start_time} to {end_time}")

                    output_filename = f"segment-{segment_number:02d}.mp4"
                    output_path = os.path.join(temp_dir, output_filename)

                    success = await _run_blocking(
                        process_video_segment, video_path, output_path, start_time, end_time
                    )

                    if not success:
                        print(f"✗ Failed to process segment {segment_number}, continuing to next...")
                        processing_state["error_count"] += 1
                        failed_segments += 1
                        continue

                    upload_path = f"{READY_VIDEOS_FOLDER}/{movie_name}/{output_filename}"
                    print(f"Uploading to: {upload_path}")
                    await _run_blocking(
                        upload_file,
                        path_or_fileobj=output_path,
                        path_in_repo=upload_path,
                        repo_id=HF_DATASET_REPO,
                        repo_type="dataset",
                        token=HF_TOKEN,
                        commit_message=f"Add processed video segment {segment_number} for {movie_name}"
                    )
                    print(f"✓ Segment {segment_number} uploaded successfully")

                    # Free disk space right after a successful upload
                    if os.path.exists(output_path):
                        try:
                            os.remove(output_path)
                        except OSError:
                            pass  # temp_dir is removed wholesale below anyway

                except Exception as e:
                    print(f"✗ Error processing segment file {segment_file}: {e}")
                    processing_state["error_count"] += 1
                    processing_state["last_error"] = str(e)
                    failed_segments += 1
                    continue
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        processing_state["processed_files"].append(movie_name)
        processing_state["total_processed"] += 1
        if failed_segments:
            print(f"\n⚠ Processed {movie_name} with {failed_segments}/{len(segment_files)} segment(s) failed")
        else:
            print(f"\n✓ Successfully processed all segments for {movie_name}")
        return True

    except Exception as e:
        processing_state["error_count"] += 1
        processing_state["last_error"] = str(e)
        print(f"✗ Error in process_movie_segments: {e}")
        return False


async def scan_and_process_videos():
    """Scan hooks folder and process all movies, one at a time.

    A movie is any ``hooks/<movie>/`` directory containing at least one
    ``.json`` file. Only one scan runs at a time; concurrent calls return
    immediately.
    """
    if processing_state["is_running"]:
        print("Video processing already running, skipping...")
        return

    # Claim the run *before* the startup delay. Setting the flag only after
    # the sleep let a /trigger-processing request arriving during the delay
    # pass the check above and start a second, concurrent scan.
    processing_state["is_running"] = True
    try:
        startup_delay = int(os.getenv("STARTUP_DELAY", 5))
        print(f"Waiting {startup_delay} seconds before starting video processing...")
        await asyncio.sleep(startup_delay)

        print("\n" + "="*80)
        print("STARTING VIDEO PROCESSING SERVICE")
        print("="*80)

        files = await _run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )

        movie_folders = set()
        for f in files:
            if f.startswith(f"{HOOKS_FOLDER}/") and f.endswith(".json"):
                parts = f.split("/")
                # Require hooks/<movie>/<segment>.json; a JSON file sitting
                # directly in hooks/ is not a movie folder.
                if len(parts) >= 3:
                    movie_folders.add(parts[1])

        print(f"Found {len(movie_folders)} movies to process: {sorted(movie_folders)}")

        for movie_name in sorted(movie_folders):
            await process_movie_segments(movie_name)
            await asyncio.sleep(2)

        print("\n" + "="*80)
        print("VIDEO PROCESSING COMPLETE")
        print(f"Processed: {processing_state['total_processed']}")
        print(f"Errors: {processing_state['error_count']}")
        print("="*80 + "\n")

    except Exception as e:
        print(f"Critical error in scan_and_process_videos: {e}")
        processing_state["last_error"] = str(e)
    finally:
        processing_state["is_running"] = False
        processing_state["current_file"] = None


def _state_snapshot(running_key: str) -> Dict[str, object]:
    """Return the processing-state fields shared by the health/status endpoints.

    ``running_key`` is the name under which ``is_running`` is reported
    ("is_processing" for ``/``, "is_running" for ``/status``).
    """
    return {
        "whisper_ready": processing_state["whisper_ready"],
        running_key: processing_state["is_running"],
        "total_processed": processing_state["total_processed"],
        "error_count": processing_state["error_count"],
        "current_file": processing_state["current_file"],
        "last_error": processing_state["last_error"],
        "processed_files": list(processing_state["processed_files"]),
    }


@app.get("/")
async def health():
    """Health check: service identity plus current processing state."""
    return JSONResponse({
        "status": "running",
        "service": "Video Processing Service",
        **_state_snapshot("is_processing"),
    })


@app.get("/status")
async def get_status():
    """Current processing state."""
    return JSONResponse(_state_snapshot("is_running"))


@app.post("/trigger-processing")
async def trigger_processing():
    """Start a new scan unless one is running or Whisper is not loaded yet."""
    if processing_state["is_running"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Video processing is already in progress"
        })
    if not processing_state["whisper_ready"]:
        return JSONResponse({
            "status": "not_ready",
            "message": "Whisper model is still loading, try again shortly"
        })
    _spawn_background(scan_and_process_videos())
    return JSONResponse({
        "status": "started",
        "message": "Video processing scan started"
    })


if __name__ == "__main__":
    print("Starting Video Processing Service on port 7860...")
    print("Whisper will load at startup, processing begins after startup delay")
    uvicorn.run(app, host="0.0.0.0", port=7860)