| package bridge |
|
|
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "runtime" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
| ) |
|
|
| func TestDefaultMaxParallel(t *testing.T) { |
| n := DefaultMaxParallel() |
| if n < 1 { |
| t.Errorf("DefaultMaxParallel must be >= 1, got %d", n) |
| } |
| expected := runtime.NumCPU() * 2 |
| if expected > 8 { |
| expected = 8 |
| } |
| if n != expected { |
| t.Errorf("DefaultMaxParallel = %d, want %d (NumCPU=%d)", n, expected, runtime.NumCPU()) |
| } |
| } |
|
|
| func TestNewTabExecutor_DefaultLimit(t *testing.T) { |
| te := NewTabExecutor(0) |
| if te.MaxParallel() != DefaultMaxParallel() { |
| t.Errorf("expected default max parallel %d, got %d", DefaultMaxParallel(), te.MaxParallel()) |
| } |
| } |
|
|
| func TestNewTabExecutor_CustomLimit(t *testing.T) { |
| te := NewTabExecutor(3) |
| if te.MaxParallel() != 3 { |
| t.Errorf("expected max parallel 3, got %d", te.MaxParallel()) |
| } |
| } |
|
|
| func TestTabExecutor_SingleTask(t *testing.T) { |
| te := NewTabExecutor(2) |
| var executed bool |
| err := te.Execute(context.Background(), "tab1", func(ctx context.Context) error { |
| executed = true |
| return nil |
| }) |
| if err != nil { |
| t.Fatalf("unexpected error: %v", err) |
| } |
| if !executed { |
| t.Error("task was not executed") |
| } |
| } |
|
|
| func TestTabExecutor_PropagatesError(t *testing.T) { |
| te := NewTabExecutor(2) |
| wantErr := errors.New("task failed") |
| err := te.Execute(context.Background(), "tab1", func(ctx context.Context) error { |
| return wantErr |
| }) |
| if !errors.Is(err, wantErr) { |
| t.Errorf("expected error %v, got %v", wantErr, err) |
| } |
| } |
|
|
| func TestTabExecutor_PanicRecovery(t *testing.T) { |
| te := NewTabExecutor(2) |
| err := te.Execute(context.Background(), "tab1", func(ctx context.Context) error { |
| panic("boom") |
| }) |
| if err == nil { |
| t.Fatal("expected error from panic, got nil") |
| } |
| if err.Error() != "tab tab1: panic: boom" { |
| t.Errorf("unexpected error message: %s", err.Error()) |
| } |
| } |
|
|
// TestTabExecutor_ContextCancellation verifies that Execute returns an error
// when the global semaphore stays full past the caller's deadline.
//
// White-box test: it fills the executor's only semaphore slot directly, so
// the subsequent Execute can never acquire it and must give up when the
// 50ms context times out.
func TestTabExecutor_ContextCancellation(t *testing.T) {
	te := NewTabExecutor(1)

	// Occupy the single semaphore slot so Execute below is forced to block.
	te.semaphore <- struct{}{}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	err := te.Execute(ctx, "tab1", func(ctx context.Context) error {
		return nil
	})
	if err == nil {
		t.Fatal("expected context error, got nil")
	}

	// Release the slot so the executor is left in a clean state.
	<-te.semaphore
}
|
|
| func TestTabExecutor_CancelledContextBeforeExecute(t *testing.T) { |
| te := NewTabExecutor(2) |
| ctx, cancel := context.WithCancel(context.Background()) |
| cancel() |
|
|
| err := te.Execute(ctx, "tab1", func(ctx context.Context) error { |
| return nil |
| }) |
| if err == nil { |
| t.Fatal("expected error for cancelled context") |
| } |
| } |
|
|
| func TestTabExecutor_PerTabSequential(t *testing.T) { |
| te := NewTabExecutor(4) |
| var counter int32 |
| var maxConcurrent int32 |
|
|
| var wg sync.WaitGroup |
| for i := 0; i < 10; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| _ = te.Execute(context.Background(), "tab1", func(ctx context.Context) error { |
| cur := atomic.AddInt32(&counter, 1) |
| |
| for { |
| old := atomic.LoadInt32(&maxConcurrent) |
| if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) { |
| break |
| } |
| } |
| time.Sleep(5 * time.Millisecond) |
| atomic.AddInt32(&counter, -1) |
| return nil |
| }) |
| }() |
| } |
| wg.Wait() |
|
|
| if max := atomic.LoadInt32(&maxConcurrent); max > 1 { |
| t.Errorf("per-tab execution should be sequential, but max concurrent was %d", max) |
| } |
| } |
|
|
| func TestTabExecutor_CrossTabParallel(t *testing.T) { |
| te := NewTabExecutor(4) |
| var maxConcurrent int32 |
| var current int32 |
|
|
| var wg sync.WaitGroup |
| for i := 0; i < 4; i++ { |
| wg.Add(1) |
| tabID := fmt.Sprintf("tab%d", i) |
| go func() { |
| defer wg.Done() |
| _ = te.Execute(context.Background(), tabID, func(ctx context.Context) error { |
| cur := atomic.AddInt32(¤t, 1) |
| for { |
| old := atomic.LoadInt32(&maxConcurrent) |
| if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) { |
| break |
| } |
| } |
| time.Sleep(50 * time.Millisecond) |
| atomic.AddInt32(¤t, -1) |
| return nil |
| }) |
| }() |
| } |
| wg.Wait() |
|
|
| if max := atomic.LoadInt32(&maxConcurrent); max < 2 { |
| t.Errorf("cross-tab execution should be parallel, but max concurrent was %d", max) |
| } |
| } |
|
|
| func TestTabExecutor_SemaphoreLimit(t *testing.T) { |
| maxParallel := 2 |
| te := NewTabExecutor(maxParallel) |
| var maxConcurrent int32 |
| var current int32 |
|
|
| var wg sync.WaitGroup |
| |
| for i := 0; i < 8; i++ { |
| wg.Add(1) |
| tabID := fmt.Sprintf("tab%d", i) |
| go func() { |
| defer wg.Done() |
| _ = te.Execute(context.Background(), tabID, func(ctx context.Context) error { |
| cur := atomic.AddInt32(¤t, 1) |
| for { |
| old := atomic.LoadInt32(&maxConcurrent) |
| if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) { |
| break |
| } |
| } |
| time.Sleep(30 * time.Millisecond) |
| atomic.AddInt32(¤t, -1) |
| return nil |
| }) |
| }() |
| } |
| wg.Wait() |
|
|
| if max := atomic.LoadInt32(&maxConcurrent); int(max) > maxParallel { |
| t.Errorf("semaphore should limit to %d, but max concurrent was %d", maxParallel, max) |
| } |
| } |
|
|
| func TestTabExecutor_RemoveTab(t *testing.T) { |
| te := NewTabExecutor(2) |
| |
| _ = te.Execute(context.Background(), "tab1", func(ctx context.Context) error { return nil }) |
|
|
| if te.ActiveTabs() != 1 { |
| t.Errorf("expected 1 active tab, got %d", te.ActiveTabs()) |
| } |
|
|
| te.RemoveTab("tab1") |
| if te.ActiveTabs() != 0 { |
| t.Errorf("expected 0 active tabs after remove, got %d", te.ActiveTabs()) |
| } |
| } |
|
|
| func TestTabExecutor_RemoveTab_Nonexistent(t *testing.T) { |
| te := NewTabExecutor(2) |
| |
| te.RemoveTab("nonexistent") |
| } |
|
|
| func TestTabExecutor_Stats(t *testing.T) { |
| te := NewTabExecutor(4) |
| stats := te.Stats() |
| if stats.MaxParallel != 4 { |
| t.Errorf("expected MaxParallel 4, got %d", stats.MaxParallel) |
| } |
| if stats.ActiveTabs != 0 { |
| t.Errorf("expected 0 active tabs, got %d", stats.ActiveTabs) |
| } |
| if stats.SemaphoreUsed != 0 { |
| t.Errorf("expected 0 semaphore used, got %d", stats.SemaphoreUsed) |
| } |
| if stats.SemaphoreFree != 4 { |
| t.Errorf("expected 4 semaphore free, got %d", stats.SemaphoreFree) |
| } |
| } |
|
|
| func TestTabExecutor_ExecuteWithTimeout(t *testing.T) { |
| te := NewTabExecutor(2) |
| err := te.ExecuteWithTimeout(context.Background(), "tab1", 100*time.Millisecond, func(ctx context.Context) error { |
| return nil |
| }) |
| if err != nil { |
| t.Fatalf("unexpected error: %v", err) |
| } |
| } |
|
|
| func TestTabExecutor_ExecuteWithTimeout_Exceeded(t *testing.T) { |
| te := NewTabExecutor(2) |
| err := te.ExecuteWithTimeout(context.Background(), "tab1", 20*time.Millisecond, func(ctx context.Context) error { |
| select { |
| case <-time.After(500 * time.Millisecond): |
| return nil |
| case <-ctx.Done(): |
| return ctx.Err() |
| } |
| }) |
| if err == nil { |
| t.Fatal("expected timeout error") |
| } |
| } |
|
|
| func TestTabExecutor_EmptyTabID(t *testing.T) { |
| te := NewTabExecutor(2) |
| err := te.Execute(context.Background(), "", func(ctx context.Context) error { |
| return nil |
| }) |
| if err == nil { |
| t.Fatal("expected error for empty tabID") |
| } |
| if err.Error() != "tabID must not be empty" { |
| t.Errorf("unexpected error message: %s", err.Error()) |
| } |
| } |
|
|
| func TestTabExecutor_NilTask(t *testing.T) { |
| te := NewTabExecutor(2) |
| |
| err := te.Execute(context.Background(), "tab1", nil) |
| if err == nil { |
| t.Fatal("expected error from nil task panic") |
| } |
| } |
|
|
| func TestTabExecutor_MaxParallelOne(t *testing.T) { |
| |
| te := NewTabExecutor(1) |
| var maxConcurrent int32 |
| var current int32 |
|
|
| var wg sync.WaitGroup |
| for i := 0; i < 6; i++ { |
| wg.Add(1) |
| tabID := fmt.Sprintf("tab%d", i) |
| go func() { |
| defer wg.Done() |
| _ = te.Execute(context.Background(), tabID, func(ctx context.Context) error { |
| cur := atomic.AddInt32(¤t, 1) |
| for { |
| old := atomic.LoadInt32(&maxConcurrent) |
| if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) { |
| break |
| } |
| } |
| time.Sleep(5 * time.Millisecond) |
| atomic.AddInt32(¤t, -1) |
| return nil |
| }) |
| }() |
| } |
| wg.Wait() |
|
|
| if max := atomic.LoadInt32(&maxConcurrent); max > 1 { |
| t.Errorf("maxParallel=1 should serialize all tabs, but max concurrent was %d", max) |
| } |
| } |
|
|
| func TestTabExecutor_NegativeMaxParallel(t *testing.T) { |
| te := NewTabExecutor(-5) |
| if te.MaxParallel() != DefaultMaxParallel() { |
| t.Errorf("negative maxParallel should use default, got %d", te.MaxParallel()) |
| } |
| } |
|
|
| func TestTabExecutor_MultiplePanicsAcrossTabs(t *testing.T) { |
| te := NewTabExecutor(4) |
| var wg sync.WaitGroup |
| errs := make([]error, 4) |
|
|
| for i := 0; i < 4; i++ { |
| wg.Add(1) |
| i := i |
| go func() { |
| defer wg.Done() |
| errs[i] = te.Execute(context.Background(), fmt.Sprintf("tab%d", i), func(ctx context.Context) error { |
| panic(fmt.Sprintf("panic in tab%d", i)) |
| }) |
| }() |
| } |
| wg.Wait() |
|
|
| for i, err := range errs { |
| if err == nil { |
| t.Errorf("tab%d: expected panic error, got nil", i) |
| } |
| } |
|
|
| |
| err := te.Execute(context.Background(), "healthy_tab", func(ctx context.Context) error { |
| return nil |
| }) |
| if err != nil { |
| t.Errorf("executor broken after panics: %v", err) |
| } |
| } |
|
|
| func TestTabExecutor_ReusedTabIDAfterRemove(t *testing.T) { |
| te := NewTabExecutor(2) |
| |
| var firstExecuted, secondExecuted bool |
|
|
| err := te.Execute(context.Background(), "reuse_tab", func(ctx context.Context) error { |
| firstExecuted = true |
| return nil |
| }) |
| if err != nil { |
| t.Fatalf("first execute: %v", err) |
| } |
|
|
| te.RemoveTab("reuse_tab") |
| if te.ActiveTabs() != 0 { |
| t.Fatalf("expected 0 active tabs after remove, got %d", te.ActiveTabs()) |
| } |
|
|
| |
| err = te.Execute(context.Background(), "reuse_tab", func(ctx context.Context) error { |
| secondExecuted = true |
| return nil |
| }) |
| if err != nil { |
| t.Fatalf("second execute after reuse: %v", err) |
| } |
|
|
| if !firstExecuted || !secondExecuted { |
| t.Error("both executions should have run") |
| } |
| } |
|
|
| func TestTabExecutor_ContextTimeoutOnPerTabLock(t *testing.T) { |
| te := NewTabExecutor(4) |
|
|
| |
| started := make(chan struct{}) |
| done := make(chan struct{}) |
| go func() { |
| _ = te.Execute(context.Background(), "tab1", func(ctx context.Context) error { |
| close(started) |
| <-done |
| return nil |
| }) |
| }() |
| <-started |
|
|
| |
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| defer cancel() |
|
|
| err := te.Execute(ctx, "tab1", func(ctx context.Context) error { |
| return nil |
| }) |
| close(done) |
|
|
| if err == nil { |
| t.Fatal("expected timeout error waiting for per-tab lock") |
| } |
| } |
|
|
// TestTabExecutor_SequentialVsParallelTiming is a coarse wall-clock check:
// tasks on a single tab must run back-to-back, while one task per distinct
// tab should overlap and finish markedly faster. Thresholds use generous 2x
// margins to tolerate scheduler jitter, but this test can still flake on a
// heavily loaded machine.
func TestTabExecutor_SequentialVsParallelTiming(t *testing.T) {
	taskDuration := 20 * time.Millisecond
	numTasks := 4
	te := NewTabExecutor(numTasks)

	// Phase 1: all tasks target one tab — forced sequential execution.
	seqStart := time.Now()
	for i := 0; i < numTasks; i++ {
		_ = te.Execute(context.Background(), "single_tab", func(ctx context.Context) error {
			time.Sleep(taskDuration)
			return nil
		})
	}
	seqDuration := time.Since(seqStart)

	// Phase 2: one task per tab on a fresh executor — may run in parallel.
	te2 := NewTabExecutor(numTasks)
	parStart := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		tabID := fmt.Sprintf("par_tab_%d", i)
		go func() {
			defer wg.Done()
			_ = te2.Execute(context.Background(), tabID, func(ctx context.Context) error {
				time.Sleep(taskDuration)
				return nil
			})
		}()
	}
	wg.Wait()
	parDuration := time.Since(parStart)

	// Sequential phase should take roughly numTasks*taskDuration; only
	// complain if it was suspiciously fast (less than half of that).
	expectedSeqMin := time.Duration(numTasks) * taskDuration
	if seqDuration < expectedSeqMin/2 {
		t.Errorf("sequential took %v, expected at least ~%v", seqDuration, expectedSeqMin)
	}

	// Parallel phase should be clearly faster than the sequential one.
	if parDuration >= seqDuration/2 {
		t.Errorf("parallel (%v) should be significantly faster than sequential (%v)", parDuration, seqDuration)
	}

	t.Logf("Sequential: %v, Parallel: %v, Speedup: %.2fx", seqDuration, parDuration, float64(seqDuration)/float64(parDuration))
}
|
|
| func TestTabExecutor_SemaphoreFairnessUnderContention(t *testing.T) { |
| |
| |
| te := NewTabExecutor(2) |
| completed := make([]bool, 10) |
| var mu sync.Mutex |
| var wg sync.WaitGroup |
|
|
| for i := 0; i < 10; i++ { |
| wg.Add(1) |
| i := i |
| go func() { |
| defer wg.Done() |
| err := te.Execute(context.Background(), fmt.Sprintf("fair_tab_%d", i), func(ctx context.Context) error { |
| time.Sleep(5 * time.Millisecond) |
| mu.Lock() |
| completed[i] = true |
| mu.Unlock() |
| return nil |
| }) |
| if err != nil { |
| t.Errorf("task %d failed: %v", i, err) |
| } |
| }() |
| } |
| wg.Wait() |
|
|
| for i, c := range completed { |
| if !c { |
| t.Errorf("task %d was starved (never completed)", i) |
| } |
| } |
| } |
|
|
| func TestTabExecutor_ErrorDoesNotCorruptState(t *testing.T) { |
| |
| te := NewTabExecutor(2) |
|
|
| err := te.Execute(context.Background(), "err_tab", func(ctx context.Context) error { |
| return fmt.Errorf("deliberate error") |
| }) |
| if err == nil { |
| t.Fatal("expected error") |
| } |
|
|
| |
| var executed bool |
| err = te.Execute(context.Background(), "err_tab", func(ctx context.Context) error { |
| executed = true |
| return nil |
| }) |
| if err != nil { |
| t.Fatalf("tab broken after error: %v", err) |
| } |
| if !executed { |
| t.Error("task did not execute after prior error") |
| } |
| } |
|
|
| func TestTabExecutor_ManyUniqueTabsCreation(t *testing.T) { |
| |
| te := NewTabExecutor(4) |
| var wg sync.WaitGroup |
| numTabs := 100 |
|
|
| for i := 0; i < numTabs; i++ { |
| wg.Add(1) |
| tabID := fmt.Sprintf("unique_tab_%d", i) |
| go func() { |
| defer wg.Done() |
| _ = te.Execute(context.Background(), tabID, func(ctx context.Context) error { |
| return nil |
| }) |
| }() |
| } |
| wg.Wait() |
|
|
| if te.ActiveTabs() != numTabs { |
| t.Errorf("expected %d active tabs, got %d", numTabs, te.ActiveTabs()) |
| } |
|
|
| |
| for i := 0; i < numTabs; i++ { |
| te.RemoveTab(fmt.Sprintf("unique_tab_%d", i)) |
| } |
| if te.ActiveTabs() != 0 { |
| t.Errorf("expected 0 active tabs after cleanup, got %d", te.ActiveTabs()) |
| } |
| } |
|
|
| func TestTabExecutor_SlowAndFastTabsConcurrent(t *testing.T) { |
| |
| te := NewTabExecutor(4) |
| fastDone := make(chan struct{}) |
| slowDone := make(chan struct{}) |
|
|
| |
| go func() { |
| _ = te.Execute(context.Background(), "slow_tab", func(ctx context.Context) error { |
| time.Sleep(100 * time.Millisecond) |
| return nil |
| }) |
| close(slowDone) |
| }() |
|
|
| |
| time.Sleep(5 * time.Millisecond) |
|
|
| |
| go func() { |
| _ = te.Execute(context.Background(), "fast_tab", func(ctx context.Context) error { |
| return nil |
| }) |
| close(fastDone) |
| }() |
|
|
| |
| select { |
| case <-fastDone: |
| |
| case <-slowDone: |
| t.Error("slow tab finished before fast tab — fast tab was blocked") |
| case <-time.After(2 * time.Second): |
| t.Fatal("timed out waiting for tasks") |
| } |
|
|
| <-slowDone |
| } |
|
|