Files
Anton_wireframe/app/services/test_parser.py
T
bolade bbf6af58f0 Implement LLM-powered Investor Parser with CSV processing, SQL and vector database integration
- Added FastAPI application with a simple root endpoint.
- Developed LLMInvestorParser class for processing investor data from CSV files.
- Integrated OpenAI API for LLM enhancements and JSON cleaning.
- Implemented structured data extraction and saving to SQL database.
- Added functionality to save investor descriptions to ChromaDB for vector similarity search.
- Created command-line interface for processing files and searching investors.
- Added schema definitions for Investor and related data models using SQLAlchemy and Pydantic.
- Implemented logging for better traceability and error handling.
- Included requirements.txt for dependency management.
2025-08-28 22:51:58 +01:00

261 lines
9.5 KiB
Python

import json
import logging
from typing import Any, Dict, Optional
import chromadb
import pandas as pd
from db import get_session, init_database
from schema import CSVRow, Investor
# Configure logging: set the root logger to INFO so progress messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the code below.
logger = logging.getLogger(__name__)
class SimpleInvestorParser:
    """Simplified parser that works without the OpenAI API, for testing.

    Reads investor rows from a CSV file, stores the structured fields through
    the project's SQL session helper, and indexes each investor's description
    and thesis focus in a persistent ChromaDB collection.
    """

    def __init__(self, chroma_path: str = "./chroma_db") -> None:
        """Open the ChromaDB collection and initialise the SQL database.

        Args:
            chroma_path: Directory of the persistent ChromaDB store. The
                default is the location that used to be hard-coded.
        """
        # Initialize ChromaDB
        self.chroma_client = chromadb.PersistentClient(path=chroma_path)
        self.collection = self.chroma_client.get_or_create_collection(
            name="investor_descriptions",
            metadata={
                "description": "Investor descriptions and investment thesis focus"
            },
        )
        # Initialize database
        init_database()

    @staticmethod
    def _optional_cell(row: Any, column: str) -> Any:
        """Return ``row[column]``, mapping missing or NaN cells to ``None``.

        pandas represents empty CSV cells as float NaN, which is truthy and
        would otherwise leak into ``or`` fallbacks and string fields.
        """
        value = row.get(column)
        return None if pd.isna(value) else value

    def parse_json_field(self, json_str: Optional[str]) -> Dict[str, Any]:
        """Safely parse a JSON object held in a text cell.

        Args:
            json_str: Raw cell content. ``None``, NaN, other non-text values
                and blank strings are treated as "no data".

        Returns:
            The decoded JSON object, or ``{}`` when the input is empty,
            malformed, or decodes to something other than an object.
        """
        # Guard against non-text input (e.g. float NaN from pandas), which
        # has no .strip() and cannot be decoded by json.loads.
        if not isinstance(json_str, (str, bytes, bytearray)):
            return {}
        if not json_str.strip():
            return {}
        try:
            parsed = json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON parsing failed: {e}")
            return {}
        # Callers use .get() on the result, so anything but an object
        # (list, number, string, null) is unusable.
        if not isinstance(parsed, dict):
            logger.warning(
                f"JSON parsing failed: expected an object, got {type(parsed).__name__}"
            )
            return {}
        return parsed

    def extract_structured_data(self, csv_row: "CSVRow") -> Dict[str, Any]:
        """Extract and structure data from a CSV row.

        Args:
            csv_row: Row object exposing ``name``, ``website``,
                ``investment_firm_profile`` (JSON text) and the raw
                Crunchbase / LinkedIn / source-of-truth fields.

        Returns:
            A flat dict combining the row's own fields with values pulled
            from the parsed investment firm profile.
        """
        # Parse the investment firm profile ({} when absent or invalid).
        profile_data = self.parse_json_field(csv_row.investment_firm_profile)
        # Create structured output
        return {
            "name": csv_row.name,
            # The CSV column wins; the profile's URL is only a fallback.
            "website": csv_row.website or profile_data.get("websiteURL"),
            "investor_description": profile_data.get("investorDescription", ""),
            "investment_thesis_focus": profile_data.get("investmentThesisFocus", []),
            "headquarters": profile_data.get("headquarters", ""),
            "aum_info": profile_data.get("overallAssetsUnderManagement", {}),
            "funds_info": profile_data.get("funds", []),
            "crunchbase_urls": csv_row.crunchbase_linkedin_urls or "",
            "crunchbase_extract": csv_row.crunchbase_firm_extract or "",
            "linkedin_profile": csv_row.linkedin_investment_profile or "",
            "source_truth_profile": csv_row.source_of_truth_profile or "",
        }

    def save_to_sql(self, investor_data: Dict[str, Any]) -> int:
        """Insert or update an investor row, matched by name.

        Args:
            investor_data: Dict as produced by ``extract_structured_data``.

        Returns:
            The ``id`` of the inserted or updated ``Investor``.

        Raises:
            Exception: Any error from the session is logged and re-raised.
        """
        try:
            with get_session() as session:
                # Check if investor already exists
                existing = (
                    session.query(Investor)
                    .filter_by(name=investor_data["name"])
                    .first()
                )
                if existing:
                    logger.info(f"Updating existing investor: {investor_data['name']}")
                    investor = existing
                else:
                    logger.info(f"Creating new investor: {investor_data['name']}")
                    investor = Investor()
                # Map data to investor object
                investor.name = investor_data["name"]
                investor.website = investor_data.get("website")
                investor.investor_description = investor_data.get(
                    "investor_description"
                )
                investor.investment_thesis_focus = investor_data.get(
                    "investment_thesis_focus"
                )
                investor.headquarters = investor_data.get("headquarters")
                # AUM information (tolerate a null value in the profile)
                aum_info = investor_data.get("aum_info") or {}
                investor.aum_amount = aum_info.get("aumAmount")
                investor.aum_as_of_date = aum_info.get("asOfDate")
                investor.aum_source_url = aum_info.get("sourceUrl")
                # Fund information
                investor.funds_info = investor_data.get("funds_info", [])
                # Raw data
                investor.crunchbase_urls = investor_data.get("crunchbase_urls")
                investor.crunchbase_extract = investor_data.get("crunchbase_extract")
                investor.linkedin_profile = investor_data.get("linkedin_profile")
                investor.source_truth_profile = investor_data.get(
                    "source_truth_profile"
                )
                if not existing:
                    session.add(investor)
                session.flush()  # Get the ID
                return investor.id
        except Exception as e:
            logger.error(f"Failed to save to SQL: {e}")
            raise

    def save_to_vector_db(
        self, investor_id: int, investor_data: Dict[str, Any]
    ) -> None:
        """Index an investor's description and focus areas in ChromaDB.

        The entry is upserted under the id ``investor_<investor_id>`` so
        that reprocessing an investor refreshes its document instead of
        colliding with the earlier one. Failures are logged and swallowed:
        the vector index is best-effort and must not abort CSV processing.

        Args:
            investor_id: Primary key returned by ``save_to_sql``.
            investor_data: Dict as produced by ``extract_structured_data``.
        """
        try:
            # Prepare text for embedding; `or` guards against explicit
            # nulls that would otherwise be embedded as the text "None".
            description_text = investor_data.get("investor_description") or ""
            focus_areas = investor_data.get("investment_thesis_focus") or []
            if isinstance(focus_areas, list):
                # str() keeps the join safe if the list holds non-strings.
                focus_text = " ".join(str(area) for area in focus_areas)
                focus_count = len(focus_areas)
            else:
                focus_text = str(focus_areas)
                focus_count = 0
            # Combine description and focus for embedding
            combined_text = f"{description_text} {focus_text}".strip()
            if not combined_text:
                logger.warning(f"No text to embed for investor {investor_data['name']}")
                return
            # Create metadata
            metadata = {
                "investor_id": investor_id,
                "name": investor_data["name"],
                "website": investor_data.get("website") or "",
                "headquarters": investor_data.get("headquarters") or "",
                "focus_areas_count": focus_count,
            }
            # Add to ChromaDB (insert or replace)
            self.collection.upsert(
                documents=[combined_text],
                metadatas=[metadata],
                ids=[f"investor_{investor_id}"],
            )
            logger.info(f"Added investor {investor_data['name']} to vector database")
        except Exception as e:
            logger.error(f"Failed to save to vector DB: {e}")

    def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None):
        """Process the rows of a CSV file into SQL and the vector index.

        Each row is handled independently: a failure is logged, counted and
        skipped so one bad row does not stop the run.

        Args:
            csv_file_path: Path of the CSV file to read.
            limit: Maximum number of leading rows to process; ``None``
                processes every row.

        Returns:
            A ``(processed_count, error_count)`` tuple.
        """
        logger.info(f"Starting to process CSV file: {csv_file_path}")
        # Read CSV
        df = pd.read_csv(csv_file_path)
        logger.info(f"Loaded {len(df)} rows from CSV")
        # `is not None` so that an explicit limit of 0 is honoured.
        if limit is not None:
            df = df.head(limit)
            logger.info(f"Processing limited to {limit} rows")
        total_rows = len(df)
        processed_count = 0
        error_count = 0
        # Count rows by position rather than trusting the index labels.
        for row_number, (_, row) in enumerate(df.iterrows(), start=1):
            try:
                logger.info(f"Processing row {row_number}/{total_rows}: {row['Name']}")
                # Create CSVRow object; empty optional cells become None
                # instead of pandas' NaN.
                csv_row = CSVRow(
                    name=row["Name"],
                    website=self._optional_cell(row, "Website"),
                    investment_firm_profile=self._optional_cell(
                        row, "Investment Firm Profile"
                    ),
                    crunchbase_linkedin_urls=self._optional_cell(
                        row, "Crunchbase & LinkedIn URLs"
                    ),
                    crunchbase_firm_extract=self._optional_cell(
                        row, "Crunchbase Firm Extract"
                    ),
                    linkedin_investment_profile=self._optional_cell(
                        row, "LinkedIn Investment Profile"
                    ),
                    source_of_truth_profile=self._optional_cell(
                        row, "Source of Truth Profile"
                    ),
                )
                # Extract structured data
                structured_data = self.extract_structured_data(csv_row)
                # Save to SQL database
                investor_id = self.save_to_sql(structured_data)
                # Save to vector database
                self.save_to_vector_db(investor_id, structured_data)
                processed_count += 1
                # Progress update every 10 rows
                if row_number % 10 == 0:
                    logger.info(
                        f"Processed {processed_count} rows successfully, {error_count} errors"
                    )
            except Exception as e:
                error_count += 1
                logger.error(
                    f"Error processing row {row_number} ({row.get('Name', 'Unknown')}): {e}"
                )
                continue
        logger.info(
            f"Processing complete! Processed: {processed_count}, Errors: {error_count}"
        )
        return processed_count, error_count

    def search_investors(self, query: str, limit: int = 5):
        """Search investors using vector similarity.

        Args:
            query: Free-text query to match against indexed investors.
            limit: Maximum number of results to request.

        Returns:
            The raw result of the collection query, or ``None`` if the
            query failed (the error is logged).
        """
        try:
            return self.collection.query(query_texts=[query], n_results=limit)
        except Exception as e:
            logger.error(f"Search failed: {e}")
            return None
def main():
    """Run the parser on a CSV file, then smoke-test vector search.

    The CSV path, row limit and search query can be supplied on the command
    line. With no arguments the script uses the same sample file, a limit
    of 5 rows and the same query that were previously hard-coded.
    """
    # Imported here because only the script entry point needs it.
    import argparse

    arg_parser = argparse.ArgumentParser(
        description="Parse an investor CSV into SQL and ChromaDB."
    )
    arg_parser.add_argument(
        "csv_file",
        nargs="?",
        default="/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/New Excerpt 5 investors - Sheet1 parse.csv",
        help="path of the CSV file to process",
    )
    arg_parser.add_argument(
        "--limit",
        type=int,
        default=5,
        help="maximum number of rows to process (default: 5)",
    )
    arg_parser.add_argument(
        "--query",
        default="bioeconomy circular economy",
        help="text used to test the similarity search",
    )
    args = arg_parser.parse_args()

    parser = SimpleInvestorParser()
    # Process the CSV file; the default limit is a small sample for testing.
    processed, errors = parser.process_csv_file(args.csv_file, limit=args.limit)
    print("Processing complete!")
    print(f"Successfully processed: {processed} investors")
    print(f"Errors encountered: {errors}")
    # Test search functionality
    print("\nTesting search functionality...")
    results = parser.search_investors(args.query)
    if results:
        # Results are nested per query; a single query was sent, so take [0].
        documents = results["documents"][0]
        metadatas = results["metadatas"][0]
        print(f"Found {len(documents)} similar investors")
        for rank, (_, metadata) in enumerate(zip(documents, metadatas), start=1):
            print(f" {rank}. {metadata['name']}")


if __name__ == "__main__":
    main()