| package scheduler |
|
|
| import ( |
| "container/heap" |
| "fmt" |
| "sync" |
| ) |
|
|
| |
| type TaskQueue struct { |
| mu sync.Mutex |
| agents map[string]*agentQueue |
| totalCount int |
| maxTotal int |
| maxPerAgent int |
| } |
|
|
| type agentQueue struct { |
| tasks taskHeap |
| inflight int |
| } |
|
|
| |
| func NewTaskQueue(maxTotal, maxPerAgent int) *TaskQueue { |
| return &TaskQueue{ |
| agents: make(map[string]*agentQueue), |
| maxTotal: maxTotal, |
| maxPerAgent: maxPerAgent, |
| } |
| } |
|
|
| |
| func (q *TaskQueue) SetLimits(maxTotal, maxPerAgent int) { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
| if maxTotal > 0 { |
| q.maxTotal = maxTotal |
| } |
| if maxPerAgent > 0 { |
| q.maxPerAgent = maxPerAgent |
| } |
| } |
|
|
| |
| func (q *TaskQueue) Enqueue(t *Task) (int, error) { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
|
|
| if q.totalCount >= q.maxTotal { |
| return 0, fmt.Errorf("global queue full (%d/%d)", q.totalCount, q.maxTotal) |
| } |
|
|
| aq, ok := q.agents[t.AgentID] |
| if !ok { |
| aq = &agentQueue{} |
| heap.Init(&aq.tasks) |
| q.agents[t.AgentID] = aq |
| } |
|
|
| if aq.tasks.Len() >= q.maxPerAgent { |
| return 0, fmt.Errorf("agent queue full for %q (%d/%d)", t.AgentID, aq.tasks.Len(), q.maxPerAgent) |
| } |
|
|
| heap.Push(&aq.tasks, t) |
| q.totalCount++ |
| return q.totalCount, nil |
| } |
|
|
| |
| |
| |
| func (q *TaskQueue) Dequeue(maxPerAgentInflight, maxGlobalInflight int) *Task { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
|
|
| globalInflight := 0 |
| for _, aq := range q.agents { |
| globalInflight += aq.inflight |
| } |
| if globalInflight >= maxGlobalInflight { |
| return nil |
| } |
|
|
| var bestAgent string |
| bestInflight := int(^uint(0) >> 1) |
|
|
| for agentID, aq := range q.agents { |
| if aq.tasks.Len() == 0 { |
| continue |
| } |
| if aq.inflight >= maxPerAgentInflight { |
| continue |
| } |
| if aq.inflight < bestInflight { |
| bestInflight = aq.inflight |
| bestAgent = agentID |
| } |
| } |
|
|
| if bestAgent == "" { |
| return nil |
| } |
|
|
| aq := q.agents[bestAgent] |
| t := heap.Pop(&aq.tasks).(*Task) |
| aq.inflight++ |
| q.totalCount-- |
| return t |
| } |
|
|
| |
| func (q *TaskQueue) Complete(agentID string) { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
| if aq, ok := q.agents[agentID]; ok { |
| if aq.inflight > 0 { |
| aq.inflight-- |
| } |
| if aq.inflight == 0 && aq.tasks.Len() == 0 { |
| delete(q.agents, agentID) |
| } |
| } |
| } |
|
|
| |
| |
| func (q *TaskQueue) Remove(taskID, agentID string) bool { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
|
|
| aq, ok := q.agents[agentID] |
| if !ok { |
| return false |
| } |
|
|
| for i, t := range aq.tasks { |
| if t.ID == taskID { |
| heap.Remove(&aq.tasks, i) |
| q.totalCount-- |
| if aq.tasks.Len() == 0 && aq.inflight == 0 { |
| delete(q.agents, agentID) |
| } |
| return true |
| } |
| } |
| return false |
| } |
|
|
| |
| |
| func (q *TaskQueue) ExpireDeadlined() []*Task { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
|
|
| var expired []*Task |
| for agentID, aq := range q.agents { |
| var remaining taskHeap |
| for _, t := range aq.tasks { |
| if !t.Deadline.IsZero() && t.Deadline.Before(timeNow()) { |
| expired = append(expired, t) |
| q.totalCount-- |
| } else { |
| remaining = append(remaining, t) |
| } |
| } |
| if len(remaining) != len(aq.tasks) { |
| aq.tasks = remaining |
| heap.Init(&aq.tasks) |
| } |
| if aq.tasks.Len() == 0 && aq.inflight == 0 { |
| delete(q.agents, agentID) |
| } |
| } |
| return expired |
| } |
|
|
| |
| func (q *TaskQueue) Stats() QueueStats { |
| q.mu.Lock() |
| defer q.mu.Unlock() |
|
|
| s := QueueStats{ |
| TotalQueued: q.totalCount, |
| Agents: make(map[string]AgentStats, len(q.agents)), |
| } |
| for agentID, aq := range q.agents { |
| s.TotalInflight += aq.inflight |
| s.Agents[agentID] = AgentStats{ |
| Queued: aq.tasks.Len(), |
| Inflight: aq.inflight, |
| } |
| } |
| return s |
| } |
|
|
| |
| type QueueStats struct { |
| TotalQueued int `json:"totalQueued"` |
| TotalInflight int `json:"totalInflight"` |
| Agents map[string]AgentStats `json:"agents"` |
| } |
|
|
| |
| type AgentStats struct { |
| Queued int `json:"queued"` |
| Inflight int `json:"inflight"` |
| } |
|
|
| |
|
|
| |
| type taskHeap []*Task |
|
|
| func (h taskHeap) Len() int { return len(h) } |
|
|
| func (h taskHeap) Less(i, j int) bool { |
| if h[i].Priority != h[j].Priority { |
| return h[i].Priority < h[j].Priority |
| } |
| return h[i].CreatedAt.Before(h[j].CreatedAt) |
| } |
|
|
| func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } |
|
|
| func (h *taskHeap) Push(x any) { |
| *h = append(*h, x.(*Task)) |
| } |
|
|
| func (h *taskHeap) Pop() any { |
| old := *h |
| n := len(old) |
| t := old[n-1] |
| old[n-1] = nil |
| *h = old[:n-1] |
| return t |
| } |
|
|