# auto / api.py
# zenefil's picture
# Upload 4 files
# 495efa0 verified
import asyncio
import os
import secrets
import uuid
from datetime import datetime, timezone
from typing import Optional, Literal

import requests
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from skill_adapter import build_full_flow_plan, build_open_url_plan
# ASGI application object; the route decorators further down attach to it.
app = FastAPI()

# In-process registries. Nothing here is persisted, so every process starts
# with empty state.
agents = {}   # agent_id -> agent record written by /register
queues = {}   # agent_id -> list of job dicts waiting to be handed out
jobs = {}     # job_id -> job dict
results = {}  # job_id -> result record written by /submit-result
waiters = {}  # agent_id -> asyncio.Condition used to wake long-pollers

# Shared secret for the /skill/* endpoints, read once at import time.
# It is None when the environment variable is not set.
ADMIN_SECRET = os.getenv("ADMIN_SECRET")
def now():
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    Returns:
        str: e.g. ``"2024-01-31T12:34:56.789012Z"``. The fractional part is
        omitted by ``isoformat()`` when the microsecond field is zero.
    """
    # datetime.utcnow() is deprecated (Python 3.12+). Take an aware UTC
    # timestamp and drop tzinfo so isoformat() does not append "+00:00";
    # the explicit "Z" keeps the wire format unchanged.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
def cond_for(agent_id):
    """Return the ``asyncio.Condition`` associated with ``agent_id``.

    The condition is created and stored in ``waiters`` on first request, so
    repeated calls with the same id return the same object.
    """
    try:
        return waiters[agent_id]
    except KeyError:
        condition = waiters[agent_id] = asyncio.Condition()
        return condition
# Strong references to in-flight notifier tasks. The event loop only keeps
# weak references to tasks, so a task nobody holds may be garbage-collected
# before it has run.
_notify_tasks = set()


def push_job(agent_id, payload):
    """Create a job for ``agent_id``, queue it, and wake any long-pollers.

    Args:
        agent_id: Id of the agent whose queue receives the job.
        payload: Arbitrary job payload stored under the ``"payload"`` key.

    Returns:
        dict: The new job record (also stored in ``jobs`` and appended to
        ``queues[agent_id]``), with status ``"queued"``.

    Raises:
        RuntimeError: From ``asyncio.create_task`` if no event loop is
        running; call this from coroutine context.
    """
    # One timestamp so created_at and updated_at are identical on creation.
    stamp = now()
    job_id = str(uuid.uuid4())
    job = {
        "id": job_id,
        "agent_id": agent_id,
        "payload": payload,
        "status": "queued",
        "created_at": stamp,
        "updated_at": stamp,
    }
    jobs[job_id] = job
    queues.setdefault(agent_id, []).append(job)

    async def notify():
        # notify_all() requires the condition's lock to be held, and taking
        # it is awaitable, hence the helper coroutine.
        cond = cond_for(agent_id)
        async with cond:
            cond.notify_all()

    task = asyncio.create_task(notify())
    _notify_tasks.add(task)
    task.add_done_callback(_notify_tasks.discard)
    return job
# Request body accepted by POST /register.
class RegisterBody(BaseModel):
    # Label stored on the agent record.
    display_name: str
# Response model of POST /register: the credentials issued to the new agent.
class RegisterResp(BaseModel):
    agent_id: str
    token: str
# Body carrying a ready-made plan for one agent.
# NOTE(review): no handler in this part of the file consumes this model.
class EnqueuePlanBody(BaseModel):
    agent_id: str
    plan: list
    admin_secret: Optional[str] = None
# Request body accepted by POST /submit-result.
class SubmitResultBody(BaseModel):
    # Credentials of the reporting agent and the job being reported on.
    agent_id: str
    token: str
    job_id: str
    # Outcome; only these two values pass validation.
    status: Literal["completed", "failed"]
    # Optional free-form details.
    message: Optional[str] = None
    artifacts: Optional[dict] = None
# Request body accepted by POST /skill/full-flow.
class FullFlowReq(BaseModel):
    # Agent that should receive the generated plan.
    agent_id: str
    # Values passed through to build_full_flow_plan.
    email: str
    password: str
    project_id: str
    # Compared against ADMIN_SECRET when that is configured.
    admin_secret: Optional[str] = None
# Request body accepted by POST /skill/open-url.
class OpenURLReq(BaseModel):
    # Agent that should receive the generated plan.
    agent_id: str
    # Value passed through to build_open_url_plan.
    url: str
    # Compared against ADMIN_SECRET when that is configured.
    admin_secret: Optional[str] = None
@app.post("/register", response_model=RegisterResp)
async def register(body: RegisterBody):
    """Register a new agent and issue its credentials.

    Args:
        body: Carries the ``display_name`` to store on the agent record.

    Returns:
        RegisterResp: The generated ``agent_id`` and bearer ``token``.
    """
    agent_id = str(uuid.uuid4())
    # The token is a credential, so mint it with the module intended for
    # secrets rather than reusing a UUID.
    token = secrets.token_urlsafe(32)
    stamp = now()  # one timestamp so registered_at == last_seen initially
    agents[agent_id] = {
        "agent_id": agent_id,
        "token": token,
        "display_name": body.display_name,
        "registered_at": stamp,
        "last_seen": stamp,
    }
    queues[agent_id] = []
    return RegisterResp(agent_id=agent_id, token=token)
@app.get("/agents")
async def list_agents():
    """List every registered agent.

    Returns:
        dict: ``{"agents": [...]}`` with one record per agent containing the
        stored fields except ``token``.
    """
    # This route performs no authentication, so returning the stored records
    # verbatim would hand every agent's bearer token to any caller.
    visible = [
        {key: value for key, value in record.items() if key != "token"}
        for record in agents.values()
    ]
    return {"agents": visible}
@app.get("/next-job")
async def next_job(agent_id: str, token: str, wait: int = 25):
    """Long-poll for the next queued job of an agent.

    Args:
        agent_id: Id issued by ``/register``.
        token: Token issued together with ``agent_id``.
        wait: Maximum number of seconds to wait for a job when the queue is
            empty.

    Returns:
        dict: ``{"job": job}`` with the job marked ``"taken"``, or ``{}`` if
        nothing was queued before the wait expired.

    Raises:
        HTTPException: 401 when the agent is unknown or the token is wrong.
    """
    agent = agents.get(agent_id)
    # Constant-time comparison; encode first because compare_digest only
    # accepts str arguments when they are pure ASCII.
    if agent is None or not secrets.compare_digest(
        agent["token"].encode(), token.encode()
    ):
        raise HTTPException(status_code=401, detail="bad agent/token")
    agent["last_seen"] = now()

    def take():
        # Pop the oldest queued job, if any, and mark it as handed out.
        pending = queues.get(agent_id)
        if not pending:
            return None
        job = pending.pop(0)
        job["status"] = "taken"
        job["updated_at"] = now()
        return job

    job = take()
    if job is None:
        cond = cond_for(agent_id)
        try:
            async with asyncio.timeout(wait):
                async with cond:
                    # wait_for re-checks the queue after each wake-up, so a
                    # notification whose job went to another poller does not
                    # end the long-poll early.
                    await cond.wait_for(lambda: bool(queues.get(agent_id)))
        except TimeoutError:
            # Expected outcome of an idle long-poll. Cancellation and real
            # errors propagate instead of being swallowed.
            pass
        else:
            job = take()
    return {"job": job} if job is not None else {}
@app.post("/submit-result")
async def submit_result(body: SubmitResultBody):
    """Record the outcome of a job reported by the agent it was queued for.

    Args:
        body: Agent credentials, the job id, the final status and optional
            message/artifacts.

    Returns:
        dict: ``{"ok": True}`` once the result has been stored.

    Raises:
        HTTPException: 401 when the agent is unknown or the token is wrong,
            404 when the job id is unknown, 403 when the job was queued for
            a different agent.
    """
    # The body carries credentials; verify them so one client cannot
    # overwrite the result of someone else's job.
    agent = agents.get(body.agent_id)
    if agent is None or not secrets.compare_digest(
        agent["token"].encode(), body.token.encode()
    ):
        raise HTTPException(status_code=401, detail="bad agent/token")

    job = jobs.get(body.job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="no job")
    if job["agent_id"] != body.agent_id:
        raise HTTPException(status_code=403, detail="job belongs to another agent")

    stamp = now()
    job["status"] = body.status
    job["updated_at"] = stamp
    results[body.job_id] = {
        "status": body.status,
        "message": body.message,
        "artifacts": body.artifacts,
        "submitted_at": stamp,
    }
    return {"ok": True}
@app.post("/skill/full-flow")
async def skill_full_flow(body: FullFlowReq):
    """Build the full-flow plan and queue it as a job for one agent.

    Args:
        body: Target agent, the values handed to ``build_full_flow_plan``,
            and the optional admin secret.

    Returns:
        dict: ``{"job": job}`` with the newly queued job record.

    Raises:
        HTTPException: 401 when ``ADMIN_SECRET`` is configured and the
            supplied secret does not match, 404 when the agent id is not
            registered.
    """
    # NOTE: with ADMIN_SECRET unset the check is skipped entirely; that
    # existing behaviour is kept. The comparison itself is constant-time.
    if ADMIN_SECRET and not secrets.compare_digest(
        (body.admin_secret or "").encode(), ADMIN_SECRET.encode()
    ):
        raise HTTPException(status_code=401, detail="bad secret")
    # Only registered agents can poll /next-job, so a job queued for any
    # other id would never be picked up.
    if body.agent_id not in agents:
        raise HTTPException(status_code=404, detail="unknown agent")
    steps = build_full_flow_plan(body.email, body.password, body.project_id)
    job = push_job(body.agent_id, {"type": "plan", "steps": steps})
    return {"job": job}
@app.post("/skill/open-url")
async def skill_open_url(body: OpenURLReq):
    """Build the open-URL plan and queue it as a job for one agent.

    Args:
        body: Target agent, the URL handed to ``build_open_url_plan``, and
            the optional admin secret.

    Returns:
        dict: ``{"job": job}`` with the newly queued job record.

    Raises:
        HTTPException: 401 when ``ADMIN_SECRET`` is configured and the
            supplied secret does not match, 404 when the agent id is not
            registered.
    """
    # NOTE: with ADMIN_SECRET unset the check is skipped entirely; that
    # existing behaviour is kept. The comparison itself is constant-time.
    if ADMIN_SECRET and not secrets.compare_digest(
        (body.admin_secret or "").encode(), ADMIN_SECRET.encode()
    ):
        raise HTTPException(status_code=401, detail="bad secret")
    # Only registered agents can poll /next-job, so a job queued for any
    # other id would never be picked up.
    if body.agent_id not in agents:
        raise HTTPException(status_code=404, detail="unknown agent")
    steps = build_open_url_plan(body.url)
    job = push_job(body.agent_id, {"type": "plan", "steps": steps})
    return {"job": job}
@app.get("/")
async def home():
    # Landing page: a fixed HTML banner showing that the server is up.
    banner = "<h3>SkillFlow Server Running</h3>"
    return HTMLResponse(banner)