Plandex_backup / app /server /db /queue.go
google-labs-jules[bot]
Final deployment for HF with landing page
93d826e
package db
import (
"context"
"fmt"
"log"
"runtime/debug"
"sync"
"github.com/google/uuid"
)
type repoOpFn func(repo *GitRepo) error
type repoOperation struct {
orgId string
userId string
planId string
branch string
scope LockScope
planBuildId string
id string
reason string
op repoOpFn
ctx context.Context
cancelFn context.CancelFunc
done chan error
clearRepoOnErr bool
}
type repoQueue struct {
ops []*repoOperation
mu sync.Mutex
isProcessing bool
}
type repoQueueMap map[string]*repoQueue
var queuesMu sync.Mutex
var repoQueues = make(repoQueueMap)
func (m repoQueueMap) getQueue(planId string) *repoQueue {
queuesMu.Lock()
defer queuesMu.Unlock()
if locksVerboseLogging {
log.Printf("[Queue] Getting queue for plan %s", planId)
}
q, ok := m[planId]
if !ok {
if locksVerboseLogging {
log.Printf("[Queue] Creating new queue for plan %s", planId)
}
q = &repoQueue{}
m[planId] = q
}
return q
}
func (m repoQueueMap) add(op *repoOperation) int {
if locksVerboseLogging {
log.Printf("[Queue] Adding operation %s (%s) to queue for plan %s", op.id, op.reason, op.planId)
}
q := m.getQueue(op.planId)
return q.add(op)
}
// Add enqueues an operation, and then kicks off processing if needed.
func (q *repoQueue) add(op *repoOperation) int {
var numOps int
q.mu.Lock()
q.ops = append(q.ops, op)
numOps = len(q.ops)
if locksVerboseLogging {
log.Printf("[Queue] Operation %s (%s) enqueued, queue length now %d", op.id, op.reason, numOps)
}
// If nobody else is processing, we'll start
if !q.isProcessing {
if locksVerboseLogging {
log.Printf("[Queue] Starting queue processing for operation %s (%s)", op.id, op.reason)
}
q.isProcessing = true
go q.runQueue() // run in the background
} else if locksVerboseLogging {
log.Printf("[Queue] Queue already processing, operation %s (%s) will wait", op.id, op.reason)
}
q.mu.Unlock()
return numOps
}
func (q *repoQueue) nextBatch() []*repoOperation {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.ops) == 0 {
if locksVerboseLogging {
log.Printf("[Queue] No operations in queue")
}
return nil
}
firstOp := q.ops[0]
res := []*repoOperation{firstOp}
if locksVerboseLogging {
log.Printf("[Queue] Processing first operation %s (%s) with scope %s, branch %s",
firstOp.id, firstOp.reason, firstOp.scope, firstOp.branch)
}
q.ops = q.ops[1:]
// writes always go one at a time, blocking everything else, as do read locks on the root plan (no branch)
if firstOp.scope == LockScopeWrite || firstOp.branch == "" {
if locksVerboseLogging {
log.Printf("[Queue] Operation %s is write or root branch read, processing alone", firstOp.id)
}
return res
}
// reads go in parallel as long as they are on the same branch
for len(q.ops) > 0 {
op := q.ops[0]
if op.scope == LockScopeRead && op.branch == firstOp.branch {
if locksVerboseLogging {
log.Printf("[Queue] Batching compatible read operation %s (%s) with same branch %s",
op.id, op.reason, op.branch)
}
res = append(res, op)
q.ops = q.ops[1:]
} else {
if locksVerboseLogging {
log.Printf("[Queue] Operation %s (%s) with scope %s, branch %s not compatible with batch, stopping",
op.id, op.reason, op.scope, op.branch)
}
break
}
}
if locksVerboseLogging {
log.Printf("[Queue] Created batch of %d operations", len(res))
}
return res
}
func (q *repoQueue) runQueue() {
if locksVerboseLogging {
log.Printf("[Queue] Starting queue processing")
}
for {
// get the next batch
ops := q.nextBatch()
if len(ops) == 0 {
// Nothing left in the queue, so mark not processing and return
if locksVerboseLogging {
log.Printf("[Queue] Queue empty, stopping processing")
}
q.mu.Lock()
q.isProcessing = false
q.mu.Unlock()
return
}
firstOp := ops[0]
func() {
if locksVerboseLogging {
log.Printf("[Queue] Attempting to acquire DB lock for plan %s, branch %s, scope %s",
firstOp.planId, firstOp.branch, firstOp.scope)
}
lockId, err := lockRepoDB(LockRepoParams{
OrgId: firstOp.orgId,
UserId: firstOp.userId,
PlanId: firstOp.planId,
Branch: firstOp.branch,
Scope: firstOp.scope,
PlanBuildId: firstOp.planBuildId,
Reason: firstOp.reason,
Ctx: firstOp.ctx,
CancelFn: firstOp.cancelFn,
}, 0)
if lockId != "" {
log.Printf("[Queue] Acquired DB lock %s", lockId)
defer func() {
log.Printf("[Queue] Releasing DB lock %s for plan %s", lockId, firstOp.planId)
releaseErr := deleteRepoLockDB(lockId, firstOp.planId, firstOp.reason, 0)
if releaseErr != nil {
log.Printf("[Queue] Failed to release DB lock: %v", releaseErr)
} else {
log.Printf("[Queue] DB lock %s released successfully", lockId)
}
}()
}
if err != nil {
log.Printf("[Queue] Failed to get DB lock: %v", err)
for _, op := range ops {
if locksVerboseLogging {
log.Printf("[Queue] Notifying operation %s (%s) of lock failure", op.id, op.reason)
}
op.done <- fmt.Errorf("failed to get DB lock: %w", err)
}
// we still need to process the rest of the queue
// if the error is critical, caller will handle it
return
}
if locksVerboseLogging {
log.Printf("[Queue] Acquired DB lock %s, processing batch of %d operations", lockId, len(ops))
}
repo := getGitRepo(firstOp.orgId, firstOp.planId)
var needsRollback bool
// Process the batch
// If it's a writer => single op
// If multiple same‐branch readers => do them in parallel
var wg sync.WaitGroup
for _, op := range ops {
wg.Add(1)
go func(op *repoOperation) {
defer wg.Done()
select {
case <-op.ctx.Done():
if locksVerboseLogging {
log.Printf("[Queue] Operation %s (%s) context canceled", op.id, op.reason)
}
op.done <- op.ctx.Err()
default:
if locksVerboseLogging {
log.Printf("[Queue] Starting operation %s (%s)", op.id, op.reason)
}
// actually do the operation
var opErr error
func() {
defer func() {
panicErr := recover()
if panicErr != nil {
log.Printf("[Queue] Panic in operation %s (%s): %v", op.id, op.reason, panicErr)
log.Printf("[Queue] Stack trace: %s", string(debug.Stack()))
opErr = fmt.Errorf("panic in operation: %v\n%s", panicErr, string(debug.Stack()))
}
if opErr != nil && op.scope == LockScopeWrite && op.clearRepoOnErr {
if locksVerboseLogging {
log.Printf("[Queue] Operation %s (%s) failed with error, marking for rollback: %v",
op.id, op.reason, opErr)
}
needsRollback = true
}
}()
if locksVerboseLogging {
log.Printf("[Queue] Executing operation %s (%s)", op.id, op.reason)
}
opErr = op.op(repo)
if locksVerboseLogging {
if opErr != nil {
log.Printf("[Queue] Operation %s (%s) failed with error: %v", op.id, op.reason, opErr)
} else {
log.Printf("[Queue] Operation %s (%s) completed successfully", op.id, op.reason)
}
}
}()
// signal to the caller via op.done
if locksVerboseLogging {
log.Printf("[Queue] Notifying caller of operation %s (%s) completion", op.id, op.reason)
}
op.done <- opErr
}
}(op)
}
wg.Wait()
if needsRollback {
log.Printf("[Queue] Performing rollback for plan %s branch %s", firstOp.planId, firstOp.branch)
rollbackErr := repo.GitClearUncommittedChanges(firstOp.branch)
if rollbackErr != nil {
log.Printf("[Queue] Failed to rollback: %v", rollbackErr)
} else if locksVerboseLogging {
log.Printf("[Queue] Rollback completed successfully")
}
}
}()
}
}
type ExecRepoOperationParams struct {
OrgId string
UserId string
PlanId string
Branch string
Scope LockScope
PlanBuildId string
Reason string
Ctx context.Context
CancelFn context.CancelFunc
ClearRepoOnErr bool
}
func ExecRepoOperation(
params ExecRepoOperationParams,
op repoOpFn,
) error {
id := uuid.New().String()
log.Printf("[Queue] ExecRepoOperation called for plan %s, branch %s, scope %s, reason %s",
params.PlanId, params.Branch, params.Scope, params.Reason)
done := make(chan error, 1)
numOps := repoQueues.add(&repoOperation{
id: id,
orgId: params.OrgId,
planId: params.PlanId,
branch: params.Branch,
scope: params.Scope,
reason: params.Reason,
planBuildId: params.PlanBuildId,
op: op,
done: done,
ctx: params.Ctx,
cancelFn: params.CancelFn,
clearRepoOnErr: params.ClearRepoOnErr,
})
if numOps > 1 {
if locksVerboseLogging {
log.Printf("[Queue] Operation %s (%s) queued behind %d operations", id, params.Reason, numOps-1)
for i, op := range repoQueues.getQueue(params.PlanId).ops {
log.Printf("[Queue] Operation %d: %s - %s\n", i, op.id, op.reason)
}
}
}
select {
case err := <-done:
if locksVerboseLogging {
if err != nil {
log.Printf("[Queue] Operation %s (%s) completed with error: %v", id, params.Reason, err)
} else {
log.Printf("[Queue] Operation %s (%s) completed successfully", id, params.Reason)
}
}
return err
case <-params.Ctx.Done():
if locksVerboseLogging {
log.Printf("[Queue] Operation %s (%s) context canceled while waiting", id, params.Reason)
}
return params.Ctx.Err()
}
}