Files
firecrawl/apps/api/src/controllers/v1/crawl-status-ws.ts
T

230 lines
5.6 KiB
TypeScript
Raw Normal View History

2024-08-17 01:04:14 +02:00
import { authMiddleware } from "../../routes/v1";
import { RateLimiterMode } from "../../types";
2024-08-26 18:48:00 -03:00
import { authenticateUser } from "../auth";
2024-12-11 19:46:11 -03:00
import {
CrawlStatusParams,
CrawlStatusResponse,
Document,
ErrorResponse,
2024-12-11 19:51:08 -03:00
RequestWithAuth,
2024-12-11 19:46:11 -03:00
} from "./types";
2024-08-17 01:04:14 +02:00
import { WebSocket } from "ws";
import { v4 as uuidv4 } from "uuid";
2024-11-07 20:57:33 +01:00
import { logger } from "../../lib/logger";
2024-12-11 19:46:11 -03:00
import {
getCrawl,
getCrawlExpiry,
getCrawlJobs,
getDoneJobsOrdered,
getDoneJobsOrderedLength,
isCrawlFinished,
2024-12-11 19:51:08 -03:00
isCrawlFinishedLocked,
2024-12-11 19:46:11 -03:00
} from "../../lib/crawl-redis";
2024-08-23 18:27:00 +02:00
import { getScrapeQueue } from "../../services/queue-service";
2024-08-17 01:04:14 +02:00
import { getJob, getJobs } from "./crawl-status";
2024-08-23 19:55:41 +02:00
import * as Sentry from "@sentry/node";
2024-11-07 20:57:33 +01:00
import { Job, JobState } from "bullmq";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
2024-08-17 01:04:14 +02:00
/** Failure notice; in this file it is used as the payload when closing the socket with an error. */
type ErrorMessage = {
  type: "error";
  error: string;
};

/** Snapshot of the crawl status, sent once before live updates begin. */
type CatchupMessage = {
  type: "catchup";
  data: CrawlStatusResponse;
};

/** A single scraped document, sent for each job that finishes with a return value. */
type DocumentMessage = {
  type: "document";
  data: Document;
};

/** Marks the end of the stream; used as the close payload once the crawl is over. */
type DoneMessage = { type: "done" };

/** Every message this endpoint can emit, discriminated by the `type` tag. */
type Message = ErrorMessage | CatchupMessage | DoneMessage | DocumentMessage;
/**
 * Serialises `msg` as JSON and writes it to the socket.
 *
 * @param ws  Socket to write to.
 * @param msg Message to deliver.
 * @returns `undefined` when the socket is not open (nothing is written);
 *   otherwise a promise that resolves to `null` once the write callback
 *   reports success and rejects with the error it reports.
 */
function send(ws: WebSocket, msg: Message) {
  // readyState 1 is OPEN; any other state cannot accept data frames.
  const socketIsOpen = ws.readyState === 1;
  if (!socketIsOpen) {
    return undefined;
  }

  return new Promise((resolve, reject) => {
    // Serialisation happens inside the executor on purpose, so that a
    // stringify failure surfaces as a rejection instead of a synchronous throw.
    ws.send(JSON.stringify(msg), (err) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(null);
    });
  });
}
/**
 * Maximum size of a WebSocket close reason in bytes: a control frame carries
 * at most 125 bytes of payload, 2 of which are the status code (RFC 6455).
 */
const MAX_CLOSE_REASON_BYTES = 123;

/**
 * Produces a JSON close reason for `msg` that fits in a close frame.
 * Messages that already fit are returned unchanged; an oversized error
 * message has its `error` text shortened and suffixed with "...".
 */
function fitCloseReason(msg: Message): string {
  const serialized = JSON.stringify(msg);
  if (Buffer.byteLength(serialized, "utf8") <= MAX_CLOSE_REASON_BYTES) {
    return serialized;
  }
  if (msg.type !== "error") {
    // Only the tag can be kept for oversized non-error payloads.
    return JSON.stringify({ type: msg.type });
  }

  // Every code point is at least one byte, so nothing past this prefix can fit.
  const chars = Array.from(msg.error.slice(0, 2 * MAX_CLOSE_REASON_BYTES));
  while (chars.length > 0) {
    chars.pop();
    const candidate = JSON.stringify({
      type: "error",
      error: chars.join("") + "...",
    });
    // Measure the serialised form so escapes and multi-byte characters count.
    if (Buffer.byteLength(candidate, "utf8") <= MAX_CLOSE_REASON_BYTES) {
      return candidate;
    }
  }
  return JSON.stringify({ type: "error", error: "" });
}

/**
 * Closes the socket with `code`, using `msg` (as JSON) as the close reason.
 * Does nothing if the socket is already closing or closed.
 *
 * A close reason longer than 123 bytes is rejected by the WebSocket
 * implementation with an exception, which would leave the socket open. When
 * `msg` is too long, the reason is shortened to fit and, if the socket is
 * open, the complete message is first sent as a regular frame so the client
 * still receives all of it.
 *
 * @param ws   Socket to close.
 * @param code WebSocket close status code.
 * @param msg  Message describing why the socket is being closed.
 */
function close(ws: WebSocket, code: number, msg: Message) {
  // readyState: 0 CONNECTING, 1 OPEN, 2 CLOSING, 3 CLOSED.
  if (ws.readyState > 1) {
    return;
  }

  const full = JSON.stringify(msg);
  const reason = fitCloseReason(msg);
  if (reason !== full && ws.readyState === 1) {
    // Best effort: deliver the untruncated message before the close frame.
    ws.send(full);
  }
  ws.close(code, reason);
}
2024-12-11 19:46:11 -03:00
/**
 * Streams the status of a crawl over a WebSocket.
 *
 * Steps:
 *  1. Look up the crawl; close with 1008 if it does not exist and with 3003
 *     if it belongs to a different team than `req.auth.team_id`.
 *  2. Send one "catchup" message containing the current status and the
 *     return values of every job already done.
 *  3. If the crawl is still scraping, poll once per second, sending a
 *     "document" message for each newly finished job that has a return
 *     value, until every job is done (the socket is then closed with 1000 and
 *     a "done" payload) or the client goes away.
 *
 * The polling loop is awaited, so the returned promise settles only when
 * streaming ends and any failure while polling rejects it. This lets the
 * caller handle polling errors instead of them becoming unhandled rejections
 * on a timer.
 *
 * @param ws  Socket to stream to.
 * @param req Request carrying the crawl id (`params.jobId`) and the
 *   authenticated team (`auth.team_id`).
 */
async function crawlStatusWS(
  ws: WebSocket,
  req: RequestWithAuth<CrawlStatusParams, undefined, undefined>,
) {
  const crawlId = req.params.jobId;

  const sc = await getCrawl(crawlId);
  if (!sc) {
    return close(ws, 1008, { type: "error", error: "Job not found" });
  }

  if (sc.team_id !== req.auth.team_id) {
    return close(ws, 3003, { type: "error", error: "Forbidden" });
  }

  // ---- Catch-up snapshot -------------------------------------------------
  // Taken before polling starts so the poll loop always diffs against a
  // fully initialised list of done jobs.
  const doneJobIDs: string[] = await getDoneJobsOrdered(crawlId);
  const allJobIDs = await getCrawlJobs(crawlId);
  const jobStatuses = await Promise.all(
    allJobIDs.map(
      async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
    ),
  );
  const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);

  const validJobStatuses: [string, JobState | "unknown"][] = [];
  const validJobIDs: string[] = [];
  for (const [id, jobState] of jobStatuses) {
    if (throttledJobsSet.has(id)) {
      // Concurrency-limited jobs are reported as "prioritized".
      validJobStatuses.push([id, "prioritized"]);
      validJobIDs.push(id);
    } else if (jobState !== "failed" && jobState !== "unknown") {
      // Failed and unknown jobs are left out of the totals.
      validJobStatuses.push([id, jobState]);
      validJobIDs.push(id);
    }
  }

  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] =
    sc.cancelled
      ? "cancelled"
      : validJobStatuses.every((x) => x[1] === "completed")
        ? "completed"
        : "scraping";

  const doneJobs = await getJobs(doneJobIDs);
  const data = doneJobs.map((x) => x.returnvalue);

  await send(ws, {
    type: "catchup",
    data: {
      success: true,
      status,
      total: validJobIDs.length,
      completed: doneJobIDs.length,
      creditsUsed: validJobIDs.length,
      expiresAt: (await getCrawlExpiry(crawlId)).toISOString(),
      data: data,
    },
  });

  if (status !== "scraping") {
    return close(ws, 1000, { type: "done" });
  }

  // ---- Live updates --------------------------------------------------------
  // Set mirror of doneJobIDs for constant-time membership checks.
  const seenDoneIDs = new Set(doneJobIDs);

  for (;;) {
    await new Promise<void>((resolve) => setTimeout(resolve, 1000));

    // readyState 2 (CLOSING) / 3 (CLOSED): the client is gone, stop polling.
    if (ws.readyState > 1) {
      return;
    }

    const jobIDs = await getCrawlJobs(crawlId);
    if (jobIDs.length === doneJobIDs.length) {
      return close(ws, 1000, { type: "done" });
    }

    const notDoneJobIDs = jobIDs.filter((x) => !seenDoneIDs.has(x));
    const pendingStatuses = await Promise.all(
      notDoneJobIDs.map(
        async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
      ),
    );
    const newlyDoneJobIDs: string[] = pendingStatuses
      .filter((x) => x[1] === "completed" || x[1] === "failed")
      .map((x) => x[0]);
    const newlyDoneJobs: Job[] = (
      await Promise.all(newlyDoneJobIDs.map((x) => getJob(x)))
    ).filter((x) => x !== undefined) as Job[];

    for (const job of newlyDoneJobs) {
      if (job.returnvalue) {
        // Awaited so documents go out in order and a failed write rejects
        // this function instead of being dropped.
        await send(ws, {
          type: "document",
          data: job.returnvalue,
        });
      }
      // Jobs without a return value (crawl errors) are ignored.
    }

    for (const id of newlyDoneJobIDs) {
      doneJobIDs.push(id);
      seenDoneIDs.add(id);
    }
  }
}
/**
 * Renders a caught value as a string for logging without ever throwing.
 * For `Error` instances the message, name and stack are always included,
 * together with any enumerable properties when those can be serialised.
 */
function describeWSError(err: unknown): string {
  if (err instanceof Error) {
    const core = { message: err.message, name: err.name, stack: err.stack };
    try {
      return JSON.stringify({ ...err, ...core });
    } catch {
      // Extra properties were not serialisable (e.g. circular); keep the core.
      return JSON.stringify(core);
    }
  }

  try {
    const json = JSON.stringify(err);
    if (json !== undefined) {
      return json;
    }
  } catch {
    // Not serialisable (e.g. circular structure or BigInt); fall through.
  }
  return String(err);
}

/**
 * WebSocket entry point for crawl status: authentication plus error wrapping
 * around `crawlStatusWS`.
 *
 * - Authenticates the request with `RateLimiterMode.CrawlStatus`; on failure
 *   the socket is closed with code 3000 and the authentication error.
 * - On success stores `team_id` and `plan` on `req.auth` and delegates to
 *   `crawlStatusWS`.
 * - Any exception is reported to Sentry, logged together with a freshly
 *   generated ID, and the socket is closed with code 1011 and a generic
 *   message that quotes that ID.
 *
 * @param ws  The client socket.
 * @param req The upgrade request; `req.auth` is populated here.
 */
export async function crawlStatusWSController(
  ws: WebSocket,
  req: RequestWithAuth<CrawlStatusParams, undefined, undefined>,
) {
  try {
    const auth = await authenticateUser(req, null, RateLimiterMode.CrawlStatus);

    if (!auth.success) {
      return close(ws, 3000, {
        type: "error",
        error: auth.error,
      });
    }

    const { team_id, plan } = auth;
    req.auth = { team_id, plan };

    await crawlStatusWS(ws, req);
  } catch (err) {
    Sentry.captureException(err);

    // The ID ties the message shown to the client to the log entry.
    const id = uuidv4();
    const verbose = describeWSError(err);

    logger.error(
      "Error occurred in WebSocket! (" +
        req.path +
        ") -- ID " +
        id +
        " -- " +
        verbose,
    );
    return close(ws, 1011, {
      type: "error",
      error:
        "An unexpected error occurred. Please contact help@firecrawl.com for help. Your exception ID is " +
        id,
    });
  }
}