diff --git a/app/__pycache__/main.cpython-312.pyc b/app/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..d9c2a4c Binary files /dev/null and b/app/__pycache__/main.cpython-312.pyc differ diff --git a/app/__pycache__/pydantic_schemas.cpython-312.pyc b/app/__pycache__/pydantic_schemas.cpython-312.pyc new file mode 100644 index 0000000..49fa62c Binary files /dev/null and b/app/__pycache__/pydantic_schemas.cpython-312.pyc differ diff --git a/app/__pycache__/schemas.cpython-312.pyc b/app/__pycache__/schemas.cpython-312.pyc new file mode 100644 index 0000000..f60949d Binary files /dev/null and b/app/__pycache__/schemas.cpython-312.pyc differ diff --git a/app/__pycache__/settings.cpython-312.pyc b/app/__pycache__/settings.cpython-312.pyc new file mode 100644 index 0000000..22723b3 Binary files /dev/null and b/app/__pycache__/settings.cpython-312.pyc differ diff --git a/app/db/__pycache__/__init__.cpython-312.pyc b/app/db/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..3173eb0 Binary files /dev/null and b/app/db/__pycache__/__init__.cpython-312.pyc differ diff --git a/app/db/__pycache__/db.cpython-312.pyc b/app/db/__pycache__/db.cpython-312.pyc new file mode 100644 index 0000000..21b09ee Binary files /dev/null and b/app/db/__pycache__/db.cpython-312.pyc differ diff --git a/app/db/__pycache__/tables.cpython-312.pyc b/app/db/__pycache__/tables.cpython-312.pyc new file mode 100644 index 0000000..2086f0d Binary files /dev/null and b/app/db/__pycache__/tables.cpython-312.pyc differ diff --git a/app/db/db.py b/app/db/db.py index 81215fa..401bdd6 100644 --- a/app/db/db.py +++ b/app/db/db.py @@ -1,11 +1,12 @@ import os -from contextlib import contextmanager -from typing import Generator +from typing import Annotated +from fastapi import Depends from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session, sessionmaker -from schema import Base +Base = declarative_base() # 
Database configuration DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///investors.db") @@ -17,26 +18,23 @@ engine = create_engine(DATABASE_URL, echo=False) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + + +db_dependency = Annotated[Session, Depends(get_db)] + + def init_database(): """Initialize the database by creating all tables""" Base.metadata.create_all(bind=engine) print("Database initialized successfully!") -@contextmanager -def get_session() -> Generator[Session, None, None]: - """Get a database session with automatic cleanup""" - session = SessionLocal() - try: - yield session - session.commit() - except Exception as e: - session.rollback() - raise e - finally: - session.close() - - def get_session_sync() -> Session: """Get a database session for synchronous operations""" return SessionLocal() diff --git a/app/db/tables.py b/app/db/tables.py new file mode 100644 index 0000000..c66d162 --- /dev/null +++ b/app/db/tables.py @@ -0,0 +1,23 @@ +import datetime + +from sqlalchemy import Column, DateTime, Integer, String + +from db.db import Base + + +class InvestorTable(Base): + __tablename__ = "investors" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String, nullable=False) + aum = Column(Integer, nullable=False) + check_size = Column(String, nullable=False) + sector_focus = Column(String, nullable=False) + stage_focus = Column(String, nullable=False) + region = Column(String, nullable=False) + created_at = Column(DateTime, default=datetime.datetime.now(datetime.UTC)) + updated_at = Column( + DateTime, + default=datetime.datetime.now(datetime.UTC), + onupdate=datetime.datetime.now(datetime.UTC), + ) diff --git a/app/main.py b/app/main.py index 4781a23..a09042e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,7 +1,44 @@ -from fastapi import FastAPI +import io + +import pandas as pd +from db.db import db_dependency, 
init_database +from fastapi import FastAPI, File, UploadFile +from services.openrouter import InvestorProcessor + +from app.services.querying import QueryProcessor app = FastAPI() +init_database() + + @app.get("/") def read_root(): - return {"Hello": "World"} \ No newline at end of file + return {"Hello": "World"} + + +@app.post("/parse-csv") +async def parse_csv(db: db_dependency, file: UploadFile = File(...)): + # Read uploaded CSV with pandas + content = await file.read() + df = pd.read_csv(io.StringIO(content.decode("utf-8"))) + + # Process the dataframe + processor = InvestorProcessor(sql_session=db) + results = await processor.process_csv(df) + + # Convert Pydantic objects to dictionaries + return {"results": [r.dict() for r in results]} + + +@app.post("/query") +async def query_investors(db: db_dependency, question: str): + processor = QueryProcessor(sql_session=db) + results = processor.process_query(question) + return {"results": [r.dict() for r in results]} + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app="main:app", host="localhost", port=8000, reload=True) diff --git a/app/pydantic_schemas.py b/app/pydantic_schemas.py new file mode 100644 index 0000000..08588b5 --- /dev/null +++ b/app/pydantic_schemas.py @@ -0,0 +1,38 @@ +from typing import List + +from pydantic import BaseModel + + +class Investor(BaseModel): + name: str + aum: int + check_size: str + sector_focus: str + stage_focus: str + region: str + investment_thesis: str + investor_description: str + + +class InvestorList(BaseModel): + investor_list: List[Investor] + + +class QueryResponse(BaseModel): + name: str + aum: int + check_size: str + sector_focus: str + stage_focus: str + region: str + investment_thesis: str + investor_description: str + reason: str + + +class QueryRequest(BaseModel): + question: str + + +class QueryResponseList(BaseModel): + responses: List[QueryResponse] \ No newline at end of file diff --git a/app/services/__pycache__/__init__.cpython-312.pyc 
b/app/services/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..9191821 Binary files /dev/null and b/app/services/__pycache__/__init__.cpython-312.pyc differ diff --git a/app/services/__pycache__/langgraph_agent.cpython-312.pyc b/app/services/__pycache__/langgraph_agent.cpython-312.pyc new file mode 100644 index 0000000..fe524c0 Binary files /dev/null and b/app/services/__pycache__/langgraph_agent.cpython-312.pyc differ diff --git a/app/services/__pycache__/llm_parser.cpython-312.pyc b/app/services/__pycache__/llm_parser.cpython-312.pyc new file mode 100644 index 0000000..0867b81 Binary files /dev/null and b/app/services/__pycache__/llm_parser.cpython-312.pyc differ diff --git a/app/services/__pycache__/openrouter.cpython-312.pyc b/app/services/__pycache__/openrouter.cpython-312.pyc new file mode 100644 index 0000000..46d57bd Binary files /dev/null and b/app/services/__pycache__/openrouter.cpython-312.pyc differ diff --git a/app/services/__pycache__/settings.cpython-312.pyc b/app/services/__pycache__/settings.cpython-312.pyc new file mode 100644 index 0000000..64d23fc Binary files /dev/null and b/app/services/__pycache__/settings.cpython-312.pyc differ diff --git a/app/services/investor_parser.py b/app/services/investor_parser.py deleted file mode 100644 index 8e2864e..0000000 --- a/app/services/investor_parser.py +++ /dev/null @@ -1,449 +0,0 @@ -#!/usr/bin/env python3 -""" -LLM-powered Investor Parser - -A comprehensive parser that processes investor CSV data and saves it to both SQL and vector databases. -Supports both simple parsing and LLM-enhanced parsing for better data quality. 
- -Usage: - python investor_parser.py --help - python investor_parser.py --file="path/to/csv" --limit=10 - python investor_parser.py --file="path/to/csv" --use-llm --limit=50 - python investor_parser.py --search="bioeconomy circular" -""" - -import argparse -import json -import logging -import os -from typing import Any, Dict, Optional - -import chromadb -import pandas as pd -from dotenv import load_dotenv -from openai import OpenAI - -from db import get_session, init_database -from schema import CSVRow, Investor - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -class InvestorParser: - """Complete investor parser with optional LLM enhancement""" - - def __init__(self, use_llm: bool = False): - self.use_llm = use_llm - - # Initialize OpenAI client if using LLM - if self.use_llm: - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - logger.warning( - "OpenAI API key not found. LLM features will be disabled." 
- ) - self.use_llm = False - else: - self.openai_client = OpenAI(api_key=api_key) - logger.info("LLM enhancement enabled") - - # Initialize ChromaDB - self.chroma_client = chromadb.PersistentClient(path="./chroma_db") - self.collection = self.chroma_client.get_or_create_collection( - name="investor_descriptions", - metadata={ - "description": "Investor descriptions and investment thesis focus" - }, - ) - - # Initialize database - init_database() - - def parse_json_field(self, json_str: str) -> Dict[str, Any]: - """Safely parse JSON string with optional LLM assistance""" - if not json_str or json_str.strip() == "": - return {} - - try: - return json.loads(json_str) - except json.JSONDecodeError as e: - logger.warning(f"JSON parsing failed: {e}") - - # Use LLM to clean JSON if available - if self.use_llm: - return self._llm_clean_json(json_str) - else: - return {} - - def _llm_clean_json(self, malformed_json: str) -> Dict[str, Any]: - """Use LLM to clean and parse malformed JSON""" - try: - prompt = f""" - The following text appears to be malformed JSON. Please clean it up and return valid JSON. - If it's not possible to create valid JSON, return an empty object {{}}. 
- - Original text: - {malformed_json[:2000]} # Limit length for API - - Return only the cleaned JSON, no explanations: - """ - - response = self.openai_client.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": prompt}], - temperature=0, - ) - - cleaned_json = response.choices[0].message.content.strip() - return json.loads(cleaned_json) - - except Exception as e: - logger.error(f"LLM JSON cleaning failed: {e}") - return {} - - def extract_structured_data(self, csv_row: CSVRow) -> Dict[str, Any]: - """Extract and structure data from CSV row""" - # Parse the investment firm profile - profile_data = {} - if csv_row.investment_firm_profile: - profile_data = self.parse_json_field(csv_row.investment_firm_profile) - - # Create structured output - structured_data = { - "name": csv_row.name, - "website": csv_row.website or profile_data.get("websiteURL"), - "investor_description": profile_data.get("investorDescription", ""), - "investment_thesis_focus": profile_data.get("investmentThesisFocus", []), - "headquarters": profile_data.get("headquarters", ""), - "aum_info": profile_data.get("overallAssetsUnderManagement", {}), - "funds_info": profile_data.get("funds", []), - "crunchbase_urls": csv_row.crunchbase_linkedin_urls or "", - "crunchbase_extract": csv_row.crunchbase_firm_extract or "", - "linkedin_profile": csv_row.linkedin_investment_profile or "", - "source_truth_profile": csv_row.source_of_truth_profile or "", - } - - return structured_data - - def enhance_with_llm(self, investor_data: Dict[str, Any]) -> Dict[str, Any]: - """Use LLM to enhance and standardize investor data""" - if not self.use_llm: - return investor_data - - try: - # Combine all available text for context - context_text = " ".join( - [ - investor_data.get("investor_description", ""), - investor_data.get("crunchbase_extract", ""), - investor_data.get("linkedin_profile", ""), - investor_data.get("source_truth_profile", ""), - ] - ) - - if not context_text.strip(): - 
return investor_data - - prompt = f""" - Based on the following information about an investor, please extract and standardize: - 1. A concise investor description (2-3 sentences) - 2. Investment thesis focus areas (list of specific focus areas) - 3. Headquarters location (city, country format) - - Investor: {investor_data["name"]} - Context: {context_text[:3000]} # Limit for API - - Return in JSON format: - {{ - "enhanced_description": "concise description here", - "standardized_focus": ["focus area 1", "focus area 2", ...], - "standardized_headquarters": "City, Country" - }} - """ - - response = self.openai_client.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": prompt}], - temperature=0.3, - ) - - enhanced_data = json.loads(response.choices[0].message.content) - - # Update investor data with enhanced information - if enhanced_data.get("enhanced_description"): - investor_data["enhanced_description"] = enhanced_data[ - "enhanced_description" - ] - - if enhanced_data.get("standardized_focus"): - investor_data["standardized_focus"] = enhanced_data[ - "standardized_focus" - ] - - if enhanced_data.get("standardized_headquarters"): - investor_data["standardized_headquarters"] = enhanced_data[ - "standardized_headquarters" - ] - - return investor_data - - except Exception as e: - logger.error(f"LLM enhancement failed for {investor_data['name']}: {e}") - return investor_data - - def save_to_sql(self, investor_data: Dict[str, Any]) -> int: - """Save investor data to SQL database""" - try: - with get_session() as session: - # Check if investor already exists - existing = ( - session.query(Investor) - .filter_by(name=investor_data["name"]) - .first() - ) - - if existing: - logger.info(f"Updating existing investor: {investor_data['name']}") - investor = existing - else: - logger.info(f"Creating new investor: {investor_data['name']}") - investor = Investor() - - # Map data to investor object - investor.name = investor_data["name"] - 
investor.website = investor_data.get("website") - investor.investor_description = investor_data.get( - "enhanced_description" - ) or investor_data.get("investor_description") - investor.investment_thesis_focus = investor_data.get( - "standardized_focus" - ) or investor_data.get("investment_thesis_focus") - investor.headquarters = investor_data.get( - "standardized_headquarters" - ) or investor_data.get("headquarters") - - # AUM information - aum_info = investor_data.get("aum_info") or {} - investor.aum_amount = aum_info.get("aumAmount") - investor.aum_as_of_date = aum_info.get("asOfDate") - investor.aum_source_url = aum_info.get("sourceUrl") - - # Fund information - investor.funds_info = investor_data.get("funds_info", []) - - # Raw data - investor.crunchbase_urls = investor_data.get("crunchbase_urls") - investor.crunchbase_extract = investor_data.get("crunchbase_extract") - investor.linkedin_profile = investor_data.get("linkedin_profile") - investor.source_truth_profile = investor_data.get( - "source_truth_profile" - ) - - if not existing: - session.add(investor) - - session.flush() # Get the ID - return investor.id - - except Exception as e: - logger.error(f"Failed to save to SQL: {e}") - raise - - def save_to_vector_db(self, investor_id: int, investor_data: Dict[str, Any]): - """Save investor description and focus to ChromaDB""" - try: - # Prepare text for embedding - description_text = investor_data.get( - "enhanced_description" - ) or investor_data.get("investor_description", "") - focus_areas = investor_data.get("standardized_focus") or investor_data.get( - "investment_thesis_focus", [] - ) - - if isinstance(focus_areas, list): - focus_text = " ".join(focus_areas) - else: - focus_text = str(focus_areas) - - # Combine description and focus for embedding - combined_text = f"{description_text} {focus_text}".strip() - - if not combined_text: - logger.warning(f"No text to embed for investor {investor_data['name']}") - return - - # Create metadata - metadata = { - 
"investor_id": investor_id, - "name": investor_data["name"], - "website": investor_data.get("website") or "", - "headquarters": investor_data.get("standardized_headquarters") - or investor_data.get("headquarters") - or "", - "focus_areas_count": len(focus_areas) - if isinstance(focus_areas, list) - else 0, - } - - # Add to ChromaDB - self.collection.add( - documents=[combined_text], - metadatas=[metadata], - ids=[f"investor_{investor_id}"], - ) - - logger.info(f"Added investor {investor_data['name']} to vector database") - - except Exception as e: - logger.error(f"Failed to save to vector DB: {e}") - - def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None): - """Process the entire CSV file""" - logger.info(f"Starting to process CSV file: {csv_file_path}") - - # Read CSV - df = pd.read_csv(csv_file_path) - logger.info(f"Loaded {len(df)} rows from CSV") - - if limit: - df = df.head(limit) - logger.info(f"Processing limited to {limit} rows") - - processed_count = 0 - error_count = 0 - - for index, row in df.iterrows(): - try: - logger.info(f"Processing row {index + 1}/{len(df)}: {row['Name']}") - - # Create CSVRow object - csv_row = CSVRow( - name=row["Name"], - website=row.get("Website"), - investment_firm_profile=row.get("Investment Firm Profile"), - crunchbase_linkedin_urls=row.get("Crunchbase & LinkedIn URLs"), - crunchbase_firm_extract=row.get("Crunchbase Firm Extract"), - linkedin_investment_profile=row.get("LinkedIn Investment Profile"), - source_of_truth_profile=row.get("Source of Truth Profile"), - ) - - # Extract structured data - structured_data = self.extract_structured_data(csv_row) - - # Enhance with LLM if enabled - enhanced_data = self.enhance_with_llm(structured_data) - - # Save to SQL database - investor_id = self.save_to_sql(enhanced_data) - - # Save to vector database - self.save_to_vector_db(investor_id, enhanced_data) - - processed_count += 1 - - # Progress update every 10 rows - if (index + 1) % 10 == 0: - logger.info( - 
f"Progress: {processed_count} processed, {error_count} errors" - ) - - except Exception as e: - error_count += 1 - logger.error( - f"Error processing row {index + 1} ({row.get('Name', 'Unknown')}): {e}" - ) - continue - - logger.info( - f"Processing complete! Processed: {processed_count}, Errors: {error_count}" - ) - return processed_count, error_count - - def search_investors(self, query: str, limit: int = 10): - """Search investors using vector similarity""" - try: - results = self.collection.query(query_texts=[query], n_results=limit) - - return results - - except Exception as e: - logger.error(f"Search failed: {e}") - return None - - -def main(): - """Main function with command line interface""" - parser = argparse.ArgumentParser(description="LLM-powered Investor Parser") - parser.add_argument("--file", type=str, help="Path to CSV file to process") - parser.add_argument("--limit", type=int, help="Limit number of rows to process") - parser.add_argument( - "--use-llm", - action="store_true", - help="Enable LLM enhancement (requires OpenAI API key)", - ) - parser.add_argument("--search", type=str, help="Search query for vector database") - parser.add_argument( - "--search-limit", - type=int, - default=10, - help="Number of search results to return", - ) - - args = parser.parse_args() - - # Initialize parser - investor_parser = InvestorParser(use_llm=args.use_llm) - - if args.search: - # Perform search - logger.info(f"Searching for: {args.search}") - results = investor_parser.search_investors(args.search, args.search_limit) - - if results and results["documents"][0]: - print(f"\nFound {len(results['documents'][0])} similar investors:") - for i, (doc, metadata) in enumerate( - zip(results["documents"][0], results["metadatas"][0]) - ): - print(f"{i + 1}. 
{metadata['name']}") - print(f" Website: {metadata.get('website', 'N/A')}") - print(f" HQ: {metadata.get('headquarters', 'N/A')}") - print(f" Focus areas: {metadata.get('focus_areas_count', 0)}") - print(f" Similarity score: {results['distances'][0][i]:.3f}") - print() - else: - print("No results found.") - - elif args.file: - # Process CSV file - if not os.path.exists(args.file): - logger.error(f"File not found: {args.file}") - return - - processed, errors = investor_parser.process_csv_file(args.file, args.limit) - - print("\nProcessing complete!") - print(f"Successfully processed: {processed} investors") - print(f"Errors encountered: {errors}") - - # Show some search examples - print("\nTrying some example searches...") - for query in ["bioeconomy", "venture capital", "sustainability"]: - results = investor_parser.search_investors(query, 3) - if results and results["documents"][0]: - print(f"\nTop matches for '{query}':") - for i, metadata in enumerate(results["metadatas"][0][:3]): - print(f" {i + 1}. 
{metadata['name']}") - - else: - parser.print_help() - - -if __name__ == "__main__": - main() diff --git a/app/services/langgraph_agent.py b/app/services/langgraph_agent.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/llm_parser.py b/app/services/llm_parser.py index e22238b..4465251 100644 --- a/app/services/llm_parser.py +++ b/app/services/llm_parser.py @@ -1,28 +1,368 @@ -import asyncio -import csv +import json +import logging +import os +from typing import Any, Dict, Optional -from openai import AsyncOpenAI -from pydantic import BaseModel +import chromadb +import pandas as pd +from dotenv import load_dotenv +from openai import OpenAI + +from db import get_session, init_database +from schema import CSVRow, Investor + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) -class RowSchema(BaseModel): - section: str - explanation: str +class LLMInvestorParser: + def __init__(self): + # Initialize OpenAI client + self.openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) -client = AsyncOpenAI() + # Initialize ChromaDB + self.chroma_client = chromadb.PersistentClient(path="./chroma_db") + self.collection = self.chroma_client.get_or_create_collection( + name="investor_descriptions", + metadata={ + "description": "Investor descriptions and investment thesis focus" + }, + ) -async def process_row(row): - resp = await client.chat.completions.create( - model="gpt-4o-mini", - messages=[{"role": "user", "content": f"Extract relevant section:\n{row}"}], - response_format={"type": "json_object"} # ensures JSON output - ) - return RowSchema.model_validate_json(resp.choices[0].message.content) + # Initialize database + init_database() -async def main(): - with open("data.csv") as f: - reader = csv.DictReader(f) - tasks = [process_row(row) for row in reader] - return await asyncio.gather(*tasks) + def parse_json_field(self, json_str: str) -> Dict[str, Any]: 
+ """Safely parse JSON string with LLM assistance if needed""" + if not json_str or json_str.strip() == "": + return {} -results = asyncio.run(main()) \ No newline at end of file + try: + # Try direct JSON parsing first + return json.loads(json_str) + except json.JSONDecodeError: + # If direct parsing fails, use LLM to clean and parse + logger.info("Direct JSON parsing failed, using LLM to clean JSON") + return self._llm_clean_json(json_str) + + def _llm_clean_json(self, malformed_json: str) -> Dict[str, Any]: + """Use LLM to clean and parse malformed JSON""" + try: + prompt = f""" + The following text appears to be malformed JSON. Please clean it up and return valid JSON. + If it's not possible to create valid JSON, return an empty object {{}}. + + Original text: + {malformed_json[:2000]} # Limit length for API + + Return only the cleaned JSON, no explanations: + """ + + response = self.openai_client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": prompt}], + temperature=0, + ) + + cleaned_json = response.choices[0].message.content.strip() + return json.loads(cleaned_json) + + except Exception as e: + logger.error(f"LLM JSON cleaning failed: {e}") + return {} + + def extract_structured_data(self, csv_row: CSVRow) -> Dict[str, Any]: + """Extract and structure data from CSV row using LLM""" + # Parse the investment firm profile + profile_data = {} + if csv_row.investment_firm_profile: + profile_data = self.parse_json_field(csv_row.investment_firm_profile) + + # Create structured output + structured_data = { + "name": csv_row.name, + "website": csv_row.website or profile_data.get("websiteURL"), + "investor_description": profile_data.get("investorDescription", ""), + "investment_thesis_focus": profile_data.get("investmentThesisFocus", []), + "headquarters": profile_data.get("headquarters", ""), + "aum_info": profile_data.get("overallAssetsUnderManagement", {}), + "funds_info": profile_data.get("funds", []), + 
"crunchbase_urls": csv_row.crunchbase_linkedin_urls or "", + "crunchbase_extract": csv_row.crunchbase_firm_extract or "", + "linkedin_profile": csv_row.linkedin_investment_profile or "", + "source_truth_profile": csv_row.source_of_truth_profile or "", + } + + return structured_data + + def enhance_with_llm(self, investor_data: Dict[str, Any]) -> Dict[str, Any]: + """Use LLM to enhance and standardize investor data""" + try: + # Combine all available text for context + context_text = " ".join( + [ + investor_data.get("investor_description", ""), + investor_data.get("crunchbase_extract", ""), + investor_data.get("linkedin_profile", ""), + investor_data.get("source_truth_profile", ""), + ] + ) + + if not context_text.strip(): + return investor_data + + prompt = f""" + Based on the following information about an investor, please extract and standardize: + 1. A concise investor description (2-3 sentences) + 2. Investment thesis focus areas (list of specific focus areas) + 3. Headquarters location (city, country format) + + Investor: {investor_data["name"]} + Context: {context_text[:3000]} # Limit for API + + Return in JSON format: + {{ + "enhanced_description": "concise description here", + "standardized_focus": ["focus area 1", "focus area 2", ...], + "standardized_headquarters": "City, Country" + }} + """ + + response = self.openai_client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": prompt}], + temperature=0.3, + ) + + enhanced_data = json.loads(response.choices[0].message.content) + + # Update investor data with enhanced information + if enhanced_data.get("enhanced_description"): + investor_data["enhanced_description"] = enhanced_data[ + "enhanced_description" + ] + + if enhanced_data.get("standardized_focus"): + investor_data["standardized_focus"] = enhanced_data[ + "standardized_focus" + ] + + if enhanced_data.get("standardized_headquarters"): + investor_data["standardized_headquarters"] = enhanced_data[ + 
"standardized_headquarters" + ] + + return investor_data + + except Exception as e: + logger.error(f"LLM enhancement failed for {investor_data['name']}: {e}") + return investor_data + + def save_to_sql(self, investor_data: Dict[str, Any]) -> int: + """Save investor data to SQL database""" + try: + with get_session() as session: + # Check if investor already exists + existing = ( + session.query(Investor) + .filter_by(name=investor_data["name"]) + .first() + ) + + if existing: + logger.info(f"Updating existing investor: {investor_data['name']}") + investor = existing + else: + logger.info(f"Creating new investor: {investor_data['name']}") + investor = Investor() + + # Map data to investor object + investor.name = investor_data["name"] + investor.website = investor_data.get("website") + investor.investor_description = investor_data.get( + "enhanced_description" + ) or investor_data.get("investor_description") + investor.investment_thesis_focus = investor_data.get( + "standardized_focus" + ) or investor_data.get("investment_thesis_focus") + investor.headquarters = investor_data.get( + "standardized_headquarters" + ) or investor_data.get("headquarters") + + # AUM information + aum_info = investor_data.get("aum_info", {}) + investor.aum_amount = aum_info.get("aumAmount") + investor.aum_as_of_date = aum_info.get("asOfDate") + investor.aum_source_url = aum_info.get("sourceUrl") + + # Fund information + investor.funds_info = investor_data.get("funds_info", []) + + # Raw data + investor.crunchbase_urls = investor_data.get("crunchbase_urls") + investor.crunchbase_extract = investor_data.get("crunchbase_extract") + investor.linkedin_profile = investor_data.get("linkedin_profile") + investor.source_truth_profile = investor_data.get( + "source_truth_profile" + ) + + if not existing: + session.add(investor) + + session.flush() # Get the ID + return investor.id + + except Exception as e: + logger.error(f"Failed to save to SQL: {e}") + raise + + def save_to_vector_db(self, 
investor_id: int, investor_data: Dict[str, Any]): + """Save investor description and focus to ChromaDB""" + try: + # Prepare text for embedding + description_text = investor_data.get( + "enhanced_description" + ) or investor_data.get("investor_description", "") + focus_areas = investor_data.get("standardized_focus") or investor_data.get( + "investment_thesis_focus", [] + ) + + if isinstance(focus_areas, list): + focus_text = " ".join(focus_areas) + else: + focus_text = str(focus_areas) + + # Combine description and focus for embedding + combined_text = f"{description_text} {focus_text}".strip() + + if not combined_text: + logger.warning(f"No text to embed for investor {investor_data['name']}") + return + + # Create metadata + metadata = { + "investor_id": investor_id, + "name": investor_data["name"], + "website": investor_data.get("website", ""), + "headquarters": investor_data.get("standardized_headquarters") + or investor_data.get("headquarters", ""), + "focus_areas_count": len(focus_areas) + if isinstance(focus_areas, list) + else 0, + } + + # Add to ChromaDB + self.collection.add( + documents=[combined_text], + metadatas=[metadata], + ids=[f"investor_{investor_id}"], + ) + + logger.info(f"Added investor {investor_data['name']} to vector database") + + except Exception as e: + logger.error(f"Failed to save to vector DB: {e}") + + def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None): + """Process the entire CSV file""" + logger.info(f"Starting to process CSV file: {csv_file_path}") + + # Read CSV + df = pd.read_csv(csv_file_path) + logger.info(f"Loaded {len(df)} rows from CSV") + + if limit: + df = df.head(limit) + logger.info(f"Processing limited to {limit} rows") + + processed_count = 0 + error_count = 0 + + for index, row in df.iterrows(): + try: + logger.info(f"Processing row {index + 1}/{len(df)}: {row['Name']}") + + # Create CSVRow object + csv_row = CSVRow( + name=row["Name"], + website=row.get("Website"), + 
investment_firm_profile=row.get("Investment Firm Profile"), + crunchbase_linkedin_urls=row.get("Crunchbase & LinkedIn URLs"), + crunchbase_firm_extract=row.get("Crunchbase Firm Extract"), + linkedin_investment_profile=row.get("LinkedIn Investment Profile"), + source_of_truth_profile=row.get("Source of Truth Profile"), + ) + + # Extract structured data + structured_data = self.extract_structured_data(csv_row) + + # Enhance with LLM + enhanced_data = self.enhance_with_llm(structured_data) + + # Save to SQL database + investor_id = self.save_to_sql(enhanced_data) + + # Save to vector database + self.save_to_vector_db(investor_id, enhanced_data) + + processed_count += 1 + + # Progress update every 10 rows + if (index + 1) % 10 == 0: + logger.info( + f"Processed {processed_count} rows successfully, {error_count} errors" + ) + + except Exception as e: + error_count += 1 + logger.error( + f"Error processing row {index + 1} ({row.get('Name', 'Unknown')}): {e}" + ) + continue + + logger.info( + f"Processing complete! 
Processed: {processed_count}, Errors: {error_count}" + ) + return processed_count, error_count + + def search_investors(self, query: str, limit: int = 5): + """Search investors using vector similarity""" + try: + results = self.collection.query(query_texts=[query], n_results=limit) + + return results + + except Exception as e: + logger.error(f"Search failed: {e}") + return None + + +def main(): + """Main function to run the parser""" + parser = LLMInvestorParser() + + # Process the CSV file + csv_file = "/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/New Excerpt 5 investors - Sheet1 parse.csv" + + # Start with a small sample for testing + processed, errors = parser.process_csv_file(csv_file, limit=5) + + print("\nProcessing complete!") + print(f"Successfully processed: {processed} investors") + print(f"Errors encountered: {errors}") + + # Test search functionality + print("\nTesting search functionality...") + results = parser.search_investors("bioeconomy circular economy") + if results: + print(f"Found {len(results['documents'][0])} similar investors") + for i, doc in enumerate(results["documents"][0]): + print(f" {i + 1}. 
def main(argv=None):
    """Run the parser on a small CSV sample, then smoke-test the search.

    Args:
        argv: Optional list of command-line arguments; the first item, when
            present, is the CSV path to process. Defaults to ``sys.argv[1:]``.
            With no path given, the original sample location is used.
    """
    import sys  # local import: only this script entry point needs it

    default_csv = "/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/New Excerpt 5 investors - Sheet1 parse.csv"
    args = sys.argv[1:] if argv is None else list(argv)
    csv_file = args[0] if args else default_csv

    parser = LLMInvestorParser()

    # Start with a small sample for testing
    processed, errors = parser.process_csv_file(csv_file, limit=5)

    print("\nProcessing complete!")
    print(f"Successfully processed: {processed} investors")
    print(f"Errors encountered: {errors}")

    # Test search functionality
    print("\nTesting search functionality...")
    results = parser.search_investors("bioeconomy circular economy")
    if results:
        print(f"Found {len(results['documents'][0])} similar investors")
        # Results are nested one list per query text; a single query was sent.
        for position, metadata in enumerate(results["metadatas"][0], start=1):
            print(f" {position}. {metadata['name']}")


if __name__ == "__main__":
    main()
class InvestorProcessor:
    """Extract structured investor records from CSV rows via an LLM and persist them.

    Rows are sent to the model in batches. The parsed records are then written
    to the optional SQL session and to the ``investor_descriptions`` vector
    collection.
    """

    # Translation table used by ``_clean_value``: tab, newline and carriage
    # return become a space; every other control character below 32 is dropped.
    _CONTROL_CHARS = {
        code: (" " if code in (9, 10, 13) else None) for code in range(32)
    }

    def __init__(
        self,
        sql_session: Optional[object] = None,
        vector_db_client: Optional[object] = None,
    ):
        """Build the prompt, the structured-output LLM and the vector collection.

        Args:
            sql_session: Session used by ``_save_to_sql``; SQL persistence is
                skipped when it is falsy.
            vector_db_client: Client exposing ``get_or_create_collection``.
                When ``None``, a ``chromadb.PersistentClient`` rooted at
                ``./chroma_db`` is created.
        """
        self.template = """You are an expert data extraction assistant. Extract investor information from the provided CSV data and return it as a list of structured records.

Given the following CSV data rows:
{question}

For each row, extract and structure the following fields:
- name: The investor's full name
- aum: Assets under management (as integer, use 0 if not available)
- check_size: Investment check size (as string)
- sector_focus: Sector focus (as string)
- stage_focus: Investment stage focus (as string)
- region: Geographic region (as string)
- investment_thesis: Investment thesis (as string)
- investor_description: Description of the investor (as string)

Important:
- If a field is not available in the data, use appropriate default values (empty string for text fields, 0 for numbers)
- Ensure all text fields are properly escaped and contain no control characters
- Return clean, valid JSON only

Return the data as a structured list of investors."""

        self.prompt = PromptTemplate(
            template=self.template, input_variables=["question"]
        )

        self.llm = ChatOpenAI(
            api_key=settings.OPENROUTER_API_KEY,
            base_url="https://openrouter.ai/api/v1",
            # The ":free" suffix was previously truncated to ":fre".
            model="openai/gpt-oss-120b:free",
            temperature=0,
        )

        self.structured_llm = self.llm.with_structured_output(InvestorList)
        self.sql_session = sql_session

        # Honour an injected client; fall back to the on-disk store otherwise.
        if vector_db_client is None:
            vector_db_client = chromadb.PersistentClient(path="./chroma_db")
        self.vector_db_client = vector_db_client
        self.collection = self.vector_db_client.get_or_create_collection(
            name="investor_descriptions",
            metadata={
                "description": "Investor descriptions and investment thesis focus"
            },
        )

    @classmethod
    def _clean_value(cls, value) -> str:
        """Return ``str(value)`` with control characters neutralised."""
        return str(value).translate(cls._CONTROL_CHARS)

    @classmethod
    def _format_batch(cls, batch: pd.DataFrame) -> str:
        """Render each row as ``Row <label + 1>: key: value, ...`` on its own line.

        Cells for which ``pd.notna`` is false are omitted from the row.
        """
        lines = []
        for idx, row in batch.iterrows():
            row_str = ", ".join(
                f"{key}: {cls._clean_value(value)}"
                for key, value in row.items()
                if pd.notna(value)
            )
            lines.append(f"Row {idx + 1}: {row_str}\n")
        return "".join(lines)

    async def _process_batch(self, batch: pd.DataFrame, batch_idx: int) -> List:
        """Send one batch of rows to the LLM and return the parsed investors.

        Args:
            batch: Slice of the input frame to process.
            batch_idx: Zero-based batch number, used only in progress output.

        Returns:
            The ``investor_list`` of the structured response, or ``[]`` when
            the LLM call or the parsing of its response fails.
        """
        batch_str = self._format_batch(batch)

        try:
            print(f"Processing batch {batch_idx + 1}...")
            # Wrap the rows in the extraction instructions; the raw rows alone
            # give the model no description of the expected fields.
            batch_results = await self.structured_llm.ainvoke(
                self.prompt.format(question=batch_str)
            )
            return batch_results.investor_list
        except Exception as e:
            print(f"Error processing batch {batch_idx + 1}: {e}")
            return []

    async def _save_to_sql(self, investors: "List[Investor]") -> None:
        """Add one ``InvestorTable`` row per investor and commit.

        Does nothing when no SQL session was supplied. Only the tabular fields
        are written here; the description and thesis are stored by
        ``_save_to_vector_db``.
        """
        if not self.sql_session:
            return

        for investor in investors:
            db_investor = InvestorTable(
                name=investor.name,
                aum=investor.aum,
                check_size=investor.check_size,
                sector_focus=investor.sector_focus,
                stage_focus=investor.stage_focus,
                region=investor.region,
            )
            self.sql_session.add(db_investor)
        self.sql_session.commit()

    async def _save_to_vector_db(self, investors: "List[Investor]") -> None:
        """Store each investor's description and thesis in the vector collection.

        Does nothing when there is no vector client or no investors.
        """
        if not self.vector_db_client:
            return

        documents = []
        metadatas = []
        ids = []

        for i, investor in enumerate(investors):
            doc_text = f"{investor.investor_description}\nInvestment Thesis: {investor.investment_thesis}"
            documents.append(doc_text)
            metadatas.append({"name": investor.name})
            # The position keeps ids distinct when two investors share a name.
            ids.append(f"investor_{i}_{investor.name.replace(' ', '_')}")

        if documents:
            self.collection.add(documents=documents, metadatas=metadatas, ids=ids)

    async def process_csv(
        self, df: pd.DataFrame, batch_size: int = 10, max_concurrent: int = 10
    ) -> List:
        """Process CSV data in concurrent batches and save the results.

        Args:
            df: Input rows.
            batch_size: Number of rows sent to the LLM per request.
            max_concurrent: Maximum number of batches in flight at once.

        Returns:
            All investors extracted from the batches that succeeded.

        Raises:
            ValueError: If ``batch_size`` or ``max_concurrent`` is below 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        if max_concurrent < 1:
            # Semaphore(0) would block every batch forever.
            raise ValueError("max_concurrent must be >= 1")

        batches = [
            (df.iloc[start : start + batch_size], start // batch_size)
            for start in range(0, len(df), batch_size)
        ]

        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_with_semaphore(batch_data):
            batch, batch_idx = batch_data
            async with semaphore:
                return await self._process_batch(batch, batch_idx)

        batch_results = await asyncio.gather(
            *(process_with_semaphore(batch_data) for batch_data in batches),
            return_exceptions=True,
        )

        results = []
        for (_, batch_idx), batch_result in zip(batches, batch_results):
            if isinstance(batch_result, BaseException):
                # Keep going with the other batches, but do not hide the loss.
                print(f"Batch {batch_idx + 1} failed and was skipped: {batch_result!r}")
                continue
            results.extend(batch_result)

        if results:
            await self._save_to_sql(results)
            await self._save_to_vector_db(results)

        return results
class QueryProcessor:
    """Answer investor questions from the SQL store, the vector store or the LLM."""

    def __init__(
        self,
        sql_session: Optional[object] = None,
        vector_db_client: Optional[object] = None,
    ):
        """Set up the structured-output LLM and the vector collection.

        Args:
            sql_session: Session used by ``query_sql_database``; SQL lookups
                return ``None`` when it is falsy.
            vector_db_client: Client exposing ``get_or_create_collection``.
                When ``None``, a ``chromadb.PersistentClient`` rooted at
                ``./chroma_db`` is created.
        """
        self.llm = ChatOpenAI(
            api_key=settings.OPENROUTER_API_KEY,
            base_url="https://openrouter.ai/api/v1",
            model="openai/gpt-oss-120b:free",
            temperature=0,
        )

        self.structured_llm = self.llm.with_structured_output(InvestorList)
        self.sql_session = sql_session

        # Honour an injected client; fall back to the on-disk store otherwise.
        if vector_db_client is None:
            vector_db_client = chromadb.PersistentClient(path="./chroma_db")
        self.vector_db_client = vector_db_client
        self.collection = self.vector_db_client.get_or_create_collection(
            name="investor_descriptions",
            metadata={
                "description": "Investor descriptions and investment thesis focus"
            },
        )

    def query_sql_database(self, query: str) -> "Optional[InvestorList]":
        """Run ``query`` against the SQL session and wrap the rows.

        Args:
            query: SQL text, or an already-built SQLAlchemy statement.

        Returns:
            An ``InvestorList`` built from ``result.scalars().all()``, or
            ``None`` when no SQL session was supplied.
        """
        if not self.sql_session:
            return None

        from sqlalchemy import text  # local import: only needed for raw SQL

        # NOTE(review): the SQL is executed verbatim, so it must never be built
        # from untrusted input. Plain strings have to be wrapped in text().
        statement = text(query) if isinstance(query, str) else query
        result = self.sql_session.execute(statement)
        investors = result.scalars().all()
        # NOTE(review): field name follows the ``investor_list`` attribute read
        # from InvestorList elsewhere in this change; confirm against the schema.
        return InvestorList(investor_list=investors)

    def query_vector_database(
        self, query: str, limit: int = 5
    ) -> "Optional[InvestorList]":
        """Look up the investors whose stored text is most similar to ``query``.

        Args:
            query: Free-text description to match.
            limit: Maximum number of matches requested (``n_results``).

        Returns:
            An ``InvestorList`` built from the matched metadata, or ``None``
            when there is no vector client.
        """
        if not self.vector_db_client:
            return None

        results = self.collection.query(query_texts=[query], n_results=limit)
        # Results are nested one list per query text; a single text was sent.
        metadatas = (results.get("metadatas") or [[]])[0]
        # NOTE(review): assumes the stored metadata is enough to build an
        # Investor; confirm against the schema and the writer of the collection.
        investors = [Investor(**metadata) for metadata in metadatas if metadata]
        return InvestorList(investor_list=investors)

    def process_query(self, question: str) -> "InvestorList":
        """Ask the structured-output LLM ``question`` and return its answer."""
        return self.structured_llm.invoke(question)