|
|
import { Logger } from '@config/logger.config'; |
|
|
import { BaileysEventMap, MessageUpsertType, WAMessage } from 'baileys'; |
|
|
import { catchError, concatMap, delay, EMPTY, from, retryWhen, Subject, Subscription, take, tap } from 'rxjs'; |
|
|
|
|
|
type MessageUpsertPayload = BaileysEventMap['messages.upsert']; |
|
|
type MountProps = { |
|
|
onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>; |
|
|
}; |
|
|
|
|
|
export class BaileysMessageProcessor { |
|
|
private processorLogs = new Logger('BaileysMessageProcessor'); |
|
|
private subscription?: Subscription; |
|
|
|
|
|
protected messageSubject = new Subject<{ |
|
|
messages: WAMessage[]; |
|
|
type: MessageUpsertType; |
|
|
requestId?: string; |
|
|
settings: any; |
|
|
}>(); |
|
|
|
|
|
mount({ onMessageReceive }: MountProps) { |
|
|
this.subscription = this.messageSubject |
|
|
.pipe( |
|
|
tap(({ messages }) => { |
|
|
this.processorLogs.log(`Processing batch of ${messages.length} messages`); |
|
|
}), |
|
|
concatMap(({ messages, type, requestId, settings }) => |
|
|
from(onMessageReceive({ messages, type, requestId }, settings)).pipe( |
|
|
retryWhen((errors) => |
|
|
errors.pipe( |
|
|
tap((error) => this.processorLogs.warn(`Retrying message batch due to error: ${error.message}`)), |
|
|
delay(1000), |
|
|
take(3), |
|
|
), |
|
|
), |
|
|
), |
|
|
), |
|
|
catchError((error) => { |
|
|
this.processorLogs.error(`Error processing message batch: ${error}`); |
|
|
return EMPTY; |
|
|
}), |
|
|
) |
|
|
.subscribe({ |
|
|
error: (error) => { |
|
|
this.processorLogs.error(`Message stream error: ${error}`); |
|
|
}, |
|
|
}); |
|
|
} |
|
|
|
|
|
processMessage(payload: MessageUpsertPayload, settings: any) { |
|
|
const { messages, type, requestId } = payload; |
|
|
this.messageSubject.next({ messages, type, requestId, settings }); |
|
|
} |
|
|
|
|
|
onDestroy() { |
|
|
this.subscription?.unsubscribe(); |
|
|
this.messageSubject.complete(); |
|
|
} |
|
|
} |
|
|
|