File size: 2,277 Bytes
9373c61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5cfe5c4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.dalab.policyengine.kafka.consumer;

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

// Import the common AssetChangeEvent from da-protos
import com.dalab.common.event.AssetChangeEvent;
import com.dalab.policyengine.service.IPolicyEvaluationService;

@Component
public class AssetChangeConsumer {

    private static final Logger log = LoggerFactory.getLogger(AssetChangeConsumer.class);
    // System UUID for actions triggered by Kafka consumer events
    private static final UUID KAFKA_CONSUMER_USER_ID = UUID.fromString("00000000-0000-0000-0000-000000000001"); 

    private final IPolicyEvaluationService policyEvaluationService;

    @Autowired
    public AssetChangeConsumer(IPolicyEvaluationService policyEvaluationService) {
        this.policyEvaluationService = policyEvaluationService;
    }

    // Update KafkaListener to consume the common AssetChangeEvent
    @KafkaListener(topics = "${app.kafka.topic.asset-change-event:asset-change-events}", groupId = "${spring.kafka.consumer.group-id}")
    public void handleAssetChangeEvent(@Payload AssetChangeEvent event) {
        log.info("Received AssetChangeEvent: AssetId={}, EventType={}", event.getAssetId(), event.getEventType());

        try {
            if (event == null || event.getAssetId() == null || !StringUtils.hasText(event.getAssetId())) {
                log.error("AssetChangeEvent is null or missing assetId. Skipping.");
                return;
            }
            // The PolicyEvaluationService will now fetch active policies and iterate.
            policyEvaluationService.evaluatePolicyForAssetInternal(event, KAFKA_CONSUMER_USER_ID);

        } catch (Exception e) {
            log.error("Failed to process AssetChangeEvent for assetId {}: {}. Error: {}", 
                event != null && event.getAssetId() != null ? event.getAssetId() : "unknown", 
                e.getMessage(), e);
        }
    }
}