#!/usr/bin/env python3
"""Video Processing Service.

Downloads movies, hook-segment definitions and transcripts from a Hugging Face
dataset repo, renders each segment as a cropped, colour-graded, captioned
H.264 clip (via FFmpeg/libx264) and uploads the results back to the repo.
A small FastAPI app exposes health/status endpoints and a manual trigger.
"""
import os
import sys
import json
import re
import shutil
import asyncio
import functools
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import uvicorn

try:
    from huggingface_hub import list_repo_files, hf_hub_download, upload_file
    import cv2
    import numpy as np
    from PIL import Image, ImageDraw, ImageFont
except ImportError as e:
    print(f"Missing dependency: {e}")
    sys.exit(1)

# Load environment variables
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: Missing HF_TOKEN in .env")
    sys.exit(1)

app = FastAPI(title="Video Processing Service")

# Global state. Mutated only from coroutines running on the event loop;
# worker threads (see _run_blocking) never touch it.
processing_state = {
    "is_running": False,
    "total_processed": 0,
    "current_file": None,
    "error_count": 0,
    "last_error": None,
    "processed_files": []
}

HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"
READY_VIDEOS_FOLDER = "ready_videos"
TRANSCRIPTION_FOLDER = "transcriptions"
CACHE_DIR = "/tmp/video_processor_cache"
STARTUP_DELAY_SECONDS = 180
CAPTION_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

# Transcript lines look like "[HH:MM:SS] caption text".
_TRANSCRIPT_LINE_RE = re.compile(r'\[(\d{2}):(\d{2}):(\d{2})\]\s+(.*)')

# 3x3 sharpening kernel. Its coefficients sum to 1/1.2 (~0.83), so the
# sharpened layer is also slightly darkened before it is blended back in.
_SHARPEN_KERNEL = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) / 1.2

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task can be garbage-collected mid-run.
_background_tasks = set()


def _spawn_background(coro) -> "asyncio.Task":
    """Schedule ``coro`` as a task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default thread pool.

    Keeps the event loop (and therefore the HTTP endpoints) responsive while
    network I/O or CPU-heavy rendering is in progress.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def timestamp_to_seconds(timestamp: str) -> float:
    """Convert a timestamp to seconds.

    Accepts ``HH:MM:SS``, ``MM:SS`` or ``SS``. The last field may carry a
    fractional part (``.`` or SRT-style ``,`` separator), e.g. ``00:01:02.5``.

    Returns:
        The timestamp in seconds, or ``0.0`` (after printing an error) if the
        value cannot be parsed.
    """
    try:
        fields = str(timestamp).strip().replace(",", ".").split(":")
        if not 1 <= len(fields) <= 3:
            raise ValueError("expected HH:MM:SS, MM:SS or SS")
        total = 0.0
        for field in fields:
            total = total * 60 + float(field)
        return total
    except (ValueError, TypeError) as e:
        print(f"Error converting timestamp {timestamp}: {e}")
        return 0.0


def extract_captions_for_segment(transcript_content: str, start_time: str, end_time: str) -> List[tuple]:
    """Extract captions from transcript that fall within segment timeframe.

    Only lines of the form ``[HH:MM:SS] text`` are considered; the bounds are
    inclusive on both ends.

    Returns list of (relative_seconds, text) tuples, where relative_seconds is
    measured from ``start_time``.
    """
    start_seconds = timestamp_to_seconds(start_time)
    end_seconds = timestamp_to_seconds(end_time)

    captions = []
    for line in transcript_content.strip().splitlines():
        match = _TRANSCRIPT_LINE_RE.match(line)
        if not match:
            continue
        h, m, s, text = match.groups()
        line_seconds = int(h) * 3600 + int(m) * 60 + int(s)
        if start_seconds <= line_seconds <= end_seconds:
            captions.append((line_seconds - start_seconds, text.strip()))
    return captions


@functools.lru_cache(maxsize=1)
def _clahe():
    """Return the shared CLAHE operator used for the luminance channel."""
    return cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))


@functools.lru_cache(maxsize=8)
def _vignette_mask(rows: int, cols: int) -> np.ndarray:
    """Build (once per frame size) the (rows, cols) vignette gain mask, peak 1.0."""
    x_kernel = cv2.getGaussianKernel(cols, cols / 2)
    y_kernel = cv2.getGaussianKernel(rows, rows / 2)
    mask = y_kernel * x_kernel.T
    mask = (mask / mask.max()) ** 0.4
    mask.setflags(write=False)  # cached and shared between calls
    return mask


def apply_color_grading_wedding_retro(frame: np.ndarray) -> np.ndarray:
    """Apply cinematic wedding LUT + retro style with high sharpening.

    Args:
        frame: BGR uint8 image.

    Returns:
        A new BGR uint8 image. The input array is not modified.
    """
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)

    # 1. VINTAGE/RETRO EFFECT: warm tones (shift a*/b* towards red/yellow)
    a_channel = cv2.add(a_channel, 5)
    b_channel = cv2.add(b_channel, 8)

    # 2. WEDDING LOOK: soft highlights via CLAHE on luminance
    l_channel = _clahe().apply(l_channel)
    frame = cv2.cvtColor(cv2.merge([l_channel, a_channel, b_channel]), cv2.COLOR_LAB2BGR)

    # 3. SATURATION BOOST
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * 1.3, 0, 255)
    frame = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)

    # 4. CONTRAST ENHANCEMENT
    frame = cv2.convertScaleAbs(frame, alpha=1.15, beta=10)

    # 5. HIGH SHARPENING
    sharpened = cv2.filter2D(frame, -1, _SHARPEN_KERNEL)
    frame = cv2.addWeighted(frame, 0.4, sharpened, 0.6, 0)

    # 6. SLIGHT VIGNETTE (mask <= 1, so the product stays within uint8 range;
    # astype truncates exactly like the former per-channel assignment did)
    rows, cols = frame.shape[:2]
    return (frame * _vignette_mask(rows, cols)[:, :, np.newaxis]).astype(np.uint8)


@functools.lru_cache(maxsize=16)
def _load_caption_font(font_size: int):
    """Load the caption font once per size, falling back to PIL's default font."""
    try:
        return ImageFont.truetype(CAPTION_FONT_PATH, font_size)
    except (OSError, ImportError):
        return ImageFont.load_default()


def burn_captions_to_frame(frame: np.ndarray, text: str, font_size: int = 32) -> np.ndarray:
    """Burn caption text onto frame with semi-transparent background (centered).

    Words are greedily wrapped to fit ``width - 60`` pixels. The text block is
    drawn in white over a black box with alpha 180/255, inset 20 px from the
    left and right edges.

    Args:
        frame: BGR uint8 image.
        text: Caption text.
        font_size: Font size in points; also drives line spacing.

    Returns:
        A new BGR uint8 image with the caption rendered.
    """
    height, width = frame.shape[:2]
    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(frame_pil, 'RGBA')
    font = _load_caption_font(font_size)

    # Word-wrap text
    max_width = width - 60
    wrapped_lines = []
    current_line = []
    for word in text.split():
        test_line = ' '.join(current_line + [word])
        bbox = draw.textbbox((0, 0), test_line, font=font)
        if bbox[2] - bbox[0] > max_width:
            if current_line:
                wrapped_lines.append(' '.join(current_line))
            current_line = [word]
        else:
            current_line.append(word)
    if current_line:
        wrapped_lines.append(' '.join(current_line))

    # Background box dimensions
    line_height = font_size + 10
    text_height = len(wrapped_lines) * line_height + 20
    bg_y_start = max(height // 2 - text_height // 2 - 10, 20)
    bg_y_end = min(bg_y_start + text_height, height - 20)

    overlay = Image.new('RGBA', frame_pil.size, (0, 0, 0, 0))
    overlay_draw = ImageDraw.Draw(overlay, 'RGBA')
    overlay_draw.rectangle(
        [(20, bg_y_start), (width - 20, bg_y_end)],
        fill=(0, 0, 0, 180)
    )
    frame_pil = Image.alpha_composite(frame_pil.convert('RGBA'), overlay).convert('RGB')

    draw = ImageDraw.Draw(frame_pil)
    y_position = bg_y_start + 10
    for line in wrapped_lines:
        bbox = draw.textbbox((0, 0), line, font=font)
        x_position = (width - (bbox[2] - bbox[0])) // 2
        draw.text((x_position, y_position), line, font=font, fill=(255, 255, 255, 255))
        y_position += line_height

    return cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)


def _center_crop_slices(width: int, height: int, aspect_ratio: float) -> tuple:
    """Return (row_slice, col_slice) that centre-crops a width x height frame to aspect_ratio (w/h)."""
    if width / height > aspect_ratio:
        new_width = int(height * aspect_ratio)
        x_offset = (width - new_width) // 2
        return slice(None), slice(x_offset, x_offset + new_width)
    new_height = int(width / aspect_ratio)
    y_offset = (height - new_height) // 2
    return slice(y_offset, y_offset + new_height), slice(None)


def _print_ffmpeg_log(log_file, max_bytes: int = 2000) -> None:
    """Print the tail of FFmpeg's captured stderr, if anything was written."""
    if log_file is None:
        return
    try:
        log_file.seek(0)
        tail = log_file.read()[-max_bytes:].decode("utf-8", errors="replace").strip()
    except (OSError, ValueError):
        return
    if tail:
        print(f"FFmpeg output:\n{tail}")


def _stop_ffmpeg(proc: subprocess.Popen, timeout: float = 30) -> None:
    """Close FFmpeg's stdin and wait for it to exit, killing it if it hangs."""
    try:
        proc.stdin.close()
    except OSError:
        pass  # pipe already broken (FFmpeg exited); nothing left to flush
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def process_video_segment(
    video_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    captions: List[tuple],
    target_width: int = 1080,
    target_height: int = 1350
) -> bool:
    """Process video segment: crop, resize, color grade, burn captions, encode via FFmpeg.

    Frames between ``start_time`` and ``end_time`` are centre-cropped to the
    target aspect ratio, resized, graded, captioned, and piped as raw BGR
    frames into FFmpeg (libx264, CRF 23, yuv420p).

    NOTE: only the piped video stream is encoded; the source's audio track is
    not carried into the output.

    Args:
        video_path: Local path of the source video.
        output_path: Destination MP4 path.
        start_time / end_time: Segment bounds (see ``timestamp_to_seconds``).
        captions: (relative_seconds, text) pairs. A caption stays on screen
            until the next caption replaces it.
        target_width / target_height: Output frame size in pixels.

    Returns:
        True if FFmpeg exited successfully, False on any failure (details are
        printed).
    """
    cap = None
    ffmpeg_proc = None
    ffmpeg_log = None
    try:
        print(f"Opening video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        start_seconds = timestamp_to_seconds(start_time)
        end_seconds = timestamp_to_seconds(end_time)
        duration = end_seconds - start_seconds

        print(f"Video info: {fps} fps, {original_width}x{original_height}")
        print(f"Extracting segment: {start_time} to {end_time} ({duration:.1f}s)")

        # OpenCV reports 0 (or NaN) when the container has no usable frame
        # rate; FFmpeg would reject "-r 0", so fail early with a clear reason.
        if not fps > 0:
            print(f"Error: Invalid frame rate ({fps}) reported for {video_path}")
            return False
        if original_width <= 0 or original_height <= 0:
            print(f"Error: Invalid frame size {original_width}x{original_height} for {video_path}")
            return False
        target_frames = int(duration * fps)
        if target_frames <= 0:
            print(f"Error: Segment {start_time} to {end_time} contains no frames")
            return False

        # The crop window depends only on the source size, so compute it once.
        crop = _center_crop_slices(original_width, original_height, target_width / target_height)

        # Pipe frames into FFmpeg — proper H.264 with real compression
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{target_width}x{target_height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "pipe:0",
            "-vcodec", "libx264",
            "-preset", "fast",
            "-crf", "23",              # 0=lossless, 51=worst; 23 is a solid default
            "-pix_fmt", "yuv420p",     # broad playback compatibility
            "-movflags", "+faststart",
            output_path
        ]
        # stderr goes to a temp file (not a pipe) so it can never fill up and
        # deadlock our stdin writes, yet is still available for diagnostics.
        ffmpeg_log = tempfile.TemporaryFile()
        ffmpeg_proc = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=ffmpeg_log
        )

        # Seek to start frame
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(start_seconds * fps))

        # Build caption lookup: frame_number -> text (later entries win)
        caption_map = {int(rel_time * fps): caption_text for rel_time, caption_text in captions}

        current_caption = ""
        processed_frames = 0
        progress_step = max(1, target_frames // 10)
        print(f"Processing {target_frames} frames...")

        while processed_frames < target_frames:
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame at position {processed_frames}")
                break

            frame = cv2.resize(frame[crop], (target_width, target_height),
                               interpolation=cv2.INTER_LANCZOS4)
            frame = apply_color_grading_wedding_retro(frame)

            current_caption = caption_map.get(processed_frames, current_caption)
            if current_caption:
                frame = burn_captions_to_frame(frame, current_caption)

            ffmpeg_proc.stdin.write(frame.tobytes())
            processed_frames += 1

            if processed_frames % progress_step == 0:
                progress = (processed_frames / target_frames) * 100
                print(f"Progress: {progress:.1f}%")

        ffmpeg_proc.stdin.close()
        returncode = ffmpeg_proc.wait()
        if returncode != 0:
            print(f"✗ FFmpeg encoding failed with return code {returncode}")
            _print_ffmpeg_log(ffmpeg_log)
            return False

        print(f"✓ Video segment saved: {output_path}")
        return True

    except Exception as e:
        print(f"✗ Error processing video segment: {e}")
        if ffmpeg_proc is not None:
            _stop_ffmpeg(ffmpeg_proc)
            _print_ffmpeg_log(ffmpeg_log)
        return False

    finally:
        # Release resources on every path, including early returns and errors.
        if cap is not None:
            cap.release()
        if ffmpeg_log is not None:
            ffmpeg_log.close()


async def _render_and_upload_segment(
    movie_name: str,
    segment_file: str,
    video_path: str,
    transcript_content: str,
    temp_dir: str
) -> bool:
    """Render one segment JSON into an MP4 and upload it.

    Returns False if rendering failed; download/upload errors propagate.
    """
    segment_path = await _run_blocking(
        hf_hub_download,
        repo_id=HF_DATASET_REPO,
        filename=segment_file,
        repo_type="dataset",
        token=HF_TOKEN,
        cache_dir=CACHE_DIR
    )
    with open(segment_path, 'r', encoding='utf-8') as f:
        segment_data = json.load(f)

    # int() so a numeric string still formats with :02d below
    segment_number = int(segment_data.get("segment_number", 1))
    start_time = segment_data.get("start_time", "00:00:00")
    end_time = segment_data.get("end_time", "00:10:00")

    print(f"\nProcessing segment {segment_number}: {start_time} to {end_time}")
    captions = extract_captions_for_segment(transcript_content, start_time, end_time)
    print(f"Found {len(captions)} caption lines for this segment")

    output_filename = f"segment-{segment_number:02d}.mp4"
    output_path = os.path.join(temp_dir, output_filename)

    # CPU-heavy rendering runs in a worker thread so the API stays responsive.
    success = await _run_blocking(
        process_video_segment, video_path, output_path, start_time, end_time, captions
    )
    if not success:
        print(f"Failed to process segment {segment_number}")
        return False

    upload_path = f"{READY_VIDEOS_FOLDER}/{movie_name}/{output_filename}"
    print(f"Uploading to: {upload_path}")
    await _run_blocking(
        upload_file,
        path_or_fileobj=output_path,
        path_in_repo=upload_path,
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=f"Add processed video segment {segment_number} for {movie_name}"
    )
    print(f"✓ Segment {segment_number} uploaded successfully")
    return True


async def process_movie_segments(movie_name: str) -> bool:
    """Process all segments for a movie.

    Downloads the transcript (optional), the source ``<movie_name>.mkv`` and
    every ``hooks/<movie_name>/*.json`` segment definition, then renders and
    uploads each segment to ``ready_videos/<movie_name>/``.

    Returns:
        True only if every segment was rendered and uploaded; False if the
        video or segment list could not be obtained or any segment failed.
    """
    try:
        processing_state["current_file"] = movie_name
        print(f"\n{'='*80}")
        print(f"Processing movie: {movie_name}")
        print(f"{'='*80}")

        # Download transcript (missing transcript just means no captions)
        transcript_file = f"{TRANSCRIPTION_FOLDER}/{movie_name}.transcript.txt"
        print(f"Downloading transcript: {transcript_file}")
        try:
            transcript_path = await _run_blocking(
                hf_hub_download,
                repo_id=HF_DATASET_REPO,
                filename=transcript_file,
                repo_type="dataset",
                token=HF_TOKEN,
                cache_dir=CACHE_DIR
            )
            with open(transcript_path, 'r', encoding='utf-8') as f:
                transcript_content = f.read()
        except Exception as e:
            print(f"Warning: Could not download transcript: {e}")
            transcript_content = ""

        # Download original video
        video_file = f"{movie_name}.mkv"
        print(f"Downloading video: {video_file}")
        try:
            video_path = await _run_blocking(
                hf_hub_download,
                repo_id=HF_DATASET_REPO,
                filename=video_file,
                repo_type="dataset",
                token=HF_TOKEN,
                cache_dir=CACHE_DIR
            )
            if os.path.islink(video_path):
                video_path = os.path.realpath(video_path)
        except Exception as e:
            print(f"Error: Could not download video: {e}")
            processing_state["error_count"] += 1
            processing_state["last_error"] = str(e)
            return False

        # List segment JSON files
        hooks_folder = f"{HOOKS_FOLDER}/{movie_name}"
        print(f"Listing segments from: {hooks_folder}")
        files = await _run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )
        prefix = f"{hooks_folder}/"
        segment_files = sorted(f for f in files if f.startswith(prefix) and f.endswith(".json"))

        if not segment_files:
            print(f"No segment JSON files found for {movie_name}")
            return False

        print(f"Found {len(segment_files)} segments")

        failed_segments = 0
        temp_dir = tempfile.mkdtemp()
        try:
            for segment_file in segment_files:
                try:
                    ok = await _render_and_upload_segment(
                        movie_name, segment_file, video_path, transcript_content, temp_dir
                    )
                    if not ok:
                        failed_segments += 1
                        processing_state["error_count"] += 1
                        processing_state["last_error"] = f"Failed to process segment file {segment_file}"
                except Exception as e:
                    print(f"✗ Error processing segment: {e}")
                    failed_segments += 1
                    processing_state["error_count"] += 1
                    processing_state["last_error"] = str(e)
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        processing_state["processed_files"].append(movie_name)
        processing_state["total_processed"] += 1

        if failed_segments:
            succeeded = len(segment_files) - failed_segments
            print(f"\n⚠ Processed {succeeded}/{len(segment_files)} segments for {movie_name} "
                  f"({failed_segments} failed)")
            return False

        print(f"\n✓ Successfully processed all segments for {movie_name}")
        return True

    except Exception as e:
        processing_state["error_count"] += 1
        processing_state["last_error"] = str(e)
        print(f"✗ Error: {e}")
        return False


async def scan_and_process_videos(delay_seconds: float = STARTUP_DELAY_SECONDS):
    """Scan hooks folder and process all movies.

    Args:
        delay_seconds: How long to wait before starting (default: the
            3-minute startup delay). Pass 0 to start immediately.

    Only one scan runs at a time: if another scan is already running, either
    before or after the delay, this call returns without doing anything.
    """
    if processing_state["is_running"]:
        print("Video processing already running, skipping...")
        return

    if delay_seconds > 0:
        print(f"Waiting {delay_seconds / 60:g} minutes before starting video processing...")
        await asyncio.sleep(delay_seconds)
        # A manual trigger may have started a scan while we were sleeping.
        if processing_state["is_running"]:
            print("Video processing already running, skipping...")
            return

    # No await between the check above and this assignment, so two tasks on
    # the event loop cannot both pass the check.
    processing_state["is_running"] = True
    print("\n" + "="*80)
    print("STARTING VIDEO PROCESSING SERVICE")
    print("="*80)

    try:
        files = await _run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )

        # Segment files live at hooks/<movie>/<segment>.json. Require at least
        # three path parts so a stray hooks/<file>.json is not taken as a movie.
        movie_folders = set()
        for f in files:
            if f.startswith(f"{HOOKS_FOLDER}/") and f.endswith(".json"):
                parts = f.split("/")
                if len(parts) >= 3:
                    movie_folders.add(parts[1])

        print(f"Found {len(movie_folders)} movies to process")

        for movie_name in sorted(movie_folders):
            await process_movie_segments(movie_name)
            await asyncio.sleep(2)

        print("\n" + "="*80)
        print("VIDEO PROCESSING COMPLETE")
        print(f"Processed: {processing_state['total_processed']}")
        print(f"Errors: {processing_state['error_count']}")
        print("="*80 + "\n")

    except Exception as e:
        print(f"Critical error: {e}")
        processing_state["last_error"] = str(e)
    finally:
        processing_state["is_running"] = False
        processing_state["current_file"] = None


@app.on_event("startup")
async def startup_event():
    """Start video processing (after the startup delay) on server startup."""
    _spawn_background(scan_and_process_videos())


@app.get("/")
async def health():
    """Health check endpoint."""
    return JSONResponse({
        "status": "running",
        "service": "Video Processing Service",
        "is_processing": processing_state["is_running"],
        "total_processed": processing_state["total_processed"],
        "error_count": processing_state["error_count"],
        "current_file": processing_state["current_file"],
        "last_error": processing_state["last_error"],
        "processed_files": list(processing_state["processed_files"])
    })


@app.get("/status")
async def get_status():
    """Get current processing status."""
    return JSONResponse({
        "is_running": processing_state["is_running"],
        "total_processed": processing_state["total_processed"],
        "error_count": processing_state["error_count"],
        "current_file": processing_state["current_file"],
        "last_error": processing_state["last_error"],
        "processed_files": list(processing_state["processed_files"])
    })


@app.post("/trigger-processing")
async def trigger_processing():
    """Manually trigger video processing (skips the startup delay)."""
    if processing_state["is_running"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Video processing is already in progress"
        })

    _spawn_background(scan_and_process_videos(delay_seconds=0))
    return JSONResponse({
        "status": "started",
        "message": "Video processing scan started"
    })


if __name__ == "__main__":
    print("Starting Video Processing Service on port 7860...")
    print(f"Processing will begin {STARTUP_DELAY_SECONDS / 60:g} minutes after startup")
    uvicorn.run(app, host="0.0.0.0", port=7860)