| package bridge | |
| import ( | |
| "context" | |
| "fmt" | |
| "log/slog" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| // TabExecutor provides safe parallel execution across tabs. | |
| // | |
| // Each tab executes tasks sequentially (one at a time), but multiple tabs | |
| // can execute concurrently up to a configurable limit. This prevents | |
| // resource exhaustion on constrained devices (Raspberry Pi, low-memory | |
| // servers) while enabling parallelism where hardware allows. | |
| // | |
| // Architecture: | |
| // | |
| // Tab1 ─── sequential actions ───► | |
| // Tab2 ─── sequential actions ───► (concurrent across tabs) | |
| // Tab3 ─── sequential actions ───► | |
| type TabExecutor struct { | |
| semaphore chan struct{} // limits concurrent tab executions | |
| tabLocks map[string]*sync.Mutex // per-tab sequential execution | |
| mu sync.Mutex // protects tabLocks map | |
| maxParallel int | |
| } | |
| // NewTabExecutor creates a TabExecutor with the given concurrency limit. | |
| // If maxParallel <= 0, DefaultMaxParallel() is used. | |
| func NewTabExecutor(maxParallel int) *TabExecutor { | |
| if maxParallel <= 0 { | |
| maxParallel = DefaultMaxParallel() | |
| } | |
| return &TabExecutor{ | |
| semaphore: make(chan struct{}, maxParallel), | |
| tabLocks: make(map[string]*sync.Mutex), | |
| maxParallel: maxParallel, | |
| } | |
| } | |
| // DefaultMaxParallel returns a safe default based on available CPUs. | |
| // Capped at 8 to prevent resource exhaustion on large machines. | |
| func DefaultMaxParallel() int { | |
| n := runtime.NumCPU() * 2 | |
| if n > 8 { | |
| n = 8 | |
| } | |
| if n < 1 { | |
| n = 1 | |
| } | |
| return n | |
| } | |
| // MaxParallel returns the configured concurrency limit. | |
| func (te *TabExecutor) MaxParallel() int { | |
| return te.maxParallel | |
| } | |
| // tabMutex returns the per-tab mutex, creating one if needed. | |
| func (te *TabExecutor) tabMutex(tabID string) *sync.Mutex { | |
| te.mu.Lock() | |
| defer te.mu.Unlock() | |
| m, ok := te.tabLocks[tabID] | |
| if !ok { | |
| m = &sync.Mutex{} | |
| te.tabLocks[tabID] = m | |
| } | |
| return m | |
| } | |
| // Execute runs a task for the given tab, ensuring: | |
| // - Only one task runs per tab at a time (per-tab sequential execution) | |
| // - At most maxParallel tabs execute concurrently (global semaphore) | |
| // - Panics inside the task are recovered and returned as errors | |
| // - Context cancellation/timeout is respected | |
| // | |
| // The task function receives the same context passed to Execute. Callers | |
| // should use context.WithTimeout to bound execution time. | |
| func (te *TabExecutor) Execute(ctx context.Context, tabID string, task func(ctx context.Context) error) error { | |
| if tabID == "" { | |
| return fmt.Errorf("tabID must not be empty") | |
| } | |
| if ctx.Err() != nil { | |
| return ctx.Err() | |
| } | |
| // Acquire global semaphore (respect context cancellation) | |
| select { | |
| case te.semaphore <- struct{}{}: | |
| defer func() { <-te.semaphore }() | |
| case <-ctx.Done(): | |
| return fmt.Errorf("tab %s: waiting for execution slot: %w", tabID, ctx.Err()) | |
| } | |
| // Acquire per-tab lock for sequential execution within a tab | |
| tabMu := te.tabMutex(tabID) | |
| locked := make(chan struct{}) | |
| go func() { | |
| tabMu.Lock() | |
| close(locked) | |
| }() | |
| select { | |
| case <-locked: | |
| defer tabMu.Unlock() | |
| case <-ctx.Done(): | |
| // If we timed out waiting for the per-tab lock, we need to clean up. | |
| // Launch a goroutine to wait for the lock and immediately release it. | |
| go func() { | |
| <-locked | |
| tabMu.Unlock() | |
| }() | |
| return fmt.Errorf("tab %s: waiting for tab lock: %w", tabID, ctx.Err()) | |
| } | |
| // Execute the task with panic recovery | |
| return te.safeRun(ctx, tabID, task) | |
| } | |
| // safeRun executes the task with panic recovery. | |
| func (te *TabExecutor) safeRun(ctx context.Context, tabID string, task func(ctx context.Context) error) (err error) { | |
| defer func() { | |
| if r := recover(); r != nil { | |
| slog.Error("panic recovered in tab execution", | |
| "tabId", tabID, | |
| "panic", fmt.Sprintf("%v", r), | |
| ) | |
| err = fmt.Errorf("tab %s: panic: %v", tabID, r) | |
| } | |
| }() | |
| return task(ctx) | |
| } | |
| // RemoveTab cleans up the per-tab mutex when a tab is closed. | |
| // It deletes the entry from the map first, then acquires the old mutex | |
| // to wait for any in-flight task to complete before returning. | |
| // | |
| // Note: after deletion, a concurrent Execute call for the same tabID | |
| // will create a fresh mutex via tabMutex(). This is intentional — the | |
| // old tab is being removed and a new tab with the same ID should start | |
| // with a clean slate. Callers (CloseTab, CleanStaleTabs) ensure the | |
| // tab context is cancelled before calling RemoveTab, so in-flight tasks | |
| // will exit promptly. | |
| func (te *TabExecutor) RemoveTab(tabID string) { | |
| te.mu.Lock() | |
| m, ok := te.tabLocks[tabID] | |
| if !ok { | |
| te.mu.Unlock() | |
| return | |
| } | |
| delete(te.tabLocks, tabID) | |
| te.mu.Unlock() | |
| // Wait for any in-flight task holding this mutex to finish. | |
| // We acquire the lock to block until the active task releases it, | |
| // then immediately release — the mutex is orphaned after this. | |
| m.Lock() | |
| defer m.Unlock() //nolint:staticcheck // SA2001: intentional barrier—blocks until in-flight task completes | |
| } | |
| // ActiveTabs returns the number of tabs that have associated mutexes. | |
| func (te *TabExecutor) ActiveTabs() int { | |
| te.mu.Lock() | |
| defer te.mu.Unlock() | |
| return len(te.tabLocks) | |
| } | |
| // Stats returns execution statistics. | |
| type ExecutorStats struct { | |
| MaxParallel int `json:"maxParallel"` | |
| ActiveTabs int `json:"activeTabs"` | |
| SemaphoreUsed int `json:"semaphoreUsed"` | |
| SemaphoreFree int `json:"semaphoreFree"` | |
| } | |
| func (te *TabExecutor) Stats() ExecutorStats { | |
| used := len(te.semaphore) | |
| return ExecutorStats{ | |
| MaxParallel: te.maxParallel, | |
| ActiveTabs: te.ActiveTabs(), | |
| SemaphoreUsed: used, | |
| SemaphoreFree: te.maxParallel - used, | |
| } | |
| } | |
| // ExecuteWithTimeout is a convenience wrapper that creates a timeout context. | |
| func (te *TabExecutor) ExecuteWithTimeout(ctx context.Context, tabID string, timeout time.Duration, task func(ctx context.Context) error) error { | |
| tCtx, cancel := context.WithTimeout(ctx, timeout) | |
| defer cancel() | |
| return te.Execute(tCtx, tabID, task) | |
| } | |