shellphone.app/app/utils/queue.server.ts

65 lines
1.7 KiB
TypeScript
Raw Normal View History

2022-05-14 10:22:06 +00:00
import { type Processor, type JobsOptions, Queue as BullQueue, Worker, QueueScheduler } from "bullmq";
import redis from "./redis.server";
import logger from "./logger.server";
/**
 * Everything that is created for a single named queue and kept together in
 * the registry: the queue itself, the worker consuming it, and its scheduler.
 */
interface RegisteredQueue {
	/** Queue instance handed back to callers. */
	queue: BullQueue;
	/** Worker that runs the handler for jobs of this queue. */
	worker: Worker;
	/** Scheduler instance created alongside the queue. */
	scheduler: QueueScheduler;
}
declare global {
	// Registry slot stored on the global object; undefined until first use.
	// NOTE(review): presumably kept on `global` so the registry survives this
	// module being evaluated more than once (e.g. dev reloads) — confirm.
	var __registeredQueues: Map<string, RegisteredQueue> | undefined;
}

// Reuse the registry already present on `global`, creating it on first load.
const registeredQueues = (global.__registeredQueues ??= new Map<string, RegisteredQueue>());
/**
 * Returns the queue registered under `name`, creating it on first call.
 *
 * On first call for a given name this creates the BullMQ queue, a worker that
 * runs `handler`, and a queue scheduler, all sharing the module's redis
 * connection, and stores them in the registry. Later calls with the same name
 * return the stored queue and ignore `handler` and `defaultJobOptions`.
 *
 * @param name - Queue name; also the registry key.
 * @param handler - Processor run by the worker for each job.
 * @param defaultJobOptions - Overrides merged on top of the built-in defaults
 *   (3 attempts, exponential backoff with a 1000 ms delay).
 * @returns The queue registered under `name`.
 */
export function Queue<Payload>(
	name: string,
	handler: Processor<Payload>,
	defaultJobOptions: JobsOptions = {},
): BullQueue<Payload> {
	// One guarded lookup instead of has() followed by a non-null asserted get().
	const existing = registeredQueues.get(name);
	if (existing) {
		return existing.queue;
	}

	// Caller-supplied options win over the defaults because they are spread last.
	const jobOptions: JobsOptions = {
		attempts: 3,
		backoff: { type: "exponential", delay: 1000 },
		...defaultJobOptions,
	};

	const queue = new BullQueue<Payload>(name, {
		defaultJobOptions: jobOptions,
		connection: redis,
	});

	const worker = new Worker<Payload>(name, handler, { connection: redis });
	// An "error" event with no listener is thrown by the emitter; log it instead
	// so a worker-level error cannot take the process down.
	// NOTE(review): assumes the project logger exposes `error` — confirm.
	worker.on("error", (error) => {
		logger.error(`worker for queue "${name}" emitted an error: ${error.message}`);
	});

	const scheduler = new QueueScheduler(name, { connection: redis });

	registeredQueues.set(name, { queue, worker, scheduler });
	return queue;
}
/**
 * Builds a registration function for a repeating job.
 *
 * Nothing is created until the returned `register` function is called. The
 * first call creates the queue through `Queue`, adds one repeatable job named
 * `name` with an `undefined` payload, and logs the registration. If a queue is
 * already registered under `name`, `register` returns it without adding a job.
 *
 * @param name - Queue name, also used as the job name.
 * @param handler - Processor run for each occurrence of the job.
 * @param cronSchedule - Cron expression passed as the job's `repeat.cron`.
 * @param defaultJobOptions - Extra job options; `repeat` is excluded from the
 *   type because it is always set from `cronSchedule`.
 * @returns A function that registers the job and returns its queue.
 */
export function CronJob(
	name: string,
	handler: Processor<undefined>,
	cronSchedule: string,
	// `Omit` removes the `repeat` key; `Exclude` only filters union members and
	// left `JobsOptions` unchanged.
	defaultJobOptions: Omit<JobsOptions, "repeat"> = {},
) {
	// `repeat` is spread last so the cron schedule cannot be overridden.
	const jobOptions: JobsOptions = {
		...defaultJobOptions,
		repeat: { cron: cronSchedule },
	};

	return function register() {
		// Already registered: return the queue without adding the job again.
		const existing = registeredQueues.get(name);
		if (existing) {
			return existing.queue;
		}

		const queue = Queue<undefined>(name, handler, jobOptions);

		// `add` is asynchronous and `register` stays synchronous, so handle the
		// rejection here rather than leaving a floating promise.
		// NOTE(review): assumes the project logger exposes `error` — confirm.
		queue.add(name, undefined, jobOptions).catch((error: unknown) => {
			const reason = error instanceof Error ? error.message : String(error);
			logger.error(`failed to add cron job "${name}": ${reason}`);
		});

		logger.info(`registered cron job "${name}"`);
		return queue;
	};
}