Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import random | |
| import time | |
| import logging | |
| from typing import Optional, List, Dict | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Configuraci贸n inicial para HF Spaces | |
| os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
| os.environ["GRADIO_ANALYTICS_ENABLED"] = "false" | |
| os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1" | |
| # Configuraci贸n de logging | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
| try: | |
| import requests | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip | |
| from moviepy.audio.fx.all import audio_loop | |
| import edge_tts | |
| import gradio as gr | |
| import numpy as np | |
| from transformers import pipeline | |
| import backoff | |
| from pydub import AudioSegment | |
| except ImportError as e: | |
| logger.error(f"Error importing dependencies: {e}") | |
| raise | |
| # Constantes configurables | |
| MAX_VIDEOS = 3 | |
| VIDEO_SEGMENT_DURATION = 5 | |
| MAX_RETRIES = 3 | |
| REQUEST_TIMEOUT = 15 | |
| # Voces disponibles en Edge TTS (espa帽ol) | |
| VOICES = { | |
| "Femenino MX": "es-MX-DaliaNeural", | |
| "Masculino MX": "es-MX-JorgeNeural", | |
| "Femenino ES": "es-ES-ElviraNeural", | |
| "Masculino ES": "es-ES-AlvaroNeural", | |
| "Femenino CO": "es-CO-SalomeNeural", | |
| "Masculino CO": "es-CO-GonzaloNeural", | |
| "Femenino AR": "es-AR-ElenaNeural", | |
| "Masculino AR": "es-AR-TomasNeural" | |
| } | |
| # Configuraci贸n de modelos | |
| MODEL_NAME = "facebook/mbart-large-50" | |
| PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "") | |
| def safe_download(url: str, timeout: int = REQUEST_TIMEOUT) -> Optional[str]: | |
| """Descarga segura con reintentos""" | |
| try: | |
| response = requests.get(url, stream=True, timeout=timeout) | |
| response.raise_for_status() | |
| filename = f"temp_{random.randint(1000,9999)}.mp4" | |
| with open(filename, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return filename | |
| except requests.exceptions.HTTPError as e: | |
| if e.response.status_code == 429: | |
| retry_after = int(e.response.headers.get('Retry-After', 5)) | |
| logger.warning(f"Rate limited. Waiting {retry_after} seconds...") | |
| time.sleep(retry_after) | |
| logger.error(f"Download failed: {str(e)}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Unexpected download error: {str(e)}") | |
| return None | |
| def process_music(music_path: str, target_duration: float) -> str: | |
| """Procesa m煤sica para loop y duraci贸n correcta""" | |
| processed_path = "processed_music.mp3" | |
| try: | |
| audio = AudioSegment.from_file(music_path) | |
| # Crear loop si es m谩s corto que el video | |
| if len(audio) < target_duration * 1000: | |
| loops_needed = int(target_duration * 1000 / len(audio)) + 1 | |
| audio = audio * loops_needed | |
| # Recortar a la duraci贸n exacta | |
| audio = audio[:int(target_duration * 1000)] | |
| audio.export(processed_path, format="mp3") | |
| return processed_path | |
| except Exception as e: | |
| logger.error(f"Error processing music: {str(e)}") | |
| return music_path # Fallback al original | |
| def download_video_segment(url: str, duration: float, output_path: str) -> bool: | |
| """Descarga y procesa un segmento de video""" | |
| temp_path = None | |
| try: | |
| temp_path = safe_download(url) | |
| if not temp_path: | |
| return False | |
| with VideoFileClip(temp_path) as clip: | |
| if clip.duration < 1: | |
| logger.error("Video demasiado corto") | |
| return False | |
| end_time = min(duration, clip.duration - 0.1) | |
| subclip = clip.subclip(0, end_time) | |
| subclip.write_videofile( | |
| output_path, | |
| codec="libx264", | |
| audio_codec="aac", | |
| threads=2, | |
| preset='ultrafast', | |
| verbose=False, | |
| ffmpeg_params=['-max_muxing_queue_size', '1024'] | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Video processing error: {str(e)}") | |
| return False | |
| finally: | |
| if temp_path and os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| def fetch_pexels_videos(query: str) -> List[str]: | |
| """Busca videos en Pexels""" | |
| if not PEXELS_API_KEY: | |
| logger.error("PEXELS_API_KEY no configurada") | |
| return [] | |
| headers = {"Authorization": PEXELS_API_KEY} | |
| url = f"https://api.pexels.com/videos/search?query={query}&per_page={MAX_VIDEOS}" | |
| try: | |
| response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT) | |
| response.raise_for_status() | |
| videos = [] | |
| for video in response.json().get("videos", [])[:MAX_VIDEOS]: | |
| video_files = [vf for vf in video.get("video_files", []) | |
| if vf.get("width", 0) >= 720] | |
| if video_files: | |
| best_file = max(video_files, key=lambda x: x.get("width", 0)) | |
| videos.append(best_file["link"]) | |
| return videos | |
| except Exception as e: | |
| logger.error(f"Error fetching Pexels videos: {str(e)}") | |
| return [] | |
| def generate_script(prompt: str, custom_script: Optional[str] = None) -> str: | |
| """Genera un script usando IA o custom text""" | |
| if custom_script and custom_script.strip(): | |
| return custom_script.strip() | |
| try: | |
| generator = pipeline("text-generation", model=MODEL_NAME) | |
| result = generator( | |
| f"Genera un guion breve sobre {prompt} en espa帽ol con {MAX_VIDEOS} puntos:", | |
| max_length=200, | |
| num_return_sequences=1 | |
| )[0]['generated_text'] | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error generating script: {str(e)}") | |
| return f"1. Punto uno sobre {prompt}\n2. Punto dos\n3. Punto tres" | |
| async def generate_voice(text: str, voice_id: str, output_file: str = "voice.mp3") -> bool: | |
| """Genera narraci贸n de voz""" | |
| try: | |
| communicate = edge_tts.Communicate(text, voice=voice_id) | |
| await communicate.save(output_file) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Voice generation failed: {str(e)}") | |
| return False | |
| def run_async(coro): | |
| """Ejecuta corrutinas as铆ncronas""" | |
| import asyncio | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| return loop.run_until_complete(coro) | |
| finally: | |
| loop.close() | |
| def create_video( | |
| prompt: str, | |
| custom_script: Optional[str] = None, | |
| voice_choice: str = "es-MX-DaliaNeural", | |
| music_file: Optional[str] = None | |
| ) -> Optional[str]: | |
| """Funci贸n principal para crear el video""" | |
| try: | |
| # 1. Generar contenido | |
| script = generate_script(prompt, custom_script) | |
| logger.info(f"Script generado: {script[:100]}...") | |
| # 2. Buscar videos | |
| video_urls = fetch_pexels_videos(prompt) | |
| if not video_urls: | |
| logger.error("No se encontraron videos") | |
| return None | |
| # 3. Generar voz | |
| voice_file = "voice.mp3" | |
| if not run_async(generate_voice(script, voice_choice, voice_file)): | |
| logger.error("No se pudo generar voz") | |
| return None | |
| # 4. Procesar m煤sica si existe | |
| music_path = None | |
| if music_file: | |
| audio_clip = AudioFileClip(voice_file) | |
| target_duration = audio_clip.duration | |
| audio_clip.close() | |
| music_path = process_music(music_file.name, target_duration) | |
| # 5. Procesar videos | |
| output_dir = "output" | |
| os.makedirs(output_dir, exist_ok=True) | |
| output_path = os.path.join(output_dir, f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4") | |
| clips = [] | |
| segment_duration = VIDEO_SEGMENT_DURATION | |
| for i, url in enumerate(video_urls): | |
| clip_path = f"segment_{i}.mp4" | |
| if download_video_segment(url, segment_duration, clip_path): | |
| clips.append(VideoFileClip(clip_path)) | |
| if not clips: | |
| logger.error("No se pudieron procesar los videos") | |
| return None | |
| # 6. Ensamblar video final | |
| final_video = concatenate_videoclips(clips, method="compose") | |
| voice_audio = AudioFileClip(voice_file) | |
| if music_path: | |
| music_audio = AudioFileClip(music_path) | |
| final_audio = CompositeAudioClip([voice_audio, music_audio.volumex(0.3)]) | |
| else: | |
| final_audio = voice_audio | |
| final_video = final_video.set_audio(final_audio) | |
| final_video.write_videofile( | |
| output_path, | |
| codec="libx264", | |
| audio_codec="aac", | |
| threads=2, | |
| preset='ultrafast', | |
| verbose=False | |
| ) | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Error creating video: {str(e)}") | |
| return None | |
| finally: | |
| # Limpieza | |
| for clip in clips: | |
| clip.close() | |
| if os.path.exists(voice_file): | |
| os.remove(voice_file) | |
| if music_path and os.path.exists(music_path): | |
| os.remove(music_path) | |
| for i in range(len(video_urls)): | |
| if os.path.exists(f"segment_{i}.mp4"): | |
| os.remove(f"segment_{i}.mp4") | |
| # Interfaz Gradio completa | |
| with gr.Blocks(title="Generador de Videos Avanzado", theme=gr.themes.Soft()) as app: | |
| gr.Markdown("# 馃幀 Generador de Videos con IA") | |
| with gr.Row(): | |
| with gr.Column(): | |
| prompt_input = gr.Textbox( | |
| label="Tema del video", | |
| placeholder="Ej: Lugares tur铆sticos de Argentina", | |
| max_lines=2 | |
| ) | |
| custom_script_input = gr.TextArea( | |
| label="Guion personalizado (opcional)", | |
| placeholder="Pega aqu铆 tu propio guion si lo tienes...", | |
| lines=5 | |
| ) | |
| voice_dropdown = gr.Dropdown( | |
| label="Selecciona una voz", | |
| choices=list(VOICES.keys()), | |
| value="Femenino MX" | |
| ) | |
| music_input = gr.File( | |
| label="M煤sica de fondo (opcional)", | |
| type="file", | |
| file_types=["audio"] | |
| ) | |
| generate_btn = gr.Button("Generar Video", variant="primary") | |
| with gr.Column(): | |
| output_video = gr.Video( | |
| label="Video Resultante", | |
| interactive=False, | |
| format="mp4" | |
| ) | |
| generate_btn.click( | |
| fn=create_video, | |
| inputs=[ | |
| prompt_input, | |
| custom_script_input, | |
| gr.Dropdown(value="es-MX-DaliaNeural", visible=False), # Valor real de voz | |
| music_input | |
| ], | |
| outputs=output_video | |
| ) | |
| # Actualizar el valor de voz real cuando cambia el dropdown | |
| voice_dropdown.change( | |
| lambda x: VOICES[x], | |
| inputs=voice_dropdown, | |
| outputs=gr.Dropdown(visible=False) | |
| ) | |
| # Para Hugging Face Spaces | |
| if __name__ == "__main__": | |
| app.launch(server_name="0.0.0.0", server_port=7860) |