Spaces:
Build error
Build error
da-policyengine-dev
/
src
/main
/java
/com
/dalab
/policyengine
/kafka
/producer
/PolicyActionKafkaProducer.java
| package com.dalab.policyengine.kafka.producer; | |
| import java.util.concurrent.CompletableFuture; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.beans.factory.annotation.Value; | |
| import org.springframework.kafka.core.KafkaTemplate; | |
| import org.springframework.kafka.support.SendResult; | |
| import org.springframework.stereotype.Component; | |
| import com.dalab.policyengine.event.PolicyActionEvent; | |
| public class PolicyActionKafkaProducer { | |
| private static final Logger LOGGER = LoggerFactory.getLogger(PolicyActionKafkaProducer.class); | |
| private final KafkaTemplate<String, PolicyActionEvent> kafkaTemplate; | |
| private String topicName; | |
| public PolicyActionKafkaProducer(KafkaTemplate<String, PolicyActionEvent> kafkaTemplate) { | |
| this.kafkaTemplate = kafkaTemplate; | |
| } | |
| public void sendPolicyActionEvent(PolicyActionEvent event) { | |
| LOGGER.info("Sending PolicyActionEvent with eventId: {} to topic: {}", event.getEventId(), topicName); | |
| String eventKey = event.getTargetAssetId(); | |
| if (eventKey == null || eventKey.isEmpty()) { | |
| eventKey = event.getPolicyId(); | |
| } | |
| final String finalEventKey = eventKey; | |
| CompletableFuture<SendResult<String, PolicyActionEvent>> future = kafkaTemplate.send(topicName, finalEventKey, event); | |
| future.whenComplete((result, ex) -> { | |
| if (ex == null) { | |
| LOGGER.info( | |
| "Successfully sent PolicyActionEvent [eventId: {}, key: {}] with offset: {}", | |
| event.getEventId(), finalEventKey, result.getRecordMetadata().offset() | |
| ); | |
| } else { | |
| LOGGER.error( | |
| "Failed to send PolicyActionEvent [eventId: {}, key: {}]: {}", | |
| event.getEventId(), finalEventKey, ex.getMessage(), ex | |
| ); | |
| // TODO: Handle send failure (e.g., retry, DLQ, persistent store for later retry) | |
| } | |
| }); | |
| } | |
| } |