diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts index ceeaa436..9659c218 100644 --- a/apps/api/src/controllers/v0/crawl.ts +++ b/apps/api/src/controllers/v0/crawl.ts @@ -177,56 +177,51 @@ export async function crawlController(req: Request, res: Response) { await saveCrawl(id, sc); - const sitemap = sc.crawlerOptions?.ignoreSitemap - ? null - : await crawler.tryGetSitemap(); + const sitemap = sc.crawlerOptions.ignoreSitemap + ? 0 + : await crawler.tryGetSitemap(async urls => { + if (urls.length === 0) return; + + let jobPriority = await getJobPriority({ plan, team_id, basePriority: 21 }); + const jobs = urls.map(url => { + const uuid = uuidv4(); + return { + name: uuid, + data: { + url, + mode: "single_urls", + crawlerOptions, + scrapeOptions, + internalOptions, + team_id, + plan, + origin: req.body.origin ?? defaultOrigin, + crawl_id: id, + sitemapped: true, + }, + opts: { + jobId: uuid, + priority: jobPriority, + }, + }; + }); - if (sitemap !== null && sitemap.length > 0) { - let jobPriority = 20; - // If it is over 1000, we need to get the job priority, - // otherwise we can use the default priority of 20 - if (sitemap.length > 1000) { - // set base to 21 - jobPriority = await getJobPriority({ plan, team_id, basePriority: 21 }); - } - const jobs = sitemap.map((x) => { - const url = x.url; - const uuid = uuidv4(); - return { - name: uuid, - data: { - url, - mode: "single_urls", - crawlerOptions, - scrapeOptions, - internalOptions, - team_id, - plan, - origin: req.body.origin ?? defaultOrigin, - crawl_id: id, - sitemapped: true, - }, - opts: { - jobId: uuid, - priority: jobPriority, - }, - }; - }); + await lockURLs( + id, + sc, + jobs.map((x) => x.data.url), + ); + await addCrawlJobs( + id, + jobs.map((x) => x.opts.jobId), + ); + for (const job of jobs) { + // add with sentry instrumentation + await addScrapeJob(job.data as any, {}, job.opts.jobId); + } + }); - await lockURLs( - id, - sc, - jobs.map((x) => x.data.url), - ); - await addCrawlJobs( - id, - jobs.map((x) => x.opts.jobId), - ); - for (const job of jobs) { - // add with sentry instrumentation - await addScrapeJob(job.data as any, {}, job.opts.jobId); - } - } else { + if (sitemap === 0) { await lockURL(id, sc, url); // Not needed, first one should be 15. diff --git a/apps/api/src/controllers/v0/crawlPreview.ts b/apps/api/src/controllers/v0/crawlPreview.ts index f9462c3d..9ba9bd46 100644 --- a/apps/api/src/controllers/v0/crawlPreview.ts +++ b/apps/api/src/controllers/v0/crawlPreview.ts @@ -113,32 +113,32 @@ export async function crawlPreviewController(req: Request, res: Response) { const crawler = crawlToCrawler(id, sc); const sitemap = sc.crawlerOptions?.ignoreSitemap - ? null - : await crawler.tryGetSitemap(); + ? 0 + : await crawler.tryGetSitemap(async urls => { + for (const url of urls) { + await lockURL(id, sc, url); + const jobId = uuidv4(); + await addScrapeJob( + { + url, + mode: "single_urls", + team_id, + plan: plan!, + crawlerOptions, + scrapeOptions, + internalOptions, + origin: "website-preview", + crawl_id: id, + sitemapped: true, + }, + {}, + jobId, + ); + await addCrawlJob(id, jobId); + } + }); - if (sitemap !== null) { - for (const url of sitemap.map((x) => x.url)) { - await lockURL(id, sc, url); - const jobId = uuidv4(); - await addScrapeJob( - { - url, - mode: "single_urls", - team_id, - plan: plan!, - crawlerOptions, - scrapeOptions, - internalOptions, - origin: "website-preview", - crawl_id: id, - sitemapped: true, - }, - {}, - jobId, - ); - await addCrawlJob(id, jobId); - } - } else { + if (sitemap === 0) { await lockURL(id, sc, url); const jobId = uuidv4(); await addScrapeJob( diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts index 59db16d8..1aec86c8 100644 --- a/apps/api/src/controllers/v1/crawl-status.ts +++ b/apps/api/src/controllers/v1/crawl-status.ts @@ -115,7 +115,7 @@ export async function crawlStatusController( const status: Exclude["status"] = sc.cancelled ? "cancelled" - : validJobStatuses.every((x) => x[1] === "completed") + : (validJobStatuses.every((x) => x[1] === "completed") && validJobStatuses.length > 0) ? "completed" : "scraping"; diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts index c2e3369f..a759f448 100644 --- a/apps/api/src/controllers/v1/crawl.ts +++ b/apps/api/src/controllers/v1/crawl.ts @@ -18,7 +18,7 @@ import { } from "../../lib/crawl-redis"; import { logCrawl } from "../../services/logging/crawl_log"; import { getScrapeQueue } from "../../services/queue-service"; -import { addScrapeJob, addScrapeJobs } from "../../services/queue-jobs"; +import { _addScrapeJobToBullMQ, addScrapeJob, addScrapeJobs } from "../../services/queue-jobs"; import { logger as _logger } from "../../lib/logger"; import { getJobPriority } from "../../lib/job-priority"; import { callWebhook } from "../../services/webhook"; @@ -111,113 +111,20 @@ export async function crawlController( await saveCrawl(id, sc); - const sitemap = sc.crawlerOptions.ignoreSitemap - ? null - : await crawler.tryGetSitemap(); - - if (sitemap !== null && sitemap.length > 0) { - logger.debug("Using sitemap of length " + sitemap.length, { - sitemapLength: sitemap.length, - }); - let jobPriority = 20; - // If it is over 1000, we need to get the job priority, - // otherwise we can use the default priority of 20 - if (sitemap.length > 1000) { - // set base to 21 - jobPriority = await getJobPriority({ - plan: req.auth.plan, - team_id: req.auth.team_id, - basePriority: 21, - }); - } - logger.debug("Using job priority " + jobPriority, { jobPriority }); - - const jobs = sitemap.map((x) => { - const url = x.url; - const uuid = uuidv4(); - return { - name: uuid, - data: { - url, - mode: "single_urls" as const, - team_id: req.auth.team_id, - plan: req.auth.plan!, - crawlerOptions, - scrapeOptions, - internalOptions: sc.internalOptions, - origin: "api", - crawl_id: id, - sitemapped: true, - webhook: req.body.webhook, - v1: true, - }, - opts: { - jobId: uuid, - priority: 20, - }, - }; - }); - - logger.debug("Locking URLs..."); - await lockURLs( - id, - sc, - jobs.map((x) => x.data.url), - ); - logger.debug("Adding scrape jobs to Redis..."); - await addCrawlJobs( - id, - jobs.map((x) => x.opts.jobId), - ); - logger.debug("Adding scrape jobs to BullMQ..."); - await addScrapeJobs(jobs); - } else { - logger.debug("Sitemap not found or ignored.", { - ignoreSitemap: sc.crawlerOptions.ignoreSitemap, - }); - - logger.debug("Locking URL..."); - await lockURL(id, sc, req.body.url); - const jobId = uuidv4(); - logger.debug("Adding scrape job to Redis...", { jobId }); - await addScrapeJob( - { - url: req.body.url, - mode: "single_urls", - team_id: req.auth.team_id, - crawlerOptions, - scrapeOptions: scrapeOptionsSchema.parse(scrapeOptions), - internalOptions: sc.internalOptions, - plan: req.auth.plan!, - origin: "api", - crawl_id: id, - webhook: req.body.webhook, - v1: true, - }, - { - priority: 15, - }, - jobId, - ); - logger.debug("Adding scrape job to BullMQ...", { jobId }); - await addCrawlJob(id, jobId); - } - logger.debug("Done queueing jobs!"); - - if (req.body.webhook) { - logger.debug("Calling webhook with crawl.started...", { - webhook: req.body.webhook, - }); - await callWebhook( - req.auth.team_id, - id, - null, - req.body.webhook, - true, - "crawl.started", - ); - } - + await _addScrapeJobToBullMQ({ + url: req.body.url, + mode: "kickoff" as const, + team_id: req.auth.team_id, + plan: req.auth.plan, + crawlerOptions, + scrapeOptions: sc.scrapeOptions, + internalOptions: sc.internalOptions, + origin: "api", + crawl_id: id, + webhook: req.body.webhook, + v1: true, + }, {}, crypto.randomUUID(), 10); + const protocol = process.env.ENV === "local" ? req.protocol : "https"; return res.status(200).json({ diff --git a/apps/api/src/controllers/v1/map.ts b/apps/api/src/controllers/v1/map.ts index 27a926fc..3274dd93 100644 --- a/apps/api/src/controllers/v1/map.ts +++ b/apps/api/src/controllers/v1/map.ts @@ -86,11 +86,12 @@ export async function getMapResults({ // If sitemapOnly is true, only get links from sitemap if (crawlerOptions.sitemapOnly) { - const sitemap = await crawler.tryGetSitemap(true, true); - if (sitemap !== null) { - sitemap.forEach((x) => { - links.push(x.url); + const sitemap = await crawler.tryGetSitemap(urls => { + urls.forEach((x) => { + links.push(x); }); + }, true, true); + if (sitemap > 0) { links = links .slice(1) .map((x) => { @@ -143,8 +144,10 @@ export async function getMapResults({ } // Parallelize sitemap fetch with serper search - const [sitemap, ...searchResults] = await Promise.all([ - ignoreSitemap ? null : crawler.tryGetSitemap(true), + const [_, ...searchResults] = await Promise.all([ + ignoreSitemap ? null : crawler.tryGetSitemap(urls => { + links.push(...urls); + }, true), ...(cachedResult ? [] : pagePromises), ]); @@ -152,12 +155,6 @@ export async function getMapResults({ allResults = searchResults; } - if (sitemap !== null) { - sitemap.forEach((x) => { - links.push(x.url); - }); - } - mapResults = allResults .flat() .filter((result) => result !== null && result !== undefined); diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts index 41bee2d6..52bea9e5 100644 --- a/apps/api/src/scraper/WebScraper/crawler.ts +++ b/apps/api/src/scraper/WebScraper/crawler.ts @@ -4,9 +4,10 @@ import { URL } from "url"; import { getLinksFromSitemap } from "./sitemap"; import robotsParser from "robots-parser"; import { getURLDepth } from "./utils/maxDepthUtils"; -import { axiosTimeout } from "../../../src/lib/timeout"; -import { logger as _logger } from "../../../src/lib/logger"; +import { axiosTimeout } from "../../lib/timeout"; +import { logger as _logger } from "../../lib/logger"; import https from "https"; +import { redisConnection } from "../../services/queue-service"; export class WebCrawler { private jobId: string; private initialUrl: string; @@ -198,26 +199,60 @@ export class WebCrawler { } public async tryGetSitemap( + urlsHandler: (urls: string[]) => unknown, fromMap: boolean = false, onlySitemap: boolean = false, - ): Promise<{ url: string; html: string }[] | null> { + ): Promise { this.logger.debug(`Fetching sitemap links from ${this.initialUrl}`, { method: "tryGetSitemap", }); - const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl); - if (fromMap && onlySitemap) { - return sitemapLinks.map((link) => ({ url: link, html: "" })); + let leftOfLimit = this.limit; + + const normalizeUrl = (url: string) => { + url = url.replace(/^https?:\/\//, "").replace(/^www\./, ""); + if (url.endsWith("/")) { + url = url.slice(0, -1); + } + return url; + }; + + const _urlsHandler = async (urls: string[]) => { + let uniqueURLs: string[] = []; + for (const url of urls) { + if (await redisConnection.sadd("sitemap:" + this.jobId + ":links", normalizeUrl(url))) { + uniqueURLs.push(url); + } + } + + await redisConnection.expire("sitemap:" + this.jobId + ":links", 3600, "NX"); + if (uniqueURLs.length > 0) { + urlsHandler(uniqueURLs); + } + }; + + let count = await this.tryFetchSitemapLinks(this.initialUrl, (urls: string[]) => { + if (fromMap && onlySitemap) { + return urlsHandler(urls); + } else { + let filteredLinks = this.filterLinks( + [...new Set(urls)], + leftOfLimit, + this.maxCrawledDepth, + fromMap, + ); + leftOfLimit -= filteredLinks.length; + return _urlsHandler(filteredLinks); + } + }); + + if (count > 0) { + if (await redisConnection.sadd("sitemap:" + this.jobId + ":links", normalizeUrl(this.initialUrl))) { + urlsHandler([this.initialUrl]); + } + count++; } - if (sitemapLinks.length > 0) { - let filteredLinks = this.filterLinks( - [...new Set(sitemapLinks)], - this.limit, - this.maxCrawledDepth, - fromMap, - ); - return filteredLinks.map((link) => ({ url: link, html: "" })); - } - return null; + + return count; } public filterURL(href: string, url: string): string | null { @@ -436,22 +471,15 @@ export class WebCrawler { return socialMediaOrEmail.some((ext) => url.includes(ext)); } - private async tryFetchSitemapLinks(url: string): Promise { - const normalizeUrl = (url: string) => { - url = url.replace(/^https?:\/\//, "").replace(/^www\./, ""); - if (url.endsWith("/")) { - url = url.slice(0, -1); - } - return url; - }; - + private async tryFetchSitemapLinks(url: string, urlsHandler: (urls: string[]) => unknown): Promise { const sitemapUrl = url.endsWith(".xml") ? url : `${url}/sitemap.xml`; - let sitemapLinks: string[] = []; + + let sitemapCount: number = 0; // Try to get sitemap from the provided URL first try { - sitemapLinks = await getLinksFromSitemap( - { sitemapUrl, allUrls: [], mode: "fire-engine" }, + sitemapCount = await getLinksFromSitemap( + { sitemapUrl, urlsHandler, mode: "fire-engine" }, this.logger, ); } catch (error) { @@ -476,20 +504,18 @@ export class WebCrawler { try { // Get all links from the main domain's sitemap - const mainDomainLinks = await getLinksFromSitemap( - { sitemapUrl: mainDomainSitemapUrl, allUrls: [], mode: "fire-engine" }, + sitemapCount += await getLinksFromSitemap( + { sitemapUrl: mainDomainSitemapUrl, urlsHandler(urls) { + urlsHandler(urls.filter(link => { + try { + const linkUrl = new URL(link); + return linkUrl.hostname.endsWith(hostname); + } catch { + } + })) + }, mode: "fire-engine" }, this.logger, ); - // Filter links to only include those pointing to the current subdomain - const subdomainLinks = mainDomainLinks.filter(link => { - try { - const linkUrl = new URL(link); - return linkUrl.hostname.endsWith(hostname); - } catch { - return false; - } - }); - sitemapLinks = [...new Set([...sitemapLinks, ...subdomainLinks])]; } catch (error) { this.logger.debug( `Failed to fetch main domain sitemap from ${mainDomainSitemapUrl}`, @@ -506,15 +532,13 @@ export class WebCrawler { } // If no sitemap found yet, try the baseUrl as a last resort - if (sitemapLinks.length === 0) { + if (sitemapCount === 0) { const baseUrlSitemap = `${this.baseUrl}/sitemap.xml`; try { - const baseLinks = await getLinksFromSitemap( - { sitemapUrl: baseUrlSitemap, allUrls: [], mode: "fire-engine" }, + sitemapCount += await getLinksFromSitemap( + { sitemapUrl: baseUrlSitemap, urlsHandler, mode: "fire-engine" }, this.logger, ); - - sitemapLinks = [...new Set([...sitemapLinks, ...baseLinks])]; } catch (error) { this.logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}`, { method: "tryFetchSitemapLinks", @@ -524,25 +548,14 @@ export class WebCrawler { if (error instanceof AxiosError && error.response?.status === 404) { // ignore 404 } else { - sitemapLinks = await getLinksFromSitemap( - { sitemapUrl: baseUrlSitemap, mode: "fire-engine" }, + sitemapCount += await getLinksFromSitemap( + { sitemapUrl: baseUrlSitemap, urlsHandler, mode: "fire-engine" }, this.logger, ); } } } - const normalizedUrl = normalizeUrl(url); - const normalizedSitemapLinks = sitemapLinks.map((link) => - normalizeUrl(link), - ); - // has to be greater than 0 to avoid adding the initial URL to the sitemap links, and preventing crawler to crawl - if ( - !normalizedSitemapLinks.includes(normalizedUrl) && - sitemapLinks.length > 0 - ) { - sitemapLinks.push(url); - } - return sitemapLinks; + return sitemapCount; } } diff --git a/apps/api/src/scraper/WebScraper/sitemap.ts b/apps/api/src/scraper/WebScraper/sitemap.ts index 2529c022..8028d225 100644 --- a/apps/api/src/scraper/WebScraper/sitemap.ts +++ b/apps/api/src/scraper/WebScraper/sitemap.ts @@ -11,15 +11,15 @@ const useFireEngine = export async function getLinksFromSitemap( { sitemapUrl, - allUrls = [], + urlsHandler, mode = "axios", }: { sitemapUrl: string; - allUrls?: string[]; + urlsHandler(urls: string[]): unknown, mode?: "axios" | "fire-engine"; }, logger: Logger, -): Promise { +): Promise { try { let content: string = ""; try { @@ -31,9 +31,12 @@ export async function getLinksFromSitemap( { forceEngine: "fire-engine;tlsclient", v0DisableJsDom: true }, ); if (!response.success) { - throw response.error; + logger.debug("Failed to scrape sitemap via TLSClient, falling back to axios...", { error: response.error }) + const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout }); + content = ar.data; + } else { + content = response.document.rawHtml!; } - content = response.document.rawHtml!; } else { const response = await axios.get(sitemapUrl, { timeout: axiosTimeout }); content = response.data; @@ -45,11 +48,13 @@ export async function getLinksFromSitemap( sitemapUrl, error, }); - return allUrls; + + return 0; } const parsed = await parseStringPromise(content); const root = parsed.urlset || parsed.sitemapindex; + let count = 0; if (root && root.sitemap) { // Handle sitemap index files @@ -57,20 +62,18 @@ export async function getLinksFromSitemap( .filter((sitemap) => sitemap.loc && sitemap.loc.length > 0) .map((sitemap) => sitemap.loc[0]); - const sitemapPromises = sitemapUrls.map((sitemapUrl) => + const sitemapPromises: Promise[] = sitemapUrls.map((sitemapUrl) => getLinksFromSitemap( - { sitemapUrl, allUrls: [], mode }, + { sitemapUrl, urlsHandler, mode }, logger, ), ); const results = await Promise.all(sitemapPromises); - results.forEach(urls => { - allUrls.push(...urls); - }); + count = results.reduce((a,x) => a + x) } else if (root && root.url) { // Check if any URLs point to additional sitemaps - const xmlSitemaps = root.url + const xmlSitemaps: string[] = root.url .filter( (url) => url.loc && @@ -83,18 +86,13 @@ export async function getLinksFromSitemap( // Recursively fetch links from additional sitemaps const sitemapPromises = xmlSitemaps.map((sitemapUrl) => getLinksFromSitemap( - { sitemapUrl, allUrls: [], mode }, + { sitemapUrl: sitemapUrl, urlsHandler, mode }, logger, ), ); - - const results = await Promise.all(sitemapPromises); - results.forEach(urls => { - allUrls.push(...urls); - }); + count += (await Promise.all(sitemapPromises)).reduce((a,x) => a + x, 0); } - // Add regular URLs that aren't sitemaps const validUrls = root.url .filter( (url) => @@ -104,8 +102,11 @@ export async function getLinksFromSitemap( !WebCrawler.prototype.isFile(url.loc[0]), ) .map((url) => url.loc[0]); - allUrls.push(...validUrls); + count += validUrls.length; + urlsHandler(validUrls); } + + return count; } catch (error) { logger.debug(`Error processing sitemapUrl: ${sitemapUrl}`, { method: "getLinksFromSitemap", @@ -115,7 +116,7 @@ export async function getLinksFromSitemap( }); } - return [...new Set(allUrls)]; + return 0; } export const fetchSitemapData = async ( diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 654f6cda..f59babe4 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -29,7 +29,7 @@ async function _addScrapeJobToConcurrencyQueue( }); } -async function _addScrapeJobToBullMQ( +export async function _addScrapeJobToBullMQ( webScraperOptions: any, options: any, jobId: string, @@ -138,7 +138,6 @@ export async function addScrapeJobs( if (jobs[0].data && jobs[0].data.team_id && jobs[0].data.plan) { const now = Date.now(); const limit = await getConcurrencyLimitMax(jobs[0].data.plan); - console.log("CC limit", limit); cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now); countCanBeDirectlyAdded = Math.max( diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 4ef9610d..e8c8bdf3 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -18,16 +18,18 @@ import { v4 as uuidv4 } from "uuid"; import { addCrawlJob, addCrawlJobDone, + addCrawlJobs, crawlToCrawler, finishCrawl, generateURLPermutations, getCrawl, getCrawlJobs, lockURL, + lockURLs, normalizeURL, } from "../lib/crawl-redis"; import { StoredCrawl } from "../lib/crawl-redis"; -import { addScrapeJob } from "./queue-jobs"; +import { addScrapeJob, addScrapeJobs } from "./queue-jobs"; import { addJobPriority, deleteJobPriority, @@ -191,22 +193,34 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => { await addJobPriority(job.data.team_id, job.id); let err = null; try { - const result = await processJob(job, token); - if (result.success) { - try { - if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { - logger.debug( - "Job succeeded -- has crawl associated, putting null in Redis", - ); + if (job.data?.mode === "kickoff") { + const result = await processKickoffJob(job, token); + if (result.success) { + try { await job.moveToCompleted(null, token, false); - } else { - logger.debug("Job succeeded -- putting result in Redis"); - await job.moveToCompleted(result.document, token, false); - } - } catch (e) {} + } catch (e) {} + } else { + logger.debug("Job failed", { result, mode: job.data.mode }); + await job.moveToFailed((result as any).error, token, false); + } } else { - logger.debug("Job failed", { result }); - await job.moveToFailed((result as any).error, token, false); + const result = await processJob(job, token); + if (result.success) { + try { + if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { + logger.debug( + "Job succeeded -- has crawl associated, putting null in Redis", + ); + await job.moveToCompleted(null, token, false); + } else { + logger.debug("Job succeeded -- putting result in Redis"); + await job.moveToCompleted(result.document, token, false); + } + } catch (e) {} + } else { + logger.debug("Job failed", { result }); + await job.moveToFailed((result as any).error, token, false); + } } } catch (error) { logger.debug("Job failed", { error }); @@ -379,6 +393,130 @@ const workerFun = async ( workerFun(getScrapeQueue(), processJobInternal); +async function processKickoffJob(job: Job & { id: string }, token: string) { + const logger = _logger.child({ + module: "queue-worker", + method: "processKickoffJob", + jobId: job.id, + scrapeId: job.id, + crawlId: job.data?.crawl_id ?? undefined, + teamId: job.data?.team_id ?? undefined, + }); + + try { + const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; + const crawler = crawlToCrawler(job.data.crawl_id, sc); + + const sitemap = sc.crawlerOptions.ignoreSitemap + ? 0 + : await crawler.tryGetSitemap(async urls => { + if (urls.length === 0) return; + + logger.debug("Using sitemap chunk of length " + urls.length, { + sitemapLength: urls.length, + }); + + let jobPriority = await getJobPriority({ + plan: job.data.plan, + team_id: job.data.team_id, + basePriority: 21, + }); + logger.debug("Using job priority " + jobPriority, { jobPriority }); + + const jobs = urls.map(url => { + const uuid = uuidv4(); + return { + name: uuid, + data: { + url, + mode: "single_urls" as const, + team_id: job.data.team_id, + plan: job.data.plan!, + crawlerOptions: job.data.crawlerOptions, + scrapeOptions: job.data.scrapeOptions, + internalOptions: sc.internalOptions, + origin: job.data.origin, + crawl_id: job.data.crawl_id, + sitemapped: true, + webhook: job.data.webhook, + v1: job.data.v1, + }, + opts: { + jobId: uuid, + priority: 20, + }, + }; + }); + + logger.debug("Locking URLs..."); + await lockURLs( + job.data.crawl_id, + sc, + jobs.map((x) => x.data.url), + ); + logger.debug("Adding scrape jobs to Redis..."); + await addCrawlJobs( + job.data.crawl_id, + jobs.map((x) => x.opts.jobId), + ); + logger.debug("Adding scrape jobs to BullMQ..."); + await addScrapeJobs(jobs); + }); + + if (sitemap === 0) { + logger.debug("Sitemap not found or ignored.", { + ignoreSitemap: sc.crawlerOptions.ignoreSitemap, + }); + + logger.debug("Locking URL..."); + await lockURL(job.data.crawl_id, sc, job.data.url); + const jobId = uuidv4(); + logger.debug("Adding scrape job to Redis...", { jobId }); + await addScrapeJob( + { + url: job.data.url, + mode: "single_urls", + team_id: job.data.team_id, + crawlerOptions: job.data.crawlerOptions, + scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions), + internalOptions: sc.internalOptions, + plan: job.data.plan!, + origin: job.data.origin, + crawl_id: job.data.crawl_id, + webhook: job.data.webhook, + v1: job.data.v1, + }, + { + priority: 15, + }, + jobId, + ); + logger.debug("Adding scrape job to BullMQ...", { jobId }); + await addCrawlJob(job.data.crawl_id, jobId); + } + logger.debug("Done queueing jobs!"); + + if (job.data.webhook) { + logger.debug("Calling webhook with crawl.started...", { + webhook: job.data.webhook, + }); + await callWebhook( + job.data.team_id, + job.data.crawl_id, + null, + job.data.webhook, + true, + "crawl.started", + ); + } + + return { success: true } + } catch (error) { + logger.error("An error occurred!", { error }) + return { success: false, error }; + } +} + async function processJob(job: Job & { id: string }, token: string) { const logger = _logger.child({ module: "queue-worker",