""" THE Z AI — Computer Mode Server v5 — Smart Terminal ==================================================== المشكلة: curl يفشل أحياناً، الذكاء يستسلم الحل في السيرفر: - عندما يأتي أمر terminal → ننفذه - إذا فشل أو جاء فارغ → نجرب بدائل تلقائياً - نرجع النتائج الحقيقية دائماً - دعم 8+ مصادر بحث تلقائية """ import asyncio import base64 import io import json import os import re import subprocess import time import urllib.parse from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse import uvicorn DISPLAY = os.environ.get("DISPLAY", ":1") os.environ["DISPLAY"] = DISPLAY app = FastAPI(title="Z-Computer-Mode API v5") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) active_connections: list[WebSocket] = [] stream_active = False stream_fps = 3 stream_quality = 60 stream_scale = 0.5 # ─── CURL HEADERS — يعمل مع جميع المواقع ─────────── CURL_HEADERS = ( '-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ' '-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" ' '-H "Accept-Language: en-US,en;q=0.9,ar;q=0.8" ' '-H "Accept-Encoding: gzip, deflate" ' '--compressed ' '--max-time 15 ' '-L ' # follow redirects '-s ' ) # ─── مصادر البحث — 8 مصادر ────────────────────────── def _build_search_sources(query: str) -> list[dict]: """يبني قائمة مصادر بحث بديلة للكيورى""" q = urllib.parse.quote_plus(query) q_raw = query.replace(' ', '+') return [ { "name": "DuckDuckGo Instant", "cmd": f"curl -s 'https://api.duckduckgo.com/?q={q}&format=json&no_html=1&skip_disambig=1' | python3 -c \"import sys,json; d=json.load(sys.stdin); ans=d.get('AbstractText',''); rels=d.get('RelatedTopics',[]); print('ANSWER:',ans if ans else 'no direct answer'); [print('-',r.get('Text','')[:250]) for r in rels if isinstance(r,dict) and r.get('Text')]\"" }, { "name": "Google News RSS", "cmd": f"curl -sL {CURL_HEADERS} 'https://news.google.com/rss/search?q={q_raw}&hl=en&gl=US&ceid=US:en' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<!\\[CDATA\\[(.*?)\\]\\]>|(.*?)',xml); descs=re.findall(r'(.*?)',xml); clean=lambda s:re.sub('<[^>]+>','',s); results=[(a or b).strip() for a,b in titles if (a or b).strip()][1:9]; [print(str(i+1)+'. '+t[:180]) for i,t in enumerate(results)]\"" }, { "name": "Wikipedia English", "cmd": f"curl -s 'https://en.wikipedia.org/api/rest_v1/page/summary/{q}' | python3 -c \"import sys,json; d=json.load(sys.stdin); print(d.get('title','')+'\\n'+d.get('extract','')[:1500])\" 2>/dev/null || curl -s 'https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={q}&format=json&srlimit=5' | python3 -c \"import sys,json; d=json.load(sys.stdin); [print(str(i+1)+'. '+r['title']+': '+re.sub('<[^>]+>','',r.get('snippet',''))[:200]) for i,r in enumerate(d.get('query',{{}}).get('search',[]))]\" 2>/dev/null" }, { "name": "DuckDuckGo HTML", "cmd": f"curl -sL {CURL_HEADERS} 'https://html.duckduckgo.com/html/?q={q}' | python3 -c \"import sys,re; h=sys.stdin.read(); snippets=re.findall(r'class=.result__snippet[^>]*>(.*?)',h,re.DOTALL); titles=re.findall(r'class=.result__title[^>]*>.*?]*>(.*?)',h,re.DOTALL); clean=lambda s:re.sub('<[^>]+>','',s).strip(); [print(str(i+1)+'. '+clean(titles[i] if i(.*?)',xml,re.DOTALL); [print(str(i+1)+'. '+re.sub('<[^>]+>','',re.search(r'(.*?)',it).group(1) if re.search(r'',it) else '')+'\\n '+re.sub('<[^>]+>','',re.search(r'<description>(.*?)</description>',it).group(1) if re.search(r'<description>',it) else '')[:150]) for i,it in enumerate(items[:6])]\"" }, { "name": "Reddit Search", "cmd": f"curl -sL {CURL_HEADERS} -H 'Accept: application/json' 'https://www.reddit.com/search.json?q={q}&sort=new&limit=8&type=link' | python3 -c \"import sys,json; d=json.load(sys.stdin); posts=d.get('data',{{}}).get('children',[]); [print(str(i+1)+'. '+p['data'].get('title','')+'\\n r/'+p['data'].get('subreddit','')+' Score:'+str(p['data'].get('score',''))+'\\n '+p['data'].get('selftext','')[:200]) for i,p in enumerate(posts[:6])]\"" }, { "name": "HackerNews", "cmd": f"curl -s 'https://hn.algolia.com/api/v1/search?query={q}&hitsPerPage=8&tags=story' | python3 -c \"import sys,json; d=json.load(sys.stdin); hits=d.get('hits',[]); [print(str(i+1)+'. '+h.get('title','')+'\\n Points:'+str(h.get('points','0'))+' | '+h.get('url','')[:80]) for i,h in enumerate(hits[:6])]\"" }, { "name": "ArXiv (academic)", "cmd": f"curl -s 'https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=5' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title>(.*?)',xml)[1:]; summaries=re.findall(r'(.*?)',xml,re.DOTALL); [print(str(i+1)+'. '+t.strip()+'\\n '+summaries[i].strip()[:200] if i str: """يستخرج كلمات البحث من أمر curl""" patterns = [ r'[?&]q=([^&\'"]+)', r'[?&]query=([^&\'"]+)', r'[?&]search_query=all:([^&\'"]+)', r"search\?q=([^&'\"\s]+)", ] for p in patterns: m = re.search(p, cmd) if m: q = m.group(1).replace('+', ' ').replace('%20', ' ') return urllib.parse.unquote(q) return "" def _is_empty_result(stdout: str) -> bool: """يتحقق إذا كانت النتيجة فارغة أو بلا معلومات""" if not stdout or len(stdout.strip()) < 10: return True empty_signals = [ "no direct answer", "no results", "0 results", "no items", "[]", "{}", "error", "not found", "answer: no", ] s = stdout.strip().lower() meaningful_lines = [l for l in s.split('\n') if l.strip() and not any(sig in l for sig in empty_signals)] if len(meaningful_lines) < 1: return True return False # ─── Screen Capture ───────────────────────────────── def capture_screen(scale=0.6, quality=65) -> str: try: tmp = f"/tmp/zs_{int(time.time()*1000)}.png" r = subprocess.run(["scrot", "-q", "90", tmp], env={**os.environ, "DISPLAY": DISPLAY}, timeout=8, capture_output=True) if r.returncode != 0 or not os.path.exists(tmp): return _capture_xwd(scale, quality) from PIL import Image img = Image.open(tmp) os.unlink(tmp) if scale < 1.0: img = img.resize((int(img.width*scale), int(img.height*scale)), Image.LANCZOS) buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=quality, optimize=True) return base64.b64encode(buf.getvalue()).decode() except Exception as e: print(f"[capture] {e}") return "" def _capture_xwd(scale=0.6, quality=65) -> str: try: xwd = f"/tmp/zs_{int(time.time()*1000)}.xwd" png = xwd.replace(".xwd", ".png") subprocess.run(["xwd", "-root", "-silent", "-out", xwd], env={**os.environ, "DISPLAY": DISPLAY}, timeout=8, capture_output=True) subprocess.run(["convert", xwd, png], timeout=8, capture_output=True) from PIL import Image if not os.path.exists(png): return "" img = Image.open(png) for f in [xwd, png]: try: os.unlink(f) except: pass if scale < 1.0: img = img.resize((int(img.width*scale), int(img.height*scale)), Image.LANCZOS) buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=quality, optimize=True) return base64.b64encode(buf.getvalue()).decode() except: return "" # ─── Command Runner ────────────────────────────────── def run_raw_command(cmd: str, timeout: int = 45) -> dict: """ينفذ أمر bash خام""" env = {**os.environ, "DISPLAY": DISPLAY, "PYTHONIOENCODING": "utf-8", "LANG": "en_US.UTF-8"} try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout, env=env, executable="/bin/bash") return { "stdout": result.stdout[-15000:], "stderr": result.stderr[-3000:], "returncode": result.returncode, } except subprocess.TimeoutExpired: return {"stdout": "", "stderr": f"⏱️ Timeout {timeout}s", "returncode": -1} except Exception as e: return {"stdout": "", "stderr": str(e), "returncode": -1} def run_command_smart(cmd: str, timeout: int = 45) -> dict: """ ينفذ الأمر — إذا كان curl بحث وفشل/فرغ → يجرب مصادر بديلة تلقائياً يرجع النتيجة الأولى التي تحتوي معلومات حقيقية """ res = run_raw_command(cmd, timeout=timeout) stdout = res["stdout"].strip() if res["returncode"] == 0 and not _is_empty_result(stdout): return res is_curl_search = "curl" in cmd and any(x in cmd for x in [ "duckduckgo", "google", "bing", "wikipedia", "reddit", "hackernews", "hn.algolia", "arxiv", "news", "search" ]) if not is_curl_search: return res query = _extract_query_from_cmd(cmd) if not query or len(query) < 3: words = [w for w in cmd.split() if len(w) > 3 and not w.startswith('-') and 'http' not in w and 'python3' not in w and 'curl' not in w] query = ' '.join(words[:5]) if words else "" if not query: return res sources = _build_search_sources(query) tried_names = [] all_results = [] for source in sources: src_name = source["name"] if any(x in cmd.lower() for x in [src_name.lower().split()[0]]): tried_names.append(f"⏭️ {src_name} (مجرب)") continue tried_names.append(f"🔍 {src_name}") src_res = run_raw_command(source["cmd"], timeout=20) src_out = src_res["stdout"].strip() if not _is_empty_result(src_out): combined_header = ( f"[تعذر الحصول على نتائج من المصدر الأصلي]\n" f"[البديل المستخدم: {src_name}]\n" f"{'='*50}\n" ) all_results.append(src_out) for source2 in sources: if source2["name"] != src_name: s2 = run_raw_command(source2["cmd"], timeout=15) s2_out = s2["stdout"].strip() if not _is_empty_result(s2_out): all_results.append(f"\n[مصدر إضافي: {source2['name']}]\n{s2_out}") break final_out = combined_header + "\n\n".join(all_results) return { "stdout": final_out[:15000], "stderr": "", "returncode": 0, "_sources_tried": tried_names, "_fallback_used": src_name, } return { "stdout": f"[لم تُرجع أي مصادر نتائج لـ: {query}]\nالمصادر التي جربتها: {', '.join(tried_names[:5])}", "stderr": res["stderr"], "returncode": -1, "_sources_tried": tried_names, } def xdo(args: list, timeout=10) -> dict: r = subprocess.run(["xdotool"] + args, env={**os.environ, "DISPLAY": DISPLAY}, timeout=timeout, capture_output=True, text=True) return {"rc": r.returncode, "out": r.stdout, "err": r.stderr} async def broadcast(msg: dict): txt = json.dumps(msg, ensure_ascii=False) dead = [] for ws in active_connections: try: await ws.send_text(txt) except: dead.append(ws) for ws in dead: if ws in active_connections: active_connections.remove(ws) async def screen_stream_loop(): global stream_active interval = 1.0 / max(1, stream_fps) while stream_active and active_connections: try: frame = capture_screen(scale=stream_scale, quality=stream_quality) if frame: await broadcast({"type": "frame", "data": frame}) except: pass await asyncio.sleep(interval) stream_active = False # ─── Action Handler ────────────────────────────────── async def handle_action(ws: WebSocket, msg: dict): action = msg.get("action", "") data = msg.get("data", {}) async def send(obj): await ws.send_text(json.dumps(obj, ensure_ascii=False)) async def auto_shot(label="", delay=0.4): await asyncio.sleep(delay) frame = capture_screen(scale=0.7, quality=68) if frame: await send({"type": "screenshot", "data": frame, "ts": int(time.time()*1000), "auto": True, "label": label}) return frame if action == "screenshot": frame = capture_screen(scale=0.8, quality=75) await send({"type": "screenshot", "data": frame, "ts": int(time.time()*1000)}) elif action == "terminal": cmd = data.get("cmd", "") timeout = int(data.get("timeout", 45)) if not cmd: await send({"type": "terminal_result", "cmd": "", "stdout": "", "stderr": "no command", "returncode": -1}) return res = run_command_smart(cmd, timeout=timeout) await send({ "type": "terminal_result", "cmd": cmd, "stdout": res["stdout"], "stderr": res.get("stderr", ""), "returncode": res["returncode"], "fallback_used": res.get("_fallback_used", None), "sources_tried": res.get("_sources_tried", []), }) await auto_shot(f"بعد: {cmd[:45]}", delay=0.3) elif action == "mouse_move": x, y = int(data.get("x", 0)), int(data.get("y", 0)) xdo(["mousemove", str(x), str(y)]) await send({"type": "ack", "action": "mouse_move"}) elif action == "mouse_click": x, y = int(data.get("x", 0)), int(data.get("y", 0)) btn_num = {"left": "1", "middle": "2", "right": "3"}.get(data.get("button","left"), "1") xdo(["mousemove", str(x), str(y)]) await asyncio.sleep(0.05) if data.get("double"): xdo(["click", "--repeat", "2", "--delay", "100", btn_num]) else: xdo(["click", btn_num]) await send({"type": "ack", "action": "mouse_click"}) await auto_shot("بعد النقر", 0.4) elif action == "keyboard_type": text = data.get("text", "") if text: xdo(["type", "--clearmodifiers", "--delay", "30", text]) await send({"type": "ack", "action": "keyboard_type"}) await auto_shot("بعد الكتابة", 0.3) elif action == "keyboard_hotkey": keys = data.get("keys", []) if keys: xdo(["key", "--clearmodifiers", "+".join(keys)]) await send({"type": "ack", "action": "keyboard_hotkey"}) await auto_shot("بعد الاختصار", 0.4) elif action == "keyboard_press": key = data.get("key", "") if key: xdo(["key", "--clearmodifiers", key]) await send({"type": "ack", "action": "keyboard_press"}) elif action == "clipboard_write": proc = subprocess.Popen(["xclip", "-selection", "clipboard"], stdin=subprocess.PIPE, env={**os.environ, "DISPLAY": DISPLAY}) proc.communicate(data.get("text","").encode()) await send({"type": "ack", "action": "clipboard_write"}) elif action == "clipboard_read": res = run_raw_command("xclip -selection clipboard -o", timeout=5) await send({"type": "clipboard_content", "text": res["stdout"]}) elif action == "scroll": x, y = int(data.get("x", 0)), int(data.get("y", 0)) clicks = int(data.get("clicks", 3)) btn = "4" if clicks > 0 else "5" xdo(["mousemove", str(x), str(y)]) for _ in range(abs(clicks)): xdo(["click", btn]) await send({"type": "ack", "action": "scroll"}) elif action == "open_app": app_cmd = data.get("cmd", "") if app_cmd: subprocess.Popen(app_cmd, shell=True, env={**os.environ, "DISPLAY": DISPLAY}, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) await send({"type": "ack", "action": "open_app", "cmd": app_cmd}) await auto_shot(f"بعد فتح: {app_cmd}", 3.0) else: await send({"type": "ack", "action": "open_app"}) elif action == "mouse_drag": x1,y1 = int(data.get("x1",0)), int(data.get("y1",0)) x2,y2 = int(data.get("x2",0)), int(data.get("y2",0)) xdo(["mousemove", str(x1), str(y1)]) xdo(["mousedown", "1"]) await asyncio.sleep(0.1) xdo(["mousemove", str(x2), str(y2)]) await asyncio.sleep(0.1) xdo(["mouseup", "1"]) await send({"type": "ack", "action": "mouse_drag"}) elif action == "start_stream": global stream_active, stream_fps, stream_quality, stream_scale stream_fps = int(data.get("fps", 3)) stream_quality = int(data.get("quality", 60)) stream_scale = float(data.get("scale", 0.5)) if not stream_active: stream_active = True asyncio.create_task(screen_stream_loop()) await send({"type": "ack", "action": "start_stream"}) elif action == "stop_stream": stream_active = False await send({"type": "ack", "action": "stop_stream"}) elif action == "screen_info": res = run_raw_command("xdotool getdisplaygeometry", timeout=5) try: parts = res["stdout"].strip().split(); w,h = int(parts[0]),int(parts[1]) except: w,h = 1920,1080 res2 = run_raw_command("xdotool getmouselocation", timeout=5) try: mx = int(re.search(r"x:(\d+)", res2["stdout"]).group(1)) my = int(re.search(r"y:(\d+)", res2["stdout"]).group(1)) except: mx,my = 0,0 await send({"type":"screen_info","width":w,"height":h,"mouse_x":mx,"mouse_y":my}) else: await send({"type": "error", "msg": f"Unknown action: {action}"}) # ─── WebSocket ─────────────────────────────────────── @app.websocket("/ws") async def websocket_endpoint(ws: WebSocket): await ws.accept() active_connections.append(ws) res = run_raw_command("xdotool getdisplaygeometry", timeout=5) try: parts = res["stdout"].strip().split(); w,h = int(parts[0]),int(parts[1]) except: w,h = 1920,1080 await ws.send_text(json.dumps({ "type": "connected", "screen_width": w, "screen_height": h, "msg": "Z Computer Mode v5 — Smart Terminal + Multi-Source Search" }, ensure_ascii=False)) frame = capture_screen(scale=0.75, quality=72) if frame: await ws.send_text(json.dumps({"type":"screenshot","data":frame, "ts":int(time.time()*1000),"label":"الشاشة الأولية"})) try: while True: raw = await ws.receive_text() await handle_action(ws, json.loads(raw)) except WebSocketDisconnect: pass except Exception as e: print(f"[ws] {e}") finally: if ws in active_connections: active_connections.remove(ws) # ─── REST ──────────────────────────────────────────── @app.get("/screenshot") async def rest_screenshot(): return JSONResponse({"image": capture_screen(0.75,70), "ts": int(time.time()*1000)}) @app.post("/terminal") async def rest_terminal(body: dict): return JSONResponse(run_command_smart(body.get("cmd",""), body.get("timeout",45))) @app.get("/health") async def health(): display = run_raw_command("xdotool getdisplaygeometry", 4) test = run_command_smart("curl -s 'https://api.duckduckgo.com/?q=test&format=json&no_html=1'", 15) return { "status": "ok", "version": "v5-smart-terminal", "display_working": display["returncode"] == 0, "search_working": test["returncode"] == 0, "fallback_used": test.get("_fallback_used"), } @app.get("/search/{query}") async def quick_search(query: str): res = run_command_smart( f"curl -s 'https://api.duckduckgo.com/?q={urllib.parse.quote_plus(query)}&format=json&no_html=1'", timeout=30 ) return JSONResponse(res) if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) uvicorn.run("app:app", host="0.0.0.0", port=port, log_level="info")