Files
Anton_wireframe/app/services/insight.py
T
2025-11-26 08:04:11 +00:00

182 lines
7.4 KiB
Python

import asyncio
import logging
import os
from crawl4ai import AsyncWebCrawler
from ddgs import DDGS
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from schemas.insight_schema import InsightResponse
# Configure logging once, at import time, so every record emitted by this
# module carries a timestamp, level and logger name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("web_search_agent")

# load_dotenv() must run before the key is read below; it is expected to
# populate the process environment from a local .env file when one exists.
load_dotenv()
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
if OPENROUTER_API_KEY in (None, ""):
    # Only warn: importing this module must stay possible without credentials.
    logger.warning("OPENROUTER_API_KEY not set. LLM calls will fail if invoked.")
class QueryProcessor:
    """Research agent that produces structured insights about investment firms.

    Wraps a LangGraph ReAct agent driven by a chat model reached through the
    OpenRouter endpoint. The agent is given one tool, ``web_search``, and is
    asked to answer in the ``InsightResponse`` schema.
    """

    def __init__(self):
        """Create the search client, the chat model and the ReAct agent."""
        # Build the search client first so the ``web_search`` tool is usable
        # from the moment the agent exists.
        self.ddg_search = DDGS()
        self.llm = ChatOpenAI(
            api_key=OPENROUTER_API_KEY,
            base_url="https://openrouter.ai/api/v1",
            model="openai/gpt-4o-mini",
            temperature=0,
        )
        self.agent = create_react_agent(
            model=self.llm,
            tools=[self.web_search],
            response_format=InsightResponse,
        )

    async def crawl(self, url: str):
        """Fetch ``url`` with ``AsyncWebCrawler`` and return the page as markdown.

        This helper is not registered in the agent's tool list; it is only
        available for direct use by callers.

        Args:
            url: Address of the page to crawl.

        Returns:
            The ``markdown`` attribute of the crawl result.
        """
        logger.info("\nCrawl tool called with url: %s", url)
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun(url)
            return results.markdown

    def web_search(self, query: str):
        """Tool to search the web using google, provide the relevant query to get the information"""
        # NOTE(review): the docstring above is kept verbatim because it is
        # presumably used as the tool description shown to the model. The
        # search itself goes through the ``ddgs`` client, whatever engine that
        # happens to use.
        logger.info("\nWeb Search Tool Called with query: %s", query)
        # Treat a blank or whitespace-only query like a missing one rather
        # than forwarding it to the search backend.
        if not query or not query.strip():
            return "No query provided."
        return self.ddg_search.text(query, max_results=10)

    @staticmethod
    def _fallback_insights() -> dict:
        """Return a fresh copy of the payload used when no insight is available."""
        return {
            "investment_pattern_analysis": "Unable to retrieve investment pattern analysis at this time.",
            "market_position": "Unable to retrieve market position at this time.",
        }

    @staticmethod
    def _build_context(
        investor_name,
        investor_website=None,
        investor_description=None,
        investor_headquarters=None,
        investment_thesis=None,
        portfolio_highlights=None,
    ) -> str:
        """Join the known facts about an investor into a newline-separated block.

        Empty or missing fields are skipped. Only the first three thesis items
        and the first five portfolio items are kept, to bound prompt size.
        Thesis and portfolio values that are not a list or tuple are ignored.
        """
        lines = [f'Investment Firm: "{investor_name}"']
        if investor_website:
            lines.append(f"Website: {investor_website}")
        if investor_headquarters:
            lines.append(f"Location: {investor_headquarters}")
        if investor_description:
            lines.append(f"Description: {investor_description}")
        if investment_thesis and isinstance(investment_thesis, (list, tuple)):
            thesis_str = ", ".join(str(item) for item in investment_thesis[:3])
            lines.append(f"Investment Focus: {thesis_str}")
        if portfolio_highlights and isinstance(portfolio_highlights, (list, tuple)):
            portfolio_str = ", ".join(str(item) for item in portfolio_highlights[:5])
            lines.append(f"Notable Portfolio Companies: {portfolio_str}")
        return "\n".join(lines)

    @staticmethod
    def _build_prompt(investor_name, context) -> str:
        """Render the research prompt sent to the agent for one investor."""
        # The literal is deliberately flush-left: indenting it would add
        # leading whitespace to every prompt line and change what the model
        # receives.
        return f"""
Research and analyze the following investment firm:
{context}
CRITICAL INSTRUCTIONS:
- You MUST provide concrete, data-driven insights with specific numbers and percentages
- Use the web_search tool to find recent news, press releases, and investment databases (Crunchbase, PitchBook, etc.)
- If you cannot find sufficient data after searching, make reasonable inferences based on available information
- DO NOT state that data is unavailable or ambiguous - provide the best analysis possible with what you find
- Focus on ACTIONABLE insights, not disclaimers
- Only call the tool twice at most, be strategic in your searches
- Summarize your findings concisely and clearly
Provide insights in the InsightResponse schema format:
1. investment_pattern_analysis (MAX 3 SENTENCES):
- Recent investment activity and trends in the last 12-18 months
- Investment size ranges, deal frequency, and sector preferences
- Notable patterns (e.g., "increased AI investments by 40%", "average check size $5-10M")
- If specific numbers aren't available, provide reasonable estimates based on portfolio and market position
2. market_position (MAX 3 SENTENCES):
- Standing in the venture capital market
- Activity level in specific sectors and notable unicorn investments
- Deal leadership roles (lead vs co-lead) and market influence
- Regional or global market presence and competitive positioning
Use the web_search tool strategically. Search for:
- "{investor_name}" recent investments 2024 2025
- "{investor_name}" portfolio Crunchbase
- "{investor_name}" funding rounds news
- Specific portfolio companies if mentioned above
"""

    async def get_investor_insights(
        self,
        investor_name: str,
        investor_website: str = None,
        investor_description: str = None,
        investor_headquarters: str = None,
        investment_thesis: list = None,
        portfolio_highlights: list = None,
    ) -> dict:
        """Get investment pattern analysis and market position for an investor.

        Never raises: any failure while running the agent is logged and a
        fallback payload with the same two keys is returned instead.

        Args:
            investor_name: Name of the investor/VC firm.
            investor_website: Website URL of the investor.
            investor_description: Description of the investor.
            investor_headquarters: Headquarters location.
            investment_thesis: Investment thesis statements (first 3 are used).
            portfolio_highlights: Notable portfolio companies (first 5 are used).

        Returns:
            Dictionary with ``investment_pattern_analysis`` and
            ``market_position``.
        """
        logger.info("Getting insights for investor: %s", investor_name)
        context = self._build_context(
            investor_name,
            investor_website=investor_website,
            investor_description=investor_description,
            investor_headquarters=investor_headquarters,
            investment_thesis=investment_thesis,
            portfolio_highlights=portfolio_highlights,
        )
        prompt = self._build_prompt(investor_name, context)

        try:
            result = await self.agent.ainvoke({"messages": [("user", prompt)]})
            # With response_format set, the structured answer is expected
            # under the "structured_response" key.
            logger.info("Raw agent result keys: %s", result.keys())
            structured = None
            if "structured_response" in result:
                structured = result["structured_response"]
                logger.info("Structured response type: %s", type(structured))
            if isinstance(structured, InsightResponse):
                return structured.model_dump()
            if isinstance(structured, dict):
                return structured
            # Either the key is missing or it holds an unexpected type.
            logger.warning(
                "No usable structured_response in result (got %s), using fallback",
                type(structured).__name__,
            )
            return self._fallback_insights()
        except Exception as e:
            # Service boundary: callers always get a well-formed payload.
            # logger.exception records the message and the traceback at once.
            logger.exception("Error getting insights for %s: %s", investor_name, e)
            return self._fallback_insights()
async def main():
    """Manual smoke run: ask the agent one free-form question and print its last message."""
    processor = QueryProcessor()
    question = "Can you tell me about 3T Finance investment company"
    response = await processor.agent.ainvoke({"messages": [("user", question)]})
    # The last entry of the message list is printed as the agent's answer.
    print(response["messages"][-1].content)


if __name__ == "__main__":
    asyncio.run(main())