File size: 5,882 Bytes
6a7089a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package bridge

import (
	"context"
	"fmt"
	"log/slog"
	"runtime"
	"sync"
	"time"
)

// TabExecutor coordinates task execution across browser tabs.
//
// Tasks submitted for the same tab ID are serialized by a per-tab mutex,
// while tasks for different tabs may overlap, bounded by a buffered-channel
// semaphore. The bound is meant to avoid resource exhaustion on constrained
// devices (Raspberry Pi, low-memory servers) while still allowing
// parallelism where hardware permits.
//
// Architecture:
//
//	Tab1 ─── sequential actions ───►
//	Tab2 ─── sequential actions ───►  (concurrent across tabs)
//	Tab3 ─── sequential actions ───►
type TabExecutor struct {
	// maxParallel is the configured concurrency limit.
	maxParallel int

	// semaphore is used as a counting semaphore: each buffered element
	// represents one execution slot currently held.
	semaphore chan struct{}

	// mu guards tabLocks.
	mu sync.Mutex

	// tabLocks maps a tab ID to the mutex that serializes that tab's tasks.
	tabLocks map[string]*sync.Mutex
}

// NewTabExecutor builds a TabExecutor whose semaphore has room for
// maxParallel slots. A non-positive maxParallel selects
// DefaultMaxParallel() instead.
func NewTabExecutor(maxParallel int) *TabExecutor {
	limit := maxParallel
	if limit <= 0 {
		limit = DefaultMaxParallel()
	}

	te := new(TabExecutor)
	te.maxParallel = limit
	te.semaphore = make(chan struct{}, limit)
	te.tabLocks = make(map[string]*sync.Mutex)
	return te
}

// DefaultMaxParallel derives a default concurrency limit from the host:
// twice runtime.NumCPU(), clamped to the range [1, 8]. The upper bound
// keeps large machines from running an excessive number of tabs at once.
func DefaultMaxParallel() int {
	const (
		lowest  = 1
		highest = 8
	)

	switch n := 2 * runtime.NumCPU(); {
	case n > highest:
		return highest
	case n < lowest:
		return lowest
	default:
		return n
	}
}

// MaxParallel reports the concurrency limit this executor was configured
// with. It reads the stored value directly and takes no lock.
func (te *TabExecutor) MaxParallel() int {
	return te.maxParallel
}

// tabMutex looks up the mutex that serializes work for tabID, registering
// a new one on first use. Lookup and insertion both happen under te.mu, so
// concurrent callers asking for the same registered tab get the same mutex.
func (te *TabExecutor) tabMutex(tabID string) *sync.Mutex {
	te.mu.Lock()
	defer te.mu.Unlock()

	if existing, found := te.tabLocks[tabID]; found {
		return existing
	}

	created := new(sync.Mutex)
	te.tabLocks[tabID] = created
	return created
}

// Execute runs a task for the given tab, ensuring:
//   - Only one task runs per tab at a time (per-tab sequential execution)
//   - At most maxParallel callers hold an execution slot at once (global semaphore)
//   - Panics inside the task are recovered and returned as errors
//   - Context cancellation/timeout is respected while waiting for a slot
//     and while waiting for the tab lock
//
// The task function receives the same context passed to Execute. Callers
// should use context.WithTimeout to bound execution time.
//
// Execute returns an error without running task when tabID is empty, when
// task is nil, when ctx is already done on entry, or when ctx is done while
// waiting for the execution slot or the tab lock. Otherwise it returns the
// result of safeRun for the task.
//
// NOTE(review): the slot is acquired before the tab lock, so callers queued
// behind a busy tab occupy slots while they wait. Several queued calls for
// one tab can therefore delay work on other tabs; confirm this is intended.
func (te *TabExecutor) Execute(ctx context.Context, tabID string, task func(ctx context.Context) error) error {
	if tabID == "" {
		return fmt.Errorf("tabID must not be empty")
	}
	if task == nil {
		// Reject up front: otherwise the nil call would only surface later
		// as a recovered panic, after a slot and the tab lock were taken.
		return fmt.Errorf("tab %s: task must not be nil", tabID)
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	// Acquire global semaphore (respect context cancellation).
	select {
	case te.semaphore <- struct{}{}:
		defer func() { <-te.semaphore }()
	case <-ctx.Done():
		return fmt.Errorf("tab %s: waiting for execution slot: %w", tabID, ctx.Err())
	}

	// Acquire per-tab lock for sequential execution within a tab.
	tabMu := te.tabMutex(tabID)

	// Fast path: when nobody holds the tab lock there is nothing to wait
	// for, so skip the helper goroutine and channel entirely.
	if tabMu.TryLock() {
		defer tabMu.Unlock()
		return te.safeRun(ctx, tabID, task)
	}

	// Slow path: sync.Mutex has no context-aware Lock, so a helper goroutine
	// blocks on it and signals through a channel we can select on.
	locked := make(chan struct{})
	go func() {
		tabMu.Lock()
		close(locked)
	}()

	select {
	case <-locked:
		defer tabMu.Unlock()
	case <-ctx.Done():
		// The helper goroutine will still obtain the lock eventually.
		// Hand it off to a goroutine that releases it immediately so the
		// mutex is not left locked forever.
		go func() {
			<-locked
			tabMu.Unlock()
		}()
		return fmt.Errorf("tab %s: waiting for tab lock: %w", tabID, ctx.Err())
	}

	// Execute the task with panic recovery.
	return te.safeRun(ctx, tabID, task)
}

// safeRun invokes task with ctx and turns a panic raised inside it into an
// ordinary error. A recovered panic is logged through slog at error level
// together with the tab ID, and the returned error carries the panic value.
func (te *TabExecutor) safeRun(ctx context.Context, tabID string, task func(ctx context.Context) error) (err error) {
	// The named result lets the deferred handler overwrite the return value
	// once a panic has been recovered.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		slog.Error("panic recovered in tab execution",
			"tabId", tabID,
			"panic", fmt.Sprintf("%v", r),
		)
		err = fmt.Errorf("tab %s: panic: %v", tabID, r)
	}()

	err = task(ctx)
	return err
}

// RemoveTab drops the per-tab mutex for tabID, typically when the tab is
// closed. The map entry is removed first; RemoveTab then locks the removed
// mutex once, which blocks until any task currently holding it has
// finished. Unknown tab IDs are a no-op.
//
// Note: once the entry is gone, a concurrent Execute call for the same
// tabID obtains a brand-new mutex from tabMutex(). This is intentional —
// the old tab is going away and a new tab reusing the ID should start from
// a clean slate. Callers (CloseTab, CleanStaleTabs) are expected to cancel
// the tab context before calling RemoveTab so in-flight tasks exit promptly.
func (te *TabExecutor) RemoveTab(tabID string) {
	te.mu.Lock()
	orphan, known := te.tabLocks[tabID]
	if known {
		delete(te.tabLocks, tabID)
	}
	te.mu.Unlock()

	if !known {
		return
	}

	// Barrier: taking the lock waits out whichever task still holds it.
	// It is released again on return; nothing in the map refers to this
	// mutex any more.
	orphan.Lock()
	defer orphan.Unlock() //nolint:staticcheck // SA2001: intentional barrier—blocks until in-flight task completes
}

// ActiveTabs reports how many tab IDs currently have a registered mutex.
// Entries are added by tabMutex and removed by RemoveTab, so the count
// includes tabs that are idle at the moment of the call.
func (te *TabExecutor) ActiveTabs() int {
	te.mu.Lock()
	count := len(te.tabLocks)
	te.mu.Unlock()
	return count
}

// ExecutorStats is a snapshot of a TabExecutor's configuration and load,
// as produced by (*TabExecutor).Stats.
//
//   - MaxParallel is the configured concurrency limit.
//   - ActiveTabs is the number of tab IDs with a registered mutex.
//   - SemaphoreUsed is the number of execution slots held when sampled.
//   - SemaphoreFree is MaxParallel minus SemaphoreUsed.
type ExecutorStats struct {
	MaxParallel   int `json:"maxParallel"`
	ActiveTabs    int `json:"activeTabs"`
	SemaphoreUsed int `json:"semaphoreUsed"`
	SemaphoreFree int `json:"semaphoreFree"`
}

// Stats returns a snapshot of the executor's limit and current load.
// The semaphore occupancy and the tab count are sampled one after the
// other rather than under a single lock, so under concurrent use the
// fields may describe slightly different moments.
func (te *TabExecutor) Stats() ExecutorStats {
	var snapshot ExecutorStats

	// Sample slot usage first, then the tab count (which takes te.mu).
	snapshot.SemaphoreUsed = len(te.semaphore)
	snapshot.MaxParallel = te.maxParallel
	snapshot.ActiveTabs = te.ActiveTabs()
	snapshot.SemaphoreFree = snapshot.MaxParallel - snapshot.SemaphoreUsed
	return snapshot
}

// ExecuteWithTimeout calls Execute with a context derived from ctx that
// expires after timeout. The derived context is cancelled when the call
// returns, releasing its timer.
func (te *TabExecutor) ExecuteWithTimeout(ctx context.Context, tabID string, timeout time.Duration, task func(ctx context.Context) error) error {
	deadlineCtx, release := context.WithTimeout(ctx, timeout)
	defer release()

	err := te.Execute(deadlineCtx, tabID, task)
	return err
}