22 Commits

Author SHA1 Message Date
bolade cefe89bb67 feat: Update query endpoint to return paginated investment responses with fund details 2025-10-08 14:19:36 +01:00
bolade 58722f1102 feat: Enhance investor and company parsing with asynchronous batch processing 2025-10-08 13:29:25 +01:00
bolade be6fde9ba2 feat: Simplify company profile processing to only extract founded_year and key_executives 2025-10-08 13:20:08 +01:00
bolade 37e1ad01c4 feat: Update investor and fund schemas for streamlined investment responses 2025-10-08 11:48:26 +01:00
bolade faf92a3b47 feat: Implement pagination for companies, investors, and projects endpoints 2025-10-08 10:25:52 +01:00
bolade 26a1197db0 Refactor code structure for improved readability and maintainability 2025-10-08 10:03:30 +01:00
bolade 84e3c7b72a feat: Implement database ingestion for investors and companies
- Added main ingestion logic in main.py to process CSV files for investors and companies.
- Implemented data cleaning functions for names, strings, integers, and websites.
- Established relationships between investors, companies, and sectors using SQLAlchemy ORM.
- Created models for investors, companies, sectors, and their relationships in models.py.
- Set up logging for error tracking during data processing.
- Initialized database and created necessary tables.
2025-10-07 20:01:19 +01:00
bolade a9589e54f3 feat: Refactor Fund schema to use many-to-many relationships for investment stages and sectors
- Updated FundTable to replace JSON fields for investment stages and sectors with relationships.
- Introduced InvestmentStageTable and fund_investment_stages association table.
- Created fund_sectors association table for many-to-many relationship with sectors.
- Changed geographic_focus from JSON array to a simple string.
- Migrated existing data to new schema, ensuring data integrity and normalization.
- Updated related schemas, routers, and services to reflect new structure.
- Added migration script to handle data transformation and schema updates.
- Implemented tests to verify new relationships and data integrity.
2025-10-07 15:57:29 +01:00
bolade d341cacb9a Refactor investor and fund schemas to support new check size range
- Removed deprecated `stage_focus` column from `InvestorTable` and `InvestorSchema`.
- Updated `FundTable` to change `fund_size` from VARCHAR to INTEGER and added `check_size_lower` and `check_size_upper` columns.
- Modified API routes to return investor-fund combinations as separate entries.
- Created new `InvestorFundData` schema for combined investor-fund responses.
- Implemented LLM parsing for check size range from estimated investment size.
- Updated database migration script to reflect schema changes and ensure data integrity.
- Removed obsolete verification and test scripts related to the old schema.
2025-10-07 15:24:36 +01:00
bolade c0fbbdd917 Implement manual JSON parsing for company profiles; enhance data extraction and processing efficiency; add comprehensive test script for validation 2025-10-07 12:07:43 +01:00
bolade 1f3f08e80d Remove deprecated stage_focus column and update database path for consistency; add schema verification script and document schema mismatch fixes 2025-10-07 11:31:16 +01:00
bolade cd7172ed9f Add test script for manual JSON parser with LLM currency conversion
- Implemented a new test script `test_parser.py` to validate the functionality of the manual JSON parser.
- The script loads investor data from a CSV file and processes a sample of three investors.
- Results include detailed information about each investor, their funds, team members, and investment thesis.
- Added error handling for missing API key in the environment variables.
2025-10-06 14:07:28 +01:00
bolade c199f5423a Refactor code structure for improved readability and maintainability 2025-10-06 12:57:08 +01:00
bolade a2b3ceedbe Added funds table 2025-10-05 19:16:03 +01:00
bolade 3842171549 Update .gitignore to exclude preprocessor directory; refactor find_similar_investors function to improve similarity scoring based on investor characteristics and add limit parameter for results. 2025-10-01 23:29:29 +01:00
bolade 17bc5acbc8 Refactor investor similarity search to utilize AI for improved query generation; adjust DataFrame parsing to skip initial rows for better data handling. 2025-09-29 15:58:09 +01:00
bolade 6caea96658 Update server host and port configuration for deployment 2025-09-27 11:16:18 +01:00
bolade 6d902345c0 Refactor investor and company schemas to allow optional fields; update filtering logic in read_companies function and add find_similar_investors endpoint; change LLM model in InvestorProcessor and QueryProcessor for improved performance. 2025-09-27 10:45:08 +01:00
bolade d36367fbe9 Add project management functionality with CRUD operations and associations; introduce project schemas and update main application routing. 2025-09-27 08:53:59 +01:00
bolade abac19c6ae Update .gitignore to exclude __pycache__ directories and modify schemas to allow optional fields for better flexibility; adjust batch size in InvestorProcessor for improved processing efficiency. 2025-09-26 15:56:29 +01:00
bolade f2bbcb96f3 Refactor database models and schemas to allow nullable fields; update init_database function for improved initialization. 2025-09-26 15:24:42 +01:00
bolade 0f7beca5e1 made version 2 2025-09-25 17:00:38 +01:00
58 changed files with 28006 additions and 2367 deletions
+4 -3
View File
@@ -8,8 +8,9 @@
/chroma_db /chroma_db
/*__pycache__*/ *__pycache__
*.cypython
/*.db
/*.cypython-*
-577
View File
@@ -1,577 +0,0 @@
# LLM-Powered Investor & Company Management API
A comprehensive FastAPI-based system for managing investor and company data with LLM-powered CSV parsing, semantic search, and advanced filtering capabilities.
## Features
- **FastAPI REST API**: Modern, auto-documented API with OpenAPI/Swagger support
- **CSV Data Processing**: Parse complex investor data from CSV files using LLM assistance
- **Dual Database Storage**: Structured data in SQL database and semantic search via ChromaDB
- **Natural Language Queries**: AI-powered query processing for complex investor searches
- **Advanced Filtering**: Filter investors and companies by multiple criteria
- **Relationship Management**: Many-to-many relationships between investors, companies, and sectors
- **Auto-Generated Documentation**: Interactive API docs at `/docs`
## Architecture
### Components
1. **FastAPI Application (`app/main.py`)**: Main API server with route configuration
2. **Database Models (`app/db/models.py`)**: SQLAlchemy models for investors, companies, sectors
3. **Pydantic Schemas (`app/py_schemas.py`)**: Request/response validation and serialization
4. **API Routes**:
- `app/api/investors.py`: Investor CRUD operations and filtering
- `app/api/companies.py`: Company CRUD operations and filtering
5. **Services**:
- `app/services/openrouter.py`: LLM-powered CSV processing
- `app/services/querying.py`: Natural language query processing
6. **Database (`app/db/`)**: Database connection, models, and schemas
### Data Flow
```
CSV Upload → LLM Processing → Data Extraction → SQL Storage → Vector Storage → API Endpoints
Natural Language Query → AI Analysis → Database Filtering → Structured Response
```
## Installation
### Prerequisites
- Python 3.12+
- FastAPI and dependencies
### Setup
1. Clone the repository and navigate to the project directory:
```bash
cd /path/to/anton_wireframe
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure environment variables:
```bash
cp .env.example .env
# Edit .env and add your OpenRouter API key for LLM features
```
4. Initialize the database:
```bash
cd app
python -c "from db.db import init_database; init_database()"
```
5. Start the API server:
```bash
cd app
uvicorn main:app --reload --host localhost --port 8000
```
The API will be available at:
- **API Base**: http://localhost:8000
- **Interactive Docs**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
## Database Schema
### SQL Database (SQLite)
#### Investors Table
- **Basic Info**: name, description, geographic_focus
- **Investment Data**: aum, check_size_lower, check_size_upper
- **Stage Focus**: investment stage (SEED, SERIES_A, etc.)
- **Relationships**: Many-to-many with companies and sectors
- **Team**: One-to-many with team members
- **Metadata**: created_at, updated_at timestamps
#### Companies Table
- **Basic Info**: name, industry, location
- **Details**: founded_year, website
- **Relationships**: Many-to-many with investors
- **Metadata**: created_at, updated_at timestamps
#### Association Tables
- **investor_companies**: Links investors to their portfolio companies
- **investor_sectors**: Links investors to their focus sectors
- **investor_team**: Team member details for each investor
#### Supporting Tables
- **sectors**: Investment focus areas (fintech, healthcare, etc.)
### Vector Database (ChromaDB)
Stores embeddings for semantic search of:
- Investor descriptions
- Investment thesis focus areas
- Combined investor profiles
## API Usage
### Interactive Documentation
Visit http://localhost:8000/docs for the auto-generated Swagger UI where you can:
- Explore all endpoints
- Test API calls directly
- View request/response schemas
- See example requests
### Core Endpoints
#### Investor Management
```bash
# Get all investors with relationships
GET /investors
# Filter investors by criteria
GET /investors/filter?stage=GROWTH&geography=US&sector=fintech&min_check_size=1000000
# Get specific investor
GET /investors/{investor_id}
# Create new investor
POST /investors
{
"name": "Example VC",
"description": "Early stage fintech investor",
"aum": 50000000,
"check_size_lower": 100000,
"check_size_upper": 2000000,
"geographic_focus": "US",
"stage_focus": "SEED",
"number_of_investments": 25
}
# Update investor
PUT /investors/{investor_id}
# Delete investor
DELETE /investors/{investor_id}
```
#### Company Management
```bash
# Get all companies with investor relationships
GET /companies
# Filter companies by criteria
GET /companies/filter?industry=fintech&location=San Francisco&founded_after=2015
# Get specific company
GET /companies/{company_id}
# Create new company
POST /companies
{
"name": "Example Startup",
"industry": "fintech",
"location": "San Francisco",
"founded_year": 2020,
"website": "https://example.com"
}
# Update company
PUT /companies/{company_id}
# Delete company
DELETE /companies/{company_id}
```
#### CSV Processing
```bash
# Upload and process CSV file
POST /parse-csv
Content-Type: multipart/form-data
File: investors.csv
```
#### Natural Language Queries
```bash
# Query investors using natural language
POST /query
{
"question": "Show me growth stage fintech investors in Silicon Valley with check sizes over $1 million"
}
```
### Advanced Filtering Examples
#### Investor Filters
```bash
# Early stage investors in Europe
GET /investors/filter?stage=SEED&geography=Europe
# High AUM growth investors
GET /investors/filter?stage=GROWTH&min_aum=100000000
# Healthcare investors with large checks
GET /investors/filter?sector=healthcare&min_check_size=5000000
# Specific geographic focus
GET /investors/filter?geography=Silicon Valley
```
#### Company Filters
```bash
# Recent fintech companies
GET /companies/filter?industry=fintech&founded_after=2020
# Companies with websites
GET /companies/filter?has_website=true
# Companies backed by specific investor
GET /companies/filter?investor_name=Sequoia
# Location-based filtering
GET /companies/filter?location=New York
```
### Response Format
All endpoints return structured JSON with full relationship data:
```json
{
"investor": {
"id": 1,
"name": "Example VC",
"description": "Early stage investor",
"aum": 50000000,
"check_size_lower": 100000,
"check_size_upper": 2000000,
"geographic_focus": "US",
"stage_focus": "SEED",
"number_of_investments": 25
},
"portfolio_companies": [
{
"id": 1,
"name": "StartupCo",
"industry": "fintech",
"location": "San Francisco"
}
],
"team_members": [
{
"id": 1,
"name": "John Partner",
"role": "Managing Partner",
"email": "john@examplevc.com"
}
],
"sectors": [
{
"id": 1,
"name": "fintech"
}
]
}
```
## Data Processing Pipeline
### 1. CSV Parsing
- Reads CSV with pandas
- Handles nested JSON fields in columns
- Validates data with Pydantic models
### 2. JSON Field Processing
- Direct parsing for well-formed JSON
- LLM-assisted cleaning for malformed JSON (when enabled)
- Graceful fallback to empty objects
### 3. Data Extraction
Extracts key fields:
- Company name and website
- Investor description
- Investment thesis/focus areas
- Headquarters location
- Assets Under Management (AUM)
- Fund information
### 4. LLM Enhancement (Optional)
When `--use-llm` is enabled:
- Standardizes investor descriptions
- Normalizes investment focus areas
- Cleans headquarters location format
- Repairs malformed JSON data
### 5. Dual Storage
- **SQL Database**: Structured, queryable data
- **Vector Database**: Semantic search capabilities
## Configuration
### Environment Variables (.env)
```bash
# OpenRouter API Configuration (required for LLM features)
OPENROUTER_API_KEY=your_openrouter_api_key_here
# Database Configuration (optional, defaults to SQLite)
DATABASE_URL=sqlite:///investors.db
# FastAPI Configuration
API_HOST=localhost
API_PORT=8000
```
### LLM Configuration
- **Provider**: OpenRouter (supports multiple models)
- **Default Model**: google/gemini-2.5-flash-lite
- **Temperature**: 0.3 for enhancement, 0 for structured data
- **Fallback**: Graceful degradation when API unavailable
## Natural Language Query Processing
The system supports intelligent natural language queries that automatically extract filters and search criteria:
### Query Examples
```bash
# Stage-based queries
"Show me seed stage investors"
"Find growth stage VCs"
# Geographic queries
"Investors in Silicon Valley"
"European venture capital firms"
# Sector-specific queries
"Fintech investors"
"Healthcare and biotech VCs"
# Size-based queries
"Investors with $5M+ check sizes"
"High AUM growth investors"
# Combined queries
"Growth stage fintech investors in the US with check sizes over $1 million"
"European healthcare investors focusing on early stage"
```
### Query Processing Features
- **Automatic Filter Extraction**: Detects investment stages, geographies, sectors, and check sizes
- **Semantic Understanding**: Uses AI to interpret complex queries
- **Database Integration**: Combines AI analysis with efficient SQL filtering
- **Complete Relationships**: Returns full investor data with portfolio companies, team members, and sectors
### Query Response
The `/query` endpoint returns a structured `InvestorList` with complete relationship data, making it easy to get comprehensive information about matching investors.
## Error Handling
### API Error Responses
The API provides clear HTTP status codes and error messages:
```json
// 404 Not Found
{
"detail": "Investor not found"
}
// 422 Validation Error
{
"detail": [
{
"loc": ["body", "stage_focus"],
"msg": "value is not a valid enumeration member",
"type": "type_error.enum"
}
]
}
```
### Robust Processing
- **Data Validation**: Pydantic models ensure data integrity
- **Relationship Management**: Automatic handling of foreign key constraints
- **LLM Fallbacks**: Graceful degradation when AI services unavailable
- **Transaction Safety**: Database rollbacks on errors
- **Comprehensive Logging**: Detailed error tracking and debugging
### Common Issues and Solutions
1. **Invalid Enum Values**
- Solution: Use uppercase enum values (SEED, GROWTH, etc.)
- Check: Investment stages must match defined enum
2. **Missing OpenRouter API Key**
- Solution: Set OPENROUTER_API_KEY in environment
- Fallback: CSV processing continues without LLM enhancement
3. **Database Connection Issues**
- Solution: Verify DATABASE_URL configuration
- Default: Uses SQLite (no external dependencies)
4. **Relationship Errors**
- Solution: Ensure proper foreign key relationships
- Check: Use existing sector/company IDs or create new ones
## Performance
### Benchmarks (Approximate)
- **API Response Time**: <200ms for standard queries
- **Database Queries**: <50ms for filtered searches with relationships
- **CSV Processing**: ~5-15 seconds per row (depends on LLM API latency)
- **Natural Language Queries**: ~2-5 seconds (AI processing + database query)
- **Vector Search**: <100ms for semantic similarity queries
### Optimization Features
1. **Eager Loading**: Efficient relationship loading with `selectinload()`
2. **Query Optimization**: Smart filtering to reduce database load
3. **Caching**: Database connection pooling and session management
4. **Pagination**: Built-in limits to prevent overwhelming responses
5. **Async Processing**: FastAPI async capabilities for better performance
### Production Recommendations
1. **Database**: Consider PostgreSQL for production workloads
2. **Caching**: Add Redis for frequently accessed data
3. **Load Balancing**: Deploy multiple API instances behind a load balancer
4. **Monitoring**: Implement logging and metrics collection
5. **Rate Limiting**: Add API rate limiting for public endpoints
## File Structure
```
anton_wireframe/
├── app/
│ ├── main.py # FastAPI application and main endpoints
│ ├── py_schemas.py # Pydantic models for validation
│ ├── settings.py # Configuration management
│ ├── api/
│ │ ├── __init__.py
│ │ ├── investors.py # Investor CRUD and filtering endpoints
│ │ └── companies.py # Company CRUD and filtering endpoints
│ ├── db/
│ │ ├── __init__.py
│ │ ├── db.py # Database connection and session management
│ │ ├── models.py # SQLAlchemy database models
│ │ └── new_schema.py # Additional schema definitions
│ └── services/
│ ├── __init__.py
│ ├── openrouter.py # LLM-powered CSV processing
│ ├── querying.py # Natural language query processing
│ └── langgraph_agent.py # AI agent configuration
├── chroma_db/ # Vector database directory
├── requirements.txt # Python dependencies
├── README.md # This documentation
└── .env # Environment configuration
```
## Example Usage Scenarios
### 1. Upload and Process Investor Data
```bash
# Upload CSV file via API
curl -X POST "http://localhost:8000/parse-csv" \
-H "Content-Type: multipart/form-data" \
-F "file=@investors.csv"
```
### 2. Find Specific Investors
```bash
# Natural language search
curl -X POST "http://localhost:8000/query" \
-H "Content-Type: application/json" \
-d '{"question": "Show me growth stage fintech investors in Silicon Valley with check sizes over $2 million"}'
# Structured filtering
curl "http://localhost:8000/investors/filter?stage=GROWTH&sector=fintech&geography=Silicon%20Valley&min_check_size=2000000"
```
### 3. Company Research
```bash
# Find companies in specific sector
curl "http://localhost:8000/companies/filter?industry=fintech&founded_after=2020"
# Find companies backed by specific investor
curl "http://localhost:8000/companies/filter?investor_name=Sequoia"
```
### 4. Investment Analysis
```bash
# Get investor with full portfolio
curl "http://localhost:8000/investors/1"
# Find all companies in a specific location
curl "http://localhost:8000/companies/filter?location=San%20Francisco"
```
## Development
### Running in Development Mode
```bash
cd app
uvicorn main:app --reload --host localhost --port 8000
```
### Testing the API
1. **Interactive Testing**: Visit http://localhost:8000/docs
2. **Manual Testing**: Use curl or Postman with the examples above
3. **Database Inspection**: Use SQLite browser to inspect `investors_2.db`
### Adding New Features
1. **New Endpoints**: Add routes to `api/investors.py` or `api/companies.py`
2. **New Models**: Update `db/models.py` and `py_schemas.py`
3. **New Filters**: Extend filtering logic in route handlers
4. **New LLM Features**: Modify `services/openrouter.py` or `services/querying.py`
## License
This project is part of the MKD Anton Wireframe system.
## Support
For issues and questions:
1. Check logs for detailed error messages
2. Verify environment configuration
3. Test with limited datasets first
4. Review CSV data format requirements
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
-233
View File
@@ -1,233 +0,0 @@
from typing import List, Optional
from db.db import get_db
from db.models import InvestorTable, SectorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from py_schemas import InvestmentStage, InvestorData
from pydantic import BaseModel
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Investor Routes"])
# Request schemas for creating/updating
class InvestorCreate(BaseModel):
name: str
description: str = None
aum: int
check_size_lower: int
check_size_upper: int
geographic_focus: str
stage_focus: InvestmentStage
number_of_investments: int = 0
class InvestorUpdate(BaseModel):
name: str = None
description: str = None
aum: int = None
check_size_lower: int = None
check_size_upper: int = None
geographic_focus: str = None
stage_focus: InvestmentStage = None
number_of_investments: int = None
@router.get("/investors", response_model=List[InvestorData])
def read_investors(db: Session = Depends(get_db)):
"""Get all investors with their related data"""
investors = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.all()
)
# Transform InvestorTable objects to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor, # This maps to InvestorSchema
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
return investor_data_list
@router.get("/investors/filter", response_model=List[InvestorData])
def filter_investors(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by investment stage"
),
min_check_size: Optional[int] = Query(None, description="Minimum check size"),
max_check_size: Optional[int] = Query(None, description="Maximum check size"),
geography: Optional[str] = Query(
None, description="Geographic focus (partial match)"
),
sector: Optional[str] = Query(None, description="Sector name (partial match)"),
min_aum: Optional[int] = Query(None, description="Minimum AUM"),
max_aum: Optional[int] = Query(None, description="Maximum AUM"),
db: Session = Depends(get_db),
):
"""Filter investors based on various criteria"""
# Start with base query
query = db.query(InvestorTable).options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
# Apply filters
if stage:
query = query.filter(InvestorTable.stage_focus == stage)
if min_check_size is not None:
query = query.filter(InvestorTable.check_size_lower >= min_check_size)
if max_check_size is not None:
query = query.filter(InvestorTable.check_size_upper <= max_check_size)
if geography:
query = query.filter(InvestorTable.geographic_focus.ilike(f"%{geography}%"))
if min_aum is not None:
query = query.filter(InvestorTable.aum >= min_aum)
if max_aum is not None:
query = query.filter(InvestorTable.aum <= max_aum)
# Filter by sector if provided
if sector:
query = query.join(InvestorTable.sectors).filter(
SectorTable.name.ilike(f"%{sector}%")
)
investors = query.all()
# Transform to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
return investor_data_list
@router.get("/investors/{investor_id}", response_model=InvestorData)
def read_investor(investor_id: int, db: Session = Depends(get_db)):
"""Get a specific investor by ID"""
investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id == investor_id)
.first()
)
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Transform to InvestorData format
return InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
@router.post("/investors", response_model=InvestorData)
def create_investor(investor: InvestorCreate, db: Session = Depends(get_db)):
"""Create a new investor"""
db_investor = InvestorTable(**investor.dict())
db.add(db_investor)
db.commit()
db.refresh(db_investor)
# Reload with relationships
investor_with_relations = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id == db_investor.id)
.first()
)
# Transform to InvestorData format
return InvestorData(
investor=investor_with_relations,
portfolio_companies=investor_with_relations.portfolio_companies,
team_members=investor_with_relations.team_members,
sectors=investor_with_relations.sectors,
)
@router.put("/investors/{investor_id}", response_model=InvestorData)
def update_investor(
investor_id: int, investor: InvestorUpdate, db: Session = Depends(get_db)
):
"""Update an existing investor"""
db_investor = (
db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
)
if not db_investor:
raise HTTPException(status_code=404, detail="Investor not found")
update_data = investor.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_investor, field, value)
db.commit()
db.refresh(db_investor)
# Reload with relationships
investor_with_relations = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id == investor_id)
.first()
)
# Transform to InvestorData format
return InvestorData(
investor=investor_with_relations,
portfolio_companies=investor_with_relations.portfolio_companies,
team_members=investor_with_relations.team_members,
sectors=investor_with_relations.sectors,
)
@router.delete("/investors/{investor_id}")
def delete_investor(investor_id: int, db: Session = Depends(get_db)):
"""Delete an investor"""
db_investor = (
db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
)
if not db_investor:
raise HTTPException(status_code=404, detail="Investor not found")
db.delete(db_investor)
db.commit()
return {"message": "Investor deleted successfully"}
-46
View File
@@ -1,46 +0,0 @@
from sqlalchemy.orm import Session
from db.models import InvestorTable
from db.db import get_db
def update_stage_focus_values():
"""Update existing stage_focus values from lowercase to uppercase"""
db = next(get_db())
try:
# Mapping of old lowercase values to new uppercase values
stage_mappings = {
'seed': 'SEED',
'series_a': 'SERIES_A',
'series_b': 'SERIES_B',
'series_c': 'SERIES_C',
'growth': 'GROWTH',
'late_stage': 'LATE_STAGE'
}
updated_count = 0
for old_value, new_value in stage_mappings.items():
# Update records with the old value
result = db.query(InvestorTable).filter(
InvestorTable.stage_focus == old_value
).update(
{InvestorTable.stage_focus: new_value},
synchronize_session=False
)
updated_count += result
print(f"Updated {result} records from '{old_value}' to '{new_value}'")
db.commit()
print(f"Successfully updated {updated_count} total records")
except Exception as e:
db.rollback()
print(f"Error updating stage_focus values: {e}")
raise
finally:
db.close()
# Run the update
if __name__ == "__main__":
update_stage_focus_values()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+11 -2
View File
@@ -1,4 +1,5 @@
import os import os
from pathlib import Path
from typing import Annotated from typing import Annotated
from fastapi import Depends from fastapi import Depends
@@ -9,7 +10,11 @@ from sqlalchemy.orm import Session, sessionmaker
Base = declarative_base() Base = declarative_base()
# Database configuration # Database configuration
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///investors.db") # Use the preprocessor's database for consistency
# Get absolute path to the preprocessor database
# APP_DIR = Path(__file__).parent.parent
# PREPROCESSOR_DB = APP_DIR.parent / "preprocessor" / "version_two.db"
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
# Create engine # Create engine
engine = create_engine(DATABASE_URL, echo=False) engine = create_engine(DATABASE_URL, echo=False)
@@ -32,9 +37,13 @@ db_dependency = Annotated[Session, Depends(get_db)]
def init_database(): def init_database():
"""Initialize the database by creating all tables""" """Initialize the database by creating all tables"""
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
print("Database initialized successfully!")
def get_session_sync() -> Session: def get_session_sync() -> Session:
"""Get a database session for synchronous operations""" """Get a database session for synchronous operations"""
return SessionLocal() return SessionLocal()
def get_db_session():
"""Get a database session for direct use."""
return SessionLocal()
+231 -34
View File
@@ -1,13 +1,20 @@
import datetime
import enum import enum
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text, func
from sqlalchemy.orm import relationship from sqlalchemy.orm import declarative_mixin, relationship
from sqlalchemy.types import Enum from sqlalchemy.types import JSON, Enum
from db.db import Base from db.db import Base
@declarative_mixin
class TimestampMixin:
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class InvestmentStage(enum.Enum): class InvestmentStage(enum.Enum):
SEED = "SEED" SEED = "SEED"
SERIES_A = "SERIES_A" SERIES_A = "SERIES_A"
@@ -16,6 +23,7 @@ class InvestmentStage(enum.Enum):
GROWTH = "GROWTH" GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE" LATE_STAGE = "LATE_STAGE"
# Association table for many-to-many relationship between investors and companies # Association table for many-to-many relationship between investors and companies
investor_company_association = Table( investor_company_association = Table(
"investor_companies", "investor_companies",
@@ -34,23 +42,97 @@ investor_sector_association = Table(
) )
class InvestorTable(Base): company_sector_association = Table(
"company_sector",
Base.metadata,
Column("company_id", Integer, ForeignKey("companies.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_sector_association = Table(
"project_sector",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_investor_association = Table(
"project_investors",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("investor_id", Integer, ForeignKey("investors.id")),
)
project_company_association = Table(
"project_companies",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for fund-stage many-to-many
fund_investment_stages_association = Table(
"fund_investment_stages",
Base.metadata,
Column("fund_id", Integer, ForeignKey("funds.id")),
Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for fund-sector many-to-many
fund_sectors_association = Table(
"fund_sectors",
Base.metadata,
Column("fund_id", Integer, ForeignKey("funds.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
class InvestorTable(Base, TimestampMixin):
__tablename__ = "investors" __tablename__ = "investors"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False) name = Column(String, nullable=False)
description = Column(Text, nullable=True) description = Column(Text, nullable=True)
aum = Column(Integer, nullable=False) # Assets Under Management
check_size_lower = Column(Integer, nullable=False) # Lower bound # Basic investor info
check_size_upper = Column(Integer, nullable=False) # Upper bound website = Column(String, nullable=True)
geographic_focus = Column(String, nullable=False) headquarters = Column(String, nullable=True)
stage_focus = Column(Enum(InvestmentStage), nullable=False)
number_of_investments = Column(Integer, default=0) # AUM fields
created_at = Column(DateTime, default=datetime.datetime.now(datetime.UTC)) aum = Column(Integer, nullable=True) # Store as integer for numerical filtering
updated_at = Column( aum_as_of_date = Column(String, nullable=True)
DateTime, aum_source_url = Column(String, nullable=True)
default=datetime.datetime.now(datetime.UTC),
onupdate=datetime.datetime.now(datetime.UTC), # Check size (deprecated in favor of fund-level data, but keeping for backward compatibility)
check_size_lower = Column(Integer, nullable=True)
check_size_upper = Column(Integer, nullable=True)
# Geographic focus (deprecated in favor of fund-level, but keeping for backward compatibility)
geographic_focus = Column(String, nullable=True)
# Investment thesis and portfolio
investment_thesis = Column(JSON, nullable=True) # Array of thesis statements
portfolio_highlights = Column(
JSON, nullable=True
) # Array of portfolio company names
linked_documents = Column(JSON, nullable=True) # Array of document URLs
# Research metadata
researcher_notes = Column(Text, nullable=True)
missing_important_fields = Column(
JSON, nullable=True
) # Array of missing field names
sources = Column(JSON, nullable=True) # JSON object with source URLs
# Portfolio info
number_of_investments = Column(Integer, default=0, nullable=True)
# Relationships
team_members = relationship(
"InvestorMember", back_populates="investor", cascade="all, delete-orphan"
)
funds = relationship(
"FundTable", back_populates="investor", cascade="all, delete-orphan"
) )
# Relationship to portfolio companies # Relationship to portfolio companies
@@ -59,30 +141,84 @@ class InvestorTable(Base):
secondary=investor_company_association, secondary=investor_company_association,
back_populates="investors", back_populates="investors",
) )
team_members = relationship("InvestorTeamMember", back_populates="investor")
sectors = relationship( sectors = relationship(
"SectorTable", "SectorTable",
secondary=investor_sector_association, secondary=investor_sector_association,
back_populates="investors", back_populates="investors",
) )
projects = relationship(
"ProjectTable",
secondary=project_investor_association,
back_populates="investors",
)
class CompanyTable(Base):
class InvestorMember(Base, TimestampMixin):
__tablename__ = "investor_members"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
role = Column(String, nullable=True)
title = Column(String, nullable=True) # Alternative to role
email = Column(String, nullable=True)
source_url = Column(String, nullable=True) # URL where member info was found
investor_id = Column(Integer, ForeignKey("investors.id"))
investor = relationship("InvestorTable", back_populates="team_members")
class FundTable(Base, TimestampMixin):
__tablename__ = "funds"
id = Column(Integer, primary_key=True, index=True)
investor_id = Column(Integer, ForeignKey("investors.id"), nullable=False)
# Fund details
fund_name = Column(String, nullable=True)
fund_size = Column(
Integer, nullable=True
) # Store as integer for numerical filtering
fund_size_source_url = Column(String, nullable=True)
# Check size range (parsed from estimated_investment_size by LLM)
check_size_lower = Column(Integer, nullable=True)
check_size_upper = Column(Integer, nullable=True)
source_url = Column(String, nullable=True)
source_provider = Column(String, nullable=True) # e.g., "Perplexity"
# Geographic focus as simple string
geographic_focus = Column(String, nullable=True)
# Relationships
investor = relationship("InvestorTable", back_populates="funds")
investment_stages = relationship(
"InvestmentStageTable",
secondary=fund_investment_stages_association,
back_populates="funds",
)
sectors = relationship(
"SectorTable",
secondary=fund_sectors_association,
back_populates="funds",
)
class CompanyTable(Base, TimestampMixin):
__tablename__ = "companies" __tablename__ = "companies"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False) name = Column(String, nullable=False)
industry = Column(String, nullable=False) industry = Column(String, nullable=True)
location = Column(String, nullable=False) location = Column(String, nullable=True)
description = Column(String, nullable=True)
founded_year = Column(Integer, nullable=True) founded_year = Column(Integer, nullable=True)
website = Column(String, nullable=True) website = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.now(datetime.UTC))
updated_at = Column(
DateTime,
default=datetime.datetime.now(datetime.UTC),
onupdate=datetime.datetime.now(datetime.UTC),
)
members = relationship(
"CompanyMember", back_populates="company", cascade="all, delete-orphan"
)
# Relationship back to investors # Relationship back to investors
investors = relationship( investors = relationship(
"InvestorTable", "InvestorTable",
@@ -90,27 +226,88 @@ class CompanyTable(Base):
back_populates="portfolio_companies", back_populates="portfolio_companies",
) )
sectors = relationship(
"SectorTable", secondary=company_sector_association, back_populates="companies"
)
class SectorTable(Base): projects = relationship(
"ProjectTable",
secondary=project_company_association,
back_populates="companies",
)
class CompanyMember(Base, TimestampMixin):
__tablename__ = "company_members"
id = Column(Integer, primary_key=True)
name = Column(String)
linkedin = Column(String, nullable=True)
role = Column(String, nullable=True)
company_id = Column(Integer, ForeignKey("companies.id"), nullable=False)
company = relationship("CompanyTable", back_populates="members")
class InvestmentStageTable(Base, TimestampMixin):
__tablename__ = "investment_stages"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False, unique=True)
# Relationships
funds = relationship(
"FundTable",
secondary=fund_investment_stages_association,
back_populates="investment_stages",
)
class SectorTable(Base, TimestampMixin):
__tablename__ = "sectors" __tablename__ = "sectors"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False) name = Column(String, nullable=False)
# Add relationship back to investors # Relationships
investors = relationship( investors = relationship(
"InvestorTable", "InvestorTable",
secondary=investor_sector_association, secondary=investor_sector_association,
back_populates="sectors", back_populates="sectors",
) )
companies = relationship(
"CompanyTable", secondary=company_sector_association, back_populates="sectors"
)
projects = relationship(
"ProjectTable", secondary=project_sector_association, back_populates="sector"
)
funds = relationship(
"FundTable",
secondary=fund_sectors_association,
back_populates="sectors",
)
class InvestorTeamMember(Base): class ProjectTable(Base, TimestampMixin):
__tablename__ = "investor_team" __tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False) name = Column(String, nullable=False)
role = Column(String, nullable=False) valuation = Column(Integer, nullable=True)
email = Column(String, nullable=False)
investor_id = Column(Integer, ForeignKey("investors.id")) stage = Column(Enum(InvestmentStage), nullable=True)
investor = relationship("InvestorTable", back_populates="team_members") location = Column(String, nullable=True)
description = Column(Text, nullable=True)
start_date = Column(DateTime, nullable=True)
end_date = Column(DateTime, nullable=True)
sector = relationship(
"SectorTable", secondary=project_sector_association, back_populates="projects"
)
investors = relationship(
"InvestorTable",
secondary=project_investor_association,
back_populates="projects",
)
companies = relationship(
"CompanyTable", secondary=project_company_association, back_populates="projects"
)
-115
View File
@@ -1,115 +0,0 @@
import json
from typing import List, Optional
from pydantic import BaseModel
from sqlalchemy import JSON, Column, DateTime, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class Investor(Base):
__tablename__ = "investors"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(500), nullable=False)
website = Column(String(1000))
# Core investment information
investor_description = Column(Text)
investment_thesis_focus = Column(JSON) # List of focus areas
headquarters = Column(String(1000))
# AUM information
aum_amount = Column(String(200))
aum_as_of_date = Column(String(100))
aum_source_url = Column(String(1000))
# Fund information
funds_info = Column(JSON) # Complex fund data
# Raw data columns for reference
crunchbase_urls = Column(Text)
crunchbase_extract = Column(Text)
linkedin_profile = Column(Text)
source_truth_profile = Column(Text)
# Metadata
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
def __repr__(self):
return f"<Investor(name='{self.name}', website='{self.website}')>"
# Pydantic models for data validation and parsing
class AUMInfo(BaseModel):
aumAmount: Optional[str] = None
asOfDate: Optional[str] = None
sourceUrl: Optional[str] = None
class FundInfo(BaseModel):
fundName: Optional[str] = None
fundSize: Optional[str] = None
vintage: Optional[str] = None
status: Optional[str] = None
description: Optional[str] = None
class InvestorProfile(BaseModel):
websiteURL: Optional[str] = None
investorDescription: Optional[str] = None
investmentThesisFocus: Optional[List[str]] = None
headquarters: Optional[str] = None
overallAssetsUnderManagement: Optional[AUMInfo] = None
funds: Optional[List[FundInfo]] = None
class CSVRow(BaseModel):
name: str
website: Optional[str] = None
investment_firm_profile: Optional[str] = None
crunchbase_linkedin_urls: Optional[str] = None
crunchbase_firm_extract: Optional[str] = None
linkedin_investment_profile: Optional[str] = None
source_of_truth_profile: Optional[str] = None
def get_combined_description(self) -> str:
"""Combine all description fields for vector embedding"""
descriptions = []
if self.investment_firm_profile:
try:
profile_data = json.loads(self.investment_firm_profile)
if isinstance(profile_data, dict):
desc = profile_data.get("investorDescription", "")
if desc:
descriptions.append(desc)
except (json.JSONDecodeError, TypeError):
pass
if self.crunchbase_firm_extract:
descriptions.append(self.crunchbase_firm_extract)
if self.linkedin_investment_profile:
descriptions.append(self.linkedin_investment_profile)
if self.source_of_truth_profile:
descriptions.append(self.source_of_truth_profile)
return " ".join(descriptions)
def get_investment_focus(self) -> List[str]:
"""Extract investment thesis focus"""
if self.investment_firm_profile:
try:
profile_data = json.loads(self.investment_firm_profile)
if isinstance(profile_data, dict):
focus = profile_data.get("investmentThesisFocus", [])
if isinstance(focus, list):
return focus
except (json.JSONDecodeError, TypeError):
pass
return []
+63 -16
View File
@@ -1,17 +1,27 @@
import io import io
import pandas as pd import pandas as pd
from api import companies, investors from db.db import Base, db_dependency, engine
from db.db import db_dependency, init_database from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile from fastapi import FastAPI, File, Form, UploadFile
from py_schemas import InvestorList
from pydantic import BaseModel from pydantic import BaseModel
from services.openrouter_v2 import InvestorProcessor from routers import companies, investors, projects
from schemas.router_schemas import InvestmentResponse, PaginatedResponse
from services.llm_parser import InvestorProcessor
from services.querying import QueryProcessor from services.querying import QueryProcessor
app = FastAPI() load_dotenv()
def init_database():
"""Initialize the database by creating all tables"""
Base.metadata.create_all(bind=engine)
init_database() init_database()
app = FastAPI()
# Request models # Request models
class QueryRequest(BaseModel): class QueryRequest(BaseModel):
@@ -20,7 +30,7 @@ class QueryRequest(BaseModel):
class Config: class Config:
json_schema_extra = { json_schema_extra = {
"example": { "example": {
"question": "Show me growth stage fintech investors in the US with check sizes over $1 million" "question": "Find me deep tech investors that do deals in Europe under 5 million."
} }
} }
@@ -31,38 +41,75 @@ def health():
@app.post("/parse-csv", tags=["CSV Upload"], response_model=list[dict]) @app.post("/parse-csv", tags=["CSV Upload"], response_model=list[dict])
async def parse_csv(db: db_dependency, file: UploadFile = File(...)): async def parse_csv(
db: db_dependency, file: UploadFile = File(...), is_investor: int = Form(...)
):
"""
Parse and import CSV data into the database.
**For investors:**
- Expected columns: Name, Website, Final Investor Profile, Final Profile sourcing
- Manually parses JSON profiles for efficiency
- Uses LLM only for currency conversion to USD
- Handles AUM, fund sizes, and check sizes as integers
**For companies:**
- Expected columns: Name, Website, Investor, Final Investor Profile (company profile)
- 100% manual JSON parsing - no LLM needed
- Extracts company details, executives, investors, and client categories
- Automatically links companies to investors in database
**Benefits:**
- Fast processing (5-10s per record)
- Low cost (minimal or no LLM usage)
- Accurate data extraction
- Automatic database persistence
"""
# Read uploaded CSV with pandas # Read uploaded CSV with pandas
content = await file.read() content = await file.read()
df = pd.read_csv(io.StringIO(content.decode("utf-8"))) df = pd.read_csv(io.StringIO(content.decode("utf-8")))
# Process the dataframe # Process the dataframe
processor = InvestorProcessor(sql_session=db) processor = InvestorProcessor()
results = await processor.process_csv(df)
# Convert Pydantic objects to dictionaries if is_investor == 1:
return [r.model_dump() for r in results] # Manual parser with LLM currency conversion
results = await processor.parse_investors(df, save_to_db=True)
# Results are already dicts from the new parser
return results
else:
# Manual parser for companies (no LLM needed)
results = await processor.parse_companies(df, save_to_db=True)
# Results are already dicts from the new parser
return results
@app.post("/query", response_model=InvestorList, tags=["Querying"]) @app.post(
async def query_investors(db: db_dependency, request: QueryRequest): "/query", response_model=PaginatedResponse[InvestmentResponse], tags=["Querying"]
)
async def query_investors(request: QueryRequest):
""" """
Query investors using natural language. Query investors using natural language.
Returns fund-level matches (one row per fund) with investor details.
This ensures only relevant funds are included in the response.
Supports queries like: Supports queries like:
- "Show me seed stage investors" - "Show me seed stage investors"
- "Find fintech investors in Silicon Valley" - "Find fintech investors in Silicon Valley"
- "Growth stage investors with $5M+ check sizes" - "Growth stage investors with $5M+ check sizes"
- "Healthcare investors in Europe" - "Healthcare investors in Europe"
""" """
processor = QueryProcessor(sql_session=db) processor = QueryProcessor()
results = processor.process_query(request.question) results = processor.process_query(request.question)
return results return results
app.include_router(investors.router) app.include_router(investors.router)
app.include_router(companies.router) app.include_router(companies.router)
app.include_router(projects.router)
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app="main:app", host="localhost", port=8000, reload=True) uvicorn.run(app="main:app", host="0.0.0.0", port=8585, reload=True)
-79
View File
@@ -1,79 +0,0 @@
from datetime import datetime
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class CompanySchema(BaseModel):
id: int
name: str
industry: str
location: str
founded_year: Optional[int]
website: Optional[str]
created_at: Optional[datetime]
updated_at: Optional[datetime]
class Config:
from_attributes = True
class InvestorTeamMemberSchema(BaseModel):
id: int
name: str
role: str
email: str
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
aum: int
check_size_lower: int
check_size_upper: int
geographic_focus: str
stage_focus: InvestmentStage
number_of_investments: int
created_at: Optional[datetime]
updated_at: Optional[datetime]
class Config:
from_attributes = True
class InvestorData(BaseModel):
"""Comprehensive investor data schema for LLM processing"""
investor: InvestorSchema
portfolio_companies: List[CompanySchema] = []
team_members: List[InvestorTeamMemberSchema] = []
sectors: List[SectorSchema] = []
class Config:
from_attributes = True
class InvestorList(BaseModel):
investors: List[InvestorData]
-38
View File
@@ -1,38 +0,0 @@
from typing import List
from pydantic import BaseModel
class Investor(BaseModel):
name: str
aum: int
check_size: str
sector_focus: str
stage_focus: str
region: str
investment_thesis: str
investor_description: str
class InvestorList(BaseModel):
investor_list: List[Investor]
class QueryResponse(BaseModel):
name: str
aum: int
check_size: str
sector_focus: str
stage_focus: str
region: str
investment_thesis: str
investor_description: str
reason: str
class QueryRequest(BaseModel):
question: str
class QueryResponseList(BaseModel):
responses: List[QueryResponse]
Binary file not shown.
Binary file not shown.
Binary file not shown.
+106 -43
View File
@@ -1,10 +1,10 @@
from typing import List, Optional from typing import Optional
from db.db import get_db from db.db import get_db
from db.models import CompanyTable, InvestorTable from db.models import CompanyTable, InvestorTable
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from py_schemas import CompanySchema
from pydantic import BaseModel from pydantic import BaseModel
from schemas.router_schemas import CompanyData, PaginatedResponse
from sqlalchemy.orm import Session, selectinload from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Company Routes"]) router = APIRouter(tags=["Company Routes"])
@@ -15,6 +15,7 @@ class CompanyCreate(BaseModel):
name: str name: str
industry: str industry: str
location: str location: str
description: Optional[str] = None
founded_year: Optional[int] = None founded_year: Optional[int] = None
website: Optional[str] = None website: Optional[str] = None
@@ -23,52 +24,66 @@ class CompanyUpdate(BaseModel):
name: Optional[str] = None name: Optional[str] = None
industry: Optional[str] = None industry: Optional[str] = None
location: Optional[str] = None location: Optional[str] = None
description: Optional[str] = None
founded_year: Optional[int] = None founded_year: Optional[int] = None
website: Optional[str] = None website: Optional[str] = None
# Response schema with relationships @router.get("/companies", response_model=PaginatedResponse[CompanyData])
class CompanyData(BaseModel): def read_companies(
"""Comprehensive company data schema""" page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Get all companies with their investor relationships (paginated)"""
# Calculate offset
offset = (page - 1) * page_size
company: CompanySchema # Get total count
investors: List["InvestorBasic"] = [] total_count = (
db.query(CompanyTable)
.filter(CompanyTable.name.isnot(None), CompanyTable.description.isnot(None))
.count()
)
class Config: # Get paginated results
from_attributes = True
class InvestorBasic(BaseModel):
"""Basic investor info for company responses"""
id: int
name: str
geographic_focus: str
stage_focus: str
check_size_lower: int
check_size_upper: int
class Config:
from_attributes = True
@router.get("/companies", response_model=List[CompanyData])
def read_companies(db: Session = Depends(get_db)):
"""Get all companies with their investor relationships"""
companies = ( companies = (
db.query(CompanyTable).options(selectinload(CompanyTable.investors)).all() db.query(CompanyTable)
.filter(CompanyTable.name.isnot(None), CompanyTable.description.isnot(None))
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.offset(offset)
.limit(page_size)
.all()
) )
# Transform CompanyTable objects to CompanyData format # Transform CompanyTable objects to CompanyData format
company_data_list = [] company_data_list = []
for company in companies: for company in companies:
company_data = CompanyData(company=company, investors=company.investors) company_data = CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=company.sectors,
)
company_data_list.append(company_data) company_data_list.append(company_data)
return company_data_list # Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=company_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/companies/filter", response_model=List[CompanyData]) @router.get("/companies/filter", response_model=PaginatedResponse[CompanyData])
def filter_companies( def filter_companies(
industry: Optional[str] = Query( industry: Optional[str] = Query(
None, description="Filter by industry (partial match)" None, description="Filter by industry (partial match)"
@@ -84,12 +99,18 @@ def filter_companies(
investor_name: Optional[str] = Query( investor_name: Optional[str] = Query(
None, description="Filter by investor name (partial match)" None, description="Filter by investor name (partial match)"
), ),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
): ):
"""Filter companies based on various criteria""" """Filter companies based on various criteria (paginated)"""
# Start with base query # Start with base query
query = db.query(CompanyTable).options(selectinload(CompanyTable.investors)) query = db.query(CompanyTable).options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
# Apply filters # Apply filters
if industry: if industry:
@@ -116,15 +137,34 @@ def filter_companies(
InvestorTable.name.ilike(f"%{investor_name}%") InvestorTable.name.ilike(f"%{investor_name}%")
) )
companies = query.all() # Get total count before pagination
total_count = query.count()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
companies = query.offset(offset).limit(page_size).all()
# Transform to CompanyData format # Transform to CompanyData format
company_data_list = [] company_data_list = []
for company in companies: for company in companies:
company_data = CompanyData(company=company, investors=company.investors) company_data = CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=company.sectors,
)
company_data_list.append(company_data) company_data_list.append(company_data)
return company_data_list # Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=company_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/companies/{company_id}", response_model=CompanyData) @router.get("/companies/{company_id}", response_model=CompanyData)
@@ -132,7 +172,11 @@ def read_company(company_id: int, db: Session = Depends(get_db)):
"""Get a specific company by ID with its investors""" """Get a specific company by ID with its investors"""
company = ( company = (
db.query(CompanyTable) db.query(CompanyTable)
.options(selectinload(CompanyTable.investors)) .options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == company_id) .filter(CompanyTable.id == company_id)
.first() .first()
) )
@@ -141,7 +185,12 @@ def read_company(company_id: int, db: Session = Depends(get_db)):
raise HTTPException(status_code=404, detail="Company not found") raise HTTPException(status_code=404, detail="Company not found")
# Transform to CompanyData format # Transform to CompanyData format
return CompanyData(company=company, investors=company.investors) return CompanyData(
company=company,
investors=company.investors,
members=company.members,
sectors=company.sectors,
)
@router.post("/companies", response_model=CompanyData) @router.post("/companies", response_model=CompanyData)
@@ -155,14 +204,21 @@ def create_company(company: CompanyCreate, db: Session = Depends(get_db)):
# Reload with relationships # Reload with relationships
company_with_relations = ( company_with_relations = (
db.query(CompanyTable) db.query(CompanyTable)
.options(selectinload(CompanyTable.investors)) .options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == db_company.id) .filter(CompanyTable.id == db_company.id)
.first() .first()
) )
# Transform to CompanyData format # Transform to CompanyData format
return CompanyData( return CompanyData(
company=company_with_relations, investors=company_with_relations.investors company=company_with_relations,
investors=company_with_relations.investors,
members=company_with_relations.members,
sectors=company_with_relations.sectors,
) )
@@ -185,14 +241,21 @@ def update_company(
# Reload with relationships # Reload with relationships
company_with_relations = ( company_with_relations = (
db.query(CompanyTable) db.query(CompanyTable)
.options(selectinload(CompanyTable.investors)) .options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.filter(CompanyTable.id == company_id) .filter(CompanyTable.id == company_id)
.first() .first()
) )
# Transform to CompanyData format # Transform to CompanyData format
return CompanyData( return CompanyData(
company=company_with_relations, investors=company_with_relations.investors company=company_with_relations,
investors=company_with_relations.investors,
members=company_with_relations.members,
sectors=company_with_relations.sectors,
) )
+555
View File
@@ -0,0 +1,555 @@
from typing import Optional
from db.db import get_db
from db.models import FundTable, InvestorTable, SectorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import (
CompanyMinimal,
InvestmentResponse,
InvestmentStage,
InvestorData,
PaginatedResponse,
SectorMinimal,
)
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Investor Routes"])
# Request schemas for creating/updating
class InvestorCreate(BaseModel):
name: str
description: Optional[str] = None
website: Optional[str] = None
headquarters: Optional[str] = None
aum: int
check_size_lower: int
check_size_upper: int
geographic_focus: str
number_of_investments: int = 0
class InvestorUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
website: Optional[str] = None
headquarters: Optional[str] = None
aum: Optional[int] = None
check_size_lower: Optional[int] = None
check_size_upper: Optional[int] = None
geographic_focus: Optional[str] = None
number_of_investments: Optional[int] = None
@router.get("/investors", response_model=PaginatedResponse[InvestmentResponse])
def read_investors(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Get all investors with their funds as separate entries (paginated)
Each investor-fund combination is returned as a separate row.
An investor with 3 funds will appear as 3 entries.
"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count
total_count = db.query(InvestorTable).count()
# Get paginated results
investors = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
)
.offset(offset)
.limit(page_size)
.all()
)
# Transform to InvestmentResponse format (one row per investor-fund combination)
investment_responses = []
for investor in investors:
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# If investor has funds, create one entry per fund
if investor.funds:
for fund in investor.funds:
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
)
investment_responses.append(investment_response)
else:
# If no funds, create one entry with null fund fields
investment_response = InvestmentResponse(
id=investor.id,
name=investor.name,
aum=investor.aum,
check_size_lower=None,
check_size_upper=None,
geographic_focus=None,
stage_focus=None,
portfolio_companies=portfolio_companies,
sectors=[],
compatibility_score=1.0,
)
investment_responses.append(investment_response)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/investors/filter", response_model=PaginatedResponse[InvestmentResponse])
def filter_investors(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by investment stage"
),
min_check_size: Optional[int] = Query(None, description="Minimum check size"),
max_check_size: Optional[int] = Query(None, description="Maximum check size"),
geography: Optional[str] = Query(
None, description="Geographic focus (partial match)"
),
sector: Optional[str] = Query(None, description="Sector name (partial match)"),
min_aum: Optional[int] = Query(None, description="Minimum AUM"),
max_aum: Optional[int] = Query(None, description="Maximum AUM"),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Filter investors based on various criteria (paginated)
Returns investor-fund combinations as separate rows.
Queries the funds table to find matching funds.
"""
# Start with base query on funds table
query = db.query(FundTable).options(
selectinload(FundTable.investor).selectinload(
InvestorTable.portfolio_companies
),
selectinload(FundTable.investor).selectinload(InvestorTable.team_members),
selectinload(FundTable.investor).selectinload(InvestorTable.sectors),
selectinload(FundTable.investment_stages),
selectinload(FundTable.sectors),
)
# Apply filters at fund level
if min_check_size is not None:
query = query.filter(FundTable.check_size_lower >= min_check_size)
if max_check_size is not None:
query = query.filter(FundTable.check_size_upper <= max_check_size)
if geography:
query = query.filter(FundTable.geographic_focus.ilike(f"%{geography}%"))
# Apply filters at investor level (through relationship)
if min_aum is not None:
query = query.join(FundTable.investor).filter(InvestorTable.aum >= min_aum)
if max_aum is not None:
if min_aum is None: # Only join if not already joined
query = query.join(FundTable.investor)
query = query.filter(InvestorTable.aum <= max_aum)
# Filter by sector if provided (at fund level)
if sector:
query = query.join(FundTable.sectors).filter(
SectorTable.name.ilike(f"%{sector}%")
)
# Get total count before pagination
total_count = query.count()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
funds = query.offset(offset).limit(page_size).all()
# Transform to InvestmentResponse format (one row per fund)
investment_responses = []
for fund in funds:
investor = fund.investor
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
)
investment_responses.append(investment_response)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/investors/{investor_id}", response_model=InvestorData)
def read_investor(investor_id: int, db: Session = Depends(get_db)):
"""Get a specific investor by ID with all their funds"""
investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
)
.filter(InvestorTable.id == investor_id)
.first()
)
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Transform to InvestorData format (includes funds array)
return InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
funds=investor.funds,
)
@router.post("/investors", response_model=InvestorData)
def create_investor(investor: InvestorCreate, db: Session = Depends(get_db)):
"""Create a new investor"""
db_investor = InvestorTable(**investor.dict())
db.add(db_investor)
db.commit()
db.refresh(db_investor)
# Reload with relationships
investor_with_relations = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
)
.filter(InvestorTable.id == db_investor.id)
.first()
)
# Transform to InvestorData format
return InvestorData(
investor=investor_with_relations,
portfolio_companies=investor_with_relations.portfolio_companies,
team_members=investor_with_relations.team_members,
sectors=investor_with_relations.sectors,
funds=investor_with_relations.funds,
)
@router.put("/investors/{investor_id}", response_model=InvestorData)
def update_investor(
investor_id: int, investor: InvestorUpdate, db: Session = Depends(get_db)
):
"""Update an existing investor"""
db_investor = (
db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
)
if not db_investor:
raise HTTPException(status_code=404, detail="Investor not found")
update_data = investor.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_investor, field, value)
db.commit()
db.refresh(db_investor)
# Reload with relationships
investor_with_relations = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
)
.filter(InvestorTable.id == investor_id)
.first()
)
# Transform to InvestorData format
return InvestorData(
investor=investor_with_relations,
portfolio_companies=investor_with_relations.portfolio_companies,
team_members=investor_with_relations.team_members,
sectors=investor_with_relations.sectors,
funds=investor_with_relations.funds,
)
@router.delete("/investors/{investor_id}")
def delete_investor(investor_id: int, db: Session = Depends(get_db)):
"""Delete an investor"""
db_investor = (
db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
)
if not db_investor:
raise HTTPException(status_code=404, detail="Investor not found")
db.delete(db_investor)
db.commit()
return {"message": "Investor deleted successfully"}
@router.get(
"/investors/{investor_id}/similar",
response_model=PaginatedResponse[InvestmentResponse],
)
def find_similar_investors(
investor_id: int,
limit: int = Query(10, description="Maximum number of similar investors to return"),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Find investors similar to a given investor based on characteristics (paginated)
Returns investor-fund combinations as separate rows.
Queries the funds table to find matching funds.
"""
# Get the target investor to get their funds for comparison
target_investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds).selectinload(FundTable.investment_stages),
selectinload(InvestorTable.funds).selectinload(FundTable.sectors),
)
.filter(InvestorTable.id == investor_id)
.first()
)
if not target_investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Get target investor's sector IDs for comparison (from their funds)
target_sector_ids = set()
target_stage_ids = set()
target_check_ranges = []
target_geographies = []
for fund in target_investor.funds:
if fund.sectors:
target_sector_ids.update({sector.id for sector in fund.sectors})
if fund.investment_stages:
target_stage_ids.update({stage.id for stage in fund.investment_stages})
if fund.check_size_lower and fund.check_size_upper:
target_check_ranges.append((fund.check_size_lower, fund.check_size_upper))
if fund.geographic_focus:
target_geographies.append(fund.geographic_focus.lower())
# Query all funds from other investors
candidate_funds = (
db.query(FundTable)
.options(
selectinload(FundTable.investor).selectinload(
InvestorTable.portfolio_companies
),
selectinload(FundTable.investor).selectinload(InvestorTable.team_members),
selectinload(FundTable.investor).selectinload(InvestorTable.sectors),
selectinload(FundTable.investment_stages),
selectinload(FundTable.sectors),
)
.join(FundTable.investor)
.filter(InvestorTable.id != investor_id)
.all()
)
# Calculate similarity scores for each fund
scored_funds = []
for fund in candidate_funds:
score = 0
# Geographic focus match (20 points for exact, 10 for partial)
if fund.geographic_focus and target_geographies:
fund_geo_lower = fund.geographic_focus.lower()
for target_geo in target_geographies:
if fund_geo_lower == target_geo:
score += 20
break
elif fund_geo_lower in target_geo or target_geo in fund_geo_lower:
score += 10
break
# Check size overlap (20 points max)
if fund.check_size_lower and fund.check_size_upper and target_check_ranges:
max_overlap_score = 0
for target_lower, target_upper in target_check_ranges:
overlap_start = max(fund.check_size_lower, target_lower)
overlap_end = min(fund.check_size_upper, target_upper)
if overlap_end > overlap_start:
overlap = overlap_end - overlap_start
target_range = target_upper - target_lower
overlap_ratio = overlap / target_range if target_range > 0 else 0
max_overlap_score = max(max_overlap_score, int(20 * overlap_ratio))
score += max_overlap_score
# AUM similarity (15 points max)
if fund.investor.aum and target_investor.aum:
aum_diff = abs(fund.investor.aum - target_investor.aum)
max_aum = max(fund.investor.aum, target_investor.aum)
similarity_ratio = 1 - (aum_diff / max_aum) if max_aum > 0 else 0
score += int(15 * similarity_ratio)
# Sector overlap (30 points max)
if fund.sectors and target_sector_ids:
fund_sector_ids = {sector.id for sector in fund.sectors}
common_sectors = target_sector_ids.intersection(fund_sector_ids)
overlap_ratio = len(common_sectors) / len(target_sector_ids)
score += int(30 * overlap_ratio)
# Investment stage match (15 points max)
if fund.investment_stages and target_stage_ids:
fund_stage_ids = {stage.id for stage in fund.investment_stages}
common_stages = target_stage_ids.intersection(fund_stage_ids)
overlap_ratio = len(common_stages) / len(target_stage_ids)
score += int(15 * overlap_ratio)
if score > 0: # Only include funds with some similarity
scored_funds.append((score, fund))
# Sort by score (descending) and take top N based on limit
scored_funds.sort(key=lambda x: x[0], reverse=True)
top_similar = scored_funds[:limit]
# Apply pagination to the top similar funds
total_count = len(top_similar)
offset = (page - 1) * page_size
paginated_similar = top_similar[offset : offset + page_size]
similar_funds = [fund for score, fund in paginated_similar]
# Transform to InvestmentResponse format (one row per fund)
investment_responses = []
for fund in similar_funds:
investor = fund.investor
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
)
investment_responses.append(investment_response)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
+486
View File
@@ -0,0 +1,486 @@
from typing import List, Optional
from db.db import get_db
from db.models import (
CompanyTable,
InvestorTable,
ProjectTable,
SectorTable,
)
from fastapi import APIRouter, Depends, HTTPException, Query
from schemas.project_schemas import (
InvestmentStage,
ProjectCreate,
ProjectData,
ProjectUpdate,
)
from schemas.router_schemas import PaginatedResponse
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Project Routes"])
@router.get("/projects", response_model=PaginatedResponse[ProjectData])
def read_projects(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Get all projects with their related data (paginated)"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count
total_count = db.query(ProjectTable).count()
# Get paginated results
projects = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.offset(offset)
.limit(page_size)
.all()
)
# Transform ProjectTable objects to ProjectData format
project_data_list = []
for project in projects:
project_data = ProjectData(
project=project,
sector=project.sector,
investors=project.investors,
companies=project.companies,
)
project_data_list.append(project_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=project_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/projects/{project_id}", response_model=ProjectData)
def read_project(project_id: int, db: Session = Depends(get_db)):
"""Get a specific project by ID"""
project = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.filter(ProjectTable.id == project_id)
.first()
)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return ProjectData(
project=project,
sector=project.sector,
investors=project.investors,
companies=project.companies,
)
@router.post("/projects", response_model=ProjectData)
def create_project(project: ProjectCreate, db: Session = Depends(get_db)):
"""Create a new project"""
db_project = ProjectTable(**project.dict())
db.add(db_project)
db.commit()
db.refresh(db_project)
# Reload with relationships
db_project = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.filter(ProjectTable.id == db_project.id)
.first()
)
return ProjectData(
project=db_project,
sector=db_project.sector,
investors=db_project.investors,
companies=db_project.companies,
)
@router.put("/projects/{project_id}", response_model=ProjectData)
def update_project(
project_id: int, project: ProjectUpdate, db: Session = Depends(get_db)
):
"""Update an existing project"""
db_project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not db_project:
raise HTTPException(status_code=404, detail="Project not found")
# Update only provided fields
update_data = project.dict(exclude_unset=True)
for key, value in update_data.items():
setattr(db_project, key, value)
db.commit()
db.refresh(db_project)
# Reload with relationships
db_project = (
db.query(ProjectTable)
.options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.filter(ProjectTable.id == project_id)
.first()
)
return ProjectData(
project=db_project,
sector=db_project.sector,
investors=db_project.investors,
companies=db_project.companies,
)
@router.delete("/projects/{project_id}")
def delete_project(project_id: int, db: Session = Depends(get_db)):
"""Delete a project"""
db_project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not db_project:
raise HTTPException(status_code=404, detail="Project not found")
db.delete(db_project)
db.commit()
return {"message": "Project deleted successfully"}
@router.get("/projects/filter", response_model=PaginatedResponse[ProjectData])
def filter_projects(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by project stage"
),
min_valuation: Optional[int] = Query(None, description="Minimum valuation"),
max_valuation: Optional[int] = Query(None, description="Maximum valuation"),
location: Optional[str] = Query(None, description="Location (partial match)"),
sector: Optional[str] = Query(None, description="Sector name (partial match)"),
investor_name: Optional[str] = Query(
None, description="Investor name (partial match)"
),
company_name: Optional[str] = Query(
None, description="Company name (partial match)"
),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Filter projects based on various criteria (paginated)"""
# Start with base query
query = db.query(ProjectTable).options(
selectinload(ProjectTable.sector),
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
# Apply filters
if stage:
query = query.filter(ProjectTable.stage == stage)
if min_valuation is not None:
query = query.filter(ProjectTable.valuation >= min_valuation)
if max_valuation is not None:
query = query.filter(ProjectTable.valuation <= max_valuation)
if location:
query = query.filter(ProjectTable.location.ilike(f"%{location}%"))
if sector:
query = query.join(ProjectTable.sector).filter(
SectorTable.name.ilike(f"%{sector}%")
)
if investor_name:
query = query.join(ProjectTable.investors).filter(
InvestorTable.name.ilike(f"%{investor_name}%")
)
if company_name:
query = query.join(ProjectTable.companies).filter(
CompanyTable.name.ilike(f"%{company_name}%")
)
# Get total count before pagination
total_count = query.count()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
projects = query.offset(offset).limit(page_size).all()
# Transform to ProjectData format
project_data_list = []
for project in projects:
project_data = ProjectData(
project=project,
sector=project.sector,
investors=project.investors,
companies=project.companies,
)
project_data_list.append(project_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=project_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
# Association management routes
@router.post("/projects/{project_id}/investors/{investor_id}")
def add_investor_to_project(
project_id: int, investor_id: int, db: Session = Depends(get_db)
):
"""Add an investor to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if investor exists
investor = db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Check if association already exists
if investor in project.investors:
raise HTTPException(
status_code=400, detail="Investor already associated with project"
)
# Add association
project.investors.append(investor)
db.commit()
return {"message": "Investor added to project successfully"}
@router.delete("/projects/{project_id}/investors/{investor_id}")
def remove_investor_from_project(
project_id: int, investor_id: int, db: Session = Depends(get_db)
):
"""Remove an investor from a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if investor exists
investor = db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Check if association exists
if investor not in project.investors:
raise HTTPException(
status_code=400, detail="Investor not associated with project"
)
# Remove association
project.investors.remove(investor)
db.commit()
return {"message": "Investor removed from project successfully"}
@router.post("/projects/{project_id}/companies/{company_id}")
def add_company_to_project(
project_id: int, company_id: int, db: Session = Depends(get_db)
):
"""Add a company to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if company exists
company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not company:
raise HTTPException(status_code=404, detail="Company not found")
# Check if association already exists
if company in project.companies:
raise HTTPException(
status_code=400, detail="Company already associated with project"
)
# Add association
project.companies.append(company)
db.commit()
return {"message": "Company added to project successfully"}
@router.delete("/projects/{project_id}/companies/{company_id}")
def remove_company_from_project(
project_id: int, company_id: int, db: Session = Depends(get_db)
):
"""Remove a company from a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if company exists
company = db.query(CompanyTable).filter(CompanyTable.id == company_id).first()
if not company:
raise HTTPException(status_code=404, detail="Company not found")
# Check if association exists
if company not in project.companies:
raise HTTPException(
status_code=400, detail="Company not associated with project"
)
# Remove association
project.companies.remove(company)
db.commit()
return {"message": "Company removed from project successfully"}
@router.post("/projects/{project_id}/sectors/{sector_id}")
def add_sector_to_project(
project_id: int, sector_id: int, db: Session = Depends(get_db)
):
"""Add a sector to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if sector exists
sector = db.query(SectorTable).filter(SectorTable.id == sector_id).first()
if not sector:
raise HTTPException(status_code=404, detail="Sector not found")
# Check if association already exists
if sector in project.sector:
raise HTTPException(
status_code=400, detail="Sector already associated with project"
)
# Add association
project.sector.append(sector)
db.commit()
return {"message": "Sector added to project successfully"}
@router.delete("/projects/{project_id}/sectors/{sector_id}")
def remove_sector_from_project(
project_id: int, sector_id: int, db: Session = Depends(get_db)
):
"""Remove a sector from a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check if sector exists
sector = db.query(SectorTable).filter(SectorTable.id == sector_id).first()
if not sector:
raise HTTPException(status_code=404, detail="Sector not found")
# Check if association exists
if sector not in project.sector:
raise HTTPException(
status_code=400, detail="Sector not associated with project"
)
# Remove association
project.sector.remove(sector)
db.commit()
return {"message": "Sector removed from project successfully"}
# Bulk association management
@router.post("/projects/{project_id}/investors")
def add_multiple_investors_to_project(
project_id: int, investor_ids: List[int], db: Session = Depends(get_db)
):
"""Add multiple investors to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get all investors
investors = db.query(InvestorTable).filter(InvestorTable.id.in_(investor_ids)).all()
if len(investors) != len(investor_ids):
raise HTTPException(status_code=404, detail="One or more investors not found")
# Add associations (only if not already associated)
added_count = 0
for investor in investors:
if investor not in project.investors:
project.investors.append(investor)
added_count += 1
db.commit()
return {"message": f"Added {added_count} investors to project successfully"}
@router.post("/projects/{project_id}/companies")
def add_multiple_companies_to_project(
project_id: int, company_ids: List[int], db: Session = Depends(get_db)
):
"""Add multiple companies to a project"""
# Check if project exists
project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get all companies
companies = db.query(CompanyTable).filter(CompanyTable.id.in_(company_ids)).all()
if len(companies) != len(company_ids):
raise HTTPException(status_code=404, detail="One or more companies not found")
# Add associations (only if not already associated)
added_count = 0
for company in companies:
if company not in project.companies:
project.companies.append(company)
added_count += 1
db.commit()
return {"message": f"Added {added_count} companies to project successfully"}
Binary file not shown.
Binary file not shown.
+117
View File
@@ -0,0 +1,117 @@
from datetime import datetime
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
aum: int | None
check_size_lower: int | None
check_size_upper: int | None
geographic_focus: str | None
stage_focus: InvestmentStage
number_of_investments: int | None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class CompanySchema(BaseModel):
id: int
name: str
industry: str | None
location: str | None
description: Optional[str]
founded_year: Optional[int]
website: Optional[str]
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class ProjectSchema(BaseModel):
id: int
name: str
valuation: int | None
stage: InvestmentStage | None
location: str | None
description: Optional[str]
start_date: Optional[datetime]
end_date: Optional[datetime]
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class ProjectCreate(BaseModel):
name: str
valuation: Optional[int] = None
stage: Optional[InvestmentStage] = None
location: Optional[str] = None
description: Optional[str] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
class ProjectUpdate(BaseModel):
name: Optional[str] = None
valuation: Optional[int] = None
stage: Optional[InvestmentStage] = None
location: Optional[str] = None
description: Optional[str] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
class ProjectData(BaseModel):
"""Comprehensive project data schema"""
project: ProjectSchema
sector: List[SectorSchema]
investors: List[InvestorSchema]
companies: List[CompanySchema]
class Config:
from_attributes = True
class ProjectInvestorAssociation(BaseModel):
project_id: int
investor_id: int
class ProjectCompanyAssociation(BaseModel):
project_id: int
company_id: int
class ProjectSectorAssociation(BaseModel):
project_id: int
sector_id: int
+352
View File
@@ -0,0 +1,352 @@
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field, field_validator
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
"""
Expert parser: Only extract sector information if clearly identifiable.
Leave name empty if uncertain about the sector classification.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Sector ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Sector name. Leave empty string if not clearly identifiable from the data.",
)
@field_validator("name", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional id field"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class InvestorMemberSchema(BaseModel):
"""
Expert parser: Only extract team member information if clearly identifiable.
Leave fields empty if uncertain about the member details.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Member ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Team member name. Leave empty string if not clearly identifiable.",
)
role: Optional[str] = Field(
default=None,
description="Team member role/title. Leave empty string if not clearly identifiable.",
)
email: Optional[str] = Field(
default=None,
description="Team member email. Leave empty string if not clearly identifiable or not provided.",
)
investor_id: Optional[int] = Field(
default=None,
ge=0,
description="Investor ID, must be 0 or greater. Use 0 if uncertain.",
)
@field_validator("name", "role", "email", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "investor_id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class CompanyMemberSchema(BaseModel):
"""
Expert parser: Only extract company member information if clearly identifiable.
Leave fields empty if uncertain about the member details.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Member ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Company member name. Leave empty if not clearly identifiable.",
)
linkedin: Optional[str] = Field(
default=None,
description="LinkedIn profile URL. Leave empty if not provided or uncertain.",
)
role: Optional[str] = Field(
default=None,
description="Company member role/title. Leave empty if not clearly identifiable.",
)
company_id: Optional[int] = Field(
default=None,
ge=0,
description="Company ID, must be 0 or greater. Use 0 if uncertain.",
)
@field_validator("name", "linkedin", "role", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "company_id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class CompanySchema(BaseModel):
"""
Expert parser: Only extract company information if clearly identifiable.
Leave optional fields empty if uncertain. Integer values must be 0 or greater.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Company ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Company name. Leave empty string if not clearly identifiable.",
)
industry: Optional[str] = Field(
default=None,
description="Company industry/sector. Leave empty string if not clearly identifiable.",
)
location: Optional[str] = Field(
default=None,
description="Company location/address. Leave empty string if not clearly identifiable.",
)
description: Optional[str] = Field(
default=None,
description="Company description. Leave empty if not clearly available or uncertain.",
)
founded_year: Optional[int] = Field(
default=None,
ge=0,
description="Year company was founded, must be 0 or greater. Leave None if not clearly identifiable or uncertain.",
)
website: Optional[str] = Field(
default=None,
description="Company website URL. Leave empty if not provided or uncertain.",
)
@field_validator(
"name", "industry", "location", "description", "website", mode="before"
)
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "founded_year", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for founded_year"""
if v == 0:
return None
return v
@field_validator("founded_year", mode="before")
@classmethod
def validate_founded_year(cls, v):
"""Expert parser: Only accept clearly identifiable founding years"""
if v is None or v == "Not Available" or v == "" or v == "Unknown":
return None
if isinstance(v, str):
try:
year = int(v)
return year if year >= 0 else None
except ValueError:
return None
return v if isinstance(v, int) and v >= 0 else None
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
"""
Expert parser: Only extract investor information if clearly identifiable.
Leave optional fields empty if uncertain. All numeric values must be 0 or greater.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Investor ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Investor name. Do not return any special characters, Just the name as a string.",
)
description: Optional[str] = Field(
default=None,
description="Investor description. Leave empty if not clearly available or uncertain.",
)
aum: Optional[int] = Field(
default=None,
ge=0,
description="Assets Under Management in USD, must be 0 or greater. Use 0 if not clearly identifiable or uncertain.",
)
check_size_lower: Optional[int] = Field(
default=None,
ge=0,
description="Lower bound of typical investment check size in USD, must be 0 or greater. Use 0 if not clearly identifiable.",
)
check_size_upper: Optional[int] = Field(
default=None,
ge=0,
description="Upper bound of typical investment check size in USD, must be 0 or greater. Use 0 if not clearly identifiable.",
)
geographic_focus: Optional[str] = Field(
default=None,
description="Geographic investment focus. Do not return any special characters, Just locations separated by commas. Leave empty if not clearly identifiable.",
)
number_of_investments: Optional[int] = Field(
default=None,
ge=0,
description="Total number of investments made, must be 0 or greater. Use 0 if not clearly identifiable.",
)
@field_validator("name", "description", "geographic_focus", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator(
"id",
"aum",
"check_size_lower",
"check_size_upper",
"number_of_investments",
mode="before",
)
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class InvestorData(BaseModel):
"""
Expert parser: Comprehensive investor data schema for LLM processing.
Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
"""
investor: InvestorSchema = Field(
description="Core investor information. Only populate with clearly identifiable data."
)
portfolio_companies: List[CompanySchema] = Field(
default=[],
description="List of portfolio companies. Leave empty if not clearly identifiable.",
)
team_members: List[InvestorMemberSchema] = Field(
default=[],
description="List of team members. Leave empty if not clearly identifiable.",
)
sectors: List[SectorSchema] = Field(
default=[],
description="List of investment sectors. Leave empty if not clearly identifiable.",
)
class Config:
from_attributes = True
class CompanyData(BaseModel):
"""
Expert parser: Comprehensive company data schema for LLM processing.
Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
"""
company: CompanySchema = Field(
description="Core company information. Only populate with clearly identifiable data."
)
sectors: List[SectorSchema] = Field(
default=[],
description="List of company sectors. Leave empty if not clearly identifiable.",
)
members: List[CompanyMemberSchema] = Field(
default=[],
description="List of company members. Leave empty if not clearly identifiable.",
)
investors: List[InvestorSchema] = Field(
default=[],
description="List of investors. Leave empty if not clearly identifiable.",
)
class Config:
from_attributes = True
class InvestorList(BaseModel):
"""Expert parser: List of investors with clearly identifiable information only."""
investors: List[InvestorData] = Field(
default=[],
description="List of investors. Leave empty if no clearly identifiable investors.",
)
+262
View File
@@ -0,0 +1,262 @@
from datetime import datetime
from enum import Enum
from typing import Any, Generic, List, Optional, TypeVar
from pydantic import BaseModel
# Generic type for pagination
T = TypeVar("T")
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class InvestmentStageSchema(BaseModel):
id: int
name: str
class Config:
from_attributes = True
class InvestorMemberSchema(BaseModel):
id: int
name: str
role: str | None
email: str | None
class Config:
from_attributes = True
class FundSchema(BaseModel):
id: int
fund_name: str | None
fund_size: int | None # Changed to int for numerical filtering
fund_size_source_url: str | None
check_size_lower: int | None # NEW: Lower bound of check size range
check_size_upper: int | None # NEW: Upper bound of check size range
source_url: str | None
source_provider: str | None
geographic_focus: str | None # Changed from List[str] to string
investment_stages: List[InvestmentStageSchema] | None # Changed to relationship
sectors: List[SectorSchema] | None # Changed to relationship
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class CompanyMemberSchema(BaseModel):
id: int
name: Optional[str]
linkedin: Optional[str]
role: Optional[str]
company_id: int
class Config:
from_attributes = True
class CompanySchema(BaseModel):
id: int
name: str
industry: str | None
location: str | None
description: Optional[str]
founded_year: Optional[int]
website: Optional[str]
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
website: Optional[str] = None
headquarters: Optional[str] = None
aum: int | None
aum_as_of_date: str | None = None
aum_source_url: str | None = None
check_size_lower: int | None
check_size_upper: int | None
geographic_focus: str | None
investment_thesis: Any = (
None # Flexible JSON field - can be list, dict, or list of dicts
)
portfolio_highlights: Any = (
None # Flexible JSON field - can be list, dict, or list of dicts
)
number_of_investments: int | None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class InvestorData(BaseModel):
"""Comprehensive investor data schema - used for individual investor requests"""
investor: InvestorSchema
portfolio_companies: List[CompanySchema]
team_members: List[InvestorMemberSchema]
sectors: List[SectorSchema]
funds: List[FundSchema]
class Config:
from_attributes = True
class InvestorFundData(BaseModel):
"""Investor-Fund combined data - used for list/filter requests
Each row represents one investor-fund combination.
An investor with 3 funds will appear as 3 separate entries.
"""
# Investor fields
investor_id: int
investor_name: str
investor_description: Optional[str]
investor_website: Optional[str]
investor_headquarters: Optional[str]
aum: int | None
aum_as_of_date: str | None
aum_source_url: str | None
investment_thesis: Any = None # Flexible JSON field
portfolio_highlights: Any = None # Flexible JSON field
number_of_investments: int | None
# Fund fields
fund_id: int | None
fund_name: str | None
fund_size: int | None # Changed to int for numerical filtering
fund_size_source_url: str | None
check_size_lower: int | None # NEW: Lower bound of check size range
check_size_upper: int | None # NEW: Upper bound of check size range
geographic_focus: str | None # Changed from List[str] to string
fund_investment_stages: (
List[InvestmentStageSchema] | None
) # Changed to relationship
fund_sectors: List[SectorSchema] | None # Changed to relationship
# Related data
portfolio_companies: List[CompanySchema]
team_members: List[InvestorMemberSchema]
sectors: List[SectorSchema]
class Config:
from_attributes = True
class InvestorMinimal(BaseModel):
"""Minimal investor info with just id and name"""
id: int
name: str
class Config:
from_attributes = True
class CompanySchemaMinimal(BaseModel):
id: int
name: str
industry: str | None
location: str | None
founded_year: Optional[int]
website: Optional[str]
class Config:
from_attributes = True
class CompanyData(BaseModel): # Renamed from CompaniesData for consistency
company: CompanySchemaMinimal
investors: List[InvestorMinimal]
class Config:
from_attributes = True
class InvestorList(BaseModel):
investors: List[InvestorData]
class InvestorFundList(BaseModel):
"""List of investor-fund combinations"""
investor_funds: List[InvestorFundData]
class CompanyMinimal(BaseModel):
"""Minimal company info with just id and name"""
id: int
name: str
class Config:
from_attributes = True
class SectorMinimal(BaseModel):
"""Minimal sector info with just id and name"""
id: int
name: str
class Config:
from_attributes = True
class InvestmentResponse(BaseModel):
"""Simplified investment response schema
One row per investor-fund combination with streamlined data
"""
id: int # Investor ID
name: (
str # Combination of investor name and fund name (e.g., "Investor A - Fund A")
)
aum: int | None # From investor
check_size_lower: int | None # From fund
check_size_upper: int | None # From fund
geographic_focus: str | None # From fund
stage_focus: str | None # Comma-separated stages from fund
portfolio_companies: List[CompanyMinimal] # Top 3 companies from investor
sectors: List[SectorMinimal] # Top 3 sectors from fund
compatibility_score: float # 0 to 1 (default 1 for now)
class Config:
from_attributes = True
class PaginatedResponse(BaseModel, Generic[T]):
"""Generic paginated response schema"""
items: List[T]
total: int
page: int
page_size: int
total_pages: int
class Config:
from_attributes = True
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
View File
View File
View File
+912 -347
View File
File diff suppressed because it is too large Load Diff
-293
View File
@@ -1,293 +0,0 @@
import asyncio
from typing import List, Optional
import chromadb
import pandas as pd
from db.models import CompanyTable, InvestorTable, InvestorTeamMember, SectorTable
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from py_schemas import InvestorData
from pydantic import BaseModel
from settings import settings
class InvestorList(BaseModel):
"""Schema for LLM structured output"""
investor_list: List[InvestorData]
class InvestorProcessor:
def __init__(
self,
sql_session: Optional[object] = None,
vector_db_client: Optional[object] = None,
):
self.template = """You are an expert data extraction assistant. Extract investor information from the provided CSV data and return it as a list of structured records.
Given the following CSV data rows:
{question}
For each row, extract and structure the following fields for the investor:
- name: The investor's full name
- description: Description of the investor
- aum: Assets under management (as integer, use 0 if not available)
- check_size_lower: Lower bound of investment check size (as integer)
- check_size_upper: Upper bound of investment check size (as integer)
- geographic_focus: Geographic region focus
- stage_focus: Investment stage focus (must be one of: seed, series_a, series_b, series_c, growth, late_stage)
- number_of_investments: Number of investments made (default 0)
Also extract related data:
- portfolio_companies: List of companies they've invested in
- team_members: List of team members with name, role, email
- sectors: List of sectors they focus on
Important:
- If a field is not available, use appropriate defaults
- stage_focus must be one of the valid enum values
- Return clean, valid JSON only
Return the data as a structured list of comprehensive investor data."""
self.prompt = PromptTemplate(
template=self.template, input_variables=["question"]
)
self.llm = ChatOpenAI(
api_key=settings.OPENROUTER_API_KEY,
base_url="https://openrouter.ai/api/v1",
model="google/gemini-2.5-flash-lite",
temperature=0,
)
self.structured_llm = self.llm.with_structured_output(InvestorList)
self.sql_session = sql_session
self.vector_db_client = vector_db_client
self.vector_db_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.vector_db_client.get_or_create_collection(
name="investor_descriptions",
metadata={
"description": "Investor descriptions and investment thesis focus"
},
)
async def _process_batch(
self, batch: pd.DataFrame, batch_idx: int
) -> List[InvestorData]:
"""Process a single batch of data"""
# Convert batch to string representation - clean the data
batch_str = ""
for idx, row in batch.iterrows():
# Clean values to remove control characters
cleaned_row = {}
for key, value in row.items():
if pd.notna(value):
# Convert to string and clean control characters
clean_value = (
str(value)
.replace("\n", " ")
.replace("\r", " ")
.replace("\t", " ")
)
# Remove other control characters
clean_value = "".join(
char
for char in clean_value
if ord(char) >= 32 or char in ["\n", "\r", "\t"]
)
cleaned_row[key] = clean_value
row_str = ", ".join(
[f"{key}: {value}" for key, value in cleaned_row.items()]
)
batch_str += f"Row {idx + 1}: {row_str}\n"
try:
print(f"Processing batch {batch_idx + 1}...")
batch_results = await self.structured_llm.ainvoke(batch_str)
return batch_results.investor_list
except Exception as e:
print(f"Error processing batch {batch_idx + 1}: {e}")
return []
async def _save_to_sql(self, investor_data_list: List[InvestorData]) -> None:
"""Save investors and related data to SQL database"""
if not self.sql_session:
return
try:
for investor_data in investor_data_list:
# Save investor
db_investor = InvestorTable(
name=investor_data.investor.name,
description=investor_data.investor.description,
aum=investor_data.investor.aum,
check_size_lower=investor_data.investor.check_size_lower,
check_size_upper=investor_data.investor.check_size_upper,
geographic_focus=investor_data.investor.geographic_focus,
stage_focus=investor_data.investor.stage_focus,
number_of_investments=investor_data.investor.number_of_investments,
)
self.sql_session.add(db_investor)
self.sql_session.flush() # Get the ID
# Save sectors and create associations
for sector_data in investor_data.sectors:
# Check if sector exists, create if not
existing_sector = (
self.sql_session.query(SectorTable)
.filter(SectorTable.name == sector_data.name)
.first()
)
if not existing_sector:
db_sector = SectorTable(name=sector_data.name)
self.sql_session.add(db_sector)
self.sql_session.flush()
# Add sector to investor's sectors
db_investor.sectors.append(db_sector)
else:
# Add existing sector to investor if not already there
if existing_sector not in db_investor.sectors:
db_investor.sectors.append(existing_sector)
# Save companies and create portfolio associations
for company_data in investor_data.portfolio_companies:
# Check if company exists, create if not
existing_company = (
self.sql_session.query(CompanyTable)
.filter(CompanyTable.name == company_data.name)
.first()
)
if not existing_company:
db_company = CompanyTable(
name=company_data.name,
industry=company_data.industry,
location=company_data.location,
founded_year=company_data.founded_year,
website=company_data.website,
)
self.sql_session.add(db_company)
self.sql_session.flush()
# Add to investor's portfolio
db_investor.portfolio_companies.append(db_company)
else:
# Add existing company to portfolio if not already there
if existing_company not in db_investor.portfolio_companies:
db_investor.portfolio_companies.append(existing_company)
# Save team members
for team_member_data in investor_data.team_members:
# Check if team member exists
existing_member = (
self.sql_session.query(InvestorTeamMember)
.filter(InvestorTeamMember.email == team_member_data.email)
.first()
)
if not existing_member:
db_team_member = InvestorTeamMember(
name=team_member_data.name,
role=team_member_data.role,
email=team_member_data.email,
investor_id=db_investor.id,
)
self.sql_session.add(db_team_member)
self.sql_session.commit()
print(f"Successfully saved {len(investor_data_list)} investors to database")
except Exception as e:
self.sql_session.rollback()
print(f"Error saving to SQL database: {e}")
raise
async def _save_to_vector_db(self, investor_data_list: List[InvestorData]) -> None:
"""Save investors to vector database"""
if not self.vector_db_client:
return
documents = []
metadatas = []
ids = []
for i, investor_data in enumerate(investor_data_list):
investor = investor_data.investor
sectors = ", ".join([s.name for s in investor_data.sectors])
companies = ", ".join([c.name for c in investor_data.portfolio_companies])
doc_text = f"""
Investor: {investor.name}
Description: {investor.description or "N/A"}
AUM: ${investor.aum:,}
Check Size: ${investor.check_size_lower:,} - ${investor.check_size_upper:,}
Geographic Focus: {investor.geographic_focus}
Stage Focus: {investor.stage_focus.value}
Sectors: {sectors}
Portfolio Companies: {companies}
""".strip()
documents.append(doc_text)
metadatas.append(
{
"name": investor.name,
"stage_focus": investor.stage_focus.value,
"geographic_focus": investor.geographic_focus,
"aum": investor.aum,
}
)
ids.append(
f"investor_{i}_{investor.name.replace(' ', '_').replace('/', '_')}"
)
if documents:
try:
self.collection.add(documents=documents, metadatas=metadatas, ids=ids)
print(
f"Successfully saved {len(documents)} investors to vector database"
)
except Exception as e:
print(f"Error saving to vector database: {e}")
async def process_csv(
self, df: pd.DataFrame, batch_size: int = 10, max_concurrent: int = 10
) -> List[InvestorData]:
"""Process CSV data in parallel batches and save to databases"""
results = []
# Create batches
batches = []
for i in range(0, len(df), batch_size):
batch = df.iloc[i : i + batch_size]
batches.append((batch, i // batch_size))
# Process batches with concurrency control
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(batch_data):
batch, batch_idx = batch_data
async with semaphore:
return await self._process_batch(batch, batch_idx)
# Execute all batches concurrently
batch_results = await asyncio.gather(
*[process_with_semaphore(batch_data) for batch_data in batches],
return_exceptions=True,
)
# Collect results, filtering out exceptions
for batch_result in batch_results:
if not isinstance(batch_result, Exception):
results.extend(batch_result)
# Save to databases
if results:
print(f"Successfully processed {len(results)} investors")
await self._save_to_sql(results)
await self._save_to_vector_db(results)
return results
-290
View File
@@ -1,290 +0,0 @@
import asyncio
from typing import List, Optional
import chromadb
import pandas as pd
from db.models import CompanyTable, InvestorTable, InvestorTeamMember, SectorTable
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from py_schemas import InvestorData
from pydantic import BaseModel
from settings import settings
class InvestorOutput(BaseModel):
"""Schema for LLM structured output"""
investor_data: InvestorData
class InvestorProcessor:
def __init__(
self,
sql_session: Optional[object] = None,
vector_db_client: Optional[object] = None,
):
self.template = """You are an expert data extraction assistant. Extract investor information from the provided CSV data and return it as a structured record.
Given the following CSV data row:
{question}
Extract and structure the following fields for the investor:
- name: The investor's full name
- description: Description of the investor
- aum: Assets under management (as integer, use 0 if not available)
- check_size_lower: Lower bound of investment check size (as integer)
- check_size_upper: Upper bound of investment check size (as integer)
- geographic_focus: Geographic region focus
- stage_focus: Investment stage focus (must be one of: seed, series_a, series_b, series_c, growth, late_stage)
- number_of_investments: Number of investments made (default 0)
Also extract related data:
- portfolio_companies: List of companies they've invested in
- team_members: List of team members with name, role, email
- sectors: List of sectors they focus on
Important:
- If a field is not available, use appropriate defaults
- stage_focus must be one of the valid enum values
- Return clean, valid JSON only
Return the data as a single comprehensive investor data record."""
self.prompt = PromptTemplate(
template=self.template, input_variables=["question"]
)
self.llm = ChatOpenAI(
api_key=settings.OPENROUTER_API_KEY,
base_url="https://openrouter.ai/api/v1",
model="google/gemini-2.5-flash-lite",
temperature=0,
)
self.structured_llm = self.llm.with_structured_output(InvestorOutput)
self.sql_session = sql_session
self.vector_db_client = vector_db_client
self.vector_db_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.vector_db_client.get_or_create_collection(
name="investor_descriptions",
metadata={
"description": "Investor descriptions and investment thesis focus"
},
)
async def _process_row(
self, row: pd.Series, row_idx: int
) -> Optional[InvestorData]:
"""Process a single row of data"""
# Clean values to remove control characters
cleaned_row = {}
for key, value in row.items():
if pd.notna(value):
# Convert to string and clean control characters
clean_value = (
str(value)
.replace("\n", " ")
.replace("\r", " ")
.replace("\t", " ")
)
# Remove other control characters
clean_value = "".join(
char
for char in clean_value
if ord(char) >= 32 or char in ["\n", "\r", "\t"]
)
cleaned_row[key] = clean_value
row_str = ", ".join(
[f"{key}: {value}" for key, value in cleaned_row.items()]
)
try:
print(f"Processing row {row_idx + 1}...")
result = await self.structured_llm.ainvoke(row_str)
if result.investor_data:
return result.investor_data
return None
except Exception as e:
print(f"Error processing row {row_idx + 1}: {e}")
return None
async def _save_to_sql(self, investor_data_list: List[InvestorData]) -> None:
"""Save investors and related data to SQL database"""
if not self.sql_session:
return
try:
for investor_data in investor_data_list:
# Save investor
db_investor = InvestorTable(
name=investor_data.investor.name,
description=investor_data.investor.description,
aum=investor_data.investor.aum,
check_size_lower=investor_data.investor.check_size_lower,
check_size_upper=investor_data.investor.check_size_upper,
geographic_focus=investor_data.investor.geographic_focus,
stage_focus=investor_data.investor.stage_focus,
number_of_investments=investor_data.investor.number_of_investments,
)
self.sql_session.add(db_investor)
self.sql_session.flush() # Get the ID
# Save sectors and create associations
for sector_data in investor_data.sectors:
# Check if sector exists, create if not
existing_sector = (
self.sql_session.query(SectorTable)
.filter(SectorTable.name == sector_data.name)
.first()
)
if not existing_sector:
db_sector = SectorTable(name=sector_data.name)
self.sql_session.add(db_sector)
self.sql_session.flush()
# Add sector to investor's sectors
db_investor.sectors.append(db_sector)
else:
# Add existing sector to investor if not already there
if existing_sector not in db_investor.sectors:
db_investor.sectors.append(existing_sector)
# Save companies and create portfolio associations
for company_data in investor_data.portfolio_companies:
# Check if company exists, create if not
existing_company = (
self.sql_session.query(CompanyTable)
.filter(CompanyTable.name == company_data.name)
.first()
)
if not existing_company:
db_company = CompanyTable(
name=company_data.name,
industry=company_data.industry,
location=company_data.location,
founded_year=company_data.founded_year,
website=company_data.website,
)
self.sql_session.add(db_company)
self.sql_session.flush()
# Add to investor's portfolio
db_investor.portfolio_companies.append(db_company)
else:
# Add existing company to portfolio if not already there
if existing_company not in db_investor.portfolio_companies:
db_investor.portfolio_companies.append(existing_company)
# Save team members
for team_member_data in investor_data.team_members:
# Check if team member exists
existing_member = (
self.sql_session.query(InvestorTeamMember)
.filter(InvestorTeamMember.email == team_member_data.email)
.first()
)
if not existing_member:
db_team_member = InvestorTeamMember(
name=team_member_data.name,
role=team_member_data.role,
email=team_member_data.email,
investor_id=db_investor.id,
)
self.sql_session.add(db_team_member)
self.sql_session.commit()
print(f"Successfully saved {len(investor_data_list)} investors to database")
except Exception as e:
self.sql_session.rollback()
print(f"Error saving to SQL database: {e}")
raise
async def _save_to_vector_db(self, investor_data_list: List[InvestorData]) -> None:
"""Save investors to vector database"""
if not self.vector_db_client:
return
documents = []
metadatas = []
ids = []
for i, investor_data in enumerate(investor_data_list):
investor = investor_data.investor
sectors = ", ".join([s.name for s in investor_data.sectors])
companies = ", ".join([c.name for c in investor_data.portfolio_companies])
doc_text = f"""
Investor: {investor.name}
Description: {investor.description or "N/A"}
AUM: ${investor.aum:,}
Check Size: ${investor.check_size_lower:,} - ${investor.check_size_upper:,}
Geographic Focus: {investor.geographic_focus}
Stage Focus: {investor.stage_focus.value}
Sectors: {sectors}
Portfolio Companies: {companies}
""".strip()
documents.append(doc_text)
metadatas.append(
{
"name": investor.name,
"stage_focus": investor.stage_focus.value,
"geographic_focus": investor.geographic_focus,
"aum": investor.aum,
}
)
ids.append(
f"investor_{i}_{investor.name.replace(' ', '_').replace('/', '_')}"
)
if documents:
try:
self.collection.add(documents=documents, metadatas=metadatas, ids=ids)
print(
f"Successfully saved {len(documents)} investors to vector database"
)
except Exception as e:
print(f"Error saving to vector database: {e}")
async def process_csv(
self, df: pd.DataFrame, max_concurrent: int = 10
) -> List[InvestorData]:
"""Process CSV data one row at a time and save to databases"""
results = []
# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(max_concurrent)
async def process_row_with_semaphore(row_data):
row, row_idx = row_data
async with semaphore:
return await self._process_row(row, row_idx)
# Create row tasks
row_tasks = []
for idx, row in df.iterrows():
row_tasks.append((row, idx))
# Execute all rows concurrently
row_results = await asyncio.gather(
*[process_row_with_semaphore(row_data) for row_data in row_tasks],
return_exceptions=True,
)
# Collect results, filtering out exceptions and None values
for row_result in row_results:
if not isinstance(row_result, Exception) and row_result is not None:
results.append(row_result)
# Save to databases
if results:
print(f"Successfully processed {len(results)} investors")
await self._save_to_sql(results)
await self._save_to_vector_db(results)
return results
+137 -240
View File
@@ -1,88 +1,52 @@
from typing import List, Optional import os
from typing import List
import chromadb from db.db import DATABASE_URL, get_db
from db.models import InvestorTable from db.models import FundTable, InvestorTable
from langchain import hub from langchain import hub
from langchain_community.agent_toolkits import SQLDatabaseToolkit from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent from langgraph.prebuilt import create_react_agent
from py_schemas import InvestorData, InvestorList from schemas.router_schemas import (
from settings import settings CompanyMinimal,
InvestmentResponse,
PaginatedResponse,
SectorMinimal,
)
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
# Connect to SQLite # Connect to SQLite
prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt") prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")
db = SQLDatabase.from_uri("sqlite:///investors.db") db = SQLDatabase.from_uri(DATABASE_URL)
system_message = (
prompt_template.format(dialect="SQLite", top_k=5)
+ "\n Get answers from the Sql database and the vector database"
)
class QueryProcessor: class QueryProcessor:
def __init__( def __init__(self):
self,
sql_session: Optional[object] = None,
vector_db_client: Optional[object] = None,
):
self.sql_session = sql_session
self.llm = ChatOpenAI( self.llm = ChatOpenAI(
api_key=settings.OPENROUTER_API_KEY, api_key=os.getenv("OPENROUTER_API_KEY"),
base_url="https://openrouter.ai/api/v1", base_url="https://openrouter.ai/api/v1",
model="google/gemini-2.5-flash-lite", model="x-ai/grok-4-fast",
temperature=0.3, temperature=0,
) )
self.toolkit = SQLDatabaseToolkit(db=db, llm=self.llm) self.toolkit = SQLDatabaseToolkit(db=db, llm=self.llm)
# Update system message to specifically request only fund IDs
system_message_updated = (
prompt_template.format(dialect="SQLite", top_k=5)
+ "\n\nIMPORTANT: You must ONLY return the fund IDs (id field from the funds table) that match the user's criteria. "
+ "Do NOT return any other information, explanations, or data. "
+ "Your response should be ONLY a comma-separated list of numbers representing the fund IDs. "
+ "Example format: 1, 5, 12, 23"
)
self.agent = create_react_agent( self.agent = create_react_agent(
model=self.llm, model=self.llm,
tools=self.toolkit.get_tools() + [self.query_vector_database], tools=self.toolkit.get_tools(),
prompt=system_message, prompt=system_message_updated,
)
self.vector_db_client = vector_db_client
self.vector_db_client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.vector_db_client.get_or_create_collection(
name="investor_descriptions",
metadata={
"description": "Investor descriptions and investment thesis focus"
},
) )
def query_sql_database(self, query: str) -> Optional[InvestorList]: def process_query(self, question: str) -> PaginatedResponse[InvestmentResponse]:
"""Query the SQL database for investor information.""" """Process a query using the LLM and return investment response data."""
if not self.sql_session: # Let the LLM handle all database interactions and filtering to get fund IDs
return None
# Implement SQL querying logic here
result = self.sql_session.execute(query)
investors = result.scalars().all()
return InvestorList(investors=investors)
def query_vector_database(self, query: str) -> Optional[InvestorList]:
"""Query the vector database for investor information."""
if not self.vector_db_client:
return None
print("VECTOR STORE WAS CALLED")
# Query the collection directly, not passing collection as parameter
results = self.collection.query(
query_texts=[query], # ChromaDB expects a list of query texts
n_results=3, # Specify how many results you want
)
print(results)
# ChromaDB returns results in a different structure
# results will have 'documents', 'metadatas', 'ids', 'distances'
return results
def process_query(self, question: str) -> InvestorList:
"""Process a query using the LLM and return structured investor data."""
# Extract filters from the query first
filters = self._extract_filters_from_query(question)
# Get AI response for additional context
response = self.agent.invoke( response = self.agent.invoke(
{"messages": [("user", question)]}, {"messages": [("user", question)]},
) )
@@ -92,189 +56,122 @@ class QueryProcessor:
response["messages"][-1].content if response.get("messages") else "" response["messages"][-1].content if response.get("messages") else ""
) )
# Try to extract investor IDs or names from the AI response # Extract fund IDs from the AI response
investor_ids = self._extract_investor_info_from_response(ai_response) fund_ids = self._extract_fund_ids_from_response(ai_response)
# Fetch filtered investor data with relationships from database # Fetch full fund data with investor relationships using the IDs
return self._fetch_investors_with_relationships(investor_ids, filters) return self._fetch_funds_by_ids(fund_ids)
def _extract_investor_info_from_response(self, ai_response: str) -> List[int]: def _extract_fund_ids_from_response(self, ai_response: str) -> List[int]:
"""Extract investor IDs from AI response. This is a simple implementation.""" """Extract fund IDs from AI response."""
# This is a basic implementation - you might want to make it more sophisticated
# based on how your AI formats responses
investor_ids = []
# If the AI can't provide structured data, fall back to getting all investors
# that match basic criteria
try:
# Try to extract numbers that might be IDs
import re
ids = re.findall(r"\bid:\s*(\d+)", ai_response.lower())
investor_ids = [int(id_str) for id_str in ids]
except Exception:
pass
return investor_ids if investor_ids else []
def _extract_filters_from_query(self, question: str) -> dict:
"""Extract filter criteria from natural language query."""
question_lower = question.lower()
filters = {}
# Extract stage filters
if any(
stage in question_lower
for stage in [
"seed",
"series a",
"series b",
"series c",
"growth",
"late stage",
]
):
if "seed" in question_lower:
filters["stage"] = "SEED"
elif "series a" in question_lower:
filters["stage"] = "SERIES_A"
elif "series b" in question_lower:
filters["stage"] = "SERIES_B"
elif "series c" in question_lower:
filters["stage"] = "SERIES_C"
elif "growth" in question_lower:
filters["stage"] = "GROWTH"
elif "late stage" in question_lower:
filters["stage"] = "LATE_STAGE"
# Extract geographic filters
if any(
geo in question_lower
for geo in [
"us",
"usa",
"united states",
"europe",
"asia",
"silicon valley",
"bay area",
]
):
if (
"us" in question_lower
or "usa" in question_lower
or "united states" in question_lower
):
filters["geography"] = "US"
elif "europe" in question_lower:
filters["geography"] = "Europe"
elif "asia" in question_lower:
filters["geography"] = "Asia"
elif "silicon valley" in question_lower or "bay area" in question_lower:
filters["geography"] = "Silicon Valley"
# Extract sector filters
sectors = [
"fintech",
"healthcare",
"saas",
"ai",
"biotech",
"consumer",
"enterprise",
"crypto",
"blockchain",
]
for sector in sectors:
if sector in question_lower:
filters["sector"] = sector
break
# Extract check size filters (simple patterns)
import re import re
amounts = re.findall( fund_ids = []
r"\$?(\d+(?:,\d{3})*(?:\.\d+)?)\s*(?:million|m|k|thousand)", question_lower try:
) # Try multiple patterns to extract IDs from the response
if amounts: # Pattern 1: Simple numbers (assuming they are IDs)
amount = amounts[0].replace(",", "") numbers = re.findall(r"\b\d+\b", ai_response)
if "million" in question_lower or "m" in question_lower: fund_ids = [int(num) for num in numbers]
filters["min_check_size"] = int(float(amount) * 1000000)
elif "thousand" in question_lower or "k" in question_lower:
filters["min_check_size"] = int(float(amount) * 1000)
return filters # Pattern 2: If response contains explicit ID references
id_matches = re.findall(r"\bid[:\s]*(\d+)", ai_response.lower())
if id_matches:
fund_ids = [int(id_str) for id_str in id_matches]
def _fetch_investors_with_relationships( except Exception as e:
self, investor_ids: List[int] = None, filters: dict = None print(f"Error extracting IDs from response: {e}")
) -> InvestorList: return []
"""Fetch investors with all their relationships from the database."""
if not self.sql_session:
return InvestorList(investors=[])
# Import here to avoid circular imports return fund_ids
from db.models import SectorTable
# Build query with all relationships loaded def _fetch_funds_by_ids(
query = self.sql_session.query(InvestorTable).options( self, fund_ids: List[int]
selectinload(InvestorTable.portfolio_companies), ) -> PaginatedResponse[InvestmentResponse]:
selectinload(InvestorTable.team_members), """Fetch funds with all their relationships from the database using fund IDs.
selectinload(InvestorTable.sectors), Constructs response similar to read_investors but starting from funds."""
) if not fund_ids:
return PaginatedResponse(
# Apply filters if provided items=[],
if filters: total=0,
if "stage" in filters: page=1,
from db.models import InvestmentStage page_size=len(fund_ids) if fund_ids else 10,
total_pages=0,
stage_enum = getattr(InvestmentStage, filters["stage"])
query = query.filter(InvestorTable.stage_focus == stage_enum)
if "geography" in filters:
query = query.filter(
InvestorTable.geographic_focus.ilike(f"%{filters['geography']}%")
)
if "min_check_size" in filters:
query = query.filter(
InvestorTable.check_size_lower >= filters["min_check_size"]
)
if "max_check_size" in filters:
query = query.filter(
InvestorTable.check_size_upper <= filters["max_check_size"]
)
if "min_aum" in filters:
query = query.filter(InvestorTable.aum >= filters["min_aum"])
if "max_aum" in filters:
query = query.filter(InvestorTable.aum <= filters["max_aum"])
if "sector" in filters:
query = query.join(InvestorTable.sectors).filter(
SectorTable.name.ilike(f"%{filters['sector']}%")
)
# Filter by IDs if provided
if investor_ids:
query = query.filter(InvestorTable.id.in_(investor_ids))
else:
# If no specific IDs and no filters, limit to prevent overwhelming response
if not filters:
query = query.limit(10)
investors = query.all()
# Transform to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
) )
investor_data_list.append(investor_data)
return InvestorList(investors=investor_data_list) # Get database session
db_session = next(get_db())
try:
# Query funds with all necessary relationships loaded
funds = (
db_session.query(FundTable)
.options(
selectinload(FundTable.investor).selectinload(
InvestorTable.portfolio_companies
),
selectinload(FundTable.investor).selectinload(
InvestorTable.team_members
),
selectinload(FundTable.investor).selectinload(
InvestorTable.sectors
),
selectinload(FundTable.investment_stages),
selectinload(FundTable.sectors),
)
.filter(FundTable.id.in_(fund_ids))
.all()
)
# Transform to InvestmentResponse format (one row per fund)
investment_responses = []
for fund in funds:
investor = fund.investor
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
)
investment_responses.append(investment_response)
total_count = len(investment_responses)
total_pages = 1 if total_count > 0 else 0
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=1,
page_size=total_count,
total_pages=total_pages,
)
finally:
db_session.close()
View File
-11
View File
@@ -1,11 +0,0 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
OPENROUTER_API_KEY: str
class Config:
env_file = ".env"
settings = Settings()
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
Binary file not shown.
+315
View File
@@ -0,0 +1,315 @@
import logging
import re
import unicodedata
import pandas as pd
from models import CompanyTable, InvestorTable, SectorTable, engine, init_database
from sqlalchemy.orm import sessionmaker
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Import the schema
init_database()
# ===================== Ingesting Original Data =====================#
def parse_investor_names(investor_names_str):
"""Parse comma-separated investor names and return a list"""
if pd.isna(investor_names_str) or investor_names_str == "":
return []
# Split by comma and clean whitespace
# investors = [name.strip() for name in str(investor_names_str).split(",")]
investors = [
clean_name(name.strip()) for name in str(investor_names_str).split(",")
]
return [investor for investor in investors if investor]
def parse_industries(industries_str):
"""Parse comma-separated industries and return a list"""
if pd.isna(industries_str) or industries_str == "":
return []
# Split by comma and clean whitespace
industries = [industry.strip() for industry in str(industries_str).split(",")]
return [industry for industry in industries if industry]
def clean_special_characters(text):
"""Clean special characters from text, converting to ASCII equivalents"""
if not text:
return text
# First remove ellipses and other problematic patterns
text = str(text).replace("...", "").replace("..", "")
# Normalize unicode characters to their closest ASCII equivalents
normalized = unicodedata.normalize("NFKD", text)
# Remove accents and convert to ASCII
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
# Remove any remaining non-alphanumeric characters except spaces, hyphens, and periods
cleaned = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", ascii_text)
# Clean up multiple spaces
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
def clean_string(value):
"""Clean string values, converting empty/null/nan/0 to None and removing special characters"""
if (
pd.isna(value)
or value == ""
or str(value).lower() in ["nan", "null", "none", "0", "0.0"]
):
return None
# First clean special characters
cleaned = clean_special_characters(str(value).strip())
# Check if result is just "0" after cleaning
if cleaned in ["0", "0.0", "null", "nan", "none"]:
return None
return cleaned if cleaned else None
def clean_name(value):
"""Clean names (companies, investors) with special character handling"""
if (
pd.isna(value)
or value == ""
or str(value).lower() in ["nan", "null", "none", "0", "0.0"]
):
return None
# Clean special characters but be more permissive for names
text = str(value).strip()
# First remove ellipses and other problematic patterns
# text = text.replace("...", "").replace("..", "")
# Normalize unicode characters
normalized = unicodedata.normalize("NFKD", text)
# Convert to ASCII but keep more characters for business names
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
# Allow alphanumeric, spaces, hyphens, periods, parentheses, and ampersands
cleaned = re.sub(r"[^a-zA-Z0-9\s\-\.\(\)&]", "", ascii_text)
# Clean up multiple spaces
cleaned = re.sub(r"\s+", " ", cleaned).strip()
# Remove any trailing or leading periods
cleaned = cleaned.strip(".")
cleaned = cleaned.replace("..", "").replace("...", "")
# Check if result is just "0" after cleaning
if cleaned in ["0", "0.0", "null", "nan", "none"]:
return None
return cleaned if cleaned else None
def clean_integer(value):
"""Clean integer values, converting empty/null/nan/0 to None"""
if pd.isna(value) or str(value).lower() in ["nan", "null", "none", "", "0", "0.0"]:
return None
try:
cleaned_val = int(float(value))
return cleaned_val if cleaned_val > 0 else None
except (ValueError, TypeError):
return None
def parse_website(website_str: str):
try:
_, end = website_str.split(":")
if end == "0":
return None
return "https:" + end
except Exception:
return None
def ingest_data():
# Create database engine and session
Session = sessionmaker(bind=engine)
session = Session()
# Load CSV files
print("Loading CSV files...")
companies_df = pd.read_csv("companies.csv")
investors_df = pd.read_csv("investors.csv")
print(f"📊 Companies CSV: {len(companies_df)} rows")
print(f"📊 Investors CSV: {len(investors_df)} rows")
# Step 1: Ingest Investors
print("\n🔄 Step 1: Ingesting Investors...")
investors_processed = 0
for index, row in investors_df.iterrows():
try:
investor_name = clean_name(row.get("Filtered investor names", ""))
if investor_name:
# Check if investor already exists
existing_investor = (
session.query(InvestorTable).filter_by(name=investor_name).first()
)
if not existing_investor:
investor = InvestorTable(
name=investor_name,
description=clean_string(row.get("Business model", "")),
headquarters=clean_string(row.get("HQ", "")),
website=parse_website(str(row.get("Website", "")).strip()),
number_of_investments=clean_integer(
row.get("Number of investments")
),
)
session.add(investor)
investors_processed += 1
if investors_processed % 1000 == 0:
session.commit()
print(f" Committed {investors_processed} investors")
except Exception as e:
logger.error(f"Error processing investor {index}: {e}")
continue
session.commit()
print(f"✅ Investors completed: {investors_processed} processed")
# Step 2: Ingest Companies and Rounds
print("\n🔄 Step 2: Ingesting Companies and Sectors...")
companies_processed = 0
sectors_created = set()
for index, row in companies_df.iterrows():
try:
# Process company
company_name = clean_name(row.get("Organization Name", ""))
if not company_name:
continue
# Check if company already exists
existing_company = (
session.query(CompanyTable).filter_by(name=company_name).first()
)
if existing_company:
company = existing_company
else:
# Create company
company = CompanyTable(
name=company_name,
description=clean_string(row.get("Organization Description", "")),
location=clean_string(row.get("Organization Location", "")),
industry=clean_string(row.get("Organization Industries", "")),
website=clean_string(row.get("Organization Website", "")),
)
session.add(company)
session.flush() # Get the company ID
companies_processed += 1
# Process investor relationships
investor_names_str = row.get("Investor Names", "")
if pd.notna(investor_names_str) and investor_names_str:
investor_names = parse_investor_names(investor_names_str)
for investor_name in investor_names:
# Find investor in database
investor = (
session.query(InvestorTable)
.filter_by(name=investor_name.strip())
.first()
)
if investor:
# Add investor-company relationship
if company not in investor.portfolio_companies:
investor.portfolio_companies.append(company)
else:
print("This company has an investor not in DB:", investor_name)
# Process sectors/industries
industries_str = row.get("Organization Industries", "")
if pd.notna(industries_str) and industries_str:
industries = parse_industries(industries_str)
for industry_name in industries:
industry_name = industry_name.strip()
if industry_name:
# Check if sector exists
sector = (
session.query(SectorTable)
.filter_by(name=industry_name)
.first()
)
if not sector:
sector = SectorTable(name=industry_name)
session.add(sector)
session.flush()
sectors_created.add(industry_name)
# Add company-sector relationship
if sector not in company.sectors:
company.sectors.append(sector)
# Commit every 100 companies
if companies_processed % 100 == 0 and companies_processed > 0:
session.commit()
print(f" Processed {companies_processed} companies...")
except Exception as e:
logger.error(f"Error processing company {index}: {e}")
session.rollback()
continue
# Step 3: Link investors to sectors based on portfolio companies
print("\n🔄 Step 3: Linking Investors to Sectors...")
investors_linked_to_sectors = 0
all_investors = session.query(InvestorTable).all()
for investor in all_investors:
sectors = set()
for company in investor.portfolio_companies:
for sector in company.sectors:
sectors.add(sector)
# Add sectors to investor if not already present
for sector in sectors:
if sector not in investor.sectors:
investor.sectors.append(sector)
if sectors:
investors_linked_to_sectors += 1
session.commit()
print(f"✅ Linked {investors_linked_to_sectors} investors to sectors")
# Final commit
session.commit()
# Final counts
final_investors = session.query(InvestorTable).count()
final_companies = session.query(CompanyTable).count()
final_sectors = session.query(SectorTable).count()
print("\n🎉 Ingestion Complete!")
print(f" Investors: {final_investors}")
print(f" Companies: {final_companies}")
print(f" Sectors: {final_sectors}")
session.close()
if __name__ == "__main__":
ingest_data()
# print(clean_name("A... Energi"))
# print(clean_name("B.. Tech"))
# print(clean_name("A... Energi"))
+381
View File
@@ -0,0 +1,381 @@
import enum
from typing import Annotated
from fastapi import Depends
from sqlalchemy import (
Column,
DateTime,
ForeignKey,
Integer,
String,
Table,
Text,
create_engine,
func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, declarative_mixin, relationship, sessionmaker
from sqlalchemy.types import JSON, Enum
Base = declarative_base()
# Database configuration
# DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
# Create engine
engine = create_engine("sqlite:///./investors.db", echo=False)
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
def init_database():
"""Initialize the database by creating all tables"""
Base.metadata.create_all(bind=engine)
def get_session_sync() -> Session:
"""Get a database session for synchronous operations"""
return SessionLocal()
def get_db_session():
"""Get a database session for direct use."""
return SessionLocal()
@declarative_mixin
class TimestampMixin:
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class InvestmentStage(enum.Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
# Association table for many-to-many relationship between investors and companies
investor_company_association = Table(
"investor_companies",
Base.metadata,
Column("investor_id", Integer, ForeignKey("investors.id")),
Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for investor-sector many-to-many
investor_sector_association = Table(
"investor_sectors",
Base.metadata,
Column("investor_id", Integer, ForeignKey("investors.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
company_sector_association = Table(
"company_sector",
Base.metadata,
Column("company_id", Integer, ForeignKey("companies.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_sector_association = Table(
"project_sector",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_investor_association = Table(
"project_investors",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("investor_id", Integer, ForeignKey("investors.id")),
)
project_company_association = Table(
"project_companies",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for investor-stage many-to-many
investor_stage_association = Table(
"investor_stages",
Base.metadata,
Column("investor_id", Integer, ForeignKey("investors.id")),
Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for fund-stage many-to-many
fund_investment_stages_association = Table(
"fund_investment_stages",
Base.metadata,
Column("fund_id", Integer, ForeignKey("funds.id")),
Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for fund-sector many-to-many
fund_sectors_association = Table(
"fund_sectors",
Base.metadata,
Column("fund_id", Integer, ForeignKey("funds.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
class InvestorTable(Base, TimestampMixin):
__tablename__ = "investors"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
# Basic investor info
website = Column(String, nullable=True)
headquarters = Column(String, nullable=True)
# AUM fields
aum = Column(Integer, nullable=True) # Store as integer for numerical filtering
aum_as_of_date = Column(String, nullable=True)
aum_source_url = Column(String, nullable=True)
# Check size (deprecated in favor of fund-level data, but keeping for backward compatibility)
check_size_lower = Column(Integer, nullable=True)
check_size_upper = Column(Integer, nullable=True)
# Geographic focus (deprecated in favor of fund-level, but keeping for backward compatibility)
geographic_focus = Column(String, nullable=True)
# Investment thesis and portfolio
investment_thesis = Column(JSON, nullable=True) # Array of thesis statements
portfolio_highlights = Column(
JSON, nullable=True
) # Array of portfolio company names
linked_documents = Column(JSON, nullable=True) # Array of document URLs
# Research metadata
researcher_notes = Column(Text, nullable=True)
missing_important_fields = Column(
JSON, nullable=True
) # Array of missing field names
sources = Column(JSON, nullable=True) # JSON object with source URLs
# Portfolio info
number_of_investments = Column(Integer, nullable=True)
# Relationships
team_members = relationship(
"InvestorMember", back_populates="investor", cascade="all, delete-orphan"
)
funds = relationship(
"FundTable", back_populates="investor", cascade="all, delete-orphan"
)
# Many-to-many relationship with investment stages
investment_stages = relationship(
"InvestmentStageTable",
secondary=investor_stage_association,
back_populates="investors",
)
# Relationship to portfolio companies
portfolio_companies = relationship(
"CompanyTable",
secondary=investor_company_association,
back_populates="investors",
)
sectors = relationship(
"SectorTable",
secondary=investor_sector_association,
back_populates="investors",
)
projects = relationship(
"ProjectTable",
secondary=project_investor_association,
back_populates="investors",
)
class InvestorMember(Base, TimestampMixin):
__tablename__ = "investor_members"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
role = Column(String, nullable=True)
title = Column(String, nullable=True) # Alternative to role
email = Column(String, nullable=True)
source_url = Column(String, nullable=True) # URL where member info was found
investor_id = Column(Integer, ForeignKey("investors.id"))
investor = relationship("InvestorTable", back_populates="team_members")
class FundTable(Base, TimestampMixin):
__tablename__ = "funds"
id = Column(Integer, primary_key=True, index=True)
investor_id = Column(Integer, ForeignKey("investors.id"), nullable=False)
# Fund details
fund_name = Column(String, nullable=True)
fund_size = Column(
Integer, nullable=True
) # Store as integer for numerical filtering
fund_size_source_url = Column(String, nullable=True)
# Check size range (parsed from estimated_investment_size by LLM)
check_size_lower = Column(Integer, nullable=True)
check_size_upper = Column(Integer, nullable=True)
source_url = Column(String, nullable=True)
source_provider = Column(String, nullable=True) # e.g., "Perplexity"
# Geographic focus as simple string
geographic_focus = Column(String, nullable=True)
# Relationships
investor = relationship("InvestorTable", back_populates="funds")
investment_stages = relationship(
"InvestmentStageTable",
secondary=fund_investment_stages_association,
back_populates="funds",
)
sectors = relationship(
"SectorTable",
secondary=fund_sectors_association,
back_populates="funds",
)
class InvestmentStageTable(Base, TimestampMixin):
__tablename__ = "investment_stages"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False, unique=True)
# Relationships
investors = relationship(
"InvestorTable",
secondary=investor_stage_association,
back_populates="investment_stages",
)
funds = relationship(
"FundTable",
secondary=fund_investment_stages_association,
back_populates="investment_stages",
)
class CompanyTable(Base, TimestampMixin):
__tablename__ = "companies"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
industry = Column(String, nullable=True)
location = Column(String, nullable=True)
description = Column(String, nullable=True)
founded_year = Column(Integer, nullable=True)
website = Column(String, nullable=True)
members = relationship(
"CompanyMember", back_populates="company", cascade="all, delete-orphan"
)
# Relationship back to investors
investors = relationship(
"InvestorTable",
secondary=investor_company_association,
back_populates="portfolio_companies",
)
sectors = relationship(
"SectorTable", secondary=company_sector_association, back_populates="companies"
)
projects = relationship(
"ProjectTable",
secondary=project_company_association,
back_populates="companies",
)
class CompanyMember(Base, TimestampMixin):
__tablename__ = "company_members"
id = Column(Integer, primary_key=True)
name = Column(String)
linkedin = Column(String, nullable=True)
role = Column(String, nullable=True)
company_id = Column(Integer, ForeignKey("companies.id"), nullable=False)
company = relationship("CompanyTable", back_populates="members")
class SectorTable(Base, TimestampMixin):
__tablename__ = "sectors"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
# Relationships
investors = relationship(
"InvestorTable",
secondary=investor_sector_association,
back_populates="sectors",
)
companies = relationship(
"CompanyTable", secondary=company_sector_association, back_populates="sectors"
)
projects = relationship(
"ProjectTable", secondary=project_sector_association, back_populates="sector"
)
funds = relationship(
"FundTable",
secondary=fund_sectors_association,
back_populates="sectors",
)
class ProjectTable(Base, TimestampMixin):
__tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
valuation = Column(Integer, nullable=True)
stage = Column(Enum(InvestmentStage), nullable=True)
location = Column(String, nullable=True)
description = Column(Text, nullable=True)
start_date = Column(DateTime, nullable=True)
end_date = Column(DateTime, nullable=True)
sector = relationship(
"SectorTable", secondary=project_sector_association, back_populates="projects"
)
investors = relationship(
"InvestorTable",
secondary=project_investor_association,
back_populates="projects",
)
companies = relationship(
"CompanyTable", secondary=project_company_association, back_populates="projects"
)
BIN
View File
Binary file not shown.