From 61e6af2b166df43f535abc865a2c49b2edd74642 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 14 Jan 2025 02:13:42 -0300 Subject: [PATCH] Nick: streaming callback experimental --- apps/api/src/lib/extract/extract-redis.ts | 5 +- .../api/src/lib/extract/extraction-service.ts | 46 +++++++++++++++---- apps/api/src/lib/extract/url-processor.ts | 3 ++ 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts index 0df700f4..18080dbe 100644 --- a/apps/api/src/lib/extract/extract-redis.ts +++ b/apps/api/src/lib/extract/extract-redis.ts @@ -3,12 +3,15 @@ import { logger as _logger } from "../logger"; export enum ExtractStep { INITIAL = "initial", + MAP = "map", + MAP_RERANK = "map-rerank", MULTI_ENTITY = "multi-entity", MULTI_ENTITY_SCRAPE = "multi-entity-scrape", MULTI_ENTITY_EXTRACT = "multi-entity-extract", SCRAPE = "scrape", - MAP = "map", + EXTRACT = "extract", + COMPLETE = "complete", } diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts index 272d4fd7..3189e32d 100644 --- a/apps/api/src/lib/extract/extraction-service.ts +++ b/apps/api/src/lib/extract/extraction-service.ts @@ -170,6 +170,8 @@ export async function performExtraction( ], }); + let startMap = Date.now(); + let aggMapLinks: string[] = []; // Process URLs const urlPromises = request.urls.map((url) => processUrl( @@ -184,9 +186,20 @@ export async function performExtraction( includeSubdomains: request.includeSubdomains, schema: request.schema, }, - urlTraces, - ), - ); + urlTraces, + (links: string[]) => { + aggMapLinks.push(...links); + updateExtract(extractId, { + steps: [ + { + step: ExtractStep.MAP, + startedAt: startMap, + finishedAt: Date.now(), + discoveredLinks: aggMapLinks, + }, + ], + }); + })); const processedUrls = await Promise.all(urlPromises); const links = processedUrls.flat().filter((url) => url); @@ -205,8 +218,8 @@ export async function performExtraction( status: "processing", steps: [ { - step: ExtractStep.MAP, - startedAt: Date.now(), + step: ExtractStep.MAP_RERANK, + startedAt: startMap, finishedAt: Date.now(), discoveredLinks: links, }, @@ -221,6 +234,7 @@ export async function performExtraction( // if so, it splits the results into 2 types of completions: // 1. the first one is a completion that will extract the array of items // 2. the second one is multiple completions that will extract the items from the array + let startAnalyze = Date.now(); const { isMultiEntity, multiEntityKeys, reasoning, keyIndicators } = await analyzeSchemaAndPrompt(links, request.schema, request.prompt ?? ""); @@ -239,7 +253,7 @@ export async function performExtraction( steps: [ { step: ExtractStep.MULTI_ENTITY, - startedAt: Date.now(), + startedAt: startAnalyze, finishedAt: Date.now(), discoveredLinks: [], }, @@ -254,12 +268,14 @@ export async function performExtraction( steps: [ { step: ExtractStep.MULTI_ENTITY_SCRAPE, - startedAt: Date.now(), + startedAt: startAnalyze, finishedAt: Date.now(), discoveredLinks: links, }, ], }); + + let startScrape = Date.now(); const scrapePromises = links.map((url) => { if (!docsMap.has(url)) { return scrapeDocument( @@ -280,6 +296,20 @@ export async function performExtraction( (doc): doc is Document => doc !== null, ); + let endScrape = Date.now(); + + await updateExtract(extractId, { + status: "processing", + steps: [ + { + step: ExtractStep.MULTI_ENTITY_SCRAPE, + startedAt: startScrape, + finishedAt: endScrape, + discoveredLinks: links, + }, + ], + }); + for (const doc of multyEntityDocs) { if (doc?.metadata?.url) { docsMap.set(doc.metadata.url, doc); @@ -352,7 +382,7 @@ export async function performExtraction( steps: [ { step: ExtractStep.MULTI_ENTITY_EXTRACT, - startedAt: Date.now(), + startedAt: startScrape, finishedAt: Date.now(), discoveredLinks: [doc.metadata.url || doc.metadata.sourceURL || ""], }, diff --git a/apps/api/src/lib/extract/url-processor.ts b/apps/api/src/lib/extract/url-processor.ts index ccdfb332..eb5f0278 100644 --- a/apps/api/src/lib/extract/url-processor.ts +++ b/apps/api/src/lib/extract/url-processor.ts @@ -25,6 +25,7 @@ interface ProcessUrlOptions { export async function processUrl( options: ProcessUrlOptions, urlTraces: URLTrace[], + updateExtractCallback: (links: string[]) => void, ): Promise { const trace: URLTrace = { url: options.url, @@ -160,6 +161,8 @@ export async function processUrl( ); + updateExtractCallback(mappedLinks.map((x) => x.url)); + // Perform reranking using either prompt or schema let searchQuery = "";