"""Headless PyGame animation streamed to browsers via Server-Sent Events.

A background thread renders a bouncing ball into an off-screen surface at a
fixed frame rate and publishes the latest frame in ``shared``.  The Flask
``/stream`` endpoint forwards each new frame to the client as a base64-encoded
JSON SSE message.
"""

import base64
import json
import os
import threading
import time

import pygame
from flask import Flask, Response, render_template_string

# Select SDL's dummy video driver before pygame.init() so no display is needed.
os.environ['SDL_VIDEODRIVER'] = 'dummy'
pygame.init()
print("✅ PyGame initialized")

WIDTH, HEIGHT = 400, 300
FPS = 30
FRAME_INTERVAL = 1.0 / FPS
BALL_RADIUS = 20
BOUNCE_MARGIN = 30          # ball reverses once it gets this close to an edge
LOG_EVERY_N_FRAMES = 30

# State shared between the render thread and the request handlers.
#   streaming     - set to False to stop the render loop and all SSE generators
#   x, y          - ball centre of the most recently published frame
#   frame_count   - number of frames published so far
#   current_frame - raw RGB bytes of the latest frame (added once rendered)
shared = {
    "streaming": True,
    "x": 200,
    "y": 150,
    "frame_count": 0
}

# Guards the (current_frame, x, frame_count) triple so that a reader never
# observes pixels from one frame paired with metadata from another.
_frame_lock = threading.Lock()


def _render_frame(screen, x, y):
    """Draw the ball at (x, y) onto *screen* and return the raw RGB bytes."""
    centre = (int(x), int(y))
    screen.fill((25, 25, 45))
    pygame.draw.circle(screen, (255, 80, 80), centre, BALL_RADIUS)
    pygame.draw.circle(screen, (255, 255, 255), centre, BALL_RADIUS, 2)
    return pygame.image.tostring(screen, 'RGB')


def game_loop():
    """Render frames continuously until ``shared["streaming"]`` is false.

    Each iteration moves the ball horizontally, bounces it off the margins,
    renders the frame and publishes it (with its position and sequence
    number) into ``shared`` under ``_frame_lock``.  The loop then sleeps for
    whatever remains of the frame interval to hold roughly ``FPS`` frames/s.
    """
    screen = pygame.Surface((WIDTH, HEIGHT))
    x, speed = 200, 5

    while shared["streaming"]:
        # Monotonic clock: pacing must not be disturbed by wall-clock jumps.
        started = time.monotonic()

        # Update position
        x += speed
        if x < BOUNCE_MARGIN or x > WIDTH - BOUNCE_MARGIN:
            speed *= -1

        # Render outside the lock; only the publish step needs to be atomic.
        frame_bytes = _render_frame(screen, x, shared["y"])
        with _frame_lock:
            shared["current_frame"] = frame_bytes
            shared["x"] = x
            shared["frame_count"] += 1
            frame_count = shared["frame_count"]

        if frame_count % LOG_EVERY_N_FRAMES == 0:
            print(f"Frame {frame_count}: X={x}")

        # Sleep for the remainder of this frame's time budget, if any.
        elapsed = time.monotonic() - started
        if elapsed < FRAME_INTERVAL:
            time.sleep(FRAME_INTERVAL - elapsed)


app = Flask(__name__)

# NOTE(review): as it stands this template contains only plain text - no
# markup and no script that consumes /stream.  It looks like the tags were
# lost at some point; confirm against the intended page before relying on it.
HTML = '''

🎮 Binary Stream

Frame: 0 | Position: X=200 | FPS: 0
Streaming raw RGB data • 30 FPS • No caching
'''


@app.route('/')
def index():
    """Serve the landing page rendered from the ``HTML`` template."""
    return render_template_string(HTML)


@app.route('/stream')
def stream():
    """Server-Sent Events stream with base64 encoded frames.

    Returns a streaming ``text/event-stream`` response.  Every message is a
    JSON object with the keys ``frame`` (base64 of the raw RGB bytes),
    ``frame_count``, ``x`` and ``timestamp`` (wall-clock seconds at send
    time).  A message is emitted only when a frame newer than the last one
    sent on this connection is available.
    """

    def generate():
        last_sent = 0
        while shared.get("streaming", True):
            # Take one consistent snapshot instead of reading the dict
            # several times while the render thread may be updating it.
            with _frame_lock:
                frame_count = shared.get("frame_count", 0)
                frame_bytes = shared.get("current_frame")
                x = shared["x"]

            if frame_count > last_sent and frame_bytes is not None:
                frame_data = {
                    'frame': base64.b64encode(frame_bytes).decode('utf-8'),
                    'frame_count': frame_count,
                    'x': x,
                    'timestamp': time.time()
                }
                # Record the frame actually sent *before* yielding: the
                # generator may be suspended at the yield for a long time.
                last_sent = frame_count
                yield f"data: {json.dumps(frame_data)}\n\n"

            # Check for updates 30 times per second
            time.sleep(FRAME_INTERVAL)

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no'
        }
    )


def main():
    """Seed an initial frame, start the render thread and run the server."""
    print("🚀 Starting binary stream server...")
    print("Expecting 30 FPS updates via SSE")

    # Create initial frame so "current_frame" exists before the first request.
    initial_surface = pygame.Surface((WIDTH, HEIGHT))
    shared["current_frame"] = _render_frame(
        initial_surface, shared["x"], shared["y"]
    )

    # Daemon thread: it must not keep the process alive after the server exits.
    render_thread = threading.Thread(target=game_loop, daemon=True)
    render_thread.start()

    # Give the render thread a moment to publish its first frames.
    time.sleep(0.5)

    app.run(
        host='0.0.0.0',
        port=int(os.environ.get('PORT', 7860)),
        debug=False,
        threaded=True
    )


if __name__ == "__main__":
    main()