import json
import os
import re
from datetime import date
from typing import Optional

import pandas as pd
from db.db import get_db_session
from db.models import (
    CompanyMember,
    CompanyTable,
    FundTable,
    InvestmentStageTable,
    InvestorMember,
    InvestorTable,
    SectorTable,
)
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from schemas.py_schemas import CompanyData, InvestorData
from sqlalchemy.orm import Session

# Regexes used to pull a founding year out of a free-text company description.
# Compiled once; tried in order, and the first match inside the plausible year
# range wins.
_FOUNDED_YEAR_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"founded in (\d{4})",
        r"founded (\d{4})",
        r"Gegründet (\d{4})",  # German for "founded"
        r"established in (\d{4})",
        r"since (\d{4})",
        r"\((\d{4})\)",  # Year in parentheses
    )
)
# Earliest founding year accepted; the upper bound is the current calendar year.
_MIN_FOUNDED_YEAR = 1900

# Placeholder values meaning "no amount"; these are never sent to the LLM.
_MISSING_AMOUNT_MARKERS = frozenset({"", "0", "Not Available"})

# Scalar investor columns copied from a parsed profile. On update, a falsy new
# value keeps whatever is already stored.
_PARSED_INVESTOR_FIELDS = (
    "website",
    "headquarters",
    "description",
    "aum",
    "aum_as_of_date",
    "aum_source_url",
    "investment_thesis",
    "portfolio_highlights",
    "linked_documents",
    "researcher_notes",
    "missing_important_fields",
    "sources",
)

# Scalar company columns copied from a parsed profile (same update rule as above).
_PARSED_COMPANY_FIELDS = ("website", "location", "description", "industry", "founded_year")


class CurrencyConversion(BaseModel):
    """Schema for LLM currency conversion responses"""

    amount_usd: int = 0  # Values <= 0 are treated as "not converted" by convert_to_usd
    confidence: str = "high"  # high, medium, low
    notes: str = ""


class CheckSizeRange(BaseModel):
    """Schema for LLM check size range parsing from estimated investment size"""

    lower_bound_usd: int = 0  # Values <= 0 are mapped to None by parse_check_size_range
    upper_bound_usd: int = 0
    confidence: str = "high"  # high, medium, low
    notes: str = ""


class InvestorProcessor:
    """Parse investor/company research rows and persist them via SQLAlchemy.

    Profiles are read from the JSON column of each CSV row and mapped field by
    field. The LLM (``openai/gpt-4o-mini`` through OpenRouter) is only used to
    turn free-text money amounts into USD integers, plus the legacy whole-row
    structured extraction in ``_process_row``.
    """

    def __init__(self):
        self.llm = ChatOpenAI(
            api_key=os.getenv("OPENROUTER_API_KEY"),
            base_url="https://openrouter.ai/api/v1",
            model="openai/gpt-4o-mini",
            temperature=0,
        )

        # Structured LLMs for specific parsing tasks
        self.currency_converter_llm = self.llm.with_structured_output(
            CurrencyConversion
        )
        self.check_size_parser_llm = self.llm.with_structured_output(CheckSizeRange)

        # Keep legacy structured LLMs for backward compatibility
        self.investor_structured_llm = self.llm.with_structured_output(InvestorData)
        self.company_structured_llm = self.llm.with_structured_output(CompanyData)

    # ------------------------------------------------------------------
    # Small normalization helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_missing_amount(value) -> bool:
        """Return True if ``value`` is None/NaN/empty or a "no amount" placeholder."""
        if pd.api.types.is_scalar(value) and pd.isna(value):  # None, NaN, pd.NA
            return True
        if not value:  # "", 0, empty containers
            return True
        return str(value).strip() in _MISSING_AMOUNT_MARKERS

    @staticmethod
    def _clean_cell(row: pd.Series, column: str) -> Optional[str]:
        """Return ``row[column]`` as a stripped string, or None if absent, NaN, or blank."""
        value = row.get(column)
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return None
        text = str(value).strip()
        return text or None

    @staticmethod
    def _as_text(value) -> Optional[str]:
        """Flatten a profile value into a single string for a text column.

        Strings pass through unchanged. Lists/tuples are joined with ", " after
        dropping None/blank items, giving None if nothing is left. Any other
        type gives None.
        """
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            parts = (str(item).strip() for item in value if item is not None)
            joined = ", ".join(part for part in parts if part)
            return joined or None
        return None

    @staticmethod
    def _unique_names(values) -> list[str]:
        """Normalize a name collection into a de-duplicated list of non-blank strings.

        Accepts a list/tuple/set, or a single scalar. A bare string is treated as
        one name rather than being iterated character by character. The order of
        first appearance is kept. None gives an empty list.
        """
        if values is None:
            return []
        if isinstance(values, str) or not isinstance(values, (list, tuple, set)):
            values = [values]
        unique: dict[str, None] = {}
        for value in values:
            if value is None:
                continue
            name = str(value).strip()
            if name:
                unique.setdefault(name, None)
        return list(unique)

    # ------------------------------------------------------------------
    # LLM-backed amount parsing
    # ------------------------------------------------------------------

    async def convert_to_usd(self, amount_str: str) -> Optional[int]:
        """
        Use LLM to convert currency amounts to USD integers.

        Handles formats like:
        - "EUR 850,000,000"
        - "$5M"
        - "GBP 10-20 million"
        - "Approximately EUR 100 million"

        Returns None for missing/placeholder input ("", "0", "Not Available",
        NaN) and for a non-positive LLM result. It also returns None when the LLM
        call fails; the error is printed and not raised.
        """
        if self._is_missing_amount(amount_str):
            return None

        try:
            prompt = f"""Convert this amount to USD as an integer (whole number, no decimals).
If it's a range, use the midpoint. If already in USD, just extract the number.
Remove all commas and convert millions/billions to actual numbers.

Amount: {amount_str}

Examples:
- "EUR 850,000,000" -> 935000000 (assuming EUR to USD rate ~1.10)
- "$5M" -> 5000000
- "GBP 10-20 million" -> 18000000 (midpoint 15M * 1.20 rate)
- "Approximately EUR 100 million" -> 110000000

Return only the USD integer amount with current exchange rates."""

            result = await self.currency_converter_llm.ainvoke(prompt)
            return result.amount_usd if result.amount_usd > 0 else None
        except Exception as e:
            # Best-effort: a failed conversion should not abort the whole profile.
            print(f"Error converting currency '{amount_str}': {e}")
            return None

    async def parse_check_size_range(
        self, estimated_investment_str: str
    ) -> tuple[Optional[int], Optional[int]]:
        """
        Use LLM to parse check size range from estimated investment size string.

        Returns tuple of (lower_bound_usd, upper_bound_usd).
        Each bound is None when it is missing or non-positive (so "up to X"
        yields (None, X)). Both are None for placeholder input or an LLM error.

        Handles formats like:
        - "EUR 1,000 to 2,000"
        - "$100K-$500K"
        - "Between $1M and $5M"
        - "Up to EUR 10 million"
        - "$2M typical"
        """
        if self._is_missing_amount(estimated_investment_str):
            return None, None

        try:
            prompt = f"""Parse this check size/investment range into lower and upper bounds in USD as integers.

Input: {estimated_investment_str}

Instructions:
- If it's a range (e.g., "EUR 1M to 5M"), extract both bounds
- If it's a single amount (e.g., "$2M typical"), use it as both lower and upper
- If it says "up to X", use 0 as lower and X as upper
- Convert all currencies to USD using current exchange rates
- Return integers (whole numbers, no decimals)

Examples:
- "EUR 1,000 to 2,000" -> lower: 1100, upper: 2200
- "$100K-$500K" -> lower: 100000, upper: 500000
- "Between $1M and $5M" -> lower: 1000000, upper: 5000000
- "Up to EUR 10 million" -> lower: 0, upper: 11000000
- "$2M typical" -> lower: 2000000, upper: 2000000
- "GBP 500K-2M" -> lower: 600000, upper: 2400000

Return the lower and upper bounds in USD."""

            result = await self.check_size_parser_llm.ainvoke(prompt)
            lower = result.lower_bound_usd if result.lower_bound_usd > 0 else None
            upper = result.upper_bound_usd if result.upper_bound_usd > 0 else None
            return lower, upper
        except Exception as e:
            print(f"Error parsing check size range '{estimated_investment_str}': {e}")
            return None, None

    # ------------------------------------------------------------------
    # Profile JSON -> plain dicts
    # ------------------------------------------------------------------

    def parse_json_profile(self, json_str: str) -> Optional[dict]:
        """
        Manually parse the JSON profile from the CSV.

        Returns the decoded JSON object as a dict. Returns None for a missing,
        blank or non-string cell, for invalid JSON, and for JSON whose top level
        is not an object.
        """
        if not isinstance(json_str, (str, bytes, bytearray)) or not json_str.strip():
            return None

        try:
            profile = json.loads(json_str)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing JSON: {e}")
            return None

        if not isinstance(profile, dict):
            print(f"Error parsing JSON: expected an object, got {type(profile).__name__}")
            return None
        return profile

    async def process_investor_profile(
        self, name: str, website: str, profile_json: str
    ) -> Optional[dict]:
        """
        Process investor profile from CSV data.
        Manually extracts fields and uses LLM only for currency conversion.

        Returns a plain dict in the shape expected by ``_save_parsed_investor_to_db``,
        including ``team_members`` and ``funds`` lists. Returns None if the profile
        JSON is unusable or extraction raises; the error is printed.
        """
        profile = self.parse_json_profile(profile_json)
        if not profile:
            return None

        try:
            # Extract basic info
            investor_data = {
                "name": name.strip() if name else None,
                "website": website.strip() if website else None,
                "headquarters": profile.get("headquarters"),
                "description": profile.get("investorDescription"),
                "aum": None,
                "aum_as_of_date": None,
                "aum_source_url": None,
                "investment_thesis": profile.get("investmentThesisFocus", []),
                "portfolio_highlights": profile.get("portfolioHighlights", []),
                "linked_documents": profile.get("linkedDocuments", []),
                "researcher_notes": profile.get("researcherNotes"),
                "missing_important_fields": profile.get("missingImportantFields", []),
                "sources": profile.get("sources", {}),
                "team_members": [],
                "funds": [],
            }

            # Process AUM (as-of date / source only recorded when an amount exists)
            aum_data = profile.get("overallAssetsUnderManagement", {})
            if aum_data and isinstance(aum_data, dict):
                aum_amount = aum_data.get("aumAmount")
                if aum_amount and aum_amount != "Not Available":
                    investor_data["aum"] = await self.convert_to_usd(aum_amount)
                    investor_data["aum_as_of_date"] = aum_data.get("asOfDate")
                    investor_data["aum_source_url"] = aum_data.get("sourceUrl")

            # Process senior leadership; `or []` tolerates an explicit null in the JSON
            for member in profile.get("seniorLeadership") or []:
                if isinstance(member, dict) and member.get("name"):
                    investor_data["team_members"].append(
                        {
                            "name": member.get("name"),
                            "title": member.get("title"),
                            "role": member.get("title"),  # Use title as role
                            "email": None,
                            "source_url": member.get("sourceUrl"),
                        }
                    )

            # Process funds
            for fund in profile.get("funds") or []:
                if isinstance(fund, dict):
                    investor_data["funds"].append(await self._parse_fund(fund))

            return investor_data

        except Exception as e:
            print(f"Error processing investor profile for {name}: {e}")
            return None

    async def _parse_fund(self, fund: dict) -> dict:
        """Map one raw ``funds[]`` entry of an investor profile into a fund dict.

        Fund size and check-size bounds become USD integers (via the LLM).
        Geographic focus becomes a comma-separated string. Stage and sector
        names become de-duplicated lists of strings.
        """
        fund_data = {
            "fund_name": fund.get("fundName"),
            "fund_size": None,
            "fund_size_source_url": fund.get("fundSizeSourceUrl"),
            "check_size_lower": None,
            "check_size_upper": None,
            "source_url": fund.get("sourceUrl"),
            "source_provider": fund.get("sourceProvider"),
            "geographic_focus": self._as_text(fund.get("geographicFocus")),
            # Normalized so a bare string is not split into characters downstream
            "investment_stage_names": self._unique_names(
                fund.get("investmentStageFocus")
            ),
            "sector_names": self._unique_names(fund.get("sectorFocus")),
        }

        # Convert fund size to USD integer
        fund_size_str = fund.get("fundSize")
        if fund_size_str and fund_size_str != "Not Available":
            fund_data["fund_size"] = await self.convert_to_usd(fund_size_str)

        # Parse check size range from estimated investment size
        est_size_str = fund.get("estimatedInvestmentSize")
        if est_size_str and est_size_str != "Not Available":
            check_lower, check_upper = await self.parse_check_size_range(est_size_str)
            fund_data["check_size_lower"] = check_lower
            fund_data["check_size_upper"] = check_upper

        return fund_data

    async def process_company_profile(
        self,
        name: str,
        website: str,
        profile_json: str,
        investor_names: Optional[str] = None,
    ) -> Optional[dict]:
        """
        Process company profile from CSV data.
        Manually extracts fields without using LLM.

        ``investor_names`` is the raw comma-separated "Investor" cell. Returns a
        plain dict in the shape expected by ``_save_parsed_company_to_db``, or
        None if the profile JSON is unusable or extraction raises.
        """
        profile = self.parse_json_profile(profile_json)
        if not profile:
            return None

        try:
            # Extract basic info
            company_data = {
                "name": name.strip() if name else None,
                "website": website.strip() if website else None,
                "description": profile.get("companyDescription"),
                # Text columns: flatten list values so they can be stored
                "location": self._as_text(profile.get("geographicFocus")),
                "industry": self._as_text(profile.get("sectorDescription")),
                "founded_year": None,  # Not typically in the company JSON
                "key_executives": [],
                "client_categories": profile.get("clientCategories", []),
                "product_description": profile.get("productDescription"),
                "linked_documents": profile.get("linkedDocuments", []),
                "researcher_notes": profile.get("researcherNotes"),
                "missing_important_fields": profile.get("missingImportantFields", []),
                "sources": profile.get("sources", {}),
                "investor_names": [],
            }

            # Parse investor names from the Investor column
            if investor_names and pd.notna(investor_names):
                company_data["investor_names"] = self._unique_names(
                    str(investor_names).split(",")
                )

            # Process key executives, falling back to the investor-style field name
            key_executives = (
                profile.get("keyExecutives") or profile.get("seniorLeadership") or []
            )
            for exec_member in key_executives:
                if isinstance(exec_member, dict) and exec_member.get("name"):
                    company_data["key_executives"].append(
                        {
                            "name": exec_member.get("name"),
                            "title": exec_member.get("title"),
                            "source_url": exec_member.get("sourceUrl"),
                        }
                    )

            company_data["founded_year"] = self._extract_founded_year(
                company_data["description"]
            )
            return company_data

        except Exception as e:
            print(f"Error processing company profile for {name}: {e}")
            return None

    @staticmethod
    def _extract_founded_year(description) -> Optional[int]:
        """Best-effort founding year from free text.

        Matches phrases such as "founded in 2020", "Gegründet 2020" or "(2019)".
        Patterns are tried in order. If a pattern's first match falls outside
        [1900, current year], the next pattern is tried. Returns None when no
        plausible year is found or ``description`` is not a non-empty string.
        """
        if not isinstance(description, str) or not description:
            return None

        latest_year = date.today().year
        for pattern in _FOUNDED_YEAR_PATTERNS:
            match = pattern.search(description)
            if match:
                year = int(match.group(1))
                if _MIN_FOUNDED_YEAR <= year <= latest_year:  # Sanity check
                    return year
        return None

    # ------------------------------------------------------------------
    # Persistence of parsed dicts (callers commit)
    # ------------------------------------------------------------------

    def _save_parsed_company_to_db(
        self, db: Session, company_data: dict
    ) -> Optional[CompanyTable]:
        """Save manually parsed company data to database.

        Upserts by exact name. On update, a falsy incoming value keeps the stored
        value, and key executives are fully replaced. The company is linked to
        every already-existing investor named in ``investor_names``. The caller
        commits. On error the session is rolled back and None is returned.
        """
        try:
            company = db.query(CompanyTable).filter_by(name=company_data["name"]).first()

            if company is not None:
                # Update existing company, keeping stored values for empty fields
                for field in _PARSED_COMPANY_FIELDS:
                    new_value = company_data.get(field)
                    if new_value:
                        setattr(company, field, new_value)
                # Executives are replaced wholesale on update
                db.query(CompanyMember).filter_by(company_id=company.id).delete()
            else:
                company = CompanyTable(
                    name=company_data["name"],
                    **{field: company_data.get(field) for field in _PARSED_COMPANY_FIELDS},
                )
                db.add(company)
                db.flush()  # Assign company.id for the member rows below

            for exec_data in company_data.get("key_executives", []):
                db.add(
                    CompanyMember(
                        name=exec_data.get("name"),
                        role=exec_data.get("title"),
                        # Source URL is stored in the linkedin field
                        linkedin=exec_data.get("source_url"),
                        company_id=company.id,
                    )
                )

            # Link to investors that already exist in the database
            for investor_name in company_data.get("investor_names", []):
                investor = (
                    db.query(InvestorTable)
                    .filter_by(name=investor_name.strip())
                    .first()
                )
                if investor and company not in investor.portfolio_companies:
                    investor.portfolio_companies.append(company)

            return company

        except Exception as e:
            print(f"Error saving company to database: {e}")
            db.rollback()
            return None

    def _save_parsed_investor_to_db(
        self, db: Session, investor_data: dict
    ) -> Optional[InvestorTable]:
        """Save manually parsed investor data to database.

        Upserts by exact name. On update, a falsy incoming value keeps the stored
        value, and team members and funds are fully replaced. Fund stages and
        sectors are get-or-created by name. The caller commits. On error the
        session is rolled back and None is returned.
        """
        try:
            investor = (
                db.query(InvestorTable).filter_by(name=investor_data["name"]).first()
            )

            if investor is not None:
                # Update existing investor, keeping stored values for empty fields
                for field in _PARSED_INVESTOR_FIELDS:
                    new_value = investor_data.get(field)
                    if new_value:
                        setattr(investor, field, new_value)

                # Team members have no association table, so a bulk delete is fine.
                db.query(InvestorMember).filter_by(investor_id=investor.id).delete()

                # Funds have many-to-many stages/sectors. Delete them through the ORM
                # rather than a bulk Query.delete() so SQLAlchemy also removes their
                # rows from the association tables.
                for old_fund in db.query(FundTable).filter_by(investor_id=investor.id):
                    db.delete(old_fund)
                db.flush()
            else:
                investor = InvestorTable(
                    name=investor_data["name"],
                    **{
                        field: investor_data.get(field)
                        for field in _PARSED_INVESTOR_FIELDS
                    },
                )
                db.add(investor)
                db.flush()  # Assign investor.id for the child rows below

            for member_data in investor_data.get("team_members", []):
                db.add(
                    InvestorMember(
                        name=member_data.get("name"),
                        role=member_data.get("role"),
                        title=member_data.get("title"),
                        email=member_data.get("email"),
                        source_url=member_data.get("source_url"),
                        investor_id=investor.id,
                    )
                )

            for fund_data in investor_data.get("funds", []):
                fund = FundTable(
                    investor_id=investor.id,
                    fund_name=fund_data.get("fund_name"),
                    fund_size=fund_data.get("fund_size"),  # Integer USD
                    fund_size_source_url=fund_data.get("fund_size_source_url"),
                    check_size_lower=fund_data.get("check_size_lower"),
                    check_size_upper=fund_data.get("check_size_upper"),
                    source_url=fund_data.get("source_url"),
                    source_provider=fund_data.get("source_provider"),
                    geographic_focus=fund_data.get("geographic_focus"),  # String
                )
                db.add(fund)
                db.flush()  # Get the fund ID

                # De-duplicated so the same stage/sector is never linked twice
                for stage_name in self._unique_names(
                    fund_data.get("investment_stage_names")
                ):
                    fund.investment_stages.append(
                        self._get_or_create_investment_stage(db, stage_name)
                    )
                for sector_name in self._unique_names(fund_data.get("sector_names")):
                    fund.sectors.append(self._get_or_create_sector(db, sector_name))

            return investor

        except Exception as e:
            print(f"Error saving investor to database: {e}")
            db.rollback()
            return None

    def _get_or_create_investment_stage(
        self, db: Session, stage_name: str
    ) -> InvestmentStageTable:
        """Get existing investment stage by exact name or create (and flush) a new one"""
        stage = (
            db.query(InvestmentStageTable)
            .filter(InvestmentStageTable.name == stage_name)
            .first()
        )
        if not stage:
            stage = InvestmentStageTable(name=stage_name)
            db.add(stage)
            db.flush()  # Get the ID without committing
        return stage

    def _get_or_create_sector(self, db: Session, sector_name: str) -> SectorTable:
        """Get existing sector by exact name or create (and flush) a new one"""
        sector = db.query(SectorTable).filter(SectorTable.name == sector_name).first()
        if not sector:
            sector = SectorTable(name=sector_name)
            db.add(sector)
            db.flush()  # Get the ID without committing
        return sector

    # ------------------------------------------------------------------
    # Legacy structured-LLM persistence
    # ------------------------------------------------------------------

    def _save_investor_to_db(
        self, db: Session, investor_data: InvestorData
    ) -> InvestorTable:
        """Save legacy structured investor data (always inserts a new investor row)"""
        investor = InvestorTable(
            name=investor_data.investor.name,
            description=investor_data.investor.description,
            aum=investor_data.investor.aum,
            check_size_lower=investor_data.investor.check_size_lower,
            check_size_upper=investor_data.investor.check_size_upper,
            geographic_focus=investor_data.investor.geographic_focus,
            number_of_investments=investor_data.investor.number_of_investments,
        )
        db.add(investor)
        db.flush()  # Get the ID

        for member_data in investor_data.team_members:
            db.add(
                InvestorMember(
                    name=member_data.name,
                    role=member_data.role,
                    email=member_data.email,
                    investor_id=investor.id,
                )
            )

        for sector_data in investor_data.sectors:
            sector = self._get_or_create_sector(db, sector_data.name)
            if sector not in investor.sectors:
                investor.sectors.append(sector)

        for company_schema in investor_data.portfolio_companies:
            # Wrap the bare company schema; nested lists stay empty for portfolio companies
            company_data = CompanyData(
                company=company_schema,
                sectors=[],
                members=[],
                investors=[],
            )
            company = self._save_company_to_db(db, company_data, skip_investors=True)
            # Duplicate names resolve to the same row; link it only once
            if company not in investor.portfolio_companies:
                investor.portfolio_companies.append(company)

        return investor

    def _save_company_to_db(
        self, db: Session, company_data: CompanyData, skip_investors: bool = False
    ) -> CompanyTable:
        """Save legacy structured company data; returns the existing row if the name exists"""
        existing_company = (
            db.query(CompanyTable)
            .filter(CompanyTable.name == company_data.company.name)
            .first()
        )
        if existing_company:
            return existing_company

        company = CompanyTable(
            name=company_data.company.name,
            industry=company_data.company.industry,
            location=company_data.company.location,
            description=company_data.company.description,
            founded_year=company_data.company.founded_year,
            website=company_data.company.website,
        )
        db.add(company)
        db.flush()  # Get the ID

        for member_data in company_data.members:
            if member_data.name:  # Only add members with names
                db.add(
                    CompanyMember(
                        name=member_data.name,
                        linkedin=member_data.linkedin,
                        role=member_data.role,
                        company_id=company.id,
                    )
                )

        for sector_data in company_data.sectors:
            sector = self._get_or_create_sector(db, sector_data.name)
            if sector not in company.sectors:
                company.sectors.append(sector)

        # Add investors (if not skipping to avoid circular references)
        if not skip_investors:
            for investor_data in company_data.investors:
                existing_investor = (
                    db.query(InvestorTable)
                    .filter(InvestorTable.name == investor_data.name)
                    .first()
                )
                if existing_investor and existing_investor not in company.investors:
                    company.investors.append(existing_investor)

        return company

    async def _process_row(
        self, row: pd.Series, row_idx: int, is_investor: bool = True
    ) -> Optional[dict]:
        """Legacy path: send a whole CSV row to the structured LLM.

        NaN cells are dropped. Newlines, CRs and tabs become spaces, and any other
        character below code point 32 is removed. The row is then rendered as
        "key: value, ..." text. Returns ``model_dump()`` of the InvestorData or
        CompanyData result, or None on an empty result or an error.
        """
        cleaned_row = {}
        for key, value in row.items():
            if pd.notna(value):
                text = str(value).replace("\n", " ").replace("\r", " ").replace("\t", " ")
                cleaned_row[key] = "".join(char for char in text if ord(char) >= 32)

        row_str = ", ".join(f"{key}: {value}" for key, value in cleaned_row.items())

        try:
            print(f"Processing row {row_idx + 1}...")
            structured_llm = (
                self.investor_structured_llm
                if is_investor
                else self.company_structured_llm
            )
            result = await structured_llm.ainvoke(row_str)
            return result.model_dump() if result else None
        except Exception as e:
            print(f"Error processing row {row_idx + 1}: {e}")
            return None

    # ------------------------------------------------------------------
    # DataFrame entry points
    # ------------------------------------------------------------------

    def _save_and_commit(self, db: Session, save_fn, data: dict) -> None:
        """Run ``save_fn(db, data)`` and commit on success; roll back and report on failure."""
        try:
            saved = save_fn(db, data)
            if saved:
                db.commit()
                print(f" ✅ Saved to database (ID: {saved.id})")
            else:
                print(" ❌ Failed to save to database")
        except Exception as e:
            db.rollback()
            print(f" ❌ Database error: {e}")

    async def parse_investors(self, df: pd.DataFrame, save_to_db: bool = True):
        """
        Parse investors from DataFrame using manual JSON parsing and LLM for currency conversion.

        Expected CSV columns: Name, Website, Final Investor Profile, Final Profile sourcing

        Rows without a name or profile are skipped. With ``save_to_db``, each
        parsed investor is committed on its own, so a bad row never undoes
        earlier ones. Returns the list of parsed investor dicts, including any
        that failed to save.
        """
        results = []
        total_rows = len(df)
        db = get_db_session() if save_to_db else None

        try:
            print(f"\n🚀 Starting to process {total_rows} investors...")

            # Number rows by position (1-based), not index label, so progress output
            # is right for filtered or non-RangeIndex DataFrames.
            for position, (_, row) in enumerate(df.iterrows(), start=1):
                try:
                    name = self._clean_cell(row, "Name")
                    website = self._clean_cell(row, "Website")
                    profile_json = self._clean_cell(row, "Final Investor Profile")

                    if not name or not profile_json:
                        print(f"⚠️ Row {position}: Skipping - missing name or profile")
                        continue

                    print(f"\n📊 Processing {position}/{total_rows}: {name}")

                    investor_data = await self.process_investor_profile(
                        name, website, profile_json
                    )
                    if not investor_data:
                        print(" ⚠️ Failed to process profile")
                        continue

                    results.append(investor_data)
                    aum = investor_data.get("aum")
                    print(" ✓ Parsed successfully")
                    print(f" - HQ: {investor_data.get('headquarters')}")
                    print(f" - AUM: ${aum:,}" if aum else " - AUM: Not Available")
                    print(f" - Funds: {len(investor_data.get('funds', []))}")
                    print(f" - Team: {len(investor_data.get('team_members', []))}")

                    if db is not None:
                        self._save_and_commit(
                            db, self._save_parsed_investor_to_db, investor_data
                        )

                except Exception as e:
                    print(f"❌ Error processing row {position}: {e}")
                    if db is not None:
                        db.rollback()

            # Final commit (rows are already committed individually)
            if db is not None:
                db.commit()
                print("\n✅ Final commit completed")

        except Exception as e:
            print(f"❌ Fatal error in parse_investors: {e}")
            if db is not None:
                db.rollback()
        finally:
            if db is not None:
                db.close()

        print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} investors")
        return results

    async def parse_companies(self, df: pd.DataFrame, save_to_db: bool = True):
        """
        Parse companies from DataFrame using manual JSON parsing.

        Expected CSV columns: Name, Website, Investor, Final Investor Profile (actually company profile)

        Rows without a name or profile are skipped. With ``save_to_db``, each
        parsed company is committed on its own. Returns the list of parsed
        company dicts, including any that failed to save.
        """
        results = []
        total_rows = len(df)
        db = get_db_session() if save_to_db else None

        try:
            print(f"\n🚀 Starting to process {total_rows} companies...")

            # Number rows by position (1-based), not index label
            for position, (_, row) in enumerate(df.iterrows(), start=1):
                try:
                    name = self._clean_cell(row, "Name")
                    website = self._clean_cell(row, "Website")
                    investor_names = self._clean_cell(row, "Investor")
                    profile_json = self._clean_cell(row, "Final Investor Profile")

                    if not name or not profile_json:
                        print(f"⚠️ Row {position}: Skipping - missing name or profile")
                        continue

                    print(f"\n📊 Processing {position}/{total_rows}: {name}")

                    company_data = await self.process_company_profile(
                        name, website, profile_json, investor_names
                    )
                    if not company_data:
                        print(" ⚠️ Failed to process profile")
                        continue

                    results.append(company_data)
                    founded = company_data.get("founded_year")
                    print(" ✓ Parsed successfully")
                    print(f" - Location: {company_data.get('location')}")
                    print(f" - Industry: {company_data.get('industry')}")
                    print(f" - Founded: {founded}" if founded else " - Founded: Unknown")
                    print(
                        f" - Executives: {len(company_data.get('key_executives', []))}"
                    )
                    print(
                        f" - Investors: {len(company_data.get('investor_names', []))}"
                    )

                    if db is not None:
                        self._save_and_commit(
                            db, self._save_parsed_company_to_db, company_data
                        )

                except Exception as e:
                    print(f"❌ Error processing row {position}: {e}")
                    if db is not None:
                        db.rollback()

            # Final commit (rows are already committed individually)
            if db is not None:
                db.commit()
                print("\n✅ Final commit completed")

        except Exception as e:
            print(f"❌ Fatal error in parse_companies: {e}")
            if db is not None:
                db.rollback()
        finally:
            if db is not None:
                db.close()

        print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} companies")
        return results


# NOTE: enabling this entry point also requires importing `asyncio` and
# `init_database` (not imported in this module).
#
# async def main():
#     """Main execution function"""
#     # Initialize database tables
#     print("🔧 Initializing database...")
#     init_database()
#
#     # Create processor
#     processor = InvestorProcessor()
#
#     # parse_companies / parse_investors take DataFrames, not file paths
#     print("📊 Processing companies...")
#     companies = await processor.parse_companies(
#         pd.read_csv("data/19 Companies data.csv"), save_to_db=True
#     )
#     print(f"Processed {len(companies)} companies")
#
#     print("\n💰 Processing investors...")
#     investors = await processor.parse_investors(
#         pd.read_csv("data/19 Investors data.csv"), save_to_db=True
#     )
#     print(f"Processed {len(investors)} investors")
#
#     print("\n✨ Processing complete!")
#
#
# if __name__ == "__main__":
#     asyncio.run(main())