# NOTE: file-viewer header captured together with the source (not code):
# "Files / 244 lines / 11 KiB / Python"

import asyncio
import concurrent.futures
import logging
import re
import time
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import groq

import config
from models import Receipt, Transaction, Match
# Set up logging: configure the root logger at import time and create the
# module-level logger used by every method of AIMatcher.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AIMatcher:
    """Match receipts to transactions by asking a Groq-hosted chat model to score pairs.

    For every receipt, each candidate transaction that passes a coarse amount
    filter is scored by the model (one API request per pair) and the
    highest-scoring transaction is kept.
    """

    # NOTE(review): confirm this model id is still offered by Groq; hosted
    # model ids get retired over time.
    DEFAULT_MODEL = "llama3-8b-8192"

    # A non-negative decimal number such as "1", "0.85" or ".85".
    _NUMBER_RE = re.compile(r"\d+(?:\.\d+)?|\.\d+")

    def __init__(
        self,
        *,
        model: str = DEFAULT_MODEL,
        max_retries: int = 3,
        retry_delay: float = 2,
        rate_limit_delay: float = 1.0,
    ):
        """Create the Groq client and store the retry / rate-limit settings.

        Args:
            model: Model id sent with every chat-completion request.
            max_retries: Number of attempts made per receipt/transaction pair.
            retry_delay: Base delay in seconds for the exponential backoff
                between failed attempts.
            rate_limit_delay: Minimum number of seconds between two API calls.
        """
        self.client = groq.Groq(api_key=config.GROQ_API_KEY)
        self.model = model
        self.max_retries = max_retries
        self.retry_delay = retry_delay  # seconds - increased for rate limiting
        self.rate_limit_delay = rate_limit_delay  # seconds between API calls
        # time.monotonic() reading of the last API call; -inf means "no call
        # made yet", so the very first call never sleeps.
        self.last_api_call = float("-inf")

    def match_receipts_to_transactions(self, receipts: List[Receipt], transactions: List[Transaction]) -> List[Match]:
        """Match receipts to transactions using AI.

        Each receipt is matched independently against the full transaction
        list, so the same transaction may be returned for several receipts.

        Args:
            receipts: Receipts to find a transaction for.
            transactions: Pool of transactions to search.

        Returns:
            One Match per receipt that obtained a positive score, sorted by
            confidence score, highest first.
        """
        logger.info(f"Starting AI matching for {len(receipts)} receipts against {len(transactions)} transactions")
        matches = []
        for position, receipt in enumerate(receipts, start=1):
            logger.info(f"Processing receipt {position}/{len(receipts)}: {receipt.vendor} - ${receipt.amount}")
            # Rate limiting is applied per API request inside
            # _calculate_match_score, because one receipt triggers one
            # request per candidate transaction.
            best_match = self._find_best_match(receipt, transactions)
            if best_match is not None:
                matches.append(best_match)
                logger.info(f"Found match: {best_match.confidence_score:.3f} - {best_match.match_reason}")
            else:
                logger.warning(f"No match found for receipt: {receipt.vendor} - ${receipt.amount}")
        # Sort by confidence score (highest first)
        matches.sort(key=lambda match: match.confidence_score, reverse=True)
        logger.info(f"AI matching completed. Found {len(matches)} matches")
        return matches

    def _rate_limit(self):
        """Sleep as needed so consecutive API calls are at least rate_limit_delay apart."""
        # A monotonic clock is used so wall-clock adjustments cannot shorten
        # or lengthen the wait.
        time_since_last_call = time.monotonic() - self.last_api_call
        if time_since_last_call < self.rate_limit_delay:
            sleep_time = self.rate_limit_delay - time_since_last_call
            logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
            time.sleep(sleep_time)
        self.last_api_call = time.monotonic()

    def _find_best_match(self, receipt: Receipt, transactions: List[Transaction]) -> Optional[Match]:
        """Find the BEST match for a receipt (highest confidence score).

        Returns:
            The highest-scoring Match, or None when there are no candidates or
            no candidate scored above 0 (a failed AI call scores 0.0).
        """
        candidates = self._filter_candidates(receipt, transactions)
        if not candidates:
            logger.warning(f"No candidates found for receipt: {receipt.vendor} - ${receipt.amount}")
            return None
        logger.info(f"Found {len(candidates)} candidates for receipt: {receipt.vendor}")
        best_match = None
        highest_score = 0.0
        for transaction in candidates:
            score, reason = self._calculate_match_score(receipt, transaction)
            logger.debug(f"Score {score:.3f} for transaction {transaction.vendor}: {reason}")
            # Keep the highest strictly positive score, however low; a score
            # of exactly 0 is never turned into a match.
            if score > highest_score:
                highest_score = score
                best_match = Match(receipt, transaction, score, reason)
        return best_match

    def _filter_candidates(self, receipt: Receipt, transactions: List[Transaction]) -> List[Transaction]:
        """Filter transactions to create a reasonable candidate list.

        A transaction is kept when the absolute amounts differ by at most
        200% of the receipt amount. Absolute values are used on both sides so
        that negative amounts are compared by magnitude.
        """
        receipt_amount_abs = abs(receipt.amount)
        amount_threshold = receipt_amount_abs * 2.0  # 200% threshold - very inclusive
        # Only exclude transactions with obviously different amounts
        candidates = [
            transaction
            for transaction in transactions
            if abs(receipt_amount_abs - abs(transaction.amount)) <= amount_threshold
        ]
        logger.debug(f"Filtered {len(transactions)} transactions to {len(candidates)} candidates")
        return candidates

    def _calculate_match_score(self, receipt: Receipt, transaction: Transaction) -> Tuple[float, str]:
        """Calculate match score using AI.

        Builds a prompt describing both records, calls the model (with rate
        limiting before every attempt and exponential backoff between failed
        attempts) and parses the reply.

        Returns:
            (score, reason). When every attempt fails, or max_retries is 0,
            the score is 0.0 and the reason describes the last error.
        """
        # Calculate differences for the AI to consider
        date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
        receipt_amount_abs = abs(receipt.amount)
        transaction_amount_abs = abs(transaction.amount)
        amount_diff = abs(receipt_amount_abs - transaction_amount_abs)
        amount_percent_diff = (amount_diff / receipt_amount_abs) * 100 if receipt_amount_abs > 0 else 0
        # The prompt lines are deliberately flush-left so that no source
        # indentation leaks into the text sent to the model.
        prompt = f"""
Compare this receipt with this transaction and provide a confidence score (0-1) and brief reason.
Receipt: {receipt.vendor}, ${receipt.amount}, {receipt.receipt_date.strftime('%Y-%m-%d')}
Receipt Description: {receipt.description}
Receipt Category: {receipt.category}
Transaction: {transaction.vendor}, ${transaction.amount} (absolute: ${transaction_amount_abs}), {transaction.transaction_date.strftime('%Y-%m-%d')}
Transaction Notes: {transaction.notes}
Differences:
- Date difference: {date_diff} days
- Amount difference: ${amount_diff} ({amount_percent_diff:.1f}%)
- Vendor comparison: "{receipt.vendor}" vs "{transaction.vendor}"
- Description/Notes comparison: "{receipt.description}" vs "{transaction.notes}"
- Category: {receipt.category}
Score this potential match based on how likely it is the correct match:
- Perfect matches (same vendor, amount, date): 0.95-1.0
- High confidence (minor differences): 0.8-0.94
- Medium confidence (moderate differences): 0.6-0.79
- Low confidence (significant differences): 0.4-0.59
- Very low confidence (major differences): 0.2-0.39
- Minimal similarity: 0.1-0.19
- No meaningful similarity: 0.0-0.09
Consider description and category similarity in your scoring.
IMPORTANT: Return ONLY the score and reason separated by a pipe character.
Format: [score]|[reason]
Example: 0.85|Same vendor, same amount, 2 days apart
"""
        last_error: Optional[Exception] = None
        for attempt in range(self.max_retries):
            # Space out every request, not just every receipt.
            self._rate_limit()
            try:
                result = self._call_groq_api_with_timeout(prompt, timeout=30)  # Increased timeout
                # Parse the result - handle multiple formats
                score, reason = self._parse_ai_response(result)
            except Exception as e:  # boundary: any API/parse failure triggers a retry
                last_error = e
                logger.warning(f"Attempt {attempt + 1} failed for receipt {receipt.id}: {str(e)}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff for rate limiting
                    sleep_time = self.retry_delay * (2 ** attempt)
                    logger.info(f"Waiting {sleep_time} seconds before retry...")
                    time.sleep(sleep_time)
            else:
                logger.debug(f"AI Response: {result}")
                logger.debug(f"Parsed: score={score}, reason={reason}")
                return score, reason
        # Reached when all attempts failed, and also when max_retries is 0, so
        # callers always receive a (score, reason) tuple.
        logger.error(f"All attempts failed for receipt {receipt.id}")
        return 0.0, f"AI error after {self.max_retries} attempts: {last_error}"

    def _parse_ai_response(self, result: str) -> Tuple[float, str]:
        """Parse an AI reply into (score, reason).

        Strategies, tried in order:
          1. Pipe-separated reply: the first part that contains exactly one
             number, within 0-1, is the score; the remaining non-empty parts
             form the reason.
          2. The first number within 0-1 found anywhere in the reply.
          3. The first number of any size, read as a percentage and capped
             at 1.
          4. Otherwise a score of 0.0 with an "Unparseable response" reason.
        """
        result = (result or "").strip()
        logger.debug(f"Parsing AI response: {result}")

        if '|' in result:
            parts = [part.strip() for part in result.split('|')]
            logger.debug(f"Split response into {len(parts)} parts: {parts}")
            for i, part in enumerate(parts):
                # Whole numeric tokens are matched instead of gluing every
                # digit of the part together, which would merge unrelated
                # numbers (e.g. "0.7 (70%)" into 0.770).
                tokens = self._NUMBER_RE.findall(part)
                if len(tokens) != 1:
                    continue
                score = float(tokens[0])
                if 0 <= score <= 1:  # Valid confidence score
                    reason_parts = [p for j, p in enumerate(parts) if j != i and p]
                    reason = ' | '.join(reason_parts) if reason_parts else "Score extracted"
                    logger.debug(f"Found score {score} in part {i}, reason: {reason}")
                    return score, reason

        numbers = [float(token) for token in self._NUMBER_RE.findall(result)]

        # Try to extract just a number from the response
        for score in numbers:
            if 0 <= score <= 1:  # Valid confidence score
                logger.debug(f"Extracted score {score} from response")
                return score, f"Extracted from response: {result[:50]}..."

        # Fallback - every number found is above 1 here: assume a percentage
        # and clamp to the 0-1 range.
        if numbers:
            score = min(1.0, numbers[0] / 100)
            logger.debug(f"Normalized score {score} from response")
            return score, f"Normalized from response: {result[:50]}..."

        # Final fallback
        logger.warning(f"Could not parse AI response: {result}")
        return 0.0, f"Unparseable response: {result[:50]}..."

    def _call_groq_api_with_timeout(self, prompt: str, timeout: int = 15) -> str:
        """Send the prompt to the chat-completions API, giving up after `timeout` seconds.

        Args:
            prompt: User message sent to the model.
            timeout: Maximum number of seconds to wait for the reply.

        Returns:
            The stripped text content of the first choice.

        Raises:
            TimeoutError: The call did not finish within `timeout` seconds.
            ValueError: The API reply carried no message content.
            Exception: Any error raised by the client is propagated unchanged.
        """
        def api_call() -> str:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=200,
                temperature=0.1,
            )
            content = response.choices[0].message.content
            if content is None:
                raise ValueError("API returned an empty response")
            return content.strip()

        # The executor is not used as a context manager on purpose: leaving a
        # `with` block calls shutdown(wait=True), which would block until the
        # hung request returns and so defeat the timeout.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(api_call)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            future.cancel()
            raise TimeoutError(f"API call timed out after {timeout} seconds") from None
        finally:
            # Do not wait for the worker; an in-flight request is left to
            # finish in the background.
            executor.shutdown(wait=False)