# segment / server.py
# Author: factorstudios — commit de435e7 "Update server.py" (verified), 18.8 kB
#!/usr/bin/env python3
import asyncio
import functools
import json
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
try:
from huggingface_hub import list_repo_files, hf_hub_download, upload_file
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
except ImportError as e:
print(f"Missing dependency: {e}")
exit(1)
# Load environment variables (from a local .env file when present) and require
# the Hugging Face token that every Hub download/upload below authenticates with.
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: Missing HF_TOKEN in .env")
    # SystemExit rather than the site-provided exit() helper, which is meant for
    # the interactive shell and is absent when the site module is not loaded.
    raise SystemExit(1)
app = FastAPI(title="Video Processing Service")

# Process-wide processing status: mutated by the background scan and reported
# verbatim by the HTTP endpoints.
processing_state = dict(
    is_running=False,       # True while scan_and_process_videos is active
    total_processed=0,      # number of movies handled so far
    current_file=None,      # movie name currently being processed
    error_count=0,
    last_error=None,
    processed_files=[],     # movie names, in processing order
)

# Dataset repo on the Hugging Face Hub and the folder layout used inside it.
HF_DATASET_REPO = "factorstudios/movs"
HOOKS_FOLDER = "hooks"
READY_VIDEOS_FOLDER = "ready_videos"
TRANSCRIPTION_FOLDER = "transcriptions"
def timestamp_to_seconds(timestamp: str) -> float:
    """Convert an ``HH:MM:SS`` timestamp to a number of seconds.

    Shorter ``MM:SS`` / ``SS`` forms and fractional seconds (``00:01:02.5``)
    are accepted too. Fields after the third (e.g. a trailing frame count in
    ``HH:MM:SS:FF``) are ignored. Surrounding whitespace in each field is
    tolerated.

    Args:
        timestamp: Timestamp text. Non-string values (e.g. a bare number of
            seconds coming from JSON) are converted with ``str()`` first.

    Returns:
        The timestamp in seconds as a float, or ``0.0`` when it cannot be
        parsed (an error message is printed in that case).
    """
    try:
        fields = [part.strip() for part in str(timestamp).split(":")][:3]
        *larger_units, seconds_field = fields
        if not all(re.fullmatch(r"\d+", part) for part in larger_units):
            raise ValueError("hours/minutes must be whole non-negative numbers")
        if not re.fullmatch(r"\d+(?:\.\d+)?", seconds_field):
            raise ValueError("seconds must be a non-negative number")
        # Horner-style accumulation: [H, M] -> H*60 + M, then *60 + S.
        total = 0
        for part in larger_units:
            total = total * 60 + int(part)
        return float(total * 60 + float(seconds_field))
    except Exception as e:
        print(f"Error converting timestamp {timestamp}: {e}")
        return 0.0
def extract_captions_for_segment(transcript_content: str, start_time: str, end_time: str) -> List[tuple]:
    """Collect transcript lines whose ``[HH:MM:SS]`` stamp lies inside a segment.

    Args:
        transcript_content: Transcript text with one ``[HH:MM:SS] text`` entry
            per line; lines that do not match that shape are skipped.
        start_time: Segment start, parsed with timestamp_to_seconds.
        end_time: Segment end (inclusive), parsed with timestamp_to_seconds.

    Returns:
        ``(offset_seconds, text)`` tuples in transcript order, where the offset
        is measured from the segment start and the text is whitespace-stripped.
    """
    window_start = timestamp_to_seconds(start_time)
    window_end = timestamp_to_seconds(end_time)
    line_pattern = re.compile(r'\[(\d{2}):(\d{2}):(\d{2})\]\s+(.*)')

    result = []
    for raw_line in transcript_content.strip().split('\n'):
        hit = line_pattern.match(raw_line)
        if hit is None:
            continue
        hh, mm, ss, caption = hit.groups()
        stamp = int(hh) * 3600 + int(mm) * 60 + int(ss)
        if window_start <= stamp <= window_end:
            result.append((stamp - window_start, caption.strip()))
    return result
@functools.lru_cache(maxsize=8)
def _vignette_mask(rows: int, cols: int) -> np.ndarray:
    """Return a cached, read-only ``(rows, cols, 1)`` vignette mask with peak 1.0.

    The mask is the outer product of two Gaussian kernels (sigma = half the
    frame size), normalized to a maximum of 1.0 and flattened with a 0.4
    exponent. It depends only on the frame size, so it is built once per
    size instead of once per frame.
    """
    x_kernel = cv2.getGaussianKernel(cols, cols / 2)
    y_kernel = cv2.getGaussianKernel(rows, rows / 2)
    mask = y_kernel * x_kernel.T
    mask = (mask / mask.max()) ** 0.4
    mask = mask[:, :, np.newaxis]  # broadcast over the colour channels
    # The cached array is shared between calls; make accidental writes fail.
    mask.setflags(write=False)
    return mask


def apply_color_grading_wedding_retro(frame: np.ndarray) -> np.ndarray:
    """Apply a warm "wedding/retro" grade with sharpening and a soft vignette.

    Steps: warm shift of the LAB a/b channels, CLAHE on lightness, 1.3x
    saturation, contrast/brightness lift, sharpening blend, and a vignette.

    Args:
        frame: 8-bit, 3-channel BGR frame (the format cv2.VideoCapture.read()
            produces). It is not modified in place.

    Returns:
        A new uint8 BGR frame of the same size.
    """
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)
    # 1. VINTAGE/RETRO EFFECT: warm tones (saturating uint8 add)
    a_channel = cv2.add(a_channel, 5)
    b_channel = cv2.add(b_channel, 8)
    # 2. WEDDING LOOK: soft highlights via CLAHE
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    l_channel = clahe.apply(l_channel)
    lab_enhanced = cv2.merge([l_channel, a_channel, b_channel])
    frame = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)
    # 3. SATURATION BOOST
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * 1.3, 0, 255)
    frame = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)
    # 4. CONTRAST ENHANCEMENT
    frame = cv2.convertScaleAbs(frame, alpha=1.15, beta=10)
    # 5. HIGH SHARPENING
    # NOTE(review): this kernel sums to 1/1.2 (~0.83), so the sharpened copy is
    # darker and the 0.4/0.6 blend dims the frame by roughly 10% — confirm intended.
    kernel = np.array([[-1, -1, -1],
                       [-1, 9, -1],
                       [-1, -1, -1]]) / 1.2
    sharpened = cv2.filter2D(frame, -1, kernel)
    frame = cv2.addWeighted(frame, 0.4, sharpened, 0.6, 0)
    # 6. SLIGHT VIGNETTE (mask cached per frame size, applied to all channels at once).
    # The mask is in (0, 1], so the product stays within [0, 255]; the uint8 cast
    # truncates exactly as the former per-channel in-place assignment did.
    frame = frame * _vignette_mask(*frame.shape[:2])
    return np.clip(frame, 0, 255).astype(np.uint8)
@functools.lru_cache(maxsize=16)
def _load_caption_font(font_size: int):
    """Return the caption font for *font_size*, cached so it is read from disk once.

    Falls back to PIL's built-in default font (which is not sized by
    *font_size*) when DejaVu Sans Bold cannot be loaded.
    """
    try:
        return ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
    except Exception:
        return ImageFont.load_default()


def burn_captions_to_frame(frame: np.ndarray, text: str, font_size: int = 32) -> np.ndarray:
    """Draw *text* centered on *frame* over a semi-transparent dark box.

    The text is greedily word-wrapped to the frame width minus 60 px, drawn
    in white and centered horizontally; the box is vertically centered and
    clamped to stay 20 px inside the top and bottom edges.

    Args:
        frame: BGR image (OpenCV channel order); it is not modified.
        text: Caption text. Whitespace-only text leaves the frame unchanged.
        font_size: Point size for the TrueType font; also sets the line height.

    Returns:
        A new BGR uint8 image with the caption burned in, or *frame* itself
        when *text* contains no words.
    """
    words = text.split()
    if not words:
        # Nothing to draw: avoid rendering an empty dark box.
        return frame

    height, width = frame.shape[:2]
    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(frame_pil, 'RGBA')  # used only for text measurement
    font = _load_caption_font(font_size)

    # Greedy word wrap; a single word wider than max_width gets its own line.
    max_width = width - 60
    wrapped_lines = []
    current_line = []
    for word in words:
        test_line = ' '.join(current_line + [word])
        bbox = draw.textbbox((0, 0), test_line, font=font)
        if bbox[2] - bbox[0] > max_width:
            if current_line:
                wrapped_lines.append(' '.join(current_line))
            current_line = [word]
        else:
            current_line.append(word)
    if current_line:
        wrapped_lines.append(' '.join(current_line))

    # Background box dimensions (10 px padding above/below the text)
    line_height = font_size + 10
    text_height = len(wrapped_lines) * line_height + 20
    bg_y_start = max(height // 2 - text_height // 2 - 10, 20)
    bg_y_end = min(bg_y_start + text_height, height - 20)

    # Composite a translucent black box (alpha 180/255) onto the frame.
    overlay = Image.new('RGBA', frame_pil.size, (0, 0, 0, 0))
    overlay_draw = ImageDraw.Draw(overlay, 'RGBA')
    overlay_draw.rectangle(
        [(20, bg_y_start), (width - 20, bg_y_end)],
        fill=(0, 0, 0, 180)
    )
    frame_pil = Image.alpha_composite(frame_pil.convert('RGBA'), overlay).convert('RGB')

    draw = ImageDraw.Draw(frame_pil)
    y_position = bg_y_start + 10
    for line in wrapped_lines:
        bbox = draw.textbbox((0, 0), line, font=font)
        line_width = bbox[2] - bbox[0]
        x_position = (width - line_width) // 2
        draw.text((x_position, y_position), line, font=font, fill=(255, 255, 255, 255))
        y_position += line_height

    return cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)
def _print_ffmpeg_log(log_file) -> None:
    """Print the tail of ffmpeg's captured stderr, if it wrote anything."""
    try:
        log_file.seek(0)
        text = log_file.read().decode("utf-8", errors="replace").strip()
    except (OSError, ValueError):
        return
    if text:
        print(f"FFmpeg output (tail):\n{text[-2000:]}")


def process_video_segment(
    video_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    captions: List[tuple],
    target_width: int = 1080,
    target_height: int = 1350
) -> bool:
    """Render one segment of *video_path* to an H.264 MP4 at *output_path*.

    Frames from *start_time* to *end_time* are center-cropped to the target
    aspect ratio, resized, color graded, overlaid with captions, and piped
    as raw BGR frames into an ``ffmpeg`` subprocess. Only video is encoded:
    ffmpeg's sole input is the frame pipe, so the source audio is not copied.

    Args:
        video_path: Source video file readable by OpenCV.
        output_path: Destination file for ffmpeg (overwritten, ``-y``).
        start_time: Segment start, parsed with timestamp_to_seconds.
        end_time: Segment end, parsed with timestamp_to_seconds.
        captions: ``(seconds_from_segment_start, text)`` pairs; a caption stays
            on screen until the next one begins.
        target_width: Output frame width in pixels.
        target_height: Output frame height in pixels.

    Returns:
        True when the segment was encoded and ffmpeg exited with status 0,
        False otherwise. Errors are printed, never raised.
    """
    cap = None
    ffmpeg_proc = None
    ffmpeg_log = None
    succeeded = False
    try:
        print(f"Opening video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return False
        fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        start_seconds = timestamp_to_seconds(start_time)
        end_seconds = timestamp_to_seconds(end_time)
        duration = end_seconds - start_seconds
        print(f"Video info: {fps} fps, {original_width}x{original_height}")
        print(f"Extracting segment: {start_time} to {end_time} ({duration:.1f}s)")

        # Some containers report 0 (or NaN) fps / frame size; the frame math
        # below and ffmpeg's "-r" would be meaningless, so fail fast.
        if not (fps > 0) or original_width <= 0 or original_height <= 0:
            print(f"Error: Invalid video properties ({fps} fps, {original_width}x{original_height})")
            return False
        if duration <= 0:
            print(f"Error: Segment end {end_time} is not after start {start_time}")
            return False

        # Center-crop window matching the target aspect ratio. It is the same
        # for every frame, so compute it once outside the frame loop.
        aspect_ratio = target_width / target_height
        if original_width / original_height > aspect_ratio:
            new_width = int(original_height * aspect_ratio)
            x_offset = (original_width - new_width) // 2
            crop = (slice(None), slice(x_offset, x_offset + new_width))
        else:
            new_height = int(original_width / aspect_ratio)
            y_offset = (original_height - new_height) // 2
            crop = (slice(y_offset, y_offset + new_height), slice(None))

        # Pipe frames into FFmpeg — proper H.264 with real compression
        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-hide_banner", "-loglevel", "error",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-s", f"{target_width}x{target_height}",
            "-pix_fmt", "bgr24",
            "-r", str(fps),
            "-i", "pipe:0",
            "-vcodec", "libx264",
            "-preset", "fast",
            "-crf", "23",  # 0=lossless, 51=worst; 23 is a solid default
            "-pix_fmt", "yuv420p",  # broad playback compatibility
            "-movflags", "+faststart",
            output_path
        ]
        # stderr goes to a temp file rather than a pipe: nobody reads a pipe
        # while we block writing frames, so ffmpeg could stall on a full one.
        ffmpeg_log = tempfile.TemporaryFile()
        ffmpeg_proc = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=ffmpeg_log
        )

        # Seek to start frame
        start_frame = int(start_seconds * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        # Caption lookup: frame index within the segment -> text (later
        # entries for the same frame win).
        caption_map = {int(rel_time * fps): caption_text for rel_time, caption_text in captions}

        current_caption = ""
        processed_frames = 0
        target_frames = int(duration * fps)
        progress_step = max(1, target_frames // 10)
        print(f"Processing {target_frames} frames...")
        while processed_frames < target_frames:
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: Could not read frame at position {processed_frames}")
                break
            frame = frame[crop]
            frame = cv2.resize(frame, (target_width, target_height), interpolation=cv2.INTER_LANCZOS4)
            frame = apply_color_grading_wedding_retro(frame)
            # A caption stays visible until the next caption's frame.
            current_caption = caption_map.get(processed_frames, current_caption)
            if current_caption:
                frame = burn_captions_to_frame(frame, current_caption)
            ffmpeg_proc.stdin.write(frame.tobytes())
            processed_frames += 1
            if processed_frames % progress_step == 0:
                progress = (processed_frames / target_frames) * 100
                print(f"Progress: {progress:.1f}%")

        ffmpeg_proc.stdin.close()
        returncode = ffmpeg_proc.wait()
        if returncode != 0:
            print(f"✗ FFmpeg encoding failed with return code {returncode}")
            return False
        print(f"✓ Video segment saved: {output_path}")
        succeeded = True
        return True
    except Exception as e:
        print(f"✗ Error processing video segment: {e}")
        return False
    finally:
        # Always reap ffmpeg and release the capture, on every exit path.
        if ffmpeg_proc is not None:
            try:
                ffmpeg_proc.stdin.close()
            except OSError:
                pass  # e.g. BrokenPipeError when ffmpeg already exited
            ffmpeg_proc.wait()
        if ffmpeg_log is not None:
            if not succeeded:
                _print_ffmpeg_log(ffmpeg_log)
            ffmpeg_log.close()
        if cap is not None:
            cap.release()
async def process_movie_segments(movie_name: str) -> bool:
    """Download, render and upload every hook segment of one movie.

    Reads ``{TRANSCRIPTION_FOLDER}/{movie_name}.transcript.txt`` (optional),
    ``{movie_name}.mkv`` and each ``{HOOKS_FOLDER}/{movie_name}/*.json`` segment
    description from the dataset repo. It renders every segment with
    process_video_segment and uploads it to
    ``{READY_VIDEOS_FOLDER}/{movie_name}/segment-NN.mp4``.

    Blocking work (Hub downloads/uploads, file reads, OpenCV/ffmpeg rendering)
    runs in the default thread-pool executor, so the event loop and the
    status endpoints stay responsive while a movie is being processed.

    Returns:
        False if the video could not be downloaded, no segment files exist, or
        an unexpected error occurred. Otherwise True, even if some individual
        segments failed; those failures are counted in ``error_count``.
    """
    loop = asyncio.get_running_loop()

    def run_blocking(func, *args, **kwargs):
        """Run a blocking call in the default executor; returns an awaitable."""
        return loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    def download(filename: str) -> str:
        """Fetch *filename* from the dataset repo into the local cache (blocking)."""
        return hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename=filename,
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir="/tmp/video_processor_cache"
        )

    def fetch_text(filename: str) -> str:
        """Download *filename* and return its UTF-8 contents (blocking)."""
        with open(download(filename), 'r', encoding='utf-8') as f:
            return f.read()

    processing_state["current_file"] = movie_name
    try:
        print(f"\n{'='*80}")
        print(f"Processing movie: {movie_name}")
        print(f"{'='*80}")

        # Transcript is optional: without it segments are rendered uncaptioned.
        transcript_file = f"{TRANSCRIPTION_FOLDER}/{movie_name}.transcript.txt"
        print(f"Downloading transcript: {transcript_file}")
        try:
            transcript_content = await run_blocking(fetch_text, transcript_file)
        except Exception as e:
            print(f"Warning: Could not download transcript: {e}")
            transcript_content = ""

        # Download original video
        video_file = f"{movie_name}.mkv"
        print(f"Downloading video: {video_file}")
        try:
            video_path = await run_blocking(download, video_file)
        except Exception as e:
            print(f"Error: Could not download video: {e}")
            processing_state["error_count"] += 1
            processing_state["last_error"] = f"Could not download video {video_file}: {e}"
            return False
        # Hand OpenCV/ffmpeg the real blob path instead of the cache symlink.
        if os.path.islink(video_path):
            video_path = os.path.realpath(video_path)

        # List segment JSON files
        hooks_folder = f"{HOOKS_FOLDER}/{movie_name}"
        print(f"Listing segments from: {hooks_folder}")
        files = await run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )
        segment_files = sorted(
            f for f in files
            if f.startswith(f"{hooks_folder}/") and f.endswith(".json")
        )
        if not segment_files:
            print(f"No segment JSON files found for {movie_name}")
            return False
        print(f"Found {len(segment_files)} segments")

        uploaded = 0
        temp_dir = tempfile.mkdtemp()
        try:
            for segment_file in segment_files:
                try:
                    segment_data = json.loads(await run_blocking(fetch_text, segment_file))
                    # JSON may carry the number as a string ("3"); the
                    # "{:02d}" file name below needs an int.
                    segment_number = int(segment_data.get("segment_number", 1))
                    start_time = segment_data.get("start_time", "00:00:00")
                    end_time = segment_data.get("end_time", "00:10:00")
                    print(f"\nProcessing segment {segment_number}: {start_time} to {end_time}")

                    captions = extract_captions_for_segment(transcript_content, start_time, end_time)
                    print(f"Found {len(captions)} caption lines for this segment")

                    output_filename = f"segment-{segment_number:02d}.mp4"
                    output_path = os.path.join(temp_dir, output_filename)
                    success = await run_blocking(
                        process_video_segment,
                        video_path,
                        output_path,
                        start_time,
                        end_time,
                        captions
                    )
                    if not success:
                        print(f"Failed to process segment {segment_number}")
                        processing_state["error_count"] += 1
                        processing_state["last_error"] = (
                            f"Failed to process segment {segment_number} of {movie_name}"
                        )
                        continue

                    upload_path = f"{READY_VIDEOS_FOLDER}/{movie_name}/{output_filename}"
                    print(f"Uploading to: {upload_path}")
                    await run_blocking(
                        upload_file,
                        path_or_fileobj=output_path,
                        path_in_repo=upload_path,
                        repo_id=HF_DATASET_REPO,
                        repo_type="dataset",
                        token=HF_TOKEN,
                        commit_message=f"Add processed video segment {segment_number} for {movie_name}"
                    )
                    print(f"✓ Segment {segment_number} uploaded successfully")
                    uploaded += 1
                except Exception as e:
                    print(f"✗ Error processing segment: {e}")
                    processing_state["error_count"] += 1
                    processing_state["last_error"] = str(e)
        finally:
            # Best-effort cleanup of the rendered local files.
            shutil.rmtree(temp_dir, ignore_errors=True)

        processing_state["processed_files"].append(movie_name)
        processing_state["total_processed"] += 1
        if uploaded == len(segment_files):
            print(f"\n✓ Successfully processed all segments for {movie_name}")
        else:
            print(f"\n⚠ Processed {uploaded}/{len(segment_files)} segments for {movie_name}")
        return True
    except Exception as e:
        processing_state["error_count"] += 1
        processing_state["last_error"] = str(e)
        print(f"✗ Error: {e}")
        return False
    finally:
        # Nothing is in progress for this movie any more.
        processing_state["current_file"] = None
async def scan_and_process_videos():
    """Process every movie that has hook segment files in the dataset repo.

    Movies are discovered from paths shaped ``{HOOKS_FOLDER}/<movie>/<name>.json``
    and processed one after another. At most one scan runs at a time. The
    ``is_running`` flag is checked and claimed before the first ``await``, so a
    second scan scheduled on the same event loop returns immediately and cannot
    process the same movies concurrently. This coroutine starts work right away;
    the startup delay is applied by ``startup_event``.
    """
    if processing_state["is_running"]:
        print("Video processing already running, skipping...")
        return
    processing_state["is_running"] = True
    try:
        print("\n" + "="*80)
        print("STARTING VIDEO PROCESSING SERVICE")
        print("="*80)
        loop = asyncio.get_running_loop()
        # list_repo_files is a blocking HTTP call; keep it off the event loop.
        files = await loop.run_in_executor(
            None,
            lambda: list_repo_files(
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN
            ),
        )
        movie_folders = set()
        for f in files:
            if f.startswith(f"{HOOKS_FOLDER}/") and f.endswith(".json"):
                parts = f.split("/")
                # Require hooks/<movie>/<file>.json. A JSON file directly under
                # hooks/ would otherwise be taken for a movie named after the file.
                if len(parts) >= 3 and parts[1]:
                    movie_folders.add(parts[1])
        print(f"Found {len(movie_folders)} movies to process")
        for movie_name in sorted(movie_folders):
            await process_movie_segments(movie_name)
            await asyncio.sleep(2)
        print("\n" + "="*80)
        print("VIDEO PROCESSING COMPLETE")
        print(f"Processed: {processing_state['total_processed']}")
        print(f"Errors: {processing_state['error_count']}")
        print("="*80 + "\n")
    except Exception as e:
        print(f"Critical error: {e}")
        processing_state["last_error"] = str(e)
    finally:
        processing_state["is_running"] = False


@app.on_event("startup")
async def startup_event():
    """Schedule a video-processing scan to begin 3 minutes after server startup."""
    async def delayed_scan():
        print("Waiting 3 minutes before starting video processing...")
        await asyncio.sleep(180)  # 3-minute startup delay
        await scan_and_process_videos()

    # asyncio keeps only weak references to tasks; hold a strong one so the
    # background scan cannot be garbage collected mid-run.
    app.state.startup_task = asyncio.create_task(delayed_scan())
def _shared_status_fields() -> dict:
    """Snapshot of the status fields that both ``/`` and ``/status`` report."""
    return {
        key: processing_state[key]
        for key in ("total_processed", "error_count", "current_file", "last_error", "processed_files")
    }


@app.get("/")
async def health():
    """Liveness probe: service identity plus the current processing snapshot."""
    payload = {
        "status": "running",
        "service": "Video Processing Service",
        "is_processing": processing_state["is_running"],
    }
    payload.update(_shared_status_fields())
    return JSONResponse(payload)


@app.get("/status")
async def get_status():
    """Report the current processing snapshot."""
    return JSONResponse({"is_running": processing_state["is_running"], **_shared_status_fields()})
@app.post("/trigger-processing")
async def trigger_processing():
    """Manually start scan_and_process_videos as a background task.

    Responds immediately. If a scan is already marked as running, nothing is
    scheduled and ``already_running`` is returned.
    """
    if processing_state["is_running"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Video processing is already in progress"
        })
    # asyncio keeps only weak references to tasks; hold a strong one so the
    # scan cannot be garbage collected before it finishes.
    app.state.trigger_task = asyncio.create_task(scan_and_process_videos())
    return JSONResponse({
        "status": "started",
        "message": "Video processing scan started"
    })
if __name__ == "__main__":
    # Serve on all interfaces; the scan is scheduled by the startup hook.
    host, port = "0.0.0.0", 7860
    print(f"Starting Video Processing Service on port {port}...")
    print("Processing will begin 3 minutes after startup")
    uvicorn.run(app, host=host, port=port)