feat: Add insight generation functionality with compatibility scoring and web search integration

This commit is contained in:
bolade
2025-10-13 23:19:46 +01:00
parent 75fc8666ca
commit e386ebbdef
11 changed files with 413 additions and 2 deletions
+1 -1
View File
@@ -13,4 +13,4 @@
*.cypython
nohup.out
Binary file not shown.
+2 -1
View File
@@ -5,7 +5,7 @@ from db.db import Base, db_dependency, engine
from dotenv import load_dotenv
from fastapi import FastAPI, File, Form, UploadFile
from pydantic import BaseModel
from routers import companies, folk_crm, investors, projects
from routers import companies, folk_crm, insight_route, investors, projects
from schemas.router_schemas import InvestmentResponse, PaginatedResponse
from services.llm_parser import InvestorProcessor
from services.querying import QueryProcessor
@@ -109,6 +109,7 @@ app.include_router(investors.router)
app.include_router(companies.router)
app.include_router(projects.router)
app.include_router(folk_crm.router)
app.include_router(insight_route.router)
if __name__ == "__main__":
import uvicorn
Binary file not shown.
Binary file not shown.
+80
View File
@@ -0,0 +1,80 @@
from typing import Optional
from db.db import get_db
from db.models import InvestorTable, ProjectTable
from fastapi import APIRouter, Depends, HTTPException
from schemas.insight_schema import InsightResponse
from services.compatibility_score import (
calculate_project_investor_compatibility,
generate_compatibility_explanation,
)
from services.insight import QueryProcessor
from sqlalchemy.orm import Session
router = APIRouter()


@router.get(
    "/insights/{investor_id}", response_model=InsightResponse, tags=["Insights"]
)
async def get_insights(
    investor_id: int, project_id: Optional[int] = None, db: Session = Depends(get_db)
):
    """
    Get investor insights including investment pattern analysis, market position,
    and optionally compatibility score with a project.

    Args:
        investor_id: The ID of the investor to analyze
        project_id: Optional project ID to calculate compatibility score

    Returns:
        InsightResponse with investment_pattern_analysis, market_position,
        and compatibility_score (a compatibility explanation if project_id is
        provided, otherwise a prompt to select a project)

    Raises:
        HTTPException: 404 if the investor, or the requested project, does not exist
    """
    investor = db.query(InvestorTable).filter(InvestorTable.id == investor_id).first()
    if not investor:
        raise HTTPException(
            status_code=404, detail=f"Investor with id {investor_id} not found"
        )

    # Resolve the project *before* running the web research: an unknown
    # project_id should produce a 404 immediately rather than after the slow
    # search/LLM round trip. `is not None` (not truthiness) so that an explicit
    # id of 0 is looked up instead of being treated as "no project given".
    project = None
    if project_id is not None:
        project = db.query(ProjectTable).filter(ProjectTable.id == project_id).first()
        if not project:
            raise HTTPException(
                status_code=404, detail=f"Project with id {project_id} not found"
            )

    if project is None:
        compatibility_score = "Select a project to see compatibility analysis"
    else:
        # Numeric score first, then the natural-language explanation built on it.
        score = calculate_project_investor_compatibility(
            project, investor, use_funds=True
        )
        compatibility_score = generate_compatibility_explanation(
            project, investor, score, use_funds=True
        )

    # Investment pattern analysis and market position come from the
    # web-search-backed query processor.
    query_processor = QueryProcessor()
    insights = await query_processor.get_investor_insights(
        investor_name=investor.name,
        investor_website=investor.website,
        investor_description=investor.description,
        investor_headquarters=investor.headquarters,
        investment_thesis=investor.investment_thesis,
        portfolio_highlights=investor.portfolio_highlights,
    )

    return InsightResponse(
        investment_pattern_analysis=insights["investment_pattern_analysis"],
        market_position=insights["market_position"],
        compatibility_score=compatibility_score,
    )
+18
View File
@@ -0,0 +1,18 @@
from typing import Optional
from pydantic import BaseModel
class InsightResponse(BaseModel):
    """Response payload describing an investor's activity and market standing."""

    # Short narrative about the investor's recent investment activity.
    investment_pattern_analysis: str
    # Short narrative about the investor's standing in the market.
    market_position: str
    # Human-readable compatibility explanation for a specific project; optional.
    compatibility_score: Optional[str] = None

    # Pydantic v2 configuration. The class-based `Config` form is deprecated in
    # v2 and emits a deprecation warning; a plain dict is accepted here, so no
    # extra import is required.
    model_config = {
        "json_schema_extra": {
            "example": {
                "investment_pattern_analysis": "Sequoia has been increasingly active in AI/ML startups (43% increase in last 18 months). Their average investment size has grown 23% year-over-year, indicating confidence in larger rounds. Peak activity in Q2-Q3, suggesting seasonal investment patterns.",
                "market_position": "Top 3 most active VC in enterprise software deals. Strong presence in unicorn companies (47 portfolio unicorns). Consistently leads or co-leads rounds, indicating decision-making influence.",
                # Mirrors the sentence-style explanation the insights
                # endpoint returns, rather than a bare number.
                "compatibility_score": "Based on your startup profile: 85% match Excellent match: AI/ML sector focus, Seed stage.",
            }
        }
    }
Binary file not shown.
Binary file not shown.
+137
View File
@@ -507,3 +507,140 @@ def get_compatibility_score_breakdown(
),
"note": "Using investor-level data (no specific fund selected)",
}
def generate_compatibility_explanation(
    project: ProjectTable, investor: InvestorTable, score: float, use_funds: bool = True
) -> str:
    """
    Generate a detailed, natural language explanation of the compatibility score.

    Args:
        project: The project being evaluated
        investor: The investor being compared against
        score: The calculated compatibility score (0-1)
        use_funds: Whether fund-level data was used

    Returns:
        A formatted string with the compatibility score and detailed explanation
    """
    # round() rather than int(): products such as 0.57 * 100 evaluate to
    # 56.99999999999999 in binary floating point, which int() would truncate
    # to 56 and under-report the score.
    score_percentage = round(score * 100)

    # Determine match quality
    if score_percentage >= 80:
        match_level = "Excellent match"
    elif score_percentage >= 65:
        match_level = "Strong match"
    elif score_percentage >= 50:
        match_level = "Good match"
    elif score_percentage >= 35:
        match_level = "Moderate match"
    else:
        match_level = "Limited match"

    alignment_factors = []
    recommendations = []

    def _sector_phrase(shared_sectors):
        # Sorted so the wording is deterministic; set iteration order is not.
        return ", ".join(sorted(shared_sectors)[:2]) + " sector focus"

    # Pick the fund with the highest strictly-positive compatibility, if any.
    # When every fund scores 0 (or funds are not used) best_fund stays None and
    # the investor-level attributes are consulted instead.
    best_fund = None
    if use_funds and investor.funds:
        best_score = 0
        for fund in investor.funds:
            fund_score = _calculate_project_fund_compatibility(project, fund)
            if fund_score > best_score:
                best_score = fund_score
                best_fund = fund

    # Analyze sector alignment
    if project.sector:
        project_sectors = [s.name for s in project.sector if hasattr(s, "name")]
        if best_fund and best_fund.sectors:
            fund_sectors = {s.name for s in best_fund.sectors if hasattr(s, "name")}
            common_sectors = set(project_sectors) & fund_sectors
            if common_sectors:
                alignment_factors.append(_sector_phrase(common_sectors))
            elif project_sectors:
                recommendations.append(
                    f"Consider emphasizing any {project_sectors[0]} industry connections"
                )
        elif investor.sectors:
            investor_sectors = {s.name for s in investor.sectors if hasattr(s, "name")}
            common_sectors = set(project_sectors) & investor_sectors
            if common_sectors:
                alignment_factors.append(_sector_phrase(common_sectors))

    # Analyze stage alignment
    if project.stage:
        # Enum-like stages expose `.value`; anything else is stringified.
        stage_name = (
            project.stage.value
            if hasattr(project.stage, "value")
            else str(project.stage)
        )
        stage_display = stage_name.replace("_", " ").title()
        if best_fund and best_fund.investment_stages:
            fund_stage_names = {
                s.name for s in best_fund.investment_stages if hasattr(s, "name")
            }
            if stage_name in fund_stage_names:
                alignment_factors.append(f"{stage_display} stage")
            else:
                recommendations.append(
                    "Investor typically focuses on different stages; highlight your traction and growth metrics"
                )
        # NOTE(review): with no selected fund the stage is listed as an
        # alignment factor without being compared against any investor-level
        # stage data — confirm this is intended.
        if not best_fund:
            alignment_factors.append(f"{stage_display} stage")

    # Analyze geographic alignment (case-insensitive substring match either way)
    if project.location:
        location = project.location.lower()
        if best_fund and best_fund.geographic_focus:
            focus = best_fund.geographic_focus.lower()
            if location in focus or focus in location:
                alignment_factors.append(f"{project.location} presence")
        elif investor.headquarters:
            headquarters = investor.headquarters.lower()
            if location in headquarters or headquarters in location:
                alignment_factors.append(f"{project.location} market presence")

    # Analyze valuation/check size fit
    if project.valuation:
        if best_fund and best_fund.check_size_lower and best_fund.check_size_upper:
            # Heuristic window: valuation between 3x the smallest and 10x the
            # largest check size of the fund.
            reasonable_min = best_fund.check_size_lower * 3
            reasonable_max = best_fund.check_size_upper * 10
            if reasonable_min <= project.valuation <= reasonable_max:
                alignment_factors.append("appropriate funding stage")
            elif project.valuation < reasonable_min:
                recommendations.append(
                    "You may be early for this investor; consider approaching at a later stage"
                )
            else:
                recommendations.append(
                    "Consider highlighting your growth trajectory and market opportunity"
                )

    # Build the explanation
    explanation_parts = [f"Based on your startup profile: {score_percentage}% match"]
    if alignment_factors:
        alignment_text = ", ".join(alignment_factors)
        explanation_parts.append(f"{match_level}: {alignment_text}.")
    else:
        explanation_parts.append(f"{match_level}.")

    if recommendations:
        # Only the first recommendation collected is surfaced.
        explanation_parts.append(recommendations[0] + ".")

    return " ".join(explanation_parts)
+175
View File
@@ -0,0 +1,175 @@
import asyncio
import logging
import os
from crawl4ai import AsyncWebCrawler
from ddgs import DDGS
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from schemas.insight_schema import InsightResponse
# Process-wide logging setup: INFO level with timestamp, level and logger name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("web_search_agent")

# Load variables from a .env file before the API key is read from the
# environment below; the order of these statements matters.
load_dotenv()
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")

# A missing (or empty) key is only warned about here; nothing is raised at
# import time.
if not OPENROUTER_API_KEY:
    logger.warning("OPENROUTER_API_KEY not set. LLM calls will fail if invoked.")
class QueryProcessor:
    """Web-search-backed agent that produces structured insights about an investor."""

    def __init__(self):
        """Create the LLM client, the search client and the ReAct agent."""
        self.llm = ChatOpenAI(
            api_key=OPENROUTER_API_KEY,
            base_url="https://openrouter.ai/api/v1",
            model="openai/gpt-5-nano",
            temperature=0,
        )
        # Created before the agent so the search tool never exists without its client.
        self.ddg_search = DDGS()
        self.agent = create_react_agent(
            model=self.llm,
            tools=[self.web_search],
            response_format=InsightResponse,
        )

    # NOTE(review): the docstrings of `crawl` and `web_search` are presumably
    # what the model sees as tool descriptions when the method is registered as
    # a tool, so they are kept short and model-facing — confirm before editing.
    # `crawl` is not currently in the agent's tool list.
    async def crawl(self, url: str):
        """Tool to search the web using a web crawler. given the url"""
        logger.info(f"\nCrawl tool called with url: {url}")
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun(url)
            return results.markdown

    def web_search(self, query: str):
        """Tool to search the web using google, provide the relevant query to get the information"""
        logger.info(f"\nWeb Search Tool Called with query: {query}")
        if not query:
            return "No query provided."
        return self.ddg_search.text(query, max_results=10, backend="google")

    @staticmethod
    def _fallback_insights() -> dict:
        """Return a fresh placeholder payload used when no insights could be produced."""
        return {
            "investment_pattern_analysis": "Unable to retrieve investment pattern analysis at this time.",
            "market_position": "Unable to retrieve market position at this time.",
        }

    @staticmethod
    def _join_items(items, limit: int) -> str:
        """Join at most `limit` entries of `items` into comma-separated text."""
        # A bare string is one entry; slicing it would cut it into characters.
        if isinstance(items, str):
            items = [items]
        # str() so non-string entries cannot make join() raise.
        return ", ".join(str(item) for item in list(items)[:limit])

    @staticmethod
    def _build_context(
        investor_name,
        investor_website=None,
        investor_description=None,
        investor_headquarters=None,
        investment_thesis=None,
        portfolio_highlights=None,
    ) -> str:
        """Assemble the newline-separated investor facts embedded in the prompt."""
        context_parts = [f'Investment Firm: "{investor_name}"']
        if investor_website:
            context_parts.append(f"Website: {investor_website}")
        if investor_headquarters:
            context_parts.append(f"Location: {investor_headquarters}")
        if investor_description:
            context_parts.append(f"Description: {investor_description}")
        if investment_thesis:
            # Limit to first 3
            thesis_str = QueryProcessor._join_items(investment_thesis, 3)
            context_parts.append(f"Investment Focus: {thesis_str}")
        if portfolio_highlights:
            # Limit to first 5
            portfolio_str = QueryProcessor._join_items(portfolio_highlights, 5)
            context_parts.append(f"Notable Portfolio Companies: {portfolio_str}")
        return "\n".join(context_parts)

    async def get_investor_insights(
        self,
        investor_name: str,
        investor_website: str = None,
        investor_description: str = None,
        investor_headquarters: str = None,
        investment_thesis: list = None,
        portfolio_highlights: list = None,
    ) -> dict:
        """
        Get investment pattern analysis and market position for an investor.

        Args:
            investor_name: Name of the investor/VC firm
            investor_website: Website URL of the investor
            investor_description: Description of the investor
            investor_headquarters: Headquarters location
            investment_thesis: List of investment thesis statements
            portfolio_highlights: List of notable portfolio companies

        Returns:
            Dictionary with investment_pattern_analysis and market_position.
            Placeholder text is returned for both keys when the agent fails or
            yields no structured response; this method does not raise for
            agent errors.
        """
        logger.info(f"Getting insights for investor: {investor_name}")

        context = self._build_context(
            investor_name,
            investor_website=investor_website,
            investor_description=investor_description,
            investor_headquarters=investor_headquarters,
            investment_thesis=investment_thesis,
            portfolio_highlights=portfolio_highlights,
        )

        # The prompt body is deliberately flush-left so the text sent to the
        # model carries no leading indentation.
        prompt = f"""
Research and analyze the following investment firm:
{context}
CRITICAL INSTRUCTIONS:
- You MUST provide concrete, data-driven insights with specific numbers and percentages
- Use the web_search tool to find recent news, press releases, and investment databases (Crunchbase, PitchBook, etc.)
- If you cannot find sufficient data after searching, make reasonable inferences based on available information
- DO NOT state that data is unavailable or ambiguous - provide the best analysis possible with what you find
- Focus on ACTIONABLE insights, not disclaimers
Provide insights in the InsightResponse schema format:
1. investment_pattern_analysis (MAX 3 SENTENCES):
- Recent investment activity and trends in the last 12-18 months
- Investment size ranges, deal frequency, and sector preferences
- Notable patterns (e.g., "increased AI investments by 40%", "average check size $5-10M")
- If specific numbers aren't available, provide reasonable estimates based on portfolio and market position
2. market_position (MAX 3 SENTENCES):
- Standing in the venture capital market
- Activity level in specific sectors and notable unicorn investments
- Deal leadership roles (lead vs co-lead) and market influence
- Regional or global market presence and competitive positioning
Use the web_search tool strategically. Search for:
- "{investor_name}" recent investments 2024 2025
- "{investor_name}" portfolio Crunchbase
- "{investor_name}" funding rounds news
- Specific portfolio companies if mentioned above
"""
        try:
            result = await self.agent.ainvoke({"messages": [("user", prompt)]})
            # The agent with response_format=InsightResponse returns structured output
            logger.info(f"Raw agent result keys: {result.keys()}")

            if "structured_response" in result:
                structured = result["structured_response"]
                logger.info(f"Structured response type: {type(structured)}")
                if isinstance(structured, InsightResponse):
                    return structured.model_dump()
                if isinstance(structured, dict):
                    # Overlay on the placeholders so both keys the caller
                    # indexes are always present.
                    return {**self._fallback_insights(), **structured}

            # Fallback: shouldn't reach here, but handle it gracefully
            logger.warning("No structured_response found in result, using fallback")
            return self._fallback_insights()
        except Exception as e:
            # Deliberate best-effort boundary: any agent/search failure degrades
            # to placeholder text. logger.exception records message and
            # traceback in one entry.
            logger.exception(f"Error getting insights for {investor_name}: {e}")
            return self._fallback_insights()
async def main():
    """Manual smoke test: ask the agent one question and print its final message."""
    processor = QueryProcessor()
    question = "Can you tell me about 3T Finance investment company"
    response = await processor.agent.ainvoke({"messages": [("user", question)]})
    # The last entry of the returned message list is printed as the answer.
    last_message = response["messages"][-1]
    print(last_message.content)


if __name__ == "__main__":
    asyncio.run(main())