import pygame
import time
import threading
import os
from flask import Flask, Response, render_template_string
os.environ['SDL_VIDEODRIVER'] = 'dummy'
pygame.init()
print("✅ PyGame initialized")
WIDTH, HEIGHT = 400, 300
shared = {
"streaming": True,
"x": 200,
"y": 150,
"frame_count": 0
}
def game_loop():
"""Generate frames continuously"""
screen = pygame.Surface((WIDTH, HEIGHT))
x, speed = 200, 5
while shared["streaming"]:
start_time = time.time()
# Update position
x += speed
if x < 30 or x > WIDTH-30:
speed *= -1
# Draw frame
screen.fill((25, 25, 45))
pygame.draw.circle(screen, (255, 80, 80), (int(x), int(shared["y"])), 20)
pygame.draw.circle(screen, (255, 255, 255), (int(x), int(shared["y"])), 20, 2)
# Store latest frame as RGB bytes
shared["current_frame"] = pygame.image.tostring(screen, 'RGB')
shared["x"] = x
shared["frame_count"] += 1
# Log every 30 frames
if shared["frame_count"] % 30 == 0:
print(f"Frame {shared['frame_count']}: X={x}")
# Maintain 30 FPS
elapsed = time.time() - start_time
if elapsed < 1.0/30:
time.sleep(1.0/30 - elapsed)
app = Flask(__name__)
HTML = '''
🎮 Binary Stream
Frame: 0 |
Position: X=200 |
FPS: 0
Streaming raw RGB data • 30 FPS • No caching
'''
@app.route('/')
def index():
return render_template_string(HTML)
@app.route('/stream')
def stream():
"""Server-Sent Events stream with base64 encoded frames"""
import json
import base64
def generate():
last_sent = 0
while shared.get("streaming", True):
# Only send if frame has updated
if shared.get("frame_count", 0) > last_sent and "current_frame" in shared:
frame_data = {
'frame': base64.b64encode(shared["current_frame"]).decode('utf-8'),
'frame_count': shared["frame_count"],
'x': shared["x"],
'timestamp': time.time()
}
yield f"data: {json.dumps(frame_data)}\n\n"
last_sent = shared["frame_count"]
# Check for updates 30 times per second
time.sleep(1.0/30)
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
}
)
if __name__ == "__main__":
print("🚀 Starting binary stream server...")
print("Expecting 30 FPS updates via SSE")
# Create initial frame
screen = pygame.Surface((WIDTH, HEIGHT))
screen.fill((25, 25, 45))
pygame.draw.circle(screen, (255, 80, 80), (200, 150), 20)
shared["current_frame"] = pygame.image.tostring(screen, 'RGB')
# Start game thread
thread = threading.Thread(target=game_loop, daemon=True)
thread.start()
# Give it a moment
time.sleep(0.5)
# Start server
app.run(
host='0.0.0.0',
port=int(os.environ.get('PORT', 7860)),
debug=False,
threaded=True
)