flaskdockernew / app2.py
MySafeCode's picture
Rename app.py to app2.py
a63b4aa verified
import pygame
import time
import threading
import base64
import os
from flask import Flask, Response, render_template_string
from io import BytesIO
from PIL import Image
# Select SDL's "dummy" video driver before pygame is initialised; this is
# presumably so the app can run on a host without a display.
os.environ['SDL_VIDEODRIVER'] = 'dummy'
pygame.init()

# Size, in pixels, of the off-screen surface the animation is drawn on.
WIDTH = 800
HEIGHT = 600

# State shared between the capture thread and the Flask request handlers:
#   latest_frame - newest frame as base64 text of a JPEG, or None before the
#                  first frame has been produced
#   frame_count  - number of frames produced so far
#   streaming    - the capture loop keeps running while this is truthy
shared = dict(
    latest_frame=None,
    frame_count=0,
    streaming=True,
)
def capture_loop():
    """Animate a bouncing circle and publish every frame as a JPEG.

    Runs until ``shared["streaming"]`` becomes falsy. Each iteration moves
    the circle, redraws it on an off-screen surface, encodes the surface as
    a JPEG (quality 85) and stores the result as base64 text in
    ``shared["latest_frame"]``; ``shared["frame_count"]`` is incremented
    afterwards. The loop is paced to at most 30 iterations per second.
    """
    radius = 30
    frame_interval = 1.0 / 30

    screen = pygame.Surface((WIDTH, HEIGHT))
    circle_x, circle_y = WIDTH // 2, HEIGHT // 2
    speed_x, speed_y = 4, 3

    while shared["streaming"]:
        # Use the monotonic clock for pacing: the wall clock (time.time) can
        # be stepped backwards, which would make the elapsed time negative
        # and stall the loop for the size of the jump.
        start_time = time.monotonic()

        # Update circle position, reversing direction once it gets within
        # one radius of an edge.
        circle_x += speed_x
        circle_y += speed_y
        if circle_x < radius or circle_x > WIDTH - radius:
            speed_x *= -1
        if circle_y < radius or circle_y > HEIGHT - radius:
            speed_y *= -1

        # Draw on the surface
        screen.fill((25, 25, 45))
        pygame.draw.circle(screen, (255, 80, 80), (circle_x, circle_y), radius)

        # Convert to JPEG using PIL
        frame_str = pygame.image.tostring(screen, 'RGB')
        img = Image.frombytes('RGB', (WIDTH, HEIGHT), frame_str)
        buf = BytesIO()
        img.save(buf, format='JPEG', quality=85)

        # Publish the frame first, then bump the counter.
        shared["latest_frame"] = base64.b64encode(buf.getvalue()).decode('utf-8')
        shared["frame_count"] += 1

        # Control frame rate: sleep only for what is left of this frame's slot.
        remaining = frame_interval - (time.monotonic() - start_time)
        if remaining > 0:
            time.sleep(remaining)
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)
# Markup of the viewer page rendered by the "/" route. It shows an <img> fed
# from /stream and a frame counter. Its script calls updateStream() once and
# then every 33 ms; each call points the <img> at /stream with a timestamp
# query string and fetches /stats to refresh the counter.
# NOTE(review): the timer does not wait for earlier requests to finish, so
# requests may overlap when responses take longer than 33 ms - confirm this
# is acceptable.
HTML = '''<html>
<head>
<style>
body { background:#0f172a; color:white; text-align:center; padding:20px; }
#streamImg { border:3px solid #60a5fa; width:400px; height:300px; }
</style>
</head>
<body>
<h1>PyGame Stream</h1>
<img id="streamImg" src="/stream">
<p>Frames: <span id="count">0</span></p>
<script>
function updateStream() {
const img = document.getElementById('streamImg');
img.src = '/stream?_=' + Date.now();
fetch('/stats')
.then(r => r.json())
.then(data => {
document.getElementById('count').textContent = data.frame_count;
});
}
// Update every 33ms (~30 FPS)
setInterval(updateStream, 33);
updateStream(); // Initial call
</script>
</body>
</html>'''
@app.route('/')
def index():
    """Serve the viewer page built from the ``HTML`` template string."""
    page = render_template_string(HTML)
    return page
@app.route('/stats')
def stats():
    """Return the current frame counter under the ``frame_count`` key.

    The viewer page reads this response with ``r.json()``.
    """
    frames_so_far = shared['frame_count']
    return {'frame_count': frames_so_far}
@app.route('/stream')
def stream():
    """Return the most recently captured frame as a JPEG image.

    Returns:
        A 200 ``image/jpeg`` response holding the decoded bytes of
        ``shared['latest_frame']``, or an empty 503 response when no frame
        has been published yet. Both responses are marked
        ``Cache-Control: no-store``.
    """
    # Every frame is requested under a unique URL, so storing responses in a
    # cache would only accumulate stale images.
    no_cache = {'Cache-Control': 'no-store'}

    # Read the shared slot once so the check and the decode see the same value.
    encoded = shared['latest_frame']
    if not encoded:
        # An empty body is not a valid JPEG; report "not ready" instead of
        # answering 200 with unusable content.
        return Response(b'', status=503, mimetype='image/jpeg', headers=no_cache)

    frame_bytes = base64.b64decode(encoded)
    return Response(frame_bytes, mimetype='image/jpeg', headers=no_cache)
if __name__ == "__main__":
    # Run the animation in a daemon thread so it does not keep the process
    # alive once the server exits.
    thread = threading.Thread(target=capture_loop, daemon=True)
    thread.start()

    # Give the capture thread up to 50 polls, 0.1 s apart, to publish its
    # first frame before the server starts accepting requests.
    polls_left = 50
    while polls_left > 0 and not shared['latest_frame']:
        time.sleep(0.1)
        polls_left -= 1

    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port, debug=False, threaded=True)