| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import logging |
| from typing import List, Dict, Any, Tuple, Callable, Optional, Generator |
| from PIL import Image, ImageOps |
| import os |
| import subprocess |
| import shutil |
| from pathlib import Path |
| import time |
| import gc |
| import torch |
|
|
| |
| from .director import AducDirector |
| from .types import GenerationState, PreProductionParams, ProductionParams |
|
|
| |
| from .engineers import deformes2d_thinker_singleton, deformes3d_engine_singleton, Deformes4DEngine |
|
|
| |
| from .managers.latent_enhancer_manager import latent_enhancer_specialist_singleton |
| from .managers.seedvr_manager import seedvr_manager_singleton |
| from .managers.mmaudio_manager import mmaudio_manager_singleton |
| from .managers.vae_manager import vae_manager_singleton |
|
|
| |
| from .tools.video_encode_tool import video_encode_tool_singleton |
|
|
| logger = logging.getLogger(__name__) |
|
|
| ProgressCallback = Optional[Callable[[float, str], None]] |
|
|
| class AducOrchestrator: |
| """ |
| Implementa o Maestro (Γ), a camada de orquestração central do Aduc Framework. |
| Ele recebe solicitações, atualiza o estado de geração, delega tarefas para os |
| engenheiros e seus pools de especialistas, e retorna o estado atualizado. |
| """ |
| def __init__(self, workspace_dir: str): |
| self.director = AducDirector(workspace_dir) |
| self.editor = Deformes4DEngine() |
| self.editor.initialize(workspace_dir) |
| self.painter = deformes3d_engine_singleton |
| self.painter.initialize(workspace_dir) |
| self.device = 'cuda' if torch.cuda.is_available() else 'cpu' |
| logger.info("ADUC Maestro (Framework Core) pronto para reger a orquestra de especialistas.") |
|
|
| def get_current_state(self) -> GenerationState: |
| """Retorna o estado de geração atual.""" |
| return self.director.get_full_state() |
|
|
| def process_image_for_story(self, image_path: str, size: int, filename: str) -> str: |
| """Processa uma imagem de referência para o formato quadrado padrão.""" |
| img = Image.open(image_path).convert("RGB") |
| img_square = ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS) |
| processed_path = os.path.join(self.director.workspace_dir, filename) |
| img_square.save(processed_path) |
| logger.info(f"Imagem de referência processada e salva em: {processed_path}") |
| return processed_path |
|
|
| |
| def task_pre_production(self, params: PreProductionParams, progress_callback: ProgressCallback = None) -> Tuple[List[str], List[str], GenerationState]: |
| """Orquestra a criação do storyboard e dos keyframes visuais.""" |
| logger.info("Maestro: Iniciando tarefa de Pré-Produção.") |
| self.director.update_parameters("pre_producao", params) |
| |
| if progress_callback: progress_callback(0.1, "Gerando storyboard...") |
| storyboard_list = deformes2d_thinker_singleton.generate_storyboard(prompt=params.prompt, num_keyframes=params.num_keyframes, ref_image_paths=params.ref_paths) |
| self.director.update_pre_production_state(params.prompt, params.ref_paths, storyboard_list) |
| |
| if progress_callback: progress_callback(0.2, "Iniciando geração de keyframes...") |
| keyframes_detailed_data = self.painter.generate_keyframes_from_storyboard(generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback) |
| self.director.update_keyframes_state(keyframes_detailed_data) |
| |
| final_keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_detailed_data] |
| final_state = self.director.get_full_state() |
| logger.info("Maestro: Tarefa de Pré-Produção concluída.") |
| return storyboard_list, final_keyframe_paths, final_state |
|
|
| |
| def task_produce_original_movie(self, params: ProductionParams, progress_callback: ProgressCallback = None) -> Tuple[str, List[str], GenerationState]: |
| """Orquestra a geração do vídeo principal a partir dos keyframes.""" |
| logger.info("Maestro: Iniciando tarefa de Produção do Filme Original.") |
| self.director.update_parameters("producao", params) |
| |
| result_data = self.editor.generate_original_movie(full_generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback) |
| self.director.update_video_state(result_data["video_data"]) |
| |
| final_video_path = result_data["final_path"] |
| latent_paths = result_data["latent_paths"] |
| final_state = self.director.get_full_state() |
| logger.info("Maestro: Tarefa de Produção do Filme Original concluída.") |
| return final_video_path, latent_paths, final_state |
|
|
| |
|
|
| def task_run_latent_upscaler(self, latent_paths: List[str], chunk_size: int, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]: |
| """Aplica upscale 2x nos latentes e os decodifica para um novo vídeo.""" |
| if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado.") |
| if not latent_paths: raise ValueError("Nenhum caminho de latente fornecido para o upscale.") |
| |
| logger.info("--- ORQUESTRADOR: Tarefa de Upscaling de Latentes ---") |
| run_timestamp = int(time.time()) |
| temp_dir = os.path.join(self.director.workspace_dir, f"temp_upscaled_clips_{run_timestamp}") |
| os.makedirs(temp_dir, exist_ok=True) |
| |
| final_upscaled_clip_paths = [] |
| num_chunks = -(-len(latent_paths) // chunk_size) |
| |
| for i in range(num_chunks): |
| chunk_paths = latent_paths[i * chunk_size:(i + 1) * chunk_size] |
| if progress_callback: progress_callback(i / num_chunks, f"Upscalando & Decodificando Lote {i+1}/{num_chunks}") |
|
|
| tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths] |
| sub_group_latent = torch.cat(tensors_in_chunk, dim=2) |
| |
| upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent) |
| pixel_tensor = vae_manager_singleton.decode(upscaled_latent_chunk) |
|
|
| current_clip_path = os.path.join(temp_dir, f"upscaled_clip_{i:04d}.mp4") |
| self.editor.save_video_from_tensor(pixel_tensor, current_clip_path, fps=24) |
| final_upscaled_clip_paths.append(current_clip_path) |
| |
| del tensors_in_chunk, sub_group_latent, upscaled_latent_chunk, pixel_tensor |
| gc.collect(); torch.cuda.empty_cache() |
| yield {"progress": (i + 1) / num_chunks} |
|
|
| final_video_path = os.path.join(self.director.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4") |
| video_encode_tool_singleton.concatenate_videos(final_upscaled_clip_paths, final_video_path, self.director.workspace_dir) |
| |
| shutil.rmtree(temp_dir) |
| logger.info(f"Upscaling de latentes completo! Vídeo final em: {final_video_path}") |
| yield {"final_path": final_video_path} |
|
|
| def task_run_hd_mastering(self, source_video_path: str, steps: int, prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]: |
| """Aplica masterização em HD usando o pool de GPUs do SeedVR com o modelo 3B.""" |
| if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado.") |
| logger.info(f"--- ORQUESTRADOR: Tarefa de Masterização HD com SeedVR 3B ---") |
| |
| run_timestamp = int(time.time()) |
| output_path = os.path.join(self.director.workspace_dir, f"hd_mastered_movie_3B_{run_timestamp}.mp4") |
| |
| final_path = seedvr_manager_singleton.process_video( |
| input_video_path=source_video_path, |
| output_video_path=output_path, |
| prompt=prompt, |
| steps=steps |
| ) |
| logger.info(f"Masterização HD completa! Vídeo final em: {final_path}") |
| yield {"final_path": final_path} |
|
|
| def task_run_audio_generation(self, source_video_path: str, audio_prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]: |
| """Gera e adiciona áudio ao vídeo usando o pool de GPUs do MMAudio.""" |
| if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado.") |
| logger.info(f"--- ORQUESTRADOR: Tarefa de Geração de Áudio ---") |
|
|
| if progress_callback: progress_callback(0.1, "Preparando para geração de áudio...") |
| |
| run_timestamp = int(time.time()) |
| source_name = Path(source_video_path).stem |
| output_path = os.path.join(self.director.workspace_dir, f"{source_name}_with_audio_{run_timestamp}.mp4") |
|
|
| try: |
| result = subprocess.run( |
| ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", source_video_path], |
| capture_output=True, text=True, check=True |
| ) |
| duration = float(result.stdout.strip()) |
| except Exception as e: |
| logger.error(f"Não foi possível obter a duração do vídeo '{source_video_path}': {e}", exc_info=True) |
| yield {"error": "Falha ao obter duração do vídeo."} |
| return |
|
|
| if progress_callback: progress_callback(0.5, "Gerando trilha de áudio...") |
| |
| final_path = mmaudio_manager_singleton.generate_audio_for_video( |
| video_path=source_video_path, |
| prompt=audio_prompt, |
| duration_seconds=duration, |
| output_path_override=output_path |
| ) |
| |
| logger.info(f"Geração de áudio completa! Vídeo com áudio em: {final_path}") |
| if progress_callback: progress_callback(1.0, "Geração de áudio completa!") |
| yield {"final_path": final_path} |