Pcserver / app.py
THEZYZSTUDIO's picture
Update app.py
52b56e0 verified
Raw
History Blame Contribute Delete
49.1 kB
"""
THE Z AI — Computer Mode Server v7 — SMART CONTROL
====================================================
الإصلاحات الجوهرية في v7:
1. capture_screen_with_grid() — صورة مزدوجة: يمين نظيفة + يسار بشبكة احداثيات شفافة
2. أوامر بسيطة: open pc, open firefox, mouse go x y, click x y, rclick x y, type TEXT, key ENTER ...
3. إصلاح الكتابة العربية: xclip clipboard + xdotool key ctrl+v
4. إرسال احداثيات الشاشة الحقيقية + موقع الماوس مع كل screenshot
5. auto_shot_with_grid بعد كل خطوة
6. simple_command parser: يفهم أوامر بسيطة ويحولها لأفعال WebSocket
"""
import asyncio
import base64
import io
import json
import os
import re
import subprocess
import time
import urllib.parse
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
# X11 display targeted by every capture/input helper in this file; ":1" is
# used when the DISPLAY environment variable is not already set.
DISPLAY = os.environ.get("DISPLAY", ":1")
# Write the resolved value back so subprocesses that inherit os.environ see it.
os.environ["DISPLAY"] = DISPLAY
# ─── تشخيص أدوات الـ capture عند البداية ─────────────
def _check_capture_tools():
    """Print a startup report of capture/input binaries and DISPLAY status.

    Each tool is looked up with ``which``; the display is probed with
    ``xdpyinfo`` (3 s timeout). Nothing is returned — output goes to stdout.
    """
    print("─── Capture Tools Check ───")
    for tool in ("scrot", "import", "xwd", "convert", "ffmpeg", "xdotool"):
        lookup = subprocess.run(["which", tool], capture_output=True, text=True)
        if lookup.returncode == 0:
            mark = "✅"
        else:
            mark = "❌"
        location = lookup.stdout.strip() or 'not found'
        print(f" {mark} {tool}: {location}")
    try:
        probe = subprocess.run(
            ["xdpyinfo", "-display", DISPLAY],
            capture_output=True,
            text=True,
            timeout=3,
        )
        alive = probe.returncode == 0
        print(f" {'✅' if alive else '❌'} DISPLAY={DISPLAY}: {'active' if alive else 'not active'}")
    except Exception as ex:
        # xdpyinfo missing or timed out: report it, never raise.
        print(f" ❌ DISPLAY={DISPLAY}: {ex}")
    print("───────────────────────────")
# Run the diagnostics once at import time. Any failure is only printed so it
# cannot stop the module from loading.
try:
    _check_capture_tools()
except Exception as _e:
    print(f"[diag] {_e}")
# ─── اكتشاف المتصفح ──────────────────────────────────
def _detect_browser() -> str:
candidates = ["firefox", "firefox-esr", "chromium-browser", "chromium", "google-chrome"]
found = None
for c in candidates:
r = subprocess.run(["which", c], capture_output=True, text=True)
if r.returncode == 0 and r.stdout.strip():
found = c
break
if not found:
return "firefox"
# تأكد من وجود firefox-esr كـ symlink
esr_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True)
if esr_check.returncode != 0:
real_path = subprocess.run(["which", found], capture_output=True, text=True).stdout.strip()
if real_path:
try:
subprocess.run(["ln", "-sf", real_path, "/usr/local/bin/firefox-esr"], check=True)
print(f"✅ Created firefox-esr symlink → {real_path}")
except Exception as e:
print(f"⚠️ Could not create firefox-esr symlink: {e}")
return found
# Browser executable name, resolved once at import time.
BROWSER = _detect_browser()
print(f"🌐 Browser detected: {BROWSER}")
app = FastAPI(title="Z-Computer-Mode API v7")
# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): this API executes shell commands — confirm open CORS is intended.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True,
                   allow_methods=["*"], allow_headers=["*"])
# WebSocket clients currently connected; broadcast() sends to each of them.
active_connections: list[WebSocket] = []
# Screen-streaming state, written by the start_stream / stop_stream actions
# and read by screen_stream_loop().
stream_active = False
stream_fps = 3
stream_quality = 60
stream_scale = 0.5
# ─── CURL HEADERS ────────────────────────────────────
# Shared curl options interpolated into the search-source shell commands:
# browser-like request headers, compressed transfer, a 20 s time limit,
# redirect following (-L) and silent mode (-s).
CURL_HEADERS = (
    '-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" '
    '-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" '
    '-H "Accept-Language: en-US,en;q=0.9,ar;q=0.8" '
    '-H "Accept-Encoding: gzip, deflate" '
    '--compressed '
    '--max-time 20 '
    '-L '
    '-s '
)
# ─── مصادر البحث 8+ ──────────────────────────────────
def _build_search_sources(query: str) -> list[dict]:
    """Build the ordered list of fallback search sources for *query*.

    Each entry is ``{"name": <label>, "cmd": <bash pipeline>}``; the pipeline
    fetches one source with curl and formats the response with an inline
    ``python3 -c`` script.

    The query is always percent-encoded before being placed inside the
    single-quoted URL. The previous Google News URL used the raw query with
    spaces turned into ``+``, so a quote, ``&`` or non-ASCII text in the query
    broke the shell command or the URL.

    Args:
        query: Free-text search terms.

    Returns:
        A list of source dicts, in the order they should be tried.
    """
    q = urllib.parse.quote_plus(query)
    # The REST summary endpoint takes the title as a path segment, where "+"
    # is a literal plus sign; use underscores for spaces and encode the rest.
    wiki_title = urllib.parse.quote(query.strip().replace(' ', '_'), safe='')
    return [
        {
            "name": "DuckDuckGo Instant",
            "cmd": f"curl -s --max-time 20 'https://api.duckduckgo.com/?q={q}&format=json&no_html=1&skip_disambig=1' | python3 -c \"import sys,json; d=json.load(sys.stdin); ans=d.get('AbstractText',''); rels=d.get('RelatedTopics',[]); print('ANSWER:',ans if ans else 'no direct answer'); [print('-',r.get('Text','')[:250]) for r in rels if isinstance(r,dict) and r.get('Text')]\""
        },
        {
            "name": "Google News RSS",
            "cmd": f"curl -sL {CURL_HEADERS} 'https://news.google.com/rss/search?q={q}&hl=en&gl=US&ceid=US:en' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title><!\\[CDATA\\[(.*?)\\]\\]></title>|<title>(.*?)</title>',xml); clean=lambda s:re.sub('<[^>]+>','',s); results=[(a or b).strip() for a,b in titles if (a or b).strip()][1:9]; [print(str(i+1)+'. '+t[:180]) for i,t in enumerate(results)]\""
        },
        {
            "name": "Wikipedia English",
            "cmd": f"curl -s --max-time 15 'https://en.wikipedia.org/api/rest_v1/page/summary/{wiki_title}' | python3 -c \"import sys,json; d=json.load(sys.stdin); print(d.get('title','')+'\\n'+d.get('extract','')[:1500])\" 2>/dev/null || curl -s --max-time 15 'https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={q}&format=json&srlimit=5' | python3 -c \"import sys,json,re; d=json.load(sys.stdin); [print(str(i+1)+'. '+r['title']+': '+re.sub('<[^>]+>','',r.get('snippet',''))[:200]) for i,r in enumerate(d.get('query',{{}}).get('search',[]))]\" 2>/dev/null"
        },
        {
            "name": "DuckDuckGo HTML",
            "cmd": f"curl -sL {CURL_HEADERS} 'https://html.duckduckgo.com/html/?q={q}' | python3 -c \"import sys,re; h=sys.stdin.read(); snippets=re.findall(r'class=.result__snippet[^>]*>(.*?)</a>',h,re.DOTALL); titles=re.findall(r'class=.result__title[^>]*>.*?<a[^>]*>(.*?)</a>',h,re.DOTALL); clean=lambda s:re.sub('<[^>]+>','',s).strip(); [print(str(i+1)+'. '+clean(titles[i] if i<len(titles) else '')+'\\n '+clean(s)[:200]) for i,s in enumerate(snippets[:7])]\""
        },
        {
            "name": "Bing News RSS",
            "cmd": f"curl -sL {CURL_HEADERS} 'https://www.bing.com/news/search?q={q}&format=RSS' | python3 -c \"import sys,re; xml=sys.stdin.read(); items=re.findall(r'<item>(.*?)</item>',xml,re.DOTALL); [print(str(i+1)+'. '+re.sub('<[^>]+>','',re.search(r'<title>(.*?)</title>',it).group(1) if re.search(r'<title>',it) else '')+'\\n '+re.sub('<[^>]+>','',re.search(r'<description>(.*?)</description>',it).group(1) if re.search(r'<description>',it) else '')[:150]) for i,it in enumerate(items[:6])]\""
        },
        {
            "name": "Reddit Search",
            "cmd": f"curl -sL {CURL_HEADERS} -H 'Accept: application/json' 'https://www.reddit.com/search.json?q={q}&sort=new&limit=8&type=link' | python3 -c \"import sys,json; d=json.load(sys.stdin); posts=d.get('data',{{}}).get('children',[]); [print(str(i+1)+'. '+p['data'].get('title','')+'\\n r/'+p['data'].get('subreddit','')+' Score:'+str(p['data'].get('score',''))+'\\n '+p['data'].get('selftext','')[:200]) for i,p in enumerate(posts[:6])]\""
        },
        {
            "name": "HackerNews",
            "cmd": f"curl -s --max-time 15 'https://hn.algolia.com/api/v1/search?query={q}&hitsPerPage=8&tags=story' | python3 -c \"import sys,json; d=json.load(sys.stdin); hits=d.get('hits',[]); [print(str(i+1)+'. '+h.get('title','')+'\\n Points:'+str(h.get('points','0'))+' | '+h.get('url','')[:80]) for i,h in enumerate(hits[:6])]\""
        },
        {
            "name": "ArXiv Academic",
            "cmd": f"curl -s --max-time 15 'https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=5' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title>(.*?)</title>',xml)[1:]; summaries=re.findall(r'<summary>(.*?)</summary>',xml,re.DOTALL); [print(str(i+1)+'. '+t.strip()+'\\n '+summaries[i].strip()[:200] if i<len(summaries) else '') for i,t in enumerate(titles[:5])]\""
        },
    ]
def _extract_query_from_cmd(cmd: str) -> str:
patterns = [
r'[?&]q=([^&\'"]+)',
r'[?&]query=([^&\'"]+)',
r'[?&]search_query=all:([^&\'"]+)',
r"search\?q=([^&'\"\s]+)",
]
for p in patterns:
m = re.search(p, cmd)
if m:
q = m.group(1).replace('+', ' ').replace('%20', ' ')
return urllib.parse.unquote(q)
return ""
def _is_empty_result(stdout: str) -> bool:
if not stdout or len(stdout.strip()) < 10:
return True
empty_signals = [
"no direct answer", "no results", "0 results",
"no items", "[]", "{}", "error", "not found", "answer: no",
]
s = stdout.strip().lower()
meaningful_lines = [l for l in s.split('\n') if l.strip() and not any(sig in l for sig in empty_signals)]
return len(meaningful_lines) < 1
# ─── Screen Capture ─────────────────────────────────
def _get_screen_size() -> tuple[int, int]:
    """Return the display size as ``(width, height)`` in pixels.

    Queries ``xdotool getdisplaygeometry``. Falls back to ``(1920, 1080)``
    when xdotool is missing, times out, or prints something unparsable.
    Previously the subprocess call sat outside the ``try`` block, so a
    missing binary or a timeout raised instead of using the fallback.
    """
    try:
        r = subprocess.run(["xdotool", "getdisplaygeometry"],
                           env={**os.environ, "DISPLAY": DISPLAY},
                           capture_output=True, text=True, timeout=5)
        parts = r.stdout.strip().split()
        return int(parts[0]), int(parts[1])
    except (OSError, subprocess.SubprocessError, ValueError, IndexError):
        return 1920, 1080
def _get_mouse_pos() -> tuple[int, int]:
    """Return the current pointer position as ``(x, y)``.

    Parses the ``x:<n>`` / ``y:<n>`` fields printed by
    ``xdotool getmouselocation``. Falls back to ``(0, 0)`` when xdotool is
    missing, times out, or its output lacks those fields. Previously the
    subprocess call sat outside the ``try`` block, so those failures raised
    instead of using the fallback.
    """
    try:
        r = subprocess.run(["xdotool", "getmouselocation"],
                           env={**os.environ, "DISPLAY": DISPLAY},
                           capture_output=True, text=True, timeout=5)
        mx = int(re.search(r"x:(\d+)", r.stdout).group(1))
        my = int(re.search(r"y:(\d+)", r.stdout).group(1))
        return mx, my
    except (OSError, subprocess.SubprocessError, AttributeError, ValueError):
        # AttributeError: re.search returned None (field not in the output).
        return 0, 0
def capture_screen_raw() -> tuple[object, int, int]:
    """
    Grab the whole X display and return ``(image, width, height)``.

    Back-ends are tried in order until one yields a usable image:
      1. scrot
      2. import (ImageMagick)
      3. xwd + convert
      4. ffmpeg + x11grab
      5. python-xlib
    If all of them fail, a dark placeholder image with an error banner is
    returned instead; ``(None, 0, 0)`` is returned only when even the
    placeholder cannot be built.

    Changes from the previous version: temp files are removed even when the
    captured image is unusable, the image file handle and the X connection
    are closed, and the BGRX → RGB conversion uses Pillow's raw decoder
    rather than a per-pixel Python loop.
    """
    from PIL import Image

    env = {**os.environ, "DISPLAY": DISPLAY}
    tmp = f"/tmp/zs_{int(time.time()*1000)}"

    def _discard(path) -> None:
        """Best-effort removal of a temp file."""
        try:
            os.unlink(path)
        except OSError:
            pass

    def _load(path) -> tuple:
        """Load *path* as an RGB image if it exists and looks valid.

        The file is always deleted afterwards, whether or not it was usable.
        """
        if not path or not os.path.exists(path):
            return None, 0, 0
        try:
            if os.path.getsize(path) < 512:  # empty / truncated image
                return None, 0, 0
            with Image.open(path) as fh:
                img = fh.convert("RGB")  # forces a full read before unlink
            w, h = img.size
            if w < 10 or h < 10:
                return None, 0, 0
            return img, w, h
        except Exception as ex:
            print(f"[_load] {ex}")
            return None, 0, 0
        finally:
            _discard(path)

    # ── Method 1: scrot ─────────────────────────────────
    try:
        p = tmp + "_1.png"
        r = subprocess.run(
            ["scrot", "-q", "95", p],
            env=env, timeout=8, capture_output=True
        )
        img, w, h = _load(p)
        if img:
            print("[capture] ✅ scrot")
            return img, w, h
        print(f"[capture] scrot failed rc={r.returncode} err={r.stderr[:80]}")
    except Exception as e:
        print(f"[capture] scrot ex: {e}")

    # ── Method 2: ImageMagick import ────────────────────
    try:
        p = tmp + "_2.png"
        r = subprocess.run(
            ["import", "-window", "root", "-silent", p],
            env=env, timeout=10, capture_output=True
        )
        img, w, h = _load(p)
        if img:
            print("[capture] ✅ import (ImageMagick)")
            return img, w, h
        print(f"[capture] import failed rc={r.returncode}")
    except Exception as e:
        print(f"[capture] import ex: {e}")

    # ── Method 3: xwd + convert ─────────────────────────
    try:
        xwd_p = tmp + "_3.xwd"
        png_p = tmp + "_3.png"
        try:
            r1 = subprocess.run(
                ["xwd", "-root", "-silent", "-out", xwd_p],
                env=env, timeout=10, capture_output=True
            )
            r2 = subprocess.run(
                ["convert", xwd_p, png_p],
                timeout=10, capture_output=True
            )
        finally:
            # Remove the intermediate dump even if convert is missing/fails.
            _discard(xwd_p)
        img, w, h = _load(png_p)
        if img:
            print("[capture] ✅ xwd+convert")
            return img, w, h
        print(f"[capture] xwd rc={r1.returncode} convert rc={r2.returncode}")
    except Exception as e:
        print(f"[capture] xwd ex: {e}")

    # ── Method 4: ffmpeg x11grab ────────────────────────
    try:
        p = tmp + "_4.png"
        sw, sh = _get_screen_size()
        r = subprocess.run([
            "ffmpeg", "-y",
            "-f", "x11grab",
            "-video_size", f"{sw}x{sh}",
            "-i", DISPLAY,
            "-vframes", "1",
            "-q:v", "2",
            p
        ], env=env, timeout=12, capture_output=True)
        img, w, h = _load(p)
        if img:
            print("[capture] ✅ ffmpeg x11grab")
            return img, w, h
        print(f"[capture] ffmpeg rc={r.returncode} err={r.stderr[-120:]}")
    except Exception as e:
        print(f"[capture] ffmpeg ex: {e}")

    # ── Method 5: python-xlib (talk to the X server directly) ──
    try:
        from Xlib import X, display as Xdisplay
        xdisp = Xdisplay.Display(DISPLAY)
        try:
            root = xdisp.screen().root
            geom = root.get_geometry()
            w, h = geom.width, geom.height
            raw = root.get_image(0, 0, w, h, X.ZPixmap, 0xFFFFFFFF)
        finally:
            xdisp.close()
        # Same layout assumption as before: 4 bytes per pixel in B, G, R, pad
        # order. NOTE(review): confirm for displays that are not 24/32-bit.
        img = Image.frombytes("RGB", (w, h), raw.data, "raw", "BGRX")
        print("[capture] ✅ python-xlib")
        return img, w, h
    except Exception as e:
        print(f"[capture] xlib ex: {e}")

    # ── Method 6: clearly labelled placeholder image ────
    print("[capture] ⚠️ ALL methods failed — returning placeholder")
    try:
        from PIL import ImageDraw
        sw, sh = _get_screen_size()
        img = Image.new("RGB", (sw or 1280, sh or 720), (20, 20, 30))
        draw = ImageDraw.Draw(img)
        draw.rectangle([(0, 0), (sw, 40)], fill=(30, 30, 50))
        draw.text((10, 10), f"⚠️ Screenshot failed — DISPLAY={DISPLAY}", fill=(255, 100, 100))
        draw.text((10, 50), "Methods tried: scrot, import, xwd, ffmpeg, xlib", fill=(150, 150, 150))
        return img, sw or 1280, sh or 720
    except Exception:
        return None, 0, 0
def capture_screen(scale=0.6, quality=70) -> str:
    """Capture the screen without a grid and return it as base64 JPEG.

    Args:
        scale: Resize factor applied when below 1.0.
        quality: JPEG quality passed to Pillow.

    Returns:
        Base64-encoded JPEG data, or "" when capture or encoding fails.
    """
    img, w, h = capture_screen_raw()
    if img is None:
        return ""
    try:
        if scale < 1.0:
            # LANCZOS is a constant of the PIL.Image module, not of image
            # instances: the old `img.LANCZOS` raised AttributeError, so every
            # scaled capture (streaming, /screenshot/clean) returned "".
            from PIL import Image
            size = (max(1, int(w * scale)), max(1, int(h * scale)))
            img = img.resize(size, Image.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=quality, optimize=True)
        return base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        print(f"[capture] {e}")
        return ""
def capture_screen_with_grid(scale=0.65, quality=72,
                             force_mx: int | None = None,
                             force_my: int | None = None) -> dict:
    """
    Capture the screen and return one JPEG with a coordinate grid drawn on it.

    The image is resized by *scale* for display only. Grid lines are drawn
    every 100 real-screen pixels and labelled with the real X/Y value, so the
    numbers can be used directly as click coordinates.

    Each line is now placed from its real coordinate. The previous version
    accumulated an integer-truncated step, so labels drifted away from the
    real coordinates when 100*scale was not a whole number, and a zero step
    would loop forever.

    Args:
        scale: Display resize factor.
        quality: JPEG quality.
        force_mx, force_my: When both are given, the cursor marker is drawn
            at this position instead of querying xdotool (avoids a race right
            after a mouse move/click).

    Returns:
        ``{"data": <base64 JPEG>, "width", "height", "mouse_x", "mouse_y"}``
        where width/height are the original screen size.
    """
    try:
        from PIL import Image, ImageDraw
        img, orig_w, orig_h = capture_screen_raw()
        if img is None:
            return {"data": "", "width": 1920, "height": 1080, "mouse_x": 0, "mouse_y": 0}
        # Prefer coordinates supplied by the caller; otherwise ask the X server.
        if force_mx is not None and force_my is not None:
            mx, my = force_mx, force_my
        else:
            mx, my = _get_mouse_pos()
        # Resize for display only — reported coordinates stay in screen space.
        sw = max(1, int(orig_w * scale))
        sh = max(1, int(orig_h * scale))
        grid_img = img.resize((sw, sh), Image.LANCZOS)
        draw = ImageDraw.Draw(grid_img, "RGBA")
        step_orig = 100
        grid_color = (255, 255, 255, 45)    # faint translucent white lines
        label_color = (0, 255, 180, 200)    # bright green labels
        # Vertical lines + real X labels.
        for x_real in range(step_orig, orig_w, step_orig):
            x_sc = x_real * sw // orig_w
            draw.line([(x_sc, 0), (x_sc, sh)], fill=grid_color, width=1)
            draw.rectangle([(x_sc + 1, 2), (x_sc + 32, 14)], fill=(0, 0, 0, 160))
            draw.text((x_sc + 2, 3), str(x_real), fill=label_color)
        # Horizontal lines + real Y labels.
        for y_real in range(step_orig, orig_h, step_orig):
            y_sc = y_real * sh // orig_h
            draw.line([(0, y_sc), (sw, y_sc)], fill=grid_color, width=1)
            draw.rectangle([(2, y_sc + 1), (36, y_sc + 13)], fill=(0, 0, 0, 160))
            draw.text((3, y_sc + 2), str(y_real), fill=label_color)
        # Cursor marker: red circle with a crosshair.
        mouse_sx = int(mx * sw / orig_w)
        mouse_sy = int(my * sh / orig_h)
        r = 10
        draw.ellipse([(mouse_sx-r, mouse_sy-r), (mouse_sx+r, mouse_sy+r)],
                     outline=(255, 60, 60, 240), width=2)
        draw.line([(mouse_sx-16, mouse_sy), (mouse_sx+16, mouse_sy)],
                  fill=(255, 60, 60, 200), width=1)
        draw.line([(mouse_sx, mouse_sy-16), (mouse_sx, mouse_sy+16)],
                  fill=(255, 60, 60, 200), width=1)
        # Info bar along the top.
        final = grid_img.convert("RGB")
        draw2 = ImageDraw.Draw(final)
        draw2.rectangle([(0, 0), (sw, 18)], fill=(0, 0, 0))
        hdr = (f"SCREEN {orig_w}x{orig_h} | MOUSE:({mx},{my}) | "
               f"GRID=100px | CLICK COORDS = numbers on grid lines")
        draw2.text((4, 2), hdr, fill=(0, 220, 160))
        # Info bar along the bottom.
        draw2.rectangle([(0, sh-18), (sw, sh)], fill=(0, 0, 0))
        draw2.text((4, sh-16),
                   f"USE REAL COORDS: e.g. click x=500 y=300 means the '500' vertical line + '300' horizontal line",
                   fill=(180, 180, 80))
        buf = io.BytesIO()
        final.save(buf, format="JPEG", quality=quality, optimize=True)
        data = base64.b64encode(buf.getvalue()).decode()
        return {
            "data": data,
            "width": orig_w,
            "height": orig_h,
            "mouse_x": mx,
            "mouse_y": my,
        }
    except Exception as e:
        # Grid rendering failed: fall back to a plain capture.
        print(f"[capture_grid] {e}")
        plain = capture_screen(scale=scale, quality=quality)
        mx, my = _get_mouse_pos()
        w, h = _get_screen_size()
        return {"data": plain, "width": w, "height": h, "mouse_x": mx, "mouse_y": my}
# ─── Command Runner ──────────────────────────────────
def run_raw_command(cmd: str, timeout: int = 60) -> dict:
    """Run *cmd* through bash and return its captured output.

    Output is decoded as UTF-8 with undecodable bytes replaced. Previously
    the locale's default codec was used, so binary or non-UTF-8 output
    raised during decoding and the whole result was lost.

    Args:
        cmd: Shell command line, executed with ``/bin/bash``.
        timeout: Seconds before the command is aborted.

    Returns:
        ``{"stdout", "stderr", "returncode"}``. stdout is limited to its last
        15000 characters and stderr to its last 3000. On timeout or any other
        failure returncode is -1 and stderr carries the reason.
    """
    env = {**os.environ, "DISPLAY": DISPLAY,
           "PYTHONIOENCODING": "utf-8", "LANG": "en_US.UTF-8"}
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True,
                                text=True, encoding="utf-8", errors="replace",
                                timeout=timeout, env=env,
                                executable="/bin/bash")
        return {
            "stdout": result.stdout[-15000:],
            "stderr": result.stderr[-3000:],
            "returncode": result.returncode,
        }
    except subprocess.TimeoutExpired:
        return {"stdout": "", "stderr": f"⏱️ Timeout {timeout}s", "returncode": -1}
    except Exception as e:
        return {"stdout": "", "stderr": str(e), "returncode": -1}
def run_command_smart(cmd: str, timeout: int = 60) -> dict:
    """Run *cmd*; if it is a failed or empty curl search, try other sources.

    Flow:
      1. Run the command as given. A zero exit code with non-empty output is
         returned unchanged.
      2. If the command does not look like a curl web search, the original
         result is returned as is.
      3. Otherwise the query is extracted and the fallback sources are tried
         in order. The first one with results is returned, joined with one
         additional source when another one also has results.

    The additional-source search now only considers sources after the primary
    one. The earlier ones were just tried and found empty, and re-running
    them cost up to 20 s each.

    Returns:
        The ``run_raw_command`` dict. Fallback results add ``_sources_tried``
        and, on success, ``_fallback_used``.
    """
    res = run_raw_command(cmd, timeout=timeout)
    stdout = res["stdout"].strip()
    if res["returncode"] == 0 and not _is_empty_result(stdout):
        return res
    is_curl_search = "curl" in cmd and any(x in cmd for x in [
        "duckduckgo", "google", "bing", "wikipedia", "reddit",
        "hackernews", "hn.algolia", "arxiv", "news", "search"
    ])
    if not is_curl_search:
        return res
    query = _extract_query_from_cmd(cmd)
    if not query or len(query) < 3:
        # No usable URL parameter: guess the query from the command's words.
        words = [w for w in cmd.split() if len(w) > 3 and not w.startswith('-')
                 and 'http' not in w and 'python3' not in w and 'curl' not in w]
        query = ' '.join(words[:5]) if words else ""
    if not query:
        return res
    sources = _build_search_sources(query)
    tried_names = []
    for idx, source in enumerate(sources):
        src_name = source["name"]
        tried_names.append(f"🔍 {src_name}")
        src_out = run_raw_command(source["cmd"], timeout=25)["stdout"].strip()
        if _is_empty_result(src_out):
            continue
        all_results = [src_out]
        # Add at most one more source, chosen from those not yet tried.
        for extra in sources[idx + 1:]:
            extra_out = run_raw_command(extra["cmd"], timeout=20)["stdout"].strip()
            if not _is_empty_result(extra_out):
                all_results.append(f"\n[مصدر إضافي: {extra['name']}]\n{extra_out}")
                break
        combined_header = f"[مصدر بديل: {src_name}]\n{'='*50}\n"
        final_out = combined_header + "\n\n".join(all_results)
        return {
            "stdout": final_out[:15000],
            "stderr": "",
            "returncode": 0,
            "_sources_tried": tried_names,
            "_fallback_used": src_name,
        }
    return {
        "stdout": f"[لم تُرجع أي مصادر نتائج لـ: {query}]\nالمصادر: {', '.join(tried_names[:5])}",
        "stderr": res["stderr"],
        "returncode": -1,
        "_sources_tried": tried_names,
    }
def xdo(args: list, timeout=10) -> dict:
    """Run ``xdotool`` with *args* on the configured display.

    Args:
        args: Arguments passed after ``xdotool``.
        timeout: Seconds before the call is aborted.

    Returns:
        ``{"rc", "out", "err"}``. A missing binary or a timeout is reported
        as ``rc == -1`` with the reason in ``err`` rather than raised, so a
        single failed input event no longer aborts the calling handler.
    """
    try:
        r = subprocess.run(["xdotool"] + args,
                           env={**os.environ, "DISPLAY": DISPLAY},
                           timeout=timeout, capture_output=True, text=True)
    except (OSError, subprocess.SubprocessError) as e:
        return {"rc": -1, "out": "", "err": str(e)}
    return {"rc": r.returncode, "out": r.stdout, "err": r.stderr}
def type_text_smart(text: str) -> dict:
    """
    Type *text* into the focused window, choosing a method by script.

    - Text without Arabic characters: ``xdotool type`` directly.
    - Text containing Arabic (U+0600–U+06FF): copy it to the clipboard with
      xclip, then paste with ctrl+v. If xclip cannot be run or exits
      non-zero, ``xdotool type`` is used as a fallback.

    Returns:
        A dict with at least ``success`` and ``method``. ``success`` now
        reflects the real exit status of the paste key press; the clipboard
        path used to report success unconditionally.
    """
    has_arabic = bool(re.search(r'[\u0600-\u06FF]', text))
    if not has_arabic:
        r = xdo(["type", "--clearmodifiers", "--delay", "30", text])
        return {"success": r["rc"] == 0, "method": "xdotool_direct"}
    try:
        proc = subprocess.Popen(
            ["xclip", "-selection", "clipboard"],
            stdin=subprocess.PIPE,
            env={**os.environ, "DISPLAY": DISPLAY}
        )
        proc.communicate(text.encode("utf-8"))
        if proc.returncode != 0:
            raise RuntimeError(f"xclip exited with {proc.returncode}")
        # Short pause before pasting, as in the original.
        time.sleep(0.15)
        pasted = xdo(["key", "--clearmodifiers", "ctrl+v"])
        return {"success": pasted["rc"] == 0, "method": "clipboard+paste", "arabic": True}
    except Exception:
        # Clipboard route unavailable: type the text key by key instead.
        r = xdo(["type", "--clearmodifiers", "--delay", "50", text])
        return {"success": r["rc"] == 0, "method": "xdotool_fallback", "error": r["err"]}
def open_browser_smart(url: str = "") -> str:
    """Launch the detected browser in a new window on *url*.

    The browser is started with an argument list rather than a shell string,
    so quotes or shell metacharacters in the URL can no longer break or
    inject into the command line.

    Args:
        url: Page to open; "about:blank" when empty.

    Returns:
        ``"<browser> <url>"`` describing what was launched, with a failure
        note appended if the process could not be started. (The previous
        return value joined the two without a separator.)
    """
    browser_cmd = BROWSER
    if not url:
        url = "about:blank"
    try:
        # Not waited on: the browser keeps running alongside the server.
        subprocess.Popen([browser_cmd, "--new-window", url],
                         env={**os.environ, "DISPLAY": DISPLAY},
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:
        return f"{browser_cmd} {url} (failed to launch: {e})"
    return f"{browser_cmd} {url}"
# ─── Simple Command Parser ─────────────────────────────
# يفهم أوامر بسيطة ويحولها لأفعال داخلية
def parse_simple_command(raw: str) -> dict | None:
"""
يُحلّل الأوامر البسيطة النصية وتُعيد action dict
أمثلة:
open pc → {"action":"screenshot"}
open firefox → {"action":"open_browser","url":""}
open chrome → {"action":"open_browser","url":""}
open browser → {"action":"open_browser","url":""}
open url https://google.com → {"action":"open_browser","url":"https://..."}
mouse go x500 y300 → {"action":"mouse_move","x":500,"y":300}
mouse go 500 300 → {"action":"mouse_move","x":500,"y":300}
click x500 y300 → {"action":"mouse_click","x":500,"y":300,"button":"left"}
click 500 300 → {"action":"mouse_click","x":500,"y":300,"button":"left"}
rclick x500 y300 → {"action":"mouse_click","x":500,"y":300,"button":"right"}
dclick x500 y300 → {"action":"mouse_click","x":500,"y":300,"double":true}
type hello world → {"action":"keyboard_type","text":"hello world"}
key enter → {"action":"keyboard_hotkey","keys":["Return"]}
key ctrl+c → {"action":"keyboard_hotkey","keys":["ctrl","c"]}
scroll up → {"action":"scroll","x":960,"y":540,"clicks":3}
scroll down → {"action":"scroll","x":960,"y":540,"clicks":-3}
screenshot → {"action":"screenshot"}
screen info → {"action":"screen_info"}
"""
s = raw.strip().lower()
# open pc / open computer / open screen
if re.match(r'^open\s+(pc|computer|screen|desktop)$', s):
return {"action": "screenshot"}
# open firefox / chrome / browser / browser url
m = re.match(r'^open\s+(firefox|chrome|chromium|browser|web|internet)(\s+(.+))?$', s)
if m:
url = (m.group(3) or "").strip()
if url and not url.startswith("http"):
url = "https://" + url
return {"action": "open_browser", "url": url or ""}
# open url <url>
m = re.match(r'^open\s+url\s+(\S+)$', s)
if m:
url = m.group(1)
if not url.startswith("http"):
url = "https://" + url
return {"action": "open_browser", "url": url}
# mouse go x500 y300 OR mouse go 500 300
m = re.match(r'^mouse\s+(?:go|move|to)\s+x?(\d+)\s+y?(\d+)$', s)
if m:
return {"action": "mouse_move", "x": int(m.group(1)), "y": int(m.group(2))}
# click x500 y300 OR click 500 300
m = re.match(r'^click\s+x?(\d+)\s+y?(\d+)$', s)
if m:
return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "left"}
# rclick / right click
m = re.match(r'^r(?:ight)?click\s+x?(\d+)\s+y?(\d+)$', s)
if m:
return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "right"}
# dclick / double click
m = re.match(r'^d(?:ouble)?click\s+x?(\d+)\s+y?(\d+)$', s)
if m:
return {"action": "mouse_click", "x": int(m.group(1)), "y": int(m.group(2)), "button": "left", "double": True}
# type <text> (يحتفظ بالحالة الأصلية)
m = re.match(r'^type\s+(.+)$', raw.strip(), re.IGNORECASE)
if m:
return {"action": "keyboard_type", "text": m.group(1)}
# key <keys>
m = re.match(r'^key\s+(.+)$', s)
if m:
k = m.group(1).strip()
key_map = {
"enter": "Return", "return": "Return", "esc": "Escape", "escape": "Escape",
"tab": "Tab", "space": "space", "backspace": "BackSpace", "delete": "Delete",
"up": "Up", "down": "Down", "left": "Left", "right": "Right",
"home": "Home", "end": "End", "pageup": "Prior", "pagedown": "Next",
"f1": "F1", "f2": "F2", "f3": "F3", "f4": "F4", "f5": "F5",
}
if '+' in k:
parts = [p.strip() for p in k.split('+')]
keys = [key_map.get(p, p) for p in parts]
else:
keys = [key_map.get(k, k)]
return {"action": "keyboard_hotkey", "keys": keys}
# scroll up / scroll down / scroll <n>
m = re.match(r'^scroll\s+(up|down|(\-?\d+))$', s)
if m:
w, h = _get_screen_size()
direction = m.group(1)
if direction == "up":
clicks = 4
elif direction == "down":
clicks = -4
else:
clicks = int(direction)
return {"action": "scroll", "x": w // 2, "y": h // 2, "clicks": clicks}
# screenshot
if s in ("screenshot", "screen", "ss", "snap"):
return {"action": "screenshot"}
# screen info
if re.match(r'^screen\s+info$', s):
return {"action": "screen_info"}
return None # لم يُعرف
async def broadcast(msg: dict):
    """Send *msg* as JSON to every connected WebSocket client.

    Clients whose send fails are dropped from ``active_connections``.
    Iteration runs over a snapshot because other coroutines can add or remove
    connections while this one is suspended in ``send_text``.
    """
    txt = json.dumps(msg, ensure_ascii=False)
    dead = []
    for ws in list(active_connections):
        try:
            await ws.send_text(txt)
        except Exception:
            # Was a bare `except`, which also swallowed task cancellation.
            dead.append(ws)
    for ws in dead:
        if ws in active_connections:
            active_connections.remove(ws)
async def screen_stream_loop():
    """Broadcast screen frames until streaming stops or no client remains.

    Reads the module-level ``stream_fps`` / ``stream_scale`` /
    ``stream_quality`` on every iteration, so a later ``start_stream`` with
    new settings takes effect while the loop is running (the interval used to
    be computed once at start). ``stream_active`` is reset on every exit
    path, including cancellation, so the stream can be started again.
    """
    global stream_active
    try:
        while stream_active and active_connections:
            try:
                frame = capture_screen(scale=stream_scale, quality=stream_quality)
                if frame:
                    await broadcast({"type": "frame", "data": frame})
            except Exception as e:
                # One failed frame must not end the stream; log and continue.
                print(f"[stream] {e}")
            await asyncio.sleep(1.0 / max(1, stream_fps))
    finally:
        stream_active = False
# ─── Action Handler ──────────────────────────────────
async def handle_action(ws: WebSocket, msg: dict):
    """Execute one client message and send the result(s) back on *ws*.

    ``msg["action"]`` selects the branch and ``msg["data"]`` carries its
    arguments. Most input actions reply with an ``ack`` message and then send
    a fresh grid screenshot. An unknown action yields an ``error`` message.
    """
    action = msg.get("action", "")
    data = msg.get("data", {})

    async def send(obj):
        # Serialise without ASCII-escaping so Arabic text is sent as-is.
        await ws.send_text(json.dumps(obj, ensure_ascii=False))

    async def auto_shot_grid(label="", delay=0.5,
                             force_mx: int | None = None,
                             force_my: int | None = None):
        """
        Wait *delay* seconds, then send a grid screenshot.

        force_mx/force_my: draw the cursor exactly at this position (right
        after a mouse move/click) instead of re-reading it from the X server,
        which avoids a race condition.
        """
        await asyncio.sleep(delay)
        result = capture_screen_with_grid(scale=0.65, quality=72,
                                          force_mx=force_mx, force_my=force_my)
        if result["data"]:
            await send({
                "type": "screenshot",
                "data": result["data"],
                "ts": int(time.time() * 1000),
                "auto": True,
                "label": label,
                "screen_width": result["width"],
                "screen_height": result["height"],
                "mouse_x": result["mouse_x"],
                "mouse_y": result["mouse_y"],
                "has_grid": True,
            })
        return result["data"]

    # ── screenshot ────────────────────────────────────
    if action == "screenshot":
        result = capture_screen_with_grid(scale=0.65, quality=75)
        await send({
            "type": "screenshot",
            "data": result["data"],
            "ts": int(time.time() * 1000),
            "screen_width": result["width"],
            "screen_height": result["height"],
            "mouse_x": result["mouse_x"],
            "mouse_y": result["mouse_y"],
            "has_grid": True,
        })
    # ── simple_command — plain-text commands ──────────
    elif action == "simple_command":
        raw_cmd = data.get("cmd", "").strip()
        parsed = parse_simple_command(raw_cmd)
        if parsed is None:
            await send({
                "type": "simple_command_result",
                "ok": False,
                "cmd": raw_cmd,
                "error": f"أمر غير معروف: '{raw_cmd}'\nالأوامر المتاحة: open pc, open firefox, mouse go x y, click x y, rclick x y, type TEXT, key ENTER, scroll up/down, screenshot"
            })
            return
        # Report the parsed form first, then execute it.
        await send({"type": "simple_command_result", "ok": True, "cmd": raw_cmd, "parsed": parsed})
        # Re-enter this handler with the parsed action as a regular message.
        sub_msg = {"action": parsed["action"], "data": {k: v for k, v in parsed.items() if k != "action"}}
        await handle_action(ws, sub_msg)
        return
    # ── terminal ──────────────────────────────────────
    elif action == "terminal":
        cmd = data.get("cmd", "")
        timeout = int(data.get("timeout", 60))
        if not cmd:
            await send({"type": "terminal_result", "cmd": "", "stdout": "",
                        "stderr": "no command", "returncode": -1})
            return
        res = run_command_smart(cmd, timeout=timeout)
        await send({
            "type": "terminal_result",
            "cmd": cmd,
            "stdout": res["stdout"],
            "stderr": res.get("stderr", ""),
            "returncode": res["returncode"],
            "fallback_used": res.get("_fallback_used", None),
            "sources_tried": res.get("_sources_tried", []),
        })
        await auto_shot_grid(f"بعد: {cmd[:45]}", delay=0.4)
    # ── mouse_move ────────────────────────────────────
    elif action == "mouse_move":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        # --sync waits until the X server confirms the move.
        xdo(["mousemove", "--sync", str(x), str(y)])
        await send({"type": "ack", "action": "mouse_move", "x": x, "y": y})
        # Pass the coordinates directly to avoid a race with the X server.
        await auto_shot_grid(f"ماوس → ({x},{y})", delay=0.2,
                             force_mx=x, force_my=y)
    # ── mouse_click ───────────────────────────────────
    elif action == "mouse_click":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        # xdotool button numbers; unknown names fall back to the left button.
        btn_num = {"left": "1", "middle": "2", "right": "3"}.get(data.get("button", "left"), "1")
        xdo(["mousemove", "--sync", str(x), str(y)])
        await asyncio.sleep(0.08)
        if data.get("double"):
            xdo(["click", "--repeat", "2", "--delay", "100", btn_num])
        else:
            xdo(["click", btn_num])
        btn_name = {"1": "left", "2": "middle", "3": "right"}.get(btn_num, "left")
        await send({"type": "ack", "action": "mouse_click", "x": x, "y": y, "button": btn_name})
        # Pass the coordinates directly to avoid a race condition.
        await auto_shot_grid(f"نقر {btn_name} → ({x},{y})", delay=0.5,
                             force_mx=x, force_my=y)
    # ── keyboard_type ─────────────────────────────────
    elif action == "keyboard_type":
        text = data.get("text", "")
        if text:
            result = type_text_smart(text)
            await send({"type": "ack", "action": "keyboard_type",
                        "method": result["method"], "text_len": len(text)})
        else:
            await send({"type": "ack", "action": "keyboard_type"})
        await auto_shot_grid("بعد الكتابة", delay=0.5)
    # ── keyboard_hotkey ───────────────────────────────
    elif action == "keyboard_hotkey":
        keys = data.get("keys", [])
        if keys:
            # Keys are joined with "+" into a single xdotool key combination.
            xdo(["key", "--clearmodifiers", "+".join(keys)])
        await send({"type": "ack", "action": "keyboard_hotkey", "keys": keys})
        await auto_shot_grid("بعد الاختصار", delay=0.5)
    # ── keyboard_press ────────────────────────────────
    elif action == "keyboard_press":
        key = data.get("key", "")
        if key:
            xdo(["key", "--clearmodifiers", key])
        await send({"type": "ack", "action": "keyboard_press"})
        await auto_shot_grid("بعد المفتاح", delay=0.4)
    # ── clipboard_write ───────────────────────────────
    elif action == "clipboard_write":
        text = data.get("text", "")
        try:
            proc = subprocess.Popen(
                ["xclip", "-selection", "clipboard"],
                stdin=subprocess.PIPE,
                env={**os.environ, "DISPLAY": DISPLAY}
            )
            proc.communicate(text.encode("utf-8"))
            await send({"type": "ack", "action": "clipboard_write", "length": len(text)})
        except Exception as e:
            await send({"type": "error", "action": "clipboard_write", "msg": str(e)})
    # ── clipboard_read ────────────────────────────────
    elif action == "clipboard_read":
        res = run_raw_command("xclip -selection clipboard -o", timeout=5)
        await send({"type": "clipboard_content", "text": res["stdout"]})
    # ── scroll ────────────────────────────────────────
    elif action == "scroll":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        clicks = int(data.get("clicks", 3))
        # Positive clicks press button 4, anything else presses button 5.
        btn = "4" if clicks > 0 else "5"
        xdo(["mousemove", str(x), str(y)])
        for _ in range(abs(clicks)):
            xdo(["click", btn])
            await asyncio.sleep(0.03)
        await send({"type": "ack", "action": "scroll", "clicks": clicks})
        await auto_shot_grid("بعد التمرير", delay=0.4)
    # ── open_app ──────────────────────────────────────
    elif action == "open_app":
        app_cmd = data.get("cmd", "")
        if app_cmd:
            fixed_cmd = app_cmd
            # Substitute the detected browser when firefox-esr is not installed.
            if "firefox-esr" in app_cmd:
                esr_check = subprocess.run(["which", "firefox-esr"], capture_output=True, text=True)
                if esr_check.returncode != 0:
                    fixed_cmd = app_cmd.replace("firefox-esr", BROWSER)
            subprocess.Popen(fixed_cmd, shell=True,
                             env={**os.environ, "DISPLAY": DISPLAY},
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            await send({"type": "ack", "action": "open_app", "cmd": fixed_cmd})
            # Browsers get a longer total wait (7 s) than other apps (4 s).
            wait_time = 7.0 if any(x in app_cmd for x in ["firefox", "chromium", "chrome"]) else 4.0
            # Two screenshots: one after 2 s, one after the full wait.
            await asyncio.sleep(2.0)
            await auto_shot_grid(f"فتح: {fixed_cmd[:40]} (2s)", delay=0)
            await asyncio.sleep(wait_time - 2.0)
            await auto_shot_grid(f"بعد تحميل: {fixed_cmd[:40]}", delay=0)
        else:
            await send({"type": "ack", "action": "open_app"})
    # ── open_browser — launch the browser ─────────────
    elif action == "open_browser":
        url = data.get("url", "")
        if not url:
            url = "about:blank"
        browser_result = open_browser_smart(url)
        await send({"type": "ack", "action": "open_browser", "result": browser_result})
        # Two screenshots: one after 2 s, one after a further 5 s.
        await asyncio.sleep(2.0)
        await auto_shot_grid(f"فتح المتصفح: {url[:50]}", delay=0)
        await asyncio.sleep(5.0)
        await auto_shot_grid("بعد تحميل المتصفح", delay=0)
    # ── mouse_drag ────────────────────────────────────
    elif action == "mouse_drag":
        x1, y1 = int(data.get("x1", 0)), int(data.get("y1", 0))
        x2, y2 = int(data.get("x2", 0)), int(data.get("y2", 0))
        xdo(["mousemove", str(x1), str(y1)])
        xdo(["mousedown", "1"])
        await asyncio.sleep(0.1)
        xdo(["mousemove", str(x2), str(y2)])
        await asyncio.sleep(0.1)
        xdo(["mouseup", "1"])
        await send({"type": "ack", "action": "mouse_drag"})
        # The cursor ends at (x2, y2).
        await auto_shot_grid("بعد السحب", delay=0.4, force_mx=x2, force_my=y2)
    # ── start_stream ──────────────────────────────────
    elif action == "start_stream":
        global stream_active, stream_fps, stream_quality, stream_scale
        stream_fps = int(data.get("fps", 3))
        stream_quality = int(data.get("quality", 60))
        stream_scale = float(data.get("scale", 0.5))
        # Only spawn a streaming task if one is not already running.
        if not stream_active:
            stream_active = True
            asyncio.create_task(screen_stream_loop())
        await send({"type": "ack", "action": "start_stream"})
    # ── stop_stream ───────────────────────────────────
    elif action == "stop_stream":
        # screen_stream_loop() checks this flag in its loop condition.
        stream_active = False
        await send({"type": "ack", "action": "stop_stream"})
    # ── screen_info ───────────────────────────────────
    elif action == "screen_info":
        w, h = _get_screen_size()
        mx, my = _get_mouse_pos()
        await send({
            "type": "screen_info",
            "width": w, "height": h,
            "mouse_x": mx, "mouse_y": my,
            "browser": BROWSER,
        })
    # ── paste ─────────────────────────────────────────
    elif action == "paste":
        text = data.get("text", "")
        if text:
            # Put the text on the clipboard, then press ctrl+v.
            proc = subprocess.Popen(
                ["xclip", "-selection", "clipboard"],
                stdin=subprocess.PIPE,
                env={**os.environ, "DISPLAY": DISPLAY}
            )
            proc.communicate(text.encode("utf-8"))
            await asyncio.sleep(0.1)
            xdo(["key", "--clearmodifiers", "ctrl+v"])
        await send({"type": "ack", "action": "paste"})
        await auto_shot_grid("بعد اللصق", delay=0.5)
    # ── unknown ───────────────────────────────────────
    else:
        await send({
            "type": "error",
            "msg": f"Unknown action: '{action}'. Available: screenshot, terminal, mouse_move, mouse_click, mouse_drag, keyboard_type, keyboard_hotkey, keyboard_press, clipboard_write, clipboard_read, scroll, open_app, open_browser, paste, simple_command, screen_info, start_stream, stop_stream"
        })
# ─── WebSocket ───────────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    """Serve one control client over the ``/ws`` WebSocket.

    After accepting the socket this sends a ``connected`` greeting (screen
    size, detected browser, command cheat-sheet), then an initial screenshot
    from ``capture_screen_with_grid`` if one could be captured, and finally
    forwards every received JSON object to ``handle_action`` until the client
    disconnects.

    Frames that are not valid JSON, or whose top-level value is not an
    object, are answered with an ``error`` message and the connection is
    kept open.
    """

    async def send_json(payload: dict) -> None:
        # ensure_ascii=False keeps Arabic labels readable on the wire.
        await ws.send_text(json.dumps(payload, ensure_ascii=False))

    await ws.accept()
    active_connections.append(ws)
    # Everything after registration runs inside try/finally so the socket is
    # always removed from active_connections, even if the greeting or the
    # first screenshot fails (e.g. the client disconnects immediately).
    try:
        w, h = _get_screen_size()
        await send_json({
            "type": "connected",
            "screen_width": w,
            "screen_height": h,
            "browser": BROWSER,
            "msg": f"Z Computer Mode v7 — Smart Control | Browser: {BROWSER} | Screen: {w}x{h}",
            "commands": [
                "screenshot", "terminal {cmd}", "mouse_move {x,y}", "mouse_click {x,y,button}",
                "keyboard_type {text}", "keyboard_hotkey {keys}", "scroll {x,y,clicks}",
                "open_app {cmd}", "open_browser {url}", "simple_command {cmd}",
                "Simple cmds: 'open firefox', 'mouse go x y', 'click x y', 'type TEXT', 'key enter'"
            ]
        })

        # Send an initial screenshot (with grid) so the client has a frame
        # before issuing its first command.
        result = capture_screen_with_grid(scale=0.65, quality=72)
        if result["data"]:
            await send_json({
                "type": "screenshot",
                "data": result["data"],
                "ts": int(time.time() * 1000),
                "label": "الشاشة الأولية",
                "screen_width": result["width"],
                "screen_height": result["height"],
                "mouse_x": result["mouse_x"],
                "mouse_y": result["mouse_y"],
                "has_grid": True,
            })

        while True:
            raw = await ws.receive_text()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError as exc:
                # A single bad frame should not tear down the session.
                await send_json({"type": "error", "msg": f"Invalid JSON: {exc}"})
                continue
            if not isinstance(data, dict):
                # handle_action reads fields with data.get(...), so only
                # JSON objects are meaningful here.
                await send_json({"type": "error", "msg": "Message must be a JSON object"})
                continue
            await handle_action(ws, data)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        # Top-level boundary for this connection: log and fall through to cleanup.
        print(f"[ws] {e}")
    finally:
        if ws in active_connections:
            active_connections.remove(ws)
# ─── REST ────────────────────────────────────────────
@app.get("/screenshot")
async def rest_screenshot():
    """Return a screenshot from ``capture_screen_with_grid`` as JSON.

    The response carries the image data, a millisecond timestamp, the screen
    dimensions and mouse position reported by the capture, and
    ``has_grid: True``.
    """
    shot = capture_screen_with_grid(scale=0.7, quality=73)
    payload = {
        "image": shot["data"],
        "ts": int(time.time() * 1000),
    }
    # Map response field -> key in the capture result, preserving field order.
    field_sources = (
        ("screen_width", "width"),
        ("screen_height", "height"),
        ("mouse_x", "mouse_x"),
        ("mouse_y", "mouse_y"),
    )
    for field, source in field_sources:
        payload[field] = shot[source]
    payload["has_grid"] = True
    return JSONResponse(payload)
@app.get("/screenshot/clean")
async def rest_screenshot_clean():
    """Return the output of ``capture_screen(0.75, 75)`` with a millisecond timestamp."""
    image = capture_screen(0.75, 75)
    now_ms = int(time.time() * 1000)
    return JSONResponse({"image": image, "ts": now_ms})
@app.post("/terminal")
async def rest_terminal(body: dict):
    """Run the ``cmd`` field of the request body through ``run_command_smart``.

    ``cmd`` defaults to an empty string and ``timeout`` to 60 when absent;
    the helper's result is returned as the JSON response.
    """
    command = body.get("cmd", "")
    timeout = body.get("timeout", 60)
    outcome = run_command_smart(command, timeout)
    return JSONResponse(outcome)
@app.post("/simple")
async def rest_simple(body: dict):
    """Parse a simple text command received over REST.

    The ``cmd`` field is stripped and handed to ``parse_simple_command``.
    A truthy parse result is returned as ``{"ok": True, "parsed": ...}``;
    otherwise the response is a 400 with an error message. This handler
    only returns the parse result; it does not dispatch the action itself.
    """
    raw = body.get("cmd", "").strip()
    parsed = parse_simple_command(raw)
    if parsed:
        return JSONResponse({"ok": True, "parsed": parsed})
    failure = {"ok": False, "error": f"Unknown simple command: {raw}"}
    return JSONResponse(failure, status_code=400)
@app.get("/health")
async def health():
    """Report server status, screen size, mouse position and browser info.

    Returns a dict with a fixed ``status``/``version``, the detected
    ``BROWSER``, whether a ``firefox-esr`` executable is on ``PATH``, and
    the current screen dimensions and mouse coordinates.
    """
    # Local import: keeps this block self-contained without touching the
    # file's top-level import list.
    import shutil

    w, h = _get_screen_size()
    mx, my = _get_mouse_pos()
    # shutil.which does the PATH lookup in-process, so a health probe neither
    # forks an external `which` (blocking the event loop) nor fails when the
    # `which` binary itself is missing.
    firefox_esr_available = shutil.which("firefox-esr") is not None
    return {
        "status": "ok",
        "version": "v7-smart-control",
        "browser": BROWSER,
        "firefox_esr_available": firefox_esr_available,
        "screen_width": w,
        "screen_height": h,
        "mouse_x": mx,
        "mouse_y": my,
    }
@app.get("/search/{query}")
async def quick_search(query: str):
    """Query the DuckDuckGo JSON API via ``curl`` and return the command result.

    The query is percent-encoded with ``urllib.parse.quote_plus`` before it
    is placed inside the single-quoted URL of the shell command.
    """
    encoded = urllib.parse.quote_plus(query)
    command = (
        "curl -s --max-time 15 "
        f"'https://api.duckduckgo.com/?q={encoded}&format=json&no_html=1'"
    )
    outcome = run_command_smart(command, timeout=30)
    return JSONResponse(outcome)
if __name__ == "__main__":
    # Script entry point: listen on all interfaces, on $PORT or 7860 if unset.
    port = int(os.getenv("PORT", 7860))
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=port,
        log_level="info",
    )