File size: 3,052 Bytes
529090e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import EventEmitter from 'eventemitter3';
import type { Response } from 'express';
import { randomUUID } from 'crypto';
import { getSecurityIntegrationConfig } from '../../config/securityConfig.js';
import {
  persistActivityEvent,
  listActivityEvents,
} from './securityRepository.js';
import type { EventCategory, SecurityActivityEvent, ThreatLevel } from './securityTypes.js';

/**
 * Event map used to type this module's emitter: a single `event` channel
 * whose listeners receive exactly one `SecurityActivityEvent` argument.
 */
type ActivityEvents = {
  event: [SecurityActivityEvent];
};

// In-process bus used to fan ingested events out to every registered stream.
const emitter = new EventEmitter<ActivityEvents>();
// Open stream responses keyed by a per-connection id; iterated by the heartbeat timer.
const clients = new Map<string, Response>();
// Heartbeat period (passed to setInterval, i.e. milliseconds), read once at module load.
const heartbeatInterval = getSecurityIntegrationConfig().registry.streamHeartbeatMs;

// Periodically push a `security-heartbeat` frame to every registered stream.
setInterval(() => {
  if (clients.size === 0) {
    return;
  }

  // The frame is identical for every client in a tick, so build it once
  // instead of re-serialising the timestamp per connection.
  const frame = `event: security-heartbeat\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`;

  for (const res of clients.values()) {
    // A response can be ended/destroyed shortly before its 'close' event
    // removes it from the map; writing to it then raises a write-after-end
    // error, so skip it.
    if (res.writableEnded || res.destroyed) {
      continue;
    }
    res.write(frame);
  }
}, heartbeatInterval);

/**
 * Optional filters for an activity stream. For either field, an unset or
 * empty value, or the literal 'all', disables filtering on that dimension.
 */
interface SseOptions {
  /** Value compared for equality against `event.severity`. */
  severity?: string;
  /** Value compared for equality against `event.category`. */
  category?: string;
}

/**
 * Turns `res` into a Server-Sent Events stream of security activity.
 *
 * Sends the SSE headers, registers the response for heartbeats, writes a
 * `security-bootstrap` frame containing up to 25 recent events that match
 * the filters, and then forwards every subsequently ingested event that
 * matches as a `security-activity` frame. All registrations are undone when
 * the response emits `close`.
 *
 * @param res - Express response to stream on; headers must not have been sent yet.
 * @param options - Optional severity/category filters; unset, empty or 'all'
 *   means "do not filter" for that field.
 */
export function registerActivityStream(res: Response, options: SseOptions = {}): void {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  // Resolve each filter once: unset/empty and 'all' both mean "no filter".
  const severity = options.severity && options.severity !== 'all' ? options.severity : undefined;
  const category = options.category && options.category !== 'all' ? options.category : undefined;

  const handler = (event: SecurityActivityEvent): void => {
    if (severity !== undefined && event.severity !== severity) {
      return;
    }
    if (category !== undefined && event.category !== category) {
      return;
    }
    res.write(`event: security-activity\ndata: ${JSON.stringify(event)}\n\n`);
  };

  const clientId = randomUUID();
  clients.set(clientId, res);

  // Attach cleanup immediately after registering the client. If the bootstrap
  // query or write below throws, the entry is still removed once the
  // connection closes instead of lingering in `clients` and receiving
  // heartbeats forever. Removing a handler that was never added is a no-op.
  res.on('close', () => {
    emitter.off('event', handler);
    clients.delete(clientId);
  });

  // Send initial payload
  const recent = listActivityEvents({
    severity,
    category,
    limit: 25,
  });
  res.write(`event: security-bootstrap\ndata: ${JSON.stringify(recent)}\n\n`);

  // Subscribe only after the bootstrap frame so live events follow it.
  emitter.on('event', handler);
}

/**
 * Input accepted by `ingestRegistryEvent`. Required fields are copied onto
 * the resulting event unchanged; optional fields fall back to the defaults
 * noted below when omitted.
 */
interface RegistryEventPayload {
  /** Event id; one is generated when omitted. */
  id?: string;
  title: string;
  description: string;
  category: EventCategory;
  severity: ThreatLevel;
  source: string;
  /** Copied through as-is; left undefined when omitted. */
  rule?: string;
  /** Defaults to 'SSE' when omitted. */
  channel?: 'SSE' | 'Webhook' | 'Job';
  /** Free-form extra data, copied through as-is. */
  payload?: Record<string, unknown>;
  /** Defaults to the current time as an ISO-8601 string when omitted. */
  createdAt?: string;
  /** Defaults to false when omitted. */
  acknowledged?: boolean;
}

/**
 * Builds a `SecurityActivityEvent` from `payload`, persists it, and then
 * broadcasts it to every registered activity stream.
 *
 * Defaults applied to omitted fields: a generated `id`, `channel` 'SSE',
 * `createdAt` set to the current time (ISO-8601), `acknowledged` false.
 *
 * @param payload - Event data; see `RegistryEventPayload`.
 * @returns The fully populated event that was persisted and emitted.
 */
export function ingestRegistryEvent(payload: RegistryEventPayload): SecurityActivityEvent {
  const event: SecurityActivityEvent = {
    // Date.now() alone repeats for events ingested within the same
    // millisecond; the random suffix keeps generated ids distinct while
    // preserving the `evt-<ms>` prefix.
    id: payload.id ?? `evt-${Date.now()}-${randomUUID()}`,
    title: payload.title,
    description: payload.description,
    category: payload.category,
    severity: payload.severity,
    source: payload.source,
    rule: payload.rule,
    channel: payload.channel ?? 'SSE',
    payload: payload.payload,
    createdAt: payload.createdAt ?? new Date().toISOString(),
    acknowledged: payload.acknowledged ?? false,
  };

  // Persist before broadcasting so nothing is streamed that was not stored.
  persistActivityEvent(event);
  emitter.emit('event', event);
  return event;
}