Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import uuid | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| import aiohttp | |
| import requests | |
| from config import Config | |
| from models.schemas import ( | |
| SceneInput, RenderConfig, VideoStatus, Scene, Caption | |
| ) | |
| from video_creator.libraries.tts_client import TTSClient | |
| from video_creator.libraries.whisper_client import WhisperClient | |
| from video_creator.libraries.pexels_client import PexelsClient | |
| from video_creator.libraries.ffmpeg_utils import FFmpegUtils | |
| from video_creator.libraries.video_composer import VideoComposer | |
| from video_creator.music_manager import MusicManager | |
logger = logging.getLogger(__name__)


class ShortCreator:
    """Main video creation orchestrator.

    Videos are enqueued with :meth:`add_to_queue` and rendered one at a time
    by :meth:`process_queue`. A finished video is written to
    ``config.videos_dir_path`` as ``<video_id>.mp4``; a failed render leaves a
    ``<video_id>.failed`` marker (containing the error text) there instead.
    """

    # Scenes are always searched and rendered as portrait (9:16); the
    # orientation in the render config is deliberately not used.
    _FORCED_ORIENTATION = "portrait"
    # Search term used when a scene supplies no usable search terms.
    _FALLBACK_KEYWORD = "general"
    # Lower bound (seconds) for the duration passed to the video search.
    _MIN_SEARCH_DURATION = 5.0
    # Downloads smaller than this many bytes are treated as invalid.
    _MIN_VIDEO_BYTES = 1024
    _DOWNLOAD_TIMEOUT = 30
    _DOWNLOAD_CHUNK_SIZE = 8192

    def __init__(
        self,
        config: Config,
        tts_client: TTSClient,
        whisper_client: WhisperClient,
        pexels_client: PexelsClient,
        music_manager: MusicManager
    ):
        """Store the collaborators and start with an empty, idle queue."""
        self.config = config
        self.tts = tts_client
        self.whisper = whisper_client
        self.pexels = pexels_client
        self.music_manager = music_manager
        # Pending work items: {"id": str, "scenes": [...], "config": ...}.
        # The item being rendered stays at index 0 until it finishes.
        self.queue: List[Dict] = []
        self.processing = False
        # Strong reference to the worker task; the event loop itself only
        # keeps weak references to tasks.
        self._queue_task: Optional[asyncio.Task] = None

    def add_to_queue(self, scenes: List[SceneInput], config: RenderConfig) -> str:
        """
        Add video to processing queue

        Must be called from a thread with a running event loop when no
        worker is active, because the worker is scheduled as an asyncio task.

        Returns:
            video_id for tracking

        Raises:
            RuntimeError: if a worker has to be started but no event loop is
                running. Nothing is enqueued in that case.
        """
        worker = self._queue_task
        needs_worker = not self.processing and (worker is None or worker.done())
        if needs_worker:
            # Fail before touching the queue so a scheduling error cannot
            # leave behind an item whose id the caller never received.
            asyncio.get_running_loop()

        video_id = uuid.uuid4().hex[:24]  # Similar to cuid
        self.queue.append({
            "id": video_id,
            "scenes": scenes,
            "config": config
        })
        logger.info(f"Added video {video_id} to queue. Queue length: {len(self.queue)}")

        # Start processing if not already running
        if needs_worker:
            self._queue_task = asyncio.create_task(self.process_queue())
        return video_id

    async def process_queue(self):
        """Process videos in the queue, one at a time, until it is empty.

        Returns immediately if another invocation is already draining the
        queue. A failure of one video is logged and recorded with a
        ``.failed`` marker; it never prevents later items from running.
        """
        if self.processing:
            return
        self.processing = True
        try:
            while self.queue:
                item = self.queue[0]
                video_id = item["id"]
                logger.info(f"Processing video {video_id}")
                try:
                    # Run video creation in a background thread to keep API responsive
                    # This allows status checks while video is being processed
                    await asyncio.to_thread(
                        self._create_short_sync,
                        video_id,
                        item["scenes"],
                        item["config"]
                    )
                    logger.info(f"Successfully created video {video_id}")
                except Exception as e:
                    logger.error(f"Failed to create video {video_id}: {e}", exc_info=True)
                    self._write_failed_marker(video_id, e)
                finally:
                    # Only now does the item leave the queue, so get_status
                    # reports "processing" for the whole render.
                    self.queue.pop(0)
        finally:
            self.processing = False

    def _write_failed_marker(self, video_id: str, error: Exception) -> None:
        """Record a failed render as ``<video_id>.failed``; never raises OSError."""
        marker = self.config.videos_dir_path / f"{video_id}.failed"
        try:
            marker.parent.mkdir(parents=True, exist_ok=True)
            marker.write_text(str(error))
        except OSError as marker_error:
            # Losing the marker must not abort the rest of the queue.
            logger.error(f"Could not write failure marker for video {video_id}: {marker_error}")

    def _create_short_sync(
        self,
        video_id: str,
        input_scenes: List[SceneInput],
        config: RenderConfig
    ):
        """Synchronous wrapper for create_short - runs in a separate thread"""
        # asyncio.run creates a fresh event loop for this thread and closes
        # it afterwards, leaving no stale loop on the pooled worker thread.
        asyncio.run(self.create_short(video_id, input_scenes, config))

    async def create_short(
        self,
        video_id: str,
        input_scenes: List[SceneInput],
        config: RenderConfig
    ):
        """Create the short video.

        For every scene: synthesize speech, caption it, fetch one background
        clip and trim it to the audio length. Then render all scenes with
        background music to ``<video_id>.mp4`` in the videos directory.

        Temp files are removed whether or not the render succeeds.

        Raises:
            ValueError: if ``input_scenes`` is empty.
            Exception: anything raised by the TTS, caption, download, ffmpeg
                or render steps propagates to the caller.
        """
        if not input_scenes:
            raise ValueError("At least one scene is required to create a video")

        scenes: List[Dict] = []
        exclude_video_ids: List = []
        temp_files: List[Path] = []
        orientation = self._FORCED_ORIENTATION
        scene_count = len(input_scenes)

        try:
            for i, scene_input in enumerate(input_scenes):
                logger.debug(f"Processing scene {i + 1}/{scene_count}")
                scenes.append(
                    await self._build_scene(
                        i,
                        scene_input,
                        config,
                        i + 1 == scene_count,
                        orientation,
                        exclude_video_ids,
                        temp_files,
                    )
                )
            self._render(video_id, scenes, config, orientation)
        finally:
            self._remove_temp_files(temp_files)

    async def _build_scene(
        self,
        index: int,
        scene_input: SceneInput,
        config: RenderConfig,
        is_last: bool,
        orientation: str,
        exclude_video_ids: List,
        temp_files: List[Path],
    ) -> Dict:
        """Produce the audio, captions and background clip for one scene."""
        # Generate TTS audio
        audio_data, tts_duration = await self.tts.generate(
            scene_input.text,
            config.voice.value
        )

        # Save audio files
        temp_id = uuid.uuid4().hex[:12]
        wav_path = self.config.temp_dir_path / f"{temp_id}.wav"
        mp3_path = self.config.temp_dir_path / f"{temp_id}.mp3"
        temp_files.extend([wav_path, mp3_path])
        FFmpegUtils.save_audio_as_wav(audio_data, wav_path)
        FFmpegUtils.save_audio_as_mp3(audio_data, mp3_path)

        # Get ACTUAL audio duration from WAV file (TTS estimate is often wrong!)
        audio_duration = FFmpegUtils.get_video_duration(wav_path)
        logger.info(f"Scene {index+1}: TTS reported {tts_duration:.2f}s, actual WAV duration: {audio_duration:.2f}s")

        # Add padding to last scene (paddingBack is divided by 1000, i.e.
        # treated as milliseconds)
        if is_last and config.paddingBack:
            audio_duration += config.paddingBack / 1000

        # Generate captions
        captions = self.whisper.create_captions(str(wav_path))

        keyword = self._pick_keyword(scene_input.searchTerms)
        logger.debug(f"Using search keyword: '{keyword}' from searchTerms: {scene_input.searchTerms}")

        # One background clip per scene, trimmed to the audio length.
        clip = self._prepare_scene_clip(
            keyword, audio_duration, orientation, exclude_video_ids, temp_files
        )

        return {
            "captions": [c.dict() for c in captions],
            "video": [clip],
            "audio": {
                "url": str(mp3_path),
                "duration": audio_duration
            }
        }

    @classmethod
    def _pick_keyword(cls, search_terms) -> str:
        """Return the single search keyword for a scene.

        A string is used whole; for a list/tuple the first item is used.
        Missing, empty or blank terms fall back to "general".
        """
        if isinstance(search_terms, str):
            candidate = search_terms
        elif isinstance(search_terms, (list, tuple)) and search_terms:
            candidate = str(search_terms[0])
        else:
            candidate = ""
        return candidate.strip() or cls._FALLBACK_KEYWORD

    def _prepare_scene_clip(
        self,
        keyword: str,
        audio_duration: float,
        orientation: str,
        exclude_video_ids: List,
        temp_files: List[Path],
    ) -> Dict:
        """Fetch a background clip (video, else photo) and trim it to the audio.

        Raises:
            RuntimeError: if neither a video nor a photo could be obtained.
        """
        clip_id = uuid.uuid4().hex[:12]
        source_path: Optional[Path] = None

        # 1. Try Video Search
        try:
            source_path = self._download_stock_video(
                clip_id, keyword, audio_duration, orientation, exclude_video_ids, temp_files
            )
        except Exception as e:
            logger.warning(f"Video search/download failed for '{keyword}': {e}. Trying photo fallback.")

        # 2. Photo Fallback
        if source_path is None:
            try:
                source_path = self._photo_to_video(
                    clip_id, keyword, audio_duration, orientation, temp_files
                )
            except Exception as e:
                logger.error(f"Photo fallback failed: {e}")

        if source_path is None or not source_path.exists():
            raise RuntimeError(f"Failed to find any visual content for '{keyword}'")

        # Get actual duration (whether video or image-video)
        vid_duration = FFmpegUtils.get_video_duration(source_path)
        # Never take more footage than the source actually has.
        take_duration = min(vid_duration, audio_duration)
        logger.info(f"Using {take_duration:.2f}s of content for scene (Audio: {audio_duration:.2f}s)")

        # Physically cut/trim to ensure exact match
        final_clip_path = self.config.temp_dir_path / f"{clip_id}_cut.mp4"
        temp_files.append(final_clip_path)
        FFmpegUtils.cut_video(source_path, final_clip_path, 0, take_duration)

        return {
            "path": str(final_clip_path),
            # Re-measured after the cut rather than assumed.
            "duration": FFmpegUtils.get_video_duration(final_clip_path),
            "keyword": keyword
        }

    def _download_stock_video(
        self,
        clip_id: str,
        keyword: str,
        audio_duration: float,
        orientation: str,
        exclude_video_ids: List,
        temp_files: List[Path],
    ) -> Path:
        """Search, download and normalize one stock video; return its path."""
        # Try to find a video that is at least as long as the audio
        search_duration = max(audio_duration, self._MIN_SEARCH_DURATION)
        pexels_video = self.pexels.find_video(
            keyword,
            search_duration,
            exclude_video_ids,
            orientation
        )

        video_path = self.config.temp_dir_path / f"{clip_id}.mp4"
        norm_path = video_path.with_suffix(".norm.mp4")
        # Track both up front so a failure part-way through leaves nothing behind.
        temp_files.extend([video_path, norm_path])

        logger.debug(f"Downloading video for '{keyword}' (Target: {audio_duration:.2f}s)")
        self._download(pexels_video["url"], video_path)

        # Verify file size
        if video_path.stat().st_size < self._MIN_VIDEO_BYTES:
            logger.warning(f"Downloaded video {video_path} is too small")
            raise RuntimeError("Downloaded video is invalid")

        # Normalize video, then swap the normalized file into place.
        FFmpegUtils.normalize_video(video_path, norm_path)
        norm_path.replace(video_path)

        # Only successfully used videos are excluded from later searches.
        exclude_video_ids.append(pexels_video["id"])
        return video_path

    def _photo_to_video(
        self,
        clip_id: str,
        keyword: str,
        audio_duration: float,
        orientation: str,
        temp_files: List[Path],
    ) -> Optional[Path]:
        """Download a photo and turn it into a clip; None if no photo was found."""
        logger.info(f"Attempting photo fallback for '{keyword}'")
        pexels_photo = self.pexels.find_photo(keyword, orientation)
        if not pexels_photo:
            return None

        photo_path = self.config.temp_dir_path / f"{clip_id}.jpg"
        video_path = self.config.temp_dir_path / f"{clip_id}_img.mp4"
        temp_files.extend([photo_path, video_path])

        self._download(pexels_photo["url"], photo_path)
        # Convert photo to video
        FFmpegUtils.image_to_video(photo_path, video_path, audio_duration)
        logger.info(f"Created video from photo {pexels_photo['id']}")
        return video_path

    @classmethod
    def _download(cls, url: str, destination: Path) -> None:
        """Stream ``url`` to ``destination``, raising on HTTP error statuses."""
        # The context manager closes the streamed response (and releases its
        # connection) even if writing fails part-way through.
        with requests.get(url, stream=True, timeout=cls._DOWNLOAD_TIMEOUT) as response:
            response.raise_for_status()
            with open(destination, 'wb') as f:
                for chunk in response.iter_content(chunk_size=cls._DOWNLOAD_CHUNK_SIZE):
                    f.write(chunk)

    def _render(
        self,
        video_id: str,
        scenes: List[Dict],
        config: RenderConfig,
        orientation: str,
    ) -> None:
        """Pick background music and render the scenes to ``<video_id>.mp4``."""
        # Select background music
        music_mood = config.music.value if config.music else None
        selected_music = self.music_manager.find_music(music_mood)
        logger.info(f"Selected music: {selected_music['filename']} (mood: {selected_music['mood']})")

        output_path = self.config.videos_dir_path / f"{video_id}.mp4"
        # Use a temp path for atomic write to prevent premature "ready" status
        temp_output_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"
        try:
            VideoComposer.render(
                scenes=scenes,
                music_path=selected_music["path"],
                output_path=temp_output_path,
                orientation=orientation,
                caption_position=config.captionPosition.value,
                caption_bg_color=config.captionBackgroundColor,
                music_volume=config.musicVolume.value,
                padding_back=config.paddingBack
            )
            if not temp_output_path.exists():
                raise RuntimeError("Rendered file not found at temp path")
            # Atomic rename to final path
            temp_output_path.replace(output_path)
            logger.info(f"Video {video_id} created successfully at {output_path}")
        except Exception:
            # Cleanup temp file on failure
            temp_output_path.unlink(missing_ok=True)
            raise

    @staticmethod
    def _remove_temp_files(temp_files: List[Path]) -> None:
        """Best-effort removal of intermediate files; failures are only logged."""
        for temp_file in temp_files:
            try:
                temp_file.unlink(missing_ok=True)
            except OSError as e:
                logger.warning(f"Could not remove temp file {temp_file}: {e}")

    def get_status(self, video_id: str) -> VideoStatus:
        """Get video processing status.

        Checks, in order: queued or currently rendering -> processing; final
        ``.mp4`` present -> ready; ``.tmp.mp4`` present -> processing;
        otherwise (failure marker present, or id unknown) -> failed.
        """
        # An item stays in the queue for its whole render, so this covers
        # both waiting and in-progress videos.
        if any(item["id"] == video_id for item in self.queue):
            return VideoStatus.processing

        videos_dir = self.config.videos_dir_path
        if (videos_dir / f"{video_id}.mp4").exists():
            return VideoStatus.ready
        # Check if temp file exists (still rendering = PROCESSING)
        if (videos_dir / f"{video_id}.tmp.mp4").exists():
            return VideoStatus.processing

        # A ".failed" marker and a completely unknown id both mean failed.
        # An unknown id is never reported as processing just because some
        # other video happens to be rendering.
        return VideoStatus.failed

    def get_video_path(self, video_id: str) -> Path:
        """Get path to video file (the file is not required to exist)."""
        return self.config.videos_dir_path / f"{video_id}.mp4"

    def delete_video(self, video_id: str):
        """Delete the video file and any failure marker; a no-op if neither exists."""
        video_path = self.get_video_path(video_id)
        if video_path.exists():
            # missing_ok guards against the file vanishing between the
            # existence check and the unlink.
            video_path.unlink(missing_ok=True)
            logger.info(f"Deleted video {video_id}")
        failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
        failed_marker.unlink(missing_ok=True)

    def list_all_videos(self) -> List[Dict]:
        """List rendered and queued videos as ``{"id", "status"}`` dicts."""
        videos = []
        seen_ids = set()

        # Get all MP4 files (exclude temp files)
        for video_file in self.config.videos_dir_path.glob("*.mp4"):
            # Skip temp files (*.tmp.mp4)
            if ".tmp." in video_file.name:
                continue
            video_id = video_file.stem
            seen_ids.add(video_id)
            videos.append({
                "id": video_id,
                "status": self.get_status(video_id).value
            })

        # Add videos in queue that are not already listed
        for item in self.queue:
            if item["id"] in seen_ids:
                continue
            seen_ids.add(item["id"])
            videos.append({
                "id": item["id"],
                "status": VideoStatus.processing.value
            })
        return videos

    def get_available_voices(self) -> List[str]:
        """Get list of available TTS voices"""
        return TTSClient.list_available_voices()

    def _plan_segments(self, duration: float) -> List[float]:
        """
        Deterministic segmentation algorithm (Even Split Strategy):
        - Durations of at most 5 seconds yield a single segment
        - Otherwise the duration is split into the fewest equal segments
          that are each at most 5 seconds long
        - The last segment absorbs any drift larger than 0.0001 so the
          segments add up to the requested duration
        """
        if duration <= 5.0:
            return [duration]

        # Fewest segments of at most 5 seconds: ceil(duration / 5).
        num_segments = int(duration / 5.0)
        if duration % 5.0 > 0:
            num_segments += 1

        segments = [duration / num_segments] * num_segments

        # Handle floating point precision errors
        drift = duration - sum(segments)
        if abs(drift) > 0.0001:
            segments[-1] += drift
        return segments

    def get_available_music_tags(self) -> List[str]:
        """Get list of available music moods"""
        return self.music_manager.get_available_moods()