2024-08-20 19:25:19 +02:00
import "dotenv/config" ;
2024-09-01 14:19:43 -03:00
import "./sentry" ;
2024-08-21 17:58:27 +02:00
import * as Sentry from "@sentry/node" ;
2024-04-15 17:01:47 -04:00
import { CustomError } from "../lib/custom-error" ;
2024-07-30 14:44:13 -04:00
import {
getScrapeQueue ,
2025-01-03 20:44:27 -03:00
getExtractQueue ,
2025-02-19 12:44:21 -03:00
getDeepResearchQueue ,
2024-07-30 14:44:13 -04:00
redisConnection ,
2024-12-11 19:51:08 -03:00
scrapeQueueName ,
2025-01-03 20:44:27 -03:00
extractQueueName ,
2025-02-19 12:44:21 -03:00
deepResearchQueueName ,
2025-01-19 13:09:29 -03:00
getIndexQueue ,
2025-02-19 12:42:33 -05:00
getGenerateLlmsTxtQueue ,
2025-02-27 16:18:03 -03:00
getBillingQueue ,
2024-07-30 14:44:13 -04:00
} from "./queue-service" ;
2024-04-15 17:01:47 -04:00
import { startWebScraperPipeline } from "../main/runWebScraper" ;
import { callWebhook } from "./webhook" ;
2024-04-20 13:53:11 -07:00
import { logJob } from "./logging/log_job" ;
2024-09-26 20:23:13 +02:00
import { Job , Queue } from "bullmq" ;
2024-12-05 22:06:07 +01:00
import { logger as _logger } from "../lib/logger" ;
2024-07-30 13:27:23 -04:00
import { Worker } from "bullmq" ;
import systemMonitor from "./system-monitor" ;
import { v4 as uuidv4 } from "uuid" ;
2024-09-01 14:19:43 -03:00
import {
addCrawlJob ,
addCrawlJobDone ,
2024-12-27 19:59:26 +01:00
addCrawlJobs ,
2024-09-01 14:19:43 -03:00
crawlToCrawler ,
finishCrawl ,
2025-01-17 16:04:01 +01:00
finishCrawlKickoff ,
2024-12-03 17:53:17 -03:00
generateURLPermutations ,
2024-09-01 14:19:43 -03:00
getCrawl ,
2025-01-09 09:51:16 +01:00
getCrawlJobCount ,
2024-09-01 14:19:43 -03:00
getCrawlJobs ,
2025-02-13 17:14:24 +01:00
getDoneJobsOrderedLength ,
2024-09-01 14:19:43 -03:00
lockURL ,
2024-12-27 19:59:26 +01:00
lockURLs ,
2025-01-07 19:15:23 +01:00
lockURLsIndividually ,
2024-12-11 19:51:08 -03:00
normalizeURL ,
2025-01-07 19:15:23 +01:00
saveCrawl ,
2024-09-01 14:19:43 -03:00
} from "../lib/crawl-redis" ;
2024-08-13 20:51:43 +02:00
import { StoredCrawl } from "../lib/crawl-redis" ;
2024-12-27 19:59:26 +01:00
import { addScrapeJob , addScrapeJobs } from "./queue-jobs" ;
2024-09-01 14:19:43 -03:00
import {
addJobPriority ,
deleteJobPriority ,
2024-12-11 19:51:08 -03:00
getJobPriority ,
2024-09-01 14:19:43 -03:00
} from "../../src/lib/job-priority" ;
2024-09-28 00:19:46 +02:00
import { PlanType , RateLimiterMode } from "../types" ;
2024-11-07 20:57:33 +01:00
import { getJobs } from "..//controllers/v1/crawl-status" ;
2024-09-04 15:57:57 -03:00
import { configDotenv } from "dotenv" ;
2024-11-07 20:57:33 +01:00
import { scrapeOptions } from "../controllers/v1/types" ;
2024-09-28 00:19:46 +02:00
import { getRateLimiterPoints } from "./rate-limiter" ;
2024-12-11 19:46:11 -03:00
import {
cleanOldConcurrencyLimitEntries ,
pushConcurrencyLimitActiveJob ,
removeConcurrencyLimitActiveJob ,
2024-12-11 19:51:08 -03:00
takeConcurrencyLimitedJob ,
2024-12-11 19:46:11 -03:00
} from "../lib/concurrency-limit" ;
2024-12-30 14:41:31 +01:00
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist" ;
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings" ;
2024-12-30 21:43:59 -03:00
import { indexPage } from "../lib/extract/index/pinecone" ;
import { Document } from "../controllers/v1/types" ;
2025-01-03 20:44:27 -03:00
import { performExtraction } from "../lib/extract/extraction-service" ;
2025-01-03 21:48:28 -03:00
import { supabase_service } from "../services/supabase" ;
2025-01-03 23:54:03 -03:00
import { normalizeUrl , normalizeUrlOnlyHostname } from "../lib/canonical-url" ;
2025-01-07 16:21:51 -03:00
import { saveExtract , updateExtract } from "../lib/extract/extract-redis" ;
2025-01-15 17:22:52 +01:00
import { billTeam } from "./billing/credit_billing" ;
2025-01-19 12:33:44 -03:00
import { saveCrawlMap } from "./indexing/crawl-maps-index" ;
2025-02-19 12:44:21 -03:00
import { updateDeepResearch } from "../lib/deep-research/deep-research-redis" ;
import { performDeepResearch } from "../lib/deep-research/deep-research-service" ;
2025-02-19 12:42:33 -05:00
import { performGenerateLlmsTxt } from "../lib/generate-llmstxt/generate-llmstxt-service" ;
import { updateGeneratedLlmsTxt } from "../lib/generate-llmstxt/generate-llmstxt-redis" ;
2024-12-30 21:43:59 -03:00
2024-09-04 15:57:57 -03:00
configDotenv ( ) ;
2024-05-20 13:36:34 -07:00
2024-12-10 22:34:26 +01:00
/**
 * Error type carrying the fixed message "Raced redirect error".
 *
 * NOTE(review): nothing in the visible part of this file throws or catches it;
 * confirm the usage further down the file.
 */
class RacedRedirectError extends Error {
  constructor() {
    super("Raced redirect error");
    // Without this the instance reports itself as a plain "Error" in logs
    // and stack traces.
    this.name = "RacedRedirectError";
  }
}
2024-07-30 13:27:23 -04:00
/** Returns a promise that resolves once a `setTimeout` of `ms` milliseconds fires. */
const sleep = (ms: number) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
2024-04-15 17:01:47 -04:00
2024-07-30 13:27:23 -04:00
/**
 * Reads a numeric setting from the environment, falling back to `fallback`
 * when the variable is unset, not numeric, or 0.
 */
const numberFromEnv = (name: string, fallback: number): number =>
  Number(process.env[name]) || fallback;

// Not referenced in the visible part of this file.
const workerLockDuration = numberFromEnv("WORKER_LOCK_DURATION", 60000);
const workerStalledCheckInterval = numberFromEnv(
  "WORKER_STALLED_CHECK_INTERVAL",
  30000,
);

// Period of the per-job setInterval that extends the job lock.
const jobLockExtendInterval = numberFromEnv("JOB_LOCK_EXTEND_INTERVAL", 15000);
// Duration handed to job.extendLock() on every extension.
const jobLockExtensionTime = numberFromEnv("JOB_LOCK_EXTENSION_TIME", 60000);

// Sleep durations used by the worker polling loop.
const cantAcceptConnectionInterval = numberFromEnv(
  "CANT_ACCEPT_CONNECTION_INTERVAL",
  2000,
);
const connectionMonitorInterval = numberFromEnv(
  "CONNECTION_MONITOR_INTERVAL",
  10,
);
// NOTE(review): this reads CONNECTION_MONITOR_INTERVAL, the same variable as
// connectionMonitorInterval above; possibly a copy/paste slip, confirm the
// intended variable name before changing it.
const gotJobInterval = numberFromEnv("CONNECTION_MONITOR_INTERVAL", 20);

// Ids of the jobs this process is currently working on.
const runningJobs = new Set<string>();
2024-12-11 19:46:11 -03:00
/**
 * Finalizes a crawl / batch scrape once `finishCrawl` reports it as finished.
 *
 * When `finishCrawl(job.data.crawl_id)` resolves truthy this:
 *  1. queues, in the background, an index job with the crawl's visited URLs;
 *  2. records the finished crawl through `logJob`;
 *  3. fires the "completed" webhook: with every document for v0 jobs, with an
 *     empty payload for v1 jobs that have a webhook configured.
 *
 * Does nothing when `finishCrawl` resolves falsy.
 *
 * @param job - A job belonging to the crawl; only `job.data` is read.
 * @param sc - Stored crawl state for `job.data.crawl_id`.
 */
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
  if (!(await finishCrawl(job.data.crawl_id))) {
    return;
  }

  // Queue the index job without blocking logging and webhooks. The promise is
  // deliberately not awaited, so it needs its own rejection handler; otherwise
  // a Redis/queue failure surfaces as an unhandled promise rejection.
  (async () => {
    const originUrl = sc.originUrl
      ? normalizeUrlOnlyHostname(sc.originUrl)
      : undefined;

    // Get all visited unique URLs from Redis
    const visitedUrls = await redisConnection.smembers(
      "crawl:" + job.data.crawl_id + ":visited_unique",
    );

    // Only index when we have URLs and this is a crawl (not a batch scrape)
    if (
      visitedUrls.length > 0 &&
      job.data.crawlerOptions !== null &&
      originUrl
    ) {
      // Queue the indexing job instead of doing it directly
      await getIndexQueue().add(
        job.data.crawl_id,
        {
          originUrl,
          visitedUrls,
        },
        {
          priority: 10,
        },
      );
    }
  })().catch((error) => {
    _logger.error("Failed to queue crawl index job", {
      module: "queue-worker",
      method: "finishCrawlIfNeeded",
      crawlId: job.data.crawl_id,
      error,
    });
  });

  if (!job.data.v1) {
    const jobIDs = await getCrawlJobs(job.data.crawl_id);

    const jobs = (await getJobs(jobIDs)).sort(
      (a, b) => a.timestamp - b.timestamp,
    );
    // const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
    const jobStatus = sc.cancelled // || jobStatuses.some((x) => x === "failed")
      ? "failed"
      : "completed";

    // A job's return value may be a single document or an array whose first
    // element is the document; jobs without a return value are dropped.
    const fullDocs = jobs
      .map((x) =>
        x.returnvalue
          ? Array.isArray(x.returnvalue)
            ? x.returnvalue[0]
            : x.returnvalue
          : null,
      )
      .filter((x) => x !== null);

    await logJob({
      job_id: job.data.crawl_id,
      success: jobStatus === "completed",
      message: sc.cancelled ? "Cancelled" : undefined,
      num_docs: fullDocs.length,
      docs: [],
      time_taken: (Date.now() - sc.createdAt) / 1000,
      team_id: job.data.team_id,
      mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
      url: sc.originUrl!,
      scrapeOptions: sc.scrapeOptions,
      crawlerOptions: sc.crawlerOptions,
      origin: job.data.origin,
    });

    const data = {
      success: jobStatus !== "failed",
      result: {
        links: fullDocs.map((doc) => {
          return {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
          };
        }),
      },
      project_id: job.data.project_id,
      docs: fullDocs,
    };

    // v0 web hooks, call when done with all the data
    callWebhook(
      job.data.team_id,
      job.data.crawl_id,
      data,
      job.data.webhook,
      job.data.v1,
      job.data.crawlerOptions !== null
        ? "crawl.completed"
        : "batch_scrape.completed",
    );
  } else {
    const num_docs = await getDoneJobsOrderedLength(job.data.crawl_id);
    const jobStatus = sc.cancelled ? "failed" : "completed";

    await logJob(
      {
        job_id: job.data.crawl_id,
        success: jobStatus === "completed",
        message: sc.cancelled ? "Cancelled" : undefined,
        num_docs,
        docs: [],
        time_taken: (Date.now() - sc.createdAt) / 1000,
        team_id: job.data.team_id,
        scrapeOptions: sc.scrapeOptions,
        mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
        url:
          sc?.originUrl ??
          (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"),
        crawlerOptions: sc.crawlerOptions,
        origin: job.data.origin,
      },
      true,
    );

    // v1 web hooks, call when done with no data, but with event completed
    if (job.data.v1 && job.data.webhook) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        [],
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null
          ? "crawl.completed"
          : "batch_scrape.completed",
      );
    }
  }
}
2024-11-07 20:57:33 +01:00
/**
 * Runs one scrape-queue job end to end while keeping its BullMQ lock alive.
 *
 * Dispatches to `processKickoffJob` for `mode === "kickoff"` jobs and to
 * `processJob` otherwise, then moves the job to completed or failed depending
 * on the handler's `success` flag.
 *
 * @param token - Lock token the job was fetched with.
 * @param job - The job to process.
 * @returns `null` when nothing threw, otherwise the caught error.
 */
const processJobInternal = async (token: string, job: Job & { id: string }) => {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processJobInternal",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
  });

  // Periodically renew the job lock (and the team's concurrency slot). The
  // callback is async and nobody awaits it, so failures are caught here instead
  // of escaping as unhandled promise rejections.
  const extendLockInterval = setInterval(async () => {
    try {
      logger.info(`🐂 Worker extending lock on job ${job.id}`, {
        extendInterval: jobLockExtendInterval,
        extensionTime: jobLockExtensionTime,
      });

      if (job.data?.mode !== "kickoff" && job.data?.team_id) {
        await pushConcurrencyLimitActiveJob(job.data.team_id, job.id, 60 * 1000); // 60s lock renew, just like in the queue
      }

      await job.extendLock(token, jobLockExtensionTime);
    } catch (lockError) {
      logger.error("Failed to extend lock on job", { error: lockError });
    }
  }, jobLockExtendInterval);

  let err = null;
  try {
    // Inside the try so that a failure here still clears the interval below.
    await addJobPriority(job.data.team_id, job.id);

    if (job.data?.mode === "kickoff") {
      const result = await processKickoffJob(job, token);
      if (result.success) {
        try {
          await job.moveToCompleted(null, token, false);
        } catch (e) {
          // Best effort: the work itself succeeded, so only record the failure.
          logger.debug("Failed to move job to completed state in Redis", {
            error: e,
          });
        }
      } else {
        logger.debug("Job failed", { result, mode: job.data.mode });
        await job.moveToFailed((result as any).error, token, false);
      }
    } else {
      const result = await processJob(job, token);
      if (result.success) {
        try {
          if (
            job.data.crawl_id &&
            process.env.USE_DB_AUTHENTICATION === "true"
          ) {
            logger.debug(
              "Job succeeded -- has crawl associated, putting null in Redis",
            );
            await job.moveToCompleted(null, token, false);
          } else {
            logger.debug("Job succeeded -- putting result in Redis");
            await job.moveToCompleted(result.document, token, false);
          }
        } catch (e) {
          // Best effort: the work itself succeeded, so only record the failure.
          logger.debug("Failed to move job to completed state in Redis", {
            error: e,
          });
        }
      } else {
        logger.debug("Job failed", { result });
        await job.moveToFailed((result as any).error, token, false);
      }
    }
  } catch (error) {
    logger.debug("Job failed", { error });
    Sentry.captureException(error);
    err = error;
    try {
      await job.moveToFailed(error, token, false);
    } catch (moveError) {
      // Keep returning the original error even if the state change fails.
      logger.error("Failed to move job to failed state in Redis", {
        error: moveError,
      });
    }
  } finally {
    // Stop the timer first so a failing cleanup call cannot leave it running.
    clearInterval(extendLockInterval);
    await deleteJobPriority(job.data.team_id, job.id);
  }

  return err;
};
2025-01-10 18:35:10 -03:00
/**
 * Runs one extract-queue job through `performExtraction` while keeping its
 * BullMQ lock alive.
 *
 * A result with `success: false` is still moved to the completed state in
 * BullMQ, but the extract record is marked "failed". A thrown error moves the
 * job to failed and marks the extract record "failed" as well.
 *
 * @param token - Lock token the job was fetched with.
 * @param job - The extract job; reads `extractId`, `request`, `teamId`, `plan`
 *   and `subId` from `job.data`.
 * @returns The extraction result, or `{ success: false, error }` when
 *   processing threw.
 */
const processExtractJobInternal = async (
  token: string,
  job: Job & { id: string },
) => {
  const logger = _logger.child({
    module: "extract-worker",
    method: "processJobInternal",
    jobId: job.id,
    extractId: job.data.extractId,
    teamId: job.data?.teamId ?? undefined,
  });

  // The callback is async and nobody awaits it, so a failed extension is caught
  // here instead of escaping as an unhandled promise rejection.
  const extendLockInterval = setInterval(async () => {
    logger.info(`🔄 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (lockError) {
      logger.error("Failed to extend lock on job", { error: lockError });
    }
  }, jobLockExtendInterval);

  const unknownErrorMessage =
    "Unknown error, please contact help@firecrawl.com. Extract id: " +
    job.data.extractId;

  try {
    const result = await performExtraction(job.data.extractId, {
      request: job.data.request,
      teamId: job.data.teamId,
      plan: job.data.plan,
      subId: job.data.subId,
    });

    // Move job to completed state in Redis (also for non-throwing failures).
    await job.moveToCompleted(result, token, false);

    if (!result.success) {
      await updateExtract(job.data.extractId, {
        status: "failed",
        error: result.error ?? unknownErrorMessage,
      });
    }

    return result;
  } catch (error) {
    logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });

    Sentry.captureException(error, {
      data: {
        job: job.id,
      },
    });

    try {
      // Move job to failed state in Redis
      await job.moveToFailed(error, token, false);
    } catch (moveError) {
      logger.error("Failed to move job to failed state in Redis", {
        error: moveError,
      });
    }

    // Prefer an explicit `error` field, then the Error's message, so a string
    // is passed on like in the non-throwing failure branch above. Non-Error
    // throwables are passed through unchanged; a nullish one gets the fallback.
    const errorMessage =
      error?.error ??
      (error instanceof Error ? error.message : error) ??
      unknownErrorMessage;

    await updateExtract(job.data.extractId, {
      status: "failed",
      error: errorMessage,
    });

    return {
      success: false,
      error: errorMessage,
    };
    // throw error;
  } finally {
    clearInterval(extendLockInterval);
  }
};
2025-02-19 12:44:21 -03:00
/**
 * Runs one deep-research job through `performDeepResearch` while keeping its
 * BullMQ lock alive.
 *
 * On a non-successful result or a thrown error the research record is marked
 * "failed" via `updateDeepResearch` and the job is moved to the failed state.
 *
 * @param token - Lock token the job was fetched with.
 * @param job - The deep-research job; reads `researchId`, `teamId`, `plan`,
 *   `subId` and `request` from `job.data`.
 * @returns The research result, or `{ success: false, error }` on failure.
 */
const processDeepResearchJobInternal = async (
  token: string,
  job: Job & { id: string },
) => {
  const logger = _logger.child({
    module: "deep-research-worker",
    method: "processJobInternal",
    jobId: job.id,
    researchId: job.data.researchId,
    teamId: job.data?.teamId ?? undefined,
  });

  // The callback is async and nobody awaits it, so a failed extension is caught
  // here instead of escaping as an unhandled promise rejection.
  const extendLockInterval = setInterval(async () => {
    logger.info(`🔄 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (lockError) {
      logger.error("Failed to extend lock on job", { error: lockError });
    }
  }, jobLockExtendInterval);

  try {
    console.log("[Deep Research] Starting deep research: ", job.data.researchId);

    const { request } = job.data;
    const result = await performDeepResearch({
      researchId: job.data.researchId,
      teamId: job.data.teamId,
      plan: job.data.plan,
      query: request.query,
      maxDepth: request.maxDepth,
      timeLimit: request.timeLimit,
      subId: job.data.subId,
      maxUrls: request.maxUrls,
      analysisPrompt: request.analysisPrompt,
      systemPrompt: request.systemPrompt,
      formats: request.formats,
      jsonOptions: request.jsonOptions,
    });

    if (result.success) {
      // Move job to completed state in Redis
      await job.moveToCompleted(result, token, false);
      return result;
    }

    // The deep research failed but didn't throw an error
    const failure = new Error("Deep research failed without specific error");
    await updateDeepResearch(job.data.researchId, {
      status: "failed",
      error: failure.message,
    });
    await job.moveToFailed(failure, token, false);
    return { success: false, error: failure.message };
  } catch (error) {
    logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });

    Sentry.captureException(error, {
      data: {
        job: job.id,
      },
    });

    try {
      // Move job to failed state in Redis
      await job.moveToFailed(error, token, false);
    } catch (moveError) {
      logger.error("Failed to move job to failed state in Redis", {
        error: moveError,
      });
    }

    // `?.` keeps a nullish throwable from raising a second error here.
    const errorMessage = error?.message || "Unknown error occurred";

    await updateDeepResearch(job.data.researchId, {
      status: "failed",
      error: errorMessage,
    });

    return { success: false, error: errorMessage };
  } finally {
    clearInterval(extendLockInterval);
  }
};
2025-02-19 12:42:33 -05:00
/**
 * Runs one llms.txt generation job through `performGenerateLlmsTxt` while
 * keeping its BullMQ lock alive, then records the outcome with
 * `updateGeneratedLlmsTxt`.
 *
 * @param token - Lock token the job was fetched with.
 * @param job - The generation job; reads the generation id, `teamId`, `plan`,
 *   `subId` and `request` from `job.data`.
 * @returns The generation result, or `{ success: false, error }` on failure.
 */
const processGenerateLlmsTxtJobInternal = async (
  token: string,
  job: Job & { id: string },
) => {
  // This function used to read the id from two different keys
  // (`generationId` for the service call, `generateId` for logging and status
  // updates). Resolve it once so every call sees the same id, whichever key
  // the producer actually sets.
  const generationId = job.data.generationId ?? job.data.generateId;

  const logger = _logger.child({
    module: "generate-llmstxt-worker",
    method: "processJobInternal",
    jobId: job.id,
    generateId: generationId,
    teamId: job.data?.teamId ?? undefined,
  });

  // The callback is async and nobody awaits it, so a failed extension is caught
  // here instead of escaping as an unhandled promise rejection.
  const extendLockInterval = setInterval(async () => {
    logger.info(`🔄 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (lockError) {
      logger.error("Failed to extend lock on job", { error: lockError });
    }
  }, jobLockExtendInterval);

  try {
    const result = await performGenerateLlmsTxt({
      generationId,
      teamId: job.data.teamId,
      plan: job.data.plan,
      url: job.data.request.url,
      maxUrls: job.data.request.maxUrls,
      showFullText: job.data.request.showFullText,
      subId: job.data.subId,
    });

    if (result.success) {
      await job.moveToCompleted(result, token, false);
      await updateGeneratedLlmsTxt(generationId, {
        status: "completed",
        generatedText: result.data.generatedText,
        fullText: result.data.fullText,
      });
      return result;
    }

    // The generation failed but didn't throw an error
    const failure = new Error(
      "LLMs text generation failed without specific error",
    );
    await job.moveToFailed(failure, token, false);
    await updateGeneratedLlmsTxt(generationId, {
      status: "failed",
      error: failure.message,
    });
    return { success: false, error: failure.message };
  } catch (error) {
    logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });

    Sentry.captureException(error, {
      data: {
        job: job.id,
      },
    });

    try {
      await job.moveToFailed(error, token, false);
    } catch (moveError) {
      logger.error("Failed to move job to failed state in Redis", {
        error: moveError,
      });
    }

    // `?.` keeps a nullish throwable from raising a second error here.
    const errorMessage = error?.message || "Unknown error occurred";

    await updateGeneratedLlmsTxt(generationId, {
      status: "failed",
      error: errorMessage,
    });

    return { success: false, error: errorMessage };
  } finally {
    clearInterval(extendLockInterval);
  }
};
2024-07-30 13:27:23 -04:00
// Set by the signal handlers below; the worker loop checks it before fetching
// another job and stops once it is true.
let isShuttingDown = false;

process.on("SIGINT", () => {
  console.log("Received SIGINT. Shutting down gracefully...");
  isShuttingDown = true;
});

process.on("SIGTERM", () => {
  console.log("Received SIGTERM. Shutting down gracefully...");
  isShuttingDown = true;
});

// Number of consecutive worker-loop iterations in which the system monitor
// refused a new connection; reset to 0 as soon as one is accepted.
let cantAcceptConnectionCount = 0;
2024-09-01 14:19:43 -03:00
/**
 * Polling loop that fetches jobs from `queue` and hands each one to
 * `processJobInternal` without waiting for it to finish.
 *
 * Each iteration:
 *  - stops if a shutdown signal was received;
 *  - asks the system monitor whether more work can be accepted, and backs off
 *    (logging "WORKER STALLED" after 25 consecutive refusals) if not;
 *  - fetches the next job with a fresh lock token and starts processing it
 *    inside a Sentry span, continuing the producer's trace when the job
 *    carries one.
 *
 * After a job settles, its concurrency slot is released and the team's next
 * concurrency-limited job, if any, is added to `queue`.
 *
 * @param queue - Queue to consume from and to re-queue throttled jobs onto.
 * @param processJobInternal - Handler invoked with the lock token and the job.
 */
const workerFun = async (
  queue: Queue,
  processJobInternal: (token: string, job: Job) => Promise<any>,
) => {
  const logger = _logger.child({ module: "queue-worker", method: "workerFun" });

  // No processor function is passed: jobs are pulled manually via getNextJob().
  const worker = new Worker(queue.name, null, {
    connection: redisConnection,
    lockDuration: 1 * 60 * 1000, // 1 minute
    // lockRenewTime: 15 * 1000, // 15 seconds
    stalledInterval: 30 * 1000, // 30 seconds
    maxStalledCount: 10, // 10 times
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  // Bookkeeping once a job has settled (successfully or not).
  async function afterJobDone(job: Job<any, any, string>) {
    if (job.id) {
      runningJobs.delete(job.id);
    }

    if (job.id && job.data && job.data.team_id && job.data.plan) {
      await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
      cleanOldConcurrencyLimitEntries(job.data.team_id);

      // Queue up next job, if it exists
      // No need to check if we're under the limit here -- if the current job is finished,
      // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
      const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
      if (nextJob !== null) {
        await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, 60 * 1000); // 60s initial timeout

        await queue.add(
          nextJob.id,
          {
            ...nextJob.data,
            concurrencyLimitHit: true,
          },
          {
            ...nextJob.opts,
            jobId: nextJob.id,
            priority: nextJob.priority,
          },
        );
      }
    }
  }

  while (true) {
    if (isShuttingDown) {
      console.log("No longer accepting new jobs. SIGINT");
      break;
    }

    const token = uuidv4();
    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      console.log("Can't accept connection due to RAM/CPU load");
      logger.info("Can't accept connection due to RAM/CPU load");
      cantAcceptConnectionCount++;

      if (cantAcceptConnectionCount >= 25) {
        logger.error("WORKER STALLED", {
          cpuUsage: await monitor.checkCpuUsage(),
          memoryUsage: await monitor.checkMemoryUsage(),
        });
      }

      await sleep(cantAcceptConnectionInterval); // more sleep
      continue;
    } else {
      cantAcceptConnectionCount = 0;
    }

    const job = await worker.getNextJob(token);
    if (job) {
      if (job.id) {
        runningJobs.add(job.id);
      }

      // Jobs run in the background so the loop can keep polling. Nothing awaits
      // these promises, so each one needs an explicit rejection handler;
      // otherwise a failure in the handler or in afterJobDone becomes an
      // unhandled promise rejection.
      const onBackgroundFailure = (error: unknown) => {
        logger.error("Unhandled error while processing job", {
          jobId: job.id,
          error,
        });
      };

      if (job.data && job.data.sentry && Sentry.isInitialized()) {
        Sentry.continueTrace(
          {
            sentryTrace: job.data.sentry.trace,
            baggage: job.data.sentry.baggage,
          },
          () => {
            Sentry.startSpan(
              {
                name: "Scrape job",
                attributes: {
                  job: job.id,
                  worker: process.env.FLY_MACHINE_ID ?? worker.id,
                },
              },
              async (span) => {
                await Sentry.startSpan(
                  {
                    name: "Process scrape job",
                    op: "queue.process",
                    attributes: {
                      "messaging.message.id": job.id,
                      "messaging.destination.name": getScrapeQueue().name,
                      "messaging.message.body.size": job.data.sentry.size,
                      "messaging.message.receive.latency":
                        Date.now() - (job.processedOn ?? job.timestamp),
                      "messaging.message.retry.count": job.attemptsMade,
                    },
                  },
                  async () => {
                    let res;
                    try {
                      res = await processJobInternal(token, job);
                    } finally {
                      await afterJobDone(job);
                    }

                    if (res !== null) {
                      span.setStatus({ code: 2 }); // ERROR
                    } else {
                      span.setStatus({ code: 1 }); // OK
                    }
                  },
                );
              },
            ).catch(onBackgroundFailure);
          },
        );
      } else {
        Sentry.startSpan(
          {
            name: "Scrape job",
            attributes: {
              job: job.id,
              worker: process.env.FLY_MACHINE_ID ?? worker.id,
            },
          },
          () => {
            processJobInternal(token, job)
              .finally(() => afterJobDone(job))
              .catch(onBackgroundFailure);
          },
        );
      }

      await sleep(gotJobInterval);
    } else {
      await sleep(connectionMonitorInterval);
    }
  }
};
2024-12-27 19:59:26 +01:00
/**
 * Handles a crawl "kickoff" job: queues the scrape of the crawl's start URL,
 * fires the `crawl.started` webhook, and (unless `ignoreSitemap` is set)
 * queues one scrape job per sitemap URL that could be locked.
 *
 * Whether it succeeds or fails, the kickoff is marked finished via
 * `finishCrawlKickoff` and `finishCrawlIfNeeded` is given a chance to close
 * the crawl.
 *
 * @param job - The kickoff job; `job.data` carries the crawl id, URL, options,
 *   team, plan and webhook.
 * @param token - Lock token of the job (not used by this function).
 * @returns `{ success: true }`, or `{ success: false, error }` when anything
 *   threw.
 */
async function processKickoffJob(job: Job & { id: string }, token: string) {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processKickoffJob",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
    teamId: job.data?.team_id ?? undefined,
  });

  try {
    const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
    const crawler = crawlToCrawler(job.data.crawl_id, sc);

    logger.debug("Locking URL...");
    await lockURL(job.data.crawl_id, sc, job.data.url);

    const jobId = uuidv4();
    // addScrapeJob enqueues the job; addCrawlJob records its id on the crawl.
    // The two debug labels used to be attached to the wrong call.
    logger.debug("Adding scrape job to BullMQ...", { jobId });
    await addScrapeJob(
      {
        url: job.data.url,
        mode: "single_urls",
        team_id: job.data.team_id,
        crawlerOptions: job.data.crawlerOptions,
        scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions),
        internalOptions: sc.internalOptions,
        plan: job.data.plan!,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
        webhook: job.data.webhook,
        v1: job.data.v1,
        isCrawlSourceScrape: true,
      },
      {
        priority: 15,
      },
      jobId,
    );
    logger.debug("Adding scrape job to Redis...", { jobId });
    await addCrawlJob(job.data.crawl_id, jobId);

    if (job.data.webhook) {
      logger.debug("Calling webhook with crawl.started...", {
        webhook: job.data.webhook,
      });
      await callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        null,
        job.data.webhook,
        true,
        "crawl.started",
      );
    }

    // The callback is invoked once per chunk of sitemap URLs.
    const sitemap = sc.crawlerOptions.ignoreSitemap
      ? 0
      : await crawler.tryGetSitemap(async (urls) => {
          if (urls.length === 0) return;

          logger.debug("Using sitemap chunk of length " + urls.length, {
            sitemapLength: urls.length,
          });

          // NOTE(review): the computed priority is only logged; the jobs below
          // are queued with a fixed priority of 20. Confirm whether
          // `jobPriority` was meant to be used instead.
          const jobPriority = await getJobPriority({
            plan: job.data.plan,
            team_id: job.data.team_id,
            basePriority: 21,
          });
          logger.debug("Using job priority " + jobPriority, { jobPriority });

          const jobs = urls.map((url) => {
            const uuid = uuidv4();
            return {
              name: uuid,
              data: {
                url,
                mode: "single_urls" as const,
                team_id: job.data.team_id,
                plan: job.data.plan!,
                crawlerOptions: job.data.crawlerOptions,
                scrapeOptions: job.data.scrapeOptions,
                internalOptions: sc.internalOptions,
                origin: job.data.origin,
                crawl_id: job.data.crawl_id,
                sitemapped: true,
                webhook: job.data.webhook,
                v1: job.data.v1,
              },
              opts: {
                jobId: uuid,
                priority: 20,
              },
            };
          });

          logger.debug("Locking URLs...");
          const lockedIds = await lockURLsIndividually(
            job.data.crawl_id,
            sc,
            jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
          );
          // Keep only the jobs whose URL lock was obtained. A Set lookup keeps
          // this linear in the chunk size.
          const lockedIdSet = new Set(lockedIds.map((y) => y.id));
          const lockedJobs = jobs.filter((x) => lockedIdSet.has(x.opts.jobId));

          logger.debug("Adding scrape jobs to Redis...");
          await addCrawlJobs(
            job.data.crawl_id,
            lockedJobs.map((x) => x.opts.jobId),
          );
          logger.debug("Adding scrape jobs to BullMQ...");
          await addScrapeJobs(lockedJobs);
        });

    if (sitemap === 0) {
      logger.debug("Sitemap not found or ignored.", {
        ignoreSitemap: sc.crawlerOptions.ignoreSitemap,
      });
    }

    logger.debug("Done queueing jobs!");

    await finishCrawlKickoff(job.data.crawl_id);
    await finishCrawlIfNeeded(job, sc);

    return { success: true };
  } catch (error) {
    logger.error("An error occurred!", { error });

    // Still mark the kickoff as finished so the crawl can be closed.
    await finishCrawlKickoff(job.data.crawl_id);
    const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
    if (sc) {
      await finishCrawlIfNeeded(job, sc);
    }

    return { success: false, error };
  }
}
2024-12-30 21:43:59 -03:00
/**
 * Background-indexing hook for a scraped document.
 *
 * Currently a no-op: the `indexPage` call is commented out, so the function
 * only evaluates its guard conditions (document has markdown, job belongs to
 * the team named by BACKGROUND_INDEX_TEAM_ID) and returns.
 */
async function indexJob(job: Job & { id: string }, document: Document) {
  if (!document || !document.markdown) {
    return;
  }

  if (job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID!) {
    return;
  }

  // indexPage({
  //   document: document,
  //   originUrl: job.data.crawl_id
  //     ? (await getCrawl(job.data.crawl_id))?.originUrl!
  //     : document.metadata.sourceURL!,
  //   crawlId: job.data.crawl_id,
  //   teamId: job.data.team_id,
  // }).catch((error) => {
  //   _logger.error("Error indexing page", { error });
  // });
}
2024-11-07 20:57:33 +01:00
/**
 * Runs one scrape job end to end and returns a result object for the queue.
 *
 * Success path: runs the scraper pipeline (raced against the optional
 * `scrapeOptions.timeout`), optionally calls the v1 page webhook, and, when
 * the job belongs to a crawl / batch scrape (`job.data.crawl_id` is set),
 * handles redirects, logs the job, marks it done, enqueues newly discovered
 * links (crawls only) and finishes the crawl if needed. Finally a billing job
 * is queued unless `job.data.is_scrape === true`.
 *
 * Failure path: any error thrown above is caught, the crawl bookkeeping is
 * updated, the error is logged (and reported to Sentry unless it is a
 * timeout, a raced redirect or a cancellation), and a failure result is
 * returned instead of rethrowing.
 *
 * @param job - BullMQ job; `job.data` carries the scrape/crawl parameters.
 * @param token - Passed through unchanged to `startWebScraperPipeline`.
 * @returns `{ success: true, result, project_id, document }` on success, or
 *   `{ success: false, document: null, project_id, error }` on failure.
 */
async function processJob(job: Job & { id: string }, token: string) {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processJob",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
    teamId: job.data?.team_id ?? undefined,
  });
  logger.info(`🐂 Worker taking job ${job.id}`, { url: job.data.url });
  const start = Date.now();

  // Check if the job URL is researchhub and block it immediately
  // TODO: remove this once solve the root issue
  // if (
  //   job.data.url &&
  //   (job.data.url.includes("researchhub.com") ||
  //     job.data.url.includes("ebay.com"))
  // ) {
  //   logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
  //   const data = {
  //     success: false,
  //     document: null,
  //     project_id: job.data.project_id,
  //     error:
  //       "URL is blocked. Suspicious activity detected. Please contact help@firecrawl.com if you believe this is an error.",
  //   };
  //   return data;
  // }

  try {
    // Not awaited: the progress update is fire-and-forget.
    job.updateProgress({
      current: 1,
      total: 100,
      current_step: "SCRAPING",
      current_url: "",
    });

    // Bail out early if the parent crawl / batch scrape was cancelled.
    if (job.data.crawl_id) {
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
      if (sc && sc.cancelled) {
        throw new Error("Parent crawl/batch scrape was cancelled");
      }
    }

    // Race the pipeline against the optional timeout; the timeout branch
    // rejects with Error("timeout"), which the catch block recognises.
    const pipeline = await Promise.race([
      startWebScraperPipeline({
        job,
        token,
      }),
      ...(job.data.scrapeOptions.timeout !== undefined
        ? [
            (async () => {
              await sleep(job.data.scrapeOptions.timeout);
              throw new Error("timeout");
            })(),
          ]
        : []),
    ]);

    if (!pipeline.success) {
      throw pipeline.error;
    }

    const end = Date.now();
    const timeTakenInSeconds = (end - start) / 1000;

    const doc = pipeline.document;
    // Keep a copy for link extraction before rawHtml is possibly stripped.
    const rawHtml = doc.rawHtml ?? "";

    if (!job.data.scrapeOptions.formats.includes("rawHtml")) {
      delete doc.rawHtml;
    }

    if (job.data.concurrencyLimited) {
      doc.warning =
        "This scrape job was throttled at your current concurrency limit. If you'd like to scrape faster, you can upgrade your plan." +
        (doc.warning ? " " + doc.warning : "");
    }

    const data = {
      success: true,
      result: {
        links: [
          {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.metadata?.url ?? "",
            id: job.id,
          },
        ],
      },
      project_id: job.data.project_id,
      document: doc,
    };

    if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
      logger.debug("Calling webhook with success...", {
        webhook: job.data.webhook,
      });
      await callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
        true,
      );
    }

    if (job.data.crawl_id) {
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

      if (
        doc.metadata.url !== undefined &&
        doc.metadata.sourceURL !== undefined &&
        normalizeURL(doc.metadata.url, sc) !==
          normalizeURL(doc.metadata.sourceURL, sc) &&
        job.data.crawlerOptions !== null // only on crawls, don't care on batch scrape
      ) {
        const crawler = crawlToCrawler(job.data.crawl_id, sc);
        if (
          crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) ===
            null &&
          !job.data.isCrawlSourceScrape
        ) {
          throw new Error(
            "Redirected target URL is not allowed by crawlOptions",
          ); // TODO: make this its own error type that is ignored by error tracking
        }

        // Only re-set originUrl if it's different from the current hostname
        // This is only done on this condition to handle cross-domain redirects
        // If this would be done for non-crossdomain redirects, but also for e.g.
        // redirecting / -> /introduction (like our docs site does), it would
        // break crawling the entire site without allowBackwardsCrawling - mogery
        const isHostnameDifferent =
          normalizeUrlOnlyHostname(doc.metadata.url) !==
          normalizeUrlOnlyHostname(doc.metadata.sourceURL);
        if (job.data.isCrawlSourceScrape && isHostnameDifferent) {
          // TODO: re-fetch sitemap for redirect target domain
          sc.originUrl = doc.metadata.url;
          await saveCrawl(job.data.crawl_id, sc);
        }

        if (isUrlBlocked(doc.metadata.url)) {
          throw new Error(BLOCKLISTED_URL_MESSAGE); // TODO: make this its own error type that is ignored by error tracking
        }

        const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
        const p2 = generateURLPermutations(
          normalizeURL(doc.metadata.sourceURL, sc),
        );

        if (JSON.stringify(p1) !== JSON.stringify(p2)) {
          logger.debug(
            "Was redirected, removing old URL and locking new URL...",
            { oldUrl: doc.metadata.sourceURL, newUrl: doc.metadata.url },
          );
          // Prevent redirect target from being visited in the crawl again
          // See lockURL
          const addedCount = await redisConnection.sadd(
            "crawl:" + job.data.crawl_id + ":visited",
            ...p1.map((permutation) => permutation.href),
          );
          // The lock only holds if every permutation was newly added.
          const lockRes = addedCount === p1.length;

          if (job.data.crawlerOptions !== null && !lockRes) {
            throw new RacedRedirectError();
          }
        }
      }

      logger.debug("Logging job to DB...");
      await logJob(
        {
          job_id: job.id as string,
          success: true,
          num_docs: 1,
          docs: [doc],
          time_taken: timeTakenInSeconds,
          team_id: job.data.team_id,
          mode: job.data.mode,
          url: job.data.url,
          crawlerOptions: sc.crawlerOptions,
          scrapeOptions: job.data.scrapeOptions,
          origin: job.data.origin,
          crawl_id: job.data.crawl_id,
        },
        true,
      );

      indexJob(job, doc);

      logger.debug("Declaring job as done...");
      await addCrawlJobDone(job.data.crawl_id, job.id, true);

      // Link discovery only applies to crawls (batch scrapes have
      // crawlerOptions === null).
      if (job.data.crawlerOptions !== null) {
        if (!sc.cancelled) {
          const crawler = crawlToCrawler(
            job.data.crawl_id,
            sc,
            doc.metadata.url ?? doc.metadata.sourceURL ?? sc.originUrl!,
            job.data.crawlerOptions,
          );

          const links = crawler.filterLinks(
            await crawler.extractLinksFromHTML(
              rawHtml,
              doc.metadata?.url ?? doc.metadata?.sourceURL ?? sc.originUrl!,
            ),
            Infinity,
            sc.crawlerOptions?.maxDepth ?? 10,
          );
          logger.debug("Discovered " + links.length + " links...", {
            linksLength: links.length,
          });

          for (const link of links) {
            if (await lockURL(job.data.crawl_id, sc, link)) {
              // This seems to work really well
              const jobPriority = await getJobPriority({
                plan: sc.plan as PlanType,
                team_id: sc.team_id,
                basePriority: job.data.crawl_id ? 20 : 10,
              });
              const jobId = uuidv4();

              logger.debug(
                "Determined job priority " +
                  jobPriority +
                  " for URL " +
                  JSON.stringify(link),
                { jobPriority, url: link },
              );

              // console.log("plan: ", sc.plan);
              // console.log("team_id: ", sc.team_id)
              // console.log("base priority: ", job.data.crawl_id ? 20 : 10)
              // console.log("job priority: " , jobPriority, "\n\n\n")

              await addScrapeJob(
                {
                  url: link,
                  mode: "single_urls",
                  team_id: sc.team_id,
                  scrapeOptions: scrapeOptions.parse(sc.scrapeOptions),
                  internalOptions: sc.internalOptions,
                  crawlerOptions: {
                    ...sc.crawlerOptions,
                    currentDiscoveryDepth:
                      (job.data.crawlerOptions?.currentDiscoveryDepth ?? 0) + 1,
                  },
                  plan: job.data.plan,
                  origin: job.data.origin,
                  crawl_id: job.data.crawl_id,
                  webhook: job.data.webhook,
                  v1: job.data.v1,
                },
                {},
                jobId,
                jobPriority,
              );

              await addCrawlJob(job.data.crawl_id, jobId);
              logger.debug("Added job for URL " + JSON.stringify(link), {
                jobPriority,
                url: link,
                newJobId: jobId,
              });
            } else {
              // TODO: removed this, ok? too many 'not useful' logs (?) Mogery!
              // logger.debug("Could not lock URL " + JSON.stringify(link), {
              //   url: link,
              // });
            }
          }

          // Only run check after adding new jobs for discovery - mogery
          if (
            job.data.isCrawlSourceScrape &&
            crawler.filterLinks(
              [doc.metadata.url ?? doc.metadata.sourceURL!],
              1,
              sc.crawlerOptions?.maxDepth ?? 10,
            ).length === 0
          ) {
            throw new Error(
              "Source URL is not allowed by includePaths/excludePaths rules",
            );
          }
        }
      }

      await finishCrawlIfNeeded(job, sc);
    } else {
      indexJob(job, doc);
    }

    if (job.data.is_scrape !== true) {
      let creditsToBeBilled = 1; // Assuming 1 credit per document
      if (job.data.scrapeOptions.extract) {
        creditsToBeBilled = 5;
      }

      if (
        job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
        process.env.USE_DB_AUTHENTICATION === "true"
      ) {
        // A billing failure is logged and reported but never fails the job.
        try {
          const billingJobId = uuidv4();
          logger.debug(
            `Adding billing job to queue for team ${job.data.team_id}`,
            {
              billingJobId,
              credits: creditsToBeBilled,
              is_extract: false,
            },
          );

          // Add directly to the billing queue - the billing worker will handle the rest
          await getBillingQueue().add(
            "bill_team",
            {
              team_id: job.data.team_id,
              subscription_id: undefined,
              credits: creditsToBeBilled,
              is_extract: false,
              timestamp: new Date().toISOString(),
              originating_job_id: job.id,
            },
            {
              jobId: billingJobId,
              priority: 10,
            },
          );
        } catch (error) {
          logger.error(
            `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
            { error },
          );
          Sentry.captureException(error);
        }
      }
    }

    logger.info(`🐂 Job done ${job.id}`);
    return data;
  } catch (error) {
    if (job.data.crawl_id) {
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

      logger.debug("Declaring job as done...");
      await addCrawlJobDone(job.data.crawl_id, job.id, false);
      await redisConnection.srem(
        "crawl:" + job.data.crawl_id + ":visited_unique",
        normalizeURL(job.data.url, sc),
      );

      await finishCrawlIfNeeded(job, sc);
    }

    // Anything can be thrown (including null/undefined via
    // `throw pipeline.error`), so read message/stack through a null-safe view
    // instead of dereferencing `error` directly.
    const errorFields = error as
      | { message?: string; stack?: string }
      | null
      | undefined;

    const isEarlyTimeout =
      error instanceof Error && error.message === "timeout";
    const isCancelled =
      error instanceof Error &&
      error.message === "Parent crawl/batch scrape was cancelled";

    if (isEarlyTimeout) {
      logger.error(`🐂 Job timed out ${job.id}`);
    } else if (error instanceof RacedRedirectError) {
      logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`);
    } else if (isCancelled) {
      logger.warn(`🐂 Job got cancelled, silently failing`);
    } else {
      logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });

      Sentry.captureException(error, {
        data: {
          job: job.id,
        },
      });

      if (error instanceof CustomError) {
        // Here we handle the error, then save the failed job
        logger.error(error.message); // or any other error handling
      }
      logger.error(error);
      if (errorFields?.stack) {
        logger.error(errorFields.stack);
      }
    }

    const data = {
      success: false,
      document: null,
      project_id: job.data.project_id,
      error:
        error instanceof Error
          ? error
          : typeof error === "string"
            ? new Error(error)
            : new Error(JSON.stringify(error)),
    };

    if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
      // Still fire-and-forget, but a rejected webhook call must not surface
      // as an unhandled promise rejection.
      callWebhook(
        job.data.team_id,
        job.data.crawl_id ?? (job.id as string),
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
      ).catch((webhookError) => {
        logger.error("Failed to call webhook for failed job", {
          error: webhookError,
        });
      });
    }

    const end = Date.now();
    const timeTakenInSeconds = (end - start) / 1000;

    logger.debug("Logging job to DB...");
    await logJob(
      {
        job_id: job.id as string,
        success: false,
        message:
          typeof error === "string"
            ? error
            : (errorFields?.message ??
              "Something went wrong... Contact help@mendable.ai"),
        num_docs: 0,
        docs: [],
        time_taken: timeTakenInSeconds,
        team_id: job.data.team_id,
        mode: job.data.mode,
        url: job.data.url,
        crawlerOptions: job.data.crawlerOptions,
        scrapeOptions: job.data.scrapeOptions,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
      },
      true,
    );
    return data;
  }
}
2024-07-30 13:27:23 -04:00
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
2025-01-09 16:04:59 +01:00
2025-02-19 12:44:21 -03:00
// Start all workers: launch one worker loop per queue, wait until every loop
// has returned, then poll until no job is still running before exiting.
(async () => {
  // Resolves after `ms` milliseconds; used to poll `runningJobs` below.
  const pause = (ms: number) =>
    new Promise((resolve) => setTimeout(resolve, ms));

  const workerLoops = [
    workerFun(getScrapeQueue(), processJobInternal),
    workerFun(getExtractQueue(), processExtractJobInternal),
    workerFun(getDeepResearchQueue(), processDeepResearchJobInternal),
    workerFun(getGenerateLlmsTxtQueue(), processGenerateLlmsTxtJobInternal),
  ];
  await Promise.all(workerLoops);

  console.log("All workers exited. Waiting for all jobs to finish...");

  // Check every 500 ms until the set of in-flight jobs is empty.
  while (runningJobs.size > 0) {
    await pause(500);
  }

  console.log("All jobs finished. Worker out!");
  process.exit(0);
})();