204 lines
7.7 KiB
Python
204 lines
7.7 KiB
Python
import groq
|
|
import base64
|
|
import io
|
|
from PIL import Image
|
|
import PyPDF2
|
|
from typing import Dict, Any, List, Optional
|
|
import config
|
|
import os
|
|
import aiofiles
|
|
from datetime import datetime
|
|
|
|
class DocumentProcessor:
|
|
def __init__(self):
|
|
self.client = groq.Groq(api_key=config.GROQ_API_KEY)
|
|
self.model = "meta-llama/llama-4-scout-17b-16e-instruct" # Vision model
|
|
|
|
async def process_file(self, file_path: str, file_type: str) -> Dict[str, Any]:
|
|
"""Process uploaded file and extract receipt data"""
|
|
try:
|
|
if file_type.lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']:
|
|
return await self._process_image(file_path)
|
|
elif file_type.lower() == 'pdf':
|
|
return await self._process_pdf(file_path)
|
|
else:
|
|
raise ValueError(f"Unsupported file type: {file_type}")
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
async def _process_image(self, image_path: str) -> Dict[str, Any]:
|
|
"""Extract data from image using Groq vision"""
|
|
try:
|
|
# Encode image to base64
|
|
base64_image = self._encode_image(image_path)
|
|
|
|
# Create Groq vision prompt
|
|
prompt = """
|
|
Analyze this receipt image and extract the following information in JSON format:
|
|
{
|
|
"vendor": "Store/company name",
|
|
"total_amount": 0.00,
|
|
"tax_amount": 0.00,
|
|
"date": "YYYY-MM-DD",
|
|
"category": "Food/Transport/Office/Other",
|
|
"confidence": 0.95
|
|
}
|
|
|
|
Rules:
|
|
- Extract vendor name as it appears on receipt
|
|
- Total amount should be the final total including tax
|
|
- Tax amount is separate tax line if available
|
|
- Date should be the date on the receipt
|
|
- Categorize based on vendor type (Starbucks=Food, Shell=Transport, etc.)
|
|
- Confidence score 0-1 based on how clear the receipt is
|
|
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
# Call Groq vision API with correct format
|
|
response = self.client.chat.completions.create(
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": prompt},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{base64_image}",
|
|
},
|
|
},
|
|
],
|
|
}
|
|
],
|
|
model=self.model,
|
|
max_tokens=500,
|
|
temperature=0.1
|
|
)
|
|
|
|
# Parse response
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Image processing error: {str(e)}"}
|
|
|
|
def _encode_image(self, image_path: str) -> str:
|
|
"""Encode image to base64 string"""
|
|
with open(image_path, "rb") as image_file:
|
|
return base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
async def _process_pdf(self, pdf_path: str) -> Dict[str, Any]:
|
|
"""Extract data from PDF by converting to image first"""
|
|
try:
|
|
# For now, extract text from PDF and process as text
|
|
text_content = self._extract_text_from_pdf(pdf_path)
|
|
return self._process_text_content(text_content)
|
|
|
|
except Exception as e:
|
|
return {"error": f"PDF processing error: {str(e)}"}
|
|
|
|
def _extract_text_from_pdf(self, pdf_path: str) -> str:
|
|
"""Extract text from PDF"""
|
|
try:
|
|
with open(pdf_path, 'rb') as file:
|
|
pdf_reader = PyPDF2.PdfReader(file)
|
|
text = ""
|
|
for page in pdf_reader.pages:
|
|
text += page.extract_text() + "\n"
|
|
return text
|
|
except Exception as e:
|
|
return ""
|
|
|
|
def _process_text_content(self, text_content: str) -> Dict[str, Any]:
|
|
"""Process text content using Groq (fallback for PDFs)"""
|
|
try:
|
|
prompt = f"""
|
|
Analyze this receipt text and extract the following information in JSON format:
|
|
|
|
Receipt Text:
|
|
{text_content}
|
|
|
|
Extract:
|
|
{{
|
|
"vendor": "Store/company name",
|
|
"total_amount": 0.00,
|
|
"tax_amount": 0.00,
|
|
"date": "YYYY-MM-DD",
|
|
"category": "Food/Transport/Office/Other",
|
|
"confidence": 0.95
|
|
}}
|
|
|
|
Rules:
|
|
- Extract vendor name as it appears on receipt
|
|
- Total amount should be the final total including tax
|
|
- Tax amount is separate tax line if available
|
|
- Date should be the date on the receipt
|
|
- Categorize based on vendor type
|
|
- Confidence score 0-1 based on clarity
|
|
|
|
Return only valid JSON.
|
|
"""
|
|
|
|
response = self.client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
max_tokens=500,
|
|
temperature=0.1
|
|
)
|
|
|
|
result_text = response.choices[0].message.content.strip()
|
|
return self._parse_extraction_result(result_text)
|
|
|
|
except Exception as e:
|
|
return {"error": f"Text processing error: {str(e)}"}
|
|
|
|
def _parse_extraction_result(self, result_text: str) -> Dict[str, Any]:
|
|
"""Parse Groq response and extract JSON data"""
|
|
try:
|
|
# Clean up response and extract JSON
|
|
import json
|
|
import re
|
|
|
|
# Find JSON in response
|
|
json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group()
|
|
data = json.loads(json_str)
|
|
|
|
# Validate and clean data
|
|
return {
|
|
"vendor": data.get("vendor", "").strip(),
|
|
"total_amount": float(data.get("total_amount", 0)),
|
|
"tax_amount": float(data.get("tax_amount", 0)),
|
|
"date": data.get("date", ""),
|
|
"category": data.get("category", "Other"),
|
|
"confidence": float(data.get("confidence", 0.5)),
|
|
"extraction_success": True
|
|
}
|
|
else:
|
|
return {"error": "Could not parse JSON from AI response"}
|
|
|
|
except Exception as e:
|
|
return {"error": f"JSON parsing error: {str(e)}"}
|
|
|
|
async def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
|
|
"""Save uploaded file to temporary storage"""
|
|
try:
|
|
# Create uploads directory if it doesn't exist
|
|
upload_dir = "uploads"
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# Generate unique filename
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
safe_filename = f"{timestamp}_{filename.replace(' ', '_')}"
|
|
file_path = os.path.join(upload_dir, safe_filename)
|
|
|
|
# Save file
|
|
async with aiofiles.open(file_path, 'wb') as f:
|
|
await f.write(file_content)
|
|
|
|
return file_path
|
|
|
|
except Exception as e:
|
|
raise Exception(f"File save error: {str(e)}") |