Files

169 lines
7.3 KiB
Python
Raw Permalink Normal View History

2025-07-25 11:31:36 +01:00
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import email.utils
import re
@dataclass
class ThreadState:
thread_id: str
subject: str
last_external_message: datetime
last_agency_reply: Optional[datetime]
alert_level: int # 0=no alert, 1=24h, 2=48h, 3=72h
is_active: bool
class ThreadTracker:
def __init__(self, db_path: str = "email_threads.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Initialize database tables"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS threads (
thread_id TEXT PRIMARY KEY,
subject TEXT,
last_external_message TEXT,
last_agency_reply TEXT,
alert_level INTEGER DEFAULT 0,
is_active BOOLEAN DEFAULT 1
)
""")
conn.commit()
def _parse_email_date(self, date_str: str) -> datetime:
"""Parse email date string to datetime object"""
try:
# Try parsing as ISO format first
return datetime.fromisoformat(date_str)
except ValueError:
try:
# Try parsing RFC 2822 format (Gmail standard)
parsed_date = email.utils.parsedate_to_datetime(date_str)
# Convert to naive datetime to avoid timezone issues
return parsed_date.replace(tzinfo=None)
except (ValueError, TypeError):
try:
# Try parsing common Gmail date formats
# Remove timezone info and parse
clean_date = re.sub(r'\s*[+-]\d{4}\s*$', '', date_str)
return datetime.strptime(clean_date, '%a, %d %b %Y %H:%M:%S')
except ValueError:
# Fallback to current time
print(f"Warning: Could not parse date '{date_str}', using current time")
return datetime.now()
def update_thread(self, thread_id: str, email: Dict[str, Any], agency_domains: List[str] = None):
"""Update thread state with new email"""
if agency_domains is None:
agency_domains = ['iyeoluwaakinrinola03@gmail.com'] # Default agency domain
from_email = email.get('from', '').lower()
is_agency_reply = any(domain.lower() in from_email for domain in agency_domains)
message_date = self._parse_email_date(email.get('date', datetime.now().isoformat()))
subject = email.get('subject', 'No Subject')
with sqlite3.connect(self.db_path) as conn:
# Get current thread state
cursor = conn.execute(
"SELECT * FROM threads WHERE thread_id = ?", (thread_id,)
)
row = cursor.fetchone()
if row:
# Update existing thread
if is_agency_reply:
conn.execute("""
UPDATE threads
SET last_agency_reply = ?, alert_level = 0, is_active = 0
WHERE thread_id = ?
""", (message_date.isoformat(), thread_id))
else:
conn.execute("""
UPDATE threads
SET last_external_message = ?, subject = ?, is_active = 1
WHERE thread_id = ?
""", (message_date.isoformat(), subject, thread_id))
else:
# Create new thread
if not is_agency_reply:
conn.execute("""
INSERT INTO threads (thread_id, subject, last_external_message, is_active)
VALUES (?, ?, ?, 1)
""", (thread_id, subject, message_date.isoformat()))
def get_threads_needing_alerts(self, time_frames: List[Dict] = None) -> List[ThreadState]:
"""Get threads that need alerts based on timing"""
now = datetime.now()
alert_threads = []
# Default time frames if none provided
if time_frames is None:
time_frames = [
{'hours': 24, 'alert_level': 1},
{'hours': 48, 'alert_level': 2},
{'hours': 72, 'alert_level': 3}
]
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT thread_id, subject, last_external_message, last_agency_reply, alert_level
FROM threads
WHERE is_active = 1 AND last_agency_reply IS NULL
""")
for row in cursor.fetchall():
thread_id, subject, last_external, last_agency, alert_level = row
last_external_dt = datetime.fromisoformat(last_external)
# Calculate hours since last external message
hours_since = (now - last_external_dt).total_seconds() / 3600
# Determine appropriate alert level based on configurable time frames
appropriate_alert_level = 0
for frame in time_frames:
if hours_since >= frame['hours']:
appropriate_alert_level = frame['alert_level']
# Send alert if thread meets timing criteria (regardless of current alert_level)
if appropriate_alert_level > 0:
# Update alert level to the appropriate level
if appropriate_alert_level > alert_level:
conn.execute(
"UPDATE threads SET alert_level = ? WHERE thread_id = ?",
(appropriate_alert_level, thread_id)
)
alert_threads.append(ThreadState(
thread_id=thread_id,
subject=subject or 'No Subject',
last_external_message=last_external_dt,
last_agency_reply=datetime.fromisoformat(last_agency) if last_agency else None,
alert_level=appropriate_alert_level,
is_active=True
))
return alert_threads
def check_thread_reply_status(self, thread_id: str, email_client, agency_domains: List[str] = None) -> bool:
"""Check if the last message in a thread is from the agency (indicating a reply)"""
if agency_domains is None:
agency_domains = ['projects@manaknightdigital.com']
try:
# For IMAP, we can't easily get all thread messages, so we'll use a simpler approach
# We'll check if the current email is from the agency
# This is a simplified approach for IMAP
return False # Let AI determine if response is needed
except Exception as e:
print(f"Error checking thread reply status: {e}")
return False
def get_thread_history(self, thread_id: str) -> List[Dict[str, Any]]:
"""Get message history for a thread (placeholder for future implementation)"""
# This would integrate with Gmail API to get full thread history
return []