# MySafeCode's picture
# Create app.py
# cbc54ba verified
import pygame
import numpy as np
from flask import Flask, Response, render_template_string
from flask_sock import Sock
import time
import os
import cv2
import threading
import json
import base64
from PIL import Image
import io
# Headless start-up: select SDL's "dummy" video driver before Pygame
# initialises, so no real display is required.
os.environ.update(SDL_VIDEODRIVER='dummy')
pygame.init()

# The audio mixer is optional; report the outcome instead of aborting.
try:
    pygame.mixer.init(frequency=44100, size=-16, channels=2)
    print("✅ Audio mixer initialized")
except Exception as mixer_error:
    print(f"⚠️ Audio mixer not available: {mixer_error}")

# Flask application and the flask-sock extension that registers WebSocket routes.
app = Flask(__name__)
sock = Sock(app)
class ShaderRenderer:
    """Draw the interactive demo scene onto an off-screen Pygame surface.

    Each frame contains TOP/BOTTOM markers, a seconds clock, a toggle
    button, a pulsing circle at the last known mouse position, an animated
    grid and an FPS counter.  Frames are available as raw RGB bytes
    (``render_frame``) or as JPEG bytes (``get_frame_jpeg``).

    Public state read or written by callers:
        mouse_x, mouse_y: last mouse position, clamped to the surface size.
        button_clicked: current state of the in-scene toggle button.
        sound_source: ``'pygame'``, ``'browser'`` or anything else; it only
            selects the colour of the circle.
        fps: number of frames rendered during the last full second.
    """

    # Geometry of the toggle button.  Hit-testing and drawing both go through
    # ``_button_rect`` so the two can never drift apart.
    _BUTTON_RIGHT_MARGIN = 200
    _BUTTON_TOP = 120
    _BUTTON_SIZE = (180, 40)
    _FONT_SIZE = 24

    def __init__(self, width=640, height=480):
        """Create the off-screen surface and reset all interaction state.

        Args:
            width: Surface width in pixels.
            height: Surface height in pixels.
        """
        self.width = width
        self.height = height
        self.mouse_x = width // 2
        self.mouse_y = height // 2
        self.start_time = time.time()
        self.surface = pygame.Surface((width, height))
        self.frame_count = 0
        self.last_frame_time = time.time()
        self.fps = 0
        self.button_clicked = False
        self.sound_source = 'none'
        # The font is created lazily on the first frame and then reused;
        # building a new Font object for every frame is wasted work.
        self._font = None
        # One renderer is shared by every connection's video thread, and all
        # of them draw onto the same surface; serialise whole-frame rendering
        # so one thread never captures another thread's half-drawn frame.
        self._render_lock = threading.Lock()

    def set_mouse(self, x, y):
        """Store the mouse position, clamped to ``[0, width]`` x ``[0, height]``."""
        self.mouse_x = max(0, min(self.width, x))
        self.mouse_y = max(0, min(self.height, y))

    def handle_click(self, x, y):
        """Toggle the button if ``(x, y)`` lies inside it.

        Returns:
            True when the click hit the button (and toggled it), else False.
        """
        if self._button_rect().collidepoint(x, y):
            self.button_clicked = not self.button_clicked
            return True
        return False

    def render_frame(self):
        """Draw one complete frame and return it as raw RGB bytes.

        Returns:
            The surface contents as produced by
            ``pygame.image.tostring(surface, 'RGB')``.
        """
        with self._render_lock:
            now = time.time()
            elapsed = now - self.start_time
            self._update_fps(now)

            if self._font is None:
                self._font = pygame.font.Font(None, self._FONT_SIZE)
            font = self._font

            # Draw order matters: later elements paint over earlier ones.
            self.surface.fill((20, 20, 30))
            self._draw_markers(font)
            self._draw_clock(font, now)
            self._draw_button(font)
            self._draw_pointer(elapsed)
            self._draw_grid(elapsed)
            self._draw_fps(font)
            return pygame.image.tostring(self.surface, 'RGB')

    def get_frame_jpeg(self, quality=70):
        """Render a frame and return it as JPEG bytes.

        Args:
            quality: Value passed to OpenCV as ``IMWRITE_JPEG_QUALITY``.

        Raises:
            RuntimeError: If OpenCV reports that encoding failed.
        """
        frame = self.render_frame()
        img = np.frombuffer(frame, dtype=np.uint8).reshape((self.height, self.width, 3))
        # Pygame hands back RGB; OpenCV's encoder expects BGR channel order.
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        ok, jpeg = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, quality])
        if not ok:
            raise RuntimeError("JPEG encoding failed")
        return jpeg.tobytes()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _button_rect(self):
        """Return the toggle button's rectangle, anchored to the right edge."""
        return pygame.Rect(
            self.width - self._BUTTON_RIGHT_MARGIN,
            self._BUTTON_TOP,
            *self._BUTTON_SIZE,
        )

    def _update_fps(self, now):
        """Count this frame and publish the per-second total once a second."""
        self.frame_count += 1
        if now - self.last_frame_time > 1.0:
            self.fps = self.frame_count
            self.frame_count = 0
            self.last_frame_time = now

    def _draw_markers(self, font):
        """Draw the labelled TOP and BOTTOM rectangles on the left edge."""
        pygame.draw.rect(self.surface, (255, 100, 100), (10, 10, 100, 30))
        self.surface.blit(font.render("TOP", True, (255, 255, 255)), (20, 15))

        pygame.draw.rect(self.surface, (100, 255, 100),
                         (10, self.height - 40, 100, 30))
        self.surface.blit(font.render("BOTTOM", True, (0, 0, 0)),
                          (20, self.height - 35))

    def _draw_clock(self, font, now):
        """Draw the wall-clock seconds and hundredths in the top-right corner."""
        seconds = int(now) % 60
        hundredths = int((now * 100) % 100)
        time_str = f"{seconds:02d}.{hundredths:02d}s"
        clock_text = font.render(time_str, True, (0, 255, 255))
        self.surface.blit(clock_text, (self.width - 150, 40))

    def _draw_button(self, font):
        """Draw the toggle button, coloured by clicked / hovered / idle state."""
        button_rect = self._button_rect()
        if self.button_clicked:
            button_color = (0, 200, 0)
        elif button_rect.collidepoint(self.mouse_x, self.mouse_y):
            button_color = (100, 100, 200)
        else:
            button_color = (80, 80, 80)
        pygame.draw.rect(self.surface, button_color, button_rect)
        pygame.draw.rect(self.surface, (200, 200, 200), button_rect, 2)

        btn_text = "✅ CLICKED!" if self.button_clicked else "🔘 CLICK ME"
        text_surf = font.render(btn_text, True, (255, 255, 255))
        self.surface.blit(text_surf, text_surf.get_rect(center=button_rect.center))

    def _draw_pointer(self, elapsed):
        """Draw the pulsing circle at the mouse position, coloured by sound source."""
        circle_size = 30 + int(20 * np.sin(elapsed * 2))
        if self.sound_source == 'pygame':
            color = (100, 255, 100)
        elif self.sound_source == 'browser':
            color = (100, 100, 255)
        else:
            color = (255, 100, 100)
        pygame.draw.circle(self.surface, color,
                           (self.mouse_x, self.mouse_y), circle_size)

    def _draw_grid(self, elapsed):
        """Draw the 50-pixel grid whose line brightness oscillates over time."""
        for x in range(0, self.width, 50):
            shade = int(40 + 20 * np.sin(x * 0.1 + elapsed))
            pygame.draw.line(self.surface, (shade, shade, 50),
                             (x, 0), (x, self.height))
        for y in range(0, self.height, 50):
            shade = int(40 + 20 * np.cos(y * 0.1 + elapsed))
            pygame.draw.line(self.surface, (shade, shade, 50),
                             (0, y), (self.width, y))

    def _draw_fps(self, font):
        """Draw the FPS counter in the bottom-right corner."""
        fps_text = font.render(f"FPS: {self.fps}", True, (255, 255, 0))
        self.surface.blit(fps_text, (self.width - 150, self.height - 60))
# Module-level renderer instance used by the WebSocket handler below.
renderer = ShaderRenderer()
# Single WebSocket for everything
@sock.route('/ws')
def websocket(ws):
    """Serve one client: stream JPEG frames and apply its input messages.

    A background thread pushes binary messages made of the 6-byte header
    ``b'FRAME:'`` followed by a JPEG frame, at most 30 times per second.
    Meanwhile this function reads JSON text messages whose ``type`` is
    ``mouse``, ``click`` or ``sound``, applies them to the module-level
    ``renderer`` and answers each one with a ``state`` message.

    Args:
        ws: The WebSocket connection object passed in by flask-sock.
    """
    print("🟢 Client connected")

    # Set when the handler exits so the streaming thread stops promptly.
    stop_streaming = threading.Event()
    # Frames and state replies are written from two different threads.
    # NOTE(review): ws.send() is not documented as thread-safe, so the
    # writes are serialised here to keep messages from interleaving.
    send_lock = threading.Lock()

    def send(payload):
        """Send one message while holding the connection's write lock."""
        with send_lock:
            ws.send(payload)

    def video_stream():
        """Push frames until the handler stops or rendering/sending fails."""
        while not stop_streaming.is_set():
            try:
                frame = renderer.get_frame_jpeg(quality=70)
                # The header lets the client tell frames from other binary data.
                send(b'FRAME:' + frame)
            except Exception as e:
                # Usually the client has gone away; say so unless we are
                # already shutting down, then end the thread.
                if not stop_streaming.is_set():
                    print(f"Video stream stopped: {e}")
                break
            # 30fps max; wakes immediately once the handler signals stop.
            stop_streaming.wait(1 / 30)

    video_thread = threading.Thread(target=video_stream, daemon=True)
    video_thread.start()

    # Handle incoming messages (interactions)
    try:
        while True:
            message = ws.receive()
            if not message:
                continue

            try:
                data = json.loads(message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable binary payloads.
                print(f"Invalid JSON: {message}")
                continue

            try:
                kind = data['type']
                if kind == 'mouse':
                    renderer.set_mouse(data['x'], data['y'])
                elif kind == 'click':
                    renderer.handle_click(data['x'], data['y'])
                elif kind == 'sound':
                    renderer.sound_source = data['source']
            except (KeyError, TypeError) as e:
                # Valid JSON but not the expected shape: skip this message
                # instead of tearing the whole connection down.
                print(f"Ignoring malformed message {message!r}: {e!r}")
                continue

            # Report the resulting state back to the client.
            send(json.dumps({
                'type': 'state',
                'button': renderer.button_clicked,
                'sound': renderer.sound_source
            }))
    except Exception as e:
        print(f"Connection error: {e}")
    finally:
        stop_streaming.set()
        print("🔴 Client disconnected")
@app.route('/')
def index():
    """Serve the single-page client.

    The page holds a 640x480 canvas, sound-source buttons and a status panel.
    Its script opens a WebSocket to ``/ws``, paints incoming ``FRAME:``-prefixed
    JPEG messages onto the canvas and sends ``mouse`` / ``click`` / ``sound``
    JSON messages back.  Mouse moves are throttled in the browser to one
    message per 33 ms, always carrying the most recent position.
    """
    return render_template_string('''
<!DOCTYPE html>
<html>
<head>
<title>🎮 Pygame + WebSocket Only</title>
<style>
body {
margin: 0;
background: #0a0a0a;
color: white;
font-family: 'Segoe UI', Arial, sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
}
.container {
max-width: 900px;
padding: 20px;
text-align: center;
}
h1 { color: #4CAF50; margin-bottom: 20px; }
h1 small { font-size: 14px; color: #666; display: block; }
.video-container {
background: #000;
border-radius: 12px;
padding: 5px;
margin: 20px 0;
box-shadow: 0 0 30px rgba(76, 175, 80, 0.2);
position: relative;
}
canvas {
width: 100%;
max-width: 640px;
height: auto;
border-radius: 8px;
display: block;
margin: 0 auto;
background: #111;
cursor: crosshair;
}
.mouse-coords {
position: absolute;
bottom: 10px;
left: 10px;
background: rgba(0,0,0,0.7);
color: #4CAF50;
padding: 5px 10px;
border-radius: 20px;
font-family: monospace;
font-size: 14px;
pointer-events: none;
z-index: 10;
}
.controls {
background: #1a1a1a;
border-radius: 12px;
padding: 20px;
margin-top: 20px;
}
.sound-buttons {
display: flex;
gap: 10px;
justify-content: center;
margin: 20px 0;
flex-wrap: wrap;
}
button {
background: #333;
color: white;
border: none;
padding: 12px 24px;
font-size: 16px;
border-radius: 8px;
cursor: pointer;
transition: all 0.3s;
min-width: 100px;
font-weight: bold;
border: 1px solid #444;
}
button:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(0,0,0,0.3);
}
button.active {
background: #4CAF50;
border-color: #4CAF50;
box-shadow: 0 0 20px #4CAF50;
}
.status-panel {
background: #222;
border-radius: 8px;
padding: 15px;
margin-top: 20px;
display: flex;
justify-content: space-around;
flex-wrap: wrap;
gap: 15px;
}
.status-item {
display: flex;
align-items: center;
gap: 10px;
}
.status-label {
color: #888;
font-size: 14px;
}
.status-value {
background: #333;
padding: 5px 12px;
border-radius: 20px;
font-size: 14px;
font-weight: bold;
}
.badge {
padding: 5px 10px;
border-radius: 20px;
background: #333;
}
.badge.green { background: #4CAF50; }
.badge.red { background: #ff4444; }
.badge.blue { background: #2196F3; }
.info-text {
color: #666;
font-size: 12px;
margin-top: 15px;
border-top: 1px solid #333;
padding-top: 15px;
}
</style>
</head>
<body>
<div class="container">
<h1>🎮 Pygame + WebSocket Only <small>Zero HTTP API Calls</small></h1>
<div class="video-container">
<canvas id="canvas" width="640" height="480"></canvas>
<div id="mouseCoords" class="mouse-coords">X: 320, Y: 240</div>
</div>
<div class="controls">
<h3>🔊 Sound Source</h3>
<div class="sound-buttons">
<button id="btnNone" onclick="setSound('none')" class="active">🔇 None</button>
<button id="btnPygame" onclick="setSound('pygame')">🎮 Pygame</button>
<button id="btnBrowser" onclick="setSound('browser')">🌐 Browser</button>
</div>
<div class="status-panel">
<div class="status-item">
<span class="status-label">WebSocket:</span>
<span id="wsStatus" class="status-value">🟢 Connected</span>
</div>
<div class="status-item">
<span class="status-label">Button:</span>
<span id="buttonState" class="status-value">⚪ Off</span>
</div>
<div class="status-item">
<span class="status-label">Frames:</span>
<span id="frameCounter" class="status-value">0</span>
</div>
</div>
<div class="info-text">
⚡ Single WebSocket connection • Video + Mouse + Clicks • Zero polling • 30fps
</div>
</div>
</div>
<audio id="browserAudio" loop style="display:none;">
<source src="/static/sound.mp3" type="audio/mpeg">
</audio>
<script>
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
const browserAudio = document.getElementById('browserAudio');
let frameCount = 0;
let lastFrameTime = performance.now();
let fps = 0;
// WebSocket connection
const ws = new WebSocket((location.protocol === 'https:' ? 'wss:' : 'ws:') + '//' + window.location.host + '/ws');
ws.binaryType = 'arraybuffer';
// Connection status
ws.onopen = () => {
document.getElementById('wsStatus').innerHTML = '🟢 Connected';
document.getElementById('wsStatus').className = 'status-value';
};
ws.onclose = () => {
document.getElementById('wsStatus').innerHTML = '🔴 Disconnected';
document.getElementById('wsStatus').className = 'status-value';
};
// Handle incoming messages (video frames + state updates)
ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
// Video frame
const data = new Uint8Array(event.data);
const header = new TextDecoder().decode(data.slice(0, 6));
if (header === 'FRAME:') {
// Extract JPEG data
const jpegData = data.slice(6);
// Convert to blob and create image
const blob = new Blob([jpegData], {type: 'image/jpeg'});
const url = URL.createObjectURL(blob);
const img = new Image();
img.onload = () => {
ctx.drawImage(img, 0, 0, 640, 480);
URL.revokeObjectURL(url);
// Update frame counter
frameCount++;
document.getElementById('frameCounter').innerHTML = frameCount;
// Calculate FPS occasionally
const now = performance.now();
if (now - lastFrameTime > 1000) {
fps = frameCount;
frameCount = 0;
lastFrameTime = now;
}
};
img.src = url;
}
} else {
// JSON message (state updates)
try {
const data = JSON.parse(event.data);
if (data.type === 'state') {
// Update button state from server
document.getElementById('buttonState').innerHTML =
data.button ? '✅ Clicked' : '⚪ Off';
}
} catch (e) {
console.log('Received:', event.data);
}
}
};
// Mouse tracking (throttled)
let mouseTimer = null;
let pendingMouse = null;
canvas.addEventListener('mousemove', (e) => {
const rect = canvas.getBoundingClientRect();
const scaleX = canvas.width / rect.width;
const scaleY = canvas.height / rect.height;
const x = Math.round((e.clientX - rect.left) * scaleX);
const y = Math.round((e.clientY - rect.top) * scaleY);
// Clamp to canvas boundaries
const clampedX = Math.max(0, Math.min(640, x));
const clampedY = Math.max(0, Math.min(480, y));
document.getElementById('mouseCoords').innerHTML = `X: ${clampedX}, Y: ${clampedY}`;
// Throttle to 30fps: remember only the newest position and let the
// already-running timer send it. Restarting the timer on every move
// would send nothing until the pointer stops moving.
pendingMouse = {type: 'mouse', x: clampedX, y: clampedY};
if (mouseTimer) return;
mouseTimer = setTimeout(() => {
mouseTimer = null;
if (pendingMouse && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(pendingMouse));
}
pendingMouse = null;
}, 33);
});
// Click handling
canvas.addEventListener('click', (e) => {
const rect = canvas.getBoundingClientRect();
const scaleX = canvas.width / rect.width;
const scaleY = canvas.height / rect.height;
const x = Math.round((e.clientX - rect.left) * scaleX);
const y = Math.round((e.clientY - rect.top) * scaleY);
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'click',
x: x,
y: y
}));
}
});
// Sound handling
function setSound(source) {
// Update UI
document.getElementById('btnNone').className = source === 'none' ? 'active' : '';
document.getElementById('btnPygame').className = source === 'pygame' ? 'active' : '';
document.getElementById('btnBrowser').className = source === 'browser' ? 'active' : '';
// Handle browser audio
if (source === 'browser') {
browserAudio.play().catch(e => console.log('Audio error:', e));
} else {
browserAudio.pause();
browserAudio.currentTime = 0;
}
// Send to server
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'sound',
source: source
}));
}
}
// Clean up on page unload
window.addEventListener('beforeunload', () => {
ws.close();
});
</script>
</body>
</html>
''')
@app.route('/static/sound.mp3')
def serve_sound():
    """Serve ``sound.mp3`` from the working directory for browser playback.

    Returns:
        A ``Response`` carrying the file's bytes with the ``audio/mpeg``
        MIME type, or the tuple ``('Sound not found', 404)`` when the file
        does not exist.
    """
    # Open directly instead of checking os.path.exists() first: the file
    # could vanish between the check and the open, turning a 404 into a 500.
    try:
        with open('sound.mp3', 'rb') as f:
            payload = f.read()
    except FileNotFoundError:
        return 'Sound not found', 404
    return Response(payload, mimetype='audio/mpeg')
if __name__ == '__main__':
    # Startup banner, one entry per printed line.
    for banner_line in (
        "\n" + "="*70,
        "🎮 Pygame + WebSocket Only",
        "="*70,
        "🔌 Single WebSocket for everything:",
        " • Video streaming (30fps JPEG)",
        " • Mouse tracking",
        " • Click handling",
        " • Sound control",
        "📡 Zero HTTP API calls",
        "🖱️ Interactive button inside Pygame",
        "⏱️ Live clock display",
        "\n🌐 Main page: /",
        "="*70 + "\n",
    ):
        print(banner_line)

    # Listen on every interface; the port comes from $PORT, else 7860.
    port = int(os.getenv('PORT', 7860))
    app.run(host='0.0.0.0', port=port, debug=False, threaded=True)