first commit

This commit is contained in:
bolade
2025-08-05 22:25:51 +01:00
commit 5b3c066cea
14 changed files with 2670 additions and 0 deletions
+229
View File
@@ -0,0 +1,229 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be added to the global gitignore or merged into this project gitignore. For a PyCharm
# project, it is recommended to include the following files:
# .idea/
# *.iml
# *.ipr
# *.iws
# VS Code
.vscode/
# macOS
.DS_Store
.AppleDouble
.LSOverride
# Windows
Thumbs.db
ehthumbs.db
Desktop.ini
# Linux
*~
# Temporary files
*.tmp
*.temp
*.swp
*.swo
*~
# Log files
*.log
# Database files
*.db
*.sqlite
*.sqlite3
# Configuration files with sensitive data
config.ini
secrets.json
.env.local
.env.production
# Test files
test_*.py
*_test.py
tests/
# Documentation
docs/
*.md
!README.md
# IDE files
.idea/
.vscode/
*.sublime-*
.atom/
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
uploads/
chequing statement.csv
test_images/
.cursorrules.md
+262
View File
@@ -0,0 +1,262 @@
# AI Bookkeeper - Data Science Engine
AI-powered receipt-to-transaction matching engine using Groq LLM. This is a **Data Science Engine** that provides intelligent matching capabilities for backend applications.
## 🎯 Purpose
This Data Science Engine receives QuickBooks transaction data from backend applications and provides:
- **AI-powered receipt processing** (OCR and data extraction)
- **Intelligent receipt-transaction matching** with confidence scores
- **Configurable AI rules** for business logic
- **Feedback logging** for continuous improvement
- **RESTful API** for easy integration
## 🚀 Quick Start
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
### 2. Configure API Keys
Create a `.env` file in the project root with your Groq API key:
```bash
# Create .env file
echo "GROQ_API_KEY=your_actual_groq_api_key_here" > .env
```
**Important**: Get your API key from [Groq Console](https://console.groq.com/)
### 3. Start the Server
```bash
# Option 1: Using the main script
python main.py
# Option 2: Using uvicorn directly
uvicorn main:app --host 0.0.0.0 --port 8343 --reload
```
### 4. Access API Documentation
- **Swagger UI**: http://localhost:8343/docs
- **ReDoc**: http://localhost:8343/redoc
## 📋 API Endpoints
### Transaction Import
- `POST /transactions/import/csv` - Import transactions from CSV file
- `POST /transactions/import/image` - Import transactions from image/PDF
### Receipt Processing
- `POST /upload-multiple` - Upload multiple receipt documents
- `POST /process/{file_id}` - Extract data from uploaded documents
### AI Matching Engine
- `POST /match-specific` - Match specific receipts to transactions using AI
### AI Rules Management
- `POST /rules` - Add new AI rules
- `GET /rules` - List all active rules
- `DELETE /rules/{rule_name}` - Delete rules
### System Monitoring
- `GET /stats` - Get system statistics and performance metrics
- `GET /` - Health check endpoint
## 🔧 Core Components
### **AIMatcher** (`ai_matcher.py`)
- Uses Groq LLM to compare receipts and transactions
- Provides confidence scores and reasoning
- Configurable matching criteria (amount, date, vendor)
- Rate limiting to prevent API quota exhaustion
### **AIRulesEngine** (`ai_rules.py`)
- Applies business rules for auto-approval and categorization
- Configurable rule conditions and actions
- Supports system and user-generated rules
- Safe condition evaluation with proper error handling
### **DocumentProcessor** (`document_processor.py`)
- AI-powered receipt data extraction using Groq vision model
- Supports PDF and image formats
- Robust JSON parsing with error handling
- Extracts vendor, amount, date, tax, and category information
### **MatchingEngine** (`matching_engine.py`)
- Main orchestrator combining all components
- Handles the complete matching workflow
- Provides statistics and feedback logging
- Configurable confidence thresholds
### **FeedbackLogger** (`feedback_logger.py`)
- Tracks manual overrides for AI training
- Maintains audit trail of user decisions
- Enables continuous model improvement
## 📊 Configuration
Edit `config.py` to adjust:
- **Confidence threshold** (default: 0.3)
- **Date tolerance days** (default: 7)
- **Amount tolerance percent** (default: 5%)
- **Groq API key** (from environment variable)
## 🔄 Integration Workflow
### 1. Import Transactions
```bash
# Import from CSV
curl -X POST -F "file=@transactions.csv" http://localhost:8343/transactions/import/csv
# Import from image
curl -X POST -F "file=@statement.jpg" http://localhost:8343/transactions/import/image
```
### 2. Upload and Process Receipts
```bash
# Upload receipts
curl -X POST -F "files=@receipt1.jpg" -F "files=@receipt2.jpg" http://localhost:8343/upload-multiple
# Process a specific receipt
curl -X POST http://localhost:8343/process/{file_id}
```
### 3. AI Matching
```bash
# Match specific receipts
curl -X POST -H "Content-Type: application/json" \
-d '["file_id_1", "file_id_2"]' \
http://localhost:8343/match-specific
```
### 4. Check Results
```bash
# Get system stats
curl http://localhost:8343/stats
# View AI rules
curl http://localhost:8343/rules
```
## 🎯 Key Features
- **AI-powered matching** with confidence scores
- **Rule-based auto-approval** and categorization
- **Feedback logging** for continuous improvement
- **Configurable matching parameters**
- **RESTful JSON API** for easy backend integration
- **Comprehensive error handling**
- **Rate limiting** to prevent API quota exhaustion
- **Robust JSON parsing** for AI responses
## 📝 Data Formats
### Transaction Input (CSV)
```csv
Date,Description,Amount,Category
2024-01-15,Starbucks Coffee,12.50,Food & Dining
2024-01-16,Office Supplies,45.99,Office
```
### Receipt Processing Output
```json
{
"vendor": "Starbucks",
"total_amount": 12.50,
"tax_amount": 1.25,
"date": "2024-01-15",
"category": "Food & Dining",
"confidence": 0.95,
"extraction_success": true
}
```
### Match Result Output
```json
{
"receipt_id": "uuid",
"transaction_id": "transaction_123",
"confidence_score": 0.95,
"match_reason": "Same vendor, minor date difference (Auto-approved by rules)",
"receipt_vendor": "Starbucks",
"receipt_amount": 12.50,
"transaction_vendor": "STARBUCKS",
"transaction_amount": 12.50
}
```
## 🔍 AI Matching Criteria
The engine uses multiple criteria for matching:
1. **Amount Similarity** - Compares receipt and transaction amounts (5% tolerance)
2. **Date Proximity** - Checks date closeness (7-day tolerance)
3. **Vendor Matching** - AI-powered vendor name comparison using Groq LLM
4. **Rule-based Auto-approval** - Automatic approval for exact matches and high-confidence matches
## 🛠️ Development
### Project Structure
```
├── main.py # FastAPI application entry point
├── ai_matcher.py # AI-powered matching logic
├── ai_rules.py # Business rules engine
├── document_processor.py # Receipt data extraction
├── matching_engine.py # Main matching orchestrator
├── feedback_logger.py # User feedback tracking
├── models.py # Pydantic data models
├── api_models.py # API request/response models
├── config.py # Configuration settings
├── requirements.txt # Python dependencies
└── test_images/ # Test image files
```
### Running Tests
```bash
# Test the server
curl http://localhost:8343/
# Test stats endpoint
curl http://localhost:8343/stats
# Test rules endpoint
curl http://localhost:8343/rules
```
## 🚀 Production Deployment
For production deployment:
- Replace in-memory storage with a database (PostgreSQL recommended)
- Configure proper authentication and authorization
- Set up monitoring and logging (ELK stack recommended)
- Use environment variables for all configuration
- Implement proper error handling and retries
- Set up rate limiting and API quotas
- Configure CORS for frontend integration
- Use HTTPS in production
## 📞 Support
This Data Science Engine is designed to be integrated with backend applications that handle:
- QuickBooks API connections
- User interface and workflows
- Data persistence and management
- External integrations
The engine focuses purely on AI/ML capabilities and provides a clean JSON API for backend integration.
## 🔧 Troubleshooting
### Common Issues
1. **API Key Error**: Ensure `GROQ_API_KEY` is set in your `.env` file
2. **Port Already in Use**: Kill existing process with `pkill -f "python main.py"`
3. **Import Errors**: Install dependencies with `pip install -r requirements.txt`
4. **Rate Limiting**: The system includes built-in rate limiting to prevent API quota exhaustion
### Logs
Check the application logs for detailed error information:
```bash
tail -f app.log
```
+244
View File
@@ -0,0 +1,244 @@
import groq
from datetime import datetime, timedelta
from typing import List, Tuple
import config
from models import Receipt, Transaction, Match
import time
import logging
import asyncio
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AIMatcher:
def __init__(self):
self.client = groq.Groq(api_key=config.GROQ_API_KEY)
self.model = "llama3-8b-8192"
self.max_retries = 3
self.retry_delay = 2 # seconds - increased for rate limiting
self.rate_limit_delay = 1.0 # seconds between API calls
self.last_api_call = 0
def match_receipts_to_transactions(self, receipts: List[Receipt], transactions: List[Transaction]) -> List[Match]:
"""Match receipts to transactions using AI"""
logger.info(f"Starting AI matching for {len(receipts)} receipts against {len(transactions)} transactions")
matches = []
for i, receipt in enumerate(receipts):
logger.info(f"Processing receipt {i+1}/{len(receipts)}: {receipt.vendor} - ${receipt.amount}")
# Rate limiting
self._rate_limit()
# Get the BEST match for this receipt (highest confidence score)
best_match = self._find_best_match(receipt, transactions)
if best_match:
matches.append(best_match)
logger.info(f"Found match: {best_match.confidence_score:.3f} - {best_match.match_reason}")
else:
logger.warning(f"No match found for receipt: {receipt.vendor} - ${receipt.amount}")
# Sort by confidence score (highest first)
matches = sorted(matches, key=lambda x: x.confidence_score, reverse=True)
logger.info(f"AI matching completed. Found {len(matches)} matches")
return matches
def _rate_limit(self):
"""Implement rate limiting to avoid API quota exhaustion"""
current_time = time.time()
time_since_last_call = current_time - self.last_api_call
if time_since_last_call < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - time_since_last_call
logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
time.sleep(sleep_time)
self.last_api_call = time.time()
def _find_best_match(self, receipt: Receipt, transactions: List[Transaction]) -> Match:
"""Find the BEST match for a receipt (highest confidence score)"""
candidates = self._filter_candidates(receipt, transactions)
if not candidates:
logger.warning(f"No candidates found for receipt: {receipt.vendor} - ${receipt.amount}")
return None
logger.info(f"Found {len(candidates)} candidates for receipt: {receipt.vendor}")
best_match = None
highest_score = 0
for transaction in candidates:
score, reason = self._calculate_match_score(receipt, transaction)
logger.debug(f"Score {score:.3f} for transaction {transaction.vendor}: {reason}")
# Keep the match with the highest score, regardless of how low it is
if score > highest_score:
highest_score = score
best_match = Match(receipt, transaction, score, reason)
return best_match
def _filter_candidates(self, receipt: Receipt, transactions: List[Transaction]) -> List[Transaction]:
"""Filter transactions to create a reasonable candidate list"""
candidates = []
amount_threshold = receipt.amount * 2.0 # 200% threshold - very inclusive
for transaction in transactions:
# Use absolute value for transaction amount comparison
transaction_amount_abs = abs(transaction.amount)
# Only exclude transactions with obviously different amounts
if abs(receipt.amount - transaction_amount_abs) <= amount_threshold:
candidates.append(transaction)
logger.debug(f"Filtered {len(transactions)} transactions to {len(candidates)} candidates")
return candidates
def _calculate_match_score(self, receipt: Receipt, transaction: Transaction) -> Tuple[float, str]:
"""Calculate match score using AI"""
# Calculate differences for the AI to consider
date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
transaction_amount_abs = abs(transaction.amount)
amount_diff = abs(receipt.amount - transaction_amount_abs)
amount_percent_diff = (amount_diff / receipt.amount) * 100 if receipt.amount > 0 else 0
prompt = f"""
Compare this receipt with this transaction and provide a confidence score (0-1) and brief reason.
Receipt: {receipt.vendor}, ${receipt.amount}, {receipt.receipt_date.strftime('%Y-%m-%d')}
Receipt Description: {receipt.description}
Receipt Category: {receipt.category}
Transaction: {transaction.vendor}, ${transaction.amount} (absolute: ${transaction_amount_abs}), {transaction.transaction_date.strftime('%Y-%m-%d')}
Transaction Notes: {transaction.notes}
Differences:
- Date difference: {date_diff} days
- Amount difference: ${amount_diff} ({amount_percent_diff:.1f}%)
- Vendor comparison: "{receipt.vendor}" vs "{transaction.vendor}"
- Description/Notes comparison: "{receipt.description}" vs "{transaction.notes}"
- Category: {receipt.category}
Score this potential match based on how likely it is the correct match:
- Perfect matches (same vendor, amount, date): 0.95-1.0
- High confidence (minor differences): 0.8-0.94
- Medium confidence (moderate differences): 0.6-0.79
- Low confidence (significant differences): 0.4-0.59
- Very low confidence (major differences): 0.2-0.39
- Minimal similarity: 0.1-0.19
- No meaningful similarity: 0.0-0.09
Consider description and category similarity in your scoring.
IMPORTANT: Return ONLY the score and reason separated by a pipe character.
Format: [score]|[reason]
Example: 0.85|Same vendor, same amount, 2 days apart
"""
for attempt in range(self.max_retries):
try:
result = self._call_groq_api_with_timeout(prompt, timeout=30) # Increased timeout
# Parse the result - handle multiple formats
score, reason = self._parse_ai_response(result)
logger.debug(f"AI Response: {result}")
logger.debug(f"Parsed: score={score}, reason={reason}")
return score, reason
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed for receipt {receipt.id}: {str(e)}")
if attempt < self.max_retries - 1:
# Exponential backoff for rate limiting
sleep_time = self.retry_delay * (2 ** attempt)
logger.info(f"Waiting {sleep_time} seconds before retry...")
time.sleep(sleep_time)
else:
logger.error(f"All attempts failed for receipt {receipt.id}")
return 0.0, f"AI error after {self.max_retries} attempts: {str(e)}"
def _parse_ai_response(self, result: str) -> Tuple[float, str]:
"""Parse AI response with robust error handling"""
result = result.strip()
logger.debug(f"Parsing AI response: {result}")
# Try to find score in various formats
if '|' in result:
parts = result.split('|')
logger.debug(f"Split response into {len(parts)} parts: {parts}")
# Look for a numeric score in any part
for i, part in enumerate(parts):
part = part.strip()
try:
# Remove any non-numeric characters except decimal point
score_str_clean = ''.join(c for c in part if c.isdigit() or c == '.')
if score_str_clean:
score = float(score_str_clean)
if 0 <= score <= 1: # Valid confidence score
# Get reason from other parts
reason_parts = [p.strip() for j, p in enumerate(parts) if j != i and p.strip()]
reason = ' | '.join(reason_parts) if reason_parts else "Score extracted"
logger.debug(f"Found score {score} in part {i}, reason: {reason}")
return score, reason
except ValueError:
continue
# Try to extract just a number from the response
try:
import re
numbers = re.findall(r'\d+\.?\d*', result)
if numbers:
for num_str in numbers:
score = float(num_str)
if 0 <= score <= 1: # Valid confidence score
logger.debug(f"Extracted score {score} from response")
return score, f"Extracted from response: {result[:50]}..."
except (ValueError, IndexError):
pass
# Fallback - try to find any number and normalize it
try:
import re
numbers = re.findall(r'\d+\.?\d*', result)
if numbers:
score = float(numbers[0])
# Normalize to 0-1 range if it's a percentage or other scale
if score > 1:
score = score / 100 # Assume percentage
score = max(0, min(1, score)) # Clamp to 0-1
logger.debug(f"Normalized score {score} from response")
return score, f"Normalized from response: {result[:50]}..."
except (ValueError, IndexError):
pass
# Final fallback
logger.warning(f"Could not parse AI response: {result}")
return 0.0, f"Unparseable response: {result[:50]}..."
def _call_groq_api_with_timeout(self, prompt: str, timeout: int = 15) -> str:
"""Make API call with timeout and retry logic"""
import concurrent.futures
def api_call():
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=200,
temperature=0.1
)
return response.choices[0].message.content.strip()
except Exception as e:
raise e
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(api_call)
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
raise Exception(f"API call timed out after {timeout} seconds")
except Exception as e:
raise e
+126
View File
@@ -0,0 +1,126 @@
from dataclasses import dataclass
from typing import Dict, Any, List
import config
from models import Receipt, Transaction
from tax_rules_engine import TaxRulesEngine
@dataclass
class AIRule:
name: str
condition: str
action: str
source: str
status: str = "active"
class AIRulesEngine:
def __init__(self):
self.rules: List[AIRule] = []
self.tax_rules_engine = TaxRulesEngine()
self._load_default_rules()
def _load_default_rules(self):
self.rules = [
AIRule("exact_amount_match", "amount_diff <= 0.01", "auto_approve", "system"),
AIRule("same_vendor_same_date", "vendor_match and date_diff <= 1", "high_confidence", "system"),
AIRule("gas_station_pattern", "vendor_contains_gas_or_fuel", "categorize_transport", "system"),
# Tax-related rules
AIRule("fx_currency_mismatch", "currency_mismatch", "flag_fx_review", "tax_system"),
AIRule("meals_entertainment", "is_meals_entertainment", "apply_me_tax_rule", "tax_system"),
AIRule("provincial_tax_calculation", "has_address_info", "calculate_provincial_tax", "tax_system")
]
def apply_rules(self, receipt: Receipt, transaction: Transaction) -> Dict[str, Any]:
results = {"auto_approve": False, "confidence_boost": 0, "category": None, "tax_analysis": {}}
for rule in self.rules:
if rule.status != "active":
continue
if self._evaluate_condition(rule.condition, receipt, transaction):
self._execute_action(rule.action, results, receipt, transaction)
return results
def _evaluate_condition(self, condition: str, receipt: Receipt, transaction: Transaction) -> bool:
"""Safely evaluate rule conditions without using eval()"""
amount_diff = abs(receipt.amount - abs(transaction.amount))
date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
vendor_match = receipt.vendor.lower() in transaction.vendor.lower() or transaction.vendor.lower() in receipt.vendor.lower()
vendor_lower = receipt.vendor.lower()
vendor_contains_gas_or_fuel = 'gas' in vendor_lower or 'fuel' in vendor_lower
# Tax-related conditions
currency_mismatch = receipt.currency != transaction.currency
is_meals_entertainment = receipt.is_meals_entertainment
has_address_info = receipt.billing_address is not None or receipt.shipping_address is not None
# Handle specific condition types safely
if condition == "amount_diff <= 0.01":
return amount_diff <= 0.01
elif condition == "vendor_match and date_diff <= 1":
return vendor_match and date_diff <= 1
elif condition == "vendor_contains_gas_or_fuel":
return vendor_contains_gas_or_fuel
elif condition == "currency_mismatch":
return currency_mismatch
elif condition == "is_meals_entertainment":
return is_meals_entertainment
elif condition == "has_address_info":
return has_address_info
else:
# For any other conditions, try to evaluate them safely
try:
# Only allow safe operations
safe_globals = {
"amount_diff": amount_diff,
"date_diff": date_diff,
"vendor_match": vendor_match,
"vendor_contains_gas_or_fuel": vendor_contains_gas_or_fuel,
"currency_mismatch": currency_mismatch,
"is_meals_entertainment": is_meals_entertainment,
"has_address_info": has_address_info,
"receipt": receipt,
"transaction": transaction,
"abs": abs,
"len": len,
"min": min,
"max": max,
"sum": sum,
"round": round
}
return eval(condition, safe_globals, {})
except (SyntaxError, NameError, TypeError) as e:
print(f"Warning: Invalid condition '{condition}': {e}")
return False
def _execute_action(self, action: str, results: Dict[str, Any], receipt: Receipt, transaction: Transaction):
if action == "auto_approve":
results["auto_approve"] = True
elif action == "high_confidence":
results["confidence_boost"] += 0.2
elif action == "categorize_transport":
results["category"] = "Transportation"
elif action == "flag_fx_review":
# Apply FX rule and flag for review
fx_result = self.tax_rules_engine.apply_fx_rule(receipt, transaction)
results["tax_analysis"]["fx"] = fx_result
if fx_result.get("requires_manual_review", False):
results["confidence_boost"] -= 0.1 # Reduce confidence for FX issues
elif action == "apply_me_tax_rule":
# Apply meals & entertainment rule
me_result = self.tax_rules_engine.apply_meals_entertainment_rule(receipt)
results["tax_analysis"]["meals_entertainment"] = me_result
elif action == "calculate_provincial_tax":
# Calculate provincial tax
tax_result = self.tax_rules_engine.apply_sales_tax_rule(receipt)
results["tax_analysis"]["sales_tax"] = tax_result
def add_rule(self, rule: AIRule):
self.rules.append(rule)
def remove_rule(self, rule_name: str):
self.rules = [r for r in self.rules if r.name != rule_name]
def apply_tax_rules(self, receipt: Receipt, transaction: Transaction = None) -> Dict[str, Any]:
"""Apply all tax rules to a receipt/transaction pair"""
return self.tax_rules_engine.apply_all_tax_rules(receipt, transaction)
+120
View File
@@ -0,0 +1,120 @@
from pydantic import BaseModel
from datetime import datetime
from typing import List, Optional
class AddressRequest(BaseModel):
province: str
city: str
postal_code: str
country: str = "Canada"
class ReceiptRequest(BaseModel):
id: str
file_name: str
upload_date: datetime
receipt_date: datetime
amount: float
tax: float
vendor: str
category: str
description: str
# Tax rule fields
billing_address: Optional[AddressRequest] = None
shipping_address: Optional[AddressRequest] = None
currency: str = "CAD"
is_meals_entertainment: bool = False
class TransactionRequest(BaseModel):
id: str
transaction_date: datetime
amount: float
vendor: str
notes: str
# Tax rule fields
currency: str = "CAD"
fx_rate: Optional[float] = None
class AssetRequest(BaseModel):
id: str
name: str
purchase_date: datetime
purchase_amount: float
useful_life_years: int
residual_value: float
cca_rate: float
asset_class: str
class MatchingRequest(BaseModel):
receipt_ids: List[str]
transaction_ids: List[str]
class MatchResponse(BaseModel):
receipt_id: str
transaction_id: str
confidence_score: float
match_reason: str
tax_analysis: Optional[dict] = None
# Currency information
receipt_currency: str = "CAD"
transaction_currency: str = "CAD"
currency_match: bool = True
class MatchingResponse(BaseModel):
matches: List[MatchResponse]
stats: dict
class ApprovalRequest(BaseModel):
match_id: str
approved: bool
reason: Optional[str] = None
class RuleRequest(BaseModel):
name: str
condition: str
action: str
source: str = "user"
class DocumentUploadResponse(BaseModel):
file_id: str
filename: str
upload_date: datetime
status: str
class DocumentProcessResponse(BaseModel):
file_id: str
extraction_success: bool
vendor: Optional[str] = None
description: Optional[str] = None
total_amount: Optional[float] = None
tax_amount: Optional[float] = None
date: Optional[str] = None
category: Optional[str] = None
confidence: Optional[float] = None
error: Optional[str] = None
# New tax-related models
class TaxCalculationRequest(BaseModel):
receipt_id: str
transaction_id: Optional[str] = None
class TaxCalculationResponse(BaseModel):
receipt_id: str
rules_applied: List[str]
sales_tax: dict
fx_analysis: Optional[dict] = None
meals_entertainment: dict
class DepreciationRequest(BaseModel):
asset: AssetRequest
year: int
method: str # "straight_line" or "cca"
class DepreciationResponse(BaseModel):
asset_id: str
year: int
method: str
depreciation: float
book_value: float
total_depreciation: Optional[float] = None
success: bool
error: Optional[str] = None
+15
View File
@@ -0,0 +1,15 @@
import os
from dotenv import load_dotenv
load_dotenv()
# Get API key from environment variable with fallback
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "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")
# Validate API key
if not GROQ_API_KEY or GROQ_API_KEY == "your_api_key_here":
raise ValueError("GROQ_API_KEY environment variable is not set or invalid. Please set it in your .env file.")
CONFIDENCE_THRESHOLD = 0.3
DATE_TOLERANCE_DAYS = 7
AMOUNT_TOLERANCE_PERCENT = 0.05
+498
View File
@@ -0,0 +1,498 @@
import groq
import base64
import io
from PIL import Image
import PyPDF2
from typing import Dict, Any, List, Optional
import config
import os
import aiofiles
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class DocumentProcessor:
def __init__(self):
self.client = groq.Groq(api_key=config.GROQ_API_KEY)
self.model = "meta-llama/llama-4-scout-17b-16e-instruct" # Vision model
async def process_file(self, file_path: str, file_type: str) -> Dict[str, Any]:
"""Process uploaded file and extract receipt data"""
try:
if file_type.lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']:
return await self._process_image(file_path)
elif file_type.lower() == 'pdf':
return await self._process_pdf(file_path)
else:
raise ValueError(f"Unsupported file type: {file_type}")
except Exception as e:
return {"error": str(e)}
async def _process_image(self, image_path: str) -> Dict[str, Any]:
"""Extract data from image using Groq vision"""
try:
# Encode image to base64
base64_image = self._encode_image(image_path)
# Create Groq vision prompt
prompt = """
Analyze this receipt image and extract the following information in JSON format:
{
"vendor": "Store/company name",
"description": "Detailed description of items/services purchased",
"total_amount": 0.00,
"tax_amount": 0.00,
"date": "YYYY-MM-DD",
"category": "Food/Transport/Office/Other",
"confidence": 0.95
}
Rules:
- Extract vendor name as it appears on receipt
- Extract description of items/services purchased (e.g., "Coffee and sandwich", "Gasoline", "Office supplies")
- Total amount should be the final total including tax
- Tax amount is separate tax line if available
- Date should be the date on the receipt
- Categorize based on vendor type (Starbucks=Food, Shell=Transport, etc.)
- Confidence score 0-1 based on how clear the receipt is
Return only valid JSON.
"""
# Call Groq vision API with correct format
response = self.client.chat.completions.create(
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
},
},
],
}
],
model=self.model,
max_tokens=500,
temperature=0.1
)
# Parse response
result_text = response.choices[0].message.content.strip()
return self._parse_extraction_result(result_text)
except Exception as e:
return {"error": f"Image processing error: {str(e)}"}
def _encode_image(self, image_path: str) -> str:
"""Encode image to base64 string"""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
async def _process_pdf(self, pdf_path: str) -> Dict[str, Any]:
"""Extract data from PDF by converting to image first"""
try:
# For now, extract text from PDF and process as text
text_content = self._extract_text_from_pdf(pdf_path)
return self._process_text_content(text_content)
except Exception as e:
return {"error": f"PDF processing error: {str(e)}"}
def _extract_text_from_pdf(self, pdf_path: str) -> str:
"""Extract text from PDF"""
try:
with open(pdf_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
text = ""
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text
except Exception as e:
return ""
def _process_text_content(self, text_content: str) -> Dict[str, Any]:
"""Process text content using Groq (fallback for PDFs)"""
try:
prompt = f"""
Analyze this receipt text and extract the following information in JSON format:
Receipt Text:
{text_content}
Extract:
{{
"vendor": "Store/company name",
"description": "Detailed description of items/services purchased",
"total_amount": 0.00,
"tax_amount": 0.00,
"date": "YYYY-MM-DD",
"category": "Food/Transport/Office/Other",
"confidence": 0.95
}}
Rules:
- Extract vendor name as it appears on receipt
- Extract description of items/services purchased (e.g., "Coffee and sandwich", "Gasoline", "Office supplies")
- Total amount should be the final total including tax
- Tax amount is separate tax line if available
- Date should be the date on the receipt
- Categorize based on vendor type
- Confidence score 0-1 based on clarity
Return only valid JSON.
"""
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=500,
temperature=0.1
)
result_text = response.choices[0].message.content.strip()
return self._parse_extraction_result(result_text)
except Exception as e:
return {"error": f"Text processing error: {str(e)}"}
def _parse_extraction_result(self, result_text: str) -> Dict[str, Any]:
"""Parse Groq response and extract JSON data"""
try:
# Clean up response and extract JSON
import json
import re
# Find JSON in response - try multiple patterns
json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
if json_match:
json_str = json_match.group()
# Clean up common JSON issues
json_str = re.sub(r',\s*([}\]])', r'\1', json_str) # Remove trailing commas
json_str = re.sub(r'([{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', json_str) # Quote unquoted keys
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
# Try to fix common JSON issues
logger.warning(f"Initial JSON parsing failed: {e}")
# Try to extract individual fields using regex
vendor_match = re.search(r'"vendor"\s*:\s*"([^"]*)"', json_str)
description_match = re.search(r'"description"\s*:\s*"([^"]*)"', json_str)
total_amount_match = re.search(r'"total_amount"\s*:\s*([0-9.]+)', json_str)
tax_amount_match = re.search(r'"tax_amount"\s*:\s*([0-9.]+)', json_str)
date_match = re.search(r'"date"\s*:\s*"([^"]*)"', json_str)
category_match = re.search(r'"category"\s*:\s*"([^"]*)"', json_str)
confidence_match = re.search(r'"confidence"\s*:\s*([0-9.]+)', json_str)
data = {
"vendor": vendor_match.group(1) if vendor_match else "",
"description": description_match.group(1) if description_match else "",
"total_amount": float(total_amount_match.group(1)) if total_amount_match else 0.0,
"tax_amount": float(tax_amount_match.group(1)) if tax_amount_match else 0.0,
"date": date_match.group(1) if date_match else "",
"category": category_match.group(1) if category_match else "Other",
"confidence": float(confidence_match.group(1)) if confidence_match else 0.5
}
# Validate and clean data
return {
"vendor": str(data.get("vendor", "")).strip(),
"description": str(data.get("description", "")).strip(),
"total_amount": float(data.get("total_amount", 0)),
"tax_amount": float(data.get("tax_amount", 0)),
"date": str(data.get("date", "")).strip(),
"category": str(data.get("category", "Other")).strip(),
"confidence": float(data.get("confidence", 0.5)),
"extraction_success": True
}
else:
# Try to extract fields from plain text
logger.warning("No JSON found in response, attempting text extraction")
return self._extract_from_plain_text(result_text)
except Exception as e:
logger.error(f"JSON parsing error: {str(e)}")
return {"error": f"JSON parsing error: {str(e)}", "extraction_success": False}
def _extract_from_plain_text(self, text: str) -> Dict[str, Any]:
"""Extract receipt data from plain text when JSON parsing fails"""
try:
import re
# Extract vendor (look for common patterns)
vendor_patterns = [
r'(?:vendor|store|merchant|company)\s*[:\-]?\s*([A-Za-z0-9\s&.,]+)',
r'([A-Z][A-Za-z0-9\s&.,]{3,30})', # Capitalized words
]
vendor = ""
for pattern in vendor_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
vendor = match.group(1).strip()
break
# Extract amount (look for currency patterns)
amount_patterns = [
r'\$?\s*([0-9,]+\.?[0-9]*)',
r'(?:total|amount|sum)\s*[:\-]?\s*\$?\s*([0-9,]+\.?[0-9]*)',
]
total_amount = 0.0
for pattern in amount_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
try:
total_amount = float(match.group(1).replace(',', ''))
break
except ValueError:
continue
# Extract date
date_patterns = [
r'(\d{4}-\d{2}-\d{2})',
r'(\d{1,2}/\d{1,2}/\d{2,4})',
r'(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},?\s+\d{4}',
]
date = ""
for pattern in date_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
date = match.group(0)
break
return {
"vendor": vendor or "Unknown",
"total_amount": total_amount,
"tax_amount": 0.0,
"date": date or "",
"category": "Other",
"confidence": 0.3, # Low confidence for text extraction
"extraction_success": True
}
except Exception as e:
logger.error(f"Text extraction error: {str(e)}")
return {
"vendor": "Unknown",
"total_amount": 0.0,
"tax_amount": 0.0,
"date": "",
"category": "Other",
"confidence": 0.1,
"extraction_success": False,
"error": f"Text extraction failed: {str(e)}"
}
async def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
"""Save uploaded file to temporary storage"""
try:
# Create uploads directory if it doesn't exist
upload_dir = "uploads"
os.makedirs(upload_dir, exist_ok=True)
# Generate unique filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_filename = f"{timestamp}_{filename.replace(' ', '_')}"
file_path = os.path.join(upload_dir, safe_filename)
# Save file
async with aiofiles.open(file_path, 'wb') as f:
await f.write(file_content)
return file_path
except Exception as e:
raise Exception(f"Failed to save file: {str(e)}")
async def extract_transactions_from_image(self, image_path: str) -> Dict[str, Any]:
"""Extract multiple transactions from an image (bank statement, credit card statement, etc.)"""
try:
# Encode image to base64
base64_image = self._encode_image(image_path)
# Create Groq vision prompt for transaction extraction
prompt = """
Analyze this financial document image (bank statement, credit card statement, etc.) and extract ALL transactions in JSON format.
Look for transaction lists, payment records, or any financial entries that show:
- Date
- Amount (positive or negative)
- Vendor/Description/Payee name
- Any additional notes or memo
Return the transactions as a JSON array:
{
"extraction_success": true,
"transactions": [
{
"date": "YYYY-MM-DD",
"amount": 0.00,
"vendor": "Vendor name",
"memo": "Additional notes"
},
{
"date": "YYYY-MM-DD",
"amount": -0.00,
"vendor": "Another vendor",
"memo": "Payment or charge description"
}
]
}
Rules:
- Extract ALL visible transactions
- Include both positive (credits) and negative (debits) amounts
- Use the actual date format from the document
- Vendor should be the merchant/payee name
- Memo can include transaction type, reference numbers, etc.
- If no transactions found, return empty array but set extraction_success to true
Return only valid JSON.
"""
# Call Groq vision API
response = self.client.chat.completions.create(
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
},
},
],
}
],
model=self.model,
max_tokens=2000, # Higher token limit for multiple transactions
temperature=0.1
)
# Parse response
result_text = response.choices[0].message.content.strip()
return self._parse_transaction_extraction_result(result_text)
except Exception as e:
return {
"extraction_success": False,
"error": f"Transaction extraction error: {str(e)}",
"transactions": []
}
def _parse_transaction_extraction_result(self, result_text: str) -> Dict[str, Any]:
"""Parse Groq response for transaction extraction"""
try:
import json
import re
# Find the first '{' and last '}'
start = result_text.find('{')
end = result_text.rfind('}')
if start == -1 or end == -1 or end <= start:
return {
"extraction_success": False,
"error": "Could not find JSON object in AI response",
"transactions": []
}
json_str = result_text[start:end+1]
# Remove trailing commas before } or ]
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
try:
data = json.loads(json_str)
except Exception as e:
import logging
logging.error(f"JSON parsing error: {str(e)}")
logging.error(f"Offending JSON string:\n{json_str}")
return {
"extraction_success": False,
"error": f"JSON parsing error: {str(e)}",
"transactions": []
}
# Validate and clean data
transactions = data.get("transactions", [])
cleaned_transactions = []
for txn in transactions:
try:
cleaned_txn = {
"date": str(txn.get("date", "")).strip(),
"amount": float(str(txn.get("amount", 0)).replace('$', '').replace(',', '')),
"vendor": str(txn.get("vendor", "")).strip(),
"memo": str(txn.get("memo", "")).strip()
}
cleaned_transactions.append(cleaned_txn)
except Exception as e:
continue
return {
"extraction_success": data.get("extraction_success", True),
"transactions": cleaned_transactions,
"total_transactions": len(cleaned_transactions)
}
except Exception as e:
import logging
logging.error(f"JSON parsing error (outer): {str(e)}")
return {
"extraction_success": False,
"error": f"JSON parsing error: {str(e)}",
"transactions": []
}
def _parse_date_to_iso(self, date_str: str) -> str:
"""Parse various date formats and convert to YYYY-MM-DD"""
try:
import re
from datetime import datetime
date_str = date_str.strip().upper()
# Handle formats like "MAY 22", "JUN 01", "MAY 22, 2024"
month_pattern = r'(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\s+(\d{1,2})(?:,\s*(\d{4}))?'
match = re.match(month_pattern, date_str)
if match:
month_abbr, day, year = match.groups()
month_map = {
'JAN': 1, 'FEB': 2, 'MAR': 3, 'APR': 4, 'MAY': 5, 'JUN': 6,
'JUL': 7, 'AUG': 8, 'SEP': 9, 'OCT': 10, 'NOV': 11, 'DEC': 12
}
month = month_map[month_abbr]
day = int(day)
year = int(year) if year else datetime.now().year
# Handle 2-digit years
if year < 100:
year += 2000
return f"{year:04d}-{month:02d}-{day:02d}"
# Handle YYYY-MM-DD format
if re.match(r'\d{4}-\d{2}-\d{2}', date_str):
return date_str
# Handle MM/DD/YYYY format
if re.match(r'\d{1,2}/\d{1,2}/\d{4}', date_str):
return datetime.strptime(date_str, '%m/%d/%Y').strftime('%Y-%m-%d')
# Handle MM/DD/YY format
if re.match(r'\d{1,2}/\d{1,2}/\d{2}', date_str):
return datetime.strptime(date_str, '%m/%d/%y').strftime('%Y-%m-%d')
return None
except Exception:
return None
+60
View File
@@ -0,0 +1,60 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import json
import os
@dataclass
class FeedbackLog:
transaction_id: str
original_match: str
correction: str
reason: str
timestamp: datetime
user_id: str
class FeedbackLogger:
def __init__(self, log_file: str = "feedback_logs.json"):
self.log_file = log_file
self.logs: List[FeedbackLog] = self._load_logs()
def _load_logs(self) -> List[FeedbackLog]:
if not os.path.exists(self.log_file):
return []
try:
with open(self.log_file, 'r') as f:
data = json.load(f)
return [FeedbackLog(**log) for log in data]
except:
return []
def _save_logs(self):
with open(self.log_file, 'w') as f:
json.dump([{
'transaction_id': log.transaction_id,
'original_match': log.original_match,
'correction': log.correction,
'reason': log.reason,
'timestamp': log.timestamp.isoformat(),
'user_id': log.user_id
} for log in self.logs], f, indent=2)
def log_override(self, transaction_id: str, original_match: str, correction: str, reason: str, user_id: str):
log = FeedbackLog(
transaction_id=transaction_id,
original_match=original_match,
correction=correction,
reason=reason,
timestamp=datetime.now(),
user_id=user_id
)
self.logs.append(log)
self._save_logs()
def get_logs_by_transaction(self, transaction_id: str) -> List[FeedbackLog]:
return [log for log in self.logs if log.transaction_id == transaction_id]
def get_recent_logs(self, days: int = 30) -> List[FeedbackLog]:
cutoff = datetime.now() - timedelta(days=days)
return [log for log in self.logs if log.timestamp > cutoff]
+138
View File
@@ -0,0 +1,138 @@
import os
import io
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
class GoogleDriveSync:
def __init__(self):
self.service = None
self.processed_files = set()
def authenticate(self):
"""Authenticate with Google Drive API"""
try:
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
# Load existing credentials
if os.path.exists('token.json'):
self.creds = Credentials.from_authorized_user_file('token.json', SCOPES)
# If no valid credentials available, let user log in
if not self.creds or not self.creds.valid:
if self.creds and self.creds.expired and self.creds.refresh_token:
self.creds.refresh(Request())
else:
if not os.path.exists('credentials.json'):
raise Exception("credentials.json not found. Please download from Google Cloud Console.")
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
self.creds = flow.run_local_server(port=0)
# Save credentials for next run
with open('token.json', 'w') as token:
token.write(self.creds.to_json())
# Build the Drive service
self.service = build('drive', 'v3', credentials=self.creds)
return True
except Exception as e:
print(f"Authentication error: {e}")
return False
def list_folders(self) -> List[Dict[str, Any]]:
"""List all folders in Google Drive"""
if not self.service:
if not self.authenticate():
return []
try:
results = self.service.files().list(
q="mimeType='application/vnd.google-apps.folder'",
pageSize=100,
fields="nextPageToken, files(id, name, createdTime, modifiedTime)"
).execute()
return results.get('files', [])
except Exception as e:
print(f"Error listing folders: {e}")
return []
def get_folder_info(self, folder_id: str) -> Dict[str, Any]:
"""Get information about a Google Drive folder"""
if not self.service:
if not self.authenticate():
return {}
try:
folder = self.service.files().get(
fileId=folder_id,
fields="id, name, createdTime, modifiedTime"
).execute()
return folder
except Exception as e:
print(f"Error getting folder info: {e}")
return {}
async def process_drive_files(self, folder_id: str = None) -> List[Dict[str, Any]]:
"""Process all receipt files from Google Drive"""
if not self.service:
if not self.authenticate():
return []
results = []
try:
# File types to look for
file_types = ["'application/pdf'", "'image/jpeg'", "'image/png'", "'image/gif'", "'image/bmp'"]
mime_types = " or ".join(file_types)
# Build query
query = f"mimeType contains {mime_types}"
if folder_id:
query += f" and '{folder_id}' in parents"
# Add date filter (last 30 days)
thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat() + 'Z'
query += f" and modifiedTime > '{thirty_days_ago}'"
results_files = self.service.files().list(
q=query,
pageSize=100,
fields="nextPageToken, files(id, name, mimeType, modifiedTime, size)"
).execute()
files = results_files.get('files', [])
files = [file for file in files if file['id'] not in self.processed_files]
# For demo purposes, return mock results
for file in files[:3]: # Process first 3 files
mock_result = {
"file_id": file['id'],
"filename": file['name'],
"drive_modified": file['modifiedTime'],
"file_size": file.get('size', 0),
"extraction_success": True,
"vendor": "Demo Vendor",
"description": "Coffee and sandwich",
"total_amount": 25.50,
"tax_amount": 2.04,
"date": "2024-01-15",
"category": "Food",
"confidence": 0.95
}
results.append(mock_result)
self.processed_files.add(file['id'])
except Exception as e:
print(f"Error processing Drive files: {e}")
return results
+555
View File
@@ -0,0 +1,555 @@
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from typing import List
import uuid
import csv
import io
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
from api_models import (
MatchingRequest, MatchingResponse, MatchResponse,
ApprovalRequest, RuleRequest, DocumentUploadResponse,
DocumentProcessResponse, TransactionRequest
)
from models import Receipt, Transaction, Match
from matching_engine import MatchingEngine
from ai_rules import AIRule
from document_processor import DocumentProcessor
app = FastAPI(
title="AI Bookkeeper - Data Science Engine",
description="AI-powered receipt-to-transaction matching engine. Receives transaction data and provides intelligent matching capabilities.",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize DS Engine components
matching_engine = MatchingEngine()
document_processor = DocumentProcessor()
# In-memory storage for uploaded files (in production, use a database)
uploaded_files = {}
# Store imported transactions globally for easy access
stored_transactions = []
processed_receipts = {}
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"message": "AI Bookkeeper Data Science Engine is running",
"version": "1.0.0",
"status": "healthy"
}
# ============================================================================
# TRANSACTION IMPORT ENDPOINTS
# ============================================================================
@app.post("/transactions/import/csv")
async def import_transactions_csv(file: UploadFile = File(...)):
"""
Import transactions from a CSV file (custom bank export format).
"""
try:
content = await file.read()
decoded = content.decode('utf-8')
reader = csv.DictReader(io.StringIO(decoded))
transactions = []
errors = []
for idx, row in enumerate(reader):
try:
# Use correct headers and strip whitespace
account_number = row.get('Account Number') or row.get('Account Number '.strip())
txn_date_raw = row.get('Transaction Date') or row.get('Transaction Date '.strip())
amount_raw = row.get('Amount') or row.get('Amount '.strip())
payee_name = row.get('Description 2') or row.get('Description 2 '.strip())
memo = f"{row.get('Account Type','').strip()} {row.get('Cheque Number','').strip()} {row.get('Description 1','').strip()}".strip()
# Compose ID
txn_id = f"{account_number}_{idx+1}"
# Parse date (try multiple formats)
txn_date_str = txn_date_raw.strip()
txn_date = None
for fmt in ("%m/%d/%y", "%m/%d/%Y"):
try:
txn_date = datetime.strptime(txn_date_str, fmt).strftime("%Y-%m-%d")
break
except Exception:
continue
if not txn_date:
raise ValueError(f"Could not parse date: {txn_date_str}")
# Parse amount
amount = float(amount_raw.replace(',', '').strip())
transactions.append({
"id": txn_id,
"txn_date": txn_date,
"amount": amount,
"payee_name": payee_name.strip(),
"memo": memo
})
except Exception as e:
errors.append(f"Row {idx+1}: {str(e)}")
# Store transactions globally for auto-matching
global stored_transactions
stored_transactions = transactions
return {
"imported_count": len(transactions),
"converted_transactions": transactions,
"errors": errors
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/transactions/import/image")
async def import_transactions_from_image(file: UploadFile = File(...)):
"""
Import transactions from an image (bank statement, credit card statement, etc.) using AI extraction.
"""
try:
# Validate file type
allowed_types = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'pdf']
file_extension = file.filename.split('.')[-1].lower()
if file_extension not in allowed_types:
raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed: {allowed_types}")
# Read file content
content = await file.read()
# Save file to disk
image_path = await document_processor.save_uploaded_file(content, file.filename)
# Extract transactions from image (pass file path)
extraction_result = await document_processor.extract_transactions_from_image(image_path)
if not extraction_result.get("extraction_success", False):
raise HTTPException(status_code=500, detail=extraction_result.get("error", "Extraction failed"))
extracted_transactions = extraction_result.get("transactions", [])
# Store transactions globally for auto-matching
global stored_transactions
stored_transactions = []
for idx, txn in enumerate(extracted_transactions):
try:
txn_id = f"img_{file.filename}_{idx+1}"
txn_date_raw = txn.get("date")
amount = txn.get("amount")
vendor = txn.get("vendor")
memo = txn.get("memo", "")
# Parse date to YYYY-MM-DD format
txn_date = document_processor._parse_date_to_iso(txn_date_raw)
if not txn_date:
# Fallback: use current year if parsing fails
txn_date = f"2024-{txn_date_raw}"
stored_transactions.append({
"id": txn_id,
"txn_date": txn_date,
"amount": amount,
"payee_name": vendor,
"memo": memo
})
except Exception as e:
continue
return {
"imported_count": len(stored_transactions),
"converted_transactions": stored_transactions,
"errors": []
}
except Exception as e:
logger.error(f"Error importing transactions from image: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# DOCUMENT PROCESSING ENDPOINTS
# ============================================================================
@app.post("/upload-multiple", response_model=List[DocumentUploadResponse])
async def upload_multiple_documents(files: List[UploadFile] = File(...)):
"""
Upload multiple receipt images for processing.
This endpoint accepts multiple image files and returns file IDs
that can be used with the /process/{file_id} endpoint.
"""
try:
responses = []
for file in files:
# Validate file type
allowed_types = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'pdf']
file_extension = file.filename.split('.')[-1].lower()
if file_extension not in allowed_types:
raise HTTPException(status_code=400, detail=f"Unsupported file type for {file.filename}. Allowed: {allowed_types}")
# Generate unique file ID
file_id = str(uuid.uuid4())
# Read and store file content
content = await file.read()
uploaded_files[file_id] = {
"filename": file.filename,
"content": content,
"upload_date": datetime.now()
}
responses.append(DocumentUploadResponse(
file_id=file_id,
filename=file.filename,
file_type=file_extension,
upload_date=datetime.now(),
status="uploaded"
))
return responses
except Exception as e:
logger.error(f"Error uploading documents: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/process/{file_id}", response_model=DocumentProcessResponse)
async def process_document(file_id: str):
"""
Process a previously uploaded document to extract receipt information.
This endpoint uses AI to extract structured data from receipt images,
including vendor, amount, date, and category information.
"""
try:
# Check if file exists
if file_id not in uploaded_files:
raise HTTPException(status_code=404, detail=f"File {file_id} not found")
file_data = uploaded_files[file_id]
# Save file temporarily and process it
file_path = await document_processor.save_uploaded_file(file_data["content"], file_data["filename"])
file_type = file_data["filename"].split('.')[-1].lower()
receipt_data = await document_processor.process_file(file_path, file_type)
# Store processed receipt
processed_receipts[file_id] = receipt_data
return DocumentProcessResponse(
file_id=file_id,
extraction_success=receipt_data.get("extraction_success", False),
vendor=receipt_data.get("vendor", ""),
description=receipt_data.get("description", ""),
total_amount=receipt_data.get("total_amount", 0.0),
tax_amount=receipt_data.get("tax_amount", 0.0),
date=receipt_data.get("date", ""),
category=receipt_data.get("category", ""),
confidence=receipt_data.get("confidence", 0.0),
error=receipt_data.get("error", None)
)
except Exception as e:
logger.error(f"Error processing document {file_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# MATCHING ENDPOINTS
# ============================================================================
@app.post("/match-specific", response_model=MatchingResponse)
async def match_specific_receipts(file_ids: List[str]):
"""
Match specific receipts against imported transactions.
This endpoint takes a list of receipt file IDs and matches them against
the currently imported transactions using AI-powered matching logic.
"""
try:
logger.info(f"Starting match-specific for file IDs: {file_ids}")
# Check if transactions are imported
if not stored_transactions:
logger.warning("No transactions imported")
raise HTTPException(status_code=400, detail="No transactions imported. Please upload CSV first.")
logger.info(f"Found {len(stored_transactions)} stored transactions")
# Convert stored transactions to Transaction objects
transactions = []
for txn in stored_transactions:
try:
txn_date = datetime.strptime(txn["txn_date"], "%Y-%m-%d")
transaction = Transaction(
id=txn["id"],
transaction_date=txn_date,
amount=txn["amount"],
vendor=txn["payee_name"],
notes=txn["memo"]
)
transactions.append(transaction)
except Exception as e:
logger.warning(f"Error converting transaction {txn['id']}: {str(e)}")
continue
logger.info(f"Converted {len(transactions)} transactions")
# Get receipts for the specified file IDs
receipts = []
missing_files = []
for file_id in file_ids:
if file_id in processed_receipts:
receipt_data = processed_receipts[file_id]
logger.info(f"DEBUG: receipt_data for {file_id}: {receipt_data}")
logger.info(f"DEBUG: receipt_data keys for {file_id}: {list(receipt_data.keys())}")
try:
# Handle missing date field
if "date" not in receipt_data or not receipt_data["date"]:
logger.warning(f"Missing date for receipt {file_id}, using current date")
receipt_date = datetime.now()
else:
receipt_date = datetime.strptime(receipt_data["date"], "%Y-%m-%d")
# Handle missing amount field - try multiple possible keys
amount = receipt_data.get("amount")
if amount is None:
amount = receipt_data.get("total_amount")
if amount is None:
amount = receipt_data.get("amount_total")
if amount is None:
logger.warning(f"Missing amount for receipt {file_id}, using 0.0")
amount = 0.0
# Ensure amount is a float
try:
amount = float(amount)
except (ValueError, TypeError):
logger.warning(f"Invalid amount '{amount}' for receipt {file_id}, using 0.0")
amount = 0.0
logger.info(f"DEBUG: amount for {file_id}: {amount}")
# Handle missing vendor field
vendor = receipt_data.get("vendor", "")
if not vendor:
logger.warning(f"Missing vendor for receipt {file_id}, using 'Unknown'")
vendor = "Unknown"
# Handle missing category field
category = receipt_data.get("category", "Other")
# Handle description field
description = receipt_data.get("description", "")
# Handle tax field
tax = receipt_data.get("tax", receipt_data.get("tax_amount", 0.0))
try:
tax = float(tax)
except (ValueError, TypeError):
tax = 0.0
receipt = Receipt(
id=file_id,
file_name=uploaded_files[file_id]["filename"],
upload_date=uploaded_files[file_id]["upload_date"],
receipt_date=receipt_date,
amount=amount,
tax=tax,
vendor=vendor,
category=category,
description=description
)
receipts.append(receipt)
logger.info(f"Added receipt: {receipt.vendor} - ${receipt.amount}")
except Exception as e:
logger.warning(f"Error creating receipt object for {file_id}: {str(e)}")
missing_files.append(f"{file_id} (error: {str(e)})")
else:
logger.warning(f"Receipt {file_id} not found in processed_receipts")
missing_files.append(f"{file_id} (not found)")
if missing_files:
logger.error(f"Missing files: {missing_files}")
raise HTTPException(status_code=400, detail=f"Missing files: {missing_files}")
logger.info(f"Processing {len(receipts)} receipts against {len(transactions)} transactions")
# Perform matching
try:
logger.info("Starting direct matching call (without ThreadPoolExecutor)")
logger.info(f"matching_engine type: {type(matching_engine)}")
logger.info(f"matching_engine.process_matching type: {type(matching_engine.process_matching)}")
logger.info(f"receipts type: {type(receipts)}, length: {len(receipts)}")
logger.info(f"transactions type: {type(transactions)}, length: {len(transactions)}")
matches = matching_engine.process_matching(receipts, transactions)
logger.info(f"Matching completed successfully. Found {len(matches)} matches")
# Convert matches to response format
match_responses = []
for match in matches:
logger.info(f"Raw match object: {match}")
logger.info(f" receipt_id: {match.receipt.id}")
logger.info(f" transaction_id: {match.transaction.id}")
logger.info(f" confidence_score: {match.confidence_score}")
logger.info(f" match_reason: {match.match_reason}")
logger.info(f" receipt_vendor: {match.receipt.vendor}")
logger.info(f" receipt_amount: {match.receipt.amount}")
logger.info(f" transaction_vendor: {match.transaction.vendor}")
logger.info(f" transaction_amount: {match.transaction.amount}")
match_response = MatchResponse(
receipt_id=match.receipt.id,
transaction_id=match.transaction.id,
confidence_score=match.confidence_score,
match_reason=match.match_reason,
receipt_vendor=match.receipt.vendor,
receipt_amount=match.receipt.amount,
receipt_description=match.receipt.description,
receipt_category=match.receipt.category,
receipt_tax_amount=match.receipt.tax,
transaction_vendor=match.transaction.vendor,
transaction_amount=match.transaction.amount
)
match_responses.append(match_response)
logger.info(f"Successfully created MatchResponse for {match.receipt.vendor} -> {match.transaction.vendor}")
logger.info(f"Formatted {len(match_responses)} match responses")
# Calculate statistics
if match_responses:
high_confidence = sum(1 for m in match_responses if m.confidence_score >= 0.8)
low_confidence = len(match_responses) - high_confidence
avg_score = sum(m.confidence_score for m in match_responses) / len(match_responses)
else:
high_confidence = low_confidence = avg_score = 0
stats = {
"total": len(match_responses),
"high_confidence": high_confidence,
"low_confidence": low_confidence,
"avg_score": round(avg_score, 2)
}
logger.info(f"Generated stats: {stats}")
logger.info(f"Match-specific completed successfully with {len(match_responses)} matches")
return MatchingResponse(
matches=match_responses,
stats=stats
)
except Exception as e:
logger.error(f"Exception in matching section: {str(e)}")
logger.error(f"Exception type: {type(e)}")
logger.error(f"Exception args: {e.args}")
logger.error(f"Traceback: {e.__traceback__}")
raise HTTPException(status_code=500, detail=f"Unexpected matching error: {str(e)}")
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error in match_specific_receipts: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# RULES MANAGEMENT ENDPOINTS
# ============================================================================
@app.post("/rules")
async def add_rule(request: RuleRequest):
"""
Add a new AI rule for transaction matching.
"""
try:
new_rule = AIRule(
name=request.name,
condition=request.condition,
action=request.action,
source=request.source
)
matching_engine.rules_engine.rules.append(new_rule)
return {"message": f"Rule '{request.name}' added successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/rules")
async def get_rules():
"""
Get all current AI rules.
"""
try:
rules = []
for rule in matching_engine.rules_engine.rules:
rules.append({
"name": rule.name,
"condition": rule.condition,
"action": rule.action,
"source": rule.source,
"status": rule.status
})
return {"rules": rules}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/rules/{rule_name}")
async def delete_rule(rule_name: str):
"""
Delete an AI rule by name.
"""
try:
rules = matching_engine.rules_engine.rules
for i, rule in enumerate(rules):
if rule.name == rule_name:
del rules[i]
return {"message": f"Rule '{rule_name}' deleted successfully"}
raise HTTPException(status_code=404, detail=f"Rule '{rule_name}' not found")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# STATISTICS ENDPOINT
# ============================================================================
@app.get("/stats")
async def get_stats():
"""
Get system statistics.
"""
try:
return {
"total_transactions": len(stored_transactions),
"total_receipts": len(processed_receipts),
"total_uploaded_files": len(uploaded_files),
"rules_count": len(matching_engine.rules_engine.rules)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8343)
+77
View File
@@ -0,0 +1,77 @@
from typing import List, Dict, Any
from datetime import datetime
from ai_matcher import AIMatcher
from ai_rules import AIRulesEngine
from feedback_logger import FeedbackLogger
from models import Receipt, Transaction, Match
class MatchingEngine:
def __init__(self):
self.ai_matcher = AIMatcher()
self.rules_engine = AIRulesEngine()
self.feedback_logger = FeedbackLogger()
def process_matching(self, receipts: List[Receipt], transactions: List[Transaction]) -> List[Match]:
# Get AI matches
ai_matches = self.ai_matcher.match_receipts_to_transactions(receipts, transactions)
# Apply rules and enhance matches
enhanced_matches = []
for match in ai_matches:
enhanced_match = self._enhance_match_with_rules(match)
enhanced_matches.append(enhanced_match)
return enhanced_matches
def _enhance_match_with_rules(self, match: Match) -> Match:
rule_results = self.rules_engine.apply_rules(match.receipt, match.transaction)
# Apply confidence boost from rules
if rule_results["confidence_boost"] > 0:
match.confidence_score = min(1.0, match.confidence_score + rule_results["confidence_boost"])
# Auto-approve if rules say so
if rule_results["auto_approve"]:
match.confidence_score = 1.0
match.match_reason += " (Auto-approved by rules)"
# Add tax analysis to match
if rule_results.get("tax_analysis"):
match.tax_analysis = rule_results["tax_analysis"]
return match
def approve_match(self, match: Match, user_id: str):
# Log the approval
self.feedback_logger.log_override(
transaction_id=match.transaction.id,
original_match=f"AI Score: {match.confidence_score}",
correction="Approved",
reason="User approved match",
user_id=user_id
)
def reject_match(self, match: Match, reason: str, user_id: str):
# Log the rejection
self.feedback_logger.log_override(
transaction_id=match.transaction.id,
original_match=f"AI Score: {match.confidence_score}",
correction="Rejected",
reason=reason,
user_id=user_id
)
def get_matching_stats(self, matches: List[Match]) -> Dict[str, Any]:
if not matches:
return {"total": 0, "high_confidence": 0, "low_confidence": 0, "avg_score": 0}
high_confidence = len([m for m in matches if m.confidence_score >= 0.8])
low_confidence = len([m for m in matches if m.confidence_score < 0.8])
avg_score = sum(m.confidence_score for m in matches) / len(matches)
return {
"total": len(matches),
"high_confidence": high_confidence,
"low_confidence": low_confidence,
"avg_score": round(avg_score, 3)
}
+59
View File
@@ -0,0 +1,59 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Address:
"""Address information for tax calculations"""
province: str
city: str
postal_code: str
country: str = "Canada"
@dataclass
class Receipt:
id: str
file_name: str
upload_date: datetime
receipt_date: datetime
amount: float
tax: float
vendor: str
category: str
description: str
# Tax rule fields
billing_address: Optional[Address] = None
shipping_address: Optional[Address] = None
currency: str = "CAD"
is_meals_entertainment: bool = False
@dataclass
class Transaction:
id: str
transaction_date: datetime
amount: float
vendor: str
notes: str
# Tax rule fields
currency: str = "CAD"
fx_rate: Optional[float] = None
@dataclass
class Asset:
"""Asset for depreciation calculations"""
id: str
name: str
purchase_date: datetime
purchase_amount: float
useful_life_years: int
residual_value: float
cca_rate: float # Capital Cost Allowance rate
asset_class: str
@dataclass
class Match:
receipt: Receipt
transaction: Transaction
confidence_score: float
match_reason: str
tax_analysis: Optional[dict] = None
+16
View File
@@ -0,0 +1,16 @@
groq>=0.5.0
python-dotenv==1.0.0
pandas==2.1.4
numpy==1.24.3
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
requests==2.31.0
python-multipart==0.0.6
Pillow==10.0.1
PyPDF2==3.0.1
aiofiles==23.2.1
google-auth==2.23.4
google-auth-oauthlib==1.1.0
google-auth-httplib2==0.1.1
google-api-python-client==2.108.0
+271
View File
@@ -0,0 +1,271 @@
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
from models import Receipt, Transaction, Address, Asset
import logging
logger = logging.getLogger(__name__)
class TaxRulesEngine:
"""Engine to handle tax calculations based on the four tax rules"""
# Provincial tax rates (simplified - in production, use a tax rate API)
PROVINCIAL_TAX_RATES = {
"ON": 0.13, # Ontario HST
"QC": 0.14975, # Quebec QST
"BC": 0.12, # British Columbia
"AB": 0.05, # Alberta
"SK": 0.11, # Saskatchewan
"MB": 0.12, # Manitoba
"NS": 0.15, # Nova Scotia
"NB": 0.15, # New Brunswick
"NL": 0.15, # Newfoundland and Labrador
"PE": 0.15, # Prince Edward Island
"NT": 0.05, # Northwest Territories
"NU": 0.05, # Nunavut
"YT": 0.05, # Yukon
}
def __init__(self):
self.logger = logging.getLogger(__name__)
def apply_sales_tax_rule(self, receipt: Receipt) -> Dict[str, Any]:
"""
Sales Tax Rule: Apply correct sales tax based on billing vs shipping addresses
"""
try:
# Determine which address to use for tax calculation
tax_address = self._get_tax_address(receipt)
if not tax_address:
return {
"success": False,
"error": "No valid address found for tax calculation",
"calculated_tax": 0.0,
"tax_rate": 0.0
}
# Get tax rate for the province
tax_rate = self.PROVINCIAL_TAX_RATES.get(tax_address.province, 0.0)
# Calculate tax amount
calculated_tax = receipt.amount * tax_rate
return {
"success": True,
"calculated_tax": calculated_tax,
"tax_rate": tax_rate,
"tax_address": tax_address.province,
"rule_applied": "Sales Tax Rule"
}
except Exception as e:
self.logger.error(f"Error applying sales tax rule: {str(e)}")
return {
"success": False,
"error": str(e),
"calculated_tax": 0.0,
"tax_rate": 0.0
}
def _get_tax_address(self, receipt: Receipt) -> Optional[Address]:
"""Determine which address to use for tax calculation"""
# Rule: Use shipping address if different from billing, otherwise use billing
if receipt.shipping_address and receipt.billing_address:
if self._addresses_different(receipt.billing_address, receipt.shipping_address):
return receipt.shipping_address
else:
return receipt.billing_address
elif receipt.shipping_address:
return receipt.shipping_address
elif receipt.billing_address:
return receipt.billing_address
else:
return None
def _addresses_different(self, billing: Address, shipping: Address) -> bool:
"""Check if billing and shipping addresses are different"""
return (billing.province != shipping.province or
billing.city != shipping.city or
billing.postal_code != shipping.postal_code)
def apply_fx_rule(self, receipt: Receipt, transaction: Transaction) -> Dict[str, Any]:
"""
Foreign Exchange Rule: Handle currency mismatches
"""
try:
# Check for currency mismatch
if receipt.currency != transaction.currency:
fx_discrepancy = abs(receipt.amount - abs(transaction.amount))
return {
"success": True,
"fx_discrepancy": fx_discrepancy,
"receipt_currency": receipt.currency,
"transaction_currency": transaction.currency,
"receipt_amount": receipt.amount,
"transaction_amount": abs(transaction.amount),
"requires_manual_review": True,
"rule_applied": "Foreign Exchange Rule"
}
else:
return {
"success": True,
"fx_discrepancy": 0.0,
"requires_manual_review": False,
"rule_applied": "No FX Rule (same currency)"
}
except Exception as e:
self.logger.error(f"Error applying FX rule: {str(e)}")
return {
"success": False,
"error": str(e),
"fx_discrepancy": 0.0,
"requires_manual_review": False
}
def calculate_straight_line_depreciation(self, asset: Asset, year: int) -> Dict[str, Any]:
"""
Straight-Line Depreciation for accounting purposes
"""
try:
if year > asset.useful_life_years:
return {
"success": False,
"error": f"Year {year} exceeds useful life of {asset.useful_life_years} years",
"depreciation": 0.0
}
# Straight-line formula: (Cost - Residual Value) / Useful Life
annual_depreciation = (asset.purchase_amount - asset.residual_value) / asset.useful_life_years
return {
"success": True,
"depreciation": annual_depreciation,
"book_value": asset.purchase_amount - (annual_depreciation * year),
"method": "Straight-Line",
"rule_applied": "Depreciation Rule (Accounting)"
}
except Exception as e:
self.logger.error(f"Error calculating straight-line depreciation: {str(e)}")
return {
"success": False,
"error": str(e),
"depreciation": 0.0
}
def calculate_cca_depreciation(self, asset: Asset, year: int) -> Dict[str, Any]:
"""
CCA (Capital Cost Allowance) Depreciation for tax purposes
"""
try:
if year < 1:
return {
"success": False,
"error": "Year must be at least 1",
"depreciation": 0.0
}
# CCA uses declining balance method
book_value = asset.purchase_amount
total_depreciation = 0.0
for current_year in range(1, year + 1):
# CCA is calculated on the declining balance
cca_amount = book_value * asset.cca_rate
book_value -= cca_amount
total_depreciation += cca_amount
# Stop if book value reaches residual value
if book_value <= asset.residual_value:
break
return {
"success": True,
"depreciation": cca_amount, # Current year depreciation
"total_depreciation": total_depreciation,
"book_value": max(book_value, asset.residual_value),
"method": "CCA Declining Balance",
"rule_applied": "Depreciation Rule (Tax)"
}
except Exception as e:
self.logger.error(f"Error calculating CCA depreciation: {str(e)}")
return {
"success": False,
"error": str(e),
"depreciation": 0.0
}
def apply_meals_entertainment_rule(self, receipt: Receipt) -> Dict[str, Any]:
"""
Meals & Entertainment Tax Deduction Rule
"""
try:
if not receipt.is_meals_entertainment:
return {
"success": True,
"tax_deduction": receipt.amount,
"accounting_deduction": receipt.amount,
"rule_applied": "No M&E Rule (not meals/entertainment)"
}
# For tax purposes: 50% deductible
tax_deduction = receipt.amount * 0.5
# For accounting purposes: 100% deductible
accounting_deduction = receipt.amount
# Sales tax is fully deductible for accounting
tax_on_meal = receipt.tax
return {
"success": True,
"tax_deduction": tax_deduction,
"accounting_deduction": accounting_deduction,
"tax_on_meal": tax_on_meal,
"rule_applied": "Meals & Entertainment Rule"
}
except Exception as e:
self.logger.error(f"Error applying meals & entertainment rule: {str(e)}")
return {
"success": False,
"error": str(e),
"tax_deduction": 0.0,
"accounting_deduction": 0.0
}
def apply_all_tax_rules(self, receipt: Receipt, transaction: Transaction = None) -> Dict[str, Any]:
"""
Apply all tax rules to a receipt
"""
results = {
"receipt_id": receipt.id,
"rules_applied": [],
"sales_tax": {},
"fx_analysis": {},
"meals_entertainment": {}
}
# Apply Sales Tax Rule
sales_tax_result = self.apply_sales_tax_rule(receipt)
results["sales_tax"] = sales_tax_result
if sales_tax_result["success"]:
results["rules_applied"].append("Sales Tax Rule")
# Apply FX Rule (if transaction provided)
if transaction:
fx_result = self.apply_fx_rule(receipt, transaction)
results["fx_analysis"] = fx_result
if fx_result["success"]:
results["rules_applied"].append("Foreign Exchange Rule")
# Apply Meals & Entertainment Rule
me_result = self.apply_meals_entertainment_rule(receipt)
results["meals_entertainment"] = me_result
if me_result["success"]:
results["rules_applied"].append("Meals & Entertainment Rule")
return results