""" THE Z AI — Computer Mode Server v7 — SMART CONTROL ==================================================== الإصلاحات الجوهرية في v7: 1. capture_screen_with_grid() — صورة مزدوجة: يمين نظيفة + يسار بشبكة احداثيات شفافة 2. أوامر بسيطة: open pc, open firefox, mouse go x y, click x y, rclick x y, type TEXT, key ENTER ... 3. إصلاح الكتابة العربية: xclip clipboard + xdotool key ctrl+v 4. إرسال احداثيات الشاشة الحقيقية + موقع الماوس مع كل screenshot 5. auto_shot_with_grid بعد كل خطوة 6. simple_command parser: يفهم أوامر بسيطة ويحولها لأفعال WebSocket """ import asyncio import base64 import io import json import os import re import subprocess import time import urllib.parse from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse import uvicorn DISPLAY = os.environ.get("DISPLAY", ":1") os.environ["DISPLAY"] = DISPLAY # ─── تشخيص أدوات الـ capture عند البداية ───────────── def _check_capture_tools(): tools = ["scrot", "import", "xwd", "convert", "ffmpeg", "xdotool"] print("─── Capture Tools Check ───") for t in tools: r = subprocess.run(["which", t], capture_output=True, text=True) status = "✅" if r.returncode == 0 else "❌" print(f" {status} {t}: {r.stdout.strip() or 'not found'}") try: r2 = subprocess.run(["xdpyinfo", "-display", DISPLAY], capture_output=True, text=True, timeout=3) print(f" {'✅' if r2.returncode==0 else '❌'} DISPLAY={DISPLAY}: {'active' if r2.returncode==0 else 'not active'}") except Exception as ex: print(f" ❌ DISPLAY={DISPLAY}: {ex}") print("───────────────────────────") try: _check_capture_tools() except Exception as _e: print(f"[diag] {_e}") # ─── اكتشاف المتصفح ────────────────────────────────── def _detect_browser() -> str: candidates = ["firefox", "firefox-esr", "chromium-browser", "chromium", "google-chrome"] found = None for c in candidates: r = subprocess.run(["which", c], capture_output=True, text=True) if r.returncode == 0 and r.stdout.strip(): found = c break if not found: return "firefox" # تأكد من وجود firefox-esr كـ symlink esr_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True) if esr_check.returncode != 0: real_path = subprocess.run(["which", found], capture_output=True, text=True).stdout.strip() if real_path: try: subprocess.run(["ln", "-sf", real_path, "/usr/local/bin/firefox-esr"], check=True) print(f"✅ Created firefox-esr symlink → {real_path}") except Exception as e: print(f"⚠️ Could not create firefox-esr symlink: {e}") return found BROWSER = _detect_browser() print(f"🌐 Browser detected: {BROWSER}") app = FastAPI(title="Z-Computer-Mode API v7") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) active_connections: list[WebSocket] = [] stream_active = False stream_fps = 3 stream_quality = 60 stream_scale = 0.5 # ─── CURL HEADERS ──────────────────────────────────── CURL_HEADERS = ( '-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ' '-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" ' '-H "Accept-Language: en-US,en;q=0.9,ar;q=0.8" ' '-H "Accept-Encoding: gzip, deflate" ' '--compressed ' '--max-time 20 ' '-L ' '-s ' ) # ─── مصادر البحث 8+ ────────────────────────────────── def _build_search_sources(query: str) -> list[dict]: q = urllib.parse.quote_plus(query) q_raw = query.replace(' ', '+') return [ { "name": "DuckDuckGo Instant", "cmd": f"curl -s --max-time 20 'https://api.duckduckgo.com/?q={q}&format=json&no_html=1&skip_disambig=1' | python3 -c \"import sys,json; d=json.load(sys.stdin); ans=d.get('AbstractText',''); rels=d.get('RelatedTopics',[]); print('ANSWER:',ans if ans else 'no direct answer'); [print('-',r.get('Text','')[:250]) for r in rels if isinstance(r,dict) and r.get('Text')]\"" }, { "name": "Google News RSS", "cmd": f"curl -sL {CURL_HEADERS} 'https://news.google.com/rss/search?q={q_raw}&hl=en&gl=US&ceid=US:en' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<!\\[CDATA\\[(.*?)\\]\\]>|(.*?)',xml); clean=lambda s:re.sub('<[^>]+>','',s); results=[(a or b).strip() for a,b in titles if (a or b).strip()][1:9]; [print(str(i+1)+'. '+t[:180]) for i,t in enumerate(results)]\"" }, { "name": "Wikipedia English", "cmd": f"curl -s --max-time 15 'https://en.wikipedia.org/api/rest_v1/page/summary/{q}' | python3 -c \"import sys,json; d=json.load(sys.stdin); print(d.get('title','')+'\\n'+d.get('extract','')[:1500])\" 2>/dev/null || curl -s --max-time 15 'https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={q}&format=json&srlimit=5' | python3 -c \"import sys,json,re; d=json.load(sys.stdin); [print(str(i+1)+'. '+r['title']+': '+re.sub('<[^>]+>','',r.get('snippet',''))[:200]) for i,r in enumerate(d.get('query',{{}}).get('search',[]))]\" 2>/dev/null" }, { "name": "DuckDuckGo HTML", "cmd": f"curl -sL {CURL_HEADERS} 'https://html.duckduckgo.com/html/?q={q}' | python3 -c \"import sys,re; h=sys.stdin.read(); snippets=re.findall(r'class=.result__snippet[^>]*>(.*?)',h,re.DOTALL); titles=re.findall(r'class=.result__title[^>]*>.*?]*>(.*?)',h,re.DOTALL); clean=lambda s:re.sub('<[^>]+>','',s).strip(); [print(str(i+1)+'. '+clean(titles[i] if i(.*?)',xml,re.DOTALL); [print(str(i+1)+'. '+re.sub('<[^>]+>','',re.search(r'(.*?)',it).group(1) if re.search(r'',it) else '')+'\\n '+re.sub('<[^>]+>','',re.search(r'<description>(.*?)</description>',it).group(1) if re.search(r'<description>',it) else '')[:150]) for i,it in enumerate(items[:6])]\"" }, { "name": "Reddit Search", "cmd": f"curl -sL {CURL_HEADERS} -H 'Accept: application/json' 'https://www.reddit.com/search.json?q={q}&sort=new&limit=8&type=link' | python3 -c \"import sys,json; d=json.load(sys.stdin); posts=d.get('data',{{}}).get('children',[]); [print(str(i+1)+'. '+p['data'].get('title','')+'\\n r/'+p['data'].get('subreddit','')+' Score:'+str(p['data'].get('score',''))+'\\n '+p['data'].get('selftext','')[:200]) for i,p in enumerate(posts[:6])]\"" }, { "name": "HackerNews", "cmd": f"curl -s --max-time 15 'https://hn.algolia.com/api/v1/search?query={q}&hitsPerPage=8&tags=story' | python3 -c \"import sys,json; d=json.load(sys.stdin); hits=d.get('hits',[]); [print(str(i+1)+'. '+h.get('title','')+'\\n Points:'+str(h.get('points','0'))+' | '+h.get('url','')[:80]) for i,h in enumerate(hits[:6])]\"" }, { "name": "ArXiv Academic", "cmd": f"curl -s --max-time 15 'https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=5' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title>(.*?)',xml)[1:]; summaries=re.findall(r'(.*?)',xml,re.DOTALL); [print(str(i+1)+'. '+t.strip()+'\\n '+summaries[i].strip()[:200] if i str: patterns = [ r'[?&]q=([^&\'"]+)', r'[?&]query=([^&\'"]+)', r'[?&]search_query=all:([^&\'"]+)', r"search\?q=([^&'\"\s]+)", ] for p in patterns: m = re.search(p, cmd) if m: q = m.group(1).replace('+', ' ').replace('%20', ' ') return urllib.parse.unquote(q) return "" def _is_empty_result(stdout: str) -> bool: if not stdout or len(stdout.strip()) < 10: return True empty_signals = [ "no direct answer", "no results", "0 results", "no items", "[]", "{}", "error", "not found", "answer: no", ] s = stdout.strip().lower() meaningful_lines = [l for l in s.split('\n') if l.strip() and not any(sig in l for sig in empty_signals)] return len(meaningful_lines) < 1 # ─── Screen Capture ───────────────────────────────── def _get_screen_size() -> tuple[int, int]: """يُعيد حجم الشاشة الحقيقي""" r = subprocess.run(["xdotool", "getdisplaygeometry"], env={**os.environ, "DISPLAY": DISPLAY}, capture_output=True, text=True, timeout=5) try: parts = r.stdout.strip().split() return int(parts[0]), int(parts[1]) except: return 1920, 1080 def _get_mouse_pos() -> tuple[int, int]: """يُعيد موقع الماوس الحالي""" r = subprocess.run(["xdotool", "getmouselocation"], env={**os.environ, "DISPLAY": DISPLAY}, capture_output=True, text=True, timeout=5) try: mx = int(re.search(r"x:(\d+)", r.stdout).group(1)) my = int(re.search(r"y:(\d+)", r.stdout).group(1)) return mx, my except: return 0, 0 def capture_screen_raw() -> tuple[object, int, int]: """ يلتقط الشاشة بـ 5 طرق متسلسلة — تضمن نجاح واحدة: 1. scrot (الأسرع) 2. import (ImageMagick) 3. xwd + convert 4. ffmpeg + x11grab 5. python-xlib / Xlib مباشرة """ from PIL import Image env = {**os.environ, "DISPLAY": DISPLAY} tmp = f"/tmp/zs_{int(time.time()*1000)}" def _load(path) -> tuple: """تحميل الصورة إذا وُجدت وحجمها > 0""" if not path or not os.path.exists(path): return None, 0, 0 size = os.path.getsize(path) if size < 512: # صورة فارغة/تالفة return None, 0, 0 try: img = Image.open(path).convert("RGB") w, h = img.size if w < 10 or h < 10: return None, 0, 0 try: os.unlink(path) except: pass return img, w, h except Exception as ex: print(f"[_load] {ex}") return None, 0, 0 # ── طريقة 1: scrot ────────────────────────────────── try: p = tmp + "_1.png" r = subprocess.run( ["scrot", "-q", "95", p], env=env, timeout=8, capture_output=True ) img, w, h = _load(p) if img: print("[capture] ✅ scrot") return img, w, h print(f"[capture] scrot failed rc={r.returncode} err={r.stderr[:80]}") except Exception as e: print(f"[capture] scrot ex: {e}") # ── طريقة 2: ImageMagick import ───────────────────── try: p = tmp + "_2.png" r = subprocess.run( ["import", "-window", "root", "-silent", p], env=env, timeout=10, capture_output=True ) img, w, h = _load(p) if img: print("[capture] ✅ import (ImageMagick)") return img, w, h print(f"[capture] import failed rc={r.returncode}") except Exception as e: print(f"[capture] import ex: {e}") # ── طريقة 3: xwd + convert ────────────────────────── try: xwd_p = tmp + "_3.xwd" png_p = tmp + "_3.png" r1 = subprocess.run( ["xwd", "-root", "-silent", "-out", xwd_p], env=env, timeout=10, capture_output=True ) r2 = subprocess.run( ["convert", xwd_p, png_p], timeout=10, capture_output=True ) try: os.unlink(xwd_p) except: pass img, w, h = _load(png_p) if img: print("[capture] ✅ xwd+convert") return img, w, h print(f"[capture] xwd rc={r1.returncode} convert rc={r2.returncode}") except Exception as e: print(f"[capture] xwd ex: {e}") # ── طريقة 4: ffmpeg x11grab ───────────────────────── try: p = tmp + "_4.png" sw, sh = _get_screen_size() r = subprocess.run([ "ffmpeg", "-y", "-f", "x11grab", "-video_size", f"{sw}x{sh}", "-i", DISPLAY, "-vframes", "1", "-q:v", "2", p ], env=env, timeout=12, capture_output=True) img, w, h = _load(p) if img: print("[capture] ✅ ffmpeg x11grab") return img, w, h print(f"[capture] ffmpeg rc={r.returncode} err={r.stderr[-120:]}") except Exception as e: print(f"[capture] ffmpeg ex: {e}") # ── طريقة 5: python-xlib (Xlib مباشرة) ────────────── try: from Xlib import display as Xdisplay from Xlib.ext.xtest import fake_input xdisp = Xdisplay.Display(DISPLAY) root = xdisp.screen().root geom = root.get_geometry() w, h = geom.width, geom.height raw = root.get_image(0, 0, w, h, Xdisplay.X.ZPixmap, 0xFFFFFFFF) import struct data = raw.data # BGRA → RGB pixels = [] for i in range(0, len(data), 4): b, g, r_, a = struct.unpack_from('BBBB', data, i) pixels.extend([r_, g, b]) img = Image.frombytes("RGB", (w, h), bytes(pixels)) print("[capture] ✅ python-xlib") return img, w, h except Exception as e: print(f"[capture] xlib ex: {e}") # ── طريقة 6: صورة placeholder واضحة ──────────────── print("[capture] ⚠️ ALL methods failed — returning placeholder") try: from PIL import ImageDraw sw, sh = _get_screen_size() img = Image.new("RGB", (sw or 1280, sh or 720), (20, 20, 30)) draw = ImageDraw.Draw(img) draw.rectangle([(0, 0), (sw, 40)], fill=(30, 30, 50)) draw.text((10, 10), f"⚠️ Screenshot failed — DISPLAY={DISPLAY}", fill=(255, 100, 100)) draw.text((10, 50), "Methods tried: scrot, import, xwd, ffmpeg, xlib", fill=(150, 150, 150)) return img, sw or 1280, sh or 720 except: return None, 0, 0 def capture_screen(scale=0.6, quality=70) -> str: """يلتقط الشاشة العادية بدون grid""" img, w, h = capture_screen_raw() if img is None: return "" try: if scale < 1.0: img = img.resize((int(w * scale), int(h * scale)), img.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=quality, optimize=True) return base64.b64encode(buf.getvalue()).decode() except Exception as e: print(f"[capture] {e}") return "" def capture_screen_with_grid(scale=0.65, quality=72, force_mx: int | None = None, force_my: int | None = None) -> dict: """ يلتقط الشاشة ويُعيد صورة واحدة فقط مع شبكة إحداثيات واضحة. الإحداثيات في الصورة تطابق إحداثيات الشاشة الحقيقية (1:1 mapping). الخطوط كل 100px — الأرقام تُظهر الـ X/Y الحقيقي للنقر. force_mx / force_my: إذا مُرِّرا يُرسم الـ cursor في هذا الموضع مباشرةً بدون استعلام xdotool — يُعالج مشكلة race condition بعد mouse_move. """ try: from PIL import Image, ImageDraw img, orig_w, orig_h = capture_screen_raw() if img is None: return {"data": "", "width": 1920, "height": 1080, "mouse_x": 0, "mouse_y": 0} # استخدم الإحداثيات المُمرَّرة إذا وُجدت (بعد mouse_move/click مباشرة) # وإلا اقرأ من X server if force_mx is not None and force_my is not None: mx, my = force_mx, force_my else: mx, my = _get_mouse_pos() # resize للعرض فقط — الإحداثيات تبقى للشاشة الأصلية sw = int(orig_w * scale) sh = int(orig_h * scale) grid_img = img.resize((sw, sh), Image.LANCZOS) draw = ImageDraw.Draw(grid_img, "RGBA") # شبكة كل 100px بإحداثيات الشاشة الحقيقية step_orig = 100 step_scaled_x = int(step_orig * sw / orig_w) step_scaled_y = int(step_orig * sh / orig_h) grid_color = (255, 255, 255, 45) # خطوط بيضاء شفافة label_color = (0, 255, 180, 200) # أرقام خضراء واضحة # خطوط عمودية + أرقام X الحقيقية x_sc = step_scaled_x x_real = step_orig while x_sc < sw: draw.line([(x_sc, 0), (x_sc, sh)], fill=grid_color, width=1) draw.rectangle([(x_sc + 1, 2), (x_sc + 32, 14)], fill=(0, 0, 0, 160)) draw.text((x_sc + 2, 3), str(x_real), fill=label_color) x_sc += step_scaled_x x_real += step_orig # خطوط أفقية + أرقام Y الحقيقية y_sc = step_scaled_y y_real = step_orig while y_sc < sh: draw.line([(0, y_sc), (sw, y_sc)], fill=grid_color, width=1) draw.rectangle([(2, y_sc + 1), (36, y_sc + 13)], fill=(0, 0, 0, 160)) draw.text((3, y_sc + 2), str(y_real), fill=label_color) y_sc += step_scaled_y y_real += step_orig # موقع الماوس الحقيقي — دائرة حمراء mouse_sx = int(mx * sw / orig_w) mouse_sy = int(my * sh / orig_h) r = 10 draw.ellipse([(mouse_sx-r, mouse_sy-r), (mouse_sx+r, mouse_sy+r)], outline=(255, 60, 60, 240), width=2) draw.line([(mouse_sx-16, mouse_sy), (mouse_sx+16, mouse_sy)], fill=(255, 60, 60, 200), width=1) draw.line([(mouse_sx, mouse_sy-16), (mouse_sx, mouse_sy+16)], fill=(255, 60, 60, 200), width=1) # شريط معلومات في الأعلى final = grid_img.convert("RGB") draw2 = ImageDraw.Draw(final) draw2.rectangle([(0, 0), (sw, 18)], fill=(0, 0, 0)) hdr = (f"SCREEN {orig_w}x{orig_h} | MOUSE:({mx},{my}) | " f"GRID=100px | CLICK COORDS = numbers on grid lines") draw2.text((4, 2), hdr, fill=(0, 220, 160)) # شريط معلومات في الأسفل draw2.rectangle([(0, sh-18), (sw, sh)], fill=(0, 0, 0)) draw2.text((4, sh-16), f"USE REAL COORDS: e.g. click x=500 y=300 means the '500' vertical line + '300' horizontal line", fill=(180, 180, 80)) buf = io.BytesIO() final.save(buf, format="JPEG", quality=quality, optimize=True) data = base64.b64encode(buf.getvalue()).decode() return { "data": data, "width": orig_w, "height": orig_h, "mouse_x": mx, "mouse_y": my, } except Exception as e: print(f"[capture_grid] {e}") plain = capture_screen(scale=scale, quality=quality) mx, my = _get_mouse_pos() w, h = _get_screen_size() return {"data": plain, "width": w, "height": h, "mouse_x": mx, "mouse_y": my} # ─── Command Runner ────────────────────────────────── def run_raw_command(cmd: str, timeout: int = 60) -> dict: env = {**os.environ, "DISPLAY": DISPLAY, "PYTHONIOENCODING": "utf-8", "LANG": "en_US.UTF-8"} try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout, env=env, executable="/bin/bash") return { "stdout": result.stdout[-15000:], "stderr": result.stderr[-3000:], "returncode": result.returncode, } except subprocess.TimeoutExpired: return {"stdout": "", "stderr": f"⏱️ Timeout {timeout}s", "returncode": -1} except Exception as e: return {"stdout": "", "stderr": str(e), "returncode": -1} def run_command_smart(cmd: str, timeout: int = 60) -> dict: res = run_raw_command(cmd, timeout=timeout) stdout = res["stdout"].strip() if res["returncode"] == 0 and not _is_empty_result(stdout): return res is_curl_search = "curl" in cmd and any(x in cmd for x in [ "duckduckgo", "google", "bing", "wikipedia", "reddit", "hackernews", "hn.algolia", "arxiv", "news", "search" ]) if not is_curl_search: return res query = _extract_query_from_cmd(cmd) if not query or len(query) < 3: words = [w for w in cmd.split() if len(w) > 3 and not w.startswith('-') and 'http' not in w and 'python3' not in w and 'curl' not in w] query = ' '.join(words[:5]) if words else "" if not query: return res sources = _build_search_sources(query) tried_names = [] all_results = [] for source in sources: src_name = source["name"] tried_names.append(f"🔍 {src_name}") src_res = run_raw_command(source["cmd"], timeout=25) src_out = src_res["stdout"].strip() if not _is_empty_result(src_out): combined_header = f"[مصدر بديل: {src_name}]\n{'='*50}\n" all_results.append(src_out) for source2 in sources: if source2["name"] != src_name: s2 = run_raw_command(source2["cmd"], timeout=20) s2_out = s2["stdout"].strip() if not _is_empty_result(s2_out): all_results.append(f"\n[مصدر إضافي: {source2['name']}]\n{s2_out}") break final_out = combined_header + "\n\n".join(all_results) return { "stdout": final_out[:15000], "stderr": "", "returncode": 0, "_sources_tried": tried_names, "_fallback_used": src_name, } return { "stdout": f"[لم تُرجع أي مصادر نتائج لـ: {query}]\nالمصادر: {', '.join(tried_names[:5])}", "stderr": res["stderr"], "returncode": -1, "_sources_tried": tried_names, } def xdo(args: list, timeout=10) -> dict: r = subprocess.run(["xdotool"] + args, env={**os.environ, "DISPLAY": DISPLAY}, timeout=timeout, capture_output=True, text=True) return {"rc": r.returncode, "out": r.stdout, "err": r.stderr} def type_text_smart(text: str) -> dict: """ كتابة نص ذكية — تدعم العربية والإنجليزية: - للنصوص الإنجليزية: xdotool type مباشرة - للنصوص العربية أو المختلطة: xclip clipboard ثم ctrl+v """ has_arabic = bool(re.search(r'[\u0600-\u06FF]', text)) if has_arabic: # طريقة Clipboard لضمان كتابة العربية بشكل صحيح try: proc = subprocess.Popen( ["xclip", "-selection", "clipboard"], stdin=subprocess.PIPE, env={**os.environ, "DISPLAY": DISPLAY} ) proc.communicate(text.encode("utf-8")) time.sleep(0.15) # Focus + paste xdo(["key", "--clearmodifiers", "ctrl+v"]) return {"success": True, "method": "clipboard+paste", "arabic": True} except Exception as e: # fallback: xdotool type r = xdo(["type", "--clearmodifiers", "--delay", "50", text]) return {"success": r["rc"] == 0, "method": "xdotool_fallback", "error": r["err"]} else: # إنجليزية: xdotool type مباشرة r = xdo(["type", "--clearmodifiers", "--delay", "30", text]) return {"success": r["rc"] == 0, "method": "xdotool_direct"} def open_browser_smart(url: str = "") -> str: """يفتح المتصفح بطريقة ذكية مع fallback""" browser_cmd = BROWSER if not url: url = "about:blank" # جرب أولاً: BROWSER العادي cmd = f"{browser_cmd} --new-window '{url}' &" proc = subprocess.Popen(cmd, shell=True, env={**os.environ, "DISPLAY": DISPLAY}, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return f"{browser_cmd} → {url}" # ─── Simple Command Parser ───────────────────────────── # يفهم أوامر بسيطة ويحولها لأفعال داخلية def parse_simple_command(raw: str) -> dict | None: """ يُحلّل الأوامر البسيطة النصية وتُعيد action dict أمثلة: open pc → {"action":"screenshot"} open firefox → {"action":"open_browser","url":""} open chrome → {"action":"open_browser","url":""} open browser → {"action":"open_browser","url":""} open url https://google.com → {"action":"open_browser","url":"https://..."} mouse go x500 y300 → {"action":"mouse_move","x":500,"y":300} mouse go 500 300 → {"action":"mouse_move","x":500,"y":300} click x500 y300 → {"action":"mouse_click","x":500,"y":300,"button":"left"} click 500 300 → {"action":"mouse_click","x":500,"y":300,"button":"left"} rclick x500 y300 → {"action":"mouse_click","x":500,"y":300,"button":"right"} dclick x500 y300 → {"action":"mouse_click","x":500,"y":300,"double":true} type hello world → {"action":"keyboard_type","text":"hello world"} key enter → {"action":"keyboard_hotkey","keys":["Return"]} key ctrl+c → {"action":"keyboard_hotkey","keys":["ctrl","c"]} scroll up → {"action":"scroll","x":960,"y":540,"clicks":3} scroll down → {"action":"scroll","x":960,"y":540,"clicks":-3} screenshot → {"action":"screenshot"} screen info → {"action":"screen_info"} """ s = raw.strip().lower() # open pc / open computer / open screen if re.match(r'^open\s+(pc|computer|screen|desktop)$', s): return {"action": "screenshot"} # open firefox / chrome / browser / browser url m = re.match(r'^open\s+(firefox|chrome|chromium|browser|web|internet)(\s+(.+))?$', s) if m: url = (m.group(3) or "").strip() if url and not url.startswith("http"): url = "https://" + url return {"action": "open_browser", "url": url or ""} # open url m = re.match(r'^open\s+url\s+(\S+)$', s) if m: url = m.group(1) if not url.startswith("http"): url = "https://" + url return {"action": "open_browser", "url": url} # mouse go x500 y300 OR mouse go 500 300 m = re.match(r'^mouse\s+(?:go|move|to)\s+x?(\d+)\s+y?(\d+)$', s) if m: return {"action": "mouse_move", "x": int(m.group(1)), "y": int(m.group(2))} # click x500 y300 OR click 500 300 m = re.match(r'^click\s+x?(\d+)\s+y?(\d+)$', s) if m: return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "left"} # rclick / right click m = re.match(r'^r(?:ight)?click\s+x?(\d+)\s+y?(\d+)$', s) if m: return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "right"} # dclick / double click m = re.match(r'^d(?:ouble)?click\s+x?(\d+)\s+y?(\d+)$', s) if m: return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "left", "double": True} # type (يحتفظ بالحالة الأصلية) m = re.match(r'^type\s+(.+)$', raw.strip(), re.IGNORECASE) if m: return {"action": "keyboard_type", "text": m.group(1)} # key m = re.match(r'^key\s+(.+)$', s) if m: k = m.group(1).strip() key_map = { "enter": "Return", "return": "Return", "esc": "Escape", "escape": "Escape", "tab": "Tab", "space": "space", "backspace": "BackSpace", "delete": "Delete", "up": "Up", "down": "Down", "left": "Left", "right": "Right", "home": "Home", "end": "End", "pageup": "Prior", "pagedown": "Next", "f1": "F1", "f2": "F2", "f3": "F3", "f4": "F4", "f5": "F5", } if '+' in k: parts = [p.strip() for p in k.split('+')] keys = [key_map.get(p, p) for p in parts] else: keys = [key_map.get(k, k)] return {"action": "keyboard_hotkey", "keys": keys} # scroll up / scroll down / scroll m = re.match(r'^scroll\s+(up|down|(\-?\d+))$', s) if m: w, h = _get_screen_size() direction = m.group(1) if direction == "up": clicks = 4 elif direction == "down": clicks = -4 else: clicks = int(direction) return {"action": "scroll", "x": w // 2, "y": h // 2, "clicks": clicks} # screenshot if s in ("screenshot", "screen", "ss", "snap"): return {"action": "screenshot"} # screen info if re.match(r'^screen\s+info$', s): return {"action": "screen_info"} return None # لم يُعرف async def broadcast(msg: dict): txt = json.dumps(msg, ensure_ascii=False) dead = [] for ws in active_connections: try: await ws.send_text(txt) except: dead.append(ws) for ws in dead: if ws in active_connections: active_connections.remove(ws) async def screen_stream_loop(): global stream_active interval = 1.0 / max(1, stream_fps) while stream_active and active_connections: try: frame = capture_screen(scale=stream_scale, quality=stream_quality) if frame: await broadcast({"type": "frame", "data": frame}) except: pass await asyncio.sleep(interval) stream_active = False # ─── Action Handler ────────────────────────────────── async def handle_action(ws: WebSocket, msg: dict): action = msg.get("action", "") data = msg.get("data", {}) async def send(obj): await ws.send_text(json.dumps(obj, ensure_ascii=False)) async def auto_shot_grid(label="", delay=0.5, force_mx: int | None = None, force_my: int | None = None): """ screenshot مع grid. force_mx/force_my: ارسم الـ cursor في هذا الموضع بالضبط (بعد mouse_move/click) بدلاً من إعادة قراءته من X server — يُصلح race condition. """ await asyncio.sleep(delay) result = capture_screen_with_grid(scale=0.65, quality=72, force_mx=force_mx, force_my=force_my) if result["data"]: await send({ "type": "screenshot", "data": result["data"], "ts": int(time.time() * 1000), "auto": True, "label": label, "screen_width": result["width"], "screen_height": result["height"], "mouse_x": result["mouse_x"], "mouse_y": result["mouse_y"], "has_grid": True, }) return result["data"] # ── screenshot ──────────────────────────────────── if action == "screenshot": result = capture_screen_with_grid(scale=0.65, quality=75) await send({ "type": "screenshot", "data": result["data"], "ts": int(time.time() * 1000), "screen_width": result["width"], "screen_height": result["height"], "mouse_x": result["mouse_x"], "mouse_y": result["mouse_y"], "has_grid": True, }) # ── simple_command — الأوامر البسيطة ────────────── elif action == "simple_command": raw_cmd = data.get("cmd", "").strip() parsed = parse_simple_command(raw_cmd) if parsed is None: await send({ "type": "simple_command_result", "ok": False, "cmd": raw_cmd, "error": f"أمر غير معروف: '{raw_cmd}'\nالأوامر المتاحة: open pc, open firefox, mouse go x y, click x y, rclick x y, type TEXT, key ENTER, scroll up/down, screenshot" }) return # نفّذ الأمر المُحلّل await send({"type": "simple_command_result", "ok": True, "cmd": raw_cmd, "parsed": parsed}) # أعِد تشغيل نفس الـ handle_action مع الأمر المُحلّل sub_msg = {"action": parsed["action"], "data": {k: v for k, v in parsed.items() if k != "action"}} await handle_action(ws, sub_msg) return # ── terminal ────────────────────────────────────── elif action == "terminal": cmd = data.get("cmd", "") timeout = int(data.get("timeout", 60)) if not cmd: await send({"type": "terminal_result", "cmd": "", "stdout": "", "stderr": "no command", "returncode": -1}) return res = run_command_smart(cmd, timeout=timeout) await send({ "type": "terminal_result", "cmd": cmd, "stdout": res["stdout"], "stderr": res.get("stderr", ""), "returncode": res["returncode"], "fallback_used": res.get("_fallback_used", None), "sources_tried": res.get("_sources_tried", []), }) await auto_shot_grid(f"بعد: {cmd[:45]}", delay=0.4) # ── mouse_move ──────────────────────────────────── elif action == "mouse_move": x, y = int(data.get("x", 0)), int(data.get("y", 0)) # --sync ينتظر حتى يؤكد X server استلام الأمر xdo(["mousemove", "--sync", str(x), str(y)]) await send({"type": "ack", "action": "mouse_move", "x": x, "y": y}) # مرّر الإحداثيات مباشرةً لتجنّب race condition مع X server await auto_shot_grid(f"ماوس → ({x},{y})", delay=0.2, force_mx=x, force_my=y) # ── mouse_click ─────────────────────────────────── elif action == "mouse_click": x, y = int(data.get("x", 0)), int(data.get("y", 0)) btn_num = {"left": "1", "middle": "2", "right": "3"}.get(data.get("button", "left"), "1") xdo(["mousemove", "--sync", str(x), str(y)]) await asyncio.sleep(0.08) if data.get("double"): xdo(["click", "--repeat", "2", "--delay", "100", btn_num]) else: xdo(["click", btn_num]) btn_name = {"1": "left", "2": "middle", "3": "right"}.get(btn_num, "left") await send({"type": "ack", "action": "mouse_click", "x": x, "y": y, "button": btn_name}) # مرّر الإحداثيات مباشرةً لتجنّب race condition await auto_shot_grid(f"نقر {btn_name} → ({x},{y})", delay=0.5, force_mx=x, force_my=y) # ── keyboard_type ───────────────────────────────── elif action == "keyboard_type": text = data.get("text", "") if text: result = type_text_smart(text) await send({"type": "ack", "action": "keyboard_type", "method": result["method"], "text_len": len(text)}) else: await send({"type": "ack", "action": "keyboard_type"}) await auto_shot_grid("بعد الكتابة", delay=0.5) # ── keyboard_hotkey ─────────────────────────────── elif action == "keyboard_hotkey": keys = data.get("keys", []) if keys: xdo(["key", "--clearmodifiers", "+".join(keys)]) await send({"type": "ack", "action": "keyboard_hotkey", "keys": keys}) await auto_shot_grid("بعد الاختصار", delay=0.5) # ── keyboard_press ──────────────────────────────── elif action == "keyboard_press": key = data.get("key", "") if key: xdo(["key", "--clearmodifiers", key]) await send({"type": "ack", "action": "keyboard_press"}) await auto_shot_grid("بعد المفتاح", delay=0.4) # ── clipboard_write ─────────────────────────────── elif action == "clipboard_write": text = data.get("text", "") try: proc = subprocess.Popen( ["xclip", "-selection", "clipboard"], stdin=subprocess.PIPE, env={**os.environ, "DISPLAY": DISPLAY} ) proc.communicate(text.encode("utf-8")) await send({"type": "ack", "action": "clipboard_write", "length": len(text)}) except Exception as e: await send({"type": "error", "action": "clipboard_write", "msg": str(e)}) # ── clipboard_read ──────────────────────────────── elif action == "clipboard_read": res = run_raw_command("xclip -selection clipboard -o", timeout=5) await send({"type": "clipboard_content", "text": res["stdout"]}) # ── scroll ──────────────────────────────────────── elif action == "scroll": x, y = int(data.get("x", 0)), int(data.get("y", 0)) clicks = int(data.get("clicks", 3)) btn = "4" if clicks > 0 else "5" xdo(["mousemove", str(x), str(y)]) for _ in range(abs(clicks)): xdo(["click", btn]) await asyncio.sleep(0.03) await send({"type": "ack", "action": "scroll", "clicks": clicks}) await auto_shot_grid("بعد التمرير", delay=0.4) # ── open_app ────────────────────────────────────── elif action == "open_app": app_cmd = data.get("cmd", "") if app_cmd: fixed_cmd = app_cmd # إصلاح firefox-esr if "firefox-esr" in app_cmd: esr_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True) if esr_check.returncode != 0: fixed_cmd = app_cmd.replace("firefox-esr", BROWSER) subprocess.Popen(fixed_cmd, shell=True, env={**os.environ, "DISPLAY": DISPLAY}, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) await send({"type": "ack", "action": "open_app", "cmd": fixed_cmd}) wait_time = 7.0 if any(x in app_cmd for x in ["firefox", "chromium", "chrome"]) else 4.0 await asyncio.sleep(2.0) await auto_shot_grid(f"فتح: {fixed_cmd[:40]} (2s)", delay=0) await asyncio.sleep(wait_time - 2.0) await auto_shot_grid(f"بعد تحميل: {fixed_cmd[:40]}", delay=0) else: await send({"type": "ack", "action": "open_app"}) # ── open_browser — فتح المتصفح ──────────────────── elif action == "open_browser": url = data.get("url", "") if not url: url = "about:blank" browser_result = open_browser_smart(url) await send({"type": "ack", "action": "open_browser", "result": browser_result}) await asyncio.sleep(2.0) await auto_shot_grid(f"فتح المتصفح: {url[:50]}", delay=0) await asyncio.sleep(5.0) await auto_shot_grid("بعد تحميل المتصفح", delay=0) # ── mouse_drag ──────────────────────────────────── elif action == "mouse_drag": x1, y1 = int(data.get("x1", 0)), int(data.get("y1", 0)) x2, y2 = int(data.get("x2", 0)), int(data.get("y2", 0)) xdo(["mousemove", str(x1), str(y1)]) xdo(["mousedown", "1"]) await asyncio.sleep(0.1) xdo(["mousemove", str(x2), str(y2)]) await asyncio.sleep(0.1) xdo(["mouseup", "1"]) await send({"type": "ack", "action": "mouse_drag"}) # cursor ينتهي عند x2,y2 await auto_shot_grid("بعد السحب", delay=0.4, force_mx=x2, force_my=y2) # ── start_stream ────────────────────────────────── elif action == "start_stream": global stream_active, stream_fps, stream_quality, stream_scale stream_fps = int(data.get("fps", 3)) stream_quality = int(data.get("quality", 60)) stream_scale = float(data.get("scale", 0.5)) if not stream_active: stream_active = True asyncio.create_task(screen_stream_loop()) await send({"type": "ack", "action": "start_stream"}) # ── stop_stream ─────────────────────────────────── elif action == "stop_stream": stream_active = False await send({"type": "ack", "action": "stop_stream"}) # ── screen_info ─────────────────────────────────── elif action == "screen_info": w, h = _get_screen_size() mx, my = _get_mouse_pos() await send({ "type": "screen_info", "width": w, "height": h, "mouse_x": mx, "mouse_y": my, "browser": BROWSER, }) # ── paste ───────────────────────────────────────── elif action == "paste": text = data.get("text", "") if text: proc = subprocess.Popen( ["xclip", "-selection", "clipboard"], stdin=subprocess.PIPE, env={**os.environ, "DISPLAY": DISPLAY} ) proc.communicate(text.encode("utf-8")) await asyncio.sleep(0.1) xdo(["key", "--clearmodifiers", "ctrl+v"]) await send({"type": "ack", "action": "paste"}) await auto_shot_grid("بعد اللصق", delay=0.5) # ── unknown ─────────────────────────────────────── else: await send({ "type": "error", "msg": f"Unknown action: '{action}'. Available: screenshot, terminal, mouse_move, mouse_click, mouse_drag, keyboard_type, keyboard_hotkey, keyboard_press, clipboard_write, clipboard_read, scroll, open_app, open_browser, paste, simple_command, screen_info, start_stream, stop_stream" }) # ─── WebSocket ─────────────────────────────────────── @app.websocket("/ws") async def websocket_endpoint(ws: WebSocket): await ws.accept() active_connections.append(ws) w, h = _get_screen_size() await ws.send_text(json.dumps({ "type": "connected", "screen_width": w, "screen_height": h, "browser": BROWSER, "msg": f"Z Computer Mode v7 — Smart Control | Browser: {BROWSER} | Screen: {w}x{h}", "commands": [ "screenshot", "terminal {cmd}", "mouse_move {x,y}", "mouse_click {x,y,button}", "keyboard_type {text}", "keyboard_hotkey {keys}", "scroll {x,y,clicks}", "open_app {cmd}", "open_browser {url}", "simple_command {cmd}", "Simple cmds: 'open firefox', 'mouse go x y', 'click x y', 'type TEXT', 'key enter'" ] }, ensure_ascii=False)) # أرسل screenshot أولية مع grid result = capture_screen_with_grid(scale=0.65, quality=72) if result["data"]: await ws.send_text(json.dumps({ "type": "screenshot", "data": result["data"], "ts": int(time.time() * 1000), "label": "الشاشة الأولية", "screen_width": result["width"], "screen_height": result["height"], "mouse_x": result["mouse_x"], "mouse_y": result["mouse_y"], "has_grid": True, }, ensure_ascii=False)) try: while True: raw = await ws.receive_text() await handle_action(ws, json.loads(raw)) except WebSocketDisconnect: pass except Exception as e: print(f"[ws] {e}") finally: if ws in active_connections: active_connections.remove(ws) # ─── REST ──────────────────────────────────────────── @app.get("/screenshot") async def rest_screenshot(): result = capture_screen_with_grid(scale=0.7, quality=73) return JSONResponse({ "image": result["data"], "ts": int(time.time() * 1000), "screen_width": result["width"], "screen_height": result["height"], "mouse_x": result["mouse_x"], "mouse_y": result["mouse_y"], "has_grid": True, }) @app.get("/screenshot/clean") async def rest_screenshot_clean(): return JSONResponse({"image": capture_screen(0.75, 75), "ts": int(time.time() * 1000)}) @app.post("/terminal") async def rest_terminal(body: dict): return JSONResponse(run_command_smart(body.get("cmd", ""), body.get("timeout", 60))) @app.post("/simple") async def rest_simple(body: dict): """تنفيذ أمر بسيط عبر REST""" raw = body.get("cmd", "").strip() parsed = parse_simple_command(raw) if not parsed: return JSONResponse({"ok": False, "error": f"Unknown simple command: {raw}"}, status_code=400) return JSONResponse({"ok": True, "parsed": parsed}) @app.get("/health") async def health(): w, h = _get_screen_size() mx, my = _get_mouse_pos() browser_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True) return { "status": "ok", "version": "v7-smart-control", "browser": BROWSER, "firefox_esr_available": browser_check.returncode == 0, "screen_width": w, "screen_height": h, "mouse_x": mx, "mouse_y": my, } @app.get("/search/{query}") async def quick_search(query: str): res = run_command_smart( f"curl -s --max-time 15 'https://api.duckduckgo.com/?q={urllib.parse.quote_plus(query)}&format=json&no_html=1'", timeout=30 ) return JSONResponse(res) if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) uvicorn.run("app:app", host="0.0.0.0", port=port, log_level="info")