package worker import ( "api.qobiltu.id/mail" "context" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "log/slog" ) const ( Low = "low" Default = "default" Critical = "critical" ) type TaskProcessor interface { Start() error Shutdown() ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error ProcessTaskSendForgotPasswordEmail(ctx context.Context, task *asynq.Task) error } type RedisTaskProcessor struct { server *asynq.Server emailSender mail.Sender } func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, emailSender mail.Sender) TaskProcessor { logger := NewLogger() redis.SetLogger(logger) server := asynq.NewServer( redisOpt, asynq.Config{ // priority value. Keys are the names of the queues and values are associated priority value. Queues: map[string]int{ Critical: 6, Default: 3, Low: 1, }, ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { slog.Error("process task failed", "error", err, "type", task.Type(), "payload", string(task.Payload())) }), // maximum number of concurrent processing of tasks. Concurrency: 50, Logger: logger, }, ) return &RedisTaskProcessor{ server: server, emailSender: emailSender, } } func (p *RedisTaskProcessor) Start() error { mux := asynq.NewServeMux() mux.HandleFunc(TaskSendVerifyEmail, p.ProcessTaskSendVerifyEmail) mux.HandleFunc(TaskSendForgotPasswordEmail, p.ProcessTaskSendForgotPasswordEmail) return p.server.Start(mux) } func (p *RedisTaskProcessor) Shutdown() { p.server.Shutdown() }