Spaces:
Sleeping
Sleeping
| # @title core/video_gen.py | |
| import os | |
| import json | |
| import httpx | |
| import asyncio | |
| from typing import Dict, List, Any, Optional | |
| from pathlib import Path | |
| from utils.logger import SpotMakerLogger | |
class VideoGenerator:
    """
    Video generator backed by the Hailuo (MiniMax) API.

    Produces one video clip per scene description. Each clip goes through
    three steps: a generation task is created, the task is polled until it
    finishes, and the resulting file is downloaded into ``output_dir``.
    Progress and log messages are reported through the supplied logger
    under the ``"video_gen"`` module name.
    """

    # Clip count assumed when a clip is generated without going through
    # generate_clips(); 5 matches the previously hard-coded 20%-per-clip split.
    _total_clips: int = 5

    # Fraction of a single clip's share of the overall progress reached at
    # each stage (0.75 reproduces the former "+15 of 20" step).
    _STAGE_STARTED: float = 0.0
    _STAGE_GENERATED: float = 0.75
    _STAGE_DOWNLOADED: float = 1.0

    def __init__(self, api_key: str, output_dir: str, logger: "SpotMakerLogger"):
        """
        Store the configuration and make sure the output directory exists.

        Args:
            api_key: Bearer token sent with every API request.
            output_dir: Directory the downloaded clips are written to;
                created (including parents) if missing.
            logger: Project logger used for log lines and progress updates.
        """
        self.api_key = api_key
        self.logger = logger
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Base URLs of the API endpoints
        self.create_url = "https://api.minimaxi.chat/v1/video_generation"
        self.status_url = "https://api.minimaxi.chat/v1/query/video_generation"
        self.files_url = "https://api.minimaxi.chat/v1/files/retrieve"
        self.model = "video-01"

        # Per-clip status, keyed by the clip index as a string
        self.clips_status: Dict[str, Dict[str, Any]] = {}

    def _overall_progress(self) -> int:
        """Return the aggregate progress (0-100) over all tracked clips."""
        # Guard against division by zero and against clip indexes beyond the
        # announced total, so the result never exceeds 100.
        total = max(self._total_clips, len(self.clips_status), 1)
        completed = sum(
            entry.get("fraction", 0.0) for entry in self.clips_status.values()
        )
        return min(100, int(100 * completed / total))

    def _record_clip_stage(self, clip_index: int, fraction: float, message: str) -> None:
        """Remember the stage a clip has reached and publish the aggregate progress."""
        self.clips_status[str(clip_index)] = {"fraction": fraction, "message": message}
        self.logger.update_progress("video_gen", self._overall_progress(), message)

    async def generate_clips(self, scenes: List[Dict[str, str]]) -> List[str]:
        """
        Generate a series of video clips from scene descriptions.

        All clips are generated concurrently. If any clip fails, the
        remaining tasks are cancelled before the error is re-raised.

        Args:
            scenes: List of dictionaries; each must contain a
                ``'description'`` key with the scene prompt.

        Returns:
            List of paths to the generated video files, in scene order.

        Raises:
            KeyError: If a scene has no ``'description'`` key.
            Exception: If generating or downloading any clip fails.
        """
        try:
            total = len(scenes)
            self._total_clips = total
            self.clips_status = {}

            self.logger.log("video_gen", f"Rozpoczynam generowanie {total} klipów", "info")
            self.logger.update_progress("video_gen", 0, "Inicjalizacja...")

            # Validate every scene before starting any task, so a malformed
            # scene cannot leave already-started tasks running unattended.
            descriptions = [scene['description'] for scene in scenes]

            # Start generation of all clips concurrently
            tasks = []
            for i, description in enumerate(descriptions, 1):
                self.logger.log("video_gen", f"Inicjuję generowanie klipu {i}/{total}", "info")
                tasks.append(asyncio.create_task(self._generate_single_clip(description, i)))

            # Wait for all tasks to finish
            try:
                clip_paths = await asyncio.gather(*tasks)
            except Exception:
                # gather() re-raises the first failure but leaves the sibling
                # tasks running; stop them and wait until they have unwound.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
                raise

            self.logger.update_progress("video_gen", 100, "Zakończono generowanie wszystkich klipów")
            return list(clip_paths)
        except Exception as e:
            self.logger.format_error("video_gen", e)
            raise

    async def _generate_single_clip(self, description: str, clip_index: int) -> str:
        """
        Generate a single video clip and return the path of the saved file.

        Raises:
            Exception: If the task cannot be started, generation fails or
                times out, or the file cannot be downloaded.
        """
        try:
            # Start the generation task
            task_id = await self._initiate_generation(description, clip_index)
            if not task_id:
                raise Exception(f"Nie udało się zainicjować generowania klipu {clip_index}")

            # Poll until the task finishes
            file_id = await self._monitor_generation(task_id, clip_index)
            if not file_id:
                raise Exception(f"Błąd podczas generowania klipu {clip_index}")

            # Download the generated file
            return await self._download_clip(file_id, clip_index)
        except Exception as e:
            self.logger.format_error("video_gen", e)
            raise

    async def _initiate_generation(self, description: str, clip_index: int) -> str:
        """
        Create a video generation task in the API and return its task id.

        Raises:
            Exception: If the API answers with a non-200 status or the
                response carries no ``task_id``.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        data = {
            "model": self.model,
            "prompt": description,
            "prompt_optimizer": True,
        }

        self.logger.log("video_gen",
                        f"Wysyłam request do API dla klipu {clip_index}",
                        "debug")

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.create_url,
                headers=headers,
                json=data,
                timeout=30.0,
            )

        # Check the HTTP status before parsing: an error response may not be
        # JSON, and a decode error would hide the real API error.
        if response.status_code != 200:
            error_msg = f"API error: {response.status_code} - {response.text}"
            self.logger.log("video_gen", error_msg, "error")
            raise Exception(error_msg)

        task_id = response.json().get('task_id')
        if not task_id:
            raise Exception("Brak task_id w odpowiedzi")

        self._record_clip_stage(clip_index,
                                self._STAGE_STARTED,
                                f"Rozpoczęto generowanie klipu {clip_index}")
        return task_id

    async def _monitor_generation(self, task_id: str, clip_index: int) -> Optional[str]:
        """
        Poll the API until the generation task finishes.

        Returns:
            The ``file_id`` of the generated video, or ``None`` if the API
            reports success without one.

        Raises:
            Exception: If the API reports failure or the polling budget
                (60 attempts, 10 s apart) is exhausted.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        max_retries = 60  # maximum number of attempts (10 minutes at a 10 s interval)

        # One client for the whole polling loop, so the connection is reused.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for _ in range(max_retries):
                response = await client.get(
                    self.status_url,
                    params={"task_id": task_id},
                    headers=headers,
                )

                if response.status_code != 200:
                    # The body may not be JSON; log it and retry on the next
                    # poll instead of failing on a decode error.
                    self.logger.log("video_gen",
                                    f"API error: {response.status_code} - {response.text}",
                                    "error")
                else:
                    data = response.json()
                    status = data.get('status')
                    self.logger.log("video_gen",
                                    f"Status klipu {clip_index}: {status}",
                                    "debug")

                    if status == "Success":
                        self._record_clip_stage(clip_index,
                                                self._STAGE_GENERATED,
                                                f"Wygenerowano klip {clip_index}")
                        return data.get('file_id')
                    if status == "Fail":
                        raise Exception(f"Generowanie klipu {clip_index} nie powiodło się")

                await asyncio.sleep(10)  # wait 10 s before the next check

        raise Exception(f"Przekroczono limit czasu dla klipu {clip_index}")

    async def _download_clip(self, file_id: str, clip_index: int) -> str:
        """
        Download a generated clip and save it as ``clip_<index>.mp4``.

        Returns:
            Path of the saved file, as a string.

        Raises:
            Exception: If the file metadata or the file itself cannot be
                retrieved.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.get(
                self.files_url,
                params={"file_id": file_id},
                headers=headers,
            )
            if response.status_code != 200:
                error_msg = f"API error: {response.status_code} - {response.text}"
                self.logger.log("video_gen", error_msg, "error")
                raise Exception(error_msg)

            # 'file' may be absent or null in the response
            download_url = (response.json().get('file') or {}).get('download_url')
            if not download_url:
                raise Exception(f"Nie można pobrać URL dla klipu {clip_index}")

            # Download the file
            response = await client.get(download_url)
            if response.status_code != 200:
                # Never write an error page to disk under a .mp4 name
                error_msg = f"API error: {response.status_code} - {response.text}"
                self.logger.log("video_gen", error_msg, "error")
                raise Exception(error_msg)

        # Save the file
        output_path = self.output_dir / f"clip_{clip_index}.mp4"
        output_path.write_bytes(response.content)

        self._record_clip_stage(clip_index,
                                self._STAGE_DOWNLOADED,
                                f"Pobrano klip {clip_index}")
        return str(output_path)

    def get_status(self) -> Dict[str, Any]:
        """Return the generator's current progress and status as reported by the logger."""
        return {
            'progress': self.logger.get_module_progress("video_gen"),
            'status': self.logger.get_module_status("video_gen"),
        }