8d467c8ca7
* feat: use strictNullChecking * feat: switch logger to Winston * feat(scrapeURL): first batch * fix(scrapeURL): error swallow * fix(scrapeURL): add timeout to EngineResultsTracker * fix(scrapeURL): report unexpected error to sentry * chore: remove unused modules * feat(transfomers/coerce): warn when a format's response is missing * feat(scrapeURL): feature flag priorities, engine quality sorting, PDF and DOCX support * (add note) * feat(scrapeURL): wip readme * feat(scrapeURL): LLM extract * feat(scrapeURL): better warnings * fix(scrapeURL/engines/fire-engine;playwright): fix screenshot * feat(scrapeURL): add forceEngine internal option * feat(scrapeURL/engines): scrapingbee * feat(scrapeURL/transformars): uploadScreenshot * feat(scrapeURL): more intense tests * bunch of stuff * get rid of WebScraper (mostly) * adapt batch scrape * add staging deploy workflow * fix yaml * fix logger issues * fix v1 test schema * feat(scrapeURL/fire-engine/chrome-cdp): remove wait inserts on actions * scrapeURL: v0 backwards compat * logger fixes * feat(scrapeurl): v0 returnOnlyUrls support * fix(scrapeURL/v0): URL leniency * fix(batch-scrape): ts non-nullable * fix(scrapeURL/fire-engine/chromecdp): fix wait action * fix(logger): remove error debug key * feat(requests.http): use dotenv expression * fix(scrapeURL/extractMetadata): extract custom metadata * fix crawl option conversion * feat(scrapeURL): Add retry logic to robustFetch * fix(scrapeURL): crawl stuff * fix(scrapeURL): LLM extract * fix(scrapeURL/v0): search fix * fix(tests/v0): grant larger response size to v0 crawl status * feat(scrapeURL): basic fetch engine * feat(scrapeURL): playwright engine * feat(scrapeURL): add url-specific parameters * Update readme and examples * added e2e tests for most parameters. Still a few actions, location and iframes to be done. * fixed type * Nick: * Update scrape.ts * Update index.ts * added actions and base64 check * Nick: skipTls feature flag? 
* 403 * todo * todo * fixes * yeet headers from url specific params * add warning when final engine has feature deficit * expose engine results tracker for ScrapeEvents implementation * ingest scrape events * fixed some tests * comment * Update index.test.ts * fixed rawHtml * Update index.test.ts * update comments * move geolocation to global f-e option, fix removeBase64Images * Nick: * trim url-specific params * Update index.ts --------- Co-authored-by: Eric Ciarla <ericciarla@yahoo.com> Co-authored-by: rafaelmmiller <8574157+rafaelmmiller@users.noreply.github.com> Co-authored-by: Nicolas <nicolascamara29@gmail.com>
150 lines
5.2 KiB
TypeScript
150 lines
5.2 KiB
TypeScript
import { Response } from "express";
|
|
import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, RequestWithAuth } from "./types";
|
|
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs } from "../../lib/crawl-redis";
|
|
import { getScrapeQueue } from "../../services/queue-service";
|
|
import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs";
|
|
import { configDotenv } from "dotenv";
|
|
import { Job, JobState } from "bullmq";
|
|
configDotenv();
|
|
|
|
/**
 * Fetches a single scrape job from the scrape queue and normalises its result.
 *
 * When the `USE_DB_AUTHENTICATION` environment variable is exactly `"true"`,
 * the job is also looked up via `supabaseGetJobById`; if a record comes back,
 * its `docs` replace the queue job's `returnvalue`. Finally, an array
 * `returnvalue` is collapsed to its first element.
 *
 * @param id - ID of the job to look up.
 * @returns The job with its `returnvalue` normalised, or the falsy value the
 *          queue returned when no such job exists.
 */
export async function getJob(id: string) {
  const queuedJob = await getScrapeQueue().getJob(id);

  if (queuedJob) {
    const dbEnabled = process.env.USE_DB_AUTHENTICATION === "true";
    const storedRecord = dbEnabled ? await supabaseGetJobById(id) : null;

    // Prefer the stored documents when a record exists, otherwise keep
    // whatever the queue job already carries.
    let result = storedRecord ? storedRecord.docs : queuedJob.returnvalue;

    // Callers expect a single document, not a one-element list.
    if (Array.isArray(result)) {
      result = result[0];
    }

    queuedJob.returnvalue = result;
  }

  return queuedJob;
}
|
|
|
|
/**
 * Fetches several scrape jobs from the scrape queue and normalises their results.
 *
 * IDs for which the queue returns no job are dropped from the output. When the
 * `USE_DB_AUTHENTICATION` environment variable is exactly `"true"`, rows
 * returned by `supabaseGetJobsById` are matched to jobs by `job_id` and their
 * `docs` replace the matching job's `returnvalue`. Finally, every array
 * `returnvalue` is collapsed to its first element.
 *
 * @param ids - IDs of the jobs to look up.
 * @returns The jobs that were found, in the order of `ids`, each with its
 *          `returnvalue` normalised.
 */
export async function getJobs(ids: string[]) {
  const fetched = await Promise.all(ids.map(id => getScrapeQueue().getJob(id)));
  const jobs: (Job & { id: string })[] = fetched.filter(job => job) as (Job & { id: string })[];

  // Only hit the database when there is at least one job its rows could be applied to.
  if (jobs.length > 0 && process.env.USE_DB_AUTHENTICATION === "true") {
    const supabaseData = await supabaseGetJobsById(ids);

    // Index jobs by ID once instead of scanning the job list for every row.
    // The first job seen for an ID wins, matching what a linear `find` returns.
    const jobsById = new Map<string, Job & { id: string }>();
    for (const job of jobs) {
      if (!jobsById.has(job.id)) {
        jobsById.set(job.id, job);
      }
    }

    for (const row of supabaseData) {
      const job = jobsById.get(row.job_id);
      if (job) {
        job.returnvalue = row.docs;
      }
    }
  }

  // Callers expect a single document per job, not a one-element list.
  for (const job of jobs) {
    if (Array.isArray(job.returnvalue)) {
      job.returnvalue = job.returnvalue[0];
    }
  }

  return jobs;
}
|
|
|
|
/**
 * Handles a status request for a crawl (or batch scrape) and returns one page
 * of its completed documents.
 *
 * Responds with 404 when `getCrawl` finds nothing for `req.params.jobId` and
 * with 403 when the stored crawl's `team_id` differs from `req.auth.team_id`.
 * Otherwise responds with 200 and the overall status, counters, expiry, the
 * page of documents, and a `next` URL when more documents may follow.
 *
 * Pagination is driven by the `skip` and `limit` query parameters:
 * - `skip` is the offset into the ordered list of done jobs; missing,
 *   non-numeric or negative values are treated as 0.
 * - `limit` is the page size; when missing, non-numeric or not positive, the
 *   page is instead capped at roughly 10 MiB of serialised documents.
 *
 * @param req - Authenticated request carrying `params.jobId` and the optional
 *              `skip` / `limit` query parameters.
 * @param res - Response used to send the status payload or an error.
 * @param isBatch - When true, the `next` URL points at the batch scrape route
 *                  instead of the crawl route.
 */
export async function crawlStatusController(req: RequestWithAuth<CrawlStatusParams, undefined, CrawlStatusResponse>, res: Response<CrawlStatusResponse>, isBatch = false) {
  const sc = await getCrawl(req.params.jobId);
  if (!sc) {
    return res.status(404).json({ success: false, error: "Job not found" });
  }

  if (sc.team_id !== req.auth.team_id) {
    return res.status(403).json({ success: false, error: "Forbidden" });
  }

  // parseInt yields NaN for non-numeric input; fall back to safe defaults so
  // NaN never reaches the range query or the `next` URL.
  const requestedSkip = typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : NaN;
  const start = Number.isSafeInteger(requestedSkip) && requestedSkip > 0 ? requestedSkip : 0;

  // A non-positive limit would produce an end index below `start` (and -1 when
  // start is 0, which is also the "no upper bound" value passed below), so only
  // positive limits are honoured.
  const requestedLimit = typeof req.query.limit === "string" ? parseInt(req.query.limit, 10) : NaN;
  const end = Number.isSafeInteger(requestedLimit) && requestedLimit > 0
    ? start + requestedLimit - 1
    : undefined;

  const allJobIDs = await getCrawlJobs(req.params.jobId);
  const jobStatuses = await Promise.all(allJobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));

  // Pass the result itself as the iterable. Spreading it into `new Set(...)`
  // would use only its first element as the iterable, so the set would not
  // contain the IDs that `has(id)` is tested against below.
  const throttledJobsSet = new Set(await getThrottledJobs(req.auth.team_id));

  // Throttled, failed and unknown jobs are excluded from both the status
  // computation and the reported totals.
  const validJobStatuses: [string, JobState | "unknown"][] = [];
  const jobIDs: string[] = [];

  for (const [id, status] of jobStatuses) {
    if (!throttledJobsSet.has(id) && status !== "failed" && status !== "unknown") {
      validJobStatuses.push([id, status]);
      jobIDs.push(id);
    }
  }

  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : validJobStatuses.every(x => x[1] === "completed") ? "completed" : "scraping";

  const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
  const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);

  let doneJobs: Job[] = [];

  if (end === undefined) { // no explicit page size: cap the page at ~10 MiB
    let bytes = 0;
    const bytesLimit = 10485760; // 10 MiB in bytes
    const factor = 100; // chunking for faster retrieval

    for (let i = 0; i < doneJobsOrder.length && bytes < bytesLimit; i += factor) {
      // get current chunk and retrieve jobs
      const currentIDs = doneJobsOrder.slice(i, i + factor);
      const jobs = await getJobs(currentIDs);

      // add jobs one by one to the byte counter;
      // both loops stop once the counter crosses the limit
      for (let ii = 0; ii < jobs.length && bytes < bytesLimit; ii++) {
        const job = jobs[ii];
        doneJobs.push(job);
        // JSON.stringify returns undefined for an undefined value; count that
        // as zero instead of throwing. Size is measured as the length of the
        // serialised string.
        bytes += (JSON.stringify(job.returnvalue) ?? "").length;
      }
    }

    // if we ran over the bytes limit, remove the last document, except if it's the only document
    if (bytes > bytesLimit && doneJobs.length !== 1) {
      doneJobs.splice(doneJobs.length - 1, 1);
    }
  } else {
    doneJobs = await getJobs(doneJobsOrder);
  }

  const data = doneJobs.map(x => x.returnvalue);

  const protocol = process.env.ENV === "local" ? req.protocol : "https";
  const nextURL = new URL(`${protocol}://${req.get("host")}/v1/${isBatch ? "batch/scrape" : "crawl"}/${req.params.jobId}`);

  nextURL.searchParams.set("skip", (start + data.length).toString());

  // Only carry the limit forward when it was actually honoured for this page.
  if (end !== undefined && typeof req.query.limit === "string") {
    nextURL.searchParams.set("limit", req.query.limit);
  }

  // Strip rawHtml from every document unless the first job's scrape options
  // list "rawHtml" among the requested formats.
  if (data.length > 0 && !doneJobs[0].data.scrapeOptions.formats.includes("rawHtml")) {
    for (const doc of data) {
      if (doc) {
        delete doc.rawHtml;
      }
    }
  }

  // Resolve the expiry before starting the response so a failure here happens
  // before any response state has been set.
  const expiresAt = (await getCrawlExpiry(req.params.jobId)).toISOString();

  res.status(200).json({
    success: true,
    status,
    completed: doneJobsLength,
    total: jobIDs.length,
    creditsUsed: jobIDs.length,
    expiresAt,
    next:
      status !== "scraping" && (start + data.length) === doneJobsLength // if there's not gonna be any documents after this
        ? undefined
        : nextURL.href,
    data: data,
  });
}
|
|
|