Files
firecrawl/apps/api/src/lib/concurrency-limit.ts
T

113 lines
2.7 KiB
TypeScript
Raw Normal View History

import { CONCURRENCY_LIMIT } from "../services/rate-limiter";
2024-10-25 20:21:12 +02:00
import { redisConnection } from "../services/queue-service";
import { PlanType } from "../types";
2025-01-31 11:22:10 +01:00
import type { Job, JobsOptions } from "bullmq";
2024-10-25 20:21:12 +02:00
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
2024-12-11 19:46:11 -03:00
const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id;
2024-10-25 20:21:12 +02:00
2025-01-31 11:22:10 +01:00
export function calculateJobTimeToRun(
job: ConcurrencyLimitedJob
): number {
let jobTimeToRun = 86400000; // 24h (crawl)
2025-01-31 11:22:10 +01:00
if (job.data.scrapeOptions) {
if (job.data.scrapeOptions.timeout) {
jobTimeToRun = job.data.scrapeOptions.timeout;
}
if (job.data.scrapeOptions.waitFor) {
jobTimeToRun += job.data.scrapeOptions.waitFor;
}
(job.data.scrapeOptions.actions ?? []).forEach(x => {
if (x.type === "wait" && x.milliseconds) {
jobTimeToRun += x.milliseconds;
} else {
jobTimeToRun += 1000;
}
})
}
return jobTimeToRun;
}
2024-10-25 20:21:12 +02:00
2024-12-11 19:46:11 -03:00
export async function cleanOldConcurrencyLimitEntries(
team_id: string,
2024-12-11 19:51:08 -03:00
now: number = Date.now(),
2024-12-11 19:46:11 -03:00
) {
await redisConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
2024-10-25 20:21:12 +02:00
}
2024-12-11 19:46:11 -03:00
export async function getConcurrencyLimitActiveJobs(
team_id: string,
2024-12-11 19:51:08 -03:00
now: number = Date.now(),
2024-12-11 19:46:11 -03:00
): Promise<string[]> {
return await redisConnection.zrangebyscore(
constructKey(team_id),
now,
2024-12-11 19:51:08 -03:00
Infinity,
2024-12-11 19:46:11 -03:00
);
2024-10-25 20:21:12 +02:00
}
2024-12-11 19:46:11 -03:00
export async function pushConcurrencyLimitActiveJob(
team_id: string,
id: string,
2025-01-31 11:22:10 +01:00
timeout: number,
2024-12-11 19:51:08 -03:00
now: number = Date.now(),
2024-12-11 19:46:11 -03:00
) {
await redisConnection.zadd(
constructKey(team_id),
2025-01-31 11:22:10 +01:00
now + timeout,
2024-12-11 19:51:08 -03:00
id,
2024-12-11 19:46:11 -03:00
);
2024-10-25 20:21:12 +02:00
}
2024-12-11 19:46:11 -03:00
export async function removeConcurrencyLimitActiveJob(
team_id: string,
2024-12-11 19:51:08 -03:00
id: string,
2024-12-11 19:46:11 -03:00
) {
await redisConnection.zrem(constructKey(team_id), id);
2024-10-25 20:21:12 +02:00
}
export type ConcurrencyLimitedJob = {
2024-12-11 19:46:11 -03:00
id: string;
data: any;
opts: JobsOptions;
priority?: number;
};
2024-10-25 20:21:12 +02:00
2024-12-11 19:46:11 -03:00
export async function takeConcurrencyLimitedJob(
2024-12-11 19:51:08 -03:00
team_id: string,
2024-12-11 19:46:11 -03:00
): Promise<ConcurrencyLimitedJob | null> {
const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN");
if (res === null || res === undefined) {
return null;
}
2024-10-25 20:21:12 +02:00
2024-12-11 19:46:11 -03:00
return JSON.parse(res[1][0][0]);
2024-10-25 20:21:12 +02:00
}
2024-12-11 19:46:11 -03:00
export async function pushConcurrencyLimitedJob(
team_id: string,
2024-12-11 19:51:08 -03:00
job: ConcurrencyLimitedJob,
2024-12-11 19:46:11 -03:00
) {
await redisConnection.zadd(
constructQueueKey(team_id),
job.priority ?? 1,
2024-12-11 19:51:08 -03:00
JSON.stringify(job),
2024-12-11 19:46:11 -03:00
);
2024-10-25 20:21:12 +02:00
}
export async function getConcurrencyLimitedJobs(
team_id: string,
) {
return new Set((await redisConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id));
}
export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
const count = await redisConnection.zcard(constructQueueKey(team_id));
return count;
}