dalabai's picture
Upload folder using huggingface_hub
9373c61 verified
raw
history blame
2.28 kB
package com.dalab.policyengine.kafka.consumer;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
// Import the common AssetChangeEvent from da-protos
import com.dalab.common.event.AssetChangeEvent;
import com.dalab.policyengine.service.IPolicyEvaluationService;
@Component
public class AssetChangeConsumer {
private static final Logger log = LoggerFactory.getLogger(AssetChangeConsumer.class);
// System UUID for actions triggered by Kafka consumer events
private static final UUID KAFKA_CONSUMER_USER_ID = UUID.fromString("00000000-0000-0000-0000-000000000001");
private final IPolicyEvaluationService policyEvaluationService;
@Autowired
public AssetChangeConsumer(IPolicyEvaluationService policyEvaluationService) {
this.policyEvaluationService = policyEvaluationService;
}
// Update KafkaListener to consume the common AssetChangeEvent
@KafkaListener(topics = "${app.kafka.topic.asset-change-event:asset-change-events}", groupId = "${spring.kafka.consumer.group-id}")
public void handleAssetChangeEvent(@Payload AssetChangeEvent event) {
log.info("Received AssetChangeEvent: AssetId={}, EventType={}", event.getAssetId(), event.getEventType());
try {
if (event == null || event.getAssetId() == null || !StringUtils.hasText(event.getAssetId())) {
log.error("AssetChangeEvent is null or missing assetId. Skipping.");
return;
}
// The PolicyEvaluationService will now fetch active policies and iterate.
policyEvaluationService.evaluatePolicyForAssetInternal(event, KAFKA_CONSUMER_USER_ID);
} catch (Exception e) {
log.error("Failed to process AssetChangeEvent for assetId {}: {}. Error: {}",
event != null && event.getAssetId() != null ? event.getAssetId() : "unknown",
e.getMessage(), e);
}
}
}