diff --git a/app/main.py b/app/main.py index 92d2199..eb9cbdc 100644 --- a/app/main.py +++ b/app/main.py @@ -378,8 +378,11 @@ async def process_document( This endpoint uses AI to extract structured data from receipt images, including vendor, amount, date, and category information. - Optionally accepts user_location to guide tax calculations and depreciation - based on the user's location (format: "State/Province, Country" e.g., "Ontario, Canada"). + Optionally accepts: + - user_location: Guide tax calculations and depreciation based on location + (format: "State/Province, Country" e.g., "Ontario, Canada") + - ai_rules: Custom categorization rules to override default logic + (e.g., [{"condition": "vendor is Starbucks", "action": "Food"}]) """ try: # Get file info from database @@ -387,11 +390,20 @@ async def process_document( if not db_uploaded_file: raise HTTPException(status_code=404, detail=f"File {file_id} not found") + # Convert ai_rules from Pydantic models to dictionaries if provided + ai_rules_list = None + if request.ai_rules: + ai_rules_list = [ + {"condition": rule.condition, "action": rule.action} + for rule in request.ai_rules + ] + # Process the file using the stored file path receipt_data = await document_processor.process_file( db_uploaded_file.file_path, db_uploaded_file.file_type, user_location=request.user_location, + ai_rules=ai_rules_list, ) # Parse date for database storage @@ -570,9 +582,21 @@ async def match_specific_receipts(request: MatchSpecificRequest, db: db_dependen else: logger.info(f"Using default/provided user_location: {user_location}") + # Convert ai_rules from Pydantic models to dictionaries if provided + ai_rules_list = None + if request.ai_rules: + ai_rules_list = [ + {"condition": rule.condition, "action": rule.action} + for rule in request.ai_rules + ] + logger.info(f"Applying {len(ai_rules_list)} custom AI rules to matching") + try: matching_results = matching_engine.process_matching( - receipts, transactions, 
user_location=user_location + receipts, + transactions, + user_location=user_location, + ai_rules=ai_rules_list, ) logger.info(f"Matching completed, got {len(matching_results)} results") @@ -584,6 +608,13 @@ async def match_specific_receipts(request: MatchSpecificRequest, db: db_dependen # if result.tax_analysis and "final_tax_amount" in result.tax_analysis: # final_tax = result.tax_analysis["final_tax_amount"] + # Extract flag_for_review and auto_approve from tax_analysis if available + flag_for_review = None + auto_approve = None + if result.tax_analysis: + flag_for_review = result.tax_analysis.get("flag_for_review") + auto_approve = result.tax_analysis.get("auto_approve") + match_response = MatchResponse( receipt_id=result.receipt.id, transaction_id=result.transaction.id @@ -603,6 +634,8 @@ async def match_specific_receipts(request: MatchSpecificRequest, db: db_dependen if result.transaction else 0.0, tax_analysis=result.tax_analysis, + flag_for_review=flag_for_review, + auto_approve=auto_approve, ) match_responses.append(match_response) diff --git a/app/schemas.py b/app/schemas.py index 43003e8..488ef4b 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -132,6 +132,8 @@ class MatchResponse(BaseModel): transaction_vendor: str transaction_amount: float tax_analysis: Optional[dict] = None + flag_for_review: Optional[bool] = None + auto_approve: Optional[bool] = None class MatchingResponse(BaseModel): @@ -160,11 +162,17 @@ class DocumentUploadResponse(BaseModel): status: str +class AIRules(BaseModel): + condition: str + action: str + + class DocumentProcessRequest(BaseModel): file_id: Optional[str] = None user_location: Optional[str] = ( None # Format: "State/Province, Country" (e.g., "Ontario, Canada") ) + ai_rules: Optional[List[AIRules]] = None class DocumentProcessResponse(BaseModel): @@ -292,3 +300,4 @@ class MatchSpecificRequest(BaseModel): categorization_id: str user_location: Optional[str] = "Canada" # Kept for backward compatibility user_tax_info: 
class AIRulesMatcher:
    """
    AI-powered rules engine for post-matching evaluation.

    Uses an LLM to apply the built-in rules plus any caller-supplied custom
    rules to each match and decide whether the match should be:
    - Flagged for manual review (flag_for_review=True)
    - Auto-approved (auto_approve=True)

    Every failure mode (LLM call error, unparsable response, bad match data)
    resolves to "flag for review, do not auto-approve", so a broken
    evaluation can never silently approve a match.
    """

    DEFAULT_MODEL = "llama-3.1-8b-instant"

    # Built-in rule: currency mismatch should always flag for review.
    BUILTIN_RULES = (
        {
            "condition": "receipt currency differs from transaction currency",
            "action": "flag_for_review",
        },
    )

    _TRUE_STRINGS = frozenset({"true", "yes", "1"})

    def __init__(self, client=None, model: Optional[str] = None):
        """Create the matcher.

        Args:
            client: Optional pre-built chat client exposing
                ``chat.completions.create``. When omitted, a Groq client is
                built from ``settings.GROQ_API_KEY`` (the original behavior).
            model: Optional model name; defaults to ``DEFAULT_MODEL``.
        """
        self.client = (
            client if client is not None else groq.Groq(api_key=settings.GROQ_API_KEY)
        )
        self.model = model or self.DEFAULT_MODEL

    def apply_rules_to_matches(
        self, matches: List[Match], ai_rules: Optional[List[Dict]] = None
    ) -> List[Match]:
        """
        Apply AI rules to all matches and add flag_for_review and auto_approve fields.

        Args:
            matches: List of Match objects from the matching engine
            ai_rules: Optional list of custom rules (format: [{"condition": str, "action": str}])

        Returns:
            The same list, each match mutated in place so that its
            tax_analysis contains flag_for_review, auto_approve,
            rules_applied and rule_reasons.
        """
        if not matches:
            return matches

        custom_rules = list(ai_rules) if ai_rules else []
        logger.info(
            "Applying AI rules to %d matches with %d custom rules",
            len(matches),
            len(custom_rules),
        )

        # Built-in rules come first, followed by the user-provided ones.
        all_rules = [dict(rule) for rule in self.BUILTIN_RULES] + custom_rules

        for match in matches:
            try:
                evaluation = self._evaluate_rules_for_match(match, all_rules)
            except Exception as exc:
                # Fail safe: one bad match must not abort the batch, and an
                # unevaluated match must go to manual review.
                logger.exception("Error applying rules to match")
                evaluation = self._fail_safe_result(f"Rule evaluation error: {exc}")
            self._record_evaluation(match, evaluation)

        return matches

    def _record_evaluation(self, match: Match, evaluation: Dict) -> None:
        """Write one evaluation result onto the match (tax_analysis + match_reason)."""
        if match.tax_analysis is None:
            match.tax_analysis = {}

        flagged = evaluation["flag_for_review"]
        approved = evaluation["auto_approve"]

        match.tax_analysis["flag_for_review"] = flagged
        match.tax_analysis["auto_approve"] = approved
        match.tax_analysis["rules_applied"] = evaluation["rules_applied"]
        match.tax_analysis["rule_reasons"] = evaluation["reasons"]

        tags = []
        if flagged:
            tags.append(" | 🚩 FLAGGED FOR REVIEW")
        if approved:
            tags.append(" | ✅ AUTO-APPROVED")
        if tags:
            # match_reason may be unset; treat that as an empty prefix.
            match.match_reason = (match.match_reason or "") + "".join(tags)

        logger.info(
            "Match %s → %s: flag_for_review=%s, auto_approve=%s",
            getattr(match.receipt, "id", None),
            getattr(match.transaction, "id", None),
            flagged,
            approved,
        )

    def _evaluate_rules_for_match(
        self, match: Match, rules: List[Dict]
    ) -> Dict[str, object]:
        """
        Use LLM to evaluate all rules for a single match.

        Errors while building the match context propagate to the caller;
        errors from the LLM call or response handling are converted into the
        fail-safe result.

        Returns:
            {
                "flag_for_review": bool,
                "auto_approve": bool,
                "rules_applied": List[str],
                "reasons": List[str]
            }
        """
        match_context = self._build_match_context(match)
        rules_context = self._build_rules_context(rules)

        prompt = f"""You are a financial matching rules engine. Analyze the following receipt-to-transaction match and apply the specified rules.

MATCH DETAILS:
{match_context}

RULES TO APPLY:
{rules_context}

INSTRUCTIONS:
1. Evaluate each rule's condition against the match details
2. If a rule's condition is TRUE, apply the action:
   - If action is "flag_for_review" or "review" → set flag_for_review = true
   - If action is "auto_approve" or "approve" → set auto_approve = true
   - For other actions, determine if they imply review or approval
3. If BOTH flag_for_review and auto_approve are triggered, flag_for_review takes priority
4. If NO rules match, set both to false (default behavior)

IMPORTANT BUILT-IN RULE:
- If receipt currency differs from transaction currency → ALWAYS set flag_for_review = true

Return ONLY a valid JSON object with this exact format:
{{
    "flag_for_review": boolean,
    "auto_approve": boolean,
    "rules_applied": ["list of rule conditions that matched"],
    "reasons": ["list of reasons for the decisions"]
}}
"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a financial rules evaluation assistant. You analyze transaction matches and apply business rules. Always respond with valid JSON only.",
                    },
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                max_tokens=500,
            )

            # The message content can be empty; parse "" rather than crash.
            result_text = (response.choices[0].message.content or "").strip()
            result = self._parse_llm_response(result_text)

            # Enforce the priority rule locally instead of trusting the LLM.
            if result["flag_for_review"] and result["auto_approve"]:
                logger.warning(
                    "Both flag_for_review and auto_approve were true, prioritizing flag_for_review"
                )
                result["auto_approve"] = False
                result["reasons"].append(
                    "Conflicting rules: prioritized manual review over auto-approval"
                )

            return result

        except Exception as exc:
            logger.exception("LLM evaluation failed")
            return self._fail_safe_result(f"Error evaluating rules: {exc}")

    def _build_match_context(self, match: Match) -> str:
        """Build a text description of the match for the LLM"""
        receipt = match.receipt
        transaction = match.transaction
        money = self._format_money

        lines = [
            "Receipt Information:",
            f"- ID: {receipt.id}",
            f"- Vendor: {receipt.vendor}",
            f"- Amount: {money(receipt.amount)}",
            f"- Tax: {money(receipt.tax)}",
            f"- Category: {receipt.category}",
            f"- Description: {receipt.description}",
            f"- Date: {receipt.receipt_date}",
            f"- Currency: {receipt.currency}",
            "",
            "Transaction Information:",
            f"- ID: {transaction.id}",
            f"- Vendor: {transaction.vendor}",
            f"- Amount: {money(transaction.amount)}",
            f"- Date: {transaction.transaction_date}",
            f"- Notes: {transaction.notes}",
            f"- Currency: {transaction.currency}",
            "",
            "Match Quality:",
            f"- Confidence Score: {self._format_percent(match.confidence_score)}",
            f"- Match Reason: {match.match_reason}",
        ]
        context = "\n".join(lines) + "\n"

        # Add tax analysis if available. default=str keeps values that JSON
        # cannot encode natively from aborting the evaluation; the text is
        # only prompt context, so a string rendering is sufficient.
        if match.tax_analysis:
            context += (
                f"\nTax Analysis:\n{json.dumps(match.tax_analysis, indent=2, default=str)}"
            )

        return context

    def _build_rules_context(self, rules: List[Dict]) -> str:
        """Build a formatted, 1-based numbered list of rules for the LLM"""
        if not rules:
            return "No custom rules provided. Apply default evaluation."

        return "".join(
            f"{idx}. IF {rule.get('condition', '')} → THEN {rule.get('action', '')}\n"
            for idx, rule in enumerate(rules, 1)
        )

    def _parse_llm_response(self, response_text: str) -> Dict:
        """Parse and validate LLM JSON response.

        Accepts raw JSON, JSON inside a markdown code fence, and JSON
        surrounded by prose. Missing fields get defaults, the two flags are
        normalized to real booleans (the strings "false"/"no"/"0" count as
        False), and rules_applied/reasons are normalized to lists of strings.
        Anything that is not a JSON object yields the fail-safe result.
        """
        candidate = self._extract_json_text(response_text)
        try:
            payload = json.loads(candidate)
        except json.JSONDecodeError as exc:
            logger.error("Failed to parse LLM response as JSON: %s", exc)
            logger.error("Response text: %s", response_text)
            return self._fail_safe_result("Failed to parse LLM response")

        if not isinstance(payload, dict):
            logger.error("LLM response is not a JSON object: %s", response_text)
            return self._fail_safe_result("Failed to parse LLM response")

        # Keep any extra keys the model returned; normalize the four we rely on.
        result = dict(payload)
        result["flag_for_review"] = self._coerce_bool(
            payload.get("flag_for_review", False)
        )
        result["auto_approve"] = self._coerce_bool(payload.get("auto_approve", False))
        result["rules_applied"] = self._coerce_str_list(payload.get("rules_applied"))
        result["reasons"] = self._coerce_str_list(payload.get("reasons"))
        return result

    @staticmethod
    def _extract_json_text(response_text: str) -> str:
        """Strip markdown fences and surrounding prose from an LLM reply."""
        text = response_text or ""
        if "```json" in text:
            text = text.split("```json", 1)[1].split("```", 1)[0]
        elif "```" in text:
            text = text.split("```", 1)[1].split("```", 1)[0]
        text = text.strip()

        # Narrow to the outermost {...} so leading/trailing prose is ignored.
        start = text.find("{")
        end = text.rfind("}")
        if 0 <= start < end:
            text = text[start : end + 1]
        return text

    @classmethod
    def _coerce_bool(cls, value) -> bool:
        """Interpret a JSON value as a boolean; bool("false") would be True."""
        if isinstance(value, str):
            return value.strip().lower() in cls._TRUE_STRINGS
        return bool(value)

    @staticmethod
    def _coerce_str_list(value) -> List[str]:
        """Normalize None / scalar / sequence into a list of strings."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        return [str(value)]

    @staticmethod
    def _format_money(value) -> str:
        """Render an amount as "$12.34", or "N/A" when missing/non-numeric."""
        try:
            return f"${float(value):.2f}"
        except (TypeError, ValueError):
            return "N/A"

    @staticmethod
    def _format_percent(value) -> str:
        """Render a 0-1 score as "90.00%", or "N/A" when missing/non-numeric."""
        try:
            return f"{float(value):.2%}"
        except (TypeError, ValueError):
            return "N/A"

    @staticmethod
    def _fail_safe_result(reason: str) -> Dict:
        """Result used whenever evaluation fails: manual review, never approve."""
        return {
            "flag_for_review": True,
            "auto_approve": False,
            "rules_applied": [],
            "reasons": [reason],
        }
uploaded file and extract receipt data @@ -26,25 +30,27 @@ class DocumentProcessor: file_path: Path to the file to process file_type: Type of file (jpg, pdf, etc.) user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") + ai_rules: List of AI rules for categorization (e.g., [{"condition": "vendor is Starbucks", "action": "Food"}]) """ try: if file_type.lower() in ["jpg", "jpeg", "png", "gif", "bmp"]: - return await self._process_image(file_path, user_location) + return await self._process_image(file_path, user_location, ai_rules) elif file_type.lower() == "pdf": - return await self._process_pdf(file_path, user_location) + return await self._process_pdf(file_path, user_location, ai_rules) else: raise ValueError(f"Unsupported file type: {file_type}") except Exception as e: return {"error": str(e)} async def _process_image( - self, image_path: str, user_location: str = None + self, image_path: str, user_location: str = None, ai_rules: list = None ) -> Dict[str, Any]: """Extract data from image using Groq vision Args: image_path: Path to the image file user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") + ai_rules: List of AI rules for categorization """ try: # Encode image to base64 @@ -62,6 +68,16 @@ class DocumentProcessor: - Apply depreciation rules based on the user's location. """ + # Build AI rules context for categorization + ai_rules_context = "" + if ai_rules and len(ai_rules) > 0: + ai_rules_context = "\n CATEGORIZATION RULES (IMPORTANT - Apply these first):" + for idx, rule in enumerate(ai_rules, 1): + condition = rule.get("condition", "") + action = rule.get("action", "") + ai_rules_context += f"\n {idx}. 
If {condition} → set category to '{action}'" + ai_rules_context += "\n - Apply these custom rules before using default categorization logic\n - If multiple rules match, use the first matching rule\n - If no rules match, use default categorization based on vendor type" + # Create Groq vision prompt prompt = f""" Analyze this receipt image and extract the following information in JSON format: @@ -89,9 +105,9 @@ class DocumentProcessor: - Total amount should be the final total including tax - Tax amount is separate tax line if available (if not clearly shown, calculate based on location) - Date should be the date on the receipt - - Categorize based on vendor type (Starbucks=Food, Shell=Transport, etc.) - Confidence score 0-1 based on how clear the receipt is - Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD") + {ai_rules_context} {user_location_context} LOCATION & TAX RULES: - Extract location from receipt (look for store address, province/state, country) @@ -166,18 +182,19 @@ class DocumentProcessor: return base64.b64encode(image_file.read()).decode("utf-8") async def _process_pdf( - self, pdf_path: str, user_location: str = None + self, pdf_path: str, user_location: str = None, ai_rules: list = None ) -> Dict[str, Any]: """Extract data from PDF by converting to image first Args: pdf_path: Path to the PDF file user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") + ai_rules: List of AI rules for categorization """ try: # For now, extract text from PDF and process as text text_content = self._extract_text_from_pdf(pdf_path) - return self._process_text_content(text_content, user_location) + return self._process_text_content(text_content, user_location, ai_rules) except Exception as e: return {"error": f"PDF processing error: {str(e)}"} @@ -195,13 +212,14 @@ class DocumentProcessor: return "" def _process_text_content( - self, text_content: str, user_location: str = None + self, text_content: 
str, user_location: str = None, ai_rules: list = None ) -> Dict[str, Any]: """Process text content using Groq (fallback for PDFs) Args: text_content: Extracted text from PDF user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada") + ai_rules: List of AI rules for categorization """ try: # Build user location context @@ -216,6 +234,16 @@ class DocumentProcessor: - Apply depreciation rules based on the user's location. """ + # Build AI rules context for categorization + ai_rules_context = "" + if ai_rules and len(ai_rules) > 0: + ai_rules_context = "\n CATEGORIZATION RULES (IMPORTANT - Apply these first):" + for idx, rule in enumerate(ai_rules, 1): + condition = rule.get("condition", "") + action = rule.get("action", "") + ai_rules_context += f"\n {idx}. If {condition} → set category to '{action}'" + ai_rules_context += "\n - Apply these custom rules before using default categorization logic\n - If multiple rules match, use the first matching rule\n - If no rules match, use default categorization based on vendor type" + prompt = f""" Analyze this receipt text and extract the following information in JSON format: @@ -247,9 +275,9 @@ class DocumentProcessor: - Total amount should be the final total including tax - Tax amount is separate tax line if available (if not clearly shown, calculate based on location) - Date should be the date on the receipt - - Categorize based on vendor type - Confidence score 0-1 based on clarity - Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD") + {ai_rules_context} {user_location_context} LOCATION & TAX RULES: - Extract location from receipt (look for store address, province/state, country) diff --git a/app/services/matching_engine.py b/app/services/matching_engine.py index 9f86fe0..3feb2bf 100644 --- a/app/services/matching_engine.py +++ b/app/services/matching_engine.py @@ -1,8 +1,9 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, 
def process_matching(
    self,
    receipts: List[Receipt],
    transactions: List[Transaction],
    user_location: str = "ON",
    ai_rules: Optional[List[Dict]] = None,
) -> List[Match]:
    """Match receipts to transactions, then run post-matching rule evaluation.

    Args:
        receipts: Receipts to match.
        transactions: Candidate transactions.
        user_location: Accepted for backward compatibility; not read by this
            method while the tax-analysis step is disabled.
        ai_rules: Optional custom rules
            (format: [{"condition": str, "action": str}]). Built-in rules
            (e.g. currency mismatch) are applied by the rules matcher even
            when this is omitted or empty.

    Returns:
        The matches returned by the rules matcher, which adds
        flag_for_review / auto_approve information to each match.
    """
    # Get AI matches
    ai_matches = self.ai_matcher.match_receipts_to_transactions(
        receipts, transactions
    )

    # NOTE(review): the traditional rules-engine pass (confidence boost /
    # auto-approve) and the manual/LLM tax-analysis step are disabled in this
    # revision, so `user_location` and `use_manual_tax_calculator` have no
    # effect here. Confirm this is intentional before release.

    # A single call covers both cases: an empty rule list is normalized to
    # None, and the rules matcher always prepends its built-in rules.
    return self.ai_rules_matcher.apply_rules_to_matches(
        ai_matches, ai_rules or None
    )