| |
| import "./instrument"; |
|
|
| import { closeWorkerDb } from "@midday/db/worker-client"; |
| import { |
| buildReadinessResponse, |
| checkDependencies, |
| } from "@midday/health/checker"; |
| import { workerDependencies } from "@midday/health/probes"; |
| import { createLoggerWithContext } from "@midday/logger"; |
| import * as Sentry from "@sentry/bun"; |
| import { Worker } from "bullmq"; |
| import { Hono } from "hono"; |
| import { workbench } from "workbench/hono"; |
| import { getProcessor } from "./processors/registry"; |
| import { getAllQueues, queueConfigs } from "./queues"; |
| import { registerStaticSchedulers } from "./schedulers/registry"; |
|
|
/** Logger used throughout this entry point, created with the "worker" context. */
const logger = createLoggerWithContext("worker");

/**
 * One BullMQ `Worker` per entry in `queueConfigs`.
 *
 * Each worker resolves a processor from the registry by job name and delegates
 * the job to it. Worker-level errors and job failures are logged and reported
 * to Sentry, and the optional per-queue `onFailed` / `onCompleted` hooks from
 * `config.eventHandlers` are invoked from the matching worker events.
 */
const workers = queueConfigs.map((config) => {
  const worker = new Worker(
    config.name,
    async (job) => {
      const processor = getProcessor(job.name);
      if (!processor) {
        throw new Error(`No processor registered for job: ${job.name}`);
      }
      return processor.handle(job);
    },
    config.workerOptions,
  );

  // Errors emitted by the worker itself (as opposed to a specific job failing).
  worker.on("error", (err) => {
    logger.error(`Worker error: ${config.name}`, { error: err.message });
    Sentry.captureException(err, {
      tags: { workerName: config.name, errorType: "worker_error" },
    });
  });

  // Job failures. `job` is accessed defensively because it may be absent here.
  worker.on("failed", async (job, err) => {
    // Same "unknown" fallback as the Sentry tag below, so the log line never
    // reads "Job failed: undefined".
    logger.error(`Job failed: ${job?.name ?? "unknown"}`, {
      worker: config.name,
      jobId: job?.id,
      error: err.message,
    });
    Sentry.captureException(err, {
      tags: {
        workerName: config.name,
        jobName: job?.name ?? "unknown",
        errorType: "job_failed",
      },
      extra: {
        jobId: job?.id,
        attemptsMade: job?.attemptsMade,
      },
    });

    // Optional queue-specific failure hook. A failing hook is logged and
    // reported, but never allowed to escape the event listener.
    if (config.eventHandlers?.onFailed) {
      try {
        await config.eventHandlers.onFailed(
          job
            ? {
                name: job.name,
                id: job.id,
                data: job.data,
                attemptsMade: job.attemptsMade,
                opts: job.opts,
              }
            : null,
          err,
        );
      } catch (handlerError) {
        logger.error(`Error in onFailed handler: ${config.name}`, {
          error:
            handlerError instanceof Error
              ? handlerError.message
              : String(handlerError),
        });
        Sentry.captureException(handlerError, {
          tags: {
            workerName: config.name,
            errorType: "onfailed_handler_error",
          },
        });
      }
    }
  });

  // Optional queue-specific completion hook, guarded the same way as
  // `onFailed`: a throwing or rejecting hook is logged and reported instead of
  // escaping the listener as an unhandled error/rejection.
  const eventHandlers = config.eventHandlers;
  if (eventHandlers?.onCompleted) {
    worker.on("completed", async (job) => {
      try {
        // Invoked as a method so the hook keeps `eventHandlers` as `this`.
        await eventHandlers.onCompleted?.({
          name: job.name,
          id: job.id,
        });
      } catch (handlerError) {
        logger.error(`Error in onCompleted handler: ${config.name}`, {
          error:
            handlerError instanceof Error
              ? handlerError.message
              : String(handlerError),
        });
        Sentry.captureException(handlerError, {
          tags: {
            workerName: config.name,
            errorType: "oncompleted_handler_error",
          },
        });
      }
    });
  }

  return worker;
});
|
|
| |
/**
 * Logs why static scheduler registration failed and terminates the process
 * with exit code 1.
 */
const exitOnSchedulerRegistrationFailure = (cause: unknown): never => {
  const message = cause instanceof Error ? cause.message : String(cause);
  logger.error("Failed to register static schedulers", { error: message });
  process.exit(1);
};

// Registration is started at module load and not awaited; a rejection is
// treated as fatal for the whole worker process.
registerStaticSchedulers().catch(exitOnSchedulerRegistrationFailure);
|
|
| |
/** Hono application that serves the dashboard and the HTTP endpoints below. */
const app = new Hono();

/** URL prefix under which the Workbench dashboard is mounted. */
const basePath = "/admin";

/**
 * Mounts the Workbench dashboard for every queue returned by `getAllQueues()`.
 *
 * When no queues are available a warning is logged and nothing is mounted.
 * Credentials are passed to Workbench only when both `BOARD_USERNAME` and
 * `BOARD_PASSWORD` are set to non-empty values; otherwise `auth` is
 * `undefined`.
 */
function initializeWorkbench() {
  const queues = getAllQueues();
  if (queues.length === 0) {
    logger.warn("No queues found when initializing Workbench");
    return;
  }

  const { BOARD_USERNAME: username, BOARD_PASSWORD: password } = process.env;
  const auth = username && password ? { username, password } : undefined;

  const dashboard = workbench({
    queues,
    auth,
    title: "Midday Jobs",
    tags: ["teamId"],
  });
  app.route(basePath, dashboard);

  const queueNames = queues.map((queue) => queue.name);
  logger.info(`Workbench initialized with ${queues.length} queues`, {
    queues: queueNames,
  });
}

// Mount the dashboard once at module load.
initializeWorkbench();
|
|
| |
// Root endpoint: always answers 200 with a static "ok" payload.
app.get("/", (c) => c.json({ status: "ok" }, 200));

// Liveness endpoint: same static payload, no dependency checks.
app.get("/health", (c) => c.json({ status: "ok" }, 200));

// Readiness endpoint: runs the worker dependency checks and answers 200 only
// when the aggregated readiness status is "ok", 503 otherwise.
app.get("/health/ready", async (c) => {
  const dependencyResults = await checkDependencies(workerDependencies(), 1);
  const readiness = buildReadinessResponse(dependencyResults);
  const statusCode = readiness.status === "ok" ? 200 : 503;
  return c.json(readiness, statusCode);
});

// Info endpoint: lists the queue names and where the dashboard lives.
app.get("/info", (c) =>
  c.json({
    queues: getAllQueues().map(({ name }) => ({ name })),
    dashboardUrl: `${basePath}/queues`,
  }),
);
|
|
| |
// Listening port: taken from PORT, with "8080" used when PORT is unset or empty.
const configuredPort = process.env.PORT || "8080";
const port = Number.parseInt(configuredPort, 10);

// Serve the Hono app on all interfaces.
Bun.serve({
  fetch: app.fetch,
  hostname: "0.0.0.0",
  port,
});

logger.info(`Worker server running on port ${port}`);
logger.info("Workers initialized and ready to process jobs");
|
|
| |
| |
| |
| |
/** Set once the first shutdown request is accepted; later signals are ignored. */
let shutdownStarted = false;

/**
 * Gracefully shuts the worker process down and then exits.
 *
 * Sequence: close every worker, wait a fixed grace period, close the worker
 * database client, then flush Sentry. The whole sequence is raced against a
 * hard timeout so a hung step cannot block termination forever.
 *
 * The process exits with code 0 when the sequence finishes cleanly, and with
 * code 1 when a step throws or the timeout fires, so supervisors can tell a
 * clean stop from a forced one.
 *
 * @param signal Name of the signal that triggered the shutdown (used in logs).
 */
const shutdown = async (signal: string): Promise<void> => {
  // A repeated SIGINT/SIGTERM must not start a second, concurrent shutdown
  // that would close the workers and the database twice.
  if (shutdownStarted) {
    logger.warn(`Received ${signal} during shutdown, already in progress`);
    return;
  }
  shutdownStarted = true;

  logger.info(`Received ${signal}, starting graceful shutdown...`);

  const SHUTDOWN_TIMEOUT = 30_000;
  let exitCode = 0;
  let forceExitTimer: ReturnType<typeof setTimeout> | undefined;

  const shutdownPromise = (async () => {
    try {
      // Closing a worker stops it from picking up new jobs.
      logger.info("Stopping workers from accepting new jobs...");
      await Promise.all(workers.map((worker) => worker.close()));

      // Fixed grace period for any work still settling after the workers closed.
      logger.info("Waiting for in-flight jobs to complete...");
      await new Promise((resolve) => setTimeout(resolve, 5000));

      logger.info("Closing database connections...");
      await closeWorkerDb();

      // Give Sentry up to 2 seconds to deliver buffered events.
      logger.info("Flushing Sentry events...");
      await Sentry.close(2000);

      logger.info("Graceful shutdown complete");
    } catch (error) {
      exitCode = 1;
      logger.error("Error during shutdown", {
        error: error instanceof Error ? error.message : String(error),
      });
    }
  })();

  // Safety net: stop waiting once the overall deadline passes.
  const timeoutPromise = new Promise<void>((resolve) => {
    forceExitTimer = setTimeout(() => {
      exitCode = 1;
      logger.warn("Shutdown timeout reached, forcing exit");
      resolve();
    }, SHUTDOWN_TIMEOUT);
  });

  await Promise.race([shutdownPromise, timeoutPromise]);
  clearTimeout(forceExitTimer);
  process.exit(exitCode);
};

// `shutdown` handles its own errors and ends the process itself, so the
// returned promise is intentionally not awaited.
process.on("SIGTERM", () => {
  void shutdown("SIGTERM");
});
process.on("SIGINT", () => {
  void shutdown("SIGINT");
});
|
|
| |
| |
| |
| |
// Last-resort handler for exceptions nothing else caught: the error is logged
// and sent to Sentry. No exit is requested from here.
process.on("uncaughtException", (err) => {
  const { message, stack } = err;
  logger.error("Uncaught exception", { error: message, stack });
  Sentry.captureException(err, {
    tags: { errorType: "uncaught_exception" },
  });
});

// Last-resort handler for promise rejections nothing else handled. Non-Error
// reasons are stringified for the log and wrapped in an Error for Sentry.
// No exit is requested from here either.
process.on("unhandledRejection", (reason) => {
  const details =
    reason instanceof Error
      ? { reason: reason.message, stack: reason.stack }
      : { reason: String(reason), stack: undefined };
  logger.error("Unhandled rejection", details);

  const exception =
    reason instanceof Error ? reason : new Error(String(reason));
  Sentry.captureException(exception, {
    tags: { errorType: "unhandled_rejection" },
  });
});
|
|