import os, re, json, shutil, subprocess, glob, traceback, time, urllib.request from pathlib import Path from typing import AsyncGenerator from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from huggingface_hub import hf_hub_download from llama_cpp import Llama try: import git as gitlib; HAS_GIT = True except: HAS_GIT = False try: from duckduckgo_search import DDGS; HAS_DDG = True except: HAS_DDG = False WORKSPACE = Path("/workspace") WORKSPACE.mkdir(exist_ok=True) print("Downloading model...") model_path = hf_hub_download(repo_id="Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF", filename="qwen2.5-coder-1.5b-instruct-q4_k_m.gguf") print("Loading model...") llm = Llama(model_path=model_path, n_ctx=8192, n_threads=int(os.getenv("LLAMA_THREADS","2")), n_batch=512, verbose=False) print("Model ready!") # ── tools ───────────────────────────────────────────────────────────────────── def _safe(p): path = (WORKSPACE / p).resolve() if WORKSPACE.resolve() not in path.parents and path != WORKSPACE.resolve(): raise ValueError(f"Path '{p}' not allowed") return path def tool_write_file(path, content): try: p = _safe(path); p.parent.mkdir(parents=True, exist_ok=True) p.write_text(str(content), encoding="utf-8") return f"OK: wrote {len(str(content))} chars to /workspace/{path}" except Exception as e: return f"ERROR: {e}" def tool_read_file(path): try: return _safe(path).read_text(encoding="utf-8", errors="replace")[:3000] except Exception as e: return f"ERROR: {e}" def tool_edit_file(path, find, replace): try: p = _safe(path); txt = p.read_text(encoding="utf-8") if find not in txt: return f"ERROR: text not found in {path}" p.write_text(txt.replace(find, replace), encoding="utf-8") return f"OK: replaced in {path}" except Exception as e: return f"ERROR: {e}" def tool_delete_file(path): try: p = _safe(path) shutil.rmtree(p) if p.is_dir() else p.unlink() return f"OK: deleted {path}" except Exception as e: return f"ERROR: {e}" def tool_list_files(path="."): try: base = _safe(path) files = [str(f.relative_to(WORKSPACE)) for f in sorted(base.rglob("*")) if f.is_file()] return "\n".join(files[:300]) if files else "workspace is empty" except Exception as e: return f"ERROR: {e}" def tool_run(command, dir="."): try: cwd = _safe(dir) r = subprocess.run(command, cwd=cwd, shell=True, capture_output=True, text=True, timeout=300) out = (r.stdout or "")[-2500:]; err = (r.stderr or "")[-1000:] return f"exit={r.returncode}\n{out}\n{err}".strip() except subprocess.TimeoutExpired: return "ERROR: timeout after 300s" except Exception as e: return f"ERROR: {e}" def tool_git_clone(url, dest=""): if not HAS_GIT: return "ERROR: git not available" try: name = dest or url.rstrip("/").split("/")[-1].replace(".git","") target = _safe(name) if target.exists(): return f"OK: {name} already exists" gitlib.Repo.clone_from(url, target) return f"OK: cloned to /workspace/{name}" except Exception as e: return f"ERROR: {e}" def tool_gradle_build(repo, task="assembleDebug"): try: cwd = _safe(repo); w = cwd/"gradlew" if w.exists(): os.chmod(w, 0o755); cmd = f"./gradlew {task} --no-daemon" else: cmd = f"gradle {task} --no-daemon" r = subprocess.run(cmd, cwd=cwd, shell=True, capture_output=True, text=True, timeout=900) apks = glob.glob(f"{cwd}/**/*.apk", recursive=True) return f"exit={r.returncode}\n{(r.stdout or '')[-3000:]}\n{(r.stderr or '')[-1000:]}\nAPKs: {apks}" except Exception as e: return f"ERROR: {e}" def tool_web_search(query, max_results=5): if not HAS_DDG: return "ERROR: search not available" for attempt in range(3): try: if attempt: time.sleep(2*attempt) results = [] with DDGS() as d: for r in d.text(query, max_results=int(max_results)): results.append(f"Title: {r.get('title','')}\nURL: {r.get('href','')}\nSnippet: {r.get('body','')}") if results: return "\n---\n".join(results) except Exception as e: if attempt==2: return f"ERROR: {e}" return "no results" def tool_read_url(url): try: req = urllib.request.Request(url, headers={"User-Agent":"Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=15) as resp: c = resp.read().decode("utf-8","replace") c = re.sub(r'<[^>]+>',' ',c); c = re.sub(r'\s+',' ',c).strip() return c[:2500] except Exception as e: return f"ERROR: {e}" def tool_download_file(url, save_as): try: p = _safe(save_as); p.parent.mkdir(parents=True, exist_ok=True) req = urllib.request.Request(url, headers={"User-Agent":"Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=30) as resp, open(p,'wb') as f: f.write(resp.read()) return f"OK: saved to /workspace/{save_as} ({p.stat().st_size} bytes)" except Exception as e: return f"ERROR: {e}" TOOLS = { "write_file": (tool_write_file, ["path","content"], "Create or overwrite a file"), "read_file": (tool_read_file, ["path"], "Read file contents"), "edit_file": (tool_edit_file, ["path","find","replace"], "Find and replace in file"), "delete_file": (tool_delete_file, ["path"], "Delete file or directory"), "list_files": (tool_list_files, ["path"], "List workspace files"), "run": (tool_run, ["command","dir"], "Execute shell command"), "git_clone": (tool_git_clone, ["url","dest"], "Clone git repository"), "gradle_build": (tool_gradle_build, ["repo","task"], "Build Android APK"), "web_search": (tool_web_search, ["query","max_results"], "Search the web"), "read_url": (tool_read_url, ["url"], "Fetch URL content"), "download_file": (tool_download_file, ["url","save_as"], "Download file from URL"), } # ── SYSTEM PROMPT ───────────────────────────────────────────────────────────── # Key: few-shot examples teach the model the EXACT pattern # The model sees: user asks -> assistant writes ACTION: -> sees [RESULT:] -> continues SYSTEM = """You are an AI agent inside a Linux container. You have tools. You MUST use them. TOOLS: - write_file(path, content) - create file - read_file(path) - read file - edit_file(path, find, replace) - edit file - delete_file(path) - delete file - list_files(path=".") - list files - run(command, dir=".") - run shell command - git_clone(url, dest="") - clone repo - gradle_build(repo, task="assembleDebug") - build APK - web_search(query) - search web - read_url(url) - fetch URL - download_file(url, save_as) - download file TO USE A TOOL write exactly: ACTION: tool_name("arg1", "arg2") EXAMPLES: User: list workspace files Assistant: Checking workspace. ACTION: list_files() [RESULT: workspace is empty] Workspace is empty. User: create hello.py that prints hello Assistant: Creating file. ACTION: write_file("hello.py", "print('hello world')") [RESULT: OK: wrote 22 chars to /workspace/hello.py] ACTION: run("python3 hello.py") [RESULT: exit=0\nhello world] Done. Output: hello world User: install requests and test it Assistant: Installing requests. ACTION: run("pip install requests -q") [RESULT: exit=0\nSuccessfully installed requests] ACTION: run("python3 -c \\"import requests; print(requests.get('https://httpbin.org/get').status_code)\\"") [RESULT: exit=0\n200] Installed and working. User: search for python chess library Assistant: Searching now. ACTION: web_search("python chess library pygame") [RESULT: Title: python-chess\nURL: https://python-chess.readthedocs.io\nSnippet: A chess library for Python] Found: python-chess library. User: make a chess game in python and upload to gofile Assistant: Step 1: Install dependencies. ACTION: run("pip install pygame -q") [RESULT: exit=0\nSuccessfully installed pygame] Step 2: Write chess game. ACTION: write_file("chess.py", "# chess game code here") [RESULT: OK: wrote file] Step 3: Upload to gofile. ACTION: run("curl -s -F 'file=@/workspace/chess.py' https://store1.gofile.io/uploadFile") [RESULT: exit=0\n{"status":"ok","data":{"downloadPage":"https://gofile.io/d/abc123"}}] Uploaded! Download: https://gofile.io/d/abc123 IMPORTANT RULES: 1. ALWAYS use ACTION: for any task involving files, code, search, commands 2. NEVER say "I cannot" or "I'm sorry" - you CAN do everything 3. You are in a container with internet access, python3, pip, curl, git 4. After [RESULT:] always continue - do next step or summarize 5. For uploads use: ACTION: run("curl -s -F 'file=@/workspace/FILE' https://store1.gofile.io/uploadFile") """ # Regex to find ACTION: lines ACTION_RE = re.compile(r'ACTION:\s*(\w+)\(([^)]*(?:\([^)]*\)[^)]*)*)\)', re.DOTALL) def call_tool(name, args_raw): """Parse args and call tool.""" if name not in TOOLS: return f"ERROR: Unknown tool '{name}'. Available: {', '.join(TOOLS.keys())}" fn = TOOLS[name][0] args_raw = args_raw.strip() if not args_raw: try: return str(fn()) except: return str(fn(".")) # Parse quoted string arguments args = [] i = 0 while i < len(args_raw): # Skip whitespace and commas while i < len(args_raw) and args_raw[i] in ' ,\t\n': i += 1 if i >= len(args_raw): break if args_raw[i] in ('"', "'"): # Quoted string q = args_raw[i]; i += 1; s = "" while i < len(args_raw): c = args_raw[i] if c == '\\' and i+1 < len(args_raw): nc = args_raw[i+1] if nc == 'n': s += '\n'; i += 2; continue elif nc == 't': s += '\t'; i += 2; continue elif nc in ('"', "'", '\\'): s += nc; i += 2; continue if c == q: i += 1; break s += c; i += 1 args.append(s) else: # Unquoted - read until comma j = i while j < len(args_raw) and args_raw[j] != ',': j += 1 args.append(args_raw[i:j].strip()) i = j try: return str(fn(*args)) except Exception as e: try: return str(fn(args_raw.strip('"').strip("'"))) except Exception as e2: return f"ERROR: {e} | {e2}" def run_agent_sync(message, history): """Run agent loop, return list of SSE event dicts.""" msgs = [{"role": "system", "content": SYSTEM}] for h in history: if isinstance(h, (list, tuple)) and len(h) == 2: if h[0]: msgs.append({"role": "user", "content": str(h[0])}) if h[1]: msgs.append({"role": "assistant", "content": str(h[1])}) msgs.append({"role": "user", "content": message}) events = [] full_response = "" accumulated = "" # full assistant turn so far for step in range(15): # Generate next chunk - stop ONLY at [RESULT: so model writes ACTION: freely out = llm.create_chat_completion( messages=msgs, temperature=0.1, max_tokens=700, stop=["[RESULT:"], # stop when model tries to write its own result ) chunk = out["choices"][0]["message"]["content"] finish = out["choices"][0].get("finish_reason", "stop") # Find ACTION: in this chunk action_match = ACTION_RE.search(chunk) if not action_match: # Pure text - emit and done full_response += chunk events.append({"type": "token", "text": chunk}) break # Emit text before ACTION: action_pos = chunk.rfind("ACTION:") pre = chunk[:action_pos].rstrip() if pre: full_response += pre + "\n" events.append({"type": "token", "text": pre + "\n"}) tool_name = action_match.group(1) args_raw = action_match.group(2) # Emit tool_start events.append({"type": "tool_start", "tool": tool_name, "args": {"args": args_raw[:200]}}) # Execute tool result = call_tool(tool_name, args_raw) result_str = str(result)[:2000] events.append({"type": "tool_result", "tool": tool_name, "result": result_str}) # Build assistant turn with result injected accumulated += pre + "\n" if pre else "" accumulated += f"ACTION: {tool_name}({args_raw})\n[RESULT: {result_str}]\n" # Update messages: replace last assistant turn or append if msgs and msgs[-1]["role"] == "assistant": msgs[-1]["content"] = accumulated else: msgs.append({"role": "assistant", "content": accumulated}) full_response += f"[{tool_name}] {result_str[:80]}\n" # Continue generating after result # Add continuation prompt if msgs[-1]["role"] != "user": # Keep same assistant turn - model will continue from where it left off pass events.append({"type": "done", "full": full_response}) return events async def stream_agent(message, history): import asyncio loop = asyncio.get_event_loop() events = await loop.run_in_executor(None, run_agent_sync, message, history) for ev in events: yield f"data: {json.dumps(ev)}\n\n" # ── FastAPI ─────────────────────────────────────────────────────────────────── app = FastAPI(title="AI Agent") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) HTML = r"""