65 lines
1.7 KiB
TypeScript
65 lines
1.7 KiB
TypeScript
import { type Processor, type JobsOptions, Queue as BullQueue, Worker, QueueScheduler } from "bullmq";
|
|
|
|
import redis from "./redis.server";
|
|
import logger from "./logger.server";
|
|
|
|
/**
 * Everything created for one named queue, kept together so the registry
 * can hand back the same instances on later lookups.
 */
type RegisteredQueue = {
  /** Queue instance returned to callers. */
  queue: BullQueue;
  /** Worker constructed for the same queue name with the registered handler. */
  worker: Worker;
  /** Scheduler constructed for the same queue name. */
  scheduler: QueueScheduler;
};
|
|
|
|
declare global {
  /**
   * Registry of queues stored on the global object, keyed by queue name.
   * `undefined` until this module initialises it.
   */
  // `var` is required here: only `var` declarations become properties of `globalThis`.
  var __registeredQueues: Map<string, RegisteredQueue> | undefined;
}
|
|
|
|
/**
 * Queue registry shared through `global.__registeredQueues`: an existing map
 * on the global object is reused, otherwise a new empty one is created and
 * stored there, so re-evaluating this module does not start a second registry.
 */
const registeredQueues = (global.__registeredQueues ??= new Map<string, RegisteredQueue>());
|
|
|
|
/**
 * Returns the queue registered under `name`, creating it on first use.
 *
 * On first use a queue, a worker running `handler`, and a queue scheduler are
 * constructed (in that order) on the shared `redis` connection and stored in
 * the registry. On later calls with the same `name` the stored queue is
 * returned as-is; the `handler` and `defaultJobOptions` passed to those later
 * calls are ignored.
 *
 * @param name - Queue name; also the registry key.
 * @param handler - Processor given to the worker for this queue.
 * @param defaultJobOptions - Default job options for the queue. They are spread
 *   after the built-in defaults (`attempts: 3`, exponential backoff with
 *   `delay: 1000`) and therefore override them.
 * @returns The queue registered under `name`.
 */
export function Queue<Payload>(
  name: string,
  handler: Processor<Payload>,
  defaultJobOptions: JobsOptions = {},
): BullQueue<Payload> {
  const existing = registeredQueues.get(name);
  if (existing !== undefined) {
    return existing.queue;
  }

  const queue = new BullQueue<Payload>(name, {
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: "exponential", delay: 1000 },
      // Spread last so caller-supplied options win over the defaults above.
      ...defaultJobOptions,
    },
    connection: redis,
  });

  // Property order keeps the original construction order: worker, then scheduler.
  const entry: RegisteredQueue = {
    queue,
    worker: new Worker<Payload>(name, handler, { connection: redis }),
    scheduler: new QueueScheduler(name, { connection: redis }),
  };
  registeredQueues.set(name, entry);

  return queue;
}
|
|
|
|
/**
 * Builds a registration function for a repeating job.
 *
 * Nothing is created when `CronJob` itself is called; it only prepares the job
 * options. Calling the returned `register` function creates the queue through
 * `Queue` and adds one job (named after the queue, with an `undefined`
 * payload) whose `repeat.cron` is `cronSchedule`. If a queue called `name` is
 * already in the registry, `register` returns it without adding another job.
 *
 * @param name - Queue name, also used as the job name.
 * @param handler - Processor run for each occurrence of the job.
 * @param cronSchedule - Cron expression placed in `repeat.cron`.
 * @param defaultJobOptions - Extra job options. `repeat` is excluded from the
 *   type because it is always overwritten with `cronSchedule`.
 * @returns A `register` function that returns the queue for `name`.
 */
export function CronJob(
  name: string,
  handler: Processor<undefined>,
  cronSchedule: string,
  // `Omit`, not `Exclude`: `Exclude` only filters union members, so
  // `Exclude<JobsOptions, "repeat">` was just `JobsOptions` and did not
  // remove the `repeat` key.
  defaultJobOptions: Omit<JobsOptions, "repeat"> = {},
) {
  const jobOptions: JobsOptions = {
    ...defaultJobOptions,
    // Written last so the schedule always comes from `cronSchedule`.
    repeat: { cron: cronSchedule },
  };

  return function register() {
    const existing = registeredQueues.get(name);
    if (existing !== undefined) {
      return existing.queue;
    }

    const queue = Queue<undefined>(name, handler, jobOptions);

    // `add` is asynchronous and deliberately not awaited so `register` stays
    // synchronous. The rejection is handled here so that a failure to add the
    // job is logged instead of surfacing as an unhandled promise rejection.
    // NOTE(review): assumes the logger exposes `error()` alongside `info()`.
    void queue.add(name, undefined, jobOptions).catch((error: unknown) => {
      const reason = error instanceof Error ? error.message : String(error);
      logger.error(`failed to schedule cron job "${name}": ${reason}`);
    });

    logger.info(`registered cron job "${name}"`);
    return queue;
  };
}
|