| package task |
|
|
import (
	"sync"

	"github.com/OpenListTeam/OpenList/v4/pkg/generic_sync"
	"github.com/OpenListTeam/OpenList/v4/pkg/utils"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
)
|
|
| type Manager[K comparable] struct { |
| curID K |
| workerC chan struct{} |
| updateID func(*K) |
| tasks generic_sync.MapOf[K, *Task[K]] |
| } |
|
|
| func (tm *Manager[K]) Submit(task *Task[K]) K { |
| if tm.updateID != nil { |
| tm.updateID(&tm.curID) |
| task.ID = tm.curID |
| } |
| tm.tasks.Store(task.ID, task) |
| tm.do(task) |
| return task.ID |
| } |
|
|
| func (tm *Manager[K]) do(task *Task[K]) { |
| go func() { |
| log.Debugf("task [%s] waiting for worker", task.Name) |
| select { |
| case <-tm.workerC: |
| log.Debugf("task [%s] starting", task.Name) |
| task.run() |
| log.Debugf("task [%s] ended", task.Name) |
| case <-task.Ctx.Done(): |
| log.Debugf("task [%s] canceled", task.Name) |
| return |
| } |
| |
| tm.workerC <- struct{}{} |
| }() |
| } |
|
|
| func (tm *Manager[K]) GetAll() []*Task[K] { |
| return tm.tasks.Values() |
| } |
|
|
| func (tm *Manager[K]) Get(tid K) (*Task[K], bool) { |
| return tm.tasks.Load(tid) |
| } |
|
|
| func (tm *Manager[K]) MustGet(tid K) *Task[K] { |
| task, _ := tm.Get(tid) |
| return task |
| } |
|
|
| func (tm *Manager[K]) Retry(tid K) error { |
| t, ok := tm.Get(tid) |
| if !ok { |
| return errors.WithStack(ErrTaskNotFound) |
| } |
| tm.do(t) |
| return nil |
| } |
|
|
| func (tm *Manager[K]) Cancel(tid K) error { |
| t, ok := tm.Get(tid) |
| if !ok { |
| return errors.WithStack(ErrTaskNotFound) |
| } |
| t.Cancel() |
| return nil |
| } |
|
|
| func (tm *Manager[K]) Remove(tid K) error { |
| t, ok := tm.Get(tid) |
| if !ok { |
| return errors.WithStack(ErrTaskNotFound) |
| } |
| if !t.Done() { |
| return errors.WithStack(ErrTaskRunning) |
| } |
| tm.tasks.Delete(tid) |
| return nil |
| } |
|
|
| |
| |
| func (tm *Manager[K]) RemoveAll() { |
| tm.tasks.Clear() |
| } |
|
|
| func (tm *Manager[K]) RemoveByStates(states ...string) { |
| tasks := tm.GetAll() |
| for _, task := range tasks { |
| if utils.SliceContains(states, task.GetState()) { |
| _ = tm.Remove(task.ID) |
| } |
| } |
| } |
|
|
| func (tm *Manager[K]) GetByStates(states ...string) []*Task[K] { |
| var tasks []*Task[K] |
| tm.tasks.Range(func(key K, value *Task[K]) bool { |
| if utils.SliceContains(states, value.GetState()) { |
| tasks = append(tasks, value) |
| } |
| return true |
| }) |
| return tasks |
| } |
|
|
| func (tm *Manager[K]) ListUndone() []*Task[K] { |
| return tm.GetByStates(PENDING, RUNNING, CANCELING) |
| } |
|
|
| func (tm *Manager[K]) ListDone() []*Task[K] { |
| return tm.GetByStates(SUCCEEDED, CANCELED, ERRORED) |
| } |
|
|
| func (tm *Manager[K]) ClearDone() { |
| tm.RemoveByStates(SUCCEEDED, CANCELED, ERRORED) |
| } |
|
|
| func (tm *Manager[K]) ClearSucceeded() { |
| tm.RemoveByStates(SUCCEEDED) |
| } |
|
|
| func (tm *Manager[K]) RawTasks() *generic_sync.MapOf[K, *Task[K]] { |
| return &tm.tasks |
| } |
|
|
| func NewTaskManager[K comparable](maxWorker int, updateID ...func(*K)) *Manager[K] { |
| tm := &Manager[K]{ |
| tasks: generic_sync.MapOf[K, *Task[K]]{}, |
| workerC: make(chan struct{}, maxWorker), |
| } |
| for i := 0; i < maxWorker; i++ { |
| tm.workerC <- struct{}{} |
| } |
| if len(updateID) > 0 { |
| tm.updateID = updateID[0] |
| } |
| return tm |
| } |
|
|