Implement manual JSON parsing for company profiles; enhance data extraction and processing efficiency; add comprehensive test script for validation
This commit is contained in:
+247
-53
@@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
import pandas as pd
|
||||
@@ -187,6 +187,157 @@ Return only the USD integer amount with current exchange rates."""
|
||||
print(f"Error processing investor profile for {name}: {e}")
|
||||
return None
|
||||
|
||||
async def process_company_profile(
    self,
    name: str,
    website: str,
    profile_json: str,
    investor_names: Optional[str] = None,
) -> Optional[dict]:
    """Build a flat company record from one CSV row without using an LLM.

    Args:
        name: Company name from the CSV; surrounding whitespace is stripped.
        website: Company website from the CSV; surrounding whitespace is
            stripped.
        profile_json: Raw profile payload, handed to
            ``self.parse_json_profile``.
        investor_names: Optional comma-separated investor names.

    Returns:
        A dict with the keys ``name``, ``website``, ``description``,
        ``location``, ``industry``, ``founded_year``, ``key_executives``,
        ``client_categories``, ``product_description``, ``linked_documents``,
        ``researcher_notes``, ``missing_important_fields``, ``sources`` and
        ``investor_names``; or ``None`` when the parsed profile is falsy or
        an unexpected error occurs while extracting fields.
    """
    # Local import so this method does not rely on a module-level import.
    from datetime import date

    profile = self.parse_json_profile(profile_json)
    if not profile:
        return None

    try:
        company_data = {
            "name": name.strip() if name else None,
            "website": website.strip() if website else None,
            "description": profile.get("companyDescription"),
            "location": profile.get("geographicFocus"),
            "industry": profile.get("sectorDescription"),
            "founded_year": None,  # Filled from the description below.
            "key_executives": [],
            # ``or`` also covers keys that are present with a JSON null.
            "client_categories": profile.get("clientCategories") or [],
            "product_description": profile.get("productDescription"),
            "linked_documents": profile.get("linkedDocuments") or [],
            "researcher_notes": profile.get("researcherNotes"),
            "missing_important_fields": (
                profile.get("missingImportantFields") or []
            ),
            "sources": profile.get("sources") or {},
            "investor_names": [],
        }

        # Investor column: comma-separated names, blanks dropped.
        if investor_names and pd.notna(investor_names):
            company_data["investor_names"] = [
                part.strip()
                for part in str(investor_names).split(",")
                if part.strip()
            ]

        # Leadership may live under either key; null or non-list values are
        # treated as "no executives" instead of failing the whole record.
        executives = (
            profile.get("keyExecutives")
            or profile.get("seniorLeadership")
            or []
        )
        if not isinstance(executives, list):
            executives = []

        company_data["key_executives"] = [
            {
                "name": member.get("name"),
                "title": member.get("title"),
                "source_url": member.get("sourceUrl"),
            }
            for member in executives
            if isinstance(member, dict) and member.get("name")
        ]

        # Best-effort founding year from free text such as
        # "founded in 2020", "Gegründet 2020" or "(2020)".
        description = company_data["description"]
        if isinstance(description, str) and description:
            year_patterns = (
                r"founded in (\d{4})\b",
                r"founded (\d{4})\b",
                r"Gegründet (\d{4})\b",
                r"established in (\d{4})\b",
                r"since (\d{4})\b",
                r"\((\d{4})\)",  # Year in parentheses
            )
            current_year = date.today().year
            for pattern in year_patterns:
                match = re.search(pattern, description, re.IGNORECASE)
                if match is None:
                    continue
                year = int(match.group(1))
                # Sanity check: reject implausible or future years.
                if 1900 <= year <= current_year:
                    company_data["founded_year"] = year
                    break

        return company_data

    except Exception as e:
        print(f"Error processing company profile for {name}: {e}")
        return None
|
||||
|
||||
def _save_parsed_company_to_db(
|
||||
self, db: Session, company_data: dict
|
||||
) -> Optional[CompanyTable]:
|
||||
"""Save manually parsed company data to database"""
|
||||
try:
|
||||
# Check if company already exists
|
||||
existing_company = (
|
||||
db.query(CompanyTable).filter_by(name=company_data["name"]).first()
|
||||
)
|
||||
|
||||
if existing_company:
|
||||
# Update existing company
|
||||
company = existing_company
|
||||
company.website = company_data.get("website") or company.website
|
||||
company.location = company_data.get("location") or company.location
|
||||
company.description = (
|
||||
company_data.get("description") or company.description
|
||||
)
|
||||
company.industry = company_data.get("industry") or company.industry
|
||||
if company_data.get("founded_year"):
|
||||
company.founded_year = company_data["founded_year"]
|
||||
else:
|
||||
# Create new company
|
||||
company = CompanyTable(
|
||||
name=company_data["name"],
|
||||
website=company_data.get("website"),
|
||||
location=company_data.get("location"),
|
||||
description=company_data.get("description"),
|
||||
industry=company_data.get("industry"),
|
||||
founded_year=company_data.get("founded_year"),
|
||||
)
|
||||
db.add(company)
|
||||
db.flush()
|
||||
|
||||
# Add/update company members (key executives)
|
||||
# First, remove existing members if updating
|
||||
if existing_company:
|
||||
db.query(CompanyMember).filter_by(company_id=company.id).delete()
|
||||
|
||||
for exec_data in company_data.get("key_executives", []):
|
||||
member = CompanyMember(
|
||||
name=exec_data.get("name"),
|
||||
role=exec_data.get("title"),
|
||||
linkedin=exec_data.get(
|
||||
"source_url"
|
||||
), # Store source URL in linkedin field
|
||||
company_id=company.id,
|
||||
)
|
||||
db.add(member)
|
||||
|
||||
# Link to investors if provided
|
||||
for investor_name in company_data.get("investor_names", []):
|
||||
# Find investor in database
|
||||
investor = (
|
||||
db.query(InvestorTable)
|
||||
.filter_by(name=investor_name.strip())
|
||||
.first()
|
||||
)
|
||||
if investor:
|
||||
# Add company to investor's portfolio if not already there
|
||||
if company not in investor.portfolio_companies:
|
||||
investor.portfolio_companies.append(company)
|
||||
|
||||
return company
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error saving company to database: {e}")
|
||||
db.rollback()
|
||||
return None
|
||||
|
||||
def _save_parsed_investor_to_db(
|
||||
self, db: Session, investor_data: dict
|
||||
) -> Optional[InvestorTable]:
|
||||
@@ -546,73 +697,116 @@ Return only the USD integer amount with current exchange rates."""
|
||||
print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} investors")
|
||||
return results
|
||||
|
||||
async def parse_companies(self, df: pd.DataFrame, save_to_db: bool = True):
    """Parse companies from a DataFrame using manual JSON parsing.

    Expected CSV columns: ``Name``, ``Website``, ``Investor`` and
    ``Final Investor Profile`` (which actually holds the company profile).

    Rows without a name or profile are skipped. A failure in one row is
    reported and does not stop the remaining rows.

    Args:
        df: One company per row.
        save_to_db: When true, each parsed company is saved through
            ``self._save_parsed_company_to_db`` and committed.

    Returns:
        The list of company dicts returned by
        ``self.process_company_profile`` for the rows that parsed.
    """

    def _clean_text(value):
        """Return the stripped text of a cell, or None when empty or NaN."""
        if value is None or pd.isna(value):
            return None
        return str(value).strip() or None

    results = []
    # Computed before the try block so the summary line can always use it.
    total_rows = len(df)
    db = get_db_session() if save_to_db else None

    try:
        print(f"\n🚀 Starting to process {total_rows} companies...")

        # Progress is reported by 1-based position, not by index label: the
        # label need not be a 0-based integer (sliced or filtered frames).
        for position, (_, row) in enumerate(df.iterrows(), start=1):
            try:
                name = _clean_text(row.get("Name"))
                website = _clean_text(row.get("Website"))
                investor_names = _clean_text(row.get("Investor"))
                raw_profile = row.get("Final Investor Profile")
                profile_json = raw_profile if pd.notna(raw_profile) else None

                if not name or not profile_json:
                    print(
                        f"⚠️ Row {position}: Skipping - missing name or profile"
                    )
                    continue

                print(f"\n📊 Processing {position}/{total_rows}: {name}")

                company_data = await self.process_company_profile(
                    name, website, profile_json, investor_names
                )

                if company_data:
                    results.append(company_data)
                    founded = company_data.get("founded_year")
                    print(" ✓ Parsed successfully")
                    print(f" - Location: {company_data.get('location')}")
                    print(f" - Industry: {company_data.get('industry')}")
                    print(
                        f" - Founded: {founded}"
                        if founded
                        else " - Founded: Unknown"
                    )
                    print(
                        f" - Executives: {len(company_data.get('key_executives', []))}"
                    )
                    print(
                        f" - Investors: {len(company_data.get('investor_names', []))}"
                    )

                    if db:
                        try:
                            saved_company = self._save_parsed_company_to_db(
                                db, company_data
                            )
                            if saved_company:
                                db.commit()
                                print(
                                    f" ✅ Saved to database (ID: {saved_company.id})"
                                )
                            else:
                                print(" ❌ Failed to save to database")
                        except Exception as e:
                            db.rollback()
                            print(f" ❌ Database error: {e}")
                else:
                    print(" ⚠️ Failed to process profile")

                # Periodic commit every 10 rows.
                if db and position % 10 == 0:
                    db.commit()
                    print(f"\n💾 Committed batch at row {position}")

            except Exception as e:
                print(f"❌ Error processing row {position}: {e}")
                if db:
                    db.rollback()
                continue

        # Final commit
        if db:
            db.commit()
            print("\n✅ Final commit completed")

    except Exception as e:
        print(f"❌ Fatal error in parse_companies: {e}")
        if db:
            db.rollback()
    finally:
        if db:
            db.close()

    print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} companies")
    return results
|
||||
|
||||
|
||||
# async def main():
|
||||
|
||||
Reference in New Issue
Block a user