dalabai's picture
Upload folder using huggingface_hub
9373c61 verified
raw
history blame
1.74 kB
package com.dalab.policyengine.kafka.producer;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class PolicyActionProducer {
private static final Logger log = LoggerFactory.getLogger(PolicyActionProducer.class);
@Value("${app.kafka.topic.policy-action-event:policy-action-events}")
private String policyActionEventTopic;
// Use generic KafkaTemplate since we're sending Map-based events for now
private final KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public PolicyActionProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// Method to send policy action events as Map objects for now
public void sendPolicyActionEvent(Map<String, Object> event) {
try {
String assetId = (String) event.get("assetId");
String actionType = (String) event.get("actionType");
log.info("Sending PolicyActionEvent to topic '{}': AssetId={}, ActionType={}",
policyActionEventTopic, assetId, actionType);
// Use assetId as key for partitioning
kafkaTemplate.send(policyActionEventTopic, assetId, event);
} catch (Exception e) {
String assetId = event != null ? (String) event.get("assetId") : "unknown";
log.error("Error sending PolicyActionEvent to Kafka for assetId {}: {}", assetId, e.getMessage(), e);
}
}
}