diff --git a/apps/api/package.json b/apps/api/package.json index 230667a8..183ddaa3 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -73,7 +73,7 @@ "form-data": "^4.0.0", "glob": "^10.4.2", "gpt3-tokenizer": "^1.1.5", - "ioredis": "^5.3.2", + "ioredis": "^5.4.1", "joplin-turndown-plugin-gfm": "^1.0.12", "json-schema-to-zod": "^2.3.0", "keyword-extractor": "^0.0.28", @@ -92,7 +92,6 @@ "promptable": "^0.0.10", "puppeteer": "^22.12.1", "rate-limiter-flexible": "2.4.2", - "redis": "^4.6.7", "resend": "^3.4.0", "robots-parser": "^3.0.1", "scrapingbee": "^1.7.4", diff --git a/apps/api/pnpm-lock.yaml b/apps/api/pnpm-lock.yaml index 80b12936..02d8363b 100644 --- a/apps/api/pnpm-lock.yaml +++ b/apps/api/pnpm-lock.yaml @@ -90,7 +90,7 @@ importers: specifier: ^1.1.5 version: 1.1.5 ioredis: - specifier: ^5.3.2 + specifier: ^5.4.1 version: 5.4.1 joplin-turndown-plugin-gfm: specifier: ^1.0.12 @@ -146,9 +146,6 @@ importers: rate-limiter-flexible: specifier: 2.4.2 version: 2.4.2 - redis: - specifier: ^4.6.7 - version: 4.6.14 resend: specifier: ^3.4.0 version: 3.4.0 diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 198288f6..c5e6a438 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -3,7 +3,6 @@ import bodyParser from "body-parser"; import cors from "cors"; import "dotenv/config"; import { getWebScraperQueue } from "./services/queue-service"; -import { redisClient } from "./services/rate-limiter"; import { v0Router } from "./routes/v0"; import { initSDK } from "@hyperdx/node-opentelemetry"; import cluster from "cluster"; @@ -11,6 +10,8 @@ import os from "os"; import { Job } from "bull"; import { sendSlackWebhook } from "./services/alerts/slack"; import { checkAlerts } from "./services/alerts"; +import Redis from "ioredis"; +import { redisRateLimitClient } from "./services/rate-limiter"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -34,11 +35,9 @@ if (cluster.isMaster) { cluster.fork(); } }); - } else { const app = express(); - global.isProduction = process.env.IS_PRODUCTION === "true"; app.use(bodyParser.urlencoded({ extended: true })); @@ -46,6 +45,8 @@ if (cluster.isMaster) { app.use(cors()); // Add this line to enable CORS + const queueRedis = new Redis(process.env.REDIS_URL); + const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); @@ -73,7 +74,6 @@ if (cluster.isMaster) { const DEFAULT_PORT = process.env.PORT ?? 3002; const HOST = process.env.HOST ?? "localhost"; - redisClient.connect(); // HyperDX OpenTelemetry if (process.env.ENV === "production") { @@ -121,7 +121,6 @@ if (cluster.isMaster) { }); app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => { - // return res.status(200).json({ ok: true }); try { console.log("Gracefully shutting down..."); @@ -138,34 +137,38 @@ if (cluster.isMaster) { const wsq = getWebScraperQueue(); const jobs = await wsq.getActive(); - + console.log("Requeueing", jobs.length, "jobs..."); if (jobs.length > 0) { console.log(" Removing", jobs.length, "jobs..."); - await Promise.all(jobs.map(async x => { - try { - await wsq.client.del(await x.lockKey()); - await x.takeLock(); - await x.moveToFailed({ message: "interrupted" }); - await x.remove(); - } catch (e) { - console.warn("Failed to remove job", x.id, e); - } - })); + await Promise.all( + jobs.map(async (x) => { + try { + await wsq.client.del(await x.lockKey()); + await x.takeLock(); + await x.moveToFailed({ message: "interrupted" }); + await x.remove(); + } catch (e) { + console.warn("Failed to remove job", x.id, e); + } + }) + ); console.log(" Re-adding", jobs.length, "jobs..."); - await wsq.addBulk(jobs.map(x => ({ - data: x.data, - opts: { - jobId: x.id, - }, - }))); + await wsq.addBulk( + jobs.map((x) => ({ + data: x.data, + opts: { + jobId: x.id, + }, + })) + ); console.log(" Done!"); } - + await getWebScraperQueue().resume(false); res.json({ ok: true }); } catch (error) { @@ -268,27 +271,32 @@ if (cluster.isMaster) { const numberOfBatches = 9; // Adjust based on your needs const completedJobsPromises: Promise[] = []; for (let i = 0; i < numberOfBatches; i++) { - completedJobsPromises.push(webScraperQueue.getJobs( - ["completed"], - i * batchSize, - i * batchSize + batchSize, - true - )); + completedJobsPromises.push( + webScraperQueue.getJobs( + ["completed"], + i * batchSize, + i * batchSize + batchSize, + true + ) + ); } - const completedJobs: Job[] = (await Promise.all(completedJobsPromises)).flat(); - const before24hJobs = completedJobs.filter( - (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000 - ) || []; - + const completedJobs: Job[] = ( + await Promise.all(completedJobsPromises) + ).flat(); + const before24hJobs = + completedJobs.filter( + (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000 + ) || []; + let count = 0; - + if (!before24hJobs) { return res.status(200).send(`No jobs to remove.`); } for (const job of before24hJobs) { try { - await job.remove() + await job.remove(); count++; } catch (jobError) { console.error(`Failed to remove job with ID ${job.id}:`, jobError); @@ -306,8 +314,73 @@ if (cluster.isMaster) { res.send({ isProduction: global.isProduction }); }); + app.get( + `/admin/${process.env.BULL_AUTH_KEY}/redis-health`, + async (req, res) => { + try { + const testKey = "test"; + const testValue = "test"; - + // Test queueRedis + let queueRedisHealth; + try { + await queueRedis.set(testKey, testValue); + queueRedisHealth = await queueRedis.get(testKey); + await queueRedis.del(testKey); + } catch (error) { + console.error("queueRedis health check failed:", error); + queueRedisHealth = null; + } + + // Test redisRateLimitClient + let redisRateLimitHealth; + try { + await redisRateLimitClient.set(testKey, testValue); + redisRateLimitHealth = await redisRateLimitClient.get(testKey); + await redisRateLimitClient.del(testKey); + } catch (error) { + console.error("redisRateLimitClient health check failed:", error); + redisRateLimitHealth = null; + } + + const healthStatus = { + queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy", + redisRateLimitClient: + redisRateLimitHealth === testValue ? "healthy" : "unhealthy", + }; + + if ( + healthStatus.queueRedis === "healthy" && + healthStatus.redisRateLimitClient === "healthy" + ) { + console.log("Both Redis instances are healthy"); + return res + .status(200) + .json({ status: "healthy", details: healthStatus }); + } else { + console.log("Redis instances health check:", healthStatus); + await sendSlackWebhook( + `[REDIS DOWN] Redis instances health check: ${JSON.stringify( + healthStatus + )}`, + true + ); + return res + .status(500) + .json({ status: "unhealthy", details: healthStatus }); + } + } catch (error) { + console.error("Redis health check failed:", error); + await sendSlackWebhook( + `[REDIS DOWN] Redis instances health check: ${error.message}`, + true + ); + return res + .status(500) + .json({ status: "unhealthy", message: error.message }); + } + } + ); console.log(`Worker ${process.pid} started`); } diff --git a/apps/api/src/services/rate-limiter.ts b/apps/api/src/services/rate-limiter.ts index 3a1d353e..2c7dd963 100644 --- a/apps/api/src/services/rate-limiter.ts +++ b/apps/api/src/services/rate-limiter.ts @@ -1,6 +1,6 @@ import { RateLimiterRedis } from "rate-limiter-flexible"; -import * as redis from "redis"; import { RateLimiterMode } from "../../src/types"; +import Redis from "ioredis"; const RATE_LIMITS = { crawl: { @@ -57,14 +57,13 @@ const RATE_LIMITS = { }, }; -export const redisClient = redis.createClient({ - url: process.env.REDIS_RATE_LIMIT_URL, - legacyMode: true, -}); +export const redisRateLimitClient = new Redis( + process.env.REDIS_RATE_LIMIT_URL +) const createRateLimiter = (keyPrefix, points) => new RateLimiterRedis({ - storeClient: redisClient, + storeClient: redisRateLimitClient, keyPrefix, points, duration: 60, // Duration in seconds @@ -76,7 +75,7 @@ export const serverRateLimiter = createRateLimiter( ); export const testSuiteRateLimiter = new RateLimiterRedis({ - storeClient: redisClient, + storeClient: redisRateLimitClient, keyPrefix: "test-suite", points: 10000, duration: 60, // Duration in seconds diff --git a/apps/api/src/services/redis.ts b/apps/api/src/services/redis.ts index 491eeb11..b720a330 100644 --- a/apps/api/src/services/redis.ts +++ b/apps/api/src/services/redis.ts @@ -1,10 +1,8 @@ import Redis from "ioredis"; - -// Initialize Redis client -const redis = new Redis(process.env.REDIS_URL); +import { redisRateLimitClient } from "./rate-limiter"; // Listen to 'error' events to the Redis connection -redis.on("error", (error) => { +redisRateLimitClient.on("error", (error) => { try { if (error.message === "ECONNRESET") { console.log("Connection to Redis Session Store timed out."); @@ -15,16 +13,16 @@ redis.on("error", (error) => { }); // Listen to 'reconnecting' event to Redis -redis.on("reconnecting", (err) => { +redisRateLimitClient.on("reconnecting", (err) => { try { - if (redis.status === "reconnecting") + if (redisRateLimitClient.status === "reconnecting") console.log("Reconnecting to Redis Session Store..."); else console.log("Error reconnecting to Redis Session Store."); } catch (error) {} }); // Listen to the 'connect' event to Redis -redis.on("connect", (err) => { +redisRateLimitClient.on("connect", (err) => { try { if (!err) console.log("Connected to Redis Session Store!"); } catch (error) {} @@ -38,9 +36,9 @@ redis.on("connect", (err) => { */ const setValue = async (key: string, value: string, expire?: number) => { if (expire) { - await redis.set(key, value, "EX", expire); + await redisRateLimitClient.set(key, value, "EX", expire); } else { - await redis.set(key, value); + await redisRateLimitClient.set(key, value); } }; @@ -50,7 +48,7 @@ const setValue = async (key: string, value: string, expire?: number) => { * @returns {Promise} The value, if found, otherwise null. */ const getValue = async (key: string): Promise => { - const value = await redis.get(key); + const value = await redisRateLimitClient.get(key); return value; }; @@ -59,7 +57,7 @@ const getValue = async (key: string): Promise => { * @param {string} key The key to delete. */ const deleteKey = async (key: string) => { - await redis.del(key); + await redisRateLimitClient.del(key); }; export { setValue, getValue, deleteKey };