Files
Anton_wireframe/app/services/investor_parser.py
T
bolade bbf6af58f0 Implement LLM-powered Investor Parser with CSV processing, SQL and vector database integration
- Added FastAPI application with a simple root endpoint.
- Developed LLMInvestorParser class for processing investor data from CSV files.
- Integrated OpenAI API for LLM enhancements and JSON cleaning.
- Implemented structured data extraction and saving to SQL database.
- Added functionality to save investor descriptions to ChromaDB for vector similarity search.
- Created command-line interface for processing files and searching investors.
- Added schema definitions for Investor and related data models using SQLAlchemy and Pydantic.
- Implemented logging for better traceability and error handling.
- Included requirements.txt for dependency management.
2025-08-28 22:51:58 +01:00

450 lines
16 KiB
Python

#!/usr/bin/env python3
"""
LLM-powered Investor Parser
A comprehensive parser that processes investor CSV data and saves it to both SQL and vector databases.
Supports both simple parsing and LLM-enhanced parsing for better data quality.
Usage:
python investor_parser.py --help
python investor_parser.py --file="path/to/csv" --limit=10
python investor_parser.py --file="path/to/csv" --use-llm --limit=50
python investor_parser.py --search="bioeconomy circular"
"""
import argparse
import json
import logging
import os
from typing import Any, Dict, Optional
import chromadb
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI
from db import get_session, init_database
from schema import CSVRow, Investor
# Pull settings such as OPENAI_API_KEY from a local .env file into os.environ.
load_dotenv()

# Root logger: INFO and above, each line stamped with time and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class InvestorParser:
    """Complete investor parser with optional LLM enhancement.

    Reads investor rows from a CSV file, flattens each row into a plain dict,
    optionally asks an OpenAI chat model to standardise the free text, and
    stores the result both in the SQL database (through ``get_session``) and
    in a ChromaDB collection that backs similarity search.
    """

    # Chat model used for both JSON repair and data enhancement.
    _LLM_MODEL = "gpt-3.5-turbo"

    def __init__(self, use_llm: bool = False):
        """Set up the optional OpenAI client, the ChromaDB collection and SQL.

        Args:
            use_llm: Request LLM features. Silently downgraded to ``False``
                (with a warning) when ``OPENAI_API_KEY`` is not set.

        Side effects: opens (or creates) the persistent ChromaDB store at
        ``./chroma_db`` and calls ``init_database()``.
        """
        self.use_llm = use_llm
        # Always defined so attribute access never fails when LLM is off.
        self.openai_client = None

        if self.use_llm:
            api_key = os.getenv("OPENAI_API_KEY")
            if not api_key:
                logger.warning(
                    "OpenAI API key not found. LLM features will be disabled."
                )
                self.use_llm = False
            else:
                self.openai_client = OpenAI(api_key=api_key)
                logger.info("LLM enhancement enabled")

        # Initialize ChromaDB
        self.chroma_client = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma_client.get_or_create_collection(
            name="investor_descriptions",
            metadata={
                "description": "Investor descriptions and investment thesis focus"
            },
        )

        # Initialize database
        init_database()

    @staticmethod
    def _clean_cell(value: Any) -> Any:
        """Return ``None`` for pandas missing-value markers, else *value*.

        ``pd.read_csv`` represents empty cells as NaN floats, which are truthy
        and are not strings; normalising them keeps ``or`` fallbacks and string
        handling downstream working.
        """
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return None
        return value

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove a surrounding Markdown code fence from an LLM reply.

        Chat models frequently wrap JSON in a fenced block (optionally tagged
        ``json``); ``json.loads`` rejects that wrapper.
        """
        stripped = text.strip()
        if not stripped.startswith("```"):
            return stripped
        if "\n" not in stripped:
            # Single-line fence: just peel the backticks off both ends.
            return stripped.strip("`").strip()
        # Drop the opening fence line (it may carry a language tag).
        _, _, body = stripped.partition("\n")
        body = body.rstrip()
        if body.endswith("```"):
            body = body[:-3]
        return body.strip()

    @staticmethod
    def _compose_embedding_text(investor_data: Dict[str, Any]) -> str:
        """Build the text that gets embedded: description followed by focus areas.

        LLM-enhanced fields win over the raw ones. Missing values contribute
        nothing (rather than the literal text "None"), and non-string focus
        items are stringified so the join cannot fail.
        """
        description = (
            investor_data.get("enhanced_description")
            or investor_data.get("investor_description")
            or ""
        )
        focus_areas = (
            investor_data.get("standardized_focus")
            or investor_data.get("investment_thesis_focus")
            or []
        )
        if isinstance(focus_areas, list):
            focus_text = " ".join(str(area) for area in focus_areas if area)
        else:
            focus_text = str(focus_areas)
        return f"{description} {focus_text}".strip()

    def parse_json_field(self, json_str: str) -> Dict[str, Any]:
        """Safely parse a JSON object string, with optional LLM repair.

        Args:
            json_str: Raw cell content. ``None``, NaN, other non-text values
                and blank strings all yield ``{}``.

        Returns:
            The decoded JSON object, or ``{}`` when the input is empty, cannot
            be parsed (and LLM repair is off or fails), or decodes to something
            that is not a JSON object.
        """
        if not isinstance(json_str, (str, bytes, bytearray)) or not json_str.strip():
            return {}

        try:
            parsed = json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON parsing failed: {e}")
            # Use LLM to clean JSON if available
            if self.use_llm:
                return self._llm_clean_json(json_str)
            return {}

        if not isinstance(parsed, dict):
            # Callers use .get() on the result, so arrays/scalars are unusable.
            logger.warning(
                f"Expected a JSON object but got {type(parsed).__name__}; ignoring"
            )
            return {}
        return parsed

    def _llm_clean_json(self, malformed_json: str) -> Dict[str, Any]:
        """Ask the chat model to repair malformed JSON.

        Returns:
            The repaired JSON object, or ``{}`` if the request fails or the
            reply is not a JSON object. Never raises.
        """
        try:
            # Cap the excerpt so the request stays small.
            excerpt = malformed_json[:2000]
            prompt = f"""
            The following text appears to be malformed JSON. Please clean it up and return valid JSON.
            If it's not possible to create valid JSON, return an empty object {{}}.
            Original text:
            {excerpt}
            Return only the cleaned JSON, no explanations:
            """
            response = self.openai_client.chat.completions.create(
                model=self._LLM_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0,
            )
            # content can be None; treat that as an empty (unparseable) reply.
            raw_reply = response.choices[0].message.content or ""
            cleaned = json.loads(self._strip_code_fences(raw_reply))
            if not isinstance(cleaned, dict):
                logger.warning("LLM JSON cleaning returned a non-object; ignoring")
                return {}
            return cleaned
        except Exception as e:
            logger.error(f"LLM JSON cleaning failed: {e}")
            return {}

    def extract_structured_data(self, csv_row: "CSVRow") -> Dict[str, Any]:
        """Extract and structure data from a CSV row.

        Args:
            csv_row: Row object exposing ``name``, ``website``,
                ``investment_firm_profile`` (a JSON string) and the raw
                Crunchbase / LinkedIn / source-of-truth text fields.

        Returns:
            A flat dict of investor fields. Values that are missing or JSON
            ``null`` in the profile fall back to an empty string/list/dict.
        """
        # Parse the investment firm profile
        profile_data: Dict[str, Any] = {}
        if csv_row.investment_firm_profile:
            profile_data = self.parse_json_field(csv_row.investment_firm_profile)

        # `or` (not a .get default) so explicit JSON nulls also fall back.
        return {
            "name": csv_row.name,
            "website": csv_row.website or profile_data.get("websiteURL"),
            "investor_description": profile_data.get("investorDescription") or "",
            "investment_thesis_focus": profile_data.get("investmentThesisFocus")
            or [],
            "headquarters": profile_data.get("headquarters") or "",
            "aum_info": profile_data.get("overallAssetsUnderManagement") or {},
            "funds_info": profile_data.get("funds") or [],
            "crunchbase_urls": csv_row.crunchbase_linkedin_urls or "",
            "crunchbase_extract": csv_row.crunchbase_firm_extract or "",
            "linkedin_profile": csv_row.linkedin_investment_profile or "",
            "source_truth_profile": csv_row.source_of_truth_profile or "",
        }

    def enhance_with_llm(self, investor_data: Dict[str, Any]) -> Dict[str, Any]:
        """Use the LLM to enhance and standardize investor data.

        Adds ``enhanced_description``, ``standardized_focus`` and
        ``standardized_headquarters`` to *investor_data* when the model
        supplies non-empty values.

        Returns:
            The same dict object that was passed in (mutated in place). It is
            returned unchanged when LLM use is disabled, there is no context
            text, or the request/parse fails (failure is logged, not raised).
        """
        if not self.use_llm:
            return investor_data

        try:
            # Combine all available text for context; tolerate None values.
            context_text = " ".join(
                str(investor_data.get(key) or "")
                for key in (
                    "investor_description",
                    "crunchbase_extract",
                    "linkedin_profile",
                    "source_truth_profile",
                )
            )
            if not context_text.strip():
                return investor_data

            # Cap the context so the request stays small.
            context_excerpt = context_text[:3000]
            prompt = f"""
            Based on the following information about an investor, please extract and standardize:
            1. A concise investor description (2-3 sentences)
            2. Investment thesis focus areas (list of specific focus areas)
            3. Headquarters location (city, country format)
            Investor: {investor_data["name"]}
            Context: {context_excerpt}
            Return in JSON format:
            {{
            "enhanced_description": "concise description here",
            "standardized_focus": ["focus area 1", "focus area 2", ...],
            "standardized_headquarters": "City, Country"
            }}
            """
            response = self.openai_client.chat.completions.create(
                model=self._LLM_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
            )
            raw_reply = response.choices[0].message.content or ""
            enhanced_data = json.loads(self._strip_code_fences(raw_reply))
            if not isinstance(enhanced_data, dict):
                raise ValueError("LLM response is not a JSON object")

            # Update investor data with whichever fields the model filled in.
            for key in (
                "enhanced_description",
                "standardized_focus",
                "standardized_headquarters",
            ):
                if enhanced_data.get(key):
                    investor_data[key] = enhanced_data[key]
            return investor_data
        except Exception as e:
            logger.error(
                f"LLM enhancement failed for {investor_data.get('name')}: {e}"
            )
            return investor_data

    def save_to_sql(self, investor_data: Dict[str, Any]) -> int:
        """Insert or update the investor row matching ``investor_data["name"]``.

        Returns:
            The ``id`` of the investor row after flushing.

        Raises:
            Exception: Any error from the session is logged and re-raised.
        """
        try:
            # NOTE(review): assumes get_session() commits on clean exit —
            # confirm in the db module.
            with get_session() as session:
                # Check if investor already exists
                existing = (
                    session.query(Investor)
                    .filter_by(name=investor_data["name"])
                    .first()
                )
                if existing is not None:
                    logger.info(f"Updating existing investor: {investor_data['name']}")
                    investor = existing
                else:
                    logger.info(f"Creating new investor: {investor_data['name']}")
                    investor = Investor()

                # Map data to investor object; enhanced values take priority.
                investor.name = investor_data["name"]
                investor.website = investor_data.get("website")
                investor.investor_description = investor_data.get(
                    "enhanced_description"
                ) or investor_data.get("investor_description")
                investor.investment_thesis_focus = investor_data.get(
                    "standardized_focus"
                ) or investor_data.get("investment_thesis_focus")
                investor.headquarters = investor_data.get(
                    "standardized_headquarters"
                ) or investor_data.get("headquarters")

                # AUM information; anything that is not an object is ignored
                # so a malformed profile cannot fail the whole row.
                aum_info = investor_data.get("aum_info")
                if not isinstance(aum_info, dict):
                    aum_info = {}
                investor.aum_amount = aum_info.get("aumAmount")
                investor.aum_as_of_date = aum_info.get("asOfDate")
                investor.aum_source_url = aum_info.get("sourceUrl")

                # Fund information
                investor.funds_info = investor_data.get("funds_info", [])

                # Raw data
                investor.crunchbase_urls = investor_data.get("crunchbase_urls")
                investor.crunchbase_extract = investor_data.get("crunchbase_extract")
                investor.linkedin_profile = investor_data.get("linkedin_profile")
                investor.source_truth_profile = investor_data.get(
                    "source_truth_profile"
                )

                if existing is None:
                    session.add(investor)
                session.flush()  # Get the ID
                return investor.id
        except Exception as e:
            logger.error(f"Failed to save to SQL: {e}")
            raise

    def save_to_vector_db(self, investor_id: int, investor_data: Dict[str, Any]):
        """Store the investor's description and focus areas in ChromaDB.

        The document id is ``investor_<investor_id>``. ``upsert`` is used so
        that re-processing an investor (which updates the SQL row) also
        refreshes the stored document instead of colliding with the old id.
        Failures are logged and swallowed; nothing is returned.
        """
        try:
            combined_text = self._compose_embedding_text(investor_data)
            if not combined_text:
                logger.warning(f"No text to embed for investor {investor_data['name']}")
                return

            focus_areas = investor_data.get("standardized_focus") or investor_data.get(
                "investment_thesis_focus"
            )
            headquarters = (
                investor_data.get("standardized_headquarters")
                or investor_data.get("headquarters")
                or ""
            )

            # Create metadata; website/headquarters are forced to strings so
            # an unexpected structured value is still stored as text.
            metadata = {
                "investor_id": investor_id,
                "name": investor_data["name"],
                "website": str(investor_data.get("website") or ""),
                "headquarters": str(headquarters),
                "focus_areas_count": len(focus_areas)
                if isinstance(focus_areas, list)
                else 0,
            }

            self.collection.upsert(
                documents=[combined_text],
                metadatas=[metadata],
                ids=[f"investor_{investor_id}"],
            )
            logger.info(f"Added investor {investor_data['name']} to vector database")
        except Exception as e:
            logger.error(f"Failed to save to vector DB: {e}")

    def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None):
        """Process the CSV file row by row.

        Args:
            csv_file_path: Path handed to ``pd.read_csv``.
            limit: Process only the first *limit* rows; ``None`` means all
                rows, ``0`` means none.

        Returns:
            ``(processed_count, error_count)``. A failing row is logged and
            counted, and processing continues with the next row.
        """
        logger.info(f"Starting to process CSV file: {csv_file_path}")

        # Read CSV
        df = pd.read_csv(csv_file_path)
        logger.info(f"Loaded {len(df)} rows from CSV")

        if limit is not None:
            df = df.head(limit)
            logger.info(f"Processing limited to {limit} rows")

        total_rows = len(df)
        processed_count = 0
        error_count = 0
        clean = self._clean_cell

        # Count positions explicitly: the DataFrame index label is not
        # guaranteed to be a 0-based integer.
        for position, (_, row) in enumerate(df.iterrows(), start=1):
            try:
                logger.info(f"Processing row {position}/{total_rows}: {row['Name']}")

                # Create CSVRow object; empty cells (NaN) become None.
                csv_row = CSVRow(
                    name=clean(row["Name"]),
                    website=clean(row.get("Website")),
                    investment_firm_profile=clean(row.get("Investment Firm Profile")),
                    crunchbase_linkedin_urls=clean(
                        row.get("Crunchbase & LinkedIn URLs")
                    ),
                    crunchbase_firm_extract=clean(row.get("Crunchbase Firm Extract")),
                    linkedin_investment_profile=clean(
                        row.get("LinkedIn Investment Profile")
                    ),
                    source_of_truth_profile=clean(row.get("Source of Truth Profile")),
                )

                # Extract structured data
                structured_data = self.extract_structured_data(csv_row)
                # Enhance with LLM if enabled
                enhanced_data = self.enhance_with_llm(structured_data)
                # Save to SQL database
                investor_id = self.save_to_sql(enhanced_data)
                # Save to vector database
                self.save_to_vector_db(investor_id, enhanced_data)
                processed_count += 1
            except Exception as e:
                error_count += 1
                logger.error(
                    f"Error processing row {position} ({row.get('Name', 'Unknown')}): {e}"
                )

            # Progress update every 10 rows, whether or not the row succeeded.
            if position % 10 == 0:
                logger.info(
                    f"Progress: {processed_count} processed, {error_count} errors"
                )

        logger.info(
            f"Processing complete! Processed: {processed_count}, Errors: {error_count}"
        )
        return processed_count, error_count

    def search_investors(self, query: str, limit: int = 10):
        """Search investors using vector similarity.

        Returns:
            The raw result of ``collection.query`` for *query*, or ``None``
            if the query raised (the error is logged).
        """
        try:
            return self.collection.query(query_texts=[query], n_results=limit)
        except Exception as e:
            logger.error(f"Search failed: {e}")
            return None
def main():
    """Command-line entry point.

    Modes (``--search`` takes precedence over ``--file``):
      * ``--search QUERY``: print the closest investors from the vector store.
      * ``--file PATH``: process the CSV, then run a few example searches.
      * neither: print usage.

    The parser (ChromaDB store + SQL database) is only initialised once there
    is real work to do, so printing help or giving a bad path has no side
    effects on disk.
    """
    parser = argparse.ArgumentParser(description="LLM-powered Investor Parser")
    parser.add_argument("--file", type=str, help="Path to CSV file to process")
    parser.add_argument("--limit", type=int, help="Limit number of rows to process")
    parser.add_argument(
        "--use-llm",
        action="store_true",
        help="Enable LLM enhancement (requires OpenAI API key)",
    )
    parser.add_argument("--search", type=str, help="Search query for vector database")
    parser.add_argument(
        "--search-limit",
        type=int,
        default=10,
        help="Number of search results to return",
    )
    args = parser.parse_args()

    # Nothing requested: show usage without touching ChromaDB or SQL.
    if not args.search and not args.file:
        parser.print_help()
        return

    # Validate the input path before paying for initialisation.
    if not args.search and not os.path.exists(args.file):
        logger.error(f"File not found: {args.file}")
        return

    # Initialize parser
    investor_parser = InvestorParser(use_llm=args.use_llm)

    if args.search:
        # Perform search
        logger.info(f"Searching for: {args.search}")
        results = investor_parser.search_investors(args.search, args.search_limit)
        if results and results["documents"][0]:
            documents = results["documents"][0]
            metadatas = results["metadatas"][0]
            # Distances may be absent from the result; don't crash if so.
            distances = (results.get("distances") or [[]])[0]

            print(f"\nFound {len(documents)} similar investors:")
            for rank, (_, metadata) in enumerate(zip(documents, metadatas), start=1):
                print(f"{rank}. {metadata['name']}")
                print(f" Website: {metadata.get('website', 'N/A')}")
                print(f" HQ: {metadata.get('headquarters', 'N/A')}")
                print(f" Focus areas: {metadata.get('focus_areas_count', 0)}")
                if rank <= len(distances):
                    # The query returns distances, not similarities:
                    # smaller values are closer matches.
                    print(
                        f" Distance (lower is more similar): {distances[rank - 1]:.3f}"
                    )
                print()
        else:
            print("No results found.")
        return

    # Process CSV file
    processed, errors = investor_parser.process_csv_file(args.file, args.limit)
    print("\nProcessing complete!")
    print(f"Successfully processed: {processed} investors")
    print(f"Errors encountered: {errors}")

    # Show some search examples
    print("\nTrying some example searches...")
    for query in ["bioeconomy", "venture capital", "sustainability"]:
        results = investor_parser.search_investors(query, 3)
        if results and results["documents"][0]:
            print(f"\nTop matches for '{query}':")
            for i, metadata in enumerate(results["metadatas"][0][:3]):
                print(f" {i + 1}. {metadata['name']}")


if __name__ == "__main__":
    main()