Spaces:
Build error
Build error
da-policyengine-dev
/
src
/main
/java
/com
/dalab
/policyengine
/kafka
/producer
/PolicyActionProducer.java
| package com.dalab.policyengine.kafka.producer; | |
| import java.util.Map; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.beans.factory.annotation.Autowired; | |
| import org.springframework.beans.factory.annotation.Value; | |
| import org.springframework.kafka.core.KafkaTemplate; | |
| import org.springframework.stereotype.Service; | |
| public class PolicyActionProducer { | |
| private static final Logger log = LoggerFactory.getLogger(PolicyActionProducer.class); | |
| private String policyActionEventTopic; | |
| // Use generic KafkaTemplate since we're sending Map-based events for now | |
| private final KafkaTemplate<String, Object> kafkaTemplate; | |
| public PolicyActionProducer(KafkaTemplate<String, Object> kafkaTemplate) { | |
| this.kafkaTemplate = kafkaTemplate; | |
| } | |
| // Method to send policy action events as Map objects for now | |
| public void sendPolicyActionEvent(Map<String, Object> event) { | |
| try { | |
| String assetId = (String) event.get("assetId"); | |
| String actionType = (String) event.get("actionType"); | |
| log.info("Sending PolicyActionEvent to topic '{}': AssetId={}, ActionType={}", | |
| policyActionEventTopic, assetId, actionType); | |
| // Use assetId as key for partitioning | |
| kafkaTemplate.send(policyActionEventTopic, assetId, event); | |
| } catch (Exception e) { | |
| String assetId = event != null ? (String) event.get("assetId") : "unknown"; | |
| log.error("Error sending PolicyActionEvent to Kafka for assetId {}: {}", assetId, e.getMessage(), e); | |
| } | |
| } | |
| } |