Added funds table
This commit is contained in:
@@ -0,0 +1,287 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pandas as pd
|
||||
from models import FundTable, InvestorMember, InvestorTable, engine, init_database
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Initialize database (create tables if they don't exist)
|
||||
init_database()
|
||||
|
||||
|
||||
def clean_value(value):
|
||||
"""Clean values, converting 'Not Available', 'null', etc. to None"""
|
||||
if pd.isna(value):
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
if value.strip() in ["Not Available", "null", "None", "", "0", "N/A"]:
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
def parse_json_safely(json_str):
|
||||
"""Safely parse JSON string"""
|
||||
try:
|
||||
if pd.isna(json_str) or json_str == "":
|
||||
return None
|
||||
if isinstance(json_str, dict):
|
||||
return json_str
|
||||
return json.loads(json_str)
|
||||
except (json.JSONDecodeError, TypeError) as e:
|
||||
logger.error(f"Error parsing JSON: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def enrich_investors(
|
||||
csv_file_path: str,
|
||||
investor_name_column: str = "investor_name",
|
||||
enriched_data_column: str = "enriched_data",
|
||||
):
|
||||
"""
|
||||
Enrich investors from CSV containing enriched JSON data.
|
||||
|
||||
Args:
|
||||
csv_file_path: Path to CSV file with enriched investor data
|
||||
investor_name_column: Column name containing investor name
|
||||
enriched_data_column: Column name containing JSON data
|
||||
"""
|
||||
Session = sessionmaker(bind=engine)
|
||||
session = Session()
|
||||
|
||||
# Load enriched data
|
||||
logger.info(f"Loading enriched investors from: {csv_file_path}")
|
||||
enriched_df = pd.read_csv(csv_file_path)
|
||||
|
||||
logger.info(f"📊 Enriched Investors CSV: {len(enriched_df)} rows")
|
||||
|
||||
investors_updated = 0
|
||||
investors_created = 0
|
||||
funds_created = 0
|
||||
team_members_created = 0
|
||||
investors_not_found = []
|
||||
errors = []
|
||||
|
||||
for index, row in enriched_df.iterrows():
|
||||
try:
|
||||
# Parse the JSON data column
|
||||
investor_data = parse_json_safely(row.get(enriched_data_column))
|
||||
|
||||
if not investor_data:
|
||||
logger.warning(f"Row {index}: No valid JSON data")
|
||||
continue
|
||||
|
||||
# Get investor name from row or JSON
|
||||
investor_name = row.get(investor_name_column)
|
||||
if not investor_name and investor_data.get("websiteURL"):
|
||||
# Try to match by website if name not in CSV
|
||||
investor_name = None
|
||||
website = clean_value(investor_data.get("websiteURL"))
|
||||
|
||||
# Find or create investor
|
||||
investor = None
|
||||
if investor_name:
|
||||
investor = (
|
||||
session.query(InvestorTable).filter_by(name=investor_name).first()
|
||||
)
|
||||
|
||||
if not investor and investor_data.get("websiteURL"):
|
||||
website = clean_value(investor_data.get("websiteURL"))
|
||||
investor = (
|
||||
session.query(InvestorTable).filter_by(website=website).first()
|
||||
)
|
||||
|
||||
# Create new investor if not found
|
||||
if not investor:
|
||||
if not investor_name:
|
||||
logger.warning(f"Row {index}: No investor name found, skipping")
|
||||
continue
|
||||
|
||||
investor = InvestorTable(name=investor_name)
|
||||
session.add(investor)
|
||||
session.flush() # Get ID for new investor
|
||||
investors_created += 1
|
||||
logger.info(f"Created new investor: {investor_name}")
|
||||
else:
|
||||
investors_updated += 1
|
||||
|
||||
# Update investor fields
|
||||
investor.description = (
|
||||
clean_value(investor_data.get("investorDescription"))
|
||||
or investor.description
|
||||
)
|
||||
investor.website = (
|
||||
clean_value(investor_data.get("websiteURL")) or investor.website
|
||||
)
|
||||
investor.headquarters = (
|
||||
clean_value(investor_data.get("headquarters")) or investor.headquarters
|
||||
)
|
||||
|
||||
# Handle AUM
|
||||
aum_data = investor_data.get("overallAssetsUnderManagement", {})
|
||||
if aum_data:
|
||||
investor.aum = clean_value(aum_data.get("aumAmount"))
|
||||
investor.aum_as_of_date = clean_value(aum_data.get("asOfDate"))
|
||||
investor.aum_source_url = clean_value(aum_data.get("sourceUrl"))
|
||||
|
||||
# Handle investment thesis (stored as JSON array)
|
||||
thesis = investor_data.get("investmentThesisFocus")
|
||||
if thesis:
|
||||
investor.investment_thesis = thesis
|
||||
|
||||
# Handle portfolio highlights (stored as JSON array)
|
||||
portfolio = investor_data.get("portfolioHighlights")
|
||||
if portfolio:
|
||||
investor.portfolio_highlights = portfolio
|
||||
|
||||
# Handle linked documents
|
||||
linked_docs = investor_data.get("linkedDocuments")
|
||||
if linked_docs:
|
||||
investor.linked_documents = linked_docs
|
||||
|
||||
# Handle researcher notes
|
||||
notes = investor_data.get("researcherNotes")
|
||||
if notes:
|
||||
investor.researcher_notes = clean_value(notes)
|
||||
|
||||
# Handle missing important fields
|
||||
missing_fields = investor_data.get("missingImportantFields")
|
||||
if missing_fields:
|
||||
investor.missing_important_fields = missing_fields
|
||||
|
||||
# Handle sources
|
||||
sources = investor_data.get("sources")
|
||||
if sources:
|
||||
investor.sources = sources
|
||||
|
||||
# Process senior leadership / team members
|
||||
leadership = investor_data.get("seniorLeadership", [])
|
||||
for member_data in leadership:
|
||||
# Check if member already exists
|
||||
member_name = clean_value(member_data.get("name"))
|
||||
if not member_name:
|
||||
continue
|
||||
|
||||
existing_member = (
|
||||
session.query(InvestorMember)
|
||||
.filter_by(investor_id=investor.id, name=member_name)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not existing_member:
|
||||
member = InvestorMember(
|
||||
investor_id=investor.id,
|
||||
name=member_name,
|
||||
title=clean_value(member_data.get("title")),
|
||||
role=clean_value(member_data.get("title")), # Use title as role
|
||||
source_url=clean_value(member_data.get("sourceUrl")),
|
||||
)
|
||||
session.add(member)
|
||||
team_members_created += 1
|
||||
|
||||
# Process funds
|
||||
funds = investor_data.get("funds", [])
|
||||
for fund_data in funds:
|
||||
# Check if fund already exists (by name and investor)
|
||||
fund_name = clean_value(fund_data.get("fundName"))
|
||||
|
||||
# Always create new fund or update if exists
|
||||
existing_fund = None
|
||||
if fund_name:
|
||||
existing_fund = (
|
||||
session.query(FundTable)
|
||||
.filter_by(investor_id=investor.id, fund_name=fund_name)
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing_fund:
|
||||
# Update existing fund
|
||||
fund = existing_fund
|
||||
else:
|
||||
# Create new fund
|
||||
fund = FundTable(investor_id=investor.id)
|
||||
session.add(fund)
|
||||
funds_created += 1
|
||||
|
||||
# Update fund fields
|
||||
fund.fund_name = fund_name
|
||||
fund.fund_size = clean_value(fund_data.get("fundSize"))
|
||||
fund.fund_size_source_url = clean_value(
|
||||
fund_data.get("fundSizeSourceUrl")
|
||||
)
|
||||
fund.estimated_investment_size = clean_value(
|
||||
fund_data.get("estimatedInvestmentSize")
|
||||
)
|
||||
fund.source_url = clean_value(fund_data.get("sourceUrl"))
|
||||
fund.source_provider = clean_value(fund_data.get("sourceProvider"))
|
||||
fund.geographic_focus = fund_data.get("geographicFocus")
|
||||
fund.investment_stage_focus = fund_data.get("investmentStageFocus")
|
||||
fund.sector_focus = fund_data.get("sectorFocus")
|
||||
|
||||
# Commit every 10 investors
|
||||
if (investors_updated + investors_created) % 10 == 0:
|
||||
session.commit()
|
||||
logger.info(
|
||||
f" Processed {investors_updated + investors_created} investors, "
|
||||
f"created {funds_created} funds, {team_members_created} team members"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing row {index}: {e}")
|
||||
session.rollback()
|
||||
errors.append({"row": index, "error": str(e)})
|
||||
continue
|
||||
|
||||
# Final commit
|
||||
session.commit()
|
||||
|
||||
# Print summary
|
||||
logger.info("\n" + "=" * 60)
|
||||
logger.info("🎉 ENRICHMENT COMPLETE!")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f" Investors Updated: {investors_updated}")
|
||||
logger.info(f" Investors Created: {investors_created}")
|
||||
logger.info(f" Funds Created: {funds_created}")
|
||||
logger.info(f" Team Members Created: {team_members_created}")
|
||||
logger.info(f" Errors: {len(errors)}")
|
||||
|
||||
if investors_not_found:
|
||||
logger.info(
|
||||
f"\n⚠️ Investors not found in database ({len(investors_not_found)}):"
|
||||
)
|
||||
for name in investors_not_found[:10]: # Show first 10
|
||||
logger.info(f" - {name}")
|
||||
if len(investors_not_found) > 10:
|
||||
logger.info(f" ... and {len(investors_not_found) - 10} more")
|
||||
|
||||
if errors:
|
||||
logger.info(f"\n❌ Errors encountered ({len(errors)}):")
|
||||
for error in errors[:5]: # Show first 5
|
||||
logger.info(f" Row {error['row']}: {error['error']}")
|
||||
if len(errors) > 5:
|
||||
logger.info(f" ... and {len(errors) - 5} more errors")
|
||||
|
||||
session.close()
|
||||
logger.info("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print(
|
||||
"Usage: python enrich_investors.py <csv_file_path> [investor_name_column] [enriched_data_column]"
|
||||
)
|
||||
print("\nExample:")
|
||||
print(" python enrich_investors.py enriched_investors.csv")
|
||||
print(" python enrich_investors.py enriched_investors.csv 'name' 'data'")
|
||||
sys.exit(1)
|
||||
|
||||
csv_file = sys.argv[1]
|
||||
investor_col = sys.argv[2] if len(sys.argv) > 2 else "investor_name"
|
||||
data_col = sys.argv[3] if len(sys.argv) > 3 else "enriched_data"
|
||||
|
||||
enrich_investors(csv_file, investor_col, data_col)
|
||||
Reference in New Issue
Block a user