3559cbe19d
This commit introduces a new test script, `test_json_extraction.py`, which verifies the correctness of the JSON extraction logic. The script includes a function to extract the first valid JSON object from raw input and a series of test cases covering various scenarios, such as clean JSON, JSON with extra text, nested JSON, and escaped quotes. The tests ensure that the extraction function behaves as expected and handles edge cases appropriately.
836 lines
36 KiB
Python
836 lines
36 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Any, Dict
|
|
|
|
import aiofiles
|
|
import groq
|
|
import PyPDF2
|
|
from config import settings
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DocumentProcessor:
|
|
def __init__(self):
    """Create the Groq API client and choose the model used for extraction."""
    # Vision-capable model; the same identifier is passed as ``model=`` by
    # both the image prompts and the text prompt in this class.
    vision_model = "meta-llama/llama-4-scout-17b-16e-instruct"

    self.client = groq.Groq(api_key=settings.GROQ_API_KEY)
    self.model = vision_model  # Vision model
|
|
|
|
def _extract_first_json(self, raw: str) -> dict:
|
|
"""Extract the first valid JSON object from raw LLM output.
|
|
|
|
Handles cases where LLM returns extra text after/before the JSON.
|
|
"""
|
|
try:
|
|
# First try direct parsing (fastest path)
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Find the first '{' and match closing '}'
|
|
start = raw.find("{")
|
|
if start == -1:
|
|
raise ValueError("No JSON object found in LLM output")
|
|
|
|
depth = 0
|
|
end = -1
|
|
in_string = False
|
|
escape_next = False
|
|
|
|
for i in range(start, len(raw)):
|
|
ch = raw[i]
|
|
|
|
# Handle string escaping
|
|
if escape_next:
|
|
escape_next = False
|
|
continue
|
|
if ch == "\\":
|
|
escape_next = True
|
|
continue
|
|
|
|
# Track if we're inside a string
|
|
if ch == '"':
|
|
in_string = not in_string
|
|
continue
|
|
|
|
# Only count braces outside of strings
|
|
if not in_string:
|
|
if ch == "{":
|
|
depth += 1
|
|
elif ch == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
end = i + 1
|
|
break
|
|
|
|
if end == -1:
|
|
raise ValueError("Unbalanced JSON braces in LLM output")
|
|
|
|
json_str = raw[start:end]
|
|
return json.loads(json_str)
|
|
|
|
async def process_file(
    self,
    file_path: str,
    file_type: str,
    user_location: str = None,
    ai_rules: list = None,
) -> Dict[str, Any]:
    """Dispatch an uploaded file to the image or PDF pipeline.

    Args:
        file_path: Path to the file to process
        file_type: Type of file (jpg, pdf, etc.); compared case-insensitively
        user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
        ai_rules: List of AI rules for categorization (e.g., [{"condition": "vendor is Starbucks", "action": "Food"}])

    Returns:
        Whatever the selected pipeline returns, or ``{"error": <message>}``
        when the type is unsupported or anything raises along the way.
    """
    image_types = {"jpg", "jpeg", "png", "gif", "bmp"}
    try:
        kind = file_type.lower()
        if kind in image_types:
            handler = self._process_image
        elif kind == "pdf":
            handler = self._process_pdf
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
        return await handler(file_path, user_location, ai_rules)
    except Exception as e:
        # Failures are reported through the result dict, never raised.
        return {"error": str(e)}
|
|
|
|
async def _process_image(
|
|
self, image_path: str, user_location: str = None, ai_rules: list = None
|
|
) -> Dict[str, Any]:
|
|
"""Extract data from image using Groq vision
|
|
|
|
Args:
|
|
image_path: Path to the image file
|
|
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
|
|
ai_rules: List of AI rules for categorization
|
|
"""
|
|
try:
|
|
# Encode image to base64
|
|
base64_image = self._encode_image(image_path)
|
|
|
|
# Build user location context
|
|
user_location_context = ""
|
|
if user_location:
|
|
user_location_context = f"""
|
|
|
|
USER LOCATION CONTEXT:
|
|
The user is located in {user_location}.
|
|
- If the receipt location is MISSING or UNCLEAR, use the user's location ({user_location}) for tax calculations.
|
|
- If the receipt clearly shows a different location, use the receipt's location instead.
|
|
- Apply depreciation rules based on the user's location.
|
|
"""
|
|
|
|
# Build AI rules context for categorization
|
|
ai_rules_context = ""
|
|
if ai_rules and len(ai_rules) > 0:
|
|
ai_rules_context = "\n CATEGORIZATION RULES (IMPORTANT - Apply these first):"
|
|
for idx, rule in enumerate(ai_rules, 1):
|
|
condition = rule.get("condition", "")
|
|
action = rule.get("action", "")
|
|
ai_rules_context += f"\n {idx}. If {condition} → set category to '{action}'"
|
|
ai_rules_context += "\n - Apply these custom rules before using default categorization logic\n - If multiple rules match, use the first matching rule\n - If no rules match, use default categorization based on vendor type"
|
|
|
|
# Create Groq vision prompt
|
|
prompt = f"""
|
|
Analyze this receipt image and extract the following information in JSON format:
|
|
{{
|
|
"vendor": "Store/company name",
|
|
"description": "Detailed description of items/services purchased",
|
|
"total_amount": 0.00,
|
|
"tax_amount": 0.00,
|
|
"date": "YYYY-MM-DD",
|
|
"category": "Food/Transport/Office/Other",
|
|
"confidence": 0.95,
|
|
"currency": "USD",
|
|
"location": "Province/State, Country",
|
|
"calculated_tax": 0.00,
|
|
"is_depreciable": false,
|
|
"name_of_asset": null,
|
|
"cca_rate": null,
|
|
"useful_life": null,
|
|
"residual_value": null
|
|
}}
|
|
|
|
Rules:
|
|
- Extract vendor name as it appears on receipt
|
|
- Extract description of items/services purchased (e.g., "Coffee and sandwich", "Gasoline", "Office supplies")
|
|
- Total amount should be the final total including tax
|
|
- Tax amount is separate tax line if available (if not clearly shown, calculate based on location)
|
|
- Date should be the date on the receipt
|
|
- Confidence score 0-1 based on how clear the receipt is
|
|
- Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD")
|
|
{ai_rules_context}
|
|
{user_location_context}
|
|
LOCATION & TAX RULES:
|
|
- Extract location from receipt (look for store address, province/state, country)
|
|
- Format location as "Province/State, Country" (e.g., "Ontario, Canada" or "California, USA")
|
|
- If location not shown on receipt, return null for location (system will use user location as fallback)
|
|
|
|
TAX EXTRACTION RULES (IMPORTANT):
|
|
- If tax is EXPLICITLY shown on receipt (even if $0 or 0%), use that exact value:
|
|
* If receipt shows "Tax: $0", "Tax: $0.00", "Tax (0%)", or similar → set tax_amount to 0.00 and calculated_tax to null
|
|
* If receipt shows any other tax amount → set tax_amount to that value and calculated_tax to null
|
|
|
|
- If tax_amount is NOT shown or UNCLEAR on receipt, calculate it based on location:
|
|
* Ontario, Canada: 13% HST
|
|
* Quebec, Canada: 9.975% QST + 5% GST = 14.975% total
|
|
* British Columbia, Canada: 12% (5% GST + 7% PST)
|
|
* Alberta, Canada: 5% GST
|
|
* California, USA: ~7.25% (varies by locality)
|
|
* New York, USA: ~8.875% (varies by locality)
|
|
* Texas, USA: 6.25%
|
|
* For other locations, estimate based on typical rates
|
|
* Store calculated tax in "calculated_tax" field and set tax_amount to the calculated value
|
|
|
|
DEPRECIATION RULES:
|
|
- Determine if item is a depreciable asset (vehicles, machinery, equipment, computers, furniture, buildings)
|
|
- Set is_depreciable to true only for capital assets, false for consumables/services
|
|
- If is_depreciable is true, provide:
|
|
* name_of_asset: Specific name/model of the asset (e.g., "2024 Honda Accord", "Dell Laptop XPS 15", "Office Desk")
|
|
* cca_rate: CCA rate as decimal (e.g., 0.30 for 30%, 0.20 for 20%, 0.04 for 4%)
|
|
- Class 10 (Vehicles): 30%
|
|
- Class 8 (Furniture, equipment): 20%
|
|
- Class 50 (Computers, software): 55%
|
|
- Class 1 (Buildings): 4%
|
|
- Class 10.1 (Passenger vehicles >$30k): 30%
|
|
* useful_life: Expected years of use (e.g., 5 for computers, 8 for vehicles, 10 for furniture)
|
|
* residual_value: Estimated value at end of life (typically 10% of purchase price for equipment, 20% for vehicles)
|
|
- If is_depreciable is false, set name_of_asset, cca_rate, useful_life, and residual_value to null
|
|
|
|
CATEGORY RULES:
|
|
- Assign the category based on all the details in the receipt
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
# Call Groq vision API with correct format
|
|
response = self.client.chat.completions.create(
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": prompt},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{base64_image}",
|
|
},
|
|
},
|
|
],
|
|
}
|
|
],
|
|
model=self.model,
|
|
max_tokens=800,
|
|
temperature=0.1,
|
|
)
|
|
|
|
# Parse response
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Image processing error: {str(e)}"}
|
|
|
|
def _encode_image(self, image_path: str) -> str:
|
|
"""Encode image to base64 string"""
|
|
with open(image_path, "rb") as image_file:
|
|
return base64.b64encode(image_file.read()).decode("utf-8")
|
|
|
|
async def _process_pdf(
|
|
self, pdf_path: str, user_location: str = None, ai_rules: list = None
|
|
) -> Dict[str, Any]:
|
|
"""Extract data from PDF by converting to image first
|
|
|
|
Args:
|
|
pdf_path: Path to the PDF file
|
|
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
|
|
ai_rules: List of AI rules for categorization
|
|
"""
|
|
try:
|
|
# For now, extract text from PDF and process as text
|
|
text_content = self._extract_text_from_pdf(pdf_path)
|
|
return self._process_text_content(text_content, user_location, ai_rules)
|
|
|
|
except Exception as e:
|
|
return {"error": f"PDF processing error: {str(e)}"}
|
|
|
|
def _extract_text_from_pdf(self, pdf_path: str) -> str:
    """Extract the text of every page of a PDF.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        The text of each page followed by a newline, concatenated in page
        order. A page whose extraction fails contributes an empty string.
        Returns ``""`` when the file cannot be opened or read as a PDF.
        Failures are logged as warnings rather than raised.
    """
    page_texts = []
    try:
        # Pages are extracted while the file is still open.
        with open(pdf_path, "rb") as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page_number, page in enumerate(pdf_reader.pages, 1):
                try:
                    # Defensive: tolerate a falsy result from extract_text().
                    page_texts.append(page.extract_text() or "")
                except Exception as e:
                    # One unreadable page should not discard the others.
                    logger.warning(
                        f"Failed to extract text from page {page_number} of {pdf_path}: {e}"
                    )
                    page_texts.append("")
    except Exception as e:
        logger.warning(f"Failed to read PDF {pdf_path}: {e}")
        return ""

    return "".join(f"{page_text}\n" for page_text in page_texts)
|
|
|
|
def _process_text_content(
|
|
self, text_content: str, user_location: str = None, ai_rules: list = None
|
|
) -> Dict[str, Any]:
|
|
"""Process text content using Groq (fallback for PDFs)
|
|
|
|
Args:
|
|
text_content: Extracted text from PDF
|
|
user_location: User's location string in format "State/Province, Country" (e.g., "Ontario, Canada")
|
|
ai_rules: List of AI rules for categorization
|
|
"""
|
|
try:
|
|
# Build user location context
|
|
user_location_context = ""
|
|
if user_location:
|
|
user_location_context = f"""
|
|
|
|
USER LOCATION CONTEXT:
|
|
The user is located in {user_location}.
|
|
- If the receipt location is MISSING or UNCLEAR, use the user's location ({user_location}) for tax calculations.
|
|
- If the receipt clearly shows a different location, use the receipt's location instead.
|
|
- Apply depreciation rules based on the user's location.
|
|
"""
|
|
|
|
# Build AI rules context for categorization
|
|
ai_rules_context = ""
|
|
if ai_rules and len(ai_rules) > 0:
|
|
ai_rules_context = "\n CATEGORIZATION RULES (IMPORTANT - Apply these first):"
|
|
for idx, rule in enumerate(ai_rules, 1):
|
|
condition = rule.get("condition", "")
|
|
action = rule.get("action", "")
|
|
ai_rules_context += f"\n {idx}. If {condition} → set category to '{action}'"
|
|
ai_rules_context += "\n - Apply these custom rules before using default categorization logic\n - If multiple rules match, use the first matching rule\n - If no rules match, use default categorization based on vendor type"
|
|
|
|
prompt = f"""
|
|
Analyze this receipt text and extract the following information in JSON format:
|
|
|
|
Receipt Text:
|
|
{text_content}
|
|
|
|
Extract:
|
|
{{
|
|
"vendor": "Store/company name",
|
|
"description": "Detailed description of items/services purchased",
|
|
"total_amount": 0.00,
|
|
"tax_amount": 0.00,
|
|
"date": "YYYY-MM-DD",
|
|
"category": "Food/Transport/Office/Other",
|
|
"confidence": 0.95,
|
|
"currency": "USD",
|
|
"location": "Province/State, Country",
|
|
"calculated_tax": 0.00,
|
|
"is_depreciable": false,
|
|
"name_of_asset": null,
|
|
"cca_rate": null,
|
|
"useful_life": null,
|
|
"residual_value": null
|
|
}}
|
|
|
|
Rules:
|
|
- Extract vendor name as it appears on receipt
|
|
- Extract description of items/services purchased (e.g., "Coffee and sandwich", "Gasoline", "Office supplies")
|
|
- Total amount should be the final total including tax
|
|
- Tax amount is separate tax line if available (if not clearly shown, calculate based on location)
|
|
- Date should be the date on the receipt
|
|
- Confidence score 0-1 based on clarity
|
|
- Currency should be the currency used on the receipt (e.g., "USD", "EUR", "CAD")
|
|
{ai_rules_context}
|
|
{user_location_context}
|
|
LOCATION & TAX RULES:
|
|
- Extract location from receipt (look for store address, province/state, country)
|
|
- Format location as "Province/State, Country" (e.g., "Ontario, Canada" or "California, USA")
|
|
- If location not shown on receipt, return null for location (system will use user location as fallback)
|
|
|
|
TAX EXTRACTION RULES (IMPORTANT):
|
|
- If tax is EXPLICITLY shown on receipt (even if $0 or 0%), use that exact value:
|
|
* If receipt shows "Tax: $0", "Tax: $0.00", "Tax (0%)", or similar → set tax_amount to 0.00 and calculated_tax to null
|
|
* If receipt shows any other tax amount → set tax_amount to that value and calculated_tax to null
|
|
|
|
- If tax_amount is NOT shown or UNCLEAR on receipt, calculate it based on location:
|
|
* Ontario, Canada: 13% HST
|
|
* Quebec, Canada: 9.975% QST + 5% GST = 14.975% total
|
|
* British Columbia, Canada: 12% (5% GST + 7% PST)
|
|
* Alberta, Canada: 5% GST
|
|
* California, USA: ~7.25% (varies by locality)
|
|
* New York, USA: ~8.875% (varies by locality)
|
|
* Texas, USA: 6.25%
|
|
* For other locations, estimate based on typical rates
|
|
* Store calculated tax in "calculated_tax" field and set tax_amount to the calculated value
|
|
|
|
DEPRECIATION RULES:
|
|
- Determine if item is a depreciable asset (vehicles, machinery, equipment, computers, furniture, buildings)
|
|
- Set is_depreciable to true only for capital assets, false for consumables/services
|
|
- If is_depreciable is true, provide:
|
|
* name_of_asset: Specific name/model of the asset (e.g., "2024 Honda Accord", "Dell Laptop XPS 15", "Office Desk")
|
|
* cca_rate: CCA rate as decimal (e.g., 0.30 for 30%, 0.20 for 20%, 0.04 for 4%)
|
|
- Class 10 (Vehicles): 30%
|
|
- Class 8 (Furniture, equipment): 20%
|
|
- Class 50 (Computers, software): 55%
|
|
- Class 1 (Buildings): 4%
|
|
- Class 10.1 (Passenger vehicles >$30k): 30%
|
|
* useful_life: Expected years of use (e.g., 5 for computers, 8 for vehicles, 10 for furniture)
|
|
* residual_value: Estimated value at end of life (typically 10% of purchase price for equipment, 20% for vehicles)
|
|
- If is_depreciable is false, set name_of_asset, cca_rate, useful_life, and residual_value to null
|
|
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
response = self.client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
max_tokens=800,
|
|
temperature=0.1,
|
|
)
|
|
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Text processing error: {str(e)}"}
|
|
|
|
def _parse_extraction_result(self, result_text: str) -> Dict[str, Any]:
    """Parse the model's reply into a receipt dictionary.

    Strategies are tried in order:

    1. ``_extract_first_json`` -- on success the parsed object is returned
       unchanged (no type coercion and no ``extraction_success`` key is
       added on this path).
    2. Greedy ``{...}`` regex match, with trailing commas removed and bare
       keys quoted, parsed as JSON.
    3. Per-field regex capture from that same text when it still does not
       parse.
    4. ``_extract_from_plain_text`` when the reply contains no braces.

    Results of strategies 2 and 3 are normalised: text fields are stripped,
    numeric fields coerced to ``float``, and a JSON ``null`` (or a value that
    cannot be converted) falls back to the field's default instead of
    raising.

    Args:
        result_text: Raw text returned by the model.

    Returns:
        The receipt dictionary, or ``{"error": ..., "extraction_success":
        False}`` if an unexpected exception occurs.
    """
    import re

    def to_float(value, default):
        """Coerce value to float; use default for null or unparseable input."""
        if value is None:
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            pass
        try:
            # Tolerate amounts the model wrote as text, e.g. "$1,234.50".
            return float(str(value).replace("$", "").replace(",", "").strip())
        except ValueError:
            return default

    def to_text(value, default=""):
        """Return value as stripped text; use default for null."""
        return default if value is None else str(value).strip()

    def salvage_fields(json_str):
        """Pull each known field out of malformed JSON text with a regex."""

        def capture(key, value_pattern):
            found = re.search(rf'"{key}"\s*:\s*{value_pattern}', json_str)
            return found.group(1) if found else None

        def text_field(key, default):
            value = capture(key, r'"([^"]*)"')
            return default if value is None else value

        def number_field(key, default, cast=float, digits=r"[0-9.]+"):
            value = capture(key, rf"({digits}|null)")
            if value is None or value == "null":
                return default
            try:
                return cast(value)
            except ValueError:
                return default

        flag = capture("is_depreciable", r"(true|false)")
        return {
            "vendor": text_field("vendor", ""),
            "description": text_field("description", ""),
            "total_amount": number_field("total_amount", 0.0),
            "tax_amount": number_field("tax_amount", 0.0),
            "date": text_field("date", ""),
            "category": text_field("category", "Other"),
            "confidence": number_field("confidence", 0.5),
            "currency": text_field("currency", "CAD"),
            "location": text_field("location", None),
            "calculated_tax": number_field("calculated_tax", None),
            "is_depreciable": None if flag is None else flag == "true",
            "name_of_asset": text_field("name_of_asset", None),
            "cca_rate": number_field("cca_rate", None),
            "useful_life": number_field("useful_life", None, cast=int, digits=r"[0-9]+"),
            "residual_value": number_field("residual_value", None),
        }

    try:
        # Try robust JSON extraction first (handles extra text)
        try:
            data = self._extract_first_json(result_text)
            if isinstance(data, dict):
                # NOTE(review): returned as-is, unlike the fallback paths
                # below which normalise the fields -- confirm that callers
                # expect the raw object here.
                return data
            logger.warning(
                "Robust JSON extraction returned a non-object value. Trying fallback methods..."
            )
        except (json.JSONDecodeError, ValueError) as e:
            logger.warning(f"Robust JSON extraction failed: {e}. Trying fallback methods...")

        # Fallback: Find JSON in response - try multiple patterns
        json_match = re.search(r"\{.*\}", result_text, re.DOTALL)
        if not json_match:
            # Try to extract fields from plain text
            logger.warning("No JSON found in response, attempting text extraction")
            return self._extract_from_plain_text(result_text)

        json_str = json_match.group()

        # Clean up common JSON issues
        json_str = re.sub(r",\s*([}\]])", r"\1", json_str)  # Remove trailing commas
        json_str = re.sub(
            r"([{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:", r'\1"\2":', json_str
        )  # Quote unquoted keys

        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.warning(f"Fallback JSON parsing also failed: {e}")
            # Try to extract individual fields using regex
            data = salvage_fields(json_str)

        # Validate and clean data
        return {
            "vendor": to_text(data.get("vendor")),
            "description": to_text(data.get("description")),
            "total_amount": to_float(data.get("total_amount"), 0.0),
            "tax_amount": to_float(data.get("tax_amount"), 0.0),
            "date": to_text(data.get("date")),
            "category": to_text(data.get("category"), "Other"),
            "confidence": to_float(data.get("confidence"), 0.5),
            "extraction_success": True,
            "currency": to_text(data.get("currency"), "CAD"),
            "location": data.get("location"),
            "calculated_tax": data.get("calculated_tax"),
            "is_depreciable": data.get("is_depreciable"),
            "name_of_asset": data.get("name_of_asset"),
            "cca_rate": data.get("cca_rate"),
            "useful_life": data.get("useful_life"),
            "residual_value": data.get("residual_value"),
        }

    except Exception as e:
        logger.error(f"JSON parsing error: {str(e)}")
        return {
            "error": f"JSON parsing error: {str(e)}",
            "extraction_success": False,
        }
|
|
|
|
def _extract_from_plain_text(self, text: str) -> Dict[str, Any]:
|
|
"""Extract receipt data from plain text when JSON parsing fails"""
|
|
try:
|
|
import re
|
|
|
|
# Extract vendor (look for common patterns)
|
|
vendor_patterns = [
|
|
r"(?:vendor|store|merchant|company)\s*[:\-]?\s*([A-Za-z0-9\s&.,]+)",
|
|
r"([A-Z][A-Za-z0-9\s&.,]{3,30})", # Capitalized words
|
|
]
|
|
|
|
vendor = ""
|
|
for pattern in vendor_patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
vendor = match.group(1).strip()
|
|
break
|
|
|
|
# Extract amount (look for currency patterns)
|
|
amount_patterns = [
|
|
r"\$?\s*([0-9,]+\.?[0-9]*)",
|
|
r"(?:total|amount|sum)\s*[:\-]?\s*\$?\s*([0-9,]+\.?[0-9]*)",
|
|
]
|
|
|
|
total_amount = 0.0
|
|
for pattern in amount_patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
try:
|
|
total_amount = float(match.group(1).replace(",", ""))
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
# Extract date
|
|
date_patterns = [
|
|
r"(\d{4}-\d{2}-\d{2})",
|
|
r"(\d{1,2}/\d{1,2}/\d{2,4})",
|
|
r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},?\s+\d{4}",
|
|
]
|
|
|
|
date = ""
|
|
for pattern in date_patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
date = match.group(0)
|
|
break
|
|
|
|
return {
|
|
"vendor": vendor or "Unknown",
|
|
"total_amount": total_amount,
|
|
"tax_amount": 0.0,
|
|
"date": date or "",
|
|
"category": "Other",
|
|
"confidence": 0.3, # Low confidence for text extraction
|
|
"extraction_success": True,
|
|
"location": None,
|
|
"calculated_tax": None,
|
|
"is_depreciable": None,
|
|
"name_of_asset": None,
|
|
"cca_rate": None,
|
|
"useful_life": None,
|
|
"residual_value": None,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Text extraction error: {str(e)}")
|
|
return {
|
|
"vendor": "Unknown",
|
|
"total_amount": 0.0,
|
|
"tax_amount": 0.0,
|
|
"date": "",
|
|
"category": "Other",
|
|
"confidence": 0.1,
|
|
"extraction_success": False,
|
|
"error": f"Text extraction failed: {str(e)}",
|
|
"location": None,
|
|
"calculated_tax": None,
|
|
"is_depreciable": None,
|
|
"name_of_asset": None,
|
|
"cca_rate": None,
|
|
"useful_life": None,
|
|
"residual_value": None,
|
|
}
|
|
|
|
async def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
    """Save an uploaded file under the ``uploads`` directory.

    Args:
        file_content: Raw bytes of the upload.
        filename: Name supplied with the upload. Only its final path
            component is used, and spaces are replaced by underscores.

    Returns:
        Path of the written file: ``uploads/<YYYYmmdd_HHMMSS>_<name>``.

    Raises:
        Exception: If the directory cannot be created or the file cannot be
            written; the original error is attached as ``__cause__``.
    """
    try:
        # Create uploads directory if it doesn't exist
        upload_dir = "uploads"
        os.makedirs(upload_dir, exist_ok=True)

        # The name comes from the client: drop any directory components
        # (either separator style) so the file cannot be placed outside
        # upload_dir.
        base_name = os.path.basename(filename.replace("\\", "/")).replace(" ", "_")
        if base_name in ("", ".", ".."):
            base_name = "upload"

        # Generate unique filename.
        # NOTE(review): the timestamp has one-second resolution, so two
        # uploads with the same name in the same second share a path.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_filename = f"{timestamp}_{base_name}"
        file_path = os.path.join(upload_dir, safe_filename)

        # Save file
        async with aiofiles.open(file_path, "wb") as f:
            await f.write(file_content)

        return file_path

    except Exception as e:
        raise Exception(f"Failed to save file: {str(e)}") from e
|
|
|
|
async def extract_transactions_from_image(self, image_path: str) -> Dict[str, Any]:
    """Extract multiple transactions from an image (bank statement, credit card statement, etc.).

    The image is base64-encoded, sent to the Groq vision model with a prompt
    asking for a JSON list of transactions, and the reply is handed to
    ``_parse_transaction_extraction_result``.

    Args:
        image_path: Path to the statement image.

    Returns:
        The dictionary produced by ``_parse_transaction_extraction_result``,
        or ``{"extraction_success": False, "error": ..., "transactions": []}``
        if anything raises.
    """
    # Function-scope import: mimetypes is not among the module-level imports.
    import mimetypes

    try:
        # Encode image to base64
        base64_image = self._encode_image(image_path)

        # Declare the file's real media type in the data URL instead of
        # always claiming JPEG. Fall back to JPEG when the extension is
        # missing or not an image type.
        mime_type, _ = mimetypes.guess_type(image_path)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"

        # Create Groq vision prompt for transaction extraction
        prompt = """
        Analyze this financial document image (bank statement, credit card statement, etc.) and extract ALL transactions in JSON format.

        Look for transaction lists, payment records, or any financial entries that show:
        - Date
        - Amount (positive or negative)
        - Vendor/Description/Payee name
        - Any additional notes or memo

        Return the transactions as a JSON array:
        {
        "extraction_success": true,
        "transactions": [
        {
        "date": "YYYY-MM-DD",
        "amount": 0.00,
        "vendor": "Vendor name",
        "memo": "Additional notes"
        },
        {
        "date": "YYYY-MM-DD",
        "amount": -0.00,
        "vendor": "Another vendor",
        "memo": "Payment or charge description"
        }
        ]
        }

        Rules:
        - Extract ALL visible transactions
        - Include both positive (credits) and negative (debits) amounts
        - Use the actual date format from the document
        - Vendor should be the merchant/payee name
        - Memo can include transaction type, reference numbers, etc.
        - If no transactions found, return empty array but set extraction_success to true

        Return only valid JSON.
        """

        # Call Groq vision API
        response = self.client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{base64_image}",
                            },
                        },
                    ],
                }
            ],
            model=self.model,
            max_tokens=2000,  # Higher token limit for multiple transactions
            temperature=0.1,
        )

        # Parse response
        result_text = response.choices[0].message.content.strip()
        return self._parse_transaction_extraction_result(result_text)

    except Exception as e:
        return {
            "extraction_success": False,
            "error": f"Transaction extraction error: {str(e)}",
            "transactions": [],
        }
|
|
|
|
def _parse_transaction_extraction_result(self, result_text: str) -> Dict[str, Any]:
|
|
"""Parse Groq response for transaction extraction"""
|
|
try:
|
|
import json
|
|
import re
|
|
|
|
# Find the first '{' and last '}'
|
|
start = result_text.find("{")
|
|
end = result_text.rfind("}")
|
|
if start == -1 or end == -1 or end <= start:
|
|
return {
|
|
"extraction_success": False,
|
|
"error": "Could not find JSON object in AI response",
|
|
"transactions": [],
|
|
}
|
|
json_str = result_text[start : end + 1]
|
|
|
|
# Remove trailing commas before } or ]
|
|
json_str = re.sub(r",\s*([}\]])", r"\1", json_str)
|
|
|
|
try:
|
|
data = json.loads(json_str)
|
|
except Exception as e:
|
|
import logging
|
|
|
|
logging.error(f"JSON parsing error: {str(e)}")
|
|
logging.error(f"Offending JSON string:\n{json_str}")
|
|
return {
|
|
"extraction_success": False,
|
|
"error": f"JSON parsing error: {str(e)}",
|
|
"transactions": [],
|
|
}
|
|
|
|
# Validate and clean data
|
|
transactions = data.get("transactions", [])
|
|
cleaned_transactions = []
|
|
for txn in transactions:
|
|
try:
|
|
cleaned_txn = {
|
|
"date": str(txn.get("date", "")).strip(),
|
|
"amount": float(
|
|
str(txn.get("amount", 0)).replace("$", "").replace(",", "")
|
|
),
|
|
"vendor": str(txn.get("vendor", "")).strip(),
|
|
"memo": str(txn.get("memo", "")).strip(),
|
|
}
|
|
cleaned_transactions.append(cleaned_txn)
|
|
except Exception:
|
|
continue
|
|
return {
|
|
"extraction_success": data.get("extraction_success", True),
|
|
"transactions": cleaned_transactions,
|
|
"total_transactions": len(cleaned_transactions),
|
|
}
|
|
except Exception as e:
|
|
import logging
|
|
|
|
logging.error(f"JSON parsing error (outer): {str(e)}")
|
|
return {
|
|
"extraction_success": False,
|
|
"error": f"JSON parsing error: {str(e)}",
|
|
"transactions": [],
|
|
}
|
|
|
|
def _parse_date_to_iso(self, date_str: str) -> str:
|
|
"""Parse various date formats and convert to YYYY-MM-DD"""
|
|
try:
|
|
import re
|
|
from datetime import datetime
|
|
|
|
date_str = date_str.strip().upper()
|
|
|
|
# Handle formats like "MAY 22", "JUN 01", "MAY 22, 2024"
|
|
month_pattern = r"(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\s+(\d{1,2})(?:,\s*(\d{4}))?"
|
|
match = re.match(month_pattern, date_str)
|
|
|
|
if match:
|
|
month_abbr, day, year = match.groups()
|
|
month_map = {
|
|
"JAN": 1,
|
|
"FEB": 2,
|
|
"MAR": 3,
|
|
"APR": 4,
|
|
"MAY": 5,
|
|
"JUN": 6,
|
|
"JUL": 7,
|
|
"AUG": 8,
|
|
"SEP": 9,
|
|
"OCT": 10,
|
|
"NOV": 11,
|
|
"DEC": 12,
|
|
}
|
|
|
|
month = month_map[month_abbr]
|
|
day = int(day)
|
|
year = int(year) if year else datetime.now().year
|
|
|
|
# Handle 2-digit years
|
|
if year < 100:
|
|
year += 2000
|
|
|
|
return f"{year:04d}-{month:02d}-{day:02d}"
|
|
|
|
# Handle YYYY-MM-DD format
|
|
if re.match(r"\d{4}-\d{2}-\d{2}", date_str):
|
|
return date_str
|
|
|
|
# Handle MM/DD/YYYY format
|
|
if re.match(r"\d{1,2}/\d{1,2}/\d{4}", date_str):
|
|
return datetime.strptime(date_str, "%m/%d/%Y").strftime("%Y-%m-%d")
|
|
|
|
# Handle MM/DD/YY format
|
|
if re.match(r"\d{1,2}/\d{1,2}/\d{2}", date_str):
|
|
return datetime.strptime(date_str, "%m/%d/%y").strftime("%Y-%m-%d")
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|