Files
firecrawl/apps/api/src/services/queue-jobs.ts
T

147 lines
3.8 KiB
TypeScript
Raw Normal View History

2024-10-25 20:21:12 +02:00
import { Job, JobsOptions } from "bullmq";
2024-08-13 21:03:24 +02:00
import { getScrapeQueue } from "./queue-service";
2024-04-15 17:01:47 -04:00
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
2024-12-11 19:46:11 -03:00
import {
cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
getConcurrencyLimitMax,
pushConcurrencyLimitActiveJob,
2024-12-11 19:51:08 -03:00
pushConcurrencyLimitedJob,
2024-12-11 19:46:11 -03:00
} from "../lib/concurrency-limit";
2024-04-15 17:01:47 -04:00
/**
 * Enqueues a single scrape job, honouring the per-team concurrency limit.
 *
 * When `webScraperOptions` carries both a `team_id` and a `plan`, the team's
 * active jobs are counted against the limit returned for that plan. If the
 * limit has been reached the job is handed to `pushConcurrencyLimitedJob`
 * instead of the scrape queue; otherwise it is registered as active and added
 * to the scrape queue. Payloads missing either field skip the concurrency
 * bookkeeping entirely and go straight to the queue.
 *
 * @param webScraperOptions - Job payload; becomes the queue job's data.
 * @param options - Extra queue options; `priority` and `jobId` override any
 *   same-named keys in here.
 * @param jobId - Used both as the queue job name and as its `jobId` option.
 * @param jobPriority - Priority given to the job (defaults to 10).
 */
async function addScrapeJobRaw(
  webScraperOptions: any,
  options: any,
  jobId: string,
  jobPriority: number = 10,
) {
  // Concurrency tracking only applies when the payload identifies a team and plan.
  const tracksConcurrency = Boolean(
    webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan,
  );

  let concurrencyLimited = false;
  if (tracksConcurrency) {
    const now = Date.now();
    const limit = await getConcurrencyLimitMax(webScraperOptions.plan);
    // Await the cleanup so it finishes before the active jobs are counted and
    // so a failure surfaces to the caller instead of as a floating promise.
    await cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
    const activeJobs = await getConcurrencyLimitActiveJobs(
      webScraperOptions.team_id,
      now,
    );
    concurrencyLimited = activeJobs.length >= limit;
  }

  if (concurrencyLimited) {
    // Over the limit: hand the job to the concurrency-limited store rather
    // than the scrape queue.
    await pushConcurrencyLimitedJob(webScraperOptions.team_id, {
      id: jobId,
      data: webScraperOptions,
      opts: {
        ...options,
        priority: jobPriority,
        jobId: jobId,
      },
      priority: jobPriority,
    });
    return;
  }

  if (tracksConcurrency) {
    await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
  }
  await getScrapeQueue().add(jobId, webScraperOptions, {
    ...options,
    priority: jobPriority,
    jobId,
  });
}
/**
 * Queues one scrape job, instrumenting the publish with Sentry when available.
 *
 * If Sentry is initialized, the enqueue runs inside an "Add scrape job" span
 * and the job payload is extended with a `sentry` object holding the span's
 * trace header, baggage header and the serialized payload size. Without
 * Sentry the payload is forwarded untouched.
 *
 * @param webScraperOptions - Job payload.
 * @param options - Extra queue options (defaults to `{}`).
 * @param jobId - Job identifier (defaults to a fresh UUID v4).
 * @param jobPriority - Queue priority (defaults to 10).
 */
export async function addScrapeJob(
  webScraperOptions: WebScraperOptions,
  options: any = {},
  jobId: string = uuidv4(),
  jobPriority: number = 10,
) {
  // Fast path: no tracing, just enqueue.
  if (!Sentry.isInitialized()) {
    await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
    return;
  }

  // Length of the JSON-serialized payload, reported on the span and the job.
  const size = JSON.stringify(webScraperOptions).length;

  const spanOptions = {
    name: "Add scrape job",
    op: "queue.publish",
    attributes: {
      "messaging.message.id": jobId,
      "messaging.destination.name": getScrapeQueue().name,
      "messaging.message.body.size": size,
    },
  };

  return await Sentry.startSpan(spanOptions, async (span) => {
    const tracedPayload = {
      ...webScraperOptions,
      sentry: {
        trace: Sentry.spanToTraceHeader(span),
        baggage: Sentry.spanToBaggageHeader(span),
        size,
      },
    };
    await addScrapeJobRaw(tracedPayload, options, jobId, jobPriority);
  });
}
2024-10-25 20:21:12 +02:00
/**
 * Queues a batch of scrape jobs by calling `addScrapeJob` for each entry and
 * waiting for all of them to finish.
 *
 * @param jobs - Entries pairing a payload with its `jobId` and `priority`;
 *   each entry's `opts` object is also passed through as the queue options.
 * @returns `true` when `jobs` is empty; otherwise resolves with no value once
 *   every job has been submitted.
 */
export async function addScrapeJobs(
  jobs: {
    data: WebScraperOptions;
    opts: {
      jobId: string;
      priority: number;
    };
  }[],
) {
  if (jobs.length === 0) return true;

  // TODO: better
  const submissions = jobs.map(({ data, opts }) =>
    addScrapeJob(data, opts, opts.jobId, opts.priority),
  );
  await Promise.all(submissions);
}
2024-12-11 19:46:11 -03:00
/**
 * Polls the scrape queue every 250 ms until the given job settles or the
 * timeout elapses.
 *
 * - Resolves with the job's `returnvalue` once its state is "completed".
 * - Rejects with the job's `failedReason` value (passed through as-is, not
 *   wrapped in an `Error`) once its state is "failed", unless that reason is
 *   "Concurrency limit hit", in which case polling continues.
 * - Rejects with `Error("Job wait ")` when `timeout` milliseconds have passed.
 * - Rejects with the underlying error if a queue lookup throws, or if a
 *   completed job can no longer be fetched.
 *
 * @param jobId - Identifier of the job to wait for.
 * @param timeout - Maximum time to wait, in milliseconds.
 */
export function waitForJob<T = unknown>(
  jobId: string,
  timeout: number,
): Promise<T> {
  return new Promise((resolve, reject) => {
    const start = Date.now();
    // Prevents a new poll from starting while the previous one is still
    // awaiting the queue (a slow lookup can outlast the 250 ms interval).
    let pollInFlight = false;

    const int = setInterval(async () => {
      if (Date.now() >= start + timeout) {
        clearInterval(int);
        reject(new Error("Job wait "));
        return;
      }
      if (pollInFlight) return;

      pollInFlight = true;
      try {
        const state = await getScrapeQueue().getJobState(jobId);
        if (state === "completed") {
          clearInterval(int);
          const job = await getScrapeQueue().getJob(jobId);
          if (job) {
            resolve(job.returnvalue);
          } else {
            // The job vanished between the state check and the fetch.
            reject(
              new Error(`Job ${jobId} completed but could not be retrieved`),
            );
          }
        } else if (state === "failed") {
          const job = await getScrapeQueue().getJob(jobId);
          // A "Concurrency limit hit" failure is not treated as final; keep polling.
          if (job && job.failedReason !== "Concurrency limit hit") {
            clearInterval(int);
            reject(job.failedReason);
          }
        }
      } catch (error) {
        // Without this, a throwing queue call inside the async interval
        // callback would become an unhandled rejection and the caller would
        // only ever see the timeout.
        clearInterval(int);
        reject(error);
      } finally {
        pollInFlight = false;
      }
    }, 250);
  });
}