| |
| package task |
|
|
| import ( |
| "context" |
| "runtime" |
|
|
| "github.com/pkg/errors" |
| log "github.com/sirupsen/logrus" |
| ) |
|
|
// Task lifecycle states, as stored in Task.state and reported by GetState.
//
// They are constants rather than variables so that no code can reassign a
// state name and silently break the comparisons done in Done and Cancel.
const (
	// PENDING is the state assigned by WithCancelCtx, before the task runs.
	PENDING = "pending"
	// RUNNING is set by run right before the task function is invoked.
	RUNNING = "running"
	// SUCCEEDED is set by run when the task function returned a nil error
	// and the task context was not canceled.
	SUCCEEDED = "succeeded"
	// CANCELING is set by Cancel after cancellation has been requested.
	CANCELING = "canceling"
	// CANCELED is set by run when the task context was canceled.
	CANCELED = "canceled"
	// ERRORED is set by run when the task function returned an error or
	// panicked.
	ERRORED = "errored"
)
|
|
type (
	// Func is the unit of work executed for a task. It receives the task
	// itself and returns an error to report failure.
	Func[K comparable] func(task *Task[K]) error

	// Callback is a hook that receives the task it was registered on.
	Callback[K comparable] func(task *Task[K])
)

// Task bundles a work function with its bookkeeping: identity, lifecycle
// state, free-form status text, progress, resulting error and the context
// used for cancellation. K is the type of the task identifier.
type Task[K comparable] struct {
	// ID is the task identifier of type K.
	ID K
	// Name is a human-readable label; it is included in log messages.
	Name string
	// state holds one of the lifecycle state names (PENDING, RUNNING, ...).
	state string
	// status is free-form text written through SetStatus.
	status string
	// progress is the value written through SetProgress.
	progress float64

	// Error is the error produced by the last execution, if any.
	Error error

	// Func is the work function executed when the task runs.
	Func Func[K]
	// callback, when non-nil, is invoked after a successful execution.
	callback Callback[K]

	// Ctx is the task context; WithCancelCtx populates it.
	Ctx context.Context
	// cancel cancels Ctx; WithCancelCtx populates it.
	cancel context.CancelFunc
}
|
|
| func (t *Task[K]) SetStatus(status string) { |
| t.status = status |
| } |
|
|
| func (t *Task[K]) SetProgress(percentage float64) { |
| t.progress = percentage |
| } |
|
|
| func (t Task[K]) GetProgress() float64 { |
| return t.progress |
| } |
|
|
| func (t Task[K]) GetState() string { |
| return t.state |
| } |
|
|
| func (t Task[K]) GetStatus() string { |
| return t.status |
| } |
|
|
| func (t Task[K]) GetErrMsg() string { |
| if t.Error == nil { |
| return "" |
| } |
| return t.Error.Error() |
| } |
|
|
// getCurrentGoroutineStack returns the formatted stack trace of the calling
// goroutine only. The trace is written into a 64 KiB buffer, so anything
// beyond that size is cut off.
func getCurrentGoroutineStack() string {
	var trace [1 << 16]byte
	written := runtime.Stack(trace[:], false)
	return string(trace[:written])
}
|
|
| func (t *Task[K]) run() { |
| t.state = RUNNING |
| defer func() { |
| if err := recover(); err != nil { |
| log.Errorf("error [%s] while run task [%s],stack trace:\n%s", err, t.Name, getCurrentGoroutineStack()) |
| t.Error = errors.Errorf("panic: %+v", err) |
| t.state = ERRORED |
| } |
| }() |
| t.Error = t.Func(t) |
| if t.Error != nil { |
| log.Errorf("error [%+v] while run task [%s]", t.Error, t.Name) |
| } |
| if errors.Is(t.Ctx.Err(), context.Canceled) { |
| t.state = CANCELED |
| } else if t.Error != nil { |
| t.state = ERRORED |
| } else { |
| t.state = SUCCEEDED |
| t.SetProgress(100) |
| if t.callback != nil { |
| t.callback(t) |
| } |
| } |
| } |
|
|
| func (t *Task[K]) retry() { |
| t.run() |
| } |
|
|
| func (t *Task[K]) Done() bool { |
| return t.state == SUCCEEDED || t.state == CANCELED || t.state == ERRORED |
| } |
|
|
| func (t *Task[K]) Cancel() { |
| if t.state == SUCCEEDED || t.state == CANCELED { |
| return |
| } |
| if t.cancel != nil { |
| t.cancel() |
| } |
| |
| t.state = CANCELING |
| } |
|
|
| func WithCancelCtx[K comparable](task *Task[K]) *Task[K] { |
| ctx, cancel := context.WithCancel(context.Background()) |
| task.Ctx = ctx |
| task.cancel = cancel |
| task.state = PENDING |
| return task |
| } |
|
|