# Source listing metadata: file size 4,965 bytes, revision bcb86b5.
"""
FastAPI REST API for Computer-Using Agent
Provides HTTP endpoints for agent control and interaction
"""
from fastapi import FastAPI, HTTPException, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any
import asyncio
from loguru import logger
from .cua_agent import ComputerUsingAgent
# Initialize the FastAPI application. The title, description and version are
# passed straight to the FastAPI constructor.
app = FastAPI(
    title="Computer-Using Agent API",
    description="REST API for controlling the computer-using agent",
    version="1.0.0"
)
# Enable CORS for every origin, method and header.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation — confirm whether credentials
# are actually needed and list explicit origins before exposing this service
# beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Single module-level agent instance, constructed at import time and used by
# every endpoint in this module.
agent = ComputerUsingAgent()
# Request/Response models
class TaskRequest(BaseModel):
    # Request body accepted by POST /agent/execute.
    # `task` is the text handed unchanged to agent.execute_task().
    task: str
class TaskResponse(BaseModel):
    # Response body of POST /agent/execute. It is populated by unpacking the
    # dict returned from agent.execute_task(), so that dict must provide
    # `success`, `message` and `task`.
    success: bool
    message: str
    # Optional: falls back to None when the agent result carries no screenshot.
    screenshot: Optional[str] = None
    task: str
class StatusResponse(BaseModel):
    # Response body of GET /agent/status. It is populated by unpacking the
    # dict returned from agent.get_status().
    status: str
    # Explicit None default: a bare Optional[str] is a *required* field under
    # Pydantic v2, which would turn a status dict without `current_task` into
    # a validation error. This also matches TaskResponse.screenshot.
    current_task: Optional[str] = None
    display: str
    # Free-form mapping describing the active window; its keys are defined by
    # the agent, not by this module.
    active_window: Dict[str, Any]
class ScreenshotResponse(BaseModel):
    # Response body of POST /agent/screenshot.
    # `screenshot` carries the value returned by agent.get_screenshot_base64().
    screenshot: str
    # ISO-8601 string produced by the endpoint at capture time.
    timestamp: str
# API Endpoints
@app.get("/")
async def root():
    """API root endpoint"""
    # Index of the routes advertised to clients; built first so the top-level
    # payload below stays flat and easy to scan.
    endpoint_index = {
        "status": "/agent/status",
        "execute": "/agent/execute",
        "screenshot": "/agent/screenshot",
        "stop": "/agent/stop",
        "docs": "/docs",
    }
    return {
        "name": "Computer-Using Agent API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoint_index,
    }
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Static payload: answering at all is the health signal, the agent is not
    # consulted here.
    payload = {"status": "healthy"}
    return payload
@app.get("/agent/status", response_model=StatusResponse)
async def get_status():
    """
    Get current agent status
    Returns agent status, current task, and active window information
    """
    # Both the agent call and the model construction stay inside the try so
    # that a failure in either one is logged and reported as HTTP 500.
    try:
        snapshot = agent.get_status()
        response = StatusResponse(**snapshot)
        return response
    except Exception as exc:
        logger.error(f"Error getting status: {exc}")
        failure_detail = str(exc)
        raise HTTPException(status_code=500, detail=failure_detail)
# Created lazily from inside a request handler so the lock is always bound to
# the event loop that is actually serving requests.
_execute_lock: Optional[asyncio.Lock] = None


def _get_execute_lock() -> asyncio.Lock:
    """Return the module-wide lock that serializes agent task execution."""
    global _execute_lock
    if _execute_lock is None:
        _execute_lock = asyncio.Lock()
    return _execute_lock


@app.post("/agent/execute", response_model=TaskResponse)
async def execute_task(request: TaskRequest):
    """
    Execute a task using the computer-using agent

    Args:
        request: Task request with natural language description

    Returns:
        Task execution result with screenshot

    Raises:
        HTTPException: 500 with the error text if the agent call fails or its
            result cannot be turned into a TaskResponse.
    """
    try:
        logger.info(f"Received task: {request.task}")
        loop = asyncio.get_running_loop()
        # agent.execute_task() is a synchronous call. Running it directly in
        # this coroutine would block the event loop for the whole task, so
        # /agent/stop, /agent/status and the WebSocket could not be served
        # until it finished. Off-loading it to the default executor keeps the
        # loop responsive.
        # The lock keeps the previous "one task at a time" behaviour that the
        # blocking call provided implicitly.
        # NOTE(review): assumes the agent tolerates get_status()/stop() being
        # called from the event-loop thread while a task runs in a worker
        # thread — confirm against ComputerUsingAgent.
        async with _get_execute_lock():
            result = await loop.run_in_executor(
                None, agent.execute_task, request.task
            )
        return TaskResponse(**result)
    except Exception as e:
        logger.error(f"Error executing task: {e}")
        # Chain the cause so the original traceback is preserved.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/agent/screenshot", response_model=ScreenshotResponse)
async def capture_screenshot():
    """
    Capture a screenshot of the desktop

    Returns:
        Screenshot as base64-encoded PNG

    Raises:
        HTTPException: 500 with detail "Failed to capture screenshot" when the
            agent returns no data, or 500 with the error text when the capture
            itself raises.
    """
    import datetime

    try:
        screenshot_b64 = agent.get_screenshot_base64()
        if not screenshot_b64:
            logger.error("Error capturing screenshot: agent returned no data")
            raise HTTPException(
                status_code=500, detail="Failed to capture screenshot"
            )
        return ScreenshotResponse(
            screenshot=screenshot_b64,
            # Naive local time, as before; kept so the timestamp format seen
            # by existing clients does not change.
            timestamp=datetime.datetime.now().isoformat(),
        )
    except HTTPException:
        # HTTPException is itself an Exception subclass. Without this clause
        # the deliberate error raised above would be caught by the generic
        # handler below and re-wrapped with a mangled detail string.
        raise
    except Exception as e:
        logger.error(f"Error capturing screenshot: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/agent/stop")
async def stop_agent():
    """
    Stop the current agent task
    Returns:
    Success message
    """
    # Only the agent call can fail; the confirmation payload is a constant, so
    # it is built after the guarded section.
    try:
        agent.stop()
    except Exception as exc:
        logger.error(f"Error stopping agent: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    confirmation = {"message": "Agent stopped", "status": "stopped"}
    return confirmation
@app.websocket("/ws/agent")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for real-time agent updates
    Streams agent status and task updates
    """
    # Seconds to wait between two consecutive status pushes.
    push_interval = 2

    await websocket.accept()
    logger.info("WebSocket client connected")
    try:
        # Push loop: runs until sending or fetching the status raises. Any
        # such exception ends the stream and is logged as an error.
        while True:
            snapshot = agent.get_status()
            await websocket.send_json(snapshot)
            await asyncio.sleep(push_interval)
    except Exception as exc:
        logger.error(f"WebSocket error: {exc}")
    finally:
        logger.info("WebSocket client disconnected")
# Startup event
@app.on_event("startup")
async def startup_event():
    """Initialize services on startup"""
    import os

    logger.info("Agent API starting up")
    # Ensure the log directory exists; an already existing directory is fine.
    log_dir = "/app/logs"
    os.makedirs(log_dir, exist_ok=True)
@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown"""
    farewell = "Agent API shutting down"
    logger.info(farewell)
    # Ask the shared agent to stop before the process exits.
    agent.stop()
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn. The import is kept local
    # so that merely importing this module does not require uvicorn.
    import uvicorn

    # NOTE(review): 0.0.0.0 listens on every network interface — confirm this
    # is intended for the deployment environment.
    listen_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **listen_options)