# Source: nca-toolkit / video_creator / short_creator.py
# Last change (c5147f6): run video processing in a background thread
# so the API stays responsive while a video renders.
import asyncio
import logging
import uuid
from pathlib import Path
from typing import List, Dict, Optional
import aiohttp
import requests
from config import Config
from models.schemas import (
SceneInput, RenderConfig, VideoStatus, Scene, Caption
)
from video_creator.libraries.tts_client import TTSClient
from video_creator.libraries.whisper_client import WhisperClient
from video_creator.libraries.pexels_client import PexelsClient
from video_creator.libraries.ffmpeg_utils import FFmpegUtils
from video_creator.libraries.video_composer import VideoComposer
from video_creator.music_manager import MusicManager
logger = logging.getLogger(__name__)
class ShortCreator:
    """Main video creation orchestrator.

    Queues render jobs, and for each job drives the pipeline:
    TTS audio -> whisper captions -> Pexels stock footage (photo fallback)
    -> per-scene trimming -> final composition with background music.
    Also exposes status/lookup helpers used by the API layer.
    """
def __init__(
self,
config: Config,
tts_client: TTSClient,
whisper_client: WhisperClient,
pexels_client: PexelsClient,
music_manager: MusicManager
):
self.config = config
self.tts = tts_client
self.whisper = whisper_client
self.pexels = pexels_client
self.music_manager = music_manager
self.queue: List[Dict] = []
self.processing = False
def add_to_queue(self, scenes: List[SceneInput], config: RenderConfig) -> str:
"""
Add video to processing queue
Returns:
video_id for tracking
"""
video_id = str(uuid.uuid4()).replace('-', '')[:24] # Similar to cuid
self.queue.append({
"id": video_id,
"scenes": scenes,
"config": config
})
logger.info(f"Added video {video_id} to queue. Queue length: {len(self.queue)}")
# Start processing if not already running
if not self.processing:
asyncio.create_task(self.process_queue())
return video_id
async def process_queue(self):
"""Process videos in the queue"""
if self.processing:
return
self.processing = True
try:
while self.queue:
item = self.queue[0]
video_id = item["id"]
logger.info(f"Processing video {video_id}")
try:
# Run video creation in a background thread to keep API responsive
# This allows status checks while video is being processed
await asyncio.to_thread(
self._create_short_sync,
video_id,
item["scenes"],
item["config"]
)
logger.info(f"Successfully created video {video_id}")
except Exception as e:
logger.error(f"Failed to create video {video_id}: {e}", exc_info=True)
# Mark as failed by creating a .failed marker file
failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
failed_marker.write_text(str(e))
finally:
self.queue.pop(0)
finally:
self.processing = False
def _create_short_sync(
self,
video_id: str,
input_scenes: List[SceneInput],
config: RenderConfig
):
"""Synchronous wrapper for create_short - runs in a separate thread"""
# Create a new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.create_short(video_id, input_scenes, config))
finally:
loop.close()
async def create_short(
self,
video_id: str,
input_scenes: List[SceneInput],
config: RenderConfig
):
"""Create the short video"""
scenes = []
total_duration = 0
exclude_video_ids = []
temp_files = []
orientation = config.orientation.value
# Process each scene
for i, scene_input in enumerate(input_scenes):
logger.debug(f"Processing scene {i + 1}/{len(input_scenes)}")
# Generate TTS audio
audio_data, tts_duration = await self.tts.generate(
scene_input.text,
config.voice.value
)
# Save audio files
temp_id = str(uuid.uuid4()).replace('-', '')[:12]
wav_path = self.config.temp_dir_path / f"{temp_id}.wav"
mp3_path = self.config.temp_dir_path / f"{temp_id}.mp3"
video_path = self.config.temp_dir_path / f"{temp_id}.mp4"
temp_files.extend([wav_path, mp3_path, video_path])
# Save and convert audio
FFmpegUtils.save_audio_as_wav(audio_data, wav_path)
FFmpegUtils.save_audio_as_mp3(audio_data, mp3_path)
# Get ACTUAL audio duration from WAV file (TTS estimate is often wrong!)
audio_duration = FFmpegUtils.get_video_duration(wav_path)
logger.info(f"Scene {i+1}: TTS reported {tts_duration:.2f}s, actual WAV duration: {audio_duration:.2f}s")
# Add padding to last scene
if i + 1 == len(input_scenes) and config.paddingBack:
audio_duration += config.paddingBack / 1000
# Generate captions
captions = self.whisper.create_captions(str(wav_path))
# Find and download background video(s)
video_paths = []
# Simplified Scene Construction: One Video Per Scene
# User Request: "Remove restrictions. One video per scene equal to audio."
# User Request: "Video must be 9:16. Use image if needed."
# Force portrait for 9:16
orientation = "portrait"
keywords = scene_input.searchTerms
if not keywords:
keywords = ["general"]
# Handle both string and list inputs for searchTerms
# If it's a string, use it directly; if list, use first item
if isinstance(keywords, str):
keyword = keywords # Use the whole string
elif isinstance(keywords, list) and len(keywords) > 0:
keyword = keywords[0] if isinstance(keywords[0], str) else str(keywords[0])
else:
keyword = "general"
logger.debug(f"Using search keyword: '{keyword}' from searchTerms: {keywords}")
# Try to find a video that is at least as long as the audio
search_duration = max(audio_duration, 5.0)
video_found = False
video_path = None
temp_vid_id = str(uuid.uuid4()).replace('-', '')[:12]
try:
# 1. Try Video Search
pexels_video = self.pexels.find_video(
keyword,
search_duration,
exclude_video_ids,
orientation
)
video_path = self.config.temp_dir_path / f"{temp_vid_id}.mp4"
temp_files.append(video_path)
# Download video
logger.debug(f"Downloading video for '{keyword}' (Target: {audio_duration:.2f}s)")
response = requests.get(pexels_video["url"], stream=True, timeout=30)
response.raise_for_status()
with open(video_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Verify file size
if video_path.stat().st_size < 1024:
logger.warning(f"Downloaded video {video_path} is too small")
raise Exception("Downloaded video is invalid")
# Normalize video
norm_path = video_path.with_suffix(".norm.mp4")
FFmpegUtils.normalize_video(video_path, norm_path)
video_path.unlink()
norm_path.rename(video_path)
video_found = True
exclude_video_ids.append(pexels_video["id"])
except Exception as e:
logger.warning(f"Video search/download failed for '{keyword}': {e}. Trying photo fallback.")
video_found = False
# 2. Photo Fallback
if not video_found:
try:
logger.info(f"Attempting photo fallback for '{keyword}'")
pexels_photo = self.pexels.find_photo(keyword, orientation)
if pexels_photo:
# Download photo
photo_path = self.config.temp_dir_path / f"{temp_vid_id}.jpg"
temp_files.append(photo_path)
response = requests.get(pexels_photo["url"], stream=True, timeout=30)
response.raise_for_status()
with open(photo_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Convert photo to video
video_path = self.config.temp_dir_path / f"{temp_vid_id}_img.mp4"
temp_files.append(video_path)
FFmpegUtils.image_to_video(photo_path, video_path, audio_duration)
video_found = True
logger.info(f"Created video from photo {pexels_photo['id']}")
except Exception as e:
logger.error(f"Photo fallback failed: {e}")
if not video_found or not video_path or not video_path.exists():
raise Exception(f"Failed to find any visual content for '{keyword}'")
# Get actual duration (whether video or image-video)
vid_duration = FFmpegUtils.get_video_duration(video_path)
# Determine cut duration
take_duration = min(vid_duration, audio_duration)
logger.info(f"Using {take_duration:.2f}s of content for scene (Audio: {audio_duration:.2f}s)")
# Physically cut/trim to ensure exact match
final_clip_path = self.config.temp_dir_path / f"{temp_vid_id}_cut.mp4"
temp_files.append(final_clip_path)
FFmpegUtils.cut_video(video_path, final_clip_path, 0, take_duration)
# Verify actual cut duration
actual_cut_dur = FFmpegUtils.get_video_duration(final_clip_path)
video_paths.append({
"path": str(final_clip_path),
"duration": actual_cut_dur,
"keyword": keyword
})
# Build scene dict
scenes.append({
"captions": [c.dict() for c in captions],
"video": video_paths,
"audio": {
"url": str(mp3_path),
"duration": audio_duration
}
})
total_duration += audio_duration
# Add padding to total duration
if config.paddingBack:
total_duration += config.paddingBack / 1000
# Select background music
music_mood = config.music.value if config.music else None
selected_music = self.music_manager.find_music(music_mood)
logger.info(f"Selected music: {selected_music['filename']} (mood: {selected_music['mood']})")
# Render final video
output_path = self.config.videos_dir_path / f"{video_id}.mp4"
# Use a temp path for atomic write to prevent premature "ready" status
temp_output_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"
try:
VideoComposer.render(
scenes=scenes,
music_path=selected_music["path"],
output_path=temp_output_path,
orientation=orientation,
caption_position=config.captionPosition.value,
caption_bg_color=config.captionBackgroundColor,
music_volume=config.musicVolume.value,
padding_back=config.paddingBack
)
# Atomic rename to final path
if temp_output_path.exists():
temp_output_path.rename(output_path)
logger.info(f"Video {video_id} created successfully at {output_path}")
else:
raise Exception("Rendered file not found at temp path")
except Exception as e:
# Cleanup temp file on failure
if temp_output_path.exists():
temp_output_path.unlink()
raise e
# Cleanup temp files
for temp_file in temp_files:
if temp_file.exists():
temp_file.unlink()
def get_status(self, video_id: str) -> VideoStatus:
"""Get video processing status"""
# Check if in queue (waiting or being processed)
if any(item["id"] == video_id for item in self.queue):
return VideoStatus.processing
# Check if final video exists (READY)
video_path = self.config.videos_dir_path / f"{video_id}.mp4"
if video_path.exists():
return VideoStatus.ready
# Check if temp file exists (still rendering = PROCESSING)
temp_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"
if temp_path.exists():
return VideoStatus.processing
# Check if failed marker exists
failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
if failed_marker.exists():
return VideoStatus.failed
# If processing flag is active but video not found, it might be in early stages
if self.processing:
return VideoStatus.processing
# Video not found at all
return VideoStatus.failed
def get_video_path(self, video_id: str) -> Path:
"""Get path to video file"""
return self.config.videos_dir_path / f"{video_id}.mp4"
def delete_video(self, video_id: str):
"""Delete video file"""
video_path = self.get_video_path(video_id)
if video_path.exists():
video_path.unlink()
logger.info(f"Deleted video {video_id}")
def list_all_videos(self) -> List[Dict]:
"""List all videos with their status"""
videos = []
# Get all MP4 files (exclude temp files)
for video_file in self.config.videos_dir_path.glob("*.mp4"):
# Skip temp files (*.tmp.mp4)
if ".tmp." in video_file.name:
continue
video_id = video_file.stem
videos.append({
"id": video_id,
"status": self.get_status(video_id).value
})
# Add videos in queue
for item in self.queue:
if not any(v["id"] == item["id"] for v in videos):
videos.append({
"id": item["id"],
"status": VideoStatus.processing.value
})
return videos
    def get_available_voices(self) -> List[str]:
        """Get list of available TTS voices.

        Delegates to the TTSClient class directly; no instance state is read.
        """
        return TTSClient.list_available_voices()
def _plan_segments(self, duration: float) -> List[float]:
"""
Deterministic segmentation algorithm (Even Split Strategy):
- Segments between 2-5 seconds
- Avoid 1-second clips
- Sum exactly equals duration
- Distribute duration evenly to maximize segment length
"""
if duration <= 5.0:
return [duration]
# Calculate optimal number of segments
# We want segments as close to 5.0 as possible, but >= 2.0
num_segments = int(duration / 5.0)
if duration % 5.0 > 0:
num_segments += 1
segment_duration = duration / num_segments
# Create list of equal segments
segments = [segment_duration] * num_segments
# Handle floating point precision errors
current_sum = sum(segments)
diff = duration - current_sum
if abs(diff) > 0.0001:
segments[-1] += diff
return segments
def get_available_music_tags(self) -> List[str]:
"""Get list of available music moods"""
return self.music_manager.get_available_moods()