"""Enrich investor records in the database from a CSV of enriched JSON data.

Each CSV row carries an investor name and a JSON document describing the
investor (description, AUM, leadership, funds, ...). Matching investors are
updated, unknown ones are created, and related team members and funds are
added or updated.
"""

import json
import logging

import pandas as pd
from sqlalchemy.orm import sessionmaker

from models import FundTable, InvestorMember, InvestorTable, engine, init_database

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize database (create tables if they don't exist)
init_database()

# Placeholder strings in the enrichment data that mean "no value".
_MISSING_MARKERS = frozenset(["Not Available", "null", "None", "", "0", "N/A"])

# (JSON key, InvestorTable attribute) pairs copied as-is when non-empty.
_INVESTOR_JSON_FIELDS = (
    ("investmentThesisFocus", "investment_thesis"),
    ("portfolioHighlights", "portfolio_highlights"),
    ("linkedDocuments", "linked_documents"),
    ("missingImportantFields", "missing_important_fields"),
    ("sources", "sources"),
)

# Emit a progress log line after every this many processed investors.
_PROGRESS_EVERY = 10


def clean_value(value):
    """Clean values, converting 'Not Available', 'null', etc. to None.

    Args:
        value: Any value read from the CSV or from the parsed JSON.

    Returns:
        None for missing scalars (None / NaN / pd.NA) and for placeholder
        strings such as "Not Available", "N/A", "null" or "0" (compared after
        stripping surrounding whitespace); otherwise ``value`` unchanged.
        Non-scalar values (lists, dicts) are returned unchanged.
    """
    # pd.isna() on a list returns an element-wise array whose truth value is
    # ambiguous, so only ask it about scalars.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    if isinstance(value, str) and value.strip() in _MISSING_MARKERS:
        return None
    return value


def parse_json_safely(json_str):
    """Safely parse a JSON string.

    Args:
        json_str: A JSON string, an already-parsed dict, or a missing value.

    Returns:
        The parsed JSON value (a dict input is returned unchanged), or None
        when the input is missing, blank, or not valid JSON. Parse errors
        are logged, not raised.
    """
    if isinstance(json_str, dict):
        return json_str
    # Guard pd.isna() against non-scalars (see clean_value).
    if pd.api.types.is_scalar(json_str) and pd.isna(json_str):
        return None
    if isinstance(json_str, str) and not json_str.strip():
        return None
    try:
        return json.loads(json_str)
    except (json.JSONDecodeError, TypeError) as e:
        logger.error(f"Error parsing JSON: {e}")
        return None


def _find_investor(session, investor_name, website):
    """Return the investor matching ``investor_name``, else ``website``, else None."""
    if investor_name:
        investor = session.query(InvestorTable).filter_by(name=investor_name).first()
        if investor:
            return investor
    # Only search by website when it is a real value: filter_by(website=None)
    # renders "website IS NULL" and would match an unrelated investor.
    if website:
        return session.query(InvestorTable).filter_by(website=website).first()
    return None


def _update_investor_fields(investor, investor_data):
    """Copy scalar and JSON fields from ``investor_data`` onto ``investor``.

    Text fields are only overwritten when the new value is not a
    placeholder. AUM fields are replaced as a group whenever an AUM object
    is present.
    """
    investor.description = (
        clean_value(investor_data.get("investorDescription")) or investor.description
    )
    investor.website = clean_value(investor_data.get("websiteURL")) or investor.website
    investor.headquarters = (
        clean_value(investor_data.get("headquarters")) or investor.headquarters
    )

    # Handle AUM (ignored unless it is a non-empty JSON object)
    aum_data = investor_data.get("overallAssetsUnderManagement")
    if isinstance(aum_data, dict) and aum_data:
        investor.aum = clean_value(aum_data.get("aumAmount"))
        investor.aum_as_of_date = clean_value(aum_data.get("asOfDate"))
        investor.aum_source_url = clean_value(aum_data.get("sourceUrl"))

    # JSON-valued columns (arrays/objects) are stored as-is when non-empty.
    for json_key, attr in _INVESTOR_JSON_FIELDS:
        value = investor_data.get(json_key)
        if value:
            setattr(investor, attr, value)

    notes = clean_value(investor_data.get("researcherNotes"))
    if notes:
        investor.researcher_notes = notes


def _add_team_members(session, investor, leadership):
    """Add leadership entries not yet stored for ``investor``; return how many were added."""
    created = 0
    # JSON null or a malformed entry must not abort the whole row.
    for member_data in leadership or []:
        if not isinstance(member_data, dict):
            continue
        member_name = clean_value(member_data.get("name"))
        if not member_name:
            continue
        existing_member = (
            session.query(InvestorMember)
            .filter_by(investor_id=investor.id, name=member_name)
            .first()
        )
        if existing_member:
            continue
        title = clean_value(member_data.get("title"))
        session.add(
            InvestorMember(
                investor_id=investor.id,
                name=member_name,
                title=title,
                role=title,  # Use title as role
                source_url=clean_value(member_data.get("sourceUrl")),
            )
        )
        created += 1
    return created


def _upsert_funds(session, investor, funds):
    """Create or update ``investor``'s funds; return how many were newly created.

    A fund is matched by (investor_id, fund_name). Funds without a name
    cannot be matched, so a new one is always created for them.
    """
    created = 0
    for fund_data in funds or []:
        if not isinstance(fund_data, dict):
            continue
        fund_name = clean_value(fund_data.get("fundName"))
        fund = None
        if fund_name:
            fund = (
                session.query(FundTable)
                .filter_by(investor_id=investor.id, fund_name=fund_name)
                .first()
            )
        if fund is None:
            fund = FundTable(investor_id=investor.id)
            session.add(fund)
            created += 1

        fund.fund_name = fund_name
        fund.fund_size = clean_value(fund_data.get("fundSize"))
        fund.fund_size_source_url = clean_value(fund_data.get("fundSizeSourceUrl"))
        fund.estimated_investment_size = clean_value(
            fund_data.get("estimatedInvestmentSize")
        )
        fund.source_url = clean_value(fund_data.get("sourceUrl"))
        fund.source_provider = clean_value(fund_data.get("sourceProvider"))
        fund.geographic_focus = fund_data.get("geographicFocus")
        fund.investment_stage_focus = fund_data.get("investmentStageFocus")
        fund.sector_focus = fund_data.get("sectorFocus")
    return created


def _process_row(session, index, row, investor_name_column, enriched_data_column):
    """Stage all changes for one CSV row in ``session`` without committing.

    Returns:
        None if the row was skipped, otherwise a tuple
        ``(investor_was_created, funds_created, team_members_created)``.
    """
    investor_data = parse_json_safely(row.get(enriched_data_column))
    if not isinstance(investor_data, dict) or not investor_data:
        logger.warning(f"Row {index}: No valid JSON data")
        return None

    # Empty CSV cells arrive as NaN (truthy), so normalise them to None.
    investor_name = clean_value(row.get(investor_name_column))
    if isinstance(investor_name, str):
        investor_name = investor_name.strip()
    website = clean_value(investor_data.get("websiteURL"))

    investor = _find_investor(session, investor_name, website)
    created = investor is None
    if created:
        if not investor_name:
            logger.warning(f"Row {index}: No investor name found, skipping")
            return None
        investor = InvestorTable(name=investor_name)
        session.add(investor)
        session.flush()  # Get ID for new investor
        logger.info(f"Created new investor: {investor_name}")

    _update_investor_fields(investor, investor_data)
    members = _add_team_members(
        session, investor, investor_data.get("seniorLeadership")
    )
    funds = _upsert_funds(session, investor, investor_data.get("funds"))
    return created, funds, members


def _log_summary(
    investors_updated,
    investors_created,
    funds_created,
    team_members_created,
    rows_skipped,
    errors,
):
    """Log the end-of-run summary and the first few errors."""
    logger.info("\n" + "=" * 60)
    logger.info("🎉 ENRICHMENT COMPLETE!")
    logger.info("=" * 60)
    logger.info(f" Investors Updated: {investors_updated}")
    logger.info(f" Investors Created: {investors_created}")
    logger.info(f" Funds Created: {funds_created}")
    logger.info(f" Team Members Created: {team_members_created}")
    logger.info(f" Rows Skipped: {rows_skipped}")
    logger.info(f" Errors: {len(errors)}")

    if errors:
        logger.info(f"\n❌ Errors encountered ({len(errors)}):")
        for error in errors[:5]:  # Show first 5
            logger.info(f" Row {error['row']}: {error['error']}")
        if len(errors) > 5:
            logger.info(f" ... and {len(errors) - 5} more errors")

    logger.info("=" * 60)


def enrich_investors(
    csv_file_path: str,
    investor_name_column: str = "investor_name",
    enriched_data_column: str = "enriched_data",
):
    """Enrich investors from a CSV containing enriched JSON data.

    Each row is committed on its own. A failing row is rolled back and
    recorded without discarding rows that were already processed.

    Args:
        csv_file_path: Path to CSV file with enriched investor data.
        investor_name_column: Column name containing the investor name.
        enriched_data_column: Column name containing the JSON data.
    """
    logger.info(f"Loading enriched investors from: {csv_file_path}")
    enriched_df = pd.read_csv(csv_file_path)
    logger.info(f"📊 Enriched Investors CSV: {len(enriched_df)} rows")

    if enriched_data_column not in enriched_df.columns:
        logger.error(
            f"Column '{enriched_data_column}' not found in CSV "
            f"(available: {list(enriched_df.columns)})"
        )
        return

    investors_updated = 0
    investors_created = 0
    funds_created = 0
    team_members_created = 0
    rows_skipped = 0
    errors = []

    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        for index, row in enriched_df.iterrows():
            try:
                result = _process_row(
                    session, index, row, investor_name_column, enriched_data_column
                )
                if result is not None:
                    # Commit per row: a rollback after a later failure would
                    # otherwise also discard earlier, already-counted rows.
                    session.commit()
            except Exception as e:
                session.rollback()
                logger.exception(f"Error processing row {index}: {e}")
                errors.append({"row": index, "error": str(e)})
                continue

            if result is None:
                rows_skipped += 1
                continue

            # Counters are only updated after the row's commit succeeded.
            was_created, row_funds, row_members = result
            if was_created:
                investors_created += 1
            else:
                investors_updated += 1
            funds_created += row_funds
            team_members_created += row_members

            processed = investors_updated + investors_created
            if processed % _PROGRESS_EVERY == 0:
                logger.info(
                    f" Processed {processed} investors, "
                    f"created {funds_created} funds, {team_members_created} team members"
                )
    finally:
        session.close()

    _log_summary(
        investors_updated,
        investors_created,
        funds_created,
        team_members_created,
        rows_skipped,
        errors,
    )


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print(
            "Usage: python enrich_investors.py [investor_name_column] [enriched_data_column]"
        )
        print("\nExample:")
        print(" python enrich_investors.py enriched_investors.csv")
        print(" python enrich_investors.py enriched_investors.csv 'name' 'data'")
        sys.exit(1)

    csv_file = sys.argv[1]
    investor_col = sys.argv[2] if len(sys.argv) > 2 else "investor_name"
    data_col = sys.argv[3] if len(sys.argv) > 3 else "enriched_data"

    enrich_investors(csv_file, investor_col, data_col)