This commit is contained in:
Nicolas
2024-07-16 22:41:13 -04:00
parent 92202de12b
commit c9073a747c
5 changed files with 127 additions and 61 deletions
+110 -37
View File
@@ -3,7 +3,6 @@ import bodyParser from "body-parser";
import cors from "cors";
import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service";
import { redisClient } from "./services/rate-limiter";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
@@ -11,6 +10,8 @@ import os from "os";
import { Job } from "bull";
import { sendSlackWebhook } from "./services/alerts/slack";
import { checkAlerts } from "./services/alerts";
import Redis from "ioredis";
import { redisRateLimitClient } from "./services/rate-limiter";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -34,11 +35,9 @@ if (cluster.isMaster) {
cluster.fork();
}
});
} else {
const app = express();
global.isProduction = process.env.IS_PRODUCTION === "true";
app.use(bodyParser.urlencoded({ extended: true }));
@@ -46,6 +45,8 @@ if (cluster.isMaster) {
app.use(cors()); // Add this line to enable CORS
const queueRedis = new Redis(process.env.REDIS_URL);
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
@@ -73,7 +74,6 @@ if (cluster.isMaster) {
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
redisClient.connect();
// HyperDX OpenTelemetry
if (process.env.ENV === "production") {
@@ -121,7 +121,6 @@ if (cluster.isMaster) {
});
app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
// return res.status(200).json({ ok: true });
try {
console.log("Gracefully shutting down...");
@@ -138,34 +137,38 @@ if (cluster.isMaster) {
const wsq = getWebScraperQueue();
const jobs = await wsq.getActive();
console.log("Requeueing", jobs.length, "jobs...");
if (jobs.length > 0) {
console.log(" Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => {
try {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
} catch (e) {
console.warn("Failed to remove job", x.id, e);
}
}));
await Promise.all(
jobs.map(async (x) => {
try {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
} catch (e) {
console.warn("Failed to remove job", x.id, e);
}
})
);
console.log(" Re-adding", jobs.length, "jobs...");
await wsq.addBulk(jobs.map(x => ({
data: x.data,
opts: {
jobId: x.id,
},
})));
await wsq.addBulk(
jobs.map((x) => ({
data: x.data,
opts: {
jobId: x.id,
},
}))
);
console.log(" Done!");
}
await getWebScraperQueue().resume(false);
res.json({ ok: true });
} catch (error) {
@@ -268,27 +271,32 @@ if (cluster.isMaster) {
const numberOfBatches = 9; // Adjust based on your needs
const completedJobsPromises: Promise<Job[]>[] = [];
for (let i = 0; i < numberOfBatches; i++) {
completedJobsPromises.push(webScraperQueue.getJobs(
["completed"],
i * batchSize,
i * batchSize + batchSize,
true
));
completedJobsPromises.push(
webScraperQueue.getJobs(
["completed"],
i * batchSize,
i * batchSize + batchSize,
true
)
);
}
const completedJobs: Job[] = (await Promise.all(completedJobsPromises)).flat();
const before24hJobs = completedJobs.filter(
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
) || [];
const completedJobs: Job[] = (
await Promise.all(completedJobsPromises)
).flat();
const before24hJobs =
completedJobs.filter(
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
) || [];
let count = 0;
if (!before24hJobs) {
return res.status(200).send(`No jobs to remove.`);
}
for (const job of before24hJobs) {
try {
await job.remove()
await job.remove();
count++;
} catch (jobError) {
console.error(`Failed to remove job with ID ${job.id}:`, jobError);
@@ -306,8 +314,73 @@ if (cluster.isMaster) {
res.send({ isProduction: global.isProduction });
});
app.get(
`/admin/${process.env.BULL_AUTH_KEY}/redis-health`,
async (req, res) => {
try {
const testKey = "test";
const testValue = "test";
// Test queueRedis
let queueRedisHealth;
try {
await queueRedis.set(testKey, testValue);
queueRedisHealth = await queueRedis.get(testKey);
await queueRedis.del(testKey);
} catch (error) {
console.error("queueRedis health check failed:", error);
queueRedisHealth = null;
}
// Test redisRateLimitClient
let redisRateLimitHealth;
try {
await redisRateLimitClient.set(testKey, testValue);
redisRateLimitHealth = await redisRateLimitClient.get(testKey);
await redisRateLimitClient.del(testKey);
} catch (error) {
console.error("redisRateLimitClient health check failed:", error);
redisRateLimitHealth = null;
}
const healthStatus = {
queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy",
redisRateLimitClient:
redisRateLimitHealth === testValue ? "healthy" : "unhealthy",
};
if (
healthStatus.queueRedis === "healthy" &&
healthStatus.redisRateLimitClient === "healthy"
) {
console.log("Both Redis instances are healthy");
return res
.status(200)
.json({ status: "healthy", details: healthStatus });
} else {
console.log("Redis instances health check:", healthStatus);
await sendSlackWebhook(
`[REDIS DOWN] Redis instances health check: ${JSON.stringify(
healthStatus
)}`,
true
);
return res
.status(500)
.json({ status: "unhealthy", details: healthStatus });
}
} catch (error) {
console.error("Redis health check failed:", error);
await sendSlackWebhook(
`[REDIS DOWN] Redis instances health check: ${error.message}`,
true
);
return res
.status(500)
.json({ status: "unhealthy", message: error.message });
}
}
);
console.log(`Worker ${process.pid} started`);
}