import asyncio
import json
import os
import shutil
import subprocess
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Optional

import httpx
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
|
|
# FastAPI application object. The CORS middleware below accepts every origin,
# method and header.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Root directory that holds one sub-directory per project; created on import.
WORKSPACE = Path("/workspace/projects")
WORKSPACE.mkdir(parents=True, exist_ok=True)

# Provider credentials are read from the environment; unset variables become "".
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY", "")
CLOUDFLARE_ACCOUNT_ID = os.getenv("CLOUDFLARE_ACCOUNT_ID", "")
CLOUDFLARE_API_KEY = os.getenv("CLOUDFLARE_API_KEY", "")
|
|
| |
|
|
# Short model alias -> provider name and provider-side model identifier.
MODELS = {
    alias: {"provider": provider, "id": model_id}
    for alias, provider, model_id in (
        ("qwen", "cerebras", "qwen-3-235b-instruct"),
        ("oss", "cerebras", "gpt-oss-120b"),
        ("llama", "cloudflare", "@cf/meta/llama-3.3-70b-instruct-fp8-fast"),
    )
}
|
|
| |
|
|
async def call_cerebras(model_id: str, messages: list, tools=None, stream=False):
    """Call the Cerebras chat-completions endpoint and yield decoded JSON.

    This is an async generator in both modes so callers can always use
    ``async for``.

    Args:
        model_id: Provider-side model identifier.
        messages: Chat messages in the OpenAI-style ``role``/``content`` format.
        tools: Optional list of tool schemas; when given, ``tool_choice`` is
            set to ``"auto"``.
        stream: When true, yield one dict per SSE ``data:`` line; otherwise
            yield the single response body.

    Yields:
        Decoded JSON objects from the API.
    """
    url = "https://api.cerebras.ai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {CEREBRAS_API_KEY}",
        "Content-Type": "application/json",
    }
    body = {"model": model_id, "messages": messages, "stream": stream, "max_tokens": 8192}
    if tools:
        body["tools"] = tools
        body["tool_choice"] = "auto"

    async with httpx.AsyncClient(timeout=120) as client:
        if not stream:
            r = await client.post(url, headers=headers, json=body)
            # The body is yielded as-is, including error payloads; callers
            # read fields defensively with .get().
            yield r.json()
            return

        async with client.stream("POST", url, headers=headers, json=body) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[6:].strip()
                if payload == "[DONE]":
                    continue
                # Parse before yielding so one malformed line is skipped
                # instead of aborting the whole stream (same tolerance as
                # call_cloudflare).
                try:
                    event = json.loads(payload)
                except json.JSONDecodeError:
                    continue
                yield event
|
|
async def call_cloudflare(model_id: str, messages: list, stream=False):
    """Call a Cloudflare Workers AI model and yield decoded JSON.

    This is an async generator in both modes so callers can always use
    ``async for``.

    Args:
        model_id: Model identifier appended to the account's ``ai/run`` URL.
        messages: Chat messages in ``role``/``content`` format.
        stream: When true, yield one dict per SSE ``data:`` line; otherwise
            yield the single response body.

    Yields:
        Decoded JSON objects from the API.
    """
    url = f"https://api.cloudflare.com/client/v4/accounts/{CLOUDFLARE_ACCOUNT_ID}/ai/run/{model_id}"
    headers = {"Authorization": f"Bearer {CLOUDFLARE_API_KEY}", "Content-Type": "application/json"}
    body = {"messages": messages, "stream": stream, "max_tokens": 4096}

    async with httpx.AsyncClient(timeout=120) as client:
        if not stream:
            r = await client.post(url, headers=headers, json=body)
            yield r.json()
            return

        async with client.stream("POST", url, headers=headers, json=body) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[6:].strip()
                if payload == "[DONE]":
                    continue
                # Only the JSON decode is guarded, and the yield sits outside
                # the try: a handler wrapped around a yield would also swallow
                # the GeneratorExit raised when the consumer closes the
                # generator.
                try:
                    event = json.loads(payload)
                except json.JSONDecodeError:
                    continue
                yield event
|
|
| |
|
|
def _function_tool(name, description, *params):
    """Build one function-tool schema whose parameters are all required strings."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {param: {"type": "string"} for param in params},
                "required": list(params),
            },
        },
    }


# Tool schemas offered to the model; the names match the branches of execute_tool.
TOOLS = [
    _function_tool("create_file", "Create or overwrite a file with content", "path", "content"),
    _function_tool("read_file", "Read a file's content", "path"),
    _function_tool("edit_file", "Edit a specific part of a file", "path", "old_content", "new_content"),
    _function_tool("delete_file", "Delete a file or folder", "path"),
    _function_tool("list_files", "List files in a directory", "path"),
    _function_tool("run_command", "Run a shell command in the project directory", "command"),
    _function_tool("web_search", "Search the web for information", "query"),
]
|
|
| |
|
|
def _resolve_in_project(project_path: Path, relative: str) -> Path:
    """Return ``project_path / relative`` after checking it stays in the project.

    Raises:
        ValueError: If the resolved location is outside ``project_path``
            (for example via ``..`` components or an absolute path).
    """
    root = project_path.resolve()
    target = (root / relative).resolve()
    if target != root and root not in target.parents:
        raise ValueError(f"Path escapes project directory: {relative}")
    # The unresolved path is returned so callers operate on the entry itself
    # (for example a symlink) rather than on whatever it points to.
    return project_path / relative


def execute_tool(name: str, args: dict, project_path: Path) -> str:
    """Execute one agent tool call inside ``project_path``.

    Args:
        name: Tool name; one of the names declared in ``TOOLS``.
        args: Decoded tool arguments supplied by the model.
        project_path: Directory that all file operations are confined to and
            that shell commands run in.

    Returns:
        The tool output, or a status string. Successful mutations start with
        "✅"; every failure (including an unknown tool or any raised
        exception) starts with "❌". This function never raises.
    """
    try:
        if name == "create_file":
            fp = _resolve_in_project(project_path, args["path"])
            fp.parent.mkdir(parents=True, exist_ok=True)
            fp.write_text(args["content"], encoding="utf-8")
            return f"✅ Created: {args['path']}"

        if name == "read_file":
            fp = _resolve_in_project(project_path, args["path"])
            if not fp.exists():
                return f"❌ File not found: {args['path']}"
            return fp.read_text(encoding="utf-8")

        if name == "edit_file":
            fp = _resolve_in_project(project_path, args["path"])
            if not fp.exists():
                return f"❌ File not found: {args['path']}"
            content = fp.read_text(encoding="utf-8")
            if args["old_content"] not in content:
                # Report the miss instead of claiming an edit that changed nothing.
                return f"❌ old_content not found in: {args['path']}"
            fp.write_text(content.replace(args["old_content"], args["new_content"]), encoding="utf-8")
            return f"✅ Edited: {args['path']}"

        if name == "delete_file":
            fp = _resolve_in_project(project_path, args["path"])
            if fp.is_dir():
                shutil.rmtree(fp)
            else:
                fp.unlink(missing_ok=True)
            return f"✅ Deleted: {args['path']}"

        if name == "list_files":
            fp = _resolve_in_project(project_path, args.get("path", "."))
            if not fp.exists():
                return "[]"
            items = [
                {"path": str(p.relative_to(project_path)), "type": "dir" if p.is_dir() else "file"}
                for p in sorted(fp.rglob("*"))
            ]
            return json.dumps(items)

        if name == "run_command":
            # NOTE(security): this runs a model-supplied string through the
            # shell by design; only the working directory and a 30 s timeout
            # constrain it.
            result = subprocess.run(
                args["command"], shell=True, capture_output=True,
                text=True, cwd=str(project_path), timeout=30,
            )
            out = result.stdout + result.stderr
            return out[:3000] if out else "✅ Command completed"

        if name == "web_search":
            q = urllib.parse.quote(args["query"])
            url = f"https://api.duckduckgo.com/?q={q}&format=json&no_html=1"
            with urllib.request.urlopen(url, timeout=10) as r:
                data = json.loads(r.read())
            results = []
            if data.get("AbstractText"):
                results.append(data["AbstractText"])
            for item in data.get("RelatedTopics", [])[:3]:
                if isinstance(item, dict) and item.get("Text"):
                    results.append(item["Text"])
            return "\n".join(results) if results else "No results found"

        return f"❌ Unknown tool: {name}"

    except Exception as e:
        # Deliberate catch-all: failures are reported back to the model as text.
        return f"❌ Error: {str(e)}"
|
|
| |
|
|
class AgentRequest(BaseModel):
    # Request body for POST /api/agent.
    project: str  # project directory name; the endpoint joins it onto WORKSPACE
    message: str  # the user's task, sent to the analysis, planning and execution prompts
    history: list = []  # prior messages from the client; not forwarded to any model by the endpoint
|
|
def _sse_event(payload: dict) -> str:
    """Serialize ``payload`` as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _first_message(response: dict) -> dict:
    """Return the first choice's message from a chat-completion response.

    A missing or empty ``choices`` list and a null ``message`` both yield an
    empty dict, so an error payload from the provider does not raise.
    """
    choices = response.get("choices") or [{}]
    return choices[0].get("message") or {}


async def _cerebras_text(model_key: str, messages: list) -> str:
    """Run a non-streaming Cerebras completion and return its text ("" if none)."""
    text = ""
    async for response in call_cerebras(MODELS[model_key]["id"], messages):
        text = _first_message(response).get("content") or ""
    return text


@app.post("/api/agent")
async def agent_endpoint(req: AgentRequest):
    """Run the multi-model agent pipeline and stream progress as SSE.

    Stages, each announced with a ``thinking`` event:
      1. ``qwen`` analyses the request (``analysis`` event).
      2. ``oss`` writes a step-by-step plan (``plan`` event).
      3. ``qwen`` executes the plan with tools for at most 10 rounds
         (``tool_start`` / ``tool_result`` / ``debug`` / ``content`` events).
      4. ``llama`` streams a summary (``final`` events).
    The stream ends with ``files_update`` and ``done``.

    Returns:
        A ``text/event-stream`` response, or a 400 JSON error when
        ``req.project`` does not name a directory inside ``WORKSPACE``.
    """
    workspace_root = WORKSPACE.resolve()
    project_path = WORKSPACE / req.project
    # Reject "", ".", ".." and absolute paths: the project must be strictly
    # inside the workspace.
    if workspace_root not in project_path.resolve().parents:
        return JSONResponse({"error": "Invalid project name"}, status_code=400)
    project_path.mkdir(parents=True, exist_ok=True)

    async def stream():
        # NOTE(review): req.history is accepted but not forwarded to any
        # model here - confirm whether it should seed the prompts.
        loop = asyncio.get_running_loop()

        # Stage 1: analysis.
        yield _sse_event({'type': 'thinking', 'model': 'qwen', 'content': '🧠 Qwen3 analyzing your request...'})
        analysis = await _cerebras_text("qwen", [
            {"role": "system", "content": "You are an expert AI agent. Analyze the user request and explain what needs to be done. Be concise."},
            {"role": "user", "content": req.message},
        ])
        yield _sse_event({'type': 'analysis', 'content': analysis})

        # Stage 2: planning.
        yield _sse_event({'type': 'thinking', 'model': 'oss', 'content': '📋 GPT-OSS creating full plan...'})
        plan = await _cerebras_text("oss", [
            {"role": "system", "content": "You are a senior software architect. Create a detailed step-by-step plan to complete the task. List each step clearly."},
            {"role": "user", "content": f"Task: {req.message}\nAnalysis: {analysis}\n\nCreate a numbered step-by-step plan."},
        ])
        yield _sse_event({'type': 'plan', 'content': plan})

        # Stage 3: tool-driven execution.
        yield _sse_event({'type': 'thinking', 'model': 'qwen', 'content': '⚡ Qwen3 executing...'})
        exec_messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert coding agent. Execute the plan step by step using tools.\n"
                    f"Project directory: {req.project}\n"
                    "Always use relative paths for files.\n"
                    "Plan to execute:\n"
                    f"{plan}"
                ),
            },
            {"role": "user", "content": req.message},
        ]

        max_iterations = 10
        for _ in range(max_iterations):
            response = None
            async for response in call_cerebras(MODELS["qwen"]["id"], exec_messages, tools=TOOLS):
                pass
            if response is None:
                break

            msg = _first_message(response)
            tool_calls = msg.get("tool_calls") or []

            assistant_turn = {"role": "assistant", "content": msg.get("content", "")}
            if tool_calls:
                # Only echo tool_calls when present; a null value is not a
                # valid assistant message field.
                assistant_turn["tool_calls"] = tool_calls
            exec_messages.append(assistant_turn)

            if not tool_calls:
                # Nothing to feed back, so another round would resend the
                # same conversation; emit the answer and stop.
                yield _sse_event({'type': 'content', 'content': msg.get("content") or ""})
                break

            debug_hints = []
            for tc in tool_calls:
                fn = tc["function"]["name"]
                raw_args = tc["function"].get("arguments") or "{}"
                try:
                    args = json.loads(raw_args)
                    parse_error = None
                except (TypeError, json.JSONDecodeError) as exc:
                    args, parse_error = {}, str(exc)

                yield _sse_event({'type': 'tool_start', 'tool': fn, 'args': args})

                if parse_error is not None:
                    result = f"❌ Error: invalid tool arguments: {parse_error}"
                else:
                    # execute_tool blocks (subprocess, urllib); run it off
                    # the event loop.
                    result = await loop.run_in_executor(None, execute_tool, fn, args, project_path)
                    if result is None:
                        result = f"❌ Unknown tool: {fn}"

                yield _sse_event({'type': 'tool_result', 'tool': fn, 'result': result})
                exec_messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})

                # Tool failures are prefixed with the marker; a substring
                # test would also fire on file contents that contain it.
                if result.startswith("❌"):
                    yield _sse_event({'type': 'thinking', 'model': 'oss', 'content': '🔧 GPT-OSS debugging error...'})
                    fix = await _cerebras_text("oss", [
                        {"role": "system", "content": "You are a debugging expert. Analyze the error and suggest a fix."},
                        {"role": "user", "content": f"Tool: {fn}\nArgs: {json.dumps(args)}\nError: {result}\nSuggest a fix."},
                    ])
                    yield _sse_event({'type': 'debug', 'content': fix})
                    debug_hints.append(fix)

            # Hints go after ALL tool results so the tool messages stay
            # contiguous behind the assistant turn that requested them.
            for hint in debug_hints:
                exec_messages.append({"role": "user", "content": f"Debug suggestion: {hint}\nPlease fix and retry."})

        # Stage 4: streamed summary.
        yield _sse_event({'type': 'thinking', 'model': 'llama', 'content': '💬 Preparing final response...'})
        final_messages = [
            {"role": "system", "content": "You are a helpful assistant. Summarize what was accomplished in a friendly, concise way."},
            {"role": "user", "content": f"Task completed: {req.message}\nPlan: {plan}\n\nSummarize what was done."},
        ]
        async for chunk in call_cloudflare(MODELS["llama"]["id"], final_messages, stream=True):
            token = chunk.get("response", "")
            if token:
                yield _sse_event({'type': 'final', 'content': token})

        # Final file listing; fall back to [] if the listing is not valid JSON
        # (execute_tool reports failures as plain text).
        listing = execute_tool("list_files", {"path": "."}, project_path)
        try:
            files = json.loads(listing)
        except (TypeError, json.JSONDecodeError):
            files = []
        yield _sse_event({'type': 'files_update', 'files': files})
        yield _sse_event({'type': 'done'})

    return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
| |
|
|
class AssistantRequest(BaseModel):
    # Request body for POST /api/assistant.
    message: str  # the new user message, appended after history
    history: list = []  # earlier chat messages, placed between the system prompt and message
    model: str = "llama"  # alias looked up in MODELS; unknown aliases fall back to "qwen"
|
|
@app.post("/api/assistant")
async def assistant_endpoint(req: AssistantRequest):
    """Stream a plain chat completion as SSE ``token`` events, then ``done``.

    The model alias in ``req.model`` selects an entry of ``MODELS``; unknown
    aliases fall back to ``"qwen"``. The entry's ``provider`` decides which
    client is called.
    """
    model = MODELS.get(req.model, MODELS["qwen"])

    async def stream():
        full_messages = [
            {"role": "system", "content": "You are a helpful coding assistant."},
            *req.history,
            {"role": "user", "content": req.message},
        ]

        if model["provider"] == "cloudflare":
            async for chunk in call_cloudflare(model["id"], full_messages, stream=True):
                token = chunk.get("response", "")
                if token:
                    yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
        else:
            async for chunk in call_cerebras(model["id"], full_messages, stream=True):
                # A chunk may carry an empty choices list or a null delta;
                # treat both as "no token" rather than raising.
                choices = chunk.get("choices") or [{}]
                token = (choices[0].get("delta") or {}).get("content") or ""
                if token:
                    yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"

        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
| |
|
|
def _resolve_workspace_path(project: str, path: str = ".") -> Optional[Path]:
    """Return ``WORKSPACE / project / path``, or None if it escapes its parent.

    ``project`` must resolve to a directory strictly inside ``WORKSPACE`` and
    ``path`` must resolve to the project directory or something below it.
    """
    workspace_root = WORKSPACE.resolve()
    project_root = (workspace_root / project).resolve()
    if workspace_root not in project_root.parents:
        return None
    target = (project_root / path).resolve()
    if target != project_root and project_root not in target.parents:
        return None
    return WORKSPACE / project / path


def _invalid_path_response():
    """400 response shared by the file endpoints for rejected paths."""
    return JSONResponse({"error": "Invalid path"}, status_code=400)


@app.get("/api/files/{project}")
async def get_files(project: str):
    """List every file and directory under a project, creating it if missing."""
    project_path = _resolve_workspace_path(project)
    if project_path is None:
        return _invalid_path_response()
    project_path.mkdir(parents=True, exist_ok=True)
    items = [
        {"path": str(p.relative_to(project_path)), "type": "dir" if p.is_dir() else "file"}
        for p in sorted(project_path.rglob("*"))
    ]
    return JSONResponse(items)


@app.get("/api/file/{project}/{path:path}")
async def read_file(project: str, path: str):
    """Return a project file's text as ``{"content": ...}``.

    Responds 400 for a path outside the project or a file that is not UTF-8
    text, and 404 when the path is missing or is not a regular file.
    """
    fp = _resolve_workspace_path(project, path)
    if fp is None:
        return _invalid_path_response()
    if not fp.is_file():
        return JSONResponse({"error": "Not found"}, status_code=404)
    try:
        content = fp.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return JSONResponse({"error": "File is not valid UTF-8 text"}, status_code=400)
    return JSONResponse({"content": content})


class WriteFileRequest(BaseModel):
    # Request body for POST /api/file/{project}/{path}.
    content: str  # full new text of the file


@app.post("/api/file/{project}/{path:path}")
async def write_file(project: str, path: str, req: WriteFileRequest):
    """Create or overwrite a project file, creating parent directories."""
    fp = _resolve_workspace_path(project, path)
    if fp is None:
        return _invalid_path_response()
    fp.parent.mkdir(parents=True, exist_ok=True)
    fp.write_text(req.content, encoding="utf-8")
    return JSONResponse({"ok": True})


@app.delete("/api/file/{project}/{path:path}")
async def delete_file_api(project: str, path: str):
    """Delete a project file or directory tree; a missing path is not an error."""
    fp = _resolve_workspace_path(project, path)
    if fp is None:
        return _invalid_path_response()
    if fp.is_dir():
        shutil.rmtree(fp)
    else:
        fp.unlink(missing_ok=True)
    return JSONResponse({"ok": True})
|
|
| |
|
|
@app.get("/api/projects")
async def list_projects():
    """Return the name and path of each directory directly under WORKSPACE, sorted."""
    return JSONResponse([
        {"name": entry.name, "path": str(entry)}
        for entry in sorted(WORKSPACE.iterdir())
        if entry.is_dir()
    ])
|
|
class NewProjectRequest(BaseModel):
    # Request body for POST /api/projects.
    name: str  # directory name created under WORKSPACE
    template: str = "empty"  # key into TEMPLATES; values not found there create no files
|
|
# Starter files per template name: {template: {relative file path: file content}}.
# create_project writes each entry into a new project directory.
TEMPLATES = {
    # Vite + React skeleton.
    "react": {
        "src/App.jsx": "import React from 'react'\n\nfunction App() {\n return (\n <div>\n <h1>Hello React!</h1>\n </div>\n )\n}\n\nexport default App\n",
        "src/index.js": "import React from 'react'\nimport ReactDOM from 'react-dom/client'\nimport App from './App'\n\nReactDOM.createRoot(document.getElementById('root')).render(<App />)\n",
        "index.html": "<!DOCTYPE html>\n<html>\n<head><title>React App</title></head>\n<body><div id='root'></div></body>\n</html>\n",
        "package.json": '{\n "name": "react-app",\n "version": "1.0.0",\n "scripts": {"dev": "vite"},\n "dependencies": {"react": "^18.0.0", "react-dom": "^18.0.0"},\n "devDependencies": {"vite": "^5.0.0"}\n}\n'
    },
    # Express server listening on port 3000.
    "node": {
        "index.js": "const express = require('express')\nconst app = express()\n\napp.get('/', (req, res) => res.send('Hello World!'))\n\napp.listen(3000, () => console.log('Server running on port 3000'))\n",
        "package.json": '{\n "name": "node-app",\n "version": "1.0.0",\n "scripts": {"start": "node index.js"},\n "dependencies": {"express": "^4.18.0"}\n}\n'
    },
    # Single-module Python script plus a requirements.txt placeholder.
    "python": {
        "main.py": "def main():\n print('Hello Python!')\n\nif __name__ == '__main__':\n main()\n",
        "requirements.txt": "# Add your dependencies here\n"
    },
    # Static page with one stylesheet and one script.
    "html": {
        "index.html": "<!DOCTYPE html>\n<html lang='en'>\n<head>\n <meta charset='UTF-8'>\n <title>My App</title>\n <link rel='stylesheet' href='style.css'>\n</head>\n<body>\n <h1>Hello World!</h1>\n <script src='app.js'></script>\n</body>\n</html>\n",
        "style.css": "body {\n font-family: sans-serif;\n margin: 0;\n padding: 20px;\n}\n",
        "app.js": "console.log('Hello from app.js')\n"
    }
}
|
|
def _project_dir(name: str) -> Optional[Path]:
    """Return ``WORKSPACE / name``, or None unless it lies strictly inside WORKSPACE.

    Rejects "", ".", ".." and absolute paths, so neither the workspace root
    nor anything outside it can be created or removed through the project
    endpoints.
    """
    workspace_root = WORKSPACE.resolve()
    if workspace_root not in (workspace_root / name).resolve().parents:
        return None
    return WORKSPACE / name


@app.post("/api/projects")
async def create_project(req: NewProjectRequest):
    """Create a project directory and, for a known template, its starter files.

    Existing files with the same names are overwritten. Responds 400 when the
    name does not stay inside ``WORKSPACE``.
    """
    project_path = _project_dir(req.name)
    if project_path is None:
        return JSONResponse({"error": "Invalid project name"}, status_code=400)
    project_path.mkdir(parents=True, exist_ok=True)
    # Unknown template names (including the default "empty") create no files.
    for path, content in TEMPLATES.get(req.template, {}).items():
        fp = project_path / path
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content, encoding="utf-8")
    return JSONResponse({"ok": True, "name": req.name})


@app.delete("/api/projects/{name}")
async def delete_project(name: str):
    """Delete a project directory tree; a missing project is not an error.

    Responds 400 when the name does not stay inside ``WORKSPACE``.
    """
    project_path = _project_dir(name)
    if project_path is None:
        return JSONResponse({"error": "Invalid project name"}, status_code=400)
    if project_path.exists():
        shutil.rmtree(project_path)
    return JSONResponse({"ok": True})
|
|
| |
|
|
class ShellRequest(BaseModel):
    # Request body for POST /api/shell.
    project: str  # project directory name; the command runs with it as the working directory
    command: str  # shell command line, passed to asyncio.create_subprocess_shell
|
|
@app.post("/api/shell")
async def shell_endpoint(req: ShellRequest):
    """Run a shell command in a project directory and stream its output as SSE.

    Emits one ``output`` event per line of combined stdout/stderr, then a
    ``done`` event with the exit code, or an ``error`` event if running the
    command raises.

    NOTE(security): the command string is client-supplied and runs through
    the shell by design; this endpoint must only be reachable by trusted
    users.

    Returns:
        A ``text/event-stream`` response, or a 400 JSON error when
        ``req.project`` does not name a directory inside ``WORKSPACE``.
    """
    workspace_root = WORKSPACE.resolve()
    project_path = WORKSPACE / req.project
    if workspace_root not in project_path.resolve().parents:
        return JSONResponse({"error": "Invalid project name"}, status_code=400)
    project_path.mkdir(parents=True, exist_ok=True)

    async def stream():
        process = None
        try:
            process = await asyncio.create_subprocess_shell(
                req.command, stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT, cwd=str(project_path)
            )
            async for line in process.stdout:
                # Command output is not guaranteed to be UTF-8; replace
                # undecodable bytes instead of aborting the stream.
                yield f"data: {json.dumps({'type': 'output', 'content': line.decode(errors='replace')})}\n\n"
            await process.wait()
            yield f"data: {json.dumps({'type': 'done', 'code': process.returncode})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
        finally:
            # If the stream ends early (client disconnect, error), do not
            # leave the child process running.
            if process is not None and process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass

    return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
| |
|
|
# Static frontend: the "frontend" directory next to this file's parent
# directory, mounted at "/" after the API routes above are registered.
FRONTEND_DIR = Path(__file__).parent.parent.joinpath("frontend")
app.mount(
    "/",
    StaticFiles(directory=str(FRONTEND_DIR), html=True),
    name="frontend",
)
|
|