// src/services/queue-jobs.ts
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import {
cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
getConcurrencyQueueJobsCount,
pushConcurrencyLimitActiveJob,
pushConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { logger } from "../lib/logger";
import { sendNotificationWithCustomDays } from './notification/email_notification';
import { shouldSendConcurrencyLimitNotification } from './notification/notification-check';
import { getACUC, getACUCTeam } from "../controllers/auth";
import { getJobFromGCS } from "../lib/gcs-jobs";
import { Document } from "../controllers/v1/types";
/**
 * Determines whether a job belongs to a crawl or a batch scrape.
 * @param options Job options that may carry crawlerOptions and/or crawl_id
 * @returns true when the job is a crawl (crawlerOptions present) or a
 *          batch scrape (crawl_id present without crawlerOptions)
 */
function isCrawlOrBatchScrape(options: { crawlerOptions?: any; crawl_id?: string }): boolean {
  // A crawl always carries crawlerOptions.
  if (options.crawlerOptions) {
    return true;
  }
  // A batch scrape has no crawlerOptions but does have a crawl_id.
  return options.crawl_id !== undefined && options.crawl_id !== "";
}
/**
 * Places a job into the team's concurrency-limited waiting queue (instead of
 * BullMQ), preserving the original BullMQ options and priority so the job can
 * be promoted later without loss of information.
 *
 * @param webScraperOptions Job payload; its team_id keys the waiting queue
 * @param options           BullMQ options to carry along with the queued job
 * @param jobId             Unique id for the job
 * @param jobPriority       Priority to apply when the job is eventually run
 */
async function _addScrapeJobToConcurrencyQueue(
  webScraperOptions: any,
  options: any,
  jobId: string,
  jobPriority: number,
) {
  const queuedJob = {
    id: jobId,
    data: webScraperOptions,
    opts: {
      ...options,
      priority: jobPriority,
      jobId: jobId,
    },
    priority: jobPriority,
  };
  await pushConcurrencyLimitedJob(webScraperOptions.team_id, queuedJob);
}
/**
 * Adds a job directly to the BullMQ scrape queue, first registering it as an
 * active job for the team's concurrency accounting (with a 60s default
 * timeout on the active slot).
 *
 * @param webScraperOptions Job payload; team_id (when present) is used for
 *                          concurrency accounting
 * @param options           Additional BullMQ job options
 * @param jobId             Unique id for the job
 * @param jobPriority       BullMQ priority for the job
 */
export async function _addScrapeJobToBullMQ(
  webScraperOptions: any,
  options: any,
  jobId: string,
  jobPriority: number,
) {
  const teamId = webScraperOptions?.team_id;
  if (teamId) {
    // 60s default timeout for the active-job slot.
    await pushConcurrencyLimitActiveJob(teamId, jobId, 60 * 1000);
  }
  const bullOptions = {
    ...options,
    priority: jobPriority,
    jobId,
  };
  await getScrapeQueue().add(jobId, webScraperOptions, bullOptions);
}
/**
 * Routes a single scrape job: straight to BullMQ when the team has spare
 * concurrency, otherwise into the concurrency-limited waiting queue. When the
 * waiting queue itself exceeds the team's max concurrency (i.e. the team is
 * effectively 2x over its limit), a rate-limited notification email is sent.
 *
 * @param webScraperOptions Job payload; team_id drives concurrency accounting
 * @param options           BullMQ job options
 * @param jobId             Unique id for the job
 * @param jobPriority       Priority for the job
 */
async function addScrapeJobRaw(
  webScraperOptions: any,
  options: any,
  jobId: string,
  jobPriority: number,
) {
  let concurrencyLimited = false;
  let currentActiveConcurrency = 0;
  let maxConcurrency = 0;

  if (webScraperOptions && webScraperOptions.team_id) {
    const now = Date.now();
    // Fall back to a concurrency of 2 when the team's ACUC has no value.
    maxConcurrency =
      (
        await getACUCTeam(
          webScraperOptions.team_id,
          false,
          true,
          webScraperOptions.is_extract
            ? RateLimiterMode.Extract
            : RateLimiterMode.Crawl,
        )
      )?.concurrency ?? 2;
    // Await the cleanup (previously a floating promise) so stale entries
    // cannot skew the active-job count read below, and so a rejection is
    // not left unhandled.
    await cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
    currentActiveConcurrency = (
      await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)
    ).length;
    concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
  }

  if (concurrencyLimited) {
    // Only fetch the queue depth when it is actually needed; team_id is
    // guaranteed here since concurrencyLimited can only be set above.
    // (Previously this was fetched unconditionally, even with no team_id.)
    const concurrencyQueueJobs = await getConcurrencyQueueJobsCount(
      webScraperOptions.team_id,
    );
    // If the waiting queue already exceeds max concurrency, the team is
    // effectively 2x over its limit — notify (rate-limited to 15 days).
    if (concurrencyQueueJobs > maxConcurrency) {
      logger.info(
        `Concurrency limited 2x (single) - Concurrency queue jobs: ${concurrencyQueueJobs} Max concurrency: ${maxConcurrency} Team ID: ${webScraperOptions.team_id}`,
      );
      const shouldSendNotification =
        await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
      if (shouldSendNotification) {
        // Fire-and-forget: notification failures must not block enqueueing.
        sendNotificationWithCustomDays(
          webScraperOptions.team_id,
          NotificationType.CONCURRENCY_LIMIT_REACHED,
          15,
          false,
        ).catch((error) => {
          logger.error(
            "Error sending notification (concurrency limit reached): ",
            error,
          );
        });
      }
    }
    webScraperOptions.concurrencyLimited = true;
    await _addScrapeJobToConcurrencyQueue(
      webScraperOptions,
      options,
      jobId,
      jobPriority,
    );
  } else {
    await _addScrapeJobToBullMQ(webScraperOptions, options, jobId, jobPriority);
  }
}
/**
 * Public entry point for enqueueing a single scrape job. When Sentry is
 * active, the enqueue is wrapped in a queue.publish span and the trace
 * headers are attached to the payload so the worker can continue the trace;
 * otherwise the job is enqueued directly.
 *
 * @param webScraperOptions Job payload
 * @param options           BullMQ job options (default: {})
 * @param jobId             Job id (default: a fresh UUID v4)
 * @param jobPriority       Job priority (default: 10)
 */
export async function addScrapeJob(
  webScraperOptions: WebScraperOptions,
  options: any = {},
  jobId: string = uuidv4(),
  jobPriority: number = 10,
) {
  // Fast path: no Sentry, enqueue directly without span bookkeeping.
  if (!Sentry.isInitialized()) {
    await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
    return;
  }

  const size = JSON.stringify(webScraperOptions).length;
  return await Sentry.startSpan(
    {
      name: "Add scrape job",
      op: "queue.publish",
      attributes: {
        "messaging.message.id": jobId,
        "messaging.destination.name": getScrapeQueue().name,
        "messaging.message.body.size": size,
      },
    },
    async (span) => {
      const payload = {
        ...webScraperOptions,
        // Attach trace propagation headers for the worker side.
        sentry: {
          trace: Sentry.spanToTraceHeader(span),
          baggage: Sentry.spanToBaggageHeader(span),
          size,
        },
      };
      await addScrapeJobRaw(payload, options, jobId, jobPriority);
    },
  );
}
export async function addScrapeJobs(
jobs: {
data: WebScraperOptions;
opts: {
jobId: string;
priority: number;
};
}[],
) {
if (jobs.length === 0) return true;
let countCanBeDirectlyAdded = Infinity;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
if (jobs[0].data && jobs[0].data.team_id) {
const now = Date.now();
maxConcurrency = (await getACUCTeam(jobs[0].data.team_id, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length;
countCanBeDirectlyAdded = Math.max(
maxConcurrency - currentActiveConcurrency,
0,
);
}
const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
const addToCQ = jobs.slice(countCanBeDirectlyAdded);
// equals 2x the max concurrency
if(addToCQ.length > maxConcurrency) {
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
// Only send notification if it's not a crawl or batch scrape
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
}
}
await Promise.all(
addToBull.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await _addScrapeJobToBullMQ(
{
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
},
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
await Promise.all(
addToCQ.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await _addScrapeJobToConcurrencyQueue(
{
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
},
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
}
/**
 * Polls the scrape queue every 250ms until the given job completes, fails,
 * or the timeout elapses.
 *
 * @param jobId   The queue job to wait for
 * @param timeout Maximum time to wait, in milliseconds
 * @returns The job's resulting Document; when the queue's returnvalue is
 *          empty, the document is fetched from GCS instead.
 * @throws  Rejects with an Error on timeout or when the result is missing
 *          from GCS, with the job's failedReason when the job failed (except
 *          "Concurrency limit hit", which keeps polling), and with any error
 *          thrown by the queue lookups.
 */
export function waitForJob(
  jobId: string,
  timeout: number,
): Promise<Document> {
  return new Promise((resolve, reject) => {
    const start = Date.now();
    const int = setInterval(async () => {
      try {
        if (Date.now() >= start + timeout) {
          clearInterval(int);
          // Keep the "Job wait " prefix in case callers match on it.
          reject(new Error("Job wait timed out"));
          return;
        }
        const state = await getScrapeQueue().getJobState(jobId);
        if (state === "completed") {
          clearInterval(int);
          let doc: Document = (await getScrapeQueue().getJob(jobId))!
            .returnvalue;
          if (!doc) {
            // Large results may be offloaded to GCS; fall back to fetching.
            const docs = await getJobFromGCS(jobId);
            if (!docs || docs.length === 0) {
              // BUG FIX: previously thrown inside the interval callback,
              // which became an unhandled rejection and left the returned
              // promise pending forever.
              reject(new Error("Job not found in GCS"));
              return;
            }
            doc = docs[0];
          }
          resolve(doc);
        } else if (state === "failed") {
          const job = await getScrapeQueue().getJob(jobId);
          // "Concurrency limit hit" failures are transient — keep polling.
          if (job && job.failedReason !== "Concurrency limit hit") {
            clearInterval(int);
            reject(job.failedReason);
          }
        }
      } catch (error) {
        // Don't leave the caller hanging (and the interval spinning) if a
        // queue/GCS lookup throws.
        clearInterval(int);
        reject(error);
      }
    }, 250);
  });
}