Spaces:
Running
Running
| import os | |
| import json | |
| import asyncio | |
| import tempfile | |
| import subprocess | |
| import shutil | |
| import time | |
| import threading | |
| from pathlib import Path | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Optional, Tuple | |
| from fastapi import FastAPI | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| try: | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| from faster_whisper import WhisperModel | |
| except ImportError as e: | |
| print(f"Missing dependency: {e}") | |
| exit(1) | |
# Pull settings from a local .env file (if one exists) into os.environ before
# anything below reads them.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")

# Dataset repo holding the hook JSON files (input) and rendered segments (output).
HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"
READY_VIDEOS_FOLDER = "ready_videos"

app = FastAPI(title="Video Processing Service")

# Process-wide progress record: mutated by the background worker and by
# add_log(), and returned as-is by status().
processing_state = dict(
    is_running=False,
    total_processed=0,
    current_file=None,
    error_count=0,
    last_error=None,
    processed_files=[],
    whisper_ready=False,
    log=[],
)

# Populated by _load_whisper_model() once the faster-whisper model is loaded.
whisper_model = None
def add_log(msg):
    """Print *msg* prefixed with ``[HH:MM:SS]`` and append it to the status log.

    The in-memory copy in ``processing_state["log"]`` is trimmed from the
    front so it holds at most the 100 newest entries.
    """
    line = "[{}] {}".format(datetime.now().strftime('%H:%M:%S'), msg)
    print(line)
    history = processing_state["log"]
    history.append(line)
    # Drop the oldest entry once the cap is exceeded.
    if len(history) > 100:
        del history[0]
def _load_whisper_model():
    """Load the faster-whisper "small" model into the global ``whisper_model``.

    This call blocks while the model is fetched and initialised. Run it in a
    worker thread (e.g. ``asyncio.to_thread``) so an event loop is not stalled.

    On success ``processing_state["whisper_ready"]`` becomes True. On failure
    the flag stays False, and the error is recorded in
    ``processing_state["last_error"]`` / ``["error_count"]`` so that it is
    visible to status checks and not only printed to the console.
    """
    global whisper_model
    try:
        add_log("Starting Whisper model load...")
        whisper_model = WhisperModel("small", device="auto", compute_type="int8")
        processing_state["whisper_ready"] = True
        add_log("β Whisper model loaded successfully")
    except Exception as e:
        # Broad catch is deliberate: any load failure (download, backend,
        # compute type) must be reported instead of killing the worker thread.
        processing_state["whisper_ready"] = False
        processing_state["last_error"] = f"Whisper model load failed: {e}"
        processing_state["error_count"] += 1
        add_log(f"β Failed to load Whisper model: {e}")
def timestamp_to_seconds(timestamp: str) -> float:
    """Convert an ``[[HH:]MM:]SS[.fff]`` timestamp to seconds.

    Examples: ``"01:02:03.5"`` -> 3723.5, ``"02:03"`` -> 123.0, ``"45"`` -> 45.0.

    Unparseable input returns 0.0. This covers a non-string value, non-numeric
    fields, more than three fields, and the empty string.
    """
    try:
        parts = timestamp.strip().split(":")
        if not 1 <= len(parts) <= 3:
            return 0.0
        # Horner-style accumulation: every leading field is worth 60x the next one.
        seconds = 0.0
        for field in parts[:-1]:
            seconds = seconds * 60 + int(field)
        return seconds * 60 + float(parts[-1])
    except (AttributeError, TypeError, ValueError):
        return 0.0
def apply_color_grading(frame):
    """Return a contrast-equalised, sharpened version of a BGR frame.

    Steps:
    1. Apply CLAHE (clip 3.0, 8x8 tiles) to the L channel in LAB space.
    2. Blend the equalised frame 40/60 with a 3x3 sharpen-filtered copy of itself.
    """
    # Equalise lightness only; the a/b channels are passed through unchanged.
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(frame, cv2.COLOR_BGR2LAB))
    equaliser = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    lab_eq = cv2.merge([equaliser.apply(lightness), chan_a, chan_b])
    equalised = cv2.cvtColor(lab_eq, cv2.COLOR_LAB2BGR)

    # Sharpen kernel: centre 9, all eight neighbours -1, scaled down by 1.2.
    sharpen = np.full((3, 3), -1)
    sharpen[1, 1] = 9
    sharpened = cv2.filter2D(equalised, -1, sharpen / 1.2)

    return cv2.addWeighted(equalised, 0.4, sharpened, 0.6, 0)
# Bold caption font; Pillow's built-in default font is used if it is missing.
_CAPTION_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
# font_size -> loaded font, so the font file is read once per size, not per frame.
_caption_font_cache = {}


def _caption_font(font_size):
    """Return the caption font for *font_size*, loading it at most once per size."""
    font = _caption_font_cache.get(font_size)
    if font is None:
        try:
            font = ImageFont.truetype(_CAPTION_FONT_PATH, font_size)
        except (OSError, ValueError):
            # Font file absent/unreadable or unusable size: fall back like before.
            font = ImageFont.load_default()
        _caption_font_cache[font_size] = font
    return font


def burn_captions(frame, text, font_size=40):
    """Draw *text* as centred, word-wrapped white captions on a BGR frame.

    Args:
        frame: BGR image array, as produced by OpenCV.
        text: Caption text. It is split on whitespace and wrapped so that each
            line measures narrower than ``frame width - 100`` px. A single word
            wider than that goes on its own line.
        font_size: Point size of the caption font; also sets the line spacing
            (``font_size + 10`` px).

    Returns:
        A new BGR array with the captions drawn. The first line starts at 80%
        of the frame height. It is moved up when needed so that the whole
        wrapped block fits above the bottom edge.
    """
    h, w = frame.shape[:2]
    pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert('RGBA')
    draw = ImageDraw.Draw(pil_img)
    font = _caption_font(font_size)
    max_width = w - 100

    lines, curr = [], []
    for word in text.split():
        candidate = ' '.join(curr + [word])
        if draw.textbbox((0, 0), candidate, font=font)[2] < max_width:
            curr.append(word)
        else:
            # Only flush a non-empty line; otherwise an over-wide word would
            # produce a blank caption line before itself.
            if curr:
                lines.append(' '.join(curr))
            curr = [word]
    if curr:
        lines.append(' '.join(curr))

    line_step = font_size + 10
    # Keep the block inside the frame when there are many lines.
    y = max(0, min(int(h * 0.8), h - len(lines) * line_step))
    for line in lines:
        bbox = draw.textbbox((0, 0), line, font=font)
        x = (w - (bbox[2] - bbox[0])) // 2
        # Offset drop shadow first, then the white text on top of it.
        draw.text((x + 2, y + 2), line, font=font, fill=(0, 0, 0, 180))
        draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
        y += line_step
    return cv2.cvtColor(np.array(pil_img.convert('RGB')), cv2.COLOR_RGB2BGR)
def _run_ffmpeg(args):
    """Run ``ffmpeg -y <args>``; return True on exit status 0.

    On failure the tail of ffmpeg's stderr is logged and False is returned.
    """
    result = subprocess.run(["ffmpeg", "-y", *args], capture_output=True)
    if result.returncode == 0:
        return True
    tail = result.stderr.decode("utf-8", errors="replace").strip()[-500:]
    add_log(f"[process_video_sync] ffmpeg exited with {result.returncode}: {tail}")
    return False


def _transcribe_captions(wav_path):
    """Return ``[(start_s, end_s, text), ...]`` for *wav_path*.

    Returns [] when the Whisper model is not loaded or the wav file does not
    exist (e.g. the source segment had no audio track).
    """
    add_log(f"[process_video_sync] Whisper model ready: {processing_state['whisper_ready']}")
    add_log(f"[process_video_sync] Whisper model instance: {whisper_model is not None}")
    captions = []
    if whisper_model and processing_state["whisper_ready"] and os.path.exists(wav_path):
        segs, _ = whisper_model.transcribe(wav_path)
        captions = [(s.start, s.end, s.text.strip()) for s in segs if s.text.strip()]
        add_log(f"[process_video_sync] Transcribed {len(captions)} captions for {wav_path}")
    if not captions:
        add_log("[process_video_sync] WARNING: No captions transcribed. Check audio or model.")
    return captions


def _center_crop_resize(frame, width, height):
    """Centre-crop *frame* to the ``width:height`` aspect ratio, then resize to exactly that size."""
    h, w = frame.shape[:2]
    target_ratio = width / height
    if w / h > target_ratio:
        nw = int(h * target_ratio)
        off = (w - nw) // 2
        frame = frame[:, off:off + nw]
    else:
        nh = int(w / target_ratio)
        off = (h - nh) // 2
        frame = frame[off:off + nh, :]
    return cv2.resize(frame, (width, height))


def _render_segment_frames(src_path, dst_path, captions, width, height):
    """Re-encode the video track of *src_path* into *dst_path* as silent H.264.

    Each frame is processed in three steps:
    1. Centre-crop and resize it to ``width`` x ``height``.
    2. Colour-grade it.
    3. Overlay the first caption whose ``[start, end]`` window (seconds)
       contains the frame's timestamp.

    Returns True when the encoder exits with status 0.
    """
    cap = cv2.VideoCapture(src_path)
    try:
        if not cap.isOpened():
            add_log(f"[process_video_sync] Could not open {src_path}")
            return False
        fps = cap.get(cv2.CAP_PROP_FPS) or 24
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-f", "rawvideo", "-vcodec", "rawvideo", "-s", f"{width}x{height}",
            "-pix_fmt", "bgr24", "-r", str(fps), "-i", "pipe:0", "-vcodec", "libx264",
            "-preset", "veryfast", "-crf", "22", "-pix_fmt", "yuv420p", dst_path,
        ]
        # The Popen context manager closes stdin and waits for ffmpeg even if a
        # frame step raises, so no orphaned encoder process is left behind.
        with subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL) as proc:
            f_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = apply_color_grading(_center_crop_resize(frame, width, height))
                ts = f_idx / fps
                caption = next((t for s, e, t in captions if s <= ts <= e), None)
                if caption is not None:
                    frame = burn_captions(frame, caption)
                proc.stdin.write(frame.tobytes())
                f_idx += 1
            proc.stdin.close()
            return proc.wait() == 0
    finally:
        cap.release()


def process_video_sync(video_path, output_path, start_t, end_t):
    """Render one captioned 1080x1350 clip of *video_path* between two timestamps.

    The pipeline runs in order:
    1. Stream-copy the ``start_t``..``end_t`` range.
    2. Extract 16 kHz mono audio and transcribe it with Whisper (skipped when
       there is no audio).
    3. Re-encode the video with crop, grading and burned-in captions.
    4. Mux the original audio back in, if there was any.

    Args:
        video_path: Local path of the source movie.
        output_path: Destination ``.mp4``. Temporary files are created next to it.
        start_t, end_t: Timestamps accepted by ``timestamp_to_seconds``.

    Returns:
        True when *output_path* was produced successfully, False otherwise.
        Errors are logged, never raised. Temporary files are always removed.
    """
    temp_seg = output_path + ".seg.mp4"
    temp_no_audio = output_path + ".noaudio.mp4"
    temp_wav = output_path + ".wav"
    width, height = 1080, 1350
    try:
        start_s = timestamp_to_seconds(start_t)
        end_s = timestamp_to_seconds(end_t)
        # Stream-copy the segment without re-encoding. -sn/-dn drop subtitle and
        # data streams: MKV text subtitles cannot be stream-copied into MP4, and
        # they would otherwise make the whole cut fail.
        if not _run_ffmpeg(["-ss", str(start_s), "-to", str(end_s), "-i", video_path,
                            "-c", "copy", "-sn", "-dn", temp_seg]):
            return False
        # Whisper input. A failure here (e.g. no audio track) is not fatal; the
        # segment is then rendered without captions.
        _run_ffmpeg(["-i", temp_seg, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", temp_wav])
        captions = _transcribe_captions(temp_wav)
        if not _render_segment_frames(temp_seg, temp_no_audio, captions, width, height):
            return False
        # The trailing '?' makes the audio mapping optional for silent sources.
        if not _run_ffmpeg(["-i", temp_no_audio, "-i", temp_seg, "-map", "0:v:0", "-map", "1:a:0?",
                            "-c", "copy", "-shortest", output_path]):
            return False
        return os.path.exists(output_path)
    except Exception as e:
        add_log(f"Error in sync process: {e}")
        return False
    finally:
        for f in (temp_seg, temp_no_audio, temp_wav):
            if os.path.exists(f):
                os.remove(f)
def _group_repo_files(files, folder, suffix):
    """Group repo paths of the form ``<folder>/<movie>/<name><suffix>`` by movie.

    Returns ``{movie: [path, ...]}`` in listing order. Paths with fewer than
    three ``/``-separated components are ignored.
    """
    grouped = {}
    prefix = folder + "/"
    for path in files:
        if not (path.startswith(prefix) and path.endswith(suffix)):
            continue
        parts = path.split("/")
        if len(parts) >= 3:
            grouped.setdefault(parts[1], []).append(path)
    return grouped


def _segment_name(num):
    """Return the rendered file name for segment *num*, e.g. ``segment-03.mp4``."""
    return f"segment-{num:02d}.mp4"


def _read_hook(hook_file):
    """Download one hook JSON from the dataset repo and parse it.

    Returns ``(segment_number, start_time, end_time)``. Missing keys default
    to ``1``, ``"00:00:00"`` and ``"00:00:10"``.
    """
    hook_path = hf_hub_download(repo_id=HF_DATASET_REPO, filename=hook_file, repo_type="dataset", token=HF_TOKEN)
    with open(hook_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    # int() also accepts numbers stored as strings ("3"), which ":02d" rejects.
    num = int(data.get("segment_number", 1))
    return num, data.get("start_time", "00:00:00"), data.get("end_time", "00:00:10")


def _hook_is_pending(hook_file, existing_names):
    """Return True unless *hook_file*'s rendered segment is already in *existing_names*.

    An unreadable hook counts as pending, so the per-segment step retries it
    and reports the error.
    """
    try:
        num, _, _ = _read_hook(hook_file)
    except Exception as e:
        add_log(f"  ! Could not read {hook_file}: {e}")
        return True
    return _segment_name(num) not in existing_names


def _download_movie_video(movie):
    """Return a local path to ``<movie>.mkv``, or None if it cannot be fetched.

    Tries ``ready_videos/<movie>.mkv`` first, then ``<movie>.mkv`` at the repo root.
    """
    last_error = None
    for filename in (f"{READY_VIDEOS_FOLDER}/{movie}.mkv", f"{movie}.mkv"):
        try:
            return hf_hub_download(repo_id=HF_DATASET_REPO, filename=filename, repo_type="dataset", token=HF_TOKEN)
        except Exception as e:
            last_error = e
    add_log(f"β Could not find video file for {movie}: {last_error}")
    return None


async def _process_movie(movie, movie_hooks, existing_videos):
    """Render and upload every hook segment of *movie* not named in *existing_videos*.

    A failing segment (bad hook, render error or upload error) is logged and
    counted; the remaining segments still run.
    """
    video_path = _download_movie_video(movie)
    if video_path is None:
        processing_state["error_count"] += 1
        return
    add_log(f"Found {len(movie_hooks)} unprocessed segments for {movie}")
    temp_dir = tempfile.mkdtemp()
    try:
        for hook_file in movie_hooks:
            await asyncio.sleep(0.1)
            try:
                num, start, end = _read_hook(hook_file)
                out_name = _segment_name(num)
                if out_name in existing_videos:
                    add_log(f" β Segment {num} (already processed)")
                    continue
                out_path = os.path.join(temp_dir, out_name)
                add_log(f"Processing Segment {num} ({start} to {end})")
                success = await asyncio.to_thread(process_video_sync, video_path, out_path, start, end)
                if success:
                    upload_file(path_or_fileobj=out_path, path_in_repo=f"{READY_VIDEOS_FOLDER}/{movie}/{out_name}", repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
                    add_log(f"β Segment {num} uploaded successfully")
                else:
                    add_log(f"β Segment {num} failed")
                    processing_state["error_count"] += 1
            except Exception as e:
                add_log(f"Error processing segment hook {hook_file}: {e}")
                processing_state["error_count"] += 1
                processing_state["last_error"] = str(e)
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
    processing_state["processed_files"].append(movie)
    processing_state["total_processed"] += 1
    add_log(f"Finished movie: {movie}")


async def run_processing_loop():
    """Background worker that renders and uploads every hook segment lacking a ready video.

    At most one run is active at a time (guarded by ``processing_state["is_running"]``).
    Steps:
    1. Wait 5 s, then load the Whisper model. The run is abandoned if the
       model fails to load.
    2. List the dataset repo. Group ``hooks/<movie>/*.json`` and
       ``ready_videos/<movie>/*.mp4`` by movie.
    3. For each movie, keep the hooks whose ``segment-NN.mp4`` is not yet in
       ``ready_videos/<movie>/``.
    4. Render each pending segment with ``process_video_sync`` in a worker
       thread, then upload it.

    ``Exception``s are caught, logged, and recorded in ``processing_state``.
    """
    if processing_state["is_running"]:
        return
    processing_state["is_running"] = True
    try:
        add_log("Waiting 5 seconds for server to settle...")
        await asyncio.sleep(5)
        # Start model loading after the 5s delay
        add_log("Initiating background tasks...")
        if not processing_state["whisper_ready"]:
            # Await the load itself: polling the ready flag would wait forever
            # if loading fails.
            await asyncio.to_thread(_load_whisper_model)
        if not processing_state["whisper_ready"]:
            add_log("Whisper model unavailable; skipping this processing run.")
            return

        add_log("Starting repository scan...")
        files = list_repo_files(repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN)

        add_log("Scanning hooks directory...")
        all_hooks_movies = _group_repo_files(files, HOOKS_FOLDER, ".json")
        add_log(f"Found {len(all_hooks_movies)} movies in hooks folder")

        add_log("Scanning ready_videos directory...")
        processed_videos = {
            movie: {path.split("/")[2] for path in paths}
            for movie, paths in _group_repo_files(files, READY_VIDEOS_FOLDER, ".mp4").items()
        }
        add_log(f"Found {len(processed_videos)} movies with ready videos")

        unprocessed_movies = []
        for movie_name, hooks in all_hooks_movies.items():
            if movie_name not in processed_videos:
                # Movie has no ready videos at all
                unprocessed_movies.append((movie_name, hooks, set()))
                add_log(f" β {movie_name} (no ready videos, process all {len(hooks)} segments)")
                continue
            processed_segments = processed_videos[movie_name]
            # Hook entries are repo paths, not local files: each must be fetched
            # before its segment number can be compared.
            unprocessed_hooks = [h for h in hooks if _hook_is_pending(h, processed_segments)]
            if unprocessed_hooks:
                unprocessed_movies.append((movie_name, unprocessed_hooks, processed_segments))
                add_log(f" β {movie_name} (already has {len(processed_segments)} videos, {len(unprocessed_hooks)} segments remaining)")
            else:
                add_log(f" β {movie_name} (already complete with {len(processed_segments)} videos)")

        add_log(f"\nTotal unprocessed movies to process: {len(unprocessed_movies)}\n")
        if not unprocessed_movies:
            add_log("All movies already processed!")
            return

        for movie, movie_hooks, existing_videos in unprocessed_movies:
            processing_state["current_file"] = movie
            add_log(f"--- Processing Movie: {movie} ---")
            await _process_movie(movie, movie_hooks, existing_videos)
    except Exception as e:
        add_log(f"CRITICAL ERROR: {e}")
        processing_state["last_error"] = str(e)
    finally:
        processing_state["current_file"] = None
        processing_state["is_running"] = False
        add_log("Background worker idle.")
@app.on_event("startup")
async def startup_event():
    """Start the background video-processing worker when the FastAPI app starts.

    The handler is registered with the app via ``on_event("startup")``.
    ``run_processing_loop`` runs on its own event loop inside a daemon thread.
    That loop makes synchronous Hugging Face Hub calls, and running it there
    keeps those calls off the server's request-handling loop.
    """
    add_log("\n" + "="*80)
    add_log("STARTUP EVENT TRIGGERED - Video Segment Processing Service")
    add_log("="*80)

    def run_loop():
        # A fresh event loop for this thread; it lives until the run finishes.
        asyncio.run(run_processing_loop())

    process_thread = threading.Thread(target=run_loop, name="video-processing", daemon=True)
    process_thread.start()
    add_log("β Background processing thread scheduled")
# NOTE(review): route path assumed ("/status"); adjust if clients poll another URL.
@app.get("/status")
async def status():
    """Return a JSON snapshot of ``processing_state``.

    The snapshot holds the running flag, counters, current file, processed
    movies and the recent log.
    """
    # Copy the lists the worker thread appends to, so that the response is
    # serialised from a stable snapshot.
    return {
        **processing_state,
        "processed_files": list(processing_state["processed_files"]),
        "log": list(processing_state["log"]),
    }
if __name__ == "__main__":
    # Serve on all interfaces on the service's fixed port.
    _port = 7860
    add_log(f"Starting Video Processing Service on port {_port}...")
    uvicorn.run(app, host="0.0.0.0", port=_port)