WitNote / internal /scheduler /queue_test.go
AUXteam's picture
Upload folder using huggingface_hub
6a7089a verified
package scheduler
import (
"testing"
"time"
)
func TestQueueEnqueueDequeue(t *testing.T) {
q := NewTaskQueue(10, 5)
t1 := &Task{ID: "t1", AgentID: "a1", Priority: 5, CreatedAt: time.Now()}
t2 := &Task{ID: "t2", AgentID: "a1", Priority: 1, CreatedAt: time.Now()}
if _, err := q.Enqueue(t1); err != nil {
t.Fatal(err)
}
if _, err := q.Enqueue(t2); err != nil {
t.Fatal(err)
}
// t2 has higher priority (lower number), should come first.
got := q.Dequeue(10, 20)
if got == nil {
t.Fatal("expected a task")
return
}
if got.ID != "t2" {
t.Errorf("expected t2 (priority 1), got %s (priority %d)", got.ID, got.Priority)
}
got2 := q.Dequeue(10, 20)
if got2 == nil || got2.ID != "t1" {
t.Error("expected t1 next")
}
}
func TestQueueGlobalLimit(t *testing.T) {
q := NewTaskQueue(2, 10)
if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
_, err := q.Enqueue(&Task{ID: "t3", AgentID: "a1", CreatedAt: time.Now()})
if err == nil {
t.Error("should reject when global limit reached")
}
}
func TestQueuePerAgentLimit(t *testing.T) {
q := NewTaskQueue(100, 2)
if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
_, err := q.Enqueue(&Task{ID: "t3", AgentID: "a1", CreatedAt: time.Now()})
if err == nil {
t.Error("should reject when per-agent limit reached")
}
// Different agent should still work.
if _, err := q.Enqueue(&Task{ID: "t4", AgentID: "a2", CreatedAt: time.Now()}); err != nil {
t.Errorf("different agent should succeed: %v", err)
}
}
func TestQueueFairness(t *testing.T) {
q := NewTaskQueue(100, 100)
// Agent a1 has 3 tasks, a2 has 1 task.
for i := range 3 {
if _, err := q.Enqueue(&Task{ID: "a1-" + string(rune('0'+i)), AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
}
if _, err := q.Enqueue(&Task{ID: "a2-0", AgentID: "a2", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
// First dequeue: both have 0 inflight, either could be picked.
got1 := q.Dequeue(10, 20)
if got1 == nil {
t.Fatal("expected a task")
return
}
// Now one agent has 1 inflight. The agent with 0 should be preferred.
got2 := q.Dequeue(10, 20)
if got2 == nil {
t.Fatal("expected a task")
return
}
// The two dequeued tasks should be from different agents (fairness).
if got1.AgentID == got2.AgentID {
t.Errorf("fairness: expected different agents, got %s and %s", got1.AgentID, got2.AgentID)
}
}
func TestQueueInflightLimit(t *testing.T) {
q := NewTaskQueue(100, 100)
if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
// Dequeue with max inflight = 1
got := q.Dequeue(1, 2)
if got == nil {
t.Fatal("first dequeue should work")
}
// Second dequeue should be blocked (per-agent limit).
got2 := q.Dequeue(1, 2)
if got2 != nil {
t.Error("should be blocked by per-agent inflight limit")
}
// Complete the first, now second should work.
q.Complete("a1")
got3 := q.Dequeue(1, 2)
if got3 == nil {
t.Error("should dequeue after completing")
}
}
func TestQueueRemove(t *testing.T) {
q := NewTaskQueue(10, 10)
if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if !q.Remove("t1", "a1") {
t.Error("should find and remove t1")
}
if q.Remove("t1", "a1") {
t.Error("removing again should return false")
}
// Only t2 should remain.
got := q.Dequeue(10, 20)
if got == nil || got.ID != "t2" {
t.Error("expected t2 remaining")
}
}
func TestQueueExpireDeadlined(t *testing.T) {
old := timeNow
defer func() { timeNow = old }()
now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
timeNow = func() time.Time { return now }
q := NewTaskQueue(10, 10)
pastDeadline := &Task{
ID: "expired", AgentID: "a1", CreatedAt: now,
Deadline: now.Add(-1 * time.Second),
}
futureDeadline := &Task{
ID: "valid", AgentID: "a1", CreatedAt: now,
Deadline: now.Add(1 * time.Hour),
}
if _, err := q.Enqueue(pastDeadline); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if _, err := q.Enqueue(futureDeadline); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
expired := q.ExpireDeadlined()
if len(expired) != 1 || expired[0].ID != "expired" {
t.Errorf("expected 1 expired task, got %d", len(expired))
}
stats := q.Stats()
if stats.TotalQueued != 1 {
t.Errorf("expected 1 remaining, got %d", stats.TotalQueued)
}
}
func TestQueueStats(t *testing.T) {
q := NewTaskQueue(100, 100)
if _, err := q.Enqueue(&Task{ID: "t1", AgentID: "a1", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
if _, err := q.Enqueue(&Task{ID: "t2", AgentID: "a2", CreatedAt: time.Now()}); err != nil {
t.Fatalf("enqueue failed: %v", err)
}
q.Dequeue(10, 20) // a1 or a2 gets 1 inflight
stats := q.Stats()
if stats.TotalQueued != 1 {
t.Errorf("expected 1 queued, got %d", stats.TotalQueued)
}
if stats.TotalInflight != 1 {
t.Errorf("expected 1 inflight, got %d", stats.TotalInflight)
}
if len(stats.Agents) != 2 {
t.Errorf("expected 2 agents, got %d", len(stats.Agents))
}
}