#!/usr/bin/env python3
"""
================================================================================
NEXUS-VideoModel v2.0 - Gradio Demo for HuggingFace Spaces
Interactive demo for video generation and continuation
Author: Mike Amega - Ame Web Studio
================================================================================
"""
import os
import tempfile

# Bug fix: numpy is used throughout this file (video_to_gif,
# frames_to_video_tensor) but was never imported -> NameError at runtime.
import numpy as np
import torch

# --- DEBUG: VERSION CHECK --- #
try:
    import huggingface_hub

    print(f"π HuggingFace Hub Version: {huggingface_hub.__version__}")
except ImportError as e:
    print(f"β οΈ Could not import huggingface_hub: {e}")
# gradio is imported only inside this guard (the previous eager top-level
# import made the except branch below unreachable dead code).
try:
    import gradio as gr

    print(f"π Gradio Version: {gr.__version__}")
except ImportError as e:
    print(f"β Failed to import gradio: {e}")
    # Best-effort diagnostics via importlib before giving up.
    try:
        from importlib.metadata import version

        print(f"π Installed Gradio Version (importlib): {version('gradio')}")
        print(
            f"π Installed HuggingFace Hub Version (importlib): {version('huggingface_hub')}"
        )
    except Exception as ie:
        print(f"β οΈ Could not retrieve versions via importlib: {ie}")
    # NOTE(review): indentation was reconstructed from a mangled paste;
    # `raise e` is placed so gradio's ImportError always propagates after
    # the diagnostics above (the app cannot run without gradio).
    raise e
# ----------------------------
from PIL import Image

# Try to import the model package. The demo degrades gracefully: on failure
# MODEL_AVAILABLE stays False and the __main__ block refuses to launch.
try:
    from nexus_nutata_videomodel import (
        NutataVideoModel,
        NutataVideoModelConfig,
        VideoConfig,
    )

    MODEL_AVAILABLE = True
except ImportError:
    MODEL_AVAILABLE = False
    print(
        "β οΈ Could not import model. Make sure nexus_nutata_videomodel.py is in the same directory."
    )
# ==============================================================================
# GLOBAL MODEL LOADING
# ==============================================================================
def _pick_device() -> str:
    """Return the best available torch device string (cuda > mps > cpu)."""
    if torch.cuda.is_available():
        # Let cuDNN autotune convolution algorithms on GPU.
        torch.backends.cudnn.benchmark = True
        return "cuda"
    if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        return "mps"
    return "cpu"


DEVICE = _pick_device()
print(f"π§ Using device: {DEVICE}")

# Lazily populated model cache (filled by load_model()).
MODEL = None
CONFIG = None
def load_model(repo_id: str = "amewebstudio/nexus-videomodel-v1.1"):
    """Load the model from the HuggingFace Hub, or build a fresh demo model.

    Args:
        repo_id: Hub repository to load pretrained weights from.

    Returns:
        (model_or_None, status_message). Callers check for a None model;
        previously that could never happen -- when the model package failed
        to import, both paths below raised NameError on NutataVideoModel
        instead of returning the (None, message) shape callers expect.
    """
    global MODEL, CONFIG
    if MODEL is not None:
        return MODEL, "β Model already loaded"
    if not MODEL_AVAILABLE:
        # Without the model package neither the Hub path nor the fallback
        # constructor can work -- fail softly so the UI can show the error.
        return None, "β Model code not available (import failed)"
    try:
        # Preferred path: pretrained weights from the Hub.
        print(f"π¦ Loading model from {repo_id}...")
        MODEL = NutataVideoModel.from_pretrained(repo_id, device=DEVICE)
        CONFIG = MODEL.config
        return MODEL, f"β Loaded from {repo_id}"
    except Exception as e:
        # Deliberate best-effort fallback: an untrained, minimal model so
        # the demo UI still functions offline.
        print(f"β οΈ Could not load from Hub: {e}")
        print("π¦ Creating new model with default config...")
        CONFIG = NutataVideoModelConfig(
            d_model=256,
            n_layers=2,
            use_sparse_compression=True,
            sparse_compression_ratio=0.5,
            video=VideoConfig(
                height=128,
                width=128,
                n_frames=8,
            ),
        )
        MODEL = NutataVideoModel(CONFIG).to(DEVICE)
        MODEL.eval()
        return MODEL, "β Created new model (not pretrained)"
def video_to_gif(video_tensor: torch.Tensor, fps: int = 8) -> str:
    """Render a video tensor as an animated GIF and return the file path.

    Args:
        video_tensor: float tensor, assumed in [0, 1], shaped [B, C, T, H, W]
            or [C, T, H, W]; only the first batch element is used.
        fps: playback speed of the resulting GIF.

    Returns:
        Path of a temporary .gif file (kept on disk so Gradio can serve it).
    """
    if video_tensor.dim() == 5:
        video_tensor = video_tensor[0]  # take first batch element
    # [C, T, H, W] -> [T, H, W, C] for PIL; detach() so tensors that still
    # track gradients don't make .numpy() raise.
    video = video_tensor.permute(1, 2, 3, 0).detach().cpu().numpy()
    # Bug fix: clamp before the uint8 cast -- out-of-range model outputs
    # previously wrapped around and produced garbage pixels.
    video = (np.clip(video, 0.0, 1.0) * 255).astype(np.uint8)
    frames = [Image.fromarray(frame) for frame in video]
    # delete=False: the file must outlive this function so it can be served.
    temp_file = tempfile.NamedTemporaryFile(suffix=".gif", delete=False)
    temp_file.close()  # close our handle so PIL can write it on all platforms
    frames[0].save(
        temp_file.name,
        save_all=True,
        append_images=frames[1:],
        # Guard fps=0 (previously ZeroDivisionError); GIF duration is in ms.
        duration=1000 // max(fps, 1),
        loop=0,
    )
    return temp_file.name
def frames_to_video_tensor(
    frames: list, target_size: tuple = (128, 128)
) -> torch.Tensor:
    """Convert a list of frames to a float video tensor in [0, 1].

    Args:
        frames: list of PIL Images or HxWxC uint8 numpy arrays.
        target_size: size passed straight to PIL's resize().
            NOTE(review): PIL resize() takes (width, height), while the
            caller in continue_video passes (CONFIG.video.height,
            CONFIG.video.width) -- axes swap for non-square configs;
            confirm intended ordering.

    Returns:
        Tensor shaped [1, C, T, H, W].

    Raises:
        ValueError: if ``frames`` is empty (previously a cryptic np.stack
            error).
    """
    if not frames:
        raise ValueError("frames_to_video_tensor() received an empty frame list")
    processed = []
    for frame in frames:
        if isinstance(frame, np.ndarray):
            frame = Image.fromarray(frame)
        frame = frame.convert("RGB").resize(target_size, Image.LANCZOS)
        processed.append(np.array(frame).astype(np.float32) / 255.0)
    # [T, H, W, C] -> [C, T, H, W]
    video = np.transpose(np.stack(processed, axis=0), (3, 0, 1, 2))
    return torch.tensor(video).unsqueeze(0)  # add batch dim
# ==============================================================================
# GRADIO INTERFACE FUNCTIONS
# ==============================================================================
def generate_video(
    n_frames: int = 16, temperature: float = 0.8, seed: int = -1, progress=gr.Progress()
):
    """Generate a new video from scratch and return (gif_path, info_markdown)."""
    global MODEL
    # Lazy-load the model on first use.
    if MODEL is None:
        loaded, _status = load_model()
        if loaded is None:
            return None, "β Model not loaded"
    progress(0.1, desc="Initializing...")
    # A negative seed means "leave the RNG state alone".
    if seed >= 0:
        torch.manual_seed(seed)
        if DEVICE == "cuda":
            torch.cuda.manual_seed(seed)
    progress(0.3, desc="Generating latent...")
    try:
        MODEL.eval()
        with torch.no_grad():
            video = MODEL.generate(
                n_frames=n_frames, temperature=temperature, batch_size=1
            )
        progress(0.8, desc="Converting to GIF...")
        gif_path = video_to_gif(video, fps=CONFIG.video.fps)
        progress(1.0, desc="Done!")
        seed_label = seed if seed >= 0 else "random"
        info = f"""
β **Generated successfully!**
- Frames: {n_frames}
- Resolution: {video.shape[3]}Γ{video.shape[4]}
- Temperature: {temperature}
- Seed: {seed_label}
"""
        return gif_path, info
    except Exception as e:
        return None, f"β Error: {str(e)}"
def continue_video(
    input_video,
    n_continue_frames: int = 8,
    temperature: float = 0.7,
    progress=gr.Progress(),
):
    """Extend an uploaded video clip; returns (gif_path, info_markdown)."""
    global MODEL, CONFIG
    # Lazy-load the model on first use.
    if MODEL is None:
        loaded, _status = load_model()
        if loaded is None:
            return None, "β Model not loaded"
    if input_video is None:
        return None, "β Please upload a video first"
    progress(0.1, desc="Processing input video...")
    try:
        import cv2  # local import: OpenCV is only needed on this code path

        capture = cv2.VideoCapture(input_video)
        frames = []
        ok, raw = capture.read()
        while ok:
            # OpenCV decodes to BGR; the model pipeline expects RGB.
            frames.append(cv2.cvtColor(raw, cv2.COLOR_BGR2RGB))
            ok, raw = capture.read()
        capture.release()
        if not frames:
            return None, "β Could not read video frames"
        context = frames[:16]  # condition on at most the first 16 frames
        video_tensor = frames_to_video_tensor(
            context,
            target_size=(CONFIG.video.height, CONFIG.video.width),
        ).to(DEVICE)
        progress(0.4, desc="Encoding video...")
        MODEL.eval()
        with torch.no_grad():
            continued = MODEL.continue_video(
                video_tensor, n_frames=n_continue_frames, temperature=temperature
            )
        progress(0.8, desc="Converting to GIF...")
        gif_path = video_to_gif(continued, fps=CONFIG.video.fps)
        progress(1.0, desc="Done!")
        info = f"""
β **Continuation successful!**
- Input frames: {len(context)}
- Added frames: {n_continue_frames}
- Total frames: {continued.shape[2]}
"""
        return gif_path, info
    except Exception as e:
        import traceback

        traceback.print_exc()
        return None, f"β Error: {str(e)}"
def get_model_info():
    """Return a Markdown summary of the loaded model.

    Reads the global MODEL/CONFIG cache; returns a warning string when the
    model has not been loaded yet, and an error string on any failure.
    """
    global MODEL, CONFIG
    if MODEL is None:
        return "β οΈ Model not loaded yet. Generate a video to load the model."
    try:
        diag = MODEL.diagnostics()

        def _fmt_count(key: str) -> str:
            # Bug fix: the old f"{diag.get(key, 'N/A'):,}" applied the
            # thousands-separator format to the "N/A" string fallback,
            # raising ValueError and hiding the whole report behind the
            # generic error message below.
            value = diag.get(key, "N/A")
            try:
                return f"{value:,}"
            except (TypeError, ValueError):
                return str(value)

        info = f"""
## π§ NEXUS-VideoModel v2.0
**Architecture:** {CONFIG.codename if hasattr(CONFIG, "codename") else "Nutata-Cognitive"}
### Parameters
- **Total:** {_fmt_count("total_params")}
- **Trainable:** {_fmt_count("trainable_params")}
### Video Configuration
- **Resolution:** {CONFIG.video.height}Γ{CONFIG.video.width}
- **Frames:** {CONFIG.video.n_frames}
- **FPS:** {CONFIG.video.fps}
### Cognitive Features
- **LPOL Memory:** {"β " if CONFIG.use_lpol else "β"} ({len(CONFIG.domain_types)} domains)
- **GQA Attention:** {"β " if CONFIG.use_gqa else "β"}
- **Sparse Compression:** {"β " if getattr(CONFIG, "use_sparse_compression", False) else "β"}
- **Flow Prediction:** {"β " if CONFIG.flow_prediction else "β"}
### Device
- **Running on:** {DEVICE}
"""
        return info
    except Exception as e:
        return f"β Error getting model info: {str(e)}"
# ==============================================================================
# GRADIO APP
# ==============================================================================
def create_demo():
    """Create the Gradio demo interface"""
    # Layout: one themed Blocks container with three tabs
    # (generate, continue, model info) and a footer.
    with gr.Blocks(
        title="NEXUS-VideoModel v2.0",
        theme=gr.themes.Soft(
            primary_hue="violet",
            secondary_hue="purple",
        ),
        css="""
.container { max-width: 1200px; margin: auto; }
.title { text-align: center; margin-bottom: 1rem; }
.subtitle { text-align: center; color: #666; margin-bottom: 2rem; }
""",
    ) as demo:
        # Page header / feature blurb.
        gr.Markdown(
            """
# π¬ NEXUS-VideoModel v2.0
### Cognitive Video World Model by Ame Web Studio
Generate and continue videos using the NEXUS cognitive architecture with:
- **3D VAE** for spatiotemporal compression
- **LPOL Memory** for domain-specific knowledge
- **SparseCompressor** for efficient processing
- **Neurogenesis** for adaptive learning
""",
            elem_classes="title",
        )
        with gr.Tabs():
            # Tab 1: Generate Video (unconditional generation).
            with gr.TabItem("π¨ Generate Video"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### Generation Settings")
                        gen_frames = gr.Slider(
                            minimum=4,
                            maximum=32,
                            value=16,
                            step=4,
                            label="Number of Frames",
                        )
                        gen_temp = gr.Slider(
                            minimum=0.1,
                            maximum=2.0,
                            value=0.8,
                            step=0.1,
                            label="Temperature (creativity)",
                        )
                        gen_seed = gr.Number(
                            value=-1, precision=0, label="Seed (-1 for random)"
                        )
                        gen_btn = gr.Button("π¬ Generate Video", variant="primary")
                    with gr.Column(scale=2):
                        # The GIF path returned by generate_video is shown
                        # through an Image component (type="filepath").
                        gen_output = gr.Image(label="Generated Video", type="filepath")
                        gen_info = gr.Markdown("Click 'Generate Video' to start")
                gen_btn.click(
                    fn=generate_video,
                    inputs=[gen_frames, gen_temp, gen_seed],
                    outputs=[gen_output, gen_info],
                )
            # Tab 2: Continue Video (conditioned on an uploaded clip).
            with gr.TabItem("β‘οΈ Continue Video"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### Continuation Settings")
                        input_video = gr.Video(label="Upload Video")
                        cont_frames = gr.Slider(
                            minimum=4,
                            maximum=24,
                            value=8,
                            step=4,
                            label="Frames to Add",
                        )
                        cont_temp = gr.Slider(
                            minimum=0.1,
                            maximum=1.5,
                            value=0.7,
                            step=0.1,
                            label="Temperature",
                        )
                        cont_btn = gr.Button("β‘οΈ Continue Video", variant="primary")
                    with gr.Column(scale=2):
                        cont_output = gr.Image(label="Continued Video", type="filepath")
                        cont_info = gr.Markdown(
                            "Upload a video and click 'Continue Video'"
                        )
                cont_btn.click(
                    fn=continue_video,
                    inputs=[input_video, cont_frames, cont_temp],
                    outputs=[cont_output, cont_info],
                )
            # Tab 3: Model Info (diagnostics refreshed on demand).
            with gr.TabItem("βΉοΈ Model Info"):
                model_info = gr.Markdown("Click button to load model info")
                info_btn = gr.Button("π Refresh Model Info")
                info_btn.click(fn=get_model_info, outputs=[model_info])
        # Footer with author / model links.
        gr.Markdown(
            """
---
**Author:** Mike Amega - [Ame Web Studio](https://github.com/amewebstudio)
**Model:** [amewebstudio/nexus-videomodel-v1.1](https://huggingface.co/amewebstudio/nexus-videomodel-v1.1)
"""
        )
    return demo
# ==============================================================================
# MAIN
# ==============================================================================
if __name__ == "__main__":
    # Log environment versions once more at startup (mirrors the
    # import-time check at the top of the file).
    try:
        import gradio
        import huggingface_hub

        print(f"π Gradio Version: {gradio.__version__}")
        print(f"π HuggingFace Hub Version: {huggingface_hub.__version__}")
    except Exception as e:
        print(f"β οΈ Could not check versions: {e}")
    if MODEL_AVAILABLE:
        # Warm the model cache before serving requests.
        print("π¦ Pre-loading model...")
        load_model()
        demo = create_demo()
        # 0.0.0.0 so the Space's reverse proxy can reach the server.
        demo.launch(share=False, server_name="0.0.0.0", server_port=7860)
    else:
        print("β Model not available. Please check imports.")