import { Job, Queue } from "bullmq"; import { getScrapeQueue } from "./queue-service"; import { v4 as uuidv4 } from "uuid"; import { WebScraperOptions } from "../types"; import * as Sentry from "@sentry/node"; async function addScrapeJobRaw( webScraperOptions: any, options: any, jobId: string, jobPriority: number = 10 ): Promise { return await getScrapeQueue().add(jobId, webScraperOptions, { ...options, priority: jobPriority, jobId, }); } export async function addScrapeJob( webScraperOptions: WebScraperOptions, options: any = {}, jobId: string = uuidv4(), jobPriority: number = 10 ): Promise { if (Sentry.isInitialized()) { const size = JSON.stringify(webScraperOptions).length; return await Sentry.startSpan({ name: "Add scrape job", op: "queue.publish", attributes: { "messaging.message.id": jobId, "messaging.destination.name": getScrapeQueue().name, "messaging.message.body.size": size, }, }, async (span) => { return await addScrapeJobRaw({ ...webScraperOptions, sentry: { trace: Sentry.spanToTraceHeader(span), baggage: Sentry.spanToBaggageHeader(span), size, }, }, options, jobId, jobPriority); }); } else { return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority); } } export function waitForJob(jobId: string, timeout: number) { return new Promise((resolve, reject) => { const start = Date.now(); const int = setInterval(async () => { if (Date.now() >= start + timeout) { clearInterval(int); reject(new Error("Job wait ")); } else { const state = await getScrapeQueue().getJobState(jobId); if (state === "completed") { clearInterval(int); resolve((await getScrapeQueue().getJob(jobId)).returnvalue); } else if (state === "failed") { // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason); clearInterval(int); reject((await getScrapeQueue().getJob(jobId)).failedReason); } } }, 1000); }) }