"""AI-powered rules engine that post-processes receipt-to-transaction matches."""

import json
import logging
from typing import Any, Dict, List, Optional

import groq

from config import settings
from schemas import Match


logger = logging.getLogger(__name__)


class AIRulesMatcher:
    """
    AI-powered rules engine for post-matching evaluation.

    Uses an LLM to apply custom rules and decide whether each match should be:
    - Flagged for manual review (flag_for_review=True)
    - Auto-approved (auto_approve=True)

    The LLM reply is treated as untrusted input: it is parsed strictly, any
    failure falls back to "flag for review", and the built-in currency-mismatch
    rule is additionally enforced in code rather than left to the model alone.
    """

    # Markers appended to ``match.match_reason`` once a decision is recorded.
    _REVIEW_MARKER = "🚩 FLAGGED FOR REVIEW"
    _APPROVE_MARKER = "✅ AUTO-APPROVED"

    # Condition text of the built-in rule (also sent to the LLM as rule #1).
    _CURRENCY_RULE = "receipt currency differs from transaction currency"

    # Textual spellings of booleans an LLM may emit instead of JSON true/false.
    _TRUE_STRINGS = frozenset({"true", "yes", "y", "1"})
    _FALSE_STRINGS = frozenset({"false", "no", "n", "0", ""})

    def __init__(self):
        self.client = groq.Groq(api_key=settings.GROQ_API_KEY)
        self.model = settings.model

    def apply_rules_to_matches(
        self, matches: List["Match"], ai_rules: Optional[List[Dict]] = None
    ) -> List["Match"]:
        """
        Apply AI rules to all matches and add flag_for_review and auto_approve fields.

        Each match is mutated in place: ``tax_analysis`` gains the keys
        ``flag_for_review``, ``auto_approve``, ``rules_applied`` and
        ``rule_reasons``, and ``match_reason`` gets a marker suffix.

        Args:
            matches: List of Match objects from the matching engine
            ai_rules: Optional list of custom rules
                (format: [{"condition": str, "action": str}])

        Returns:
            The same list of matches, enhanced with the rule evaluation results.
            Any match whose evaluation fails is flagged for review.
        """
        if not matches:
            return matches

        custom_rules = list(ai_rules) if ai_rules else []
        logger.info(
            "Applying AI rules to %d matches with %d custom rules",
            len(matches),
            len(custom_rules),
        )

        # Built-in rule: currency mismatch should always flag for review.
        builtin_rules = [
            {"condition": self._CURRENCY_RULE, "action": "flag_for_review"}
        ]
        all_rules = builtin_rules + custom_rules

        for match in matches:
            try:
                evaluation = self._evaluate_rules_for_match(match, all_rules)
                self._record_evaluation(match, evaluation)
                logger.info(
                    "Match %s → %s: flag_for_review=%s, auto_approve=%s",
                    match.receipt.id,
                    match.transaction.id,
                    evaluation["flag_for_review"],
                    evaluation["auto_approve"],
                )
            except Exception as exc:
                # Deliberate catch-all boundary: one bad match must not abort
                # the batch, and the safe outcome is a manual review.
                logger.exception("Error applying rules to match: %s", exc)
                if match.tax_analysis is None:
                    match.tax_analysis = {}
                match.tax_analysis["flag_for_review"] = True
                match.tax_analysis["auto_approve"] = False
                # Reset so a stale or partially written value never survives.
                match.tax_analysis["rules_applied"] = []
                match.tax_analysis["rule_reasons"] = [
                    f"Rule evaluation error: {exc}"
                ]

        return matches

    def _record_evaluation(self, match: "Match", evaluation: Dict[str, Any]) -> None:
        """Write one rule evaluation onto the match (tax_analysis and match_reason)."""
        if match.tax_analysis is None:
            match.tax_analysis = {}

        match.tax_analysis["flag_for_review"] = evaluation["flag_for_review"]
        match.tax_analysis["auto_approve"] = evaluation["auto_approve"]
        match.tax_analysis["rules_applied"] = evaluation["rules_applied"]
        match.tax_analysis["rule_reasons"] = evaluation["reasons"]

        if evaluation["flag_for_review"]:
            self._append_marker(match, self._REVIEW_MARKER)
        if evaluation["auto_approve"]:
            self._append_marker(match, self._APPROVE_MARKER)

    @staticmethod
    def _append_marker(match: "Match", marker: str) -> None:
        """Append ``marker`` to match_reason; tolerate None and avoid duplicates."""
        current = match.match_reason or ""
        if marker in current:
            # Already present, e.g. the rules were applied to this match before.
            return
        match.match_reason = f"{current} | {marker}" if current else marker

    def _evaluate_rules_for_match(
        self, match: "Match", rules: List[Dict]
    ) -> Dict[str, Any]:
        """
        Use LLM to evaluate all rules for a single match.

        Any failure while calling the LLM or interpreting its reply yields the
        fail-safe result (flag for review, no auto-approval).

        Returns:
            {
                "flag_for_review": bool,
                "auto_approve": bool,
                "rules_applied": List[str],
                "reasons": List[str]
            }
        """
        match_context = self._build_match_context(match)
        rules_context = self._build_rules_context(rules)

        # NOTE(review): receipt/transaction text and rule conditions are
        # interpolated verbatim, so the reply can be steered by that content.
        # The checks in _enforce_constraints do not depend on the model.
        prompt = f"""You are a financial matching rules engine. Analyze the following receipt-to-transaction match and apply the specified rules.

MATCH DETAILS:
{match_context}

RULES TO APPLY:
{rules_context}

INSTRUCTIONS:
1. Evaluate each rule's condition against the match details
2. If a rule's condition is TRUE, apply the action:
   - If action is "flag_for_review" or "review" → set flag_for_review = true
   - If action is "auto_approve" or "approve" → set auto_approve = true
   - For other actions, determine if they imply review or approval
3. If BOTH flag_for_review and auto_approve are triggered, flag_for_review takes priority
4. If NO rules match, set both to false (default behavior)

IMPORTANT BUILT-IN RULE:
- If receipt currency differs from transaction currency → ALWAYS set flag_for_review = true

Return ONLY a valid JSON object with this exact format:
{{
  "flag_for_review": boolean,
  "auto_approve": boolean,
  "rules_applied": ["list of rule conditions that matched"],
  "reasons": ["list of reasons for the decisions"]
}}
"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a financial rules evaluation assistant. You analyze transaction matches and apply business rules. Always respond with valid JSON only.",
                    },
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                max_tokens=500,
            )

            # The message content may be None; treat that as an empty reply,
            # which then fails parsing and falls back to manual review.
            result_text = (response.choices[0].message.content or "").strip()
            result = self._parse_llm_response(result_text)
            return self._enforce_constraints(match, result)

        except Exception as exc:
            logger.exception("LLM evaluation failed: %s", exc)
            return self._fail_safe_result(f"Error evaluating rules: {exc}")

    def _enforce_constraints(
        self, match: "Match", result: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Apply the invariants that must hold regardless of what the LLM said.

        1. A currency mismatch always flags the match for review.
        2. flag_for_review takes priority over auto_approve.

        ``result`` is updated in place and returned.
        """
        if self._currencies_differ(match) and not result["flag_for_review"]:
            logger.warning(
                "Currency mismatch was not flagged by the LLM, forcing flag_for_review"
            )
            result["flag_for_review"] = True
            result["rules_applied"].append(self._CURRENCY_RULE)
            result["reasons"].append(
                "Built-in rule enforced: receipt currency differs from transaction currency"
            )

        if result["flag_for_review"] and result["auto_approve"]:
            logger.warning(
                "Both flag_for_review and auto_approve were true, prioritizing flag_for_review"
            )
            result["auto_approve"] = False
            result["reasons"].append(
                "Conflicting rules: prioritized manual review over auto-approval"
            )

        return result

    @staticmethod
    def _currencies_differ(match: "Match") -> bool:
        """
        Return True when both currencies are present and not equal.

        Comparison ignores case and surrounding whitespace. A missing currency
        on either side is not treated as a mismatch.
        """
        # NOTE(review): assumes str() of the currency value is comparable
        # across receipt and transaction (plain code or same enum type).
        receipt_currency = getattr(getattr(match, "receipt", None), "currency", None)
        txn_currency = getattr(getattr(match, "transaction", None), "currency", None)
        if not receipt_currency or not txn_currency:
            return False
        return (
            str(receipt_currency).strip().upper()
            != str(txn_currency).strip().upper()
        )

    @staticmethod
    def _fail_safe_result(reason: str) -> Dict[str, Any]:
        """Build the safe default outcome: manual review, never auto-approve."""
        return {
            "flag_for_review": True,
            "auto_approve": False,
            "rules_applied": [],
            "reasons": [reason],
        }

    @staticmethod
    def _format_money(value: Any) -> str:
        """Format an amount as ``$x.xx``; tolerate None and non-numeric values."""
        if value is None:
            return "N/A"
        try:
            return f"${value:.2f}"
        except (TypeError, ValueError):
            return f"${value}"

    @staticmethod
    def _format_percent(value: Any) -> str:
        """Format a ratio as a percentage; tolerate None and non-numeric values."""
        if value is None:
            return "N/A"
        try:
            return f"{value:.2%}"
        except (TypeError, ValueError):
            return str(value)

    def _build_match_context(self, match: "Match") -> str:
        """Build a text description of the match for the LLM"""
        receipt = match.receipt
        transaction = match.transaction

        # NOTE(review): amounts are rendered with "$" whatever the currency;
        # the actual currency is listed on its own line.
        context = f"""Receipt Information:
- ID: {receipt.id}
- Vendor: {receipt.vendor}
- Amount: {self._format_money(receipt.amount)}
- Tax: {self._format_money(receipt.tax)}
- Category: {receipt.category}
- Description: {receipt.description}
- Date: {receipt.receipt_date}
- Currency: {receipt.currency}

Transaction Information:
- ID: {transaction.id}
- Vendor: {transaction.vendor}
- Amount: {self._format_money(transaction.amount)}
- Date: {transaction.transaction_date}
- Notes: {transaction.notes}
- Currency: {transaction.currency}

Match Quality:
- Confidence Score: {self._format_percent(match.confidence_score)}
- Match Reason: {match.match_reason}
"""

        if match.tax_analysis:
            # default=str: this text is only shown to the LLM, so stringify
            # values json cannot encode instead of failing the evaluation.
            analysis_text = json.dumps(match.tax_analysis, indent=2, default=str)
            context += f"\nTax Analysis:\n{analysis_text}"

        return context

    def _build_rules_context(self, rules: List[Dict]) -> str:
        """Build a formatted, numbered list of rules for the LLM"""
        if not rules:
            return "No custom rules provided. Apply default evaluation."

        lines = []
        for idx, rule in enumerate(rules, 1):
            if isinstance(rule, dict):
                condition = rule.get("condition", "")
                action = rule.get("action", "")
                lines.append(f"{idx}. IF {condition} → THEN {action}\n")
            else:
                # A rule that is not a mapping is passed through as free text
                # rather than aborting the evaluation of every match.
                lines.append(f"{idx}. {rule}\n")

        return "".join(lines)

    @staticmethod
    def _extract_json_payload(response_text: Optional[str]) -> str:
        """Strip markdown fences and surrounding prose from an LLM reply."""
        text = response_text or ""
        if "```json" in text:
            text = text.split("```json", 1)[1].split("```", 1)[0]
        elif "```" in text:
            text = text.split("```", 1)[1].split("```", 1)[0]
        text = text.strip()

        # Tolerate prose around the object ("Sure! {...} Hope that helps").
        if not text.startswith("{"):
            start, end = text.find("{"), text.rfind("}")
            if 0 <= start < end:
                text = text[start : end + 1]
        return text

    @classmethod
    def _coerce_flag(cls, value: Any, *, default: bool) -> bool:
        """
        Convert an LLM-supplied flag to bool.

        ``bool("false")`` is True, so strings are matched against known
        spellings. Unrecognised values yield ``default``, which the caller
        chooses to be the safe side for that flag.
        """
        if value is None:
            return False
        if isinstance(value, (bool, int, float)):
            return bool(value)
        if isinstance(value, str):
            token = value.strip().lower()
            if token in cls._TRUE_STRINGS:
                return True
            if token in cls._FALSE_STRINGS:
                return False
        return default

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Normalise a value to a new list of strings (None -> [], scalar -> [str])."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        return [str(value)]

    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
        """
        Parse and validate LLM JSON response.

        Returns a dict that always has boolean ``flag_for_review`` and
        ``auto_approve`` plus list-valued ``rules_applied`` and ``reasons``.
        Extra keys in the reply are kept. A reply that is not a JSON object
        yields the fail-safe result (flag for review).
        """
        payload = self._extract_json_payload(response_text)

        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError as exc:
            logger.error("Failed to parse LLM response as JSON: %s", exc)
            logger.error("Response text: %s", response_text)
            return self._fail_safe_result("Failed to parse LLM response")

        if not isinstance(parsed, dict):
            logger.error("LLM response is not a JSON object: %s", response_text)
            return self._fail_safe_result("LLM response was not a JSON object")

        result = dict(parsed)
        # An unrecognisable value errs towards review and away from approval.
        result["flag_for_review"] = self._coerce_flag(
            parsed.get("flag_for_review"), default=True
        )
        result["auto_approve"] = self._coerce_flag(
            parsed.get("auto_approve"), default=False
        )
        result["rules_applied"] = self._as_str_list(parsed.get("rules_applied"))
        result["reasons"] = self._as_str_list(parsed.get("reasons"))
        return result