fix: Restore NewsFetcher class in news_fetcher.py

- Fixed import error by restoring proper NewsFetcher class structure
- Updated RSS feed fetching implementation with improved error handling
- Enhanced feed parsing with better timeout management and user agents
- Maintained compatibility with existing system architecture
- Resolved server startup issues caused by missing class definition
This commit is contained in:
Aherobo Ovie Victor
2025-07-15 21:55:43 +01:00
parent 508270e732
commit bccb7f2c2c
+183 -197
View File
@@ -1,230 +1,216 @@
"""AI Analysis module for DS Task AI News using Groq LLM"""
import os
from typing import Dict, List, Any, Optional
"""RSS News Fetcher for DS Task AI News"""
import feedparser
import requests
import json
import os
from datetime import datetime
try:
from groq import Groq
GROQ_AVAILABLE = True
except ImportError:
GROQ_AVAILABLE = False
print("⚠️ Groq not available - install with: pip install groq")
from typing import List, Dict, Any
from urllib.parse import urlparse
import hashlib
from config import settings
from recommender import NewsRecommender # Add this import
from ai_analyzer import AIAnalyzer # Add this import
class AIAnalyzer:
"""AI-powered article analysis using Groq LLM"""
class NewsFetcher:
def __init__(self):
self.client = None
self.model = "llama3-8b-8192" # Fast Groq model
self.available = False
self.raw_news_dir = settings.raw_news_dir
self.max_articles = settings.max_articles_per_feed
self.recommender = NewsRecommender() # Add recommender for embedding/vector access
self.ai_analyzer = AIAnalyzer() # Add AIAnalyzer for LLM duplicate check
# Ensure directories exist
os.makedirs(self.raw_news_dir, exist_ok=True)
if GROQ_AVAILABLE and settings.groq_api_key:
try:
self.client = Groq(api_key=settings.groq_api_key)
self.available = True
print("✅ Groq AI Analyzer initialized successfully")
except Exception as e:
print(f"❌ Groq initialization failed: {e}")
else:
print("⚠️ Groq AI Analyzer not available (missing API key or library)")
def generate_article_id(self, title: str, url: str) -> str:
"""Generate unique ID for article"""
content = f"{title}{url}"
return hashlib.md5(content.encode()).hexdigest()[:12]
def _make_groq_request(self, prompt: str, max_tokens: int = 500) -> Optional[str]:
"""Make a request to Groq API"""
if not self.available:
return None
def clean_content(self, content: str) -> str:
"""Clean and truncate content"""
if not content:
return ""
try:
response = self.client.chat.completions.create(
messages=[
{"role": "system", "content": "You are an expert news analyst. Provide concise, accurate analysis."},
{"role": "user", "content": prompt}
],
model=self.model,
max_tokens=max_tokens,
temperature=0.3
# Remove HTML tags (basic cleaning)
import re
content = re.sub(r'<[^>]+>', '', content)
# Truncate to reasonable length
return content[:1000] if len(content) > 1000 else content
def is_duplicate_by_llm(self, article: Dict[str, Any], existing_article: Dict[str, Any]) -> bool:
"""Use LLM to check if two articles are about the same event or story"""
if not self.ai_analyzer.available:
return False # LLM not available, skip this check
prompt = f"""
Are these two news articles about the same event or story? Answer only 'yes' or 'no'.\n\nArticle 1:\nTitle: {article.get('title', '')}\nContent: {article.get('content', '')[:500]}\n\nArticle 2:\nTitle: {existing_article.get('title', '')}\nContent: {existing_article.get('content', '')[:500]}\n"""
response = self.ai_analyzer._make_groq_request(prompt, max_tokens=5)
if response and response.strip().lower().startswith('yes'):
return True
return False
def is_duplicate_by_similarity(self, article: Dict[str, Any], threshold: float = 0.9) -> bool:
"""Check if the article is a duplicate using similarity search and LLM verification"""
all_articles = self.recommender.vector_store.get_all_articles()
if not all_articles:
return False # No articles to compare with
embedding = self.recommender.embedding_generator.generate_query_embedding(
self.recommender.embedding_generator.create_article_text(article)
)
return response.choices[0].message.content.strip()
existing_embeddings = self.recommender.vector_store.index.reconstruct_n(0, len(all_articles))
import numpy as np
for idx, existing_embedding in enumerate(existing_embeddings):
norm1 = np.linalg.norm(embedding)
norm2 = np.linalg.norm(existing_embedding)
if norm1 == 0 or norm2 == 0:
continue
similarity = float(np.dot(embedding, existing_embedding) / (norm1 * norm2))
if similarity >= threshold:
# Use LLM to confirm duplicate
existing_article = all_articles[idx]
if self.is_duplicate_by_llm(article, existing_article):
return True # LLM confirms duplicate
return False
def fetch_rss_feed(self, feed_url: str) -> List[Dict[str, Any]]:
"""Fetch articles from a single RSS feed"""
try:
print(f"Fetching from: {feed_url}")
# Use requests with proper headers and timeout
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
import requests
response = requests.get(feed_url, headers=headers, timeout=15)
response.raise_for_status()
feed = feedparser.parse(response.content)
except Exception as e:
print(f"❌ Groq API error: {e}")
return None
print(f"HTTP request failed, trying direct feedparser: {e}")
feed = feedparser.parse(feed_url)
def summarize_article(self, article: Dict[str, Any]) -> Dict[str, Any]:
"""Generate AI summary of an article"""
if not self.available:
return {"summary": "AI analysis not available", "available": False}
if feed.bozo:
print(f"Warning: Feed parsing issues for {feed_url}")
if hasattr(feed, 'bozo_exception'):
print(f"Bozo exception: {feed.bozo_exception}")
title = article.get('title', '')
content = article.get('content', '')
articles = []
source_name = getattr(feed.feed, 'title', urlparse(feed_url).netloc)
prompt = f"""
Analyze this news article and provide a concise summary:
Title: {title}
Content: {content[:1000]}...
Provide:
1. A 2-sentence summary
2. 3 key points
3. Main topic category
Format as JSON:
{{
"summary": "Brief 2-sentence summary",
"key_points": ["point1", "point2", "point3"],
"category": "Technology/Business/Science/etc"
}}
"""
response = self._make_groq_request(prompt, max_tokens=300)
if response:
for entry in feed.entries[:self.max_articles]:
try:
analysis = json.loads(response)
analysis["available"] = True
analysis["analyzed_at"] = datetime.now().isoformat()
return analysis
except json.JSONDecodeError:
return {
"summary": response,
"available": True,
"analyzed_at": datetime.now().isoformat()
# Extract article data
title = getattr(entry, 'title', 'No Title')
content = getattr(entry, 'summary', getattr(entry, 'description', ''))
url = getattr(entry, 'link', '')
published = getattr(entry, 'published', '')
# Parse date
try:
if published:
pub_date = datetime(*entry.published_parsed[:6])
else:
pub_date = datetime.now()
except:
pub_date = datetime.now()
# Create article object
article = {
"id": self.generate_article_id(title, url),
"title": title,
"content": self.clean_content(content),
"url": url,
"source": source_name,
"published_date": pub_date.isoformat(),
"fetched_date": datetime.now().isoformat(),
"categories": getattr(entry, 'tags', []),
"slug": title.lower().replace(" ", "-").replace("'", "")[:50]
}
return {"summary": "Analysis failed", "available": False}
# Check for duplicate using similarity search
if self.is_duplicate_by_similarity(article):
print(f"Skipped duplicate article (similarity): {title}")
continue
def extract_keywords(self, article: Dict[str, Any]) -> List[str]:
"""Extract key terms and entities from article"""
if not self.available:
articles.append(article)
except Exception as e:
print(f"Error processing entry: {e}")
continue
print(f"Fetched {len(articles)} articles from {source_name}")
# If no articles but feed parsed successfully, it might be due to no new content
if len(articles) == 0 and not feed.bozo:
print(f"No new articles found in {source_name} (feed is valid)")
return articles
except Exception as e:
print(f"Error fetching RSS feed {feed_url}: {e}")
return []
title = article.get('title', '')
content = article.get('content', '')
def fetch_all_news(self) -> List[Dict[str, Any]]:
"""Fetch news from all configured RSS feeds"""
all_articles = []
prompt = f"""
Extract the most important keywords and entities from this article:
for feed_url in settings.rss_feeds:
feed_url = feed_url.strip()
if feed_url:
articles = self.fetch_rss_feed(feed_url)
all_articles.extend(articles)
Title: {title}
Content: {content[:800]}...
# Remove duplicates based on ID
unique_articles = {}
for article in all_articles:
unique_articles[article['id']] = article
Return only a JSON array of 5-8 most relevant keywords:
["keyword1", "keyword2", "keyword3", ...]
"""
final_articles = list(unique_articles.values())
print(f"Total unique articles fetched: {len(final_articles)}")
response = self._make_groq_request(prompt, max_tokens=100)
return final_articles
if response:
try:
keywords = json.loads(response)
return keywords if isinstance(keywords, list) else []
except json.JSONDecodeError:
# Fallback: extract from response text
words = response.replace('[', '').replace(']', '').replace('"', '').split(',')
return [word.strip() for word in words[:8]]
def save_articles(self, articles: List[Dict[str, Any]]) -> str:
"""Save articles to JSON file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"news_{timestamp}.json"
return []
# Normalize the path to avoid double backslashes
raw_news_dir = os.path.normpath(self.raw_news_dir)
filepath = os.path.normpath(os.path.join(raw_news_dir, filename))
def analyze_sentiment(self, article: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze sentiment and tone of article"""
if not self.available:
return {"sentiment": "neutral", "confidence": 0.0, "available": False}
# Ensure directory exists
os.makedirs(raw_news_dir, exist_ok=True)
title = article.get('title', '')
content = article.get('content', '')
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(articles, f, indent=2, ensure_ascii=False)
prompt = f"""
Analyze the sentiment and tone of this news article:
print(f"Saved {len(articles)} articles to {filepath}")
return filepath
Title: {title}
Content: {content[:600]}...
def fetch_and_save_news(self) -> Dict[str, Any]:
"""Fetch news and save to file"""
articles = self.fetch_all_news()
Return JSON with:
{{
"sentiment": "positive/negative/neutral",
"confidence": 0.85,
"tone": "informative/urgent/optimistic/concerned/etc",
"reasoning": "Brief explanation"
}}
"""
response = self._make_groq_request(prompt, max_tokens=150)
if response:
try:
sentiment = json.loads(response)
sentiment["available"] = True
return sentiment
except json.JSONDecodeError:
if articles:
filepath = self.save_articles(articles)
return {
"sentiment": "neutral",
"confidence": 0.5,
"tone": "informative",
"reasoning": response,
"available": True
"success": True,
"articles_count": len(articles),
"filepath": filepath,
"articles": articles
}
else:
return {
"success": False,
"articles_count": 0,
"message": "No articles fetched"
}
return {"sentiment": "neutral", "confidence": 0.0, "available": False}
def generate_insights(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate insights from multiple articles"""
if not self.available or not articles:
return {"insights": "AI insights not available", "available": False}
# Prepare article summaries
article_summaries = []
for i, article in enumerate(articles[:5]): # Limit to 5 articles
title = article.get('title', '')
source = article.get('source', '')
article_summaries.append(f"{i+1}. {title} (Source: {source})")
prompt = f"""
Analyze these recent news articles and provide insights:
Articles:
{chr(10).join(article_summaries)}
Provide:
1. Main trends or themes
2. Key developments
3. Potential implications
Format as JSON:
{{
"trends": ["trend1", "trend2"],
"key_developments": ["development1", "development2"],
"implications": "Brief analysis of what this means"
}}
"""
response = self._make_groq_request(prompt, max_tokens=400)
if response:
try:
insights = json.loads(response)
insights["available"] = True
insights["analyzed_at"] = datetime.now().isoformat()
insights["article_count"] = len(articles)
return insights
except json.JSONDecodeError:
return {
"insights": response,
"available": True,
"analyzed_at": datetime.now().isoformat()
}
return {"insights": "Analysis failed", "available": False}
def get_status(self) -> Dict[str, Any]:
"""Get AI analyzer status"""
return {
"available": self.available,
"model": self.model if self.available else None,
"features": [
"Article Summarization",
"Keyword Extraction",
"Sentiment Analysis",
"Trend Insights"
] if self.available else []
}
# Test function
if __name__ == "__main__":
fetcher = NewsFetcher()
result = fetcher.fetch_and_save_news()
print(f"Result: {result}")