Files
firecrawl/apps/api/src/main/runWebScraper.ts
T

237 lines
6.3 KiB
TypeScript
Raw Normal View History

2024-07-30 13:27:23 -04:00
import { Job } from "bullmq";
2024-07-12 19:07:59 -04:00
import {
WebScraperOptions,
RunWebScraperParams,
2024-12-11 19:51:08 -03:00
RunWebScraperResult,
2024-07-12 19:07:59 -04:00
} from "../types";
2024-04-15 17:01:47 -04:00
import { billTeam } from "../services/billing/credit_billing";
2024-11-07 20:57:33 +01:00
import { Document } from "../controllers/v1/types";
import { supabase_service } from "../services/supabase";
2024-11-07 20:57:33 +01:00
import { logger } from "../lib/logger";
2024-07-24 15:25:36 -03:00
import { ScrapeEvents } from "../lib/scrape-events";
2024-09-04 15:57:57 -03:00
import { configDotenv } from "dotenv";
2024-12-11 19:46:11 -03:00
import {
EngineResultsTracker,
scrapeURL,
2024-12-11 19:51:08 -03:00
ScrapeUrlResponse,
2024-12-11 19:46:11 -03:00
} from "../scraper/scrapeURL";
2024-11-07 20:57:33 +01:00
import { Engine } from "../scraper/scrapeURL/engines";
2024-09-04 15:57:57 -03:00
configDotenv();
2024-04-15 17:01:47 -04:00
/**
 * Translates a queued scrape job into a `runWebScraper` call and returns
 * whatever that call resolves to.
 *
 * - When the job carries a truthy `crawl_id`, `"rawHtml"` is appended to the
 *   requested formats (the job's own `formats` array is not mutated).
 * - `is_crawl` is true only when `crawl_id` is truthy and `crawlerOptions`
 *   is not `null`.
 * - `is_scrape` defaults to `false` when the job does not specify it.
 *
 * @param job   The queue job; its `data` holds the scrape request and its
 *              `opts.priority` is forwarded unchanged.
 * @param token Accepted as part of the signature; not read by this function.
 * @returns The result of `runWebScraper` for this job.
 */
export async function startWebScraperPipeline({
  job,
  token,
}: {
  job: Job<WebScraperOptions> & { id: string };
  token: string;
}) {
  const { data: payload, opts: jobOpts } = job;

  // Start from a shallow copy so the job's own options object stays untouched.
  const effectiveScrapeOptions = { ...payload.scrapeOptions };
  if (payload.crawl_id) {
    effectiveScrapeOptions.formats = payload.scrapeOptions.formats.concat([
      "rawHtml",
    ]);
  }

  const partOfCrawl =
    Boolean(payload.crawl_id) && payload.crawlerOptions !== null;

  return await runWebScraper({
    url: payload.url,
    mode: payload.mode,
    scrapeOptions: effectiveScrapeOptions,
    internalOptions: payload.internalOptions,
    team_id: payload.team_id,
    bull_job_id: job.id.toString(),
    priority: jobOpts.priority,
    is_scrape: payload.is_scrape ?? false,
    is_crawl: partOfCrawl,
  });
}
2024-10-03 16:37:58 -03:00
2024-04-15 17:01:47 -04:00
/**
 * Scrapes one URL through `scrapeURL`, records a scrape event per engine that
 * was tried, bills the team on success, and returns the scrape response.
 *
 * Retry policy: up to 3 attempts when `is_crawl` is true, otherwise a single
 * attempt. An attempt ends the loop only when it succeeds with a 2xx or 304
 * status code; a thrown error, a `success: false` response, or any other
 * status code moves on to the next attempt (if any remain).
 *
 * Billing: on a successful final attempt, and only when `is_scrape` is false,
 * the team is billed 1 credit (5 when `scrapeOptions.extract` is set). Billing
 * is fire-and-forget; a billing failure is logged and does not affect the
 * returned value.
 *
 * @param url             The URL to scrape.
 * @param mode            Part of the params type; not read by this function.
 * @param scrapeOptions   Passed through to `scrapeURL`; `extract` also
 *                        selects the billed credit amount.
 * @param internalOptions Spread into the options object given to `scrapeURL`
 *                        (after `priority`, so it may override it).
 * @param team_id         Team to bill.
 * @param bull_job_id     Used as the scrape id and as the scrape-event job id.
 * @param priority        Forwarded to `scrapeURL`.
 * @param is_scrape       When true, billing is skipped here.
 * @param is_crawl        When true, enables the 3-attempt retry policy.
 * @returns The successful response, or a `success: false` response whose
 *          `error` is the failure of the final attempt. Failures of
 *          `scrapeURL` are reported in the return value rather than thrown.
 */
export async function runWebScraper({
  url,
  mode,
  scrapeOptions,
  internalOptions,
  // onSuccess,
  // onError,
  team_id,
  bull_job_id,
  priority,
  is_scrape = false,
  is_crawl = false,
}: RunWebScraperParams): Promise<ScrapeUrlResponse> {
  const tries = is_crawl ? 3 : 1;

  // State of the most recent attempt; reset at the start of every attempt.
  let response: ScrapeUrlResponse | undefined = undefined;
  let engines: EngineResultsTracker = {};
  let error: any = undefined;

  for (let i = 0; i < tries; i++) {
    if (i > 0) {
      // Logged before the reset below so it still describes the prior attempt.
      logger.debug("Retrying scrape...", {
        scrapeId: bull_job_id,
        jobId: bull_job_id,
        method: "runWebScraper",
        module: "runWebScraper",
        tries,
        i,
        previousStatusCode: (response as any)?.document?.metadata?.statusCode,
        previousError: error,
      });
    }

    response = undefined;
    engines = {};
    error = undefined;

    try {
      response = await scrapeURL(bull_job_id, url, scrapeOptions, {
        priority,
        ...internalOptions,
      });
      if (!response.success) {
        // Normalise a reported failure into a thrown Error so that both
        // failure modes are handled by the catch block below.
        if (response.error instanceof Error) {
          throw response.error;
        } else {
          throw new Error(
            "scrapeURL error: " +
              (Array.isArray(response.error)
                ? JSON.stringify(response.error)
                : typeof response.error === "object"
                  ? JSON.stringify({ ...response.error })
                  : response.error),
          );
        }
      }

      // This is where the returnvalue from the job is set
      // onSuccess(response.document, mode);

      engines = response.engines;

      if (
        (response.document.metadata.statusCode >= 200 &&
          response.document.metadata.statusCode < 300) ||
        response.document.metadata.statusCode === 304
      ) {
        // status code is good -- do not attempt retry
        break;
      }
    } catch (caught) {
      // The catch binding must NOT be named `error`: that would shadow the
      // outer variable, leaving it undefined and causing the failure to be
      // dropped from both the retry log and the returned response.
      error = caught;
      engines =
        response !== undefined
          ? response.engines
          : typeof caught === "object" && caught !== null
            ? ((caught as any).results ?? {})
            : {};
    }
  }

  // Emit one scrape event per engine, in the order the engines were started.
  const engineOrder = Object.entries(engines)
    .sort((a, b) => a[1].startedAt - b[1].startedAt)
    .map((x) => x[0]) as Engine[];

  for (const engine of engineOrder) {
    const result = engines[engine] as Exclude<
      EngineResultsTracker[Engine],
      undefined
    >;
    ScrapeEvents.insert(bull_job_id, {
      type: "scrape",
      url,
      method: engine,
      result: {
        success: result.state === "success",
        response_code:
          result.state === "success" ? result.result.statusCode : undefined,
        response_size:
          result.state === "success" ? result.result.html.length : undefined,
        error:
          result.state === "error"
            ? result.error
            : result.state === "timeout"
              ? "Timed out"
              : undefined,
        time_taken: result.finishedAt - result.startedAt,
      },
    });
  }

  if (error === undefined && response?.success) {
    if (is_scrape === false) {
      let creditsToBeBilled = 1; // Assuming 1 credit per document
      if (scrapeOptions.extract) {
        creditsToBeBilled = 5;
      }

      // Deliberately not awaited: billing must not delay or fail the scrape.
      billTeam(team_id, undefined, creditsToBeBilled).catch((error) => {
        logger.error(
          `Failed to bill team ${team_id} for ${creditsToBeBilled} credits: ${error}`,
        );
        // Optionally, you could notify an admin or add to a retry queue here
      });
    }

    return response;
  } else {
    if (response !== undefined) {
      return {
        ...response,
        success: false,
        error,
      };
    } else {
      return {
        success: false,
        error,
        logs: ["no logs -- error coming from runWebScraper"],
        engines,
      };
    }
  }
}
2024-12-11 19:46:11 -03:00
/**
 * Stores a job's result and logs a "completed" job event.
 *
 * When the `USE_DB_AUTHENTICATION` environment variable is exactly `"true"`,
 * the `docs` column of the `firecrawl_jobs` row whose `job_id` equals
 * `job.id` is updated with `result`. The "completed" event is logged
 * afterwards regardless of that setting, unless the update failed.
 *
 * Any failure (including a reported update error) is logged and swallowed;
 * this function does not rethrow.
 *
 * Moving the queue job to its completed state is not done here; earlier
 * `job.moveToCompleted(...)` calls in this function were disabled.
 *
 * @param job     The queue job whose result is being saved.
 * @param result  Value written to the `docs` column.
 * @param token   Not read by this function.
 * @param mode    Not read by this function.
 * @param engines Not read by this function.
 */
const saveJob = async (
  job: Job,
  result: any,
  token: string,
  mode: string,
  engines?: EngineResultsTracker,
) => {
  try {
    if (process.env.USE_DB_AUTHENTICATION === "true") {
      const { error: updateError } = await supabase_service
        .from("firecrawl_jobs")
        .update({ docs: result })
        .eq("job_id", job.id);

      if (updateError) {
        throw new Error(updateError.message);
      }
    }

    ScrapeEvents.logJobEvent(job, "completed");
  } catch (failure) {
    logger.error(`🐂 Failed to update job status: ${failure}`);
  }
};