Files
firecrawl/apps/api/src/services/queue-worker.ts
T

380 lines
12 KiB
TypeScript
Raw Normal View History

2024-04-15 17:01:47 -04:00
import { CustomError } from "../lib/custom-error";
2024-07-30 14:44:13 -04:00
import {
getScrapeQueue,
redisConnection,
scrapeQueueName,
} from "./queue-service";
2024-04-15 17:01:47 -04:00
import "dotenv/config";
import { logtail } from "./logtail";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
2024-04-20 13:53:11 -07:00
import { logJob } from "./logging/log_job";
2024-07-30 14:44:13 -04:00
import { initSDK } from "@hyperdx/node-opentelemetry";
2024-08-14 17:53:47 +02:00
import { Job } from "bullmq";
2024-07-23 17:30:46 -03:00
import { Logger } from "../lib/logger";
2024-07-30 13:27:23 -04:00
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
2024-08-16 19:16:08 +02:00
import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis";
2024-08-13 20:51:43 +02:00
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
2024-08-14 17:53:47 +02:00
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
2024-05-20 13:36:34 -07:00
2024-07-30 14:44:13 -04:00
// HyperDX OpenTelemetry instrumentation (including console capture) is only
// switched on when the ENV variable says we are running in production.
const isProductionEnv = process.env.ENV === "production";
if (isProductionEnv) {
  initSDK({ consoleCapture: true, additionalInstrumentations: [] });
}
2024-07-30 13:27:23 -04:00
/** Returns a promise that resolves after `ms` milliseconds. */
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Worker tuning knobs, all in milliseconds and overridable through environment
// variables. `Number(x) || default` falls back to the default when the variable
// is unset, non-numeric, or 0.

// Intended for the BullMQ Worker's lockDuration / stalledInterval options.
const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval =
  Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;

// How often a running job's lock is extended, and by how much each time.
const jobLockExtendInterval =
  Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
const jobLockExtensionTime =
  Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;

// Polling-loop delays: after the system monitor refuses new work, after a poll
// that returned no job, and after a poll that returned a job.
const cantAcceptConnectionInterval =
  Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
const connectionMonitorInterval =
  Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
// This used to read CONNECTION_MONITOR_INTERVAL (copy-paste slip), which made it
// impossible to tune the two delays independently. It has its own variable now.
const gotJobInterval = Number(process.env.GOT_JOB_INTERVAL) || 20;
2024-07-11 20:08:21 +02:00
2024-07-30 13:27:23 -04:00
/**
 * Runs one job that the polling loop has already locked with `token`. The lock
 * is kept alive while the job runs, and afterwards the job is moved to completed
 * (or to failed if processing throws).
 *
 * Every failure path is logged instead of rejecting or leaving a rejected
 * promise behind. The caller starts this without awaiting it, so an unhandled
 * rejection here would take the whole worker process down.
 *
 * @param token - lock token that was passed to `worker.getNextJob`.
 * @param job - the locked BullMQ job.
 */
const processJobInternal = async (token: string, job: Job) => {
  // Extend the lock periodically so a long-running scrape is not treated as
  // stalled. A failed extension (e.g. the lock was already lost) is logged; an
  // unhandled rejection from this timer callback would crash the process.
  const extendLockInterval = setInterval(async () => {
    Logger.info(`🐂 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (e) {
      Logger.error(`🐂 Failed to extend lock on job ${job.id} - ${e}`);
    }
  }, jobLockExtendInterval);

  try {
    const result = await processJob(job, token);
    try {
      // With DB authentication, crawl pages are not kept as the BullMQ return
      // value (null is stored instead). Presumably the docs are persisted by
      // logJob: the crawl finalizer reads them back via supabaseGetJobById.
      const returnValue =
        job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true"
          ? null
          : result.docs;
      await job.moveToCompleted(returnValue, token, false);
    } catch (e) {
      // Keep the worker alive, but do not swallow the failure silently.
      Logger.error(`🐂 Failed to move job ${job.id} to completed - ${e}`);
    }
  } catch (error) {
    Logger.error(`🐂 Job failed ${job.id} - ${error}`);
    try {
      await job.moveToFailed(error, token, false);
    } catch (e) {
      Logger.error(`🐂 Failed to move job ${job.id} to failed - ${e}`);
    }
  } finally {
    clearInterval(extendLockInterval);
  }
};
/**
 * Flipped to true once SIGINT arrives. The polling loop checks it and stops
 * fetching new jobs.
 */
let isShuttingDown = false;

const handleSigint = () => {
  console.log("Received SIGINT. Shutting down gracefully...");
  isShuttingDown = true;
};

process.on("SIGINT", handleSigint);
2024-07-30 14:44:13 -04:00
/**
 * Polls `queueName` for jobs until SIGINT sets `isShuttingDown`. Each fetched
 * job is handed to `processJobInternal` without waiting for it, so several jobs
 * can be in flight at once.
 *
 * The BullMQ Worker is created without a processor function and driven
 * manually through `getNextJob`. New jobs are only fetched while the system
 * monitor reports spare capacity.
 *
 * @param queueName - name of the BullMQ queue to consume.
 * @param processJobInternal - handler invoked for each fetched job with its lock token.
 */
const workerFun = async (
  queueName: string,
  processJobInternal: (token: string, job: Job) => Promise<void>,
) => {
  const worker = new Worker(queueName, null, {
    connection: redisConnection,
    // Configurable via WORKER_LOCK_DURATION (default 1 minute).
    lockDuration: workerLockDuration,
    // lockRenewTime: 15 * 1000, // 15 seconds
    // Configurable via WORKER_STALLED_CHECK_INTERVAL (default 30 seconds).
    stalledInterval: workerStalledCheckInterval,
    maxStalledCount: 10, // 10 times
  });
  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  while (true) {
    if (isShuttingDown) {
      Logger.info("No longer accepting new jobs. SIGINT");
      break;
    }

    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      Logger.info("Cant accept connection");
      await sleep(cantAcceptConnectionInterval); // back off longer while overloaded
      continue;
    }

    const token = uuidv4();
    const job = await worker.getNextJob(token);
    if (job) {
      // Deliberately not awaited so jobs run concurrently. The catch handler
      // stops a rejection from turning into an unhandled promise rejection,
      // which would crash the process.
      processJobInternal(token, job).catch((e) =>
        Logger.error(`🐂 Unhandled error while processing job ${job.id} - ${e}`),
      );
      await sleep(gotJobInterval);
    } else {
      await sleep(connectionMonitorInterval);
    }
  }
};
2024-07-30 14:44:13 -04:00
// Start the polling loop on the scrape queue. The promise is intentionally not
// awaited at module level.
void workerFun(scrapeQueueName, processJobInternal);
2024-07-30 13:27:23 -04:00
/**
 * Runs one scrape job and returns the payload that becomes its result.
 *
 * Flow:
 *  1. A job whose URL matches the temporary blocklist gets an immediate
 *     failure payload.
 *  2. Otherwise the web scraper pipeline runs. For `mode === "crawl"` jobs the
 *     page result is sent to the job's webhook.
 *  3. For jobs that belong to a crawl (`job.data.crawl_id`):
 *     - the page is logged and marked done;
 *     - links found on it are enqueued, unless the job came from a sitemap or
 *       the crawl was cancelled;
 *     - when `finishCrawl` returns truthy (presumably: no outstanding jobs),
 *       the whole crawl is logged and the crawl webhook is called.
 *
 * Errors thrown in steps 2-3 are logged, reported (webhook, plus logJob for
 * crawl jobs) and turned into a failure payload instead of being rethrown.
 * Errors raised while reporting still propagate. This function does not move
 * the job to completed/failed; its caller does that after it returns.
 *
 * @param job - BullMQ job carrying the scrape request in `job.data`.
 * @param token - lock token of `job`, forwarded to the scraper pipeline.
 * @returns payload with `success`, `docs`, `project_id` and `error`.
 */
async function processJob(job: Job, token: string) {
  Logger.info(`🐂 Worker taking job ${job.id}`);

  // Check if the job URL is blocked and reject it immediately.
  // TODO: remove this once solve the root issue
  if (job.data.url && isBlockedUrl(job.data.url)) {
    Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
    // Completion is left to the caller, as for every other job. Completing it
    // here too made the caller's own moveToCompleted fail on a finished job.
    // NOTE(review): a blocked job that belongs to a crawl is never passed to
    // addCrawlJobDone, so that crawl may never be finalized. Confirm intended.
    return {
      success: false,
      docs: [],
      project_id: job.data.project_id,
      error: "URL is blocked. Please contact hello@firecrawl.com if you believe this is an error.",
    };
  }

  try {
    // Progress reporting is best-effort and not awaited. Attach a handler so a
    // failure cannot surface as an unhandled promise rejection.
    void job
      .updateProgress({
        current: 1,
        total: 100,
        current_step: "SCRAPING",
        current_url: "",
      })
      .catch((e) => Logger.error(`🐂 Failed to update progress for job ${job.id} - ${e}`));

    const start = Date.now();
    const { success, message, docs } = await startWebScraperPipeline({
      job,
      token,
    });
    const timeTakenInSeconds = (Date.now() - start) / 1000;

    const data = {
      success,
      result: {
        links: docs.map((doc) => ({
          content: doc,
          source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
        })),
      },
      project_id: job.data.project_id,
      error: message /* etc... */,
      docs,
    };

    if (job.data.mode === "crawl") {
      await callWebhook(job.data.team_id, job.id as string, data);
    }

    if (job.data.crawl_id) {
      await logJob({
        job_id: job.id as string,
        success: success,
        message: message,
        num_docs: docs.length,
        docs: docs,
        time_taken: timeTakenInSeconds,
        team_id: job.data.team_id,
        mode: job.data.mode,
        url: job.data.url,
        crawlerOptions: job.data.crawlerOptions,
        pageOptions: job.data.pageOptions,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
      });
      await addCrawlJobDone(job.data.crawl_id, job.id);

      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

      if (!job.data.sitemapped && !sc.cancelled) {
        await enqueueDiscoveredLinks(job, sc, docs[0]?.linksOnPage ?? []);
      }

      if (await finishCrawl(job.data.crawl_id)) {
        const jobIDs = await getCrawlJobs(job.data.crawl_id);
        const jobs = (
          await Promise.all(
            jobIDs.map(async (id) => {
              if (id === job.id) {
                // This job is moved to completed by the caller only after we
                // return, so report it as completed with the docs scraped above.
                return {
                  async getState() {
                    return "completed";
                  },
                  timestamp: Date.now(),
                  returnvalue: docs,
                };
              }
              const j = await getScrapeQueue().getJob(id);
              if (!j) {
                // The job may no longer exist in the queue. Skip it instead of
                // crashing the crawl finalization on `j.id` / `.timestamp`.
                Logger.error(
                  `🐂 Crawl ${job.data.crawl_id}: job ${id} not found in queue, skipping`,
                );
                return null;
              }
              if (process.env.USE_DB_AUTHENTICATION === "true") {
                const supabaseData = await supabaseGetJobById(j.id);
                if (supabaseData) {
                  j.returnvalue = supabaseData.docs;
                }
              }
              return j;
            }),
          )
        )
          .filter(isPresent)
          .sort((a, b) => a.timestamp - b.timestamp);

        const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
        const jobStatus =
          sc.cancelled || jobStatuses.some((x) => x === "failed") ? "failed" : "completed";

        // Each job's return value may be an array of docs; only the first is used.
        const fullDocs = jobs.map((x) =>
          Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue,
        );

        await logJob({
          job_id: job.data.crawl_id,
          success: jobStatus === "completed",
          message: sc.cancelled ? "Cancelled" : message,
          num_docs: fullDocs.length,
          docs: [],
          time_taken: (Date.now() - sc.createdAt) / 1000,
          team_id: job.data.team_id,
          mode: "crawl",
          url: sc.originUrl,
          crawlerOptions: sc.crawlerOptions,
          pageOptions: sc.pageOptions,
          origin: job.data.origin,
        });

        const crawlData = {
          success: jobStatus !== "failed",
          result: {
            links: fullDocs.map((doc) => ({
              content: doc,
              source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
            })),
          },
          project_id: job.data.project_id,
          error: message /* etc... */,
          docs: fullDocs,
        };
        await callWebhook(job.data.team_id, job.data.crawl_id, crawlData);
      }
    }

    Logger.info(`🐂 Job done ${job.id}`);
    return data;
  } catch (error) {
    Logger.error(`🐂 Job errored ${job.id} - ${error}`);

    if (error instanceof CustomError) {
      // Here we handle the error, then save the failed job
      Logger.error(error.message); // or any other error handling
      logtail.error("Custom error while ingesting", {
        job_id: job.id,
        error: error.message,
        dataIngestionJob: error.dataIngestionJob,
      });
    }

    Logger.error(error);
    if (error?.stack) {
      Logger.error(error.stack);
    }
    logtail.error("Overall error ingesting", {
      job_id: job.id,
      error: error?.message,
    });

    const data = {
      success: false,
      docs: [],
      project_id: job.data.project_id,
      error: "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
    };

    if (job.data.mode === "crawl" || job.data.crawl_id) {
      await callWebhook(job.data.team_id, job.data.crawl_id ?? (job.id as string), data);
    }

    if (job.data.crawl_id) {
      const failureMessage = describeError(error);

      // Log the failed page itself...
      await logJob({
        job_id: job.id as string,
        success: false,
        message: failureMessage,
        num_docs: 0,
        docs: [],
        time_taken: 0,
        team_id: job.data.team_id,
        mode: job.data.mode,
        url: job.data.url,
        crawlerOptions: job.data.crawlerOptions,
        pageOptions: job.data.pageOptions,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
      });

      // ...and the crawl as a whole, using the stored crawl's settings if still available.
      const sc = await getCrawl(job.data.crawl_id);
      await logJob({
        job_id: job.data.crawl_id,
        success: false,
        message: failureMessage,
        num_docs: 0,
        docs: [],
        time_taken: 0,
        team_id: job.data.team_id,
        mode: "crawl",
        url: sc ? sc.originUrl : job.data.url,
        crawlerOptions: sc ? sc.crawlerOptions : job.data.crawlerOptions,
        pageOptions: sc ? sc.pageOptions : job.data.pageOptions,
        origin: job.data.origin,
      });
    }

    return data;
  }
}

/**
 * Temporary blocklist check used by processJob. This is a plain substring
 * match, so it also matches when the text appears in a path or query string,
 * not only in the hostname.
 */
function isBlockedUrl(url: string): boolean {
  const blockedFragments = ["researchhub.com", "ebay.com", "youtube.com"];
  return blockedFragments.some((fragment) => url.includes(fragment));
}

/**
 * Enqueues scrape jobs for the given links once they pass the crawler's URL
 * filters and depth limit (default 10). A job is only added when `lockURL`
 * succeeds for this crawl.
 */
async function enqueueDiscoveredLinks(job: Job, sc: StoredCrawl, linksOnPage: string[]) {
  const crawler = crawlToCrawler(job.data.crawl_id, sc);
  const links = crawler.filterLinks(
    linksOnPage
      .map((href) => crawler.filterURL(href.trim(), sc.originUrl))
      .filter((x) => x !== null),
    Infinity,
    sc.crawlerOptions?.maxDepth ?? 10,
  );

  for (const link of links) {
    if (await lockURL(job.data.crawl_id, sc, link)) {
      const newJob = await addScrapeJob({
        url: link,
        mode: "single_urls",
        crawlerOptions: sc.crawlerOptions,
        team_id: sc.team_id,
        pageOptions: sc.pageOptions,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
      });
      await addCrawlJob(job.data.crawl_id, newJob.id);
    }
  }
}

/** Type guard that drops `null` / `undefined` entries, e.g. in `Array.prototype.filter`. */
function isPresent<T>(value: T | null | undefined): value is T {
  return value !== null && value !== undefined;
}

/**
 * Builds the message logged for a failed job. A thrown string is returned as
 * is; otherwise the value's `message` is used, with a generic fallback.
 */
function describeError(error: unknown): string {
  if (typeof error === "string") {
    return error;
  }
  const message = (error as { message?: string } | null | undefined)?.message;
  return message ?? "Something went wrong... Contact help@mendable.ai";
}
2024-07-30 13:27:23 -04:00
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));