/**
* In-memory pub/sub bus for plugin SSE streams.
*
* Workers emit stream events via JSON-RPC notifications. The bus fans out
* each event to all connected SSE clients that match the (pluginId, channel,
* companyId) tuple.
*
* @see PLUGIN_SPEC.md §19.8 — Real-Time Streaming
*/
/**
 * Valid SSE event types for plugin streams.
 *
 * `"message"` is the type the bus falls back to when a publisher does not
 * specify one.
 */
export type StreamEventType = "message" | "open" | "close" | "error";
/**
 * Listener callback registered with the bus.
 *
 * It is called once per published event with the payload (opaque to the bus,
 * hence `unknown`) and the event type it was published under. Any return
 * value is ignored.
 */
export type StreamSubscriber = (event: unknown, eventType: StreamEventType) => void;
/**
 * Build the composite lookup key for a (pluginId, channel, companyId)
 * subscription.
 *
 * The parts are encoded as a JSON array instead of being joined with ":".
 * A plain join is ambiguous when a part itself contains ":": for example
 * ("a:b", "c", "d") and ("a", "b:c", "d") would both become "a:b:c:d", letting
 * events leak between unrelated plugins, channels, or companies. JSON string
 * escaping keeps the boundaries between the three parts unambiguous.
 *
 * The key is an internal detail of this module; callers must not parse it.
 *
 * @param pluginId - Identifier of the plugin that owns the stream.
 * @param channel - Channel name within the plugin.
 * @param companyId - Company the stream is scoped to.
 * @returns A string that is equal for two calls only when all three parts
 *   are equal.
 */
function streamKey(pluginId: string, channel: string, companyId: string): string {
  return JSON.stringify([pluginId, channel, companyId]);
}
/**
 * Contract of the plugin stream bus: listeners register for one
 * (pluginId, channel, companyId) tuple and receive every event published
 * to that same tuple.
 */
export interface PluginStreamBus {
/**
 * Subscribe to stream events for a specific (pluginId, channel, companyId).
 *
 * @param pluginId - Plugin whose stream is being observed.
 * @param channel - Channel name within that plugin.
 * @param companyId - Company the subscription is scoped to.
 * @param listener - Callback invoked with each published event and its type.
 * @returns An unsubscribe function that removes `listener` from this tuple.
 */
subscribe(
pluginId: string,
channel: string,
companyId: string,
listener: StreamSubscriber,
): () => void;
/**
 * Publish an event to all subscribers of (pluginId, channel, companyId).
 * Called by the worker manager when it receives a stream notification.
 *
 * @param pluginId - Plugin that emitted the event.
 * @param channel - Channel the event was emitted on.
 * @param companyId - Company the event is scoped to.
 * @param event - Payload handed to each listener unchanged.
 * @param eventType - Optional event type; the implementation returned by
 *   `createPluginStreamBus` uses `"message"` when it is omitted.
 */
publish(
pluginId: string,
channel: string,
companyId: string,
event: unknown,
eventType?: StreamEventType,
): void;
}
/**
 * Create a new, empty in-memory PluginStreamBus.
 *
 * Listeners are stored in a private map from composite stream key to the set
 * of listeners registered under it. A key's entry is created on first
 * subscription and removed again when its last listener unsubscribes, so the
 * map does not accumulate empty sets.
 *
 * Because listeners are held in a `Set`, registering the same function twice
 * for the same tuple stores it once.
 *
 * @returns A bus whose `subscribe` and `publish` share that private registry.
 */
export function createPluginStreamBus(): PluginStreamBus {
  const subscribers = new Map<string, Set<StreamSubscriber>>();

  return {
    /**
     * Register `listener` for the tuple and return its unsubscribe function.
     * The returned function is idempotent: calls after the first are no-ops.
     */
    subscribe(pluginId, channel, companyId, listener) {
      const key = streamKey(pluginId, channel, companyId);
      let listeners = subscribers.get(key);
      if (!listeners) {
        listeners = new Set();
        subscribers.set(key, listeners);
      }
      listeners.add(listener);

      // Capture the narrowed set in a const so the closure needs no `!`.
      const registered = listeners;
      let active = true;

      return () => {
        // A repeated call must not touch a subscription made later with the
        // same listener, so only the first call does any work.
        if (!active) return;
        active = false;

        registered.delete(listener);
        // Only drop the map entry if it still points at *this* set. The set
        // may already have been retired and replaced by a newer one for the
        // same key; deleting unconditionally would orphan those newer
        // subscribers and they would silently stop receiving events.
        if (registered.size === 0 && subscribers.get(key) === registered) {
          subscribers.delete(key);
        }
      };
    },

    /**
     * Deliver `event` to every listener registered for the tuple. Does
     * nothing when the tuple has no subscribers.
     *
     * A listener that throws does not prevent delivery to the remaining
     * listeners. After all listeners have run, the first error encountered
     * is rethrown so the caller still observes the failure; errors from
     * later listeners are discarded.
     */
    publish(pluginId, channel, companyId, event, eventType: StreamEventType = "message") {
      const listeners = subscribers.get(streamKey(pluginId, channel, companyId));
      if (!listeners) return;

      // Track failure with a flag: a listener may legitimately throw
      // `undefined`, so the error value alone cannot signal "something threw".
      let failed = false;
      let firstError: unknown;

      for (const listener of listeners) {
        try {
          listener(event, eventType);
        } catch (error) {
          if (!failed) {
            failed = true;
            firstError = error;
          }
        }
      }

      if (failed) throw firstError;
    },
  };
}