Files
firecrawl/apps/api/src/services/queue-service.ts
T

96 lines
2.4 KiB
TypeScript
Raw Normal View History

2024-07-30 13:27:23 -04:00
import { Queue } from "bullmq";
2024-11-07 20:57:33 +01:00
import { logger } from "../lib/logger";
2024-07-30 13:27:23 -04:00
import IORedis from "ioredis";
2024-04-15 17:01:47 -04:00
2024-07-30 14:44:13 -04:00
let scrapeQueue: Queue;
2025-01-03 20:44:27 -03:00
let extractQueue: Queue;
let loggingQueue: Queue;
2025-01-19 13:09:29 -03:00
let indexQueue: Queue;
let deepResearchQueue: Queue;
2024-07-30 14:44:13 -04:00
2024-11-07 20:57:33 +01:00
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
2024-12-11 19:51:08 -03:00
maxRetriesPerRequest: null,
2024-07-30 13:27:23 -04:00
});
2024-04-15 17:01:47 -04:00
2024-07-30 14:44:13 -04:00
export const scrapeQueueName = "{scrapeQueue}";
2025-01-03 20:44:27 -03:00
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
2025-01-19 13:09:29 -03:00
export const indexQueueName = "{indexQueue}";
export const deepResearchQueueName = "{deepResearchQueue}";
2024-07-30 13:27:23 -04:00
2024-07-30 14:44:13 -04:00
export function getScrapeQueue() {
if (!scrapeQueue) {
2025-01-10 18:35:10 -03:00
scrapeQueue = new Queue(scrapeQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
2025-01-23 20:30:20 +01:00
age: 3600, // 1 hour
2024-12-11 19:51:08 -03:00
},
2025-01-10 18:35:10 -03:00
removeOnFail: {
2025-01-23 20:30:20 +01:00
age: 3600, // 1 hour
2025-01-10 18:35:10 -03:00
},
},
});
2024-11-07 20:57:33 +01:00
logger.info("Web scraper queue created");
2024-07-30 14:44:13 -04:00
}
return scrapeQueue;
}
2025-01-03 20:44:27 -03:00
export function getExtractQueue() {
if (!extractQueue) {
2025-01-10 18:35:10 -03:00
extractQueue = new Queue(extractQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
2025-01-03 20:44:27 -03:00
},
2025-01-10 18:35:10 -03:00
},
});
2025-01-03 20:44:27 -03:00
logger.info("Extraction queue created");
}
return extractQueue;
}
2025-01-19 13:09:29 -03:00
export function getIndexQueue() {
if (!indexQueue) {
indexQueue = new Queue(indexQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Index queue created");
}
return indexQueue;
}
export function getDeepResearchQueue() {
if (!deepResearchQueue) {
deepResearchQueue = new Queue(deepResearchQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Deep research queue created");
}
return deepResearchQueue;
}
2024-08-22 22:38:39 +02:00
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
2024-12-11 19:46:11 -03:00
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });