package com.dalab.policyengine.service; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import org.jeasy.rules.api.Facts; import org.jeasy.rules.api.Rule; import org.jeasy.rules.api.Rules; import org.jeasy.rules.api.RulesEngine; import org.jeasy.rules.core.DefaultRulesEngine; import org.jeasy.rules.mvel.MVELRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; import com.dalab.policyengine.common.ResourceNotFoundException; import com.dalab.policyengine.dto.EventAnalyticsDTO; import com.dalab.policyengine.dto.EventStreamDTO; import com.dalab.policyengine.dto.EventSubscriptionInputDTO; import com.dalab.policyengine.dto.EventSubscriptionOutputDTO; import com.dalab.policyengine.model.EventRule; import com.dalab.policyengine.model.EventSubscription; import com.dalab.policyengine.model.EventSubscriptionStatus; import com.dalab.policyengine.model.EventType; import com.dalab.policyengine.repository.EventRuleRepository; import com.dalab.policyengine.repository.EventSubscriptionRepository; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.persistence.criteria.Predicate; /** * Service implementation for Event Subscription management and event streaming. * Provides operations for creating, managing, and processing event subscriptions. */ @Service @Transactional public class EventSubscriptionService implements IEventSubscriptionService { private static final Logger log = LoggerFactory.getLogger(EventSubscriptionService.class); private final EventSubscriptionRepository eventSubscriptionRepository; private final EventRuleRepository eventRuleRepository; private final ObjectMapper objectMapper; private final RulesEngine rulesEngine; @Autowired public EventSubscriptionService(EventSubscriptionRepository eventSubscriptionRepository, EventRuleRepository eventRuleRepository, ObjectMapper objectMapper) { this.eventSubscriptionRepository = eventSubscriptionRepository; this.eventRuleRepository = eventRuleRepository; this.objectMapper = objectMapper; this.rulesEngine = new DefaultRulesEngine(); } // ==================== SUBSCRIPTION MANAGEMENT ==================== @Override public EventSubscriptionOutputDTO createSubscription(EventSubscriptionInputDTO inputDTO, UUID creatorUserId) { log.info("Creating event subscription for user: {}", creatorUserId); // Validate input validateSubscriptionConfiguration(inputDTO); // Create subscription entity EventSubscription subscription = new EventSubscription(); subscription.setName(inputDTO.getName()); subscription.setDescription(inputDTO.getDescription()); subscription.setUserId(creatorUserId); subscription.setStatus(EventSubscriptionStatus.ACTIVE); subscription.setNotificationChannels(inputDTO.getNotificationChannels()); subscription.setNotificationSettings(inputDTO.getNotificationSettings()); subscription.setCreatedAt(Instant.now()); subscription.setUpdatedAt(Instant.now()); subscription.setCreatedByUserId(creatorUserId); subscription.setUpdatedByUserId(creatorUserId); // Save subscription EventSubscription savedSubscription = eventSubscriptionRepository.save(subscription); // Create and save rules if (inputDTO.getRules() != null && !inputDTO.getRules().isEmpty()) { List rules = inputDTO.getRules().stream() .map(ruleInput -> { EventRule rule = new EventRule(); rule.setSubscriptionId(savedSubscription.getId()); rule.setCondition(ruleInput.getCondition()); rule.setEventTypes(ruleInput.getEventTypes()); rule.setSeverityLevels(ruleInput.getSeverityLevels()); rule.setSourceServices(ruleInput.getSourceServices()); rule.setCreatedAt(Instant.now()); return eventRuleRepository.save(rule); }) .collect(Collectors.toList()); savedSubscription.setRules(rules); } log.info("Successfully created event subscription: {}", savedSubscription.getId()); return convertToOutputDTO(savedSubscription); } @Override public EventSubscriptionOutputDTO updateSubscription(UUID subscriptionId, EventSubscriptionInputDTO inputDTO, UUID updaterUserId) { log.info("Updating event subscription: {}", subscriptionId); EventSubscription subscription = eventSubscriptionRepository.findById(subscriptionId) .orElseThrow(() -> new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString())); // Validate input validateSubscriptionConfiguration(inputDTO); // Update subscription fields subscription.setName(inputDTO.getName()); subscription.setDescription(inputDTO.getDescription()); subscription.setNotificationChannels(inputDTO.getNotificationChannels()); subscription.setNotificationSettings(inputDTO.getNotificationSettings()); subscription.setUpdatedAt(Instant.now()); subscription.setUpdatedByUserId(updaterUserId); // Update rules if (inputDTO.getRules() != null) { // Delete existing rules eventRuleRepository.deleteBySubscriptionId(subscriptionId); // Create new rules List rules = inputDTO.getRules().stream() .map(ruleInput -> { EventRule rule = new EventRule(); rule.setSubscriptionId(subscriptionId); rule.setCondition(ruleInput.getCondition()); rule.setEventTypes(ruleInput.getEventTypes()); rule.setSeverityLevels(ruleInput.getSeverityLevels()); rule.setSourceServices(ruleInput.getSourceServices()); rule.setCreatedAt(Instant.now()); return eventRuleRepository.save(rule); }) .collect(Collectors.toList()); subscription.setRules(rules); } EventSubscription updatedSubscription = eventSubscriptionRepository.save(subscription); log.info("Successfully updated event subscription: {}", subscriptionId); return convertToOutputDTO(updatedSubscription); } @Override @Transactional(readOnly = true) public EventSubscriptionOutputDTO getSubscriptionById(UUID subscriptionId) { EventSubscription subscription = eventSubscriptionRepository.findById(subscriptionId) .orElseThrow(() -> new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString())); return convertToOutputDTO(subscription); } @Override @Transactional(readOnly = true) public Page getSubscriptionsForUser(UUID userId, Pageable pageable, String status, String nameContains) { Specification spec = (root, query, criteriaBuilder) -> { List predicates = new ArrayList<>(); predicates.add(criteriaBuilder.equal(root.get("userId"), userId)); if (StringUtils.hasText(status)) { try { predicates.add(criteriaBuilder.equal(root.get("status"), EventSubscriptionStatus.valueOf(status.toUpperCase()))); } catch (IllegalArgumentException e) { log.warn("Invalid status provided for filtering: {}", status); predicates.add(criteriaBuilder.disjunction()); } } if (StringUtils.hasText(nameContains)) { predicates.add(criteriaBuilder.like(criteriaBuilder.lower(root.get("name")), "%" + nameContains.toLowerCase() + "%")); } return criteriaBuilder.and(predicates.toArray(new Predicate[0])); }; Page subscriptionPage = eventSubscriptionRepository.findAll(spec, pageable); return subscriptionPage.map(this::convertToOutputDTO); } @Override @Transactional(readOnly = true) public Page getAllSubscriptions(Pageable pageable, String status, String nameContains) { Specification spec = (root, query, criteriaBuilder) -> { List predicates = new ArrayList<>(); if (StringUtils.hasText(status)) { try { predicates.add(criteriaBuilder.equal(root.get("status"), EventSubscriptionStatus.valueOf(status.toUpperCase()))); } catch (IllegalArgumentException e) { log.warn("Invalid status provided for filtering: {}", status); predicates.add(criteriaBuilder.disjunction()); } } if (StringUtils.hasText(nameContains)) { predicates.add(criteriaBuilder.like(criteriaBuilder.lower(root.get("name")), "%" + nameContains.toLowerCase() + "%")); } return criteriaBuilder.and(predicates.toArray(new Predicate[0])); }; Page subscriptionPage = eventSubscriptionRepository.findAll(spec, pageable); return subscriptionPage.map(this::convertToOutputDTO); } @Override public void deleteSubscription(UUID subscriptionId) { if (!eventSubscriptionRepository.existsById(subscriptionId)) { throw new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString()); } // Delete associated rules first eventRuleRepository.deleteBySubscriptionId(subscriptionId); // Delete subscription eventSubscriptionRepository.deleteById(subscriptionId); log.info("Successfully deleted event subscription: {}", subscriptionId); } @Override public EventSubscriptionOutputDTO updateSubscriptionStatus(UUID subscriptionId, EventSubscriptionStatus status, UUID updaterUserId) { EventSubscription subscription = eventSubscriptionRepository.findById(subscriptionId) .orElseThrow(() -> new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString())); subscription.setStatus(status); subscription.setUpdatedAt(Instant.now()); subscription.setUpdatedByUserId(updaterUserId); EventSubscription updatedSubscription = eventSubscriptionRepository.save(subscription); log.info("Updated subscription {} status to: {}", subscriptionId, status); return convertToOutputDTO(updatedSubscription); } // ==================== EVENT STREAMING AND PROCESSING ==================== @Override @Transactional(readOnly = true) public List getEventStreamForUser(UUID userId, Integer limit) { log.info("Getting event stream for user: {} with limit: {}", userId, limit); // Get user's active subscriptions List activeSubscriptions = eventSubscriptionRepository .findByUserIdAndStatus(userId, EventSubscriptionStatus.ACTIVE); if (activeSubscriptions.isEmpty()) { return new ArrayList<>(); } // For now, return mock events that would match user's subscriptions // In a real implementation, this would query actual event streams return generateMockEventsForSubscriptions(activeSubscriptions, limit != null ? limit : 50); } @Override @Transactional(readOnly = true) public List getAllEventStream(Integer limit) { log.info("Getting all event stream with limit: {}", limit); // Return mock events for all subscriptions List allSubscriptions = eventSubscriptionRepository.findAll(); return generateMockEventsForSubscriptions(allSubscriptions, limit != null ? limit : 100); } @Override public void processIncomingEvent(EventStreamDTO eventDTO) { log.info("Processing incoming event: {} from service: {}", eventDTO.getEventType(), eventDTO.getSourceService()); // Find all active subscriptions that might match this event List activeSubscriptions = eventSubscriptionRepository .findByStatus(EventSubscriptionStatus.ACTIVE); for (EventSubscription subscription : activeSubscriptions) { try { if (matchesSubscription(eventDTO, subscription)) { log.info("Event matches subscription: {}", subscription.getId()); // In a real implementation, this would trigger notifications // For now, just log the match } } catch (Exception e) { log.error("Error processing event for subscription {}: {}", subscription.getId(), e.getMessage(), e); } } } @Override @Transactional(readOnly = true) public Page getHistoricalEventsForSubscription(UUID subscriptionId, Pageable pageable) { // Verify subscription exists if (!eventSubscriptionRepository.existsById(subscriptionId)) { throw new ResourceNotFoundException("EventSubscription", "id", subscriptionId.toString()); } // For now, return mock historical events // In a real implementation, this would query actual event history List mockEvents = generateMockHistoricalEvents(subscriptionId, pageable.getPageSize()); // Create a simple page implementation return new org.springframework.data.domain.PageImpl<>( mockEvents, pageable, mockEvents.size() ); } // ==================== ANALYTICS AND METRICS ==================== @Override @Transactional(readOnly = true) public EventAnalyticsDTO getEventAnalyticsForUser(UUID userId) { log.info("Getting event analytics for user: {}", userId); List userSubscriptions = eventSubscriptionRepository.findByUserId(userId); EventAnalyticsDTO analytics = new EventAnalyticsDTO(); analytics.setUserId(userId); analytics.setTotalSubscriptions(userSubscriptions.size()); analytics.setActiveSubscriptions((int) userSubscriptions.stream() .filter(s -> s.getStatus() == EventSubscriptionStatus.ACTIVE) .count()); // Add mock metrics EventAnalyticsDTO.EventMetricsDTO metrics = new EventAnalyticsDTO.EventMetricsDTO(); metrics.setTotalEvents(1250); metrics.setEventsToday(45); metrics.setEventsThisWeek(320); metrics.setEventsThisMonth(1250); analytics.setEventMetrics(metrics); // Add mock trends EventAnalyticsDTO.EventTrendsDTO trends = new EventAnalyticsDTO.EventTrendsDTO(); trends.setTrendDirection("increasing"); trends.setTrendPercentage(12.5); trends.setPeakHour("14:00"); trends.setPeakDay("Wednesday"); analytics.setEventTrends(trends); return analytics; } @Override @Transactional(readOnly = true) public EventAnalyticsDTO getSystemEventAnalytics() { log.info("Getting system-wide event analytics"); List allSubscriptions = eventSubscriptionRepository.findAll(); EventAnalyticsDTO analytics = new EventAnalyticsDTO(); analytics.setTotalSubscriptions(allSubscriptions.size()); analytics.setActiveSubscriptions((int) allSubscriptions.stream() .filter(s -> s.getStatus() == EventSubscriptionStatus.ACTIVE) .count()); // Add comprehensive system metrics EventAnalyticsDTO.EventMetricsDTO metrics = new EventAnalyticsDTO.EventMetricsDTO(); metrics.setTotalEvents(8750); metrics.setEventsToday(245); metrics.setEventsThisWeek(1820); metrics.setEventsThisMonth(8750); analytics.setEventMetrics(metrics); // Add system trends EventAnalyticsDTO.EventTrendsDTO trends = new EventAnalyticsDTO.EventTrendsDTO(); trends.setTrendDirection("stable"); trends.setTrendPercentage(2.1); trends.setPeakHour("10:00"); trends.setPeakDay("Tuesday"); analytics.setEventTrends(trends); return analytics; } @Override @Transactional(readOnly = true) public EventAnalyticsDTO getEventAnalyticsForTimeRange(UUID userId, Instant fromTime, Instant toTime) { log.info("Getting event analytics for user: {} from {} to {}", userId, fromTime, toTime); // For now, return mock analytics for the time range // In a real implementation, this would query actual event data EventAnalyticsDTO analytics = new EventAnalyticsDTO(); analytics.setUserId(userId); analytics.setTotalSubscriptions(5); analytics.setActiveSubscriptions(3); EventAnalyticsDTO.EventMetricsDTO metrics = new EventAnalyticsDTO.EventMetricsDTO(); metrics.setTotalEvents(450); metrics.setEventsToday(25); metrics.setEventsThisWeek(180); metrics.setEventsThisMonth(450); analytics.setEventMetrics(metrics); return analytics; } // ==================== UTILITY METHODS ==================== @Override public boolean testEventRule(String ruleCondition, EventStreamDTO sampleEvent) { try { // Create MVEL rule for testing Rule testRule = new MVELRule() .name("Test Rule") .description("Test rule for validation") .when(ruleCondition) .then("true"); Rules rules = new Rules(testRule); Facts facts = new Facts(); // Add event data to facts facts.put("event", sampleEvent); facts.put("eventType", sampleEvent.getEventType()); facts.put("severity", sampleEvent.getSeverity()); facts.put("sourceService", sampleEvent.getSourceService()); facts.put("assetId", sampleEvent.getAssetId()); facts.put("timestamp", sampleEvent.getTimestamp()); // Execute rule rulesEngine.fire(rules, facts); // Check if rule was triggered return facts.get("true") != null; } catch (Exception e) { log.error("Error testing event rule: {}", e.getMessage(), e); return false; } } @Override public List getAvailableEventTypes() { return Arrays.asList(EventType.values()); } @Override public List getAvailableSourceServices() { return Arrays.asList( "da-discovery", "da-catalog", "da-policyengine", "da-reporting", "da-autolabel", "da-autoarchival", "da-autodelete", "da-autocompliance", "da-admin-service" ); } @Override public void validateSubscriptionConfiguration(EventSubscriptionInputDTO inputDTO) { if (inputDTO == null) { throw new IllegalArgumentException("Subscription input cannot be null"); } if (!StringUtils.hasText(inputDTO.getName())) { throw new IllegalArgumentException("Subscription name is required"); } if (inputDTO.getName().length() > 255) { throw new IllegalArgumentException("Subscription name cannot exceed 255 characters"); } if (inputDTO.getNotificationChannels() == null || inputDTO.getNotificationChannels().isEmpty()) { throw new IllegalArgumentException("At least one notification channel is required"); } // Validate rules if present if (inputDTO.getRules() != null) { for (EventSubscriptionInputDTO.RuleInputDTO rule : inputDTO.getRules()) { if (StringUtils.hasText(rule.getCondition())) { // Test the MVEL condition syntax try { new MVELRule() .name("Validation Rule") .description("Rule for validation") .when(rule.getCondition()) .then("true"); } catch (Exception e) { throw new IllegalArgumentException("Invalid rule condition: " + e.getMessage()); } } } } } // ==================== PRIVATE HELPER METHODS ==================== private EventSubscriptionOutputDTO convertToOutputDTO(EventSubscription subscription) { EventSubscriptionOutputDTO outputDTO = new EventSubscriptionOutputDTO(); outputDTO.setId(subscription.getId()); outputDTO.setName(subscription.getName()); outputDTO.setDescription(subscription.getDescription()); outputDTO.setUserId(subscription.getUserId()); outputDTO.setStatus(subscription.getStatus()); outputDTO.setNotificationChannels(subscription.getNotificationChannels()); outputDTO.setNotificationSettings(subscription.getNotificationSettings()); outputDTO.setCreatedAt(subscription.getCreatedAt()); outputDTO.setUpdatedAt(subscription.getUpdatedAt()); outputDTO.setCreatedByUserId(subscription.getCreatedByUserId()); outputDTO.setUpdatedByUserId(subscription.getUpdatedByUserId()); // Convert rules if (subscription.getRules() != null) { List ruleDTOs = subscription.getRules().stream() .map(this::convertRuleToOutputDTO) .collect(Collectors.toList()); outputDTO.setRules(ruleDTOs); } // Add mock statistics EventSubscriptionOutputDTO.SubscriptionStatsDTO stats = new EventSubscriptionOutputDTO.SubscriptionStatsDTO(); stats.setTotalEvents(125); stats.setEventsToday(5); stats.setLastEventAt(Instant.now().minusSeconds(3600)); outputDTO.setStatistics(stats); return outputDTO; } private EventSubscriptionOutputDTO.RuleOutputDTO convertRuleToOutputDTO(EventRule rule) { EventSubscriptionOutputDTO.RuleOutputDTO ruleDTO = new EventSubscriptionOutputDTO.RuleOutputDTO(); ruleDTO.setId(rule.getId()); ruleDTO.setCondition(rule.getCondition()); ruleDTO.setEventTypes(rule.getEventTypes()); ruleDTO.setSeverityLevels(rule.getSeverityLevels()); ruleDTO.setSourceServices(rule.getSourceServices()); ruleDTO.setCreatedAt(rule.getCreatedAt()); return ruleDTO; } private boolean matchesSubscription(EventStreamDTO event, EventSubscription subscription) { if (subscription.getRules() == null || subscription.getRules().isEmpty()) { return true; // No rules means match all events } for (EventRule rule : subscription.getRules()) { try { // Create MVEL rule for matching Rule mvelRule = new MVELRule() .name("Subscription Rule") .description("Rule for subscription matching") .when(rule.getCondition()) .then("true"); Rules rules = new Rules(mvelRule); Facts facts = new Facts(); // Add event data to facts facts.put("event", event); facts.put("eventType", event.getEventType()); facts.put("severity", event.getSeverity()); facts.put("sourceService", event.getSourceService()); facts.put("assetId", event.getAssetId()); facts.put("timestamp", event.getTimestamp()); // Execute rule rulesEngine.fire(rules, facts); // If rule was triggered, event matches if (facts.get("true") != null) { return true; } } catch (Exception e) { log.error("Error matching event against rule {}: {}", rule.getId(), e.getMessage(), e); } } return false; } private List generateMockEventsForSubscriptions(List subscriptions, int limit) { List events = new ArrayList<>(); for (int i = 0; i < Math.min(limit, subscriptions.size() * 3); i++) { EventStreamDTO event = new EventStreamDTO(); event.setId(UUID.randomUUID()); event.setEventType(EventType.ASSET_DISCOVERED); event.setSeverity("INFO"); event.setSourceService("da-discovery"); event.setAssetId("asset-" + (i + 1)); event.setTimestamp(Instant.now().minusSeconds(i * 60)); event.setMessage("Mock event for testing"); event.setMetadata(Map.of("test", "true", "index", String.valueOf(i))); events.add(event); } return events; } private List generateMockHistoricalEvents(UUID subscriptionId, int limit) { List events = new ArrayList<>(); for (int i = 0; i < limit; i++) { EventStreamDTO event = new EventStreamDTO(); event.setId(UUID.randomUUID()); event.setEventType(EventType.POLICY_VIOLATION); event.setSeverity("WARNING"); event.setSourceService("da-policyengine"); event.setAssetId("asset-" + (i + 1)); event.setTimestamp(Instant.now().minusSeconds(i * 3600)); // Historical events event.setMessage("Historical mock event"); event.setMetadata(Map.of("subscriptionId", subscriptionId.toString(), "historical", "true")); events.add(event); } return events; } }