"""RSS news fetcher for DS Task AI News.

Downloads the configured RSS feeds, converts every entry into a plain
dictionary, skips entries that duplicate an already indexed article
(embedding similarity confirmed by an LLM) and writes the result to a
timestamped JSON file.
"""
import hashlib
import json
import os
import re
from datetime import datetime
from typing import Any, Dict, List
from urllib.parse import urlparse

import feedparser
import numpy as np
import requests

from config import settings
from recommender import NewsRecommender
from ai_analyzer import AIAnalyzer

# Compiled once: clean_content() runs for every fetched entry.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
# Maximum number of characters of entry text kept per article.
_MAX_CONTENT_CHARS = 1000
# Seconds to wait for a feed server before giving up on the HTTP request.
_REQUEST_TIMEOUT_SECONDS = 15
# Browser-like User-Agent sent with every feed request.
_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
)


class NewsFetcher:
    """Fetch, de-duplicate and persist articles from the configured RSS feeds."""

    def __init__(self):
        """Read settings, create the collaborators and the output directory."""
        self.raw_news_dir = settings.raw_news_dir
        self.max_articles = settings.max_articles_per_feed
        # Gives access to the embedding generator and the vector store.
        self.recommender = NewsRecommender()
        # Used for the LLM confirmation step of duplicate detection.
        self.ai_analyzer = AIAnalyzer()
        os.makedirs(self.raw_news_dir, exist_ok=True)

    def generate_article_id(self, title: str, url: str) -> str:
        """Return a 12-character hex ID derived from the title and URL.

        MD5 is used only as a fingerprint here, not for security.
        """
        content = f"{title}{url}"
        return hashlib.md5(content.encode()).hexdigest()[:12]

    def clean_content(self, content: str) -> str:
        """Strip HTML tags from *content* and cap it at 1000 characters.

        Returns an empty string for empty or ``None`` input.
        """
        if not content:
            return ""
        # Slicing already handles strings shorter than the limit.
        return _HTML_TAG_RE.sub('', content)[:_MAX_CONTENT_CHARS]

    def is_duplicate_by_llm(self, article: Dict[str, Any], existing_article: Dict[str, Any]) -> bool:
        """Ask the LLM whether two articles cover the same event or story.

        Returns ``False`` when the analyzer is unavailable or when the
        reply does not start with "yes" (case-insensitive).
        """
        if not self.ai_analyzer.available:
            return False  # LLM not available, skip this check

        # `or ''` guards against a stored None, which cannot be sliced.
        new_excerpt = (article.get('content') or '')[:500]
        known_excerpt = (existing_article.get('content') or '')[:500]
        prompt = f"""
        Are these two news articles about the same event or story? Answer only 'yes' or 'no'.\n\nArticle 1:\nTitle: {article.get('title', '')}\nContent: {new_excerpt}\n\nArticle 2:\nTitle: {existing_article.get('title', '')}\nContent: {known_excerpt}\n"""
        response = self.ai_analyzer._make_groq_request(prompt, max_tokens=5)
        return bool(response and response.strip().lower().startswith('yes'))

    def is_duplicate_by_similarity(self, article: Dict[str, Any], threshold: float = 0.9) -> bool:
        """Check *article* against the stored articles for duplicates.

        The article's embedding is compared by cosine similarity with
        every stored embedding. Each stored article scoring at least
        *threshold* is passed to :meth:`is_duplicate_by_llm`; the first
        confirmation makes this method return ``True``.

        Returns ``False`` when nothing is stored, when the new embedding
        has zero norm, or when no candidate is confirmed by the LLM.
        """
        vector_store = self.recommender.vector_store
        all_articles = vector_store.get_all_articles()
        if not all_articles:
            return False  # No articles to compare with

        generator = self.recommender.embedding_generator
        query = np.asarray(
            generator.generate_query_embedding(generator.create_article_text(article)),
            dtype=np.float64,
        ).ravel()  # tolerate a (1, dim) result as well as (dim,)
        query_norm = np.linalg.norm(query)
        if query_norm == 0:
            return False  # cosine similarity is undefined for a zero vector

        index = vector_store.index
        # NOTE(review): assumes a FAISS-style index exposing `ntotal`; the
        # clamp keeps reconstruct_n() in range if the index and the article
        # metadata ever disagree on their size - confirm against the store.
        count = min(len(all_articles), int(getattr(index, 'ntotal', len(all_articles))))
        if count <= 0:
            return False

        stored = np.asarray(index.reconstruct_n(0, count), dtype=np.float64).reshape(count, -1)
        stored_norms = np.linalg.norm(stored, axis=1)
        comparable = stored_norms > 0  # zero vectors are skipped, never matched

        similarities = np.zeros(count)
        similarities[comparable] = (
            stored[comparable] @ query / (stored_norms[comparable] * query_norm)
        )

        # Candidates are visited in index order; the LLM has the final say.
        for idx in np.flatnonzero(comparable & (similarities >= threshold)):
            if self.is_duplicate_by_llm(article, all_articles[int(idx)]):
                return True  # LLM confirms duplicate
        return False

    def _download_feed(self, feed_url: str):
        """Download and parse *feed_url*, falling back to feedparser's own fetch."""
        try:
            response = requests.get(
                feed_url,
                headers={'User-Agent': _USER_AGENT},
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"HTTP request failed, trying direct feedparser: {e}")
            return feedparser.parse(feed_url)
        return feedparser.parse(response.content)

    @staticmethod
    def _parse_published_date(entry) -> datetime:
        """Return the entry's publication time, or the current time if unusable."""
        parsed = getattr(entry, 'published_parsed', None)
        if not getattr(entry, 'published', '') or not parsed:
            return datetime.now()
        try:
            return datetime(*parsed[:6])
        except (TypeError, ValueError):
            # Malformed time tuple: fall back to the fetch time.
            return datetime.now()

    def _build_article(self, entry, source_name: str) -> Dict[str, Any]:
        """Convert one feed entry into the article dictionary that gets stored."""
        title = getattr(entry, 'title', 'No Title')
        content = getattr(entry, 'summary', getattr(entry, 'description', ''))
        url = getattr(entry, 'link', '')
        return {
            "id": self.generate_article_id(title, url),
            "title": title,
            "content": self.clean_content(content),
            "url": url,
            "source": source_name,
            "published_date": self._parse_published_date(entry).isoformat(),
            "fetched_date": datetime.now().isoformat(),
            "categories": getattr(entry, 'tags', []),
            "slug": title.lower().replace(" ", "-").replace("'", "")[:50]
        }

    def fetch_rss_feed(self, feed_url: str) -> List[Dict[str, Any]]:
        """Fetch articles from a single RSS feed.

        At most ``self.max_articles`` entries are read. Entries flagged
        by :meth:`is_duplicate_by_similarity` are skipped, as are entries
        whose processing raises. Any failure affecting the whole feed is
        printed and results in an empty list.
        """
        try:
            print(f"Fetching from: {feed_url}")
            feed = self._download_feed(feed_url)

            if feed.bozo:
                print(f"Warning: Feed parsing issues for {feed_url}")
                if hasattr(feed, 'bozo_exception'):
                    print(f"Bozo exception: {feed.bozo_exception}")

            articles = []
            source_name = getattr(feed.feed, 'title', urlparse(feed_url).netloc)

            for entry in feed.entries[:self.max_articles]:
                # Per-entry boundary: one bad entry must not lose the feed.
                try:
                    article = self._build_article(entry, source_name)

                    # Check for duplicate using similarity search
                    if self.is_duplicate_by_similarity(article):
                        print(f"Skipped duplicate article (similarity): {article['title']}")
                        continue

                    articles.append(article)
                except Exception as e:
                    print(f"Error processing entry: {e}")
                    continue

            print(f"Fetched {len(articles)} articles from {source_name}")

            # A valid feed with no articles most likely has no new content.
            if not articles and not feed.bozo:
                print(f"No new articles found in {source_name} (feed is valid)")

            return articles

        except Exception as e:
            # Per-feed boundary: callers always receive a list.
            print(f"Error fetching RSS feed {feed_url}: {e}")
            return []

    def fetch_all_news(self) -> List[Dict[str, Any]]:
        """Fetch every feed in ``settings.rss_feeds`` and de-duplicate by ID.

        Blank feed URLs are ignored. When two articles share an ID the
        one fetched later wins.
        """
        all_articles = []
        for feed_url in settings.rss_feeds:
            feed_url = feed_url.strip()
            if feed_url:
                all_articles.extend(self.fetch_rss_feed(feed_url))

        # Remove duplicates based on ID
        unique_articles = {article['id']: article for article in all_articles}
        final_articles = list(unique_articles.values())
        print(f"Total unique articles fetched: {len(final_articles)}")

        return final_articles

    def save_articles(self, articles: List[Dict[str, Any]]) -> str:
        """Write *articles* to ``news_<timestamp>.json`` and return the path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"news_{timestamp}.json"

        # Normalize the path to avoid double backslashes
        raw_news_dir = os.path.normpath(self.raw_news_dir)
        filepath = os.path.normpath(os.path.join(raw_news_dir, filename))

        # The directory may have been removed since __init__ created it.
        os.makedirs(raw_news_dir, exist_ok=True)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(articles, f, indent=2, ensure_ascii=False)

        print(f"Saved {len(articles)} articles to {filepath}")
        return filepath

    def fetch_and_save_news(self) -> Dict[str, Any]:
        """Fetch all news, save it, and return a summary dictionary.

        On success the summary holds ``success``, ``articles_count``,
        ``filepath`` and ``articles``. When nothing was fetched no file
        is written and the summary holds ``success``, ``articles_count``
        and ``message``.
        """
        articles = self.fetch_all_news()
        if not articles:
            return {
                "success": False,
                "articles_count": 0,
                "message": "No articles fetched"
            }

        filepath = self.save_articles(articles)
        return {
            "success": True,
            "articles_count": len(articles),
            "filepath": filepath,
            "articles": articles
        }


# Test function
if __name__ == "__main__":
    fetcher = NewsFetcher()
    result = fetcher.fetch_and_save_news()
    print(f"Result: {result}")