Spaces:
Sleeping
Sleeping
| ; | |
| const net = require('net'), | |
| tls = require('tls'), | |
| EventParser = require('../entities/EventParser.js'), | |
| Message = require('js-message'), | |
| fs = require('fs'), | |
| Queue = require('@node-ipc/js-queue'), | |
| Events = require('event-pubsub'); | |
| let eventParser = new EventParser(); | |
| class Client extends Events{ | |
| constructor(config,log){ | |
| super(); | |
| Object.assign( | |
| this, | |
| { | |
| Client : Client, | |
| config : config, | |
| queue : new Queue, | |
| socket : false, | |
| connect : connect, | |
| emit : emit, | |
| log : log, | |
| retriesRemaining:config.maxRetries||0, | |
| explicitlyDisconnected: false | |
| } | |
| ); | |
| eventParser=new EventParser(this.config); | |
| } | |
| } | |
| function emit(type,data){ | |
| this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data); | |
| let message=new Message; | |
| message.type=type; | |
| message.data=data; | |
| if(this.config.rawBuffer){ | |
| message=Buffer.from(type,this.config.encoding); | |
| }else{ | |
| message=eventParser.format(message); | |
| } | |
| if(!this.config.sync){ | |
| this.socket.write(message); | |
| return; | |
| } | |
| this.queue.add( | |
| syncEmit.bind(this,message) | |
| ); | |
| } | |
| function syncEmit(message){ | |
| this.log('dispatching event to ', this.id, this.path, ' : ', message); | |
| this.socket.write(message); | |
| } | |
| function connect(){ | |
| //init client object for scope persistance especially inside of socket events. | |
| let client=this; | |
| client.log('requested connection to ', client.id, client.path); | |
| if(!this.path){ | |
| client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.'); | |
| return; | |
| } | |
| const options={}; | |
| if(!client.port){ | |
| client.log('Connecting client on Unix Socket :', client.path); | |
| options.path=client.path; | |
| if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){ | |
| options.path = options.path.replace(/^\//, ''); | |
| options.path = options.path.replace(/\//g, '-'); | |
| options.path= `\\\\.\\pipe\\${options.path}`; | |
| } | |
| client.socket = net.connect(options); | |
| }else{ | |
| options.host=client.path; | |
| options.port=client.port; | |
| if(client.config.interface.localAddress){ | |
| options.localAddress=client.config.interface.localAddress; | |
| } | |
| if(client.config.interface.localPort){ | |
| options.localPort=client.config.interface.localPort; | |
| } | |
| if(client.config.interface.family){ | |
| options.family=client.config.interface.family; | |
| } | |
| if(client.config.interface.hints){ | |
| options.hints=client.config.interface.hints; | |
| } | |
| if(client.config.interface.lookup){ | |
| options.lookup=client.config.interface.lookup; | |
| } | |
| if(!client.config.tls){ | |
| client.log('Connecting client via TCP to', options); | |
| client.socket = net.connect(options); | |
| }else{ | |
| client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls); | |
| if(client.config.tls.private){ | |
| client.config.tls.key=fs.readFileSync(client.config.tls.private); | |
| } | |
| if(client.config.tls.public){ | |
| client.config.tls.cert=fs.readFileSync(client.config.tls.public); | |
| } | |
| if(client.config.tls.trustedConnections){ | |
| if(typeof client.config.tls.trustedConnections === 'string'){ | |
| client.config.tls.trustedConnections=[client.config.tls.trustedConnections]; | |
| } | |
| client.config.tls.ca=[]; | |
| for(let i=0; i<client.config.tls.trustedConnections.length; i++){ | |
| client.config.tls.ca.push( | |
| fs.readFileSync(client.config.tls.trustedConnections[i]) | |
| ); | |
| } | |
| } | |
| Object.assign(client.config.tls,options); | |
| client.socket = tls.connect( | |
| client.config.tls | |
| ); | |
| } | |
| } | |
| client.socket.setEncoding(this.config.encoding); | |
| client.socket.on( | |
| 'error', | |
| function(err){ | |
| client.log('\n\n######\nerror: ', err); | |
| client.publish('error', err); | |
| } | |
| ); | |
| client.socket.on( | |
| 'connect', | |
| function connectionMade(){ | |
| client.publish('connect'); | |
| client.retriesRemaining=client.config.maxRetries; | |
| client.log('retrying reset'); | |
| } | |
| ); | |
| client.socket.on( | |
| 'close', | |
| function connectionClosed(){ | |
| client.log('connection closed' ,client.id , client.path, | |
| client.retriesRemaining, 'tries remaining of', client.config.maxRetries | |
| ); | |
| if( | |
| client.config.stopRetrying || | |
| client.retriesRemaining<1 || | |
| client.explicitlyDisconnected | |
| ){ | |
| client.publish('disconnect'); | |
| client.log( | |
| (client.config.id), | |
| 'exceeded connection rety amount of', | |
| ' or stopRetrying flag set.' | |
| ); | |
| client.socket.destroy(); | |
| client.publish('destroy'); | |
| client=undefined; | |
| return; | |
| } | |
| setTimeout( | |
| function retryTimeout(){ | |
| if (client.explicitlyDisconnected) { | |
| return; | |
| } | |
| client.retriesRemaining--; | |
| client.connect(); | |
| }.bind(null,client), | |
| client.config.retry | |
| ); | |
| client.publish('disconnect'); | |
| } | |
| ); | |
| client.socket.on( | |
| 'data', | |
| function(data) { | |
| client.log('## received events ##'); | |
| if(client.config.rawBuffer){ | |
| client.publish( | |
| 'data', | |
| Buffer.from(data,client.config.encoding) | |
| ); | |
| if(!client.config.sync){ | |
| return; | |
| } | |
| client.queue.next(); | |
| return; | |
| } | |
| if(!this.ipcBuffer){ | |
| this.ipcBuffer=''; | |
| } | |
| data=(this.ipcBuffer+=data); | |
| if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){ | |
| client.log('Messages are large, You may want to consider smaller messages.'); | |
| return; | |
| } | |
| this.ipcBuffer=''; | |
| const events = eventParser.parse(data); | |
| const eCount = events.length; | |
| for(let i=0; i<eCount; i++){ | |
| let message=new Message; | |
| message.load(events[i]); | |
| client.log('detected event', message.type, message.data); | |
| client.publish( | |
| message.type, | |
| message.data | |
| ); | |
| } | |
| if(!client.config.sync){ | |
| return; | |
| } | |
| client.queue.next(); | |
| } | |
| ); | |
| } | |
| module.exports=Client; | |