From 934363b40920c85864710f3477a7e4344942ca9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 5 Dec 2024 22:06:07 +0100 Subject: [PATCH] feat(queue-worker): add better logging for worker --- apps/api/src/services/queue-worker.ts | 28 ++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 101511a9..78578395 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -11,7 +11,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; import { logJob } from "./logging/log_job"; import { Job, Queue } from "bullmq"; -import { logger } from "../lib/logger"; +import { logger as _logger } from "../lib/logger"; import { Worker } from "bullmq"; import systemMonitor from "./system-monitor"; import { v4 as uuidv4 } from "uuid"; @@ -151,6 +151,8 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { } const processJobInternal = async (token: string, job: Job & { id: string }) => { + const logger = _logger.child({ module: "queue-worker", method: "processJobInternal", jobId: job.id, scrapeId: job.id, crawlId: job.data?.crawl_id ?? undefined }); + const extendLockInterval = setInterval(async () => { logger.info(`🐂 Worker extending lock on job ${job.id}`); await job.extendLock(token, jobLockExtensionTime); @@ -163,17 +165,19 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => { if (result.success) { try { if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { + logger.debug("Job succeeded -- has crawl associated, putting null in Redis"); await job.moveToCompleted(null, token, false); } else { + logger.debug("Job succeeded -- putting result in Redis"); await job.moveToCompleted(result.document, token, false); } } catch (e) {} } else { + logger.debug("Job failed", { result }); await job.moveToFailed((result as any).error, token, false); } - } catch (error) { - console.log("Job failed, error:", error); + logger.debug("Job failed", { error }); Sentry.captureException(error); err = error; await job.moveToFailed(error, token, false); @@ -203,6 +207,8 @@ const workerFun = async ( queue: Queue, processJobInternal: (token: string, job: Job) => Promise ) => { + const logger = _logger.child({ module: "queue-worker", method: "workerFun" }); + const worker = new Worker(queue.name, null, { connection: redisConnection, lockDuration: 1 * 60 * 1000, // 1 minute @@ -339,6 +345,7 @@ const workerFun = async ( workerFun(getScrapeQueue(), processJobInternal); async function processJob(job: Job & { id: string }, token: string) { + const logger = _logger.child({ module: "queue-worker", method: "processJob", jobId: job.id, scrapeId: job.id, crawlId: job.data?.crawl_id ?? undefined }); logger.info(`🐂 Worker taking job ${job.id}`); // Check if the job URL is researchhub and block it immediately @@ -408,6 +415,7 @@ async function processJob(job: Job & { id: string }, token: string) { }; if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { + logger.debug("Calling webhook with success...", { webhook: job.data.webhook }); await callWebhook( job.data.team_id, job.data.crawl_id, @@ -423,7 +431,7 @@ async function processJob(job: Job & { id: string }, token: string) { const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; if (doc.metadata.url !== undefined && doc.metadata.sourceURL !== undefined && normalizeURL(doc.metadata.url, sc) !== normalizeURL(doc.metadata.sourceURL, sc)) { - logger.debug("Was redirected, removing old URL and locking new URL..."); + logger.debug("Was redirected, removing old URL and locking new URL...", { oldUrl: doc.metadata.sourceURL, newUrl: doc.metadata.url }); // Remove the old URL from visited unique due to checking for limit // Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL) await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(doc.metadata.sourceURL, sc)); @@ -431,6 +439,7 @@ async function processJob(job: Job & { id: string }, token: string) { await lockURL(job.data.crawl_id, sc, doc.metadata.url); } + logger.debug("Logging job to DB..."); await logJob({ job_id: job.id as string, success: true, @@ -446,6 +455,7 @@ async function processJob(job: Job & { id: string }, token: string) { crawl_id: job.data.crawl_id, }, true); + logger.debug("Declaring job as done..."); await addCrawlJobDone(job.data.crawl_id, job.id); if (job.data.crawlerOptions !== null) { @@ -457,6 +467,7 @@ async function processJob(job: Job & { id: string }, token: string) { Infinity, sc.crawlerOptions?.maxDepth ?? 10 ); + logger.debug("Discovered " + links.length + " links...", { linksLength: links.length }); for (const link of links) { if (await lockURL(job.data.crawl_id, sc, link)) { @@ -468,6 +479,8 @@ async function processJob(job: Job & { id: string }, token: string) { }); const jobId = uuidv4(); + logger.debug("Determined job priority " + jobPriority + " for URL " + JSON.stringify(link), { jobPriority, url: link }); + // console.log("plan: ", sc.plan); // console.log("team_id: ", sc.team_id) // console.log("base priority: ", job.data.crawl_id ? 20 : 10) @@ -492,6 +505,9 @@ async function processJob(job: Job & { id: string }, token: string) { ); await addCrawlJob(job.data.crawl_id, jobId); + logger.debug("Added job for URL " + JSON.stringify(link), { jobPriority, url: link }); + } else { + logger.debug("Could not lock URL " + JSON.stringify(link), { url: link }); } } } @@ -506,7 +522,7 @@ async function processJob(job: Job & { id: string }, token: string) { const isEarlyTimeout = error instanceof Error && error.message === "timeout"; if (!isEarlyTimeout) { - logger.error(`🐂 Job errored ${job.id} - ${error}`); + logger.error(`🐂 Job errored ${job.id} - ${error}`, { error }); Sentry.captureException(error, { data: { @@ -557,8 +573,10 @@ async function processJob(job: Job & { id: string }, token: string) { if (job.data.crawl_id) { const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; + logger.debug("Declaring job as done..."); await addCrawlJobDone(job.data.crawl_id, job.id); + logger.debug("Logging job to DB..."); await logJob({ job_id: job.id as string, success: false,