| package scheduler |
|
|
| import ( |
| "encoding/json" |
| "errors" |
| "log/slog" |
| "net/http" |
|
|
| "github.com/pinchtab/pinchtab/internal/web" |
| ) |
|
|
// Validation errors that handleBatch reports to the client with a 400 status.
var (
	// errMissingAgentID signals a batch request whose agentId is empty.
	errMissingAgentID = errors.New("missing required field 'agentId'")

	// errEmptyBatch signals a batch request that carries no tasks.
	errEmptyBatch = errors.New("batch must contain at least one task")
)
|
|
| |
| type BatchRequest struct { |
| AgentID string `json:"agentId"` |
| CallbackURL string `json:"callbackUrl,omitempty"` |
| Tasks []BatchTaskDef `json:"tasks"` |
| } |
|
|
| |
// BatchTaskDef is one entry of BatchRequest.Tasks. handleBatch copies every
// field verbatim into the SubmitRequest it builds for that task; no field is
// validated or interpreted here.
type BatchTaskDef struct {
	// Action is passed through as the submitted task's action.
	Action string `json:"action"`

	// TabID is optional and passed through unchanged.
	TabID string `json:"tabId,omitempty"`

	// Ref is optional and passed through unchanged.
	Ref string `json:"ref,omitempty"`

	// Params holds optional, free-form arguments decoded from a JSON object.
	Params map[string]any `json:"params,omitempty"`

	// Priority is optional and passed through unchanged.
	// NOTE(review): ordering semantics are defined by Submit — confirm there.
	Priority int `json:"priority,omitempty"`

	// Deadline is optional and carried as a string.
	// NOTE(review): the expected format is not visible here — confirm against Submit.
	Deadline string `json:"deadline,omitempty"`
}
|
|
| |
| type BatchResponseItem struct { |
| TaskID string `json:"taskId"` |
| State TaskState `json:"state"` |
| Position int `json:"position,omitempty"` |
| Error string `json:"error,omitempty"` |
| } |
|
|
| func (s *Scheduler) handleBatch(w http.ResponseWriter, r *http.Request) { |
| var req BatchRequest |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { |
| web.Error(w, 400, err) |
| return |
| } |
|
|
| if req.AgentID == "" { |
| web.Error(w, 400, errMissingAgentID) |
| return |
| } |
| if len(req.Tasks) == 0 { |
| web.Error(w, 400, errEmptyBatch) |
| return |
| } |
| s.cfgMu.RLock() |
| maxBatch := s.cfg.MaxBatchSize |
| s.cfgMu.RUnlock() |
| if len(req.Tasks) > maxBatch { |
| web.ErrorCode(w, 400, "batch_too_large", "batch exceeds maximum size", false, map[string]any{ |
| "submitted": len(req.Tasks), |
| "max": maxBatch, |
| }) |
| return |
| } |
|
|
| results := make([]BatchResponseItem, 0, len(req.Tasks)) |
| for _, td := range req.Tasks { |
| sr := SubmitRequest{ |
| AgentID: req.AgentID, |
| Action: td.Action, |
| TabID: td.TabID, |
| Ref: td.Ref, |
| Params: td.Params, |
| Priority: td.Priority, |
| Deadline: td.Deadline, |
| CallbackURL: req.CallbackURL, |
| } |
|
|
| task, err := s.Submit(sr) |
| if err != nil { |
| item := BatchResponseItem{State: StateRejected, Error: err.Error()} |
| if task != nil { |
| item.TaskID = task.ID |
| } |
| results = append(results, item) |
| slog.Warn("batch: task rejected", "agent", req.AgentID, "action", td.Action, "err", err) |
| continue |
| } |
|
|
| results = append(results, BatchResponseItem{ |
| TaskID: task.ID, |
| State: task.GetState(), |
| Position: task.Position, |
| }) |
| } |
|
|
| web.JSON(w, 202, map[string]any{ |
| "tasks": results, |
| "submitted": len(results), |
| }) |
| } |
|
|