feat: Implement async AI analysis for email threads

- Added `get_latest_email_date()` function in `database.py` to retrieve the most recent email date for a given account and folder.
- Enhanced `fetch_folder_emails()` in `zoho_client.py` to intelligently determine the start date for fetching emails based on the latest email date in the database.
- Introduced `analyze_and_update_threads_async()` for asynchronous analysis of email threads, allowing concurrent processing.
- Created a synchronous wrapper `analyze_and_update_threads()` for easier integration.
- Updated `fetch_emails()` to support database session and account email parameters.
- Added comprehensive documentation in `AI_ANALYSIS_GUIDE.md` detailing the new AI analysis functionality.
- Implemented tests for the new features, including `test_fetch_with_db.py`, `test_ai_analysis.py`, and `test_single_analysis.py`.
- Added error handling and logging improvements throughout the codebase.
This commit is contained in:
bolade
2025-08-11 23:20:20 +01:00
parent d553d6f31e
commit 75a0a3fde7
14 changed files with 1358 additions and 476 deletions
+69 -2
View File
@@ -1,9 +1,27 @@
import json
import os
import time
from typing import Dict, List
from groq import Groq
# Rate limiting variables
_last_api_call = 0
_min_interval = 1.0 # Minimum seconds between API calls
def _rate_limit():
"""Simple rate limiting to avoid overwhelming the Groq API."""
global _last_api_call
current_time = time.time()
time_since_last = current_time - _last_api_call
if time_since_last < _min_interval:
sleep_time = _min_interval - time_since_last
time.sleep(sleep_time)
_last_api_call = time.time()
def _format_messages_for_context(messages: List[dict]) -> str:
lines = []
@@ -76,8 +94,40 @@ def analyze_thread(
)
try:
# Validate input before sending to API
if not msgs:
return _heuristic_analyze(msgs)
# Check message content length to avoid oversized requests
formatted_context = _format_messages_for_context(msgs)
if len(formatted_context) > 10000: # Limit context size
# Truncate messages if too long
truncated_msgs = msgs[-2:] # Use only last 2 messages
formatted_context = _format_messages_for_context(truncated_msgs)
print(
f"Warning: Truncated message context due to length ({len(formatted_context)} chars)"
)
user_prompt = (
f"Thread subject: {thread_subject or ''}\n\n"
"Recent messages (oldest to newest):\n\n"
f"{formatted_context}\n\n"
"Respond with only JSON, no extra commentary."
)
# Validate prompt length
total_prompt_length = len(system_prompt) + len(user_prompt)
if total_prompt_length > 15000: # Further reduce if still too long
print(
f"Warning: Prompt too long ({total_prompt_length} chars), falling back to heuristic"
)
return _heuristic_analyze(msgs)
# Apply rate limiting before API call
_rate_limit()
completion = client.chat.completions.create(
model=os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile"),
model=os.getenv("GROQ_MODEL", "llama3-8b-8192"),
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
@@ -96,6 +146,23 @@ def analyze_thread(
):
raise ValueError("Invalid schema from model")
return data
except Exception:
except json.JSONDecodeError as e:
print(f"JSON decode error from Groq API: {e}")
return _heuristic_analyze(msgs)
except Exception as e:
# Log the specific error for debugging
error_msg = str(e)
print(f"Groq API error: {error_msg}")
# Check for specific error types
if "400" in error_msg or "Bad Request" in error_msg:
print("400 Bad Request - likely prompt too long or invalid format")
elif "429" in error_msg or "rate limit" in error_msg.lower():
print("Rate limit exceeded - consider reducing concurrent requests")
elif "401" in error_msg or "unauthorized" in error_msg.lower():
print("Unauthorized - check GROQ_API_KEY")
elif "503" in error_msg or "service unavailable" in error_msg.lower():
print("Service unavailable - Groq API may be down")
# Fallback to heuristic
return _heuristic_analyze(msgs)
+128 -30
View File
@@ -1,10 +1,11 @@
import asyncio
import json
import logging
import os
from contextlib import suppress
from typing import List
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
@@ -21,6 +22,9 @@ from src.database import (
ingest_emails,
)
from src.zoho_client import ZohoClient
from dotenv import load_dotenv
load_dotenv()
def get_db():
@@ -31,6 +35,9 @@ def get_db():
db.close()
logging.basicConfig(level=logging.INFO, filename="email_alerts.log")
create_db_tables()
app = FastAPI(title="Email Alerts UI")
@@ -256,25 +263,9 @@ def _sync_emails_once(cfg: dict) -> int:
"""Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count."""
from datetime import datetime, timezone
# Mark status
cfg["sync_in_progress"] = True
cfg["last_sync_status"] = "running"
cfg["last_sync_error"] = None
save_config(cfg)
account_email = cfg.get("email_address") or cfg.get("zoho_email") or ""
if not account_email:
cfg.update(
{
"last_sync_status": "error",
"last_sync_error": "Configure email_address or zoho_email in /config",
"sync_in_progress": False,
}
)
save_config(cfg)
raise HTTPException(
status_code=400, detail="Configure email_address or zoho_email in /config"
)
raise ValueError("Configure email_address or zoho_email in /config")
# Incremental lookback by last_sync_at
days_back = int(cfg.get("email_days_back", 7) or 7)
@@ -292,17 +283,26 @@ def _sync_emails_once(cfg: dict) -> int:
email=cfg.get("zoho_email") or account_email,
app_password=cfg.get("zoho_app_password"),
)
db = SessionLocal()
try:
inbox = client.fetch_folder_emails(
folder="INBOX", max_results=max_results, days_back=days_back
folder="INBOX",
max_results=max_results,
days_back=days_back,
db_session=db,
account_email=account_email,
)
sent = client.fetch_folder_emails(
folder="Sent", max_results=max_results, days_back=days_back
folder="Sent",
max_results=max_results,
days_back=days_back,
db_session=db,
account_email=account_email,
)
finally:
client.close()
db = SessionLocal()
try:
ingest_emails(
db, account_email=account_email, emails=inbox, default_folder="INBOX"
@@ -321,23 +321,54 @@ def _sync_emails_once(cfg: dict) -> int:
db.close()
def _sync_emails_background_task():
"""Background task to sync emails and update config status."""
from datetime import datetime, timezone
cfg = load_config()
try:
count = _sync_emails_once(cfg)
# Update success status
cfg = load_config() # Reload in case it was modified
cfg.update(
{
"sync_in_progress": False,
"last_sync_status": "success",
"last_sync_at": datetime.now(timezone.utc).isoformat(),
"last_sync_count": count,
"last_sync_error": None,
}
)
save_config(cfg)
except Exception as e:
# Update error status
cfg = load_config() # Reload in case it was modified
cfg.update(
{
"sync_in_progress": False,
"last_sync_status": "error",
"last_sync_error": str(e),
}
)
save_config(cfg)
logging.error(f"Email sync failed: {e}")
@app.post("/sync_emails")
async def sync_emails():
async def sync_emails(background_tasks: BackgroundTasks):
cfg = load_config()
if cfg.get("sync_in_progress"):
return RedirectResponse(url="/?sync=busy", status_code=303)
async def _task():
try:
_sync_emails_once(load_config())
except Exception:
pass
# Mark sync as starting
cfg["sync_in_progress"] = True
cfg["last_sync_status"] = "running"
cfg["last_sync_error"] = None
save_config(cfg)
asyncio.create_task(_task())
# Add the background task
background_tasks.add_task(_sync_emails_background_task)
return RedirectResponse(url="/?sync=started", status_code=303)
@@ -358,7 +389,40 @@ async def _auto_runner():
# Sync emails then run alerts
try:
if not cfg.get("sync_in_progress"):
_sync_emails_once(cfg)
from datetime import datetime, timezone
# Mark sync as starting
cfg["sync_in_progress"] = True
cfg["last_sync_status"] = "running"
cfg["last_sync_error"] = None
save_config(cfg)
try:
count = _sync_emails_once(cfg)
# Update success status
cfg = load_config() # Reload in case it was modified
cfg.update(
{
"sync_in_progress": False,
"last_sync_status": "success",
"last_sync_at": datetime.now(timezone.utc).isoformat(),
"last_sync_count": count,
"last_sync_error": None,
}
)
save_config(cfg)
except Exception as e:
# Update error status
cfg = load_config() # Reload in case it was modified
cfg.update(
{
"sync_in_progress": False,
"last_sync_status": "error",
"last_sync_error": str(e),
}
)
save_config(cfg)
logging.error(f"Auto email sync failed: {e}")
except Exception:
# keep loop alive
pass
@@ -377,6 +441,23 @@ async def _auto_runner():
@app.on_event("startup")
async def on_startup():
# Reset any stale sync status from previous session
try:
cfg = load_config()
if cfg.get("sync_in_progress"):
cfg.update(
{
"sync_in_progress": False,
"last_sync_status": "interrupted",
"last_sync_error": "Previous session ended while sync was in progress",
}
)
save_config(cfg)
logging.info("Reset stale sync status from previous session")
except Exception as e:
logging.error(f"Failed to reset stale sync status on startup: {e}")
# Start the auto-processing task
global _auto_task
_stop_event.clear()
_auto_task = asyncio.create_task(_auto_runner())
@@ -384,6 +465,23 @@ async def on_startup():
@app.on_event("shutdown")
async def on_shutdown():
# Reset sync status if it's currently in progress
try:
cfg = load_config()
if cfg.get("sync_in_progress"):
cfg.update(
{
"sync_in_progress": False,
"last_sync_status": "interrupted",
"last_sync_error": "Server shutdown while sync was in progress",
}
)
save_config(cfg)
logging.info("Reset sync status due to server shutdown")
except Exception as e:
logging.error(f"Failed to reset sync status on shutdown: {e}")
# Stop the auto-processing task
if _auto_task:
_stop_event.set()
with suppress(Exception):
+177
View File
@@ -1,3 +1,5 @@
import asyncio
from datetime import datetime
from email.utils import parseaddr
from typing import Annotated, Iterable, List, Optional, Tuple
@@ -390,3 +392,178 @@ def ingest_emails(
)
db.commit()
def get_latest_email_date(
db: Session, account_email: str, folder: str = None
) -> Optional[datetime]:
"""
Get the latest email date for a specific account and folder (if specified).
Returns None if no emails exist.
"""
query = (
db.query(func.max(Message.date_sent))
.join(Thread, Message.thread_id == Thread.id)
.filter(Thread.account_email == account_email.lower())
)
if folder:
query = query.filter(Message.folder == folder)
result = query.scalar()
return result
async def analyze_and_update_threads_async(
db: Session,
account_email: str,
thread_ids: Optional[List[int]] = None,
max_concurrent: int = 5,
only_unanalyzed: bool = True,
) -> None:
"""
Asynchronously analyze threads and update their AI summaries and actionable status.
Args:
db: Database session to use
account_email: The email account to process threads for
thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads.
max_concurrent: Maximum number of concurrent AI analysis tasks
only_unanalyzed: If True, only analyze threads that haven't been analyzed yet
"""
try:
from ai import analyze_thread
except ImportError:
# Try absolute import if relative import fails
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir)
from ai import analyze_thread
# Get threads to analyze
query = db.query(Thread).filter(Thread.account_email == account_email.lower())
if thread_ids:
query = query.filter(Thread.id.in_(thread_ids))
if only_unanalyzed:
query = query.filter(Thread.last_analyzed_at.is_(None))
threads = query.all()
if not threads:
print(f"No threads to analyze for {account_email}")
return
print(f"Found {len(threads)} threads to analyze for {account_email}")
# Create semaphore to limit concurrent tasks
semaphore = asyncio.Semaphore(max_concurrent)
async def analyze_single_thread(thread: Thread) -> None:
async with semaphore:
try:
# Get messages for this thread
messages = get_thread_messages(db, thread.id)
if not messages:
print(f"No messages found for thread {thread.id}")
return
# Convert messages to dict format for AI analysis
message_dicts = []
for msg in messages:
message_dicts.append(
{
"date_sent": msg.date_sent,
"is_incoming": msg.is_incoming,
"subject": msg.subject,
"from_email": msg.from_email,
"to_email": msg.to_email,
"body": msg.body,
}
)
# Run AI analysis in a thread pool to avoid blocking
loop = asyncio.get_event_loop()
analysis_result = await loop.run_in_executor(
None, analyze_thread, thread.subject, message_dicts
)
# Update thread with analysis results
thread.actionable = analysis_result.get("actionable", False)
thread.ai_summary = analysis_result.get("summary", "")
thread.ai_confidence = float(analysis_result.get("confidence", 0.0))
thread.last_analyzed_at = (
datetime.now()
) # Using datetime.now() instead of utcnow()
print(
f"Analyzed thread {thread.id}: actionable={thread.actionable}, confidence={thread.ai_confidence:.2f}"
)
except Exception as e:
print(f"Error analyzing thread {thread.id}: {e}")
# Create tasks for all threads
tasks = [analyze_single_thread(thread) for thread in threads]
# Run all tasks concurrently
await asyncio.gather(*tasks, return_exceptions=True)
print(f"Successfully analyzed {len(threads)} threads")
def analyze_and_update_threads(
account_email: str,
thread_ids: Optional[List[int]] = None,
max_concurrent: int = 3, # Reduced default to avoid overwhelming API
only_unanalyzed: bool = True,
) -> None:
"""
Synchronous wrapper for the async thread analysis function.
Args:
account_email: The email account to process threads for
thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads.
max_concurrent: Maximum number of concurrent AI analysis tasks (default: 3, reduced to avoid API limits)
only_unanalyzed: If True, only analyze threads that haven't been analyzed yet
"""
db = SessionLocal()
try:
asyncio.run(
analyze_and_update_threads_async(
db=db,
account_email=account_email,
thread_ids=thread_ids,
max_concurrent=max_concurrent,
only_unanalyzed=only_unanalyzed,
)
)
# Commit all changes
db.commit()
except Exception as e:
db.rollback()
print(f"Error in analyze_and_update_threads: {e}")
raise
finally:
db.close()
def get_threads_needing_analysis(db: Session, account_email: str) -> List[Thread]:
"""
Get threads that haven't been analyzed yet or need re-analysis.
"""
return (
db.query(Thread)
.filter(
Thread.account_email == account_email.lower(),
Thread.last_analyzed_at.is_(None),
)
.order_by(Thread.updated_at.desc())
.all()
)
+128 -32
View File
@@ -1,8 +1,10 @@
import email
import imaplib
import logging
import os
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List
from dotenv import load_dotenv
@@ -41,17 +43,63 @@ class ZohoClient:
query: str = None,
max_results: int = None,
days_back: int = 7,
db_session=None,
account_email: str = None,
) -> List[Dict[str, Any]]:
"""Fetch emails from a given folder with date filtering"""
"""Fetch emails from a given folder with intelligent date filtering
Args:
folder: Folder to fetch from (default: INBOX)
query: Additional search query
max_results: Maximum number of emails to fetch
days_back: Fallback days to search if no database date available (default: 7)
db_session: Database session to check for latest email date
account_email: Account email to query latest date for
Returns:
List of email dictionaries
If db_session and account_email are provided, will fetch emails starting from
the latest email date in the database. Otherwise, falls back to days_back.
"""
try:
# Select folder
mailbox = '"Sent"' if folder.lower() == "sent" else folder
print(f"📥 Selecting {folder}...[STEP 2]")
self.connection.select(mailbox)
# Build search criteria - only emails from specified days back
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
search_criteria = f"SINCE {days_ago}"
# Determine the start date for fetching emails
start_date = None
# If database session and account email are provided, try to get the latest email date
if db_session and account_email:
try:
from .database import get_latest_email_date
latest_date = get_latest_email_date(
db_session, account_email, folder
)
if latest_date:
# Add a small buffer (1 minute) to avoid missing emails with same timestamp
start_date = latest_date - timedelta(minutes=1)
print(f"📅 Using latest email date from database: {start_date}")
else:
print(
f"📅 No emails found in database, using days_back: {days_back}"
)
except Exception as e:
print(
f"⚠️ Error getting latest date from database: {e}, falling back to days_back"
)
# Fall back to days_back if no database date available
if start_date is None:
start_date = datetime.now() - timedelta(days=days_back)
print(f"📅 Using fallback date: {start_date}")
# Build search criteria - only emails from the calculated start date
search_date = start_date.strftime("%d-%b-%Y")
search_criteria = f"SINCE {search_date}"
if query:
search_criteria += f" {query}"
@@ -73,42 +121,53 @@ class ZohoClient:
for i, num in enumerate(email_list):
try:
print(f"📧 Fetching email {num.decode()}... [STEP 3] {i}")
logging.info(f"Fetching email {num.decode()}... [STEP 3] {i}")
# Fetch email data
status, data = self.connection.fetch(num, "(RFC822)")
if status == "OK":
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)
# Extract headers
subject = self._decode_header(email_message.get("Subject", ""))
from_header = self._decode_header(email_message.get("From", ""))
to_header = self._decode_header(email_message.get("To", ""))
date_header = email_message.get("Date", "")
message_id = email_message.get("Message-ID", "")
in_reply_to = email_message.get("In-Reply-To", "")
email_date = parse_email_date_safely(date_header)
print(f"📅 Email date: {email_date} Latest: {latest_date}")
# Ensure both dates are timezone-aware for comparison
if email_date and latest_date:
# If latest_date is timezone-naive, make it timezone-aware (assume UTC)
if latest_date.tzinfo is None:
latest_date = latest_date.replace(tzinfo=timezone.utc)
if email_date > latest_date:
# Extract headers
subject = self._decode_header(email_message.get("Subject", ""))
from_header = self._decode_header(email_message.get("From", ""))
to_header = self._decode_header(email_message.get("To", ""))
# Generate thread ID (using Message-ID as fallback)
thread_id = message_id or f"thread_{num.decode()}"
message_id = email_message.get("Message-ID", "")
in_reply_to = email_message.get("In-Reply-To", "")
# Get email body snippet
body = self._get_email_body(email_message)
snippet = body[:200] + "..." if len(body) > 200 else body
# Generate thread ID (using Message-ID as fallback)
thread_id = message_id or f"thread_{num.decode()}"
email_data = {
"id": num.decode(),
"threadId": thread_id,
"from": from_header,
"to": to_header,
"subject": subject,
"date": date_header,
"messageId": message_id,
"inReplyTo": in_reply_to,
"folder": folder,
"snippet": snippet,
}
# Get email body snippet
body = self._get_email_body(email_message)
snippet = body[:200] + "..." if len(body) > 200 else body
emails.append(email_data)
email_data = {
"id": num.decode(),
"threadId": thread_id,
"from": from_header,
"to": to_header,
"subject": subject,
"date": date_header,
"messageId": message_id,
"inReplyTo": in_reply_to,
"folder": folder,
"snippet": snippet,
}
emails.append(email_data)
except Exception as e:
print(f"❌ Error processing email {num}: {e}")
@@ -124,11 +183,21 @@ class ZohoClient:
return []
def fetch_emails(
self, query: str = None, max_results: int = None, days_back: int = 7
self,
query: str = None,
max_results: int = None,
days_back: int = 7,
db_session=None,
account_email: str = None,
) -> List[Dict[str, Any]]:
"""Fetch emails from INBOX (backwards-compatible wrapper)."""
return self.fetch_folder_emails(
folder="INBOX", query=query, max_results=max_results, days_back=days_back
folder="INBOX",
query=query,
max_results=max_results,
days_back=days_back,
db_session=db_session,
account_email=account_email,
)
def _decode_header(self, header_value: str) -> str:
@@ -273,6 +342,33 @@ class ZohoClient:
print(f"Error closing connection: {e}")
def parse_email_date_safely(date_string):
"""
Safely parse email date string ensuring timezone awareness.
Args:
date_string: Date string from email header
Returns:
Timezone-aware datetime object or None if parsing fails
"""
if not date_string:
return None
try:
parsed_date = parsedate_to_datetime(date_string)
# If the parsed date is timezone-naive, assume UTC
if parsed_date.tzinfo is None:
parsed_date = parsed_date.replace(tzinfo=timezone.utc)
return parsed_date
except Exception as e:
logging.warning(f"Could not parse date '{date_string}': {e}")
# Return current time as fallback
return datetime.now(timezone.utc)
if __name__ == "__main__":
client = ZohoClient()
emails = client.fetch_emails(max_results=10)