fix(concurrency-limit): move to renewing a lock on each job instead of estimating time to complete (#1197)

This commit is contained in:
Gergő Móricz
2025-02-19 20:13:22 +01:00
committed by GitHub
parent acf1e60608
commit 055f7d2da0
3 changed files with 9 additions and 44 deletions
+1 -29
View File
@@ -1,38 +1,10 @@
import { CONCURRENCY_LIMIT } from "../services/rate-limiter";
import { redisConnection } from "../services/queue-service"; import { redisConnection } from "../services/queue-service";
import { PlanType } from "../types"; import type { JobsOptions } from "bullmq";
import type { Job, JobsOptions } from "bullmq";
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id; const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) => const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id; "concurrency-limit-queue:" + team_id;
export function calculateJobTimeToRun(
job: ConcurrencyLimitedJob
): number {
let jobTimeToRun = 86400000; // 24h (crawl)
if (job.data.scrapeOptions) {
if (job.data.scrapeOptions.timeout) {
jobTimeToRun = job.data.scrapeOptions.timeout;
}
if (job.data.scrapeOptions.waitFor) {
jobTimeToRun += job.data.scrapeOptions.waitFor;
}
(job.data.scrapeOptions.actions ?? []).forEach(x => {
if (x.type === "wait" && x.milliseconds) {
jobTimeToRun += x.milliseconds;
} else {
jobTimeToRun += 1000;
}
})
}
return jobTimeToRun;
}
export async function cleanOldConcurrencyLimitEntries( export async function cleanOldConcurrencyLimitEntries(
team_id: string, team_id: string,
now: number = Date.now(), now: number = Date.now(),
+2 -13
View File
@@ -1,10 +1,8 @@
import { Job, JobsOptions } from "bullmq";
import { getScrapeQueue } from "./queue-service"; import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { NotificationType, PlanType, WebScraperOptions } from "../types"; import { PlanType, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node"; import * as Sentry from "@sentry/node";
import { import {
calculateJobTimeToRun,
cleanOldConcurrencyLimitEntries, cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs, getConcurrencyLimitActiveJobs,
getConcurrencyQueueJobsCount, getConcurrencyQueueJobsCount,
@@ -13,7 +11,6 @@ import {
} from "../lib/concurrency-limit"; } from "../lib/concurrency-limit";
import { logger } from "../lib/logger"; import { logger } from "../lib/logger";
import { getConcurrencyLimitMax } from "./rate-limiter"; import { getConcurrencyLimitMax } from "./rate-limiter";
import { sendNotificationWithCustomDays } from "./notification/email_notification";
async function _addScrapeJobToConcurrencyQueue( async function _addScrapeJobToConcurrencyQueue(
webScraperOptions: any, webScraperOptions: any,
@@ -44,15 +41,7 @@ export async function _addScrapeJobToBullMQ(
webScraperOptions.team_id && webScraperOptions.team_id &&
webScraperOptions.plan webScraperOptions.plan
) { ) {
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, calculateJobTimeToRun({ await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
id: jobId,
opts: {
...options,
priority: jobPriority,
jobId,
},
data: webScraperOptions,
}));
} }
await getScrapeQueue().add(jobId, webScraperOptions, { await getScrapeQueue().add(jobId, webScraperOptions, {
+6 -2
View File
@@ -52,7 +52,6 @@ import { configDotenv } from "dotenv";
import { scrapeOptions } from "../controllers/v1/types"; import { scrapeOptions } from "../controllers/v1/types";
import { getRateLimiterPoints } from "./rate-limiter"; import { getRateLimiterPoints } from "./rate-limiter";
import { import {
calculateJobTimeToRun,
cleanOldConcurrencyLimitEntries, cleanOldConcurrencyLimitEntries,
pushConcurrencyLimitActiveJob, pushConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob,
@@ -247,6 +246,11 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
extendInterval: jobLockExtendInterval, extendInterval: jobLockExtendInterval,
extensionTime: jobLockExtensionTime, extensionTime: jobLockExtensionTime,
}); });
if (job.data?.mode !== "kickoff" && job.data?.team_id) {
await pushConcurrencyLimitActiveJob(job.data.team_id, job.id, 60 * 1000); // 60s lock renew, just like in the queue
}
await job.extendLock(token, jobLockExtensionTime); await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval); }, jobLockExtendInterval);
@@ -597,7 +601,7 @@ const workerFun = async (
// we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
if (nextJob !== null) { if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, calculateJobTimeToRun(nextJob)); await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, 60 * 1000); // 60s initial timeout
await queue.add( await queue.add(
nextJob.id, nextJob.id,