2024-10-28 16:02:07 -03:00
import { Request , Response } from "express" ;
import {
2024-11-12 18:44:14 -03:00
// Document,
2024-10-28 16:02:07 -03:00
RequestWithAuth ,
ExtractRequest ,
extractRequestSchema ,
ExtractResponse ,
MapDocument ,
2024-11-12 18:44:14 -03:00
scrapeOptions ,
2024-10-28 16:02:07 -03:00
} from "./types" ;
2024-11-12 18:44:14 -03:00
import { Document } from "../../lib/entities" ;
import Redis from "ioredis" ;
import { configDotenv } from "dotenv" ;
import { performRanking } from "../../lib/ranker" ;
import { billTeam } from "../../services/billing/credit_billing" ;
import { logJob } from "../../services/logging/log_job" ;
import { logger } from "../../lib/logger" ;
import { getScrapeQueue } from "../../services/queue-service" ;
import { waitForJob } from "../../services/queue-jobs" ;
import { addScrapeJob } from "../../services/queue-jobs" ;
import { PlanType } from "../../types" ;
import { getJobPriority } from "../../lib/job-priority" ;
2024-11-13 18:06:20 -03:00
import { generateOpenAICompletions } from "../../scraper/scrapeURL/transformers/llmExtract" ;
import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist" ;
2024-11-14 14:57:38 -05:00
import { getMapResults } from "./map" ;
2024-11-20 12:48:10 -08:00
import { buildDocument } from "../../lib/extract/build-document" ;
2024-11-12 18:44:14 -03:00
// Load environment variables (dotenv) before any process.env value below is read.
configDotenv ( ) ;
// Redis client built from REDIS_URL; the non-null assertion assumes the variable is always set.
// NOTE(review): `redis` is not referenced in the visible part of this file — confirm it is still needed.
const redis = new Redis ( process . env . REDIS_URL ! ) ;
// Maximum number of map results considered per requested URL (applied before ranking).
const MAX_EXTRACT_LIMIT = 100 ;
2024-11-14 15:26:15 -05:00
// Maximum number of ranked links kept per requested URL after score filtering.
const MAX_RANKING_LIMIT = 10 ;
2024-11-24 20:31:38 -08:00
// A link's ranking score must be strictly greater than this on the first filtering pass.
const INITIAL_SCORE_THRESHOLD = 0.75 ;
// Lower threshold retried when the first pass yields fewer than MIN_REQUIRED_LINKS links.
const FALLBACK_SCORE_THRESHOLD = 0.5 ;
2024-11-24 20:37:58 -08:00
// Minimum links wanted per URL; also the top-N taken by score when no link passes either threshold.
const MIN_REQUIRED_LINKS = 1 ;
2024-10-28 16:02:07 -03:00
2024-11-20 13:16:36 -08:00
/**
 * Extracts data from the provided URLs based on the request parameters.
 * Currently in beta.
 *
 * Flow:
 * 1. Resolve every requested URL into concrete links. URLs containing `/*` (or any URL when
 *    `allowExternalLinks` is set) are expanded through the map endpoint and, when a prompt is
 *    given, ranked against it; other URLs are used as-is unless blocklisted.
 * 2. Scrape all resolved links in parallel through the scrape queue.
 * 3. Run the LLM extractor over the scraped documents with the request's prompt/schema.
 * 4. Bill the team, log the job, and respond with the extracted data.
 *
 * @param req - The request object containing authentication and extraction details.
 * @param res - The response object to send the extraction results.
 * @returns A promise that resolves once a response has been sent. Responds 400 when no
 *   scrapeable link is found, 408 when a scrape times out, and 500 on other scrape failures.
 */
export async function extractController(
  req: RequestWithAuth<{}, ExtractResponse, ExtractRequest>,
  res: Response<ExtractResponse>
) {
  // Captured at entry so the logged `time_taken` reflects the real request duration.
  const startTime = Date.now();
  const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true";
  req.body = extractRequestSchema.parse(req.body);

  const id = crypto.randomUUID();

  // Process all URLs in parallel; each resolves to a (possibly empty) list of links.
  const urlPromises = req.body.urls.map(async (url): Promise<string[]> => {
    if (url.includes('/*') || req.body.allowExternalLinks) {
      // Handle glob pattern URLs
      const baseUrl = url.replace('/*', '');
      const allowExternalLinks = req.body.allowExternalLinks ?? true;
      const urlWithoutWww = baseUrl.replace("www.", "");
      // Ranking query: unscoped when external links are allowed, otherwise restricted with `site:`.
      const mapUrl = req.body.prompt && allowExternalLinks
        ? `${req.body.prompt} ${urlWithoutWww}`
        : req.body.prompt ? `${req.body.prompt} site:${urlWithoutWww}`
        : `site:${urlWithoutWww}`;

      const mapResults = await getMapResults({
        url: baseUrl,
        search: req.body.prompt,
        teamId: req.auth.team_id,
        plan: req.auth.plan,
        allowExternalLinks,
        origin: req.body.origin,
        limit: req.body.limit,
        // If we're self-hosted, we don't want to ignore the sitemap, due to our fire-engine mapping
        ignoreSitemap: !selfHosted,
        includeMetadata: true,
        includeSubdomains: req.body.includeSubdomains,
      });

      // Limit number of links to MAX_EXTRACT_LIMIT
      let mappedLinks = (mapResults.links as MapDocument[]).slice(0, MAX_EXTRACT_LIMIT);

      if (req.body.prompt) {
        // Textual context for each link, used by the ranker to score relevance to the query.
        const mappedLinksRerank = mappedLinks.map(
          (x) => `url: ${x.url}, title: ${x.title}, description: ${x.description}`
        );
        // Get similarity scores between the search query and each link's context
        const linksAndScores = await performRanking(
          mappedLinksRerank,
          mappedLinks.map((l) => l.url),
          mapUrl
        );

        // First try with high threshold
        let filteredLinks = filterAndProcessLinks(mappedLinks, linksAndScores, INITIAL_SCORE_THRESHOLD);
        // If we don't have enough high-quality links, try with lower threshold
        if (filteredLinks.length < MIN_REQUIRED_LINKS) {
          logger.info(`Only found ${filteredLinks.length} links with score > ${INITIAL_SCORE_THRESHOLD}. Trying lower threshold...`);
          filteredLinks = filterAndProcessLinks(mappedLinks, linksAndScores, FALLBACK_SCORE_THRESHOLD);
          if (filteredLinks.length === 0) {
            // If still no results, take top N results regardless of score
            logger.warn(`No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`);
            const candidates = mappedLinks;
            // Sort a copy: Array.prototype.sort mutates, and the ranker's array should stay intact.
            filteredLinks = [...linksAndScores]
              .sort((a, b) => b.score - a.score)
              .slice(0, MIN_REQUIRED_LINKS)
              .map((x) => candidates.find((link) => link.url === x.link))
              .filter((x): x is MapDocument => x !== undefined && x.url !== undefined && !isUrlBlocked(x.url));
          }
        }
        mappedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT);
      }

      // Map results may lack a URL; never queue `undefined` as a scrape target.
      return mappedLinks
        .map((x) => x.url)
        .filter((u): u is string => typeof u === "string");
    }

    // Handle direct URLs without glob pattern
    return isUrlBlocked(url) ? [] : [url];
  });

  // Wait for all URL processing to complete and flatten results
  const links = (await Promise.all(urlPromises)).flat();

  if (links.length === 0) {
    return res.status(400).json({
      success: false,
      error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs."
    });
  }

  // Loop-invariant scrape settings, shared by every link.
  const origin = req.body.origin || "api";
  // Use 70% of total timeout for individual scrapes
  const timeout = Math.floor((req.body.timeout || 40000) * 0.7) || 30000;

  // Scrape all links in parallel. Every failure (including queueing errors) is rethrown as
  // a `{ status, error }` object so the aggregate handler below can turn it into a response.
  const scrapePromises = links.map(async (url): Promise<Document> => {
    const jobId = crypto.randomUUID();
    try {
      const jobPriority = await getJobPriority({
        plan: req.auth.plan as PlanType,
        team_id: req.auth.team_id,
        basePriority: 10,
      });
      await addScrapeJob(
        {
          url,
          mode: "single_urls",
          team_id: req.auth.team_id,
          scrapeOptions: scrapeOptions.parse({}),
          internalOptions: {},
          plan: req.auth.plan!,
          origin,
          is_scrape: true,
        },
        {},
        jobId,
        jobPriority
      );
      const doc = await waitForJob<Document>(jobId, timeout);
      await getScrapeQueue().remove(jobId);
      return doc;
    } catch (e: unknown) {
      logger.error(`Error in extractController: ${e}`);
      if (e instanceof Error && (e.message.startsWith("Job wait") || e.message === "timeout")) {
        throw {
          status: 408,
          error: "Request timed out"
        };
      }
      const message =
        typeof e === "object" && e !== null && (e as { message?: unknown }).message
          ? (e as { message?: unknown }).message
          : e;
      throw {
        status: 500,
        error: `(Internal server error) - ${message}`
      };
    }
  });

  let docs: Document[];
  try {
    docs = await Promise.all(scrapePromises);
  } catch (e: unknown) {
    // Narrow the thrown value; anything without a numeric status becomes a 500 rather than
    // crashing on res.status(undefined).
    const failure = e as { status?: unknown; error?: unknown } | null | undefined;
    const status = typeof failure?.status === "number" ? failure.status : 500;
    const error = typeof failure?.error === "string" ? failure.error : "Internal server error";
    return res.status(status).json({
      success: false,
      error
    });
  }

  const completions = await generateOpenAICompletions(
    logger.child({ method: "extractController/generateOpenAICompletions" }),
    {
      mode: "llm",
      systemPrompt: "Always prioritize using the provided content to answer the question. Do not make up an answer. Be concise and follow the schema if provided.",
      prompt: req.body.prompt,
      schema: req.body.schema,
    },
    docs.map((x) => buildDocument(x)).join('\n')
  );

  // TODO: change this later
  // While on beta, we're billing 5 credits per link discovered/scraped.
  billTeam(req.auth.team_id, req.acuc?.sub_id, links.length * 5).catch((error) => {
    logger.error(`Failed to bill team ${req.auth.team_id} for ${links.length * 5} credits: ${error}`);
  });

  const data = completions.extract ?? {};
  const warning = completions.warning;

  logJob({
    job_id: id,
    success: true,
    message: "Extract completed",
    num_docs: 1,
    docs: data,
    time_taken: (Date.now() - startTime) / 1000,
    team_id: req.auth.team_id,
    mode: "extract",
    url: req.body.urls.join(", "),
    scrapeOptions: req.body,
    origin: req.body.origin ?? "api",
    num_tokens: completions.numTokens ?? 0
  });

  return res.status(200).json({
    success: true,
    data: data,
    scrape_id: id,
    warning: warning
  });
}
2024-11-26 15:01:42 -03:00
/**
 * Selects the mapped links whose ranking score is strictly above `threshold`.
 *
 * Candidates are visited in `linksAndScores` order. Each one is resolved to the first entry
 * of `mappedLinks` whose `url` equals the candidate's `link`. Candidates that resolve to
 * nothing, to an entry without a URL, or to a blocklisted URL are skipped.
 *
 * @param mappedLinks - The map results the scored links refer to.
 * @param linksAndScores - Ranker output: one record per link with its similarity score.
 * @param threshold - Exclusive lower bound a score must exceed to be kept.
 * @returns The matching map documents, in ranker order.
 */
function filterAndProcessLinks(
  mappedLinks: MapDocument[],
  linksAndScores: { link: string, linkWithContext: string, score: number, originalIndex: number }[],
  threshold: number
): MapDocument[] {
  const selected: MapDocument[] = [];
  for (const scored of linksAndScores) {
    // Written as a negated `>` so NaN scores are rejected exactly like the strict comparison.
    if (!(scored.score > threshold)) {
      continue;
    }
    const match = mappedLinks.find((candidate) => candidate.url === scored.link);
    if (match === undefined || match.url === undefined) {
      continue;
    }
    if (isUrlBlocked(match.url)) {
      continue;
    }
    selected.push(match);
  }
  return selected;
}