Spaces:
Configuration error
Configuration error
File size: 1,622 Bytes
package worker
import (
"api.qobiltu.id/mail"
"context"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"log/slog"
)
// Queue names used by the task processor. NewRedisTaskProcessor assigns each
// of them a priority value when it configures the asynq server.
const (
	// Critical names the queue configured with the highest priority value (6).
	Critical = "critical"
	// Default names the queue configured with the middle priority value (3).
	Default = "default"
	// Low names the queue configured with the lowest priority value (1).
	Low = "low"
)
// TaskProcessor describes a worker that can be started, shut down, and that
// knows how to process the e-mail related tasks declared below.
type TaskProcessor interface {
	// ProcessTaskSendVerifyEmail processes a single task and reports failure
	// through the returned error. RedisTaskProcessor.Start registers it as the
	// handler for the TaskSendVerifyEmail task type.
	ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error

	// ProcessTaskSendForgotPasswordEmail processes a single task and reports
	// failure through the returned error. RedisTaskProcessor.Start registers
	// it as the handler for the TaskSendForgotPasswordEmail task type.
	ProcessTaskSendForgotPasswordEmail(ctx context.Context, task *asynq.Task) error

	// Start begins processing and returns an error if that could not be done.
	Start() error

	// Shutdown stops the processor.
	Shutdown()
}
// RedisTaskProcessor is the TaskProcessor returned by NewRedisTaskProcessor.
// It wraps an asynq server together with the mail sender it was given.
type RedisTaskProcessor struct {
	// server is the asynq server that Start and Shutdown delegate to.
	server *asynq.Server
	// emailSender is the mail.Sender passed to NewRedisTaskProcessor.
	emailSender mail.Sender
}
// NewRedisTaskProcessor builds a TaskProcessor around an asynq server that is
// configured with redisOpt, and stores emailSender on the returned processor.
//
// Side effect: the logger obtained from NewLogger is also installed as the
// go-redis logger through redis.SetLogger, which affects the whole process,
// not just this processor.
func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, emailSender mail.Sender) TaskProcessor {
	logger := NewLogger()
	redis.SetLogger(logger)

	cfg := asynq.Config{
		// Maximum number of tasks processed concurrently.
		Concurrency: 50,
		// Queues maps each queue name to its priority value.
		Queues: map[string]int{
			Critical: 6,
			Default:  3,
			Low:      1,
		},
		// Log every failed task. The handler's ctx is forwarded to slog so
		// that context-aware log handlers can use the values it carries.
		ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
			slog.ErrorContext(ctx, "process task failed", "error", err, "type", task.Type(), "payload", string(task.Payload()))
		}),
		Logger: logger,
	}

	return &RedisTaskProcessor{
		server:      asynq.NewServer(redisOpt, cfg),
		emailSender: emailSender,
	}
}
// Start registers the processor's task handlers on a fresh asynq mux and
// starts the underlying server with it. The error returned by the server's
// Start call is passed back to the caller unchanged.
func (p *RedisTaskProcessor) Start() error {
	// Task type -> handler, registered in the order listed.
	routes := []struct {
		taskType string
		handler  func(context.Context, *asynq.Task) error
	}{
		{TaskSendVerifyEmail, p.ProcessTaskSendVerifyEmail},
		{TaskSendForgotPasswordEmail, p.ProcessTaskSendForgotPasswordEmail},
	}

	router := asynq.NewServeMux()
	for _, r := range routes {
		router.HandleFunc(r.taskType, r.handler)
	}
	return p.server.Start(router)
}
// Shutdown stops the processor by shutting down the underlying asynq server.
func (p *RedisTaskProcessor) Shutdown() {
	srv := p.server
	srv.Shutdown()
}
|