1 Commits

37 changed files with 296 additions and 28641 deletions
+2 -1
View File
@@ -10,7 +10,8 @@
*__pycache__
/*.db
*.cypython
nohup.out
/preprocessor
Binary file not shown.
Binary file not shown.
Binary file not shown.
-6
View File
@@ -1,5 +1,4 @@
import os
from pathlib import Path
from typing import Annotated
from fastapi import Depends
@@ -10,10 +9,6 @@ from sqlalchemy.orm import Session, sessionmaker
Base = declarative_base()
# Database configuration
# Use the preprocessor's database for consistency
# Get absolute path to the preprocessor database
# APP_DIR = Path(__file__).parent.parent
# PREPROCESSOR_DB = APP_DIR.parent / "preprocessor" / "version_two.db"
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
# Create engine
@@ -43,7 +38,6 @@ def get_session_sync() -> Session:
"""Get a database session for synchronous operations"""
return SessionLocal()
def get_db_session():
    """Create a new database session for direct use.

    A fresh session is built from ``SessionLocal`` on every call and handed
    back as-is; this function does not close it.
    """
    session = SessionLocal()
    return session
+10 -138
View File
@@ -2,7 +2,7 @@ import enum
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table, Text, func
from sqlalchemy.orm import declarative_mixin, relationship
from sqlalchemy.types import JSON, Enum
from sqlalchemy.types import Enum
from db.db import Base
@@ -70,22 +70,6 @@ project_company_association = Table(
Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for the fund <-> investment-stage many-to-many link.
# Each row pairs a ``funds.id`` with an ``investment_stages.id``; it is used as
# the ``secondary`` table of FundTable.investment_stages and
# InvestmentStageTable.funds.
fund_investment_stages_association = Table(
    "fund_investment_stages",
    Base.metadata,
    Column("fund_id", Integer, ForeignKey("funds.id")),
    Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for the fund <-> sector many-to-many link.
# Each row pairs a ``funds.id`` with a ``sectors.id``; it is used as the
# ``secondary`` table of FundTable.sectors and SectorTable.funds.
fund_sectors_association = Table(
    "fund_sectors",
    Base.metadata,
    Column("fund_id", Integer, ForeignKey("funds.id")),
    Column("sector_id", Integer, ForeignKey("sectors.id")),
)
class InvestorTable(Base, TimestampMixin):
__tablename__ = "investors"
@@ -93,47 +77,14 @@ class InvestorTable(Base, TimestampMixin):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
# Basic investor info
website = Column(String, nullable=True)
headquarters = Column(String, nullable=True)
# AUM fields
aum = Column(Integer, nullable=True) # Store as integer for numerical filtering
aum_as_of_date = Column(String, nullable=True)
aum_source_url = Column(String, nullable=True)
# Check size (deprecated in favor of fund-level data, but keeping for backward compatibility)
check_size_lower = Column(Integer, nullable=True)
check_size_upper = Column(Integer, nullable=True)
# Geographic focus (deprecated in favor of fund-level, but keeping for backward compatibility)
aum = Column(Integer, nullable=True) # Assets Under Management
check_size_lower = Column(Integer, nullable=True) # Lower bound
check_size_upper = Column(Integer, nullable=True) # Upper bound
geographic_focus = Column(String, nullable=True)
# Investment thesis and portfolio
investment_thesis = Column(JSON, nullable=True) # Array of thesis statements
portfolio_highlights = Column(
JSON, nullable=True
) # Array of portfolio company names
linked_documents = Column(JSON, nullable=True) # Array of document URLs
# Research metadata
researcher_notes = Column(Text, nullable=True)
missing_important_fields = Column(
JSON, nullable=True
) # Array of missing field names
sources = Column(JSON, nullable=True) # JSON object with source URLs
# Portfolio info
stage_focus = Column(Enum(InvestmentStage), nullable=True)
number_of_investments = Column(Integer, default=0, nullable=True)
# Relationships
team_members = relationship(
"InvestorMember", back_populates="investor", cascade="all, delete-orphan"
)
funds = relationship(
"FundTable", back_populates="investor", cascade="all, delete-orphan"
)
team_members = relationship("InvestorMember", back_populates="investor")
# Relationship to portfolio companies
portfolio_companies = relationship(
@@ -160,51 +111,12 @@ class InvestorMember(Base, TimestampMixin):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
role = Column(String, nullable=True)
title = Column(String, nullable=True) # Alternative to role
email = Column(String, nullable=True)
source_url = Column(String, nullable=True) # URL where member info was found
investor_id = Column(Integer, ForeignKey("investors.id"))
investor = relationship("InvestorTable", back_populates="team_members")
class FundTable(Base, TimestampMixin):
    """ORM model for the ``funds`` table.

    Every fund belongs to exactly one investor (``investor_id`` is a required
    foreign key) and is linked to investment stages and sectors through
    association tables.
    """

    __tablename__ = "funds"

    id = Column(Integer, primary_key=True, index=True)
    investor_id = Column(Integer, ForeignKey("investors.id"), nullable=False)

    # --- Fund details ---
    fund_name = Column(String, nullable=True)
    # Kept as an integer so the value can be filtered numerically.
    fund_size = Column(Integer, nullable=True)
    fund_size_source_url = Column(String, nullable=True)

    # --- Check size range (parsed from estimated_investment_size by LLM) ---
    check_size_lower = Column(Integer, nullable=True)
    check_size_upper = Column(Integer, nullable=True)
    source_url = Column(String, nullable=True)
    source_provider = Column(String, nullable=True)  # e.g., "Perplexity"

    # --- Geographic focus, stored as a simple string ---
    geographic_focus = Column(String, nullable=True)

    # --- Relationships ---
    investor = relationship("InvestorTable", back_populates="funds")
    investment_stages = relationship(
        "InvestmentStageTable",
        back_populates="funds",
        secondary=fund_investment_stages_association,
    )
    sectors = relationship(
        "SectorTable",
        back_populates="funds",
        secondary=fund_sectors_association,
    )
class CompanyTable(Base, TimestampMixin):
__tablename__ = "companies"
@@ -216,9 +128,7 @@ class CompanyTable(Base, TimestampMixin):
founded_year = Column(Integer, nullable=True)
website = Column(String, nullable=True)
members = relationship(
"CompanyMember", back_populates="company", cascade="all, delete-orphan"
)
members = relationship("CompanyMember", back_populates="company")
# Relationship back to investors
investors = relationship(
"InvestorTable",
@@ -248,43 +158,26 @@ class CompanyMember(Base, TimestampMixin):
company = relationship("CompanyTable", back_populates="members")
class InvestmentStageTable(Base, TimestampMixin):
    """ORM model for the ``investment_stages`` table (uniquely named stages)."""

    __tablename__ = "investment_stages"

    id = Column(Integer, primary_key=True, index=True)
    # Stage names are required and must be unique across the table.
    name = Column(String, nullable=False, unique=True)

    # Funds targeting this stage, via the fund/stage association table.
    funds = relationship(
        "FundTable",
        back_populates="investment_stages",
        secondary=fund_investment_stages_association,
    )
class SectorTable(Base, TimestampMixin):
    """ORM model for the ``sectors`` table.

    A sector is linked many-to-many to investors, companies, projects and
    funds, each through its own association table.
    """

    __tablename__ = "sectors"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)

    # --- Relationships (each one is the reverse side of a many-to-many) ---
    investors = relationship(
        "InvestorTable",
        back_populates="sectors",
        secondary=investor_sector_association,
    )
    companies = relationship(
        "CompanyTable",
        back_populates="sectors",
        secondary=company_sector_association,
    )
    # The project side names its attribute ``sector`` (singular).
    projects = relationship(
        "ProjectTable",
        back_populates="sector",
        secondary=project_sector_association,
    )
    funds = relationship(
        "FundTable",
        back_populates="sectors",
        secondary=fund_sectors_association,
    )
class ProjectTable(Base, TimestampMixin):
@@ -311,24 +204,3 @@ class ProjectTable(Base, TimestampMixin):
companies = relationship(
"CompanyTable", secondary=project_company_association, back_populates="projects"
)
class InvestorInsightCache(Base, TimestampMixin):
    """ORM model for the ``investor_insight_cache`` table.

    Holds the cached insight texts for an investor; ``investor_id`` is unique,
    so there is at most one cache row per investor.
    """

    __tablename__ = "investor_insight_cache"

    id = Column(Integer, primary_key=True, index=True)
    investor_id = Column(
        Integer,
        ForeignKey("investors.id"),
        unique=True,
        nullable=False,
    )

    # --- Cached insight texts (both required) ---
    investment_pattern_analysis = Column(Text, nullable=False)
    market_position = Column(Text, nullable=False)

    # --- Cache management: the database fills this in on insert ---
    last_refreshed = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # One-directional link to the owning investor (no back_populates).
    investor = relationship("InvestorTable")
+9 -48
View File
@@ -5,15 +5,8 @@ from db.db import Base, db_dependency, engine
from dotenv import load_dotenv
from fastapi import FastAPI, File, Form, UploadFile
from pydantic import BaseModel
from routers import (
companies,
folk_crm,
insight_route,
investors,
projects,
report_route,
)
from schemas.router_schemas import InvestmentResponse, PaginatedResponse
from routers import companies, investors, projects
from schemas.router_schemas import InvestorList
from services.llm_parser import InvestorProcessor
from services.querying import QueryProcessor
@@ -51,27 +44,6 @@ def health():
async def parse_csv(
db: db_dependency, file: UploadFile = File(...), is_investor: int = Form(...)
):
"""
Parse and import CSV data into the database.
**For investors:**
- Expected columns: Name, Website, Final Investor Profile, Final Profile sourcing
- Manually parses JSON profiles for efficiency
- Uses LLM only for currency conversion to USD
- Handles AUM, fund sizes, and check sizes as integers
**For companies:**
- Expected columns: Name, Website, Investor, Final Investor Profile (company profile)
- 100% manual JSON parsing - no LLM needed
- Extracts company details, executives, investors, and client categories
- Automatically links companies to investors in database
**Benefits:**
- Fast processing (5-10s per record)
- Low cost (minimal or no LLM usage)
- Accurate data extraction
- Automatic database persistence
"""
# Read uploaded CSV with pandas
content = await file.read()
df = pd.read_csv(io.StringIO(content.decode("utf-8")))
@@ -80,27 +52,19 @@ async def parse_csv(
processor = InvestorProcessor()
if is_investor == 1:
# Manual parser with LLM currency conversion
results = await processor.parse_investors(df, save_to_db=True)
# Results are already dicts from the new parser
return results
results = await processor.parse_investors(df)
else:
# Manual parser for companies (no LLM needed)
results = await processor.parse_companies(df, save_to_db=True)
# Results are already dicts from the new parser
return results
results = await processor.parse_companies(df)
# Convert Pydantic objects to dictionaries
return [r.model_dump() for r in results]
@app.post(
"/query", response_model=PaginatedResponse[InvestmentResponse], tags=["Querying"]
)
@app.post("/query", response_model=InvestorList, tags=["Querying"])
async def query_investors(request: QueryRequest):
"""
Query investors using natural language.
Returns fund-level matches (one row per fund) with investor details.
This ensures only relevant funds are included in the response.
Supports queries like:
- "Show me seed stage investors"
- "Find fintech investors in Silicon Valley"
@@ -115,11 +79,8 @@ async def query_investors(request: QueryRequest):
app.include_router(investors.router)
app.include_router(companies.router)
app.include_router(projects.router)
app.include_router(folk_crm.router)
app.include_router(insight_route.router)
app.include_router(report_route.router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app="main:app", host="0.0.0.0", port=8585)
uvicorn.run(app="main:app", host="0.0.0.0", port=8585, reload=True)
Binary file not shown.
Binary file not shown.
+14 -53
View File
@@ -1,10 +1,10 @@
from typing import Optional
from typing import List, Optional
from db.db import get_db
from db.models import CompanyTable, InvestorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import CompanyData, PaginatedResponse
from schemas.router_schemas import CompanyData
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Company Routes"])
@@ -29,34 +29,20 @@ class CompanyUpdate(BaseModel):
website: Optional[str] = None
@router.get("/companies", response_model=PaginatedResponse[CompanyData])
def read_companies(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Get all companies with their investor relationships (paginated)"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count
total_count = (
db.query(CompanyTable)
.filter(CompanyTable.name.isnot(None), CompanyTable.description.isnot(None))
.count()
)
# Get paginated results
@router.get("/companies", response_model=List[CompanyData])
def read_companies(db: Session = Depends(get_db)):
"""Get all companies with their investor relationships"""
companies = (
db.query(CompanyTable)
.filter(CompanyTable.name.isnot(None), CompanyTable.description.isnot(None))
.filter(
CompanyTable.name.isnot(None),
CompanyTable.description.isnot(None)
)
.options(
selectinload(CompanyTable.investors),
selectinload(CompanyTable.members),
selectinload(CompanyTable.sectors),
)
.offset(offset)
.limit(page_size)
.all()
)
@@ -71,19 +57,10 @@ def read_companies(
)
company_data_list.append(company_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=company_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
return company_data_list
@router.get("/companies/filter", response_model=PaginatedResponse[CompanyData])
@router.get("/companies/filter", response_model=List[CompanyData])
def filter_companies(
industry: Optional[str] = Query(
None, description="Filter by industry (partial match)"
@@ -99,11 +76,9 @@ def filter_companies(
investor_name: Optional[str] = Query(
None, description="Filter by investor name (partial match)"
),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Filter companies based on various criteria (paginated)"""
"""Filter companies based on various criteria"""
# Start with base query
query = db.query(CompanyTable).options(
@@ -137,12 +112,7 @@ def filter_companies(
InvestorTable.name.ilike(f"%{investor_name}%")
)
# Get total count before pagination
total_count = query.count()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
companies = query.offset(offset).limit(page_size).all()
companies = query.all()
# Transform to CompanyData format
company_data_list = []
@@ -155,16 +125,7 @@ def filter_companies(
)
company_data_list.append(company_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=company_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
return company_data_list
@router.get("/companies/{company_id}", response_model=CompanyData)
-190
View File
@@ -1,190 +0,0 @@
from typing import List
from db.db import get_db
from db.models import InvestorTable
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from services.crm import folk
from sqlalchemy.orm import Session, selectinload
router = APIRouter(prefix="/folk", tags=["Folk CRM"])
# Response item for GET /folk/groups: one Folk group, reduced to id and name.
class GroupResponse(BaseModel):
    id: str  # Folk group identifier
    name: str  # group name as returned by Folk
# Request body for POST /folk/sync-investors.
class SyncInvestorsRequest(BaseModel):
    investor_ids: List[int]  # database IDs of the investors to sync
    group_id: str  # Folk group the investors are added to
# Outcome for one investor that was created as a company in Folk.
class SyncResult(BaseModel):
    # Local (database) side
    investor_id: int
    investor_name: str
    # Folk side
    company_id: str
    company_name: str
    # Team members created as Folk people for this company
    team_members_synced: int
    person_ids: List[str]
# Summary returned by POST /folk/sync-investors.
class SyncInvestorsResponse(BaseModel):
    success: bool  # True when at least one investor was synced
    synced_count: int  # number of entries in ``results``
    results: List[SyncResult]  # one entry per synced investor
    errors: List[dict]  # free-form error records collected during the sync
@router.get("/groups", response_model=List[GroupResponse])
def get_folk_groups():
    """Get all groups from Folk CRM.

    Returns a list of groups with their id and name that can be used
    to sync investors to Folk.

    Raises:
        HTTPException: 500 when the Folk call fails or its payload cannot be
            converted; the original exception is attached as the cause.
    """
    try:
        groups_data = folk.get_groups()
        # The groups are read from payload["data"]["items"]; missing keys
        # yield an empty list.
        items = groups_data.get("data", {}).get("items", [])
        return [GroupResponse(id=item["id"], name=item["name"]) for item in items]
    except Exception as e:
        # Chain the original exception so the root cause stays visible in
        # tracebacks and logs instead of being replaced by the HTTP error.
        raise HTTPException(
            status_code=500, detail=f"Failed to fetch groups from Folk: {e}"
        ) from e
def _split_full_name(full_name):
    """Split a full name into ``(first_name, last_name)`` at the first whitespace run."""
    parts = full_name.split(maxsplit=1)
    if not parts:
        # Empty or whitespace-only name: pass it through unchanged as first name.
        return full_name, ""
    return parts[0], (parts[1] if len(parts) > 1 else "")


def _create_folk_person(member, company_id, group_id):
    """Create one team member as a Folk person; return the new person id (may be None)."""
    first_name, last_name = _split_full_name(member.name)

    # getattr with a default tolerates member models without these optional
    # attributes, as the hasattr checks did before.
    source_url = getattr(member, "source_url", None)
    # Prefer the title; fall back to the role; None when both are empty.
    job_title = getattr(member, "title", None) or getattr(member, "role", None) or None

    person_data = folk.create_person(
        first_name=first_name,
        last_name=last_name,
        email=member.email,
        company_id=company_id,
        group_id=group_id,
        urls=[source_url] if source_url else None,
        jobTitle=job_title,
    )
    return person_data.get("data", {}).get("id")


@router.post("/sync-investors", response_model=SyncInvestorsResponse)
def sync_investors_to_folk(
    request: SyncInvestorsRequest, db: Session = Depends(get_db)
):
    """Sync investors to Folk CRM as companies with their team members as people.

    Takes a list of investor IDs and a Folk group ID, then:
    1. Creates each investor as a company in the specified Folk group
    2. Creates each team member as a person linked to that company

    Failures are collected per investor / per team member in ``errors`` and do
    not abort the rest of the sync. Requested IDs that do not exist in the
    database are reported in ``errors`` as well.

    Args:
        request: Body with ``investor_ids`` (database IDs) and ``group_id``
            (Folk group the investors are added to).
        db: Database session.

    Returns:
        Summary of sync operation including successes and errors.

    Raises:
        HTTPException: 404 when none of the requested IDs matches an investor.
    """
    # Fetch investors with their team members
    investors = (
        db.query(InvestorTable)
        .options(
            selectinload(InvestorTable.team_members),
            selectinload(InvestorTable.sectors),
        )
        .filter(InvestorTable.id.in_(request.investor_ids))
        .all()
    )

    if not investors:
        raise HTTPException(
            status_code=404, detail="No investors found with the provided IDs"
        )

    results = []
    errors = []

    # Report requested IDs that matched no row instead of dropping them
    # silently (dict.fromkeys de-duplicates while keeping request order).
    found_ids = {investor.id for investor in investors}
    for requested_id in dict.fromkeys(request.investor_ids):
        if requested_id not in found_ids:
            errors.append(
                {
                    "investor_id": requested_id,
                    "investor_name": None,
                    "error": "Investor not found",
                }
            )

    for investor in investors:
        try:
            # Create company in Folk
            company_data = folk.create_company(
                name=investor.name,
                group_id=request.group_id,
                website=investor.website,
                description=investor.description,
                addresses=[investor.headquarters] if investor.headquarters else None,
            )

            company_id = company_data.get("data", {}).get("id")
            if not company_id:
                errors.append(
                    {
                        "investor_id": investor.id,
                        "investor_name": investor.name,
                        "error": "No company ID returned from Folk API",
                    }
                )
                continue

            # Create team members as people; only members for which Folk
            # returned an id count as synced.
            person_ids = []
            for member in investor.team_members:
                try:
                    person_id = _create_folk_person(
                        member, company_id, request.group_id
                    )
                except Exception as person_error:
                    # Record the failure but continue with the other members
                    errors.append(
                        {
                            "investor_id": investor.id,
                            "investor_name": investor.name,
                            "team_member_name": member.name,
                            "error": f"Failed to create person: {str(person_error)}",
                        }
                    )
                    continue
                if person_id:
                    person_ids.append(person_id)

            results.append(
                SyncResult(
                    investor_id=investor.id,
                    investor_name=investor.name,
                    company_id=company_id,
                    company_name=company_data.get("data", {}).get(
                        "name", investor.name
                    ),
                    team_members_synced=len(person_ids),
                    person_ids=person_ids,
                )
            )

        except Exception as e:
            # Best-effort sync: one failing investor must not abort the rest.
            errors.append(
                {
                    "investor_id": investor.id,
                    "investor_name": investor.name,
                    "error": str(e),
                }
            )

    return SyncInvestorsResponse(
        success=len(results) > 0,
        synced_count=len(results),
        results=results,
        errors=errors,
    )
-122
View File
@@ -1,122 +0,0 @@
from datetime import datetime, timedelta
from typing import Optional
from db.db import get_db
from db.models import InvestorInsightCache, InvestorTable, ProjectTable
from fastapi import APIRouter, Depends, HTTPException
from schemas.insight_schema import InsightResponse
from services.compatibility_score import (
calculate_project_investor_compatibility,
generate_compatibility_explanation,
)
from services.insight import QueryProcessor
from sqlalchemy.orm import Session
router = APIRouter()
@router.get(
    "/insights/{investor_id}", response_model=InsightResponse, tags=["Insights"]
)
async def get_insights(
    investor_id: int, project_id: Optional[int] = None, db: Session = Depends(get_db)
):
    """
    Get investor insights including investment pattern analysis, market position,
    and optionally compatibility score with a project.

    Insights are cached per investor and regenerated when no cache row exists
    or the existing one is older than 30 days.

    Args:
        investor_id: The ID of the investor to analyze
        project_id: Optional project ID to calculate compatibility score
        db: Database session

    Returns:
        InsightResponse with investment_pattern_analysis, market_position,
        and compatibility_score (a placeholder message when no project_id
        is provided)

    Raises:
        HTTPException: 404 when the investor, or a provided project, does not
            exist.
    """
    # Get investor from database
    investor = db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
    if not investor:
        raise HTTPException(
            status_code=404, detail=f"Investor with id {investor_id} not found"
        )

    # Check if we have cached insights
    cached_insights = (
        db.query(InvestorInsightCache)
        .filter(InvestorInsightCache.investor_id == investor_id)
        .first()
    )

    # Determine if cache needs refresh (older than 30 days)
    needs_refresh = True
    if cached_insights:
        # "Now" is taken in the same tzinfo as the stored timestamp so aware
        # and naive values are never mixed.
        # NOTE(review): if the backend returns naive datetimes (tzinfo None),
        # datetime.now(None) is local time - confirm the stored value is in
        # local time too, otherwise the age is off by the UTC offset.
        cache_age = (
            datetime.now(cached_insights.last_refreshed.tzinfo)
            - cached_insights.last_refreshed
        )
        needs_refresh = cache_age > timedelta(days=30)

    # Fetch new insights if needed
    if needs_refresh:
        query_processor = QueryProcessor()

        # Get investment pattern analysis and market position
        insights = await query_processor.get_investor_insights(
            investor_name=investor.name,
            investor_website=investor.website,
            investor_description=investor.description,
            investor_headquarters=investor.headquarters,
            investment_thesis=investor.investment_thesis,
            portfolio_highlights=investor.portfolio_highlights,
        )

        if cached_insights:
            # Update existing cache
            cached_insights.investment_pattern_analysis = insights[
                "investment_pattern_analysis"
            ]
            cached_insights.market_position = insights["market_position"]
            cached_insights.last_refreshed = datetime.now(
                cached_insights.last_refreshed.tzinfo
            )
        else:
            # Create new cache entry
            cached_insights = InvestorInsightCache(
                investor_id=investor_id,
                investment_pattern_analysis=insights["investment_pattern_analysis"],
                market_position=insights["market_position"],
            )
            db.add(cached_insights)

        db.commit()
        db.refresh(cached_insights)

    # Compare against None rather than truthiness so that project_id=0 counts
    # as "provided" (and yields a 404) instead of being silently ignored.
    if project_id is not None:
        project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
        if not project:
            raise HTTPException(
                status_code=404, detail=f"Project with id {project_id} not found"
            )

        # Calculate the compatibility score
        score = calculate_project_investor_compatibility(
            project, investor, use_funds=True
        )

        # Generate detailed explanation
        compatibility_score = generate_compatibility_explanation(
            project, investor, score, use_funds=True
        )
    else:
        compatibility_score = "Select a project to see compatibility analysis"

    return InsightResponse(
        investment_pattern_analysis=cached_insights.investment_pattern_analysis,
        market_position=cached_insights.market_position,
        compatibility_score=compatibility_score,
    )
+113 -437
View File
@@ -1,18 +1,11 @@
from typing import Optional
from typing import List, Optional
from db.db import get_db
from db.models import FundTable, InvestorTable, ProjectTable, SectorTable
from db.models import InvestorTable, SectorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import (
CompanyMinimal,
InvestmentResponse,
InvestmentStage,
InvestorData,
PaginatedResponse,
SectorMinimal,
)
from services.compatibility_score import calculate_project_investor_compatibility
from schemas.router_schemas import InvestmentStage, InvestorData
from services.querying import QueryProcessor
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Investor Routes"])
@@ -22,152 +15,53 @@ router = APIRouter(tags=["Investor Routes"])
# Request body for creating an investor. Only description, website and
# headquarters may be omitted; number_of_investments defaults to 0.
class InvestorCreate(BaseModel):
    # Identity
    name: str
    description: Optional[str] = None
    website: Optional[str] = None
    headquarters: Optional[str] = None

    # Investment profile (all required on creation)
    aum: int
    check_size_lower: int
    check_size_upper: int
    geographic_focus: str
    stage_focus: InvestmentStage

    number_of_investments: int = 0
# Request body for updating an investor. Every field is optional and
# defaults to None.
class InvestorUpdate(BaseModel):
    # Identity
    name: Optional[str] = None
    description: Optional[str] = None
    website: Optional[str] = None
    headquarters: Optional[str] = None

    # Investment profile
    aum: Optional[int] = None
    check_size_lower: Optional[int] = None
    check_size_upper: Optional[int] = None
    geographic_focus: Optional[str] = None
    stage_focus: Optional[InvestmentStage] = None

    number_of_investments: Optional[int] = None
@router.get("/investors", response_model=PaginatedResponse[InvestmentResponse])
def read_investors(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
project_id: Optional[int] = Query(
None, description="Optional project ID for compatibility scoring"
),
db: Session = Depends(get_db),
):
"""Get all investors with their funds as separate entries (paginated)
Each investor-fund combination is returned as a separate row.
An investor with 3 funds will appear as 3 entries.
If project_id is provided, calculates compatibility scores for each investor.
"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count
total_count = db.query(InvestorTable).count()
# Load project if project_id provided
project = None
if project_id is not None:
project = (
db.query(ProjectTable)
.options(selectinload(ProjectTable.sector))
.filter(ProjectTable.id == project_id)
.first()
)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get paginated results
@router.get("/investors", response_model=List[InvestorData])
def read_investors(db: Session = Depends(get_db)):
"""Get all investors with their related data"""
investors = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds).selectinload(FundTable.investment_stages),
selectinload(InvestorTable.funds).selectinload(FundTable.sectors),
)
.offset(offset)
.limit(page_size)
.all()
)
# Transform to InvestmentResponse format (one row per investor-fund combination)
investment_responses = []
# Transform InvestorTable objects to InvestorData format
investor_data_list = []
for investor in investors:
# Calculate compatibility score if project provided
compatibility_score = 1.0
if project is not None:
compatibility_score = calculate_project_investor_compatibility(
project=project, investor=investor, use_funds=True
)
investor_data = InvestorData(
investor=investor, # This maps to InvestorSchema
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# If investor has funds, create one entry per fund
if investor.funds:
for fund in investor.funds:
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
else:
# If no funds, create one entry with null fund fields
investment_response = InvestmentResponse(
id=investor.id,
name=investor.name,
aum=investor.aum,
check_size_lower=None,
check_size_upper=None,
geographic_focus=None,
stage_focus=None,
portfolio_companies=portfolio_companies,
sectors=[],
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
return investor_data_list
@router.get("/investors/filter", response_model=PaginatedResponse[InvestmentResponse])
@router.get("/investors/filter", response_model=List[InvestorData])
def filter_investors(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by investment stage"
@@ -180,145 +74,67 @@ def filter_investors(
sector: Optional[str] = Query(None, description="Sector name (partial match)"),
min_aum: Optional[int] = Query(None, description="Minimum AUM"),
max_aum: Optional[int] = Query(None, description="Maximum AUM"),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
project_id: Optional[int] = Query(
None, description="Optional project ID for compatibility scoring"
),
db: Session = Depends(get_db),
):
"""Filter investors based on various criteria (paginated)
"""Filter investors based on various criteria"""
Returns investor-fund combinations as separate rows.
Queries the funds table to find matching funds.
If project_id is provided, calculates compatibility scores for each investor.
"""
# Load project if project_id provided
project = None
if project_id is not None:
project = (
db.query(ProjectTable)
.options(selectinload(ProjectTable.sector))
.filter(ProjectTable.id == project_id)
.first()
)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Start with base query on funds table
query = db.query(FundTable).options(
selectinload(FundTable.investor).selectinload(
InvestorTable.portfolio_companies
),
selectinload(FundTable.investor).selectinload(InvestorTable.team_members),
selectinload(FundTable.investor).selectinload(InvestorTable.sectors),
selectinload(FundTable.investment_stages),
selectinload(FundTable.sectors),
# Start with base query
query = db.query(InvestorTable).options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
# Apply filters at fund level
# Apply filters
if stage:
query = query.filter(InvestorTable.stage_focus == stage)
if min_check_size is not None:
query = query.filter(FundTable.check_size_lower >= min_check_size)
query = query.filter(InvestorTable.check_size_lower >= min_check_size)
if max_check_size is not None:
query = query.filter(FundTable.check_size_upper <= max_check_size)
query = query.filter(InvestorTable.check_size_upper <= max_check_size)
if geography:
query = query.filter(FundTable.geographic_focus.ilike(f"%{geography}%"))
query = query.filter(InvestorTable.geographic_focus.ilike(f"%{geography}%"))
# Apply filters at investor level (through relationship)
if min_aum is not None:
query = query.join(FundTable.investor).filter(InvestorTable.aum >= min_aum)
query = query.filter(InvestorTable.aum >= min_aum)
if max_aum is not None:
if min_aum is None: # Only join if not already joined
query = query.join(FundTable.investor)
query = query.filter(InvestorTable.aum <= max_aum)
# Filter by sector if provided (at fund level)
# Filter by sector if provided
if sector:
query = query.join(FundTable.sectors).filter(
query = query.join(InvestorTable.sectors).filter(
SectorTable.name.ilike(f"%{sector}%")
)
# Get total count before pagination
total_count = query.count()
investors = query.all()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
funds = query.offset(offset).limit(page_size).all()
# Transform to InvestmentResponse format (one row per fund)
investment_responses = []
for fund in funds:
investor = fund.investor
# Calculate compatibility score if project provided
compatibility_score = 1.0
if project is not None:
compatibility_score = calculate_project_investor_compatibility(
project=project, investor=investor, use_funds=True
)
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
# Transform to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
return investor_data_list
@router.get("/investors/{investor_id}", response_model=InvestorData)
def read_investor(investor_id: int, db: Session = Depends(get_db)):
"""Get a specific investor by ID with all their funds"""
"""Get a specific investor by ID"""
investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
)
.filter(InvestorTable.id == investor_id)
.first()
@@ -327,13 +143,12 @@ def read_investor(investor_id: int, db: Session = Depends(get_db)):
if not investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Transform to InvestorData format (includes funds array)
# Transform to InvestorData format
return InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
funds=investor.funds,
)
@@ -352,7 +167,6 @@ def create_investor(investor: InvestorCreate, db: Session = Depends(get_db)):
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
)
.filter(InvestorTable.id == db_investor.id)
.first()
@@ -364,91 +178,24 @@ def create_investor(investor: InvestorCreate, db: Session = Depends(get_db)):
portfolio_companies=investor_with_relations.portfolio_companies,
team_members=investor_with_relations.team_members,
sectors=investor_with_relations.sectors,
funds=investor_with_relations.funds,
)
@router.put("/investors/{investor_id}", response_model=InvestorData)
def update_investor(
    investor_id: int, investor: InvestorUpdate, db: Session = Depends(get_db)
):
    """Update an existing investor and return it with its related data.

    Only the fields present in the request payload are written to the row.

    Args:
        investor_id: ID of the investor to update.
        investor: Partial update payload.
        db: Database session injected by FastAPI.

    Returns:
        InvestorData built from the reloaded investor together with its
        portfolio companies, team members, sectors and funds.

    Raises:
        HTTPException: 404 if no investor with ``investor_id`` exists.
    """
    db_investor = (
        db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
    )
    if not db_investor:
        raise HTTPException(status_code=404, detail="Investor not found")

    # exclude_unset leaves omitted fields untouched instead of overwriting
    # them. model_dump() is the Pydantic v2 replacement for the deprecated
    # .dict(); the schemas in this project use v2 config keys.
    update_data = investor.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_investor, field, value)

    db.commit()
    db.refresh(db_investor)

    # Reload with relationships eagerly loaded so the response can be built
    # without further lazy loads.
    investor_with_relations = (
        db.query(InvestorTable)
        .options(
            selectinload(InvestorTable.portfolio_companies),
            selectinload(InvestorTable.team_members),
            selectinload(InvestorTable.sectors),
            selectinload(InvestorTable.funds),
        )
        .filter(InvestorTable.id == investor_id)
        .first()
    )

    # Transform to InvestorData format
    return InvestorData(
        investor=investor_with_relations,
        portfolio_companies=investor_with_relations.portfolio_companies,
        team_members=investor_with_relations.team_members,
        sectors=investor_with_relations.sectors,
        funds=investor_with_relations.funds,
    )
@router.delete("/investors/{investor_id}")
def delete_investor(investor_id: int, db: Session = Depends(get_db)):
    """Remove the investor with the given ID.

    Returns a confirmation message on success and raises a 404
    HTTPException when no investor has that ID.
    """
    lookup = db.query(InvestorTable).filter(InvestorTable.id == investor_id)
    target = lookup.first()

    if target:
        db.delete(target)
        db.commit()
        return {"message": "Investor deleted successfully"}

    raise HTTPException(status_code=404, detail="Investor not found")
@router.get(
"/investors/{investor_id}/similar",
response_model=PaginatedResponse[InvestmentResponse],
)
@router.get("/investors/{investor_id}/similar", response_model=List[InvestorData])
def find_similar_investors(
investor_id: int,
investor_id: int,
limit: int = Query(10, description="Maximum number of similar investors to return"),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
db: Session = Depends(get_db)
):
"""Find investors similar to a given investor based on characteristics (paginated)
Returns investor-fund combinations as separate rows.
Queries the funds table to find matching funds.
"""
# Get the target investor to get their funds for comparison
"""Find investors similar to a given investor based on characteristics"""
# Get the target investor
target_investor = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds).selectinload(FundTable.investment_stages),
selectinload(InvestorTable.funds).selectinload(FundTable.sectors),
)
.filter(InvestorTable.id == investor_id)
.first()
@@ -457,149 +204,78 @@ def find_similar_investors(
if not target_investor:
raise HTTPException(status_code=404, detail="Investor not found")
# Get target investor's sector IDs for comparison (from their funds)
target_sector_ids = set()
target_stage_ids = set()
target_check_ranges = []
target_geographies = []
# Get target investor's sector IDs for comparison
target_sector_ids = {sector.id for sector in target_investor.sectors}
for fund in target_investor.funds:
if fund.sectors:
target_sector_ids.update({sector.id for sector in fund.sectors})
if fund.investment_stages:
target_stage_ids.update({stage.id for stage in fund.investment_stages})
if fund.check_size_lower and fund.check_size_upper:
target_check_ranges.append((fund.check_size_lower, fund.check_size_upper))
if fund.geographic_focus:
target_geographies.append(fund.geographic_focus.lower())
# Query all funds from other investors
candidate_funds = (
db.query(FundTable)
# Query all other investors with their relationships
candidates = (
db.query(InvestorTable)
.options(
selectinload(FundTable.investor).selectinload(
InvestorTable.portfolio_companies
),
selectinload(FundTable.investor).selectinload(InvestorTable.team_members),
selectinload(FundTable.investor).selectinload(InvestorTable.sectors),
selectinload(FundTable.investment_stages),
selectinload(FundTable.sectors),
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.join(FundTable.investor)
.filter(InvestorTable.id != investor_id)
.all()
)
# Calculate similarity scores for each fund
scored_funds = []
for fund in candidate_funds:
# Calculate similarity scores
scored_investors = []
for candidate in candidates:
score = 0
# Stage focus match (30 points)
if candidate.stage_focus == target_investor.stage_focus:
score += 30
# Geographic focus match (20 points for exact, 10 for partial)
if fund.geographic_focus and target_geographies:
fund_geo_lower = fund.geographic_focus.lower()
for target_geo in target_geographies:
if fund_geo_lower == target_geo:
score += 20
break
elif fund_geo_lower in target_geo or target_geo in fund_geo_lower:
score += 10
break
if candidate.geographic_focus and target_investor.geographic_focus:
if candidate.geographic_focus.lower() == target_investor.geographic_focus.lower():
score += 20
elif (candidate.geographic_focus.lower() in target_investor.geographic_focus.lower() or
target_investor.geographic_focus.lower() in candidate.geographic_focus.lower()):
score += 10
# Check size overlap (20 points max)
if fund.check_size_lower and fund.check_size_upper and target_check_ranges:
max_overlap_score = 0
for target_lower, target_upper in target_check_ranges:
overlap_start = max(fund.check_size_lower, target_lower)
overlap_end = min(fund.check_size_upper, target_upper)
if overlap_end > overlap_start:
overlap = overlap_end - overlap_start
target_range = target_upper - target_lower
overlap_ratio = overlap / target_range if target_range > 0 else 0
max_overlap_score = max(max_overlap_score, int(20 * overlap_ratio))
score += max_overlap_score
if (candidate.check_size_lower and candidate.check_size_upper and
target_investor.check_size_lower and target_investor.check_size_upper):
# Calculate overlap percentage
overlap_start = max(candidate.check_size_lower, target_investor.check_size_lower)
overlap_end = min(candidate.check_size_upper, target_investor.check_size_upper)
if overlap_end > overlap_start:
overlap = overlap_end - overlap_start
target_range = target_investor.check_size_upper - target_investor.check_size_lower
overlap_ratio = overlap / target_range if target_range > 0 else 0
score += int(20 * overlap_ratio)
# AUM similarity (15 points max)
if fund.investor.aum and target_investor.aum:
aum_diff = abs(fund.investor.aum - target_investor.aum)
max_aum = max(fund.investor.aum, target_investor.aum)
if candidate.aum and target_investor.aum:
aum_diff = abs(candidate.aum - target_investor.aum)
max_aum = max(candidate.aum, target_investor.aum)
similarity_ratio = 1 - (aum_diff / max_aum) if max_aum > 0 else 0
score += int(15 * similarity_ratio)
# Sector overlap (30 points max)
if fund.sectors and target_sector_ids:
fund_sector_ids = {sector.id for sector in fund.sectors}
common_sectors = target_sector_ids.intersection(fund_sector_ids)
candidate_sector_ids = {sector.id for sector in candidate.sectors}
if target_sector_ids and candidate_sector_ids:
common_sectors = target_sector_ids.intersection(candidate_sector_ids)
overlap_ratio = len(common_sectors) / len(target_sector_ids)
score += int(30 * overlap_ratio)
# Investment stage match (15 points max)
if fund.investment_stages and target_stage_ids:
fund_stage_ids = {stage.id for stage in fund.investment_stages}
common_stages = target_stage_ids.intersection(fund_stage_ids)
overlap_ratio = len(common_stages) / len(target_stage_ids)
score += int(15 * overlap_ratio)
if score > 0: # Only include funds with some similarity
scored_funds.append((score, fund))
# Sort by score (descending) and take top N based on limit
scored_funds.sort(key=lambda x: x[0], reverse=True)
top_similar = scored_funds[:limit]
# Apply pagination to the top similar funds
total_count = len(top_similar)
offset = (page - 1) * page_size
paginated_similar = top_similar[offset : offset + page_size]
similar_funds = [fund for score, fund in paginated_similar]
# Transform to InvestmentResponse format (one row per fund)
investment_responses = []
for fund in similar_funds:
investor = fund.investor
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
if score > 0: # Only include investors with some similarity
scored_investors.append((score, candidate))
# Sort by score (descending) and take top N
scored_investors.sort(key=lambda x: x[0], reverse=True)
similar_investors = [inv for score, inv in scored_investors[:limit]]
# Transform to InvestorData format
return [
InvestorData(
investor=inv,
portfolio_companies=inv.portfolio_companies,
team_members=inv.team_members,
sectors=inv.sectors,
)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
)
investment_responses.append(investment_response)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
for inv in similar_investors
]
+8 -47
View File
@@ -14,26 +14,14 @@ from schemas.project_schemas import (
ProjectData,
ProjectUpdate,
)
from schemas.router_schemas import PaginatedResponse
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Project Routes"])
@router.get("/projects", response_model=PaginatedResponse[ProjectData])
def read_projects(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Get all projects with their related data (paginated)"""
# Calculate offset
offset = (page - 1) * page_size
# Get total count
total_count = db.query(ProjectTable).count()
# Get paginated results
@router.get("/projects", response_model=List[ProjectData])
def read_projects(db: Session = Depends(get_db)):
"""Get all projects with their related data"""
projects = (
db.query(ProjectTable)
.options(
@@ -41,8 +29,6 @@ def read_projects(
selectinload(ProjectTable.investors),
selectinload(ProjectTable.companies),
)
.offset(offset)
.limit(page_size)
.all()
)
@@ -57,16 +43,7 @@ def read_projects(
)
project_data_list.append(project_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=project_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
return project_data_list
@router.get("/projects/{project_id}", response_model=ProjectData)
@@ -174,7 +151,7 @@ def delete_project(project_id: int, db: Session = Depends(get_db)):
return {"message": "Project deleted successfully"}
@router.get("/projects/filter", response_model=PaginatedResponse[ProjectData])
@router.get("/projects/filter", response_model=List[ProjectData])
def filter_projects(
stage: Optional[InvestmentStage] = Query(
None, description="Filter by project stage"
@@ -189,11 +166,9 @@ def filter_projects(
company_name: Optional[str] = Query(
None, description="Company name (partial match)"
),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
db: Session = Depends(get_db),
):
"""Filter projects based on various criteria (paginated)"""
"""Filter projects based on various criteria"""
# Start with base query
query = db.query(ProjectTable).options(
@@ -230,12 +205,7 @@ def filter_projects(
CompanyTable.name.ilike(f"%{company_name}%")
)
# Get total count before pagination
total_count = query.count()
# Calculate offset and apply pagination
offset = (page - 1) * page_size
projects = query.offset(offset).limit(page_size).all()
projects = query.all()
# Transform to ProjectData format
project_data_list = []
@@ -248,16 +218,7 @@ def filter_projects(
)
project_data_list.append(project_data)
# Calculate total pages
total_pages = (total_count + page_size - 1) // page_size
return PaginatedResponse(
items=project_data_list,
total=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
)
return project_data_list
# Association management routes
-121
View File
@@ -1,121 +0,0 @@
from typing import Optional
from db.db import get_db
from db.models import FundTable, InvestorTable, ProjectTable
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import Response
from services.report_gen import ReportGenerator
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Report Generation"])
def _report_attachment_filename(investor_name):
    """Return a header-safe ``<name>_Report.pdf`` filename for an investor.

    Spaces become underscores. Characters that would break the quoted
    ``filename="..."`` parameter (double quotes, backslashes, slashes,
    control characters) and non-ASCII characters are replaced as well.
    """
    # NOTE(review): the framework is expected to encode header values as
    # latin-1, so a non-ASCII name could otherwise fail at response time.
    cleaned = "".join(
        "_"
        if ch == " " or not ch.isascii() or not ch.isprintable() or ch in '"\\/'
        else ch
        for ch in (investor_name or "investor")
    )
    return f"{cleaned}_Report.pdf"


@router.get("/report/investor/{investor_id}")
async def generate_investor_report(
    investor_id: int,
    project_id: Optional[int] = Query(
        None, description="Optional project ID for compatibility analysis"
    ),
    db: Session = Depends(get_db),
):
    """
    Generate a PDF report for an investor profile.

    Args:
        investor_id: The ID of the investor to generate a report for
        project_id: Optional project ID to include mandate match analysis
        db: Database session injected by FastAPI

    Returns:
        PDF file as a downloadable response

    Raises:
        HTTPException: 404 if the investor, or the requested project,
            does not exist.
    """
    # Fetch investor data with all relationships
    investor = (
        db.query(InvestorTable)
        .options(
            selectinload(InvestorTable.portfolio_companies),
            selectinload(InvestorTable.team_members),
            selectinload(InvestorTable.sectors),
            selectinload(InvestorTable.funds).selectinload(FundTable.investment_stages),
            selectinload(InvestorTable.funds).selectinload(FundTable.sectors),
        )
        .filter(InvestorTable.id == investor_id)
        .first()
    )
    if not investor:
        raise HTTPException(status_code=404, detail="Investor not found")

    # Prepare investor data dictionary
    investor_data = {
        "name": investor.name,
        "description": investor.description,
        "website": investor.website,
        "headquarters": investor.headquarters,
        "aum": investor.aum,
        "geographic_focus": investor.geographic_focus,
        "portfolio_highlights": investor.portfolio_highlights or [],
        "investment_thesis": investor.investment_thesis or [],
        "sectors": [sector.name for sector in investor.sectors],
        "team_members": [
            {
                "name": member.name,
                "role": member.role,
                "title": member.title,
                "email": member.email,
            }
            for member in investor.team_members
        ],
        "check_size_lower": None,
        "check_size_upper": None,
        "investment_stages": [],
    }

    # Get check sizes and stages from funds
    if investor.funds:
        # Check sizes come from the first fund only; they are not aggregated.
        first_fund = investor.funds[0]
        investor_data["check_size_lower"] = first_fund.check_size_lower
        investor_data["check_size_upper"] = first_fund.check_size_upper

        # Stage names are de-duplicated across all funds.
        stages = {
            stage.name
            for fund in investor.funds
            for stage in fund.investment_stages
        }
        investor_data["investment_stages"] = list(stages)

    # Fetch project data if project_id is provided. Compare against None so
    # that an explicit ID of 0 is looked up rather than silently ignored.
    project_data = None
    if project_id is not None:
        project = (
            db.query(ProjectTable)
            .options(selectinload(ProjectTable.sector))
            .filter(ProjectTable.id == project_id)
            .first()
        )
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")
        project_data = {
            "name": project.name,
            "description": project.description,
            "location": project.location,
            "valuation": project.valuation,
            "stage": project.stage.name if project.stage else None,
            "sectors": [sector.name for sector in project.sector],
        }

    # Generate PDF report
    report_generator = ReportGenerator()
    pdf_bytes = await report_generator.generate_investor_report(
        investor_data, project_data
    )

    # Return PDF as downloadable file
    filename = _report_attachment_filename(investor.name)
    return Response(
        content=pdf_bytes,
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
Binary file not shown.
-18
View File
@@ -1,18 +0,0 @@
from typing import Optional
from pydantic import BaseModel
# Response body carrying generated insight text about an investor.
class InsightResponse(BaseModel):
    investment_pattern_analysis: str  # Free-text analysis of investment patterns
    market_position: str  # Free-text summary of market position
    # Optional score, carried as a string (the example below uses "0.85").
    compatibility_score: Optional[str] = None

    class Config:
        # Example payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "investment_pattern_analysis": "Sequoia has been increasingly active in AI/ML startups (43% increase in last 18 months). Their average investment size has grown 23% year-over-year, indicating confidence in larger rounds. Peak activity in Q2-Q3, suggesting seasonal investment patterns.",
                "market_position": "Top 3 most active VC in enterprise software deals. Strong presence in unicorn companies (47 portfolio unicorns). Consistently leads or co-leads rounds, indicating decision-making influence.",
                "compatibility_score": "0.85",
            }
        }
+1 -1
View File
@@ -30,7 +30,7 @@ class InvestorSchema(BaseModel):
check_size_lower: int | None
check_size_upper: int | None
geographic_focus: str | None
stage_focus: Optional[InvestmentStage] = None
stage_focus: InvestmentStage
number_of_investments: int | None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
+4
View File
@@ -258,6 +258,10 @@ class InvestorSchema(BaseModel):
default=None,
description="Geographic investment focus. Do not return any special characters, Just locations separated by commas. Leave empty if not clearly identifiable.",
)
stage_focus: InvestmentStage = Field(
default=InvestmentStage.SEED,
description="Investment stage focus. Use SEED as default if uncertain.",
)
number_of_investments: Optional[int] = Field(
default=None,
ge=0,
+7 -168
View File
@@ -1,12 +1,9 @@
from datetime import datetime
from enum import Enum
from typing import Any, Generic, List, Optional, TypeVar
from typing import List, Optional
from pydantic import BaseModel
# Generic type for pagination
T = TypeVar("T")
class InvestmentStage(str, Enum):
SEED = "SEED"
@@ -25,14 +22,6 @@ class SectorSchema(BaseModel):
from_attributes = True
# Serialized form of an investment stage: its id and name.
class InvestmentStageSchema(BaseModel):
    id: int
    name: str

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class InvestorMemberSchema(BaseModel):
id: int
name: str
@@ -43,25 +32,6 @@ class InvestorMemberSchema(BaseModel):
from_attributes = True
# Serialized form of a single fund, including its stages and sectors.
class FundSchema(BaseModel):
    id: int
    fund_name: str | None
    fund_size: int | None  # Integer so it can be filtered numerically
    fund_size_source_url: str | None
    check_size_lower: int | None  # Lower bound of check size range
    check_size_upper: int | None  # Upper bound of check size range
    source_url: str | None
    source_provider: str | None
    geographic_focus: str | None  # A single string rather than a list of strings
    investment_stages: List[InvestmentStageSchema] | None  # Related stage records
    sectors: List[SectorSchema] | None  # Related sector records
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class CompanyMemberSchema(BaseModel):
id: int
name: Optional[str]
@@ -92,20 +62,11 @@ class InvestorSchema(BaseModel):
id: int
name: str
description: Optional[str]
website: Optional[str] = None
headquarters: Optional[str] = None
aum: int | None
aum_as_of_date: str | None = None
aum_source_url: str | None = None
check_size_lower: int | None
check_size_upper: int | None
geographic_focus: str | None
investment_thesis: Any = (
None # Flexible JSON field - can be list, dict, or list of dicts
)
portfolio_highlights: Any = (
None # Flexible JSON field - can be list, dict, or list of dicts
)
stage_focus: InvestmentStage
number_of_investments: int | None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@@ -115,82 +76,22 @@ class InvestorSchema(BaseModel):
class InvestorData(BaseModel):
"""Comprehensive investor data schema - used for individual investor requests"""
"""Comprehensive investor data schema for LLM processing"""
investor: InvestorSchema
portfolio_companies: List[CompanySchema]
team_members: List[InvestorMemberSchema]
sectors: List[SectorSchema]
funds: List[FundSchema]
class Config:
from_attributes = True
class InvestorFundData(BaseModel):
    """Investor-Fund combined data - used for list/filter requests

    Each row represents one investor-fund combination.
    An investor with 3 funds will appear as 3 separate entries.
    """

    # Investor fields
    investor_id: int
    investor_name: str
    investor_description: Optional[str]
    investor_website: Optional[str]
    investor_headquarters: Optional[str]
    aum: int | None
    aum_as_of_date: str | None
    aum_source_url: str | None
    investment_thesis: Any = None  # Flexible JSON field
    portfolio_highlights: Any = None  # Flexible JSON field
    number_of_investments: int | None

    # Fund fields
    fund_id: int | None
    fund_name: str | None
    fund_size: int | None  # Integer so it can be filtered numerically
    fund_size_source_url: str | None
    check_size_lower: int | None  # Lower bound of check size range
    check_size_upper: int | None  # Upper bound of check size range
    geographic_focus: str | None  # A single string rather than a list of strings
    fund_investment_stages: (
        List[InvestmentStageSchema] | None
    )  # Related stage records of the fund
    fund_sectors: List[SectorSchema] | None  # Related sector records of the fund

    # Related data
    portfolio_companies: List[CompanySchema]
    team_members: List[InvestorMemberSchema]
    sectors: List[SectorSchema]

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class InvestorMinimal(BaseModel):
    """Minimal investor info with just id and name"""

    id: int
    name: str

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
# Reduced company representation: identity plus basic descriptive fields.
class CompanySchemaMinimal(BaseModel):
    id: int
    name: str
    industry: str | None
    location: str | None
    founded_year: Optional[int]
    website: Optional[str]

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class CompanyData(BaseModel): # Renamed from CompaniesData for consistency
company: CompanySchemaMinimal
investors: List[InvestorMinimal]
company: CompanySchema
sectors: List[SectorSchema]
members: List[CompanyMemberSchema]
investors: List[InvestorSchema]
class Config:
from_attributes = True
@@ -198,65 +99,3 @@ class CompanyData(BaseModel): # Renamed from CompaniesData for consistency
class InvestorList(BaseModel):
investors: List[InvestorData]
class InvestorFundList(BaseModel):
    """List of investor-fund combinations"""

    # One entry per investor-fund combination.
    investor_funds: List[InvestorFundData]
class CompanyMinimal(BaseModel):
    """Minimal company info with just id and name"""

    id: int
    name: str

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class SectorMinimal(BaseModel):
    """Minimal sector info with just id and name"""

    id: int
    name: str

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class InvestmentResponse(BaseModel):
    """Simplified investment response schema

    One row per investor-fund combination with streamlined data
    """

    id: int  # Investor ID
    name: (
        str  # Combination of investor name and fund name (e.g., "Investor A - Fund A")
    )
    aum: int | None  # From investor
    check_size_lower: int | None  # From fund
    check_size_upper: int | None  # From fund
    geographic_focus: str | None  # From fund
    stage_focus: str | None  # Comma-separated stages from fund
    portfolio_companies: List[CompanyMinimal]  # Top 3 companies from investor
    sectors: List[SectorMinimal]  # Top 3 sectors from fund
    compatibility_score: float  # 0 to 1 (default 1 for now)

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
class PaginatedResponse(BaseModel, Generic[T]):
    """Generic paginated response schema"""

    items: List[T]  # Items on the current page
    total: int  # Total number of matching items across all pages
    page: int  # Current page number
    page_size: int  # Number of items requested per page
    total_pages: int  # Number of pages for `total` items at `page_size` per page

    class Config:
        # Allow the schema to be populated from object attributes.
        from_attributes = True
Binary file not shown.
Binary file not shown.
-646
View File
@@ -1,646 +0,0 @@
"""
Compatibility Score Service
This module calculates compatibility scores between projects and investors.
The scoring system evaluates multiple dimensions to determine how well a project
matches with an investor's investment criteria.
"""
from typing import List, Optional, Tuple
from db.models import FundTable, InvestorTable, ProjectTable
def calculate_project_investor_compatibility(
    project: ProjectTable, investor: InvestorTable, use_funds: bool = True
) -> float:
    """
    Score how well an investor matches a project.

    When ``use_funds`` is true and the investor has funds, every fund is
    scored against the project and the best fund score is returned.
    Otherwise the investor's own attributes are scored directly.

    Args:
        project: The project to evaluate
        investor: The investor to compare against
        use_funds: Score against the investor's funds when any exist

    Returns:
        A score between 0 and 1, where 1 is perfect match

    Scoring breakdown (out of 100 points):
    - Investment Stage Match: 30 points
    - Sector Overlap: 30 points
    - Geographic Match: 20 points
    - Valuation/Check Size Fit: 20 points
    """
    if not (use_funds and investor.funds):
        # No fund data to use: fall back to investor-level attributes.
        return _calculate_project_investor_direct_compatibility(project, investor)

    per_fund_scores = [
        _calculate_project_fund_compatibility(project, fund)
        for fund in investor.funds
    ]
    # 0.0 is the floor, so the result is never below zero.
    return max([0.0, *per_fund_scores])
def calculate_project_investors_compatibility(
    project: ProjectTable, investors: List[InvestorTable], use_funds: bool = True
) -> List[Tuple[InvestorTable, float]]:
    """
    Rank several investors by their compatibility with one project.

    Args:
        project: The project to evaluate
        investors: Investors to score against the project
        use_funds: Passed through to the per-investor scoring function

    Returns:
        ``(investor, score)`` tuples ordered from highest to lowest score
    """
    pairs = [
        (
            investor,
            calculate_project_investor_compatibility(project, investor, use_funds),
        )
        for investor in investors
    ]
    # Stable sort: investors with equal scores keep their input order.
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)
def _calculate_project_fund_compatibility(
    project: ProjectTable, fund: FundTable
) -> float:
    """
    Score how well one fund fits a project.

    Four components are summed out of 100 points and then normalised:
    - Investment stage: 30 for a match, otherwise whatever
      ``_calculate_stage_proximity`` awards
    - Sector overlap: up to 30, proportional to the share of the
      project's sectors that the fund covers
    - Geography: 20 exact, 10 when one string contains the other,
      5 when ``_check_geographic_overlap`` reports an overlap
    - Valuation vs. check size: up to 20

    Returns:
        A score between 0 and 1
    """
    # 1. Investment stage
    stage_points = 0
    if project.stage and fund.investment_stages:
        accepted_stages = {stage.name for stage in fund.investment_stages}
        # project.stage may be an enum member or already a plain value.
        if hasattr(project.stage, "value"):
            stage_label = project.stage.value
        else:
            stage_label = str(project.stage)

        if stage_label in accepted_stages:
            stage_points = 30
        else:
            # No direct match: let the helper decide on partial credit.
            stage_points = _calculate_stage_proximity(stage_label, accepted_stages)

    # 2. Sector overlap, measured against the project's sectors
    sector_points = 0
    if project.sector and fund.sectors:
        project_ids = {sector.id for sector in project.sector}
        fund_ids = {sector.id for sector in fund.sectors}
        if project_ids and fund_ids:
            shared = project_ids & fund_ids
            coverage = len(shared) / len(project_ids)
            sector_points = int(30 * coverage)

    # 3. Geography, compared case-insensitively
    geo_points = 0
    if project.location and fund.geographic_focus:
        location = project.location.lower()
        focus = (fund.geographic_focus or "").lower()
        if location == focus:
            geo_points = 20
        elif location in focus or focus in location:
            geo_points = 10
        elif _check_geographic_overlap(location, focus):
            geo_points = 5

    # 4. Valuation against the fund's check-size range
    valuation_points = 0
    if project.valuation and fund.check_size_lower and fund.check_size_upper:
        valuation = project.valuation
        check_low = fund.check_size_lower
        check_high = fund.check_size_upper

        if check_low <= valuation <= check_high:
            # Valuation sits inside the check-size range itself, so the
            # project may be too small; award half credit.
            valuation_points = 10
        else:
            # Heuristic band: a check is assumed to be roughly 10-33% of
            # the valuation.
            band_floor = check_low * 3
            band_ceiling = check_high * 10
            if band_floor <= valuation <= band_ceiling:
                valuation_points = 20
            elif valuation < band_floor:
                # Below the band: scale by how close the valuation gets.
                closeness = valuation / band_floor if band_floor > 0 else 0
                valuation_points = int(10 * closeness)
            else:
                # Above the band: scale by how far it overshoots.
                closeness = band_ceiling / valuation if valuation > 0 else 0
                valuation_points = int(10 * closeness)

    # Convert the 100-point total to a 0-1 scale
    return (stage_points + sector_points + geo_points + valuation_points) / 100
def _calculate_project_investor_direct_compatibility(
    project: ProjectTable, investor: InvestorTable
) -> float:
    """
    Score a project against an investor using only investor-level attributes.

    This is the fallback used when no fund data is available. The score is
    built from three components out of a nominal 100 points:

    - sectors (up to 30): share of the project's sectors the investor covers
    - geography (up to 20): exact, partial or regional location match
    - valuation (up to 20): fit between valuation and check-size range

    The stage component (30 points) always contributes 0 because investors
    carry no stage field, so the highest reachable result is 0.7.

    Returns:
        Compatibility on a 0-1 scale (points earned / 100).
    """
    points_possible = 100

    # Stage: investors have no direct stage field, so nothing is awarded.
    stage_points = 0

    # Sectors: fraction of the project's sectors that the investor also has.
    sector_points = 0
    if project.sector and investor.sectors:
        wanted_ids = {entry.id for entry in project.sector}
        offered_ids = {entry.id for entry in investor.sectors}
        if wanted_ids and offered_ids:
            shared_ids = wanted_ids.intersection(offered_ids)
            sector_points = int(30 * (len(shared_ids) / len(wanted_ids)))

    # Geography: exact match > one string containing the other > shared region.
    geo_points = 0
    if project.location and investor.geographic_focus:
        where = project.location.lower()
        focus = (investor.geographic_focus or "").lower()
        if where == focus:
            geo_points = 20
        elif where in focus or focus in where:
            geo_points = 10
        elif _check_geographic_overlap(where, focus):
            geo_points = 5

    # Valuation: a check is assumed to buy roughly 10%-33% of the company,
    # so a fitting valuation lies between 3x the smallest and 10x the
    # largest check.
    valuation_points = 0
    if project.valuation and investor.check_size_lower and investor.check_size_upper:
        floor = investor.check_size_lower * 3
        ceiling = investor.check_size_upper * 10
        if project.valuation < floor:
            # Too small: scale partial credit by how close it gets.
            closeness = project.valuation / floor if floor > 0 else 0
            valuation_points = int(10 * closeness)
        elif project.valuation <= ceiling:
            valuation_points = 20
        else:
            # Too large: scale partial credit by how far it overshoots.
            closeness = ceiling / project.valuation if project.valuation > 0 else 0
            valuation_points = int(10 * closeness)

    earned = stage_points + sector_points + geo_points + valuation_points
    return earned / points_possible
def _calculate_stage_proximity(project_stage: str, fund_stages: set) -> int:
    """
    Give partial stage credit when a fund invests one step away from the project.

    Stages are treated as a ladder:
    SEED -> SERIES_A -> SERIES_B -> SERIES_C -> GROWTH -> LATE_STAGE.
    Only the rungs directly below and above the project's stage count; the
    project's own stage is not checked here.

    Args:
        project_stage: Stage name of the project (e.g. "SERIES_A").
        fund_stages: Stage names the fund invests in.

    Returns:
        15 if any fund stage is adjacent to the project's stage, otherwise 0
        (also 0 when the project stage is not on the ladder).
    """
    ladder = ["SEED", "SERIES_A", "SERIES_B", "SERIES_C", "GROWTH", "LATE_STAGE"]
    if project_stage not in ladder:
        return 0

    rung = ladder.index(project_stage)
    # Keep only neighbours that actually exist (the ends have just one).
    neighbours = [
        ladder[position]
        for position in (rung - 1, rung + 1)
        if 0 <= position < len(ladder)
    ]
    # Half of the 30 stage points for an adjacent stage.
    return 15 if any(candidate in neighbours for candidate in fund_stages) else 0
def _check_geographic_overlap(location1: str, location2: str) -> bool:
    """
    Report whether two locations mention the same broad region.

    Each location is lower-cased and split into words (punctuation is treated
    as a separator). The two locations overlap when both contain a term from
    the same group of regional synonyms below. Terms are matched as whole
    words or phrases, so "us" does not match inside "russia" or "austin" and
    "ny" does not match inside "germany".

    Examples:
        - "San Francisco, CA" and "California" -> True
        - "Austin, US" and "United States" -> True
        - "London, UK" and "United Kingdom" -> True
        - "Moscow, Russia" and "Austin, Texas" -> False

    Args:
        location1: First free-text location.
        location2: Second free-text location.

    Returns:
        True if both locations contain a term from the same region group.
    """
    # Regional synonym groups. Adjective forms are listed explicitly because
    # matching is done on whole words rather than substrings.
    geo_groups = [
        ["usa", "us", "united states", "america", "american"],
        ["uk", "united kingdom", "britain", "british"],
        ["california", "ca"],
        ["new york", "ny"],
        ["texas", "tx"],
        ["europe", "eu", "european"],
        ["asia", "asian"],
        ["africa", "african"],
    ]

    def _as_padded_words(text):
        """Lower-case text, turn punctuation into spaces, pad with spaces."""
        cleaned = "".join(
            char if char.isalnum() else " " for char in (text or "").lower()
        )
        # The surrounding spaces let ' term ' match at either end of the text.
        return " " + " ".join(cleaned.split()) + " "

    words1 = _as_padded_words(location1)
    words2 = _as_padded_words(location2)

    for group in geo_groups:
        padded_terms = [f" {term} " for term in group]
        if any(term in words1 for term in padded_terms) and any(
            term in words2 for term in padded_terms
        ):
            return True
    return False
def get_top_compatible_investors(
    project: ProjectTable,
    investors: List[InvestorTable],
    limit: int = 10,
    min_score: float = 0.0,
    use_funds: bool = True,
) -> List[Tuple[InvestorTable, float]]:
    """
    Return the best-matching investors for a project.

    Scoring and ordering are delegated to
    ``calculate_project_investors_compatibility``; this function only drops
    entries below ``min_score`` and truncates the remainder to ``limit``.

    Args:
        project: The project to find investors for
        investors: Candidate investors to score
        limit: Maximum number of (investor, score) pairs to return
        min_score: Lowest compatibility score (0-1) that is still included
        use_funds: Passed through to the scorer; if True, investors' funds
            are evaluated

    Returns:
        Up to ``limit`` (investor, score) tuples, in the order produced by
        the scorer, each with score >= ``min_score``
    """
    ranked = calculate_project_investors_compatibility(project, investors, use_funds)

    # Keep only candidates that clear the threshold, preserving scorer order.
    qualifying = []
    for candidate, candidate_score in ranked:
        if candidate_score >= min_score:
            qualifying.append((candidate, candidate_score))

    return qualifying[:limit]
def get_compatibility_score_breakdown(
    project: ProjectTable, investor: InvestorTable, fund: Optional[FundTable] = None
) -> dict:
    """
    Get a detailed breakdown of the compatibility score components.

    Useful for debugging or showing users why a particular score was
    calculated. With a ``fund`` the score is rebuilt component by component:
    stage (max 30), sector (max 30), geography (max 20), valuation (max 20).
    The valuation component follows the same rules as the fund-level scorer,
    including the 10-point case where the valuation lies inside the fund's
    check-size range, so the breakdown agrees with the score it explains.
    Without a fund only the investor-level total is reported.

    Args:
        project: The project being evaluated
        investor: The investor; only used when no fund is given
        fund: Optional fund to break the score down against

    Returns:
        With a fund: ``{"total_score": float 0-1, "breakdown": {...}}`` with
        one entry per component. Without a fund:
        ``{"total_score": float 0-1, "note": str}``.
    """
    if not fund:
        # Investor-level breakdown (simplified)
        return {
            "total_score": _calculate_project_investor_direct_compatibility(
                project, investor
            ),
            "note": "Using investor-level data (no specific fund selected)",
        }

    # --- Stage (max 30) ---
    stage_score = 0
    stage_match = False
    project_stage_name = None
    if project.stage:
        # Enum members expose .value; anything else is used via str().
        project_stage_name = (
            project.stage.value
            if hasattr(project.stage, "value")
            else str(project.stage)
        )
    if project.stage and fund.investment_stages:
        fund_stage_names = {stage.name for stage in fund.investment_stages}
        if project_stage_name in fund_stage_names:
            stage_score = 30
            stage_match = True
        else:
            stage_score = _calculate_stage_proximity(
                project_stage_name, fund_stage_names
            )

    # --- Sector (max 30) ---
    sector_score = 0
    matching_sectors = []
    if project.sector and fund.sectors:
        project_sector_ids = {sector.id for sector in project.sector}
        fund_sector_ids = {sector.id for sector in fund.sectors}
        if project_sector_ids and fund_sector_ids:
            common_sectors = project_sector_ids.intersection(fund_sector_ids)
            matching_sectors = [s.name for s in fund.sectors if s.id in common_sectors]
            overlap_ratio = len(common_sectors) / len(project_sector_ids)
            sector_score = int(30 * overlap_ratio)

    # --- Geography (max 20) ---
    geo_score = 0
    geo_match_type = "none"
    if project.location and fund.geographic_focus:
        project_location_lower = project.location.lower()
        fund_geo_lower = fund.geographic_focus.lower()
        if project_location_lower == fund_geo_lower:
            geo_score = 20
            geo_match_type = "exact"
        elif (
            project_location_lower in fund_geo_lower
            or fund_geo_lower in project_location_lower
        ):
            geo_score = 10
            geo_match_type = "partial"
        elif _check_geographic_overlap(project_location_lower, fund_geo_lower):
            geo_score = 5
            geo_match_type = "regional"

    # --- Valuation (max 20) ---
    valuation_score = 0
    valuation_fit = "unknown"
    if project.valuation and fund.check_size_lower and fund.check_size_upper:
        if fund.check_size_lower <= project.valuation <= fund.check_size_upper:
            # Valuation no larger than a single check: the fund-level scorer
            # awards 10 here, so the breakdown must do the same.
            valuation_score = 10
            valuation_fit = "within_check_size"
        else:
            # A check is assumed to buy ~10%-33% of the company.
            reasonable_valuation_min = fund.check_size_lower * 3
            reasonable_valuation_max = fund.check_size_upper * 10
            if (
                reasonable_valuation_min
                <= project.valuation
                <= reasonable_valuation_max
            ):
                valuation_score = 20
                valuation_fit = "perfect"
            elif project.valuation < reasonable_valuation_min:
                ratio = (
                    project.valuation / reasonable_valuation_min
                    if reasonable_valuation_min > 0
                    else 0
                )
                valuation_score = int(10 * ratio)
                valuation_fit = "too_small"
            else:
                ratio = (
                    reasonable_valuation_max / project.valuation
                    if project.valuation > 0
                    else 0
                )
                valuation_score = int(10 * ratio)
                valuation_fit = "too_large"

    total_score = stage_score + sector_score + geo_score + valuation_score

    return {
        "total_score": total_score / 100,
        "breakdown": {
            "stage": {
                "score": stage_score,
                "max_score": 30,
                "match": stage_match,
                "project_stage": project_stage_name,
                "fund_stages": [s.name for s in fund.investment_stages]
                if fund.investment_stages
                else [],
            },
            "sector": {
                "score": sector_score,
                "max_score": 30,
                "matching_sectors": matching_sectors,
                "project_sectors": [s.name for s in project.sector]
                if project.sector
                else [],
                "fund_sectors": [s.name for s in fund.sectors] if fund.sectors else [],
            },
            "geography": {
                "score": geo_score,
                "max_score": 20,
                "match_type": geo_match_type,
                "project_location": project.location,
                "fund_geography": fund.geographic_focus,
            },
            "valuation": {
                "score": valuation_score,
                "max_score": 20,
                "fit": valuation_fit,
                "project_valuation": project.valuation,
                "fund_check_size_range": f"{fund.check_size_lower}-{fund.check_size_upper}"
                if fund.check_size_lower
                else None,
            },
        },
    }
def generate_compatibility_explanation(
    project: ProjectTable, investor: InvestorTable, score: float, use_funds: bool = True
) -> str:
    """
    Generate a natural language explanation of a compatibility score.

    The text has up to three parts: the score as a percentage, a match level
    with the factors that align (sector, stage, location, valuation), and at
    most one recommendation. When ``use_funds`` is true, factors are taken
    from the investor's best-scoring fund; if no fund scores above 0 the
    investor-level sectors and headquarters are used instead.

    Args:
        project: The project being evaluated
        investor: The investor being compared against
        score: The calculated compatibility score (0-1)
        use_funds: Whether fund-level data was used

    Returns:
        A single-line string with the score and its explanation
    """
    # round(), not int(): scores are n/100 floats and e.g. 0.29 * 100 is
    # 28.999999999999996, which int() would display as 28%.
    score_percentage = round(score * 100)

    # Determine match quality
    if score_percentage >= 80:
        match_level = "Excellent match"
    elif score_percentage >= 65:
        match_level = "Strong match"
    elif score_percentage >= 50:
        match_level = "Good match"
    elif score_percentage >= 35:
        match_level = "Moderate match"
    else:
        match_level = "Limited match"

    alignment_factors = []
    recommendations = []

    def _shared_sector_names(project_names, other_sectors):
        """Names present on both sides, in the project's own order."""
        other_names = {s.name for s in other_sectors if hasattr(s, "name")}
        # dict.fromkeys removes duplicates while keeping order, so the
        # generated text is the same on every run (set order is not stable).
        return [name for name in dict.fromkeys(project_names) if name in other_names]

    # Pick the fund with the highest score; funds scoring 0 are never chosen.
    best_fund = None
    if use_funds and investor.funds:
        best_score = 0
        for fund in investor.funds:
            fund_score = _calculate_project_fund_compatibility(project, fund)
            if fund_score > best_score:
                best_score = fund_score
                best_fund = fund

    # Analyze sector alignment
    if project.sector:
        project_sectors = [s.name for s in project.sector if hasattr(s, "name")]
        if best_fund and best_fund.sectors:
            common_sectors = _shared_sector_names(project_sectors, best_fund.sectors)
            if common_sectors:
                sectors_str = ", ".join(common_sectors[:2])
                alignment_factors.append(f"{sectors_str} sector focus")
            elif project_sectors:
                recommendations.append(
                    f"Consider emphasizing any {project_sectors[0]} industry connections"
                )
        elif investor.sectors:
            common_sectors = _shared_sector_names(project_sectors, investor.sectors)
            if common_sectors:
                sectors_str = ", ".join(common_sectors[:2])
                alignment_factors.append(f"{sectors_str} sector focus")

    # Analyze stage alignment
    if project.stage:
        stage_name = (
            project.stage.value
            if hasattr(project.stage, "value")
            else str(project.stage)
        )
        stage_display = stage_name.replace("_", " ").title()
        if best_fund and best_fund.investment_stages:
            fund_stage_names = {
                s.name for s in best_fund.investment_stages if hasattr(s, "name")
            }
            if stage_name in fund_stage_names:
                alignment_factors.append(f"{stage_display} stage")
            else:
                recommendations.append(
                    "Investor typically focuses on different stages; highlight your traction and growth metrics"
                )
        if not best_fund:
            # NOTE(review): without a fund the stage is listed as aligned
            # although nothing is compared - confirm this is intended.
            alignment_factors.append(f"{stage_display} stage")

    # Analyze geographic alignment
    if project.location:
        location_lower = project.location.lower()
        if best_fund and best_fund.geographic_focus:
            focus_lower = best_fund.geographic_focus.lower()
            if location_lower in focus_lower or focus_lower in location_lower:
                alignment_factors.append(f"{project.location} presence")
        elif investor.headquarters:
            hq_lower = investor.headquarters.lower()
            if location_lower in hq_lower or hq_lower in location_lower:
                alignment_factors.append(f"{project.location} market presence")

    # Analyze valuation/check size fit
    if project.valuation:
        if best_fund and best_fund.check_size_lower and best_fund.check_size_upper:
            reasonable_min = best_fund.check_size_lower * 3
            reasonable_max = best_fund.check_size_upper * 10
            if reasonable_min <= project.valuation <= reasonable_max:
                alignment_factors.append("appropriate funding stage")
            elif project.valuation < reasonable_min:
                recommendations.append(
                    "You may be early for this investor; consider approaching at a later stage"
                )
            else:
                recommendations.append(
                    "Consider highlighting your growth trajectory and market opportunity"
                )

    # Build the explanation
    explanation_parts = [f"Based on your startup profile: {score_percentage}% match"]
    if alignment_factors:
        alignment_text = ", ".join(alignment_factors)
        explanation_parts.append(f"{match_level}: {alignment_text}.")
    else:
        explanation_parts.append(f"{match_level}.")

    if recommendations:
        # Only the first recommendation collected is shown.
        explanation_parts.append(recommendations[0] + ".")

    return " ".join(explanation_parts)
-260
View File
@@ -1,260 +0,0 @@
import os
import sys
import requests
class FolkAPI:
    """Thin client for the Folk REST API (groups, companies and people).

    Every request carries the bearer token given at construction and is
    bounded by ``timeout`` seconds. HTTP error statuses raise
    ``requests.HTTPError`` via ``raise_for_status``.
    """

    BASE_URL = "https://api.folk.app/v1"
    # Seconds to wait for the server. Without an explicit timeout, requests
    # waits indefinitely and a stalled connection would hang the caller.
    DEFAULT_TIMEOUT = 30

    def __init__(self, api_key: str, timeout: float = DEFAULT_TIMEOUT):
        """Store the auth header and the per-request timeout (seconds)."""
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.timeout = timeout

    @staticmethod
    def _to_list(val):
        """Normalize None / scalar / list / tuple into a list (None stays None)."""
        if val is None:
            return None
        if isinstance(val, (list, tuple)):
            return [v for v in val if v is not None]
        return [val]

    @staticmethod
    def _format_groups(groups, group_id):
        """Build the ``groups`` payload; explicit ``groups`` win over ``group_id``.

        ``groups`` may hold ids or dicts with an ``id`` key. Returns None when
        neither argument is provided.
        """
        if groups:
            return [
                {"id": g["id"]}
                if isinstance(g, dict) and g.get("id")
                else {"id": str(g)}
                for g in groups
            ]
        if group_id:
            return [{"id": group_id}]
        return None

    def _post(self, path: str, payload: dict):
        """POST ``payload`` as JSON to ``BASE_URL/path`` and return the decoded body."""
        response = requests.post(
            f"{self.BASE_URL}/{path}",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_groups(self):
        """Fetch all groups from Folk."""
        url = f"{self.BASE_URL}/groups"
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def create_company(
        self,
        name: str,
        group_id: str = None,
        website: str = None,
        linkedin_url: str = None,
        description: str = None,
        emails=None,
        phones=None,
        addresses=None,
        urls=None,
        custom_field_values=None,
        groups=None,
        **kwargs,
    ):
        """Create a company (investor) in a specific group.

        This method builds a payload matching Folk's Create Company API:
        https://developer.folk.app/api-reference/companies/create-a-company

        It keeps backward compatibility with the previous `group_id`,
        `website` and `linkedin_url` arguments: `website` and `linkedin_url`
        are appended to `urls`, and `group_id` is used only when `groups` is
        not given. Extra keyword arguments become top-level payload fields
        unless they would overwrite a key already set here.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: if the API answers with an error status.
        """
        data = {"name": name}
        if description:
            data["description"] = description

        formatted_groups = self._format_groups(groups, group_id)
        if formatted_groups:
            data["groups"] = formatted_groups

        # URLs: merge explicit urls with the legacy website/linkedin arguments.
        urls_list = self._to_list(urls) or []
        if website:
            urls_list.append(website)
        if linkedin_url:
            urls_list.append(linkedin_url)
        if urls_list:
            data["urls"] = urls_list

        for key, raw in (("emails", emails), ("phones", phones), ("addresses", addresses)):
            values = self._to_list(raw)
            if values:
                data[key] = values

        # Custom field values follow the API's structure
        if custom_field_values:
            data["customFieldValues"] = custom_field_values

        # Pass through additional top-level fields without overwriting ours.
        for k, v in kwargs.items():
            if k not in data:
                data[k] = v

        return self._post("companies", data)

    def create_person(
        self,
        first_name: str,
        last_name: str,
        email: str = None,
        company_id: str = None,
        group_id: str = None,
        companies=None,
        emails=None,
        phones=None,
        addresses=None,
        urls=None,
        custom_field_values=None,
        groups=None,
        **kwargs,
    ):
        """Create a person in the workspace.

        Builds payload matching Folk's Create Person API: use camelCase
        keys (firstName, lastName, groups, companies, emails, etc.).
        Keeps backward compatibility with `company_id` and `group_id`, which
        are used only when `companies` / `groups` are not given. A single
        `email` is placed first in the `emails` list.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: if the API answers with an error status.
        """
        data = {"firstName": first_name, "lastName": last_name}

        formatted_groups = self._format_groups(groups, group_id)
        if formatted_groups:
            data["groups"] = formatted_groups

        # Companies: dicts are sent as-is, strings are treated as ids, any
        # other item type is ignored.
        if companies:
            formatted_companies = []
            for c in companies:
                if isinstance(c, dict):
                    formatted_companies.append(c)
                elif isinstance(c, str):
                    formatted_companies.append({"id": c})
            if formatted_companies:
                data["companies"] = formatted_companies
        elif company_id:
            data["companies"] = [{"id": company_id}]

        emails_list = self._to_list(emails) or []
        if email:
            emails_list.insert(0, email)
        if emails_list:
            data["emails"] = emails_list

        for key, raw in (("phones", phones), ("addresses", addresses), ("urls", urls)):
            values = self._to_list(raw)
            if values:
                data[key] = values

        if custom_field_values:
            data["customFieldValues"] = custom_field_values

        # Pass through additional top-level fields without overwriting ours.
        for k, v in kwargs.items():
            if k not in data:
                data[k] = v

        return self._post("people", data)
# SECURITY NOTE(review): a literal API key is committed below. It is kept
# only so behaviour stays unchanged when FOLK_API_KEY is not set; it should
# be revoked and removed from source control.
DEFAULT_API_KEY = "FOLKfIGXuv74ML9EAajxyiUR39ePaNrZ"

# The FOLK_API_KEY environment variable takes precedence over the fallback.
api_key = os.getenv("FOLK_API_KEY", DEFAULT_API_KEY)

# Module-level client used by example_flow().
folk = FolkAPI(api_key=api_key)
def example_flow():
    """Walk through the Folk API: list groups, create a company, add a person.

    Uses the module-level ``folk`` client. The first group returned by the
    API receives both the sample company and the sample person. The process
    exits with status 1 when no usable group is returned.
    """

    def _abort(message):
        """Print ``message`` and terminate with exit status 1."""
        print(message)
        sys.exit(1)

    # Step 1: fetch the groups. The API nests them under data -> items.
    listing = folk.get_groups()
    print(listing)
    group_items = listing.get("data", {}).get("items", [])
    if not group_items:
        _abort("No groups returned by Folk API.")

    # Use the first group as the example target.
    first_group_id = group_items[0].get("id")
    if not first_group_id:
        _abort("No id found for the first group item.")

    # Step 2: create a company inside that group.
    new_company = folk.create_company(
        name="2050 Investment Partners",
        group_id=first_group_id,
        website="https://2050.com",
        linkedin_url="https://linkedin.com/company/2050-investments",
    )

    # Step 3: add a person linked to the new company and the same group.
    new_company_id = new_company.get("data", {}).get("id")
    new_person = folk.create_person(
        first_name="John",
        last_name="Doe",
        email="john@2050.com",
        company_id=new_company_id,
        group_id=first_group_id,
    )

    print("Created company:", new_company)
    print("Created person:", new_person)
# Script entry point: run the example and turn failures into exit status 1.
if __name__ == "__main__":
    try:
        example_flow()
    except requests.HTTPError as http_err:
        # Show the response body too, when there is one, to ease debugging.
        response = getattr(http_err, "response", None)
        if response is None:
            print("HTTP error while talking to Folk API:", http_err)
        else:
            try:
                payload_text = response.text
            except Exception:
                payload_text = "<unreadable response body>"
            print("HTTP error while talking to Folk API:", http_err)
            print("Response status:", response.status_code)
            print("Response body:", payload_text)
        sys.exit(1)
    except Exception as unexpected:  # pragma: no cover - top-level safety
        print("Unexpected error:", unexpected)
        sys.exit(1)
-177
View File
@@ -1,177 +0,0 @@
import asyncio
import logging
import os
from crawl4ai import AsyncWebCrawler
from ddgs import DDGS
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from schemas.insight_schema import InsightResponse
# Configure root logging once for this module's scripts and create its logger.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("web_search_agent")

# Load variables from a .env file (python-dotenv) before reading the key.
load_dotenv()

OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
    # Not fatal at import time; only LLM calls need the key.
    logger.warning("OPENROUTER_API_KEY not set. LLM calls will fail if invoked.")
class QueryProcessor:
    """Research agent that produces structured insights about an investor.

    Wraps a ReAct agent (``create_react_agent``) running
    ``openai/gpt-4o-mini`` through OpenRouter, with ``web_search`` as its
    only tool and ``InsightResponse`` as the structured response format.
    """

    def __init__(self):
        self.llm = ChatOpenAI(
            api_key=OPENROUTER_API_KEY,
            base_url="https://openrouter.ai/api/v1",
            model="openai/gpt-4o-mini",
            temperature=0,
        )
        # Created before the agent so the web_search tool never sees a
        # half-initialised instance.
        self.ddg_search = DDGS()
        self.agent = create_react_agent(
            model=self.llm,
            tools=[self.web_search],
            response_format=InsightResponse,
        )

    @staticmethod
    def _fallback_insights() -> dict:
        """Return the placeholder result used whenever no insights are available."""
        return {
            "investment_pattern_analysis": "Unable to retrieve investment pattern analysis at this time.",
            "market_position": "Unable to retrieve market position at this time.",
        }

    @staticmethod
    def _join_limited(values, limit: int) -> str:
        """Join at most ``limit`` items with ", ".

        Tolerates a bare string (treated as one item) and non-string items
        (converted with ``str``), so odd profile data cannot raise here.
        """
        if isinstance(values, str):
            values = [values]
        return ", ".join(str(item) for item in list(values)[:limit])

    # NOTE: not registered as an agent tool in __init__; only callable directly.
    async def crawl(self, url: str):
        """Tool to search the web using a web crawler. given the url"""
        logger.info("\nCrawl tool called with url: %s", url)
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun(url)
            return results.markdown

    # The docstring below is kept verbatim: it is passed to the agent as the
    # tool's description.
    def web_search(self, query: str):
        """Tool to search the web using google, provide the relevant query to get the information"""
        logger.info("\nWeb Search Tool Called with query: %s", query)
        if query:
            result = self.ddg_search.text(query, max_results=10, backend="google")
            return result
        return "No query provided."

    async def get_investor_insights(
        self,
        investor_name: str,
        investor_website: str = None,
        investor_description: str = None,
        investor_headquarters: str = None,
        investment_thesis: list = None,
        portfolio_highlights: list = None,
    ) -> dict:
        """
        Get investment pattern analysis and market position for an investor.

        Builds a research prompt from the supplied details, runs the agent and
        returns its structured response. Any failure while invoking the agent
        or reading its result is logged and answered with placeholder texts
        instead of an exception.

        Args:
            investor_name: Name of the investor/VC firm
            investor_website: Website URL of the investor
            investor_description: Description of the investor
            investor_headquarters: Headquarters location
            investment_thesis: Investment thesis statements (first 3 are used)
            portfolio_highlights: Notable portfolio companies (first 5 are used)

        Returns:
            Dictionary with investment_pattern_analysis and market_position
        """
        logger.info("Getting insights for investor: %s", investor_name)

        # Build context information
        context_parts = [f'Investment Firm: "{investor_name}"']
        if investor_website:
            context_parts.append(f"Website: {investor_website}")
        if investor_headquarters:
            context_parts.append(f"Location: {investor_headquarters}")
        if investor_description:
            context_parts.append(f"Description: {investor_description}")
        if investment_thesis:
            thesis_str = self._join_limited(investment_thesis, 3)
            context_parts.append(f"Investment Focus: {thesis_str}")
        if portfolio_highlights:
            portfolio_str = self._join_limited(portfolio_highlights, 5)
            context_parts.append(f"Notable Portfolio Companies: {portfolio_str}")
        context = "\n".join(context_parts)

        # The prompt text is runtime input to the LLM and is kept unchanged.
        prompt = f"""
Research and analyze the following investment firm:
{context}
CRITICAL INSTRUCTIONS:
- You MUST provide concrete, data-driven insights with specific numbers and percentages
- Use the web_search tool to find recent news, press releases, and investment databases (Crunchbase, PitchBook, etc.)
- If you cannot find sufficient data after searching, make reasonable inferences based on available information
- DO NOT state that data is unavailable or ambiguous - provide the best analysis possible with what you find
- Focus on ACTIONABLE insights, not disclaimers
- Only call the tool twice at most, be strategic in your searches
- Summarize your findings concisely and clearly
Provide insights in the InsightResponse schema format:
1. investment_pattern_analysis (MAX 3 SENTENCES):
- Recent investment activity and trends in the last 12-18 months
- Investment size ranges, deal frequency, and sector preferences
- Notable patterns (e.g., "increased AI investments by 40%", "average check size $5-10M")
- If specific numbers aren't available, provide reasonable estimates based on portfolio and market position
2. market_position (MAX 3 SENTENCES):
- Standing in the venture capital market
- Activity level in specific sectors and notable unicorn investments
- Deal leadership roles (lead vs co-lead) and market influence
- Regional or global market presence and competitive positioning
Use the web_search tool strategically. Search for:
- "{investor_name}" recent investments 2024 2025
- "{investor_name}" portfolio Crunchbase
- "{investor_name}" funding rounds news
- Specific portfolio companies if mentioned above
"""
        try:
            result = await self.agent.ainvoke({"messages": [("user", prompt)]})
            logger.info("Raw agent result keys: %s", result.keys())

            if "structured_response" in result:
                structured = result["structured_response"]
                logger.info("Structured response type: %s", type(structured))
                # Pydantic object -> plain dict; a dict is returned as-is.
                if isinstance(structured, InsightResponse):
                    return structured.model_dump()
                if isinstance(structured, dict):
                    return structured
                logger.warning(
                    "Unexpected structured_response type %s, using fallback",
                    type(structured),
                )
            else:
                logger.warning("No structured_response found in result, using fallback")
            return self._fallback_insights()
        except Exception as e:
            # logger.exception records the message and the traceback once.
            logger.exception("Error getting insights for %s: %s", investor_name, e)
            return self._fallback_insights()
async def main():
    """Manual smoke test: ask the agent one question and print its last message."""
    processor = QueryProcessor()
    question = "Can you tell me about 3T Finance investment company"
    outcome = await processor.agent.ainvoke({"messages": [("user", question)]})
    # The agent's answer is the content of the final message in the run.
    print(outcome["messages"][-1].content)


if __name__ == "__main__":
    asyncio.run(main())
+83 -679
View File
@@ -1,7 +1,5 @@
import asyncio
import json
import os
import re
from typing import Optional
import pandas as pd
@@ -9,35 +7,15 @@ from db.db import get_db_session
from db.models import (
CompanyMember,
CompanyTable,
FundTable,
InvestmentStageTable,
InvestorMember,
InvestorTable,
SectorTable,
)
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from schemas.py_schemas import CompanyData, InvestorData
from sqlalchemy.orm import Session
# Structured-output schema handed to the LLM via with_structured_output();
# the docstring and field names are part of that schema, so they are left
# untouched.
class CurrencyConversion(BaseModel):
    """Schema for LLM currency conversion responses"""

    # Converted amount in USD as an integer; convert_to_usd() treats any
    # value that is not > 0 (including this default) as "no result".
    amount_usd: int = 0
    confidence: str = "high"  # high, medium, low
    # Free-text remarks from the model; empty by default.
    notes: str = ""
# Structured-output schema handed to the LLM via with_structured_output();
# the docstring and field names are part of that schema, so they are left
# untouched.
class CheckSizeRange(BaseModel):
    """Schema for LLM check size range parsing from estimated investment size"""

    # Bounds in USD as integers; parse_check_size_range() maps any bound
    # that is not > 0 (including these defaults) to None.
    lower_bound_usd: int = 0
    upper_bound_usd: int = 0
    confidence: str = "high"  # high, medium, low
    # Free-text remarks from the model; empty by default.
    notes: str = ""
class InvestorProcessor:
def __init__(self):
self.llm = ChatOpenAI(
@@ -47,465 +25,9 @@ class InvestorProcessor:
temperature=0,
)
# Structured LLMs for specific parsing tasks
self.currency_converter_llm = self.llm.with_structured_output(
CurrencyConversion
)
self.check_size_parser_llm = self.llm.with_structured_output(CheckSizeRange)
# Keep legacy structured LLMs for backward compatibility
self.investor_structured_llm = self.llm.with_structured_output(InvestorData)
self.company_structured_llm = self.llm.with_structured_output(CompanyData)
async def convert_to_usd(self, amount_str: str) -> Optional[int]:
    """
    Ask the structured currency LLM to turn a free-text amount into whole USD.

    Typical inputs are "EUR 850,000,000", "$5M", "GBP 10-20 million" or
    "Approximately EUR 100 million". Empty input, "Not Available" and "0"
    are answered with None without calling the LLM.

    Returns:
        The amount in USD when the model reports a positive value; None
        otherwise, including when the LLM call raises (the error is printed).
    """
    if not amount_str or amount_str in ("Not Available", "0"):
        return None

    try:
        # The prompt text is runtime input to the LLM and is kept unchanged.
        request = f"""Convert this amount to USD as an integer (whole number, no decimals).
If it's a range, use the midpoint. If already in USD, just extract the number.
Remove all commas and convert millions/billions to actual numbers.
Amount: {amount_str}
Examples:
- "EUR 850,000,000" -> 935000000 (assuming EUR to USD rate ~1.10)
- "$5M" -> 5000000
- "GBP 10-20 million" -> 18000000 (midpoint 15M * 1.20 rate)
- "Approximately EUR 100 million" -> 110000000
Return only the USD integer amount with current exchange rates."""
        conversion = await self.currency_converter_llm.ainvoke(request)
        if conversion.amount_usd > 0:
            return conversion.amount_usd
        # 0 (the schema default) or a negative value means "no usable amount".
        return None
    except Exception as e:
        print(f"Error converting currency '{amount_str}': {e}")
        return None
async def parse_check_size_range(
    self, estimated_investment_str: str
) -> tuple[Optional[int], Optional[int]]:
    """
    Ask the structured check-size LLM for the USD bounds of an investment size.

    Typical inputs are "EUR 1,000 to 2,000", "$100K-$500K",
    "Between $1M and $5M", "Up to EUR 10 million" or "$2M typical". Empty
    input, "Not Available" and "0" are answered with (None, None) without
    calling the LLM.

    Returns:
        (lower_bound_usd, upper_bound_usd). A bound the model reports as 0
        or less becomes None; both are None when the LLM call raises (the
        error is printed).
    """
    if not estimated_investment_str or estimated_investment_str in (
        "Not Available",
        "0",
    ):
        return None, None

    try:
        # The prompt text is runtime input to the LLM and is kept unchanged.
        request = f"""Parse this check size/investment range into lower and upper bounds in USD as integers.
Input: {estimated_investment_str}
Instructions:
- If it's a range (e.g., "EUR 1M to 5M"), extract both bounds
- If it's a single amount (e.g., "$2M typical"), use it as both lower and upper
- If it says "up to X", use 0 as lower and X as upper
- Convert all currencies to USD using current exchange rates
- Return integers (whole numbers, no decimals)
Examples:
- "EUR 1,000 to 2,000" -> lower: 1100, upper: 2200
- "$100K-$500K" -> lower: 100000, upper: 500000
- "Between $1M and $5M" -> lower: 1000000, upper: 5000000
- "Up to EUR 10 million" -> lower: 0, upper: 11000000
- "$2M typical" -> lower: 2000000, upper: 2000000
- "GBP 500K-2M" -> lower: 600000, upper: 2400000
Return the lower and upper bounds in USD."""
        parsed = await self.check_size_parser_llm.ainvoke(request)

        low_bound = None
        if parsed.lower_bound_usd > 0:
            low_bound = parsed.lower_bound_usd
        high_bound = None
        if parsed.upper_bound_usd > 0:
            high_bound = parsed.upper_bound_usd
        return low_bound, high_bound
    except Exception as e:
        print(f"Error parsing check size range '{estimated_investment_str}': {e}")
        return None, None
def parse_json_profile(self, json_str: str) -> Optional[dict]:
    """
    Parse the JSON profile cell from the CSV into a dictionary.

    Args:
        json_str: Raw cell value; expected to be a JSON object as text.

    Returns:
        The decoded profile dict, or None when the cell is missing (None,
        NaN or any other non-text value), empty, not valid JSON, or valid
        JSON that is not an object. Decoding problems are printed.
    """
    # Missing CSV cells arrive as None/NaN; only text can hold a profile.
    if not isinstance(json_str, (str, bytes, bytearray)) or not json_str:
        return None

    try:
        profile = json.loads(json_str)
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"Error parsing JSON: {e}")
        return None

    # Callers use dict methods on the result, so lists, numbers, etc. are
    # rejected here rather than failing later.
    if not isinstance(profile, dict):
        print(f"Error parsing JSON: expected an object, got {type(profile).__name__}")
        return None
    return profile
async def process_investor_profile(
    self, name: str, website: str, profile_json: str
) -> Optional[dict]:
    """
    Build an investor record from one CSV row.

    Fields are copied from the JSON profile by hand; the LLM helpers are used
    only to convert monetary amounts (AUM, fund size) to USD and to parse
    check-size ranges. List fields that are explicitly ``null`` in the JSON
    are treated as empty, so one missing list does not discard the record.

    Args:
        name: Investor name from the CSV.
        website: Investor website from the CSV.
        profile_json: JSON profile text from the CSV.

    Returns:
        A dict with the investor's basic fields plus ``team_members`` and
        ``funds`` lists, or None when the profile cannot be parsed or an
        unexpected error occurs (the error is printed).
    """
    profile = self.parse_json_profile(profile_json)
    if not profile:
        return None

    try:
        # Extract basic info
        investor_data = {
            "name": name.strip() if name else None,
            "website": website.strip() if website else None,
            "headquarters": profile.get("headquarters"),
            "description": profile.get("investorDescription"),
            "aum": None,
            "aum_as_of_date": None,
            "aum_source_url": None,
            "investment_thesis": profile.get("investmentThesisFocus", []),
            "portfolio_highlights": profile.get("portfolioHighlights", []),
            "linked_documents": profile.get("linkedDocuments", []),
            "researcher_notes": profile.get("researcherNotes"),
            "missing_important_fields": profile.get("missingImportantFields", []),
            "sources": profile.get("sources", {}),
            "team_members": [],
            "funds": [],
        }

        # Process AUM: the date and source are kept only with a usable amount.
        aum_data = profile.get("overallAssetsUnderManagement", {})
        if aum_data and isinstance(aum_data, dict):
            aum_amount = aum_data.get("aumAmount")
            if aum_amount and aum_amount != "Not Available":
                investor_data["aum"] = await self.convert_to_usd(aum_amount)
                investor_data["aum_as_of_date"] = aum_data.get("asOfDate")
                investor_data["aum_source_url"] = aum_data.get("sourceUrl")

        # Process senior leadership ("or []" guards against an explicit null).
        for member in profile.get("seniorLeadership") or []:
            if isinstance(member, dict) and member.get("name"):
                investor_data["team_members"].append(
                    {
                        "name": member.get("name"),
                        "title": member.get("title"),
                        "role": member.get("title"),  # Use title as role
                        "email": None,
                        "source_url": member.get("sourceUrl"),
                    }
                )

        # Process funds
        for fund in profile.get("funds") or []:
            if not isinstance(fund, dict):
                continue

            fund_data = {
                "fund_name": fund.get("fundName"),
                "fund_size": None,
                "fund_size_source_url": fund.get("fundSizeSourceUrl"),
                "check_size_lower": None,
                "check_size_upper": None,
                "source_url": fund.get("sourceUrl"),
                "source_provider": fund.get("sourceProvider"),
                "geographic_focus": None,  # Stored as a comma-separated string
                "investment_stage_names": fund.get("investmentStageFocus") or [],
                "sector_names": fund.get("sectorFocus") or [],
            }

            # Geographic focus: usually a list of regions; a plain string
            # is accepted too. Items are converted with str() so one odd
            # entry cannot fail the whole profile.
            geo_focus = fund.get("geographicFocus")
            if isinstance(geo_focus, str):
                geo_text = geo_focus.strip()
                if geo_text and geo_text != "Not Available":
                    fund_data["geographic_focus"] = geo_text
            elif isinstance(geo_focus, list):
                regions = [str(region) for region in geo_focus if region]
                if regions:
                    fund_data["geographic_focus"] = ", ".join(regions)

            # Convert fund size to USD integer
            fund_size_str = fund.get("fundSize")
            if fund_size_str and fund_size_str != "Not Available":
                fund_size_usd = await self.convert_to_usd(fund_size_str)
                if fund_size_usd:
                    fund_data["fund_size"] = fund_size_usd

            # Parse check size range from estimated investment size
            est_size_str = fund.get("estimatedInvestmentSize")
            if est_size_str and est_size_str != "Not Available":
                check_lower, check_upper = await self.parse_check_size_range(
                    est_size_str
                )
                if check_lower is not None:
                    fund_data["check_size_lower"] = check_lower
                if check_upper is not None:
                    fund_data["check_size_upper"] = check_upper

            investor_data["funds"].append(fund_data)

        return investor_data
    except Exception as e:
        # Best effort per row: report the problem and skip this investor.
        print(f"Error processing investor profile for {name}: {e}")
        return None
async def process_company_profile(
    self,
    name: str,
    website: str,
    profile_json: str,
    investor_names: Optional[str] = None,
) -> Optional[dict]:
    """
    Process a company profile from CSV data.

    Only ``founded_year`` and ``key_executives`` are extracted here; the
    remaining company fields live in the base database.

    Args:
        name: Company name; surrounding whitespace is stripped.
        website: Company website (accepted for signature parity, not used).
        profile_json: Raw JSON profile, decoded via ``self.parse_json_profile``.
        investor_names: Investor names from the CSV (accepted, not used).

    Returns:
        A dict with ``name``, ``founded_year`` (int or None) and
        ``key_executives`` (list of dicts with ``name``/``title``/``source_url``),
        or None when the profile is missing/empty or processing fails.
    """
    # Local import keeps this block self-contained; the year ceiling must track
    # the calendar instead of being frozen at a literal.
    from datetime import date

    profile = self.parse_json_profile(profile_json)
    if not profile:
        return None
    try:
        company_data = {
            "name": name.strip() if name else None,
            "founded_year": None,
            "key_executives": [],
        }

        # Prefer "keyExecutives", fall back to "seniorLeadership". ``or`` also
        # covers an explicit JSON null, which ``.get(key, [])`` would pass through.
        key_executives = (
            profile.get("keyExecutives") or profile.get("seniorLeadership") or []
        )
        if not isinstance(key_executives, list):
            key_executives = []
        for exec_member in key_executives:
            # Entries without a name cannot be stored as members.
            if isinstance(exec_member, dict) and exec_member.get("name"):
                company_data["key_executives"].append(
                    {
                        "name": exec_member.get("name"),
                        "title": exec_member.get("title"),
                        "source_url": exec_member.get("sourceUrl"),
                    }
                )

        # Try to extract the founding year from the free-text description.
        description = profile.get("companyDescription") or ""
        if isinstance(description, str) and description:
            # Patterns are tried in priority order, e.g. "founded in 2020",
            # "Gegründet 2020", "founded 2020".
            year_patterns = [
                r"founded in (\d{4})",
                r"founded (\d{4})",
                r"Gegründet (\d{4})",
                r"established in (\d{4})",
                r"since (\d{4})",
                r"\((\d{4})\)",  # Year in parentheses
            ]
            latest_plausible_year = date.today().year
            for pattern in year_patterns:
                match = re.search(pattern, description, re.IGNORECASE)
                if match is None:
                    continue
                year = int(match.group(1))  # \d{4} always converts cleanly
                # Sanity check; an implausible hit falls through to the next pattern.
                if 1900 <= year <= latest_plausible_year:
                    company_data["founded_year"] = year
                    break
        return company_data
    except Exception as e:
        # Best-effort import: report the failure and skip this profile.
        print(f"Error processing company profile for {name}: {e}")
        return None
def _save_parsed_company_to_db(
    self, db: Session, company_data: dict
) -> Optional[CompanyTable]:
    """
    Apply parsed company data to a company that already exists in the database.

    Only ``founded_year`` is updated on the company row; its members are
    replaced wholesale by the parsed key executives. Nothing is committed here.

    Args:
        db: Open session used for the lookup and the pending changes.
        company_data: Parsed dict with ``name`` and optionally
            ``founded_year`` / ``key_executives``.

    Returns:
        The updated company, or None when the company is unknown or an error
        occurred (in which case the session is rolled back).
    """
    try:
        company = db.query(CompanyTable).filter_by(name=company_data["name"]).first()
        if not company:
            # Companies are expected to be present already; never create one here.
            print(
                f"⚠️ Company '{company_data['name']}' not found in base database - skipping"
            )
            return None

        founded_year = company_data.get("founded_year")
        if founded_year:
            company.founded_year = founded_year

        # Replace the member list: drop the current rows, then add the parsed ones.
        db.query(CompanyMember).filter_by(company_id=company.id).delete()
        for executive in company_data.get("key_executives", []):
            db.add(
                CompanyMember(
                    name=executive.get("name"),
                    role=executive.get("title"),
                    # The source URL is stored in the linkedin field.
                    linkedin=executive.get("source_url"),
                    company_id=company.id,
                )
            )
        return company
    except Exception as e:
        print(f"Error saving company to database: {e}")
        db.rollback()
        return None
def _save_parsed_investor_to_db(
    self, db: Session, investor_data: dict
) -> Optional[InvestorTable]:
    """
    Create or refresh an investor together with its team members and funds.

    An investor is looked up by name. On update, each scalar field keeps its
    stored value unless the parsed data supplies a truthy replacement, and the
    existing team members and funds are deleted before the parsed ones are
    added. Nothing is committed here.

    Args:
        db: Open session used for lookups and pending changes.
        investor_data: Parsed dict with ``name``, the scalar investor fields,
            ``team_members`` and ``funds``.

    Returns:
        The investor row, or None on error (the session is rolled back).
    """
    # Scalar columns treated identically on create and update, in write order.
    scalar_fields = (
        "website",
        "headquarters",
        "description",
        "aum",
        "aum_as_of_date",
        "aum_source_url",
        "investment_thesis",
        "portfolio_highlights",
        "linked_documents",
        "researcher_notes",
        "missing_important_fields",
        "sources",
    )
    try:
        investor = db.query(InvestorTable).filter_by(name=investor_data["name"]).first()
        is_update = bool(investor)

        if is_update:
            # Keep the stored value whenever the parsed one is missing/empty.
            for field in scalar_fields:
                setattr(
                    investor,
                    field,
                    investor_data.get(field) or getattr(investor, field),
                )
        else:
            investor = InvestorTable(
                name=investor_data["name"],
                **{field: investor_data.get(field) for field in scalar_fields},
            )
            db.add(investor)
            db.flush()  # assigns investor.id for the child rows below

        # Team members: replace the existing set when updating.
        if is_update:
            db.query(InvestorMember).filter_by(investor_id=investor.id).delete()
        for person in investor_data.get("team_members", []):
            db.add(
                InvestorMember(
                    name=person.get("name"),
                    role=person.get("role"),
                    title=person.get("title"),
                    email=person.get("email"),
                    source_url=person.get("source_url"),
                    investor_id=investor.id,
                )
            )

        # Funds: replace the existing set when updating.
        if is_update:
            db.query(FundTable).filter_by(investor_id=investor.id).delete()
        for parsed_fund in investor_data.get("funds", []):
            fund = FundTable(
                investor_id=investor.id,
                fund_name=parsed_fund.get("fund_name"),
                fund_size=parsed_fund.get("fund_size"),  # integer
                fund_size_source_url=parsed_fund.get("fund_size_source_url"),
                check_size_lower=parsed_fund.get("check_size_lower"),
                check_size_upper=parsed_fund.get("check_size_upper"),
                source_url=parsed_fund.get("source_url"),
                source_provider=parsed_fund.get("source_provider"),
                geographic_focus=parsed_fund.get("geographic_focus"),  # string
            )
            db.add(fund)
            db.flush()  # Get the fund ID

            # Many-to-many links: investment stages, then sectors.
            for stage_name in parsed_fund.get("investment_stage_names", []):
                fund.investment_stages.append(
                    self._get_or_create_investment_stage(db, stage_name)
                )
            for sector_name in parsed_fund.get("sector_names", []):
                fund.sectors.append(self._get_or_create_sector(db, sector_name))
        return investor
    except Exception as e:
        print(f"Error saving investor to database: {e}")
        db.rollback()
        return None
def _get_or_create_investment_stage(
    self, db: Session, stage_name: str
) -> InvestmentStageTable:
    """
    Return the investment stage named ``stage_name``, creating it if absent.

    A newly created stage is added to the session and flushed (not committed)
    so that it has an ID before being returned.
    """
    from db.models import InvestmentStageTable

    lookup = db.query(InvestmentStageTable).filter(
        InvestmentStageTable.name == stage_name
    )
    found = lookup.first()
    if found:
        return found

    created = InvestmentStageTable(name=stage_name)
    db.add(created)
    db.flush()  # Get the ID without committing
    return created
def _get_or_create_sector(self, db: Session, sector_name: str) -> SectorTable:
"""Get existing sector or create new one"""
sector = db.query(SectorTable).filter(SectorTable.name == sector_name).first()
@@ -527,6 +49,7 @@ Return the lower and upper bounds in USD."""
check_size_lower=investor_data.investor.check_size_lower,
check_size_upper=investor_data.investor.check_size_upper,
geographic_focus=investor_data.investor.geographic_focus,
stage_focus=investor_data.investor.stage_focus,
number_of_investments=investor_data.investor.number_of_investments,
)
db.add(investor)
@@ -650,260 +173,141 @@ Return the lower and upper bounds in USD."""
print(f"Error processing row {row_idx + 1}: {e}")
return None
async def _process_single_investor(
self, idx: int, row: pd.Series, total_rows: int
) -> Optional[dict]:
"""Process a single investor row"""
try:
name = row.get("Name", "").strip() if pd.notna(row.get("Name")) else None
website = (
row.get("Website", "").strip() if pd.notna(row.get("Website")) else None
)
profile_json = (
row.get("Final Investor Profile", "")
if pd.notna(row.get("Final Investor Profile"))
else None
)
if not name or not profile_json:
print(f"⚠️ Row {idx + 1}: Skipping - missing name or profile")
return None
print(f"📊 Processing {idx + 1}/{total_rows}: {name}")
# Process the investor profile
investor_data = await self.process_investor_profile(
name, website, profile_json
)
if investor_data:
print(f"{name} parsed successfully")
return investor_data
else:
print(f" ⚠️ {name} failed to process")
return None
except Exception as e:
print(f"❌ Error processing row {idx + 1}: {e}")
return None
async def parse_investors(
self, df: pd.DataFrame, save_to_db: bool = True, batch_size: int = 10
):
"""
Parse investors from DataFrame using manual JSON parsing and LLM for currency conversion.
Processes multiple investors concurrently for better performance.
Expected CSV columns: Name, Website, Final Investor Profile, Final Profile sourcing
Args:
df: DataFrame with investor data
save_to_db: Whether to save to database
batch_size: Number of investors to process concurrently (default: 10)
"""
results = []
async def parse_investors(self, df, save_to_db: bool = True):
"""Parse investors from DataFrame and optionally save to database"""
investors = []
df = df[20:]
db = None
if save_to_db:
db = get_db_session()
try:
total_rows = len(df)
print(
f"\n🚀 Starting to process {total_rows} investors with batch size {batch_size}..."
)
# Process rows in batches asynchronously
batch_size = 20 # Adjust batch size as needed
rows = [(idx, row) for idx, row in df.iterrows()]
# Process in batches
for batch_start in range(0, total_rows, batch_size):
batch_end = min(batch_start + batch_size, total_rows)
print(
f"\n🔄 Processing batch {batch_start + 1}-{batch_end} of {total_rows}..."
)
for i in range(0, len(rows), batch_size):
batch = rows[i : i + batch_size]
# Create tasks for concurrent processing
tasks = []
for idx in range(batch_start, batch_end):
row = df.iloc[idx]
task = self._process_single_investor(idx, row, total_rows)
tasks.append(task)
# Process batch asynchronously
tasks = [
self._process_row(row, idx, is_investor=True) for idx, row in batch
]
# Process batch concurrently
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out None results and exceptions, then save to database
for investor_data in batch_results:
if investor_data and not isinstance(investor_data, Exception):
results.append(investor_data)
# Handle results from batch
for (idx, row), result in zip(batch, batch_results):
if isinstance(result, Exception):
print(f"Error processing row {idx}: {result}")
if db:
db.rollback()
continue
# Save to database
if result:
# Convert dict to InvestorData if needed
if isinstance(result, dict):
investor_data = InvestorData(**result)
else:
investor_data = result
investors.append(investor_data)
# Save to database if requested
if save_to_db and db:
try:
saved_investor = self._save_parsed_investor_to_db(
saved_investor = self._save_investor_to_db(
db, investor_data
)
if saved_investor:
print(
f" ✅ Saved {investor_data['name']} to database (ID: {saved_investor.id})"
)
else:
print(
f" ❌ Failed to save {investor_data['name']} to database"
)
db.commit()
print(
f"✅ Saved investor '{saved_investor.name}' to database"
)
except Exception as e:
db.rollback()
print(
f" ❌ Database error for {investor_data['name']}: {e}"
)
elif isinstance(investor_data, Exception):
print(f" ❌ Exception occurred: {investor_data}")
print(f"❌ Failed to save investor to database: {e}")
# Commit batch to database
if save_to_db and db:
try:
db.commit()
print(f"💾 Committed batch {batch_start + 1}-{batch_end}")
except Exception as e:
db.rollback()
print(f"❌ Failed to commit batch: {e}")
print(
f"Completed batch {i // batch_size + 1} of {(len(rows) + batch_size - 1) // batch_size}"
)
except Exception as e:
print(f"❌ Fatal error in parse_investors: {e}")
print(f"Error in batch processing: {e}")
if db:
db.rollback()
finally:
if db:
db.close()
print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} investors")
return results
return investors
async def _process_single_company(
self, idx: int, row: pd.Series, total_rows: int
) -> Optional[dict]:
"""Process a single company row"""
try:
name = row.get("Name", "").strip() if pd.notna(row.get("Name")) else None
website = (
row.get("Website", "").strip() if pd.notna(row.get("Website")) else None
)
investor_names = (
row.get("Investor", "").strip()
if pd.notna(row.get("Investor"))
else None
)
profile_json = (
row.get("Final Investor Profile", "")
if pd.notna(row.get("Final Investor Profile"))
else None
)
if not name or not profile_json:
print(f"⚠️ Row {idx + 1}: Skipping - missing name or profile")
return None
print(f"📊 Processing {idx + 1}/{total_rows}: {name}")
# Process the company profile
company_data = await self.process_company_profile(
name, website, profile_json, investor_names
)
if company_data:
print(f"{name} parsed successfully")
return company_data
else:
print(f" ⚠️ {name} failed to process")
return None
except Exception as e:
print(f"❌ Error processing row {idx + 1}: {e}")
return None
async def parse_companies(
self, df: pd.DataFrame, save_to_db: bool = True, batch_size: int = 10
):
"""
Parse companies from DataFrame using manual JSON parsing.
Processes multiple companies concurrently for better performance.
Expected CSV columns: Name, Website, Investor, Final Investor Profile (actually company profile)
Args:
df: DataFrame with company data
save_to_db: Whether to save to database
batch_size: Number of companies to process concurrently (default: 10)
"""
results = []
async def parse_companies(self, df, save_to_db: bool = True):
"""Parse companies from DataFrame and optionally save to database"""
companies = []
df = df[20:]
db = None
if save_to_db:
db = get_db_session()
try:
total_rows = len(df)
print(
f"\n🚀 Starting to process {total_rows} companies with batch size {batch_size}..."
)
# Process rows in batches asynchronously
batch_size = 20 # Adjust batch size as needed
rows = [(idx, row) for idx, row in df.iterrows()]
# Process in batches
for batch_start in range(0, total_rows, batch_size):
batch_end = min(batch_start + batch_size, total_rows)
print(
f"\n🔄 Processing batch {batch_start + 1}-{batch_end} of {total_rows}..."
)
for i in range(0, len(rows), batch_size):
batch = rows[i : i + batch_size]
# Create tasks for concurrent processing
tasks = []
for idx in range(batch_start, batch_end):
row = df.iloc[idx]
task = self._process_single_company(idx, row, total_rows)
tasks.append(task)
# Process batch asynchronously
tasks = [
self._process_row(row, idx, is_investor=False) for idx, row in batch
]
# Process batch concurrently
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out None results and exceptions, then save to database
for company_data in batch_results:
if company_data and not isinstance(company_data, Exception):
results.append(company_data)
# Handle results from batch
for (idx, row), result in zip(batch, batch_results):
if isinstance(result, Exception):
print(f"Error processing row {idx}: {result}")
if db:
db.rollback()
continue
# Save to database
if result:
# Convert dict to CompanyData if needed
if isinstance(result, dict):
company_data = CompanyData(**result)
else:
company_data = result
companies.append(company_data)
# Save to database if requested
if save_to_db and db:
try:
saved_company = self._save_parsed_company_to_db(
saved_company = self._save_company_to_db(
db, company_data
)
if saved_company:
print(
f" ✅ Saved {company_data['name']} to database (ID: {saved_company.id})"
)
else:
print(
f" ❌ Failed to save {company_data['name']} to database"
)
db.commit()
print(
f"✅ Saved company '{saved_company.name}' to database"
)
except Exception as e:
db.rollback()
print(
f" ❌ Database error for {company_data['name']}: {e}"
)
elif isinstance(company_data, Exception):
print(f" ❌ Exception occurred: {company_data}")
print(f"❌ Failed to save company to database: {e}")
# Commit batch to database
if save_to_db and db:
try:
db.commit()
print(f"💾 Committed batch {batch_start + 1}-{batch_end}")
except Exception as e:
db.rollback()
print(f"❌ Failed to commit batch: {e}")
print(
f"Completed batch {i // batch_size + 1} of {(len(rows) + batch_size - 1) // batch_size}"
)
except Exception as e:
print(f"❌ Fatal error in parse_companies: {e}")
print(f"Error processing row {idx}: {e}")
if db:
db.rollback()
finally:
if db:
db.close()
print(f"\n🎉 Completed! Processed {len(results)}/{total_rows} companies")
return results
return companies
# async def main():
+45 -138
View File
@@ -1,26 +1,16 @@
import json
import logging
import os
from typing import List, Optional
from typing import List
from db.db import DATABASE_URL, get_db
from db.models import FundTable, InvestorTable, ProjectTable
from db.models import InvestorTable
from langchain import hub
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from schemas.router_schemas import (
CompanyMinimal,
InvestmentResponse,
PaginatedResponse,
SectorMinimal,
)
from schemas.py_schemas import InvestorData, InvestorList
from sqlalchemy.orm import selectinload
from services.compatibility_score import calculate_project_investor_compatibility
logger = logging.getLogger(__name__)
# Connect to SQLite
prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")
db = SQLDatabase.from_uri(DATABASE_URL)
@@ -35,12 +25,12 @@ class QueryProcessor:
temperature=0,
)
self.toolkit = SQLDatabaseToolkit(db=db, llm=self.llm)
# Update system message to specifically request only fund IDs
# Update system message to specifically request only investor IDs
system_message_updated = (
prompt_template.format(dialect="SQLite", top_k=5)
+ "\n\nIMPORTANT: You must ONLY return the fund IDs (id field from the funds table) that match the user's criteria. "
+ "\n\nIMPORTANT: You must ONLY return the investor IDs (id field) that match the user's criteria. "
+ "Do NOT return any other information, explanations, or data. "
+ "Your response should be ONLY a comma-separated list of numbers representing the fund IDs. "
+ "Your response should be ONLY a comma-separated list of numbers representing the investor IDs. "
+ "Example format: 1, 5, 12, 23"
)
self.agent = create_react_agent(
@@ -49,163 +39,80 @@ class QueryProcessor:
prompt=system_message_updated,
)
def process_query(
self, question: str, project_id: Optional[int] = None
) -> PaginatedResponse[InvestmentResponse]:
"""Process a query using the LLM and return investment response data.
Args:
question: The natural language query to process
project_id: Optional project ID for compatibility scoring
"""
# Let the LLM handle all database interactions and filtering to get fund IDs
def process_query(self, question: str) -> InvestorList:
"""Process a query using the LLM and return investor data."""
# Let the LLM handle all database interactions and filtering to get IDs
response = self.agent.invoke(
{"messages": [("user", question)]},
config={"recursion_limit": 50},
)
# Extract the actual message content
logger.info(f"{response}")
final_message_content = response["messages"][-1].content
logger.info(f"AI Response: \n{final_message_content}")
# Extract fund IDs from the AI response
fund_ids = self._extract_fund_ids_from_response(final_message_content)
ai_response = (
response["messages"][-1].content if response.get("messages") else ""
)
# Fetch full fund data with investor relationships using the IDs
return self._fetch_funds_by_ids(fund_ids, project_id)
# Extract investor IDs from the AI response
investor_ids = self._extract_investor_ids_from_response(ai_response)
def _extract_fund_ids_from_response(self, ai_response: str) -> List[int]:
"""Extract fund IDs from AI response."""
# Fetch full investor data using the IDs
return self._fetch_investors_by_ids(investor_ids)
def _extract_investor_ids_from_response(self, ai_response: str) -> List[int]:
"""Extract investor IDs from AI response."""
import re
fund_ids = []
investor_ids = []
try:
# Try multiple patterns to extract IDs from the response
# Pattern 1: Simple numbers (assuming they are IDs)
numbers = re.findall(r"\b\d+\b", ai_response)
fund_ids = [int(num) for num in numbers]
investor_ids = [int(num) for num in numbers]
# Pattern 2: If response contains explicit ID references
id_matches = re.findall(r"\bid[:\s]*(\d+)", ai_response.lower())
if id_matches:
fund_ids = [int(id_str) for id_str in id_matches]
investor_ids = [int(id_str) for id_str in id_matches]
except Exception as e:
print(f"Error extracting IDs from response: {e}")
return []
return fund_ids
return investor_ids
def _fetch_funds_by_ids(
self, fund_ids: List[int], project_id: Optional[int] = None
) -> PaginatedResponse[InvestmentResponse]:
"""Fetch funds with all their relationships from the database using fund IDs.
Constructs response similar to read_investors but starting from funds.
Args:
fund_ids: List of fund IDs to fetch
project_id: Optional project ID for compatibility scoring
"""
if not fund_ids:
return PaginatedResponse(
items=[],
total=0,
page=1,
page_size=len(fund_ids) if fund_ids else 10,
total_pages=0,
)
def _fetch_investors_by_ids(self, investor_ids: List[int]) -> InvestorList:
"""Fetch investors with all their relationships from the database using IDs."""
if not investor_ids:
return InvestorList(investors=[])
# Get database session
db_session = next(get_db())
try:
# Load project if project_id provided
project = None
if project_id is not None:
project = (
db_session.query(ProjectTable)
.options(selectinload(ProjectTable.sector))
.filter(ProjectTable.id == project_id)
.first()
)
# Query funds with all necessary relationships loaded
funds = (
db_session.query(FundTable)
# Build query with all relationships loaded
query = (
db_session.query(InvestorTable)
.options(
selectinload(FundTable.investor).selectinload(
InvestorTable.portfolio_companies
),
selectinload(FundTable.investor).selectinload(
InvestorTable.team_members
),
selectinload(FundTable.investor).selectinload(
InvestorTable.sectors
),
selectinload(FundTable.investment_stages),
selectinload(FundTable.sectors),
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(FundTable.id.in_(fund_ids))
.all()
.filter(InvestorTable.id.in_(investor_ids))
)
# Transform to InvestmentResponse format (one row per fund)
investment_responses = []
for fund in funds:
investor = fund.investor
investors = query.all()
# Calculate compatibility score if project provided
compatibility_score = 1.0
if project is not None:
compatibility_score = calculate_project_investor_compatibility(
project=project, investor=investor, use_funds=True
)
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
for company in investor.portfolio_companies[:3]
]
# Get stage focus as comma-separated string
stage_focus = (
", ".join([stage.name for stage in fund.investment_stages])
if fund.investment_stages
else None
# Transform to InvestorData format
investor_data_list = []
for investor in investors:
investor_data = InvestorData(
investor=investor,
portfolio_companies=investor.portfolio_companies,
team_members=investor.team_members,
sectors=investor.sectors,
)
investor_data_list.append(investor_data)
# Get top 3 sectors from fund (id and name only)
fund_sectors = [
SectorMinimal(id=sector.id, name=sector.name)
for sector in (fund.sectors[:3] if fund.sectors else [])
]
investment_response = InvestmentResponse(
id=investor.id,
name=f"{investor.name} - {fund.fund_name}"
if fund.fund_name
else investor.name,
aum=investor.aum,
check_size_lower=fund.check_size_lower,
check_size_upper=fund.check_size_upper,
geographic_focus=fund.geographic_focus,
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
total_count = len(investment_responses)
total_pages = 1 if total_count > 0 else 0
return PaginatedResponse(
items=investment_responses,
total=total_count,
page=1,
page_size=total_count,
total_pages=total_pages,
)
return InvestorList(investors=investor_data_list)
finally:
db_session.close()
-259
View File
@@ -1,259 +0,0 @@
from pathlib import Path
from typing import Any, Dict, List, Optional
from jinja2 import Environment, FileSystemLoader
from playwright.async_api import async_playwright
class ReportGenerator:
    """Service for generating PDF reports from HTML templates."""

    def __init__(self):
        # Templates live in the "templates" directory two levels above this file.
        template_dir = Path(__file__).parent.parent / "templates"
        # Autoescape is enabled because the rendered values (names, descriptions,
        # URLs) come from external data and end up in HTML loaded by a browser.
        self.env = Environment(
            loader=FileSystemLoader(str(template_dir)), autoescape=True
        )

    async def generate_investor_report(
        self,
        investor_data: Dict[str, Any],
        project_data: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """
        Generate a PDF report for an investor profile.

        Args:
            investor_data: Dictionary containing investor information
            project_data: Optional dictionary containing project information
                for compatibility analysis

        Returns:
            bytes: PDF file content
        """
        context = self._prepare_context(investor_data, project_data)
        html_content = self.env.get_template("report.html").render(**context)
        return await self._html_to_pdf(html_content)

    def _prepare_context(
        self,
        investor_data: Dict[str, Any],
        project_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Prepare the context dictionary for template rendering.

        Without project data the compatibility entries keep their neutral
        defaults (score 0, no criteria, no recommendation).
        """
        context = {
            "investor": investor_data,
            "project": project_data,
            "compatibility_score": 0,
            "match_criteria": [],
            "recommendation": None,
        }
        if project_data:
            context["compatibility_score"] = self._calculate_compatibility_score(
                investor_data, project_data
            )
            context["match_criteria"] = self._generate_match_criteria(
                investor_data, project_data
            )
            context["recommendation"] = self._generate_recommendation(
                context["compatibility_score"], context["match_criteria"]
            )
        return context

    @staticmethod
    def _geo_overlap(investor_geo: Optional[str], project_geo: Optional[str]) -> bool:
        """Return True when either location text contains the other (case-insensitive)."""
        investor_text = (investor_geo or "").lower()
        project_text = (project_geo or "").lower()
        if not investor_text or not project_text:
            return False
        return investor_text in project_text or project_text in investor_text

    def _calculate_compatibility_score(
        self, investor_data: Dict[str, Any], project_data: Dict[str, Any]
    ) -> int:
        """Calculate overall compatibility score (0-100) between investor and project.

        Each criterion contributes its full weight or nothing. Missing or None
        values simply earn no points instead of raising.
        """
        score = 0
        weights = {
            "sector": 30,
            "stage": 30,
            "geography": 20,
            "check_size": 15,
            "thesis": 5,
        }

        # Sector match: any shared sector counts.
        investor_sectors = set(investor_data.get("sectors") or [])
        project_sectors = set(project_data.get("sectors") or [])
        if investor_sectors & project_sectors:
            score += weights["sector"]

        # Stage match
        investor_stages = set(investor_data.get("investment_stages") or [])
        project_stage = project_data.get("stage")
        if project_stage and project_stage in investor_stages:
            score += weights["stage"]

        # Geography match: same bidirectional test the criteria table uses, so
        # the score and the table cannot disagree.
        if self._geo_overlap(
            investor_data.get("geographic_focus"), project_data.get("location")
        ):
            score += weights["geography"]

        # Check size match: a lower bound is required, the upper bound is open-ended
        # when absent. ``or 0`` guards against an explicit None valuation.
        project_valuation = project_data.get("valuation") or 0
        check_lower = investor_data.get("check_size_lower") or 0
        check_upper = investor_data.get("check_size_upper") or float("inf")
        if check_lower and check_lower <= project_valuation <= check_upper:
            score += weights["check_size"]

        # Thesis alignment (simplified): always granted.
        score += weights["thesis"]
        return min(score, 100)

    def _generate_match_criteria(
        self, investor_data: Dict[str, Any], project_data: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate the detailed match criteria table.

        Returns one dict per criterion (Sector, Stage, Geography, Check Size,
        Thesis) with ``name``, ``requirement``, ``evidence``, ``match`` and
        ``weight`` keys.
        """
        criteria = []

        # Sector criterion: the requirement is the project's own sector list.
        investor_sectors = list(investor_data.get("sectors") or [])
        project_sectors = list(project_data.get("sectors") or [])
        sector_match = (
            "Perfect" if set(investor_sectors) & set(project_sectors) else "Mismatch"
        )
        criteria.append(
            {
                "name": "Sector",
                "requirement": ", ".join(map(str, project_sectors))
                if project_sectors
                else "N/A",
                "evidence": ", ".join(map(str, investor_sectors[:3]))
                if investor_sectors
                else "N/A",
                "match": sector_match,
                "weight": "30%",
            }
        )

        # Stage criterion
        investor_stages = list(investor_data.get("investment_stages") or [])
        project_stage = project_data.get("stage") or "N/A"
        stage_match = "Perfect" if project_stage in investor_stages else "Mismatch"
        criteria.append(
            {
                "name": "Stage",
                "requirement": str(project_stage),
                "evidence": ", ".join(map(str, investor_stages))
                if investor_stages
                else "N/A",
                "match": stage_match,
                "weight": "30%",
            }
        )

        # Geography criterion
        investor_geo = investor_data.get("geographic_focus") or "N/A"
        project_geo = project_data.get("location") or "N/A"
        if investor_geo == "N/A" or project_geo == "N/A":
            # Nothing to compare: "N/A" only when both sides are unknown.
            geo_match = (
                "N/A" if investor_geo == "N/A" and project_geo == "N/A" else "Mismatch"
            )
        else:
            geo_match = (
                "Strong" if self._geo_overlap(investor_geo, project_geo) else "Mismatch"
            )
        criteria.append(
            {
                "name": "Geography",
                "requirement": project_geo,
                "evidence": investor_geo,
                "match": geo_match,
                "weight": "20%",
            }
        )

        # Check Size criterion: amounts are shown in millions with a € prefix.
        check_lower = investor_data.get("check_size_lower") or 0
        check_upper = investor_data.get("check_size_upper") or 0
        project_val = project_data.get("valuation") or 0
        check_evidence = "N/A"
        if check_lower and check_upper:
            check_evidence = (
                f"€{check_lower / 1000000:.0f}M - €{check_upper / 1000000:.0f}M"
            )
        elif check_lower:
            check_evidence = f"€{check_lower / 1000000:.0f}M+"
        # NOTE(review): any positive valuation outside a known range is still
        # labelled "Strong" although it earns no score - confirm this is intended.
        if check_lower and check_upper and check_lower <= project_val <= check_upper:
            check_match = "Perfect"
        elif project_val > 0:
            check_match = "Strong"
        else:
            check_match = "N/A"
        criteria.append(
            {
                "name": "Check Size",
                "requirement": f"€{project_val / 1000000:.0f}M"
                if project_val
                else "N/A",
                "evidence": check_evidence,
                "match": check_match,
                "weight": "15%",
            }
        )

        # Thesis criterion. A plain-text thesis is shown as-is; slicing it like a
        # list would yield its first two characters.
        thesis = investor_data.get("investment_thesis") or []
        if isinstance(thesis, str):
            thesis_evidence = thesis
        elif thesis:
            thesis_evidence = ", ".join(map(str, list(thesis)[:2]))
        else:
            thesis_evidence = "Entrepreneur-led"
        criteria.append(
            {
                "name": "Thesis",
                "requirement": "Founder-led, ESG focus",
                "evidence": thesis_evidence,
                "match": "Strong",
                "weight": "5%",
            }
        )
        return criteria

    def _generate_recommendation(
        self, score: int, criteria: List[Dict[str, str]]
    ) -> str:
        """Generate recommendation text from the score.

        ``criteria`` is accepted for interface stability but not consulted.
        """
        if score >= 85:
            return "High Priority. A strong target due to exceptional alignment on the most heavily-weighted criteria: Sector and Stage. The strong geographic fit further solidifies this recommendation."
        if score >= 70:
            return "Medium Priority. Good alignment on key criteria with some areas of strong fit. The geographic fit in the target region supports this recommendation."
        return "Low Priority. Limited alignment on key investment criteria. Consider for future evaluation if circumstances change."

    async def _html_to_pdf(self, html_content: str) -> bytes:
        """Convert HTML content to an A4 PDF using Playwright (Chromium)."""
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            try:
                page = await browser.new_page()
                # Set content and wait for any dynamic content to load
                await page.set_content(html_content, wait_until="networkidle")
                # Zero margins: the template lays out full A4 pages itself.
                return await page.pdf(
                    format="A4",
                    print_background=True,
                    margin={"top": "0", "right": "0", "bottom": "0", "left": "0"},
                )
            finally:
                # Close the browser even when rendering fails.
                await browser.close()
-322
View File
@@ -1,322 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Investor Profile Report</title>
<script src="https://cdn.tailwindcss.com"></script>
<style>
@page {
size: A4;
margin: 0;
}
html,
body {
margin: 0;
padding: 0;
height: 100%;
background: white;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI",
Roboto, sans-serif;
}
/* Each page is exactly one A4 sheet */
.page {
width: 210mm;
height: 297mm;
position: relative;
background: white;
overflow: hidden;
}
/* Adds a break between pages (for print/PDF) */
.page-with-break {
page-break-after: always;
}
/* Inner content wrapper for consistent padding */
.page-content {
box-sizing: border-box;
padding: 48px; /* equivalent to Tailwind p-12 */
height: 100%;
display: flex;
flex-direction: column;
}
.tag {
display: inline-block;
padding: 4px 12px;
background: #f3f4f6;
border-radius: 4px;
font-size: 12px;
margin: 4px;
}
/* Ensure the footer text stays inside page bounds */
.page-footer {
position: absolute;
bottom: 48px;
right: 48px;
font-size: 10px;
color: #9ca3af; /* Tailwind gray-400 */
}
</style>
</head>
<body>
<!-- Page 1 -->
<div class="page page-with-break">
<div class="page-content">
<div class="flex justify-between items-start mb-8">
<div>
<p class="text-sm text-gray-600 mb-2">Investor Profile</p>
<h1 class="text-4xl font-bold text-gray-900">
{{ investor.name }}
</h1>
</div>
<a
href="{{ investor.website }}"
target="_blank"
class="bg-gray-200 text-gray-700 px-4 py-2 rounded text-sm no-underline"
>Visit Website →</a
>
</div>
<div class="grid grid-cols-2 gap-8 flex-grow">
<!-- Left Column -->
<div>
<div class="mb-4">
<h2 class="text-sm font-bold text-gray-900 uppercase mb-4">
Investor Description
</h2>
<p class="text-sm text-gray-700 leading-relaxed">
{{ investor.description or 'No description available.' }}
</p>
</div>
<div class="mb-4">
<h2 class="text-sm font-bold text-gray-900 uppercase mb-4">
Portfolio Highlights
</h2>
<div class="flex flex-wrap gap-2">
{% if investor.portfolio_highlights %}
{% for company in investor.portfolio_highlights[:5] %}
<span class="tag">{{ company }}</span>
{% endfor %}
{% else %}
<p class="text-sm text-gray-500">
No portfolio highlights available
</p>
{% endif %}
</div>
</div>
<div class="mb-4">
<h2 class="text-sm font-bold text-gray-900 uppercase mb-4">
Senior Leadership
</h2>
{% if investor.team_members %}
{% for member in investor.team_members[:2] %}
<div class="mb-3">
<p class="text-sm font-semibold text-gray-900">
{{ member.name }}
</p>
<p class="text-sm text-gray-600">
{{ member.role or member.title or 'Team Member' }}
</p>
{% if member.email %}
<p class="text-xs text-blue-600">
{{ member.email }}
</p>
{% endif %}
</div>
{% endfor %}
{% else %}
<p class="text-sm text-gray-500">No team information available</p>
{% endif %}
</div>
</div>
<!-- Right Column -->
<div class="bg-gray-50 p-6 rounded-lg">
<h2 class="text-sm font-bold text-gray-900 uppercase mb-4">
Key Data
</h2>
<div class="space-y-3 text-sm">
<div>
<p class="text-xs text-gray-600">Headquarters:</p>
<p class="font-semibold text-gray-900">
{{ investor.headquarters or 'N/A' }}
</p>
</div>
<div>
<p class="text-xs text-gray-600">Sectors:</p>
<p class="font-semibold text-gray-900">
{% if investor.sectors %}
{{ investor.sectors | join(', ') }}
{% else %}
N/A
{% endif %}
</p>
</div>
<div>
<p class="text-xs text-gray-600">DACH Region:</p>
<p class="font-semibold text-gray-900">
{{ investor.geographic_focus or 'N/A' }}
</p>
</div>
<div>
<p class="text-xs text-gray-600">AUM (EUR million):</p>
<p class="font-semibold text-gray-900">
{% if investor.aum %}
€{{ '{:,.0f}'.format(investor.aum / 1000000) }}M
{% else %}
N/A
{% endif %}
</p>
</div>
<div class="mb-4">
<p class="text-xs text-gray-600 mb-1">
Investment Stage:
</p>
<p class="text-sm font-semibold text-gray-900">
{% if investor.investment_stages %} {{
investor.investment_stages | join(', ') }} {% else
%} N/A {% endif %}
</p>
</div>
<div class="mb-4">
<p class="text-xs text-gray-600 mb-1">
Est. Investment Size:
</p>
<p class="text-sm font-semibold text-gray-900">
{% if investor.check_size_lower and
investor.check_size_upper %} €{{
'{:,.0f}'.format(investor.check_size_lower /
1000000) }}M - €{{
'{:,.0f}'.format(investor.check_size_upper /
1000000) }}M {% elif investor.check_size_lower %}
€{{ '{:,.0f}'.format(investor.check_size_lower /
1000000) }}M+ {% else %} N/A {% endif %}
</p>
</div>
</div>
</div>
</div>
<div class="page-footer">Page 1</div>
</div>
</div>
<!-- Page 2 -->
{% if project %}
<div class="page">
<div class="page-content">
<h1 class="text-3xl font-bold text-gray-900 mb-8">
{{ investor.name }}: Mandate Match Analysis
</h1>
<!-- Overall Match Circle -->
<div class="flex justify-center mb-12">
<div class="text-center">
<p class="text-sm font-bold text-gray-700 uppercase mb-4">
Overall Mandate Match
</p>
<div
class="w-48 h-48 rounded-full border-8 border-green-400 flex items-center justify-center bg-green-50 mx-auto"
>
<span class="text-5xl font-bold text-green-600"
>{{ compatibility_score }}%</span
>
</div>
</div>
</div>
<!-- Mandate Alignment Analysis Table -->
<div class="mb-12">
<h2 class="text-xl font-bold text-gray-900 mb-6">
Mandate Alignment Analysis
</h2>
<table class="w-full border-collapse">
<thead>
<tr class="border-b-2 border-gray-300">
<th
class="text-left py-3 px-4 text-sm font-bold text-gray-700"
>
Criterion
</th>
<th
class="text-left py-3 px-4 text-sm font-bold text-gray-700"
>
Mandate Requirement
</th>
<th
class="text-left py-3 px-4 text-sm font-bold text-gray-700"
>
Investor Evidence (from Database)
</th>
<th
class="text-left py-3 px-4 text-sm font-bold text-gray-700"
>
Match Score
</th>
<th
class="text-left py-3 px-4 text-sm font-bold text-gray-700"
>
Weight
</th>
</tr>
</thead>
<tbody>
{% for criterion in match_criteria %}
<tr class="border-b border-gray-200">
<td class="py-4 px-4 text-sm text-gray-900">
{{ criterion.name }}
</td>
<td class="py-4 px-4 text-sm text-gray-700">
{{ criterion.requirement }}
</td>
<td class="py-4 px-4 text-sm text-gray-700">
{{ criterion.evidence }}
</td>
<td class="py-4 px-4 text-sm">
<span
class="{% if criterion.match == 'Perfect' %}text-green-600{% elif criterion.match == 'Strong' %}text-blue-600{% else %}text-yellow-600{% endif %} font-semibold"
>
{{ criterion.match }}
</span>
</td>
<td class="py-4 px-4 text-sm text-gray-700">
{{ criterion.weight }}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<!-- Final Recommendation -->
<div class="bg-blue-50 border-l-4 border-blue-500 p-6 rounded">
<h3 class="text-lg font-bold text-gray-900 mb-3">
Final Recommendation & Rationale
</h3>
<p class="text-sm text-gray-700 leading-relaxed">
{{ recommendation or "High Priority. A strong target due to
exceptional alignment on the most heavily-weighted criteria:
Sector and Stage. The strong geographic fit in the DACH
region further solidifies this recommendation." }}
</p>
</div>
<div class="absolute bottom-12 right-12 text-xs text-gray-400">
Page 2
</div>
</div>
</div>
{% endif %}
</body>
</html>
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
Binary file not shown.
-315
View File
@@ -1,315 +0,0 @@
import logging
import re
import unicodedata
import pandas as pd
from models import CompanyTable, InvestorTable, SectorTable, engine, init_database
from sqlalchemy.orm import sessionmaker
# Configure logging at INFO level and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Runs at import time, before any ingestion code executes.
# NOTE(review): presumably creates the tables declared in `models` — confirm.
init_database()
# ===================== Ingesting Original Data =====================#
def parse_investor_names(investor_names_str):
    """Split a comma-separated investor string into a list of cleaned names.

    Missing (NaN/None) or empty input yields an empty list. Each piece is
    stripped and passed through ``clean_name``; pieces that clean to nothing
    are dropped.
    """
    if pd.isna(investor_names_str) or investor_names_str == "":
        return []
    cleaned_names = []
    for raw_name in str(investor_names_str).split(","):
        candidate = clean_name(raw_name.strip())
        if candidate:
            cleaned_names.append(candidate)
    return cleaned_names
def parse_industries(industries_str):
    """Split a comma-separated industries string into a list of labels.

    Missing (NaN/None) or empty input yields an empty list. Each label is
    whitespace-stripped and empty labels are discarded.
    """
    if pd.isna(industries_str) or industries_str == "":
        return []
    labels = []
    for piece in str(industries_str).split(","):
        label = piece.strip()
        if label:
            labels.append(label)
    return labels
def clean_special_characters(text):
    """Reduce *text* to plain ASCII letters, digits, spaces, hyphens and periods.

    Falsy input (``None``, ``""``, ``0``) is returned unchanged. Otherwise the
    value is stringified, runs of dots are removed, accents are folded to
    their ASCII base letters, every other character is dropped, and
    whitespace is collapsed to single spaces.
    """
    if not text:
        return text
    # Three-dot runs go first so an ellipsis is removed whole rather than
    # leaving a single dot behind.
    without_ellipses = str(text).replace("...", "").replace("..", "")
    # NFKD separates accented letters into base letter + combining mark; the
    # ASCII round trip with errors ignored then discards the marks.
    ascii_only = (
        unicodedata.normalize("NFKD", without_ellipses)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    allowed_only = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", ascii_only)
    return re.sub(r"\s+", " ", allowed_only).strip()
def clean_string(value):
    """Return a cleaned string for *value*, or ``None`` when it carries no data.

    Missing values, the empty string and the placeholders ``nan``/``null``/
    ``none``/``0``/``0.0`` (compared case-insensitively) map to ``None``.
    Everything else is stripped and passed through
    ``clean_special_characters``; a result that is empty or is itself one of
    the placeholders (compared exactly) also maps to ``None``.
    """
    placeholders = ["nan", "null", "none", "0", "0.0"]
    if pd.isna(value) or value == "" or str(value).lower() in placeholders:
        return None
    sanitized = clean_special_characters(str(value).strip())
    # Cleaning can strip a value down to a bare placeholder, so check again.
    if sanitized in ["0", "0.0", "null", "nan", "none"]:
        return None
    if not sanitized:
        return None
    return sanitized
def clean_name(value):
    """Clean a company or investor name, returning ``None`` when nothing is left.

    Missing values, the empty string and the placeholders ``nan``/``null``/
    ``none``/``0``/``0.0`` (case-insensitive) map to ``None``. Otherwise the
    name is folded to ASCII, restricted to letters, digits, whitespace and
    ``- . ( ) &``, stripped of ellipsis-style dot runs and of leading or
    trailing periods, and whitespace-normalised.

    Returns:
        The cleaned name, or ``None`` if it is empty or a placeholder.
    """
    placeholders = ["nan", "null", "none", "0", "0.0"]
    if pd.isna(value) or value == "" or str(value).lower() in placeholders:
        return None
    text = str(value).strip()
    # NFKD splits accented letters into base letter + combining mark; the
    # ASCII round trip with errors ignored then drops the marks.
    normalized = unicodedata.normalize("NFKD", text)
    ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
    # Allow alphanumeric, spaces, hyphens, periods, parentheses, and ampersands
    cleaned = re.sub(r"[^a-zA-Z0-9\s\-\.\(\)&]", "", ascii_text)
    # Remove every run of two or more periods in one pass. Replacing ".."
    # before "..." turned "A... Energi" into "A. Energi".
    cleaned = re.sub(r"\.{2,}", "", cleaned)
    # Collapse whitespace after dot removal so a removed " ... " cannot leave
    # a double space behind.
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    # Drop leading/trailing periods, then any space they were shielding.
    cleaned = cleaned.strip(".").strip()
    # Same case-insensitive placeholder test as on the raw value.
    if cleaned.lower() in placeholders:
        return None
    return cleaned if cleaned else None
def clean_integer(value):
    """Convert *value* to a strictly positive ``int``, or return ``None``.

    Missing values, the placeholders ``nan``/``null``/``none``/``""``/``0``/
    ``0.0`` (case-insensitive), non-numeric input, infinities, zero and
    negative numbers all map to ``None``. Fractional values are truncated
    toward zero by ``int(float(value))``.
    """
    if pd.isna(value) or str(value).lower() in ["nan", "null", "none", "", "0", "0.0"]:
        return None
    try:
        cleaned_val = int(float(value))
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float("inf")); without it a single
        # infinite cell raised out of this helper.
        return None
    return cleaned_val if cleaned_val > 0 else None
def parse_website(website_str: str):
    """Normalise a website value to an ``https:`` URL, or return ``None``.

    Everything after the first colon is kept and prefixed with ``https:``,
    so ``http://example.com`` becomes ``https://example.com``. Only the first
    colon is treated as the scheme separator, so URLs with a port or other
    later colons are preserved.

    Args:
        website_str: Raw website cell, expected to be a string.

    Returns:
        The rewritten URL, or ``None`` when the input is not a string, has no
        colon, or its remainder is the placeholder ``"0"``.
    """
    if not isinstance(website_str, str):
        return None
    _scheme, separator, remainder = website_str.partition(":")
    if not separator or remainder == "0":
        return None
    return "https:" + remainder
def ingest_data():
    """Load ``companies.csv`` and ``investors.csv`` into the database.

    Runs three steps on one session:

    1. Insert an ``InvestorTable`` row for every cleaned investor name that is
       not already stored.
    2. Insert companies, link each one to the investors named in its row
       (only investors already in the database are linked) and to its
       sectors, creating missing ``SectorTable`` rows as needed.
    3. Attach to every investor the sectors of its portfolio companies.

    A row that raises is logged, the session is rolled back and processing
    continues with the next row. The rollback also discards rows added since
    the last batch commit. The session is closed even when a step fails.
    """
    # Create database engine and session
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Load CSV files
        print("Loading CSV files...")
        companies_df = pd.read_csv("companies.csv")
        investors_df = pd.read_csv("investors.csv")
        print(f"📊 Companies CSV: {len(companies_df)} rows")
        print(f"📊 Investors CSV: {len(investors_df)} rows")

        # Step 1: Ingest Investors
        print("\n🔄 Step 1: Ingesting Investors...")
        investors_processed = 0
        for index, row in investors_df.iterrows():
            try:
                investor_name = clean_name(row.get("Filtered investor names", ""))
                if investor_name:
                    # Check if investor already exists
                    existing_investor = (
                        session.query(InvestorTable)
                        .filter_by(name=investor_name)
                        .first()
                    )
                    if not existing_investor:
                        investor = InvestorTable(
                            name=investor_name,
                            description=clean_string(row.get("Business model", "")),
                            headquarters=clean_string(row.get("HQ", "")),
                            website=parse_website(
                                str(row.get("Website", "")).strip()
                            ),
                            number_of_investments=clean_integer(
                                row.get("Number of investments")
                            ),
                        )
                        session.add(investor)
                        investors_processed += 1
                        # Commit in batches of 1000 new investors
                        if investors_processed % 1000 == 0:
                            session.commit()
                            print(f" Committed {investors_processed} investors")
            except Exception as e:
                logger.error(f"Error processing investor {index}: {e}")
                # A failed flush leaves the session unusable until it is
                # rolled back; without this every later row would fail too.
                session.rollback()
                continue
        session.commit()
        print(f"✅ Investors completed: {investors_processed} processed")

        # Step 2: Ingest Companies and Sectors
        print("\n🔄 Step 2: Ingesting Companies and Sectors...")
        companies_processed = 0
        for index, row in companies_df.iterrows():
            try:
                # Process company
                company_name = clean_name(row.get("Organization Name", ""))
                if not company_name:
                    continue
                # Check if company already exists
                existing_company = (
                    session.query(CompanyTable).filter_by(name=company_name).first()
                )
                if existing_company:
                    company = existing_company
                else:
                    # Create company
                    company = CompanyTable(
                        name=company_name,
                        description=clean_string(
                            row.get("Organization Description", "")
                        ),
                        location=clean_string(row.get("Organization Location", "")),
                        industry=clean_string(row.get("Organization Industries", "")),
                        website=clean_string(row.get("Organization Website", "")),
                    )
                    session.add(company)
                    session.flush()  # Get the company ID
                    companies_processed += 1

                # Process investor relationships
                investor_names_str = row.get("Investor Names", "")
                if pd.notna(investor_names_str) and investor_names_str:
                    investor_names = parse_investor_names(investor_names_str)
                    for investor_name in investor_names:
                        # Find investor in database
                        investor = (
                            session.query(InvestorTable)
                            .filter_by(name=investor_name.strip())
                            .first()
                        )
                        if investor:
                            # Add investor-company relationship
                            if company not in investor.portfolio_companies:
                                investor.portfolio_companies.append(company)
                        else:
                            print(
                                "This company has an investor not in DB:",
                                investor_name,
                            )

                # Process sectors/industries
                industries_str = row.get("Organization Industries", "")
                if pd.notna(industries_str) and industries_str:
                    industries = parse_industries(industries_str)
                    for industry_name in industries:
                        industry_name = industry_name.strip()
                        if industry_name:
                            # Check if sector exists
                            sector = (
                                session.query(SectorTable)
                                .filter_by(name=industry_name)
                                .first()
                            )
                            if not sector:
                                sector = SectorTable(name=industry_name)
                                session.add(sector)
                                session.flush()
                            # Add company-sector relationship
                            if sector not in company.sectors:
                                company.sectors.append(sector)

                # Commit every 100 companies
                if companies_processed % 100 == 0 and companies_processed > 0:
                    session.commit()
                    print(f" Processed {companies_processed} companies...")
            except Exception as e:
                logger.error(f"Error processing company {index}: {e}")
                session.rollback()
                continue

        # Step 3: Link investors to sectors based on portfolio companies
        print("\n🔄 Step 3: Linking Investors to Sectors...")
        investors_linked_to_sectors = 0
        all_investors = session.query(InvestorTable).all()
        for investor in all_investors:
            sectors = set()
            for company in investor.portfolio_companies:
                for sector in company.sectors:
                    sectors.add(sector)
            # Add sectors to investor if not already present
            for sector in sectors:
                if sector not in investor.sectors:
                    investor.sectors.append(sector)
            if sectors:
                investors_linked_to_sectors += 1
        session.commit()
        print(f"✅ Linked {investors_linked_to_sectors} investors to sectors")

        # Final commit
        session.commit()

        # Final counts
        final_investors = session.query(InvestorTable).count()
        final_companies = session.query(CompanyTable).count()
        final_sectors = session.query(SectorTable).count()
        print("\n🎉 Ingestion Complete!")
        print(f" Investors: {final_investors}")
        print(f" Companies: {final_companies}")
        print(f" Sectors: {final_sectors}")
    finally:
        # Release the connection even if a CSV is missing or a step raises.
        session.close()
# Run the full ingestion only when this file is executed as a script.
if __name__ == "__main__":
    ingest_data()
# print(clean_name("A... Energi"))
# print(clean_name("B.. Tech"))
# print(clean_name("A... Energi"))
-381
View File
@@ -1,381 +0,0 @@
import enum
from typing import Annotated
from fastapi import Depends
from sqlalchemy import (
Column,
DateTime,
ForeignKey,
Integer,
String,
Table,
Text,
create_engine,
func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, declarative_mixin, relationship, sessionmaker
from sqlalchemy.types import JSON, Enum
# Declarative base that every ORM model in this module inherits from
Base = declarative_base()
# Database configuration
# NOTE: the URL is hard-coded below; the environment-variable override is disabled.
# DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./investors.db")
# Create engine (SQLite file at a relative path, SQL echo disabled)
engine = create_engine("sqlite:///./investors.db", echo=False)
# Create session factory; sessions neither autocommit nor autoflush
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
    """Yield a new ``SessionLocal`` session and close it when the consumer is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the consumer finished normally or raised.
        session.close()
# Annotated parameter type: a Session supplied by FastAPI's Depends(get_db)
db_dependency = Annotated[Session, Depends(get_db)]
def init_database():
    """Create every table registered on ``Base.metadata`` using the module engine."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)
def get_session_sync() -> Session:
    """Return a new ``SessionLocal`` session; it is not closed here."""
    session = SessionLocal()
    return session
def get_db_session():
    """Return a new ``SessionLocal`` session for direct use; it is not closed here."""
    session = SessionLocal()
    return session
@declarative_mixin
class TimestampMixin:
    """Mixin adding ``created_at`` and ``updated_at`` columns to a model."""

    # Required; defaulted by the database to func.now() on insert
    created_at = Column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )
    # No default, so it stays NULL until the row is first updated
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class InvestmentStage(enum.Enum):
    """Funding stages; used as the enum type of ``ProjectTable.stage``."""

    SEED = "SEED"
    SERIES_A = "SERIES_A"
    SERIES_B = "SERIES_B"
    SERIES_C = "SERIES_C"
    GROWTH = "GROWTH"
    LATE_STAGE = "LATE_STAGE"
# Association table for many-to-many relationship between investors and companies
investor_company_association = Table(
    "investor_companies",
    Base.metadata,
    Column("investor_id", Integer, ForeignKey("investors.id")),
    Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for investor-sector many-to-many
investor_sector_association = Table(
    "investor_sectors",
    Base.metadata,
    Column("investor_id", Integer, ForeignKey("investors.id")),
    Column("sector_id", Integer, ForeignKey("sectors.id")),
)
# Association table for company-sector many-to-many
company_sector_association = Table(
    "company_sector",
    Base.metadata,
    Column("company_id", Integer, ForeignKey("companies.id")),
    Column("sector_id", Integer, ForeignKey("sectors.id")),
)
# Association table for project-sector many-to-many
project_sector_association = Table(
    "project_sector",
    Base.metadata,
    Column("project_id", Integer, ForeignKey("projects.id")),
    Column("sector_id", Integer, ForeignKey("sectors.id")),
)
# Association table for project-investor many-to-many
project_investor_association = Table(
    "project_investors",
    Base.metadata,
    Column("project_id", Integer, ForeignKey("projects.id")),
    Column("investor_id", Integer, ForeignKey("investors.id")),
)
# Association table for project-company many-to-many
project_company_association = Table(
    "project_companies",
    Base.metadata,
    Column("project_id", Integer, ForeignKey("projects.id")),
    Column("company_id", Integer, ForeignKey("companies.id")),
)
# Association table for investor-stage many-to-many
investor_stage_association = Table(
    "investor_stages",
    Base.metadata,
    Column("investor_id", Integer, ForeignKey("investors.id")),
    Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for fund-stage many-to-many
fund_investment_stages_association = Table(
    "fund_investment_stages",
    Base.metadata,
    Column("fund_id", Integer, ForeignKey("funds.id")),
    Column("stage_id", Integer, ForeignKey("investment_stages.id")),
)
# Association table for fund-sector many-to-many
fund_sectors_association = Table(
    "fund_sectors",
    Base.metadata,
    Column("fund_id", Integer, ForeignKey("funds.id")),
    Column("sector_id", Integer, ForeignKey("sectors.id")),
)
class InvestorTable(Base, TimestampMixin):
    """ORM model for the ``investors`` table.

    Owns its team members and funds (both deleted with the investor via
    ``all, delete-orphan``) and is linked many-to-many to investment stages,
    portfolio companies, sectors and projects.
    """

    __tablename__ = "investors"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    description = Column(Text, nullable=True)
    # Basic investor info
    website = Column(String, nullable=True)
    headquarters = Column(String, nullable=True)
    # AUM fields
    aum = Column(Integer, nullable=True)  # Store as integer for numerical filtering
    aum_as_of_date = Column(String, nullable=True)
    aum_source_url = Column(String, nullable=True)
    # Check size (deprecated in favor of fund-level data, but keeping for backward compatibility)
    check_size_lower = Column(Integer, nullable=True)
    check_size_upper = Column(Integer, nullable=True)
    # Geographic focus (deprecated in favor of fund-level, but keeping for backward compatibility)
    geographic_focus = Column(String, nullable=True)
    # Investment thesis and portfolio
    investment_thesis = Column(JSON, nullable=True)  # Array of thesis statements
    portfolio_highlights = Column(
        JSON, nullable=True
    )  # Array of portfolio company names
    linked_documents = Column(JSON, nullable=True)  # Array of document URLs
    # Research metadata
    researcher_notes = Column(Text, nullable=True)
    missing_important_fields = Column(
        JSON, nullable=True
    )  # Array of missing field names
    sources = Column(JSON, nullable=True)  # JSON object with source URLs
    # Portfolio info
    number_of_investments = Column(Integer, nullable=True)
    # Relationships
    # One-to-many; members are deleted together with the investor
    team_members = relationship(
        "InvestorMember", back_populates="investor", cascade="all, delete-orphan"
    )
    # One-to-many; funds are deleted together with the investor
    funds = relationship(
        "FundTable", back_populates="investor", cascade="all, delete-orphan"
    )
    # Many-to-many relationship with investment stages
    investment_stages = relationship(
        "InvestmentStageTable",
        secondary=investor_stage_association,
        back_populates="investors",
    )
    # Relationship to portfolio companies
    portfolio_companies = relationship(
        "CompanyTable",
        secondary=investor_company_association,
        back_populates="investors",
    )
    # Many-to-many relationship with sectors
    sectors = relationship(
        "SectorTable",
        secondary=investor_sector_association,
        back_populates="investors",
    )
    # Many-to-many relationship with projects
    projects = relationship(
        "ProjectTable",
        secondary=project_investor_association,
        back_populates="investors",
    )
class InvestorMember(Base, TimestampMixin):
    """ORM model for the ``investor_members`` table: one person at an investor."""

    __tablename__ = "investor_members"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    role = Column(String, nullable=True)
    title = Column(String, nullable=True)  # Alternative to role
    email = Column(String, nullable=True)
    source_url = Column(String, nullable=True)  # URL where member info was found
    # Owning investor; the column itself is nullable
    investor_id = Column(Integer, ForeignKey("investors.id"))
    investor = relationship("InvestorTable", back_populates="team_members")
class FundTable(Base, TimestampMixin):
    """ORM model for the ``funds`` table.

    Every fund belongs to exactly one investor (``investor_id`` is required)
    and is linked many-to-many to investment stages and sectors.
    """

    __tablename__ = "funds"
    id = Column(Integer, primary_key=True, index=True)
    investor_id = Column(Integer, ForeignKey("investors.id"), nullable=False)
    # Fund details
    fund_name = Column(String, nullable=True)
    fund_size = Column(
        Integer, nullable=True
    )  # Store as integer for numerical filtering
    fund_size_source_url = Column(String, nullable=True)
    # Check size range (parsed from estimated_investment_size by LLM)
    check_size_lower = Column(Integer, nullable=True)
    check_size_upper = Column(Integer, nullable=True)
    source_url = Column(String, nullable=True)
    source_provider = Column(String, nullable=True)  # e.g., "Perplexity"
    # Geographic focus as simple string
    geographic_focus = Column(String, nullable=True)
    # Relationships
    investor = relationship("InvestorTable", back_populates="funds")
    investment_stages = relationship(
        "InvestmentStageTable",
        secondary=fund_investment_stages_association,
        back_populates="funds",
    )
    sectors = relationship(
        "SectorTable",
        secondary=fund_sectors_association,
        back_populates="funds",
    )
class InvestmentStageTable(Base, TimestampMixin):
    """ORM model for the ``investment_stages`` table.

    Stage names are unique. Stages are linked many-to-many to investors and
    to funds.
    """

    __tablename__ = "investment_stages"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False, unique=True)
    # Relationships
    investors = relationship(
        "InvestorTable",
        secondary=investor_stage_association,
        back_populates="investment_stages",
    )
    funds = relationship(
        "FundTable",
        secondary=fund_investment_stages_association,
        back_populates="investment_stages",
    )
class CompanyTable(Base, TimestampMixin):
    """ORM model for the ``companies`` table.

    Owns its members (deleted with the company via ``all, delete-orphan``)
    and is linked many-to-many to investors, sectors and projects.
    """

    __tablename__ = "companies"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    industry = Column(String, nullable=True)
    location = Column(String, nullable=True)
    description = Column(String, nullable=True)
    founded_year = Column(Integer, nullable=True)
    website = Column(String, nullable=True)
    # One-to-many; members are deleted together with the company
    members = relationship(
        "CompanyMember", back_populates="company", cascade="all, delete-orphan"
    )
    # Relationship back to investors
    investors = relationship(
        "InvestorTable",
        secondary=investor_company_association,
        back_populates="portfolio_companies",
    )
    # Many-to-many relationship with sectors
    sectors = relationship(
        "SectorTable", secondary=company_sector_association, back_populates="companies"
    )
    # Many-to-many relationship with projects
    projects = relationship(
        "ProjectTable",
        secondary=project_company_association,
        back_populates="companies",
    )
class CompanyMember(Base, TimestampMixin):
    """ORM model for the ``company_members`` table: one person at a company."""

    __tablename__ = "company_members"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    linkedin = Column(String, nullable=True)
    role = Column(String, nullable=True)
    # Owning company; required
    company_id = Column(Integer, ForeignKey("companies.id"), nullable=False)
    company = relationship("CompanyTable", back_populates="members")
class SectorTable(Base, TimestampMixin):
    """ORM model for the ``sectors`` table.

    Linked many-to-many to investors, companies, projects and funds. The
    ``name`` column carries no unique constraint.
    """

    __tablename__ = "sectors"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    # Relationships
    investors = relationship(
        "InvestorTable",
        secondary=investor_sector_association,
        back_populates="sectors",
    )
    companies = relationship(
        "CompanyTable", secondary=company_sector_association, back_populates="sectors"
    )
    # Pairs with ProjectTable.sector (singular attribute name on the project side)
    projects = relationship(
        "ProjectTable", secondary=project_sector_association, back_populates="sector"
    )
    funds = relationship(
        "FundTable",
        secondary=fund_sectors_association,
        back_populates="sectors",
    )
class ProjectTable(Base, TimestampMixin):
    """ORM model for the ``projects`` table.

    Linked many-to-many to sectors, investors and companies.
    """

    __tablename__ = "projects"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    valuation = Column(Integer, nullable=True)
    # Stored as a database enum built from InvestmentStage
    stage = Column(Enum(InvestmentStage), nullable=True)
    location = Column(String, nullable=True)
    description = Column(Text, nullable=True)
    start_date = Column(DateTime, nullable=True)
    end_date = Column(DateTime, nullable=True)
    # Singular name, but this is a many-to-many collection of sectors
    sector = relationship(
        "SectorTable", secondary=project_sector_association, back_populates="projects"
    )
    investors = relationship(
        "InvestorTable",
        secondary=project_investor_association,
        back_populates="projects",
    )
    companies = relationship(
        "CompanyTable", secondary=project_company_association, back_populates="projects"
    )
BIN
View File
Binary file not shown.
-40
View File
@@ -1,40 +1,26 @@
aiofiles==24.1.0
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiosignal==1.4.0
aiosqlite==0.21.0
alphashape==1.3.1
annotated-types==0.7.0
anyio==4.10.0
attrs==25.3.0
backoff==2.2.1
bcrypt==4.3.0
beautifulsoup4==4.14.2
brotli==1.1.0
build==1.3.0
cachetools==5.5.2
certifi==2025.8.3
cffi==2.0.0
chardet==5.2.0
charset-normalizer==3.4.3
chromadb==1.0.20
click==8.2.1
click-log==0.4.0
coloredlogs==15.0.1
crawl4ai==0.7.4
cryptography==46.0.1
dataclasses-json==0.6.7
ddgs==9.5.2
distro==1.9.0
dnspython==2.7.0
durationpy==0.10
email-validator==2.3.0
fake-http-header==0.3.5
fake-useragent==2.2.0
fastapi==0.116.1
fastapi-cli==0.0.8
fastapi-cloud-cli==0.1.5
fastuuid==0.13.5
filelock==3.19.1
flatbuffers==25.2.10
frozenlist==1.7.0
@@ -44,24 +30,19 @@ googleapis-common-protos==1.70.0
greenlet==3.2.4
grpcio==1.74.0
h11==0.16.0
h2==4.3.0
hf-xet==1.1.8
hpack==4.1.0
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
httpx-sse==0.4.1
huggingface-hub==0.34.4
humanfriendly==10.0
humanize==4.13.0
hyperframe==6.1.0
idna==3.10
importlib-metadata==8.7.0
importlib-resources==6.5.2
itsdangerous==2.2.0
jinja2==3.1.6
jiter==0.10.0
joblib==1.5.2
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.25.1
@@ -77,9 +58,6 @@ langgraph-checkpoint==2.1.1
langgraph-prebuilt==0.6.4
langgraph-sdk==0.2.4
langsmith==0.4.20
lark==1.3.0
litellm==1.77.5
lxml==5.4.0
markdown-it-py==4.0.0
markupsafe==3.0.2
marshmallow==3.26.1
@@ -88,8 +66,6 @@ mmh3==5.2.0
mpmath==1.3.0
multidict==6.6.4
mypy-extensions==1.1.0
networkx==3.5
nltk==3.9.1
numpy==2.3.2
oauthlib==3.3.1
onnxruntime==1.22.1
@@ -105,26 +81,18 @@ ormsgpack==1.10.0
overrides==7.7.0
packaging==25.0
pandas==2.3.2
patchright==1.55.2
pillow==11.3.0
pip==25.2
playwright==1.55.0
posthog==5.4.0
primp==0.15.0
propcache==0.3.2
protobuf==6.32.0
psutil==7.1.0
pyasn1==0.6.1
pyasn1-modules==0.4.2
pybase64==1.4.2
pycparser==2.23
pydantic==2.11.7
pydantic-core==2.33.2
pydantic-extra-types==2.10.5
pydantic-settings==2.10.1
pyee==13.0.0
pygments==2.19.2
pyopenssl==25.3.0
pypika==0.48.9
pyproject-hooks==1.2.0
python-dateutil==2.9.0.post0
@@ -132,7 +100,6 @@ python-dotenv==1.1.1
python-multipart==0.0.20
pytz==2025.2
pyyaml==6.0.2
rank-bm25==0.2.2
referencing==0.36.2
regex==2025.7.34
requests==2.32.5
@@ -143,24 +110,17 @@ rich-toolkit==0.15.0
rignore==0.6.4
rpds-py==0.27.1
rsa==4.9.1
rtree==1.4.1
scipy==1.16.2
sentry-sdk==2.35.1
shapely==2.1.2
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
snowballstemmer==2.2.0
soupsieve==2.8
sqlalchemy==2.0.43
starlette==0.47.3
sympy==1.14.0
tenacity==9.1.2
tf-playwright-stealth==1.2.0
tiktoken==0.11.0
tokenizers==0.21.4
tqdm==4.67.1
trimesh==4.8.3
typer==0.16.1
typing-extensions==4.15.0
typing-inspect==0.9.0