"""
Display Module - Output System

This module handles the final output of rendered frames, supporting multiple
output methods including WebSocket to browser, GUI windows, image files and
an ASCII console view.
"""

import asyncio
import json
import base64
import os
import time
import numpy as np
from typing import Optional, Dict, Any, Callable
from io import BytesIO
import threading

try:
    import websockets
    WEBSOCKETS_AVAILABLE = True
except ImportError:
    WEBSOCKETS_AVAILABLE = False
    print("Warning: websockets not available. WebSocket display will not work.")

try:
    import tkinter as tk
    from tkinter import Canvas
    from PIL import Image, ImageTk
    TKINTER_AVAILABLE = True
except ImportError:
    TKINTER_AVAILABLE = False
    print("Warning: tkinter or PIL not available. GUI display will not work.")

try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    print("Warning: PIL not available. Image saving will not work.")


def _frame_to_image(frame_data: np.ndarray, drop_alpha: bool = False):
    """Convert a frame array into a PIL image.

    Callers must check ``PIL_AVAILABLE`` / ``TKINTER_AVAILABLE`` first; this
    helper uses ``Image`` unconditionally.

    Args:
        frame_data: 2-D array (treated as grayscale) or 3-D array whose last
            axis holds the channels. Values are cast to ``uint8``.
        drop_alpha: When False, 3 channels become RGB, 4 channels become RGBA
            and any other channel count replicates channel 0 into RGB. When
            True, any array with 3 or more channels becomes RGB from its
            first three channels.

    Returns:
        The resulting ``PIL.Image.Image``.
    """
    pixels = np.asarray(frame_data)
    if pixels.ndim == 3:
        channels = pixels.shape[2]
        if channels == 4 and not drop_alpha:
            pass  # keep all four channels -> RGBA
        elif channels == 3 or (drop_alpha and channels > 3):
            pixels = pixels[:, :, :3]
        else:
            # Replicate the first channel so the result is a gray RGB image.
            pixels = np.stack([pixels[:, :, 0]] * 3, axis=-1)
    # No explicit mode: for uint8 input Pillow infers L / RGB / RGBA from the
    # array shape, which matches the modes chosen above.
    return Image.fromarray(pixels.astype(np.uint8))


class DisplayMode:
    """Enumeration of display modes (plain string constants)."""
    WEBSOCKET = "websocket"
    GUI = "gui"
    FILE = "file"
    CONSOLE = "console"


class WebSocketDisplay:
    """WebSocket-based display that sends frames to a web browser."""

    def __init__(self, host: str = "localhost", port: int = 8765):
        self.host = host
        self.port = port
        self.server = None
        self.clients = set()
        self.is_running = False

    async def _handle_client(self, websocket, path=None):
        """Track a client connection until it closes.

        ``path`` is optional so the handler works whether the installed
        websockets version passes ``(websocket, path)`` or only the
        connection object.
        """
        self.clients.add(websocket)
        print(f"Client connected: {websocket.remote_address}")
        try:
            await websocket.wait_closed()
        finally:
            # discard() so an already-removed client cannot raise KeyError.
            self.clients.discard(websocket)
            print(f"Client disconnected: {websocket.remote_address}")

    async def start_server(self):
        """Start the WebSocket server.

        Does nothing if the server is already running.

        Raises:
            RuntimeError: If the ``websockets`` package is not installed.
        """
        if not WEBSOCKETS_AVAILABLE:
            raise RuntimeError("WebSocket support not available")
        if self.is_running:
            return

        self.server = await websockets.serve(self._handle_client, self.host, self.port)
        self.is_running = True
        print(f"WebSocket server started on ws://{self.host}:{self.port}")

    async def stop_server(self):
        """Stop the WebSocket server if one was started."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            self.server = None
            self.is_running = False
            print("WebSocket server stopped")

    async def send_frame(self, frame_data: np.ndarray, frame_id: int = 0):
        """Send a frame to all connected clients.

        The frame is PNG-encoded, base64-wrapped into a data URL and sent as
        a JSON message. Returns silently when there are no clients or PIL is
        unavailable; encoding/sending errors are printed, not raised.

        Args:
            frame_data: Frame pixels (see ``_frame_to_image`` for shapes).
            frame_id: Identifier included in the message.
        """
        if not self.clients or not PIL_AVAILABLE:
            return

        try:
            image = _frame_to_image(frame_data)

            # Convert to base64
            buffer = BytesIO()
            image.save(buffer, format='PNG')
            img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

            # Serialise once; the same payload goes to every client.
            payload = json.dumps({
                "type": "frame",
                "frame_id": frame_id,
                "width": image.width,
                "height": image.height,
                "data": f"data:image/png;base64,{img_base64}",
                "timestamp": time.time()
            })

            # Snapshot the set: clients may disconnect while we await.
            recipients = list(self.clients)
            if recipients:
                await asyncio.gather(
                    *(client.send(payload) for client in recipients),
                    return_exceptions=True
                )

        except Exception as e:
            print(f"Error sending frame via WebSocket: {e}")


class GUIDisplay:
    """Tkinter-based GUI display window."""

    def __init__(self, title: str = "vGPU Display", width: int = 800, height: int = 600):
        """Store window settings.

        Raises:
            RuntimeError: If tkinter or PIL could not be imported.
        """
        if not TKINTER_AVAILABLE:
            raise RuntimeError("GUI display not available (tkinter/PIL missing)")

        self.title = title
        self.width = width
        self.height = height
        self.window = None
        self.canvas = None
        self.is_running = False
        self.update_callback = None
        self.gui_thread = None

    def start(self):
        """Start the GUI display in a separate daemon thread."""
        # is_running is only set once the thread has built the window, so
        # also check the thread itself to avoid starting two windows.
        if self.is_running or (self.gui_thread is not None and self.gui_thread.is_alive()):
            return

        def run_gui():
            self.window = tk.Tk()
            self.window.title(self.title)
            self.window.geometry(f"{self.width}x{self.height}")

            self.canvas = Canvas(self.window, width=self.width, height=self.height, bg='black')
            self.canvas.pack()

            self.is_running = True

            # Set up periodic update
            def update():
                if self.update_callback:
                    self.update_callback()
                if self.is_running:
                    self.window.after(16, update)  # ~60 FPS

            update()

            self.window.protocol("WM_DELETE_WINDOW", self.stop)
            self.window.mainloop()

        self.gui_thread = threading.Thread(target=run_gui, daemon=True)
        self.gui_thread.start()

    def stop(self):
        """Stop the GUI display."""
        self.is_running = False
        if self.window:
            self.window.quit()

    def show_frame(self, frame_data: np.ndarray):
        """Display a frame in the GUI window, scaled to the canvas size.

        Does nothing until the window is running. Errors are printed, not
        raised.

        NOTE(review): the window is created in the thread started by
        ``start()``, while this method runs in the caller's thread; confirm
        that cross-thread Tk access is acceptable for the target platforms.
        """
        if not self.is_running or not self.canvas:
            return

        try:
            # Alpha is dropped: the canvas image is always RGB or grayscale.
            image = _frame_to_image(frame_data, drop_alpha=True)

            # Resize to fit canvas
            image = image.resize((self.width, self.height), Image.Resampling.LANCZOS)

            # Convert to PhotoImage
            photo = ImageTk.PhotoImage(image)

            # Update canvas
            self.canvas.delete("all")
            self.canvas.create_image(self.width // 2, self.height // 2, image=photo)

            # Keep a reference to prevent garbage collection
            self.canvas.image = photo

        except Exception as e:
            print(f"Error displaying frame in GUI: {e}")

    def set_update_callback(self, callback: Callable):
        """Set a callback function to be called periodically by the GUI loop."""
        self.update_callback = callback


class FileDisplay:
    """File-based display that saves frames as image files."""

    def __init__(self, output_dir: str = "./frames", format: str = "png"):
        """Remember the output settings and create ``output_dir`` if needed."""
        self.output_dir = output_dir
        self.format = format.lower()
        self.frame_counter = 0

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

    def save_frame(self, frame_data: np.ndarray, filename: Optional[str] = None):
        """Save a frame to a file.

        Args:
            frame_data: Frame pixels (see ``_frame_to_image`` for shapes).
            filename: Target file name inside ``output_dir``. When omitted, a
                numbered ``frame_NNNNNN.<format>`` name is generated and the
                internal counter advances.

        Returns:
            True if the file was written, False if PIL is missing or saving
            failed (the error is printed).
        """
        if not PIL_AVAILABLE:
            print("Error: PIL not available for saving images")
            return False

        try:
            if filename is None:
                filename = f"frame_{self.frame_counter:06d}.{self.format}"
                self.frame_counter += 1

            filepath = os.path.join(self.output_dir, filename)
            image = _frame_to_image(frame_data)

            # JPEG has no alpha channel; flatten RGBA so 4-channel frames
            # can still be written to .jpg/.jpeg targets.
            extension = os.path.splitext(filepath)[1].lower()
            if image.mode == "RGBA" and extension in (".jpg", ".jpeg"):
                image = image.convert("RGB")

            # Save image
            image.save(filepath)
            print(f"Frame saved: {filepath}")
            return True

        except Exception as e:
            print(f"Error saving frame: {e}")
            return False


class ConsoleDisplay:
    """Console-based display that shows ASCII art representation."""

    def __init__(self, width: int = 80, height: int = 24):
        self.width = width
        self.height = height
        # Characters ordered from darkest to brightest.
        self.ascii_chars = " .:-=+*#%@"

    def frame_to_ascii(self, frame_data: np.ndarray):
        """Convert a frame into a list of ASCII-art rows.

        Args:
            frame_data: 2-D grayscale array, or 3-D array with channels on
                the last axis. Three or more channels are combined with
                0.299/0.587/0.114 weights on the first three; fewer channels
                use channel 0. Values are interpreted on a 0-255 scale.

        Returns:
            ``height`` strings of ``width`` characters each (sizes follow
            ``scipy.ndimage.zoom``'s rounding of the zoom factors).

        Raises:
            ValueError: If the frame is empty or not 2-D/3-D.
        """
        pixels = np.asarray(frame_data, dtype=np.float64)
        if pixels.ndim == 3:
            if pixels.shape[2] >= 3:
                # Convert RGB to grayscale
                gray = np.dot(pixels[..., :3], [0.299, 0.587, 0.114])
            else:
                gray = pixels[..., 0]
        else:
            gray = pixels

        if gray.ndim != 2 or gray.size == 0:
            raise ValueError(f"Cannot render frame with shape {pixels.shape}")

        # Resize to console dimensions
        from scipy import ndimage
        resized = ndimage.zoom(
            gray, (self.height / gray.shape[0], self.width / gray.shape[1])
        )

        # Spline interpolation can overshoot the input range; clamp so the
        # character index below always stays inside the palette.
        resized = np.clip(resized, 0.0, 255.0)

        last_index = len(self.ascii_chars) - 1
        indices = (resized / 255.0 * last_index).astype(int)
        palette = np.array(list(self.ascii_chars))
        return ["".join(row) for row in palette[indices]]

    def show_frame(self, frame_data: np.ndarray):
        """Display frame as ASCII art in console.

        Errors are printed, not raised.
        """
        try:
            rows = self.frame_to_ascii(frame_data)

            # Clear screen and display
            print("\033[2J\033[H")  # Clear screen and move cursor to top
            print("\n".join(rows))

        except Exception as e:
            print(f"Error displaying ASCII frame: {e}")


class DisplayManager:
    """Manages multiple display outputs and coordinates frame updates."""

    def __init__(self, vram=None):
        """Create a manager reading framebuffers from ``vram``.

        ``vram`` is expected to provide ``get_framebuffer(id)`` returning an
        object with a ``pixel_buffer`` attribute (that is all this class
        uses).
        """
        self.vram = vram
        self.displays = {}
        self.active_framebuffer = None
        self.frame_counter = 0
        self.fps_target = 60
        self.last_frame_time = 0

        # Statistics
        self.frames_displayed = 0
        self.total_display_time = 0.0

        # Strong references to in-flight shutdown tasks so they are not
        # garbage-collected before they finish.
        self._pending_tasks = set()

    def add_display(self, name: str, display_type: str, **kwargs):
        """Add a display output.

        Args:
            name: Key under which the display is registered.
            display_type: One of the ``DisplayMode`` constants.
            **kwargs: Forwarded to the display class constructor.

        Returns:
            The newly created display object.

        Raises:
            ValueError: If ``display_type`` is not a known mode.
        """
        display_classes = {
            DisplayMode.WEBSOCKET: WebSocketDisplay,
            DisplayMode.GUI: GUIDisplay,
            DisplayMode.FILE: FileDisplay,
            DisplayMode.CONSOLE: ConsoleDisplay,
        }
        display_class = display_classes.get(display_type)
        if display_class is None:
            raise ValueError(f"Unknown display type: {display_type}")

        display = display_class(**kwargs)
        self.displays[name] = {
            "display": display,
            "type": display_type,
            "enabled": True
        }

        return display

    def remove_display(self, name: str):
        """Remove a display output, shutting it down first.

        Unknown names are ignored. A WebSocket server is stopped through a
        task on the running event loop; when no loop is running the display
        is still unregistered instead of raising.
        """
        display_info = self.displays.get(name)
        if display_info is None:
            return

        display = display_info["display"]
        if display_info["type"] == DisplayMode.WEBSOCKET:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                task = loop.create_task(display.stop_server())
                self._pending_tasks.add(task)
                task.add_done_callback(self._pending_tasks.discard)
            elif display.server is not None:
                # Without a running loop the close cannot be awaited here.
                print(f"Warning: no running event loop; WebSocket server for '{name}' was not stopped")
        elif display_info["type"] == DisplayMode.GUI:
            display.stop()

        del self.displays[name]

    def set_active_framebuffer(self, framebuffer_id: str):
        """Set the active framebuffer to display."""
        self.active_framebuffer = framebuffer_id

    async def update_displays(self):
        """Update all enabled displays with the current framebuffer.

        Returns without doing anything when there is no VRAM, no active
        framebuffer, or the framebuffer lookup yields nothing. Per-display
        errors are printed and do not stop the remaining displays.
        """
        if not self.vram or not self.active_framebuffer:
            return

        start_time = time.time()

        # Get framebuffer data
        framebuffer = self.vram.get_framebuffer(self.active_framebuffer)
        if not framebuffer:
            return

        frame_data = framebuffer.pixel_buffer

        # Iterate over a snapshot: displays may be added/removed while a
        # WebSocket send is awaited.
        for name, display_info in list(self.displays.items()):
            if not display_info["enabled"]:
                continue

            display = display_info["display"]
            display_type = display_info["type"]

            try:
                if display_type == DisplayMode.WEBSOCKET:
                    await display.send_frame(frame_data, self.frame_counter)
                elif display_type == DisplayMode.GUI:
                    display.show_frame(frame_data)
                elif display_type == DisplayMode.FILE:
                    display.save_frame(frame_data)
                elif display_type == DisplayMode.CONSOLE:
                    display.show_frame(frame_data)
            except Exception as e:
                print(f"Error updating display {name}: {e}")

        # Update statistics
        self.frame_counter += 1
        self.frames_displayed += 1
        self.total_display_time += time.time() - start_time
        self.last_frame_time = time.time()

    def enable_display(self, name: str, enabled: bool = True):
        """Enable or disable a specific display. Unknown names are ignored."""
        if name in self.displays:
            self.displays[name]["enabled"] = enabled

    def get_stats(self) -> Dict[str, Any]:
        """Get display manager statistics.

        ``current_fps`` is the reciprocal of the time elapsed since the last
        completed update (0 before the first update).
        """
        avg_display_time = self.total_display_time / max(1, self.frames_displayed)
        if self.last_frame_time > 0:
            current_fps = 1.0 / max(0.001, time.time() - self.last_frame_time)
        else:
            current_fps = 0

        return {
            "frames_displayed": self.frames_displayed,
            "total_display_time": self.total_display_time,
            "avg_display_time": avg_display_time,
            "current_fps": current_fps,
            "target_fps": self.fps_target,
            "active_displays": sum(1 for d in self.displays.values() if d["enabled"]),
            "total_displays": len(self.displays),
            "active_framebuffer": self.active_framebuffer
        }


if __name__ == "__main__":
    # Test the display system
    async def test_display():
        from vram import VRAM
        from render import Renderer

        print("Testing Display System...")

        # Create VRAM and renderer
        vram = VRAM(memory_size_gb=1)
        renderer = Renderer(vram)

        # Create display manager
        display_manager = DisplayManager(vram)

        # Create a test framebuffer
        fb_id = vram.create_framebuffer(400, 300, 3)
        display_manager.set_active_framebuffer(fb_id)

        ws_display = None
        gui_display = None
        try:
            # Add displays
            if WEBSOCKETS_AVAILABLE:
                ws_display = display_manager.add_display("websocket", DisplayMode.WEBSOCKET)
                await ws_display.start_server()

            if TKINTER_AVAILABLE:
                gui_display = display_manager.add_display("gui", DisplayMode.GUI, width=400, height=300)
                gui_display.start()

            display_manager.add_display("file", DisplayMode.FILE, output_dir="./test_frames")
            display_manager.add_display("console", DisplayMode.CONSOLE, width=40, height=20)

            # Render some test content
            renderer.clear(fb_id, (64, 128, 255))
            renderer.draw_rect(fb_id, 50, 50, 100, 80, (255, 0, 0))
            renderer.draw_circle(fb_id, 200, 150, 40, (0, 255, 0), filled=True)

            # Update displays
            await display_manager.update_displays()

            # Animate for a few seconds
            for i in range(60):  # 1 second at 60 FPS
                # Clear and draw animated content
                renderer.clear(fb_id, (32, 64, 128))

                # Moving rectangle
                x = 50 + int(50 * np.sin(i * 0.1))
                renderer.draw_rect(fb_id, x, 50, 50, 50, (255, 255, 0))

                # Rotating line effect
                center_x, center_y = 200, 150
                for j in range(8):
                    angle = (i + j * 8) * 0.1
                    end_x = center_x + int(40 * np.cos(angle))
                    end_y = center_y + int(40 * np.sin(angle))
                    renderer.draw_line(fb_id, center_x, center_y, end_x, end_y, (0, 255, 255))

                # Update displays
                await display_manager.update_displays()
                await asyncio.sleep(1 / 60)  # 60 FPS

            # Print statistics
            stats = display_manager.get_stats()
            print(f"Display Manager stats: {stats}")
        finally:
            # Cleanup runs even if rendering or a display update failed.
            if ws_display is not None:
                await ws_display.stop_server()
            if gui_display is not None:
                gui_display.stop()

        print("Display system test completed!")

    # Run the test
    asyncio.run(test_display())