Files
firecrawl/apps/api/src/main/runWebScraper.ts
T

241 lines
6.5 KiB
TypeScript
Raw Normal View History

2024-07-30 13:27:23 -04:00
import { Job } from "bullmq";
2024-07-12 19:07:59 -04:00
import {
WebScraperOptions,
RunWebScraperParams,
2024-12-11 19:51:08 -03:00
RunWebScraperResult,
2024-07-12 19:07:59 -04:00
} from "../types";
2024-04-15 17:01:47 -04:00
import { billTeam } from "../services/billing/credit_billing";
2024-11-07 20:57:33 +01:00
import { Document } from "../controllers/v1/types";
import { supabase_service } from "../services/supabase";
2024-12-17 22:01:41 +01:00
import { logger as _logger } from "../lib/logger";
2024-07-24 15:25:36 -03:00
import { ScrapeEvents } from "../lib/scrape-events";
2024-09-04 15:57:57 -03:00
import { configDotenv } from "dotenv";
2024-12-11 19:46:11 -03:00
import {
EngineResultsTracker,
scrapeURL,
2024-12-11 19:51:08 -03:00
ScrapeUrlResponse,
2024-12-11 19:46:11 -03:00
} from "../scraper/scrapeURL";
2024-11-07 20:57:33 +01:00
import { Engine } from "../scraper/scrapeURL/engines";
2024-12-30 21:42:01 -03:00
import { indexPage } from "../lib/extract/index/pinecone";
2025-04-17 09:23:53 -07:00
import { CostTracking } from "../lib/extract/extraction-service";
2024-09-04 15:57:57 -03:00
configDotenv();
2024-04-15 17:01:47 -04:00
/**
 * Translates a queued scrape job into the parameter object expected by
 * `runWebScraper` and runs it.
 *
 * When the job carries a `crawl_id`, `"rawHtml"` is appended to the requested
 * formats before scraping.
 *
 * @param job - Queue job whose `data` holds the URL, mode and options.
 * @param token - Accepted for interface compatibility; not read by this function.
 * @param costTracking - Forwarded unchanged to `runWebScraper`.
 * @returns Whatever `runWebScraper` resolves to.
 */
export async function startWebScraperPipeline({
  job,
  token,
  costTracking,
}: {
  job: Job<WebScraperOptions> & { id: string };
  token: string;
  costTracking: CostTracking;
}) {
  const data = job.data;
  const hasCrawlId = Boolean(data.crawl_id);

  // Jobs with a crawl id additionally request the raw HTML format.
  const scrapeOptions = hasCrawlId
    ? {
        ...data.scrapeOptions,
        formats: data.scrapeOptions.formats.concat(["rawHtml"]),
      }
    : { ...data.scrapeOptions };

  return await runWebScraper({
    url: data.url,
    mode: data.mode,
    scrapeOptions,
    internalOptions: data.internalOptions,
    team_id: data.team_id,
    bull_job_id: job.id.toString(),
    priority: job.opts.priority,
    is_scrape: data.is_scrape ?? false,
    // Only counts as a crawl when there is a crawl id and crawlerOptions is not null.
    is_crawl: hasCrawlId && data.crawlerOptions !== null,
    urlInvisibleInCurrentCrawl:
      data.crawlerOptions?.urlInvisibleInCurrentCrawl ?? false,
    costTracking,
  });
}
2024-10-03 16:37:58 -03:00
2024-04-15 17:01:47 -04:00
/**
 * Scrapes a single URL via `scrapeURL`, retrying when running as part of a
 * crawl, records one scrape event per engine that was tried, and returns the
 * outcome as a `ScrapeUrlResponse`.
 *
 * Attempts: 3 when `is_crawl` is true, otherwise 1. An attempt ends the loop
 * only when the document's status code is 2xx or 304; a thrown error or any
 * other status code moves on to the next attempt (if any remain).
 *
 * This function does not throw for scrape failures caught inside the attempt
 * loop: they are returned as `{ success: false, error, ... }`.
 *
 * @returns The successful response, or a failure response carrying the last
 *          captured error.
 */
export async function runWebScraper({
  url,
  mode,
  scrapeOptions,
  internalOptions,
  team_id,
  bull_job_id,
  priority,
  is_scrape = false,
  is_crawl = false,
  urlInvisibleInCurrentCrawl = false,
  costTracking,
}: RunWebScraperParams): Promise<ScrapeUrlResponse> {
  const logger = _logger.child({
    method: "runWebScraper",
    module: "runWebscraper",
    scrapeId: bull_job_id,
    jobId: bull_job_id,
  });

  const maxAttempts = is_crawl ? 3 : 1;

  let response: ScrapeUrlResponse | undefined = undefined;
  let engines: EngineResultsTracker = {};
  let error: any = undefined;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if (attempt > 0) {
      // Logged before the state reset below so the previous attempt's
      // status code and error are still available.
      logger.debug("Retrying scrape...", {
        tries: maxAttempts,
        i: attempt,
        previousStatusCode: (response as any)?.document?.metadata?.statusCode,
        previousError: error,
      });
    }

    // Every attempt starts from a clean slate.
    response = undefined;
    engines = {};
    error = undefined;

    try {
      response = await scrapeURL(
        bull_job_id,
        url,
        scrapeOptions,
        {
          // Order matters: internalOptions may override priority, while the
          // two keys after the spread override internalOptions.
          priority,
          ...internalOptions,
          urlInvisibleInCurrentCrawl,
          teamId: internalOptions?.teamId ?? team_id,
        },
        costTracking,
      );

      if (!response.success) {
        if (response.error instanceof Error) {
          throw response.error;
        }

        // Non-Error failures are wrapped so the catch below always has
        // something descriptive to record.
        let message = "scrapeURL error: ";
        if (Array.isArray(response.error)) {
          message += JSON.stringify(response.error);
        } else if (typeof response.error === "object") {
          message += JSON.stringify({ ...response.error });
        } else {
          message += response.error;
        }
        throw new Error(message);
      }

      engines = response.engines;

      const statusCode = response.document.metadata.statusCode;
      const isSuccessStatus = statusCode >= 200 && statusCode < 300;
      if (isSuccessStatus || statusCode === 304) {
        // Good status code -- no retry needed.
        break;
      }
    } catch (caught) {
      error = caught;

      // Prefer the engine results from the response; otherwise fall back to
      // a `results` property on the thrown value, if it is an object.
      if (response !== undefined) {
        engines = response.engines;
      } else if (typeof error === "object" && error !== null) {
        engines = (error as any).results ?? {};
      } else {
        engines = {};
      }
    }
  }

  // Report engines in the order in which they were started.
  const engineOrder = Object.entries(engines)
    .sort(([, left], [, right]) => left.startedAt - right.startedAt)
    .map(([engineName]) => engineName) as Engine[];

  for (const engine of engineOrder) {
    const result = engines[engine] as Exclude<
      EngineResultsTracker[Engine],
      undefined
    >;

    let responseCode;
    let responseSize;
    let failure;
    switch (result.state) {
      case "success":
        responseCode = result.result.statusCode;
        responseSize = result.result.html.length;
        break;
      case "error":
        failure = result.error;
        break;
      case "timeout":
        failure = "Timed out";
        break;
    }

    ScrapeEvents.insert(bull_job_id, {
      type: "scrape",
      url,
      method: engine,
      result: {
        success: result.state === "success",
        response_code: responseCode,
        response_size: responseSize,
        error: failure,
        time_taken: result.finishedAt - result.startedAt,
      },
    });
  }

  if (error === undefined && response?.success) {
    return response;
  }

  if (response !== undefined) {
    // A response exists but the attempt still failed: surface the error.
    return {
      ...response,
      success: false,
      error,
    };
  }

  return {
    success: false,
    error,
    logs: ["no logs -- error coming from runWebScraper"],
    engines,
  };
}
2024-12-11 19:46:11 -03:00
/**
 * Persists a job's result and logs a "completed" job event.
 *
 * When the `USE_DB_AUTHENTICATION` environment variable equals `"true"`, the
 * `docs` column of the matching `firecrawl_jobs` row is updated with `result`.
 * The "completed" event is logged afterwards in either case.
 *
 * Failures are best-effort: they are logged (including the underlying error)
 * and never rethrown.
 *
 * @param job - The queue job whose id identifies the row to update.
 * @param result - Value stored in the `docs` column.
 * @param token - Not read by this function.
 * @param mode - Not read by this function.
 * @param engines - Not read by this function.
 */
const saveJob = async (
  job: Job,
  result: any,
  token: string,
  mode: string,
  engines?: EngineResultsTracker,
) => {
  try {
    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
    if (useDbAuthentication) {
      const { error } = await supabase_service
        .from("firecrawl_jobs")
        .update({ docs: result })
        .eq("job_id", job.id);

      if (error) throw new Error(error.message);
    }
    ScrapeEvents.logJobEvent(job, "completed");
  } catch (error) {
    // Include the caught error so the cause of the failure is not lost.
    _logger.error(`🐂 Failed to update job status`, {
      module: "runWebScraper",
      method: "saveJob",
      jobId: job.id,
      scrapeId: job.id,
      error,
    });
  }
};