Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import HTMLResponse | |
| from pydantic import BaseModel | |
| from typing import Optional, Literal | |
| import asyncio, os, uuid, requests | |
| from datetime import datetime | |
| from skill_adapter import build_full_flow_plan, build_open_url_plan | |
| app = FastAPI() | |
| agents, queues, jobs, results, waiters = {}, {}, {}, {}, {} | |
| ADMIN_SECRET = os.environ.get("ADMIN_SECRET") | |
| def now(): return datetime.utcnow().isoformat()+"Z" | |
| def cond_for(agent_id): | |
| if agent_id not in waiters: waiters[agent_id]=asyncio.Condition() | |
| return waiters[agent_id] | |
| def push_job(agent_id, payload): | |
| job_id = str(uuid.uuid4()) | |
| job = {"id":job_id,"agent_id":agent_id,"payload":payload,"status":"queued","created_at":now(),"updated_at":now()} | |
| jobs[job_id]=job | |
| queues.setdefault(agent_id,[]).append(job) | |
| async def notify(): | |
| async with cond_for(agent_id): cond_for(agent_id).notify_all() | |
| asyncio.create_task(notify()) | |
| return job | |
| class RegisterBody(BaseModel): display_name:str | |
| class RegisterResp(BaseModel): agent_id:str; token:str | |
| class EnqueuePlanBody(BaseModel): agent_id:str; plan:list; admin_secret:Optional[str]=None | |
| class SubmitResultBody(BaseModel): | |
| agent_id:str; token:str; job_id:str | |
| status:Literal["completed","failed"]; message:Optional[str]=None; artifacts:Optional[dict]=None | |
| class FullFlowReq(BaseModel): agent_id:str; email:str; password:str; project_id:str; admin_secret:Optional[str]=None | |
| class OpenURLReq(BaseModel): agent_id:str; url:str; admin_secret:Optional[str]=None | |
| async def register(body:RegisterBody): | |
| agent_id, token = str(uuid.uuid4()), str(uuid.uuid4()) | |
| agents[agent_id]={"agent_id":agent_id,"token":token,"display_name":body.display_name,"registered_at":now(),"last_seen":now()} | |
| queues[agent_id]=[] | |
| return RegisterResp(agent_id=agent_id, token=token) | |
| async def list_agents(): return {"agents":list(agents.values())} | |
| async def next_job(agent_id:str, token:str, wait:int=25): | |
| a=agents.get(agent_id) | |
| if not a or a["token"]!=token: raise HTTPException(status_code=401, detail="bad agent/token") | |
| a["last_seen"]=now(); q=queues.get(agent_id,[]) | |
| if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job} | |
| c=cond_for(agent_id) | |
| try: | |
| async with asyncio.timeout(wait): | |
| async with c: await c.wait() | |
| q=queues.get(agent_id,[]) | |
| if q: job=q.pop(0); job["status"]="taken"; job["updated_at"]=now(); return {"job":job} | |
| except: pass | |
| return {} | |
| async def submit_result(body:SubmitResultBody): | |
| j=jobs.get(body.job_id); | |
| if not j: raise HTTPException(status_code=404, detail="no job") | |
| j["status"]=body.status; j["updated_at"]=now() | |
| results[body.job_id]={"status":body.status,"message":body.message,"artifacts":body.artifacts,"submitted_at":now()} | |
| return {"ok":True} | |
| async def skill_full_flow(body:FullFlowReq): | |
| if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret") | |
| plan=build_full_flow_plan(body.email, body.password, body.project_id) | |
| job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job} | |
| async def skill_open_url(body:OpenURLReq): | |
| if ADMIN_SECRET and body.admin_secret!=ADMIN_SECRET: raise HTTPException(status_code=401, detail="bad secret") | |
| plan=build_open_url_plan(body.url) | |
| job=push_job(body.agent_id, {"type":"plan","steps":plan}); return {"job":job} | |
| async def home(): return HTMLResponse("<h3>SkillFlow Server Running</h3>") | |