Spaces:
Paused
Paused
"""
THE Z AI — Computer Mode Server v5 — Smart Terminal
====================================================
Problem: curl sometimes fails and the AI gives up.
Solution, implemented in the server:
- When a terminal command arrives, run it.
- If it fails or comes back empty, try alternatives automatically.
- Always return the real results.
- Supports 8+ automatic search sources.
"""
| import asyncio | |
| import base64 | |
| import io | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import time | |
| import urllib.parse | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
# X display used by every GUI helper below. The value is written back into
# the process environment so child processes see the same display.
DISPLAY = os.environ.setdefault("DISPLAY", ":1")

app = FastAPI(title="Z-Computer-Mode API v5")

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy — confirm this is intended for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# WebSocket clients that are currently connected.
active_connections: list[WebSocket] = []

# Live-stream settings; rewritten by the "start_stream"/"stop_stream" actions.
stream_active = False
stream_fps = 3
stream_quality = 60
stream_scale = 0.5
| # ─── CURL HEADERS — يعمل مع جميع المواقع ─────────── | |
| CURL_HEADERS = ( | |
| '-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' | |
| '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ' | |
| '-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" ' | |
| '-H "Accept-Language: en-US,en;q=0.9,ar;q=0.8" ' | |
| '-H "Accept-Encoding: gzip, deflate" ' | |
| '--compressed ' | |
| '--max-time 15 ' | |
| '-L ' # follow redirects | |
| '-s ' | |
| ) | |
| # ─── مصادر البحث — 8 مصادر ────────────────────────── | |
| def _build_search_sources(query: str) -> list[dict]: | |
| """يبني قائمة مصادر بحث بديلة للكيورى""" | |
| q = urllib.parse.quote_plus(query) | |
| q_raw = query.replace(' ', '+') | |
| return [ | |
| { | |
| "name": "DuckDuckGo Instant", | |
| "cmd": f"curl -s 'https://api.duckduckgo.com/?q={q}&format=json&no_html=1&skip_disambig=1' | python3 -c \"import sys,json; d=json.load(sys.stdin); ans=d.get('AbstractText',''); rels=d.get('RelatedTopics',[]); print('ANSWER:',ans if ans else 'no direct answer'); [print('-',r.get('Text','')[:250]) for r in rels if isinstance(r,dict) and r.get('Text')]\"" | |
| }, | |
| { | |
| "name": "Google News RSS", | |
| "cmd": f"curl -sL {CURL_HEADERS} 'https://news.google.com/rss/search?q={q_raw}&hl=en&gl=US&ceid=US:en' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title><!\\[CDATA\\[(.*?)\\]\\]></title>|<title>(.*?)</title>',xml); descs=re.findall(r'<description>(.*?)</description>',xml); clean=lambda s:re.sub('<[^>]+>','',s); results=[(a or b).strip() for a,b in titles if (a or b).strip()][1:9]; [print(str(i+1)+'. '+t[:180]) for i,t in enumerate(results)]\"" | |
| }, | |
| { | |
| "name": "Wikipedia English", | |
| "cmd": f"curl -s 'https://en.wikipedia.org/api/rest_v1/page/summary/{q}' | python3 -c \"import sys,json; d=json.load(sys.stdin); print(d.get('title','')+'\\n'+d.get('extract','')[:1500])\" 2>/dev/null || curl -s 'https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={q}&format=json&srlimit=5' | python3 -c \"import sys,json; d=json.load(sys.stdin); [print(str(i+1)+'. '+r['title']+': '+re.sub('<[^>]+>','',r.get('snippet',''))[:200]) for i,r in enumerate(d.get('query',{{}}).get('search',[]))]\" 2>/dev/null" | |
| }, | |
| { | |
| "name": "DuckDuckGo HTML", | |
| "cmd": f"curl -sL {CURL_HEADERS} 'https://html.duckduckgo.com/html/?q={q}' | python3 -c \"import sys,re; h=sys.stdin.read(); snippets=re.findall(r'class=.result__snippet[^>]*>(.*?)</a>',h,re.DOTALL); titles=re.findall(r'class=.result__title[^>]*>.*?<a[^>]*>(.*?)</a>',h,re.DOTALL); clean=lambda s:re.sub('<[^>]+>','',s).strip(); [print(str(i+1)+'. '+clean(titles[i] if i<len(titles) else '')+'\\n '+clean(s)[:200]) for i,s in enumerate(snippets[:7])]\"" | |
| }, | |
| { | |
| "name": "Bing News RSS", | |
| "cmd": f"curl -sL {CURL_HEADERS} 'https://www.bing.com/news/search?q={q}&format=RSS' | python3 -c \"import sys,re; xml=sys.stdin.read(); items=re.findall(r'<item>(.*?)</item>',xml,re.DOTALL); [print(str(i+1)+'. '+re.sub('<[^>]+>','',re.search(r'<title>(.*?)</title>',it).group(1) if re.search(r'<title>',it) else '')+'\\n '+re.sub('<[^>]+>','',re.search(r'<description>(.*?)</description>',it).group(1) if re.search(r'<description>',it) else '')[:150]) for i,it in enumerate(items[:6])]\"" | |
| }, | |
| { | |
| "name": "Reddit Search", | |
| "cmd": f"curl -sL {CURL_HEADERS} -H 'Accept: application/json' 'https://www.reddit.com/search.json?q={q}&sort=new&limit=8&type=link' | python3 -c \"import sys,json; d=json.load(sys.stdin); posts=d.get('data',{{}}).get('children',[]); [print(str(i+1)+'. '+p['data'].get('title','')+'\\n r/'+p['data'].get('subreddit','')+' Score:'+str(p['data'].get('score',''))+'\\n '+p['data'].get('selftext','')[:200]) for i,p in enumerate(posts[:6])]\"" | |
| }, | |
| { | |
| "name": "HackerNews", | |
| "cmd": f"curl -s 'https://hn.algolia.com/api/v1/search?query={q}&hitsPerPage=8&tags=story' | python3 -c \"import sys,json; d=json.load(sys.stdin); hits=d.get('hits',[]); [print(str(i+1)+'. '+h.get('title','')+'\\n Points:'+str(h.get('points','0'))+' | '+h.get('url','')[:80]) for i,h in enumerate(hits[:6])]\"" | |
| }, | |
| { | |
| "name": "ArXiv (academic)", | |
| "cmd": f"curl -s 'https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=5' | python3 -c \"import sys,re; xml=sys.stdin.read(); titles=re.findall(r'<title>(.*?)</title>',xml)[1:]; summaries=re.findall(r'<summary>(.*?)</summary>',xml,re.DOTALL); [print(str(i+1)+'. '+t.strip()+'\\n '+summaries[i].strip()[:200] if i<len(summaries) else '') for i,t in enumerate(titles[:5])]\"" | |
| }, | |
| ] | |
| def _extract_query_from_cmd(cmd: str) -> str: | |
| """يستخرج كلمات البحث من أمر curl""" | |
| patterns = [ | |
| r'[?&]q=([^&\'"]+)', | |
| r'[?&]query=([^&\'"]+)', | |
| r'[?&]search_query=all:([^&\'"]+)', | |
| r"search\?q=([^&'\"\s]+)", | |
| ] | |
| for p in patterns: | |
| m = re.search(p, cmd) | |
| if m: | |
| q = m.group(1).replace('+', ' ').replace('%20', ' ') | |
| return urllib.parse.unquote(q) | |
| return "" | |
| def _is_empty_result(stdout: str) -> bool: | |
| """يتحقق إذا كانت النتيجة فارغة أو بلا معلومات""" | |
| if not stdout or len(stdout.strip()) < 10: | |
| return True | |
| empty_signals = [ | |
| "no direct answer", | |
| "no results", | |
| "0 results", | |
| "no items", | |
| "[]", | |
| "{}", | |
| "error", | |
| "not found", | |
| "answer: no", | |
| ] | |
| s = stdout.strip().lower() | |
| meaningful_lines = [l for l in s.split('\n') if l.strip() and not any(sig in l for sig in empty_signals)] | |
| if len(meaningful_lines) < 1: | |
| return True | |
| return False | |
| # ─── Screen Capture ───────────────────────────────── | |
def capture_screen(scale=0.6, quality=65) -> str:
    """Grab the X display with ``scrot`` and return it as a base64 JPEG.

    Args:
        scale: Resize factor; values below 1.0 shrink the image (LANCZOS).
        quality: JPEG quality passed to Pillow.

    Returns:
        The base64-encoded JPEG, the result of ``_capture_xwd`` when scrot
        fails or produces no file, or ``""`` on any error.
    """
    import tempfile

    try:
        # A private temp directory gives a collision-free file name and is
        # removed on every exit path, so failed captures leave nothing behind.
        # The target file is deliberately not pre-created: scrot writes it.
        with tempfile.TemporaryDirectory(prefix="zs_") as workdir:
            shot_path = os.path.join(workdir, "screen.png")
            grab = subprocess.run(["scrot", "-q", "90", shot_path],
                                  env={**os.environ, "DISPLAY": DISPLAY},
                                  timeout=8, capture_output=True)
            if grab.returncode != 0 or not os.path.exists(shot_path):
                return _capture_xwd(scale, quality)
            from PIL import Image
            # The context manager closes the underlying file handle.
            with Image.open(shot_path) as raw:
                frame = raw
                if scale < 1.0:
                    frame = raw.resize(
                        (int(raw.width * scale), int(raw.height * scale)),
                        Image.LANCZOS)
                buf = io.BytesIO()
                frame.convert("RGB").save(buf, format="JPEG",
                                          quality=quality, optimize=True)
        return base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        print(f"[capture] {e}")
        return ""
def _capture_xwd(scale=0.6, quality=65) -> str:
    """Fallback screen grab: ``xwd`` dump converted to PNG with ``convert``.

    Args:
        scale: Resize factor; values below 1.0 shrink the image (LANCZOS).
        quality: JPEG quality passed to Pillow.

    Returns:
        The base64-encoded JPEG, or ``""`` when the PNG was not produced or
        any step raised.
    """
    import tempfile

    try:
        # Both intermediate files live in a private directory that is removed
        # on every exit path, including failures.
        with tempfile.TemporaryDirectory(prefix="zs_") as workdir:
            xwd_path = os.path.join(workdir, "screen.xwd")
            png_path = os.path.join(workdir, "screen.png")
            subprocess.run(["xwd", "-root", "-silent", "-out", xwd_path],
                           env={**os.environ, "DISPLAY": DISPLAY},
                           timeout=8, capture_output=True)
            subprocess.run(["convert", xwd_path, png_path],
                           timeout=8, capture_output=True)
            from PIL import Image
            if not os.path.exists(png_path):
                return ""
            with Image.open(png_path) as raw:
                frame = raw
                if scale < 1.0:
                    frame = raw.resize(
                        (int(raw.width * scale), int(raw.height * scale)),
                        Image.LANCZOS)
                buf = io.BytesIO()
                frame.convert("RGB").save(buf, format="JPEG",
                                          quality=quality, optimize=True)
        return base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        # Narrowed from a bare except so Ctrl-C / SystemExit still propagate;
        # logged in the same style as capture_screen.
        print(f"[capture-xwd] {e}")
        return ""
| # ─── Command Runner ────────────────────────────────── | |
def run_raw_command(cmd: str, timeout: int = 45) -> dict:
    """Run *cmd* through ``/bin/bash`` and collect its output.

    Returns a dict with ``stdout`` (last 15000 characters), ``stderr``
    (last 3000 characters) and ``returncode``. A timeout or any other
    failure to run the command is reported with ``returncode`` -1 and the
    reason in ``stderr``.
    """
    child_env = dict(os.environ)
    child_env.update({
        "DISPLAY": DISPLAY,
        "PYTHONIOENCODING": "utf-8",
        "LANG": "en_US.UTF-8",
    })
    try:
        proc = subprocess.run(cmd, shell=True, executable="/bin/bash",
                              env=child_env, timeout=timeout,
                              capture_output=True, text=True)
    except subprocess.TimeoutExpired:
        return {"stdout": "", "stderr": f"⏱️ Timeout {timeout}s", "returncode": -1}
    except Exception as exc:
        return {"stdout": "", "stderr": str(exc), "returncode": -1}
    # Only the tail of each stream is kept to bound the response size.
    return {
        "stdout": proc.stdout[-15000:],
        "stderr": proc.stderr[-3000:],
        "returncode": proc.returncode,
    }
def run_command_smart(cmd: str, timeout: int = 45) -> dict:
    """Run *cmd*; if it is a failed or empty curl search, try other sources.

    The command is executed once with ``run_raw_command``. Its result is
    returned unchanged when it succeeded with non-empty output, when it
    does not look like a curl search, or when no query can be recovered
    from it. Otherwise the sources from ``_build_search_sources`` are tried
    in order. The first one with real output is returned, combined with at
    most one additional non-empty source.

    Returns:
        A ``run_raw_command``-style dict. Fallback results add
        ``_sources_tried`` and, on success, ``_fallback_used``.
    """
    res = run_raw_command(cmd, timeout=timeout)
    stdout = res["stdout"].strip()
    if res["returncode"] == 0 and not _is_empty_result(stdout):
        return res

    search_hints = (
        "duckduckgo", "google", "bing", "wikipedia", "reddit",
        "hackernews", "hn.algolia", "arxiv", "news", "search",
    )
    if "curl" not in cmd or not any(hint in cmd for hint in search_hints):
        return res

    query = _extract_query_from_cmd(cmd)
    if not query or len(query) < 3:
        # No usable URL parameter: fall back to the first few plain words.
        words = [w for w in cmd.split() if len(w) > 3 and not w.startswith('-')
                 and 'http' not in w and 'python3' not in w and 'curl' not in w]
        query = ' '.join(words[:5]) if words else ""
    if not query:
        return res

    sources = _build_search_sources(query)
    cmd_lower = cmd.lower()
    tried_names = []
    # Sources already run in this call that produced nothing; they are not
    # run a second time when looking for an additional source.
    empty_names = set()
    for source in sources:
        src_name = source["name"]
        # Skip the source the original command was already aimed at.
        if src_name.lower().split()[0] in cmd_lower:
            tried_names.append(f"⏭️ {src_name} (مجرب)")
            continue
        tried_names.append(f"🔍 {src_name}")
        src_out = run_raw_command(source["cmd"], timeout=20)["stdout"].strip()
        if _is_empty_result(src_out):
            empty_names.add(src_name)
            continue

        all_results = [src_out]
        for extra in sources:
            extra_name = extra["name"]
            if extra_name == src_name or extra_name in empty_names:
                continue
            extra_out = run_raw_command(extra["cmd"], timeout=15)["stdout"].strip()
            if not _is_empty_result(extra_out):
                all_results.append(f"\n[مصدر إضافي: {extra_name}]\n{extra_out}")
                break

        combined_header = (
            f"[تعذر الحصول على نتائج من المصدر الأصلي]\n"
            f"[البديل المستخدم: {src_name}]\n"
            f"{'='*50}\n"
        )
        final_out = combined_header + "\n\n".join(all_results)
        return {
            "stdout": final_out[:15000],
            "stderr": "",
            "returncode": 0,
            "_sources_tried": tried_names,
            "_fallback_used": src_name,
        }

    return {
        "stdout": f"[لم تُرجع أي مصادر نتائج لـ: {query}]\nالمصادر التي جربتها: {', '.join(tried_names[:5])}",
        "stderr": res["stderr"],
        "returncode": -1,
        "_sources_tried": tried_names,
    }
def xdo(args: list, timeout=10) -> dict:
    """Run ``xdotool`` with *args* on the configured display.

    Returns:
        ``{"rc", "out", "err"}`` from the finished process. When xdotool
        cannot be started or exceeds *timeout*, ``rc`` is -1 and ``err``
        holds the reason. The error is returned rather than raised so it
        does not propagate into callers that ignore the result.
    """
    try:
        r = subprocess.run(["xdotool"] + args,
                           env={**os.environ, "DISPLAY": DISPLAY},
                           timeout=timeout, capture_output=True, text=True)
    except (OSError, subprocess.TimeoutExpired) as exc:
        print(f"[xdo] {exc}")
        return {"rc": -1, "out": "", "err": str(exc)}
    return {"rc": r.returncode, "out": r.stdout, "err": r.stderr}
async def broadcast(msg: dict):
    """Send *msg* as JSON text to every connected WebSocket.

    Sockets whose send raises are removed from ``active_connections``.
    """
    txt = json.dumps(msg, ensure_ascii=False)
    dead = []
    # Iterate over a snapshot: the shared list can change while a send is
    # awaited, and mutating a list during iteration can skip entries.
    for ws in list(active_connections):
        try:
            await ws.send_text(txt)
        except Exception:
            dead.append(ws)
    for ws in dead:
        if ws in active_connections:
            active_connections.remove(ws)
async def screen_stream_loop():
    """Broadcast screen frames while streaming is on and clients remain.

    Each iteration captures a frame with the current ``stream_scale`` and
    ``stream_quality`` and broadcasts it as a ``"frame"`` message. It then
    sleeps according to the current ``stream_fps``. ``stream_active`` is
    reset to False when the loop ends for any reason.
    """
    global stream_active
    try:
        while stream_active and active_connections:
            try:
                # The capture shells out and can take seconds; a worker
                # thread keeps the event loop free for other connections.
                frame = await asyncio.to_thread(
                    capture_screen, scale=stream_scale, quality=stream_quality)
                if frame:
                    await broadcast({"type": "frame", "data": frame})
            except Exception as exc:
                # Exception, not a bare except, so task cancellation still
                # propagates out of the loop.
                print(f"[stream] {exc}")
            # Recomputed every pass so an fps change from a later
            # "start_stream" applies to a stream that is already running.
            await asyncio.sleep(1.0 / max(1, stream_fps))
    finally:
        stream_active = False
| # ─── Action Handler ────────────────────────────────── | |
async def handle_action(ws: WebSocket, msg: dict):
    """Execute one client message and reply on *ws*.

    ``msg["action"]`` selects the operation and ``msg["data"]`` carries its
    arguments. Handled actions: screenshot, terminal, mouse_move,
    mouse_click, keyboard_type, keyboard_hotkey, keyboard_press,
    clipboard_write, clipboard_read, scroll, open_app, mouse_drag,
    start_stream, stop_stream and screen_info. Any other action gets an
    ``{"type": "error"}`` reply.
    """
    # Declared up front; start_stream/stop_stream rebind these module globals.
    global stream_active, stream_fps, stream_quality, stream_scale

    action = msg.get("action", "")
    data = msg.get("data", {})

    async def send(obj):
        await ws.send_text(json.dumps(obj, ensure_ascii=False))

    async def auto_shot(label="", delay=0.4):
        # Follow-up screenshot taken after *delay* seconds.
        await asyncio.sleep(delay)
        frame = capture_screen(scale=0.7, quality=68)
        if frame:
            await send({"type": "screenshot", "data": frame,
                        "ts": int(time.time()*1000), "auto": True, "label": label})
        return frame

    if action == "screenshot":
        frame = capture_screen(scale=0.8, quality=75)
        await send({"type": "screenshot", "data": frame, "ts": int(time.time()*1000)})

    elif action == "terminal":
        cmd = data.get("cmd", "")
        timeout = int(data.get("timeout", 45))
        if not cmd:
            await send({"type": "terminal_result", "cmd": "", "stdout": "",
                        "stderr": "no command", "returncode": -1})
            return
        # The command (plus search fallbacks) can run for a long time; a
        # worker thread keeps the event loop serving other clients and the
        # screen stream meanwhile.
        res = await asyncio.to_thread(run_command_smart, cmd, timeout=timeout)
        await send({
            "type": "terminal_result",
            "cmd": cmd,
            "stdout": res["stdout"],
            "stderr": res.get("stderr", ""),
            "returncode": res["returncode"],
            "fallback_used": res.get("_fallback_used", None),
            "sources_tried": res.get("_sources_tried", []),
        })
        await auto_shot(f"بعد: {cmd[:45]}", delay=0.3)

    elif action == "mouse_move":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        xdo(["mousemove", str(x), str(y)])
        await send({"type": "ack", "action": "mouse_move"})

    elif action == "mouse_click":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        btn_num = {"left": "1", "middle": "2", "right": "3"}.get(data.get("button", "left"), "1")
        xdo(["mousemove", str(x), str(y)])
        await asyncio.sleep(0.05)
        if data.get("double"):
            xdo(["click", "--repeat", "2", "--delay", "100", btn_num])
        else:
            xdo(["click", btn_num])
        await send({"type": "ack", "action": "mouse_click"})
        await auto_shot("بعد النقر", 0.4)

    elif action == "keyboard_type":
        text = data.get("text", "")
        if text:
            xdo(["type", "--clearmodifiers", "--delay", "30", text])
        # NOTE(review): the flattened source does not show whether the ack
        # was nested under `if text:`; it is sent unconditionally here.
        await send({"type": "ack", "action": "keyboard_type"})
        await auto_shot("بعد الكتابة", 0.3)

    elif action == "keyboard_hotkey":
        keys = data.get("keys", [])
        if keys:
            xdo(["key", "--clearmodifiers", "+".join(keys)])
        await send({"type": "ack", "action": "keyboard_hotkey"})
        await auto_shot("بعد الاختصار", 0.4)

    elif action == "keyboard_press":
        key = data.get("key", "")
        if key:
            xdo(["key", "--clearmodifiers", key])
        await send({"type": "ack", "action": "keyboard_press"})

    elif action == "clipboard_write":
        proc = subprocess.Popen(["xclip", "-selection", "clipboard"],
                                stdin=subprocess.PIPE,
                                env={**os.environ, "DISPLAY": DISPLAY})
        proc.communicate(data.get("text", "").encode())
        await send({"type": "ack", "action": "clipboard_write"})

    elif action == "clipboard_read":
        res = run_raw_command("xclip -selection clipboard -o", timeout=5)
        await send({"type": "clipboard_content", "text": res["stdout"]})

    elif action == "scroll":
        x, y = int(data.get("x", 0)), int(data.get("y", 0))
        clicks = int(data.get("clicks", 3))
        # Positive click counts use button 4, the rest button 5.
        btn = "4" if clicks > 0 else "5"
        xdo(["mousemove", str(x), str(y)])
        for _ in range(abs(clicks)):
            xdo(["click", btn])
        await send({"type": "ack", "action": "scroll"})

    elif action == "open_app":
        app_cmd = data.get("cmd", "")
        if app_cmd:
            subprocess.Popen(app_cmd, shell=True,
                             env={**os.environ, "DISPLAY": DISPLAY},
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            await send({"type": "ack", "action": "open_app", "cmd": app_cmd})
            await auto_shot(f"بعد فتح: {app_cmd}", 3.0)
        else:
            await send({"type": "ack", "action": "open_app"})

    elif action == "mouse_drag":
        x1, y1 = int(data.get("x1", 0)), int(data.get("y1", 0))
        x2, y2 = int(data.get("x2", 0)), int(data.get("y2", 0))
        xdo(["mousemove", str(x1), str(y1)])
        xdo(["mousedown", "1"])
        await asyncio.sleep(0.1)
        xdo(["mousemove", str(x2), str(y2)])
        await asyncio.sleep(0.1)
        xdo(["mouseup", "1"])
        await send({"type": "ack", "action": "mouse_drag"})

    elif action == "start_stream":
        stream_fps = int(data.get("fps", 3))
        stream_quality = int(data.get("quality", 60))
        stream_scale = float(data.get("scale", 0.5))
        if not stream_active:
            stream_active = True
            asyncio.create_task(screen_stream_loop())
        await send({"type": "ack", "action": "start_stream"})

    elif action == "stop_stream":
        stream_active = False
        await send({"type": "ack", "action": "stop_stream"})

    elif action == "screen_info":
        res = run_raw_command("xdotool getdisplaygeometry", timeout=5)
        try:
            parts = res["stdout"].strip().split()
            w, h = int(parts[0]), int(parts[1])
        except (IndexError, ValueError):
            # Geometry unavailable: fall back to a 1920x1080 default.
            w, h = 1920, 1080
        res2 = run_raw_command("xdotool getmouselocation", timeout=5)
        try:
            mx = int(re.search(r"x:(\d+)", res2["stdout"]).group(1))
            my = int(re.search(r"y:(\d+)", res2["stdout"]).group(1))
        except (AttributeError, ValueError):
            mx, my = 0, 0
        await send({"type": "screen_info", "width": w, "height": h,
                    "mouse_x": mx, "mouse_y": my})

    else:
        await send({"type": "error", "msg": f"Unknown action: {action}"})
| # ─── WebSocket ─────────────────────────────────────── | |
async def websocket_endpoint(ws: WebSocket):
    """Serve one WebSocket client until it disconnects.

    After accepting, the client receives a ``"connected"`` message with the
    screen size and, when a capture succeeds, an initial screenshot. Every
    later text message is parsed as JSON and passed to ``handle_action``.
    A message that fails to parse or whose handler raises is answered with
    an ``{"type": "error"}`` message, and the session stays open.

    NOTE(review): no route decorator is visible in this view of the file —
    confirm how this handler is registered on ``app``.
    """
    await ws.accept()
    active_connections.append(ws)
    try:
        # The greeting runs inside the try so a failed send still reaches
        # the cleanup in ``finally``.
        res = run_raw_command("xdotool getdisplaygeometry", timeout=5)
        try:
            parts = res["stdout"].strip().split()
            w, h = int(parts[0]), int(parts[1])
        except (IndexError, ValueError):
            w, h = 1920, 1080
        await ws.send_text(json.dumps({
            "type": "connected", "screen_width": w, "screen_height": h,
            "msg": "Z Computer Mode v5 — Smart Terminal + Multi-Source Search"
        }, ensure_ascii=False))
        frame = capture_screen(scale=0.75, quality=72)
        if frame:
            await ws.send_text(json.dumps({"type": "screenshot", "data": frame,
                                           "ts": int(time.time()*1000),
                                           "label": "الشاشة الأولية"}))
        while True:
            raw = await ws.receive_text()
            try:
                await handle_action(ws, json.loads(raw))
            except WebSocketDisconnect:
                raise
            except Exception as exc:
                # One bad message (invalid JSON, non-numeric coordinate, ...)
                # is reported to the client. If this send fails too, the
                # outer handler ends the session.
                print(f"[ws] {exc}")
                await ws.send_text(json.dumps(
                    {"type": "error", "msg": str(exc)}, ensure_ascii=False))
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"[ws] {e}")
    finally:
        if ws in active_connections:
            active_connections.remove(ws)
| # ─── REST ──────────────────────────────────────────── | |
async def rest_screenshot():
    """Return a screenshot (base64 JPEG) with a millisecond timestamp as JSON.

    NOTE(review): no route decorator is visible in this view of the file —
    confirm how this handler is registered on ``app``.
    """
    encoded_frame = capture_screen(0.75, 70)
    captured_at_ms = int(time.time() * 1000)
    return JSONResponse({"image": encoded_frame, "ts": captured_at_ms})
async def rest_terminal(body: dict):
    """Run ``body["cmd"]`` through ``run_command_smart`` and return the result.

    An empty command is rejected with the same ``"no command"`` result the
    WebSocket ``terminal`` action sends. ``body["timeout"]`` is coerced to
    an integer and falls back to 45 seconds when it is not numeric.

    NOTE(review): no route decorator is visible in this view of the file —
    confirm how this handler is registered on ``app``.
    """
    cmd = body.get("cmd", "")
    if not cmd:
        return JSONResponse({"stdout": "", "stderr": "no command", "returncode": -1})
    try:
        # JSON clients may send the timeout as a string; a non-numeric
        # timeout would otherwise make the subprocess call itself fail.
        timeout = int(body.get("timeout", 45))
    except (TypeError, ValueError):
        timeout = 45
    return JSONResponse(run_command_smart(cmd, timeout))
async def health():
    """Report whether the display and the search pipeline currently work.

    Runs ``xdotool getdisplaygeometry`` and a test DuckDuckGo query through
    ``run_command_smart``; each check passes when its return code is 0.

    NOTE(review): no route decorator is visible in this view of the file —
    confirm how this handler is registered on ``app``.
    """
    display_probe = run_raw_command("xdotool getdisplaygeometry", 4)
    search_probe = run_command_smart(
        "curl -s 'https://api.duckduckgo.com/?q=test&format=json&no_html=1'", 15)
    report = {
        "status": "ok",
        "version": "v5-smart-terminal",
    }
    report["display_working"] = display_probe["returncode"] == 0
    report["search_working"] = search_probe["returncode"] == 0
    report["fallback_used"] = search_probe.get("_fallback_used")
    return report
async def quick_search(query: str):
    """Search *query* via DuckDuckGo, with automatic fallback sources.

    The query is URL-encoded into a DuckDuckGo Instant Answer curl command,
    which ``run_command_smart`` runs with a 30-second timeout.

    NOTE(review): no route decorator is visible in this view of the file —
    confirm how this handler is registered on ``app``.
    """
    encoded_query = urllib.parse.quote_plus(query)
    search_cmd = (
        f"curl -s 'https://api.duckduckgo.com/?q={encoded_query}&format=json&no_html=1'"
    )
    outcome = run_command_smart(search_cmd, timeout=30)
    return JSONResponse(outcome)
if __name__ == "__main__":
    # Serve on all interfaces; the port comes from $PORT (default 7860).
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 7860)),
        log_level="info",
    )