File size: 2,608 Bytes
4327358
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { ListenEventsForChatWoot } from '@waha/apps/chatwoot/consumers/waha/base';
import { populateSessionInfo } from '@waha/core/abc/manager.abc';
import { WhatsappSession } from '@waha/core/abc/session.abc';
import { WAHAEvents } from '@waha/structures/enums.dto';
import { Queue } from 'bullmq';

import { QueueName } from '../consumers/QueueName';

/**
 * Service for managing ChatWoot queues for WAHA events
 * This service is used to avoid cycle dependency between ChatWoot module and SessionManager
 */
@Injectable()
export class ChatWootWAHAQueueService {
  constructor(
    @InjectQueue(QueueName.WAHA_MESSAGE_ANY)
    private readonly queueMessageAny: Queue,
    @InjectQueue(QueueName.WAHA_MESSAGE_REACTION)
    private readonly queueMessageReaction: Queue,
    @InjectQueue(QueueName.WAHA_MESSAGE_EDITED)
    private readonly queueMessageEdited: Queue,
    @InjectQueue(QueueName.WAHA_MESSAGE_REVOKED)
    private readonly queueMessageRevoked: Queue,
    @InjectQueue(QueueName.WAHA_SESSION_STATUS)
    private readonly queueSessionStatus: Queue,
  ) {}

  /**
   * Get the specific queue for an event
   * @param event The event to get the queue for
   * @returns The queue for the event, or null if there is no specific queue
   */
  private getQueueForEvent(event: WAHAEvents): Queue | null {
    switch (event) {
      case WAHAEvents.MESSAGE_ANY:
        return this.queueMessageAny;
      case WAHAEvents.MESSAGE_REACTION:
        return this.queueMessageReaction;
      case WAHAEvents.MESSAGE_EDITED:
        return this.queueMessageEdited;
      case WAHAEvents.MESSAGE_REVOKED:
        return this.queueMessageRevoked;
      case WAHAEvents.SESSION_STATUS:
        return this.queueSessionStatus;
      default:
        return null;
    }
  }

  /**
   * Add a job to the queue for an event
   */
  private async addJobToQueue(
    event: WAHAEvents,
    data: any,
    appId: string,
  ): Promise<void> {
    const queue = this.getQueueForEvent(event);
    if (queue) {
      await queue.add(data.event, { app: appId, event: data });
    }
  }

  /**
   * Configure ChatWoot event handling for a session
   */
  listenEvents(appId: string, session: WhatsappSession): void {
    const events = ListenEventsForChatWoot();
    for (const event of events) {
      const obs$ = session.getEventObservable(event);
      obs$.subscribe(async (payload) => {
        const data = populateSessionInfo(event, session)(payload);
        await this.addJobToQueue(event, data, appId);
      });
    }
  }
}