diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 37e14baf..76796742 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -12,7 +12,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; import { logJob } from "./logging/log_job"; import { initSDK } from "@hyperdx/node-opentelemetry"; -import { Job } from "bullmq"; +import { Job, Queue } from "bullmq"; import { Logger } from "../lib/logger"; import { Worker } from "bullmq"; import systemMonitor from "./system-monitor"; @@ -99,10 +99,10 @@ process.on("SIGINT", () => { }); const workerFun = async ( - queueName: string, + queue: Queue, processJobInternal: (token: string, job: Job) => Promise ) => { - const worker = new Worker(queueName, null, { + const worker = new Worker(queue.name, null, { connection: redisConnection, lockDuration: 1 * 60 * 1000, // 1 minute // lockRenewTime: 15 * 1000, // 15 seconds @@ -129,6 +129,29 @@ const workerFun = async ( const job = await worker.getNextJob(token); if (job) { + const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; + + if (job.data && job.data.team_id) { + const concurrencyLimit = 100; // TODO: determine based on price id + const now = Date.now(); + const stalledJobTimeoutMs = 2 * 60 * 1000; + + redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); + const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); + if (activeJobsOfTeam.length >= concurrencyLimit) { + Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); + // Concurrency limit hit + await job.moveToFailed(new Error("Concurrency limit hit"), token, false); + await queue.add(job.name, job.data, { + ...job.opts, + jobId: job.id, + priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs + }) + } else { + await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); + } + } + if (job.data && job.data.sentry && Sentry.isInitialized()) { Sentry.continueTrace( { @@ -159,7 +182,15 @@ const workerFun = async ( }, }, async () => { - const res = await processJobInternal(token, job); + let res; + try { + res = await processJobInternal(token, job); + } finally { + if (job.id && job.data && job.data.team_id) { + await redisConnection.zrem(concurrencyLimiterKey, job.id); + } + } + if (res !== null) { span.setStatus({ code: 2 }); // ERROR } else { @@ -181,7 +212,10 @@ const workerFun = async ( }, }, () => { - processJobInternal(token, job); + processJobInternal(token, job) + .finally(() => { + + }); } ); } @@ -193,7 +227,7 @@ const workerFun = async ( } }; -workerFun(scrapeQueueName, processJobInternal); +workerFun(getScrapeQueue(), processJobInternal); async function processJob(job: Job, token: string) { Logger.info(`🐂 Worker taking job ${job.id}`);