api-qobiltu-dev / pkg /worker /processor.go
lifedebugger's picture
Deploy files from GitHub repository
3ccc959
package worker
import (
"context"
"log/slog"
"api.qobiltu.id/pkg/mail"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
// Queue names used as keys of the asynq server's Queues map, listed from
// the highest-priority queue to the lowest.
const (
	// Critical is the name of the "critical" queue.
	Critical = "critical"
	// Default is the name of the "default" queue.
	Default = "default"
	// Low is the name of the "low" queue.
	Low = "low"
)
// TaskProcessor is the contract for a background worker: it can be started,
// shut down, and it exposes one handler method per supported task type.
type TaskProcessor interface {
	// Lifecycle.

	// Start begins processing tasks and reports any error from starting.
	Start() error
	// Shutdown stops the processor.
	Shutdown()

	// Task handlers. Each receives the task to process and returns a
	// non-nil error when processing fails.

	// ProcessTaskSendVerifyEmail handles a "send verify email" task.
	ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error
	// ProcessTaskSendForgotPasswordEmail handles a "send forgot password
	// email" task.
	ProcessTaskSendForgotPasswordEmail(ctx context.Context, task *asynq.Task) error
}
// RedisTaskProcessor is the TaskProcessor implementation built on an asynq
// server. Construct it with NewRedisTaskProcessor.
type RedisTaskProcessor struct {
	// server is the asynq server that the registered handlers are run on.
	server *asynq.Server

	// emailSender is the mail.Sender supplied to the constructor.
	emailSender mail.Sender
}
// NewRedisTaskProcessor builds a RedisTaskProcessor whose asynq server
// connects using redisOpt and which keeps emailSender for its handlers.
//
// As a side effect it installs the worker logger as the package-level
// go-redis logger via redis.SetLogger. The returned processor is not
// started; call Start to begin processing.
func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, emailSender mail.Sender) TaskProcessor {
	// A single logger instance is shared by go-redis and the asynq server.
	workerLogger := NewLogger()
	redis.SetLogger(workerLogger)

	// onTaskError is installed as the server's ErrorHandler; it logs the
	// error together with the task's type and raw payload.
	onTaskError := func(ctx context.Context, task *asynq.Task, err error) {
		slog.Error("process task failed", "error", err, "type", task.Type(), "payload", string(task.Payload()))
	}

	cfg := asynq.Config{
		// Maximum number of tasks processed concurrently.
		Concurrency: 50,
		// Queue name -> associated priority value.
		Queues: map[string]int{
			Critical: 6,
			Default:  3,
			Low:      1,
		},
		ErrorHandler: asynq.ErrorHandlerFunc(onTaskError),
		Logger:       workerLogger,
	}

	return &RedisTaskProcessor{
		server:      asynq.NewServer(redisOpt, cfg),
		emailSender: emailSender,
	}
}
// Start registers one handler per supported task type on a fresh asynq
// ServeMux and starts the underlying server with it. It returns whatever
// error the server's Start reports.
func (p *RedisTaskProcessor) Start() error {
	// Task type -> handler method, registered in the order listed.
	routes := []struct {
		taskType string
		handle   func(context.Context, *asynq.Task) error
	}{
		{TaskSendVerifyEmail, p.ProcessTaskSendVerifyEmail},
		{TaskSendForgotPasswordEmail, p.ProcessTaskSendForgotPasswordEmail},
	}

	router := asynq.NewServeMux()
	for _, r := range routes {
		router.HandleFunc(r.taskType, r.handle)
	}

	return p.server.Start(router)
}
// Shutdown stops the processor by delegating to the underlying asynq
// server's Shutdown.
func (p *RedisTaskProcessor) Shutdown() {
	srv := p.server
	srv.Shutdown()
}