2024-08-20 19:25:19 +02:00
import "dotenv/config" ;
2024-09-01 14:19:43 -03:00
import "./sentry" ;
2024-08-21 17:58:27 +02:00
import * as Sentry from "@sentry/node" ;
2024-04-15 17:01:47 -04:00
import { CustomError } from "../lib/custom-error" ;
2024-07-30 14:44:13 -04:00
import {
getScrapeQueue ,
redisConnection ,
2024-12-11 19:51:08 -03:00
scrapeQueueName ,
2024-07-30 14:44:13 -04:00
} from "./queue-service" ;
2024-04-15 17:01:47 -04:00
import { startWebScraperPipeline } from "../main/runWebScraper" ;
import { callWebhook } from "./webhook" ;
2024-04-20 13:53:11 -07:00
import { logJob } from "./logging/log_job" ;
2024-09-26 20:23:13 +02:00
import { Job , Queue } from "bullmq" ;
2024-12-05 22:06:07 +01:00
import { logger as _logger } from "../lib/logger" ;
2024-07-30 13:27:23 -04:00
import { Worker } from "bullmq" ;
import systemMonitor from "./system-monitor" ;
import { v4 as uuidv4 } from "uuid" ;
2024-09-01 14:19:43 -03:00
import {
addCrawlJob ,
addCrawlJobDone ,
crawlToCrawler ,
finishCrawl ,
2024-12-03 17:53:17 -03:00
generateURLPermutations ,
2024-09-01 14:19:43 -03:00
getCrawl ,
getCrawlJobs ,
lockURL ,
2024-12-11 19:51:08 -03:00
normalizeURL ,
2024-09-01 14:19:43 -03:00
} from "../lib/crawl-redis" ;
2024-08-13 20:51:43 +02:00
import { StoredCrawl } from "../lib/crawl-redis" ;
import { addScrapeJob } from "./queue-jobs" ;
2024-09-01 14:19:43 -03:00
import {
addJobPriority ,
deleteJobPriority ,
2024-12-11 19:51:08 -03:00
getJobPriority ,
2024-09-01 14:19:43 -03:00
} from "../../src/lib/job-priority" ;
2024-09-28 00:19:46 +02:00
import { PlanType , RateLimiterMode } from "../types" ;
2024-11-07 20:57:33 +01:00
import { getJobs } from "..//controllers/v1/crawl-status" ;
2024-09-04 15:57:57 -03:00
import { configDotenv } from "dotenv" ;
2024-11-07 20:57:33 +01:00
import { scrapeOptions } from "../controllers/v1/types" ;
2024-09-28 00:19:46 +02:00
import { getRateLimiterPoints } from "./rate-limiter" ;
2024-12-11 19:46:11 -03:00
import {
cleanOldConcurrencyLimitEntries ,
pushConcurrencyLimitActiveJob ,
removeConcurrencyLimitActiveJob ,
2024-12-11 19:51:08 -03:00
takeConcurrencyLimitedJob ,
2024-12-11 19:46:11 -03:00
} from "../lib/concurrency-limit" ;
2024-09-04 15:57:57 -03:00
// Explicitly load dotenv configuration at module start-up.
// NOTE(review): "dotenv/config" is also imported at the top of this file, so this
// looks like a second load of the same environment file — confirm both are needed.
configDotenv ( ) ;
2024-05-20 13:36:34 -07:00
2024-12-10 22:34:26 +01:00
/**
 * Raised by `processJob` when a crawled page redirected to a URL whose lock
 * could not be acquired (another job got there first). `processJob` catches it
 * and logs a warning instead of reporting it as a regular error.
 */
class RacedRedirectError extends Error {
  constructor() {
    super("Raced redirect error");
    // Without this the instance reports itself as a generic "Error" in logs
    // and error trackers.
    this.name = "RacedRedirectError";
  }
}
2024-07-30 13:27:23 -04:00
/** Returns a promise that resolves (with no value) after `ms` milliseconds. */
const sleep = (ms: number) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
2024-04-15 17:01:47 -04:00
2024-07-30 13:27:23 -04:00
/**
 * Reads a numeric setting from the environment.
 *
 * The variables in `names` are tried in order; the first one whose value parses
 * to a non-zero number wins. Unset, empty, non-numeric and `0` values are
 * skipped (same semantics as `Number(value) || fallback`).
 *
 * @param names environment variable names, highest priority first
 * @param fallback value returned when none of the variables is usable
 */
function envNumber(names: string[], fallback: number): number {
  for (const name of names) {
    const parsed = Number(process.env[name]);
    if (parsed) {
      return parsed;
    }
  }
  return fallback;
}

// All values below are in milliseconds.
const workerLockDuration = envNumber(["WORKER_LOCK_DURATION"], 60000);
const workerStalledCheckInterval = envNumber(
  ["WORKER_STALLED_CHECK_INTERVAL"],
  30000,
);
// How often a running job's lock is renewed, and by how much.
const jobLockExtendInterval = envNumber(["JOB_LOCK_EXTEND_INTERVAL"], 15000);
const jobLockExtensionTime = envNumber(["JOB_LOCK_EXTENSION_TIME"], 60000);
// Pause lengths used by the worker loop.
const cantAcceptConnectionInterval = envNumber(
  ["CANT_ACCEPT_CONNECTION_INTERVAL"],
  2000,
);
const connectionMonitorInterval = envNumber(["CONNECTION_MONITOR_INTERVAL"], 10);
// This used to read CONNECTION_MONITOR_INTERVAL only (copy-paste slip), which
// tied two independent pauses to one variable. GOT_JOB_INTERVAL now takes
// precedence; the old variable is still honoured so existing deployments keep
// their behaviour.
const gotJobInterval = envNumber(
  ["GOT_JOB_INTERVAL", "CONNECTION_MONITOR_INTERVAL"],
  20,
);
2024-07-11 20:08:21 +02:00
2024-12-11 19:46:11 -03:00
/**
 * Wraps up a crawl / batch scrape after one of its jobs has finished.
 *
 * Nothing happens unless `finishCrawl` returns a truthy value for the job's
 * crawl id. When it does:
 *  - v0 jobs (`job.data.v1` falsy): every job of the crawl is loaded, the
 *    documents are collected in timestamp order, the crawl is logged via
 *    `logJob`, and the webhook is called with the full result.
 *  - v1 jobs: the completion webhook is called with an empty payload (only
 *    when a webhook is configured) and the crawl is logged via `logJob`.
 *
 * @param job the job that just finished; its `data` carries crawl/team ids
 * @param sc  the stored crawl the job belongs to
 */
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
  const crawlId = job.data.crawl_id;

  const crawlIsOver = await finishCrawl(crawlId);
  if (!crawlIsOver) {
    return;
  }

  // `crawlerOptions === null` marks a batch scrape rather than a crawl.
  const isCrawl = job.data.crawlerOptions !== null;

  if (job.data.v1) {
    const jobIDs = await getCrawlJobs(crawlId);
    const wasCancelled = sc.cancelled;

    // v1 web hooks, call when done with no data, but with event completed
    if (job.data.v1 && job.data.webhook) {
      callWebhook(
        job.data.team_id,
        crawlId,
        [],
        job.data.webhook,
        job.data.v1,
        isCrawl ? "crawl.completed" : "batch_scrape.completed",
      );
    }

    await logJob(
      {
        job_id: crawlId,
        success: !wasCancelled,
        message: wasCancelled ? "Cancelled" : undefined,
        num_docs: jobIDs.length,
        docs: [],
        time_taken: (Date.now() - sc.createdAt) / 1000,
        team_id: job.data.team_id,
        scrapeOptions: sc.scrapeOptions,
        mode: isCrawl ? "crawl" : "batch_scrape",
        url: sc?.originUrl ?? (isCrawl ? "Unknown" : "Batch Scrape"),
        crawlerOptions: sc.crawlerOptions,
        origin: job.data.origin,
      },
      true,
    );
    return;
  }

  // ---- v0 path -----------------------------------------------------------
  const jobIDs = await getCrawlJobs(crawlId);
  const orderedJobs = (await getJobs(jobIDs)).sort(
    (first, second) => first.timestamp - second.timestamp,
  );
  // const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
  const wasCancelled = sc.cancelled; // || jobStatuses.some((x) => x === "failed")

  // One document per job: jobs without a return value are skipped, array
  // return values contribute their first element, explicit nulls are dropped.
  const fullDocs = orderedJobs.flatMap((finished) => {
    const value = finished.returnvalue;
    if (!value) {
      return [];
    }
    const document = Array.isArray(value) ? value[0] : value;
    return document !== null ? [document] : [];
  });

  await logJob({
    job_id: crawlId,
    success: !wasCancelled,
    message: wasCancelled ? "Cancelled" : undefined,
    num_docs: fullDocs.length,
    docs: [],
    time_taken: (Date.now() - sc.createdAt) / 1000,
    team_id: job.data.team_id,
    mode: isCrawl ? "crawl" : "batch_scrape",
    url: sc.originUrl!,
    scrapeOptions: sc.scrapeOptions,
    crawlerOptions: sc.crawlerOptions,
    origin: job.data.origin,
  });

  const links = fullDocs.map((doc) => ({
    content: doc,
    source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
  }));
  const data = {
    success: !wasCancelled,
    result: { links },
    project_id: job.data.project_id,
    docs: fullDocs,
  };

  // v0 web hooks, call when done with all the data
  // (the v1 flag is re-checked here exactly as the code always did)
  if (!job.data.v1) {
    callWebhook(
      job.data.team_id,
      crawlId,
      data,
      job.data.webhook,
      job.data.v1,
      isCrawl ? "crawl.completed" : "batch_scrape.completed",
    );
  }
}
2024-11-07 20:57:33 +01:00
/**
 * Runs `processJob` for one job and records the outcome on the queue.
 *
 * While the job runs, its lock is extended every `jobLockExtendInterval` ms and
 * the job is registered through `addJobPriority`; both are undone before this
 * function settles, whatever happens in between.
 *
 * @param token lock token that was used to fetch the job
 * @param job   the job to process
 * @returns `null` when `processJob` returned a result (successful or not), or
 *          the thrown value when `processJob` or a queue state transition threw
 */
const processJobInternal = async (token: string, job: Job & { id: string }) => {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processJobInternal",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
  });

  const extendLockInterval = setInterval(async () => {
    logger.info(`🐂 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (error) {
      // A rejection inside a timer callback has no caller to land on; log it
      // instead of letting it surface as an unhandled rejection.
      logger.warn("Failed to extend lock on job", { error });
    }
  }, jobLockExtendInterval);

  let err = null;

  // Outer try/finally: the interval must be cleared even if registering or
  // unregistering the job priority throws, otherwise the timer keeps firing
  // for a job nobody is processing any more.
  try {
    await addJobPriority(job.data.team_id, job.id);

    try {
      const result = await processJob(job, token);

      if (result.success) {
        try {
          if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
            logger.debug(
              "Job succeeded -- has crawl associated, putting null in Redis",
            );
            await job.moveToCompleted(null, token, false);
          } else {
            logger.debug("Job succeeded -- putting result in Redis");
            await job.moveToCompleted(result.document, token, false);
          }
        } catch (e) {
          // Still best-effort (the job is not failed because of this), but no
          // longer silent.
          logger.warn("Failed to move job to completed", { error: e });
        }
      } else {
        logger.debug("Job failed", { result });
        await job.moveToFailed((result as any).error, token, false);
      }
    } catch (error) {
      logger.debug("Job failed", { error });
      Sentry.captureException(error);
      err = error;
      await job.moveToFailed(error, token, false);
    } finally {
      await deleteJobPriority(job.data.team_id, job.id);
    }
  } finally {
    clearInterval(extendLockInterval);
  }

  return err;
};
/**
 * Set to `true` once a termination signal has been received. The worker loop
 * checks it before fetching the next job and stops when it is set.
 */
let isShuttingDown = false;

// Both signals only raise the flag; the SIGINT handler used to log
// "Received SIGTERM", so the message is now built from the actual signal name.
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, () => {
    console.log(`Received ${signal}. Shutting down gracefully...`);
    isShuttingDown = true;
  });
}

/**
 * Number of consecutive worker-loop iterations in which the system monitor
 * refused a new connection; reset to 0 as soon as one is accepted.
 */
let cantAcceptConnectionCount = 0;
2024-09-01 14:19:43 -03:00
/**
 * Polling loop of the worker process.
 *
 * Creates a BullMQ `Worker` without a processor function and fetches jobs
 * manually with `getNextJob`. Each fetched job is handed to
 * `processJobInternal` WITHOUT being awaited, so several jobs can be in flight
 * at once; the loop only pauses briefly between fetches. The loop ends when
 * `isShuttingDown` is set.
 *
 * @param queue queue to consume; also used to enqueue the next
 *              concurrency-limited job of a team when one of its jobs finishes
 * @param processJobInternal callback that processes one job; it resolves to
 *              `null` on a handled outcome and to the error otherwise
 */
const workerFun = async (
  queue: Queue,
  processJobInternal: (token: string, job: Job) => Promise<any>,
) => {
  const logger = _logger.child({ module: "queue-worker", method: "workerFun" });

  // NOTE(review): `workerLockDuration` / `workerStalledCheckInterval` are
  // declared at the top of the file but the values here are hard-coded —
  // confirm which one is intended.
  const worker = new Worker(queue.name, null, {
    connection: redisConnection,
    lockDuration: 1 * 60 * 1000, // 1 minute
    // lockRenewTime: 15 * 1000, // 15 seconds
    stalledInterval: 30 * 1000, // 30 seconds
    maxStalledCount: 10, // 10 times
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  // Bookkeeping after a job has finished (successfully or not): free the
  // team's concurrency slot and promote its next waiting job, if any.
  // Defined once here rather than on every loop iteration.
  const afterJobDone = async (job: Job<any, any, string>) => {
    if (job.id && job.data && job.data.team_id && job.data.plan) {
      await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
      // Awaited so that a rejection reaches the caller's error handler instead
      // of floating (harmless if the helper turns out to be synchronous).
      await cleanOldConcurrencyLimitEntries(job.data.team_id);

      // Queue up next job, if it exists
      // No need to check if we're under the limit here -- if the current job is finished,
      // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
      const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
      if (nextJob !== null) {
        await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);

        await queue.add(
          nextJob.id,
          {
            ...nextJob.data,
            concurrencyLimitHit: true,
          },
          {
            ...nextJob.opts,
            jobId: nextJob.id,
            priority: nextJob.priority,
          },
        );
      }
    }
  };

  while (true) {
    if (isShuttingDown) {
      console.log("No longer accepting new jobs. SIGINT");
      break;
    }

    const token = uuidv4();
    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      console.log("Cant accept connection");
      cantAcceptConnectionCount++;

      if (cantAcceptConnectionCount >= 25) {
        logger.error("WORKER STALLED", {
          cpuUsage: await monitor.checkCpuUsage(),
          memoryUsage: await monitor.checkMemoryUsage(),
        });
      }

      await sleep(cantAcceptConnectionInterval); // more sleep
      continue;
    } else {
      cantAcceptConnectionCount = 0;
    }

    const job = await worker.getNextJob(token);
    if (job) {
      // Jobs run in the background, so nothing awaits their promise. Without a
      // handler, a throw from `processJobInternal` or `afterJobDone` would be
      // an unhandled rejection.
      const reportUnhandled = (error: unknown) => {
        logger.error("Unhandled error while processing job", {
          error,
          jobId: job.id,
        });
        Sentry.captureException(error);
      };

      if (job.data && job.data.sentry && Sentry.isInitialized()) {
        // Continue the trace that was started where the job was enqueued.
        Sentry.continueTrace(
          {
            sentryTrace: job.data.sentry.trace,
            baggage: job.data.sentry.baggage,
          },
          () => {
            Sentry.startSpan(
              {
                name: "Scrape job",
                attributes: {
                  job: job.id,
                  worker: process.env.FLY_MACHINE_ID ?? worker.id,
                },
              },
              async (span) => {
                await Sentry.startSpan(
                  {
                    name: "Process scrape job",
                    op: "queue.process",
                    attributes: {
                      "messaging.message.id": job.id,
                      "messaging.destination.name": getScrapeQueue().name,
                      "messaging.message.body.size": job.data.sentry.size,
                      "messaging.message.receive.latency":
                        Date.now() - (job.processedOn ?? job.timestamp),
                      "messaging.message.retry.count": job.attemptsMade,
                    },
                  },
                  async () => {
                    let res;
                    try {
                      res = await processJobInternal(token, job);
                    } finally {
                      await afterJobDone(job);
                    }

                    if (res !== null) {
                      span.setStatus({ code: 2 }); // ERROR
                    } else {
                      span.setStatus({ code: 1 }); // OK
                    }
                  },
                );
              },
            ).catch(reportUnhandled);
          },
        );
      } else {
        Sentry.startSpan(
          {
            name: "Scrape job",
            attributes: {
              job: job.id,
              worker: process.env.FLY_MACHINE_ID ?? worker.id,
            },
          },
          () => {
            processJobInternal(token, job)
              .finally(() => afterJobDone(job))
              .catch(reportUnhandled);
          },
        );
      }

      await sleep(gotJobInterval);
    } else {
      await sleep(connectionMonitorInterval);
    }
  }
};
2024-09-26 20:23:13 +02:00
// Start the worker loop on the scrape queue as soon as this module is loaded.
// The returned promise is neither awaited nor given a rejection handler here.
workerFun ( getScrapeQueue ( ) , processJobInternal ) ;
2024-07-30 13:27:23 -04:00
2024-11-07 20:57:33 +01:00
/**
 * Scrapes the URL of one job and performs all follow-up work.
 *
 * On success: optionally calls the per-page webhook (v1, non-crawl mode), and
 * for jobs that belong to a crawl: handles redirects (re-locking the final
 * URL), logs the job, marks it done, enqueues newly discovered links (crawls
 * only, unless cancelled) and finishes the crawl if this was its last job.
 *
 * On failure: logs the error (timeouts and raced redirects are logged without
 * a Sentry report), calls the v0 webhook where applicable, and — for crawl
 * jobs — marks the job done as failed, logs it and finishes the crawl if needed.
 *
 * This function does not throw for scrape failures; it reports them in the
 * returned object. Errors thrown by the failure bookkeeping itself do propagate.
 *
 * @param job   the job to process
 * @param token lock token, forwarded to the scraper pipeline
 * @returns `{ success: true, document, ... }` or
 *          `{ success: false, document: null, error, ... }`
 */
async function processJob(job: Job & { id: string }, token: string) {
  const logger = _logger.child({
    module: "queue-worker",
    method: "processJob",
    jobId: job.id,
    scrapeId: job.id,
    crawlId: job.data?.crawl_id ?? undefined,
  });
  logger.info(`🐂 Worker taking job ${job.id}`, { url: job.data.url });

  // Check if the job URL is researchhub and block it immediately
  // TODO: remove this once solve the root issue
  // if (
  //   job.data.url &&
  //   (job.data.url.includes("researchhub.com") ||
  //     job.data.url.includes("ebay.com"))
  // ) {
  //   logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
  //   const data = {
  //     success: false,
  //     document: null,
  //     project_id: job.data.project_id,
  //     error:
  //       "URL is blocked. Suspecious activity detected. Please contact help@firecrawl.com if you believe this is an error.",
  //   };
  //   return data;
  // }

  try {
    // Progress reporting stays fire-and-forget, but a failed update is logged
    // instead of becoming an unhandled rejection.
    job
      .updateProgress({
        current: 1,
        total: 100,
        current_step: "SCRAPING",
        current_url: "",
      })
      .catch((progressError) => {
        logger.warn("Failed to update job progress", { error: progressError });
      });
    const start = Date.now();

    // Race the scrape against the optional per-job timeout. The timer is
    // cleared as soon as the race settles so that it does not linger for the
    // rest of the timeout after a fast scrape.
    const contenders = [startWebScraperPipeline({ job, token })];
    const timeoutMs = job.data.scrapeOptions.timeout;
    let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
    if (timeoutMs !== undefined) {
      contenders.push(
        new Promise<never>((_resolve, reject) => {
          timeoutTimer = setTimeout(
            () => reject(new Error("timeout")),
            timeoutMs,
          );
        }),
      );
    }
    const pipeline = await Promise.race(contenders).finally(() => {
      if (timeoutTimer !== undefined) {
        clearTimeout(timeoutTimer);
      }
    });

    if (!pipeline.success) {
      // TODO: let's Not do this
      throw pipeline.error;
    }

    const end = Date.now();
    const timeTakenInSeconds = (end - start) / 1000;

    const doc = pipeline.document;
    const rawHtml = doc.rawHtml ?? "";

    const data = {
      success: true,
      result: {
        links: [
          {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.metadata?.url ?? "",
          },
        ],
      },
      project_id: job.data.project_id,
      document: doc,
    };

    if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
      logger.debug("Calling webhook with success...", {
        webhook: job.data.webhook,
      });
      await callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
        true,
      );
    }

    if (job.data.crawl_id) {
      // NOTE(review): the cast suggests `getCrawl` may return a nullish value
      // (e.g. for an unknown crawl id); the code below assumes it does not —
      // confirm.
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

      if (
        doc.metadata.url !== undefined &&
        doc.metadata.sourceURL !== undefined &&
        normalizeURL(doc.metadata.url, sc) !==
          normalizeURL(doc.metadata.sourceURL, sc)
      ) {
        logger.debug(
          "Was redirected, removing old URL and locking new URL...",
          { oldUrl: doc.metadata.sourceURL, newUrl: doc.metadata.url },
        );
        // Remove the old URL from visited unique due to checking for limit
        // Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL)
        await redisConnection.srem(
          "crawl:" + job.data.crawl_id + ":visited_unique",
          normalizeURL(doc.metadata.sourceURL, sc),
        );

        const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
        const p2 = generateURLPermutations(
          normalizeURL(doc.metadata.sourceURL, sc),
        );

        // In crawls, we should only crawl a redirected page once, no matter how
        // many times it is redirected to, or if it's been discovered by the
        // crawler before. This can prevent flakiness with race conditions.
        // Lock the new URL
        const lockRes = await lockURL(job.data.crawl_id, sc, doc.metadata.url);
        if (
          job.data.crawlerOptions !== null &&
          !lockRes &&
          JSON.stringify(p1) !== JSON.stringify(p2)
        ) {
          throw new RacedRedirectError();
        }
      }

      logger.debug("Logging job to DB...");
      await logJob(
        {
          job_id: job.id as string,
          success: true,
          num_docs: 1,
          docs: [doc],
          time_taken: timeTakenInSeconds,
          team_id: job.data.team_id,
          mode: job.data.mode,
          url: job.data.url,
          crawlerOptions: sc.crawlerOptions,
          scrapeOptions: job.data.scrapeOptions,
          origin: job.data.origin,
          crawl_id: job.data.crawl_id,
        },
        true,
      );

      logger.debug("Declaring job as done...");
      await addCrawlJobDone(job.data.crawl_id, job.id, true);

      // Link discovery only applies to real crawls (batch scrapes have
      // `crawlerOptions === null`) that have not been cancelled.
      if (job.data.crawlerOptions !== null) {
        if (!sc.cancelled) {
          const crawler = crawlToCrawler(
            job.data.crawl_id,
            sc,
            doc.metadata.url ?? doc.metadata.sourceURL ?? sc.originUrl!,
          );

          const links = crawler.filterLinks(
            crawler.extractLinksFromHTML(
              rawHtml,
              doc.metadata?.url ?? doc.metadata?.sourceURL ?? sc.originUrl!,
            ),
            Infinity,
            sc.crawlerOptions?.maxDepth ?? 10,
          );
          logger.debug("Discovered " + links.length + " links...", {
            linksLength: links.length,
          });

          for (const link of links) {
            // Only the job that wins the lock enqueues the URL.
            if (await lockURL(job.data.crawl_id, sc, link)) {
              // This seems to work really well
              const jobPriority = await getJobPriority({
                plan: sc.plan as PlanType,
                team_id: sc.team_id,
                basePriority: job.data.crawl_id ? 20 : 10,
              });
              const jobId = uuidv4();

              logger.debug(
                "Determined job priority " +
                  jobPriority +
                  " for URL " +
                  JSON.stringify(link),
                { jobPriority, url: link },
              );

              // console.log("plan: ", sc.plan);
              // console.log("team_id: ", sc.team_id)
              // console.log("base priority: ", job.data.crawl_id ? 20 : 10)
              // console.log("job priority: " , jobPriority, "\n\n\n")

              await addScrapeJob(
                {
                  url: link,
                  mode: "single_urls",
                  team_id: sc.team_id,
                  scrapeOptions: scrapeOptions.parse(sc.scrapeOptions),
                  internalOptions: sc.internalOptions,
                  plan: job.data.plan,
                  origin: job.data.origin,
                  crawl_id: job.data.crawl_id,
                  webhook: job.data.webhook,
                  v1: job.data.v1,
                },
                {},
                jobId,
                jobPriority,
              );

              await addCrawlJob(job.data.crawl_id, jobId);
              logger.debug("Added job for URL " + JSON.stringify(link), {
                jobPriority,
                url: link,
                newJobId: jobId,
              });
            } else {
              logger.debug("Could not lock URL " + JSON.stringify(link), {
                url: link,
              });
            }
          }
        }
      }

      await finishCrawlIfNeeded(job, sc);
    }

    logger.info(`🐂 Job done ${job.id}`);
    return data;
  } catch (error) {
    const isEarlyTimeout =
      error instanceof Error && error.message === "timeout";

    if (isEarlyTimeout) {
      logger.error(`🐂 Job timed out ${job.id}`);
    } else if (error instanceof RacedRedirectError) {
      logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`);
    } else {
      logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });

      Sentry.captureException(error, {
        data: {
          job: job.id,
        },
      });

      if (error instanceof CustomError) {
        // Here we handle the error, then save the failed job
        logger.error(error.message); // or any other error handling
      }
      logger.error(error);
      // Guarded: the thrown value is not necessarily an Error (it may even be
      // null/undefined), and dereferencing it blindly would throw from inside
      // this handler and hide the original failure.
      if (error instanceof Error && error.stack) {
        logger.error(error.stack);
      }
    }

    const data = {
      success: false,
      document: null,
      project_id: job.data.project_id,
      error:
        error instanceof Error
          ? error
          : typeof error === "string"
            ? new Error(error)
            : new Error(JSON.stringify(error)),
    };

    if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id ?? (job.id as string),
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
      );
    }
    // if (job.data.v1) {
    //   callWebhook(
    //     job.data.team_id,
    //     job.id as string,
    //     [],
    //     job.data.webhook,
    //     job.data.v1,
    //     "crawl.failed"
    //   );
    // }

    if (job.data.crawl_id) {
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

      logger.debug("Declaring job as done...");
      await addCrawlJobDone(job.data.crawl_id, job.id, false);

      // Same null-safety concern as above: read `.message` without assuming
      // the thrown value is an object.
      const failureMessage =
        typeof error === "string"
          ? error
          : ((error as { message?: string } | null | undefined)?.message ??
            "Something went wrong... Contact help@mendable.ai");

      logger.debug("Logging job to DB...");
      await logJob(
        {
          job_id: job.id as string,
          success: false,
          message: failureMessage,
          num_docs: 0,
          docs: [],
          time_taken: 0,
          team_id: job.data.team_id,
          mode: job.data.mode,
          url: job.data.url,
          crawlerOptions: sc.crawlerOptions,
          scrapeOptions: job.data.scrapeOptions,
          origin: job.data.origin,
          crawl_id: job.data.crawl_id,
        },
        true,
      );

      await finishCrawlIfNeeded(job, sc);

      // await logJob({
      //   job_id: job.data.crawl_id,
      //   success: false,
      //   message:
      //     typeof error === "string"
      //       ? error
      //       : error.message ??
      //         "Something went wrong... Contact help@mendable.ai",
      //   num_docs: 0,
      //   docs: [],
      //   time_taken: 0,
      //   team_id: job.data.team_id,
      //   mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
      //   url: sc ? sc.originUrl ?? job.data.url : job.data.url,
      //   crawlerOptions: sc ? sc.crawlerOptions : undefined,
      //   scrapeOptions: sc ? sc.scrapeOptions : job.data.scrapeOptions,
      //   origin: job.data.origin,
      // });
    }
    // done(null, data);
    return data;
  }
}
2024-07-30 13:27:23 -04:00
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));