| import asyncio |
| import json |
| import os |
| import shutil |
| import subprocess |
| import time |
| import logging |
| import threading |
| import struct |
| import psutil |
| import mss |
| import mss.exception |
|
|
| |
| from turbojpeg import TurboJPEG, TJPF_BGRA, TJSAMP_420 |
| from aiohttp import web |
| from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceServer, RTCConfiguration |
|
|
| |
| from Xlib import X, display |
| from Xlib.ext import damage |
|
|
| |
| HOST = "0.0.0.0" |
| PORT = 7860 |
| DISPLAY_NUM = ":99" |
| JPEG_QUALITY = 70 |
|
|
| |
| try: |
| jpeg_encoder = TurboJPEG() |
| except Exception as e: |
| print("WARNING: TurboJPEG not found. Please install libturbojpeg0.") |
| exit(1) |
|
|
| |
| TURN_USER = "g08abe68c81a07f098bb5f0914549bb32440e5aad0b216c7fba2b61e76fd62c6" |
| TURN_PASS = "aed1a10dd10eba9401ad9d99e5c66036d8a970eab5ba8e6dc9845ab57c771a7d" |
| ICE_SERVERS = [ |
| RTCIceServer(urls=["turns:turn.cloudflare.com:443?transport=tcp", "turn:turn.cloudflare.com:3478?transport=udp"], username=TURN_USER, credential=TURN_PASS), |
| RTCIceServer(urls=["stun:stun.l.google.com:19302"]) |
| ] |
|
|
| logging.basicConfig(level=logging.WARNING) |
| logger = logging.getLogger("DeltaPlasma") |
|
|
| |
| pcs = set() |
| config = {"width": 1280, "height": 720} |
| config_lock = threading.Lock() |
|
|
| |
| class X11DamageListener(threading.Thread): |
| def __init__(self): |
| super().__init__() |
| self.daemon = True |
| self.running = True |
| self.lock = threading.Lock() |
| self.dirty_bbox = None |
| self.display = None |
|
|
| def wait_for_display(self): |
| while self.running: |
| try: |
| self.display = display.Display(DISPLAY_NUM) |
| self.root = self.display.screen().root |
| self.damage_obj = self.root.damage_create(damage.DamageReportBoundingBox) |
| return |
| except Exception: |
| time.sleep(1) |
|
|
| def run(self): |
| self.wait_for_display() |
| if not self.display: return |
| |
| try: |
| damage_event_base = self.display.info.ext_codes['DAMAGE'].first_event |
| except: return |
|
|
| while self.running: |
| try: |
| event = self.display.next_event() |
| if event.type == damage_event_base + X.DamageNotify: |
| rect = event.area |
| with self.lock: |
| self._merge_rect(rect.x, rect.y, rect.width, rect.height) |
| self.damage_obj.subtract(0, 0) |
| except Exception: |
| time.sleep(0.5) |
| try: self.wait_for_display() |
| except: pass |
|
|
| def _merge_rect(self, x, y, w, h): |
| x2, y2 = x + w, y + h |
| if self.dirty_bbox is None: |
| self.dirty_bbox = [x, y, x2, y2] |
| else: |
| bx1, by1, bx2, by2 = self.dirty_bbox |
| self.dirty_bbox = [min(bx1, x), min(by1, y), max(bx2, x2), max(by2, y2)] |
|
|
| def get_and_clear_damage(self): |
| with self.lock: |
| if self.dirty_bbox is None: return None |
| try: |
| x1, y1, x2, y2 = self.dirty_bbox |
| self.dirty_bbox = None |
| x1 = max(0, x1); y1 = max(0, y1) |
| return {'top': int(y1), 'left': int(x1), 'width': int(x2 - x1), 'height': int(y2 - y1)} |
| except: |
| self.dirty_bbox = None |
| return None |
|
|
| x11_monitor = X11DamageListener() |
|
|
| |
| class InputManager: |
| def __init__(self): |
| self.process = None |
| self.lock = threading.Lock() |
| self.scroll_accum = 0 |
|
|
| def start_process(self): |
| if not os.environ.get("DISPLAY"): return |
| try: |
| self.process = subprocess.Popen(['xdotool', '-'], stdin=subprocess.PIPE, encoding='utf-8', bufsize=0) |
| except: pass |
|
|
| def _send_raw(self, command): |
| if self.process is None or self.process.poll() is not None: self.start_process() |
| if self.process: |
| try: |
| self.process.stdin.write(command + "\n") |
| self.process.stdin.flush() |
| except: self.process = None |
|
|
| def send(self, cmd): |
| with self.lock: self._send_raw(cmd) |
|
|
| def mouse_move(self, x, y): self.send(f"mousemove {x} {y}") |
| def mouse_down(self, btn): self.send(f"mousedown {btn}") |
| def mouse_up(self, btn): self.send(f"mouseup {btn}") |
| def key_down(self, key): self.send(f"keydown {key}") |
| def key_up(self, key): self.send(f"keyup {key}") |
| def scroll(self, dy): |
| self.scroll_accum += dy |
| if abs(self.scroll_accum) >= 40: |
| clicks = int(self.scroll_accum / 40) |
| btn = 5 if clicks > 0 else 4 |
| for _ in range(abs(clicks)): self._send_raw(f"click {btn}") |
| self.scroll_accum %= 40 |
|
|
| input_manager = InputManager() |
|
|
| |
| def start_system(): |
| |
| os.environ["DISPLAY"] = DISPLAY_NUM |
| |
| |
| runtime_dir = f"/tmp/runtime-{os.getuid()}" |
| if not os.path.exists(runtime_dir): |
| os.makedirs(runtime_dir, mode=0o700, exist_ok=True) |
| os.environ["XDG_RUNTIME_DIR"] = runtime_dir |
|
|
| if not shutil.which("Xvfb"): raise Exception("Xvfb missing") |
| |
| |
| print("Starting Xvfb...") |
| subprocess.Popen(["Xvfb", DISPLAY_NUM, "-screen", "0", "1280x720x24", "-ac", "-noreset", "+extension", "RANDR"]) |
| time.sleep(2) |
| |
| |
| if shutil.which("xterm"): |
| subprocess.Popen(["xterm", "-geometry", "80x20+50+50", "-bg", "blue", "-fg", "white", "-e", "top"], env=os.environ) |
|
|
| input_manager.start_process() |
| x11_monitor.start() |
| |
| threading.Thread(target=maintain_plasma, daemon=True).start() |
|
|
| def maintain_plasma(): |
| while True: |
| try: |
| running = False |
| for p in psutil.process_iter(['name']): |
| if p.info['name'] in ['plasmashell', 'startplasma-x11', 'xfce4-session', 'openbox']: |
| running = True; break |
| |
| if not running: |
| print("Starting Session...") |
| env = os.environ.copy() |
| env["KWIN_COMPOSE"] = "O2" |
| env["LIBGL_ALWAYS_SOFTWARE"] = "1" |
| |
| if shutil.which("startplasma-x11"): |
| subprocess.Popen("dbus-launch --exit-with-session startplasma-x11", shell=True, env=env) |
| elif shutil.which("openbox"): |
| subprocess.Popen("openbox", shell=True, env=env) |
| |
| time.sleep(10) |
| except: pass |
| time.sleep(5) |
|
|
| def set_resolution(w, h): |
| with config_lock: |
| try: |
| if w % 2 != 0: w += 1 |
| if h % 2 != 0: h += 1 |
| |
| |
| modeline = subprocess.check_output(f"cvt {w} {h} 60", shell=True).decode().split("Modeline ")[1].strip() |
| mode_name = modeline.split()[0].replace('"', '') |
| |
| try: subprocess.run(f"xrandr --newmode {modeline}", shell=True, check=True) |
| except: pass |
| |
| try: subprocess.run(f"xrandr --addmode screen {mode_name}", shell=True, check=True) |
| except: pass |
| |
| subprocess.run(f"xrandr --output screen --mode {mode_name}", shell=True) |
| |
| config["width"] = w |
| config["height"] = h |
| print(f"Resolution set to {w}x{h}") |
| except Exception as e: |
| print(f"Resize Error: {e}") |
|
|
| def map_key(key): |
| charmap = { |
| "Control": "ctrl", "Shift": "shift", "Alt": "alt", "Meta": "super", |
| "Enter": "Return", "Backspace": "BackSpace", "Tab": "Tab", "Escape": "Escape", |
| "ArrowUp": "Up", "ArrowDown": "Down", "ArrowLeft": "Left", "ArrowRight": "Right", |
| "Delete": "Delete", "Insert": "Insert", "Space": "space", " ": "space" |
| } |
| return charmap.get(key, key) |
|
|
| |
| async def stream_loop(channel): |
| with config_lock: |
| channel.send(json.dumps({"type": "init", "width": config["width"], "height": config["height"]})) |
| |
| sct = None |
| force_update = True |
| last_frame_time = time.time() |
|
|
| try: |
| while channel.readyState == "open": |
| loop_start = time.time() |
|
|
| if channel.bufferedAmount > 1000000: |
| await asyncio.sleep(0.01) |
| continue |
|
|
| |
| if sct is None: |
| try: |
| sct = mss.mss() |
| monitor_idx = 1 if len(sct.monitors) > 1 else 0 |
| except Exception: |
| await asyncio.sleep(1) |
| continue |
|
|
| |
| try: |
| with config_lock: |
| max_w, max_h = config["width"], config["height"] |
|
|
| rect = x11_monitor.get_and_clear_damage() |
| |
| if not rect and (time.time() - last_frame_time > 2.0): |
| force_update = True |
|
|
| if force_update: |
| rect = {'top': 0, 'left': 0, 'width': max_w, 'height': max_h} |
| force_update = False |
|
|
| if rect and rect['width'] > 0 and rect['height'] > 0: |
| r_left = max(0, min(rect['left'], max_w - 1)) |
| r_top = max(0, min(rect['top'], max_h - 1)) |
| r_width = min(rect['width'], max_w - r_left) |
| r_height = min(rect['height'], max_h - r_top) |
|
|
| if r_width > 0 and r_height > 0: |
| cap_rect = {"top": r_top, "left": r_left, "width": r_width, "height": r_height, "mon": monitor_idx} |
| sct_img = sct.grab(cap_rect) |
| |
| jpeg_bytes = jpeg_encoder.encode( |
| sct_img.bgra, |
| quality=JPEG_QUALITY, |
| pixel_format=TJPF_BGRA, |
| jpeg_subsample=TJSAMP_420 |
| ) |
|
|
| header = struct.pack('>HHHH', r_left, r_top, r_width, r_height) |
| channel.send(header + jpeg_bytes) |
| last_frame_time = time.time() |
|
|
| except Exception: |
| if sct: sct.close() |
| sct = None |
| force_update = True |
| await asyncio.sleep(0.1) |
|
|
| elapsed = time.time() - loop_start |
| await asyncio.sleep(max(0, 0.033 - elapsed)) |
|
|
| except Exception: pass |
| finally: |
| if sct: sct.close() |
|
|
| def process_input(msg_str): |
| try: |
| msg = json.loads(msg_str) |
| t = msg.get("type") |
| with config_lock: w, h = config["width"], config["height"] |
| |
| if t == "resize": |
| set_resolution(int(msg["width"]), int(msg["height"])) |
| elif t == "mousemove": |
| |
| mx = max(0, min(int(msg["x"] * w), w - 1)) |
| my = max(0, min(int(msg["y"] * h), h - 1)) |
| input_manager.mouse_move(mx, my) |
| elif t == "mousedown": |
| |
| |
| btn = {0:1, 1:2, 2:3}.get(msg.get("button"), 1) |
| input_manager.mouse_down(btn) |
| elif t == "mouseup": |
| btn = {0:1, 1:2, 2:3}.get(msg.get("button"), 1) |
| input_manager.mouse_up(btn) |
| elif t == "wheel": |
| input_manager.scroll(msg.get("deltaY", 0)) |
| elif t == "keydown": |
| k = map_key(msg.get("key")) |
| if k: input_manager.key_down(k) |
| elif t == "keyup": |
| k = map_key(msg.get("key")) |
| if k: input_manager.key_up(k) |
| except: pass |
|
|
| async def offer(request): |
| try: |
| params = await request.json() |
| offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) |
| except: return web.Response(status=400) |
|
|
| pc = RTCPeerConnection(RTCConfiguration(iceServers=ICE_SERVERS)) |
| pcs.add(pc) |
|
|
| @pc.on("connectionstatechange") |
| async def on_state(): |
| if pc.connectionState in ["failed", "closed"]: |
| await pc.close() |
| pcs.discard(pc) |
|
|
| @pc.on("datachannel") |
| def on_datachannel(channel): |
| if channel.label == "video": |
| asyncio.create_task(stream_loop(channel)) |
| elif channel.label == "input": |
| channel.on("message", lambda m: asyncio.get_event_loop().run_in_executor(None, process_input, m)) |
|
|
| await pc.setRemoteDescription(offer) |
| answer = await pc.createAnswer() |
| await pc.setLocalDescription(answer) |
|
|
| return web.Response( |
| content_type="application/json", |
| text=json.dumps({"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}), |
| headers={"Access-Control-Allow-Origin": "*"} |
| ) |
|
|
| async def index(r): return web.Response(text="Turbo WebRTC Server Running") |
| async def on_shutdown(app): |
| x11_monitor.running = False |
| await asyncio.gather(*[pc.close() for pc in pcs]) |
|
|
| if __name__ == "__main__": |
| start_system() |
| app = web.Application() |
| app.on_shutdown.append(on_shutdown) |
| app.router.add_get("/", index) |
| app.router.add_post("/offer", offer) |
| web.run_app(app, host=HOST, port=PORT) |