Files
microcap_scrapping/main.py
T
Aherobo Ovie Victor 80ee708348 feat: Implement stock listing extraction and database population
- Added `extract_listings.py` for extracting stock listings from TSX, TSXV, CSE, and CBOE using Playwright.
- Created `main.py` to orchestrate the entire stock intelligence system, including extraction, database import, financial scraping, news scraping, and report generation.
- Developed `populate_database.py` to populate the database with existing JSON data.
- Introduced `scrape_nasdaq_tsx_only.py` for focused scraping of NASDAQ and TSX stocks.
- Added `setup.py` for initial setup and testing of the system.
- Created `watchlist.txt` template for user-defined stock tracking.
- Generated `final_test_output.txt` to log the results of the test run.
2025-11-06 12:34:01 +01:00

335 lines
12 KiB
Python

"""
Main orchestrator script for the Stock Intelligence System
Runs all steps in sequence
"""
import asyncio
import os
import json
from datetime import datetime
import sys
# Import our modules
from extract_listings import StockListingExtractor
from database import StockDatabase
from scrape_yahoo_finance import YahooFinanceScraper
from scrape_news_pr import NewsPressScraper
class StockIntelligenceOrchestrator:
    """Run the Stock Intelligence pipeline step by step.

    The pipeline consists of five steps: extract exchange listings, import
    them into the database, scrape financial data, scrape news / press
    releases, and write one text report per stock.  Counters and per-stock
    error messages are accumulated in ``self.stats`` and printed by
    ``print_final_stats``.
    """

    def __init__(self):
        """Create the database handle and initialise the run statistics."""
        self.db = StockDatabase()
        self.stats = {
            'start_time': datetime.now(),
            'stocks_extracted': 0,
            'financials_scraped': 0,
            'news_scraped': 0,
            'errors': []
        }

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _print_banner(title):
        """Print a blank line, then *title* framed by two 70-char '=' rules."""
        rule = "=" * 70
        print("\n" + rule)
        print(title)
        print(rule)

    @staticmethod
    def _rows_to_stock_dicts(rows):
        """Convert database rows into the dict format handed to the scrapers.

        Each row is indexed positionally: 1 = symbol, 2 = company_name,
        3 = exchange.
        """
        return [
            {'symbol': row[1], 'name': row[2], 'exchange': row[3]}
            for row in rows
        ]

    @staticmethod
    def _limit_stocks(stocks, max_stocks):
        """Return the first *max_stocks* rows, or all rows if it is falsy."""
        if max_stocks:
            stocks = stocks[:max_stocks]
            print(f"⚠️ Limiting to {max_stocks} stocks for testing")
        return stocks

    # ------------------------------------------------------------------
    # Pipeline steps
    # ------------------------------------------------------------------
    async def step1_extract_listings(self):
        """Step 1: Extract stock listings from exchanges.

        Returns whatever ``StockListingExtractor.extract_all`` yields and
        records its length in ``stats['stocks_extracted']``.
        """
        self._print_banner("STEP 1: EXTRACTING STOCK LISTINGS FROM EXCHANGES")
        extractor = StockListingExtractor()
        listings = await extractor.extract_all()
        self.stats['stocks_extracted'] = len(listings)
        return listings

    def step2_import_to_database(self):
        """Step 2: Import the combined listings JSON file into the database.

        Returns the value reported by ``import_listings_from_json``, or 0
        when the listings file does not exist.
        """
        self._print_banner("STEP 2: IMPORTING LISTINGS TO DATABASE")
        listings_file = "data/listings/all_listings_combined.json"
        if not os.path.exists(listings_file):
            print(f"❌ Listings file not found: {listings_file}")
            return 0
        imported = self.db.import_listings_from_json(listings_file)
        print(f"✅ Imported {imported} stocks to database")
        return imported

    async def step3_scrape_financials(self, max_stocks=None):
        """Step 3: Scrape financial data from Yahoo Finance.

        Args:
            max_stocks: Optional cap on the number of stocks to scrape; a
                falsy value means no cap.

        Returns:
            The list of per-stock results from the scraper.  Results without
            an ``'error'`` entry are counted in ``stats['financials_scraped']``
            and flagged in the database coverage table.
        """
        self._print_banner("STEP 3: SCRAPING FINANCIAL DATA")
        stocks = self.db.get_all_stocks()
        print(f"📊 Found {len(stocks)} stocks in database")
        stock_list = self._rows_to_stock_dicts(
            self._limit_stocks(stocks, max_stocks)
        )

        scraper = YahooFinanceScraper()
        results = await scraper.scrape_multiple_stocks(stock_list, max_stocks=max_stocks)
        successful = [r for r in results if not r.get('error')]
        self.stats['financials_scraped'] = len(successful)

        # Update coverage in database for every successfully scraped stock.
        for result in successful:
            self.db.update_coverage(
                result['ticker'],
                has_financials=True,
                has_ttm=True
            )
        return results

    async def step4_scrape_news_pr(self, max_stocks=None):
        """Step 4: Scrape news and press releases.

        Args:
            max_stocks: Optional cap on the number of stocks to scrape; a
                falsy value means no cap.

        Returns:
            The list of per-stock results from the scraper.  Every result is
            counted in ``stats['news_scraped']`` and its news / press-release
            availability is written to the database coverage table.
        """
        self._print_banner("STEP 4: SCRAPING NEWS & PRESS RELEASES")
        stocks = self._limit_stocks(self.db.get_all_stocks(), max_stocks)
        stock_list = self._rows_to_stock_dicts(stocks)

        scraper = NewsPressScraper()
        results = await scraper.scrape_multiple_stocks(stock_list, max_stocks=max_stocks)
        self.stats['news_scraped'] = len(results)

        # Update coverage in database.
        for result in results:
            self.db.update_coverage(
                result['ticker'],
                has_news=len(result.get('news_articles', [])) > 0,
                has_press_releases=len(result.get('press_releases', [])) > 0
            )
        return results

    def step5_generate_reports(self):
        """Step 5: Generate a text report for each stock in the database.

        A failure for one stock is printed, appended to ``stats['errors']``
        and does not stop the remaining reports.

        Returns:
            The number of reports written to ``data/reports``.
        """
        self._print_banner("STEP 5: GENERATING STOCK REPORTS")
        reports_dir = "data/reports"
        os.makedirs(reports_dir, exist_ok=True)

        reports_generated = 0
        for stock in self.db.get_all_stocks():
            ticker, company_name, exchange = stock[1], stock[2], stock[3]
            try:
                report = self.generate_stock_report(ticker, company_name, exchange)
                report_file = f"{reports_dir}/{ticker}_report.txt"
                with open(report_file, 'w', encoding='utf-8') as f:
                    f.write(report)
                reports_generated += 1
            except Exception as e:
                # Best effort: keep going so one bad stock does not abort the step.
                print(f"❌ Error generating report for {ticker}: {e}")
                self.stats['errors'].append(f"{ticker}: {e}")

        print(f"✅ Generated {reports_generated} stock reports")
        print(f"📁 Reports saved to: {reports_dir}/")
        return reports_generated

    # ------------------------------------------------------------------
    # Report construction
    # ------------------------------------------------------------------
    @staticmethod
    def _load_json_if_present(path):
        """Return the parsed JSON at *path*, or None if the file is missing."""
        if not os.path.exists(path):
            return None
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _financial_section(self, ticker):
        """Build the [FINANCIAL DATA] lines; empty when no data file exists."""
        data = self._load_json_if_present(f"data/financials/{ticker}_yahoo.json")
        if data is None:
            return []
        lines = ["[FINANCIAL DATA]", "-" * 70]
        for heading, key in (("\nProfile:", 'profile'),
                             ("\nKey Statistics:", 'statistics')):
            if data.get(key):
                lines.append(heading)
                lines.extend(f" {k}: {v}" for k, v in data[key].items())
        lines.append("")
        return lines

    @staticmethod
    def _item_lines(item, include_snippet):
        """Build the title/source/date/URL lines for one article or release."""
        lines = [
            f"\nTitle: {item.get('title', 'N/A')}",
            f"Source: {item.get('source', 'N/A')}",
            f"Date: {item.get('date', 'N/A')}",
            f"URL: {item.get('url', 'N/A')}",
        ]
        if include_snippet and item.get('snippet'):
            lines.append(f"Snippet: {item['snippet']}")
        return lines

    def _news_sections(self, ticker):
        """Build the news and press-release lines; empty when no file exists.

        At most the first 10 entries of each list are included.
        """
        data = self._load_json_if_present(f"data/news/{ticker}_news_pr.json")
        if data is None:
            return []
        lines = []
        # (section header, JSON key, whether entries may carry a snippet)
        sections = (
            ("[NEWS ARTICLES - Last 12 Months]", 'news_articles', True),
            ("[PRESS RELEASES]", 'press_releases', False),
        )
        for header, key, include_snippet in sections:
            if not data.get(key):
                continue
            lines.append(header)
            lines.append("-" * 70)
            for item in data[key][:10]:
                lines.extend(self._item_lines(item, include_snippet))
            lines.append("")
        return lines

    def generate_stock_report(self, ticker, company_name, exchange):
        """Generate a comprehensive text report for a stock.

        The report always has a header and an "END OF REPORT" footer.  The
        financial and news sections are included only when the matching JSON
        file exists under ``data/financials`` / ``data/news``.

        Args:
            ticker: Stock symbol, also used to locate the data files.
            company_name: Company name shown in the header.
            exchange: Exchange name shown in the header.

        Returns:
            The report as a single newline-joined string.
        """
        rule = "=" * 70
        report = [
            rule,
            f"STOCK INTELLIGENCE REPORT: {ticker}",
            rule,
            f"Company: {company_name}",
            f"Exchange: {exchange}",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            rule,
            "",
        ]
        report.extend(self._financial_section(ticker))
        report.extend(self._news_sections(ticker))
        report.extend([rule, "END OF REPORT", rule])
        return "\n".join(report)

    # ------------------------------------------------------------------
    # Summary and orchestration
    # ------------------------------------------------------------------
    def print_final_stats(self):
        """Print run duration, counters, up to 10 errors and DB coverage."""
        duration = datetime.now() - self.stats['start_time']
        self._print_banner("FINAL STATISTICS")
        print(f"Duration: {duration}")
        print(f"Stocks extracted: {self.stats['stocks_extracted']}")
        print(f"Financials scraped: {self.stats['financials_scraped']}")
        print(f"News scraped: {self.stats['news_scraped']}")
        print(f"Errors: {len(self.stats['errors'])}")
        if self.stats['errors']:
            print("\nError summary:")
            for error in self.stats['errors'][:10]:
                print(f" - {error}")

        # Get coverage report from database.
        coverage = self.db.get_coverage_report()
        print("\nCoverage Report:")
        print(f" Total stocks tracked: {len(coverage)}")
        # A row counts as fully covered when columns 2..5 are all truthy.
        complete = sum(1 for c in coverage if c[2] and c[3] and c[4] and c[5])
        print(f" Fully covered stocks: {complete}")
        print("=" * 70)

    async def run_full_pipeline(self, test_mode=True):
        """Run the full data collection pipeline.

        Args:
            test_mode: When true, financial scraping is capped at 5 stocks
                and news scraping at 3 stocks.

        Any exception raised by a step is printed with its traceback rather
        than propagated; the database is closed in every case.
        """
        rule = "=" * 70
        print("\n" + rule)
        print("STOCK INTELLIGENCE AUTOMATION SYSTEM")
        print("Starting full pipeline...")
        if test_mode:
            print("⚠️ RUNNING IN TEST MODE (limited stocks)")
        print(rule)

        try:
            # Step 1: Extract listings
            listings = await self.step1_extract_listings()
            if not listings:
                print("\n❌ No listings extracted. Check if websites are accessible.")
                return

            # Step 2: Import to database
            self.step2_import_to_database()

            # Steps 3 and 4: scraping (None means "no limit", the step default)
            await self.step3_scrape_financials(max_stocks=5 if test_mode else None)
            await self.step4_scrape_news_pr(max_stocks=3 if test_mode else None)

            # Step 5: Generate reports
            self.step5_generate_reports()

            self.print_final_stats()
            print("\n✅ Pipeline completed successfully!")
        except Exception as e:
            # Top-level boundary: report the failure instead of crashing.
            print(f"\n❌ Pipeline failed with error: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self.db.close()
async def main():
    """Run the orchestrator; test mode is used unless ``--full`` is on the command line."""
    full_run_requested = "--full" in sys.argv
    if not full_run_requested:
        # Tell the user why only a handful of stocks will be processed.
        print("\n⚠️ Running in TEST MODE (limited stocks)")
        print(" To run full pipeline, use: python main.py --full")
    pipeline = StockIntelligenceOrchestrator()
    await pipeline.run_full_pipeline(test_mode=not full_run_requested)
# Start the async entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())