""" GPU Display Manager - Handles rendering and streaming of GPU output to the web interface """ import numpy as np import base64 import io from PIL import Image import asyncio from typing import Optional, Tuple import threading import queue class GPUDisplayManager: def __init__(self, resolution: Tuple[int, int] = (800, 600), fps: int = 30): self.resolution = resolution self.fps = fps self.frame_interval = 1.0 / fps self.frame_queue = queue.Queue(maxsize=10) self.running = False self.current_frame = None def start_capture(self): """Start the GPU frame capture thread""" self.running = True self.capture_thread = threading.Thread(target=self._capture_loop) self.capture_thread.daemon = True self.capture_thread.start() def stop_capture(self): """Stop the GPU frame capture""" self.running = False if hasattr(self, 'capture_thread'): self.capture_thread.join() def _capture_loop(self): """Main capture loop that reads from GPU framebuffer""" while self.running: try: frame = self._capture_gpu_frame() if frame is not None: # Convert frame to base64 for streaming encoded_frame = self._encode_frame(frame) # Put in queue, drop frames if queue is full try: self.frame_queue.put_nowait(encoded_frame) except queue.Full: # Drop oldest frame try: self.frame_queue.get_nowait() self.frame_queue.put_nowait(encoded_frame) except queue.Empty: pass except Exception as e: print(f"Error capturing frame: {e}") # Maintain target FPS asyncio.sleep(self.frame_interval) def _capture_gpu_frame(self) -> Optional[np.ndarray]: """Capture current frame from GPU framebuffer""" try: # TODO: Implement actual GPU framebuffer capture here # For now, generate a test pattern frame = np.zeros((self.resolution[1], self.resolution[0], 3), dtype=np.uint8) # Add some test pattern frame[::20, :] = [255, 0, 0] # Red lines frame[:, ::20] = [0, 255, 0] # Green lines return frame except Exception as e: print(f"Error reading GPU framebuffer: {e}") return None def _encode_frame(self, frame: np.ndarray) -> str: """Convert frame to base64 JPEG for streaming""" img = Image.fromarray(frame) buffer = io.BytesIO() img.save(buffer, format='JPEG', quality=80) img_str = base64.b64encode(buffer.getvalue()).decode('utf-8') return f"data:image/jpeg;base64,{img_str}" async def get_next_frame(self) -> Optional[str]: """Get the next available frame""" try: return self.frame_queue.get_nowait() except queue.Empty: return None def set_resolution(self, width: int, height: int): """Update the display resolution""" self.resolution = (width, height) def set_fps(self, fps: int): """Update the target FPS""" self.fps = fps self.frame_interval = 1.0 / fps