File size: 2,136 Bytes
9373c61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5cfe5c4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.dalab.policyengine.kafka.producer;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import com.dalab.policyengine.event.PolicyActionEvent;

/**
 * Publishes {@link PolicyActionEvent} records to the Kafka topic named by the
 * {@code app.kafka.topic.policy-action-event} property.
 *
 * <p>Sends are asynchronous: the outcome of each send is reported through a
 * completion callback that only logs. A failed send is not retried or
 * persisted by this class (see the TODO in the failure branch).
 */
@Component
public class PolicyActionKafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(PolicyActionKafkaProducer.class);

    private final KafkaTemplate<String, PolicyActionEvent> kafkaTemplate;

    /** Destination topic, injected from {@code app.kafka.topic.policy-action-event}. */
    @Value("${app.kafka.topic.policy-action-event}")
    private String topicName;

    /**
     * Creates a producer that sends through the given template.
     *
     * @param kafkaTemplate template used to send the events
     */
    public PolicyActionKafkaProducer(KafkaTemplate<String, PolicyActionEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the given event to the configured topic and logs the outcome once
     * the send completes.
     *
     * <p>The record key is the event's target asset id; when that is
     * {@code null} or empty, the policy id is used instead.
     *
     * @param event the event to publish; dereferenced immediately, so a
     *              {@code null} argument results in a {@link NullPointerException}
     */
    public void sendPolicyActionEvent(PolicyActionEvent event) {
        LOGGER.info("Sending PolicyActionEvent with eventId: {} to topic: {}", event.getEventId(), topicName);

        // Prefer the target asset id as key; fall back to the policy id when it is missing.
        String assetId = event.getTargetAssetId();
        String recordKey = (assetId != null && !assetId.isEmpty()) ? assetId : event.getPolicyId();

        CompletableFuture<SendResult<String, PolicyActionEvent>> pendingSend =
                kafkaTemplate.send(topicName, recordKey, event);

        pendingSend.whenComplete(
                (sendResult, failure) -> logSendOutcome(event, recordKey, sendResult, failure));
    }

    /**
     * Logs the result of a completed send: an error entry when {@code failure}
     * is present, otherwise an info entry carrying the record offset.
     *
     * @param event      the event that was sent
     * @param recordKey  the key the record was sent with
     * @param sendResult the send result; only read when {@code failure} is {@code null}
     * @param failure    the error that completed the send, or {@code null} on success
     */
    private void logSendOutcome(
            PolicyActionEvent event,
            String recordKey,
            SendResult<String, PolicyActionEvent> sendResult,
            Throwable failure) {
        if (failure != null) {
            // The trailing throwable argument makes SLF4J include the stack trace.
            LOGGER.error(
                    "Failed to send PolicyActionEvent [eventId: {}, key: {}]: {}",
                    event.getEventId(), recordKey, failure.getMessage(), failure
            );
            // TODO: Handle send failure (e.g., retry, DLQ, persistent store for later retry)
            return;
        }

        LOGGER.info(
                "Successfully sent PolicyActionEvent [eventId: {}, key: {}] with offset: {}",
                event.getEventId(), recordKey, sendResult.getRecordMetadata().offset()
        );
    }
}