waha / src /core /manager.core.ts
NitinBot002's picture
Upload 384 files
4327358 verified
import {
Inject,
Injectable,
NotFoundException,
OnModuleInit,
UnprocessableEntityException,
} from '@nestjs/common';
import {
AppsService,
IAppsService,
} from '@waha/apps/app_sdk/services/IAppsService';
import { EngineBootstrap } from '@waha/core/abc/EngineBootstrap';
import { GowsEngineConfigService } from '@waha/core/config/GowsEngineConfigService';
import { WebJSEngineConfigService } from '@waha/core/config/WebJSEngineConfigService';
import { WhatsappSessionGoWSCore } from '@waha/core/engines/gows/session.gows.core';
import { WebhookConductor } from '@waha/core/integrations/webhooks/WebhookConductor';
import { MediaStorageFactory } from '@waha/core/media/MediaStorageFactory';
import { DefaultMap } from '@waha/utils/DefaultMap';
import { getPinoLogLevel, LoggerBuilder } from '@waha/utils/logging';
import { promiseTimeout, sleep } from '@waha/utils/promiseTimeout';
import { complete } from '@waha/utils/reactive/complete';
import { SwitchObservable } from '@waha/utils/reactive/SwitchObservable';
import { PinoLogger } from 'nestjs-pino';
import { Observable, retry, share } from 'rxjs';
import { map } from 'rxjs/operators';
import { WhatsappConfigService } from '../config.service';
import {
WAHAEngine,
WAHAEvents,
WAHASessionStatus,
} from '../structures/enums.dto';
import {
ProxyConfig,
SessionConfig,
SessionDetailedInfo,
SessionDTO,
SessionInfo,
} from '../structures/sessions.dto';
import { WebhookConfig } from '../structures/webhooks.config.dto';
import { populateSessionInfo, SessionManager } from './abc/manager.abc';
import { SessionParams, WhatsappSession } from './abc/session.abc';
import { EngineConfigService } from './config/EngineConfigService';
import { WhatsappSessionNoWebCore } from './engines/noweb/session.noweb.core';
import { WhatsappSessionWebJSCore } from './engines/webjs/session.webjs.core';
import { DOCS_URL } from './exceptions';
import { getProxyConfig } from './helpers.proxy';
import { MediaManager } from './media/MediaManager';
import { LocalSessionAuthRepository } from './storage/LocalSessionAuthRepository';
import { LocalStoreCore } from './storage/LocalStoreCore';
export class OnlyDefaultSessionIsAllowed extends UnprocessableEntityException {
constructor(name: string) {
const encoded = Buffer.from(name, 'utf-8').toString('base64');
super(
`WAHA Core support only 'default' session. You tried to access '${name}' session (base64: ${encoded}). ` +
`If you want to run more then one WhatsApp account - please get WAHA PLUS version. Check this out: ${DOCS_URL}`,
);
}
}
enum DefaultSessionStatus {
REMOVED = undefined,
STOPPED = null,
}
@Injectable()
export class SessionManagerCore extends SessionManager implements OnModuleInit {
SESSION_STOP_TIMEOUT = 3000;
// session - exists and running (or failed or smth)
// null - stopped
// undefined - removed
private session: WhatsappSession | DefaultSessionStatus;
private sessionConfig?: SessionConfig;
DEFAULT = 'default';
protected readonly EngineClass: typeof WhatsappSession;
protected events2: DefaultMap<WAHAEvents, SwitchObservable<any>>;
protected readonly engineBootstrap: EngineBootstrap;
constructor(
config: WhatsappConfigService,
private engineConfigService: EngineConfigService,
private webjsEngineConfigService: WebJSEngineConfigService,
gowsConfigService: GowsEngineConfigService,
log: PinoLogger,
private mediaStorageFactory: MediaStorageFactory,
@Inject(AppsService)
appsService: IAppsService,
) {
super(log, config, gowsConfigService, appsService);
this.session = DefaultSessionStatus.STOPPED;
this.sessionConfig = null;
const engineName = this.engineConfigService.getDefaultEngineName();
this.EngineClass = this.getEngine(engineName);
this.engineBootstrap = this.getEngineBootstrap(engineName);
this.events2 = new DefaultMap<WAHAEvents, SwitchObservable<any>>(
(key) =>
new SwitchObservable((obs$) => {
return obs$.pipe(retry(), share());
}),
);
this.store = new LocalStoreCore(engineName.toLowerCase());
this.sessionAuthRepository = new LocalSessionAuthRepository(this.store);
this.clearStorage().catch((error) => {
this.log.error({ error }, 'Error while clearing storage');
});
}
protected getEngine(engine: WAHAEngine): typeof WhatsappSession {
if (engine === WAHAEngine.WEBJS) {
return WhatsappSessionWebJSCore;
} else if (engine === WAHAEngine.NOWEB) {
return WhatsappSessionNoWebCore;
} else if (engine === WAHAEngine.GOWS) {
return WhatsappSessionGoWSCore;
} else {
throw new NotFoundException(`Unknown whatsapp engine '${engine}'.`);
}
}
private onlyDefault(name: string) {
if (name !== this.DEFAULT) {
throw new OnlyDefaultSessionIsAllowed(name);
}
}
async beforeApplicationShutdown(signal?: string) {
if (this.session) {
await this.stop(this.DEFAULT, true);
}
this.stopEvents();
await this.engineBootstrap.shutdown();
}
async onApplicationBootstrap() {
await this.engineBootstrap.bootstrap();
this.startPredefinedSessions();
}
private async clearStorage() {
const storage = await this.mediaStorageFactory.build(
'all',
this.log.logger.child({ name: 'Storage' }),
);
await storage.purge();
}
//
// API Methods
//
async exists(name: string): Promise<boolean> {
this.onlyDefault(name);
return this.session !== DefaultSessionStatus.REMOVED;
}
isRunning(name: string): boolean {
this.onlyDefault(name);
return !!this.session;
}
async upsert(name: string, config?: SessionConfig): Promise<void> {
this.onlyDefault(name);
this.sessionConfig = config;
}
async start(name: string): Promise<SessionDTO> {
this.onlyDefault(name);
if (this.session) {
throw new UnprocessableEntityException(
`Session '${this.DEFAULT}' is already started.`,
);
}
this.log.info({ session: name }, `Starting session...`);
const logger = this.log.logger.child({ session: name });
logger.level = getPinoLogLevel(this.sessionConfig?.debug);
const loggerBuilder: LoggerBuilder = logger;
const storage = await this.mediaStorageFactory.build(
name,
loggerBuilder.child({ name: 'Storage' }),
);
await storage.init();
const mediaManager = new MediaManager(
storage,
this.config.mimetypes,
loggerBuilder.child({ name: 'MediaManager' }),
);
const webhook = new WebhookConductor(loggerBuilder);
const proxyConfig = this.getProxyConfig();
const sessionConfig: SessionParams = {
name,
mediaManager,
loggerBuilder,
printQR: this.engineConfigService.shouldPrintQR,
sessionStore: this.store,
proxyConfig: proxyConfig,
sessionConfig: this.sessionConfig,
ignore: this.ignoreChatsConfig(this.sessionConfig),
};
if (this.EngineClass === WhatsappSessionWebJSCore) {
sessionConfig.engineConfig = this.webjsEngineConfigService.getConfig();
} else if (this.EngineClass === WhatsappSessionGoWSCore) {
sessionConfig.engineConfig = this.gowsConfigService.getConfig();
}
await this.sessionAuthRepository.init(name);
// @ts-ignore
const session = new this.EngineClass(sessionConfig);
this.session = session;
this.updateSession();
// configure webhooks
const webhooks = this.getWebhooks();
webhook.configure(session, webhooks);
// Apps
await this.configureApps(session);
// start session
await session.start();
logger.info('Session has been started.');
return {
name: session.name,
status: session.status,
config: session.sessionConfig,
};
}
private updateSession() {
if (!this.session) {
return;
}
const session: WhatsappSession = this.session as WhatsappSession;
for (const eventName in WAHAEvents) {
const event = WAHAEvents[eventName];
const stream$ = session
.getEventObservable(event)
.pipe(map(populateSessionInfo(event, session)));
this.events2.get(event).switch(stream$);
}
}
getSessionEvent(session: string, event: WAHAEvents): Observable<any> {
return this.events2.get(event);
}
async stop(name: string, silent: boolean): Promise<void> {
this.onlyDefault(name);
if (!this.isRunning(name)) {
this.log.debug({ session: name }, `Session is not running.`);
return;
}
this.log.info({ session: name }, `Stopping session...`);
try {
const session = this.getSession(name);
await session.stop();
} catch (err) {
this.log.warn(`Error while stopping session '${name}'`);
if (!silent) {
throw err;
}
}
this.log.info({ session: name }, `Session has been stopped.`);
this.session = DefaultSessionStatus.STOPPED;
this.updateSession();
await sleep(this.SESSION_STOP_TIMEOUT);
}
async unpair(name: string) {
if (!this.session) {
return;
}
const session = this.session as WhatsappSession;
this.log.info({ session: name }, 'Unpairing the device from account...');
await session.unpair().catch((err) => {
this.log.warn(`Error while unpairing from device: ${err}`);
});
await sleep(1000);
}
async logout(name: string): Promise<void> {
this.onlyDefault(name);
await this.sessionAuthRepository.clean(name);
}
async delete(name: string): Promise<void> {
this.onlyDefault(name);
this.session = DefaultSessionStatus.REMOVED;
this.updateSession();
this.sessionConfig = undefined;
}
/**
* Combine per session and global webhooks
*/
private getWebhooks() {
let webhooks: WebhookConfig[] = [];
if (this.sessionConfig?.webhooks) {
webhooks = webhooks.concat(this.sessionConfig.webhooks);
}
const globalWebhookConfig = this.config.getWebhookConfig();
if (globalWebhookConfig) {
webhooks.push(globalWebhookConfig);
}
return webhooks;
}
/**
* Get either session's or global proxy if defined
*/
protected getProxyConfig(): ProxyConfig | undefined {
if (this.sessionConfig?.proxy) {
return this.sessionConfig.proxy;
}
if (!this.session) {
return undefined;
}
const sessions = { [this.DEFAULT]: this.session as WhatsappSession };
return getProxyConfig(this.config, sessions, this.DEFAULT);
}
getSession(name: string): WhatsappSession {
this.onlyDefault(name);
const session = this.session;
if (!session) {
throw new NotFoundException(
`We didn't find a session with name '${name}'.\n` +
`Please start it first by using POST /api/sessions/${name}/start request`,
);
}
return session as WhatsappSession;
}
async getSessions(all: boolean): Promise<SessionInfo[]> {
if (this.session === DefaultSessionStatus.STOPPED && all) {
return [
{
name: this.DEFAULT,
status: WAHASessionStatus.STOPPED,
config: this.sessionConfig,
me: null,
},
];
}
if (this.session === DefaultSessionStatus.REMOVED && all) {
return [];
}
if (!this.session && !all) {
return [];
}
const session = this.session as WhatsappSession;
const me = session?.getSessionMeInfo();
return [
{
name: session.name,
status: session.status,
config: session.sessionConfig,
me: me,
},
];
}
private async fetchEngineInfo() {
const session = this.session as WhatsappSession;
// Get engine info
let engineInfo = {};
if (session) {
try {
engineInfo = await promiseTimeout(1000, session.getEngineInfo());
} catch (error) {
this.log.debug(
{ session: session.name, error: `${error}` },
'Can not get engine info',
);
}
}
const engine = {
engine: session?.engine,
...engineInfo,
};
return engine;
}
async getSessionInfo(name: string): Promise<SessionDetailedInfo | null> {
this.onlyDefault(name);
const sessions = await this.getSessions(true);
if (sessions.length === 0) {
return null;
}
const session = sessions[0];
const engine = await this.fetchEngineInfo();
return { ...session, engine: engine };
}
protected stopEvents() {
complete(this.events2);
}
async onModuleInit() {
await this.init();
}
async init() {
await this.store.init();
const knex = this.store.getWAHADatabase();
await this.appsService.migrate(knex);
}
}