"""FastAPI entry point for the AI Bookkeeper data science engine.

Exposes HTTP endpoints for importing QuickBooks transactions (JSON or CSV),
uploading and processing receipt documents, syncing receipts from Google
Drive, matching receipts to transactions, and managing matching rules.

Uploaded-file metadata is kept in the in-process ``uploaded_files`` dict, so
it is lost on restart and is not shared between worker processes.
"""
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from typing import Any, Dict, List, Optional
import uuid
import csv
import io

from api_models import (
    MatchingRequest,
    MatchingResponse,
    MatchResponse,
    ApprovalRequest,
    RuleRequest,
    DocumentUploadResponse,
    DocumentProcessResponse,
    DriveSyncRequest,
    DriveSyncResponse,
    QuickBooksImportRequest,
    QuickBooksImportResponse,
    TransactionRequest
)
from models import Receipt, Transaction, Match
from matching_engine import MatchingEngine
from ai_rules import AIRule
from document_processor import DocumentProcessor
from google_drive_sync import GoogleDriveSync

app = FastAPI(
    title="AI Bookkeeper - Data Science Engine",
    description="AI-powered receipt-to-transaction matching engine. Receives QuickBooks data from backend and provides intelligent matching capabilities.",
    version="1.0.0"
)

# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests. Left unchanged to avoid breaking existing
# clients; restrict allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize DS Engine components
matching_engine = MatchingEngine()
document_processor = DocumentProcessor()
drive_sync = GoogleDriveSync()

# In-memory storage for uploaded files (in production, use a database)
uploaded_files: Dict[str, Dict[str, Any]] = {}

# Extensions accepted by the upload endpoints. Kept as a list so the
# "Unsupported file type" message renders exactly as before.
_ALLOWED_FILE_TYPES = ['pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp']

# Date layouts tried, in order, when parsing the CSV bank export.
_CSV_DATE_FORMATS = ("%m/%d/%y", "%m/%d/%Y")


def _file_extension(filename: Optional[str]) -> str:
    """Return the lowercase extension of *filename*, or "" if it has none.

    A missing filename or a name without a dot yields "", so such uploads
    are rejected as unsupported instead of crashing or being accepted because
    the bare name happens to equal an allowed extension.
    """
    _, dot, extension = (filename or "").rpartition(".")
    return extension.lower() if dot else ""


async def _store_upload(file: UploadFile, file_extension: str) -> DocumentUploadResponse:
    """Persist one upload, register it in ``uploaded_files``, and describe it.

    The timestamp is taken once so the stored record and the returned
    response carry the same ``upload_date``.
    """
    file_content = await file.read()
    file_path = await document_processor.save_uploaded_file(file_content, file.filename)

    file_id = str(uuid.uuid4())
    uploaded_at = datetime.now()
    uploaded_files[file_id] = {
        "filename": file.filename,
        "file_path": file_path,
        "file_type": file_extension,
        "upload_date": uploaded_at,
        "status": "uploaded"
    }

    return DocumentUploadResponse(
        file_id=file_id,
        filename=file.filename,
        file_type=file_extension,
        upload_date=uploaded_at,
        status="uploaded"
    )


def _to_process_response(file_id: str, result: Dict[str, Any]) -> DocumentProcessResponse:
    """Build a ``DocumentProcessResponse`` from an extraction result mapping.

    Keys absent from *result* become ``None`` (``extraction_success``
    defaults to ``False``).
    """
    return DocumentProcessResponse(
        file_id=file_id,
        extraction_success=result.get("extraction_success", False),
        vendor=result.get("vendor"),
        total_amount=result.get("total_amount"),
        tax_amount=result.get("tax_amount"),
        date=result.get("date"),
        category=result.get("category"),
        confidence=result.get("confidence"),
        error=result.get("error")
    )


def _clean_csv_row(row: Dict[Any, Any]) -> Dict[str, str]:
    """Return *row* with whitespace stripped from every header and value.

    ``csv.DictReader`` stores overflow columns under a ``None`` key and fills
    short rows with ``None`` values; the former are dropped and the latter
    become "".
    """
    cleaned: Dict[str, str] = {}
    for key, value in row.items():
        if key is None:
            continue
        cleaned[key.strip()] = value.strip() if isinstance(value, str) else ""
    return cleaned


def _parse_csv_date(raw: str) -> str:
    """Convert a CSV date string to ``YYYY-MM-DD``.

    Raises:
        ValueError: if *raw* matches none of ``_CSV_DATE_FORMATS``.
    """
    for fmt in _CSV_DATE_FORMATS:
        try:
            return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    raise ValueError(f"Could not parse date: {raw}")


def _parse_csv_row(row: Dict[Any, Any], row_number: int) -> Dict[str, Any]:
    """Translate one CSV row into the dict shape fed to the JSON import.

    Args:
        row: A row as produced by ``csv.DictReader``.
        row_number: 1-based data-row index, used to compose the transaction id.

    Raises:
        ValueError: if the date or amount is missing or cannot be parsed.
    """
    cleaned = _clean_csv_row(row)

    txn_date_raw = cleaned.get("Transaction Date")
    if not txn_date_raw:
        raise ValueError("Missing 'Transaction Date'")
    amount_raw = cleaned.get("Amount")
    if not amount_raw:
        raise ValueError("Missing 'Amount'")

    account_number = cleaned.get("Account Number", "")
    memo = (
        f"{cleaned.get('Account Type', '')} "
        f"{cleaned.get('Cheque Number', '')} "
        f"{cleaned.get('Description 1', '')}"
    ).strip()

    return {
        "id": f"{account_number}_{row_number}",
        "txn_date": _parse_csv_date(txn_date_raw),
        # Thousands separators are removed before conversion.
        "amount": float(amount_raw.replace(',', '').strip()),
        "payee_name": cleaned.get("Description 2", ""),
        "memo": memo
    }


@app.get("/")
async def root():
    """Health check endpoint"""
    return {
        "message": "AI Bookkeeper Data Science Engine is running",
        "version": "1.0.0",
        "status": "healthy"
    }


# ============================================================================
# QUICKBOOKS DATA IMPORT ENDPOINTS
# ============================================================================

@app.post("/transactions/import/quickbooks", response_model=QuickBooksImportResponse)
async def import_quickbooks_transactions(request: QuickBooksImportRequest):
    """
    Import and convert QuickBooks transactions to internal format.

    Each transaction's ``txn_date`` is parsed as ``YYYY-MM-DD`` and its amount
    is made non-negative. A transaction that fails to convert is skipped and
    reported in the response's ``errors`` list; it does not abort the import.
    """
    try:
        converted_transactions = []
        errors = []

        for qb_txn in request.transactions:
            try:
                # Convert QuickBooks date format to datetime
                txn_date = datetime.strptime(qb_txn.txn_date, "%Y-%m-%d")

                # Convert to internal TransactionRequest format
                converted_txn = TransactionRequest(
                    id=qb_txn.id,
                    transaction_date=txn_date,
                    amount=abs(qb_txn.amount),  # Ensure positive amount
                    vendor=qb_txn.payee_name,
                    notes=qb_txn.memo or f"QuickBooks transaction from {qb_txn.account_name or 'unknown account'}"
                )
                converted_transactions.append(converted_txn)
            except Exception as e:
                # Deliberately best-effort: record the failure and keep going.
                errors.append(f"Error converting transaction {qb_txn.id}: {str(e)}")

        return QuickBooksImportResponse(
            imported_count=len(converted_transactions),
            converted_transactions=converted_transactions,
            errors=errors
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/transactions/import/csv", response_model=QuickBooksImportResponse)
async def import_quickbooks_transactions_csv(file: UploadFile = File(...)):
    """
    Import QuickBooks transactions from a CSV file (custom bank export format).

    Columns read: ``Account Number``, ``Transaction Date``, ``Amount``,
    ``Description 2`` (payee), and ``Account Type`` / ``Cheque Number`` /
    ``Description 1`` (joined into the memo). Headers and values are matched
    after stripping surrounding whitespace, and a UTF-8 byte-order mark is
    tolerated. Rows that cannot be parsed are reported in ``errors``.

    Raises:
        HTTPException: 400 if the file is not valid UTF-8; 500 on any other
            unexpected failure.
    """
    try:
        content = await file.read()
        try:
            # utf-8-sig drops a leading BOM, which would otherwise become part
            # of the first header name and make that column unfindable.
            decoded = content.decode('utf-8-sig')
        except UnicodeDecodeError as e:
            raise HTTPException(status_code=400, detail=f"CSV file is not valid UTF-8: {str(e)}")

        reader = csv.DictReader(io.StringIO(decoded))
        transactions = []
        errors = []

        for row_number, row in enumerate(reader, start=1):
            try:
                transactions.append(_parse_csv_row(row, row_number))
            except Exception as e:
                # Best-effort: one bad row must not reject the whole file.
                errors.append(f"Row {row_number}: {str(e)}")

        # Use the same logic as the JSON import endpoint
        request_obj = QuickBooksImportRequest(transactions=transactions)
        response = await import_quickbooks_transactions(request_obj)

        # Attach errors from CSV parsing
        if hasattr(response, 'errors'):
            response.errors.extend(errors)
        return response
    except HTTPException:
        # Preserve deliberate status codes instead of re-wrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# ============================================================================
# RECEIPT PROCESSING ENDPOINTS
# ============================================================================

@app.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(file: UploadFile = File(...)):
    """
    Upload a receipt document (PDF or image) for processing.

    Supports: PDF, JPG, JPEG, PNG, GIF, BMP

    Raises:
        HTTPException: 400 if the file extension is not supported; 500 if
            saving the file fails.
    """
    try:
        # Validate file type
        file_extension = _file_extension(file.filename)
        if file_extension not in _ALLOWED_FILE_TYPES:
            raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed: {_ALLOWED_FILE_TYPES}")

        return await _store_upload(file, file_extension)
    except HTTPException:
        # Without this the 400 above would be reported as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/process/{file_id}", response_model=DocumentProcessResponse)
async def process_document(file_id: str):
    """
    Process uploaded document and extract receipt data using AI.

    Extraction is delegated to ``document_processor.process_file``; the
    original notes say it uses a Groq LLM to extract vendor, amount, date and
    category, which cannot be confirmed from this module. The stored status
    becomes "failed" when the result contains an "error" key, otherwise
    "processed".

    Raises:
        HTTPException: 404 if *file_id* is unknown; 500 on processing failure.
    """
    try:
        if file_id not in uploaded_files:
            raise HTTPException(status_code=404, detail="File not found")

        file_info = uploaded_files[file_id]

        # Process document using AI
        result = await document_processor.process_file(file_info["file_path"], file_info["file_type"])

        # Update file status
        if "error" in result:
            file_info["status"] = "failed"
        else:
            file_info["status"] = "processed"
            file_info["extracted_data"] = result

        return _to_process_response(file_id, result)
    except HTTPException:
        # Without this the 404 above would be reported as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/documents")
async def list_documents():
    """List all uploaded and processed documents"""
    try:
        documents = [
            {
                "file_id": file_id,
                "filename": file_info["filename"],
                "file_type": file_info["file_type"],
                "upload_date": file_info["upload_date"],
                "status": file_info["status"],
                "extracted_data": file_info.get("extracted_data")
            }
            for file_id, file_info in uploaded_files.items()
        ]
        return {"documents": documents}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/upload-multiple", response_model=List[DocumentUploadResponse])
async def upload_multiple_documents(files: List[UploadFile] = File(...)):
    """
    Upload multiple receipt documents (PDF or image) for processing.

    Supports: PDF, JPG, JPEG, PNG, GIF, BMP

    Never fails as a whole: each file gets its own response entry, and a
    rejected or failed file has an empty ``file_id`` and a ``status``
    beginning with "failed:".
    """
    responses = []
    for file in files:
        # Computed outside the try so the failure branch can always report it.
        file_extension = _file_extension(file.filename)
        try:
            if file_extension not in _ALLOWED_FILE_TYPES:
                responses.append(DocumentUploadResponse(
                    file_id="",
                    filename=file.filename or "",
                    file_type=file_extension,
                    upload_date=datetime.now(),
                    status=f"failed: unsupported file type ({file_extension})"
                ))
                continue

            responses.append(await _store_upload(file, file_extension))
        except Exception as e:
            responses.append(DocumentUploadResponse(
                file_id="",
                filename=file.filename or "",
                file_type="",
                upload_date=datetime.now(),
                status=f"failed: {str(e)}"
            ))
    return responses


# ============================================================================
# GOOGLE DRIVE INTEGRATION ENDPOINTS
# ============================================================================

@app.post("/drive/sync", response_model=DriveSyncResponse)
async def sync_google_drive(request: DriveSyncRequest):
    """
    Sync and process receipts from Google Drive folder.

    Delegates to ``drive_sync.process_drive_files`` for the requested folder
    and summarises how many results report ``extraction_success``.
    """
    try:
        # Process files from Drive
        results = await drive_sync.process_drive_files(request.folder_id)

        # Count results
        files_processed = len(results)
        successful_extractions = len([r for r in results if r.get("extraction_success", False)])
        failed_extractions = files_processed - successful_extractions

        # Convert to response format
        response_results = [
            _to_process_response(result.get("file_id", ""), result)
            for result in results
        ]

        return DriveSyncResponse(
            files_processed=files_processed,
            successful_extractions=successful_extractions,
            failed_extractions=failed_extractions,
            results=response_results
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/drive/folders")
async def list_drive_folders():
    """List all accessible Google Drive folders"""
    try:
        folders = drive_sync.list_folders()
        return {"folders": folders}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/drive/folder/{folder_id}")
async def get_folder_info(folder_id: str):
    """Get information about a specific Google Drive folder"""
    try:
        folder_info = drive_sync.get_folder_info(folder_id)
        return folder_info
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# ============================================================================
# AI MATCHING ENGINE ENDPOINTS
# ============================================================================

@app.post("/match", response_model=MatchingResponse)
async def match_receipts_transactions(request: MatchingRequest):
    """
    Match receipts to transactions using AI.

    Converts the request payload into internal ``Receipt`` / ``Transaction``
    objects, runs ``matching_engine.process_matching`` on them, and returns
    the matches together with the engine's statistics.
    """
    try:
        # Convert request models to internal models
        receipts = [
            Receipt(
                id=r.id,
                file_name=r.file_name,
                upload_date=r.upload_date,
                receipt_date=r.receipt_date,
                amount=r.amount,
                tax=r.tax,
                vendor=r.vendor,
                category=r.category
            )
            for r in request.receipts
        ]

        transactions = [
            Transaction(
                id=t.id,
                transaction_date=t.transaction_date,
                amount=t.amount,
                vendor=t.vendor,
                notes=t.notes
            )
            for t in request.transactions
        ]

        # Process matching using AI engine
        matches = matching_engine.process_matching(receipts, transactions)

        # Convert to response format
        match_responses = [
            MatchResponse(
                receipt_id=match.receipt.id,
                transaction_id=match.transaction.id,
                confidence_score=match.confidence_score,
                match_reason=match.match_reason,
                receipt_vendor=match.receipt.vendor,
                receipt_amount=match.receipt.amount,
                transaction_vendor=match.transaction.vendor,
                transaction_amount=match.transaction.amount
            )
            for match in matches
        ]

        # Get statistics
        stats = matching_engine.get_matching_stats(matches)

        return MatchingResponse(matches=match_responses, stats=stats)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/approve")
async def approve_match(request: ApprovalRequest):
    """
    Approve or reject an AI match.

    NOTE(review): this handler currently only echoes the decision back to the
    caller; nothing is persisted or logged here, although the original notes
    describe feedback logging for continuous AI improvement.

    Raises:
        HTTPException: 400 if ``action`` is neither "approve" nor "reject".
    """
    try:
        if request.action == "approve":
            return {"message": f"Match {request.match_id} approved by {request.user_id}"}
        elif request.action == "reject":
            return {"message": f"Match {request.match_id} rejected by {request.user_id}. Reason: {request.reason}"}
        else:
            raise HTTPException(status_code=400, detail="Action must be 'approve' or 'reject'")
    except HTTPException:
        # Without this the 400 above would be reported as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# ============================================================================
# AI RULES MANAGEMENT ENDPOINTS
# ============================================================================

@app.post("/rules")
async def add_rule(request: RuleRequest):
    """Add a new AI rule for matching and categorization"""
    try:
        rule = AIRule(
            name=request.name,
            condition=request.condition,
            action=request.action,
            source=request.source
        )
        matching_engine.rules_engine.add_rule(rule)
        return {"message": f"Rule '{request.name}' added successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/rules")
async def get_rules():
    """Get all AI rules held by the rules engine, with their status"""
    try:
        rules = matching_engine.rules_engine.rules
        return {
            "rules": [
                {
                    "name": rule.name,
                    "condition": rule.condition,
                    "action": rule.action,
                    "source": rule.source,
                    "status": rule.status
                }
                for rule in rules
            ]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/rules/{rule_name}")
async def delete_rule(rule_name: str):
    """Delete an AI rule"""
    # NOTE(review): success is reported whenever remove_rule does not raise;
    # whether an unknown rule name raises depends on the rules engine.
    try:
        matching_engine.rules_engine.remove_rule(rule_name)
        return {"message": f"Rule '{rule_name}' deleted successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# ============================================================================
# SYSTEM MONITORING ENDPOINTS
# ============================================================================

@app.get("/stats")
async def get_stats():
    """Get system statistics and performance metrics"""
    try:
        # NOTE(review): 30 is passed straight to get_recent_logs; its unit
        # (days vs. entries) is defined by the feedback logger - confirm.
        recent_logs = matching_engine.feedback_logger.get_recent_logs(30)
        return {
            "total_feedback_logs": len(matching_engine.feedback_logger.logs),
            "recent_feedback_logs": len(recent_logs),
            "active_rules": len([r for r in matching_engine.rules_engine.rules if r.status == "active"]),
            "uploaded_documents": len(uploaded_files),
            "processed_documents": len([f for f in uploaded_files.values() if f["status"] == "processed"])
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8343)