Spaces:
Paused
Paused
| """ | |
| THE Z AI — Computer Mode Server v7 — SMART CONTROL | |
| ==================================================== | |
| الإصلاحات الجوهرية في v7: | |
| 1. capture_screen_with_grid() — صورة مزدوجة: يمين نظيفة + يسار بشبكة احداثيات شفافة | |
| 2. أوامر بسيطة: open pc, open firefox, mouse go x y, click x y, rclick x y, type TEXT, key ENTER ... | |
| 3. إصلاح الكتابة العربية: xclip clipboard + xdotool key ctrl+v | |
| 4. إرسال احداثيات الشاشة الحقيقية + موقع الماوس مع كل screenshot | |
| 5. auto_shot_with_grid بعد كل خطوة | |
| 6. simple_command parser: يفهم أوامر بسيطة ويحولها لأفعال WebSocket | |
| """ | |
| import asyncio | |
| import base64 | |
| import io | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import time | |
| import urllib.parse | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| DISPLAY = os.environ.get("DISPLAY", ":1") | |
| os.environ["DISPLAY"] = DISPLAY | |
| # ─── تشخيص أدوات الـ capture عند البداية ───────────── | |
| def _check_capture_tools(): | |
| tools = ["scrot", "import", "xwd", "convert", "ffmpeg", "xdotool"] | |
| print("─── Capture Tools Check ───") | |
| for t in tools: | |
| r = subprocess.run(["which", t], capture_output=True, text=True) | |
| status = "✅" if r.returncode == 0 else "❌" | |
| print(f" {status} {t}: {r.stdout.strip() or 'not found'}") | |
| try: | |
| r2 = subprocess.run(["xdpyinfo", "-display", DISPLAY], | |
| capture_output=True, text=True, timeout=3) | |
| print(f" {'✅' if r2.returncode==0 else '❌'} DISPLAY={DISPLAY}: {'active' if r2.returncode==0 else 'not active'}") | |
| except Exception as ex: | |
| print(f" ❌ DISPLAY={DISPLAY}: {ex}") | |
| print("───────────────────────────") | |
| try: | |
| _check_capture_tools() | |
| except Exception as _e: | |
| print(f"[diag] {_e}") | |
| # ─── اكتشاف المتصفح ────────────────────────────────── | |
| def _detect_browser() -> str: | |
| candidates = ["firefox", "firefox-esr", "chromium-browser", "chromium", "google-chrome"] | |
| found = None | |
| for c in candidates: | |
| r = subprocess.run(["which", c], capture_output=True, text=True) | |
| if r.returncode == 0 and r.stdout.strip(): | |
| found = c | |
| break | |
| if not found: | |
| return "firefox" | |
| # تأكد من وجود firefox-esr كـ symlink | |
| esr_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True) | |
| if esr_check.returncode != 0: | |
| real_path = subprocess.run(["which", found], capture_output=True, text=True).stdout.strip() | |
| if real_path: | |
| try: | |
| subprocess.run(["ln", "-sf", real_path, "/usr/local/bin/firefox-esr"], check=True) | |
| print(f"✅ Created firefox-esr symlink → {real_path}") | |
| except Exception as e: | |
| print(f"⚠️ Could not create firefox-esr symlink: {e}") | |
| return found | |
| BROWSER = _detect_browser() | |
| print(f"🌐 Browser detected: {BROWSER}") | |
| app = FastAPI(title="Z-Computer-Mode API v7") | |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, | |
| allow_methods=["*"], allow_headers=["*"]) | |
| active_connections: list[WebSocket] = [] | |
| stream_active = False | |
| stream_fps = 3 | |
| stream_quality = 60 | |
| stream_scale = 0.5 | |
| # ─── CURL HEADERS ──────────────────────────────────── | |
| CURL_HEADERS = ( | |
| '-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' | |
| '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ' | |
| '-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" ' | |
| '-H "Accept-Language: en-US,en;q=0.9,ar;q=0.8" ' | |
| '-H "Accept-Encoding: gzip, deflate" ' | |
| '--compressed ' | |
| '--max-time 20 ' | |
| '-L ' | |
| '-s ' | |
| ) | |
| # ─── مصادر البحث 8+ ────────────────────────────────── | |
| def _build_search_sources(query: str) -> list[dict]: | |
| q = urllib.parse.quote_plus(query) | |
| q_raw = query.replace(' ', '+') | |
| return [ | |
| { | |
| "name": "DuckDuckGo Instant", | |
| "cmd": f"curl -s --max-time 20 'https://api.duckduckgo.com/?q={q}&format=json&no_html=1&skip_disambig=1' | python3 -c \"import sys,json; d=json.load(sys.stdin); ans=d.get('AbstractText',''); rels=d.get('RelatedTopics',[]); print('ANSWER:',ans if ans else 'no direct answer'); [print('-',r.get('Text','')[:250]) for r in rels if isinstance(r,dict) and r.get('Text')]\"" | |
| }, | |
| { | |
| "name": "Google News RSS", | |
| "cmd": f"curl -sL {CURL_HEADERS} 'https://news.google.com/rss/search?q={q_raw}&hl=en&gl=US&ceid=US:en' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title><!\\[CDATA\\[(.*?)\\]\\]></title>|<title>(.*?)</title>',xml); clean=lambda s:re.sub('<[^>]+>','',s); results=[(a or b).strip() for a,b in titles if (a or b).strip()][1:9]; [print(str(i+1)+'. '+t[:180]) for i,t in enumerate(results)]\"" | |
| }, | |
| { | |
| "name": "Wikipedia English", | |
| "cmd": f"curl -s --max-time 15 'https://en.wikipedia.org/api/rest_v1/page/summary/{q}' | python3 -c \"import sys,json; d=json.load(sys.stdin); print(d.get('title','')+'\\n'+d.get('extract','')[:1500])\" 2>/dev/null || curl -s --max-time 15 'https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={q}&format=json&srlimit=5' | python3 -c \"import sys,json,re; d=json.load(sys.stdin); [print(str(i+1)+'. '+r['title']+': '+re.sub('<[^>]+>','',r.get('snippet',''))[:200]) for i,r in enumerate(d.get('query',{{}}).get('search',[]))]\" 2>/dev/null" | |
| }, | |
| { | |
| "name": "DuckDuckGo HTML", | |
| "cmd": f"curl -sL {CURL_HEADERS} 'https://html.duckduckgo.com/html/?q={q}' | python3 -c \"import sys,re; h=sys.stdin.read(); snippets=re.findall(r'class=.result__snippet[^>]*>(.*?)</a>',h,re.DOTALL); titles=re.findall(r'class=.result__title[^>]*>.*?<a[^>]*>(.*?)</a>',h,re.DOTALL); clean=lambda s:re.sub('<[^>]+>','',s).strip(); [print(str(i+1)+'. '+clean(titles[i] if i<len(titles) else '')+'\\n '+clean(s)[:200]) for i,s in enumerate(snippets[:7])]\"" | |
| }, | |
| { | |
| "name": "Bing News RSS", | |
| "cmd": f"curl -sL {CURL_HEADERS} 'https://www.bing.com/news/search?q={q}&format=RSS' | python3 -c \"import sys,re; xml=sys.stdin.read(); items=re.findall(r'<item>(.*?)</item>',xml,re.DOTALL); [print(str(i+1)+'. '+re.sub('<[^>]+>','',re.search(r'<title>(.*?)</title>',it).group(1) if re.search(r'<title>',it) else '')+'\\n '+re.sub('<[^>]+>','',re.search(r'<description>(.*?)</description>',it).group(1) if re.search(r'<description>',it) else '')[:150]) for i,it in enumerate(items[:6])]\"" | |
| }, | |
| { | |
| "name": "Reddit Search", | |
| "cmd": f"curl -sL {CURL_HEADERS} -H 'Accept: application/json' 'https://www.reddit.com/search.json?q={q}&sort=new&limit=8&type=link' | python3 -c \"import sys,json; d=json.load(sys.stdin); posts=d.get('data',{{}}).get('children',[]); [print(str(i+1)+'. '+p['data'].get('title','')+'\\n r/'+p['data'].get('subreddit','')+' Score:'+str(p['data'].get('score',''))+'\\n '+p['data'].get('selftext','')[:200]) for i,p in enumerate(posts[:6])]\"" | |
| }, | |
| { | |
| "name": "HackerNews", | |
| "cmd": f"curl -s --max-time 15 'https://hn.algolia.com/api/v1/search?query={q}&hitsPerPage=8&tags=story' | python3 -c \"import sys,json; d=json.load(sys.stdin); hits=d.get('hits',[]); [print(str(i+1)+'. '+h.get('title','')+'\\n Points:'+str(h.get('points','0'))+' | '+h.get('url','')[:80]) for i,h in enumerate(hits[:6])]\"" | |
| }, | |
| { | |
| "name": "ArXiv Academic", | |
| "cmd": f"curl -s --max-time 15 'https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=5' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title>(.*?)</title>',xml)[1:]; summaries=re.findall(r'<summary>(.*?)</summary>',xml,re.DOTALL); [print(str(i+1)+'. '+t.strip()+'\\n '+summaries[i].strip()[:200] if i<len(summaries) else '') for i,t in enumerate(titles[:5])]\"" | |
| }, | |
| ] | |
| def _extract_query_from_cmd(cmd: str) -> str: | |
| patterns = [ | |
| r'[?&]q=([^&\'"]+)', | |
| r'[?&]query=([^&\'"]+)', | |
| r'[?&]search_query=all:([^&\'"]+)', | |
| r"search\?q=([^&'\"\s]+)", | |
| ] | |
| for p in patterns: | |
| m = re.search(p, cmd) | |
| if m: | |
| q = m.group(1).replace('+', ' ').replace('%20', ' ') | |
| return urllib.parse.unquote(q) | |
| return "" | |
| def _is_empty_result(stdout: str) -> bool: | |
| if not stdout or len(stdout.strip()) < 10: | |
| return True | |
| empty_signals = [ | |
| "no direct answer", "no results", "0 results", | |
| "no items", "[]", "{}", "error", "not found", "answer: no", | |
| ] | |
| s = stdout.strip().lower() | |
| meaningful_lines = [l for l in s.split('\n') if l.strip() and not any(sig in l for sig in empty_signals)] | |
| return len(meaningful_lines) < 1 | |
| # ─── Screen Capture ───────────────────────────────── | |
| def _get_screen_size() -> tuple[int, int]: | |
| """يُعيد حجم الشاشة الحقيقي""" | |
| r = subprocess.run(["xdotool", "getdisplaygeometry"], | |
| env={**os.environ, "DISPLAY": DISPLAY}, | |
| capture_output=True, text=True, timeout=5) | |
| try: | |
| parts = r.stdout.strip().split() | |
| return int(parts[0]), int(parts[1]) | |
| except: | |
| return 1920, 1080 | |
| def _get_mouse_pos() -> tuple[int, int]: | |
| """يُعيد موقع الماوس الحالي""" | |
| r = subprocess.run(["xdotool", "getmouselocation"], | |
| env={**os.environ, "DISPLAY": DISPLAY}, | |
| capture_output=True, text=True, timeout=5) | |
| try: | |
| mx = int(re.search(r"x:(\d+)", r.stdout).group(1)) | |
| my = int(re.search(r"y:(\d+)", r.stdout).group(1)) | |
| return mx, my | |
| except: | |
| return 0, 0 | |
| def capture_screen_raw() -> tuple[object, int, int]: | |
| """ | |
| يلتقط الشاشة بـ 5 طرق متسلسلة — تضمن نجاح واحدة: | |
| 1. scrot (الأسرع) | |
| 2. import (ImageMagick) | |
| 3. xwd + convert | |
| 4. ffmpeg + x11grab | |
| 5. python-xlib / Xlib مباشرة | |
| """ | |
| from PIL import Image | |
| env = {**os.environ, "DISPLAY": DISPLAY} | |
| tmp = f"/tmp/zs_{int(time.time()*1000)}" | |
| def _load(path) -> tuple: | |
| """تحميل الصورة إذا وُجدت وحجمها > 0""" | |
| if not path or not os.path.exists(path): | |
| return None, 0, 0 | |
| size = os.path.getsize(path) | |
| if size < 512: # صورة فارغة/تالفة | |
| return None, 0, 0 | |
| try: | |
| img = Image.open(path).convert("RGB") | |
| w, h = img.size | |
| if w < 10 or h < 10: | |
| return None, 0, 0 | |
| try: os.unlink(path) | |
| except: pass | |
| return img, w, h | |
| except Exception as ex: | |
| print(f"[_load] {ex}") | |
| return None, 0, 0 | |
| # ── طريقة 1: scrot ────────────────────────────────── | |
| try: | |
| p = tmp + "_1.png" | |
| r = subprocess.run( | |
| ["scrot", "-q", "95", p], | |
| env=env, timeout=8, capture_output=True | |
| ) | |
| img, w, h = _load(p) | |
| if img: | |
| print("[capture] ✅ scrot") | |
| return img, w, h | |
| print(f"[capture] scrot failed rc={r.returncode} err={r.stderr[:80]}") | |
| except Exception as e: | |
| print(f"[capture] scrot ex: {e}") | |
| # ── طريقة 2: ImageMagick import ───────────────────── | |
| try: | |
| p = tmp + "_2.png" | |
| r = subprocess.run( | |
| ["import", "-window", "root", "-silent", p], | |
| env=env, timeout=10, capture_output=True | |
| ) | |
| img, w, h = _load(p) | |
| if img: | |
| print("[capture] ✅ import (ImageMagick)") | |
| return img, w, h | |
| print(f"[capture] import failed rc={r.returncode}") | |
| except Exception as e: | |
| print(f"[capture] import ex: {e}") | |
| # ── طريقة 3: xwd + convert ────────────────────────── | |
| try: | |
| xwd_p = tmp + "_3.xwd" | |
| png_p = tmp + "_3.png" | |
| r1 = subprocess.run( | |
| ["xwd", "-root", "-silent", "-out", xwd_p], | |
| env=env, timeout=10, capture_output=True | |
| ) | |
| r2 = subprocess.run( | |
| ["convert", xwd_p, png_p], | |
| timeout=10, capture_output=True | |
| ) | |
| try: os.unlink(xwd_p) | |
| except: pass | |
| img, w, h = _load(png_p) | |
| if img: | |
| print("[capture] ✅ xwd+convert") | |
| return img, w, h | |
| print(f"[capture] xwd rc={r1.returncode} convert rc={r2.returncode}") | |
| except Exception as e: | |
| print(f"[capture] xwd ex: {e}") | |
| # ── طريقة 4: ffmpeg x11grab ───────────────────────── | |
| try: | |
| p = tmp + "_4.png" | |
| sw, sh = _get_screen_size() | |
| r = subprocess.run([ | |
| "ffmpeg", "-y", | |
| "-f", "x11grab", | |
| "-video_size", f"{sw}x{sh}", | |
| "-i", DISPLAY, | |
| "-vframes", "1", | |
| "-q:v", "2", | |
| p | |
| ], env=env, timeout=12, capture_output=True) | |
| img, w, h = _load(p) | |
| if img: | |
| print("[capture] ✅ ffmpeg x11grab") | |
| return img, w, h | |
| print(f"[capture] ffmpeg rc={r.returncode} err={r.stderr[-120:]}") | |
| except Exception as e: | |
| print(f"[capture] ffmpeg ex: {e}") | |
| # ── طريقة 5: python-xlib (Xlib مباشرة) ────────────── | |
| try: | |
| from Xlib import display as Xdisplay | |
| from Xlib.ext.xtest import fake_input | |
| xdisp = Xdisplay.Display(DISPLAY) | |
| root = xdisp.screen().root | |
| geom = root.get_geometry() | |
| w, h = geom.width, geom.height | |
| raw = root.get_image(0, 0, w, h, | |
| Xdisplay.X.ZPixmap, | |
| 0xFFFFFFFF) | |
| import struct | |
| data = raw.data | |
| # BGRA → RGB | |
| pixels = [] | |
| for i in range(0, len(data), 4): | |
| b, g, r_, a = struct.unpack_from('BBBB', data, i) | |
| pixels.extend([r_, g, b]) | |
| img = Image.frombytes("RGB", (w, h), bytes(pixels)) | |
| print("[capture] ✅ python-xlib") | |
| return img, w, h | |
| except Exception as e: | |
| print(f"[capture] xlib ex: {e}") | |
| # ── طريقة 6: صورة placeholder واضحة ──────────────── | |
| print("[capture] ⚠️ ALL methods failed — returning placeholder") | |
| try: | |
| from PIL import ImageDraw | |
| sw, sh = _get_screen_size() | |
| img = Image.new("RGB", (sw or 1280, sh or 720), (20, 20, 30)) | |
| draw = ImageDraw.Draw(img) | |
| draw.rectangle([(0, 0), (sw, 40)], fill=(30, 30, 50)) | |
| draw.text((10, 10), f"⚠️ Screenshot failed — DISPLAY={DISPLAY}", fill=(255, 100, 100)) | |
| draw.text((10, 50), "Methods tried: scrot, import, xwd, ffmpeg, xlib", fill=(150, 150, 150)) | |
| return img, sw or 1280, sh or 720 | |
| except: | |
| return None, 0, 0 | |
| def capture_screen(scale=0.6, quality=70) -> str: | |
| """يلتقط الشاشة العادية بدون grid""" | |
| img, w, h = capture_screen_raw() | |
| if img is None: | |
| return "" | |
| try: | |
| if scale < 1.0: | |
| img = img.resize((int(w * scale), int(h * scale)), img.LANCZOS) | |
| buf = io.BytesIO() | |
| img.save(buf, format="JPEG", quality=quality, optimize=True) | |
| return base64.b64encode(buf.getvalue()).decode() | |
| except Exception as e: | |
| print(f"[capture] {e}") | |
| return "" | |
| def capture_screen_with_grid(scale=0.65, quality=72, | |
| force_mx: int | None = None, | |
| force_my: int | None = None) -> dict: | |
| """ | |
| يلتقط الشاشة ويُعيد صورة واحدة فقط مع شبكة إحداثيات واضحة. | |
| الإحداثيات في الصورة تطابق إحداثيات الشاشة الحقيقية (1:1 mapping). | |
| الخطوط كل 100px — الأرقام تُظهر الـ X/Y الحقيقي للنقر. | |
| force_mx / force_my: إذا مُرِّرا يُرسم الـ cursor في هذا الموضع مباشرةً | |
| بدون استعلام xdotool — يُعالج مشكلة race condition بعد mouse_move. | |
| """ | |
| try: | |
| from PIL import Image, ImageDraw | |
| img, orig_w, orig_h = capture_screen_raw() | |
| if img is None: | |
| return {"data": "", "width": 1920, "height": 1080, "mouse_x": 0, "mouse_y": 0} | |
| # استخدم الإحداثيات المُمرَّرة إذا وُجدت (بعد mouse_move/click مباشرة) | |
| # وإلا اقرأ من X server | |
| if force_mx is not None and force_my is not None: | |
| mx, my = force_mx, force_my | |
| else: | |
| mx, my = _get_mouse_pos() | |
| # resize للعرض فقط — الإحداثيات تبقى للشاشة الأصلية | |
| sw = int(orig_w * scale) | |
| sh = int(orig_h * scale) | |
| grid_img = img.resize((sw, sh), Image.LANCZOS) | |
| draw = ImageDraw.Draw(grid_img, "RGBA") | |
| # شبكة كل 100px بإحداثيات الشاشة الحقيقية | |
| step_orig = 100 | |
| step_scaled_x = int(step_orig * sw / orig_w) | |
| step_scaled_y = int(step_orig * sh / orig_h) | |
| grid_color = (255, 255, 255, 45) # خطوط بيضاء شفافة | |
| label_color = (0, 255, 180, 200) # أرقام خضراء واضحة | |
| # خطوط عمودية + أرقام X الحقيقية | |
| x_sc = step_scaled_x | |
| x_real = step_orig | |
| while x_sc < sw: | |
| draw.line([(x_sc, 0), (x_sc, sh)], fill=grid_color, width=1) | |
| draw.rectangle([(x_sc + 1, 2), (x_sc + 32, 14)], fill=(0, 0, 0, 160)) | |
| draw.text((x_sc + 2, 3), str(x_real), fill=label_color) | |
| x_sc += step_scaled_x | |
| x_real += step_orig | |
| # خطوط أفقية + أرقام Y الحقيقية | |
| y_sc = step_scaled_y | |
| y_real = step_orig | |
| while y_sc < sh: | |
| draw.line([(0, y_sc), (sw, y_sc)], fill=grid_color, width=1) | |
| draw.rectangle([(2, y_sc + 1), (36, y_sc + 13)], fill=(0, 0, 0, 160)) | |
| draw.text((3, y_sc + 2), str(y_real), fill=label_color) | |
| y_sc += step_scaled_y | |
| y_real += step_orig | |
| # موقع الماوس الحقيقي — دائرة حمراء | |
| mouse_sx = int(mx * sw / orig_w) | |
| mouse_sy = int(my * sh / orig_h) | |
| r = 10 | |
| draw.ellipse([(mouse_sx-r, mouse_sy-r), (mouse_sx+r, mouse_sy+r)], | |
| outline=(255, 60, 60, 240), width=2) | |
| draw.line([(mouse_sx-16, mouse_sy), (mouse_sx+16, mouse_sy)], | |
| fill=(255, 60, 60, 200), width=1) | |
| draw.line([(mouse_sx, mouse_sy-16), (mouse_sx, mouse_sy+16)], | |
| fill=(255, 60, 60, 200), width=1) | |
| # شريط معلومات في الأعلى | |
| final = grid_img.convert("RGB") | |
| draw2 = ImageDraw.Draw(final) | |
| draw2.rectangle([(0, 0), (sw, 18)], fill=(0, 0, 0)) | |
| hdr = (f"SCREEN {orig_w}x{orig_h} | MOUSE:({mx},{my}) | " | |
| f"GRID=100px | CLICK COORDS = numbers on grid lines") | |
| draw2.text((4, 2), hdr, fill=(0, 220, 160)) | |
| # شريط معلومات في الأسفل | |
| draw2.rectangle([(0, sh-18), (sw, sh)], fill=(0, 0, 0)) | |
| draw2.text((4, sh-16), | |
| f"USE REAL COORDS: e.g. click x=500 y=300 means the '500' vertical line + '300' horizontal line", | |
| fill=(180, 180, 80)) | |
| buf = io.BytesIO() | |
| final.save(buf, format="JPEG", quality=quality, optimize=True) | |
| data = base64.b64encode(buf.getvalue()).decode() | |
| return { | |
| "data": data, | |
| "width": orig_w, | |
| "height": orig_h, | |
| "mouse_x": mx, | |
| "mouse_y": my, | |
| } | |
| except Exception as e: | |
| print(f"[capture_grid] {e}") | |
| plain = capture_screen(scale=scale, quality=quality) | |
| mx, my = _get_mouse_pos() | |
| w, h = _get_screen_size() | |
| return {"data": plain, "width": w, "height": h, "mouse_x": mx, "mouse_y": my} | |
| # ─── Command Runner ────────────────────────────────── | |
| def run_raw_command(cmd: str, timeout: int = 60) -> dict: | |
| env = {**os.environ, "DISPLAY": DISPLAY, | |
| "PYTHONIOENCODING": "utf-8", "LANG": "en_US.UTF-8"} | |
| try: | |
| result = subprocess.run(cmd, shell=True, capture_output=True, | |
| text=True, timeout=timeout, env=env, | |
| executable="/bin/bash") | |
| return { | |
| "stdout": result.stdout[-15000:], | |
| "stderr": result.stderr[-3000:], | |
| "returncode": result.returncode, | |
| } | |
| except subprocess.TimeoutExpired: | |
| return {"stdout": "", "stderr": f"⏱️ Timeout {timeout}s", "returncode": -1} | |
| except Exception as e: | |
| return {"stdout": "", "stderr": str(e), "returncode": -1} | |
| def run_command_smart(cmd: str, timeout: int = 60) -> dict: | |
| res = run_raw_command(cmd, timeout=timeout) | |
| stdout = res["stdout"].strip() | |
| if res["returncode"] == 0 and not _is_empty_result(stdout): | |
| return res | |
| is_curl_search = "curl" in cmd and any(x in cmd for x in [ | |
| "duckduckgo", "google", "bing", "wikipedia", "reddit", | |
| "hackernews", "hn.algolia", "arxiv", "news", "search" | |
| ]) | |
| if not is_curl_search: | |
| return res | |
| query = _extract_query_from_cmd(cmd) | |
| if not query or len(query) < 3: | |
| words = [w for w in cmd.split() if len(w) > 3 and not w.startswith('-') | |
| and 'http' not in w and 'python3' not in w and 'curl' not in w] | |
| query = ' '.join(words[:5]) if words else "" | |
| if not query: | |
| return res | |
| sources = _build_search_sources(query) | |
| tried_names = [] | |
| all_results = [] | |
| for source in sources: | |
| src_name = source["name"] | |
| tried_names.append(f"🔍 {src_name}") | |
| src_res = run_raw_command(source["cmd"], timeout=25) | |
| src_out = src_res["stdout"].strip() | |
| if not _is_empty_result(src_out): | |
| combined_header = f"[مصدر بديل: {src_name}]\n{'='*50}\n" | |
| all_results.append(src_out) | |
| for source2 in sources: | |
| if source2["name"] != src_name: | |
| s2 = run_raw_command(source2["cmd"], timeout=20) | |
| s2_out = s2["stdout"].strip() | |
| if not _is_empty_result(s2_out): | |
| all_results.append(f"\n[مصدر إضافي: {source2['name']}]\n{s2_out}") | |
| break | |
| final_out = combined_header + "\n\n".join(all_results) | |
| return { | |
| "stdout": final_out[:15000], | |
| "stderr": "", | |
| "returncode": 0, | |
| "_sources_tried": tried_names, | |
| "_fallback_used": src_name, | |
| } | |
| return { | |
| "stdout": f"[لم تُرجع أي مصادر نتائج لـ: {query}]\nالمصادر: {', '.join(tried_names[:5])}", | |
| "stderr": res["stderr"], | |
| "returncode": -1, | |
| "_sources_tried": tried_names, | |
| } | |
| def xdo(args: list, timeout=10) -> dict: | |
| r = subprocess.run(["xdotool"] + args, | |
| env={**os.environ, "DISPLAY": DISPLAY}, | |
| timeout=timeout, capture_output=True, text=True) | |
| return {"rc": r.returncode, "out": r.stdout, "err": r.stderr} | |
| def type_text_smart(text: str) -> dict: | |
| """ | |
| كتابة نص ذكية — تدعم العربية والإنجليزية: | |
| - للنصوص الإنجليزية: xdotool type مباشرة | |
| - للنصوص العربية أو المختلطة: xclip clipboard ثم ctrl+v | |
| """ | |
| has_arabic = bool(re.search(r'[\u0600-\u06FF]', text)) | |
| if has_arabic: | |
| # طريقة Clipboard لضمان كتابة العربية بشكل صحيح | |
| try: | |
| proc = subprocess.Popen( | |
| ["xclip", "-selection", "clipboard"], | |
| stdin=subprocess.PIPE, | |
| env={**os.environ, "DISPLAY": DISPLAY} | |
| ) | |
| proc.communicate(text.encode("utf-8")) | |
| time.sleep(0.15) | |
| # Focus + paste | |
| xdo(["key", "--clearmodifiers", "ctrl+v"]) | |
| return {"success": True, "method": "clipboard+paste", "arabic": True} | |
| except Exception as e: | |
| # fallback: xdotool type | |
| r = xdo(["type", "--clearmodifiers", "--delay", "50", text]) | |
| return {"success": r["rc"] == 0, "method": "xdotool_fallback", "error": r["err"]} | |
| else: | |
| # إنجليزية: xdotool type مباشرة | |
| r = xdo(["type", "--clearmodifiers", "--delay", "30", text]) | |
| return {"success": r["rc"] == 0, "method": "xdotool_direct"} | |
| def open_browser_smart(url: str = "") -> str: | |
| """يفتح المتصفح بطريقة ذكية مع fallback""" | |
| browser_cmd = BROWSER | |
| if not url: | |
| url = "about:blank" | |
| # جرب أولاً: BROWSER العادي | |
| cmd = f"{browser_cmd} --new-window '{url}' &" | |
| proc = subprocess.Popen(cmd, shell=True, | |
| env={**os.environ, "DISPLAY": DISPLAY}, | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| return f"{browser_cmd} → {url}" | |
| # ─── Simple Command Parser ───────────────────────────── | |
| # يفهم أوامر بسيطة ويحولها لأفعال داخلية | |
| def parse_simple_command(raw: str) -> dict | None: | |
| """ | |
| يُحلّل الأوامر البسيطة النصية وتُعيد action dict | |
| أمثلة: | |
| open pc → {"action":"screenshot"} | |
| open firefox → {"action":"open_browser","url":""} | |
| open chrome → {"action":"open_browser","url":""} | |
| open browser → {"action":"open_browser","url":""} | |
| open url https://google.com → {"action":"open_browser","url":"https://..."} | |
| mouse go x500 y300 → {"action":"mouse_move","x":500,"y":300} | |
| mouse go 500 300 → {"action":"mouse_move","x":500,"y":300} | |
| click x500 y300 → {"action":"mouse_click","x":500,"y":300,"button":"left"} | |
| click 500 300 → {"action":"mouse_click","x":500,"y":300,"button":"left"} | |
| rclick x500 y300 → {"action":"mouse_click","x":500,"y":300,"button":"right"} | |
| dclick x500 y300 → {"action":"mouse_click","x":500,"y":300,"double":true} | |
| type hello world → {"action":"keyboard_type","text":"hello world"} | |
| key enter → {"action":"keyboard_hotkey","keys":["Return"]} | |
| key ctrl+c → {"action":"keyboard_hotkey","keys":["ctrl","c"]} | |
| scroll up → {"action":"scroll","x":960,"y":540,"clicks":3} | |
| scroll down → {"action":"scroll","x":960,"y":540,"clicks":-3} | |
| screenshot → {"action":"screenshot"} | |
| screen info → {"action":"screen_info"} | |
| """ | |
| s = raw.strip().lower() | |
| # open pc / open computer / open screen | |
| if re.match(r'^open\s+(pc|computer|screen|desktop)$', s): | |
| return {"action": "screenshot"} | |
| # open firefox / chrome / browser / browser url | |
| m = re.match(r'^open\s+(firefox|chrome|chromium|browser|web|internet)(\s+(.+))?$', s) | |
| if m: | |
| url = (m.group(3) or "").strip() | |
| if url and not url.startswith("http"): | |
| url = "https://" + url | |
| return {"action": "open_browser", "url": url or ""} | |
| # open url <url> | |
| m = re.match(r'^open\s+url\s+(\S+)$', s) | |
| if m: | |
| url = m.group(1) | |
| if not url.startswith("http"): | |
| url = "https://" + url | |
| return {"action": "open_browser", "url": url} | |
| # mouse go x500 y300 OR mouse go 500 300 | |
| m = re.match(r'^mouse\s+(?:go|move|to)\s+x?(\d+)\s+y?(\d+)$', s) | |
| if m: | |
| return {"action": "mouse_move", "x": int(m.group(1)), "y": int(m.group(2))} | |
| # click x500 y300 OR click 500 300 | |
| m = re.match(r'^click\s+x?(\d+)\s+y?(\d+)$', s) | |
| if m: | |
| return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "left"} | |
| # rclick / right click | |
| m = re.match(r'^r(?:ight)?click\s+x?(\d+)\s+y?(\d+)$', s) | |
| if m: | |
| return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "right"} | |
| # dclick / double click | |
| m = re.match(r'^d(?:ouble)?click\s+x?(\d+)\s+y?(\d+)$', s) | |
| if m: | |
| return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "left", "double": True} | |
| # type <text> (يحتفظ بالحالة الأصلية) | |
| m = re.match(r'^type\s+(.+)$', raw.strip(), re.IGNORECASE) | |
| if m: | |
| return {"action": "keyboard_type", "text": m.group(1)} | |
| # key <keys> | |
| m = re.match(r'^key\s+(.+)$', s) | |
| if m: | |
| k = m.group(1).strip() | |
| key_map = { | |
| "enter": "Return", "return": "Return", "esc": "Escape", "escape": "Escape", | |
| "tab": "Tab", "space": "space", "backspace": "BackSpace", "delete": "Delete", | |
| "up": "Up", "down": "Down", "left": "Left", "right": "Right", | |
| "home": "Home", "end": "End", "pageup": "Prior", "pagedown": "Next", | |
| "f1": "F1", "f2": "F2", "f3": "F3", "f4": "F4", "f5": "F5", | |
| } | |
| if '+' in k: | |
| parts = [p.strip() for p in k.split('+')] | |
| keys = [key_map.get(p, p) for p in parts] | |
| else: | |
| keys = [key_map.get(k, k)] | |
| return {"action": "keyboard_hotkey", "keys": keys} | |
| # scroll up / scroll down / scroll <n> | |
| m = re.match(r'^scroll\s+(up|down|(\-?\d+))$', s) | |
| if m: | |
| w, h = _get_screen_size() | |
| direction = m.group(1) | |
| if direction == "up": | |
| clicks = 4 | |
| elif direction == "down": | |
| clicks = -4 | |
| else: | |
| clicks = int(direction) | |
| return {"action": "scroll", "x": w // 2, "y": h // 2, "clicks": clicks} | |
| # screenshot | |
| if s in ("screenshot", "screen", "ss", "snap"): | |
| return {"action": "screenshot"} | |
| # screen info | |
| if re.match(r'^screen\s+info$', s): | |
| return {"action": "screen_info"} | |
| return None # لم يُعرف | |
| async def broadcast(msg: dict): | |
| txt = json.dumps(msg, ensure_ascii=False) | |
| dead = [] | |
| for ws in active_connections: | |
| try: await ws.send_text(txt) | |
| except: dead.append(ws) | |
| for ws in dead: | |
| if ws in active_connections: | |
| active_connections.remove(ws) | |
| async def screen_stream_loop(): | |
| global stream_active | |
| interval = 1.0 / max(1, stream_fps) | |
| while stream_active and active_connections: | |
| try: | |
| frame = capture_screen(scale=stream_scale, quality=stream_quality) | |
| if frame: | |
| await broadcast({"type": "frame", "data": frame}) | |
| except: pass | |
| await asyncio.sleep(interval) | |
| stream_active = False | |
| # ─── Action Handler ────────────────────────────────── | |
| async def handle_action(ws: WebSocket, msg: dict): | |
| action = msg.get("action", "") | |
| data = msg.get("data", {}) | |
| async def send(obj): | |
| await ws.send_text(json.dumps(obj, ensure_ascii=False)) | |
| async def auto_shot_grid(label="", delay=0.5, | |
| force_mx: int | None = None, | |
| force_my: int | None = None): | |
| """ | |
| screenshot مع grid. | |
| force_mx/force_my: ارسم الـ cursor في هذا الموضع بالضبط (بعد mouse_move/click) | |
| بدلاً من إعادة قراءته من X server — يُصلح race condition. | |
| """ | |
| await asyncio.sleep(delay) | |
| result = capture_screen_with_grid(scale=0.65, quality=72, | |
| force_mx=force_mx, force_my=force_my) | |
| if result["data"]: | |
| await send({ | |
| "type": "screenshot", | |
| "data": result["data"], | |
| "ts": int(time.time() * 1000), | |
| "auto": True, | |
| "label": label, | |
| "screen_width": result["width"], | |
| "screen_height": result["height"], | |
| "mouse_x": result["mouse_x"], | |
| "mouse_y": result["mouse_y"], | |
| "has_grid": True, | |
| }) | |
| return result["data"] | |
| # ── screenshot ──────────────────────────────────── | |
| if action == "screenshot": | |
| result = capture_screen_with_grid(scale=0.65, quality=75) | |
| await send({ | |
| "type": "screenshot", | |
| "data": result["data"], | |
| "ts": int(time.time() * 1000), | |
| "screen_width": result["width"], | |
| "screen_height": result["height"], | |
| "mouse_x": result["mouse_x"], | |
| "mouse_y": result["mouse_y"], | |
| "has_grid": True, | |
| }) | |
| # ── simple_command — الأوامر البسيطة ────────────── | |
| elif action == "simple_command": | |
| raw_cmd = data.get("cmd", "").strip() | |
| parsed = parse_simple_command(raw_cmd) | |
| if parsed is None: | |
| await send({ | |
| "type": "simple_command_result", | |
| "ok": False, | |
| "cmd": raw_cmd, | |
| "error": f"أمر غير معروف: '{raw_cmd}'\nالأوامر المتاحة: open pc, open firefox, mouse go x y, click x y, rclick x y, type TEXT, key ENTER, scroll up/down, screenshot" | |
| }) | |
| return | |
| # نفّذ الأمر المُحلّل | |
| await send({"type": "simple_command_result", "ok": True, "cmd": raw_cmd, "parsed": parsed}) | |
| # أعِد تشغيل نفس الـ handle_action مع الأمر المُحلّل | |
| sub_msg = {"action": parsed["action"], "data": {k: v for k, v in parsed.items() if k != "action"}} | |
| await handle_action(ws, sub_msg) | |
| return | |
| # ── terminal ────────────────────────────────────── | |
| elif action == "terminal": | |
| cmd = data.get("cmd", "") | |
| timeout = int(data.get("timeout", 60)) | |
| if not cmd: | |
| await send({"type": "terminal_result", "cmd": "", "stdout": "", | |
| "stderr": "no command", "returncode": -1}) | |
| return | |
| res = run_command_smart(cmd, timeout=timeout) | |
| await send({ | |
| "type": "terminal_result", | |
| "cmd": cmd, | |
| "stdout": res["stdout"], | |
| "stderr": res.get("stderr", ""), | |
| "returncode": res["returncode"], | |
| "fallback_used": res.get("_fallback_used", None), | |
| "sources_tried": res.get("_sources_tried", []), | |
| }) | |
| await auto_shot_grid(f"بعد: {cmd[:45]}", delay=0.4) | |
| # ── mouse_move ──────────────────────────────────── | |
| elif action == "mouse_move": | |
| x, y = int(data.get("x", 0)), int(data.get("y", 0)) | |
| # --sync ينتظر حتى يؤكد X server استلام الأمر | |
| xdo(["mousemove", "--sync", str(x), str(y)]) | |
| await send({"type": "ack", "action": "mouse_move", "x": x, "y": y}) | |
| # مرّر الإحداثيات مباشرةً لتجنّب race condition مع X server | |
| await auto_shot_grid(f"ماوس → ({x},{y})", delay=0.2, | |
| force_mx=x, force_my=y) | |
| # ── mouse_click ─────────────────────────────────── | |
| elif action == "mouse_click": | |
| x, y = int(data.get("x", 0)), int(data.get("y", 0)) | |
| btn_num = {"left": "1", "middle": "2", "right": "3"}.get(data.get("button", "left"), "1") | |
| xdo(["mousemove", "--sync", str(x), str(y)]) | |
| await asyncio.sleep(0.08) | |
| if data.get("double"): | |
| xdo(["click", "--repeat", "2", "--delay", "100", btn_num]) | |
| else: | |
| xdo(["click", btn_num]) | |
| btn_name = {"1": "left", "2": "middle", "3": "right"}.get(btn_num, "left") | |
| await send({"type": "ack", "action": "mouse_click", "x": x, "y": y, "button": btn_name}) | |
| # مرّر الإحداثيات مباشرةً لتجنّب race condition | |
| await auto_shot_grid(f"نقر {btn_name} → ({x},{y})", delay=0.5, | |
| force_mx=x, force_my=y) | |
| # ── keyboard_type ───────────────────────────────── | |
| elif action == "keyboard_type": | |
| text = data.get("text", "") | |
| if text: | |
| result = type_text_smart(text) | |
| await send({"type": "ack", "action": "keyboard_type", | |
| "method": result["method"], "text_len": len(text)}) | |
| else: | |
| await send({"type": "ack", "action": "keyboard_type"}) | |
| await auto_shot_grid("بعد الكتابة", delay=0.5) | |
| # ── keyboard_hotkey ─────────────────────────────── | |
| elif action == "keyboard_hotkey": | |
| keys = data.get("keys", []) | |
| if keys: | |
| xdo(["key", "--clearmodifiers", "+".join(keys)]) | |
| await send({"type": "ack", "action": "keyboard_hotkey", "keys": keys}) | |
| await auto_shot_grid("بعد الاختصار", delay=0.5) | |
| # ── keyboard_press ──────────────────────────────── | |
| elif action == "keyboard_press": | |
| key = data.get("key", "") | |
| if key: | |
| xdo(["key", "--clearmodifiers", key]) | |
| await send({"type": "ack", "action": "keyboard_press"}) | |
| await auto_shot_grid("بعد المفتاح", delay=0.4) | |
| # ── clipboard_write ─────────────────────────────── | |
| elif action == "clipboard_write": | |
| text = data.get("text", "") | |
| try: | |
| proc = subprocess.Popen( | |
| ["xclip", "-selection", "clipboard"], | |
| stdin=subprocess.PIPE, | |
| env={**os.environ, "DISPLAY": DISPLAY} | |
| ) | |
| proc.communicate(text.encode("utf-8")) | |
| await send({"type": "ack", "action": "clipboard_write", "length": len(text)}) | |
| except Exception as e: | |
| await send({"type": "error", "action": "clipboard_write", "msg": str(e)}) | |
| # ── clipboard_read ──────────────────────────────── | |
| elif action == "clipboard_read": | |
| res = run_raw_command("xclip -selection clipboard -o", timeout=5) | |
| await send({"type": "clipboard_content", "text": res["stdout"]}) | |
| # ── scroll ──────────────────────────────────────── | |
| elif action == "scroll": | |
| x, y = int(data.get("x", 0)), int(data.get("y", 0)) | |
| clicks = int(data.get("clicks", 3)) | |
| btn = "4" if clicks > 0 else "5" | |
| xdo(["mousemove", str(x), str(y)]) | |
| for _ in range(abs(clicks)): | |
| xdo(["click", btn]) | |
| await asyncio.sleep(0.03) | |
| await send({"type": "ack", "action": "scroll", "clicks": clicks}) | |
| await auto_shot_grid("بعد التمرير", delay=0.4) | |
| # ── open_app ────────────────────────────────────── | |
| elif action == "open_app": | |
| app_cmd = data.get("cmd", "") | |
| if app_cmd: | |
| fixed_cmd = app_cmd | |
| # إصلاح firefox-esr | |
| if "firefox-esr" in app_cmd: | |
| esr_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True) | |
| if esr_check.returncode != 0: | |
| fixed_cmd = app_cmd.replace("firefox-esr", BROWSER) | |
| subprocess.Popen(fixed_cmd, shell=True, | |
| env={**os.environ, "DISPLAY": DISPLAY}, | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| await send({"type": "ack", "action": "open_app", "cmd": fixed_cmd}) | |
| wait_time = 7.0 if any(x in app_cmd for x in ["firefox", "chromium", "chrome"]) else 4.0 | |
| await asyncio.sleep(2.0) | |
| await auto_shot_grid(f"فتح: {fixed_cmd[:40]} (2s)", delay=0) | |
| await asyncio.sleep(wait_time - 2.0) | |
| await auto_shot_grid(f"بعد تحميل: {fixed_cmd[:40]}", delay=0) | |
| else: | |
| await send({"type": "ack", "action": "open_app"}) | |
| # ── open_browser — فتح المتصفح ──────────────────── | |
| elif action == "open_browser": | |
| url = data.get("url", "") | |
| if not url: | |
| url = "about:blank" | |
| browser_result = open_browser_smart(url) | |
| await send({"type": "ack", "action": "open_browser", "result": browser_result}) | |
| await asyncio.sleep(2.0) | |
| await auto_shot_grid(f"فتح المتصفح: {url[:50]}", delay=0) | |
| await asyncio.sleep(5.0) | |
| await auto_shot_grid("بعد تحميل المتصفح", delay=0) | |
| # ── mouse_drag ──────────────────────────────────── | |
| elif action == "mouse_drag": | |
| x1, y1 = int(data.get("x1", 0)), int(data.get("y1", 0)) | |
| x2, y2 = int(data.get("x2", 0)), int(data.get("y2", 0)) | |
| xdo(["mousemove", str(x1), str(y1)]) | |
| xdo(["mousedown", "1"]) | |
| await asyncio.sleep(0.1) | |
| xdo(["mousemove", str(x2), str(y2)]) | |
| await asyncio.sleep(0.1) | |
| xdo(["mouseup", "1"]) | |
| await send({"type": "ack", "action": "mouse_drag"}) | |
| # cursor ينتهي عند x2,y2 | |
| await auto_shot_grid("بعد السحب", delay=0.4, force_mx=x2, force_my=y2) | |
| # ── start_stream ────────────────────────────────── | |
| elif action == "start_stream": | |
| global stream_active, stream_fps, stream_quality, stream_scale | |
| stream_fps = int(data.get("fps", 3)) | |
| stream_quality = int(data.get("quality", 60)) | |
| stream_scale = float(data.get("scale", 0.5)) | |
| if not stream_active: | |
| stream_active = True | |
| asyncio.create_task(screen_stream_loop()) | |
| await send({"type": "ack", "action": "start_stream"}) | |
| # ── stop_stream ─────────────────────────────────── | |
| elif action == "stop_stream": | |
| stream_active = False | |
| await send({"type": "ack", "action": "stop_stream"}) | |
| # ── screen_info ─────────────────────────────────── | |
| elif action == "screen_info": | |
| w, h = _get_screen_size() | |
| mx, my = _get_mouse_pos() | |
| await send({ | |
| "type": "screen_info", | |
| "width": w, "height": h, | |
| "mouse_x": mx, "mouse_y": my, | |
| "browser": BROWSER, | |
| }) | |
| # ── paste ───────────────────────────────────────── | |
| elif action == "paste": | |
| text = data.get("text", "") | |
| if text: | |
| proc = subprocess.Popen( | |
| ["xclip", "-selection", "clipboard"], | |
| stdin=subprocess.PIPE, | |
| env={**os.environ, "DISPLAY": DISPLAY} | |
| ) | |
| proc.communicate(text.encode("utf-8")) | |
| await asyncio.sleep(0.1) | |
| xdo(["key", "--clearmodifiers", "ctrl+v"]) | |
| await send({"type": "ack", "action": "paste"}) | |
| await auto_shot_grid("بعد اللصق", delay=0.5) | |
| # ── unknown ─────────────────────────────────────── | |
| else: | |
| await send({ | |
| "type": "error", | |
| "msg": f"Unknown action: '{action}'. Available: screenshot, terminal, mouse_move, mouse_click, mouse_drag, keyboard_type, keyboard_hotkey, keyboard_press, clipboard_write, clipboard_read, scroll, open_app, open_browser, paste, simple_command, screen_info, start_stream, stop_stream" | |
| }) | |
| # ─── WebSocket ─────────────────────────────────────── | |
| async def websocket_endpoint(ws: WebSocket): | |
| await ws.accept() | |
| active_connections.append(ws) | |
| w, h = _get_screen_size() | |
| await ws.send_text(json.dumps({ | |
| "type": "connected", | |
| "screen_width": w, | |
| "screen_height": h, | |
| "browser": BROWSER, | |
| "msg": f"Z Computer Mode v7 — Smart Control | Browser: {BROWSER} | Screen: {w}x{h}", | |
| "commands": [ | |
| "screenshot", "terminal {cmd}", "mouse_move {x,y}", "mouse_click {x,y,button}", | |
| "keyboard_type {text}", "keyboard_hotkey {keys}", "scroll {x,y,clicks}", | |
| "open_app {cmd}", "open_browser {url}", "simple_command {cmd}", | |
| "Simple cmds: 'open firefox', 'mouse go x y', 'click x y', 'type TEXT', 'key enter'" | |
| ] | |
| }, ensure_ascii=False)) | |
| # أرسل screenshot أولية مع grid | |
| result = capture_screen_with_grid(scale=0.65, quality=72) | |
| if result["data"]: | |
| await ws.send_text(json.dumps({ | |
| "type": "screenshot", | |
| "data": result["data"], | |
| "ts": int(time.time() * 1000), | |
| "label": "الشاشة الأولية", | |
| "screen_width": result["width"], | |
| "screen_height": result["height"], | |
| "mouse_x": result["mouse_x"], | |
| "mouse_y": result["mouse_y"], | |
| "has_grid": True, | |
| }, ensure_ascii=False)) | |
| try: | |
| while True: | |
| raw = await ws.receive_text() | |
| await handle_action(ws, json.loads(raw)) | |
| except WebSocketDisconnect: | |
| pass | |
| except Exception as e: | |
| print(f"[ws] {e}") | |
| finally: | |
| if ws in active_connections: | |
| active_connections.remove(ws) | |
| # ─── REST ──────────────────────────────────────────── | |
| async def rest_screenshot(): | |
| result = capture_screen_with_grid(scale=0.7, quality=73) | |
| return JSONResponse({ | |
| "image": result["data"], | |
| "ts": int(time.time() * 1000), | |
| "screen_width": result["width"], | |
| "screen_height": result["height"], | |
| "mouse_x": result["mouse_x"], | |
| "mouse_y": result["mouse_y"], | |
| "has_grid": True, | |
| }) | |
| async def rest_screenshot_clean(): | |
| return JSONResponse({"image": capture_screen(0.75, 75), "ts": int(time.time() * 1000)}) | |
| async def rest_terminal(body: dict): | |
| return JSONResponse(run_command_smart(body.get("cmd", ""), body.get("timeout", 60))) | |
| async def rest_simple(body: dict): | |
| """تنفيذ أمر بسيط عبر REST""" | |
| raw = body.get("cmd", "").strip() | |
| parsed = parse_simple_command(raw) | |
| if not parsed: | |
| return JSONResponse({"ok": False, "error": f"Unknown simple command: {raw}"}, status_code=400) | |
| return JSONResponse({"ok": True, "parsed": parsed}) | |
| async def health(): | |
| w, h = _get_screen_size() | |
| mx, my = _get_mouse_pos() | |
| browser_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True) | |
| return { | |
| "status": "ok", | |
| "version": "v7-smart-control", | |
| "browser": BROWSER, | |
| "firefox_esr_available": browser_check.returncode == 0, | |
| "screen_width": w, | |
| "screen_height": h, | |
| "mouse_x": mx, | |
| "mouse_y": my, | |
| } | |
| async def quick_search(query: str): | |
| res = run_command_smart( | |
| f"curl -s --max-time 15 'https://api.duckduckgo.com/?q={urllib.parse.quote_plus(query)}&format=json&no_html=1'", | |
| timeout=30 | |
| ) | |
| return JSONResponse(res) | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", 7860)) | |
| uvicorn.run("app:app", host="0.0.0.0", port=port, log_level="info") | |