SpotMaster_v1.0 / core /video_gen.py
Marek4321's picture
Create video_gen.py
1a9ed07 verified
# @title core/video_gen.py
import os
import json
import httpx
import asyncio
from typing import Dict, List, Any, Optional
from pathlib import Path
from utils.logger import SpotMakerLogger
class VideoGenerator:
"""
Generator video wykorzystuj膮cy Hailuo API.
Generuje klipy video na podstawie opis贸w uj臋膰.
"""
def __init__(self, api_key: str, output_dir: str, logger: SpotMakerLogger):
self.api_key = api_key
self.logger = logger
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# Bazowe URL dla API
self.create_url = "https://api.minimaxi.chat/v1/video_generation"
self.status_url = "https://api.minimaxi.chat/v1/query/video_generation"
self.model = "video-01"
# Status dla ka偶dego klipu
self.clips_status: Dict[str, Dict[str, Any]] = {}
async def generate_clips(self, scenes: List[Dict[str, str]]) -> List[str]:
"""
Generuje seri臋 klip贸w video na podstawie opis贸w scen.
Args:
scenes: Lista s艂ownik贸w z opisami scen
Returns:
Lista 艣cie偶ek do wygenerowanych plik贸w video
"""
try:
self.logger.log("video_gen", f"Rozpoczynam generowanie {len(scenes)} klip贸w", "info")
self.logger.update_progress("video_gen", 0, "Inicjalizacja...")
# Lista do przechowywania task_id dla ka偶dego klipu
tasks = []
# Uruchom generowanie wszystkich klip贸w r贸wnolegle
for i, scene in enumerate(scenes, 1):
self.logger.log("video_gen", f"Inicjuj臋 generowanie klipu {i}/5", "info")
task = asyncio.create_task(self._generate_single_clip(scene['description'], i))
tasks.append(task)
# Czekaj na uko艅czenie wszystkich zada艅
clip_paths = await asyncio.gather(*tasks)
self.logger.update_progress("video_gen", 100, "Zako艅czono generowanie wszystkich klip贸w")
return clip_paths
except Exception as e:
self.logger.format_error("video_gen", e)
raise
async def _generate_single_clip(self, description: str, clip_index: int) -> str:
"""Generuje pojedynczy klip video."""
try:
# Inicjacja generowania
task_id = await self._initiate_generation(description, clip_index)
if not task_id:
raise Exception(f"Nie uda艂o si臋 zainicjowa膰 generowania klipu {clip_index}")
# Monitorowanie post臋pu
file_id = await self._monitor_generation(task_id, clip_index)
if not file_id:
raise Exception(f"B艂膮d podczas generowania klipu {clip_index}")
# Pobranie wygenerowanego pliku
output_path = await self._download_clip(file_id, clip_index)
return output_path
except Exception as e:
self.logger.format_error("video_gen", e)
raise
async def _initiate_generation(self, description: str, clip_index: int) -> str:
"""Inicjuje generowanie video w API."""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": self.model,
"prompt": description,
"prompt_optimizer": True
}
self.logger.log("video_gen",
f"Wysy艂am request do API dla klipu {clip_index}",
"debug")
async with httpx.AsyncClient() as client:
response = await client.post(
self.create_url,
headers=headers,
json=data,
timeout=30.0
)
response_data = response.json()
if response.status_code != 200:
error_msg = f"API error: {response.status_code} - {response.text}"
self.logger.log("video_gen", error_msg, "error")
raise Exception(error_msg)
task_id = response_data.get('task_id')
if not task_id:
raise Exception("Brak task_id w odpowiedzi")
# Aktualizuj status
clip_progress = (clip_index - 1) * 20 # 20% na klip
self.logger.update_progress("video_gen",
clip_progress,
f"Rozpocz臋to generowanie klipu {clip_index}")
return task_id
async def _monitor_generation(self, task_id: str, clip_index: int) -> Optional[str]:
"""Monitoruje post臋p generowania video."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
max_retries = 60 # maksymalna liczba pr贸b (10 minut przy 10s odst臋pie)
retry_count = 0
while retry_count < max_retries:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.status_url}?task_id={task_id}",
headers=headers
)
data = response.json()
status = data.get('status')
self.logger.log("video_gen",
f"Status klipu {clip_index}: {status}",
"debug")
if status == "Success":
clip_progress = (clip_index - 1) * 20 + 15 # +15% za uko艅czenie
self.logger.update_progress("video_gen",
clip_progress,
f"Wygenerowano klip {clip_index}")
return data.get('file_id')
elif status == "Fail":
raise Exception(f"Generowanie klipu {clip_index} nie powiod艂o si臋")
await asyncio.sleep(10) # czekaj 10s przed kolejnym sprawdzeniem
retry_count += 1
raise Exception(f"Przekroczono limit czasu dla klipu {clip_index}")
async def _download_clip(self, file_id: str, clip_index: int) -> str:
"""Pobiera wygenerowany klip."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.minimaxi.chat/v1/files/retrieve?file_id={file_id}",
headers=headers
)
data = response.json()
download_url = data.get('file', {}).get('download_url')
if not download_url:
raise Exception(f"Nie mo偶na pobra膰 URL dla klipu {clip_index}")
# Pobierz plik
response = await client.get(download_url)
# Zapisz plik
output_path = self.output_dir / f"clip_{clip_index}.mp4"
with open(output_path, 'wb') as f:
f.write(response.content)
clip_progress = clip_index * 20 # 100% dla danego klipu
self.logger.update_progress("video_gen",
clip_progress,
f"Pobrano klip {clip_index}")
return str(output_path)
def get_status(self) -> Dict[str, Any]:
"""Zwraca aktualny status generatora."""
return {
'progress': self.logger.get_module_progress("video_gen"),
'status': self.logger.get_module_status("video_gen")
}