// WitNote / internal / bridge / tab_executor_test.go
// AUXteam's picture
// Upload folder using huggingface_hub
// 6a7089a verified

package bridge
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestDefaultMaxParallel checks that DefaultMaxParallel is at least 1 and
// equals twice the CPU count, capped at 8.
func TestDefaultMaxParallel(t *testing.T) {
	got := DefaultMaxParallel()
	if got < 1 {
		t.Errorf("DefaultMaxParallel must be >= 1, got %d", got)
	}

	// Start from the cap and only lower it when 2*NumCPU fits underneath.
	cpus := runtime.NumCPU()
	want := 8
	if doubled := cpus * 2; doubled <= 8 {
		want = doubled
	}
	if got != want {
		t.Errorf("DefaultMaxParallel = %d, want %d (NumCPU=%d)", got, want, cpus)
	}
}
// TestNewTabExecutor_DefaultLimit verifies that a requested limit of 0 yields
// an executor whose MaxParallel equals DefaultMaxParallel.
func TestNewTabExecutor_DefaultLimit(t *testing.T) {
	executor := NewTabExecutor(0)

	want, got := DefaultMaxParallel(), executor.MaxParallel()
	if got != want {
		t.Errorf("expected default max parallel %d, got %d", want, got)
	}
}
// TestNewTabExecutor_CustomLimit verifies that an explicit positive limit is
// reported back unchanged by MaxParallel.
func TestNewTabExecutor_CustomLimit(t *testing.T) {
	const limit = 3

	if got := NewTabExecutor(limit).MaxParallel(); got != limit {
		t.Errorf("expected max parallel 3, got %d", got)
	}
}
// TestTabExecutor_SingleTask verifies that Execute runs the supplied task and
// returns nil when the task itself returns nil.
func TestTabExecutor_SingleTask(t *testing.T) {
	executor := NewTabExecutor(2)

	ran := false
	task := func(ctx context.Context) error {
		ran = true
		return nil
	}

	if err := executor.Execute(context.Background(), "tab1", task); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !ran {
		t.Error("task was not executed")
	}
}
// TestTabExecutor_PropagatesError verifies that an error returned by the task
// is observable (via errors.Is) on the error returned by Execute.
func TestTabExecutor_PropagatesError(t *testing.T) {
	executor := NewTabExecutor(2)
	sentinel := errors.New("task failed")
	failing := func(ctx context.Context) error { return sentinel }

	got := executor.Execute(context.Background(), "tab1", failing)
	if !errors.Is(got, sentinel) {
		t.Errorf("expected error %v, got %v", sentinel, got)
	}
}
// TestTabExecutor_PanicRecovery verifies that a panicking task is turned into
// an error with the exact message "tab tab1: panic: boom".
func TestTabExecutor_PanicRecovery(t *testing.T) {
	const wantMsg = "tab tab1: panic: boom"

	executor := NewTabExecutor(2)
	exploding := func(ctx context.Context) error { panic("boom") }

	err := executor.Execute(context.Background(), "tab1", exploding)
	if err == nil {
		t.Fatal("expected error from panic, got nil")
	}
	if msg := err.Error(); msg != wantMsg {
		t.Errorf("unexpected error message: %s", msg)
	}
}
// TestTabExecutor_ContextCancellation verifies that Execute returns an error
// when its context times out while the semaphore is already occupied.
func TestTabExecutor_ContextCancellation(t *testing.T) {
	executor := NewTabExecutor(1)

	// Put a token into the semaphore by hand; with maxParallel=1 this is
	// presumably its only slot, so Execute should be unable to acquire one.
	executor.semaphore <- struct{}{}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	noop := func(ctx context.Context) error { return nil }
	if err := executor.Execute(ctx, "tab1", noop); err == nil {
		t.Fatal("expected context error, got nil")
	}

	// Take the manually inserted token back out.
	<-executor.semaphore
}
// TestTabExecutor_CancelledContextBeforeExecute verifies that Execute returns
// an error when handed a context that was cancelled before the call.
func TestTabExecutor_CancelledContextBeforeExecute(t *testing.T) {
	executor := NewTabExecutor(2)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the context is already done when Execute is invoked

	noop := func(ctx context.Context) error { return nil }
	if err := executor.Execute(ctx, "tab1", noop); err == nil {
		t.Fatal("expected error for cancelled context")
	}
}
// TestTabExecutor_PerTabSequential submits several tasks for the same tab from
// concurrent goroutines and verifies that no two of them overlap.
//
// Besides the concurrency ceiling, the test also checks that every Execute
// call succeeded and that every task really ran; otherwise an executor that
// rejected all work would trivially satisfy the "max concurrent <= 1" check.
func TestTabExecutor_PerTabSequential(t *testing.T) {
	const numTasks = 10
	te := NewTabExecutor(4)

	var (
		counter       int32 // tasks currently inside the task body
		maxConcurrent int32 // highest value counter has reached
		executed      int32 // tasks whose body was entered at all
		wg            sync.WaitGroup
	)
	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := te.Execute(context.Background(), "tab1", func(ctx context.Context) error {
				atomic.AddInt32(&executed, 1)
				cur := atomic.AddInt32(&counter, 1)
				// Raise maxConcurrent to cur unless another task already
				// recorded an equal or higher value.
				for {
					old := atomic.LoadInt32(&maxConcurrent)
					if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) {
						break
					}
				}
				// Stay "in flight" long enough for an overlap to be observable.
				time.Sleep(5 * time.Millisecond)
				atomic.AddInt32(&counter, -1)
				return nil
			})
			if err != nil {
				// Errorf (unlike Fatalf) may be called from a non-test goroutine.
				t.Errorf("unexpected error from Execute: %v", err)
			}
		}()
	}
	wg.Wait()

	if got := atomic.LoadInt32(&executed); got != numTasks {
		t.Errorf("expected %d tasks to execute, got %d", numTasks, got)
	}
	if peak := atomic.LoadInt32(&maxConcurrent); peak > 1 {
		t.Errorf("per-tab execution should be sequential, but max concurrent was %d", peak)
	}
}
// TestTabExecutor_CrossTabParallel runs one task on each of four distinct tabs
// and verifies that at least two of them were observed running at once.
func TestTabExecutor_CrossTabParallel(t *testing.T) {
	const tabs = 4
	executor := NewTabExecutor(tabs)

	var (
		running int32 // tasks currently inside the task body
		peak    int32 // highest value running has reached
		group   sync.WaitGroup
	)

	// tracked records its own arrival, folds the new running count into peak,
	// lingers for 50ms so overlaps are visible, then records its departure.
	tracked := func(ctx context.Context) error {
		now := atomic.AddInt32(&running, 1)
		for seen := atomic.LoadInt32(&peak); now > seen; seen = atomic.LoadInt32(&peak) {
			if atomic.CompareAndSwapInt32(&peak, seen, now) {
				break
			}
		}
		time.Sleep(50 * time.Millisecond)
		atomic.AddInt32(&running, -1)
		return nil
	}

	for n := 0; n < tabs; n++ {
		group.Add(1)
		go func(id string) {
			defer group.Done()
			_ = executor.Execute(context.Background(), id, tracked)
		}(fmt.Sprintf("tab%d", n))
	}
	group.Wait()

	if got := atomic.LoadInt32(&peak); got < 2 {
		t.Errorf("cross-tab execution should be parallel, but max concurrent was %d", got)
	}
}
// TestTabExecutor_SemaphoreLimit launches more tasks (on distinct tabs) than
// the executor's maxParallel and verifies that the number of tasks running at
// the same time never exceeds that limit.
//
// The test also checks that every Execute call succeeded and that every task
// ran; without that, an executor that ran nothing would pass the upper-bound
// assertion vacuously.
func TestTabExecutor_SemaphoreLimit(t *testing.T) {
	const (
		maxParallel = 2
		numTasks    = 8 // deliberately larger than maxParallel
	)
	te := NewTabExecutor(maxParallel)

	var (
		current       int32 // tasks currently inside the task body
		maxConcurrent int32 // highest value current has reached
		executed      int32 // tasks whose body was entered at all
		wg            sync.WaitGroup
	)
	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		tabID := fmt.Sprintf("tab%d", i)
		go func() {
			defer wg.Done()
			err := te.Execute(context.Background(), tabID, func(ctx context.Context) error {
				atomic.AddInt32(&executed, 1)
				cur := atomic.AddInt32(&current, 1)
				// Raise maxConcurrent to cur unless an equal or higher value
				// has already been recorded.
				for {
					old := atomic.LoadInt32(&maxConcurrent)
					if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) {
						break
					}
				}
				// Hold the slot long enough for contention to build up.
				time.Sleep(30 * time.Millisecond)
				atomic.AddInt32(&current, -1)
				return nil
			})
			if err != nil {
				// Errorf (unlike Fatalf) may be called from a non-test goroutine.
				t.Errorf("%s: unexpected error from Execute: %v", tabID, err)
			}
		}()
	}
	wg.Wait()

	if got := atomic.LoadInt32(&executed); got != numTasks {
		t.Errorf("expected %d tasks to execute, got %d", numTasks, got)
	}
	if peak := atomic.LoadInt32(&maxConcurrent); peak > maxParallel {
		t.Errorf("semaphore should limit to %d, but max concurrent was %d", maxParallel, peak)
	}
}
// TestTabExecutor_RemoveTab verifies that ActiveTabs reports 1 after a task
// ran on a tab and 0 again after that tab is removed.
func TestTabExecutor_RemoveTab(t *testing.T) {
	executor := NewTabExecutor(2)

	// Run a no-op so that "tab1" becomes known to the executor.
	noop := func(ctx context.Context) error { return nil }
	_ = executor.Execute(context.Background(), "tab1", noop)
	if n := executor.ActiveTabs(); n != 1 {
		t.Errorf("expected 1 active tab, got %d", n)
	}

	executor.RemoveTab("tab1")
	if n := executor.ActiveTabs(); n != 0 {
		t.Errorf("expected 0 active tabs after remove, got %d", n)
	}
}
// TestTabExecutor_RemoveTab_Nonexistent removes a tab ID that was never used.
// There is nothing to assert: the test passes as long as the call returns
// without panicking.
func TestTabExecutor_RemoveTab_Nonexistent(t *testing.T) {
	executor := NewTabExecutor(2)
	executor.RemoveTab("nonexistent")
}
// TestTabExecutor_Stats verifies the Stats snapshot of a freshly created
// executor with maxParallel=4: no active tabs, no semaphore slots used, and
// all four slots free.
func TestTabExecutor_Stats(t *testing.T) {
	snapshot := NewTabExecutor(4).Stats()

	if got := snapshot.MaxParallel; got != 4 {
		t.Errorf("expected MaxParallel 4, got %d", got)
	}
	if got := snapshot.ActiveTabs; got != 0 {
		t.Errorf("expected 0 active tabs, got %d", got)
	}
	if got := snapshot.SemaphoreUsed; got != 0 {
		t.Errorf("expected 0 semaphore used, got %d", got)
	}
	if got := snapshot.SemaphoreFree; got != 4 {
		t.Errorf("expected 4 semaphore free, got %d", got)
	}
}
// TestTabExecutor_ExecuteWithTimeout verifies that a task which returns
// immediately succeeds under a 100ms timeout.
func TestTabExecutor_ExecuteWithTimeout(t *testing.T) {
	executor := NewTabExecutor(2)
	noop := func(ctx context.Context) error { return nil }

	if err := executor.ExecuteWithTimeout(context.Background(), "tab1", 100*time.Millisecond, noop); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
// TestTabExecutor_ExecuteWithTimeout_Exceeded verifies that ExecuteWithTimeout
// returns an error when the task needs longer than the 20ms timeout.
func TestTabExecutor_ExecuteWithTimeout_Exceeded(t *testing.T) {
	executor := NewTabExecutor(2)

	// slow would take 500ms on its own, but gives up with ctx.Err() as soon
	// as the context it receives is done.
	slow := func(ctx context.Context) error {
		finished := time.After(500 * time.Millisecond)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-finished:
			return nil
		}
	}

	if err := executor.ExecuteWithTimeout(context.Background(), "tab1", 20*time.Millisecond, slow); err == nil {
		t.Fatal("expected timeout error")
	}
}
// TestTabExecutor_EmptyTabID verifies that Execute rejects an empty tab ID
// with the exact message "tabID must not be empty".
func TestTabExecutor_EmptyTabID(t *testing.T) {
	const wantMsg = "tabID must not be empty"

	executor := NewTabExecutor(2)
	noop := func(ctx context.Context) error { return nil }

	err := executor.Execute(context.Background(), "", noop)
	if err == nil {
		t.Fatal("expected error for empty tabID")
	}
	if msg := err.Error(); msg != wantMsg {
		t.Errorf("unexpected error message: %s", msg)
	}
}
// TestTabExecutor_NilTask verifies that Execute reports an error, rather than
// crashing the test, when it is given a nil task.
func TestTabExecutor_NilTask(t *testing.T) {
	executor := NewTabExecutor(2)

	// Calling a nil function panics; the expectation is that this surfaces as
	// a returned error.
	if err := executor.Execute(context.Background(), "tab1", nil); err == nil {
		t.Fatal("expected error from nil task panic")
	}
}
// TestTabExecutor_MaxParallelOne verifies that with maxParallel=1 tasks on
// different tabs never run at the same time.
//
// The test also checks that every Execute call succeeded and that every task
// ran; without that, an executor that ran nothing would pass the upper-bound
// assertion vacuously.
func TestTabExecutor_MaxParallelOne(t *testing.T) {
	const numTasks = 6
	te := NewTabExecutor(1)

	var (
		current       int32 // tasks currently inside the task body
		maxConcurrent int32 // highest value current has reached
		executed      int32 // tasks whose body was entered at all
		wg            sync.WaitGroup
	)
	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		tabID := fmt.Sprintf("tab%d", i)
		go func() {
			defer wg.Done()
			err := te.Execute(context.Background(), tabID, func(ctx context.Context) error {
				atomic.AddInt32(&executed, 1)
				cur := atomic.AddInt32(&current, 1)
				// Raise maxConcurrent to cur unless an equal or higher value
				// has already been recorded.
				for {
					old := atomic.LoadInt32(&maxConcurrent)
					if cur <= old || atomic.CompareAndSwapInt32(&maxConcurrent, old, cur) {
						break
					}
				}
				// Stay "in flight" long enough for an overlap to be observable.
				time.Sleep(5 * time.Millisecond)
				atomic.AddInt32(&current, -1)
				return nil
			})
			if err != nil {
				// Errorf (unlike Fatalf) may be called from a non-test goroutine.
				t.Errorf("%s: unexpected error from Execute: %v", tabID, err)
			}
		}()
	}
	wg.Wait()

	if got := atomic.LoadInt32(&executed); got != numTasks {
		t.Errorf("expected %d tasks to execute, got %d", numTasks, got)
	}
	if peak := atomic.LoadInt32(&maxConcurrent); peak > 1 {
		t.Errorf("maxParallel=1 should serialize all tabs, but max concurrent was %d", peak)
	}
}
// TestTabExecutor_NegativeMaxParallel verifies that a negative limit yields an
// executor whose MaxParallel equals DefaultMaxParallel.
func TestTabExecutor_NegativeMaxParallel(t *testing.T) {
	executor := NewTabExecutor(-5)

	if got := executor.MaxParallel(); got != DefaultMaxParallel() {
		t.Errorf("negative maxParallel should use default, got %d", got)
	}
}
// TestTabExecutor_MultiplePanicsAcrossTabs runs a panicking task on four tabs
// concurrently, expects an error from each, and then verifies that the same
// executor still runs a normal task successfully.
func TestTabExecutor_MultiplePanicsAcrossTabs(t *testing.T) {
	const tabs = 4
	executor := NewTabExecutor(tabs)

	// Each goroutine writes only its own slot, so no extra locking is needed.
	results := make([]error, tabs)
	var group sync.WaitGroup
	for n := 0; n < tabs; n++ {
		group.Add(1)
		go func(slot int) {
			defer group.Done()
			id := fmt.Sprintf("tab%d", slot)
			results[slot] = executor.Execute(context.Background(), id, func(ctx context.Context) error {
				panic(fmt.Sprintf("panic in tab%d", slot))
			})
		}(n)
	}
	group.Wait()

	for slot := range results {
		if results[slot] == nil {
			t.Errorf("tab%d: expected panic error, got nil", slot)
		}
	}

	// A well-behaved task on a new tab must still go through afterwards.
	noop := func(ctx context.Context) error { return nil }
	if err := executor.Execute(context.Background(), "healthy_tab", noop); err != nil {
		t.Errorf("executor broken after panics: %v", err)
	}
}
// TestTabExecutor_ReusedTabIDAfterRemove verifies that a tab ID can be used
// again after RemoveTab: both the task before and the task after the removal
// must run without error.
func TestTabExecutor_ReusedTabIDAfterRemove(t *testing.T) {
	const tabID = "reuse_tab"
	executor := NewTabExecutor(2)

	// mark builds a task that merely sets the given flag.
	mark := func(flag *bool) func(ctx context.Context) error {
		return func(ctx context.Context) error {
			*flag = true
			return nil
		}
	}
	var ranBefore, ranAfter bool

	if err := executor.Execute(context.Background(), tabID, mark(&ranBefore)); err != nil {
		t.Fatalf("first execute: %v", err)
	}

	executor.RemoveTab(tabID)
	if n := executor.ActiveTabs(); n != 0 {
		t.Fatalf("expected 0 active tabs after remove, got %d", n)
	}

	// Same ID again, now that the executor no longer tracks it.
	if err := executor.Execute(context.Background(), tabID, mark(&ranAfter)); err != nil {
		t.Fatalf("second execute after reuse: %v", err)
	}

	if !(ranBefore && ranAfter) {
		t.Error("both executions should have run")
	}
}
// TestTabExecutor_ContextTimeoutOnPerTabLock verifies that a second Execute on
// a tab that is still busy returns an error once its context times out.
//
// A background "holder" task occupies tab1 until the test releases it. The
// test waits for the holder goroutine to finish before returning, so it never
// outlives the test, and the holder's own result is checked as well.
func TestTabExecutor_ContextTimeoutOnPerTabLock(t *testing.T) {
	te := NewTabExecutor(4)

	started := make(chan struct{})     // closed when the holder task is running
	release := make(chan struct{})     // closed to let the holder task return
	holderDone := make(chan error, 1)  // buffered so the holder never blocks on send
	go func() {
		holderDone <- te.Execute(context.Background(), "tab1", func(ctx context.Context) error {
			close(started)
			<-release // block until the test releases the tab
			return nil
		})
	}()

	// Wait for the holder to be inside its task. If Execute returns without
	// ever running the task, fail instead of blocking forever on started.
	select {
	case <-started:
	case err := <-holderDone:
		t.Fatalf("holder Execute returned before its task started: %v", err)
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for holder task to start")
	}

	// Try to execute on the same tab with a short timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := te.Execute(ctx, "tab1", func(ctx context.Context) error {
		return nil
	})

	// Release the holder and join it before asserting, so the goroutine is
	// gone even when an assertion below aborts the test.
	close(release)
	if holderErr := <-holderDone; holderErr != nil {
		t.Errorf("holder task returned unexpected error: %v", holderErr)
	}

	if err == nil {
		t.Fatal("expected timeout error waiting for per-tab lock")
	}
}
// TestTabExecutor_SequentialVsParallelTiming compares wall-clock time for four
// 20ms tasks run one after another on a single tab against the same four
// tasks run concurrently on four different tabs. The concurrent run must take
// less than half as long as the single-tab run.
func TestTabExecutor_SequentialVsParallelTiming(t *testing.T) {
	const (
		taskCount = 4
		taskTime  = 20 * time.Millisecond
		// Back-to-back execution should need roughly taskCount * taskTime.
		serialFloor = taskCount * taskTime
	)
	nap := func(ctx context.Context) error {
		time.Sleep(taskTime)
		return nil
	}

	// Phase 1: every task targets the same tab, submitted from one goroutine.
	serial := NewTabExecutor(taskCount)
	begin := time.Now()
	for n := 0; n < taskCount; n++ {
		_ = serial.Execute(context.Background(), "single_tab", nap)
	}
	serialTook := time.Since(begin)

	// Phase 2: a fresh executor, one goroutine and one tab per task.
	fanout := NewTabExecutor(taskCount)
	begin = time.Now()
	var group sync.WaitGroup
	for n := 0; n < taskCount; n++ {
		group.Add(1)
		go func(id string) {
			defer group.Done()
			_ = fanout.Execute(context.Background(), id, nap)
		}(fmt.Sprintf("par_tab_%d", n))
	}
	group.Wait()
	fanoutTook := time.Since(begin)

	// Allow generous slack: only complain when the serial phase took less
	// than half of its nominal duration.
	if serialTook < serialFloor/2 {
		t.Errorf("sequential took %v, expected at least ~%v", serialTook, serialFloor)
	}
	// The concurrent phase has to be at least twice as fast.
	if fanoutTook >= serialTook/2 {
		t.Errorf("parallel (%v) should be significantly faster than sequential (%v)", fanoutTook, serialTook)
	}

	speedup := float64(serialTook) / float64(fanoutTook)
	t.Logf("Sequential: %v, Parallel: %v, Speedup: %.2fx", serialTook, fanoutTook, speedup)
}
// TestTabExecutor_SemaphoreFairnessUnderContention submits ten tasks on ten
// different tabs to an executor with maxParallel=2 and verifies that every
// one of them completes without error.
func TestTabExecutor_SemaphoreFairnessUnderContention(t *testing.T) {
	const tasks = 10
	executor := NewTabExecutor(2)

	var (
		guard    sync.Mutex // protects finished
		finished = make([]bool, tasks)
		group    sync.WaitGroup
	)
	for n := 0; n < tasks; n++ {
		group.Add(1)
		go func(slot int) {
			defer group.Done()
			id := fmt.Sprintf("fair_tab_%d", slot)
			err := executor.Execute(context.Background(), id, func(ctx context.Context) error {
				time.Sleep(5 * time.Millisecond)
				guard.Lock()
				defer guard.Unlock()
				finished[slot] = true
				return nil
			})
			if err != nil {
				t.Errorf("task %d failed: %v", slot, err)
			}
		}(n)
	}
	group.Wait()

	for slot, ok := range finished {
		if ok {
			continue
		}
		t.Errorf("task %d was starved (never completed)", slot)
	}
}
// TestTabExecutor_ErrorDoesNotCorruptState verifies that a tab whose task
// returned an error still accepts and runs a subsequent task.
func TestTabExecutor_ErrorDoesNotCorruptState(t *testing.T) {
	executor := NewTabExecutor(2)

	failing := func(ctx context.Context) error { return fmt.Errorf("deliberate error") }
	if err := executor.Execute(context.Background(), "err_tab", failing); err == nil {
		t.Fatal("expected error")
	}

	// Second attempt on the very same tab.
	ran := false
	followUp := func(ctx context.Context) error {
		ran = true
		return nil
	}
	if err := executor.Execute(context.Background(), "err_tab", followUp); err != nil {
		t.Fatalf("tab broken after error: %v", err)
	}
	if !ran {
		t.Error("task did not execute after prior error")
	}
}
// TestTabExecutor_ManyUniqueTabsCreation runs a no-op task on 100 distinct
// tabs concurrently, expects ActiveTabs to report all 100, and then expects 0
// after each tab has been removed.
func TestTabExecutor_ManyUniqueTabsCreation(t *testing.T) {
	const tabCount = 100
	executor := NewTabExecutor(4)

	tabName := func(n int) string { return fmt.Sprintf("unique_tab_%d", n) }
	noop := func(ctx context.Context) error { return nil }

	var group sync.WaitGroup
	for n := 0; n < tabCount; n++ {
		group.Add(1)
		go func(id string) {
			defer group.Done()
			_ = executor.Execute(context.Background(), id, noop)
		}(tabName(n))
	}
	group.Wait()

	if got := executor.ActiveTabs(); got != tabCount {
		t.Errorf("expected %d active tabs, got %d", tabCount, got)
	}

	// Remove every tab that was created above.
	for n := 0; n < tabCount; n++ {
		executor.RemoveTab(tabName(n))
	}
	if got := executor.ActiveTabs(); got != 0 {
		t.Errorf("expected 0 active tabs after cleanup, got %d", got)
	}
}
// TestTabExecutor_SlowAndFastTabsConcurrent starts a 100ms task on one tab
// and, shortly afterwards, an instant task on another tab, then verifies that
// the instant task finishes first.
func TestTabExecutor_SlowAndFastTabsConcurrent(t *testing.T) {
	executor := NewTabExecutor(4)

	// launch runs task on tabID in a new goroutine and returns a channel that
	// is closed once Execute has returned.
	launch := func(tabID string, task func(ctx context.Context) error) <-chan struct{} {
		finished := make(chan struct{})
		go func() {
			_ = executor.Execute(context.Background(), tabID, task)
			close(finished)
		}()
		return finished
	}

	slowDone := launch("slow_tab", func(ctx context.Context) error {
		time.Sleep(100 * time.Millisecond)
		return nil
	})

	// Head start so the slow task is likely running before the fast one is
	// submitted.
	time.Sleep(5 * time.Millisecond)

	fastDone := launch("fast_tab", func(ctx context.Context) error {
		return nil // returns immediately
	})

	// Whichever channel fires first decides the outcome.
	select {
	case <-slowDone:
		t.Error("slow tab finished before fast tab — fast tab was blocked")
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for tasks")
	case <-fastDone:
		// fast tab won, as expected
	}

	// Do not return while the slow task is still running.
	<-slowDone
}