File size: 2,838 Bytes
3a25f97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import type http from "node:http";
import { WebSocketServer, type WebSocket } from "ws";

type SerializableValue = object | string | number | boolean | null;

export interface JsonWebSocketSubscription<TStream extends string> {
  stream: TStream;
  topicId: string;
}

interface JsonWebSocketClient<TStream extends string> extends JsonWebSocketSubscription<TStream> {
  ws: WebSocket;
}

export interface JsonWebSocketHubOptions<TStream extends string> {
  server: http.Server;
  path?: string;
  resolveSubscription: (url: URL) => JsonWebSocketSubscription<TStream>;
}

export class JsonWebSocketHub<TStream extends string> {
  private readonly clients = new Set<JsonWebSocketClient<TStream>>();
  private readonly wss: WebSocketServer;

  public constructor(options: JsonWebSocketHubOptions<TStream>) {
    this.wss = new WebSocketServer({ server: options.server, path: options.path ?? "/ws" });

    this.wss.on("connection", (ws, req) => {
      const url = new URL(req.url ?? "/", "http://localhost");
      const subscription = options.resolveSubscription(url);
      const entry: JsonWebSocketClient<TStream> = { ws, ...subscription };
      this.clients.add(entry);

      ws.on("close", () => {
        this.clients.delete(entry);
      });

      ws.on("error", () => {
        this.clients.delete(entry);
      });
    });
  }

  public broadcastWhere(predicate: (client: JsonWebSocketSubscription<TStream>) => boolean, type: string, data: SerializableValue): void {
    const payload = JSON.stringify({ type, data });

    for (const client of this.clients) {
      if (predicate(client)) {
        this.sendPayload(client.ws, payload);
      }
    }
  }

  public broadcastToStream(stream: TStream, type: string, data: SerializableValue): void {
    this.broadcastWhere((client) => client.stream === stream, type, data);
  }

  public broadcastToTopic(stream: TStream, topicId: string, type: string, data: SerializableValue): void {
    this.broadcastWhere((client) => client.stream === stream && client.topicId === topicId, type, data);
  }

  private sendPayload(ws: WebSocket, payload: string): void {
    if (ws.readyState !== ws.OPEN) {
      return;
    }

    try {
      ws.send(payload);
    } catch {
      // Close/error handlers prune broken sockets.
    }
  }
}

export class InMemoryTopicMessageStore<TMessage> {
  private readonly messages = new Map<string, TMessage[]>();

  public constructor(private readonly limit = 100) {}

  public append(topicId: string, message: TMessage): void {
    const history = this.messages.get(topicId) ?? [];
    history.push(message);
    if (history.length > this.limit) {
      history.splice(0, history.length - this.limit);
    }
    this.messages.set(topicId, history);
  }

  public list(topicId: string): TMessage[] {
    return this.messages.get(topicId) ?? [];
  }
}