Pcmodcomputer / app.py
THEZYZSTUDIO's picture
Update app.py
fcd518f verified
Raw
History Blame Contribute Delete
22.3 kB
"""
THE Z AI — Computer Mode Server v5 — Smart Terminal
====================================================
المشكلة: curl يفشل أحياناً، الذكاء يستسلم
الحل في السيرفر:
- عندما يأتي أمر terminal → ننفذه
- إذا فشل أو جاء فارغ → نجرب بدائل تلقائياً
- نرجع النتائج الحقيقية دائماً
- دعم 8+ مصادر بحث تلقائية
"""
import asyncio
import base64
import io
import json
import os
import re
import subprocess
import time
import urllib.parse
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
# X display targeted by the GUI helpers in this module. Falls back to ":1"
# when unset, and the chosen value is stored back into the environment.
DISPLAY = os.environ.setdefault("DISPLAY", ":1")

app = FastAPI(title="Z-Computer-Mode API v5")
# NOTE(review): wildcard origins combined with credentials is very permissive
# — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# WebSocket clients currently attached to the server.
active_connections: list[WebSocket] = []

# Live-stream state and settings, shared as module globals.
stream_active = False
stream_fps, stream_quality, stream_scale = 3, 60, 0.5
# ─── CURL HEADERS — browser-like options shared by the HTML/RSS fetches ───
CURL_HEADERS = (
    '-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" '
    '-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" '
    '-H "Accept-Language: en-US,en;q=0.9,ar;q=0.8" '
    '-H "Accept-Encoding: gzip, deflate" '
    '--compressed '
    '--max-time 15 '
    '-L '  # follow redirects
    '-s '
)

# ─── Search sources — 8 sources ─────────────────────
# Each constant below is a Python one-liner run with ``python3 -c "<script>"``.
# The scripts are wrapped in shell double quotes, so they must never contain
# a double quote, a dollar sign or a backtick.

_PY_DDG_INSTANT = (
    r"import sys,json; d=json.load(sys.stdin); ans=d.get('AbstractText',''); "
    r"rels=d.get('RelatedTopics',[]); "
    r"print('ANSWER:',ans if ans else 'no direct answer'); "
    r"[print('-',r.get('Text','')[:250]) for r in rels if isinstance(r,dict) and r.get('Text')]"
)

_PY_GOOGLE_NEWS = (
    r"import sys,re; xml=sys.stdin.read(); "
    r"titles=re.findall(r'<title><!\[CDATA\[(.*?)\]\]></title>|<title>(.*?)</title>',xml); "
    r"results=[(a or b).strip() for a,b in titles if (a or b).strip()][1:9]; "
    r"[print(str(i+1)+'. '+t[:180]) for i,t in enumerate(results)]"
)

# Exits with status 1 when the reply has no extract, so that the shell ``||``
# fallback to the search API actually runs for missing pages.
_PY_WIKI_SUMMARY = (
    r"import sys,json; d=json.load(sys.stdin); e=d.get('extract',''); "
    r"print(d.get('title','')+'\n'+e[:1500]) if e else sys.exit(1)"
)

# ``re`` must be imported here: the snippet cleaner calls re.sub.
_PY_WIKI_SEARCH = (
    r"import sys,json,re; d=json.load(sys.stdin); "
    r"[print(str(i+1)+'. '+r['title']+': '+re.sub('<[^>]+>','',r.get('snippet',''))[:200]) "
    r"for i,r in enumerate(d.get('query',{}).get('search',[]))]"
)

_PY_DDG_HTML = (
    r"import sys,re; h=sys.stdin.read(); "
    r"snippets=re.findall(r'class=.result__snippet[^>]*>(.*?)</a>',h,re.DOTALL); "
    r"titles=re.findall(r'class=.result__title[^>]*>.*?<a[^>]*>(.*?)</a>',h,re.DOTALL); "
    r"clean=lambda s:re.sub('<[^>]+>','',s).strip(); "
    r"[print(str(i+1)+'. '+clean(titles[i] if i<len(titles) else '')+'\n '+clean(s)[:200]) "
    r"for i,s in enumerate(snippets[:7])]"
)

_PY_BING_NEWS = (
    r"import sys,re; xml=sys.stdin.read(); "
    r"items=re.findall(r'<item>(.*?)</item>',xml,re.DOTALL); "
    r"[print(str(i+1)+'. '"
    r"+re.sub('<[^>]+>','',re.search(r'<title>(.*?)</title>',it).group(1) if re.search(r'<title>',it) else '')"
    r"+'\n '"
    r"+re.sub('<[^>]+>','',re.search(r'<description>(.*?)</description>',it).group(1) if re.search(r'<description>',it) else '')[:150]) "
    r"for i,it in enumerate(items[:6])]"
)

_PY_REDDIT = (
    r"import sys,json; d=json.load(sys.stdin); "
    r"posts=d.get('data',{}).get('children',[]); "
    r"[print(str(i+1)+'. '+p['data'].get('title','')+'\n r/'+p['data'].get('subreddit','')"
    r"+' Score:'+str(p['data'].get('score',''))+'\n '+p['data'].get('selftext','')[:200]) "
    r"for i,p in enumerate(posts[:6])]"
)

# ``or ''`` guards against JSON nulls, which would otherwise break the
# string concatenation / slicing and abort the listing half-way.
_PY_HACKERNEWS = (
    r"import sys,json; d=json.load(sys.stdin); hits=d.get('hits',[]); "
    r"[print(str(i+1)+'. '+(h.get('title') or '')+'\n Points:'+str(h.get('points','0'))"
    r"+' | '+(h.get('url') or '')[:80]) for i,h in enumerate(hits[:6])]"
)

_PY_ARXIV = (
    r"import sys,re; xml=sys.stdin.read(); "
    r"titles=re.findall(r'<title>(.*?)</title>',xml)[1:]; "
    r"summaries=re.findall(r'<summary>(.*?)</summary>',xml,re.DOTALL); "
    r"[print(str(i+1)+'. '+t.strip()+'\n '+summaries[i].strip()[:200] if i<len(summaries) else '') "
    r"for i,t in enumerate(titles[:5])]"
)


def _pipe_to_python(fetch: str, script: str) -> str:
    """Return a shell pipeline that feeds *fetch*'s stdout to ``python3 -c``."""
    return f'{fetch} | python3 -c "{script}"'


def _build_search_sources(query: str) -> list[dict]:
    """Build the list of alternative search commands for *query*.

    Args:
        query: Free-text search terms. The text is always percent-encoded
            before being placed inside the single-quoted URLs, so quotes or
            shell metacharacters in it cannot terminate the quoting.

    Returns:
        Eight dicts in fixed order, each with a ``"name"`` (display name of
        the source) and a ``"cmd"`` (a bash command line that fetches the
        source with curl and prints a plain-text digest).
    """
    # Query-string form: spaces become '+', everything unsafe is %-escaped.
    q = urllib.parse.quote_plus(query)
    # Path-segment form for the Wikipedia REST API: underscores for spaces
    # and full percent-encoding (a '+' in a path is a literal plus sign).
    title = urllib.parse.quote(query.replace(" ", "_"), safe="")

    wikipedia_cmd = (
        _pipe_to_python(
            f"curl -s 'https://en.wikipedia.org/api/rest_v1/page/summary/{title}'",
            _PY_WIKI_SUMMARY,
        )
        + " 2>/dev/null || "
        + _pipe_to_python(
            "curl -s 'https://en.wikipedia.org/w/api.php?action=query&list=search"
            f"&srsearch={q}&format=json&srlimit=5'",
            _PY_WIKI_SEARCH,
        )
        + " 2>/dev/null"
    )

    return [
        {
            "name": "DuckDuckGo Instant",
            "cmd": _pipe_to_python(
                f"curl -s 'https://api.duckduckgo.com/?q={q}&format=json&no_html=1&skip_disambig=1'",
                _PY_DDG_INSTANT,
            ),
        },
        {
            "name": "Google News RSS",
            "cmd": _pipe_to_python(
                f"curl -sL {CURL_HEADERS} 'https://news.google.com/rss/search?q={q}&hl=en&gl=US&ceid=US:en'",
                _PY_GOOGLE_NEWS,
            ),
        },
        {
            "name": "Wikipedia English",
            "cmd": wikipedia_cmd,
        },
        {
            "name": "DuckDuckGo HTML",
            "cmd": _pipe_to_python(
                f"curl -sL {CURL_HEADERS} 'https://html.duckduckgo.com/html/?q={q}'",
                _PY_DDG_HTML,
            ),
        },
        {
            "name": "Bing News RSS",
            "cmd": _pipe_to_python(
                f"curl -sL {CURL_HEADERS} 'https://www.bing.com/news/search?q={q}&format=RSS'",
                _PY_BING_NEWS,
            ),
        },
        {
            "name": "Reddit Search",
            "cmd": _pipe_to_python(
                f"curl -sL {CURL_HEADERS} -H 'Accept: application/json' "
                f"'https://www.reddit.com/search.json?q={q}&sort=new&limit=8&type=link'",
                _PY_REDDIT,
            ),
        },
        {
            "name": "HackerNews",
            "cmd": _pipe_to_python(
                f"curl -s 'https://hn.algolia.com/api/v1/search?query={q}&hitsPerPage=8&tags=story'",
                _PY_HACKERNEWS,
            ),
        },
        {
            "name": "ArXiv (academic)",
            "cmd": _pipe_to_python(
                f"curl -s 'https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=5'",
                _PY_ARXIV,
            ),
        },
    ]
def _extract_query_from_cmd(cmd: str) -> str:
    """Pull the search terms out of a curl command line.

    The URL parameter patterns are tried in order; the first one that
    matches wins. Returns the decoded terms, or "" when nothing matches.
    """
    candidate_patterns = (
        r'[?&]q=([^&\'"]+)',
        r'[?&]query=([^&\'"]+)',
        r'[?&]search_query=all:([^&\'"]+)',
        r"search\?q=([^&'\"\s]+)",
    )
    attempts = (re.search(rx, cmd) for rx in candidate_patterns)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is None:
        return ""

    encoded = first_hit.group(1)
    # Turn both space encodings into real spaces before percent-decoding.
    for space_token in ("+", "%20"):
        encoded = encoded.replace(space_token, " ")
    return urllib.parse.unquote(encoded)
def _is_empty_result(stdout: str) -> bool:
    """Tell whether command output is empty or carries no information.

    Output counts as empty when it is shorter than 10 characters after
    stripping, or when every non-blank line (lower-cased) contains at
    least one of the "nothing found" markers below.
    """
    stripped = (stdout or "").strip()
    if len(stripped) < 10:
        return True

    empty_signals = (
        "no direct answer",
        "no results",
        "0 results",
        "no items",
        "[]",
        "{}",
        "error",
        "not found",
        "answer: no",
    )
    for line in stripped.lower().split('\n'):
        if not line.strip():
            continue
        if any(marker in line for marker in empty_signals):
            continue
        # One informative line is enough to call the result non-empty.
        return False
    return True
# ─── Screen Capture ─────────────────────────────────
def capture_screen(scale=0.6, quality=65) -> str:
    """Grab the X display with ``scrot`` and return it as a base64 JPEG.

    Args:
        scale: Resize factor; the image is only resized when it is < 1.0.
        quality: JPEG quality passed to Pillow when encoding.

    Returns:
        The base64-encoded JPEG as text. When scrot fails or produces no
        file, the result of ``_capture_xwd`` is returned instead; on any
        exception the error is printed and "" is returned.
    """
    import tempfile  # local import keeps this block self-contained

    try:
        # A private directory per capture: two captures in the same
        # millisecond cannot collide, and the file is removed on every
        # exit path (scrot failure, decode error, success).
        with tempfile.TemporaryDirectory(prefix="zs_") as workdir:
            shot_path = os.path.join(workdir, "screen.png")
            r = subprocess.run(["scrot", "-q", "90", shot_path],
                               env={**os.environ, "DISPLAY": DISPLAY},
                               timeout=8, capture_output=True)
            if r.returncode == 0 and os.path.exists(shot_path):
                from PIL import Image
                # The context manager closes the file handle; all pixel
                # work happens before the directory is removed.
                with Image.open(shot_path) as img:
                    if scale < 1.0:
                        img = img.resize(
                            (int(img.width * scale), int(img.height * scale)),
                            Image.LANCZOS,
                        )
                    buf = io.BytesIO()
                    img.convert("RGB").save(buf, format="JPEG",
                                            quality=quality, optimize=True)
                return base64.b64encode(buf.getvalue()).decode()
        # scrot unavailable or produced nothing: use the xwd-based fallback.
        return _capture_xwd(scale, quality)
    except Exception as e:
        print(f"[capture] {e}")
        return ""
def _capture_xwd(scale=0.6, quality=65) -> str:
    """Fallback capture: dump the root window with ``xwd`` and convert it.

    Runs ``xwd -root`` and then ``convert`` to obtain a PNG, which is
    resized (only when *scale* < 1.0) and encoded as a base64 JPEG.

    Returns:
        The base64 JPEG text, or "" when no PNG was produced or any step
        raised (the error is printed).
    """
    import tempfile  # local import keeps this block self-contained

    try:
        # Both intermediate files live in a private directory that is always
        # removed, including when ``convert`` fails and no PNG exists.
        with tempfile.TemporaryDirectory(prefix="zs_") as workdir:
            xwd = os.path.join(workdir, "screen.xwd")
            png = os.path.join(workdir, "screen.png")
            subprocess.run(["xwd", "-root", "-silent", "-out", xwd],
                           env={**os.environ, "DISPLAY": DISPLAY},
                           timeout=8, capture_output=True)
            subprocess.run(["convert", xwd, png], timeout=8, capture_output=True)
            if not os.path.exists(png):
                return ""
            from PIL import Image
            with Image.open(png) as img:
                if scale < 1.0:
                    img = img.resize(
                        (int(img.width * scale), int(img.height * scale)),
                        Image.LANCZOS,
                    )
                buf = io.BytesIO()
                img.convert("RGB").save(buf, format="JPEG",
                                        quality=quality, optimize=True)
            return base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        # Still best-effort, but no longer silent and no longer catching
        # KeyboardInterrupt/SystemExit.
        print(f"[capture-xwd] {e}")
        return ""
# ─── Command Runner ──────────────────────────────────
def run_raw_command(cmd: str, timeout: int = 45) -> dict:
    """Run a raw bash command line and capture its output.

    Args:
        cmd: Command line handed to ``/bin/bash`` via ``shell=True``.
        timeout: Seconds to wait before giving up.

    Returns:
        A dict with ``stdout`` (last 15000 characters), ``stderr`` (last
        3000 characters) and ``returncode``. On timeout or any other
        failure ``stdout`` is "", ``stderr`` carries the reason and
        ``returncode`` is -1.
    """
    env = {**os.environ, "DISPLAY": DISPLAY,
           "PYTHONIOENCODING": "utf-8", "LANG": "en_US.UTF-8"}
    try:
        # Decode explicitly as UTF-8 (matching the environment given to the
        # child) and replace undecodable bytes: with strict decoding a single
        # invalid byte raised UnicodeDecodeError and the whole output was lost.
        result = subprocess.run(cmd, shell=True, capture_output=True,
                                text=True, encoding="utf-8", errors="replace",
                                timeout=timeout, env=env,
                                executable="/bin/bash")
    except subprocess.TimeoutExpired:
        return {"stdout": "", "stderr": f"⏱️ Timeout {timeout}s", "returncode": -1}
    except Exception as e:
        return {"stdout": "", "stderr": str(e), "returncode": -1}
    return {
        "stdout": result.stdout[-15000:],
        "stderr": result.stderr[-3000:],
        "returncode": result.returncode,
    }
def run_command_smart(cmd: str, timeout: int = 45) -> dict:
    """Run *cmd*; if it is a failed/empty curl search, try other sources.

    The command is executed first. Its result is returned unchanged when it
    succeeded with informative output, when it does not look like a curl
    search, or when no search terms can be recovered from it. Otherwise the
    alternative sources are tried in order and the first informative one is
    returned, optionally followed by one additional informative source.

    Args:
        cmd: Bash command line to execute.
        timeout: Timeout in seconds for the original command.

    Returns:
        A dict with ``stdout``, ``stderr`` and ``returncode``. When the
        fallback path ran it also carries ``_sources_tried`` and, on
        success, ``_fallback_used``.
    """
    res = run_raw_command(cmd, timeout=timeout)
    if res["returncode"] == 0 and not _is_empty_result(res["stdout"].strip()):
        return res

    is_curl_search = "curl" in cmd and any(x in cmd for x in [
        "duckduckgo", "google", "bing", "wikipedia", "reddit",
        "hackernews", "hn.algolia", "arxiv", "news", "search"
    ])
    if not is_curl_search:
        return res

    query = _extract_query_from_cmd(cmd)
    if not query or len(query) < 3:
        # No usable URL parameter: fall back to the longer plain words.
        words = [w for w in cmd.split() if len(w) > 3 and not w.startswith('-')
                 and 'http' not in w and 'python3' not in w and 'curl' not in w]
        query = ' '.join(words[:5]) if words else ""
    if not query:
        return res

    cmd_lower = cmd.lower()

    def already_used(name: str) -> bool:
        # A source counts as already tried when the first word of its name
        # appears in the original command.
        return name.lower().split()[0] in cmd_lower

    sources = _build_search_sources(query)
    tried_names = []
    for idx, source in enumerate(sources):
        src_name = source["name"]
        if already_used(src_name):
            tried_names.append(f"⏭️ {src_name} (مجرب)")
            continue
        tried_names.append(f"🔍 {src_name}")
        src_out = run_raw_command(source["cmd"], timeout=20)["stdout"].strip()
        if _is_empty_result(src_out):
            continue

        sections = [src_out]
        # Look for ONE extra source, but only among sources not consulted
        # yet: everything before idx has just failed or was skipped, and
        # re-running those only adds blocking network round-trips.
        for extra in sources[idx + 1:]:
            if already_used(extra["name"]):
                continue
            extra_out = run_raw_command(extra["cmd"], timeout=15)["stdout"].strip()
            if not _is_empty_result(extra_out):
                sections.append(f"\n[مصدر إضافي: {extra['name']}]\n{extra_out}")
                break

        combined_header = (
            f"[تعذر الحصول على نتائج من المصدر الأصلي]\n"
            f"[البديل المستخدم: {src_name}]\n"
            f"{'='*50}\n"
        )
        final_out = combined_header + "\n\n".join(sections)
        return {
            "stdout": final_out[:15000],
            "stderr": "",
            "returncode": 0,
            "_sources_tried": tried_names,
            "_fallback_used": src_name,
        }

    return {
        "stdout": f"[لم تُرجع أي مصادر نتائج لـ: {query}]\nالمصادر التي جربتها: {', '.join(tried_names[:5])}",
        "stderr": res["stderr"],
        "returncode": -1,
        "_sources_tried": tried_names,
    }
def xdo(args: list, timeout=10) -> dict:
    """Run ``xdotool`` with *args* on the configured display.

    Args:
        args: Arguments appended after ``xdotool``.
        timeout: Seconds to wait for the tool.

    Returns:
        A dict with ``rc`` (exit status), ``out`` (stdout) and ``err``
        (stderr). When the tool times out or cannot be started, ``rc`` is
        -1 and ``err`` holds the reason, in the same spirit as
        ``run_raw_command``, so a failed input event is reported instead of
        raising into the caller.
    """
    try:
        r = subprocess.run(["xdotool"] + args,
                           env={**os.environ, "DISPLAY": DISPLAY},
                           timeout=timeout, capture_output=True, text=True)
    except subprocess.TimeoutExpired:
        return {"rc": -1, "out": "", "err": f"⏱️ Timeout {timeout}s"}
    except (OSError, ValueError) as e:
        # OSError: xdotool missing / not executable; ValueError: an argument
        # that cannot be passed to exec (e.g. an embedded NUL byte).
        return {"rc": -1, "out": "", "err": str(e)}
    return {"rc": r.returncode, "out": r.stdout, "err": r.stderr}
async def broadcast(msg: dict):
    """Send *msg* as JSON text to every connected WebSocket client.

    Clients whose send raises are removed from ``active_connections``.
    """
    txt = json.dumps(msg, ensure_ascii=False)
    dead = []
    # Iterate over a snapshot: other coroutines may add or remove
    # connections while this one is suspended in ``send_text``.
    for ws in list(active_connections):
        try:
            await ws.send_text(txt)
        except Exception:
            # ``Exception`` (not a bare except) lets task cancellation
            # propagate instead of being mistaken for a dead client.
            dead.append(ws)
    for ws in dead:
        if ws in active_connections:
            active_connections.remove(ws)
async def screen_stream_loop():
    """Broadcast screen frames while streaming is on and clients remain.

    Each iteration captures a frame with the current ``stream_scale`` /
    ``stream_quality`` and sleeps according to the current ``stream_fps``.
    ``stream_active`` is reset to False whenever the loop ends.
    """
    global stream_active
    try:
        while stream_active and active_connections:
            try:
                frame = capture_screen(scale=stream_scale, quality=stream_quality)
                if frame:
                    await broadcast({"type": "frame", "data": frame})
            except Exception as e:
                # Keep streaming after a bad frame, but do not swallow
                # cancellation (which is not an ``Exception``).
                print(f"[stream] {e}")
            # Recomputed every frame so an fps change applies immediately,
            # like quality and scale already do.
            await asyncio.sleep(1.0 / max(1, stream_fps))
    finally:
        # Also runs on cancellation, so a later start_stream can restart.
        stream_active = False
# ─── Action Handler ──────────────────────────────────
# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks: set = set()


async def handle_action(ws: WebSocket, msg: dict):
    """Execute one client action and reply on *ws*.

    Args:
        ws: The WebSocket the request arrived on; all replies go to it.
        msg: Decoded request with an ``action`` name and an optional
            ``data`` dict of action-specific arguments.

    Supported actions: screenshot, terminal, mouse_move, mouse_click,
    keyboard_type, keyboard_hotkey, keyboard_press, clipboard_write,
    clipboard_read, scroll, open_app, mouse_drag, start_stream,
    stop_stream and screen_info. Any other action gets an ``error`` reply.
    """
    global stream_active, stream_fps, stream_quality, stream_scale

    action = msg.get("action", "")
    # ``or {}`` also covers an explicit JSON null for "data".
    data = msg.get("data") or {}

    async def send(obj):
        await ws.send_text(json.dumps(obj, ensure_ascii=False))

    async def auto_shot(label="", delay=0.4):
        # Give the UI a moment to react, then push a follow-up screenshot.
        await asyncio.sleep(delay)
        frame = capture_screen(scale=0.7, quality=68)
        if frame:
            await send({"type": "screenshot", "data": frame,
                        "ts": int(time.time()*1000), "auto": True, "label": label})
        return frame

    if action == "screenshot":
        frame = capture_screen(scale=0.8, quality=75)
        await send({"type": "screenshot", "data": frame, "ts": int(time.time()*1000)})

    elif action == "terminal":
        cmd = data.get("cmd", "")
        timeout = int(data.get("timeout", 45))
        if not cmd:
            await send({"type": "terminal_result", "cmd": "", "stdout": "",
                        "stderr": "no command", "returncode": -1})
            return
        res = run_command_smart(cmd, timeout=timeout)
        await send({
            "type": "terminal_result",
            "cmd": cmd,
            "stdout": res["stdout"],
            "stderr": res.get("stderr", ""),
            "returncode": res["returncode"],
            "fallback_used": res.get("_fallback_used", None),
            "sources_tried": res.get("_sources_tried", []),
        })
        await auto_shot(f"بعد: {cmd[:45]}", delay=0.3)

    elif action == "mouse_move":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        xdo(["mousemove", str(x), str(y)])
        await send({"type": "ack", "action": "mouse_move"})

    elif action == "mouse_click":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        btn_num = {"left": "1", "middle": "2", "right": "3"}.get(data.get("button", "left"), "1")
        xdo(["mousemove", str(x), str(y)])
        await asyncio.sleep(0.05)
        if data.get("double"):
            xdo(["click", "--repeat", "2", "--delay", "100", btn_num])
        else:
            xdo(["click", btn_num])
        await send({"type": "ack", "action": "mouse_click"})
        await auto_shot("بعد النقر", 0.4)

    elif action == "keyboard_type":
        text = data.get("text", "")
        if text:
            xdo(["type", "--clearmodifiers", "--delay", "30", text])
        await send({"type": "ack", "action": "keyboard_type"})
        await auto_shot("بعد الكتابة", 0.3)

    elif action == "keyboard_hotkey":
        keys = data.get("keys", [])
        if keys:
            xdo(["key", "--clearmodifiers", "+".join(keys)])
        await send({"type": "ack", "action": "keyboard_hotkey"})
        await auto_shot("بعد الاختصار", 0.4)

    elif action == "keyboard_press":
        key = data.get("key", "")
        if key:
            xdo(["key", "--clearmodifiers", key])
        await send({"type": "ack", "action": "keyboard_press"})

    elif action == "clipboard_write":
        proc = subprocess.Popen(["xclip", "-selection", "clipboard"],
                                stdin=subprocess.PIPE,
                                env={**os.environ, "DISPLAY": DISPLAY})
        proc.communicate(data.get("text", "").encode())
        await send({"type": "ack", "action": "clipboard_write"})

    elif action == "clipboard_read":
        res = run_raw_command("xclip -selection clipboard -o", timeout=5)
        await send({"type": "clipboard_content", "text": res["stdout"]})

    elif action == "scroll":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        clicks = int(data.get("clicks", 3))
        # Positive click counts use button 4, the rest button 5.
        btn = "4" if clicks > 0 else "5"
        xdo(["mousemove", str(x), str(y)])
        for _ in range(abs(clicks)):
            xdo(["click", btn])
        await send({"type": "ack", "action": "scroll"})

    elif action == "open_app":
        app_cmd = data.get("cmd", "")
        if app_cmd:
            subprocess.Popen(app_cmd, shell=True,
                             env={**os.environ, "DISPLAY": DISPLAY},
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            await send({"type": "ack", "action": "open_app", "cmd": app_cmd})
            await auto_shot(f"بعد فتح: {app_cmd}", 3.0)
        else:
            await send({"type": "ack", "action": "open_app"})

    elif action == "mouse_drag":
        x1, y1 = int(data.get("x1", 0)), int(data.get("y1", 0))
        x2, y2 = int(data.get("x2", 0)), int(data.get("y2", 0))
        xdo(["mousemove", str(x1), str(y1)])
        xdo(["mousedown", "1"])
        await asyncio.sleep(0.1)
        xdo(["mousemove", str(x2), str(y2)])
        await asyncio.sleep(0.1)
        xdo(["mouseup", "1"])
        await send({"type": "ack", "action": "mouse_drag"})

    elif action == "start_stream":
        stream_fps = int(data.get("fps", 3))
        stream_quality = int(data.get("quality", 60))
        stream_scale = float(data.get("scale", 0.5))
        if not stream_active:
            stream_active = True
            task = asyncio.create_task(screen_stream_loop())
            # Hold a reference until the task finishes (see _background_tasks).
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
        await send({"type": "ack", "action": "start_stream"})

    elif action == "stop_stream":
        stream_active = False
        await send({"type": "ack", "action": "stop_stream"})

    elif action == "screen_info":
        res = run_raw_command("xdotool getdisplaygeometry", timeout=5)
        try:
            parts = res["stdout"].strip().split()
            w, h = int(parts[0]), int(parts[1])
        except (IndexError, ValueError):
            # Geometry unavailable: report a default size.
            w, h = 1920, 1080
        res2 = run_raw_command("xdotool getmouselocation", timeout=5)
        try:
            mx = int(re.search(r"x:(\d+)", res2["stdout"]).group(1))
            my = int(re.search(r"y:(\d+)", res2["stdout"]).group(1))
        except (AttributeError, ValueError):
            # No "x:"/"y:" field in the output.
            mx, my = 0, 0
        await send({"type": "screen_info", "width": w, "height": h,
                    "mouse_x": mx, "mouse_y": my})

    else:
        await send({"type": "error", "msg": f"Unknown action: {action}"})
# ─── WebSocket ───────────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    """Serve one WebSocket client.

    After accepting, sends a ``connected`` message with the screen size and
    an initial screenshot, then dispatches every incoming JSON message to
    ``handle_action`` until the client disconnects or an error occurs. The
    connection is always removed from ``active_connections`` on exit.
    """
    await ws.accept()
    active_connections.append(ws)
    # Everything after registration sits inside try/finally, so a failure
    # during the handshake cannot leave a stale entry in the list.
    try:
        res = run_raw_command("xdotool getdisplaygeometry", timeout=5)
        try:
            parts = res["stdout"].strip().split()
            w, h = int(parts[0]), int(parts[1])
        except (IndexError, ValueError):
            # Geometry unavailable: report a default size.
            w, h = 1920, 1080
        await ws.send_text(json.dumps({
            "type": "connected", "screen_width": w, "screen_height": h,
            "msg": "Z Computer Mode v5 — Smart Terminal + Multi-Source Search"
        }, ensure_ascii=False))
        frame = capture_screen(scale=0.75, quality=72)
        if frame:
            await ws.send_text(json.dumps({"type": "screenshot", "data": frame,
                                           "ts": int(time.time()*1000),
                                           "label": "الشاشة الأولية"}))
        while True:
            raw = await ws.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError as e:
                # One malformed frame should not end the whole session.
                await ws.send_text(json.dumps(
                    {"type": "error", "msg": f"Invalid JSON: {e}"},
                    ensure_ascii=False))
                continue
            if not isinstance(msg, dict):
                await ws.send_text(json.dumps(
                    {"type": "error", "msg": "Message must be a JSON object"},
                    ensure_ascii=False))
                continue
            await handle_action(ws, msg)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"[ws] {e}")
    finally:
        if ws in active_connections:
            active_connections.remove(ws)
# ─── REST ────────────────────────────────────────────
@app.get("/screenshot")
async def rest_screenshot():
    """Return the current screen as base64 JPEG plus a millisecond timestamp."""
    image_b64 = capture_screen(0.75, 70)
    payload = {"image": image_b64, "ts": int(time.time() * 1000)}
    return JSONResponse(payload)
@app.post("/terminal")
async def rest_terminal(body: dict):
    """Run the command in ``body["cmd"]`` through ``run_command_smart``.

    ``body["timeout"]`` (seconds, default 45) is coerced to ``int`` like
    the WebSocket handler does. A null or non-numeric value falls back to
    45 rather than reaching ``subprocess`` as ``None``, which would mean
    "no timeout at all".
    """
    try:
        timeout = int(body.get("timeout", 45))
    except (TypeError, ValueError):
        timeout = 45
    return JSONResponse(run_command_smart(body.get("cmd", ""), timeout))
@app.get("/health")
async def health():
    """Report whether the display and the search path respond.

    Probes the display with ``xdotool getdisplaygeometry`` and the search
    path with a DuckDuckGo test query run through ``run_command_smart``.
    """
    geometry_probe = run_raw_command("xdotool getdisplaygeometry", 4)
    search_probe = run_command_smart(
        "curl -s 'https://api.duckduckgo.com/?q=test&format=json&no_html=1'", 15
    )
    report = {"status": "ok", "version": "v5-smart-terminal"}
    report["display_working"] = geometry_probe["returncode"] == 0
    report["search_working"] = search_probe["returncode"] == 0
    report["fallback_used"] = search_probe.get("_fallback_used")
    return report
@app.get("/search/{query}")
async def quick_search(query: str):
    """Search for *query*, starting from the DuckDuckGo instant-answer API.

    The request goes through ``run_command_smart`` with a 30-second
    timeout, and its result dict is returned as JSON.
    """
    encoded_terms = urllib.parse.quote_plus(query)
    ddg_cmd = f"curl -s 'https://api.duckduckgo.com/?q={encoded_terms}&format=json&no_html=1'"
    outcome = run_command_smart(ddg_cmd, timeout=30)
    return JSONResponse(outcome)
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port, log_level="info")