commit ca7df9d598dbdd222595daeab35af211dd1e002e Author: bolade Date: Mon Aug 11 14:46:35 2025 +0100 saving progress diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..697f158 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.venv/ +.env +config.json +*__pycache__/ +*.db +*.txt \ No newline at end of file diff --git a/ai.py b/ai.py new file mode 100644 index 0000000..288e8d5 --- /dev/null +++ b/ai.py @@ -0,0 +1,101 @@ +import json +import os +from typing import Dict, List + +from groq import Groq + + +def _format_messages_for_context(messages: List[dict]) -> str: + lines = [] + for m in messages: + direction = "IN" if m.get("is_incoming", True) else "OUT" + date = m.get("date_sent") + subj = m.get("subject") or "" + from_email = m.get("from_email") or "" + to_email = m.get("to_email") or "" + body = (m.get("body") or "").strip() + if len(body) > 1000: + body = body[:1000] + "..." + lines.append( + f"[{date}] [{direction}] {from_email} -> {to_email}\nSubject: {subj}\n{body}" + ) + return "\n\n---\n\n".join(lines) + + +def _heuristic_analyze(messages: List[dict]) -> Dict: + # Simple fallback if Groq isn't available + body_concat = "\n\n".join([(m.get("body") or "") for m in messages[-4:]]) + question_like = "?" in body_concat or any( + kw in body_concat.lower() + for kw in ["could you", "can you", "please", "let me know", "need", "request"] + ) + last_subj = (messages[-1].get("subject") or "") if messages else "" + return { + "actionable": bool(question_like), + "summary": (body_concat[:350] + "...") + if len(body_concat) > 350 + else body_concat, + "subject": last_subj, + "confidence": 0.35, + "model": "heuristic", + } + + +def analyze_thread( + thread_subject: str, messages: List[dict], max_messages: int = 4 +) -> Dict: + """ + Analyze a thread using Groq LLM. Returns dict with keys: + - actionable: bool + - summary: str + - subject: str + - confidence: float (0..1) + - model: str + Gracefully falls back to a heuristic when GROQ_API_KEY is missing or calls fail. + """ + msgs = messages[-max_messages:] if max_messages else messages + + api_key = os.getenv("GROQ_API_KEY") + if not api_key: + return _heuristic_analyze(msgs) + + client = Groq(api_key=api_key) + + system_prompt = ( + "You are a helpful assistant that triages email threads and writes concise summaries. " + "Decide if the thread requires a reply from our side now, based on the last few messages. " + "Ignore newsletters/automations (e.g., from no-reply), and focus on whether there's a clear question or request. " + "Return a strict JSON object with keys: actionable (true/false), summary (<= 80 words), confidence (0..1)." + ) + + user_prompt = ( + f"Thread subject: {thread_subject or ''}\n\n" + "Recent messages (oldest to newest):\n\n" + f"{_format_messages_for_context(msgs)}\n\n" + "Respond with only JSON, no extra commentary." + ) + + try: + completion = client.chat.completions.create( + model=os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile"), + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + temperature=0.2, + max_tokens=300, + ) + content = completion.choices[0].message.content.strip() + # Attempt to extract JSON + data = json.loads(content) + data.setdefault("subject", thread_subject or "") + data.setdefault("model", os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile")) + # Basic validation + if not isinstance(data.get("actionable"), bool) or not isinstance( + data.get("summary"), str + ): + raise ValueError("Invalid schema from model") + return data + except Exception: + # Fallback to heuristic + return _heuristic_analyze(msgs) diff --git a/app.py b/app.py new file mode 100644 index 0000000..cfc5dba --- /dev/null +++ b/app.py @@ -0,0 +1,204 @@ +import json +import os +from typing import List + +from fastapi import Depends, FastAPI, HTTPException, Request +from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session + +from ai import analyze_thread +from database import ( + Message, + SessionLocal, + Thread, + create_db_tables, + get_thread_messages, +) + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + + +create_db_tables() +app = FastAPI(title="Email Alerts UI") + +# Static and templates +os.makedirs("templates", exist_ok=True) +os.makedirs("static", exist_ok=True) +templates = Jinja2Templates(directory="templates") +app.mount("/static", StaticFiles(directory="static"), name="static") + + +@app.get("/", response_class=HTMLResponse) +def home(request: Request, db: Session = Depends(get_db), account: str | None = None): + q = db.query(Thread).order_by(Thread.updated_at.desc()) + if account: + q = q.filter(Thread.account_email == account.lower()) + threads = q.limit(100).all() + return templates.TemplateResponse( + "threads.html", + { + "request": request, + "threads": threads, + "account": account or "", + }, + ) + + +@app.get("/thread/{thread_id}", response_class=HTMLResponse) +def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)): + thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none() + if not thread: + raise HTTPException(status_code=404, detail="Thread not found") + messages: List[Message] = get_thread_messages(db, thread_id) + # Convert for AI analyzer and template + msg_dicts = [ + { + "id": m.id, + "date_sent": m.date_sent, + "subject": m.subject, + "from_email": m.from_email, + "to_email": m.to_email, + "body": m.body, + "is_incoming": m.is_incoming, + } + for m in messages + ] + ai = analyze_thread(thread.subject or "", msg_dicts) + return templates.TemplateResponse( + "thread_detail.html", + { + "request": request, + "thread": thread, + "messages": messages, + "ai": ai, + }, + ) + + +@app.get("/health") +def health(): + return {"status": "ok"} + + +# --------------------- +# Config editor routes +# --------------------- + +CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json") + + +def load_config() -> dict: + if not os.path.exists(CONFIG_PATH): + return { + "email_address": "", + "time_frames": [ + {"name": "1-24 hours", "hours": 24, "alert_level": 1}, + {"name": "24-48 hours", "hours": 48, "alert_level": 2}, + {"name": "48+ hours", "hours": 72, "alert_level": 3}, + ], + "email_days_back": 7, + "agency_domains": [], + "zoho_email": "", + "zoho_app_password": "", + "auto_process": False, + "auto_process_interval": 30, + } + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + return json.load(f) + + +def save_config(cfg: dict) -> None: + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + json.dump(cfg, f, indent=2, ensure_ascii=False) + + +@app.get("/config", response_class=HTMLResponse) +def config_form(request: Request, saved: int | None = None): + cfg = load_config() + # Render up to existing frames or at least 3 rows + frames = cfg.get("time_frames") or [] + min_rows = max(len(frames), 3) + return templates.TemplateResponse( + "config.html", + { + "request": request, + "cfg": cfg, + "rows": list(range(min_rows)), + "saved": bool(saved), + }, + ) + + +@app.post("/config") +async def config_save(request: Request): + form = await request.form() + cfg = load_config() + + # Basic fields + cfg["email_address"] = (form.get("email_address") or "").strip() + # email_days_back + try: + cfg["email_days_back"] = int( + form.get("email_days_back") or cfg.get("email_days_back", 7) + ) + except Exception: + cfg["email_days_back"] = 7 + # agency domains (comma or newline separated) + domains_raw = (form.get("agency_domains") or "").replace("\r", "") + parts = [p.strip() for p in domains_raw.replace(",", "\n").split("\n") if p.strip()] + cfg["agency_domains"] = parts + # auto_process + cfg["auto_process"] = form.get("auto_process") == "on" + # auto_process_interval + try: + cfg["auto_process_interval"] = int( + form.get("auto_process_interval") or cfg.get("auto_process_interval", 30) + ) + except Exception: + cfg["auto_process_interval"] = 30 + + # Zoho (optional - note: current client reads env vars) + cfg["zoho_email"] = (form.get("zoho_email") or cfg.get("zoho_email", "")).strip() + cfg["zoho_app_password"] = ( + form.get("zoho_app_password") or cfg.get("zoho_app_password", "") + ).strip() + + # Time frames: collect indexed rows + frames: list[dict] = [] + # find indices present + indices = set() + for k in form.keys(): + if k.startswith("time_name_"): + try: + indices.add(int(k.split("_")[-1])) + except Exception: + pass + for i in sorted(indices): + name = (form.get(f"time_name_{i}") or "").strip() + hrs_raw = form.get(f"time_hours_{i}") or "" + lvl_raw = form.get(f"time_alert_{i}") or "" + if not name and not hrs_raw and not lvl_raw: + continue + try: + hours = int(hrs_raw) + except Exception: + hours = 0 + try: + level = int(lvl_raw) + except Exception: + level = 0 + if name: + frames.append({"name": name, "hours": hours, "alert_level": level}) + if frames: + cfg["time_frames"] = frames + + save_config(cfg) + return RedirectResponse(url="/config?saved=1", status_code=303) diff --git a/database.py b/database.py new file mode 100644 index 0000000..536b1c3 --- /dev/null +++ b/database.py @@ -0,0 +1,324 @@ +from email.utils import parseaddr +from typing import Annotated, Iterable, List, Optional, Tuple + +from fastapi import Depends +from sqlalchemy import ( + Boolean, + Column, + DateTime, + ForeignKey, + Index, + Integer, + String, + Text, + UniqueConstraint, + create_engine, + func, +) +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session, relationship, sessionmaker + +SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" + +engine = create_engine( + SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} +) + +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + + +db_dependency = Annotated[Session, Depends(get_db)] +Base = declarative_base() + + +def create_db_tables(): + Base.metadata.create_all(bind=engine) + + +class Thread(Base): + __tablename__ = "threads" + + id = Column(Integer, primary_key=True, index=True) + # The mailbox this thread belongs to (scopes data when analyzing multiple inboxes) + account_email = Column(String, nullable=False, index=True) + # A stable key for the thread, typically the root message-id (or a synthetic key) + thread_key = Column(String, nullable=False) + subject = Column(String, index=True) + requires_reply = Column(Boolean, nullable=False, default=False) + created_at = Column(DateTime, nullable=False, server_default=func.now()) + updated_at = Column( + DateTime, nullable=False, server_default=func.now(), onupdate=func.now() + ) + + # Ensure uniqueness per account + __table_args__ = ( + UniqueConstraint("account_email", "thread_key", name="uq_thread_account_key"), + Index("ix_threads_account_updated", "account_email", "updated_at"), + ) + + # ORM relationship + messages = relationship( + "Message", + back_populates="thread", + cascade="all, delete-orphan", + order_by="Message.date_sent", + ) + + +class Message(Base): + __tablename__ = "messages" + + id = Column(Integer, primary_key=True, index=True) + # Links to Thread + thread_id = Column(Integer, ForeignKey("threads.id"), nullable=False, index=True) + + # RFC 5322 identifiers for threading + message_id = Column(String, nullable=False, unique=True, index=True) + in_reply_to = Column(String, index=True) # parent message-id if any + + # Headers / metadata + subject = Column(String, index=True) + from_email = Column(String, index=True) + to_email = Column(String, index=True) + folder = Column(String, index=True) # e.g. INBOX, Sent + is_incoming = Column(Boolean, nullable=False, default=True) + + date_sent = Column(DateTime, index=True) + body = Column(Text) + + created_at = Column(DateTime, nullable=False, server_default=func.now()) + updated_at = Column( + DateTime, nullable=False, server_default=func.now(), onupdate=func.now() + ) + + thread = relationship("Thread", back_populates="messages") + + __table_args__ = ( + Index("ix_messages_thread_date", "thread_id", "date_sent"), + Index("ix_messages_inreplyto", "in_reply_to"), + ) + + +# ---------------------- +# Utility / DAO functions +# ---------------------- + + +def _normalize_email(addr: Optional[str]) -> str: + if not addr: + return "" + name, email_addr = parseaddr(addr) + return email_addr.lower() + + +def _is_incoming_message(account_email: str, from_email: str) -> bool: + account = (account_email or "").lower() + sender = _normalize_email(from_email) + # If sender is the account itself, it's outgoing; otherwise incoming + return sender != account and sender != "" + + +def find_or_create_thread( + db: Session, + *, + account_email: str, + subject: Optional[str], + message_id: str, + in_reply_to: Optional[str] = None, +) -> Thread: + """ + Resolves the thread for a message. + Strategy: + - If in_reply_to matches an existing message, reuse its thread and its thread_key. + - Else if a message with message_id already exists, reuse its thread. + - Else create a new thread using message_id as thread_key. + """ + account_email = (account_email or "").lower() + + # 1) Try to find parent by in_reply_to + parent_msg: Optional[Message] = None + if in_reply_to: + parent_msg = ( + db.query(Message) + .join(Thread, Message.thread_id == Thread.id) + .filter( + Message.message_id == in_reply_to, + Thread.account_email == account_email, + ) + .one_or_none() + ) + if parent_msg: + # Parent's thread + parent_thread = parent_msg.thread + return parent_thread + + # 2) If message exists already, reuse its thread (idempotent ingest) + existing_msg = ( + db.query(Message) + .join(Thread, Message.thread_id == Thread.id) + .filter(Message.message_id == message_id, Thread.account_email == account_email) + .one_or_none() + ) + if existing_msg: + return existing_msg.thread + + # 3) Create a new thread using message_id as the thread_key + thread = Thread(account_email=account_email, thread_key=message_id, subject=subject) + db.add(thread) + db.flush() # assign id + return thread + + +def upsert_message( + db: Session, + *, + account_email: str, + message_id: str, + subject: Optional[str], + from_email: Optional[str], + to_email: Optional[str], + date_sent, + body: Optional[str], + in_reply_to: Optional[str] = None, + folder: Optional[str] = None, +) -> Tuple[Message, Thread]: + """Insert or update a message, linking it to the proper thread.""" + thread = find_or_create_thread( + db, + account_email=account_email, + subject=subject, + message_id=message_id, + in_reply_to=in_reply_to, + ) + + msg = db.query(Message).filter_by(message_id=message_id).one_or_none() + if msg is None: + msg = Message(message_id=message_id, thread_id=thread.id) + db.add(msg) + + msg.thread_id = thread.id + msg.in_reply_to = in_reply_to + msg.subject = subject + msg.from_email = _normalize_email(from_email) + msg.to_email = _normalize_email(to_email) + msg.date_sent = date_sent + msg.body = body + msg.folder = folder or "INBOX" + msg.is_incoming = _is_incoming_message(account_email, msg.from_email) + + # Keep thread subject if missing; otherwise prefer the earliest subject + if not thread.subject and subject: + thread.subject = subject + + # Update requires_reply flag after inserting/updating the message + update_thread_requires_reply(db, thread, account_email) + + return msg, thread + + +def update_thread_requires_reply( + db: Session, thread: Thread, account_email: str +) -> None: + """Set thread.requires_reply based on the latest message direction. + + Rule: If the most recent message in the thread is incoming (from someone else), + then the thread requires a reply. Otherwise, it doesn't. + """ + latest: Optional[Message] = ( + db.query(Message) + .filter(Message.thread_id == thread.id) + .order_by(Message.date_sent.desc(), Message.id.desc()) + .first() + ) + if latest is None: + thread.requires_reply = False + else: + thread.requires_reply = latest.is_incoming + + # Touch updated_at + thread.updated_at = func.now() + db.flush() + + +def get_thread_messages(db: Session, thread_id: int) -> List[Message]: + return ( + db.query(Message) + .filter(Message.thread_id == thread_id) + .order_by(Message.date_sent.asc(), Message.id.asc()) + .all() + ) + + +def get_threads_requiring_reply(db: Session, account_email: str) -> List[Thread]: + return ( + db.query(Thread) + .filter( + Thread.account_email == account_email.lower(), + Thread.requires_reply.is_(True), + ) + .order_by(Thread.updated_at.desc()) + .all() + ) + + +def ingest_emails( + db: Session, + account_email: str, + emails: Iterable[dict], + default_folder: str = "INBOX", +) -> None: + """ + Bulk-ingest emails fetched via ZohoClient.fetch_emails. + Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to. + """ + from datetime import datetime + + for e in emails: + # Map common keys from ZohoClient output + message_id = e.get("messageId") or e.get("id") + if not message_id: + # Skip messages without identifiers + continue + + subject = e.get("subject") + from_header = e.get("from") or e.get("from_email") + to_header = e.get("to") or e.get("to_email") + in_reply_to = e.get("inReplyTo") or e.get("in_reply_to") + folder = e.get("folder") or default_folder + body = e.get("body") or e.get("snippet") + + # Parse date if it's a string + date_val = e.get("date") or e.get("date_sent") + if isinstance(date_val, str): + try: + # Try multiple formats; fall back to now on failure + from email.utils import parsedate_to_datetime + + date_sent = parsedate_to_datetime(date_val) + except Exception: + date_sent = datetime.utcnow() + else: + date_sent = date_val + + upsert_message( + db, + account_email=account_email, + message_id=message_id, + subject=subject, + from_email=from_header, + to_email=to_header, + date_sent=date_sent, + body=body, + in_reply_to=in_reply_to, + folder=folder, + ) + + db.commit() diff --git a/main.py b/main.py new file mode 100644 index 0000000..e08db99 --- /dev/null +++ b/main.py @@ -0,0 +1,140 @@ +from typing import List, Optional + +from database import ( + Message, + SessionLocal, + Thread, + create_db_tables, + get_thread_messages, + get_threads_requiring_reply, + ingest_emails, +) +from zoho_client import ZohoClient + + +def ingest_action( + account_email: str, days_back: int = 7, max_results: int = 50 +) -> None: + create_db_tables() + client = ZohoClient(email=account_email) + inbox = client.fetch_folder_emails( + folder="INBOX", max_results=max_results, days_back=days_back + ) + sent = client.fetch_folder_emails( + folder="Sent", max_results=max_results, days_back=days_back + ) + client.close() + + db = SessionLocal() + try: + ingest_emails( + db, account_email=account_email, emails=inbox, default_folder="INBOX" + ) + ingest_emails( + db, account_email=account_email, emails=sent, default_folder="Sent" + ) + threads: List[Thread] = get_threads_requiring_reply(db, account_email) + print(f"Threads requiring reply for {account_email}: {len(threads)}") + for t in threads: + print( + f"- Thread #{t.id} | Subject: {t.subject!r} | requires_reply={t.requires_reply}" + ) + finally: + db.close() + + +def list_threads_action( + account_email: Optional[str] = None, limit: int = 20, only_requiring: bool = False +) -> None: + create_db_tables() + db = SessionLocal() + try: + q = db.query(Thread).order_by(Thread.updated_at.desc()) + if account_email: + q = q.filter(Thread.account_email == account_email.lower()) + if only_requiring: + q = q.filter(Thread.requires_reply.is_(True)) + threads = q.limit(limit).all() + print( + f"Showing {len(threads)} threads" + + (f" for {account_email}" if account_email else "") + ) + for t in threads: + count = db.query(Message).filter(Message.thread_id == t.id).count() + print( + f"- id={t.id} msgs={count} requires_reply={t.requires_reply} subject={t.subject!r}" + ) + finally: + db.close() + + +def show_thread_action(thread_id: int) -> None: + create_db_tables() + db = SessionLocal() + try: + thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none() + if not thread: + print(f"Thread {thread_id} not found") + return + + print( + f"Thread #{thread.id} subject={thread.subject!r} account={thread.account_email} requires_reply={thread.requires_reply}" + ) + messages: List[Message] = get_thread_messages(db, thread.id) + for i, m in enumerate(messages, 1): + direction = "IN" if m.is_incoming else "OUT" + snippet = (m.body or "").strip().replace("\n", " ") + if len(snippet) > 140: + snippet = snippet[:140] + "..." + print( + f"[{i}] {m.date_sent} [{direction}] {m.folder} | from={m.from_email} -> to={m.to_email}\n" + f" subject={m.subject!r}\n" + f" message_id={m.message_id} in_reply_to={m.in_reply_to}\n" + f" body={snippet}" + ) + finally: + db.close() + + +if __name__ == "__main__": + import argparse + import os + + parser = argparse.ArgumentParser(description="Email alerts utility") + sub = parser.add_subparsers(dest="cmd", required=False) + + p_ingest = sub.add_parser("ingest", help="Fetch INBOX and Sent and ingest into DB") + p_ingest.add_argument( + "--account", dest="account", default=os.getenv("ZOHO_EMAIL", "") + ) + p_ingest.add_argument("--days-back", dest="days_back", type=int, default=7) + p_ingest.add_argument("--max-results", dest="max_results", type=int, default=50) + + p_list = sub.add_parser("list-threads", help="List threads") + p_list.add_argument("--account", dest="account", default=None) + p_list.add_argument("--limit", dest="limit", type=int, default=20) + p_list.add_argument("--only-requiring", dest="only_req", action="store_true") + + p_show = sub.add_parser("show-thread", help="Print all messages in a thread") + p_show.add_argument("thread_id", type=int) + + args = parser.parse_args() + + if args.cmd == "ingest": + acct = args.account or os.getenv("ZOHO_EMAIL", "") + if not acct: + raise SystemExit("Provide --account or set ZOHO_EMAIL") + ingest_action(acct, days_back=args.days_back, max_results=args.max_results) + elif args.cmd == "list-threads": + list_threads_action( + account_email=args.account, limit=args.limit, only_requiring=args.only_req + ) + elif args.cmd == "show-thread": + show_thread_action(args.thread_id) + else: + # Default behavior: run ingest using env and then list requiring-reply threads + acct = os.getenv("ZOHO_EMAIL", "") + if not acct: + parser.print_help() + raise SystemExit(0) + ingest_action(acct) diff --git a/static/styles.css b/static/styles.css new file mode 100644 index 0000000..4c30263 --- /dev/null +++ b/static/styles.css @@ -0,0 +1,114 @@ +:root { + --bg: #0b1020; + --panel: #11162a; + --panel-soft: #151b30; + --text: #e6e8ef; + --muted: #a6adc8; + --brand: #4f8cff; + --brand-weak: rgba(79, 140, 255, 0.15); + --success: #22c55e; + --warn: #f59e0b; + --danger: #ef4444; + --border: #23304f; +} + +* { box-sizing: border-box; } +html, body { height: 100%; } +body { + font-family: system-ui, -apple-system, Segoe UI, Roboto, Ubuntu, Cantarell, "Helvetica Neue", Arial; + color: var(--text); + background: radial-gradient(1200px 800px at 20% -10%, #1a2140 0%, var(--bg) 45%), var(--bg); + margin: 0; +} + +header { + position: sticky; top: 0; z-index: 10; + background: rgba(10, 14, 28, 0.7); + backdrop-filter: blur(8px); + border-bottom: 1px solid var(--border); +} +header .inner { + display: flex; align-items: center; justify-content: space-between; + gap: 1rem; padding: 0.75rem 1rem; max-width: 1100px; margin: 0 auto; +} +header h1 { font-size: 1.05rem; margin: 0; letter-spacing: 0.4px; } +nav a { text-decoration: none; color: var(--muted); margin-left: 0.75rem; } +nav a:hover { color: var(--text); } + +.container { max-width: 1100px; margin: 1.25rem auto; padding: 0 1rem; } + +h2 { margin: 0.25rem 0 0.75rem; font-size: 1.2rem; } +h3 { margin: 0.5rem 0 0.5rem; font-size: 1.05rem; color: var(--muted); } +.muted { color: var(--muted); } + +.card { + background: linear-gradient(180deg, var(--panel) 0%, var(--panel-soft) 100%); + border: 1px solid var(--border); + border-radius: 10px; padding: 1rem; box-shadow: 0 6px 24px rgba(0,0,0,0.25); +} +.card + .card { margin-top: 1rem; } + +.badge { display: inline-block; padding: 0.2rem 0.5rem; border-radius: 999px; font-size: 0.75rem; border: 1px solid var(--border); background: #0e1425; color: var(--muted); } +.badge.success { color: #0f2f1d; background: #d1fae5; border-color: #86efac; } +.badge.warn { color: #3b2a07; background: #fef3c7; border-color: #fcd34d; } +.badge.danger { color: #4b0a0a; background: #fee2e2; border-color: #fca5a5; } +.badge.brand { color: #0a2a62; background: #dbe8ff; border-color: #9fc0ff; } + +.table-wrap { overflow-x: auto; } +table { border-collapse: collapse; width: 100%; font-size: 0.95rem; } +th, td { border: 1px solid var(--border); padding: 10px; vertical-align: top; } +th { background: #0f152a; color: var(--muted); text-align: left; position: sticky; top: 0; } +tbody tr:hover { background: #0e1426; } + +pre, code { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; } +pre { background: #0b1121; padding: 0.75rem; border-radius: 8px; white-space: pre-wrap; border: 1px solid var(--border); } + +/* Chat-style messages */ +.messages { display: flex; flex-direction: column; gap: 0.75rem; } +.msg-item { display: flex; } +.msg-item.incoming { justify-content: flex-start; } +.msg-item.outgoing { justify-content: flex-end; } +.msg-bubble { + max-width: 800px; width: fit-content; + background: #0f152a; border: 1px solid var(--border); border-radius: 12px; + padding: 0.75rem 0.9rem; box-shadow: 0 4px 20px rgba(0,0,0,0.2); +} +.incoming .msg-bubble { background: #0f152a; } +.outgoing .msg-bubble { background: var(--brand-weak); border-color: #345fb0; } +.msg-meta { font-size: 0.78rem; color: var(--muted); margin-bottom: 0.35rem; } +.msg-subject { font-size: 0.9rem; margin-bottom: 0.25rem; color: var(--text); } +.msg-body { font-size: 0.92rem; } + +.row { display: flex; gap: 1rem; flex-wrap: wrap; } +.col { flex: 1 1 360px; } + +/* Links */ +a { color: var(--brand); } +a:hover { text-decoration: underline; } + +/* Small helpers */ +.pill { padding: 0.15rem 0.5rem; border-radius: 999px; border: 1px solid var(--border); } +.right { text-align: right; } +.mt-1 { margin-top: 0.5rem; } .mt-2 { margin-top: 1rem; } +.mb-1 { margin-bottom: 0.5rem; } .mb-2 { margin-bottom: 1rem; } + +/* Forms */ +label { display: block; font-size: 0.9rem; color: var(--muted); } +input[type="text"], input[type="email"], input[type="number"], input[type="password"], textarea, select { + width: 100%; + background: #0b1121; + color: var(--text); + border: 1px solid var(--border); + border-radius: 8px; + padding: 0.5rem 0.6rem; + outline: none; +} +input:focus, textarea:focus, select:focus { border-color: #3c64b1; box-shadow: 0 0 0 3px rgba(79, 140, 255, 0.2); } + +button { + appearance: none; border: 0; cursor: pointer; + background: linear-gradient(180deg, #5a8dff 0%, #3c64b1 100%); + color: white; padding: 0.55rem 0.9rem; border-radius: 8px; + box-shadow: 0 6px 18px rgba(63, 99, 183, 0.35); +} +button:hover { filter: brightness(1.05); } diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..9c9c2ef --- /dev/null +++ b/templates/base.html @@ -0,0 +1,23 @@ + + + + + + Email Alerts + + + +
+
+

Email Alerts

+ +
+
+
+ {% block content %}{% endblock %} +
+ + diff --git a/templates/config.html b/templates/config.html new file mode 100644 index 0000000..61581a3 --- /dev/null +++ b/templates/config.html @@ -0,0 +1,90 @@ +{% extends "base.html" %} +{% block content %} +
+
+

Configuration

+ {% if saved %} +

Saved

+ {% endif %} +

Edit core settings for triage, time frames, and processing.

+
+
+ +
+

General

+
+
+ +
+
+ +
+
+ +
+
+ +
+
+ +

Zoho (optional)

+
+
+ +
+
+ +
+
+ +

Processing

+
+
+ +
+
+ +
+
+ +

Alert Time Frames

+
+ + + + + + + + + + {% set frames = cfg.time_frames %} + {% for i in rows %} + {% set row = frames[i] if frames and i < frames|length else None %} + + + + + + {% endfor %} + +
NameHoursAlert Level
+
+ +
+ +
+
+{% endblock %} diff --git a/templates/thread_detail.html b/templates/thread_detail.html new file mode 100644 index 0000000..8432947 --- /dev/null +++ b/templates/thread_detail.html @@ -0,0 +1,51 @@ +{% extends "base.html" %} +{% block content %} +
+
+

Thread #{{ thread.id }}

+

Subject: {{ thread.subject }}

+

Account: {{ thread.account_email }}

+

+ {% if thread.requires_reply %} + Needs reply + {% else %} + Up to date + {% endif %} +

+
+
+ +
+
+
+

AI Analysis

+

+ Actionable: + {% if ai.actionable %}Yes{% else %}No{% endif %} +

+

Summary: {{ ai.summary }}

+

Confidence: {{ ai.confidence }} • Model: {{ ai.model }}

+
+
+
+ +
+
+

Messages

+
+ {% for m in messages %} +
+
+
{{ m.date_sent }} • {% if m.is_incoming %}Incoming{% else %}Outgoing{% endif %} • {{ m.folder }}
+
From: {{ m.from_email }} → To: {{ m.to_email }}
+
{{ m.subject }}
+
{{ m.body }}
+
+
+ {% else %} +

No messages.

+ {% endfor %} +
+
+
+{% endblock %} diff --git a/templates/threads.html b/templates/threads.html new file mode 100644 index 0000000..2a7c5b0 --- /dev/null +++ b/templates/threads.html @@ -0,0 +1,44 @@ +{% extends "base.html" %} +{% block content %} +
+
+

Threads{% if account %} for {{ account }}{% endif %}

+

Latest updated threads. Click an ID to view details and AI analysis.

+
+
+ +
+ + + + + + + + + + + + + {% for t in threads %} + + + + + + + + + {% else %} + + {% endfor %} + +
IDSubjectAccountMsgsRequires ReplyUpdated
{{ t.id }}{{ t.subject }}{{ t.account_email }}{{ t.messages|length }} + {% if t.requires_reply %} + Needs reply + {% else %} + Up to date + {% endif %} + {{ t.updated_at }}
No threads yet
+
+{% endblock %} diff --git a/whatsapp_sender.py b/whatsapp_sender.py new file mode 100644 index 0000000..19a0e2c --- /dev/null +++ b/whatsapp_sender.py @@ -0,0 +1,121 @@ +import os +from typing import Any, Dict, List + +from dotenv import load_dotenv +from twilio.base.exceptions import TwilioException +from twilio.rest import Client + +load_dotenv() + + +class WhatsAppSender: + def __init__(self): + self.account_sid = os.getenv("TWILIO_ACCOUNT_SID") + self.auth_token = os.getenv("TWILIO_AUTH_TOKEN") + self.from_number = os.getenv("TWILIO_WHATSAPP_NUMBER") + self.to_number = os.getenv("WHATSAPP_TO_NUMBER") # Individual phone number + + if self.account_sid and self.auth_token: + try: + self.client = Client(self.account_sid, self.auth_token) + self.use_mock = False + except Exception as e: + print(f"Warning: Twilio client failed to initialize: {e}") + self.use_mock = True + else: + self.use_mock = True + print("Note: Using mock WhatsApp sender (add Twilio credentials to .env)") + + # Use real WhatsApp mode + self.use_mock = False + print("📱 Using WhatsApp mode") + + def send_alert(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: + """Send alert message to WhatsApp""" + if self.use_mock: + return self._mock_send(alert_message, thread_id) + + try: + # Format message for WhatsApp + formatted_message = self._format_message(alert_message) + + # Send to WhatsApp + message = self.client.messages.create( + from_=f"whatsapp:{self.from_number}", + body=formatted_message, + to=f"whatsapp:{self.to_number}", + ) + + return { + "status": "success", + "message_sid": message.sid, + "thread_id": thread_id, + "sent_at": message.date_created, + } + + except TwilioException as e: + print(f"WhatsApp send error: {e}") + return {"status": "error", "error": str(e), "thread_id": thread_id} + + def _mock_send(self, alert_message: str, thread_id: str = None) -> Dict[str, Any]: + """Mock WhatsApp sending for testing""" + print("📱 [MOCK] WhatsApp Alert Sent:") + print(f" To: {self.to_number or 'your_number'}") + print(f" Thread ID: {thread_id}") + print(f" Message: {alert_message[:100]}...") + return { + "status": "success", + "message_sid": "mock_sid_123", + "thread_id": thread_id, + "sent_at": "2024-01-15T10:00:00Z", + } + + def _format_message(self, alert_message: str) -> str: + """Format alert message for WhatsApp""" + # WhatsApp has character limits, so we might need to truncate + max_length = 1000 + if len(alert_message) > max_length: + alert_message = alert_message[: max_length - 3] + "..." + + return alert_message + + def send_bulk_alerts(self, alerts: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Send multiple alerts to WhatsApp""" + results = [] + + for alert in alerts: + message = alert.get("message", "") + thread_id = alert.get("thread_id", "unknown") + + result = self.send_alert(message, thread_id) + results.append(result) + + # Add small delay between messages to avoid rate limits + import time + + time.sleep(1) + + return results + + +if __name__ == "__main__": + # Test WhatsApp sender + sender = WhatsAppSender() + + test_message = """ +🚨 LEVEL 1 ALERT (24 Hours) + +🟢 Urgency: LOW +📧 Thread ID: test_thread_123 + +📝 Summary: +Client inquiry about project status. Requires follow-up. + +🎯 Action Required: +Respond to client question + +⏰ Confidence: 70.0% + """.strip() + + result = sender.send_alert(test_message, "test_thread_123") + print(f"Send result: {result}") diff --git a/zoho_client.py b/zoho_client.py new file mode 100644 index 0000000..72b3475 --- /dev/null +++ b/zoho_client.py @@ -0,0 +1,280 @@ +import email +import imaplib +import os +from datetime import datetime, timedelta +from email.header import decode_header +from typing import Any, Dict, List + +from dotenv import load_dotenv + +load_dotenv() + + +class ZohoClient: + def __init__(self, email=None, app_password=None): + self.imap_server = "imap.zoho.com" + self.imap_port = 993 + # Use provided credentials or fall back to environment variables + self.email = email or os.getenv("ZOHO_EMAIL", "") + self.app_password = app_password or os.getenv("ZOHO_PASSWORD", "") + + if not self.email or not self.app_password: + raise ValueError("Zoho email and app password must be provided") + + self.connection = None + self._connect() + + def _connect(self): + """Connect to Zoho IMAP server using app password""" + try: + self.connection = imaplib.IMAP4_SSL(self.imap_server, self.imap_port) + self.connection.login(self.email, self.app_password) + print(f"✅ Connected to Zoho IMAP server as {self.email}") + except Exception as e: + print(f"❌ Failed to connect to Zoho IMAP: {e}") + print("💡 Make sure IMAP is enabled in your Zoho Mail settings") + raise + + def fetch_folder_emails( + self, + folder: str = "INBOX", + query: str = None, + max_results: int = None, + days_back: int = 7, + ) -> List[Dict[str, Any]]: + """Fetch emails from a given folder with date filtering""" + try: + # Select folder + mailbox = '"Sent"' if folder.lower() == "sent" else folder + print(f"📥 Selecting {folder}...[STEP 2]") + self.connection.select(mailbox) + + # Build search criteria - only emails from specified days back + days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y") + search_criteria = f"SINCE {days_ago}" + + if query: + search_criteria += f" {query}" + + # Search for emails + status, message_numbers = self.connection.search(None, search_criteria) + + if status != "OK": + print(f"❌ Search failed: {status}") + return [] + + email_list = message_numbers[0].split() + + # Limit results if specified + if max_results is not None: + email_list = email_list[-max_results:] # Get the most recent emails + + emails = [] + for i, num in enumerate(email_list): + try: + print(f"📧 Fetching email {num.decode()}... [STEP 3] {i}") + # Fetch email data + status, data = self.connection.fetch(num, "(RFC822)") + + if status == "OK": + raw_email = data[0][1] + email_message = email.message_from_bytes(raw_email) + + # Extract headers + subject = self._decode_header(email_message.get("Subject", "")) + from_header = self._decode_header(email_message.get("From", "")) + to_header = self._decode_header(email_message.get("To", "")) + date_header = email_message.get("Date", "") + message_id = email_message.get("Message-ID", "") + in_reply_to = email_message.get("In-Reply-To", "") + + # Generate thread ID (using Message-ID as fallback) + thread_id = message_id or f"thread_{num.decode()}" + + # Get email body snippet + body = self._get_email_body(email_message) + snippet = body[:200] + "..." if len(body) > 200 else body + + email_data = { + "id": num.decode(), + "threadId": thread_id, + "from": from_header, + "to": to_header, + "subject": subject, + "date": date_header, + "messageId": message_id, + "inReplyTo": in_reply_to, + "folder": folder, + "snippet": snippet, + } + + emails.append(email_data) + + except Exception as e: + print(f"❌ Error processing email {num}: {e}") + continue + + print( + f"📧 Fetched {len(emails)} real emails from {folder} for last {days_back} days" + ) + return emails + + except Exception as e: + print(f"❌ Error fetching emails from {folder}: {e}") + return [] + + def fetch_emails( + self, query: str = None, max_results: int = None, days_back: int = 7 + ) -> List[Dict[str, Any]]: + """Fetch emails from INBOX (backwards-compatible wrapper).""" + return self.fetch_folder_emails( + folder="INBOX", query=query, max_results=max_results, days_back=days_back + ) + + def _decode_header(self, header_value: str) -> str: + """Decode email header values""" + if not header_value: + return "" + + try: + decoded_parts = decode_header(header_value) + decoded_string = "" + + for part, encoding in decoded_parts: + if isinstance(part, bytes): + if encoding: + decoded_string += part.decode(encoding) + else: + decoded_string += part.decode("utf-8", errors="ignore") + else: + decoded_string += str(part) + + return decoded_string + except Exception: + return str(header_value) + + def _get_email_body(self, email_message) -> str: + """Extract email body text""" + body = "" + + if email_message.is_multipart(): + for part in email_message.walk(): + if part.get_content_type() == "text/plain": + try: + body += part.get_payload(decode=True).decode( + "utf-8", errors="ignore" + ) + except Exception: + pass + else: + try: + body = email_message.get_payload(decode=True).decode( + "utf-8", errors="ignore" + ) + except Exception: + pass + + return body + + def get_thread_messages(self, thread_id: str) -> List[Dict[str, Any]]: + """Get all messages in a thread (simplified for IMAP)""" + # For IMAP, we'll return a single message since thread grouping is more complex + # This is a simplified implementation + return [] + + def check_sent_folder_for_replies(self, subject: str, days_back: int = 7) -> bool: + """Check SENT folder to see if we've replied to emails with this subject""" + try: + # Select SENT folder + self.connection.select('"Sent"') # Zoho uses "Sent" folder + + # Build search criteria for sent emails within the date range + days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y") + + # Search for emails with similar subject (remove "Re:" prefixes for matching) + clean_subject = ( + subject.replace("Re: ", "").replace("RE: ", "").replace("re: ", "") + ) + search_criteria = f'SINCE {days_ago} SUBJECT "{clean_subject}"' + + status, message_numbers = self.connection.search(None, search_criteria) + + if status == "OK" and message_numbers[0]: + # Found sent emails with matching subject + return True + + return False + + except Exception as e: + print(f"❌ Error checking sent folder: {e}") + return False + finally: + # Always return to INBOX + try: + self.connection.select("INBOX") + except Exception: + pass + + def check_message_reply_status( + self, message_id: str, subject: str, days_back: int = 7 + ) -> bool: + """Check if a specific message has been replied to by checking SENT folder""" + try: + # First check by subject in SENT folder + if self.check_sent_folder_for_replies(subject, days_back): + return True + + # Additional check: look for In-Reply-To headers matching our message ID + self.connection.select('"Sent"') + days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y") + + # Search all sent emails in date range + status, message_numbers = self.connection.search(None, f"SINCE {days_ago}") + + if status == "OK" and message_numbers[0]: + email_list = message_numbers[0].split() + + for num in email_list[ + -20: + ]: # Check last 20 sent emails for performance + try: + status, data = self.connection.fetch(num, "(RFC822)") + if status == "OK": + raw_email = data[0][1] + email_message = email.message_from_bytes(raw_email) + + # Check In-Reply-To header + in_reply_to = email_message.get("In-Reply-To", "") + if message_id and message_id in in_reply_to: + return True + + except Exception: + continue + + return False + + except Exception as e: + print(f"❌ Error checking message reply status: {e}") + return False + finally: + # Always return to INBOX + try: + self.connection.select("INBOX") + except Exception: + pass + + def close(self): + """Close the IMAP connection""" + if self.connection: + try: + self.connection.close() + self.connection.logout() + except Exception as e: + print(f"Error closing connection: {e}") + + +if __name__ == "__main__": + client = ZohoClient() + emails = client.fetch_emails(max_results=10) + print(f"Fetched {len(emails)} emails") + client.close()