import asyncio import json import os import subprocess import time import logging import base64 import mss import threading from io import BytesIO from PIL import Image from Xlib import display, X from Xlib.ext import xtest from aiohttp import web from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceServer, RTCConfiguration # --- Configuration --- HOST = "0.0.0.0" PORT = 7860 DISPLAY_NUM = ":99" WIDTH, HEIGHT = 1280, 720 # TURN/STUN Config TURN_USER = "g08abe68c81a07f098bb5f0914549bb32440e5aad0b216c7fba2b61e76fd62c6" TURN_PASS = "aed1a10dd10eba9401ad9d99e5c66036d8a970eab5ba8e6dc9845ab57c771a7d" logging.basicConfig(level=logging.INFO) logger = logging.getLogger("X11-RTC") class X11Engine: def __init__(self): self.sct = mss.mss() self.dirty_rects = [] self.lock = threading.Lock() self.use_damage = False self.last_full_refresh = 0 self.alive = True try: self.d = display.Display(DISPLAY_NUM) self.root = self.d.screen().root if self.d.has_extension('DAMAGE'): self.damage_ext = self.d.damage_create(self.root, X.DamageReportLevelNonEmpty) self.use_damage = True logger.info("X11 Damage Extension enabled.") else: logger.warning("DAMAGE extension missing. Polling mode.") except Exception as e: logger.error(f"X11 Init Failed: {e}") self.d = None def close(self): self.alive = False if self.d: try: self.d.close() except: pass def collect_damage(self): if not self.d or not self.use_damage: return try: while self.d.pending_events() > 0: ev = self.d.next_event() if ev.type == self.d.extension_event.DamageNotify: with self.lock: self.dirty_rects.append((ev.area.x, ev.area.y, ev.area.width, ev.area.height)) self.d.damage_subtract(self.damage_ext, 0, 0) except: pass def get_patches(self): if not self.alive: return [] with self.lock: now = time.time() # Force full refresh if it's been >3s OR if it's the very first frame (0) force = (self.last_full_refresh == 0) or (now - self.last_full_refresh > 3.0) if force: rects = [(0, 0, WIDTH, HEIGHT)] self.last_full_refresh = now self.dirty_rects = [] elif self.dirty_rects: rects = list(self.dirty_rects[-10:]) self.dirty_rects = [] else: return [] patches = [] for (x, y, w, h) in rects: try: x = max(0, min(x, WIDTH - 1)) y = max(0, min(y, HEIGHT - 1)) w = max(1, min(w, WIDTH - x)) h = max(1, min(h, HEIGHT - y)) sct_img = self.sct.grab({"top": y, "left": x, "width": w, "height": h}) img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX") buf = BytesIO() # Lower quality to 30 ensures the first big frame doesn't exceed UDP/DataChannel limits q = 30 if (w > 1000 and h > 500) else 60 img.save(buf, format="JPEG", quality=q, optimize=True) patches.append({ "x": x, "y": y, "w": w, "h": h, "d": base64.b64encode(buf.getvalue()).decode('ascii') }) except Exception as e: logger.error(f"Capture error: {e}") continue return patches def inject_input(self, msg): if not self.d: return try: t = msg.get("type") if t == "mousemove": tx = int(max(0, min(msg["x"], 1.0)) * WIDTH) ty = int(max(0, min(msg["y"], 1.0)) * HEIGHT) xtest.fake_motion(self.d, tx, ty) elif t == "mousedown": xtest.fake_button_event(self.d, msg.get("button", 0) + 1, True) elif t == "mouseup": xtest.fake_button_event(self.d, msg.get("button", 0) + 1, False) self.d.sync() except: pass async def patch_stream_loop(channel, engine): logger.info("Stream started") try: # Reset refresh timer to force an immediate full frame on connect engine.last_full_refresh = 0 while channel.readyState == "open": start_time = time.perf_counter() await asyncio.to_thread(engine.collect_damage) patches = await asyncio.to_thread(engine.get_patches) if patches: try: payload = json.dumps({"type": "patch", "p": patches}) channel.send(payload) # logger.info(f"Sent {len(patches)} patches ({len(payload)} bytes)") except Exception as e: logger.error(f"Send failed: {e}") break await asyncio.sleep(max(0.01, 0.033 - (time.perf_counter() - start_time))) except Exception as e: logger.error(f"Stream Loop Error: {e}") finally: engine.close() async def offer(request): try: params = await request.json() pc = RTCPeerConnection(RTCConfiguration(iceServers=[ RTCIceServer(urls=["turns:turn.cloudflare.com:443?transport=tcp", "turn:turn.cloudflare.com:3478?transport=udp"], username=TURN_USER, credential=TURN_PASS), RTCIceServer(urls=["stun:stun.l.google.com:19302"]) ])) engine = X11Engine() @pc.on("datachannel") def on_dc(channel): @channel.on("open") def on_open(): asyncio.create_task(patch_stream_loop(channel, engine)) @channel.on("message") def on_message(msg): engine.inject_input(json.loads(msg)) await pc.setRemoteDescription(RTCSessionDescription(sdp=params["sdp"], type=params["type"])) answer = await pc.createAnswer() await pc.setLocalDescription(answer) return web.json_response({"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}, headers={"Access-Control-Allow-Origin": "*"}) except Exception as e: return web.json_response({"error": str(e)}, status=500) async def index(request): content = open("client2.html", "r").read() return web.Response(content_type="text/html", text=content) if __name__ == "__main__": os.environ["DISPLAY"] = DISPLAY_NUM # 1. Start Xvfb logger.info("Starting Xvfb...") subprocess.Popen(["Xvfb", DISPLAY_NUM, "-screen", "0", f"{WIDTH}x{HEIGHT}x24", "+extension", "DAMAGE", "-ac", "-noreset"]) time.sleep(2) # 2. Start Fluxbox (Window Manager) - Fixes black screen issues logger.info("Starting Fluxbox...") subprocess.Popen(["fluxbox"], env=os.environ) time.sleep(1) # 3. Start Opera logger.info("Starting Browser...") cmd = "opera --no-sandbox --disable-gpu --disable-dev-shm-usage --start-maximized --window-size=1280,720 --window-position=0,0" subprocess.Popen(cmd, shell=True, env=os.environ) # 4. Start Server app = web.Application() app.router.add_get("/", index) app.router.add_post("/offer", offer) logger.info(f"Server running on port {PORT}") web.run_app(app, host=HOST, port=PORT)