#!/usr/bin/env python3
"""Video Processing Service.

After startup this service loads a faster-whisper model, scans the Hugging Face
dataset repo ``HF_DATASET_REPO`` for hook JSON files under
``HOOKS_FOLDER/<movie>/``, cuts each hook's time range out of ``<movie>.mkv``,
reframes it to 1080x1350, colour-grades it, burns in Whisper captions and
uploads the result to ``READY_VIDEOS_FOLDER/<movie>/``. Progress is exposed
through the shared ``processing_state`` dict.
"""
import os
import json
import asyncio
import tempfile
import subprocess
import shutil
import time
from functools import lru_cache
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Optional, Tuple
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import uvicorn

try:
    from huggingface_hub import list_repo_files, hf_hub_download, upload_file
    import cv2
    import numpy as np
    from PIL import Image, ImageDraw, ImageFont
    from faster_whisper import WhisperModel
except ImportError as e:
    print(f"Missing dependency: {e}")
    # `exit` is a site-module helper for the interactive shell and may be
    # absent (e.g. `python -S`); SystemExit always works, same exit status.
    raise SystemExit(1)

# Load environment variables
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"
READY_VIDEOS_FOLDER = "ready_videos"

# Every rendered segment is centre-cropped and resized to this frame size.
OUTPUT_WIDTH, OUTPUT_HEIGHT = 1080, 1350
CAPTION_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
MAX_LOG_LINES = 100

app = FastAPI(title="Video Processing Service")

# Global state; the status endpoint returns this dict as-is.
processing_state = {
    "is_running": False,
    "total_processed": 0,
    "current_file": None,
    "error_count": 0,
    "last_error": None,
    "processed_files": [],
    "whisper_ready": False,
    "log": [],
}

whisper_model = None


def add_log(msg):
    """Print a timestamped message and append it to the rolling status log.

    Only the newest ``MAX_LOG_LINES`` entries are kept in
    ``processing_state["log"]``.
    """
    timestamp = datetime.now().strftime('%H:%M:%S')
    formatted_msg = f"[{timestamp}] {msg}"
    print(formatted_msg)
    log = processing_state["log"]
    log.append(formatted_msg)
    if len(log) > MAX_LOG_LINES:
        del log[:-MAX_LOG_LINES]


def _record_error(log_message, error_text):
    """Log *log_message*, bump ``error_count`` and store *error_text* as ``last_error``."""
    add_log(log_message)
    processing_state["error_count"] += 1
    processing_state["last_error"] = error_text


def _load_whisper_model():
    """Load the faster-whisper model into the module-level ``whisper_model``.

    This call blocks, so run it via ``asyncio.to_thread``. On success it sets
    ``processing_state["whisper_ready"]``. On failure it logs the error and stores it in
    ``last_error``, leaving ``whisper_ready`` False.
    """
    global whisper_model
    try:
        add_log("Starting Whisper model load...")
        whisper_model = WhisperModel("small", device="auto", compute_type="int8")
        processing_state["whisper_ready"] = True
        add_log("✓ Whisper model loaded successfully")
    except Exception as e:
        add_log(f"✗ Failed to load Whisper model: {e}")
        processing_state["last_error"] = str(e)


def timestamp_to_seconds(timestamp: str) -> float:
    """Convert ``[[HH:]MM:]SS[.fff]`` (or a plain number) to seconds.

    Returns 0.0 for anything that cannot be parsed.

    >>> timestamp_to_seconds("01:02:03.5")
    3723.5
    >>> timestamp_to_seconds("02:03")
    123.0
    """
    if isinstance(timestamp, (int, float)) and not isinstance(timestamp, bool):
        return float(timestamp)
    try:
        parts = timestamp.split(":")
        if not 1 <= len(parts) <= 3:
            return 0.0
        seconds = 0.0
        for part in parts:
            seconds = seconds * 60 + float(part)
        return seconds
    except (AttributeError, ValueError):
        return 0.0


def apply_color_grading(frame):
    """Boost local contrast (CLAHE on the LAB L channel) and sharpen a BGR frame.

    The result is a 40/60 blend of the contrast-enhanced frame and a sharpened
    copy, returned as a new uint8 BGR array.
    """
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    l = clahe.apply(l)
    frame = cv2.cvtColor(cv2.merge([l, a, b]), cv2.COLOR_LAB2BGR)
    kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) / 1.2
    sharpened = cv2.filter2D(frame, -1, kernel)
    return cv2.addWeighted(frame, 0.4, sharpened, 0.6, 0)


@lru_cache(maxsize=8)
def _caption_font(font_size):
    """Return the caption font at *font_size*, falling back to Pillow's default.

    Cached because burn_captions runs once per captioned frame and re-reading
    the TrueType file every frame is pure overhead.
    """
    try:
        return ImageFont.truetype(CAPTION_FONT_PATH, font_size)
    except (OSError, ImportError):
        return ImageFont.load_default()


def burn_captions(frame, text, font_size=40):
    """Draw *text* horizontally centred near the bottom of a BGR frame.

    Words are wrapped greedily to lines narrower than ``width - 100`` px. A
    single word wider than that gets a line of its own. Each line is drawn
    white over a 2 px black offset shadow. The block starts at 80% of the frame
    height. If the block would overflow the bottom edge, it is moved up.
    Returns a new BGR array.
    """
    h, w = frame.shape[:2]
    pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert('RGBA')
    draw = ImageDraw.Draw(pil_img)
    font = _caption_font(font_size)
    max_width = w - 100

    lines, current = [], []
    for word in text.split():
        candidate = ' '.join(current + [word])
        # An empty `current` always accepts the word; otherwise an over-wide
        # first word would flush an empty string as a caption line.
        if not current or draw.textbbox((0, 0), candidate, font=font)[2] < max_width:
            current.append(word)
        else:
            lines.append(' '.join(current))
            current = [word]
    if current:
        lines.append(' '.join(current))

    line_height = font_size + 10
    y = max(0, min(int(h * 0.8), h - len(lines) * line_height))
    for line in lines:
        bbox = draw.textbbox((0, 0), line, font=font)
        x = (w - (bbox[2] - bbox[0])) // 2
        draw.text((x + 2, y + 2), line, font=font, fill=(0, 0, 0, 180))
        draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
        y += line_height
    return cv2.cvtColor(np.array(pil_img.convert('RGB')), cv2.COLOR_RGB2BGR)


def _run_ffmpeg(args, step):
    """Run ``ffmpeg -y *args``; log the last stderr line and return False on failure."""
    result = subprocess.run(["ffmpeg", "-y", *args], capture_output=True)
    if result.returncode != 0:
        stderr_lines = result.stderr.decode(errors="replace").strip().splitlines()
        detail = stderr_lines[-1] if stderr_lines else ""
        add_log(f"[process_video_sync] ffmpeg {step} failed (exit {result.returncode}): {detail}")
        return False
    return True


def _transcribe(wav_path):
    """Return ``(start, end, text)`` caption tuples for *wav_path*.

    Returns an empty list when the Whisper model is not loaded. Segments whose
    text is blank after stripping are dropped.
    """
    if whisper_model is None or not processing_state["whisper_ready"]:
        return []
    segments, _ = whisper_model.transcribe(wav_path)
    captions = [(s.start, s.end, s.text.strip()) for s in segments if s.text.strip()]
    add_log(f"[process_video_sync] Transcribed {len(captions)} captions for {wav_path}")
    if not captions:
        add_log("[process_video_sync] WARNING: No captions transcribed. Check audio or model.")
    return captions


def _crop_to_aspect(frame, width, height):
    """Centre-crop *frame* to the width:height aspect ratio, then resize to exactly that size."""
    h, w = frame.shape[:2]
    target_ratio = width / height
    if w / h > target_ratio:
        new_w = int(h * target_ratio)
        off = (w - new_w) // 2
        frame = frame[:, off:off + new_w]
    else:
        new_h = int(w / target_ratio)
        off = (h - new_h) // 2
        frame = frame[off:off + new_h, :]
    return cv2.resize(frame, (width, height))


def _render_frames(seg_path, out_path, captions, width=OUTPUT_WIDTH, height=OUTPUT_HEIGHT):
    """Re-encode *seg_path* as silent H.264 with cropping, grading and captions.

    Frames are piped raw (BGR) into an ffmpeg encoder. Returns True only if
    ffmpeg exited cleanly and at least one frame was written.
    """
    cap = cv2.VideoCapture(seg_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 24
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-f", "rawvideo", "-vcodec", "rawvideo",
            "-s", f"{width}x{height}", "-pix_fmt", "bgr24", "-r", str(fps),
            "-i", "pipe:0", "-vcodec", "libx264", "-preset", "veryfast",
            "-crf", "22", "-pix_fmt", "yuv420p", out_path,
        ]
        frame_count = 0
        # The context manager closes stdin and waits for ffmpeg even if a
        # frame operation or a broken pipe raises, so no process is leaked.
        with subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE,
                              stderr=subprocess.DEVNULL) as proc:
            while True:
                ok, frame = cap.read()
                if not ok:
                    break
                frame = apply_color_grading(_crop_to_aspect(frame, width, height))
                ts = frame_count / fps
                text = next((t for s, e, t in captions if s <= ts <= e), None)
                if text:
                    frame = burn_captions(frame, text)
                proc.stdin.write(frame.tobytes())
                frame_count += 1
        return proc.returncode == 0 and frame_count > 0
    finally:
        cap.release()


def process_video_sync(video_path, output_path, start_t, end_t):
    """Cut, reframe, grade, caption and re-mux one segment of *video_path*.

    This call blocks (ffmpeg subprocesses, OpenCV decoding, Whisper inference),
    so run it through ``asyncio.to_thread``. Temporary files are created next to
    *output_path* and are always removed.

    Args:
        video_path: Source video file.
        output_path: Destination ``.mp4`` path.
        start_t, end_t: Segment bounds, parsed by ``timestamp_to_seconds``.

    Returns:
        True if *output_path* was produced. False on any failure; the details
        go to the log.
    """
    # Matroska can hold any stream the source MKV contains; MP4 rejects many
    # subtitle/audio codecs under stream copy.
    temp_seg = output_path + ".seg.mkv"
    temp_no_audio = output_path + ".noaudio.mp4"
    temp_wav = output_path + ".wav"
    try:
        start_s = timestamp_to_seconds(start_t)
        end_s = timestamp_to_seconds(end_t)
        if end_s <= start_s:
            add_log(f"[process_video_sync] Invalid segment bounds: {start_t} to {end_t}")
            return False

        # Keep only the first video and (if present) first audio track.
        if not _run_ffmpeg(["-ss", str(start_s), "-to", str(end_s), "-i", video_path,
                            "-map", "0:v:0", "-map", "0:a:0?", "-c", "copy", temp_seg],
                           "cut"):
            return False
        has_audio = _run_ffmpeg(["-i", temp_seg, "-vn", "-acodec", "pcm_s16le",
                                 "-ar", "16000", "-ac", "1", temp_wav],
                                "audio extraction")

        add_log(f"[process_video_sync] Whisper model ready: {processing_state['whisper_ready']}")
        add_log(f"[process_video_sync] Whisper model instance: {whisper_model is not None}")
        captions = _transcribe(temp_wav) if has_audio else []

        if not _render_frames(temp_seg, temp_no_audio, captions):
            add_log("[process_video_sync] Frame rendering/encoding failed")
            return False

        # Audio is optional (`?`) and re-encoded to AAC, because the source
        # codec (e.g. TrueHD, Vorbis) may not be allowed in MP4.
        if not _run_ffmpeg(["-i", temp_no_audio, "-i", temp_seg,
                            "-map", "0:v:0", "-map", "1:a:0?",
                            "-c:v", "copy", "-c:a", "aac", "-shortest", output_path],
                           "mux"):
            return False
        return os.path.exists(output_path)
    except Exception as e:
        add_log(f"Error in sync process: {e}")
        return False
    finally:
        for f in (temp_seg, temp_no_audio, temp_wav):
            if os.path.exists(f):
                os.remove(f)


def _find_movies(files):
    """Return sorted movie names that have hook JSONs at ``HOOKS_FOLDER/<movie>/...json``.

    JSON files placed directly in ``HOOKS_FOLDER`` are ignored. Without this
    check they would be mistaken for movie names.
    """
    prefix = HOOKS_FOLDER + "/"
    movies = {
        f.split("/")[1]
        for f in files
        if f.startswith(prefix) and f.endswith(".json") and f.count("/") >= 2
    }
    return sorted(movies)


async def _process_movie(movie, files):
    """Download *movie*'s source video, render each of its hook segments and upload them.

    Hub calls and rendering run in worker threads. A segment that fails to
    render is counted as an error. Other exceptions (download, JSON, upload)
    propagate to the caller.
    """
    video_path = await asyncio.to_thread(
        hf_hub_download, repo_id=HF_DATASET_REPO, filename=f"{movie}.mkv",
        repo_type="dataset", token=HF_TOKEN)
    movie_hooks = sorted(
        f for f in files
        if f.startswith(f"{HOOKS_FOLDER}/{movie}/") and f.endswith(".json"))
    add_log(f"Found {len(movie_hooks)} segments for {movie}")

    # TemporaryDirectory is removed even if a download or upload raises.
    with tempfile.TemporaryDirectory() as temp_dir:
        for hook_file in movie_hooks:
            await asyncio.sleep(0.1)
            hook_path = await asyncio.to_thread(
                hf_hub_download, repo_id=HF_DATASET_REPO, filename=hook_file,
                repo_type="dataset", token=HF_TOKEN)
            with open(hook_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            num = int(data.get("segment_number", 1))
            start = data.get("start_time", "00:00:00")
            end = data.get("end_time", "00:00:10")
            out_name = f"segment-{num:02d}.mp4"
            out_path = os.path.join(temp_dir, out_name)
            add_log(f"Processing Segment {num} ({start} to {end})")
            success = await asyncio.to_thread(process_video_sync, video_path, out_path, start, end)
            if success:
                await asyncio.to_thread(
                    upload_file, path_or_fileobj=out_path,
                    path_in_repo=f"{READY_VIDEOS_FOLDER}/{movie}/{out_name}",
                    repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
                add_log(f"✓ Segment {num} uploaded successfully")
            else:
                _record_error(f"✗ Segment {num} failed", f"Segment {num} of {movie} failed")


async def run_processing_loop():
    """Background worker: load Whisper, then render and upload every hook segment.

    At most one instance runs at a time (guarded by ``is_running``). All blocking
    work (model load, Hub API calls, downloads, uploads, rendering) runs in
    worker threads so the status endpoint stays responsive. A failing movie is
    logged and counted in ``error_count``, and the run moves on to the next movie.
    """
    if processing_state["is_running"]:
        return
    processing_state["is_running"] = True
    try:
        add_log("Waiting 5 seconds for server to settle...")
        await asyncio.sleep(5)

        add_log("Initiating background tasks...")
        # Await the load directly. An unreferenced fire-and-forget task can be
        # garbage-collected, and polling `whisper_ready` would spin forever
        # if loading failed.
        if not processing_state["whisper_ready"]:
            await asyncio.to_thread(_load_whisper_model)
        if not processing_state["whisper_ready"]:
            add_log("Whisper model unavailable; aborting processing run.")
            processing_state["error_count"] += 1
            return

        add_log("Starting repository scan...")
        files = await asyncio.to_thread(
            list_repo_files, repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
        movies = _find_movies(files)
        add_log(f"Found {len(movies)} movies to process")

        for movie in movies:
            processing_state["current_file"] = movie
            add_log(f"--- Processing Movie: {movie} ---")
            try:
                await _process_movie(movie, files)
            except Exception as e:
                _record_error(f"✗ Movie {movie} failed: {e}", f"{movie}: {e}")
                continue
            processing_state["processed_files"].append(movie)
            processing_state["total_processed"] += 1
            add_log(f"Finished movie: {movie}")
    except Exception as e:
        _record_error(f"CRITICAL ERROR: {e}", str(e))
    finally:
        processing_state["is_running"] = False
        processing_state["current_file"] = None
        add_log("Background worker idle.")
@app.on_event("startup")
async def startup_event():
    """Start the background processing loop once the application is up.

    The 5 s settle delay and the Whisper model loading both happen inside
    ``run_processing_loop``. The task is stored on ``app.state`` because the
    asyncio event loop keeps only weak references to tasks. Without a strong
    reference, a running task can be garbage-collected mid-execution.
    """
    app.state.processing_task = asyncio.create_task(run_processing_loop())


@app.get("/")
@app.get("/status")
async def status():
    """Return the shared processing state: counters, errors and the recent log."""
    return processing_state


if __name__ == "__main__":
    add_log("Starting Video Processing Service on port 7860...")
    uvicorn.run(app, host="0.0.0.0", port=7860)