diff --git a/backend/Dockerfile b/backend/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..c7b009e446fb177a8e9d177d8944d03707d0d101 --- /dev/null +++ b/backend/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.11-slim + +# Install system deps +RUN apt-get update && apt-get install -y \ + git curl build-essential libssl-dev \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Install Python deps +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy source +COPY . . + +# Create workspace +RUN mkdir -p /tmp/workspace /tmp/repos + +# HuggingFace Spaces runs as user 1000 +RUN useradd -m -u 1000 user && chown -R user:user /app /tmp/workspace /tmp/repos +USER 1000 + +EXPOSE 7860 + +ENV PORT=7860 +ENV HOST=0.0.0.0 +ENV DB_PATH=/tmp/devin_agent.db + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860", "--workers", "1", "--loop", "asyncio"] diff --git a/backend/Dockerfile.hf b/backend/Dockerfile.hf new file mode 100644 index 0000000000000000000000000000000000000000..c1c88d894a4559758feb5f87f1b540a89324ff8f --- /dev/null +++ b/backend/Dockerfile.hf @@ -0,0 +1,48 @@ +FROM python:3.11-slim + +# HuggingFace Spaces Dockerfile +# Compatible with free CPU tier + +WORKDIR /app + +# System deps +RUN apt-get update && apt-get install -y \ + git curl build-essential \ + && rm -rf /var/lib/apt/lists/* + +# Python deps +COPY requirements.txt . +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt + +# App code +COPY . . + +# Setup dirs +RUN mkdir -p /tmp/workspace /tmp/repos /tmp/devin_data + +# HF runs as uid 1000 +RUN useradd -m -u 1000 user 2>/dev/null || true +RUN chown -R 1000:1000 /app /tmp/workspace /tmp/repos /tmp/devin_data + +USER 1000 + +EXPOSE 7860 + +ENV PORT=7860 +ENV HOST=0.0.0.0 +ENV DB_PATH=/tmp/devin_agent.db +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \ + CMD curl -f http://localhost:7860/api/v1/health || exit 1 + +CMD ["uvicorn", "main:app", \ + "--host", "0.0.0.0", \ + "--port", "7860", \ + "--workers", "1", \ + "--loop", "asyncio", \ + "--timeout-keep-alive", "75", \ + "--log-level", "info"] diff --git a/backend/README.md b/backend/README.md new file mode 100644 index 0000000000000000000000000000000000000000..8d5b15bc9685fe2c43f49bfbda07c63f91613ef3 --- /dev/null +++ b/backend/README.md @@ -0,0 +1,58 @@ +--- +title: Devin Agent Platform +emoji: πŸ€– +colorFrom: blue +colorTo: purple +sdk: docker +app_port: 7860 +pinned: true +license: mit +short_description: Production-grade autonomous AI engineering platform +--- + +# πŸ€– Devin Agent Platform v2.0 + +> **Manus/Devin-style Autonomous AI Engineering Platform** +> Real-time WebSocket streaming Β· Autonomous GitHub operations Β· Persistent memory + +## ✨ Features + +- ⚑ **Real-time WebSocket streaming** β€” live token-by-token LLM output +- πŸ—ΊοΈ **Autonomous task planning** β€” goal β†’ plan β†’ execute automatically +- 🧠 **Persistent memory** β€” SQLite-backed conversation + project memory +- πŸ™ **GitHub automation** β€” clone, commit, push, PR, issues autonomously +- πŸ” **Self-healing** β€” auto-retry with exponential backoff +- πŸ“‘ **SSE fallback** β€” Server-Sent Events for streaming compatibility +- 🌐 **REST + WebSocket API** β€” full-featured backend + +## πŸ”Œ API Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/v1/tasks/create` | Create autonomous task | +| GET | `/api/v1/tasks/{id}` | Get task details | +| POST | `/api/v1/tasks/{id}/cancel` | Cancel task | +| POST | `/api/v1/tasks/{id}/retry` | Retry failed task | +| GET | `/api/v1/tasks/{id}/stream` | SSE task stream | +| POST | `/api/v1/chat` | Chat with agent | +| POST | `/api/v1/goal` | Submit high-level goal | +| POST | `/api/v1/plan` | Generate execution plan | +| WS | `/ws/tasks/{task_id}` | Live task WebSocket | +| WS | `/ws/logs` | Global log stream | +| WS | `/ws/chat/{session_id}` | Chat WebSocket | +| WS | `/ws/agent/status` | Agent status stream | + +## πŸ”‘ Environment Variables (HF Secrets) + +``` +OPENAI_API_KEY = sk-... (for real AI) +ANTHROPIC_API_KEY = sk-ant-... (alternative) +GITHUB_TOKEN = ghp_... (GitHub ops) +GITHUB_OWNER = your-username (GitHub ops) +``` + +## πŸš€ Quick Start + +Visit `/api/docs` for interactive Swagger UI. + +**Demo mode** works without any API keys β€” set `OPENAI_API_KEY` for real AI. diff --git a/backend/__pycache__/main.cpython-312.pyc b/backend/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0a6043f0154f0fc529cbf6cd350edd46c5c0a266 Binary files /dev/null and b/backend/__pycache__/main.cpython-312.pyc differ diff --git a/backend/api/__init__.py b/backend/api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/api/__pycache__/__init__.cpython-312.pyc b/backend/api/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..56c0c66ef7122cd03c986d89e42750d26fcb0ab6 Binary files /dev/null and b/backend/api/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/api/__pycache__/websocket_manager.cpython-312.pyc b/backend/api/__pycache__/websocket_manager.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..39fe470d2d47dfadd2d4e08f78c620aefad89ac6 Binary files /dev/null and b/backend/api/__pycache__/websocket_manager.cpython-312.pyc differ diff --git a/backend/api/routes/__init__.py b/backend/api/routes/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/api/routes/__pycache__/__init__.cpython-312.pyc b/backend/api/routes/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5ccc91e45f72dfcd838292572a03de91995f0383 Binary files /dev/null and b/backend/api/routes/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/api/routes/__pycache__/chat.cpython-312.pyc b/backend/api/routes/__pycache__/chat.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..59bb359f9f22fbfe51c7b297cca6ac42c1c78254 Binary files /dev/null and b/backend/api/routes/__pycache__/chat.cpython-312.pyc differ diff --git a/backend/api/routes/__pycache__/github.cpython-312.pyc b/backend/api/routes/__pycache__/github.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..32bc13856652ed46d33b9497e7325e31674d549a Binary files /dev/null and b/backend/api/routes/__pycache__/github.cpython-312.pyc differ diff --git a/backend/api/routes/__pycache__/health.cpython-312.pyc b/backend/api/routes/__pycache__/health.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7b5fcc886a5431427c10b62efaba98c9c2c17db1 Binary files /dev/null and b/backend/api/routes/__pycache__/health.cpython-312.pyc differ diff --git a/backend/api/routes/__pycache__/memory.cpython-312.pyc b/backend/api/routes/__pycache__/memory.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..82cf4b4d288bba84b1bed52730e51b20b2f738d1 Binary files /dev/null and b/backend/api/routes/__pycache__/memory.cpython-312.pyc differ diff --git a/backend/api/routes/__pycache__/tasks.cpython-312.pyc b/backend/api/routes/__pycache__/tasks.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..93b690aa5eac9b4a354e0d9a727de72c5393c1f9 Binary files /dev/null and b/backend/api/routes/__pycache__/tasks.cpython-312.pyc differ diff --git a/backend/api/routes/chat.py b/backend/api/routes/chat.py new file mode 100644 index 0000000000000000000000000000000000000000..9f46ee598bbdbb3aa29f17ed6ba8147054c9b397 --- /dev/null +++ b/backend/api/routes/chat.py @@ -0,0 +1,214 @@ +""" +Chat + Goal API Routes β€” Real-time streaming responses +""" + +import asyncio +import json +import time +import uuid + +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import StreamingResponse + +from core.models import ChatRequest, GoalRequest, TaskCreateRequest +from memory.db import save_memory, get_history + +router = APIRouter() + + +def get_engine(request: Request): + return request.app.state.task_engine + + +def get_ws(request: Request): + return request.app.state.ws_manager + + +# ─── Chat (REST + SSE streaming) ─────────────────────────────────────────────── + +@router.post("/chat", summary="Chat with the agent") +async def chat(req: ChatRequest, request: Request): + from core.agent import AgentCore + ws = get_ws(request) + agent = AgentCore(ws) + + messages = [{"role": m.role, "content": m.content} for m in req.messages] + + if req.stream: + async def stream_gen(): + async def _run(): + result = await agent.llm_stream( + messages=messages, + session_id=req.session_id, + model=req.model, + temperature=req.temperature, + max_tokens=req.max_tokens, + ) + await save_memory( + content=result, + memory_type="conversation", + session_id=req.session_id, + project_id=req.project_id, + key="assistant", + ) + # Save user message too + user_msg = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "") + await save_memory( + content=user_msg, + memory_type="conversation", + session_id=req.session_id, + project_id=req.project_id, + key="user", + ) + return result + + room_buffer = [] + original_emit_chat = ws.emit_chat + async def capture_emit(sid, etype, data): + if etype == "llm_chunk": + chunk = data.get("chunk", "") + room_buffer.append(chunk) + yield_data = json.dumps({"type": etype, "data": data, "session_id": sid}) + return yield_data + return None + + # Stream tokens directly + full = "" + from core.agent import AgentCore as _A + import httpx + import os + OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") + ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") + + if OPENAI_API_KEY: + headers = { + "Authorization": f"Bearer {OPENAI_API_KEY}", + "Content-Type": "application/json", + } + payload = { + "model": req.model, + "messages": messages, + "stream": True, + "temperature": req.temperature, + "max_tokens": req.max_tokens, + } + from core.agent import OPENAI_BASE_URL + async with httpx.AsyncClient(timeout=120) as client: + async with client.stream("POST", f"{OPENAI_BASE_URL}/chat/completions", + headers=headers, json=payload) as resp: + async for line in resp.aiter_lines(): + if not line.startswith("data:"): + continue + chunk_str = line[6:].strip() + if chunk_str == "[DONE]": + break + try: + data = json.loads(chunk_str) + delta = data["choices"][0]["delta"].get("content", "") + if delta: + full += delta + yield f"data: {json.dumps({'type': 'llm_chunk', 'data': {'chunk': delta}, 'session_id': req.session_id})}\n\n" + except Exception: + pass + else: + # Demo streaming + demo = ( + f"Hello! I'm your Devin-style AI Agent. I received: '{req.messages[-1].content[:80]}'. " + f"Set OPENAI_API_KEY or ANTHROPIC_API_KEY for real AI responses. " + f"I support real-time streaming, task planning, GitHub automation, and more!" + ) + for word in demo.split(): + chunk = word + " " + full += chunk + await asyncio.sleep(0.04) + yield f"data: {json.dumps({'type': 'llm_chunk', 'data': {'chunk': chunk}, 'session_id': req.session_id})}\n\n" + + yield f"data: {json.dumps({'type': 'stream_end', 'data': {'full_response': full}, 'session_id': req.session_id})}\n\n" + + return StreamingResponse( + stream_gen(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + else: + # Non-streaming + agent = AgentCore(get_ws(request)) + result = await agent.llm_stream(messages, session_id=req.session_id) + return { + "response": result, + "session_id": req.session_id, + "model": req.model, + "timestamp": time.time(), + } + + +@router.post("/chat/stream", summary="Explicit streaming chat endpoint") +async def chat_stream(req: ChatRequest, request: Request): + req.stream = True + return await chat(req, request) + + +# ─── Goal API (create task from goal) ───────────────────────────────────────── + +@router.post("/goal", summary="Submit a high-level goal to the agent") +async def submit_goal(req: GoalRequest, request: Request): + engine = get_engine(request) + task_req = TaskCreateRequest( + goal=req.goal, + session_id=req.session_id, + project_id=req.project_id, + stream=req.stream, + metadata={"source": "goal_api", "github_repo": req.github_repo}, + ) + task_id = await engine.submit(task_req) + return { + "task_id": task_id, + "goal": req.goal, + "status": "queued", + "session_id": req.session_id, + "ws_url": f"/ws/tasks/{task_id}", + "stream_url": f"/api/v1/tasks/{task_id}/stream", + } + + +@router.post("/goal/stream", summary="Submit goal with SSE streaming response") +async def submit_goal_stream(req: GoalRequest, request: Request): + req.stream = True + return await submit_goal(req, request) + + +# ─── Execute (direct tool execution) ────────────────────────────────────────── + +@router.post("/execute", summary="Execute a tool directly") +async def execute( + tool: str, + task: str, + request: Request, + session_id: str = "", +): + from tools.executor import ToolExecutor + ws = get_ws(request) + executor = ToolExecutor(ws) + result = await executor.run( + tool=tool, + task=task, + session_id=session_id, + ) + return {"tool": tool, "task": task, "result": result, "session_id": session_id} + + +# ─── Plan (generate plan without executing) ─────────────────────────────────── + +@router.post("/plan", summary="Generate execution plan for a goal") +async def generate_plan(req: GoalRequest, request: Request): + from core.agent import AgentCore + ws = get_ws(request) + agent = AgentCore(ws) + task_id = f"plan_{uuid.uuid4().hex[:8]}" + plan = await agent.plan(goal=req.goal, task_id=task_id, session_id=req.session_id) + return { + "goal": req.goal, + "plan": plan.model_dump(), + "session_id": req.session_id, + "task_id": task_id, + } diff --git a/backend/api/routes/github.py b/backend/api/routes/github.py new file mode 100644 index 0000000000000000000000000000000000000000..d6dc31c778141f8b1633e2ea90254827770b17d3 --- /dev/null +++ b/backend/api/routes/github.py @@ -0,0 +1,336 @@ +""" +GitHub Autonomous Engineering API Routes +Clone, commit, push, PR, issues β€” all autonomous +""" + +import os +import time +import asyncio +import tempfile +import shutil +from typing import Optional + +import httpx +from fastapi import APIRouter, HTTPException, Request + +from core.models import ( + GitHubCloneRequest, GitHubCreateRepoRequest, + GitHubCommitRequest, GitHubPRRequest, GitHubIssueRequest, +) +from memory.db import save_memory + +router = APIRouter() + +GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "") +GITHUB_OWNER = os.environ.get("GITHUB_OWNER", "") +GITHUB_API = "https://api.github.com" + + +def gh_headers(): + if not GITHUB_TOKEN: + raise HTTPException(status_code=400, detail="GITHUB_TOKEN not configured") + return { + "Authorization": f"Bearer {GITHUB_TOKEN}", + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + } + + +async def gh_get(path: str) -> dict: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.get(f"{GITHUB_API}{path}", headers=gh_headers()) + r.raise_for_status() + return r.json() + + +async def gh_post(path: str, data: dict) -> dict: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.post(f"{GITHUB_API}{path}", headers=gh_headers(), json=data) + r.raise_for_status() + return r.json() + + +async def gh_put(path: str, data: dict) -> dict: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.put(f"{GITHUB_API}{path}", headers=gh_headers(), json=data) + r.raise_for_status() + return r.json() + + +async def gh_patch(path: str, data: dict) -> dict: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.patch(f"{GITHUB_API}{path}", headers=gh_headers(), json=data) + r.raise_for_status() + return r.json() + + +# ─── Clone ──────────────────────────────────────────────────────────────────── + +@router.post("/clone", summary="Clone a GitHub repository") +async def clone_repo(req: GitHubCloneRequest): + try: + import git + except ImportError: + raise HTTPException(status_code=500, detail="gitpython not installed") + + local_path = req.local_path or f"/tmp/repos/{req.repo_url.split('/')[-1].replace('.git', '')}" + os.makedirs(local_path, exist_ok=True) + + if GITHUB_TOKEN: + url = req.repo_url.replace("https://", f"https://{GITHUB_TOKEN}@") + else: + url = req.repo_url + + try: + if os.path.exists(os.path.join(local_path, ".git")): + repo = git.Repo(local_path) + repo.remotes.origin.pull() + action = "pulled" + else: + repo = git.Repo.clone_from(url, local_path, branch=req.branch, depth=1) + action = "cloned" + + files = [] + for root, dirs, fnames in os.walk(local_path): + dirs[:] = [d for d in dirs if d not in [".git", "node_modules", "__pycache__"]] + for f in fnames[:50]: + files.append(os.path.relpath(os.path.join(root, f), local_path)) + + # Save to memory + await save_memory( + content=f"Repo {req.repo_url} cloned to {local_path}. Files: {', '.join(files[:20])}", + memory_type="repo", + key=req.repo_url, + ) + + return { + "action": action, + "repo_url": req.repo_url, + "local_path": local_path, + "branch": req.branch, + "files_count": len(files), + "files": files[:30], + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Clone failed: {str(e)}") + + +# ─── Create Repo ────────────────────────────────────────────────────────────── + +@router.post("/create_repo", summary="Create a new GitHub repository") +async def create_repo(req: GitHubCreateRepoRequest): + data = { + "name": req.name, + "description": req.description, + "private": req.private, + "auto_init": req.auto_init, + } + try: + result = await gh_post("/user/repos", data) + return { + "repo": result["full_name"], + "url": result["html_url"], + "clone_url": result["clone_url"], + "default_branch": result.get("default_branch", "main"), + "private": result["private"], + } + except httpx.HTTPStatusError as e: + raise HTTPException(status_code=e.response.status_code, detail=e.response.text) + + +# ─── Commit Files ───────────────────────────────────────────────────────────── + +@router.post("/commit", summary="Commit files to a repository") +async def commit_files(req: GitHubCommitRequest): + import base64 + + owner_repo = req.repo if "/" in req.repo else f"{GITHUB_OWNER}/{req.repo}" + results = [] + + for file_path, content in req.files.items(): + encoded = base64.b64encode(content.encode()).decode() + + # Get current SHA if file exists + sha = None + try: + existing = await gh_get(f"/repos/{owner_repo}/contents/{file_path}?ref={req.branch}") + sha = existing.get("sha") + except Exception: + pass + + payload = { + "message": req.message, + "content": encoded, + "branch": req.branch, + } + if sha: + payload["sha"] = sha + + try: + result = await gh_put(f"/repos/{owner_repo}/contents/{file_path}", payload) + results.append({"file": file_path, "status": "committed", "sha": result["content"]["sha"]}) + except Exception as e: + results.append({"file": file_path, "status": "error", "error": str(e)}) + + return { + "repo": owner_repo, + "branch": req.branch, + "message": req.message, + "files": results, + "committed": sum(1 for r in results if r["status"] == "committed"), + } + + +# ─── Push ───────────────────────────────────────────────────────────────────── + +@router.post("/push", summary="Push local changes to remote") +async def push_changes( + repo_path: str, + branch: str = "main", + message: str = "Auto-commit by Devin Agent", +): + try: + import git + repo = git.Repo(repo_path) + repo.git.add(A=True) + if repo.index.diff("HEAD") or repo.untracked_files: + repo.index.commit(message) + origin = repo.remote("origin") + origin.push(refspec=f"HEAD:{branch}") + return {"status": "pushed", "branch": branch, "message": message} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Push failed: {str(e)}") + + +# ─── Create PR ──────────────────────────────────────────────────────────────── + +@router.post("/pr/create", summary="Create a Pull Request") +async def create_pr(req: GitHubPRRequest): + owner_repo = req.repo if "/" in req.repo else f"{GITHUB_OWNER}/{req.repo}" + data = { + "title": req.title, + "body": req.body, + "head": req.head, + "base": req.base, + "draft": req.draft, + } + try: + result = await gh_post(f"/repos/{owner_repo}/pulls", data) + return { + "pr_number": result["number"], + "title": result["title"], + "url": result["html_url"], + "state": result["state"], + "head": req.head, + "base": req.base, + } + except httpx.HTTPStatusError as e: + raise HTTPException(status_code=e.response.status_code, detail=e.response.text) + + +# ─── Create Issue ───────────────────────────────────────────────────────────── + +@router.post("/issues/create", summary="Create a GitHub Issue") +async def create_issue(req: GitHubIssueRequest): + owner_repo = req.repo if "/" in req.repo else f"{GITHUB_OWNER}/{req.repo}" + data = {"title": req.title, "body": req.body, "labels": req.labels} + try: + result = await gh_post(f"/repos/{owner_repo}/issues", data) + return { + "issue_number": result["number"], + "title": result["title"], + "url": result["html_url"], + "state": result["state"], + } + except httpx.HTTPStatusError as e: + raise HTTPException(status_code=e.response.status_code, detail=e.response.text) + + +# ─── Code Review ────────────────────────────────────────────────────────────── + +@router.post("/review", summary="AI code review for a PR") +async def review_pr(repo: str, pr_number: int, request: Request): + owner_repo = repo if "/" in repo else f"{GITHUB_OWNER}/{repo}" + try: + pr = await gh_get(f"/repos/{owner_repo}/pulls/{pr_number}") + files = await gh_get(f"/repos/{owner_repo}/pulls/{pr_number}/files") + + file_changes = [] + for f in files[:10]: + file_changes.append(f"{f['filename']}: +{f.get('additions',0)}/-{f.get('deletions',0)}") + + ws = request.app.state.ws_manager + from core.agent import AgentCore + agent = AgentCore(ws) + + review_prompt = ( + f"Review this Pull Request:\n" + f"Title: {pr['title']}\n" + f"Description: {pr.get('body', 'No description')}\n" + f"Files changed: {chr(10).join(file_changes)}\n\n" + f"Provide a constructive code review with: summary, potential issues, suggestions, and verdict." + ) + messages = [ + {"role": "system", "content": "You are a senior software engineer doing code review. Be constructive, specific, and helpful."}, + {"role": "user", "content": review_prompt}, + ] + review = await agent.llm_stream(messages) + + # Post review comment + if GITHUB_TOKEN: + await gh_post(f"/repos/{owner_repo}/issues/{pr_number}/comments", {"body": f"πŸ€– **Devin Agent Code Review**\n\n{review}"}) + + return { + "pr_number": pr_number, + "title": pr["title"], + "review": review, + "files_reviewed": len(files), + "posted_to_github": bool(GITHUB_TOKEN), + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +# ─── Repo Info ──────────────────────────────────────────────────────────────── + +@router.get("/repo/{owner}/{repo}", summary="Get repository info") +async def get_repo_info(owner: str, repo: str): + try: + info = await gh_get(f"/repos/{owner}/{repo}") + return { + "name": info["name"], + "full_name": info["full_name"], + "description": info.get("description"), + "url": info["html_url"], + "default_branch": info["default_branch"], + "language": info.get("language"), + "stars": info["stargazers_count"], + "forks": info["forks_count"], + "open_issues": info["open_issues_count"], + "private": info["private"], + } + except httpx.HTTPStatusError as e: + raise HTTPException(status_code=e.response.status_code, detail=e.response.text) + + +# ─── Status check ───────────────────────────────────────────────────────────── + +@router.get("/status", summary="GitHub integration status") +async def github_status(): + configured = bool(GITHUB_TOKEN) + user = None + if configured: + try: + user_info = await gh_get("/user") + user = user_info.get("login") + except Exception: + configured = False + return { + "configured": configured, + "user": user, + "owner": GITHUB_OWNER or user, + "capabilities": [ + "clone", "create_repo", "commit", "push", + "pr/create", "issues/create", "review" + ], + } diff --git a/backend/api/routes/health.py b/backend/api/routes/health.py new file mode 100644 index 0000000000000000000000000000000000000000..23f54e1467b868ea8deb41de78d6ea1c497a8cd0 --- /dev/null +++ b/backend/api/routes/health.py @@ -0,0 +1,53 @@ +""" +Health + Status Routes +""" + +import time +import os +import psutil +from fastapi import APIRouter, Request + +router = APIRouter() + + +@router.get("/health", summary="Health check") +async def health(request: Request): + ws = request.app.state.ws_manager + engine = request.app.state.task_engine + stats = ws.get_stats() + return { + "status": "healthy", + "version": "2.0.0", + "timestamp": time.time(), + "websocket_connections": stats["total_connections"], + "websocket_rooms": list(stats["rooms"].keys()), + "task_queue_size": engine._queue.qsize(), + "active_tasks": len(engine._active), + "llm": { + "openai": bool(os.environ.get("OPENAI_API_KEY")), + "anthropic": bool(os.environ.get("ANTHROPIC_API_KEY")), + "model": os.environ.get("DEFAULT_MODEL", "gpt-4o"), + }, + "github": bool(os.environ.get("GITHUB_TOKEN")), + } + + +@router.get("/metrics", summary="System metrics") +async def metrics(): + cpu = psutil.cpu_percent(interval=0.1) + mem = psutil.virtual_memory() + disk = psutil.disk_usage("/") + return { + "cpu_percent": cpu, + "memory": { + "total_mb": round(mem.total / 1024 / 1024), + "used_mb": round(mem.used / 1024 / 1024), + "percent": mem.percent, + }, + "disk": { + "total_gb": round(disk.total / 1024 / 1024 / 1024, 1), + "used_gb": round(disk.used / 1024 / 1024 / 1024, 1), + "percent": disk.percent, + }, + "timestamp": time.time(), + } diff --git a/backend/api/routes/memory.py b/backend/api/routes/memory.py new file mode 100644 index 0000000000000000000000000000000000000000..52f485689944fb2bca0f71f4b31172da3176503d --- /dev/null +++ b/backend/api/routes/memory.py @@ -0,0 +1,50 @@ +""" +Memory API Routes β€” Persistent agent memory +""" + +import time +from fastapi import APIRouter, HTTPException, Query +from core.models import MemorySaveRequest, MemorySearchRequest +from memory.db import save_memory, search_memory, get_project_memory, get_history + +router = APIRouter() + + +@router.post("/", summary="Save memory") +async def save(req: MemorySaveRequest): + await save_memory( + content=req.content, + memory_type=req.memory_type.value, + session_id=req.session_id, + project_id=req.project_id, + key=req.key, + metadata=req.metadata, + ) + return {"status": "saved", "memory_type": req.memory_type, "timestamp": time.time()} + + +@router.post("/search", summary="Search memory") +async def search(req: MemorySearchRequest): + results = await search_memory( + query=req.query, + session_id=req.session_id, + project_id=req.project_id, + limit=req.limit, + ) + return {"results": results, "total": len(results), "query": req.query} + + +@router.get("/project/{project_id}", summary="Get project memory") +async def project_memory( + project_id: str, + memory_type: str = Query(default=""), + limit: int = Query(default=100, le=500), +): + results = await get_project_memory(project_id, memory_type=memory_type, limit=limit) + return {"project_id": project_id, "memories": results, "total": len(results)} + + +@router.get("/history/{session_id}", summary="Get conversation history") +async def history(session_id: str, limit: int = Query(default=50, le=200)): + results = await get_history(session_id, limit=limit) + return {"session_id": session_id, "history": results, "total": len(results)} diff --git a/backend/api/routes/tasks.py b/backend/api/routes/tasks.py new file mode 100644 index 0000000000000000000000000000000000000000..82fde607690b1a939594ec278ff6191888af1e63 --- /dev/null +++ b/backend/api/routes/tasks.py @@ -0,0 +1,167 @@ +""" +Task API Routes β€” CRUD + Streaming + WebSocket +""" + +import asyncio +import json +import time +from typing import Optional + +from fastapi import APIRouter, HTTPException, Request, Query +from fastapi.responses import StreamingResponse + +from core.models import ( + TaskCreateRequest, TaskCancelRequest, TaskRetryRequest, TaskResponse, TaskStatus +) +from memory.db import get_task, list_tasks, get_task_events, update_task_status + +router = APIRouter() + + +def get_engine(request: Request): + return request.app.state.task_engine + + +def get_ws(request: Request): + return request.app.state.ws_manager + + +# ─── Create Task ─────────────────────────────────────────────────────────────── + +@router.post("/create", summary="Create & queue a new agent task") +async def create_task(req: TaskCreateRequest, request: Request): + engine = get_engine(request) + task_id = await engine.submit(req) + task = await get_task(task_id) + return { + "task_id": task_id, + "status": "queued", + "goal": req.goal, + "session_id": req.session_id, + "stream_url": f"/api/v1/tasks/{task_id}/stream", + "ws_url": f"/ws/tasks/{task_id}", + "created_at": task["created_at"] if task else time.time(), + } + + +# ─── Get Task ────────────────────────────────────────────────────────────────── + +@router.get("/{task_id}", summary="Get task details") +async def get_task_detail(task_id: str): + task = await get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + return task + + +# ─── Get Task Status ─────────────────────────────────────────────────────────── + +@router.get("/{task_id}/status", summary="Get task status only") +async def get_task_status(task_id: str): + task = await get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + return { + "task_id": task_id, + "status": task["status"], + "retry_count": task.get("retry_count", 0), + "created_at": task.get("created_at"), + "started_at": task.get("started_at"), + "completed_at": task.get("completed_at"), + } + + +# ─── Cancel Task ─────────────────────────────────────────────────────────────── + +@router.post("/{task_id}/cancel", summary="Cancel a running task") +async def cancel_task(task_id: str, req: TaskCancelRequest, request: Request): + task = await get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + if task["status"] in ("completed", "failed", "cancelled"): + raise HTTPException(status_code=400, detail=f"Task already {task['status']}") + engine = get_engine(request) + await engine.cancel(task_id, req.reason) + return {"task_id": task_id, "status": "cancelled", "reason": req.reason} + + +# ─── Retry Task ──────────────────────────────────────────────────────────────── + +@router.post("/{task_id}/retry", summary="Retry a failed task") +async def retry_task(task_id: str, request: Request): + task = await get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + if task["status"] not in ("failed", "cancelled"): + raise HTTPException(status_code=400, detail="Only failed/cancelled tasks can be retried") + engine = get_engine(request) + await engine.retry(task_id) + return {"task_id": task_id, "status": "queued", "message": "Task requeued for retry"} + + +# ─── Stream Task Events (SSE) ────────────────────────────────────────────────── + +@router.get("/{task_id}/stream", summary="Stream task events via SSE") +async def stream_task(task_id: str, request: Request): + task = await get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + async def event_generator(): + # First, replay all stored events + events = await get_task_events(task_id) + for ev in events: + data = json.dumps({ + "type": ev["event_type"], + "task_id": task_id, + "timestamp": ev["timestamp"], + "data": json.loads(ev["data"]) if ev.get("data") else {}, + }) + yield f"data: {data}\n\n" + + # Then stream live events via WS manager buffer + ws = get_ws(request) + room = f"task:{task_id}" + last_count = len(events) + + # Poll for new events (for SSE fallback) + for _ in range(600): # max 5 minutes + await asyncio.sleep(0.5) + current_task = await get_task(task_id) + if current_task and current_task["status"] in ("completed", "failed", "cancelled"): + yield f"data: {json.dumps({'type': 'stream_end', 'task_id': task_id, 'status': current_task['status']})}\n\n" + break + # heartbeat + yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': time.time()})}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + "Connection": "keep-alive", + }, + ) + + +# ─── List Tasks ──────────────────────────────────────────────────────────────── + +@router.get("/", summary="List tasks") +async def list_all_tasks( + session_id: str = Query(default=""), + limit: int = Query(default=50, le=200), +): + tasks = await list_tasks(session_id=session_id, limit=limit) + return {"tasks": tasks, "total": len(tasks)} + + +# ─── Task Events History ─────────────────────────────────────────────────────── + +@router.get("/{task_id}/events", summary="Get all events for a task") +async def task_events(task_id: str): + task = await get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + events = await get_task_events(task_id) + return {"task_id": task_id, "events": events, "total": len(events)} diff --git a/backend/api/websocket_manager.py b/backend/api/websocket_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..68aea8a58e3d2381e6eec51197b6a9f13ad9931d --- /dev/null +++ b/backend/api/websocket_manager.py @@ -0,0 +1,134 @@ +""" +WebSocket Connection Manager β€” Production Grade +Handles rooms, heartbeats, event buffering, reconnect support +""" + +import asyncio +import json +import time +import uuid +from collections import defaultdict +from typing import Dict, List, Optional, Set +import structlog + +log = structlog.get_logger() + + +class WebSocketManager: + def __init__(self): + # room β†’ set of websockets + self._rooms: Dict[str, Set] = defaultdict(set) + # ws β†’ list of rooms + self._ws_rooms: Dict[object, Set[str]] = defaultdict(set) + # Event buffer per room (for replay on reconnect) + self._event_buffer: Dict[str, List] = defaultdict(list) + self._buffer_max = 100 + # Active connection count + self._connection_count = 0 + + async def connect(self, websocket, room: str): + await websocket.accept() + self._rooms[room].add(websocket) + self._ws_rooms[websocket].add(room) + self._connection_count += 1 + log.info("WS connected", room=room, total=self._connection_count) + + # Replay buffered events for this room + buffered = self._event_buffer.get(room, [])[-20:] + for event in buffered: + try: + await websocket.send_json(event) + except Exception: + pass + + await websocket.send_json({ + "type": "connected", + "room": room, + "timestamp": time.time(), + "buffered_events": len(buffered), + }) + + def disconnect(self, websocket, room: Optional[str] = None): + if room: + self._rooms[room].discard(websocket) + self._ws_rooms[websocket].discard(room) + else: + for r in list(self._ws_rooms.get(websocket, [])): + self._rooms[r].discard(websocket) + self._ws_rooms.pop(websocket, None) + self._connection_count = max(0, self._connection_count - 1) + log.info("WS disconnected", room=room, total=self._connection_count) + + async def broadcast(self, room: str, event: dict): + """Broadcast event to all sockets in a room.""" + if "timestamp" not in event: + event["timestamp"] = time.time() + if "id" not in event: + event["id"] = str(uuid.uuid4())[:8] + + # Buffer event + self._event_buffer[room].append(event) + if len(self._event_buffer[room]) > self._buffer_max: + self._event_buffer[room].pop(0) + + dead = set() + for ws in list(self._rooms.get(room, [])): + try: + await ws.send_json(event) + except Exception: + dead.add(ws) + + for ws in dead: + self.disconnect(ws, room) + + async def broadcast_global(self, event: dict): + """Broadcast to ALL connected websockets.""" + for room in list(self._rooms.keys()): + await self.broadcast(room, event) + + async def emit(self, task_id: str, event_type: str, data: dict, session_id: str = ""): + """Emit a structured event to a task room + logs room.""" + event = { + "type": event_type, + "task_id": task_id, + "session_id": session_id, + "timestamp": time.time(), + "data": data, + } + await self.broadcast(f"task:{task_id}", event) + await self.broadcast("logs", event) + await self.broadcast("agent_status", { + "type": "agent_event", + "task_id": task_id, + "event_type": event_type, + "timestamp": time.time(), + }) + + async def emit_chat(self, session_id: str, event_type: str, data: dict): + """Emit event to a chat session room.""" + event = { + "type": event_type, + "session_id": session_id, + "timestamp": time.time(), + "data": data, + } + await self.broadcast(f"chat:{session_id}", event) + + async def heartbeat_loop(self): + """Send heartbeat to all connections every 15s.""" + while True: + await asyncio.sleep(15) + heartbeat = { + "type": "heartbeat", + "timestamp": time.time(), + "connections": self._connection_count, + } + for room in list(self._rooms.keys()): + await self.broadcast(room, heartbeat) + + def get_stats(self) -> dict: + return { + "total_connections": self._connection_count, + "rooms": {r: len(ws) for r, ws in self._rooms.items()}, + "buffered_events": {r: len(e) for r, e in self._event_buffer.items()}, + } diff --git a/backend/core/__init__.py b/backend/core/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/core/__pycache__/__init__.cpython-312.pyc b/backend/core/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..379ed24d70cbb3676abd189e72df51c25c53be05 Binary files /dev/null and b/backend/core/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/core/__pycache__/agent.cpython-312.pyc b/backend/core/__pycache__/agent.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b5b1ebbbb408d883e9048adb2dd8992582281e2a Binary files /dev/null and b/backend/core/__pycache__/agent.cpython-312.pyc differ diff --git a/backend/core/__pycache__/models.cpython-312.pyc b/backend/core/__pycache__/models.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bb9d95933a1cb106e8eca4cd9eac10505e42bd2e Binary files /dev/null and b/backend/core/__pycache__/models.cpython-312.pyc differ diff --git a/backend/core/__pycache__/task_engine.cpython-312.pyc b/backend/core/__pycache__/task_engine.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..921c872cdb4610246211e471c629395245037bea Binary files /dev/null and b/backend/core/__pycache__/task_engine.cpython-312.pyc differ diff --git a/backend/core/agent.py b/backend/core/agent.py new file mode 100644 index 0000000000000000000000000000000000000000..293d2d0b8356d476b1b4d00fafe7712457de8222 --- /dev/null +++ b/backend/core/agent.py @@ -0,0 +1,392 @@ +""" +Agent Core β€” Planner + Executor + Self-Heal Loop +LLM-powered with OpenAI/Anthropic support, streaming tokens +""" + +import asyncio +import json +import os +import time +from typing import Any, Dict, List, Optional + +import httpx +import structlog + +from core.models import TaskPlan, TaskStep +from api.websocket_manager import WebSocketManager +from memory.db import save_memory, get_history, search_memory + +log = structlog.get_logger() + +OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") +ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") +DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "gpt-4o") +OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1") + + +SYSTEM_PROMPT = """You are an elite autonomous AI software engineer β€” like Devin or Manus. +You can plan, code, debug, refactor, test, and deploy software autonomously. +You think step-by-step, write production-quality code, and self-heal on errors. +Always respond in structured JSON when asked for plans or structured output. +""" + +PLANNER_PROMPT = """You are a senior software architect. Given a goal, produce a detailed execution plan. + +Respond ONLY with valid JSON: +{ + "steps": [ + { + "name": "Step name", + "description": "What this step does", + "tool": "code|shell|file|browser|github|memory|search|test|none", + "estimated_seconds": 10 + } + ], + "estimated_duration": 60, + "tools_needed": ["code", "shell"] +} + +Goal: {goal} +Context: {context} +""" + + +class AgentCore: + def __init__(self, ws_manager: WebSocketManager): + self.ws = ws_manager + self.model = DEFAULT_MODEL + + # ─── LLM Call (with streaming) ───────────────────────────────────────────── + + async def llm_stream( + self, + messages: List[Dict], + task_id: str = "", + session_id: str = "", + model: str = "", + temperature: float = 0.7, + max_tokens: int = 4096, + ) -> str: + """Stream LLM tokens, emitting llm_chunk events via WebSocket.""" + model = model or self.model + full_text = "" + + if OPENAI_API_KEY: + full_text = await self._openai_stream( + messages, task_id, session_id, model, temperature, max_tokens + ) + elif ANTHROPIC_API_KEY: + full_text = await self._anthropic_stream( + messages, task_id, session_id, temperature, max_tokens + ) + else: + # Demo mode β€” simulate streaming + full_text = await self._demo_stream(messages, task_id, session_id) + + return full_text + + async def _openai_stream( + self, messages, task_id, session_id, model, temperature, max_tokens + ) -> str: + full_text = "" + headers = { + "Authorization": f"Bearer {OPENAI_API_KEY}", + "Content-Type": "application/json", + } + payload = { + "model": model, + "messages": messages, + "stream": True, + "temperature": temperature, + "max_tokens": max_tokens, + } + async with httpx.AsyncClient(timeout=120) as client: + async with client.stream( + "POST", f"{OPENAI_BASE_URL}/chat/completions", + headers=headers, json=payload + ) as resp: + resp.raise_for_status() + async for line in resp.aiter_lines(): + if not line.startswith("data:"): + continue + chunk = line[6:].strip() + if chunk == "[DONE]": + break + try: + data = json.loads(chunk) + delta = data["choices"][0]["delta"].get("content", "") + if delta: + full_text += delta + if task_id: + await self.ws.emit(task_id, "llm_chunk", { + "chunk": delta, + "accumulated": len(full_text), + }, session_id=session_id) + if session_id and not task_id: + await self.ws.emit_chat(session_id, "llm_chunk", { + "chunk": delta, + }) + except Exception: + pass + return full_text + + async def _anthropic_stream( + self, messages, task_id, session_id, temperature, max_tokens + ) -> str: + full_text = "" + system = "" + filtered = [] + for m in messages: + if m["role"] == "system": + system = m["content"] + else: + filtered.append(m) + headers = { + "x-api-key": ANTHROPIC_API_KEY, + "anthropic-version": "2023-06-01", + "Content-Type": "application/json", + } + payload = { + "model": "claude-3-5-sonnet-20241022", + "max_tokens": max_tokens, + "messages": filtered, + "stream": True, + } + if system: + payload["system"] = system + async with httpx.AsyncClient(timeout=120) as client: + async with client.stream( + "POST", "https://api.anthropic.com/v1/messages", + headers=headers, json=payload + ) as resp: + resp.raise_for_status() + async for line in resp.aiter_lines(): + if not line.startswith("data:"): + continue + try: + data = json.loads(line[5:].strip()) + if data.get("type") == "content_block_delta": + delta = data["delta"].get("text", "") + if delta: + full_text += delta + if task_id: + await self.ws.emit(task_id, "llm_chunk", { + "chunk": delta, + }, session_id=session_id) + if session_id and not task_id: + await self.ws.emit_chat(session_id, "llm_chunk", { + "chunk": delta, + }) + except Exception: + pass + return full_text + + async def _demo_stream(self, messages, task_id, session_id) -> str: + """Demo mode β€” simulate LLM streaming without API key.""" + last_user = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), "Hello" + ) + response = ( + f"πŸ€– **Devin Agent** (Demo Mode)\n\n" + f"I received your request: *{last_user[:100]}*\n\n" + f"To enable real AI responses, set `OPENAI_API_KEY` or `ANTHROPIC_API_KEY` in your environment.\n\n" + f"**What I can do with a real API key:**\n" + f"- πŸ“‹ Generate detailed execution plans\n" + f"- πŸ’» Write and execute code autonomously\n" + f"- πŸ”§ Debug and self-heal on errors\n" + f"- πŸ™ Manage GitHub repos autonomously\n" + f"- 🧠 Remember long-running project context\n" + f"- πŸš€ Deploy applications automatically\n" + ) + full_text = "" + for word in response.split(): + chunk = word + " " + full_text += chunk + await asyncio.sleep(0.03) + if task_id: + await self.ws.emit(task_id, "llm_chunk", { + "chunk": chunk, + "demo": True, + }, session_id=session_id) + if session_id and not task_id: + await self.ws.emit_chat(session_id, "llm_chunk", { + "chunk": chunk, + "demo": True, + }) + return full_text + + # ─── Planning ────────────────────────────────────────────────────────────── + + async def plan(self, goal: str, task_id: str, session_id: str = "") -> TaskPlan: + """Generate a structured execution plan.""" + # Get context from memory + memories = await search_memory(goal[:50], session_id=session_id) + context = "\n".join([m["content"][:200] for m in memories[:3]]) + + prompt = PLANNER_PROMPT.format(goal=goal, context=context or "No prior context") + + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": prompt}, + ] + + if not OPENAI_API_KEY and not ANTHROPIC_API_KEY: + # Demo plan + return self._demo_plan(goal) + + raw = await self.llm_stream(messages, task_id=task_id, session_id=session_id) + + # Extract JSON from response + try: + # Find JSON block + start = raw.find("{") + end = raw.rfind("}") + 1 + if start >= 0 and end > start: + data = json.loads(raw[start:end]) + else: + data = json.loads(raw) + + steps = [] + for i, s in enumerate(data.get("steps", [])): + steps.append(TaskStep( + name=s.get("name", f"Step {i+1}"), + description=s.get("description", ""), + tool=s.get("tool", "none"), + )) + + return TaskPlan( + goal=goal, + steps=steps if steps else [TaskStep(name="Execute goal", description=goal, tool="code")], + estimated_duration=data.get("estimated_duration", 60), + tools_needed=data.get("tools_needed", []), + ) + except Exception as e: + log.warning("Plan parse failed, using fallback", error=str(e)) + return self._demo_plan(goal) + + def _demo_plan(self, goal: str) -> TaskPlan: + """Fallback plan for demo mode.""" + steps = [ + TaskStep(name="Analyze Requirements", description=f"Analyze: {goal[:60]}", tool="none"), + TaskStep(name="Design Solution", description="Design the solution architecture", tool="none"), + TaskStep(name="Implement", description="Write the implementation code", tool="code"), + TaskStep(name="Test", description="Test the implementation", tool="test"), + TaskStep(name="Document", description="Write documentation", tool="none"), + ] + return TaskPlan( + goal=goal, + steps=steps, + estimated_duration=120, + tools_needed=["code", "test"], + ) + + # ─── Step Execution ──────────────────────────────────────────────────────── + + async def execute_step( + self, + step: TaskStep, + task_id: str, + session_id: str = "", + context: Dict = {}, + ) -> str: + """Execute a single step using the appropriate tool.""" + from tools.executor import ToolExecutor + executor = ToolExecutor(self.ws) + + await self.ws.emit(task_id, "tool_called", { + "tool": step.tool or "none", + "step": step.name, + "description": step.description, + }, session_id=session_id) + + try: + result = await executor.run( + tool=step.tool or "none", + task=step.description, + goal=context.get("goal", ""), + previous=context.get("previous_results", []), + task_id=task_id, + session_id=session_id, + ) + await self.ws.emit(task_id, "tool_result", { + "tool": step.tool, + "step": step.name, + "result": str(result)[:500], + "success": True, + }, session_id=session_id) + return result + except Exception as e: + await self.ws.emit(task_id, "tool_result", { + "tool": step.tool, + "step": step.name, + "error": str(e), + "success": False, + }, session_id=session_id) + return f"Error in {step.name}: {str(e)}" + + # ─── Finalize ────────────────────────────────────────────────────────────── + + async def finalize( + self, + goal: str, + steps: List[TaskStep], + results: List[str], + task_id: str, + session_id: str = "", + ) -> str: + """Compile final result summary.""" + steps_summary = "\n".join([ + f"- {s.name}: {r[:200]}" for s, r in zip(steps, results) + ]) + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": ( + f"Summarize the completion of this goal:\n" + f"Goal: {goal}\n\n" + f"Steps completed:\n{steps_summary}\n\n" + f"Write a concise success summary with key outcomes." + )}, + ] + result = await self.llm_stream(messages, task_id=task_id, session_id=session_id) + return result or f"βœ… Completed: {goal}" + + # ─── Chat ────────────────────────────────────────────────────────────────── + + async def stream_chat(self, session_id: str, user_message: str): + """Stream a conversational chat response.""" + # Save user message to memory + await save_memory( + content=user_message, + memory_type="conversation", + session_id=session_id, + key="user_message", + ) + + # Get conversation history + history = await get_history(session_id, limit=10) + messages = [{"role": "system", "content": SYSTEM_PROMPT}] + for h in reversed(history[-10:]): + messages.append({"role": "user", "content": h["content"]}) + + messages.append({"role": "user", "content": user_message}) + + await self.ws.emit_chat(session_id, "stream_start", { + "status": "generating", + }) + + response = await self.llm_stream(messages, session_id=session_id) + + # Save assistant response to memory + await save_memory( + content=response, + memory_type="conversation", + session_id=session_id, + key="assistant_response", + ) + + await self.ws.emit_chat(session_id, "stream_end", { + "full_response": response, + "status": "complete", + }) + + return response diff --git a/backend/core/models.py b/backend/core/models.py new file mode 100644 index 0000000000000000000000000000000000000000..772070923d8ddf8b5cba203e2d64ddb31c010fe6 --- /dev/null +++ b/backend/core/models.py @@ -0,0 +1,213 @@ +""" +Pydantic Models β€” Task, Chat, Memory, GitHub +""" + +import time +import uuid +from enum import Enum +from typing import Any, Dict, List, Optional +from pydantic import BaseModel, Field + + +def gen_id(prefix: str = "") -> str: + return f"{prefix}{uuid.uuid4().hex[:12]}" + + +# ─── Enums ───────────────────────────────────────────────────────────────────── + +class TaskStatus(str, Enum): + queued = "queued" + initializing = "initializing" + planning = "planning" + executing = "executing" + streaming = "streaming" + waiting_input = "waiting_input" + retrying = "retrying" + finalizing = "finalizing" + completed = "completed" + failed = "failed" + cancelled = "cancelled" + + +class EventType(str, Enum): + task_created = "task_created" + task_queued = "task_queued" + task_started = "task_started" + plan_generated = "plan_generated" + step_started = "step_started" + step_progress = "step_progress" + tool_called = "tool_called" + tool_result = "tool_result" + llm_chunk = "llm_chunk" + memory_updated = "memory_updated" + retry_attempt = "retry_attempt" + step_completed = "step_completed" + warning = "warning" + error = "error" + task_completed = "task_completed" + task_failed = "task_failed" + heartbeat = "heartbeat" + + +class MemoryType(str, Enum): + conversation = "conversation" + task = "task" + project = "project" + execution = "execution" + tool = "tool" + error = "error" + repo = "repo" + planning = "planning" + + +# ─── Task Models ─────────────────────────────────────────────────────────────── + +class TaskCreateRequest(BaseModel): + goal: str = Field(..., min_length=1, max_length=10000, description="What should the agent do?") + session_id: str = Field(default_factory=lambda: gen_id("sess_")) + project_id: str = Field(default="") + stream: bool = True + metadata: Dict[str, Any] = Field(default_factory=dict) + github_repo: Optional[str] = None + auto_commit: bool = False + + +class TaskStep(BaseModel): + id: str = Field(default_factory=lambda: gen_id("step_")) + name: str + description: str = "" + tool: Optional[str] = None + status: str = "pending" + output: Optional[str] = None + error: Optional[str] = None + started_at: Optional[float] = None + completed_at: Optional[float] = None + duration_ms: Optional[float] = None + + +class TaskPlan(BaseModel): + goal: str + steps: List[TaskStep] + estimated_duration: int = 0 + tools_needed: List[str] = [] + created_at: float = Field(default_factory=time.time) + + +class TaskResponse(BaseModel): + id: str + goal: str + status: TaskStatus + session_id: str + project_id: str + plan: Optional[TaskPlan] = None + result: Optional[str] = None + error: Optional[str] = None + created_at: float + started_at: Optional[float] = None + completed_at: Optional[float] = None + retry_count: int = 0 + stream_url: Optional[str] = None + ws_url: Optional[str] = None + + +class TaskCancelRequest(BaseModel): + reason: str = "User cancelled" + + +class TaskRetryRequest(BaseModel): + reset_state: bool = True + + +# ─── Chat Models ─────────────────────────────────────────────────────────────── + +class ChatMessage(BaseModel): + role: str = Field(..., pattern="^(user|assistant|system)$") + content: str + timestamp: float = Field(default_factory=time.time) + + +class ChatRequest(BaseModel): + messages: List[ChatMessage] + session_id: str = Field(default_factory=lambda: gen_id("sess_")) + project_id: str = "" + stream: bool = True + model: str = "gpt-4o" + temperature: float = 0.7 + max_tokens: int = 4096 + system_prompt: Optional[str] = None + + +class GoalRequest(BaseModel): + goal: str = Field(..., min_length=1, max_length=10000) + session_id: str = Field(default_factory=lambda: gen_id("sess_")) + project_id: str = "" + stream: bool = True + auto_execute: bool = True + github_repo: Optional[str] = None + + +# ─── Memory Models ───────────────────────────────────────────────────────────── + +class MemorySaveRequest(BaseModel): + content: str + memory_type: MemoryType + session_id: str = "" + project_id: str = "" + key: str = "" + metadata: Dict[str, Any] = {} + + +class MemorySearchRequest(BaseModel): + query: str + session_id: str = "" + project_id: str = "" + limit: int = 20 + + +# ─── GitHub Models ───────────────────────────────────────────────────────────── + +class GitHubCloneRequest(BaseModel): + repo_url: str + branch: str = "main" + local_path: Optional[str] = None + + +class GitHubCreateRepoRequest(BaseModel): + name: str + description: str = "" + private: bool = False + auto_init: bool = True + + +class GitHubCommitRequest(BaseModel): + repo: str + branch: str = "main" + files: Dict[str, str] # path β†’ content + message: str + create_branch: bool = False + + +class GitHubPRRequest(BaseModel): + repo: str + title: str + body: str = "" + head: str + base: str = "main" + draft: bool = False + + +class GitHubIssueRequest(BaseModel): + repo: str + title: str + body: str = "" + labels: List[str] = [] + + +# ─── Event Schema (unified) ──────────────────────────────────────────────────── + +class StreamEvent(BaseModel): + type: str + task_id: str = "" + session_id: str = "" + timestamp: float = Field(default_factory=time.time) + data: Dict[str, Any] = {} diff --git a/backend/core/task_engine.py b/backend/core/task_engine.py new file mode 100644 index 0000000000000000000000000000000000000000..6eeb28d10be4e264994dc2b16beafb7d8b2492b6 --- /dev/null +++ b/backend/core/task_engine.py @@ -0,0 +1,241 @@ +""" +Task Engine β€” Heart of the Autonomous Agent +Manages task lifecycle, planning, execution, self-healing +""" + +import asyncio +import json +import os +import time +import uuid +from typing import Dict, Optional, List + +import structlog + +from core.models import TaskStatus, TaskPlan, TaskStep, TaskCreateRequest +from api.websocket_manager import WebSocketManager +from memory.db import ( + create_task, update_task_status, get_task, save_task_event, + save_memory, get_task_events +) + +log = structlog.get_logger() + +MAX_RETRIES = 3 +MAX_CONCURRENT = 5 + + +class TaskEngine: + def __init__(self, ws_manager: WebSocketManager): + self.ws = ws_manager + self._queue: asyncio.Queue = asyncio.Queue() + self._active: Dict[str, asyncio.Task] = {} + self._running = False + self._workers: List[asyncio.Task] = [] + + async def start(self): + self._running = True + for i in range(MAX_CONCURRENT): + worker = asyncio.create_task(self._worker(i)) + self._workers.append(worker) + log.info("TaskEngine started", workers=MAX_CONCURRENT) + + async def stop(self): + self._running = False + for w in self._workers: + w.cancel() + log.info("TaskEngine stopped") + + # ─── Public API ──────────────────────────────────────────────────────────── + + async def submit(self, req: TaskCreateRequest) -> str: + task_id = f"task_{uuid.uuid4().hex[:10]}" + await create_task( + task_id=task_id, + goal=req.goal, + session_id=req.session_id, + project_id=req.project_id, + metadata={**req.metadata, "github_repo": req.github_repo, "auto_commit": req.auto_commit}, + ) + await self.ws.emit(task_id, "task_created", { + "goal": req.goal, + "session_id": req.session_id, + "stream_url": f"/api/v1/tasks/{task_id}/stream", + "ws_url": f"/ws/tasks/{task_id}", + }, session_id=req.session_id) + await self._queue.put((task_id, req)) + await self.ws.emit(task_id, "task_queued", { + "position": self._queue.qsize(), + }, session_id=req.session_id) + log.info("Task submitted", task_id=task_id, goal=req.goal[:60]) + return task_id + + async def cancel(self, task_id: str, reason: str = "User cancelled"): + if task_id in self._active: + self._active[task_id].cancel() + del self._active[task_id] + await update_task_status(task_id, "cancelled", error=reason) + await self.ws.emit(task_id, "task_failed", {"reason": reason, "status": "cancelled"}) + + async def retry(self, task_id: str): + task = await get_task(task_id) + if not task: + return + req = TaskCreateRequest( + goal=task["goal"], + session_id=task["session_id"] or "", + project_id=task["project_id"] or "", + metadata=task.get("metadata") or {}, + ) + retry_count = (task.get("retry_count") or 0) + 1 + await update_task_status(task_id, "queued", retry_count=retry_count) + await self.ws.emit(task_id, "retry_attempt", {"count": retry_count}) + await self._queue.put((task_id, req)) + + async def handle_chat_message(self, session_id: str, content: str, websocket=None): + """Handle real-time chat message with streaming response.""" + from core.agent import AgentCore + agent = AgentCore(self.ws) + await agent.stream_chat(session_id=session_id, user_message=content) + + # ─── Worker Loop ─────────────────────────────────────────────────────────── + + async def _worker(self, worker_id: int): + log.info(f"Worker {worker_id} started") + while self._running: + try: + task_id, req = await asyncio.wait_for(self._queue.get(), timeout=1.0) + worker_task = asyncio.create_task(self._execute(task_id, req)) + self._active[task_id] = worker_task + await worker_task + self._active.pop(task_id, None) + self._queue.task_done() + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + except Exception as e: + log.error(f"Worker {worker_id} error", error=str(e)) + + async def _execute(self, task_id: str, req: TaskCreateRequest): + """Full task execution lifecycle.""" + from core.agent import AgentCore + agent = AgentCore(self.ws) + + try: + # ── Initializing ──────────────────────────────────────────────── + await update_task_status(task_id, "initializing") + await self.ws.emit(task_id, "task_started", { + "goal": req.goal, + "status": "initializing", + }, session_id=req.session_id) + await save_task_event(task_id, "task_started", {"goal": req.goal}) + + # ── Planning ──────────────────────────────────────────────────── + await update_task_status(task_id, "planning") + await self.ws.emit(task_id, "step_started", { + "step": "Planning", + "status": "planning", + "description": "Generating execution plan...", + }, session_id=req.session_id) + + plan = await agent.plan(goal=req.goal, task_id=task_id, session_id=req.session_id) + + await update_task_status(task_id, "executing", plan=plan.model_dump()) + await self.ws.emit(task_id, "plan_generated", { + "steps": [s.model_dump() for s in plan.steps], + "estimated_duration": plan.estimated_duration, + "tools_needed": plan.tools_needed, + }, session_id=req.session_id) + await save_task_event(task_id, "plan_generated", {"steps_count": len(plan.steps)}) + + # ── Execute Steps ──────────────────────────────────────────────── + results = [] + for i, step in enumerate(plan.steps): + await self.ws.emit(task_id, "step_started", { + "step": step.name, + "step_id": step.id, + "index": i, + "total": len(plan.steps), + "tool": step.tool, + }, session_id=req.session_id) + + step_result = await agent.execute_step( + step=step, + task_id=task_id, + session_id=req.session_id, + context={"goal": req.goal, "previous_results": results}, + ) + results.append(step_result) + + await self.ws.emit(task_id, "step_completed", { + "step": step.name, + "step_id": step.id, + "index": i, + "output": step_result[:500] if isinstance(step_result, str) else str(step_result)[:500], + "status": "completed", + }, session_id=req.session_id) + await save_task_event(task_id, "step_completed", {"step": step.name, "index": i}) + + # ── Finalize ───────────────────────────────────────────────────── + await update_task_status(task_id, "finalizing") + await self.ws.emit(task_id, "step_started", { + "step": "Finalizing", + "description": "Compiling results...", + }, session_id=req.session_id) + + final_result = await agent.finalize( + goal=req.goal, + steps=plan.steps, + results=results, + task_id=task_id, + session_id=req.session_id, + ) + + await update_task_status(task_id, "completed", result=final_result) + await self.ws.emit(task_id, "task_completed", { + "result": final_result, + "steps_completed": len(plan.steps), + "duration": time.time(), + }, session_id=req.session_id) + + # Save to memory + await save_memory( + content=f"Task: {req.goal}\nResult: {final_result}", + memory_type="task", + session_id=req.session_id, + project_id=req.project_id, + key=task_id, + ) + await self.ws.emit(task_id, "memory_updated", { + "type": "task", + "key": task_id, + }, session_id=req.session_id) + + log.info("Task completed", task_id=task_id) + + except asyncio.CancelledError: + await update_task_status(task_id, "cancelled") + await self.ws.emit(task_id, "task_failed", {"reason": "cancelled"}) + except Exception as e: + log.error("Task failed", task_id=task_id, error=str(e)) + task_data = await get_task(task_id) + retry_count = (task_data or {}).get("retry_count", 0) + + await self.ws.emit(task_id, "error", { + "error": str(e), + "retry_count": retry_count, + "will_retry": retry_count < MAX_RETRIES, + }, session_id=req.session_id) + + if retry_count < MAX_RETRIES: + await update_task_status(task_id, "retrying", retry_count=retry_count + 1) + await asyncio.sleep(2 ** retry_count) + await self.ws.emit(task_id, "retry_attempt", {"count": retry_count + 1}) + await self._execute(task_id, req) + else: + await update_task_status(task_id, "failed", error=str(e)) + await self.ws.emit(task_id, "task_failed", { + "error": str(e), + "retry_count": retry_count, + }, session_id=req.session_id) diff --git a/backend/ecosystem.config.cjs b/backend/ecosystem.config.cjs new file mode 100644 index 0000000000000000000000000000000000000000..ddba4e6b870059011d68d1ed7ec67d06c1a86771 --- /dev/null +++ b/backend/ecosystem.config.cjs @@ -0,0 +1,20 @@ +module.exports = { + apps: [ + { + name: 'devin-backend', + script: 'uvicorn', + args: 'main:app --host 0.0.0.0 --port 7860 --loop asyncio --log-level info', + interpreter: 'python3', + cwd: '/home/user/devin-agent/backend', + watch: false, + instances: 1, + exec_mode: 'fork', + env: { + PORT: 7860, + HOST: '0.0.0.0', + DB_PATH: '/tmp/devin_agent.db', + PYTHONUNBUFFERED: '1', + }, + }, + ], +} diff --git a/backend/github/__init__.py b/backend/github/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/main.py b/backend/main.py new file mode 100644 index 0000000000000000000000000000000000000000..6202e92b7b308de38693263dcb15fb815df68418 --- /dev/null +++ b/backend/main.py @@ -0,0 +1,180 @@ +""" +πŸš€ Devin-Style Autonomous AI Engineering Platform +Production-Grade FastAPI + WebSocket Backend +""" + +import asyncio +import json +import logging +import os +import time +import uuid +from contextlib import asynccontextmanager +from typing import Optional + +import structlog +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.middleware.gzip import GZipMiddleware +from fastapi.responses import JSONResponse +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded + +from api.routes import tasks, chat, memory, github, health +from api.websocket_manager import WebSocketManager +from core.task_engine import TaskEngine +from memory.db import init_db + +# ─── Structured Logging ──────────────────────────────────────────────────────── +structlog.configure( + processors=[ + structlog.processors.TimeStamper(fmt="iso"), + structlog.stdlib.add_log_level, + structlog.processors.StackInfoRenderer(), + structlog.dev.ConsoleRenderer(), + ] +) +log = structlog.get_logger() + +# ─── Rate Limiter ────────────────────────────────────────────────────────────── +limiter = Limiter(key_func=get_remote_address) + +# ─── Global Managers (shared state) ─────────────────────────────────────────── +ws_manager = WebSocketManager() +task_engine = TaskEngine(ws_manager) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Startup + Shutdown lifecycle.""" + log.info("πŸš€ Starting Devin Agent Platform...") + await init_db() + await task_engine.start() + asyncio.create_task(ws_manager.heartbeat_loop()) + log.info("βœ… Platform ready") + yield + log.info("πŸ›‘ Shutting down...") + await task_engine.stop() + log.info("βœ… Shutdown complete") + + +# ─── FastAPI App ─────────────────────────────────────────────────────────────── +app = FastAPI( + title="πŸ€– Devin Agent Platform", + description="Production-Grade Autonomous AI Engineering Platform", + version="2.0.0", + lifespan=lifespan, + docs_url="/api/docs", + redoc_url="/api/redoc", +) + +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +# ─── Middleware ──────────────────────────────────────────────────────────────── +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) +app.add_middleware(GZipMiddleware, minimum_size=1000) + + +# ─── Request Logging ─────────────────────────────────────────────────────────── +@app.middleware("http") +async def log_requests(request: Request, call_next): + start = time.time() + response = await call_next(request) + duration = round((time.time() - start) * 1000, 2) + log.info("HTTP", method=request.method, path=request.url.path, status=response.status_code, ms=duration) + return response + + +# ─── Inject shared state into routes ────────────────────────────────────────── +app.state.ws_manager = ws_manager +app.state.task_engine = task_engine + + +# ─── REST API Routers ────────────────────────────────────────────────────────── +app.include_router(health.router, prefix="/api/v1", tags=["health"]) +app.include_router(tasks.router, prefix="/api/v1/tasks", tags=["tasks"]) +app.include_router(chat.router, prefix="/api/v1", tags=["chat"]) +app.include_router(memory.router, prefix="/api/v1/memory", tags=["memory"]) +app.include_router(github.router, prefix="/api/v1/github", tags=["github"]) + + +# ─── WebSocket Endpoints ─────────────────────────────────────────────────────── +@app.websocket("/ws/tasks/{task_id}") +async def ws_task(websocket: WebSocket, task_id: str): + """Live streaming for specific task execution.""" + await ws_manager.connect(websocket, room=f"task:{task_id}") + try: + while True: + data = await websocket.receive_text() + msg = json.loads(data) + if msg.get("type") == "ping": + await websocket.send_json({"type": "pong", "timestamp": time.time()}) + except WebSocketDisconnect: + ws_manager.disconnect(websocket, room=f"task:{task_id}") + + +@app.websocket("/ws/logs") +async def ws_logs(websocket: WebSocket): + """Global live log stream.""" + await ws_manager.connect(websocket, room="logs") + try: + while True: + data = await websocket.receive_text() + msg = json.loads(data) + if msg.get("type") == "ping": + await websocket.send_json({"type": "pong", "timestamp": time.time()}) + except WebSocketDisconnect: + ws_manager.disconnect(websocket, room="logs") + + +@app.websocket("/ws/chat/{session_id}") +async def ws_chat(websocket: WebSocket, session_id: str): + """Real-time chat streaming per session.""" + await ws_manager.connect(websocket, room=f"chat:{session_id}") + try: + while True: + data = await websocket.receive_text() + msg = json.loads(data) + if msg.get("type") == "ping": + await websocket.send_json({"type": "pong", "timestamp": time.time()}) + elif msg.get("type") == "chat_message": + # Trigger streaming chat response + asyncio.create_task( + task_engine.handle_chat_message(session_id, msg.get("content", ""), websocket) + ) + except WebSocketDisconnect: + ws_manager.disconnect(websocket, room=f"chat:{session_id}") + + +@app.websocket("/ws/agent/status") +async def ws_agent_status(websocket: WebSocket): + """Global agent status stream.""" + await ws_manager.connect(websocket, room="agent_status") + try: + while True: + data = await websocket.receive_text() + msg = json.loads(data) + if msg.get("type") == "ping": + await websocket.send_json({"type": "pong", "timestamp": time.time()}) + except WebSocketDisconnect: + ws_manager.disconnect(websocket, room="agent_status") + + +# ─── Root ────────────────────────────────────────────────────────────────────── +@app.get("/") +async def root(): + return { + "name": "πŸ€– Devin Agent Platform", + "version": "2.0.0", + "status": "operational", + "docs": "/api/docs", + "websockets": ["/ws/tasks/{task_id}", "/ws/logs", "/ws/chat/{session_id}", "/ws/agent/status"], + } diff --git a/backend/memory/__init__.py b/backend/memory/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/memory/__pycache__/__init__.cpython-312.pyc b/backend/memory/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1952a0ff6a2a22048f4412f42cc0f824a14104de Binary files /dev/null and b/backend/memory/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/memory/__pycache__/db.cpython-312.pyc b/backend/memory/__pycache__/db.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5592e627062b9e4e02c1ac97ff7f9b8f152e8b15 Binary files /dev/null and b/backend/memory/__pycache__/db.cpython-312.pyc differ diff --git a/backend/memory/db.py b/backend/memory/db.py new file mode 100644 index 0000000000000000000000000000000000000000..68ae8bfba79386be88580dd8e5efcc89714c97d8 --- /dev/null +++ b/backend/memory/db.py @@ -0,0 +1,271 @@ +""" +Production SQLite Database β€” Async via aiosqlite +Handles tasks, memory, sessions, events +""" + +import aiosqlite +import os +import json +import time +from typing import Optional, List, Dict, Any +import structlog + +log = structlog.get_logger() + +DB_PATH = os.environ.get("DB_PATH", "/tmp/devin_agent.db") + + +async def get_db() -> aiosqlite.Connection: + db = await aiosqlite.connect(DB_PATH) + db.row_factory = aiosqlite.Row + await db.execute("PRAGMA journal_mode=WAL") + await db.execute("PRAGMA foreign_keys=ON") + return db + + +async def init_db(): + """Initialize all tables.""" + log.info("Initializing database", path=DB_PATH) + async with aiosqlite.connect(DB_PATH) as db: + await db.execute("PRAGMA journal_mode=WAL") + await db.execute("PRAGMA foreign_keys=ON") + + # Tasks table + await db.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + session_id TEXT, + project_id TEXT, + goal TEXT NOT NULL, + status TEXT DEFAULT 'queued', + plan TEXT, + result TEXT, + error TEXT, + metadata TEXT DEFAULT '{}', + created_at REAL, + started_at REAL, + completed_at REAL, + retry_count INTEGER DEFAULT 0 + ) + """) + + # Task events table + await db.execute(""" + CREATE TABLE IF NOT EXISTS task_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + event_type TEXT NOT NULL, + data TEXT DEFAULT '{}', + timestamp REAL, + FOREIGN KEY (task_id) REFERENCES tasks(id) + ) + """) + + # Memory table + await db.execute(""" + CREATE TABLE IF NOT EXISTS memory ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT, + project_id TEXT, + memory_type TEXT NOT NULL, + key TEXT, + content TEXT NOT NULL, + metadata TEXT DEFAULT '{}', + embedding TEXT, + created_at REAL, + updated_at REAL + ) + """) + + # Sessions table + await db.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + project_id TEXT, + user_id TEXT, + metadata TEXT DEFAULT '{}', + created_at REAL, + last_active REAL + ) + """) + + # GitHub operations table + await db.execute(""" + CREATE TABLE IF NOT EXISTS github_ops ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT, + operation TEXT NOT NULL, + repo TEXT, + branch TEXT, + status TEXT DEFAULT 'pending', + result TEXT, + created_at REAL + ) + """) + + # Indexes + await db.execute("CREATE INDEX IF NOT EXISTS idx_tasks_session ON tasks(session_id)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_events_task ON task_events(task_id)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_memory_session ON memory(session_id)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_memory_project ON memory(project_id)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_memory_type ON memory(memory_type)") + + await db.commit() + log.info("βœ… Database initialized") + + +# ─── Task CRUD ───────────────────────────────────────────────────────────────── + +async def create_task(task_id: str, goal: str, session_id: str = "", project_id: str = "", metadata: dict = {}): + async with aiosqlite.connect(DB_PATH) as db: + await db.execute(""" + INSERT INTO tasks (id, session_id, project_id, goal, status, metadata, created_at) + VALUES (?, ?, ?, ?, 'queued', ?, ?) + """, (task_id, session_id, project_id, goal, json.dumps(metadata), time.time())) + await db.commit() + + +async def update_task_status(task_id: str, status: str, **kwargs): + fields = ["status = ?"] + values = [status] + if status == "executing": + fields.append("started_at = ?") + values.append(time.time()) + if status in ("completed", "failed", "cancelled"): + fields.append("completed_at = ?") + values.append(time.time()) + for k, v in kwargs.items(): + if k in ("plan", "result", "error"): + fields.append(f"{k} = ?") + values.append(v if isinstance(v, str) else json.dumps(v)) + elif k == "retry_count": + fields.append("retry_count = ?") + values.append(v) + values.append(task_id) + async with aiosqlite.connect(DB_PATH) as db: + await db.execute(f"UPDATE tasks SET {', '.join(fields)} WHERE id = ?", values) + await db.commit() + + +async def get_task(task_id: str) -> Optional[Dict]: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)) as cursor: + row = await cursor.fetchone() + if row: + d = dict(row) + d["metadata"] = json.loads(d.get("metadata") or "{}") + d["plan"] = json.loads(d["plan"]) if d.get("plan") else None + return d + return None + + +async def list_tasks(session_id: str = "", limit: int = 50) -> List[Dict]: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + if session_id: + async with db.execute( + "SELECT * FROM tasks WHERE session_id = ? ORDER BY created_at DESC LIMIT ?", + (session_id, limit) + ) as cursor: + rows = await cursor.fetchall() + else: + async with db.execute( + "SELECT * FROM tasks ORDER BY created_at DESC LIMIT ?", (limit,) + ) as cursor: + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +async def save_task_event(task_id: str, event_type: str, data: dict = {}): + async with aiosqlite.connect(DB_PATH) as db: + await db.execute(""" + INSERT INTO task_events (task_id, event_type, data, timestamp) + VALUES (?, ?, ?, ?) + """, (task_id, event_type, json.dumps(data), time.time())) + await db.commit() + + +async def get_task_events(task_id: str) -> List[Dict]: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM task_events WHERE task_id = ? ORDER BY timestamp ASC", (task_id,) + ) as cursor: + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +# ─── Memory CRUD ─────────────────────────────────────────────────────────────── + +async def save_memory( + content: str, + memory_type: str, + session_id: str = "", + project_id: str = "", + key: str = "", + metadata: dict = {} +): + now = time.time() + async with aiosqlite.connect(DB_PATH) as db: + await db.execute(""" + INSERT INTO memory (session_id, project_id, memory_type, key, content, metadata, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, (session_id, project_id, memory_type, key, content, json.dumps(metadata), now, now)) + await db.commit() + + +async def search_memory(query: str, session_id: str = "", project_id: str = "", limit: int = 20) -> List[Dict]: + """Simple keyword search (upgrade to vector search in production).""" + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + q = f"%{query}%" + if session_id: + async with db.execute( + "SELECT * FROM memory WHERE session_id = ? AND content LIKE ? ORDER BY updated_at DESC LIMIT ?", + (session_id, q, limit) + ) as cursor: + rows = await cursor.fetchall() + elif project_id: + async with db.execute( + "SELECT * FROM memory WHERE project_id = ? AND content LIKE ? ORDER BY updated_at DESC LIMIT ?", + (project_id, q, limit) + ) as cursor: + rows = await cursor.fetchall() + else: + async with db.execute( + "SELECT * FROM memory WHERE content LIKE ? ORDER BY updated_at DESC LIMIT ?", + (q, limit) + ) as cursor: + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +async def get_project_memory(project_id: str, memory_type: str = "", limit: int = 100) -> List[Dict]: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + if memory_type: + async with db.execute( + "SELECT * FROM memory WHERE project_id = ? AND memory_type = ? ORDER BY updated_at DESC LIMIT ?", + (project_id, memory_type, limit) + ) as cursor: + rows = await cursor.fetchall() + else: + async with db.execute( + "SELECT * FROM memory WHERE project_id = ? ORDER BY updated_at DESC LIMIT ?", + (project_id, limit) + ) as cursor: + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +async def get_history(session_id: str, limit: int = 50) -> List[Dict]: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM memory WHERE session_id = ? AND memory_type = 'conversation' ORDER BY created_at DESC LIMIT ?", + (session_id, limit) + ) as cursor: + rows = await cursor.fetchall() + return [dict(r) for r in rows] diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..c2dc8344489a23accea6972db25fe35a2feacfa8 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,28 @@ +fastapi==0.111.0 +uvicorn[standard]==0.29.0 +websockets==12.0 +pydantic==2.7.1 +pydantic-settings==2.2.1 +python-jose[cryptography]==3.3.0 +python-multipart==0.0.9 +aiohttp==3.9.5 +aiosqlite==0.20.0 +sqlalchemy[asyncio]==2.0.30 +alembic==1.13.1 +httpx==0.27.0 +openai==1.30.1 +anthropic==0.26.1 +gitpython==3.1.43 +pygithub==2.3.0 +python-dotenv==1.0.1 +slowapi==0.1.9 +structlog==24.1.0 +rich==13.7.1 +asyncio-mqtt==0.16.2 +redis==5.0.4 +celery==5.3.6 +passlib[bcrypt]==1.7.4 +cryptography==42.0.7 +typer==0.12.3 +watchfiles==0.21.0 +psutil==5.9.8 diff --git a/backend/tools/__init__.py b/backend/tools/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/tools/executor.py b/backend/tools/executor.py new file mode 100644 index 0000000000000000000000000000000000000000..2ae0b80afbd92030c28334a35653c342c2599eb4 --- /dev/null +++ b/backend/tools/executor.py @@ -0,0 +1,176 @@ +""" +Tool Executor β€” Routes tool calls to the right implementation +Supports: code, shell, file, browser, github, memory, search, test, none +""" + +import asyncio +import os +import subprocess +import tempfile +import time +from typing import Any, List, Optional + +import structlog + +from api.websocket_manager import WebSocketManager + +log = structlog.get_logger() + + +class ToolExecutor: + def __init__(self, ws_manager: WebSocketManager): + self.ws = ws_manager + + async def run( + self, + tool: str, + task: str, + goal: str = "", + previous: List = [], + task_id: str = "", + session_id: str = "", + ) -> str: + tool = (tool or "none").lower().strip() + + dispatch = { + "code": self._tool_code, + "shell": self._tool_shell, + "file": self._tool_file, + "github": self._tool_github, + "memory": self._tool_memory, + "search": self._tool_search, + "test": self._tool_test, + "browser": self._tool_browser, + "none": self._tool_none, + } + + fn = dispatch.get(tool, self._tool_none) + return await fn(task=task, goal=goal, previous=previous, task_id=task_id, session_id=session_id) + + # ─── Code Tool ───────────────────────────────────────────────────────────── + async def _tool_code(self, task, goal, previous, task_id, session_id) -> str: + """Generate code using LLM.""" + from core.agent import AgentCore + agent = AgentCore(self.ws) + messages = [ + {"role": "system", "content": "You are an expert software engineer. Write clean, production-quality code. Return only the code with minimal explanation."}, + {"role": "user", "content": f"Task: {task}\nGoal: {goal}\n\nWrite the code to accomplish this."}, + ] + result = await agent.llm_stream(messages, task_id=task_id, session_id=session_id) + return result or f"# Code for: {task}" + + # ─── Shell Tool ──────────────────────────────────────────────────────────── + async def _tool_shell(self, task, goal, previous, task_id, session_id) -> str: + """Execute shell commands safely in a temp workspace.""" + # Extract command from task description + from core.agent import AgentCore + agent = AgentCore(self.ws) + messages = [ + {"role": "system", "content": "Extract the shell command to run. Return ONLY the command, nothing else."}, + {"role": "user", "content": f"Task: {task}"}, + ] + cmd = await agent.llm_stream(messages, task_id=task_id, session_id=session_id) + cmd = cmd.strip().strip("`").strip() + + # Safety: block dangerous commands + blocked = ["rm -rf /", ":(){ :|:& };:", "mkfs", "dd if=", "shutdown", "reboot", "halt"] + for b in blocked: + if b in cmd: + return f"❌ Blocked dangerous command: {cmd}" + + try: + await self.ws.emit(task_id, "step_progress", { + "action": "shell_exec", + "command": cmd[:200], + }, session_id=session_id) + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd="/tmp", + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) + output = stdout.decode()[:2000] + (stderr.decode()[:500] if stderr else "") + return output or "Command executed (no output)" + except asyncio.TimeoutError: + return "⚠️ Command timed out after 30s" + except Exception as e: + return f"❌ Shell error: {str(e)}" + + # ─── File Tool ───────────────────────────────────────────────────────────── + async def _tool_file(self, task, goal, previous, task_id, session_id) -> str: + """Create or modify files.""" + from core.agent import AgentCore + agent = AgentCore(self.ws) + messages = [ + {"role": "system", "content": "Generate file content. Respond with JSON: {\"filename\": \"...\", \"content\": \"...\"}"}, + {"role": "user", "content": f"Task: {task}\nGoal: {goal}"}, + ] + raw = await agent.llm_stream(messages, task_id=task_id, session_id=session_id) + try: + import json + start = raw.find("{") + end = raw.rfind("}") + 1 + data = json.loads(raw[start:end]) + filename = data.get("filename", "output.txt") + content = data.get("content", raw) + path = f"/tmp/workspace/{filename}" + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as f: + f.write(content) + await self.ws.emit(task_id, "step_progress", { + "action": "file_written", + "filename": filename, + "size": len(content), + }, session_id=session_id) + return f"βœ… File written: {filename} ({len(content)} chars)" + except Exception as e: + return f"File task result: {raw[:500]}" + + # ─── GitHub Tool ─────────────────────────────────────────────────────────── + async def _tool_github(self, task, goal, previous, task_id, session_id) -> str: + """Perform GitHub operations.""" + return f"GitHub: {task}\n(Set GITHUB_TOKEN to enable real GitHub operations)" + + # ─── Memory Tool ─────────────────────────────────────────────────────────── + async def _tool_memory(self, task, goal, previous, task_id, session_id) -> str: + """Save/retrieve from memory.""" + from memory.db import save_memory, search_memory + results = await search_memory(task[:50], session_id=session_id) + if results: + return "\n".join([r["content"][:300] for r in results[:3]]) + return "No relevant memories found" + + # ─── Search Tool ─────────────────────────────────────────────────────────── + async def _tool_search(self, task, goal, previous, task_id, session_id) -> str: + """Web search using available APIs.""" + return f"Search result for: {task}\n(Integrate search API for real results)" + + # ─── Test Tool ───────────────────────────────────────────────────────────── + async def _tool_test(self, task, goal, previous, task_id, session_id) -> str: + """Generate and run tests.""" + from core.agent import AgentCore + agent = AgentCore(self.ws) + messages = [ + {"role": "system", "content": "Write test cases for the given task. Use pytest format."}, + {"role": "user", "content": f"Write tests for: {task}\nContext: {goal}"}, + ] + result = await agent.llm_stream(messages, task_id=task_id, session_id=session_id) + return result or f"# Tests for: {task}" + + # ─── Browser Tool ────────────────────────────────────────────────────────── + async def _tool_browser(self, task, goal, previous, task_id, session_id) -> str: + """Browser automation (stub β€” extend with playwright).""" + return f"Browser task: {task}\n(Install playwright for real browser automation)" + + # ─── None Tool ───────────────────────────────────────────────────────────── + async def _tool_none(self, task, goal, previous, task_id, session_id) -> str: + """Use LLM directly without tools.""" + from core.agent import AgentCore + agent = AgentCore(self.ws) + messages = [ + {"role": "system", "content": "You are an expert engineer. Complete the task thoroughly."}, + {"role": "user", "content": f"Task: {task}\nGoal context: {goal}"}, + ] + result = await agent.llm_stream(messages, task_id=task_id, session_id=session_id) + return result or f"Completed: {task}" diff --git a/frontend/app/globals.css b/frontend/app/globals.css new file mode 100644 index 0000000000000000000000000000000000000000..9e464c4f4a7d096c286bec766d4c7d7b01713ce9 --- /dev/null +++ b/frontend/app/globals.css @@ -0,0 +1,178 @@ +@tailwind base; +@tailwind components; +@tailwind utilities; + +@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&family=JetBrains+Mono:wght@400;500;600&display=swap'); + +:root { + --bg-primary: #0f1017; + --bg-secondary: #13141c; + --bg-tertiary: #1a1b26; + --border: #2a2b3d; + --text-primary: #e2e8f0; + --text-secondary: #94a3b8; + --accent: #4f6ef7; + --accent-glow: rgba(79, 110, 247, 0.3); + --success: #4ade80; + --warning: #facc15; + --error: #f87171; + --terminal-bg: #0a0b10; +} + +* { box-sizing: border-box; margin: 0; padding: 0; } + +html, body { + height: 100%; + background-color: var(--bg-primary); + color: var(--text-primary); + font-family: 'Inter', system-ui, sans-serif; + font-size: 14px; + line-height: 1.6; + overflow: hidden; +} + +/* Scrollbar */ +::-webkit-scrollbar { width: 4px; height: 4px; } +::-webkit-scrollbar-track { background: transparent; } +::-webkit-scrollbar-thumb { background: #2a2b3d; border-radius: 2px; } +::-webkit-scrollbar-thumb:hover { background: #3a3b5a; } + +/* Selection */ +::selection { background: var(--accent-glow); color: var(--text-primary); } + +/* Focus */ +*:focus-visible { outline: 1px solid var(--accent); outline-offset: 2px; } + +/* Animations */ +@keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } } +@keyframes shimmer { + 0% { background-position: -200% 0; } + 100% { background-position: 200% 0; } +} +@keyframes scan { + 0% { transform: translateY(-100%); } + 100% { transform: translateY(100vh); } +} +@keyframes fadeSlideIn { + from { opacity: 0; transform: translateY(8px); } + to { opacity: 1; transform: translateY(0); } +} +@keyframes pulseRing { + 0% { transform: scale(1); opacity: 1; } + 100% { transform: scale(2); opacity: 0; } +} + +/* Cursor blink */ +.cursor-blink::after { + content: 'β–‹'; + animation: blink 1s step-end infinite; + color: var(--accent); +} + +/* Loading shimmer */ +.shimmer { + background: linear-gradient(90deg, #1a1b26 25%, #2a2b3d 50%, #1a1b26 75%); + background-size: 200% 100%; + animation: shimmer 1.5s infinite; +} + +/* Glass effect */ +.glass { + background: rgba(26, 27, 38, 0.7); + backdrop-filter: blur(12px); + -webkit-backdrop-filter: blur(12px); + border: 1px solid rgba(42, 43, 61, 0.8); +} + +/* Code blocks */ +pre, code { + font-family: 'JetBrains Mono', 'Fira Code', monospace; +} + +/* Prose override for dark */ +.prose-dark { + color: var(--text-primary); +} +.prose-dark h1, .prose-dark h2, .prose-dark h3 { color: #e2e8f0; } +.prose-dark code { + background: #1a1b26; + padding: 2px 6px; + border-radius: 4px; + font-size: 0.85em; + color: #c084fc; +} +.prose-dark pre { + background: #0a0b10 !important; + border: 1px solid #2a2b3d; + border-radius: 8px; +} +.prose-dark blockquote { + border-left: 3px solid var(--accent); + padding-left: 1rem; + color: var(--text-secondary); +} +.prose-dark a { color: var(--accent); } +.prose-dark strong { color: #e2e8f0; } +.prose-dark hr { border-color: #2a2b3d; } +.prose-dark ul li::marker { color: var(--accent); } +.prose-dark table th { background: #1a1b26; } +.prose-dark table td { border-color: #2a2b3d; } + +/* Step status colors */ +.step-running { color: #60a5fa; } +.step-completed { color: #4ade80; } +.step-failed { color: #f87171; } +.step-pending { color: #94a3b8; } + +/* Typing indicator */ +.typing-dot { + width: 6px; height: 6px; + border-radius: 50%; + background: var(--accent); + animation: pulseDot 1.4s ease-in-out infinite; +} +.typing-dot:nth-child(2) { animation-delay: 0.2s; } +.typing-dot:nth-child(3) { animation-delay: 0.4s; } +@keyframes pulseDot { + 0%, 80%, 100% { transform: scale(0.6); opacity: 0.4; } + 40% { transform: scale(1); opacity: 1; } +} + +/* Message animation */ +.message-enter { + animation: fadeSlideIn 0.3s ease-out forwards; +} + +/* Status badge */ +.status-queued { background: #1e293b; color: #94a3b8; } +.status-planning { background: #1a1f3a; color: #818cf8; } +.status-executing { background: #1a2d3a; color: #38bdf8; } +.status-completed { background: #162b1e; color: #4ade80; } +.status-failed { background: #2b1619; color: #f87171; } +.status-retrying { background: #2b2419; color: #facc15; } + +/* Glow border */ +.glow-border { + box-shadow: 0 0 0 1px var(--accent), 0 0 12px var(--accent-glow); +} + +/* Terminal window */ +.terminal { + background: var(--terminal-bg); + border-radius: 8px; + border: 1px solid #2a2b3d; + font-family: 'JetBrains Mono', monospace; + font-size: 12px; + line-height: 1.5; +} + +/* Scan line effect */ +.scan-line::before { + content: ''; + position: absolute; + top: 0; left: 0; right: 0; + height: 2px; + background: linear-gradient(90deg, transparent, var(--accent), transparent); + opacity: 0.3; + animation: scan 4s linear infinite; +} diff --git a/frontend/app/layout.tsx b/frontend/app/layout.tsx new file mode 100644 index 0000000000000000000000000000000000000000..ee9cff972d32aa2b2af8043859cac09faf574671 --- /dev/null +++ b/frontend/app/layout.tsx @@ -0,0 +1,21 @@ +import type { Metadata } from 'next' +import './globals.css' + +export const metadata: Metadata = { + title: 'πŸ€– Devin Agent β€” Autonomous AI Engineering Platform', + description: 'Production-grade autonomous AI coding agent with real-time streaming, WebSocket execution, GitHub automation, and persistent memory.', + keywords: ['AI agent', 'autonomous coding', 'Devin', 'Manus', 'streaming AI'], +} + +export default function RootLayout({ children }: { children: React.ReactNode }) { + return ( + + + + + + + {children} + + ) +} diff --git a/frontend/app/page.tsx b/frontend/app/page.tsx new file mode 100644 index 0000000000000000000000000000000000000000..3d89c57f4dcc644e028d2a6684f4aaccb7c24ba2 --- /dev/null +++ b/frontend/app/page.tsx @@ -0,0 +1,65 @@ +'use client' + +import { useEffect, useState } from 'react' +import { useAgentStore } from '@/hooks/useAgentStore' +import { useAgentWebSocket } from '@/hooks/useWebSocket' +import TopBar from '@/components/layout/TopBar' +import Sidebar from '@/components/layout/Sidebar' +import ChatPanel from '@/components/chat/ChatPanel' +import ExecutionTimeline from '@/components/timeline/ExecutionTimeline' +import TasksPanel from '@/components/layout/TasksPanel' +import MemoryPanel from '@/components/layout/MemoryPanel' + +export default function HomePage() { + const { activePanel, activeTaskId } = useAgentStore() + const [mounted, setMounted] = useState(false) + useEffect(() => setMounted(true), []) + + // Connect to global log stream + active task stream + useAgentWebSocket(undefined) + useAgentWebSocket(activeTaskId || undefined) + + if (!mounted) return ( +
+
+
πŸ€–
+
Loading Devin Agent...
+
+ {[0,1,2].map(i =>
)} +
+
+
+ ) + + const RightPanel = () => { + switch (activePanel) { + case 'timeline': return + case 'tasks': return + case 'memory': return + default: return + } + } + + return ( +
+ {/* Top bar */} + + + {/* Main layout */} +
+ {/* Left sidebar */} + + + {/* Center: Chat */} +
+ +
+ + {/* Right: Timeline / Tasks / Memory */} +
+ +
+
+
+ ) +} diff --git a/frontend/components/chat/ChatPanel.tsx b/frontend/components/chat/ChatPanel.tsx new file mode 100644 index 0000000000000000000000000000000000000000..66e840f49f3d336f039da4c9cc0e0bf53cf873b1 --- /dev/null +++ b/frontend/components/chat/ChatPanel.tsx @@ -0,0 +1,233 @@ +'use client' + +import { useState, useRef, useEffect, useCallback } from 'react' +import { useAgentStore } from '@/hooks/useAgentStore' +import { streamChatSSE } from '@/lib/websocket' +import { createTask } from '@/lib/api' +import MessageBubble from './MessageBubble' +import { Send, Loader2, Zap, Code2, GitBranch, Brain, Square } from 'lucide-react' + +const QUICK_ACTIONS = [ + { icon: Code2, label: 'Build a REST API', prompt: 'Build a production-ready REST API with FastAPI, SQLite, authentication, and CRUD endpoints for a todo app' }, + { icon: GitBranch, label: 'Create GitHub repo', prompt: 'Create a new GitHub repository, initialize it with a README, add a .gitignore for Python, and push initial code' }, + { icon: Brain, label: 'Analyze codebase', prompt: 'Analyze the current project structure and suggest improvements for code quality, performance, and maintainability' }, + { icon: Zap, label: 'Deploy to Vercel', prompt: 'Deploy this application to Vercel with proper environment variables and generate a production URL' }, +] + +export default function ChatPanel() { + const [input, setInput] = useState('') + const [mode, setMode] = useState<'chat' | 'agent'>('agent') + const messagesEndRef = useRef(null) + const inputRef = useRef(null) + const abortRef = useRef(null) + + const store = useAgentStore() + const { messages, sessionId, isStreaming, addMessage, setStreaming, appendChunk, updateMessage } = store + + useEffect(() => { + messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }) + }, [messages]) + + const handleSubmit = useCallback(async (e?: React.FormEvent) => { + e?.preventDefault() + const text = input.trim() + if (!text || isStreaming) return + + setInput('') + inputRef.current?.focus() + + // Add user message + addMessage({ role: 'user', content: text }) + + if (mode === 'agent') { + // Create autonomous task + try { + const assistantId = addMessage({ + role: 'assistant', + content: '', + streaming: true, + metadata: { mode: 'agent' }, + }) + setStreaming(true, assistantId) + + const result = await createTask(text, sessionId) + + updateMessage(assistantId, { + content: `πŸš€ **Task Created** \`${result.task_id}\`\n\nConnecting to execution stream... Watch the timeline β†’\n\n**Goal:** ${text}`, + streaming: false, + metadata: { task_id: result.task_id, mode: 'agent' }, + }) + setStreaming(false, null) + } catch (err: any) { + const id = addMessage({ + role: 'assistant', + content: `❌ Failed to create task: ${err.message}\n\nMake sure the backend is running at \`${process.env.NEXT_PUBLIC_API_URL}\``, + metadata: { error: true }, + }) + setStreaming(false, null) + } + } else { + // Streaming chat mode + const assistantId = addMessage({ + role: 'assistant', + content: '', + streaming: true, + metadata: { mode: 'chat' }, + }) + setStreaming(true, assistantId) + + const chatMessages = [ + ...messages.filter(m => !m.streaming).slice(-10).map(m => ({ + role: m.role as 'user' | 'assistant', + content: m.content, + })), + { role: 'user' as const, content: text }, + ] + + await streamChatSSE( + chatMessages, + sessionId, + (chunk) => appendChunk(assistantId, chunk), + (full) => { + updateMessage(assistantId, { content: full, streaming: false }) + setStreaming(false, null) + }, + (err) => { + updateMessage(assistantId, { + content: `❌ Stream error: ${err}`, + streaming: false, + metadata: { error: true }, + }) + setStreaming(false, null) + } + ) + } + }, [input, isStreaming, mode, messages, sessionId, addMessage, setStreaming, appendChunk, updateMessage]) + + const handleKeyDown = (e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault() + handleSubmit() + } + } + + const stopStreaming = () => { + abortRef.current?.abort() + setStreaming(false, null) + if (store.streamingMessageId) { + updateMessage(store.streamingMessageId, { streaming: false, content: store.messages.find(m => m.id === store.streamingMessageId)?.content + ' [stopped]' }) + } + } + + return ( +
+ {/* Header */} +
+
+
+ Agent Chat + {sessionId.slice(0, 12)}... +
+ {/* Mode switcher */} +
+ {(['agent', 'chat'] as const).map((m) => ( + + ))} +
+
+ + {/* Messages */} +
+ {messages.length === 0 && ( +
+
+
πŸ€–
+

Devin Agent

+

+ Autonomous AI engineering platform. Give me a goal and I'll plan, code, and execute it. +

+
+
+ {QUICK_ACTIONS.map(({ icon: Icon, label, prompt }) => ( + + ))} +
+
+ )} + + {messages.map((msg) => ( + + ))} +
+
+ + {/* Input */} +
+
+
+