File size: 3,675 Bytes
7a0c684 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
"""
GPU Display Manager - Handles rendering and streaming of GPU output to the web interface
"""
import numpy as np
import base64
import io
from PIL import Image
import asyncio
from typing import Optional, Tuple
import threading
import queue
class GPUDisplayManager:
    """Capture frames on a background thread and queue them for streaming.

    Frames come from ``_capture_gpu_frame`` (currently a synthetic test
    pattern), are encoded as base64 JPEG data URLs and placed on a bounded
    queue that ``get_next_frame`` drains. When the queue is full the oldest
    queued frame is discarded to make room for the newest one.
    """

    def __init__(self, resolution: Tuple[int, int] = (800, 600), fps: int = 30):
        """Initialise the manager without starting capture.

        Args:
            resolution: ``(width, height)`` of generated frames, in pixels.
            fps: Target capture rate in frames per second; must be > 0.

        Raises:
            ValueError: If ``fps`` is not greater than zero.
        """
        # Validate before touching any state so a bad fps leaves nothing
        # half-initialised.
        interval = self._interval_for(fps)
        self.resolution = resolution
        self.fps = fps
        self.frame_interval = interval
        self.frame_queue = queue.Queue(maxsize=10)
        self.running = False
        # Initialised to None; no method of this class assigns it afterwards.
        self.current_frame = None
        # Set by stop_capture() so the capture thread's inter-frame wait
        # returns immediately instead of sleeping out the full interval.
        self._stop_event = threading.Event()

    @staticmethod
    def _interval_for(fps: int) -> float:
        """Return the seconds-per-frame for *fps*, rejecting non-positive rates."""
        if fps <= 0:
            raise ValueError(f"fps must be greater than zero, got {fps!r}")
        return 1.0 / fps

    def start_capture(self):
        """Start the GPU frame capture thread.

        Calling this while capture is already running is a no-op, so repeated
        calls never spawn a second capture thread.
        """
        previous = getattr(self, 'capture_thread', None)
        if previous is not None and previous.is_alive():
            if self.running:
                return
            # A stop was requested but the old loop has not exited yet; let it
            # finish so setting ``running`` below cannot revive it.
            self._stop_event.set()
            previous.join()

        self._stop_event.clear()
        self.running = True
        self.capture_thread = threading.Thread(target=self._capture_loop)
        self.capture_thread.daemon = True
        self.capture_thread.start()

    def stop_capture(self):
        """Stop the GPU frame capture and wait for the thread to exit."""
        self.running = False
        self._stop_event.set()  # wake the loop if it is waiting between frames
        thread = getattr(self, 'capture_thread', None)
        if thread is not None:
            thread.join()

    def _capture_loop(self):
        """Capture, encode and enqueue frames until ``running`` becomes false."""
        while self.running:
            try:
                frame = self._capture_gpu_frame()
                if frame is not None:
                    # Convert frame to base64 for streaming
                    self._enqueue_frame(self._encode_frame(frame))
            except Exception as e:
                # Boundary of the worker thread: report and keep capturing
                # rather than letting one bad frame kill the stream.
                print(f"Error capturing frame: {e}")

            # Maintain target FPS. This runs on a plain thread, so an
            # un-awaited ``asyncio.sleep`` would return instantly and make the
            # loop spin; a blocking wait on the stop event paces the loop and
            # still lets stop_capture() interrupt it.
            self._stop_event.wait(self.frame_interval)

    def _enqueue_frame(self, encoded_frame: str) -> None:
        """Queue *encoded_frame*, discarding the oldest frame if the queue is full."""
        try:
            self.frame_queue.put_nowait(encoded_frame)
        except queue.Full:
            # Drop oldest frame
            try:
                self.frame_queue.get_nowait()
                self.frame_queue.put_nowait(encoded_frame)
            except queue.Empty:
                # A consumer emptied the queue in the meantime; skip this frame.
                pass

    def _capture_gpu_frame(self) -> Optional[np.ndarray]:
        """Return the current frame as a ``(height, width, 3)`` uint8 array.

        Returns ``None`` (after printing the error) if the frame cannot be
        produced.
        """
        try:
            # Read the resolution once so a concurrent set_resolution() cannot
            # give us the width of one setting and the height of another.
            width, height = self.resolution

            # TODO: Implement actual GPU framebuffer capture here
            # For now, generate a test pattern
            frame = np.zeros((height, width, 3), dtype=np.uint8)
            # Add some test pattern
            frame[::20, :] = [255, 0, 0]  # Red lines
            frame[:, ::20] = [0, 255, 0]  # Green lines
            return frame
        except Exception as e:
            print(f"Error reading GPU framebuffer: {e}")
            return None

    def _encode_frame(self, frame: np.ndarray) -> str:
        """Convert *frame* to a ``data:image/jpeg;base64,...`` URL string."""
        img = Image.fromarray(frame)
        buffer = io.BytesIO()
        img.save(buffer, format='JPEG', quality=80)
        img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return f"data:image/jpeg;base64,{img_str}"

    async def get_next_frame(self) -> Optional[str]:
        """Return the oldest queued frame, or ``None`` if none is available.

        Never blocks: the queue is polled with ``get_nowait``.
        """
        try:
            return self.frame_queue.get_nowait()
        except queue.Empty:
            return None

    def set_resolution(self, width: int, height: int):
        """Update the display resolution used for subsequent frames."""
        self.resolution = (width, height)

    def set_fps(self, fps: int):
        """Update the target FPS.

        Raises:
            ValueError: If ``fps`` is not greater than zero; the previous
                rate is kept in that case.
        """
        interval = self._interval_for(fps)
        self.fps = fps
        self.frame_interval = interval
|