|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.responses import HTMLResponse |
|
|
from pydantic import BaseModel |
|
|
from typing import Optional, Literal |
|
|
import asyncio, os, uuid, requests |
|
|
from datetime import datetime |
|
|
from skill_adapter import build_full_flow_plan, build_open_url_plan |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
agents, queues, jobs, results, waiters = {}, {}, {}, {}, {} |
|
|
ADMIN_SECRET = os.environ.get("ADMIN_SECRET") |
|
|
|
|
|
def now(): return datetime.utcnow().isoformat()+"Z" |
|
|
|
|
|
def cond_for(agent_id): |
|
|
if agent_id not in waiters: waiters[agent_id]=asyncio.Condition() |
|
|
return waiters[agent_id] |
|
|
|
|
|
def push_job(agent_id, payload): |
|
|
job_id = str(uuid.uuid4()) |
|
|
job = {"id":job_id,"agent_id":agent_id,"payload":payload,"status":"queued","created_at":now(),"updated_at":now()} |
|
|
jobs[job_id]=job |
|
|
queues.setdefault(agent_id,[]).append(job) |
|
|
async def notify(): |
|
|
async with cond_for(agent_id): cond_for(agent_id).notify_all() |
|
|
asyncio.create_task(notify()) |
|
|
return job |
|
|
|
|
|
class RegisterBody(BaseModel): display_name:str |
|
|
class RegisterResp(BaseModel): agent_id:str; token:str |
|
|
class EnqueuePlanBody(BaseModel): agent_id:str; plan:list; admin_secret:Optional[str]=None |
|
|
class SubmitResultBody(BaseModel): |
|
|
agent_id:str; token:str; job_id:str |
|
|
status:Literal["completed","failed"]; message:Optional[str]=None; artifacts:Optional[dict]=None |
|
|
class FullFlowReq(BaseModel): agent_id:str; email:str; password:str; project_id:str; admin_secret:Optional[str]=None |
|
|
class OpenURLReq(BaseModel): agent_id:str; url:str; admin_secret:Optional[str]=None |
|
|
|
|
|
@app.post("/register", response_model=RegisterResp) |
|
|
async def register(body:RegisterBody): |
|
|
agent_id, token = str(uuid.uuid4()), str(uuid.uuid4()) |
|
|
agents[agent_id]={"agent_id":agent_id,"token":token,"display_name":body.display_name,"registered_at":now(),"last_seen":now()} |
|
|
queues[agent_id]=[] |
|
|
return RegisterResp(agent_id=agent_id, token=token) |
|
|
|
|
|
@app.get("/agents") |
|
|
async def list_agents(): return {"agents":list(agents.values())} |
|
|
|
|
|
@app.get("/next-job") |
|
|
async def next_job(agent_id:str, token:str, wait:int=25): |
|
|
a=agents.get(agent_id) |
|
|
if not a or a["token"]!=token: raise HTTPException(status_code=401, detail="bad agent/token") |
|
|
a["last_seen"]=now(); q=queues.get(agent_id,[]) |
|
|
if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job} |
|
|
c=cond_for(agent_id) |
|
|
try: |
|
|
async with asyncio.timeout(wait): |
|
|
async with c: await c.wait() |
|
|
q=queues.get(agent_id,[]) |
|
|
if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job} |
|
|
except: pass |
|
|
return {} |
|
|
|
|
|
@app.post("/submit-result") |
|
|
async def submit_result(body:SubmitResultBody): |
|
|
j=jobs.get(body.job_id); |
|
|
if not j: raise HTTPException(status_code=404, detail="no job") |
|
|
j["status"]=body.status; j["updated_at"]=now() |
|
|
results[body.job_id]={"status":body.status,"message":body.message,"artifacts":body.artifacts,"submitted_at":now()} |
|
|
return {"ok":True} |
|
|
|
|
|
@app.post("/skill/full-flow") |
|
|
async def skill_full_flow(body:FullFlowReq): |
|
|
if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret") |
|
|
plan=build_full_flow_plan(body.email, body.password, body.project_id) |
|
|
job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job} |
|
|
|
|
|
@app.post("/skill/open-url") |
|
|
async def skill_open_url(body:OpenURLReq): |
|
|
if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret") |
|
|
plan=build_open_url_plan(body.url) |
|
|
job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job} |
|
|
|
|
|
@app.get("/") |
|
|
async def home(): return HTMLResponse("<h3>SkillFlow Server Running</h3>") |
|
|
|