bccb7f2c2c
- Fixed import error by restoring proper NewsFetcher class structure - Updated RSS feed fetching implementation with improved error handling - Enhanced feed parsing with better timeout management and user agents - Maintained compatibility with existing system architecture - Resolved server startup issues caused by missing class definition
217 lines
8.9 KiB
Python
217 lines
8.9 KiB
Python
|
|
"""RSS News Fetcher for DS Task AI News"""
|
|
import feedparser
|
|
import requests
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any
|
|
from urllib.parse import urlparse
|
|
import hashlib
|
|
from config import settings
|
|
from recommender import NewsRecommender # Add this import
|
|
from ai_analyzer import AIAnalyzer # Add this import
|
|
|
|
class NewsFetcher:
|
|
def __init__(self):
|
|
self.raw_news_dir = settings.raw_news_dir
|
|
self.max_articles = settings.max_articles_per_feed
|
|
self.recommender = NewsRecommender() # Add recommender for embedding/vector access
|
|
self.ai_analyzer = AIAnalyzer() # Add AIAnalyzer for LLM duplicate check
|
|
# Ensure directories exist
|
|
os.makedirs(self.raw_news_dir, exist_ok=True)
|
|
|
|
def generate_article_id(self, title: str, url: str) -> str:
|
|
"""Generate unique ID for article"""
|
|
content = f"{title}{url}"
|
|
return hashlib.md5(content.encode()).hexdigest()[:12]
|
|
|
|
def clean_content(self, content: str) -> str:
|
|
"""Clean and truncate content"""
|
|
if not content:
|
|
return ""
|
|
|
|
# Remove HTML tags (basic cleaning)
|
|
import re
|
|
content = re.sub(r'<[^>]+>', '', content)
|
|
|
|
# Truncate to reasonable length
|
|
return content[:1000] if len(content) > 1000 else content
|
|
|
|
def is_duplicate_by_llm(self, article: Dict[str, Any], existing_article: Dict[str, Any]) -> bool:
|
|
"""Use LLM to check if two articles are about the same event or story"""
|
|
if not self.ai_analyzer.available:
|
|
return False # LLM not available, skip this check
|
|
prompt = f"""
|
|
Are these two news articles about the same event or story? Answer only 'yes' or 'no'.\n\nArticle 1:\nTitle: {article.get('title', '')}\nContent: {article.get('content', '')[:500]}\n\nArticle 2:\nTitle: {existing_article.get('title', '')}\nContent: {existing_article.get('content', '')[:500]}\n"""
|
|
response = self.ai_analyzer._make_groq_request(prompt, max_tokens=5)
|
|
if response and response.strip().lower().startswith('yes'):
|
|
return True
|
|
return False
|
|
|
|
def is_duplicate_by_similarity(self, article: Dict[str, Any], threshold: float = 0.9) -> bool:
|
|
"""Check if the article is a duplicate using similarity search and LLM verification"""
|
|
all_articles = self.recommender.vector_store.get_all_articles()
|
|
if not all_articles:
|
|
return False # No articles to compare with
|
|
embedding = self.recommender.embedding_generator.generate_query_embedding(
|
|
self.recommender.embedding_generator.create_article_text(article)
|
|
)
|
|
existing_embeddings = self.recommender.vector_store.index.reconstruct_n(0, len(all_articles))
|
|
import numpy as np
|
|
for idx, existing_embedding in enumerate(existing_embeddings):
|
|
norm1 = np.linalg.norm(embedding)
|
|
norm2 = np.linalg.norm(existing_embedding)
|
|
if norm1 == 0 or norm2 == 0:
|
|
continue
|
|
similarity = float(np.dot(embedding, existing_embedding) / (norm1 * norm2))
|
|
if similarity >= threshold:
|
|
# Use LLM to confirm duplicate
|
|
existing_article = all_articles[idx]
|
|
if self.is_duplicate_by_llm(article, existing_article):
|
|
return True # LLM confirms duplicate
|
|
return False
|
|
|
|
def fetch_rss_feed(self, feed_url: str) -> List[Dict[str, Any]]:
|
|
"""Fetch articles from a single RSS feed"""
|
|
try:
|
|
print(f"Fetching from: {feed_url}")
|
|
|
|
# Use requests with proper headers and timeout
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
|
}
|
|
|
|
try:
|
|
import requests
|
|
response = requests.get(feed_url, headers=headers, timeout=15)
|
|
response.raise_for_status()
|
|
feed = feedparser.parse(response.content)
|
|
except Exception as e:
|
|
print(f"HTTP request failed, trying direct feedparser: {e}")
|
|
feed = feedparser.parse(feed_url)
|
|
|
|
if feed.bozo:
|
|
print(f"Warning: Feed parsing issues for {feed_url}")
|
|
if hasattr(feed, 'bozo_exception'):
|
|
print(f"Bozo exception: {feed.bozo_exception}")
|
|
|
|
articles = []
|
|
source_name = getattr(feed.feed, 'title', urlparse(feed_url).netloc)
|
|
|
|
for entry in feed.entries[:self.max_articles]:
|
|
try:
|
|
# Extract article data
|
|
title = getattr(entry, 'title', 'No Title')
|
|
content = getattr(entry, 'summary', getattr(entry, 'description', ''))
|
|
url = getattr(entry, 'link', '')
|
|
published = getattr(entry, 'published', '')
|
|
|
|
# Parse date
|
|
try:
|
|
if published:
|
|
pub_date = datetime(*entry.published_parsed[:6])
|
|
else:
|
|
pub_date = datetime.now()
|
|
except:
|
|
pub_date = datetime.now()
|
|
|
|
# Create article object
|
|
article = {
|
|
"id": self.generate_article_id(title, url),
|
|
"title": title,
|
|
"content": self.clean_content(content),
|
|
"url": url,
|
|
"source": source_name,
|
|
"published_date": pub_date.isoformat(),
|
|
"fetched_date": datetime.now().isoformat(),
|
|
"categories": getattr(entry, 'tags', []),
|
|
"slug": title.lower().replace(" ", "-").replace("'", "")[:50]
|
|
}
|
|
|
|
# Check for duplicate using similarity search
|
|
if self.is_duplicate_by_similarity(article):
|
|
print(f"Skipped duplicate article (similarity): {title}")
|
|
continue
|
|
|
|
articles.append(article)
|
|
|
|
except Exception as e:
|
|
print(f"Error processing entry: {e}")
|
|
continue
|
|
|
|
print(f"Fetched {len(articles)} articles from {source_name}")
|
|
|
|
# If no articles but feed parsed successfully, it might be due to no new content
|
|
if len(articles) == 0 and not feed.bozo:
|
|
print(f"No new articles found in {source_name} (feed is valid)")
|
|
|
|
return articles
|
|
|
|
except Exception as e:
|
|
print(f"Error fetching RSS feed {feed_url}: {e}")
|
|
return []
|
|
|
|
def fetch_all_news(self) -> List[Dict[str, Any]]:
|
|
"""Fetch news from all configured RSS feeds"""
|
|
all_articles = []
|
|
|
|
for feed_url in settings.rss_feeds:
|
|
feed_url = feed_url.strip()
|
|
if feed_url:
|
|
articles = self.fetch_rss_feed(feed_url)
|
|
all_articles.extend(articles)
|
|
|
|
# Remove duplicates based on ID
|
|
unique_articles = {}
|
|
for article in all_articles:
|
|
unique_articles[article['id']] = article
|
|
|
|
final_articles = list(unique_articles.values())
|
|
print(f"Total unique articles fetched: {len(final_articles)}")
|
|
|
|
return final_articles
|
|
|
|
def save_articles(self, articles: List[Dict[str, Any]]) -> str:
|
|
"""Save articles to JSON file"""
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"news_{timestamp}.json"
|
|
|
|
# Normalize the path to avoid double backslashes
|
|
raw_news_dir = os.path.normpath(self.raw_news_dir)
|
|
filepath = os.path.normpath(os.path.join(raw_news_dir, filename))
|
|
|
|
# Ensure directory exists
|
|
os.makedirs(raw_news_dir, exist_ok=True)
|
|
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
json.dump(articles, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"Saved {len(articles)} articles to {filepath}")
|
|
return filepath
|
|
|
|
def fetch_and_save_news(self) -> Dict[str, Any]:
|
|
"""Fetch news and save to file"""
|
|
articles = self.fetch_all_news()
|
|
|
|
if articles:
|
|
filepath = self.save_articles(articles)
|
|
return {
|
|
"success": True,
|
|
"articles_count": len(articles),
|
|
"filepath": filepath,
|
|
"articles": articles
|
|
}
|
|
else:
|
|
return {
|
|
"success": False,
|
|
"articles_count": 0,
|
|
"message": "No articles fetched"
|
|
}
|
|
|
|
# Test function
|
|
if __name__ == "__main__":
|
|
fetcher = NewsFetcher()
|
|
result = fetcher.fetch_and_save_news()
|
|
print(f"Result: {result}")
|