Spaces:
Build error
Build error
da-policyengine-dev
/
src
/main
/java
/com
/dalab
/policyengine
/kafka
/consumer
/AssetChangeConsumer.java
| package com.dalab.policyengine.kafka.consumer; | |
| import java.util.UUID; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.beans.factory.annotation.Autowired; | |
| import org.springframework.kafka.annotation.KafkaListener; | |
| import org.springframework.messaging.handler.annotation.Payload; | |
| import org.springframework.stereotype.Component; | |
| import org.springframework.util.StringUtils; | |
| // Import the common AssetChangeEvent from da-protos | |
| import com.dalab.common.event.AssetChangeEvent; | |
| import com.dalab.policyengine.service.IPolicyEvaluationService; | |
| public class AssetChangeConsumer { | |
| private static final Logger log = LoggerFactory.getLogger(AssetChangeConsumer.class); | |
| // System UUID for actions triggered by Kafka consumer events | |
| private static final UUID KAFKA_CONSUMER_USER_ID = UUID.fromString("00000000-0000-0000-0000-000000000001"); | |
| private final IPolicyEvaluationService policyEvaluationService; | |
| public AssetChangeConsumer(IPolicyEvaluationService policyEvaluationService) { | |
| this.policyEvaluationService = policyEvaluationService; | |
| } | |
| // Update KafkaListener to consume the common AssetChangeEvent | |
| public void handleAssetChangeEvent( AssetChangeEvent event) { | |
| log.info("Received AssetChangeEvent: AssetId={}, EventType={}", event.getAssetId(), event.getEventType()); | |
| try { | |
| if (event == null || event.getAssetId() == null || !StringUtils.hasText(event.getAssetId())) { | |
| log.error("AssetChangeEvent is null or missing assetId. Skipping."); | |
| return; | |
| } | |
| // The PolicyEvaluationService will now fetch active policies and iterate. | |
| policyEvaluationService.evaluatePolicyForAssetInternal(event, KAFKA_CONSUMER_USER_ID); | |
| } catch (Exception e) { | |
| log.error("Failed to process AssetChangeEvent for assetId {}: {}. Error: {}", | |
| event != null && event.getAssetId() != null ? event.getAssetId() : "unknown", | |
| e.getMessage(), e); | |
| } | |
| } | |
| } |