import os
import pty
import select
import subprocess
import termios
import struct
import fcntl
import json
import asyncio
import functools
import signal
import threading
import time
import shutil
import glob
import re
from typing import Dict, List, Optional, Any

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Response, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import httpx

app = FastAPI()

# PTY state
ptys: Dict[int, int] = {}
next_pty_id = 1

# Shell sessions (persistent)
shell_sessions: Dict[int, Dict[str, Any]] = {}
next_shell_id = 1

# Background processes
bg_processes: Dict[int, Dict[str, Any]] = {}
next_bg_id = 1

# Secrets (file-based fallback)
SECRETS_FILE = os.path.expanduser("~/.terax-secrets.json")

# Largest file fs_read_file will return inline (10MB).
MAX_READ_BYTES = 10 * 1024 * 1024
# Maximum number of hits returned by fs_grep / fs_glob.
MAX_HITS = 100


def load_secrets():
    """Return the secrets stored in SECRETS_FILE as a dict.

    A missing, unreadable, or malformed file (including one whose top-level
    JSON value is not an object) yields an empty dict.
    """
    try:
        with open(SECRETS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def save_secrets(secrets):
    """Write *secrets* to SECRETS_FILE as JSON; failures are printed, not raised."""
    try:
        # Serialize before opening so a serialization error cannot leave a
        # truncated file behind.
        payload = json.dumps(secrets)
        # Create the file owner-only; an already existing file keeps its mode.
        fd = os.open(SECRETS_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(payload)
    except (OSError, TypeError, ValueError) as e:
        print(f"Failed to save secrets: {e}")


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "ok", "message": "Terax backend is running"}


# Tauri Command Handlers
class InvokeRequest(BaseModel):
    """Body of POST /invoke: a command name plus its argument mapping."""

    cmd: str
    args: Dict[str, Any]


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def _ensure_parent_dir(path):
    """Create the parent directory of *path* if it has one."""
    parent = os.path.dirname(path)
    # A bare filename has an empty dirname, which os.makedirs rejects.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _as_text(data):
    """Return captured process output as str, whether it arrived as bytes, str, or None."""
    if not data:
        return ""
    if isinstance(data, bytes):
        return data.decode(errors="replace")
    return data


def _coerce_body(body):
    """Turn a JSON array of byte values into bytes; pass anything else through unchanged."""
    if isinstance(body, list) and all(isinstance(b, int) for b in body):
        return bytes(body)
    return body


async def _cmd_fs_read_dir(args):
    """List a directory as name/kind/size/mtime records; a missing path yields []."""
    path = args.get("path", ".")
    show_hidden = args.get("showHidden", False)
    if not os.path.exists(path):
        return []
    entries = []
    with os.scandir(path) as it:
        for entry in it:
            if not show_hidden and entry.name.startswith("."):
                continue
            try:
                info = entry.stat()
                kind = "dir" if entry.is_dir() else "file"
            except OSError:
                # Entries that cannot be stat'ed are skipped.
                continue
            entries.append({
                "name": entry.name,
                "kind": kind,
                "size": info.st_size,
                "mtime": int(info.st_mtime * 1000),
            })
    return entries


async def _cmd_fs_read_file(args):
    """Return a file's UTF-8 text, or a 'binary' / 'toolarge' marker."""
    path = args.get("path")
    if not os.path.exists(path):
        raise HTTPException(status_code=404, detail="File not found")
    size = os.path.getsize(path)
    if size > MAX_READ_BYTES:
        return {"kind": "toolarge", "size": size, "limit": MAX_READ_BYTES}
    try:
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
    except UnicodeDecodeError:
        return {"kind": "binary", "size": size}
    return {"kind": "text", "content": content, "size": size}


async def _cmd_fs_write_file(args):
    """Write text content to a file, creating parent directories as needed."""
    path = args.get("path")
    content = args.get("content")
    _ensure_parent_dir(path)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    return None


async def _cmd_fs_create_file(args):
    """Create a file if absent and update its timestamps (like `touch`)."""
    path = args.get("path")
    _ensure_parent_dir(path)
    with open(path, "a"):
        os.utime(path, None)
    return None


async def _cmd_fs_create_dir(args):
    """Create a directory and any missing parents."""
    os.makedirs(args.get("path"), exist_ok=True)
    return None


async def _cmd_fs_rename(args):
    """Rename/move a path."""
    os.rename(args.get("from"), args.get("to"))
    return None


async def _cmd_fs_delete(args):
    """Delete a file, or a directory tree."""
    path = args.get("path")
    if os.path.isdir(path):
        shutil.rmtree(path)
    else:
        os.remove(path)
    return None


async def _cmd_fs_grep(args):
    """Search *root* recursively with `grep -rnE`; any failure yields an empty result."""
    pattern = args.get("pattern")
    root = args.get("root", ".")
    try:
        proc = await _run_blocking(
            subprocess.run,
            # -e / -- keep a pattern or root that starts with '-' from being
            # parsed as a grep option.
            ["grep", "-rnE", "-e", pattern, "--", root],
            capture_output=True,
            text=True,
            # Matches in non-UTF-8 files must not abort the whole search.
            errors="replace",
            timeout=10,
        )
        hits = []
        for line in proc.stdout.splitlines():
            parts = line.split(":", 2)
            if len(parts) < 3:
                continue
            try:
                line_no = int(parts[1])
            except ValueError:
                # Not a "path:line:text" record (e.g. a grep notice); skip it.
                continue
            hits.append({
                "path": parts[0],
                "rel": os.path.relpath(parts[0], root),
                "line": line_no,
                "text": parts[2],
            })
        return {
            "hits": hits[:MAX_HITS],
            "truncated": len(hits) > MAX_HITS,
            "files_scanned": 0,
        }
    except Exception as e:
        # Best-effort: a missing grep binary, timeout, or bad argument is
        # reported as "no hits".
        print(f"Grep error: {e}")
        return {"hits": [], "truncated": False, "files_scanned": 0}


async def _cmd_fs_glob(args):
    """Expand a glob pattern relative to *root*."""
    pattern = args.get("pattern")
    root = args.get("root", ".")
    results = glob.glob(os.path.join(root, pattern), recursive=True)
    hits = [{"path": p, "rel": os.path.relpath(p, root)} for p in results[:MAX_HITS]]
    return {"hits": hits, "truncated": len(results) > MAX_HITS}


async def _cmd_shell_run_command(args):
    """Run a one-off shell command and return its output and exit status."""
    command = args.get("command")
    cwd = args.get("cwd")
    timeout = args.get("timeoutSecs", 30)
    try:
        proc = await _run_blocking(
            subprocess.run,
            command,
            shell=True,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as e:
        # The partial output on the exception may be bytes or str.
        return {
            "stdout": _as_text(e.stdout),
            "stderr": _as_text(e.stderr),
            "exit_code": None,
            "timed_out": True,
            "truncated": False,
        }
    return {
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "exit_code": proc.returncode,
        "timed_out": False,
        "truncated": False,
    }


async def _cmd_shell_session_open(args):
    """Register a new shell session and return its id."""
    global next_shell_id
    session_id = next_shell_id
    next_shell_id += 1
    shell_sessions[session_id] = {"cwd": args.get("cwd") or "."}
    return session_id


async def _cmd_shell_session_run(args):
    """Run a command in a session, tracking the working directory between calls."""
    session_id = args.get("id")
    command = args.get("command")
    session = shell_sessions.get(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    cwd = args.get("cwd") or session["cwd"]
    # Simple wrapper to try and track CWD changes
    wrapped_command = f"{command} && pwd"
    try:
        proc = await _run_blocking(
            subprocess.run,
            wrapped_command,
            shell=True,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=args.get("timeoutSecs", 30),
        )
        stdout_lines = proc.stdout.splitlines()
        new_cwd = cwd
        # `pwd` only runs when the command succeeded; otherwise the last line
        # is ordinary output and must neither be stripped nor stored as cwd.
        if proc.returncode == 0 and stdout_lines and os.path.isdir(stdout_lines[-1]):
            new_cwd = stdout_lines.pop()
        session["cwd"] = new_cwd
        return {
            "stdout": "\n".join(stdout_lines),
            "stderr": proc.stderr,
            "exit_code": proc.returncode,
            "timed_out": False,
            "truncated": False,
            "cwd_after": new_cwd,
        }
    except Exception as e:
        return {"error": str(e)}


async def _cmd_shell_session_close(args):
    """Forget a shell session; unknown ids are ignored."""
    shell_sessions.pop(args.get("id"), None)
    return None


async def _cmd_secrets_get(args):
    """Return one stored secret, or None."""
    return load_secrets().get(args.get("key"))


async def _cmd_secrets_set(args):
    """Store one secret."""
    secrets = load_secrets()
    secrets[args.get("key")] = args.get("value")
    save_secrets(secrets)
    return None


async def _cmd_secrets_get_all(args):
    """Return every stored secret."""
    return load_secrets()


async def _cmd_secrets_delete(args):
    """Remove one secret if present."""
    key = args.get("key")
    secrets = load_secrets()
    if key in secrets:
        del secrets[key]
        save_secrets(secrets)
    return None


async def _cmd_ai_http_request(args):
    """Proxy one HTTP request and return status, headers, and body bytes as a list."""
    method = args.get("method", "GET")
    url = args.get("url")
    headers = args.get("headers", {})
    body = _coerce_body(args.get("body"))
    async with httpx.AsyncClient() as client:
        resp = await client.request(method, url, headers=headers, content=body)
    return {
        "status": resp.status_code,
        "headers": dict(resp.headers),
        "body": list(resp.content),
    }


async def _cmd_lm_ping(args):
    """GET `<baseUrl>/models` and return the HTTP status (400 if no URL, 500 on failure)."""
    base_url = (args.get("baseUrl") or args.get("base_url") or "").strip().rstrip('/')
    if not base_url:
        return 400
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.get(f"{base_url}/models", timeout=5)
            return resp.status_code
        except Exception:
            return 500


# Command name -> async handler taking the raw args mapping.
_COMMANDS: Dict[str, Any] = {
    "fs_read_dir": _cmd_fs_read_dir,
    "fs_read_file": _cmd_fs_read_file,
    "fs_write_file": _cmd_fs_write_file,
    "fs_create_file": _cmd_fs_create_file,
    "fs_create_dir": _cmd_fs_create_dir,
    "fs_rename": _cmd_fs_rename,
    "fs_delete": _cmd_fs_delete,
    "fs_grep": _cmd_fs_grep,
    "fs_glob": _cmd_fs_glob,
    "shell_run_command": _cmd_shell_run_command,
    "shell_session_open": _cmd_shell_session_open,
    "shell_session_run": _cmd_shell_session_run,
    "shell_session_close": _cmd_shell_session_close,
    "secrets_get": _cmd_secrets_get,
    "secrets_set": _cmd_secrets_set,
    "secrets_get_all": _cmd_secrets_get_all,
    "secrets_delete": _cmd_secrets_delete,
    "ai_http_request": _cmd_ai_http_request,
    "lm_ping": _cmd_lm_ping,
}


@app.post("/invoke")
async def invoke(request: InvokeRequest):
    """Dispatch a command to its handler and return the handler's result.

    Unknown commands return an ``{"error": ...}`` payload. An HTTPException
    raised by a handler is returned with its own status code; any other
    exception is printed and returned as a 500. Both error forms carry the
    message under the ``error`` key.
    """
    cmd = request.cmd
    try:
        handler = _COMMANDS.get(cmd)
        if handler is None:
            return {"error": f"Command {cmd} not implemented"}
        return await handler(request.args)
    except HTTPException as exc:
        # Must be handled before the generic clause, which would otherwise
        # turn a deliberate 404 into a 500.
        return JSONResponse(status_code=exc.status_code, content={"error": exc.detail})
    except Exception as e:
        print(f"Error in invoke {cmd}: {e}")
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.post("/ai_http_stream")
async def ai_http_stream(request: Request):
    """Proxy an HTTP request and stream the response back as NDJSON events.

    Events are, in order: one ``headers`` record, zero or more ``chunk``
    records (bytes as a list of ints), then ``end``; a failure emits a single
    ``error`` record instead of continuing.
    """
    try:
        args = await request.json()
    except ValueError:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
    if not isinstance(args, dict):
        # Valid JSON but not an object: there is nothing to read fields from.
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})

    url = args.get("url")
    method = args.get("method", "GET")
    headers = args.get("headers", {})
    body = args.get("body")

    async def stream_generator():
        async with httpx.AsyncClient() as client:
            try:
                async with client.stream(
                    method, url, headers=headers, content=_coerce_body(body)
                ) as resp:
                    yield json.dumps({
                        "kind": "headers",
                        "status": resp.status_code,
                        "headers": dict(resp.headers),
                    }) + "\n"
                    async for chunk in resp.aiter_bytes():
                        yield json.dumps({
                            "kind": "chunk",
                            "bytes": list(chunk),
                        }) + "\n"
                    yield json.dumps({"kind": "end"}) + "\n"
            except Exception as e:
                yield json.dumps({"kind": "error", "message": str(e)}) + "\n"

    return StreamingResponse(stream_generator(), media_type="application/x-ndjson")


def _set_winsize(fd, rows, cols):
    """Set the terminal window size on a PTY descriptor."""
    fcntl.ioctl(fd, termios.TIOCSWINSZ, struct.pack('HHHH', rows, cols, 0, 0))


def _become_session_leader():
    """Child-side hook: start a new session and adopt stdin as its controlling terminal."""
    os.setsid()
    try:
        # Runs in the child just before exec, when stdin is already the PTY
        # slave. Without a controlling terminal the shell has no job control
        # and Ctrl-C is not turned into SIGINT.
        fcntl.ioctl(0, termios.TIOCSCTTY, 0)
    except OSError:
        # Best-effort: fall back to a shell without a controlling terminal.
        pass


def _write_all(fd, data):
    """Write all of *data* to *fd*, retrying after short writes."""
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


def _apply_control_message(fd, text):
    """Apply a JSON control message (currently only ``resize``) to the PTY; ignore anything else."""
    try:
        control = json.loads(text)
    except ValueError:
        return
    if not isinstance(control, dict) or control.get("type") != "resize":
        return
    try:
        _set_winsize(fd, control.get("rows", 24), control.get("cols", 80))
    except (TypeError, struct.error, OSError):
        # Malformed dimensions or a PTY that is already gone: ignore.
        pass


def _reap_process(proc, grace=2.0):
    """Wait for *proc* to exit, force-killing its process group after *grace* seconds."""
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        try:
            os.killpg(proc.pid, signal.SIGKILL)
        except OSError:
            proc.kill()
        proc.wait()


@app.websocket("/ws/pty/{pty_id}")
async def pty_websocket(websocket: WebSocket, pty_id: int):
    """Bridge a WebSocket to a freshly spawned shell running on a PTY.

    Binary frames are written to the shell; text frames are parsed as JSON
    control messages (``{"type": "resize", "cols": ..., "rows": ...}``).
    Shell output is sent back as binary frames. ``pty_id`` is accepted from
    the URL but not otherwise used here.
    """
    await websocket.accept()

    master_fd, slave_fd = pty.openpty()
    try:
        # Set default size
        _set_winsize(master_fd, 24, 80)
        shell = os.environ.get("SHELL", "bash")
        p = subprocess.Popen(
            [shell],
            preexec_fn=_become_session_leader,
            stdin=slave_fd,
            stdout=slave_fd,
            stderr=slave_fd,
            env=os.environ.copy(),
        )
    except Exception:
        # Do not leak the master side if the shell could not be started.
        os.close(master_fd)
        raise
    finally:
        # The child holds its own copies; the parent never needs the slave.
        os.close(slave_fd)

    async def read_from_pty():
        loop = asyncio.get_running_loop()
        while True:
            try:
                data = await loop.run_in_executor(None, os.read, master_fd, 1024)
                if not data:
                    break
                await websocket.send_bytes(data)
            except Exception:
                # Shell exited (read error) or the socket is gone: stop pumping.
                break

    read_task = asyncio.create_task(read_from_pty())

    try:
        while True:
            msg = await websocket.receive()
            # receive() reports a disconnect as a message instead of raising;
            # calling it again afterwards raises RuntimeError.
            if msg.get("type") == "websocket.disconnect":
                break
            # Either key may be present with a None value.
            payload = msg.get("bytes")
            if payload is not None:
                _write_all(master_fd, payload)
                continue
            text = msg.get("text")
            if text is not None:
                _apply_control_message(master_fd, text)
    except WebSocketDisconnect:
        pass
    except OSError:
        # Writing to a PTY whose shell has exited fails; treat as end of session.
        pass
    finally:
        read_task.cancel()
        if p.poll() is None:
            try:
                # An interactive shell ignores SIGTERM, so also hang up the
                # whole session (the child is its own process-group leader).
                os.killpg(p.pid, signal.SIGHUP)
            except OSError:
                pass
            p.terminate()
        try:
            os.close(master_fd)
        except OSError:
            pass
        # Reap off the event loop so the child does not linger as a zombie.
        threading.Thread(target=_reap_process, args=(p,), daemon=True).start()


@app.post("/pty/open")
async def open_pty_post(request: Request):
    """Allocate and return the next PTY id."""
    global next_pty_id
    new_id = next_pty_id
    next_pty_id += 1
    return {"id": new_id}


# Serve static files from the 'dist' directory
if os.path.exists("dist"):
    app.mount("/", StaticFiles(directory="dist", html=True), name="static")

if __name__ == "__main__":
    import uvicorn

    # NOTE(review): this binds every interface while /invoke and /ws/pty run
    # arbitrary commands without authentication - confirm the deployment is
    # sandboxed or otherwise not reachable by untrusted clients.
    uvicorn.run(app, host="0.0.0.0", port=7860)