File size: 3,752 Bytes
495efa0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import Optional, Literal
import asyncio, os, uuid, requests
from datetime import datetime
from skill_adapter import build_full_flow_plan, build_open_url_plan

app = FastAPI()

agents, queues, jobs, results, waiters = {}, {}, {}, {}, {}
ADMIN_SECRET = os.environ.get("ADMIN_SECRET")

def now(): return datetime.utcnow().isoformat()+"Z"

def cond_for(agent_id): 
    if agent_id not in waiters: waiters[agent_id]=asyncio.Condition()
    return waiters[agent_id]

def push_job(agent_id, payload):
    job_id = str(uuid.uuid4())
    job = {"id":job_id,"agent_id":agent_id,"payload":payload,"status":"queued","created_at":now(),"updated_at":now()}
    jobs[job_id]=job
    queues.setdefault(agent_id,[]).append(job)
    async def notify(): 
        async with cond_for(agent_id): cond_for(agent_id).notify_all()
    asyncio.create_task(notify())
    return job

class RegisterBody(BaseModel): display_name:str
class RegisterResp(BaseModel): agent_id:str; token:str
class EnqueuePlanBody(BaseModel): agent_id:str; plan:list; admin_secret:Optional[str]=None
class SubmitResultBody(BaseModel):
    agent_id:str; token:str; job_id:str
    status:Literal["completed","failed"]; message:Optional[str]=None; artifacts:Optional[dict]=None
class FullFlowReq(BaseModel): agent_id:str; email:str; password:str; project_id:str; admin_secret:Optional[str]=None
class OpenURLReq(BaseModel): agent_id:str; url:str; admin_secret:Optional[str]=None

@app.post("/register", response_model=RegisterResp)
async def register(body:RegisterBody):
    agent_id, token = str(uuid.uuid4()), str(uuid.uuid4())
    agents[agent_id]={"agent_id":agent_id,"token":token,"display_name":body.display_name,"registered_at":now(),"last_seen":now()}
    queues[agent_id]=[]
    return RegisterResp(agent_id=agent_id, token=token)

@app.get("/agents") 
async def list_agents(): return {"agents":list(agents.values())}

@app.get("/next-job")
async def next_job(agent_id:str, token:str, wait:int=25):
    a=agents.get(agent_id)
    if not a or a["token"]!=token: raise HTTPException(status_code=401, detail="bad agent/token")
    a["last_seen"]=now(); q=queues.get(agent_id,[])
    if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job}
    c=cond_for(agent_id)
    try:
        async with asyncio.timeout(wait):
            async with c: await c.wait()
            q=queues.get(agent_id,[])
            if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job}
    except: pass
    return {}

@app.post("/submit-result")
async def submit_result(body:SubmitResultBody):
    j=jobs.get(body.job_id); 
    if not j: raise HTTPException(status_code=404, detail="no job")
    j["status"]=body.status; j["updated_at"]=now()
    results[body.job_id]={"status":body.status,"message":body.message,"artifacts":body.artifacts,"submitted_at":now()}
    return {"ok":True}

@app.post("/skill/full-flow")
async def skill_full_flow(body:FullFlowReq):
    if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret")
    plan=build_full_flow_plan(body.email, body.password, body.project_id)
    job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job}

@app.post("/skill/open-url")
async def skill_open_url(body:OpenURLReq):
    if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret")
    plan=build_open_url_plan(body.url)
    job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job}

@app.get("/")
async def home(): return HTMLResponse("<h3>SkillFlow Server Running</h3>")