From 7bb922071cce3008529128b99f8a0004021f75f3 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 7 Aug 2024 14:35:20 +0200 Subject: [PATCH] fix(queue-worker): manually renew lock (testing) --- apps/api/src/scraper/WebScraper/index.ts | 2 +- apps/api/src/services/queue-worker.ts | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts index e667fa6b..c3834bcd 100644 --- a/apps/api/src/scraper/WebScraper/index.ts +++ b/apps/api/src/scraper/WebScraper/index.ts @@ -94,7 +94,7 @@ export class WebScraperDataProvider { const jobStatus = await job.getState(); if (jobStatus === "failed") { Logger.info( - "Job has failed or has been cancelled by the user. Stopping the job..." + "Job " + job.id + " has failed or has been cancelled by the user. Stopping the job..." ); return [] as Document[]; } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index e7767809..e46ffc1a 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -22,6 +22,11 @@ const wsq = getWebScraperQueue(); async function processJob(job: Job, done) { Logger.debug(`🐂 Worker taking job ${job.id}`); + const lockInterval = setInterval(() => { + Logger.debug(`🐂 Renewing lock for ${job.id}`); + job.extendLock(60000); + }, 15000); + try { job.progress({ current: 1, @@ -62,6 +67,7 @@ async function processJob(job: Job, done) { origin: job.data.origin, }); Logger.debug(`🐂 Job done ${job.id}`); + clearInterval(lockInterval); done(null, data); } catch (error) { Logger.error(`🐂 Job errored ${job.id} - ${error}`); @@ -108,8 +114,9 @@ async function processJob(job: Job, done) { pageOptions: job.data.pageOptions, origin: job.data.origin, }); + clearInterval(lockInterval); done(null, data); - } + } } wsq.process(