| package scheduler |
|
|
| import ( |
| "testing" |
| "time" |
| ) |
|
|
| func TestQueueEnqueueDequeue(t *testing.T) { |
| q := NewTaskQueue(10, 5) |
|
|
| t1 := &Task{ID: "t1", AgentID: "a1", Priority: 5, CreatedAt: time.Now()} |
| t2 := &Task{ID: "t2", AgentID: "a1", Priority: 1, CreatedAt: time.Now()} |
|
|
| if _, err := q.Enqueue(t1); err != nil { |
| t.Fatal(err) |
| } |
| if _, err := q.Enqueue(t2); err != nil { |
| t.Fatal(err) |
| } |
|
|
| |
| got := q.Dequeue(10, 20) |
| if got == nil { |
| t.Fatal("expected a task") |
| return |
| } |
| if got.ID != "t2" { |
| t.Errorf("expected t2 (priority 1), got %s (priority %d)", got.ID, got.Priority) |
| } |
|
|
| got2 := q.Dequeue(10, 20) |
| if got2 == nil || got2.ID != "t1" { |
| t.Error("expected t1 next") |
| } |
| } |
|
|
| func TestQueueGlobalLimit(t *testing.T) { |
| q := NewTaskQueue(2, 10) |
|
|
| if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| _, err := q.Enqueue(&Task{ID: "t3", AgentID: "a1", CreatedAt: time.Now()}) |
| if err == nil { |
| t.Error("should reject when global limit reached") |
| } |
| } |
|
|
| func TestQueuePerAgentLimit(t *testing.T) { |
| q := NewTaskQueue(100, 2) |
|
|
| if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| _, err := q.Enqueue(&Task{ID: "t3", AgentID: "a1", CreatedAt: time.Now()}) |
| if err == nil { |
| t.Error("should reject when per-agent limit reached") |
| } |
|
|
| |
| if _, err := q.Enqueue(&Task{ID: "t4", AgentID: "a2", CreatedAt: time.Now()}); err != nil { |
| t.Errorf("different agent should succeed: %v", err) |
| } |
| } |
|
|
| func TestQueueFairness(t *testing.T) { |
| q := NewTaskQueue(100, 100) |
|
|
| |
| for i := range 3 { |
| if _, err := q.Enqueue(&Task{ID: "a1-" + string(rune('0'+i)), AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| } |
| if _, err := q.Enqueue(&Task{ID: "a2-0", AgentID: "a2", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
|
|
| |
| got1 := q.Dequeue(10, 20) |
| if got1 == nil { |
| t.Fatal("expected a task") |
| return |
| } |
|
|
| |
| got2 := q.Dequeue(10, 20) |
| if got2 == nil { |
| t.Fatal("expected a task") |
| return |
| } |
|
|
| |
| if got1.AgentID == got2.AgentID { |
| t.Errorf("fairness: expected different agents, got %s and %s", got1.AgentID, got2.AgentID) |
| } |
| } |
|
|
| func TestQueueInflightLimit(t *testing.T) { |
| q := NewTaskQueue(100, 100) |
|
|
| if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
|
|
| |
| got := q.Dequeue(1, 2) |
| if got == nil { |
| t.Fatal("first dequeue should work") |
| } |
|
|
| |
| got2 := q.Dequeue(1, 2) |
| if got2 != nil { |
| t.Error("should be blocked by per-agent inflight limit") |
| } |
|
|
| |
| q.Complete("a1") |
| got3 := q.Dequeue(1, 2) |
| if got3 == nil { |
| t.Error("should dequeue after completing") |
| } |
| } |
|
|
| func TestQueueRemove(t *testing.T) { |
| q := NewTaskQueue(10, 10) |
| if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
|
|
| if !q.Remove("t1", "a1") { |
| t.Error("should find and remove t1") |
| } |
| if q.Remove("t1", "a1") { |
| t.Error("removing again should return false") |
| } |
|
|
| |
| got := q.Dequeue(10, 20) |
| if got == nil || got.ID != "t2" { |
| t.Error("expected t2 remaining") |
| } |
| } |
|
|
| func TestQueueExpireDeadlined(t *testing.T) { |
| old := timeNow |
| defer func() { timeNow = old }() |
|
|
| now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) |
| timeNow = func() time.Time { return now } |
|
|
| q := NewTaskQueue(10, 10) |
|
|
| pastDeadline := &Task{ |
| ID: "expired", AgentID: "a1", CreatedAt: now, |
| Deadline: now.Add(-1 * time.Second), |
| } |
| futureDeadline := &Task{ |
| ID: "valid", AgentID: "a1", CreatedAt: now, |
| Deadline: now.Add(1 * time.Hour), |
| } |
| if _, err := q.Enqueue(pastDeadline); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| if _, err := q.Enqueue(futureDeadline); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
|
|
| expired := q.ExpireDeadlined() |
| if len(expired) != 1 || expired[0].ID != "expired" { |
| t.Errorf("expected 1 expired task, got %d", len(expired)) |
| } |
|
|
| stats := q.Stats() |
| if stats.TotalQueued != 1 { |
| t.Errorf("expected 1 remaining, got %d", stats.TotalQueued) |
| } |
| } |
|
|
| func TestQueueStats(t *testing.T) { |
| q := NewTaskQueue(100, 100) |
| if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a2", CreatedAt: time.Now()}); err != nil { |
| t.Fatalf("enqueue failed: %v", err) |
| } |
| q.Dequeue(10, 20) |
|
|
| stats := q.Stats() |
| if stats.TotalQueued != 1 { |
| t.Errorf("expected 1 queued, got %d", stats.TotalQueued) |
| } |
| if stats.TotalInflight != 1 { |
| t.Errorf("expected 1 inflight, got %d", stats.TotalInflight) |
| } |
| if len(stats.Agents) != 2 { |
| t.Errorf("expected 2 agents, got %d", len(stats.Agents)) |
| } |
| } |
|
|