concurrency limit fix PoC II.

This commit is contained in:
Gergő Móricz
2024-10-25 20:21:12 +02:00
parent dbcf2d7ff6
commit 4468a49aee
10 changed files with 162 additions and 102 deletions
+50 -11
View File
@@ -1,20 +1,47 @@
import { Job, Queue } from "bullmq";
import { Job, JobsOptions } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import { cleanOldConcurrencyLimitEntries, getConcurrencyLimitActiveJobs, getConcurrencyLimitMax, pushConcurrencyLimitActiveJob, pushConcurrencyLimitedJob } from "../lib/concurrency-limit";
async function addScrapeJobRaw(
webScraperOptions: any,
options: any,
jobId: string,
jobPriority: number = 10
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
priority: jobPriority,
jobId,
});
) {
let concurrencyLimited = false;
if (webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan) {
const now = Date.now();
const limit = await getConcurrencyLimitMax(webScraperOptions.plan);
cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
concurrencyLimited = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length >= limit;
}
if (concurrencyLimited) {
await pushConcurrencyLimitedJob(webScraperOptions.team_id, {
id: jobId,
data: webScraperOptions,
opts: {
...options,
priority: jobPriority,
jobId: jobId,
},
priority: jobPriority,
});
} else {
if (webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan) {
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
}
await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
priority: jobPriority,
jobId,
});
}
}
export async function addScrapeJob(
@@ -22,8 +49,7 @@ export async function addScrapeJob(
options: any = {},
jobId: string = uuidv4(),
jobPriority: number = 10
): Promise<Job> {
) {
if (Sentry.isInitialized()) {
const size = JSON.stringify(webScraperOptions).length;
return await Sentry.startSpan({
@@ -35,7 +61,7 @@ export async function addScrapeJob(
"messaging.message.body.size": size,
},
}, async (span) => {
return await addScrapeJobRaw({
await addScrapeJobRaw({
...webScraperOptions,
sentry: {
trace: Sentry.spanToTraceHeader(span),
@@ -45,10 +71,23 @@ export async function addScrapeJob(
}, options, jobId, jobPriority);
});
} else {
return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
}
}
export async function addScrapeJobs(
jobs: {
data: WebScraperOptions,
opts: {
jobId: string,
priority: number,
},
}[],
) {
// TODO: better
await Promise.all(jobs.map(job => addScrapeJob(job.data, job.opts, job.opts.jobId, job.opts.priority)));
}
export function waitForJob(jobId: string, timeout: number) {
return new Promise((resolve, reject) => {
const start = Date.now();