Spaces:
Sleeping
Sleeping
| ENDOFFILE' | |
| #!/usr/bin/env python3 | |
| import os | |
| import json | |
| import re | |
| import asyncio | |
| import tempfile | |
| import subprocess | |
| from pathlib import Path | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Optional, Tuple | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| try: | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| from faster_whisper import WhisperModel | |
| except ImportError as e: | |
| print(f"Missing dependency: {e}") | |
| print("Install with: pip install faster-whisper") | |
| exit(1) | |
# Load environment variables
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: Missing HF_TOKEN in .env")
    # Raise SystemExit directly: the bare `exit` name is a helper injected by
    # the `site` module for interactive sessions and is not guaranteed to
    # exist in every interpreter configuration (e.g. `python -S`).
    raise SystemExit(1)
app = FastAPI(title="Video Processing Service")

# Module-wide progress record; the status handlers below read it and the
# processing coroutines update it.
processing_state = dict(
    is_running=False,
    total_processed=0,
    current_file=None,
    error_count=0,
    last_error=None,
    processed_files=[],
    whisper_ready=False,
)

# Whisper model handle. Stays None at import time and is filled in later by
# _load_whisper_model().
whisper_model = None

# Dataset repository and the folder names used inside it.
HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"
READY_VIDEOS_FOLDER = "ready_videos"
TRANSCRIPTION_FOLDER = "transcriptions"
def _load_whisper_model():
    """Load the Whisper "small" model and mark it ready.

    This call blocks while the model is constructed; the startup coroutine
    runs it through a thread executor. On completion the module-level
    ``whisper_model`` is set and ``processing_state["whisper_ready"]`` flips
    to True.
    """
    global whisper_model
    print("Loading Whisper small model...")
    loaded = WhisperModel("small", device="auto", compute_type="int8")
    whisper_model = loaded
    processing_state["whisper_ready"] = True
    print("✓ Whisper model loaded")
def timestamp_to_seconds(timestamp: str) -> float:
    """Convert an ``HH:MM:SS`` timestamp to seconds.

    The seconds field may carry a fractional part, written with either ``.``
    or ``,`` as the decimal mark (``"00:00:01.5"``, ``"00:00:01,5"``). Fields
    beyond the third are ignored.

    Args:
        timestamp: Timestamp text with at least three ``:``-separated fields.

    Returns:
        The offset in seconds as a float. If the value cannot be parsed, a
        diagnostic is printed and ``0.0`` is returned.
    """
    try:
        hours, minutes, seconds = timestamp.split(":")[:3]
        # float() on the last field keeps sub-second precision; the integer
        # hour/minute terms are promoted so the result is always a float.
        return int(hours) * 3600 + int(minutes) * 60 + float(seconds.replace(",", "."))
    except (AttributeError, TypeError, ValueError) as e:
        # AttributeError/TypeError: not a string; ValueError: too few fields
        # or a non-numeric field.
        print(f"Error converting timestamp {timestamp}: {e}")
        return 0.0
def extract_audio_segment(video_path: str, start_seconds: float, end_seconds: float, output_wav: str) -> bool:
    """Extract an audio segment from a video as a WAV file for Whisper.

    Runs ``ffmpeg`` to cut ``[start_seconds, end_seconds]`` out of
    ``video_path`` and writes it to ``output_wav`` as 16 kHz, mono,
    signed 16-bit PCM. Any existing ``output_wav`` is overwritten (``-y``).

    Args:
        video_path: Source media file.
        start_seconds: Segment start, in seconds.
        end_seconds: Segment end, in seconds.
        output_wav: Destination WAV path.

    Returns:
        True if ffmpeg exited with status 0; False if it exited non-zero or
        could not be launched at all.
    """
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(start_seconds),
        "-to", str(end_seconds),
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        output_wav
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:
        # ffmpeg missing or not executable: report failure through the bool
        # return value instead of raising out of a bool-returning helper.
        print(f"Error: could not run ffmpeg: {e}")
        return False
    return result.returncode == 0
def transcribe_segment(audio_path: str) -> List[Tuple[float, float, str]]:
    """
    Run the loaded Whisper model over one audio file.

    Language is left for the model to detect (``language=None``) and the VAD
    filter is enabled with a 500 ms minimum silence. Segments whose text is
    empty after stripping are dropped.

    Returns a list of (start_sec, end_sec, text) tuples; times are the values
    reported by the model for the given audio file, i.e. relative to the
    start of that file.
    """
    print(" Transcribing audio with Whisper small...")
    vad_options = {"min_silence_duration_ms": 500}
    segments, _info = whisper_model.transcribe(
        audio_path,
        beam_size=5,
        language=None,
        vad_filter=True,
        vad_parameters=vad_options,
    )
    captions: List[Tuple[float, float, str]] = []
    for segment in segments:
        spoken = segment.text.strip()
        if not spoken:
            continue
        captions.append((segment.start, segment.end, spoken))
        print(f" [{segment.start:.1f}s → {segment.end:.1f}s] {spoken}")
    print(f" ✓ Transcribed {len(captions)} caption segments")
    return captions
# Vignette masks keyed by (rows, cols). The mask depends only on the frame
# size, so it is built once per resolution instead of once per frame.
_VIGNETTE_MASK_CACHE: Dict[Tuple[int, int], np.ndarray] = {}


def _vignette_mask(rows: int, cols: int) -> np.ndarray:
    """Return the (rows, cols) vignette multiplier, building it on first use."""
    key = (rows, cols)
    mask = _VIGNETTE_MASK_CACHE.get(key)
    if mask is None:
        x_kernel = cv2.getGaussianKernel(cols, cols / 2)
        y_kernel = cv2.getGaussianKernel(rows, rows / 2)
        # Outer product of the two 1-D kernels, normalised to a peak of 1 and
        # flattened with a 0.4 exponent so the edge falloff is gentler.
        mask = y_kernel * x_kernel.T
        mask = (mask / mask.max()) ** 0.4
        _VIGNETTE_MASK_CACHE[key] = mask
    return mask


def apply_color_grading_wedding_retro(frame: np.ndarray) -> np.ndarray:
    """Apply the "wedding retro" look to one BGR frame.

    Steps, in order: raise the a/b channels in LAB space and equalise
    lightness with CLAHE; scale HSV saturation by 1.3; apply a global
    contrast/brightness adjustment; blend in a sharpened copy; darken toward
    the edges with a Gaussian vignette.

    Args:
        frame: Image array as produced by OpenCV (the colour conversions used
            here assume 3-channel BGR).

    Returns:
        The graded frame as a uint8 array of the same height and width.
    """
    # LAB: shift a/b and equalise the lightness channel.
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)
    a_channel = cv2.add(a_channel, 5)
    b_channel = cv2.add(b_channel, 8)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    l_channel = clahe.apply(l_channel)
    lab_enhanced = cv2.merge([l_channel, a_channel, b_channel])
    frame = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)

    # HSV: boost saturation by 30%, clipped to the 8-bit range.
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * 1.3, 0, 255)
    frame = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)

    # Global contrast (alpha) and brightness (beta).
    frame = cv2.convertScaleAbs(frame, alpha=1.15, beta=10)

    # Sharpen, then blend 40% original with 60% sharpened.
    kernel = np.array([[-1, -1, -1],
                       [-1, 9, -1],
                       [-1, -1, -1]]) / 1.2
    sharpened = cv2.filter2D(frame, -1, kernel)
    frame = cv2.addWeighted(frame, 0.4, sharpened, 0.6, 0)

    # Vignette: multiply every channel by the cached per-resolution mask.
    rows, cols = frame.shape[:2]
    mask = _vignette_mask(rows, cols)
    for channel in range(3):
        frame[:, :, channel] = frame[:, :, channel] * mask
    return np.clip(frame, 0, 255).astype(np.uint8)
def burn_captions_to_frame(frame: np.ndarray, text: str, font_size: int = 36) -> np.ndarray:
    """Draw a caption onto a BGR frame and return the new frame.

    The text is word-wrapped to the frame width minus 80 pixels, centred
    horizontally, and placed so the text block is centred on 80% of the frame
    height. Each line is drawn white over a semi-transparent black copy
    offset by 3 pixels (a drop shadow); no background box is drawn.

    Args:
        frame: BGR image array.
        text: Caption text; wrapped on whitespace.
        font_size: Size passed to the TrueType font. If the DejaVu Sans Bold
            font cannot be loaded, PIL's default font is used instead.

    Returns:
        A new BGR array with the caption composited in.
    """
    frame_h, frame_w = frame.shape[:2]
    base = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert('RGBA')
    text_layer = Image.new('RGBA', base.size, (0, 0, 0, 0))
    pen = ImageDraw.Draw(text_layer)

    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
    except Exception:
        font = ImageFont.load_default()

    def rendered_width(snippet: str) -> int:
        # Horizontal extent of the snippet when drawn with the chosen font.
        left, _top, right, _bottom = pen.textbbox((0, 0), snippet, font=font)
        return right - left

    # Greedy word wrap: keep adding words while the line still fits.
    usable_width = frame_w - 80
    lines = []
    pending = []
    for word in text.split():
        if rendered_width(' '.join(pending + [word])) <= usable_width:
            pending.append(word)
            continue
        if pending:
            lines.append(' '.join(pending))
        pending = [word]
    if pending:
        lines.append(' '.join(pending))

    line_spacing = font_size + 12
    block_height = len(lines) * line_spacing
    top = int(frame_h * 0.80) - block_height // 2
    shadow = 3
    for row, line in enumerate(lines):
        left = (frame_w - rendered_width(line)) // 2
        upper = top + row * line_spacing
        pen.text((left + shadow, upper + shadow), line, font=font, fill=(0, 0, 0, 200))
        pen.text((left, upper), line, font=font, fill=(255, 255, 255, 255))

    composited = Image.alpha_composite(base, text_layer).convert('RGB')
    return cv2.cvtColor(np.array(composited), cv2.COLOR_RGB2BGR)
def build_frame_caption_map(captions: List[Tuple[float, float, str]], fps: float) -> Dict[int, str]:
    """Expand timed captions into a frame-index → text lookup.

    Each (start_sec, end_sec, text) entry covers frames
    ``int(start_sec * fps)`` through ``int(end_sec * fps)`` inclusive. Where
    entries overlap, the one that appears later in ``captions`` wins.
    """
    lookup: Dict[int, str] = {}
    for begin, finish, caption in captions:
        first = int(begin * fps)
        last = int(finish * fps)
        lookup.update(dict.fromkeys(range(first, last + 1), caption))
    return lookup
def process_video_segment(
    video_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    target_width: int = 1080,
    target_height: int = 1350
) -> bool:
    """
    Render one captioned, colour-graded clip from a source video.

    Full pipeline:
    1. Extract audio segment → WAV
    2. Transcribe with Whisper small (skipped if audio extraction failed or
       the model is not loaded; the clip is then rendered without captions)
    3. Centre-crop and resize each frame to the target size, colour grade it,
       burn in the caption active for that frame, and pipe the raw frames to
       FFmpeg for H.264 encoding
    4. Mux processed video with original audio

    Args:
        video_path: Source video file.
        output_path: Destination file for the finished clip.
        start_time: Segment start as ``HH:MM:SS``.
        end_time: Segment end as ``HH:MM:SS``.
        target_width: Output width in pixels.
        target_height: Output height in pixels.

    Returns:
        True when ``output_path`` was produced; False on any failure (the
        reason is printed). Intermediate files are always removed.
    """
    # Derive temp names from the stem. Replacing ".mp4" by substring would
    # return output_path unchanged when it lacks that suffix, and the cleanup
    # below would then delete the finished clip.
    output_stem = os.path.splitext(output_path)[0]
    temp_wav = f"{output_stem}_audio.wav"
    temp_video_path = f"{output_stem}_noaudio.mp4"
    cap = None
    ffmpeg_video_proc = None
    try:
        print(f"Opening video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # `not fps > 0` also rejects NaN, which a plain `fps <= 0` would not.
        if not fps > 0 or original_width <= 0 or original_height <= 0:
            print(f"Error: Invalid video metadata ({fps} fps, {original_width}x{original_height})")
            return False

        start_seconds = timestamp_to_seconds(start_time)
        end_seconds = timestamp_to_seconds(end_time)
        duration = end_seconds - start_seconds
        target_frames = int(duration * fps)
        if target_frames <= 0:
            print(f"Error: Empty or inverted segment {start_time} to {end_time}")
            return False

        print(f"Video info: {fps} fps, {original_width}x{original_height}")
        print(f"Extracting segment: {start_time} to {end_time} ({duration:.1f}s)")

        # ── Step 1: Extract audio → WAV ───────────────────────────────────────
        print(" Extracting audio segment...")
        audio_ok = extract_audio_segment(video_path, start_seconds, end_seconds, temp_wav)

        # ── Step 2: Transcribe with Whisper ───────────────────────────────────
        if audio_ok and whisper_model is not None:
            captions = transcribe_segment(temp_wav)
        else:
            print(" Warning: Skipping transcription (audio failed or model not ready)")
            captions = []
        frame_caption_map = build_frame_caption_map(captions, fps)

        # Centre-crop geometry depends only on the source/target sizes, so it
        # is computed once rather than per frame.
        aspect_ratio = target_width / target_height
        if original_width / original_height > aspect_ratio:
            # Source is wider than the target: trim left and right.
            new_width = int(original_height * aspect_ratio)
            x_offset = (original_width - new_width) // 2
            crop = (slice(None), slice(x_offset, x_offset + new_width))
        else:
            # Source is taller than (or equal to) the target: trim top/bottom.
            new_height = int(original_width / aspect_ratio)
            y_offset = (original_height - new_height) // 2
            crop = (slice(y_offset, y_offset + new_height), slice(None))

        # ── Step 3: Process frames → pipe to FFmpeg ───────────────────────────
        ffmpeg_video_cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{target_width}x{target_height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "pipe:0",
            "-vcodec", "libx264",
            "-preset", "fast",
            "-crf", "23",
            "-pix_fmt", "yuv420p",
            temp_video_path
        ]
        ffmpeg_video_proc = subprocess.Popen(
            ffmpeg_video_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        start_frame = int(start_seconds * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        processed_frames = 0
        progress_step = max(1, target_frames // 10)
        print(f"Processing {target_frames} frames...")
        while processed_frames < target_frames:
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame at position {processed_frames}")
                break

            frame = cv2.resize(frame[crop], (target_width, target_height), interpolation=cv2.INTER_LANCZOS4)
            frame = apply_color_grading_wedding_retro(frame)

            # Frame indices in the map are relative to the segment start; a
            # frame absent from the map carries no caption.
            caption = frame_caption_map.get(processed_frames, "")
            if caption:
                frame = burn_captions_to_frame(frame, caption)

            ffmpeg_video_proc.stdin.write(frame.tobytes())
            processed_frames += 1
            if processed_frames % progress_step == 0:
                progress = (processed_frames / target_frames) * 100
                print(f"Progress: {progress:.1f}%")

        # Closing stdin signals end-of-stream so the encoder can finish.
        ffmpeg_video_proc.stdin.close()
        ffmpeg_video_proc.wait()
        if ffmpeg_video_proc.returncode != 0:
            print(f"✗ FFmpeg video encoding failed (code {ffmpeg_video_proc.returncode})")
            return False
        print("✓ Frames encoded, muxing audio...")

        # ── Step 4: Mux processed video + original audio ──────────────────────
        ffmpeg_mux_cmd = [
            "ffmpeg", "-y",
            "-i", temp_video_path,
            "-ss", str(start_seconds),
            "-to", str(end_seconds),
            "-i", video_path,
            "-map", "0:v:0",
            # Trailing "?" makes the audio mapping optional, so a source with
            # no audio stream yields a silent clip instead of a mux failure.
            "-map", "1:a:0?",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-shortest",
            "-movflags", "+faststart",
            output_path
        ]
        mux_result = subprocess.run(
            ffmpeg_mux_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        if mux_result.returncode != 0:
            print(f"✗ FFmpeg audio mux failed (code {mux_result.returncode})")
            return False

        print(f"✓ Segment complete: {output_path}")
        return True
    except Exception as e:
        print(f"✗ Error processing video segment: {e}")
        return False
    finally:
        # Runs on every exit path: reap the encoder, release the capture
        # handle and delete the intermediate files.
        if ffmpeg_video_proc is not None:
            try:
                if ffmpeg_video_proc.stdin and not ffmpeg_video_proc.stdin.closed:
                    ffmpeg_video_proc.stdin.close()
            except OSError:
                # The encoder already exited; flushing its pipe failed.
                pass
            ffmpeg_video_proc.wait()
        if cap is not None:
            cap.release()
        for tmp in (temp_video_path, temp_wav):
            try:
                os.remove(tmp)
            except OSError:
                # Best-effort cleanup; the file may never have been created.
                pass
async def process_movie_segments(movie_name: str) -> bool:
    """Process and upload every hook segment defined for one movie.

    Downloads ``{movie_name}.mkv`` from the dataset repo, lists the JSON
    segment descriptors under ``hooks/{movie_name}/``, renders each segment
    with ``process_video_segment`` and uploads the result to
    ``ready_videos/{movie_name}/segment-NN.mp4``. A failing segment is
    counted as an error and skipped; the remaining segments still run.

    All blocking work (hub downloads, repo listing, rendering, upload) is run
    in the default thread executor so the event loop stays responsive while a
    movie is being processed.

    Args:
        movie_name: Folder name under the hooks folder; also the stem of the
            source video file.

    Returns:
        True if at least one segment was rendered and uploaded, otherwise
        False. Failures are also reflected in ``processing_state``.
    """
    import shutil

    loop = asyncio.get_running_loop()
    cache_dir = "/tmp/video_processor_cache"

    async def run_blocking(func, *args, **kwargs):
        # Off-load a blocking call to a worker thread and await its result.
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    try:
        processing_state["current_file"] = movie_name
        print(f"\n{'='*80}")
        print(f"Processing movie: {movie_name}")
        print(f"{'='*80}")

        video_file = f"{movie_name}.mkv"
        print(f"Downloading video: {video_file}")
        try:
            video_path = await run_blocking(
                hf_hub_download,
                repo_id=HF_DATASET_REPO,
                filename=video_file,
                repo_type="dataset",
                token=HF_TOKEN,
                cache_dir=cache_dir
            )
            # Resolve a symlinked cache entry to the real file.
            if os.path.islink(video_path):
                video_path = os.path.realpath(video_path)
        except Exception as e:
            print(f"Error: Could not download video: {e}")
            processing_state["error_count"] += 1
            processing_state["last_error"] = str(e)
            return False

        hooks_folder = f"{HOOKS_FOLDER}/{movie_name}"
        print(f"Listing segments from: {hooks_folder}")
        files = await run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )
        segment_files = sorted(
            f for f in files
            if f.startswith(f"{hooks_folder}/") and f.endswith(".json")
        )
        if not segment_files:
            print(f"No segment JSON files found for {movie_name}")
            return False
        print(f"Found {len(segment_files)} segments")

        succeeded = 0
        temp_dir = tempfile.mkdtemp()
        try:
            for segment_file in segment_files:
                try:
                    segment_path = await run_blocking(
                        hf_hub_download,
                        repo_id=HF_DATASET_REPO,
                        filename=segment_file,
                        repo_type="dataset",
                        token=HF_TOKEN,
                        cache_dir=cache_dir
                    )
                    with open(segment_path, 'r', encoding='utf-8') as f:
                        segment_data = json.load(f)

                    # int() so a numeric string in the JSON still formats
                    # with the :02d spec below.
                    segment_number = int(segment_data.get("segment_number", 1))
                    start_time = segment_data.get("start_time", "00:00:00")
                    end_time = segment_data.get("end_time", "00:10:00")
                    print(f"\nProcessing segment {segment_number}: {start_time} to {end_time}")

                    output_filename = f"segment-{segment_number:02d}.mp4"
                    output_path = os.path.join(temp_dir, output_filename)
                    success = await run_blocking(
                        process_video_segment,
                        video_path,
                        output_path,
                        start_time,
                        end_time
                    )
                    if not success:
                        print(f"Failed to process segment {segment_number}")
                        processing_state["error_count"] += 1
                        processing_state["last_error"] = (
                            f"Failed to process segment {segment_number} of {movie_name}"
                        )
                        continue

                    upload_path = f"{READY_VIDEOS_FOLDER}/{movie_name}/{output_filename}"
                    print(f"Uploading to: {upload_path}")
                    await run_blocking(
                        upload_file,
                        path_or_fileobj=output_path,
                        path_in_repo=upload_path,
                        repo_id=HF_DATASET_REPO,
                        repo_type="dataset",
                        token=HF_TOKEN,
                        commit_message=f"Add processed video segment {segment_number} for {movie_name}"
                    )
                    succeeded += 1
                    print(f"✓ Segment {segment_number} uploaded successfully")
                except Exception as e:
                    # One bad segment must not abort the rest of the movie.
                    print(f"✗ Error processing segment: {e}")
                    processing_state["error_count"] += 1
                    processing_state["last_error"] = str(e)
                    continue
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        if succeeded == 0:
            # Nothing was produced, so do not report the movie as processed.
            print(f"\n✗ No segments could be processed for {movie_name}")
            return False

        processing_state["processed_files"].append(movie_name)
        processing_state["total_processed"] += 1
        if succeeded == len(segment_files):
            print(f"\n✓ Successfully processed all segments for {movie_name}")
        else:
            print(f"\n✓ Processed {succeeded} of {len(segment_files)} segments for {movie_name}")
        return True
    except Exception as e:
        processing_state["error_count"] += 1
        processing_state["last_error"] = str(e)
        print(f"✗ Error: {e}")
        return False
async def scan_and_process_videos():
    """Scan the hooks folder in the dataset repo and process every movie.

    Returns immediately if a scan is already in progress. Otherwise it claims
    the ``is_running`` flag, waits three minutes, lists the repo, collects
    each ``hooks/<movie>/...json`` folder name and processes the movies one
    after another in sorted order. The flag and ``current_file`` are reset
    when the scan ends, whether it succeeded, failed or was cancelled.
    """
    if processing_state["is_running"]:
        print("Video processing already running, skipping...")
        return

    # Claim the run before the first await. Setting the flag only after the
    # 3-minute sleep would let a second caller pass the check above during
    # the wait and start a concurrent scan.
    processing_state["is_running"] = True
    try:
        # Wait 3 minutes for Space to fully initialize
        print("Waiting 3 minutes before starting video processing...")
        await asyncio.sleep(180)

        print("\n" + "="*80)
        print("STARTING VIDEO PROCESSING SERVICE")
        print("="*80)

        # The repo listing is a blocking network call; keep it off the loop.
        loop = asyncio.get_running_loop()
        files = await loop.run_in_executor(
            None,
            lambda: list_repo_files(
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN
            )
        )

        movie_folders = set()
        for f in files:
            parts = f.split("/")
            # Require hooks/<movie>/<file>.json: a JSON file sitting directly
            # in the hooks folder is not a movie folder.
            if len(parts) >= 3 and parts[0] == HOOKS_FOLDER and f.endswith(".json"):
                movie_folders.add(parts[1])
        print(f"Found {len(movie_folders)} movies to process")

        for movie_name in sorted(movie_folders):
            await process_movie_segments(movie_name)
            await asyncio.sleep(2)

        print("\n" + "="*80)
        print("VIDEO PROCESSING COMPLETE")
        print(f"Processed: {processing_state['total_processed']}")
        print(f"Errors: {processing_state['error_count']}")
        print("="*80 + "\n")
    except Exception as e:
        print(f"Critical error: {e}")
        processing_state["last_error"] = str(e)
    finally:
        processing_state["is_running"] = False
        # Nothing is being processed any more.
        processing_state["current_file"] = None
# Strong references to background tasks launched at startup. The event loop
# keeps only weak references to tasks, so an otherwise unreferenced task may
# be garbage-collected before it finishes.
_startup_background_tasks = set()


async def startup_event():
    """Load Whisper in a worker thread, then schedule the video scan.

    The model load runs in the default thread executor so the event loop is
    not blocked while it loads. Once loading finishes, the scan coroutine is
    scheduled as a background task; it applies its own 3-minute delay.
    """
    # NOTE(review): no startup-hook registration (decorator or
    # add_event_handler call) is visible for this coroutine in this file —
    # confirm how it is wired to the app.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _load_whisper_model)

    scan_task = asyncio.create_task(scan_and_process_videos())
    _startup_background_tasks.add(scan_task)
    scan_task.add_done_callback(_startup_background_tasks.discard)
async def health():
    """Return a JSON health report: fixed status/service fields plus the
    current values from ``processing_state``."""
    # NOTE(review): no route decorator is visible on this handler — confirm
    # how it is registered with the app.
    state = processing_state
    payload = {
        "status": "running",
        "service": "Video Processing Service",
    }
    payload["whisper_ready"] = state["whisper_ready"]
    # Exposed under a different key than the one used in the state record.
    payload["is_processing"] = state["is_running"]
    for key in ("total_processed", "error_count", "current_file", "last_error", "processed_files"):
        payload[key] = state[key]
    return JSONResponse(payload)
async def get_status():
    """Return the current ``processing_state`` fields as JSON."""
    # NOTE(review): no route decorator is visible on this handler — confirm
    # how it is registered with the app.
    reported_fields = (
        "whisper_ready",
        "is_running",
        "total_processed",
        "error_count",
        "current_file",
        "last_error",
        "processed_files",
    )
    snapshot = {name: processing_state[name] for name in reported_fields}
    return JSONResponse(snapshot)
# Scans started through trigger_processing() that have not finished yet.
# Holding them here keeps a strong reference (the event loop only keeps weak
# ones) and lets the handler see a scan that has been scheduled but has not
# yet marked itself as running.
_triggered_scan_tasks = set()


async def trigger_processing():
    """Start a video-processing scan in the background.

    Responds with ``already_running`` if a scan is in progress or one started
    by an earlier trigger is still pending, ``not_ready`` while the Whisper
    model has not finished loading, and ``started`` once a new scan task has
    been scheduled.
    """
    # NOTE(review): no route decorator is visible on this handler — confirm
    # how it is registered with the app.
    if processing_state["is_running"] or _triggered_scan_tasks:
        return JSONResponse({
            "status": "already_running",
            "message": "Video processing is already in progress"
        })
    if not processing_state["whisper_ready"]:
        return JSONResponse({
            "status": "not_ready",
            "message": "Whisper model is still loading, try again shortly"
        })

    scan_task = asyncio.create_task(scan_and_process_videos())
    _triggered_scan_tasks.add(scan_task)
    scan_task.add_done_callback(_triggered_scan_tasks.discard)
    return JSONResponse({
        "status": "started",
        "message": "Video processing scan started"
    })
if __name__ == "__main__":
    # Script entry point: announce the schedule, then serve the app with
    # uvicorn on all interfaces.
    service_port = 7860
    print(f"Starting Video Processing Service on port {service_port}...")
    print("Whisper will load at startup, processing begins 3 minutes after")
    uvicorn.run(app, host="0.0.0.0", port=service_port)