import os
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from zoho_client import ZohoClient
from email_triage import EmailTriage
from thread_tracker import ThreadTracker
from ai_analyzer import AIAnalyzer
from whatsapp_sender import WhatsAppSender

load_dotenv()


class EmailProcessor:
    """Fetch emails, track unanswered threads, and send WhatsApp alerts.

    The processor wires together the Zoho client, the thread tracker, the AI
    analyzer and the WhatsApp sender. ``agency_domains`` lists the sender
    addresses/domains that count as "our own" mail.
    """

    def __init__(self, agency_domains: Optional[List[str]] = None):
        """Create the collaborating clients.

        Args:
            agency_domains: Addresses or domains identifying the agency's own
                mail. Defaults to ``['projects@manaknightdigital.com']``.
        """
        self.zoho_client = ZohoClient()
        self.triage = EmailTriage()
        self.tracker = ThreadTracker()
        self.ai_analyzer = AIAnalyzer()
        self.whatsapp_sender = WhatsAppSender()
        self.agency_domains = agency_domains or ['projects@manaknightdigital.com']

    def _is_own_email(self, email: Dict[str, Any]) -> bool:
        """Return True when the email's sender matches one of ``agency_domains``."""
        # ``or ''`` guards against a 'from' key that is present but None.
        sender = (email.get('from') or '').lower()
        return any(
            domain and str(domain).lower() in sender
            for domain in self.agency_domains
        )

    def _mark_thread_replied(self, thread_id: str, reply_date: str) -> None:
        """Record an agency reply for ``thread_id`` and deactivate the thread."""
        # sqlite3's own context manager only commits/rolls back the
        # transaction; ``closing`` is what actually closes the connection, so
        # we do not leak one connection per replied thread.
        with closing(sqlite3.connect(self.tracker.db_path)) as conn:
            with conn:
                conn.execute(
                    """
                    UPDATE threads
                    SET last_agency_reply = ?, alert_level = 0, is_active = 0
                    WHERE thread_id = ?
                    """,
                    (reply_date, thread_id),
                )

    def process_emails(self, max_results: int = 100, send_alerts: bool = True,
                       days_back: int = 7,
                       time_frames: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """Main processing pipeline with optional WhatsApp alerts.

        Args:
            max_results: Forwarded to ``ZohoClient.fetch_emails``.
            send_alerts: When True, each generated alert message is sent via
                ``WhatsAppSender.send_alert``.
            days_back: Forwarded to ``ZohoClient.fetch_emails``.
            time_frames: Forwarded to
                ``ThreadTracker.get_threads_needing_alerts``.

        Returns:
            On success, a dict with ``total_emails``, ``actionable_emails``
            (counts), ``alert_threads``, ``alert_summaries``, ``sent_alerts``
            and ``status == 'success'``. If any step raises, a dict with
            ``status == 'error'`` and the exception text under ``error``.
        """
        try:
            # 1. Fetch emails
            emails = self.zoho_client.fetch_emails(
                max_results=max_results, days_back=days_back
            )

            # 2. Let AI decide which emails are actionable (no hardcoded filtering)
            actionable_emails = []
            for email in emails:
                # Skip our own outgoing mail (anything matching agency_domains).
                if self._is_own_email(email):
                    print(f"⏭️ Skipping own email: {email.get('subject', 'No subject')}")
                    continue

                # Use AI to determine if email needs response
                summary = self.ai_analyzer.analyze_thread_context([email])
                if summary.needs_response:
                    actionable_emails.append((email, summary))

            # 3. Update thread tracking and check reply status
            for email, _ in actionable_emails:
                thread_id = email['threadId']
                self.tracker.update_thread(thread_id, email, self.agency_domains)

                # Check if this thread has been replied to
                is_replied = self.tracker.check_thread_reply_status(
                    thread_id, self.zoho_client, self.agency_domains
                )
                if is_replied:
                    # Mark thread as replied
                    self._mark_thread_replied(
                        thread_id,
                        email.get('date', datetime.now().isoformat()),
                    )

            # 4. Check for alerts
            alert_threads = self.tracker.get_threads_needing_alerts(time_frames)

            # Print number of threads that will trigger alerts
            if alert_threads:
                print(f"🚨 Found {len(alert_threads)} threads needing alerts")
                for thread in alert_threads:
                    print(f" - {thread.subject} ({thread.alert_level} level alert)")
            else:
                print("✅ No threads currently need alerts")

            # 5. Generate AI summaries and send alerts
            alert_summaries = []
            sent_alerts = []

            # Map thread_id -> email from this batch; if several actionable
            # emails share a thread, the last one wins.
            thread_to_email = {
                email['threadId']: email for email, _ in actionable_emails
            }

            for thread in alert_threads:
                # Threads with no actionable email in this batch are not alerted.
                email_data = thread_to_email.get(thread.thread_id)
                if not email_data:
                    continue

                # NOTE(review): this re-analyzes the same single message that
                # step 2 already analyzed; the step-2 summary could probably be
                # reused to save an AI call — confirm before changing.
                summary = self.ai_analyzer.analyze_thread_context([email_data])

                # Only send alerts if AI determines email needs response
                if not summary.needs_response:
                    print(f" ⏭️ Skipping alert for thread {thread.thread_id} - AI determined no response needed")
                    continue

                alert_message = self.ai_analyzer.generate_alert_message(
                    thread.thread_id, summary, thread.alert_level, email_data
                )
                alert_summaries.append({
                    'thread_id': thread.thread_id,
                    'alert_level': thread.alert_level,
                    'summary': summary,
                    'message': alert_message,
                })

                # Send WhatsApp alert if enabled
                if send_alerts:
                    send_result = self.whatsapp_sender.send_alert(
                        alert_message, thread.thread_id
                    )
                    sent_alerts.append(send_result)

            return {
                'total_emails': len(emails),
                'actionable_emails': len(actionable_emails),
                'alert_threads': alert_threads,
                'alert_summaries': alert_summaries,
                'sent_alerts': sent_alerts,
                'status': 'success',
            }

        except Exception as e:
            # Pipeline boundary: callers receive the failure as data, not as
            # an exception.
            return {'status': 'error', 'error': str(e)}

    def get_alert_summary(self, alert_threads: List) -> List[Dict[str, Any]]:
        """Build one summary dict per alert thread.

        The ``ai_summary``, ``urgency`` and ``action_required`` fields are
        placeholder literals; no AI analysis is performed here yet.

        Args:
            alert_threads: Objects exposing ``thread_id``, ``alert_level`` and
                ``last_external_message`` (must support ``strftime``).

        Returns:
            A list of dicts with the thread id, a human-readable alert level
            label ("UNKNOWN" for levels other than 1-3), the formatted date of
            the last external message, and the placeholder fields.
        """
        summaries = []
        alert_levels = {1: "LEVEL 1", 2: "LEVEL 2 - URGENT", 3: "LEVEL 3 - CRITICAL"}

        for thread in alert_threads:
            # This would need to be updated to use real email data
            summaries.append({
                'thread_id': thread.thread_id,
                'alert_level': alert_levels.get(thread.alert_level, "UNKNOWN"),
                'last_message_date': thread.last_external_message.strftime("%Y-%m-%d %H:%M"),
                'ai_summary': "Real AI analysis",
                'urgency': "Real urgency",
                'action_required': "Real action",
            })

        return summaries


def main() -> None:
    """Run one processing pass and print a short report."""
    processor = EmailProcessor()
    result = processor.process_emails(max_results=10, send_alerts=True)

    # process_emails reports failures in the result dict instead of raising;
    # surface them rather than printing a misleading "0 emails" report.
    if result.get('status') != 'success':
        print(f"❌ Email processing failed: {result.get('error', 'unknown error')}")
        raise SystemExit(1)

    print(f"Processed {result.get('total_emails', 0)} emails, {result.get('actionable_emails', 0)} actionable")
    print(f"Generated {len(result.get('alert_summaries', []))} AI summaries")
    print(f"Sent {len(result.get('sent_alerts', []))} WhatsApp alerts")


if __name__ == "__main__":
    main()