File size: 1,742 Bytes
9373c61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5cfe5c4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.dalab.policyengine.kafka.producer;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class PolicyActionProducer {

    private static final Logger log = LoggerFactory.getLogger(PolicyActionProducer.class);

    @Value("${app.kafka.topic.policy-action-event:policy-action-events}")
    private String policyActionEventTopic;

    // Use generic KafkaTemplate since we're sending Map-based events for now
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public PolicyActionProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Method to send policy action events as Map objects for now
    public void sendPolicyActionEvent(Map<String, Object> event) {
        try {
            String assetId = (String) event.get("assetId");
            String actionType = (String) event.get("actionType");
            log.info("Sending PolicyActionEvent to topic '{}': AssetId={}, ActionType={}", 
                     policyActionEventTopic, assetId, actionType);
            // Use assetId as key for partitioning
            kafkaTemplate.send(policyActionEventTopic, assetId, event);
        } catch (Exception e) {
            String assetId = event != null ? (String) event.get("assetId") : "unknown";
            log.error("Error sending PolicyActionEvent to Kafka for assetId {}: {}", assetId, e.getMessage(), e);
        }
    }
}