// Fire-crawl/src/__tests__/queue-concurrency-integration.test.ts
// Uploaded by Echo-AI-official ("Upload 280 files"), commit 0e759d2 (verified).
import { redisConnection } from "../services/queue-service";
import { addScrapeJob, addScrapeJobs } from "../services/queue-jobs";
import {
cleanOldConcurrencyLimitEntries,
pushConcurrencyLimitActiveJob,
takeConcurrencyLimitedJob,
removeConcurrencyLimitActiveJob,
} from "../lib/concurrency-limit";
import { WebScraperOptions } from "../types";
import { getACUCTeam } from "../controllers/auth";
// Mock all the dependencies so the queue/concurrency code under test talks to
// jest mocks instead of real Redis, BullMQ and auth implementations.
//
// `mockAdd` is shared with the tests so they can assert on BullMQ insertions.
// Its name starts with `mock` because babel-jest only lets a hoisted
// `jest.mock` factory reference out-of-scope variables with that prefix.
const mockAdd = jest.fn();
jest.mock("../services/queue-service", () => ({
  redisConnection: {
    zremrangebyscore: jest.fn(),
    zrangebyscore: jest.fn(),
    zadd: jest.fn(),
    zrem: jest.fn(),
    zmpop: jest.fn(),
    zcard: jest.fn(),
    smembers: jest.fn(),
  },
  getScrapeQueue: jest.fn(() => ({
    add: mockAdd,
  })),
}));
// The tests stub `getACUCTeam` with `mockResolvedValue`, which only exists on a
// jest mock, so the auth module must be mocked explicitly. All other exports
// keep their real implementation.
jest.mock("../controllers/auth", () => ({
  ...jest.requireActual("../controllers/auth"),
  // Default team limit (same value the tests use) so a test that does not
  // override it does not depend on whichever test ran before it.
  getACUCTeam: jest.fn().mockResolvedValue({ concurrency: 15 }),
}));
// Deterministic job ids so assertions can match on "mock-uuid".
jest.mock("uuid", () => ({
  v4: jest.fn(() => "mock-uuid"),
}));
describe("Queue Concurrency Integration", () => {
  const mockTeamId = "test-team-id";
  // Frozen "current time": Date.now is stubbed to return this in beforeEach.
  const mockNow = Date.now();
  // Timeout passed when registering an active job (2 minutes).
  const activeJobTimeoutMs = 2 * 60 * 1000;
  const defaultScrapeOptions = {
    formats: ["markdown"] as (
      | "markdown"
      | "html"
      | "rawHtml"
      | "links"
      | "screenshot"
      | "screenshot@fullPage"
      | "extract"
      | "json"
    )[],
    onlyMainContent: true,
    waitFor: 0,
    mobile: false,
    parsePDF: false,
    timeout: 30000,
    extract: {
      mode: "llm" as const,
      systemPrompt: "test",
      schema: {},
    },
    extractOptions: { mode: "llm" as const, systemPrompt: "test" },
    javascript: true,
    headers: {},
    cookies: [],
    blockResources: true,
    skipTlsVerification: false,
    removeBase64Images: true,
    fastMode: false,
    blockAds: true,
  };

  beforeEach(() => {
    // Reset recorded calls so each test's call assertions start fresh.
    jest.clearAllMocks();
    jest.spyOn(Date, "now").mockImplementation(() => mockNow);
  });

  describe("Single Job Addition", () => {
    const mockWebScraperOptions: WebScraperOptions = {
      url: "https://test.com",
      mode: "single_urls",
      team_id: mockTeamId,
      scrapeOptions: defaultScrapeOptions,
      crawlerOptions: null,
    };

    it("should add job directly to BullMQ when under concurrency limit", async () => {
      // Mock current active jobs to be under limit
      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);

      await addScrapeJob(mockWebScraperOptions);

      // Should have checked concurrency
      expect(redisConnection.zrangebyscore).toHaveBeenCalled();
      // Should have added to BullMQ
      expect(mockAdd).toHaveBeenCalled();
      // Should have added to active jobs
      expect(redisConnection.zadd).toHaveBeenCalledWith(
        expect.stringContaining("concurrency-limiter"),
        expect.any(Number),
        expect.any(String),
      );
    });

    it("should add job to concurrency queue when at concurrency limit", async () => {
      // Mock current active jobs to be at limit
      (getACUCTeam as jest.Mock).mockResolvedValue({ concurrency: 15 });
      const activeJobs = Array(15).fill("active-job");
      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(
        activeJobs,
      );

      await addScrapeJob(mockWebScraperOptions);

      // Should have checked concurrency
      expect(redisConnection.zrangebyscore).toHaveBeenCalled();
      // Should NOT have added to BullMQ
      expect(mockAdd).not.toHaveBeenCalled();
      // Should have added to concurrency queue
      expect(redisConnection.zadd).toHaveBeenCalledWith(
        expect.stringContaining("concurrency-limit-queue"),
        expect.any(Number),
        expect.stringContaining("mock-uuid"),
      );
    });
  });

  describe("Batch Job Addition", () => {
    /** Builds `count` distinct scrape jobs for the test team. */
    const createMockJobs = (count: number) =>
      Array.from({ length: count }, (_, i) => {
        // Typed declaration (not an `as` assertion) so missing fields are caught.
        const data: WebScraperOptions = {
          url: `https://test${i}.com`,
          mode: "single_urls",
          team_id: mockTeamId,
          scrapeOptions: defaultScrapeOptions,
          crawlerOptions: null,
        };
        return {
          data,
          opts: {
            jobId: `job-${i}`,
            priority: 1,
          },
        };
      });

    it("should handle batch jobs respecting concurrency limits", async () => {
      const maxConcurrency = 15;
      (getACUCTeam as jest.Mock).mockResolvedValue({
        concurrency: maxConcurrency,
      });
      const totalJobs = maxConcurrency + 5; // Some jobs should go to queue
      const mockJobs = createMockJobs(totalJobs);
      // Mock current active jobs to be empty
      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);

      await addScrapeJobs(mockJobs);

      // Should have added maxConcurrency jobs to BullMQ
      expect(mockAdd).toHaveBeenCalledTimes(maxConcurrency);
      // Should have added remaining jobs to concurrency queue
      expect(redisConnection.zadd).toHaveBeenCalledWith(
        expect.stringContaining("concurrency-limit-queue"),
        expect.any(Number),
        expect.any(String),
      );
    });

    it("should handle empty job array", async () => {
      const result = await addScrapeJobs([]);
      expect(result).toBe(true);
      expect(mockAdd).not.toHaveBeenCalled();
      expect(redisConnection.zadd).not.toHaveBeenCalled();
    });
  });

  describe("Queue Worker Integration", () => {
    it("should process next queued job when active job completes", async () => {
      const completedJobId = "test-job";
      // A job waiting in the team's concurrency queue, as zmpop would return it.
      const queuedJob = {
        id: "queued-job",
        data: { test: "data" },
        opts: {},
      };
      (redisConnection.zmpop as jest.Mock).mockResolvedValueOnce([
        "key",
        [[JSON.stringify(queuedJob)]],
      ]);

      // Simulate job completion in worker
      await removeConcurrencyLimitActiveJob(mockTeamId, completedJobId);
      await cleanOldConcurrencyLimitEntries(mockTeamId);
      const nextJob = await takeConcurrencyLimitedJob(mockTeamId);

      // Should have taken next job from queue
      expect(nextJob).toEqual(queuedJob);

      // Should have added new job to active jobs. nextJob deep-equals queuedJob,
      // so queuedJob.id is its id; using it avoids a non-null assertion.
      await pushConcurrencyLimitActiveJob(
        mockTeamId,
        queuedJob.id,
        activeJobTimeoutMs,
      );
      expect(redisConnection.zadd).toHaveBeenCalledWith(
        expect.stringContaining("concurrency-limiter"),
        expect.any(Number),
        queuedJob.id,
      );
    });

    it("should handle job failure and cleanup", async () => {
      const failingJobId = "failing-job";

      // Add job to active jobs
      await pushConcurrencyLimitActiveJob(
        mockTeamId,
        failingJobId,
        activeJobTimeoutMs,
      );
      // Simulate job failure and cleanup
      await removeConcurrencyLimitActiveJob(mockTeamId, failingJobId);
      await cleanOldConcurrencyLimitEntries(mockTeamId);

      // Verify job was removed from active jobs
      expect(redisConnection.zrem).toHaveBeenCalledWith(
        expect.stringContaining("concurrency-limiter"),
        failingJobId,
      );
    });
  });

  describe("Edge Cases", () => {
    it("should handle stalled jobs cleanup", async () => {
      // Mock stalled jobs in Redis
      (redisConnection.zrangebyscore as jest.Mock).mockResolvedValueOnce([
        "stalled-job",
      ]);

      await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);

      // Entries scored at or below `now` (presumably expired active jobs)
      // should be purged from the team's active-job set.
      expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
        expect.stringContaining("concurrency-limiter"),
        -Infinity,
        mockNow,
      );
    });

    it("should handle race conditions in job queue processing", async () => {
      // Mock a race condition where job is taken by another worker
      (redisConnection.zmpop as jest.Mock).mockResolvedValueOnce(null);

      const nextJob = await takeConcurrencyLimitedJob(mockTeamId);

      // Should handle gracefully when no job is available
      expect(nextJob).toBeNull();
    });
  });
});