From e690a6fda7d0b600880fbd1f988282b8c8fa5459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 22 Aug 2024 22:38:39 +0200 Subject: [PATCH] fix: remove QueueEvents --- apps/api/src/controllers/scrape.ts | 2 +- apps/api/src/controllers/search.ts | 15 +++++++++++++-- apps/api/src/services/queue-service.ts | 6 +++--- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index 4f992891..3ffbc92b 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -9,7 +9,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import import { numTokensFromString } from '../lib/LLM-extraction/helpers'; import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values'; import { addScrapeJob } from '../services/queue-jobs'; -import { getScrapeQueue, scrapeQueueEvents } from '../services/queue-service'; +import { getScrapeQueue } from '../services/queue-service'; import { v4 as uuidv4 } from "uuid"; import { Logger } from '../lib/logger'; import * as Sentry from "@sentry/node"; diff --git a/apps/api/src/controllers/search.ts b/apps/api/src/controllers/search.ts index 304176a3..d86862b1 100644 --- a/apps/api/src/controllers/search.ts +++ b/apps/api/src/controllers/search.ts @@ -9,7 +9,7 @@ import { search } from "../search"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { v4 as uuidv4 } from "uuid"; import { Logger } from "../lib/logger"; -import { getScrapeQueue, scrapeQueueEvents } from "../services/queue-service"; +import { getScrapeQueue } from "../services/queue-service"; import * as Sentry from "@sentry/node"; import { addScrapeJob } from "../services/queue-jobs"; @@ -108,7 +108,18 @@ export async function searchHelper( await getScrapeQueue().addBulk(jobs); } - const docs = (await Promise.all(jobs.map(x => x.waitUntilFinished(scrapeQueueEvents, 60000)))).map(x => x[0]); + const docs = (await Promise.all(jobs.map(x => new Promise((resolve, reject) => { + const start = Date.now(); + const int = setInterval(async () => { + if (Date.now() >= start + 60000) { + clearInterval(int); + reject(new Error("Job wait ")); + } else if (await x.getState() === "completed") { + clearInterval(int); + resolve((await getScrapeQueue().getJob(x.id)).returnvalue); + } + }, 1000); + })))).map(x => x[0]); if (docs.length === 0) { return { success: true, error: "No search results found", returnCode: 200 }; diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts index 2e6d7562..113b3fa3 100644 --- a/apps/api/src/services/queue-service.ts +++ b/apps/api/src/services/queue-service.ts @@ -35,6 +35,6 @@ export function getScrapeQueue() { } -import { QueueEvents } from 'bullmq'; - -export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); \ No newline at end of file +// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE +// import { QueueEvents } from 'bullmq'; +// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); \ No newline at end of file