Update queue-worker.ts

This commit is contained in:
Nicolas
2024-09-01 14:19:43 -03:00
parent 758f729ae2
commit 44fe741c35
+149 -56
View File
@@ -1,5 +1,5 @@
import "dotenv/config"; import "dotenv/config";
import "./sentry" import "./sentry";
import * as Sentry from "@sentry/node"; import * as Sentry from "@sentry/node";
import { CustomError } from "../lib/custom-error"; import { CustomError } from "../lib/custom-error";
import { import {
@@ -17,11 +17,23 @@ import { Logger } from "../lib/logger";
import { Worker } from "bullmq"; import { Worker } from "bullmq";
import systemMonitor from "./system-monitor"; import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis"; import {
addCrawlJob,
addCrawlJobDone,
crawlToCrawler,
finishCrawl,
getCrawl,
getCrawlJobs,
lockURL,
} from "../lib/crawl-redis";
import { StoredCrawl } from "../lib/crawl-redis"; import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs"; import { addScrapeJob } from "./queue-jobs";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority"; import {
addJobPriority,
deleteJobPriority,
getJobPriority,
} from "../../src/lib/job-priority";
import { PlanType } from "../types"; import { PlanType } from "../types";
if (process.env.ENV === "production") { if (process.env.ENV === "production") {
@@ -52,25 +64,24 @@ const processJobInternal = async (token: string, job: Job) => {
await job.extendLock(token, jobLockExtensionTime); await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval); }, jobLockExtendInterval);
await addJobPriority(job.data.team_id, job.id ); await addJobPriority(job.data.team_id, job.id);
let err = null; let err = null;
try { try {
const result = await processJob(job, token); const result = await processJob(job, token);
try{ try {
if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
await job.moveToCompleted(null, token, false); await job.moveToCompleted(null, token, false);
} else { } else {
await job.moveToCompleted(result.docs, token, false); await job.moveToCompleted(result.docs, token, false);
} }
}catch(e){ } catch (e) {}
}
} catch (error) { } catch (error) {
console.log("Job failed, error:", error); console.log("Job failed, error:", error);
Sentry.captureException(error); Sentry.captureException(error);
err = error; err = error;
await job.moveToFailed(error, token, false); await job.moveToFailed(error, token, false);
} finally { } finally {
await deleteJobPriority(job.data.team_id, job.id ); await deleteJobPriority(job.data.team_id, job.id);
clearInterval(extendLockInterval); clearInterval(extendLockInterval);
} }
@@ -84,7 +95,10 @@ process.on("SIGINT", () => {
isShuttingDown = true; isShuttingDown = true;
}); });
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<any>) => { const workerFun = async (
queueName: string,
processJobInternal: (token: string, job: Job) => Promise<any>
) => {
const worker = new Worker(queueName, null, { const worker = new Worker(queueName, null, {
connection: redisConnection, connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute lockDuration: 1 * 60 * 1000, // 1 minute
@@ -113,44 +127,60 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
const job = await worker.getNextJob(token); const job = await worker.getNextJob(token);
if (job) { if (job) {
if (job.data && job.data.sentry && Sentry.isInitialized()) { if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => { Sentry.continueTrace(
Sentry.startSpan({ {
sentryTrace: job.data.sentry.trace,
baggage: job.data.sentry.baggage,
},
() => {
Sentry.startSpan(
{
name: "Scrape job", name: "Scrape job",
attributes: { attributes: {
job: job.id, job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id, worker: process.env.FLY_MACHINE_ID ?? worker.id,
}, },
}, async (span) => { },
await Sentry.startSpan({ async (span) => {
await Sentry.startSpan(
{
name: "Process scrape job", name: "Process scrape job",
op: "queue.process", op: "queue.process",
attributes: { attributes: {
"messaging.message.id": job.id, "messaging.message.id": job.id,
"messaging.destination.name": getScrapeQueue().name, "messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": job.data.sentry.size, "messaging.message.body.size": job.data.sentry.size,
"messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp), "messaging.message.receive.latency":
Date.now() - (job.processedOn ?? job.timestamp),
"messaging.message.retry.count": job.attemptsMade, "messaging.message.retry.count": job.attemptsMade,
} },
}, async () => { },
async () => {
const res = await processJobInternal(token, job); const res = await processJobInternal(token, job);
if (res !== null) { if (res !== null) {
span.setStatus({ code: 2 }); // ERROR span.setStatus({ code: 2 }); // ERROR
} else { } else {
span.setStatus({ code: 1 }); // OK span.setStatus({ code: 1 }); // OK
} }
}); }
}); );
}); }
);
}
);
} else { } else {
Sentry.startSpan({ Sentry.startSpan(
{
name: "Scrape job", name: "Scrape job",
attributes: { attributes: {
job: job.id, job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id, worker: process.env.FLY_MACHINE_ID ?? worker.id,
}, },
}, () => { },
() => {
processJobInternal(token, job); processJobInternal(token, job);
}); }
);
} }
await sleep(gotJobInterval); await sleep(gotJobInterval);
@@ -167,13 +197,20 @@ async function processJob(job: Job, token: string) {
// Check if the job URL is researchhub and block it immediately // Check if the job URL is researchhub and block it immediately
// TODO: remove this once solve the root issue // TODO: remove this once solve the root issue
if (job.data.url && (job.data.url.includes("researchhub.com") || job.data.url.includes("ebay.com") || job.data.url.includes("youtube.com") || job.data.url.includes("microsoft.com") )) { if (
job.data.url &&
(job.data.url.includes("researchhub.com") ||
job.data.url.includes("ebay.com") ||
job.data.url.includes("youtube.com") ||
job.data.url.includes("microsoft.com"))
) {
Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`); Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
const data = { const data = {
success: false, success: false,
docs: [], docs: [],
project_id: job.data.project_id, project_id: job.data.project_id,
error: "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.", error:
"URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
}; };
await job.moveToCompleted(data.docs, token, false); await job.moveToCompleted(data.docs, token, false);
return data; return data;
@@ -194,7 +231,7 @@ async function processJob(job: Job, token: string) {
}); });
// Better if we throw here so we capture with the correct error // Better if we throw here so we capture with the correct error
if(!success) { if (!success) {
throw new Error(message); throw new Error(message);
} }
const end = Date.now(); const end = Date.now();
@@ -217,14 +254,24 @@ async function processJob(job: Job, token: string) {
docs, docs,
}; };
// No idea what this does and when it is called. // No idea what this does and when it is called.
if (job.data.mode === "crawl" && !job.data.v1) { if (job.data.mode === "crawl" && !job.data.v1) {
callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1); callWebhook(
job.data.team_id,
job.id as string,
data,
job.data.webhook,
job.data.v1
);
} }
if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1); callWebhook(
job.data.team_id,
job.data.crawl_id,
data,
job.data.webhook,
job.data.v1
);
} }
if (job.data.crawl_id) { if (job.data.crawl_id) {
@@ -246,7 +293,7 @@ async function processJob(job: Job, token: string) {
await addCrawlJobDone(job.data.crawl_id, job.id); await addCrawlJobDone(job.data.crawl_id, job.id);
const sc = await getCrawl(job.data.crawl_id) as StoredCrawl; const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
if (!job.data.sitemapped) { if (!job.data.sitemapped) {
if (!sc.cancelled) { if (!sc.cancelled) {
@@ -256,13 +303,16 @@ async function processJob(job: Job, token: string) {
crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl), crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl),
Infinity, Infinity,
sc.crawlerOptions?.maxDepth ?? 10 sc.crawlerOptions?.maxDepth ?? 10
) );
for (const link of links) { for (const link of links) {
if (await lockURL(job.data.crawl_id, sc, link)) { if (await lockURL(job.data.crawl_id, sc, link)) {
// This seems to work really welel // This seems to work really welel
const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10}) const jobPriority = await getJobPriority({
plan: sc.plan as PlanType,
team_id: sc.team_id,
basePriority: job.data.crawl_id ? 20 : 10,
});
const jobId = uuidv4(); const jobId = uuidv4();
// console.log("plan: ", sc.plan); // console.log("plan: ", sc.plan);
@@ -270,7 +320,8 @@ async function processJob(job: Job, token: string) {
// console.log("base priority: ", job.data.crawl_id ? 20 : 10) // console.log("base priority: ", job.data.crawl_id ? 20 : 10)
// console.log("job priority: " , jobPriority, "\n\n\n") // console.log("job priority: " , jobPriority, "\n\n\n")
const newJob = await addScrapeJob({ const newJob = await addScrapeJob(
{
url: link, url: link,
mode: "single_urls", mode: "single_urls",
crawlerOptions: sc.crawlerOptions, crawlerOptions: sc.crawlerOptions,
@@ -279,7 +330,11 @@ async function processJob(job: Job, token: string) {
origin: job.data.origin, origin: job.data.origin,
crawl_id: job.data.crawl_id, crawl_id: job.data.crawl_id,
v1: job.data.v1, v1: job.data.v1,
}, {}, jobId, jobPriority); },
{},
jobId,
jobPriority
);
await addCrawlJob(job.data.crawl_id, newJob.id); await addCrawlJob(job.data.crawl_id, newJob.id);
} }
@@ -290,20 +345,30 @@ async function processJob(job: Job, token: string) {
if (await finishCrawl(job.data.crawl_id)) { if (await finishCrawl(job.data.crawl_id)) {
// v1 web hooks, call when done with no data, but with event completed // v1 web hooks, call when done with no data, but with event completed
if (job.data.v1 && job.data.webhook) { if (job.data.v1 && job.data.webhook) {
callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed"); callWebhook(
job.data.team_id,
job.data.crawl_id,
[],
job.data.webhook,
job.data.v1,
"crawl.completed"
);
} }
if (!job.data.v1) {
const jobIDs = await getCrawlJobs(job.data.crawl_id); const jobIDs = await getCrawlJobs(job.data.crawl_id);
const jobs = (await Promise.all(jobIDs.map(async x => { const jobs = (
await Promise.all(
jobIDs.map(async (x) => {
if (x === job.id) { if (x === job.id) {
return { return {
async getState() { async getState() {
return "completed" return "completed";
}, },
timestamp: Date.now(), timestamp: Date.now(),
returnvalue: docs, returnvalue: docs,
} };
} }
const j = await getScrapeQueue().getJob(x); const j = await getScrapeQueue().getJob(x);
@@ -317,11 +382,18 @@ async function processJob(job: Job, token: string) {
} }
return j; return j;
}))).sort((a, b) => a.timestamp - b.timestamp); })
const jobStatuses = await Promise.all(jobs.map(x => x.getState())); )
const jobStatus = sc.cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed"; ).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
const jobStatus =
sc.cancelled || jobStatuses.some((x) => x === "failed")
? "failed"
: "completed";
const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); const fullDocs = jobs.map((x) =>
Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue
);
await logJob({ await logJob({
job_id: job.data.crawl_id, job_id: job.data.crawl_id,
@@ -352,16 +424,22 @@ async function processJob(job: Job, token: string) {
error: message /* etc... */, error: message /* etc... */,
docs: fullDocs, docs: fullDocs,
}; };
console.log(fullDocs.length);
// v0 web hooks, call when done with all the data // v0 web hooks, call when done with all the data
if (!job.data.v1) { if (!job.data.v1) {
callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed"); callWebhook(
} job.data.team_id,
job.data.crawl_id,
data,
job.data.webhook,
job.data.v1,
"crawl.completed"
);
}
}
} }
} }
Logger.info(`🐂 Job done ${job.id}`); Logger.info(`🐂 Job done ${job.id}`);
return data; return data;
@@ -370,9 +448,9 @@ async function processJob(job: Job, token: string) {
Sentry.captureException(error, { Sentry.captureException(error, {
data: { data: {
job: job.id job: job.id,
}, },
}) });
if (error instanceof CustomError) { if (error instanceof CustomError) {
// Here we handle the error, then save the failed job // Here we handle the error, then save the failed job
@@ -403,10 +481,23 @@ async function processJob(job: Job, token: string) {
}; };
if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) { if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1); callWebhook(
job.data.team_id,
job.data.crawl_id ?? (job.id as string),
data,
job.data.webhook,
job.data.v1
);
} }
if(job.data.v1) { if (job.data.v1) {
callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed"); callWebhook(
job.data.team_id,
job.id as string,
[],
job.data.webhook,
job.data.v1,
"crawl.failed"
);
} }
if (job.data.crawl_id) { if (job.data.crawl_id) {
@@ -416,7 +507,8 @@ async function processJob(job: Job, token: string) {
message: message:
typeof error === "string" typeof error === "string"
? error ? error
: error.message ?? "Something went wrong... Contact help@mendable.ai", : error.message ??
"Something went wrong... Contact help@mendable.ai",
num_docs: 0, num_docs: 0,
docs: [], docs: [],
time_taken: 0, time_taken: 0,
@@ -437,7 +529,8 @@ async function processJob(job: Job, token: string) {
message: message:
typeof error === "string" typeof error === "string"
? error ? error
: error.message ?? "Something went wrong... Contact help@mendable.ai", : error.message ??
"Something went wrong... Contact help@mendable.ai",
num_docs: 0, num_docs: 0,
docs: [], docs: [],
time_taken: 0, time_taken: 0,