Spaces:
Runtime error
Runtime error
File size: 4,070 Bytes
4327358 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import {
BeforeApplicationShutdown,
Logger,
LoggerService,
} from '@nestjs/common';
import { sleep } from '@nestjs/terminus/dist/utils';
import {
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { SessionManager } from '@waha/core/abc/manager.abc';
import { WebSocketAuth } from '@waha/core/auth/WebSocketAuth';
import { WebsocketHeartbeatJob } from '@waha/nestjs/ws/WebsocketHeartbeatJob';
import { WebSocket } from '@waha/nestjs/ws/ws';
import { WAHAEvents, WAHAEventsWild } from '@waha/structures/enums.dto';
import { EventWildUnmask } from '@waha/utils/events';
import { generatePrefixedId } from '@waha/utils/ids';
import { IncomingMessage } from 'http';
import * as url from 'url';
import { Server } from 'ws';
export enum WebSocketCloseCode {
NORMAL = 1000,
GOING_AWAY = 1001,
PROTOCOL_ERROR = 1002,
UNSUPPORTED_DATA = 1003,
POLICY_VIOLATION = 1008,
INTERNAL_ERROR = 1011,
}
@WebSocketGateway({
path: '/ws',
cors: true,
})
export class WebsocketGatewayCore
implements
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
BeforeApplicationShutdown
{
HEARTBEAT_INTERVAL = 60_000;
@WebSocketServer()
server: Server;
private readonly logger: LoggerService;
private heartbeat: WebsocketHeartbeatJob;
private eventUnmask = new EventWildUnmask(WAHAEvents, WAHAEventsWild);
constructor(
private manager: SessionManager,
private auth: WebSocketAuth,
) {
this.logger = new Logger('WebsocketGateway');
this.heartbeat = new WebsocketHeartbeatJob(
this.logger,
this.HEARTBEAT_INTERVAL,
);
}
handleConnection(socket: WebSocket, request: IncomingMessage, ...args): any {
// wsc - websocket client
socket.id = generatePrefixedId('wsc');
if (!this.auth.validateRequest(request)) {
// Not authorized - close connection
socket.close(WebSocketCloseCode.POLICY_VIOLATION, 'Unauthorized');
this.logger.debug(
`Unauthorized websocket connection attempt: ${request.url} - ${socket.id}`,
);
return;
}
this.logger.debug(`New client connected: ${request.url} - ${socket.id}`);
const params = this.getParams(request);
const session: string = params.session;
const events: WAHAEvents[] = params.events;
this.logger.debug(
`Client connected to session: '${session}', events: ${events}, ${socket.id}`,
);
const sub = this.manager
.getSessionEvents(session, events)
.subscribe((data) => {
this.logger.debug(`Sending data to client, event.id: ${data.id}`, data);
socket.send(JSON.stringify(data), (err) => {
if (!err) {
return;
}
this.logger.error(`Error sending data to client: ${err}`);
});
});
socket.on('close', () => {
this.logger.debug(`Client disconnected - ${socket.id}`);
sub.unsubscribe();
});
}
private getParams(request: IncomingMessage) {
const query = url.parse(request.url, true).query;
const session = (query.session as string) || '*';
let paramsEvents = (query.events as string[]) || '*';
// if params events string - split by ","
if (typeof paramsEvents === 'string') {
paramsEvents = paramsEvents.split(',');
}
const events = this.eventUnmask.unmask(paramsEvents);
return { session, events };
}
handleDisconnect(socket: WebSocket): any {
this.logger.debug(`Client disconnected - ${socket.id}`);
}
async beforeApplicationShutdown(signal?: string) {
this.logger.log('Shutting down websocket server');
this.heartbeat?.stop();
// Allow pending messages to be sent, it can be even 1ms, just to release the event loop
await sleep(100);
this.logger.log('Websocket server is down');
}
afterInit(server: Server) {
this.logger.debug('Websocket server initialized');
this.logger.debug('Starting heartbeat service...');
this.heartbeat.start(server);
this.logger.debug('Heartbeat service started');
}
}
|