Spaces:
Build error
Build error
File size: 1,742 Bytes
9373c61 5cfe5c4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package com.dalab.policyengine.kafka.producer;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class PolicyActionProducer {
private static final Logger log = LoggerFactory.getLogger(PolicyActionProducer.class);
@Value("${app.kafka.topic.policy-action-event:policy-action-events}")
private String policyActionEventTopic;
// Use generic KafkaTemplate since we're sending Map-based events for now
private final KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public PolicyActionProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// Method to send policy action events as Map objects for now
public void sendPolicyActionEvent(Map<String, Object> event) {
try {
String assetId = (String) event.get("assetId");
String actionType = (String) event.get("actionType");
log.info("Sending PolicyActionEvent to topic '{}': AssetId={}, ActionType={}",
policyActionEventTopic, assetId, actionType);
// Use assetId as key for partitioning
kafkaTemplate.send(policyActionEventTopic, assetId, event);
} catch (Exception e) {
String assetId = event != null ? (String) event.get("assetId") : "unknown";
log.error("Error sending PolicyActionEvent to Kafka for assetId {}: {}", assetId, e.getMessage(), e);
}
}
} |