fix(queue-worker): graceful shutdown
This commit is contained in:
@@ -84,6 +84,8 @@ const connectionMonitorInterval =
|
|||||||
Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
|
Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
|
||||||
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
|
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
|
||||||
|
|
||||||
|
const runningJobs: Set<string> = new Set();
|
||||||
|
|
||||||
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
|
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
|
||||||
if (await finishCrawl(job.data.crawl_id)) {
|
if (await finishCrawl(job.data.crawl_id)) {
|
||||||
(async () => {
|
(async () => {
|
||||||
@@ -425,7 +427,15 @@ const workerFun = async (
|
|||||||
|
|
||||||
const job = await worker.getNextJob(token);
|
const job = await worker.getNextJob(token);
|
||||||
if (job) {
|
if (job) {
|
||||||
|
if (job.id) {
|
||||||
|
runningJobs.add(job.id);
|
||||||
|
}
|
||||||
|
|
||||||
async function afterJobDone(job: Job<any, any, string>) {
|
async function afterJobDone(job: Job<any, any, string>) {
|
||||||
|
if (job.id) {
|
||||||
|
runningJobs.delete(job.id);
|
||||||
|
}
|
||||||
|
|
||||||
if (job.id && job.data && job.data.team_id && job.data.plan) {
|
if (job.id && job.data && job.data.team_id && job.data.plan) {
|
||||||
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
|
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
|
||||||
cleanOldConcurrencyLimitEntries(job.data.team_id);
|
cleanOldConcurrencyLimitEntries(job.data.team_id);
|
||||||
@@ -523,10 +533,6 @@ const workerFun = async (
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start both workers
|
|
||||||
workerFun(getScrapeQueue(), processJobInternal);
|
|
||||||
workerFun(getExtractQueue(), processExtractJobInternal);
|
|
||||||
|
|
||||||
async function processKickoffJob(job: Job & { id: string }, token: string) {
|
async function processKickoffJob(job: Job & { id: string }, token: string) {
|
||||||
const logger = _logger.child({
|
const logger = _logger.child({
|
||||||
module: "queue-worker",
|
module: "queue-worker",
|
||||||
@@ -1065,3 +1071,19 @@ async function processJob(job: Job & { id: string }, token: string) {
|
|||||||
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
|
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
|
||||||
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
|
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
|
||||||
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
|
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
|
||||||
|
|
||||||
|
// Start both workers
|
||||||
|
(async () => {
|
||||||
|
await Promise.all([
|
||||||
|
workerFun(getScrapeQueue(), processJobInternal),
|
||||||
|
workerFun(getExtractQueue(), processExtractJobInternal),
|
||||||
|
]);
|
||||||
|
|
||||||
|
console.log("All workers exited. Waiting for all jobs to finish...");
|
||||||
|
|
||||||
|
while (runningJobs.size > 0) {
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
|
}
|
||||||
|
|
||||||
|
process.exit(0);
|
||||||
|
})();
|
||||||
Reference in New Issue
Block a user