package com.dalab.policyengine.kafka.producer;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import com.dalab.policyengine.event.PolicyActionEvent;

/**
 * Publishes {@link PolicyActionEvent} messages to the Kafka topic named by the
 * {@code app.kafka.topic.policy-action-event} property.
 *
 * <p>Sending is asynchronous: {@link #sendPolicyActionEvent(PolicyActionEvent)} hands the record
 * to the {@link KafkaTemplate} and returns without waiting on the returned future. The outcome is
 * only logged from the completion callback.
 */
@Component
public class PolicyActionKafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(PolicyActionKafkaProducer.class);

    // NOTE(review): value type assumed to be PolicyActionEvent because that is the only payload
    // sent here; confirm it matches the generic signature of the KafkaTemplate bean definition.
    private final KafkaTemplate<String, PolicyActionEvent> kafkaTemplate;

    @Value("${app.kafka.topic.policy-action-event}")
    private String topicName;

    /**
     * Creates the producer.
     *
     * @param kafkaTemplate template used to send events, keyed by {@code String}
     */
    public PolicyActionKafkaProducer(KafkaTemplate<String, PolicyActionEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the given event to the configured topic and logs the result once the send completes.
     *
     * <p>The record key is the event's target asset id, or the policy id when the target asset id
     * is {@code null} or empty. A failed send is logged at error level and is not retried.
     *
     * @param event the event to publish
     */
    public void sendPolicyActionEvent(PolicyActionEvent event) {
        LOGGER.info("Sending PolicyActionEvent with eventId: {} to topic: {}", event.getEventId(), topicName);

        final String eventKey = resolveEventKey(event);

        CompletableFuture<SendResult<String, PolicyActionEvent>> future =
                kafkaTemplate.send(topicName, eventKey, event);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                LOGGER.info(
                        "Successfully sent PolicyActionEvent [eventId: {}, key: {}] with offset: {}",
                        event.getEventId(),
                        eventKey,
                        result.getRecordMetadata().offset()
                );
            } else {
                // The trailing throwable argument makes SLF4J log the stack trace as well.
                LOGGER.error(
                        "Failed to send PolicyActionEvent [eventId: {}, key: {}]: {}",
                        event.getEventId(),
                        eventKey,
                        ex.getMessage(),
                        ex
                );
                // TODO: Handle send failure (e.g., retry, DLQ, persistent store for later retry)
            }
        });
    }

    /**
     * Picks the record key for an event: the target asset id, falling back to the policy id when
     * the target asset id is {@code null} or empty. The fallback value is returned as is, so the
     * result may itself be {@code null} or empty.
     */
    private static String resolveEventKey(PolicyActionEvent event) {
        String targetAssetId = event.getTargetAssetId();
        if (targetAssetId == null || targetAssetId.isEmpty()) {
            return event.getPolicyId();
        }
        return targetAssetId;
    }
}