feat(queue-worker): PoC of concurrency limits

This commit is contained in:
Gergő Móricz
2024-09-26 20:23:13 +02:00
parent 30058b1da0
commit 53fce67ca1
+40 -6
View File
@@ -12,7 +12,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook"; import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job"; import { logJob } from "./logging/log_job";
import { initSDK } from "@hyperdx/node-opentelemetry"; import { initSDK } from "@hyperdx/node-opentelemetry";
import { Job } from "bullmq"; import { Job, Queue } from "bullmq";
import { Logger } from "../lib/logger"; import { Logger } from "../lib/logger";
import { Worker } from "bullmq"; import { Worker } from "bullmq";
import systemMonitor from "./system-monitor"; import systemMonitor from "./system-monitor";
@@ -99,10 +99,10 @@ process.on("SIGINT", () => {
}); });
const workerFun = async ( const workerFun = async (
queueName: string, queue: Queue,
processJobInternal: (token: string, job: Job) => Promise<any> processJobInternal: (token: string, job: Job) => Promise<any>
) => { ) => {
const worker = new Worker(queueName, null, { const worker = new Worker(queue.name, null, {
connection: redisConnection, connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute lockDuration: 1 * 60 * 1000, // 1 minute
// lockRenewTime: 15 * 1000, // 15 seconds // lockRenewTime: 15 * 1000, // 15 seconds
@@ -129,6 +129,29 @@ const workerFun = async (
const job = await worker.getNextJob(token); const job = await worker.getNextJob(token);
if (job) { if (job) {
// Per-team concurrency gate. Each team gets a Redis sorted set whose members
// are job ids and whose scores are the epoch-ms time at which the entry expires.
const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;

if (job.data && job.data.team_id) {
  const concurrencyLimit = 100; // TODO: determine based on price id
  const now = Date.now();
  // An entry stops counting against the limit this long after it was claimed,
  // so a job that never releases its slot cannot block the team forever.
  const stalledJobTimeoutMs = 2 * 60 * 1000;

  // Drop entries whose expiry score is <= now. Awaited so that a Redis failure
  // surfaces here instead of becoming an unhandled promise rejection.
  await redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);

  // Whatever remains (score >= now) is treated as currently running for this team.
  const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);

  if (activeJobsOfTeam.length >= concurrencyLimit) {
    Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
    // Concurrency limit hit
    await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
    // NOTE(review): BullMQ documents that adding a job whose id already exists is
    // ignored; the job was just moved to failed, so confirm this re-add takes
    // effect (it may need to be removed first).
    await queue.add(job.name, job.data, {
      ...job.opts,
      jobId: job.id,
      // Priority defaults to 10 and grows by 25% on every bounce.
      priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs
    });
    // NOTE(review): nothing in this span stops execution from falling through to
    // the processing code below for a job that was just re-queued — confirm the
    // enclosing control flow skips it.
  } else {
    // Claim a slot; the score is the time at which this claim expires.
    await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
  }
}
if (job.data && job.data.sentry && Sentry.isInitialized()) { if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace( Sentry.continueTrace(
{ {
@@ -159,7 +182,15 @@ const workerFun = async (
}, },
}, },
async () => { async () => {
const res = await processJobInternal(token, job); let res;
// Run the job; whether it resolves or throws, release the team's concurrency
// slot afterwards so the limiter entry does not linger until its expiry.
try {
res = await processJobInternal(token, job);
} finally {
// Only jobs carrying a team_id are tracked in the limiter set, so only those
// need their entry removed.
if (job.id && job.data && job.data.team_id) {
await redisConnection.zrem(concurrencyLimiterKey, job.id);
}
}
if (res !== null) { if (res !== null) {
span.setStatus({ code: 2 }); // ERROR span.setStatus({ code: 2 }); // ERROR
} else { } else {
@@ -181,7 +212,10 @@ const workerFun = async (
}, },
}, },
() => { () => {
processJobInternal(token, job); processJobInternal(token, job)
.finally(async () => {
  // Release the team's concurrency slot on this path too (same guard as the
  // Sentry-traced path); otherwise the limiter entry stays counted until its
  // expiry score passes.
  if (job.id && job.data && job.data.team_id) {
    await redisConnection.zrem(concurrencyLimiterKey, job.id);
  }
});
} }
); );
} }
@@ -193,7 +227,7 @@ const workerFun = async (
} }
}; };
workerFun(scrapeQueueName, processJobInternal); workerFun(getScrapeQueue(), processJobInternal);
async function processJob(job: Job, token: string) { async function processJob(job: Job, token: string) {
Logger.info(`🐂 Worker taking job ${job.id}`); Logger.info(`🐂 Worker taking job ${job.id}`);