Spaces:
Configuration error
Configuration error
| package worker | |
| import ( | |
| "api.qobiltu.id/mail" | |
| "context" | |
| "github.com/hibiken/asynq" | |
| "github.com/redis/go-redis/v9" | |
| "log/slog" | |
| ) | |
// Queue names understood by the task processor. They are used as the keys
// of the queue-priority map handed to the asynq server.
const (
	// Low is the name of the lowest-priority queue.
	Low = "low"
	// Default is the name of the normal-priority queue.
	Default = "default"
	// Critical is the name of the highest-priority queue.
	Critical = "critical"
)
| type TaskProcessor interface { | |
| Start() error | |
| Shutdown() | |
| ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error | |
| ProcessTaskSendForgotPasswordEmail(ctx context.Context, task *asynq.Task) error | |
| } | |
| type RedisTaskProcessor struct { | |
| server *asynq.Server | |
| emailSender mail.Sender | |
| } | |
| func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, emailSender mail.Sender) TaskProcessor { | |
| logger := NewLogger() | |
| redis.SetLogger(logger) | |
| server := asynq.NewServer( | |
| redisOpt, | |
| asynq.Config{ | |
| // priority value. Keys are the names of the queues and values are associated priority value. | |
| Queues: map[string]int{ | |
| Critical: 6, | |
| Default: 3, | |
| Low: 1, | |
| }, | |
| ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { | |
| slog.Error("process task failed", "error", err, "type", task.Type(), "payload", string(task.Payload())) | |
| }), | |
| // maximum number of concurrent processing of tasks. | |
| Concurrency: 50, | |
| Logger: logger, | |
| }, | |
| ) | |
| return &RedisTaskProcessor{ | |
| server: server, | |
| emailSender: emailSender, | |
| } | |
| } | |
| func (p *RedisTaskProcessor) Start() error { | |
| mux := asynq.NewServeMux() | |
| mux.HandleFunc(TaskSendVerifyEmail, p.ProcessTaskSendVerifyEmail) | |
| mux.HandleFunc(TaskSendForgotPasswordEmail, p.ProcessTaskSendForgotPasswordEmail) | |
| return p.server.Start(mux) | |
| } | |
| func (p *RedisTaskProcessor) Shutdown() { | |
| p.server.Shutdown() | |
| } | |