made querying async

This commit is contained in:
bolade
2025-10-28 21:09:47 +01:00
parent bb03f6ade4
commit 02c8bb816f
7 changed files with 27 additions and 14 deletions
Binary file not shown.
Binary file not shown.
-1
View File
@@ -1,5 +1,4 @@
import os import os
from pathlib import Path
from typing import Annotated from typing import Annotated
from fastapi import Depends from fastapi import Depends
+4 -4
View File
@@ -1,6 +1,4 @@
import io import io
import logging
import os
import pandas as pd import pandas as pd
from db.db import Base, db_dependency, engine from db.db import Base, db_dependency, engine
@@ -44,6 +42,7 @@ class QueryRequest(BaseModel):
} }
} }
class CompanyQueryRequest(BaseModel): class CompanyQueryRequest(BaseModel):
question: str question: str
@@ -54,6 +53,7 @@ class CompanyQueryRequest(BaseModel):
} }
} }
@app.get("/") @app.get("/")
def health(): def health():
return {"Hello": "World"} return {"Hello": "World"}
@@ -122,7 +122,7 @@ async def query_investors(request: QueryRequest):
- "Healthcare investors in Europe" - "Healthcare investors in Europe"
""" """
processor = QueryProcessor() processor = QueryProcessor()
results = processor.process_query(request.question) results = await processor.process_query(request.question)
return results return results
@@ -143,7 +143,7 @@ async def query_companies(request: CompanyQueryRequest):
- "European startups founded after 2019" - "European startups founded after 2019"
""" """
processor = CompanyQueryProcessor() processor = CompanyQueryProcessor()
results = processor.process_query(request.question) results = await processor.process_query(request.question)
return results return results
Binary file not shown.
+12 -4
View File
@@ -1,3 +1,4 @@
import asyncio
import hashlib import hashlib
import logging import logging
import os import os
@@ -116,11 +117,18 @@ Return ONLY the SQL query, no explanations or markdown.""",
"""Generate cache key from normalized question.""" """Generate cache key from normalized question."""
return hashlib.md5(question.lower().strip().encode()).hexdigest() return hashlib.md5(question.lower().strip().encode()).hexdigest()
def process_query(self, question: str) -> PaginatedResponse[CompanyData]: # synchronous helper is provided below as `_process_query_sync` and an
"""Process a query by generating and executing SQL directly. # async wrapper `process_query` runs it in a thread. This keeps the
# FastAPI event loop non-blocking while reusing the existing sync code.
async def process_query(self, question: str) -> PaginatedResponse[CompanyData]:
"""Async wrapper for process_query. Runs blocking work in a thread to avoid
blocking the event loop.
"""
return await asyncio.to_thread(self._process_query_sync, question)
Args: def _process_query_sync(self, question: str) -> PaginatedResponse[CompanyData]:
question: The natural language query to process """Synchronous implementation of process_query. This is run in a thread by
the async wrapper above.
""" """
cache_key = self._get_cache_key(question) cache_key = self._get_cache_key(question)
+11 -5
View File
@@ -1,3 +1,4 @@
import asyncio
import hashlib import hashlib
import logging import logging
import os import os
@@ -126,14 +127,19 @@ Return ONLY the SQL query, no explanations or markdown.""",
"""Generate cache key from normalized question.""" """Generate cache key from normalized question."""
return hashlib.md5(question.lower().strip().encode()).hexdigest() return hashlib.md5(question.lower().strip().encode()).hexdigest()
def process_query( async def process_query(
self, question: str, project_id: Optional[int] = None self, question: str, project_id: Optional[int] = None
) -> PaginatedResponse[InvestmentResponse]: ) -> PaginatedResponse[InvestmentResponse]:
"""Process a query by generating and executing SQL directly. """Async wrapper for process_query. Runs blocking work in a thread to avoid
blocking the event loop.
"""
return await asyncio.to_thread(self._process_query_sync, question, project_id)
Args: def _process_query_sync(
question: The natural language query to process self, question: str, project_id: Optional[int] = None
project_id: Optional project ID for compatibility scoring ) -> PaginatedResponse[InvestmentResponse]:
"""Synchronous implementation of process_query. This is run in a thread by
the async wrapper above.
""" """
cache_key = self._get_cache_key(question) cache_key = self._get_cache_key(question)