From e2179524347542acca70612fd0bbd5c77d5601eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Tue, 26 Nov 2024 16:28:45 +0100 Subject: [PATCH] fix(crawl): finish crawl even if last one fails --- apps/api/src/services/queue-worker.ts | 188 ++++++++++++++------------ 1 file changed, 98 insertions(+), 90 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 3d5b3e0e..17ed5969 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -56,6 +56,99 @@ const connectionMonitorInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10; const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20; +async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { + if (await finishCrawl(job.data.crawl_id)) { + if (!job.data.v1) { + const jobIDs = await getCrawlJobs(job.data.crawl_id); + + const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp); + // const jobStatuses = await Promise.all(jobs.map((x) => x.getState())); + const jobStatus = + sc.cancelled // || jobStatuses.some((x) => x === "failed") + ? "failed" + : "completed"; + + const fullDocs = jobs.map((x) => + Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue + ); + + await logJob({ + job_id: job.data.crawl_id, + success: jobStatus === "completed", + message: sc.cancelled ? "Cancelled" : undefined, + num_docs: fullDocs.length, + docs: [], + time_taken: (Date.now() - sc.createdAt) / 1000, + team_id: job.data.team_id, + mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape", + url: sc.originUrl!, + scrapeOptions: sc.scrapeOptions, + crawlerOptions: sc.crawlerOptions, + origin: job.data.origin, + }); + + const data = { + success: jobStatus !== "failed", + result: { + links: fullDocs.map((doc) => { + return { + content: doc, + source: doc?.metadata?.sourceURL ?? doc?.url ?? "", + }; + }), + }, + project_id: job.data.project_id, + docs: fullDocs, + }; + + // v0 web hooks, call when done with all the data + if (!job.data.v1) { + callWebhook( + job.data.team_id, + job.data.crawl_id, + data, + job.data.webhook, + job.data.v1, + job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed" + ); + } + } else { + const jobIDs = await getCrawlJobs(job.data.crawl_id); + const jobStatus = + sc.cancelled + ? "failed" + : "completed"; + + // v1 web hooks, call when done with no data, but with event completed + if (job.data.v1 && job.data.webhook) { + callWebhook( + job.data.team_id, + job.data.crawl_id, + [], + job.data.webhook, + job.data.v1, + job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed" + ); + } + + await logJob({ + job_id: job.data.crawl_id, + success: jobStatus === "completed", + message: sc.cancelled ? "Cancelled" : undefined, + num_docs: jobIDs.length, + docs: [], + time_taken: (Date.now() - sc.createdAt) / 1000, + team_id: job.data.team_id, + scrapeOptions: sc.scrapeOptions, + mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape", + url: sc?.originUrl ?? (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"), + crawlerOptions: sc.crawlerOptions, + origin: job.data.origin, + }, true); + } + } +} + const processJobInternal = async (token: string, job: Job & { id: string }) => { const extendLockInterval = setInterval(async () => { logger.info(`🐂 Worker extending lock on job ${job.id}`); @@ -399,96 +492,7 @@ async function processJob(job: Job & { id: string }, token: string) { } } - if (await finishCrawl(job.data.crawl_id)) { - if (!job.data.v1) { - const jobIDs = await getCrawlJobs(job.data.crawl_id); - - const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp); - // const jobStatuses = await Promise.all(jobs.map((x) => x.getState())); - const jobStatus = - sc.cancelled // || jobStatuses.some((x) => x === "failed") - ? "failed" - : "completed"; - - const fullDocs = jobs.map((x) => - Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue - ); - - await logJob({ - job_id: job.data.crawl_id, - success: jobStatus === "completed", - message: sc.cancelled ? "Cancelled" : undefined, - num_docs: fullDocs.length, - docs: [], - time_taken: (Date.now() - sc.createdAt) / 1000, - team_id: job.data.team_id, - mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape", - url: sc.originUrl!, - scrapeOptions: sc.scrapeOptions, - crawlerOptions: sc.crawlerOptions, - origin: job.data.origin, - }); - - const data = { - success: jobStatus !== "failed", - result: { - links: fullDocs.map((doc) => { - return { - content: doc, - source: doc?.metadata?.sourceURL ?? doc?.url ?? "", - }; - }), - }, - project_id: job.data.project_id, - docs: fullDocs, - }; - - // v0 web hooks, call when done with all the data - if (!job.data.v1) { - callWebhook( - job.data.team_id, - job.data.crawl_id, - data, - job.data.webhook, - job.data.v1, - job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed" - ); - } - } else { - const jobIDs = await getCrawlJobs(job.data.crawl_id); - const jobStatus = - sc.cancelled - ? "failed" - : "completed"; - - // v1 web hooks, call when done with no data, but with event completed - if (job.data.v1 && job.data.webhook) { - callWebhook( - job.data.team_id, - job.data.crawl_id, - [], - job.data.webhook, - job.data.v1, - job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed" - ); - } - - await logJob({ - job_id: job.data.crawl_id, - success: jobStatus === "completed", - message: sc.cancelled ? "Cancelled" : undefined, - num_docs: jobIDs.length, - docs: [], - time_taken: (Date.now() - sc.createdAt) / 1000, - team_id: job.data.team_id, - scrapeOptions: sc.scrapeOptions, - mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape", - url: sc?.originUrl ?? (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"), - crawlerOptions: sc.crawlerOptions, - origin: job.data.origin, - }, true); - } - } + await finishCrawlIfNeeded(job, sc); } logger.info(`🐂 Job done ${job.id}`); @@ -547,6 +551,8 @@ async function processJob(job: Job & { id: string }, token: string) { if (job.data.crawl_id) { const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; + + await addCrawlJobDone(job.data.crawl_id, job.id); await logJob({ job_id: job.id as string, @@ -567,6 +573,8 @@ async function processJob(job: Job & { id: string }, token: string) { origin: job.data.origin, crawl_id: job.data.crawl_id, }, true); + + await finishCrawlIfNeeded(job, sc); // await logJob({ // job_id: job.data.crawl_id,