Update queue-worker.ts

This commit is contained in:
Nicolas
2024-12-30 21:43:59 -03:00
parent e6da214aeb
commit bd81b41d5f
+94 -61
View File
@@ -48,6 +48,9 @@ import {
} from "../lib/concurrency-limit"; } from "../lib/concurrency-limit";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { indexPage } from "../lib/extract/index/pinecone";
import { Document } from "../controllers/v1/types";
configDotenv(); configDotenv();
class RacedRedirectError extends Error { class RacedRedirectError extends Error {
@@ -209,7 +212,10 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
const result = await processJob(job, token); const result = await processJob(job, token);
if (result.success) { if (result.success) {
try { try {
if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { if (
job.data.crawl_id &&
process.env.USE_DB_AUTHENTICATION === "true"
) {
logger.debug( logger.debug(
"Job succeeded -- has crawl associated, putting null in Redis", "Job succeeded -- has crawl associated, putting null in Redis",
); );
@@ -410,66 +416,66 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
const crawler = crawlToCrawler(job.data.crawl_id, sc); const crawler = crawlToCrawler(job.data.crawl_id, sc);
const sitemap = sc.crawlerOptions.ignoreSitemap const sitemap = sc.crawlerOptions.ignoreSitemap
? 0 ? 0
: await crawler.tryGetSitemap(async urls => { : await crawler.tryGetSitemap(async (urls) => {
if (urls.length === 0) return; if (urls.length === 0) return;
logger.debug("Using sitemap chunk of length " + urls.length, { logger.debug("Using sitemap chunk of length " + urls.length, {
sitemapLength: urls.length, sitemapLength: urls.length,
});
let jobPriority = await getJobPriority({
plan: job.data.plan,
team_id: job.data.team_id,
basePriority: 21,
});
logger.debug("Using job priority " + jobPriority, { jobPriority });
const jobs = urls.map(url => {
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls" as const,
team_id: job.data.team_id,
plan: job.data.plan!,
crawlerOptions: job.data.crawlerOptions,
scrapeOptions: job.data.scrapeOptions,
internalOptions: sc.internalOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
sitemapped: true,
webhook: job.data.webhook,
v1: job.data.v1,
},
opts: {
jobId: uuid,
priority: 20,
},
};
});
logger.debug("Locking URLs...");
await lockURLs(
job.data.crawl_id,
sc,
jobs.map((x) => x.data.url),
);
logger.debug("Adding scrape jobs to Redis...");
await addCrawlJobs(
job.data.crawl_id,
jobs.map((x) => x.opts.jobId),
);
logger.debug("Adding scrape jobs to BullMQ...");
await addScrapeJobs(jobs);
}); });
let jobPriority = await getJobPriority({
plan: job.data.plan,
team_id: job.data.team_id,
basePriority: 21,
});
logger.debug("Using job priority " + jobPriority, { jobPriority });
const jobs = urls.map((url) => {
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls" as const,
team_id: job.data.team_id,
plan: job.data.plan!,
crawlerOptions: job.data.crawlerOptions,
scrapeOptions: job.data.scrapeOptions,
internalOptions: sc.internalOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
sitemapped: true,
webhook: job.data.webhook,
v1: job.data.v1,
},
opts: {
jobId: uuid,
priority: 20,
},
};
});
logger.debug("Locking URLs...");
await lockURLs(
job.data.crawl_id,
sc,
jobs.map((x) => x.data.url),
);
logger.debug("Adding scrape jobs to Redis...");
await addCrawlJobs(
job.data.crawl_id,
jobs.map((x) => x.opts.jobId),
);
logger.debug("Adding scrape jobs to BullMQ...");
await addScrapeJobs(jobs);
});
if (sitemap === 0) { if (sitemap === 0) {
logger.debug("Sitemap not found or ignored.", { logger.debug("Sitemap not found or ignored.", {
ignoreSitemap: sc.crawlerOptions.ignoreSitemap, ignoreSitemap: sc.crawlerOptions.ignoreSitemap,
}); });
logger.debug("Locking URL..."); logger.debug("Locking URL...");
await lockURL(job.data.crawl_id, sc, job.data.url); await lockURL(job.data.crawl_id, sc, job.data.url);
const jobId = uuidv4(); const jobId = uuidv4();
@@ -511,14 +517,33 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
"crawl.started", "crawl.started",
); );
} }
return { success: true } return { success: true };
} catch (error) { } catch (error) {
logger.error("An error occurred!", { error }) logger.error("An error occurred!", { error });
return { success: false, error }; return { success: false, error };
} }
} }
async function indexJob(job: Job & { id: string }, document: Document) {
if (
document &&
document.markdown &&
job.data.team_id === process.env.BACKGROUND_INDEX_TEAM_ID!
) {
indexPage({
document: document,
originUrl: job.data.crawl_id
? (await getCrawl(job.data.crawl_id))?.originUrl!
: document.metadata.sourceURL!,
crawlId: job.data.crawl_id,
teamId: job.data.team_id,
}).catch((error) => {
_logger.error("Error indexing page", { error });
});
}
}
async function processJob(job: Job & { id: string }, token: string) { async function processJob(job: Job & { id: string }, token: string) {
const logger = _logger.child({ const logger = _logger.child({
module: "queue-worker", module: "queue-worker",
@@ -623,14 +648,18 @@ async function processJob(job: Job & { id: string }, token: string) {
normalizeURL(doc.metadata.sourceURL, sc) normalizeURL(doc.metadata.sourceURL, sc)
) { ) {
const crawler = crawlToCrawler(job.data.crawl_id, sc); const crawler = crawlToCrawler(job.data.crawl_id, sc);
if (crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) === null) { if (
throw new Error("Redirected target URL is not allowed by crawlOptions"); // TODO: make this its own error type that is ignored by error tracking crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) === null
) {
throw new Error(
"Redirected target URL is not allowed by crawlOptions",
); // TODO: make this its own error type that is ignored by error tracking
} }
if (isUrlBlocked(doc.metadata.url)) { if (isUrlBlocked(doc.metadata.url)) {
throw new Error(BLOCKLISTED_URL_MESSAGE); // TODO: make this its own error type that is ignored by error tracking throw new Error(BLOCKLISTED_URL_MESSAGE); // TODO: make this its own error type that is ignored by error tracking
} }
const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc)); const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
const p2 = generateURLPermutations( const p2 = generateURLPermutations(
normalizeURL(doc.metadata.sourceURL, sc), normalizeURL(doc.metadata.sourceURL, sc),
@@ -675,6 +704,8 @@ async function processJob(job: Job & { id: string }, token: string) {
true, true,
); );
indexJob(job, doc);
logger.debug("Declaring job as done..."); logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id, true); await addCrawlJobDone(job.data.crawl_id, job.id, true);
@@ -755,6 +786,8 @@ async function processJob(job: Job & { id: string }, token: string) {
} }
await finishCrawlIfNeeded(job, sc); await finishCrawlIfNeeded(job, sc);
} else {
indexJob(job, doc);
} }
logger.info(`🐂 Job done ${job.id}`); logger.info(`🐂 Job done ${job.id}`);