| import os |
|
|
| |
# Remove every proxy-related environment variable (any name containing
# "proxy", case-insensitively) before the HTTP libraries are imported below.
# Iterate over a snapshot because os.environ is mutated inside the loop.
for key in tuple(os.environ):
    if 'proxy' not in key.lower():
        continue
    del os.environ[key]
|
|
| import subprocess |
| import httpx |
| import json |
| import re |
| import asyncio |
| import sqlite3 |
| import bcrypt |
| import secrets |
| import traceback |
| import urllib.request |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
| from fastapi.responses import HTMLResponse |
| from fastapi.templating import Jinja2Templates |
| from starlette.requests import Request |
| from pydantic import BaseModel |
|
|
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()
# Jinja2 template loader rooted at the local "templates" directory.
templates = Jinja2Templates(directory="templates")
|
|
# OpenRouter credential, read once at import time; empty string when unset.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# Remote endpoints. The scheme is kept as a separate literal, as in the
# original source; the resulting values are ordinary https URLs.
API_URL = (
    "https://"
    "openrouter.ai/api/v1/chat/completions"
)
REFERER_URL = (
    "https://"
    "huggingface.co/"
)

# Local persistence: SQLite database file and the root of per-user folders.
DB_FILE = "openclaw.db"
USERS_DIR = "user_spaces"
|
|
def init_db():
    """Create the ``users`` table and the per-user workspace root if missing.

    The table has four TEXT columns: ``username`` (primary key),
    ``password``, ``token`` and ``settings``. The function is idempotent:
    the table uses ``CREATE TABLE IF NOT EXISTS`` and the directory is
    created with ``exist_ok=True``.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS users
               (username TEXT PRIMARY KEY, password TEXT, token TEXT, settings TEXT)'''
        )
        conn.commit()
    finally:
        # Close even when the DDL fails so the handle is never leaked.
        conn.close()
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(USERS_DIR, exist_ok=True)


# Run at import time so the schema exists before any request is served.
init_db()
|
|
def get_user_dir(token: str):
    """Resolve a session token to the absolute path of its owner's workspace.

    Args:
        token: Session token issued by ``/api/register`` or ``/api/login``.

    Returns:
        The absolute path ``<USERS_DIR>/<username>`` for the user owning the
        token, or ``None`` when the token is missing, is not a string,
        matches no user, or the stored username would resolve to a location
        outside ``USERS_DIR``.
    """
    # Callers pass values straight from request JSON; reject anything that
    # sqlite3 could not bind as a text parameter instead of raising.
    if not isinstance(token, str) or not token:
        return None

    conn = sqlite3.connect(DB_FILE)
    try:
        row = conn.execute(
            "SELECT username FROM users WHERE token=?", (token,)
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None

    root = os.path.abspath(USERS_DIR)
    user_dir = os.path.abspath(os.path.join(root, row[0]))
    # A username such as ".." or "a/../.." would otherwise hand out a
    # directory outside the workspace root (e.g. the application folder).
    if not user_dir.startswith(root + os.sep):
        return None
    return user_dir
|
|
| |
class UserAuth(BaseModel):
    """Request body shared by the ``/api/register`` and ``/api/login`` routes."""

    # Account name; also used as the name of the user's workspace directory.
    username: str
    # Password as sent by the client; it is bcrypt-hashed before storage.
    password: str
|
|
@app.post("/api/register")
async def register(user: UserAuth):
    """Create a new account, its session token and its workspace directory.

    Args:
        user: Desired username and password.

    Returns:
        ``{"success": True, "token", "username", "settings"}`` on success, or
        ``{"error": ...}`` when the username is invalid or already taken.
    """
    name = user.username
    # The username becomes a directory name under USERS_DIR, so it must be a
    # single plain path component; otherwise "../x" would escape the root.
    if (
        not name
        or name in (".", "..")
        or "/" in name
        or "\\" in name
        or "\x00" in name
    ):
        return {"error": "Invalid username"}

    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()
        c.execute("SELECT username FROM users WHERE username=?", (name,))
        if c.fetchone():
            return {"error": "Username already exists"}

        hashed_pw = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
        token = secrets.token_hex(16)
        default_settings = json.dumps({"theme": "#00ffcc", "bg": "#05050f", "font": "'Fira Code', monospace"})

        try:
            c.execute("INSERT INTO users VALUES (?, ?, ?, ?)", (name, hashed_pw, token, default_settings))
            conn.commit()
        except sqlite3.IntegrityError:
            # Another request inserted the same username between the SELECT
            # and the INSERT; report it like the ordinary duplicate case.
            return {"error": "Username already exists"}
    finally:
        # Runs on every exit path, including the early "already exists" return.
        conn.close()

    user_path = os.path.join(USERS_DIR, name)
    os.makedirs(user_path, exist_ok=True)
    return {"success": True, "token": token, "username": name, "settings": default_settings}
|
|
@app.post("/api/login")
async def login(user: UserAuth):
    """Check a username/password pair against the stored bcrypt hash.

    Returns the user's existing token and settings on success, otherwise
    ``{"error": "Invalid credentials"}`` (same reply for an unknown user and
    for a wrong password).
    """
    db = sqlite3.connect(DB_FILE)
    cursor = db.cursor()
    cursor.execute("SELECT password, token, settings FROM users WHERE username=?", (user.username,))
    record = cursor.fetchone()
    db.close()

    if not record:
        return {"error": "Invalid credentials"}

    stored_hash, session_token, stored_settings = record
    if not bcrypt.checkpw(user.password.encode('utf-8'), stored_hash):
        return {"error": "Invalid credentials"}

    return {"success": True, "token": session_token, "username": user.username, "settings": stored_settings}
|
|
@app.post("/api/settings")
async def update_settings(data: dict):
    """Persist the UI settings of the user identified by ``data["token"]``.

    Args:
        data: JSON body with ``token`` (session token) and ``settings``
            (any JSON-serialisable value; defaults to ``{}``).

    Returns:
        ``{"success": True}`` when a user row was updated, or
        ``{"error": "Unauthorized"}`` when the token is missing, is not a
        string, or matches no user.
    """
    token = data.get("token")
    if not isinstance(token, str) or not token:
        return {"error": "Unauthorized"}

    settings_json = json.dumps(data.get("settings", {}))
    conn = sqlite3.connect(DB_FILE)
    try:
        cur = conn.execute("UPDATE users SET settings=? WHERE token=?", (settings_json, token))
        conn.commit()
        updated = cur.rowcount
    finally:
        conn.close()

    # rowcount == 0 means the token matched nobody; do not report success.
    if updated == 0:
        return {"error": "Unauthorized"}
    return {"success": True}
|
|
| |
class FileReq(BaseModel):
    """Request body for the ``/api/file/read``, ``/save`` and ``/rename`` routes."""

    # Session token used to look up the caller's workspace directory.
    token: str
    # Path of the target file, joined onto the caller's workspace directory.
    filename: str = ""
    # Text to write; read by the save route.
    content: str = ""
    # Destination path; read by the rename route.
    new_name: str = ""
|
|
@app.post("/api/files")
async def list_files(data: dict):
    """List every file in the caller's workspace, recursively.

    Paths are relative to the workspace root; files in sub-directories are
    reported as ``"<relative dir>/<name>"``. Returns
    ``{"error": "Unauthorized"}`` when the token resolves to no workspace.
    """
    workspace = get_user_dir(data.get("token"))
    if not workspace:
        return {"error": "Unauthorized"}

    listing = []
    for folder, _subdirs, names in os.walk(workspace):
        prefix = os.path.relpath(folder, workspace)
        if prefix == ".":
            # Files directly in the workspace root carry no directory prefix.
            listing.extend(names)
        else:
            listing.extend(f"{prefix}/{name}" for name in names)
    return {"files": listing}
|
|
@app.post("/api/file/read")
async def read_file(data: FileReq):
    """Return the text content of a file inside the caller's workspace.

    Returns:
        ``{"content": <text>}`` on success; ``{"error": "Unauthorized"}`` for
        an unknown token; ``{"error": "Access denied"}`` when the path leaves
        the workspace; ``{"error": <message>}`` when reading fails.
    """
    user_dir = get_user_dir(data.token)
    # Without this guard os.path.join(None, ...) raises for a bad token.
    if not user_dir:
        return {"error": "Unauthorized"}

    filepath = os.path.abspath(os.path.join(user_dir, data.filename))
    # Compare against "<user_dir><sep>" so that a sibling such as
    # ".../bobby" is not accepted for the workspace ".../bob".
    if filepath != user_dir and not filepath.startswith(user_dir + os.sep):
        return {"error": "Access denied"}
    try:
        with open(filepath, "r") as f:
            return {"content": f.read()}
    except Exception as e:
        return {"error": str(e)}
|
|
@app.post("/api/file/save")
async def save_file(data: FileReq):
    """Write ``data.content`` to a file inside the caller's workspace.

    Missing parent directories are created. Returns ``{"success": True}`` on
    success; ``{"error": "Unauthorized"}`` for an unknown token;
    ``{"error": "Access denied"}`` when the path leaves the workspace;
    ``{"error": <message>}`` when the directory or file cannot be written.
    """
    user_dir = get_user_dir(data.token)
    # Without this guard os.path.join(None, ...) raises for a bad token.
    if not user_dir:
        return {"error": "Unauthorized"}

    filepath = os.path.abspath(os.path.join(user_dir, data.filename))
    # Compare against "<user_dir><sep>" so that a sibling such as
    # ".../bobby" is not accepted for the workspace ".../bob".
    if filepath != user_dir and not filepath.startswith(user_dir + os.sep):
        return {"error": "Access denied"}
    try:
        # Inside the try block so a failing makedirs (e.g. a path component
        # that is an existing file) is reported instead of raising.
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, "w") as f:
            f.write(data.content)
        return {"success": True}
    except Exception as e:
        return {"error": str(e)}
|
|
@app.post("/api/file/rename")
async def rename_file(data: FileReq):
    """Rename ``data.filename`` to ``data.new_name`` within the workspace.

    Returns ``{"success": True}`` on success; ``{"error": "Unauthorized"}``
    for an unknown token; ``{"error": "Access denied"}`` when either path is
    not strictly inside the workspace; ``{"error": <message>}`` when the
    rename itself fails (for example, the source does not exist).
    """
    user_dir = get_user_dir(data.token)
    # Without this guard os.path.join(None, ...) raises for a bad token.
    if not user_dir:
        return {"error": "Unauthorized"}

    old_path = os.path.abspath(os.path.join(user_dir, data.filename))
    new_path = os.path.abspath(os.path.join(user_dir, data.new_name))
    # Both ends must lie strictly below the workspace root. The trailing
    # separator stops ".../bobby" matching the workspace ".../bob", and an
    # empty name (which resolves to the root itself) is rejected too.
    inside_prefix = user_dir + os.sep
    if not old_path.startswith(inside_prefix) or not new_path.startswith(inside_prefix):
        return {"error": "Access denied"}
    try:
        os.rename(old_path, new_path)
    except OSError as e:
        return {"error": str(e)}
    return {"success": True}
|
|
@app.post("/api/ai_edit")
async def ai_edit(data: dict):
    """Ask the model to rewrite ``data["content"]`` according to ``data["prompt"]``.

    The request is sent with httpx first; if that raises, it is retried
    through the ``curl`` binary.

    Returns:
        ``{"code": <text>}``. On failure the text is a ``#``-prefixed
        diagnostic line (``# API_ERROR``, ``# CURL_NETWORK_ERROR`` or
        ``# FATAL_NETWORK_ERROR``) instead of code.
    """
    # Same precondition as ask_openrouter(): fail fast without a key.
    if not OPENROUTER_API_KEY:
        return {"code": "# API_ERROR: OPENROUTER_API_KEY is not set in Spaces Secrets."}

    prompt, content = data.get("prompt"), data.get("content")
    messages = [
        {"role": "system", "content": "You are an expert coder. Rewrite the provided code based on the user's request. Output ONLY the raw updated code. Do not use markdown blocks like ```python. No conversational text."},
        {"role": "user", "content": f"Current Code:\n{content}\n\nRequest: {prompt}"}
    ]
    payload = {"model": "openrouter/auto", "messages": messages}

    try:
        headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": REFERER_URL}
        async with httpx.AsyncClient(trust_env=False) as client:
            res = await client.post(API_URL, headers=headers, json=payload, timeout=60.0)
        if res.status_code != 200:
            return {"code": f"# API_ERROR: {res.text}"}
        new_code = res.json()['choices'][0]['message']['content']
    except Exception as httpx_err:
        try:
            cmd = [
                "curl", "--noproxy", "*", "-s", "-X", "POST", API_URL,
                "-H", f"Authorization: Bearer {OPENROUTER_API_KEY}",
                "-H", "Content-Type: application/json",
                "-H", f"HTTP-Referer: {REFERER_URL}",
                "-d", json.dumps(payload)
            ]
            # subprocess.run blocks for up to 60 s; run it on a worker thread
            # so other requests and WebSocket sessions keep being served.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=60),
            )
            if result.returncode != 0:
                return {"code": f"# CURL_NETWORK_ERROR: {result.stderr}\n# HTTPX_ERROR: {str(httpx_err)}"}
            res_json = json.loads(result.stdout)
            # curl -s exits 0 on HTTP errors, so inspect the body explicitly.
            if "error" in res_json:
                return {"code": f"# API_ERROR: {json.dumps(res_json['error'])}"}
            new_code = res_json['choices'][0]['message']['content']
        except Exception as curl_err:
            return {"code": f"# FATAL_NETWORK_ERROR. HTTPX: {str(httpx_err)} | CURL: {str(curl_err)}"}

    # The API may return null content; treat it as empty text. Strip before
    # removing fences so leading whitespace does not defeat the "^" anchor.
    new_code = (new_code or "").strip()
    new_code = re.sub(r"^```[\w+#.-]*\n", "", new_code)
    new_code = re.sub(r"\n```$", "", new_code)
    return {"code": new_code.strip()}
|
|
@app.get("/", response_class=HTMLResponse)
async def get(request: Request):
    """Render the ``index.html`` template for the site root."""
    return templates.TemplateResponse("index.html", {"request": request})
|
|
| |
# System message placed first in every WebSocket session's chat history.
# It tells the model to wrap shell commands in <EXEC>...</EXEC>, which is the
# exact pattern the WebSocket handler searches for in each reply.
SYSTEM_PROMPT = """You are OpenClaw, an AI terminal assistant.
You are operating within a user's private Linux directory.
To execute a bash command, output it wrapped EXACTLY in <EXEC> and </EXEC> tags.
Wait for the system to provide the output before taking further action. Keep responses concise."""
|
|
async def ask_openrouter(messages) -> str:
    """Send a chat history to OpenRouter and return the assistant's reply text.

    The request is sent with httpx first; if that raises, it is retried
    through the ``curl`` binary.

    Args:
        messages: Chat messages (``{"role", "content"}`` dicts) forwarded
            unchanged as the request's ``messages`` field.

    Returns:
        The reply text, or a diagnostic string beginning with ``API_ERROR:``,
        ``CURL_NETWORK_ERROR:`` or ``FATAL_NETWORK_ERROR`` on failure. This
        function reports failures through the return value, not by raising.
    """
    if not OPENROUTER_API_KEY:
        return "API_ERROR: OPENROUTER_API_KEY is not set in Spaces Secrets."

    data = {"model": "openrouter/auto", "messages": messages}

    try:
        headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": REFERER_URL}
        async with httpx.AsyncClient(trust_env=False) as client:
            response = await client.post(API_URL, headers=headers, json=data, timeout=45.0)
        if response.status_code != 200:
            return f"API_ERROR: {response.text}"
        content = response.json()['choices'][0]['message']['content']
        # The API may return null content; honour the declared str return.
        return content if isinstance(content, str) else ""

    except Exception as e:
        httpx_error = str(e)
        try:
            cmd = [
                "curl", "--noproxy", "*", "-s", "-X", "POST", API_URL,
                "-H", f"Authorization: Bearer {OPENROUTER_API_KEY}",
                "-H", "Content-Type: application/json",
                "-H", f"HTTP-Referer: {REFERER_URL}",
                "-d", json.dumps(data)
            ]
            # subprocess.run blocks for up to 45 s; run it on a worker thread
            # so the event loop keeps serving other connections meanwhile.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=45),
            )

            if result.returncode != 0:
                return f"CURL_NETWORK_ERROR: {result.stderr}\nHTTPX_ERROR: {httpx_error}"

            res_json = json.loads(result.stdout)
            # curl -s exits 0 on HTTP errors, so inspect the body explicitly.
            if "error" in res_json:
                return f"API_ERROR: {json.dumps(res_json['error'])}"

            content = res_json['choices'][0]['message']['content']
            return content if isinstance(content, str) else ""

        except Exception as curl_e:
            return f"FATAL_NETWORK_ERROR\nHTTPX Error: {httpx_error}\nCURL Error: {str(curl_e)}"
|
|
@app.websocket("/ws/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
    """Interactive terminal session over a WebSocket.

    Each incoming text frame is a JSON object with a ``command`` string:

    * ``debug net`` - report proxy variables and test connectivity.
    * ``ai <prompt>`` - ask the model; any ``<EXEC>cmd</EXEC>`` in a reply is
      run and its output fed back, for at most five rounds.
    * ``clear`` - tell the client to clear its screen.
    * anything else - run as a shell command in the current directory.

    Outgoing frames are JSON objects with a ``type`` field (``system``,
    ``output``, ``ai``, ``ai_status``, ``error`` or ``clear``).

    NOTE(review): commands run through ``shell=True`` without a sandbox. The
    ``cd`` confinement below only limits the tracked working directory, not
    what a shell command itself can reach.

    Args:
        websocket: The client connection.
        token: Session token taken from the URL path.
    """
    await websocket.accept()
    user_dir = get_user_dir(token)
    if not user_dir:
        await websocket.send_text(json.dumps({"type": "error", "content": "Authentication failed."}))
        await websocket.close()
        return

    current_dir = user_dir
    session_history = [{"role": "system", "content": SYSTEM_PROMPT}]
    loop = asyncio.get_running_loop()

    # Exact prefixes of the diagnostic strings ask_openrouter() returns.
    # A substring test for "ERROR:" misses "FATAL_NETWORK_ERROR\n..." and
    # misfires on ordinary replies that merely mention "ERROR:".
    error_prefixes = ("API_ERROR:", "CURL_NETWORK_ERROR:", "FATAL_NETWORK_ERROR")

    # Defense in depth: keep the API key out of the child shell's environment
    # so a plain `env` or `echo $OPENROUTER_API_KEY` does not print it.
    child_env = {k: v for k, v in os.environ.items() if k != "OPENROUTER_API_KEY"}

    await websocket.send_text(json.dumps({"type": "system", "content": "Secure environment established. Type 'debug net' to run network diagnostics."}))

    def run_cmd(cmd):
        """Run one shell command (or handle ``cd``) and return its text output."""
        nonlocal current_dir
        if cmd.startswith("cd "):
            target = cmd[3:].strip()
            new_dir = os.path.abspath(os.path.join(current_dir, target))
            # Confine to this user's own directory. The trailing separator
            # stops ".../bobby" matching the workspace ".../bob".
            if new_dir != user_dir and not new_dir.startswith(user_dir + os.sep):
                return "Permission denied."
            # A nonexistent cwd would make every later command fail.
            if not os.path.isdir(new_dir):
                return f"cd: no such directory: {target}"
            current_dir = new_dir
            return f"Directory changed to {current_dir}"
        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True,
                cwd=current_dir, timeout=25, env=child_env,
            )
            out = result.stdout + result.stderr
            return out if out else "[Executed successfully with no output]"
        except Exception as e:
            return f"[Execution Error: {str(e)}]"

    try:
        while True:
            data = await websocket.receive_text()
            # A malformed frame must not tear down the whole session.
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                payload = None
            if not isinstance(payload, dict):
                await websocket.send_text(json.dumps({"type": "error", "content": "Malformed message: expected a JSON object."}))
                continue

            command = payload.get("command", "")
            if not isinstance(command, str):
                continue
            command = command.strip()
            if not command:
                continue

            if command.lower() == "debug net":
                debug_info = "=== NETWORK DIAGNOSTICS ===\n\n1. OS Environment Variables (Proxy):\n"
                for k, v in os.environ.items():
                    if 'proxy' in k.lower():
                        debug_info += f"   {k} = {v}\n"

                debug_info += "\n2. Python internal getproxies():\n"
                debug_info += f"   {urllib.request.getproxies()}\n"

                debug_info += "\n3. Testing httpx to OpenRouter...\n"
                try:
                    # Async client so the probe does not block the event loop.
                    async with httpx.AsyncClient(trust_env=False) as c:
                        r = await c.get("https://" + "openrouter.ai/")
                    debug_info += f"   HTTPX Success! Status: {r.status_code}\n"
                except Exception as e:
                    debug_info += f"   HTTPX FAILED: {str(e)}\n\nTRACEBACK:\n{traceback.format_exc()}"

                await websocket.send_text(json.dumps({"type": "output", "content": debug_info}))
                continue

            if command.lower().startswith("ai "):
                prompt = command[3:].strip()
                await websocket.send_text(json.dumps({"type": "ai_status", "status": "thinking"}))
                session_history.append({"role": "user", "content": prompt})
                # Keep the system prompt plus the 20 most recent messages.
                if len(session_history) > 21:
                    session_history = [session_history[0]] + session_history[-20:]

                # Cap the think/execute cycle at five rounds per prompt.
                for _ in range(5):
                    ai_response = await ask_openrouter(session_history) or ""
                    if ai_response.startswith(error_prefixes):
                        await websocket.send_text(json.dumps({"type": "error", "content": ai_response}))
                        # Drop the message whose request just failed.
                        session_history.pop()
                        break

                    match = re.search(r'<EXEC>(.*?)</EXEC>', ai_response, re.DOTALL)
                    if match:
                        exec_cmd = match.group(1).strip()
                        text_before = ai_response[:match.start()].strip()

                        full_ai_message = ""
                        if text_before:
                            await websocket.send_text(json.dumps({"type": "ai", "content": text_before}))
                            full_ai_message += text_before + "\n"

                        await websocket.send_text(json.dumps({"type": "system", "content": f"⚙️ AI is running: {exec_cmd}"}))
                        await websocket.send_text(json.dumps({"type": "ai_status", "status": "executing..."}))

                        full_ai_message += f"<EXEC>{exec_cmd}</EXEC>"
                        session_history.append({"role": "assistant", "content": full_ai_message})

                        # The command may block for up to 25 s; run it on a
                        # worker thread to keep other connections responsive.
                        output = await loop.run_in_executor(None, run_cmd, exec_cmd)

                        await websocket.send_text(json.dumps({"type": "output", "content": output}))

                        session_history.append({"role": "user", "content": f"Command Output:\n{output}"})
                        await asyncio.sleep(1)
                    else:
                        await websocket.send_text(json.dumps({"type": "ai", "content": ai_response}))
                        session_history.append({"role": "assistant", "content": ai_response})
                        break
                await websocket.send_text(json.dumps({"type": "ai_status", "status": "idle"}))
            else:
                if command.lower() == "clear":
                    await websocket.send_text(json.dumps({"type": "clear", "content": ""}))
                else:
                    output = await loop.run_in_executor(None, run_cmd, command)
                    await websocket.send_text(json.dumps({"type": "output", "content": output}))
    except WebSocketDisconnect:
        # Client went away; nothing to clean up beyond leaving the loop.
        pass
| |