import streamlit as st
import os
import tempfile
import time
import concurrent.futures
from functools import partial
import torch
import hashlib
from PIL import Image, ImageDraw
import gc

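# Local pipeline modules: each stage of the conversion (transcription, prompt
# generation, image generation, animation, video assembly) lives in its own module
# and is wired together in main() below.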
from transcriber import AudioTranscriber
from prompt_generator import PromptGenerator
from image_generator import ImageGenerator
from animator import Animator
from video_creator import VideoCreator


st.set_page_config(
    page_title="Audio to Video Converter",
    page_icon="🎬",
    layout="wide"
)

os.makedirs("temp", exist_ok=True)
os.makedirs("outputs", exist_ok=True)
os.makedirs("cache", exist_ok=True)

st.markdown("""
<div style="text-align: center; background-color: #f0f2f6; padding: 20px; border-radius: 10px; margin-bottom: 20px;">
<h1 style="color: #1E88E5;">🎬 Audio to Video Converter</h1>
<p style="font-size: 18px;">Transform your audio into engaging videos with AI-powered visuals</p>
</div>
""", unsafe_allow_html=True)

st.markdown("""
### How it works:
1. 🎤 **Upload your audio** - We accept WAV, MP3, and OGG formats
2. 🤖 **AI transcribes your audio** - Using advanced speech recognition
3. 🖼️ **Generate images from transcription** - AI creates visuals matching your content
4. ✨ **Add animations** - Bring images to life with smooth transitions
5. 🔊 **Synchronize with audio** - Perfectly timed to match your speech
6. 📥 **Download your video** - Ready to share on social media
""")


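# Heavy components are wrapped in st.cache_resource so each one is loaded once per
# process and reused across Streamlit reruns and sessions.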
@st.cache_resource
def get_transcriber():
    return AudioTranscriber()


@st.cache_resource
def get_prompt_generator():
    return PromptGenerator()


@st.cache_resource
def get_image_generator():
    return ImageGenerator()


@st.cache_resource
def get_animator():
    return Animator()


@st.cache_resource
def get_video_creator():
    return VideoCreator()


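# Simple disk cache keyed by the MD5 hash of an arbitrary string, used below to
# store finished videos (and optionally pickled intermediate results) between runs.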
class ResultCache:
    def __init__(self):
        self.cache_dir = "cache"
        os.makedirs(self.cache_dir, exist_ok=True)

    def get_cache_path(self, key, extension=".pkl"):
        hash_obj = hashlib.md5(key.encode())
        return os.path.join(self.cache_dir, f"{hash_obj.hexdigest()}{extension}")

    def exists(self, key, extension=".pkl"):
        cache_path = self.get_cache_path(key, extension)
        return os.path.exists(cache_path)

    def save(self, key, data, extension=".pkl"):
        import pickle
        cache_path = self.get_cache_path(key, extension)
        with open(cache_path, 'wb') as f:
            pickle.dump(data, f)
        return cache_path

    def load(self, key, extension=".pkl"):
        import pickle
        cache_path = self.get_cache_path(key, extension)
        if os.path.exists(cache_path):
            with open(cache_path, 'rb') as f:
                return pickle.load(f)
        return None

    def clear(self):
        import shutil
        for file in os.listdir(self.cache_dir):
            file_path = os.path.join(self.cache_dir, file)
            if os.path.isfile(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)


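# Illustrative usage of ResultCache (this app only calls get_cache_path/exists/clear
# directly; save()/load() are available for pickled intermediate results):
#     cache = ResultCache()
#     cache.save("some-key", some_python_object)
#     restored = cache.load("some-key")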
result_cache = ResultCache()


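# Worker functions mapped over a concurrent.futures.ThreadPoolExecutor in main().
# Each one catches its own exceptions and returns a usable fallback so a single failed
# segment, prompt, or image does not abort the whole conversion.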
def process_audio_segment(segment, transcriber):
    """Process a single audio segment in parallel."""
    try:
        return transcriber.transcribe_segment(segment)
    except Exception as e:
        st.warning(f"Error transcribing segment: {str(e)}. Using empty transcription.")
        return ""


def generate_prompt_for_segment(transcription, prompt_generator, aspect_ratio="16:9"):
    """Generate a prompt for a single transcription in parallel."""
    try:
        return prompt_generator.generate_optimized_prompt(transcription, aspect_ratio)
    except Exception as e:
        st.warning(f"Error generating prompt: {str(e)}. Using fallback prompt.")
        return f"{transcription}, visual scene, detailed, vibrant, cinematic"


def generate_image_for_prompt(prompt, image_generator):
    """Generate an image for a single prompt in parallel."""
    try:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        image_path = image_generator.generate_image(prompt)

        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return image_path
    except Exception as e:
        st.warning(f"Error generating image: {str(e)}. Using fallback image.")
        img = Image.new('RGB', image_generator.target_size, color=(240, 240, 240))
        draw = ImageDraw.Draw(img)
        draw.text((10, 10), prompt[:50], fill=(0, 0, 0))
        path = f"temp/fallback_{int(time.time() * 1000)}.png"
        img.save(path)
        return path


def animate_image(image_path, animator, animation_type="random", num_frames=15):
    """Animate a single image in parallel."""
    try:
        return animator.animate_single_image(image_path, animation_type, num_frames=num_frames)
    except Exception as e:
        st.warning(f"Error animating image: {str(e)}. Using static frames.")
        return [image_path] * 10


def main():
    with st.sidebar:
        st.markdown("## ⚙️ Settings")

        st.markdown("### 📹 Video Format")
        with st.expander("Aspect Ratio", expanded=True):
            aspect_ratio = st.radio(
                "Select video format",
                options=["16:9 (Landscape)", "1:1 (Square)", "9:16 (Portrait)"],
                index=0,
                help="Choose the aspect ratio for your video"
            )

            aspect_ratio_map = {
                "16:9 (Landscape)": "16:9",
                "1:1 (Square)": "1:1",
                "9:16 (Portrait)": "9:16"
            }
            selected_aspect_ratio = aspect_ratio_map[aspect_ratio]

            col1, col2 = st.columns([1, 2])
            with col1:
                st.markdown("Preview:")
            with col2:
                if selected_aspect_ratio == "16:9":
                    st.markdown('<div style="background-color: #ddd; width: 160px; height: 90px; border-radius: 5px;"></div>', unsafe_allow_html=True)
                elif selected_aspect_ratio == "1:1":
                    st.markdown('<div style="background-color: #ddd; width: 120px; height: 120px; border-radius: 5px;"></div>', unsafe_allow_html=True)
                elif selected_aspect_ratio == "9:16":
                    st.markdown('<div style="background-color: #ddd; width: 90px; height: 160px; border-radius: 5px;"></div>', unsafe_allow_html=True)

        st.markdown("### 🚀 Performance")
        with st.expander("Processing Options", expanded=True):
            parallel_processing = st.toggle("Enable parallel processing", value=True,
                                            help="Process multiple tasks simultaneously for faster results")
            max_workers = st.slider("Max parallel workers", min_value=2, max_value=8, value=4,
                                    help="Number of simultaneous tasks (higher values may use more memory)")
            use_caching = st.toggle("Enable result caching", value=True,
                                    help="Save results to speed up repeated conversions")

            memory_optimization = st.toggle("Enable memory optimization", value=True,
                                            help="Reduce memory usage (recommended for Hugging Face Spaces)")

            vram_optimization = st.toggle("Enable VRAM optimization", value=True,
                                          help="Use techniques to reduce VRAM usage on GPU (highly recommended for Hugging Face)")

        st.markdown("### 🎨 Content")
        with st.expander("Segmentation", expanded=True):
            max_segment_duration = st.slider(
                "Maximum image duration (seconds)",
                min_value=3.0,
                max_value=5.0,
                value=4.0,
                step=0.5,
                help="Each image will stay on screen between 3-5 seconds for optimal results"
            )

            st.info("More images will be created to ensure each stays under the maximum duration")

            num_segments = st.slider("Minimum number of segments", min_value=2, max_value=20, value=5,
                                     help="Minimum number of scenes to create in your video")

            animation_type = st.selectbox(
                "Animation style",
                ["random", "zoom", "pan_right", "pan_left", "fade_in", "ken_burns"],
                help="Choose how images will animate in your video"
            )

            frames_per_animation = st.slider(
                "Animation smoothness",
                min_value=10,
                max_value=20,
                value=15,
                help="Higher values create smoother animations but may increase processing time"
            )

        st.markdown("### 🔧 Advanced")
        with st.expander("Image Settings"):
            image_size_option = st.radio(
                "Image Quality",
                options=["Low (256x256)", "Medium (384x384)", "High (512x512)"],
                index=1,
                help="Higher quality creates better images but takes longer"
            )

            image_size_map = {
                "Low (256x256)": (256, 256),
                "Medium (384x384)": (384, 384),
                "High (512x512)": (512, 512)
            }
            base_image_size = image_size_map[image_size_option]

            inference_steps = st.slider("Generation Detail", min_value=10, max_value=50, value=20,
                                        help="Higher values create more detailed images but take longer")

        with st.expander("Video Settings"):
            video_quality = st.radio(
                "Video Quality",
                options=["Low", "Medium", "High"],
                index=1,
                help="Higher quality creates larger files"
            )

            bitrate_map = {
                "Low": "800k",
                "Medium": "1200k",
                "High": "2000k"
            }
            bitrate = bitrate_map[video_quality]

        if st.button("🧹 Clear Cache", help="Remove all cached results to free up disk space"):
            result_cache.clear()
            st.success("Cache cleared successfully!")

        st.markdown("---")
        st.markdown("### ℹ️ About")
        st.markdown("""
This app uses AI to convert audio to video.

Optimized for Hugging Face Spaces with:
- Multiple video formats (16:9, 1:1, 9:16)
- Dynamic image timing (5 seconds or less)
- Parallel processing
- Memory-efficient models
- Result caching
- Batch processing
""")

    st.markdown("### 🎙️ Upload Your Audio")
    audio_file = st.file_uploader("Select an audio file (WAV, MP3, OGG)", type=["wav", "mp3", "ogg"])

    if audio_file is not None:
        st.markdown("### 🎵 Preview Your Audio")
        st.audio(audio_file)

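        # Cache key: MD5 of the audio bytes combined with every setting that affects the
        # output, so a cached video is reused only for an identical audio + settings pair.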
        audio_bytes = audio_file.getvalue()
        settings_str = f"{num_segments}_{max_segment_duration}_{animation_type}_{frames_per_animation}_{base_image_size[0]}x{base_image_size[1]}_{inference_steps}_{video_quality}_{selected_aspect_ratio}_{memory_optimization}_{vram_optimization}"
        cache_key = hashlib.md5((hashlib.md5(audio_bytes).hexdigest() + settings_str).encode()).hexdigest()

        st.markdown("### 🚀 Process Your Audio")
        convert_col, time_col = st.columns([3, 1])

        with convert_col:
            convert_button = st.button("🎬 Convert to Video", type="primary", use_container_width=True)

        with time_col:
            st.info("Processing time: ~1-3 minutes")

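        # If this exact audio + settings combination was converted before, serve the
        # cached MP4 and skip the whole pipeline.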
        if use_caching and result_cache.exists(cache_key, ".mp4") and convert_button:
            output_video = result_cache.get_cache_path(cache_key, ".mp4")
            st.success("✅ Found cached result! Loading video...")

            st.markdown("### 🎥 Your Video")
            st.video(output_video)

            with open(output_video, "rb") as file:
                st.download_button(
                    label="📥 Download Video",
                    data=file,
                    file_name=f"audio_to_video_{selected_aspect_ratio.replace(':', '_')}.mp4",
                    mime="video/mp4",
                    use_container_width=True
                )
            return

        if convert_button:
            progress_container = st.container()
            with progress_container:
                progress_bar = st.progress(0)
                status_text = st.empty()

            processing_col1, processing_col2 = st.columns([1, 3])
            with processing_col1:
                st.markdown("### Processing:")
            with processing_col2:
                status_message = st.empty()

            try:
                if memory_optimization or vram_optimization:
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

                apply_vram_optimization = vram_optimization

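                # With VRAM optimization enabled, clamp the most expensive settings so the
                # image model fits on a small GPU: at most 25 inference steps and 512x512 images.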
                if vram_optimization:
                    if inference_steps > 25:
                        inference_steps = 25

                    if base_image_size[0] > 512 or base_image_size[1] > 512:
                        base_image_size = (512, 512)

                status_text.text("Initializing components...")
                status_message.markdown("🔄 **Setting up AI models...**")
                transcriber = get_transcriber()
                prompt_generator = get_prompt_generator()
                image_generator = get_image_generator()
                animator = get_animator()
                video_creator = get_video_creator()

                image_generator.set_aspect_ratio(selected_aspect_ratio)
                animator.set_aspect_ratio(selected_aspect_ratio)
                video_creator.set_aspect_ratio(selected_aspect_ratio)

                if apply_vram_optimization:
                    image_generator.set_vram_optimization(True)

                transcriber.set_max_segment_duration(max_segment_duration)
                video_creator.set_max_segment_duration(max_segment_duration)

                animator.set_frames_per_animation(frames_per_animation)

                actual_image_size = image_generator.get_size_for_aspect_ratio(base_image_size, selected_aspect_ratio)

                image_generator.set_inference_steps(inference_steps)
                image_generator.set_target_size(actual_image_size)

                progress_bar.progress(10)

                status_text.text("Segmenting audio...")
                status_message.markdown("🔍 **Analyzing audio...**")
                try:
                    audio_segments, timestamps = transcriber.segment_audio(audio_file, num_segments=num_segments)
                except Exception as e:
                    st.warning(f"Error segmenting audio: {str(e)}. Using simplified segmentation.")
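                    # Fallback: evenly spaced silent segments (16 kHz sample rate assumed)
                    # so the rest of the pipeline can still run.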
                    import numpy as np
                    segment_duration = 4.0
                    audio_segments = [np.zeros(int(16000 * segment_duration)) for _ in range(num_segments)]
                    total_duration = segment_duration * num_segments
                    timestamps = [(i * segment_duration, (i + 1) * segment_duration) for i in range(num_segments)]

                progress_bar.progress(15)

                status_text.text("Transcribing audio segments...")
                status_message.markdown("🎤 **Converting speech to text...**")
                if parallel_processing:
                    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                        process_func = partial(process_audio_segment, transcriber=transcriber)
                        transcriptions = list(executor.map(process_func, audio_segments))
                else:
                    transcriptions = []
                    for segment in audio_segments:
                        try:
                            trans = transcriber.transcribe_segment(segment)
                            transcriptions.append(trans)
                        except Exception as e:
                            st.warning(f"Error transcribing segment: {str(e)}. Using empty transcription.")
                            transcriptions.append("")

                if memory_optimization or apply_vram_optimization:
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

                progress_bar.progress(30)
                st.markdown("### 📝 Transcriptions")
                for i, (trans, (start, end)) in enumerate(zip(transcriptions, timestamps)):
                    st.markdown(f"""
<div style="background-color: #f0f2f6; padding: 10px; border-radius: 5px; margin-bottom: 10px;">
<strong>Segment {i+1} ({start:.1f}s - {end:.1f}s):</strong> {trans}
</div>
""", unsafe_allow_html=True)

                status_text.text("Generating prompts from transcriptions...")
                status_message.markdown("✏️ **Creating image descriptions...**")
                if parallel_processing:
                    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                        prompt_func = partial(generate_prompt_for_segment,
                                              prompt_generator=prompt_generator,
                                              aspect_ratio=selected_aspect_ratio)
                        prompts = list(executor.map(prompt_func, transcriptions))
                else:
                    prompts = []
                    for trans in transcriptions:
                        try:
                            prompt = prompt_generator.generate_optimized_prompt(trans, selected_aspect_ratio)
                            prompts.append(prompt)
                        except Exception as e:
                            st.warning(f"Error generating prompt: {str(e)}. Using fallback prompt.")
                            prompts.append(f"{trans}, visual scene, detailed, vibrant, cinematic")

                progress_bar.progress(40)
                st.markdown("### 🏷️ Generated Prompts")
                for i, prompt in enumerate(prompts):
                    st.markdown(f"""
<div style="background-color: #e8f4f8; padding: 10px; border-radius: 5px; margin-bottom: 10px;">
<strong>Prompt {i+1}:</strong> {prompt}
</div>
""", unsafe_allow_html=True)

                status_text.text("Generating images from prompts...")
                status_message.markdown("🎨 **Creating images...**")

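                # In memory/VRAM-saving mode, generate images in small batches and free
                # GPU memory after each batch instead of submitting every prompt at once.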
                if memory_optimization or apply_vram_optimization:
                    batch_size = 2
                    images = []

                    for i in range(0, len(prompts), batch_size):
                        batch_prompts = prompts[i:i + batch_size]
                        status_text.text(f"Generating images {i+1}-{min(i+batch_size, len(prompts))}/{len(prompts)}...")

                        if parallel_processing and batch_size > 1:
                            with concurrent.futures.ThreadPoolExecutor(max_workers=min(batch_size, max_workers)) as executor:
                                image_func = partial(generate_image_for_prompt, image_generator=image_generator)
                                batch_images = list(executor.map(image_func, batch_prompts))
                        else:
                            batch_images = []
                            for prompt in batch_prompts:
                                img_path = generate_image_for_prompt(prompt, image_generator)
                                batch_images.append(img_path)

                        images.extend(batch_images)

                        gc.collect()
                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()
                else:
                    if parallel_processing:
                        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                            image_func = partial(generate_image_for_prompt, image_generator=image_generator)
                            images = list(executor.map(image_func, prompts))
                    else:
                        images = []
                        for i, prompt in enumerate(prompts):
                            status_text.text(f"Generating image {i+1}/{len(prompts)}...")
                            img_path = generate_image_for_prompt(prompt, image_generator)
                            images.append(img_path)

                progress_bar.progress(60)

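                # Preview the generated images in a grid; each file is converted to RGB
                # and re-saved as JPEG in temp/ before being displayed.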
                st.markdown("### 🖼️ Generated Images")
                image_cols = st.columns(min(len(images), 3))
                for i, img_path in enumerate(images):
                    with image_cols[i % len(image_cols)]:
                        try:
                            if os.path.exists(img_path):
                                try:
                                    img = Image.open(img_path)
                                    if img.mode != "RGB":
                                        img = img.convert("RGB")

                                    safe_path = f"temp/safe_image_{int(time.time() * 1000)}_{i}.jpg"
                                    img.save(safe_path, format="JPEG", quality=95)

                                    st.image(safe_path, caption=f"Image {i+1}", use_container_width=True)
                                except Exception as e:
                                    st.error(f"Error loading image {i+1}: {str(e)}")
                            else:
                                st.warning(f"Image {i+1} not found")
                        except Exception as e:
                            st.error(f"Error displaying image {i+1}: {str(e)}")

                if memory_optimization or apply_vram_optimization:
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

                status_text.text("Adding animations to images...")
                status_message.markdown("✨ **Adding animations...**")

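                # Same batching strategy for animation: in memory/VRAM-saving mode, animate a
                # few images at a time and release memory between batches.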
                if memory_optimization or apply_vram_optimization:
                    batch_size = 3
                    animated_frames = []

                    for i in range(0, len(images), batch_size):
                        batch_images = images[i:i + batch_size]
                        status_text.text(f"Animating images {i+1}-{min(i+batch_size, len(images))}/{len(images)}...")

                        if parallel_processing and batch_size > 1:
                            with concurrent.futures.ThreadPoolExecutor(max_workers=min(batch_size, max_workers)) as executor:
                                animate_func = partial(animate_image,
                                                       animator=animator,
                                                       animation_type=animation_type,
                                                       num_frames=frames_per_animation)
                                batch_frames = list(executor.map(animate_func, batch_images))
                        else:
                            batch_frames = []
                            for img_path in batch_images:
                                frames = animate_image(img_path, animator, animation_type, frames_per_animation)
                                batch_frames.append(frames)

                        animated_frames.extend(batch_frames)

                        gc.collect()
                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()
                else:
                    if parallel_processing:
                        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                            animate_func = partial(animate_image,
                                                   animator=animator,
                                                   animation_type=animation_type,
                                                   num_frames=frames_per_animation)
                            animated_frames = list(executor.map(animate_func, images))
                    else:
                        animated_frames = []
                        for i, img_path in enumerate(images):
                            status_text.text(f"Animating image {i+1}/{len(images)}...")
                            frames = animator.animate_single_image(
                                img_path,
                                animation_type,
                                num_frames=frames_per_animation
                            )
                            animated_frames.append(frames)

                progress_bar.progress(80)

                if memory_optimization or apply_vram_optimization:
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

                status_text.text("Creating final video...")
                status_message.markdown("🎬 **Assembling video...**")
                output_video = video_creator.create_video_from_frames(
                    animated_frames,
                    audio_file,
                    segments=transcriptions,
                    timestamps=timestamps,
                    parallel=parallel_processing and not (memory_optimization or vram_optimization),
                    max_workers=max_workers
                )

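                # A returned .txt path is treated as an error report: show its contents
                # and stop instead of continuing with a broken output.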
                if output_video.endswith('.txt'):
                    with open(output_video, 'r') as f:
                        error_message = f.read()
                    st.error(f"Error creating video: {error_message}")
                    st.stop()

                if video_quality != "High":
                    status_text.text("Optimizing video for web...")
                    status_message.markdown("⚙️ **Optimizing video...**")
                    output_video = video_creator.optimize_video(
                        output_video,
                        bitrate=bitrate,
                        threads=2 if memory_optimization or apply_vram_optimization else max_workers
                    )

                if use_caching:
                    import shutil
                    cached_path = result_cache.get_cache_path(cache_key, ".mp4")
                    shutil.copy(output_video, cached_path)

                progress_bar.progress(100)
                status_text.text("Video creation complete!")
                status_message.markdown("✅ **Done!**")

                st.markdown("### 🎥 Your Video")
                st.video(output_video)

                st.markdown("### 📥 Download")
                with open(output_video, "rb") as file:
                    st.download_button(
                        label="📥 Download Video",
                        data=file,
                        file_name=f"audio_to_video_{selected_aspect_ratio.replace(':', '_')}.mp4",
                        mime="video/mp4",
                        use_container_width=True
                    )

                st.markdown("### ⏱️ Performance Metrics")
                st.info(f"""
- Video Format: {aspect_ratio}
- Max Image Duration: {max_segment_duration} seconds
- Number of Segments: {len(audio_segments)}
- Parallel Processing: {'Enabled' if parallel_processing else 'Disabled'}
- Memory Optimization: {'Enabled' if memory_optimization else 'Disabled'}
- VRAM Optimization: {'Enabled' if apply_vram_optimization else 'Disabled'}
- Workers: {max_workers}
- Image Size: {actual_image_size[0]}x{actual_image_size[1]}
- Inference Steps: {inference_steps}
- Video Quality: {video_quality}
""")

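                # Remove intermediate images and animation frames from temp/ now that the
                # final video (and, if enabled, its cached copy) has been written.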
                status_text.text("Cleaning up temporary files...")
                for path in images + [p for frames in animated_frames for p in frames]:
                    if os.path.exists(path):
                        try:
                            os.remove(path)
                        except OSError:
                            pass

                if memory_optimization or apply_vram_optimization:
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

                status_text.text("All done! Your video is ready for download.")

            except Exception as e:
                st.error(f"An error occurred: {str(e)}")
                st.exception(e)

                st.markdown("### 🔧 Troubleshooting Tips")
                st.info("""
- Try enabling memory optimization
- Use a smaller image size
- Reduce inference steps
- Reduce the number of segments
- Make sure your audio file is in a supported format
- Clear the cache and try again
""")


if __name__ == "__main__":
    main()