segment / server.py
factorstudios's picture
Update server.py
d01b941 verified
raw
history blame
9.99 kB
#!/usr/bin/env python3
import os
import json
import asyncio
import tempfile
import subprocess
import shutil
import time
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Optional, Tuple
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import uvicorn
try:
from huggingface_hub import list_repo_files, hf_hub_download, upload_file
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from faster_whisper import WhisperModel
except ImportError as e:
print(f"Missing dependency: {e}")
exit(1)
# Load environment variables (from a .env file, if present) before reading
# any configuration from the process environment.
load_dotenv()

# Hugging Face access token; None when HF_TOKEN is not set in the environment.
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repository that is scanned for hook files and receives the uploads.
HF_DATASET_REPO = "factorstudios/movs"
# Repo folder scanned for "<movie>/<hook>.json" segment descriptors.
HOOKS_FOLDER = "hooks"
# Repo folder that finished segments are uploaded into.
READY_VIDEOS_FOLDER = "ready_videos"

app = FastAPI(title="Video Processing Service")

# Global state shared between the background worker and the status endpoint.
processing_state = {
    # True while the background processing loop is active.
    "is_running": False,
    # Number of movies that have been fully processed.
    "total_processed": 0,
    # Name of the movie currently being worked on.
    "current_file": None,
    # Number of errors encountered.
    "error_count": 0,
    # Text of the most recent critical error, if any.
    "last_error": None,
    # Names of the movies that have been fully processed.
    "processed_files": [],
    # Set to True once the Whisper model has been loaded.
    "whisper_ready": False,
    # Most recent log lines (add_log caps this list at 100 entries).
    "log": []
}

# Loaded Whisper model instance; None until _load_whisper_model() succeeds.
whisper_model = None
def add_log(msg):
    """Emit a timestamped message to stdout and to the in-memory status log.

    The entry is formatted as ``[HH:MM:SS] msg`` using the local wall-clock
    time, printed, and appended to ``processing_state["log"]``. Once the list
    grows past 100 entries the oldest entry is discarded.
    """
    entry = "[{}] {}".format(datetime.now().strftime('%H:%M:%S'), msg)
    # Console output first, so the message is visible even without the API.
    print(entry)
    # Mirror into shared state so the status endpoint can expose recent logs.
    history = processing_state["log"]
    history.append(entry)
    if len(history) > 100:
        del history[0]
def _load_whisper_model():
    """Instantiate the Whisper "small" model and publish it module-wide.

    The call itself is synchronous; a caller that needs to keep an event
    loop responsive has to run it in a worker thread. On success the model
    is stored in the module-level ``whisper_model`` and
    ``processing_state["whisper_ready"]`` is set to True. Any exception is
    logged through ``add_log`` and swallowed, leaving the ready flag as is.
    """
    global whisper_model
    try:
        add_log("Starting Whisper model load...")
        loaded = WhisperModel("small", device="auto", compute_type="int8")
        # Publish the instance before flipping the flag that readers check.
        whisper_model = loaded
        processing_state["whisper_ready"] = True
        add_log("✓ Whisper model loaded successfully")
    except Exception as exc:
        add_log(f"✗ Failed to load Whisper model: {exc}")
def timestamp_to_seconds(timestamp: str) -> float:
    """Convert a timestamp into a number of seconds.

    Accepted forms are ``"HH:MM:SS[.fff]"``, ``"MM:SS[.fff]"`` and a plain
    seconds value (either a string such as ``"12.5"`` or an int/float).
    Leading fields must be integers; only the last field may be fractional.

    Args:
        timestamp: The timestamp to convert.

    Returns:
        The offset in seconds, or ``0.0`` when the value cannot be parsed
        (wrong type, non-numeric fields, or more than three fields).
    """
    # Numeric values are taken as seconds already.
    if isinstance(timestamp, (int, float)):
        return float(timestamp)
    try:
        parts = timestamp.strip().split(":")
        if len(parts) > 3:
            return 0.0
        *leading, last = parts
        whole = 0
        # Fold hours/minutes base-60 so one loop handles both HH:MM:SS and MM:SS.
        for field in leading:
            whole = whole * 60 + int(field)
        return whole * 60 + float(last)
    except (AttributeError, ValueError):
        # AttributeError: not a string (e.g. None); ValueError: non-numeric field.
        return 0.0
def apply_color_grading(frame):
    """Boost local contrast and sharpen a BGR frame.

    The frame is converted to LAB, CLAHE (clip limit 3.0, 8x8 tiles) is
    applied to the L channel only, and the result is converted back to BGR.
    A 3x3 sharpening kernel (scaled by 1/1.2) is then applied and the
    sharpened image is blended with the contrast-enhanced one at 60/40.

    Args:
        frame: Image array in BGR channel order, as consumed by OpenCV.

    Returns:
        The graded frame produced by ``cv2.addWeighted``.
    """
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(frame, cv2.COLOR_BGR2LAB))
    equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    merged_lab = cv2.merge([equalizer.apply(lightness), chan_a, chan_b])
    contrasted = cv2.cvtColor(merged_lab, cv2.COLOR_LAB2BGR)

    sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) / 1.2
    sharpened = cv2.filter2D(contrasted, -1, sharpen_kernel)
    return cv2.addWeighted(contrasted, 0.4, sharpened, 0.6, 0)
_CAPTION_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
# font_size -> loaded font; burn_captions runs once per frame, so the font
# must not be re-read from disk on every call.
_caption_font_cache = {}


def _get_caption_font(font_size):
    """Return the caption font for *font_size*, loading it at most once."""
    font = _caption_font_cache.get(font_size)
    if font is None:
        try:
            font = ImageFont.truetype(_CAPTION_FONT_PATH, font_size)
        except Exception:
            # Best effort: fall back to Pillow's built-in font when the
            # TrueType file is missing or cannot be loaded.
            font = ImageFont.load_default()
        _caption_font_cache[font_size] = font
    return font


def burn_captions(frame, text, font_size=40):
    """Draw word-wrapped, centred caption text onto a BGR frame.

    The text is wrapped so that each line is narrower than the frame width
    minus 100 pixels, starts at 80% of the frame height, and is drawn in
    white over a black shadow offset by 2 pixels. A single word that is
    wider than the wrap width is placed on its own line.

    Args:
        frame: Image array in BGR channel order.
        text: Caption text; words are separated on whitespace.
        font_size: Font size passed to the TrueType loader; also determines
            the line spacing (``font_size + 10`` pixels).

    Returns:
        A new BGR image array with the caption rendered on it.
    """
    h, w = frame.shape[:2]
    pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert('RGBA')
    draw = ImageDraw.Draw(pil_img)
    font = _get_caption_font(font_size)

    max_width = w - 100
    lines, curr = [], []
    for word in text.split():
        candidate = ' '.join(curr + [word])
        # Always accept the first word of a line; otherwise an over-wide
        # word would flush an empty line and push the caption down a row.
        if not curr or draw.textbbox((0, 0), candidate, font=font)[2] < max_width:
            curr.append(word)
        else:
            lines.append(' '.join(curr))
            curr = [word]
    if curr:
        lines.append(' '.join(curr))

    y = int(h * 0.8)
    for line in lines:
        bbox = draw.textbbox((0, 0), line, font=font)
        x = (w - (bbox[2] - bbox[0])) // 2
        # Shadow first, then the text itself on top.
        draw.text((x + 2, y + 2), line, font=font, fill=(0, 0, 0, 180))
        draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
        y += font_size + 10
    return cv2.cvtColor(np.array(pil_img.convert('RGB')), cv2.COLOR_RGB2BGR)
def process_video_sync(video_path, output_path, start_t, end_t):
    """Cut, regrade, caption and re-encode one segment of a video (blocking).

    Steps:
      1. ffmpeg stream-copies the ``start_t``..``end_t`` range of
         ``video_path`` into a temporary segment file.
      2. ffmpeg extracts the segment's audio as mono 16 kHz PCM; when the
         Whisper model is loaded it is transcribed into timed captions.
      3. Every frame is centre-cropped to the 1080x1350 aspect ratio,
         resized, colour graded, captioned and piped as raw BGR frames into
         an ffmpeg libx264 encoder.
      4. The encoded video is muxed with the segment's audio into
         ``output_path``.

    Args:
        video_path: Path of the source video.
        output_path: Path of the final MP4; temporary files are created next
            to it using ``.seg.mp4``, ``.noaudio.mp4`` and ``.wav`` suffixes.
        start_t: Segment start, in a form ``timestamp_to_seconds`` accepts.
        end_t: Segment end, in a form ``timestamp_to_seconds`` accepts.

    Returns:
        True when the encoder and the final mux both exited successfully and
        ``output_path`` exists; False otherwise. Exceptions are logged via
        ``add_log`` and reported as False rather than propagated.
    """
    temp_seg = output_path + ".seg.mp4"
    temp_no_audio = output_path + ".noaudio.mp4"
    temp_wav = output_path + ".wav"
    cap = None
    proc = None
    try:
        start_s = timestamp_to_seconds(start_t)
        end_s = timestamp_to_seconds(end_t)
        subprocess.run(
            ["ffmpeg", "-y", "-ss", str(start_s), "-to", str(end_s), "-i", video_path,
             "-c", "copy", temp_seg],
            capture_output=True,
        )
        subprocess.run(
            ["ffmpeg", "-y", "-i", temp_seg, "-vn", "-acodec", "pcm_s16le",
             "-ar", "16000", "-ac", "1", temp_wav],
            capture_output=True,
        )

        captions = []
        # Read into a local first: nesting the same quote type inside an
        # f-string is a SyntaxError before Python 3.12.
        whisper_ready = processing_state["whisper_ready"]
        add_log(f"[process_video_sync] Whisper model ready: {whisper_ready}")
        add_log(f"[process_video_sync] Whisper model instance: {whisper_model is not None}")
        if whisper_model and whisper_ready:
            segs, _ = whisper_model.transcribe(temp_wav)
            captions = [(s.start, s.end, s.text.strip()) for s in segs if s.text.strip()]
            add_log(f"[process_video_sync] Transcribed {len(captions)} captions for {temp_wav}")
        if not captions:
            add_log("[process_video_sync] WARNING: No captions transcribed. Check audio or model.")

        cap = cv2.VideoCapture(temp_seg)
        fps = cap.get(cv2.CAP_PROP_FPS) or 24  # fall back when FPS is reported as 0
        width, height = 1080, 1350
        target_ratio = width / height
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-f", "rawvideo", "-vcodec", "rawvideo", "-s", f"{width}x{height}",
            "-pix_fmt", "bgr24", "-r", str(fps), "-i", "pipe:0", "-vcodec", "libx264",
            "-preset", "veryfast", "-crf", "22", "-pix_fmt", "yuv420p", temp_no_audio
        ]
        proc = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)

        f_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            h, w = frame.shape[:2]
            # Centre-crop to the target aspect ratio before resizing.
            if w / h > target_ratio:
                nw = int(h * target_ratio)
                off = (w - nw) // 2
                frame = frame[:, off:off + nw]
            else:
                nh = int(w / target_ratio)
                off = (h - nh) // 2
                frame = frame[off:off + nh, :]
            frame = cv2.resize(frame, (width, height))
            frame = apply_color_grading(frame)

            # Burn in the first caption whose time window covers this frame.
            ts = f_idx / fps
            for s, e, t in captions:
                if s <= ts <= e:
                    frame = burn_captions(frame, t)
                    break
            proc.stdin.write(frame.tobytes())
            f_idx += 1

        proc.stdin.close()
        encoder_rc = proc.wait()
        cap.release()
        cap = None
        if encoder_rc != 0:
            add_log(f"[process_video_sync] ffmpeg encoder exited with code {encoder_rc}")
            return False

        mux = subprocess.run(
            ["ffmpeg", "-y", "-i", temp_no_audio, "-i", temp_seg, "-map", "0:v:0",
             "-map", "1:a:0", "-c", "copy", "-shortest", output_path],
            capture_output=True,
        )
        if mux.returncode != 0:
            # A failed mux can leave a partial file behind; do not report it
            # as a finished segment.
            add_log(f"[process_video_sync] ffmpeg mux exited with code {mux.returncode}")
            return False
        return os.path.exists(output_path)
    except Exception as e:
        add_log(f"Error in sync process: {e}")
        return False
    finally:
        if cap is not None:
            cap.release()
        # Still running here means the frame loop was aborted: stop the
        # encoder so it neither lingers nor holds the temp file open.
        if proc is not None and proc.poll() is None:
            proc.kill()
            proc.wait()
            try:
                proc.stdin.close()
            except OSError:
                pass
        for f in [temp_seg, temp_no_audio, temp_wav]:
            if os.path.exists(f):
                os.remove(f)
async def run_processing_loop():
    """Background worker: process every hooked movie segment in the dataset.

    After a 5 second settle delay the Whisper model is loaded in a worker
    thread. The dataset repository is then scanned for
    ``hooks/<movie>/<name>.json`` descriptors; for each movie the
    ``<movie>.mkv`` source is downloaded and every descriptor's segment is
    processed and uploaded to ``ready_videos/<movie>/segment-NN.mp4``.

    All blocking work (model load, Hub listing/download/upload, video
    processing) runs via ``asyncio.to_thread`` so the event loop stays free
    to serve requests. Progress is reported through ``processing_state`` and
    ``add_log``.

    The call is a no-op when a run is already in progress. Any exception
    aborts the whole run; it is logged, stored in
    ``processing_state["last_error"]`` and counted in ``error_count``.
    """
    if processing_state["is_running"]:
        return
    processing_state["is_running"] = True
    try:
        add_log("Waiting 5 seconds for server to settle...")
        await asyncio.sleep(5)

        # Start model loading after the 5s delay. Awaiting the thread
        # directly (instead of polling the ready flag) means a failed load
        # is detected rather than waited on forever.
        add_log("Initiating background tasks...")
        await asyncio.to_thread(_load_whisper_model)
        if not processing_state["whisper_ready"]:
            raise RuntimeError("Whisper model failed to load")

        add_log("Starting repository scan...")
        files = await asyncio.to_thread(
            list_repo_files, repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN
        )

        # Group descriptors by movie folder. Paths need at least
        # "hooks/<movie>/<file>.json"; a .json directly under hooks/ would
        # otherwise be mistaken for a movie name.
        hooks_by_movie = {}
        for path in files:
            if not (path.startswith(HOOKS_FOLDER + "/") and path.endswith(".json")):
                continue
            parts = path.split("/")
            if len(parts) < 3:
                continue
            hooks_by_movie.setdefault(parts[1], []).append(path)
        movies = sorted(hooks_by_movie)
        add_log(f"Found {len(movies)} movies to process")

        for movie in movies:
            processing_state["current_file"] = movie
            add_log(f"--- Processing Movie: {movie} ---")
            video_path = await asyncio.to_thread(
                hf_hub_download, repo_id=HF_DATASET_REPO, filename=f"{movie}.mkv",
                repo_type="dataset", token=HF_TOKEN
            )
            movie_hooks = sorted(hooks_by_movie[movie])
            add_log(f"Found {len(movie_hooks)} segments for {movie}")

            # The context manager removes the directory even when a segment
            # raises part-way through.
            with tempfile.TemporaryDirectory() as temp_dir:
                for hook_file in movie_hooks:
                    await asyncio.sleep(0.1)
                    hook_path = await asyncio.to_thread(
                        hf_hub_download, repo_id=HF_DATASET_REPO, filename=hook_file,
                        repo_type="dataset", token=HF_TOKEN
                    )
                    with open(hook_path, 'r', encoding="utf-8") as f:
                        data = json.load(f)
                    num = data.get("segment_number", 1)
                    start = data.get("start_time", "00:00:00")
                    end = data.get("end_time", "00:00:10")
                    out_name = f"segment-{num:02d}.mp4"
                    out_path = os.path.join(temp_dir, out_name)
                    add_log(f"Processing Segment {num} ({start} to {end})")
                    success = await asyncio.to_thread(
                        process_video_sync, video_path, out_path, start, end
                    )
                    if success:
                        await asyncio.to_thread(
                            upload_file, path_or_fileobj=out_path,
                            path_in_repo=f"{READY_VIDEOS_FOLDER}/{movie}/{out_name}",
                            repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN
                        )
                        add_log(f"✓ Segment {num} uploaded successfully")
                    else:
                        processing_state["error_count"] += 1
                        add_log(f"✗ Segment {num} failed")

            processing_state["processed_files"].append(movie)
            processing_state["total_processed"] += 1
            add_log(f"Finished movie: {movie}")
    except Exception as e:
        add_log(f"CRITICAL ERROR: {e}")
        processing_state["last_error"] = str(e)
        processing_state["error_count"] += 1
    finally:
        processing_state["is_running"] = False
        add_log("Background worker idle.")
@app.on_event("startup")
async def startup_event():
    """Kick off the background processing loop when the server starts.

    The loop itself handles the 5 second settle delay and the model loading.
    """
    # Keep a strong reference on the application state: the event loop only
    # holds a weak reference to tasks, so an unreferenced task may be
    # garbage-collected before it finishes.
    app.state.processing_task = asyncio.create_task(run_processing_loop())
@app.get("/")
@app.get("/status")
async def status():
    """Serve the shared processing-state dictionary on ``/`` and ``/status``."""
    current_state = processing_state
    return current_state
# Script entry point: log the start-up and serve the app with uvicorn.
if __name__ == "__main__":
    add_log("Starting Video Processing Service on port 7860...")
    # Listen on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)