feat: Enhance investor and company parsing with asynchronous batch processing

This commit is contained in:
bolade
2025-10-08 13:29:25 +01:00
parent be6fde9ba2
commit 58722f1102
3 changed files with 168 additions and 126 deletions
Binary file not shown.
+168 -126
View File
@@ -1,3 +1,4 @@
import asyncio
import json
import os
import re
@@ -649,10 +650,55 @@ Return the lower and upper bounds in USD."""
print(f"Error processing row {row_idx + 1}: {e}")
return None
async def parse_investors(self, df: pd.DataFrame, save_to_db: bool = True):
async def _process_single_investor(
self, idx: int, row: pd.Series, total_rows: int
) -> Optional[dict]:
"""Process a single investor row"""
try:
name = row.get("Name", "").strip() if pd.notna(row.get("Name")) else None
website = (
row.get("Website", "").strip() if pd.notna(row.get("Website")) else None
)
profile_json = (
row.get("Final Investor Profile", "")
if pd.notna(row.get("Final Investor Profile"))
else None
)
if not name or not profile_json:
print(f"⚠️ Row {idx + 1}: Skipping - missing name or profile")
return None
print(f"📊 Processing {idx + 1}/{total_rows}: {name}")
# Process the investor profile
investor_data = await self.process_investor_profile(
name, website, profile_json
)
if investor_data:
print(f"{name} parsed successfully")
return investor_data
else:
print(f" ⚠️ {name} failed to process")
return None
except Exception as e:
print(f"❌ Error processing row {idx + 1}: {e}")
return None
async def parse_investors(
self, df: pd.DataFrame, save_to_db: bool = True, batch_size: int = 10
):
"""
Parse investors from DataFrame using manual JSON parsing and LLM for currency conversion.
Processes multiple investors concurrently for better performance.
Expected CSV columns: Name, Website, Final Investor Profile, Final Profile sourcing
Args:
df: DataFrame with investor data
save_to_db: Whether to save to database
batch_size: Number of investors to process concurrently (default: 10)
"""
results = []
db = None
@@ -661,50 +707,31 @@ Return the lower and upper bounds in USD."""
try:
total_rows = len(df)
print(f"\n🚀 Starting to process {total_rows} investors...")
print(
f"\n🚀 Starting to process {total_rows} investors with batch size {batch_size}..."
)
for idx, row in df.iterrows():
try:
name = (
row.get("Name", "").strip()
if pd.notna(row.get("Name"))
else None
)
website = (
row.get("Website", "").strip()
if pd.notna(row.get("Website"))
else None
)
profile_json = (
row.get("Final Investor Profile", "")
if pd.notna(row.get("Final Investor Profile"))
else None
)
# Process in batches
for batch_start in range(0, total_rows, batch_size):
batch_end = min(batch_start + batch_size, total_rows)
print(
f"\n🔄 Processing batch {batch_start + 1}-{batch_end} of {total_rows}..."
)
if not name or not profile_json:
print(f"⚠️ Row {idx + 1}: Skipping - missing name or profile")
continue
# Create tasks for concurrent processing
tasks = []
for idx in range(batch_start, batch_end):
row = df.iloc[idx]
task = self._process_single_investor(idx, row, total_rows)
tasks.append(task)
print(f"\n📊 Processing {idx + 1}/{total_rows}: {name}")
# Process batch concurrently
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process the investor profile
investor_data = await self.process_investor_profile(
name, website, profile_json
)
if investor_data:
# Filter out None results and exceptions, then save to database
for investor_data in batch_results:
if investor_data and not isinstance(investor_data, Exception):
results.append(investor_data)
print(" ✓ Parsed successfully")
print(f" - HQ: {investor_data.get('headquarters')}")
print(
f" - AUM: ${investor_data.get('aum'):,}"
if investor_data.get("aum")
else " - AUM: Not Available"
)
print(f" - Funds: {len(investor_data.get('funds', []))}")
print(
f" - Team: {len(investor_data.get('team_members', []))}"
)
# Save to database
if save_to_db and db:
@@ -713,33 +740,29 @@ Return the lower and upper bounds in USD."""
db, investor_data
)
if saved_investor:
db.commit()
print(
f" ✅ Saved to database (ID: {saved_investor.id})"
f" ✅ Saved {investor_data['name']} to database (ID: {saved_investor.id})"
)
else:
print(" ❌ Failed to save to database")
print(
f" ❌ Failed to save {investor_data['name']} to database"
)
except Exception as e:
db.rollback()
print(f" ❌ Database error: {e}")
else:
print(" ⚠️ Failed to process profile")
print(
f" ❌ Database error for {investor_data['name']}: {e}"
)
elif isinstance(investor_data, Exception):
print(f" ❌ Exception occurred: {investor_data}")
# Commit every 10 investors to avoid memory issues
if save_to_db and db and (idx + 1) % 10 == 0:
# Commit batch to database
if save_to_db and db:
try:
db.commit()
print(f"\n💾 Committed batch at row {idx + 1}")
except Exception as e:
print(f"❌ Error processing row {idx + 1}: {e}")
if db:
print(f"💾 Committed batch {batch_start + 1}-{batch_end}")
except Exception as e:
db.rollback()
continue
# Final commit
if save_to_db and db:
db.commit()
print("\n✅ Final commit completed")
print(f"❌ Failed to commit batch: {e}")
except Exception as e:
print(f"❌ Fatal error in parse_investors: {e}")
@@ -752,10 +775,60 @@ Return the lower and upper bounds in USD."""
print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} investors")
return results
async def parse_companies(self, df: pd.DataFrame, save_to_db: bool = True):
async def _process_single_company(
self, idx: int, row: pd.Series, total_rows: int
) -> Optional[dict]:
"""Process a single company row"""
try:
name = row.get("Name", "").strip() if pd.notna(row.get("Name")) else None
website = (
row.get("Website", "").strip() if pd.notna(row.get("Website")) else None
)
investor_names = (
row.get("Investor", "").strip()
if pd.notna(row.get("Investor"))
else None
)
profile_json = (
row.get("Final Investor Profile", "")
if pd.notna(row.get("Final Investor Profile"))
else None
)
if not name or not profile_json:
print(f"⚠️ Row {idx + 1}: Skipping - missing name or profile")
return None
print(f"📊 Processing {idx + 1}/{total_rows}: {name}")
# Process the company profile
company_data = await self.process_company_profile(
name, website, profile_json, investor_names
)
if company_data:
print(f"{name} parsed successfully")
return company_data
else:
print(f" ⚠️ {name} failed to process")
return None
except Exception as e:
print(f"❌ Error processing row {idx + 1}: {e}")
return None
async def parse_companies(
self, df: pd.DataFrame, save_to_db: bool = True, batch_size: int = 10
):
"""
Parse companies from DataFrame using manual JSON parsing.
Processes multiple companies concurrently for better performance.
Expected CSV columns: Name, Website, Investor, Final Investor Profile (actually company profile)
Args:
df: DataFrame with company data
save_to_db: Whether to save to database
batch_size: Number of companies to process concurrently (default: 10)
"""
results = []
db = None
@@ -764,58 +837,31 @@ Return the lower and upper bounds in USD."""
try:
total_rows = len(df)
print(f"\n🚀 Starting to process {total_rows} companies...")
print(
f"\n🚀 Starting to process {total_rows} companies with batch size {batch_size}..."
)
for idx, row in df.iterrows():
try:
name = (
row.get("Name", "").strip()
if pd.notna(row.get("Name"))
else None
)
website = (
row.get("Website", "").strip()
if pd.notna(row.get("Website"))
else None
)
investor_names = (
row.get("Investor", "").strip()
if pd.notna(row.get("Investor"))
else None
)
profile_json = (
row.get("Final Investor Profile", "")
if pd.notna(row.get("Final Investor Profile"))
else None
)
# Process in batches
for batch_start in range(0, total_rows, batch_size):
batch_end = min(batch_start + batch_size, total_rows)
print(
f"\n🔄 Processing batch {batch_start + 1}-{batch_end} of {total_rows}..."
)
if not name or not profile_json:
print(f"⚠️ Row {idx + 1}: Skipping - missing name or profile")
continue
# Create tasks for concurrent processing
tasks = []
for idx in range(batch_start, batch_end):
row = df.iloc[idx]
task = self._process_single_company(idx, row, total_rows)
tasks.append(task)
print(f"\n📊 Processing {idx + 1}/{total_rows}: {name}")
# Process batch concurrently
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process the company profile
company_data = await self.process_company_profile(
name, website, profile_json, investor_names
)
if company_data:
# Filter out None results and exceptions, then save to database
for company_data in batch_results:
if company_data and not isinstance(company_data, Exception):
results.append(company_data)
print(" ✓ Parsed successfully")
print(f" - Location: {company_data.get('location')}")
print(f" - Industry: {company_data.get('industry')}")
print(
f" - Founded: {company_data.get('founded_year')}"
if company_data.get("founded_year")
else " - Founded: Unknown"
)
print(
f" - Executives: {len(company_data.get('key_executives', []))}"
)
print(
f" - Investors: {len(company_data.get('investor_names', []))}"
)
# Save to database
if save_to_db and db:
@@ -824,33 +870,29 @@ Return the lower and upper bounds in USD."""
db, company_data
)
if saved_company:
db.commit()
print(
f" ✅ Saved to database (ID: {saved_company.id})"
f" ✅ Saved {company_data['name']} to database (ID: {saved_company.id})"
)
else:
print(" ❌ Failed to save to database")
print(
f" ❌ Failed to save {company_data['name']} to database"
)
except Exception as e:
db.rollback()
print(f" ❌ Database error: {e}")
else:
print(" ⚠️ Failed to process profile")
print(
f" ❌ Database error for {company_data['name']}: {e}"
)
elif isinstance(company_data, Exception):
print(f" ❌ Exception occurred: {company_data}")
# Commit every 10 companies to avoid memory issues
if save_to_db and db and (idx + 1) % 10 == 0:
# Commit batch to database
if save_to_db and db:
try:
db.commit()
print(f"\n💾 Committed batch at row {idx + 1}")
except Exception as e:
print(f"❌ Error processing row {idx + 1}: {e}")
if db:
print(f"💾 Committed batch {batch_start + 1}-{batch_end}")
except Exception as e:
db.rollback()
continue
# Final commit
if save_to_db and db:
db.commit()
print("\n✅ Final commit completed")
print(f"❌ Failed to commit batch: {e}")
except Exception as e:
print(f"❌ Fatal error in parse_companies: {e}")
BIN
View File
Binary file not shown.