Files
DS_TASK_AI_VIEWS/backend/news_fetcher.py
Aherobo Ovie Victor bccb7f2c2c fix: Restore NewsFetcher class in news_fetcher.py
- Fixed import error by restoring proper NewsFetcher class structure
- Updated RSS feed fetching implementation with improved error handling
- Enhanced feed parsing with better timeout management and user agents
- Maintained compatibility with existing system architecture
- Resolved server startup issues caused by missing class definition
2025-07-15 21:55:43 +01:00

217 lines
8.9 KiB
Python

"""RSS News Fetcher for DS Task AI News"""
import hashlib
import json
import os
import re
from datetime import datetime
from typing import List, Dict, Any
from urllib.parse import urlparse

import feedparser
import numpy as np
import requests

from config import settings
from recommender import NewsRecommender # Add this import
from ai_analyzer import AIAnalyzer # Add this import
class NewsFetcher:
    """Fetch articles from the configured RSS feeds and save them as JSON.

    Each feed entry is converted into a plain ``dict``. Before an article is
    kept it is compared against the recommender's vector store; a candidate
    that is both highly similar to a stored article and confirmed by the LLM
    as the same story is skipped.
    """

    # Matches anything that looks like an HTML/XML tag (basic cleaning only).
    _HTML_TAG_PATTERN = re.compile(r'<[^>]+>')
    # Maximum number of characters of cleaned content stored per article.
    _MAX_CONTENT_LENGTH = 1000
    # Number of content characters of each article included in the LLM prompt.
    _LLM_CONTENT_CHARS = 500
    # Seconds to wait for the HTTP download of a feed.
    _REQUEST_TIMEOUT = 15
    # Browser-like User-Agent sent with the HTTP download of a feed.
    _REQUEST_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    def __init__(self):
        """Read settings, create the helper objects and the output directory."""
        self.raw_news_dir = settings.raw_news_dir
        self.max_articles = settings.max_articles_per_feed
        # Gives access to the embedding generator and the vector store.
        self.recommender = NewsRecommender()
        # Used for the LLM-based duplicate confirmation.
        self.ai_analyzer = AIAnalyzer()
        # Ensure directories exist
        os.makedirs(self.raw_news_dir, exist_ok=True)

    def generate_article_id(self, title: str, url: str) -> str:
        """Return a 12-character hex ID derived from the MD5 of title + url."""
        content = f"{title}{url}"
        return hashlib.md5(content.encode()).hexdigest()[:12]

    def clean_content(self, content: str) -> str:
        """Strip HTML tags from ``content`` and truncate it.

        Returns an empty string for empty or ``None`` input; otherwise the
        tag-free text limited to ``_MAX_CONTENT_LENGTH`` characters.
        """
        if not content:
            return ""
        stripped = self._HTML_TAG_PATTERN.sub('', content)
        # Slicing already handles text shorter than the limit.
        return stripped[:self._MAX_CONTENT_LENGTH]

    def is_duplicate_by_llm(self, article: Dict[str, Any], existing_article: Dict[str, Any]) -> bool:
        """Ask the LLM whether two articles cover the same event or story.

        Returns ``False`` when the analyzer is unavailable or gives no
        answer, and ``True`` only when the answer starts with "yes"
        (case-insensitive, surrounding whitespace ignored).
        """
        if not self.ai_analyzer.available:
            return False  # LLM not available, skip this check
        limit = self._LLM_CONTENT_CHARS
        # ``or ''`` guards against keys that are present but set to None,
        # which would otherwise fail on slicing.
        prompt = (
            "\nAre these two news articles about the same event or story? "
            "Answer only 'yes' or 'no'.\n\n"
            f"Article 1:\nTitle: {article.get('title') or ''}\n"
            f"Content: {(article.get('content') or '')[:limit]}\n\n"
            f"Article 2:\nTitle: {existing_article.get('title') or ''}\n"
            f"Content: {(existing_article.get('content') or '')[:limit]}\n"
        )
        # NOTE(review): this relies on a private method of AIAnalyzer.
        response = self.ai_analyzer._make_groq_request(prompt, max_tokens=5)
        return bool(response) and response.strip().lower().startswith('yes')

    def is_duplicate_by_similarity(self, article: Dict[str, Any], threshold: float = 0.9) -> bool:
        """Check whether ``article`` duplicates an article in the vector store.

        The article's embedding is compared by cosine similarity with every
        stored embedding. Each stored article whose similarity is at least
        ``threshold`` is passed to :meth:`is_duplicate_by_llm`; the first LLM
        confirmation makes the result ``True``.
        """
        vector_store = self.recommender.vector_store
        all_articles = vector_store.get_all_articles()
        if not all_articles:
            return False  # No articles to compare with
        generator = self.recommender.embedding_generator
        embedding = generator.generate_query_embedding(
            generator.create_article_text(article)
        )
        # The query norm is loop-invariant; a zero vector can match nothing.
        query_norm = np.linalg.norm(embedding)
        if query_norm == 0:
            return False
        # NOTE(review): assumes row i of the index corresponds to
        # all_articles[i] - confirm against the vector store implementation.
        existing_embeddings = vector_store.index.reconstruct_n(0, len(all_articles))
        for idx, existing_embedding in enumerate(existing_embeddings):
            existing_norm = np.linalg.norm(existing_embedding)
            if existing_norm == 0:
                continue
            similarity = float(
                np.dot(embedding, existing_embedding) / (query_norm * existing_norm)
            )
            if similarity >= threshold and self.is_duplicate_by_llm(article, all_articles[idx]):
                return True  # LLM confirms duplicate
        return False

    def _download_feed(self, feed_url: str):
        """Download and parse a feed, falling back to feedparser's own fetch."""
        try:
            response = requests.get(
                feed_url,
                headers=self._REQUEST_HEADERS,
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return feedparser.parse(response.content)
        except Exception as e:
            # Deliberate best-effort: any failure here triggers the fallback.
            print(f"HTTP request failed, trying direct feedparser: {e}")
            return feedparser.parse(feed_url)

    @staticmethod
    def _parse_published_date(entry) -> datetime:
        """Return the entry's publication date, or the current time if unusable."""
        published = getattr(entry, 'published', '')
        parsed = getattr(entry, 'published_parsed', None)
        if not published or not parsed:
            return datetime.now()
        try:
            return datetime(*parsed[:6])
        except (TypeError, ValueError):
            # Malformed or out-of-range date components.
            return datetime.now()

    def _build_article(self, entry, source_name: str) -> Dict[str, Any]:
        """Convert one feed entry into the article dict stored by the fetcher."""
        title = getattr(entry, 'title', 'No Title')
        content = getattr(entry, 'summary', getattr(entry, 'description', ''))
        url = getattr(entry, 'link', '')
        pub_date = self._parse_published_date(entry)
        return {
            "id": self.generate_article_id(title, url),
            "title": title,
            "content": self.clean_content(content),
            "url": url,
            "source": source_name,
            "published_date": pub_date.isoformat(),
            "fetched_date": datetime.now().isoformat(),
            "categories": getattr(entry, 'tags', []),
            "slug": title.lower().replace(" ", "-").replace("'", "")[:50]
        }

    def fetch_rss_feed(self, feed_url: str) -> List[Dict[str, Any]]:
        """Fetch articles from a single RSS feed.

        At most ``self.max_articles`` entries are processed. Entries that
        fail to process or are detected as duplicates are skipped. Any error
        affecting the whole feed is reported and an empty list is returned.
        """
        try:
            print(f"Fetching from: {feed_url}")
            feed = self._download_feed(feed_url)
            if feed.bozo:
                print(f"Warning: Feed parsing issues for {feed_url}")
                if hasattr(feed, 'bozo_exception'):
                    print(f"Bozo exception: {feed.bozo_exception}")
            articles = []
            # Fall back to the host name when the feed has no title.
            source_name = getattr(feed.feed, 'title', urlparse(feed_url).netloc)
            for entry in feed.entries[:self.max_articles]:
                try:
                    article = self._build_article(entry, source_name)
                    # Check for duplicate using similarity search
                    if self.is_duplicate_by_similarity(article):
                        print(f"Skipped duplicate article (similarity): {article['title']}")
                        continue
                    articles.append(article)
                except Exception as e:
                    # One bad entry must not abort the rest of the feed.
                    print(f"Error processing entry: {e}")
                    continue
            print(f"Fetched {len(articles)} articles from {source_name}")
            # If no articles but feed parsed successfully, it might be due to no new content
            if not articles and not feed.bozo:
                print(f"No new articles found in {source_name} (feed is valid)")
            return articles
        except Exception as e:
            print(f"Error fetching RSS feed {feed_url}: {e}")
            return []

    def fetch_all_news(self) -> List[Dict[str, Any]]:
        """Fetch news from all configured RSS feeds, de-duplicated by article ID."""
        all_articles = []
        for feed_url in settings.rss_feeds:
            feed_url = feed_url.strip()
            if feed_url:
                all_articles.extend(self.fetch_rss_feed(feed_url))
        # Remove duplicates based on ID; for a repeated ID the last article wins.
        unique_articles = {article['id']: article for article in all_articles}
        final_articles = list(unique_articles.values())
        print(f"Total unique articles fetched: {len(final_articles)}")
        return final_articles

    def save_articles(self, articles: List[Dict[str, Any]]) -> str:
        """Write ``articles`` to a timestamped JSON file and return its path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"news_{timestamp}.json"
        # Normalize the path to avoid double backslashes
        raw_news_dir = os.path.normpath(self.raw_news_dir)
        filepath = os.path.normpath(os.path.join(raw_news_dir, filename))
        # Ensure directory exists
        os.makedirs(raw_news_dir, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(articles, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(articles)} articles to {filepath}")
        return filepath

    def fetch_and_save_news(self) -> Dict[str, Any]:
        """Fetch news and save it to a file.

        Returns a dict with ``success`` and ``articles_count``; on success it
        also carries ``filepath`` and ``articles``, otherwise a ``message``.
        """
        articles = self.fetch_all_news()
        if not articles:
            return {
                "success": False,
                "articles_count": 0,
                "message": "No articles fetched"
            }
        filepath = self.save_articles(articles)
        return {
            "success": True,
            "articles_count": len(articles),
            "filepath": filepath,
            "articles": articles
        }
# Manual smoke test: run one fetch-and-save cycle and show the outcome.
if __name__ == "__main__":
    outcome = NewsFetcher().fetch_and_save_news()
    print(f"Result: {outcome}")