Files
firecrawl/apps/api/src/services/queue-worker.ts
T

606 lines
19 KiB
TypeScript
Raw Normal View History

2024-08-20 19:25:19 +02:00
import "dotenv/config";
2024-09-01 14:19:43 -03:00
import "./sentry";
2024-08-21 17:58:27 +02:00
import * as Sentry from "@sentry/node";
2024-04-15 17:01:47 -04:00
import { CustomError } from "../lib/custom-error";
2024-07-30 14:44:13 -04:00
import {
getScrapeQueue,
redisConnection,
scrapeQueueName,
} from "./queue-service";
2024-04-15 17:01:47 -04:00
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
2024-04-20 13:53:11 -07:00
import { logJob } from "./logging/log_job";
import { Job, Queue } from "bullmq";
2024-11-07 20:57:33 +01:00
import { logger } from "../lib/logger";
2024-07-30 13:27:23 -04:00
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
2024-09-01 14:19:43 -03:00
import {
addCrawlJob,
addCrawlJobDone,
crawlToCrawler,
finishCrawl,
getCrawl,
getCrawlJobs,
lockURL,
2024-11-08 16:22:06 +01:00
normalizeURL,
2024-09-01 14:19:43 -03:00
} from "../lib/crawl-redis";
2024-08-13 20:51:43 +02:00
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
2024-09-01 14:19:43 -03:00
import {
addJobPriority,
deleteJobPriority,
getJobPriority,
} from "../../src/lib/job-priority";
import { PlanType, RateLimiterMode } from "../types";
2024-11-07 20:57:33 +01:00
import { getJobs } from "..//controllers/v1/crawl-status";
2024-09-04 15:57:57 -03:00
import { configDotenv } from "dotenv";
2024-11-07 20:57:33 +01:00
import { scrapeOptions } from "../controllers/v1/types";
import { getRateLimiterPoints } from "./rate-limiter";
2024-10-25 20:21:12 +02:00
import { cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, takeConcurrencyLimitedJob } from "../lib/concurrency-limit";
2024-09-04 15:57:57 -03:00
configDotenv();
2024-05-20 13:36:34 -07:00
2024-07-30 13:27:23 -04:00
/**
 * Returns a promise that resolves (with no value) once `ms` milliseconds
 * have elapsed.
 */
const sleep = (ms: number) =>
  new Promise((done) => {
    setTimeout(done, ms);
  });
2024-04-15 17:01:47 -04:00
2024-07-30 13:27:23 -04:00
// Worker tunables, in milliseconds, each overridable through an environment
// variable. `Number(x) || fallback` means that an unset, non-numeric or `0`
// value falls back to the default.
const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval =
  Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
// How often a running job's lock is extended, and by how much each time.
const jobLockExtendInterval =
  Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
const jobLockExtensionTime =
  Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;

// Pauses used by the polling loop: when the system monitor refuses new work,
// when the queue had no job, and right after a job was picked up.
const cantAcceptConnectionInterval =
  Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
const connectionMonitorInterval =
  Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
// This value used to be read from CONNECTION_MONITOR_INTERVAL (copy-paste of
// the line above). GOT_JOB_INTERVAL now takes precedence; the old variable is
// still honoured so existing deployments keep their behaviour.
const gotJobInterval =
  Number(process.env.GOT_JOB_INTERVAL) ||
  Number(process.env.CONNECTION_MONITOR_INTERVAL) ||
  20;
2024-07-11 20:08:21 +02:00
2024-11-07 20:57:33 +01:00
/**
 * Runs a single job end to end: keeps its lock extended while it is being
 * processed, delegates the work to `processJob`, and then moves the job to
 * the completed or failed state.
 *
 * @param token Lock token the job was fetched with.
 * @param job   The job to process.
 * @returns `null` when no exception was thrown (even if the job was moved to
 *          failed because `processJob` reported `success: false`), otherwise
 *          the error that was caught.
 */
const processJobInternal = async (token: string, job: Job & { id: string }) => {
  const extendLockInterval = setInterval(async () => {
    logger.info(`🐂 Worker extending lock on job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (error) {
      // A rejection inside a timer callback would otherwise be unhandled.
      logger.error(
        `🐂 Worker failed to extend lock on job ${job.id} - ${error}`
      );
    }
  }, jobLockExtendInterval);

  let err = null;
  try {
    // Inside the try so that a failure here still reaches `finally` and
    // the lock-extension timer is not leaked.
    await addJobPriority(job.data.team_id, job.id);

    const result = await processJob(job, token);
    if (result.success) {
      try {
        // Crawl jobs under DB authentication complete without a return value.
        if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
          await job.moveToCompleted(null, token, false);
        } else {
          await job.moveToCompleted(result.document, token, false);
        }
      } catch (e) {
        // Best effort: a failure to mark completion must not fail the job,
        // but it should not vanish without a trace either.
        logger.debug(
          `🐂 Could not move job ${job.id} to completed - ${e}`
        );
      }
    } else {
      await job.moveToFailed((result as any).error, token, false);
    }
  } catch (error) {
    console.log("Job failed, error:", error);
    Sentry.captureException(error);
    err = error;
    await job.moveToFailed(error, token, false);
  } finally {
    // Stop the timer first: it is synchronous and must run even if the
    // priority cleanup below throws.
    clearInterval(extendLockInterval);
    await deleteJobPriority(job.data.team_id, job.id);
  }

  return err;
};
// Flipped to true by the signal handlers below; the worker loop checks it
// before picking up another job.
let isShuttingDown = false;

// Both signals request the same shutdown; log the signal that was actually
// received (the SIGINT handler used to report "SIGTERM").
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, () => {
    console.log(`Received ${signal}. Shutting down gracefully...`);
    isShuttingDown = true;
  });
}
2024-11-08 20:19:44 +01:00
// Consecutive loop iterations in which the system monitor refused new work.
let cantAcceptConnectionCount = 0;

/**
 * Polling loop of the worker process. Until a shutdown signal has been
 * observed it repeatedly asks the system monitor whether more work may be
 * accepted, fetches the next job from the queue and starts processing it
 * without waiting for it to finish, so several jobs can be in flight at once.
 *
 * @param queue              Queue to pull jobs from; follow-up jobs released
 *                           by the concurrency limiter are added to it too.
 * @param processJobInternal Callback that processes one job. It resolves to
 *                           `null` on success or to the caught error.
 */
const workerFun = async (
  queue: Queue,
  processJobInternal: (token: string, job: Job) => Promise<any>
) => {
  // No processor function is given (null): jobs are fetched manually with
  // `getNextJob` below. The timings come from the module-level tunables,
  // whose defaults equal the previously hard-coded 1 minute / 30 seconds.
  const worker = new Worker(queue.name, null, {
    connection: redisConnection,
    lockDuration: workerLockDuration,
    stalledInterval: workerStalledCheckInterval,
    maxStalledCount: 10, // 10 times
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  // Releases the finished job's concurrency slot and, if the team has a job
  // waiting behind the concurrency limit, promotes it into the queue.
  const afterJobDone = async (job: Job<any, any, string>) => {
    if (job.id && job.data && job.data.team_id && job.data.plan) {
      await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
      // NOTE(review): not awaited in the original either; confirm whether
      // this returns a promise whose rejection should be handled.
      cleanOldConcurrencyLimitEntries(job.data.team_id);

      // Queue up next job, if it exists
      // No need to check if we're under the limit here -- if the current job is finished,
      // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
      const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
      if (nextJob !== null) {
        await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);

        await queue.add(
          nextJob.id,
          {
            ...nextJob.data,
            concurrencyLimitHit: true,
          },
          {
            ...nextJob.opts,
            jobId: nextJob.id,
            priority: nextJob.priority,
          }
        );
      }
    }
  };

  while (true) {
    if (isShuttingDown) {
      console.log("No longer accepting new jobs. SIGINT");
      break;
    }

    const token = uuidv4();
    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      console.log("Cant accept connection");
      cantAcceptConnectionCount++;

      if (cantAcceptConnectionCount >= 25) {
        logger.error("WORKER STALLED", {
          cpuUsage: await monitor.checkCpuUsage(),
          memoryUsage: await monitor.checkMemoryUsage(),
        });
      }

      await sleep(cantAcceptConnectionInterval); // more sleep
      continue;
    }
    cantAcceptConnectionCount = 0;

    const job = await worker.getNextJob(token);
    if (!job) {
      await sleep(connectionMonitorInterval);
      continue;
    }

    // Processes the job and always runs the concurrency bookkeeping after.
    const runJob = async () => {
      try {
        return await processJobInternal(token, job);
      } finally {
        await afterJobDone(job);
      }
    };

    // Jobs run fire-and-forget; without a handler, a rejection from the
    // bookkeeping or from moving the job to failed would be unhandled.
    const reportCrash = (error: unknown) => {
      logger.error(`🐂 Unhandled error while running job ${job.id} - ${error}`);
      Sentry.captureException(error);
    };

    const scrapeJobSpan = {
      name: "Scrape job",
      attributes: {
        job: job.id,
        worker: process.env.FLY_MACHINE_ID ?? worker.id,
      },
    };

    if (job.data && job.data.sentry && Sentry.isInitialized()) {
      // Continue the trace that was started by whoever enqueued the job.
      Sentry.continueTrace(
        {
          sentryTrace: job.data.sentry.trace,
          baggage: job.data.sentry.baggage,
        },
        () => {
          Promise.resolve(
            Sentry.startSpan(scrapeJobSpan, async (span) => {
              await Sentry.startSpan(
                {
                  name: "Process scrape job",
                  op: "queue.process",
                  attributes: {
                    "messaging.message.id": job.id,
                    "messaging.destination.name": getScrapeQueue().name,
                    "messaging.message.body.size": job.data.sentry.size,
                    "messaging.message.receive.latency":
                      Date.now() - (job.processedOn ?? job.timestamp),
                    "messaging.message.retry.count": job.attemptsMade,
                  },
                },
                async () => {
                  const res = await runJob();
                  if (res !== null) {
                    span.setStatus({ code: 2 }); // ERROR
                  } else {
                    span.setStatus({ code: 1 }); // OK
                  }
                }
              );
            })
          ).catch(reportCrash);
        }
      );
    } else {
      Sentry.startSpan(scrapeJobSpan, () => {
        // Deliberately not returned: the callback does not wait for the job.
        runJob().catch(reportCrash);
      });
    }

    await sleep(gotJobInterval);
  }
};

// Start polling the scrape queue as soon as the module is loaded.
workerFun(getScrapeQueue(), processJobInternal);
2024-07-30 13:27:23 -04:00
2024-11-07 20:57:33 +01:00
// Jobs whose URL contains one of these substrings are rejected before any
// scraping happens.
// TODO: remove this once solve the root issue
const blockedUrlFragments = [
  "researchhub.com",
  "ebay.com",
  "youtube.com",
  "microsoft.com",
];

/**
 * Extracts the links of a crawled page and queues a scrape job for every
 * link that could be locked for this crawl.
 */
async function enqueueDiscoveredLinks(
  job: Job & { id: string },
  sc: StoredCrawl,
  rawHtml: string,
  pageUrl: string | undefined
) {
  const crawler = crawlToCrawler(job.data.crawl_id, sc, pageUrl);

  const links = crawler.filterLinks(
    crawler.extractLinksFromHTML(rawHtml, pageUrl as string),
    Infinity,
    sc.crawlerOptions?.maxDepth ?? 10
  );

  for (const link of links) {
    // A link that cannot be locked is skipped.
    if (!(await lockURL(job.data.crawl_id, sc, link))) {
      continue;
    }

    const jobPriority = await getJobPriority({
      plan: sc.plan as PlanType,
      team_id: sc.team_id,
      basePriority: job.data.crawl_id ? 20 : 10,
    });
    const jobId = uuidv4();

    await addScrapeJob(
      {
        url: link,
        mode: "single_urls",
        team_id: sc.team_id,
        scrapeOptions: scrapeOptions.parse(sc.scrapeOptions),
        internalOptions: sc.internalOptions,
        plan: job.data.plan,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
        webhook: job.data.webhook,
        v1: job.data.v1,
      },
      {},
      jobId,
      jobPriority
    );

    await addCrawlJob(job.data.crawl_id, jobId);
  }
}

/**
 * Logs the crawl (or batch scrape) as a whole and fires the "completed"
 * webhook. Called once `finishCrawl` reports that the crawl is finished.
 */
async function reportCrawlFinished(job: Job & { id: string }, sc: StoredCrawl) {
  const isCrawl = job.data.crawlerOptions !== null;
  const mode = isCrawl ? "crawl" : "batch_scrape";
  const completedEvent = isCrawl ? "crawl.completed" : "batch_scrape.completed";

  if (!job.data.v1) {
    const jobIDs = await getCrawlJobs(job.data.crawl_id);

    const jobs = (await getJobs(jobIDs)).sort(
      (a, b) => a.timestamp - b.timestamp
    );
    const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
    const jobStatus =
      sc.cancelled || jobStatuses.some((x) => x === "failed")
        ? "failed"
        : "completed";

    const fullDocs = jobs.map((x) =>
      Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue
    );

    await logJob({
      job_id: job.data.crawl_id,
      success: jobStatus === "completed",
      message: sc.cancelled ? "Cancelled" : undefined,
      num_docs: fullDocs.length,
      docs: [],
      time_taken: (Date.now() - sc.createdAt) / 1000,
      team_id: job.data.team_id,
      mode,
      url: sc.originUrl!,
      scrapeOptions: sc.scrapeOptions,
      crawlerOptions: sc.crawlerOptions,
      origin: job.data.origin,
    });

    const data = {
      success: jobStatus !== "failed",
      result: {
        links: fullDocs.map((doc) => {
          return {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
          };
        }),
      },
      project_id: job.data.project_id,
      docs: fullDocs,
    };

    // v0 web hooks, call when done with all the data
    callWebhook(
      job.data.team_id,
      job.data.crawl_id,
      data,
      job.data.webhook,
      job.data.v1,
      completedEvent
    );
  } else {
    const jobIDs = await getCrawlJobs(job.data.crawl_id);
    const jobStatus = sc.cancelled ? "failed" : "completed";

    // v1 web hooks, call when done with no data, but with event completed
    if (job.data.webhook) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        [],
        job.data.webhook,
        job.data.v1,
        completedEvent
      );
    }

    await logJob({
      job_id: job.data.crawl_id,
      success: jobStatus === "completed",
      message: sc.cancelled ? "Cancelled" : undefined,
      num_docs: jobIDs.length,
      docs: [],
      time_taken: (Date.now() - sc.createdAt) / 1000,
      team_id: job.data.team_id,
      scrapeOptions: sc.scrapeOptions,
      mode,
      url:
        sc?.originUrl ??
        (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"),
      crawlerOptions: sc.crawlerOptions,
      origin: job.data.origin,
    });
  }
}

/**
 * Processes one scrape job: runs the scraper pipeline (racing it against the
 * optional `scrapeOptions.timeout`), fires the per-page webhook, and, for jobs
 * that belong to a crawl, logs the page, queues newly discovered links and
 * reports the crawl as finished when `finishCrawl` says so.
 *
 * Failures of the scrape itself are not thrown: they are logged and returned
 * as `{ success: false, error }`.
 *
 * @param job   The job to process.
 * @param token Lock token of the job, forwarded to the pipeline.
 * @returns An object with `success`, `document` and `project_id`, plus
 *          `result` on success or `error` on failure.
 */
async function processJob(job: Job & { id: string }, token: string) {
  logger.info(`🐂 Worker taking job ${job.id}`);

  // Check if the job URL is on the block list and reject it immediately
  if (
    job.data.url &&
    blockedUrlFragments.some((fragment) => job.data.url.includes(fragment))
  ) {
    logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
    const data = {
      success: false,
      document: null,
      project_id: job.data.project_id,
      error:
        "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
    };
    return data;
  }

  try {
    job.updateProgress({
      current: 1,
      total: 100,
      current_step: "SCRAPING",
      current_url: "",
    });

    const start = Date.now();

    // The timer is cleared once the race settles so that a finished job does
    // not leave a pending timeout (and its closure) behind.
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
    const pipeline = await Promise.race([
      startWebScraperPipeline({
        job,
        token,
      }),
      ...(job.data.scrapeOptions.timeout !== undefined
        ? [
            new Promise<never>((_resolve, reject) => {
              timeoutHandle = setTimeout(
                () => reject(new Error("timeout")),
                job.data.scrapeOptions.timeout
              );
            }),
          ]
        : []),
    ]).finally(() => clearTimeout(timeoutHandle));

    if (!pipeline.success) {
      // TODO: let's Not do this
      throw pipeline.error;
    }

    const end = Date.now();
    const timeTakenInSeconds = (end - start) / 1000;

    const doc = pipeline.document;
    const rawHtml = doc.rawHtml ?? "";

    const data = {
      success: true,
      result: {
        links: [
          {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.metadata?.url ?? "",
          },
        ],
      },
      project_id: job.data.project_id,
      document: doc,
    };

    // Per-page webhook, v1 only.
    if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
      await callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
        true
      );
    }

    if (job.data.crawl_id) {
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

      // If the page redirected, lock the final URL as well.
      if (
        doc.metadata.url !== undefined &&
        doc.metadata.sourceURL !== undefined &&
        normalizeURL(doc.metadata.url, sc) !==
          normalizeURL(doc.metadata.sourceURL, sc)
      ) {
        logger.debug("Was redirected, locking new URL...");
        await lockURL(job.data.crawl_id, sc, doc.metadata.url);
      }

      await logJob({
        job_id: job.id as string,
        success: true,
        num_docs: 1,
        docs: [doc],
        time_taken: timeTakenInSeconds,
        team_id: job.data.team_id,
        mode: job.data.mode,
        url: job.data.url,
        crawlerOptions: sc.crawlerOptions,
        scrapeOptions: job.data.scrapeOptions,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
      });

      await addCrawlJobDone(job.data.crawl_id, job.id);

      // Link discovery is skipped for sitemapped jobs, for batch scrapes
      // (crawlerOptions === null) and for cancelled crawls.
      if (
        !job.data.sitemapped &&
        job.data.crawlerOptions !== null &&
        !sc.cancelled
      ) {
        await enqueueDiscoveredLinks(
          job,
          sc,
          rawHtml,
          doc.metadata.url ?? doc.metadata.sourceURL ?? sc.originUrl
        );
      }

      if (await finishCrawl(job.data.crawl_id)) {
        await reportCrawlFinished(job, sc);
      }
    }

    logger.info(`🐂 Job done ${job.id}`);
    return data;
  } catch (error) {
    const isEarlyTimeout =
      error instanceof Error && error.message === "timeout";

    // The thrown value is not guaranteed to be an Error (`throw
    // pipeline.error` may even throw undefined), so read its fields safely.
    const errorFields = error as
      | { message?: string; stack?: string }
      | null
      | undefined;

    if (!isEarlyTimeout) {
      logger.error(`🐂 Job errored ${job.id} - ${error}`);

      Sentry.captureException(error, {
        data: {
          job: job.id,
        },
      });

      if (error instanceof CustomError) {
        // Here we handle the error, then save the failed job
        logger.error(error.message); // or any other error handling
      }
      logger.error(error);
      if (errorFields?.stack) {
        logger.error(errorFields.stack);
      }
    } else {
      logger.error(`🐂 Job timed out ${job.id}`);
    }

    const data = {
      success: false,
      document: null,
      project_id: job.data.project_id,
      error:
        error instanceof Error
          ? error
          : typeof error === "string"
            ? new Error(error)
            : new Error(JSON.stringify(error)),
    };

    // Only v0 jobs get a per-page failure webhook from here.
    if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id ?? (job.id as string),
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page"
      );
    }

    if (job.data.crawl_id) {
      // The stored crawl may be missing; do not let that turn the failure
      // report itself into an exception.
      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl | null;

      await logJob({
        job_id: job.id as string,
        success: false,
        message:
          typeof error === "string"
            ? error
            : errorFields?.message ??
              "Something went wrong... Contact help@mendable.ai",
        num_docs: 0,
        docs: [],
        time_taken: 0,
        team_id: job.data.team_id,
        mode: job.data.mode,
        url: job.data.url,
        crawlerOptions: sc?.crawlerOptions,
        scrapeOptions: job.data.scrapeOptions,
        origin: job.data.origin,
        crawl_id: job.data.crawl_id,
      });
    }

    return data;
  }
}
2024-07-30 13:27:23 -04:00
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));