Refactor AI analysis and email fetching functionality

- Removed AI analysis guide and related documentation files.
- Updated email fetching logic to intelligently determine start date based on the latest email date in the database.
- Enhanced the app module to include a button for re-analyzing threads with AI.
- Improved database interactions to trigger AI analysis after ingesting sent emails.
- Adjusted the UI to display additional information about threads, including formatted latest message dates.
- Removed outdated test scripts for AI analysis and email fetching.
- Updated styles for better responsiveness and layout in the frontend.
This commit is contained in:
bolade
2025-08-12 09:54:10 +01:00
parent 75a0a3fde7
commit 1fd3a95093
14 changed files with 167 additions and 923 deletions
-164
View File
@@ -1,164 +0,0 @@
# AI Thread Analysis with Asyncio
This document explains how to use the new async AI analysis functionality for email threads.
## Overview
The new functionality adds AI-powered analysis to email threads, determining if they require attention (are "actionable") and generating concise summaries. It uses asyncio to process multiple threads concurrently for better performance.
## Key Functions
### `analyze_and_update_threads()`
This is the main function you'll use to analyze threads.
```python
from src.database import analyze_and_update_threads
# Analyze all unanalyzed threads for an account
analyze_and_update_threads(
account_email="user@company.com",
max_concurrent=5,
only_unanalyzed=True
)
# Analyze specific threads
analyze_and_update_threads(
account_email="user@company.com",
thread_ids=[1, 2, 3],
max_concurrent=3
)
```
**Parameters:**
- `account_email`: The email account to process
- `thread_ids`: Optional list of specific thread IDs to analyze
- `max_concurrent`: Maximum number of concurrent AI analysis tasks (default: 5)
- `only_unanalyzed`: If True, only analyze threads that haven't been analyzed yet (default: True)
### `get_threads_needing_analysis()`
Check which threads need analysis:
```python
from src.database import get_threads_needing_analysis, SessionLocal
db = SessionLocal()
threads = get_threads_needing_analysis(db, "user@company.com")
print(f"Found {len(threads)} threads needing analysis")
db.close()
```
## Database Schema Updates
The function updates the following Thread model fields:
- `actionable`: Boolean indicating if the thread requires action
- `ai_summary`: Text summary of the thread content
- `ai_confidence`: Float (0.0-1.0) confidence score
- `last_analyzed_at`: Timestamp of when analysis was performed
## Complete Workflow Example
Here's a complete workflow from email ingestion to AI analysis:
```python
from src.database import (
SessionLocal,
ingest_emails,
analyze_and_update_threads,
get_threads_requiring_reply
)
# 1. Ingest emails (using your existing email fetching logic)
db = SessionLocal()
try:
# Assuming you have fetched emails from your email provider
emails = [...] # Your email data
ingest_emails(db, "user@company.com", emails)
# 2. Run AI analysis on new threads
analyze_and_update_threads(
account_email="user@company.com",
max_concurrent=5,
only_unanalyzed=True
)
# 3. Get threads that need replies and are actionable
reply_threads = get_threads_requiring_reply(db, "user@company.com")
actionable_threads = [t for t in reply_threads if t.actionable]
print(f"Found {len(actionable_threads)} actionable threads requiring replies")
finally:
db.close()
```
## AI Analysis Details
The AI analysis:
- Uses the Groq API if `GROQ_API_KEY` environment variable is set
- Falls back to heuristic analysis if Groq is unavailable
- Analyzes the last 4 messages in each thread by default
- Generates summaries of ≤80 words
- Identifies questions, requests, and actionable items
- Ignores automated/newsletter emails
## Performance
- Uses asyncio for concurrent processing
- Configurable concurrency limit (default: 5 concurrent analyses)
- AI analysis runs in thread pool to avoid blocking
- Efficient database operations with single commit per batch
## Error Handling
- Gracefully handles individual thread analysis failures
- Continues processing other threads if one fails
- Provides detailed error logging
- Automatically rolls back database changes on failure
## Usage Tips
1. **Start with small batches**: Use `max_concurrent=3` initially to avoid overwhelming the AI service
2. **Regular analysis**: Run analysis after each email ingestion cycle
3. **Focus on actionable threads**: Prioritize threads that are both `requires_reply=True` and `actionable=True`
4. **Monitor confidence scores**: Lower confidence may indicate uncertain analysis
5. **Environment setup**: Set `GROQ_API_KEY` for better AI analysis quality
## Testing
Use the provided test scripts:
```bash
# Test the complete workflow
python3 example_workflow.py
# Test single thread analysis
python3 test_single_analysis.py
# Reset analysis data for testing
python3 reset_analysis.py
```
## Integration with Existing Code
To integrate with your existing email processing:
```python
# After your existing email ingestion
from src.database import analyze_and_update_threads
def process_emails(account_email: str):
# Your existing email fetching and ingestion code
fetch_and_ingest_emails(account_email)
# Add AI analysis
analyze_and_update_threads(
account_email=account_email,
only_unanalyzed=True
)
```
This ensures that new threads are automatically analyzed for actionability after each email sync.
-79
View File
@@ -1,79 +0,0 @@
# Fetch Folder Emails Modification Summary
## Changes Made
### 1. Database Module (`database.py`)
- Added `get_latest_email_date()` function to retrieve the most recent email date for a given account and folder
- Added import for `datetime` to support the new function
### 2. ZohoClient Module (`zoho_client.py`)
- Modified `fetch_folder_emails()` to accept two new optional parameters:
- `db_session`: Database session for querying latest email dates
- `account_email`: Account email to identify which emails to check for latest date
- Updated logic to:
1. Check database for latest email date if db_session and account_email are provided
2. Use latest date (minus 1 minute buffer) as start date for IMAP search
3. Fall back to `days_back` parameter if no database date is available
- Updated `fetch_emails()` wrapper to support the new parameters
- Enhanced documentation with detailed parameter descriptions
### 3. App Module (`app.py`)
- Modified email fetching calls to pass database session and account email
- Reorganized code to create database session before fetching emails
## How It Works
### Before
- Always fetched emails starting from X days back
- No awareness of what emails were already in the database
- Could result in fetching duplicate emails or missing recent ones
### After
- Intelligently determines start date based on database contents
- If emails exist in database: starts from the latest email date
- If no emails in database: falls back to `days_back` parameter
- Adds 1-minute buffer to avoid missing emails with same timestamp
- Reduces unnecessary email fetching and improves efficiency
## Usage Examples
### Basic Usage (Backwards Compatible)
```python
client = ZohoClient()
emails = client.fetch_folder_emails(folder="INBOX", days_back=7)
```
### With Database Integration (Recommended)
```python
from database import SessionLocal
db = SessionLocal()
client = ZohoClient()
emails = client.fetch_folder_emails(
folder="INBOX",
days_back=30, # Fallback only
db_session=db,
account_email="user@example.com"
)
db.close()
```
## Benefits
1. **Efficiency**: Only fetches new emails since last sync
2. **Reliability**: Ensures no emails are missed between syncs
3. **Backwards Compatibility**: Still works without database parameters
4. **Flexibility**: Falls back gracefully when database is empty
5. **Performance**: Reduces IMAP server load and processing time
## Testing
Use `test_fetch_with_db.py` to test the new functionality:
```bash
python test_fetch_with_db.py
```
The test script demonstrates:
- Checking latest email date in database
- Fetching emails with database integration
- Displaying results
-204
View File
@@ -1,204 +0,0 @@
#!/usr/bin/env python3
"""
Complete workflow example: Ingest emails and then analyze them with AI.
This demonstrates the full pipeline from email ingestion to AI analysis.
"""
import os
import sys
from datetime import datetime, timedelta
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
from database import (
Message,
SessionLocal,
Thread,
analyze_and_update_threads,
create_db_tables,
get_threads_needing_analysis,
ingest_emails,
)
def create_sample_emails(account_email: str) -> list:
"""Create some sample email data for testing."""
now = datetime.now() # Using datetime.now() instead of utcnow()
sample_emails = [
{
"messageId": "msg001",
"subject": "Meeting Request - Project Review",
"from": "colleague@company.com",
"to": account_email,
"date": now - timedelta(days=2),
"body": "Hi, could we schedule a meeting to review the project progress? I'm available this week on Tuesday or Wednesday afternoon. Please let me know what works for you.",
"folder": "INBOX",
},
{
"messageId": "msg002",
"subject": "Re: Meeting Request - Project Review",
"from": account_email,
"to": "colleague@company.com",
"date": now - timedelta(days=1, hours=8),
"body": "Sure! Wednesday afternoon works for me. How about 2 PM in the conference room?",
"folder": "Sent",
"inReplyTo": "msg001",
},
{
"messageId": "msg003",
"subject": "Re: Meeting Request - Project Review",
"from": "colleague@company.com",
"to": account_email,
"date": now - timedelta(days=1, hours=6),
"body": "Perfect! See you Wednesday at 2 PM. Should I prepare anything specific for the meeting?",
"folder": "INBOX",
"inReplyTo": "msg002",
},
{
"messageId": "msg004",
"subject": "Weekly Newsletter - Company Updates",
"from": "no-reply@company.com",
"to": account_email,
"date": now - timedelta(hours=12),
"body": "Welcome to this week's company newsletter! Here are the latest updates: New office opening, Q3 results, upcoming events...",
"folder": "INBOX",
},
{
"messageId": "msg005",
"subject": "Urgent: Server Issue in Production",
"from": "ops-team@company.com",
"to": account_email,
"date": now - timedelta(hours=2),
"body": "We're experiencing a critical server issue in production. The application is currently down. Can you please help investigate? Login credentials are attached.",
"folder": "INBOX",
},
]
return sample_emails
def main():
"""Main function demonstrating the complete workflow."""
# Create database tables if they don't exist
create_db_tables()
# Example account email
account_email = "test-user@company.com"
print(f"Starting workflow for account: {account_email}")
print("=" * 50)
# Get a database session
db = SessionLocal()
try:
# Step 1: Ingest sample emails
print("Step 1: Ingesting sample emails...")
sample_emails = create_sample_emails(account_email)
ingest_emails(db, account_email, sample_emails)
print(f"✓ Ingested {len(sample_emails)} emails")
# Show what was ingested
threads = (
db.query(Thread).filter(Thread.account_email == account_email.lower()).all()
)
print(f"✓ Created {len(threads)} threads")
for thread in threads:
messages = db.query(Message).filter(Message.thread_id == thread.id).count()
print(f" - Thread {thread.id}: '{thread.subject}' ({messages} messages)")
print()
# Step 2: Check threads needing analysis
print("Step 2: Checking threads needing analysis...")
threads_needing_analysis = get_threads_needing_analysis(db, account_email)
print(f"✓ Found {len(threads_needing_analysis)} threads needing analysis")
if not threads_needing_analysis:
print("No threads need analysis.")
return
print()
# Step 3: Run AI analysis
print("Step 3: Running AI analysis...")
print(
"This will analyze threads to determine if they're actionable and generate summaries."
)
analyze_and_update_threads(
account_email=account_email, max_concurrent=3, only_unanalyzed=True
)
print("✓ AI analysis complete!")
print()
# Step 4: Show results
print("Step 4: Analysis Results")
print("-" * 30)
# Show results
analyzed_threads = (
db.query(Thread)
.filter(
Thread.account_email == account_email.lower(),
Thread.last_analyzed_at.isnot(None),
)
.all()
)
# Refresh the threads to get the latest data
for thread in analyzed_threads:
db.refresh(thread)
actionable_count = sum(1 for t in analyzed_threads if t.actionable)
print(f"Total analyzed threads: {len(analyzed_threads)}")
print(f"Actionable threads: {actionable_count}")
print(f"Non-actionable threads: {len(analyzed_threads) - actionable_count}")
print()
for thread in analyzed_threads:
confidence = thread.ai_confidence or 0.0
print(f"Thread {thread.id}: {thread.subject}")
print(f" 📧 Actionable: {'YES' if thread.actionable else 'No'}")
print(f" 🎯 Confidence: {confidence:.2f}")
print(f" 📝 Summary: {thread.ai_summary or 'No summary available'}")
print(f" 🕐 Analyzed: {thread.last_analyzed_at}")
print()
# Step 5: Show threads requiring replies
print("Step 5: Threads Requiring Reply")
print("-" * 30)
from database import get_threads_requiring_reply
reply_threads = get_threads_requiring_reply(db, account_email)
print(f"Threads requiring reply: {len(reply_threads)}")
for thread in reply_threads:
actionable_note = (
" (AI: Actionable)" if thread.actionable else " (AI: Not actionable)"
)
print(f" - {thread.subject}{actionable_note}")
if reply_threads:
print(
"\n💡 Tip: Focus on threads marked as both 'requiring reply' and 'AI: Actionable'"
)
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
main()
-40
View File
@@ -1,40 +0,0 @@
#!/usr/bin/env python3
"""
Reset AI analysis data for testing purposes.
"""
import os
import sys
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
from database import SessionLocal, Thread
def main():
"""Reset analysis data for all threads."""
db = SessionLocal()
try:
# Reset analysis data
threads = db.query(Thread).all()
for thread in threads:
thread.actionable = False
thread.ai_summary = None
thread.ai_confidence = None
thread.last_analyzed_at = None
db.commit()
print(f"Reset analysis data for {len(threads)} threads")
except Exception as e:
print(f"Error: {e}")
db.rollback()
finally:
db.close()
if __name__ == "__main__":
main()
+113 -34
View File
@@ -5,6 +5,7 @@ import os
from contextlib import suppress from contextlib import suppress
from typing import List from typing import List
from dotenv import load_dotenv
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
@@ -22,7 +23,6 @@ from src.database import (
ingest_emails, ingest_emails,
) )
from src.zoho_client import ZohoClient from src.zoho_client import ZohoClient
from dotenv import load_dotenv
load_dotenv() load_dotenv()
@@ -50,15 +50,63 @@ app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
def home(request: Request, db: Session = Depends(get_db), account: str | None = None): def home(request: Request, db: Session = Depends(get_db), account: str | None = None):
q = db.query(Thread).order_by(Thread.updated_at.desc()) from datetime import datetime
from sqlalchemy import func
# Subquery to get the latest message date for each thread
latest_message_subq = (
db.query(
Message.thread_id, func.max(Message.date_sent).label("latest_message_date")
)
.group_by(Message.thread_id)
.subquery()
)
# Main query joining threads with their latest message dates
q = (
db.query(Thread, latest_message_subq.c.latest_message_date)
.outerjoin(latest_message_subq, Thread.id == latest_message_subq.c.thread_id)
.order_by(latest_message_subq.c.latest_message_date.desc().nulls_last())
)
if account: if account:
q = q.filter(Thread.account_email == account.lower()) q = q.filter(Thread.account_email == account.lower())
threads = q.limit(100).all()
results = q.limit(100).all()
# Create threads with additional info including sequential frontend ID and formatted latest message date
threads_with_info = []
for i, (thread, latest_message_date) in enumerate(results, 1):
# Format the latest message date to 12-hour format with hours and minutes only
formatted_date = None
if latest_message_date:
if isinstance(latest_message_date, str):
try:
# Try parsing if it's a string
dt = datetime.fromisoformat(
latest_message_date.replace("Z", "+00:00")
)
formatted_date = dt.strftime("%d/%m/%y %I:%M %p")
except Exception:
formatted_date = latest_message_date
elif hasattr(latest_message_date, "strftime"):
# It's already a datetime object
formatted_date = latest_message_date.strftime("%d/%m/%y %I:%M %p")
thread_info = {
"frontend_id": i,
"thread": thread,
"latest_message_date": latest_message_date,
"formatted_date": formatted_date or "N/A",
}
threads_with_info.append(thread_info)
return templates.TemplateResponse( return templates.TemplateResponse(
"threads.html", "threads.html",
{ {
"request": request, "request": request,
"threads": threads, "threads": threads_with_info,
"account": account or "", "account": account or "",
"status": _status_for_templates(), "status": _status_for_templates(),
}, },
@@ -66,36 +114,67 @@ def home(request: Request, db: Session = Depends(get_db), account: str | None =
@app.get("/thread/{thread_id}", response_class=HTMLResponse) @app.get("/thread/{thread_id}", response_class=HTMLResponse)
def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)): def show_thread(
thread_id: int,
request: Request,
db: Session = Depends(get_db),
force_analyze: bool = False,
):
thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none() thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none()
if not thread: if not thread:
raise HTTPException(status_code=404, detail="Thread not found") raise HTTPException(status_code=404, detail="Thread not found")
messages: List[Message] = get_thread_messages(db, thread_id) messages: List[Message] = get_thread_messages(db, thread_id)
# Convert for AI analyzer and template
msg_dicts = [
{
"id": m.id,
"date_sent": m.date_sent,
"subject": m.subject,
"from_email": m.from_email,
"to_email": m.to_email,
"body": m.body,
"is_incoming": m.is_incoming,
}
for m in messages
]
ai = analyze_thread(thread.subject or "", msg_dicts)
# Save AI info on the thread for listing and downstream alerts
try:
from datetime import datetime, timezone
thread.actionable = bool(ai.get("actionable", False)) # Check if we should use existing AI analysis or perform a new one
thread.ai_summary = ai.get("summary") ai = None
thread.ai_confidence = ai.get("confidence") should_analyze = force_analyze or not thread.last_analyzed_at
thread.last_analyzed_at = datetime.now(timezone.utc)
db.commit() # Check if new messages have been added since last analysis
except Exception: if not should_analyze and thread.last_analyzed_at and messages:
pass # Check if any messages are newer than the last analysis
newest_message_date = max(m.created_at for m in messages)
if newest_message_date > thread.last_analyzed_at:
should_analyze = True
# If we have existing analysis and don't need to re-analyze, use it
if not should_analyze and thread.last_analyzed_at:
ai = {
"actionable": thread.actionable,
"summary": thread.ai_summary,
"confidence": thread.ai_confidence,
"analyzed_at": thread.last_analyzed_at.isoformat()
if thread.last_analyzed_at
else None,
}
# Only analyze if we don't have existing data, force_analyze is True, or there are new messages
if should_analyze:
# Convert for AI analyzer
msg_dicts = [
{
"id": m.id,
"date_sent": m.date_sent,
"subject": m.subject,
"from_email": m.from_email,
"to_email": m.to_email,
"body": m.body,
"is_incoming": m.is_incoming,
}
for m in messages
]
ai = analyze_thread(thread.subject or "", msg_dicts)
# Save AI info on the thread for listing and downstream alerts
try:
from datetime import datetime, timezone
thread.actionable = bool(ai.get("actionable", False))
thread.ai_summary = ai.get("summary")
thread.ai_confidence = ai.get("confidence")
thread.last_analyzed_at = datetime.now(timezone.utc)
db.commit()
except Exception:
pass
return templates.TemplateResponse( return templates.TemplateResponse(
"thread_detail.html", "thread_detail.html",
{ {
@@ -332,11 +411,11 @@ def _sync_emails_background_task():
cfg = load_config() # Reload in case it was modified cfg = load_config() # Reload in case it was modified
cfg.update( cfg.update(
{ {
"sync_in_progress": False, "sync_in_progress": False,
"last_sync_status": "success", "last_sync_status": "success",
"last_sync_at": datetime.now(timezone.utc).isoformat(), "last_sync_at": datetime.now(timezone.utc).strftime("%d/%m/%y %I:%M %p"),
"last_sync_count": count, "last_sync_count": count,
"last_sync_error": None, "last_sync_error": None,
} }
) )
save_config(cfg) save_config(cfg)
+5 -1
View File
@@ -350,7 +350,7 @@ def ingest_emails(
Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to. Expected fields per email dict: subject, from, date, snippet/body, messageId, optional inReplyTo, optional to.
""" """
from datetime import datetime from datetime import datetime
folder = default_folder
for e in emails: for e in emails:
# Map common keys from ZohoClient output # Map common keys from ZohoClient output
message_id = e.get("messageId") or e.get("id") message_id = e.get("messageId") or e.get("id")
@@ -393,6 +393,10 @@ def ingest_emails(
db.commit() db.commit()
if folder == "Sent":
analyze_and_update_threads(
account_email=account_email, max_concurrent=3, only_unanalyzed=True
)
def get_latest_email_date( def get_latest_email_date(
db: Session, account_email: str, folder: str = None db: Session, account_email: str, folder: str = None
+7 -3
View File
@@ -70,7 +70,7 @@ class ZohoClient:
# Determine the start date for fetching emails # Determine the start date for fetching emails
start_date = None start_date = None
first_time = True
# If database session and account email are provided, try to get the latest email date # If database session and account email are provided, try to get the latest email date
if db_session and account_email: if db_session and account_email:
try: try:
@@ -82,11 +82,13 @@ class ZohoClient:
if latest_date: if latest_date:
# Add a small buffer (1 minute) to avoid missing emails with same timestamp # Add a small buffer (1 minute) to avoid missing emails with same timestamp
start_date = latest_date - timedelta(minutes=1) start_date = latest_date - timedelta(minutes=1)
first_time = False
print(f"📅 Using latest email date from database: {start_date}") print(f"📅 Using latest email date from database: {start_date}")
else: else:
print( print(
f"📅 No emails found in database, using days_back: {days_back}" f"📅 No emails found in database, using days_back: {days_back}"
) )
latest_date = datetime.now(timezone.utc) + timedelta(days=1)
except Exception as e: except Exception as e:
print( print(
f"⚠️ Error getting latest date from database: {e}, falling back to days_back" f"⚠️ Error getting latest date from database: {e}, falling back to days_back"
@@ -130,16 +132,18 @@ class ZohoClient:
email_message = email.message_from_bytes(raw_email) email_message = email.message_from_bytes(raw_email)
date_header = email_message.get("Date", "") date_header = email_message.get("Date", "")
email_date = parse_email_date_safely(date_header) email_date = parse_email_date_safely(date_header)
print(f"📅 Email date: {email_date} Latest: {latest_date}")
# Ensure both dates are timezone-aware for comparison # Ensure both dates are timezone-aware for comparison
if email_date and latest_date: if email_date and latest_date:
print(f"📅 Email date: {email_date} Latest: {latest_date}")
# If latest_date is timezone-naive, make it timezone-aware (assume UTC) # If latest_date is timezone-naive, make it timezone-aware (assume UTC)
if latest_date.tzinfo is None: if latest_date.tzinfo is None:
latest_date = latest_date.replace(tzinfo=timezone.utc) latest_date = latest_date.replace(tzinfo=timezone.utc)
if email_date > latest_date: if (email_date > latest_date) or first_time:
# Extract headers # Extract headers
print(f"📅 Email date: {email_date} Latest: {latest_date}")
subject = self._decode_header(email_message.get("Subject", "")) subject = self._decode_header(email_message.get("Subject", ""))
from_header = self._decode_header(email_message.get("From", "")) from_header = self._decode_header(email_message.get("From", ""))
to_header = self._decode_header(email_message.get("To", "")) to_header = self._decode_header(email_message.get("To", ""))
+6 -6
View File
@@ -29,13 +29,13 @@ header {
} }
header .inner { header .inner {
display: flex; align-items: center; justify-content: space-between; display: flex; align-items: center; justify-content: space-between;
gap: 1rem; padding: 0.75rem 1rem; max-width: 1100px; margin: 0 auto; gap: 1rem; padding: 0.75rem 1rem; max-width: 1400px; margin: 0 auto;
} }
header h1 { font-size: 1.05rem; margin: 0; letter-spacing: 0.4px; } header h1 { font-size: 1.05rem; margin: 0; letter-spacing: 0.4px; }
nav a { text-decoration: none; color: var(--muted); margin-left: 0.75rem; } nav a { text-decoration: none; color: var(--muted); margin-left: 0.75rem; }
nav a:hover { color: var(--text); } nav a:hover { color: var(--text); }
.container { max-width: 1100px; margin: 1.25rem auto; padding: 0 1rem; } .container { max-width: 1400px; margin: 1.25rem auto; padding: 0 1rem; }
h2 { margin: 0.25rem 0 0.75rem; font-size: 1.2rem; } h2 { margin: 0.25rem 0 0.75rem; font-size: 1.2rem; }
h3 { margin: 0.5rem 0 0.5rem; font-size: 1.05rem; color: var(--muted); } h3 { margin: 0.5rem 0 0.5rem; font-size: 1.05rem; color: var(--muted); }
@@ -54,10 +54,10 @@ h3 { margin: 0.5rem 0 0.5rem; font-size: 1.05rem; color: var(--muted); }
.badge.danger { color: #4b0a0a; background: #fee2e2; border-color: #fca5a5; } .badge.danger { color: #4b0a0a; background: #fee2e2; border-color: #fca5a5; }
.badge.brand { color: #0a2a62; background: #dbe8ff; border-color: #9fc0ff; } .badge.brand { color: #0a2a62; background: #dbe8ff; border-color: #9fc0ff; }
.table-wrap { overflow-x: auto; } .table-wrap { overflow-x: auto; margin: 0 -1rem; }
table { border-collapse: collapse; width: 100%; font-size: 0.95rem; } table { border-collapse: collapse; width: 100%; font-size: 0.95rem; min-width: 800px; }
th, td { border: 1px solid var(--border); padding: 10px; vertical-align: top; } th, td { border: 1px solid var(--border); padding: 12px 15px; vertical-align: top; }
th { background: #0f152a; color: var(--muted); text-align: left; position: sticky; top: 0; } th { background: #0f152a; color: var(--muted); text-align: left; position: sticky; top: 0; font-weight: 600; }
tbody tr:hover { background: #0e1426; } tbody tr:hover { background: #0e1426; }
pre, code { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; } pre, code { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; }
+19 -8
View File
@@ -12,6 +12,9 @@
<span class="badge success">Up to date</span> <span class="badge success">Up to date</span>
{% endif %} {% endif %}
</p> </p>
<div class="mt-2">
<a href="/thread/{{ thread.id }}?force_analyze=true" class="btn btn-sm">🔄 Re-analyze with AI</a>
</div>
</div> </div>
</div> </div>
@@ -19,14 +22,22 @@
<div class="col"> <div class="col">
<div class="card"> <div class="card">
<h3>AI Analysis</h3> <h3>AI Analysis</h3>
<p> {% if ai %}
<strong>Actionable:</strong> <p>
{% if ai.actionable %}<span class="badge warn">Yes</span>{% else %}<span class="badge success">No</span>{% endif %} <strong>Actionable:</strong>
</p> {% if ai.actionable %}<span class="badge warn">Yes</span>{% else %}<span class="badge success">No</span>{% endif %}
<p><strong>Summary:</strong> {{ ai.summary or thread.ai_summary }}</p> </p>
<p class="muted">Confidence: {{ ai.confidence }} • Model: {{ ai.model }}</p> <p><strong>Summary:</strong> {{ ai.summary or "No summary available" }}</p>
<p class="muted">Last stored: {{ thread.last_analyzed_at }}</p> <p class="muted">Confidence: {{ ai.confidence or "N/A" }}{% if ai.model %} • Model: {{ ai.model }}{% endif %}</p>
<p class="muted">Last alert level sent: {{ thread.last_alert_level_sent }} at {{ thread.last_alert_sent_at }}</p> {% if ai.analyzed_at %}
<p class="muted">Analysis cached from: {{ ai.analyzed_at }}</p>
{% elif thread.last_analyzed_at %}
<p class="muted">Last analyzed: {{ thread.last_analyzed_at }}</p>
{% endif %}
{% else %}
<p class="muted">No AI analysis available</p>
{% endif %}
<p class="muted">Last alert level sent: {{ thread.last_alert_level_sent }}{% if thread.last_alert_sent_at %} at {{ thread.last_alert_sent_at }}{% endif %}</p>
</div> </div>
</div> </div>
</div> </div>
+17 -17
View File
@@ -30,38 +30,38 @@
</div> </div>
</div> </div>
<div class="card table-wrap"> <div class="card table-wrap" style="width: 100%; max-width: none;">
<table> <table style="width: 100%; table-layout: auto;">
<thead> <thead>
<tr> <tr>
<th>ID</th> <th style="width: 5%;">ID</th>
<th>Subject</th> <th style="width: 25%;">Subject</th>
<th>AI Summary</th> <th style="width: 25%;">AI Summary</th>
<th>Account</th> <th style="width: 10%;">Account</th>
<th>Msgs</th> <th style="width: 5%;">Msgs</th>
<th>Requires Reply</th> <th style="width: 15%;">Requires Reply</th>
<th>Updated</th> <th style="width: 15%;">Last Message</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{% for t in threads %} {% for t in threads %}
<tr> <tr>
<td><a href="/thread/{{ t.id }}">{{ t.id }}</a></td> <td><a href="/thread/{{ t.thread.id }}">{{ t.frontend_id }}</a></td>
<td>{{ t.subject }}</td> <td>{{ t.thread.subject }}</td>
<td class="muted">{{ t.ai_summary or '' }}</td> <td class="muted">{{ t.thread.ai_summary or '' }}</td>
<td><span class="badge">{{ t.account_email }}</span></td> <td><span class="badge">{{ t.thread.account_email }}</span></td>
<td><span class="badge brand">{{ t.messages|length }}</span></td> <td><span class="badge brand">{{ t.thread.messages|length }}</span></td>
<td> <td>
{% if t.requires_reply %} {% if t.thread.requires_reply %}
<span class="badge warn">Needs reply</span> <span class="badge warn">Needs reply</span>
{% else %} {% else %}
<span class="badge success">Up to date</span> <span class="badge success">Up to date</span>
{% endif %} {% endif %}
</td> </td>
<td class="muted">{{ t.updated_at }}</td> <td class="muted">{{ t.formatted_date }}</td>
</tr> </tr>
{% else %} {% else %}
<tr><td colspan="6">No threads yet</td></tr> <tr><td colspan="7">No threads yet</td></tr>
{% endfor %} {% endfor %}
</tbody> </tbody>
</table> </table>
-99
View File
@@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""
Test script to demonstrate the AI analysis functionality.
This script shows how to use the new async thread analysis function.
"""
import os
import sys
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
from database import (
SessionLocal,
Thread,
analyze_and_update_threads,
create_db_tables,
get_threads_needing_analysis,
)
def main():
"""Main function to test the AI analysis."""
# Create database tables if they don't exist
create_db_tables()
# Example account email - replace with your actual account
account_email = "your-email@example.com"
# Get a database session
db = SessionLocal()
try:
# Check how many threads need analysis
threads_needing_analysis = get_threads_needing_analysis(db, account_email)
print(
f"Found {len(threads_needing_analysis)} threads needing analysis for {account_email}"
)
if not threads_needing_analysis:
print(
"No threads need analysis. Make sure you have ingested some emails first."
)
# Show all threads for this account
all_threads = (
db.query(Thread)
.filter(Thread.account_email == account_email.lower())
.all()
)
print(f"Total threads for {account_email}: {len(all_threads)}")
if all_threads:
print("Sample threads:")
for thread in all_threads[:3]:
print(
f" Thread {thread.id}: {thread.subject} (analyzed: {thread.last_analyzed_at is not None})"
)
return
print(f"Starting AI analysis for {len(threads_needing_analysis)} threads...")
# Run the analysis with max 3 concurrent tasks
analyze_and_update_threads(
account_email=account_email, max_concurrent=3, only_unanalyzed=True
)
print("Analysis complete!")
# Show results
analyzed_threads = (
db.query(Thread)
.filter(
Thread.account_email == account_email.lower(),
Thread.last_analyzed_at.isnot(None),
)
.all()
)
print(f"\nAnalyzed {len(analyzed_threads)} threads:")
for thread in analyzed_threads[:5]: # Show first 5
print(f" Thread {thread.id}: {thread.subject[:50]}...")
print(f" Actionable: {thread.actionable}")
print(f" Confidence: {thread.ai_confidence:.2f}")
print(f" Summary: {thread.ai_summary[:100]}...")
print()
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
main()
-57
View File
@@ -1,57 +0,0 @@
#!/usr/bin/env python3
"""
Test script to demonstrate the modified fetch_folder_emails function
that uses database dates for intelligent email fetching.
"""
from src.database import SessionLocal, get_latest_email_date
from src.zoho_client import ZohoClient
def test_fetch_with_database():
"""Test the modified fetch function with database integration."""
# Create database session
db = SessionLocal()
try:
# Create Zoho client
client = ZohoClient()
# Example account email (replace with actual)
account_email = client.email
print("=== Testing fetch_folder_emails with database integration ===")
# Check what's the latest date in database
latest_date = get_latest_email_date(db, account_email, "INBOX")
print(f"Latest email date in database: {latest_date}")
# Fetch emails using the new function
emails = client.fetch_folder_emails(
folder="INBOX",
max_results=10,
days_back=30, # Fallback if no database date
db_session=db,
account_email=account_email,
)
print(f"Fetched {len(emails)} emails")
# Display first few emails
for i, email in enumerate(emails[:3]):
print(f"\nEmail {i + 1}:")
print(f" Subject: {email.get('subject', 'No subject')}")
print(f" From: {email.get('from', 'Unknown')}")
print(f" Date: {email.get('date', 'Unknown')}")
client.close()
except Exception as e:
print(f"Error: {e}")
finally:
db.close()
if __name__ == "__main__":
test_fetch_with_database()
-117
View File
@@ -1,117 +0,0 @@
#!/usr/bin/env python3
"""
Test script to validate Groq API configuration and diagnose issues.
"""
import os
import sys
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
from ai import analyze_thread
def test_groq_api():
"""Test the Groq API with a simple request."""
print("Testing Groq API Configuration")
print("=" * 40)
# Check API key
api_key = os.getenv("GROQ_API_KEY")
if not api_key:
print("❌ GROQ_API_KEY environment variable not set")
print(" Please set your Groq API key: export GROQ_API_KEY='your-key-here'")
return False
print(f"✓ GROQ_API_KEY found (length: {len(api_key)})")
# Check model
model = os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile")
print(f"✓ Using model: {model}")
# Test with simple data
print("\nTesting simple analysis...")
simple_messages = [
{
"date_sent": "2025-08-11 10:00:00",
"is_incoming": True,
"subject": "Test Question",
"from_email": "test@example.com",
"to_email": "user@example.com",
"body": "Can you help me with this issue? Please let me know.",
}
]
try:
result = analyze_thread("Test Question", simple_messages)
print("✓ Analysis successful!")
print(f" Model used: {result.get('model', 'unknown')}")
print(f" Actionable: {result.get('actionable', False)}")
print(f" Confidence: {result.get('confidence', 0)}")
print(f" Summary: {result.get('summary', 'No summary')[:100]}...")
if result.get("model") == "heuristic":
print("\n⚠️ Note: Fell back to heuristic analysis")
print(" This might indicate an API issue")
return True
except Exception as e:
print(f"❌ Analysis failed: {e}")
import traceback
traceback.print_exc()
return False
def test_rate_limiting():
"""Test rate limiting by making multiple quick requests."""
print("\nTesting Rate Limiting")
print("=" * 40)
import time
simple_messages = [
{
"date_sent": "2025-08-11 10:00:00",
"is_incoming": True,
"subject": "Quick test",
"from_email": "test@example.com",
"to_email": "user@example.com",
"body": "Quick test message.",
}
]
start_time = time.time()
for i in range(3):
print(f"Request {i + 1}...")
result = analyze_thread(f"Test {i + 1}", simple_messages)
print(f" Result: {result.get('model', 'unknown')} analysis")
total_time = time.time() - start_time
print(f"\nTotal time for 3 requests: {total_time:.2f} seconds")
if total_time < 2.5:
print("⚠️ Requests completed very quickly - rate limiting may not be working")
else:
print("✓ Rate limiting appears to be working")
if __name__ == "__main__":
success = test_groq_api()
if success:
test_rate_limiting()
print("\nTroubleshooting Tips:")
print("- If getting 400 errors: Check message content for special characters")
print("- If getting 401 errors: Verify GROQ_API_KEY is correct")
print(
"- If getting 429 errors: Reduce max_concurrent in analyze_and_update_threads()"
)
print("- If getting 503 errors: Groq service may be temporarily unavailable")
-94
View File
@@ -1,94 +0,0 @@
#!/usr/bin/env python3
"""
Simple test to debug the AI analysis function.
"""
import os
import sys
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
from ai import analyze_thread
from database import SessionLocal, Thread, get_thread_messages
def test_single_analysis():
"""Test analyzing a single thread."""
db = SessionLocal()
try:
# Get a thread to test with
thread = (
db.query(Thread)
.filter(Thread.account_email == "test-user@company.com")
.first()
)
if not thread:
print("No threads found for test-user@company.com")
return
print(f"Testing thread {thread.id}: {thread.subject}")
print(
f"Before analysis: actionable={thread.actionable}, confidence={thread.ai_confidence}"
)
# Get messages
messages = get_thread_messages(db, thread.id)
print(f"Found {len(messages)} messages")
# Convert to dict format
message_dicts = []
for msg in messages:
message_dicts.append(
{
"date_sent": msg.date_sent,
"is_incoming": msg.is_incoming,
"subject": msg.subject,
"from_email": msg.from_email,
"to_email": msg.to_email,
"body": msg.body,
}
)
# Run analysis
print("Running AI analysis...")
analysis_result = analyze_thread(thread.subject, message_dicts)
print(f"Analysis result: {analysis_result}")
# Update thread
thread.actionable = analysis_result.get("actionable", False)
thread.ai_summary = analysis_result.get("summary", "")
thread.ai_confidence = float(analysis_result.get("confidence", 0.0))
from datetime import datetime
thread.last_analyzed_at = datetime.now()
print(
f"Updated thread: actionable={thread.actionable}, confidence={thread.ai_confidence}"
)
print(f"Summary: {thread.ai_summary}")
# Commit changes
db.commit()
print("Changes committed")
# Verify the changes
db.refresh(thread)
print(
f"After refresh: actionable={thread.actionable}, confidence={thread.ai_confidence}"
)
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
db.rollback()
finally:
db.close()
if __name__ == "__main__":
test_single_analysis()