webrtc1 / app.py
Alvin3y1's picture
Update app.py
d7ee57b verified
import asyncio
import json
import os
import shutil
import subprocess
import time
import logging
import threading
import struct
import psutil
import mss
import mss.exception
# --- PERFORMANCE IMPORTS ---
from turbojpeg import TurboJPEG, TJPF_BGRA, TJSAMP_420
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceServer, RTCConfiguration
# X11 Imports
from Xlib import X, display
from Xlib.ext import damage
# --- CONFIGURATION ---
HOST = "0.0.0.0"
PORT = 7860
DISPLAY_NUM = ":99"
JPEG_QUALITY = 70

# Init TurboJPEG. Every frame goes through this encoder, so start-up is
# aborted when it cannot be created.
try:
    jpeg_encoder = TurboJPEG()
except Exception as e:
    # SystemExit with a string argument prints it to stderr and exits with
    # status 1. Unlike the interactive `exit()` helper it does not depend on
    # the `site` module, and the underlying error is no longer discarded.
    raise SystemExit(
        f"WARNING: TurboJPEG not found. Please install libturbojpeg0. ({e})"
    ) from e

# ICE Servers
# SECURITY NOTE(review): the defaults below are credentials committed to
# source control and should be treated as leaked. They are kept only so
# existing deployments keep working; set TURN_USER / TURN_PASS in the
# environment to override them.
TURN_USER = os.environ.get(
    "TURN_USER",
    "g08abe68c81a07f098bb5f0914549bb32440e5aad0b216c7fba2b61e76fd62c6",
)
TURN_PASS = os.environ.get(
    "TURN_PASS",
    "aed1a10dd10eba9401ad9d99e5c66036d8a970eab5ba8e6dc9845ab57c771a7d",
)
ICE_SERVERS = [
    RTCIceServer(
        urls=[
            "turns:turn.cloudflare.com:443?transport=tcp",
            "turn:turn.cloudflare.com:3478?transport=udp",
        ],
        username=TURN_USER,
        credential=TURN_PASS,
    ),
    RTCIceServer(urls=["stun:stun.l.google.com:19302"]),
]

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("DeltaPlasma")

# Global State
pcs = set()  # live RTCPeerConnection objects
config = {"width": 1280, "height": 720}  # current virtual-screen size
config_lock = threading.Lock()  # guards `config`
# --- X11 DAMAGE LISTENER ---
class X11DamageListener(threading.Thread):
    """Daemon thread that folds X11 DAMAGE events into one dirty bounding box.

    Consumers call get_and_clear_damage() to fetch the accumulated region
    (as a dict with 'top', 'left', 'width', 'height') and reset it.
    """

    def __init__(self):
        super().__init__()
        self.daemon = True
        self.running = True
        self.lock = threading.Lock()  # guards dirty_bbox
        self.dirty_bbox = None        # [x1, y1, x2, y2] or None when clean
        self.display = None
        self.root = None
        self.damage_obj = None        # DAMAGE resource id from damage_create()
        self._damage_event = None     # event code of DamageNotify on this server

    def _close_display(self):
        """Best-effort close of the current connection (used before reconnecting)."""
        old, self.display = self.display, None
        if old is not None:
            try:
                old.close()
            except Exception:
                logger.debug("Closing stale X connection failed", exc_info=True)

    def wait_for_display(self):
        """Block until a connection with a DAMAGE object on the root window exists.

        Retries once per second while self.running is true. The instance
        attributes are only replaced after every setup step has succeeded.
        """
        while self.running:
            conn = None
            try:
                conn = display.Display(DISPLAY_NUM)
                ext = conn.query_extension('DAMAGE')
                if ext is None:
                    raise RuntimeError("X server does not provide the DAMAGE extension")
                # The DAMAGE protocol requires version negotiation before
                # any other extension request.
                conn.damage_query_version()
                root = conn.screen().root
                damage_id = root.damage_create(damage.DamageReportBoundingBox)
            except Exception:
                logger.debug("X display %s not ready", DISPLAY_NUM, exc_info=True)
                if conn is not None:
                    try:
                        conn.close()
                    except Exception:
                        pass
                time.sleep(1)
                continue
            self._close_display()  # do not leak the previous connection
            self.display = conn
            self.root = root
            self.damage_obj = damage_id
            # Extension event codes are assigned per server at runtime.
            self._damage_event = ext.first_event + damage.DamageNotifyCode
            return

    def run(self):
        """Thread body: merge each DamageNotify area, then acknowledge it."""
        self.wait_for_display()
        while self.running and self.display is not None:
            try:
                event = self.display.next_event()
                if event.type == self._damage_event:
                    area = event.area
                    with self.lock:
                        self._merge_rect(area.x, area.y, area.width, area.height)
                    # Clear the server-side damage region so that further
                    # changes generate new events.
                    self.display.damage_subtract(self.damage_obj)
            except Exception:
                logger.warning("X11 damage listener error; reconnecting", exc_info=True)
                time.sleep(0.5)
                self.wait_for_display()

    def _merge_rect(self, x, y, w, h):
        """Grow dirty_bbox to include the rectangle (x, y, w, h). Caller holds self.lock."""
        x2, y2 = x + w, y + h
        if self.dirty_bbox is None:
            self.dirty_bbox = [x, y, x2, y2]
        else:
            bx1, by1, bx2, by2 = self.dirty_bbox
            self.dirty_bbox = [min(bx1, x), min(by1, y), max(bx2, x2), max(by2, y2)]

    def get_and_clear_damage(self):
        """Return the accumulated dirty region and reset it.

        Returns:
            None when nothing is dirty, otherwise a dict with integer
            'top', 'left', 'width' and 'height'. The origin is clamped to be
            non-negative; width/height are measured from the clamped origin.
        """
        with self.lock:
            bbox, self.dirty_bbox = self.dirty_bbox, None
        if bbox is None:
            return None
        x1, y1, x2, y2 = bbox
        x1 = max(0, x1)
        y1 = max(0, y1)
        return {'top': int(y1), 'left': int(x1), 'width': int(x2 - x1), 'height': int(y2 - y1)}
# Module-level damage listener; its thread is started by start_system() and
# stream_loop() polls it for dirty regions.
x11_monitor = X11DamageListener()
# --- INPUT MANAGER ---
class InputManager:
    """Feeds mouse/keyboard commands to a long-lived `xdotool -` process.

    Commands are written one per line to the child's stdin. All writes are
    serialized with self.lock because callers may run on several threads.
    """

    SCROLL_STEP = 40  # accumulated wheel delta that equals one click

    def __init__(self):
        self.process = None
        self.lock = threading.Lock()
        self.scroll_accum = 0
        self._spawn_warned = False  # avoid logging the same spawn failure per event

    def start_process(self):
        """Spawn the xdotool child; does nothing when DISPLAY is unset."""
        if not os.environ.get("DISPLAY"):
            return
        try:
            self.process = subprocess.Popen(
                ['xdotool', '-'], stdin=subprocess.PIPE, encoding='utf-8', bufsize=0
            )
        except OSError as exc:
            self.process = None
            if not self._spawn_warned:
                logger.warning("Could not start xdotool: %s", exc)
                self._spawn_warned = True

    def _send_raw(self, command):
        """Write one command line, (re)starting the child if needed. Caller holds self.lock."""
        if self.process is None or self.process.poll() is not None:
            self.start_process()
        if self.process:
            try:
                self.process.stdin.write(command + "\n")
                self.process.stdin.flush()
            except (OSError, ValueError):
                # Broken pipe or closed stdin: drop the child so the next
                # command respawns it.
                self.process = None

    def send(self, cmd):
        """Send a single command line under the lock."""
        with self.lock:
            self._send_raw(cmd)

    @staticmethod
    def _is_safe_token(value):
        """True when `value` is a non-empty string without whitespace/control chars.

        Commands are newline-delimited, so a key name containing a newline
        would be read by xdotool as additional commands.
        """
        return (
            isinstance(value, str)
            and bool(value)
            and not any(ch.isspace() or not ch.isprintable() for ch in value)
        )

    def mouse_move(self, x, y):
        self.send(f"mousemove {int(x)} {int(y)}")

    def mouse_down(self, btn):
        self.send(f"mousedown {int(btn)}")

    def mouse_up(self, btn):
        self.send(f"mouseup {int(btn)}")

    def key_down(self, key):
        """Press `key`; unsafe key names are silently dropped."""
        if self._is_safe_token(key):
            self.send(f"keydown {key}")

    def key_up(self, key):
        """Release `key`; unsafe key names are silently dropped."""
        if self._is_safe_token(key):
            self.send(f"keyup {key}")

    def scroll(self, dy):
        """Accumulate wheel delta and emit one click per SCROLL_STEP.

        Positive totals click button 5, negative totals button 4. The
        remainder keeps its sign: Python's `%` would turn a leftover of -10
        into +30 and cause spurious clicks in the opposite direction.
        """
        with self.lock:
            self.scroll_accum += dy
            clicks = int(self.scroll_accum / self.SCROLL_STEP)  # truncates toward zero
            if clicks == 0:
                return
            btn = 5 if clicks > 0 else 4
            for _ in range(abs(clicks)):
                self._send_raw(f"click {btn}")
            self.scroll_accum -= clicks * self.SCROLL_STEP
# Shared xdotool bridge: start_system() launches its process and
# process_input() sends commands through it.
input_manager = InputManager()
# --- SYSTEM MANAGEMENT ---
def start_system():
    """Bring up the virtual display and the helper threads/processes.

    Sets DISPLAY and XDG_RUNTIME_DIR, launches Xvfb (plus an xterm test
    window when available), then starts the xdotool bridge, the damage
    listener thread and the session watchdog thread.

    Raises:
        Exception: when the Xvfb binary is not on PATH.
    """
    # 1. Environment: child processes and Xlib both read DISPLAY.
    os.environ["DISPLAY"] = DISPLAY_NUM

    # Provide a private runtime directory so XDG_RUNTIME_DIR is defined.
    xdg_dir = f"/tmp/runtime-{os.getuid()}"
    if not os.path.exists(xdg_dir):
        os.makedirs(xdg_dir, mode=0o700, exist_ok=True)
    os.environ["XDG_RUNTIME_DIR"] = xdg_dir

    if shutil.which("Xvfb") is None:
        raise Exception("Xvfb missing")

    # 2. Virtual framebuffer; the fixed sleep gives it time to come up.
    print("Starting Xvfb...")
    xvfb_cmd = [
        "Xvfb", DISPLAY_NUM,
        "-screen", "0", "1280x720x24",
        "-ac", "-noreset",
        "+extension", "RANDR",
    ]
    subprocess.Popen(xvfb_cmd)
    time.sleep(2)

    # 3. Blue terminal running `top` as a visual smoke test.
    if shutil.which("xterm"):
        xterm_cmd = [
            "xterm", "-geometry", "80x20+50+50",
            "-bg", "blue", "-fg", "white",
            "-e", "top",
        ]
        subprocess.Popen(xterm_cmd, env=os.environ)

    # 4. Input bridge, damage listener, and session watchdog.
    input_manager.start_process()
    x11_monitor.start()
    watchdog = threading.Thread(target=maintain_plasma, daemon=True)
    watchdog.start()
def maintain_plasma():
    """Watchdog loop: (re)start a desktop session whenever none is running.

    Every 5 seconds the process table is scanned for a known session process.
    When none is found, Plasma is launched if available, otherwise openbox,
    followed by a 10 second grace period. Never returns; meant to run on a
    daemon thread.
    """
    session_names = {'plasmashell', 'startplasma-x11', 'xfce4-session', 'openbox'}
    while True:
        try:
            running = any(
                p.info['name'] in session_names
                for p in psutil.process_iter(['name'])
            )
            if not running:
                print("Starting Session...")
                env = os.environ.copy()
                env["KWIN_COMPOSE"] = "O2"
                env["LIBGL_ALWAYS_SOFTWARE"] = "1"
                # Argument lists instead of shell strings: no shell is
                # needed, and a missing binary now raises here (and is
                # logged) instead of failing silently inside /bin/sh.
                if shutil.which("startplasma-x11"):
                    subprocess.Popen(
                        ["dbus-launch", "--exit-with-session", "startplasma-x11"],
                        env=env,
                    )
                elif shutil.which("openbox"):
                    subprocess.Popen(["openbox"], env=env)
                time.sleep(10)  # let the session start before re-checking
        except Exception:
            # Keep the watchdog alive, but leave a trace of what went wrong.
            logger.exception("Session watchdog iteration failed")
        time.sleep(5)
# Serializes whole resize operations, so config_lock only has to be held for
# the final dictionary update and never across subprocess calls.
_resize_lock = threading.Lock()


def set_resolution(w, h):
    """Switch the virtual screen to w x h (rounded up to even) and record it.

    A modeline is generated with `cvt`, registered with `xrandr` (errors from
    --newmode/--addmode are ignored because the mode may already exist) and
    then selected on the output named "screen". `config` is only updated
    when the mode switch succeeds.

    Args:
        w: requested width in pixels.
        h: requested height in pixels.

    Failures are reported on stdout as "Resize Error: ..."; nothing is raised.
    """
    try:
        w, h = int(w), int(h)
        if w % 2 != 0:
            w += 1
        if h % 2 != 0:
            h += 1
        # Frame headers carry coordinates as unsigned 16-bit values.
        if not (0 < w <= 0xFFFF and 0 < h <= 0xFFFF):
            raise ValueError(f"unsupported size {w}x{h}")

        with _resize_lock:
            # Add mode if missing (fixes 'Size not found').
            cvt_out = subprocess.check_output(["cvt", str(w), str(h), "60"]).decode()
            fields = cvt_out.split("Modeline ", 1)[1].split()
            mode_name = fields[0].replace('"', '')
            timings = fields[1:]

            subprocess.run(["xrandr", "--newmode", mode_name, *timings], check=False)
            subprocess.run(["xrandr", "--addmode", "screen", mode_name], check=False)
            # check=True: do not record a size the server did not accept.
            subprocess.run(
                ["xrandr", "--output", "screen", "--mode", mode_name], check=True
            )

            with config_lock:
                config["width"] = w
                config["height"] = h
        print(f"Resolution set to {w}x{h}")
    except Exception as e:
        print(f"Resize Error: {e}")
def map_key(key):
    """Translate a browser key name into the name sent to xdotool.

    Keys without an entry in the table are returned unchanged.
    """
    aliases = {
        # modifiers
        "Control": "ctrl",
        "Shift": "shift",
        "Alt": "alt",
        "Meta": "super",
        # editing / control keys
        "Enter": "Return",
        "Backspace": "BackSpace",
        "Tab": "Tab",
        "Escape": "Escape",
        "Delete": "Delete",
        "Insert": "Insert",
        # arrows
        "ArrowUp": "Up",
        "ArrowDown": "Down",
        "ArrowLeft": "Left",
        "ArrowRight": "Right",
        # space, by name or as the literal character
        "Space": "space",
        " ": "space",
    }
    try:
        return aliases[key]
    except KeyError:
        return key
# --- WEBRTC STREAM LOOP ---
def _clip_rect(rect, max_w, max_h):
    """Clamp a capture rectangle to the screen.

    Returns (left, top, width, height), or None when `rect` is empty or
    nothing remains after clamping.
    """
    if not rect or rect['width'] <= 0 or rect['height'] <= 0:
        return None
    left = max(0, min(rect['left'], max_w - 1))
    top = max(0, min(rect['top'], max_h - 1))
    width = min(rect['width'], max_w - left)
    height = min(rect['height'], max_h - top)
    if width <= 0 or height <= 0:
        return None
    return left, top, width, height


async def stream_loop(channel):
    """Push JPEG-encoded screen regions over `channel` until it closes.

    Protocol, as produced here: first one JSON text message
    {"type": "init", "width", "height"}; afterwards binary messages made of
    an 8-byte big-endian header (left, top, width, height as uint16)
    followed by the JPEG data for that region.

    A full frame is sent at start, after any capture error, and whenever no
    frame has been sent for more than 2 seconds; otherwise only the damaged
    bounding box reported by x11_monitor is sent. The loop is paced to
    roughly 30 iterations per second and backs off while more than 1 MB is
    buffered on the channel.

    NOTE(review): get_and_clear_damage() consumes the damage, so with
    several concurrent viewers each region reaches only one of them until
    the periodic full refresh.
    """
    # Imported here because the file's import section does not pull it in.
    # TurboJPEG.encode works on ndarrays, not on raw byte strings.
    import numpy as np

    with config_lock:
        channel.send(json.dumps({"type": "init", "width": config["width"], "height": config["height"]}))

    sct = None
    monitor_idx = 0
    force_update = True
    # Monotonic clock: wall-clock adjustments must not distort the pacing.
    last_frame_time = time.monotonic()
    try:
        while channel.readyState == "open":
            loop_start = time.monotonic()

            # Back-pressure: do not queue more frames than the link drains.
            if channel.bufferedAmount > 1000000:
                await asyncio.sleep(0.01)
                continue

            # MSS Init (lazy, and redone after every capture failure)
            if sct is None:
                try:
                    sct = mss.mss()
                    monitor_idx = 1 if len(sct.monitors) > 1 else 0
                except Exception:
                    logger.debug("mss initialisation failed; retrying", exc_info=True)
                    if sct:
                        sct.close()
                    sct = None
                    await asyncio.sleep(1)
                    continue

            # Capture
            try:
                with config_lock:
                    max_w, max_h = config["width"], config["height"]

                rect = x11_monitor.get_and_clear_damage()
                if not rect and (time.monotonic() - last_frame_time > 2.0):
                    force_update = True
                if force_update:
                    rect = {'top': 0, 'left': 0, 'width': max_w, 'height': max_h}
                    force_update = False

                clipped = _clip_rect(rect, max_w, max_h)
                if clipped is not None:
                    r_left, r_top, r_width, r_height = clipped
                    cap_rect = {"top": r_top, "left": r_left, "width": r_width, "height": r_height, "mon": monitor_idx}
                    sct_img = sct.grab(cap_rect)
                    # View the BGRA bytes as a (height, width, 4) uint8 array.
                    frame = np.frombuffer(sct_img.bgra, dtype=np.uint8).reshape(
                        sct_img.height, sct_img.width, 4
                    )
                    jpeg_bytes = jpeg_encoder.encode(
                        frame,
                        quality=JPEG_QUALITY,
                        pixel_format=TJPF_BGRA,
                        jpeg_subsample=TJSAMP_420,
                    )
                    header = struct.pack('>HHHH', r_left, r_top, r_width, r_height)
                    channel.send(header + jpeg_bytes)
                    last_frame_time = time.monotonic()
            except Exception:
                logger.warning("Frame capture/send failed; resetting grabber", exc_info=True)
                if sct:
                    sct.close()
                sct = None
                force_update = True
                await asyncio.sleep(0.1)

            elapsed = time.monotonic() - loop_start
            await asyncio.sleep(max(0, 0.033 - elapsed))
    except Exception:
        logger.exception("stream_loop terminated unexpectedly")
    finally:
        if sct:
            sct.close()
def process_input(msg_str):
    """Decode one JSON input message from the client and apply it.

    Handled "type" values: resize, mousemove, mousedown, mouseup, wheel,
    keydown, keyup. Mouse coordinates arrive as fractions of the screen and
    are scaled to pixels. Anything malformed is ignored (logged at debug
    level), because the payload comes from the remote peer.

    Args:
        msg_str: JSON text of a single message.
    """
    # JS MouseEvent.button (0, 1, 2) -> X11 button (1, 2, 3); default left.
    button_map = {0: 1, 1: 2, 2: 3}
    try:
        msg = json.loads(msg_str)
        if not isinstance(msg, dict):
            return
        t = msg.get("type")
        with config_lock:
            w, h = config["width"], config["height"]

        if t == "resize":
            set_resolution(int(msg["width"]), int(msg["height"]))
        elif t == "mousemove":
            # Clamp coordinates to avoid XTest error
            mx = max(0, min(int(msg["x"] * w), w - 1))
            my = max(0, min(int(msg["y"] * h), h - 1))
            input_manager.mouse_move(mx, my)
        elif t == "mousedown":
            input_manager.mouse_down(button_map.get(msg.get("button"), 1))
        elif t == "mouseup":
            input_manager.mouse_up(button_map.get(msg.get("button"), 1))
        elif t == "wheel":
            input_manager.scroll(msg.get("deltaY", 0))
        elif t in ("keydown", "keyup"):
            k = map_key(msg.get("key"))
            # The key name is spliced into a newline-delimited xdotool
            # command stream; refuse anything that could start a new command.
            if isinstance(k, str) and k and not any(
                ch.isspace() or not ch.isprintable() for ch in k
            ):
                if t == "keydown":
                    input_manager.key_down(k)
                else:
                    input_manager.key_up(k)
    except Exception:
        logger.debug("Ignoring malformed input message", exc_info=True)
async def offer(request):
    """POST /offer: accept an SDP offer and reply with the SDP answer.

    The request body is JSON with "sdp" and "type". A new peer connection is
    created and tracked in `pcs`; its "video" data channel is served by
    stream_loop() and messages on its "input" data channel are passed to
    process_input() on the default executor.

    Returns:
        400 for an unparsable request, otherwise a JSON body with the local
        description and a permissive CORS header. Errors during negotiation
        propagate to aiohttp after the peer connection has been cleaned up.
    """
    try:
        params = await request.json()
        remote_desc = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
    except Exception:
        return web.Response(status=400)

    pc = RTCPeerConnection(RTCConfiguration(iceServers=ICE_SERVERS))
    pcs.add(pc)
    # The event loop keeps only weak references to tasks; hold strong ones
    # here so a stream cannot be garbage-collected while running.
    stream_tasks = set()

    @pc.on("connectionstatechange")
    async def on_state():
        if pc.connectionState in ["failed", "closed"]:
            await pc.close()
            pcs.discard(pc)

    @pc.on("datachannel")
    def on_datachannel(channel):
        if channel.label == "video":
            task = asyncio.create_task(stream_loop(channel))
            stream_tasks.add(task)
            task.add_done_callback(stream_tasks.discard)
        elif channel.label == "input":
            loop = asyncio.get_running_loop()
            # process_input blocks on locks and subprocesses; keep it off
            # the event loop.
            channel.on("message", lambda m: loop.run_in_executor(None, process_input, m))

    try:
        await pc.setRemoteDescription(remote_desc)
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
    except Exception:
        # Do not leave a half-negotiated connection behind in `pcs`.
        pcs.discard(pc)
        await pc.close()
        raise

    return web.Response(
        content_type="application/json",
        text=json.dumps({"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}),
        headers={"Access-Control-Allow-Origin": "*"},
    )
async def index(r):
    """GET /: reply with a fixed plain-text banner."""
    banner = "Turbo WebRTC Server Running"
    return web.Response(text=banner)
async def on_shutdown(app):
    """aiohttp shutdown hook: stop the damage listener and close all peers.

    Every peer connection is closed even if some closes fail; failures are
    logged instead of aborting the hook, and `pcs` is emptied.
    """
    x11_monitor.running = False
    # Snapshot first: closing a connection can fire handlers that modify `pcs`.
    open_pcs = list(pcs)
    pcs.clear()
    results = await asyncio.gather(
        *(pc.close() for pc in open_pcs), return_exceptions=True
    )
    for outcome in results:
        if isinstance(outcome, Exception):
            logger.warning("Error while closing peer connection: %r", outcome)
if __name__ == "__main__":
    # Bring up Xvfb and the helper threads before accepting connections.
    start_system()

    app = web.Application()

    # HTTP surface: a status page and the WebRTC signalling endpoint.
    app.router.add_get("/", index)
    app.router.add_post("/offer", offer)

    # Close peer connections when the server stops.
    app.on_shutdown.append(on_shutdown)

    web.run_app(app, host=HOST, port=PORT)