feat(queue-worker): add better logging for worker

This commit is contained in:
Gergő Móricz
2024-12-05 22:06:07 +01:00
parent f82b9c205c
commit 934363b409
+23 -5
View File
@@ -11,7 +11,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook"; import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job"; import { logJob } from "./logging/log_job";
import { Job, Queue } from "bullmq"; import { Job, Queue } from "bullmq";
import { logger } from "../lib/logger"; import { logger as _logger } from "../lib/logger";
import { Worker } from "bullmq"; import { Worker } from "bullmq";
import systemMonitor from "./system-monitor"; import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
@@ -151,6 +151,8 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
} }
const processJobInternal = async (token: string, job: Job & { id: string }) => { const processJobInternal = async (token: string, job: Job & { id: string }) => {
const logger = _logger.child({ module: "queue-worker", method: "processJobInternal", jobId: job.id, scrapeId: job.id, crawlId: job.data?.crawl_id ?? undefined });
const extendLockInterval = setInterval(async () => { const extendLockInterval = setInterval(async () => {
logger.info(`🐂 Worker extending lock on job ${job.id}`); logger.info(`🐂 Worker extending lock on job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime); await job.extendLock(token, jobLockExtensionTime);
@@ -163,17 +165,19 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
if (result.success) { if (result.success) {
try { try {
if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
logger.debug("Job succeeded -- has crawl associated, putting null in Redis");
await job.moveToCompleted(null, token, false); await job.moveToCompleted(null, token, false);
} else { } else {
logger.debug("Job succeeded -- putting result in Redis");
await job.moveToCompleted(result.document, token, false); await job.moveToCompleted(result.document, token, false);
} }
} catch (e) {} } catch (e) {}
} else { } else {
logger.debug("Job failed", { result });
await job.moveToFailed((result as any).error, token, false); await job.moveToFailed((result as any).error, token, false);
} }
} catch (error) { } catch (error) {
console.log("Job failed, error:", error); logger.debug("Job failed", { error });
Sentry.captureException(error); Sentry.captureException(error);
err = error; err = error;
await job.moveToFailed(error, token, false); await job.moveToFailed(error, token, false);
@@ -203,6 +207,8 @@ const workerFun = async (
queue: Queue, queue: Queue,
processJobInternal: (token: string, job: Job) => Promise<any> processJobInternal: (token: string, job: Job) => Promise<any>
) => { ) => {
const logger = _logger.child({ module: "queue-worker", method: "workerFun" });
const worker = new Worker(queue.name, null, { const worker = new Worker(queue.name, null, {
connection: redisConnection, connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute lockDuration: 1 * 60 * 1000, // 1 minute
@@ -339,6 +345,7 @@ const workerFun = async (
workerFun(getScrapeQueue(), processJobInternal); workerFun(getScrapeQueue(), processJobInternal);
async function processJob(job: Job & { id: string }, token: string) { async function processJob(job: Job & { id: string }, token: string) {
const logger = _logger.child({ module: "queue-worker", method: "processJob", jobId: job.id, scrapeId: job.id, crawlId: job.data?.crawl_id ?? undefined });
logger.info(`🐂 Worker taking job ${job.id}`); logger.info(`🐂 Worker taking job ${job.id}`);
// Check if the job URL is researchhub and block it immediately // Check if the job URL is researchhub and block it immediately
@@ -408,6 +415,7 @@ async function processJob(job: Job & { id: string }, token: string) {
}; };
if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
logger.debug("Calling webhook with success...", { webhook: job.data.webhook });
await callWebhook( await callWebhook(
job.data.team_id, job.data.team_id,
job.data.crawl_id, job.data.crawl_id,
@@ -423,7 +431,7 @@ async function processJob(job: Job & { id: string }, token: string) {
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
if (doc.metadata.url !== undefined && doc.metadata.sourceURL !== undefined && normalizeURL(doc.metadata.url, sc) !== normalizeURL(doc.metadata.sourceURL, sc)) { if (doc.metadata.url !== undefined && doc.metadata.sourceURL !== undefined && normalizeURL(doc.metadata.url, sc) !== normalizeURL(doc.metadata.sourceURL, sc)) {
logger.debug("Was redirected, removing old URL and locking new URL..."); logger.debug("Was redirected, removing old URL and locking new URL...", { oldUrl: doc.metadata.sourceURL, newUrl: doc.metadata.url });
// Remove the old URL from visited unique due to checking for limit // Remove the old URL from visited unique due to checking for limit
// Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL) // Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL)
await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(doc.metadata.sourceURL, sc)); await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(doc.metadata.sourceURL, sc));
@@ -431,6 +439,7 @@ async function processJob(job: Job & { id: string }, token: string) {
await lockURL(job.data.crawl_id, sc, doc.metadata.url); await lockURL(job.data.crawl_id, sc, doc.metadata.url);
} }
logger.debug("Logging job to DB...");
await logJob({ await logJob({
job_id: job.id as string, job_id: job.id as string,
success: true, success: true,
@@ -446,6 +455,7 @@ async function processJob(job: Job & { id: string }, token: string) {
crawl_id: job.data.crawl_id, crawl_id: job.data.crawl_id,
}, true); }, true);
logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id); await addCrawlJobDone(job.data.crawl_id, job.id);
if (job.data.crawlerOptions !== null) { if (job.data.crawlerOptions !== null) {
@@ -457,6 +467,7 @@ async function processJob(job: Job & { id: string }, token: string) {
Infinity, Infinity,
sc.crawlerOptions?.maxDepth ?? 10 sc.crawlerOptions?.maxDepth ?? 10
); );
logger.debug("Discovered " + links.length + " links...", { linksLength: links.length });
for (const link of links) { for (const link of links) {
if (await lockURL(job.data.crawl_id, sc, link)) { if (await lockURL(job.data.crawl_id, sc, link)) {
@@ -468,6 +479,8 @@ async function processJob(job: Job & { id: string }, token: string) {
}); });
const jobId = uuidv4(); const jobId = uuidv4();
logger.debug("Determined job priority " + jobPriority + " for URL " + JSON.stringify(link), { jobPriority, url: link });
// console.log("plan: ", sc.plan); // console.log("plan: ", sc.plan);
// console.log("team_id: ", sc.team_id) // console.log("team_id: ", sc.team_id)
// console.log("base priority: ", job.data.crawl_id ? 20 : 10) // console.log("base priority: ", job.data.crawl_id ? 20 : 10)
@@ -492,6 +505,9 @@ async function processJob(job: Job & { id: string }, token: string) {
); );
await addCrawlJob(job.data.crawl_id, jobId); await addCrawlJob(job.data.crawl_id, jobId);
logger.debug("Added job for URL " + JSON.stringify(link), { jobPriority, url: link });
} else {
logger.debug("Could not lock URL " + JSON.stringify(link), { url: link });
} }
} }
} }
@@ -506,7 +522,7 @@ async function processJob(job: Job & { id: string }, token: string) {
const isEarlyTimeout = error instanceof Error && error.message === "timeout"; const isEarlyTimeout = error instanceof Error && error.message === "timeout";
if (!isEarlyTimeout) { if (!isEarlyTimeout) {
logger.error(`🐂 Job errored ${job.id} - ${error}`); logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });
Sentry.captureException(error, { Sentry.captureException(error, {
data: { data: {
@@ -557,8 +573,10 @@ async function processJob(job: Job & { id: string }, token: string) {
if (job.data.crawl_id) { if (job.data.crawl_id) {
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id); await addCrawlJobDone(job.data.crawl_id, job.id);
logger.debug("Logging job to DB...");
await logJob({ await logJob({
job_id: job.id as string, job_id: job.id as string,
success: false, success: false,