segment / server.py
factorstudios's picture
Update server.py
6bf2a59 verified
raw
history blame
22.2 kB
#!/usr/bin/env python3
import os
import json
import re
import asyncio
import tempfile
import subprocess
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Optional, Tuple
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import uvicorn
try:
from huggingface_hub import list_repo_files, hf_hub_download, upload_file
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from faster_whisper import WhisperModel
except ImportError as e:
print(f"Missing dependency: {e}")
print("Install with: pip install faster-whisper")
exit(1)
# Load environment variables (presumably from a local .env file, per the error
# message below) before reading HF_TOKEN.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    # Every Hub call in this service passes HF_TOKEN, so refuse to start without it.
    print("Error: Missing HF_TOKEN in .env")
    # SystemExit instead of the site-injected exit() helper, which is intended for
    # the interactive interpreter and does not exist when Python runs with -S.
    raise SystemExit(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan: load Whisper, start the background scan, stop it on shutdown.

    The blocking model load runs in the default thread-pool executor so it does
    not execute on the event-loop thread. The scan task is held in a local
    variable for the whole lifetime of the app (the event loop itself only
    keeps weak references to tasks) and is cancelled when the app shuts down.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _load_whisper_model)
    scan_task = asyncio.create_task(scan_and_process_videos())
    try:
        yield
    finally:
        if not scan_task.done():
            scan_task.cancel()
            try:
                await scan_task
            except asyncio.CancelledError:
                pass  # expected: we just cancelled it


app = FastAPI(title="Video Processing Service", lifespan=lifespan)
# Global state shared by the background processing task and the HTTP endpoints.
processing_state = dict(
    is_running=False,        # True while a scan is in progress
    total_processed=0,       # number of movies completed
    current_file=None,       # movie currently being processed, if any
    error_count=0,
    last_error=None,
    processed_files=[],      # names of completed movies
    whisper_ready=False,     # set once _load_whisper_model() finishes
)

# Whisper model: loaded by _load_whisper_model() at startup, not at import time.
whisper_model = None

# Hugging Face dataset layout.
HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"                   # segment JSONs live under hooks/<movie>/
READY_VIDEOS_FOLDER = "ready_videos"     # rendered segments go to ready_videos/<movie>/
TRANSCRIPTION_FOLDER = "transcriptions"  # not referenced by the code shown here
def _load_whisper_model():
    """
    Load the Whisper "small" model into the module-level ``whisper_model``.

    Blocking; meant to be run in a thread-pool executor. Sets
    processing_state["whisper_ready"] only after the model has been assigned.
    """
    global whisper_model
    print("Loading Whisper small model...")
    loaded = WhisperModel("small", device="auto", compute_type="int8")
    whisper_model = loaded
    processing_state["whisper_ready"] = True
    print("✓ Whisper model loaded")
def timestamp_to_seconds(timestamp: str) -> float:
    """
    Convert a clock-style timestamp to seconds.

    Accepts ``HH:MM:SS``, ``MM:SS`` or ``SS``; any field may have a fractional
    part (e.g. ``"00:01:02.5"`` -> 62.5). Malformed input (non-numeric fields,
    more than three fields, negative or non-finite values) is reported and
    yields 0.0, so callers never need to handle an exception.
    """
    try:
        fields = [float(part) for part in str(timestamp).strip().split(":")]
        if len(fields) > 3:
            raise ValueError("expected at most three ':'-separated fields")
        # Reject negatives, NaN (fails both comparisons) and infinity.
        if not all(0 <= value < float("inf") for value in fields):
            raise ValueError("fields must be finite and non-negative")
        seconds = 0.0
        for value in fields:
            seconds = seconds * 60 + value
        return seconds
    except ValueError as e:
        print(f"Error converting timestamp {timestamp}: {e}")
        return 0.0
def extract_audio_segment(video_path: str, start_seconds: float, end_seconds: float, output_wav: str) -> bool:
    """
    Extract [start_seconds, end_seconds] of video_path's audio as a 16 kHz,
    mono, 16-bit PCM WAV at output_wav (the input format used for Whisper).

    Returns:
        True if FFmpeg succeeded and output_wav exists, False otherwise.
        Failures, including a missing ffmpeg binary, are printed, not raised.
    """
    cmd = [
        "ffmpeg", "-y",
        # Keep stderr down to real errors so the failure message below is readable.
        "-hide_banner", "-loglevel", "error",
        # -ss/-to before -i: seek in the input (fast) and stop at end_seconds.
        "-ss", str(start_seconds),
        "-to", str(end_seconds),
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        output_wav
    ]
    try:
        # errors="replace": FFmpeg may echo non-UTF-8 bytes (e.g. file names),
        # which would otherwise raise UnicodeDecodeError while decoding stderr.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except OSError as e:
        # e.g. ffmpeg not installed / not on PATH: honour the bool contract.
        print(f" ✗ FFmpeg audio extraction failed: {e}")
        return False
    if result.returncode != 0:
        print(f" ✗ FFmpeg audio extraction failed: {result.stderr}")
        return False
    if not os.path.exists(output_wav):
        print(f" ✗ Output WAV file not created: {output_wav}")
        return False
    print(" ✓ Audio extracted successfully")
    return True
def transcribe_segment(audio_path: str) -> List[Tuple[float, float, str]]:
    """
    Run the loaded Whisper model over audio_path.

    Language is auto-detected; VAD filtering drops silences of 500 ms or more.
    Segments whose text is blank after stripping are skipped.

    Returns:
        (start_sec, end_sec, text) tuples, with times relative to the start
        of the audio file (i.e. the segment start).
    """
    print(" Transcribing audio with Whisper small...")
    segment_iter, _info = whisper_model.transcribe(
        audio_path,
        beam_size=5,
        language=None,
        vad_filter=True,
        vad_parameters={"min_silence_duration_ms": 500},
    )
    captions: List[Tuple[float, float, str]] = []
    for segment in segment_iter:
        caption_text = segment.text.strip()
        if not caption_text:
            continue
        captions.append((segment.start, segment.end, caption_text))
        print(f" [{segment.start:.1f}s → {segment.end:.1f}s] {caption_text}")
    print(f" ✓ Transcribed {len(captions)} caption segments")
    return captions
# Vignette masks keyed by (rows, cols). All frames of a render share one size,
# so the Gaussian mask is built once per size instead of once per frame.
_VIGNETTE_MASK_CACHE: Dict[Tuple[int, int], np.ndarray] = {}


def _vignette_mask(rows: int, cols: int) -> np.ndarray:
    """Return a cached read-only (rows, cols, 1) mask peaking at 1.0 in the center."""
    key = (rows, cols)
    mask = _VIGNETTE_MASK_CACHE.get(key)
    if mask is None:
        x_kernel = cv2.getGaussianKernel(cols, cols / 2)
        y_kernel = cv2.getGaussianKernel(rows, rows / 2)
        mask = y_kernel * x_kernel.T            # (rows, cols) outer product
        mask = (mask / mask.max()) ** 0.4       # normalise, soften the falloff
        mask = mask[:, :, np.newaxis]           # broadcast over colour channels
        mask.setflags(write=False)              # shared across calls: keep immutable
        _VIGNETTE_MASK_CACHE[key] = mask
    return mask


def apply_color_grading_wedding_retro(frame: np.ndarray) -> np.ndarray:
    """
    Apply the "cinematic wedding + retro" look to a BGR uint8 frame.

    Steps: warm tint in LAB (+5 a, +8 b) with CLAHE on lightness, 1.3x
    saturation boost, contrast/brightness lift (alpha 1.15, beta 10),
    blended 3x3 sharpening, and a Gaussian vignette.

    Returns:
        A new BGR uint8 frame of the same size.
    """
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)
    a_channel = cv2.add(a_channel, 5)
    b_channel = cv2.add(b_channel, 8)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    l_channel = clahe.apply(l_channel)
    lab_enhanced = cv2.merge([l_channel, a_channel, b_channel])
    frame = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)

    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * 1.3, 0, 255)
    frame = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)

    frame = cv2.convertScaleAbs(frame, alpha=1.15, beta=10)

    kernel = np.array([[-1, -1, -1],
                       [-1, 9, -1],
                       [-1, -1, -1]]) / 1.2
    sharpened = cv2.filter2D(frame, -1, kernel)
    frame = cv2.addWeighted(frame, 0.4, sharpened, 0.6, 0)

    rows, cols = frame.shape[:2]
    # mask <= 1, so the product stays in [0, 255]; astype truncates exactly like
    # assigning the float product back into the uint8 frame channel by channel.
    return (frame * _vignette_mask(rows, cols)).astype(np.uint8)
# Caption fonts keyed by point size, so the font file is opened and parsed once
# per size rather than on every captioned frame.
_CAPTION_FONT_CACHE: Dict[int, object] = {}


def _caption_font(font_size: int):
    """Return the caption font for font_size, loading it on first use."""
    font = _CAPTION_FONT_CACHE.get(font_size)
    if font is None:
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
        except Exception:
            # DejaVu Bold unavailable: fall back to Pillow's built-in font.
            font = ImageFont.load_default()
        _CAPTION_FONT_CACHE[font_size] = font
    return font


def _wrap_caption(draw, text: str, font, max_width: int) -> List[str]:
    """Greedily wrap words into lines at most max_width px wide (an over-long word gets its own line)."""
    lines: List[str] = []
    current: List[str] = []
    for word in text.split():
        candidate = ' '.join(current + [word])
        left, _, right, _ = draw.textbbox((0, 0), candidate, font=font)
        if right - left > max_width and current:
            lines.append(' '.join(current))
            current = [word]
        else:
            current.append(word)
    if current:
        lines.append(' '.join(current))
    return lines


def burn_captions_to_frame(frame: np.ndarray, text: str, font_size: int = 36) -> np.ndarray:
    """
    Draw caption text onto a BGR frame and return the resulting BGR frame.

    Text is white with a semi-transparent black drop shadow (no background
    box), wrapped to the frame width minus 80 px, centered horizontally, and
    vertically centered on 80% of the frame height.
    """
    height, width = frame.shape[:2]
    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).convert('RGBA')
    # Draw on a transparent overlay so the shadow's alpha blends with the frame.
    overlay = Image.new('RGBA', frame_pil.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)
    font = _caption_font(font_size)

    wrapped_lines = _wrap_caption(draw, text, font, width - 80)

    line_height = font_size + 12
    total_text_height = len(wrapped_lines) * line_height
    y_start = int(height * 0.80) - total_text_height // 2
    shadow_offset = 3
    for i, line in enumerate(wrapped_lines):
        left, _, right, _ = draw.textbbox((0, 0), line, font=font)
        x = (width - (right - left)) // 2
        y = y_start + i * line_height
        draw.text((x + shadow_offset, y + shadow_offset), line, font=font, fill=(0, 0, 0, 200))
        draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))

    frame_pil = Image.alpha_composite(frame_pil, overlay).convert('RGB')
    return cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)
def build_frame_caption_map(captions: List[Tuple[float, float, str]], fps: float) -> Dict[int, str]:
    """
    Expand caption segments into a frame-index -> caption-text lookup.

    Each (start_sec, end_sec, text) segment covers frames int(start_sec * fps)
    through int(end_sec * fps) inclusive. Where segments overlap, the later
    entry in ``captions`` wins.
    """
    return {
        frame_index: caption_text
        for seg_start, seg_end, caption_text in captions
        for frame_index in range(int(seg_start * fps), int(seg_end * fps) + 1)
    }
def _center_crop_slices(width: int, height: int, aspect_ratio: float) -> Tuple[slice, slice]:
    """Return (row_slice, col_slice) center-cropping a width x height frame to aspect_ratio (w/h)."""
    if width / height > aspect_ratio:
        # Source is wider than the target: trim equally from left and right.
        new_width = int(height * aspect_ratio)
        x_offset = (width - new_width) // 2
        return slice(None), slice(x_offset, x_offset + new_width)
    # Source is taller than (or equal to) the target: trim top and bottom.
    new_height = int(width / aspect_ratio)
    y_offset = (height - new_height) // 2
    return slice(y_offset, y_offset + new_height), slice(None)


def _transcribe_or_skip(audio_ok: bool, wav_path: str) -> List[Tuple[float, float, str]]:
    """Transcribe wav_path when possible; otherwise return [] (segment rendered without captions)."""
    if not audio_ok:
        print(" ✗ Skipping transcription: audio extraction failed")
        return []
    if whisper_model is None:
        print(" ✗ Skipping transcription: Whisper model not ready")
        return []
    try:
        return transcribe_segment(wav_path)
    except Exception as e:
        # Captions are optional: keep the segment instead of discarding it.
        print(f" ✗ Transcription failed, continuing without captions: {e}")
        return []


def _mux_with_source_audio(video_only_path: str, source_path: str,
                           start_seconds: float, end_seconds: float,
                           output_path: str) -> bool:
    """Combine the encoded video with source_path's audio for [start, end]; True on success."""
    cmd = [
        "ffmpeg", "-y",
        "-i", video_only_path,
        # Input options before the second -i trim only the source input.
        "-ss", str(start_seconds),
        "-to", str(end_seconds),
        "-i", source_path,
        "-map", "0:v:0",
        # Trailing '?' makes the audio mapping optional: a source without an
        # audio track yields a silent video instead of a failed segment.
        "-map", "1:a:0?",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        "-shortest",
        "-movflags", "+faststart",
        output_path
    ]
    result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if result.returncode != 0:
        print(f"✗ FFmpeg audio mux failed (code {result.returncode})")
        return False
    return True


def process_video_segment(
    video_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    target_width: int = 1080,
    target_height: int = 1350
) -> bool:
    """
    Render one segment of video_path to output_path.

    Pipeline:
      1. Extract the segment's audio -> WAV
      2. Transcribe it with Whisper (skipped if unavailable)
      3. Center-crop, resize, colour-grade and caption every frame, piping raw
         BGR frames into an FFmpeg H.264 encoder
      4. Mux the encoded video with the segment's original audio

    Args:
        video_path: Source video file.
        output_path: Destination video file.
        start_time, end_time: Segment bounds, parsed by timestamp_to_seconds.
        target_width, target_height: Output frame size in pixels.

    Returns:
        True if output_path was written, False otherwise (errors are printed,
        not raised). Temporary files are always removed.
    """
    ffmpeg_video_proc = None
    cap = None  # Declared here so the finally block can always release it
    # Derive temp names from the extension-less path. str.replace(".mp4", ...)
    # leaves non-.mp4 paths unchanged (the temp file would then overwrite and
    # finally delete the real output) and also rewrites ".mp4" in directory names.
    base_path, _ = os.path.splitext(output_path)
    temp_wav = base_path + "_audio.wav"
    temp_video_path = base_path + "_noaudio.mp4"
    try:
        print(f"Opening video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        start_seconds = timestamp_to_seconds(start_time)
        end_seconds = timestamp_to_seconds(end_time)
        duration = end_seconds - start_seconds
        print(f"Video info: {fps} fps, {original_width}x{original_height}")
        print(f"Extracting segment: {start_time} to {end_time} ({duration:.1f}s)")

        # Some containers report 0 (or NaN) fps; every frame computation below needs it.
        if not (fps > 0):
            print(f"Error: Invalid FPS ({fps}) reported for {video_path}")
            return False
        target_frames = int(duration * fps)
        if target_frames <= 0:
            print(f"Error: Segment {start_time} to {end_time} contains no frames")
            return False

        # ── Steps 1-2: audio → WAV → captions ────────────────────────────────
        print(" Extracting audio segment...")
        audio_ok = extract_audio_segment(video_path, start_seconds, end_seconds, temp_wav)
        captions = _transcribe_or_skip(audio_ok, temp_wav)
        frame_caption_map = build_frame_caption_map(captions, fps)

        # ── Step 3: process frames → pipe to FFmpeg ──────────────────────────
        ffmpeg_video_cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{target_width}x{target_height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "pipe:0",
            "-vcodec", "libx264",
            "-preset", "fast",
            "-crf", "23",
            "-pix_fmt", "yuv420p",
            temp_video_path
        ]
        ffmpeg_video_proc = subprocess.Popen(
            ffmpeg_video_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        cap.set(cv2.CAP_PROP_POS_FRAMES, int(start_seconds * fps))
        aspect_ratio = target_width / target_height
        progress_every = max(1, target_frames // 10)
        crop = None
        processed_frames = 0
        print(f"Processing {target_frames} frames...")
        while processed_frames < target_frames:
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame at position {processed_frames}")
                break
            if crop is None:
                # The crop window is identical for every frame; compute it once
                # from the decoded size (container metadata may be missing/0).
                frame_height, frame_width = frame.shape[:2]
                crop = _center_crop_slices(frame_width, frame_height, aspect_ratio)
            frame = cv2.resize(frame[crop], (target_width, target_height),
                               interpolation=cv2.INTER_LANCZOS4)
            frame = apply_color_grading_wedding_retro(frame)
            # Caption map is keyed by frame index relative to the segment start.
            caption = frame_caption_map.get(processed_frames, "")
            if caption:
                frame = burn_captions_to_frame(frame, caption)
            ffmpeg_video_proc.stdin.write(frame.tobytes())
            processed_frames += 1
            if processed_frames % progress_every == 0:
                progress = (processed_frames / target_frames) * 100
                print(f"Progress: {progress:.1f}%")

        # Close stdin (EOF) and wait for FFmpeg to finish encoding
        ffmpeg_video_proc.stdin.close()
        ffmpeg_video_proc.wait()
        if ffmpeg_video_proc.returncode != 0:
            print(f"✗ FFmpeg video encoding failed (code {ffmpeg_video_proc.returncode})")
            return False
        print("✓ Frames encoded, muxing audio...")

        # ── Step 4: mux processed video + original audio ──────────────────────
        if not _mux_with_source_audio(temp_video_path, video_path,
                                      start_seconds, end_seconds, output_path):
            return False
        print(f"✓ Segment complete: {output_path}")
        return True
    except Exception as e:
        print(f"✗ Error processing video segment: {e}")
        if ffmpeg_video_proc is not None:
            try:
                ffmpeg_video_proc.stdin.close()
            except Exception:
                pass  # already closed, or the pipe broke because FFmpeg died
            ffmpeg_video_proc.wait()
        return False
    finally:
        # Always release VideoCapture regardless of success or failure
        if cap is not None:
            cap.release()
        # Always clean up temp files
        for tmp in (temp_video_path, temp_wav):
            if os.path.exists(tmp):
                try:
                    os.remove(tmp)
                except OSError:
                    pass
def _download_from_dataset(filename: str) -> str:
    """Download filename from HF_DATASET_REPO into the local cache; return its local path."""
    return hf_hub_download(
        repo_id=HF_DATASET_REPO,
        filename=filename,
        repo_type="dataset",
        token=HF_TOKEN,
        cache_dir="/tmp/video_processor_cache"
    )


def _process_segment_file(movie_name: str, video_path: str, segment_file: str, temp_dir: str) -> bool:
    """Render the segment described by segment_file and upload it; True on success."""
    output_path = None
    try:
        segment_path = _download_from_dataset(segment_file)
        with open(segment_path, 'r', encoding='utf-8') as f:
            segment_data = json.load(f)
        # int() so a numeric string such as "3" still formats with :02d below.
        segment_number = int(segment_data.get("segment_number", 1))
        start_time = segment_data.get("start_time", "00:00:00")
        end_time = segment_data.get("end_time", "00:10:00")
        print(f"Processing segment {segment_number}: {start_time} to {end_time}")

        output_filename = f"segment-{segment_number:02d}.mp4"
        output_path = os.path.join(temp_dir, output_filename)
        if not process_video_segment(video_path, output_path, start_time, end_time):
            print(f"✗ Failed to process segment {segment_number}, continuing to next...")
            processing_state["error_count"] += 1
            return False

        upload_path = f"{READY_VIDEOS_FOLDER}/{movie_name}/{output_filename}"
        print(f"Uploading to: {upload_path}")
        upload_file(
            path_or_fileobj=output_path,
            path_in_repo=upload_path,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Add processed video segment {segment_number} for {movie_name}"
        )
        print(f"✓ Segment {segment_number} uploaded successfully")
        return True
    except Exception as e:
        print(f"✗ Error processing segment file {segment_file}: {e}")
        processing_state["error_count"] += 1
        processing_state["last_error"] = f"{segment_file}: {e}"
        return False
    finally:
        # Remove the rendered file whether or not it was uploaded, so temp
        # space does not grow with the number of segments.
        if output_path is not None and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass


def _process_movie_segments_blocking(movie_name: str) -> bool:
    """Synchronous body of process_movie_segments; runs in a worker thread."""
    import shutil

    try:
        processing_state["current_file"] = movie_name
        print(f"\n{'='*80}")
        print(f"Processing movie: {movie_name}")
        print(f"{'='*80}")

        # List the segments before downloading the (large) video, so a movie
        # without segment files does not cost a full download.
        hooks_folder = f"{HOOKS_FOLDER}/{movie_name}"
        print(f"Listing segments from: {hooks_folder}")
        files = list_repo_files(
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )
        segment_files = sorted(
            f for f in files
            if f.startswith(f"{hooks_folder}/") and f.endswith(".json")
        )
        if not segment_files:
            print(f"No segment JSON files found for {movie_name}")
            return False
        print(f"Found {len(segment_files)} segments: {segment_files}")

        video_file = f"{movie_name}.mkv"
        print(f"Downloading video: {video_file}")
        try:
            video_path = _download_from_dataset(video_file)
        except Exception as e:
            print(f"Error: Could not download video: {e}")
            processing_state["error_count"] += 1
            processing_state["last_error"] = f"{video_file}: {e}"
            return False
        if os.path.islink(video_path):
            video_path = os.path.realpath(video_path)

        failed = 0
        temp_dir = tempfile.mkdtemp()
        try:
            for segment_file in segment_files:
                print(f"\n── Processing file: {segment_file}")
                if not _process_segment_file(movie_name, video_path, segment_file, temp_dir):
                    failed += 1
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        processing_state["processed_files"].append(movie_name)
        processing_state["total_processed"] += 1
        if failed:
            succeeded = len(segment_files) - failed
            print(f"\n⚠ Finished {movie_name}: {succeeded}/{len(segment_files)} segments succeeded")
        else:
            print(f"\n✓ Successfully processed all segments for {movie_name}")
        return True
    except Exception as e:
        processing_state["error_count"] += 1
        processing_state["last_error"] = str(e)
        print(f"✗ Error in process_movie_segments: {e}")
        return False


async def process_movie_segments(movie_name: str) -> bool:
    """
    Process all hook segments of one movie and upload the rendered videos.

    The work (Hub listing/downloads, OpenCV/FFmpeg/Whisper rendering, uploads)
    is blocking, so it runs in the default thread-pool executor. Running it
    directly in this coroutine would stall the event loop, and with it the
    HTTP endpoints, for the entire run. Cancelling this coroutine stops
    waiting for the result but does not interrupt the worker thread.

    Returns:
        True if the movie pass completed (individual segment failures are
        counted in processing_state["error_count"] and skipped); False if the
        movie could not be processed at all.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _process_movie_segments_blocking, movie_name)
async def scan_and_process_videos():
    """
    Find every movie folder under HOOKS_FOLDER in the dataset repo and process each in turn.

    Waits STARTUP_DELAY seconds (default 5) first. Only one scan runs at a
    time: a call made while processing_state["is_running"] is set returns
    immediately. Results are recorded in processing_state.
    """
    if processing_state["is_running"]:
        print("Video processing already running, skipping...")
        return
    # Claim the flag before the first await. Setting it only after the startup
    # sleep let a second caller (e.g. /trigger-processing during the delay) pass
    # the check above, so two scans processed the same movies concurrently.
    processing_state["is_running"] = True
    try:
        try:
            startup_delay = int(os.getenv("STARTUP_DELAY", 5))
        except ValueError:
            print("Invalid STARTUP_DELAY, using 5 seconds")
            startup_delay = 5
        print(f"Waiting {startup_delay} seconds before starting video processing...")
        await asyncio.sleep(startup_delay)

        print("\n" + "="*80)
        print("STARTING VIDEO PROCESSING SERVICE")
        print("="*80)

        loop = asyncio.get_running_loop()
        # list_repo_files is a blocking HTTP call; keep it off the event loop.
        files = await loop.run_in_executor(
            None,
            lambda: list_repo_files(
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN
            )
        )

        movie_folders = set()
        for path in files:
            if not (path.startswith(f"{HOOKS_FOLDER}/") and path.endswith(".json")):
                continue
            parts = path.split("/")
            # Require hooks/<movie>/<file>.json: a JSON file directly under
            # hooks/ is not a movie folder.
            if len(parts) >= 3:
                movie_folders.add(parts[1])

        print(f"Found {len(movie_folders)} movies to process: {sorted(movie_folders)}")
        for movie_name in sorted(movie_folders):
            await process_movie_segments(movie_name)
            await asyncio.sleep(2)

        print("\n" + "="*80)
        print("VIDEO PROCESSING COMPLETE")
        print(f"Processed: {processing_state['total_processed']}")
        print(f"Errors: {processing_state['error_count']}")
        print("="*80 + "\n")
    except Exception as e:
        print(f"Critical error in scan_and_process_videos: {e}")
        processing_state["last_error"] = str(e)
    finally:
        processing_state["is_running"] = False
        processing_state["current_file"] = None
@app.get("/")
async def health():
    """Liveness endpoint: service identity plus a snapshot of processing_state."""
    state = processing_state
    payload = {
        "status": "running",
        "service": "Video Processing Service",
        "whisper_ready": state["whisper_ready"],
        "is_processing": state["is_running"],
        "total_processed": state["total_processed"],
        "error_count": state["error_count"],
        "current_file": state["current_file"],
        "last_error": state["last_error"],
        "processed_files": state["processed_files"],
    }
    return JSONResponse(payload)
@app.get("/status")
async def get_status():
    """Return the current processing_state fields as JSON."""
    reported_keys = (
        "whisper_ready",
        "is_running",
        "total_processed",
        "error_count",
        "current_file",
        "last_error",
        "processed_files",
    )
    return JSONResponse({key: processing_state[key] for key in reported_keys})
@app.post("/trigger-processing")
async def trigger_processing():
    """
    Start a background scan unless one is running or Whisper is still loading.

    Returns a JSON status of "already_running", "not_ready" or "started".
    """
    if processing_state["is_running"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Video processing is already in progress"
        })
    if not processing_state["whisper_ready"]:
        return JSONResponse({
            "status": "not_ready",
            "message": "Whisper model is still loading, try again shortly"
        })
    # The event loop keeps only weak references to tasks, so hold a strong one
    # on app.state until the task finishes; the done-callback then drops it.
    background_tasks = getattr(app.state, "background_tasks", None)
    if background_tasks is None:
        background_tasks = set()
        app.state.background_tasks = background_tasks
    task = asyncio.create_task(scan_and_process_videos())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return JSONResponse({
        "status": "started",
        "message": "Video processing scan started"
    })
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces. Whisper loading and the
    # first scan are started by the app's lifespan handler.
    port = 7860
    print(f"Starting Video Processing Service on port {port}...")
    print("Whisper will load at startup, processing begins after startup delay")
    uvicorn.run(app, host="0.0.0.0", port=port)