Implement email alert system with WhatsApp notifications

- Added alerts processing logic in src/alerts.py to analyze threads and send WhatsApp alerts based on configured time frames.
- Created FastAPI application in src/app.py to manage threads, display configurations, and trigger alert processing.
- Developed database models and utility functions in src/database.py for managing threads and messages.
- Integrated Twilio API for sending WhatsApp messages in src/whatsapp_sender.py.
- Implemented Zoho email client in src/zoho_client.py to fetch emails and check for replies.
- Added configuration management for email settings and alert parameters.
- Established auto-processing loop for periodic email syncing and alert generation.
This commit is contained in:
bolade
2025-08-11 17:34:35 +01:00
parent ca7df9d598
commit d553d6f31e
13 changed files with 879 additions and 150 deletions
View File
+101
View File
@@ -0,0 +1,101 @@
import json
import os
from typing import Dict, List
from groq import Groq
def _format_messages_for_context(messages: List[dict]) -> str:
lines = []
for m in messages:
direction = "IN" if m.get("is_incoming", True) else "OUT"
date = m.get("date_sent")
subj = m.get("subject") or ""
from_email = m.get("from_email") or ""
to_email = m.get("to_email") or ""
body = (m.get("body") or "").strip()
if len(body) > 1000:
body = body[:1000] + "..."
lines.append(
f"[{date}] [{direction}] {from_email} -> {to_email}\nSubject: {subj}\n{body}"
)
return "\n\n---\n\n".join(lines)
def _heuristic_analyze(messages: List[dict]) -> Dict:
# Simple fallback if Groq isn't available
body_concat = "\n\n".join([(m.get("body") or "") for m in messages[-4:]])
question_like = "?" in body_concat or any(
kw in body_concat.lower()
for kw in ["could you", "can you", "please", "let me know", "need", "request"]
)
last_subj = (messages[-1].get("subject") or "") if messages else ""
return {
"actionable": bool(question_like),
"summary": (body_concat[:350] + "...")
if len(body_concat) > 350
else body_concat,
"subject": last_subj,
"confidence": 0.35,
"model": "heuristic",
}
def analyze_thread(
thread_subject: str, messages: List[dict], max_messages: int = 4
) -> Dict:
"""
Analyze a thread using Groq LLM. Returns dict with keys:
- actionable: bool
- summary: str
- subject: str
- confidence: float (0..1)
- model: str
Gracefully falls back to a heuristic when GROQ_API_KEY is missing or calls fail.
"""
msgs = messages[-max_messages:] if max_messages else messages
api_key = os.getenv("GROQ_API_KEY")
if not api_key:
return _heuristic_analyze(msgs)
client = Groq(api_key=api_key)
system_prompt = (
"You are a helpful assistant that triages email threads and writes concise summaries. "
"Decide if the thread requires a reply from our side now, based on the last few messages. "
"Ignore newsletters/automations (e.g., from no-reply), and focus on whether there's a clear question or request. "
"Return a strict JSON object with keys: actionable (true/false), summary (<= 80 words), confidence (0..1)."
)
user_prompt = (
f"Thread subject: {thread_subject or ''}\n\n"
"Recent messages (oldest to newest):\n\n"
f"{_format_messages_for_context(msgs)}\n\n"
"Respond with only JSON, no extra commentary."
)
try:
completion = client.chat.completions.create(
model=os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile"),
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
temperature=0.2,
max_tokens=300,
)
content = completion.choices[0].message.content.strip()
# Attempt to extract JSON
data = json.loads(content)
data.setdefault("subject", thread_subject or "")
data.setdefault("model", os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile"))
# Basic validation
if not isinstance(data.get("actionable"), bool) or not isinstance(
data.get("summary"), str
):
raise ValueError("Invalid schema from model")
return data
except Exception:
# Fallback to heuristic
return _heuristic_analyze(msgs)
+166
View File
@@ -0,0 +1,166 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional
from sqlalchemy.orm import Session
from src.ai import analyze_thread
from src.database import (
Message,
Thread,
get_last_incoming_outgoing,
get_thread_messages,
)
from src.whatsapp_sender import WhatsAppSender
@dataclass
class TimeFrame:
name: str
hours: int
alert_level: int
def _utc(dt: datetime) -> datetime:
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _load_frames_from_config(cfg: dict) -> List[TimeFrame]:
frames = []
for f in cfg.get("time_frames", []):
try:
frames.append(
TimeFrame(
name=f.get("name", ""),
hours=int(f.get("hours", 0)),
alert_level=int(f.get("alert_level", 0)),
)
)
except Exception:
continue
# sort ascending by hours
frames.sort(key=lambda x: x.hours)
return frames
def _format_alert(
level: int, hours: int, thread: Thread, ai: dict, last_in: Optional[Message]
) -> str:
title = {
1: "LEVEL 1 ALERT (24 Hours)",
2: "LEVEL 2 ALERT (48 Hours - URGENT)",
3: "LEVEL 3 ALERT (72 Hours - CRITICAL)",
}.get(level, f"LEVEL {level} ALERT ({hours} Hours)")
sender = last_in.from_email if last_in else "unknown"
subject = thread.subject or "(no subject)"
summary = ai.get("summary") or thread.ai_summary or ""
conf_pct = int(round((ai.get("confidence") or thread.ai_confidence or 0) * 100))
return (
f"🚨 {title} 🚨\n\n"
f"Conversation with: {sender}\n"
f"Subject: {subject}\n"
f"Contextual Summary: {summary}\n\n"
f"Thread ID: {thread.id}\n"
f"Confidence: {conf_pct}%"
)
def process_alerts(db: Session, cfg: dict) -> List[int]:
"""Analyze threads and send WhatsApp alerts when thresholds are met.
Returns list of thread IDs that had alerts sent this run.
"""
account_email = (cfg.get("email_address") or cfg.get("zoho_email") or "").lower()
if not account_email:
return []
frames = _load_frames_from_config(cfg)
if not frames:
return []
# Ensure thresholds unique by level
level_to_hours = {f.alert_level: f.hours for f in frames}
# Find candidate threads: requires_reply True and account matches
threads: List[Thread] = (
db.query(Thread)
.filter(Thread.account_email == account_email, Thread.requires_reply.is_(True))
.order_by(Thread.updated_at.desc())
.limit(200)
.all()
)
to_number = cfg.get("whatsapp_to") or None
sender = WhatsAppSender(to_number=to_number)
alerted = []
now = datetime.now(timezone.utc)
for t in threads:
last_in, last_out = get_last_incoming_outgoing(db, t.id)
if not last_in:
continue
last_in_dt = _utc(last_in.date_sent) if last_in.date_sent else None
last_out_dt = (
_utc(last_out.date_sent) if last_out and last_out.date_sent else None
)
# Only if last message is incoming or last_in is later than last_out
if last_out_dt and last_out_dt > last_in_dt:
# There's a reply from us after the last incoming; skip
continue
hours_since_last_in = (
(now - last_in_dt).total_seconds() / 3600.0 if last_in_dt else 0
)
# Determine highest frame crossed
target_level = 0
target_hours = 0
for level, hours in sorted(level_to_hours.items(), key=lambda x: x[1]):
if hours_since_last_in >= hours:
target_level = level
target_hours = hours
if target_level == 0:
continue
# Avoid re-sending same or lower level
if (t.last_alert_level_sent or 0) >= target_level:
continue
# Build AI input messages
msgs = [
{
"date_sent": m.date_sent.isoformat() if m.date_sent else None,
"subject": m.subject,
"from_email": m.from_email,
"to_email": m.to_email,
"body": m.body,
"is_incoming": m.is_incoming,
}
for m in get_thread_messages(db, t.id)[-4:]
]
ai = analyze_thread(t.subject or "", msgs)
# Persist AI decision on thread
t.actionable = bool(ai.get("actionable", False))
t.ai_summary = ai.get("summary")
t.ai_confidence = ai.get("confidence")
t.last_analyzed_at = now
if not t.actionable:
# Don't alert on non-actionable
continue
# Compose and send WhatsApp
msg = _format_alert(target_level, target_hours, t, ai, last_in)
res = sender.send_alert(msg, thread_id=str(t.id))
if res.get("status") == "success":
t.last_alert_level_sent = target_level
t.last_alert_sent_at = now
alerted.append(t.id)
db.commit()
return alerted
+390
View File
@@ -0,0 +1,390 @@
import asyncio
import json
import os
from contextlib import suppress
from typing import List
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from src.ai import analyze_thread
from src.alerts import process_alerts
from src.database import (
Message,
SessionLocal,
Thread,
create_db_tables,
get_thread_messages,
ingest_emails,
)
from src.zoho_client import ZohoClient
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
create_db_tables()
app = FastAPI(title="Email Alerts UI")
# Static and templates
os.makedirs("templates", exist_ok=True)
os.makedirs("static", exist_ok=True)
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
def home(request: Request, db: Session = Depends(get_db), account: str | None = None):
q = db.query(Thread).order_by(Thread.updated_at.desc())
if account:
q = q.filter(Thread.account_email == account.lower())
threads = q.limit(100).all()
return templates.TemplateResponse(
"threads.html",
{
"request": request,
"threads": threads,
"account": account or "",
"status": _status_for_templates(),
},
)
@app.get("/thread/{thread_id}", response_class=HTMLResponse)
def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)):
thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none()
if not thread:
raise HTTPException(status_code=404, detail="Thread not found")
messages: List[Message] = get_thread_messages(db, thread_id)
# Convert for AI analyzer and template
msg_dicts = [
{
"id": m.id,
"date_sent": m.date_sent,
"subject": m.subject,
"from_email": m.from_email,
"to_email": m.to_email,
"body": m.body,
"is_incoming": m.is_incoming,
}
for m in messages
]
ai = analyze_thread(thread.subject or "", msg_dicts)
# Save AI info on the thread for listing and downstream alerts
try:
from datetime import datetime, timezone
thread.actionable = bool(ai.get("actionable", False))
thread.ai_summary = ai.get("summary")
thread.ai_confidence = ai.get("confidence")
thread.last_analyzed_at = datetime.now(timezone.utc)
db.commit()
except Exception:
pass
return templates.TemplateResponse(
"thread_detail.html",
{
"request": request,
"thread": thread,
"messages": messages,
"ai": ai,
"status": _status_for_templates(),
},
)
@app.get("/health")
def health():
return {"status": "ok"}
# ---------------------
# Config editor routes
# ---------------------
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "..", "config.json")
def load_config() -> dict:
if not os.path.exists(CONFIG_PATH):
return {
"email_address": "",
"time_frames": [
{"name": "1-24 hours", "hours": 24, "alert_level": 1},
{"name": "24-48 hours", "hours": 48, "alert_level": 2},
{"name": "48+ hours", "hours": 72, "alert_level": 3},
],
"email_days_back": 7,
"agency_domains": [],
"zoho_email": "",
"zoho_app_password": "",
"whatsapp_to": "",
"auto_process": False,
"auto_process_interval": 30,
# Sync status
"sync_in_progress": False,
"last_sync_at": None,
"last_sync_count": 0,
"last_sync_status": "idle",
"last_sync_error": None,
}
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f)
def save_config(cfg: dict) -> None:
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def _status_for_templates() -> dict:
cfg = load_config()
return {
"auto_process": bool(cfg.get("auto_process")),
"interval": int(cfg.get("auto_process_interval", 30) or 30),
"sync_in_progress": bool(cfg.get("sync_in_progress")),
"last_sync_at": cfg.get("last_sync_at"),
"last_sync_status": cfg.get("last_sync_status", "idle"),
"last_sync_count": int(cfg.get("last_sync_count") or 0),
"last_sync_error": cfg.get("last_sync_error"),
}
@app.get("/config", response_class=HTMLResponse)
def config_form(request: Request, saved: int | None = None):
cfg = load_config()
# Render up to existing frames or at least 3 rows
frames = cfg.get("time_frames") or []
min_rows = max(len(frames), 3)
return templates.TemplateResponse(
"config.html",
{
"request": request,
"cfg": cfg,
"rows": list(range(min_rows)),
"saved": bool(saved),
"status": _status_for_templates(),
},
)
@app.post("/config")
async def config_save(request: Request):
form = await request.form()
cfg = load_config()
# Basic fields
cfg["email_address"] = (form.get("email_address") or "").strip()
# email_days_back
try:
cfg["email_days_back"] = int(
form.get("email_days_back") or cfg.get("email_days_back", 7)
)
except Exception:
cfg["email_days_back"] = 7
# agency domains (comma or newline separated)
domains_raw = (form.get("agency_domains") or "").replace("\r", "")
parts = [p.strip() for p in domains_raw.replace(",", "\n").split("\n") if p.strip()]
cfg["agency_domains"] = parts
# auto_process
cfg["auto_process"] = form.get("auto_process") == "on"
# auto_process_interval
try:
cfg["auto_process_interval"] = int(
form.get("auto_process_interval") or cfg.get("auto_process_interval", 30)
)
except Exception:
cfg["auto_process_interval"] = 30
# Zoho (optional - note: current client reads env vars)
cfg["zoho_email"] = (form.get("zoho_email") or cfg.get("zoho_email", "")).strip()
cfg["zoho_app_password"] = (
form.get("zoho_app_password") or cfg.get("zoho_app_password", "")
).strip()
# WhatsApp destination
cfg["whatsapp_to"] = (form.get("whatsapp_to") or cfg.get("whatsapp_to", "")).strip()
# Time frames: collect indexed rows
frames: list[dict] = []
# find indices present
indices = set()
for k in form.keys():
if k.startswith("time_name_"):
try:
indices.add(int(k.split("_")[-1]))
except Exception:
pass
for i in sorted(indices):
name = (form.get(f"time_name_{i}") or "").strip()
hrs_raw = form.get(f"time_hours_{i}") or ""
lvl_raw = form.get(f"time_alert_{i}") or ""
if not name and not hrs_raw and not lvl_raw:
continue
try:
hours = int(hrs_raw)
except Exception:
hours = 0
try:
level = int(lvl_raw)
except Exception:
level = 0
if name:
frames.append({"name": name, "hours": hours, "alert_level": level})
if frames:
cfg["time_frames"] = frames
save_config(cfg)
return RedirectResponse(url="/config?saved=1", status_code=303)
@app.post("/process")
def process(db: Session = Depends(get_db)):
cfg = load_config()
alerted = process_alerts(db, cfg)
return {"alerted_threads": alerted}
def _sync_emails_once(cfg: dict) -> int:
"""Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count."""
from datetime import datetime, timezone
# Mark status
cfg["sync_in_progress"] = True
cfg["last_sync_status"] = "running"
cfg["last_sync_error"] = None
save_config(cfg)
account_email = cfg.get("email_address") or cfg.get("zoho_email") or ""
if not account_email:
cfg.update(
{
"last_sync_status": "error",
"last_sync_error": "Configure email_address or zoho_email in /config",
"sync_in_progress": False,
}
)
save_config(cfg)
raise HTTPException(
status_code=400, detail="Configure email_address or zoho_email in /config"
)
# Incremental lookback by last_sync_at
days_back = int(cfg.get("email_days_back", 7) or 7)
last_sync_at = cfg.get("last_sync_at")
if last_sync_at:
try:
last_dt = datetime.fromisoformat(last_sync_at)
now = datetime.now(timezone.utc)
delta_days = int(((now - last_dt).total_seconds() + 86399) // 86400)
days_back = max(1, delta_days)
except Exception:
pass
max_results = 100
client = ZohoClient(
email=cfg.get("zoho_email") or account_email,
app_password=cfg.get("zoho_app_password"),
)
try:
inbox = client.fetch_folder_emails(
folder="INBOX", max_results=max_results, days_back=days_back
)
sent = client.fetch_folder_emails(
folder="Sent", max_results=max_results, days_back=days_back
)
finally:
client.close()
db = SessionLocal()
try:
ingest_emails(
db, account_email=account_email, emails=inbox, default_folder="INBOX"
)
ingest_emails(
db, account_email=account_email, emails=sent, default_folder="Sent"
)
# Return count for UX/debug
count = (
db.query(Thread)
.filter(Thread.account_email == account_email.lower())
.count()
)
return count
finally:
db.close()
@app.post("/sync_emails")
async def sync_emails():
cfg = load_config()
if cfg.get("sync_in_progress"):
return RedirectResponse(url="/?sync=busy", status_code=303)
async def _task():
try:
_sync_emails_once(load_config())
except Exception:
pass
cfg["sync_in_progress"] = True
cfg["last_sync_status"] = "running"
cfg["last_sync_error"] = None
save_config(cfg)
asyncio.create_task(_task())
return RedirectResponse(url="/?sync=started", status_code=303)
# ---------------------
# Auto-processing loop
# ---------------------
_auto_task = None
_stop_event = asyncio.Event()
async def _auto_runner():
# Delay a bit on startup
await asyncio.sleep(2)
while not _stop_event.is_set():
cfg = load_config()
interval_min = int(cfg.get("auto_process_interval", 30) or 30)
if cfg.get("auto_process"):
# Sync emails then run alerts
try:
if not cfg.get("sync_in_progress"):
_sync_emails_once(cfg)
except Exception:
# keep loop alive
pass
db = SessionLocal()
try:
process_alerts(db, cfg)
finally:
db.close()
try:
await asyncio.wait_for(
_stop_event.wait(), timeout=max(5, interval_min * 60)
)
except asyncio.TimeoutError:
continue
@app.on_event("startup")
async def on_startup():
global _auto_task
_stop_event.clear()
_auto_task = asyncio.create_task(_auto_runner())
@app.on_event("shutdown")
async def on_shutdown():
if _auto_task:
_stop_event.set()
with suppress(Exception):
await _auto_task
+392
View File
@@ -0,0 +1,392 @@
from email.utils import parseaddr
from typing import Annotated, Iterable, List, Optional, Tuple
from fastapi import Depends
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Index,
Integer,
String,
Text,
UniqueConstraint,
create_engine,
func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship, sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
Base = declarative_base()
def create_db_tables():
Base.metadata.create_all(bind=engine)
_migrate_add_missing_columns()
def _column_exists(table: str, column: str) -> bool:
try:
with engine.connect() as conn:
rows = conn.exec_driver_sql(f"PRAGMA table_info('{table}')").fetchall()
cols = {row[1] for row in rows}
return column in cols
except Exception:
return False
def _migrate_add_missing_columns() -> None:
"""Best-effort SQLite migrations for newly added columns."""
try:
with engine.begin() as conn:
if not _column_exists("threads", "actionable"):
conn.exec_driver_sql(
"ALTER TABLE threads ADD COLUMN actionable BOOLEAN NOT NULL DEFAULT 0"
)
if not _column_exists("threads", "ai_summary"):
conn.exec_driver_sql("ALTER TABLE threads ADD COLUMN ai_summary TEXT")
if not _column_exists("threads", "ai_confidence"):
conn.exec_driver_sql(
"ALTER TABLE threads ADD COLUMN ai_confidence FLOAT"
)
if not _column_exists("threads", "last_analyzed_at"):
conn.exec_driver_sql(
"ALTER TABLE threads ADD COLUMN last_analyzed_at DATETIME"
)
if not _column_exists("threads", "last_alert_level_sent"):
conn.exec_driver_sql(
"ALTER TABLE threads ADD COLUMN last_alert_level_sent INTEGER NOT NULL DEFAULT 0"
)
if not _column_exists("threads", "last_alert_sent_at"):
conn.exec_driver_sql(
"ALTER TABLE threads ADD COLUMN last_alert_sent_at DATETIME"
)
except Exception as e:
print(f"DB migration warning: {e}")
class Thread(Base):
__tablename__ = "threads"
id = Column(Integer, primary_key=True, index=True)
# The mailbox this thread belongs to (scopes data when analyzing multiple inboxes)
account_email = Column(String, nullable=False, index=True)
# A stable key for the thread, typically the root message-id (or a synthetic key)
thread_key = Column(String, nullable=False)
subject = Column(String, index=True)
requires_reply = Column(Boolean, nullable=False, default=False)
# AI/alert fields
actionable = Column(Boolean, nullable=False, default=False)
ai_summary = Column(Text)
ai_confidence = Column(Float)
last_analyzed_at = Column(DateTime)
last_alert_level_sent = Column(Integer, nullable=False, default=0)
last_alert_sent_at = Column(DateTime)
created_at = Column(DateTime, nullable=False, server_default=func.now())
updated_at = Column(
DateTime, nullable=False, server_default=func.now(), onupdate=func.now()
)
# Ensure uniqueness per account
__table_args__ = (
UniqueConstraint("account_email", "thread_key", name="uq_thread_account_key"),
Index("ix_threads_account_updated", "account_email", "updated_at"),
)
# ORM relationship
messages = relationship(
"Message",
back_populates="thread",
cascade="all, delete-orphan",
order_by="Message.date_sent",
)
class Message(Base):
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
# Links to Thread
thread_id = Column(Integer, ForeignKey("threads.id"), nullable=False, index=True)
# RFC 5322 identifiers for threading
message_id = Column(String, nullable=False, unique=True, index=True)
in_reply_to = Column(String, index=True) # parent message-id if any
# Headers / metadata
subject = Column(String, index=True)
from_email = Column(String, index=True)
to_email = Column(String, index=True)
folder = Column(String, index=True) # e.g. INBOX, Sent
is_incoming = Column(Boolean, nullable=False, default=True)
date_sent = Column(DateTime, index=True)
body = Column(Text)
created_at = Column(DateTime, nullable=False, server_default=func.now())
updated_at = Column(
DateTime, nullable=False, server_default=func.now(), onupdate=func.now()
)
thread = relationship("Thread", back_populates="messages")
__table_args__ = (
Index("ix_messages_thread_date", "thread_id", "date_sent"),
Index("ix_messages_inreplyto", "in_reply_to"),
)
# ----------------------
# Utility / DAO functions
# ----------------------
def _normalize_email(addr: Optional[str]) -> str:
if not addr:
return ""
name, email_addr = parseaddr(addr)
return email_addr.lower()
def _is_incoming_message(account_email: str, from_email: str) -> bool:
account = (account_email or "").lower()
sender = _normalize_email(from_email)
# If sender is the account itself, it's outgoing; otherwise incoming
return sender != account and sender != ""
def find_or_create_thread(
db: Session,
*,
account_email: str,
subject: Optional[str],
message_id: str,
in_reply_to: Optional[str] = None,
) -> Thread:
"""
Resolves the thread for a message.
Strategy:
- If in_reply_to matches an existing message, reuse its thread and its thread_key.
- Else if a message with message_id already exists, reuse its thread.
- Else create a new thread using message_id as thread_key.
"""
account_email = (account_email or "").lower()
# 1) Try to find parent by in_reply_to
parent_msg: Optional[Message] = None
if in_reply_to:
parent_msg = (
db.query(Message)
.join(Thread, Message.thread_id == Thread.id)
.filter(
Message.message_id == in_reply_to,
Thread.account_email == account_email,
)
.one_or_none()
)
if parent_msg:
# Parent's thread
parent_thread = parent_msg.thread
return parent_thread
# 2) If message exists already, reuse its thread (idempotent ingest)
existing_msg = (
db.query(Message)
.join(Thread, Message.thread_id == Thread.id)
.filter(Message.message_id == message_id, Thread.account_email == account_email)
.one_or_none()
)
if existing_msg:
return existing_msg.thread
# 3) Create a new thread using message_id as the thread_key
thread = Thread(account_email=account_email, thread_key=message_id, subject=subject)
db.add(thread)
db.flush() # assign id
return thread
def upsert_message(
db: Session,
*,
account_email: str,
message_id: str,
subject: Optional[str],
from_email: Optional[str],
to_email: Optional[str],
date_sent,
body: Optional[str],
in_reply_to: Optional[str] = None,
folder: Optional[str] = None,
) -> Tuple[Message, Thread]:
"""Insert or update a message, linking it to the proper thread."""
thread = find_or_create_thread(
db,
account_email=account_email,
subject=subject,
message_id=message_id,
in_reply_to=in_reply_to,
)
msg = db.query(Message).filter_by(message_id=message_id).one_or_none()
if msg is None:
msg = Message(message_id=message_id, thread_id=thread.id)
db.add(msg)
msg.thread_id = thread.id
msg.in_reply_to = in_reply_to
msg.subject = subject
msg.from_email = _normalize_email(from_email)
msg.to_email = _normalize_email(to_email)
msg.date_sent = date_sent
msg.body = body
msg.folder = folder or "INBOX"
msg.is_incoming = _is_incoming_message(account_email, msg.from_email)
# Keep thread subject if missing; otherwise prefer the earliest subject
if not thread.subject and subject:
thread.subject = subject
# Update requires_reply flag after inserting/updating the message
update_thread_requires_reply(db, thread, account_email)
return msg, thread
def update_thread_requires_reply(
db: Session, thread: Thread, account_email: str
) -> None:
"""Set thread.requires_reply based on the latest message direction.
Rule: If the most recent message in the thread is incoming (from someone else),
then the thread requires a reply. Otherwise, it doesn't.
"""
latest: Optional[Message] = (
db.query(Message)
.filter(Message.thread_id == thread.id)
.order_by(Message.date_sent.desc(), Message.id.desc())
.first()
)
if latest is None:
thread.requires_reply = False
else:
thread.requires_reply = latest.is_incoming
# Touch updated_at
thread.updated_at = func.now()
db.flush()
def get_thread_messages(db: Session, thread_id: int) -> List[Message]:
return (
db.query(Message)
.filter(Message.thread_id == thread_id)
.order_by(Message.date_sent.asc(), Message.id.asc())
.all()
)
def get_threads_requiring_reply(db: Session, account_email: str) -> List[Thread]:
return (
db.query(Thread)
.filter(
Thread.account_email == account_email.lower(),
Thread.requires_reply.is_(True),
)
.order_by(Thread.updated_at.desc())
.all()
)
def get_last_incoming_outgoing(
db: Session, thread_id: int
) -> Tuple[Optional["Message"], Optional["Message"]]:
"""Return the last incoming and outgoing messages for a thread."""
last_incoming = (
db.query(Message)
.filter(Message.thread_id == thread_id, Message.is_incoming.is_(True))
.order_by(Message.date_sent.desc(), Message.id.desc())
.first()
)
last_outgoing = (
db.query(Message)
.filter(Message.thread_id == thread_id, Message.is_incoming.is_(False))
.order_by(Message.date_sent.desc(), Message.id.desc())
.first()
)
return last_incoming, last_outgoing
def ingest_emails(
db: Session,
account_email: str,
emails: Iterable[dict],
default_folder: str = "INBOX",
) -> None:
"""
Bulk-ingest emails fetched via ZohoClient.fetch_emails.
Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to.
"""
from datetime import datetime
for e in emails:
# Map common keys from ZohoClient output
message_id = e.get("messageId") or e.get("id")
if not message_id:
# Skip messages without identifiers
continue
subject = e.get("subject")
from_header = e.get("from") or e.get("from_email")
to_header = e.get("to") or e.get("to_email")
in_reply_to = e.get("inReplyTo") or e.get("in_reply_to")
folder = e.get("folder") or default_folder
body = e.get("body") or e.get("snippet")
# Parse date if it's a string
date_val = e.get("date") or e.get("date_sent")
if isinstance(date_val, str):
try:
# Try multiple formats; fall back to now on failure
from email.utils import parsedate_to_datetime
date_sent = parsedate_to_datetime(date_val)
except Exception:
date_sent = datetime.utcnow()
else:
date_sent = date_val
upsert_message(
db,
account_email=account_email,
message_id=message_id,
subject=subject,
from_email=from_header,
to_email=to_header,
date_sent=date_sent,
body=body,
in_reply_to=in_reply_to,
folder=folder,
)
db.commit()
+120
View File
@@ -0,0 +1,120 @@
import os
from typing import Any, Dict, List
from dotenv import load_dotenv
from twilio.base.exceptions import TwilioException
from twilio.rest import Client
load_dotenv()
class WhatsAppSender:
def __init__(self, to_number: str | None = None):
self.account_sid = os.getenv("TWILIO_ACCOUNT_SID")
self.auth_token = os.getenv("TWILIO_AUTH_TOKEN")
self.from_number = os.getenv("TWILIO_WHATSAPP_NUMBER")
env_to = os.getenv("WHATSAPP_TO_NUMBER")
self.to_number = to_number or env_to # Individual phone number
if self.account_sid and self.auth_token and self.from_number and self.to_number:
try:
self.client = Client(self.account_sid, self.auth_token)
self.use_mock = False
except Exception as e:
print(f"Warning: Twilio client failed to initialize: {e}")
self.use_mock = True
else:
self.use_mock = True
print(
"Note: Using mock WhatsApp sender (set TWILIO_* and WHATSAPP_TO_NUMBER or pass to_number)"
)
def send_alert(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]:
"""Send alert message to WhatsApp"""
if self.use_mock:
return self._mock_send(alert_message, thread_id)
try:
# Format message for WhatsApp
formatted_message = self._format_message(alert_message)
# Send to WhatsApp
message = self.client.messages.create(
from_=f"whatsapp:{self.from_number}",
body=formatted_message,
to=f"whatsapp:{self.to_number}",
)
return {
"status": "success",
"message_sid": message.sid,
"thread_id": thread_id,
"sent_at": message.date_created,
}
except TwilioException as e:
print(f"WhatsApp send error: {e}")
return {"status": "error", "error": str(e), "thread_id": thread_id}
def _mock_send(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]:
"""Mock WhatsApp sending for testing"""
print("📱 [MOCK] WhatsApp Alert Sent:")
print(f" To: {self.to_number or 'your_number'}")
print(f" Thread ID: {thread_id}")
print(f" Message: {alert_message[:100]}...")
return {
"status": "success",
"message_sid": "mock_sid_123",
"thread_id": thread_id,
"sent_at": "2024-01-15T10:00:00Z",
}
def _format_message(self, alert_message: str) -> str:
"""Format alert message for WhatsApp"""
# WhatsApp has character limits, so we might need to truncate
max_length = 1000
if len(alert_message) > max_length:
alert_message = alert_message[: max_length - 3] + "..."
return alert_message
def send_bulk_alerts(self, alerts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Send multiple alerts to WhatsApp"""
results = []
for alert in alerts:
message = alert.get("message", "")
thread_id = alert.get("thread_id", "unknown")
result = self.send_alert(message, thread_id)
results.append(result)
# Add small delay between messages to avoid rate limits
import time
time.sleep(1)
return results
if __name__ == "__main__":
# Test WhatsApp sender
sender = WhatsAppSender()
test_message = """
🚨 LEVEL 1 ALERT (24 Hours)
🟢 Urgency: LOW
📧 Thread ID: test_thread_123
📝 Summary:
Client inquiry about project status. Requires follow-up.
🎯 Action Required:
Respond to client question
⏰ Confidence: 70.0%
""".strip()
result = sender.send_alert(test_message, "test_thread_123")
print(f"Send result: {result}")
+280
View File
@@ -0,0 +1,280 @@
import email
import imaplib
import os
from datetime import datetime, timedelta
from email.header import decode_header
from typing import Any, Dict, List
from dotenv import load_dotenv
load_dotenv()
class ZohoClient:
def __init__(self, email=None, app_password=None):
self.imap_server = "imap.zoho.com"
self.imap_port = 993
# Use provided credentials or fall back to environment variables
self.email = email or os.getenv("ZOHO_EMAIL", "")
self.app_password = app_password or os.getenv("ZOHO_PASSWORD", "")
if not self.email or not self.app_password:
raise ValueError("Zoho email and app password must be provided")
self.connection = None
self._connect()
def _connect(self):
"""Connect to Zoho IMAP server using app password"""
try:
self.connection = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
self.connection.login(self.email, self.app_password)
print(f"✅ Connected to Zoho IMAP server as {self.email}")
except Exception as e:
print(f"❌ Failed to connect to Zoho IMAP: {e}")
print("💡 Make sure IMAP is enabled in your Zoho Mail settings")
raise
def fetch_folder_emails(
self,
folder: str = "INBOX",
query: str = None,
max_results: int = None,
days_back: int = 7,
) -> List[Dict[str, Any]]:
"""Fetch emails from a given folder with date filtering"""
try:
# Select folder
mailbox = '"Sent"' if folder.lower() == "sent" else folder
print(f"📥 Selecting {folder}...[STEP 2]")
self.connection.select(mailbox)
# Build search criteria - only emails from specified days back
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
search_criteria = f"SINCE {days_ago}"
if query:
search_criteria += f" {query}"
# Search for emails
status, message_numbers = self.connection.search(None, search_criteria)
if status != "OK":
print(f"❌ Search failed: {status}")
return []
email_list = message_numbers[0].split()
# Limit results if specified
if max_results is not None:
email_list = email_list[-max_results:] # Get the most recent emails
emails = []
for i, num in enumerate(email_list):
try:
print(f"📧 Fetching email {num.decode()}... [STEP 3] {i}")
# Fetch email data
status, data = self.connection.fetch(num, "(RFC822)")
if status == "OK":
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)
# Extract headers
subject = self._decode_header(email_message.get("Subject", ""))
from_header = self._decode_header(email_message.get("From", ""))
to_header = self._decode_header(email_message.get("To", ""))
date_header = email_message.get("Date", "")
message_id = email_message.get("Message-ID", "")
in_reply_to = email_message.get("In-Reply-To", "")
# Generate thread ID (using Message-ID as fallback)
thread_id = message_id or f"thread_{num.decode()}"
# Get email body snippet
body = self._get_email_body(email_message)
snippet = body[:200] + "..." if len(body) > 200 else body
email_data = {
"id": num.decode(),
"threadId": thread_id,
"from": from_header,
"to": to_header,
"subject": subject,
"date": date_header,
"messageId": message_id,
"inReplyTo": in_reply_to,
"folder": folder,
"snippet": snippet,
}
emails.append(email_data)
except Exception as e:
print(f"❌ Error processing email {num}: {e}")
continue
print(
f"📧 Fetched {len(emails)} real emails from {folder} for last {days_back} days"
)
return emails
except Exception as e:
print(f"❌ Error fetching emails from {folder}: {e}")
return []
def fetch_emails(
self, query: str = None, max_results: int = None, days_back: int = 7
) -> List[Dict[str, Any]]:
"""Fetch emails from INBOX (backwards-compatible wrapper)."""
return self.fetch_folder_emails(
folder="INBOX", query=query, max_results=max_results, days_back=days_back
)
def _decode_header(self, header_value: str) -> str:
"""Decode email header values"""
if not header_value:
return ""
try:
decoded_parts = decode_header(header_value)
decoded_string = ""
for part, encoding in decoded_parts:
if isinstance(part, bytes):
if encoding:
decoded_string += part.decode(encoding)
else:
decoded_string += part.decode("utf-8", errors="ignore")
else:
decoded_string += str(part)
return decoded_string
except Exception:
return str(header_value)
def _get_email_body(self, email_message) -> str:
"""Extract email body text"""
body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
try:
body += part.get_payload(decode=True).decode(
"utf-8", errors="ignore"
)
except Exception:
pass
else:
try:
body = email_message.get_payload(decode=True).decode(
"utf-8", errors="ignore"
)
except Exception:
pass
return body
def get_thread_messages(self, thread_id: str) -> List[Dict[str, Any]]:
"""Get all messages in a thread (simplified for IMAP)"""
# For IMAP, we'll return a single message since thread grouping is more complex
# This is a simplified implementation
return []
def check_sent_folder_for_replies(self, subject: str, days_back: int = 7) -> bool:
"""Check SENT folder to see if we've replied to emails with this subject"""
try:
# Select SENT folder
self.connection.select('"Sent"') # Zoho uses "Sent" folder
# Build search criteria for sent emails within the date range
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
# Search for emails with similar subject (remove "Re:" prefixes for matching)
clean_subject = (
subject.replace("Re: ", "").replace("RE: ", "").replace("re: ", "")
)
search_criteria = f'SINCE {days_ago} SUBJECT "{clean_subject}"'
status, message_numbers = self.connection.search(None, search_criteria)
if status == "OK" and message_numbers[0]:
# Found sent emails with matching subject
return True
return False
except Exception as e:
print(f"❌ Error checking sent folder: {e}")
return False
finally:
# Always return to INBOX
try:
self.connection.select("INBOX")
except Exception:
pass
def check_message_reply_status(
self, message_id: str, subject: str, days_back: int = 7
) -> bool:
"""Check if a specific message has been replied to by checking SENT folder"""
try:
# First check by subject in SENT folder
if self.check_sent_folder_for_replies(subject, days_back):
return True
# Additional check: look for In-Reply-To headers matching our message ID
self.connection.select('"Sent"')
days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
# Search all sent emails in date range
status, message_numbers = self.connection.search(None, f"SINCE {days_ago}")
if status == "OK" and message_numbers[0]:
email_list = message_numbers[0].split()
for num in email_list[
-20:
]: # Check last 20 sent emails for performance
try:
status, data = self.connection.fetch(num, "(RFC822)")
if status == "OK":
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)
# Check In-Reply-To header
in_reply_to = email_message.get("In-Reply-To", "")
if message_id and message_id in in_reply_to:
return True
except Exception:
continue
return False
except Exception as e:
print(f"❌ Error checking message reply status: {e}")
return False
finally:
# Always return to INBOX
try:
self.connection.select("INBOX")
except Exception:
pass
def close(self):
"""Close the IMAP connection"""
if self.connection:
try:
self.connection.close()
self.connection.logout()
except Exception as e:
print(f"Error closing connection: {e}")
if __name__ == "__main__":
client = ZohoClient()
emails = client.fetch_emails(max_results=10)
print(f"Fetched {len(emails)} emails")
client.close()