498 lines
20 KiB
Python
498 lines
20 KiB
Python
import groq
|
|
import base64
|
|
import io
|
|
from PIL import Image
|
|
import PyPDF2
|
|
from typing import Dict, Any, List, Optional
|
|
import config
|
|
import os
|
|
import aiofiles
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DocumentProcessor:
|
|
def __init__(self):
|
|
self.client = groq.Groq(api_key=config.GROQ_API_KEY)
|
|
self.model = "meta-llama/llama-4-scout-17b-16e-instruct" # Vision model
|
|
|
|
async def process_file(self, file_path: str, file_type: str) -> Dict[str, Any]:
|
|
"""Process uploaded file and extract receipt data"""
|
|
try:
|
|
if file_type.lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']:
|
|
return await self._process_image(file_path)
|
|
elif file_type.lower() == 'pdf':
|
|
return await self._process_pdf(file_path)
|
|
else:
|
|
raise ValueError(f"Unsupported file type: {file_type}")
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
async def _process_image(self, image_path: str) -> Dict[str, Any]:
|
|
"""Extract data from image using Groq vision"""
|
|
try:
|
|
# Encode image to base64
|
|
base64_image = self._encode_image(image_path)
|
|
|
|
# Create Groq vision prompt
|
|
prompt = """
|
|
Analyze this receipt image and extract the following information in JSON format:
|
|
{
|
|
"vendor": "Store/company name",
|
|
"description": "Detailed description of items/services purchased",
|
|
"total_amount": 0.00,
|
|
"tax_amount": 0.00,
|
|
"date": "YYYY-MM-DD",
|
|
"category": "Food/Transport/Office/Other",
|
|
"confidence": 0.95
|
|
}
|
|
|
|
Rules:
|
|
- Extract vendor name as it appears on receipt
|
|
- Extract description of items/services purchased (e.g., "Coffee and sandwich", "Gasoline", "Office supplies")
|
|
- Total amount should be the final total including tax
|
|
- Tax amount is separate tax line if available
|
|
- Date should be the date on the receipt
|
|
- Categorize based on vendor type (Starbucks=Food, Shell=Transport, etc.)
|
|
- Confidence score 0-1 based on how clear the receipt is
|
|
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
# Call Groq vision API with correct format
|
|
response = self.client.chat.completions.create(
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": prompt},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{base64_image}",
|
|
},
|
|
},
|
|
],
|
|
}
|
|
],
|
|
model=self.model,
|
|
max_tokens=500,
|
|
temperature=0.1
|
|
)
|
|
|
|
# Parse response
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Image processing error: {str(e)}"}
|
|
|
|
def _encode_image(self, image_path: str) -> str:
|
|
"""Encode image to base64 string"""
|
|
with open(image_path, "rb") as image_file:
|
|
return base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
async def _process_pdf(self, pdf_path: str) -> Dict[str, Any]:
|
|
"""Extract data from PDF by converting to image first"""
|
|
try:
|
|
# For now, extract text from PDF and process as text
|
|
text_content = self._extract_text_from_pdf(pdf_path)
|
|
return self._process_text_content(text_content)
|
|
|
|
except Exception as e:
|
|
return {"error": f"PDF processing error: {str(e)}"}
|
|
|
|
def _extract_text_from_pdf(self, pdf_path: str) -> str:
|
|
"""Extract text from PDF"""
|
|
try:
|
|
with open(pdf_path, 'rb') as file:
|
|
pdf_reader = PyPDF2.PdfReader(file)
|
|
text = ""
|
|
for page in pdf_reader.pages:
|
|
text += page.extract_text() + "\n"
|
|
return text
|
|
except Exception as e:
|
|
return ""
|
|
|
|
def _process_text_content(self, text_content: str) -> Dict[str, Any]:
|
|
"""Process text content using Groq (fallback for PDFs)"""
|
|
try:
|
|
prompt = f"""
|
|
Analyze this receipt text and extract the following information in JSON format:
|
|
|
|
Receipt Text:
|
|
{text_content}
|
|
|
|
Extract:
|
|
{{
|
|
"vendor": "Store/company name",
|
|
"description": "Detailed description of items/services purchased",
|
|
"total_amount": 0.00,
|
|
"tax_amount": 0.00,
|
|
"date": "YYYY-MM-DD",
|
|
"category": "Food/Transport/Office/Other",
|
|
"confidence": 0.95
|
|
}}
|
|
|
|
Rules:
|
|
- Extract vendor name as it appears on receipt
|
|
- Extract description of items/services purchased (e.g., "Coffee and sandwich", "Gasoline", "Office supplies")
|
|
- Total amount should be the final total including tax
|
|
- Tax amount is separate tax line if available
|
|
- Date should be the date on the receipt
|
|
- Categorize based on vendor type
|
|
- Confidence score 0-1 based on clarity
|
|
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
response = self.client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
max_tokens=500,
|
|
temperature=0.1
|
|
)
|
|
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Text processing error: {str(e)}"}
|
|
|
|
def _parse_extraction_result(self, result_text: str) -> Dict[str, Any]:
|
|
"""Parse Groq response and extract JSON data"""
|
|
try:
|
|
# Clean up response and extract JSON
|
|
import json
|
|
import re
|
|
|
|
# Find JSON in response - try multiple patterns
|
|
json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group()
|
|
|
|
# Clean up common JSON issues
|
|
json_str = re.sub(r',\s*([}\]])', r'\1', json_str) # Remove trailing commas
|
|
json_str = re.sub(r'([{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', json_str) # Quote unquoted keys
|
|
|
|
try:
|
|
data = json.loads(json_str)
|
|
except json.JSONDecodeError as e:
|
|
# Try to fix common JSON issues
|
|
logger.warning(f"Initial JSON parsing failed: {e}")
|
|
|
|
# Try to extract individual fields using regex
|
|
vendor_match = re.search(r'"vendor"\s*:\s*"([^"]*)"', json_str)
|
|
description_match = re.search(r'"description"\s*:\s*"([^"]*)"', json_str)
|
|
total_amount_match = re.search(r'"total_amount"\s*:\s*([0-9.]+)', json_str)
|
|
tax_amount_match = re.search(r'"tax_amount"\s*:\s*([0-9.]+)', json_str)
|
|
date_match = re.search(r'"date"\s*:\s*"([^"]*)"', json_str)
|
|
category_match = re.search(r'"category"\s*:\s*"([^"]*)"', json_str)
|
|
confidence_match = re.search(r'"confidence"\s*:\s*([0-9.]+)', json_str)
|
|
|
|
data = {
|
|
"vendor": vendor_match.group(1) if vendor_match else "",
|
|
"description": description_match.group(1) if description_match else "",
|
|
"total_amount": float(total_amount_match.group(1)) if total_amount_match else 0.0,
|
|
"tax_amount": float(tax_amount_match.group(1)) if tax_amount_match else 0.0,
|
|
"date": date_match.group(1) if date_match else "",
|
|
"category": category_match.group(1) if category_match else "Other",
|
|
"confidence": float(confidence_match.group(1)) if confidence_match else 0.5
|
|
}
|
|
|
|
# Validate and clean data
|
|
return {
|
|
"vendor": str(data.get("vendor", "")).strip(),
|
|
"description": str(data.get("description", "")).strip(),
|
|
"total_amount": float(data.get("total_amount", 0)),
|
|
"tax_amount": float(data.get("tax_amount", 0)),
|
|
"date": str(data.get("date", "")).strip(),
|
|
"category": str(data.get("category", "Other")).strip(),
|
|
"confidence": float(data.get("confidence", 0.5)),
|
|
"extraction_success": True
|
|
}
|
|
else:
|
|
# Try to extract fields from plain text
|
|
logger.warning("No JSON found in response, attempting text extraction")
|
|
return self._extract_from_plain_text(result_text)
|
|
|
|
except Exception as e:
|
|
logger.error(f"JSON parsing error: {str(e)}")
|
|
return {"error": f"JSON parsing error: {str(e)}", "extraction_success": False}
|
|
|
|
def _extract_from_plain_text(self, text: str) -> Dict[str, Any]:
|
|
"""Extract receipt data from plain text when JSON parsing fails"""
|
|
try:
|
|
import re
|
|
|
|
# Extract vendor (look for common patterns)
|
|
vendor_patterns = [
|
|
r'(?:vendor|store|merchant|company)\s*[:\-]?\s*([A-Za-z0-9\s&.,]+)',
|
|
r'([A-Z][A-Za-z0-9\s&.,]{3,30})', # Capitalized words
|
|
]
|
|
|
|
vendor = ""
|
|
for pattern in vendor_patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
vendor = match.group(1).strip()
|
|
break
|
|
|
|
# Extract amount (look for currency patterns)
|
|
amount_patterns = [
|
|
r'\$?\s*([0-9,]+\.?[0-9]*)',
|
|
r'(?:total|amount|sum)\s*[:\-]?\s*\$?\s*([0-9,]+\.?[0-9]*)',
|
|
]
|
|
|
|
total_amount = 0.0
|
|
for pattern in amount_patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
try:
|
|
total_amount = float(match.group(1).replace(',', ''))
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
# Extract date
|
|
date_patterns = [
|
|
r'(\d{4}-\d{2}-\d{2})',
|
|
r'(\d{1,2}/\d{1,2}/\d{2,4})',
|
|
r'(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},?\s+\d{4}',
|
|
]
|
|
|
|
date = ""
|
|
for pattern in date_patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
date = match.group(0)
|
|
break
|
|
|
|
return {
|
|
"vendor": vendor or "Unknown",
|
|
"total_amount": total_amount,
|
|
"tax_amount": 0.0,
|
|
"date": date or "",
|
|
"category": "Other",
|
|
"confidence": 0.3, # Low confidence for text extraction
|
|
"extraction_success": True
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Text extraction error: {str(e)}")
|
|
return {
|
|
"vendor": "Unknown",
|
|
"total_amount": 0.0,
|
|
"tax_amount": 0.0,
|
|
"date": "",
|
|
"category": "Other",
|
|
"confidence": 0.1,
|
|
"extraction_success": False,
|
|
"error": f"Text extraction failed: {str(e)}"
|
|
}
|
|
|
|
async def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
|
|
"""Save uploaded file to temporary storage"""
|
|
try:
|
|
# Create uploads directory if it doesn't exist
|
|
upload_dir = "uploads"
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# Generate unique filename
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
safe_filename = f"{timestamp}_{filename.replace(' ', '_')}"
|
|
file_path = os.path.join(upload_dir, safe_filename)
|
|
|
|
# Save file
|
|
async with aiofiles.open(file_path, 'wb') as f:
|
|
await f.write(file_content)
|
|
|
|
return file_path
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Failed to save file: {str(e)}")
|
|
|
|
async def extract_transactions_from_image(self, image_path: str) -> Dict[str, Any]:
|
|
"""Extract multiple transactions from an image (bank statement, credit card statement, etc.)"""
|
|
try:
|
|
# Encode image to base64
|
|
base64_image = self._encode_image(image_path)
|
|
|
|
# Create Groq vision prompt for transaction extraction
|
|
prompt = """
|
|
Analyze this financial document image (bank statement, credit card statement, etc.) and extract ALL transactions in JSON format.
|
|
|
|
Look for transaction lists, payment records, or any financial entries that show:
|
|
- Date
|
|
- Amount (positive or negative)
|
|
- Vendor/Description/Payee name
|
|
- Any additional notes or memo
|
|
|
|
Return the transactions as a JSON array:
|
|
{
|
|
"extraction_success": true,
|
|
"transactions": [
|
|
{
|
|
"date": "YYYY-MM-DD",
|
|
"amount": 0.00,
|
|
"vendor": "Vendor name",
|
|
"memo": "Additional notes"
|
|
},
|
|
{
|
|
"date": "YYYY-MM-DD",
|
|
"amount": -0.00,
|
|
"vendor": "Another vendor",
|
|
"memo": "Payment or charge description"
|
|
}
|
|
]
|
|
}
|
|
|
|
Rules:
|
|
- Extract ALL visible transactions
|
|
- Include both positive (credits) and negative (debits) amounts
|
|
- Use the actual date format from the document
|
|
- Vendor should be the merchant/payee name
|
|
- Memo can include transaction type, reference numbers, etc.
|
|
- If no transactions found, return empty array but set extraction_success to true
|
|
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
# Call Groq vision API
|
|
response = self.client.chat.completions.create(
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": prompt},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{base64_image}",
|
|
},
|
|
},
|
|
],
|
|
}
|
|
],
|
|
model=self.model,
|
|
max_tokens=2000, # Higher token limit for multiple transactions
|
|
temperature=0.1
|
|
)
|
|
|
|
# Parse response
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_transaction_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {
|
|
"extraction_success": False,
|
|
"error": f"Transaction extraction error: {str(e)}",
|
|
"transactions": []
|
|
}
|
|
|
|
def _parse_transaction_extraction_result(self, result_text: str) -> Dict[str, Any]:
|
|
"""Parse Groq response for transaction extraction"""
|
|
try:
|
|
import json
|
|
import re
|
|
|
|
# Find the first '{' and last '}'
|
|
start = result_text.find('{')
|
|
end = result_text.rfind('}')
|
|
if start == -1 or end == -1 or end <= start:
|
|
return {
|
|
"extraction_success": False,
|
|
"error": "Could not find JSON object in AI response",
|
|
"transactions": []
|
|
}
|
|
json_str = result_text[start:end+1]
|
|
|
|
# Remove trailing commas before } or ]
|
|
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
|
|
|
|
try:
|
|
data = json.loads(json_str)
|
|
except Exception as e:
|
|
import logging
|
|
logging.error(f"JSON parsing error: {str(e)}")
|
|
logging.error(f"Offending JSON string:\n{json_str}")
|
|
return {
|
|
"extraction_success": False,
|
|
"error": f"JSON parsing error: {str(e)}",
|
|
"transactions": []
|
|
}
|
|
|
|
# Validate and clean data
|
|
transactions = data.get("transactions", [])
|
|
cleaned_transactions = []
|
|
for txn in transactions:
|
|
try:
|
|
cleaned_txn = {
|
|
"date": str(txn.get("date", "")).strip(),
|
|
"amount": float(str(txn.get("amount", 0)).replace('$', '').replace(',', '')),
|
|
"vendor": str(txn.get("vendor", "")).strip(),
|
|
"memo": str(txn.get("memo", "")).strip()
|
|
}
|
|
cleaned_transactions.append(cleaned_txn)
|
|
except Exception as e:
|
|
continue
|
|
return {
|
|
"extraction_success": data.get("extraction_success", True),
|
|
"transactions": cleaned_transactions,
|
|
"total_transactions": len(cleaned_transactions)
|
|
}
|
|
except Exception as e:
|
|
import logging
|
|
logging.error(f"JSON parsing error (outer): {str(e)}")
|
|
return {
|
|
"extraction_success": False,
|
|
"error": f"JSON parsing error: {str(e)}",
|
|
"transactions": []
|
|
}
|
|
|
|
def _parse_date_to_iso(self, date_str: str) -> str:
|
|
"""Parse various date formats and convert to YYYY-MM-DD"""
|
|
try:
|
|
import re
|
|
from datetime import datetime
|
|
|
|
date_str = date_str.strip().upper()
|
|
|
|
# Handle formats like "MAY 22", "JUN 01", "MAY 22, 2024"
|
|
month_pattern = r'(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\s+(\d{1,2})(?:,\s*(\d{4}))?'
|
|
match = re.match(month_pattern, date_str)
|
|
|
|
if match:
|
|
month_abbr, day, year = match.groups()
|
|
month_map = {
|
|
'JAN': 1, 'FEB': 2, 'MAR': 3, 'APR': 4, 'MAY': 5, 'JUN': 6,
|
|
'JUL': 7, 'AUG': 8, 'SEP': 9, 'OCT': 10, 'NOV': 11, 'DEC': 12
|
|
}
|
|
|
|
month = month_map[month_abbr]
|
|
day = int(day)
|
|
year = int(year) if year else datetime.now().year
|
|
|
|
# Handle 2-digit years
|
|
if year < 100:
|
|
year += 2000
|
|
|
|
return f"{year:04d}-{month:02d}-{day:02d}"
|
|
|
|
# Handle YYYY-MM-DD format
|
|
if re.match(r'\d{4}-\d{2}-\d{2}', date_str):
|
|
return date_str
|
|
|
|
# Handle MM/DD/YYYY format
|
|
if re.match(r'\d{1,2}/\d{1,2}/\d{4}', date_str):
|
|
return datetime.strptime(date_str, '%m/%d/%Y').strftime('%Y-%m-%d')
|
|
|
|
# Handle MM/DD/YY format
|
|
if re.match(r'\d{1,2}/\d{1,2}/\d{2}', date_str):
|
|
return datetime.strptime(date_str, '%m/%d/%y').strftime('%Y-%m-%d')
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None |