"""FastAPI backend for DS Task AI News""" from fastapi import FastAPI, HTTPException, Query, Request from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Dict, Any, Optional import uvicorn import time from collections import defaultdict from config import settings from news_fetcher import NewsFetcher from recommender import NewsRecommender from ai_analyzer import AIAnalyzer # Groq integration try: from groq import Groq groq_client = Groq(api_key=settings.groq_api_key) if settings.groq_api_key else None groq_available = groq_client is not None if groq_available: print("✅ Groq LLM service initialized") else: print("⚠️ Groq API key not provided") except Exception as e: print(f"⚠️ Groq initialization failed: {e}") groq_client = None groq_available = False # Initialize FastAPI app app = FastAPI( title="DS Task AI News API", description="AI-powered news retrieval and recommendation system", version="1.0.0" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify actual origins allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize components news_fetcher = NewsFetcher() recommender = NewsRecommender() ai_analyzer = AIAnalyzer() # Simple rate limiter rate_limit_storage = defaultdict(list) RATE_LIMIT_REQUESTS = 100 # requests per minute RATE_LIMIT_WINDOW = 60 # seconds def check_rate_limit(client_ip: str) -> bool: """Check if client has exceeded rate limit""" current_time = time.time() # Clean old requests rate_limit_storage[client_ip] = [ req_time for req_time in rate_limit_storage[client_ip] if current_time - req_time < RATE_LIMIT_WINDOW ] # Check if limit exceeded if len(rate_limit_storage[client_ip]) >= RATE_LIMIT_REQUESTS: return False # Add current request rate_limit_storage[client_ip].append(current_time) return True # Pydantic models class NewsQuery(BaseModel): query: str top_k: int = 5 class InterestsQuery(BaseModel): interests: List[str] top_k: int = 10 class SearchQuery(BaseModel): query: str source: Optional[str] = None category: Optional[str] = None date_from: Optional[str] = None date_to: Optional[str] = None top_k: int = 10 include_content: bool = False class AnalyzeRequest(BaseModel): article_id: str class InsightsRequest(BaseModel): article_count: int = 5 # API Endpoints @app.get("/") async def root(): """Health check endpoint""" return { "message": "DS Task AI News API is running!", "version": "1.0.0", "status": "healthy" } @app.get("/health") async def health_check(): """Detailed health check""" stats = recommender.get_store_stats() return { "status": "healthy", "vector_store": stats, "settings": { "embedding_model": settings.embedding_model, "vector_db_type": settings.vector_db_type, "rss_feeds_count": len(settings.rss_feeds) } } @app.post("/fetch-news") async def fetch_news(): """Fetch news from RSS feeds and add to vector store""" try: # Fetch news articles result = news_fetcher.fetch_and_save_news() if not result["success"]: raise HTTPException(status_code=500, detail=result.get("message", "Failed to fetch news")) # Add articles to vector store articles = result["articles"] store_result = recommender.add_articles_to_store(articles) if not store_result["success"]: raise HTTPException(status_code=500, detail=store_result.get("message", "Failed to add articles to store")) return { "success": True, "message": "News fetched and processed successfully", "articles_fetched": result["articles_count"], "articles_stored": store_result["articles_added"], "total_articles": store_result["total_articles"] } except Exception as e: raise HTTPException(status_code=500, detail=f"Error fetching news: {str(e)}") @app.get("/recommend-news") async def recommend_news( article_id: str = Query(..., description="ID of the article to find similar articles for"), top_k: int = Query(5, description="Number of recommendations to return") ): """Get news recommendations based on article ID""" try: recommendations = recommender.recommend_by_article_id(article_id, top_k) return { "success": True, "article_id": article_id, "recommendations": recommendations, "count": len(recommendations) } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting recommendations: {str(e)}") @app.post("/recommend-by-query") async def recommend_by_query(query_data: NewsQuery): """Get news recommendations based on text query""" try: recommendations = recommender.recommend_by_query(query_data.query, query_data.top_k) return { "success": True, "query": query_data.query, "recommendations": recommendations, "count": len(recommendations) } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting recommendations: {str(e)}") @app.post("/recommend-by-interests") async def recommend_by_interests(interests_data: InterestsQuery): """Get news recommendations based on user interests""" try: recommendations = recommender.recommend_by_interests(interests_data.interests, interests_data.top_k) return { "success": True, "interests": interests_data.interests, "recommendations": recommendations, "count": len(recommendations) } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting recommendations: {str(e)}") @app.get("/trending") async def get_trending_news(top_k: int = Query(10, description="Number of trending articles to return")): """Get trending news articles""" try: trending = recommender.get_trending_articles(top_k) return { "success": True, "trending_articles": trending, "count": len(trending) } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting trending news: {str(e)}") @app.get("/articles") async def get_all_articles( source: Optional[str] = Query(None, description="Filter by news source"), limit: int = Query(50, description="Maximum number of articles to return"), offset: int = Query(0, description="Number of articles to skip for pagination"), category: Optional[str] = Query(None, description="Filter by article category"), date_from: Optional[str] = Query(None, description="Filter articles from this date (YYYY-MM-DD)"), date_to: Optional[str] = Query(None, description="Filter articles to this date (YYYY-MM-DD)") ): """Get all articles with pagination and advanced filtering""" try: # Get all articles first all_articles = recommender.vector_store.get_all_articles() # Apply filters filtered_articles = all_articles # Filter by source if source: filtered_articles = [a for a in filtered_articles if a.get('source', '').lower() == source.lower()] # Filter by category (if articles have categories) if category: filtered_articles = [a for a in filtered_articles if category.lower() in [cat.lower() for cat in a.get('categories', [])]] # Filter by date range if date_from or date_to: from datetime import datetime def parse_date(date_str): try: return datetime.fromisoformat(date_str.replace('Z', '+00:00')) except: try: return datetime.strptime(date_str, '%Y-%m-%d') except: return None if date_from: from_date = parse_date(date_from) if from_date: filtered_articles = [a for a in filtered_articles if parse_date(a.get('published_date', '')) and parse_date(a.get('published_date', '')) >= from_date] if date_to: to_date = parse_date(date_to) if to_date: filtered_articles = [a for a in filtered_articles if parse_date(a.get('published_date', '')) and parse_date(a.get('published_date', '')) <= to_date] # Sort by published date (newest first) filtered_articles = sorted(filtered_articles, key=lambda x: x.get('published_date', ''), reverse=True) # Calculate pagination total_count = len(filtered_articles) start_idx = offset end_idx = offset + limit paginated_articles = filtered_articles[start_idx:end_idx] # Calculate pagination metadata has_next = end_idx < total_count has_prev = offset > 0 total_pages = (total_count + limit - 1) // limit # Ceiling division current_page = (offset // limit) + 1 return { "success": True, "articles": paginated_articles, "pagination": { "total_count": total_count, "count": len(paginated_articles), "limit": limit, "offset": offset, "current_page": current_page, "total_pages": total_pages, "has_next": has_next, "has_prev": has_prev, "next_offset": end_idx if has_next else None, "prev_offset": max(0, offset - limit) if has_prev else None }, "filters": { "source": source, "category": category, "date_from": date_from, "date_to": date_to } } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting articles: {str(e)}") @app.post("/search") async def search_articles(search_data: SearchQuery, request: Request): """Advanced search with multiple filters and semantic similarity""" try: # Rate limiting client_ip = request.client.host if not check_rate_limit(client_ip): raise HTTPException(status_code=429, detail="Rate limit exceeded. Please try again later.") # Get semantic search results first semantic_results = recommender.search_articles(search_data.query, {}, search_data.top_k * 2) # Apply additional filters filtered_results = semantic_results # Filter by source if search_data.source: filtered_results = [r for r in filtered_results if r.get('source', '').lower() == search_data.source.lower()] # Filter by category if search_data.category: filtered_results = [r for r in filtered_results if search_data.category.lower() in [cat.lower() for cat in r.get('categories', [])]] # Filter by date range if search_data.date_from or search_data.date_to: from datetime import datetime def parse_date(date_str): try: return datetime.fromisoformat(date_str.replace('Z', '+00:00')) except: try: return datetime.strptime(date_str, '%Y-%m-%d') except: return None if search_data.date_from: from_date = parse_date(search_data.date_from) if from_date: filtered_results = [r for r in filtered_results if parse_date(r.get('published_date', '')) and parse_date(r.get('published_date', '')) >= from_date] if search_data.date_to: to_date = parse_date(search_data.date_to) if to_date: filtered_results = [r for r in filtered_results if parse_date(r.get('published_date', '')) and parse_date(r.get('published_date', '')) <= to_date] # Limit results to requested amount final_results = filtered_results[:search_data.top_k] # Optionally include full content if not search_data.include_content: for result in final_results: if 'content' in result and len(result['content']) > 200: result['content'] = result['content'][:200] + "..." return { "success": True, "query": search_data.query, "filters": { "source": search_data.source, "category": search_data.category, "date_from": search_data.date_from, "date_to": search_data.date_to }, "results": final_results, "count": len(final_results), "total_semantic_matches": len(semantic_results), "filtered_matches": len(filtered_results) } except Exception as e: raise HTTPException(status_code=500, detail=f"Error searching articles: {str(e)}") @app.get("/stats") async def get_stats(): """Get system statistics""" try: stats = recommender.get_store_stats() # Add RSS feed information stats['rss_feeds'] = settings.rss_feeds stats['embedding_model'] = settings.embedding_model stats['groq_available'] = groq_available return { "success": True, "statistics": stats } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting stats: {str(e)}") # AI Analysis Endpoints @app.post("/analyze-article") async def analyze_article(request: AnalyzeRequest): """Analyze a specific article with AI""" try: # Get article from vector store articles = recommender.vector_store.get_all_articles() article = next((a for a in articles if a.get('id') == request.article_id), None) if not article: raise HTTPException(status_code=404, detail="Article not found") # Perform AI analysis summary = ai_analyzer.summarize_article(article) keywords = ai_analyzer.extract_keywords(article) sentiment = ai_analyzer.analyze_sentiment(article) return { "success": True, "article_id": request.article_id, "analysis": { "summary": summary, "keywords": keywords, "sentiment": sentiment } } except Exception as e: raise HTTPException(status_code=500, detail=f"Error analyzing article: {str(e)}") @app.post("/generate-insights") async def generate_insights(request: InsightsRequest): """Generate AI insights from recent articles""" try: # Get recent articles recent_articles = recommender.get_trending_articles(request.article_count) # Generate insights insights = ai_analyzer.generate_insights(recent_articles) return { "success": True, "insights": insights, "article_count": len(recent_articles) } except Exception as e: raise HTTPException(status_code=500, detail=f"Error generating insights: {str(e)}") @app.get("/ai-status") async def get_ai_status(): """Get AI analyzer status and capabilities""" try: status = ai_analyzer.get_status() return { "success": True, "ai_status": status } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting AI status: {str(e)}") # Run the application if __name__ == "__main__": uvicorn.run( "main:app", host=settings.host, port=settings.port, reload=settings.debug )