from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel from typing import Optional, Literal import asyncio, os, uuid, requests from datetime import datetime from skill_adapter import build_full_flow_plan, build_open_url_plan app = FastAPI() agents, queues, jobs, results, waiters = {}, {}, {}, {}, {} ADMIN_SECRET = os.environ.get("ADMIN_SECRET") def now(): return datetime.utcnow().isoformat()+"Z" def cond_for(agent_id): if agent_id not in waiters: waiters[agent_id]=asyncio.Condition() return waiters[agent_id] def push_job(agent_id, payload): job_id = str(uuid.uuid4()) job = {"id":job_id,"agent_id":agent_id,"payload":payload,"status":"queued","created_at":now(),"updated_at":now()} jobs[job_id]=job queues.setdefault(agent_id,[]).append(job) async def notify(): async with cond_for(agent_id): cond_for(agent_id).notify_all() asyncio.create_task(notify()) return job class RegisterBody(BaseModel): display_name:str class RegisterResp(BaseModel): agent_id:str; token:str class EnqueuePlanBody(BaseModel): agent_id:str; plan:list; admin_secret:Optional[str]=None class SubmitResultBody(BaseModel): agent_id:str; token:str; job_id:str status:Literal["completed","failed"]; message:Optional[str]=None; artifacts:Optional[dict]=None class FullFlowReq(BaseModel): agent_id:str; email:str; password:str; project_id:str; admin_secret:Optional[str]=None class OpenURLReq(BaseModel): agent_id:str; url:str; admin_secret:Optional[str]=None @app.post("/register", response_model=RegisterResp) async def register(body:RegisterBody): agent_id, token = str(uuid.uuid4()), str(uuid.uuid4()) agents[agent_id]={"agent_id":agent_id,"token":token,"display_name":body.display_name,"registered_at":now(),"last_seen":now()} queues[agent_id]=[] return RegisterResp(agent_id=agent_id, token=token) @app.get("/agents") async def list_agents(): return {"agents":list(agents.values())} @app.get("/next-job") async def next_job(agent_id:str, token:str, wait:int=25): a=agents.get(agent_id) if not a or a["token"]!=token: raise HTTPException(status_code=401, detail="bad agent/token") a["last_seen"]=now(); q=queues.get(agent_id,[]) if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job} c=cond_for(agent_id) try: async with asyncio.timeout(wait): async with c: await c.wait() q=queues.get(agent_id,[]) if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job} except: pass return {} @app.post("/submit-result") async def submit_result(body:SubmitResultBody): j=jobs.get(body.job_id); if not j: raise HTTPException(status_code=404, detail="no job") j["status"]=body.status; j["updated_at"]=now() results[body.job_id]={"status":body.status,"message":body.message,"artifacts":body.artifacts,"submitted_at":now()} return {"ok":True} @app.post("/skill/full-flow") async def skill_full_flow(body:FullFlowReq): if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret") plan=build_full_flow_plan(body.email, body.password, body.project_id) job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job} @app.post("/skill/open-url") async def skill_open_url(body:OpenURLReq): if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret") plan=build_open_url_plan(body.url) job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job} @app.get("/") async def home(): return HTMLResponse("