643 lines
25 KiB
Python
643 lines
25 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
from typing import Optional
|
|
|
|
import pandas as pd
|
|
from db.db import get_db_session
|
|
from db.models import (
|
|
CompanyMember,
|
|
CompanyTable,
|
|
FundTable,
|
|
InvestorMember,
|
|
InvestorTable,
|
|
SectorTable,
|
|
)
|
|
from langchain_openai import ChatOpenAI
|
|
from pydantic import BaseModel
|
|
from schemas.py_schemas import CompanyData, InvestorData
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
class CurrencyConversion(BaseModel):
    """Schema for LLM currency conversion responses"""

    # NOTE: this class is handed to ``with_structured_output`` in
    # InvestorProcessor.__init__, so its fields (and presumably its docstring,
    # as the schema description -- verify against LangChain) define what the
    # model is asked to return. Editing it changes the request sent to the LLM.

    # Converted amount in whole USD. InvestorProcessor.convert_to_usd treats
    # any value <= 0 as "no result" and returns None.
    amount_usd: int = 0

    # Model's self-reported certainty; not read by the code in this file.
    confidence: str = "high" # high, medium, low

    # Free-text remarks from the model; not read by the code in this file.
    notes: str = ""
|
|
|
|
|
|
class InvestorProcessor:
    """Parse investor and company rows from DataFrames and optionally persist them.

    ``parse_investors`` extracts fields by hand from each row's embedded JSON
    profile and only calls the LLM to normalise currency strings to USD.
    ``parse_companies`` still uses the legacy structured-output LLM that maps a
    whole CSV row onto ``CompanyData``.
    """

    # Amount strings that mean "no value" and are never sent to the LLM.
    _EMPTY_AMOUNTS = frozenset({"", "Not Available", "0"})

    # InvestorTable columns copied from a parsed profile dict (``name`` is the
    # lookup key and is handled separately).
    _PARSED_INVESTOR_FIELDS = (
        "website",
        "headquarters",
        "description",
        "aum",
        "aum_as_of_date",
        "aum_source_url",
        "investment_thesis",
        "portfolio_highlights",
        "linked_documents",
        "researcher_notes",
        "missing_important_fields",
        "sources",
    )

    # str.translate table for _process_row: newline, CR and tab become spaces;
    # every other ASCII control character (code point < 32) is deleted.
    _CONTROL_CHAR_TABLE = {
        **dict.fromkeys(range(32)),
        ord("\n"): " ",
        ord("\r"): " ",
        ord("\t"): " ",
    }

    def __init__(self):
        """Create the OpenRouter-backed chat model and its structured-output wrappers.

        The API key is read from the ``OPENROUTER_API_KEY`` environment variable.
        """
        self.llm = ChatOpenAI(
            api_key=os.getenv("OPENROUTER_API_KEY"),
            base_url="https://openrouter.ai/api/v1",
            model="openai/gpt-4o-mini",
            temperature=0,
        )

        # Only use structured LLM for currency conversion
        self.currency_converter_llm = self.llm.with_structured_output(
            CurrencyConversion
        )
        # Keep legacy structured LLMs for backward compatibility
        # (_process_row / parse_companies still rely on them).
        self.investor_structured_llm = self.llm.with_structured_output(InvestorData)
        self.company_structured_llm = self.llm.with_structured_output(CompanyData)

    async def convert_to_usd(self, amount_str: Optional[str]) -> Optional[int]:
        """
        Use LLM to convert currency amounts to USD integers.

        Handles formats like:
        - "EUR 850,000,000"
        - "$5M"
        - "GBP 10-20 million"
        - "Approximately EUR 100 million"

        Args:
            amount_str: Free-text amount from the profile JSON. ``None`` is
                accepted; non-string values are stringified and stripped first.

        Returns:
            A positive USD integer, or ``None`` when the input is a placeholder
            ("", "Not Available", "0"), the model returns a non-positive
            amount, or the LLM call fails (the error is printed).
        """
        if amount_str is None:
            return None
        amount_str = str(amount_str).strip()
        if amount_str in self._EMPTY_AMOUNTS:
            return None

        # The exchange rate comes from the model itself, so results are
        # approximate and depend on its notion of "current" rates.
        prompt = (
            "Convert this amount to USD as an integer (whole number, no decimals).\n"
            "If it's a range, use the midpoint. If already in USD, just extract the number.\n"
            "Remove all commas and convert millions/billions to actual numbers.\n"
            "\n"
            f"Amount: {amount_str}\n"
            "\n"
            "Examples:\n"
            '- "EUR 850,000,000" -> 935000000 (assuming EUR to USD rate ~1.10)\n'
            '- "$5M" -> 5000000\n'
            '- "GBP 10-20 million" -> 18000000 (midpoint 15M * 1.20 rate)\n'
            '- "Approximately EUR 100 million" -> 110000000\n'
            "\n"
            "Return only the USD integer amount with current exchange rates."
        )

        try:
            result = await self.currency_converter_llm.ainvoke(prompt)
        except Exception as e:  # API/network/parsing failures are non-fatal here
            print(f"Error converting currency '{amount_str}': {e}")
            return None

        if result is None or result.amount_usd <= 0:
            return None
        return result.amount_usd

    def parse_json_profile(self, json_str: str) -> Optional[dict]:
        """
        Manually parse the JSON profile from the CSV.

        Returns:
            The decoded profile as a dict, or ``None`` when the cell is empty,
            NaN or otherwise not text, is not valid JSON, or decodes to
            something other than a JSON object (errors are printed).
        """
        # NaN (pandas' empty cell) is truthy, so the type check is what rejects it.
        if not json_str or not isinstance(json_str, (str, bytes, bytearray)):
            return None

        try:
            profile = json.loads(json_str)
        except ValueError as e:  # JSONDecodeError, or UnicodeDecodeError for bytes
            print(f"Error parsing JSON: {e}")
            return None

        if not isinstance(profile, dict):
            print(f"Error parsing JSON: expected an object, got {type(profile).__name__}")
            return None
        return profile

    @staticmethod
    def _as_list(value) -> list:
        """Return ``value`` if it is a list, else ``[]`` (e.g. for JSON null)."""
        return value if isinstance(value, list) else []

    async def _parse_fund(self, fund: dict) -> dict:
        """Map one raw profile fund object to the dict stored as a FundTable row."""
        fund_data = {
            "fund_name": fund.get("fundName"),
            "fund_size": None,
            "fund_size_source_url": fund.get("fundSizeSourceUrl"),
            "estimated_investment_size": None,
            "source_url": fund.get("sourceUrl"),
            "source_provider": fund.get("sourceProvider"),
            "geographic_focus": fund.get("geographicFocus", []),
            "investment_stage_focus": fund.get("investmentStageFocus", []),
            "sector_focus": fund.get("sectorFocus", []),
        }

        # Sizes are stored as the string form of the USD integer; presumably
        # the FundTable columns are text -- TODO confirm against db.models.
        fund_size_usd = await self.convert_to_usd(fund.get("fundSize"))
        if fund_size_usd:
            fund_data["fund_size"] = str(fund_size_usd)

        est_size_usd = await self.convert_to_usd(fund.get("estimatedInvestmentSize"))
        if est_size_usd:
            fund_data["estimated_investment_size"] = str(est_size_usd)

        return fund_data

    async def process_investor_profile(
        self, name: str, website: str, profile_json: str
    ) -> Optional[dict]:
        """
        Process investor profile from CSV data.

        Manually extracts fields and uses LLM only for currency conversion.

        Args:
            name: Investor name (surrounding whitespace is stripped).
            website: Investor website, stripped; may be ``None``.
            profile_json: Raw JSON text of the investor profile.

        Returns:
            A plain dict in the shape ``_save_parsed_investor_to_db`` expects
            (scalar fields plus ``team_members`` and ``funds`` lists), or
            ``None`` if the profile is missing/unparseable or extraction fails.
        """
        profile = self.parse_json_profile(profile_json)
        if not profile:
            return None

        try:
            investor_data = {
                "name": name.strip() if name else None,
                "website": website.strip() if website else None,
                "headquarters": profile.get("headquarters"),
                "description": profile.get("investorDescription"),
                "aum": None,
                "aum_as_of_date": None,
                "aum_source_url": None,
                "investment_thesis": profile.get("investmentThesisFocus", []),
                "portfolio_highlights": profile.get("portfolioHighlights", []),
                "linked_documents": profile.get("linkedDocuments", []),
                "researcher_notes": profile.get("researcherNotes"),
                "missing_important_fields": profile.get("missingImportantFields", []),
                "sources": profile.get("sources", {}),
                "team_members": [],
                "funds": [],
            }

            aum_data = profile.get("overallAssetsUnderManagement", {})
            if aum_data and isinstance(aum_data, dict):
                # convert_to_usd already maps "Not Available"/empty amounts to None.
                investor_data["aum"] = await self.convert_to_usd(
                    aum_data.get("aumAmount")
                )
                investor_data["aum_as_of_date"] = aum_data.get("asOfDate")
                investor_data["aum_source_url"] = aum_data.get("sourceUrl")

            # A JSON null (or any non-list) for these keys means "no entries"
            # rather than aborting the whole profile.
            investor_data["team_members"] = [
                {
                    "name": member.get("name"),
                    "title": member.get("title"),
                    "role": member.get("title"),  # Use title as role
                    "email": None,
                    "source_url": member.get("sourceUrl"),
                }
                for member in self._as_list(profile.get("seniorLeadership"))
                if isinstance(member, dict) and member.get("name")
            ]

            for fund in self._as_list(profile.get("funds")):
                if isinstance(fund, dict):
                    investor_data["funds"].append(await self._parse_fund(fund))

            return investor_data

        except Exception as e:
            print(f"Error processing investor profile for {name}: {e}")
            return None

    def _save_parsed_investor_to_db(
        self, db: Session, investor_data: dict
    ) -> Optional[InvestorTable]:
        """Save manually parsed investor data to database.

        Upserts by ``name``. For an existing investor, each field keeps its
        current value when the new one is falsy, and all of its team members
        and funds are deleted and replaced by those in ``investor_data``.
        The session is flushed but not committed; the caller commits.

        Returns:
            The InvestorTable row, or ``None`` on error. On error the session
            is rolled back, which discards ALL uncommitted work in it.
        """
        try:
            investor = (
                db.query(InvestorTable).filter_by(name=investor_data["name"]).first()
            )
            is_update = investor is not None

            if is_update:
                # Prefer new non-empty values, keep existing data otherwise.
                for field in self._PARSED_INVESTOR_FIELDS:
                    setattr(
                        investor,
                        field,
                        investor_data.get(field) or getattr(investor, field),
                    )
            else:
                investor = InvestorTable(
                    name=investor_data["name"],
                    **{
                        field: investor_data.get(field)
                        for field in self._PARSED_INVESTOR_FIELDS
                    },
                )
                db.add(investor)
            db.flush()  # ensures investor.id is available for the child rows

            if is_update:
                # Child rows are replaced wholesale on re-import, not merged.
                db.query(InvestorMember).filter_by(investor_id=investor.id).delete()
                db.query(FundTable).filter_by(investor_id=investor.id).delete()

            db.add_all(
                [
                    InvestorMember(
                        name=member_data.get("name"),
                        role=member_data.get("role"),
                        title=member_data.get("title"),
                        email=member_data.get("email"),
                        source_url=member_data.get("source_url"),
                        investor_id=investor.id,
                    )
                    for member_data in investor_data.get("team_members", [])
                ]
            )

            db.add_all(
                [
                    FundTable(
                        investor_id=investor.id,
                        fund_name=fund_data.get("fund_name"),
                        fund_size=fund_data.get("fund_size"),
                        fund_size_source_url=fund_data.get("fund_size_source_url"),
                        estimated_investment_size=fund_data.get(
                            "estimated_investment_size"
                        ),
                        source_url=fund_data.get("source_url"),
                        source_provider=fund_data.get("source_provider"),
                        geographic_focus=fund_data.get("geographic_focus"),
                        investment_stage_focus=fund_data.get("investment_stage_focus"),
                        sector_focus=fund_data.get("sector_focus"),
                    )
                    for fund_data in investor_data.get("funds", [])
                ]
            )

            return investor

        except Exception as e:
            print(f"Error saving investor to database: {e}")
            db.rollback()
            return None

    def _get_or_create_sector(self, db: Session, sector_name: str) -> SectorTable:
        """Return the SectorTable row named ``sector_name``, creating and flushing it if absent."""
        sector = db.query(SectorTable).filter(SectorTable.name == sector_name).first()
        if not sector:
            sector = SectorTable(name=sector_name)
            db.add(sector)
            db.flush()  # Get the ID without committing
        return sector

    def _save_investor_to_db(
        self, db: Session, investor_data: InvestorData
    ) -> InvestorTable:
        """Save legacy LLM-extracted investor data to database.

        Always creates a new InvestorTable row (no dedup by name), adds its
        team members, links sectors (created if needed) and portfolio
        companies (via ``_save_company_to_db`` with ``skip_investors=True``).
        Flushes but does not commit.
        """
        investor = InvestorTable(
            name=investor_data.investor.name,
            description=investor_data.investor.description,
            aum=investor_data.investor.aum,
            check_size_lower=investor_data.investor.check_size_lower,
            check_size_upper=investor_data.investor.check_size_upper,
            geographic_focus=investor_data.investor.geographic_focus,
            number_of_investments=investor_data.investor.number_of_investments,
        )
        db.add(investor)
        db.flush()  # Get the ID for the member rows

        for member_data in investor_data.team_members:
            db.add(
                InvestorMember(
                    name=member_data.name,
                    role=member_data.role,
                    email=member_data.email,
                    investor_id=investor.id,
                )
            )

        for sector_data in investor_data.sectors:
            sector = self._get_or_create_sector(db, sector_data.name)
            # The same sector listed twice would otherwise add a duplicate link row.
            if sector not in investor.sectors:
                investor.sectors.append(sector)

        for company_schema in investor_data.portfolio_companies:
            # Wrap the bare company schema in CompanyData; portfolio entries
            # carry no sectors/members/investors of their own.
            company_data = CompanyData(
                company=company_schema,
                sectors=[],
                members=[],
                investors=[],
            )
            company = self._save_company_to_db(db, company_data, skip_investors=True)
            # _save_company_to_db returns the existing row for repeated names.
            if company not in investor.portfolio_companies:
                investor.portfolio_companies.append(company)

        return investor

    def _save_company_to_db(
        self, db: Session, company_data: CompanyData, skip_investors: bool = False
    ) -> CompanyTable:
        """Save company data to database.

        Deduplicates by name: if a company with the same name exists it is
        returned unchanged (its members/sectors/investors are not updated).
        Otherwise a new row is created and flushed, named members are added,
        sectors are linked (created if needed) and, unless ``skip_investors``
        is set, investors that already exist in the database are linked by
        name; unknown investors are skipped. Does not commit.
        """
        existing_company = (
            db.query(CompanyTable)
            .filter(CompanyTable.name == company_data.company.name)
            .first()
        )
        if existing_company:
            return existing_company

        company = CompanyTable(
            name=company_data.company.name,
            industry=company_data.company.industry,
            location=company_data.company.location,
            description=company_data.company.description,
            founded_year=company_data.company.founded_year,
            website=company_data.company.website,
        )
        db.add(company)
        db.flush()  # Get the ID for the member rows

        for member_data in company_data.members:
            if member_data.name:  # Only add members with names
                db.add(
                    CompanyMember(
                        name=member_data.name,
                        linkedin=member_data.linkedin,
                        role=member_data.role,
                        company_id=company.id,
                    )
                )

        for sector_data in company_data.sectors:
            sector = self._get_or_create_sector(db, sector_data.name)
            if sector not in company.sectors:  # avoid duplicate link rows
                company.sectors.append(sector)

        # Add investors (if not skipping to avoid circular references)
        if not skip_investors:
            for investor_data in company_data.investors:
                existing_investor = (
                    db.query(InvestorTable)
                    .filter(InvestorTable.name == investor_data.name)
                    .first()
                )
                if existing_investor and existing_investor not in company.investors:
                    company.investors.append(existing_investor)

        return company

    async def _process_row(
        self, row: pd.Series, row_idx: int, is_investor: bool = True
    ) -> Optional[dict]:
        """Run one CSV row through the legacy structured-output LLM.

        Non-null cells are stringified with newline/CR/tab turned into spaces
        and other ASCII control characters removed, then joined as
        ``"col: value, ..."`` and sent to the investor or company LLM.

        Args:
            row: The DataFrame row.
            row_idx: 0-based row number, used only for log messages.
            is_investor: Select the InvestorData (True) or CompanyData schema.

        Returns:
            ``model_dump()`` of the LLM result, or ``None`` on an empty result
            or any error (the error is printed).
        """
        cleaned_row = {
            key: str(value).translate(self._CONTROL_CHAR_TABLE)
            for key, value in row.items()
            if pd.notna(value)
        }
        row_str = ", ".join(f"{key}: {value}" for key, value in cleaned_row.items())

        try:
            print(f"Processing row {row_idx + 1}...")
            llm = (
                self.investor_structured_llm
                if is_investor
                else self.company_structured_llm
            )
            result = await llm.ainvoke(row_str)
            return result.model_dump() if result else None
        except Exception as e:
            print(f"Error processing row {row_idx + 1}: {e}")
            return None

    @staticmethod
    def _cell_text(row: pd.Series, column: str, *, strip: bool = True) -> Optional[str]:
        """Return a cell as text (optionally stripped), or ``None`` if missing/NaN."""
        value = row.get(column)
        if pd.isna(value):
            return None
        text = str(value)
        return text.strip() if strip else text

    async def parse_investors(self, df: pd.DataFrame, save_to_db: bool = True):
        """
        Parse investors from DataFrame using manual JSON parsing and LLM for currency conversion.

        Expected CSV columns: Name, Website, Final Investor Profile, Final Profile sourcing

        Rows without a name or profile are skipped. Each successfully parsed
        investor is upserted and committed individually when ``save_to_db``
        is true; per-row errors are printed and do not stop the run.

        Returns:
            List of parsed investor dicts (including ones whose save failed).
        """
        results = []
        total_rows = len(df)
        db = get_db_session() if save_to_db else None

        try:
            print(f"\n🚀 Starting to process {total_rows} investors...")

            # Use a 1-based position rather than the index label so numbering
            # and the batch cadence work for any DataFrame index.
            for row_no, (_, row) in enumerate(df.iterrows(), start=1):
                try:
                    name = self._cell_text(row, "Name")
                    website = self._cell_text(row, "Website")
                    profile_json = self._cell_text(
                        row, "Final Investor Profile", strip=False
                    )

                    if not name or not profile_json:
                        print(f"⚠️ Row {row_no}: Skipping - missing name or profile")
                        continue

                    print(f"\n📊 Processing {row_no}/{total_rows}: {name}")

                    investor_data = await self.process_investor_profile(
                        name, website, profile_json
                    )

                    if investor_data:
                        results.append(investor_data)
                        print(" ✓ Parsed successfully")
                        print(f" - HQ: {investor_data.get('headquarters')}")
                        print(
                            f" - AUM: ${investor_data.get('aum'):,}"
                            if investor_data.get("aum")
                            else " - AUM: Not Available"
                        )
                        print(f" - Funds: {len(investor_data.get('funds', []))}")
                        print(
                            f" - Team: {len(investor_data.get('team_members', []))}"
                        )

                        if save_to_db and db:
                            try:
                                saved_investor = self._save_parsed_investor_to_db(
                                    db, investor_data
                                )
                                if saved_investor:
                                    db.commit()
                                    print(
                                        f" ✅ Saved to database (ID: {saved_investor.id})"
                                    )
                                else:
                                    print(" ❌ Failed to save to database")
                            except Exception as e:
                                db.rollback()
                                print(f" ❌ Database error: {e}")
                    else:
                        print(" ⚠️ Failed to process profile")

                    # Commit every 10 investors to avoid memory issues
                    # (each successful save above already commits).
                    if save_to_db and db and row_no % 10 == 0:
                        db.commit()
                        print(f"\n💾 Committed batch at row {row_no}")

                except Exception as e:
                    print(f"❌ Error processing row {row_no}: {e}")
                    if db:
                        db.rollback()
                    continue

            # Final commit
            if save_to_db and db:
                db.commit()
                print("\n✅ Final commit completed")

        except Exception as e:
            print(f"❌ Fatal error in parse_investors: {e}")
            if db:
                db.rollback()
        finally:
            if db:
                db.close()

        print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} investors")
        return results

    async def parse_companies(
        self,
        df: pd.DataFrame,
        save_to_db: bool = True,
        *,
        start_row: int = 20,
        batch_size: int = 20,
    ):
        """Parse companies from DataFrame and optionally save to database.

        Rows are sent to the legacy company LLM ``batch_size`` at a time
        (concurrently within a batch); each parsed company is saved and
        committed individually. Per-row errors are printed and skipped.

        Args:
            df: Companies DataFrame, one company per row.
            save_to_db: Persist each parsed company via ``_save_company_to_db``.
            start_row: Number of leading rows to skip (negative is treated as
                0). Defaults to 20 to keep the previously hard-coded
                ``df[20:]`` behaviour. NOTE(review): that looks like a leftover
                resume offset -- pass 0 to process every row, and confirm
                whether the default should become 0.
            batch_size: Rows per concurrent LLM batch (minimum 1).

        Returns:
            List of ``CompanyData`` that parsed successfully (including ones
            whose database save failed).
        """
        companies = []
        start_row = max(0, start_row)
        batch_size = max(1, batch_size)
        db = get_db_session() if save_to_db else None

        try:
            # (0-based position in the full df, row) pairs; positions keep log
            # row numbers meaningful regardless of the DataFrame's index.
            rows = [
                (position, row)
                for position, (_, row) in enumerate(
                    df.iloc[start_row:].iterrows(), start=start_row
                )
            ]
            total_batches = (len(rows) + batch_size - 1) // batch_size

            for batch_no, batch_start in enumerate(
                range(0, len(rows), batch_size), start=1
            ):
                batch = rows[batch_start : batch_start + batch_size]

                batch_results = await asyncio.gather(
                    *(
                        self._process_row(row, position, is_investor=False)
                        for position, row in batch
                    ),
                    return_exceptions=True,
                )

                for (position, _), result in zip(batch, batch_results):
                    if isinstance(result, BaseException):
                        print(f"Error processing row {position + 1}: {result}")
                        if db:
                            db.rollback()
                        continue

                    if not result:
                        continue

                    # A schema mismatch affects only this row, not the run.
                    try:
                        company_data = (
                            CompanyData(**result)
                            if isinstance(result, dict)
                            else result
                        )
                    except Exception as e:
                        print(f"Error processing row {position + 1}: {e}")
                        continue

                    companies.append(company_data)

                    if save_to_db and db:
                        try:
                            saved_company = self._save_company_to_db(
                                db, company_data
                            )
                            db.commit()
                            print(
                                f"✅ Saved company '{saved_company.name}' to database"
                            )
                        except Exception as e:
                            db.rollback()
                            print(f"❌ Failed to save company to database: {e}")

                print(f"Completed batch {batch_no} of {total_batches}")

        except Exception as e:
            # Loop variables may be unbound here, so don't reference them.
            print(f"Error processing companies: {e}")
            if db:
                db.rollback()
        finally:
            if db:
                db.close()

        return companies
|
|
|
|
|
|
# async def main():
|
|
# """Main execution function"""
|
|
# # Initialize database tables
|
|
# print("🔧 Initializing database...")
|
|
# init_database()
|
|
|
|
# # Create processor
|
|
# processor = InvestorProcessor()
|
|
|
|
# print("📊 Processing companies...")
|
|
# companies = await processor.parse_companies(
|
|
# "data/19 Companies data.csv", save_to_db=True
|
|
# )
|
|
# print(f"Processed {len(companies)} companies")
|
|
|
|
# print("\n💰 Processing investors...")
|
|
# investors = await processor.parse_investors(
|
|
# "data/19 Investors data.csv", save_to_db=True
|
|
# )
|
|
# print(f"Processed {len(investors)} investors")
|
|
# print("\n✨ Processing complete!")
|
|
|
|
|
|
# if __name__ == "__main__":
|
|
# asyncio.run(main())
|