Files
firecrawl/apps/api/src/services/queue-worker.ts
T

1132 lines
34 KiB
TypeScript
Raw Normal View History

2024-08-20 19:25:19 +02:00
import "dotenv/config";
2024-09-01 14:19:43 -03:00
import "./sentry";
2024-08-21 17:58:27 +02:00
import * as Sentry from "@sentry/node";
2024-04-15 17:01:47 -04:00
import { CustomError } from "../lib/custom-error";
2024-07-30 14:44:13 -04:00
import {
getScrapeQueue,
2025-01-03 20:44:27 -03:00
getExtractQueue,
2024-07-30 14:44:13 -04:00
redisConnection,
2024-12-11 19:51:08 -03:00
scrapeQueueName,
2025-01-03 20:44:27 -03:00
extractQueueName,
2024-07-30 14:44:13 -04:00
} from "./queue-service";
2024-04-15 17:01:47 -04:00
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
2024-04-20 13:53:11 -07:00
import { logJob } from "./logging/log_job";
import { Job, Queue } from "bullmq";
import { logger as _logger } from "../lib/logger";
2024-07-30 13:27:23 -04:00
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
2024-09-01 14:19:43 -03:00
import {
addCrawlJob,
addCrawlJobDone,
2024-12-27 19:59:26 +01:00
addCrawlJobs,
2024-09-01 14:19:43 -03:00
crawlToCrawler,
finishCrawl,
2024-12-03 17:53:17 -03:00
generateURLPermutations,
2024-09-01 14:19:43 -03:00
getCrawl,
getCrawlJobCount,
2024-09-01 14:19:43 -03:00
getCrawlJobs,
lockURL,
2024-12-27 19:59:26 +01:00
lockURLs,
2025-01-07 19:15:23 +01:00
lockURLsIndividually,
2024-12-11 19:51:08 -03:00
normalizeURL,
2025-01-07 19:15:23 +01:00
saveCrawl,
2024-09-01 14:19:43 -03:00
} from "../lib/crawl-redis";
2024-08-13 20:51:43 +02:00
import { StoredCrawl } from "../lib/crawl-redis";
2024-12-27 19:59:26 +01:00
import { addScrapeJob, addScrapeJobs } from "./queue-jobs";
2024-09-01 14:19:43 -03:00
import {
addJobPriority,
deleteJobPriority,
2024-12-11 19:51:08 -03:00
getJobPriority,
2024-09-01 14:19:43 -03:00
} from "../../src/lib/job-priority";
import { PlanType, RateLimiterMode } from "../types";
2024-11-07 20:57:33 +01:00
import { getJobs } from "..//controllers/v1/crawl-status";
2024-09-04 15:57:57 -03:00
import { configDotenv } from "dotenv";
2024-11-07 20:57:33 +01:00
import { scrapeOptions } from "../controllers/v1/types";
import { getRateLimiterPoints } from "./rate-limiter";
2024-12-11 19:46:11 -03:00
import {
cleanOldConcurrencyLimitEntries,
pushConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
2024-12-11 19:51:08 -03:00
takeConcurrencyLimitedJob,
2024-12-11 19:46:11 -03:00
} from "../lib/concurrency-limit";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
2024-12-30 21:43:59 -03:00
import { indexPage } from "../lib/extract/index/pinecone";
import { Document } from "../controllers/v1/types";
2025-01-03 20:44:27 -03:00
import { performExtraction } from "../lib/extract/extraction-service";
2025-01-03 21:48:28 -03:00
import { supabase_service } from "../services/supabase";
2025-01-03 23:54:03 -03:00
import { normalizeUrl, normalizeUrlOnlyHostname } from "../lib/canonical-url";
import { saveExtract, updateExtract } from "../lib/extract/extract-redis";
import { billTeam } from "./billing/credit_billing";
2024-12-30 21:43:59 -03:00
2024-09-04 15:57:57 -03:00
// Populate process.env via dotenv. NOTE(review): "dotenv/config" is already
// imported at the top of this file, so this call looks redundant — confirm
// before removing.
configDotenv();
2024-05-20 13:36:34 -07:00
/**
 * Thrown when a crawl job was redirected to a URL that another job of the same
 * crawl had already added to the crawl's visited set, i.e. two jobs raced to
 * the same redirect target.
 */
class RacedRedirectError extends Error {
  constructor() {
    const message = "Raced redirect error";
    super(message);
  }
}

/** Returns a promise that resolves (with no value) after `ms` milliseconds. */
function sleep(ms: number) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
2024-04-15 17:01:47 -04:00
2024-07-30 13:27:23 -04:00
/**
 * Reads a numeric setting from the environment.
 *
 * Falls back to `fallback` when the variable is unset, empty, `0`, or not a
 * number (same semantics as `Number(process.env[name]) || fallback`).
 */
const numberFromEnv = (name: string, fallback: number): number =>
  Number(process.env[name]) || fallback;

// Worker lock / stalled-check tuning, in milliseconds.
const workerLockDuration = numberFromEnv("WORKER_LOCK_DURATION", 60000);
const workerStalledCheckInterval = numberFromEnv(
  "WORKER_STALLED_CHECK_INTERVAL",
  30000,
);
const jobLockExtendInterval = numberFromEnv("JOB_LOCK_EXTEND_INTERVAL", 15000);
const jobLockExtensionTime = numberFromEnv("JOB_LOCK_EXTENSION_TIME", 60000);

// Polling delays for the worker loop, in milliseconds.
const cantAcceptConnectionInterval = numberFromEnv(
  "CANT_ACCEPT_CONNECTION_INTERVAL",
  2000,
);
const connectionMonitorInterval = numberFromEnv(
  "CONNECTION_MONITOR_INTERVAL",
  10,
);
// Has its own GOT_JOB_INTERVAL variable. It falls back to
// CONNECTION_MONITOR_INTERVAL (which this value used to read, apparently by
// copy-paste), so existing deployments keep their behaviour, and then to 20.
const gotJobInterval = numberFromEnv(
  "GOT_JOB_INTERVAL",
  numberFromEnv("CONNECTION_MONITOR_INTERVAL", 20),
);

/** IDs of jobs this worker process has picked up and not yet finished. */
const runningJobs: Set<string> = new Set();
2024-12-11 19:46:11 -03:00
/**
 * Saves the unique URLs visited by a finished crawl into the `crawl_maps`
 * Supabase table, merging them with any URLs already stored for the same
 * origin (normalized to its hostname).
 *
 * The save is skipped in three cases:
 * - the job is a batch scrape (`crawlerOptions === null`);
 * - the crawl has no origin URL;
 * - no URLs were visited.
 *
 * Intended to be fire-and-forget. Every error, including Redis and
 * URL-normalization failures, is caught and logged, so the returned promise
 * never rejects.
 */
async function saveCrawlMapForFinishedCrawl(
  job: Job & { id: string },
  sc: StoredCrawl,
): Promise<void> {
  try {
    const originUrl = sc.originUrl
      ? normalizeUrlOnlyHostname(sc.originUrl)
      : undefined;
    // Get all visited unique URLs from Redis
    const visitedUrls = await redisConnection.smembers(
      "crawl:" + job.data.crawl_id + ":visited_unique",
    );
    if (
      visitedUrls.length === 0 ||
      job.data.crawlerOptions === null ||
      !originUrl
    ) {
      return;
    }

    // Standardize URLs with normalizeUrl and drop duplicates
    const standardizedUrls = [
      ...new Set(visitedUrls.map((url) => normalizeUrl(url))),
    ];

    // First check if an entry exists for this origin URL
    const { data: existingMap } = await supabase_service
      .from("crawl_maps")
      .select("urls")
      .eq("origin_url", originUrl)
      .single();

    if (existingMap) {
      // Merge URLs, removing duplicates
      const mergedUrls = [
        ...new Set([...existingMap.urls, ...standardizedUrls]),
      ];
      const { error } = await supabase_service
        .from("crawl_maps")
        .update({
          urls: mergedUrls,
          num_urls: mergedUrls.length,
          updated_at: new Date().toISOString(),
        })
        .eq("origin_url", originUrl);
      if (error) {
        _logger.error("Failed to update crawl map", { error });
      }
    } else {
      // Insert a new entry if none exists
      const { error } = await supabase_service.from("crawl_maps").insert({
        origin_url: originUrl,
        urls: standardizedUrls,
        num_urls: standardizedUrls.length,
        created_at: new Date().toISOString(),
        updated_at: new Date().toISOString(),
      });
      if (error) {
        _logger.error("Failed to save crawl map", { error });
      }
    }
  } catch (error) {
    _logger.error("Error saving crawl map", { error });
  }
}

/**
 * Finalizes a crawl or batch scrape when `finishCrawl(crawl_id)` returns
 * truthy. Presumably that happens only for the call that marks the crawl as
 * finished; confirm in crawl-redis.
 *
 * Steps:
 * - Kick off the crawl-map upload (fire-and-forget).
 * - Log the whole crawl as one job record.
 * - Send the completion webhook.
 *
 * The two API versions differ:
 * - v0 (`!job.data.v1`): collects every job's return value. It logs their
 *   count and always calls `callWebhook` with all documents (not awaited).
 * - v1: logs the job count from Redis. It calls `callWebhook` with an empty
 *   payload (not awaited) only when a webhook is configured.
 *
 * In both cases the crawl is reported as failed only if it was cancelled.
 */
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
  if (!(await finishCrawl(job.data.crawl_id))) {
    return;
  }

  // Fire and forget: the crawl map upload must not delay completion handling.
  // The helper catches all of its own errors, so this cannot reject unhandled.
  void saveCrawlMapForFinishedCrawl(job, sc);

  // crawlerOptions === null marks a batch scrape rather than a crawl.
  const isCrawl = job.data.crawlerOptions !== null;

  if (!job.data.v1) {
    const jobIDs = await getCrawlJobs(job.data.crawl_id);
    const jobs = (await getJobs(jobIDs)).sort(
      (a, b) => a.timestamp - b.timestamp,
    );
    // const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
    const jobStatus = sc.cancelled // || jobStatuses.some((x) => x === "failed")
      ? "failed"
      : "completed";
    const fullDocs = jobs
      .map((x) =>
        x.returnvalue
          ? Array.isArray(x.returnvalue)
            ? x.returnvalue[0]
            : x.returnvalue
          : null,
      )
      .filter((x) => x !== null);

    await logJob({
      job_id: job.data.crawl_id,
      success: jobStatus === "completed",
      message: sc.cancelled ? "Cancelled" : undefined,
      num_docs: fullDocs.length,
      docs: [],
      time_taken: (Date.now() - sc.createdAt) / 1000,
      team_id: job.data.team_id,
      mode: isCrawl ? "crawl" : "batch_scrape",
      url: sc.originUrl!,
      scrapeOptions: sc.scrapeOptions,
      crawlerOptions: sc.crawlerOptions,
      origin: job.data.origin,
    });

    const data = {
      success: jobStatus !== "failed",
      result: {
        links: fullDocs.map((doc) => {
          return {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
          };
        }),
      },
      project_id: job.data.project_id,
      docs: fullDocs,
    };

    // v0 web hooks, call when done with all the data (not awaited)
    callWebhook(
      job.data.team_id,
      job.data.crawl_id,
      data,
      job.data.webhook,
      job.data.v1,
      isCrawl ? "crawl.completed" : "batch_scrape.completed",
    );
  } else {
    const num_docs = await getCrawlJobCount(job.data.crawl_id);
    const jobStatus = sc.cancelled ? "failed" : "completed";

    await logJob(
      {
        job_id: job.data.crawl_id,
        success: jobStatus === "completed",
        message: sc.cancelled ? "Cancelled" : undefined,
        num_docs,
        docs: [],
        time_taken: (Date.now() - sc.createdAt) / 1000,
        team_id: job.data.team_id,
        scrapeOptions: sc.scrapeOptions,
        mode: isCrawl ? "crawl" : "batch_scrape",
        url:
          sc?.originUrl ??
          (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"),
        crawlerOptions: sc.crawlerOptions,
        origin: job.data.origin,
      },
      true,
    );

    // v1 web hooks, call when done with no data, but with event completed.
    // job.data.v1 is already known to be truthy in this branch.
    if (job.data.webhook) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        [],
        job.data.webhook,
        job.data.v1,
        isCrawl ? "crawl.completed" : "batch_scrape.completed",
      );
    }
  }
}
2024-11-07 20:57:33 +01:00
/**
 * Processes one scrape-queue job while holding its BullMQ lock.
 *
 * Kickoff jobs (`mode === "kickoff"`) go through `processKickoffJob`; all
 * other jobs go through `processJob`. The job is then moved to completed or
 * failed using `token`. While the job runs:
 * - its lock is extended every `jobLockExtendInterval` ms;
 * - a job-priority entry is registered for its team and removed afterwards.
 *
 * @param token - BullMQ lock token the job was fetched with.
 * @param job - The job to process.
 * @returns `null`, unless an exception escaped processing. In that case it
 *   returns the thrown error, which the caller uses to set the Sentry span
 *   status.
 */
const processJobInternal = async (token: string, job: Job & { id: string }) => {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processJobInternal",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
  });

  // Keep the lock alive while the job runs. A failed extension is logged
  // instead of escaping from the timer as an unhandled promise rejection.
  const extendLockInterval = setInterval(async () => {
    logger.info(`🐂 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (error) {
      logger.warn("Failed to extend job lock", { error });
    }
  }, jobLockExtendInterval);

  // Moving to completed is best-effort (a failure must not fail the job).
  // Failures are logged so they are no longer invisible.
  const completeJob = async (returnValue: unknown) => {
    try {
      await job.moveToCompleted(returnValue, token, false);
    } catch (error) {
      logger.debug("Failed to move job to completed", { error });
    }
  };

  let err: unknown = null;
  try {
    // Inside the try so that a failure here still fails the job and the
    // lock-extension timer is always cleared.
    await addJobPriority(job.data.team_id, job.id);

    if (job.data?.mode === "kickoff") {
      const result = await processKickoffJob(job, token);
      if (result.success) {
        await completeJob(null);
      } else {
        logger.debug("Job failed", { result, mode: job.data.mode });
        await job.moveToFailed((result as any).error, token, false);
      }
    } else {
      const result = await processJob(job, token);
      if (result.success) {
        if (
          job.data.crawl_id &&
          process.env.USE_DB_AUTHENTICATION === "true"
        ) {
          logger.debug(
            "Job succeeded -- has crawl associated, putting null in Redis",
          );
          await completeJob(null);
        } else {
          logger.debug("Job succeeded -- putting result in Redis");
          await completeJob(result.document);
        }
      } else {
        logger.debug("Job failed", { result });
        await job.moveToFailed((result as any).error, token, false);
      }
    }
  } catch (error) {
    logger.debug("Job failed", { error });
    Sentry.captureException(error);
    err = error;
    await job.moveToFailed(error, token, false);
  } finally {
    // Stop extending the lock first, so a failure below cannot leak the timer.
    clearInterval(extendLockInterval);
    await deleteJobPriority(job.data.team_id, job.id);
  }
  return err;
};
2025-01-10 18:35:10 -03:00
/**
 * Runs one extract-queue job through `performExtraction` while holding its
 * BullMQ lock.
 *
 * - A successful or unsuccessful extraction result moves the job to
 *   completed. An unsuccessful result also marks the extract as "failed" in
 *   the extract store.
 * - A thrown error moves the job to failed and marks the extract as "failed".
 *
 * @param token - BullMQ lock token the job was fetched with.
 * @param job - The extract job (`data.extractId`, `data.request`, ...).
 * @returns The extraction result, or `{ success: false, error }` when an
 *   exception was thrown.
 */
const processExtractJobInternal = async (
  token: string,
  job: Job & { id: string },
) => {
  const logger = _logger.child({
    module: "extract-worker",
    method: "processExtractJobInternal",
    jobId: job.id,
    extractId: job.data.extractId,
    teamId: job.data?.teamId ?? undefined,
  });
  const unknownErrorMessage =
    "Unknown error, please contact help@firecrawl.com. Extract id: " +
    job.data.extractId;

  // Keep the lock alive while extracting. A failed extension is logged
  // instead of escaping from the timer as an unhandled promise rejection.
  const extendLockInterval = setInterval(async () => {
    logger.info(`🔄 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (error) {
      logger.warn("Failed to extend job lock", { error });
    }
  }, jobLockExtendInterval);

  try {
    const result = await performExtraction(job.data.extractId, {
      request: job.data.request,
      teamId: job.data.teamId,
      plan: job.data.plan,
      subId: job.data.subId,
    });

    // Both outcomes complete the BullMQ job. An unsuccessful extraction is
    // reported through the extract's own status instead.
    await job.moveToCompleted(result, token, false);
    if (!result.success) {
      await updateExtract(job.data.extractId, {
        status: "failed",
        error: result.error ?? unknownErrorMessage,
      });
    }
    return result;
  } catch (error) {
    logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });

    Sentry.captureException(error, {
      data: {
        job: job.id,
      },
    });

    // Prefer an explicit `.error` field, then an Error's message. Storing a
    // raw Error object is avoided: if the extract store JSON-serializes it
    // (presumably), an Error would become `{}`.
    const errorMessage =
      (error as { error?: unknown } | null)?.error ??
      (error instanceof Error ? error.message : error) ??
      unknownErrorMessage;

    // Move job to failed state in Redis
    await job.moveToFailed(error, token, false);
    await updateExtract(job.data.extractId, {
      status: "failed",
      error: errorMessage,
    });
    return { success: false, error: errorMessage };
  } finally {
    clearInterval(extendLockInterval);
  }
};
2024-07-30 13:27:23 -04:00
/**
 * Set to true once SIGINT or SIGTERM is received; `workerFun` checks it and
 * stops pulling new jobs.
 */
let isShuttingDown = false;

for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, () => {
    // Report the signal that was actually received (SIGINT used to be
    // logged as "SIGTERM").
    console.log(`Received ${signal}. Shutting down gracefully...`);
    isShuttingDown = true;
  });
}

/** Consecutive worker-loop iterations in which the system monitor refused new work. */
let cantAcceptConnectionCount = 0;
2024-09-01 14:19:43 -03:00
/**
 * Polling loop that pulls jobs from `queue` and runs them with
 * `processJobInternal`.
 *
 * Jobs are started without being awaited, so several can run concurrently.
 * The loop:
 * - backs off while the system monitor refuses new work;
 * - sleeps between polls;
 * - exits once `isShuttingDown` is set.
 *
 * When a job finishes, its concurrency-limit slot is released and the team's
 * next concurrency-limited job (if any) is added to `queue`.
 *
 * @param queue - Queue to consume (its name is used for the BullMQ worker).
 * @param processJobInternal - Processes one job with the given lock token.
 *   It resolves to `null` on success; any other value marks the Sentry span
 *   as errored.
 */
const workerFun = async (
  queue: Queue,
  processJobInternal: (token: string, job: Job) => Promise<any>,
) => {
  const logger = _logger.child({ module: "queue-worker", method: "workerFun" });
  const worker = new Worker(queue.name, null, {
    connection: redisConnection,
    lockDuration: workerLockDuration, // WORKER_LOCK_DURATION, default 1 minute
    // lockRenewTime: 15 * 1000, // 15 seconds
    stalledInterval: workerStalledCheckInterval, // WORKER_STALLED_CHECK_INTERVAL, default 30 seconds
    maxStalledCount: 10, // 10 times
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  while (true) {
    if (isShuttingDown) {
      console.log("No longer accepting new jobs. SIGINT");
      break;
    }
    const token = uuidv4();
    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      console.log("Cant accept connection");
      cantAcceptConnectionCount++;

      if (cantAcceptConnectionCount >= 25) {
        logger.error("WORKER STALLED", {
          cpuUsage: await monitor.checkCpuUsage(),
          memoryUsage: await monitor.checkMemoryUsage(),
        });
      }

      await sleep(cantAcceptConnectionInterval); // more sleep
      continue;
    }
    cantAcceptConnectionCount = 0;

    const job = await worker.getNextJob(token);
    if (!job) {
      await sleep(connectionMonitorInterval);
      continue;
    }

    if (job.id) {
      runningJobs.add(job.id);
    }

    // Releases the finished job's concurrency slot. If the team has a
    // concurrency-limited job waiting, it moves that job onto this queue.
    const afterJobDone = async (finishedJob: Job<any, any, string>) => {
      if (finishedJob.id) {
        runningJobs.delete(finishedJob.id);
      }

      if (
        finishedJob.id &&
        finishedJob.data &&
        finishedJob.data.team_id &&
        finishedJob.data.plan
      ) {
        const teamId = finishedJob.data.team_id;
        await removeConcurrencyLimitActiveJob(teamId, finishedJob.id);
        // Awaited so that cleanup failures surface in runJob's error handling.
        await cleanOldConcurrencyLimitEntries(teamId);

        // Queue up next job, if it exists
        // No need to check if we're under the limit here -- if the current job is finished,
        // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
        const nextJob = await takeConcurrencyLimitedJob(teamId);
        if (nextJob !== null) {
          await pushConcurrencyLimitActiveJob(teamId, nextJob.id);

          await queue.add(
            nextJob.id,
            {
              ...nextJob.data,
              concurrencyLimitHit: true,
            },
            {
              ...nextJob.opts,
              jobId: nextJob.id,
              priority: nextJob.priority,
            },
          );
        }
      }
    };

    // Runs the job and always performs post-job bookkeeping. It resolves to
    // processJobInternal's result, or to the thrown error. It never rejects:
    // this loop does not await the job, so a rejection would otherwise be
    // unhandled.
    const runJob = async (): Promise<unknown> => {
      try {
        return await processJobInternal(token, job);
      } catch (error) {
        logger.error("Unhandled error while processing job", {
          error,
          jobId: job.id,
        });
        Sentry.captureException(error);
        return error;
      } finally {
        try {
          await afterJobDone(job);
        } catch (error) {
          logger.error("Failed to run post-job cleanup", {
            error,
            jobId: job.id,
          });
        }
      }
    };

    if (job.data && job.data.sentry && Sentry.isInitialized()) {
      Sentry.continueTrace(
        {
          sentryTrace: job.data.sentry.trace,
          baggage: job.data.sentry.baggage,
        },
        () => {
          Sentry.startSpan(
            {
              name: "Scrape job",
              attributes: {
                job: job.id,
                worker: process.env.FLY_MACHINE_ID ?? worker.id,
              },
            },
            async (span) => {
              await Sentry.startSpan(
                {
                  name: "Process scrape job",
                  op: "queue.process",
                  attributes: {
                    "messaging.message.id": job.id,
                    // The queue actually being consumed (previously always
                    // reported the scrape queue, even for other queues).
                    "messaging.destination.name": queue.name,
                    "messaging.message.body.size": job.data.sentry.size,
                    "messaging.message.receive.latency":
                      Date.now() - (job.processedOn ?? job.timestamp),
                    "messaging.message.retry.count": job.attemptsMade,
                  },
                },
                async () => {
                  const res = await runJob();

                  if (res !== null) {
                    span.setStatus({ code: 2 }); // ERROR
                  } else {
                    span.setStatus({ code: 1 }); // OK
                  }
                },
              );
            },
          );
        },
      );
    } else {
      Sentry.startSpan(
        {
          name: "Scrape job",
          attributes: {
            job: job.id,
            worker: process.env.FLY_MACHINE_ID ?? worker.id,
          },
        },
        () => {
          // Deliberately not returned or awaited: the loop keeps polling.
          void runJob();
        },
      );
    }

    await sleep(gotJobInterval);
  }
};
2024-12-27 19:59:26 +01:00
/**
 * Handles a crawl "kickoff" job. It does three things:
 * 1. Locks the crawl's start URL and queues it as a scrape job
 *    (`isCrawlSourceScrape`, priority 15).
 * 2. Sends the `crawl.started` webhook when a webhook is configured.
 * 3. Unless `crawlerOptions.ignoreSitemap` is set, queues every URL from the
 *    sitemap chunks that this crawl could lock.
 *
 * @param job - The kickoff job (`data.crawl_id`, `data.url`, ...).
 * @param token - BullMQ lock token (unused here; kept for interface parity).
 * @returns `{ success: true }`, or `{ success: false, error }` if any step
 *   threw.
 */
async function processKickoffJob(job: Job & { id: string }, token: string) {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processKickoffJob",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
    teamId: job.data?.team_id ?? undefined,
  });

  try {
    const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
    const crawler = crawlToCrawler(job.data.crawl_id, sc);

    logger.debug("Locking URL...");
    await lockURL(job.data.crawl_id, sc, job.data.url);
    const jobId = uuidv4();
    // addScrapeJob enqueues into BullMQ; addCrawlJob records the ID in Redis
    // (these two debug labels used to be swapped).
    logger.debug("Adding scrape job to BullMQ...", { jobId });
    await addScrapeJob(
      {
        url: job.data.url,
        mode: "single_urls",
        team_id: job.data.team_id,
        crawlerOptions: job.data.crawlerOptions,
        scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions),
        internalOptions: sc.internalOptions,
        plan: job.data.plan!,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
        webhook: job.data.webhook,
        v1: job.data.v1,
        isCrawlSourceScrape: true,
      },
      {
        priority: 15,
      },
      jobId,
    );
    logger.debug("Adding scrape job to Redis...", { jobId });
    await addCrawlJob(job.data.crawl_id, jobId);

    if (job.data.webhook) {
      logger.debug("Calling webhook with crawl.started...", {
        webhook: job.data.webhook,
      });
      await callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        null,
        job.data.webhook,
        true,
        "crawl.started",
      );
    }

    const sitemap = sc.crawlerOptions.ignoreSitemap
      ? 0
      : await crawler.tryGetSitemap(async (urls) => {
          if (urls.length === 0) return;

          logger.debug("Using sitemap chunk of length " + urls.length, {
            sitemapLength: urls.length,
          });

          // NOTE(review): this priority is computed and logged, but the jobs
          // below are queued with a fixed priority of 20. Confirm which one
          // is intended.
          const jobPriority = await getJobPriority({
            plan: job.data.plan,
            team_id: job.data.team_id,
            basePriority: 21,
          });
          logger.debug("Using job priority " + jobPriority, { jobPriority });

          const jobs = urls.map((url) => {
            const uuid = uuidv4();
            return {
              name: uuid,
              data: {
                url,
                mode: "single_urls" as const,
                team_id: job.data.team_id,
                plan: job.data.plan!,
                crawlerOptions: job.data.crawlerOptions,
                scrapeOptions: job.data.scrapeOptions,
                internalOptions: sc.internalOptions,
                origin: job.data.origin,
                crawl_id: job.data.crawl_id,
                sitemapped: true,
                webhook: job.data.webhook,
                v1: job.data.v1,
              },
              opts: {
                jobId: uuid,
                priority: 20,
              },
            };
          });

          logger.debug("Locking URLs...");
          const lockedIds = await lockURLsIndividually(
            job.data.crawl_id,
            sc,
            jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
          );
          // Set lookup keeps this linear for large sitemap chunks.
          const lockedIdSet = new Set(lockedIds.map((y) => y.id));
          const lockedJobs = jobs.filter((x) => lockedIdSet.has(x.opts.jobId));

          logger.debug("Adding scrape jobs to Redis...");
          await addCrawlJobs(
            job.data.crawl_id,
            lockedJobs.map((x) => x.opts.jobId),
          );
          logger.debug("Adding scrape jobs to BullMQ...");
          await addScrapeJobs(lockedJobs);
        });

    if (sitemap === 0) {
      logger.debug("Sitemap not found or ignored.", {
        ignoreSitemap: sc.crawlerOptions.ignoreSitemap,
      });
    }

    logger.debug("Done queueing jobs!");

    return { success: true };
  } catch (error) {
    logger.error("An error occurred!", { error });
    return { success: false, error };
  }
}
2024-12-30 21:43:59 -03:00
/**
 * Hook for adding a scraped document to the background index.
 *
 * It currently does nothing and always resolves to `undefined`. For documents
 * with markdown that belong to the BACKGROUND_INDEX_TEAM_ID team, the
 * `indexPage` call it would make is disabled (kept below for reference).
 */
async function indexJob(job: Job & { id: string }, document: Document) {
  const isBackgroundIndexDocument =
    Boolean(document?.markdown) &&
    job.data.team_id === process.env.BACKGROUND_INDEX_TEAM_ID!;
  if (!isBackgroundIndexDocument) {
    return;
  }
  // Disabled:
  // indexPage({
  //   document: document,
  //   originUrl: job.data.crawl_id
  //     ? (await getCrawl(job.data.crawl_id))?.originUrl!
  //     : document.metadata.sourceURL!,
  //   crawlId: job.data.crawl_id,
  //   teamId: job.data.team_id,
  // }).catch((error) => {
  //   _logger.error("Error indexing page", { error });
  // });
}
2024-11-07 20:57:33 +01:00
async function processJob(job: Job & { id: string }, token: string) {
2024-12-11 19:46:11 -03:00
const logger = _logger.child({
module: "queue-worker",
method: "processJob",
jobId: job.id,
scrapeId: job.id,
2024-12-11 19:51:08 -03:00
crawlId: job.data?.crawl_id ?? undefined,
teamId: job.data?.team_id ?? undefined,
2024-12-11 19:46:11 -03:00
});
2024-12-09 23:40:44 +01:00
logger.info(`🐂 Worker taking job ${job.id}`, { url: job.data.url });
2024-07-24 18:44:14 +02:00
2024-08-16 22:17:38 -04:00
// Check if the job URL is researchhub and block it immediately
// TODO: remove this once solve the root issue
2024-12-15 02:52:06 -03:00
// if (
// job.data.url &&
// (job.data.url.includes("researchhub.com") ||
// job.data.url.includes("ebay.com"))
// ) {
// logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
// const data = {
// success: false,
// document: null,
// project_id: job.data.project_id,
// error:
// "URL is blocked. Suspecious activity detected. Please contact help@firecrawl.com if you believe this is an error.",
// };
// return data;
// }
2024-08-16 22:17:38 -04:00
try {
2024-07-30 13:27:23 -04:00
job.updateProgress({
current: 1,
total: 100,
current_step: "SCRAPING",
2024-12-11 19:51:08 -03:00
current_url: "",
});
const start = Date.now();
2024-09-01 14:19:43 -03:00
2025-01-15 19:02:20 +01:00
if (job.data.crawl_id) {
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
if (sc && sc.cancelled) {
throw new Error("Parent crawl/batch scrape was cancelled");
}
}
2024-11-12 12:42:39 +01:00
const pipeline = await Promise.race([
startWebScraperPipeline({
job,
2024-12-11 19:51:08 -03:00
token,
2024-11-12 12:42:39 +01:00
}),
2024-12-11 19:46:11 -03:00
...(job.data.scrapeOptions.timeout !== undefined
? [
(async () => {
await sleep(job.data.scrapeOptions.timeout);
throw new Error("timeout");
2024-12-11 19:51:08 -03:00
})(),
2024-12-11 19:46:11 -03:00
]
2024-12-11 19:51:08 -03:00
: []),
2024-11-12 12:42:39 +01:00
]);
2024-11-07 20:57:33 +01:00
if (!pipeline.success) {
throw pipeline.error;
}
2024-11-07 20:57:33 +01:00
const end = Date.now();
const timeTakenInSeconds = (end - start) / 1000;
2024-04-20 19:37:45 -07:00
2024-11-07 20:57:33 +01:00
const doc = pipeline.document;
const rawHtml = doc.rawHtml ?? "";
2024-08-16 23:29:30 +02:00
if (!job.data.scrapeOptions.formats.includes("rawHtml")) {
delete doc.rawHtml;
}
const data = {
2024-11-07 20:57:33 +01:00
success: true,
result: {
2024-12-11 19:46:11 -03:00
links: [
{
content: doc,
2024-12-11 19:51:08 -03:00
source: doc?.metadata?.sourceURL ?? doc?.metadata?.url ?? "",
2025-01-09 19:14:00 +01:00
id: job.id,
2024-12-11 19:51:08 -03:00
},
],
},
project_id: job.data.project_id,
2024-12-11 19:51:08 -03:00
document: doc,
};
2024-04-15 17:01:47 -04:00
2024-09-01 13:44:36 -03:00
if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
2024-12-11 19:46:11 -03:00
logger.debug("Calling webhook with success...", {
2024-12-11 19:51:08 -03:00
webhook: job.data.webhook,
2024-12-11 19:46:11 -03:00
});
2024-09-01 15:06:36 -03:00
await callWebhook(
2024-09-01 14:19:43 -03:00
job.data.team_id,
job.data.crawl_id,
data,
job.data.webhook,
2024-09-01 15:06:36 -03:00
job.data.v1,
job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
2024-12-11 19:51:08 -03:00
true,
2024-09-01 14:19:43 -03:00
);
2024-07-25 00:14:25 +02:00
}
2024-04-20 19:37:45 -07:00
2024-08-13 20:51:43 +02:00
if (job.data.crawl_id) {
2024-11-07 20:57:33 +01:00
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
2024-12-11 19:46:11 -03:00
if (
doc.metadata.url !== undefined &&
doc.metadata.sourceURL !== undefined &&
normalizeURL(doc.metadata.url, sc) !==
normalizeURL(doc.metadata.sourceURL, sc) &&
job.data.crawlerOptions !== null // only on crawls, don't care on batch scrape
2024-12-11 19:46:11 -03:00
) {
const crawler = crawlToCrawler(job.data.crawl_id, sc);
2024-12-30 21:43:59 -03:00
if (
2025-01-10 18:35:10 -03:00
crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) ===
null &&
2025-01-07 19:15:23 +01:00
!job.data.isCrawlSourceScrape
2024-12-30 21:43:59 -03:00
) {
2025-01-07 19:15:23 +01:00
throw new Error(
"Redirected target URL is not allowed by crawlOptions",
); // TODO: make this its own error type that is ignored by error tracking
}
if (job.data.isCrawlSourceScrape) {
// TODO: re-fetch sitemap for redirect target domain
sc.originUrl = doc.metadata.url;
await saveCrawl(job.data.crawl_id, sc);
}
if (isUrlBlocked(doc.metadata.url)) {
throw new Error(BLOCKLISTED_URL_MESSAGE); // TODO: make this its own error type that is ignored by error tracking
}
2024-12-30 21:43:59 -03:00
const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
2024-12-11 19:46:11 -03:00
const p2 = generateURLPermutations(
2024-12-11 19:51:08 -03:00
normalizeURL(doc.metadata.sourceURL, sc),
2024-12-11 19:46:11 -03:00
);
2024-12-15 20:16:29 +01:00
if (JSON.stringify(p1) !== JSON.stringify(p2)) {
logger.debug(
"Was redirected, removing old URL and locking new URL...",
{ oldUrl: doc.metadata.sourceURL, newUrl: doc.metadata.url },
);
// Prevent redirect target from being visited in the crawl again
// See lockURL
const x = await redisConnection.sadd(
"crawl:" + job.data.crawl_id + ":visited",
2024-12-17 16:58:57 -03:00
...p1.map((x) => x.href),
2024-12-15 20:16:29 +01:00
);
const lockRes = x === p1.length;
2024-12-17 16:58:57 -03:00
2024-12-15 20:16:29 +01:00
if (job.data.crawlerOptions !== null && !lockRes) {
throw new RacedRedirectError();
}
}
2024-11-08 16:22:06 +01:00
}
2024-11-07 20:57:33 +01:00
logger.debug("Logging job to DB...");
2024-12-11 19:46:11 -03:00
await logJob(
{
job_id: job.id as string,
success: true,
num_docs: 1,
docs: [doc],
time_taken: timeTakenInSeconds,
team_id: job.data.team_id,
mode: job.data.mode,
url: job.data.url,
crawlerOptions: sc.crawlerOptions,
scrapeOptions: job.data.scrapeOptions,
origin: job.data.origin,
2024-12-11 19:51:08 -03:00
crawl_id: job.data.crawl_id,
2024-12-11 19:46:11 -03:00
},
2024-12-11 19:51:08 -03:00
true,
2024-12-11 19:46:11 -03:00
);
2024-08-15 19:27:15 +02:00
2024-12-30 21:43:59 -03:00
indexJob(job, doc);
logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id, true);
2024-08-13 22:10:17 +02:00
2024-11-20 19:47:58 +01:00
if (job.data.crawlerOptions !== null) {
2024-08-13 20:51:43 +02:00
if (!sc.cancelled) {
2024-12-11 19:46:11 -03:00
const crawler = crawlToCrawler(
job.data.crawl_id,
sc,
2024-12-11 19:51:08 -03:00
doc.metadata.url ?? doc.metadata.sourceURL ?? sc.originUrl!,
2024-12-11 19:46:11 -03:00
);
2024-08-13 20:51:43 +02:00
2024-08-16 23:29:30 +02:00
const links = crawler.filterLinks(
2024-12-11 19:46:11 -03:00
crawler.extractLinksFromHTML(
rawHtml ?? "",
2024-12-11 19:51:08 -03:00
doc.metadata?.url ?? doc.metadata?.sourceURL ?? sc.originUrl!,
2024-12-11 19:46:11 -03:00
),
2024-08-13 20:51:43 +02:00
Infinity,
2024-12-11 19:51:08 -03:00
sc.crawlerOptions?.maxDepth ?? 10,
2024-09-01 14:19:43 -03:00
);
2024-12-11 19:46:11 -03:00
logger.debug("Discovered " + links.length + " links...", {
2024-12-11 19:51:08 -03:00
linksLength: links.length,
2024-12-11 19:46:11 -03:00
});
2024-09-01 14:19:43 -03:00
2024-08-13 20:51:43 +02:00
for (const link of links) {
if (await lockURL(job.data.crawl_id, sc, link)) {
2024-08-21 22:53:33 -03:00
// This seems to work really welel
2024-09-01 14:19:43 -03:00
const jobPriority = await getJobPriority({
plan: sc.plan as PlanType,
team_id: sc.team_id,
2024-12-11 19:51:08 -03:00
basePriority: job.data.crawl_id ? 20 : 10,
2024-09-01 14:19:43 -03:00
});
2024-08-21 22:20:40 -03:00
const jobId = uuidv4();
2024-12-11 19:46:11 -03:00
logger.debug(
"Determined job priority " +
jobPriority +
" for URL " +
JSON.stringify(link),
2024-12-11 19:51:08 -03:00
{ jobPriority, url: link },
2024-12-11 19:46:11 -03:00
);
2024-08-21 22:53:33 -03:00
// console.log("plan: ", sc.plan);
// console.log("team_id: ", sc.team_id)
// console.log("base priority: ", job.data.crawl_id ? 20 : 10)
// console.log("job priority: " , jobPriority, "\n\n\n")
2024-08-21 22:20:40 -03:00
2024-10-25 20:21:12 +02:00
await addScrapeJob(
2024-09-01 14:19:43 -03:00
{
url: link,
mode: "single_urls",
team_id: sc.team_id,
2024-11-07 20:57:33 +01:00
scrapeOptions: scrapeOptions.parse(sc.scrapeOptions),
internalOptions: sc.internalOptions,
plan: job.data.plan,
2024-09-01 14:19:43 -03:00
origin: job.data.origin,
crawl_id: job.data.crawl_id,
2024-10-03 16:37:58 -03:00
webhook: job.data.webhook,
2024-12-11 19:51:08 -03:00
v1: job.data.v1,
2024-09-01 14:19:43 -03:00
},
{},
jobId,
2024-12-11 19:51:08 -03:00
jobPriority,
2024-09-01 14:19:43 -03:00
);
2024-08-13 20:51:43 +02:00
2024-10-25 20:21:12 +02:00
await addCrawlJob(job.data.crawl_id, jobId);
2024-12-11 19:46:11 -03:00
logger.debug("Added job for URL " + JSON.stringify(link), {
jobPriority,
url: link,
2024-12-11 19:51:08 -03:00
newJobId: jobId,
2024-12-11 19:46:11 -03:00
});
} else {
2025-01-03 22:15:23 -03:00
// TODO: removed this, ok? too many 'not useful' logs (?) Mogery!
// logger.debug("Could not lock URL " + JSON.stringify(link), {
// url: link,
// });
2024-08-13 20:51:43 +02:00
}
}
}
}
2024-08-13 22:10:17 +02:00
await finishCrawlIfNeeded(job, sc);
2024-12-30 21:43:59 -03:00
} else {
indexJob(job, doc);
2024-08-13 20:51:43 +02:00
}
if (job.data.is_scrape !== true) {
let creditsToBeBilled = 1; // Assuming 1 credit per document
if (job.data.scrapeOptions.extract) {
creditsToBeBilled = 5;
}
if (job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID!) {
billTeam(job.data.team_id, undefined, creditsToBeBilled, logger).catch((error) => {
logger.error(
`Failed to bill team ${job.data.team_id} for ${creditsToBeBilled} credits`,
{ error },
);
// Optionally, you could notify an admin or add to a retry queue here
});
}
}
2024-11-07 20:57:33 +01:00
logger.info(`🐂 Job done ${job.id}`);
2024-07-30 13:27:23 -04:00
return data;
} catch (error) {
2024-12-11 19:46:11 -03:00
const isEarlyTimeout =
error instanceof Error && error.message === "timeout";
2025-01-15 19:02:20 +01:00
const isCancelled =
error instanceof Error && error.message === "Parent crawl/batch scrape was cancelled";
2024-04-15 17:01:47 -04:00
if (isEarlyTimeout) {
logger.error(`🐂 Job timed out ${job.id}`);
} else if (error instanceof RacedRedirectError) {
logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`);
2025-01-15 19:02:20 +01:00
} else if (isCancelled) {
logger.warn(`🐂 Job got cancelled, silently failing`);
} else {
logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });
2024-11-12 12:42:39 +01:00
Sentry.captureException(error, {
data: {
2024-12-11 19:51:08 -03:00
job: job.id,
},
2024-11-12 12:42:39 +01:00
});
2024-11-12 18:10:11 +01:00
if (error instanceof CustomError) {
// Here we handle the error, then save the failed job
logger.error(error.message); // or any other error handling
}
logger.error(error);
if (error.stack) {
logger.error(error.stack);
}
2024-11-12 12:42:39 +01:00
}
2024-08-21 17:58:27 +02:00
const data = {
success: false,
2024-11-07 20:57:33 +01:00
document: null,
project_id: job.data.project_id,
2024-12-11 19:46:11 -03:00
error:
error instanceof Error
? error
: typeof error === "string"
? new Error(error)
2024-12-11 19:51:08 -03:00
: new Error(JSON.stringify(error)),
};
2024-08-30 16:38:55 -03:00
if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
2024-09-01 14:19:43 -03:00
callWebhook(
job.data.team_id,
job.data.crawl_id ?? (job.id as string),
data,
job.data.webhook,
job.data.v1,
2024-12-11 19:51:08 -03:00
job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
2024-09-01 14:19:43 -03:00
);
2024-09-01 13:44:36 -03:00
}
2025-01-09 18:48:47 +01:00
logger.debug("Logging job to DB...");
await logJob(
{
job_id: job.id as string,
success: false,
message:
typeof error === "string"
? error
: (error.message ??
"Something went wrong... Contact help@mendable.ai"),
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: job.data.mode,
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
scrapeOptions: job.data.scrapeOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
},
true,
);
2024-09-01 14:19:43 -03:00
2024-08-15 18:55:18 +02:00
if (job.data.crawl_id) {
2024-11-07 20:57:33 +01:00
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
2024-12-11 19:46:11 -03:00
logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id, false);
2024-12-17 16:58:57 -03:00
await redisConnection.srem(
"crawl:" + job.data.crawl_id + ":visited_unique",
normalizeURL(job.data.url, sc),
);
2024-11-07 20:57:33 +01:00
await finishCrawlIfNeeded(job, sc);
2024-08-15 19:27:15 +02:00
// await logJob({
// job_id: job.data.crawl_id,
// success: false,
// message:
// typeof error === "string"
// ? error
// : error.message ??
// "Something went wrong... Contact help@mendable.ai",
// num_docs: 0,
// docs: [],
// time_taken: 0,
// team_id: job.data.team_id,
// mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
// url: sc ? sc.originUrl ?? job.data.url : job.data.url,
// crawlerOptions: sc ? sc.crawlerOptions : undefined,
// scrapeOptions: sc ? sc.scrapeOptions : job.data.scrapeOptions,
// origin: job.data.origin,
// });
2024-08-15 18:55:18 +02:00
}
2024-07-30 13:27:23 -04:00
// done(null, data);
return data;
2024-04-15 17:01:47 -04:00
}
}
2024-07-30 13:27:23 -04:00
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
2025-01-09 16:04:59 +01:00
// Start both workers (scrape + extract), wait for them to exit, then wait for
// in-flight jobs to drain before terminating the process.
//
// The bootstrap promise is handled explicitly. If either worker loop rejects,
// the failure is logged and the process exits with a non-zero code.
// Otherwise the rejection would be unhandled. Because this file loads
// "./sentry" / @sentry/node, an `unhandledRejection` listener may be installed
// (TODO confirm against ./sentry), and then Node's default crash-on-unhandled-
// rejection is not guaranteed. The process could keep running with no worker
// consuming the queues.
(async () => {
  await Promise.all([
    workerFun(getScrapeQueue(), processJobInternal),
    workerFun(getExtractQueue(), processExtractJobInternal),
  ]);

  console.log("All workers exited. Waiting for all jobs to finish...");
  // Poll every 500 ms until `runningJobs` is empty.
  while (runningJobs.size > 0) {
    await new Promise((resolve) => setTimeout(resolve, 500));
  }

  console.log("All jobs finished. Worker out!");
  process.exit(0);
})().catch((error: unknown) => {
  // Fail loudly so the supervisor (container runtime, process manager, ...)
  // can see the crash and restart the worker instead of leaving it idle.
  console.error("Queue worker bootstrap failed:", error);
  process.exit(1);
});