# Python source file (377 lines, 15 KiB).
import asyncio
import base64
import functools
import io
import json
import logging
import mimetypes
import os
import re
from datetime import datetime
from typing import Dict, Any, List, Optional

import aiofiles
import groq
import PyPDF2
from PIL import Image

import config


class DocumentProcessor:
    """Extract structured receipt and transaction data from uploaded files.

    Images are base64-encoded and sent to a Groq-hosted vision model; PDFs
    have their text layer extracted with PyPDF2 and are sent to the same
    model as plain text. Every extraction method reports failures through
    an ``{"error": ...}`` entry in the returned dict instead of raising.
    """

    # File extensions that ``process_file`` routes to the vision model.
    IMAGE_TYPES = frozenset({"jpg", "jpeg", "png", "gif", "bmp"})

    # Directory (relative to the working directory) that receives uploads.
    UPLOAD_DIR = "uploads"

    _log = logging.getLogger(__name__)

    _MONTH_NAMES = (
        "JANUARY", "FEBRUARY", "MARCH", "APRIL", "MAY", "JUNE",
        "JULY", "AUGUST", "SEPTEMBER", "OCTOBER", "NOVEMBER", "DECEMBER",
    )

    # "MAY 22", "MAY 22, 2024", "MAY 22, 24", "MAY 22 2024", "MARCH 5, 2024".
    # A 2-digit year is only accepted after a comma so that "MAY 22 10:30"
    # is not misread as the year 2010.
    _MONTH_DATE_RE = re.compile(
        r"([A-Z]+)\.?\s+(\d{1,2})(?!\d)"
        r"(?:\s*,\s*(\d{4}|\d{2})(?!\d)|\s+(\d{4})(?!\d))?"
    )
    _ISO_DATE_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})(?!\d)")
    _SLASH_DATE_RE = re.compile(r"\d{1,2}/\d{1,2}/(\d{4}|\d{2})(?!\d)")

    _RECEIPT_SCHEMA = (
        "{\n"
        '    "vendor": "Store/company name",\n'
        '    "total_amount": 0.00,\n'
        '    "tax_amount": 0.00,\n'
        '    "date": "YYYY-MM-DD",\n'
        '    "category": "Food/Transport/Office/Other",\n'
        '    "confidence": 0.95\n'
        "}\n"
    )

    _RECEIPT_IMAGE_PROMPT = (
        "Analyze this receipt image and extract the following information "
        "in JSON format:\n"
        + _RECEIPT_SCHEMA
        + "\n"
        "Rules:\n"
        "- Extract vendor name as it appears on receipt\n"
        "- Total amount should be the final total including tax\n"
        "- Tax amount is separate tax line if available\n"
        "- Date should be the date on the receipt\n"
        "- Categorize based on vendor type (Starbucks=Food, Shell=Transport, etc.)\n"
        "- Confidence score 0-1 based on how clear the receipt is\n"
        "\n"
        "Return only valid JSON.\n"
    )

    _RECEIPT_TEXT_PROMPT_HEAD = (
        "Analyze this receipt text and extract the following information "
        "in JSON format:\n"
        "\n"
        "Receipt Text:\n"
    )

    _RECEIPT_TEXT_PROMPT_TAIL = (
        "\n"
        "\n"
        "Extract:\n"
        + _RECEIPT_SCHEMA
        + "\n"
        "Rules:\n"
        "- Extract vendor name as it appears on receipt\n"
        "- Total amount should be the final total including tax\n"
        "- Tax amount is separate tax line if available\n"
        "- Date should be the date on the receipt\n"
        "- Categorize based on vendor type\n"
        "- Confidence score 0-1 based on clarity\n"
        "\n"
        "Return only valid JSON.\n"
    )

    _TRANSACTION_PROMPT = (
        "Analyze this financial document image (bank statement, credit card "
        "statement, etc.) and extract ALL transactions in JSON format.\n"
        "\n"
        "Look for transaction lists, payment records, or any financial "
        "entries that show:\n"
        "- Date\n"
        "- Amount (positive or negative)\n"
        "- Vendor/Description/Payee name\n"
        "- Any additional notes or memo\n"
        "\n"
        "Return the transactions as a JSON array:\n"
        "{\n"
        '    "extraction_success": true,\n'
        '    "transactions": [\n'
        "        {\n"
        '            "date": "YYYY-MM-DD",\n'
        '            "amount": 0.00,\n'
        '            "vendor": "Vendor name",\n'
        '            "memo": "Additional notes"\n'
        "        },\n"
        "        {\n"
        '            "date": "YYYY-MM-DD",\n'
        '            "amount": -0.00,\n'
        '            "vendor": "Another vendor",\n'
        '            "memo": "Payment or charge description"\n'
        "        }\n"
        "    ]\n"
        "}\n"
        "\n"
        "Rules:\n"
        "- Extract ALL visible transactions\n"
        "- Include both positive (credits) and negative (debits) amounts\n"
        "- Use the actual date format from the document\n"
        "- Vendor should be the merchant/payee name\n"
        "- Memo can include transaction type, reference numbers, etc.\n"
        "- If no transactions found, return empty array but set "
        "extraction_success to true\n"
        "\n"
        "Return only valid JSON.\n"
    )

    def __init__(self):
        """Create the Groq client from ``config.GROQ_API_KEY``."""
        self.client = groq.Groq(api_key=config.GROQ_API_KEY)
        self.model = "meta-llama/llama-4-scout-17b-16e-instruct"  # Vision model

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    async def process_file(self, file_path: str, file_type: str) -> Dict[str, Any]:
        """Process an uploaded file and extract receipt data.

        Args:
            file_path: Path of the file on disk.
            file_type: File extension such as ``"png"`` or ``"pdf"``; case,
                surrounding whitespace and a leading dot are ignored.

        Returns:
            The extracted receipt fields, or ``{"error": message}`` when the
            type is unsupported or processing fails.
        """
        try:
            kind = file_type.strip().lower().lstrip(".")
            if kind in self.IMAGE_TYPES:
                return await self._process_image(file_path)
            if kind == "pdf":
                return await self._process_pdf(file_path)
            raise ValueError(f"Unsupported file type: {file_type}")
        except Exception as e:
            return {"error": str(e)}

    async def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
        """Save uploaded bytes under ``UPLOAD_DIR`` and return the new path.

        The client-supplied name is reduced to its final path component so
        it cannot point outside the upload directory, and it is prefixed
        with a microsecond-resolution timestamp to avoid collisions.

        Raises:
            Exception: If the directory cannot be created or the file cannot
                be written; the original error is chained as ``__cause__``.
        """
        try:
            os.makedirs(self.UPLOAD_DIR, exist_ok=True)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            stored_name = f"{timestamp}_{self._safe_filename(filename)}"
            file_path = os.path.join(self.UPLOAD_DIR, stored_name)

            async with aiofiles.open(file_path, 'wb') as f:
                await f.write(file_content)

            return file_path
        except Exception as e:
            raise Exception(f"Failed to save file: {str(e)}") from e

    async def extract_transactions_from_image(self, image_path: str) -> Dict[str, Any]:
        """Extract multiple transactions from a statement image.

        Returns:
            A dict with ``extraction_success``, ``transactions`` and, on
            success, ``total_transactions``; on failure an ``error`` message
            and an empty ``transactions`` list.
        """
        try:
            # Higher token limit than for receipts: many transactions per page.
            result_text = await self._in_thread(
                self._vision_request, image_path, self._TRANSACTION_PROMPT, 2000
            )
            return self._parse_transaction_extraction_result(result_text)
        except Exception as e:
            return {
                "extraction_success": False,
                "error": f"Transaction extraction error: {str(e)}",
                "transactions": [],
            }

    # ------------------------------------------------------------------
    # Model access
    # ------------------------------------------------------------------

    @staticmethod
    async def _in_thread(func, *args):
        """Run blocking ``func(*args)`` in the default executor.

        The Groq client, file reads and PyPDF2 are all synchronous; calling
        them directly from a coroutine would stall the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args))

    def _complete(self, content, max_tokens: int) -> str:
        """Send one user message to the model and return its stripped reply."""
        response = self.client.chat.completions.create(
            messages=[{"role": "user", "content": content}],
            model=self.model,
            max_tokens=max_tokens,
            temperature=0.1,
        )
        reply = response.choices[0].message.content
        return (reply or "").strip()

    def _vision_request(self, image_path: str, prompt: str, max_tokens: int) -> str:
        """Send ``prompt`` plus the image at ``image_path`` to the model."""
        content = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": self._image_data_url(image_path)}},
        ]
        return self._complete(content, max_tokens)

    def _image_data_url(self, image_path: str) -> str:
        """Build a ``data:`` URL whose MIME type matches the file extension."""
        mime, _ = mimetypes.guess_type(image_path)
        if not mime or not mime.startswith("image/"):
            # Unknown extension: keep the historical default.
            mime = "image/jpeg"
        return f"data:{mime};base64,{self._encode_image(image_path)}"

    def _encode_image(self, image_path: str) -> str:
        """Encode image to base64 string"""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

    # ------------------------------------------------------------------
    # Receipt extraction
    # ------------------------------------------------------------------

    async def _process_image(self, image_path: str) -> Dict[str, Any]:
        """Extract receipt data from an image using the Groq vision model."""
        try:
            result_text = await self._in_thread(
                self._vision_request, image_path, self._RECEIPT_IMAGE_PROMPT, 500
            )
            return self._parse_extraction_result(result_text)
        except Exception as e:
            return {"error": f"Image processing error: {str(e)}"}

    async def _process_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract receipt data from a PDF via its embedded text layer."""
        try:
            text_content = await self._in_thread(self._extract_text_from_pdf, pdf_path)
            if not text_content.strip():
                # Unreadable or image-only PDF: asking the model about an
                # empty document would only invite an invented receipt.
                return {"error": "PDF processing error: no extractable text found in PDF"}
            return await self._in_thread(self._process_text_content, text_content)
        except Exception as e:
            return {"error": f"PDF processing error: {str(e)}"}

    def _extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page, newline-terminated per page.

        Returns an empty string (and logs the cause) when the file cannot be
        opened or parsed.
        """
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # extract_text() may yield None for pages without text.
                return "".join(
                    (page.extract_text() or "") + "\n" for page in pdf_reader.pages
                )
        except Exception:
            self._log.exception("Could not extract text from PDF %s", pdf_path)
            return ""

    def _process_text_content(self, text_content: str) -> Dict[str, Any]:
        """Extract receipt data from plain text using Groq (used for PDFs)."""
        try:
            prompt = (
                self._RECEIPT_TEXT_PROMPT_HEAD
                + text_content
                + self._RECEIPT_TEXT_PROMPT_TAIL
            )
            return self._parse_extraction_result(self._complete(prompt, 500))
        except Exception as e:
            return {"error": f"Text processing error: {str(e)}"}

    # ------------------------------------------------------------------
    # Response parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_json_object(result_text: str) -> Optional[Dict[str, Any]]:
        """Decode the JSON object that starts at the first ``{`` in the text.

        Text before the object (prose, a Markdown fence) and anything after
        its closing brace is ignored.

        Returns:
            The decoded dict, or ``None`` when the text contains no ``{``.

        Raises:
            json.JSONDecodeError: If the text from the first ``{`` onward is
                not a valid JSON object (for example a truncated reply).
        """
        start = result_text.find("{")
        if start == -1:
            return None
        data, _ = json.JSONDecoder().raw_decode(result_text, start)
        return data

    @staticmethod
    def _clean_str(value: Any) -> str:
        """Return ``value`` as a stripped string; ``None`` becomes ``""``."""
        return "" if value is None else str(value).strip()

    @staticmethod
    def _parse_amount(value: Any) -> float:
        """Convert a number or a string like ``"-$1,234.50"`` to ``float``.

        Raises:
            ValueError: If ``value`` is ``None``, a bool, or not numeric.
        """
        if value is None or isinstance(value, bool):
            raise ValueError(f"not a numeric amount: {value!r}")
        if isinstance(value, (int, float)):
            return float(value)
        return float(str(value).strip().replace('$', '').replace(',', ''))

    @classmethod
    def _to_float(cls, value: Any, default: float) -> float:
        """Like ``_parse_amount`` but return ``default`` instead of raising."""
        try:
            return cls._parse_amount(value)
        except ValueError:
            return default

    def _parse_extraction_result(self, result_text: str) -> Dict[str, Any]:
        """Parse the model reply for a single receipt.

        Missing, ``null`` or unparsable fields fall back to defaults
        (``""``, ``0.0``, ``"Other"``, confidence ``0.5``) instead of
        failing the whole extraction.

        Returns:
            The cleaned receipt fields plus ``extraction_success: True``, or
            ``{"error": message}`` when no JSON object can be decoded.
        """
        try:
            data = self._extract_json_object(result_text)
            if data is None:
                return {"error": "Could not parse JSON from AI response"}

            confidence = self._to_float(data.get("confidence"), 0.5)
            return {
                "vendor": self._clean_str(data.get("vendor")),
                "total_amount": self._to_float(data.get("total_amount"), 0.0),
                "tax_amount": self._to_float(data.get("tax_amount"), 0.0),
                "date": self._clean_str(data.get("date")),
                "category": self._clean_str(data.get("category")) or "Other",
                # The prompt asks for a 0-1 score; clamp stray values.
                "confidence": min(1.0, max(0.0, confidence)),
                "extraction_success": True,
            }
        except Exception as e:
            return {"error": f"JSON parsing error: {str(e)}"}

    def _parse_transaction_extraction_result(self, result_text: str) -> Dict[str, Any]:
        """Parse the model reply for a multi-transaction extraction.

        Entries that are not objects or whose amount is not numeric are
        skipped; ``null`` text fields become empty strings. Dates are passed
        through as the model wrote them.
        """
        try:
            data = self._extract_json_object(result_text)
            if data is None:
                return {
                    "extraction_success": False,
                    "error": "Could not parse JSON from AI response",
                    "transactions": [],
                }

            raw_transactions = data.get("transactions")
            if not isinstance(raw_transactions, list):
                raw_transactions = []

            cleaned_transactions = []
            for txn in raw_transactions:
                try:
                    cleaned_transactions.append({
                        "date": self._clean_str(txn.get("date")),
                        "amount": self._parse_amount(txn.get("amount", 0)),
                        "vendor": self._clean_str(txn.get("vendor")),
                        "memo": self._clean_str(txn.get("memo")),
                    })
                except (AttributeError, TypeError, ValueError):
                    # Skip invalid transactions rather than fail the batch.
                    continue

            return {
                "extraction_success": data.get("extraction_success", True),
                "transactions": cleaned_transactions,
                "total_transactions": len(cleaned_transactions),
            }
        except Exception as e:
            return {
                "extraction_success": False,
                "error": f"JSON parsing error: {str(e)}",
                "transactions": [],
            }

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_filename(filename: str) -> str:
        """Reduce a client-supplied name to a single safe path component."""
        # Treat both separator styles as separators, whatever the host OS.
        base = os.path.basename(filename.replace("\\", "/")).replace(' ', '_')
        return base if base not in ("", ".", "..") else "upload"

    @classmethod
    def _month_number(cls, name: str) -> Optional[int]:
        """Map ``"MAY"``, ``"SEPT"`` or ``"MARCH"`` to 1-12, else ``None``."""
        if len(name) < 3:
            return None
        for number, full_name in enumerate(cls._MONTH_NAMES, start=1):
            if full_name.startswith(name):
                return number
        return None

    def _parse_date_to_iso(self, date_str: str) -> Optional[str]:
        """Parse various date formats and convert to ``YYYY-MM-DD``.

        Recognised, case-insensitively and ignoring trailing text:
        ``MAY 22``, ``MAY 22, 2024``, ``MAY 22 2024``, ``MARCH 5, 24``,
        ``YYYY-MM-DD``, ``MM/DD/YYYY`` and ``MM/DD/YY``. A month-name date
        without a year is assumed to be in the current year. Two-digit
        years follow ``strptime``'s ``%y`` rule.

        Returns:
            The ISO date string, or ``None`` when the input is not a string,
            matches no known format, or names an impossible date.
        """
        try:
            text = date_str.strip().upper()

            named = self._MONTH_DATE_RE.match(text)
            if named:
                month = self._month_number(named.group(1))
                if month is not None:
                    year_text = named.group(3) or named.group(4)
                    if year_text is None:
                        year = datetime.now().year
                    elif len(year_text) == 2:
                        year = datetime.strptime(year_text, "%y").year
                    else:
                        year = int(year_text)
                    day = int(named.group(2))
                    datetime(year, month, day)  # ValueError for e.g. FEB 31
                    return f"{year:04d}-{month:02d}-{day:02d}"

            iso = self._ISO_DATE_RE.match(text)
            if iso:
                year, month, day = (int(part) for part in iso.groups())
                datetime(year, month, day)  # reject e.g. 2024-13-45
                return iso.group(0)

            slash = self._SLASH_DATE_RE.match(text)
            if slash:
                fmt = '%m/%d/%Y' if len(slash.group(1)) == 4 else '%m/%d/%y'
                return datetime.strptime(slash.group(0), fmt).strftime('%Y-%m-%d')

            return None
        except (AttributeError, TypeError, ValueError):
            return None