Avinash commited on
Commit
1561669
·
1 Parent(s): 96acc99

phase2: add worker job queue endpoints

Browse files
Files changed (1) hide show
  1. backend/api.py +69 -0
backend/api.py CHANGED
@@ -131,3 +131,72 @@ def notify_unknown(payload: Dict[str, Any]):
131
  )
132
 
133
  return {"ok": True}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
  )
132
 
133
  return {"ok": True}
134
+
135
+ # ------------------------------------------------------------------
136
+ # Worker job queue (in-memory, Phase 2)
137
+ # ------------------------------------------------------------------
138
+
139
+ from collections import deque
140
+ from dataclasses import dataclass, asdict
141
+ from typing import Optional
142
+ from uuid import uuid4
143
+ import time
144
+ from fastapi import Header
145
+ from fastapi.responses import JSONResponse
146
+ from pydantic import BaseModel
147
+
148
+ WORKER_TOKEN = os.getenv("WORKER_TOKEN", "")
149
+
150
+ def _require_worker_token(x_worker_token: Optional[str]):
151
+ if not WORKER_TOKEN:
152
+ raise HTTPException(status_code=500, detail="Server missing WORKER_TOKEN")
153
+ if not x_worker_token or x_worker_token != WORKER_TOKEN:
154
+ raise HTTPException(status_code=401, detail="Unauthorized worker")
155
+
156
+ @dataclass
157
+ class Job:
158
+ id: str
159
+ payload: dict
160
+ created_at: float
161
+ status: str = "queued" # queued|running|done|failed
162
+ message: str = ""
163
+
164
+ JOBQ: deque[Job] = deque()
165
+ JOBS: dict[str, Job] = {}
166
+
167
+ class EnqueueReq(BaseModel):
168
+ payload: dict
169
+
170
+ @app.post("/api/jobs/enqueue")
171
+ def enqueue_job(req: EnqueueReq):
172
+ job_id = uuid4().hex
173
+ job = Job(id=job_id, payload=req.payload, created_at=time.time())
174
+ JOBQ.append(job)
175
+ JOBS[job_id] = job
176
+ return {"job_id": job_id, "status": job.status}
177
+
178
+ @app.get("/api/jobs/next")
179
+ def jobs_next(x_worker_token: Optional[str] = Header(default=None)):
180
+ _require_worker_token(x_worker_token)
181
+ while JOBQ:
182
+ job = JOBQ.popleft()
183
+ if job.status == "queued":
184
+ job.status = "running"
185
+ return asdict(job)
186
+ return JSONResponse(status_code=204, content=None)
187
+
188
+ class StatusReq(BaseModel):
189
+ status: str # running|done|failed
190
+ message: str = ""
191
+
192
+ @app.post("/api/jobs/{job_id}/status")
193
+ def jobs_status(job_id: str, req: StatusReq, x_worker_token: Optional[str] = Header(default=None)):
194
+ _require_worker_token(x_worker_token)
195
+ job = JOBS.get(job_id)
196
+ if not job:
197
+ raise HTTPException(status_code=404, detail="Unknown job_id")
198
+ if req.status not in ("running", "done", "failed"):
199
+ raise HTTPException(status_code=400, detail="Invalid status")
200
+ job.status = req.status
201
+ job.message = req.message
202
+ return {"ok": True, "job_id": job_id, "status": job.status}