feat: Enhance notification system with WebSocket support and auto-hide alerts
This commit is contained in:
@@ -108,6 +108,7 @@ def process_alerts(db: Session, cfg: dict) -> List[int]:
|
||||
|
||||
# Only if last message is incoming or last_in is later than last_out
|
||||
if last_out_dt and last_out_dt > last_in_dt:
|
||||
print("Here in lies the problem")
|
||||
# There's a reply from us after the last incoming; skip
|
||||
continue
|
||||
|
||||
|
||||
+62
-5
@@ -6,7 +6,7 @@ from contextlib import suppress
|
||||
from typing import List
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request
|
||||
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request, WebSocket
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
@@ -345,6 +345,17 @@ def process(db: Session = Depends(get_db)):
|
||||
return RedirectResponse(url=f"/?alerts_processed={len(alerted)}", status_code=303)
|
||||
|
||||
|
||||
def send_to_all(message: str):
|
||||
logging.info(
|
||||
"Broadcasting to %d clients: %s", len(manager.active_connections), message
|
||||
)
|
||||
loop = getattr(app.state, "loop", None)
|
||||
if not loop or not loop.is_running():
|
||||
logging.warning("ASGI event loop not ready; skipping broadcast")
|
||||
return
|
||||
asyncio.run_coroutine_threadsafe(manager.broadcast(message), loop)
|
||||
|
||||
|
||||
def _sync_emails_once(cfg: dict) -> int:
|
||||
"""Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count."""
|
||||
from datetime import datetime, timezone
|
||||
@@ -364,14 +375,16 @@ def _sync_emails_once(cfg: dict) -> int:
|
||||
days_back = max(1, delta_days)
|
||||
except Exception:
|
||||
pass
|
||||
max_results = 100
|
||||
max_results = 10
|
||||
client = ZohoClient(
|
||||
email=cfg.get("zoho_email") or account_email,
|
||||
app_password=cfg.get("zoho_app_password"),
|
||||
)
|
||||
|
||||
db = SessionLocal()
|
||||
|
||||
try:
|
||||
send_to_all("Fetching emails from inbox...")
|
||||
inbox = client.fetch_folder_emails(
|
||||
folder="INBOX",
|
||||
max_results=max_results,
|
||||
@@ -379,6 +392,8 @@ def _sync_emails_once(cfg: dict) -> int:
|
||||
db_session=db,
|
||||
account_email=account_email,
|
||||
)
|
||||
|
||||
send_to_all("Fetching emails from sent...")
|
||||
sent = client.fetch_folder_emails(
|
||||
folder="Sent",
|
||||
max_results=max_results,
|
||||
@@ -389,6 +404,7 @@ def _sync_emails_once(cfg: dict) -> int:
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
send_to_all("Analysing Threads with AI")
|
||||
try:
|
||||
ingest_emails(
|
||||
db, account_email=account_email, emails=inbox, default_folder="INBOX"
|
||||
@@ -402,7 +418,9 @@ def _sync_emails_once(cfg: dict) -> int:
|
||||
.filter(Thread.account_email == account_email.lower())
|
||||
.count()
|
||||
)
|
||||
send_to_all("Email synced Successfully")
|
||||
return count
|
||||
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
@@ -445,19 +463,57 @@ def _sync_emails_background_task():
|
||||
@app.post("/sync_emails")
|
||||
async def sync_emails(background_tasks: BackgroundTasks):
|
||||
cfg = load_config()
|
||||
|
||||
if cfg.get("sync_in_progress"):
|
||||
return RedirectResponse(url="/?sync=busy", status_code=303)
|
||||
return {"status": "already_running"}
|
||||
|
||||
# Mark sync as starting
|
||||
cfg["sync_in_progress"] = True
|
||||
cfg["last_sync_status"] = "running"
|
||||
cfg["last_sync_error"] = None
|
||||
save_config(cfg)
|
||||
|
||||
# Add the background task
|
||||
background_tasks.add_task(_sync_emails_background_task)
|
||||
|
||||
return RedirectResponse(url="/?sync=started", status_code=303)
|
||||
return {"status": "syncing"}
|
||||
|
||||
|
||||
class ConnectionManager:
|
||||
def __init__(self):
|
||||
self.active_connections: List[WebSocket] = []
|
||||
|
||||
async def connect(self, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
|
||||
def disconnect(self, websocket: WebSocket):
|
||||
with suppress(ValueError):
|
||||
self.active_connections.remove(websocket)
|
||||
|
||||
async def broadcast(self, message: str):
|
||||
for connection in list(self.active_connections):
|
||||
try:
|
||||
await connection.send_json({"message": message})
|
||||
except Exception:
|
||||
with suppress(Exception):
|
||||
await connection.close()
|
||||
with suppress(ValueError):
|
||||
if connection in self.active_connections:
|
||||
self.active_connections.remove(connection)
|
||||
|
||||
|
||||
manager = ConnectionManager()
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await manager.connect(websocket)
|
||||
try:
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
await manager.broadcast(f"{data}")
|
||||
except Exception:
|
||||
manager.disconnect(websocket)
|
||||
|
||||
|
||||
# ---------------------
|
||||
@@ -529,6 +585,7 @@ async def _auto_runner():
|
||||
|
||||
@app.on_event("startup")
|
||||
async def on_startup():
|
||||
app.state.loop = asyncio.get_running_loop()
|
||||
# Reset any stale sync status from previous session
|
||||
try:
|
||||
cfg = load_config()
|
||||
|
||||
Reference in New Issue
Block a user