# openclaw / app.py
# akborana4's picture
# Update app.py
# b89ae19 verified
import os
# Remove every environment variable whose name contains "proxy" (in any
# letter case). This runs before the HTTP libraries below are imported --
# presumably so they can never pick up a proxy configuration from the host.
for _proxy_var in [name for name in os.environ if 'proxy' in name.lower()]:
    os.environ.pop(_proxy_var)
import subprocess
import httpx
import json
import re
import asyncio
import sqlite3
import bcrypt
import secrets
import traceback
import urllib.request
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from starlette.requests import Request
from pydantic import BaseModel
# ASGI application object; every route below is registered on it.
app = FastAPI()
# Jinja2 loader rooted at the "templates" directory (used by the index route).
templates = Jinja2Templates(directory="templates")
# OpenRouter credential read from the environment; empty string when unset.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
# The URLs are assembled from two string pieces on purpose: the scheme and the
# host are never adjacent in the source text, so tools that auto-link URLs in
# pasted text cannot rewrite the literal. Do not "simplify" the concatenation.
API_URL = "https://" + "openrouter.ai/api/v1/chat/completions"
REFERER_URL = "https://" + "huggingface.co/"
# --- DATABASE SETUP ---
# SQLite database file (relative path) holding the `users` table.
DB_FILE = "openclaw.db"
# Root directory that contains one sub-directory per registered username.
USERS_DIR = "user_spaces"
def init_db():
    """Create the ``users`` table and the workspace root if they are missing.

    Idempotent: the table is created with ``IF NOT EXISTS`` and the directory
    with ``exist_ok=True``, so the function can be called on every start-up.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS users
            (username TEXT PRIMARY KEY, password TEXT, token TEXT, settings TEXT)''')
        conn.commit()
    finally:
        # Close even when the DDL fails so the file handle is not leaked.
        conn.close()
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(USERS_DIR, exist_ok=True)


# Run once at import time so the routes can assume the schema exists.
init_db()
def get_user_dir(token: str):
    """Resolve a session token to the absolute path of its owner's workspace.

    Args:
        token: Token value supplied by the client. Callers pass raw JSON
            values, so it may be ``None`` or a non-string.

    Returns:
        The absolute path ``<USERS_DIR>/<username>`` for the user owning the
        token, or ``None`` when the token is missing, not a string, or unknown.
    """
    # Reject values that can never match a stored token before touching the
    # database; this also keeps unbindable JSON types (dict, list) away from
    # sqlite3 parameter binding.
    if not isinstance(token, str) or not token:
        return None
    conn = sqlite3.connect(DB_FILE)
    try:
        row = conn.execute(
            "SELECT username FROM users WHERE token=?", (token,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return None
    return os.path.abspath(os.path.join(USERS_DIR, row[0]))
# --- AUTH & SETTINGS ROUTES ---
class UserAuth(BaseModel):
    """Request body shared by the /api/register and /api/login endpoints."""
    # Account name; primary key of the users table and name of the user's
    # workspace directory under USERS_DIR.
    username: str
    # Password exactly as sent by the client; it is bcrypt-hashed before
    # storage and checked with bcrypt on login.
    password: str
@app.post("/api/register")
async def register(user: UserAuth):
    """Create a new account, its session token and its workspace directory.

    Args:
        user: Requested username and plain-text password.

    Returns:
        ``{"success": True, "token", "username", "settings"}`` on success
        (``settings`` is the JSON-encoded default theme), or
        ``{"error": ...}`` when the username is unusable or already taken.
    """
    username = user.username
    # The username becomes a directory name under USERS_DIR. Refuse names
    # that would resolve to the root itself or escape it ("", ".", "..",
    # anything containing a path separator or NUL).
    if (
        not username
        or username in (".", "..")
        or any(ch in username for ch in ("/", "\\", "\0"))
    ):
        return {"error": "Invalid username"}

    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()
        c.execute("SELECT username FROM users WHERE username=?", (username,))
        if c.fetchone():
            return {"error": "Username already exists"}
        hashed_pw = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
        token = secrets.token_hex(16)
        default_settings = json.dumps({"theme": "#00ffcc", "bg": "#05050f", "font": "'Fira Code', monospace"})
        try:
            c.execute("INSERT INTO users VALUES (?, ?, ?, ?)", (username, hashed_pw, token, default_settings))
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent registration won the race between SELECT and INSERT.
            return {"error": "Username already exists"}
    finally:
        # Runs on every exit path, including the early returns above.
        conn.close()

    os.makedirs(os.path.join(USERS_DIR, username), exist_ok=True)
    return {"success": True, "token": token, "username": username, "settings": default_settings}
@app.post("/api/login")
async def login(user: UserAuth):
    """Verify a username/password pair and return the stored session token.

    Args:
        user: Username and plain-text password to check.

    Returns:
        ``{"success": True, "token", "username", "settings"}`` when the
        password matches the stored bcrypt hash, otherwise
        ``{"error": "Invalid credentials"}``. Unknown users and wrong
        passwords produce the same error.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        row = conn.execute(
            "SELECT password, token, settings FROM users WHERE username=?",
            (user.username,),
        ).fetchone()
    finally:
        conn.close()

    if row:
        stored_hash, token, settings = row
        # register() stores the hash as bytes; tolerate rows where it came
        # back as text instead of failing inside bcrypt.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode('utf-8')
        try:
            password_ok = bcrypt.checkpw(user.password.encode('utf-8'), stored_hash)
        except (TypeError, ValueError):
            # Malformed stored hash: treat as a failed login, not a 500.
            password_ok = False
        if password_ok:
            return {"success": True, "token": token, "username": user.username, "settings": settings}
    return {"error": "Invalid credentials"}
@app.post("/api/settings")
async def update_settings(data: dict):
    """Persist the caller's UI settings as JSON on their user row.

    Args:
        data: JSON body with ``token`` (session token) and ``settings``
            (any JSON value; defaults to ``{}`` when absent).

    Returns:
        ``{"success": True}`` when a row was updated, or
        ``{"error": "Unauthorized"}`` when the token is missing, not a string,
        or matches no user.
    """
    token = data.get("token")
    if not isinstance(token, str) or not token:
        return {"error": "Unauthorized"}
    settings_json = json.dumps(data.get("settings", {}))

    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.execute(
            "UPDATE users SET settings=? WHERE token=?", (settings_json, token)
        )
        conn.commit()
        updated_rows = cursor.rowcount
    finally:
        conn.close()

    # Zero affected rows means the token belongs to nobody; do not report
    # success for a write that never happened.
    if not updated_rows:
        return {"error": "Unauthorized"}
    return {"success": True}
# --- EDITOR ROUTES ---
class FileReq(BaseModel):
    """Request body shared by the /api/file/read, /save and /rename endpoints."""
    # Session token identifying the user whose workspace is accessed.
    token: str
    # Path of the target file, joined onto the user's workspace directory.
    filename: str = ""
    # Text to write; only read by the save endpoint.
    content: str = ""
    # Destination path; only read by the rename endpoint.
    new_name: str = ""
@app.post("/api/files")
async def list_files(data: dict):
    """List every file in the caller's workspace, recursively.

    Args:
        data: JSON body carrying the session ``token``.

    Returns:
        ``{"files": [...]}`` with paths relative to the workspace root
        (sub-directories joined with "/"), or ``{"error": "Unauthorized"}``
        when the token does not resolve to a user.
    """
    workspace = get_user_dir(data.get("token"))
    if not workspace:
        return {"error": "Unauthorized"}

    listing = []
    for folder, _subdirs, names in os.walk(workspace):
        if not names:
            continue
        prefix = os.path.relpath(folder, workspace)
        if prefix == ".":
            # Files directly in the workspace root carry no directory prefix.
            listing.extend(names)
        else:
            listing.extend(f"{prefix}/{name}" for name in names)
    return {"files": listing}
@app.post("/api/file/read")
async def read_file(data: FileReq):
    """Return the text content of a file inside the caller's workspace.

    Args:
        data: Request with ``token`` and ``filename`` (relative to the
            workspace root).

    Returns:
        ``{"content": <text>}`` on success; ``{"error": "Unauthorized"}`` for
        an unknown token; ``{"error": "Access denied"}`` when the resolved
        path leaves the workspace; ``{"error": <message>}`` when the file
        cannot be opened or decoded.
    """
    user_dir = get_user_dir(data.token)
    if not user_dir:
        # Without this guard os.path.join(None, ...) raises TypeError.
        return {"error": "Unauthorized"}
    filepath = os.path.abspath(os.path.join(user_dir, data.filename))
    # Compare against the directory *plus separator*: a bare prefix test
    # would let user "bob" reach the sibling workspace "bobby".
    if filepath != user_dir and not filepath.startswith(user_dir + os.sep):
        return {"error": "Access denied"}
    try:
        with open(filepath, "r") as f:
            return {"content": f.read()}
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError and embedded NUL bytes.
        return {"error": str(e)}
@app.post("/api/file/save")
async def save_file(data: FileReq):
    """Write ``content`` to a file inside the caller's workspace.

    Missing parent directories are created. An existing file is overwritten.

    Args:
        data: Request with ``token``, ``filename`` (relative to the workspace
            root) and ``content``.

    Returns:
        ``{"success": True}`` on success; ``{"error": "Unauthorized"}`` for an
        unknown token; ``{"error": "Access denied"}`` when the resolved path
        leaves the workspace; ``{"error": <message>}`` on I/O failure.
    """
    user_dir = get_user_dir(data.token)
    if not user_dir:
        # Without this guard os.path.join(None, ...) raises TypeError.
        return {"error": "Unauthorized"}
    filepath = os.path.abspath(os.path.join(user_dir, data.filename))
    # Compare against the directory *plus separator*: a bare prefix test
    # would let user "bob" write into the sibling workspace "bobby".
    if filepath != user_dir and not filepath.startswith(user_dir + os.sep):
        return {"error": "Access denied"}
    try:
        # Directory creation can fail too (e.g. a file sits where a directory
        # is needed), so it belongs inside the guarded section.
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, "w") as f:
            f.write(data.content)
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeEncodeError and embedded NUL bytes.
        return {"error": str(e)}
    return {"success": True}
@app.post("/api/file/rename")
async def rename_file(data: FileReq):
    """Rename or move a file within the caller's workspace.

    Args:
        data: Request with ``token``, ``filename`` (current path) and
            ``new_name`` (destination path), both relative to the workspace.

    Returns:
        ``{"success": True}`` on success; ``{"error": "Unauthorized"}`` for an
        unknown token; ``{"error": "Access denied"}`` when either path is not
        strictly inside the workspace; ``{"error": <message>}`` when the
        rename itself fails.
    """
    user_dir = get_user_dir(data.token)
    if not user_dir:
        # Without this guard os.path.join(None, ...) raises TypeError.
        return {"error": "Unauthorized"}
    old_path = os.path.abspath(os.path.join(user_dir, data.filename))
    new_path = os.path.abspath(os.path.join(user_dir, data.new_name))
    # Both ends must lie strictly inside the workspace. The trailing separator
    # stops "bob" from matching "bobby", and also rejects the workspace root
    # itself (an empty filename/new_name).
    workspace_prefix = user_dir + os.sep
    if not old_path.startswith(workspace_prefix) or not new_path.startswith(workspace_prefix):
        return {"error": "Access denied"}
    try:
        os.rename(old_path, new_path)
    except (OSError, ValueError) as e:
        # Missing source, missing destination directory, NUL bytes, ...
        return {"error": str(e)}
    return {"success": True}
@app.post("/api/ai_edit")
async def ai_edit(data: dict):
    """Ask the model to rewrite a piece of code and return the new code.

    The request goes through httpx first; if that raises for any reason the
    same payload is retried through the ``curl`` binary.

    Args:
        data: JSON body with ``prompt`` (the edit request) and ``content``
            (the current code).

    Returns:
        ``{"code": <text>}``. On failure the text is a ``#``-prefixed
        diagnostic (``# API_ERROR``, ``# CURL_NETWORK_ERROR``,
        ``# FATAL_NETWORK_ERROR``) rather than code.
    """
    prompt, content = data.get("prompt"), data.get("content")
    messages = [
        {"role": "system", "content": "You are an expert coder. Rewrite the provided code based on the user's request. Output ONLY the raw updated code. Do not use markdown blocks like ```python. No conversational text."},
        {"role": "user", "content": f"Current Code:\n{content}\n\nRequest: {prompt}"}
    ]
    payload = {"model": "openrouter/auto", "messages": messages}
    try:
        headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": REFERER_URL}
        async with httpx.AsyncClient(trust_env=False) as client:
            res = await client.post(API_URL, headers=headers, json=payload, timeout=60.0)
        if res.status_code != 200:
            return {"code": f"# API_ERROR: {res.text}"}
        new_code = res.json()['choices'][0]['message']['content']
    except Exception as httpx_err:
        try:
            # NOTE(review): the API key is passed on curl's command line.
            cmd = [
                "curl", "--noproxy", "*", "-s", "-X", "POST", API_URL,
                "-H", f"Authorization: Bearer {OPENROUTER_API_KEY}",
                "-H", "Content-Type: application/json",
                "-H", f"HTTP-Referer: {REFERER_URL}",
                "-d", json.dumps(payload)
            ]
            # subprocess.run blocks for up to 60 s; run it in the default
            # executor so the event loop keeps serving other clients.
            result = await asyncio.get_running_loop().run_in_executor(
                None,
                lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=60),
            )
            if result.returncode != 0:
                return {"code": f"# CURL_NETWORK_ERROR: {result.stderr}\n# HTTPX_ERROR: {str(httpx_err)}"}
            res_json = json.loads(result.stdout)
            # Same API-level error handling as the terminal assistant's
            # curl path: surface the error instead of failing on 'choices'.
            if "error" in res_json:
                return {"code": f"# API_ERROR: {json.dumps(res_json['error'])}"}
            new_code = res_json['choices'][0]['message']['content']
        except Exception as curl_err:
            return {"code": f"# FATAL_NETWORK_ERROR. HTTPX: {str(httpx_err)} | CURL: {str(curl_err)}"}

    if not isinstance(new_code, str):
        # A null message content would otherwise crash the regex below.
        return {"code": "# API_ERROR: model returned no text content"}

    # Strip surrounding whitespace FIRST so the anchored patterns see the
    # fence, then drop an opening fence with any language tag (e.g. "c++")
    # and a closing fence.
    new_code = new_code.strip()
    new_code = re.sub(r"^```[^\n]*\n", "", new_code)
    new_code = re.sub(r"\n```$", "", new_code)
    return {"code": new_code.strip()}
@app.get("/", response_class=HTMLResponse)
async def get(request: Request):
    """Serve the front-end page rendered from the ``index.html`` template."""
    # The template context contains only the incoming request object.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# --- AI & TERMINAL LOGIC ---
# System message placed first in every terminal chat history. It defines the
# <EXEC>...</EXEC> convention that the WebSocket handler parses to find the
# shell command the model wants to run.
SYSTEM_PROMPT = """You are OpenClaw, an AI terminal assistant.
You are operating within a user's private Linux directory.
To execute a bash command, output it wrapped EXACTLY in <EXEC> and </EXEC> tags.
Wait for the system to provide the output before taking further action. Keep responses concise."""
async def ask_openrouter(messages) -> str:
    """Send a chat history to OpenRouter and return the assistant's reply.

    The request goes through httpx first; if that raises for any reason the
    same payload is retried through the ``curl`` binary.

    Args:
        messages: Chat messages (dicts with ``role`` and ``content``).

    Returns:
        The reply text, or a diagnostic string starting with ``API_ERROR:``,
        ``CURL_NETWORK_ERROR:`` or ``FATAL_NETWORK_ERROR`` on failure.
    """
    if not OPENROUTER_API_KEY:
        return "API_ERROR: OPENROUTER_API_KEY is not set in Spaces Secrets."
    data = {"model": "openrouter/auto", "messages": messages}
    try:
        headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": REFERER_URL}
        async with httpx.AsyncClient(trust_env=False) as client:
            response = await client.post(API_URL, headers=headers, json=data, timeout=45.0)
        if response.status_code != 200:
            return f"API_ERROR: {response.text}"
        body = response.json()
        # An error payload delivered with status 200 is an API answer, not a
        # transport failure: report it instead of re-sending through curl.
        if isinstance(body, dict) and "error" in body:
            return f"API_ERROR: {json.dumps(body['error'])}"
        return body['choices'][0]['message']['content']
    except Exception as e:
        httpx_error = str(e)

    try:
        cmd = [
            "curl", "--noproxy", "*", "-s", "-X", "POST", API_URL,
            "-H", f"Authorization: Bearer {OPENROUTER_API_KEY}",
            "-H", "Content-Type: application/json",
            "-H", f"HTTP-Referer: {REFERER_URL}",
            "-d", json.dumps(data)
        ]
        # subprocess.run blocks for up to 45 s; run it in the default executor
        # so the event loop keeps serving the other connections meanwhile.
        result = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=45),
        )
        if result.returncode != 0:
            return f"CURL_NETWORK_ERROR: {result.stderr}\nHTTPX_ERROR: {httpx_error}"
        res_json = json.loads(result.stdout)
        if "error" in res_json:
            return f"API_ERROR: {json.dumps(res_json['error'])}"
        return res_json['choices'][0]['message']['content']
    except Exception as curl_e:
        return f"FATAL_NETWORK_ERROR\nHTTPX Error: {httpx_error}\nCURL Error: {str(curl_e)}"
@app.websocket("/ws/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
    """Interactive terminal session over a WebSocket.

    Each incoming message is JSON with a ``command`` string:

    * ``debug net``  -- report proxy settings and test an httpx request.
    * ``ai <text>``  -- forward the text to the model; when the reply contains
      ``<EXEC>cmd</EXEC>`` the command is run and its output fed back, for at
      most five rounds.
    * ``clear``      -- tell the client to clear its screen.
    * anything else  -- run as a shell command in the current directory.

    Args:
        websocket: The client connection.
        token: Session token from the URL path; the connection is closed
            with an error message when it does not resolve to a user.
    """
    await websocket.accept()
    user_dir = get_user_dir(token)
    if not user_dir:
        await websocket.send_text(json.dumps({"type": "error", "content": "Authentication failed."}))
        await websocket.close()
        return

    current_dir = user_dir
    session_history = [{"role": "system", "content": SYSTEM_PROMPT}]
    loop = asyncio.get_running_loop()
    # Prefixes of the diagnostic strings ask_openrouter() returns on failure.
    # Matching the prefix (not the substring "ERROR:") keeps ordinary answers
    # that mention "ERROR:" from being discarded, and also catches
    # FATAL_NETWORK_ERROR, which has no colon after it.
    ai_error_prefixes = ("API_ERROR:", "CURL_NETWORK_ERROR:", "FATAL_NETWORK_ERROR")
    await websocket.send_text(json.dumps({"type": "system", "content": "Secure environment established. Type 'debug net' to run network diagnostics."}))

    def run_cmd(cmd):
        """Run one terminal command (or handle ``cd``) and return its output."""
        nonlocal current_dir
        if cmd.startswith("cd "):
            target = cmd[3:].strip()
            new_dir = os.path.abspath(os.path.join(current_dir, target))
            # Stay inside this user's workspace. The separator-aware test
            # stops "bob" from matching "bobby" or reaching other workspaces.
            if new_dir != user_dir and not new_dir.startswith(user_dir + os.sep):
                return "Permission denied."
            # Refuse a non-existent target; otherwise every later command
            # would fail because its working directory is missing.
            if not os.path.isdir(new_dir):
                return f"cd: {target}: No such directory"
            current_dir = new_dir
            return f"Directory changed to {current_dir}"
        try:
            # shell=True is the feature here: the user types shell syntax.
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, cwd=current_dir, timeout=25)
        except Exception as e:
            return f"[Execution Error: {str(e)}]"
        out = result.stdout + result.stderr
        return out if out else "[Executed successfully with no output]"

    try:
        while True:
            data = await websocket.receive_text()
            try:
                payload = json.loads(data)
                command = payload.get("command", "").strip()
            except (ValueError, AttributeError):
                # Malformed JSON, a non-object payload or a non-string
                # command: report it and keep the session alive.
                await websocket.send_text(json.dumps({"type": "error", "content": "Invalid message format."}))
                continue
            if not command:
                continue

            if command.lower() == "debug net":
                debug_info = "=== NETWORK DIAGNOSTICS ===\n\n1. OS Environment Variables (Proxy):\n"
                for k, v in os.environ.items():
                    if 'proxy' in k.lower():
                        debug_info += f" {k} = {v}\n"
                debug_info += "\n2. Python internal getproxies():\n"
                debug_info += f" {urllib.request.getproxies()}\n"
                debug_info += "\n3. Testing httpx to OpenRouter...\n"
                try:
                    # Async client so the probe does not stall the event loop.
                    async with httpx.AsyncClient(trust_env=False) as c:
                        # Split string to defeat linkifier here too
                        r = await c.get("https://" + "openrouter.ai/")
                    debug_info += f" HTTPX Success! Status: {r.status_code}\n"
                except Exception as e:
                    debug_info += f" HTTPX FAILED: {str(e)}\n\nTRACEBACK:\n{traceback.format_exc()}"
                await websocket.send_text(json.dumps({"type": "output", "content": debug_info}))
                continue

            if command.lower().startswith("ai "):
                prompt = command[3:].strip()
                await websocket.send_text(json.dumps({"type": "ai_status", "status": "thinking"}))
                session_history.append({"role": "user", "content": prompt})
                # Keep the system prompt plus the 20 most recent messages.
                if len(session_history) > 21:
                    session_history = [session_history[0]] + session_history[-20:]
                for _ in range(5):
                    ai_response = await ask_openrouter(session_history)
                    if ai_response.startswith(ai_error_prefixes):
                        await websocket.send_text(json.dumps({"type": "error", "content": ai_response}))
                        session_history.pop()
                        break
                    match = re.search(r'<EXEC>(.*?)</EXEC>', ai_response, re.DOTALL)
                    if match:
                        exec_cmd = match.group(1).strip()
                        text_before = ai_response[:match.start()].strip()
                        full_ai_message = ""
                        if text_before:
                            await websocket.send_text(json.dumps({"type": "ai", "content": text_before}))
                            full_ai_message += text_before + "\n"
                        await websocket.send_text(json.dumps({"type": "system", "content": f"⚙️ AI is running: {exec_cmd}"}))
                        await websocket.send_text(json.dumps({"type": "ai_status", "status": "executing..."}))
                        full_ai_message += f"<EXEC>{exec_cmd}</EXEC>"
                        session_history.append({"role": "assistant", "content": full_ai_message})
                        # The command may block for up to 25 s; run it off
                        # the event loop thread.
                        output = await loop.run_in_executor(None, run_cmd, exec_cmd)
                        await websocket.send_text(json.dumps({"type": "output", "content": output}))
                        session_history.append({"role": "user", "content": f"Command Output:\n{output}"})
                        await asyncio.sleep(1)
                    else:
                        await websocket.send_text(json.dumps({"type": "ai", "content": ai_response}))
                        session_history.append({"role": "assistant", "content": ai_response})
                        break
                await websocket.send_text(json.dumps({"type": "ai_status", "status": "idle"}))
            elif command.lower() == "clear":
                await websocket.send_text(json.dumps({"type": "clear", "content": ""}))
            else:
                output = await loop.run_in_executor(None, run_cmd, command)
                await websocket.send_text(json.dumps({"type": "output", "content": output}))
    except WebSocketDisconnect:
        # Client closed the connection; the session holds nothing to release.
        pass