Files
email_alerts_v2/zoho_client.py
T

281 lines
10 KiB
Python
Raw Normal View History

2025-08-11 14:46:35 +01:00
import email
import imaplib
import os
from datetime import datetime, timedelta
from email.header import decode_header
from typing import Any, Dict, List
from dotenv import load_dotenv
load_dotenv()
class ZohoClient:
def __init__(self, email=None, app_password=None):
self.imap_server = "imap.zoho.com"
self.imap_port = 993
# Use provided credentials or fall back to environment variables
self.email = email or os.getenv("ZOHO_EMAIL", "")
self.app_password = app_password or os.getenv("ZOHO_PASSWORD", "")
if not self.email or not self.app_password:
raise ValueError("Zoho email and app password must be provided")
self.connection = None
self._connect()
def _connect(self):
"""Connect to Zoho IMAP server using app password"""
try:
self.connection = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
self.connection.login(self.email, self.app_password)
print(f"✅ Connected to Zoho IMAP server as {self.email}")
except Exception as e:
print(f"❌ Failed to connect to Zoho IMAP: {e}")
print("💡 Make sure IMAP is enabled in your Zoho Mail settings")
raise
def fetch_folder_emails(
self,
folder: str = "INBOX",
query: str = None,
max_results: int = None,
days_back: int = 7,
) -> List[Dict[str, Any]]:
"""Fetch emails from a given folder with date filtering"""
try:
# Select folder
mailbox = '"Sent"' if folder.lower() == "sent" else folder
print(f"📥 Selecting {folder}...[STEP 2]")
self.connection.select(mailbox)
# Build search criteria - only emails from specified days back
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
search_criteria = f"SINCE {days_ago}"
if query:
search_criteria += f" {query}"
# Search for emails
status, message_numbers = self.connection.search(None, search_criteria)
if status != "OK":
print(f"❌ Search failed: {status}")
return []
email_list = message_numbers[0].split()
# Limit results if specified
if max_results is not None:
email_list = email_list[-max_results:] # Get the most recent emails
emails = []
for i, num in enumerate(email_list):
try:
print(f"📧 Fetching email {num.decode()}... [STEP 3] {i}")
# Fetch email data
status, data = self.connection.fetch(num, "(RFC822)")
if status == "OK":
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)
# Extract headers
subject = self._decode_header(email_message.get("Subject", ""))
from_header = self._decode_header(email_message.get("From", ""))
to_header = self._decode_header(email_message.get("To", ""))
date_header = email_message.get("Date", "")
message_id = email_message.get("Message-ID", "")
in_reply_to = email_message.get("In-Reply-To", "")
# Generate thread ID (using Message-ID as fallback)
thread_id = message_id or f"thread_{num.decode()}"
# Get email body snippet
body = self._get_email_body(email_message)
snippet = body[:200] + "..." if len(body) > 200 else body
email_data = {
"id": num.decode(),
"threadId": thread_id,
"from": from_header,
"to": to_header,
"subject": subject,
"date": date_header,
"messageId": message_id,
"inReplyTo": in_reply_to,
"folder": folder,
"snippet": snippet,
}
emails.append(email_data)
except Exception as e:
print(f"❌ Error processing email {num}: {e}")
continue
print(
f"📧 Fetched {len(emails)} real emails from {folder} for last {days_back} days"
)
return emails
except Exception as e:
print(f"❌ Error fetching emails from {folder}: {e}")
return []
def fetch_emails(
self, query: str = None, max_results: int = None, days_back: int = 7
) -> List[Dict[str, Any]]:
"""Fetch emails from INBOX (backwards-compatible wrapper)."""
return self.fetch_folder_emails(
folder="INBOX", query=query, max_results=max_results, days_back=days_back
)
def _decode_header(self, header_value: str) -> str:
"""Decode email header values"""
if not header_value:
return ""
try:
decoded_parts = decode_header(header_value)
decoded_string = ""
for part, encoding in decoded_parts:
if isinstance(part, bytes):
if encoding:
decoded_string += part.decode(encoding)
else:
decoded_string += part.decode("utf-8", errors="ignore")
else:
decoded_string += str(part)
return decoded_string
except Exception:
return str(header_value)
def _get_email_body(self, email_message) -> str:
"""Extract email body text"""
body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
try:
body += part.get_payload(decode=True).decode(
"utf-8", errors="ignore"
)
except Exception:
pass
else:
try:
body = email_message.get_payload(decode=True).decode(
"utf-8", errors="ignore"
)
except Exception:
pass
return body
def get_thread_messages(self, thread_id: str) -> List[Dict[str, Any]]:
"""Get all messages in a thread (simplified for IMAP)"""
# For IMAP, we'll return a single message since thread grouping is more complex
# This is a simplified implementation
return []
def check_sent_folder_for_replies(self, subject: str, days_back: int = 7) -> bool:
"""Check SENT folder to see if we've replied to emails with this subject"""
try:
# Select SENT folder
self.connection.select('"Sent"') # Zoho uses "Sent" folder
# Build search criteria for sent emails within the date range
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
# Search for emails with similar subject (remove "Re:" prefixes for matching)
clean_subject = (
subject.replace("Re: ", "").replace("RE: ", "").replace("re: ", "")
)
search_criteria = f'SINCE {days_ago} SUBJECT "{clean_subject}"'
status, message_numbers = self.connection.search(None, search_criteria)
if status == "OK" and message_numbers[0]:
# Found sent emails with matching subject
return True
return False
except Exception as e:
print(f"❌ Error checking sent folder: {e}")
return False
finally:
# Always return to INBOX
try:
self.connection.select("INBOX")
except Exception:
pass
def check_message_reply_status(
self, message_id: str, subject: str, days_back: int = 7
) -> bool:
"""Check if a specific message has been replied to by checking SENT folder"""
try:
# First check by subject in SENT folder
if self.check_sent_folder_for_replies(subject, days_back):
return True
# Additional check: look for In-Reply-To headers matching our message ID
self.connection.select('"Sent"')
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
# Search all sent emails in date range
status, message_numbers = self.connection.search(None, f"SINCE {days_ago}")
if status == "OK" and message_numbers[0]:
email_list = message_numbers[0].split()
for num in email_list[
-20:
]: # Check last 20 sent emails for performance
try:
status, data = self.connection.fetch(num, "(RFC822)")
if status == "OK":
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)
# Check In-Reply-To header
in_reply_to = email_message.get("In-Reply-To", "")
if message_id and message_id in in_reply_to:
return True
except Exception:
continue
return False
except Exception as e:
print(f"❌ Error checking message reply status: {e}")
return False
finally:
# Always return to INBOX
try:
self.connection.select("INBOX")
except Exception:
pass
def close(self):
"""Close the IMAP connection"""
if self.connection:
try:
self.connection.close()
self.connection.logout()
except Exception as e:
print(f"Error closing connection: {e}")
if __name__ == "__main__":
client = ZohoClient()
emails = client.fetch_emails(max_results=10)
print(f"Fetched {len(emails)} emails")
client.close()