// Copyright 2023 Woodpecker Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package queue import ( "context" "errors" "fmt" "sync" "testing" "time" "github.com/stretchr/testify/assert" "go.woodpecker-ci.org/woodpecker/v3/server/model" ) var ( filterFnTrue = func(*model.Task) (bool, int) { return true, 1 } genDummyTask = func() *model.Task { return &model.Task{ ID: "1", Data: []byte("{}"), } } waitForProcess = func() { time.Sleep(processTimeInterval + 50*time.Millisecond) } ) func setupTestQueue(t *testing.T) (context.Context, context.CancelCauseFunc, *fifo) { ctx, cancel := context.WithCancelCause(t.Context()) t.Cleanup(func() { cancel(nil) }) q, _ := NewMemoryQueue(ctx).(*fifo) if q == nil { t.Fatal("Failed to create queue") } return ctx, cancel, q } func TestFifoBasicOperations(t *testing.T) { ctx, cancel, q := setupTestQueue(t) defer cancel(nil) t.Run("push poll done lifecycle", func(t *testing.T) { dummyTask := genDummyTask() assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() info := q.Info(ctx) assert.Len(t, info.Pending, 1) got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, dummyTask, got) waitForProcess() info = q.Info(ctx) assert.Len(t, info.Pending, 0) assert.Len(t, info.Running, 1) // Edge case: verify task can't be polled again while running pollCtx, pollCancel := context.WithTimeout(ctx, 100*time.Millisecond) _, err = q.Poll(pollCtx, 2, filterFnTrue) pollCancel() assert.Error(t, err) // Should timeout/cancel, not return the same task assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) waitForProcess() info = q.Info(ctx) assert.Len(t, info.Running, 0) // Edge case: Done on already completed task should handle gracefully err = q.Done(ctx, got.ID, model.StatusSuccess) // Document current behavior - should either error or be idempotent if err != nil { assert.Error(t, err) } }) t.Run("error handling", func(t *testing.T) { task1 := &model.Task{ID: "task-error-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1})) waitForProcess() got, _ := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("test error"))) waitForProcess() info := q.Info(ctx) assert.Len(t, info.Running, 0) assert.Error(t, q.Error(ctx, "totally-fake-id", fmt.Errorf("test error"))) // Edge case: Error on task that's already errored err := q.Error(ctx, got.ID, fmt.Errorf("double error")) // Should either error or be idempotent if err != nil { assert.Error(t, err) } }) t.Run("external error filtered by Wait", func(t *testing.T) { // Test that external errors (from Error/ErrorAtOnce) are wrapped as ErrExternal // and filtered out by Wait(), while internal errors like context cancellation // are passed through // Test 1: External error is filtered by Wait task1 := &model.Task{ID: "wait-external-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1})) waitForProcess() got1, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) // Start waiting on the task waitDone := make(chan error, 1) go func() { waitDone <- q.Wait(ctx, got1.ID) }() time.Sleep(10 * time.Millisecond) // Report an external error (agent reported error) externalErr := fmt.Errorf("agent reported error") assert.NoError(t, q.Error(ctx, got1.ID, externalErr)) // Wait should return nil (external error filtered out) select { case err := <-waitDone: assert.NoError(t, err, "Wait should filter ErrExternal and return nil") case <-time.After(time.Second): t.Fatal("Wait should have returned") } // Test 2: Internal error (context cancellation) passes through Wait task2 := &model.Task{ID: "wait-internal-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2})) waitForProcess() got2, err := q.Poll(ctx, 2, filterFnTrue) assert.NoError(t, err) waitCtx, waitCancel := context.WithCancelCause(ctx) waitDone2 := make(chan error, 1) go func() { waitDone2 <- q.Wait(waitCtx, got2.ID) }() time.Sleep(10 * time.Millisecond) waitCancel(nil) // Context cancellation should cause Wait to return (internal error handling) select { case err := <-waitDone2: // Wait returns nil when context is canceled (normal behavior) assert.NoError(t, err, "Wait should return nil when context is canceled") case <-time.After(time.Second): t.Fatal("Wait should return when context is canceled") } // Clean up assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) waitForProcess() // Test 3: Multiple waiters all get nil when external error occurs task3 := &model.Task{ID: "wait-multi-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task3})) waitForProcess() got3, err := q.Poll(ctx, 3, filterFnTrue) assert.NoError(t, err) // Start multiple waiters numWaiters := 3 waitResults := make(chan error, numWaiters) for i := 0; i < numWaiters; i++ { go func() { waitResults <- q.Wait(ctx, got3.ID) }() } time.Sleep(10 * time.Millisecond) // Report an external error batchErr := fmt.Errorf("external batch failure") assert.NoError(t, q.ErrorAtOnce(ctx, []string{got3.ID}, batchErr)) // All waiters should return nil (external error filtered) for i := 0; i < numWaiters; i++ { select { case err := <-waitResults: assert.NoError(t, err, "All waiters should get nil when ErrExternal is filtered") case <-time.After(time.Second): t.Fatalf("Waiter %d didn't return in time", i) } } }) t.Run("error at once", func(t *testing.T) { task1 := &model.Task{ID: "batch-1"} task2 := &model.Task{ID: "batch-2"} task3 := &model.Task{ID: "batch-3"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2, task3})) waitForProcess() got1, _ := q.Poll(ctx, 1, filterFnTrue) got2, _ := q.Poll(ctx, 2, filterFnTrue) assert.NoError(t, q.ErrorAtOnce(ctx, []string{got1.ID, got2.ID}, fmt.Errorf("batch error"))) waitForProcess() info := q.Info(ctx) assert.Len(t, info.Running, 0) assert.Len(t, info.Pending, 1) got3, _ := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSuccess)) waitForProcess() task4 := &model.Task{ID: "batch-4"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task4})) waitForProcess() got4, _ := q.Poll(ctx, 1, filterFnTrue) err := q.ErrorAtOnce(ctx, []string{got4.ID, "fake-1", "fake-2"}, fmt.Errorf("test")) assert.Error(t, err) assert.ErrorIs(t, err, ErrNotFound) waitForProcess() info = q.Info(ctx) assert.Len(t, info.Running, 0) // Edge case: ErrorAtOnce with empty slice err = q.ErrorAtOnce(ctx, []string{}, fmt.Errorf("no tasks")) assert.NoError(t, err) // Should handle gracefully, potentially no-op // Edge case: ErrorAtOnce with nil error task5 := &model.Task{ID: "batch-5"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task5})) waitForProcess() got5, _ := q.Poll(ctx, 3, filterFnTrue) err = q.ErrorAtOnce(ctx, []string{got5.ID}, nil) assert.NoError(t, err) // Should handle nil error gracefully waitForProcess() }) t.Run("error at once with waiting deps", func(t *testing.T) { task5 := &model.Task{ID: "deps-cancel-5"} task6 := &model.Task{ ID: "deps-cancel-6", Dependencies: []string{"deps-cancel-5"}, DepStatus: make(map[string]model.StatusValue), } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task5, task6})) waitForProcess() info := q.Info(ctx) assert.Equal(t, 1, info.Stats.WaitingOnDeps) assert.NoError(t, q.ErrorAtOnce(ctx, []string{"deps-cancel-5", "deps-cancel-6"}, fmt.Errorf("canceled"))) waitForProcess() info = q.Info(ctx) assert.Equal(t, 0, info.Stats.WaitingOnDeps) assert.Len(t, info.Pending, 0) // Edge case: verify both tasks are actually gone, not stuck somewhere assert.Len(t, info.Running, 0) assert.Len(t, info.WaitingOnDeps, 0) }) t.Run("error at once cancellation", func(t *testing.T) { task1 := &model.Task{ID: "cancel-prop-1"} task2 := &model.Task{ ID: "cancel-prop-2", Dependencies: []string{"cancel-prop-1"}, DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2})) waitForProcess() got1, _ := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.ErrorAtOnce(ctx, []string{got1.ID}, ErrCancel)) waitForProcess() waitForProcess() got2, _ := q.Poll(ctx, 2, filterFnTrue) assert.Equal(t, model.StatusKilled, got2.DepStatus["cancel-prop-1"]) // Edge case: verify ErrCancel results in StatusKilled not StatusFailure assert.NotEqual(t, model.StatusFailure, got2.DepStatus["cancel-prop-1"]) assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) waitForProcess() }) t.Run("pause resume", func(t *testing.T) { dummyTask := &model.Task{ID: "pause-1"} var wg sync.WaitGroup wg.Add(1) go func() { _, _ = q.Poll(ctx, 99, filterFnTrue) wg.Done() }() q.Pause() t0 := time.Now() assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() // Edge case: verify queue is actually paused info := q.Info(ctx) assert.True(t, info.Paused) assert.Len(t, info.Pending, 1) assert.Len(t, info.Running, 0) q.Resume() wg.Wait() assert.Greater(t, time.Since(t0), 20*time.Millisecond) // Edge case: verify queue is unpaused info = q.Info(ctx) assert.False(t, info.Paused) // Edge case: multiple pause/resume cycles task2 := &model.Task{ID: "pause-2"} q.Pause() q.Pause() // Double pause assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2})) waitForProcess() q.Resume() q.Resume() // Double resume waitForProcess() got, _ := q.Poll(ctx, 99, filterFnTrue) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) waitForProcess() }) } func TestFifoDependencies(t *testing.T) { ctx, cancel, q := setupTestQueue(t) defer cancel(nil) t.Run("basic dependency handling", func(t *testing.T) { task1 := &model.Task{ID: "dep-basic-1"} task2 := &model.Task{ ID: "dep-basic-2", Dependencies: []string{"dep-basic-1"}, DepStatus: make(map[string]model.StatusValue), } task3 := &model.Task{ ID: "dep-basic-3", Dependencies: []string{"dep-basic-1"}, DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) waitForProcess() info := q.Info(ctx) assert.Equal(t, 2, info.Stats.WaitingOnDeps) got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task1, got) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1"))) waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task2, got) assert.False(t, got.ShouldRun()) assert.Equal(t, model.StatusFailure, got.DepStatus["dep-basic-1"]) waitForProcess() got, err = q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) assert.Equal(t, task3, got) assert.True(t, got.ShouldRun()) assert.Equal(t, model.StatusFailure, got.DepStatus["dep-basic-1"]) waitForProcess() info = q.Info(ctx) assert.Equal(t, 0, info.Stats.WaitingOnDeps) // Edge case: verify DepStatus is correctly set before polling assert.NotEmpty(t, task2.DepStatus) assert.NotEmpty(t, task3.DepStatus) }) t.Run("multiple dependencies", func(t *testing.T) { task1 := &model.Task{ID: "multi-dep-1"} task2 := &model.Task{ID: "multi-dep-2"} task3 := &model.Task{ ID: "multi-dep-3", Dependencies: []string{"multi-dep-1", "multi-dep-2"}, DepStatus: make(map[string]model.StatusValue), } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) waitForProcess() got1, _ := q.Poll(ctx, 1, filterFnTrue) got2, _ := q.Poll(ctx, 2, filterFnTrue) gotIDs := map[string]bool{got1.ID: true, got2.ID: true} assert.True(t, gotIDs["multi-dep-1"] && gotIDs["multi-dep-2"]) if got1.ID == "multi-dep-1" { assert.NoError(t, q.Done(ctx, got1.ID, model.StatusSuccess)) assert.NoError(t, q.Error(ctx, got2.ID, fmt.Errorf("failed"))) } else { assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) assert.NoError(t, q.Error(ctx, got1.ID, fmt.Errorf("failed"))) } waitForProcess() got3, err := q.Poll(ctx, 3, filterFnTrue) assert.NoError(t, err) assert.Contains(t, got3.DepStatus, "multi-dep-1") assert.Contains(t, got3.DepStatus, "multi-dep-2") assert.True(t, (got3.DepStatus["multi-dep-1"] == model.StatusSuccess && got3.DepStatus["multi-dep-2"] == model.StatusFailure) || (got3.DepStatus["multi-dep-1"] == model.StatusFailure && got3.DepStatus["multi-dep-2"] == model.StatusSuccess)) assert.False(t, got3.ShouldRun()) // Edge case: verify both deps are tracked assert.Len(t, got3.DepStatus, 2) assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSkipped)) waitForProcess() }) t.Run("transitive dependencies", func(t *testing.T) { task1 := &model.Task{ID: "trans-1"} task2 := &model.Task{ ID: "trans-2", Dependencies: []string{"trans-1"}, DepStatus: make(map[string]model.StatusValue), } task3 := &model.Task{ ID: "trans-3", Dependencies: []string{"trans-2"}, DepStatus: make(map[string]model.StatusValue), } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1})) waitForProcess() got, _ := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.Error(ctx, got.ID, fmt.Errorf("exit code 1"))) waitForProcess() got, _ = q.Poll(ctx, 2, filterFnTrue) assert.False(t, got.ShouldRun()) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) waitForProcess() got, _ = q.Poll(ctx, 3, filterFnTrue) assert.Equal(t, model.StatusSkipped, got.DepStatus["trans-2"]) assert.False(t, got.ShouldRun()) // Edge case: verify transitive failure propagates correctly // task3 should see trans-2 as skipped, not trans-1's status assert.NotContains(t, got.DepStatus, "trans-1") assert.NoError(t, q.Done(ctx, got.ID, model.StatusSkipped)) waitForProcess() }) t.Run("dependency status propagation", func(t *testing.T) { task1 := &model.Task{ID: "prop-1"} task2 := &model.Task{ ID: "prop-2", Dependencies: []string{"prop-1"}, DepStatus: make(map[string]model.StatusValue), } task3 := &model.Task{ ID: "prop-3", Dependencies: []string{"prop-1"}, DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2, task3})) waitForProcess() info := q.Info(ctx) assert.Equal(t, 2, info.Stats.WaitingOnDeps) got1, _ := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.Done(ctx, got1.ID, model.StatusSuccess)) waitForProcess() got2, _ := q.Poll(ctx, 2, filterFnTrue) got3, _ := q.Poll(ctx, 3, filterFnTrue) assert.Equal(t, model.StatusSuccess, got2.DepStatus["prop-1"]) assert.Equal(t, model.StatusSuccess, got3.DepStatus["prop-1"]) // Edge case: verify both tasks can be polled concurrently assert.NotEqual(t, got2.ID, got3.ID) assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSuccess)) waitForProcess() task4 := &model.Task{ID: "prop-4"} task5 := &model.Task{ ID: "prop-5", Dependencies: []string{"prop-4"}, DepStatus: make(map[string]model.StatusValue), } task6 := &model.Task{ ID: "prop-6", Dependencies: []string{"prop-4"}, DepStatus: make(map[string]model.StatusValue), RunOn: []string{"success", "failure"}, } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task4, task5, task6})) waitForProcess() got4, _ := q.Poll(ctx, 4, filterFnTrue) assert.NoError(t, q.Error(ctx, got4.ID, fmt.Errorf("failed"))) waitForProcess() got5, _ := q.Poll(ctx, 5, filterFnTrue) assert.Equal(t, model.StatusFailure, got5.DepStatus["prop-4"]) assert.False(t, got5.ShouldRun()) got6, _ := q.Poll(ctx, 6, filterFnTrue) assert.Equal(t, model.StatusFailure, got6.DepStatus["prop-4"]) assert.True(t, got6.ShouldRun()) // Edge case: complete dependent tasks assert.NoError(t, q.Done(ctx, got5.ID, model.StatusSkipped)) assert.NoError(t, q.Done(ctx, got6.ID, model.StatusSuccess)) waitForProcess() }) // Edge case: circular dependency detection (should be handled or cause issue) t.Run("circular dependencies", func(t *testing.T) { task1 := &model.Task{ ID: "circ-1", Dependencies: []string{"circ-2"}, DepStatus: make(map[string]model.StatusValue), } task2 := &model.Task{ ID: "circ-2", Dependencies: []string{"circ-1"}, DepStatus: make(map[string]model.StatusValue), } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1, task2})) waitForProcess() info := q.Info(ctx) // Both should be waiting on deps - this is a deadlock scenario assert.Equal(t, 2, info.Stats.WaitingOnDeps) assert.Len(t, info.Pending, 0) // Verify they never become available for polling pollCtx, pollCancel := context.WithTimeout(ctx, 200*time.Millisecond) _, err := q.Poll(pollCtx, 99, filterFnTrue) pollCancel() assert.Error(t, err) // Should timeout // Clean up the deadlocked tasks assert.NoError(t, q.ErrorAtOnce(ctx, []string{"circ-1", "circ-2"}, fmt.Errorf("circular dep"))) waitForProcess() }) // Edge case: dependency on non-existent task // NOTE: This reveals a potential issue - the queue doesn't validate dependencies exist. // If a dependency was never added to the queue, the task will run immediately since // depsInQueue() only checks currently pending/running tasks, not if deps will arrive. t.Run("non-existent dependency", func(t *testing.T) { task1 := &model.Task{ ID: "orphan-1", Dependencies: []string{"does-not-exist"}, DepStatus: make(map[string]model.StatusValue), } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1})) waitForProcess() info := q.Info(ctx) // Current implementation: task doesn't wait if dependency not in queue // This means tasks with typos in dependency names will run immediately! assert.Equal(t, 0, info.Stats.WaitingOnDeps) assert.Len(t, info.Pending, 1) // Task will be available for polling even though dependency doesn't exist got, err := q.Poll(ctx, 99, filterFnTrue) assert.NoError(t, err) assert.Equal(t, "orphan-1", got.ID) // DepStatus will be empty since dependency never completed assert.Empty(t, got.DepStatus) // Clean up assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) waitForProcess() }) // Edge case: dependency added AFTER dependent task (race condition) t.Run("dependency added after dependent", func(t *testing.T) { // Push dependent task first dependent := &model.Task{ ID: "late-dep-child", Dependencies: []string{"late-dep-parent"}, DepStatus: make(map[string]model.StatusValue), } assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dependent})) waitForProcess() // At this point, dependent doesn't see parent in queue, so it won't wait info := q.Info(ctx) // Dependent should NOT be waiting since parent doesn't exist yet initialWaiting := info.Stats.WaitingOnDeps // Now add the parent task parent := &model.Task{ID: "late-dep-parent"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{parent})) waitForProcess() // After filterWaiting runs, dependent SHOULD now see parent and wait info = q.Info(ctx) // The implementation calls filterWaiting() which rechecks dependencies // So dependent should now be waiting assert.Greater(t, info.Stats.WaitingOnDeps, initialWaiting, "dependent should start waiting once parent is added") // Complete parent first gotParent, _ := q.Poll(ctx, 1, filterFnTrue) assert.Equal(t, "late-dep-parent", gotParent.ID, "parent should be polled first") assert.NoError(t, q.Done(ctx, gotParent.ID, model.StatusSuccess)) waitForProcess() // Now child should be unblocked with parent's status gotChild, _ := q.Poll(ctx, 2, filterFnTrue) assert.Equal(t, "late-dep-child", gotChild.ID) assert.Equal(t, model.StatusSuccess, gotChild.DepStatus["late-dep-parent"]) assert.NoError(t, q.Done(ctx, gotChild.ID, model.StatusSuccess)) waitForProcess() }) } func TestFifoLeaseManagement(t *testing.T) { ctx, cancel, q := setupTestQueue(t) defer cancel(nil) t.Run("lease expiration", func(t *testing.T) { q.extension = 0 t.Cleanup(func() { q.extension = 50 * time.Millisecond }) dummyTask := &model.Task{ID: "lease-exp-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() got, err := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, err) errCh := make(chan error, 1) go func() { errCh <- q.Wait(ctx, got.ID) }() waitForProcess() select { case werr := <-errCh: assert.Error(t, werr) // Edge case: verify error is ErrTaskExpired assert.ErrorIs(t, werr, ErrTaskExpired) case <-time.After(2 * time.Second): t.Fatal("timeout waiting for Wait to return") } info := q.Info(ctx) assert.Len(t, info.Pending, 1) // Edge case: verify task was resubmitted to front of queue got2, _ := q.Poll(ctx, 1, filterFnTrue) assert.Equal(t, got.ID, got2.ID) // Same task resubmitted assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) waitForProcess() // Verify cleanup info = q.Info(ctx) assert.Len(t, info.Pending, 0) assert.Len(t, info.Running, 0) }) t.Run("extend lease", func(t *testing.T) { q.extension = 50 * time.Millisecond dummyTask := &model.Task{ID: "extend-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() got, _ := q.Poll(ctx, 5, filterFnTrue) assert.NoError(t, q.Extend(ctx, 5, got.ID)) assert.ErrorIs(t, q.Extend(ctx, 999, got.ID), ErrAgentMissMatch) assert.ErrorIs(t, q.Extend(ctx, 1, got.ID), ErrAgentMissMatch) assert.ErrorIs(t, q.Extend(ctx, 1, "non-existent"), ErrNotFound) // Edge case: extend multiple times rapidly for i := 0; i < 3; i++ { time.Sleep(30 * time.Millisecond) assert.NoError(t, q.Extend(ctx, 5, got.ID)) } info := q.Info(ctx) assert.Len(t, info.Running, 1) assert.Len(t, info.Pending, 0) // Edge case: extend after Done should error assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) waitForProcess() assert.ErrorIs(t, q.Extend(ctx, 5, got.ID), ErrNotFound) // Verify cleanup info = q.Info(ctx) assert.Len(t, info.Pending, 0) assert.Len(t, info.Running, 0) }) t.Run("wait operations", func(t *testing.T) { // Verify queue is clean before starting info := q.Info(ctx) assert.Len(t, info.Pending, 0, "queue should be empty at start of wait operations") assert.Len(t, info.Running, 0, "queue should be empty at start of wait operations") dummyTask := &model.Task{ID: "wait-1"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask})) waitForProcess() got, _ := q.Poll(ctx, 1, filterFnTrue) var wg sync.WaitGroup wg.Add(1) go func() { assert.NoError(t, q.Wait(ctx, got.ID)) wg.Done() }() time.Sleep(time.Millisecond) assert.NoError(t, q.Done(ctx, got.ID, model.StatusSuccess)) wg.Wait() // Edge case: Wait on non-existent task should return immediately assert.NoError(t, q.Wait(ctx, "non-existent")) dummyTask2 := &model.Task{ID: "wait-2"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask2})) waitForProcess() got2, _ := q.Poll(ctx, 1, filterFnTrue) waitCtx, waitCancel := context.WithCancelCause(ctx) errCh := make(chan error, 1) go func() { errCh <- q.Wait(waitCtx, got2.ID) }() time.Sleep(50 * time.Millisecond) waitCancel(nil) select { case err := <-errCh: assert.NoError(t, err) case <-time.After(time.Second): t.Fatal("Wait should return when context is canceled") } // Clean up - complete the second wait task assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) waitForProcess() // Edge case: multiple concurrent waits on same task dummyTask3 := &model.Task{ID: "wait-3"} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask3})) waitForProcess() got3, _ := q.Poll(ctx, 1, filterFnTrue) var wg2 sync.WaitGroup wg2.Add(3) for i := 0; i < 3; i++ { go func() { assert.NoError(t, q.Wait(ctx, got3.ID)) wg2.Done() }() } time.Sleep(10 * time.Millisecond) assert.NoError(t, q.Done(ctx, got3.ID, model.StatusSuccess)) wg2.Wait() // Verify cleanup info = q.Info(ctx) assert.Len(t, info.Pending, 0) assert.Len(t, info.Running, 0) }) } func TestFifoWorkerManagement(t *testing.T) { ctx, cancel, q := setupTestQueue(t) defer cancel(nil) t.Run("poll with context cancellation", func(t *testing.T) { pollCtx, pollCancel := context.WithCancelCause(ctx) errCh := make(chan error, 1) go func() { _, err := q.Poll(pollCtx, 1, filterFnTrue) errCh <- err }() time.Sleep(50 * time.Millisecond) pollCancel(nil) select { case err := <-errCh: assert.ErrorIs(t, err, context.Canceled) case <-time.After(time.Second): t.Fatal("Poll should return when context is canceled") } // Edge case: verify worker is cleaned up info := q.Info(ctx) assert.Equal(t, 0, info.Stats.Workers) }) t.Run("kick agent workers", func(t *testing.T) { pollResults := make(chan error, 5) for i := 0; i < 5; i++ { go func() { _, err := q.Poll(ctx, 42, filterFnTrue) pollResults <- err }() } time.Sleep(50 * time.Millisecond) // Edge case: verify workers are registered before kicking info := q.Info(ctx) assert.Equal(t, 5, info.Stats.Workers) q.KickAgentWorkers(42) kickedCount := 0 for i := 0; i < 5; i++ { select { case err := <-pollResults: if errors.Is(err, context.Canceled) { kickedCount++ } case <-time.After(time.Second): t.Fatal("expected all workers to be kicked") } } assert.Equal(t, 5, kickedCount) // Edge case: verify workers are removed after kicking waitForProcess() info = q.Info(ctx) assert.Equal(t, 0, info.Stats.Workers) // Edge case: kick non-existent agent should be no-op q.KickAgentWorkers(999) }) // Edge case: mixed agent workers t.Run("kick specific agent among multiple", func(t *testing.T) { pollResults := make(chan struct { agentID int64 err error }, 10) // Start workers for agent 1 for i := 0; i < 3; i++ { go func() { _, err := q.Poll(ctx, 1, filterFnTrue) pollResults <- struct { agentID int64 err error }{1, err} }() } // Start workers for agent 2 for i := 0; i < 3; i++ { go func() { _, err := q.Poll(ctx, 2, filterFnTrue) pollResults <- struct { agentID int64 err error }{2, err} }() } time.Sleep(50 * time.Millisecond) info := q.Info(ctx) assert.Equal(t, 6, info.Stats.Workers) // Kick only agent 1 q.KickAgentWorkers(1) kickedAgent1 := 0 kickedAgent2 := 0 for i := 0; i < 3; i++ { select { case result := <-pollResults: if errors.Is(result.err, context.Canceled) { if result.agentID == 1 { kickedAgent1++ } else { kickedAgent2++ } } case <-time.After(time.Second): t.Fatal("expected kicked workers to return") } } assert.Equal(t, 3, kickedAgent1) assert.Equal(t, 0, kickedAgent2) // Clean up agent 2 workers q.KickAgentWorkers(2) for i := 0; i < 3; i++ { <-pollResults } }) } func TestFifoLabelBasedScoring(t *testing.T) { ctx, cancel := context.WithCancelCause(t.Context()) defer cancel(nil) q := NewMemoryQueue(ctx) tasks := []*model.Task{ {ID: "1", Labels: map[string]string{"org-id": "123", "platform": "linux"}}, {ID: "2", Labels: map[string]string{"org-id": "456", "platform": "linux"}}, {ID: "3", Labels: map[string]string{"org-id": "123", "platform": "windows"}}, } assert.NoError(t, q.PushAtOnce(ctx, tasks)) filter123 := func(task *model.Task) (bool, int) { if task.Labels["org-id"] == "123" { return true, 20 } return true, 1 } filter456 := func(task *model.Task) (bool, int) { if task.Labels["org-id"] == "456" { return true, 20 } return true, 1 } results := make(chan *model.Task, 2) go func() { task, _ := q.Poll(ctx, 1, filter123) results <- task }() go func() { task, _ := q.Poll(ctx, 2, filter456) results <- task }() receivedTasks := make(map[string]int64) for i := 0; i < 2; i++ { select { case task := <-results: receivedTasks[task.ID] = task.AgentID case <-time.After(time.Second): t.Fatal("Timeout waiting for tasks") } } assert.Contains(t, []string{"1", "3"}, findTaskByAgent(receivedTasks, 1)) assert.Equal(t, "2", findTaskByAgent(receivedTasks, 2)) // Edge case: filter that rejects all tasks filterRejectAll := func(task *model.Task) (bool, int) { return false, 0 } task4 := &model.Task{ID: "4", Labels: map[string]string{"org-id": "789"}} assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task4})) waitForProcess() pollCtx, pollCancel := context.WithTimeout(ctx, 200*time.Millisecond) _, err := q.Poll(pollCtx, 99, filterRejectAll) pollCancel() assert.Error(t, err) // Should timeout as filter rejects task // Clean up remaining tasks task3, _ := q.Poll(ctx, 1, filterFnTrue) assert.NoError(t, q.Done(ctx, task3.ID, model.StatusSuccess)) task4Got, _ := q.Poll(ctx, 99, filterFnTrue) assert.NoError(t, q.Done(ctx, task4Got.ID, model.StatusSuccess)) waitForProcess() } func TestShouldRunLogic(t *testing.T) { tests := []struct { name string depStatus model.StatusValue runOn []string expected bool }{ {"Success without RunOn", model.StatusSuccess, nil, true}, {"Failure without RunOn", model.StatusFailure, nil, false}, {"Success with failure RunOn", model.StatusSuccess, []string{"failure"}, false}, {"Failure with failure RunOn", model.StatusFailure, []string{"failure"}, true}, {"Success with both RunOn", model.StatusSuccess, []string{"success", "failure"}, true}, {"Skipped without RunOn", model.StatusSkipped, nil, false}, {"Skipped with failure RunOn", model.StatusSkipped, []string{"failure"}, true}, // Edge cases {"Killed without RunOn", model.StatusKilled, nil, false}, {"Killed with failure RunOn", model.StatusKilled, []string{"failure"}, true}, {"Success with success RunOn only", model.StatusSuccess, []string{"success"}, true}, {"Failure with success RunOn only", model.StatusFailure, []string{"success"}, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { task := &model.Task{ ID: "2", Dependencies: []string{"1"}, DepStatus: map[string]model.StatusValue{"1": tt.depStatus}, RunOn: tt.runOn, } assert.Equal(t, tt.expected, task.ShouldRun()) }) } // Edge case: multiple dependencies with mixed statuses t.Run("multiple deps mixed status", func(t *testing.T) { task := &model.Task{ ID: "3", Dependencies: []string{"1", "2"}, DepStatus: map[string]model.StatusValue{ "1": model.StatusSuccess, "2": model.StatusFailure, }, RunOn: nil, } // With default RunOn (nil), needs all deps successful assert.False(t, task.ShouldRun()) task.RunOn = []string{"success", "failure"} // With both RunOn, should run regardless assert.True(t, task.ShouldRun()) }) } func findTaskByAgent(tasks map[string]int64, agentID int64) string { for taskID, aid := range tasks { if aid == agentID { return taskID } } return "" }