Spaces:
Runtime error
Runtime error
File size: 2,608 Bytes
4327358 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { ListenEventsForChatWoot } from '@waha/apps/chatwoot/consumers/waha/base';
import { populateSessionInfo } from '@waha/core/abc/manager.abc';
import { WhatsappSession } from '@waha/core/abc/session.abc';
import { WAHAEvents } from '@waha/structures/enums.dto';
import { Queue } from 'bullmq';
import { QueueName } from '../consumers/QueueName';
/**
 * Service for managing ChatWoot queues for WAHA events.
 *
 * This service is used to avoid cycle dependency between ChatWoot module
 * and SessionManager: it owns the injected queues and wires a session's
 * event observables to them.
 */
@Injectable()
export class ChatWootWAHAQueueService {
  private readonly logger = new Logger(ChatWootWAHAQueueService.name);

  constructor(
    @InjectQueue(QueueName.WAHA_MESSAGE_ANY)
    private readonly queueMessageAny: Queue,
    @InjectQueue(QueueName.WAHA_MESSAGE_REACTION)
    private readonly queueMessageReaction: Queue,
    @InjectQueue(QueueName.WAHA_MESSAGE_EDITED)
    private readonly queueMessageEdited: Queue,
    @InjectQueue(QueueName.WAHA_MESSAGE_REVOKED)
    private readonly queueMessageRevoked: Queue,
    @InjectQueue(QueueName.WAHA_SESSION_STATUS)
    private readonly queueSessionStatus: Queue,
  ) {}

  /**
   * Get the specific queue for an event.
   *
   * @param event The event to get the queue for
   * @returns The queue for the event, or null if there is no specific queue
   */
  private getQueueForEvent(event: WAHAEvents): Queue | null {
    switch (event) {
      case WAHAEvents.MESSAGE_ANY:
        return this.queueMessageAny;
      case WAHAEvents.MESSAGE_REACTION:
        return this.queueMessageReaction;
      case WAHAEvents.MESSAGE_EDITED:
        return this.queueMessageEdited;
      case WAHAEvents.MESSAGE_REVOKED:
        return this.queueMessageRevoked;
      case WAHAEvents.SESSION_STATUS:
        return this.queueSessionStatus;
      default:
        return null;
    }
  }

  /**
   * Add a job to the queue that corresponds to an event.
   *
   * The job is named after `data.event` and carries `{ app, event }` as its
   * payload. Events that have no dedicated queue are skipped without error.
   *
   * @param event The event type used to select the target queue
   * @param data The event body to enqueue; its `event` field is the job name
   * @param appId Identifier of the app, stored in the job under `app`
   */
  private async addJobToQueue(
    event: WAHAEvents,
    data: any,
    appId: string,
  ): Promise<void> {
    const queue = this.getQueueForEvent(event);
    if (queue) {
      await queue.add(data.event, { app: appId, event: data });
    }
  }

  /**
   * Configure ChatWoot event handling for a session.
   *
   * Subscribes to the session's observable for every event returned by
   * `ListenEventsForChatWoot()` and enqueues each emitted payload. The
   * subscriptions are neither stored nor returned by this method.
   *
   * @param appId Identifier of the app the enqueued jobs belong to
   * @param session The session whose events are forwarded to the queues
   */
  listenEvents(appId: string, session: WhatsappSession): void {
    const events = ListenEventsForChatWoot();
    for (const event of events) {
      const obs$ = session.getEventObservable(event);
      obs$.subscribe(async (payload) => {
        // Nothing awaits the promise returned by this async handler, so a
        // failure here would otherwise surface as an unhandled promise
        // rejection. Catch and log it instead, keeping the subscription
        // usable for subsequent events.
        try {
          const data = populateSessionInfo(event, session)(payload);
          await this.addJobToQueue(event, data, appId);
        } catch (error) {
          this.logger.error(
            `Failed to enqueue '${event}' event for app '${appId}'`,
            error instanceof Error ? error.stack : String(error),
          );
        }
      });
    }
  }
}
|