diff --git a/.gitignore b/.gitignore index 697f158..16cd419 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ config.json *__pycache__/ *.db -*.txt \ No newline at end of file +*.txt +*.log \ No newline at end of file diff --git a/AI_ANALYSIS_GUIDE.md b/AI_ANALYSIS_GUIDE.md new file mode 100644 index 0000000..a7f8577 --- /dev/null +++ b/AI_ANALYSIS_GUIDE.md @@ -0,0 +1,164 @@ +# AI Thread Analysis with Asyncio + +This document explains how to use the new async AI analysis functionality for email threads. + +## Overview + +The new functionality adds AI-powered analysis to email threads, determining if they require attention (are "actionable") and generating concise summaries. It uses asyncio to process multiple threads concurrently for better performance. + +## Key Functions + +### `analyze_and_update_threads()` + +This is the main function you'll use to analyze threads. + +```python +from src.database import analyze_and_update_threads + +# Analyze all unanalyzed threads for an account +analyze_and_update_threads( + account_email="user@company.com", + max_concurrent=5, + only_unanalyzed=True +) + +# Analyze specific threads +analyze_and_update_threads( + account_email="user@company.com", + thread_ids=[1, 2, 3], + max_concurrent=3 +) +``` + +**Parameters:** +- `account_email`: The email account to process +- `thread_ids`: Optional list of specific thread IDs to analyze +- `max_concurrent`: Maximum number of concurrent AI analysis tasks (default: 5) +- `only_unanalyzed`: If True, only analyze threads that haven't been analyzed yet (default: True) + +### `get_threads_needing_analysis()` + +Check which threads need analysis: + +```python +from src.database import get_threads_needing_analysis, SessionLocal + +db = SessionLocal() +threads = get_threads_needing_analysis(db, "user@company.com") +print(f"Found {len(threads)} threads needing analysis") +db.close() +``` + +## Database Schema Updates + +The function updates the following Thread model fields: + +- `actionable`: Boolean indicating if the thread requires action +- `ai_summary`: Text summary of the thread content +- `ai_confidence`: Float (0.0-1.0) confidence score +- `last_analyzed_at`: Timestamp of when analysis was performed + +## Complete Workflow Example + +Here's a complete workflow from email ingestion to AI analysis: + +```python +from src.database import ( + SessionLocal, + ingest_emails, + analyze_and_update_threads, + get_threads_requiring_reply +) + +# 1. Ingest emails (using your existing email fetching logic) +db = SessionLocal() +try: + # Assuming you have fetched emails from your email provider + emails = [...] # Your email data + ingest_emails(db, "user@company.com", emails) + + # 2. Run AI analysis on new threads + analyze_and_update_threads( + account_email="user@company.com", + max_concurrent=5, + only_unanalyzed=True + ) + + # 3. Get threads that need replies and are actionable + reply_threads = get_threads_requiring_reply(db, "user@company.com") + actionable_threads = [t for t in reply_threads if t.actionable] + + print(f"Found {len(actionable_threads)} actionable threads requiring replies") + +finally: + db.close() +``` + +## AI Analysis Details + +The AI analysis: + +- Uses the Groq API if `GROQ_API_KEY` environment variable is set +- Falls back to heuristic analysis if Groq is unavailable +- Analyzes the last 4 messages in each thread by default +- Generates summaries of ≤80 words +- Identifies questions, requests, and actionable items +- Ignores automated/newsletter emails + +## Performance + +- Uses asyncio for concurrent processing +- Configurable concurrency limit (default: 5 concurrent analyses) +- AI analysis runs in thread pool to avoid blocking +- Efficient database operations with single commit per batch + +## Error Handling + +- Gracefully handles individual thread analysis failures +- Continues processing other threads if one fails +- Provides detailed error logging +- Automatically rolls back database changes on failure + +## Usage Tips + +1. **Start with small batches**: Use `max_concurrent=3` initially to avoid overwhelming the AI service +2. **Regular analysis**: Run analysis after each email ingestion cycle +3. **Focus on actionable threads**: Prioritize threads that are both `requires_reply=True` and `actionable=True` +4. **Monitor confidence scores**: Lower confidence may indicate uncertain analysis +5. **Environment setup**: Set `GROQ_API_KEY` for better AI analysis quality + +## Testing + +Use the provided test scripts: + +```bash +# Test the complete workflow +python3 example_workflow.py + +# Test single thread analysis +python3 test_single_analysis.py + +# Reset analysis data for testing +python3 reset_analysis.py +``` + +## Integration with Existing Code + +To integrate with your existing email processing: + +```python +# After your existing email ingestion +from src.database import analyze_and_update_threads + +def process_emails(account_email: str): + # Your existing email fetching and ingestion code + fetch_and_ingest_emails(account_email) + + # Add AI analysis + analyze_and_update_threads( + account_email=account_email, + only_unanalyzed=True + ) +``` + +This ensures that new threads are automatically analyzed for actionability after each email sync. diff --git a/FETCH_MODIFICATION_SUMMARY.md b/FETCH_MODIFICATION_SUMMARY.md new file mode 100644 index 0000000..b7b3a4a --- /dev/null +++ b/FETCH_MODIFICATION_SUMMARY.md @@ -0,0 +1,79 @@ +# Fetch Folder Emails Modification Summary + +## Changes Made + +### 1. Database Module (`database.py`) +- Added `get_latest_email_date()` function to retrieve the most recent email date for a given account and folder +- Added import for `datetime` to support the new function + +### 2. ZohoClient Module (`zoho_client.py`) +- Modified `fetch_folder_emails()` to accept two new optional parameters: + - `db_session`: Database session for querying latest email dates + - `account_email`: Account email to identify which emails to check for latest date +- Updated logic to: + 1. Check database for latest email date if db_session and account_email are provided + 2. Use latest date (minus 1 minute buffer) as start date for IMAP search + 3. Fall back to `days_back` parameter if no database date is available +- Updated `fetch_emails()` wrapper to support the new parameters +- Enhanced documentation with detailed parameter descriptions + +### 3. App Module (`app.py`) +- Modified email fetching calls to pass database session and account email +- Reorganized code to create database session before fetching emails + +## How It Works + +### Before +- Always fetched emails starting from X days back +- No awareness of what emails were already in the database +- Could result in fetching duplicate emails or missing recent ones + +### After +- Intelligently determines start date based on database contents +- If emails exist in database: starts from the latest email date +- If no emails in database: falls back to `days_back` parameter +- Adds 1-minute buffer to avoid missing emails with same timestamp +- Reduces unnecessary email fetching and improves efficiency + +## Usage Examples + +### Basic Usage (Backwards Compatible) +```python +client = ZohoClient() +emails = client.fetch_folder_emails(folder="INBOX", days_back=7) +``` + +### With Database Integration (Recommended) +```python +from database import SessionLocal + +db = SessionLocal() +client = ZohoClient() +emails = client.fetch_folder_emails( + folder="INBOX", + days_back=30, # Fallback only + db_session=db, + account_email="user@example.com" +) +db.close() +``` + +## Benefits + +1. **Efficiency**: Only fetches new emails since last sync +2. **Reliability**: Ensures no emails are missed between syncs +3. **Backwards Compatibility**: Still works without database parameters +4. **Flexibility**: Falls back gracefully when database is empty +5. **Performance**: Reduces IMAP server load and processing time + +## Testing + +Use `test_fetch_with_db.py` to test the new functionality: +```bash +python test_fetch_with_db.py +``` + +The test script demonstrates: +- Checking latest email date in database +- Fetching emails with database integration +- Displaying results diff --git a/app.py b/app.py deleted file mode 100644 index 4ca02b1..0000000 --- a/app.py +++ /dev/null @@ -1,411 +0,0 @@ -import asyncio -import json -import os -from contextlib import suppress -from typing import List - -from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request -from fastapi.responses import HTMLResponse, RedirectResponse -from fastapi.staticfiles import StaticFiles -from fastapi.templating import Jinja2Templates -from sqlalchemy.orm import Session - -from ai import analyze_thread -from alerts import process_alerts -from database import ( - Message, - SessionLocal, - Thread, - create_db_tables, - get_thread_messages, - ingest_emails, -) -from zoho_client import ZohoClient - - -def get_db(): - db = SessionLocal() - try: - yield db - finally: - db.close() - - -create_db_tables() -app = FastAPI(title="Email Alerts UI") - -# Static and templates -os.makedirs("templates", exist_ok=True) -os.makedirs("static", exist_ok=True) -templates = Jinja2Templates(directory="templates") -app.mount("/static", StaticFiles(directory="static"), name="static") - - -@app.get("/", response_class=HTMLResponse) -def home(request: Request, db: Session = Depends(get_db), account: str | None = None): - q = db.query(Thread).order_by(Thread.updated_at.desc()) - if account: - q = q.filter(Thread.account_email == account.lower()) - threads = q.limit(100).all() - return templates.TemplateResponse( - "threads.html", - { - "request": request, - "threads": threads, - "account": account or "", - "status": _status_for_templates(), - }, - ) - - -@app.get("/thread/{thread_id}", response_class=HTMLResponse) -def show_thread(thread_id: int, request: Request, db: Session = Depends(get_db)): - thread = db.query(Thread).filter(Thread.id == thread_id).one_or_none() - if not thread: - raise HTTPException(status_code=404, detail="Thread not found") - messages: List[Message] = get_thread_messages(db, thread_id) - # Convert for AI analyzer and template - msg_dicts = [ - { - "id": m.id, - "date_sent": m.date_sent, - "subject": m.subject, - "from_email": m.from_email, - "to_email": m.to_email, - "body": m.body, - "is_incoming": m.is_incoming, - } - for m in messages - ] - ai = analyze_thread(thread.subject or "", msg_dicts) - # Save AI info on the thread for listing and downstream alerts - try: - from datetime import datetime, timezone - - thread.actionable = bool(ai.get("actionable", False)) - thread.ai_summary = ai.get("summary") - thread.ai_confidence = ai.get("confidence") - thread.last_analyzed_at = datetime.now(timezone.utc) - db.commit() - except Exception: - pass - return templates.TemplateResponse( - "thread_detail.html", - { - "request": request, - "thread": thread, - "messages": messages, - "ai": ai, - "status": _status_for_templates(), - }, - ) - - -@app.get("/health") -def health(): - return {"status": "ok"} - - -# --------------------- -# Config editor routes -# --------------------- - -CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json") - - -def load_config() -> dict: - if not os.path.exists(CONFIG_PATH): - return { - "email_address": "", - "time_frames": [ - {"name": "1-24 hours", "hours": 24, "alert_level": 1}, - {"name": "24-48 hours", "hours": 48, "alert_level": 2}, - {"name": "48+ hours", "hours": 72, "alert_level": 3}, - ], - "email_days_back": 7, - "agency_domains": [], - "zoho_email": "", - "zoho_app_password": "", - "whatsapp_to": "", - "auto_process": False, - "auto_process_interval": 30, - # Sync status - "sync_in_progress": False, - "last_sync_at": None, - "last_sync_count": 0, - "last_sync_status": "idle", - "last_sync_error": None, - } - with open(CONFIG_PATH, "r", encoding="utf-8") as f: - return json.load(f) - - -def save_config(cfg: dict) -> None: - with open(CONFIG_PATH, "w", encoding="utf-8") as f: - json.dump(cfg, f, indent=2, ensure_ascii=False) - - -def _status_for_templates() -> dict: - cfg = load_config() - return { - "auto_process": bool(cfg.get("auto_process")), - "interval": int(cfg.get("auto_process_interval", 30) or 30), - "sync_in_progress": bool(cfg.get("sync_in_progress")), - "last_sync_at": cfg.get("last_sync_at"), - "last_sync_status": cfg.get("last_sync_status", "idle"), - "last_sync_count": int(cfg.get("last_sync_count") or 0), - "last_sync_error": cfg.get("last_sync_error"), - } - - -@app.get("/config", response_class=HTMLResponse) -def config_form(request: Request, saved: int | None = None): - cfg = load_config() - # Render up to existing frames or at least 3 rows - frames = cfg.get("time_frames") or [] - min_rows = max(len(frames), 3) - return templates.TemplateResponse( - "config.html", - { - "request": request, - "cfg": cfg, - "rows": list(range(min_rows)), - "saved": bool(saved), - "status": _status_for_templates(), - }, - ) - - -@app.post("/config") -async def config_save(request: Request): - form = await request.form() - cfg = load_config() - - # Basic fields - cfg["email_address"] = (form.get("email_address") or "").strip() - # email_days_back - try: - cfg["email_days_back"] = int( - form.get("email_days_back") or cfg.get("email_days_back", 7) - ) - except Exception: - cfg["email_days_back"] = 7 - # agency domains (comma or newline separated) - domains_raw = (form.get("agency_domains") or "").replace("\r", "") - parts = [p.strip() for p in domains_raw.replace(",", "\n").split("\n") if p.strip()] - cfg["agency_domains"] = parts - # auto_process - cfg["auto_process"] = form.get("auto_process") == "on" - # auto_process_interval - try: - cfg["auto_process_interval"] = int( - form.get("auto_process_interval") or cfg.get("auto_process_interval", 30) - ) - except Exception: - cfg["auto_process_interval"] = 30 - - # Zoho (optional - note: current client reads env vars) - cfg["zoho_email"] = (form.get("zoho_email") or cfg.get("zoho_email", "")).strip() - cfg["zoho_app_password"] = ( - form.get("zoho_app_password") or cfg.get("zoho_app_password", "") - ).strip() - # WhatsApp destination - cfg["whatsapp_to"] = (form.get("whatsapp_to") or cfg.get("whatsapp_to", "")).strip() - - # Time frames: collect indexed rows - frames: list[dict] = [] - # find indices present - indices = set() - for k in form.keys(): - if k.startswith("time_name_"): - try: - indices.add(int(k.split("_")[-1])) - except Exception: - pass - for i in sorted(indices): - name = (form.get(f"time_name_{i}") or "").strip() - hrs_raw = form.get(f"time_hours_{i}") or "" - lvl_raw = form.get(f"time_alert_{i}") or "" - if not name and not hrs_raw and not lvl_raw: - continue - try: - hours = int(hrs_raw) - except Exception: - hours = 0 - try: - level = int(lvl_raw) - except Exception: - level = 0 - if name: - frames.append({"name": name, "hours": hours, "alert_level": level}) - if frames: - cfg["time_frames"] = frames - - save_config(cfg) - return RedirectResponse(url="/config?saved=1", status_code=303) - - -@app.post("/process") -def process(db: Session = Depends(get_db)): - cfg = load_config() - alerted = process_alerts(db, cfg) - return {"alerted_threads": alerted} - - -def _sync_emails_once(cfg: dict) -> int: - """Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count.""" - from datetime import datetime, timezone - - # Mark status - cfg["sync_in_progress"] = True - cfg["last_sync_status"] = "running" - cfg["last_sync_error"] = None - save_config(cfg) - - account_email = cfg.get("email_address") or cfg.get("zoho_email") or "" - if not account_email: - cfg.update( - { - "last_sync_status": "error", - "last_sync_error": "Configure email_address or zoho_email in /config", - "sync_in_progress": False, - } - ) - save_config(cfg) - raise HTTPException( - status_code=400, detail="Configure email_address or zoho_email in /config" - ) - - # Incremental lookback by last_sync_at - days_back = int(cfg.get("email_days_back", 7) or 7) - last_sync_at = cfg.get("last_sync_at") - if last_sync_at: - try: - last_dt = datetime.fromisoformat(last_sync_at) - now = datetime.now(timezone.utc) - delta_days = int(((now - last_dt).total_seconds() + 86399) // 86400) - days_back = max(1, delta_days) - except Exception: - pass - max_results = 100 - client = ZohoClient( - email=cfg.get("zoho_email") or account_email, - app_password=cfg.get("zoho_app_password"), - ) - try: - inbox = client.fetch_folder_emails( - folder="INBOX", max_results=max_results, days_back=days_back - ) - sent = client.fetch_folder_emails( - folder="Sent", max_results=max_results, days_back=days_back - ) - finally: - client.close() - - db = SessionLocal() - try: - ingest_emails( - db, account_email=account_email, emails=inbox, default_folder="INBOX" - ) - ingest_emails( - db, account_email=account_email, emails=sent, default_folder="Sent" - ) - # Update sync completion status and metrics - cfg.update( - { - "last_sync_at": datetime.now(timezone.utc).isoformat(), - "last_sync_status": "ok", - "last_sync_error": None, - "last_sync_count": int( - (len(inbox) if inbox else 0) + (len(sent) if sent else 0) - ), - "sync_in_progress": False, - } - ) - save_config(cfg) - return count - except Exception as e: - # Update error status - cfg.update( - { - "last_sync_status": "error", - "last_sync_error": str(e), - "sync_in_progress": False, - } - ) - save_config(cfg) - raise - finally: - db.close() - - -@app.post("/sync_emails") -def sync_emails(background_tasks: BackgroundTasks): - cfg = load_config() - if cfg.get("sync_in_progress"): - return RedirectResponse(url="/?sync=busy", status_code=303) - - def _background_sync(): - try: - _sync_emails_once(load_config()) - except Exception: - # Error handling is done inside _sync_emails_once - pass - - # Mark as starting and add background task - cfg["sync_in_progress"] = True - cfg["last_sync_status"] = "running" - cfg["last_sync_error"] = None - save_config(cfg) - - background_tasks.add_task(_background_sync) - return RedirectResponse(url="/?sync=started", status_code=303) - - -# --------------------- -# Auto-processing loop -# --------------------- -_auto_task = None -_stop_event = asyncio.Event() - - -async def _auto_runner(): - # Delay a bit on startup - await asyncio.sleep(2) - while not _stop_event.is_set(): - cfg = load_config() - interval_min = int(cfg.get("auto_process_interval", 30) or 30) - if cfg.get("auto_process"): - # Sync emails then run alerts - try: - if not cfg.get("sync_in_progress"): - _sync_emails_once(cfg) - except Exception: - # keep loop alive - pass - db = SessionLocal() - try: - process_alerts(db, cfg) - finally: - db.close() - try: - await asyncio.wait_for( - _stop_event.wait(), timeout=max(5, interval_min * 60) - ) - except asyncio.TimeoutError: - continue - - -@app.on_event("startup") -async def on_startup(): - global _auto_task - _stop_event.clear() - _auto_task = asyncio.create_task(_auto_runner()) - - -@app.on_event("shutdown") -async def on_shutdown(): - if _auto_task: - _stop_event.set() - with suppress(Exception): - await _auto_task diff --git a/example_workflow.py b/example_workflow.py new file mode 100644 index 0000000..a280bef --- /dev/null +++ b/example_workflow.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +""" +Complete workflow example: Ingest emails and then analyze them with AI. +This demonstrates the full pipeline from email ingestion to AI analysis. +""" + +import os +import sys +from datetime import datetime, timedelta + +# Add the src directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from database import ( + Message, + SessionLocal, + Thread, + analyze_and_update_threads, + create_db_tables, + get_threads_needing_analysis, + ingest_emails, +) + + +def create_sample_emails(account_email: str) -> list: + """Create some sample email data for testing.""" + now = datetime.now() # Using datetime.now() instead of utcnow() + + sample_emails = [ + { + "messageId": "msg001", + "subject": "Meeting Request - Project Review", + "from": "colleague@company.com", + "to": account_email, + "date": now - timedelta(days=2), + "body": "Hi, could we schedule a meeting to review the project progress? I'm available this week on Tuesday or Wednesday afternoon. Please let me know what works for you.", + "folder": "INBOX", + }, + { + "messageId": "msg002", + "subject": "Re: Meeting Request - Project Review", + "from": account_email, + "to": "colleague@company.com", + "date": now - timedelta(days=1, hours=8), + "body": "Sure! Wednesday afternoon works for me. How about 2 PM in the conference room?", + "folder": "Sent", + "inReplyTo": "msg001", + }, + { + "messageId": "msg003", + "subject": "Re: Meeting Request - Project Review", + "from": "colleague@company.com", + "to": account_email, + "date": now - timedelta(days=1, hours=6), + "body": "Perfect! See you Wednesday at 2 PM. Should I prepare anything specific for the meeting?", + "folder": "INBOX", + "inReplyTo": "msg002", + }, + { + "messageId": "msg004", + "subject": "Weekly Newsletter - Company Updates", + "from": "no-reply@company.com", + "to": account_email, + "date": now - timedelta(hours=12), + "body": "Welcome to this week's company newsletter! Here are the latest updates: New office opening, Q3 results, upcoming events...", + "folder": "INBOX", + }, + { + "messageId": "msg005", + "subject": "Urgent: Server Issue in Production", + "from": "ops-team@company.com", + "to": account_email, + "date": now - timedelta(hours=2), + "body": "We're experiencing a critical server issue in production. The application is currently down. Can you please help investigate? Login credentials are attached.", + "folder": "INBOX", + }, + ] + + return sample_emails + + +def main(): + """Main function demonstrating the complete workflow.""" + + # Create database tables if they don't exist + create_db_tables() + + # Example account email + account_email = "test-user@company.com" + + print(f"Starting workflow for account: {account_email}") + print("=" * 50) + + # Get a database session + db = SessionLocal() + try: + # Step 1: Ingest sample emails + print("Step 1: Ingesting sample emails...") + sample_emails = create_sample_emails(account_email) + + ingest_emails(db, account_email, sample_emails) + print(f"✓ Ingested {len(sample_emails)} emails") + + # Show what was ingested + threads = ( + db.query(Thread).filter(Thread.account_email == account_email.lower()).all() + ) + print(f"✓ Created {len(threads)} threads") + + for thread in threads: + messages = db.query(Message).filter(Message.thread_id == thread.id).count() + print(f" - Thread {thread.id}: '{thread.subject}' ({messages} messages)") + + print() + + # Step 2: Check threads needing analysis + print("Step 2: Checking threads needing analysis...") + threads_needing_analysis = get_threads_needing_analysis(db, account_email) + print(f"✓ Found {len(threads_needing_analysis)} threads needing analysis") + + if not threads_needing_analysis: + print("No threads need analysis.") + return + + print() + + # Step 3: Run AI analysis + print("Step 3: Running AI analysis...") + print( + "This will analyze threads to determine if they're actionable and generate summaries." + ) + + analyze_and_update_threads( + account_email=account_email, max_concurrent=3, only_unanalyzed=True + ) + + print("✓ AI analysis complete!") + print() + + # Step 4: Show results + print("Step 4: Analysis Results") + print("-" * 30) + + # Show results + analyzed_threads = ( + db.query(Thread) + .filter( + Thread.account_email == account_email.lower(), + Thread.last_analyzed_at.isnot(None), + ) + .all() + ) + + # Refresh the threads to get the latest data + for thread in analyzed_threads: + db.refresh(thread) + + actionable_count = sum(1 for t in analyzed_threads if t.actionable) + + print(f"Total analyzed threads: {len(analyzed_threads)}") + print(f"Actionable threads: {actionable_count}") + print(f"Non-actionable threads: {len(analyzed_threads) - actionable_count}") + print() + + for thread in analyzed_threads: + confidence = thread.ai_confidence or 0.0 + print(f"Thread {thread.id}: {thread.subject}") + print(f" 📧 Actionable: {'YES' if thread.actionable else 'No'}") + print(f" 🎯 Confidence: {confidence:.2f}") + print(f" 📝 Summary: {thread.ai_summary or 'No summary available'}") + print(f" 🕐 Analyzed: {thread.last_analyzed_at}") + print() + + # Step 5: Show threads requiring replies + print("Step 5: Threads Requiring Reply") + print("-" * 30) + + from database import get_threads_requiring_reply + + reply_threads = get_threads_requiring_reply(db, account_email) + + print(f"Threads requiring reply: {len(reply_threads)}") + for thread in reply_threads: + actionable_note = ( + " (AI: Actionable)" if thread.actionable else " (AI: Not actionable)" + ) + print(f" - {thread.subject}{actionable_note}") + + if reply_threads: + print( + "\n💡 Tip: Focus on threads marked as both 'requiring reply' and 'AI: Actionable'" + ) + + except Exception as e: + print(f"❌ Error: {e}") + import traceback + + traceback.print_exc() + finally: + db.close() + + +if __name__ == "__main__": + main() diff --git a/reset_analysis.py b/reset_analysis.py new file mode 100644 index 0000000..fe03cdd --- /dev/null +++ b/reset_analysis.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +""" +Reset AI analysis data for testing purposes. +""" + +import os +import sys + +# Add the src directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from database import SessionLocal, Thread + + +def main(): + """Reset analysis data for all threads.""" + + db = SessionLocal() + try: + # Reset analysis data + threads = db.query(Thread).all() + + for thread in threads: + thread.actionable = False + thread.ai_summary = None + thread.ai_confidence = None + thread.last_analyzed_at = None + + db.commit() + print(f"Reset analysis data for {len(threads)} threads") + + except Exception as e: + print(f"Error: {e}") + db.rollback() + finally: + db.close() + + +if __name__ == "__main__": + main() diff --git a/src/ai.py b/src/ai.py index 288e8d5..d8012aa 100644 --- a/src/ai.py +++ b/src/ai.py @@ -1,9 +1,27 @@ import json import os +import time from typing import Dict, List from groq import Groq +# Rate limiting variables +_last_api_call = 0 +_min_interval = 1.0 # Minimum seconds between API calls + + +def _rate_limit(): + """Simple rate limiting to avoid overwhelming the Groq API.""" + global _last_api_call + current_time = time.time() + time_since_last = current_time - _last_api_call + + if time_since_last < _min_interval: + sleep_time = _min_interval - time_since_last + time.sleep(sleep_time) + + _last_api_call = time.time() + def _format_messages_for_context(messages: List[dict]) -> str: lines = [] @@ -76,8 +94,40 @@ def analyze_thread( ) try: + # Validate input before sending to API + if not msgs: + return _heuristic_analyze(msgs) + + # Check message content length to avoid oversized requests + formatted_context = _format_messages_for_context(msgs) + if len(formatted_context) > 10000: # Limit context size + # Truncate messages if too long + truncated_msgs = msgs[-2:] # Use only last 2 messages + formatted_context = _format_messages_for_context(truncated_msgs) + print( + f"Warning: Truncated message context due to length ({len(formatted_context)} chars)" + ) + + user_prompt = ( + f"Thread subject: {thread_subject or ''}\n\n" + "Recent messages (oldest to newest):\n\n" + f"{formatted_context}\n\n" + "Respond with only JSON, no extra commentary." + ) + + # Validate prompt length + total_prompt_length = len(system_prompt) + len(user_prompt) + if total_prompt_length > 15000: # Further reduce if still too long + print( + f"Warning: Prompt too long ({total_prompt_length} chars), falling back to heuristic" + ) + return _heuristic_analyze(msgs) + + # Apply rate limiting before API call + _rate_limit() + completion = client.chat.completions.create( - model=os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile"), + model=os.getenv("GROQ_MODEL", "llama3-8b-8192"), messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, @@ -96,6 +146,23 @@ def analyze_thread( ): raise ValueError("Invalid schema from model") return data - except Exception: + except json.JSONDecodeError as e: + print(f"JSON decode error from Groq API: {e}") + return _heuristic_analyze(msgs) + except Exception as e: + # Log the specific error for debugging + error_msg = str(e) + print(f"Groq API error: {error_msg}") + + # Check for specific error types + if "400" in error_msg or "Bad Request" in error_msg: + print("400 Bad Request - likely prompt too long or invalid format") + elif "429" in error_msg or "rate limit" in error_msg.lower(): + print("Rate limit exceeded - consider reducing concurrent requests") + elif "401" in error_msg or "unauthorized" in error_msg.lower(): + print("Unauthorized - check GROQ_API_KEY") + elif "503" in error_msg or "service unavailable" in error_msg.lower(): + print("Service unavailable - Groq API may be down") + # Fallback to heuristic return _heuristic_analyze(msgs) diff --git a/src/app.py b/src/app.py index 7db9685..a549211 100644 --- a/src/app.py +++ b/src/app.py @@ -1,10 +1,11 @@ import asyncio import json +import logging import os from contextlib import suppress from typing import List -from fastapi import Depends, FastAPI, HTTPException, Request +from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates @@ -21,6 +22,9 @@ from src.database import ( ingest_emails, ) from src.zoho_client import ZohoClient +from dotenv import load_dotenv + +load_dotenv() def get_db(): @@ -31,6 +35,9 @@ def get_db(): db.close() +logging.basicConfig(level=logging.INFO, filename="email_alerts.log") + + create_db_tables() app = FastAPI(title="Email Alerts UI") @@ -256,25 +263,9 @@ def _sync_emails_once(cfg: dict) -> int: """Fetch INBOX and Sent from Zoho and ingest into DB. Returns threads requiring reply count.""" from datetime import datetime, timezone - # Mark status - cfg["sync_in_progress"] = True - cfg["last_sync_status"] = "running" - cfg["last_sync_error"] = None - save_config(cfg) - account_email = cfg.get("email_address") or cfg.get("zoho_email") or "" if not account_email: - cfg.update( - { - "last_sync_status": "error", - "last_sync_error": "Configure email_address or zoho_email in /config", - "sync_in_progress": False, - } - ) - save_config(cfg) - raise HTTPException( - status_code=400, detail="Configure email_address or zoho_email in /config" - ) + raise ValueError("Configure email_address or zoho_email in /config") # Incremental lookback by last_sync_at days_back = int(cfg.get("email_days_back", 7) or 7) @@ -292,17 +283,26 @@ def _sync_emails_once(cfg: dict) -> int: email=cfg.get("zoho_email") or account_email, app_password=cfg.get("zoho_app_password"), ) + + db = SessionLocal() try: inbox = client.fetch_folder_emails( - folder="INBOX", max_results=max_results, days_back=days_back + folder="INBOX", + max_results=max_results, + days_back=days_back, + db_session=db, + account_email=account_email, ) sent = client.fetch_folder_emails( - folder="Sent", max_results=max_results, days_back=days_back + folder="Sent", + max_results=max_results, + days_back=days_back, + db_session=db, + account_email=account_email, ) finally: client.close() - db = SessionLocal() try: ingest_emails( db, account_email=account_email, emails=inbox, default_folder="INBOX" @@ -321,23 +321,54 @@ def _sync_emails_once(cfg: dict) -> int: db.close() +def _sync_emails_background_task(): + """Background task to sync emails and update config status.""" + from datetime import datetime, timezone + + cfg = load_config() + try: + count = _sync_emails_once(cfg) + # Update success status + cfg = load_config() # Reload in case it was modified + cfg.update( + { + "sync_in_progress": False, + "last_sync_status": "success", + "last_sync_at": datetime.now(timezone.utc).isoformat(), + "last_sync_count": count, + "last_sync_error": None, + } + ) + save_config(cfg) + except Exception as e: + # Update error status + cfg = load_config() # Reload in case it was modified + cfg.update( + { + "sync_in_progress": False, + "last_sync_status": "error", + "last_sync_error": str(e), + } + ) + save_config(cfg) + logging.error(f"Email sync failed: {e}") + + @app.post("/sync_emails") -async def sync_emails(): +async def sync_emails(background_tasks: BackgroundTasks): cfg = load_config() if cfg.get("sync_in_progress"): return RedirectResponse(url="/?sync=busy", status_code=303) - async def _task(): - try: - _sync_emails_once(load_config()) - except Exception: - pass - + # Mark sync as starting cfg["sync_in_progress"] = True cfg["last_sync_status"] = "running" cfg["last_sync_error"] = None save_config(cfg) - asyncio.create_task(_task()) + + # Add the background task + background_tasks.add_task(_sync_emails_background_task) + return RedirectResponse(url="/?sync=started", status_code=303) @@ -358,7 +389,40 @@ async def _auto_runner(): # Sync emails then run alerts try: if not cfg.get("sync_in_progress"): - _sync_emails_once(cfg) + from datetime import datetime, timezone + + # Mark sync as starting + cfg["sync_in_progress"] = True + cfg["last_sync_status"] = "running" + cfg["last_sync_error"] = None + save_config(cfg) + + try: + count = _sync_emails_once(cfg) + # Update success status + cfg = load_config() # Reload in case it was modified + cfg.update( + { + "sync_in_progress": False, + "last_sync_status": "success", + "last_sync_at": datetime.now(timezone.utc).isoformat(), + "last_sync_count": count, + "last_sync_error": None, + } + ) + save_config(cfg) + except Exception as e: + # Update error status + cfg = load_config() # Reload in case it was modified + cfg.update( + { + "sync_in_progress": False, + "last_sync_status": "error", + "last_sync_error": str(e), + } + ) + save_config(cfg) + logging.error(f"Auto email sync failed: {e}") except Exception: # keep loop alive pass @@ -377,6 +441,23 @@ async def _auto_runner(): @app.on_event("startup") async def on_startup(): + # Reset any stale sync status from previous session + try: + cfg = load_config() + if cfg.get("sync_in_progress"): + cfg.update( + { + "sync_in_progress": False, + "last_sync_status": "interrupted", + "last_sync_error": "Previous session ended while sync was in progress", + } + ) + save_config(cfg) + logging.info("Reset stale sync status from previous session") + except Exception as e: + logging.error(f"Failed to reset stale sync status on startup: {e}") + + # Start the auto-processing task global _auto_task _stop_event.clear() _auto_task = asyncio.create_task(_auto_runner()) @@ -384,6 +465,23 @@ async def on_startup(): @app.on_event("shutdown") async def on_shutdown(): + # Reset sync status if it's currently in progress + try: + cfg = load_config() + if cfg.get("sync_in_progress"): + cfg.update( + { + "sync_in_progress": False, + "last_sync_status": "interrupted", + "last_sync_error": "Server shutdown while sync was in progress", + } + ) + save_config(cfg) + logging.info("Reset sync status due to server shutdown") + except Exception as e: + logging.error(f"Failed to reset sync status on shutdown: {e}") + + # Stop the auto-processing task if _auto_task: _stop_event.set() with suppress(Exception): diff --git a/src/database.py b/src/database.py index 8060aba..515b34e 100644 --- a/src/database.py +++ b/src/database.py @@ -1,3 +1,5 @@ +import asyncio +from datetime import datetime from email.utils import parseaddr from typing import Annotated, Iterable, List, Optional, Tuple @@ -390,3 +392,178 @@ def ingest_emails( ) db.commit() + + +def get_latest_email_date( + db: Session, account_email: str, folder: str = None +) -> Optional[datetime]: + """ + Get the latest email date for a specific account and folder (if specified). + Returns None if no emails exist. + """ + query = ( + db.query(func.max(Message.date_sent)) + .join(Thread, Message.thread_id == Thread.id) + .filter(Thread.account_email == account_email.lower()) + ) + + if folder: + query = query.filter(Message.folder == folder) + + result = query.scalar() + return result + + +async def analyze_and_update_threads_async( + db: Session, + account_email: str, + thread_ids: Optional[List[int]] = None, + max_concurrent: int = 5, + only_unanalyzed: bool = True, +) -> None: + """ + Asynchronously analyze threads and update their AI summaries and actionable status. + + Args: + db: Database session to use + account_email: The email account to process threads for + thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads. + max_concurrent: Maximum number of concurrent AI analysis tasks + only_unanalyzed: If True, only analyze threads that haven't been analyzed yet + """ + try: + from ai import analyze_thread + except ImportError: + # Try absolute import if relative import fails + import os + import sys + + current_dir = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, current_dir) + from ai import analyze_thread + + # Get threads to analyze + query = db.query(Thread).filter(Thread.account_email == account_email.lower()) + + if thread_ids: + query = query.filter(Thread.id.in_(thread_ids)) + + if only_unanalyzed: + query = query.filter(Thread.last_analyzed_at.is_(None)) + + threads = query.all() + + if not threads: + print(f"No threads to analyze for {account_email}") + return + + print(f"Found {len(threads)} threads to analyze for {account_email}") + + # Create semaphore to limit concurrent tasks + semaphore = asyncio.Semaphore(max_concurrent) + + async def analyze_single_thread(thread: Thread) -> None: + async with semaphore: + try: + # Get messages for this thread + messages = get_thread_messages(db, thread.id) + + if not messages: + print(f"No messages found for thread {thread.id}") + return + + # Convert messages to dict format for AI analysis + message_dicts = [] + for msg in messages: + message_dicts.append( + { + "date_sent": msg.date_sent, + "is_incoming": msg.is_incoming, + "subject": msg.subject, + "from_email": msg.from_email, + "to_email": msg.to_email, + "body": msg.body, + } + ) + + # Run AI analysis in a thread pool to avoid blocking + loop = asyncio.get_event_loop() + analysis_result = await loop.run_in_executor( + None, analyze_thread, thread.subject, message_dicts + ) + + # Update thread with analysis results + thread.actionable = analysis_result.get("actionable", False) + thread.ai_summary = analysis_result.get("summary", "") + thread.ai_confidence = float(analysis_result.get("confidence", 0.0)) + thread.last_analyzed_at = ( + datetime.now() + ) # Using datetime.now() instead of utcnow() + + print( + f"Analyzed thread {thread.id}: actionable={thread.actionable}, confidence={thread.ai_confidence:.2f}" + ) + + except Exception as e: + print(f"Error analyzing thread {thread.id}: {e}") + + # Create tasks for all threads + tasks = [analyze_single_thread(thread) for thread in threads] + + # Run all tasks concurrently + await asyncio.gather(*tasks, return_exceptions=True) + + print(f"Successfully analyzed {len(threads)} threads") + + +def analyze_and_update_threads( + account_email: str, + thread_ids: Optional[List[int]] = None, + max_concurrent: int = 3, # Reduced default to avoid overwhelming API + only_unanalyzed: bool = True, +) -> None: + """ + Synchronous wrapper for the async thread analysis function. + + Args: + account_email: The email account to process threads for + thread_ids: Optional list of specific thread IDs to analyze. If None, analyzes all threads. + max_concurrent: Maximum number of concurrent AI analysis tasks (default: 3, reduced to avoid API limits) + only_unanalyzed: If True, only analyze threads that haven't been analyzed yet + """ + db = SessionLocal() + try: + asyncio.run( + analyze_and_update_threads_async( + db=db, + account_email=account_email, + thread_ids=thread_ids, + max_concurrent=max_concurrent, + only_unanalyzed=only_unanalyzed, + ) + ) + + # Commit all changes + db.commit() + + except Exception as e: + db.rollback() + print(f"Error in analyze_and_update_threads: {e}") + raise + finally: + db.close() + + +def get_threads_needing_analysis(db: Session, account_email: str) -> List[Thread]: + """ + Get threads that haven't been analyzed yet or need re-analysis. + """ + return ( + db.query(Thread) + .filter( + Thread.account_email == account_email.lower(), + Thread.last_analyzed_at.is_(None), + ) + .order_by(Thread.updated_at.desc()) + .all() + ) diff --git a/src/zoho_client.py b/src/zoho_client.py index 72b3475..f93aca6 100644 --- a/src/zoho_client.py +++ b/src/zoho_client.py @@ -1,8 +1,10 @@ import email import imaplib +import logging import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from email.header import decode_header +from email.utils import parsedate_to_datetime from typing import Any, Dict, List from dotenv import load_dotenv @@ -41,17 +43,63 @@ class ZohoClient: query: str = None, max_results: int = None, days_back: int = 7, + db_session=None, + account_email: str = None, ) -> List[Dict[str, Any]]: - """Fetch emails from a given folder with date filtering""" + """Fetch emails from a given folder with intelligent date filtering + + Args: + folder: Folder to fetch from (default: INBOX) + query: Additional search query + max_results: Maximum number of emails to fetch + days_back: Fallback days to search if no database date available (default: 7) + db_session: Database session to check for latest email date + account_email: Account email to query latest date for + + Returns: + List of email dictionaries + + If db_session and account_email are provided, will fetch emails starting from + the latest email date in the database. Otherwise, falls back to days_back. + """ try: # Select folder mailbox = '"Sent"' if folder.lower() == "sent" else folder print(f"📥 Selecting {folder}...[STEP 2]") self.connection.select(mailbox) - # Build search criteria - only emails from specified days back - days_ago = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y") - search_criteria = f"SINCE {days_ago}" + # Determine the start date for fetching emails + start_date = None + + # If database session and account email are provided, try to get the latest email date + if db_session and account_email: + try: + from .database import get_latest_email_date + + latest_date = get_latest_email_date( + db_session, account_email, folder + ) + if latest_date: + # Add a small buffer (1 minute) to avoid missing emails with same timestamp + start_date = latest_date - timedelta(minutes=1) + print(f"📅 Using latest email date from database: {start_date}") + else: + print( + f"📅 No emails found in database, using days_back: {days_back}" + ) + except Exception as e: + print( + f"⚠️ Error getting latest date from database: {e}, falling back to days_back" + ) + + # Fall back to days_back if no database date available + if start_date is None: + start_date = datetime.now() - timedelta(days=days_back) + print(f"📅 Using fallback date: {start_date}") + + # Build search criteria - only emails from the calculated start date + search_date = start_date.strftime("%d-%b-%Y") + search_criteria = f"SINCE {search_date}" if query: search_criteria += f" {query}" @@ -73,42 +121,53 @@ class ZohoClient: for i, num in enumerate(email_list): try: print(f"📧 Fetching email {num.decode()}... [STEP 3] {i}") + logging.info(f"Fetching email {num.decode()}... [STEP 3] {i}") # Fetch email data status, data = self.connection.fetch(num, "(RFC822)") if status == "OK": raw_email = data[0][1] email_message = email.message_from_bytes(raw_email) - - # Extract headers - subject = self._decode_header(email_message.get("Subject", "")) - from_header = self._decode_header(email_message.get("From", "")) - to_header = self._decode_header(email_message.get("To", "")) date_header = email_message.get("Date", "") - message_id = email_message.get("Message-ID", "") - in_reply_to = email_message.get("In-Reply-To", "") + email_date = parse_email_date_safely(date_header) + print(f"📅 Email date: {email_date} Latest: {latest_date}") + + # Ensure both dates are timezone-aware for comparison + if email_date and latest_date: + # If latest_date is timezone-naive, make it timezone-aware (assume UTC) + if latest_date.tzinfo is None: + latest_date = latest_date.replace(tzinfo=timezone.utc) + + if email_date > latest_date: + # Extract headers + subject = self._decode_header(email_message.get("Subject", "")) + from_header = self._decode_header(email_message.get("From", "")) + to_header = self._decode_header(email_message.get("To", "")) - # Generate thread ID (using Message-ID as fallback) - thread_id = message_id or f"thread_{num.decode()}" + message_id = email_message.get("Message-ID", "") + in_reply_to = email_message.get("In-Reply-To", "") - # Get email body snippet - body = self._get_email_body(email_message) - snippet = body[:200] + "..." if len(body) > 200 else body + # Generate thread ID (using Message-ID as fallback) + thread_id = message_id or f"thread_{num.decode()}" - email_data = { - "id": num.decode(), - "threadId": thread_id, - "from": from_header, - "to": to_header, - "subject": subject, - "date": date_header, - "messageId": message_id, - "inReplyTo": in_reply_to, - "folder": folder, - "snippet": snippet, - } + # Get email body snippet + body = self._get_email_body(email_message) + snippet = body[:200] + "..." if len(body) > 200 else body - emails.append(email_data) + email_data = { + "id": num.decode(), + "threadId": thread_id, + "from": from_header, + "to": to_header, + "subject": subject, + "date": date_header, + "messageId": message_id, + "inReplyTo": in_reply_to, + "folder": folder, + "snippet": snippet, + } + + emails.append(email_data) except Exception as e: print(f"❌ Error processing email {num}: {e}") @@ -124,11 +183,21 @@ class ZohoClient: return [] def fetch_emails( - self, query: str = None, max_results: int = None, days_back: int = 7 + self, + query: str = None, + max_results: int = None, + days_back: int = 7, + db_session=None, + account_email: str = None, ) -> List[Dict[str, Any]]: """Fetch emails from INBOX (backwards-compatible wrapper).""" return self.fetch_folder_emails( - folder="INBOX", query=query, max_results=max_results, days_back=days_back + folder="INBOX", + query=query, + max_results=max_results, + days_back=days_back, + db_session=db_session, + account_email=account_email, ) def _decode_header(self, header_value: str) -> str: @@ -273,6 +342,33 @@ class ZohoClient: print(f"Error closing connection: {e}") +def parse_email_date_safely(date_string): + """ + Safely parse email date string ensuring timezone awareness. + + Args: + date_string: Date string from email header + + Returns: + Timezone-aware datetime object or None if parsing fails + """ + if not date_string: + return None + + try: + parsed_date = parsedate_to_datetime(date_string) + + # If the parsed date is timezone-naive, assume UTC + if parsed_date.tzinfo is None: + parsed_date = parsed_date.replace(tzinfo=timezone.utc) + + return parsed_date + except Exception as e: + logging.warning(f"Could not parse date '{date_string}': {e}") + # Return current time as fallback + return datetime.now(timezone.utc) + + if __name__ == "__main__": client = ZohoClient() emails = client.fetch_emails(max_results=10) diff --git a/test_ai_analysis.py b/test_ai_analysis.py new file mode 100644 index 0000000..fb41ff7 --- /dev/null +++ b/test_ai_analysis.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +""" +Test script to demonstrate the AI analysis functionality. +This script shows how to use the new async thread analysis function. +""" + +import os +import sys + +# Add the src directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from database import ( + SessionLocal, + Thread, + analyze_and_update_threads, + create_db_tables, + get_threads_needing_analysis, +) + + +def main(): + """Main function to test the AI analysis.""" + + # Create database tables if they don't exist + create_db_tables() + + # Example account email - replace with your actual account + account_email = "your-email@example.com" + + # Get a database session + db = SessionLocal() + try: + # Check how many threads need analysis + threads_needing_analysis = get_threads_needing_analysis(db, account_email) + print( + f"Found {len(threads_needing_analysis)} threads needing analysis for {account_email}" + ) + + if not threads_needing_analysis: + print( + "No threads need analysis. Make sure you have ingested some emails first." + ) + + # Show all threads for this account + all_threads = ( + db.query(Thread) + .filter(Thread.account_email == account_email.lower()) + .all() + ) + print(f"Total threads for {account_email}: {len(all_threads)}") + + if all_threads: + print("Sample threads:") + for thread in all_threads[:3]: + print( + f" Thread {thread.id}: {thread.subject} (analyzed: {thread.last_analyzed_at is not None})" + ) + + return + + print(f"Starting AI analysis for {len(threads_needing_analysis)} threads...") + + # Run the analysis with max 3 concurrent tasks + analyze_and_update_threads( + account_email=account_email, max_concurrent=3, only_unanalyzed=True + ) + + print("Analysis complete!") + + # Show results + analyzed_threads = ( + db.query(Thread) + .filter( + Thread.account_email == account_email.lower(), + Thread.last_analyzed_at.isnot(None), + ) + .all() + ) + + print(f"\nAnalyzed {len(analyzed_threads)} threads:") + for thread in analyzed_threads[:5]: # Show first 5 + print(f" Thread {thread.id}: {thread.subject[:50]}...") + print(f" Actionable: {thread.actionable}") + print(f" Confidence: {thread.ai_confidence:.2f}") + print(f" Summary: {thread.ai_summary[:100]}...") + print() + + except Exception as e: + print(f"Error: {e}") + import traceback + + traceback.print_exc() + finally: + db.close() + + +if __name__ == "__main__": + main() diff --git a/test_fetch_with_db.py b/test_fetch_with_db.py new file mode 100644 index 0000000..3c88705 --- /dev/null +++ b/test_fetch_with_db.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +""" +Test script to demonstrate the modified fetch_folder_emails function +that uses database dates for intelligent email fetching. +""" + +from src.database import SessionLocal, get_latest_email_date +from src.zoho_client import ZohoClient + + +def test_fetch_with_database(): + """Test the modified fetch function with database integration.""" + + # Create database session + db = SessionLocal() + + try: + # Create Zoho client + client = ZohoClient() + + # Example account email (replace with actual) + account_email = client.email + + print("=== Testing fetch_folder_emails with database integration ===") + + # Check what's the latest date in database + latest_date = get_latest_email_date(db, account_email, "INBOX") + print(f"Latest email date in database: {latest_date}") + + # Fetch emails using the new function + emails = client.fetch_folder_emails( + folder="INBOX", + max_results=10, + days_back=30, # Fallback if no database date + db_session=db, + account_email=account_email, + ) + + print(f"Fetched {len(emails)} emails") + + # Display first few emails + for i, email in enumerate(emails[:3]): + print(f"\nEmail {i + 1}:") + print(f" Subject: {email.get('subject', 'No subject')}") + print(f" From: {email.get('from', 'Unknown')}") + print(f" Date: {email.get('date', 'Unknown')}") + + client.close() + + except Exception as e: + print(f"Error: {e}") + finally: + db.close() + + +if __name__ == "__main__": + test_fetch_with_database() diff --git a/test_groq_api.py b/test_groq_api.py new file mode 100644 index 0000000..6c2b8bc --- /dev/null +++ b/test_groq_api.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +""" +Test script to validate Groq API configuration and diagnose issues. +""" + +import os +import sys + +# Add the src directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from ai import analyze_thread + + +def test_groq_api(): + """Test the Groq API with a simple request.""" + + print("Testing Groq API Configuration") + print("=" * 40) + + # Check API key + api_key = os.getenv("GROQ_API_KEY") + if not api_key: + print("❌ GROQ_API_KEY environment variable not set") + print(" Please set your Groq API key: export GROQ_API_KEY='your-key-here'") + return False + + print(f"✓ GROQ_API_KEY found (length: {len(api_key)})") + + # Check model + model = os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile") + print(f"✓ Using model: {model}") + + # Test with simple data + print("\nTesting simple analysis...") + + simple_messages = [ + { + "date_sent": "2025-08-11 10:00:00", + "is_incoming": True, + "subject": "Test Question", + "from_email": "test@example.com", + "to_email": "user@example.com", + "body": "Can you help me with this issue? Please let me know.", + } + ] + + try: + result = analyze_thread("Test Question", simple_messages) + print("✓ Analysis successful!") + print(f" Model used: {result.get('model', 'unknown')}") + print(f" Actionable: {result.get('actionable', False)}") + print(f" Confidence: {result.get('confidence', 0)}") + print(f" Summary: {result.get('summary', 'No summary')[:100]}...") + + if result.get("model") == "heuristic": + print("\n⚠️ Note: Fell back to heuristic analysis") + print(" This might indicate an API issue") + + return True + + except Exception as e: + print(f"❌ Analysis failed: {e}") + import traceback + + traceback.print_exc() + return False + + +def test_rate_limiting(): + """Test rate limiting by making multiple quick requests.""" + + print("\nTesting Rate Limiting") + print("=" * 40) + + import time + + simple_messages = [ + { + "date_sent": "2025-08-11 10:00:00", + "is_incoming": True, + "subject": "Quick test", + "from_email": "test@example.com", + "to_email": "user@example.com", + "body": "Quick test message.", + } + ] + + start_time = time.time() + + for i in range(3): + print(f"Request {i + 1}...") + result = analyze_thread(f"Test {i + 1}", simple_messages) + print(f" Result: {result.get('model', 'unknown')} analysis") + + total_time = time.time() - start_time + print(f"\nTotal time for 3 requests: {total_time:.2f} seconds") + + if total_time < 2.5: + print("⚠️ Requests completed very quickly - rate limiting may not be working") + else: + print("✓ Rate limiting appears to be working") + + +if __name__ == "__main__": + success = test_groq_api() + + if success: + test_rate_limiting() + + print("\nTroubleshooting Tips:") + print("- If getting 400 errors: Check message content for special characters") + print("- If getting 401 errors: Verify GROQ_API_KEY is correct") + print( + "- If getting 429 errors: Reduce max_concurrent in analyze_and_update_threads()" + ) + print("- If getting 503 errors: Groq service may be temporarily unavailable") diff --git a/test_single_analysis.py b/test_single_analysis.py new file mode 100644 index 0000000..587adda --- /dev/null +++ b/test_single_analysis.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +""" +Simple test to debug the AI analysis function. +""" + +import os +import sys + +# Add the src directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from ai import analyze_thread +from database import SessionLocal, Thread, get_thread_messages + + +def test_single_analysis(): + """Test analyzing a single thread.""" + + db = SessionLocal() + try: + # Get a thread to test with + thread = ( + db.query(Thread) + .filter(Thread.account_email == "test-user@company.com") + .first() + ) + + if not thread: + print("No threads found for test-user@company.com") + return + + print(f"Testing thread {thread.id}: {thread.subject}") + print( + f"Before analysis: actionable={thread.actionable}, confidence={thread.ai_confidence}" + ) + + # Get messages + messages = get_thread_messages(db, thread.id) + print(f"Found {len(messages)} messages") + + # Convert to dict format + message_dicts = [] + for msg in messages: + message_dicts.append( + { + "date_sent": msg.date_sent, + "is_incoming": msg.is_incoming, + "subject": msg.subject, + "from_email": msg.from_email, + "to_email": msg.to_email, + "body": msg.body, + } + ) + + # Run analysis + print("Running AI analysis...") + analysis_result = analyze_thread(thread.subject, message_dicts) + print(f"Analysis result: {analysis_result}") + + # Update thread + thread.actionable = analysis_result.get("actionable", False) + thread.ai_summary = analysis_result.get("summary", "") + thread.ai_confidence = float(analysis_result.get("confidence", 0.0)) + from datetime import datetime + + thread.last_analyzed_at = datetime.now() + + print( + f"Updated thread: actionable={thread.actionable}, confidence={thread.ai_confidence}" + ) + print(f"Summary: {thread.ai_summary}") + + # Commit changes + db.commit() + print("Changes committed") + + # Verify the changes + db.refresh(thread) + print( + f"After refresh: actionable={thread.actionable}, confidence={thread.ai_confidence}" + ) + + except Exception as e: + print(f"Error: {e}") + import traceback + + traceback.print_exc() + db.rollback() + finally: + db.close() + + +if __name__ == "__main__": + test_single_analysis()