// dalabai's picture
// Upload folder using huggingface_hub
// 9373c61 verified
// raw
// history blame
// 27.9 kB
package com.dalab.policyengine.service;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.jeasy.rules.api.Facts;
import org.jeasy.rules.api.Rule;
import org.jeasy.rules.api.Rules;
import org.jeasy.rules.api.RulesEngine;
import org.jeasy.rules.core.DefaultRulesEngine;
import org.jeasy.rules.mvel.MVELRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import com.dalab.policyengine.common.ResourceNotFoundException;
import com.dalab.policyengine.dto.EventAnalyticsDTO;
import com.dalab.policyengine.dto.EventStreamDTO;
import com.dalab.policyengine.dto.EventSubscriptionInputDTO;
import com.dalab.policyengine.dto.EventSubscriptionOutputDTO;
import com.dalab.policyengine.model.EventRule;
import com.dalab.policyengine.model.EventSubscription;
import com.dalab.policyengine.model.EventSubscriptionStatus;
import com.dalab.policyengine.model.EventType;
import com.dalab.policyengine.repository.EventRuleRepository;
import com.dalab.policyengine.repository.EventSubscriptionRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.persistence.criteria.Predicate;
/**
* Service implementation for Event Subscription management and event streaming.
* Provides operations for creating, managing, and processing event subscriptions.
*/
@Service
@Transactional
public class EventSubscriptionService implements IEventSubscriptionService {
private static final Logger log = LoggerFactory.getLogger(EventSubscriptionService.class);
private final EventSubscriptionRepository eventSubscriptionRepository;
private final EventRuleRepository eventRuleRepository;
private final ObjectMapper objectMapper;
private final RulesEngine rulesEngine;
/**
 * Wires the service with its persistence and serialization collaborators.
 * The rules engine is not injected: a {@link DefaultRulesEngine} is created here.
 *
 * @param eventSubscriptionRepository repository used for subscription persistence
 * @param eventRuleRepository repository used for rule persistence
 * @param objectMapper JSON mapper kept on the service
 */
@Autowired
public EventSubscriptionService(
        EventSubscriptionRepository eventSubscriptionRepository,
        EventRuleRepository eventRuleRepository,
        ObjectMapper objectMapper) {
    this.rulesEngine = new DefaultRulesEngine();
    this.objectMapper = objectMapper;
    this.eventRuleRepository = eventRuleRepository;
    this.eventSubscriptionRepository = eventSubscriptionRepository;
}
// ==================== SUBSCRIPTION MANAGEMENT ====================
/**
 * Creates a new ACTIVE event subscription owned by the given user and persists its rules.
 *
 * @param inputDTO subscription definition; checked by {@link #validateSubscriptionConfiguration}
 * @param creatorUserId user recorded as owner, creator and last updater
 * @return output DTO built from the saved subscription (including any saved rules)
 * @throws IllegalArgumentException if the input fails validation
 */
@Override
public EventSubscriptionOutputDTO createSubscription(EventSubscriptionInputDTO inputDTO, UUID creatorUserId) {
    log.info("Creating event subscription for user: {}", creatorUserId);

    // Validate input before touching the database
    validateSubscriptionConfiguration(inputDTO);

    // One timestamp for the whole operation so createdAt and updatedAt agree on a new row
    final Instant now = Instant.now();

    // Create subscription entity
    EventSubscription subscription = new EventSubscription();
    subscription.setName(inputDTO.getName());
    subscription.setDescription(inputDTO.getDescription());
    subscription.setUserId(creatorUserId);
    subscription.setStatus(EventSubscriptionStatus.ACTIVE);
    subscription.setNotificationChannels(inputDTO.getNotificationChannels());
    subscription.setNotificationSettings(inputDTO.getNotificationSettings());
    subscription.setCreatedAt(now);
    subscription.setUpdatedAt(now);
    subscription.setCreatedByUserId(creatorUserId);
    subscription.setUpdatedByUserId(creatorUserId);

    // Save first: the rules below need the generated subscription id
    EventSubscription savedSubscription = eventSubscriptionRepository.save(subscription);

    // Create and save rules (explicit loop instead of persisting inside Stream.map)
    if (inputDTO.getRules() != null && !inputDTO.getRules().isEmpty()) {
        List<EventRule> rules = new ArrayList<>();
        for (EventSubscriptionInputDTO.RuleInputDTO ruleInput : inputDTO.getRules()) {
            EventRule rule = new EventRule();
            rule.setSubscriptionId(savedSubscription.getId());
            rule.setCondition(ruleInput.getCondition());
            rule.setEventTypes(ruleInput.getEventTypes());
            rule.setSeverityLevels(ruleInput.getSeverityLevels());
            rule.setSourceServices(ruleInput.getSourceServices());
            rule.setCreatedAt(now);
            rules.add(eventRuleRepository.save(rule));
        }
        savedSubscription.setRules(rules);
    }

    log.info("Successfully created event subscription: {}", savedSubscription.getId());
    return convertToOutputDTO(savedSubscription);
}
/**
 * Replaces the editable fields of an existing subscription and, when the input carries a
 * rule list, swaps the stored rules for the supplied ones.
 *
 * @param subscriptionId id of the subscription to modify
 * @param inputDTO new subscription definition; a {@code null} rule list leaves rules untouched
 * @param updaterUserId user recorded as last updater
 * @return output DTO built from the saved subscription
 * @throws ResourceNotFoundException if no subscription has the given id
 * @throws IllegalArgumentException if the input fails validation
 */
@Override
public EventSubscriptionOutputDTO updateSubscription(UUID subscriptionId, EventSubscriptionInputDTO inputDTO, UUID updaterUserId) {
    log.info("Updating event subscription: {}", subscriptionId);

    // Lookup comes first, so a missing id is reported before any validation error
    EventSubscription existing = eventSubscriptionRepository.findById(subscriptionId)
            .orElseThrow(() -> new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString()));

    validateSubscriptionConfiguration(inputDTO);

    // Overwrite the mutable fields
    existing.setName(inputDTO.getName());
    existing.setDescription(inputDTO.getDescription());
    existing.setNotificationChannels(inputDTO.getNotificationChannels());
    existing.setNotificationSettings(inputDTO.getNotificationSettings());
    existing.setUpdatedAt(Instant.now());
    existing.setUpdatedByUserId(updaterUserId);

    if (inputDTO.getRules() != null) {
        // Full replacement: drop what is stored, then persist the supplied rules in order
        eventRuleRepository.deleteBySubscriptionId(subscriptionId);

        List<EventRule> replacementRules = new ArrayList<>();
        for (EventSubscriptionInputDTO.RuleInputDTO incoming : inputDTO.getRules()) {
            EventRule fresh = new EventRule();
            fresh.setSubscriptionId(subscriptionId);
            fresh.setCondition(incoming.getCondition());
            fresh.setEventTypes(incoming.getEventTypes());
            fresh.setSeverityLevels(incoming.getSeverityLevels());
            fresh.setSourceServices(incoming.getSourceServices());
            fresh.setCreatedAt(Instant.now());
            replacementRules.add(eventRuleRepository.save(fresh));
        }
        existing.setRules(replacementRules);
    }

    EventSubscription persisted = eventSubscriptionRepository.save(existing);
    log.info("Successfully updated event subscription: {}", subscriptionId);
    return convertToOutputDTO(persisted);
}
/**
 * Loads a single subscription and converts it to its output representation.
 *
 * @param subscriptionId id of the subscription to fetch
 * @return output DTO for the stored subscription
 * @throws ResourceNotFoundException if no subscription has the given id
 */
@Override
@Transactional(readOnly = true)
public EventSubscriptionOutputDTO getSubscriptionById(UUID subscriptionId) {
    return eventSubscriptionRepository.findById(subscriptionId)
            .map(this::convertToOutputDTO)
            .orElseThrow(() -> new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString()));
}
/**
 * Returns one page of the given user's subscriptions, optionally filtered by status and by a
 * case-insensitive name fragment.
 *
 * @param userId owner whose subscriptions are listed
 * @param pageable paging and sorting request
 * @param status optional {@link EventSubscriptionStatus} name (case-insensitive); an unknown
 *               value is logged and yields an empty result rather than an error
 * @param nameContains optional literal fragment the name must contain; {@code %} and {@code _}
 *                     are matched literally, not as wildcards
 * @return page of matching subscriptions as output DTOs
 */
@Override
@Transactional(readOnly = true)
public Page<EventSubscriptionOutputDTO> getSubscriptionsForUser(UUID userId, Pageable pageable, String status, String nameContains) {
    Specification<EventSubscription> spec = (root, query, criteriaBuilder) -> {
        List<Predicate> predicates = new ArrayList<>();
        predicates.add(criteriaBuilder.equal(root.get("userId"), userId));

        if (StringUtils.hasText(status)) {
            try {
                // Locale.ROOT: the default locale may upper-case 'i' to a dotted capital (e.g. Turkish),
                // which would make valueOf reject otherwise valid input
                EventSubscriptionStatus requested =
                        EventSubscriptionStatus.valueOf(status.toUpperCase(java.util.Locale.ROOT));
                predicates.add(criteriaBuilder.equal(root.get("status"), requested));
            } catch (IllegalArgumentException e) {
                log.warn("Invalid status provided for filtering: {}", status);
                // Empty disjunction is always false, so the query matches nothing
                predicates.add(criteriaBuilder.disjunction());
            }
        }

        if (StringUtils.hasText(nameContains)) {
            // Escape LIKE metacharacters so caller input is treated as a literal substring.
            // '!' is the escape character; it is escaped first so it cannot be doubled later.
            String literalFragment = nameContains.toLowerCase(java.util.Locale.ROOT)
                    .replace("!", "!!")
                    .replace("%", "!%")
                    .replace("_", "!_");
            predicates.add(criteriaBuilder.like(
                    criteriaBuilder.lower(root.get("name")),
                    "%" + literalFragment + "%",
                    '!'));
        }

        return criteriaBuilder.and(predicates.toArray(new Predicate[0]));
    };

    Page<EventSubscription> subscriptionPage = eventSubscriptionRepository.findAll(spec, pageable);
    return subscriptionPage.map(this::convertToOutputDTO);
}
/**
 * Returns one page of all subscriptions regardless of owner, optionally filtered by status and
 * by a case-insensitive name fragment.
 *
 * @param pageable paging and sorting request
 * @param status optional {@link EventSubscriptionStatus} name (case-insensitive); an unknown
 *               value is logged and yields an empty result rather than an error
 * @param nameContains optional literal fragment the name must contain; {@code %} and {@code _}
 *                     are matched literally, not as wildcards
 * @return page of matching subscriptions as output DTOs
 */
@Override
@Transactional(readOnly = true)
public Page<EventSubscriptionOutputDTO> getAllSubscriptions(Pageable pageable, String status, String nameContains) {
    Specification<EventSubscription> spec = (root, query, criteriaBuilder) -> {
        List<Predicate> predicates = new ArrayList<>();

        if (StringUtils.hasText(status)) {
            try {
                // Locale.ROOT: the default locale may upper-case 'i' to a dotted capital (e.g. Turkish),
                // which would make valueOf reject otherwise valid input
                EventSubscriptionStatus requested =
                        EventSubscriptionStatus.valueOf(status.toUpperCase(java.util.Locale.ROOT));
                predicates.add(criteriaBuilder.equal(root.get("status"), requested));
            } catch (IllegalArgumentException e) {
                log.warn("Invalid status provided for filtering: {}", status);
                // Empty disjunction is always false, so the query matches nothing
                predicates.add(criteriaBuilder.disjunction());
            }
        }

        if (StringUtils.hasText(nameContains)) {
            // Escape LIKE metacharacters so caller input is treated as a literal substring.
            // '!' is the escape character; it is escaped first so it cannot be doubled later.
            String literalFragment = nameContains.toLowerCase(java.util.Locale.ROOT)
                    .replace("!", "!!")
                    .replace("%", "!%")
                    .replace("_", "!_");
            predicates.add(criteriaBuilder.like(
                    criteriaBuilder.lower(root.get("name")),
                    "%" + literalFragment + "%",
                    '!'));
        }

        // With no filters this is an empty conjunction, i.e. always true
        return criteriaBuilder.and(predicates.toArray(new Predicate[0]));
    };

    Page<EventSubscription> subscriptionPage = eventSubscriptionRepository.findAll(spec, pageable);
    return subscriptionPage.map(this::convertToOutputDTO);
}
/**
 * Removes a subscription together with the rules attached to it.
 *
 * @param subscriptionId id of the subscription to remove
 * @throws ResourceNotFoundException if no subscription has the given id
 */
@Override
public void deleteSubscription(UUID subscriptionId) {
    boolean subscriptionExists = eventSubscriptionRepository.existsById(subscriptionId);
    if (!subscriptionExists) {
        throw new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString());
    }

    // Rules go first, then the subscription they belong to
    eventRuleRepository.deleteBySubscriptionId(subscriptionId);
    eventSubscriptionRepository.deleteById(subscriptionId);

    log.info("Successfully deleted event subscription: {}", subscriptionId);
}
/**
 * Changes only the status of a subscription and stamps the update audit fields.
 *
 * @param subscriptionId id of the subscription to modify
 * @param status new status to store
 * @param updaterUserId user recorded as last updater
 * @return output DTO built from the saved subscription
 * @throws ResourceNotFoundException if no subscription has the given id
 */
@Override
public EventSubscriptionOutputDTO updateSubscriptionStatus(UUID subscriptionId, EventSubscriptionStatus status, UUID updaterUserId) {
    EventSubscription target = eventSubscriptionRepository.findById(subscriptionId)
            .orElseThrow(() -> new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString()));

    target.setStatus(status);
    target.setUpdatedAt(Instant.now());
    target.setUpdatedByUserId(updaterUserId);

    EventSubscription persisted = eventSubscriptionRepository.save(target);
    log.info("Updated subscription {} status to: {}", subscriptionId, status);

    return convertToOutputDTO(persisted);
}
// ==================== EVENT STREAMING AND PROCESSING ====================
/**
 * Returns placeholder events for the user's ACTIVE subscriptions. No real event source is
 * queried; the events come from the mock generator.
 *
 * @param userId user whose active subscriptions drive the stream
 * @param limit maximum number of events; {@code null} means 50
 * @return mock events, or an empty list when the user has no active subscription
 */
@Override
@Transactional(readOnly = true)
public List<EventStreamDTO> getEventStreamForUser(UUID userId, Integer limit) {
    log.info("Getting event stream for user: {} with limit: {}", userId, limit);

    List<EventSubscription> userActive =
            eventSubscriptionRepository.findByUserIdAndStatus(userId, EventSubscriptionStatus.ACTIVE);

    // Nothing subscribed, nothing to stream
    if (userActive.isEmpty()) {
        return new ArrayList<>();
    }

    int effectiveLimit;
    if (limit == null) {
        effectiveLimit = 50;
    } else {
        effectiveLimit = limit;
    }

    // Placeholder data until a real event stream is wired in
    return generateMockEventsForSubscriptions(userActive, effectiveLimit);
}
/**
 * Returns placeholder events derived from every stored subscription, regardless of owner or
 * status.
 *
 * @param limit maximum number of events; {@code null} means 100
 * @return mock events from the mock generator
 */
@Override
@Transactional(readOnly = true)
public List<EventStreamDTO> getAllEventStream(Integer limit) {
    log.info("Getting all event stream with limit: {}", limit);

    List<EventSubscription> everySubscription = eventSubscriptionRepository.findAll();

    int effectiveLimit;
    if (limit == null) {
        effectiveLimit = 100;
    } else {
        effectiveLimit = limit;
    }

    return generateMockEventsForSubscriptions(everySubscription, effectiveLimit);
}
/**
 * Checks an incoming event against every ACTIVE subscription and logs each match.
 * Notification delivery is not implemented here; a match is only logged.
 * A failure while evaluating one subscription is logged and does not stop the others.
 *
 * @param eventDTO the event to evaluate
 */
@Override
public void processIncomingEvent(EventStreamDTO eventDTO) {
    log.info("Processing incoming event: {} from service: {}", eventDTO.getEventType(), eventDTO.getSourceService());

    List<EventSubscription> candidates =
            eventSubscriptionRepository.findByStatus(EventSubscriptionStatus.ACTIVE);

    for (EventSubscription candidate : candidates) {
        try {
            boolean matched = matchesSubscription(eventDTO, candidate);
            if (!matched) {
                continue;
            }
            // Placeholder for notification dispatch: only the match is recorded
            log.info("Event matches subscription: {}", candidate.getId());
        } catch (Exception e) {
            log.error("Error processing event for subscription {}: {}", candidate.getId(), e.getMessage(), e);
        }
    }
}
/**
 * Returns a page of placeholder historical events for a subscription. No event history is
 * queried; one page-size worth of mock events is generated and the reported total equals
 * the number generated.
 *
 * @param subscriptionId id of the subscription whose history is requested
 * @param pageable paging request; only its page size influences the generated content
 * @return page wrapping the generated mock events
 * @throws ResourceNotFoundException if no subscription has the given id
 */
@Override
@Transactional(readOnly = true)
public Page<EventStreamDTO> getHistoricalEventsForSubscription(UUID subscriptionId, Pageable pageable) {
    boolean known = eventSubscriptionRepository.existsById(subscriptionId);
    if (!known) {
        throw new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString());
    }

    // Placeholder data until real event history is available
    int pageSize = pageable.getPageSize();
    List<EventStreamDTO> generated = generateMockHistoricalEvents(subscriptionId, pageSize);
    long reportedTotal = generated.size();

    return new org.springframework.data.domain.PageImpl<>(generated, pageable, reportedTotal);
}
// ==================== ANALYTICS AND METRICS ====================
/**
 * Builds analytics for one user. Subscription counts come from the repository; the event
 * metrics and trends are fixed placeholder values.
 *
 * @param userId user the analytics are computed for
 * @return analytics DTO with real subscription counts and mock metrics/trends
 */
@Override
@Transactional(readOnly = true)
public EventAnalyticsDTO getEventAnalyticsForUser(UUID userId) {
    log.info("Getting event analytics for user: {}", userId);

    List<EventSubscription> owned = eventSubscriptionRepository.findByUserId(userId);

    int activeCount = 0;
    for (EventSubscription candidate : owned) {
        if (candidate.getStatus() == EventSubscriptionStatus.ACTIVE) {
            activeCount++;
        }
    }

    // Placeholder metrics
    EventAnalyticsDTO.EventMetricsDTO placeholderMetrics = new EventAnalyticsDTO.EventMetricsDTO();
    placeholderMetrics.setTotalEvents(1250);
    placeholderMetrics.setEventsToday(45);
    placeholderMetrics.setEventsThisWeek(320);
    placeholderMetrics.setEventsThisMonth(1250);

    // Placeholder trends
    EventAnalyticsDTO.EventTrendsDTO placeholderTrends = new EventAnalyticsDTO.EventTrendsDTO();
    placeholderTrends.setTrendDirection("increasing");
    placeholderTrends.setTrendPercentage(12.5);
    placeholderTrends.setPeakHour("14:00");
    placeholderTrends.setPeakDay("Wednesday");

    EventAnalyticsDTO result = new EventAnalyticsDTO();
    result.setUserId(userId);
    result.setTotalSubscriptions(owned.size());
    result.setActiveSubscriptions(activeCount);
    result.setEventMetrics(placeholderMetrics);
    result.setEventTrends(placeholderTrends);
    return result;
}
/**
 * Builds analytics across all subscriptions. Subscription counts come from the repository;
 * the event metrics and trends are fixed placeholder values. No user id is set on the result.
 *
 * @return analytics DTO with real subscription counts and mock metrics/trends
 */
@Override
@Transactional(readOnly = true)
public EventAnalyticsDTO getSystemEventAnalytics() {
    log.info("Getting system-wide event analytics");

    List<EventSubscription> everySubscription = eventSubscriptionRepository.findAll();

    int activeCount = 0;
    for (EventSubscription candidate : everySubscription) {
        if (candidate.getStatus() == EventSubscriptionStatus.ACTIVE) {
            activeCount++;
        }
    }

    // Placeholder system metrics
    EventAnalyticsDTO.EventMetricsDTO placeholderMetrics = new EventAnalyticsDTO.EventMetricsDTO();
    placeholderMetrics.setTotalEvents(8750);
    placeholderMetrics.setEventsToday(245);
    placeholderMetrics.setEventsThisWeek(1820);
    placeholderMetrics.setEventsThisMonth(8750);

    // Placeholder system trends
    EventAnalyticsDTO.EventTrendsDTO placeholderTrends = new EventAnalyticsDTO.EventTrendsDTO();
    placeholderTrends.setTrendDirection("stable");
    placeholderTrends.setTrendPercentage(2.1);
    placeholderTrends.setPeakHour("10:00");
    placeholderTrends.setPeakDay("Tuesday");

    EventAnalyticsDTO result = new EventAnalyticsDTO();
    result.setTotalSubscriptions(everySubscription.size());
    result.setActiveSubscriptions(activeCount);
    result.setEventMetrics(placeholderMetrics);
    result.setEventTrends(placeholderTrends);
    return result;
}
/**
 * Returns placeholder analytics for a user and time window. The window is only logged; every
 * figure in the result is a fixed mock value and no trends are set.
 *
 * @param userId user the analytics are attributed to
 * @param fromTime start of the requested window (logged only)
 * @param toTime end of the requested window (logged only)
 * @return analytics DTO filled with mock values
 */
@Override
@Transactional(readOnly = true)
public EventAnalyticsDTO getEventAnalyticsForTimeRange(UUID userId, Instant fromTime, Instant toTime) {
    log.info("Getting event analytics for user: {} from {} to {}", userId, fromTime, toTime);

    // Placeholder metrics until real event data can be queried for the range
    EventAnalyticsDTO.EventMetricsDTO placeholderMetrics = new EventAnalyticsDTO.EventMetricsDTO();
    placeholderMetrics.setTotalEvents(450);
    placeholderMetrics.setEventsToday(25);
    placeholderMetrics.setEventsThisWeek(180);
    placeholderMetrics.setEventsThisMonth(450);

    EventAnalyticsDTO result = new EventAnalyticsDTO();
    result.setUserId(userId);
    result.setTotalSubscriptions(5);
    result.setActiveSubscriptions(3);
    result.setEventMetrics(placeholderMetrics);
    return result;
}
// ==================== UTILITY METHODS ====================
/**
 * Evaluates an MVEL rule condition against a sample event and reports whether it holds.
 *
 * <p>The condition can reference the facts {@code event}, {@code eventType}, {@code severity},
 * {@code sourceService}, {@code assetId} and {@code timestamp}. A fact is only registered when
 * its value is non-null.
 *
 * @param ruleCondition MVEL expression to evaluate
 * @param sampleEvent event supplying the fact values
 * @return {@code true} if the condition evaluates to true; {@code false} if it evaluates to
 *         false or if building/evaluating the rule fails for any reason (the failure is logged)
 */
@Override
public boolean testEventRule(String ruleCondition, EventStreamDTO sampleEvent) {
    try {
        // Create MVEL rule for testing; the action is a no-op because only the condition matters
        Rule testRule = new MVELRule()
                .name("Test Rule")
                .description("Test rule for validation")
                .when(ruleCondition)
                .then("true");

        // Add event data to facts, skipping nulls: Easy Rules' Facts.put rejects null values,
        // and one absent optional field should not fail conditions that never reference it
        String[] factNames = {"event", "eventType", "severity", "sourceService", "assetId", "timestamp"};
        Object[] factValues = {
            sampleEvent,
            sampleEvent.getEventType(),
            sampleEvent.getSeverity(),
            sampleEvent.getSourceService(),
            sampleEvent.getAssetId(),
            sampleEvent.getTimestamp()
        };
        Facts facts = new Facts();
        for (int i = 0; i < factNames.length; i++) {
            if (factValues[i] != null) {
                facts.put(factNames[i], factValues[i]);
            }
        }

        // Ask the engine for the condition result directly. The previous implementation fired
        // the rule and then looked for a fact named "true", which nothing ever sets, so it
        // always reported false.
        Map<Rule, Boolean> outcome = rulesEngine.check(new Rules(testRule), facts);
        return Boolean.TRUE.equals(outcome.get(testRule));
    } catch (Exception e) {
        log.error("Error testing event rule: {}", e.getMessage(), e);
        return false;
    }
}
/**
 * Lists every constant declared by {@link EventType}.
 *
 * @return fixed-size list of all event types, in declaration order
 */
@Override
public List<EventType> getAvailableEventTypes() {
    EventType[] declaredTypes = EventType.values();
    return Arrays.asList(declaredTypes);
}
/**
 * Lists the hard-coded names of services that can act as event sources.
 *
 * @return fixed-size list of source service names; a new list on every call
 */
@Override
public List<String> getAvailableSourceServices() {
    String[] knownServices = {
        "da-discovery",
        "da-catalog",
        "da-policyengine",
        "da-reporting",
        "da-autolabel",
        "da-autoarchival",
        "da-autodelete",
        "da-autocompliance",
        "da-admin-service"
    };
    return Arrays.asList(knownServices);
}
/**
 * Validates a subscription definition before it is created or updated.
 *
 * <p>Checks, in order: the input is non-null; the name has text and is at most 255 characters;
 * at least one notification channel is present; no rule entry is null; and every rule that
 * has a condition can be built as an MVEL rule.
 *
 * @param inputDTO subscription definition to validate
 * @throws IllegalArgumentException on the first violated check; for a rejected rule condition
 *         the underlying exception is attached as the cause
 */
@Override
public void validateSubscriptionConfiguration(EventSubscriptionInputDTO inputDTO) {
    if (inputDTO == null) {
        throw new IllegalArgumentException("Subscription input cannot be null");
    }
    if (!StringUtils.hasText(inputDTO.getName())) {
        throw new IllegalArgumentException("Subscription name is required");
    }
    if (inputDTO.getName().length() > 255) {
        throw new IllegalArgumentException("Subscription name cannot exceed 255 characters");
    }
    if (inputDTO.getNotificationChannels() == null || inputDTO.getNotificationChannels().isEmpty()) {
        throw new IllegalArgumentException("At least one notification channel is required");
    }

    // Validate rules if present
    if (inputDTO.getRules() != null) {
        for (EventSubscriptionInputDTO.RuleInputDTO rule : inputDTO.getRules()) {
            // Report a null entry as a validation error instead of a NullPointerException
            if (rule == null) {
                throw new IllegalArgumentException("Subscription rules cannot contain null entries");
            }
            // Rules without a condition are accepted as-is
            if (StringUtils.hasText(rule.getCondition())) {
                // Test the MVEL condition syntax by building a throwaway rule
                try {
                    new MVELRule()
                            .name("Validation Rule")
                            .description("Rule for validation")
                            .when(rule.getCondition())
                            .then("true");
                } catch (Exception e) {
                    // Keep the original exception as cause so the parser detail is not lost
                    throw new IllegalArgumentException("Invalid rule condition: " + e.getMessage(), e);
                }
            }
        }
    }
}
// ==================== PRIVATE HELPER METHODS ====================
/**
 * Maps a subscription entity to its output DTO, including its rules when present.
 * The attached statistics are fixed placeholder values, not measured data.
 *
 * @param subscription entity to convert
 * @return populated output DTO
 */
private EventSubscriptionOutputDTO convertToOutputDTO(EventSubscription subscription) {
    EventSubscriptionOutputDTO dto = new EventSubscriptionOutputDTO();

    // Identity and descriptive fields
    dto.setId(subscription.getId());
    dto.setName(subscription.getName());
    dto.setDescription(subscription.getDescription());
    dto.setUserId(subscription.getUserId());
    dto.setStatus(subscription.getStatus());

    // Notification configuration
    dto.setNotificationChannels(subscription.getNotificationChannels());
    dto.setNotificationSettings(subscription.getNotificationSettings());

    // Audit fields
    dto.setCreatedAt(subscription.getCreatedAt());
    dto.setUpdatedAt(subscription.getUpdatedAt());
    dto.setCreatedByUserId(subscription.getCreatedByUserId());
    dto.setUpdatedByUserId(subscription.getUpdatedByUserId());

    // Rules are left unset on the DTO when the entity has none attached (null)
    if (subscription.getRules() != null) {
        List<EventSubscriptionOutputDTO.RuleOutputDTO> convertedRules = new ArrayList<>();
        for (EventRule storedRule : subscription.getRules()) {
            convertedRules.add(convertRuleToOutputDTO(storedRule));
        }
        dto.setRules(convertedRules);
    }

    // Placeholder statistics
    EventSubscriptionOutputDTO.SubscriptionStatsDTO placeholderStats =
            new EventSubscriptionOutputDTO.SubscriptionStatsDTO();
    placeholderStats.setTotalEvents(125);
    placeholderStats.setEventsToday(5);
    placeholderStats.setLastEventAt(Instant.now().minusSeconds(3600));
    dto.setStatistics(placeholderStats);

    return dto;
}
/**
 * Maps a rule entity to its output DTO by copying id, condition, filters and creation time.
 *
 * @param rule entity to convert
 * @return populated rule DTO
 */
private EventSubscriptionOutputDTO.RuleOutputDTO convertRuleToOutputDTO(EventRule rule) {
    EventSubscriptionOutputDTO.RuleOutputDTO converted = new EventSubscriptionOutputDTO.RuleOutputDTO();

    converted.setId(rule.getId());
    converted.setCreatedAt(rule.getCreatedAt());

    // Matching criteria
    converted.setCondition(rule.getCondition());
    converted.setEventTypes(rule.getEventTypes());
    converted.setSeverityLevels(rule.getSeverityLevels());
    converted.setSourceServices(rule.getSourceServices());

    return converted;
}
/**
 * Decides whether an event satisfies a subscription.
 *
 * <p>A subscription without rules matches every event. Otherwise the event matches as soon as
 * one rule's MVEL condition evaluates to true. Conditions can reference the facts
 * {@code event}, {@code eventType}, {@code severity}, {@code sourceService}, {@code assetId}
 * and {@code timestamp}; a fact is only registered when its value is non-null.
 *
 * <p>NOTE(review): only the MVEL condition is evaluated; the rule's eventTypes, severityLevels
 * and sourceServices filters are not consulted here — confirm whether that is intended.
 *
 * @param event incoming event
 * @param subscription subscription whose rules are evaluated
 * @return {@code true} if the subscription has no rules or at least one rule condition holds
 */
private boolean matchesSubscription(EventStreamDTO event, EventSubscription subscription) {
    if (subscription.getRules() == null || subscription.getRules().isEmpty()) {
        return true; // No rules means match all events
    }

    // The facts depend only on the event, so build them once instead of once per rule.
    // Nulls are skipped: Easy Rules' Facts.put rejects null values, and one absent optional
    // field should not break rules that never reference it.
    String[] factNames = {"event", "eventType", "severity", "sourceService", "assetId", "timestamp"};
    Object[] factValues = {
        event,
        event.getEventType(),
        event.getSeverity(),
        event.getSourceService(),
        event.getAssetId(),
        event.getTimestamp()
    };
    Facts facts = new Facts();
    for (int i = 0; i < factNames.length; i++) {
        if (factValues[i] != null) {
            facts.put(factNames[i], factValues[i]);
        }
    }

    for (EventRule rule : subscription.getRules()) {
        // A rule without a condition has nothing to evaluate here and cannot produce a match
        if (!StringUtils.hasText(rule.getCondition())) {
            continue;
        }
        try {
            // Create MVEL rule for matching; the action is a no-op because only the condition matters
            Rule mvelRule = new MVELRule()
                    .name("Subscription Rule")
                    .description("Rule for subscription matching")
                    .when(rule.getCondition())
                    .then("true");

            // Ask the engine for the condition result directly. The previous implementation
            // fired the rule and then looked for a fact named "true", which nothing ever sets,
            // so subscriptions with rules could never match.
            Map<Rule, Boolean> outcome = rulesEngine.check(new Rules(mvelRule), facts);
            if (Boolean.TRUE.equals(outcome.get(mvelRule))) {
                return true;
            }
        } catch (Exception e) {
            // Best effort: a broken rule is logged and the remaining rules are still tried
            log.error("Error matching event against rule {}: {}", rule.getId(), e.getMessage(), e);
        }
    }
    return false;
}
/**
 * Produces placeholder stream events. The number generated is the smaller of {@code limit}
 * and three times the number of subscriptions; the subscriptions' contents are not inspected.
 * Event {@code i} is timestamped {@code i} minutes before the moment it is generated.
 *
 * @param subscriptions subscriptions whose count caps the number of events
 * @param limit upper bound on the number of events
 * @return mutable list of mock events, newest first
 */
private List<EventStreamDTO> generateMockEventsForSubscriptions(List<EventSubscription> subscriptions, int limit) {
    int eventCount = Math.min(limit, subscriptions.size() * 3);
    List<EventStreamDTO> generated = new ArrayList<>();

    int index = 0;
    while (index < eventCount) {
        EventStreamDTO mock = new EventStreamDTO();
        mock.setId(UUID.randomUUID());
        mock.setEventType(EventType.ASSET_DISCOVERED);
        mock.setSeverity("INFO");
        mock.setSourceService("da-discovery");
        mock.setAssetId("asset-" + (index + 1));
        mock.setTimestamp(Instant.now().minusSeconds(index * 60));
        mock.setMessage("Mock event for testing");
        mock.setMetadata(Map.of("test", "true", "index", String.valueOf(index)));
        generated.add(mock);
        index++;
    }
    return generated;
}
/**
 * Produces {@code limit} placeholder historical events tagged with the subscription id.
 * Event {@code i} is timestamped {@code i} hours before the moment it is generated.
 *
 * @param subscriptionId id written into each event's metadata
 * @param limit number of events to generate
 * @return mutable list of mock events, newest first
 */
private List<EventStreamDTO> generateMockHistoricalEvents(UUID subscriptionId, int limit) {
    List<EventStreamDTO> generated = new ArrayList<>();

    int index = 0;
    while (index < limit) {
        EventStreamDTO mock = new EventStreamDTO();
        mock.setId(UUID.randomUUID());
        mock.setEventType(EventType.POLICY_VIOLATION);
        mock.setSeverity("WARNING");
        mock.setSourceService("da-policyengine");
        mock.setAssetId("asset-" + (index + 1));
        // One-hour spacing between consecutive events
        mock.setTimestamp(Instant.now().minusSeconds(index * 3600));
        mock.setMessage("Historical mock event");
        mock.setMetadata(Map.of("subscriptionId", subscriptionId.toString(), "historical", "true"));
        generated.add(mock);
        index++;
    }
    return generated;
}
}