diff --git a/src/ai.py b/src/ai.py index d8012aa..c621ab0 100644 --- a/src/ai.py +++ b/src/ai.py @@ -60,7 +60,7 @@ def _heuristic_analyze(messages: List[dict]) -> Dict: def analyze_thread( - thread_subject: str, messages: List[dict], max_messages: int = 4 + thread_subject: str, messages: List[dict], max_messages: int = 6 ) -> Dict: """ Analyze a thread using Groq LLM. Returns dict with keys: @@ -81,8 +81,10 @@ def analyze_thread( system_prompt = ( "You are a helpful assistant that triages email threads and writes concise summaries. " - "Decide if the thread requires a reply from our side now, based on the last few messages. " - "Ignore newsletters/automations (e.g., from no-reply), and focus on whether there's a clear question or request. " + "The account owner has received emails and needs to determine which require their attention/reply. " + "Analyze if this thread is ACTIONABLE - meaning the owner should respond to address a question, request, or important matter. " + "Mark as NOT actionable if: newsletters, automated notifications (e.g., from no-reply), FYI updates, or conversations already concluded. " + "Mark as ACTIONABLE if: direct questions, action requests, important discussions awaiting response, or business opportunities. " "Return a strict JSON object with keys: actionable (true/false), summary (<= 80 words), confidence (0..1)." ) @@ -110,8 +112,12 @@ def analyze_thread( user_prompt = ( f"Thread subject: {thread_subject or ''}\n\n" - "Recent messages (oldest to newest):\n\n" + "Recent messages (oldest to newest):\n" + "[IN] = incoming message TO the account owner (from someone else)\n" + "[OUT] = outgoing message FROM the account owner (their reply)\n\n" f"{formatted_context}\n\n" + "Based on this email thread, does the account owner need to take action or reply? " + "Consider: Has the owner already responded to the latest inquiry? Is there an open question or request? " "Respond with only JSON, no extra commentary." ) diff --git a/src/alerts.py b/src/alerts.py index 0bd03ef..2464788 100644 --- a/src/alerts.py +++ b/src/alerts.py @@ -4,14 +4,14 @@ from typing import List, Optional from sqlalchemy.orm import Session -from src.ai import analyze_thread -from src.database import ( +from ai import analyze_thread +from database import ( Message, Thread, get_last_incoming_outgoing, get_thread_messages, ) -from src.whatsapp_sender import WhatsAppSender +from whatsapp_sender import WhatsAppSender @dataclass @@ -68,18 +68,21 @@ def _format_alert( def process_alerts(db: Session, cfg: dict) -> List[int]: - """Analyze threads and send WhatsApp alerts when thresholds are met. + """Analyze threads and send WhatsApp alerts when thresholds are met.""" + import logging - Returns list of thread IDs that had alerts sent this run. - """ account_email = (cfg.get("email_address") or cfg.get("zoho_email") or "").lower() if not account_email: + logging.warning("No account_email configured") return [] frames = _load_frames_from_config(cfg) if not frames: + logging.warning("No time frames configured") return [] + logging.info(f"Processing alerts for {account_email} with frames: {frames}") + # Ensure thresholds unique by level level_to_hours = {f.alert_level: f.hours for f in frames} @@ -92,29 +95,34 @@ def process_alerts(db: Session, cfg: dict) -> List[int]: .all() ) + logging.info(f"Found {len(threads)} threads requiring reply") + to_number = cfg.get("whatsapp_to") or None + logging.info(f"WhatsApp target: {to_number}") sender = WhatsAppSender(to_number=to_number) alerted = [] now = datetime.now(timezone.utc) for t in threads: + logging.info(f"Checking thread {t.id}: {t.subject}") + + # Get the last incoming message to calculate time threshold last_in, last_out = get_last_incoming_outgoing(db, t.id) if not last_in: + logging.info(f" Thread {t.id}: No incoming messages, skipping") continue + last_in_dt = _utc(last_in.date_sent) if last_in.date_sent else None - last_out_dt = ( - _utc(last_out.date_sent) if last_out and last_out.date_sent else None - ) - - # Only if last message is incoming or last_in is later than last_out - if last_out_dt and last_out_dt > last_in_dt: - print("Here in lies the problem") - # There's a reply from us after the last incoming; skip - continue + # Calculate time since the last incoming message (that needs owner's reply) + # Note: requires_reply=True means the latest message overall is incoming, + # so we trust the database flag instead of re-checking here hours_since_last_in = ( (now - last_in_dt).total_seconds() / 3600.0 if last_in_dt else 0 ) + logging.info( + f" Thread {t.id}: Hours since last incoming: {hours_since_last_in:.2f}" + ) # Determine highest frame crossed target_level = 0 @@ -124,14 +132,23 @@ def process_alerts(db: Session, cfg: dict) -> List[int]: target_level = level target_hours = hours + logging.info( + f" Thread {t.id}: Target level: {target_level}, hours: {target_hours}" + ) + if target_level == 0: + logging.info(f" Thread {t.id}: No threshold crossed, skipping") continue # Avoid re-sending same or lower level if (t.last_alert_level_sent or 0) >= target_level: + logging.info( + f" Thread {t.id}: Already alerted at level {t.last_alert_level_sent}, skipping" + ) continue - # Build AI input messages + # Build AI input messages - get recent context (last 6 messages for better understanding) + all_messages = get_thread_messages(db, t.id) msgs = [ { "date_sent": m.date_sent.isoformat() if m.date_sent else None, @@ -141,11 +158,16 @@ def process_alerts(db: Session, cfg: dict) -> List[int]: "body": m.body, "is_incoming": m.is_incoming, } - for m in get_thread_messages(db, t.id)[-4:] + for m in all_messages[-6:] # Last 6 messages for better context ] - print(f"These are the messages: {msgs}") + logging.info( + f" Thread {t.id}: Analyzing {len(msgs)} messages (total: {len(all_messages)})" + ) ai = analyze_thread(t.subject or "", msgs) + logging.info( + f" Thread {t.id}: AI result - actionable: {ai.get('actionable')}, confidence: {ai.get('confidence')}" + ) # Persist AI decision on thread t.actionable = bool(ai.get("actionable", False)) @@ -154,16 +176,21 @@ def process_alerts(db: Session, cfg: dict) -> List[int]: t.last_analyzed_at = now if not t.actionable: - # Don't alert on non-actionable + logging.info(f" Thread {t.id}: Not actionable, skipping alert") continue # Compose and send WhatsApp msg = _format_alert(target_level, target_hours, t, ai, last_in) + logging.info(f" Thread {t.id}: Sending alert via WhatsApp") res = sender.send_alert(msg, thread_id=str(t.id)) + logging.info(f" Thread {t.id}: WhatsApp response: {res}") + if res.get("status") == "success": t.last_alert_level_sent = target_level t.last_alert_sent_at = now alerted.append(t.id) + logging.info(f" Thread {t.id}: Alert sent successfully!") db.commit() + logging.info(f"Total alerts sent: {len(alerted)}") return alerted diff --git a/src/app.py b/src/app.py index b1dea8d..1980ab8 100644 --- a/src/app.py +++ b/src/app.py @@ -12,9 +12,9 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session -from src.ai import analyze_thread -from src.alerts import process_alerts -from src.database import ( +from ai import analyze_thread +from alerts import process_alerts +from database import ( Message, SessionLocal, Thread, @@ -22,7 +22,7 @@ from src.database import ( get_thread_messages, ingest_emails, ) -from src.zoho_client import ZohoClient +from zoho_client import ZohoClient load_dotenv() @@ -364,10 +364,12 @@ def send_to_all(message: str): asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop) -def _sync_emails_once(cfg: dict) -> int: +async def _sync_emails_once(cfg: dict) -> int: """Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count.""" from datetime import datetime, timezone + from database import analyze_and_update_threads_async + account_email = cfg.get("email_address") or cfg.get("zoho_email") or "" if not account_email: raise ValueError("Configure email_address or zoho_email in /config") @@ -383,7 +385,7 @@ def _sync_emails_once(cfg: dict) -> int: days_back = max(1, delta_days) except Exception: pass - max_results = 10 + max_results = None client = ZohoClient( email=cfg.get("zoho_email") or account_email, app_password=cfg.get("zoho_app_password"), @@ -414,12 +416,24 @@ def _sync_emails_once(cfg: dict) -> int: send_to_all("Analysing Threads with AI") try: + # Ingest emails from both folders ingest_emails( db, account_email=account_email, emails=inbox, default_folder="INBOX" ) - ingest_emails( + needs_analysis_sent = ingest_emails( db, account_email=account_email, emails=sent, default_folder="Sent" ) + + # Run AI analysis if we ingested sent emails + if needs_analysis_sent: + await analyze_and_update_threads_async( + db=db, + account_email=account_email, + max_concurrent=3, + only_unanalyzed=True, + ) + db.commit() + # Return count for UX/debug count = ( db.query(Thread) @@ -433,13 +447,13 @@ def _sync_emails_once(cfg: dict) -> int: db.close() -def _sync_emails_background_task(): +async def _sync_emails_background_task(): """Background task to sync emails and update config status.""" from datetime import datetime, timezone cfg = load_config() try: - count = _sync_emails_once(cfg) + count = await _sync_emails_once(cfg) # Update success status cfg = load_config() # Reload in case it was modified cfg.update( @@ -532,8 +546,9 @@ _stop_event = asyncio.Event() async def _auto_runner(): - # Delay a bit on startup - await asyncio.sleep(2) + # Delay on startup to let the web interface load first + # This prevents blocking the initial page load with email sync + await asyncio.sleep(10) # 10 second delay before first auto-sync while not _stop_event.is_set(): cfg = load_config() interval_min = int(cfg.get("auto_process_interval", 30) or 30) @@ -550,7 +565,7 @@ async def _auto_runner(): save_config(cfg) try: - count = _sync_emails_once(cfg) + count = await _sync_emails_once(cfg) # Update success status cfg = load_config() # Reload in case it was modified cfg.update( @@ -639,3 +654,9 @@ async def on_shutdown(): _stop_event.set() with suppress(Exception): await _auto_task + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app="app:app", host="0.0.0.0", port=8010) diff --git a/src/database.py b/src/database.py index bffe2cd..1b1655d 100644 --- a/src/database.py +++ b/src/database.py @@ -350,6 +350,7 @@ def ingest_emails( Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to. """ from datetime import datetime + folder = default_folder for e in emails: # Map common keys from ZohoClient output @@ -393,10 +394,9 @@ def ingest_emails( db.commit() - if folder == "Sent": - analyze_and_update_threads( - account_email=account_email, max_concurrent=3, only_unanalyzed=True - ) + # Return whether analysis is needed (for async handling by caller) + return folder == "Sent" + def get_latest_email_date( db: Session, account_email: str, folder: str = None @@ -529,6 +529,9 @@ def analyze_and_update_threads( """ Synchronous wrapper for the async thread analysis function. + WARNING: This will fail if called from within an async event loop. + Use analyze_and_update_threads_async directly in async contexts. + Args: account_email: The email account to process threads for thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads. diff --git a/src/whatsapp_sender.py b/src/whatsapp_sender.py index bd52c43..60d0c2d 100644 --- a/src/whatsapp_sender.py +++ b/src/whatsapp_sender.py @@ -1,32 +1,49 @@ +import logging import os from typing import Any, Dict, List -from dotenv import load_dotenv from twilio.base.exceptions import TwilioException from twilio.rest import Client -load_dotenv() - class WhatsAppSender: def __init__(self, to_number: str | None = None): - self.account_sid = os.getenv("TWILIO_ACCOUNT_SID") - self.auth_token = os.getenv("TWILIO_AUTH_TOKEN") - self.from_number = os.getenv("TWILIO_WHATSAPP_NUMBER") - env_to = os.getenv("WHATSAPP_TO_NUMBER") + self.account_sid = (os.getenv("TWILIO_ACCOUNT_SID") or "").strip() + self.auth_token = (os.getenv("TWILIO_AUTH_TOKEN") or "").strip() + self.from_number = (os.getenv("TWILIO_WHATSAPP_NUMBER") or "").strip() + env_to = (os.getenv("WHATSAPP_TO_NUMBER") or "").strip() self.to_number = to_number or env_to # Individual phone number + # Log credential status (without exposing full credentials) + logging.info( + f"Twilio config: SID={'✓' if self.account_sid else '✗'} " + f"({self.account_sid[:8] + '...' if self.account_sid else 'missing'}), " + f"Token={'✓' if self.auth_token else '✗'}, " + f"From={'✓' if self.from_number else '✗'} ({self.from_number}), " + f"To={'✓' if self.to_number else '✗'} ({self.to_number})" + ) + if self.account_sid and self.auth_token and self.from_number and self.to_number: try: self.client = Client(self.account_sid, self.auth_token) self.use_mock = False + logging.info("Twilio client initialized successfully") except Exception as e: - print(f"Warning: Twilio client failed to initialize: {e}") + logging.error(f"Twilio client failed to initialize: {e}") self.use_mock = True else: self.use_mock = True - print( - "Note: Using mock WhatsApp sender (set TWILIO_* and WHATSAPP_TO_NUMBER or pass to_number)" + missing = [] + if not self.account_sid: + missing.append("TWILIO_ACCOUNT_SID") + if not self.auth_token: + missing.append("TWILIO_AUTH_TOKEN") + if not self.from_number: + missing.append("TWILIO_WHATSAPP_NUMBER") + if not self.to_number: + missing.append("WHATSAPP_TO_NUMBER or to_number parameter") + logging.warning( + f"Using mock WhatsApp sender. Missing: {', '.join(missing)}" ) def send_alert(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: @@ -53,8 +70,27 @@ class WhatsAppSender: } except TwilioException as e: - print(f"WhatsApp send error: {e}") - return {"status": "error", "error": str(e), "thread_id": thread_id} + error_msg = str(e) + logging.error(f"WhatsApp send error: {error_msg}") + + # Provide specific guidance based on error + if "20003" in error_msg or "Authenticate" in error_msg: + logging.error( + "Authentication failed! Check your Twilio credentials:\n" + " 1. Verify TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN in .env file\n" + " 2. Make sure there are no extra spaces or quotes\n" + " 3. Confirm you're using the correct credentials (Live vs Test)\n" + " 4. Check https://console.twilio.com for correct values" + ) + elif "21211" in error_msg: + logging.error(f"Invalid 'To' phone number: {self.to_number}") + elif "21408" in error_msg: + logging.error( + f"WhatsApp not enabled for number: {self.from_number}\n" + " Enable WhatsApp at https://console.twilio.com/us1/develop/sms/senders/whatsapp-senders" + ) + + return {"status": "error", "error": error_msg, "thread_id": thread_id} def _mock_send(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: """Mock WhatsApp sending for testing""" diff --git a/src/zoho_client.py b/src/zoho_client.py index c7ea940..b1118e5 100644 --- a/src/zoho_client.py +++ b/src/zoho_client.py @@ -9,6 +9,8 @@ from typing import Any, Dict, List from dotenv import load_dotenv +from database import get_latest_email_date + load_dotenv() @@ -71,11 +73,11 @@ class ZohoClient: # Determine the start date for fetching emails start_date = None first_time = True + latest_date = None # Initialize to None to avoid UnboundLocalError + # If database session and account email are provided, try to get the latest email date if db_session and account_email: try: - from .database import get_latest_email_date - latest_date = get_latest_email_date( db_session, account_email, folder ) @@ -93,12 +95,17 @@ class ZohoClient: print( f"⚠️ Error getting latest date from database: {e}, falling back to days_back" ) + latest_date = None # Ensure latest_date is None if error occurs # Fall back to days_back if no database date available if start_date is None: start_date = datetime.now() - timedelta(days=days_back) print(f"📅 Using fallback date: {start_date}") + # Set latest_date for comparison if not already set + if latest_date is None: + latest_date = start_date + # Build search criteria - only emails from the calculated start date search_date = start_date.strftime("%d-%b-%Y") search_criteria = f"SINCE {search_date}" diff --git a/test_twilio.py b/test_twilio.py new file mode 100644 index 0000000..35a3912 --- /dev/null +++ b/test_twilio.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +Test Twilio credentials and WhatsApp configuration +""" + +import os + +from dotenv import load_dotenv +from twilio.rest import Client + +# Load environment variables +load_dotenv() + + +def test_credentials(): + print("=" * 60) + print("TWILIO CREDENTIALS TEST") + print("=" * 60) + + # Get credentials + account_sid = (os.getenv("TWILIO_ACCOUNT_SID") or "").strip() + auth_token = (os.getenv("TWILIO_AUTH_TOKEN") or "").strip() + from_number = (os.getenv("TWILIO_WHATSAPP_NUMBER") or "").strip() + to_number = (os.getenv("WHATSAPP_TO_NUMBER") or "").strip() + + # Check if credentials exist + print("\n1. Checking environment variables:") + print(f" TWILIO_ACCOUNT_SID: {'✓ Found' if account_sid else '✗ Missing'}") + if account_sid: + print(f" Value: {account_sid[:8]}...{account_sid[-4:]}") + + print(f" TWILIO_AUTH_TOKEN: {'✓ Found' if auth_token else '✗ Missing'}") + if auth_token: + print(f" Value: {'*' * len(auth_token[:4])}...{auth_token[-4:]}") + + print(f" TWILIO_WHATSAPP_NUMBER: {'✓ Found' if from_number else '✗ Missing'}") + if from_number: + print(f" Value: {from_number}") + + print(f" WHATSAPP_TO_NUMBER: {'✓ Found' if to_number else '✗ Missing'}") + if to_number: + print(f" Value: {to_number}") + + # Test authentication + if not (account_sid and auth_token): + print("\n❌ Missing required credentials!") + print("\nPlease check your .env file and ensure:") + print(" - TWILIO_ACCOUNT_SID is set") + print(" - TWILIO_AUTH_TOKEN is set") + print(" - No extra spaces or quotes around values") + return False + + print("\n2. Testing Twilio authentication...") + try: + client = Client(account_sid, auth_token) + + # Try to fetch account info to verify auth + account = client.api.accounts(account_sid).fetch() + print(" ✓ Authentication successful!") + print(f" Account: {account.friendly_name}") + print(f" Status: {account.status}") + + except Exception as e: + print(" ✗ Authentication failed!") + print(f" Error: {e}") + print("\n Troubleshooting:") + print(" 1. Go to https://console.twilio.com") + print(" 2. Navigate to Account > API keys & tokens") + print(" 3. Copy the Account SID and Auth Token") + print(" 4. Update your .env file with correct values") + print(" 5. Make sure there are NO quotes or spaces") + return False + + # Test WhatsApp sender configuration + print("\n3. Testing WhatsApp sender configuration...") + if not from_number: + print(" ✗ TWILIO_WHATSAPP_NUMBER not set") + print(" Please set up WhatsApp sender at:") + print(" https://console.twilio.com/us1/develop/sms/senders/whatsapp-senders") + return False + + if not from_number.startswith("+"): + print(f" ⚠ Warning: Phone number should start with '+': {from_number}") + + print(f" ✓ WhatsApp sender configured: {from_number}") + + # Test recipient + print("\n4. Testing recipient configuration...") + if not to_number: + print(" ✗ WHATSAPP_TO_NUMBER not set") + return False + + if not to_number.startswith("+"): + print(f" ⚠ Warning: Phone number should start with '+': {to_number}") + + print(f" ✓ Recipient configured: {to_number}") + + print("\n" + "=" * 60) + print("✓ ALL CHECKS PASSED - Ready to send WhatsApp messages!") + print("=" * 60) + return True + + +if __name__ == "__main__": + success = test_credentials() + + if success: + print("\nYou can now send test messages with:") + print( + " python -c 'from src.whatsapp_sender import WhatsAppSender; " + 'w = WhatsAppSender(); print(w.send_alert("Test message"))\'' + ) + else: + print("\n❌ Please fix the issues above before sending WhatsApp messages")