Add AI rules support for document processing and matching; enhance tax analysis with flag_for_review and auto_approve fields

This commit is contained in:
bolade
2025-10-08 00:12:09 +01:00
parent f582110674
commit 2e020437a8
5 changed files with 394 additions and 49 deletions
+36 -3
View File
@@ -378,8 +378,11 @@ async def process_document(
This endpoint uses AI to extract structured data from receipt images, This endpoint uses AI to extract structured data from receipt images,
including vendor, amount, date, and category information. including vendor, amount, date, and category information.
Optionally accepts user_location to guide tax calculations and depreciation Optionally accepts:
based on the user's location (format: "State/Province, Country" e.g., "Ontario, Canada"). - user_location: Guide tax calculations and depreciation based on location
(format: "State/Province, Country" e.g., "Ontario, Canada")
- ai_rules: Custom categorization rules to override default logic
(e.g., [{"condition": "vendor is Starbucks", "action": "Food"}])
""" """
try: try:
# Get file info from database # Get file info from database
@@ -387,11 +390,20 @@ async def process_document(
if not db_uploaded_file: if not db_uploaded_file:
raise HTTPException(status_code=404, detail=f"File {file_id} not found") raise HTTPException(status_code=404, detail=f"File {file_id} not found")
# Convert ai_rules from Pydantic models to dictionaries if provided
ai_rules_list = None
if request.ai_rules:
ai_rules_list = [
{"condition": rule.condition, "action": rule.action}
for rule in request.ai_rules
]
# Process the file using the stored file path # Process the file using the stored file path
receipt_data = await document_processor.process_file( receipt_data = await document_processor.process_file(
db_uploaded_file.file_path, db_uploaded_file.file_path,
db_uploaded_file.file_type, db_uploaded_file.file_type,
user_location=request.user_location, user_location=request.user_location,
ai_rules=ai_rules_list,
) )
# Parse date for database storage # Parse date for database storage
@@ -570,9 +582,21 @@ async def match_specific_receipts(request: MatchSpecificRequest, db: db_dependen
else: else:
logger.info(f"Using default/provided user_location: {user_location}") logger.info(f"Using default/provided user_location: {user_location}")
# Convert ai_rules from Pydantic models to dictionaries if provided
ai_rules_list = None
if request.ai_rules:
ai_rules_list = [
{"condition": rule.condition, "action": rule.action}
for rule in request.ai_rules
]
logger.info(f"Applying {len(ai_rules_list)} custom AI rules to matching")
try: try:
matching_results = matching_engine.process_matching( matching_results = matching_engine.process_matching(
receipts, transactions, user_location=user_location receipts,
transactions,
user_location=user_location,
ai_rules=ai_rules_list,
) )
logger.info(f"Matching completed, got {len(matching_results)} results") logger.info(f"Matching completed, got {len(matching_results)} results")
@@ -584,6 +608,13 @@ async def match_specific_receipts(request: MatchSpecificRequest, db: db_dependen
# if result.tax_analysis and "final_tax_amount" in result.tax_analysis: # if result.tax_analysis and "final_tax_amount" in result.tax_analysis:
# final_tax = result.tax_analysis["final_tax_amount"] # final_tax = result.tax_analysis["final_tax_amount"]
# Extract flag_for_review and auto_approve from tax_analysis if available
flag_for_review = None
auto_approve = None
if result.tax_analysis:
flag_for_review = result.tax_analysis.get("flag_for_review")
auto_approve = result.tax_analysis.get("auto_approve")
match_response = MatchResponse( match_response = MatchResponse(
receipt_id=result.receipt.id, receipt_id=result.receipt.id,
transaction_id=result.transaction.id transaction_id=result.transaction.id
@@ -603,6 +634,8 @@ async def match_specific_receipts(request: MatchSpecificRequest, db: db_dependen
if result.transaction if result.transaction
else 0.0, else 0.0,
tax_analysis=result.tax_analysis, tax_analysis=result.tax_analysis,
flag_for_review=flag_for_review,
auto_approve=auto_approve,
) )
match_responses.append(match_response) match_responses.append(match_response)
+9
View File
@@ -132,6 +132,8 @@ class MatchResponse(BaseModel):
transaction_vendor: str transaction_vendor: str
transaction_amount: float transaction_amount: float
tax_analysis: Optional[dict] = None tax_analysis: Optional[dict] = None
flag_for_review: Optional[bool] = None
auto_approve: Optional[bool] = None
class MatchingResponse(BaseModel): class MatchingResponse(BaseModel):
@@ -160,11 +162,17 @@ class DocumentUploadResponse(BaseModel):
status: str status: str
class AIRules(BaseModel):
condition: str
action: str
class DocumentProcessRequest(BaseModel): class DocumentProcessRequest(BaseModel):
file_id: Optional[str] = None file_id: Optional[str] = None
user_location: Optional[str] = ( user_location: Optional[str] = (
None # Format: "State/Province, Country" (e.g., "Ontario, Canada") None # Format: "State/Province, Country" (e.g., "Ontario, Canada")
) )
ai_rules: Optional[List[AIRules]] = None
class DocumentProcessResponse(BaseModel): class DocumentProcessResponse(BaseModel):
@@ -292,3 +300,4 @@ class MatchSpecificRequest(BaseModel):
categorization_id: str categorization_id: str
user_location: Optional[str] = "Canada" # Kept for backward compatibility user_location: Optional[str] = "Canada" # Kept for backward compatibility
user_tax_info: Optional[UserTaxInfo] = None user_tax_info: Optional[UserTaxInfo] = None
ai_rules: Optional[List[AIRules]] = None
+273
View File
@@ -0,0 +1,273 @@
import json
import logging
from typing import Dict, List, Optional
import groq
from config import settings
from schemas import Match
logger = logging.getLogger(__name__)
class AIRulesMatcher:
"""
AI-powered rules engine for post-matching evaluation.
Uses LLM to intelligently apply custom rules and determine if matches should be:
- Flagged for manual review (flag_for_review=True)
- Auto-approved (auto_approve=True)
"""
def __init__(self):
self.client = groq.Groq(api_key=settings.GROQ_API_KEY)
self.model = "llama-3.1-8b-instant"
def apply_rules_to_matches(
self, matches: List[Match], ai_rules: Optional[List[Dict]] = None
) -> List[Match]:
"""
Apply AI rules to all matches and add flag_for_review and auto_approve fields.
Args:
matches: List of Match objects from the matching engine
ai_rules: Optional list of custom rules (format: [{"condition": str, "action": str}])
Returns:
Enhanced matches with tax_analysis containing flag_for_review and auto_approve
"""
if not matches:
return matches
logger.info(
f"Applying AI rules to {len(matches)} matches with {len(ai_rules) if ai_rules else 0} custom rules"
)
# Built-in rule: currency mismatch should always flag for review
builtin_rules = [
{
"condition": "receipt currency differs from transaction currency",
"action": "flag_for_review",
}
]
# Combine built-in rules with user-provided rules
all_rules = builtin_rules + (ai_rules if ai_rules else [])
# Process each match
for match in matches:
try:
rule_evaluation = self._evaluate_rules_for_match(match, all_rules)
# Initialize or update tax_analysis
if match.tax_analysis is None:
match.tax_analysis = {}
# Add rule evaluation results
match.tax_analysis["flag_for_review"] = rule_evaluation[
"flag_for_review"
]
match.tax_analysis["auto_approve"] = rule_evaluation["auto_approve"]
match.tax_analysis["rules_applied"] = rule_evaluation["rules_applied"]
match.tax_analysis["rule_reasons"] = rule_evaluation["reasons"]
# Update match reason with rule information
if rule_evaluation["flag_for_review"]:
match.match_reason += " | 🚩 FLAGGED FOR REVIEW"
if rule_evaluation["auto_approve"]:
match.match_reason += " | ✅ AUTO-APPROVED"
logger.info(
f"Match {match.receipt.id}{match.transaction.id}: "
f"flag_for_review={rule_evaluation['flag_for_review']}, "
f"auto_approve={rule_evaluation['auto_approve']}"
)
except Exception as e:
logger.error(f"Error applying rules to match: {str(e)}")
# Fail safe: flag for review if rule processing fails
if match.tax_analysis is None:
match.tax_analysis = {}
match.tax_analysis["flag_for_review"] = True
match.tax_analysis["auto_approve"] = False
match.tax_analysis["rule_reasons"] = [
f"Rule evaluation error: {str(e)}"
]
return matches
def _evaluate_rules_for_match(
self, match: Match, rules: List[Dict]
) -> Dict[str, any]:
"""
Use LLM to evaluate all rules for a single match.
Returns:
{
"flag_for_review": bool,
"auto_approve": bool,
"rules_applied": List[str],
"reasons": List[str]
}
"""
# Build context about the match
match_context = self._build_match_context(match)
# Build rules context
rules_context = self._build_rules_context(rules)
# Create prompt for LLM
prompt = f"""You are a financial matching rules engine. Analyze the following receipt-to-transaction match and apply the specified rules.
MATCH DETAILS:
{match_context}
RULES TO APPLY:
{rules_context}
INSTRUCTIONS:
1. Evaluate each rule's condition against the match details
2. If a rule's condition is TRUE, apply the action:
- If action is "flag_for_review" or "review" → set flag_for_review = true
- If action is "auto_approve" or "approve" → set auto_approve = true
- For other actions, determine if they imply review or approval
3. If BOTH flag_for_review and auto_approve are triggered, flag_for_review takes priority
4. If NO rules match, set both to false (default behavior)
IMPORTANT BUILT-IN RULE:
- If receipt currency differs from transaction currency → ALWAYS set flag_for_review = true
Return ONLY a valid JSON object with this exact format:
{{
"flag_for_review": boolean,
"auto_approve": boolean,
"rules_applied": ["list of rule conditions that matched"],
"reasons": ["list of reasons for the decisions"]
}}
"""
try:
# Call LLM
response = self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": "You are a financial rules evaluation assistant. You analyze transaction matches and apply business rules. Always respond with valid JSON only.",
},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=500,
)
result_text = response.choices[0].message.content.strip()
# Parse JSON response
result = self._parse_llm_response(result_text)
# Validate and enforce constraints
if result["flag_for_review"] and result["auto_approve"]:
logger.warning(
"Both flag_for_review and auto_approve were true, prioritizing flag_for_review"
)
result["auto_approve"] = False
result["reasons"].append(
"Conflicting rules: prioritized manual review over auto-approval"
)
return result
except Exception as e:
logger.error(f"LLM evaluation failed: {str(e)}")
# Fail safe: flag for review
return {
"flag_for_review": True,
"auto_approve": False,
"rules_applied": [],
"reasons": [f"Error evaluating rules: {str(e)}"],
}
def _build_match_context(self, match: Match) -> str:
"""Build a text description of the match for the LLM"""
receipt = match.receipt
transaction = match.transaction
context = f"""Receipt Information:
- ID: {receipt.id}
- Vendor: {receipt.vendor}
- Amount: ${receipt.amount:.2f}
- Tax: ${receipt.tax:.2f}
- Category: {receipt.category}
- Description: {receipt.description}
- Date: {receipt.receipt_date}
- Currency: {receipt.currency}
Transaction Information:
- ID: {transaction.id}
- Vendor: {transaction.vendor}
- Amount: ${transaction.amount:.2f}
- Date: {transaction.transaction_date}
- Notes: {transaction.notes}
- Currency: {transaction.currency}
Match Quality:
- Confidence Score: {match.confidence_score:.2%}
- Match Reason: {match.match_reason}
"""
# Add tax analysis if available
if match.tax_analysis:
context += f"\nTax Analysis:\n{json.dumps(match.tax_analysis, indent=2)}"
return context
def _build_rules_context(self, rules: List[Dict]) -> str:
"""Build a formatted list of rules for the LLM"""
if not rules:
return "No custom rules provided. Apply default evaluation."
rules_text = ""
for idx, rule in enumerate(rules, 1):
condition = rule.get("condition", "")
action = rule.get("action", "")
rules_text += f"{idx}. IF {condition} → THEN {action}\n"
return rules_text
def _parse_llm_response(self, response_text: str) -> Dict:
"""Parse and validate LLM JSON response"""
try:
# Remove markdown code blocks if present
if "```json" in response_text:
response_text = response_text.split("```json")[1].split("```")[0]
elif "```" in response_text:
response_text = response_text.split("```")[1].split("```")[0]
# Parse JSON
result = json.loads(response_text.strip())
# Validate required fields
if "flag_for_review" not in result:
result["flag_for_review"] = False
if "auto_approve" not in result:
result["auto_approve"] = False
if "rules_applied" not in result:
result["rules_applied"] = []
if "reasons" not in result:
result["reasons"] = []
# Ensure boolean types
result["flag_for_review"] = bool(result["flag_for_review"])
result["auto_approve"] = bool(result["auto_approve"])
return result
except json.JSONDecodeError as e:
logger.error(f"Failed to parse LLM response as JSON: {str(e)}")
logger.error(f"Response text: {response_text}")
# Return safe defaults
return {
"flag_for_review": True, # Fail safe to manual review
"auto_approve": False,
"rules_applied": [],
"reasons": ["Failed to parse LLM response"],
}
+37 -9
View File
@@ -18,7 +18,11 @@ class DocumentProcessor:
self.model = "meta-llama/llama-4-scout-17b-16e-instruct" # Vision model self.model = "meta-llama/llama-4-scout-17b-16e-instruct" # Vision model
async def process_file( async def process_file(
self, file_path: str, file_type: str, user_location: str = None self,
file_path: str,
file_type: str,
user_location: str = None,
ai_rules: list = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Process uploaded file and extract receipt data """Process uploaded file and extract receipt data
@@ -26,25 +30,27 @@ class DocumentProcessor:
file_path: Path to the file to process file_path: Path to the file to process
file_type: Type of file (jpg, pdf, etc.) file_type: Type of file (jpg, pdf, etc.)
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
ai_rules: List of AI rules for categorization (e.g., [{"condition": "vendor is Starbucks", "action": "Food"}])
""" """
try: try:
if file_type.lower() in ["jpg", "jpeg", "png", "gif", "bmp"]: if file_type.lower() in ["jpg", "jpeg", "png", "gif", "bmp"]:
return await self._process_image(file_path, user_location) return await self._process_image(file_path, user_location, ai_rules)
elif file_type.lower() == "pdf": elif file_type.lower() == "pdf":
return await self._process_pdf(file_path, user_location) return await self._process_pdf(file_path, user_location, ai_rules)
else: else:
raise ValueError(f"Unsupported file type: {file_type}") raise ValueError(f"Unsupported file type: {file_type}")
except Exception as e: except Exception as e:
return {"error": str(e)} return {"error": str(e)}
async def _process_image( async def _process_image(
self, image_path: str, user_location: str = None self, image_path: str, user_location: str = None, ai_rules: list = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Extract data from image using Groq vision """Extract data from image using Groq vision
Args: Args:
image_path: Path to the image file image_path: Path to the image file
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
ai_rules: List of AI rules for categorization
""" """
try: try:
# Encode image to base64 # Encode image to base64
@@ -62,6 +68,16 @@ class DocumentProcessor:
- Apply depreciation rules based on the user's location. - Apply depreciation rules based on the user's location.
""" """
# Build AI rules context for categorization
ai_rules_context = ""
if ai_rules and len(ai_rules) > 0:
ai_rules_context = "\n CATEGORIZATION RULES (IMPORTANT - Apply these first):"
for idx, rule in enumerate(ai_rules, 1):
condition = rule.get("condition", "")
action = rule.get("action", "")
ai_rules_context += f"\n {idx}. If {condition} → set category to '{action}'"
ai_rules_context += "\n - Apply these custom rules before using default categorization logic\n - If multiple rules match, use the first matching rule\n - If no rules match, use default categorization based on vendor type"
# Create Groq vision prompt # Create Groq vision prompt
prompt = f""" prompt = f"""
Analyze this receipt image and extract the following information in JSON format: Analyze this receipt image and extract the following information in JSON format:
@@ -89,9 +105,9 @@ class DocumentProcessor:
- Total amount should be the final total including tax - Total amount should be the final total including tax
- Tax amount is separate tax line if available (if not clearly shown, calculate based on location) - Tax amount is separate tax line if available (if not clearly shown, calculate based on location)
- Date should be the date on the receipt - Date should be the date on the receipt
- Categorize based on vendor type (Starbucks=Food, Shell=Transport, etc.)
- Confidence score 0-1 based on how clear the receipt is - Confidence score 0-1 based on how clear the receipt is
- Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD") - Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD")
{ai_rules_context}
{user_location_context} {user_location_context}
LOCATION & TAX RULES: LOCATION & TAX RULES:
- Extract location from receipt (look for store address, province/state, country) - Extract location from receipt (look for store address, province/state, country)
@@ -166,18 +182,19 @@ class DocumentProcessor:
return base64.b64encode(image_file.read()).decode("utf-8") return base64.b64encode(image_file.read()).decode("utf-8")
async def _process_pdf( async def _process_pdf(
self, pdf_path: str, user_location: str = None self, pdf_path: str, user_location: str = None, ai_rules: list = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Extract data from PDF by converting to image first """Extract data from PDF by converting to image first
Args: Args:
pdf_path: Path to the PDF file pdf_path: Path to the PDF file
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
ai_rules: List of AI rules for categorization
""" """
try: try:
# For now, extract text from PDF and process as text # For now, extract text from PDF and process as text
text_content = self._extract_text_from_pdf(pdf_path) text_content = self._extract_text_from_pdf(pdf_path)
return self._process_text_content(text_content, user_location) return self._process_text_content(text_content, user_location, ai_rules)
except Exception as e: except Exception as e:
return {"error": f"PDF processing error: {str(e)}"} return {"error": f"PDF processing error: {str(e)}"}
@@ -195,13 +212,14 @@ class DocumentProcessor:
return "" return ""
def _process_text_content( def _process_text_content(
self, text_content: str, user_location: str = None self, text_content: str, user_location: str = None, ai_rules: list = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Process text content using Groq (fallback for PDFs) """Process text content using Groq (fallback for PDFs)
Args: Args:
text_content: Extracted text from PDF text_content: Extracted text from PDF
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
ai_rules: List of AI rules for categorization
""" """
try: try:
# Build user location context # Build user location context
@@ -216,6 +234,16 @@ class DocumentProcessor:
- Apply depreciation rules based on the user's location. - Apply depreciation rules based on the user's location.
""" """
# Build AI rules context for categorization
ai_rules_context = ""
if ai_rules and len(ai_rules) > 0:
ai_rules_context = "\n CATEGORIZATION RULES (IMPORTANT - Apply these first):"
for idx, rule in enumerate(ai_rules, 1):
condition = rule.get("condition", "")
action = rule.get("action", "")
ai_rules_context += f"\n {idx}. If {condition} → set category to '{action}'"
ai_rules_context += "\n - Apply these custom rules before using default categorization logic\n - If multiple rules match, use the first matching rule\n - If no rules match, use default categorization based on vendor type"
prompt = f""" prompt = f"""
Analyze this receipt text and extract the following information in JSON format: Analyze this receipt text and extract the following information in JSON format:
@@ -247,9 +275,9 @@ class DocumentProcessor:
- Total amount should be the final total including tax - Total amount should be the final total including tax
- Tax amount is separate tax line if available (if not clearly shown, calculate based on location) - Tax amount is separate tax line if available (if not clearly shown, calculate based on location)
- Date should be the date on the receipt - Date should be the date on the receipt
- Categorize based on vendor type
- Confidence score 0-1 based on clarity - Confidence score 0-1 based on clarity
- Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD") - Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD")
{ai_rules_context}
{user_location_context} {user_location_context}
LOCATION & TAX RULES: LOCATION & TAX RULES:
- Extract location from receipt (look for store address, province/state, country) - Extract location from receipt (look for store address, province/state, country)
+39 -37
View File
@@ -1,8 +1,9 @@
from typing import Any, Dict, List from typing import Any, Dict, List, Optional
from schemas import Match, Receipt, Transaction from schemas import Match, Receipt, Transaction
from services.ai_matcher import AIMatcher from services.ai_matcher import AIMatcher
from services.ai_rules import AIRulesEngine from services.ai_rules import AIRulesEngine
from services.ai_rules_matcher import AIRulesMatcher
from services.feedback_logger import FeedbackLogger from services.feedback_logger import FeedbackLogger
from services.llm_tax_analyzer import LLMTaxAnalyzer from services.llm_tax_analyzer import LLMTaxAnalyzer
from services.manual_tax_calculator import ManualTaxCalculator from services.manual_tax_calculator import ManualTaxCalculator
@@ -15,6 +16,7 @@ class MatchingEngine:
self.feedback_logger = FeedbackLogger() self.feedback_logger = FeedbackLogger()
self.llm_tax_analyzer = LLMTaxAnalyzer() self.llm_tax_analyzer = LLMTaxAnalyzer()
self.manual_tax_calculator = ManualTaxCalculator() self.manual_tax_calculator = ManualTaxCalculator()
self.ai_rules_matcher = AIRulesMatcher()
self.use_manual_tax_calculator = use_manual_tax_calculator self.use_manual_tax_calculator = use_manual_tax_calculator
def process_matching( def process_matching(
@@ -22,51 +24,51 @@ class MatchingEngine:
receipts: List[Receipt], receipts: List[Receipt],
transactions: List[Transaction], transactions: List[Transaction],
user_location: str = "ON", user_location: str = "ON",
ai_rules: Optional[List[Dict]] = None,
) -> List[Match]: ) -> List[Match]:
# Get AI matches # Get AI matches
ai_matches = self.ai_matcher.match_receipts_to_transactions( ai_matches = self.ai_matcher.match_receipts_to_transactions(
receipts, transactions receipts, transactions
) )
# Apply traditional rules first (lightweight, no API calls) # # Apply traditional rules first (lightweight, no API calls)
for match in ai_matches: # for match in ai_matches:
rule_results = self.rules_engine.apply_rules( # rule_results = self.rules_engine.apply_rules(
match.receipt, match.transaction # match.receipt, match.transaction
) # )
# Apply confidence boost from traditional rules # # Apply confidence boost from traditional rules
if rule_results["confidence_boost"] > 0: # if rule_results["confidence_boost"] > 0:
match.confidence_score = min( # match.confidence_score = min(
1.0, match.confidence_score + rule_results["confidence_boost"] # 1.0, match.confidence_score + rule_results["confidence_boost"]
)
# Auto-approve if rules say so
if rule_results["auto_approve"]:
match.confidence_score = 1.0
match.match_reason += " (Auto-approved by rules)"
# Apply tax analysis - use manual calculator or LLM based on configuration
if self.use_manual_tax_calculator:
# Use deterministic rule-based calculator
enhanced_matches = self._apply_manual_tax_analysis(
ai_matches, user_location
)
# else:
# # Use LLM-based tax analysis in a SINGLE batch call
# try:
# enhanced_matches = (
# self.llm_tax_analyzer.analyze_and_apply_tax_rules_batch(
# ai_matches, user_location
# )
# ) # )
# except Exception as e:
# # If batch LLM analysis fails, log it and continue with matches as-is
# import logging
# logging.error(f"Batch LLM tax analysis failed: {str(e)}") # # Auto-approve if rules say so
# for match in ai_matches: # if rule_results["auto_approve"]:
# match.match_reason += " (Note: Advanced tax analysis unavailable)" # match.confidence_score = 1.0
# enhanced_matches = ai_matches # match.match_reason += " (Auto-approved by rules)"
# # Apply tax analysis - use manual calculator or LLM based on configuration
# if self.use_manual_tax_calculator:
# # Use deterministic rule-based calculator
# enhanced_matches = self._apply_manual_tax_analysis(
# ai_matches, user_location
# )
# else:
# # No tax analysis, just use the matches as-is
# enhanced_matches = ai_matches
# Apply AI rules for post-matching evaluation
# This adds flag_for_review and auto_approve fields based on custom rules
if ai_rules:
enhanced_matches = self.ai_rules_matcher.apply_rules_to_matches(
ai_matches, ai_rules
)
else:
# Even without custom rules, apply built-in rules (e.g., currency mismatch)
enhanced_matches = self.ai_rules_matcher.apply_rules_to_matches(
ai_matches, None
)
return enhanced_matches return enhanced_matches