import { Queue } from "bullmq"; import { logger } from "../lib/logger"; import IORedis from "ioredis"; export type QueueFunction = () => Queue; let scrapeQueue: Queue; let extractQueue: Queue; let loggingQueue: Queue; let indexQueue: Queue; let deepResearchQueue: Queue; let generateLlmsTxtQueue: Queue; let billingQueue: Queue; export const redisConnection = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, }); export const scrapeQueueName = "{scrapeQueue}"; export const extractQueueName = "{extractQueue}"; export const loggingQueueName = "{loggingQueue}"; export const indexQueueName = "{indexQueue}"; export const generateLlmsTxtQueueName = "{generateLlmsTxtQueue}"; export const deepResearchQueueName = "{deepResearchQueue}"; export const billingQueueName = "{billingQueue}"; export function getScrapeQueue() { if (!scrapeQueue) { scrapeQueue = new Queue(scrapeQueueName, { connection: redisConnection, defaultJobOptions: { removeOnComplete: { age: 3600, // 1 hour }, removeOnFail: { age: 3600, // 1 hour }, }, }); logger.info("Web scraper queue created"); } return scrapeQueue; } export function getExtractQueue() { if (!extractQueue) { extractQueue = new Queue(extractQueueName, { connection: redisConnection, defaultJobOptions: { removeOnComplete: { age: 90000, // 25 hours }, removeOnFail: { age: 90000, // 25 hours }, }, }); logger.info("Extraction queue created"); } return extractQueue; } export function getIndexQueue() { if (!indexQueue) { indexQueue = new Queue(indexQueueName, { connection: redisConnection, defaultJobOptions: { removeOnComplete: { age: 90000, // 25 hours }, removeOnFail: { age: 90000, // 25 hours }, }, }); logger.info("Index queue created"); } return indexQueue; } export function getGenerateLlmsTxtQueue() { if (!generateLlmsTxtQueue) { generateLlmsTxtQueue = new Queue(generateLlmsTxtQueueName, { connection: redisConnection, defaultJobOptions: { removeOnComplete: { age: 90000, // 25 hours }, removeOnFail: { age: 90000, // 25 hours }, }, }); logger.info("LLMs TXT generation queue created"); } return generateLlmsTxtQueue; } export function getDeepResearchQueue() { if (!deepResearchQueue) { deepResearchQueue = new Queue(deepResearchQueueName, { connection: redisConnection, defaultJobOptions: { removeOnComplete: { age: 90000, // 25 hours }, removeOnFail: { age: 90000, // 25 hours }, }, }); logger.info("Deep research queue created"); } return deepResearchQueue; } export function getBillingQueue() { if (!billingQueue) { billingQueue = new Queue(billingQueueName, { connection: redisConnection, defaultJobOptions: { removeOnComplete: { age: 90000, // 25 hours }, removeOnFail: { age: 90000, // 25 hours }, }, }); logger.info("Billing queue created"); } return billingQueue; } // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE // import { QueueEvents } from 'bullmq'; // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });