Spaces:
Paused
Paused
File size: 3,523 Bytes
0e759d2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import { Queue } from "bullmq";
import { logger } from "../lib/logger";
import IORedis from "ioredis";
export type QueueFunction = () => Queue<any, any, string, any, any, string>;
let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
let indexQueue: Queue;
let deepResearchQueue: Queue;
let generateLlmsTxtQueue: Queue;
let billingQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
});
export const scrapeQueueName = "{scrapeQueue}";
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
export const indexQueueName = "{indexQueue}";
export const generateLlmsTxtQueueName = "{generateLlmsTxtQueue}";
export const deepResearchQueueName = "{deepResearchQueue}";
export const billingQueueName = "{billingQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
scrapeQueue = new Queue(scrapeQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 3600, // 1 hour
},
removeOnFail: {
age: 3600, // 1 hour
},
},
});
logger.info("Web scraper queue created");
}
return scrapeQueue;
}
export function getExtractQueue() {
if (!extractQueue) {
extractQueue = new Queue(extractQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Extraction queue created");
}
return extractQueue;
}
export function getIndexQueue() {
if (!indexQueue) {
indexQueue = new Queue(indexQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Index queue created");
}
return indexQueue;
}
export function getGenerateLlmsTxtQueue() {
if (!generateLlmsTxtQueue) {
generateLlmsTxtQueue = new Queue(generateLlmsTxtQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("LLMs TXT generation queue created");
}
return generateLlmsTxtQueue;
}
export function getDeepResearchQueue() {
if (!deepResearchQueue) {
deepResearchQueue = new Queue(deepResearchQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Deep research queue created");
}
return deepResearchQueue;
}
export function getBillingQueue() {
if (!billingQueue) {
billingQueue = new Queue(billingQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Billing queue created");
}
return billingQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
|