feat: Integrate Folk CRM API for investor synchronization and compatibility scoring

This commit is contained in:
bolade
2025-10-08 19:21:46 +01:00
parent cefe89bb67
commit 64f9364fcd
9 changed files with 1055 additions and 14 deletions
Binary file not shown.
+2 -1
View File
@@ -5,7 +5,7 @@ from db.db import Base, db_dependency, engine
from dotenv import load_dotenv
from fastapi import FastAPI, File, Form, UploadFile
from pydantic import BaseModel
from routers import companies, investors, projects
from routers import companies, folk_crm, investors, projects
from schemas.router_schemas import InvestmentResponse, PaginatedResponse
from services.llm_parser import InvestorProcessor
from services.querying import QueryProcessor
@@ -108,6 +108,7 @@ async def query_investors(request: QueryRequest):
app.include_router(investors.router)
app.include_router(companies.router)
app.include_router(projects.router)
app.include_router(folk_crm.router)
if __name__ == "__main__":
import uvicorn
Binary file not shown.
+190
View File
@@ -0,0 +1,190 @@
from typing import List
from db.db import get_db
from db.models import InvestorTable
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from services.crm import folk
from sqlalchemy.orm import Session, selectinload
router = APIRouter(prefix="/folk", tags=["Folk CRM"])
class GroupResponse(BaseModel):
id: str
name: str
class SyncInvestorsRequest(BaseModel):
investor_ids: List[int]
group_id: str
class SyncResult(BaseModel):
investor_id: int
investor_name: str
company_id: str
company_name: str
team_members_synced: int
person_ids: List[str]
class SyncInvestorsResponse(BaseModel):
success: bool
synced_count: int
results: List[SyncResult]
errors: List[dict]
@router.get("/groups", response_model=List[GroupResponse])
def get_folk_groups():
"""Get all groups from Folk CRM.
Returns a list of groups with their id and name that can be used
to sync investors to Folk.
"""
try:
groups_data = folk.get_groups()
items = groups_data.get("data", {}).get("items", [])
return [GroupResponse(id=item["id"], name=item["name"]) for item in items]
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to fetch groups from Folk: {str(e)}"
)
@router.post("/sync-investors", response_model=SyncInvestorsResponse)
def sync_investors_to_folk(
request: SyncInvestorsRequest, db: Session = Depends(get_db)
):
"""Sync investors to Folk CRM as companies with their team members as people.
Takes a list of investor IDs and a Folk group ID, then:
1. Creates each investor as a company in the specified Folk group
2. Creates each team member as a person linked to that company
Args:
investor_ids: List of investor IDs from the database
group_id: Folk group ID where investors should be added
Returns:
Summary of sync operation including successes and errors
"""
# Fetch investors with their team members
investors = (
db.query(InvestorTable)
.options(
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
)
.filter(InvestorTable.id.in_(request.investor_ids))
.all()
)
if not investors:
raise HTTPException(
status_code=404, detail="No investors found with the provided IDs"
)
results = []
errors = []
for investor in investors:
try:
# Create company in Folk
company_data = folk.create_company(
name=investor.name,
group_id=request.group_id,
website=investor.website,
description=investor.description,
addresses=[investor.headquarters] if investor.headquarters else None,
)
company_id = company_data.get("data", {}).get("id")
if not company_id:
errors.append(
{
"investor_id": investor.id,
"investor_name": investor.name,
"error": "No company ID returned from Folk API",
}
)
continue
# Create team members as people
person_ids = []
team_members_synced = 0
for member in investor.team_members:
try:
# Extract first name and last name from full name
name_parts = member.name.split(maxsplit=1)
first_name = name_parts[0] if name_parts else member.name
last_name = name_parts[1] if len(name_parts) > 1 else ""
# Build URLs list from source_url if available
urls_list = None
if hasattr(member, "source_url") and member.source_url:
urls_list = [member.source_url]
# Build job title from title or role
job_title = None
if hasattr(member, "title") and member.title:
job_title = member.title
elif hasattr(member, "role") and member.role:
job_title = member.role
person_data = folk.create_person(
first_name=first_name,
last_name=last_name,
email=member.email,
company_id=company_id,
group_id=request.group_id,
urls=urls_list,
jobTitle=job_title,
)
person_id = person_data.get("data", {}).get("id")
if person_id:
person_ids.append(person_id)
team_members_synced += 1
except Exception as person_error:
# Log person creation error but continue with other members
errors.append(
{
"investor_id": investor.id,
"investor_name": investor.name,
"team_member_name": member.name,
"error": f"Failed to create person: {str(person_error)}",
}
)
results.append(
SyncResult(
investor_id=investor.id,
investor_name=investor.name,
company_id=company_id,
company_name=company_data.get("data", {}).get(
"name", investor.name
),
team_members_synced=team_members_synced,
person_ids=person_ids,
)
)
except Exception as e:
errors.append(
{
"investor_id": investor.id,
"investor_name": investor.name,
"error": str(e),
}
)
return SyncInvestorsResponse(
success=len(results) > 0,
synced_count=len(results),
results=results,
errors=errors,
)
+55 -5
View File
@@ -1,7 +1,7 @@
from typing import Optional
from db.db import get_db
from db.models import FundTable, InvestorTable, SectorTable
from db.models import FundTable, InvestorTable, ProjectTable, SectorTable
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from schemas.router_schemas import (
@@ -12,6 +12,7 @@ from schemas.router_schemas import (
PaginatedResponse,
SectorMinimal,
)
from services.compatibility_score import calculate_project_investor_compatibility
from sqlalchemy.orm import Session, selectinload
router = APIRouter(tags=["Investor Routes"])
@@ -46,12 +47,17 @@ class InvestorUpdate(BaseModel):
def read_investors(
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
project_id: Optional[int] = Query(
None, description="Optional project ID for compatibility scoring"
),
db: Session = Depends(get_db),
):
"""Get all investors with their funds as separate entries (paginated)
Each investor-fund combination is returned as a separate row.
An investor with 3 funds will appear as 3 entries.
If project_id is provided, calculates compatibility scores for each investor.
"""
# Calculate offset
offset = (page - 1) * page_size
@@ -59,6 +65,18 @@ def read_investors(
# Get total count
total_count = db.query(InvestorTable).count()
# Load project if project_id provided
project = None
if project_id is not None:
project = (
db.query(ProjectTable)
.options(selectinload(ProjectTable.sector))
.filter(ProjectTable.id == project_id)
.first()
)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get paginated results
investors = (
db.query(InvestorTable)
@@ -66,7 +84,8 @@ def read_investors(
selectinload(InvestorTable.portfolio_companies),
selectinload(InvestorTable.team_members),
selectinload(InvestorTable.sectors),
selectinload(InvestorTable.funds),
selectinload(InvestorTable.funds).selectinload(FundTable.investment_stages),
selectinload(InvestorTable.funds).selectinload(FundTable.sectors),
)
.offset(offset)
.limit(page_size)
@@ -76,6 +95,13 @@ def read_investors(
# Transform to InvestmentResponse format (one row per investor-fund combination)
investment_responses = []
for investor in investors:
# Calculate compatibility score if project provided
compatibility_score = 1.0
if project is not None:
compatibility_score = calculate_project_investor_compatibility(
project=project, investor=investor, use_funds=True
)
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
@@ -110,7 +136,7 @@ def read_investors(
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
else:
@@ -125,7 +151,7 @@ def read_investors(
stage_focus=None,
portfolio_companies=portfolio_companies,
sectors=[],
compatibility_score=1.0,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
@@ -156,14 +182,31 @@ def filter_investors(
max_aum: Optional[int] = Query(None, description="Maximum AUM"),
page: int = Query(1, ge=1, description="Page number (starts at 1)"),
page_size: int = Query(10, ge=1, le=100, description="Items per page (max 100)"),
project_id: Optional[int] = Query(
None, description="Optional project ID for compatibility scoring"
),
db: Session = Depends(get_db),
):
"""Filter investors based on various criteria (paginated)
Returns investor-fund combinations as separate rows.
Queries the funds table to find matching funds.
If project_id is provided, calculates compatibility scores for each investor.
"""
# Load project if project_id provided
project = None
if project_id is not None:
project = (
db.query(ProjectTable)
.options(selectinload(ProjectTable.sector))
.filter(ProjectTable.id == project_id)
.first()
)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Start with base query on funds table
query = db.query(FundTable).options(
selectinload(FundTable.investor).selectinload(
@@ -212,6 +255,13 @@ def filter_investors(
for fund in funds:
investor = fund.investor
# Calculate compatibility score if project provided
compatibility_score = 1.0
if project is not None:
compatibility_score = calculate_project_investor_compatibility(
project=project, investor=investor, use_funds=True
)
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
@@ -243,7 +293,7 @@ def filter_investors(
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)
Binary file not shown.
+509
View File
@@ -0,0 +1,509 @@
"""
Compatibility Score Service
This module calculates compatibility scores between projects and investors.
The scoring system evaluates multiple dimensions to determine how well a project
matches with an investor's investment criteria.
"""
from typing import List, Optional, Tuple
from db.models import FundTable, InvestorTable, ProjectTable
def calculate_project_investor_compatibility(
project: ProjectTable, investor: InvestorTable, use_funds: bool = True
) -> float:
"""
Calculate compatibility score between a project and an investor.
Args:
project: The project to evaluate
investor: The investor to compare against
use_funds: If True, evaluates against investor's funds. If False, uses investor-level data.
Returns:
A score between 0 and 1, where 1 is perfect match
Scoring breakdown (out of 100 points):
- Investment Stage Match: 30 points
- Sector Overlap: 30 points
- Geographic Match: 20 points
- Valuation/Check Size Fit: 20 points
"""
if use_funds and investor.funds:
# Calculate score for each fund and return the highest
max_score = 0.0
for fund in investor.funds:
fund_score = _calculate_project_fund_compatibility(project, fund)
max_score = max(max_score, fund_score)
return max_score
else:
# Use investor-level data (fallback)
return _calculate_project_investor_direct_compatibility(project, investor)
def calculate_project_investors_compatibility(
project: ProjectTable, investors: List[InvestorTable], use_funds: bool = True
) -> List[Tuple[InvestorTable, float]]:
"""
Calculate compatibility scores between a project and multiple investors.
Args:
project: The project to evaluate
investors: List of investors to compare against
use_funds: If True, evaluates against investors' funds. If False, uses investor-level data.
Returns:
List of tuples (investor, score) sorted by score descending
"""
scored_investors = []
for investor in investors:
score = calculate_project_investor_compatibility(project, investor, use_funds)
scored_investors.append((investor, score))
# Sort by score descending
scored_investors.sort(key=lambda x: x[1], reverse=True)
return scored_investors
def _calculate_project_fund_compatibility(
project: ProjectTable, fund: FundTable
) -> float:
"""
Calculate compatibility score between a project and a specific fund.
Scoring breakdown:
- Investment Stage Match: 30 points (all or nothing if stage exists)
- Sector Overlap: 30 points (proportional to overlap)
- Geographic Match: 20 points (exact=20, partial=10, none=0)
- Valuation/Check Size Fit: 20 points (proportional to fit)
Returns:
A score between 0 and 1
"""
total_score = 0
max_score = 100
# 1. Investment Stage Match (30 points)
stage_score = 0
if project.stage and fund.investment_stages:
# Check if project stage matches any of the fund's investment stages
fund_stage_names = {stage.name for stage in fund.investment_stages}
# Convert project.stage enum to string for comparison
project_stage_name = (
project.stage.value
if hasattr(project.stage, "value")
else str(project.stage)
)
if project_stage_name in fund_stage_names:
stage_score = 30
else:
# Partial credit for adjacent stages
stage_score = _calculate_stage_proximity(
project_stage_name, fund_stage_names
)
total_score += stage_score
# 2. Sector Overlap (30 points)
sector_score = 0
if project.sector and fund.sectors:
project_sector_ids = {sector.id for sector in project.sector}
fund_sector_ids = {sector.id for sector in fund.sectors}
if project_sector_ids and fund_sector_ids:
common_sectors = project_sector_ids.intersection(fund_sector_ids)
# Score based on what percentage of project sectors are covered by fund
overlap_ratio = len(common_sectors) / len(project_sector_ids)
sector_score = int(30 * overlap_ratio)
total_score += sector_score
# 3. Geographic Match (20 points)
geo_score = 0
if project.location and fund.geographic_focus:
project_location_lower = project.location.lower()
fund_geo_lower = fund.geographic_focus.lower()
# Exact match
if project_location_lower == fund_geo_lower:
geo_score = 20
# Partial match (one contains the other)
elif (
project_location_lower in fund_geo_lower
or fund_geo_lower in project_location_lower
):
geo_score = 10
# Check for common geographic terms
elif _check_geographic_overlap(project_location_lower, fund_geo_lower):
geo_score = 5
total_score += geo_score
# 4. Valuation/Check Size Fit (20 points)
valuation_score = 0
if project.valuation and fund.check_size_lower and fund.check_size_upper:
# Check if project valuation falls within or near the check size range
# Typically, check size is a fraction of valuation (e.g., 10-20%)
# We'll assume check size represents potential investment amount
if fund.check_size_lower <= project.valuation <= fund.check_size_upper:
# Valuation is within the check size range (might be too small)
valuation_score = 10
else:
# Check if the check size is reasonable for this valuation
# Typical investment is 10-30% of valuation
reasonable_valuation_min = fund.check_size_lower * 3 # Investing ~33%
reasonable_valuation_max = fund.check_size_upper * 10 # Investing ~10%
if (
reasonable_valuation_min
<= project.valuation
<= reasonable_valuation_max
):
# Perfect fit
valuation_score = 20
elif project.valuation < reasonable_valuation_min:
# Project might be too small
ratio = (
project.valuation / reasonable_valuation_min
if reasonable_valuation_min > 0
else 0
)
valuation_score = int(10 * ratio)
else:
# Project might be too large
ratio = (
reasonable_valuation_max / project.valuation
if project.valuation > 0
else 0
)
valuation_score = int(10 * ratio)
total_score += valuation_score
# Convert to 0-1 scale
return total_score / max_score
def _calculate_project_investor_direct_compatibility(
project: ProjectTable, investor: InvestorTable
) -> float:
"""
Calculate compatibility using investor-level data (fallback when no funds available).
Uses the same scoring system but with investor-level attributes.
"""
total_score = 0
max_score = 100
# 1. Investment Stage - Skip this since investors don't have a direct stage field
# We could add 30 points to other categories, but for consistency, we'll leave it as 0
stage_score = 0
total_score += stage_score
# 2. Sector Overlap (30 points)
sector_score = 0
if project.sector and investor.sectors:
project_sector_ids = {sector.id for sector in project.sector}
investor_sector_ids = {sector.id for sector in investor.sectors}
if project_sector_ids and investor_sector_ids:
common_sectors = project_sector_ids.intersection(investor_sector_ids)
overlap_ratio = len(common_sectors) / len(project_sector_ids)
sector_score = int(30 * overlap_ratio)
total_score += sector_score
# 3. Geographic Match (20 points)
geo_score = 0
if project.location and investor.geographic_focus:
project_location_lower = project.location.lower()
investor_geo_lower = investor.geographic_focus.lower()
if project_location_lower == investor_geo_lower:
geo_score = 20
elif (
project_location_lower in investor_geo_lower
or investor_geo_lower in project_location_lower
):
geo_score = 10
elif _check_geographic_overlap(project_location_lower, investor_geo_lower):
geo_score = 5
total_score += geo_score
# 4. Valuation/Check Size Fit (20 points)
valuation_score = 0
if project.valuation and investor.check_size_lower and investor.check_size_upper:
reasonable_valuation_min = investor.check_size_lower * 3
reasonable_valuation_max = investor.check_size_upper * 10
if reasonable_valuation_min <= project.valuation <= reasonable_valuation_max:
valuation_score = 20
elif project.valuation < reasonable_valuation_min:
ratio = (
project.valuation / reasonable_valuation_min
if reasonable_valuation_min > 0
else 0
)
valuation_score = int(10 * ratio)
else:
ratio = (
reasonable_valuation_max / project.valuation
if project.valuation > 0
else 0
)
valuation_score = int(10 * ratio)
total_score += valuation_score
# Convert to 0-1 scale
return total_score / max_score
def _calculate_stage_proximity(project_stage: str, fund_stages: set) -> int:
"""
Calculate proximity score between project stage and fund stages.
Awards partial credit for adjacent investment stages.
Stage progression: SEED -> SERIES_A -> SERIES_B -> SERIES_C -> GROWTH -> LATE_STAGE
Returns:
Score from 0-15 (half credit for adjacent stages)
"""
stage_order = ["SEED", "SERIES_A", "SERIES_B", "SERIES_C", "GROWTH", "LATE_STAGE"]
try:
project_idx = stage_order.index(project_stage)
except ValueError:
return 0
# Check for adjacent stages
adjacent_stages = []
if project_idx > 0:
adjacent_stages.append(stage_order[project_idx - 1])
if project_idx < len(stage_order) - 1:
adjacent_stages.append(stage_order[project_idx + 1])
for stage in fund_stages:
if stage in adjacent_stages:
return 15 # Half credit for adjacent stage
return 0
def _check_geographic_overlap(location1: str, location2: str) -> bool:
"""
Check for common geographic terms between two locations.
Examples:
- "San Francisco, CA" and "California" -> True
- "New York" and "USA" -> True (if both contain USA/US)
- "London, UK" and "United Kingdom" -> True
"""
# Common geographic groupings
geo_groups = [
["usa", "us", "united states", "america"],
["uk", "united kingdom", "britain"],
["california", "ca"],
["new york", "ny"],
["texas", "tx"],
["europe", "eu"],
["asia", "asian"],
["africa", "african"],
]
for group in geo_groups:
found_in_1 = any(term in location1 for term in group)
found_in_2 = any(term in location2 for term in group)
if found_in_1 and found_in_2:
return True
return False
def get_top_compatible_investors(
project: ProjectTable,
investors: List[InvestorTable],
limit: int = 10,
min_score: float = 0.0,
use_funds: bool = True,
) -> List[Tuple[InvestorTable, float]]:
"""
Get the top N most compatible investors for a project.
Args:
project: The project to find investors for
investors: List of all available investors
limit: Maximum number of investors to return
min_score: Minimum compatibility score threshold (0-1)
use_funds: If True, evaluates against investors' funds
Returns:
List of tuples (investor, score) sorted by score descending,
limited to 'limit' items and filtered by min_score
"""
scored_investors = calculate_project_investors_compatibility(
project, investors, use_funds
)
# Filter by minimum score
filtered_investors = [
(investor, score) for investor, score in scored_investors if score >= min_score
]
# Return top N
return filtered_investors[:limit]
def get_compatibility_score_breakdown(
project: ProjectTable, investor: InvestorTable, fund: Optional[FundTable] = None
) -> dict:
"""
Get a detailed breakdown of the compatibility score components.
Useful for debugging or showing users why a particular score was calculated.
Returns:
Dictionary with score components and explanations
"""
if fund:
total_score = 0
# Stage score
stage_score = 0
stage_match = False
if project.stage and fund.investment_stages:
fund_stage_names = {stage.name for stage in fund.investment_stages}
project_stage_name = (
project.stage.value
if hasattr(project.stage, "value")
else str(project.stage)
)
if project_stage_name in fund_stage_names:
stage_score = 30
stage_match = True
else:
stage_score = _calculate_stage_proximity(
project_stage_name, fund_stage_names
)
# Sector score
sector_score = 0
matching_sectors = []
if project.sector and fund.sectors:
project_sector_ids = {sector.id for sector in project.sector}
fund_sector_ids = {sector.id for sector in fund.sectors}
if project_sector_ids and fund_sector_ids:
common_sectors = project_sector_ids.intersection(fund_sector_ids)
matching_sectors = [
s.name for s in fund.sectors if s.id in common_sectors
]
overlap_ratio = len(common_sectors) / len(project_sector_ids)
sector_score = int(30 * overlap_ratio)
# Geographic score
geo_score = 0
geo_match_type = "none"
if project.location and fund.geographic_focus:
project_location_lower = project.location.lower()
fund_geo_lower = fund.geographic_focus.lower()
if project_location_lower == fund_geo_lower:
geo_score = 20
geo_match_type = "exact"
elif (
project_location_lower in fund_geo_lower
or fund_geo_lower in project_location_lower
):
geo_score = 10
geo_match_type = "partial"
elif _check_geographic_overlap(project_location_lower, fund_geo_lower):
geo_score = 5
geo_match_type = "regional"
# Valuation score
valuation_score = 0
valuation_fit = "unknown"
if project.valuation and fund.check_size_lower and fund.check_size_upper:
reasonable_valuation_min = fund.check_size_lower * 3
reasonable_valuation_max = fund.check_size_upper * 10
if (
reasonable_valuation_min
<= project.valuation
<= reasonable_valuation_max
):
valuation_score = 20
valuation_fit = "perfect"
elif project.valuation < reasonable_valuation_min:
ratio = (
project.valuation / reasonable_valuation_min
if reasonable_valuation_min > 0
else 0
)
valuation_score = int(10 * ratio)
valuation_fit = "too_small"
else:
ratio = (
reasonable_valuation_max / project.valuation
if project.valuation > 0
else 0
)
valuation_score = int(10 * ratio)
valuation_fit = "too_large"
total_score = stage_score + sector_score + geo_score + valuation_score
return {
"total_score": total_score / 100,
"breakdown": {
"stage": {
"score": stage_score,
"max_score": 30,
"match": stage_match,
"project_stage": project.stage.value if project.stage else None,
"fund_stages": [s.name for s in fund.investment_stages]
if fund.investment_stages
else [],
},
"sector": {
"score": sector_score,
"max_score": 30,
"matching_sectors": matching_sectors,
"project_sectors": [s.name for s in project.sector]
if project.sector
else [],
"fund_sectors": [s.name for s in fund.sectors]
if fund.sectors
else [],
},
"geography": {
"score": geo_score,
"max_score": 20,
"match_type": geo_match_type,
"project_location": project.location,
"fund_geography": fund.geographic_focus,
},
"valuation": {
"score": valuation_score,
"max_score": 20,
"fit": valuation_fit,
"project_valuation": project.valuation,
"fund_check_size_range": f"{fund.check_size_lower}-{fund.check_size_upper}"
if fund.check_size_lower
else None,
},
},
}
else:
# Investor-level breakdown (simplified)
return {
"total_score": _calculate_project_investor_direct_compatibility(
project, investor
),
"note": "Using investor-level data (no specific fund selected)",
}
+260
View File
@@ -0,0 +1,260 @@
import os
import sys
import requests
class FolkAPI:
BASE_URL = "https://api.folk.app/v1"
def __init__(self, api_key: str):
self.headers = {"Authorization": f"Bearer {api_key}"}
def get_groups(self):
"""Fetch all groups from Folk."""
url = f"{self.BASE_URL}/groups"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
def create_company(
self,
name: str,
group_id: str = None,
website: str = None,
linkedin_url: str = None,
description: str = None,
emails=None,
phones=None,
addresses=None,
urls=None,
custom_field_values=None,
groups=None,
**kwargs,
):
"""Create a company (investor) in a specific group.
This method builds a payload matching Folk's Create Company API:
https://developer.folk.app/api-reference/companies/create-a-company
It keeps backward compatibility with the previous `group_id`,
`website` and `linkedin_url` arguments.
"""
url = f"{self.BASE_URL}/companies"
# Build the top-level payload expected by Folk
data = {"name": name}
if description:
data["description"] = description
# Groups: prefer explicit `groups`, else fall back to `group_id`
if groups:
# Accept either list of ids or list of dicts
formatted = []
for g in groups:
if isinstance(g, dict) and g.get("id"):
formatted.append({"id": g["id"]})
else:
formatted.append({"id": str(g)})
data["groups"] = formatted
elif group_id:
data["groups"] = [{"id": group_id}]
# Helper to normalize single or multiple inputs into lists
def _to_list(val):
if val is None:
return None
if isinstance(val, (list, tuple)):
return [v for v in val if v is not None]
return [val]
# URLs: include website and linkedin_url if provided and merge with urls
urls_list = _to_list(urls) or []
if website:
urls_list.append(website)
if linkedin_url:
urls_list.append(linkedin_url)
if urls_list:
data["urls"] = urls_list
# Emails/phones/addresses
emails_list = _to_list(emails)
if emails_list:
data["emails"] = emails_list
phones_list = _to_list(phones)
if phones_list:
data["phones"] = phones_list
addresses_list = _to_list(addresses)
if addresses_list:
data["addresses"] = addresses_list
# Custom field values follow the API's structure
if custom_field_values:
data["customFieldValues"] = custom_field_values
# Allow passing any additional top-level fields via kwargs (careful)
for k, v in kwargs.items():
# don't overwrite keys we explicitly set
if k not in data:
data[k] = v
response = requests.post(url, headers=self.headers, json=data)
response.raise_for_status()
return response.json()
def create_person(
self,
first_name: str,
last_name: str,
email: str = None,
company_id: str = None,
group_id: str = None,
companies=None,
emails=None,
phones=None,
addresses=None,
urls=None,
custom_field_values=None,
groups=None,
**kwargs,
):
"""Create a person in the workspace.
Builds payload matching Folk's Create Person API: use camelCase
keys (firstName, lastName, groups, companies, emails, etc.).
Keeps backward compatibility with `company_id` and `group_id`.
"""
url = f"{self.BASE_URL}/people"
data = {"firstName": first_name, "lastName": last_name}
# Groups: explicit `groups` preferred, else fallback to `group_id`
if groups:
formatted = []
for g in groups:
if isinstance(g, dict) and g.get("id"):
formatted.append({"id": g["id"]})
else:
formatted.append({"id": str(g)})
data["groups"] = formatted
elif group_id:
data["groups"] = [{"id": group_id}]
# Companies: keep backward compatibility with company_id
if companies:
formatted = []
for c in companies:
if isinstance(c, dict):
formatted.append(c)
elif isinstance(c, str):
# treat as id
formatted.append({"id": c})
if formatted:
data["companies"] = formatted
elif company_id:
data["companies"] = [{"id": company_id}]
# Helper to normalize into lists
def _to_list(val):
if val is None:
return None
if isinstance(val, (list, tuple)):
return [v for v in val if v is not None]
return [val]
emails_list = _to_list(emails) or []
if email:
emails_list.insert(0, email)
if emails_list:
data["emails"] = emails_list
phones_list = _to_list(phones)
if phones_list:
data["phones"] = phones_list
addresses_list = _to_list(addresses)
if addresses_list:
data["addresses"] = addresses_list
urls_list = _to_list(urls)
if urls_list:
data["urls"] = urls_list
if custom_field_values:
data["customFieldValues"] = custom_field_values
# Allow passthrough of other top-level fields in kwargs
for k, v in kwargs.items():
if k not in data:
data[k] = v
response = requests.post(url, headers=self.headers, json=data)
response.raise_for_status()
return response.json()
# Prefer getting the API key from the environment. If not set, fall back to the
# existing (hard-coded) key so behavior is unchanged for now.
DEFAULT_API_KEY = "FOLKfIGXuv74ML9EAajxyiUR39ePaNrZ"
api_key = os.environ.get("FOLK_API_KEY", DEFAULT_API_KEY)
folk = FolkAPI(api_key=api_key)
def example_flow():
# Step 1: Get groups
groups = folk.get_groups()
print(groups)
# Safely dig into the returned structure. The API returns groups under
# groups['data']['items'] (not groups['data'][0]). Handle missing/empty.
items = groups.get("data", {}).get("items", [])
if not items:
print("No groups returned by Folk API.")
sys.exit(1)
# Choose the first group as an example
group_id = items[0].get("id")
if not group_id:
print("No id found for the first group item.")
sys.exit(1)
# Step 2: Choose a group_id and create a company
company = folk.create_company(
name="2050 Investment Partners",
group_id=group_id,
website="https://2050.com",
linkedin_url="https://linkedin.com/company/2050-investments",
)
# Step 3: Add a person to the same group or company
person = folk.create_person(
first_name="John",
last_name="Doe",
email="john@2050.com",
company_id=company.get("data", {}).get("id"),
group_id=group_id,
)
print("Created company:", company)
print("Created person:", person)
if __name__ == "__main__":
try:
example_flow()
except requests.HTTPError as e:
# Try to include response body for easier debugging if available
resp = getattr(e, "response", None)
if resp is not None:
try:
body = resp.text
except Exception:
body = "<unreadable response body>"
print("HTTP error while talking to Folk API:", e)
print("Response status:", resp.status_code)
print("Response body:", body)
else:
print("HTTP error while talking to Folk API:", e)
sys.exit(1)
except Exception as e: # pragma: no cover - top-level safety
print("Unexpected error:", e)
sys.exit(1)
+39 -8
View File
@@ -1,8 +1,8 @@
import os
from typing import List
from typing import List, Optional
from db.db import DATABASE_URL, get_db
from db.models import FundTable, InvestorTable
from db.models import FundTable, InvestorTable, ProjectTable
from langchain import hub
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
@@ -16,6 +16,8 @@ from schemas.router_schemas import (
)
from sqlalchemy.orm import selectinload
from services.compatibility_score import calculate_project_investor_compatibility
# Connect to SQLite
prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")
db = SQLDatabase.from_uri(DATABASE_URL)
@@ -44,8 +46,15 @@ class QueryProcessor:
prompt=system_message_updated,
)
def process_query(self, question: str) -> PaginatedResponse[InvestmentResponse]:
"""Process a query using the LLM and return investment response data."""
def process_query(
self, question: str, project_id: Optional[int] = None
) -> PaginatedResponse[InvestmentResponse]:
"""Process a query using the LLM and return investment response data.
Args:
question: The natural language query to process
project_id: Optional project ID for compatibility scoring
"""
# Let the LLM handle all database interactions and filtering to get fund IDs
response = self.agent.invoke(
{"messages": [("user", question)]},
@@ -60,7 +69,7 @@ class QueryProcessor:
fund_ids = self._extract_fund_ids_from_response(ai_response)
# Fetch full fund data with investor relationships using the IDs
return self._fetch_funds_by_ids(fund_ids)
return self._fetch_funds_by_ids(fund_ids, project_id)
def _extract_fund_ids_from_response(self, ai_response: str) -> List[int]:
"""Extract fund IDs from AI response."""
@@ -85,10 +94,15 @@ class QueryProcessor:
return fund_ids
def _fetch_funds_by_ids(
self, fund_ids: List[int]
self, fund_ids: List[int], project_id: Optional[int] = None
) -> PaginatedResponse[InvestmentResponse]:
"""Fetch funds with all their relationships from the database using fund IDs.
Constructs response similar to read_investors but starting from funds."""
Constructs response similar to read_investors but starting from funds.
Args:
fund_ids: List of fund IDs to fetch
project_id: Optional project ID for compatibility scoring
"""
if not fund_ids:
return PaginatedResponse(
items=[],
@@ -102,6 +116,16 @@ class QueryProcessor:
db_session = next(get_db())
try:
# Load project if project_id provided
project = None
if project_id is not None:
project = (
db_session.query(ProjectTable)
.options(selectinload(ProjectTable.sector))
.filter(ProjectTable.id == project_id)
.first()
)
# Query funds with all necessary relationships loaded
funds = (
db_session.query(FundTable)
@@ -127,6 +151,13 @@ class QueryProcessor:
for fund in funds:
investor = fund.investor
# Calculate compatibility score if project provided
compatibility_score = 1.0
if project is not None:
compatibility_score = calculate_project_investor_compatibility(
project=project, investor=investor, use_funds=True
)
# Get top 3 portfolio companies (id and name only)
portfolio_companies = [
CompanyMinimal(id=company.id, name=company.name)
@@ -158,7 +189,7 @@ class QueryProcessor:
stage_focus=stage_focus,
portfolio_companies=portfolio_companies,
sectors=fund_sectors,
compatibility_score=1.0,
compatibility_score=compatibility_score,
)
investment_responses.append(investment_response)