import asyncio
import json
import os
import shutil
import subprocess
import time
import logging
import concurrent.futures
import threading
import numpy as np
import psutil
import ctypes
from ctypes import c_int, c_void_p, c_char_p, POINTER, c_ubyte, c_bool
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCIceServer, RTCConfiguration
from av import VideoFrame

# ==========================================
# C++ X11 CAPTURE + FAULT TOLERANCE + DUAL CHANNEL
# ==========================================
# NOTE(review): the original #include directives lost their header names
# (angle-bracket content stripped); they are reconstructed here from the
# APIs the code actually calls (Xlib, XShm, XTest, SysV shm, libswscale).
CPP_SOURCE = r"""
#include <X11/Xlib.h>
#include <X11/Xutil.h>
#include <X11/extensions/XShm.h>
#include <X11/extensions/XTest.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
extern "C" {
#include <libswscale/swscale.h>
#include <libavutil/pixfmt.h>
}

// Custom Error Handler: record the X error code instead of aborting the process.
static int last_x_error_code = 0;
int XErrorHandlerImpl(Display *display, XErrorEvent *event) {
    last_x_error_code = event->error_code;
    return 0;
}

struct ScreenCap {
    Display* display;        // For Video Capture
    Display* input_display;  // For Input Injection (Separate Channel)
    Window root;
    XImage* image;
    XShmSegmentInfo shminfo;
    int width;
    int height;
    int is_init;
    struct SwsContext* sws_ctx;
};

static ScreenCap cap = {0};

extern "C" {

// Tear down the sws context, the shared-memory XImage and both display
// connections. Safe to call repeatedly / on partially-initialized state.
void cleanup() {
    if (cap.sws_ctx) { sws_freeContext(cap.sws_ctx); cap.sws_ctx = NULL; }
    if (cap.image) {
        XShmDetach(cap.display, &cap.shminfo);
        XDestroyImage(cap.image);
        shmdt(cap.shminfo.shmaddr);
        shmctl(cap.shminfo.shmid, IPC_RMID, 0);
        cap.image = NULL;
    }
    if (cap.input_display) { XCloseDisplay(cap.input_display); cap.input_display = NULL; }
    if (cap.display) { XCloseDisplay(cap.display); cap.display = NULL; }
    cap.is_init = 0;
}

// Initialise (or re-initialise on resize) the XShm grabber.
// Returns 1 on success, 0 on any failure (state is cleaned up).
int init_grabber(int w, int h, const char* display_name) {
    if (cap.is_init && cap.width == w && cap.height == h && cap.display) return 1;
    if (cap.is_init) cleanup();
    XSetErrorHandler(XErrorHandlerImpl);
    if (!XInitThreads()) return 0;
    // 1. Video Connection
    cap.display = XOpenDisplay(display_name);
    if (!cap.display) return 0;
    // 2. Input Connection (Separate socket for lower latency)
    cap.input_display = XOpenDisplay(display_name);
    if (!cap.input_display) { XCloseDisplay(cap.display); cap.display = NULL; return 0; }
    cap.root = DefaultRootWindow(cap.display);
    cap.width = w;
    cap.height = h;
    XWindowAttributes window_attributes;
    XGetWindowAttributes(cap.display, cap.root, &window_attributes);
    Screen* screen = window_attributes.screen;
    cap.shminfo.shmid = shmget(IPC_PRIVATE, w * h * 4, IPC_CREAT | 0777);
    if (cap.shminfo.shmid < 0) { cleanup(); return 0; }          // shmget can fail (SHMMAX)
    cap.shminfo.shmaddr = (char*)shmat(cap.shminfo.shmid, 0, 0);
    if (cap.shminfo.shmaddr == (char*)-1) {                       // attach failed: drop segment
        shmctl(cap.shminfo.shmid, IPC_RMID, 0);
        cleanup();
        return 0;
    }
    cap.shminfo.readOnly = False;
    cap.image = XShmCreateImage(cap.display, DefaultVisualOfScreen(screen),
                                window_attributes.depth, ZPixmap, NULL, &cap.shminfo, w, h);
    if (!cap.image) {                                             // release segment; cleanup() skips it when image is NULL
        shmdt(cap.shminfo.shmaddr);
        shmctl(cap.shminfo.shmid, IPC_RMID, 0);
        cleanup();
        return 0;
    }
    cap.image->data = cap.shminfo.shmaddr;
    XShmAttach(cap.display, &cap.shminfo);
    XSync(cap.display, False);
    // SWS_FAST_BILINEAR is good, SWS_POINT is faster but blocky.
    // Using BILINEAR for balance.
    cap.sws_ctx = sws_getContext(w, h, AV_PIX_FMT_BGRA, w, h, AV_PIX_FMT_YUV420P,
                                 SWS_FAST_BILINEAR, NULL, NULL, NULL);
    if (!cap.sws_ctx) { cleanup(); return 0; }
    cap.is_init = 1;
    return 1;
}

// Grab one frame into shared memory. Returns 1 on success, 0 otherwise.
int capture_frame() {
    if (cap.is_init && cap.display && cap.image) {
        last_x_error_code = 0;
        // This blocks only the video thread
        XShmGetImage(cap.display, cap.root, cap.image, 0, 0, AllPlanes);
        return (last_x_error_code == 0);
    }
    return 0;
}

// Optimized pointer math happens inside sws_scale
void convert_to_yuv(void* y, int y_stride, void* u, int u_stride, void* v, int v_stride) {
    if (!cap.is_init || !cap.sws_ctx || !cap.image) return;
    const uint8_t* srcSlice[] = { (uint8_t*)cap.image->data };
    const int srcStride[] = { cap.width * 4 };
    uint8_t* dst[] = { (uint8_t*)y, (uint8_t*)u, (uint8_t*)v };
    const int dstStride[] = { y_stride, u_stride, v_stride };
    sws_scale(cap.sws_ctx, srcSlice, srcStride, 0, cap.height, dst, dstStride);
}

// INPUT FUNCTIONS USE SEPARATE DISPLAY CONNECTION
void move_mouse(int x, int y) {
    if (!cap.is_init || !cap.input_display) return;
    XTestFakeMotionEvent(cap.input_display, -1, x, y, CurrentTime);
    XFlush(cap.input_display); // Flush only input stream
}

void mouse_button(int button, int is_down) {
    if (!cap.is_init || !cap.input_display) return;
    XTestFakeButtonEvent(cap.input_display, button, is_down ? True : False, CurrentTime);
    XFlush(cap.input_display);
}

void key_send(const char* key_name, int is_down) {
    if (!cap.is_init || !cap.input_display) return;
    KeySym ks = XStringToKeysym(key_name);
    if (ks != NoSymbol) {
        KeyCode kc = XKeysymToKeycode(cap.input_display, ks);
        if (kc != 0) {
            XTestFakeKeyEvent(cap.input_display, kc, is_down ? True : False, CurrentTime);
            XFlush(cap.input_display);
        }
    }
}

}
"""

LIB_PATH = "./libxcapture_full.so"


def compile_cpp():
    """Write the embedded C++ source to disk and compile it into LIB_PATH.

    Best-effort: a compilation failure is reported but does not raise, so the
    server can still start in fallback (black-frame) mode.
    """
    if os.path.exists(LIB_PATH):
        try:
            os.remove(LIB_PATH)
        except OSError:
            pass  # stale library may be in use; g++ -o will overwrite anyway
    with open("xcapture.cpp", "w") as f:
        f.write(CPP_SOURCE)
    # ADDED: -march=native -ffast-math -flto for CPU optimization
    cmd = [
        "g++", "-O3", "-march=native", "-ffast-math", "-flto",
        "-shared", "-fPIC", "-o", LIB_PATH, "xcapture.cpp",
        "-lX11", "-lXext", "-lswscale", "-lavutil", "-lXtst",
    ]
    try:
        subprocess.check_call(cmd)
        print("C++ Optimized Library Compiled.")
    except Exception as e:
        print(f"Compilation failed: {e}")


compile_cpp()

# Load C++ Library and declare ctypes signatures. USE_CSHM gates every
# native call site; on load failure the server degrades to fallback frames.
try:
    xlib = ctypes.CDLL(LIB_PATH)
    xlib.init_grabber.argtypes = [c_int, c_int, c_char_p]
    xlib.init_grabber.restype = c_int
    xlib.capture_frame.argtypes = []
    xlib.capture_frame.restype = c_int
    xlib.cleanup.argtypes = []
    xlib.cleanup.restype = None
    xlib.convert_to_yuv.argtypes = [c_void_p, c_int, c_void_p, c_int, c_void_p, c_int]
    xlib.convert_to_yuv.restype = None
    xlib.move_mouse.argtypes = [c_int, c_int]
    xlib.move_mouse.restype = None
    xlib.mouse_button.argtypes = [c_int, c_int]
    xlib.mouse_button.restype = None
    xlib.key_send.argtypes = [c_char_p, c_int]
    xlib.key_send.restype = None
    USE_CSHM = True
except Exception as e:
    print(f"Library load failed: {e}")
    USE_CSHM = False

HOST = "0.0.0.0"
PORT = 7860
DISPLAY_NUM = ":99"
MAX_WIDTH = 4096
MAX_HEIGHT = 4096
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720

logging.basicConfig(level=logging.WARNING)

# Dedicated thread for video capture
video_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

# LOCKS
# video_lock: Protects XShm, resizing, and video display connection
video_lock = threading.Lock()
# Input does NOT use a lock anymore because it uses a separate X11 connection in C++

# Current streamed resolution; mutated only under video_lock in set_resolution().
config = {"width": DEFAULT_WIDTH, "height": DEFAULT_HEIGHT}


class InputManager:
    """Forwards browser input events to the native X11 layer.

    No locking: input injection uses its own X display connection in C++,
    so it never contends with the capture path.
    """

    def __init__(self):
        # Accumulated wheel delta; flushed as one click per 40 units.
        self.scroll_accum = 0

    def mouse_move(self, x, y):
        if USE_CSHM:
            xlib.move_mouse(x, y)

    def mouse_down(self, btn):
        if USE_CSHM:
            xlib.mouse_button(btn, 1)

    def mouse_up(self, btn):
        if USE_CSHM:
            xlib.mouse_button(btn, 0)

    def scroll(self, dy):
        self.scroll_accum += dy
        if abs(self.scroll_accum) >= 40:
            # X11 convention: button 4 = scroll up (negative deltaY), 5 = down.
            btn = 4 if self.scroll_accum < 0 else 5
            if USE_CSHM:
                xlib.mouse_button(btn, 1)
                xlib.mouse_button(btn, 0)
            self.scroll_accum = 0

    def key_down(self, key):
        if USE_CSHM and key:
            xlib.key_send(key.encode('utf-8'), 1)

    def key_up(self, key):
        if USE_CSHM and key:
            xlib.key_send(key.encode('utf-8'), 0)


input_manager = InputManager()


def start_system():
    """Boot Xvfb at maximum geometry, apply the default mode, start the
    window manager (if present) and the application watchdog thread.

    Raises FileNotFoundError when Xvfb is not installed.
    """
    os.environ["DISPLAY"] = DISPLAY_NUM
    try:
        # Remove a stale X lock file left over from an unclean shutdown.
        os.remove(f"/tmp/.X{DISPLAY_NUM.replace(':', '')}-lock")
    except OSError:
        pass
    if not shutil.which("Xvfb"):
        raise FileNotFoundError("Xvfb missing")
    subprocess.Popen([
        "Xvfb", DISPLAY_NUM,
        "-screen", "0", f"{MAX_WIDTH}x{MAX_HEIGHT}x24",
        "-ac", "-noreset", "-nolisten", "tcp",
    ], stderr=subprocess.DEVNULL)
    time.sleep(1)  # Reduced sleep
    set_resolution(DEFAULT_WIDTH, DEFAULT_HEIGHT)
    if shutil.which("matchbox-window-manager"):
        subprocess.Popen("matchbox-window-manager -use_titlebar no",
                         shell=True, stderr=subprocess.DEVNULL)
    threading.Thread(target=maintain_antigravity, daemon=True).start()


def maintain_antigravity():
    """Watchdog loop: relaunch the 'antigravity' process whenever it dies."""
    while True:
        try:
            # Check optimization: Avoid list parsing overhead
            running = False
            for p in psutil.process_iter(['name']):
                if p.info['name'] == "antigravity":
                    running = True
                    break
            if not running:
                subprocess.Popen(["antigravity"],
                                 stdout=subprocess.DEVNULL,
                                 stderr=subprocess.DEVNULL)
        except Exception:
            pass  # best-effort watchdog must never die
        time.sleep(5)  # Increased sleep to save CPU


def set_resolution(w, h):
    """Switch the display to w x h via xrandr (clamped to MAX_*, rounded to even).

    Holds video_lock so the capture thread never grabs mid-resize.
    """
    with video_lock:
        if w == config["width"] and h == config["height"]:
            return
        try:
            # yuv420p encoding requires even dimensions.
            if w % 2 != 0:
                w += 1
            if h % 2 != 0:
                h += 1
            if w > MAX_WIDTH:
                w = MAX_WIDTH
            if h > MAX_HEIGHT:
                h = MAX_HEIGHT
            mode_name = f"M_{w}_{h}"
            # FIX: the dot clock must be derived from the *total* timings
            # (visible + blanking = (w+160) x (h+16)); using w*h alone
            # produced a mode that refreshes below 60 Hz.
            dot_clock = 60 * (w + 160) * (h + 16) / 1000000
            subprocess.call(["xrandr", "--newmode", mode_name, f"{dot_clock:.2f}",
                             str(w), str(w + 40), str(w + 80), str(w + 160),
                             str(h), str(h + 3), str(h + 10), str(h + 16),
                             "-hsync", "+vsync"], stderr=subprocess.DEVNULL)
            subprocess.call(["xrandr", "--addmode", "screen", mode_name],
                            stderr=subprocess.DEVNULL)
            subprocess.call(["xrandr", "--output", "screen", "--mode", mode_name],
                            stderr=subprocess.DEVNULL)
            config["width"] = w
            config["height"] = h
            print(f"Resized to {w}x{h}")
        except Exception as e:
            print(f"Resize failed: {e}")


class VirtualScreenTrack(VideoStreamTrack):
    """aiortc video track that serves frames grabbed by the C++ XShm layer."""
    kind = "video"

    def __init__(self):
        super().__init__()
        self.last_w = 0            # geometry of the current native grabber
        self.last_h = 0
        self.fallback_frame = None  # black frame served when capture fails

    def _produce_frame(self, w, h):
        """Capture and convert one frame (runs on video_executor).

        Returns a yuv420p VideoFrame, or None on any capture failure.
        """
        if not USE_CSHM:
            return None
        # Only lock the video part. Input continues in parallel.
        with video_lock:
            try:
                if w != self.last_w or h != self.last_h:
                    # (Re)initialise the native grabber on resolution change.
                    res = xlib.init_grabber(w, h, DISPLAY_NUM.encode('utf-8'))
                    if res == 0:
                        return None
                    self.last_w = w
                    self.last_h = h
                # 1. Capture (Copy X11 -> Shared Memory)
                if xlib.capture_frame() == 0:
                    return None
                # 2. Allocate Frame (Python overhead, but required for aiortc)
                frame = VideoFrame(width=w, height=h, format="yuv420p")
                # 3. Convert (Shared Memory -> Frame Buffer)
                # Using direct address integer for speed
                xlib.convert_to_yuv(
                    c_void_p(int(frame.planes[0].buffer_ptr)), frame.planes[0].line_size,
                    c_void_p(int(frame.planes[1].buffer_ptr)), frame.planes[1].line_size,
                    c_void_p(int(frame.planes[2].buffer_ptr)), frame.planes[2].line_size,
                )
                return frame
            except Exception:
                return None

    async def recv(self):
        pts, time_base = await self.next_timestamp()
        w, h = config["width"], config["height"]
        frame = None
        if USE_CSHM:
            # Offload heavy C++ work to thread.
            # While this runs, the Main Thread can process "process_input" (Mouse/Keys)
            # because we are not holding the Global Interpreter Lock (ctypes releases it)
            # and we are not holding a global "input lock".
            try:
                # FIX: catch Exception, not bare except — a bare except here
                # would also swallow asyncio.CancelledError and break shutdown.
                frame = await asyncio.get_running_loop().run_in_executor(
                    video_executor, self._produce_frame, w, h
                )
            except Exception:
                pass
        if frame is None:
            # Serve a black frame sized to the current resolution.
            if (self.fallback_frame is None
                    or self.fallback_frame.width != w
                    or self.fallback_frame.height != h):
                self.fallback_frame = VideoFrame.from_ndarray(
                    np.zeros((h, w, 3), dtype=np.uint8), format="bgr24")
            frame = self.fallback_frame
        frame.pts = pts
        frame.time_base = time_base
        return frame


async def offer(request):
    """POST /offer: complete the WebRTC handshake and attach screen + input."""
    try:
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
    except Exception:
        return web.Response(status=400)

    ice_servers = [RTCIceServer(urls=["stun:stun.l.google.com:19302"])]
    pc = RTCPeerConnection(RTCConfiguration(iceServers=ice_servers))
    pcs.add(pc)

    @pc.on("connectionstatechange")
    async def on_state():
        if pc.connectionState in ["failed", "closed"]:
            await pc.close()
            pcs.discard(pc)

    @pc.on("datachannel")
    def on_dc(channel):
        @channel.on("message")
        async def on_message(message):
            # Direct call to input processing
            await process_input(message)

    pc.addTrack(VirtualScreenTrack())
    await pc.setRemoteDescription(offer)
    answer = await pc.createAnswer()

    # FORCE HIGHER BITRATE (4Mbps)
    # Injecting bandwidth info into SDP before setting local description
    sdp_lines = answer.sdp.splitlines()
    new_sdp = []
    for line in sdp_lines:
        new_sdp.append(line)
        if line.startswith("m=video"):
            new_sdp.append("b=AS:4000")
            new_sdp.append("b=TIAS:4000000")
    answer.sdp = "\r\n".join(new_sdp)
    await pc.setLocalDescription(answer)

    return web.Response(
        content_type="application/json",
        text=json.dumps({"sdp": pc.localDescription.sdp,
                         "type": pc.localDescription.type}),
        headers={"Access-Control-Allow-Origin": "*"},
    )


def map_key(key):
    """Translate a browser KeyboardEvent.key name to its X11 keysym name."""
    if not key:
        return None
    mapping = {"Enter": "Return", "ArrowUp": "Up", "ArrowDown": "Down",
               "ArrowLeft": "Left", "ArrowRight": "Right"}
    return mapping.get(key, key)


resize_timer = None


async def handle_debounced_resize(w, h):
    """Debounce resize requests: only the last one within 0.1 s is applied."""
    global resize_timer
    if resize_timer:
        resize_timer.cancel()

    async def task():
        # REDUCED DELAY FROM 0.5 to 0.1 FOR FASTER RESIZING
        await asyncio.sleep(0.1)
        # Run on the video executor so set_resolution serializes with capture.
        await asyncio.get_running_loop().run_in_executor(
            video_executor, set_resolution, w, h)

    resize_timer = asyncio.create_task(task())


async def process_input(data):
    """Decode one data-channel JSON message and dispatch it to input_manager.

    Malformed messages are dropped silently (client input is untrusted).
    """
    try:
        msg = json.loads(data)
        t = msg.get("type")
        # Immediate dispatch
        if t == "mousemove":
            # x/y arrive normalized [0,1]; scale to the current resolution.
            w, h = config["width"], config["height"]
            input_manager.mouse_move(int(msg["x"] * w), int(msg["y"] * h))
        elif t == "mousedown":
            input_manager.mouse_down({0: 1, 1: 2, 2: 3}.get(msg.get("button"), 1))
        elif t == "mouseup":
            input_manager.mouse_up({0: 1, 1: 2, 2: 3}.get(msg.get("button"), 1))
        elif t == "keydown":
            k = map_key(msg.get("key"))
            if k:
                input_manager.key_down(k)
        elif t == "keyup":
            k = map_key(msg.get("key"))
            if k:
                input_manager.key_up(k)
        elif t == "wheel":
            input_manager.scroll(msg.get("deltaY", 0))
        elif t == "resize":
            w, h = int(msg.get("width")), int(msg.get("height"))
            if w > 100 and h > 100:
                await handle_debounced_resize(w, h)
    except Exception:
        pass


async def index(r):
    return web.Response(text="Optimized")


async def options(r):
    # CORS preflight for /offer.
    return web.Response(headers={
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
    })


pcs = set()


async def on_shutdown(app):
    """Release the native grabber and close all open peer connections."""
    with video_lock:
        if USE_CSHM:
            try:
                xlib.cleanup()
            except Exception:
                pass
    if pcs:
        await asyncio.gather(*[pc.close() for pc in pcs])


if __name__ == "__main__":
    start_system()
    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    app.router.add_get("/", index)
    app.router.add_post("/offer", offer)
    app.router.add_options("/offer", options)
    web.run_app(app, host=HOST, port=PORT)