# backend/main.py — multi-model agent backend (commit 2ab8d44)
import os
import json
import asyncio
import subprocess
import shutil
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import httpx
# FastAPI app with permissive CORS so any frontend origin may call the API.
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# All generated projects live under this directory (created at import time).
WORKSPACE = Path("/workspace/projects")
WORKSPACE.mkdir(parents=True, exist_ok=True)

# Provider credentials come from the environment; empty string when unset.
CEREBRAS_API_KEY = os.environ.get("CEREBRAS_API_KEY", "")
CLOUDFLARE_ACCOUNT_ID = os.environ.get("CLOUDFLARE_ACCOUNT_ID", "")
CLOUDFLARE_API_KEY = os.environ.get("CLOUDFLARE_API_KEY", "")
# ─── Models ──────────────────────────────────────────────────────────────────
# Short aliases → provider name + provider-specific model id.
MODELS = {
    "qwen": {"provider": "cerebras", "id": "qwen-3-235b-instruct"},
    "oss": {"provider": "cerebras", "id": "gpt-oss-120b"},
    "llama": {"provider": "cloudflare", "id": "@cf/meta/llama-3.3-70b-instruct-fp8-fast"},
}
# ─── LLM Callers ─────────────────────────────────────────────────────────────
async def call_cerebras(model_id: str, messages: list, tools=None, stream=False):
    """Call the Cerebras chat-completions API.

    Async generator: in streaming mode yields one parsed SSE JSON chunk per
    event; in non-streaming mode yields a single parsed response dict.

    Args:
        model_id: Cerebras model identifier.
        messages: OpenAI-style chat message dicts.
        tools: optional OpenAI-style tool schemas; enables tool_choice="auto".
        stream: request server-sent-event streaming when True.
    """
    headers = {
        "Authorization": f"Bearer {CEREBRAS_API_KEY}",
        "Content-Type": "application/json"
    }
    body = {"model": model_id, "messages": messages, "stream": stream, "max_tokens": 8192}
    if tools:
        body["tools"] = tools
        body["tool_choice"] = "auto"
    async with httpx.AsyncClient(timeout=120) as client:
        if stream:
            async with client.stream("POST", "https://api.cerebras.ai/v1/chat/completions", headers=headers, json=body) as r:
                async for line in r.aiter_lines():
                    if line.startswith("data: ") and line != "data: [DONE]":
                        # Skip malformed SSE lines instead of aborting the whole
                        # stream (mirrors call_cloudflare's handling).
                        try:
                            yield json.loads(line[6:])
                        except json.JSONDecodeError:
                            pass
        else:
            r = await client.post("https://api.cerebras.ai/v1/chat/completions", headers=headers, json=body)
            yield r.json()
async def call_cloudflare(model_id: str, messages: list, stream=False):
    """Call the Cloudflare Workers AI chat API.

    Async generator: in streaming mode yields one parsed SSE JSON chunk per
    event; in non-streaming mode yields a single parsed response dict.

    Args:
        model_id: Workers AI model path (e.g. "@cf/meta/...").
        messages: OpenAI-style chat message dicts.
        stream: request server-sent-event streaming when True.
    """
    url = f"https://api.cloudflare.com/client/v4/accounts/{CLOUDFLARE_ACCOUNT_ID}/ai/run/{model_id}"
    headers = {"Authorization": f"Bearer {CLOUDFLARE_API_KEY}", "Content-Type": "application/json"}
    body = {"messages": messages, "stream": stream, "max_tokens": 4096}
    async with httpx.AsyncClient(timeout=120) as client:
        if stream:
            async with client.stream("POST", url, headers=headers, json=body) as r:
                async for line in r.aiter_lines():
                    if line.startswith("data: ") and line != "data: [DONE]":
                        # Was a bare `except:` — swallow only malformed JSON,
                        # not cancellation or programming errors.
                        try:
                            yield json.loads(line[6:])
                        except json.JSONDecodeError:
                            pass
        else:
            r = await client.post(url, headers=headers, json=body)
            yield r.json()
# ─── Tools Definition ─────────────────────────────────────────────────────────
# OpenAI-style function-calling schemas advertised to the executing model.
# Each "name" must have a matching dispatch branch in execute_tool().
TOOLS = [
    # Create or overwrite a file with the given content.
    {
        "type": "function",
        "function": {
            "name": "create_file",
            "description": "Create or overwrite a file with content",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "content": {"type": "string"}
                },
                "required": ["path", "content"]
            }
        }
    },
    # Return a file's full text content.
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "Read a file's content",
            "parameters": {
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"]
            }
        }
    },
    # Replace old_content with new_content inside an existing file.
    {
        "type": "function",
        "function": {
            "name": "edit_file",
            "description": "Edit a specific part of a file",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "old_content": {"type": "string"},
                    "new_content": {"type": "string"}
                },
                "required": ["path", "old_content", "new_content"]
            }
        }
    },
    # Remove a file or an entire directory tree.
    {
        "type": "function",
        "function": {
            "name": "delete_file",
            "description": "Delete a file or folder",
            "parameters": {
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"]
            }
        }
    },
    # Recursively list files/dirs under a directory (JSON result).
    {
        "type": "function",
        "function": {
            "name": "list_files",
            "description": "List files in a directory",
            "parameters": {
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"]
            }
        }
    },
    # Run a shell command with the project dir as cwd (30s timeout).
    {
        "type": "function",
        "function": {
            "name": "run_command",
            "description": "Run a shell command in the project directory",
            "parameters": {
                "type": "object",
                "properties": {"command": {"type": "string"}},
                "required": ["command"]
            }
        }
    },
    # Query the DuckDuckGo instant-answer API.
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Search the web for information",
            "parameters": {
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"]
            }
        }
    }
]
# ─── Tool Executor ────────────────────────────────────────────────────────────
def _resolve_in_project(project_path: Path, rel: str) -> Path:
    """Resolve rel against project_path, refusing paths that escape it."""
    fp = (project_path / rel).resolve()
    root = project_path.resolve()
    if fp != root and root not in fp.parents:
        raise ValueError(f"path escapes project directory: {rel}")
    return fp


def execute_tool(name: str, args: dict, project_path: Path) -> str:
    """Run one agent tool call and return a human-readable result string.

    Args:
        name: tool name (one of the TOOLS schemas).
        args: parsed JSON arguments for the tool.
        project_path: project root; all file paths are confined to it
            (model-supplied "../" paths are rejected).

    Returns:
        Result text; strings starting with "❌" indicate failure — the agent
        loop keys off that marker to trigger debugging.
    """
    try:
        if name == "create_file":
            fp = _resolve_in_project(project_path, args["path"])
            fp.parent.mkdir(parents=True, exist_ok=True)
            fp.write_text(args["content"])
            return f"✅ Created: {args['path']}"
        elif name == "read_file":
            fp = _resolve_in_project(project_path, args["path"])
            return fp.read_text() if fp.exists() else f"❌ File not found: {args['path']}"
        elif name == "edit_file":
            fp = _resolve_in_project(project_path, args["path"])
            if not fp.exists():
                return f"❌ File not found: {args['path']}"
            content = fp.read_text()
            if args["old_content"] not in content:
                # Previously a no-op replace still reported success, which
                # misled the agent into thinking the edit landed.
                return f"❌ old_content not found in: {args['path']}"
            fp.write_text(content.replace(args["old_content"], args["new_content"]))
            return f"✅ Edited: {args['path']}"
        elif name == "delete_file":
            fp = _resolve_in_project(project_path, args["path"])
            if fp.is_dir():
                shutil.rmtree(fp)
            else:
                fp.unlink(missing_ok=True)
            return f"✅ Deleted: {args['path']}"
        elif name == "list_files":
            fp = _resolve_in_project(project_path, args.get("path", "."))
            if not fp.exists():
                return "[]"
            root = project_path.resolve()
            items = []
            for p in sorted(fp.rglob("*")):
                rel = p.relative_to(root)
                items.append({"path": str(rel), "type": "dir" if p.is_dir() else "file"})
            return json.dumps(items)
        elif name == "run_command":
            # NOTE(review): shell=True executes model-chosen commands,
            # confined only by cwd and a 30s timeout — confirm this level of
            # sandboxing is acceptable for the deployment.
            result = subprocess.run(
                args["command"], shell=True, capture_output=True,
                text=True, cwd=str(project_path), timeout=30
            )
            out = result.stdout + result.stderr
            return out[:3000] if out else "✅ Command completed"
        elif name == "web_search":
            import urllib.request, urllib.parse
            q = urllib.parse.quote(args["query"])
            url = f"https://api.duckduckgo.com/?q={q}&format=json&no_html=1"
            with urllib.request.urlopen(url, timeout=10) as r:
                data = json.loads(r.read())
            results = []
            if data.get("AbstractText"):
                results.append(data["AbstractText"])
            for item in data.get("RelatedTopics", [])[:3]:
                if isinstance(item, dict) and item.get("Text"):
                    results.append(item["Text"])
            return "\n".join(results) if results else "No results found"
        # Previously an unknown tool fell through and returned None.
        return f"❌ Unknown tool: {name}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
# ─── Agent Endpoint ───────────────────────────────────────────────────────────
class AgentRequest(BaseModel):
    # Request body for POST /api/agent.
    project: str        # project directory name under WORKSPACE
    message: str        # user task for the agent to carry out
    history: list = []  # prior chat messages (OpenAI-style dicts)
@app.post("/api/agent")
async def agent_endpoint(req: AgentRequest):
    """Run the multi-model agent pipeline, streamed as server-sent events.

    Stages: (1) Qwen3 analyzes the request, (2) GPT-OSS writes a step-by-step
    plan, (3) Qwen3 executes the plan via tool calls, with GPT-OSS consulted
    when a tool reports failure, (4) Llama streams a friendly summary. Each
    stage is emitted as a typed "data: {json}" SSE event for the frontend.
    """
    project_path = WORKSPACE / req.project
    project_path.mkdir(parents=True, exist_ok=True)
    async def stream():
        messages = req.history.copy()  # NOTE(review): never read below — dead?
        abort = False  # NOTE(review): never read below — dead?
        # Step 1: Qwen3 analyze
        yield f"data: {json.dumps({'type': 'thinking', 'model': 'qwen', 'content': '🧠 Qwen3 analyzing your request...'})}\n\n"
        analyze_messages = [
            {"role": "system", "content": "You are an expert AI agent. Analyze the user request and explain what needs to be done. Be concise."},
            {"role": "user", "content": req.message}
        ]
        analysis = ""
        # Non-streaming call: call_cerebras yields exactly one full response
        # dict, so this loop just captures the final message content.
        async for chunk in call_cerebras(MODELS["qwen"]["id"], analyze_messages):
            analysis = chunk.get("choices", [{}])[0].get("message", {}).get("content", "")
        yield f"data: {json.dumps({'type': 'analysis', 'content': analysis})}\n\n"
        # Step 2: GPT-OSS plan
        yield f"data: {json.dumps({'type': 'thinking', 'model': 'oss', 'content': 'πŸ“‹ GPT-OSS creating full plan...'})}\n\n"
        plan_messages = [
            {"role": "system", "content": "You are a senior software architect. Create a detailed step-by-step plan to complete the task. List each step clearly."},
            {"role": "user", "content": f"Task: {req.message}\nAnalysis: {analysis}\n\nCreate a numbered step-by-step plan."}
        ]
        plan = ""
        async for chunk in call_cerebras(MODELS["oss"]["id"], plan_messages):
            plan = chunk.get("choices", [{}])[0].get("message", {}).get("content", "")
        yield f"data: {json.dumps({'type': 'plan', 'content': plan})}\n\n"
        # Step 3: Qwen3 execute with tools
        yield f"data: {json.dumps({'type': 'thinking', 'model': 'qwen', 'content': '⚑ Qwen3 executing...'})}\n\n"
        exec_messages = [
            {
                "role": "system",
                "content": f"""You are an expert coding agent. Execute the plan step by step using tools.
Project directory: {req.project}
Always use relative paths for files.
Plan to execute:
{plan}"""
            },
            {"role": "user", "content": req.message}
        ]
        # Tool-calling loop, capped so a confused model cannot spin forever.
        max_iterations = 10
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            response_chunks = []
            async for chunk in call_cerebras(MODELS["qwen"]["id"], exec_messages, tools=TOOLS):
                response_chunks.append(chunk)
            if not response_chunks:
                break
            # Non-streaming: the last (only) chunk is the full response.
            response = response_chunks[-1]
            choice = response.get("choices", [{}])[0]
            msg = choice.get("message", {})
            finish = choice.get("finish_reason", "")
            exec_messages.append({"role": "assistant", "content": msg.get("content", ""), "tool_calls": msg.get("tool_calls")})
            # Tool calls
            tool_calls = msg.get("tool_calls", [])
            if tool_calls:
                for tc in tool_calls:
                    fn = tc["function"]["name"]
                    args = json.loads(tc["function"]["arguments"])
                    yield f"data: {json.dumps({'type': 'tool_start', 'tool': fn, 'args': args})}\n\n"
                    result = execute_tool(fn, args, project_path)
                    yield f"data: {json.dumps({'type': 'tool_result', 'tool': fn, 'result': result})}\n\n"
                    exec_messages.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": result
                    })
                    # Debug with GPT-OSS if error ("❌" marks a tool failure).
                    if "❌" in result:
                        yield f"data: {json.dumps({'type': 'thinking', 'model': 'oss', 'content': 'πŸ”§ GPT-OSS debugging error...'})}\n\n"
                        debug_messages = [
                            {"role": "system", "content": "You are a debugging expert. Analyze the error and suggest a fix."},
                            {"role": "user", "content": f"Tool: {fn}\nArgs: {json.dumps(args)}\nError: {result}\nSuggest a fix."}
                        ]
                        fix = ""
                        async for chunk in call_cerebras(MODELS["oss"]["id"], debug_messages):
                            fix = chunk.get("choices", [{}])[0].get("message", {}).get("content", "")
                        yield f"data: {json.dumps({'type': 'debug', 'content': fix})}\n\n"
                        # Feed the suggestion back so the next iteration retries.
                        exec_messages.append({"role": "user", "content": f"Debug suggestion: {fix}\nPlease fix and retry."})
            if finish == "stop" and not tool_calls:
                # Model finished without requesting tools: emit final answer.
                final_content = msg.get("content", "")
                yield f"data: {json.dumps({'type': 'content', 'content': final_content})}\n\n"
                break
        # Step 4: Llama final response (true token streaming)
        yield f"data: {json.dumps({'type': 'thinking', 'model': 'llama', 'content': 'πŸ’¬ Preparing final response...'})}\n\n"
        final_messages = [
            {"role": "system", "content": "You are a helpful assistant. Summarize what was accomplished in a friendly, concise way."},
            {"role": "user", "content": f"Task completed: {req.message}\nPlan: {plan}\n\nSummarize what was done."}
        ]
        async for chunk in call_cloudflare(MODELS["llama"]["id"], final_messages, stream=True):
            token = chunk.get("response", "")
            if token:
                yield f"data: {json.dumps({'type': 'final', 'content': token})}\n\n"
        # File tree update so the frontend can refresh its file explorer.
        files = execute_tool("list_files", {"path": "."}, project_path)
        yield f"data: {json.dumps({'type': 'files_update', 'files': json.loads(files)})}\n\n"
        yield f"data: {json.dumps({'type': 'done'})}\n\n"
    return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ─── Assistant Endpoint ───────────────────────────────────────────────────────
class AssistantRequest(BaseModel):
    # Request body for POST /api/assistant.
    message: str          # user chat message
    history: list = []    # prior OpenAI-style chat messages
    model: str = "llama"  # MODELS alias; unknown values fall back to "qwen"
@app.post("/api/assistant")
async def assistant_endpoint(req: AssistantRequest):
    """Plain streaming chat endpoint (SSE) backed by the selected model."""
    async def stream():
        conversation = [
            {"role": "system", "content": "You are a helpful coding assistant."},
            *req.history,
            {"role": "user", "content": req.message},
        ]
        if req.model == "llama":
            # Cloudflare streams tokens under the "response" key.
            async for chunk in call_cloudflare(MODELS["llama"]["id"], conversation, stream=True):
                token = chunk.get("response", "")
                if token:
                    yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
        else:
            # Cerebras streams OpenAI-style delta chunks; unknown model
            # aliases fall back to the qwen entry.
            chosen = MODELS.get(req.model, MODELS["qwen"])
            async for chunk in call_cerebras(chosen["id"], conversation, stream=True):
                token = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                if token:
                    yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
        yield f"data: {json.dumps({'type': 'done'})}\n\n"
    return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ─── File API ─────────────────────────────────────────────────────────────────
@app.get("/api/files/{project}")
async def get_files(project: str):
    """Return a flat recursive listing of the project's files and dirs."""
    project_path = WORKSPACE / project
    project_path.mkdir(parents=True, exist_ok=True)
    entries = [
        {
            "path": str(node.relative_to(project_path)),
            "type": "dir" if node.is_dir() else "file",
        }
        for node in sorted(project_path.rglob("*"))
    ]
    return JSONResponse(entries)
@app.get("/api/file/{project}/{path:path}")
async def read_file(project: str, path: str):
    """Return a file's text content as JSON.

    Returns 400 for paths escaping the workspace, 404 when missing,
    415 when the file is not decodable text.
    """
    fp = (WORKSPACE / project / path).resolve()
    # Reject traversal: "{path:path}" accepts "../" segments.
    if WORKSPACE.resolve() not in fp.parents:
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    if not fp.exists():
        return JSONResponse({"error": "Not found"}, status_code=404)
    try:
        return JSONResponse({"content": fp.read_text()})
    except UnicodeDecodeError:
        # Binary files previously produced an unhandled 500.
        return JSONResponse({"error": "Not a text file"}, status_code=415)
class WriteFileRequest(BaseModel):
    # Request body for POST /api/file/{project}/{path}.
    content: str  # full new text content of the file
@app.post("/api/file/{project}/{path:path}")
async def write_file(project: str, path: str, req: WriteFileRequest):
    """Create or overwrite a text file inside the project.

    Returns 400 for paths that resolve outside the workspace.
    """
    fp = (WORKSPACE / project / path).resolve()
    # Reject traversal: "{path:path}" accepts "../" segments.
    if WORKSPACE.resolve() not in fp.parents:
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    fp.parent.mkdir(parents=True, exist_ok=True)
    fp.write_text(req.content)
    return JSONResponse({"ok": True})
@app.delete("/api/file/{project}/{path:path}")
async def delete_file_api(project: str, path: str):
    """Delete a file or directory inside the project.

    Returns 400 for paths that resolve outside the workspace — without this
    guard a "../" path could rmtree arbitrary directories.
    """
    fp = (WORKSPACE / project / path).resolve()
    if WORKSPACE.resolve() not in fp.parents:
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    if fp.is_dir():
        shutil.rmtree(fp)
    else:
        fp.unlink(missing_ok=True)
    return JSONResponse({"ok": True})
# ─── Projects API ─────────────────────────────────────────────────────────────
@app.get("/api/projects")
async def list_projects():
    """List every project directory in the workspace."""
    found = [
        {"name": entry.name, "path": str(entry)}
        for entry in sorted(WORKSPACE.iterdir())
        if entry.is_dir()
    ]
    return JSONResponse(found)
class NewProjectRequest(BaseModel):
    # Request body for POST /api/projects.
    name: str                # new project directory name
    template: str = "empty"  # key into TEMPLATES; other values create an empty project
# Starter file sets keyed by template name; each value maps a relative file
# path to its initial content. Template keys not present here (e.g. "empty")
# produce a project with no seeded files.
TEMPLATES = {
    # Minimal React + Vite starter.
    "react": {
        "src/App.jsx": "import React from 'react'\n\nfunction App() {\n return (\n <div>\n <h1>Hello React!</h1>\n </div>\n )\n}\n\nexport default App\n",
        "src/index.js": "import React from 'react'\nimport ReactDOM from 'react-dom/client'\nimport App from './App'\n\nReactDOM.createRoot(document.getElementById('root')).render(<App />)\n",
        "index.html": "<!DOCTYPE html>\n<html>\n<head><title>React App</title></head>\n<body><div id='root'></div></body>\n</html>\n",
        "package.json": '{\n "name": "react-app",\n "version": "1.0.0",\n "scripts": {"dev": "vite"},\n "dependencies": {"react": "^18.0.0", "react-dom": "^18.0.0"},\n "devDependencies": {"vite": "^5.0.0"}\n}\n'
    },
    # Minimal Express server.
    "node": {
        "index.js": "const express = require('express')\nconst app = express()\n\napp.get('/', (req, res) => res.send('Hello World!'))\n\napp.listen(3000, () => console.log('Server running on port 3000'))\n",
        "package.json": '{\n "name": "node-app",\n "version": "1.0.0",\n "scripts": {"start": "node index.js"},\n "dependencies": {"express": "^4.18.0"}\n}\n'
    },
    # Bare Python script with a requirements file.
    "python": {
        "main.py": "def main():\n print('Hello Python!')\n\nif __name__ == '__main__':\n main()\n",
        "requirements.txt": "# Add your dependencies here\n"
    },
    # Static HTML/CSS/JS page.
    "html": {
        "index.html": "<!DOCTYPE html>\n<html lang='en'>\n<head>\n <meta charset='UTF-8'>\n <title>My App</title>\n <link rel='stylesheet' href='style.css'>\n</head>\n<body>\n <h1>Hello World!</h1>\n <script src='app.js'></script>\n</body>\n</html>\n",
        "style.css": "body {\n font-family: sans-serif;\n margin: 0;\n padding: 20px;\n}\n",
        "app.js": "console.log('Hello from app.js')\n"
    }
}
@app.post("/api/projects")
async def create_project(req: NewProjectRequest):
    """Create a project directory, optionally seeding files from TEMPLATES.

    Returns 400 for names that could escape the workspace.
    """
    # Reject names with separators or dot-dirs ("..", "a/b"); previously an
    # unvalidated name could create directories outside WORKSPACE.
    if not req.name or "/" in req.name or "\\" in req.name or req.name in (".", ".."):
        return JSONResponse({"error": "Invalid project name"}, status_code=400)
    project_path = WORKSPACE / req.name
    project_path.mkdir(parents=True, exist_ok=True)
    if req.template in TEMPLATES:
        for rel, content in TEMPLATES[req.template].items():
            fp = project_path / rel
            fp.parent.mkdir(parents=True, exist_ok=True)
            fp.write_text(content)
    return JSONResponse({"ok": True, "name": req.name})
@app.delete("/api/projects/{name}")
async def delete_project(name: str):
    """Delete an entire project directory.

    Returns 400 for invalid names — a name of ".." would otherwise rmtree
    the workspace's parent directory.
    """
    if not name or "/" in name or "\\" in name or name in (".", ".."):
        return JSONResponse({"error": "Invalid project name"}, status_code=400)
    project_path = WORKSPACE / name
    if project_path.exists():
        shutil.rmtree(project_path)
    return JSONResponse({"ok": True})
# ─── Shell Endpoint ───────────────────────────────────────────────────────────
class ShellRequest(BaseModel):
    # Request body for POST /api/shell.
    project: str  # project directory used as the command's cwd
    command: str  # shell command line to execute
@app.post("/api/shell")
async def shell_endpoint(req: ShellRequest):
    """Run a shell command in the project directory, streaming output as SSE."""
    project_path = WORKSPACE / req.project
    project_path.mkdir(parents=True, exist_ok=True)
    async def stream():
        try:
            proc = await asyncio.create_subprocess_shell(
                req.command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,  # interleave stderr with stdout
                cwd=str(project_path),
            )
            # Forward output line by line until the pipe reaches EOF.
            while True:
                raw = await proc.stdout.readline()
                if not raw:
                    break
                yield f"data: {json.dumps({'type': 'output', 'content': raw.decode()})}\n\n"
            await proc.wait()
            yield f"data: {json.dumps({'type': 'done', 'code': proc.returncode})}\n\n"
        except Exception as exc:
            yield f"data: {json.dumps({'type': 'error', 'content': str(exc)})}\n\n"
    return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ─── Serve Frontend ───────────────────────────────────────────────────────────
# Static frontend lives next to this backend package; html=True serves
# index.html as the default document. Mounted at "/" last, so the API routes
# registered above take precedence.
FRONTEND_DIR = Path(__file__).parent.parent / "frontend"
app.mount("/", StaticFiles(directory=str(FRONTEND_DIR), html=True), name="frontend")