File size: 1,627 Bytes
7beb700
 
 
 
3ccc959
 
 
7beb700
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package worker

import (
	"context"
	"log/slog"

	"api.qobiltu.id/pkg/mail"
	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
)

// Names of the task queues served by the worker, listed from highest to
// lowest priority. NewRedisTaskProcessor uses them as the keys of the asynq
// queue-priority map.
const (
	// Critical is the name of the highest-priority queue.
	Critical = "critical"
	// Default is the name of the medium-priority queue.
	Default = "default"
	// Low is the name of the lowest-priority queue.
	Low = "low"
)

// TaskProcessor is the contract for a worker that consumes queued tasks.
// It combines lifecycle control with one handler method per task type.
type TaskProcessor interface {
	// Start begins processing tasks and reports any startup error.
	Start() error
	// Shutdown stops the processor.
	Shutdown()

	// ProcessTaskSendForgotPasswordEmail handles a single
	// forgot-password email task.
	ProcessTaskSendForgotPasswordEmail(ctx context.Context, task *asynq.Task) error
	// ProcessTaskSendVerifyEmail handles a single verify-email task.
	ProcessTaskSendVerifyEmail(ctx context.Context, task *asynq.Task) error
}

// RedisTaskProcessor is the TaskProcessor implementation returned by
// NewRedisTaskProcessor. It wraps an asynq server together with a mail sender.
type RedisTaskProcessor struct {
	// server is the asynq server that Start runs and Shutdown stops.
	server      *asynq.Server
	// emailSender is the mail.Sender passed to NewRedisTaskProcessor;
	// presumably used by the email task handlers defined outside this
	// section — confirm against their implementation.
	emailSender mail.Sender
}

// NewRedisTaskProcessor builds a TaskProcessor backed by an asynq server that
// connects to Redis using redisOpt. emailSender is stored on the returned
// processor.
//
// The logger obtained from NewLogger is used as the asynq server logger and is
// also handed to redis.SetLogger, so calling this constructor changes the
// go-redis package logger as a side effect.
func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, emailSender mail.Sender) TaskProcessor {
	taskLogger := NewLogger()
	redis.SetLogger(taskLogger)

	// reportFailure is installed as the server's error handler; it logs the
	// error together with the task's type and raw payload.
	reportFailure := func(ctx context.Context, task *asynq.Task, err error) {
		slog.Error("process task failed", "error", err, "type", task.Type(), "payload", string(task.Payload()))
	}

	cfg := asynq.Config{
		// Maximum number of tasks processed concurrently.
		Concurrency: 50,

		// Keys are queue names; values are the associated priority values.
		Queues: map[string]int{
			Critical: 6,
			Default:  3,
			Low:      1,
		},

		ErrorHandler: asynq.ErrorHandlerFunc(reportFailure),
		Logger:       taskLogger,
	}

	return &RedisTaskProcessor{
		server:      asynq.NewServer(redisOpt, cfg),
		emailSender: emailSender,
	}
}

// Start registers one handler per supported task type on a fresh asynq
// ServeMux and starts the underlying server with that mux. It returns whatever
// error the server's Start call reports.
func (p *RedisTaskProcessor) Start() error {
	// Ordered table of task type -> handler, registered in this order.
	routes := []struct {
		taskType string
		handle   func(context.Context, *asynq.Task) error
	}{
		{TaskSendVerifyEmail, p.ProcessTaskSendVerifyEmail},
		{TaskSendForgotPasswordEmail, p.ProcessTaskSendForgotPasswordEmail},
	}

	mux := asynq.NewServeMux()
	for _, r := range routes {
		mux.HandleFunc(r.taskType, r.handle)
	}
	return p.server.Start(mux)
}

// Shutdown stops the processor by calling Shutdown on the underlying asynq
// server.
func (p *RedisTaskProcessor) Shutdown() {
	server := p.server
	server.Shutdown()
}