Spaces:
Runtime error
Runtime error
File size: 1,862 Bytes
4327358 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
import { JOB_DELAY, JOB_LOCK_TTL } from '@waha/apps/app_sdk/constants';
import { JobLoggerWrapper } from '@waha/apps/app_sdk/JobLoggerWrapper';
import { RMutexService } from '@waha/modules/rmutex/rmutex.service';
import { BaseWorkerHost } from '@waha/utils/bull/BaseWorkerHost';
import { Job } from 'bullmq';
import { DelayedError } from 'bullmq';
import { PinoLogger } from 'nestjs-pino';
import { Logger } from 'pino';
/**
* Base class for app consumers that provides common functionality
* like mutex locking, logging setup, and error handling.
*/
export abstract class AppConsumer extends BaseWorkerHost {
protected readonly logger: Logger;
constructor(
appName: string,
componentName: string,
log: PinoLogger,
protected readonly rmutex: RMutexService,
) {
super();
this.logger = log.logger.child({
app: appName,
component: componentName,
});
}
/**
* Executes a job with mutex locking to ensure only one job processes a specific resource at a time.
* @param job The job to process
* @param mutexKey The key to use for mutex locking
* @param processor The function that processes the job
* @returns The result of the processor function
*/
protected async withMutex<T, R>(
job: Job<T, R, any>,
mutexKey: string,
processor: () => Promise<R>,
): Promise<R> {
const mutex = this.rmutex.get(mutexKey, JOB_LOCK_TTL);
const lock = await mutex.lock();
if (!lock) {
const logger = new JobLoggerWrapper(job, this.logger);
logger.debug(
`Postponing job '${job.id}' for ${JOB_DELAY}ms, another job is already running the mutex.key='${mutexKey}'`,
);
await job.moveToDelayed(Date.now() + JOB_DELAY);
throw new DelayedError();
}
try {
return await processor();
} finally {
await lock.release();
}
}
}
|