Nick: fixed crawl maps index dedup

This commit is contained in:
Nicolas
2025-01-21 16:22:27 -03:00
parent 720a429115
commit c7b219169b
2 changed files with 64 additions and 23 deletions
@@ -1,5 +1,5 @@
import { logger } from "../../lib/logger"; import { logger } from "../../lib/logger";
import { normalizeUrlOnlyHostname } from "../../lib/canonical-url"; import { normalizeUrl, normalizeUrlOnlyHostname } from "../../lib/canonical-url";
import { supabase_service } from "../../services/supabase"; import { supabase_service } from "../../services/supabase";
/** /**
@@ -23,7 +23,7 @@ async function querySitemapIndexFunction(url: string) {
throw error; throw error;
} }
const allUrls = data.map((entry) => entry.urls).flat(); const allUrls = [...new Set(data.map((entry) => entry.urls).flat().map(url => normalizeUrl(url)))];
return allUrls; return allUrls;
} catch (error) { } catch (error) {
@@ -18,6 +18,15 @@ interface CrawlMapOperation {
timestamp: string; timestamp: string;
} }
interface CrawlMapRecord {
id?: string;
origin_url: string;
urls: string[];
num_urls: number;
updated_at: string;
created_at?: string;
}
async function acquireLock(): Promise<boolean> { async function acquireLock(): Promise<boolean> {
const redis = redisConnection; const redis = redisConnection;
// Set lock with NX (only if it doesn't exist) and PX (millisecond expiry) // Set lock with NX (only if it doesn't exist) and PX (millisecond expiry)
@@ -65,51 +74,83 @@ async function processBatch() {
const origins = operations.map((op) => op.originUrl); const origins = operations.map((op) => op.originUrl);
const { data: existingMaps } = await supabase_service const { data: existingMaps } = await supabase_service
.from("crawl_maps") .from("crawl_maps")
.select("origin_url, urls") .select("id, origin_url, urls, updated_at")
.in("origin_url", origins); .in("origin_url", origins)
.order("updated_at", { ascending: false });
const existingMapsByOrigin = new Map( // Group maps by origin and handle duplicates
existingMaps?.map((map) => [map.origin_url, map.urls]) || [], const mapsByOrigin = new Map<string, any[]>();
); existingMaps?.forEach((map) => {
const maps = mapsByOrigin.get(map.origin_url) || [];
// Prepare updates and inserts maps.push(map);
interface CrawlMapRecord { mapsByOrigin.set(map.origin_url, maps);
origin_url: string; });
urls: string[];
num_urls: number;
updated_at: string;
created_at?: string;
}
// Handle duplicates and prepare updates
const updates: CrawlMapRecord[] = []; const updates: CrawlMapRecord[] = [];
const inserts: CrawlMapRecord[] = []; const inserts: CrawlMapRecord[] = [];
const duplicatesToDelete: string[] = [];
for (const op of operations) { for (const op of operations) {
const existingUrls = existingMapsByOrigin.get(op.originUrl); const existingForOrigin = mapsByOrigin.get(op.originUrl) || [];
if (existingUrls) { if (existingForOrigin.length > 0) {
// Merge URLs for update // Keep most recent entry and mark others for deletion
const [mostRecent, ...duplicates] = existingForOrigin;
if (duplicates.length > 0) {
duplicatesToDelete.push(...duplicates.map(d => d.id));
}
// Merge and deduplicate URLs
const mergedUrls = [ const mergedUrls = [
...new Set([...existingUrls, ...op.standardizedUrls]), ...new Set([
...mostRecent.urls,
...op.standardizedUrls.map(url => normalizeUrl(url))
])
]; ];
updates.push({ updates.push({
id: mostRecent.id, // Add id to ensure we update the correct record
origin_url: op.originUrl, origin_url: op.originUrl,
urls: mergedUrls, urls: mergedUrls,
num_urls: mergedUrls.length, num_urls: mergedUrls.length,
updated_at: op.timestamp, updated_at: op.timestamp,
}); });
} else { } else {
// Prepare insert // Prepare insert with deduplicated URLs
const deduplicatedUrls = [...new Set(op.standardizedUrls.map(url => normalizeUrl(url)))];
inserts.push({ inserts.push({
origin_url: op.originUrl, origin_url: op.originUrl,
urls: op.standardizedUrls, urls: deduplicatedUrls,
num_urls: op.standardizedUrls.length, num_urls: deduplicatedUrls.length,
created_at: op.timestamp, created_at: op.timestamp,
updated_at: op.timestamp, updated_at: op.timestamp,
}); });
} }
} }
// Delete duplicate entries
if (duplicatesToDelete.length > 0) {
logger.info(`🗑️ Deleting ${duplicatesToDelete.length} duplicate crawl maps in batches of 100`);
// Delete in batches of 100
for (let i = 0; i < duplicatesToDelete.length; i += 100) {
const batch = duplicatesToDelete.slice(i, i + 100);
const { error: deleteError } = await supabase_service
.from("crawl_maps")
.delete()
.in("id", batch);
if (deleteError) {
logger.error(`Failed to delete batch ${i/100 + 1} of duplicate crawl maps`, {
error: deleteError,
batchSize: batch.length,
startIndex: i
});
}
}
}
// Execute batch operations // Execute batch operations
if (updates.length > 0) { if (updates.length > 0) {
logger.info(`🔄 Updating ${updates.length} existing crawl maps`, { logger.info(`🔄 Updating ${updates.length} existing crawl maps`, {