From 1fd3a950930fec4dd9b90bd72dd8ac26ea748c46 Mon Sep 17 00:00:00 2001
From: bolade
Date: Tue, 12 Aug 2025 09:54:10 +0100
Subject: [PATCH] Refactor AI analysis and email fetching functionality

- Removed AI analysis guide and related documentation files.
- Updated email fetching logic to intelligently determine start date based on the latest email date in the database.
- Enhanced the app module to include a button for re-analyzing threads with AI.
- Improved database interactions to trigger AI analysis after ingesting sent emails.
- Adjusted the UI to display additional information about threads, including formatted latest message dates.
- Removed outdated test scripts for AI analysis and email fetching.
- Updated styles for better responsiveness and layout in the frontend.
---
AI_ANALYSIS_GUIDE.md | 164 ---------------------------
FETCH_MODIFICATION_SUMMARY.md | 79 -------------
example_workflow.py | 204 ----------------------------------
reset_analysis.py | 40 -------
src/app.py | 147 ++++++++++++++++++------
src/database.py | 6 +-
src/zoho_client.py | 10 +-
static/styles.css | 12 +-
templates/thread_detail.html | 27 +++--
templates/threads.html | 34 +++---
test_ai_analysis.py | 99 -----------------
test_fetch_with_db.py | 57 ----------
test_groq_api.py | 117 -------------------
test_single_analysis.py | 94 ----------------
14 files changed, 167 insertions(+), 923 deletions(-)
delete mode 100644 AI_ANALYSIS_GUIDE.md
delete mode 100644 FETCH_MODIFICATION_SUMMARY.md
delete mode 100644 example_workflow.py
delete mode 100644 reset_analysis.py
delete mode 100644 test_ai_analysis.py
delete mode 100644 test_fetch_with_db.py
delete mode 100644 test_groq_api.py
delete mode 100644 test_single_analysis.py
diff --git a/AI_ANALYSIS_GUIDE.md b/AI_ANALYSIS_GUIDE.md
deleted file mode 100644
index a7f8577..0000000
--- a/AI_ANALYSIS_GUIDE.md
+++ /dev/null
@@ -1,164 +0,0 @@
-# AI Thread Analysis with Asyncio
-
-This document explains how to use the new async AI analysis functionality for email threads.
-
-## Overview
-
-The new functionality adds AI-powered analysis to email threads, determining if they require attention (are "actionable") and generating concise summaries. It uses asyncio to process multiple threads concurrently for better performance.
-
-## Key Functions
-
-### `analyze_and_update_threads()`
-
-This is the main function you'll use to analyze threads.
-
-```python
-from src.database import analyze_and_update_threads
-
-# Analyze all unanalyzed threads for an account
-analyze_and_update_threads(
- account_email="user@company.com",
- max_concurrent=5,
- only_unanalyzed=True
-)
-
-# Analyze specific threads
-analyze_and_update_threads(
- account_email="user@company.com",
- thread_ids=[1, 2, 3],
- max_concurrent=3
-)
-```
-
-**Parameters:**
-- `account_email`: The email account to process
-- `thread_ids`: Optional list of specific thread IDs to analyze
-- `max_concurrent`: Maximum number of concurrent AI analysis tasks (default: 5)
-- `only_unanalyzed`: If True, only analyze threads that haven't been analyzed yet (default: True)
-
-### `get_threads_needing_analysis()`
-
-Check which threads need analysis:
-
-```python
-from src.database import get_threads_needing_analysis, SessionLocal
-
-db = SessionLocal()
-threads = get_threads_needing_analysis(db, "user@company.com")
-print(f"Found {len(threads)} threads needing analysis")
-db.close()
-```
-
-## Database Schema Updates
-
-The function updates the following Thread model fields:
-
-- `actionable`: Boolean indicating if the thread requires action
-- `ai_summary`: Text summary of the thread content
-- `ai_confidence`: Float (0.0-1.0) confidence score
-- `last_analyzed_at`: Timestamp of when analysis was performed
-
-## Complete Workflow Example
-
-Here's a complete workflow from email ingestion to AI analysis:
-
-```python
-from src.database import (
- SessionLocal,
- ingest_emails,
- analyze_and_update_threads,
- get_threads_requiring_reply
-)
-
-# 1. Ingest emails (using your existing email fetching logic)
-db = SessionLocal()
-try:
- # Assuming you have fetched emails from your email provider
- emails = [...] # Your email data
- ingest_emails(db, "user@company.com", emails)
-
- # 2. Run AI analysis on new threads
- analyze_and_update_threads(
- account_email="user@company.com",
- max_concurrent=5,
- only_unanalyzed=True
- )
-
- # 3. Get threads that need replies and are actionable
- reply_threads = get_threads_requiring_reply(db, "user@company.com")
- actionable_threads = [t for t in reply_threads if t.actionable]
-
- print(f"Found {len(actionable_threads)} actionable threads requiring replies")
-
-finally:
- db.close()
-```
-
-## AI Analysis Details
-
-The AI analysis:
-
-- Uses the Groq API if `GROQ_API_KEY` environment variable is set
-- Falls back to heuristic analysis if Groq is unavailable
-- Analyzes the last 4 messages in each thread by default
-- Generates summaries of ≤80 words
-- Identifies questions, requests, and actionable items
-- Ignores automated/newsletter emails
-
-## Performance
-
-- Uses asyncio for concurrent processing
-- Configurable concurrency limit (default: 5 concurrent analyses)
-- AI analysis runs in thread pool to avoid blocking
-- Efficient database operations with single commit per batch
-
-## Error Handling
-
-- Gracefully handles individual thread analysis failures
-- Continues processing other threads if one fails
-- Provides detailed error logging
-- Automatically rolls back database changes on failure
-
-## Usage Tips
-
-1. **Start with small batches**: Use `max_concurrent=3` initially to avoid overwhelming the AI service
-2. **Regular analysis**: Run analysis after each email ingestion cycle
-3. **Focus on actionable threads**: Prioritize threads that are both `requires_reply=True` and `actionable=True`
-4. **Monitor confidence scores**: Lower confidence may indicate uncertain analysis
-5. **Environment setup**: Set `GROQ_API_KEY` for better AI analysis quality
-
-## Testing
-
-Use the provided test scripts:
-
-```bash
-# Test the complete workflow
-python3 example_workflow.py
-
-# Test single thread analysis
-python3 test_single_analysis.py
-
-# Reset analysis data for testing
-python3 reset_analysis.py
-```
-
-## Integration with Existing Code
-
-To integrate with your existing email processing:
-
-```python
-# After your existing email ingestion
-from src.database import analyze_and_update_threads
-
-def process_emails(account_email: str):
- # Your existing email fetching and ingestion code
- fetch_and_ingest_emails(account_email)
-
- # Add AI analysis
- analyze_and_update_threads(
- account_email=account_email,
- only_unanalyzed=True
- )
-```
-
-This ensures that new threads are automatically analyzed for actionability after each email sync.
diff --git a/FETCH_MODIFICATION_SUMMARY.md b/FETCH_MODIFICATION_SUMMARY.md
deleted file mode 100644
index b7b3a4a..0000000
--- a/FETCH_MODIFICATION_SUMMARY.md
+++ /dev/null
@@ -1,79 +0,0 @@
-# Fetch Folder Emails Modification Summary
-
-## Changes Made
-
-### 1. Database Module (`database.py`)
-- Added `get_latest_email_date()` function to retrieve the most recent email date for a given account and folder
-- Added import for `datetime` to support the new function
-
-### 2. ZohoClient Module (`zoho_client.py`)
-- Modified `fetch_folder_emails()` to accept two new optional parameters:
- - `db_session`: Database session for querying latest email dates
- - `account_email`: Account email to identify which emails to check for latest date
-- Updated logic to:
- 1. Check database for latest email date if db_session and account_email are provided
- 2. Use latest date (minus 1 minute buffer) as start date for IMAP search
- 3. Fall back to `days_back` parameter if no database date is available
-- Updated `fetch_emails()` wrapper to support the new parameters
-- Enhanced documentation with detailed parameter descriptions
-
-### 3. App Module (`app.py`)
-- Modified email fetching calls to pass database session and account email
-- Reorganized code to create database session before fetching emails
-
-## How It Works
-
-### Before
-- Always fetched emails starting from X days back
-- No awareness of what emails were already in the database
-- Could result in fetching duplicate emails or missing recent ones
-
-### After
-- Intelligently determines start date based on database contents
-- If emails exist in database: starts from the latest email date
-- If no emails in database: falls back to `days_back` parameter
-- Adds 1-minute buffer to avoid missing emails with same timestamp
-- Reduces unnecessary email fetching and improves efficiency
-
-## Usage Examples
-
-### Basic Usage (Backwards Compatible)
-```python
-client = ZohoClient()
-emails = client.fetch_folder_emails(folder="INBOX", days_back=7)
-```
-
-### With Database Integration (Recommended)
-```python
-from database import SessionLocal
-
-db = SessionLocal()
-client = ZohoClient()
-emails = client.fetch_folder_emails(
- folder="INBOX",
- days_back=30, # Fallback only
- db_session=db,
- account_email="user@example.com"
-)
-db.close()
-```
-
-## Benefits
-
-1. **Efficiency**: Only fetches new emails since last sync
-2. **Reliability**: Ensures no emails are missed between syncs
-3. **Backwards Compatibility**: Still works without database parameters
-4. **Flexibility**: Falls back gracefully when database is empty
-5. **Performance**: Reduces IMAP server load and processing time
-
-## Testing
-
-Use `test_fetch_with_db.py` to test the new functionality:
-```bash
-python test_fetch_with_db.py
-```
-
-The test script demonstrates:
-- Checking latest email date in database
-- Fetching emails with database integration
-- Displaying results
diff --git a/example_workflow.py b/example_workflow.py
deleted file mode 100644
index a280bef..0000000
--- a/example_workflow.py
+++ /dev/null
@@ -1,204 +0,0 @@
-#!/usr/bin/env python3
-"""
-Complete workflow example: Ingest emails and then analyze them with AI.
-This demonstrates the full pipeline from email ingestion to AI analysis.
-"""
-
-import os
-import sys
-from datetime import datetime, timedelta
-
-# Add the src directory to the path
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
-
-from database import (
- Message,
- SessionLocal,
- Thread,
- analyze_and_update_threads,
- create_db_tables,
- get_threads_needing_analysis,
- ingest_emails,
-)
-
-
-def create_sample_emails(account_email: str) -> list:
- """Create some sample email data for testing."""
- now = datetime.now() # Using datetime.now() instead of utcnow()
-
- sample_emails = [
- {
- "messageId": "msg001",
- "subject": "Meeting Request - Project Review",
- "from": "colleague@company.com",
- "to": account_email,
- "date": now - timedelta(days=2),
- "body": "Hi, could we schedule a meeting to review the project progress? I'm available this week on Tuesday or Wednesday afternoon. Please let me know what works for you.",
- "folder": "INBOX",
- },
- {
- "messageId": "msg002",
- "subject": "Re: Meeting Request - Project Review",
- "from": account_email,
- "to": "colleague@company.com",
- "date": now - timedelta(days=1, hours=8),
- "body": "Sure! Wednesday afternoon works for me. How about 2 PM in the conference room?",
- "folder": "Sent",
- "inReplyTo": "msg001",
- },
- {
- "messageId": "msg003",
- "subject": "Re: Meeting Request - Project Review",
- "from": "colleague@company.com",
- "to": account_email,
- "date": now - timedelta(days=1, hours=6),
- "body": "Perfect! See you Wednesday at 2 PM. Should I prepare anything specific for the meeting?",
- "folder": "INBOX",
- "inReplyTo": "msg002",
- },
- {
- "messageId": "msg004",
- "subject": "Weekly Newsletter - Company Updates",
- "from": "no-reply@company.com",
- "to": account_email,
- "date": now - timedelta(hours=12),
- "body": "Welcome to this week's company newsletter! Here are the latest updates: New office opening, Q3 results, upcoming events...",
- "folder": "INBOX",
- },
- {
- "messageId": "msg005",
- "subject": "Urgent: Server Issue in Production",
- "from": "ops-team@company.com",
- "to": account_email,
- "date": now - timedelta(hours=2),
- "body": "We're experiencing a critical server issue in production. The application is currently down. Can you please help investigate? Login credentials are attached.",
- "folder": "INBOX",
- },
- ]
-
- return sample_emails
-
-
-def main():
- """Main function demonstrating the complete workflow."""
-
- # Create database tables if they don't exist
- create_db_tables()
-
- # Example account email
- account_email = "test-user@company.com"
-
- print(f"Starting workflow for account: {account_email}")
- print("=" * 50)
-
- # Get a database session
- db = SessionLocal()
- try:
- # Step 1: Ingest sample emails
- print("Step 1: Ingesting sample emails...")
- sample_emails = create_sample_emails(account_email)
-
- ingest_emails(db, account_email, sample_emails)
- print(f"✓ Ingested {len(sample_emails)} emails")
-
- # Show what was ingested
- threads = (
- db.query(Thread).filter(Thread.account_email == account_email.lower()).all()
- )
- print(f"✓ Created {len(threads)} threads")
-
- for thread in threads:
- messages = db.query(Message).filter(Message.thread_id == thread.id).count()
- print(f" - Thread {thread.id}: '{thread.subject}' ({messages} messages)")
-
- print()
-
- # Step 2: Check threads needing analysis
- print("Step 2: Checking threads needing analysis...")
- threads_needing_analysis = get_threads_needing_analysis(db, account_email)
- print(f"✓ Found {len(threads_needing_analysis)} threads needing analysis")
-
- if not threads_needing_analysis:
- print("No threads need analysis.")
- return
-
- print()
-
- # Step 3: Run AI analysis
- print("Step 3: Running AI analysis...")
- print(
- "This will analyze threads to determine if they're actionable and generate summaries."
- )
-
- analyze_and_update_threads(
- account_email=account_email, max_concurrent=3, only_unanalyzed=True
- )
-
- print("✓ AI analysis complete!")
- print()
-
- # Step 4: Show results
- print("Step 4: Analysis Results")
- print("-" * 30)
-
- # Show results
- analyzed_threads = (
- db.query(Thread)
- .filter(
- Thread.account_email == account_email.lower(),
- Thread.last_analyzed_at.isnot(None),
- )
- .all()
- )
-
- # Refresh the threads to get the latest data
- for thread in analyzed_threads:
- db.refresh(thread)
-
- actionable_count = sum(1 for t in analyzed_threads if t.actionable)
-
- print(f"Total analyzed threads: {len(analyzed_threads)}")
- print(f"Actionable threads: {actionable_count}")
- print(f"Non-actionable threads: {len(analyzed_threads) - actionable_count}")
- print()
-
- for thread in analyzed_threads:
- confidence = thread.ai_confidence or 0.0
- print(f"Thread {thread.id}: {thread.subject}")
- print(f" 📧 Actionable: {'YES' if thread.actionable else 'No'}")
- print(f" 🎯 Confidence: {confidence:.2f}")
- print(f" 📝 Summary: {thread.ai_summary or 'No summary available'}")
- print(f" 🕐 Analyzed: {thread.last_analyzed_at}")
- print()
-
- # Step 5: Show threads requiring replies
- print("Step 5: Threads Requiring Reply")
- print("-" * 30)
-
- from database import get_threads_requiring_reply
-
- reply_threads = get_threads_requiring_reply(db, account_email)
-
- print(f"Threads requiring reply: {len(reply_threads)}")
- for thread in reply_threads:
- actionable_note = (
- " (AI: Actionable)" if thread.actionable else " (AI: Not actionable)"
- )
- print(f" - {thread.subject}{actionable_note}")
-
- if reply_threads:
- print(
- "\n💡 Tip: Focus on threads marked as both 'requiring reply' and 'AI: Actionable'"
- )
-
- except Exception as e:
- print(f"❌ Error: {e}")
- import traceback
-
- traceback.print_exc()
- finally:
- db.close()
-
-
-if __name__ == "__main__":
- main()
diff --git a/reset_analysis.py b/reset_analysis.py
deleted file mode 100644
index fe03cdd..0000000
--- a/reset_analysis.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env python3
-"""
-Reset AI analysis data for testing purposes.
-"""
-
-import os
-import sys
-
-# Add the src directory to the path
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
-
-from database import SessionLocal, Thread
-
-
-def main():
- """Reset analysis data for all threads."""
-
- db = SessionLocal()
- try:
- # Reset analysis data
- threads = db.query(Thread).all()
-
- for thread in threads:
- thread.actionable = False
- thread.ai_summary = None
- thread.ai_confidence = None
- thread.last_analyzed_at = None
-
- db.commit()
- print(f"Reset analysis data for {len(threads)} threads")
-
- except Exception as e:
- print(f"Error: {e}")
- db.rollback()
- finally:
- db.close()
-
-
-if __name__ == "__main__":
- main()
diff --git a/src/app.py b/src/app.py
index a549211..29fe57a 100644
--- a/src/app.py
+++ b/src/app.py
@@ -5,6 +5,7 @@ import os
from contextlib import suppress
from typing import List
+from dotenv import load_dotenv
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
@@ -22,7 +23,6 @@ from src.database import (
ingest_emails,
)
from src.zoho_client import ZohoClient
-from dotenv import load_dotenv
load_dotenv()
@@ -50,15 +50,63 @@ app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
def home(request: Request, db: Session = Depends(get_db), account: str | None = None):
- q = db.query(Thread).order_by(Thread.updated_at.desc())
+ from datetime import datetime
+
+ from sqlalchemy import func
+
+ # Subquery to get the latest message date for each thread
+ latest_message_subq = (
+ db.query(
+ Message.thread_id, func.max(Message.date_sent).label("latest_message_date")
+ )
+ .group_by(Message.thread_id)
+ .subquery()
+ )
+
+ # Main query joining threads with their latest message dates
+ q = (
+ db.query(Thread, latest_message_subq.c.latest_message_date)
+ .outerjoin(latest_message_subq, Thread.id == latest_message_subq.c.thread_id)
+ .order_by(latest_message_subq.c.latest_message_date.desc().nulls_last())
+ )
+
if account:
q = q.filter(Thread.account_email == account.lower())
- threads = q.limit(100).all()
+
+ results = q.limit(100).all()
+
+ # Create threads with additional info including sequential frontend ID and formatted latest message date
+ threads_with_info = []
+ for i, (thread, latest_message_date) in enumerate(results, 1):
+ # Format the latest message date to 12-hour format with hours and minutes only
+ formatted_date = None
+ if latest_message_date:
+ if isinstance(latest_message_date, str):
+ try:
+ # Try parsing if it's a string
+ dt = datetime.fromisoformat(
+ latest_message_date.replace("Z", "+00:00")
+ )
+ formatted_date = dt.strftime("%d/%m/%y %I:%M %p")
+ except Exception:
+ formatted_date = latest_message_date
+ elif hasattr(latest_message_date, "strftime"):
+ # It's already a datetime object
+ formatted_date = latest_message_date.strftime("%d/%m/%y %I:%M %p")
+
+ thread_info = {
+ "frontend_id": i,
+ "thread": thread,
+ "latest_message_date": latest_message_date,
+ "formatted_date": formatted_date or "N/A",
+ }
+ threads_with_info.append(thread_info)
+
return templates.TemplateResponse(
"threads.html",
{
"request": request,
- "threads": threads,
+ "threads": threads_with_info,
"account": account or "",
"status": _status_for_templates(),
},
@@ -66,36 +114,67 @@ def home(request: Request, db: Session = Depends(get_db), account: str | None =
@app.get("/thread/{thread_id}", response_class=HTMLResponse)
-def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)):
+def show_thread(
+ thread_id: int,
+ request: Request,
+ db: Session = Depends(get_db),
+ force_analyze: bool = False,
+):
thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none()
if not thread:
raise HTTPException(status_code=404, detail="Thread not found")
messages: List[Message] = get_thread_messages(db, thread_id)
- # Convert for AI analyzer and template
- msg_dicts = [
- {
- "id": m.id,
- "date_sent": m.date_sent,
- "subject": m.subject,
- "from_email": m.from_email,
- "to_email": m.to_email,
- "body": m.body,
- "is_incoming": m.is_incoming,
- }
- for m in messages
- ]
- ai = analyze_thread(thread.subject or "", msg_dicts)
- # Save AI info on the thread for listing and downstream alerts
- try:
- from datetime import datetime, timezone
- thread.actionable = bool(ai.get("actionable", False))
- thread.ai_summary = ai.get("summary")
- thread.ai_confidence = ai.get("confidence")
- thread.last_analyzed_at = datetime.now(timezone.utc)
- db.commit()
- except Exception:
- pass
+ # Check if we should use existing AI analysis or perform a new one
+ ai = None
+ should_analyze = force_analyze or not thread.last_analyzed_at
+
+ # Check if new messages have been added since last analysis
+ if not should_analyze and thread.last_analyzed_at and messages:
+ # Check if any messages are newer than the last analysis
+ newest_message_date = max(m.created_at for m in messages)
+ if newest_message_date > thread.last_analyzed_at:
+ should_analyze = True
+
+ # If we have existing analysis and don't need to re-analyze, use it
+ if not should_analyze and thread.last_analyzed_at:
+ ai = {
+ "actionable": thread.actionable,
+ "summary": thread.ai_summary,
+ "confidence": thread.ai_confidence,
+ "analyzed_at": thread.last_analyzed_at.isoformat()
+ if thread.last_analyzed_at
+ else None,
+ }
+
+ # Only analyze if we don't have existing data, force_analyze is True, or there are new messages
+ if should_analyze:
+ # Convert for AI analyzer
+ msg_dicts = [
+ {
+ "id": m.id,
+ "date_sent": m.date_sent,
+ "subject": m.subject,
+ "from_email": m.from_email,
+ "to_email": m.to_email,
+ "body": m.body,
+ "is_incoming": m.is_incoming,
+ }
+ for m in messages
+ ]
+ ai = analyze_thread(thread.subject or "", msg_dicts)
+ # Save AI info on the thread for listing and downstream alerts
+ try:
+ from datetime import datetime, timezone
+
+ thread.actionable = bool(ai.get("actionable", False))
+ thread.ai_summary = ai.get("summary")
+ thread.ai_confidence = ai.get("confidence")
+ thread.last_analyzed_at = datetime.now(timezone.utc)
+ db.commit()
+ except Exception:
+ pass
+
return templates.TemplateResponse(
"thread_detail.html",
{
@@ -332,11 +411,11 @@ def _sync_emails_background_task():
cfg = load_config() # Reload in case it was modified
cfg.update(
{
- "sync_in_progress": False,
- "last_sync_status": "success",
- "last_sync_at": datetime.now(timezone.utc).isoformat(),
- "last_sync_count": count,
- "last_sync_error": None,
+ "sync_in_progress": False,
+ "last_sync_status": "success",
+ "last_sync_at": datetime.now(timezone.utc).strftime("%d/%m/%y %I:%M %p"),
+ "last_sync_count": count,
+ "last_sync_error": None,
}
)
save_config(cfg)
diff --git a/src/database.py b/src/database.py
index 515b34e..bffe2cd 100644
--- a/src/database.py
+++ b/src/database.py
@@ -350,7 +350,7 @@ def ingest_emails(
Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to.
"""
from datetime import datetime
-
+ folder = default_folder
for e in emails:
# Map common keys from ZohoClient output
message_id = e.get("messageId") or e.get("id")
@@ -393,6 +393,10 @@ def ingest_emails(
db.commit()
+ if folder == "Sent":
+ analyze_and_update_threads(
+ account_email=account_email, max_concurrent=3, only_unanalyzed=True
+ )
def get_latest_email_date(
db: Session, account_email: str, folder: str = None
diff --git a/src/zoho_client.py b/src/zoho_client.py
index f93aca6..fa48fa5 100644
--- a/src/zoho_client.py
+++ b/src/zoho_client.py
@@ -70,7 +70,7 @@ class ZohoClient:
# Determine the start date for fetching emails
start_date = None
-
+ first_time = True
# If database session and account email are provided, try to get the latest email date
if db_session and account_email:
try:
@@ -82,11 +82,13 @@ class ZohoClient:
if latest_date:
# Add a small buffer (1 minute) to avoid missing emails with same timestamp
start_date = latest_date - timedelta(minutes=1)
+ first_time = False
print(f"📅 Using latest email date from database: {start_date}")
else:
print(
f"📅 No emails found in database, using days_back: {days_back}"
)
+ latest_date = datetime.now(timezone.utc) + timedelta(days=1)
except Exception as e:
print(
f"⚠️ Error getting latest date from database: {e}, falling back to days_back"
@@ -130,16 +132,18 @@ class ZohoClient:
email_message = email.message_from_bytes(raw_email)
date_header = email_message.get("Date", "")
email_date = parse_email_date_safely(date_header)
- print(f"📅 Email date: {email_date} Latest: {latest_date}")
+
# Ensure both dates are timezone-aware for comparison
if email_date and latest_date:
+ print(f"📅 Email date: {email_date} Latest: {latest_date}")
# If latest_date is timezone-naive, make it timezone-aware (assume UTC)
if latest_date.tzinfo is None:
latest_date = latest_date.replace(tzinfo=timezone.utc)
- if email_date > latest_date:
+ if (email_date > latest_date) or first_time:
# Extract headers
+ print(f"📅 Email date: {email_date} Latest: {latest_date}")
subject = self._decode_header(email_message.get("Subject", ""))
from_header = self._decode_header(email_message.get("From", ""))
to_header = self._decode_header(email_message.get("To", ""))
diff --git a/static/styles.css b/static/styles.css
index 4c30263..d0c9da2 100644
--- a/static/styles.css
+++ b/static/styles.css
@@ -29,13 +29,13 @@ header {
}
header .inner {
display: flex; align-items: center; justify-content: space-between;
- gap: 1rem; padding: 0.75rem 1rem; max-width: 1100px; margin: 0 auto;
+ gap: 1rem; padding: 0.75rem 1rem; max-width: 1400px; margin: 0 auto;
}
header h1 { font-size: 1.05rem; margin: 0; letter-spacing: 0.4px; }
nav a { text-decoration: none; color: var(--muted); margin-left: 0.75rem; }
nav a:hover { color: var(--text); }
-.container { max-width: 1100px; margin: 1.25rem auto; padding: 0 1rem; }
+.container { max-width: 1400px; margin: 1.25rem auto; padding: 0 1rem; }
h2 { margin: 0.25rem 0 0.75rem; font-size: 1.2rem; }
h3 { margin: 0.5rem 0 0.5rem; font-size: 1.05rem; color: var(--muted); }
@@ -54,10 +54,10 @@ h3 { margin: 0.5rem 0 0.5rem; font-size: 1.05rem; color: var(--muted); }
.badge.danger { color: #4b0a0a; background: #fee2e2; border-color: #fca5a5; }
.badge.brand { color: #0a2a62; background: #dbe8ff; border-color: #9fc0ff; }
-.table-wrap { overflow-x: auto; }
-table { border-collapse: collapse; width: 100%; font-size: 0.95rem; }
-th, td { border: 1px solid var(--border); padding: 10px; vertical-align: top; }
-th { background: #0f152a; color: var(--muted); text-align: left; position: sticky; top: 0; }
+.table-wrap { overflow-x: auto; margin: 0 -1rem; }
+table { border-collapse: collapse; width: 100%; font-size: 0.95rem; min-width: 800px; }
+th, td { border: 1px solid var(--border); padding: 12px 15px; vertical-align: top; }
+th { background: #0f152a; color: var(--muted); text-align: left; position: sticky; top: 0; font-weight: 600; }
tbody tr:hover { background: #0e1426; }
pre, code { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; }
diff --git a/templates/thread_detail.html b/templates/thread_detail.html
index 0240a30..0100728 100644
--- a/templates/thread_detail.html
+++ b/templates/thread_detail.html
@@ -12,6 +12,9 @@
Up to date
{% endif %}
+
@@ -19,14 +22,22 @@
AI Analysis
-
- Actionable:
- {% if ai.actionable %}Yes{% else %}No{% endif %}
-
-
Summary: {{ ai.summary or thread.ai_summary }}
-
Confidence: {{ ai.confidence }} • Model: {{ ai.model }}
-
Last stored: {{ thread.last_analyzed_at }}
-
Last alert level sent: {{ thread.last_alert_level_sent }} at {{ thread.last_alert_sent_at }}
+ {% if ai %}
+
+ Actionable:
+ {% if ai.actionable %}Yes{% else %}No{% endif %}
+
+
Summary: {{ ai.summary or "No summary available" }}
+
Confidence: {{ ai.confidence or "N/A" }}{% if ai.model %} • Model: {{ ai.model }}{% endif %}
+ {% if ai.analyzed_at %}
+
Analysis cached from: {{ ai.analyzed_at }}
+ {% elif thread.last_analyzed_at %}
+
Last analyzed: {{ thread.last_analyzed_at }}
+ {% endif %}
+ {% else %}
+
No AI analysis available
+ {% endif %}
+
Last alert level sent: {{ thread.last_alert_level_sent }}{% if thread.last_alert_sent_at %} at {{ thread.last_alert_sent_at }}{% endif %}
diff --git a/templates/threads.html b/templates/threads.html
index a5f7756..0c19a6b 100644
--- a/templates/threads.html
+++ b/templates/threads.html
@@ -30,38 +30,38 @@
-
-
+
+
- | ID |
- Subject |
- AI Summary |
- Account |
- Msgs |
- Requires Reply |
- Updated |
+ ID |
+ Subject |
+ AI Summary |
+ Account |
+ Msgs |
+ Requires Reply |
+ Last Message |
{% for t in threads %}
- | {{ t.id }} |
- {{ t.subject }} |
- {{ t.ai_summary or '' }} |
- {{ t.account_email }} |
- {{ t.messages|length }} |
+ {{ t.frontend_id }} |
+ {{ t.thread.subject }} |
+ {{ t.thread.ai_summary or '' }} |
+ {{ t.thread.account_email }} |
+ {{ t.thread.messages|length }} |
- {% if t.requires_reply %}
+ {% if t.thread.requires_reply %}
Needs reply
{% else %}
Up to date
{% endif %}
|
- {{ t.updated_at }} |
+ {{ t.formatted_date }} |
{% else %}
- | No threads yet |
+ | No threads yet |
{% endfor %}
diff --git a/test_ai_analysis.py b/test_ai_analysis.py
deleted file mode 100644
index fb41ff7..0000000
--- a/test_ai_analysis.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python3
-"""
-Test script to demonstrate the AI analysis functionality.
-This script shows how to use the new async thread analysis function.
-"""
-
-import os
-import sys
-
-# Add the src directory to the path
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
-
-from database import (
- SessionLocal,
- Thread,
- analyze_and_update_threads,
- create_db_tables,
- get_threads_needing_analysis,
-)
-
-
-def main():
- """Main function to test the AI analysis."""
-
- # Create database tables if they don't exist
- create_db_tables()
-
- # Example account email - replace with your actual account
- account_email = "your-email@example.com"
-
- # Get a database session
- db = SessionLocal()
- try:
- # Check how many threads need analysis
- threads_needing_analysis = get_threads_needing_analysis(db, account_email)
- print(
- f"Found {len(threads_needing_analysis)} threads needing analysis for {account_email}"
- )
-
- if not threads_needing_analysis:
- print(
- "No threads need analysis. Make sure you have ingested some emails first."
- )
-
- # Show all threads for this account
- all_threads = (
- db.query(Thread)
- .filter(Thread.account_email == account_email.lower())
- .all()
- )
- print(f"Total threads for {account_email}: {len(all_threads)}")
-
- if all_threads:
- print("Sample threads:")
- for thread in all_threads[:3]:
- print(
- f" Thread {thread.id}: {thread.subject} (analyzed: {thread.last_analyzed_at is not None})"
- )
-
- return
-
- print(f"Starting AI analysis for {len(threads_needing_analysis)} threads...")
-
- # Run the analysis with max 3 concurrent tasks
- analyze_and_update_threads(
- account_email=account_email, max_concurrent=3, only_unanalyzed=True
- )
-
- print("Analysis complete!")
-
- # Show results
- analyzed_threads = (
- db.query(Thread)
- .filter(
- Thread.account_email == account_email.lower(),
- Thread.last_analyzed_at.isnot(None),
- )
- .all()
- )
-
- print(f"\nAnalyzed {len(analyzed_threads)} threads:")
- for thread in analyzed_threads[:5]: # Show first 5
- print(f" Thread {thread.id}: {thread.subject[:50]}...")
- print(f" Actionable: {thread.actionable}")
- print(f" Confidence: {thread.ai_confidence:.2f}")
- print(f" Summary: {thread.ai_summary[:100]}...")
- print()
-
- except Exception as e:
- print(f"Error: {e}")
- import traceback
-
- traceback.print_exc()
- finally:
- db.close()
-
-
-if __name__ == "__main__":
- main()
diff --git a/test_fetch_with_db.py b/test_fetch_with_db.py
deleted file mode 100644
index 3c88705..0000000
--- a/test_fetch_with_db.py
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/bin/env python3
-"""
-Test script to demonstrate the modified fetch_folder_emails function
-that uses database dates for intelligent email fetching.
-"""
-
-from src.database import SessionLocal, get_latest_email_date
-from src.zoho_client import ZohoClient
-
-
-def test_fetch_with_database():
- """Test the modified fetch function with database integration."""
-
- # Create database session
- db = SessionLocal()
-
- try:
- # Create Zoho client
- client = ZohoClient()
-
- # Example account email (replace with actual)
- account_email = client.email
-
- print("=== Testing fetch_folder_emails with database integration ===")
-
- # Check what's the latest date in database
- latest_date = get_latest_email_date(db, account_email, "INBOX")
- print(f"Latest email date in database: {latest_date}")
-
- # Fetch emails using the new function
- emails = client.fetch_folder_emails(
- folder="INBOX",
- max_results=10,
- days_back=30, # Fallback if no database date
- db_session=db,
- account_email=account_email,
- )
-
- print(f"Fetched {len(emails)} emails")
-
- # Display first few emails
- for i, email in enumerate(emails[:3]):
- print(f"\nEmail {i + 1}:")
- print(f" Subject: {email.get('subject', 'No subject')}")
- print(f" From: {email.get('from', 'Unknown')}")
- print(f" Date: {email.get('date', 'Unknown')}")
-
- client.close()
-
- except Exception as e:
- print(f"Error: {e}")
- finally:
- db.close()
-
-
-if __name__ == "__main__":
- test_fetch_with_database()
diff --git a/test_groq_api.py b/test_groq_api.py
deleted file mode 100644
index 6c2b8bc..0000000
--- a/test_groq_api.py
+++ /dev/null
@@ -1,117 +0,0 @@
-#!/usr/bin/env python3
-"""
-Test script to validate Groq API configuration and diagnose issues.
-"""
-
-import os
-import sys
-
-# Add the src directory to the path
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
-
-from ai import analyze_thread
-
-
-def test_groq_api():
- """Test the Groq API with a simple request."""
-
- print("Testing Groq API Configuration")
- print("=" * 40)
-
- # Check API key
- api_key = os.getenv("GROQ_API_KEY")
- if not api_key:
- print("❌ GROQ_API_KEY environment variable not set")
- print(" Please set your Groq API key: export GROQ_API_KEY='your-key-here'")
- return False
-
- print(f"✓ GROQ_API_KEY found (length: {len(api_key)})")
-
- # Check model
- model = os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile")
- print(f"✓ Using model: {model}")
-
- # Test with simple data
- print("\nTesting simple analysis...")
-
- simple_messages = [
- {
- "date_sent": "2025-08-11 10:00:00",
- "is_incoming": True,
- "subject": "Test Question",
- "from_email": "test@example.com",
- "to_email": "user@example.com",
- "body": "Can you help me with this issue? Please let me know.",
- }
- ]
-
- try:
- result = analyze_thread("Test Question", simple_messages)
- print("✓ Analysis successful!")
- print(f" Model used: {result.get('model', 'unknown')}")
- print(f" Actionable: {result.get('actionable', False)}")
- print(f" Confidence: {result.get('confidence', 0)}")
- print(f" Summary: {result.get('summary', 'No summary')[:100]}...")
-
- if result.get("model") == "heuristic":
- print("\n⚠️ Note: Fell back to heuristic analysis")
- print(" This might indicate an API issue")
-
- return True
-
- except Exception as e:
- print(f"❌ Analysis failed: {e}")
- import traceback
-
- traceback.print_exc()
- return False
-
-
-def test_rate_limiting():
- """Test rate limiting by making multiple quick requests."""
-
- print("\nTesting Rate Limiting")
- print("=" * 40)
-
- import time
-
- simple_messages = [
- {
- "date_sent": "2025-08-11 10:00:00",
- "is_incoming": True,
- "subject": "Quick test",
- "from_email": "test@example.com",
- "to_email": "user@example.com",
- "body": "Quick test message.",
- }
- ]
-
- start_time = time.time()
-
- for i in range(3):
- print(f"Request {i + 1}...")
- result = analyze_thread(f"Test {i + 1}", simple_messages)
- print(f" Result: {result.get('model', 'unknown')} analysis")
-
- total_time = time.time() - start_time
- print(f"\nTotal time for 3 requests: {total_time:.2f} seconds")
-
- if total_time < 2.5:
- print("⚠️ Requests completed very quickly - rate limiting may not be working")
- else:
- print("✓ Rate limiting appears to be working")
-
-
-if __name__ == "__main__":
- success = test_groq_api()
-
- if success:
- test_rate_limiting()
-
- print("\nTroubleshooting Tips:")
- print("- If getting 400 errors: Check message content for special characters")
- print("- If getting 401 errors: Verify GROQ_API_KEY is correct")
- print(
- "- If getting 429 errors: Reduce max_concurrent in analyze_and_update_threads()"
- )
- print("- If getting 503 errors: Groq service may be temporarily unavailable")
diff --git a/test_single_analysis.py b/test_single_analysis.py
deleted file mode 100644
index 587adda..0000000
--- a/test_single_analysis.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#!/usr/bin/env python3
-"""
-Simple test to debug the AI analysis function.
-"""
-
-import os
-import sys
-
-# Add the src directory to the path
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
-
-from ai import analyze_thread
-from database import SessionLocal, Thread, get_thread_messages
-
-
-def test_single_analysis():
- """Test analyzing a single thread."""
-
- db = SessionLocal()
- try:
- # Get a thread to test with
- thread = (
- db.query(Thread)
- .filter(Thread.account_email == "test-user@company.com")
- .first()
- )
-
- if not thread:
- print("No threads found for test-user@company.com")
- return
-
- print(f"Testing thread {thread.id}: {thread.subject}")
- print(
- f"Before analysis: actionable={thread.actionable}, confidence={thread.ai_confidence}"
- )
-
- # Get messages
- messages = get_thread_messages(db, thread.id)
- print(f"Found {len(messages)} messages")
-
- # Convert to dict format
- message_dicts = []
- for msg in messages:
- message_dicts.append(
- {
- "date_sent": msg.date_sent,
- "is_incoming": msg.is_incoming,
- "subject": msg.subject,
- "from_email": msg.from_email,
- "to_email": msg.to_email,
- "body": msg.body,
- }
- )
-
- # Run analysis
- print("Running AI analysis...")
- analysis_result = analyze_thread(thread.subject, message_dicts)
- print(f"Analysis result: {analysis_result}")
-
- # Update thread
- thread.actionable = analysis_result.get("actionable", False)
- thread.ai_summary = analysis_result.get("summary", "")
- thread.ai_confidence = float(analysis_result.get("confidence", 0.0))
- from datetime import datetime
-
- thread.last_analyzed_at = datetime.now()
-
- print(
- f"Updated thread: actionable={thread.actionable}, confidence={thread.ai_confidence}"
- )
- print(f"Summary: {thread.ai_summary}")
-
- # Commit changes
- db.commit()
- print("Changes committed")
-
- # Verify the changes
- db.refresh(thread)
- print(
- f"After refresh: actionable={thread.actionable}, confidence={thread.ai_confidence}"
- )
-
- except Exception as e:
- print(f"Error: {e}")
- import traceback
-
- traceback.print_exc()
- db.rollback()
- finally:
- db.close()
-
-
-if __name__ == "__main__":
- test_single_analysis()