From fd18f2269bbb56b8150a62dd6da3cde70df17d08 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 12 Jul 2024 19:07:59 -0400 Subject: [PATCH 1/2] Nick: slack alerts --- apps/api/src/index.ts | 5 +++ apps/api/src/main/runWebScraper.ts | 22 ++++++++--- apps/api/src/services/alerts/index.ts | 55 +++++++++++++++++++++++++++ apps/api/src/services/alerts/slack.ts | 23 +++++++++++ 4 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 apps/api/src/services/alerts/index.ts create mode 100644 apps/api/src/services/alerts/slack.ts diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 1cd03eb0..0ef67e6f 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -9,6 +9,8 @@ import { initSDK } from "@hyperdx/node-opentelemetry"; import cluster from "cluster"; import os from "os"; import { Job } from "bull"; +import { sendSlackWebhook } from "./services/alerts/slack"; +import { initAlerts } from "./services/alerts"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -32,6 +34,8 @@ if (cluster.isMaster) { cluster.fork(); } }); + + initAlerts(); } else { const app = express(); @@ -290,6 +294,7 @@ if (cluster.isMaster) { }); + console.log(`Worker ${process.pid} started`); } diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index 2b141daa..3c98e11e 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -1,5 +1,10 @@ import { Job } from "bull"; -import { CrawlResult, WebScraperOptions, RunWebScraperParams, RunWebScraperResult } from "../types"; +import { + CrawlResult, + WebScraperOptions, + RunWebScraperParams, + RunWebScraperResult, +} from "../types"; import { WebScraperDataProvider } from "../scraper/WebScraper"; import { DocumentUrl, Progress } from "../lib/entities"; import { billTeam } from "../services/billing/credit_billing"; @@ -118,12 +123,19 @@ const saveJob = async (job: Job, result: any) => { .eq("job_id", job.id); if (error) throw new Error(error.message); - await job.moveToCompleted(null); + try { + await job.moveToCompleted(null); + } catch (error) { + // I think the job won't exist here anymore + } } else { - await job.moveToCompleted(result); + try { + await job.moveToCompleted(result); + } catch (error) { + // I think the job won't exist here anymore + } } } catch (error) { console.error("Failed to update job status:", error); } -} - +}; diff --git a/apps/api/src/services/alerts/index.ts b/apps/api/src/services/alerts/index.ts new file mode 100644 index 00000000..2b52d95e --- /dev/null +++ b/apps/api/src/services/alerts/index.ts @@ -0,0 +1,55 @@ +import { getWebScraperQueue } from "../queue-service"; +import { sendSlackWebhook } from "./slack"; + +export function initAlerts() { + if ( + process.env.SLACK_WEBHOOK_URL && + process.env.ENV === "production" && + process.env.ALERT_NUM_ACTIVE_JOBS && + process.env.ALERT_NUM_WAITING_JOBS + ) { + console.info("Initializing alerts"); + const checkActiveJobs = async () => { + try { + const webScraperQueue = getWebScraperQueue(); + const activeJobs = await webScraperQueue.getActiveCount(); + if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) { + console.warn( + `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.` + ); + sendSlackWebhook( + `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}`, + true + ); + } else { + console.info( + `Number of active jobs is under ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}` + ); + } + } catch (error) { + console.error("Failed to check active jobs:", error); + } + }; + + const checkWaitingQueue = async () => { + const webScraperQueue = getWebScraperQueue(); + const waitingJobs = await webScraperQueue.getWaitingCount(); + if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) { + console.warn( + `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.` + ); + sendSlackWebhook( + `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}. Scale up the number of workers with fly scale count worker=20`, + true + ); + } + }; + + const checkAll = async () => { + await checkActiveJobs(); + await checkWaitingQueue(); + }; + + setInterval(checkAll, 5 * 60 * 1000); // Run every 5 minutes + } +} diff --git a/apps/api/src/services/alerts/slack.ts b/apps/api/src/services/alerts/slack.ts new file mode 100644 index 00000000..f65035b1 --- /dev/null +++ b/apps/api/src/services/alerts/slack.ts @@ -0,0 +1,23 @@ +import axios from "axios"; + +export async function sendSlackWebhook( + message: string, + alertEveryone: boolean = false +) { + const webhookUrl = process.env.SLACK_WEBHOOK_URL; + const messagePrefix = alertEveryone ? " " : ""; + const payload = { + text: `${messagePrefix} ${message}`, + }; + + try { + const response = await axios.post(webhookUrl, payload, { + headers: { + "Content-Type": "application/json", + }, + }); + console.log("Webhook sent successfully:", response.data); + } catch (error) { + console.error("Error sending webhook:", error); + } +} From fc3328f3d17a6e5a8815ea8e4c3181db1149f2b7 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 12 Jul 2024 19:12:56 -0400 Subject: [PATCH 2/2] Update index.ts --- apps/api/src/services/alerts/index.ts | 84 ++++++++++++++------------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/apps/api/src/services/alerts/index.ts b/apps/api/src/services/alerts/index.ts index 2b52d95e..8d16489d 100644 --- a/apps/api/src/services/alerts/index.ts +++ b/apps/api/src/services/alerts/index.ts @@ -2,54 +2,58 @@ import { getWebScraperQueue } from "../queue-service"; import { sendSlackWebhook } from "./slack"; export function initAlerts() { - if ( - process.env.SLACK_WEBHOOK_URL && - process.env.ENV === "production" && - process.env.ALERT_NUM_ACTIVE_JOBS && - process.env.ALERT_NUM_WAITING_JOBS - ) { - console.info("Initializing alerts"); - const checkActiveJobs = async () => { - try { + try { + if ( + process.env.SLACK_WEBHOOK_URL && + process.env.ENV === "production" && + process.env.ALERT_NUM_ACTIVE_JOBS && + process.env.ALERT_NUM_WAITING_JOBS + ) { + console.info("Initializing alerts"); + const checkActiveJobs = async () => { + try { + const webScraperQueue = getWebScraperQueue(); + const activeJobs = await webScraperQueue.getActiveCount(); + if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) { + console.warn( + `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.` + ); + sendSlackWebhook( + `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}`, + true + ); + } else { + console.info( + `Number of active jobs is under ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}` + ); + } + } catch (error) { + console.error("Failed to check active jobs:", error); + } + }; + + const checkWaitingQueue = async () => { const webScraperQueue = getWebScraperQueue(); - const activeJobs = await webScraperQueue.getActiveCount(); - if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) { + const waitingJobs = await webScraperQueue.getWaitingCount(); + if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) { console.warn( - `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.` + `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.` ); sendSlackWebhook( - `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}`, + `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}. Scale up the number of workers with fly scale count worker=20`, true ); - } else { - console.info( - `Number of active jobs is under ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}` - ); } - } catch (error) { - console.error("Failed to check active jobs:", error); - } - }; + }; - const checkWaitingQueue = async () => { - const webScraperQueue = getWebScraperQueue(); - const waitingJobs = await webScraperQueue.getWaitingCount(); - if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) { - console.warn( - `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.` - ); - sendSlackWebhook( - `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}. Scale up the number of workers with fly scale count worker=20`, - true - ); - } - }; + const checkAll = async () => { + await checkActiveJobs(); + await checkWaitingQueue(); + }; - const checkAll = async () => { - await checkActiveJobs(); - await checkWaitingQueue(); - }; - - setInterval(checkAll, 5 * 60 * 1000); // Run every 5 minutes + setInterval(checkAll, 5 * 60 * 1000); // Run every 5 minutes + } + } catch (error) { + console.error("Failed to initialize alerts:", error); } }