diff --git a/app/services/__pycache__/llm_parser.cpython-312.pyc b/app/services/__pycache__/llm_parser.cpython-312.pyc index 30635c3..c929764 100644 Binary files a/app/services/__pycache__/llm_parser.cpython-312.pyc and b/app/services/__pycache__/llm_parser.cpython-312.pyc differ diff --git a/app/services/llm_parser.py b/app/services/llm_parser.py index 34d0aa5..b86ed1f 100644 --- a/app/services/llm_parser.py +++ b/app/services/llm_parser.py @@ -1,3 +1,4 @@ +import asyncio import json import os import re @@ -649,10 +650,55 @@ Return the lower and upper bounds in USD.""" print(f"Error processing row {row_idx + 1}: {e}") return None - async def parse_investors(self, df: pd.DataFrame, save_to_db: bool = True): + async def _process_single_investor( + self, idx: int, row: pd.Series, total_rows: int + ) -> Optional[dict]: + """Process a single investor row""" + try: + name = row.get("Name", "").strip() if pd.notna(row.get("Name")) else None + website = ( + row.get("Website", "").strip() if pd.notna(row.get("Website")) else None + ) + profile_json = ( + row.get("Final Investor Profile", "") + if pd.notna(row.get("Final Investor Profile")) + else None + ) + + if not name or not profile_json: + print(f"āš ļø Row {idx + 1}: Skipping - missing name or profile") + return None + + print(f"šŸ“Š Processing {idx + 1}/{total_rows}: {name}") + + # Process the investor profile + investor_data = await self.process_investor_profile( + name, website, profile_json + ) + + if investor_data: + print(f" āœ“ {name} parsed successfully") + return investor_data + else: + print(f" āš ļø {name} failed to process") + return None + + except Exception as e: + print(f"āŒ Error processing row {idx + 1}: {e}") + return None + + async def parse_investors( + self, df: pd.DataFrame, save_to_db: bool = True, batch_size: int = 10 + ): """ Parse investors from DataFrame using manual JSON parsing and LLM for currency conversion. + Processes multiple investors concurrently for better performance. Expected CSV columns: Name, Website, Final Investor Profile, Final Profile sourcing + + Args: + df: DataFrame with investor data + save_to_db: Whether to save to database + batch_size: Number of investors to process concurrently (default: 10) """ results = [] db = None @@ -661,50 +707,31 @@ Return the lower and upper bounds in USD.""" try: total_rows = len(df) - print(f"\nšŸš€ Starting to process {total_rows} investors...") + print( + f"\nšŸš€ Starting to process {total_rows} investors with batch size {batch_size}..." + ) - for idx, row in df.iterrows(): - try: - name = ( - row.get("Name", "").strip() - if pd.notna(row.get("Name")) - else None - ) - website = ( - row.get("Website", "").strip() - if pd.notna(row.get("Website")) - else None - ) - profile_json = ( - row.get("Final Investor Profile", "") - if pd.notna(row.get("Final Investor Profile")) - else None - ) + # Process in batches + for batch_start in range(0, total_rows, batch_size): + batch_end = min(batch_start + batch_size, total_rows) + print( + f"\nšŸ”„ Processing batch {batch_start + 1}-{batch_end} of {total_rows}..." + ) - if not name or not profile_json: - print(f"āš ļø Row {idx + 1}: Skipping - missing name or profile") - continue + # Create tasks for concurrent processing + tasks = [] + for idx in range(batch_start, batch_end): + row = df.iloc[idx] + task = self._process_single_investor(idx, row, total_rows) + tasks.append(task) - print(f"\nšŸ“Š Processing {idx + 1}/{total_rows}: {name}") + # Process batch concurrently + batch_results = await asyncio.gather(*tasks, return_exceptions=True) - # Process the investor profile - investor_data = await self.process_investor_profile( - name, website, profile_json - ) - - if investor_data: + # Filter out None results and exceptions, then save to database + for investor_data in batch_results: + if investor_data and not isinstance(investor_data, Exception): results.append(investor_data) - print(" āœ“ Parsed successfully") - print(f" - HQ: {investor_data.get('headquarters')}") - print( - f" - AUM: ${investor_data.get('aum'):,}" - if investor_data.get("aum") - else " - AUM: Not Available" - ) - print(f" - Funds: {len(investor_data.get('funds', []))}") - print( - f" - Team: {len(investor_data.get('team_members', []))}" - ) # Save to database if save_to_db and db: @@ -713,33 +740,29 @@ Return the lower and upper bounds in USD.""" db, investor_data ) if saved_investor: - db.commit() print( - f" āœ… Saved to database (ID: {saved_investor.id})" + f" āœ… Saved {investor_data['name']} to database (ID: {saved_investor.id})" ) else: - print(" āŒ Failed to save to database") + print( + f" āŒ Failed to save {investor_data['name']} to database" + ) except Exception as e: db.rollback() - print(f" āŒ Database error: {e}") - else: - print(" āš ļø Failed to process profile") + print( + f" āŒ Database error for {investor_data['name']}: {e}" + ) + elif isinstance(investor_data, Exception): + print(f" āŒ Exception occurred: {investor_data}") - # Commit every 10 investors to avoid memory issues - if save_to_db and db and (idx + 1) % 10 == 0: + # Commit batch to database + if save_to_db and db: + try: db.commit() - print(f"\nšŸ’¾ Committed batch at row {idx + 1}") - - except Exception as e: - print(f"āŒ Error processing row {idx + 1}: {e}") - if db: + print(f"šŸ’¾ Committed batch {batch_start + 1}-{batch_end}") + except Exception as e: db.rollback() - continue - - # Final commit - if save_to_db and db: - db.commit() - print("\nāœ… Final commit completed") + print(f"āŒ Failed to commit batch: {e}") except Exception as e: print(f"āŒ Fatal error in parse_investors: {e}") @@ -752,10 +775,60 @@ Return the lower and upper bounds in USD.""" print(f"\nšŸŽ‰ Completed! Processed {len(results)}/{total_rows} investors") return results - async def parse_companies(self, df: pd.DataFrame, save_to_db: bool = True): + async def _process_single_company( + self, idx: int, row: pd.Series, total_rows: int + ) -> Optional[dict]: + """Process a single company row""" + try: + name = row.get("Name", "").strip() if pd.notna(row.get("Name")) else None + website = ( + row.get("Website", "").strip() if pd.notna(row.get("Website")) else None + ) + investor_names = ( + row.get("Investor", "").strip() + if pd.notna(row.get("Investor")) + else None + ) + profile_json = ( + row.get("Final Investor Profile", "") + if pd.notna(row.get("Final Investor Profile")) + else None + ) + + if not name or not profile_json: + print(f"āš ļø Row {idx + 1}: Skipping - missing name or profile") + return None + + print(f"šŸ“Š Processing {idx + 1}/{total_rows}: {name}") + + # Process the company profile + company_data = await self.process_company_profile( + name, website, profile_json, investor_names + ) + + if company_data: + print(f" āœ“ {name} parsed successfully") + return company_data + else: + print(f" āš ļø {name} failed to process") + return None + + except Exception as e: + print(f"āŒ Error processing row {idx + 1}: {e}") + return None + + async def parse_companies( + self, df: pd.DataFrame, save_to_db: bool = True, batch_size: int = 10 + ): """ Parse companies from DataFrame using manual JSON parsing. + Processes multiple companies concurrently for better performance. Expected CSV columns: Name, Website, Investor, Final Investor Profile (actually company profile) + + Args: + df: DataFrame with company data + save_to_db: Whether to save to database + batch_size: Number of companies to process concurrently (default: 10) """ results = [] db = None @@ -764,58 +837,31 @@ Return the lower and upper bounds in USD.""" try: total_rows = len(df) - print(f"\nšŸš€ Starting to process {total_rows} companies...") + print( + f"\nšŸš€ Starting to process {total_rows} companies with batch size {batch_size}..." + ) - for idx, row in df.iterrows(): - try: - name = ( - row.get("Name", "").strip() - if pd.notna(row.get("Name")) - else None - ) - website = ( - row.get("Website", "").strip() - if pd.notna(row.get("Website")) - else None - ) - investor_names = ( - row.get("Investor", "").strip() - if pd.notna(row.get("Investor")) - else None - ) - profile_json = ( - row.get("Final Investor Profile", "") - if pd.notna(row.get("Final Investor Profile")) - else None - ) + # Process in batches + for batch_start in range(0, total_rows, batch_size): + batch_end = min(batch_start + batch_size, total_rows) + print( + f"\nšŸ”„ Processing batch {batch_start + 1}-{batch_end} of {total_rows}..." + ) - if not name or not profile_json: - print(f"āš ļø Row {idx + 1}: Skipping - missing name or profile") - continue + # Create tasks for concurrent processing + tasks = [] + for idx in range(batch_start, batch_end): + row = df.iloc[idx] + task = self._process_single_company(idx, row, total_rows) + tasks.append(task) - print(f"\nšŸ“Š Processing {idx + 1}/{total_rows}: {name}") + # Process batch concurrently + batch_results = await asyncio.gather(*tasks, return_exceptions=True) - # Process the company profile - company_data = await self.process_company_profile( - name, website, profile_json, investor_names - ) - - if company_data: + # Filter out None results and exceptions, then save to database + for company_data in batch_results: + if company_data and not isinstance(company_data, Exception): results.append(company_data) - print(" āœ“ Parsed successfully") - print(f" - Location: {company_data.get('location')}") - print(f" - Industry: {company_data.get('industry')}") - print( - f" - Founded: {company_data.get('founded_year')}" - if company_data.get("founded_year") - else " - Founded: Unknown" - ) - print( - f" - Executives: {len(company_data.get('key_executives', []))}" - ) - print( - f" - Investors: {len(company_data.get('investor_names', []))}" - ) # Save to database if save_to_db and db: @@ -824,33 +870,29 @@ Return the lower and upper bounds in USD.""" db, company_data ) if saved_company: - db.commit() print( - f" āœ… Saved to database (ID: {saved_company.id})" + f" āœ… Saved {company_data['name']} to database (ID: {saved_company.id})" ) else: - print(" āŒ Failed to save to database") + print( + f" āŒ Failed to save {company_data['name']} to database" + ) except Exception as e: db.rollback() - print(f" āŒ Database error: {e}") - else: - print(" āš ļø Failed to process profile") + print( + f" āŒ Database error for {company_data['name']}: {e}" + ) + elif isinstance(company_data, Exception): + print(f" āŒ Exception occurred: {company_data}") - # Commit every 10 companies to avoid memory issues - if save_to_db and db and (idx + 1) % 10 == 0: + # Commit batch to database + if save_to_db and db: + try: db.commit() - print(f"\nšŸ’¾ Committed batch at row {idx + 1}") - - except Exception as e: - print(f"āŒ Error processing row {idx + 1}: {e}") - if db: + print(f"šŸ’¾ Committed batch {batch_start + 1}-{batch_end}") + except Exception as e: db.rollback() - continue - - # Final commit - if save_to_db and db: - db.commit() - print("\nāœ… Final commit completed") + print(f"āŒ Failed to commit batch: {e}") except Exception as e: print(f"āŒ Fatal error in parse_companies: {e}") diff --git a/investors.db b/investors.db index e6f09a3..ea2d652 100644 Binary files a/investors.db and b/investors.db differ