feat: stuck job restoration iteration 2

This commit is contained in:
Gergo Moricz
2024-07-11 20:08:21 +02:00
parent 86d0e88a91
commit bffb9f8fd0
2 changed files with 33 additions and 91 deletions
-89
View File
@@ -9,8 +9,6 @@ import { v0Router } from "./routes/v0";
import cluster from "cluster";
import os from "os";
import { Job } from "bull";
import { supabase_service } from "./services/supabase";
import { logJob } from "./services/logging/log_job";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -22,39 +20,6 @@ console.log(`Number of CPUs: ${numCPUs} available`);
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
// (async () => {
// if (process.env.USE_DB_AUTHENTICATION) {
// const wsq = getWebScraperQueue();
// const { error, data } = await supabase_service
// .from("firecrawl_jobs")
// .select()
// .eq("retry", true);
// if (error) throw new Error(error.message);
// await wsq.addBulk(data.map(x => ({
// data: {
// url: x.url,
// mode: x.mode,
// crawlerOptions: x.crawler_options,
// team_id: x.team_id,
// pageOptions: x.page_options,
// origin: x.origin,
// },
// opts: {
// jobId: x.job_id,
// }
// })))
// if (data.length > 0) {
// await supabase_service
// .from("firecrawl_jobs")
// .delete()
// .in("id", data.map(x => x.id));
// }
// }
// })();
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
@@ -271,57 +236,3 @@ if (cluster.isMaster) {
console.log(`Worker ${process.pid} started`);
}
const onExit = async () => {
console.log("Shutting down gracefully...");
if (cluster.workers) {
for (const worker of Object.keys(cluster.workers || {})) {
cluster.workers[worker].process.kill();
}
}
if (process.env.USE_DB_AUTHENTICATION) {
const wsq = getWebScraperQueue();
const activeJobCount = await wsq.getActiveCount();
console.log("Updating", activeJobCount, "in-progress jobs");
const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
return wsq.getActive(i, i+10)
}))).flat(1);
for (const job of activeJobs) {
console.log(job.id);
try {
await logJob({
job_id: job.id as string,
success: false,
message: "Interrupted, retrying",
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: "crawl",
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
pageOptions: job.data.pageOptions,
origin: job.data.origin,
retry: true,
});
await wsq.client.del(await job.lockKey());
await job.takeLock();
await job.moveToFailed({ message: "interrupted" });
await job.remove();
} catch (error) {
console.error("Failed to update job status:", error);
}
}
}
console.log("Bye!");
process.exit();
};
process.on("SIGINT", onExit);
process.on("SIGTERM", onExit);