Spaces:
Paused
Paused
| import { redisConnection } from "../services/queue-service"; | |
| import { addScrapeJob, addScrapeJobs } from "../services/queue-jobs"; | |
| import { | |
| cleanOldConcurrencyLimitEntries, | |
| pushConcurrencyLimitActiveJob, | |
| takeConcurrencyLimitedJob, | |
| removeConcurrencyLimitActiveJob, | |
| } from "../lib/concurrency-limit"; | |
| import { WebScraperOptions } from "../types"; | |
| import { getACUCTeam } from "../controllers/auth"; | |
| // Mock all the dependencies | |
| const mockAdd = jest.fn(); | |
| jest.mock("../services/queue-service", () => ({ | |
| redisConnection: { | |
| zremrangebyscore: jest.fn(), | |
| zrangebyscore: jest.fn(), | |
| zadd: jest.fn(), | |
| zrem: jest.fn(), | |
| zmpop: jest.fn(), | |
| zcard: jest.fn(), | |
| smembers: jest.fn(), | |
| }, | |
| getScrapeQueue: jest.fn(() => ({ | |
| add: mockAdd, | |
| })), | |
| })); | |
| jest.mock("uuid", () => ({ | |
| v4: jest.fn(() => "mock-uuid"), | |
| })); | |
| describe("Queue Concurrency Integration", () => { | |
| const mockTeamId = "test-team-id"; | |
| const mockNow = Date.now(); | |
| const defaultScrapeOptions = { | |
| formats: ["markdown"] as ( | |
| | "markdown" | |
| | "html" | |
| | "rawHtml" | |
| | "links" | |
| | "screenshot" | |
| | "screenshot@fullPage" | |
| | "extract" | |
| | "json" | |
| )[], | |
| onlyMainContent: true, | |
| waitFor: 0, | |
| mobile: false, | |
| parsePDF: false, | |
| timeout: 30000, | |
| extract: { | |
| mode: "llm" as const, | |
| systemPrompt: "test", | |
| schema: {}, | |
| }, | |
| extractOptions: { mode: "llm" as const, systemPrompt: "test" }, | |
| javascript: true, | |
| headers: {}, | |
| cookies: [], | |
| blockResources: true, | |
| skipTlsVerification: false, | |
| removeBase64Images: true, | |
| fastMode: false, | |
| blockAds: true, | |
| }; | |
| beforeEach(() => { | |
| jest.clearAllMocks(); | |
| jest.spyOn(Date, "now").mockImplementation(() => mockNow); | |
| }); | |
| describe("Single Job Addition", () => { | |
| const mockWebScraperOptions: WebScraperOptions = { | |
| url: "https://test.com", | |
| mode: "single_urls", | |
| team_id: mockTeamId, | |
| scrapeOptions: defaultScrapeOptions, | |
| crawlerOptions: null, | |
| }; | |
| it("should add job directly to BullMQ when under concurrency limit", async () => { | |
| // Mock current active jobs to be under limit | |
| (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]); | |
| await addScrapeJob(mockWebScraperOptions); | |
| // Should have checked concurrency | |
| expect(redisConnection.zrangebyscore).toHaveBeenCalled(); | |
| // Should have added to BullMQ | |
| expect(mockAdd).toHaveBeenCalled(); | |
| // Should have added to active jobs | |
| expect(redisConnection.zadd).toHaveBeenCalledWith( | |
| expect.stringContaining("concurrency-limiter"), | |
| expect.any(Number), | |
| expect.any(String), | |
| ); | |
| }); | |
| it("should add job to concurrency queue when at concurrency limit", async () => { | |
| // Mock current active jobs to be at limit | |
| (getACUCTeam as jest.Mock).mockResolvedValue({ | |
| concurrency: 15, | |
| } as any); | |
| const activeJobs = Array(15).fill("active-job"); | |
| (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue( | |
| activeJobs, | |
| ); | |
| await addScrapeJob(mockWebScraperOptions); | |
| // Should have checked concurrency | |
| expect(redisConnection.zrangebyscore).toHaveBeenCalled(); | |
| // Should NOT have added to BullMQ | |
| expect(mockAdd).not.toHaveBeenCalled(); | |
| // Should have added to concurrency queue | |
| expect(redisConnection.zadd).toHaveBeenCalledWith( | |
| expect.stringContaining("concurrency-limit-queue"), | |
| expect.any(Number), | |
| expect.stringContaining("mock-uuid"), | |
| ); | |
| }); | |
| }); | |
| describe("Batch Job Addition", () => { | |
| const createMockJobs = (count: number) => | |
| Array(count) | |
| .fill(null) | |
| .map((_, i) => ({ | |
| data: { | |
| url: `https://test${i}.com`, | |
| mode: "single_urls", | |
| team_id: mockTeamId, | |
| scrapeOptions: defaultScrapeOptions, | |
| } as WebScraperOptions, | |
| opts: { | |
| jobId: `job-${i}`, | |
| priority: 1, | |
| }, | |
| })); | |
| it("should handle batch jobs respecting concurrency limits", async () => { | |
| const maxConcurrency = 15; | |
| (getACUCTeam as jest.Mock).mockResolvedValue({ | |
| concurrency: maxConcurrency, | |
| } as any); | |
| const totalJobs = maxConcurrency + 5; // Some jobs should go to queue | |
| const mockJobs = createMockJobs(totalJobs); | |
| // Mock current active jobs to be empty | |
| (redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]); | |
| await addScrapeJobs(mockJobs); | |
| // Should have added maxConcurrency jobs to BullMQ | |
| expect(mockAdd).toHaveBeenCalledTimes(maxConcurrency); | |
| // Should have added remaining jobs to concurrency queue | |
| expect(redisConnection.zadd).toHaveBeenCalledWith( | |
| expect.stringContaining("concurrency-limit-queue"), | |
| expect.any(Number), | |
| expect.any(String), | |
| ); | |
| }); | |
| it("should handle empty job array", async () => { | |
| const result = await addScrapeJobs([]); | |
| expect(result).toBe(true); | |
| expect(mockAdd).not.toHaveBeenCalled(); | |
| expect(redisConnection.zadd).not.toHaveBeenCalled(); | |
| }); | |
| }); | |
| describe("Queue Worker Integration", () => { | |
| it("should process next queued job when active job completes", async () => { | |
| const mockJob = { | |
| id: "test-job", | |
| data: { | |
| team_id: mockTeamId, | |
| }, | |
| }; | |
| // Mock a queued job | |
| const queuedJob = { | |
| id: "queued-job", | |
| data: { test: "data" }, | |
| opts: {}, | |
| }; | |
| (redisConnection.zmpop as jest.Mock).mockResolvedValueOnce([ | |
| "key", | |
| [[JSON.stringify(queuedJob)]], | |
| ]); | |
| // Simulate job completion in worker | |
| await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id); | |
| await cleanOldConcurrencyLimitEntries(mockTeamId); | |
| const nextJob = await takeConcurrencyLimitedJob(mockTeamId); | |
| // Should have taken next job from queue | |
| expect(nextJob).toEqual(queuedJob); | |
| // Should have added new job to active jobs | |
| await pushConcurrencyLimitActiveJob(mockTeamId, nextJob!.id, 2 * 60 * 1000); | |
| expect(redisConnection.zadd).toHaveBeenCalledWith( | |
| expect.stringContaining("concurrency-limiter"), | |
| expect.any(Number), | |
| nextJob!.id, | |
| ); | |
| }); | |
| it("should handle job failure and cleanup", async () => { | |
| const mockJob = { | |
| id: "failing-job", | |
| data: { | |
| team_id: mockTeamId, | |
| }, | |
| }; | |
| // Add job to active jobs | |
| await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000); | |
| // Simulate job failure and cleanup | |
| await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id); | |
| await cleanOldConcurrencyLimitEntries(mockTeamId); | |
| // Verify job was removed from active jobs | |
| expect(redisConnection.zrem).toHaveBeenCalledWith( | |
| expect.stringContaining("concurrency-limiter"), | |
| mockJob.id, | |
| ); | |
| }); | |
| }); | |
| describe("Edge Cases", () => { | |
| it("should handle stalled jobs cleanup", async () => { | |
| const stalledTime = mockNow - 3 * 60 * 1000; // 3 minutes ago | |
| // Mock stalled jobs in Redis | |
| (redisConnection.zrangebyscore as jest.Mock).mockResolvedValueOnce([ | |
| "stalled-job", | |
| ]); | |
| await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow); | |
| // Should have cleaned up stalled jobs | |
| expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith( | |
| expect.stringContaining("concurrency-limiter"), | |
| -Infinity, | |
| mockNow, | |
| ); | |
| }); | |
| it("should handle race conditions in job queue processing", async () => { | |
| // Mock a race condition where job is taken by another worker | |
| (redisConnection.zmpop as jest.Mock).mockResolvedValueOnce(null); | |
| const nextJob = await takeConcurrencyLimitedJob(mockTeamId); | |
| // Should handle gracefully when no job is available | |
| expect(nextJob).toBeNull(); | |
| }); | |
| }); | |
| }); | |