| """ |
| Chat + Goal API Routes — Real-time streaming responses |
| """ |
|
|
| import asyncio |
| import json |
| import time |
| import uuid |
|
|
| from fastapi import APIRouter, HTTPException, Request |
| from fastapi.responses import StreamingResponse |
|
|
| from core.models import ChatRequest, GoalRequest, TaskCreateRequest |
| from memory.db import save_memory, get_history |
|
|
| router = APIRouter() |
|
|
|
|
def get_engine(request: Request):
    """Return the task engine stored on the application state.

    Args:
        request: The incoming request; only ``request.app.state`` is read.

    Returns:
        Whatever object is stored as ``request.app.state.task_engine``.
    """
    app_state = request.app.state
    return app_state.task_engine
|
|
|
|
def get_ws(request: Request):
    """Return the WebSocket manager stored on the application state.

    Args:
        request: The incoming request; only ``request.app.state`` is read.

    Returns:
        Whatever object is stored as ``request.app.state.ws_manager``.
    """
    app_state = request.app.state
    return app_state.ws_manager
|
|
|
|
| |
|
|
def _sse_event(event_type, payload, session_id):
    """Serialize one event as a Server-Sent Events ``data:`` frame."""
    envelope = {"type": event_type, "data": payload, "session_id": session_id}
    return f"data: {json.dumps(envelope)}\n\n"


def _openai_delta_text(chunk_str):
    """Extract the text delta from one OpenAI streaming payload.

    Returns an empty string when the payload is not valid JSON, does not
    have the ``choices[0].delta.content`` shape, or carries no text.
    """
    try:
        content = json.loads(chunk_str)["choices"][0]["delta"].get("content")
    except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError):
        # Malformed or non-content chunk: skip it rather than abort the stream.
        return ""
    return content if isinstance(content, str) else ""


@router.post("/chat", summary="Chat with the agent")
async def chat(req: ChatRequest, request: Request):
    """Answer a chat request, either as an SSE stream or as one JSON body.

    When ``req.stream`` is truthy the reply is a ``text/event-stream``
    response made of ``llm_chunk`` events followed by one ``stream_end``
    event holding the concatenated text. If ``OPENAI_API_KEY`` is set the
    chunks are relayed from ``{OPENAI_BASE_URL}/chat/completions``; otherwise
    a canned demo message is streamed word by word. An ``error`` event is
    emitted (before ``stream_end``) if the upstream request fails.

    When ``req.stream`` is falsy the agent's ``llm_stream`` result is
    awaited and returned in a JSON object.

    Args:
        req: Chat payload (messages, session id, model, sampling options).
        request: Incoming request, used to reach the WebSocket manager.

    Returns:
        A ``StreamingResponse`` for streaming requests, else a dict with
        ``response``, ``session_id``, ``model`` and ``timestamp``.
    """
    # Imported inside the function as in the original code
    # (presumably to avoid a circular import — TODO confirm).
    from core.agent import AgentCore

    messages = [{"role": m.role, "content": m.content} for m in req.messages]

    if not req.stream:
        agent = AgentCore(get_ws(request))
        # NOTE(review): model / temperature / max_tokens are not forwarded
        # here although the response echoes req.model — confirm intent.
        result = await agent.llm_stream(messages, session_id=req.session_id)
        return {
            "response": result,
            "session_id": req.session_id,
            "model": req.model,
            "timestamp": time.time(),
        }

    # NOTE(review): neither path persists the conversation via save_memory;
    # confirm whether that is expected for this endpoint.
    async def stream_gen():
        import os

        import httpx

        session_id = req.session_id
        parts = []
        api_key = os.environ.get("OPENAI_API_KEY", "")

        if api_key:
            from core.agent import OPENAI_BASE_URL

            headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }
            payload = {
                "model": req.model,
                "messages": messages,
                "stream": True,
                "temperature": req.temperature,
                "max_tokens": req.max_tokens,
            }
            try:
                async with httpx.AsyncClient(timeout=120) as client:
                    async with client.stream(
                        "POST",
                        f"{OPENAI_BASE_URL}/chat/completions",
                        headers=headers,
                        json=payload,
                    ) as resp:
                        if resp.status_code >= 400:
                            # Surface upstream failures instead of ending
                            # with a silently empty reply.
                            yield _sse_event(
                                "error",
                                {
                                    "status": resp.status_code,
                                    "message": "LLM provider returned an error response",
                                },
                                session_id,
                            )
                        else:
                            async for line in resp.aiter_lines():
                                if not line.startswith("data:"):
                                    continue
                                # The space after "data:" is optional in SSE,
                                # so cut exactly the prefix and strip the rest.
                                chunk_str = line[len("data:"):].strip()
                                if chunk_str == "[DONE]":
                                    break
                                delta = _openai_delta_text(chunk_str)
                                if delta:
                                    parts.append(delta)
                                    yield _sse_event(
                                        "llm_chunk", {"chunk": delta}, session_id
                                    )
            except httpx.HTTPError as exc:
                # Connection/timeout failure: report it and still finish the
                # stream with stream_end so clients are not left hanging.
                yield _sse_event(
                    "error",
                    {"message": f"LLM provider request failed: {exc}"},
                    session_id,
                )
        else:
            # Demo mode: no API key configured. Guard against an empty
            # message list so the generator cannot fail mid-stream.
            last_content = req.messages[-1].content if req.messages else ""
            demo = (
                f"Hello! I'm your Devin-style AI Agent. I received: '{last_content[:80]}'. "
                f"Set OPENAI_API_KEY or ANTHROPIC_API_KEY for real AI responses. "
                f"I support real-time streaming, task planning, GitHub automation, and more!"
            )
            for word in demo.split():
                chunk = word + " "
                parts.append(chunk)
                await asyncio.sleep(0.04)
                yield _sse_event("llm_chunk", {"chunk": chunk}, session_id)

        yield _sse_event("stream_end", {"full_response": "".join(parts)}, session_id)

    return StreamingResponse(
        stream_gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|
|
|
|
@router.post("/chat/stream", summary="Explicit streaming chat endpoint")
async def chat_stream(req: ChatRequest, request: Request):
    """Handle a chat request with streaming forced on.

    Sets ``req.stream`` to ``True`` and delegates to ``chat``, returning
    whatever ``chat`` returns.
    """
    # Override whatever the client sent before handing off.
    req.stream = True
    response = await chat(req, request)
    return response
|
|
|
|
| |
|
|
@router.post("/goal", summary="Submit a high-level goal to the agent")
async def submit_goal(req: GoalRequest, request: Request):
    """Submit a goal to the task engine and describe how to follow it.

    Wraps the goal in a ``TaskCreateRequest`` (tagged with source
    ``goal_api`` and the requested GitHub repo), submits it to the engine,
    and returns the new task id together with its WebSocket and stream URLs.
    """
    task_engine = get_engine(request)

    task_metadata = {"source": "goal_api", "github_repo": req.github_repo}
    submission = TaskCreateRequest(
        goal=req.goal,
        session_id=req.session_id,
        project_id=req.project_id,
        stream=req.stream,
        metadata=task_metadata,
    )
    new_task_id = await task_engine.submit(submission)

    response = {
        "task_id": new_task_id,
        "goal": req.goal,
        "status": "queued",
        "session_id": req.session_id,
    }
    response["ws_url"] = f"/ws/tasks/{new_task_id}"
    response["stream_url"] = f"/api/v1/tasks/{new_task_id}/stream"
    return response
|
|
|
|
@router.post("/goal/stream", summary="Submit goal with SSE streaming response")
async def submit_goal_stream(req: GoalRequest, request: Request):
    """Submit a goal with streaming forced on.

    Sets ``req.stream`` to ``True`` and delegates to ``submit_goal``,
    returning whatever ``submit_goal`` returns.
    """
    # Override whatever the client sent before handing off.
    req.stream = True
    response = await submit_goal(req, request)
    return response
|
|
|
|
| |
|
|
@router.post("/execute", summary="Execute a tool directly")
async def execute(
    tool: str,
    task: str,
    request: Request,
    session_id: str = "",
):
    """Run a single tool through ``ToolExecutor`` and return its result.

    Args:
        tool: Name of the tool, passed to ``ToolExecutor.run``.
        task: Task description, passed to ``ToolExecutor.run``.
        request: Incoming request, used to reach the WebSocket manager.
        session_id: Session identifier forwarded to the executor.

    Returns:
        A dict echoing ``tool``, ``task`` and ``session_id`` alongside the
        executor's ``result``.
    """
    from tools.executor import ToolExecutor

    tool_runner = ToolExecutor(get_ws(request))
    outcome = await tool_runner.run(tool=tool, task=task, session_id=session_id)
    return dict(tool=tool, task=task, result=outcome, session_id=session_id)
|
|
|
|
| |
|
|
@router.post("/plan", summary="Generate execution plan for a goal")
async def generate_plan(req: GoalRequest, request: Request):
    """Ask the agent for a plan for ``req.goal`` and return it.

    A fresh id of the form ``plan_<8 hex chars>`` is generated and passed to
    ``AgentCore.plan`` as the task id. The resulting plan is returned via
    its ``model_dump()`` alongside the goal, session id and that task id.
    """
    from core.agent import AgentCore

    planner = AgentCore(get_ws(request))
    plan_id = "plan_" + uuid.uuid4().hex[:8]
    generated = await planner.plan(
        goal=req.goal,
        task_id=plan_id,
        session_id=req.session_id,
    )
    return {
        "goal": req.goal,
        "plan": generated.model_dump(),
        "session_id": req.session_id,
        "task_id": plan_id,
    }
|
|