diff --git a/app.py b/app.py index cfc5dba..4ca02b1 100644 --- a/app.py +++ b/app.py @@ -1,21 +1,26 @@ +import asyncio import json import os +from contextlib import suppress from typing import List -from fastapi import Depends, FastAPI, HTTPException, Request +from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from ai import analyze_thread +from alerts import process_alerts from database import ( Message, SessionLocal, Thread, create_db_tables, get_thread_messages, + ingest_emails, ) +from zoho_client import ZohoClient def get_db(): @@ -48,6 +53,7 @@ def home(request: Request, db: Session = Depends(get_db), account: str | None = "request": request, "threads": threads, "account": account or "", + "status": _status_for_templates(), }, ) @@ -72,6 +78,17 @@ def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)) for m in messages ] ai = analyze_thread(thread.subject or "", msg_dicts) + # Save AI info on the thread for listing and downstream alerts + try: + from datetime import datetime, timezone + + thread.actionable = bool(ai.get("actionable", False)) + thread.ai_summary = ai.get("summary") + thread.ai_confidence = ai.get("confidence") + thread.last_analyzed_at = datetime.now(timezone.utc) + db.commit() + except Exception: + pass return templates.TemplateResponse( "thread_detail.html", { @@ -79,6 +96,7 @@ def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)) "thread": thread, "messages": messages, "ai": ai, + "status": _status_for_templates(), }, ) @@ -108,8 +126,15 @@ def load_config() -> dict: "agency_domains": [], "zoho_email": "", "zoho_app_password": "", + "whatsapp_to": "", "auto_process": False, "auto_process_interval": 30, + # Sync status + "sync_in_progress": False, + "last_sync_at": None, + "last_sync_count": 0, + "last_sync_status": "idle", + "last_sync_error": None, } with open(CONFIG_PATH, "r", encoding="utf-8") as f: return json.load(f) @@ -120,6 +145,19 @@ def save_config(cfg: dict) -> None: json.dump(cfg, f, indent=2, ensure_ascii=False) +def _status_for_templates() -> dict: + cfg = load_config() + return { + "auto_process": bool(cfg.get("auto_process")), + "interval": int(cfg.get("auto_process_interval", 30) or 30), + "sync_in_progress": bool(cfg.get("sync_in_progress")), + "last_sync_at": cfg.get("last_sync_at"), + "last_sync_status": cfg.get("last_sync_status", "idle"), + "last_sync_count": int(cfg.get("last_sync_count") or 0), + "last_sync_error": cfg.get("last_sync_error"), + } + + @app.get("/config", response_class=HTMLResponse) def config_form(request: Request, saved: int | None = None): cfg = load_config() @@ -133,6 +171,7 @@ def config_form(request: Request, saved: int | None = None): "cfg": cfg, "rows": list(range(min_rows)), "saved": bool(saved), + "status": _status_for_templates(), }, ) @@ -170,6 +209,8 @@ async def config_save(request: Request): cfg["zoho_app_password"] = ( form.get("zoho_app_password") or cfg.get("zoho_app_password", "") ).strip() + # WhatsApp destination + cfg["whatsapp_to"] = (form.get("whatsapp_to") or cfg.get("whatsapp_to", "")).strip() # Time frames: collect indexed rows frames: list[dict] = [] @@ -202,3 +243,169 @@ async def config_save(request: Request): save_config(cfg) return RedirectResponse(url="/config?saved=1", status_code=303) + + +@app.post("/process") +def process(db: Session = Depends(get_db)): + cfg = load_config() + alerted = process_alerts(db, cfg) + return {"alerted_threads": alerted} + + +def _sync_emails_once(cfg: dict) -> int: + """Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count.""" + from datetime import datetime, timezone + + # Mark status + cfg["sync_in_progress"] = True + cfg["last_sync_status"] = "running" + cfg["last_sync_error"] = None + save_config(cfg) + + account_email = cfg.get("email_address") or cfg.get("zoho_email") or "" + if not account_email: + cfg.update( + { + "last_sync_status": "error", + "last_sync_error": "Configure email_address or zoho_email in /config", + "sync_in_progress": False, + } + ) + save_config(cfg) + raise HTTPException( + status_code=400, detail="Configure email_address or zoho_email in /config" + ) + + # Incremental lookback by last_sync_at + days_back = int(cfg.get("email_days_back", 7) or 7) + last_sync_at = cfg.get("last_sync_at") + if last_sync_at: + try: + last_dt = datetime.fromisoformat(last_sync_at) + now = datetime.now(timezone.utc) + delta_days = int(((now - last_dt).total_seconds() + 86399) // 86400) + days_back = max(1, delta_days) + except Exception: + pass + max_results = 100 + client = ZohoClient( + email=cfg.get("zoho_email") or account_email, + app_password=cfg.get("zoho_app_password"), + ) + try: + inbox = client.fetch_folder_emails( + folder="INBOX", max_results=max_results, days_back=days_back + ) + sent = client.fetch_folder_emails( + folder="Sent", max_results=max_results, days_back=days_back + ) + finally: + client.close() + + db = SessionLocal() + try: + ingest_emails( + db, account_email=account_email, emails=inbox, default_folder="INBOX" + ) + ingest_emails( + db, account_email=account_email, emails=sent, default_folder="Sent" + ) + # Update sync completion status and metrics + cfg.update( + { + "last_sync_at": datetime.now(timezone.utc).isoformat(), + "last_sync_status": "ok", + "last_sync_error": None, + "last_sync_count": int( + (len(inbox) if inbox else 0) + (len(sent) if sent else 0) + ), + "sync_in_progress": False, + } + ) + save_config(cfg) + return count + except Exception as e: + # Update error status + cfg.update( + { + "last_sync_status": "error", + "last_sync_error": str(e), + "sync_in_progress": False, + } + ) + save_config(cfg) + raise + finally: + db.close() + + +@app.post("/sync_emails") +def sync_emails(background_tasks: BackgroundTasks): + cfg = load_config() + if cfg.get("sync_in_progress"): + return RedirectResponse(url="/?sync=busy", status_code=303) + + def _background_sync(): + try: + _sync_emails_once(load_config()) + except Exception: + # Error handling is done inside _sync_emails_once + pass + + # Mark as starting and add background task + cfg["sync_in_progress"] = True + cfg["last_sync_status"] = "running" + cfg["last_sync_error"] = None + save_config(cfg) + + background_tasks.add_task(_background_sync) + return RedirectResponse(url="/?sync=started", status_code=303) + + +# --------------------- +# Auto-processing loop +# --------------------- +_auto_task = None +_stop_event = asyncio.Event() + + +async def _auto_runner(): + # Delay a bit on startup + await asyncio.sleep(2) + while not _stop_event.is_set(): + cfg = load_config() + interval_min = int(cfg.get("auto_process_interval", 30) or 30) + if cfg.get("auto_process"): + # Sync emails then run alerts + try: + if not cfg.get("sync_in_progress"): + _sync_emails_once(cfg) + except Exception: + # keep loop alive + pass + db = SessionLocal() + try: + process_alerts(db, cfg) + finally: + db.close() + try: + await asyncio.wait_for( + _stop_event.wait(), timeout=max(5, interval_min * 60) + ) + except asyncio.TimeoutError: + continue + + +@app.on_event("startup") +async def on_startup(): + global _auto_task + _stop_event.clear() + _auto_task = asyncio.create_task(_auto_runner()) + + +@app.on_event("shutdown") +async def on_shutdown(): + if _auto_task: + _stop_event.set() + with suppress(Exception): + await _auto_task diff --git a/main.py b/main.py deleted file mode 100644 index e08db99..0000000 --- a/main.py +++ /dev/null @@ -1,140 +0,0 @@ -from typing import List, Optional - -from database import ( - Message, - SessionLocal, - Thread, - create_db_tables, - get_thread_messages, - get_threads_requiring_reply, - ingest_emails, -) -from zoho_client import ZohoClient - - -def ingest_action( - account_email: str, days_back: int = 7, max_results: int = 50 -) -> None: - create_db_tables() - client = ZohoClient(email=account_email) - inbox = client.fetch_folder_emails( - folder="INBOX", max_results=max_results, days_back=days_back - ) - sent = client.fetch_folder_emails( - folder="Sent", max_results=max_results, days_back=days_back - ) - client.close() - - db = SessionLocal() - try: - ingest_emails( - db, account_email=account_email, emails=inbox, default_folder="INBOX" - ) - ingest_emails( - db, account_email=account_email, emails=sent, default_folder="Sent" - ) - threads: List[Thread] = get_threads_requiring_reply(db, account_email) - print(f"Threads requiring reply for {account_email}: {len(threads)}") - for t in threads: - print( - f"- Thread #{t.id} | Subject: {t.subject!r} | requires_reply={t.requires_reply}" - ) - finally: - db.close() - - -def list_threads_action( - account_email: Optional[str] = None, limit: int = 20, only_requiring: bool = False -) -> None: - create_db_tables() - db = SessionLocal() - try: - q = db.query(Thread).order_by(Thread.updated_at.desc()) - if account_email: - q = q.filter(Thread.account_email == account_email.lower()) - if only_requiring: - q = q.filter(Thread.requires_reply.is_(True)) - threads = q.limit(limit).all() - print( - f"Showing {len(threads)} threads" - + (f" for {account_email}" if account_email else "") - ) - for t in threads: - count = db.query(Message).filter(Message.thread_id == t.id).count() - print( - f"- id={t.id} msgs={count} requires_reply={t.requires_reply} subject={t.subject!r}" - ) - finally: - db.close() - - -def show_thread_action(thread_id: int) -> None: - create_db_tables() - db = SessionLocal() - try: - thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none() - if not thread: - print(f"Thread {thread_id} not found") - return - - print( - f"Thread #{thread.id} subject={thread.subject!r} account={thread.account_email} requires_reply={thread.requires_reply}" - ) - messages: List[Message] = get_thread_messages(db, thread.id) - for i, m in enumerate(messages, 1): - direction = "IN" if m.is_incoming else "OUT" - snippet = (m.body or "").strip().replace("\n", " ") - if len(snippet) > 140: - snippet = snippet[:140] + "..." - print( - f"[{i}] {m.date_sent} [{direction}] {m.folder} | from={m.from_email} -> to={m.to_email}\n" - f" subject={m.subject!r}\n" - f" message_id={m.message_id} in_reply_to={m.in_reply_to}\n" - f" body={snippet}" - ) - finally: - db.close() - - -if __name__ == "__main__": - import argparse - import os - - parser = argparse.ArgumentParser(description="Email alerts utility") - sub = parser.add_subparsers(dest="cmd", required=False) - - p_ingest = sub.add_parser("ingest", help="Fetch INBOX and Sent and ingest into DB") - p_ingest.add_argument( - "--account", dest="account", default=os.getenv("ZOHO_EMAIL", "") - ) - p_ingest.add_argument("--days-back", dest="days_back", type=int, default=7) - p_ingest.add_argument("--max-results", dest="max_results", type=int, default=50) - - p_list = sub.add_parser("list-threads", help="List threads") - p_list.add_argument("--account", dest="account", default=None) - p_list.add_argument("--limit", dest="limit", type=int, default=20) - p_list.add_argument("--only-requiring", dest="only_req", action="store_true") - - p_show = sub.add_parser("show-thread", help="Print all messages in a thread") - p_show.add_argument("thread_id", type=int) - - args = parser.parse_args() - - if args.cmd == "ingest": - acct = args.account or os.getenv("ZOHO_EMAIL", "") - if not acct: - raise SystemExit("Provide --account or set ZOHO_EMAIL") - ingest_action(acct, days_back=args.days_back, max_results=args.max_results) - elif args.cmd == "list-threads": - list_threads_action( - account_email=args.account, limit=args.limit, only_requiring=args.only_req - ) - elif args.cmd == "show-thread": - show_thread_action(args.thread_id) - else: - # Default behavior: run ingest using env and then list requiring-reply threads - acct = os.getenv("ZOHO_EMAIL", "") - if not acct: - parser.print_help() - raise SystemExit(0) - ingest_action(acct) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ai.py b/src/ai.py similarity index 100% rename from ai.py rename to src/ai.py diff --git a/src/alerts.py b/src/alerts.py new file mode 100644 index 0000000..c0bcbd9 --- /dev/null +++ b/src/alerts.py @@ -0,0 +1,166 @@ +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import List, Optional + +from sqlalchemy.orm import Session + +from src.ai import analyze_thread +from src.database import ( + Message, + Thread, + get_last_incoming_outgoing, + get_thread_messages, +) +from src.whatsapp_sender import WhatsAppSender + + +@dataclass +class TimeFrame: + name: str + hours: int + alert_level: int + + +def _utc(dt: datetime) -> datetime: + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _load_frames_from_config(cfg: dict) -> List[TimeFrame]: + frames = [] + for f in cfg.get("time_frames", []): + try: + frames.append( + TimeFrame( + name=f.get("name", ""), + hours=int(f.get("hours", 0)), + alert_level=int(f.get("alert_level", 0)), + ) + ) + except Exception: + continue + # sort ascending by hours + frames.sort(key=lambda x: x.hours) + return frames + + +def _format_alert( + level: int, hours: int, thread: Thread, ai: dict, last_in: Optional[Message] +) -> str: + title = { + 1: "LEVEL 1 ALERT (24 Hours)", + 2: "LEVEL 2 ALERT (48 Hours - URGENT)", + 3: "LEVEL 3 ALERT (72 Hours - CRITICAL)", + }.get(level, f"LEVEL {level} ALERT ({hours} Hours)") + sender = last_in.from_email if last_in else "unknown" + subject = thread.subject or "(no subject)" + summary = ai.get("summary") or thread.ai_summary or "" + conf_pct = int(round((ai.get("confidence") or thread.ai_confidence or 0) * 100)) + return ( + f"🚨 {title} 🚨\n\n" + f"Conversation with: {sender}\n" + f"Subject: {subject}\n" + f"Contextual Summary: {summary}\n\n" + f"Thread ID: {thread.id}\n" + f"Confidence: {conf_pct}%" + ) + + +def process_alerts(db: Session, cfg: dict) -> List[int]: + """Analyze threads and send WhatsApp alerts when thresholds are met. + + Returns list of thread IDs that had alerts sent this run. + """ + account_email = (cfg.get("email_address") or cfg.get("zoho_email") or "").lower() + if not account_email: + return [] + + frames = _load_frames_from_config(cfg) + if not frames: + return [] + + # Ensure thresholds unique by level + level_to_hours = {f.alert_level: f.hours for f in frames} + + # Find candidate threads: requires_reply True and account matches + threads: List[Thread] = ( + db.query(Thread) + .filter(Thread.account_email == account_email, Thread.requires_reply.is_(True)) + .order_by(Thread.updated_at.desc()) + .limit(200) + .all() + ) + + to_number = cfg.get("whatsapp_to") or None + sender = WhatsAppSender(to_number=to_number) + alerted = [] + now = datetime.now(timezone.utc) + + for t in threads: + last_in, last_out = get_last_incoming_outgoing(db, t.id) + if not last_in: + continue + last_in_dt = _utc(last_in.date_sent) if last_in.date_sent else None + last_out_dt = ( + _utc(last_out.date_sent) if last_out and last_out.date_sent else None + ) + + # Only if last message is incoming or last_in is later than last_out + if last_out_dt and last_out_dt > last_in_dt: + # There's a reply from us after the last incoming; skip + continue + + hours_since_last_in = ( + (now - last_in_dt).total_seconds() / 3600.0 if last_in_dt else 0 + ) + + # Determine highest frame crossed + target_level = 0 + target_hours = 0 + for level, hours in sorted(level_to_hours.items(), key=lambda x: x[1]): + if hours_since_last_in >= hours: + target_level = level + target_hours = hours + + if target_level == 0: + continue + + # Avoid re-sending same or lower level + if (t.last_alert_level_sent or 0) >= target_level: + continue + + # Build AI input messages + msgs = [ + { + "date_sent": m.date_sent.isoformat() if m.date_sent else None, + "subject": m.subject, + "from_email": m.from_email, + "to_email": m.to_email, + "body": m.body, + "is_incoming": m.is_incoming, + } + for m in get_thread_messages(db, t.id)[-4:] + ] + ai = analyze_thread(t.subject or "", msgs) + + # Persist AI decision on thread + t.actionable = bool(ai.get("actionable", False)) + t.ai_summary = ai.get("summary") + t.ai_confidence = ai.get("confidence") + t.last_analyzed_at = now + + if not t.actionable: + # Don't alert on non-actionable + continue + + # Compose and send WhatsApp + msg = _format_alert(target_level, target_hours, t, ai, last_in) + res = sender.send_alert(msg, thread_id=str(t.id)) + if res.get("status") == "success": + t.last_alert_level_sent = target_level + t.last_alert_sent_at = now + alerted.append(t.id) + + db.commit() + return alerted diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000..7db9685 --- /dev/null +++ b/src/app.py @@ -0,0 +1,390 @@ +import asyncio +import json +import os +from contextlib import suppress +from typing import List + +from fastapi import Depends, FastAPI, HTTPException, Request +from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session + +from src.ai import analyze_thread +from src.alerts import process_alerts +from src.database import ( + Message, + SessionLocal, + Thread, + create_db_tables, + get_thread_messages, + ingest_emails, +) +from src.zoho_client import ZohoClient + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + + +create_db_tables() +app = FastAPI(title="Email Alerts UI") + +# Static and templates +os.makedirs("templates", exist_ok=True) +os.makedirs("static", exist_ok=True) +templates = Jinja2Templates(directory="templates") +app.mount("/static", StaticFiles(directory="static"), name="static") + + +@app.get("/", response_class=HTMLResponse) +def home(request: Request, db: Session = Depends(get_db), account: str | None = None): + q = db.query(Thread).order_by(Thread.updated_at.desc()) + if account: + q = q.filter(Thread.account_email == account.lower()) + threads = q.limit(100).all() + return templates.TemplateResponse( + "threads.html", + { + "request": request, + "threads": threads, + "account": account or "", + "status": _status_for_templates(), + }, + ) + + +@app.get("/thread/{thread_id}", response_class=HTMLResponse) +def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)): + thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none() + if not thread: + raise HTTPException(status_code=404, detail="Thread not found") + messages: List[Message] = get_thread_messages(db, thread_id) + # Convert for AI analyzer and template + msg_dicts = [ + { + "id": m.id, + "date_sent": m.date_sent, + "subject": m.subject, + "from_email": m.from_email, + "to_email": m.to_email, + "body": m.body, + "is_incoming": m.is_incoming, + } + for m in messages + ] + ai = analyze_thread(thread.subject or "", msg_dicts) + # Save AI info on the thread for listing and downstream alerts + try: + from datetime import datetime, timezone + + thread.actionable = bool(ai.get("actionable", False)) + thread.ai_summary = ai.get("summary") + thread.ai_confidence = ai.get("confidence") + thread.last_analyzed_at = datetime.now(timezone.utc) + db.commit() + except Exception: + pass + return templates.TemplateResponse( + "thread_detail.html", + { + "request": request, + "thread": thread, + "messages": messages, + "ai": ai, + "status": _status_for_templates(), + }, + ) + + +@app.get("/health") +def health(): + return {"status": "ok"} + + +# --------------------- +# Config editor routes +# --------------------- + +CONFIG_PATH = os.path.join(os.path.dirname(__file__), "..", "config.json") + + +def load_config() -> dict: + if not os.path.exists(CONFIG_PATH): + return { + "email_address": "", + "time_frames": [ + {"name": "1-24 hours", "hours": 24, "alert_level": 1}, + {"name": "24-48 hours", "hours": 48, "alert_level": 2}, + {"name": "48+ hours", "hours": 72, "alert_level": 3}, + ], + "email_days_back": 7, + "agency_domains": [], + "zoho_email": "", + "zoho_app_password": "", + "whatsapp_to": "", + "auto_process": False, + "auto_process_interval": 30, + # Sync status + "sync_in_progress": False, + "last_sync_at": None, + "last_sync_count": 0, + "last_sync_status": "idle", + "last_sync_error": None, + } + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + return json.load(f) + + +def save_config(cfg: dict) -> None: + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(cfg, f, indent=2, ensure_ascii=False) + + +def _status_for_templates() -> dict: + cfg = load_config() + return { + "auto_process": bool(cfg.get("auto_process")), + "interval": int(cfg.get("auto_process_interval", 30) or 30), + "sync_in_progress": bool(cfg.get("sync_in_progress")), + "last_sync_at": cfg.get("last_sync_at"), + "last_sync_status": cfg.get("last_sync_status", "idle"), + "last_sync_count": int(cfg.get("last_sync_count") or 0), + "last_sync_error": cfg.get("last_sync_error"), + } + + +@app.get("/config", response_class=HTMLResponse) +def config_form(request: Request, saved: int | None = None): + cfg = load_config() + # Render up to existing frames or at least 3 rows + frames = cfg.get("time_frames") or [] + min_rows = max(len(frames), 3) + return templates.TemplateResponse( + "config.html", + { + "request": request, + "cfg": cfg, + "rows": list(range(min_rows)), + "saved": bool(saved), + "status": _status_for_templates(), + }, + ) + + +@app.post("/config") +async def config_save(request: Request): + form = await request.form() + cfg = load_config() + + # Basic fields + cfg["email_address"] = (form.get("email_address") or "").strip() + # email_days_back + try: + cfg["email_days_back"] = int( + form.get("email_days_back") or cfg.get("email_days_back", 7) + ) + except Exception: + cfg["email_days_back"] = 7 + # agency domains (comma or newline separated) + domains_raw = (form.get("agency_domains") or "").replace("\r", "") + parts = [p.strip() for p in domains_raw.replace(",", "\n").split("\n") if p.strip()] + cfg["agency_domains"] = parts + # auto_process + cfg["auto_process"] = form.get("auto_process") == "on" + # auto_process_interval + try: + cfg["auto_process_interval"] = int( + form.get("auto_process_interval") or cfg.get("auto_process_interval", 30) + ) + except Exception: + cfg["auto_process_interval"] = 30 + + # Zoho (optional - note: current client reads env vars) + cfg["zoho_email"] = (form.get("zoho_email") or cfg.get("zoho_email", "")).strip() + cfg["zoho_app_password"] = ( + form.get("zoho_app_password") or cfg.get("zoho_app_password", "") + ).strip() + # WhatsApp destination + cfg["whatsapp_to"] = (form.get("whatsapp_to") or cfg.get("whatsapp_to", "")).strip() + + # Time frames: collect indexed rows + frames: list[dict] = [] + # find indices present + indices = set() + for k in form.keys(): + if k.startswith("time_name_"): + try: + indices.add(int(k.split("_")[-1])) + except Exception: + pass + for i in sorted(indices): + name = (form.get(f"time_name_{i}") or "").strip() + hrs_raw = form.get(f"time_hours_{i}") or "" + lvl_raw = form.get(f"time_alert_{i}") or "" + if not name and not hrs_raw and not lvl_raw: + continue + try: + hours = int(hrs_raw) + except Exception: + hours = 0 + try: + level = int(lvl_raw) + except Exception: + level = 0 + if name: + frames.append({"name": name, "hours": hours, "alert_level": level}) + if frames: + cfg["time_frames"] = frames + + save_config(cfg) + return RedirectResponse(url="/config?saved=1", status_code=303) + + +@app.post("/process") +def process(db: Session = Depends(get_db)): + cfg = load_config() + alerted = process_alerts(db, cfg) + return {"alerted_threads": alerted} + + +def _sync_emails_once(cfg: dict) -> int: + """Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count.""" + from datetime import datetime, timezone + + # Mark status + cfg["sync_in_progress"] = True + cfg["last_sync_status"] = "running" + cfg["last_sync_error"] = None + save_config(cfg) + + account_email = cfg.get("email_address") or cfg.get("zoho_email") or "" + if not account_email: + cfg.update( + { + "last_sync_status": "error", + "last_sync_error": "Configure email_address or zoho_email in /config", + "sync_in_progress": False, + } + ) + save_config(cfg) + raise HTTPException( + status_code=400, detail="Configure email_address or zoho_email in /config" + ) + + # Incremental lookback by last_sync_at + days_back = int(cfg.get("email_days_back", 7) or 7) + last_sync_at = cfg.get("last_sync_at") + if last_sync_at: + try: + last_dt = datetime.fromisoformat(last_sync_at) + now = datetime.now(timezone.utc) + delta_days = int(((now - last_dt).total_seconds() + 86399) // 86400) + days_back = max(1, delta_days) + except Exception: + pass + max_results = 100 + client = ZohoClient( + email=cfg.get("zoho_email") or account_email, + app_password=cfg.get("zoho_app_password"), + ) + try: + inbox = client.fetch_folder_emails( + folder="INBOX", max_results=max_results, days_back=days_back + ) + sent = client.fetch_folder_emails( + folder="Sent", max_results=max_results, days_back=days_back + ) + finally: + client.close() + + db = SessionLocal() + try: + ingest_emails( + db, account_email=account_email, emails=inbox, default_folder="INBOX" + ) + ingest_emails( + db, account_email=account_email, emails=sent, default_folder="Sent" + ) + # Return count for UX/debug + count = ( + db.query(Thread) + .filter(Thread.account_email == account_email.lower()) + .count() + ) + return count + finally: + db.close() + + +@app.post("/sync_emails") +async def sync_emails(): + cfg = load_config() + if cfg.get("sync_in_progress"): + return RedirectResponse(url="/?sync=busy", status_code=303) + + async def _task(): + try: + _sync_emails_once(load_config()) + except Exception: + pass + + cfg["sync_in_progress"] = True + cfg["last_sync_status"] = "running" + cfg["last_sync_error"] = None + save_config(cfg) + asyncio.create_task(_task()) + return RedirectResponse(url="/?sync=started", status_code=303) + + +# --------------------- +# Auto-processing loop +# --------------------- +_auto_task = None +_stop_event = asyncio.Event() + + +async def _auto_runner(): + # Delay a bit on startup + await asyncio.sleep(2) + while not _stop_event.is_set(): + cfg = load_config() + interval_min = int(cfg.get("auto_process_interval", 30) or 30) + if cfg.get("auto_process"): + # Sync emails then run alerts + try: + if not cfg.get("sync_in_progress"): + _sync_emails_once(cfg) + except Exception: + # keep loop alive + pass + db = SessionLocal() + try: + process_alerts(db, cfg) + finally: + db.close() + try: + await asyncio.wait_for( + _stop_event.wait(), timeout=max(5, interval_min * 60) + ) + except asyncio.TimeoutError: + continue + + +@app.on_event("startup") +async def on_startup(): + global _auto_task + _stop_event.clear() + _auto_task = asyncio.create_task(_auto_runner()) + + +@app.on_event("shutdown") +async def on_shutdown(): + if _auto_task: + _stop_event.set() + with suppress(Exception): + await _auto_task diff --git a/database.py b/src/database.py similarity index 78% rename from database.py rename to src/database.py index 536b1c3..8060aba 100644 --- a/database.py +++ b/src/database.py @@ -6,6 +6,7 @@ from sqlalchemy import ( Boolean, Column, DateTime, + Float, ForeignKey, Index, Integer, @@ -41,6 +42,47 @@ Base = declarative_base() def create_db_tables(): Base.metadata.create_all(bind=engine) + _migrate_add_missing_columns() + + +def _column_exists(table: str, column: str) -> bool: + try: + with engine.connect() as conn: + rows = conn.exec_driver_sql(f"PRAGMA table_info('{table}')").fetchall() + cols = {row[1] for row in rows} + return column in cols + except Exception: + return False + + +def _migrate_add_missing_columns() -> None: + """Best-effort SQLite migrations for newly added columns.""" + try: + with engine.begin() as conn: + if not _column_exists("threads", "actionable"): + conn.exec_driver_sql( + "ALTER TABLE threads ADD COLUMN actionable BOOLEAN NOT NULL DEFAULT 0" + ) + if not _column_exists("threads", "ai_summary"): + conn.exec_driver_sql("ALTER TABLE threads ADD COLUMN ai_summary TEXT") + if not _column_exists("threads", "ai_confidence"): + conn.exec_driver_sql( + "ALTER TABLE threads ADD COLUMN ai_confidence FLOAT" + ) + if not _column_exists("threads", "last_analyzed_at"): + conn.exec_driver_sql( + "ALTER TABLE threads ADD COLUMN last_analyzed_at DATETIME" + ) + if not _column_exists("threads", "last_alert_level_sent"): + conn.exec_driver_sql( + "ALTER TABLE threads ADD COLUMN last_alert_level_sent INTEGER NOT NULL DEFAULT 0" + ) + if not _column_exists("threads", "last_alert_sent_at"): + conn.exec_driver_sql( + "ALTER TABLE threads ADD COLUMN last_alert_sent_at DATETIME" + ) + except Exception as e: + print(f"DB migration warning: {e}") class Thread(Base): @@ -53,6 +95,13 @@ class Thread(Base): thread_key = Column(String, nullable=False) subject = Column(String, index=True) requires_reply = Column(Boolean, nullable=False, default=False) + # AI/alert fields + actionable = Column(Boolean, nullable=False, default=False) + ai_summary = Column(Text) + ai_confidence = Column(Float) + last_analyzed_at = Column(DateTime) + last_alert_level_sent = Column(Integer, nullable=False, default=0) + last_alert_sent_at = Column(DateTime) created_at = Column(DateTime, nullable=False, server_default=func.now()) updated_at = Column( DateTime, nullable=False, server_default=func.now(), onupdate=func.now() @@ -269,6 +318,25 @@ def get_threads_requiring_reply(db: Session, account_email: str) -> List[Thread] ) +def get_last_incoming_outgoing( + db: Session, thread_id: int +) -> Tuple[Optional["Message"], Optional["Message"]]: + """Return the last incoming and outgoing messages for a thread.""" + last_incoming = ( + db.query(Message) + .filter(Message.thread_id == thread_id, Message.is_incoming.is_(True)) + .order_by(Message.date_sent.desc(), Message.id.desc()) + .first() + ) + last_outgoing = ( + db.query(Message) + .filter(Message.thread_id == thread_id, Message.is_incoming.is_(False)) + .order_by(Message.date_sent.desc(), Message.id.desc()) + .first() + ) + return last_incoming, last_outgoing + + def ingest_emails( db: Session, account_email: str, diff --git a/whatsapp_sender.py b/src/whatsapp_sender.py similarity index 89% rename from whatsapp_sender.py rename to src/whatsapp_sender.py index 19a0e2c..bd52c43 100644 --- a/whatsapp_sender.py +++ b/src/whatsapp_sender.py @@ -9,13 +9,14 @@ load_dotenv() class WhatsAppSender: - def __init__(self): + def __init__(self, to_number: str | None = None): self.account_sid = os.getenv("TWILIO_ACCOUNT_SID") self.auth_token = os.getenv("TWILIO_AUTH_TOKEN") self.from_number = os.getenv("TWILIO_WHATSAPP_NUMBER") - self.to_number = os.getenv("WHATSAPP_TO_NUMBER") # Individual phone number + env_to = os.getenv("WHATSAPP_TO_NUMBER") + self.to_number = to_number or env_to # Individual phone number - if self.account_sid and self.auth_token: + if self.account_sid and self.auth_token and self.from_number and self.to_number: try: self.client = Client(self.account_sid, self.auth_token) self.use_mock = False @@ -24,11 +25,9 @@ class WhatsAppSender: self.use_mock = True else: self.use_mock = True - print("Note: Using mock WhatsApp sender (add Twilio credentials to .env)") - - # Use real WhatsApp mode - self.use_mock = False - print("📱 Using WhatsApp mode") + print( + "Note: Using mock WhatsApp sender (set TWILIO_* and WHATSAPP_TO_NUMBER or pass to_number)" + ) def send_alert(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: """Send alert message to WhatsApp""" diff --git a/zoho_client.py b/src/zoho_client.py similarity index 100% rename from zoho_client.py rename to src/zoho_client.py diff --git a/templates/base.html b/templates/base.html index 9c9c2ef..9b7742a 100644 --- a/templates/base.html +++ b/templates/base.html @@ -13,6 +13,9 @@ diff --git a/templates/config.html b/templates/config.html index 61581a3..0019d62 100644 --- a/templates/config.html +++ b/templates/config.html @@ -47,6 +47,15 @@ +
Summary: {{ ai.summary }}
+Summary: {{ ai.summary or thread.ai_summary }}
Confidence: {{ ai.confidence }} • Model: {{ ai.model }}
+Last stored: {{ thread.last_analyzed_at }}
+Last alert level sent: {{ thread.last_alert_level_sent }} at {{ thread.last_alert_sent_at }}
Latest updated threads. Click an ID to view details and AI analysis.
+ {% if status %} +