add more logs in background jobs
This commit is contained in:
parent
a8491e28a5
commit
aeda74dcf8
@ -10,6 +10,7 @@ type Payload = {
|
|||||||
|
|
||||||
export default Queue<Payload>("fetch messages", async ({ data }) => {
|
export default Queue<Payload>("fetch messages", async ({ data }) => {
|
||||||
const { phoneNumberId } = data;
|
const { phoneNumberId } = data;
|
||||||
|
logger.info(`Fetching messages for phone number with id=${phoneNumberId}`);
|
||||||
const phoneNumber = await db.phoneNumber.findUnique({
|
const phoneNumber = await db.phoneNumber.findUnique({
|
||||||
where: { id: phoneNumberId },
|
where: { id: phoneNumberId },
|
||||||
include: { twilioAccount: true },
|
include: { twilioAccount: true },
|
||||||
@ -26,8 +27,11 @@ export default Queue<Payload>("fetch messages", async ({ data }) => {
|
|||||||
]);
|
]);
|
||||||
const messagesSent = sent.filter((message) => message.direction.startsWith("outbound"));
|
const messagesSent = sent.filter((message) => message.direction.startsWith("outbound"));
|
||||||
const messagesReceived = received.filter((message) => message.direction === "inbound");
|
const messagesReceived = received.filter((message) => message.direction === "inbound");
|
||||||
const messages = [...messagesSent, ...messagesReceived];
|
logger.info(
|
||||||
|
`Found ${messagesSent.length} outbound messages and ${messagesReceived.length} inbound messages for phone number with id=${phoneNumberId}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
const messages = [...messagesSent, ...messagesReceived];
|
||||||
await insertMessagesQueue.add(`insert messages of id=${phoneNumberId}`, {
|
await insertMessagesQueue.add(`insert messages of id=${phoneNumberId}`, {
|
||||||
phoneNumberId,
|
phoneNumberId,
|
||||||
messages,
|
messages,
|
||||||
|
@ -10,6 +10,7 @@ type Payload = {
|
|||||||
|
|
||||||
export default Queue<Payload>("fetch phone calls", async ({ data }) => {
|
export default Queue<Payload>("fetch phone calls", async ({ data }) => {
|
||||||
const { phoneNumberId } = data;
|
const { phoneNumberId } = data;
|
||||||
|
logger.info(`Fetching phone calls for phone number with id=${phoneNumberId}`);
|
||||||
const phoneNumber = await db.phoneNumber.findUnique({
|
const phoneNumber = await db.phoneNumber.findUnique({
|
||||||
where: { id: phoneNumberId },
|
where: { id: phoneNumberId },
|
||||||
include: { twilioAccount: true },
|
include: { twilioAccount: true },
|
||||||
@ -24,8 +25,11 @@ export default Queue<Payload>("fetch phone calls", async ({ data }) => {
|
|||||||
twilioClient.calls.list({ from: phoneNumber.number }),
|
twilioClient.calls.list({ from: phoneNumber.number }),
|
||||||
twilioClient.calls.list({ to: phoneNumber.number }),
|
twilioClient.calls.list({ to: phoneNumber.number }),
|
||||||
]);
|
]);
|
||||||
const calls = [...callsSent, ...callsReceived];
|
logger.info(
|
||||||
|
`Found ${callsSent.length} outbound calls and ${callsReceived.length} inbound calls for phone number with id=${phoneNumberId}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
const calls = [...callsSent, ...callsReceived];
|
||||||
await insertCallsQueue.add(`insert calls of id=${phoneNumberId}`, {
|
await insertCallsQueue.add(`insert calls of id=${phoneNumberId}`, {
|
||||||
phoneNumberId,
|
phoneNumberId,
|
||||||
calls,
|
calls,
|
||||||
|
@ -13,8 +13,10 @@ type Payload = {
|
|||||||
|
|
||||||
export default Queue<Payload>("insert messages", async ({ data }) => {
|
export default Queue<Payload>("insert messages", async ({ data }) => {
|
||||||
const { messages, phoneNumberId } = data;
|
const { messages, phoneNumberId } = data;
|
||||||
|
logger.info(`Inserting ${messages.length} messages for phone number with id=${phoneNumberId}`);
|
||||||
const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId } });
|
const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId } });
|
||||||
if (!phoneNumber) {
|
if (!phoneNumber) {
|
||||||
|
logger.warn(`No phone number found with id=${phoneNumberId}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,7 +39,7 @@ export default Queue<Payload>("insert messages", async ({ data }) => {
|
|||||||
.sort((a, b) => a.sentAt.getTime() - b.sentAt.getTime());
|
.sort((a, b) => a.sentAt.getTime() - b.sentAt.getTime());
|
||||||
|
|
||||||
const { count } = await db.message.createMany({ data: sms, skipDuplicates: true });
|
const { count } = await db.message.createMany({ data: sms, skipDuplicates: true });
|
||||||
logger.info(`inserted ${count} new messages for phoneNumberId=${phoneNumberId}`);
|
logger.info(`Inserted ${count} new messages for phone number with id=${phoneNumberId}`);
|
||||||
|
|
||||||
if (!phoneNumber.isFetchingMessages) {
|
if (!phoneNumber.isFetchingMessages) {
|
||||||
return;
|
return;
|
||||||
|
@ -13,8 +13,10 @@ type Payload = {
|
|||||||
|
|
||||||
export default Queue<Payload>("insert phone calls", async ({ data }) => {
|
export default Queue<Payload>("insert phone calls", async ({ data }) => {
|
||||||
const { calls, phoneNumberId } = data;
|
const { calls, phoneNumberId } = data;
|
||||||
|
logger.info(`Inserting ${calls.length} phone calls for phone number with id=${phoneNumberId}`);
|
||||||
const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId } });
|
const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId } });
|
||||||
if (!phoneNumber) {
|
if (!phoneNumber) {
|
||||||
|
logger.warn(`No phone number found with id=${phoneNumberId}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,7 +39,7 @@ export default Queue<Payload>("insert phone calls", async ({ data }) => {
|
|||||||
.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
|
.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
|
||||||
|
|
||||||
const { count } = await db.phoneCall.createMany({ data: phoneCalls, skipDuplicates: true });
|
const { count } = await db.phoneCall.createMany({ data: phoneCalls, skipDuplicates: true });
|
||||||
logger.info(`inserted ${count} new phone calls for phoneNumberId=${phoneNumberId}`);
|
logger.info(`Inserted ${count} new phone calls for phone number with id=${phoneNumberId}`);
|
||||||
|
|
||||||
if (!phoneNumber.isFetchingCalls) {
|
if (!phoneNumber.isFetchingCalls) {
|
||||||
return;
|
return;
|
||||||
|
@ -33,7 +33,52 @@ export function Queue<Payload>(
|
|||||||
defaultJobOptions: jobOptions,
|
defaultJobOptions: jobOptions,
|
||||||
connection: redis,
|
connection: redis,
|
||||||
});
|
});
|
||||||
const worker = new Worker<Payload>(name, handler, { connection: redis });
|
queue.on("error", (error) => logger.error(`queue:${name}:error`, error));
|
||||||
|
queue.on("cleaned", (jobs, type) =>
|
||||||
|
logger.debug(`queue:${name}:cleaned`, `${jobs.length} jobs cleaned (type=${type})`),
|
||||||
|
);
|
||||||
|
queue.on("waiting", (job) => logger.debug(`queue:${name}:waiting`, `job "${job.name}" is waiting`));
|
||||||
|
queue.on("paused", () => logger.debug(`queue:${name}:paused`));
|
||||||
|
queue.on("resumed", () => logger.debug(`queue:${name}:resumed`));
|
||||||
|
queue.on("ioredis:close", () => logger.debug(`queue:${name}:ioredis:close`));
|
||||||
|
queue.on("removed", (job) => logger.debug(`queue:${name}:removed`, `job "${job.name}" has been removed`));
|
||||||
|
queue.on("progress", (job, progress) =>
|
||||||
|
logger.debug(`queue:${name}:progress`, `job "${job.name}" has progressed => ${progress}`),
|
||||||
|
);
|
||||||
|
|
||||||
|
const worker = new Worker<Payload>(
|
||||||
|
name,
|
||||||
|
async (job, token) => {
|
||||||
|
try {
|
||||||
|
const res = await handler(job, token);
|
||||||
|
logger.debug(`queue:${name}:worker-success`, `worker finished job ${job.name}`);
|
||||||
|
return res;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`queue:${name}:worker-error`, `worker error for job ${job.name}`, error);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ connection: redis },
|
||||||
|
);
|
||||||
|
worker.on("failed", (job, error, prev) =>
|
||||||
|
logger.error(`job "${job.name}" failed with error ${error} (prev=${prev})`),
|
||||||
|
);
|
||||||
|
worker.on("completed", (job, error, prev) =>
|
||||||
|
logger.debug(`job "${job.name}" completed (error=${error}) (prev=${prev})`),
|
||||||
|
);
|
||||||
|
|
||||||
|
worker.on("error", (error) => logger.error(`worker:${name}:error`, error));
|
||||||
|
worker.on("paused", () => logger.debug(`worker:${name}:paused`));
|
||||||
|
worker.on("resumed", () => logger.debug(`worker:${name}:resumed`));
|
||||||
|
worker.on("ioredis:close", () => logger.debug(`worker:${name}:ioredis:close`));
|
||||||
|
worker.on("progress", (job, progress) =>
|
||||||
|
logger.debug(`worker:${name}:progress`, `job "${job.name}" has progressed => ${progress}`),
|
||||||
|
);
|
||||||
|
worker.on("active", (job) => logger.debug(`worker:${name}:active job "${job.name}"`));
|
||||||
|
worker.on("closed", () => logger.debug(`worker:${name}:closed`));
|
||||||
|
worker.on("closing", (msg) => logger.debug(`worker:${name}:closing msg="${msg}"`));
|
||||||
|
worker.on("drained", () => logger.debug(`worker:${name}:drained`));
|
||||||
|
|
||||||
const scheduler = new QueueScheduler(name, { connection: redis });
|
const scheduler = new QueueScheduler(name, { connection: redis });
|
||||||
registeredQueues.set(name, { queue, worker, scheduler });
|
registeredQueues.set(name, { queue, worker, scheduler });
|
||||||
|
|
||||||
@ -58,7 +103,7 @@ export function CronJob(
|
|||||||
|
|
||||||
const queue = Queue<undefined>(name, handler, jobOptions);
|
const queue = Queue<undefined>(name, handler, jobOptions);
|
||||||
queue.add(name, undefined, jobOptions);
|
queue.add(name, undefined, jobOptions);
|
||||||
logger.info(`registered cron job "${name}"`);
|
logger.debug(`registered cron job "${name}"`);
|
||||||
return queue;
|
return queue;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user