feat: Implement AI-powered embeddings and vector similarity search system

This commit is contained in:
Aherobo Ovie Victor
2025-07-07 18:45:10 +01:00
parent e188af8b17
commit 86d14ef472
3 changed files with 419 additions and 3 deletions
+220
View File
@@ -0,0 +1,220 @@
"""Groq LLM integration for DS Task AI News"""
import os
from typing import List, Dict, Any, Optional
from groq import Groq
from config import settings
class GroqLLMService:
def __init__(self):
self.client = None
self.model = "llama3-8b-8192" # Default Groq model
# Initialize Groq client if API key is available
if settings.groq_api_key:
try:
self.client = Groq(api_key=settings.groq_api_key)
print("✅ Groq LLM service initialized")
except Exception as e:
print(f"⚠️ Groq initialization failed: {e}")
self.client = None
else:
print("⚠️ Groq API key not provided")
def is_available(self) -> bool:
"""Check if Groq service is available"""
return self.client is not None
def summarize_article(self, article: Dict[str, Any]) -> Optional[str]:
"""Generate a summary for an article"""
if not self.is_available():
return None
try:
title = article.get('title', '')
content = article.get('content', '')
prompt = f"""
Please provide a concise summary of this news article in 2-3 sentences:
Title: {title}
Content: {content}
Summary:
"""
response = self.client.chat.completions.create(
messages=[
{"role": "user", "content": prompt}
],
model=self.model,
max_tokens=150,
temperature=0.3
)
summary = response.choices[0].message.content.strip()
return summary
except Exception as e:
print(f"Error generating summary: {e}")
return None
def analyze_sentiment(self, article: Dict[str, Any]) -> Optional[str]:
"""Analyze sentiment of an article"""
if not self.is_available():
return None
try:
title = article.get('title', '')
content = article.get('content', '')
prompt = f"""
Analyze the sentiment of this news article. Respond with only one word: "positive", "negative", or "neutral".
Title: {title}
Content: {content}
Sentiment:
"""
response = self.client.chat.completions.create(
messages=[
{"role": "user", "content": prompt}
],
model=self.model,
max_tokens=10,
temperature=0.1
)
sentiment = response.choices[0].message.content.strip().lower()
# Validate response
if sentiment in ['positive', 'negative', 'neutral']:
return sentiment
else:
return 'neutral' # Default fallback
except Exception as e:
print(f"Error analyzing sentiment: {e}")
return None
def extract_keywords(self, article: Dict[str, Any]) -> Optional[List[str]]:
"""Extract key topics/keywords from an article"""
if not self.is_available():
return None
try:
title = article.get('title', '')
content = article.get('content', '')
prompt = f"""
Extract 3-5 key topics or keywords from this news article. Return them as a comma-separated list.
Title: {title}
Content: {content}
Keywords:
"""
response = self.client.chat.completions.create(
messages=[
{"role": "user", "content": prompt}
],
model=self.model,
max_tokens=50,
temperature=0.3
)
keywords_text = response.choices[0].message.content.strip()
keywords = [kw.strip() for kw in keywords_text.split(',') if kw.strip()]
return keywords[:5] # Limit to 5 keywords
except Exception as e:
print(f"Error extracting keywords: {e}")
return None
def generate_insights(self, articles: List[Dict[str, Any]]) -> Optional[str]:
"""Generate insights from multiple articles"""
if not self.is_available() or not articles:
return None
try:
# Create a summary of article titles
titles = [article.get('title', '') for article in articles[:10]] # Limit to 10 articles
titles_text = '\n'.join([f"- {title}" for title in titles])
prompt = f"""
Based on these recent news headlines, provide 2-3 key insights about current trends or themes:
Headlines:
{titles_text}
Key Insights:
"""
response = self.client.chat.completions.create(
messages=[
{"role": "user", "content": prompt}
],
model=self.model,
max_tokens=200,
temperature=0.4
)
insights = response.choices[0].message.content.strip()
return insights
except Exception as e:
print(f"Error generating insights: {e}")
return None
def enhance_article(self, article: Dict[str, Any]) -> Dict[str, Any]:
"""Enhance article with AI-generated metadata"""
enhanced_article = article.copy()
if self.is_available():
# Add summary
summary = self.summarize_article(article)
if summary:
enhanced_article['ai_summary'] = summary
# Add sentiment
sentiment = self.analyze_sentiment(article)
if sentiment:
enhanced_article['sentiment'] = sentiment
# Add keywords
keywords = self.extract_keywords(article)
if keywords:
enhanced_article['ai_keywords'] = keywords
return enhanced_article
def batch_enhance_articles(self, articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Enhance multiple articles with AI features"""
enhanced_articles = []
for article in articles:
enhanced = self.enhance_article(article)
enhanced_articles.append(enhanced)
return enhanced_articles
# Test function
if __name__ == "__main__":
# Test Groq integration
groq_service = GroqLLMService()
if groq_service.is_available():
print("✅ Groq service is available")
# Test with sample article
sample_article = {
"title": "AI Technology Advances in Healthcare",
"content": "Recent developments in artificial intelligence are transforming the healthcare industry with new diagnostic tools and treatment methods."
}
enhanced = groq_service.enhance_article(sample_article)
print(f"Enhanced article: {enhanced}")
else:
print("⚠️ Groq service not available (API key needed)")
+87 -3
View File
@@ -8,6 +8,7 @@ import uvicorn
from config import settings from config import settings
from news_fetcher import NewsFetcher from news_fetcher import NewsFetcher
from recommender import NewsRecommender from recommender import NewsRecommender
from groq_integration import GroqLLMService
# Initialize FastAPI app # Initialize FastAPI app
app = FastAPI( app = FastAPI(
@@ -28,6 +29,7 @@ app.add_middleware(
# Initialize components # Initialize components
news_fetcher = NewsFetcher() news_fetcher = NewsFetcher()
recommender = NewsRecommender() recommender = NewsRecommender()
groq_service = GroqLLMService()
# Pydantic models # Pydantic models
class NewsQuery(BaseModel): class NewsQuery(BaseModel):
@@ -211,19 +213,101 @@ async def get_stats():
"""Get system statistics""" """Get system statistics"""
try: try:
stats = recommender.get_store_stats() stats = recommender.get_store_stats()
# Add RSS feed information # Add RSS feed information
stats['rss_feeds'] = settings.rss_feeds stats['rss_feeds'] = settings.rss_feeds
stats['embedding_model'] = settings.embedding_model stats['embedding_model'] = settings.embedding_model
stats['groq_available'] = groq_service.is_available()
return { return {
"success": True, "success": True,
"statistics": stats "statistics": stats
} }
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting stats: {str(e)}") raise HTTPException(status_code=500, detail=f"Error getting stats: {str(e)}")
@app.post("/enhance-article")
async def enhance_article_with_ai(article_data: Dict[str, Any]):
"""Enhance an article with AI-generated summary, sentiment, and keywords"""
try:
if not groq_service.is_available():
raise HTTPException(status_code=503, detail="Groq LLM service not available")
enhanced_article = groq_service.enhance_article(article_data)
return {
"success": True,
"original_article": article_data,
"enhanced_article": enhanced_article
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error enhancing article: {str(e)}")
@app.post("/generate-insights")
async def generate_news_insights():
"""Generate insights from recent news articles"""
try:
if not groq_service.is_available():
raise HTTPException(status_code=503, detail="Groq LLM service not available")
# Get recent articles
recent_articles = recommender.get_trending_articles(top_k=10)
if not recent_articles:
raise HTTPException(status_code=404, detail="No recent articles found")
insights = groq_service.generate_insights(recent_articles)
return {
"success": True,
"insights": insights,
"based_on_articles": len(recent_articles)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error generating insights: {str(e)}")
@app.post("/fetch-and-enhance-news")
async def fetch_and_enhance_news():
"""Fetch news and enhance with AI features"""
try:
# Fetch news articles
result = news_fetcher.fetch_and_save_news()
if not result["success"]:
raise HTTPException(status_code=500, detail=result.get("message", "Failed to fetch news"))
articles = result["articles"]
# Enhance with AI if Groq is available
if groq_service.is_available():
# Enhance first 5 articles as example
enhanced_articles = groq_service.batch_enhance_articles(articles[:5])
# Add enhanced articles to vector store
store_result = recommender.add_articles_to_store(enhanced_articles)
else:
# Add regular articles to vector store
store_result = recommender.add_articles_to_store(articles)
if not store_result["success"]:
raise HTTPException(status_code=500, detail=store_result.get("message", "Failed to add articles to store"))
return {
"success": True,
"message": "News fetched and processed successfully",
"articles_fetched": result["articles_count"],
"articles_enhanced": 5 if groq_service.is_available() else 0,
"articles_stored": store_result["articles_added"],
"total_articles": store_result["total_articles"],
"ai_features_enabled": groq_service.is_available()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching and enhancing news: {str(e)}")
# Run the application # Run the application
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run( uvicorn.run(
+112
View File
@@ -0,0 +1,112 @@
"""Test AI features: embeddings and vector search"""
import sys
import os
sys.path.append('backend')
def test_ai_pipeline():
print("🤖 Testing AI Features Pipeline")
print("=" * 50)
# Step 1: Get some news articles
print("1. Fetching news articles...")
from news_fetcher import NewsFetcher
fetcher = NewsFetcher()
# Get articles from BBC
articles = fetcher.fetch_rss_feed("https://feeds.bbci.co.uk/news/rss.xml")
print(f"✅ Got {len(articles)} articles")
# Use first 5 articles for testing
test_articles = articles[:5]
for i, article in enumerate(test_articles):
print(f" {i+1}. {article['title'][:50]}...")
# Step 2: Test embeddings
print("\n2. Testing embeddings generation...")
from embeddings import EmbeddingGenerator
embedding_gen = EmbeddingGenerator()
print(f" Using model: {'Cohere' if embedding_gen.use_cohere else 'Sentence Transformers'}")
# Generate embeddings
embeddings = embedding_gen.generate_embeddings(test_articles)
print(f"✅ Generated embeddings: {embeddings.shape}")
# Step 3: Test vector store
print("\n3. Testing vector store...")
from vector_store import VectorStore
# Clear any existing index for clean test
vector_store = VectorStore()
vector_store.clear_index()
# Add articles to vector store
vector_store.add_articles(test_articles, embeddings)
stats = vector_store.get_stats()
print(f"✅ Vector store: {stats['total_articles']} articles, dimension {stats['index_dimension']}")
# Step 4: Test similarity search
print("\n4. Testing similarity search...")
# Test query
query = "technology artificial intelligence"
query_embedding = embedding_gen.generate_query_embedding(query)
print(f" Query: '{query}'")
# Search for similar articles
similar_articles = vector_store.search_similar(query_embedding, top_k=3)
if similar_articles:
print(f"✅ Found {len(similar_articles)} similar articles:")
for i, article in enumerate(similar_articles):
score = article.get('similarity_score', 0)
print(f" {i+1}. {article['title'][:45]}... (score: {score:.3f})")
else:
print("⚠️ No similar articles found (threshold might be too high)")
# Step 5: Test recommender system
print("\n5. Testing recommender system...")
from recommender import NewsRecommender
recommender = NewsRecommender()
# Add articles to recommender
result = recommender.add_articles_to_store(test_articles)
if result["success"]:
print(f"✅ Added {result['articles_added']} articles to recommender")
# Test query-based recommendations
recommendations = recommender.recommend_by_query("technology news", top_k=3)
if recommendations:
print(f"✅ Query recommendations: {len(recommendations)} articles")
for i, rec in enumerate(recommendations):
score = rec.get('similarity_score', 0)
print(f" {i+1}. {rec['title'][:45]}... (score: {score:.3f})")
# Test article-based recommendations
if test_articles:
article_id = test_articles[0]['id']
similar_recs = recommender.recommend_by_article_id(article_id, top_k=2)
if similar_recs:
print(f"✅ Article-based recommendations: {len(similar_recs)} articles")
else:
print("⚠️ No article-based recommendations found")
print("\n" + "=" * 50)
print("🎉 AI FEATURES TEST COMPLETED!")
print("✅ News fetching: Working")
print("✅ Embeddings generation: Working")
print("✅ Vector storage: Working")
print("✅ Similarity search: Working")
print("✅ Recommendation system: Working")
return True
if __name__ == "__main__":
try:
test_ai_pipeline()
print("\n🚀 AI-powered news system is fully operational!")
except Exception as e:
print(f"\n❌ Error in AI pipeline: {e}")
import traceback
traceback.print_exc()