Files
email_alerts/zoho_client.py
T
Iyeoluwa Akinrinola 70b2bcc89d Fix Zoho connection issues and add local testing capabilities
- Fixed 'days_back' parameter error in zoho_client.py
- Added robust settings validation in app.py to prevent int() errors
- Created LOCAL_TESTING.md with comprehensive testing guide
- Added run_local.sh for easy local server setup
- Added deploy_updates.sh for deployment instructions
- Added test_local_connection.py for automated testing
- Updated config.json with proper structure
- All fixes tested and working locally
2025-07-25 12:40:10 +01:00

168 lines
6.4 KiB
Python

import os
import imaplib
import email
from email.header import decode_header
from datetime import datetime, timedelta
from typing import List, Dict, Any
from dotenv import load_dotenv
# Runs at import time, before any ZohoClient is built. Presumably this loads a
# local .env file so the os.getenv("ZOHO_EMAIL") / os.getenv("ZOHO_APP_PASSWORD")
# fallbacks in ZohoClient.__init__ can find their values — confirm against the
# python-dotenv version in use.
load_dotenv()
class ZohoClient:
    """Small IMAP client for reading recent messages from a Zoho Mail inbox.

    The constructor logs in immediately and raises if that fails. Call
    ``close()`` when done, or use the instance as a context manager::

        with ZohoClient() as client:
            emails = client.fetch_emails(max_results=10)
    """

    # IMAP date literals require English month abbreviations; strftime("%b")
    # follows the process locale, so the names are spelled out explicitly.
    _IMAP_MONTHS = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )

    # Maximum number of body characters kept in the 'snippet' field.
    _SNIPPET_LENGTH = 200

    def __init__(self, email=None, app_password=None):
        """Store the credentials and open the IMAP connection.

        Args:
            email: Zoho account address. Falls back to the ``ZOHO_EMAIL``
                environment variable when omitted or empty.
            app_password: Zoho app password. Falls back to the
                ``ZOHO_APP_PASSWORD`` environment variable when omitted or
                empty.

        Raises:
            ValueError: If either credential is still empty after the
                environment fallback.
            Exception: Whatever ``_connect`` re-raises when the connection or
                login fails.
        """
        self.imap_server = "imap.zoho.com"
        self.imap_port = 993
        # Set first so close() is safe even if validation or login fails.
        self.connection = None

        # Use provided credentials or fall back to environment variables
        self.email = email or os.getenv("ZOHO_EMAIL", "")
        self.app_password = app_password or os.getenv("ZOHO_APP_PASSWORD", "")
        if not self.email or not self.app_password:
            raise ValueError("Zoho email and app password must be provided")

        self._connect()

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving a ``with`` block; never suppress errors."""
        self.close()
        return False

    def _connect(self):
        """Connect to the Zoho IMAP server over SSL and log in with the app password.

        Raises:
            Exception: The original connection/login error, re-raised after a
                hint has been printed and the half-open socket released.
        """
        try:
            self.connection = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
            self.connection.login(self.email, self.app_password)
            print(f"✅ Connected to Zoho IMAP server as {self.email}")
        except Exception as e:
            print(f"❌ Failed to connect to Zoho IMAP: {e}")
            print("💡 Make sure IMAP is enabled in your Zoho Mail settings")
            # A failed login would otherwise leave the SSL socket open.
            self._discard_connection()
            raise

    def _discard_connection(self):
        """Drop the current connection object, closing its socket if there is one."""
        conn, self.connection = self.connection, None
        if conn is None:
            return
        try:
            conn.shutdown()
        except Exception:
            # Best-effort cleanup; the caller is already handling a failure.
            pass

    def fetch_emails(self, query: str = None, max_results: int = None, days_back: int = 7) -> List[Dict[str, Any]]:
        """Fetch INBOX messages received within the last ``days_back`` days.

        Args:
            query: Extra IMAP search keys appended verbatim after the
                ``SINCE`` criterion (for example ``'UNSEEN'``). It is not
                escaped, so it must come from a trusted source.
            max_results: Keep only this many of the last matches returned by
                the server. ``None`` means no limit; zero or a negative value
                yields an empty list.
            days_back: Size of the look-back window in days.

        Returns:
            A list of dicts with the keys ``id``, ``threadId``, ``from``,
            ``subject``, ``date``, ``messageId`` and ``snippet``. Messages
            that fail to download or parse are skipped. An empty list is
            returned when the search itself fails or any other error occurs;
            errors are printed, not raised.
        """
        try:
            # NOTE: INBOX is selected read-write; per RFC 3501 an RFC822 fetch
            # then sets the \Seen flag on every message that is downloaded.
            self.connection.select('INBOX')

            search_criteria = self._build_search_criteria(query, days_back)
            status, message_numbers = self.connection.search(None, search_criteria)
            if status != 'OK':
                print(f"❌ Search failed: {status}")
                return []

            email_list = self._limit_results(message_numbers[0].split(), max_results)

            emails = []
            for num in email_list:
                try:
                    status, data = self.connection.fetch(num, '(RFC822)')
                    if status == 'OK':
                        emails.append(self._parse_message(num, data[0][1]))
                except Exception as e:
                    # One bad message must not abort the whole batch.
                    print(f"❌ Error processing email {num}: {e}")
                    continue

            print(f"📧 Fetched {len(emails)} real emails from last {days_back} days")
            return emails
        except Exception as e:
            print(f"❌ Error fetching emails: {e}")
            return []

    @classmethod
    def _build_search_criteria(cls, query, days_back):
        """Return the IMAP search string: ``SINCE <date>`` plus the optional query."""
        since = datetime.now() - timedelta(days=days_back)
        month = cls._IMAP_MONTHS[since.month - 1]
        criteria = f'SINCE {since.day:02d}-{month}-{since.year:04d}'
        if query:
            criteria += f' {query}'
        return criteria

    @staticmethod
    def _limit_results(email_list, max_results):
        """Return the last ``max_results`` entries, or all of them when it is None."""
        if max_results is None:
            return email_list
        if max_results <= 0:
            # email_list[-0:] would return the whole list, not an empty one.
            return []
        return email_list[-max_results:]

    def _parse_message(self, num: bytes, raw_email: bytes) -> Dict[str, Any]:
        """Turn one raw RFC 822 message into the dict returned by ``fetch_emails``.

        Args:
            num: Message number as returned by the IMAP search (bytes).
            raw_email: Complete raw message bytes.
        """
        email_message = email.message_from_bytes(raw_email)
        message_id = email_message.get('Message-ID', '')
        body = self._get_email_body(email_message)
        if len(body) > self._SNIPPET_LENGTH:
            snippet = body[:self._SNIPPET_LENGTH] + "..."
        else:
            snippet = body
        return {
            'id': num.decode(),
            # IMAP exposes no thread id here; use Message-ID, else a
            # placeholder derived from the message number.
            'threadId': message_id or f"thread_{num.decode()}",
            'from': self._decode_header(email_message.get('From', '')),
            'subject': self._decode_header(email_message.get('Subject', '')),
            'date': email_message.get('Date', ''),
            'messageId': message_id,
            'snippet': snippet,
        }

    @staticmethod
    def _decode_bytes(raw: bytes, charset=None) -> str:
        """Decode ``raw`` with ``charset``; fall back to UTF-8 if it is missing or unknown."""
        if charset:
            try:
                return raw.decode(charset, errors='ignore')
            except LookupError:
                # Unknown or non-text charset label: fall through to UTF-8.
                pass
        return raw.decode('utf-8', errors='ignore')

    def _decode_header(self, header_value: str) -> str:
        """Decode an RFC 2047 encoded header value into plain text.

        Returns an empty string for an empty value. If the header cannot be
        parsed at all, its ``str()`` form is returned unchanged.
        """
        if not header_value:
            return ""
        try:
            pieces = []
            for part, encoding in decode_header(header_value):
                if isinstance(part, bytes):
                    pieces.append(self._decode_bytes(part, encoding))
                else:
                    pieces.append(str(part))
            return "".join(pieces)
        except Exception:
            # Deliberate best-effort: a malformed header must not lose the message.
            return str(header_value)

    def _get_email_body(self, email_message) -> str:
        """Extract the message body as text.

        For multipart messages all ``text/plain`` parts are concatenated; for
        single-part messages the payload is used whatever its content type.
        Each part is decoded with its declared charset, falling back to UTF-8.
        Parts that cannot be read are skipped.
        """
        if email_message.is_multipart():
            parts = [
                part for part in email_message.walk()
                if part.get_content_type() == "text/plain"
            ]
        else:
            parts = [email_message]

        chunks = []
        for part in parts:
            try:
                payload = part.get_payload(decode=True)
                if payload is None:
                    continue
                chunks.append(self._decode_bytes(payload, part.get_content_charset()))
            except Exception:
                # Best-effort: skip a broken part but keep the others.
                continue
        return "".join(chunks)

    def get_thread_messages(self, thread_id: str) -> List[Dict[str, Any]]:
        """Return the messages of a thread; not implemented for IMAP.

        Thread grouping is not implemented, so this placeholder ignores
        ``thread_id`` and always returns an empty list.
        """
        return []

    def close(self):
        """Close the selected mailbox (if any) and log out.

        Safe to call more than once, or on a client that never selected a
        mailbox. Errors during cleanup are ignored.
        """
        conn, self.connection = self.connection, None
        if conn is None:
            return
        try:
            # CLOSE is illegal when no mailbox is selected and then raises;
            # that must not prevent the logout below.
            conn.close()
        except Exception:
            pass
        try:
            conn.logout()
        except Exception:
            pass
if __name__ == "__main__":
    # Manual smoke test: connect with credentials from the environment and
    # report how many of the latest messages could be fetched.
    client = ZohoClient()
    try:
        emails = client.fetch_emails(max_results=10)
        print(f"Fetched {len(emails)} emails")
    finally:
        # Release the IMAP session even if fetching or printing fails.
        client.close()