Files
2025-08-05 22:29:54 +01:00

242 lines
8.8 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
import os
import json
import threading
import time
from datetime import datetime, timedelta
from email_processor import EmailProcessor
from dotenv import load_dotenv
import sqlite3
# Run python-dotenv's loader before any os.getenv() lookups below, so values
# from a local .env file (if one exists) are visible to them.
load_dotenv()
app = Flask(__name__)
# NOTE(review): if SECRET_KEY is not set in the environment this falls back to
# a hard-coded placeholder string. Set SECRET_KEY for any real deployment.
app.secret_key = os.getenv('SECRET_KEY', 'your-secret-key-here')
# Configuration file path (relative, so it is resolved against the current
# working directory of the process).
CONFIG_FILE = 'config.json'
def load_config():
    """Return the application configuration as a dict.

    When CONFIG_FILE is missing, a default configuration is built, written
    out through save_config() and returned. Otherwise the file's JSON
    content is parsed and returned as-is.
    """
    if not os.path.exists(CONFIG_FILE):
        # First run: seed the file with the built-in defaults.
        defaults = {
            'email_address': 'projects@manaknightdigital.com',
            'zoho_email': '',  # filled in by the user via the settings page
            'zoho_app_password': '',  # filled in by the user via the settings page
            'time_frames': [
                {'name': '1-24 hours', 'hours': 24, 'alert_level': 1},
                {'name': '24-48 hours', 'hours': 48, 'alert_level': 2},
                {'name': '48+ hours', 'hours': 72, 'alert_level': 3},
            ],
            'email_days_back': 7,
            'agency_domains': ['projects@manaknightdigital.com'],
            'auto_process': False,
            'auto_process_interval': 30,  # minutes
        }
        save_config(defaults)
        return defaults

    with open(CONFIG_FILE, 'r') as config_file:
        return json.load(config_file)
def save_config(config):
    """Save configuration to the JSON file at CONFIG_FILE.

    The data is first written to a temporary file next to CONFIG_FILE and
    then moved into place with os.replace(). The background worker thread
    re-reads the configuration while request handlers may be saving it;
    replacing the file in one step means a reader never sees a truncated or
    half-written file, and a failed dump leaves the previous configuration
    intact.

    Args:
        config: JSON-serialisable configuration dict.

    Raises:
        TypeError, ValueError, OSError: propagated from json.dump() or the
            file operations; the temporary file is removed first.
    """
    # Include process and thread ids so concurrent writers never share a
    # temporary file.
    tmp_path = f'{CONFIG_FILE}.{os.getpid()}.{threading.get_ident()}.tmp'
    try:
        with open(tmp_path, 'w') as f:
            json.dump(config, f, indent=2)
        os.replace(tmp_path, CONFIG_FILE)
    except Exception:
        # Best-effort cleanup; the original error is what the caller needs.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def auto_process_emails():
    """Background worker that periodically processes emails.

    Runs forever. On each cycle the configuration is reloaded, so changes
    made through the settings page take effect on the next iteration. When
    'auto_process' is enabled, an EmailProcessor is built and run with alerts
    enabled; the worker then sleeps for 'auto_process_interval' minutes.
    Any exception is printed and the cycle is retried after one minute.
    """
    while True:
        try:
            config = load_config()
            # Read the interval once, with the same default for the log line
            # and the sleep, so a config file without the key does not raise.
            # Clamp to one minute: 0 would loop with no delay at all and a
            # negative value makes time.sleep() raise.
            interval_minutes = max(config.get('auto_process_interval', 30), 1)

            if config.get('auto_process', False):
                print(f"\n🔄 Auto-processing emails (interval: {interval_minutes} minutes)")
                processor = EmailProcessor(agency_domains=config['agency_domains'])
                result = processor.process_emails(
                    max_results=None,
                    send_alerts=True,
                    days_back=config['email_days_back'],
                    time_frames=config['time_frames']
                )
                if result.get('status') == 'success':
                    print(f"✅ Auto-processing complete: {result.get('actionable_emails', 0)} actionable emails")
                else:
                    print(f"❌ Auto-processing failed: {result.get('error', 'Unknown error')}")
            else:
                print("⏸️ Auto-processing disabled")

            # Sleep for the configured interval
            time.sleep(interval_minutes * 60)
        except Exception as e:
            print(f"❌ Auto-processing error: {e}")
            time.sleep(60)  # Wait 1 minute before retrying
@app.route('/')
def index():
    """Render the dashboard template with the current configuration."""
    return render_template('index.html', config=load_config())
@app.route('/settings')
def settings():
    """Render the settings template with the current configuration."""
    return render_template('settings.html', config=load_config())
@app.route('/update_settings', methods=['POST'])
def update_settings():
    """Update system settings from the submitted settings form.

    Loads the current configuration, overlays the posted form values, saves
    the result and flashes a success message. Any error (for example a
    non-numeric number field) is flashed as an error and nothing is saved.
    In both cases the client is redirected back to the settings page.
    """
    try:
        config = load_config()
        form = request.form

        # Update email address
        config['email_address'] = form.get('email_address', config['email_address'])

        # Update Zoho credentials
        config['zoho_email'] = form.get('zoho_email', config.get('zoho_email', ''))
        config['zoho_app_password'] = form.get('zoho_app_password', config.get('zoho_app_password', ''))

        # Update email days back (blank field falls back to 7)
        email_days_back = form.get('email_days_back', '7')
        config['email_days_back'] = int(email_days_back) if email_days_back.strip() else 7

        # Update agency domains (comma-separated, blanks dropped)
        agency_domains = form.get('agency_domains', '').split(',')
        config['agency_domains'] = [domain.strip() for domain in agency_domains if domain.strip()]

        # Update time frames. zip() pairs the three parallel form lists and
        # stops at the shortest, so a request with mismatched list lengths
        # no longer aborts the whole update with an IndexError.
        time_frames = []
        frame_rows = zip(
            form.getlist('frame_name[]'),
            form.getlist('frame_hours[]'),
            form.getlist('frame_level[]'),
        )
        for name, hours, level in frame_rows:
            if not (name and hours and level):
                continue
            try:
                time_frames.append({
                    'name': name,
                    'hours': int(hours),
                    'alert_level': int(level)
                })
            except ValueError:
                # Skip invalid time frames
                continue

        # Sort time frames by hours
        time_frames.sort(key=lambda frame: frame['hours'])
        config['time_frames'] = time_frames

        # Update auto processing settings
        config['auto_process'] = form.get('auto_process') == 'on'
        auto_process_interval = form.get('auto_process_interval', '30')
        interval_minutes = int(auto_process_interval) if auto_process_interval.strip() else 30
        # The background worker sleeps interval * 60 seconds: zero would make
        # it loop without pause and a negative value makes time.sleep() fail.
        if interval_minutes < 1:
            raise ValueError('Auto-process interval must be at least 1 minute')
        config['auto_process_interval'] = interval_minutes

        save_config(config)
        flash('Settings updated successfully!', 'success')
    except Exception as e:
        flash(f'Error updating settings: {str(e)}', 'error')
    return redirect(url_for('settings'))
@app.route('/process_emails', methods=['POST'])
def process_emails():
    """Run one email-processing pass with alerts enabled and report the outcome.

    Returns a JSON body whose 'status' is 'success' (with counts under
    'data') or 'error' (with a 'message'). Exceptions are caught and reported
    as a 'System error: ...' message.
    """
    try:
        cfg = load_config()

        # Build the processor from the settings currently on disk.
        email_processor = EmailProcessor(agency_domains=cfg['agency_domains'])
        outcome = email_processor.process_emails(
            max_results=None,
            send_alerts=True,
            days_back=cfg['email_days_back'],
            time_frames=cfg['time_frames'],
        )

        # Failure reported by the processor itself.
        if outcome.get('status') != 'success':
            failure = {
                'status': 'error',
                'message': outcome.get('error', 'Unknown error occurred'),
            }
            return jsonify(failure)

        summary = {
            'total_emails': outcome.get('total_emails', 0),
            'actionable_emails': outcome.get('actionable_emails', 0),
            'sent_alerts': len(outcome.get('sent_alerts', [])),
        }
        return jsonify({
            'status': 'success',
            'message': 'Email processing completed successfully',
            'data': summary,
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': f'System error: {str(e)}',
        })
@app.route('/get_threads')
def get_threads():
    """Return, as JSON, the threads the tracker reports as needing alerts.

    Each entry carries the thread id, subject, the last external message
    time formatted as 'YYYY-MM-DD HH:MM', the alert level and the whole
    number of hours elapsed since that message. Errors are returned as a
    JSON body with status 'error'.
    """
    try:
        cfg = load_config()
        email_processor = EmailProcessor(agency_domains=cfg['agency_domains'])
        pending = email_processor.tracker.get_threads_needing_alerts(cfg['time_frames'])

        threads_data = [
            {
                'thread_id': item.thread_id,
                'subject': item.subject,
                'last_message': item.last_external_message.strftime('%Y-%m-%d %H:%M'),
                'alert_level': item.alert_level,
                # Elapsed seconds converted to hours, truncated by int().
                'hours_since': int((datetime.now() - item.last_external_message).total_seconds() / 3600),
            }
            for item in pending
        ]

        return jsonify({'status': 'success', 'threads': threads_data})
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': f'Error fetching threads: {str(e)}',
        })
@app.route('/test_connection')
def test_connection():
    """Check the mail connection by fetching up to three recent emails.

    Returns a JSON body with status 'success', a human-readable message and
    the number of emails fetched, or status 'error' with the failure text.
    """
    try:
        cfg = load_config()
        days_back = cfg['email_days_back']
        email_processor = EmailProcessor(agency_domains=cfg['agency_domains'])

        # A small fetch is enough to prove the credentials and server work.
        fetched = email_processor.zoho_client.fetch_emails(max_results=3, days_back=days_back)
        found = len(fetched)

        return jsonify({
            'status': 'success',
            'message': f'Connection successful! Found {found} emails in the last {days_back} days.',
            'email_count': found,
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': f'Connection failed: {str(e)}',
        })
if __name__ == '__main__':
    # NOTE(review): debug=True together with host='0.0.0.0' makes the
    # interactive debugger reachable from every network interface. This is
    # only acceptable on a trusted development machine.
    debug_mode = True

    # In debug mode app.run() enables the reloader, which executes this
    # script twice: once in a parent watcher process and once in the child
    # that actually serves requests (marked with WERKZEUG_RUN_MAIN='true').
    # Starting the worker unconditionally would leave two copies running and
    # send every alert twice, so start it only in the serving process.
    if not debug_mode or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        # Start auto-processing thread
        auto_thread = threading.Thread(target=auto_process_emails, daemon=True)
        auto_thread.start()
        print("🔄 Auto-processing thread started")

    app.run(debug=debug_mode, host='0.0.0.0', port=5000)