Add assets for a new 'Scheduling web scrapers' article
This commit is contained in:
@@ -0,0 +1,42 @@
|
||||
import asyncio
|
||||
import time
|
||||
from firecrawl_scraper import save_firecrawl_news_data
|
||||
|
||||
|
||||
async def schedule_scraper(interval_hours: float = 1):
|
||||
"""
|
||||
Schedule the scraper to run at specified intervals
|
||||
|
||||
Args:
|
||||
interval_hours (float): Hours between each scrape (can be decimal for shorter periods)
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
print(f"Starting scrape at {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
# Run the scraper
|
||||
filename = save_firecrawl_news_data()
|
||||
print(f"Data saved to {filename}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during scraping: {e}")
|
||||
|
||||
# Wait for the specified interval
|
||||
await asyncio.sleep(interval_hours * 20) # Convert hours to seconds
|
||||
|
||||
|
||||
async def main():
|
||||
# Create tasks for different scheduling intervals
|
||||
tasks = [
|
||||
schedule_scraper(interval_hours=1), # Run every hour
|
||||
# Add more tasks with different intervals if needed
|
||||
# schedule_scraper(interval_hours=0.5), # Run every 30 minutes
|
||||
# schedule_scraper(interval_hours=2), # Run every 2 hours
|
||||
]
|
||||
|
||||
# Run all tasks concurrently
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run the async scheduler
|
||||
asyncio.run(main())
|
||||
@@ -0,0 +1,98 @@
|
||||
import json
|
||||
import requests
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from pydantic import BaseModel
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class NewsItem(BaseModel):
|
||||
title: str
|
||||
source_url: str
|
||||
author: str
|
||||
rank: str
|
||||
upvotes: str
|
||||
date: str
|
||||
|
||||
|
||||
BASE_URL = "https://news.ycombinator.com/"
|
||||
|
||||
|
||||
def get_page_content():
|
||||
"""
|
||||
Send a GET request to the Hacker News homepage and return the HTML content.
|
||||
"""
|
||||
response = requests.get(BASE_URL)
|
||||
return response.text
|
||||
|
||||
|
||||
def get_title_rows(html_content, class_name):
|
||||
"""
|
||||
Parse the HTML content and return the first table row.
|
||||
"""
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
title_rows = soup.find("table").find_all("tr", {"class": class_name})
|
||||
return title_rows
|
||||
|
||||
|
||||
def get_subtext_rows(html_content):
|
||||
"""
|
||||
Parse the HTML content and return the subtext row.
|
||||
"""
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
subtext_rows = soup.find("table").find_all("td", {"class": "subtext"})
|
||||
return subtext_rows
|
||||
|
||||
|
||||
def get_news_data():
|
||||
"""
|
||||
Extract the news data from the table row.
|
||||
"""
|
||||
title_rows = get_title_rows(get_page_content(), "athing submission")
|
||||
subtext_rows = get_subtext_rows(get_page_content())
|
||||
|
||||
news_data = []
|
||||
|
||||
for title_row, subtext_row in zip(title_rows, subtext_rows):
|
||||
# Extract title information from the title row
|
||||
title_span = title_row.find("span", {"class": "titleline"})
|
||||
title = title_span.a.text
|
||||
url = title_span.a["href"]
|
||||
rank = title_row.find("span", {"class": "rank"}).text
|
||||
|
||||
# Extract metadata from the subtext row
|
||||
author = BASE_URL + subtext_row.find("a", {"class": "hnuser"})["href"]
|
||||
upvotes = subtext_row.find("span", {"class": "score"}).text
|
||||
date = subtext_row.find("span", {"class": "age"}).get("title").split(" ")[0]
|
||||
|
||||
news_data.append(
|
||||
NewsItem(
|
||||
title=title,
|
||||
source_url=url,
|
||||
author=author,
|
||||
rank=rank,
|
||||
upvotes=upvotes,
|
||||
date=date,
|
||||
)
|
||||
)
|
||||
|
||||
return news_data
|
||||
|
||||
|
||||
def save_news_data():
|
||||
"""
|
||||
Save the scraped news data to a JSON file with the current date in the filename.
|
||||
"""
|
||||
|
||||
news_data = get_news_data()
|
||||
current_date = datetime.now().strftime("%Y_%m_%d_%H_%M")
|
||||
filename = f"hacker_news_data_{current_date}.json"
|
||||
|
||||
with open(filename, "w") as f:
|
||||
json.dump([item.dict() for item in news_data], f, indent=4)
|
||||
|
||||
return filename
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
save_news_data()
|
||||
@@ -0,0 +1,30 @@
|
||||
# cron_scraper.py
|
||||
import sys
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from firecrawl_scraper import save_firecrawl_news_data
|
||||
|
||||
# Set up logging
|
||||
log_dir = Path("logs")
|
||||
log_dir.mkdir(exist_ok=True)
|
||||
log_file = log_dir / f"scraper_{datetime.now().strftime('%Y_%m')}.log"
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||
handlers=[logging.FileHandler(log_file), logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
logging.info("Starting scraping job")
|
||||
filename = save_firecrawl_news_data()
|
||||
logging.info(f"Successfully saved data to {filename}")
|
||||
except Exception as e:
|
||||
logging.error(f"Scraping failed: {str(e)}", exc_info=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,61 @@
|
||||
# firecrawl_scraper.py
|
||||
import json
|
||||
from firecrawl import FirecrawlApp
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List
|
||||
from datetime import datetime
|
||||
|
||||
load_dotenv()
|
||||
|
||||
BASE_URL = "https://news.ycombinator.com/"
|
||||
|
||||
|
||||
class NewsItem(BaseModel):
|
||||
title: str = Field(description="The title of the news item")
|
||||
source_url: str = Field(description="The URL of the news item")
|
||||
author: str = Field(
|
||||
description="The URL of the post author's profile concatenated with the base URL."
|
||||
)
|
||||
rank: str = Field(description="The rank of the news item")
|
||||
upvotes: str = Field(description="The number of upvotes of the news item")
|
||||
date: str = Field(description="The date of the news item.")
|
||||
|
||||
|
||||
class NewsData(BaseModel):
|
||||
news_items: List[NewsItem]
|
||||
|
||||
|
||||
def get_firecrawl_news_data():
|
||||
app = FirecrawlApp()
|
||||
|
||||
data = app.scrape_url(
|
||||
BASE_URL,
|
||||
params={
|
||||
"formats": ["extract"],
|
||||
"extract": {"schema": NewsData.model_json_schema()},
|
||||
},
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def save_firecrawl_news_data():
|
||||
"""
|
||||
Save the scraped news data to a JSON file with the current date in the filename.
|
||||
"""
|
||||
# Get the data
|
||||
data = get_firecrawl_news_data()
|
||||
# Format current date for filename
|
||||
date_str = datetime.now().strftime("%Y_%m_%d_%H_%M")
|
||||
filename = f"firecrawl_hacker_news_data_{date_str}.json"
|
||||
|
||||
# Save the news items to JSON file
|
||||
with open(filename, "w") as f:
|
||||
json.dump(data["extract"]["news_items"], f, indent=4)
|
||||
|
||||
return filename
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
save_firecrawl_news_data()
|
||||
@@ -0,0 +1,10 @@
|
||||
import schedule
|
||||
import time
|
||||
from firecrawl_scraper import save_firecrawl_news_data
|
||||
|
||||
# Schedule the scraper to run every hour
|
||||
schedule.every().hour.do(save_firecrawl_news_data)
|
||||
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(1)
|
||||
Reference in New Issue
Block a user