package com.dalab.policyengine.kafka.consumer; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; // Import the common AssetChangeEvent from da-protos import com.dalab.common.event.AssetChangeEvent; import com.dalab.policyengine.service.IPolicyEvaluationService; @Component public class AssetChangeConsumer { private static final Logger log = LoggerFactory.getLogger(AssetChangeConsumer.class); // System UUID for actions triggered by Kafka consumer events private static final UUID KAFKA_CONSUMER_USER_ID = UUID.fromString("00000000-0000-0000-0000-000000000001"); private final IPolicyEvaluationService policyEvaluationService; @Autowired public AssetChangeConsumer(IPolicyEvaluationService policyEvaluationService) { this.policyEvaluationService = policyEvaluationService; } // Update KafkaListener to consume the common AssetChangeEvent @KafkaListener(topics = "${app.kafka.topic.asset-change-event:asset-change-events}", groupId = "${spring.kafka.consumer.group-id}") public void handleAssetChangeEvent(@Payload AssetChangeEvent event) { log.info("Received AssetChangeEvent: AssetId={}, EventType={}", event.getAssetId(), event.getEventType()); try { if (event == null || event.getAssetId() == null || !StringUtils.hasText(event.getAssetId())) { log.error("AssetChangeEvent is null or missing assetId. Skipping."); return; } // The PolicyEvaluationService will now fetch active policies and iterate. policyEvaluationService.evaluatePolicyForAssetInternal(event, KAFKA_CONSUMER_USER_ID); } catch (Exception e) { log.error("Failed to process AssetChangeEvent for assetId {}: {}. Error: {}", event != null && event.getAssetId() != null ? event.getAssetId() : "unknown", e.getMessage(), e); } } }