Compare commits

...

4 Commits

8 changed files with 962 additions and 687 deletions
+133 -91
View File
@@ -7,9 +7,9 @@ AI-powered receipt-to-transaction matching engine using Groq LLM. This is a **Da
This Data Science Engine receives QuickBooks transaction data from backend applications and provides:
- **AI-powered receipt processing** (OCR and data extraction)
- **Intelligent receipt-transaction matching** with confidence scores
- **Google Drive integration** for batch receipt processing
- **Configurable AI rules** for business logic
- **Feedback logging** for continuous improvement
- **RESTful API** for easy integration
## 🚀 Quick Start
@@ -19,11 +19,22 @@ pip install -r requirements.txt
```
### 2. Configure API Keys
The Groq API key is already configured in `config.py`
Create a `.env` file in the project root with your Groq API key:
### 3. Start the DS Engine
```bash
# Create .env file
echo "GROQ_API_KEY=your_actual_groq_api_key_here" > .env
```
**Important**: Get your API key from [Groq Console](https://console.groq.com/)
### 3. Start the Server
```bash
# Option 1: Using the main script
python main.py
# Option 2: Using uvicorn directly
uvicorn main:app --host 0.0.0.0 --port 8343 --reload
```
### 4. Access API Documentation
@@ -32,22 +43,16 @@ python main.py
## 📋 API Endpoints
### QuickBooks Data Import
- `POST /transactions/import/quickbooks` - Import and convert QuickBooks transactions
### Transaction Import
- `POST /transactions/import/csv` - Import transactions from CSV file
- `POST /transactions/import/image` - Import transactions from image/PDF
### Receipt Processing
- `POST /upload` - Upload receipt documents (PDF/images)
- `POST /upload-multiple` - Upload multiple receipt documents
- `POST /process/{file_id}` - Extract data from uploaded documents
- `GET /documents` - List all processed documents
### Google Drive Integration
- `POST /drive/sync` - Sync and process receipts from Google Drive
- `GET /drive/folders` - List accessible Google Drive folders
- `GET /drive/folder/{folder_id}` - Get folder information
### AI Matching Engine
- `POST /match` - Match receipts to transactions using AI
- `POST /approve` - Approve or reject AI matches
- `POST /match-specific` - Match specific receipts to transactions using AI
### AI Rules Management
- `POST /rules` - Add new AI rules
@@ -56,6 +61,7 @@ python main.py
### System Monitoring
- `GET /stats` - Get system statistics and performance metrics
- `GET /` - Health check endpoint
## 🔧 Core Components
@@ -63,21 +69,25 @@ python main.py
- Uses Groq LLM to compare receipts and transactions
- Provides confidence scores and reasoning
- Configurable matching criteria (amount, date, vendor)
- Rate limiting to prevent API quota exhaustion
### **AIRulesEngine** (`ai_rules.py`)
- Applies business rules for auto-approval and categorization
- Configurable rule conditions and actions
- Supports system and user-generated rules
- Safe condition evaluation with proper error handling
### **DocumentProcessor** (`document_processor.py`)
- AI-powered receipt data extraction
- AI-powered receipt data extraction using Groq vision model
- Supports PDF and image formats
- Uses Groq vision model for OCR
- Robust JSON parsing with error handling
- Extracts vendor, amount, date, tax, and category information
### **MatchingEngine** (`matching_engine.py`)
- Main orchestrator combining all components
- Handles the complete matching workflow
- Provides statistics and feedback logging
- Configurable confidence thresholds
### **FeedbackLogger** (`feedback_logger.py`)
- Tracks manual overrides for AI training
@@ -87,70 +97,46 @@ python main.py
## 📊 Configuration
Edit `config.py` to adjust:
- **Confidence threshold** (default: 0.8)
- **Confidence threshold** (default: 0.3)
- **Date tolerance days** (default: 7)
- **Amount tolerance percent** (default: 5%)
- **Groq API key** (already configured)
- **Groq API key** (from environment variable)
## 🔄 Integration Workflow
### 1. Backend Sends QuickBooks Data
```python
# Backend sends QuickBooks transactions
response = requests.post(
"http://localhost:8343/transactions/import/quickbooks",
json={
"transactions": [
{
"id": "QB_TXN_123",
"txn_date": "2024-01-15",
"amount": 12.50,
"payee_name": "Starbucks",
"memo": "Coffee purchase"
}
]
}
)
### 1. Import Transactions
```bash
# Import from CSV
curl -X POST -F "file=@transactions.csv" http://localhost:8343/transactions/import/csv
# Import from image
curl -X POST -F "file=@statement.jpg" http://localhost:8343/transactions/import/image
```
### 2. Process Receipts
```python
# Sync from Google Drive
response = requests.post(
"http://localhost:8343/drive/sync",
json={"folder_id": "your_folder_id"}
)
### 2. Upload and Process Receipts
```bash
# Upload receipts
curl -X POST -F "files=@receipt1.jpg" -F "files=@receipt2.jpg" http://localhost:8343/upload-multiple
# Or upload directly
response = requests.post(
"http://localhost:8343/upload",
files={"file": receipt_file}
)
# Process a specific receipt
curl -X POST http://localhost:8343/process/{file_id}
```
### 3. AI Matching
```python
# Match receipts to transactions
response = requests.post(
"http://localhost:8343/match",
json={
"receipts": processed_receipts,
"transactions": converted_transactions
}
)
```bash
# Match specific receipts
curl -X POST -H "Content-Type: application/json" \
-d '["file_id_1", "file_id_2"]' \
http://localhost:8343/match-specific
```
### 4. User Feedback
```python
# Approve or reject matches
response = requests.post(
"http://localhost:8343/approve",
json={
"match_id": "match_123",
"user_id": "user_456",
"action": "approve"
}
)
### 4. Check Results
```bash
# Get system stats
curl http://localhost:8343/stats
# View AI rules
curl http://localhost:8343/rules
```
## 🎯 Key Features
@@ -159,55 +145,96 @@ response = requests.post(
- **Rule-based auto-approval** and categorization
- **Feedback logging** for continuous improvement
- **Configurable matching parameters**
- **Google Drive integration** for batch processing
- **JSON API** for easy backend integration
- **RESTful JSON API** for easy backend integration
- **Comprehensive error handling**
- **Rate limiting** to prevent API quota exhaustion
- **Robust JSON parsing** for AI responses
## 📝 Data Formats
### QuickBooks Transaction Input
### Transaction Input (CSV)
```csv
Date,Description,Amount,Category
2024-01-15,Starbucks Coffee,12.50,Food & Dining
2024-01-16,Office Supplies,45.99,Office
```
### Receipt Processing Output
```json
{
"id": "string",
"txn_date": "YYYY-MM-DD",
"amount": 0.00,
"payee_name": "string",
"memo": "string (optional)",
"account_name": "string (optional)",
"txn_type": "string (optional)"
"vendor": "Starbucks",
"total_amount": 12.50,
"tax_amount": 1.25,
"date": "2024-01-15",
"category": "Food & Dining",
"confidence": 0.95,
"extraction_success": true
}
```
### Match Result Output
```json
{
"receipt_id": "string",
"transaction_id": "string",
"receipt_id": "uuid",
"transaction_id": "transaction_123",
"confidence_score": 0.95,
"match_reason": "string",
"receipt_vendor": "string",
"receipt_amount": 0.00,
"transaction_vendor": "string",
"transaction_amount": 0.00
"match_reason": "Same vendor, minor date difference (Auto-approved by rules)",
"receipt_vendor": "Starbucks",
"receipt_amount": 12.50,
"transaction_vendor": "STARBUCKS",
"transaction_amount": 12.50
}
```
## 🔍 AI Matching Criteria
The engine uses three primary criteria for matching:
The engine uses multiple criteria for matching:
1. **Amount Similarity** - Compares receipt and transaction amounts (5% tolerance)
2. **Date Proximity** - Checks date closeness (7-day tolerance)
3. **Vendor Matching** - AI-powered vendor name comparison
3. **Vendor Matching** - AI-powered vendor name comparison using Groq LLM
4. **Rule-based Auto-approval** - Automatic approval for exact matches and high-confidence matches
## 🛠️ Development
### Project Structure
```
├── main.py # FastAPI application entry point
├── ai_matcher.py # AI-powered matching logic
├── ai_rules.py # Business rules engine
├── document_processor.py # Receipt data extraction
├── matching_engine.py # Main matching orchestrator
├── feedback_logger.py # User feedback tracking
├── models.py # Pydantic data models
├── api_models.py # API request/response models
├── config.py # Configuration settings
├── requirements.txt # Python dependencies
└── test_images/ # Test image files
```
### Running Tests
```bash
# Test the server
curl http://localhost:8343/
# Test stats endpoint
curl http://localhost:8343/stats
# Test rules endpoint
curl http://localhost:8343/rules
```
## 🚀 Production Deployment
For production deployment:
- Replace in-memory storage with a database
- Configure proper authentication
- Set up monitoring and logging
- Use environment variables for configuration
- Replace in-memory storage with a database (PostgreSQL recommended)
- Configure proper authentication and authorization
- Set up monitoring and logging (ELK stack recommended)
- Use environment variables for all configuration
- Implement proper error handling and retries
- Set up rate limiting and API quotas
- Configure CORS for frontend integration
- Use HTTPS in production
## 📞 Support
@@ -217,4 +244,19 @@ This Data Science Engine is designed to be integrated with backend applications
- Data persistence and management
- External integrations
The engine focuses purely on AI/ML capabilities and provides a clean JSON API for backend integration.
The engine focuses purely on AI/ML capabilities and provides a clean JSON API for backend integration.
## 🔧 Troubleshooting
### Common Issues
1. **API Key Error**: Ensure `GROQ_API_KEY` is set in your `.env` file
2. **Port Already in Use**: Kill existing process with `pkill -f "python main.py"`
3. **Import Errors**: Install dependencies with `pip install -r requirements.txt`
4. **Rate Limiting**: The system includes built-in rate limiting to prevent API quota exhaustion
### Logs
Check the application logs for detailed error information:
```bash
tail -f app.log
```
+156 -32
View File
@@ -3,34 +3,75 @@ from datetime import datetime, timedelta
from typing import List, Tuple
import config
from models import Receipt, Transaction, Match
import time
import logging
import asyncio
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AIMatcher:
def __init__(self):
self.client = groq.Groq(api_key=config.GROQ_API_KEY)
self.model = "llama3-8b-8192"
self.max_retries = 3
self.retry_delay = 2 # seconds - increased for rate limiting
self.rate_limit_delay = 1.0 # seconds between API calls
self.last_api_call = 0
def match_receipts_to_transactions(self, receipts: List[Receipt], transactions: List[Transaction]) -> List[Match]:
"""Match receipts to transactions using AI"""
logger.info(f"Starting AI matching for {len(receipts)} receipts against {len(transactions)} transactions")
matches = []
for receipt in receipts:
for i, receipt in enumerate(receipts):
logger.info(f"Processing receipt {i+1}/{len(receipts)}: {receipt.vendor} - ${receipt.amount}")
# Rate limiting
self._rate_limit()
# Get the BEST match for this receipt (highest confidence score)
best_match = self._find_best_match(receipt, transactions)
if best_match:
matches.append(best_match)
logger.info(f"Found match: {best_match.confidence_score:.3f} - {best_match.match_reason}")
else:
logger.warning(f"No match found for receipt: {receipt.vendor} - ${receipt.amount}")
return sorted(matches, key=lambda x: x.confidence_score, reverse=True)
# Sort by confidence score (highest first)
matches = sorted(matches, key=lambda x: x.confidence_score, reverse=True)
logger.info(f"AI matching completed. Found {len(matches)} matches")
return matches
def _rate_limit(self):
"""Implement rate limiting to avoid API quota exhaustion"""
current_time = time.time()
time_since_last_call = current_time - self.last_api_call
if time_since_last_call < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - time_since_last_call
logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
time.sleep(sleep_time)
self.last_api_call = time.time()
def _find_best_match(self, receipt: Receipt, transactions: List[Transaction]) -> Match:
"""Find the BEST match for a receipt (highest confidence score)"""
candidates = self._filter_candidates(receipt, transactions)
if not candidates:
logger.warning(f"No candidates found for receipt: {receipt.vendor} - ${receipt.amount}")
return None
logger.info(f"Found {len(candidates)} candidates for receipt: {receipt.vendor}")
best_match = None
highest_score = 0
for transaction in candidates:
score, reason = self._calculate_match_score(receipt, transaction)
logger.debug(f"Score {score:.3f} for transaction {transaction.vendor}: {reason}")
# Keep the match with the highest score, regardless of how low it is
if score > highest_score:
highest_score = score
@@ -39,21 +80,23 @@ class AIMatcher:
return best_match
def _filter_candidates(self, receipt: Receipt, transactions: List[Transaction]) -> List[Transaction]:
# Return MOST transactions - let the AI decide on scoring
# Only filter out transactions with completely different amounts (>100% difference) to avoid obvious mismatches
"""Filter transactions to create a reasonable candidate list"""
candidates = []
amount_threshold = receipt.amount * 1.0 # 100% threshold - more inclusive
amount_threshold = receipt.amount * 2.0 # 200% threshold - very inclusive
for transaction in transactions:
# Use absolute value for transaction amount comparison
transaction_amount_abs = abs(transaction.amount)
# Only exclude transactions with obviously different amounts
if abs(receipt.amount - transaction_amount_abs) <= amount_threshold:
candidates.append(transaction)
logger.debug(f"Filtered {len(transactions)} transactions to {len(candidates)} candidates")
return candidates
def _calculate_match_score(self, receipt: Receipt, transaction: Transaction) -> Tuple[float, str]:
"""Calculate match score using AI"""
# Calculate differences for the AI to consider
date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
transaction_amount_abs = abs(transaction.amount)
@@ -61,7 +104,7 @@ class AIMatcher:
amount_percent_diff = (amount_diff / receipt.amount) * 100 if receipt.amount > 0 else 0
prompt = f"""
Compare this receipt with this transaction and provide a confidence score (0-1) and brief reason:
Compare this receipt with this transaction and provide a confidence score (0-1) and brief reason.
Receipt: {receipt.vendor}, ${receipt.amount}, {receipt.receipt_date.strftime('%Y-%m-%d')}
Transaction: {transaction.vendor}, ${transaction.amount} (absolute: ${transaction_amount_abs}), {transaction.transaction_date.strftime('%Y-%m-%d')}
@@ -81,33 +124,114 @@ class AIMatcher:
- Minimal similarity: 0.1-0.19
- No meaningful similarity: 0.0-0.09
Examples:
- Same vendor, same amount, 11 days apart: 0.7-0.8
- Similar vendor name, same amount, same date: 0.8-0.9
- Same vendor, 10% amount difference, same date: 0.6-0.7
- Different vendor, same amount, same date: 0.3-0.4
- Completely different vendor, amount, date: 0.1-0.2
Consider vendor name similarity, amount accuracy, and date proximity. Score based on overall likelihood this is the correct match.
Return only: score|reason
IMPORTANT: Return ONLY the score and reason separated by a pipe character.
Format: [score]|[reason]
Example: 0.85|Same vendor, same amount, 2 days apart
"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=100,
temperature=0.1
)
result = response.choices[0].message.content.strip()
if '|' in result:
score_str, reason = result.split('|', 1)
score = float(score_str.strip())
return min(max(score, 0), 1), reason.strip()
else:
return 0.0, "Invalid AI response"
for attempt in range(self.max_retries):
try:
result = self._call_groq_api_with_timeout(prompt, timeout=30) # Increased timeout
# Parse the result - handle multiple formats
score, reason = self._parse_ai_response(result)
logger.debug(f"AI Response: {result}")
logger.debug(f"Parsed: score={score}, reason={reason}")
return score, reason
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed for receipt {receipt.id}: {str(e)}")
if attempt < self.max_retries - 1:
# Exponential backoff for rate limiting
sleep_time = self.retry_delay * (2 ** attempt)
logger.info(f"Waiting {sleep_time} seconds before retry...")
time.sleep(sleep_time)
else:
logger.error(f"All attempts failed for receipt {receipt.id}")
return 0.0, f"AI error after {self.max_retries} attempts: {str(e)}"
def _parse_ai_response(self, result: str) -> Tuple[float, str]:
"""Parse AI response with robust error handling"""
result = result.strip()
logger.debug(f"Parsing AI response: {result}")
# Try to find score in various formats
if '|' in result:
parts = result.split('|')
logger.debug(f"Split response into {len(parts)} parts: {parts}")
# Look for a numeric score in any part
for i, part in enumerate(parts):
part = part.strip()
try:
# Remove any non-numeric characters except decimal point
score_str_clean = ''.join(c for c in part if c.isdigit() or c == '.')
if score_str_clean:
score = float(score_str_clean)
if 0 <= score <= 1: # Valid confidence score
# Get reason from other parts
reason_parts = [p.strip() for j, p in enumerate(parts) if j != i and p.strip()]
reason = ' | '.join(reason_parts) if reason_parts else "Score extracted"
logger.debug(f"Found score {score} in part {i}, reason: {reason}")
return score, reason
except ValueError:
continue
# Try to extract just a number from the response
try:
import re
numbers = re.findall(r'\d+\.?\d*', result)
if numbers:
for num_str in numbers:
score = float(num_str)
if 0 <= score <= 1: # Valid confidence score
logger.debug(f"Extracted score {score} from response")
return score, f"Extracted from response: {result[:50]}..."
except (ValueError, IndexError):
pass
# Fallback - try to find any number and normalize it
try:
import re
numbers = re.findall(r'\d+\.?\d*', result)
if numbers:
score = float(numbers[0])
# Normalize to 0-1 range if it's a percentage or other scale
if score > 1:
score = score / 100 # Assume percentage
score = max(0, min(1, score)) # Clamp to 0-1
logger.debug(f"Normalized score {score} from response")
return score, f"Normalized from response: {result[:50]}..."
except (ValueError, IndexError):
pass
# Final fallback
logger.warning(f"Could not parse AI response: {result}")
return 0.0, f"Unparseable response: {result[:50]}..."
def _call_groq_api_with_timeout(self, prompt: str, timeout: int = 15) -> str:
"""Make API call with timeout and retry logic"""
import concurrent.futures
def api_call():
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
max_tokens=200,
temperature=0.1
)
return response.choices[0].message.content.strip()
except Exception as e:
raise e
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(api_call)
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
raise Exception(f"API call timed out after {timeout} seconds")
except Exception as e:
return 0.0, f"AI error: {str(e)}"
raise e
+34 -9
View File
@@ -20,7 +20,7 @@ class AIRulesEngine:
self.rules = [
AIRule("exact_amount_match", "amount_diff <= 0.01", "auto_approve", "system"),
AIRule("same_vendor_same_date", "vendor_match and date_diff <= 1", "high_confidence", "system"),
AIRule("gas_station_pattern", "vendor contains 'gas' or 'fuel'", "categorize_transport", "system")
AIRule("gas_station_pattern", "vendor_contains_gas_or_fuel", "categorize_transport", "system")
]
def apply_rules(self, receipt: Receipt, transaction: Transaction) -> Dict[str, Any]:
@@ -36,17 +36,42 @@ class AIRulesEngine:
return results
def _evaluate_condition(self, condition: str, receipt: Receipt, transaction: Transaction) -> bool:
amount_diff = abs(receipt.amount - transaction.amount)
"""Safely evaluate rule conditions without using eval()"""
amount_diff = abs(receipt.amount - abs(transaction.amount))
date_diff = abs((receipt.receipt_date - transaction.transaction_date).days)
vendor_match = receipt.vendor.lower() in transaction.vendor.lower() or transaction.vendor.lower() in receipt.vendor.lower()
vendor_lower = receipt.vendor.lower()
vendor_contains_gas_or_fuel = 'gas' in vendor_lower or 'fuel' in vendor_lower
return eval(condition, {
"amount_diff": amount_diff,
"date_diff": date_diff,
"vendor_match": vendor_match,
"receipt": receipt,
"transaction": transaction
})
# Handle specific condition types safely
if condition == "amount_diff <= 0.01":
return amount_diff <= 0.01
elif condition == "vendor_match and date_diff <= 1":
return vendor_match and date_diff <= 1
elif condition == "vendor_contains_gas_or_fuel":
return vendor_contains_gas_or_fuel
else:
# For any other conditions, try to evaluate them safely
try:
# Only allow safe operations
safe_globals = {
"amount_diff": amount_diff,
"date_diff": date_diff,
"vendor_match": vendor_match,
"vendor_contains_gas_or_fuel": vendor_contains_gas_or_fuel,
"receipt": receipt,
"transaction": transaction,
"abs": abs,
"len": len,
"min": min,
"max": max,
"sum": sum,
"round": round
}
return eval(condition, safe_globals, {})
except (SyntaxError, NameError, TypeError) as e:
print(f"Warning: Invalid condition '{condition}': {e}")
return False
def _execute_action(self, action: str, results: Dict[str, Any], receipt: Receipt, transaction: Transaction):
if action == "auto_approve":
+7 -1
View File
@@ -3,7 +3,13 @@ from dotenv import load_dotenv
load_dotenv()
GROQ_API_KEY = "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"
# Get API key from environment variable with fallback
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "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")
# Validate API key
if not GROQ_API_KEY or GROQ_API_KEY == "your_api_key_here":
raise ValueError("GROQ_API_KEY environment variable is not set or invalid. Please set it in your .env file.")
CONFIDENCE_THRESHOLD = 0.3
DATE_TOLERANCE_DAYS = 7
AMOUNT_TOLERANCE_PERCENT = 0.05
-82
View File
@@ -1,82 +0,0 @@
import csv
from dateutil import parser
from datetime import datetime, timedelta
# Config values
DATE_TOLERANCE_DAYS = 7
AMOUNT_TOLERANCE_PERCENT = 0.05
CONFIDENCE_THRESHOLD = 0.8
# Receipt data
receipt_date = datetime(2025, 2, 7)
receipt_amount = 1412.5
receipt_vendor = "Ajai Srivastava CPA, Accounting Services & Taxes"
print("=== DEBUGGING AJAI RECEIPT MATCH ===")
print(f"Receipt Date: {receipt_date}")
print(f"Receipt Amount: ${receipt_amount}")
print(f"Receipt Vendor: {receipt_vendor}")
print(f"Date Tolerance: {DATE_TOLERANCE_DAYS} days")
print(f"Amount Tolerance: {AMOUNT_TOLERANCE_PERCENT * 100}%")
print()
# Check CSV transaction
csv_transaction = {
"date": "2/18/2025",
"amount": -1412.5,
"vendor": "Ajai Srivastava"
}
# Parse CSV date
csv_date = parser.parse(csv_transaction["date"])
csv_amount = csv_transaction["amount"]
csv_vendor = csv_transaction["vendor"]
print("=== CSV TRANSACTION ===")
print(f"CSV Date: {csv_date}")
print(f"CSV Amount: ${csv_amount}")
print(f"CSV Vendor: {csv_vendor}")
print()
# Check date tolerance
date_diff = abs((receipt_date - csv_date).days)
date_match = date_diff <= DATE_TOLERANCE_DAYS
print("=== DATE CHECK ===")
print(f"Date Difference: {date_diff} days")
print(f"Date Match: {date_match}")
print(f"Tolerance: {DATE_TOLERANCE_DAYS} days")
print()
# Check amount tolerance
amount_tolerance = receipt_amount * AMOUNT_TOLERANCE_PERCENT
amount_diff = abs(receipt_amount - abs(csv_amount)) # Use absolute value for negative amounts
amount_match = amount_diff <= amount_tolerance
print("=== AMOUNT CHECK ===")
print(f"Receipt Amount: ${receipt_amount}")
print(f"CSV Amount (abs): ${abs(csv_amount)}")
print(f"Amount Difference: ${amount_diff}")
print(f"Amount Tolerance: ${amount_tolerance}")
print(f"Amount Match: {amount_match}")
print()
# Check vendor similarity
vendor_similarity = "Ajai Srivastava" in receipt_vendor
print("=== VENDOR CHECK ===")
print(f"Receipt Vendor: {receipt_vendor}")
print(f"CSV Vendor: {csv_vendor}")
print(f"Vendor Similarity: {vendor_similarity}")
print()
# Overall result
print("=== RESULT ===")
if date_match and amount_match:
print("✅ Transaction would pass initial filtering")
print("Would proceed to AI matching stage")
else:
print("❌ Transaction filtered out before AI matching")
if not date_match:
print(f" - Date difference ({date_diff} days) > tolerance ({DATE_TOLERANCE_DAYS} days)")
if not amount_match:
print(f" - Amount difference (${amount_diff}) > tolerance (${amount_tolerance})")
+295 -8
View File
@@ -8,6 +8,9 @@ import config
import os
import aiofiles
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class DocumentProcessor:
def __init__(self):
@@ -160,27 +163,127 @@ class DocumentProcessor:
import json
import re
# Find JSON in response
# Find JSON in response - try multiple patterns
json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
if json_match:
json_str = json_match.group()
data = json.loads(json_str)
# Clean up common JSON issues
json_str = re.sub(r',\s*([}\]])', r'\1', json_str) # Remove trailing commas
json_str = re.sub(r'([{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', json_str) # Quote unquoted keys
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
# Try to fix common JSON issues
logger.warning(f"Initial JSON parsing failed: {e}")
# Try to extract individual fields using regex
vendor_match = re.search(r'"vendor"\s*:\s*"([^"]*)"', json_str)
total_amount_match = re.search(r'"total_amount"\s*:\s*([0-9.]+)', json_str)
tax_amount_match = re.search(r'"tax_amount"\s*:\s*([0-9.]+)', json_str)
date_match = re.search(r'"date"\s*:\s*"([^"]*)"', json_str)
category_match = re.search(r'"category"\s*:\s*"([^"]*)"', json_str)
confidence_match = re.search(r'"confidence"\s*:\s*([0-9.]+)', json_str)
data = {
"vendor": vendor_match.group(1) if vendor_match else "",
"total_amount": float(total_amount_match.group(1)) if total_amount_match else 0.0,
"tax_amount": float(tax_amount_match.group(1)) if tax_amount_match else 0.0,
"date": date_match.group(1) if date_match else "",
"category": category_match.group(1) if category_match else "Other",
"confidence": float(confidence_match.group(1)) if confidence_match else 0.5
}
# Validate and clean data
return {
"vendor": data.get("vendor", "").strip(),
"vendor": str(data.get("vendor", "")).strip(),
"total_amount": float(data.get("total_amount", 0)),
"tax_amount": float(data.get("tax_amount", 0)),
"date": data.get("date", ""),
"category": data.get("category", "Other"),
"date": str(data.get("date", "")).strip(),
"category": str(data.get("category", "Other")).strip(),
"confidence": float(data.get("confidence", 0.5)),
"extraction_success": True
}
else:
return {"error": "Could not parse JSON from AI response"}
# Try to extract fields from plain text
logger.warning("No JSON found in response, attempting text extraction")
return self._extract_from_plain_text(result_text)
except Exception as e:
return {"error": f"JSON parsing error: {str(e)}"}
logger.error(f"JSON parsing error: {str(e)}")
return {"error": f"JSON parsing error: {str(e)}", "extraction_success": False}
def _extract_from_plain_text(self, text: str) -> Dict[str, Any]:
"""Extract receipt data from plain text when JSON parsing fails"""
try:
import re
# Extract vendor (look for common patterns)
vendor_patterns = [
r'(?:vendor|store|merchant|company)\s*[:\-]?\s*([A-Za-z0-9\s&.,]+)',
r'([A-Z][A-Za-z0-9\s&.,]{3,30})', # Capitalized words
]
vendor = ""
for pattern in vendor_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
vendor = match.group(1).strip()
break
# Extract amount (look for currency patterns)
amount_patterns = [
r'\$?\s*([0-9,]+\.?[0-9]*)',
r'(?:total|amount|sum)\s*[:\-]?\s*\$?\s*([0-9,]+\.?[0-9]*)',
]
total_amount = 0.0
for pattern in amount_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
try:
total_amount = float(match.group(1).replace(',', ''))
break
except ValueError:
continue
# Extract date
date_patterns = [
r'(\d{4}-\d{2}-\d{2})',
r'(\d{1,2}/\d{1,2}/\d{2,4})',
r'(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},?\s+\d{4}',
]
date = ""
for pattern in date_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
date = match.group(0)
break
return {
"vendor": vendor or "Unknown",
"total_amount": total_amount,
"tax_amount": 0.0,
"date": date or "",
"category": "Other",
"confidence": 0.3, # Low confidence for text extraction
"extraction_success": True
}
except Exception as e:
logger.error(f"Text extraction error: {str(e)}")
return {
"vendor": "Unknown",
"total_amount": 0.0,
"tax_amount": 0.0,
"date": "",
"category": "Other",
"confidence": 0.1,
"extraction_success": False,
"error": f"Text extraction failed: {str(e)}"
}
async def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
"""Save uploaded file to temporary storage"""
@@ -201,4 +304,188 @@ class DocumentProcessor:
return file_path
except Exception as e:
raise Exception(f"File save error: {str(e)}")
raise Exception(f"Failed to save file: {str(e)}")
async def extract_transactions_from_image(self, image_path: str) -> Dict[str, Any]:
"""Extract multiple transactions from an image (bank statement, credit card statement, etc.)"""
try:
# Encode image to base64
base64_image = self._encode_image(image_path)
# Create Groq vision prompt for transaction extraction
prompt = """
Analyze this financial document image (bank statement, credit card statement, etc.) and extract ALL transactions in JSON format.
Look for transaction lists, payment records, or any financial entries that show:
- Date
- Amount (positive or negative)
- Vendor/Description/Payee name
- Any additional notes or memo
Return the transactions as a JSON array:
{
"extraction_success": true,
"transactions": [
{
"date": "YYYY-MM-DD",
"amount": 0.00,
"vendor": "Vendor name",
"memo": "Additional notes"
},
{
"date": "YYYY-MM-DD",
"amount": -0.00,
"vendor": "Another vendor",
"memo": "Payment or charge description"
}
]
}
Rules:
- Extract ALL visible transactions
- Include both positive (credits) and negative (debits) amounts
- Use the actual date format from the document
- Vendor should be the merchant/payee name
- Memo can include transaction type, reference numbers, etc.
- If no transactions found, return empty array but set extraction_success to true
Return only valid JSON.
"""
# Call Groq vision API
response = self.client.chat.completions.create(
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
},
},
],
}
],
model=self.model,
max_tokens=2000, # Higher token limit for multiple transactions
temperature=0.1
)
# Parse response
result_text = response.choices[0].message.content.strip()
return self._parse_transaction_extraction_result(result_text)
except Exception as e:
return {
"extraction_success": False,
"error": f"Transaction extraction error: {str(e)}",
"transactions": []
}
def _parse_transaction_extraction_result(self, result_text: str) -> Dict[str, Any]:
"""Parse Groq response for transaction extraction"""
try:
import json
import re
# Find the first '{' and last '}'
start = result_text.find('{')
end = result_text.rfind('}')
if start == -1 or end == -1 or end <= start:
return {
"extraction_success": False,
"error": "Could not find JSON object in AI response",
"transactions": []
}
json_str = result_text[start:end+1]
# Remove trailing commas before } or ]
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
try:
data = json.loads(json_str)
except Exception as e:
import logging
logging.error(f"JSON parsing error: {str(e)}")
logging.error(f"Offending JSON string:\n{json_str}")
return {
"extraction_success": False,
"error": f"JSON parsing error: {str(e)}",
"transactions": []
}
# Validate and clean data
transactions = data.get("transactions", [])
cleaned_transactions = []
for txn in transactions:
try:
cleaned_txn = {
"date": str(txn.get("date", "")).strip(),
"amount": float(str(txn.get("amount", 0)).replace('$', '').replace(',', '')),
"vendor": str(txn.get("vendor", "")).strip(),
"memo": str(txn.get("memo", "")).strip()
}
cleaned_transactions.append(cleaned_txn)
except Exception as e:
continue
return {
"extraction_success": data.get("extraction_success", True),
"transactions": cleaned_transactions,
"total_transactions": len(cleaned_transactions)
}
except Exception as e:
import logging
logging.error(f"JSON parsing error (outer): {str(e)}")
return {
"extraction_success": False,
"error": f"JSON parsing error: {str(e)}",
"transactions": []
}
def _parse_date_to_iso(self, date_str: str) -> str:
"""Parse various date formats and convert to YYYY-MM-DD"""
try:
import re
from datetime import datetime
date_str = date_str.strip().upper()
# Handle formats like "MAY 22", "JUN 01", "MAY 22, 2024"
month_pattern = r'(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\s+(\d{1,2})(?:,\s*(\d{4}))?'
match = re.match(month_pattern, date_str)
if match:
month_abbr, day, year = match.groups()
month_map = {
'JAN': 1, 'FEB': 2, 'MAR': 3, 'APR': 4, 'MAY': 5, 'JUN': 6,
'JUL': 7, 'AUG': 8, 'SEP': 9, 'OCT': 10, 'NOV': 11, 'DEC': 12
}
month = month_map[month_abbr]
day = int(day)
year = int(year) if year else datetime.now().year
# Handle 2-digit years
if year < 100:
year += 2000
return f"{year:04d}-{month:02d}-{day:02d}"
# Handle YYYY-MM-DD format
if re.match(r'\d{4}-\d{2}-\d{2}', date_str):
return date_str
# Handle MM/DD/YYYY format
if re.match(r'\d{1,2}/\d{1,2}/\d{4}', date_str):
return datetime.strptime(date_str, '%m/%d/%Y').strftime('%Y-%m-%d')
# Handle MM/DD/YY format
if re.match(r'\d{1,2}/\d{1,2}/\d{2}', date_str):
return datetime.strptime(date_str, '%m/%d/%y').strftime('%Y-%m-%d')
return None
except Exception:
return None
+337 -415
View File
@@ -5,22 +5,32 @@ from typing import List
import uuid
import csv
import io
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
from api_models import (
MatchingRequest, MatchingResponse, MatchResponse,
ApprovalRequest, RuleRequest, DocumentUploadResponse,
DocumentProcessResponse, DriveSyncRequest, DriveSyncResponse,
QuickBooksImportRequest, QuickBooksImportResponse, TransactionRequest
DocumentProcessResponse, TransactionRequest
)
from models import Receipt, Transaction, Match
from matching_engine import MatchingEngine
from ai_rules import AIRule
from document_processor import DocumentProcessor
from google_drive_sync import GoogleDriveSync
app = FastAPI(
title="AI Bookkeeper - Data Science Engine",
description="AI-powered receipt-to-transaction matching engine. Receives QuickBooks data from backend and provides intelligent matching capabilities.",
description="AI-powered receipt-to-transaction matching engine. Receives transaction data and provides intelligent matching capabilities.",
version="1.0.0"
)
@@ -36,7 +46,6 @@ app.add_middleware(
# Initialize DS Engine components
matching_engine = MatchingEngine()
document_processor = DocumentProcessor()
drive_sync = GoogleDriveSync()
# In-memory storage for uploaded files (in production, use a database)
uploaded_files = {}
@@ -55,53 +64,13 @@ async def root():
}
# ============================================================================
# QUICKBOOKS DATA IMPORT ENDPOINTS
# TRANSACTION IMPORT ENDPOINTS
# ============================================================================
@app.post("/transactions/import/quickbooks", response_model=QuickBooksImportResponse)
async def import_quickbooks_transactions(request: QuickBooksImportRequest):
@app.post("/transactions/import/csv")
async def import_transactions_csv(file: UploadFile = File(...)):
"""
Import and convert QuickBooks transactions to internal format.
This endpoint receives raw QuickBooks transaction data from the backend
and converts it to the internal format used by the AI matching engine.
"""
try:
converted_transactions = []
errors = []
for qb_txn in request.transactions:
try:
# Convert QuickBooks date format to datetime
txn_date = datetime.strptime(qb_txn.txn_date, "%Y-%m-%d")
# Convert to internal TransactionRequest format
converted_txn = TransactionRequest(
id=qb_txn.id,
transaction_date=txn_date,
amount=abs(qb_txn.amount), # Ensure positive amount
vendor=qb_txn.payee_name,
notes=qb_txn.memo or f"QuickBooks transaction from {qb_txn.account_name or 'unknown account'}"
)
converted_transactions.append(converted_txn)
except Exception as e:
errors.append(f"Error converting transaction {qb_txn.id}: {str(e)}")
return QuickBooksImportResponse(
imported_count=len(converted_transactions),
converted_transactions=converted_transactions,
errors=errors
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/transactions/import/csv", response_model=QuickBooksImportResponse)
async def import_quickbooks_transactions_csv(file: UploadFile = File(...)):
"""
Import QuickBooks transactions from a CSV file (custom bank export format).
Import transactions from a CSV file (custom bank export format).
"""
try:
content = await file.read()
@@ -145,169 +114,103 @@ async def import_quickbooks_transactions_csv(file: UploadFile = File(...)):
global stored_transactions
stored_transactions = transactions
# Use the same logic as the JSON import endpoint
request_obj = QuickBooksImportRequest(transactions=transactions)
response = await import_quickbooks_transactions(request_obj)
# Attach errors from CSV parsing
if hasattr(response, 'errors'):
response.errors.extend(errors)
return response
return {
"imported_count": len(transactions),
"converted_transactions": transactions,
"errors": errors
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# RECEIPT PROCESSING ENDPOINTS
# ============================================================================
@app.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(file: UploadFile = File(...)):
@app.post("/transactions/import/image")
async def import_transactions_from_image(file: UploadFile = File(...)):
"""
Upload a receipt document (PDF or image) for processing.
Supports: PDF, JPG, JPEG, PNG, GIF, BMP
Import transactions from an image (bank statement, credit card statement, etc.) using AI extraction.
"""
try:
# Validate file type
allowed_types = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp']
allowed_types = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'pdf']
file_extension = file.filename.split('.')[-1].lower()
if file_extension not in allowed_types:
raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed: {allowed_types}")
# Read file content
file_content = await file.read()
# Save file
file_path = await document_processor.save_uploaded_file(file_content, file.filename)
# Generate file ID
file_id = str(uuid.uuid4())
# Store file info
uploaded_files[file_id] = {
"filename": file.filename,
"file_path": file_path,
"file_type": file_extension,
"upload_date": datetime.now(),
"status": "uploaded"
content = await file.read()
# Save file to disk
image_path = await document_processor.save_uploaded_file(content, file.filename)
# Extract transactions from image (pass file path)
extraction_result = await document_processor.extract_transactions_from_image(image_path)
if not extraction_result.get("extraction_success", False):
raise HTTPException(status_code=500, detail=extraction_result.get("error", "Extraction failed"))
extracted_transactions = extraction_result.get("transactions", [])
# Store transactions globally for auto-matching
global stored_transactions
stored_transactions = []
for idx, txn in enumerate(extracted_transactions):
try:
txn_id = f"img_{file.filename}_{idx+1}"
txn_date_raw = txn.get("date")
amount = txn.get("amount")
vendor = txn.get("vendor")
memo = txn.get("memo", "")
# Parse date to YYYY-MM-DD format
txn_date = document_processor._parse_date_to_iso(txn_date_raw)
if not txn_date:
# Fallback: use current year if parsing fails
txn_date = f"2024-{txn_date_raw}"
stored_transactions.append({
"id": txn_id,
"txn_date": txn_date,
"amount": amount,
"payee_name": vendor,
"memo": memo
})
except Exception as e:
continue
return {
"imported_count": len(stored_transactions),
"converted_transactions": stored_transactions,
"errors": []
}
return DocumentUploadResponse(
file_id=file_id,
filename=file.filename,
file_type=file_extension,
upload_date=datetime.now(),
status="uploaded"
)
except Exception as e:
logger.error(f"Error importing transactions from image: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/process/{file_id}", response_model=DocumentProcessResponse)
async def process_document(file_id: str):
"""
Process uploaded document and extract receipt data using AI.
Uses Groq LLM to extract vendor, amount, date, category from receipt images/PDFs.
"""
try:
if file_id not in uploaded_files:
raise HTTPException(status_code=404, detail="File not found")
file_info = uploaded_files[file_id]
file_path = file_info["file_path"]
file_type = file_info["file_type"]
# Process document using AI
result = await document_processor.process_file(file_path, file_type)
# Update file status
if "error" in result:
uploaded_files[file_id]["status"] = "failed"
else:
uploaded_files[file_id]["status"] = "processed"
uploaded_files[file_id]["extracted_data"] = result
# Store processed receipt data for auto-matching
global processed_receipts
processed_receipts[file_id] = {
"filename": file_info["filename"],
"upload_date": file_info["upload_date"],
"extraction_success": result.get("extraction_success", False),
"vendor": result.get("vendor"),
"total_amount": result.get("total_amount"),
"tax_amount": result.get("tax_amount"),
"date": result.get("date"),
"category": result.get("category"),
"confidence": result.get("confidence"),
"error": result.get("error")
}
return DocumentProcessResponse(
file_id=file_id,
extraction_success=result.get("extraction_success", False),
vendor=result.get("vendor"),
total_amount=result.get("total_amount"),
tax_amount=result.get("tax_amount"),
date=result.get("date"),
category=result.get("category"),
confidence=result.get("confidence"),
error=result.get("error")
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/documents")
async def list_documents():
"""List all uploaded and processed documents"""
try:
documents = []
for file_id, file_info in uploaded_files.items():
documents.append({
"file_id": file_id,
"filename": file_info["filename"],
"file_type": file_info["file_type"],
"upload_date": file_info["upload_date"],
"status": file_info["status"],
"extracted_data": file_info.get("extracted_data")
})
return {"documents": documents}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# DOCUMENT PROCESSING ENDPOINTS
# ============================================================================
@app.post("/upload-multiple", response_model=List[DocumentUploadResponse])
async def upload_multiple_documents(files: List[UploadFile] = File(...)):
"""
Upload multiple receipt documents (PDF or image) for processing.
Supports: PDF, JPG, JPEG, PNG, GIF, BMP
Upload multiple receipt images for processing.
This endpoint accepts multiple image files and returns file IDs
that can be used with the /process/{file_id} endpoint.
"""
responses = []
allowed_types = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp']
for file in files:
try:
try:
responses = []
for file in files:
# Validate file type
allowed_types = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'pdf']
file_extension = file.filename.split('.')[-1].lower()
if file_extension not in allowed_types:
responses.append(DocumentUploadResponse(
file_id="",
filename=file.filename,
file_type=file_extension,
upload_date=datetime.now(),
status=f"failed: unsupported file type ({file_extension})"
))
continue
file_content = await file.read()
file_path = await document_processor.save_uploaded_file(file_content, file.filename)
raise HTTPException(status_code=400, detail=f"Unsupported file type for {file.filename}. Allowed: {allowed_types}")
# Generate unique file ID
file_id = str(uuid.uuid4())
# Read and store file content
content = await file.read()
uploaded_files[file_id] = {
"filename": file.filename,
"file_path": file_path,
"file_type": file_extension,
"upload_date": datetime.now(),
"status": "uploaded"
"content": content,
"upload_date": datetime.now()
}
responses.append(DocumentUploadResponse(
file_id=file_id,
filename=file.filename,
@@ -315,237 +218,265 @@ async def upload_multiple_documents(files: List[UploadFile] = File(...)):
upload_date=datetime.now(),
status="uploaded"
))
except Exception as e:
responses.append(DocumentUploadResponse(
file_id="",
filename=file.filename,
file_type="",
upload_date=datetime.now(),
status=f"failed: {str(e)}"
))
return responses
return responses
except Exception as e:
logger.error(f"Error uploading documents: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# GOOGLE DRIVE INTEGRATION ENDPOINTS
# ============================================================================
@app.post("/drive/sync", response_model=DriveSyncResponse)
async def sync_google_drive(request: DriveSyncRequest):
@app.post("/process/{file_id}", response_model=DocumentProcessResponse)
async def process_document(file_id: str):
"""
Sync and process receipts from Google Drive folder.
Process a previously uploaded document to extract receipt information.
Automatically downloads and processes all receipt files from the specified
Google Drive folder using AI extraction.
This endpoint uses AI to extract structured data from receipt images,
including vendor, amount, date, and category information.
"""
try:
# Process files from Drive
results = await drive_sync.process_drive_files(request.folder_id)
# Check if file exists
if file_id not in uploaded_files:
raise HTTPException(status_code=404, detail=f"File {file_id} not found")
# Count results
files_processed = len(results)
successful_extractions = len([r for r in results if r.get("extraction_success", False)])
failed_extractions = files_processed - successful_extractions
file_data = uploaded_files[file_id]
# Convert to response format
response_results = []
for result in results:
response_results.append(DocumentProcessResponse(
file_id=result.get("file_id", ""),
extraction_success=result.get("extraction_success", False),
vendor=result.get("vendor"),
total_amount=result.get("total_amount"),
tax_amount=result.get("tax_amount"),
date=result.get("date"),
category=result.get("category"),
confidence=result.get("confidence"),
error=result.get("error")
))
# Save file temporarily and process it
file_path = await document_processor.save_uploaded_file(file_data["content"], file_data["filename"])
file_type = file_data["filename"].split('.')[-1].lower()
receipt_data = await document_processor.process_file(file_path, file_type)
return DriveSyncResponse(
files_processed=files_processed,
successful_extractions=successful_extractions,
failed_extractions=failed_extractions,
results=response_results
# Store processed receipt
processed_receipts[file_id] = receipt_data
return DocumentProcessResponse(
file_id=file_id,
extraction_success=receipt_data.get("extraction_success", False),
vendor=receipt_data.get("vendor", ""),
total_amount=receipt_data.get("total_amount", 0.0),
tax_amount=receipt_data.get("tax_amount", 0.0),
date=receipt_data.get("date", ""),
category=receipt_data.get("category", ""),
confidence=receipt_data.get("confidence", 0.0),
error=receipt_data.get("error", None)
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/drive/folders")
async def list_drive_folders():
"""List all accessible Google Drive folders"""
try:
folders = drive_sync.list_folders()
return {"folders": folders}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/drive/folder/{folder_id}")
async def get_folder_info(folder_id: str):
"""Get information about a specific Google Drive folder"""
try:
folder_info = drive_sync.get_folder_info(folder_id)
return folder_info
except Exception as e:
logger.error(f"Error processing document {file_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# AI MATCHING ENGINE ENDPOINTS
# MATCHING ENDPOINTS
# ============================================================================
@app.post("/match", response_model=MatchingResponse)
async def match_receipts_transactions(request: MatchingRequest):
@app.post("/match-specific", response_model=MatchingResponse)
async def match_specific_receipts(file_ids: List[str]):
"""
Match receipts to transactions using AI.
Match specific receipts against imported transactions.
Core AI matching engine that compares receipts against QuickBooks transactions
using intelligent algorithms and returns confidence scores.
This endpoint takes a list of receipt file IDs and matches them against
the currently imported transactions using AI-powered matching logic.
"""
try:
# Convert request models to internal models
receipts = [
Receipt(
id=r.id, file_name=r.file_name, upload_date=r.upload_date,
receipt_date=r.receipt_date, amount=r.amount, tax=r.tax,
vendor=r.vendor, category=r.category
) for r in request.receipts
]
logger.info(f"Starting match-specific for file IDs: {file_ids}")
transactions = [
Transaction(
id=t.id, transaction_date=t.transaction_date, amount=t.amount,
vendor=t.vendor, notes=t.notes
) for t in request.transactions
]
# Process matching using AI engine
matches = matching_engine.process_matching(receipts, transactions)
# Convert to response format
match_responses = [
MatchResponse(
receipt_id=match.receipt.id,
transaction_id=match.transaction.id,
confidence_score=match.confidence_score,
match_reason=match.match_reason,
receipt_vendor=match.receipt.vendor,
receipt_amount=match.receipt.amount,
transaction_vendor=match.transaction.vendor,
transaction_amount=match.transaction.amount
) for match in matches
]
# Get statistics
stats = matching_engine.get_matching_stats(matches)
return MatchingResponse(matches=match_responses, stats=stats)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/match-auto", response_model=MatchingResponse)
async def match_auto():
"""
Automatically match all processed receipts against all imported transactions.
This endpoint uses the stored transaction data from CSV import and
all processed receipts to perform matching without requiring manual data input.
"""
try:
# Check if transactions are imported
if not stored_transactions:
logger.warning("No transactions imported")
raise HTTPException(status_code=400, detail="No transactions imported. Please upload CSV first.")
if not processed_receipts:
raise HTTPException(status_code=400, detail="No receipts processed. Please upload and process receipts first.")
logger.info(f"Found {len(stored_transactions)} stored transactions")
# Convert stored transactions to Receipt/Transaction models
transactions = [
Transaction(
id=t["id"],
transaction_date=datetime.strptime(t["txn_date"], "%Y-%m-%d"),
amount=abs(t["amount"]),
vendor=t["payee_name"],
notes=t.get("memo", "")
) for t in stored_transactions
]
# Convert stored transactions to Transaction objects
transactions = []
for txn in stored_transactions:
try:
txn_date = datetime.strptime(txn["txn_date"], "%Y-%m-%d")
transaction = Transaction(
id=txn["id"],
transaction_date=txn_date,
amount=txn["amount"],
vendor=txn["payee_name"],
notes=txn["memo"]
)
transactions.append(transaction)
except Exception as e:
logger.warning(f"Error converting transaction {txn['id']}: {str(e)}")
continue
logger.info(f"Converted {len(transactions)} transactions")
# Get receipts for the specified file IDs
receipts = []
for file_id, receipt_data in processed_receipts.items():
if receipt_data.get("extraction_success"):
receipts.append(Receipt(
id=file_id,
file_name=receipt_data.get("filename", ""),
upload_date=receipt_data.get("upload_date", datetime.now()),
receipt_date=datetime.strptime(receipt_data.get("date", "2024-01-01"), "%Y-%m-%d"),
amount=receipt_data.get("total_amount", 0.0),
tax=receipt_data.get("tax_amount", 0.0),
vendor=receipt_data.get("vendor", ""),
category=receipt_data.get("category", "")
))
missing_files = []
if not receipts:
raise HTTPException(status_code=400, detail="No successfully processed receipts found.")
for file_id in file_ids:
if file_id in processed_receipts:
receipt_data = processed_receipts[file_id]
logger.info(f"DEBUG: receipt_data for {file_id}: {receipt_data}")
logger.info(f"DEBUG: receipt_data keys for {file_id}: {list(receipt_data.keys())}")
try:
# Handle missing date field
if "date" not in receipt_data or not receipt_data["date"]:
logger.warning(f"Missing date for receipt {file_id}, using current date")
receipt_date = datetime.now()
else:
receipt_date = datetime.strptime(receipt_data["date"], "%Y-%m-%d")
# Handle missing amount field - try multiple possible keys
amount = receipt_data.get("amount")
if amount is None:
amount = receipt_data.get("total_amount")
if amount is None:
amount = receipt_data.get("amount_total")
if amount is None:
logger.warning(f"Missing amount for receipt {file_id}, using 0.0")
amount = 0.0
# Ensure amount is a float
try:
amount = float(amount)
except (ValueError, TypeError):
logger.warning(f"Invalid amount '{amount}' for receipt {file_id}, using 0.0")
amount = 0.0
logger.info(f"DEBUG: amount for {file_id}: {amount}")
# Handle missing vendor field
vendor = receipt_data.get("vendor", "")
if not vendor:
logger.warning(f"Missing vendor for receipt {file_id}, using 'Unknown'")
vendor = "Unknown"
# Handle missing category field
category = receipt_data.get("category", "Other")
# Handle tax field
tax = receipt_data.get("tax", receipt_data.get("tax_amount", 0.0))
try:
tax = float(tax)
except (ValueError, TypeError):
tax = 0.0
receipt = Receipt(
id=file_id,
file_name=uploaded_files[file_id]["filename"],
upload_date=uploaded_files[file_id]["upload_date"],
receipt_date=receipt_date,
amount=amount,
tax=tax,
vendor=vendor,
category=category
)
receipts.append(receipt)
logger.info(f"Added receipt: {receipt.vendor} - ${receipt.amount}")
except Exception as e:
logger.warning(f"Error creating receipt object for {file_id}: {str(e)}")
missing_files.append(f"{file_id} (error: {str(e)})")
else:
logger.warning(f"Receipt {file_id} not found in processed_receipts")
missing_files.append(f"{file_id} (not found)")
# Process matching using AI engine
matches = matching_engine.process_matching(receipts, transactions)
if missing_files:
logger.error(f"Missing files: {missing_files}")
raise HTTPException(status_code=400, detail=f"Missing files: {missing_files}")
# Convert to response format
match_responses = [
MatchResponse(
receipt_id=match.receipt.id,
transaction_id=match.transaction.id,
confidence_score=match.confidence_score,
match_reason=match.match_reason,
receipt_vendor=match.receipt.vendor,
receipt_amount=match.receipt.amount,
transaction_vendor=match.transaction.vendor,
transaction_amount=match.transaction.amount
) for match in matches
]
logger.info(f"Processing {len(receipts)} receipts against {len(transactions)} transactions")
# Get statistics
stats = matching_engine.get_matching_stats(matches)
return MatchingResponse(matches=match_responses, stats=stats)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/approve")
async def approve_match(request: ApprovalRequest):
"""
Approve or reject an AI match.
Logs user feedback for continuous AI improvement and learning.
"""
try:
if request.action == "approve":
return {"message": f"Match {request.match_id} approved by {request.user_id}"}
elif request.action == "reject":
return {"message": f"Match {request.match_id} rejected by {request.user_id}. Reason: {request.reason}"}
else:
raise HTTPException(status_code=400, detail="Action must be 'approve' or 'reject'")
# Perform matching
try:
logger.info("Starting direct matching call (without ThreadPoolExecutor)")
logger.info(f"matching_engine type: {type(matching_engine)}")
logger.info(f"matching_engine.process_matching type: {type(matching_engine.process_matching)}")
logger.info(f"receipts type: {type(receipts)}, length: {len(receipts)}")
logger.info(f"transactions type: {type(transactions)}, length: {len(transactions)}")
matches = matching_engine.process_matching(receipts, transactions)
logger.info(f"Matching completed successfully. Found {len(matches)} matches")
# Convert matches to response format
match_responses = []
for match in matches:
logger.info(f"Raw match object: {match}")
logger.info(f" receipt_id: {match.receipt.id}")
logger.info(f" transaction_id: {match.transaction.id}")
logger.info(f" confidence_score: {match.confidence_score}")
logger.info(f" match_reason: {match.match_reason}")
logger.info(f" receipt_vendor: {match.receipt.vendor}")
logger.info(f" receipt_amount: {match.receipt.amount}")
logger.info(f" transaction_vendor: {match.transaction.vendor}")
logger.info(f" transaction_amount: {match.transaction.amount}")
match_response = MatchResponse(
receipt_id=match.receipt.id,
transaction_id=match.transaction.id,
confidence_score=match.confidence_score,
match_reason=match.match_reason,
receipt_vendor=match.receipt.vendor,
receipt_amount=match.receipt.amount,
transaction_vendor=match.transaction.vendor,
transaction_amount=match.transaction.amount
)
match_responses.append(match_response)
logger.info(f"Successfully created MatchResponse for {match.receipt.vendor} -> {match.transaction.vendor}")
logger.info(f"Formatted {len(match_responses)} match responses")
# Calculate statistics
if match_responses:
high_confidence = sum(1 for m in match_responses if m.confidence_score >= 0.8)
low_confidence = len(match_responses) - high_confidence
avg_score = sum(m.confidence_score for m in match_responses) / len(match_responses)
else:
high_confidence = low_confidence = avg_score = 0
stats = {
"total": len(match_responses),
"high_confidence": high_confidence,
"low_confidence": low_confidence,
"avg_score": round(avg_score, 2)
}
logger.info(f"Generated stats: {stats}")
logger.info(f"Match-specific completed successfully with {len(match_responses)} matches")
return MatchingResponse(
matches=match_responses,
stats=stats
)
except Exception as e:
logger.error(f"Exception in matching section: {str(e)}")
logger.error(f"Exception type: {type(e)}")
logger.error(f"Exception args: {e.args}")
logger.error(f"Traceback: {e.__traceback__}")
raise HTTPException(status_code=500, detail=f"Unexpected matching error: {str(e)}")
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error in match_specific_receipts: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# AI RULES MANAGEMENT ENDPOINTS
# RULES MANAGEMENT ENDPOINTS
# ============================================================================
@app.post("/rules")
async def add_rule(request: RuleRequest):
"""Add a new AI rule for matching and categorization"""
"""
Add a new AI rule for transaction matching.
"""
try:
rule = AIRule(
new_rule = AIRule(
name=request.name,
condition=request.condition,
action=request.action,
source=request.source
)
matching_engine.rules_engine.add_rule(rule)
matching_engine.rules_engine.rules.append(new_rule)
return {"message": f"Rule '{request.name}' added successfully"}
except Exception as e:
@@ -553,68 +484,59 @@ async def add_rule(request: RuleRequest):
@app.get("/rules")
async def get_rules():
"""Get all active AI rules"""
"""
Get all current AI rules.
"""
try:
rules = matching_engine.rules_engine.rules
return {
"rules": [
{
"name": rule.name,
"condition": rule.condition,
"action": rule.action,
"source": rule.source,
"status": rule.status
} for rule in rules
]
}
rules = []
for rule in matching_engine.rules_engine.rules:
rules.append({
"name": rule.name,
"condition": rule.condition,
"action": rule.action,
"source": rule.source,
"status": rule.status
})
return {"rules": rules}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/rules/{rule_name}")
async def delete_rule(rule_name: str):
"""Delete an AI rule"""
"""
Delete an AI rule by name.
"""
try:
matching_engine.rules_engine.remove_rule(rule_name)
return {"message": f"Rule '{rule_name}' deleted successfully"}
rules = matching_engine.rules_engine.rules
for i, rule in enumerate(rules):
if rule.name == rule_name:
del rules[i]
return {"message": f"Rule '{rule_name}' deleted successfully"}
raise HTTPException(status_code=404, detail=f"Rule '{rule_name}' not found")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# SYSTEM MONITORING ENDPOINTS
# STATISTICS ENDPOINT
# ============================================================================
@app.get("/stats")
async def get_stats():
"""Get system statistics and performance metrics"""
try:
recent_logs = matching_engine.feedback_logger.get_recent_logs(30)
return {
"total_feedback_logs": len(matching_engine.feedback_logger.logs),
"recent_feedback_logs": len(recent_logs),
"active_rules": len([r for r in matching_engine.rules_engine.rules if r.status == "active"]),
"uploaded_documents": len(uploaded_files),
"processed_documents": len([f for f in uploaded_files.values() if f["status"] == "processed"]),
"stored_transactions": len(stored_transactions),
"processed_receipts": len(processed_receipts)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/status")
async def get_status():
"""Get current system status for demo purposes"""
"""
Get system statistics.
"""
try:
return {
"csv_uploaded": len(stored_transactions) > 0,
"transactions_count": len(stored_transactions),
"receipts_uploaded": len(uploaded_files),
"receipts_processed": len(processed_receipts),
"ready_for_matching": len(stored_transactions) > 0 and len(processed_receipts) > 0,
"sample_transactions": stored_transactions[:3] if stored_transactions else [],
"sample_receipts": list(processed_receipts.keys())[:3] if processed_receipts else []
"total_transactions": len(stored_transactions),
"total_receipts": len(processed_receipts),
"total_uploaded_files": len(uploaded_files),
"rules_count": len(matching_engine.rules_engine.rules)
}
except Exception as e:
@@ -622,4 +544,4 @@ async def get_status():
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8343)
uvicorn.run(app, host="0.0.0.0", port=8343)
-49
View File
@@ -1,49 +0,0 @@
import json
import requests
import csv
from dateutil import parser
# Prepare transactions
transactions = []
with open("chequing statement.csv", newline="") as f:
reader = csv.DictReader(f)
idx = 1
for row in reader:
try:
txn_id = f"{row['Account Number']}_{idx}"
txn_date = parser.parse(row["Transaction Date"]).isoformat()
amount = float(row["Amount"].replace(",", "").strip())
vendor = row["Description 2"].strip()
notes = f"{row['Account Type']} {row['Cheque Number']} {row['Description 1']}".strip()
transactions.append({
"id": txn_id,
"transaction_date": txn_date,
"amount": amount,
"vendor": vendor,
"notes": notes
})
idx += 1
except Exception as e:
continue
# Receipt data for Ajai Invoice (3).jpg
receipt = {
"id": "33754868-bff5-4caf-9ece-cfd63f4e52d9",
"file_name": "Ajai Invoice (3).jpg",
"upload_date": "2025-07-02T15:31:23.641315",
"receipt_date": "2025-02-07T00:00:00",
"amount": 1412.5,
"tax": 162.5,
"vendor": "Ajai Srivastava CPA, Accounting Services & Taxes",
"category": "Office"
}
# Build request
data = {
"receipts": [receipt],
"transactions": transactions
}
# Post to /match
response = requests.post("http://localhost:8000/match", json=data)
print(json.dumps(response.json(), indent=2))