# Source file: ds_quickbooks/main.py
# (File-viewer metadata captured with this paste: 625 lines, 24 KiB, Python.)

from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from typing import List
import uuid
import csv
import io
from api_models import (
MatchingRequest, MatchingResponse, MatchResponse,
ApprovalRequest, RuleRequest, DocumentUploadResponse,
DocumentProcessResponse, DriveSyncRequest, DriveSyncResponse,
QuickBooksImportRequest, QuickBooksImportResponse, TransactionRequest
)
from models import Receipt, Transaction, Match
from matching_engine import MatchingEngine
from ai_rules import AIRule
from document_processor import DocumentProcessor
from google_drive_sync import GoogleDriveSync
# FastAPI application object; every route decorator in this module registers
# its endpoint on it.
app = FastAPI(
    title="AI Bookkeeper - Data Science Engine",
    description="AI-powered receipt-to-transaction matching engine. Receives QuickBooks data from backend and provides intelligent matching capabilities.",
    version="1.0.0"
)
# CORS middleware
# NOTE(review): wildcard origins/methods/headers together with
# allow_credentials=True is very permissive — confirm this is intended
# outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize DS Engine components
# These are module-level singletons shared by all requests.
matching_engine = MatchingEngine()
document_processor = DocumentProcessor()
drive_sync = GoogleDriveSync()
# In-memory storage for uploaded files (in production, use a database)
# Keyed by the generated file_id; each value is a dict of file metadata
# written by the upload endpoints and updated by /process/{file_id}.
uploaded_files = {}
# Store imported transactions globally for easy access
# Replaced wholesale by the CSV import endpoint; read by /match-auto.
stored_transactions = []
# Keyed by file_id; filled by /process/{file_id} with the extracted receipt
# fields and read by /match-auto.
processed_receipts = {}
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"message": "AI Bookkeeper Data Science Engine is running",
"version": "1.0.0",
"status": "healthy"
}
# ============================================================================
# QUICKBOOKS DATA IMPORT ENDPOINTS
# ============================================================================
@app.post("/transactions/import/quickbooks", response_model=QuickBooksImportResponse)
async def import_quickbooks_transactions(request: QuickBooksImportRequest):
    """
    Convert raw QuickBooks transactions into the engine's internal format.

    Each entry of ``request.transactions`` becomes a ``TransactionRequest``:
    ``txn_date`` is parsed as ``YYYY-MM-DD``, the amount is made non-negative,
    the payee name becomes the vendor, and the memo (or a generated note that
    names the account) becomes the notes. An entry that fails to convert is
    skipped and described in ``errors`` rather than aborting the import.

    Raises:
        HTTPException: 500 for any failure outside the per-entry conversion.
    """
    try:
        imported = []
        failures = []
        for source_txn in request.transactions:
            try:
                # QuickBooks dates arrive as ISO-style strings.
                parsed_date = datetime.strptime(source_txn.txn_date, "%Y-%m-%d")
                record = TransactionRequest(
                    id=source_txn.id,
                    transaction_date=parsed_date,
                    # The matching engine works with positive amounts only.
                    amount=abs(source_txn.amount),
                    vendor=source_txn.payee_name,
                    notes=source_txn.memo or f"QuickBooks transaction from {source_txn.account_name or 'unknown account'}",
                )
            except Exception as exc:
                failures.append(f"Error converting transaction {source_txn.id}: {exc}")
            else:
                imported.append(record)
        return QuickBooksImportResponse(
            imported_count=len(imported),
            converted_transactions=imported,
            errors=failures,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def _strip_csv_headers(row):
    """Return a copy of a ``csv.DictReader`` row whose keys are whitespace-stripped.

    ``DictReader`` files surplus cells under a ``None`` key; that entry is
    dropped because it cannot correspond to any named column.
    """
    return {key.strip(): value for key, value in row.items() if isinstance(key, str)}


def _parse_csv_transaction(row, row_number):
    """Turn one bank-export CSV row into a QuickBooks-style transaction dict.

    Args:
        row: Mapping produced by ``csv.DictReader`` for a single data row.
        row_number: 1-based position of the row, used to compose the ID.

    Returns:
        Dict with ``id``, ``txn_date`` (``YYYY-MM-DD``), ``amount``,
        ``payee_name`` and ``memo`` keys.

    Raises:
        ValueError: if a required cell is missing, the date matches none of
            the supported formats, or the amount is not numeric.
    """
    row = _strip_csv_headers(row)

    def cell(column, required=False):
        # Short rows are padded by DictReader with None; treat that as an
        # explicit "missing" instead of crashing on None.strip().
        value = row.get(column)
        if value is None:
            if required:
                raise ValueError(f"Missing value for column '{column}'")
            return ""
        return value.strip()

    account_number = row.get("Account Number")
    if isinstance(account_number, str):
        account_number = account_number.strip()
    txn_id = f"{account_number}_{row_number}"

    # Parse date (the export uses US-style dates with 2- or 4-digit years)
    txn_date_str = cell("Transaction Date", required=True)
    txn_date = None
    for fmt in ("%m/%d/%y", "%m/%d/%Y"):
        try:
            txn_date = datetime.strptime(txn_date_str, fmt).strftime("%Y-%m-%d")
            break
        except ValueError:
            continue
    if not txn_date:
        raise ValueError(f"Could not parse date: {txn_date_str}")

    # Parse amount (thousands separators are removed first)
    amount = float(cell("Amount", required=True).replace(",", ""))

    payee_name = cell("Description 2", required=True)
    memo = f"{cell('Account Type')} {cell('Cheque Number')} {cell('Description 1')}".strip()
    return {
        "id": txn_id,
        "txn_date": txn_date,
        "amount": amount,
        "payee_name": payee_name,
        "memo": memo,
    }


@app.post("/transactions/import/csv", response_model=QuickBooksImportResponse)
async def import_quickbooks_transactions_csv(file: UploadFile = File(...)):
    """
    Import QuickBooks transactions from a CSV file (custom bank export format).

    Reads the columns ``Account Number``, ``Transaction Date``, ``Amount``,
    ``Description 2`` (payee), ``Account Type``, ``Cheque Number`` and
    ``Description 1``. Header names are matched after stripping surrounding
    whitespace, and a leading UTF-8 byte-order mark is ignored. Rows that
    cannot be parsed are skipped and reported as ``Row N: ...`` in ``errors``.

    The parsed rows replace the module-level ``stored_transactions`` and are
    then passed through ``import_quickbooks_transactions`` for conversion.

    Raises:
        HTTPException: 500 if the upload cannot be read, decoded or converted.
    """
    global stored_transactions
    try:
        content = await file.read()
        # utf-8-sig drops the BOM some spreadsheet exports prepend; without
        # this the first header would not match "Account Number".
        decoded = content.decode('utf-8-sig')
        reader = csv.DictReader(io.StringIO(decoded))
        transactions = []
        errors = []
        for idx, row in enumerate(reader):
            try:
                transactions.append(_parse_csv_transaction(row, idx + 1))
            except Exception as e:
                errors.append(f"Row {idx+1}: {str(e)}")
        # Store transactions globally for auto-matching
        stored_transactions = transactions
        # Use the same logic as the JSON import endpoint
        request_obj = QuickBooksImportRequest(transactions=transactions)
        response = await import_quickbooks_transactions(request_obj)
        # Attach errors from CSV parsing
        if hasattr(response, 'errors'):
            response.errors.extend(errors)
        return response
    except HTTPException:
        # Already a well-formed HTTP error from the delegated import.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# RECEIPT PROCESSING ENDPOINTS
# ============================================================================
@app.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(file: UploadFile = File(...)):
    """
    Upload a receipt document (PDF or image) for processing.
    Supports: PDF, JPG, JPEG, PNG, GIF, BMP

    The file is saved through ``document_processor.save_uploaded_file`` and
    registered in the in-memory ``uploaded_files`` store under a new UUID.

    Raises:
        HTTPException: 400 if the filename is missing, has no extension, or
            the extension is not supported; 500 for any other failure.
    """
    try:
        # Validate file type
        allowed_types = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp']
        filename = file.filename or ""
        # Only text after the last dot counts as an extension, so a bare
        # name such as "pdf" is not mistaken for a PDF file.
        file_extension = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ""
        if file_extension not in allowed_types:
            raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed: {allowed_types}")
        # Read file content
        file_content = await file.read()
        # Save file
        file_path = await document_processor.save_uploaded_file(file_content, filename)
        # Generate file ID
        file_id = str(uuid.uuid4())
        # One timestamp so the stored record and the response agree.
        upload_date = datetime.now()
        # Store file info
        uploaded_files[file_id] = {
            "filename": filename,
            "file_path": file_path,
            "file_type": file_extension,
            "upload_date": upload_date,
            "status": "uploaded"
        }
        return DocumentUploadResponse(
            file_id=file_id,
            filename=filename,
            file_type=file_extension,
            upload_date=upload_date,
            status="uploaded"
        )
    except HTTPException:
        # Keep deliberate client errors (400) from being rewritten as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/process/{file_id}", response_model=DocumentProcessResponse)
async def process_document(file_id: str):
    """
    Process an uploaded document and extract receipt data.

    Extraction is delegated to ``document_processor.process_file``. The
    upload record's status becomes ``"failed"`` when the result contains an
    ``"error"`` key, otherwise ``"processed"`` (with the raw result attached).
    The extracted fields are also recorded in ``processed_receipts`` so that
    ``/match-auto`` can use them.

    Raises:
        HTTPException: 404 if ``file_id`` is unknown; 500 for any other failure.
    """
    try:
        if file_id not in uploaded_files:
            raise HTTPException(status_code=404, detail="File not found")
        file_info = uploaded_files[file_id]
        # Process document using AI
        result = await document_processor.process_file(file_info["file_path"], file_info["file_type"])
        # Update file status (file_info is the stored record itself)
        if "error" in result:
            file_info["status"] = "failed"
        else:
            file_info["status"] = "processed"
            file_info["extracted_data"] = result
        # Single source for the fields shared by the stored receipt and the response.
        extracted = {
            "extraction_success": result.get("extraction_success", False),
            "vendor": result.get("vendor"),
            "total_amount": result.get("total_amount"),
            "tax_amount": result.get("tax_amount"),
            "date": result.get("date"),
            "category": result.get("category"),
            "confidence": result.get("confidence"),
            "error": result.get("error"),
        }
        # Store processed receipt data for auto-matching
        processed_receipts[file_id] = {
            "filename": file_info["filename"],
            "upload_date": file_info["upload_date"],
            **extracted,
        }
        return DocumentProcessResponse(file_id=file_id, **extracted)
    except HTTPException:
        # Keep the 404 above from being rewritten as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/documents")
async def list_documents():
    """Return a summary of every entry in the in-memory upload store.

    Each summary carries the file ID, filename, type, upload date, status and,
    when present, the data extracted during processing.
    """
    try:
        summaries = [
            {
                "file_id": doc_id,
                "filename": info["filename"],
                "file_type": info["file_type"],
                "upload_date": info["upload_date"],
                "status": info["status"],
                # Only set once the document has been processed successfully.
                "extracted_data": info.get("extracted_data"),
            }
            for doc_id, info in uploaded_files.items()
        ]
        return {"documents": summaries}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/upload-multiple", response_model=List[DocumentUploadResponse])
async def upload_multiple_documents(files: List[UploadFile] = File(...)):
    """
    Upload multiple receipt documents (PDF or image) for processing.
    Supports: PDF, JPG, JPEG, PNG, GIF, BMP

    Files are handled independently: a rejected or failing file yields a
    response entry with an empty ``file_id`` and a ``failed: ...`` status,
    and the remaining files are still processed.

    Returns:
        One ``DocumentUploadResponse`` per input file, in input order.
    """
    responses = []
    allowed_types = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp']
    for file in files:
        # A missing filename must not raise here or inside the except handler,
        # otherwise one bad part would abort the whole batch.
        filename = file.filename or ""
        try:
            # Only text after the last dot counts as an extension.
            file_extension = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ""
            if file_extension not in allowed_types:
                responses.append(DocumentUploadResponse(
                    file_id="",
                    filename=filename,
                    file_type=file_extension,
                    upload_date=datetime.now(),
                    status=f"failed: unsupported file type ({file_extension})"
                ))
                continue
            file_content = await file.read()
            file_path = await document_processor.save_uploaded_file(file_content, filename)
            file_id = str(uuid.uuid4())
            # One timestamp so the stored record and the response agree.
            upload_date = datetime.now()
            uploaded_files[file_id] = {
                "filename": filename,
                "file_path": file_path,
                "file_type": file_extension,
                "upload_date": upload_date,
                "status": "uploaded"
            }
            responses.append(DocumentUploadResponse(
                file_id=file_id,
                filename=filename,
                file_type=file_extension,
                upload_date=upload_date,
                status="uploaded"
            ))
        except Exception as e:
            responses.append(DocumentUploadResponse(
                file_id="",
                filename=filename,
                file_type="",
                upload_date=datetime.now(),
                status=f"failed: {str(e)}"
            ))
    return responses
# ============================================================================
# GOOGLE DRIVE INTEGRATION ENDPOINTS
# ============================================================================
@app.post("/drive/sync", response_model=DriveSyncResponse)
async def sync_google_drive(request: DriveSyncRequest):
    """
    Process the receipt files of a Google Drive folder and summarise the outcome.

    The work is delegated to ``drive_sync.process_drive_files`` for
    ``request.folder_id``. Each returned result is mapped onto a
    ``DocumentProcessResponse``, and the totals of processed, successful and
    failed extractions are reported alongside.

    Raises:
        HTTPException: 500 if syncing or response construction fails.
    """
    try:
        drive_results = await drive_sync.process_drive_files(request.folder_id)

        # Tally outcomes; anything not flagged as successful counts as failed.
        total = len(drive_results)
        succeeded = sum(1 for item in drive_results if item.get("extraction_success", False))
        failed = total - succeeded

        processed = [
            DocumentProcessResponse(
                file_id=item.get("file_id", ""),
                extraction_success=item.get("extraction_success", False),
                vendor=item.get("vendor"),
                total_amount=item.get("total_amount"),
                tax_amount=item.get("tax_amount"),
                date=item.get("date"),
                category=item.get("category"),
                confidence=item.get("confidence"),
                error=item.get("error"),
            )
            for item in drive_results
        ]
        return DriveSyncResponse(
            files_processed=total,
            successful_extractions=succeeded,
            failed_extractions=failed,
            results=processed,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/drive/folders")
async def list_drive_folders():
    """Return the folders reported by ``drive_sync.list_folders``.

    Raises:
        HTTPException: 500 if the folder listing fails.
    """
    try:
        return {"folders": drive_sync.list_folders()}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/drive/folder/{folder_id}")
async def get_folder_info(folder_id: str):
    """Return whatever ``drive_sync.get_folder_info`` reports for ``folder_id``.

    Raises:
        HTTPException: 500 if the lookup fails.
    """
    try:
        return drive_sync.get_folder_info(folder_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ============================================================================
# AI MATCHING ENGINE ENDPOINTS
# ============================================================================
@app.post("/match", response_model=MatchingResponse)
async def match_receipts_transactions(request: MatchingRequest):
    """
    Match the receipts in the request against the transactions in the request.

    Request models are converted to the internal ``Receipt`` / ``Transaction``
    models, handed to ``matching_engine.process_matching``, and the resulting
    matches are returned together with ``matching_engine.get_matching_stats``.

    Raises:
        HTTPException: 500 if conversion or matching fails.
    """
    try:
        # Request models -> internal models
        receipts = []
        for item in request.receipts:
            receipts.append(Receipt(
                id=item.id,
                file_name=item.file_name,
                upload_date=item.upload_date,
                receipt_date=item.receipt_date,
                amount=item.amount,
                tax=item.tax,
                vendor=item.vendor,
                category=item.category,
            ))
        transactions = []
        for item in request.transactions:
            transactions.append(Transaction(
                id=item.id,
                transaction_date=item.transaction_date,
                amount=item.amount,
                vendor=item.vendor,
                notes=item.notes,
            ))

        found = matching_engine.process_matching(receipts, transactions)

        # Internal matches -> response models
        payload = []
        for hit in found:
            payload.append(MatchResponse(
                receipt_id=hit.receipt.id,
                transaction_id=hit.transaction.id,
                confidence_score=hit.confidence_score,
                match_reason=hit.match_reason,
                receipt_vendor=hit.receipt.vendor,
                receipt_amount=hit.receipt.amount,
                transaction_vendor=hit.transaction.vendor,
                transaction_amount=hit.transaction.amount,
            ))

        summary = matching_engine.get_matching_stats(found)
        return MatchingResponse(matches=payload, stats=summary)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/match-auto", response_model=MatchingResponse)
async def match_auto():
    """
    Automatically match all processed receipts against all imported transactions.
    This endpoint uses the stored transaction data from CSV import and
    all processed receipts to perform matching without requiring manual data input.

    Only receipts whose extraction succeeded take part. Receipt fields that
    were extracted as ``None`` fall back to defaults (date ``2024-01-01``,
    amounts ``0.0``, empty strings) instead of failing the whole request.

    Raises:
        HTTPException: 400 if no transactions were imported, no receipts were
            processed, or no receipt was extracted successfully; 500 for any
            other failure.
    """
    try:
        if not stored_transactions:
            raise HTTPException(status_code=400, detail="No transactions imported. Please upload CSV first.")
        if not processed_receipts:
            raise HTTPException(status_code=400, detail="No receipts processed. Please upload and process receipts first.")
        # Convert stored transactions to Receipt/Transaction models
        transactions = [
            Transaction(
                id=t["id"],
                transaction_date=datetime.strptime(t["txn_date"], "%Y-%m-%d"),
                amount=abs(t["amount"]),
                vendor=t["payee_name"],
                notes=t.get("memo", "")
            ) for t in stored_transactions
        ]
        receipts = []
        for file_id, receipt_data in processed_receipts.items():
            if not receipt_data.get("extraction_success"):
                continue
            # The stored keys always exist but may hold None, so `or` is used
            # for the fallbacks; dict.get defaults would never apply.
            # NOTE(review): the fixed fallback date is kept from the original
            # code — confirm it is acceptable for undated receipts.
            receipts.append(Receipt(
                id=file_id,
                file_name=receipt_data.get("filename") or "",
                upload_date=receipt_data.get("upload_date") or datetime.now(),
                receipt_date=datetime.strptime(receipt_data.get("date") or "2024-01-01", "%Y-%m-%d"),
                amount=receipt_data.get("total_amount") or 0.0,
                tax=receipt_data.get("tax_amount") or 0.0,
                vendor=receipt_data.get("vendor") or "",
                category=receipt_data.get("category") or ""
            ))
        if not receipts:
            raise HTTPException(status_code=400, detail="No successfully processed receipts found.")
        # Process matching using AI engine
        matches = matching_engine.process_matching(receipts, transactions)
        # Convert to response format
        match_responses = [
            MatchResponse(
                receipt_id=match.receipt.id,
                transaction_id=match.transaction.id,
                confidence_score=match.confidence_score,
                match_reason=match.match_reason,
                receipt_vendor=match.receipt.vendor,
                receipt_amount=match.receipt.amount,
                transaction_vendor=match.transaction.vendor,
                transaction_amount=match.transaction.amount
            ) for match in matches
        ]
        # Get statistics
        stats = matching_engine.get_matching_stats(matches)
        return MatchingResponse(matches=match_responses, stats=stats)
    except HTTPException:
        # Keep the 400 precondition errors from being rewritten as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/approve")
async def approve_match(request: ApprovalRequest):
    """
    Approve or reject an AI match.

    Currently this only acknowledges the decision in the response message;
    nothing is persisted or logged by this endpoint.

    Raises:
        HTTPException: 400 if ``request.action`` is neither ``"approve"`` nor
            ``"reject"``; 500 for any other failure.
    """
    try:
        if request.action == "approve":
            return {"message": f"Match {request.match_id} approved by {request.user_id}"}
        if request.action == "reject":
            return {"message": f"Match {request.match_id} rejected by {request.user_id}. Reason: {request.reason}"}
        raise HTTPException(status_code=400, detail="Action must be 'approve' or 'reject'")
    except HTTPException:
        # Keep the 400 above from being rewritten as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# AI RULES MANAGEMENT ENDPOINTS
# ============================================================================
@app.post("/rules")
async def add_rule(request: RuleRequest):
    """Build an ``AIRule`` from the request and register it with the rules engine.

    Raises:
        HTTPException: 500 if the rule cannot be created or added.
    """
    try:
        new_rule = AIRule(
            name=request.name,
            condition=request.condition,
            action=request.action,
            source=request.source,
        )
        engine = matching_engine.rules_engine
        engine.add_rule(new_rule)
        return {"message": f"Rule '{request.name}' added successfully"}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/rules")
async def get_rules():
    """Return every rule held by the rules engine as a plain dict.

    Each dict exposes the rule's name, condition, action, source and status.

    Raises:
        HTTPException: 500 if the rules cannot be read.
    """
    try:
        serialized = []
        for entry in matching_engine.rules_engine.rules:
            serialized.append({
                "name": entry.name,
                "condition": entry.condition,
                "action": entry.action,
                "source": entry.source,
                "status": entry.status,
            })
        return {"rules": serialized}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.delete("/rules/{rule_name}")
async def delete_rule(rule_name: str):
    """Ask the rules engine to remove the rule called ``rule_name``.

    Raises:
        HTTPException: 500 if the removal fails.
    """
    try:
        engine = matching_engine.rules_engine
        engine.remove_rule(rule_name)
        return {"message": f"Rule '{rule_name}' deleted successfully"}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ============================================================================
# SYSTEM MONITORING ENDPOINTS
# ============================================================================
@app.get("/stats")
async def get_stats():
    """Report counters gathered from the feedback logger, rules engine and in-memory stores.

    Raises:
        HTTPException: 500 if any counter cannot be computed.
    """
    try:
        feedback = matching_engine.feedback_logger
        recent = feedback.get_recent_logs(30)
        total_logs = len(feedback.logs)
        active_rule_count = sum(
            1 for rule in matching_engine.rules_engine.rules if rule.status == "active"
        )
        processed_count = sum(
            1 for info in uploaded_files.values() if info["status"] == "processed"
        )
        return {
            "total_feedback_logs": total_logs,
            "recent_feedback_logs": len(recent),
            "active_rules": active_rule_count,
            "uploaded_documents": len(uploaded_files),
            "processed_documents": processed_count,
            "stored_transactions": len(stored_transactions),
            "processed_receipts": len(processed_receipts),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/status")
async def get_status():
    """Summarise what has been loaded so far (intended for demos).

    Reports whether transactions and receipts are available, their counts,
    whether matching can run, and up to three samples of each.

    Raises:
        HTTPException: 500 if the summary cannot be built.
    """
    try:
        txn_count = len(stored_transactions)
        receipt_count = len(processed_receipts)
        has_transactions = txn_count > 0
        has_receipts = receipt_count > 0
        # Slicing an empty list already yields [], so no emptiness guard is needed.
        sample_txns = stored_transactions[:3]
        sample_receipt_ids = list(processed_receipts.keys())[:3]
        return {
            "csv_uploaded": has_transactions,
            "transactions_count": txn_count,
            "receipts_uploaded": len(uploaded_files),
            "receipts_processed": receipt_count,
            "ready_for_matching": has_transactions and has_receipts,
            "sample_transactions": sample_txns,
            "sample_receipts": sample_receipt_ids,
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn when this file is run
    # directly. uvicorn is imported here so importing the module does not
    # require it.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8343}
    uvicorn.run(app, **server_options)