feat: Update AI analysis and alert processing for improved email thread management

- Increased the maximum number of messages analyzed in the `analyze_thread` function from 4 to 6 for better context.
- Enhanced the system prompt to clarify the criteria for determining if a thread is actionable.
- Improved logging in the `process_alerts` function to provide detailed insights during alert processing.
- Refactored the WhatsApp sender to include better error handling and logging for Twilio integration.
- Updated email ingestion logic to return whether analysis is needed based on the folder processed.
This commit is contained in:
bolade
2025-11-03 22:51:43 +01:00
parent 0c94225ff8
commit 6db9f5255d
7 changed files with 267 additions and 53 deletions
+10 -4
View File
@@ -60,7 +60,7 @@ def _heuristic_analyze(messages: List[dict]) -> Dict:
def analyze_thread( def analyze_thread(
thread_subject: str, messages: List[dict], max_messages: int = 4 thread_subject: str, messages: List[dict], max_messages: int = 6
) -> Dict: ) -> Dict:
""" """
Analyze a thread using Groq LLM. Returns dict with keys: Analyze a thread using Groq LLM. Returns dict with keys:
@@ -81,8 +81,10 @@ def analyze_thread(
system_prompt = ( system_prompt = (
"You are a helpful assistant that triages email threads and writes concise summaries. " "You are a helpful assistant that triages email threads and writes concise summaries. "
"Decide if the thread requires a reply from our side now, based on the last few messages. " "The account owner has received emails and needs to determine which require their attention/reply. "
"Ignore newsletters/automations (e.g., from no-reply), and focus on whether there's a clear question or request. " "Analyze if this thread is ACTIONABLE - meaning the owner should respond to address a question, request, or important matter. "
"Mark as NOT actionable if: newsletters, automated notifications (e.g., from no-reply), FYI updates, or conversations already concluded. "
"Mark as ACTIONABLE if: direct questions, action requests, important discussions awaiting response, or business opportunities. "
"Return a strict JSON object with keys: actionable (true/false), summary (<= 80 words), confidence (0..1)." "Return a strict JSON object with keys: actionable (true/false), summary (<= 80 words), confidence (0..1)."
) )
@@ -110,8 +112,12 @@ def analyze_thread(
user_prompt = ( user_prompt = (
f"Thread subject: {thread_subject or ''}\n\n" f"Thread subject: {thread_subject or ''}\n\n"
"Recent messages (oldest to newest):\n\n" "Recent messages (oldest to newest):\n"
"[IN] = incoming message TO the account owner (from someone else)\n"
"[OUT] = outgoing message FROM the account owner (their reply)\n\n"
f"{formatted_context}\n\n" f"{formatted_context}\n\n"
"Based on this email thread, does the account owner need to take action or reply? "
"Consider: Has the owner already responded to the latest inquiry? Is there an open question or request? "
"Respond with only JSON, no extra commentary." "Respond with only JSON, no extra commentary."
) )
+46 -19
View File
@@ -4,14 +4,14 @@ from typing import List, Optional
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from src.ai import analyze_thread from ai import analyze_thread
from src.database import ( from database import (
Message, Message,
Thread, Thread,
get_last_incoming_outgoing, get_last_incoming_outgoing,
get_thread_messages, get_thread_messages,
) )
from src.whatsapp_sender import WhatsAppSender from whatsapp_sender import WhatsAppSender
@dataclass @dataclass
@@ -68,18 +68,21 @@ def _format_alert(
def process_alerts(db: Session, cfg: dict) -> List[int]: def process_alerts(db: Session, cfg: dict) -> List[int]:
"""Analyze threads and send WhatsApp alerts when thresholds are met. """Analyze threads and send WhatsApp alerts when thresholds are met."""
import logging
Returns list of thread IDs that had alerts sent this run.
"""
account_email = (cfg.get("email_address") or cfg.get("zoho_email") or "").lower() account_email = (cfg.get("email_address") or cfg.get("zoho_email") or "").lower()
if not account_email: if not account_email:
logging.warning("No account_email configured")
return [] return []
frames = _load_frames_from_config(cfg) frames = _load_frames_from_config(cfg)
if not frames: if not frames:
logging.warning("No time frames configured")
return [] return []
logging.info(f"Processing alerts for {account_email} with frames: {frames}")
# Ensure thresholds unique by level # Ensure thresholds unique by level
level_to_hours = {f.alert_level: f.hours for f in frames} level_to_hours = {f.alert_level: f.hours for f in frames}
@@ -92,29 +95,34 @@ def process_alerts(db: Session, cfg: dict) -> List[int]:
.all() .all()
) )
logging.info(f"Found {len(threads)} threads requiring reply")
to_number = cfg.get("whatsapp_to") or None to_number = cfg.get("whatsapp_to") or None
logging.info(f"WhatsApp target: {to_number}")
sender = WhatsAppSender(to_number=to_number) sender = WhatsAppSender(to_number=to_number)
alerted = [] alerted = []
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
for t in threads: for t in threads:
logging.info(f"Checking thread {t.id}: {t.subject}")
# Get the last incoming message to calculate time threshold
last_in, last_out = get_last_incoming_outgoing(db, t.id) last_in, last_out = get_last_incoming_outgoing(db, t.id)
if not last_in: if not last_in:
logging.info(f" Thread {t.id}: No incoming messages, skipping")
continue continue
last_in_dt = _utc(last_in.date_sent) if last_in.date_sent else None last_in_dt = _utc(last_in.date_sent) if last_in.date_sent else None
last_out_dt = (
_utc(last_out.date_sent) if last_out and last_out.date_sent else None
)
# Only if last message is incoming or last_in is later than last_out
if last_out_dt and last_out_dt > last_in_dt:
print("Here in lies the problem")
# There's a reply from us after the last incoming; skip
continue
# Calculate time since the last incoming message (that needs owner's reply)
# Note: requires_reply=True means the latest message overall is incoming,
# so we trust the database flag instead of re-checking here
hours_since_last_in = ( hours_since_last_in = (
(now - last_in_dt).total_seconds() / 3600.0 if last_in_dt else 0 (now - last_in_dt).total_seconds() / 3600.0 if last_in_dt else 0
) )
logging.info(
f" Thread {t.id}: Hours since last incoming: {hours_since_last_in:.2f}"
)
# Determine highest frame crossed # Determine highest frame crossed
target_level = 0 target_level = 0
@@ -124,14 +132,23 @@ def process_alerts(db: Session, cfg: dict) -> List[int]:
target_level = level target_level = level
target_hours = hours target_hours = hours
logging.info(
f" Thread {t.id}: Target level: {target_level}, hours: {target_hours}"
)
if target_level == 0: if target_level == 0:
logging.info(f" Thread {t.id}: No threshold crossed, skipping")
continue continue
# Avoid re-sending same or lower level # Avoid re-sending same or lower level
if (t.last_alert_level_sent or 0) >= target_level: if (t.last_alert_level_sent or 0) >= target_level:
logging.info(
f" Thread {t.id}: Already alerted at level {t.last_alert_level_sent}, skipping"
)
continue continue
# Build AI input messages # Build AI input messages - get recent context (last 6 messages for better understanding)
all_messages = get_thread_messages(db, t.id)
msgs = [ msgs = [
{ {
"date_sent": m.date_sent.isoformat() if m.date_sent else None, "date_sent": m.date_sent.isoformat() if m.date_sent else None,
@@ -141,11 +158,16 @@ def process_alerts(db: Session, cfg: dict) -> List[int]:
"body": m.body, "body": m.body,
"is_incoming": m.is_incoming, "is_incoming": m.is_incoming,
} }
for m in get_thread_messages(db, t.id)[-4:] for m in all_messages[-6:] # Last 6 messages for better context
] ]
print(f"These are the messages: {msgs}") logging.info(
f" Thread {t.id}: Analyzing {len(msgs)} messages (total: {len(all_messages)})"
)
ai = analyze_thread(t.subject or "", msgs) ai = analyze_thread(t.subject or "", msgs)
logging.info(
f" Thread {t.id}: AI result - actionable: {ai.get('actionable')}, confidence: {ai.get('confidence')}"
)
# Persist AI decision on thread # Persist AI decision on thread
t.actionable = bool(ai.get("actionable", False)) t.actionable = bool(ai.get("actionable", False))
@@ -154,16 +176,21 @@ def process_alerts(db: Session, cfg: dict) -> List[int]:
t.last_analyzed_at = now t.last_analyzed_at = now
if not t.actionable: if not t.actionable:
# Don't alert on non-actionable logging.info(f" Thread {t.id}: Not actionable, skipping alert")
continue continue
# Compose and send WhatsApp # Compose and send WhatsApp
msg = _format_alert(target_level, target_hours, t, ai, last_in) msg = _format_alert(target_level, target_hours, t, ai, last_in)
logging.info(f" Thread {t.id}: Sending alert via WhatsApp")
res = sender.send_alert(msg, thread_id=str(t.id)) res = sender.send_alert(msg, thread_id=str(t.id))
logging.info(f" Thread {t.id}: WhatsApp response: {res}")
if res.get("status") == "success": if res.get("status") == "success":
t.last_alert_level_sent = target_level t.last_alert_level_sent = target_level
t.last_alert_sent_at = now t.last_alert_sent_at = now
alerted.append(t.id) alerted.append(t.id)
logging.info(f" Thread {t.id}: Alert sent successfully!")
db.commit() db.commit()
logging.info(f"Total alerts sent: {len(alerted)}")
return alerted return alerted
+33 -12
View File
@@ -12,9 +12,9 @@ from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from src.ai import analyze_thread from ai import analyze_thread
from src.alerts import process_alerts from alerts import process_alerts
from src.database import ( from database import (
Message, Message,
SessionLocal, SessionLocal,
Thread, Thread,
@@ -22,7 +22,7 @@ from src.database import (
get_thread_messages, get_thread_messages,
ingest_emails, ingest_emails,
) )
from src.zoho_client import ZohoClient from zoho_client import ZohoClient
load_dotenv() load_dotenv()
@@ -364,10 +364,12 @@ def send_to_all(message: str):
asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop) asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop)
def _sync_emails_once(cfg: dict) -> int: async def _sync_emails_once(cfg: dict) -> int:
"""Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count.""" """Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count."""
from datetime import datetime, timezone from datetime import datetime, timezone
from database import analyze_and_update_threads_async
account_email = cfg.get("email_address") or cfg.get("zoho_email") or "" account_email = cfg.get("email_address") or cfg.get("zoho_email") or ""
if not account_email: if not account_email:
raise ValueError("Configure email_address or zoho_email in /config") raise ValueError("Configure email_address or zoho_email in /config")
@@ -383,7 +385,7 @@ def _sync_emails_once(cfg: dict) -> int:
days_back = max(1, delta_days) days_back = max(1, delta_days)
except Exception: except Exception:
pass pass
max_results = 10 max_results = None
client = ZohoClient( client = ZohoClient(
email=cfg.get("zoho_email") or account_email, email=cfg.get("zoho_email") or account_email,
app_password=cfg.get("zoho_app_password"), app_password=cfg.get("zoho_app_password"),
@@ -414,12 +416,24 @@ def _sync_emails_once(cfg: dict) -> int:
send_to_all("Analysing Threads with AI") send_to_all("Analysing Threads with AI")
try: try:
# Ingest emails from both folders
ingest_emails( ingest_emails(
db, account_email=account_email, emails=inbox, default_folder="INBOX" db, account_email=account_email, emails=inbox, default_folder="INBOX"
) )
ingest_emails( needs_analysis_sent = ingest_emails(
db, account_email=account_email, emails=sent, default_folder="Sent" db, account_email=account_email, emails=sent, default_folder="Sent"
) )
# Run AI analysis if we ingested sent emails
if needs_analysis_sent:
await analyze_and_update_threads_async(
db=db,
account_email=account_email,
max_concurrent=3,
only_unanalyzed=True,
)
db.commit()
# Return count for UX/debug # Return count for UX/debug
count = ( count = (
db.query(Thread) db.query(Thread)
@@ -433,13 +447,13 @@ def _sync_emails_once(cfg: dict) -> int:
db.close() db.close()
def _sync_emails_background_task(): async def _sync_emails_background_task():
"""Background task to sync emails and update config status.""" """Background task to sync emails and update config status."""
from datetime import datetime, timezone from datetime import datetime, timezone
cfg = load_config() cfg = load_config()
try: try:
count = _sync_emails_once(cfg) count = await _sync_emails_once(cfg)
# Update success status # Update success status
cfg = load_config() # Reload in case it was modified cfg = load_config() # Reload in case it was modified
cfg.update( cfg.update(
@@ -532,8 +546,9 @@ _stop_event = asyncio.Event()
async def _auto_runner(): async def _auto_runner():
# Delay a bit on startup # Delay on startup to let the web interface load first
await asyncio.sleep(2) # This prevents blocking the initial page load with email sync
await asyncio.sleep(10) # 10 second delay before first auto-sync
while not _stop_event.is_set(): while not _stop_event.is_set():
cfg = load_config() cfg = load_config()
interval_min = int(cfg.get("auto_process_interval", 30) or 30) interval_min = int(cfg.get("auto_process_interval", 30) or 30)
@@ -550,7 +565,7 @@ async def _auto_runner():
save_config(cfg) save_config(cfg)
try: try:
count = _sync_emails_once(cfg) count = await _sync_emails_once(cfg)
# Update success status # Update success status
cfg = load_config() # Reload in case it was modified cfg = load_config() # Reload in case it was modified
cfg.update( cfg.update(
@@ -639,3 +654,9 @@ async def on_shutdown():
_stop_event.set() _stop_event.set()
with suppress(Exception): with suppress(Exception):
await _auto_task await _auto_task
if __name__ == "__main__":
import uvicorn
uvicorn.run(app="app:app", host="0.0.0.0", port=8010)
+7 -4
View File
@@ -350,6 +350,7 @@ def ingest_emails(
Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to. Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to.
""" """
from datetime import datetime from datetime import datetime
folder = default_folder folder = default_folder
for e in emails: for e in emails:
# Map common keys from ZohoClient output # Map common keys from ZohoClient output
@@ -393,10 +394,9 @@ def ingest_emails(
db.commit() db.commit()
if folder == "Sent": # Return whether analysis is needed (for async handling by caller)
analyze_and_update_threads( return folder == "Sent"
account_email=account_email, max_concurrent=3, only_unanalyzed=True
)
def get_latest_email_date( def get_latest_email_date(
db: Session, account_email: str, folder: str = None db: Session, account_email: str, folder: str = None
@@ -529,6 +529,9 @@ def analyze_and_update_threads(
""" """
Synchronous wrapper for the async thread analysis function. Synchronous wrapper for the async thread analysis function.
WARNING: This will fail if called from within an async event loop.
Use analyze_and_update_threads_async directly in async contexts.
Args: Args:
account_email: The email account to process threads for account_email: The email account to process threads for
thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads. thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads.
+48 -12
View File
@@ -1,32 +1,49 @@
import logging
import os import os
from typing import Any, Dict, List from typing import Any, Dict, List
from dotenv import load_dotenv
from twilio.base.exceptions import TwilioException from twilio.base.exceptions import TwilioException
from twilio.rest import Client from twilio.rest import Client
load_dotenv()
class WhatsAppSender: class WhatsAppSender:
def __init__(self, to_number: str | None = None): def __init__(self, to_number: str | None = None):
self.account_sid = os.getenv("TWILIO_ACCOUNT_SID") self.account_sid = (os.getenv("TWILIO_ACCOUNT_SID") or "").strip()
self.auth_token = os.getenv("TWILIO_AUTH_TOKEN") self.auth_token = (os.getenv("TWILIO_AUTH_TOKEN") or "").strip()
self.from_number = os.getenv("TWILIO_WHATSAPP_NUMBER") self.from_number = (os.getenv("TWILIO_WHATSAPP_NUMBER") or "").strip()
env_to = os.getenv("WHATSAPP_TO_NUMBER") env_to = (os.getenv("WHATSAPP_TO_NUMBER") or "").strip()
self.to_number = to_number or env_to # Individual phone number self.to_number = to_number or env_to # Individual phone number
# Log credential status (without exposing full credentials)
logging.info(
f"Twilio config: SID={'' if self.account_sid else ''} "
f"({self.account_sid[:8] + '...' if self.account_sid else 'missing'}), "
f"Token={'' if self.auth_token else ''}, "
f"From={'' if self.from_number else ''} ({self.from_number}), "
f"To={'' if self.to_number else ''} ({self.to_number})"
)
if self.account_sid and self.auth_token and self.from_number and self.to_number: if self.account_sid and self.auth_token and self.from_number and self.to_number:
try: try:
self.client = Client(self.account_sid, self.auth_token) self.client = Client(self.account_sid, self.auth_token)
self.use_mock = False self.use_mock = False
logging.info("Twilio client initialized successfully")
except Exception as e: except Exception as e:
print(f"Warning: Twilio client failed to initialize: {e}") logging.error(f"Twilio client failed to initialize: {e}")
self.use_mock = True self.use_mock = True
else: else:
self.use_mock = True self.use_mock = True
print( missing = []
"Note: Using mock WhatsApp sender (set TWILIO_* and WHATSAPP_TO_NUMBER or pass to_number)" if not self.account_sid:
missing.append("TWILIO_ACCOUNT_SID")
if not self.auth_token:
missing.append("TWILIO_AUTH_TOKEN")
if not self.from_number:
missing.append("TWILIO_WHATSAPP_NUMBER")
if not self.to_number:
missing.append("WHATSAPP_TO_NUMBER or to_number parameter")
logging.warning(
f"Using mock WhatsApp sender. Missing: {', '.join(missing)}"
) )
def send_alert(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: def send_alert(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]:
@@ -53,8 +70,27 @@ class WhatsAppSender:
} }
except TwilioException as e: except TwilioException as e:
print(f"WhatsApp send error: {e}") error_msg = str(e)
return {"status": "error", "error": str(e), "thread_id": thread_id} logging.error(f"WhatsApp send error: {error_msg}")
# Provide specific guidance based on error
if "20003" in error_msg or "Authenticate" in error_msg:
logging.error(
"Authentication failed! Check your Twilio credentials:\n"
" 1. Verify TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN in .env file\n"
" 2. Make sure there are no extra spaces or quotes\n"
" 3. Confirm you're using the correct credentials (Live vs Test)\n"
" 4. Check https://console.twilio.com for correct values"
)
elif "21211" in error_msg:
logging.error(f"Invalid 'To' phone number: {self.to_number}")
elif "21408" in error_msg:
logging.error(
f"WhatsApp not enabled for number: {self.from_number}\n"
" Enable WhatsApp at https://console.twilio.com/us1/develop/sms/senders/whatsapp-senders"
)
return {"status": "error", "error": error_msg, "thread_id": thread_id}
def _mock_send(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: def _mock_send(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]:
"""Mock WhatsApp sending for testing""" """Mock WhatsApp sending for testing"""
+9 -2
View File
@@ -9,6 +9,8 @@ from typing import Any, Dict, List
from dotenv import load_dotenv from dotenv import load_dotenv
from database import get_latest_email_date
load_dotenv() load_dotenv()
@@ -71,11 +73,11 @@ class ZohoClient:
# Determine the start date for fetching emails # Determine the start date for fetching emails
start_date = None start_date = None
first_time = True first_time = True
latest_date = None # Initialize to None to avoid UnboundLocalError
# If database session and account email are provided, try to get the latest email date # If database session and account email are provided, try to get the latest email date
if db_session and account_email: if db_session and account_email:
try: try:
from .database import get_latest_email_date
latest_date = get_latest_email_date( latest_date = get_latest_email_date(
db_session, account_email, folder db_session, account_email, folder
) )
@@ -93,12 +95,17 @@ class ZohoClient:
print( print(
f"⚠️ Error getting latest date from database: {e}, falling back to days_back" f"⚠️ Error getting latest date from database: {e}, falling back to days_back"
) )
latest_date = None # Ensure latest_date is None if error occurs
# Fall back to days_back if no database date available # Fall back to days_back if no database date available
if start_date is None: if start_date is None:
start_date = datetime.now() - timedelta(days=days_back) start_date = datetime.now() - timedelta(days=days_back)
print(f"📅 Using fallback date: {start_date}") print(f"📅 Using fallback date: {start_date}")
# Set latest_date for comparison if not already set
if latest_date is None:
latest_date = start_date
# Build search criteria - only emails from the calculated start date # Build search criteria - only emails from the calculated start date
search_date = start_date.strftime("%d-%b-%Y") search_date = start_date.strftime("%d-%b-%Y")
search_criteria = f"SINCE {search_date}" search_criteria = f"SINCE {search_date}"
+114
View File
@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Test Twilio credentials and WhatsApp configuration
"""
import os
from dotenv import load_dotenv
from twilio.rest import Client
# Load environment variables
load_dotenv()
def test_credentials():
print("=" * 60)
print("TWILIO CREDENTIALS TEST")
print("=" * 60)
# Get credentials
account_sid = (os.getenv("TWILIO_ACCOUNT_SID") or "").strip()
auth_token = (os.getenv("TWILIO_AUTH_TOKEN") or "").strip()
from_number = (os.getenv("TWILIO_WHATSAPP_NUMBER") or "").strip()
to_number = (os.getenv("WHATSAPP_TO_NUMBER") or "").strip()
# Check if credentials exist
print("\n1. Checking environment variables:")
print(f" TWILIO_ACCOUNT_SID: {'✓ Found' if account_sid else '✗ Missing'}")
if account_sid:
print(f" Value: {account_sid[:8]}...{account_sid[-4:]}")
print(f" TWILIO_AUTH_TOKEN: {'✓ Found' if auth_token else '✗ Missing'}")
if auth_token:
print(f" Value: {'*' * len(auth_token[:4])}...{auth_token[-4:]}")
print(f" TWILIO_WHATSAPP_NUMBER: {'✓ Found' if from_number else '✗ Missing'}")
if from_number:
print(f" Value: {from_number}")
print(f" WHATSAPP_TO_NUMBER: {'✓ Found' if to_number else '✗ Missing'}")
if to_number:
print(f" Value: {to_number}")
# Test authentication
if not (account_sid and auth_token):
print("\n❌ Missing required credentials!")
print("\nPlease check your .env file and ensure:")
print(" - TWILIO_ACCOUNT_SID is set")
print(" - TWILIO_AUTH_TOKEN is set")
print(" - No extra spaces or quotes around values")
return False
print("\n2. Testing Twilio authentication...")
try:
client = Client(account_sid, auth_token)
# Try to fetch account info to verify auth
account = client.api.accounts(account_sid).fetch()
print(" ✓ Authentication successful!")
print(f" Account: {account.friendly_name}")
print(f" Status: {account.status}")
except Exception as e:
print(" ✗ Authentication failed!")
print(f" Error: {e}")
print("\n Troubleshooting:")
print(" 1. Go to https://console.twilio.com")
print(" 2. Navigate to Account > API keys & tokens")
print(" 3. Copy the Account SID and Auth Token")
print(" 4. Update your .env file with correct values")
print(" 5. Make sure there are NO quotes or spaces")
return False
# Test WhatsApp sender configuration
print("\n3. Testing WhatsApp sender configuration...")
if not from_number:
print(" ✗ TWILIO_WHATSAPP_NUMBER not set")
print(" Please set up WhatsApp sender at:")
print(" https://console.twilio.com/us1/develop/sms/senders/whatsapp-senders")
return False
if not from_number.startswith("+"):
print(f" ⚠ Warning: Phone number should start with '+': {from_number}")
print(f" ✓ WhatsApp sender configured: {from_number}")
# Test recipient
print("\n4. Testing recipient configuration...")
if not to_number:
print(" ✗ WHATSAPP_TO_NUMBER not set")
return False
if not to_number.startswith("+"):
print(f" ⚠ Warning: Phone number should start with '+': {to_number}")
print(f" ✓ Recipient configured: {to_number}")
print("\n" + "=" * 60)
print("✓ ALL CHECKS PASSED - Ready to send WhatsApp messages!")
print("=" * 60)
return True
if __name__ == "__main__":
success = test_credentials()
if success:
print("\nYou can now send test messages with:")
print(
" python -c 'from src.whatsapp_sender import WhatsAppSender; "
'w = WhatsAppSender(); print(w.send_alert("Test message"))\''
)
else:
print("\n❌ Please fix the issues above before sending WhatsApp messages")