Files
Anton_wireframe/app/routers/investors.py
T

281 lines
9.9 KiB
Python

from typing import List, Optional
from db.db import get_db
from db.models import InvestorTable, SectorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import InvestmentStage, InvestorData
from services.querying import QueryProcessor
from sqlalchemy.orm import Session, selectinload
# Router on which every investor endpoint in this module is registered; each
# route added to it carries the "Investor Routes" tag.
router = APIRouter(tags=["Investor Routes"])
# Request schemas for creating/updating
class InvestorCreate(BaseModel):
    """Request body used to create a new investor.

    All fields are required except ``description`` (defaults to ``None``) and
    ``number_of_investments`` (defaults to ``0``).  No cross-field validation
    is declared here, so ``check_size_lower`` is not checked against
    ``check_size_upper``.
    """

    name: str
    description: Optional[str] = None
    # Presumably assets under management; currency/units are not visible
    # from this module -- TODO confirm.
    aum: int
    # Lower and upper bounds of the investor's check-size range.
    check_size_lower: int
    check_size_upper: int
    geographic_focus: str
    stage_focus: InvestmentStage
    # Starts at zero when the client omits it.
    number_of_investments: int = 0
class InvestorUpdate(BaseModel):
    """Request body mirroring ``InvestorCreate`` with every field optional.

    Each field defaults to ``None``.  Presumably consumed by an update
    endpoint that treats ``None`` as "leave unchanged" -- that endpoint is not
    visible in this part of the file, TODO confirm.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    aum: Optional[int] = None
    check_size_lower: Optional[int] = None
    check_size_upper: Optional[int] = None
    geographic_focus: Optional[str] = None
    stage_focus: Optional[InvestmentStage] = None
    number_of_investments: Optional[int] = None
@router.get("/investors", response_model=List[InvestorData])
def read_investors(db: Session = Depends(get_db)):
    """Return every investor, each bundled with its related records.

    Portfolio companies, team members and sectors are loaded eagerly with
    ``selectinload`` so building the response does not trigger a lazy load
    per investor.
    """
    all_investors_query = db.query(InvestorTable)
    all_investors_query = all_investors_query.options(
        selectinload(InvestorTable.portfolio_companies),
        selectinload(InvestorTable.team_members),
        selectinload(InvestorTable.sectors),
    )
    rows = all_investors_query.all()

    # Wrap each ORM row in the InvestorData envelope; the row itself fills
    # the ``investor`` field and its relationships fill the remaining ones.
    return [
        InvestorData(
            investor=row,
            portfolio_companies=row.portfolio_companies,
            team_members=row.team_members,
            sectors=row.sectors,
        )
        for row in rows
    ]
@router.get("/investors/filter", response_model=List[InvestorData])
def filter_investors(
    stage: Optional[InvestmentStage] = Query(
        None, description="Filter by investment stage"
    ),
    min_check_size: Optional[int] = Query(None, description="Minimum check size"),
    max_check_size: Optional[int] = Query(None, description="Maximum check size"),
    geography: Optional[str] = Query(
        None, description="Geographic focus (partial match)"
    ),
    sector: Optional[str] = Query(None, description="Sector name (partial match)"),
    min_aum: Optional[int] = Query(None, description="Minimum AUM"),
    max_aum: Optional[int] = Query(None, description="Maximum AUM"),
    db: Session = Depends(get_db),
):
    """Return the investors that satisfy every criterion that was supplied.

    Criteria that are not provided are skipped.  ``min_check_size`` is
    compared against ``check_size_lower`` and ``max_check_size`` against
    ``check_size_upper``.  ``geography`` and ``sector`` are case-insensitive
    substring matches (``ILIKE '%value%'``); the sector match requires a join
    on the investor's sectors.
    """
    # Column-level predicates, gathered in the order they are applied below.
    predicates = []
    if stage:
        predicates.append(InvestorTable.stage_focus == stage)
    if min_check_size is not None:
        predicates.append(InvestorTable.check_size_lower >= min_check_size)
    if max_check_size is not None:
        predicates.append(InvestorTable.check_size_upper <= max_check_size)
    if geography:
        predicates.append(InvestorTable.geographic_focus.ilike(f"%{geography}%"))
    if min_aum is not None:
        predicates.append(InvestorTable.aum >= min_aum)
    if max_aum is not None:
        predicates.append(InvestorTable.aum <= max_aum)

    matching = db.query(InvestorTable).options(
        selectinload(InvestorTable.portfolio_companies),
        selectinload(InvestorTable.team_members),
        selectinload(InvestorTable.sectors),
    )
    for predicate in predicates:
        matching = matching.filter(predicate)

    # The sector criterion lives on a related table, so it needs a join.
    if sector:
        matching = matching.join(InvestorTable.sectors)
        matching = matching.filter(SectorTable.name.ilike(f"%{sector}%"))

    # Wrap each matching row in the InvestorData response envelope.
    return [
        InvestorData(
            investor=found,
            portfolio_companies=found.portfolio_companies,
            team_members=found.team_members,
            sectors=found.sectors,
        )
        for found in matching.all()
    ]
@router.get("/investors/{investor_id}", response_model=InvestorData)
def read_investor(investor_id: int, db: Session = Depends(get_db)):
    """Return the investor with the given ID, including its related records.

    Raises:
        HTTPException: 404 when no investor has ``investor_id``.
    """
    by_id = db.query(InvestorTable).options(
        selectinload(InvestorTable.portfolio_companies),
        selectinload(InvestorTable.team_members),
        selectinload(InvestorTable.sectors),
    )
    found = by_id.filter(InvestorTable.id == investor_id).first()

    # Guard clause: nothing matched the requested ID.
    if not found:
        raise HTTPException(status_code=404, detail="Investor not found")

    response = InvestorData(
        investor=found,
        portfolio_companies=found.portfolio_companies,
        team_members=found.team_members,
        sectors=found.sectors,
    )
    return response
@router.post("/investors", response_model=InvestorData)
def create_investor(investor: InvestorCreate, db: Session = Depends(get_db)):
    """Persist a new investor and return it with its related records.

    The row is inserted and committed, then re-read with its relationships
    eagerly loaded so the response is built from a fully populated object.
    """
    new_row = InvestorTable(**investor.dict())
    db.add(new_row)
    db.commit()
    # Refresh so database-generated values (such as the primary key) are
    # available on the instance before it is looked up again.
    db.refresh(new_row)

    with_relations = db.query(InvestorTable).options(
        selectinload(InvestorTable.portfolio_companies),
        selectinload(InvestorTable.team_members),
        selectinload(InvestorTable.sectors),
    )
    created = with_relations.filter(InvestorTable.id == new_row.id).first()

    response = InvestorData(
        investor=created,
        portfolio_companies=created.portfolio_companies,
        team_members=created.team_members,
        sectors=created.sectors,
    )
    return response
# Maximum number of points each characteristic can add to a similarity score.
_STAGE_MATCH_POINTS = 30
_GEOGRAPHY_EXACT_POINTS = 20
_GEOGRAPHY_PARTIAL_POINTS = 10
_CHECK_SIZE_MAX_POINTS = 20
_AUM_MAX_POINTS = 15
_SECTOR_MAX_POINTS = 30


def _geography_points(candidate_geo, target_geo_lower):
    """Score geographic focus: exact case-insensitive match, else substring match."""
    if not candidate_geo or not target_geo_lower:
        return 0
    candidate_geo_lower = candidate_geo.lower()
    if candidate_geo_lower == target_geo_lower:
        return _GEOGRAPHY_EXACT_POINTS
    if (
        candidate_geo_lower in target_geo_lower
        or target_geo_lower in candidate_geo_lower
    ):
        return _GEOGRAPHY_PARTIAL_POINTS
    return 0


def _check_size_points(candidate, target):
    """Score the share of the target's check-size range the candidate overlaps."""
    bounds = (
        candidate.check_size_lower,
        candidate.check_size_upper,
        target.check_size_lower,
        target.check_size_upper,
    )
    # Compare against None explicitly: a bound of 0 is a real value, and a
    # truthiness test would wrongly treat it as "missing".
    if any(bound is None for bound in bounds):
        return 0

    overlap_start = max(candidate.check_size_lower, target.check_size_lower)
    overlap_end = min(candidate.check_size_upper, target.check_size_upper)
    if overlap_end <= overlap_start:
        return 0

    target_range = target.check_size_upper - target.check_size_lower
    overlap_ratio = (overlap_end - overlap_start) / target_range if target_range > 0 else 0
    return int(_CHECK_SIZE_MAX_POINTS * overlap_ratio)


def _aum_points(candidate_aum, target_aum):
    """Score AUM closeness as 1 - |difference| / larger value, scaled to the AUM weight."""
    if candidate_aum is None or target_aum is None:
        return 0
    larger_aum = max(candidate_aum, target_aum)
    # A non-positive denominator carries no meaningful ratio; score nothing.
    if larger_aum <= 0:
        return 0
    similarity_ratio = 1 - (abs(candidate_aum - target_aum) / larger_aum)
    return int(_AUM_MAX_POINTS * similarity_ratio)


def _sector_points(candidate_sector_ids, target_sector_ids):
    """Score the fraction of the target's sectors that the candidate shares."""
    if not candidate_sector_ids or not target_sector_ids:
        return 0
    shared = target_sector_ids & candidate_sector_ids
    overlap_ratio = len(shared) / len(target_sector_ids)
    return int(_SECTOR_MAX_POINTS * overlap_ratio)


def _investor_similarity_score(candidate, target, target_geo_lower, target_sector_ids):
    """Sum the stage, geography, check-size, AUM and sector points for one candidate."""
    score = 0
    if candidate.stage_focus == target.stage_focus:
        score += _STAGE_MATCH_POINTS
    score += _geography_points(candidate.geographic_focus, target_geo_lower)
    score += _check_size_points(candidate, target)
    score += _aum_points(candidate.aum, target.aum)
    score += _sector_points(
        {sector.id for sector in candidate.sectors}, target_sector_ids
    )
    return score


@router.get("/investors/{investor_id}/similar", response_model=List[InvestorData])
def find_similar_investors(
    investor_id: int,
    limit: int = Query(10, description="Maximum number of similar investors to return"),
    db: Session = Depends(get_db),
):
    """Return the investors most similar to the one identified by ``investor_id``.

    Every other investor is scored against the target on five characteristics:
    stage focus (30), geographic focus (20 exact / 10 partial), check-size
    overlap (up to 20), AUM closeness (up to 15) and shared sectors (up to 30).
    Candidates scoring 0 or less are dropped; the rest are returned in
    descending score order, ties keeping the order the database returned them.

    Args:
        investor_id: ID of the investor to compare the others against.
        limit: Maximum number of results; values below 1 yield an empty list.
        db: Database session injected by FastAPI.

    Raises:
        HTTPException: 404 when no investor has ``investor_id``.
    """
    # Only the target's sectors are read below, so only those are eager-loaded.
    target_investor = (
        db.query(InvestorTable)
        .options(selectinload(InvestorTable.sectors))
        .filter(InvestorTable.id == investor_id)
        .first()
    )
    if not target_investor:
        raise HTTPException(status_code=404, detail="Investor not found")

    # Loop-invariant target data, computed once.
    target_sector_ids = {sector.id for sector in target_investor.sectors}
    target_geo_lower = (
        target_investor.geographic_focus.lower()
        if target_investor.geographic_focus
        else None
    )

    # Every other investor is a candidate; relationships are eager-loaded
    # because the returned ones are serialised with all of them.
    candidates = (
        db.query(InvestorTable)
        .options(
            selectinload(InvestorTable.portfolio_companies),
            selectinload(InvestorTable.team_members),
            selectinload(InvestorTable.sectors),
        )
        .filter(InvestorTable.id != investor_id)
        .all()
    )

    scored_investors = []
    for candidate in candidates:
        score = _investor_similarity_score(
            candidate, target_investor, target_geo_lower, target_sector_ids
        )
        if score > 0:  # Only include investors with some similarity
            scored_investors.append((score, candidate))

    # list.sort is stable, so equal scores keep their original relative order.
    scored_investors.sort(key=lambda pair: pair[0], reverse=True)

    # Clamp the limit: a negative slice end would drop items from the tail
    # instead of returning nothing.
    top_matches = scored_investors[: max(limit, 0)]

    return [
        InvestorData(
            investor=match,
            portfolio_companies=match.portfolio_companies,
            team_members=match.team_members,
            sectors=match.sectors,
        )
        for _, match in top_matches
    ]