web3 / app.py
Alvin3y1's picture
Update app.py
adcc74c verified
import asyncio
import json
import os
import subprocess
import time
import logging
import base64
import mss
import threading
from io import BytesIO
from PIL import Image
from Xlib import display, X
from Xlib.ext import xtest
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceServer, RTCConfiguration
# --- Configuration ---
HOST = "0.0.0.0"
PORT = 7860
DISPLAY_NUM = ":99"
WIDTH, HEIGHT = 1280, 720
# TURN/STUN Config
TURN_USER = "g08abe68c81a07f098bb5f0914549bb32440e5aad0b216c7fba2b61e76fd62c6"
TURN_PASS = "aed1a10dd10eba9401ad9d99e5c66036d8a970eab5ba8e6dc9845ab57c771a7d"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("X11-RTC")
class X11Engine:
def __init__(self):
self.sct = mss.mss()
self.dirty_rects = []
self.lock = threading.Lock()
self.use_damage = False
self.last_full_refresh = 0
self.alive = True
try:
self.d = display.Display(DISPLAY_NUM)
self.root = self.d.screen().root
if self.d.has_extension('DAMAGE'):
self.damage_ext = self.d.damage_create(self.root, X.DamageReportLevelNonEmpty)
self.use_damage = True
logger.info("X11 Damage Extension enabled.")
else:
logger.warning("DAMAGE extension missing. Polling mode.")
except Exception as e:
logger.error(f"X11 Init Failed: {e}")
self.d = None
def close(self):
self.alive = False
if self.d:
try:
self.d.close()
except: pass
def collect_damage(self):
if not self.d or not self.use_damage: return
try:
while self.d.pending_events() > 0:
ev = self.d.next_event()
if ev.type == self.d.extension_event.DamageNotify:
with self.lock:
self.dirty_rects.append((ev.area.x, ev.area.y, ev.area.width, ev.area.height))
self.d.damage_subtract(self.damage_ext, 0, 0)
except: pass
def get_patches(self):
if not self.alive: return []
with self.lock:
now = time.time()
# Force full refresh if it's been >3s OR if it's the very first frame (0)
force = (self.last_full_refresh == 0) or (now - self.last_full_refresh > 3.0)
if force:
rects = [(0, 0, WIDTH, HEIGHT)]
self.last_full_refresh = now
self.dirty_rects = []
elif self.dirty_rects:
rects = list(self.dirty_rects[-10:])
self.dirty_rects = []
else:
return []
patches = []
for (x, y, w, h) in rects:
try:
x = max(0, min(x, WIDTH - 1))
y = max(0, min(y, HEIGHT - 1))
w = max(1, min(w, WIDTH - x))
h = max(1, min(h, HEIGHT - y))
sct_img = self.sct.grab({"top": y, "left": x, "width": w, "height": h})
img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
buf = BytesIO()
# Lower quality to 30 ensures the first big frame doesn't exceed UDP/DataChannel limits
q = 30 if (w > 1000 and h > 500) else 60
img.save(buf, format="JPEG", quality=q, optimize=True)
patches.append({
"x": x, "y": y, "w": w, "h": h,
"d": base64.b64encode(buf.getvalue()).decode('ascii')
})
except Exception as e:
logger.error(f"Capture error: {e}")
continue
return patches
def inject_input(self, msg):
if not self.d: return
try:
t = msg.get("type")
if t == "mousemove":
tx = int(max(0, min(msg["x"], 1.0)) * WIDTH)
ty = int(max(0, min(msg["y"], 1.0)) * HEIGHT)
xtest.fake_motion(self.d, tx, ty)
elif t == "mousedown":
xtest.fake_button_event(self.d, msg.get("button", 0) + 1, True)
elif t == "mouseup":
xtest.fake_button_event(self.d, msg.get("button", 0) + 1, False)
self.d.sync()
except: pass
async def patch_stream_loop(channel, engine):
logger.info("Stream started")
try:
# Reset refresh timer to force an immediate full frame on connect
engine.last_full_refresh = 0
while channel.readyState == "open":
start_time = time.perf_counter()
await asyncio.to_thread(engine.collect_damage)
patches = await asyncio.to_thread(engine.get_patches)
if patches:
try:
payload = json.dumps({"type": "patch", "p": patches})
channel.send(payload)
# logger.info(f"Sent {len(patches)} patches ({len(payload)} bytes)")
except Exception as e:
logger.error(f"Send failed: {e}")
break
await asyncio.sleep(max(0.01, 0.033 - (time.perf_counter() - start_time)))
except Exception as e:
logger.error(f"Stream Loop Error: {e}")
finally:
engine.close()
async def offer(request):
try:
params = await request.json()
pc = RTCPeerConnection(RTCConfiguration(iceServers=[
RTCIceServer(urls=["turns:turn.cloudflare.com:443?transport=tcp", "turn:turn.cloudflare.com:3478?transport=udp"],
username=TURN_USER, credential=TURN_PASS),
RTCIceServer(urls=["stun:stun.l.google.com:19302"])
]))
engine = X11Engine()
@pc.on("datachannel")
def on_dc(channel):
@channel.on("open")
def on_open(): asyncio.create_task(patch_stream_loop(channel, engine))
@channel.on("message")
def on_message(msg): engine.inject_input(json.loads(msg))
await pc.setRemoteDescription(RTCSessionDescription(sdp=params["sdp"], type=params["type"]))
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return web.json_response({"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}, headers={"Access-Control-Allow-Origin": "*"})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
async def index(request):
content = open("client2.html", "r").read()
return web.Response(content_type="text/html", text=content)
if __name__ == "__main__":
os.environ["DISPLAY"] = DISPLAY_NUM
# 1. Start Xvfb
logger.info("Starting Xvfb...")
subprocess.Popen(["Xvfb", DISPLAY_NUM, "-screen", "0", f"{WIDTH}x{HEIGHT}x24", "+extension", "DAMAGE", "-ac", "-noreset"])
time.sleep(2)
# 2. Start Fluxbox (Window Manager) - Fixes black screen issues
logger.info("Starting Fluxbox...")
subprocess.Popen(["fluxbox"], env=os.environ)
time.sleep(1)
# 3. Start Opera
logger.info("Starting Browser...")
cmd = "opera --no-sandbox --disable-gpu --disable-dev-shm-usage --start-maximized --window-size=1280,720 --window-position=0,0"
subprocess.Popen(cmd, shell=True, env=os.environ)
# 4. Start Server
app = web.Application()
app.router.add_get("/", index)
app.router.add_post("/offer", offer)
logger.info(f"Server running on port {PORT}")
web.run_app(app, host=HOST, port=PORT)