Spaces:
Paused
Paused
| /** | |
| * Connections | |
| * Pokemon Showdown - http://pokemonshowdown.com/ | |
| * | |
| * Abstraction layer for multi-process SockJS connections. | |
| * | |
| * This file handles all the communications between the users' | |
| * browsers, the networking processes, and users.ts in the | |
| * main process. | |
| * | |
| * @license MIT | |
| */ | |
| import * as fs from 'fs'; | |
| import * as http from 'http'; | |
| import * as https from 'https'; | |
| import * as path from 'path'; | |
| import { crashlogger, ProcessManager, Streams, Repl } from '../lib'; | |
| import { IPTools } from './ip-tools'; | |
| import { type ChannelID, extractChannelMessages } from '../sim/battle'; | |
| type StreamWorker = ProcessManager.StreamWorker; | |
| export const Sockets = new class { | |
| async onSpawn(worker: StreamWorker) { | |
| const id = worker.workerid; | |
| for await (const data of worker.stream) { | |
| switch (data.charAt(0)) { | |
| case '*': { | |
| // *socketid, ip, protocol | |
| // connect | |
| worker.load++; | |
| const [socketid, ip, protocol] = data.substr(1).split('\n'); | |
| Users.socketConnect(worker, id, socketid, ip, protocol); | |
| break; | |
| } | |
| case '!': { | |
| // !socketid | |
| // disconnect | |
| worker.load--; | |
| const socketid = data.substr(1); | |
| Users.socketDisconnect(worker, id, socketid); | |
| break; | |
| } | |
| case '<': { | |
| // <socketid, message | |
| // message | |
| const idx = data.indexOf('\n'); | |
| const socketid = data.substr(1, idx - 1); | |
| const message = data.substr(idx + 1); | |
| Users.socketReceive(worker, id, socketid, message); | |
| break; | |
| } | |
| default: | |
| // unhandled | |
| } | |
| } | |
| } | |
| onUnspawn(this: void, worker: StreamWorker) { | |
| Users.socketDisconnectAll(worker, worker.workerid); | |
| } | |
| listen(port?: number, bindAddress?: string, workerCount?: number) { | |
| if (port !== undefined && !isNaN(port)) { | |
| Config.port = port; | |
| Config.ssl = null; | |
| } else { | |
| port = Config.port; | |
| // Autoconfigure when running in cloud environments. | |
| try { | |
| const cloudenv = (require as any)('cloud-env'); | |
| bindAddress = cloudenv.get('IP', bindAddress); | |
| port = cloudenv.get('PORT', port); | |
| } catch {} | |
| } | |
| if (bindAddress !== undefined) { | |
| Config.bindaddress = bindAddress; | |
| } | |
| if (port !== undefined) { | |
| Config.port = port; | |
| } | |
| if (workerCount === undefined) { | |
| workerCount = (Config.workers !== undefined ? Config.workers : 1); | |
| } | |
| PM.env = { PSPORT: Config.port, PSBINDADDR: Config.bindaddress || '0.0.0.0', PSNOSSL: Config.ssl ? 0 : 1 }; | |
| PM.subscribeSpawn(worker => void this.onSpawn(worker)); | |
| PM.subscribeUnspawn(this.onUnspawn); | |
| PM.spawn(workerCount); | |
| } | |
| socketSend(worker: StreamWorker, socketid: string, message: string) { | |
| void worker.stream.write(`>${socketid}\n${message}`); | |
| } | |
| socketDisconnect(worker: StreamWorker, socketid: string) { | |
| void worker.stream.write(`!${socketid}`); | |
| } | |
| roomBroadcast(roomid: RoomID, message: string) { | |
| for (const worker of PM.workers) { | |
| void worker.stream.write(`#${roomid}\n${message}`); | |
| } | |
| } | |
| roomAdd(worker: StreamWorker, roomid: RoomID, socketid: string) { | |
| void worker.stream.write(`+${roomid}\n${socketid}`); | |
| } | |
| roomRemove(worker: StreamWorker, roomid: RoomID, socketid: string) { | |
| void worker.stream.write(`-${roomid}\n${socketid}`); | |
| } | |
| channelBroadcast(roomid: RoomID, message: string) { | |
| for (const worker of PM.workers) { | |
| void worker.stream.write(`:${roomid}\n${message}`); | |
| } | |
| } | |
| channelMove(worker: StreamWorker, roomid: RoomID, channelid: ChannelID, socketid: string) { | |
| void worker.stream.write(`.${roomid}\n${channelid}\n${socketid}`); | |
| } | |
| eval(worker: StreamWorker, query: string) { | |
| void worker.stream.write(`$${query}`); | |
| } | |
| }; | |
| export class ServerStream extends Streams.ObjectReadWriteStream<string> { | |
| /** socketid:Connection */ | |
| sockets = new Map<string, import('sockjs').Connection>(); | |
| /** roomid:socketid:Connection */ | |
| rooms = new Map<RoomID, Map<string, import('sockjs').Connection>>(); | |
| /** roomid:socketid:channelid */ | |
| roomChannels = new Map<RoomID, Map<string, ChannelID>>(); | |
| server: http.Server; | |
| serverSsl: https.Server | null; | |
| socketCounter = 0; | |
| isTrustedProxyIp: (ip: string) => boolean; | |
| receivers: { [k: string]: (this: ServerStream, data: string) => void } = { | |
| '$'(data) { | |
| // $code | |
| // eslint-disable-next-line no-eval | |
| eval(data.substr(1)); | |
| }, | |
| '!'(data) { | |
| // !socketid | |
| // destroy | |
| const socketid = data.substr(1); | |
| const socket = this.sockets.get(socketid); | |
| if (!socket) return; | |
| socket.destroy(); | |
| this.sockets.delete(socketid); | |
| for (const [curRoomid, curRoom] of this.rooms) { | |
| curRoom.delete(socketid); | |
| const roomChannel = this.roomChannels.get(curRoomid); | |
| if (roomChannel) roomChannel.delete(socketid); | |
| if (!curRoom.size) { | |
| this.rooms.delete(curRoomid); | |
| if (roomChannel) this.roomChannels.delete(curRoomid); | |
| } | |
| } | |
| }, | |
| '>'(data) { | |
| // >socketid, message | |
| // message to single connection | |
| const nlLoc = data.indexOf('\n'); | |
| const socketid = data.substr(1, nlLoc - 1); | |
| const socket = this.sockets.get(socketid); | |
| if (!socket) return; | |
| const message = data.substr(nlLoc + 1); | |
| socket.write(message); | |
| }, | |
| '#'(data) { | |
| // #roomid, message | |
| // message to all connections in room | |
| // #, message | |
| // message to all connections | |
| const nlLoc = data.indexOf('\n'); | |
| const roomid = data.substr(1, nlLoc - 1) as RoomID; | |
| const room = roomid ? this.rooms.get(roomid) : this.sockets; | |
| if (!room) return; | |
| const message = data.substr(nlLoc + 1); | |
| for (const curSocket of room.values()) curSocket.write(message); | |
| }, | |
| '+'(data) { | |
| // +roomid, socketid | |
| // join room with connection | |
| const nlLoc = data.indexOf('\n'); | |
| const socketid = data.substr(nlLoc + 1); | |
| const socket = this.sockets.get(socketid); | |
| if (!socket) return; | |
| const roomid = data.substr(1, nlLoc - 1) as RoomID; | |
| let room = this.rooms.get(roomid); | |
| if (!room) { | |
| room = new Map(); | |
| this.rooms.set(roomid, room); | |
| } | |
| room.set(socketid, socket); | |
| }, | |
| '-'(data) { | |
| // -roomid, socketid | |
| // leave room with connection | |
| const nlLoc = data.indexOf('\n'); | |
| const roomid = data.slice(1, nlLoc) as RoomID; | |
| const room = this.rooms.get(roomid); | |
| if (!room) return; | |
| const socketid = data.slice(nlLoc + 1); | |
| room.delete(socketid); | |
| const roomChannel = this.roomChannels.get(roomid); | |
| if (roomChannel) roomChannel.delete(socketid); | |
| if (!room.size) { | |
| this.rooms.delete(roomid); | |
| if (roomChannel) this.roomChannels.delete(roomid); | |
| } | |
| }, | |
| '.'(data) { | |
| // .roomid, channelid, socketid | |
| // move connection to different channel in room | |
| const nlLoc = data.indexOf('\n'); | |
| const roomid = data.slice(1, nlLoc) as RoomID; | |
| const nlLoc2 = data.indexOf('\n', nlLoc + 1); | |
| const channelid = Number(data.slice(nlLoc + 1, nlLoc2)) as ChannelID; | |
| const socketid = data.slice(nlLoc2 + 1); | |
| let roomChannel = this.roomChannels.get(roomid); | |
| if (!roomChannel) { | |
| roomChannel = new Map(); | |
| this.roomChannels.set(roomid, roomChannel); | |
| } | |
| if (channelid === 0) { | |
| roomChannel.delete(socketid); | |
| } else { | |
| roomChannel.set(socketid, channelid); | |
| } | |
| }, | |
| ':'(data) { | |
| // :roomid, message | |
| // message to a room, splitting `|split` by channel | |
| const nlLoc = data.indexOf('\n'); | |
| const roomid = data.slice(1, nlLoc) as RoomID; | |
| const room = this.rooms.get(roomid); | |
| if (!room) return; | |
| const messages: [string | null, string | null, string | null, string | null, string | null] = [ | |
| null, null, null, null, null, | |
| ]; | |
| const message = data.substr(nlLoc + 1); | |
| const channelMessages = extractChannelMessages(message, [0, 1, 2, 3, 4]); | |
| const roomChannel = this.roomChannels.get(roomid); | |
| for (const [curSocketid, curSocket] of room) { | |
| const channelid = roomChannel?.get(curSocketid) || 0; | |
| if (!messages[channelid]) messages[channelid] = channelMessages[channelid].join('\n'); | |
| curSocket.write(messages[channelid]); | |
| } | |
| }, | |
| }; | |
| constructor(config: { | |
| port: number, | |
| bindaddress?: string, | |
| ssl?: typeof Config.ssl, | |
| wsdeflate?: typeof Config.wsdeflate, | |
| proxyip?: typeof Config.proxyip, | |
| customhttpresponse?: typeof Config.customhttpresponse, | |
| disablenodestatic?: boolean, | |
| }) { | |
| super(); | |
| if (!config.bindaddress) config.bindaddress = '0.0.0.0'; | |
| this.isTrustedProxyIp = config.proxyip ? IPTools.checker(config.proxyip) : () => false; | |
| // Static HTTP server | |
| // This handles the custom CSS and custom avatar features, and also | |
| // redirects yourserver:8001 to yourserver-8001.psim.us | |
| // It's optional if you don't need these features. | |
| this.server = http.createServer(); | |
| this.serverSsl = null; | |
| if (config.ssl) { | |
| let key; | |
| try { | |
| key = path.resolve(__dirname, config.ssl.options.key); | |
| if (!fs.statSync(key).isFile()) throw new Error(); | |
| try { | |
| key = fs.readFileSync(key); | |
| } catch (e: any) { | |
| crashlogger( | |
| new Error(`Failed to read the configured SSL private key PEM file:\n${e.stack}`), | |
| `Socket process ${process.pid}` | |
| ); | |
| } | |
| } catch { | |
| console.warn('SSL private key config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.'); | |
| key = config.ssl.options.key; | |
| } | |
| let cert; | |
| try { | |
| cert = path.resolve(__dirname, config.ssl.options.cert); | |
| if (!fs.statSync(cert).isFile()) throw new Error(); | |
| try { | |
| cert = fs.readFileSync(cert); | |
| } catch (e: any) { | |
| crashlogger( | |
| new Error(`Failed to read the configured SSL certificate PEM file:\n${e.stack}`), | |
| `Socket process ${process.pid}` | |
| ); | |
| } | |
| } catch { | |
| console.warn('SSL certificate config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.'); | |
| cert = config.ssl.options.cert; | |
| } | |
| if (key && cert) { | |
| try { | |
| // In case there are additional SSL config settings besides the key and cert... | |
| this.serverSsl = https.createServer({ ...config.ssl.options, key, cert }); | |
| } catch (e: any) { | |
| crashlogger(new Error(`The SSL settings are misconfigured:\n${e.stack}`), `Socket process ${process.pid}`); | |
| } | |
| } | |
| } | |
| // Static server | |
| try { | |
| if (config.disablenodestatic) throw new Error("disablenodestatic"); | |
| const StaticServer: typeof import('node-static').Server = require('node-static').Server; | |
| const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/; | |
| const cssServer = new StaticServer('./config'); | |
| const avatarServer = new StaticServer('./config/avatars'); | |
| const staticServer = new StaticServer('./server/static'); | |
| const staticRequestHandler = (req: http.IncomingMessage, res: http.ServerResponse) => { | |
| // console.log(`static rq: ${req.socket.remoteAddress}:${req.socket.remotePort} -> ${req.socket.localAddress}:${req.socket.localPort} - ${req.method} ${req.url} ${req.httpVersion} - ${req.rawHeaders.join('|')}`); | |
| req.resume(); | |
| req.addListener('end', () => { | |
| if (config.customhttpresponse?.(req, res)) { | |
| return; | |
| } | |
| let server = staticServer; | |
| if (req.url) { | |
| if (req.url === '/custom.css') { | |
| server = cssServer; | |
| } else if (req.url.startsWith('/avatars/')) { | |
| req.url = req.url.substr(8); | |
| server = avatarServer; | |
| } else if (roomidRegex.test(req.url)) { | |
| req.url = '/'; | |
| } | |
| } | |
| server.serve(req, res, e => { | |
| if (e && (e as any).status === 404) { | |
| staticServer.serveFile('404.html', 404, {}, req, res); | |
| } | |
| }); | |
| }); | |
| }; | |
| this.server.on('request', staticRequestHandler); | |
| if (this.serverSsl) this.serverSsl.on('request', staticRequestHandler); | |
| } catch (e: any) { | |
| if (e.message === 'disablenodestatic') { | |
| console.log('node-static is disabled'); | |
| } else { | |
| console.log('Could not start node-static - try `npm install` if you want to use it'); | |
| } | |
| } | |
| // SockJS server | |
| // This is the main server that handles users connecting to our server | |
| // and doing things on our server. | |
| const sockjs: typeof import('sockjs') = (require as any)('sockjs'); | |
| const options: import('sockjs').ServerOptions & { faye_server_options?: { [key: string]: any } } = { | |
| sockjs_url: `//play.pokemonshowdown.com/js/lib/sockjs-1.4.0-nwjsfix.min.js`, | |
| prefix: '/showdown', | |
| log(severity: string, message: string) { | |
| if (severity === 'error') console.log(`ERROR: ${message}`); | |
| }, | |
| }; | |
| if (config.wsdeflate !== null) { | |
| try { | |
| const deflate = (require as any)('permessage-deflate').configure(config.wsdeflate); | |
| options.faye_server_options = { extensions: [deflate] }; | |
| } catch { | |
| crashlogger( | |
| new Error("Dependency permessage-deflate is not installed or is otherwise unaccessable. No message compression will take place until server restart."), | |
| "Sockets" | |
| ); | |
| } | |
| } | |
| const server = sockjs.createServer(options); | |
| process.once('disconnect', () => this.cleanup()); | |
| process.once('exit', () => this.cleanup()); | |
| // this is global so it can be hotpatched if necessary | |
| server.on('connection', connection => this.onConnection(connection)); | |
| server.installHandlers(this.server, {}); | |
| this.server.listen(config.port, config.bindaddress); | |
| console.log(`Worker ${PM.workerid} now listening on ${config.bindaddress}:${config.port}`); | |
| if (this.serverSsl) { | |
| server.installHandlers(this.serverSsl, {}); | |
| // @ts-expect-error if appssl exists, then `config.ssl` must also exist | |
| this.serverSsl.listen(config.ssl.port, config.bindaddress); | |
| // @ts-expect-error if appssl exists, then `config.ssl` must also exist | |
| console.log(`Worker ${PM.workerid} now listening for SSL on port ${config.ssl.port}`); | |
| } | |
| console.log(`Test your server at http://${config.bindaddress === '0.0.0.0' ? 'localhost' : config.bindaddress}:${config.port}`); | |
| } | |
| /** | |
| * Clean up any remaining connections on disconnect. If this isn't done, | |
| * the process will not exit until any remaining connections have been destroyed. | |
| * Afterwards, the worker process will die on its own | |
| */ | |
| cleanup() { | |
| for (const socket of this.sockets.values()) { | |
| try { | |
| socket.destroy(); | |
| } catch {} | |
| } | |
| this.sockets.clear(); | |
| this.rooms.clear(); | |
| this.roomChannels.clear(); | |
| this.server.close(); | |
| if (this.serverSsl) this.serverSsl.close(); | |
| // Let the server(s) finish closing. | |
| setImmediate(() => process.exit(0)); | |
| } | |
| onConnection(socket: import('sockjs').Connection) { | |
| // For reasons that are not entirely clear, SockJS sometimes triggers | |
| // this event with a null `socket` argument. | |
| if (!socket) return; | |
| if (!socket.remoteAddress) { | |
| // SockJS sometimes fails to be able to cache the IP, port, and | |
| // address from connection request headers. | |
| try { | |
| socket.destroy(); | |
| } catch {} | |
| return; | |
| } | |
| const socketid = `${++this.socketCounter}`; | |
| this.sockets.set(socketid, socket); | |
| let socketip = socket.remoteAddress; | |
| if (this.isTrustedProxyIp(socketip)) { | |
| const ips = (socket.headers['x-forwarded-for'] || '').split(',').reverse(); | |
| for (const ip of ips) { | |
| const proxy = ip.trim(); | |
| if (!this.isTrustedProxyIp(proxy)) { | |
| socketip = proxy; | |
| break; | |
| } | |
| } | |
| } | |
| this.push(`*${socketid}\n${socketip}\n${socket.protocol}`); | |
| socket.on('data', message => { | |
| // drop empty messages (DDoS?) | |
| if (!message) return; | |
| // drop messages over 100KB | |
| if (message.length > (100 * 1024)) { | |
| socket.write(`|popup|Your message must be below 100KB`); | |
| console.log(`Dropping client message ${message.length / 1024} KB...`); | |
| console.log(message.slice(0, 160)); | |
| return; | |
| } | |
| // drop legacy JSON messages | |
| if (typeof message !== 'string' || message.startsWith('{')) return; | |
| // drop blank messages (DDoS?) | |
| const pipeIndex = message.indexOf('|'); | |
| if (pipeIndex < 0 || pipeIndex === message.length - 1) return; | |
| this.push(`<${socketid}\n${message}`); | |
| }); | |
| socket.once('close', () => { | |
| this.push(`!${socketid}`); | |
| this.sockets.delete(socketid); | |
| for (const room of this.rooms.values()) room.delete(socketid); | |
| }); | |
| } | |
| _write(data: string) { | |
| // console.log('worker received: ' + data); | |
| const receiver = this.receivers[data.charAt(0)]; | |
| if (receiver) receiver.call(this, data); | |
| } | |
| } | |
| /********************************************************* | |
| * Process manager | |
| *********************************************************/ | |
| export const PM = new ProcessManager.RawProcessManager({ | |
| module, | |
| setupChild: () => new ServerStream(Config), | |
| isCluster: true, | |
| }); | |
| if (!PM.isParentProcess) { | |
| // This is a child process! | |
| global.Config = (require as any)('./config-loader').Config; | |
| if (Config.crashguard) { | |
| // graceful crash - allow current battles to finish before restarting | |
| process.on('uncaughtException', err => { | |
| crashlogger(err, `Socket process ${PM.workerid} (${process.pid})`); | |
| }); | |
| process.on('unhandledRejection', err => { | |
| crashlogger(err as any || {}, `Socket process ${PM.workerid} (${process.pid}) Promise`); | |
| }); | |
| } | |
| if (Config.sockets) { | |
| try { | |
| require.resolve('node-oom-heapdump'); | |
| } catch (e: any) { | |
| if (e.code !== 'MODULE_NOT_FOUND') throw e; // should never happen | |
| throw new Error( | |
| 'node-oom-heapdump is not installed, but it is a required dependency if Config.ofesockets is set to true! ' + | |
| 'Run npm install node-oom-heapdump and restart the server.' | |
| ); | |
| } | |
| // Create a heapdump if the process runs out of memory. | |
| (global as any).nodeOomHeapdump = (require as any)('node-oom-heapdump')({ | |
| addTimestamp: true, | |
| }); | |
| } | |
| // setup worker | |
| if (process.env.PSPORT) Config.port = +process.env.PSPORT; | |
| if (process.env.PSBINDADDR) Config.bindaddress = process.env.PSBINDADDR; | |
| if (process.env.PSNOSSL && parseInt(process.env.PSNOSSL)) Config.ssl = null; | |
| // eslint-disable-next-line no-eval | |
| Repl.start(`sockets-${PM.workerid}-${process.pid}`, cmd => eval(cmd)); | |
| } | |