Echo-AI-official's picture
Upload 280 files
0e759d2 verified
import { Request, Response } from "express";
import { Job } from "bullmq";
import { logger } from "../../../lib/logger";
import { getScrapeQueue } from "../../../services/queue-service";
import { checkAlerts } from "../../../services/alerts";
import { sendSlackWebhook } from "../../../services/alerts/slack";
/**
 * Removes completed scrape jobs that finished more than 24 hours ago.
 *
 * Only the oldest `batchSize * numberOfBatches` completed jobs are inspected
 * per call (fetched oldest-first in parallel batches), so repeated calls may be
 * needed to drain a large backlog.
 *
 * Responds 200 with "No jobs to remove." when none of the inspected jobs are
 * older than 24h, otherwise 200 with the number of jobs removed. A failure to
 * remove an individual job is logged and skipped; any other failure responds
 * 500 "Failed to clean jobs".
 *
 * @param req - Unused.
 * @param res - Express response used to report the outcome.
 */
export async function cleanBefore24hCompleteJobsController(
  req: Request,
  res: Response,
) {
  logger.info("πŸ‚ Cleaning jobs older than 24h");
  try {
    const scrapeQueue = getScrapeQueue();
    const batchSize = 10;
    const numberOfBatches = 9; // Adjust based on your needs
    const completedJobsPromises: Promise<Job[]>[] = [];
    for (let i = 0; i < numberOfBatches; i++) {
      const start = i * batchSize;
      // BullMQ's `end` index is inclusive, so a batch spans
      // [start, start + batchSize - 1]. Using `start + batchSize` would also
      // fetch the first job of the next batch, duplicating it.
      completedJobsPromises.push(
        scrapeQueue.getJobs(["completed"], start, start + batchSize - 1, true),
      );
    }
    const completedJobs: Job[] = (
      await Promise.all(completedJobsPromises)
    ).flat();

    // Compute the cutoff once rather than per job.
    const cutoff = Date.now() - 24 * 60 * 60 * 1000;
    const before24hJobs = completedJobs.filter(
      (job) => job.finishedOn !== undefined && job.finishedOn < cutoff,
    );
    // `filter` always returns an array, so emptiness must be checked by length.
    if (before24hJobs.length === 0) {
      return res.status(200).send(`No jobs to remove.`);
    }

    let count = 0;
    for (const job of before24hJobs) {
      try {
        await job.remove();
        count++;
      } catch (jobError) {
        // Best effort: one failed removal should not abort the whole cleanup.
        logger.error(`πŸ‚ Failed to remove job with ID ${job.id}: ${jobError}`);
      }
    }
    return res.status(200).send(`Removed ${count} completed jobs.`);
  } catch (error) {
    logger.error(`πŸ‚ Failed to clean last 24h complete jobs: ${error}`);
    return res.status(500).send("Failed to clean jobs");
  }
}
/**
 * Runs the queue alert checks via `checkAlerts()`.
 *
 * Responds 200 "Alerts initialized" once `checkAlerts()` resolves, or 500
 * "Failed to initialize alerts" if it throws or rejects.
 *
 * @param req - Unused.
 * @param res - Express response used to report the outcome.
 */
export async function checkQueuesController(req: Request, res: Response) {
  try {
    await checkAlerts();
    return res.status(200).send("Alerts initialized");
  } catch (error) {
    // Logged at error level, consistent with the other controllers in this
    // file; at debug level this failure is likely filtered out of normal logs.
    logger.error(`Failed to initialize alerts: ${error}`);
    return res.status(500).send("Failed to initialize alerts");
  }
}
/**
 * Health-check endpoint that reports whether the scrape queue is idle.
 *
 * Presumably polled before a server is shut down or replaced, so that it is
 * not torn down while scrape jobs are still running (confirm with the caller).
 *
 * Responds with `{ webScraperActive, noActiveJobs }` and status:
 * - 200 when there are no active scrape jobs,
 * - 503 Service Unavailable while jobs are still active.
 * If the queue cannot be queried, responds 500 with `{ error }`.
 *
 * @param req - Unused.
 * @param res - Express response used to report the outcome.
 */
export async function queuesController(req: Request, res: Response) {
  try {
    const webScraperActive = await getScrapeQueue().getActiveCount();
    const noActiveJobs = webScraperActive === 0;
    // 503 ("temporarily unavailable") rather than 500: a busy queue is an
    // expected state, not a server fault.
    return res.status(noActiveJobs ? 200 : 503).json({
      webScraperActive,
      noActiveJobs,
    });
  } catch (error) {
    logger.error(error);
    // The catch variable is `unknown`; non-Error throwables have no `.message`.
    const message = error instanceof Error ? error.message : String(error);
    return res.status(500).json({ error: message });
  }
}
/**
 * Minimal view of a machine record returned by the Fly Machines API.
 * Only the fields read by the autoscaler are listed; both are treated as
 * optional because the response is not validated beyond being an array.
 */
type FlyMachine = {
  state?: string;
  config?: { env?: Record<string, string | undefined> };
};

/**
 * Counts machines in the "worker" process group that are started, starting,
 * or being replaced. Machines without a config/env are not counted.
 */
function countActiveWorkerMachines(machines: readonly FlyMachine[]): number {
  return machines.filter(
    (machine) =>
      (machine.state === "started" ||
        machine.state === "starting" ||
        machine.state === "replacing") &&
      machine.config?.env?.["FLY_PROCESS_GROUP"] === "worker",
  ).length;
}

/**
 * Chooses the desired worker machine count from the current queue load.
 *
 * Scale-up tiers add 30/20/10 machines (capped at `maxNumberOfMachines`).
 * Scale-down tiers remove 6/4/2 machines (floored at `minNumberOfMachines`),
 * so scale-down is deliberately slower. Every scale-up condition implies
 * `active > 1000 || waiting > 500`, and every scale-down condition implies
 * `active < 1000 && waiting < 500`, so at most one tier ever applies. Outside
 * all tiers the current count is returned unchanged.
 */
function computeTargetMachineCount(
  activeMachines: number,
  webScraperActive: number,
  waitingAndPriorityCount: number,
  minNumberOfMachines: number,
  maxNumberOfMachines: number,
): number {
  const baseScaleUp = 10;
  // Slow scale down
  const baseScaleDown = 2;

  const scaleUp = (step: number) =>
    Math.min(maxNumberOfMachines, activeMachines + step);
  const scaleDown = (step: number) =>
    Math.max(minNumberOfMachines, activeMachines - step);

  if (webScraperActive > 9000 || waitingAndPriorityCount > 2000) {
    return scaleUp(baseScaleUp * 3);
  }
  if (webScraperActive > 5000 || waitingAndPriorityCount > 1000) {
    return scaleUp(baseScaleUp * 2);
  }
  if (webScraperActive > 1000 || waitingAndPriorityCount > 500) {
    return scaleUp(baseScaleUp);
  }
  if (webScraperActive < 100 && waitingAndPriorityCount < 50) {
    return scaleDown(baseScaleDown * 3);
  }
  if (webScraperActive < 500 && waitingAndPriorityCount < 200) {
    return scaleDown(baseScaleDown * 2);
  }
  if (webScraperActive < 1000 && waitingAndPriorityCount < 500) {
    return scaleDown(baseScaleDown);
  }
  return activeMachines;
}

/**
 * Computes a target worker machine count for the `firecrawl-scraper-js` Fly
 * app from the scrape queue's active, waiting and prioritized job counts.
 *
 * This handler only reports the target; it does not start or stop machines
 * itself (presumably an external caller acts on the response; verify).
 *
 * Responds 200 with `{ mode: "scale-descale", count: target }` when the target
 * differs from the current number of active worker machines (and posts a
 * Slack notification). Otherwise it responds `{ mode: "normal", count: current }`.
 * If the queue or the Fly Machines API cannot be read, it responds 500
 * "Failed to initialize autoscaler".
 *
 * @param req - Unused.
 * @param res - Express response used to report the decision.
 */
export async function autoscalerController(req: Request, res: Response) {
  try {
    const maxNumberOfMachines = 80;
    const minNumberOfMachines = 20;
    const scrapeQueue = getScrapeQueue();
    const [webScraperActive, webScraperWaiting, webScraperPriority] =
      await Promise.all([
        scrapeQueue.getActiveCount(),
        scrapeQueue.getWaitingCount(),
        scrapeQueue.getPrioritizedCount(),
      ]);
    const waitingAndPriorityCount = webScraperWaiting + webScraperPriority;

    // get number of machines active
    const response = await fetch(
      "https://api.machines.dev/v1/apps/firecrawl-scraper-js/machines",
      {
        headers: {
          Authorization: `Bearer ${process.env.FLY_API_TOKEN}`,
        },
      },
    );
    if (!response.ok) {
      // Without this check an error payload (e.g. a bad token) would only
      // surface later as an opaque "filter is not a function" TypeError.
      throw new Error(
        `Fly machines API request failed: ${response.status} ${response.statusText}`,
      );
    }
    const machines: unknown = await response.json();
    if (!Array.isArray(machines)) {
      throw new Error("Unexpected Fly machines API response: expected an array");
    }
    // Only worker machines
    const activeMachines = countActiveWorkerMachines(machines);

    const targetMachineCount = computeTargetMachineCount(
      activeMachines,
      webScraperActive,
      waitingAndPriorityCount,
      minNumberOfMachines,
      maxNumberOfMachines,
    );

    if (targetMachineCount === activeMachines) {
      return res.status(200).json({
        mode: "normal",
        count: activeMachines,
      });
    }

    const scalingSummary = `πŸ‚ Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting`;
    logger.info(scalingSummary);
    // Same notification for scale-up and scale-down. Wrapped in
    // Promise.resolve(...).catch so a rejected webhook cannot become an
    // unhandled rejection; the response does not wait on Slack.
    Promise.resolve(
      sendSlackWebhook(
        `${scalingSummary} - Current DateTime: ${new Date().toISOString()}`,
        false,
        process.env.SLACK_AUTOSCALER ?? "",
      ),
    ).catch((slackError: unknown) => {
      logger.error(`Failed to send autoscaler Slack notification: ${slackError}`);
    });

    return res.status(200).json({
      mode: "scale-descale",
      count: targetMachineCount,
    });
  } catch (error) {
    logger.error(error);
    return res.status(500).send("Failed to initialize autoscaler");
  }
}