File size: 3,752 Bytes
495efa0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
import asyncio
import os
import secrets
import uuid
from datetime import datetime, timezone
from typing import Literal, Optional

import requests
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from skill_adapter import build_full_flow_plan, build_open_url_plan
# ASGI application object; the route decorators in this file attach to it.
app = FastAPI()

# In-memory, process-local state. Nothing here is persisted.
agents = {}   # agent_id -> registration record (includes the agent's token)
queues = {}   # agent_id -> list of jobs waiting to be handed out
jobs = {}     # job_id -> job dict
results = {}  # job_id -> result submitted for that job
waiters = {}  # agent_id -> asyncio.Condition used by long-polling requests

# Shared secret checked by the /skill/* endpoints; None when the variable is unset.
ADMIN_SECRET = os.getenv("ADMIN_SECRET")
def now():
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the time is read
    from an aware UTC clock instead. The tzinfo is dropped before formatting so
    the output keeps its existing shape (``...Z`` rather than ``...+00:00Z``).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
def cond_for(agent_id):
    """Return the ``asyncio.Condition`` for ``agent_id``, creating it on first use.

    The condition is cached in ``waiters`` so every caller for the same agent
    shares one object.
    """
    try:
        return waiters[agent_id]
    except KeyError:
        condition = asyncio.Condition()
        waiters[agent_id] = condition
        return condition
# Strong references to in-flight notify tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references may be garbage-collected
# before it has run.
_notify_tasks = set()


def push_job(agent_id, payload):
    """Create a queued job for ``agent_id`` and wake its long-polling waiters.

    Must be called while an event loop is running, because the wake-up is
    scheduled with ``asyncio.create_task``.

    Args:
        agent_id: Key of the queue the job is appended to.
        payload: Stored unchanged under the job's "payload" key.

    Returns:
        The new job dict. The same object is stored in ``jobs`` and appended
        to the agent's queue.
    """
    job_id = str(uuid.uuid4())
    stamp = now()  # single clock read so created_at == updated_at at creation
    job = {
        "id": job_id,
        "agent_id": agent_id,
        "payload": payload,
        "status": "queued",
        "created_at": stamp,
        "updated_at": stamp,
    }
    jobs[job_id] = job
    queues.setdefault(agent_id, []).append(job)

    async def notify():
        # notify_all() requires the condition's lock to be held.
        cond = cond_for(agent_id)
        async with cond:
            cond.notify_all()

    task = asyncio.create_task(notify())
    _notify_tasks.add(task)
    task.add_done_callback(_notify_tasks.discard)  # drop the reference once done
    return job
class RegisterBody(BaseModel):
    # Request body for POST /register.
    display_name: str
class RegisterResp(BaseModel):
    # Response body of POST /register: the credentials issued to the new agent.
    agent_id: str
    token: str
class EnqueuePlanBody(BaseModel):
    # Body carrying a ready-made plan for an agent.
    # NOTE(review): not referenced by any route visible in this file.
    agent_id: str
    plan: list
    admin_secret: Optional[str] = None
class SubmitResultBody(BaseModel):
    # Request body for POST /submit-result.
    agent_id: str
    token: str
    job_id: str
    # Outcome reported for the job; only these two values pass validation.
    status: Literal["completed", "failed"]
    message: Optional[str] = None
    artifacts: Optional[dict] = None
class FullFlowReq(BaseModel):
    # Request body for POST /skill/full-flow. email, password and project_id
    # are forwarded to build_full_flow_plan.
    agent_id: str
    email: str
    password: str
    project_id: str
    admin_secret: Optional[str] = None
class OpenURLReq(BaseModel):
    # Request body for POST /skill/open-url. url is forwarded to
    # build_open_url_plan.
    agent_id: str
    url: str
    admin_secret: Optional[str] = None
@app.post("/register", response_model=RegisterResp)
async def register(body: RegisterBody):
    """Register a new agent and issue its credentials.

    Stores a registration record in ``agents`` and gives the agent an empty
    job queue.

    Args:
        body: Carries the agent's display name.

    Returns:
        RegisterResp with the generated ``agent_id`` and ``token``.
    """
    agent_id = str(uuid.uuid4())
    # The token is a bearer credential, so it comes from the `secrets` CSPRNG
    # rather than from a UUID, which is an identifier format and not a secret.
    token = secrets.token_urlsafe(32)
    stamp = now()  # single clock read so registered_at == last_seen initially
    agents[agent_id] = {
        "agent_id": agent_id,
        "token": token,
        "display_name": body.display_name,
        "registered_at": stamp,
        "last_seen": stamp,
    }
    queues[agent_id] = []
    return RegisterResp(agent_id=agent_id, token=token)
@app.get("/agents")
async def list_agents():
    """List every registered agent, without its token.

    This endpoint performs no authentication, and the token is what /next-job
    accepts as proof of identity, so it must not be exposed here.

    Returns:
        {"agents": [...]} where each entry is a copy of the registration
        record minus the "token" key.
    """
    public_records = [
        {key: value for key, value in record.items() if key != "token"}
        for record in agents.values()
    ]
    return {"agents": public_records}
@app.get("/next-job")
async def next_job(agent_id: str, token: str, wait: int = 25):
    """Long-poll for the next queued job of an authenticated agent.

    If a job is already queued it is returned at once. Otherwise the request
    blocks until the agent's condition is notified or ``wait`` seconds pass.

    Args:
        agent_id: Id issued by /register.
        token: Token issued by /register for that agent.
        wait: Maximum number of seconds to block when the queue is empty.

    Returns:
        {"job": job} with the job marked "taken", or {} when no job is
        available.

    Raises:
        HTTPException: 401 when the agent is unknown or the token is wrong.
    """
    agent = agents.get(agent_id)
    # Constant-time comparison so response timing does not leak the token.
    if agent is None or not secrets.compare_digest(
        agent["token"].encode(), token.encode()
    ):
        raise HTTPException(status_code=401, detail="bad agent/token")
    agent["last_seen"] = now()

    def take():
        # Pop the oldest queued job and mark it taken; None if the queue is empty.
        pending = queues.get(agent_id, [])
        if not pending:
            return None
        job = pending.pop(0)
        job["status"] = "taken"
        job["updated_at"] = now()
        return job

    job = take()
    if job is None:
        cond = cond_for(agent_id)
        try:
            async with asyncio.timeout(wait):
                async with cond:
                    await cond.wait()
        except TimeoutError:
            # Only the poll deadline is expected here. Cancellation and real
            # errors must propagate instead of being swallowed.
            pass
        # Re-check after a timeout as well, in case a job was queued just as
        # the deadline fired.
        job = take()
    return {"job": job} if job is not None else {}
@app.post("/submit-result")
async def submit_result(body: SubmitResultBody):
    """Record the outcome an agent reports for one of its jobs.

    The request body carries agent credentials, so they are verified the same
    way /next-job verifies them, and the job must belong to the reporting
    agent.

    Args:
        body: Agent credentials, job id, final status and optional
            message/artifacts.

    Returns:
        {"ok": True} once the job status and result are stored.

    Raises:
        HTTPException: 401 for an unknown agent or wrong token, 404 for an
            unknown job, 403 when the job was queued for a different agent.
    """
    agent = agents.get(body.agent_id)
    # Constant-time comparison so response timing does not leak the token.
    if agent is None or not secrets.compare_digest(
        agent["token"].encode(), body.token.encode()
    ):
        raise HTTPException(status_code=401, detail="bad agent/token")

    job = jobs.get(body.job_id)
    if not job:
        raise HTTPException(status_code=404, detail="no job")
    if job["agent_id"] != body.agent_id:
        raise HTTPException(status_code=403, detail="job belongs to another agent")

    stamp = now()  # same instant for the job update and the result record
    job["status"] = body.status
    job["updated_at"] = stamp
    results[body.job_id] = {
        "status": body.status,
        "message": body.message,
        "artifacts": body.artifacts,
        "submitted_at": stamp,
    }
    return {"ok": True}
@app.post("/skill/full-flow")
async def skill_full_flow(body: FullFlowReq):
    """Build a full-flow plan and queue it as a job for one agent.

    Args:
        body: Target agent id, the email/password/project_id passed to
            ``build_full_flow_plan``, and the optional admin secret.

    Returns:
        {"job": job} for the newly queued job.

    Raises:
        HTTPException: 401 when ADMIN_SECRET is set and the supplied secret
            does not match; 404 when the agent is not registered.
    """
    # NOTE: when ADMIN_SECRET is unset or empty, this endpoint is unauthenticated.
    if ADMIN_SECRET:
        supplied = body.admin_secret or ""
        # Constant-time comparison so response timing does not leak the secret.
        if not secrets.compare_digest(supplied.encode(), ADMIN_SECRET.encode()):
            raise HTTPException(status_code=401, detail="bad secret")
    # Only registered agents can poll /next-job, so a job queued under an
    # unknown id could never be delivered.
    if body.agent_id not in agents:
        raise HTTPException(status_code=404, detail="unknown agent")
    plan = build_full_flow_plan(body.email, body.password, body.project_id)
    job = push_job(body.agent_id, {"type": "plan", "steps": plan})
    return {"job": job}
@app.post("/skill/open-url")
async def skill_open_url(body: OpenURLReq):
    """Build an open-URL plan and queue it as a job for one agent.

    Args:
        body: Target agent id, the url passed to ``build_open_url_plan``, and
            the optional admin secret.

    Returns:
        {"job": job} for the newly queued job.

    Raises:
        HTTPException: 401 when ADMIN_SECRET is set and the supplied secret
            does not match; 404 when the agent is not registered.
    """
    # NOTE: when ADMIN_SECRET is unset or empty, this endpoint is unauthenticated.
    if ADMIN_SECRET:
        supplied = body.admin_secret or ""
        # Constant-time comparison so response timing does not leak the secret.
        if not secrets.compare_digest(supplied.encode(), ADMIN_SECRET.encode()):
            raise HTTPException(status_code=401, detail="bad secret")
    # Only registered agents can poll /next-job, so a job queued under an
    # unknown id could never be delivered.
    if body.agent_id not in agents:
        raise HTTPException(status_code=404, detail="unknown agent")
    plan = build_open_url_plan(body.url)
    job = push_job(body.agent_id, {"type": "plan", "steps": plan})
    return {"job": job}
@app.get("/")
async def home():
    # Static landing page; a quick way to confirm the server is reachable.
    banner = "<h3>SkillFlow Server Running</h3>"
    return HTMLResponse(banner)
|