From b1b1c5ea1e043c2df8de54838fb51783229bb46c Mon Sep 17 00:00:00 2001 From: bolade Date: Thu, 11 Sep 2025 16:23:22 +0100 Subject: [PATCH] Made improvements to parsing --- README.md | 2 +- app/__pycache__/main.cpython-312.pyc | Bin 3398 -> 3401 bytes app/db/__pycache__/db.cpython-312.pyc | Bin 1625 -> 1623 bytes app/db/__pycache__/models.cpython-312.pyc | Bin 4444 -> 4444 bytes app/main.py | 2 +- .../__pycache__/openrouter_v2.cpython-312.pyc | Bin 0 -> 13928 bytes .../__pycache__/querying.cpython-312.pyc | Bin 10249 -> 10247 bytes app/services/openrouter_v2.py | 290 ++++++++++++++++++ app/services/querying.py | 2 +- 9 files changed, 293 insertions(+), 3 deletions(-) create mode 100644 app/services/__pycache__/openrouter_v2.cpython-312.pyc create mode 100644 app/services/openrouter_v2.py diff --git a/README.md b/README.md index 6f7f306..d5890cc 100644 --- a/README.md +++ b/README.md @@ -338,7 +338,7 @@ When `--use-llm` is enabled: OPENROUTER_API_KEY=your_openrouter_api_key_here # Database Configuration (optional, defaults to SQLite) -DATABASE_URL=sqlite:///investors_2.db +DATABASE_URL=sqlite:///investors.db # FastAPI Configuration API_HOST=localhost diff --git a/app/__pycache__/main.cpython-312.pyc b/app/__pycache__/main.cpython-312.pyc index 15c2f231fd84c16fd2cce0216311e58c1d4de20d..9dcd9436bfe5a15926f16e9ebd7468ed8cca6c89 100644 GIT binary patch delta 56 zcmX>mbyAA=G%qg~0}#x9bSR^2BX1_RxL9#&QCVhkYO!8^L26!6erZW+QGA)vpbxex)G%qg~0}z~M-;q(gkvEfDM7TJ$s4O!%wOB8|AT_TjzqBN^XmT(2K1RdI HK0JQ`#zzun diff --git a/app/db/__pycache__/db.cpython-312.pyc b/app/db/__pycache__/db.cpython-312.pyc index d31bfecf5de523ebab623990a0bc5102de249e6e..51bb0e04a44506c1c14823f7647dce2426faea29 100644 GIT binary patch delta 57 zcmcb~bDf9xG%qg~0}vPp?Z{ZakvEM|M69?lC$l8gN?%_;Gp{VQxFo-*cylLX4GW9o LWqzm0oNNIA?KlyG delta 59 zcmcc4bCZYnG%qg~0}u$^+m^9$BX1g`m_%`5PG(7}mA<}yW?orpaY=qralFyycE%bO N7U#?SE|b~W0st@f62<@k diff --git a/app/db/__pycache__/models.cpython-312.pyc b/app/db/__pycache__/models.cpython-312.pyc index 5076c67022d5d29b7f3e8c166ab3c5b08beb8cc8..e6daac03b0c7e80cd371cc1c38abe14a35a31876 100644 GIT binary patch delta 56 zcmcbkbVrHzG%qg~0}#wRw=*MqBd-Goqv~cqjxEfLZkw5Tb}=%>ZQjQ_jgc{Db0~it LBcssdEdnb5=YA0i delta 56 zcmcbkbVrHzG%qg~0}w3#xjiFiBd-GoquORajxEfL?wgr;b}=%>Z{Ei{jgc{Tb0~it LBcsUVEdnb5@JJCu diff --git a/app/main.py b/app/main.py index ed1caa1..2eb0264 100644 --- a/app/main.py +++ b/app/main.py @@ -6,7 +6,7 @@ from db.db import db_dependency, init_database from fastapi import FastAPI, File, UploadFile from py_schemas import InvestorList from pydantic import BaseModel -from services.openrouter import InvestorProcessor +from services.openrouter_v2 import InvestorProcessor from services.querying import QueryProcessor app = FastAPI() diff --git a/app/services/__pycache__/openrouter_v2.cpython-312.pyc b/app/services/__pycache__/openrouter_v2.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a386d811b8083e4bd0bc5f5568478f99003fb49f GIT binary patch literal 13928 zcmbVTeQ*@VmG9Zv+3#JgK6mv!=mWG!NJs)9ganq55MU5C0vr^R^|CXPR_up7vqEUI z8@p_h$SHCb=Ol>or68A71qX7`Rox$4>i!6}@8YUUU3QT0>`ic)y7-TC)z#I4$QcvY z-MyaK*zGyM(+6!K`C+7v7am5h}TNQZ_IV?K`<3tN1hRN6IGDpRdM zwO!X`Xk~MR$Nn*X8S&^O{hkqWu+0~aM#A3N0LPAVP;HxU zdb*8gxw8R3%eO^9ZXC3f<-BJ*TBql@0w`Aw9FX8~?cYZKs^5ICxOPQf%emCtH3BpN z6+f%Lh9DqQLR9=L(NXc2BqUMUH33{x%5m~P`U%!QULR|P%2c17iJlV zhmUY0l9hik=;h%BWG^|-%CphSoc8*I0T8rbojte`1N`v&Do4l*mWU%Srpr$>%Na*g zDlso+bQ8o4Qe8;!WoAhs6g8@BQB~AvN>&>xU5ObzLotRM#_M|J9m>Ehk{_tkrlxc8 z6dvw%Pt>Uru0t>I1Lc^N6r^#VvUSIG@P;Re{6z6+$4rJ4#Oznlva%eP znwqLsQ@NP2FjmGEAa2<6q?GN!9P&6ueCMI>CnGbok7H?Hm}bvSvs{$MOApdf&gaKV zi1xu~@=;hBt@MyWrPXB$30TY_d8yIkpls;qBuh_ok+T5?R`uS|6G~~mFhg@JEcGxQ zh(dw1FMsiKtUtmrtrp90;4B-Kix`gtgOOR#KDjcu$ZU`0n=j6Q3WQcAEEcsfxbUnh zS@-pLfDJOdtZ{J5)OPn+w$NC4^w7uhsGERsCK!}+fPrCoKNpa-aSuJ1d!ZxaaxH3M z!0MX`_0au1XiT1-2{SB5hkRk*1dFu`-30v&grn>P%eBxmJWFq-1LHJkDB5?{7YJhA z?twBuH~PhA4{2F zBNLo&dNSbmj)Riqd+6aT1qN~gPYXE%+6_}qfL$ogAuqfA%0=%&yNAEi&TbR-PF z@gACIS*8U-F2M3$Up93*oAS5N6ENb@$rc)KU0%5~+`;e+-s!xNac@o@(7q9wyjDy+ zi~<7^V%TxtOfX7s^;j(ZK|Z2RZs-Xvn~~=zo)Xg$E($X*5b^rujTagRqX3g1mzyQw z!Jpf7R-H}oeIN`UM`0WGhU9%X?<-8DGgNvu5S^4~R}0NWf@}*7yI~-RWhg7kyw5TO zxvGo|BVmi>KnOPxmSPVLUmX~SbtKPlI>4`;P|8Gu-HVG%a{=fi%w|~#JgzM_V)WX9 zk>~YUUogPXEC`y0B&;|1WI5>J_1O_xXXR&0m@e-4@hqA(9`36~8 zZX7$whIv@s*%hepWmdjqQkONrAX790SEPbK7+{0^;NINv6|)b-XiN zP@(`t#6~1|%_SYny)(^qCJfFTg_jX7oKsb3+ z(08zebMqC88FZ>Luo(GaT_l-8Y}ALv=W%ki&{_`fBa#7z5HK8vw-L^cDMd$~85%iy zZv=WFjev9Zpo!Rq@?HB;CQY~9&g_?O>=I!0xzu9 zK-BAvRjAU*LWWi~uM{}>UqbLQN?Xdy_Y(8^3-*Uy$kmuCEx&Z?;;E&vr9%%%oy!6l z)`uokQggSYN-Sv;O4?GTovEU&DR=opGgX4~O=$`2AFe_LD}infE_%PuflKjdoT~5Mg$QsDwmT&T){pd5WF3rlj}F$AMi z;elp8`6c=KTzv*5VhO#j zf1UD^6UeVS1&~dZRb2ljVoY)BP4r{(qV9r2kNjkUO6aq78_g(+7nA%j=D3cr#2^h{ zS$|BbTm*nQEyO)lnE5a-iW}9hUIh#9n=t=>L|!7lLs2MUikqg2@>nxT&ZbQtMRPX4 zj>xol7VSV6>g!RoL@l8KaWx5ZUhD07tvAKZ(C2JF33L)#lQ4hFJZDqlSz;EQBTu4P z;+Nzs;c>*OGD{J*5?)c*Fpu$mhc(e-!t$7)9~1V+gjquV{JDzfU?==m2O&^qbR3<6 z^`0Q2S-v=-N`*mZo`(glj}r`W7I6^&h`2-$=t6}a&FSQ}kRQ`fl!xSYLd5OD0iGKp zpnplghoC2Qty?9%f6~YCSes|m%VTrz1RDnI%I(f+-#q;S`V(6xUg%b9Z!qHb1^L~r z*-xNRh;N4A-_fI&(F3I4Sai=_b=Os~xb2O(s{?=e(zS+U$Dpuz@F9gt%kPtr|0$3A z{z?r^Jl3_)FlyP~piouIOKKu8h+n>~CEnmh9-3f$(+6Jo4J zLy70*AA$n-VGw{U(X<7*%IB@Ais~z#%bquzAD{wr+iM5s_b!y)Ehv2b+-v6+sqeq^ z(11$ai_{wjKP;`f_}t>LtDVcGo1T!PT5}Hm8bRpRN`n`#u8NRrVC|DQsTO@&`yK zm!2%~T-&tN|5o$0>ZQ{+3vWUdy(_l9yTz_Qrzv3hj2{JozS25G5`Wv@4dL6K!ofnz z+dU>oze8>vEJyEj5`!-NJKHKD{cZuKzw07!y4*a3$am{X26vI~c2StVivr3&A`;S4 zkpf6ycgIWJ8-$JDkJpv@U|lZnSw90qT3z8&YHnW46LW+ajNL)BK%V*?XaHSquZrum zcB`qZshun437lCY{!2NthW3|oY8dxVs9nR~@&wMTBj8JG*Vqf5P-a%==gPdHyPlB< zpa)IY$Q1TFt$X5Y_q27n_;o7lj>u)IJ2vctIxQy!;FJHr*yIWA&~QpbYt+)!ePL>Y zO3B4JF3owO*P@I3#VLLwit)Al#5GF;*HUH=1C;XBrEE~Yl97X2 zMPB?RQYN*eJk^+2KqeW_tLef!aSBk9h7;*50a)<1cAjRbSqusL@_*{_sBipy-LT4u zQ!rZ+#wSQdYusUUlt-1&l-9UsxwJ+&Zq%ZF<#7iT#=jBusO``$4XB${j$91XmfV|~ z0xI5=*F8Ypx{d^|H}uyt=MxF@6QriKYO^lD|Mz^-7(dOC&#UP|I&OkFqw%FYpZ-TX zPqS2$I@$!xnG03&jOzHjbU=^%4a_IY6C|TGpSG!`s5NNLr!3bL21Vb%`tX(%qwkOA_5=xO%^jO68Cc6pdf1sS z{2t`5t;_n8Uo3d?``Gqkq^hhO^0Y)-_yEYVva)&U148pS%Owy*fh38}pEjhEC8M0)-hhNGm zuH?*Xw8Eb&S#c3d4q(sY;*LY}cnm^SgG`B4vkZ^4HyZK6shg%*mY4q`F6jqk#w@#6 zP%IYI-Y%$3*^1Ko=LmCAYD4EL8X@d`3-*s{np5Q)#qu_xyiF|MCX{bW(OX2iOQ5?% zdO)BDQtn34-72_SMR$kb?npIuh>g31#$96LA))cmm%OekYV!_p^Haj+r^L-Cgv}=& z7Mf)Pt^^g9icVT^(xS6La5h~1`iiqXE*N5e`m&<-;73r-4y>;nnaoeB(|LMWy@}UpQYc7|*=}H?> zX~TURs%pO5(6;n-ar=<4eQ2d&-=aC?u2Rcu5!@};POi9jAt^t`xzp8)^qeaq#;c^zrQ9cj7S*q++3McmLK zY=ALp7aMzo#-3clvt9HI2%dpd!&b4OTWIKhXfvzKg zc>q1TQXAUE4PC;9u3Vz7L#*o->bg@+oxgIt^EfEl7>DFF=0GV`-`GFj_+)3etj|1o|4&9UG3a>3$HP!(&XbTU{axBopQ*)}mq5LoJrPS}Xv&zNi+93NR8@h6Et5!3$tV4>?#P zuY1lBJUDF`a5*yu;LvafFvd6a;FLCMIfLaqGvsNQ{BoWJ@-$BVvBMg2@G;A}N6R1t zZX0$GGxoR@LdTDbwQdfm5Deg^ZYpai=ZcT#K}&$FS{$0P`|9n~?kiBWq+0M-dG2m7 z=QY~HIPp;cQwSw!jH#CCz!Wis*@F=cQ6!*PKOtmnW@uK}>#u??eZ0XD!#<(5wtUyg8tP+!bt$%iZlGB6T+)FKH+A$2`HF*8V6H2 zkWHWnPT^ViWQ6}L@eg_BNg+L+# zGU}EaA`!gY42kedW(6$6%?rtdFZD(7MWio8!9`vbs+=Z672T`^xaFZGF?$Ar#f z@7Eo_=DXVd^RAzE-L5;n>^?DnAgw2=Tq#%WrNqU=LL`ldjTNbiI6rQoyDo*%1gZ$d#@77qBX^yJDGE4B$0+mhwmf!tM=Dyv>N38~t;R9*AcN=R+7rwh$)=O;y| zu|&EnTqY#URyQswPlRl}9Tmk+1vo5lJaLj4Z0zE7y{OEqm3 zoBD;O{#0fCmEOy}_svvc(L>T`cch)j?tJ~=YX?(Bm5cq0or}J!rk~q?Vi)P{0=<37 zf3tI$-Xqe(0zLd5`QGR<{VbfrC!M4JMe5+=P8f~*J;+v?)*0-3z&f~8aIrvi)(g)1 zYk}YFOddU+bk?spPuz8urpi1wfOx$%>Digwc_LZz+@DBYVG)3STj{3{cAyRWiBF}v z4v{HYy(OvAkV8=z*7i>gyWbgMQm-gY<9rnD^7(d-$2?(77%Ttohr`x=@m&XuU9a{TLC zPaycBROQRXU-U(s991`S*w*1Lz^m}v2)_dP%-k63&ZxNDMM-?e4j)4M;DR9?4TSLB zPPie7_w7745WnU$A4}*B_^sZA;Db ztg-&j?r7r&y=}lB8;QBrI2<#b#!g2|4y?cx2cwUl3;P2R$vmn`5?qRaKR`UZj36Y#1YE5H zcc3Kz*H6HS8uqg?c*w&NFG07s*CFuO6sO$+CR~I7UzWfZ7kKVR_|1%^9`*5OWcYMW z8TlXJ94pSr@Jg243kP;B;<5I4AqQ8VQl^7d9ZMMCSoIf<|9|6B;4ebZr^=-V6Dls9 zH{x3ja6_$8C~3SJUM|@_Z~l{>H1~naq0|NV!d-F2dfED>9fsBHd2K(;h2rc+y6C^! zxj1&m)&wJM_P~XK>V_*Lmq${254~?cymU@*G+lGwadh0aA5N2GwdGz--F*t;PY?IdZDQP>hN+=i)d>}+FE3qM<{|z3(G|vqOBuo>$q1? zHt+cKK`nBO5YQZ#NewJ#*ngi(sx@rGS_|NIXwaMN7wSwg}sY(+I!3 zH@GUt>9}qSwddxx-`Ip*htmks$)hJ$ah!gEKs$EcCmSeNs;J~r%f*(ooZi|g$M?+k z`JiB~d`JO<(jnXzkBeJ`NHRp{rop|fNCOLyYm>?Fw?~YRR|E(??^gMuEU$Rw@$rD- zc*i@vR2KAwC*W#UAPg{uV_RYEz^zqYxx?zrW#Ye~@df1DtdeDV&dbATO9<+~Ln4`` z=NP!$9Po2^Mas8bmA|o(1;2JM9-DfVKL^^){R)_2?IGEV1`CED{(u}Gq78pU1%E)r tAEKkn=;#Ni{R6b=162P3s`&s_KQxd8y>Rj&g6I?5h@NmgKsc0T^S@!qldJ#$ literal 0 HcmV?d00001 diff --git a/app/services/__pycache__/querying.cpython-312.pyc b/app/services/__pycache__/querying.cpython-312.pyc index b6c62aa3eb6a664915d0fdc49a962b559ec1b762..ba949b05af0851b9095b8393209181bdb2e12d52 100644 GIT binary patch delta 57 zcmeASXb<2$&CAQh00a@w4`noMFeuf=9Q%um*f`}#~W?dV_K-n NVse?)bn<0&69D&P5*7de diff --git a/app/services/openrouter_v2.py b/app/services/openrouter_v2.py new file mode 100644 index 0000000..d37120d --- /dev/null +++ b/app/services/openrouter_v2.py @@ -0,0 +1,290 @@ +import asyncio +from typing import List, Optional + +import chromadb +import pandas as pd +from db.models import CompanyTable, InvestorTable, InvestorTeamMember, SectorTable +from langchain_core.prompts import PromptTemplate +from langchain_openai import ChatOpenAI +from py_schemas import InvestorData +from pydantic import BaseModel +from settings import settings + + +class InvestorOutput(BaseModel): + """Schema for LLM structured output""" + + investor_data: InvestorData + + +class InvestorProcessor: + def __init__( + self, + sql_session: Optional[object] = None, + vector_db_client: Optional[object] = None, + ): + self.template = """You are an expert data extraction assistant. Extract investor information from the provided CSV data and return it as a structured record. + +Given the following CSV data row: +{question} + +Extract and structure the following fields for the investor: +- name: The investor's full name +- description: Description of the investor +- aum: Assets under management (as integer, use 0 if not available) +- check_size_lower: Lower bound of investment check size (as integer) +- check_size_upper: Upper bound of investment check size (as integer) +- geographic_focus: Geographic region focus +- stage_focus: Investment stage focus (must be one of: seed, series_a, series_b, series_c, growth, late_stage) +- number_of_investments: Number of investments made (default 0) + +Also extract related data: +- portfolio_companies: List of companies they've invested in +- team_members: List of team members with name, role, email +- sectors: List of sectors they focus on + +Important: +- If a field is not available, use appropriate defaults +- stage_focus must be one of the valid enum values +- Return clean, valid JSON only + +Return the data as a single comprehensive investor data record.""" + + self.prompt = PromptTemplate( + template=self.template, input_variables=["question"] + ) + + self.llm = ChatOpenAI( + api_key=settings.OPENROUTER_API_KEY, + base_url="https://openrouter.ai/api/v1", + model="google/gemini-2.5-flash-lite", + temperature=0, + ) + + self.structured_llm = self.llm.with_structured_output(InvestorOutput) + self.sql_session = sql_session + self.vector_db_client = vector_db_client + + self.vector_db_client = chromadb.PersistentClient(path="./chroma_db") + self.collection = self.vector_db_client.get_or_create_collection( + name="investor_descriptions", + metadata={ + "description": "Investor descriptions and investment thesis focus" + }, + ) + + async def _process_row( + self, row: pd.Series, row_idx: int + ) -> Optional[InvestorData]: + """Process a single row of data""" + # Clean values to remove control characters + cleaned_row = {} + for key, value in row.items(): + if pd.notna(value): + # Convert to string and clean control characters + clean_value = ( + str(value) + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " ") + ) + # Remove other control characters + clean_value = "".join( + char + for char in clean_value + if ord(char) >= 32 or char in ["\n", "\r", "\t"] + ) + cleaned_row[key] = clean_value + + row_str = ", ".join( + [f"{key}: {value}" for key, value in cleaned_row.items()] + ) + + try: + print(f"Processing row {row_idx + 1}...") + result = await self.structured_llm.ainvoke(row_str) + if result.investor_data: + return result.investor_data + return None + except Exception as e: + print(f"Error processing row {row_idx + 1}: {e}") + return None + + async def _save_to_sql(self, investor_data_list: List[InvestorData]) -> None: + """Save investors and related data to SQL database""" + if not self.sql_session: + return + + try: + for investor_data in investor_data_list: + # Save investor + db_investor = InvestorTable( + name=investor_data.investor.name, + description=investor_data.investor.description, + aum=investor_data.investor.aum, + check_size_lower=investor_data.investor.check_size_lower, + check_size_upper=investor_data.investor.check_size_upper, + geographic_focus=investor_data.investor.geographic_focus, + stage_focus=investor_data.investor.stage_focus, + number_of_investments=investor_data.investor.number_of_investments, + ) + self.sql_session.add(db_investor) + self.sql_session.flush() # Get the ID + + # Save sectors and create associations + for sector_data in investor_data.sectors: + # Check if sector exists, create if not + existing_sector = ( + self.sql_session.query(SectorTable) + .filter(SectorTable.name == sector_data.name) + .first() + ) + + if not existing_sector: + db_sector = SectorTable(name=sector_data.name) + self.sql_session.add(db_sector) + self.sql_session.flush() + # Add sector to investor's sectors + db_investor.sectors.append(db_sector) + else: + # Add existing sector to investor if not already there + if existing_sector not in db_investor.sectors: + db_investor.sectors.append(existing_sector) + + # Save companies and create portfolio associations + for company_data in investor_data.portfolio_companies: + # Check if company exists, create if not + existing_company = ( + self.sql_session.query(CompanyTable) + .filter(CompanyTable.name == company_data.name) + .first() + ) + + if not existing_company: + db_company = CompanyTable( + name=company_data.name, + industry=company_data.industry, + location=company_data.location, + founded_year=company_data.founded_year, + website=company_data.website, + ) + self.sql_session.add(db_company) + self.sql_session.flush() + + # Add to investor's portfolio + db_investor.portfolio_companies.append(db_company) + else: + # Add existing company to portfolio if not already there + if existing_company not in db_investor.portfolio_companies: + db_investor.portfolio_companies.append(existing_company) + + # Save team members + for team_member_data in investor_data.team_members: + # Check if team member exists + existing_member = ( + self.sql_session.query(InvestorTeamMember) + .filter(InvestorTeamMember.email == team_member_data.email) + .first() + ) + + if not existing_member: + db_team_member = InvestorTeamMember( + name=team_member_data.name, + role=team_member_data.role, + email=team_member_data.email, + investor_id=db_investor.id, + ) + self.sql_session.add(db_team_member) + + self.sql_session.commit() + print(f"Successfully saved {len(investor_data_list)} investors to database") + + except Exception as e: + self.sql_session.rollback() + print(f"Error saving to SQL database: {e}") + raise + + async def _save_to_vector_db(self, investor_data_list: List[InvestorData]) -> None: + """Save investors to vector database""" + if not self.vector_db_client: + return + + documents = [] + metadatas = [] + ids = [] + + for i, investor_data in enumerate(investor_data_list): + investor = investor_data.investor + sectors = ", ".join([s.name for s in investor_data.sectors]) + companies = ", ".join([c.name for c in investor_data.portfolio_companies]) + + doc_text = f""" + Investor: {investor.name} + Description: {investor.description or "N/A"} + AUM: ${investor.aum:,} + Check Size: ${investor.check_size_lower:,} - ${investor.check_size_upper:,} + Geographic Focus: {investor.geographic_focus} + Stage Focus: {investor.stage_focus.value} + Sectors: {sectors} + Portfolio Companies: {companies} + """.strip() + + documents.append(doc_text) + metadatas.append( + { + "name": investor.name, + "stage_focus": investor.stage_focus.value, + "geographic_focus": investor.geographic_focus, + "aum": investor.aum, + } + ) + ids.append( + f"investor_{i}_{investor.name.replace(' ', '_').replace('/', '_')}" + ) + + if documents: + try: + self.collection.add(documents=documents, metadatas=metadatas, ids=ids) + print( + f"Successfully saved {len(documents)} investors to vector database" + ) + except Exception as e: + print(f"Error saving to vector database: {e}") + + async def process_csv( + self, df: pd.DataFrame, max_concurrent: int = 10 + ) -> List[InvestorData]: + """Process CSV data one row at a time and save to databases""" + results = [] + + # Create semaphore for concurrency control + semaphore = asyncio.Semaphore(max_concurrent) + + async def process_row_with_semaphore(row_data): + row, row_idx = row_data + async with semaphore: + return await self._process_row(row, row_idx) + + # Create row tasks + row_tasks = [] + for idx, row in df.iterrows(): + row_tasks.append((row, idx)) + + # Execute all rows concurrently + row_results = await asyncio.gather( + *[process_row_with_semaphore(row_data) for row_data in row_tasks], + return_exceptions=True, + ) + + # Collect results, filtering out exceptions and None values + for row_result in row_results: + if not isinstance(row_result, Exception) and row_result is not None: + results.append(row_result) + + # Save to databases + if results: + print(f"Successfully processed {len(results)} investors") + await self._save_to_sql(results) + await self._save_to_vector_db(results) + + return results diff --git a/app/services/querying.py b/app/services/querying.py index da1c6a3..e76f94f 100644 --- a/app/services/querying.py +++ b/app/services/querying.py @@ -14,7 +14,7 @@ from sqlalchemy.orm import selectinload # Connect to SQLite prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt") -db = SQLDatabase.from_uri("sqlite:///investors_2.db") +db = SQLDatabase.from_uri("sqlite:///investors.db") system_message = ( prompt_template.format(dialect="SQLite", top_k=5) + "\n Get answers from the Sql database and the vector database"