Files
firecrawl/apps/api/src/main/runWebScraper.ts
T

161 lines
4.6 KiB
TypeScript
Raw Normal View History

2024-07-30 13:27:23 -04:00
import { Job } from "bullmq";
2024-07-12 19:07:59 -04:00
import {
CrawlResult,
WebScraperOptions,
RunWebScraperParams,
RunWebScraperResult,
} from "../types";
2024-04-15 17:01:47 -04:00
import { WebScraperDataProvider } from "../scraper/WebScraper";
2024-04-20 11:59:42 -07:00
import { DocumentUrl, Progress } from "../lib/entities";
2024-04-15 17:01:47 -04:00
import { billTeam } from "../services/billing/credit_billing";
2024-04-20 11:59:42 -07:00
import { Document } from "../lib/entities";
import { supabase_service } from "../services/supabase";
2024-07-23 17:30:46 -03:00
import { Logger } from "../lib/logger";
2024-07-24 15:25:36 -03:00
import { ScrapeEvents } from "../lib/scrape-events";
2024-08-06 16:26:46 +02:00
import { getWebScraperQueue } from "../services/queue-service";
2024-04-15 17:01:47 -04:00
/**
 * Runs the scraper for a queued job and wires the scraper callbacks to the
 * job's lifecycle: progress updates, completion and failure.
 *
 * @param job - Queue job; its `data` supplies `url`, `mode`, `crawlerOptions`,
 *   `pageOptions` and `team_id`.
 * @param token - Token forwarded to `job.moveToFailed` and to `saveJob`
 *   (which passes it to `job.moveToCompleted`).
 * @returns The `{ success, message, docs }` result produced by `runWebScraper`.
 */
export async function startWebScraperPipeline({
  job,
  token,
}: {
  job: Job<WebScraperOptions>;
  token: string;
}) {
  // Only the most recent documents are attached to progress updates.
  const maxPartialDocs = 50;
  let partialDocs: Document[] = [];

  return (await runWebScraper({
    url: job.data.url,
    mode: job.data.mode,
    crawlerOptions: job.data.crawlerOptions,
    pageOptions: job.data.pageOptions,
    inProgress: (progress) => {
      Logger.debug(`🐂 Job in progress ${job.id}`);
      if (progress.currentDocument) {
        partialDocs.push(progress.currentDocument);
        if (partialDocs.length > maxPartialDocs) {
          partialDocs = partialDocs.slice(-maxPartialDocs);
        }
        // This callback is synchronous, so the update is fire-and-forget.
        // Catch the rejection so a failed progress write cannot surface as an
        // unhandled promise rejection.
        job
          .updateProgress({ ...progress, partialDocs: partialDocs })
          .catch((updateError) => {
            Logger.error(
              `🐂 Failed to update progress for job ${job.id}: ${updateError}`
            );
          });
      }
    },
    onSuccess: (result, mode) => {
      Logger.debug(`🐂 Job completed ${job.id}`);
      // saveJob handles its own errors; `void` marks the intentional
      // fire-and-forget.
      void saveJob(job, result, token, mode);
    },
    onError: (error) => {
      Logger.error(`🐂 Job failed ${job.id}`);
      ScrapeEvents.logJobEvent(job, "failed");
      // Not awaited (the callback is synchronous); log instead of leaving the
      // rejection unhandled if the job cannot be moved to failed.
      job.moveToFailed(error, token, false).catch((moveError) => {
        Logger.error(
          `🐂 Failed to move job ${job.id} to failed: ${moveError}`
        );
      });
    },
    team_id: job.data.team_id,
    bull_job_id: job.id.toString(),
  })) as { success: boolean; message: string; docs: Document[] };
}
/**
 * Scrapes or crawls `url`, bills the team for the resulting documents and
 * reports the outcome through the supplied callbacks.
 *
 * @param url - In "crawl" mode a single start URL; in any other mode a
 *   comma-separated list of URLs.
 * @param mode - Scraper mode; only "crawl" is treated specially here.
 * @param crawlerOptions - Crawler settings; `returnOnlyUrls` switches the
 *   result to `{ url }` entries instead of full documents.
 * @param pageOptions - Forwarded unchanged to the data provider.
 * @param inProgress - Called with every progress event from the provider.
 * @param onSuccess - Called with the final documents and `mode`.
 * @param onError - Called with any error thrown while scraping or billing.
 * @param team_id - Team that is billed for the returned documents.
 * @param bull_job_id - Queue job id; also used to look up the cancellation flag.
 * @returns `{ success, message, docs }`. Errors are reported through `onError`
 *   and returned as `success: false`; they are not re-thrown.
 */
export async function runWebScraper({
  url,
  mode,
  crawlerOptions,
  pageOptions,
  inProgress,
  onSuccess,
  onError,
  team_id,
  bull_job_id,
}: RunWebScraperParams): Promise<RunWebScraperResult> {
  try {
    const provider = new WebScraperDataProvider();
    if (mode === "crawl") {
      await provider.setOptions({
        jobId: bull_job_id,
        mode: mode,
        urls: [url],
        crawlerOptions: crawlerOptions,
        pageOptions: pageOptions,
        bullJobId: bull_job_id,
      });
    } else {
      await provider.setOptions({
        jobId: bull_job_id,
        mode: mode,
        urls: url.split(","),
        crawlerOptions: crawlerOptions,
        pageOptions: pageOptions,
      });
    }

    const docs = (await provider.getDocuments(false, (progress: Progress) => {
      inProgress(progress);
    })) as Document[];

    if (docs.length === 0) {
      return {
        success: true,
        message: "No pages found",
        docs: [],
      };
    }

    // URL-only mode: keep one `{ url }` entry per document that has a source
    // URL. Otherwise: drop documents whose content is empty.
    let filteredDocs;
    if (crawlerOptions?.returnOnlyUrls) {
      const urlDocs: { url: string }[] = [];
      for (const doc of docs) {
        // Documents without a source URL are skipped so the result contains
        // no `undefined` entries and they are not counted for billing.
        if (doc.metadata.sourceURL) {
          urlDocs.push({ url: doc.metadata.sourceURL });
        }
      }
      filteredDocs = urlDocs;
    } else {
      filteredDocs = docs.filter((doc) => doc.content.trim().length > 0);
    }

    const redisClient = await getWebScraperQueue().client;
    const isCancelled = await redisClient.exists("cancelled:" + bull_job_id);

    // Cancelled jobs are not billed.
    if (!isCancelled) {
      const billingResult = await billTeam(team_id, filteredDocs.length);

      if (!billingResult.success) {
        // NOTE(review): this path returns without calling onSuccess or
        // onError, so the caller's job-completion callbacks never run here.
        // Confirm that this is intended.
        return {
          success: false,
          message: "Failed to bill team, no subscription was found",
          docs: [],
        };
      }
    }

    // This is where the returnvalue from the job is set
    onSuccess(filteredDocs, mode);

    // this return doesn't matter too much for the job completion result
    return { success: true, message: "", docs: filteredDocs };
  } catch (error) {
    onError(error);
    // Thrown values are not guaranteed to be Error instances; fall back to a
    // string rendering when there is no usable `message`.
    const message =
      typeof error?.message === "string" ? error.message : String(error);
    return { success: false, message, docs: [] };
  }
}
2024-07-30 14:44:13 -04:00
/**
 * Stores a finished job's result and marks the queue job as completed.
 *
 * When `USE_DB_AUTHENTICATION` is "true" the result is first written to the
 * `docs` column of the matching `firecrawl_jobs` row. In that configuration a
 * "crawl" job is completed with a `null` return value (presumably because the
 * documents are already persisted in the database — confirm); every other
 * case completes the job with `result`.
 *
 * Never rejects: failures are logged instead of thrown.
 *
 * @param job - Queue job to complete.
 * @param result - Documents produced by the scraper.
 * @param token - Token passed to `job.moveToCompleted`.
 * @param mode - Scraper mode; only "crawl" changes the behaviour.
 */
const saveJob = async (job: Job, result: any, token: string, mode: string) => {
  try {
    const useDb = process.env.USE_DB_AUTHENTICATION === "true";

    if (useDb) {
      const { error } = await supabase_service
        .from("firecrawl_jobs")
        .update({ docs: result })
        .eq("job_id", job.id);

      if (error) throw new Error(error.message);
    }

    const returnValue = useDb && mode === "crawl" ? null : result;
    try {
      await job.moveToCompleted(returnValue, token, false);
    } catch (error) {
      // Best-effort: the job may no longer exist at this point. Keep going,
      // but leave a trace instead of swallowing the error silently.
      Logger.debug(`🐂 Could not move job ${job.id} to completed: ${error}`);
    }

    ScrapeEvents.logJobEvent(job, "completed");
  } catch (error) {
    Logger.error(`🐂 Failed to update job status: ${error}`);
  }
};