# plasma1 / app.py
# Source: Hugging Face Space snapshot (author Alvin3y1, commit dc0f259, "Update app.py")
import asyncio
import json
import os
import shutil
import subprocess
import time
import logging
import concurrent.futures
import threading
import numpy as np
import psutil
import ctypes
from ctypes import c_int, c_void_p, c_char_p, POINTER, c_ubyte, c_bool
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCIceServer, RTCConfiguration
from av import VideoFrame
# ==========================================
# C++ X11 CAPTURE + FAULT TOLERANCE + DUAL CHANNEL
# ==========================================
# Compiled at startup by compile_cpp() and loaded via ctypes.  Exports:
#   init_grabber / capture_frame / convert_to_yuv -- video path (XShm + swscale)
#   move_mouse / mouse_button / key_send          -- input path (XTest)
#   cleanup                                        -- frees SHM/X resources
# Video and input each open their OWN X11 connection so input injection is
# never queued behind a blocking XShmGetImage call on the capture socket.
CPP_SOURCE = r"""
#include <X11/Xlib.h>
#include <X11/Xutil.h>
#include <X11/extensions/XShm.h>
#include <X11/extensions/XTest.h>
#include <X11/keysym.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
extern "C" {
#include <libswscale/swscale.h>
#include <libavutil/imgutils.h>
}
// Custom Error Handler
static int last_x_error_code = 0;
int XErrorHandlerImpl(Display *display, XErrorEvent *event) {
last_x_error_code = event->error_code;
return 0;
}
struct ScreenCap {
Display* display; // For Video Capture
Display* input_display; // For Input Injection (Separate Channel)
Window root;
XImage* image;
XShmSegmentInfo shminfo;
int width;
int height;
int is_init;
struct SwsContext* sws_ctx;
};
static ScreenCap cap = {0};
extern "C" {
void cleanup() {
if (cap.sws_ctx) {
sws_freeContext(cap.sws_ctx);
cap.sws_ctx = NULL;
}
if (cap.image) {
XShmDetach(cap.display, &cap.shminfo);
XDestroyImage(cap.image);
shmdt(cap.shminfo.shmaddr);
shmctl(cap.shminfo.shmid, IPC_RMID, 0);
cap.image = NULL;
}
if (cap.input_display) {
XCloseDisplay(cap.input_display);
cap.input_display = NULL;
}
if (cap.display) {
XCloseDisplay(cap.display);
cap.display = NULL;
}
cap.is_init = 0;
}
int init_grabber(int w, int h, const char* display_name) {
if (cap.is_init && cap.width == w && cap.height == h && cap.display) return 1;
if (cap.is_init) cleanup();
XSetErrorHandler(XErrorHandlerImpl);
if (!XInitThreads()) return 0;
// 1. Video Connection
cap.display = XOpenDisplay(display_name);
if (!cap.display) return 0;
// 2. Input Connection (Separate socket for lower latency)
cap.input_display = XOpenDisplay(display_name);
if (!cap.input_display) {
XCloseDisplay(cap.display);
return 0;
}
cap.root = DefaultRootWindow(cap.display);
cap.width = w;
cap.height = h;
XWindowAttributes window_attributes;
XGetWindowAttributes(cap.display, cap.root, &window_attributes);
Screen* screen = window_attributes.screen;
cap.shminfo.shmid = shmget(IPC_PRIVATE, w * h * 4, IPC_CREAT | 0777);
cap.shminfo.shmaddr = (char*)shmat(cap.shminfo.shmid, 0, 0);
cap.shminfo.readOnly = False;
cap.image = XShmCreateImage(cap.display, DefaultVisualOfScreen(screen),
window_attributes.depth, ZPixmap, NULL,
&cap.shminfo, w, h);
if (!cap.image) {
cleanup();
return 0;
}
cap.image->data = cap.shminfo.shmaddr;
XShmAttach(cap.display, &cap.shminfo);
XSync(cap.display, False);
// SWS_FAST_BILINEAR is good, SWS_POINT is faster but blocky.
// Using BILINEAR for balance.
cap.sws_ctx = sws_getContext(w, h, AV_PIX_FMT_BGRA,
w, h, AV_PIX_FMT_YUV420P,
SWS_FAST_BILINEAR, NULL, NULL, NULL);
if (!cap.sws_ctx) {
cleanup();
return 0;
}
cap.is_init = 1;
return 1;
}
int capture_frame() {
if (cap.is_init && cap.display && cap.image) {
last_x_error_code = 0;
// This blocks only the video thread
XShmGetImage(cap.display, cap.root, cap.image, 0, 0, AllPlanes);
return (last_x_error_code == 0);
}
return 0;
}
// Optimized pointer math happens inside sws_scale
void convert_to_yuv(void* y, int y_stride, void* u, int u_stride, void* v, int v_stride) {
if (!cap.is_init || !cap.sws_ctx || !cap.image) return;
const uint8_t* srcSlice[] = { (uint8_t*)cap.image->data };
const int srcStride[] = { cap.width * 4 };
uint8_t* dst[] = { (uint8_t*)y, (uint8_t*)u, (uint8_t*)v };
const int dstStride[] = { y_stride, u_stride, v_stride };
sws_scale(cap.sws_ctx, srcSlice, srcStride, 0, cap.height, dst, dstStride);
}
// INPUT FUNCTIONS USE SEPARATE DISPLAY CONNECTION
void move_mouse(int x, int y) {
if (!cap.is_init || !cap.input_display) return;
XTestFakeMotionEvent(cap.input_display, -1, x, y, CurrentTime);
XFlush(cap.input_display); // Flush only input stream
}
void mouse_button(int button, int is_down) {
if (!cap.is_init || !cap.input_display) return;
XTestFakeButtonEvent(cap.input_display, button, is_down ? True : False, CurrentTime);
XFlush(cap.input_display);
}
void key_send(const char* key_name, int is_down) {
if (!cap.is_init || !cap.input_display) return;
KeySym ks = XStringToKeysym(key_name);
if (ks != NoSymbol) {
KeyCode kc = XKeysymToKeycode(cap.input_display, ks);
if (kc != 0) {
XTestFakeKeyEvent(cap.input_display, kc, is_down ? True : False, CurrentTime);
XFlush(cap.input_display);
}
}
}
}
"""
LIB_PATH = "./libxcapture_full.so"
def compile_cpp():
    """Write CPP_SOURCE to xcapture.cpp and compile it into LIB_PATH with g++.

    Best-effort: compile failures are printed, not raised — the module-level
    loader then sets USE_CSHM = False and the server streams fallback frames.
    """
    # Remove a stale build so a failed compile can't silently reuse old code.
    if os.path.exists(LIB_PATH):
        try:
            os.remove(LIB_PATH)
        except OSError:
            # Narrowed from bare except: only filesystem errors are expected.
            pass
    with open("xcapture.cpp", "w") as f:
        f.write(CPP_SOURCE)
    # -march=native/-ffast-math/-flto: aggressive CPU tuning for the
    # capture/convert hot path; -shared -fPIC builds a ctypes-loadable .so.
    cmd = [
        "g++", "-O3", "-march=native", "-ffast-math", "-flto", "-shared", "-fPIC",
        "-o", LIB_PATH, "xcapture.cpp",
        "-lX11", "-lXext", "-lswscale", "-lavutil", "-lXtst"
    ]
    try:
        subprocess.check_call(cmd)
        print("C++ Optimized Library Compiled.")
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError: g++ missing; CalledProcessError: compile/link failed.
        print(f"Compilation failed: {e}")
compile_cpp()
# Load the compiled C++ library and declare ctypes signatures for every
# export.  On any failure (no compiler, missing X/FFmpeg dev libs) USE_CSHM
# flips to False and recv() serves black fallback frames instead of crashing.
try:
    xlib = ctypes.CDLL(LIB_PATH)
    xlib.init_grabber.argtypes = [c_int, c_int, c_char_p]
    xlib.init_grabber.restype = c_int
    xlib.capture_frame.argtypes = []
    xlib.capture_frame.restype = c_int
    xlib.cleanup.argtypes = []
    xlib.cleanup.restype = None
    xlib.convert_to_yuv.argtypes = [c_void_p, c_int, c_void_p, c_int, c_void_p, c_int]
    xlib.convert_to_yuv.restype = None
    xlib.move_mouse.argtypes = [c_int, c_int]
    xlib.move_mouse.restype = None
    xlib.mouse_button.argtypes = [c_int, c_int]
    xlib.mouse_button.restype = None
    xlib.key_send.argtypes = [c_char_p, c_int]
    xlib.key_send.restype = None
    USE_CSHM = True
except Exception as e:
    print(f"Library load failed: {e}")
    USE_CSHM = False
HOST = "0.0.0.0"
PORT = 7860
DISPLAY_NUM = ":99"      # Xvfb display used for both capture and input.
MAX_WIDTH = 4096         # Xvfb framebuffer size; set_resolution clamps here.
MAX_HEIGHT = 4096
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
logging.basicConfig(level=logging.WARNING)
# Dedicated single worker: serializes all XShm capture work on one thread.
video_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# LOCKS
# video_lock: Protects XShm, resizing, and video display connection
video_lock = threading.Lock()
# Input does NOT use a lock anymore because it uses a separate X11 connection in C++
# Current streaming resolution; mutated only by set_resolution (under video_lock).
config = {
    "width": DEFAULT_WIDTH,
    "height": DEFAULT_HEIGHT
}
class InputManager:
    """Relay browser input events to the C++ XTest layer.

    No Python-side locking is required: the C++ side injects input over a
    dedicated X11 connection, separate from the video-capture connection.
    """

    def __init__(self):
        # Accumulated wheel delta; flushed as one click per 40px of travel.
        self.scroll_accum = 0

    def mouse_move(self, x, y):
        """Warp the pointer to absolute pixel coordinates (x, y)."""
        if USE_CSHM:
            xlib.move_mouse(x, y)

    def mouse_down(self, btn):
        """Press X11 button number `btn`."""
        if USE_CSHM:
            xlib.mouse_button(btn, 1)

    def mouse_up(self, btn):
        """Release X11 button number `btn`."""
        if USE_CSHM:
            xlib.mouse_button(btn, 0)

    def scroll(self, dy):
        """Accumulate a wheel delta; emit a click once |total| reaches 40."""
        self.scroll_accum += dy
        if abs(self.scroll_accum) < 40:
            return
        btn = 4 if self.scroll_accum < 0 else 5  # X11: 4 = wheel up, 5 = down
        if USE_CSHM:
            xlib.mouse_button(btn, 1)
            xlib.mouse_button(btn, 0)
        self.scroll_accum = 0

    def key_down(self, key):
        """Press the key whose X keysym name is `key` (ignored if falsy)."""
        if USE_CSHM and key:
            xlib.key_send(key.encode('utf-8'), 1)

    def key_up(self, key):
        """Release the key whose X keysym name is `key` (ignored if falsy)."""
        if USE_CSHM and key:
            xlib.key_send(key.encode('utf-8'), 0)


input_manager = InputManager()
def start_system():
    """Boot the headless desktop: Xvfb, window manager, and app watchdog.

    Raises:
        FileNotFoundError: if the Xvfb binary is not installed.
    """
    os.environ["DISPLAY"] = DISPLAY_NUM
    # Remove a stale X lock file left by a crashed server; absence is fine.
    try:
        os.remove(f"/tmp/.X{DISPLAY_NUM.replace(':', '')}-lock")
    except OSError:
        # Narrowed from bare except so Ctrl-C/SystemExit are not swallowed.
        pass
    if not shutil.which("Xvfb"):
        raise FileNotFoundError("Xvfb missing")
    # Allocate the framebuffer at MAX size once; xrandr switches modes later.
    subprocess.Popen([
        "Xvfb", DISPLAY_NUM,
        "-screen", "0", f"{MAX_WIDTH}x{MAX_HEIGHT}x24",
        "-ac", "-noreset", "-nolisten", "tcp"
    ], stderr=subprocess.DEVNULL)
    time.sleep(1)  # Give Xvfb a moment to come up before xrandr runs.
    set_resolution(DEFAULT_WIDTH, DEFAULT_HEIGHT)
    if shutil.which("matchbox-window-manager"):
        subprocess.Popen("matchbox-window-manager -use_titlebar no",
                         shell=True, stderr=subprocess.DEVNULL)
    # Daemon thread: dies with the process, keeps "antigravity" running.
    threading.Thread(target=maintain_antigravity, daemon=True).start()
def maintain_antigravity():
    """Watchdog loop: respawn the "antigravity" process whenever it dies.

    Runs forever in a daemon thread, polling the process table every 5 s.
    """
    while True:
        try:
            # any() short-circuits on the first match, avoiding a full scan.
            running = any(
                p.info['name'] == "antigravity"
                for p in psutil.process_iter(['name'])
            )
            if not running:
                subprocess.Popen(["antigravity"],
                                 stdout=subprocess.DEVNULL,
                                 stderr=subprocess.DEVNULL)
        except Exception:
            # Best-effort: process-table races and spawn failures retry next
            # cycle.  Narrowed from bare except so KeyboardInterrupt/
            # SystemExit still terminate the thread.
            pass
        time.sleep(5)  # Low poll rate to save CPU.
def set_resolution(w, h):
    """Switch the Xvfb screen to w x h via xrandr.

    Dimensions are rounded up to even values (required for YUV420 planes)
    and clamped to the framebuffer size.  No-op when already current.
    Runs under video_lock so it never races with frame capture.
    """
    with video_lock:
        # BUGFIX: normalize BEFORE the dedup check.  Previously a request
        # that merely rounded/clamped to the current mode still forked three
        # xrandr processes and registered a duplicate mode every time.
        if w % 2 != 0: w += 1
        if h % 2 != 0: h += 1
        if w > MAX_WIDTH: w = MAX_WIDTH
        if h > MAX_HEIGHT: h = MAX_HEIGHT
        if w == config["width"] and h == config["height"]:
            return
        try:
            mode_name = f"M_{w}_{h}"
            # Approximate modeline: pixel clock scaled for ~60 Hz refresh.
            subprocess.call(["xrandr", "--newmode", mode_name, f"{60*w*h/1000000:.2f}",
                             str(w), str(w+40), str(w+80), str(w+160),
                             str(h), str(h+3), str(h+10), str(h+16),
                             "-hsync", "+vsync"], stderr=subprocess.DEVNULL)
            subprocess.call(["xrandr", "--addmode", "screen", mode_name], stderr=subprocess.DEVNULL)
            subprocess.call(["xrandr", "--output", "screen", "--mode", mode_name], stderr=subprocess.DEVNULL)
            config["width"] = w
            config["height"] = h
            print(f"Resized to {w}x{h}")
        except Exception as e:
            print(f"Resize failed: {e}")
class VirtualScreenTrack(VideoStreamTrack):
    """aiortc video track serving frames captured from the Xvfb screen.

    The heavy C++ work (XShm capture + sws_scale YUV conversion) runs on
    the dedicated single-thread video executor; ctypes releases the GIL,
    so input dispatch on the event loop continues in parallel.
    """
    kind = "video"

    def __init__(self):
        super().__init__()
        # Resolution the C++ grabber is currently initialized for.
        self.last_w = 0
        self.last_h = 0
        # Cached black frame served whenever capture fails.
        self.fallback_frame = None

    def _produce_frame(self, w, h):
        """Capture one w x h frame; return a VideoFrame or None on failure.

        Runs on the video executor thread under video_lock, so it never
        races set_resolution on the shared XShm segment.
        """
        if not USE_CSHM:
            return None
        with video_lock:
            try:
                if w != self.last_w or h != self.last_h:
                    # (Re)initialize the grabber for the new resolution.
                    if xlib.init_grabber(w, h, DISPLAY_NUM.encode('utf-8')) == 0:
                        return None
                    self.last_w = w
                    self.last_h = h
                # 1. Blit X11 screen -> shared memory.
                if xlib.capture_frame() == 0:
                    return None
                # 2. Allocate the output frame (Python overhead, required by aiortc).
                frame = VideoFrame(width=w, height=h, format="yuv420p")
                # 3. Convert BGRA shared memory -> YUV planes in place,
                #    passing raw plane addresses to avoid Python-side copies.
                xlib.convert_to_yuv(
                    c_void_p(int(frame.planes[0].buffer_ptr)), frame.planes[0].line_size,
                    c_void_p(int(frame.planes[1].buffer_ptr)), frame.planes[1].line_size,
                    c_void_p(int(frame.planes[2].buffer_ptr)), frame.planes[2].line_size
                )
                return frame
            except Exception:
                # BUGFIX: narrowed from bare except — capture errors still
                # fall back to a black frame, but BaseExceptions propagate.
                return None

    async def recv(self):
        """Return the next timestamped frame (black fallback on failure)."""
        pts, time_base = await self.next_timestamp()
        w, h = config["width"], config["height"]
        frame = None
        if USE_CSHM:
            try:
                # get_running_loop(): get_event_loop() is deprecated inside
                # a coroutine.  The executor hop keeps the event loop free
                # for input processing while C++ holds the video lock.
                frame = await asyncio.get_running_loop().run_in_executor(
                    video_executor, self._produce_frame, w, h
                )
            except Exception:
                # BUGFIX: the previous bare except swallowed
                # asyncio.CancelledError, breaking task cancellation.
                frame = None
        if frame is None:
            if (self.fallback_frame is None
                    or self.fallback_frame.width != w
                    or self.fallback_frame.height != h):
                self.fallback_frame = VideoFrame.from_ndarray(
                    np.zeros((h, w, 3), dtype=np.uint8), format="bgr24")
            frame = self.fallback_frame
        frame.pts = pts
        frame.time_base = time_base
        return frame
async def offer(request):
    """POST /offer: complete a WebRTC handshake and start streaming.

    Parses the browser's SDP offer, attaches the screen track and input
    data channel, injects ~4 Mbps bandwidth hints into the answer SDP,
    and returns the local description as JSON (CORS-open).
    """
    try:
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
    except Exception:
        # Malformed body / missing keys -> 400.  Narrowed from bare except
        # so BaseExceptions (e.g. CancelledError) are not swallowed.
        return web.Response(status=400)
    ice_servers = [RTCIceServer(urls=["stun:stun.l.google.com:19302"])]
    pc = RTCPeerConnection(RTCConfiguration(iceServers=ice_servers))
    pcs.add(pc)
    @pc.on("connectionstatechange")
    async def on_state():
        # Reap dead connections so the global pcs set does not grow forever.
        if pc.connectionState in ["failed", "closed"]:
            await pc.close()
            pcs.discard(pc)
    @pc.on("datachannel")
    def on_dc(channel):
        @channel.on("message")
        async def on_message(message):
            # Input events bypass HTTP entirely: straight to the dispatcher.
            await process_input(message)
    pc.addTrack(VirtualScreenTrack())
    await pc.setRemoteDescription(offer)
    answer = await pc.createAnswer()
    # Force a higher video bitrate (~4 Mbps) by injecting bandwidth lines
    # right after the m=video section before the description becomes local.
    sdp_lines = answer.sdp.splitlines()
    new_sdp = []
    for line in sdp_lines:
        new_sdp.append(line)
        if line.startswith("m=video"):
            new_sdp.append("b=AS:4000")
            new_sdp.append("b=TIAS:4000000")
    answer.sdp = "\r\n".join(new_sdp)
    await pc.setLocalDescription(answer)
    return web.Response(content_type="application/json", text=json.dumps({
        "sdp": pc.localDescription.sdp,
        "type": pc.localDescription.type
    }), headers={"Access-Control-Allow-Origin": "*"})
def map_key(key):
    """Translate a browser KeyboardEvent.key name to its X11 keysym name.

    Unknown keys pass through unchanged; falsy input yields None.
    """
    if not key:
        return None
    translations = {
        "Enter": "Return",
        "ArrowUp": "Up",
        "ArrowDown": "Down",
        "ArrowLeft": "Left",
        "ArrowRight": "Right",
    }
    return translations.get(key, key)
# Pending debounced-resize task; replaced (and cancelled) on each new request.
resize_timer = None
async def handle_debounced_resize(w, h):
    """Debounce resize requests: only the latest within 0.1 s is applied.

    Cancels any pending resize task, then schedules set_resolution on the
    video executor so it serializes with frame capture.
    """
    global resize_timer
    if resize_timer:
        resize_timer.cancel()
    async def task():
        await asyncio.sleep(0.1)  # Short debounce window for snappy resizing.
        # get_running_loop(): get_event_loop() is deprecated in coroutines.
        await asyncio.get_running_loop().run_in_executor(
            video_executor, set_resolution, w, h)
    resize_timer = asyncio.create_task(task())
async def process_input(data):
    """Dispatch one JSON input message received on the WebRTC data channel.

    Mouse coordinates arrive normalized (0..1) and are scaled to the
    current resolution.  Malformed messages are dropped (best-effort path).
    """
    try:
        msg = json.loads(data)
        t = msg.get("type")
        if t == "mousemove":
            w, h = config["width"], config["height"]
            input_manager.mouse_move(int(msg["x"] * w), int(msg["y"] * h))
        elif t == "mousedown":
            # Browser buttons 0/1/2 map to X11 buttons 1/2/3.
            input_manager.mouse_down({0:1, 1:2, 2:3}.get(msg.get("button"), 1))
        elif t == "mouseup":
            input_manager.mouse_up({0:1, 1:2, 2:3}.get(msg.get("button"), 1))
        elif t == "keydown":
            k = map_key(msg.get("key"))
            if k: input_manager.key_down(k)
        elif t == "keyup":
            k = map_key(msg.get("key"))
            if k: input_manager.key_up(k)
        elif t == "wheel":
            input_manager.scroll(msg.get("deltaY", 0))
        elif t == "resize":
            w, h = int(msg.get("width")), int(msg.get("height"))
            if w > 100 and h > 100:  # Reject degenerate window sizes.
                await handle_debounced_resize(w, h)
    except Exception:
        # BUGFIX: narrowed from bare except — malformed input is still
        # dropped silently, but CancelledError now propagates.
        pass
async def index(r):
    """Health-check endpoint."""
    return web.Response(text="Optimized")
async def options(r):
    """CORS preflight response for the /offer endpoint."""
    return web.Response(headers={
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
    })
# Live RTCPeerConnection instances; closed on shutdown.
pcs = set()
async def on_shutdown(app):
    """aiohttp shutdown hook: stop peers first, then free C/X resources.

    BUGFIX: peer connections are now closed BEFORE xlib.cleanup(), so no
    still-running video track can call into the capture library while its
    X connections and shared-memory segment are being torn down.
    """
    if pcs:
        await asyncio.gather(*[pc.close() for pc in pcs])
    with video_lock:
        if USE_CSHM:
            try:
                xlib.cleanup()
            except Exception:
                # Narrowed from bare except; best-effort teardown.
                pass
if __name__ == "__main__":
    start_system()  # Bring up Xvfb, window manager, and the app watchdog.
    server = web.Application()
    server.on_shutdown.append(on_shutdown)
    # Route table: health check, WebRTC offer, CORS preflight.
    for register, path, handler in (
        (server.router.add_get, "/", index),
        (server.router.add_post, "/offer", offer),
        (server.router.add_options, "/offer", options),
    ):
        register(path, handler)
    web.run_app(server, host=HOST, port=PORT)