Refactor main application structure and improve logging

- Reorganized imports in main.py for better readability and structure.
- Enhanced logging configuration and added more detailed log messages throughout the application.
- Improved error handling and response formatting in transaction import endpoints.
- Streamlined transaction processing logic for CSV and image uploads.
- Updated matching engine to enhance match results with rules and improved logging.
- Refactored tax rules engine for better clarity and maintainability.
- Cleaned up requirements.txt by removing specific versioning for easier dependency management.
This commit is contained in:
bolade
2025-08-06 16:12:53 +01:00
parent 5b3c066cea
commit 1f530da7c4
5 changed files with 668 additions and 346 deletions
+304 -79
View File
@@ -1,115 +1,322 @@
import groq
from datetime import datetime, timedelta
from typing import List, Tuple
import config
from models import Receipt, Transaction, Match
import time
import logging import logging
import asyncio import time
from typing import List, Tuple
import groq
import config
from models import Match, Receipt, Transaction
# Set up logging # Set up logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AIMatcher: class AIMatcher:
def __init__(self): def __init__(self, use_batch_matching=True):
self.client = groq.Groq(api_key=config.GROQ_API_KEY) self.client = groq.Groq(api_key=config.GROQ_API_KEY)
self.model = "llama3-8b-8192" self.model = "llama3-8b-8192"
self.max_retries = 3 self.max_retries = 3
self.retry_delay = 2 # seconds - increased for rate limiting self.retry_delay = 2 # seconds - increased for rate limiting
self.rate_limit_delay = 1.0 # seconds between API calls self.rate_limit_delay = 1.0 # seconds between API calls
self.last_api_call = 0 self.last_api_call = 0
self.use_batch_matching = (
def match_receipts_to_transactions(self, receipts: List[Receipt], transactions: List[Transaction]) -> List[Match]: use_batch_matching # Toggle between new and legacy methods
)
def match_receipts_to_transactions(
self, receipts: List[Receipt], transactions: List[Transaction]
) -> List[Match]:
"""Match receipts to transactions using AI""" """Match receipts to transactions using AI"""
logger.info(f"Starting AI matching for {len(receipts)} receipts against {len(transactions)} transactions") logger.info(
f"Starting AI matching for {len(receipts)} receipts against {len(transactions)} transactions"
)
matches = [] matches = []
for i, receipt in enumerate(receipts): for i, receipt in enumerate(receipts):
logger.info(f"Processing receipt {i+1}/{len(receipts)}: {receipt.vendor} - ${receipt.amount}") logger.info(
f"Processing receipt {i + 1}/{len(receipts)}: {receipt.vendor} - ${receipt.amount}"
)
# Rate limiting # Rate limiting
self._rate_limit() self._rate_limit()
# Get the BEST match for this receipt (highest confidence score) # Get the BEST match for this receipt (highest confidence score)
best_match = self._find_best_match(receipt, transactions) best_match = self._find_best_match(receipt, transactions)
if best_match: if best_match:
matches.append(best_match) matches.append(best_match)
logger.info(f"Found match: {best_match.confidence_score:.3f} - {best_match.match_reason}") logger.info(
f"Found match: {best_match.confidence_score:.3f} - {best_match.match_reason}"
)
else: else:
logger.warning(f"No match found for receipt: {receipt.vendor} - ${receipt.amount}") logger.warning(
f"No match found for receipt: {receipt.vendor} - ${receipt.amount}"
)
# Sort by confidence score (highest first) # Sort by confidence score (highest first)
matches = sorted(matches, key=lambda x: x.confidence_score, reverse=True) matches = sorted(matches, key=lambda x: x.confidence_score, reverse=True)
logger.info(f"AI matching completed. Found {len(matches)} matches") logger.info(f"AI matching completed. Found {len(matches)} matches")
return matches return matches
def _rate_limit(self): def _rate_limit(self):
"""Implement rate limiting to avoid API quota exhaustion""" """Implement rate limiting to avoid API quota exhaustion"""
current_time = time.time() current_time = time.time()
time_since_last_call = current_time - self.last_api_call time_since_last_call = current_time - self.last_api_call
if time_since_last_call < self.rate_limit_delay: if time_since_last_call < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - time_since_last_call sleep_time = self.rate_limit_delay - time_since_last_call
logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds") logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
time.sleep(sleep_time) time.sleep(sleep_time)
self.last_api_call = time.time() self.last_api_call = time.time()
def _find_best_match(self, receipt: Receipt, transactions: List[Transaction]) -> Match: def _find_best_match(
"""Find the BEST match for a receipt (highest confidence score)""" self, receipt: Receipt, transactions: List[Transaction]
) -> Match:
"""Find the BEST match for a receipt using a single AI call for all candidates"""
candidates = self._filter_candidates(receipt, transactions) candidates = self._filter_candidates(receipt, transactions)
if not candidates: if not candidates:
logger.warning(f"No candidates found for receipt: {receipt.vendor} - ${receipt.amount}") logger.warning(
f"No candidates found for receipt: {receipt.vendor} - ${receipt.amount}"
)
return None return None
logger.info(f"Found {len(candidates)} candidates for receipt: {receipt.vendor}") logger.info(f"Found {len(candidates)} candidates for receipt: {receipt.vendor}")
best_match = None # Choose matching method based on configuration
highest_score = 0 if self.use_batch_matching:
# New efficient method: single AI call for all candidates
for transaction in candidates: best_match = self._find_best_match_single_call(receipt, candidates)
score, reason = self._calculate_match_score(receipt, transaction) else:
logger.debug(f"Score {score:.3f} for transaction {transaction.vendor}: {reason}") # Legacy method: individual AI calls (fallback)
best_match = self._find_best_match_legacy(receipt, candidates)
# Keep the match with the highest score, regardless of how low it is
if score > highest_score:
highest_score = score
best_match = Match(receipt, transaction, score, reason)
return best_match return best_match
def _filter_candidates(self, receipt: Receipt, transactions: List[Transaction]) -> List[Transaction]: def _find_best_match_single_call(
self, receipt: Receipt, candidates: List[Transaction]
) -> Match:
"""Find the best match using a single AI call to evaluate all candidates"""
if not candidates:
return None
# Limit candidates to avoid token limits (adjust based on your needs)
max_candidates = 10
if len(candidates) > max_candidates:
# Sort by amount similarity and take top candidates
candidates = sorted(
candidates, key=lambda t: abs(receipt.amount - abs(t.amount))
)[:max_candidates]
logger.info(
f"Limited candidates to top {max_candidates} by amount similarity"
)
# Build comprehensive prompt with all candidates
candidates_text = ""
for i, transaction in enumerate(candidates):
transaction_amount_abs = abs(transaction.amount)
date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
amount_diff = abs(receipt.amount - transaction_amount_abs)
amount_percent_diff = (
(amount_diff / receipt.amount) * 100 if receipt.amount > 0 else 0
)
candidates_text += f"""
Candidate {i + 1}:
- Vendor: {transaction.vendor}
- Amount: ${transaction.amount} (absolute: ${transaction_amount_abs})
- Date: {transaction.transaction_date.strftime("%Y-%m-%d")} ({date_diff} days difference)
- Notes: {transaction.notes}
- Amount difference: ${amount_diff} ({amount_percent_diff:.1f}%)
"""
prompt = f"""
You are an expert at matching receipts to bank transactions. Analyze the receipt below against ALL the candidate transactions and return the BEST match.
RECEIPT TO MATCH:
- Vendor: {receipt.vendor}
- Amount: ${receipt.amount}
- Date: {receipt.receipt_date.strftime("%Y-%m-%d")}
- Description: {receipt.description}
- Category: {receipt.category}
CANDIDATE TRANSACTIONS:
{candidates_text}
SCORING CRITERIA:
- Perfect matches (same vendor, amount, date): 0.95-1.0
- High confidence (minor differences): 0.8-0.94
- Medium confidence (moderate differences): 0.6-0.79
- Low confidence (significant differences): 0.4-0.59
- Very low confidence (major differences): 0.2-0.39
- Minimal similarity: 0.1-0.19
- No meaningful similarity: 0.0-0.09
Consider vendor name similarity, amount accuracy, date proximity, and description/notes relevance.
IMPORTANT: You MUST return the candidate with the highest match score, even if it's very low. Never return NONE.
Return ONLY the best match in this exact format:
CANDIDATE_NUMBER|CONFIDENCE_SCORE|REASON
Example: 3|0.87|Same vendor name, exact amount match, 1 day apart
Example of low match: 5|0.15|Best available option despite significant differences in vendor and amount
"""
for attempt in range(self.max_retries):
try:
result = self._call_groq_api_with_timeout(
prompt, timeout=45
) # Longer timeout for complex prompt
# Parse the single result
candidate_num, score, reason = self._parse_single_match_response(result)
if candidate_num == -1: # Parsing error occurred
logger.warning(
f"Failed to parse AI response for receipt: {receipt.vendor}"
)
return None
if 0 <= candidate_num < len(candidates):
best_transaction = candidates[candidate_num]
logger.info(
f"AI selected candidate {candidate_num + 1}: {best_transaction.vendor} (score: {score:.3f})"
)
return Match(receipt, best_transaction, score, reason)
else:
logger.warning(
f"AI returned invalid candidate number: {candidate_num}"
)
return None
except Exception as e:
logger.warning(
f"Attempt {attempt + 1} failed for receipt {receipt.id}: {str(e)}"
)
if attempt < self.max_retries - 1:
sleep_time = self.retry_delay * (2**attempt)
logger.info(f"Waiting {sleep_time} seconds before retry...")
time.sleep(sleep_time)
else:
logger.error(f"All attempts failed for receipt {receipt.id}")
return None
return None
def _parse_single_match_response(self, result: str) -> Tuple[int, float, str]:
    """Parse the model's ``CANDIDATE_NUMBER|CONFIDENCE_SCORE|REASON`` reply.

    Args:
        result: Raw text returned by the model.

    Returns:
        A ``(candidate_index, score, reason)`` tuple. ``candidate_index`` is
        the 1-based number from the reply converted to a 0-based index,
        ``score`` is clamped to ``[0.0, 1.0]`` and ``reason`` is everything
        after the second ``|``. When the reply cannot be parsed the tuple is
        ``(-1, 0.0, <explanation>)``; malformed text never raises.
    """
    import re

    result = result.strip()
    logger.debug(f"Parsing single match response: {result}")

    if result.upper().startswith("NONE"):
        # This should not happen with the current prompt; treat it as a
        # parsing error so the caller discards the reply.
        logger.warning(
            "AI returned NONE despite being instructed to always return best match"
        )
        return -1, 0.0, "AI returned NONE unexpectedly"

    parts = result.split("|")
    if len(parts) >= 3:
        candidate_match = re.search(r"\d+", parts[0])
        if candidate_match:
            # Convert the 1-based candidate number to a 0-based index.
            candidate_num = int(candidate_match.group()) - 1
            score_field = parts[1].strip()
            # The reason itself may contain "|", so re-join the remainder.
            reason = "|".join(parts[2:]).strip()

            # Take the FIRST number in the score field. Gluing every digit
            # and dot together turns "0.9 (out of 1.0)" into "0.91.0",
            # which is not a valid float and used to discard the match.
            score_match = re.search(r"\d+(?:\.\d+)?|\.\d+", score_field)
            score = float(score_match.group()) if score_match else 0.0
            if "%" in score_field:
                # An explicit percentage ("87%") means 0.87, not 87.
                score /= 100.0
            # Ensure score is in valid range
            score = max(0.0, min(1.0, score))

            logger.debug(
                f"Parsed: candidate={candidate_num}, score={score}, reason={reason}"
            )
            return candidate_num, score, reason

        logger.warning(
            "Error parsing single match response: No candidate number found"
        )

    # Fallback
    logger.warning(f"Could not parse single match response: {result}")
    return -1, 0.0, f"Parse error: {result[:50]}..."
def _filter_candidates(
self, receipt: Receipt, transactions: List[Transaction]
) -> List[Transaction]:
"""Filter transactions to create a reasonable candidate list""" """Filter transactions to create a reasonable candidate list"""
candidates = [] candidates = []
amount_threshold = receipt.amount * 2.0 # 200% threshold - very inclusive amount_threshold = receipt.amount * 2.0 # 200% threshold - very inclusive
for transaction in transactions: for transaction in transactions:
# Use absolute value for transaction amount comparison # Use absolute value for transaction amount comparison
transaction_amount_abs = abs(transaction.amount) transaction_amount_abs = abs(transaction.amount)
# Only exclude transactions with obviously different amounts # Only exclude transactions with obviously different amounts
if abs(receipt.amount - transaction_amount_abs) <= amount_threshold: if abs(receipt.amount - transaction_amount_abs) <= amount_threshold:
candidates.append(transaction) candidates.append(transaction)
logger.debug(f"Filtered {len(transactions)} transactions to {len(candidates)} candidates") logger.debug(
f"Filtered {len(transactions)} transactions to {len(candidates)} candidates"
)
return candidates return candidates
def _calculate_match_score(self, receipt: Receipt, transaction: Transaction) -> Tuple[float, str]: def _find_best_match_legacy(
self, receipt: Receipt, transactions: List[Transaction]
) -> Match:
"""Legacy method: Find the best match using individual API calls (kept as fallback)"""
candidates = self._filter_candidates(receipt, transactions)
if not candidates:
return None
best_match = None
highest_score = 0
for transaction in candidates:
score, reason = self._calculate_match_score(receipt, transaction)
logger.debug(
f"Score {score:.3f} for transaction {transaction.vendor}: {reason}"
)
if score > highest_score:
highest_score = score
best_match = Match(receipt, transaction, score, reason)
return best_match
def _calculate_match_score(
self, receipt: Receipt, transaction: Transaction
) -> Tuple[float, str]:
"""Calculate match score using AI""" """Calculate match score using AI"""
# Calculate differences for the AI to consider # Calculate differences for the AI to consider
date_diff = abs((receipt.receipt_date - transaction.transaction_date).days) date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
transaction_amount_abs = abs(transaction.amount) transaction_amount_abs = abs(transaction.amount)
amount_diff = abs(receipt.amount - transaction_amount_abs) amount_diff = abs(receipt.amount - transaction_amount_abs)
amount_percent_diff = (amount_diff / receipt.amount) * 100 if receipt.amount > 0 else 0 amount_percent_diff = (
(amount_diff / receipt.amount) * 100 if receipt.amount > 0 else 0
)
prompt = f""" prompt = f"""
Compare this receipt with this transaction and provide a confidence score (0-1) and brief reason. Compare this receipt with this transaction and provide a confidence score (0-1) and brief reason, the reason must be a single sentence without any special formatting.
Receipt: {receipt.vendor}, ${receipt.amount}, {receipt.receipt_date.strftime('%Y-%m-%d')} Receipt: {receipt.vendor}, ${receipt.amount}, {receipt.receipt_date.strftime("%Y-%m-%d")}
Receipt Description: {receipt.description} Receipt Description: {receipt.description}
Receipt Category: {receipt.category} Receipt Category: {receipt.category}
Transaction: {transaction.vendor}, ${transaction.amount} (absolute: ${transaction_amount_abs}), {transaction.transaction_date.strftime('%Y-%m-%d')} Transaction: {transaction.vendor}, ${transaction.amount} (absolute: ${transaction_amount_abs}), {transaction.transaction_date.strftime("%Y-%m-%d")}
Transaction Notes: {transaction.notes} Transaction Notes: {transaction.notes}
Differences: Differences:
@@ -135,61 +342,78 @@ class AIMatcher:
Format: [score]|[reason] Format: [score]|[reason]
Example: 0.85|Same vendor, same amount, 2 days apart Example: 0.85|Same vendor, same amount, 2 days apart
""" """
for attempt in range(self.max_retries): for attempt in range(self.max_retries):
try: try:
result = self._call_groq_api_with_timeout(prompt, timeout=30) # Increased timeout result = self._call_groq_api_with_timeout(
prompt, timeout=30
) # Increased timeout
# Parse the result - handle multiple formats # Parse the result - handle multiple formats
score, reason = self._parse_ai_response(result) score, reason = self._parse_ai_response(result)
logger.debug(f"AI Response: {result}") logger.debug(f"AI Response: {result}")
logger.debug(f"Parsed: score={score}, reason={reason}") logger.debug(f"Parsed: score={score}, reason={reason}")
return score, reason return score, reason
except Exception as e: except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed for receipt {receipt.id}: {str(e)}") logger.warning(
f"Attempt {attempt + 1} failed for receipt {receipt.id}: {str(e)}"
)
if attempt < self.max_retries - 1: if attempt < self.max_retries - 1:
# Exponential backoff for rate limiting # Exponential backoff for rate limiting
sleep_time = self.retry_delay * (2 ** attempt) sleep_time = self.retry_delay * (2**attempt)
logger.info(f"Waiting {sleep_time} seconds before retry...") logger.info(f"Waiting {sleep_time} seconds before retry...")
time.sleep(sleep_time) time.sleep(sleep_time)
else: else:
logger.error(f"All attempts failed for receipt {receipt.id}") logger.error(f"All attempts failed for receipt {receipt.id}")
return 0.0, f"AI error after {self.max_retries} attempts: {str(e)}" return 0.0, f"AI error after {self.max_retries} attempts: {str(e)}"
def _parse_ai_response(self, result: str) -> Tuple[float, str]: def _parse_ai_response(self, result: str) -> Tuple[float, str]:
"""Parse AI response with robust error handling""" """Parse AI response with robust error handling"""
result = result.strip() result = result.strip()
logger.debug(f"Parsing AI response: {result}") logger.debug(f"Parsing AI response: {result}")
# Try to find score in various formats # Try to find score in various formats
if '|' in result: if "|" in result:
parts = result.split('|') parts = result.split("|")
logger.debug(f"Split response into {len(parts)} parts: {parts}") logger.debug(f"Split response into {len(parts)} parts: {parts}")
# Look for a numeric score in any part # Look for a numeric score in any part
for i, part in enumerate(parts): for i, part in enumerate(parts):
part = part.strip() part = part.strip()
try: try:
# Remove any non-numeric characters except decimal point # Remove any non-numeric characters except decimal point
score_str_clean = ''.join(c for c in part if c.isdigit() or c == '.') score_str_clean = "".join(
c for c in part if c.isdigit() or c == "."
)
if score_str_clean: if score_str_clean:
score = float(score_str_clean) score = float(score_str_clean)
if 0 <= score <= 1: # Valid confidence score if 0 <= score <= 1: # Valid confidence score
# Get reason from other parts # Get reason from other parts
reason_parts = [p.strip() for j, p in enumerate(parts) if j != i and p.strip()] reason_parts = [
reason = ' | '.join(reason_parts) if reason_parts else "Score extracted" p.strip()
logger.debug(f"Found score {score} in part {i}, reason: {reason}") for j, p in enumerate(parts)
if j != i and p.strip()
]
reason = (
" | ".join(reason_parts)
if reason_parts
else "Score extracted"
)
logger.debug(
f"Found score {score} in part {i}, reason: {reason}"
)
return score, reason return score, reason
except ValueError: except ValueError:
continue continue
# Try to extract just a number from the response # Try to extract just a number from the response
try: try:
import re import re
numbers = re.findall(r'\d+\.?\d*', result)
numbers = re.findall(r"\d+\.?\d*", result)
if numbers: if numbers:
for num_str in numbers: for num_str in numbers:
score = float(num_str) score = float(num_str)
@@ -198,11 +422,12 @@ class AIMatcher:
return score, f"Extracted from response: {result[:50]}..." return score, f"Extracted from response: {result[:50]}..."
except (ValueError, IndexError): except (ValueError, IndexError):
pass pass
# Fallback - try to find any number and normalize it # Fallback - try to find any number and normalize it
try: try:
import re import re
numbers = re.findall(r'\d+\.?\d*', result)
numbers = re.findall(r"\d+\.?\d*", result)
if numbers: if numbers:
score = float(numbers[0]) score = float(numbers[0])
# Normalize to 0-1 range if it's a percentage or other scale # Normalize to 0-1 range if it's a percentage or other scale
@@ -213,27 +438,27 @@ class AIMatcher:
return score, f"Normalized from response: {result[:50]}..." return score, f"Normalized from response: {result[:50]}..."
except (ValueError, IndexError): except (ValueError, IndexError):
pass pass
# Final fallback # Final fallback
logger.warning(f"Could not parse AI response: {result}") logger.warning(f"Could not parse AI response: {result}")
return 0.0, f"Unparseable response: {result[:50]}..." return 0.0, f"Unparseable response: {result[:50]}..."
def _call_groq_api_with_timeout(self, prompt: str, timeout: int = 15) -> str: def _call_groq_api_with_timeout(self, prompt: str, timeout: int = 15) -> str:
"""Make API call with timeout and retry logic""" """Make API call with timeout and retry logic"""
import concurrent.futures import concurrent.futures
def api_call(): def api_call():
try: try:
response = self.client.chat.completions.create( response = self.client.chat.completions.create(
model=self.model, model=self.model,
messages=[{"role": "user", "content": prompt}], messages=[{"role": "user", "content": prompt}],
max_tokens=200, max_tokens=200,
temperature=0.1 temperature=0.1,
) )
return response.choices[0].message.content.strip() return response.choices[0].message.content.strip()
except Exception as e: except Exception as e:
raise e raise e
try: try:
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(api_call) future = executor.submit(api_call)
@@ -241,4 +466,4 @@ class AIMatcher:
except concurrent.futures.TimeoutError: except concurrent.futures.TimeoutError:
raise Exception(f"API call timed out after {timeout} seconds") raise Exception(f"API call timed out after {timeout} seconds")
except Exception as e: except Exception as e:
raise e raise e
+237 -157
View File
@@ -1,37 +1,37 @@
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from typing import List
import uuid
import csv import csv
import io import io
import logging import logging
import uuid
from datetime import datetime
from typing import List
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
# Configure logging # Configure logging
from ai_rules import AIRule
from api_models import (
DocumentProcessResponse,
DocumentUploadResponse,
MatchingResponse,
MatchResponse,
RuleRequest,
)
from document_processor import DocumentProcessor
from matching_engine import MatchingEngine
from models import Receipt, Transaction
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[ handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
logging.FileHandler('app.log'),
logging.StreamHandler()
]
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from api_models import (
MatchingRequest, MatchingResponse, MatchResponse,
ApprovalRequest, RuleRequest, DocumentUploadResponse,
DocumentProcessResponse, TransactionRequest
)
from models import Receipt, Transaction, Match
from matching_engine import MatchingEngine
from ai_rules import AIRule
from document_processor import DocumentProcessor
app = FastAPI( app = FastAPI(
title="AI Bookkeeper - Data Science Engine", title="AI Bookkeeper - Data Science Engine",
description="AI-powered receipt-to-transaction matching engine. Receives transaction data and provides intelligent matching capabilities.", description="AI-powered receipt-to-transaction matching engine. Receives transaction data and provides intelligent matching capabilities.",
version="1.0.0" version="1.0.0",
) )
# CORS middleware # CORS middleware
@@ -54,19 +54,22 @@ uploaded_files = {}
stored_transactions = [] stored_transactions = []
processed_receipts = {} processed_receipts = {}
@app.get("/") @app.get("/")
async def root(): async def root():
"""Health check endpoint""" """Health check endpoint"""
return { return {
"message": "AI Bookkeeper Data Science Engine is running", "message": "AI Bookkeeper Data Science Engine is running",
"version": "1.0.0", "version": "1.0.0",
"status": "healthy" "status": "healthy",
} }
# ============================================================================ # ============================================================================
# TRANSACTION IMPORT ENDPOINTS # TRANSACTION IMPORT ENDPOINTS
# ============================================================================ # ============================================================================
@app.post("/transactions/import/csv") @app.post("/transactions/import/csv")
async def import_transactions_csv(file: UploadFile = File(...)): async def import_transactions_csv(file: UploadFile = File(...)):
""" """
@@ -74,54 +77,65 @@ async def import_transactions_csv(file: UploadFile = File(...)):
""" """
try: try:
content = await file.read() content = await file.read()
decoded = content.decode('utf-8') decoded = content.decode("utf-8")
reader = csv.DictReader(io.StringIO(decoded)) reader = csv.DictReader(io.StringIO(decoded))
transactions = [] transactions = []
errors = [] errors = []
for idx, row in enumerate(reader): for idx, row in enumerate(reader):
try: try:
# Use correct headers and strip whitespace # Use correct headers and strip whitespace
account_number = row.get('Account Number') or row.get('Account Number '.strip()) account_number = row.get("Account Number") or row.get(
txn_date_raw = row.get('Transaction Date') or row.get('Transaction Date '.strip()) "Account Number ".strip()
amount_raw = row.get('Amount') or row.get('Amount '.strip()) )
payee_name = row.get('Description 2') or row.get('Description 2 '.strip()) txn_date_raw = row.get("Transaction Date") or row.get(
memo = f"{row.get('Account Type','').strip()} {row.get('Cheque Number','').strip()} {row.get('Description 1','').strip()}".strip() "Transaction Date ".strip()
)
amount_raw = row.get("Amount") or row.get("Amount ".strip())
payee_name = row.get("Description 2") or row.get(
"Description 2 ".strip()
)
memo = f"{row.get('Account Type', '').strip()} {row.get('Cheque Number', '').strip()} {row.get('Description 1', '').strip()}".strip()
# Compose ID # Compose ID
txn_id = f"{account_number}_{idx+1}" txn_id = f"{account_number}_{idx + 1}"
# Parse date (try multiple formats) # Parse date (try multiple formats)
txn_date_str = txn_date_raw.strip() txn_date_str = txn_date_raw.strip()
txn_date = None txn_date = None
for fmt in ("%m/%d/%y", "%m/%d/%Y"): for fmt in ("%m/%d/%y", "%m/%d/%Y"):
try: try:
txn_date = datetime.strptime(txn_date_str, fmt).strftime("%Y-%m-%d") txn_date = datetime.strptime(txn_date_str, fmt).strftime(
"%Y-%m-%d"
)
break break
except Exception: except Exception:
continue continue
if not txn_date: if not txn_date:
raise ValueError(f"Could not parse date: {txn_date_str}") raise ValueError(f"Could not parse date: {txn_date_str}")
# Parse amount # Parse amount
amount = float(amount_raw.replace(',', '').strip()) amount = float(amount_raw.replace(",", "").strip())
transactions.append({ transactions.append(
"id": txn_id, {
"txn_date": txn_date, "id": txn_id,
"amount": amount, "txn_date": txn_date,
"payee_name": payee_name.strip(), "amount": amount,
"memo": memo "payee_name": payee_name.strip(),
}) "memo": memo,
}
)
except Exception as e: except Exception as e:
errors.append(f"Row {idx+1}: {str(e)}") errors.append(f"Row {idx + 1}: {str(e)}")
# Store transactions globally for auto-matching # Store transactions globally for auto-matching
global stored_transactions global stored_transactions
stored_transactions = transactions stored_transactions = transactions
return { return {
"imported_count": len(transactions), "imported_count": len(transactions),
"converted_transactions": transactions, "converted_transactions": transactions,
"errors": errors "errors": errors,
} }
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/transactions/import/image") @app.post("/transactions/import/image")
async def import_transactions_from_image(file: UploadFile = File(...)): async def import_transactions_from_image(file: UploadFile = File(...)):
""" """
@@ -129,107 +143,125 @@ async def import_transactions_from_image(file: UploadFile = File(...)):
""" """
try: try:
# Validate file type # Validate file type
allowed_types = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'pdf'] allowed_types = ["jpg", "jpeg", "png", "gif", "bmp", "pdf"]
file_extension = file.filename.split('.')[-1].lower() file_extension = file.filename.split(".")[-1].lower()
if file_extension not in allowed_types: if file_extension not in allowed_types:
raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed: {allowed_types}") raise HTTPException(
status_code=400,
detail=f"Unsupported file type. Allowed: {allowed_types}",
)
# Read file content # Read file content
content = await file.read() content = await file.read()
# Save file to disk # Save file to disk
image_path = await document_processor.save_uploaded_file(content, file.filename) image_path = await document_processor.save_uploaded_file(content, file.filename)
# Extract transactions from image (pass file path) # Extract transactions from image (pass file path)
extraction_result = await document_processor.extract_transactions_from_image(image_path) extraction_result = await document_processor.extract_transactions_from_image(
image_path
)
if not extraction_result.get("extraction_success", False): if not extraction_result.get("extraction_success", False):
raise HTTPException(status_code=500, detail=extraction_result.get("error", "Extraction failed")) raise HTTPException(
status_code=500,
detail=extraction_result.get("error", "Extraction failed"),
)
extracted_transactions = extraction_result.get("transactions", []) extracted_transactions = extraction_result.get("transactions", [])
# Store transactions globally for auto-matching # Store transactions globally for auto-matching
global stored_transactions global stored_transactions
stored_transactions = [] stored_transactions = []
for idx, txn in enumerate(extracted_transactions): for idx, txn in enumerate(extracted_transactions):
try: try:
txn_id = f"img_{file.filename}_{idx+1}" txn_id = f"img_{file.filename}_{idx + 1}"
txn_date_raw = txn.get("date") txn_date_raw = txn.get("date")
amount = txn.get("amount") amount = txn.get("amount")
vendor = txn.get("vendor") vendor = txn.get("vendor")
memo = txn.get("memo", "") memo = txn.get("memo", "")
# Parse date to YYYY-MM-DD format # Parse date to YYYY-MM-DD format
txn_date = document_processor._parse_date_to_iso(txn_date_raw) txn_date = document_processor._parse_date_to_iso(txn_date_raw)
if not txn_date: if not txn_date:
# Fallback: use current year if parsing fails # Fallback: use current year if parsing fails
txn_date = f"2024-{txn_date_raw}" txn_date = f"2024-{txn_date_raw}"
stored_transactions.append({ stored_transactions.append(
"id": txn_id, {
"txn_date": txn_date, "id": txn_id,
"amount": amount, "txn_date": txn_date,
"payee_name": vendor, "amount": amount,
"memo": memo "payee_name": vendor,
}) "memo": memo,
except Exception as e: }
)
except Exception:
continue continue
return { return {
"imported_count": len(stored_transactions), "imported_count": len(stored_transactions),
"converted_transactions": stored_transactions, "converted_transactions": stored_transactions,
"errors": [] "errors": [],
} }
except Exception as e: except Exception as e:
logger.error(f"Error importing transactions from image: {str(e)}") logger.error(f"Error importing transactions from image: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# DOCUMENT PROCESSING ENDPOINTS # DOCUMENT PROCESSING ENDPOINTS
# ============================================================================ # ============================================================================
@app.post("/upload-multiple", response_model=List[DocumentUploadResponse])
async def upload_multiple_documents(files: List[UploadFile] = File(...)):
    """
    Upload multiple receipt images for processing.

    Every file's extension is validated before any file is stored, so a batch
    rejected for an unsupported type stores nothing in ``uploaded_files``.
    Each accepted file is then read, stored under a freshly generated UUID,
    and reported back so the ID can be used with the /process/{file_id}
    endpoint.

    Args:
        files: The uploaded files to register.

    Returns:
        One DocumentUploadResponse per file, in upload order.

    Raises:
        HTTPException: 400 when a file has no extension or an unsupported
            one; 500 when reading or storing a file fails unexpectedly.
    """
    allowed_types = ["jpg", "jpeg", "png", "gif", "bmp", "pdf"]

    try:
        # Pass 1: validate the whole batch so one bad file cannot leave
        # earlier files stored without their IDs ever reaching the client.
        validated = []
        for file in files:
            filename = file.filename or ""
            # Only the text after the last dot is an extension; a bare name
            # such as "pdf" has none and must not pass validation.
            file_extension = (
                filename.rpartition(".")[2].lower() if "." in filename else ""
            )
            if file_extension not in allowed_types:
                raise HTTPException(
                    status_code=400,
                    detail=f"Unsupported file type for {file.filename}. Allowed: {allowed_types}",
                )
            validated.append((file, file_extension))

        # Pass 2: read and store each validated file.
        responses = []
        for file, file_extension in validated:
            file_id = str(uuid.uuid4())
            content = await file.read()
            # One timestamp so the stored record and the response agree.
            upload_date = datetime.now()

            uploaded_files[file_id] = {
                "filename": file.filename,
                "content": content,
                "upload_date": upload_date,
            }
            responses.append(
                DocumentUploadResponse(
                    file_id=file_id,
                    filename=file.filename,
                    file_type=file_extension,
                    upload_date=upload_date,
                    status="uploaded",
                )
            )

        return responses

    except HTTPException:
        # Deliberate client errors (the 400 above) must not be masked as 500s.
        raise
    except Exception as e:
        logger.exception("Error uploading documents: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/process/{file_id}", response_model=DocumentProcessResponse) @app.post("/process/{file_id}", response_model=DocumentProcessResponse)
async def process_document(file_id: str): async def process_document(file_id: str):
""" """
Process a previously uploaded document to extract receipt information. Process a previously uploaded document to extract receipt information.
This endpoint uses AI to extract structured data from receipt images, This endpoint uses AI to extract structured data from receipt images,
including vendor, amount, date, and category information. including vendor, amount, date, and category information.
""" """
@@ -237,17 +269,19 @@ async def process_document(file_id: str):
# Check if file exists # Check if file exists
if file_id not in uploaded_files: if file_id not in uploaded_files:
raise HTTPException(status_code=404, detail=f"File {file_id} not found") raise HTTPException(status_code=404, detail=f"File {file_id} not found")
file_data = uploaded_files[file_id] file_data = uploaded_files[file_id]
# Save file temporarily and process it # Save file temporarily and process it
file_path = await document_processor.save_uploaded_file(file_data["content"], file_data["filename"]) file_path = await document_processor.save_uploaded_file(
file_type = file_data["filename"].split('.')[-1].lower() file_data["content"], file_data["filename"]
)
file_type = file_data["filename"].split(".")[-1].lower()
receipt_data = await document_processor.process_file(file_path, file_type) receipt_data = await document_processor.process_file(file_path, file_type)
# Store processed receipt # Store processed receipt
processed_receipts[file_id] = receipt_data processed_receipts[file_id] = receipt_data
return DocumentProcessResponse( return DocumentProcessResponse(
file_id=file_id, file_id=file_id,
extraction_success=receipt_data.get("extraction_success", False), extraction_success=receipt_data.get("extraction_success", False),
@@ -258,35 +292,40 @@ async def process_document(file_id: str):
date=receipt_data.get("date", ""), date=receipt_data.get("date", ""),
category=receipt_data.get("category", ""), category=receipt_data.get("category", ""),
confidence=receipt_data.get("confidence", 0.0), confidence=receipt_data.get("confidence", 0.0),
error=receipt_data.get("error", None) error=receipt_data.get("error", None),
) )
except Exception as e: except Exception as e:
logger.error(f"Error processing document {file_id}: {str(e)}") logger.error(f"Error processing document {file_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# MATCHING ENDPOINTS # MATCHING ENDPOINTS
# ============================================================================ # ============================================================================
@app.post("/match-specific", response_model=MatchingResponse) @app.post("/match-specific", response_model=MatchingResponse)
async def match_specific_receipts(file_ids: List[str]): async def match_specific_receipts(file_ids: List[str]):
""" """
Match specific receipts against imported transactions. Match specific receipts against imported transactions.
This endpoint takes a list of receipt file IDs and matches them against This endpoint takes a list of receipt file IDs and matches them against
the currently imported transactions using AI-powered matching logic. the currently imported transactions using AI-powered matching logic.
""" """
try: try:
logger.info(f"Starting match-specific for file IDs: {file_ids}") logger.info(f"Starting match-specific for file IDs: {file_ids}")
# Check if transactions are imported # Check if transactions are imported
if not stored_transactions: if not stored_transactions:
logger.warning("No transactions imported") logger.warning("No transactions imported")
raise HTTPException(status_code=400, detail="No transactions imported. Please upload CSV first.") raise HTTPException(
status_code=400,
detail="No transactions imported. Please upload CSV first.",
)
logger.info(f"Found {len(stored_transactions)} stored transactions") logger.info(f"Found {len(stored_transactions)} stored transactions")
# Convert stored transactions to Transaction objects # Convert stored transactions to Transaction objects
transactions = [] transactions = []
for txn in stored_transactions: for txn in stored_transactions:
@@ -297,32 +336,38 @@ async def match_specific_receipts(file_ids: List[str]):
transaction_date=txn_date, transaction_date=txn_date,
amount=txn["amount"], amount=txn["amount"],
vendor=txn["payee_name"], vendor=txn["payee_name"],
notes=txn["memo"] notes=txn["memo"],
) )
transactions.append(transaction) transactions.append(transaction)
except Exception as e: except Exception as e:
logger.warning(f"Error converting transaction {txn['id']}: {str(e)}") logger.warning(f"Error converting transaction {txn['id']}: {str(e)}")
continue continue
logger.info(f"Converted {len(transactions)} transactions") logger.info(f"Converted {len(transactions)} transactions")
# Get receipts for the specified file IDs # Get receipts for the specified file IDs
receipts = [] receipts = []
missing_files = [] missing_files = []
for file_id in file_ids: for file_id in file_ids:
if file_id in processed_receipts: if file_id in processed_receipts:
receipt_data = processed_receipts[file_id] receipt_data = processed_receipts[file_id]
logger.info(f"DEBUG: receipt_data for {file_id}: {receipt_data}") logger.info(f"DEBUG: receipt_data for {file_id}: {receipt_data}")
logger.info(f"DEBUG: receipt_data keys for {file_id}: {list(receipt_data.keys())}") logger.info(
f"DEBUG: receipt_data keys for {file_id}: {list(receipt_data.keys())}"
)
try: try:
# Handle missing date field # Handle missing date field
if "date" not in receipt_data or not receipt_data["date"]: if "date" not in receipt_data or not receipt_data["date"]:
logger.warning(f"Missing date for receipt {file_id}, using current date") logger.warning(
f"Missing date for receipt {file_id}, using current date"
)
receipt_date = datetime.now() receipt_date = datetime.now()
else: else:
receipt_date = datetime.strptime(receipt_data["date"], "%Y-%m-%d") receipt_date = datetime.strptime(
receipt_data["date"], "%Y-%m-%d"
)
# Handle missing amount field - try multiple possible keys # Handle missing amount field - try multiple possible keys
amount = receipt_data.get("amount") amount = receipt_data.get("amount")
if amount is None: if amount is None:
@@ -330,37 +375,43 @@ async def match_specific_receipts(file_ids: List[str]):
if amount is None: if amount is None:
amount = receipt_data.get("amount_total") amount = receipt_data.get("amount_total")
if amount is None: if amount is None:
logger.warning(f"Missing amount for receipt {file_id}, using 0.0") logger.warning(
f"Missing amount for receipt {file_id}, using 0.0"
)
amount = 0.0 amount = 0.0
# Ensure amount is a float # Ensure amount is a float
try: try:
amount = float(amount) amount = float(amount)
except (ValueError, TypeError): except (ValueError, TypeError):
logger.warning(f"Invalid amount '{amount}' for receipt {file_id}, using 0.0") logger.warning(
f"Invalid amount '{amount}' for receipt {file_id}, using 0.0"
)
amount = 0.0 amount = 0.0
logger.info(f"DEBUG: amount for {file_id}: {amount}") logger.info(f"DEBUG: amount for {file_id}: {amount}")
# Handle missing vendor field # Handle missing vendor field
vendor = receipt_data.get("vendor", "") vendor = receipt_data.get("vendor", "")
if not vendor: if not vendor:
logger.warning(f"Missing vendor for receipt {file_id}, using 'Unknown'") logger.warning(
f"Missing vendor for receipt {file_id}, using 'Unknown'"
)
vendor = "Unknown" vendor = "Unknown"
# Handle missing category field # Handle missing category field
category = receipt_data.get("category", "Other") category = receipt_data.get("category", "Other")
# Handle description field # Handle description field
description = receipt_data.get("description", "") description = receipt_data.get("description", "")
# Handle tax field # Handle tax field
tax = receipt_data.get("tax", receipt_data.get("tax_amount", 0.0)) tax = receipt_data.get("tax", receipt_data.get("tax_amount", 0.0))
try: try:
tax = float(tax) tax = float(tax)
except (ValueError, TypeError): except (ValueError, TypeError):
tax = 0.0 tax = 0.0
receipt = Receipt( receipt = Receipt(
id=file_id, id=file_id,
file_name=uploaded_files[file_id]["filename"], file_name=uploaded_files[file_id]["filename"],
@@ -370,35 +421,47 @@ async def match_specific_receipts(file_ids: List[str]):
tax=tax, tax=tax,
vendor=vendor, vendor=vendor,
category=category, category=category,
description=description description=description,
) )
receipts.append(receipt) receipts.append(receipt)
logger.info(f"Added receipt: {receipt.vendor} - ${receipt.amount}") logger.info(f"Added receipt: {receipt.vendor} - ${receipt.amount}")
except Exception as e: except Exception as e:
logger.warning(f"Error creating receipt object for {file_id}: {str(e)}") logger.warning(
f"Error creating receipt object for {file_id}: {str(e)}"
)
missing_files.append(f"{file_id} (error: {str(e)})") missing_files.append(f"{file_id} (error: {str(e)})")
else: else:
logger.warning(f"Receipt {file_id} not found in processed_receipts") logger.warning(f"Receipt {file_id} not found in processed_receipts")
missing_files.append(f"{file_id} (not found)") missing_files.append(f"{file_id} (not found)")
if missing_files: if missing_files:
logger.error(f"Missing files: {missing_files}") logger.error(f"Missing files: {missing_files}")
raise HTTPException(status_code=400, detail=f"Missing files: {missing_files}") raise HTTPException(
status_code=400, detail=f"Missing files: {missing_files}"
logger.info(f"Processing {len(receipts)} receipts against {len(transactions)} transactions") )
logger.info(
f"Processing {len(receipts)} receipts against {len(transactions)} transactions"
)
# Perform matching # Perform matching
try: try:
logger.info("Starting direct matching call (without ThreadPoolExecutor)") logger.info("Starting direct matching call (without ThreadPoolExecutor)")
logger.info(f"matching_engine type: {type(matching_engine)}") logger.info(f"matching_engine type: {type(matching_engine)}")
logger.info(f"matching_engine.process_matching type: {type(matching_engine.process_matching)}") logger.info(
f"matching_engine.process_matching type: {type(matching_engine.process_matching)}"
)
logger.info(f"receipts type: {type(receipts)}, length: {len(receipts)}") logger.info(f"receipts type: {type(receipts)}, length: {len(receipts)}")
logger.info(f"transactions type: {type(transactions)}, length: {len(transactions)}") logger.info(
f"transactions type: {type(transactions)}, length: {len(transactions)}"
)
matches = matching_engine.process_matching(receipts, transactions) matches = matching_engine.process_matching(receipts, transactions)
logger.info(f"Matching completed successfully. Found {len(matches)} matches") logger.info(
f"Matching completed successfully. Found {len(matches)} matches"
)
# Convert matches to response format # Convert matches to response format
match_responses = [] match_responses = []
for match in matches: for match in matches:
@@ -411,7 +474,7 @@ async def match_specific_receipts(file_ids: List[str]):
logger.info(f" receipt_amount: {match.receipt.amount}") logger.info(f" receipt_amount: {match.receipt.amount}")
logger.info(f" transaction_vendor: {match.transaction.vendor}") logger.info(f" transaction_vendor: {match.transaction.vendor}")
logger.info(f" transaction_amount: {match.transaction.amount}") logger.info(f" transaction_amount: {match.transaction.amount}")
match_response = MatchResponse( match_response = MatchResponse(
receipt_id=match.receipt.id, receipt_id=match.receipt.id,
transaction_id=match.transaction.id, transaction_id=match.transaction.id,
@@ -423,53 +486,62 @@ async def match_specific_receipts(file_ids: List[str]):
receipt_category=match.receipt.category, receipt_category=match.receipt.category,
receipt_tax_amount=match.receipt.tax, receipt_tax_amount=match.receipt.tax,
transaction_vendor=match.transaction.vendor, transaction_vendor=match.transaction.vendor,
transaction_amount=match.transaction.amount transaction_amount=match.transaction.amount,
) )
match_responses.append(match_response) match_responses.append(match_response)
logger.info(f"Successfully created MatchResponse for {match.receipt.vendor} -> {match.transaction.vendor}") logger.info(
f"Successfully created MatchResponse for {match.receipt.vendor} -> {match.transaction.vendor}"
)
logger.info(f"Formatted {len(match_responses)} match responses") logger.info(f"Formatted {len(match_responses)} match responses")
# Calculate statistics # Calculate statistics
if match_responses: if match_responses:
high_confidence = sum(1 for m in match_responses if m.confidence_score >= 0.8) high_confidence = sum(
1 for m in match_responses if m.confidence_score >= 0.8
)
low_confidence = len(match_responses) - high_confidence low_confidence = len(match_responses) - high_confidence
avg_score = sum(m.confidence_score for m in match_responses) / len(match_responses) avg_score = sum(m.confidence_score for m in match_responses) / len(
match_responses
)
else: else:
high_confidence = low_confidence = avg_score = 0 high_confidence = low_confidence = avg_score = 0
stats = { stats = {
"total": len(match_responses), "total": len(match_responses),
"high_confidence": high_confidence, "high_confidence": high_confidence,
"low_confidence": low_confidence, "low_confidence": low_confidence,
"avg_score": round(avg_score, 2) "avg_score": round(avg_score, 2),
} }
logger.info(f"Generated stats: {stats}") logger.info(f"Generated stats: {stats}")
logger.info(f"Match-specific completed successfully with {len(match_responses)} matches") logger.info(
f"Match-specific completed successfully with {len(match_responses)} matches"
return MatchingResponse(
matches=match_responses,
stats=stats
) )
return MatchingResponse(matches=match_responses, stats=stats)
except Exception as e: except Exception as e:
logger.error(f"Exception in matching section: {str(e)}") logger.error(f"Exception in matching section: {str(e)}")
logger.error(f"Exception type: {type(e)}") logger.error(f"Exception type: {type(e)}")
logger.error(f"Exception args: {e.args}") logger.error(f"Exception args: {e.args}")
logger.error(f"Traceback: {e.__traceback__}") logger.error(f"Traceback: {e.__traceback__}")
raise HTTPException(status_code=500, detail=f"Unexpected matching error: {str(e)}") raise HTTPException(
status_code=500, detail=f"Unexpected matching error: {str(e)}"
)
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f"Unexpected error in match_specific_receipts: {str(e)}") logger.error(f"Unexpected error in match_specific_receipts: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# RULES MANAGEMENT ENDPOINTS # RULES MANAGEMENT ENDPOINTS
# ============================================================================ # ============================================================================
@app.post("/rules") @app.post("/rules")
async def add_rule(request: RuleRequest): async def add_rule(request: RuleRequest):
""" """
@@ -480,16 +552,17 @@ async def add_rule(request: RuleRequest):
name=request.name, name=request.name,
condition=request.condition, condition=request.condition,
action=request.action, action=request.action,
source=request.source source=request.source,
) )
matching_engine.rules_engine.rules.append(new_rule) matching_engine.rules_engine.rules.append(new_rule)
return {"message": f"Rule '{request.name}' added successfully"} return {"message": f"Rule '{request.name}' added successfully"}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/rules") @app.get("/rules")
async def get_rules(): async def get_rules():
""" """
@@ -498,19 +571,22 @@ async def get_rules():
try: try:
rules = [] rules = []
for rule in matching_engine.rules_engine.rules: for rule in matching_engine.rules_engine.rules:
rules.append({ rules.append(
"name": rule.name, {
"condition": rule.condition, "name": rule.name,
"action": rule.action, "condition": rule.condition,
"source": rule.source, "action": rule.action,
"status": rule.status "source": rule.source,
}) "status": rule.status,
}
)
return {"rules": rules} return {"rules": rules}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.delete("/rules/{rule_name}") @app.delete("/rules/{rule_name}")
async def delete_rule(rule_name: str): async def delete_rule(rule_name: str):
""" """
@@ -522,18 +598,20 @@ async def delete_rule(rule_name: str):
if rule.name == rule_name: if rule.name == rule_name:
del rules[i] del rules[i]
return {"message": f"Rule '{rule_name}' deleted successfully"} return {"message": f"Rule '{rule_name}' deleted successfully"}
raise HTTPException(status_code=404, detail=f"Rule '{rule_name}' not found") raise HTTPException(status_code=404, detail=f"Rule '{rule_name}' not found")
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# STATISTICS ENDPOINT # STATISTICS ENDPOINT
# ============================================================================ # ============================================================================
@app.get("/stats") @app.get("/stats")
async def get_stats(): async def get_stats():
""" """
@@ -544,12 +622,14 @@ async def get_stats():
"total_transactions": len(stored_transactions), "total_transactions": len(stored_transactions),
"total_receipts": len(processed_receipts), "total_receipts": len(processed_receipts),
"total_uploaded_files": len(uploaded_files), "total_uploaded_files": len(uploaded_files),
"rules_count": len(matching_engine.rules_engine.rules) "rules_count": len(matching_engine.rules_engine.rules),
} }
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8343) uvicorn.run(app, host="0.0.0.0", port=8343)
+36 -24
View File
@@ -1,46 +1,53 @@
from typing import List, Dict, Any from typing import Any, Dict, List
from datetime import datetime
from ai_matcher import AIMatcher from ai_matcher import AIMatcher
from ai_rules import AIRulesEngine from ai_rules import AIRulesEngine
from feedback_logger import FeedbackLogger from feedback_logger import FeedbackLogger
from models import Receipt, Transaction, Match from models import Match, Receipt, Transaction
class MatchingEngine: class MatchingEngine:
def __init__(self): def __init__(self):
self.ai_matcher = AIMatcher() self.ai_matcher = AIMatcher()
self.rules_engine = AIRulesEngine() self.rules_engine = AIRulesEngine()
self.feedback_logger = FeedbackLogger() self.feedback_logger = FeedbackLogger()
def process_matching(
    self, receipts: List[Receipt], transactions: List[Transaction]
) -> List[Match]:
    """Match receipts to transactions, then post-process every match with rules.

    The AI matcher produces the candidate matches; each one is passed through
    ``_enhance_match_with_rules`` and the results are returned in the same
    order the matcher produced them.
    """
    candidates = self.ai_matcher.match_receipts_to_transactions(
        receipts, transactions
    )
    return [self._enhance_match_with_rules(candidate) for candidate in candidates]
def _enhance_match_with_rules(self, match: Match) -> Match: def _enhance_match_with_rules(self, match: Match) -> Match:
rule_results = self.rules_engine.apply_rules(match.receipt, match.transaction) rule_results = self.rules_engine.apply_rules(match.receipt, match.transaction)
# Apply confidence boost from rules # Apply confidence boost from rules
if rule_results["confidence_boost"] > 0: if rule_results["confidence_boost"] > 0:
match.confidence_score = min(1.0, match.confidence_score + rule_results["confidence_boost"]) match.confidence_score = min(
1.0, match.confidence_score + rule_results["confidence_boost"]
)
# Auto-approve if rules say so # Auto-approve if rules say so
if rule_results["auto_approve"]: if rule_results["auto_approve"]:
match.confidence_score = 1.0 match.confidence_score = 1.0
match.match_reason += " (Auto-approved by rules)" match.match_reason += " (Auto-approved by rules)"
# Add tax analysis to match # Add tax analysis to match
if rule_results.get("tax_analysis"): if rule_results.get("tax_analysis"):
match.tax_analysis = rule_results["tax_analysis"] match.tax_analysis = rule_results["tax_analysis"]
return match return match
def approve_match(self, match: Match, user_id: str): def approve_match(self, match: Match, user_id: str):
# Log the approval # Log the approval
self.feedback_logger.log_override( self.feedback_logger.log_override(
@@ -48,9 +55,9 @@ class MatchingEngine:
original_match=f"AI Score: {match.confidence_score}", original_match=f"AI Score: {match.confidence_score}",
correction="Approved", correction="Approved",
reason="User approved match", reason="User approved match",
user_id=user_id user_id=user_id,
) )
def reject_match(self, match: Match, reason: str, user_id: str): def reject_match(self, match: Match, reason: str, user_id: str):
# Log the rejection # Log the rejection
self.feedback_logger.log_override( self.feedback_logger.log_override(
@@ -58,20 +65,25 @@ class MatchingEngine:
original_match=f"AI Score: {match.confidence_score}", original_match=f"AI Score: {match.confidence_score}",
correction="Rejected", correction="Rejected",
reason=reason, reason=reason,
user_id=user_id user_id=user_id,
) )
def get_matching_stats(self, matches: List[Match]) -> Dict[str, Any]:
    """Summarise a list of matches by confidence.

    Returns a dict with the match count, how many scores are at or above
    0.8 (``high_confidence``), how many are below it (``low_confidence``),
    and the mean score rounded to three decimals. An empty or missing list
    yields all zeros.
    """
    summary = {
        "total": 0,
        "high_confidence": 0,
        "low_confidence": 0,
        "avg_score": 0,
    }
    if matches:
        scores = [entry.confidence_score for entry in matches]
        summary["total"] = len(matches)
        summary["high_confidence"] = sum(1 for score in scores if score >= 0.8)
        summary["low_confidence"] = sum(1 for score in scores if score < 0.8)
        summary["avg_score"] = round(sum(scores) / len(matches), 3)
    return summary
+16 -16
View File
@@ -1,16 +1,16 @@
groq>=0.5.0 groq
python-dotenv==1.0.0 python-dotenv
pandas==2.1.4 pandas
numpy==1.24.3 numpy
fastapi==0.104.1 fastapi
uvicorn==0.24.0 uvicorn
pydantic==2.5.0 pydantic
requests==2.31.0 requests
python-multipart==0.0.6 python-multipart
Pillow==10.0.1 Pillow
PyPDF2==3.0.1 PyPDF2
aiofiles==23.2.1 aiofiles
google-auth==2.23.4 google-auth
google-auth-oauthlib==1.1.0 google-auth-oauthlib
google-auth-httplib2==0.1.1 google-auth-httplib2
google-api-python-client==2.108.0 google-api-python-client
+75 -70
View File
@@ -1,13 +1,14 @@
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
from models import Receipt, Transaction, Address, Asset
import logging import logging
from typing import Any, Dict, Optional
from models import Address, Asset, Receipt, Transaction
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TaxRulesEngine: class TaxRulesEngine:
"""Engine to handle tax calculations based on the four tax rules""" """Engine to handle tax calculations based on the four tax rules"""
# Provincial tax rates (simplified - in production, use a tax rate API) # Provincial tax rates (simplified - in production, use a tax rate API)
PROVINCIAL_TAX_RATES = { PROVINCIAL_TAX_RATES = {
"ON": 0.13, # Ontario HST "ON": 0.13, # Ontario HST
@@ -24,10 +25,10 @@ class TaxRulesEngine:
"NU": 0.05, # Nunavut "NU": 0.05, # Nunavut
"YT": 0.05, # Yukon "YT": 0.05, # Yukon
} }
def __init__(self): def __init__(self):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
def apply_sales_tax_rule(self, receipt: Receipt) -> Dict[str, Any]: def apply_sales_tax_rule(self, receipt: Receipt) -> Dict[str, Any]:
""" """
Sales Tax Rule: Apply correct sales tax based on billing vs shipping addresses Sales Tax Rule: Apply correct sales tax based on billing vs shipping addresses
@@ -35,43 +36,45 @@ class TaxRulesEngine:
try: try:
# Determine which address to use for tax calculation # Determine which address to use for tax calculation
tax_address = self._get_tax_address(receipt) tax_address = self._get_tax_address(receipt)
if not tax_address: if not tax_address:
return { return {
"success": False, "success": False,
"error": "No valid address found for tax calculation", "error": "No valid address found for tax calculation",
"calculated_tax": 0.0, "calculated_tax": 0.0,
"tax_rate": 0.0 "tax_rate": 0.0,
} }
# Get tax rate for the province # Get tax rate for the province
tax_rate = self.PROVINCIAL_TAX_RATES.get(tax_address.province, 0.0) tax_rate = self.PROVINCIAL_TAX_RATES.get(tax_address.province, 0.0)
# Calculate tax amount # Calculate tax amount
calculated_tax = receipt.amount * tax_rate calculated_tax = receipt.amount * tax_rate
return { return {
"success": True, "success": True,
"calculated_tax": calculated_tax, "calculated_tax": calculated_tax,
"tax_rate": tax_rate, "tax_rate": tax_rate,
"tax_address": tax_address.province, "tax_address": tax_address.province,
"rule_applied": "Sales Tax Rule" "rule_applied": "Sales Tax Rule",
} }
except Exception as e: except Exception as e:
self.logger.error(f"Error applying sales tax rule: {str(e)}") self.logger.error(f"Error applying sales tax rule: {str(e)}")
return { return {
"success": False, "success": False,
"error": str(e), "error": str(e),
"calculated_tax": 0.0, "calculated_tax": 0.0,
"tax_rate": 0.0 "tax_rate": 0.0,
} }
def _get_tax_address(self, receipt: Receipt) -> Optional[Address]: def _get_tax_address(self, receipt: Receipt) -> Optional[Address]:
"""Determine which address to use for tax calculation""" """Determine which address to use for tax calculation"""
# Rule: Use shipping address if different from billing, otherwise use billing # Rule: Use shipping address if different from billing, otherwise use billing
if receipt.shipping_address and receipt.billing_address: if receipt.shipping_address and receipt.billing_address:
if self._addresses_different(receipt.billing_address, receipt.shipping_address): if self._addresses_different(
receipt.billing_address, receipt.shipping_address
):
return receipt.shipping_address return receipt.shipping_address
else: else:
return receipt.billing_address return receipt.billing_address
@@ -81,14 +84,18 @@ class TaxRulesEngine:
return receipt.billing_address return receipt.billing_address
else: else:
return None return None
def _addresses_different(self, billing: Address, shipping: Address) -> bool: def _addresses_different(self, billing: Address, shipping: Address) -> bool:
"""Check if billing and shipping addresses are different""" """Check if billing and shipping addresses are different"""
return (billing.province != shipping.province or return (
billing.city != shipping.city or billing.province != shipping.province
billing.postal_code != shipping.postal_code) or billing.city != shipping.city
or billing.postal_code != shipping.postal_code
def apply_fx_rule(self, receipt: Receipt, transaction: Transaction) -> Dict[str, Any]: )
def apply_fx_rule(
self, receipt: Receipt, transaction: Transaction
) -> Dict[str, Any]:
""" """
Foreign Exchange Rule: Handle currency mismatches Foreign Exchange Rule: Handle currency mismatches
""" """
@@ -96,7 +103,7 @@ class TaxRulesEngine:
# Check for currency mismatch # Check for currency mismatch
if receipt.currency != transaction.currency: if receipt.currency != transaction.currency:
fx_discrepancy = abs(receipt.amount - abs(transaction.amount)) fx_discrepancy = abs(receipt.amount - abs(transaction.amount))
return { return {
"success": True, "success": True,
"fx_discrepancy": fx_discrepancy, "fx_discrepancy": fx_discrepancy,
@@ -105,26 +112,28 @@ class TaxRulesEngine:
"receipt_amount": receipt.amount, "receipt_amount": receipt.amount,
"transaction_amount": abs(transaction.amount), "transaction_amount": abs(transaction.amount),
"requires_manual_review": True, "requires_manual_review": True,
"rule_applied": "Foreign Exchange Rule" "rule_applied": "Foreign Exchange Rule",
} }
else: else:
return { return {
"success": True, "success": True,
"fx_discrepancy": 0.0, "fx_discrepancy": 0.0,
"requires_manual_review": False, "requires_manual_review": False,
"rule_applied": "No FX Rule (same currency)" "rule_applied": "No FX Rule (same currency)",
} }
except Exception as e: except Exception as e:
self.logger.error(f"Error applying FX rule: {str(e)}") self.logger.error(f"Error applying FX rule: {str(e)}")
return { return {
"success": False, "success": False,
"error": str(e), "error": str(e),
"fx_discrepancy": 0.0, "fx_discrepancy": 0.0,
"requires_manual_review": False "requires_manual_review": False,
} }
def calculate_straight_line_depreciation(self, asset: Asset, year: int) -> Dict[str, Any]: def calculate_straight_line_depreciation(
self, asset: Asset, year: int
) -> Dict[str, Any]:
""" """
Straight-Line Depreciation for accounting purposes Straight-Line Depreciation for accounting purposes
""" """
@@ -133,28 +142,26 @@ class TaxRulesEngine:
return { return {
"success": False, "success": False,
"error": f"Year {year} exceeds useful life of {asset.useful_life_years} years", "error": f"Year {year} exceeds useful life of {asset.useful_life_years} years",
"depreciation": 0.0 "depreciation": 0.0,
} }
# Straight-line formula: (Cost - Residual Value) / Useful Life # Straight-line formula: (Cost - Residual Value) / Useful Life
annual_depreciation = (asset.purchase_amount - asset.residual_value) / asset.useful_life_years annual_depreciation = (
asset.purchase_amount - asset.residual_value
) / asset.useful_life_years
return { return {
"success": True, "success": True,
"depreciation": annual_depreciation, "depreciation": annual_depreciation,
"book_value": asset.purchase_amount - (annual_depreciation * year), "book_value": asset.purchase_amount - (annual_depreciation * year),
"method": "Straight-Line", "method": "Straight-Line",
"rule_applied": "Depreciation Rule (Accounting)" "rule_applied": "Depreciation Rule (Accounting)",
} }
except Exception as e: except Exception as e:
self.logger.error(f"Error calculating straight-line depreciation: {str(e)}") self.logger.error(f"Error calculating straight-line depreciation: {str(e)}")
return { return {"success": False, "error": str(e), "depreciation": 0.0}
"success": False,
"error": str(e),
"depreciation": 0.0
}
def calculate_cca_depreciation(self, asset: Asset, year: int) -> Dict[str, Any]: def calculate_cca_depreciation(self, asset: Asset, year: int) -> Dict[str, Any]:
""" """
CCA (Capital Cost Allowance) Depreciation for tax purposes CCA (Capital Cost Allowance) Depreciation for tax purposes
@@ -164,40 +171,36 @@ class TaxRulesEngine:
return { return {
"success": False, "success": False,
"error": "Year must be at least 1", "error": "Year must be at least 1",
"depreciation": 0.0 "depreciation": 0.0,
} }
# CCA uses declining balance method # CCA uses declining balance method
book_value = asset.purchase_amount book_value = asset.purchase_amount
total_depreciation = 0.0 total_depreciation = 0.0
for current_year in range(1, year + 1): for current_year in range(1, year + 1):
# CCA is calculated on the declining balance # CCA is calculated on the declining balance
cca_amount = book_value * asset.cca_rate cca_amount = book_value * asset.cca_rate
book_value -= cca_amount book_value -= cca_amount
total_depreciation += cca_amount total_depreciation += cca_amount
# Stop if book value reaches residual value # Stop if book value reaches residual value
if book_value <= asset.residual_value: if book_value <= asset.residual_value:
break break
return { return {
"success": True, "success": True,
"depreciation": cca_amount, # Current year depreciation "depreciation": cca_amount, # Current year depreciation
"total_depreciation": total_depreciation, "total_depreciation": total_depreciation,
"book_value": max(book_value, asset.residual_value), "book_value": max(book_value, asset.residual_value),
"method": "CCA Declining Balance", "method": "CCA Declining Balance",
"rule_applied": "Depreciation Rule (Tax)" "rule_applied": "Depreciation Rule (Tax)",
} }
except Exception as e: except Exception as e:
self.logger.error(f"Error calculating CCA depreciation: {str(e)}") self.logger.error(f"Error calculating CCA depreciation: {str(e)}")
return { return {"success": False, "error": str(e), "depreciation": 0.0}
"success": False,
"error": str(e),
"depreciation": 0.0
}
def apply_meals_entertainment_rule(self, receipt: Receipt) -> Dict[str, Any]: def apply_meals_entertainment_rule(self, receipt: Receipt) -> Dict[str, Any]:
""" """
Meals & Entertainment Tax Deduction Rule Meals & Entertainment Tax Deduction Rule
@@ -208,36 +211,38 @@ class TaxRulesEngine:
"success": True, "success": True,
"tax_deduction": receipt.amount, "tax_deduction": receipt.amount,
"accounting_deduction": receipt.amount, "accounting_deduction": receipt.amount,
"rule_applied": "No M&E Rule (not meals/entertainment)" "rule_applied": "No M&E Rule (not meals/entertainment)",
} }
# For tax purposes: 50% deductible # For tax purposes: 50% deductible
tax_deduction = receipt.amount * 0.5 tax_deduction = receipt.amount * 0.5
# For accounting purposes: 100% deductible # For accounting purposes: 100% deductible
accounting_deduction = receipt.amount accounting_deduction = receipt.amount
# Sales tax is fully deductible for accounting # Sales tax is fully deductible for accounting
tax_on_meal = receipt.tax tax_on_meal = receipt.tax
return { return {
"success": True, "success": True,
"tax_deduction": tax_deduction, "tax_deduction": tax_deduction,
"accounting_deduction": accounting_deduction, "accounting_deduction": accounting_deduction,
"tax_on_meal": tax_on_meal, "tax_on_meal": tax_on_meal,
"rule_applied": "Meals & Entertainment Rule" "rule_applied": "Meals & Entertainment Rule",
} }
except Exception as e: except Exception as e:
self.logger.error(f"Error applying meals & entertainment rule: {str(e)}") self.logger.error(f"Error applying meals & entertainment rule: {str(e)}")
return { return {
"success": False, "success": False,
"error": str(e), "error": str(e),
"tax_deduction": 0.0, "tax_deduction": 0.0,
"accounting_deduction": 0.0 "accounting_deduction": 0.0,
} }
def apply_all_tax_rules(self, receipt: Receipt, transaction: Transaction = None) -> Dict[str, Any]: def apply_all_tax_rules(
self, receipt: Receipt, transaction: Transaction = None
) -> Dict[str, Any]:
""" """
Apply all tax rules to a receipt Apply all tax rules to a receipt
""" """
@@ -246,26 +251,26 @@ class TaxRulesEngine:
"rules_applied": [], "rules_applied": [],
"sales_tax": {}, "sales_tax": {},
"fx_analysis": {}, "fx_analysis": {},
"meals_entertainment": {} "meals_entertainment": {},
} }
# Apply Sales Tax Rule # Apply Sales Tax Rule
sales_tax_result = self.apply_sales_tax_rule(receipt) sales_tax_result = self.apply_sales_tax_rule(receipt)
results["sales_tax"] = sales_tax_result results["sales_tax"] = sales_tax_result
if sales_tax_result["success"]: if sales_tax_result["success"]:
results["rules_applied"].append("Sales Tax Rule") results["rules_applied"].append("Sales Tax Rule")
# Apply FX Rule (if transaction provided) # Apply FX Rule (if transaction provided)
if transaction: if transaction:
fx_result = self.apply_fx_rule(receipt, transaction) fx_result = self.apply_fx_rule(receipt, transaction)
results["fx_analysis"] = fx_result results["fx_analysis"] = fx_result
if fx_result["success"]: if fx_result["success"]:
results["rules_applied"].append("Foreign Exchange Rule") results["rules_applied"].append("Foreign Exchange Rule")
# Apply Meals & Entertainment Rule # Apply Meals & Entertainment Rule
me_result = self.apply_meals_entertainment_rule(receipt) me_result = self.apply_meals_entertainment_rule(receipt)
results["meals_entertainment"] = me_result results["meals_entertainment"] = me_result
if me_result["success"]: if me_result["success"]:
results["rules_applied"].append("Meals & Entertainment Rule") results["rules_applied"].append("Meals & Entertainment Rule")
return results return results