Nick: webhooks v1 working great

This commit is contained in:
Nicolas
2024-09-01 13:44:36 -03:00
parent faae98ecb8
commit 95b9dc915d
4 changed files with 82 additions and 25 deletions
+5
View File
@@ -22,6 +22,7 @@ import { getScrapeQueue } from "../../services/queue-service";
import { addScrapeJob } from "../../services/queue-jobs"; import { addScrapeJob } from "../../services/queue-jobs";
import { Logger } from "../../lib/logger"; import { Logger } from "../../lib/logger";
import { getJobPriority } from "../../lib/job-priority"; import { getJobPriority } from "../../lib/job-priority";
import { callWebhook } from "../../services/webhook";
export async function crawlController( export async function crawlController(
req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>, req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>,
@@ -150,6 +151,10 @@ export async function crawlController(
await addCrawlJob(id, job.id); await addCrawlJob(id, job.id);
} }
if(req.body.webhook) {
await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started");
}
return res.status(200).json({ return res.status(200).json({
success: true, success: true,
id, id,
+30 -7
View File
@@ -217,10 +217,15 @@ async function processJob(job: Job, token: string) {
docs, docs,
}; };
if (job.data.mode === "crawl") {
await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
// No idea what this does and when it is called.
if (job.data.mode === "crawl" && !job.data.v1) {
callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1).catch((error) => {
Logger.error(`Error calling webhook for job (1 - mode crawl - v0) ${job.id} - ${error}`);
});
} }
if (job.data.webhook && job.data.mode !== "crawl") { if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1); await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
} }
@@ -344,12 +349,23 @@ async function processJob(job: Job, token: string) {
error: message /* etc... */, error: message /* etc... */,
docs: fullDocs, docs: fullDocs,
}; };
// v0 web hooks, call when done with all the data
if (!job.data.v1) {
callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
});
}
// v1 web hooks, call when done with no data, but with event completed
if (job.data.v1 && job.data.webhook) {
callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
});
}
} }
} }
if (!job.data.v1) {
await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
}
Logger.info(`🐂 Job done ${job.id}`); Logger.info(`🐂 Job done ${job.id}`);
return data; return data;
@@ -391,7 +407,14 @@ async function processJob(job: Job, token: string) {
}; };
if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) { if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1); callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1).catch((error) => {
Logger.error(`Error calling webhook for job (catch - v0) ${job.id} - ${error}`);
});
}
if(job.data.v1) {
callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed").catch((error) => {
Logger.error(`Error calling webhook for job (catch - v1) ${job.id} - ${error}`);
});
} }
if (job.data.crawl_id) { if (job.data.crawl_id) {
+41 -15
View File
@@ -1,11 +1,23 @@
import axios from "axios";
import { legacyDocumentConverter } from "../../src/controllers/v1/types"; import { legacyDocumentConverter } from "../../src/controllers/v1/types";
import { Logger } from "../../src/lib/logger"; import { Logger } from "../../src/lib/logger";
import { supabase_service } from "./supabase"; import { supabase_service } from "./supabase";
import { WebhookEventType } from "../types";
export const callWebhook = async (teamId: string, id: string, data: any, specified?: string, v1 = false) => { export const callWebhook = async (
teamId: string,
id: string,
data: any | null,
specified?: string,
v1 = false,
eventType: WebhookEventType = "crawl.page"
) => {
try { try {
const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", id); const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace(
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true'; "{{JOB_ID}}",
id
);
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
let webhookUrl = specified ?? selfHostedUrl; let webhookUrl = specified ?? selfHostedUrl;
// Only fetch the webhook URL from the database if the self-hosted webhook URL and specified webhook are not set // Only fetch the webhook URL from the database if the self-hosted webhook URL and specified webhook are not set
@@ -17,7 +29,9 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi
.eq("team_id", teamId) .eq("team_id", teamId)
.limit(1); .limit(1);
if (error) { if (error) {
Logger.error(`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`); Logger.error(
`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`
);
return null; return null;
} }
@@ -29,10 +43,12 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi
} }
let dataToSend = []; let dataToSend = [];
if (data.result.links && data.result.links.length !== 0) { if (data && data.result && data.result.links && data.result.links.length !== 0) {
for (let i = 0; i < data.result.links.length; i++) { for (let i = 0; i < data.result.links.length; i++) {
if (v1) { if (v1) {
dataToSend.push(legacyDocumentConverter(data.result.links[i].content)) dataToSend.push(
legacyDocumentConverter(data.result.links[i].content)
);
} else { } else {
dataToSend.push({ dataToSend.push({
content: data.result.links[i].content.content, content: data.result.links[i].content.content,
@@ -43,19 +59,29 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi
} }
} }
await fetch(webhookUrl, { axios.post(
method: "POST", webhookUrl,
{
success: !v1 ? data.success : eventType === "crawl.page" ? data.success : true,
type: eventType,
[v1 ? 'id' : 'jobId']: id,
data: dataToSend,
error: !v1 ? data?.error || undefined : eventType === "crawl.page" ? data?.error || undefined : undefined,
},
{
headers: { headers: {
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
body: JSON.stringify({ timeout: 10000, // 10 seconds timeout
success: data.success, }
id: id, ).catch((error) => {
data: dataToSend, Logger.error(
error: data.error || undefined, `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
}), );
}); });
} catch (error) { } catch (error) {
Logger.debug(`Error sending webhook for team ID: ${teamId}, error: ${error.message}`); Logger.debug(
`Error sending webhook for team ID: ${teamId}, error: ${error.message}`
);
} }
}; };
+3
View File
@@ -154,3 +154,6 @@ export type PlanType =
| "growthdouble" | "growthdouble"
| "free" | "free"
| ""; | "";
export type WebhookEventType = "crawl.page" | "crawl.started" | "crawl.completed" | "crawl.failed";