Enhance batch processing in LLMTaxAnalyzer with fallback to individual analysis on failure

This commit is contained in:
bolade
2025-10-05 20:03:46 +01:00
parent ae200bd30f
commit 7c412bcf9e
+108 -4
View File
@@ -59,26 +59,59 @@ class LLMTaxAnalyzer:
"""
Batch process all matches in a SINGLE LLM call to reduce costs.
Analyzes all receipt-transaction pairs together and applies tax rules.
Falls back to individual processing if batch fails.
"""
if not matches:
return matches
logger.info(f"Starting batch tax analysis for {len(matches)} matches")
# Build batch context for all matches
batch_context = self._build_batch_analysis_context(matches, user_location)
try:
batch_context = self._build_batch_analysis_context(matches, user_location)
except Exception as e:
logger.error(f"Error building batch context: {str(e)}")
# If we can't even build the context, return matches as-is
for match in matches:
match.match_reason += " (Batch analysis setup failed)"
return matches
# Get LLM analysis for ALL matches at once
llm_batch_analysis = self._get_llm_tax_analysis_batch(
batch_context, len(matches)
)
# Check if we got any analysis back
if not llm_batch_analysis:
logger.warning("Batch LLM analysis returned empty results")
# Fallback: Try processing each match individually if batch size is small
if (
len(matches) <= 5
): # Only fallback for small batches to avoid excessive API calls
logger.info(
f"Attempting individual processing fallback for {len(matches)} matches"
)
return self._process_matches_individually(matches, user_location)
else:
logger.warning(
f"Batch too large ({len(matches)} matches) for individual fallback - returning matches without enhanced tax analysis"
)
for match in matches:
match.match_reason += " (Batch tax analysis unavailable)"
return matches
logger.info(f"Received batch analysis for {len(llm_batch_analysis)} matches")
# Apply results to each match
enhanced_matches = []
for i, match in enumerate(matches):
try:
# Get the analysis for this specific match from the batch results
match_analysis = llm_batch_analysis.get(f"match_{i}", {})
match_key = f"match_{i}"
match_analysis = llm_batch_analysis.get(match_key, {})
if match_analysis:
if match_analysis and isinstance(match_analysis, dict):
# Apply the tax analysis to this match
enhanced_match = self._apply_tax_analysis_to_match(
match, match_analysis
@@ -86,6 +119,9 @@ class LLMTaxAnalyzer:
enhanced_matches.append(enhanced_match)
else:
# No analysis available for this match, use as-is
logger.warning(
f"No analysis found for match {i} (key: {match_key})"
)
match.match_reason += " (Tax analysis incomplete)"
enhanced_matches.append(match)
except Exception as e:
@@ -93,6 +129,38 @@ class LLMTaxAnalyzer:
match.match_reason += " (Tax analysis error)"
enhanced_matches.append(match)
logger.info(
f"Completed batch tax analysis, enhanced {len(enhanced_matches)} matches"
)
return enhanced_matches
def _process_matches_individually(self, matches: list, user_location: str) -> list:
    """
    Fallback path: analyse each match on its own via the legacy single-match method.

    For every match, the receipt and transaction are handed to
    ``analyze_and_apply_tax_rules`` together with ``user_location``, and the
    resulting analysis is applied with ``_apply_tax_analysis_to_match``.
    If anything raises for a given match, the error is logged, a failure
    note is appended to that match's ``match_reason``, and the match itself
    is kept in the output.

    Returns a list with one entry per input match, in input order.
    """
    logger.info(f"Processing {len(matches)} matches individually as fallback")

    processed = []
    for i, pair in enumerate(matches):
        try:
            # Legacy single-match analysis, applied straight onto the match.
            processed.append(
                self._apply_tax_analysis_to_match(
                    pair,
                    self.analyze_and_apply_tax_rules(
                        pair.receipt, pair.transaction, user_location
                    ),
                )
            )
            logger.info(
                f"Successfully processed match {i + 1}/{len(matches)} individually"
            )
        except Exception as e:
            # Best effort: one failing match must not abort the remaining ones.
            logger.error(f"Error in individual processing for match {i}: {str(e)}")
            pair.match_reason += " (Individual tax analysis failed)"
            processed.append(pair)

    return processed
def analyze_and_apply_tax_rules(
@@ -757,10 +825,24 @@ Return your response as a SINGLE JSON object in this format:
max_tokens=8000, # Higher limit for batch processing
)
content = response.choices[0].message.content.strip()
content = response.choices[0].message.content
# Validate that we got content
if not content:
logger.error("LLM returned empty response")
return {}
content = content.strip()
# Check if content is empty after stripping
if not content:
logger.error("LLM returned whitespace-only response")
return {}
logger.info(
f"LLM batch tax analysis received: {len(content)} characters for {num_matches} matches"
)
logger.debug(f"Raw LLM response: {content[:500]}...") # Log first 500 chars
# Parse the JSON response
json_str = content
@@ -769,11 +851,33 @@ Return your response as a SINGLE JSON object in this format:
elif "```" in content:
json_str = content.split("```")[1].split("```")[0].strip()
# Validate JSON string is not empty
if not json_str:
logger.error("Extracted JSON string is empty")
logger.error(f"Original content was: {content}")
return {}
batch_analysis = json.loads(json_str)
# Validate we got a dictionary back
if not isinstance(batch_analysis, dict):
logger.error(f"LLM returned non-dict type: {type(batch_analysis)}")
return {}
logger.info(
f"Successfully parsed batch analysis with {len(batch_analysis)} matches"
)
return batch_analysis
except json.JSONDecodeError as e:
logger.error(f"JSON decode error in batch LLM tax analysis: {str(e)}")
logger.error(
f"Failed to parse: {json_str[:500] if 'json_str' in locals() else 'N/A'}"
)
return {}
except Exception as e:
logger.error(f"Error getting batch LLM tax analysis: {str(e)}")
logger.error(f"Exception type: {type(e).__name__}")
# Return empty dict so each match can handle fallback individually
return {}