From 4c99638d94c61e8e0a0ddfd8cc0afcbe34902c3e Mon Sep 17 00:00:00 2001 From: bolade Date: Thu, 28 Aug 2025 23:09:14 +0100 Subject: [PATCH] Remove deprecated demo, ingest, schema, and test parser files; add new LLM parser implementation and settings configuration --- app/db/__init__.py | 0 app/services/__init__.py | 0 app/services/demo.py | 82 -------- app/services/ingest.py | 368 ------------------------------------ app/services/llm_parser.py | 28 +++ app/services/schema.py | 109 ----------- app/services/test_parser.py | 260 ------------------------- app/settings.py | 10 + 8 files changed, 38 insertions(+), 819 deletions(-) create mode 100644 app/db/__init__.py create mode 100644 app/services/__init__.py delete mode 100644 app/services/demo.py delete mode 100644 app/services/ingest.py delete mode 100644 app/services/schema.py delete mode 100644 app/services/test_parser.py create mode 100644 app/settings.py diff --git a/app/db/__init__.py b/app/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/demo.py b/app/services/demo.py deleted file mode 100644 index 3057c3a..0000000 --- a/app/services/demo.py +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python3 -""" -Quick demonstration of the LLM Investor Parser functionality. -This script shows how to use the system programmatically. -""" - -from sqlalchemy import func, select - -from db import get_session -from investor_parser import InvestorParser -from schema import Investor - - -def main(): - print("šŸš€ LLM Investor Parser Demo") - print("=" * 50) - - # Initialize parser (without LLM for demo) - parser = InvestorParser(use_llm=False) - - # Show current database stats - with get_session() as session: - count = session.scalar(select(func.count(Investor.id))) - print(f"šŸ“Š Current database: {count} investors") - - # Demonstrate search functionality - print("\nšŸ” Search Examples:") - - search_queries = [ - "circular bioeconomy sustainable", - "venture capital early stage", - "fintech financial technology", - "healthcare biotechnology", - "climate sustainability", - ] - - for query in search_queries: - print(f"\nšŸ”Ž Searching for: '{query}'") - results = parser.search_investors(query, limit=3) - - if results and results["documents"][0]: - for i, metadata in enumerate(results["metadatas"][0]): - score = results["distances"][0][i] - print(f" {i + 1}. {metadata['name']} (score: {score:.3f})") - else: - print(" No results found") - - # Show detailed investor information - print("\nšŸ“‹ Detailed Investor Sample:") - - with get_session() as session: - investor = session.execute( - select(Investor).where(Investor.investor_description.isnot(None)).limit(1) - ).scalar_one_or_none() - - if investor: - print(f"\nšŸ¢ {investor.name}") - print(f"🌐 Website: {investor.website}") - print(f"šŸ“ HQ: {investor.headquarters or 'Not specified'}") - print(f"šŸ“ Description: {investor.investor_description[:200]}...") - - if investor.investment_thesis_focus: - print( - f"\nšŸŽÆ Investment Focus ({len(investor.investment_thesis_focus)} areas):" - ) - for i, focus in enumerate(investor.investment_thesis_focus[:3], 1): - print(f" {i}. {focus}") - if len(investor.investment_thesis_focus) > 3: - print(f" ... and {len(investor.investment_thesis_focus) - 3} more") - - if investor.aum_amount: - print(f"\nšŸ’° AUM: {investor.aum_amount}") - - print("\nāœ… Demo complete!") - print("\nTo run the full parser:") - print(" python investor_parser.py --file 'your_file.csv' --limit 50") - print("\nTo search investors:") - print(" python investor_parser.py --search 'your search query'") - - -if __name__ == "__main__": - main() diff --git a/app/services/ingest.py b/app/services/ingest.py deleted file mode 100644 index 66e2bf0..0000000 --- a/app/services/ingest.py +++ /dev/null @@ -1,368 +0,0 @@ -import json -import logging -import os -from typing import Any, Dict, Optional - -import chromadb -import pandas as pd -from dotenv import load_dotenv -from openai import OpenAI - -from db import get_session, init_database -from schema import CSVRow, Investor - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class LLMInvestorParser: - def __init__(self): - # Initialize OpenAI client - self.openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - - # Initialize ChromaDB - self.chroma_client = chromadb.PersistentClient(path="./chroma_db") - self.collection = self.chroma_client.get_or_create_collection( - name="investor_descriptions", - metadata={ - "description": "Investor descriptions and investment thesis focus" - }, - ) - - # Initialize database - init_database() - - def parse_json_field(self, json_str: str) -> Dict[str, Any]: - """Safely parse JSON string with LLM assistance if needed""" - if not json_str or json_str.strip() == "": - return {} - - try: - # Try direct JSON parsing first - return json.loads(json_str) - except json.JSONDecodeError: - # If direct parsing fails, use LLM to clean and parse - logger.info("Direct JSON parsing failed, using LLM to clean JSON") - return self._llm_clean_json(json_str) - - def _llm_clean_json(self, malformed_json: str) -> Dict[str, Any]: - """Use LLM to clean and parse malformed JSON""" - try: - prompt = f""" - The following text appears to be malformed JSON. Please clean it up and return valid JSON. - If it's not possible to create valid JSON, return an empty object {{}}. - - Original text: - {malformed_json[:2000]} # Limit length for API - - Return only the cleaned JSON, no explanations: - """ - - response = self.openai_client.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": prompt}], - temperature=0, - ) - - cleaned_json = response.choices[0].message.content.strip() - return json.loads(cleaned_json) - - except Exception as e: - logger.error(f"LLM JSON cleaning failed: {e}") - return {} - - def extract_structured_data(self, csv_row: CSVRow) -> Dict[str, Any]: - """Extract and structure data from CSV row using LLM""" - # Parse the investment firm profile - profile_data = {} - if csv_row.investment_firm_profile: - profile_data = self.parse_json_field(csv_row.investment_firm_profile) - - # Create structured output - structured_data = { - "name": csv_row.name, - "website": csv_row.website or profile_data.get("websiteURL"), - "investor_description": profile_data.get("investorDescription", ""), - "investment_thesis_focus": profile_data.get("investmentThesisFocus", []), - "headquarters": profile_data.get("headquarters", ""), - "aum_info": profile_data.get("overallAssetsUnderManagement", {}), - "funds_info": profile_data.get("funds", []), - "crunchbase_urls": csv_row.crunchbase_linkedin_urls or "", - "crunchbase_extract": csv_row.crunchbase_firm_extract or "", - "linkedin_profile": csv_row.linkedin_investment_profile or "", - "source_truth_profile": csv_row.source_of_truth_profile or "", - } - - return structured_data - - def enhance_with_llm(self, investor_data: Dict[str, Any]) -> Dict[str, Any]: - """Use LLM to enhance and standardize investor data""" - try: - # Combine all available text for context - context_text = " ".join( - [ - investor_data.get("investor_description", ""), - investor_data.get("crunchbase_extract", ""), - investor_data.get("linkedin_profile", ""), - investor_data.get("source_truth_profile", ""), - ] - ) - - if not context_text.strip(): - return investor_data - - prompt = f""" - Based on the following information about an investor, please extract and standardize: - 1. A concise investor description (2-3 sentences) - 2. Investment thesis focus areas (list of specific focus areas) - 3. Headquarters location (city, country format) - - Investor: {investor_data["name"]} - Context: {context_text[:3000]} # Limit for API - - Return in JSON format: - {{ - "enhanced_description": "concise description here", - "standardized_focus": ["focus area 1", "focus area 2", ...], - "standardized_headquarters": "City, Country" - }} - """ - - response = self.openai_client.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": prompt}], - temperature=0.3, - ) - - enhanced_data = json.loads(response.choices[0].message.content) - - # Update investor data with enhanced information - if enhanced_data.get("enhanced_description"): - investor_data["enhanced_description"] = enhanced_data[ - "enhanced_description" - ] - - if enhanced_data.get("standardized_focus"): - investor_data["standardized_focus"] = enhanced_data[ - "standardized_focus" - ] - - if enhanced_data.get("standardized_headquarters"): - investor_data["standardized_headquarters"] = enhanced_data[ - "standardized_headquarters" - ] - - return investor_data - - except Exception as e: - logger.error(f"LLM enhancement failed for {investor_data['name']}: {e}") - return investor_data - - def save_to_sql(self, investor_data: Dict[str, Any]) -> int: - """Save investor data to SQL database""" - try: - with get_session() as session: - # Check if investor already exists - existing = ( - session.query(Investor) - .filter_by(name=investor_data["name"]) - .first() - ) - - if existing: - logger.info(f"Updating existing investor: {investor_data['name']}") - investor = existing - else: - logger.info(f"Creating new investor: {investor_data['name']}") - investor = Investor() - - # Map data to investor object - investor.name = investor_data["name"] - investor.website = investor_data.get("website") - investor.investor_description = investor_data.get( - "enhanced_description" - ) or investor_data.get("investor_description") - investor.investment_thesis_focus = investor_data.get( - "standardized_focus" - ) or investor_data.get("investment_thesis_focus") - investor.headquarters = investor_data.get( - "standardized_headquarters" - ) or investor_data.get("headquarters") - - # AUM information - aum_info = investor_data.get("aum_info", {}) - investor.aum_amount = aum_info.get("aumAmount") - investor.aum_as_of_date = aum_info.get("asOfDate") - investor.aum_source_url = aum_info.get("sourceUrl") - - # Fund information - investor.funds_info = investor_data.get("funds_info", []) - - # Raw data - investor.crunchbase_urls = investor_data.get("crunchbase_urls") - investor.crunchbase_extract = investor_data.get("crunchbase_extract") - investor.linkedin_profile = investor_data.get("linkedin_profile") - investor.source_truth_profile = investor_data.get( - "source_truth_profile" - ) - - if not existing: - session.add(investor) - - session.flush() # Get the ID - return investor.id - - except Exception as e: - logger.error(f"Failed to save to SQL: {e}") - raise - - def save_to_vector_db(self, investor_id: int, investor_data: Dict[str, Any]): - """Save investor description and focus to ChromaDB""" - try: - # Prepare text for embedding - description_text = investor_data.get( - "enhanced_description" - ) or investor_data.get("investor_description", "") - focus_areas = investor_data.get("standardized_focus") or investor_data.get( - "investment_thesis_focus", [] - ) - - if isinstance(focus_areas, list): - focus_text = " ".join(focus_areas) - else: - focus_text = str(focus_areas) - - # Combine description and focus for embedding - combined_text = f"{description_text} {focus_text}".strip() - - if not combined_text: - logger.warning(f"No text to embed for investor {investor_data['name']}") - return - - # Create metadata - metadata = { - "investor_id": investor_id, - "name": investor_data["name"], - "website": investor_data.get("website", ""), - "headquarters": investor_data.get("standardized_headquarters") - or investor_data.get("headquarters", ""), - "focus_areas_count": len(focus_areas) - if isinstance(focus_areas, list) - else 0, - } - - # Add to ChromaDB - self.collection.add( - documents=[combined_text], - metadatas=[metadata], - ids=[f"investor_{investor_id}"], - ) - - logger.info(f"Added investor {investor_data['name']} to vector database") - - except Exception as e: - logger.error(f"Failed to save to vector DB: {e}") - - def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None): - """Process the entire CSV file""" - logger.info(f"Starting to process CSV file: {csv_file_path}") - - # Read CSV - df = pd.read_csv(csv_file_path) - logger.info(f"Loaded {len(df)} rows from CSV") - - if limit: - df = df.head(limit) - logger.info(f"Processing limited to {limit} rows") - - processed_count = 0 - error_count = 0 - - for index, row in df.iterrows(): - try: - logger.info(f"Processing row {index + 1}/{len(df)}: {row['Name']}") - - # Create CSVRow object - csv_row = CSVRow( - name=row["Name"], - website=row.get("Website"), - investment_firm_profile=row.get("Investment Firm Profile"), - crunchbase_linkedin_urls=row.get("Crunchbase & LinkedIn URLs"), - crunchbase_firm_extract=row.get("Crunchbase Firm Extract"), - linkedin_investment_profile=row.get("LinkedIn Investment Profile"), - source_of_truth_profile=row.get("Source of Truth Profile"), - ) - - # Extract structured data - structured_data = self.extract_structured_data(csv_row) - - # Enhance with LLM - enhanced_data = self.enhance_with_llm(structured_data) - - # Save to SQL database - investor_id = self.save_to_sql(enhanced_data) - - # Save to vector database - self.save_to_vector_db(investor_id, enhanced_data) - - processed_count += 1 - - # Progress update every 10 rows - if (index + 1) % 10 == 0: - logger.info( - f"Processed {processed_count} rows successfully, {error_count} errors" - ) - - except Exception as e: - error_count += 1 - logger.error( - f"Error processing row {index + 1} ({row.get('Name', 'Unknown')}): {e}" - ) - continue - - logger.info( - f"Processing complete! Processed: {processed_count}, Errors: {error_count}" - ) - return processed_count, error_count - - def search_investors(self, query: str, limit: int = 5): - """Search investors using vector similarity""" - try: - results = self.collection.query(query_texts=[query], n_results=limit) - - return results - - except Exception as e: - logger.error(f"Search failed: {e}") - return None - - -def main(): - """Main function to run the parser""" - parser = LLMInvestorParser() - - # Process the CSV file - csv_file = "/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/New Excerpt 5 investors - Sheet1 parse.csv" - - # Start with a small sample for testing - processed, errors = parser.process_csv_file(csv_file, limit=5) - - print("\nProcessing complete!") - print(f"Successfully processed: {processed} investors") - print(f"Errors encountered: {errors}") - - # Test search functionality - print("\nTesting search functionality...") - results = parser.search_investors("bioeconomy circular economy") - if results: - print(f"Found {len(results['documents'][0])} similar investors") - for i, doc in enumerate(results["documents"][0]): - print(f" {i + 1}. {results['metadatas'][0][i]['name']}") - - -if __name__ == "__main__": - main() diff --git a/app/services/llm_parser.py b/app/services/llm_parser.py index e69de29..e22238b 100644 --- a/app/services/llm_parser.py +++ b/app/services/llm_parser.py @@ -0,0 +1,28 @@ +import asyncio +import csv + +from openai import AsyncOpenAI +from pydantic import BaseModel + + +class RowSchema(BaseModel): + section: str + explanation: str + +client = AsyncOpenAI() + +async def process_row(row): + resp = await client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": f"Extract relevant section:\n{row}"}], + response_format={"type": "json_object"} # ensures JSON output + ) + return RowSchema.model_validate_json(resp.choices[0].message.content) + +async def main(): + with open("data.csv") as f: + reader = csv.DictReader(f) + tasks = [process_row(row) for row in reader] + return await asyncio.gather(*tasks) + +results = asyncio.run(main()) \ No newline at end of file diff --git a/app/services/schema.py b/app/services/schema.py deleted file mode 100644 index f78cf59..0000000 --- a/app/services/schema.py +++ /dev/null @@ -1,109 +0,0 @@ -from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Float -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.sql import func -from pydantic import BaseModel -from typing import List, Optional, Dict, Any -import json - -Base = declarative_base() - -class Investor(Base): - __tablename__ = 'investors' - - id = Column(Integer, primary_key=True, autoincrement=True) - name = Column(String(500), nullable=False) - website = Column(String(1000)) - - # Core investment information - investor_description = Column(Text) - investment_thesis_focus = Column(JSON) # List of focus areas - headquarters = Column(String(1000)) - - # AUM information - aum_amount = Column(String(200)) - aum_as_of_date = Column(String(100)) - aum_source_url = Column(String(1000)) - - # Fund information - funds_info = Column(JSON) # Complex fund data - - # Raw data columns for reference - crunchbase_urls = Column(Text) - crunchbase_extract = Column(Text) - linkedin_profile = Column(Text) - source_truth_profile = Column(Text) - - # Metadata - created_at = Column(DateTime(timezone=True), server_default=func.now()) - updated_at = Column(DateTime(timezone=True), onupdate=func.now()) - - def __repr__(self): - return f"" - -# Pydantic models for data validation and parsing -class AUMInfo(BaseModel): - aumAmount: Optional[str] = None - asOfDate: Optional[str] = None - sourceUrl: Optional[str] = None - -class FundInfo(BaseModel): - fundName: Optional[str] = None - fundSize: Optional[str] = None - vintage: Optional[str] = None - status: Optional[str] = None - description: Optional[str] = None - -class InvestorProfile(BaseModel): - websiteURL: Optional[str] = None - investorDescription: Optional[str] = None - investmentThesisFocus: Optional[List[str]] = None - headquarters: Optional[str] = None - overallAssetsUnderManagement: Optional[AUMInfo] = None - funds: Optional[List[FundInfo]] = None - -class CSVRow(BaseModel): - name: str - website: Optional[str] = None - investment_firm_profile: Optional[str] = None - crunchbase_linkedin_urls: Optional[str] = None - crunchbase_firm_extract: Optional[str] = None - linkedin_investment_profile: Optional[str] = None - source_of_truth_profile: Optional[str] = None - - def get_combined_description(self) -> str: - """Combine all description fields for vector embedding""" - descriptions = [] - - if self.investment_firm_profile: - try: - profile_data = json.loads(self.investment_firm_profile) - if isinstance(profile_data, dict): - desc = profile_data.get('investorDescription', '') - if desc: - descriptions.append(desc) - except (json.JSONDecodeError, TypeError): - pass - - if self.crunchbase_firm_extract: - descriptions.append(self.crunchbase_firm_extract) - - if self.linkedin_investment_profile: - descriptions.append(self.linkedin_investment_profile) - - if self.source_of_truth_profile: - descriptions.append(self.source_of_truth_profile) - - return " ".join(descriptions) - - def get_investment_focus(self) -> List[str]: - """Extract investment thesis focus""" - if self.investment_firm_profile: - try: - profile_data = json.loads(self.investment_firm_profile) - if isinstance(profile_data, dict): - focus = profile_data.get('investmentThesisFocus', []) - if isinstance(focus, list): - return focus - except (json.JSONDecodeError, TypeError): - pass - return [] diff --git a/app/services/test_parser.py b/app/services/test_parser.py deleted file mode 100644 index 8b3418d..0000000 --- a/app/services/test_parser.py +++ /dev/null @@ -1,260 +0,0 @@ -import json -import logging -from typing import Any, Dict, Optional - -import chromadb -import pandas as pd - -from db import get_session, init_database -from schema import CSVRow, Investor - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class SimpleInvestorParser: - """Simplified parser that works without OpenAI API for testing""" - - def __init__(self): - # Initialize ChromaDB - self.chroma_client = chromadb.PersistentClient(path="./chroma_db") - self.collection = self.chroma_client.get_or_create_collection( - name="investor_descriptions", - metadata={ - "description": "Investor descriptions and investment thesis focus" - }, - ) - - # Initialize database - init_database() - - def parse_json_field(self, json_str: str) -> Dict[str, Any]: - """Safely parse JSON string""" - if not json_str or json_str.strip() == "": - return {} - - try: - return json.loads(json_str) - except json.JSONDecodeError as e: - logger.warning(f"JSON parsing failed: {e}") - return {} - - def extract_structured_data(self, csv_row: CSVRow) -> Dict[str, Any]: - """Extract and structure data from CSV row""" - # Parse the investment firm profile - profile_data = {} - if csv_row.investment_firm_profile: - profile_data = self.parse_json_field(csv_row.investment_firm_profile) - - # Create structured output - structured_data = { - "name": csv_row.name, - "website": csv_row.website or profile_data.get("websiteURL"), - "investor_description": profile_data.get("investorDescription", ""), - "investment_thesis_focus": profile_data.get("investmentThesisFocus", []), - "headquarters": profile_data.get("headquarters", ""), - "aum_info": profile_data.get("overallAssetsUnderManagement", {}), - "funds_info": profile_data.get("funds", []), - "crunchbase_urls": csv_row.crunchbase_linkedin_urls or "", - "crunchbase_extract": csv_row.crunchbase_firm_extract or "", - "linkedin_profile": csv_row.linkedin_investment_profile or "", - "source_truth_profile": csv_row.source_of_truth_profile or "", - } - - return structured_data - - def save_to_sql(self, investor_data: Dict[str, Any]) -> int: - """Save investor data to SQL database""" - try: - with get_session() as session: - # Check if investor already exists - existing = ( - session.query(Investor) - .filter_by(name=investor_data["name"]) - .first() - ) - - if existing: - logger.info(f"Updating existing investor: {investor_data['name']}") - investor = existing - else: - logger.info(f"Creating new investor: {investor_data['name']}") - investor = Investor() - - # Map data to investor object - investor.name = investor_data["name"] - investor.website = investor_data.get("website") - investor.investor_description = investor_data.get( - "investor_description" - ) - investor.investment_thesis_focus = investor_data.get( - "investment_thesis_focus" - ) - investor.headquarters = investor_data.get("headquarters") - - # AUM information - aum_info = investor_data.get("aum_info") or {} - investor.aum_amount = aum_info.get("aumAmount") - investor.aum_as_of_date = aum_info.get("asOfDate") - investor.aum_source_url = aum_info.get("sourceUrl") - - # Fund information - investor.funds_info = investor_data.get("funds_info", []) - - # Raw data - investor.crunchbase_urls = investor_data.get("crunchbase_urls") - investor.crunchbase_extract = investor_data.get("crunchbase_extract") - investor.linkedin_profile = investor_data.get("linkedin_profile") - investor.source_truth_profile = investor_data.get( - "source_truth_profile" - ) - - if not existing: - session.add(investor) - - session.flush() # Get the ID - return investor.id - - except Exception as e: - logger.error(f"Failed to save to SQL: {e}") - raise - - def save_to_vector_db(self, investor_id: int, investor_data: Dict[str, Any]): - """Save investor description and focus to ChromaDB""" - try: - # Prepare text for embedding - description_text = investor_data.get("investor_description", "") - focus_areas = investor_data.get("investment_thesis_focus", []) - - if isinstance(focus_areas, list): - focus_text = " ".join(focus_areas) - else: - focus_text = str(focus_areas) - - # Combine description and focus for embedding - combined_text = f"{description_text} {focus_text}".strip() - - if not combined_text: - logger.warning(f"No text to embed for investor {investor_data['name']}") - return - - # Create metadata - metadata = { - "investor_id": investor_id, - "name": investor_data["name"], - "website": investor_data.get("website") or "", - "headquarters": investor_data.get("headquarters") or "", - "focus_areas_count": len(focus_areas) - if isinstance(focus_areas, list) - else 0, - } - - # Add to ChromaDB - self.collection.add( - documents=[combined_text], - metadatas=[metadata], - ids=[f"investor_{investor_id}"], - ) - - logger.info(f"Added investor {investor_data['name']} to vector database") - - except Exception as e: - logger.error(f"Failed to save to vector DB: {e}") - - def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None): - """Process the entire CSV file""" - logger.info(f"Starting to process CSV file: {csv_file_path}") - - # Read CSV - df = pd.read_csv(csv_file_path) - logger.info(f"Loaded {len(df)} rows from CSV") - - if limit: - df = df.head(limit) - logger.info(f"Processing limited to {limit} rows") - - processed_count = 0 - error_count = 0 - - for index, row in df.iterrows(): - try: - logger.info(f"Processing row {index + 1}/{len(df)}: {row['Name']}") - - # Create CSVRow object - csv_row = CSVRow( - name=row["Name"], - website=row.get("Website"), - investment_firm_profile=row.get("Investment Firm Profile"), - crunchbase_linkedin_urls=row.get("Crunchbase & LinkedIn URLs"), - crunchbase_firm_extract=row.get("Crunchbase Firm Extract"), - linkedin_investment_profile=row.get("LinkedIn Investment Profile"), - source_of_truth_profile=row.get("Source of Truth Profile"), - ) - - # Extract structured data - structured_data = self.extract_structured_data(csv_row) - - # Save to SQL database - investor_id = self.save_to_sql(structured_data) - - # Save to vector database - self.save_to_vector_db(investor_id, structured_data) - - processed_count += 1 - - # Progress update every 10 rows - if (index + 1) % 10 == 0: - logger.info( - f"Processed {processed_count} rows successfully, {error_count} errors" - ) - - except Exception as e: - error_count += 1 - logger.error( - f"Error processing row {index + 1} ({row.get('Name', 'Unknown')}): {e}" - ) - continue - - logger.info( - f"Processing complete! Processed: {processed_count}, Errors: {error_count}" - ) - return processed_count, error_count - - def search_investors(self, query: str, limit: int = 5): - """Search investors using vector similarity""" - try: - results = self.collection.query(query_texts=[query], n_results=limit) - - return results - - except Exception as e: - logger.error(f"Search failed: {e}") - return None - - -def main(): - """Main function to run the parser""" - parser = SimpleInvestorParser() - - # Process the CSV file - csv_file = "/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/New Excerpt 5 investors - Sheet1 parse.csv" - - # Start with a small sample for testing - processed, errors = parser.process_csv_file(csv_file, limit=5) - - print("Processing complete!") - print(f"Successfully processed: {processed} investors") - print(f"Errors encountered: {errors}") - - # Test search functionality - print("\nTesting search functionality...") - results = parser.search_investors("bioeconomy circular economy") - if results: - print(f"Found {len(results['documents'][0])} similar investors") - for i, doc in enumerate(results["documents"][0]): - print(f" {i + 1}. {results['metadatas'][0][i]['name']}") - - -if __name__ == "__main__": - main() diff --git a/app/settings.py b/app/settings.py new file mode 100644 index 0000000..2f9dd5b --- /dev/null +++ b/app/settings.py @@ -0,0 +1,10 @@ +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + api_key: str + db_url: str + + class Config: + env_file = ".env" + +settings = Settings() \ No newline at end of file