diff --git a/ai_analyzer.py b/ai_analyzer.py index 6713575..21c0cba 100644 --- a/ai_analyzer.py +++ b/ai_analyzer.py @@ -18,29 +18,21 @@ class AIAnalyzer: self.api_key = os.getenv("GROQ_API_KEY") self.model = "llama3-8b-8192" - # Check if API key is available - if not self.api_key or self.api_key == "your_groq_api_key_here": - print("⚠️ GROQ_API_KEY not configured. AI analysis will be disabled.") - self.client = None - return + if not self.api_key: + raise ValueError("GROQ_API_KEY is required. Please add it to your .env file") try: from groq import Groq self.client = Groq(api_key=self.api_key) print("✅ Groq AI client initialized successfully") except Exception as e: - print(f"⚠️ Failed to initialize Groq client: {e}") - self.client = None + raise RuntimeError(f"Failed to initialize Groq client: {e}") def analyze_thread_context(self, thread_messages: List[Dict[str, Any]]) -> EmailSummary: """Analyze email thread context and generate summary""" if not thread_messages: return EmailSummary("No messages", "low", "none", 0.0, False) - # If AI client is not available, use basic analysis - if not self.client: - return self._basic_analysis(thread_messages) - # Prepare context for analysis context = self._prepare_thread_context(thread_messages) @@ -142,43 +134,6 @@ class AIAnalyzer: return EmailSummary(summary, urgency, action, confidence, needs_response) - def _basic_analysis(self, thread_messages: List[Dict[str, Any]]) -> EmailSummary: - """Basic email analysis when AI is not available""" - if not thread_messages: - return EmailSummary("No messages", "low", "none", 0.0, False) - - # Get the latest email - latest_email = thread_messages[-1] - from_email = latest_email.get('from', '').lower() - subject = latest_email.get('subject', '').lower() - snippet = latest_email.get('snippet', '').lower() - - # Basic filtering logic - non_actionable_keywords = [ - 'newsletter', 'promotion', 'unsubscribe', 'confirm your email', - 'password reset', 'no-reply', 'noreply', 'marketing', 'advertisement' - ] - - # Check if email is from own domain - if 'projects@manaknightdigital.com' in from_email: - return EmailSummary("Own email - no action needed", "low", "none", 0.9, False) - - # Check for non-actionable keywords - text = f"{from_email} {subject} {snippet}".lower() - for keyword in non_actionable_keywords: - if keyword in text: - return EmailSummary(f"Automated email detected ({keyword})", "low", "none", 0.8, False) - - # Check for actionable indicators - actionable_indicators = ['?', 'can you', 'could you', 'please', 'help', 'urgent', 'asap'] - action_score = sum(1 for indicator in actionable_indicators if indicator in text) - - if action_score > 0: - return EmailSummary("Email requires response", "medium", "Reply needed", 0.7, True) - - # Default to no action needed - return EmailSummary("Standard email - review if needed", "low", "Review manually", 0.5, False) - def generate_alert_message(self, thread_id: str, summary: EmailSummary, alert_level: int, email_data: Dict[str, Any] = None) -> str: """Generate formatted alert message for WhatsApp""" alert_levels = {