package com.dalab.policyengine.kafka.producer; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class PolicyActionProducer { private static final Logger log = LoggerFactory.getLogger(PolicyActionProducer.class); @Value("${app.kafka.topic.policy-action-event:policy-action-events}") private String policyActionEventTopic; // Use generic KafkaTemplate since we're sending Map-based events for now private final KafkaTemplate kafkaTemplate; @Autowired public PolicyActionProducer(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // Method to send policy action events as Map objects for now public void sendPolicyActionEvent(Map event) { try { String assetId = (String) event.get("assetId"); String actionType = (String) event.get("actionType"); log.info("Sending PolicyActionEvent to topic '{}': AssetId={}, ActionType={}", policyActionEventTopic, assetId, actionType); // Use assetId as key for partitioning kafkaTemplate.send(policyActionEventTopic, assetId, event); } catch (Exception e) { String assetId = event != null ? (String) event.get("assetId") : "unknown"; log.error("Error sending PolicyActionEvent to Kafka for assetId {}: {}", assetId, e.getMessage(), e); } } }