feat: Implement database ingestion for investors and companies

- Added main ingestion logic in main.py to process CSV files for investors and companies.
- Implemented data cleaning functions for names, strings, integers, and websites.
- Established relationships between investors, companies, and sectors using SQLAlchemy ORM.
- Created models for investors, companies, sectors, and their relationships in models.py.
- Set up logging for error tracking during data processing.
- Initialized database and created necessary tables.
bolade
2025-10-07 20:01:19 +01:00
parent a9589e54f3
commit 84e3c7b72a
32 changed files with 4 additions and 33994 deletions
-604
@@ -1,604 +0,0 @@
# Fund Relationship Schema Update
## Summary of Changes
### Database Schema Changes
**FundTable Updated:**
1. `geographic_focus`: Changed from `JSON` array to `STRING` (comma-separated values)
2. `investment_stage_focus`: **REMOVED** - replaced with many-to-many relationship
3. `sector_focus`: **REMOVED** - replaced with many-to-many relationship
**New Tables:**
1. `investment_stages` - Stores investment stage names (replaces enum)
2. `fund_investment_stages` - Association table for fund ↔ stage many-to-many
3. `fund_sectors` - Association table for fund ↔ sector many-to-many
### Why These Changes?
#### 1. Geographic Focus: JSON → String
- **Before**: `["Europe", "North America", "Asia"]`
- **After**: `"Europe, North America, Asia"`
- **Reason**: Simpler to display, easier to search with `LIKE` queries
#### 2. Investment Stages: JSON → Many-to-Many Relationship
- **Before**: JSON array stored in fund table
- **After**: Proper many-to-many relationship via association table
- **Benefits**:
- Can filter funds by specific stages efficiently
- Can join stages across multiple funds
- Centralized stage management
- Better data normalization
#### 3. Sectors: JSON → Many-to-Many Relationship
- **Before**: JSON array stored in fund table
- **After**: Proper many-to-many relationship with existing `SectorTable`
- **Benefits**:
- Reuses existing sector data
- Can filter/aggregate by sector across funds
- Maintains referential integrity
- Consistent with investor-sector relationship pattern
## Migration Details
### Successfully Executed
- **411 fund records** migrated
- **377 stage relationships** created from old JSON data
- **1,445 sector relationships** created from old JSON data
- **11 investment stages** seeded: Seed, Pre-Seed, Series A, Series B, Series C, Series D+, Growth, Late Stage, IPO, Venture, Early Stage
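
The migration script itself (`preprocessor/migrate_fund_relationships.py`) is not reproduced in this document. As a rough sketch of the kind of transformation it performs, assuming the old column held JSON-encoded arrays and using only the standard-library `sqlite3` and `json` modules (the function names `migrate_stage_focus` and `demo` and the cut-down tables are hypothetical, not the author's code):

```python
import json
import sqlite3


def migrate_stage_focus(conn: sqlite3.Connection) -> int:
    """Copy JSON stage arrays from funds into the association table.

    Returns the number of fund-stage links created. Hypothetical helper;
    the real migration script may differ.
    """
    links = 0
    rows = conn.execute(
        "SELECT id, investment_stage_focus FROM funds "
        "WHERE investment_stage_focus IS NOT NULL"
    ).fetchall()
    for fund_id, raw in rows:
        for name in json.loads(raw):
            # Seed the stage on first sight; UNIQUE(name) keeps this idempotent.
            conn.execute(
                "INSERT OR IGNORE INTO investment_stages (name) VALUES (?)", (name,)
            )
            (stage_id,) = conn.execute(
                "SELECT id FROM investment_stages WHERE name = ?", (name,)
            ).fetchone()
            cur = conn.execute(
                "INSERT OR IGNORE INTO fund_investment_stages (fund_id, stage_id) "
                "VALUES (?, ?)",
                (fund_id, stage_id),
            )
            links += cur.rowcount
    conn.commit()
    return links


def demo() -> tuple[int, int]:
    """Run the migration on a tiny in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE funds (id INTEGER PRIMARY KEY, investment_stage_focus TEXT);
        CREATE TABLE investment_stages (
            id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR NOT NULL UNIQUE);
        CREATE TABLE fund_investment_stages (
            fund_id INTEGER NOT NULL, stage_id INTEGER NOT NULL,
            PRIMARY KEY (fund_id, stage_id));
        """
    )
    conn.execute("INSERT INTO funds VALUES (1, ?)", (json.dumps(["Seed", "Series A"]),))
    conn.execute("INSERT INTO funds VALUES (2, ?)", (json.dumps(["Seed"]),))
    links = migrate_stage_focus(conn)
    stages = conn.execute("SELECT COUNT(*) FROM investment_stages").fetchone()[0]
    return links, stages
```

Two funds sharing the "Seed" stage end up with one stage row and separate link rows, which is the normalization the counts above reflect.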
### Data Transformation Examples
**Geographic Focus:**
```python
# Before
fund.geographic_focus = ["Europe", "North America"] # JSON
# After
fund.geographic_focus = "Europe, North America" # String
```
**Investment Stages:**
```python
# Before
fund.investment_stage_focus = ["Seed", "Series A"] # JSON
# After
fund.investment_stages = [
    InvestmentStageTable(id=1, name="Seed"),
    InvestmentStageTable(id=3, name="Series A"),
]  # Relationship
```
**Sectors:**
```python
# Before
fund.sector_focus = ["Fintech", "Healthcare"] # JSON
# After
fund.sectors = [
    SectorTable(id=5, name="Fintech"),
    SectorTable(id=12, name="Healthcare"),
]  # Relationship
```
## Database Schema
### Investment Stages Table
```sql
CREATE TABLE investment_stages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR NOT NULL UNIQUE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME
);
```
### Fund Investment Stages Association
```sql
CREATE TABLE fund_investment_stages (
fund_id INTEGER NOT NULL,
stage_id INTEGER NOT NULL,
PRIMARY KEY (fund_id, stage_id),
FOREIGN KEY (fund_id) REFERENCES funds (id) ON DELETE CASCADE,
FOREIGN KEY (stage_id) REFERENCES investment_stages (id) ON DELETE CASCADE
);
```
### Fund Sectors Association
```sql
CREATE TABLE fund_sectors (
fund_id INTEGER NOT NULL,
sector_id INTEGER NOT NULL,
PRIMARY KEY (fund_id, sector_id),
FOREIGN KEY (fund_id) REFERENCES funds (id) ON DELETE CASCADE,
FOREIGN KEY (sector_id) REFERENCES sectors (id) ON DELETE CASCADE
);
```
### Updated Funds Table
```sql
CREATE TABLE funds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
investor_id INTEGER NOT NULL,
fund_name VARCHAR,
fund_size INTEGER,
fund_size_source_url VARCHAR,
check_size_lower INTEGER,
check_size_upper INTEGER,
source_url VARCHAR,
source_provider VARCHAR,
geographic_focus VARCHAR, -- Changed from JSON to VARCHAR
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME,
FOREIGN KEY (investor_id) REFERENCES investors (id)
);
```
## Code Changes
### 1. Models (Both app/db/models.py and preprocessor/models.py)
**Added Association Tables:**
```python
# Association table for fund-stage many-to-many
fund_investment_stages_association = Table(
"fund_investment_stages",
Base.metadata,
Column("fund_id", Integer, ForeignKey("funds.id")),
Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for fund-sector many-to-many
fund_sectors_association = Table(
"fund_sectors",
Base.metadata,
Column("fund_id", Integer, ForeignKey("funds.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
```
**Updated FundTable:**
```python
class FundTable(Base, TimestampMixin):
    __tablename__ = "funds"

    id = Column(Integer, primary_key=True, index=True)
    investor_id = Column(Integer, ForeignKey("investors.id"), nullable=False)

    # Fund details
    fund_name = Column(String, nullable=True)
    fund_size = Column(Integer, nullable=True)
    fund_size_source_url = Column(String, nullable=True)
    check_size_lower = Column(Integer, nullable=True)
    check_size_upper = Column(Integer, nullable=True)
    source_url = Column(String, nullable=True)
    source_provider = Column(String, nullable=True)

    # Geographic focus as simple string
    geographic_focus = Column(String, nullable=True)

    # Relationships
    investor = relationship("InvestorTable", back_populates="funds")
    investment_stages = relationship(
        "InvestmentStageTable",
        secondary=fund_investment_stages_association,
        back_populates="funds",
    )
    sectors = relationship(
        "SectorTable",
        secondary=fund_sectors_association,
        back_populates="funds",
    )
```
**New InvestmentStageTable:**
```python
class InvestmentStageTable(Base, TimestampMixin):
    __tablename__ = "investment_stages"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False, unique=True)

    # Relationships
    funds = relationship(
        "FundTable",
        secondary=fund_investment_stages_association,
        back_populates="investment_stages",
    )
```
**Updated SectorTable:**
```python
class SectorTable(Base, TimestampMixin):
    __tablename__ = "sectors"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)

    # Relationships
    investors = relationship(...)
    companies = relationship(...)
    projects = relationship(...)
    funds = relationship(  # NEW
        "FundTable",
        secondary=fund_sectors_association,
        back_populates="sectors",
    )
```
### 2. Router Schemas (app/schemas/router_schemas.py)
**New InvestmentStageSchema:**
```python
class InvestmentStageSchema(BaseModel):
    id: int
    name: str

    class Config:
        from_attributes = True
```
**Updated FundSchema:**
```python
class FundSchema(BaseModel):
    id: int
    fund_name: str | None
    fund_size: int | None
    fund_size_source_url: str | None
    check_size_lower: int | None
    check_size_upper: int | None
    source_url: str | None
    source_provider: str | None
    geographic_focus: str | None  # Changed from List[str]
    investment_stages: List[InvestmentStageSchema] | None  # Changed from List[str]
    sectors: List[SectorSchema] | None  # Changed from List[str]
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    class Config:
        from_attributes = True
```
**Updated InvestorFundData:**
```python
class InvestorFundData(BaseModel):
    # ... investor fields ...
    # Fund fields
    fund_id: int | None
    fund_name: str | None
    fund_size: int | None
    fund_size_source_url: str | None
    check_size_lower: int | None
    check_size_upper: int | None
    geographic_focus: str | None  # Changed from List[str]
    fund_investment_stages: List[InvestmentStageSchema] | None  # NEW name
    fund_sectors: List[SectorSchema] | None  # NEW name
    # ... related data ...
```
### 3. LLM Parser (app/services/llm_parser.py)
**Updated Fund Processing:**
```python
# Process funds
funds = profile.get("funds", [])
for fund in funds:
    if isinstance(fund, dict):
        fund_data = {
            "fund_name": fund.get("fundName"),
            "fund_size": None,
            "fund_size_source_url": fund.get("fundSizeSourceUrl"),
            "check_size_lower": None,
            "check_size_upper": None,
            "source_url": fund.get("sourceUrl"),
            "source_provider": fund.get("sourceProvider"),
            "geographic_focus": None,  # Will be converted to string
            "investment_stage_names": fund.get("investmentStageFocus", []),
            "sector_names": fund.get("sectorFocus", []),
        }
        # Convert geographic focus from array to comma-separated string
        geo_focus = fund.get("geographicFocus", [])
        if geo_focus and isinstance(geo_focus, list):
            fund_data["geographic_focus"] = ", ".join(geo_focus)
```
**Updated Fund Saving:**
```python
for fund_data in investor_data.get("funds", []):
    fund = FundTable(
        investor_id=investor.id,
        fund_name=fund_data.get("fund_name"),
        fund_size=fund_data.get("fund_size"),
        fund_size_source_url=fund_data.get("fund_size_source_url"),
        check_size_lower=fund_data.get("check_size_lower"),
        check_size_upper=fund_data.get("check_size_upper"),
        source_url=fund_data.get("source_url"),
        source_provider=fund_data.get("source_provider"),
        geographic_focus=fund_data.get("geographic_focus"),  # String
    )
    db.add(fund)
    db.flush()  # Get the fund ID

    # Add investment stages (many-to-many)
    for stage_name in fund_data.get("investment_stage_names", []):
        stage = self._get_or_create_investment_stage(db, stage_name)
        fund.investment_stages.append(stage)

    # Add sectors (many-to-many)
    for sector_name in fund_data.get("sector_names", []):
        sector = self._get_or_create_sector(db, sector_name)
        fund.sectors.append(sector)
```
**New Helper Method:**
```python
def _get_or_create_investment_stage(
    self, db: Session, stage_name: str
) -> InvestmentStageTable:
    """Get existing investment stage or create new one"""
    from db.models import InvestmentStageTable

    stage = (
        db.query(InvestmentStageTable)
        .filter(InvestmentStageTable.name == stage_name)
        .first()
    )
    if not stage:
        stage = InvestmentStageTable(name=stage_name)
        db.add(stage)
        db.flush()
    return stage
```
### 4. Router (app/routers/investors.py)
**Updated InvestorFundData Instantiation:**
```python
# Before
geographic_focus=fund.geographic_focus, # Was List[str]
investment_stage_focus=fund.investment_stage_focus, # Was List[str]
sector_focus=fund.sector_focus, # Was List[str]
# After
geographic_focus=fund.geographic_focus, # Now str
fund_investment_stages=fund.investment_stages, # Now relationship
fund_sectors=fund.sectors, # Now relationship
```
## API Response Changes
### Before
```json
{
"fund_id": 1,
"fund_name": "Growth Fund",
"geographic_focus": ["Europe", "North America"],
"investment_stage_focus": ["Series A", "Series B"],
"sector_focus": ["Fintech", "Healthcare"]
}
```
### After
```json
{
"fund_id": 1,
"fund_name": "Growth Fund",
"geographic_focus": "Europe, North America",
"fund_investment_stages": [
{ "id": 3, "name": "Series A" },
{ "id": 4, "name": "Series B" }
],
"fund_sectors": [
{ "id": 5, "name": "Fintech" },
{ "id": 12, "name": "Healthcare" }
]
}
```
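
Clients that have to cope with responses from both before and after this change could normalise them with a small helper. A minimal sketch, assuming only the two payload shapes shown above; the function name `stage_names` is hypothetical:

```python
from typing import Any


def stage_names(fund: dict[str, Any]) -> list[str]:
    """Return investment stage names from either the old or the new payload."""
    if "fund_investment_stages" in fund:
        # New shape: list of {"id": ..., "name": ...} objects (may be null).
        return [stage["name"] for stage in fund["fund_investment_stages"] or []]
    # Old shape: plain list of strings under investment_stage_focus.
    return list(fund.get("investment_stage_focus") or [])


old = {"fund_id": 1, "investment_stage_focus": ["Series A", "Series B"]}
new = {
    "fund_id": 1,
    "fund_investment_stages": [
        {"id": 3, "name": "Series A"},
        {"id": 4, "name": "Series B"},
    ],
}
```

The same pattern would apply to `sector_focus` versus `fund_sectors`.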
## Query Examples
### Find Funds by Investment Stage
```python
# SQLAlchemy
funds = db.query(FundTable).join(
FundTable.investment_stages
).filter(
InvestmentStageTable.name == "Series A"
).all()
# Equivalent SQL:
# SELECT f.* FROM funds f
# JOIN fund_investment_stages fis ON f.id = fis.fund_id
# JOIN investment_stages s ON fis.stage_id = s.id
# WHERE s.name = 'Series A';
```
### Find Funds by Sector
```python
# SQLAlchemy
funds = db.query(FundTable).join(
FundTable.sectors
).filter(
SectorTable.name == "Fintech"
).all()
# Equivalent SQL:
# SELECT f.* FROM funds f
# JOIN fund_sectors fs ON f.id = fs.fund_id
# JOIN sectors s ON fs.sector_id = s.id
# WHERE s.name = 'Fintech';
```
### Find Funds by Geographic Focus
```python
# SQLAlchemy
funds = db.query(FundTable).filter(
FundTable.geographic_focus.ilike("%Europe%")
).all()
# Roughly equivalent SQL (SQLite's LIKE is case-insensitive for ASCII by default):
# SELECT * FROM funds
# WHERE geographic_focus LIKE '%Europe%';
```
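
One caveat with the comma-separated format: a `%Europe%` pattern matches any value containing that substring, so a fund whose only region is "Eastern Europe" is returned as well. Where exact region matches matter, one option is to split the string in application code. A minimal sketch in plain Python; the helper name `has_region` is hypothetical and it assumes the `", "` separator used in this document:

```python
from typing import Optional


def has_region(geographic_focus: Optional[str], region: str) -> bool:
    """True if `region` is one of the comma-separated entries (case-insensitive)."""
    if not geographic_focus:
        return False
    entries = [part.strip().casefold() for part in geographic_focus.split(",")]
    return region.strip().casefold() in entries
```

This can be applied as a post-filter on the rows returned by the `ilike` query.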
### Complex Query: Funds Investing in Fintech at Series A in Europe
```python
funds = db.query(FundTable).join(
FundTable.investment_stages
).join(
FundTable.sectors
).filter(
InvestmentStageTable.name == "Series A",
SectorTable.name == "Fintech",
FundTable.geographic_focus.ilike("%Europe%")
).all()
```
## Benefits
### 1. Better Data Normalization ✨
- Investment stages and sectors are now properly normalized
- No duplicate data stored in JSON arrays
- Single source of truth for stage/sector names
### 2. Efficient Filtering 🔍
- Can filter funds by stages/sectors using SQL JOINs
- No need to parse JSON for queries
- Database indexes can be used effectively
### 3. Data Integrity 🛡️
- Foreign key constraints ensure referential integrity
- Can't reference non-existent stages or sectors
- Cascade deletes work properly
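
Note that SQLite only enforces foreign keys, including `ON DELETE CASCADE`, on connections where `PRAGMA foreign_keys = ON` has been issued. A minimal sketch with the standard-library `sqlite3` module and a cut-down version of the schema above; the function name `links_after_fund_delete` is hypothetical:

```python
import sqlite3

SCHEMA = """
CREATE TABLE funds (id INTEGER PRIMARY KEY);
CREATE TABLE investment_stages (id INTEGER PRIMARY KEY, name VARCHAR NOT NULL UNIQUE);
CREATE TABLE fund_investment_stages (
    fund_id INTEGER NOT NULL,
    stage_id INTEGER NOT NULL,
    PRIMARY KEY (fund_id, stage_id),
    FOREIGN KEY (fund_id) REFERENCES funds (id) ON DELETE CASCADE,
    FOREIGN KEY (stage_id) REFERENCES investment_stages (id) ON DELETE CASCADE
);
INSERT INTO funds VALUES (1);
INSERT INTO investment_stages VALUES (1, 'Seed');
INSERT INTO fund_investment_stages VALUES (1, 1);
"""


def links_after_fund_delete(enforce_foreign_keys: bool) -> int:
    """Delete fund 1 and report how many association rows remain."""
    conn = sqlite3.connect(":memory:")
    # The pragma is per-connection and must be set outside a transaction.
    conn.execute(f"PRAGMA foreign_keys = {'ON' if enforce_foreign_keys else 'OFF'}")
    conn.executescript(SCHEMA)
    conn.execute("DELETE FROM funds WHERE id = 1")
    remaining = conn.execute(
        "SELECT COUNT(*) FROM fund_investment_stages"
    ).fetchone()[0]
    conn.close()
    return remaining
```

With enforcement off, the association row is left orphaned. With SQLAlchemy, the pragma is typically issued from a `connect` event listener on the engine.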
### 4. Easier Aggregations 📊
```sql
-- Count funds per investment stage
SELECT s.name, COUNT(DISTINCT f.id) as fund_count
FROM investment_stages s
LEFT JOIN fund_investment_stages fis ON s.id = fis.stage_id
LEFT JOIN funds f ON fis.fund_id = f.id
GROUP BY s.name;
-- Count funds per sector
SELECT s.name, COUNT(DISTINCT f.id) as fund_count
FROM sectors s
LEFT JOIN fund_sectors fs ON s.id = fs.sector_id
LEFT JOIN funds f ON fs.fund_id = f.id
GROUP BY s.name;
```
### 5. Consistent Pattern 🎯
- Follows same many-to-many pattern as:
- Investors ↔ Sectors
- Companies ↔ Sectors
- Projects ↔ Sectors
- Makes codebase more maintainable
## Frontend Updates Required
### Geographic Focus
```typescript
// OLD
const geoList = fund.geographic_focus.join(", ");
// NEW
const geoStr = fund.geographic_focus; // Already a string
```
### Investment Stages
```typescript
// OLD
const stages = fund.investment_stage_focus; // string[]
// NEW
const stages = fund.fund_investment_stages.map((s) => s.name); // InvestmentStageSchema[]
```
### Sectors
```typescript
// OLD
const sectors = fund.sector_focus; // string[]
// NEW
const sectors = fund.fund_sectors.map((s) => s.name); // SectorSchema[]
```
## Files Modified
1. `preprocessor/models.py` - Updated FundTable, added association tables
2. `app/db/models.py` - Updated FundTable, added InvestmentStageTable
3. `app/schemas/router_schemas.py` - Updated FundSchema, InvestorFundData
4. `app/services/llm_parser.py` - Updated fund processing logic
5. `app/routers/investors.py` - Updated response formatting
6. `preprocessor/migrate_fund_relationships.py` - Migration script (NEW)
## Migration Status
- **Database migrated**: 411 fund records updated
- **377 stage relationships** created from old JSON data
- **1,445 sector relationships** created from old JSON data
- **11 investment stages** seeded
- **All code updated**: Models, schemas, parsers, routers
- **No errors**: All files compile successfully
## Next Steps
1. **Test the API** with new response structure
2. **Update frontend** to use new field formats
3. **Re-parse CSV** (optional) to ensure all new data uses the correct structure
4. **Update filtering UI** to leverage the new relationships
## Summary
The fund schema has been successfully refactored to:
- Store `geographic_focus` as a simple string for easier display
- Use proper many-to-many relationships for `investment_stages`
- Use proper many-to-many relationships with existing `sectors` table
- Enable efficient filtering and aggregation by stage/sector
- Maintain better data normalization and integrity
This enables powerful queries like "Show me all Fintech funds investing at Series A in Europe" with simple SQL JOINs! 🎉
+3 -3
@@ -12,9 +12,9 @@ Base = declarative_base()
 # Database configuration
 # Use the preprocessor's database for consistency
 # Get absolute path to the preprocessor database
-APP_DIR = Path(__file__).parent.parent
-PREPROCESSOR_DB = APP_DIR.parent / "preprocessor" / "version_two.db"
-DATABASE_URL = os.getenv("DATABASE_URL", f"sqlite:///{PREPROCESSOR_DB}")
+# APP_DIR = Path(__file__).parent.parent
+# PREPROCESSOR_DB = APP_DIR.parent / "preprocessor" / "version_two.db"
+DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./version_two.db")
 # Create engine
 engine = create_engine(DATABASE_URL, echo=False)
@@ -23,7 +23,7 @@ Base = declarative_base()
 # DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
 # Create engine
-engine = create_engine("sqlite:///./version_two.db", echo=False)
+engine = create_engine("sqlite:///./investors.db", echo=False)
 # Create session factory
 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-255
@@ -1,255 +0,0 @@
# Database Schema Update - Enriched Investor Data & Funds
## Overview
Updated the database schema to support enriched investor data with multiple funds per investor.
## Key Changes
### 1. **InvestorTable - New Fields**
#### Basic Info
- `headquarters` - Investor headquarters location
- `website` - Investor website URL (moved from nullable)
#### AUM (Assets Under Management)
- `aum` - Changed from Integer to String to preserve currency (e.g., "EUR 850,000,000")
- `aum_as_of_date` - Date when AUM was measured
- `aum_source_url` - Source URL for AUM information
#### Investment Information
- `investment_thesis` - JSON array of thesis statements
- `portfolio_highlights` - JSON array of notable portfolio companies
- `linked_documents` - JSON array of document URLs
#### Research Metadata
- `researcher_notes` - Free-text notes from research
- `missing_important_fields` - JSON array of field names that are missing
- `sources` - JSON object mapping field names to source URLs
#### Deprecated Fields (kept for backward compatibility)
- `check_size_lower/upper` - Now handled at fund level
- `geographic_focus` - Now handled at fund level
- `stage_focus` - Now handled at fund level
### 2. **FundTable - NEW TABLE**
Represents individual funds managed by an investor. One investor can have multiple funds.
**Fields:**
- `id` - Primary key
- `investor_id` - Foreign key to InvestorTable
- `fund_name` - Name of the fund
- `fund_size` - Size of fund (string to preserve currency)
- `fund_size_source_url` - Source URL for fund size
- `estimated_investment_size` - Typical investment range (e.g., "EUR 1,000 to 2,000")
- `source_url` - Source URL for fund information
- `source_provider` - Provider of information (e.g., "Perplexity")
- `geographic_focus` - JSON array of regions/countries
- `investment_stage_focus` - JSON array of investment stages
- `sector_focus` - JSON array of sectors
**Relationship:**
- Many-to-One with InvestorTable
- Cascade delete (deleting investor deletes all funds)
### 3. **InvestorMember - Enhanced**
Added fields for senior leadership data:
- `title` - Alternative to role field
- `source_url` - URL where member info was found
## Data Model
```
InvestorTable (1) -----> (Many) FundTable
|
|-----> (Many) InvestorMember
|-----> (Many) CompanyTable (portfolio_companies)
|-----> (Many) SectorTable
|-----> (Many) InvestmentStageTable
```
## Frontend Strategy
### Flattened Response
The frontend will receive a **flattened** view where each fund appears as a separate investor entry:
```
Investor A + Fund 1 → Row 1
Investor A + Fund 2 → Row 2
Investor A + Fund 3 → Row 3
Investor B + Fund 1 → Row 4
```
### Benefits:
1. ✅ No frontend schema changes needed
2. ✅ Each row represents a distinct investment opportunity
3. ✅ Filtering and querying work naturally
4. ✅ Compatibility scoring can be done per fund
5. ✅ Backend maintains proper normalization
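
The flattening can be sketched in a few lines of plain Python. This is an illustration only: it works on dictionaries rather than ORM objects, the names `flatten` and `FUND_FIELDS` are hypothetical, and it assumes that an investor without funds should still yield one row with empty fund fields:

```python
from typing import Any, Optional

# Fund-level keys copied into each flattened row (illustrative subset).
FUND_FIELDS = ("fund_id", "fund_name", "geographic_focus")


def flatten(investors: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Emit one row per (investor, fund); investors without funds appear once."""
    rows: list[dict[str, Any]] = []
    for investor in investors:
        base = {"investor_id": investor["id"], "name": investor["name"]}
        # A single None placeholder stands in for "no funds".
        funds = investor.get("funds") or [None]
        for fund in funds:
            fund_part: dict[str, Optional[Any]] = {
                field: (fund.get(field) if fund else None) for field in FUND_FIELDS
            }
            rows.append({**base, **fund_part})
    return rows
```

Each output row carries the investor fields plus one fund's fields, so per-fund filtering and scoring operate on a flat list.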
## Files Modified
### Preprocessor
- `preprocessor/models.py` - Updated schema with all new fields and FundTable
- `preprocessor/enrich_investors.py` - **NEW** Script to ingest enriched data
### App
- `app/db/models.py` - Updated schema to match preprocessor
## Usage
### 1. Run Initial Data Ingestion (if not done)
```bash
cd preprocessor
python main.py
```
### 2. Run Enrichment
```bash
cd preprocessor
python enrich_investors.py enriched_investors.csv investor_name enriched_data
```
**CSV Format:**
| investor_name | enriched_data |
|---------------|---------------|
| Anaxago | {"funds": [...], "headquarters": "...", ...} |
| VC Firm B | {...} |
### 3. Reinitialize Database (if needed)
```bash
# Backup first!
cp version_two.db version_two.db.backup
# Delete and reinitialize
rm version_two.db
python main.py # Run initial ingestion
python enrich_investors.py enriched_investors.csv # Run enrichment
```
## Enrichment Script Features
- **Upsert Logic** - Creates new investors or updates existing ones
- **Duplicate Prevention** - Won't create duplicate funds or team members
- **Flexible Matching** - Matches by name or website
- **Batch Commits** - Commits every 10 investors for performance
- **Error Handling** - Continues on errors, reports at end
- **Detailed Logging** - Shows progress and summary
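
The upsert, batch-commit and continue-on-error behaviour can be sketched as follows. This is not the code of `enrich_investors.py`, which is not shown here; it is a minimal illustration using the standard-library `sqlite3` module, with a hypothetical function `enrich` and a simplified `investors` table that has only `name` and `headquarters` columns:

```python
import sqlite3
from typing import Iterable


def enrich(
    conn: sqlite3.Connection,
    records: Iterable[tuple[str, str]],
    batch_size: int = 10,
) -> tuple[int, list[str]]:
    """Upsert (name, headquarters) pairs, committing every `batch_size` records.

    Returns (processed_count, names_that_failed).
    """
    processed = 0
    failed: list[str] = []
    for name, headquarters in records:
        try:
            if not name:
                raise ValueError("missing investor name")
            # Try to update an existing investor first.
            updated = conn.execute(
                "UPDATE investors SET headquarters = ? WHERE name = ?",
                (headquarters, name),
            ).rowcount
            if updated == 0:
                # No match: create the investor instead.
                conn.execute(
                    "INSERT INTO investors (name, headquarters) VALUES (?, ?)",
                    (name, headquarters),
                )
            processed += 1
        except (ValueError, sqlite3.Error):
            failed.append(name)  # keep going; report at the end
            continue
        if processed % batch_size == 0:
            conn.commit()
    conn.commit()  # flush the final partial batch
    return processed, failed
```

Committing in batches trades a little durability (at most one batch is lost on a crash) for far fewer disk syncs.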
## Next Steps
### 1. Create Compatibility Scorer Service
See the design doc for the `CompatibilityScorer` service that will:
- Calculate match scores for both filtered and queried results
- Provide detailed breakdown of scoring
- Work with fund-level criteria
### 2. Update API Endpoints
- Modify `GET /investors` to flatten funds
- Update `GET /investors/filter` to query funds table
- Enhance `/query` endpoint to extract parameters and score
### 3. Update Frontend Schemas (Pydantic)
Add optional fields to response schemas:
- `compatibility_score: Optional[float]`
- `match_details: Optional[dict]`
- Fund-related fields in `InvestorData`
## Example Enriched JSON
```json
{
"websiteURL": "http://www.anaxago.com",
"headquarters": "Paris, France",
"investorDescription": "Anaxago is an investment group...",
"overallAssetsUnderManagement": {
"aumAmount": "EUR 850,000,000",
"asOfDate": "Not Available",
"sourceUrl": "http://www.anaxago.com"
},
"investmentThesisFocus": ["Sustainable real estate", "Climate tech"],
"portfolioHighlights": ["Tilak Healthcare", "Innovorder"],
"funds": [
{
"fundName": "Crowdfunding Immobilier",
"fundSize": "Not Available",
"estimatedInvestmentSize": "EUR 1,000 to 2,000",
"geographicFocus": ["France"],
"investmentStageFocus": ["Seed", "Early Stage"],
"sectorFocus": ["Real Estate"],
"sourceUrl": "http://www.anaxago.com/investissement"
}
],
"seniorLeadership": [
{
"name": "Joachim Dupont",
"title": "Co-fondateur et président",
"sourceUrl": "https://capital.anaxago.com/equipe"
}
],
"researcherNotes": "No explicit official fund sizes found",
"missingImportantFields": ["fundSize"],
"sources": {
"funds": "http://www.anaxago.com/investissement",
"headquarters": "http://www.anaxago.com/contact"
}
}
```
## Database Migration
If you have existing data:
```python
# Migration script (if needed)
from models import engine
from sqlalchemy import text

with engine.connect() as conn:
    # create_all() only creates tables that are missing; it does not alter
    # existing tables, so column changes have to be applied manually.
    # Convert AUM from Integer to String
    # (RENAME COLUMN needs SQLite 3.25+, DROP COLUMN needs SQLite 3.35+)
    conn.execute(text("ALTER TABLE investors ADD COLUMN aum_new TEXT"))
    conn.execute(text("UPDATE investors SET aum_new = CAST(aum AS TEXT) WHERE aum IS NOT NULL"))
    conn.execute(text("ALTER TABLE investors DROP COLUMN aum"))
    conn.execute(text("ALTER TABLE investors RENAME COLUMN aum_new TO aum"))
    conn.commit()
```
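
Because `create_all()` creates missing tables but does not add columns to tables that already exist, new columns on an existing database need explicit `ALTER TABLE` statements. A minimal sketch of an idempotent helper, using the standard-library `sqlite3` module; `add_column_if_missing` is a hypothetical name, and the identifiers passed to it are assumed to be trusted because they are interpolated into the SQL:

```python
import sqlite3


def add_column_if_missing(
    conn: sqlite3.Connection, table: str, column: str, ddl_type: str
) -> bool:
    """Add `column` to `table` unless it already exists. Returns True if added."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl_type}")
    return True
```

Running it twice is safe, which makes it convenient for the new investor fields such as `headquarters` or `aum_as_of_date`.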
## Questions?
- **Q: What if an investor has no funds?**
A: They'll appear once with all fund fields as NULL
- **Q: How do we handle fund updates?**
A: Enrichment script updates existing funds by fund_name + investor_id
- **Q: Can we query by fund criteria?**
A: Yes! Join InvestorTable with FundTable and filter on fund fields
- **Q: How does compatibility scoring work?**
A: See the separate `CompatibilityScorer` service design
-202
@@ -1,202 +0,0 @@
# ✅ Base Database Ingestion Complete!
**Date:** October 5, 2025
**Database:** `version_two.db`
## 📊 Summary Statistics
| Entity | Count |
| ---------------------------------- | ------ |
| **Investors** | 9,315 |
| **Companies** | 6,877 |
| **Sectors** | 639 |
| **Investor-Company Relationships** | 22,548 |
| **Investor-Sector Relationships** | 75,307 |
## 🎯 Top Investors by Portfolio Size
1. **Bpifrance** - 211 companies
2. **European Innovation Council** - 183 companies
3. **Business Growth Fund** - 84 companies
4. **HTGF (High-Tech Gruenderfonds)** - 74 companies
5. **EIT InnoEnergy** - 72 companies
## 📁 Source Files
- **Companies CSV**: 13,027 rows
- **Investors CSV**: 11,045 rows
- **Investors Ingested**: 9,315 (some duplicates/invalid entries filtered out)
## 🗃️ Database Structure
### Tables Created:
- ✅ `investors` - Core investor data
- ✅ `companies` - Portfolio companies
- ✅ `sectors` - Industry sectors
- ✅ `funds` - (Empty, will be populated during enrichment)
- ✅ `investor_members` - (Empty, will be populated during enrichment)
- ✅ `company_members` - Company team members
- ✅ `investment_stages` - Investment stage definitions
- ✅ Association tables for relationships
### Current Data:
- ✅ Investor names and basic info (website, investment count)
- ✅ Company details (name, location, industry, description)
- ✅ Sectors extracted from company industries
- ✅ Investor → Company relationships (who invested in what)
- ✅ Investor → Sector relationships (derived from portfolio)
### Missing (To Be Added via Enrichment):
- ⏳ Investor headquarters
- ⏳ AUM (Assets Under Management) details
- ⏳ Investment thesis
- ⏳ Portfolio highlights
- ⏳ Fund details (multiple funds per investor)
- ⏳ Senior leadership/team members
- ⏳ Research notes and sources
## 🔄 Next Steps
### 1. Prepare Enriched Data CSV
Your enriched CSV should have this structure:
```csv
investor_name,enriched_data
"212","{""websiteURL"": ""..."", ""funds"": [...], ...}"
"301","{...}"
```
### 2. Run Enrichment Script
```bash
cd preprocessor
python enrich_investors.py enriched_investors.csv investor_name enriched_data
```
This will:
- ✅ Add fund details (multiple funds per investor)
- ✅ Update AUM information
- ✅ Add investment thesis
- ✅ Add portfolio highlights
- ✅ Add senior leadership
- ✅ Add research notes and sources
### 3. Verify Enriched Data
```bash
python3 << 'EOF'
from models import InvestorTable, FundTable, get_db_session
session = get_db_session()
# Check enriched data
investor = session.query(InvestorTable).filter_by(name="Anaxago").first()
if investor:
    print(f"Investor: {investor.name}")
    print(f"HQ: {investor.headquarters}")
    print(f"AUM: {investor.aum}")
    print(f"Funds: {len(investor.funds)}")
    for fund in investor.funds:
        print(f" - {fund.fund_name}")
session.close()
EOF
```
## 📝 Sample Queries
### Get Investor with Portfolio
```python
from models import InvestorTable, get_db_session
session = get_db_session()
investor = session.query(InvestorTable).filter_by(name="Bpifrance").first()
print(f"Investor: {investor.name}")
print(f"Website: {investor.website}")
print(f"Investments: {investor.number_of_investments}")
print(f"Portfolio Companies: {len(investor.portfolio_companies)}")
print(f"Sectors: {[s.name for s in investor.sectors[:5]]}")
session.close()
```
### Get Companies by Sector
```python
from models import CompanyTable, SectorTable, get_db_session
session = get_db_session()
sector = session.query(SectorTable).filter_by(name="AgTech").first()
print(f"Sector: {sector.name}")
print(f"Companies: {len(sector.companies)}")
for company in sector.companies[:5]:
    print(f" - {company.name}")
session.close()
```
### Get Investor's Sector Distribution
```python
from models import InvestorTable, get_db_session
session = get_db_session()
investor = session.query(InvestorTable).filter_by(name="Bpifrance").first()
sectors = {}
for company in investor.portfolio_companies:
    for sector in company.sectors:
        sectors[sector.name] = sectors.get(sector.name, 0) + 1
# Top sectors
for sector, count in sorted(sectors.items(), key=lambda x: x[1], reverse=True)[:5]:
    print(f"{sector}: {count} companies")
session.close()
```
## ⚠️ Known Issues
### Investors Not Found in DB
Some companies reference investors that weren't in the investors CSV:
- The Venture Collective
- Sarah Leary
- Transpose
- ND Capital
- InvestSud
- Third Swedish National Pension Fund
- Union Tech Ventures
- Vasuki Tech Fund
- MSA Novo
- And others...
These are likely individual angel investors or smaller funds not in the main investor list. They are recorded but not linked.
## 🔒 Backup
A backup of the database was created before ingestion:
- `version_two.db.backup_YYYYMMDD_HHMMSS`
## 📧 Support
For issues or questions:
1. Check the logs for error messages
2. Verify CSV file formats
3. Ensure all required columns are present
4. Check for duplicate entries
---
**Status:** ✅ Base database created successfully
**Ready for:** Enrichment phase with detailed investor data
-285
@@ -1,285 +0,0 @@
# Quick Start Guide - Enriched Investor Data
## 🚀 Setup
### 1. Backup Your Database
```bash
cd preprocessor
cp version_two.db version_two.db.backup
```
### 2. Run Migration (for existing databases)
```bash
python migrate_database.py version_two.db
# Type 'yes' when prompted
```
### 3. Verify Schema
```bash
python3 -c "from models import init_database; init_database(); print('✅ Schema OK!')"
```
## 📊 Enriching Investor Data
### CSV Format
Your enriched CSV should have these columns:
- `investor_name` - Name of the investor (used to match existing records)
- `enriched_data` - JSON string with enriched data
**Example:**
```csv
investor_name,enriched_data
Anaxago,"{""websiteURL"": ""http://www.anaxago.com"", ""headquarters"": ""Paris, France"", ""funds"": [...]}"
VC Firm B,"{...}"
```
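
Hand-escaping JSON inside a CSV cell is error-prone, so it is usually safer to let the `csv` module do the quoting. A minimal sketch using only the standard library; the function names `write_enriched_csv` and `read_enriched_csv` are hypothetical, and the column names are the defaults described above:

```python
import csv
import io
import json


def write_enriched_csv(rows: dict[str, dict]) -> str:
    """Serialise {investor_name: enriched_dict} to CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["investor_name", "enriched_data"])
    for name, data in rows.items():
        # csv.writer doubles embedded quotes, so the JSON survives a round trip.
        writer.writerow([name, json.dumps(data)])
    return buffer.getvalue()


def read_enriched_csv(text: str) -> dict[str, dict]:
    """Parse CSV text produced above back into {investor_name: enriched_dict}."""
    reader = csv.DictReader(io.StringIO(text))
    return {row["investor_name"]: json.loads(row["enriched_data"]) for row in reader}
```

The writer produces the doubled-quote form (`""websiteURL""`) shown in the example above.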
### Run Enrichment
```bash
python enrich_investors.py enriched_investors.csv
```
**With custom column names:**
```bash
python enrich_investors.py myfile.csv name_column data_column
```
### What Gets Updated
**Investor Level:**
- ✅ Description
- ✅ Website
- ✅ Headquarters
- ✅ AUM (amount, date, source)
- ✅ Investment thesis
- ✅ Portfolio highlights
- ✅ Linked documents
- ✅ Researcher notes
- ✅ Missing fields metadata
- ✅ Sources
**Fund Level (creates new records):**
- ✅ Fund name
- ✅ Fund size
- ✅ Estimated investment size
- ✅ Geographic focus (array)
- ✅ Investment stages (array)
- ✅ Sector focus (array)
- ✅ Source URL and provider
**Team Members (creates new records):**
- ✅ Name
- ✅ Title/Role
- ✅ Source URL
## 📋 JSON Structure
```json
{
"websiteURL": "http://www.example.com",
"headquarters": "San Francisco, CA",
"investorDescription": "Leading VC firm...",
"overallAssetsUnderManagement": {
"aumAmount": "USD 1,500,000,000",
"asOfDate": "2024-Q4",
"sourceUrl": "http://source.com"
},
"investmentThesisFocus": [
"AI and Machine Learning",
"Climate Tech"
],
"portfolioHighlights": [
"Company A",
"Company B"
],
"linkedDocuments": [
"http://doc1.com",
"http://doc2.com"
],
"funds": [
{
"fundName": "Fund I",
"fundSize": "USD 500,000,000",
"fundSizeSourceUrl": "http://source.com",
"estimatedInvestmentSize": "USD 5M to 15M",
"geographicFocus": ["North America", "Europe"],
"investmentStageFocus": ["Series A", "Series B"],
"sectorFocus": ["AI", "SaaS"],
"sourceUrl": "http://fund-info.com",
"sourceProvider": "Crunchbase"
},
{
"fundName": "Fund II",
"fundSize": "USD 750,000,000",
...
}
],
"seniorLeadership": [
{
"name": "John Doe",
"title": "Managing Partner",
"sourceUrl": "http://linkedin.com/johndoe"
}
],
"researcherNotes": "Notes about this investor...",
"missingImportantFields": ["fundSize", "checkSize"],
"sources": {
"funds": "http://source1.com",
"headquarters": "http://source2.com"
}
}
```
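Amounts in this payload are free-text strings such as `"USD 1,500,000,000"` or `"USD 5M to 15M"`. If numeric values are needed downstream, a parser along these lines could be used. This is only a sketch: `parse_amount` and `parse_range` are hypothetical helpers, not part of `enrich_investors.py`, and they handle only the formats shown above.

```python
import re

# Multipliers for the short suffixes seen in the sample data ("5M", "15M").
_SUFFIXES = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
_NUMBER = re.compile(r"(\d[\d,]*(?:\.\d+)?)\s*([KMB])?\b", re.IGNORECASE)


def _to_int(match):
    """Convert one regex match (digits plus optional suffix) to an int."""
    value = float(match.group(1).replace(",", ""))
    suffix = (match.group(2) or "").upper()
    return int(value * _SUFFIXES.get(suffix, 1))


def parse_amount(text):
    """Return the first amount found in `text` as an int, or None."""
    match = _NUMBER.search(text or "")
    return _to_int(match) if match else None


def parse_range(text):
    """Return (lower, upper); upper is None when only one amount is present."""
    amounts = [_to_int(m) for m in _NUMBER.finditer(text or "")]
    if not amounts:
        return (None, None)
    return (amounts[0], amounts[1] if len(amounts) > 1 else None)


print(parse_amount("USD 1,500,000,000"))
print(parse_range("USD 5M to 15M"))
```

The currency code is ignored here; a real implementation would need to decide how to treat mixed currencies.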
## 🔍 Querying
### Check Funds Created
```python
from models import InvestorTable, FundTable, get_db_session
session = get_db_session()
# Get investor with funds
investor = session.query(InvestorTable).filter_by(name="Anaxago").first()
print(f"Investor: {investor.name}")
print(f"Funds: {len(investor.funds)}")
for fund in investor.funds:
    print(f" - {fund.fund_name}: {fund.fund_size}")
    print(f" Geographic: {fund.geographic_focus}")
    print(f" Stages: {fund.investment_stage_focus}")
    print(f" Sectors: {fund.sector_focus}")
session.close()
```
### Get All Funds
```python
funds = session.query(FundTable).all()
print(f"Total funds: {len(funds)}")
for fund in funds:
    print(f"{fund.investor.name} - {fund.fund_name}")
```
## 🎯 Next Steps
### 1. Update API to Flatten Funds
```python
# In app/routers/investors.py
@router.get("/investors")
def get_investors(db: Session = Depends(get_db)):
    investors = db.query(InvestorTable).all()
    flattened = []
    for investor in investors:
        if investor.funds:
            for fund in investor.funds:
                flattened.append({
                    "id": f"{investor.id}_fund_{fund.id}",
                    "name": investor.name,
                    "description": investor.description,
                    # ... investor fields ...
                    "fund_name": fund.fund_name,
                    "fund_size": fund.fund_size,
                    "geographic_focus": fund.geographic_focus,
                    # ... fund fields ...
                })
        else:
            # Investor with no funds
            flattened.append({...})
    return flattened
```
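The flattening rule (one output row per fund, plus one row for each investor that has no funds) can be checked in isolation. A sketch using plain dataclasses in place of the ORM models; `Investor`, `Fund`, and `flatten` are stand-in names for illustration, and the row id used for an investor without funds is an assumption:

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Fund:
    id: int
    fund_name: Optional[str] = None


@dataclass
class Investor:
    id: int
    name: str
    funds: List[Fund] = field(default_factory=list)


def flatten(investors):
    """Return one dict per (investor, fund) pair, or one per fund-less investor."""
    rows = []
    for investor in investors:
        if investor.funds:
            for fund in investor.funds:
                rows.append({
                    "id": f"{investor.id}_fund_{fund.id}",
                    "name": investor.name,
                    "fund_name": fund.fund_name,
                })
        else:
            # Assumed shape for investors without funds: fund fields left empty.
            rows.append({"id": str(investor.id), "name": investor.name, "fund_name": None})
    return rows


sample = [
    Investor(1, "A", [Fund(10, "Fund I"), Fund(11, "Fund II")]),
    Investor(2, "B"),
]
for row in flatten(sample):
    print(row)
```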
### 2. Create Compatibility Scorer
See `DATABASE_SCHEMA_UPDATE.md` for the `CompatibilityScorer` service design.
### 3. Test the Enrichment
```python
# Quick test
from models import InvestorTable, FundTable, get_db_session
session = get_db_session()
# Count investors with funds
investors_with_funds = session.query(InvestorTable).join(FundTable).distinct().count()
total_investors = session.query(InvestorTable).count()
total_funds = session.query(FundTable).count()
print(f"Investors: {total_investors}")
print(f"Investors with funds: {investors_with_funds}")
print(f"Total funds: {total_funds}")
print(f"Avg funds per investor: {total_funds / investors_with_funds if investors_with_funds > 0 else 0:.2f}")
session.close()
```
## ❓ Troubleshooting
### "No module named 'models'"
```bash
# Make sure you're in the preprocessor directory
cd preprocessor
python enrich_investors.py ...
```
### "Duplicate fund entries"
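The script matches funds on `investor_id` plus `fund_name`, so re-running enrichment with the same data updates existing funds instead of duplicating them. Funds without a `fundName` cannot be matched and are inserted as new records on every run.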
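The matching rule can be sketched with the standard `sqlite3` module instead of the ORM. `upsert_fund` and the reduced three-column `funds` table are illustrative assumptions, not the script's actual code:

```python
import sqlite3


def upsert_fund(conn, investor_id, fund_name, fund_size):
    """Update the fund matching (investor_id, fund_name), or insert a new one."""
    row = None
    if fund_name:
        row = conn.execute(
            "SELECT id FROM funds WHERE investor_id = ? AND fund_name = ?",
            (investor_id, fund_name),
        ).fetchone()
    if row:
        conn.execute("UPDATE funds SET fund_size = ? WHERE id = ?", (fund_size, row[0]))
        return row[0]
    # No name to match on, or no existing record: insert a new fund.
    cursor = conn.execute(
        "INSERT INTO funds (investor_id, fund_name, fund_size) VALUES (?, ?, ?)",
        (investor_id, fund_name, fund_size),
    )
    return cursor.lastrowid


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE funds (id INTEGER PRIMARY KEY, investor_id INTEGER, "
    "fund_name TEXT, fund_size TEXT)"
)
first = upsert_fund(conn, 1, "Fund I", "USD 500,000,000")
second = upsert_fund(conn, 1, "Fund I", "USD 550,000,000")
print(first == second)
```

Calling `upsert_fund` twice with `fund_name=None` inserts two rows, which is the duplicate case worth watching for.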
### "Investor not found"
The script tries to match by:
1. Investor name
2. Website URL
If neither matches, a new investor is created from the CSV name. Rows that have no investor name and no website match are skipped with a warning.
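The lookup order can be sketched as a small function. `find_investor` and the dictionary-based records are illustrative; the script performs the same two lookups through SQLAlchemy queries.

```python
def find_investor(investors, name=None, website=None):
    """Return the first record matching by name, then by website, else None."""
    if name:
        for record in investors:
            if record["name"] == name:
                return record
    if website:
        for record in investors:
            if record.get("website") == website:
                return record
    return None


known = [
    {"name": "Anaxago", "website": "http://www.anaxago.com"},
    {"name": "VC Firm B", "website": None},
]
print(find_investor(known, name="Anaxago SAS", website="http://www.anaxago.com"))
print(find_investor(known, name="Unknown", website=None))
```

Skipping the website lookup when no URL is available matters: matching on an empty website would otherwise pick up any record that has none.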
### Check Logs
The enrichment script provides detailed logging:
- ✅ Successes
- ⚠️ Warnings (missing data)
- ❌ Errors (with row numbers)
## 📚 Resources
- **Schema Documentation**: `DATABASE_SCHEMA_UPDATE.md`
- **Migration Script**: `migrate_database.py`
- **Enrichment Script**: `enrich_investors.py`
- **Models**: `models.py`
## 🎉 Success Indicators
After enrichment, you should see:
- ✅ New `funds` table populated
- ✅ Investor fields updated with enriched data
- ✅ Team members added
- ✅ No duplicate funds for same investor
- ✅ JSON fields properly stored
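The "no duplicate funds" indicator can be verified with a grouped query. A sketch using the standard `sqlite3` module against an in-memory copy of the relevant columns; `find_duplicate_funds` is a hypothetical helper, and in practice the connection would point at the project database file:

```python
import sqlite3

DUPLICATE_FUNDS_SQL = """
SELECT investor_id, fund_name, COUNT(*) AS copies
FROM funds
WHERE fund_name IS NOT NULL
GROUP BY investor_id, fund_name
HAVING COUNT(*) > 1
"""


def find_duplicate_funds(conn):
    """Return (investor_id, fund_name, copies) for every duplicated named fund."""
    return conn.execute(DUPLICATE_FUNDS_SQL).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE funds (id INTEGER PRIMARY KEY, investor_id INTEGER, fund_name TEXT)")
conn.executemany(
    "INSERT INTO funds (investor_id, fund_name) VALUES (?, ?)",
    [(1, "Fund I"), (1, "Fund I"), (1, "Fund II"), (2, "Fund I")],
)
print(find_duplicate_funds(conn))
```

An empty result means every named fund appears once per investor.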
-287
View File
@@ -1,287 +0,0 @@
import json
import logging
import pandas as pd
from models import FundTable, InvestorMember, InvestorTable, engine, init_database
from sqlalchemy.orm import sessionmaker
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize database (create tables if they don't exist)
init_database()
def clean_value(value):
    """Clean values, converting 'Not Available', 'null', etc. to None"""
    if pd.isna(value):
        return None
    if isinstance(value, str):
        if value.strip() in ["Not Available", "null", "None", "", "0", "N/A"]:
            return None
    return value


def parse_json_safely(json_str):
    """Safely parse JSON string"""
    try:
        if pd.isna(json_str) or json_str == "":
            return None
        if isinstance(json_str, dict):
            return json_str
        return json.loads(json_str)
    except (json.JSONDecodeError, TypeError) as e:
        logger.error(f"Error parsing JSON: {e}")
        return None
def enrich_investors(
    csv_file_path: str,
    investor_name_column: str = "investor_name",
    enriched_data_column: str = "enriched_data",
):
    """
    Enrich investors from CSV containing enriched JSON data.

    Args:
        csv_file_path: Path to CSV file with enriched investor data
        investor_name_column: Column name containing investor name
        enriched_data_column: Column name containing JSON data
    """
    Session = sessionmaker(bind=engine)
    session = Session()

    # Load enriched data
    logger.info(f"Loading enriched investors from: {csv_file_path}")
    enriched_df = pd.read_csv(csv_file_path)
    logger.info(f"📊 Enriched Investors CSV: {len(enriched_df)} rows")

    investors_updated = 0
    investors_created = 0
    funds_created = 0
    team_members_created = 0
    investors_not_found = []
    errors = []
    for index, row in enriched_df.iterrows():
        try:
            # Parse the JSON data column
            investor_data = parse_json_safely(row.get(enriched_data_column))
            if not investor_data:
                logger.warning(f"Row {index}: No valid JSON data")
                continue

            # Get investor name from the CSV row. pandas reads an empty cell as NaN,
            # which is truthy, so normalize it to None before the checks below.
            investor_name = row.get(investor_name_column)
            if pd.isna(investor_name):
                investor_name = None

            # Find or create investor: match by name first, then by website
            investor = None
            if investor_name:
                investor = (
                    session.query(InvestorTable).filter_by(name=investor_name).first()
                )
            website = clean_value(investor_data.get("websiteURL"))
            if not investor and website:
                # Only match on a real URL; filtering on website=None would match
                # any investor whose website is NULL.
                investor = (
                    session.query(InvestorTable).filter_by(website=website).first()
                )

            # Create new investor if not found
            if not investor:
                if not investor_name:
                    logger.warning(f"Row {index}: No investor name found, skipping")
                    continue
                investor = InvestorTable(name=investor_name)
                session.add(investor)
                session.flush()  # Get ID for new investor
                investors_created += 1
                logger.info(f"Created new investor: {investor_name}")
            else:
                investors_updated += 1
            # Update investor fields
            investor.description = (
                clean_value(investor_data.get("investorDescription"))
                or investor.description
            )
            investor.website = (
                clean_value(investor_data.get("websiteURL")) or investor.website
            )
            investor.headquarters = (
                clean_value(investor_data.get("headquarters")) or investor.headquarters
            )

            # Handle AUM
            aum_data = investor_data.get("overallAssetsUnderManagement", {})
            if aum_data:
                investor.aum = clean_value(aum_data.get("aumAmount"))
                investor.aum_as_of_date = clean_value(aum_data.get("asOfDate"))
                investor.aum_source_url = clean_value(aum_data.get("sourceUrl"))

            # Handle investment thesis (stored as JSON array)
            thesis = investor_data.get("investmentThesisFocus")
            if thesis:
                investor.investment_thesis = thesis

            # Handle portfolio highlights (stored as JSON array)
            portfolio = investor_data.get("portfolioHighlights")
            if portfolio:
                investor.portfolio_highlights = portfolio

            # Handle linked documents
            linked_docs = investor_data.get("linkedDocuments")
            if linked_docs:
                investor.linked_documents = linked_docs

            # Handle researcher notes
            notes = investor_data.get("researcherNotes")
            if notes:
                investor.researcher_notes = clean_value(notes)

            # Handle missing important fields
            missing_fields = investor_data.get("missingImportantFields")
            if missing_fields:
                investor.missing_important_fields = missing_fields

            # Handle sources
            sources = investor_data.get("sources")
            if sources:
                investor.sources = sources
            # Process senior leadership / team members
            leadership = investor_data.get("seniorLeadership", [])
            for member_data in leadership:
                # Check if member already exists
                member_name = clean_value(member_data.get("name"))
                if not member_name:
                    continue
                existing_member = (
                    session.query(InvestorMember)
                    .filter_by(investor_id=investor.id, name=member_name)
                    .first()
                )
                if not existing_member:
                    member = InvestorMember(
                        investor_id=investor.id,
                        name=member_name,
                        title=clean_value(member_data.get("title")),
                        role=clean_value(member_data.get("title")),  # Use title as role
                        source_url=clean_value(member_data.get("sourceUrl")),
                    )
                    session.add(member)
                    team_members_created += 1
            # Process funds
            funds = investor_data.get("funds", [])
            for fund_data in funds:
                # Check if fund already exists (by name and investor)
                fund_name = clean_value(fund_data.get("fundName"))

                # Always create new fund or update if exists
                existing_fund = None
                if fund_name:
                    existing_fund = (
                        session.query(FundTable)
                        .filter_by(investor_id=investor.id, fund_name=fund_name)
                        .first()
                    )

                if existing_fund:
                    # Update existing fund
                    fund = existing_fund
                else:
                    # Create new fund
                    fund = FundTable(investor_id=investor.id)
                    session.add(fund)
                    funds_created += 1

                # Update fund fields
                fund.fund_name = fund_name
                fund.fund_size = clean_value(fund_data.get("fundSize"))
                fund.fund_size_source_url = clean_value(
                    fund_data.get("fundSizeSourceUrl")
                )
                fund.estimated_investment_size = clean_value(
                    fund_data.get("estimatedInvestmentSize")
                )
                fund.source_url = clean_value(fund_data.get("sourceUrl"))
                fund.source_provider = clean_value(fund_data.get("sourceProvider"))
                fund.geographic_focus = fund_data.get("geographicFocus")
                fund.investment_stage_focus = fund_data.get("investmentStageFocus")
                fund.sector_focus = fund_data.get("sectorFocus")
            # Commit every 10 investors
            if (investors_updated + investors_created) % 10 == 0:
                session.commit()
                logger.info(
                    f" Processed {investors_updated + investors_created} investors, "
                    f"created {funds_created} funds, {team_members_created} team members"
                )

        except Exception as e:
            logger.error(f"Error processing row {index}: {e}")
            # rollback() discards every uncommitted change since the last commit,
            # which can include earlier rows of the current batch of 10.
            session.rollback()
            errors.append({"row": index, "error": str(e)})
            continue

    # Final commit
    session.commit()
    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("🎉 ENRICHMENT COMPLETE!")
    logger.info("=" * 60)
    logger.info(f" Investors Updated: {investors_updated}")
    logger.info(f" Investors Created: {investors_created}")
    logger.info(f" Funds Created: {funds_created}")
    logger.info(f" Team Members Created: {team_members_created}")
    logger.info(f" Errors: {len(errors)}")

    if investors_not_found:
        logger.info(
            f"\n⚠️ Investors not found in database ({len(investors_not_found)}):"
        )
        for name in investors_not_found[:10]:  # Show first 10
            logger.info(f" - {name}")
        if len(investors_not_found) > 10:
            logger.info(f" ... and {len(investors_not_found) - 10} more")

    if errors:
        logger.info(f"\n❌ Errors encountered ({len(errors)}):")
        for error in errors[:5]:  # Show first 5
            logger.info(f" Row {error['row']}: {error['error']}")
        if len(errors) > 5:
            logger.info(f" ... and {len(errors) - 5} more errors")

    session.close()
    logger.info("=" * 60)
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print(
            "Usage: python enrich_investors.py <csv_file_path> [investor_name_column] [enriched_data_column]"
        )
        print("\nExample:")
        print(" python enrich_investors.py enriched_investors.csv")
        print(" python enrich_investors.py enriched_investors.csv 'name' 'data'")
        sys.exit(1)

    csv_file = sys.argv[1]
    investor_col = sys.argv[2] if len(sys.argv) > 2 else "investor_name"
    data_col = sys.argv[3] if len(sys.argv) > 3 else "enriched_data"
    enrich_investors(csv_file, investor_col, data_col)
-513
View File
@@ -1,513 +0,0 @@
# Investor: 212
{
"investor": {
"id": null,
"name": "212",
"description": "Growth-oriented venture capital firm investing in B2B technology across Turkey, Central and Eastern Europe, and the MENA region. Operates multiple funds (including 212 NexT and Simya-related funds) and pursues multi-stage opportunities (seed to growth).",
"aum": 80000000,
"check_size_lower": 500000,
"check_size_upper": 3000000,
"geographic_focus": "Turkey, Central and Eastern Europe (CEE), Middle East & North Africa (MENA) including UAE, Europe",
"number_of_investments": 57
},
"portfolio_companies": [
{
"id": null,
"name": "RemotePass",
"industry": "Fintech / HRTech",
"location": "UAE",
"description": "Onboards, manages, and pays remote staff across 150+ countries; offers multi-currency payroll and related HR tools.",
"founded_year": 2020,
"website": "https://remotepass.com/"
},
{
"id": null,
"name": "Flow48",
"industry": "Fintech / SME lending",
"location": "UAE",
"description": "SME working capital financing platform using ERP, payment gateway and ecommerce data for risk assessment.",
"founded_year": 2021,
"website": null
},
{
"id": null,
"name": "Getmobil",
"industry": "Marketplace / E-commerce",
"location": "Istanbul, Türkiye",
"description": "Marketplace for buying/selling second-hand electronics; renewal center certified by Turkish Ministry of Trade.",
"founded_year": 2018,
"website": "https://getmobil.com/"
},
{
"id": null,
"name": "SOCRadar",
"industry": "Cybersecurity",
"location": "Istanbul, Türkiye",
"description": "Extended Threat Intelligence (XTI) platform combining EASM, DRPS and CTI for security operations.",
"founded_year": 2019,
"website": "https://socradar.io/"
},
{
"id": null,
"name": "Trio Mobil",
"industry": "Industrial IoT / AI",
"location": "Istanbul, Türkiye",
"description": "AI-driven Industrial IoT platform enabling real-time analytics and safety improvements in facilities.",
"founded_year": 2021,
"website": "https://www.triomobil.com/"
},
{
"id": null,
"name": "PhilosopherKing",
"industry": "Gaming / AI",
"location": "Las Vegas, US",
"description": "AI-powered gaming platform delivering dynamic, real-time interactive storytelling.",
"founded_year": 2023,
"website": "https://philosopherking.ai"
},
{
"id": null,
"name": "OneFive",
"industry": "Materials / Packaging AI",
"location": "Germany",
"description": "AI-driven biomaterials platform to replace single-use plastics in packaging.",
"founded_year": 2020,
"website": "https://www.one-five.com"
},
{
"id": null,
"name": "EverDye",
"industry": "Textile / Green Tech",
"location": "France",
"description": "Bio-based pigment technology enabling low-energy, low-emission dyeing processes.",
"founded_year": 2021,
"website": "https://everdye.fr"
},
{
"id": null,
"name": "Eluvium",
"industry": "AI / Data Analytics",
"location": "London, UK",
"description": "AI-driven data agents to transform unstructured information into actionable insights for manufacturing and procurement.",
"founded_year": 2024,
"website": "https://www.eluvium.ai/"
},
{
"id": null,
"name": "Khenda",
"industry": "Manufacturing / AI",
"location": "Ann Arbor, Michigan, USA",
"description": "AI-powered video analytics to extract production metrics from existing security camera footage.",
"founded_year": 2021,
"website": "https://www.khenda.com/"
},
{
"id": null,
"name": "Fazla",
"industry": "Waste / Sustainability SaaS",
"location": "Türkiye",
"description": "Technology-based solutions to reduce waste and emissions across value chains.",
"founded_year": 2021,
"website": null
}
],
"team_members": [
{
"id": null,
"name": "Ali H. Karabey",
"role": "Founding Partner, Growth Funds",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Ali Naci Temel",
"role": "Operations & Investment I, 212 NexT",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Barbaros Ozbugutu",
"role": "Experts | Leadership Management",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Cagdas Yildiz",
"role": "Investment | Simya VC",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Caglar Urcan",
"role": "Investment I, 212 NexT",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Can Deniz Tokman",
"role": "Investment I, Growth Funds",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Emin Taha Celik",
"role": "Investment I, Growth Funds",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Cenk Sezginsoy",
"role": "Experts | Venture Partner",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Can Abacigil",
"role": "Experts | Product Development",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Doğukan Kara",
"role": "Operations | Finance",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Ebru Elmas Gürses",
"role": "Operations | Finance",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Eren Baydemir",
"role": "Experts | Product Management",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Erim Hayretci",
"role": "Operations | Venture Fellow",
"email": null,
"investor_id": null
}
],
"sectors": [
{
"id": null,
"name": "Artificial Intelligence"
},
{
"id": null,
"name": "Cybersecurity"
},
{
"id": null,
"name": "Fintech"
},
{
"id": null,
"name": "Industrial IoT"
},
{
"id": null,
"name": "E-commerce / Marketplace"
},
{
"id": null,
"name": "Gaming / Entertainment"
},
{
"id": null,
"name": "Sustainability / Green Tech"
},
{
"id": null,
"name": "Data & Analytics"
},
{
"id": null,
"name": "Enterprise Software"
}
],
"investment_stages": [
{
"id": null,
"stage": "SEED"
},
{
"id": null,
"stage": "SERIES_A"
},
{
"id": null,
"stage": "SERIES_B"
},
{
"id": null,
"stage": "SERIES_C"
},
{
"id": null,
"stage": "GROWTH"
},
{
"id": null,
"stage": "LATE_STAGE"
}
]
}
# Investor: 301
{
"investor": {
"id": null,
"name": "301 INC",
"description": "The venture capital arm of General Mills. We invest in driven and passionate founders across the food ecosystem and partner with founder teams to help realize their ambitions.",
"aum": null,
"check_size_lower": null,
"check_size_upper": null,
"geographic_focus": "United States",
"number_of_investments": 21
},
"team_members": [
{
"id": null,
"name": "Kristen Harvey",
"role": "Managing Director, 301 INC",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Miles Swammi",
"role": "Sr. Principal, Business Development, 301 INC",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Taylor Sankovich",
"role": "Sr. Principal, Commercial Partnerships, 301 INC",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Steven Schweiger",
"role": "Principal, Investments, 301 INC",
"email": null,
"investor_id": null
}
],
"sectors": [
{
"id": null,
"name": "Food & Beverage"
},
{
"id": null,
"name": "Foodtech"
},
{
"id": null,
"name": "CPG"
},
{
"id": null,
"name": "Consumer Goods"
}
],
"investment_stages": [
{
"id": null,
"stage": "SEED"
},
{
"id": null,
"stage": "SERIES_A"
}
]
}
# Investor: 2050
{
"investor": {
"id": null,
"name": "2050",
"description": "An ecosystemic venture fund backing mission-driven founders advancing a sustainable economy. Operates via an evergreen model including 2050.do (management company), 2050.ventures (Article 9 SFDR evergreen fund) and 2050.commons. Emphasizes aligned ecosystems, open strategic resources, and portfolio-wide social/environmental impact aligned with the UN SDGs (the Five Essentials).",
"aum": 130000000,
"check_size_lower": null,
"check_size_upper": null,
"geographic_focus": "Europe, Africa",
"number_of_investments": 13
},
"team_members": [
{
"id": null,
"name": "Marie Ekeland",
"role": "Founder & CEO",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Olivier Mathiot",
"role": "General Manager",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Aude Duprat",
"role": "General Secretary",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Guillaume Bregeras",
"role": "Chief Knowledge Officer & General Manager",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Charly Berthet",
"role": "Investor",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Meyha Camara",
"role": "Communication Manager",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Diana Krantz",
"role": "Investor",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Matthieu Scetbun",
"role": "Chief Financial Officer",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Sindre Østgård",
"role": "Chief Aligner",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Éric Carreel",
"role": "Co-founder & Chairman",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Kimo Paula",
"role": "Co-founder & CCO",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Christian Couturier",
"role": "Director, Solagro",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Marieke van Iperen",
"role": "Co-founder & CEO, Settly",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Laura Beaulier",
"role": "CEO, Climate Dividends",
"email": null,
"investor_id": null
},
{
"id": null,
"name": "Arnaud Le Rodallec",
"role": "Co-founder & CPO/CTO, Fifteen",
"email": null,
"investor_id": null
}
],
"sectors": [
{
"id": null,
"name": "Climate & Sustainability"
},
{
"id": null,
"name": "Ocean / Maritime"
},
{
"id": null,
"name": "Food & Agriculture"
},
{
"id": null,
"name": "Education & Learning"
},
{
"id": null,
"name": "Human & Social Impact"
},
{
"id": null,
"name": "Climate Finance & Ecosystem Alignment"
}
],
"investment_stages": [
{
"id": null,
"stage": "SEED"
},
{
"id": null,
"stage": "SERIES_A"
},
{
"id": null,
"stage": "SERIES_B"
},
{
"id": null,
"stage": "SERIES_C"
},
{
"id": null,
"stage": "GROWTH"
}
]
}
-131
View File
@@ -1,131 +0,0 @@
"""
Migration script to update existing database schema
Converts AUM from INTEGER to TEXT and adds new columns
"""
import logging
import sqlite3
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def migrate_database(db_path="version_two.db"):
    """Migrate existing database to new schema"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    logger.info("Starting database migration...")

    try:
        # Check current schema
        cursor.execute("PRAGMA table_info(investors);")
        columns = {col[1]: col[2] for col in cursor.fetchall()}

        # 1. Convert AUM from INTEGER to TEXT
        # Note: RENAME COLUMN needs SQLite 3.25.0+ and DROP COLUMN needs 3.35.0+.
        if "aum" in columns and columns["aum"] == "INTEGER":
            logger.info("Converting AUM from INTEGER to TEXT...")
            cursor.execute("ALTER TABLE investors RENAME COLUMN aum TO aum_old;")
            cursor.execute("ALTER TABLE investors ADD COLUMN aum TEXT;")
            cursor.execute(
                "UPDATE investors SET aum = CAST(aum_old AS TEXT) WHERE aum_old IS NOT NULL;"
            )
            cursor.execute("ALTER TABLE investors DROP COLUMN aum_old;")
            logger.info("✅ AUM converted to TEXT")

        # 2. Add new columns if they don't exist
        new_columns = {
            "headquarters": "TEXT",
            "aum_as_of_date": "TEXT",
            "aum_source_url": "TEXT",
            "investment_thesis": "JSON",
            "portfolio_highlights": "JSON",
            "linked_documents": "JSON",
            "researcher_notes": "TEXT",
            "missing_important_fields": "JSON",
            "sources": "JSON",
        }
        for col_name, col_type in new_columns.items():
            if col_name not in columns:
                logger.info(f"Adding column: {col_name} ({col_type})")
                cursor.execute(
                    f"ALTER TABLE investors ADD COLUMN {col_name} {col_type};"
                )
        # 3. Add new columns to investor_members if they don't exist
        cursor.execute("PRAGMA table_info(investor_members);")
        member_columns = {col[1]: col[2] for col in cursor.fetchall()}
        if "title" not in member_columns:
            logger.info("Adding 'title' to investor_members")
            cursor.execute("ALTER TABLE investor_members ADD COLUMN title TEXT;")
        if "source_url" not in member_columns:
            logger.info("Adding 'source_url' to investor_members")
            cursor.execute("ALTER TABLE investor_members ADD COLUMN source_url TEXT;")

        # 4. Check if funds table exists
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='funds';"
        )
        if not cursor.fetchone():
            logger.info("Creating funds table...")
            cursor.execute("""
                CREATE TABLE funds (
                    id INTEGER NOT NULL PRIMARY KEY,
                    investor_id INTEGER NOT NULL,
                    fund_name VARCHAR,
                    fund_size VARCHAR,
                    fund_size_source_url VARCHAR,
                    estimated_investment_size VARCHAR,
                    source_url VARCHAR,
                    source_provider VARCHAR,
                    geographic_focus JSON,
                    investment_stage_focus JSON,
                    sector_focus JSON,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME,
                    FOREIGN KEY(investor_id) REFERENCES investors (id)
                );
            """)
            logger.info("✅ Funds table created")
        conn.commit()
        logger.info("\n🎉 Migration completed successfully!")

        # Show summary
        cursor.execute("PRAGMA table_info(investors);")
        investor_cols = cursor.fetchall()
        logger.info(f"\nInvestors table now has {len(investor_cols)} columns")
        cursor.execute("SELECT COUNT(*) FROM investors;")
        investor_count = cursor.fetchone()[0]
        logger.info(f"Investors in database: {investor_count}")
        cursor.execute("SELECT COUNT(*) FROM funds;")
        fund_count = cursor.fetchone()[0]
        logger.info(f"Funds in database: {fund_count}")
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        conn.rollback()
        raise
    finally:
        conn.close()
if __name__ == "__main__":
    import sys

    db_file = sys.argv[1] if len(sys.argv) > 1 else "version_two.db"
    print(f"Migrating database: {db_file}")
    print("⚠️ This will modify your database. Make sure you have a backup!")
    response = input("Continue? (yes/no): ")
    if response.lower() in ["yes", "y"]:
        migrate_database(db_file)
    else:
        print("Migration cancelled")
-250
View File
@@ -1,250 +0,0 @@
#!/usr/bin/env python3
"""
Migration script to update fund table schema:
1. Change geographic_focus from JSON to STRING
2. Create investment_stages table and fund_investment_stages association table
3. Create fund_sectors association table for many-to-many with sectors
4. Remove investment_stage_focus and sector_focus JSON columns
"""
import sqlite3
from pathlib import Path
def migrate_fund_relationships():
    db_path = Path(__file__).parent / "version_two.db"
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    print("🔄 Starting fund relationships migration...")

    try:
        # Step 1: Drop and recreate investment_stages table with correct schema
        print("1️⃣ Recreating investment_stages table...")
        cursor.execute("DROP TABLE IF EXISTS investment_stages")
        cursor.execute("""
            CREATE TABLE investment_stages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name VARCHAR NOT NULL UNIQUE,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME
            )
        """)

        # Insert standard investment stages
        stages = [
            "Seed",
            "Pre-Seed",
            "Series A",
            "Series B",
            "Series C",
            "Series D+",
            "Growth",
            "Late Stage",
            "IPO",
            "Venture",
            "Early Stage",
        ]
        for stage in stages:
            cursor.execute(
                """
                INSERT OR IGNORE INTO investment_stages (name) VALUES (?)
                """,
                (stage,),
            )
        print(f" ✅ Created investment_stages table with {len(stages)} stages")
        # Step 2: Create fund_investment_stages association table
        print("2️⃣ Creating fund_investment_stages association table...")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS fund_investment_stages (
                fund_id INTEGER NOT NULL,
                stage_id INTEGER NOT NULL,
                PRIMARY KEY (fund_id, stage_id),
                FOREIGN KEY (fund_id) REFERENCES funds (id) ON DELETE CASCADE,
                FOREIGN KEY (stage_id) REFERENCES investment_stages (id) ON DELETE CASCADE
            )
        """)
        print(" ✅ Created fund_investment_stages association table")

        # Step 3: Create fund_sectors association table
        print("3️⃣ Creating fund_sectors association table...")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS fund_sectors (
                fund_id INTEGER NOT NULL,
                sector_id INTEGER NOT NULL,
                PRIMARY KEY (fund_id, sector_id),
                FOREIGN KEY (fund_id) REFERENCES funds (id) ON DELETE CASCADE,
                FOREIGN KEY (sector_id) REFERENCES sectors (id) ON DELETE CASCADE
            )
        """)
        print(" ✅ Created fund_sectors association table")
        # Step 4: Get current funds table columns
        cursor.execute("PRAGMA table_info(funds)")
        columns = {col[1]: col for col in cursor.fetchall()}
        print(f"\n📊 Current funds table has {len(columns)} columns")

        # Step 5: Create new funds table with updated schema
        print("4️⃣ Creating new funds table schema...")
        cursor.execute("""
            CREATE TABLE funds_new (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                investor_id INTEGER NOT NULL,
                fund_name VARCHAR,
                fund_size INTEGER,
                fund_size_source_url VARCHAR,
                check_size_lower INTEGER,
                check_size_upper INTEGER,
                source_url VARCHAR,
                source_provider VARCHAR,
                geographic_focus VARCHAR,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME,
                FOREIGN KEY (investor_id) REFERENCES investors (id)
            )
        """)

        # Step 6: Copy data from old table to new table
        # The innermost REPLACE calls strip the leading '["' and trailing '"]';
        # the outer one turns the '", "' between items into ', ' so that
        # multi-value arrays become a plain comma-separated string. This assumes
        # the arrays were serialized with the default ', ' item separator.
        print("5️⃣ Copying data from old funds table...")
        cursor.execute("""
            INSERT INTO funds_new (
                id, investor_id, fund_name, fund_size, fund_size_source_url,
                check_size_lower, check_size_upper, source_url, source_provider,
                geographic_focus, created_at, updated_at
            )
            SELECT
                id, investor_id, fund_name, fund_size, fund_size_source_url,
                check_size_lower, check_size_upper, source_url, source_provider,
                CASE
                    WHEN geographic_focus IS NOT NULL AND geographic_focus != '[]'
                    THEN REPLACE(REPLACE(REPLACE(geographic_focus, '["', ''), '"]', ''), '", "', ', ')
                    ELSE NULL
                END as geographic_focus,
                created_at, updated_at
            FROM funds
        """)
        rows_copied = cursor.rowcount
        print(f" ✅ Copied {rows_copied} rows")
        # Step 7: Migrate investment_stage_focus data to association table
        print("6️⃣ Migrating investment stage focus data...")
        cursor.execute("""
            SELECT id, investment_stage_focus FROM funds
            WHERE investment_stage_focus IS NOT NULL AND investment_stage_focus != '[]'
        """)
        funds_with_stages = cursor.fetchall()
        stage_migrations = 0
        import json

        for fund_id, stages_json in funds_with_stages:
            if stages_json:
                try:
                    # Use a separate name so the `stages` list from Step 1 is not
                    # overwritten (the final summary reports its length).
                    fund_stages = json.loads(stages_json)
                    for stage_name in fund_stages:
                        # Find matching stage
                        cursor.execute(
                            """
                            SELECT id FROM investment_stages WHERE name = ?
                            """,
                            (stage_name,),
                        )
                        result = cursor.fetchone()
                        if result:
                            stage_id = result[0]
                            cursor.execute(
                                """
                                INSERT OR IGNORE INTO fund_investment_stages (fund_id, stage_id)
                                VALUES (?, ?)
                                """,
                                (fund_id, stage_id),
                            )
                            stage_migrations += 1
                except (json.JSONDecodeError, TypeError):
                    # Skip funds whose stage data is not a valid JSON array
                    pass
        print(f" ✅ Migrated {stage_migrations} stage relationships")
        # Step 8: Migrate sector_focus data to association table
        print("7️⃣ Migrating sector focus data...")
        cursor.execute("""
            SELECT id, sector_focus FROM funds
            WHERE sector_focus IS NOT NULL AND sector_focus != '[]'
        """)
        funds_with_sectors = cursor.fetchall()
        sector_migrations = 0
        import json

        for fund_id, sectors_json in funds_with_sectors:
            if sectors_json:
                try:
                    sectors = json.loads(sectors_json)
                    for sector_name in sectors:
                        # Find or create sector
                        cursor.execute(
                            """
                            SELECT id FROM sectors WHERE name = ?
                            """,
                            (sector_name,),
                        )
                        result = cursor.fetchone()
                        if result:
                            sector_id = result[0]
                        else:
                            cursor.execute(
                                """
                                INSERT INTO sectors (name) VALUES (?)
                                """,
                                (sector_name,),
                            )
                            sector_id = cursor.lastrowid
                        cursor.execute(
                            """
                            INSERT OR IGNORE INTO fund_sectors (fund_id, sector_id)
                            VALUES (?, ?)
                            """,
                            (fund_id, sector_id),
                        )
                        sector_migrations += 1
                except (json.JSONDecodeError, TypeError):
                    # Skip funds whose sector data is not a valid JSON array
                    pass
        print(f" ✅ Migrated {sector_migrations} sector relationships")
        # Step 9: Drop old funds table
        print("8️⃣ Dropping old funds table...")
        cursor.execute("DROP TABLE funds")

        # Step 10: Rename new table to funds
        print("9️⃣ Renaming funds_new to funds...")
        cursor.execute("ALTER TABLE funds_new RENAME TO funds")

        # Commit all changes
        conn.commit()

        print("\n✅ Migration completed successfully!")
        print("\n📝 Summary:")
        print(f" - Created investment_stages table with {len(stages)} stages")
        print(" - Created fund_investment_stages association table")
        print(" - Created fund_sectors association table")
        print(f" - Migrated {rows_copied} fund records")
        print(f" - Migrated {stage_migrations} stage relationships")
        print(f" - Migrated {sector_migrations} sector relationships")
        print(" - geographic_focus: JSON → STRING")
        print(" - investment_stage_focus: REMOVED (now in fund_investment_stages)")
        print(" - sector_focus: REMOVED (now in fund_sectors)")
    except Exception as e:
        conn.rollback()
        print(f"\n❌ Migration failed: {e}")
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    migrate_fund_relationships()
-159
View File
@@ -1,159 +0,0 @@
"""
Migration script to update FundTable schema:
- Change fund_size from VARCHAR to INTEGER
- Remove estimated_investment_size column
- Add check_size_lower INTEGER column
- Add check_size_upper INTEGER column
"""
import sys
from pathlib import Path
# Add preprocessor to path
sys.path.insert(0, str(Path(__file__).parent))
from models import engine
from sqlalchemy import text
def migrate_fund_table():
    """
    Migrate the funds table to add check_size fields and update fund_size type.

    SQLite doesn't support ALTER COLUMN directly, so we need to:
    1. Create new table with correct schema
    2. Copy data from old table
    3. Drop old table
    4. Rename new table
    """
    print("🔄 Starting fund table migration...")

    with engine.connect() as conn:
        # Start transaction
        trans = conn.begin()
        try:
            # Check if migration is needed
            result = conn.execute(text("PRAGMA table_info(funds)"))
            columns = {row[1]: row[2] for row in result}
            if "check_size_lower" in columns and "check_size_upper" in columns:
                print("✅ Migration already applied - check_size columns exist")
                return

            print("📊 Current columns:", list(columns.keys()))

            # Create new table with updated schema
            print("\n1️⃣ Creating new funds table with updated schema...")
            conn.execute(
                text("""
                    CREATE TABLE IF NOT EXISTS funds_new (
                        id INTEGER PRIMARY KEY,
                        investor_id INTEGER NOT NULL,
                        fund_name VARCHAR,
                        fund_size INTEGER,
                        fund_size_source_url VARCHAR,
                        check_size_lower INTEGER,
                        check_size_upper INTEGER,
                        source_url VARCHAR,
                        source_provider VARCHAR,
                        geographic_focus JSON,
                        investment_stage_focus JSON,
                        sector_focus JSON,
                        created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
                        updated_at DATETIME,
                        FOREIGN KEY (investor_id) REFERENCES investors(id)
                    )
                """)
            )
# Copy data from old table to new table
print("2️⃣ Copying data from old table...")
# The old estimated_investment_size column (a free-text string, when present)
# is deliberately not copied. check_size_lower and check_size_upper start as
# NULL and are repopulated when the investor CSV is re-parsed, so the same
# INSERT applies whether or not the old column exists.
conn.execute(
text("""
INSERT INTO funds_new (
id, investor_id, fund_name, fund_size, fund_size_source_url,
check_size_lower, check_size_upper,
source_url, source_provider,
geographic_focus, investment_stage_focus, sector_focus,
created_at, updated_at
)
SELECT
id, investor_id, fund_name,
CAST(fund_size AS INTEGER) as fund_size,
fund_size_source_url,
NULL as check_size_lower,
NULL as check_size_upper,
source_url, source_provider,
geographic_focus, investment_stage_focus, sector_focus,
created_at, updated_at
FROM funds
""")
)
rows_copied = conn.execute(
text("SELECT COUNT(*) FROM funds_new")
).fetchone()[0]
print(f" ✅ Copied {rows_copied} rows")
# Drop old table
print("3️⃣ Dropping old funds table...")
conn.execute(text("DROP TABLE funds"))
# Rename new table
print("4️⃣ Renaming funds_new to funds...")
conn.execute(text("ALTER TABLE funds_new RENAME TO funds"))
# Commit transaction
trans.commit()
print("\n✅ Migration completed successfully!")
print("\n📝 Summary:")
print(" - fund_size: VARCHAR → INTEGER")
print(" - estimated_investment_size: REMOVED")
print(" - check_size_lower: ADDED (INTEGER)")
print(" - check_size_upper: ADDED (INTEGER)")
print(f" - {rows_copied} fund records migrated")
print(
"\n⚠️ Note: check_size_lower and check_size_upper are NULL for existing records."
)
print(" Run the investor CSV parser again to populate these fields.")
except Exception as e:
trans.rollback()
print(f"\n❌ Migration failed: {e}")
raise
if __name__ == "__main__":
migrate_fund_table()
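The four-step rebuild described in the docstring (create, copy, drop, rename) can be tried in isolation. The sketch below is not the project's migration: it uses the standard-library `sqlite3` module on an in-memory database, a reduced three-column `funds` table, and a hypothetical helper `rebuild_funds`. It also illustrates one caveat that may matter here: SQLite's `CAST(text AS INTEGER)` keeps only a leading integer prefix, so a currency-formatted value such as `"EUR 850,000,000"` becomes `0` rather than failing. The `CASE` guard, which stores `NULL` for anything that is not purely digits, is one possible alternative and an assumption about what would be preferable.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode so that
# BEGIN / COMMIT / ROLLBACK below are issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE funds (id INTEGER PRIMARY KEY, fund_name VARCHAR, fund_size VARCHAR)"
)
conn.executemany(
    "INSERT INTO funds VALUES (?, ?, ?)",
    [(1, "Fund I", "250000000"), (2, "Fund II", "EUR 850,000,000"), (3, "Fund III", None)],
)

# What a bare CAST does with a currency-formatted string.
naive_cast = conn.execute("SELECT CAST('EUR 850,000,000' AS INTEGER)").fetchone()[0]


def rebuild_funds(conn):
    """Rebuild funds with fund_size as INTEGER inside a single transaction."""
    conn.execute("BEGIN")
    try:
        conn.execute(
            "CREATE TABLE funds_new "
            "(id INTEGER PRIMARY KEY, fund_name VARCHAR, fund_size INTEGER)"
        )
        conn.execute("""
            INSERT INTO funds_new (id, fund_name, fund_size)
            SELECT id, fund_name,
                   CASE
                       WHEN fund_size IS NOT NULL
                            AND fund_size <> ''
                            AND fund_size NOT GLOB '*[^0-9]*'
                       THEN CAST(fund_size AS INTEGER)
                   END
            FROM funds
        """)
        conn.execute("DROP TABLE funds")
        conn.execute("ALTER TABLE funds_new RENAME TO funds")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


rebuild_funds(conn)
migrated = conn.execute("SELECT id, fund_size FROM funds ORDER BY id").fetchall()
```

With the guard in place, values that cannot be read as a plain integer end up as `NULL`, which keeps them distinguishable from a genuine fund size of zero.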
-367
@@ -1,367 +0,0 @@
import enum
from typing import Annotated
from fastapi import Depends
from sqlalchemy import (
Column,
DateTime,
ForeignKey,
Integer,
String,
Table,
Text,
create_engine,
func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, declarative_mixin, relationship, sessionmaker
from sqlalchemy.types import Enum, JSON
Base = declarative_base()
# Database configuration
# DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
# Create engine
engine = create_engine("sqlite:///./version_two.db", echo=False)
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
def init_database():
"""Initialize the database by creating all tables"""
Base.metadata.create_all(bind=engine)
def get_session_sync() -> Session:
"""Get a database session for synchronous operations"""
return SessionLocal()
def get_db_session():
"""Get a database session for direct use."""
return SessionLocal()
@declarative_mixin
class TimestampMixin:
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class InvestmentStage(enum.Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
# Association table for many-to-many relationship between investors and companies
investor_company_association = Table(
"investor_companies",
Base.metadata,
Column("investor_id", Integer, ForeignKey("investors.id")),
Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for investor-sector many-to-many
investor_sector_association = Table(
"investor_sectors",
Base.metadata,
Column("investor_id", Integer, ForeignKey("investors.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
company_sector_association = Table(
"company_sector",
Base.metadata,
Column("company_id", Integer, ForeignKey("companies.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_sector_association = Table(
"project_sector",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("sector_id", Integer, ForeignKey("sectors.id")),
)
project_investor_association = Table(
"project_investors",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("investor_id", Integer, ForeignKey("investors.id")),
)
project_company_association = Table(
"project_companies",
Base.metadata,
Column("project_id", Integer, ForeignKey("projects.id")),
Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for investor-stage many-to-many
investor_stage_association = Table(
"investor_stages",
Base.metadata,
Column("investor_id", Integer, ForeignKey("investors.id")),
Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
class InvestorTable(Base, TimestampMixin):
__tablename__ = "investors"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
# Basic investor info
website = Column(String, nullable=True)
headquarters = Column(String, nullable=True)
# AUM fields
aum = Column(String, nullable=True) # Store as string to preserve currency (e.g., "EUR 850,000,000")
aum_as_of_date = Column(String, nullable=True)
aum_source_url = Column(String, nullable=True)
# Check size (deprecated in favor of fund-level data, but keeping for backward compatibility)
check_size_lower = Column(Integer, nullable=True)
check_size_upper = Column(Integer, nullable=True)
# Geographic focus (deprecated in favor of fund-level, but keeping for backward compatibility)
geographic_focus = Column(String, nullable=True)
# Investment thesis and portfolio
investment_thesis = Column(JSON, nullable=True) # Array of thesis statements
portfolio_highlights = Column(JSON, nullable=True) # Array of portfolio company names
linked_documents = Column(JSON, nullable=True) # Array of document URLs
# Research metadata
researcher_notes = Column(Text, nullable=True)
missing_important_fields = Column(JSON, nullable=True) # Array of missing field names
sources = Column(JSON, nullable=True) # JSON object with source URLs
# Portfolio info
number_of_investments = Column(Integer, nullable=True)
# Relationships
team_members = relationship("InvestorMember", back_populates="investor")
funds = relationship("FundTable", back_populates="investor", cascade="all, delete-orphan")
# Many-to-many relationship with investment stages
investment_stages = relationship(
"InvestmentStageTable",
secondary=investor_stage_association,
back_populates="investors",
)
# Relationship to portfolio companies
portfolio_companies = relationship(
"CompanyTable",
secondary=investor_company_association,
back_populates="investors",
)
sectors = relationship(
"SectorTable",
secondary=investor_sector_association,
back_populates="investors",
)
projects = relationship(
"ProjectTable",
secondary=project_investor_association,
back_populates="investors",
)
class InvestorMember(Base, TimestampMixin):
__tablename__ = "investor_members"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
role = Column(String, nullable=True)
title = Column(String, nullable=True) # Alternative to role
email = Column(String, nullable=True)
source_url = Column(String, nullable=True) # URL where member info was found
investor_id = Column(Integer, ForeignKey("investors.id"))
investor = relationship("InvestorTable", back_populates="team_members")
class FundTable(Base, TimestampMixin):
__tablename__ = "funds"
id = Column(Integer, primary_key=True, index=True)
investor_id = Column(Integer, ForeignKey("investors.id"), nullable=False)
# Fund details
fund_name = Column(String, nullable=True)
fund_size = Column(String, nullable=True) # Store as string to preserve currency
fund_size_source_url = Column(String, nullable=True)
estimated_investment_size = Column(String, nullable=True) # e.g., "EUR 1,000 to 2,000"
source_url = Column(String, nullable=True)
source_provider = Column(String, nullable=True) # e.g., "Perplexity"
# JSON array fields
geographic_focus = Column(JSON, nullable=True) # Array of regions/countries
investment_stage_focus = Column(JSON, nullable=True) # Array of stages
sector_focus = Column(JSON, nullable=True) # Array of sectors
# Relationships
investor = relationship("InvestorTable", back_populates="funds")
class InvestmentStageTable(Base, TimestampMixin):
__tablename__ = "investment_stages"
id = Column(Integer, primary_key=True, index=True)
stage = Column(Enum(InvestmentStage), nullable=False, unique=True)
# Relationship back to investors
investors = relationship(
"InvestorTable",
secondary=investor_stage_association,
back_populates="investment_stages",
)
class CompanyTable(Base, TimestampMixin):
__tablename__ = "companies"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
industry = Column(String, nullable=True)
location = Column(String, nullable=True)
description = Column(String, nullable=True)
founded_year = Column(Integer, nullable=True)
website = Column(String, nullable=True)
members = relationship("CompanyMember", back_populates="company")
# Relationship back to investors
investors = relationship(
"InvestorTable",
secondary=investor_company_association,
back_populates="portfolio_companies",
)
sectors = relationship(
"SectorTable", secondary=company_sector_association, back_populates="companies"
)
projects = relationship(
"ProjectTable",
secondary=project_company_association,
back_populates="companies",
)
class CompanyMember(Base, TimestampMixin):
__tablename__ = "company_members"
id = Column(Integer, primary_key=True)
name = Column(String)
linkedin = Column(String, nullable=True)
role = Column(String, nullable=True)
company_id = Column(Integer, ForeignKey("companies.id"), nullable=False)
company = relationship("CompanyTable", back_populates="members")
class SectorTable(Base, TimestampMixin):
__tablename__ = "sectors"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
# Add relationship back to investors
investors = relationship(
"InvestorTable",
secondary=investor_sector_association,
back_populates="sectors",
)
companies = relationship(
"CompanyTable", secondary=company_sector_association, back_populates="sectors"
)
projects = relationship(
"ProjectTable", secondary=project_sector_association, back_populates="sector"
)
class ProjectTable(Base, TimestampMixin):
__tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
valuation = Column(Integer, nullable=True)
stage = Column(Enum(InvestmentStage), nullable=True)
location = Column(String, nullable=True)
description = Column(Text, nullable=True)
start_date = Column(DateTime, nullable=True)
end_date = Column(DateTime, nullable=True)
sector = relationship(
"SectorTable", secondary=project_sector_association, back_populates="projects"
)
investors = relationship(
"InvestorTable",
secondary=project_investor_association,
back_populates="projects",
)
companies = relationship(
"CompanyTable", secondary=project_company_association, back_populates="projects"
)
-121
@@ -1,121 +0,0 @@
#!/usr/bin/env python3
"""
Quick verification script for the database
"""
from models import CompanyTable, FundTable, InvestorTable, SectorTable, get_db_session
def verify_database():
session = get_db_session()
print("=" * 60)
print("🔍 DATABASE VERIFICATION")
print("=" * 60)
# Count records
investor_count = session.query(InvestorTable).count()
company_count = session.query(CompanyTable).count()
sector_count = session.query(SectorTable).count()
fund_count = session.query(FundTable).count()
print("\n📊 Record Counts:")
print(f" Investors: {investor_count:,}")
print(f" Companies: {company_count:,}")
print(f" Sectors: {sector_count:,}")
print(f" Funds: {fund_count:,}")
# Check relationships
investors_with_companies = (
session.query(InvestorTable)
.filter(InvestorTable.portfolio_companies.any())
.count()
)
investors_with_sectors = (
session.query(InvestorTable).filter(InvestorTable.sectors.any()).count()
)
print("\n🔗 Relationships:")
print(f" Investors with portfolio companies: {investors_with_companies:,}")
print(f" Investors with sectors: {investors_with_sectors:,}")
# Sample data quality checks
investors_with_website = (
session.query(InvestorTable).filter(InvestorTable.website.isnot(None)).count()
)
investors_with_investments = (
session.query(InvestorTable)
.filter(
InvestorTable.number_of_investments.isnot(None),
InvestorTable.number_of_investments > 0,
)
.count()
)
print("\n✅ Data Quality:")
# Avoid ZeroDivisionError when the investors table is empty
pct_base = investor_count or 1
print(
f" Investors with website: {investors_with_website:,} ({investors_with_website / pct_base * 100:.1f}%)"
)
print(
f" Investors with investment count: {investors_with_investments:,} ({investors_with_investments / pct_base * 100:.1f}%)"
)
# Check for enrichment readiness
investors_with_aum = (
session.query(InvestorTable).filter(InvestorTable.aum.isnot(None)).count()
)
investors_with_headquarters = (
session.query(InvestorTable)
.filter(InvestorTable.headquarters.isnot(None))
.count()
)
investors_with_thesis = (
session.query(InvestorTable)
.filter(InvestorTable.investment_thesis.isnot(None))
.count()
)
print("\n🎯 Enrichment Status:")
print(f" Investors with AUM: {investors_with_aum:,}")
print(f" Investors with HQ: {investors_with_headquarters:,}")
print(f" Investors with thesis: {investors_with_thesis:,}")
print(f" Fund records: {fund_count:,}")
if fund_count == 0:
print("\n⚠️ No funds found - enrichment needed!")
# Show a random sample
import random
sample_investors = session.query(InvestorTable).limit(1000).all()
sample = random.sample(sample_investors, min(3, len(sample_investors)))
print("\n📋 Random Sample:")
for inv in sample:
print(f"\n {inv.name}")
print(f" Website: {inv.website or 'N/A'}")
print(f" Investments: {inv.number_of_investments or 'N/A'}")
print(f" Portfolio: {len(inv.portfolio_companies)} companies")
print(f" Sectors: {len(inv.sectors)} sectors")
if inv.funds:
print(f" Funds: {len(inv.funds)}")
session.close()
print("\n" + "=" * 60)
if fund_count == 0:
print("📝 Next step: Run enrichment script")
print(" python enrich_investors.py enriched_investors.csv")
else:
print("✅ Database is enriched and ready!")
print("=" * 60)
if __name__ == "__main__":
verify_database()
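`verify_database()` opens a session at the top and closes it only on the last lines, so an exception in any of the queries would skip `session.close()`. One common way to make the cleanup unconditional is a context manager. The sketch below is an illustration, not the project's code: `session_scope` is a hypothetical name, and a plain `sqlite3` connection stands in for the SQLAlchemy session so the example stays self-contained.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def session_scope(path=":memory:"):
    """Yield a connection, commit on success, roll back on error, always close."""
    conn = sqlite3.connect(path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


# Normal use: the connection is closed as soon as the block exits.
with session_scope() as conn:
    conn.execute("CREATE TABLE investors (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("INSERT INTO investors (name) VALUES ('Example Capital')")
    investor_count = conn.execute("SELECT COUNT(*) FROM investors").fetchone()[0]
    held = conn

# Using the connection after the block fails because it has been closed.
try:
    held.execute("SELECT 1")
    closed_after_block = False
except sqlite3.ProgrammingError:
    closed_after_block = True
```

The same shape would apply to a SQLAlchemy session, with `SessionLocal()` in place of `sqlite3.connect(...)`.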
Binary file not shown.
Binary file not shown.
-349
@@ -1,349 +0,0 @@
import asyncio
import logging
import os
from typing import Optional
from crawl4ai import AsyncWebCrawler
from web_crawler_schemas import InvestorDataScrape
from ddgs import DDGS
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from models import (
CompanyTable,
InvestmentStageTable,
InvestorMember,
InvestorTable,
SectorTable,
engine,
)
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
session = Session()
# ------------------------------------------------------------------
# Logging setup
# ------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("web_search_agent")
# ------------------------------------------------------------------
# Environment
# ------------------------------------------------------------------
load_dotenv()
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
logger.warning("OPENROUTER_API_KEY not set. LLM calls will fail if invoked.")
class QueryProcessor:
def __init__(self, sql_session: Optional[object] = None):
self.sql_session = sql_session
self.llm = ChatOpenAI(
api_key=OPENROUTER_API_KEY,
base_url="https://openrouter.ai/api/v1",
model="openai/gpt-5-nano",
temperature=0,
)
self.agent = create_react_agent(
model=self.llm,
tools=[self.crawl, self.web_search],
response_format=InvestorDataScrape,
)
self.ddg_search = DDGS()
async def fill_investor(self, investor: InvestorTable):
inv_dict = {
col.name: getattr(investor, col.name) for col in investor.__table__.columns
}
# inv_dict contains every column key, so .get() defaults never apply;
# use `or` so NULL (None) values fall back to the placeholders instead.
website = inv_dict.get("website") or "No Website"
name = inv_dict.get("name") or "Unknown"
description = inv_dict.get("description") or "No description"
aum = inv_dict.get("aum") or "Unknown"
check_size_lower = inv_dict.get("check_size_lower") or "Unknown"
check_size_upper = inv_dict.get("check_size_upper") or "Unknown"
geographic_focus = inv_dict.get("geographic_focus") or "Unknown"
number_of_investments = inv_dict.get("number_of_investments") or "Unknown"
print(website)
prompt = f"""
You are a crawler agent. You will be provided with information about a venture capital investor and their website.
Your task is to navigate the website to find and enrich the existing information.
If the website is not available, use the `web_search` tool to google the name of the investor company.
Use the `crawl` tool to visit web pages and extract information.
Current investor information:
- Name: {name}
- Website: {website}
- Description: {description}
- Assets Under Management: {aum}
- Check Size Lower: {check_size_lower}
- Check Size Upper: {check_size_upper}
- Geographic Focus: {geographic_focus}
- Number of Investments: {number_of_investments}
IMPORTANT: Investment Stages - Investors often focus on MULTIPLE stages. Look for:
- "Seed to Series A" = [SEED, SERIES_A]
- "Early stage" = [SEED, SERIES_A]
- "Growth stage" = [SERIES_B, SERIES_C, GROWTH]
- "Multi-stage" = [SEED, SERIES_A, SERIES_B, SERIES_C]
- "Late stage" = [GROWTH, LATE_STAGE]
- "Series A and B" = [SERIES_A, SERIES_B]
IMPORTANT: Additional guidance for AUM and Check Size
- "Check size" may also be written as "ticket size", "investment size", "typical investment range", or "investment amount".
- "Assets under management (AUM)" may also be called "fund size", "capital under management", or "fund raised".
- If not on the official website, search news and databases like Crunchbase, PitchBook, Dealroom, TechCrunch, PRNewswire, or EU-Startups.
- Look for numbers with currency symbols (€,$,£) followed by "M", "B", "million", or "billion".
- Example: "fund size €200M", "typical tickets $15M", "raised £1 billion".
Follow these steps:
1. Use the `crawl` tool with the main website URL to get the initial content.
2. Analyze the returned content. Look for links or sections related to the information you need (About, Team, Portfolio, Investments, Funds).
3. If you find a relevant URL, call the `crawl` tool again with that new URL to get more detailed information.
4. If AUM or check size are still missing, immediately perform 1-2 `web_search` queries such as:
- "{name} fund size site:techcrunch.com"
- "{name} ticket size site:eu-startups.com"
- "{name} raises fund site:prnewswire.com"
5. Continue this process, exploring relevant pages, until you have gathered all the required information.
6. Extract and update the following information:
- investor: Core investor data (name, description, aum, check_size_lower, check_size_upper, geographic_focus, number_of_investments)
- team_members: List of key members with name, role, and email/LinkedIn
- sectors: List of investment sectors they focus on
- investment_stages: List of ALL investment stages they focus on (can be multiple!)
7. If any information is not available or cannot be improved, leave it as null or use existing data.
Stop crawling/searching once you have found the missing information or confirmed it is not available online.
Website: {website}
"""
return prompt
async def crawl(self, url: str):
"""Tool to fetch a web page with a crawler, given its URL, and return the page content as Markdown."""
print(f"🕷️ Crawling: {url}")
try:
if url == "No Website" or not url or url.strip() == "":
return "No website provided for this investor. Please use web_search to find information."
async with AsyncWebCrawler() as crawler:
results = await crawler.arun(url)
return results.markdown[:5000] # Limit content to avoid token limits
except Exception as e:
print(f"❌ Failed to crawl {url}: {e}")
return f"Failed to crawl website: {e}. Please try web_search instead."
def web_search(self, query: str):
"""Tool to search the web using google"""
print(f"🔍 Searching: {query}")
try:
result = self.ddg_search.text(query, max_results=10, backend="google")
# Format results for better LLM consumption
formatted_results = []
for r in result:
formatted_results.append(
{
"title": r.get("title", ""),
"url": r.get("href", ""),
"snippet": r.get("body", ""),
}
)
return formatted_results
except Exception as e:
print(f"❌ Search failed: {e}")
return f"Search failed: {e}"
def needs_enrichment(investor: InvestorTable) -> bool:
"""Check if an investor needs enrichment based on missing fields"""
missing_fields = []
if not investor.description:
missing_fields.append("description")
if not investor.aum:
missing_fields.append("aum")
if not investor.check_size_lower or not investor.check_size_upper:
missing_fields.append("check_size")
if not investor.geographic_focus:
missing_fields.append("geographic_focus")
if not investor.investment_stages:
missing_fields.append("investment_stages")
if not investor.team_members:
missing_fields.append("team_members")
if missing_fields:
print(f"Investor {investor.name} missing: {', '.join(missing_fields)}")
return True
return False
def update_investor(session, investor: InvestorTable, data: InvestorDataScrape):
"""Update an InvestorTable row with extracted data, safely handling members and relationships."""
# --- Core investor info ---
if data.investor.description:
investor.description = data.investor.description
if data.investor.aum:
investor.aum = data.investor.aum
if data.investor.check_size_lower:
investor.check_size_lower = data.investor.check_size_lower
if data.investor.check_size_upper:
investor.check_size_upper = data.investor.check_size_upper
if data.investor.geographic_focus:
investor.geographic_focus = data.investor.geographic_focus
if data.investor.number_of_investments:
investor.number_of_investments = data.investor.number_of_investments
# --- Investment Stages (NEW) ---
if data.investment_stages:
# Collect the investor's current stage enum values for comparison
current_stage_enums = {stage.stage for stage in investor.investment_stages}
for stage_data in data.investment_stages:
if stage_data.stage not in current_stage_enums:
# Check if stage already exists in database
existing_stage = (
session.query(InvestmentStageTable)
.filter_by(stage=stage_data.stage)
.first()
)
if not existing_stage:
# Create new stage record
existing_stage = InvestmentStageTable(stage=stage_data.stage)
session.add(existing_stage)
session.flush() # Get the ID
# Add to investor's stages
investor.investment_stages.append(existing_stage)
# --- Team Members ---
if data.team_members:
# Index current members by normalized name (stripped, lowercased) so the
# keys match the `normalized` lookup below
current_members = {
m.name.strip().lower(): m for m in investor.team_members if m.name
}
for m in data.team_members:
if not m.name:
continue
normalized = m.name.strip().lower()
if normalized in current_members:
# Update existing member
member_obj = current_members[normalized]
if m.role:
member_obj.role = m.role
if m.email:
member_obj.email = m.email
else:
# Create new member
member_obj = InvestorMember(
name=m.name.strip(),
role=m.role,
email=m.email,
investor=investor,
)
session.add(member_obj)
# --- Sectors ---
if data.sectors:
for sector_data in data.sectors:
if not sector_data.name:
continue
# Check if sector already exists
existing_sector = (
session.query(SectorTable).filter_by(name=sector_data.name).first()
)
if not existing_sector:
existing_sector = SectorTable(name=sector_data.name)
session.add(existing_sector)
session.flush() # Get the ID
# Add relationship if not already exists
if existing_sector not in investor.sectors:
investor.sectors.append(existing_sector)
# --- Portfolio Companies ---
# if data.portfolio_companies:
# for company_data in data.portfolio_companies:
# if not company_data.name:
# continue
# # Check if company already exists
# existing_company = (
# session.query(CompanyTable).filter_by(name=company_data.name).first()
# )
# if not existing_company:
# existing_company = CompanyTable(
# name=company_data.name,
# industry=company_data.industry,
# location=company_data.location,
# description=company_data.description,
# founded_year=company_data.founded_year,
# website=company_data.website,
# )
# session.add(existing_company)
# session.flush() # Get the ID
# # Add relationship if not already exists
# if existing_company not in investor.portfolio_companies:
# investor.portfolio_companies.append(existing_company)
session.add(investor)
session.commit()
return investor
# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------
async def main():
qp = QueryProcessor(sql_session=session)
all_investors = qp.sql_session.query(InvestorTable).all() if qp.sql_session else []
# Filter investors that need enrichment
investors_to_enrich = [inv for inv in all_investors if needs_enrichment(inv)]
# print(
# f"Found {len(investors_to_enrich)} investors that need enrichment out of {len(all_investors)} total"
# )
# Process first 10 that need enrichment
for inv in investors_to_enrich[:10]:
try:
print(f"\n🔄 Processing investor: {inv.name}")
prompt = await qp.fill_investor(inv)
ai_response = await qp.agent.ainvoke({"messages": [("user", f"{prompt}")]})
extracted = ai_response["structured_response"]
# Save JSON backup
with open("enriched_investors.json", "a") as f:
f.write(f"# Investor: {inv.name}\n")
f.write(extracted.model_dump_json(indent=2) + "\n\n")
# Update database
update_investor(session, inv, extracted)
print(f"✅ Updated investor {inv.name} (id={inv.id})")
except Exception as e:
logger.error(f"Failed to enrich investor {getattr(inv, 'id', None)}: {e}")
# Reset the session so a failed flush/commit does not break later iterations
session.rollback()
continue
if __name__ == "__main__":
asyncio.run(main())
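One thing worth checking in `update_investor` is the stage comparison `stage_data.stage not in current_stage_enums`. If `stage_data.stage` is a member of the schema-side `InvestmentStage(str, Enum)` while the ORM stores members of the separate `InvestmentStage(enum.Enum)` from `models.py`, the two never compare equal, so every incoming stage would look new. Whether that is what happens depends on how `InvestorDataScrape` is defined, which is not shown here, so treat this as an assumption. The sketch uses stand-in classes `DbStage` and `SchemaStage` and a hypothetical helper `to_db_stage` to show the effect and one way to normalize.

```python
import enum


class DbStage(enum.Enum):
    """Stand-in for the ORM-side InvestmentStage (plain Enum)."""
    SEED = "SEED"
    SERIES_A = "SERIES_A"


class SchemaStage(str, enum.Enum):
    """Stand-in for the schema-side InvestmentStage (str mixin)."""
    SEED = "SEED"
    SERIES_A = "SERIES_A"


def to_db_stage(stage):
    """Map a schema-side member (or a raw string value) onto the ORM-side enum."""
    value = stage.value if isinstance(stage, enum.Enum) else stage
    return DbStage(value)


current = {DbStage.SEED}                      # stages already linked to the investor
incoming = [SchemaStage.SEED, SchemaStage.SERIES_A]

# Comparing across enum classes: SEED is wrongly reported as new.
naive_new = [s for s in incoming if s not in current]

# Normalizing first: only SERIES_A is new.
normalized_new = [to_db_stage(s) for s in incoming if to_db_stage(s) not in current]
```

Normalizing to a single enum class before the membership test keeps the "already linked" check meaningful and avoids appending the same stage to an investor more than once.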
-408
@@ -1,408 +0,0 @@
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field, field_validator
class InvestmentStage(str, Enum):
SEED = "SEED"
SERIES_A = "SERIES_A"
SERIES_B = "SERIES_B"
SERIES_C = "SERIES_C"
GROWTH = "GROWTH"
LATE_STAGE = "LATE_STAGE"
class SectorSchema(BaseModel):
"""
Expert parser: Only extract sector information if clearly identifiable.
Leave name empty if uncertain about the sector classification.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Sector ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Sector name. Leave empty string if not clearly identifiable from the data.",
)
@field_validator("name", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional id field"""
if v == 0:
return None
return v
class Config:
from_attributes = True
class InvestorMemberSchema(BaseModel):
"""
Expert parser: Only extract team member information if clearly identifiable.
Leave fields empty if uncertain about the member details.
"""
id: Optional[int] = Field(
default=None,
ge=0,
description="Member ID, must be 0 or greater. Use 0 if uncertain.",
)
name: Optional[str] = Field(
default=None,
description="Team member name. Leave empty string if not clearly identifiable.",
)
role: Optional[str] = Field(
default=None,
description="Team member role/title. Leave empty string if not clearly identifiable.",
)
email: Optional[str] = Field(
default=None,
description="Team member email. Leave empty string if not clearly identifiable or not provided.",
)
investor_id: Optional[int] = Field(
default=None,
ge=0,
description="Investor ID, must be 0 or greater. Use 0 if uncertain.",
)
@field_validator("name", "role", "email", mode="before")
@classmethod
def empty_string_to_none(cls, v):
"""Convert empty strings to None"""
if v == "" or (isinstance(v, str) and v.strip() == ""):
return None
return v
@field_validator("id", "investor_id", mode="before")
@classmethod
def zero_to_none(cls, v):
"""Convert 0 to None for optional integer fields"""
if v == 0:
return None
return v
class Config:
from_attributes = True


class CompanyMemberSchema(BaseModel):
    """
    Expert parser: Only extract company member information if clearly identifiable.
    Leave fields empty if uncertain about the member details.
    """

    id: Optional[int] = Field(
        default=None,
        ge=0,
        description="Member ID, must be 0 or greater. Use 0 if uncertain.",
    )
    name: Optional[str] = Field(
        default=None,
        description="Company member name. Leave empty if not clearly identifiable.",
    )
    linkedin: Optional[str] = Field(
        default=None,
        description="LinkedIn profile URL. Leave empty if not provided or uncertain.",
    )
    role: Optional[str] = Field(
        default=None,
        description="Company member role/title. Leave empty if not clearly identifiable.",
    )
    company_id: Optional[int] = Field(
        default=None,
        ge=0,
        description="Company ID, must be 0 or greater. Use 0 if uncertain.",
    )

    @field_validator("name", "linkedin", "role", mode="before")
    @classmethod
    def empty_string_to_none(cls, v):
        """Convert empty strings to None"""
        if v == "" or (isinstance(v, str) and v.strip() == ""):
            return None
        return v

    @field_validator("id", "company_id", mode="before")
    @classmethod
    def zero_to_none(cls, v):
        """Convert 0 to None for optional integer fields"""
        if v == 0:
            return None
        return v

    class Config:
        from_attributes = True


class CompanySchema(BaseModel):
    """
    Expert parser: Only extract company information if clearly identifiable.
    Leave optional fields empty if uncertain. Integer values must be 0 or greater.
    """

    id: Optional[int] = Field(
        default=None,
        ge=0,
        description="Company ID, must be 0 or greater. Use 0 if uncertain.",
    )
    name: Optional[str] = Field(
        default=None,
        description="Company name. Leave empty string if not clearly identifiable.",
    )
    industry: Optional[str] = Field(
        default=None,
        description="Company industry/sector. Leave empty string if not clearly identifiable.",
    )
    location: Optional[str] = Field(
        default=None,
        description="Company location/address. Leave empty string if not clearly identifiable.",
    )
    description: Optional[str] = Field(
        default=None,
        description="Company description. Leave empty if not clearly available or uncertain.",
    )
    founded_year: Optional[int] = Field(
        default=None,
        ge=0,
        description="Year company was founded, must be 0 or greater. Leave None if not clearly identifiable or uncertain.",
    )
    website: Optional[str] = Field(
        default=None,
        description="Company website URL. Leave empty if not provided or uncertain.",
    )

    @field_validator(
        "name", "industry", "location", "description", "website", mode="before"
    )
    @classmethod
    def empty_string_to_none(cls, v):
        """Convert empty strings to None"""
        if v == "" or (isinstance(v, str) and v.strip() == ""):
            return None
        return v

    @field_validator("id", "founded_year", mode="before")
    @classmethod
    def zero_to_none(cls, v):
        """Convert 0 to None for id and founded_year"""
        if v == 0:
            return None
        return v

    @field_validator("founded_year", mode="before")
    @classmethod
    def validate_founded_year(cls, v):
        """Expert parser: Only accept clearly identifiable founding years"""
        # Placeholder values are treated as "no year available".
        if v is None or v == "Not Available" or v == "" or v == "Unknown":
            return None
        if isinstance(v, str):
            # Numeric strings are accepted; anything unparseable becomes None.
            try:
                year = int(v)
                return year if year >= 0 else None
            except ValueError:
                return None
        return v if isinstance(v, int) and v >= 0 else None

    class Config:
        from_attributes = True


class InvestmentStageSchema(BaseModel):
    """
    Investment stage schema for many-to-many relationship.
    """

    id: Optional[int] = Field(
        default=None,
        ge=0,
        description="Stage ID, must be 0 or greater. Use 0 if uncertain.",
    )
    stage: InvestmentStage = Field(
        description="Investment stage enum value. Must be one of: SEED, SERIES_A, SERIES_B, SERIES_C, GROWTH, LATE_STAGE"
    )

    @field_validator("id", mode="before")
    @classmethod
    def validate_id(cls, v):
        """Convert 0 to None for optional id field"""
        if v == 0:
            return None
        return v

    class Config:
        from_attributes = True
        use_enum_values = True


class InvestorSchema(BaseModel):
    """
    Expert parser: Only extract investor information if clearly identifiable.
    Leave optional fields empty if uncertain. All numeric values must be 0 or greater.
    """

    id: Optional[int] = Field(
        default=None,
        ge=0,
        description="Investor ID, must be 0 or greater. Use 0 if uncertain.",
    )
    name: Optional[str] = Field(
        default=None,
        description="Investor name. Do not return any special characters; return just the name as a string.",
    )
    description: Optional[str] = Field(
        default=None,
        description="Investor description. Leave empty if not clearly available or uncertain.",
    )
    aum: Optional[int] = Field(
        default=None,
        ge=0,
        description="Assets Under Management in USD, must be 0 or greater. Use 0 if not clearly identifiable or uncertain.",
    )
    check_size_lower: Optional[int] = Field(
        default=None,
        ge=0,
        description="Lower bound of typical investment check size in USD, must be 0 or greater. Use 0 if not clearly identifiable.",
    )
    check_size_upper: Optional[int] = Field(
        default=None,
        ge=0,
        description="Upper bound of typical investment check size in USD, must be 0 or greater. Use 0 if not clearly identifiable.",
    )
    geographic_focus: Optional[str] = Field(
        default=None,
        description="Geographic investment focus. Do not return any special characters; return just the locations separated by commas. Leave empty if not clearly identifiable.",
    )
    number_of_investments: Optional[int] = Field(
        default=None,
        ge=0,
        description="Total number of investments made, must be 0 or greater. Use 0 if not clearly identifiable.",
    )

    @field_validator("name", "description", "geographic_focus", mode="before")
    @classmethod
    def empty_string_to_none(cls, v):
        """Convert empty strings to None"""
        if v == "" or (isinstance(v, str) and v.strip() == ""):
            return None
        return v

    @field_validator(
        "id",
        "aum",
        "check_size_lower",
        "check_size_upper",
        "number_of_investments",
        mode="before",
    )
    @classmethod
    def zero_to_none(cls, v):
        """Convert 0 to None for optional integer fields"""
        if v == 0:
            return None
        return v

    class Config:
        from_attributes = True


class InvestorData(BaseModel):
    """
    Expert parser: Comprehensive investor data schema for LLM processing.
    Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
    """

    investor: InvestorSchema = Field(
        description="Core investor information. Only populate with clearly identifiable data."
    )
    portfolio_companies: List[CompanySchema] = Field(
        default=[],
        description="List of portfolio companies. Leave empty if not clearly identifiable.",
    )
    team_members: List[InvestorMemberSchema] = Field(
        default=[],
        description="List of team members. Leave empty if not clearly identifiable.",
    )
    sectors: List[SectorSchema] = Field(
        default=[],
        description="List of investment sectors. Leave empty if not clearly identifiable.",
    )
    investment_stages: List[InvestmentStageSchema] = Field(
        default=[],
        description="List of investment stages the investor focuses on (can be multiple). Look for terms like 'seed to series A', 'early stage', 'multi-stage', etc. Leave empty if not clearly identifiable.",
    )

    class Config:
        from_attributes = True


class InvestorDataScrape(BaseModel):
    """
    Expert parser: Comprehensive investor data schema for LLM processing.
    Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
    """

    investor: InvestorSchema = Field(
        description="Core investor information. Only populate with clearly identifiable data."
    )
    team_members: List[InvestorMemberSchema] = Field(
        default=[],
        description="List of team members. Leave empty if not clearly identifiable.",
    )
    sectors: List[SectorSchema] = Field(
        default=[],
        description="List of investment sectors. Leave empty if not clearly identifiable.",
    )
    investment_stages: List[InvestmentStageSchema] = Field(
        default=[],
        description="List of investment stages the investor focuses on (can be multiple). Look for terms like 'seed to series A', 'early stage', 'multi-stage', etc. Leave empty if not clearly identifiable.",
    )

    class Config:
        from_attributes = True


class CompanyData(BaseModel):
    """
    Expert parser: Comprehensive company data schema for LLM processing.
    Only populate fields with clearly identifiable information. Leave lists empty if uncertain.
    """

    company: CompanySchema = Field(
        description="Core company information. Only populate with clearly identifiable data."
    )
    sectors: List[SectorSchema] = Field(
        default=[],
        description="List of company sectors. Leave empty if not clearly identifiable.",
    )
    members: List[CompanyMemberSchema] = Field(
        default=[],
        description="List of company members. Leave empty if not clearly identifiable.",
    )
    investors: List[InvestorSchema] = Field(
        default=[],
        description="List of investors. Leave empty if not clearly identifiable.",
    )

    class Config:
        from_attributes = True


class InvestorList(BaseModel):
    """Expert parser: List of investors with clearly identifiable information only."""

    investors: List[InvestorData] = Field(
        default=[],
        description="List of investors. Leave empty if no clearly identifiable investors.",
    )
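
The schemas above all apply the same normalisation idea: placeholder values that an LLM returns for "uncertain" (an empty string, `0`, `"Unknown"`) are mapped to `None` before validation. The sketch below restates that rule with plain functions so it can be read without Pydantic. The helper names and the `PLACEHOLDERS` set are hypothetical and not part of the deleted module, and `parse_founded_year` is a slightly stricter variant of the original validators (it also rejects booleans and the string `"0"`).

```python
from typing import Any, Optional

# Hypothetical placeholder set, taken from the strings checked in validate_founded_year.
PLACEHOLDERS = {"", "Not Available", "Unknown"}


def blank_to_none(value: Any) -> Any:
    """Return None for empty or whitespace-only strings, else the value unchanged."""
    if isinstance(value, str) and value.strip() == "":
        return None
    return value


def zero_to_none(value: Any) -> Any:
    """Treat 0 as the 'uncertain' sentinel and map it to None."""
    if value == 0:
        return None
    return value


def parse_founded_year(value: Any) -> Optional[int]:
    """Return a positive integer year, or None when the input is not a usable year."""
    if isinstance(value, str):
        value = value.strip()
        if value in PLACEHOLDERS:
            return None
        try:
            value = int(value)
        except ValueError:
            return None
    # bool is a subclass of int, so it is excluded explicitly (an assumption of this sketch).
    if isinstance(value, int) and not isinstance(value, bool) and value > 0:
        return value
    return None
```

Keeping the sentinel handling in one place like this makes it easier to see which raw inputs end up as `NULL` in the database.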
-123
@@ -1,123 +0,0 @@
#!/usr/bin/env python3
"""
Quick verification script to test the new fund relationship schema
"""
import sys
sys.path.insert(0, "/home/oluwasanmi/Documents/Work/MKD/anton_wireframe/preprocessor")
from models import FundTable, InvestmentStageTable, SectorTable, get_db_session


def test_fund_relationships():
    """Test the new fund relationship schema"""
    db = get_db_session()
    try:
        print("🧪 Testing Fund Relationship Schema\n")

        # Test 1: Check investment stages
        print("1️⃣ Investment Stages:")
        stages = db.query(InvestmentStageTable).all()
        print(f" Found {len(stages)} stages:")
        for stage in stages[:5]:
            print(f" - {stage.name}")
        print()

        # Test 2: Check fund with relationships
        print("2️⃣ Sample Fund with Relationships:")
        fund = db.query(FundTable).filter(FundTable.fund_name.isnot(None)).first()
        if fund:
            print(f" Fund: {fund.fund_name}")
            print(f" Geographic Focus: {fund.geographic_focus}")
            print(f" Investment Stages ({len(fund.investment_stages)}):")
            for stage in fund.investment_stages[:3]:
                print(f" - {stage.name}")
            print(f" Sectors ({len(fund.sectors)}):")
            for sector in fund.sectors[:3]:
                print(f" - {sector.name}")
        else:
            print(" No funds found")
        print()

        # Test 3: Check association tables
        print("3️⃣ Association Table Stats:")
        # Count fund-stage relationships
        from sqlalchemy import text

        result = db.execute(text("SELECT COUNT(*) FROM fund_investment_stages"))
        stage_count = result.scalar()
        print(f" Fund-Stage relationships: {stage_count}")
        # Count fund-sector relationships
        result = db.execute(text("SELECT COUNT(*) FROM fund_sectors"))
        sector_count = result.scalar()
        print(f" Fund-Sector relationships: {sector_count}")
        print()

        # Test 4: Query funds by stage
        print("4️⃣ Query Test - Funds with 'Series A' stage:")
        series_a_funds = (
            db.query(FundTable)
            .join(FundTable.investment_stages)
            .filter(InvestmentStageTable.name.ilike("%Series A%"))
            .limit(3)
            .all()
        )
        print(f" Found {len(series_a_funds)} funds:")
        for fund in series_a_funds:
            print(f" - {fund.fund_name or 'Unnamed'}")
            stages = [s.name for s in fund.investment_stages]
            print(f" Stages: {', '.join(stages)}")
        print()

        # Test 5: Query funds by sector
        print("5️⃣ Query Test - Funds investing in first sector:")
        first_sector = db.query(SectorTable).first()
        if first_sector:
            sector_funds = (
                db.query(FundTable)
                .join(FundTable.sectors)
                .filter(SectorTable.id == first_sector.id)
                .limit(3)
                .all()
            )
            print(f" Sector: {first_sector.name}")
            print(f" Found {len(sector_funds)} funds:")
            for fund in sector_funds:
                print(f" - {fund.fund_name or 'Unnamed'}")
        print()

        # Test 6: Geographic focus string search
        print("6️⃣ Query Test - Funds with Europe in geographic focus:")
        europe_funds = (
            db.query(FundTable)
            .filter(FundTable.geographic_focus.ilike("%Europe%"))
            .limit(3)
            .all()
        )
        print(f" Found {len(europe_funds)} funds:")
        for fund in europe_funds:
            print(f" - {fund.fund_name or 'Unnamed'}")
            print(f" Geographic Focus: {fund.geographic_focus}")
        print()

        print("✅ All tests completed successfully!")
    finally:
        # Release the session even if one of the queries above raises.
        db.close()


if __name__ == "__main__":
    try:
        test_fund_relationships()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback

        traceback.print_exc()
        # Exit non-zero so a shell or CI job can detect the failure.
        sys.exit(1)
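
The verification script above exercises two query styles: a join through the `fund_investment_stages` association table, and a substring match on the comma-separated `geographic_focus` string. A minimal, self-contained sketch of both, using the standard-library `sqlite3` module instead of the project's SQLAlchemy models, might look like this. The `funds` table name, the column names `fund_id` and `stage_id`, and the sample rows are assumptions made for illustration; only `investment_stages`, `fund_investment_stages`, `fund_name`, and `geographic_focus` appear in the source.

```python
import sqlite3

# In-memory database with an assumed layout for the three tables involved.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE funds (
        id INTEGER PRIMARY KEY,
        fund_name TEXT,
        geographic_focus TEXT
    );
    CREATE TABLE investment_stages (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE
    );
    CREATE TABLE fund_investment_stages (
        fund_id INTEGER REFERENCES funds(id),
        stage_id INTEGER REFERENCES investment_stages(id),
        PRIMARY KEY (fund_id, stage_id)
    );
    INSERT INTO funds VALUES (1, 'Alpha Fund', 'Europe, North America');
    INSERT INTO funds VALUES (2, 'Beta Fund', 'Asia');
    INSERT INTO investment_stages VALUES (1, 'Seed');
    INSERT INTO investment_stages VALUES (2, 'Series A');
    INSERT INTO fund_investment_stages VALUES (1, 1);
    INSERT INTO fund_investment_stages VALUES (1, 2);
    INSERT INTO fund_investment_stages VALUES (2, 1);
    """
)


def funds_by_stage(connection, stage_name):
    """Return fund names linked to the given stage via the association table."""
    rows = connection.execute(
        """
        SELECT f.fund_name
        FROM funds AS f
        JOIN fund_investment_stages AS fis ON fis.fund_id = f.id
        JOIN investment_stages AS s ON s.id = fis.stage_id
        WHERE s.name = ?
        ORDER BY f.fund_name
        """,
        (stage_name,),
    ).fetchall()
    return [row[0] for row in rows]


def funds_by_region(connection, region):
    """Return fund names whose comma-separated geographic_focus contains the region."""
    rows = connection.execute(
        "SELECT fund_name FROM funds WHERE geographic_focus LIKE ? ORDER BY fund_name",
        (f"%{region}%",),
    ).fetchall()
    return [row[0] for row in rows]
```

The stage lookup is an exact match on a normalised row, whereas the region lookup is a substring match, so a search for a region whose name is contained in another region's name would also match that other region.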