Spaces:
Build error
Build error
File size: 2,136 Bytes
9373c61 5cfe5c4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
package com.dalab.policyengine.kafka.producer;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import com.dalab.policyengine.event.PolicyActionEvent;
@Component
public class PolicyActionKafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(PolicyActionKafkaProducer.class);
private final KafkaTemplate<String, PolicyActionEvent> kafkaTemplate;
@Value("${app.kafka.topic.policy-action-event}")
private String topicName;
public PolicyActionKafkaProducer(KafkaTemplate<String, PolicyActionEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendPolicyActionEvent(PolicyActionEvent event) {
LOGGER.info("Sending PolicyActionEvent with eventId: {} to topic: {}", event.getEventId(), topicName);
String eventKey = event.getTargetAssetId();
if (eventKey == null || eventKey.isEmpty()) {
eventKey = event.getPolicyId();
}
final String finalEventKey = eventKey;
CompletableFuture<SendResult<String, PolicyActionEvent>> future = kafkaTemplate.send(topicName, finalEventKey, event);
future.whenComplete((result, ex) -> {
if (ex == null) {
LOGGER.info(
"Successfully sent PolicyActionEvent [eventId: {}, key: {}] with offset: {}",
event.getEventId(), finalEventKey, result.getRecordMetadata().offset()
);
} else {
LOGGER.error(
"Failed to send PolicyActionEvent [eventId: {}, key: {}]: {}",
event.getEventId(), finalEventKey, ex.getMessage(), ex
);
// TODO: Handle send failure (e.g., retry, DLQ, persistent store for later retry)
}
});
}
} |