12 Commits

Author SHA1 Message Date
bolade c5c94936f3 Implement find_similar_investors endpoint to enhance investor similarity search; refactor update_investor logic and improve scoring mechanism for better results. 2025-10-01 23:31:48 +01:00
bolade 17bc5acbc8 Refactor investor similarity search to utilize AI for improved query generation; adjust DataFrame parsing to skip initial rows for better data handling. 2025-09-29 15:58:09 +01:00
bolade 6caea96658 Update server host and port configuration for deployment 2025-09-27 11:16:18 +01:00
bolade 6d902345c0 Refactor investor and company schemas to allow optional fields; update filtering logic in read_companies function and add find_similar_investors endpoint; change LLM model in InvestorProcessor and QueryProcessor for improved performance. 2025-09-27 10:45:08 +01:00
bolade d36367fbe9 Add project management functionality with CRUD operations and associations; introduce project schemas and update main application routing. 2025-09-27 08:53:59 +01:00
bolade abac19c6ae Update .gitignore to exclude __pycache__ directories and modify schemas to allow optional fields for better flexibility; adjust batch size in InvestorProcessor for improved processing efficiency. 2025-09-26 15:56:29 +01:00
bolade f2bbcb96f3 Refactor database models and schemas to allow nullable fields; update init_database function for improved initialization. 2025-09-26 15:24:42 +01:00
bolade 0f7beca5e1 made version 2 2025-09-25 17:00:38 +01:00
bolade b1b1c5ea1e Made improvements to parsing 2025-09-11 16:23:22 +01:00
bolade 29d9292cbd Fix database URL in db.py and update import path for schemas in llm_parser.py 2025-09-11 15:46:39 +01:00
bolade edd0ae910b Refactor investor and company management API with FastAPI integration
- Updated README.md to reflect new features and architecture.
- Implemented company management routes in app/api/companies.py.
- Enhanced main FastAPI application in app/main.py to include company routes and query processing.
- Improved querying capabilities in app/services/querying.py with natural language processing for investor searches.
- Updated requirements.txt to include necessary dependencies for FastAPI and related libraries.
- Added comprehensive error handling and response formatting for API endpoints.
2025-09-03 10:32:19 +01:00
bolade 84cbb888e6 Refactor investor-related schemas and models; implement investor CRUD operations and update stage_focus values to uppercase 2025-09-03 09:41:19 +01:00
47 changed files with 2262 additions and 4780 deletions
+4 -2
View File
@@ -8,8 +8,10 @@
/chroma_db
/*__pycache__*/
*__pycache__
/*.db
/*.cypython-*
*.cypython
/preprocessor
File diff suppressed because one or more lines are too long
-342
View File
@@ -1,342 +0,0 @@
# LLM-Powered Investor Parser
A comprehensive system for parsing investor data from CSV files and storing it in both SQL and vector databases for efficient retrieval and semantic search.
## Features
- **CSV Data Processing**: Parses complex investor data from CSV files with nested JSON fields
- **Dual Database Storage**: Saves structured data to SQL database and text data to vector database
- **LLM Enhancement**: Optional OpenAI GPT integration for data cleaning and enhancement
- **Semantic Search**: Vector similarity search for finding relevant investors
- **Robust Error Handling**: Graceful handling of malformed JSON and missing data
- **Command-Line Interface**: Easy-to-use CLI for batch processing and search
## Architecture
### Components
1. **Schema (`schema.py`)**: SQLAlchemy models and Pydantic validators
2. **Database (`db.py`)**: SQL database connection and session management
3. **Parser (`investor_parser.py`)**: Main parsing logic with LLM integration
4. **Test Parser (`test_parser.py`)**: Simplified parser without LLM dependencies
### Data Flow
```
CSV File → JSON Parsing → Data Extraction → LLM Enhancement → SQL Storage → Vector Storage
```
## Installation
### Prerequisites
- Python 3.12+
- UV package manager (or pip)
### Setup
1. Clone the repository and navigate to the project directory:
```bash
cd /path/to/anton_wireframe
```
2. Create and activate virtual environment using UV:
```bash
uv venv
source .venv/bin/activate # On Linux/Mac
```
3. Install dependencies:
```bash
uv pip install pandas sqlalchemy chromadb openai python-dotenv pydantic
```
4. Configure environment variables (optional for LLM features):
```bash
cp .env.example .env
# Edit .env and add your OpenAI API key
```
## Database Schema
### SQL Database (SQLite)
The `investors` table contains:
- **Basic Info**: name, website, headquarters
- **Investment Focus**: investor_description, investment_thesis_focus
- **Financial Data**: AUM amount, date, source URL
- **Fund Information**: JSON array of fund details
- **Raw Data**: Original CSV fields for reference
- **Metadata**: created_at, updated_at timestamps
### Vector Database (ChromaDB)
Stores embeddings of:
- Investor descriptions
- Investment thesis focus areas
- Combined text for semantic search
## Usage
### Command Line Interface
#### Process CSV File (Simple Mode)
```bash
python investor_parser.py --file "path/to/investors.csv" --limit 50
```
#### Process CSV File (LLM-Enhanced Mode)
```bash
python investor_parser.py --file "path/to/investors.csv" --limit 50 --use-llm
```
#### Search Investors
```bash
python investor_parser.py --search "bioeconomy sustainable agriculture" --search-limit 10
```
#### View Help
```bash
python investor_parser.py --help
```
### Python API
#### Basic Usage
```python
from investor_parser import InvestorParser
# Initialize parser (with or without LLM)
parser = InvestorParser(use_llm=True)
# Process CSV file
processed, errors = parser.process_csv_file("investors.csv", limit=100)
# Search investors
results = parser.search_investors("venture capital fintech", limit=5)
```
#### Direct Database Access
```python
from db import get_session
from schema import Investor
from sqlalchemy import select
# Query database
with get_session() as session:
investors = session.execute(select(Investor)).scalars().all()
for investor in investors:
print(f"{investor.name}: {investor.website}")
```
## Data Processing Pipeline
### 1. CSV Parsing
- Reads CSV with pandas
- Handles nested JSON fields in columns
- Validates data with Pydantic models
### 2. JSON Field Processing
- Direct parsing for well-formed JSON
- LLM-assisted cleaning for malformed JSON (when enabled)
- Graceful fallback to empty objects
### 3. Data Extraction
Extracts key fields:
- Company name and website
- Investor description
- Investment thesis/focus areas
- Headquarters location
- Assets Under Management (AUM)
- Fund information
### 4. LLM Enhancement (Optional)
When `--use-llm` is enabled:
- Standardizes investor descriptions
- Normalizes investment focus areas
- Cleans headquarters location format
- Repairs malformed JSON data
### 5. Dual Storage
- **SQL Database**: Structured, queryable data
- **Vector Database**: Semantic search capabilities
## Configuration
### Environment Variables (.env)
```bash
# OpenAI API Configuration (required for LLM features)
OPENAI_API_KEY=your_openai_api_key_here
# Database Configuration
DATABASE_URL=sqlite:///investors.db
```
### LLM Configuration
- Model: GPT-3.5-turbo (configurable)
- Temperature: 0.3 for enhancement, 0 for JSON cleaning
- Max tokens: Automatically managed
- Fallback: Graceful degradation when API unavailable
## Search Capabilities
### Vector Search Examples
```bash
# Find sustainable/ESG investors
python investor_parser.py --search "sustainability ESG impact investing"
# Find fintech investors
python investor_parser.py --search "financial technology digital payments"
# Find biotech/healthcare investors
python investor_parser.py --search "biotechnology healthcare pharmaceuticals"
# Find early-stage investors
python investor_parser.py --search "seed series A early stage venture"
```
### Search Results Include
- Investor name and website
- Headquarters location
- Number of focus areas
- Similarity score (lower = more similar)
## Error Handling
### Robust Processing
- Malformed JSON handling with LLM backup
- Missing data graceful degradation
- Individual row error isolation
- Comprehensive logging
### Common Issues and Solutions
1. **Invalid JSON in CSV**
- Solution: Enable LLM mode for automatic cleaning
- Fallback: Empty object insertion
2. **Missing OpenAI API Key**
- Solution: System automatically disables LLM features
- Falls back to basic parsing mode
3. **Database Connection Issues**
- Solution: Uses SQLite by default (no external dependencies)
- Configurable via DATABASE_URL
## Performance
### Benchmarks (Approximate)
- **Simple Mode**: ~2-5 seconds per row
- **LLM Mode**: ~5-15 seconds per row (depends on API latency)
- **Search**: <100ms for vector similarity queries
### Optimization Tips
1. Use `--limit` for testing and development
2. Process in batches for large datasets
3. Enable LLM mode only when data quality is crucial
4. Use local vector database for faster searches
## File Structure
```
anton_wireframe/
├── schema.py # Database models and validators
├── db.py # Database connection management
├── investor_parser.py # Main parser with CLI
├── test_parser.py # Simplified parser for testing
├── .env # Environment configuration
├── investors.db # SQLite database (created automatically)
├── chroma_db/ # Vector database directory
└── README.md # This documentation
```
## Example Output
### Processing Log
```
2025-08-27 19:45:46,614 - INFO - Database initialized successfully!
2025-08-27 19:45:46,690 - INFO - Starting to process CSV file: investors.csv
2025-08-27 19:45:46,690 - INFO - Loaded 82 rows from CSV
2025-08-27 19:45:46,690 - INFO - Processing limited to 20 rows
2025-08-27 19:45:46,691 - INFO - Processing row 1/20: European Circular Bioeconomy Fund
2025-08-27 19:45:46,692 - INFO - Creating new investor: European Circular Bioeconomy Fund
2025-08-27 19:45:46,693 - INFO - Added investor European Circular Bioeconomy Fund to vector database
...
2025-08-27 19:45:50,828 - INFO - Processing complete! Processed: 20, Errors: 0
```
### Search Results
```bash
$ python investor_parser.py --search "circular bioeconomy"
Found 4 similar investors:
1. European Circular Bioeconomy Fund
Website: https://www.ecbf.vc
HQ: ECBF Management GmbH, Poppelsdorfer Allee 175, 53115 Bonn, Germany
Focus areas: 6
Similarity score: 0.979
2. Astanor
Website: https://www.astanor.com/
HQ:
Focus areas: 5
Similarity score: 1.080
```
## Contributing
### Development Setup
1. Install development dependencies
2. Run tests: `python test_parser.py`
3. Lint code: Follow PEP 8 standards
4. Test with sample data before processing full datasets
### Adding Features
- New data extractors: Extend `extract_structured_data()`
- New LLM prompts: Modify `enhance_with_llm()`
- New search capabilities: Extend ChromaDB integration
## License
This project is part of the MKD Anton Wireframe system.
## Support
For issues and questions:
1. Check logs for detailed error messages
2. Verify environment configuration
3. Test with limited datasets first
4. Review CSV data format requirements
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
-8
View File
@@ -1,8 +0,0 @@
from fastapi.routing import apirouter
router = apirouter()
@router.get("/companies")
def read_companies():
return {"message": "list of companies"}
-8
View File
@@ -1,8 +0,0 @@
from fastapi import APIRouter
router = APIRouter()
@router.get("/investors")
def read_investors():
return {"message": "list of investors"}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+5 -2
View File
@@ -9,7 +9,7 @@ from sqlalchemy.orm import Session, sessionmaker
Base = declarative_base()
# Database configuration
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///investors_2.db")
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
# Create engine
engine = create_engine(DATABASE_URL, echo=False)
@@ -32,9 +32,12 @@ db_dependency = Annotated[Session, Depends(get_db)]
def init_database():
"""Initialize the database by creating all tables"""
Base.metadata.create_all(bind=engine)
print("Database initialized successfully!")
def get_session_sync() -> Session:
"""Get a database session for synchronous operations"""
return SessionLocal()
def get_db_session():
"""Get a database session for direct use."""
return SessionLocal()
+128 -39
View File
@@ -1,20 +1,27 @@
import datetime
import enum
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text
from sqlalchemy.orm import relationship
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text, func
from sqlalchemy.orm import declarative_mixin, relationship
from sqlalchemy.types import Enum
from db.db import Base
@declarative_mixin
class TimestampMixin:
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class InvestmentStage(enum.Enum):
SEED = "seed"
SERIES_A = "series_a"
SERIES_B = "series_b"
SERIES_C = "series_c"
GROWTH = "growth"
LATE_STAGE = "late_stage"
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
# Association table for many-to-many relationship between investors and companies
@@ -35,24 +42,49 @@ investor_sector_association = Table(
)
class InvestorTable(Base):
company_sector_association = Table(
"company_sector",
Base.metadata,
Column("company_id", Integer, ForeignKey("companies.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_sector_association = Table(
"project_sector",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_investor_association = Table(
"project_investors",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("investor_id", Integer, ForeignKey("investors.id")),
)
project_company_association = Table(
"project_companies",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("company_id", Integer, ForeignKey("companies.id")),
)
class InvestorTable(Base, TimestampMixin):
__tablename__ = "investors"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
aum = Column(Integer, nullable=False) # Assets Under Management
check_size_lower = Column(Integer, nullable=False) # Lower bound
check_size_upper = Column(Integer, nullable=False) # Upper bound
geographic_focus = Column(String, nullable=False)
stage_focus = Column(Enum(InvestmentStage), nullable=False)
number_of_investments = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.datetime.now(datetime.UTC))
updated_at = Column(
DateTime,
default=datetime.datetime.now(datetime.UTC),
onupdate=datetime.datetime.now(datetime.UTC),
)
aum = Column(Integer, nullable=True) # Assets Under Management
check_size_lower = Column(Integer, nullable=True) # Lower bound
check_size_upper = Column(Integer, nullable=True) # Upper bound
geographic_focus = Column(String, nullable=True)
stage_focus = Column(Enum(InvestmentStage), nullable=True)
number_of_investments = Column(Integer, default=0, nullable=True)
team_members = relationship("InvestorMember", back_populates="investor")
# Relationship to portfolio companies
portfolio_companies = relationship(
@@ -60,30 +92,43 @@ class InvestorTable(Base):
secondary=investor_company_association,
back_populates="investors",
)
team_members = relationship("InvestorTeamMember", back_populates="investor")
sectors = relationship(
"SectorTable",
secondary=investor_sector_association,
back_populates="investors",
)
projects = relationship(
"ProjectTable",
secondary=project_investor_association,
back_populates="investors",
)
class CompanyTable(Base):
class InvestorMember(Base, TimestampMixin):
__tablename__ = "investor_members"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
role = Column(String, nullable=True)
email = Column(String, nullable=True)
investor_id = Column(Integer, ForeignKey("investors.id"))
investor = relationship("InvestorTable", back_populates="team_members")
class CompanyTable(Base, TimestampMixin):
__tablename__ = "companies"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
industry = Column(String, nullable=False)
location = Column(String, nullable=False)
industry = Column(String, nullable=True)
location = Column(String, nullable=True)
description = Column(String, nullable=True)
founded_year = Column(Integer, nullable=True)
website = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.now(datetime.UTC))
updated_at = Column(
DateTime,
default=datetime.datetime.now(datetime.UTC),
onupdate=datetime.datetime.now(datetime.UTC),
)
members = relationship("CompanyMember", back_populates="company")
# Relationship back to investors
investors = relationship(
"InvestorTable",
@@ -91,8 +136,29 @@ class CompanyTable(Base):
back_populates="portfolio_companies",
)
sectors = relationship(
"SectorTable", secondary=company_sector_association, back_populates="companies"
)
class SectorTable(Base):
projects = relationship(
"ProjectTable",
secondary=project_company_association,
back_populates="companies",
)
class CompanyMember(Base, TimestampMixin):
__tablename__ = "company_members"
id = Column(Integer, primary_key=True)
name = Column(String)
linkedin = Column(String, nullable=True)
role = Column(String, nullable=True)
company_id = Column(Integer, ForeignKey("companies.id"), nullable=False)
company = relationship("CompanyTable", back_populates="members")
class SectorTable(Base, TimestampMixin):
__tablename__ = "sectors"
id = Column(Integer, primary_key=True, index=True)
@@ -105,13 +171,36 @@ class SectorTable(Base):
back_populates="sectors",
)
companies = relationship(
"CompanyTable", secondary=company_sector_association, back_populates="sectors"
)
projects = relationship(
"ProjectTable", secondary=project_sector_association, back_populates="sector"
)
class ProjectTable(Base, TimestampMixin):
__tablename__ = "projects"
class InvestorTeamMember(Base):
__tablename__ = "investor_team"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
role = Column(String, nullable=False)
email = Column(String, nullable=False)
valuation = Column(Integer, nullable=True)
investor_id = Column(Integer, ForeignKey("investors.id"))
investor = relationship("InvestorTable", back_populates="team_members")
stage = Column(Enum(InvestmentStage), nullable=True)
location = Column(String, nullable=True)
description = Column(Text, nullable=True)
start_date = Column(DateTime, nullable=True)
end_date = Column(DateTime, nullable=True)
sector = relationship(
"SectorTable", secondary=project_sector_association, back_populates="projects"
)
investors = relationship(
"InvestorTable",
secondary=project_investor_association,
back_populates="projects",
)
companies = relationship(
"CompanyTable", secondary=project_company_association, back_populates="projects"
)
-115
View File
@@ -1,115 +0,0 @@
import json
from typing import List, Optional
from pydantic import BaseModel
from sqlalchemy import JSON, Column, DateTime, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class Investor(Base):
__tablename__ = "investors"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(500), nullable=False)
website = Column(String(1000))
# Core investment information
investor_description = Column(Text)
investment_thesis_focus = Column(JSON) # List of focus areas
headquarters = Column(String(1000))
# AUM information
aum_amount = Column(String(200))
aum_as_of_date = Column(String(100))
aum_source_url = Column(String(1000))
# Fund information
funds_info = Column(JSON) # Complex fund data
# Raw data columns for reference
crunchbase_urls = Column(Text)
crunchbase_extract = Column(Text)
linkedin_profile = Column(Text)
source_truth_profile = Column(Text)
# Metadata
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
def __repr__(self):
return f"<Investor(name='{self.name}', website='{self.website}')>"
# Pydantic models for data validation and parsing
class AUMInfo(BaseModel):
aumAmount: Optional[str] = None
asOfDate: Optional[str] = None
sourceUrl: Optional[str] = None
class FundInfo(BaseModel):
fundName: Optional[str] = None
fundSize: Optional[str] = None
vintage: Optional[str] = None
status: Optional[str] = None
description: Optional[str] = None
class InvestorProfile(BaseModel):
websiteURL: Optional[str] = None
investorDescription: Optional[str] = None
investmentThesisFocus: Optional[List[str]] = None
headquarters: Optional[str] = None
overallAssetsUnderManagement: Optional[AUMInfo] = None
funds: Optional[List[FundInfo]] = None
class CSVRow(BaseModel):
name: str
website: Optional[str] = None
investment_firm_profile: Optional[str] = None
crunchbase_linkedin_urls: Optional[str] = None
crunchbase_firm_extract: Optional[str] = None
linkedin_investment_profile: Optional[str] = None
source_of_truth_profile: Optional[str] = None
def get_combined_description(self) -> str:
"""Combine all description fields for vector embedding"""
descriptions = []
if self.investment_firm_profile:
try:
profile_data = json.loads(self.investment_firm_profile)
if isinstance(profile_data, dict):
desc = profile_data.get("investorDescription", "")
if desc:
descriptions.append(desc)
except (json.JSONDecodeError, TypeError):
pass
if self.crunchbase_firm_extract:
descriptions.append(self.crunchbase_firm_extract)
if self.linkedin_investment_profile:
descriptions.append(self.linkedin_investment_profile)
if self.source_of_truth_profile:
descriptions.append(self.source_of_truth_profile)
return " ".join(descriptions)
def get_investment_focus(self) -> List[str]:
"""Extract investment thesis focus"""
if self.investment_firm_profile:
try:
profile_data = json.loads(self.investment_firm_profile)
if isinstance(profile_data, dict):
focus = profile_data.get("investmentThesisFocus", [])
if isinstance(focus, list):
return focus
except (json.JSONDecodeError, TypeError):
pass
return []
+60 -18
View File
@@ -1,44 +1,86 @@
import io
import pandas as pd
from api import investors
from db.db import db_dependency, init_database
from fastapi import FastAPI, File, UploadFile
from services.openrouter import InvestorProcessor
from db.db import Base, db_dependency, engine
from dotenv import load_dotenv
from fastapi import FastAPI, File, Form, UploadFile
from pydantic import BaseModel
from routers import companies, investors, projects
from schemas.router_schemas import InvestorList
from services.llm_parser import InvestorProcessor
from services.querying import QueryProcessor
app = FastAPI()
app.include_router(investors.router)
load_dotenv()
def init_database():
"""Initialize the database by creating all tables"""
Base.metadata.create_all(bind=engine)
init_database()
app = FastAPI()
# Request models
class QueryRequest(BaseModel):
question: str
class Config:
json_schema_extra = {
"example": {
"question": "Find me deep tech investors that do deals in Europe under 5 million."
}
}
@app.get("/")
def read_root():
def health():
return {"Hello": "World"}
@app.post("/parse-csv")
async def parse_csv(db: db_dependency, file: UploadFile = File(...)):
@app.post("/parse-csv", tags=["CSV Upload"], response_model=list[dict])
async def parse_csv(
db: db_dependency, file: UploadFile = File(...), is_investor: int = Form(...)
):
# Read uploaded CSV with pandas
content = await file.read()
df = pd.read_csv(io.StringIO(content.decode("utf-8")))
# Process the dataframe
processor = InvestorProcessor(sql_session=db)
results = await processor.process_csv(df)
processor = InvestorProcessor()
if is_investor == 1:
results = await processor.parse_investors(df)
else:
results = await processor.parse_companies(df)
# Convert Pydantic objects to dictionaries
return {"results": [r.dict() for r in results]}
return [r.model_dump() for r in results]
@app.post("/query")
async def query_investors(db: db_dependency, question: str):
processor = QueryProcessor(sql_session=db)
results = processor.process_query(question)
return {"results": results}
@app.post("/query", response_model=InvestorList, tags=["Querying"])
async def query_investors(request: QueryRequest):
"""
Query investors using natural language.
Supports queries like:
- "Show me seed stage investors"
- "Find fintech investors in Silicon Valley"
- "Growth stage investors with $5M+ check sizes"
- "Healthcare investors in Europe"
"""
processor = QueryProcessor()
results = processor.process_query(request.question)
return results
app.include_router(investors.router)
app.include_router(companies.router)
app.include_router(projects.router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app="main:app", host="localhost", port=8000, reload=True)
uvicorn.run(app="main:app", host="0.0.0.0", port=8585, reload=True)
-77
View File
@@ -1,77 +0,0 @@
from pydantic import BaseModel
from datetime import datetime
from typing import List, Optional
from enum import Enum
class InvestmentStage(str, Enum):
SEED = "seed"
SERIES_A = "series_a"
SERIES_B = "series_b"
SERIES_C = "series_c"
GROWTH = "growth"
LATE_STAGE = "late_stage"
class SectorSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class CompanySchema(BaseModel):
id: int
name: str
industry: str
location: str
founded_year: Optional[int]
website: Optional[str]
created_at: Optional[datetime]
updated_at: Optional[datetime]
class Config:
from_attributes = True
class InvestorTeamMemberSchema(BaseModel):
id: int
name: str
role: str
email: str
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
aum: int
check_size_lower: int
check_size_upper: int
geographic_focus: str
stage_focus: InvestmentStage
number_of_investments: int
created_at: Optional[datetime]
updated_at: Optional[datetime]
class Config:
from_attributes = True
class InvestorData(BaseModel):
"""Comprehensive investor data schema for LLM processing"""
investor: InvestorSchema
portfolio_companies: List[CompanySchema] = []
team_members: List[InvestorTeamMemberSchema] = []
sectors: List[SectorSchema] = []
class Config:
from_attributes = True
class InvestorList(BaseModel):
investors: List[InvestorData]
-38
View File
@@ -1,38 +0,0 @@
from typing import List
from pydantic import BaseModel
class Investor(BaseModel):
name: str
aum: int
check_size: str
sector_focus: str
stage_focus: str
region: str
investment_thesis: str
investor_description: str
class InvestorList(BaseModel):
investor_list: List[Investor]
class QueryResponse(BaseModel):
name: str
aum: int
check_size: str
sector_focus: str
stage_focus: str
region: str
investment_thesis: str
investor_description: str
reason: str
class QueryRequest(BaseModel):
question: str
class QueryResponseList(BaseModel):
responses: List[QueryResponse]
Binary file not shown.
Binary file not shown.
Binary file not shown.
+232
View File
@@ -0,0 +1,232 @@
from typing import List, Optional
from db.db import get_db
from db.models import CompanyTable, InvestorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import CompanyData
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Company Routes"])
# Request schemas for creating/updating
class CompanyCreate(BaseModel):
name: str
industry: str
location: str
description: Optional[str] = None
founded_year: Optional[int] = None
website: Optional[str] = None
class CompanyUpdate(BaseModel):
name: Optional[str] = None
industry: Optional[str] = None
location: Optional[str] = None
description: Optional[str] = None
founded_year: Optional[int] = None
website: Optional[str] = None
@router.get("/companies", response_model=List[CompanyData])
def read_companies(db: Session = Depends(get_db)):
"""Get all companies with their investor relationships"""
companies = (
db.query(CompanyTable)
.filter(
CompanyTable.name.isnot(None),
CompanyTable.description.isnot(None)
)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.all()
)
# Transform CompanyTable objects to CompanyData format
company_data_list = []
for company in companies:
company_data = CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=company.sectors,
)
company_data_list.append(company_data)
return company_data_list
@router.get("/companies/filter", response_model=List[CompanyData])
def filter_companies(
industry: Optional[str] = Query(
None, description="Filter by industry (partial match)"
),
location: Optional[str] = Query(
None, description="Filter by location (partial match)"
),
founded_after: Optional[int] = Query(None, description="Founded after year"),
founded_before: Optional[int] = Query(None, description="Founded before year"),
has_website: Optional[bool] = Query(
None, description="Filter companies with/without website"
),
investor_name: Optional[str] = Query(
None, description="Filter by investor name (partial match)"
),
db: Session = Depends(get_db),
):
"""Filter companies based on various criteria"""
# Start with base query
query = db.query(CompanyTable).options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
# Apply filters
if industry:
query = query.filter(CompanyTable.industry.ilike(f"%{industry}%"))
if location:
query = query.filter(CompanyTable.location.ilike(f"%{location}%"))
if founded_after is not None:
query = query.filter(CompanyTable.founded_year >= founded_after)
if founded_before is not None:
query = query.filter(CompanyTable.founded_year <= founded_before)
if has_website is not None:
if has_website:
query = query.filter(CompanyTable.website.isnot(None))
else:
query = query.filter(CompanyTable.website.is_(None))
# Filter by investor if provided
if investor_name:
query = query.join(CompanyTable.investors).filter(
InvestorTable.name.ilike(f"%{investor_name}%")
)
companies = query.all()
# Transform to CompanyData format
company_data_list = []
for company in companies:
company_data = CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=company.sectors,
)
company_data_list.append(company_data)
return company_data_list
@router.get("/companies/{company_id}", response_model=CompanyData)
def read_company(company_id: int, db: Session = Depends(get_db)):
"""Get a specific company by ID with its investors"""
company = (
db.query(CompanyTable)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == company_id)
.first()
)
if not company:
raise HTTPException(status_code=404, detail="Company not found")
# Transform to CompanyData format
return CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=company.sectors,
)
@router.post("/companies", response_model=CompanyData)
def create_company(company: CompanyCreate, db: Session = Depends(get_db)):
"""Create a new company"""
db_company = CompanyTable(**company.dict())
db.add(db_company)
db.commit()
db.refresh(db_company)
# Reload with relationships
company_with_relations = (
db.query(CompanyTable)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == db_company.id)
.first()
)
# Transform to CompanyData format
return CompanyData(
company=company_with_relations,
investors=company_with_relations.investors,
members=company_with_relations.members,
sectors=company_with_relations.sectors,
)
@router.put("/companies/{company_id}", response_model=CompanyData)
def update_company(
company_id: int, company: CompanyUpdate, db: Session = Depends(get_db)
):
"""Update an existing company"""
db_company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not db_company:
raise HTTPException(status_code=404, detail="Company not found")
update_data = company.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_company, field, value)
db.commit()
db.refresh(db_company)
# Reload with relationships
company_with_relations = (
db.query(CompanyTable)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == company_id)
.first()
)
# Transform to CompanyData format
return CompanyData(
company=company_with_relations,
investors=company_with_relations.investors,
members=company_with_relations.members,
sectors=company_with_relations.sectors,
)
@router.delete("/companies/{company_id}")
def delete_company(company_id: int, db: Session = Depends(get_db)):
"""Delete a company"""
db_company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not db_company:
raise HTTPException(status_code=404, detail="Company not found")
db.delete(db_company)
db.commit()
return {"message": "Company deleted successfully"}
+281
View File
@@ -0,0 +1,281 @@
from typing import List, Optional
from db.db import get_db
from db.models import InvestorTable, SectorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import InvestmentStage, InvestorData
from services.querying import QueryProcessor
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Investor Routes"])
# Request schemas for creating/updating
class InvestorCreate(BaseModel):
name: str
description: Optional[str] = None
aum: int
check_size_lower: int
check_size_upper: int
geographic_focus: str
stage_focus: InvestmentStage
number_of_investments: int = 0
class InvestorUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
aum: Optional[int] = None
check_size_lower: Optional[int] = None
check_size_upper: Optional[int] = None
geographic_focus: Optional[str] = None
stage_focus: Optional[InvestmentStage] = None
number_of_investments: Optional[int] = None
@router.get("/investors", response_model=List[InvestorData])
def read_investors(db: Session = Depends(get_db)):
"""Get all investors with their related data"""
investors = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.all()
)
# Transform InvestorTable objects to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor, # This maps to InvestorSchema
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
return investor_data_list
@router.get("/investors/filter", response_model=List[InvestorData])
def filter_investors(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by investment stage"
),
min_check_size: Optional[int] = Query(None, description="Minimum check size"),
max_check_size: Optional[int] = Query(None, description="Maximum check size"),
geography: Optional[str] = Query(
None, description="Geographic focus (partial match)"
),
sector: Optional[str] = Query(None, description="Sector name (partial match)"),
min_aum: Optional[int] = Query(None, description="Minimum AUM"),
max_aum: Optional[int] = Query(None, description="Maximum AUM"),
db: Session = Depends(get_db),
):
"""Filter investors based on various criteria"""
# Start with base query
query = db.query(InvestorTable).options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
# Apply filters
if stage:
query = query.filter(InvestorTable.stage_focus == stage)
if min_check_size is not None:
query = query.filter(InvestorTable.check_size_lower >= min_check_size)
if max_check_size is not None:
query = query.filter(InvestorTable.check_size_upper <= max_check_size)
if geography:
query = query.filter(InvestorTable.geographic_focus.ilike(f"%{geography}%"))
if min_aum is not None:
query = query.filter(InvestorTable.aum >= min_aum)
if max_aum is not None:
query = query.filter(InvestorTable.aum <= max_aum)
# Filter by sector if provided
if sector:
query = query.join(InvestorTable.sectors).filter(
SectorTable.name.ilike(f"%{sector}%")
)
investors = query.all()
# Transform to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
return investor_data_list
@router.get("/investors/{investor_id}", response_model=InvestorData)
def read_investor(investor_id: int, db: Session = Depends(get_db)):
"""Get a specific investor by ID"""
investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id == investor_id)
.first()
)
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Transform to InvestorData format
return InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
@router.post("/investors", response_model=InvestorData)
def create_investor(investor: InvestorCreate, db: Session = Depends(get_db)):
"""Create a new investor"""
db_investor = InvestorTable(**investor.dict())
db.add(db_investor)
db.commit()
db.refresh(db_investor)
# Reload with relationships
investor_with_relations = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id == db_investor.id)
.first()
)
# Transform to InvestorData format
return InvestorData(
investor=investor_with_relations,
portfolio_companies=investor_with_relations.portfolio_companies,
team_members=investor_with_relations.team_members,
sectors=investor_with_relations.sectors,
)
@router.get("/investors/{investor_id}/similar", response_model=List[InvestorData])
def find_similar_investors(
investor_id: int,
limit: int = Query(10, description="Maximum number of similar investors to return"),
db: Session = Depends(get_db)
):
"""Find investors similar to a given investor based on characteristics"""
# Get the target investor
target_investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id == investor_id)
.first()
)
if not target_investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Get target investor's sector IDs for comparison
target_sector_ids = {sector.id for sector in target_investor.sectors}
# Query all other investors with their relationships
candidates = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id != investor_id)
.all()
)
# Calculate similarity scores
scored_investors = []
for candidate in candidates:
score = 0
# Stage focus match (30 points)
if candidate.stage_focus == target_investor.stage_focus:
score += 30
# Geographic focus match (20 points for exact, 10 for partial)
if candidate.geographic_focus and target_investor.geographic_focus:
if candidate.geographic_focus.lower() == target_investor.geographic_focus.lower():
score += 20
elif (candidate.geographic_focus.lower() in target_investor.geographic_focus.lower() or
target_investor.geographic_focus.lower() in candidate.geographic_focus.lower()):
score += 10
# Check size overlap (20 points max)
if (candidate.check_size_lower and candidate.check_size_upper and
target_investor.check_size_lower and target_investor.check_size_upper):
# Calculate overlap percentage
overlap_start = max(candidate.check_size_lower, target_investor.check_size_lower)
overlap_end = min(candidate.check_size_upper, target_investor.check_size_upper)
if overlap_end > overlap_start:
overlap = overlap_end - overlap_start
target_range = target_investor.check_size_upper - target_investor.check_size_lower
overlap_ratio = overlap / target_range if target_range > 0 else 0
score += int(20 * overlap_ratio)
# AUM similarity (15 points max)
if candidate.aum and target_investor.aum:
aum_diff = abs(candidate.aum - target_investor.aum)
max_aum = max(candidate.aum, target_investor.aum)
similarity_ratio = 1 - (aum_diff / max_aum) if max_aum > 0 else 0
score += int(15 * similarity_ratio)
# Sector overlap (30 points max)
candidate_sector_ids = {sector.id for sector in candidate.sectors}
if target_sector_ids and candidate_sector_ids:
common_sectors = target_sector_ids.intersection(candidate_sector_ids)
overlap_ratio = len(common_sectors) / len(target_sector_ids)
score += int(30 * overlap_ratio)
if score > 0: # Only include investors with some similarity
scored_investors.append((score, candidate))
# Sort by score (descending) and take top N
scored_investors.sort(key=lambda x: x[0], reverse=True)
similar_investors = [inv for score, inv in scored_investors[:limit]]
# Transform to InvestorData format
return [
InvestorData(
investor=inv,
portfolio_companies=inv.portfolio_companies,
team_members=inv.team_members,
sectors=inv.sectors,
)
for inv in similar_investors
]
+447
View File
@@ -0,0 +1,447 @@
from typing import List, Optional
from db.db import get_db
from db.models import (
CompanyTable,
InvestorTable,
ProjectTable,
SectorTable,
)
from fastapi import APIRouter, Depends, HTTPException, Query
from schemas.project_schemas import (
InvestmentStage,
ProjectCreate,
ProjectData,
ProjectUpdate,
)
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Project Routes"])
@router.get("/projects", response_model=List[ProjectData])
def read_projects(db: Session = Depends(get_db)):
"""Get all projects with their related data"""
projects = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.all()
)
# Transform ProjectTable objects to ProjectData format
project_data_list = []
for project in projects:
project_data = ProjectData(
project=project,
sector=project.sector,
investors=project.investors,
companies=project.companies,
)
project_data_list.append(project_data)
return project_data_list
@router.get("/projects/{project_id}", response_model=ProjectData)
def read_project(project_id: int, db: Session = Depends(get_db)):
"""Get a specific project by ID"""
project = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.filter(ProjectTable.id == project_id)
.first()
)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return ProjectData(
project=project,
sector=project.sector,
investors=project.investors,
companies=project.companies,
)
@router.post("/projects", response_model=ProjectData)
def create_project(project: ProjectCreate, db: Session = Depends(get_db)):
"""Create a new project"""
db_project = ProjectTable(**project.dict())
db.add(db_project)
db.commit()
db.refresh(db_project)
# Reload with relationships
db_project = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.filter(ProjectTable.id == db_project.id)
.first()
)
return ProjectData(
project=db_project,
sector=db_project.sector,
investors=db_project.investors,
companies=db_project.companies,
)
@router.put("/projects/{project_id}", response_model=ProjectData)
def update_project(
project_id: int, project: ProjectUpdate, db: Session = Depends(get_db)
):
"""Update an existing project"""
db_project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not db_project:
raise HTTPException(status_code=404, detail="Project not found")
# Update only provided fields
update_data = project.dict(exclude_unset=True)
for key, value in update_data.items():
setattr(db_project, key, value)
db.commit()
db.refresh(db_project)
# Reload with relationships
db_project = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.filter(ProjectTable.id == project_id)
.first()
)
return ProjectData(
project=db_project,
sector=db_project.sector,
investors=db_project.investors,
companies=db_project.companies,
)
@router.delete("/projects/{project_id}")
def delete_project(project_id: int, db: Session = Depends(get_db)):
"""Delete a project"""
db_project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not db_project:
raise HTTPException(status_code=404, detail="Project not found")
db.delete(db_project)
db.commit()
return {"message": "Project deleted successfully"}
@router.get("/projects/filter", response_model=List[ProjectData])
def filter_projects(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by project stage"
),
min_valuation: Optional[int] = Query(None, description="Minimum valuation"),
max_valuation: Optional[int] = Query(None, description="Maximum valuation"),
location: Optional[str] = Query(None, description="Location (partial match)"),
sector: Optional[str] = Query(None, description="Sector name (partial match)"),
investor_name: Optional[str] = Query(
None, description="Investor name (partial match)"
),
company_name: Optional[str] = Query(
None, description="Company name (partial match)"
),
db: Session = Depends(get_db),
):
"""Filter projects based on various criteria"""
# Start with base query
query = db.query(ProjectTable).options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
# Apply filters
if stage:
query = query.filter(ProjectTable.stage == stage)
if min_valuation is not None:
query = query.filter(ProjectTable.valuation >= min_valuation)
if max_valuation is not None:
query = query.filter(ProjectTable.valuation <= max_valuation)
if location:
query = query.filter(ProjectTable.location.ilike(f"%{location}%"))
if sector:
query = query.join(ProjectTable.sector).filter(
SectorTable.name.ilike(f"%{sector}%")
)
if investor_name:
query = query.join(ProjectTable.investors).filter(
InvestorTable.name.ilike(f"%{investor_name}%")
)
if company_name:
query = query.join(ProjectTable.companies).filter(
CompanyTable.name.ilike(f"%{company_name}%")
)
projects = query.all()
# Transform to ProjectData format
project_data_list = []
for project in projects:
project_data = ProjectData(
project=project,
sector=project.sector,
investors=project.investors,
companies=project.companies,
)
project_data_list.append(project_data)
return project_data_list
# Association management routes
@router.post("/projects/{project_id}/investors/{investor_id}")
def add_investor_to_project(
project_id: int, investor_id: int, db: Session = Depends(get_db)
):
"""Add an investor to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if investor exists
investor = db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Check if association already exists
if investor in project.investors:
raise HTTPException(
status_code=400, detail="Investor already associated with project"
)
# Add association
project.investors.append(investor)
db.commit()
return {"message": "Investor added to project successfully"}
@router.delete("/projects/{project_id}/investors/{investor_id}")
def remove_investor_from_project(
project_id: int, investor_id: int, db: Session = Depends(get_db)
):
"""Remove an investor from a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if investor exists
investor = db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Check if association exists
if investor not in project.investors:
raise HTTPException(
status_code=400, detail="Investor not associated with project"
)
# Remove association
project.investors.remove(investor)
db.commit()
return {"message": "Investor removed from project successfully"}
@router.post("/projects/{project_id}/companies/{company_id}")
def add_company_to_project(
project_id: int, company_id: int, db: Session = Depends(get_db)
):
"""Add a company to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if company exists
company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not company:
raise HTTPException(status_code=404, detail="Company not found")
# Check if association already exists
if company in project.companies:
raise HTTPException(
status_code=400, detail="Company already associated with project"
)
# Add association
project.companies.append(company)
db.commit()
return {"message": "Company added to project successfully"}
@router.delete("/projects/{project_id}/companies/{company_id}")
def remove_company_from_project(
project_id: int, company_id: int, db: Session = Depends(get_db)
):
"""Remove a company from a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if company exists
company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not company:
raise HTTPException(status_code=404, detail="Company not found")
# Check if association exists
if company not in project.companies:
raise HTTPException(
status_code=400, detail="Company not associated with project"
)
# Remove association
project.companies.remove(company)
db.commit()
return {"message": "Company removed from project successfully"}
@router.post("/projects/{project_id}/sectors/{sector_id}")
def add_sector_to_project(
project_id: int, sector_id: int, db: Session = Depends(get_db)
):
"""Add a sector to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if sector exists
sector = db.query(SectorTable).filter(SectorTable.id == sector_id).first()
if not sector:
raise HTTPException(status_code=404, detail="Sector not found")
# Check if association already exists
if sector in project.sector:
raise HTTPException(
status_code=400, detail="Sector already associated with project"
)
# Add association
project.sector.append(sector)
db.commit()
return {"message": "Sector added to project successfully"}
@router.delete("/projects/{project_id}/sectors/{sector_id}")
def remove_sector_from_project(
project_id: int, sector_id: int, db: Session = Depends(get_db)
):
"""Remove a sector from a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if sector exists
sector = db.query(SectorTable).filter(SectorTable.id == sector_id).first()
if not sector:
raise HTTPException(status_code=404, detail="Sector not found")
# Check if association exists
if sector not in project.sector:
raise HTTPException(
status_code=400, detail="Sector not associated with project"
)
# Remove association
project.sector.remove(sector)
db.commit()
return {"message": "Sector removed from project successfully"}
# Bulk association management
@router.post("/projects/{project_id}/investors")
def add_multiple_investors_to_project(
project_id: int, investor_ids: List[int], db: Session = Depends(get_db)
):
"""Add multiple investors to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get all investors
investors = db.query(InvestorTable).filter(InvestorTable.id.in_(investor_ids)).all()
if len(investors) != len(investor_ids):
raise HTTPException(status_code=404, detail="One or more investors not found")
# Add associations (only if not already associated)
added_count = 0
for investor in investors:
if investor not in project.investors:
project.investors.append(investor)
added_count += 1
db.commit()
return {"message": f"Added {added_count} investors to project successfully"}
@router.post("/projects/{project_id}/companies")
def add_multiple_companies_to_project(
project_id: int, company_ids: List[int], db: Session = Depends(get_db)
):
"""Add multiple companies to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get all companies
companies = db.query(CompanyTable).filter(CompanyTable.id.in_(company_ids)).all()
if len(companies) != len(company_ids):
raise HTTPException(status_code=404, detail="One or more companies not found")
# Add associations (only if not already associated)
added_count = 0
for company in companies:
if company not in project.companies:
project.companies.append(company)
added_count += 1
db.commit()
return {"message": f"Added {added_count} companies to project successfully"}
Binary file not shown.
Binary file not shown.
+117
View File
@@ -0,0 +1,117 @@
from datetime import datetime
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
aum: int | None
check_size_lower: int | None
check_size_upper: int | None
geographic_focus: str | None
stage_focus: InvestmentStage
number_of_investments: int | None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class CompanySchema(BaseModel):
id: int
name: str
industry: str | None
location: str | None
description: Optional[str]
founded_year: Optional[int]
website: Optional[str]
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class ProjectSchema(BaseModel):
id: int
name: str
valuation: int | None
stage: InvestmentStage | None
location: str | None
description: Optional[str]
start_date: Optional[datetime]
end_date: Optional[datetime]
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class ProjectCreate(BaseModel):
name: str
valuation: Optional[int] = None
stage: Optional[InvestmentStage] = None
location: Optional[str] = None
description: Optional[str] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
class ProjectUpdate(BaseModel):
name: Optional[str] = None
valuation: Optional[int] = None
stage: Optional[InvestmentStage] = None
location: Optional[str] = None
description: Optional[str] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
class ProjectData(BaseModel):
"""Comprehensive project data schema"""
project: ProjectSchema
sector: List[SectorSchema]
investors: List[InvestorSchema]
companies: List[CompanySchema]
class Config:
from_attributes = True
class ProjectInvestorAssociation(BaseModel):
project_id: int
investor_id: int
class ProjectCompanyAssociation(BaseModel):
project_id: int
company_id: int
class ProjectSectorAssociation(BaseModel):
project_id: int
sector_id: int
+356
View File
@@ -0,0 +1,356 @@
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field, field_validator
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
"""
Expert parser: Only extract sector information if clearly identifiable.
Leave name empty if uncertain about the sector classification.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Sector ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Sector name. Leave empty string if not clearly identifiable from the data.",
)
@field_validator("name", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional id field"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class InvestorMemberSchema(BaseModel):
"""
Expert parser: Only extract team member information if clearly identifiable.
Leave fields empty if uncertain about the member details.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Member ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Team member name. Leave empty string if not clearly identifiable.",
)
role: Optional[str] = Field(
default=None,
description="Team member role/title. Leave empty string if not clearly identifiable.",
)
email: Optional[str] = Field(
default=None,
description="Team member email. Leave empty string if not clearly identifiable or not provided.",
)
investor_id: Optional[int] = Field(
default=None,
ge=0,
description="Investor ID, must be 0 or greater. Use 0 if uncertain.",
)
@field_validator("name", "role", "email", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "investor_id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class CompanyMemberSchema(BaseModel):
"""
Expert parser: Only extract company member information if clearly identifiable.
Leave fields empty if uncertain about the member details.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Member ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Company member name. Leave empty if not clearly identifiable.",
)
linkedin: Optional[str] = Field(
default=None,
description="LinkedIn profile URL. Leave empty if not provided or uncertain.",
)
role: Optional[str] = Field(
default=None,
description="Company member role/title. Leave empty if not clearly identifiable.",
)
company_id: Optional[int] = Field(
default=None,
ge=0,
description="Company ID, must be 0 or greater. Use 0 if uncertain.",
)
@field_validator("name", "linkedin", "role", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "company_id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class CompanySchema(BaseModel):
"""
Expert parser: Only extract company information if clearly identifiable.
Leave optional fields empty if uncertain. Integer values must be 0 or greater.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Company ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Company name. Leave empty string if not clearly identifiable.",
)
industry: Optional[str] = Field(
default=None,
description="Company industry/sector. Leave empty string if not clearly identifiable.",
)
location: Optional[str] = Field(
default=None,
description="Company location/address. Leave empty string if not clearly identifiable.",
)
description: Optional[str] = Field(
default=None,
description="Company description. Leave empty if not clearly available or uncertain.",
)
founded_year: Optional[int] = Field(
default=None,
ge=0,
description="Year company was founded, must be 0 or greater. Leave None if not clearly identifiable or uncertain.",
)
website: Optional[str] = Field(
default=None,
description="Company website URL. Leave empty if not provided or uncertain.",
)
@field_validator(
"name", "industry", "location", "description", "website", mode="before"
)
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "founded_year", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for founded_year"""
if v == 0:
return None
return v
@field_validator("founded_year", mode="before")
@classmethod
def validate_founded_year(cls, v):
"""Expert parser: Only accept clearly identifiable founding years"""
if v is None or v == "Not Available" or v == "" or v == "Unknown":
return None
if isinstance(v, str):
try:
year = int(v)
return year if year >= 0 else None
except ValueError:
return None
return v if isinstance(v, int) and v >= 0 else None
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
"""
Expert parser: Only extract investor information if clearly identifiable.
Leave optional fields empty if uncertain. All numeric values must be 0 or greater.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Investor ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Investor name. Do not return any special characters, Just the name as a string.",
)
description: Optional[str] = Field(
default=None,
description="Investor description. Leave empty if not clearly available or uncertain.",
)
aum: Optional[int] = Field(
default=None,
ge=0,
description="Assets Under Management in USD, must be 0 or greater. Use 0 if not clearly identifiable or uncertain.",
)
check_size_lower: Optional[int] = Field(
default=None,
ge=0,
description="Lower bound of typical investment check size in USD, must be 0 or greater. Use 0 if not clearly identifiable.",
)
check_size_upper: Optional[int] = Field(
default=None,
ge=0,
description="Upper bound of typical investment check size in USD, must be 0 or greater. Use 0 if not clearly identifiable.",
)
geographic_focus: Optional[str] = Field(
default=None,
description="Geographic investment focus. Do not return any special characters, Just locations separated by commas. Leave empty if not clearly identifiable.",
)
stage_focus: InvestmentStage = Field(
default=InvestmentStage.SEED,
description="Investment stage focus. Use SEED as default if uncertain.",
)
number_of_investments: Optional[int] = Field(
default=None,
ge=0,
description="Total number of investments made, must be 0 or greater. Use 0 if not clearly identifiable.",
)
@field_validator("name", "description", "geographic_focus", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator(
"id",
"aum",
"check_size_lower",
"check_size_upper",
"number_of_investments",
mode="before",
)
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class InvestorData(BaseModel):
"""
Expert parser: Comprehensive investor data schema for LLM processing.
Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
"""
investor: InvestorSchema = Field(
description="Core investor information. Only populate with clearly identifiable data."
)
portfolio_companies: List[CompanySchema] = Field(
default=[],
description="List of portfolio companies. Leave empty if not clearly identifiable.",
)
team_members: List[InvestorMemberSchema] = Field(
default=[],
description="List of team members. Leave empty if not clearly identifiable.",
)
sectors: List[SectorSchema] = Field(
default=[],
description="List of investment sectors. Leave empty if not clearly identifiable.",
)
class Config:
from_attributes = True
class CompanyData(BaseModel):
"""
Expert parser: Comprehensive company data schema for LLM processing.
Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
"""
company: CompanySchema = Field(
description="Core company information. Only populate with clearly identifiable data."
)
sectors: List[SectorSchema] = Field(
default=[],
description="List of company sectors. Leave empty if not clearly identifiable.",
)
members: List[CompanyMemberSchema] = Field(
default=[],
description="List of company members. Leave empty if not clearly identifiable.",
)
investors: List[InvestorSchema] = Field(
default=[],
description="List of investors. Leave empty if not clearly identifiable.",
)
class Config:
from_attributes = True
class InvestorList(BaseModel):
"""Expert parser: List of investors with clearly identifiable information only."""
investors: List[InvestorData] = Field(
default=[],
description="List of investors. Leave empty if no clearly identifiable investors.",
)
+101
View File
@@ -0,0 +1,101 @@
from datetime import datetime
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class InvestorMemberSchema(BaseModel):
id: int
name: str
role: str | None
email: str | None
class Config:
from_attributes = True
class CompanyMemberSchema(BaseModel):
id: int
name: Optional[str]
linkedin: Optional[str]
role: Optional[str]
company_id: int
class Config:
from_attributes = True
class CompanySchema(BaseModel):
id: int
name: str
industry: str | None
location: str | None
description: Optional[str]
founded_year: Optional[int]
website: Optional[str]
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
aum: int | None
check_size_lower: int | None
check_size_upper: int | None
geographic_focus: str | None
stage_focus: InvestmentStage
number_of_investments: int | None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class InvestorData(BaseModel):
"""Comprehensive investor data schema for LLM processing"""
investor: InvestorSchema
portfolio_companies: List[CompanySchema]
team_members: List[InvestorMemberSchema]
sectors: List[SectorSchema]
class Config:
from_attributes = True
class CompanyData(BaseModel): # Renamed from CompaniesData for consistency
company: CompanySchema
sectors: List[SectorSchema]
members: List[CompanyMemberSchema]
investors: List[InvestorSchema]
class Config:
from_attributes = True
class InvestorList(BaseModel):
investors: List[InvestorData]
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+305 -336
View File
@@ -1,368 +1,337 @@
import json
import logging
import asyncio
import os
from typing import Any, Dict, Optional
from typing import Optional
import chromadb
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI
from db import get_session, init_database
from schema import CSVRow, Investor
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from db.db import get_db_session
from db.models import (
CompanyMember,
CompanyTable,
InvestorMember,
InvestorTable,
SectorTable,
)
from langchain_openai import ChatOpenAI
from schemas.py_schemas import CompanyData, InvestorData
from sqlalchemy.orm import Session
class LLMInvestorParser:
class InvestorProcessor:
def __init__(self):
# Initialize OpenAI client
self.openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Initialize ChromaDB
self.chroma_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.chroma_client.get_or_create_collection(
name="investor_descriptions",
metadata={
"description": "Investor descriptions and investment thesis focus"
},
)
# Initialize database
init_database()
def parse_json_field(self, json_str: str) -> Dict[str, Any]:
"""Safely parse JSON string with LLM assistance if needed"""
if not json_str or json_str.strip() == "":
return {}
try:
# Try direct JSON parsing first
return json.loads(json_str)
except json.JSONDecodeError:
# If direct parsing fails, use LLM to clean and parse
logger.info("Direct JSON parsing failed, using LLM to clean JSON")
return self._llm_clean_json(json_str)
def _llm_clean_json(self, malformed_json: str) -> Dict[str, Any]:
"""Use LLM to clean and parse malformed JSON"""
try:
prompt = f"""
The following text appears to be malformed JSON. Please clean it up and return valid JSON.
If it's not possible to create valid JSON, return an empty object {{}}.
Original text:
{malformed_json[:2000]} # Limit length for API
Return only the cleaned JSON, no explanations:
"""
response = self.openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
self.llm = ChatOpenAI(
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url="https://openrouter.ai/api/v1",
model="openai/gpt-4o-mini",
temperature=0,
)
cleaned_json = response.choices[0].message.content.strip()
return json.loads(cleaned_json)
self.investor_structured_llm = self.llm.with_structured_output(InvestorData)
self.company_structured_llm = self.llm.with_structured_output(CompanyData)
except Exception as e:
logger.error(f"LLM JSON cleaning failed: {e}")
return {}
def _get_or_create_sector(self, db: Session, sector_name: str) -> SectorTable:
"""Get existing sector or create new one"""
sector = db.query(SectorTable).filter(SectorTable.name == sector_name).first()
if not sector:
sector = SectorTable(name=sector_name)
db.add(sector)
db.flush() # Get the ID without committing
return sector
def extract_structured_data(self, csv_row: CSVRow) -> Dict[str, Any]:
"""Extract and structure data from CSV row using LLM"""
# Parse the investment firm profile
profile_data = {}
if csv_row.investment_firm_profile:
profile_data = self.parse_json_field(csv_row.investment_firm_profile)
# Create structured output
structured_data = {
"name": csv_row.name,
"website": csv_row.website or profile_data.get("websiteURL"),
"investor_description": profile_data.get("investorDescription", ""),
"investment_thesis_focus": profile_data.get("investmentThesisFocus", []),
"headquarters": profile_data.get("headquarters", ""),
"aum_info": profile_data.get("overallAssetsUnderManagement", {}),
"funds_info": profile_data.get("funds", []),
"crunchbase_urls": csv_row.crunchbase_linkedin_urls or "",
"crunchbase_extract": csv_row.crunchbase_firm_extract or "",
"linkedin_profile": csv_row.linkedin_investment_profile or "",
"source_truth_profile": csv_row.source_of_truth_profile or "",
}
return structured_data
def enhance_with_llm(self, investor_data: Dict[str, Any]) -> Dict[str, Any]:
"""Use LLM to enhance and standardize investor data"""
try:
# Combine all available text for context
context_text = " ".join(
[
investor_data.get("investor_description", ""),
investor_data.get("crunchbase_extract", ""),
investor_data.get("linkedin_profile", ""),
investor_data.get("source_truth_profile", ""),
]
def _save_investor_to_db(
self, db: Session, investor_data: InvestorData
) -> InvestorTable:
"""Save investor data to database"""
# Create investor record
investor = InvestorTable(
name=investor_data.investor.name,
description=investor_data.investor.description,
aum=investor_data.investor.aum,
check_size_lower=investor_data.investor.check_size_lower,
check_size_upper=investor_data.investor.check_size_upper,
geographic_focus=investor_data.investor.geographic_focus,
stage_focus=investor_data.investor.stage_focus,
number_of_investments=investor_data.investor.number_of_investments,
)
db.add(investor)
db.flush() # Get the ID
if not context_text.strip():
return investor_data
prompt = f"""
Based on the following information about an investor, please extract and standardize:
1. A concise investor description (2-3 sentences)
2. Investment thesis focus areas (list of specific focus areas)
3. Headquarters location (city, country format)
Investor: {investor_data["name"]}
Context: {context_text[:3000]} # Limit for API
Return in JSON format:
{{
"enhanced_description": "concise description here",
"standardized_focus": ["focus area 1", "focus area 2", ...],
"standardized_headquarters": "City, Country"
}}
"""
response = self.openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
# Add team members
for member_data in investor_data.team_members:
member = InvestorMember(
name=member_data.name,
role=member_data.role,
email=member_data.email,
investor_id=investor.id,
)
db.add(member)
enhanced_data = json.loads(response.choices[0].message.content)
# Add sectors
for sector_data in investor_data.sectors:
sector = self._get_or_create_sector(db, sector_data.name)
investor.sectors.append(sector)
# Update investor data with enhanced information
if enhanced_data.get("enhanced_description"):
investor_data["enhanced_description"] = enhanced_data[
"enhanced_description"
]
# Add portfolio companies
for company_schema in investor_data.portfolio_companies:
# Convert CompanySchema to CompanyData format
company_data = CompanyData(
company=company_schema,
sectors=[], # Will be empty for portfolio companies
members=[], # Will be empty for portfolio companies
investors=[], # Will be empty for portfolio companies
)
company = self._save_company_to_db(db, company_data, skip_investors=True)
investor.portfolio_companies.append(company)
if enhanced_data.get("standardized_focus"):
investor_data["standardized_focus"] = enhanced_data[
"standardized_focus"
]
return investor
if enhanced_data.get("standardized_headquarters"):
investor_data["standardized_headquarters"] = enhanced_data[
"standardized_headquarters"
]
return investor_data
except Exception as e:
logger.error(f"LLM enhancement failed for {investor_data['name']}: {e}")
return investor_data
def save_to_sql(self, investor_data: Dict[str, Any]) -> int:
"""Save investor data to SQL database"""
try:
with get_session() as session:
# Check if investor already exists
existing = (
session.query(Investor)
.filter_by(name=investor_data["name"])
def _save_company_to_db(
self, db: Session, company_data: CompanyData, skip_investors: bool = False
) -> CompanyTable:
"""Save company data to database"""
# Check if company already exists
existing_company = (
db.query(CompanyTable)
.filter(CompanyTable.name == company_data.company.name)
.first()
)
if existing_company:
return existing_company
if existing:
logger.info(f"Updating existing investor: {investor_data['name']}")
investor = existing
# Create company record
company = CompanyTable(
name=company_data.company.name,
industry=company_data.company.industry,
location=company_data.company.location,
description=company_data.company.description,
founded_year=company_data.company.founded_year,
website=company_data.company.website,
)
db.add(company)
db.flush() # Get the ID
# Add company members
for member_data in company_data.members:
if member_data.name: # Only add members with names
member = CompanyMember(
name=member_data.name,
linkedin=member_data.linkedin,
role=member_data.role,
company_id=company.id,
)
db.add(member)
# Add sectors
for sector_data in company_data.sectors:
sector = self._get_or_create_sector(db, sector_data.name)
company.sectors.append(sector)
# Add investors (if not skipping to avoid circular references)
if not skip_investors:
for investor_data in company_data.investors:
# Look for existing investor by name
existing_investor = (
db.query(InvestorTable)
.filter(InvestorTable.name == investor_data.name)
.first()
)
if existing_investor:
company.investors.append(existing_investor)
return company
async def _process_row(
self, row: pd.Series, row_idx: int, is_investor: bool = True
) -> Optional[InvestorData | CompanyData]:
"""Process a single row of data"""
# Clean values to remove control characters
cleaned_row = {}
for key, value in row.items():
if pd.notna(value):
# Convert to string and clean control characters
clean_value = (
str(value).replace("\n", " ").replace("\r", " ").replace("\t", " ")
)
# Remove other control characters
clean_value = "".join(
char
for char in clean_value
if ord(char) >= 32 or char in ["\n", "\r", "\t"]
)
cleaned_row[key] = clean_value
row_str = ", ".join([f"{key}: {value}" for key, value in cleaned_row.items()])
try:
print(f"Processing row {row_idx + 1}...")
if is_investor:
result = await self.investor_structured_llm.ainvoke(row_str)
else:
logger.info(f"Creating new investor: {investor_data['name']}")
investor = Investor()
# Map data to investor object
investor.name = investor_data["name"]
investor.website = investor_data.get("website")
investor.investor_description = investor_data.get(
"enhanced_description"
) or investor_data.get("investor_description")
investor.investment_thesis_focus = investor_data.get(
"standardized_focus"
) or investor_data.get("investment_thesis_focus")
investor.headquarters = investor_data.get(
"standardized_headquarters"
) or investor_data.get("headquarters")
# AUM information
aum_info = investor_data.get("aum_info", {})
investor.aum_amount = aum_info.get("aumAmount")
investor.aum_as_of_date = aum_info.get("asOfDate")
investor.aum_source_url = aum_info.get("sourceUrl")
# Fund information
investor.funds_info = investor_data.get("funds_info", [])
# Raw data
investor.crunchbase_urls = investor_data.get("crunchbase_urls")
investor.crunchbase_extract = investor_data.get("crunchbase_extract")
investor.linkedin_profile = investor_data.get("linkedin_profile")
investor.source_truth_profile = investor_data.get(
"source_truth_profile"
)
if not existing:
session.add(investor)
session.flush() # Get the ID
return investor.id
result = await self.company_structured_llm.ainvoke(row_str)
if result:
return result.model_dump()
return None
except Exception as e:
logger.error(f"Failed to save to SQL: {e}")
raise
def save_to_vector_db(self, investor_id: int, investor_data: Dict[str, Any]):
"""Save investor description and focus to ChromaDB"""
try:
# Prepare text for embedding
description_text = investor_data.get(
"enhanced_description"
) or investor_data.get("investor_description", "")
focus_areas = investor_data.get("standardized_focus") or investor_data.get(
"investment_thesis_focus", []
)
if isinstance(focus_areas, list):
focus_text = " ".join(focus_areas)
else:
focus_text = str(focus_areas)
# Combine description and focus for embedding
combined_text = f"{description_text} {focus_text}".strip()
if not combined_text:
logger.warning(f"No text to embed for investor {investor_data['name']}")
return
# Create metadata
metadata = {
"investor_id": investor_id,
"name": investor_data["name"],
"website": investor_data.get("website", ""),
"headquarters": investor_data.get("standardized_headquarters")
or investor_data.get("headquarters", ""),
"focus_areas_count": len(focus_areas)
if isinstance(focus_areas, list)
else 0,
}
# Add to ChromaDB
self.collection.add(
documents=[combined_text],
metadatas=[metadata],
ids=[f"investor_{investor_id}"],
)
logger.info(f"Added investor {investor_data['name']} to vector database")
except Exception as e:
logger.error(f"Failed to save to vector DB: {e}")
def process_csv_file(self, csv_file_path: str, limit: Optional[int] = None):
"""Process the entire CSV file"""
logger.info(f"Starting to process CSV file: {csv_file_path}")
# Read CSV
df = pd.read_csv(csv_file_path)
logger.info(f"Loaded {len(df)} rows from CSV")
if limit:
df = df.head(limit)
logger.info(f"Processing limited to {limit} rows")
processed_count = 0
error_count = 0
for index, row in df.iterrows():
try:
logger.info(f"Processing row {index + 1}/{len(df)}: {row['Name']}")
# Create CSVRow object
csv_row = CSVRow(
name=row["Name"],
website=row.get("Website"),
investment_firm_profile=row.get("Investment Firm Profile"),
crunchbase_linkedin_urls=row.get("Crunchbase & LinkedIn URLs"),
crunchbase_firm_extract=row.get("Crunchbase Firm Extract"),
linkedin_investment_profile=row.get("LinkedIn Investment Profile"),
source_of_truth_profile=row.get("Source of Truth Profile"),
)
# Extract structured data
structured_data = self.extract_structured_data(csv_row)
# Enhance with LLM
enhanced_data = self.enhance_with_llm(structured_data)
# Save to SQL database
investor_id = self.save_to_sql(enhanced_data)
# Save to vector database
self.save_to_vector_db(investor_id, enhanced_data)
processed_count += 1
# Progress update every 10 rows
if (index + 1) % 10 == 0:
logger.info(
f"Processed {processed_count} rows successfully, {error_count} errors"
)
except Exception as e:
error_count += 1
logger.error(
f"Error processing row {index + 1} ({row.get('Name', 'Unknown')}): {e}"
)
continue
logger.info(
f"Processing complete! Processed: {processed_count}, Errors: {error_count}"
)
return processed_count, error_count
def search_investors(self, query: str, limit: int = 5):
"""Search investors using vector similarity"""
try:
results = self.collection.query(query_texts=[query], n_results=limit)
return results
except Exception as e:
logger.error(f"Search failed: {e}")
print(f"Error processing row {row_idx + 1}: {e}")
return None
async def parse_investors(self, df, save_to_db: bool = True):
"""Parse investors from DataFrame and optionally save to database"""
investors = []
df = df[20:]
db = None
if save_to_db:
db = get_db_session()
def main():
"""Main function to run the parser"""
parser = LLMInvestorParser()
try:
# Process rows in batches asynchronously
batch_size = 20 # Adjust batch size as needed
rows = [(idx, row) for idx, row in df.iterrows()]
# Process the CSV file
csv_file = "/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/New Excerpt 5 investors - Sheet1 parse.csv"
for i in range(0, len(rows), batch_size):
batch = rows[i : i + batch_size]
# Start with a small sample for testing
processed, errors = parser.process_csv_file(csv_file, limit=5)
# Process batch asynchronously
tasks = [
self._process_row(row, idx, is_investor=True) for idx, row in batch
]
print("\nProcessing complete!")
print(f"Successfully processed: {processed} investors")
print(f"Errors encountered: {errors}")
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Test search functionality
print("\nTesting search functionality...")
results = parser.search_investors("bioeconomy circular economy")
if results:
print(f"Found {len(results['documents'][0])} similar investors")
for i, doc in enumerate(results["documents"][0]):
print(f" {i + 1}. {results['metadatas'][0][i]['name']}")
# Handle results from batch
for (idx, row), result in zip(batch, batch_results):
if isinstance(result, Exception):
print(f"Error processing row {idx}: {result}")
if db:
db.rollback()
continue
if result:
# Convert dict to InvestorData if needed
if isinstance(result, dict):
investor_data = InvestorData(**result)
else:
investor_data = result
investors.append(investor_data)
# Save to database if requested
if save_to_db and db:
try:
saved_investor = self._save_investor_to_db(
db, investor_data
)
db.commit()
print(
f"✅ Saved investor '{saved_investor.name}' to database"
)
except Exception as e:
db.rollback()
print(f"❌ Failed to save investor to database: {e}")
print(
f"Completed batch {i // batch_size + 1} of {(len(rows) + batch_size - 1) // batch_size}"
)
except Exception as e:
print(f"Error in batch processing: {e}")
if db:
db.rollback()
finally:
if db:
db.close()
return investors
async def parse_companies(self, df, save_to_db: bool = True):
"""Parse companies from DataFrame and optionally save to database"""
companies = []
df = df[20:]
db = None
if save_to_db:
db = get_db_session()
try:
# Process rows in batches asynchronously
batch_size = 20 # Adjust batch size as needed
rows = [(idx, row) for idx, row in df.iterrows()]
for i in range(0, len(rows), batch_size):
batch = rows[i : i + batch_size]
# Process batch asynchronously
tasks = [
self._process_row(row, idx, is_investor=False) for idx, row in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle results from batch
for (idx, row), result in zip(batch, batch_results):
if isinstance(result, Exception):
print(f"Error processing row {idx}: {result}")
if db:
db.rollback()
continue
if result:
# Convert dict to CompanyData if needed
if isinstance(result, dict):
company_data = CompanyData(**result)
else:
company_data = result
companies.append(company_data)
# Save to database if requested
if save_to_db and db:
try:
saved_company = self._save_company_to_db(
db, company_data
)
db.commit()
print(
f"✅ Saved company '{saved_company.name}' to database"
)
except Exception as e:
db.rollback()
print(f"❌ Failed to save company to database: {e}")
print(
f"Completed batch {i // batch_size + 1} of {(len(rows) + batch_size - 1) // batch_size}"
)
except Exception as e:
print(f"Error processing row {idx}: {e}")
if db:
db.rollback()
finally:
if db:
db.close()
return companies
if __name__ == "__main__":
main()
# async def main():
# """Main execution function"""
# # Initialize database tables
# print("🔧 Initializing database...")
# init_database()
# # Create processor
# processor = InvestorProcessor()
# print("📊 Processing companies...")
# companies = await processor.parse_companies(
# "data/19 Companies data.csv", save_to_db=True
# )
# print(f"Processed {len(companies)} companies")
# print("\n💰 Processing investors...")
# investors = await processor.parse_investors(
# "data/19 Investors data.csv", save_to_db=True
# )
# print(f"Processed {len(investors)} investors")
# print("\n✨ Processing complete!")
# if __name__ == "__main__":
# asyncio.run(main())
-293
View File
@@ -1,293 +0,0 @@
import asyncio
from typing import List, Optional
import chromadb
import pandas as pd
from db.models import CompanyTable, InvestorTable, InvestorTeamMember, SectorTable
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from py_schemas import InvestorData
from pydantic import BaseModel
from settings import settings
class InvestorList(BaseModel):
"""Schema for LLM structured output"""
investor_list: List[InvestorData]
class InvestorProcessor:
def __init__(
self,
sql_session: Optional[object] = None,
vector_db_client: Optional[object] = None,
):
self.template = """You are an expert data extraction assistant. Extract investor information from the provided CSV data and return it as a list of structured records.
Given the following CSV data rows:
{question}
For each row, extract and structure the following fields for the investor:
- name: The investor's full name
- description: Description of the investor
- aum: Assets under management (as integer, use 0 if not available)
- check_size_lower: Lower bound of investment check size (as integer)
- check_size_upper: Upper bound of investment check size (as integer)
- geographic_focus: Geographic region focus
- stage_focus: Investment stage focus (must be one of: seed, series_a, series_b, series_c, growth, late_stage)
- number_of_investments: Number of investments made (default 0)
Also extract related data:
- portfolio_companies: List of companies they've invested in
- team_members: List of team members with name, role, email
- sectors: List of sectors they focus on
Important:
- If a field is not available, use appropriate defaults
- stage_focus must be one of the valid enum values
- Return clean, valid JSON only
Return the data as a structured list of comprehensive investor data."""
self.prompt = PromptTemplate(
template=self.template, input_variables=["question"]
)
self.llm = ChatOpenAI(
api_key=settings.OPENROUTER_API_KEY,
base_url="https://openrouter.ai/api/v1",
model="google/gemini-2.5-flash-lite",
temperature=0,
)
self.structured_llm = self.llm.with_structured_output(InvestorList)
self.sql_session = sql_session
self.vector_db_client = vector_db_client
self.vector_db_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.vector_db_client.get_or_create_collection(
name="investor_descriptions",
metadata={
"description": "Investor descriptions and investment thesis focus"
},
)
async def _process_batch(
self, batch: pd.DataFrame, batch_idx: int
) -> List[InvestorData]:
"""Process a single batch of data"""
# Convert batch to string representation - clean the data
batch_str = ""
for idx, row in batch.iterrows():
# Clean values to remove control characters
cleaned_row = {}
for key, value in row.items():
if pd.notna(value):
# Convert to string and clean control characters
clean_value = (
str(value)
.replace("\n", " ")
.replace("\r", " ")
.replace("\t", " ")
)
# Remove other control characters
clean_value = "".join(
char
for char in clean_value
if ord(char) >= 32 or char in ["\n", "\r", "\t"]
)
cleaned_row[key] = clean_value
row_str = ", ".join(
[f"{key}: {value}" for key, value in cleaned_row.items()]
)
batch_str += f"Row {idx + 1}: {row_str}\n"
try:
print(f"Processing batch {batch_idx + 1}...")
batch_results = await self.structured_llm.ainvoke(batch_str)
return batch_results.investor_list
except Exception as e:
print(f"Error processing batch {batch_idx + 1}: {e}")
return []
async def _save_to_sql(self, investor_data_list: List[InvestorData]) -> None:
"""Save investors and related data to SQL database"""
if not self.sql_session:
return
try:
for investor_data in investor_data_list:
# Save investor
db_investor = InvestorTable(
name=investor_data.investor.name,
description=investor_data.investor.description,
aum=investor_data.investor.aum,
check_size_lower=investor_data.investor.check_size_lower,
check_size_upper=investor_data.investor.check_size_upper,
geographic_focus=investor_data.investor.geographic_focus,
stage_focus=investor_data.investor.stage_focus,
number_of_investments=investor_data.investor.number_of_investments,
)
self.sql_session.add(db_investor)
self.sql_session.flush() # Get the ID
# Save sectors and create associations
for sector_data in investor_data.sectors:
# Check if sector exists, create if not
existing_sector = (
self.sql_session.query(SectorTable)
.filter(SectorTable.name == sector_data.name)
.first()
)
if not existing_sector:
db_sector = SectorTable(name=sector_data.name)
self.sql_session.add(db_sector)
self.sql_session.flush()
# Add sector to investor's sectors
db_investor.sectors.append(db_sector)
else:
# Add existing sector to investor if not already there
if existing_sector not in db_investor.sectors:
db_investor.sectors.append(existing_sector)
# Save companies and create portfolio associations
for company_data in investor_data.portfolio_companies:
# Check if company exists, create if not
existing_company = (
self.sql_session.query(CompanyTable)
.filter(CompanyTable.name == company_data.name)
.first()
)
if not existing_company:
db_company = CompanyTable(
name=company_data.name,
industry=company_data.industry,
location=company_data.location,
founded_year=company_data.founded_year,
website=company_data.website,
)
self.sql_session.add(db_company)
self.sql_session.flush()
# Add to investor's portfolio
db_investor.portfolio_companies.append(db_company)
else:
# Add existing company to portfolio if not already there
if existing_company not in db_investor.portfolio_companies:
db_investor.portfolio_companies.append(existing_company)
# Save team members
for team_member_data in investor_data.team_members:
# Check if team member exists
existing_member = (
self.sql_session.query(InvestorTeamMember)
.filter(InvestorTeamMember.email == team_member_data.email)
.first()
)
if not existing_member:
db_team_member = InvestorTeamMember(
name=team_member_data.name,
role=team_member_data.role,
email=team_member_data.email,
investor_id=db_investor.id,
)
self.sql_session.add(db_team_member)
self.sql_session.commit()
print(f"Successfully saved {len(investor_data_list)} investors to database")
except Exception as e:
self.sql_session.rollback()
print(f"Error saving to SQL database: {e}")
raise
async def _save_to_vector_db(self, investor_data_list: List[InvestorData]) -> None:
"""Save investors to vector database"""
if not self.vector_db_client:
return
documents = []
metadatas = []
ids = []
for i, investor_data in enumerate(investor_data_list):
investor = investor_data.investor
sectors = ", ".join([s.name for s in investor_data.sectors])
companies = ", ".join([c.name for c in investor_data.portfolio_companies])
doc_text = f"""
Investor: {investor.name}
Description: {investor.description or "N/A"}
AUM: ${investor.aum:,}
Check Size: ${investor.check_size_lower:,} - ${investor.check_size_upper:,}
Geographic Focus: {investor.geographic_focus}
Stage Focus: {investor.stage_focus.value}
Sectors: {sectors}
Portfolio Companies: {companies}
""".strip()
documents.append(doc_text)
metadatas.append(
{
"name": investor.name,
"stage_focus": investor.stage_focus.value,
"geographic_focus": investor.geographic_focus,
"aum": investor.aum,
}
)
ids.append(
f"investor_{i}_{investor.name.replace(' ', '_').replace('/', '_')}"
)
if documents:
try:
self.collection.add(documents=documents, metadatas=metadatas, ids=ids)
print(
f"Successfully saved {len(documents)} investors to vector database"
)
except Exception as e:
print(f"Error saving to vector database: {e}")
async def process_csv(
self, df: pd.DataFrame, batch_size: int = 10, max_concurrent: int = 10
) -> List[InvestorData]:
"""Process CSV data in parallel batches and save to databases"""
results = []
# Create batches
batches = []
for i in range(0, len(df), batch_size):
batch = df.iloc[i : i + batch_size]
batches.append((batch, i // batch_size))
# Process batches with concurrency control
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(batch_data):
batch, batch_idx = batch_data
async with semaphore:
return await self._process_batch(batch, batch_idx)
# Execute all batches concurrently
batch_results = await asyncio.gather(
*[process_with_semaphore(batch_data) for batch_data in batches],
return_exceptions=True,
)
# Collect results, filtering out exceptions
for batch_result in batch_results:
if not isinstance(batch_result, Exception):
results.extend(batch_result)
# Save to databases
if results:
print(f"Successfully processed {len(results)} investors")
await self._save_to_sql(results)
await self._save_to_vector_db(results)
return results
+94 -59
View File
@@ -1,83 +1,118 @@
from typing import Optional
import os
from typing import List
import chromadb
from db.db import DATABASE_URL, get_db
from db.models import InvestorTable
from langchain import hub
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from py_schemas import InvestorList
from settings import settings
from schemas.py_schemas import InvestorData, InvestorList
from sqlalchemy.orm import selectinload
# Connect to SQLite
prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")
db = SQLDatabase.from_uri("sqlite:///investors.db")
system_message = (
prompt_template.format(dialect="SQLite", top_k=5)
+ "\n Get answers from the Sql database and the vector database"
)
db = SQLDatabase.from_uri(DATABASE_URL)
class QueryProcessor:
def __init__(
self,
sql_session: Optional[object] = None,
vector_db_client: Optional[object] = None,
):
def __init__(self):
self.llm = ChatOpenAI(
api_key=settings.OPENROUTER_API_KEY,
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url="https://openrouter.ai/api/v1",
model="google/gemini-2.5-flash-lite",
temperature=0.3,
model="openai/gpt-4o-mini",
temperature=0,
)
self.toolkit = SQLDatabaseToolkit(db=db, llm=self.llm)
# Update system message to specifically request only investor IDs
system_message_updated = (
prompt_template.format(dialect="SQLite", top_k=5)
+ "\n\nIMPORTANT: You must ONLY return the investor IDs (id field) that match the user's criteria. "
+ "Do NOT return any other information, explanations, or data. "
+ "Your response should be ONLY a comma-separated list of numbers representing the investor IDs. "
+ "Example format: 1, 5, 12, 23"
)
self.agent = create_react_agent(
model=self.llm,
tools=self.toolkit.get_tools() + [self.query_vector_database],
prompt=system_message,
response_format=InvestorList,
tools=self.toolkit.get_tools(),
prompt=system_message_updated,
)
self.vector_db_client = vector_db_client
self.vector_db_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.vector_db_client.get_or_create_collection(
name="investor_descriptions",
metadata={
"description": "Investor descriptions and investment thesis focus"
},
)
def query_sql_database(self, query: str) -> Optional[InvestorList]:
"""Query the SQL database for investor information."""
if not self.sql_session:
return None
# Implement SQL querying logic here
result = self.sql_session.execute(query)
investors = result.scalars().all()
return InvestorList(investors=investors)
def query_vector_database(self, query: str) -> Optional[InvestorList]:
"""Query the vector database for investor information."""
if not self.vector_db_client:
return None
print("VECTOR STORE WAS CALLED")
# Query the collection directly, not passing collection as parameter
results = self.collection.query(
query_texts=[query], # ChromaDB expects a list of query texts
n_results=3, # Specify how many results you want
)
print(results)
# ChromaDB returns results in a different structure
# results will have 'documents', 'metadatas', 'ids', 'distances'
return results
def process_query(self, question: str) -> InvestorList:
"""Process a query using the LLM and return structured investor data."""
"""Process a query using the LLM and return investor data."""
# Let the LLM handle all database interactions and filtering to get IDs
response = self.agent.invoke(
{"messages": [("user", question)]},
)
return response
# Extract the actual message content
ai_response = (
response["messages"][-1].content if response.get("messages") else ""
)
# Extract investor IDs from the AI response
investor_ids = self._extract_investor_ids_from_response(ai_response)
# Fetch full investor data using the IDs
return self._fetch_investors_by_ids(investor_ids)
def _extract_investor_ids_from_response(self, ai_response: str) -> List[int]:
"""Extract investor IDs from AI response."""
import re
investor_ids = []
try:
# Try multiple patterns to extract IDs from the response
# Pattern 1: Simple numbers (assuming they are IDs)
numbers = re.findall(r"\b\d+\b", ai_response)
investor_ids = [int(num) for num in numbers]
# Pattern 2: If response contains explicit ID references
id_matches = re.findall(r"\bid[:\s]*(\d+)", ai_response.lower())
if id_matches:
investor_ids = [int(id_str) for id_str in id_matches]
except Exception as e:
print(f"Error extracting IDs from response: {e}")
return []
return investor_ids
def _fetch_investors_by_ids(self, investor_ids: List[int]) -> InvestorList:
"""Fetch investors with all their relationships from the database using IDs."""
if not investor_ids:
return InvestorList(investors=[])
# Get database session
db_session = next(get_db())
try:
# Build query with all relationships loaded
query = (
db_session.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id.in_(investor_ids))
)
investors = query.all()
# Transform to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
return InvestorList(investors=investor_data_list)
finally:
db_session.close()
-11
View File
@@ -1,11 +0,0 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
OPENROUTER_API_KEY: str
class Config:
env_file = ".env"
settings = Settings()
+139 -16
View File
@@ -1,16 +1,139 @@
# Core dependencies
pandas>=2.0.0
sqlalchemy>=2.0.0
pydantic>=2.0.0
# Vector database
chromadb>=0.4.0
# LLM integration
openai>=1.0.0
# Environment management
python-dotenv>=1.0.0
# Additional dependencies for data processing
typing-extensions>=4.0.0
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiosignal==1.4.0
annotated-types==0.7.0
anyio==4.10.0
attrs==25.3.0
backoff==2.2.1
bcrypt==4.3.0
build==1.3.0
cachetools==5.5.2
certifi==2025.8.3
charset-normalizer==3.4.3
chromadb==1.0.20
click==8.2.1
coloredlogs==15.0.1
dataclasses-json==0.6.7
distro==1.9.0
dnspython==2.7.0
durationpy==0.10
email-validator==2.3.0
fastapi==0.116.1
fastapi-cli==0.0.8
fastapi-cloud-cli==0.1.5
filelock==3.19.1
flatbuffers==25.2.10
frozenlist==1.7.0
fsspec==2025.7.0
google-auth==2.40.3
googleapis-common-protos==1.70.0
greenlet==3.2.4
grpcio==1.74.0
h11==0.16.0
hf-xet==1.1.8
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
httpx-sse==0.4.1
huggingface-hub==0.34.4
humanfriendly==10.0
idna==3.10
importlib-metadata==8.7.0
importlib-resources==6.5.2
itsdangerous==2.2.0
jinja2==3.1.6
jiter==0.10.0
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.25.1
jsonschema-specifications==2025.4.1
kubernetes==33.1.0
langchain==0.3.27
langchain-community==0.3.29
langchain-core==0.3.75
langchain-openai==0.3.32
langchain-text-splitters==0.3.10
langgraph==0.6.6
langgraph-checkpoint==2.1.1
langgraph-prebuilt==0.6.4
langgraph-sdk==0.2.4
langsmith==0.4.20
markdown-it-py==4.0.0
markupsafe==3.0.2
marshmallow==3.26.1
mdurl==0.1.2
mmh3==5.2.0
mpmath==1.3.0
multidict==6.6.4
mypy-extensions==1.1.0
numpy==2.3.2
oauthlib==3.3.1
onnxruntime==1.22.1
openai==1.102.0
opentelemetry-api==1.36.0
opentelemetry-exporter-otlp-proto-common==1.36.0
opentelemetry-exporter-otlp-proto-grpc==1.36.0
opentelemetry-proto==1.36.0
opentelemetry-sdk==1.36.0
opentelemetry-semantic-conventions==0.57b0
orjson==3.11.3
ormsgpack==1.10.0
overrides==7.7.0
packaging==25.0
pandas==2.3.2
pip==25.2
posthog==5.4.0
propcache==0.3.2
protobuf==6.32.0
pyasn1==0.6.1
pyasn1-modules==0.4.2
pybase64==1.4.2
pydantic==2.11.7
pydantic-core==2.33.2
pydantic-extra-types==2.10.5
pydantic-settings==2.10.1
pygments==2.19.2
pypika==0.48.9
pyproject-hooks==1.2.0
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
python-multipart==0.0.20
pytz==2025.2
pyyaml==6.0.2
referencing==0.36.2
regex==2025.7.34
requests==2.32.5
requests-oauthlib==2.0.0
requests-toolbelt==1.0.0
rich==14.1.0
rich-toolkit==0.15.0
rignore==0.6.4
rpds-py==0.27.1
rsa==4.9.1
sentry-sdk==2.35.1
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
sqlalchemy==2.0.43
starlette==0.47.3
sympy==1.14.0
tenacity==9.1.2
tiktoken==0.11.0
tokenizers==0.21.4
tqdm==4.67.1
typer==0.16.1
typing-extensions==4.15.0
typing-inspect==0.9.0
typing-inspection==0.4.1
tzdata==2025.2
ujson==5.11.0
urllib3==2.5.0
uvicorn==0.35.0
uvloop==0.21.0
watchfiles==1.1.0
websocket-client==1.8.0
websockets==15.0.1
xxhash==3.5.0
yarl==1.20.1
zipp==3.23.0
zstandard==0.24.0