File size: 10,560 Bytes
763ef0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""
FastAPI backend - AI Developer Agent
====================================
Endpoints:
    GET  /                       service info
    GET  /health                 health check
    GET  /api/runtime            runtime + provider telemetry
    POST /api/tasks              create + run a task (sync queued)
    GET  /api/tasks              list tasks
    GET  /api/tasks/{id}         get task
    GET  /api/tasks/{id}/events  list events (REST)
    GET  /api/tasks/{id}/stream  SSE event stream (live)
    POST /api/chat               one-shot chat (streams)
    POST /api/llm/chat           chat (non-streaming JSON)
    POST /api/deploy/huggingface push backend dir to HF Space
    POST /api/deploy/vercel      deploy frontend dir to Vercel
    POST /api/git/push           commit + push to GitHub branch

All endpoints accept JSON bodies and return JSON unless documented otherwise.
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import threading
import time
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

from . import tasks
from .agent import run_task
from .llm_router import get_router
from .executor import get_executor
from . import deployers

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("app")

app = FastAPI(title="AI Developer Agent", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=os.getenv("CORS_ALLOW_ORIGINS", "*").split(","),
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---------------------------------------------------------------------------
# In-memory task queue (background worker)
# ---------------------------------------------------------------------------
_task_queue: "asyncio.Queue" = asyncio.Queue()
_active_subscribers: Dict[str, List[asyncio.Queue]] = {}


def _publish(task_id: str, event: Dict[str, Any]) -> None:
    for q in list(_active_subscribers.get(task_id, [])):
        try:
            q.put_nowait(event)
        except Exception:
            pass


def _worker_run(task_id: str, title: str, description: str) -> None:
    """Run the agent generator in a thread and publish events."""
    try:
        for ev in run_task(task_id, title, description):
            _publish(task_id, ev)
    except Exception as e:
        logger.exception("worker crashed")
        _publish(task_id, {"task_id": task_id, "kind": "error", "message": str(e), "ts": time.time(), "data": {}})


# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
class CreateTaskBody(BaseModel):
    title: str
    description: str = ""
    payload: Optional[Dict[str, Any]] = None


class ChatBody(BaseModel):
    messages: List[Dict[str, str]]
    model: Optional[str] = None
    temperature: float = 0.2
    max_tokens: int = 1500
    preferred_provider: Optional[str] = None


class HFDeployBody(BaseModel):
    repo_id: str
    source_dir: str = "."
    commit_message: str = "Update from AI Developer Agent"


class VercelDeployBody(BaseModel):
    project_name: str
    source_dir: str
    framework: Optional[str] = "nextjs"
    target: str = "production"
    install_command: Optional[str] = None
    build_command: Optional[str] = None
    env: Optional[Dict[str, str]] = None


class GitPushBody(BaseModel):
    repo_dir: str = "."
    branch: str = "genspark_ai_developer"
    commit_message: str = "AI Developer Agent commit"
    remote_url: Optional[str] = None


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/")
def index():
    return {
        "service": "AI Developer Agent",
        "version": "1.0.0",
        "ok": True,
        "endpoints": [
            "/health", "/api/runtime", "/api/tasks", "/api/tasks/{id}/stream",
            "/api/chat", "/api/llm/chat",
            "/api/deploy/huggingface", "/api/deploy/vercel", "/api/git/push",
        ],
    }


@app.get("/health")
def health():
    router = get_router()
    return {
        "ok": True,
        "ts": time.time(),
        "providers": list(router.telemetry().keys()),
        "executor": "e2b" if (get_executor().sandbox and get_executor().sandbox.available) else "local",
    }


@app.get("/api/runtime")
def runtime():
    info = get_executor().inspect_runtime()
    info["providers"] = get_router().telemetry()
    info["db"] = tasks.DB_PATH
    return info


# ----- Tasks ---------------------------------------------------------------
@app.post("/api/tasks")
def create_task(body: CreateTaskBody):
    task_id = tasks.create_task(body.title, body.description, body.payload or {})
    t = threading.Thread(target=_worker_run, args=(task_id, body.title, body.description), daemon=True)
    t.start()
    return {"task_id": task_id, "title": body.title, "state": "queued"}


@app.get("/api/tasks")
def list_tasks(limit: int = 50):
    return {"tasks": tasks.list_tasks(limit=limit)}


@app.get("/api/tasks/{task_id}")
def get_task(task_id: str):
    t = tasks.get_task(task_id)
    if not t:
        raise HTTPException(404, "task not found")
    return t


@app.get("/api/tasks/{task_id}/events")
def get_events(task_id: str, since_id: int = 0, limit: int = 1000):
    return {"events": tasks.get_events(task_id, since_id=since_id, limit=limit)}


@app.get("/api/tasks/{task_id}/stream")
async def stream_events(task_id: str, request: Request):
    """Server-Sent Events stream. Replays historical events then live events."""

    async def gen():
        # 1) Replay history
        last_id = 0
        history = tasks.get_events(task_id, since_id=0, limit=2000)
        for ev in history:
            last_id = ev["id"]
            yield f"id: {ev['id']}\nevent: {ev['kind']}\ndata: {json.dumps(ev)}\n\n"

        # 2) Subscribe for live events
        q: asyncio.Queue = asyncio.Queue()
        _active_subscribers.setdefault(task_id, []).append(q)
        try:
            while True:
                if await request.is_disconnected():
                    break
                try:
                    ev = await asyncio.wait_for(q.get(), timeout=15.0)
                    yield f"event: {ev['kind']}\ndata: {json.dumps(ev)}\n\n"
                    if ev["kind"] in ("done", "error") and ev.get("data", {}).get("final"):
                        break
                except asyncio.TimeoutError:
                    # heartbeat
                    yield ":keepalive\n\n"
        finally:
            try:
                _active_subscribers.get(task_id, []).remove(q)
            except ValueError:
                pass

    return StreamingResponse(gen(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


# ----- LLM endpoints -------------------------------------------------------
@app.post("/api/llm/chat")
def llm_chat(body: ChatBody):
    router = get_router()
    try:
        text = router.chat(
            body.messages, model=body.model, temperature=body.temperature,
            max_tokens=body.max_tokens, preferred_provider=body.preferred_provider,
        )
        return {"ok": True, "text": text, "telemetry": router.telemetry()}
    except Exception as e:
        return JSONResponse({"ok": False, "error": str(e)}, status_code=500)


@app.post("/api/chat")
def chat_stream(body: ChatBody):
    """SSE chat stream."""
    router = get_router()

    def gen():
        for chunk in router.stream(
            body.messages, model=body.model, temperature=body.temperature,
            max_tokens=body.max_tokens, preferred_provider=body.preferred_provider,
        ):
            yield f"data: {json.dumps({'delta': chunk})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


# ----- Deploy endpoints ----------------------------------------------------
@app.post("/api/deploy/huggingface")
def deploy_hf(body: HFDeployBody):
    src = os.path.abspath(body.source_dir)
    if not os.path.isdir(src):
        raise HTTPException(400, f"source_dir not found: {src}")
    r = deployers.hf_push_space(source_dir=src, repo_id=body.repo_id, commit_message=body.commit_message)
    if r.get("ok"):
        tasks.record_deployment("", "huggingface", r.get("url", ""), "ok")
    else:
        tasks.record_deployment("", "huggingface", "", "failed")
    return r


@app.post("/api/deploy/vercel")
def deploy_vercel(body: VercelDeployBody):
    src = os.path.abspath(body.source_dir)
    if not os.path.isdir(src):
        raise HTTPException(400, f"source_dir not found: {src}")
    files = deployers.collect_files_for_vercel(src)
    r = deployers.vercel_deploy_via_api(
        project_name=body.project_name, files=files, target=body.target,
        env=body.env, framework=body.framework,
        install_command=body.install_command, build_command=body.build_command,
    )
    if r.get("ok"):
        tasks.record_deployment("", "vercel", r.get("url", ""), "ok")
    return r


@app.post("/api/git/push")
def git_push(body: GitPushBody):
    repo_dir = os.path.abspath(body.repo_dir)
    if not os.path.isdir(repo_dir):
        raise HTTPException(400, f"repo_dir not found: {repo_dir}")
    return deployers.github_push(
        repo_dir=repo_dir, branch=body.branch,
        commit_message=body.commit_message, remote_url=body.remote_url,
    )


# ---------------------------------------------------------------------------
# Startup self-check
# ---------------------------------------------------------------------------
@app.on_event("startup")
def startup_check():
    logger.info("AI Developer Agent starting")
    try:
        tasks.init_db()
        info = get_executor().inspect_runtime()
        logger.info("Runtime: %s", info)
        logger.info("Providers: %s", list(get_router().telemetry().keys()))
    except Exception as e:
        logger.warning("Startup check error: %s", e)


# Allow running directly
if __name__ == "__main__":
    import uvicorn
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run("apps.backend.app:app", host="0.0.0.0", port=port, log_level="info")