Files

618 lines
23 KiB
Python
Raw Permalink Normal View History

"""
PRODUCTION-READY Stock Intelligence System
Includes: SEC filings, SEDAR+, ownership data, tax info, AGM details, calculated metrics
Can run daily on any stock or full universe
"""
import asyncio
import os
import json
import sys
from datetime import datetime
from typing import List, Dict, Any
# Import all modules
from extract_listings import StockListingExtractor
from database import StockDatabase
from scrape_yahoo_finance import YahooFinanceScraper
from scrape_news_pr import NewsPressScraper
from scrape_serpapi import SerpAPINewsScraper
from scrape_sec_filings import SECFilingScraper
from scrape_sedar import SEDARPlusScraper
from financial_calculator import FinancialMetricsCalculator
from export_csv import CSVExporter
from config import *
class RobustStockIntelligence:
def __init__(self):
self.db = StockDatabase()
self.stats = {
'start_time': datetime.now(),
'stocks_processed': 0,
'financials_scraped': 0,
'news_scraped': 0,
'filings_scraped': 0,
'metrics_calculated': 0,
'errors': []
}
async def step1_extract_listings(self, force_refresh=False):
"""Extract stock listings from exchanges"""
print("\n" + "=" * 70)
print("STEP 1: EXTRACTING STOCK LISTINGS")
print("=" * 70)
listings_file = "data/listings/all_listings_combined.json"
if os.path.exists(listings_file) and not force_refresh:
print(f"📂 Loading existing listings from {listings_file}")
with open(listings_file, 'r') as f:
listings = json.load(f)
print(f"✅ Loaded {len(listings)} stocks from file")
else:
print("🔄 Extracting fresh listings from exchanges...")
extractor = StockListingExtractor()
listings = await extractor.extract_all()
self.stats['stocks_processed'] = len(listings)
return listings
def step2_import_to_database(self):
"""Import listings to database"""
print("\n" + "=" * 70)
print("STEP 2: IMPORTING TO DATABASE")
print("=" * 70)
listings_file = "data/listings/all_listings_combined.json"
if os.path.exists(listings_file):
imported = self.db.import_listings_from_json(listings_file)
return imported
else:
print(f"❌ No listings file found")
return 0
async def step3_scrape_financials(self, stocks: List[Dict], use_serpapi_fallback=True):
"""Scrape financial data with Yahoo Finance"""
print("\n" + "=" * 70)
print("STEP 3: SCRAPING FINANCIAL DATA")
print("=" * 70)
scraper = YahooFinanceScraper()
results = await scraper.scrape_multiple_stocks(stocks)
self.stats['financials_scraped'] = len([r for r in results if not r.get('error')])
# Update database
for result in results:
if not result.get('error'):
self.db.update_coverage(
result['ticker'],
has_financials=True,
has_ttm=True
)
# Insert quote data if available
quote_data = result.get('quote', {})
if quote_data and any(quote_data.values()):
self.db.insert_stock_quote(result['ticker'], quote_data)
return results
async def step4_calculate_metrics(self, financial_data: List[Dict]):
"""Calculate all financial metrics from base numbers"""
print("\n" + "=" * 70)
print("STEP 4: CALCULATING FINANCIAL METRICS")
print("=" * 70)
calculator = FinancialMetricsCalculator()
metrics_calculated = 0
for data in financial_data:
if data.get('error'):
continue
ticker = data['ticker']
print(f" Calculating metrics for {ticker}...")
try:
# Convert Yahoo Finance data to calculator format
base_data = calculator.convert_yahoo_data(data)
# Calculate all metrics
metrics = calculator.calculate_all_metrics(base_data)
# Save metrics to file
metrics_file = f"data/metrics/{ticker}_calculated_metrics.json"
os.makedirs(os.path.dirname(metrics_file), exist_ok=True)
with open(metrics_file, 'w') as f:
json.dump(metrics, f, indent=2)
# Insert metrics into database
current_year = datetime.now().year
self.db.insert_financial_metrics(ticker, current_year, metrics, is_ttm=True)
metrics_calculated += 1
except Exception as e:
print(f" Error calculating metrics: {e}")
self.stats['errors'].append(f"{ticker} metrics: {e}")
self.stats['metrics_calculated'] = metrics_calculated
print(f"✅ Calculated metrics for {metrics_calculated} stocks")
return metrics_calculated
async def step5_scrape_news_pr(self, stocks: List[Dict], use_serpapi=True):
"""Scrape news and press releases"""
print("\n" + "=" * 70)
print("STEP 5: SCRAPING NEWS & PRESS RELEASES")
print("=" * 70)
if use_serpapi:
print("📡 Using SerpAPI for robust news collection...")
scraper = SerpAPINewsScraper()
results = scraper.scrape_multiple_stocks(stocks)
else:
print("🌐 Using direct web scraping...")
scraper = NewsPressScraper()
results = await scraper.scrape_multiple_stocks(stocks)
self.stats['news_scraped'] = len(results)
# Insert articles into database and update coverage
for result in results:
ticker = result['ticker']
news_articles = result.get('news_articles', [])
press_releases = result.get('press_releases', [])
# Insert news articles
for article in news_articles:
self.db.insert_news_article(
ticker=ticker,
title=article.get('title', ''),
source=article.get('source', ''),
published_date=article.get('date', ''),
url=article.get('link') or article.get('url', ''),
snippet=article.get('snippet', '')
)
# Insert press releases as news articles (same table)
for pr in press_releases:
self.db.insert_news_article(
ticker=ticker,
title=pr.get('title', ''),
source=pr.get('source', 'Press Release'),
published_date=pr.get('date', ''),
url=pr.get('link') or pr.get('url', ''),
snippet=pr.get('snippet', '')
)
# Update coverage flags
has_news = len(news_articles) > 0
has_pr = len(press_releases) > 0
self.db.update_coverage(
ticker,
has_news=has_news,
has_press_releases=has_pr
)
return results
async def step6_scrape_sec_filings(self, stocks: List[Dict]):
"""Scrape SEC EDGAR filings (for US-listed stocks)"""
print("\n" + "=" * 70)
print("STEP 6: SCRAPING SEC EDGAR FILINGS")
print("=" * 70)
# Filter for US-listed or cross-listed stocks
us_stocks = [s for s in stocks if s.get('exchange') in ['CBOE', 'NYSE', 'NASDAQ']]
if not us_stocks:
print("⚠️ No US-listed stocks to process")
return []
scraper = SECFilingScraper()
results = []
for stock in us_stocks:
ticker = stock['symbol']
data = await scraper.get_complete_company_data(ticker)
results.append(data)
if not data.get('error'):
# Insert filings into database
filings = data.get('filings', [])
for filing in filings:
self.db.insert_filing(
ticker=ticker,
filing_date=filing.get('filing_date', ''),
filing_type=filing.get('form_type', ''),
title=filing.get('description', ''),
document_url=filing.get('url', ''),
source='SEC EDGAR'
)
# Insert ownership forms
ownership = data.get('insider_ownership', [])
for form in ownership:
self.db.insert_filing(
ticker=ticker,
filing_date=form.get('filing_date', ''),
filing_type=form.get('form_type', ''),
title=f"Insider Transaction - {form.get('owner', '')}",
document_url=form.get('url', ''),
source='SEC EDGAR - Ownership'
)
self.db.update_coverage(
ticker,
has_filings=True
)
self.stats['filings_scraped'] += len([r for r in results if not r.get('error')])
return results
async def step7_scrape_sedar_filings(self, stocks: List[Dict]):
"""Scrape SEDAR+ filings (for Canadian stocks)"""
print("\n" + "=" * 70)
print("STEP 7: SCRAPING SEDAR+ FILINGS")
print("=" * 70)
# Filter for Canadian stocks
canadian_stocks = [s for s in stocks if s.get('exchange') in ['TSX', 'TSXV', 'CSE']]
if not canadian_stocks:
print("⚠️ No Canadian stocks to process")
return []
scraper = SEDARPlusScraper()
results = await scraper.scrape_multiple_companies(canadian_stocks)
# Insert filings and update database
for result in results:
if not result.get('error'):
ticker = result['ticker']
# Insert filings
filings = result.get('filings', [])
for filing in filings:
self.db.insert_filing(
ticker=ticker,
filing_date=filing.get('date', ''),
filing_type=filing.get('type', ''),
title=filing.get('title', ''),
document_url=filing.get('url', ''),
source='SEDAR+'
)
has_agm = bool(result.get('agm_info'))
has_tax = bool(result.get('tax_disclosures'))
self.db.update_coverage(
ticker,
has_filings=True,
has_agm_info=has_agm,
has_tax_disclosures=has_tax
)
self.stats['filings_scraped'] += len([r for r in results if not r.get('error')])
return results
def step8_generate_reports(self):
"""Generate comprehensive reports"""
print("\n" + "=" * 70)
print("STEP 8: GENERATING REPORTS")
print("=" * 70)
reports_dir = "data/reports"
os.makedirs(reports_dir, exist_ok=True)
stocks = self.db.get_all_stocks()
reports_generated = 0
for stock in stocks:
ticker = stock[1]
company_name = stock[2]
exchange = stock[3]
try:
report = self._generate_comprehensive_report(ticker, company_name, exchange)
report_file = f"{reports_dir}/{ticker}_comprehensive_report.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
reports_generated += 1
except Exception as e:
print(f"❌ Error generating report for {ticker}: {e}")
self.stats['errors'].append(f"{ticker} report: {e}")
print(f"✅ Generated {reports_generated} comprehensive reports")
return reports_generated
def step9_export_csv(self):
"""Export all data to CSV files"""
print("\n" + "=" * 70)
print("STEP 9: EXPORTING TO CSV")
print("=" * 70)
exporter = CSVExporter()
files = exporter.export_all()
exporter.close()
return files
def _extract_base_financials(self, yahoo_data: Dict) -> Dict:
"""Extract base financial numbers from Yahoo Finance data"""
base = {}
stats = yahoo_data.get('statistics', {})
profile = yahoo_data.get('profile', {})
# Try to extract numeric values from Yahoo Finance statistics
# This is a simplified version - actual implementation would need more parsing
base['price'] = profile.get('current_price', 0)
# Parse statistics (values come as strings with formatting)
# Example: "1.2B" -> 1200000000
for key, value in stats.items():
if isinstance(value, str):
# Try to convert formatted numbers
try:
if 'B' in value:
base[key] = float(value.replace('B', '').replace(',', '')) * 1_000_000_000
elif 'M' in value:
base[key] = float(value.replace('M', '').replace(',', '')) * 1_000_000
elif 'K' in value:
base[key] = float(value.replace('K', '').replace(',', '')) * 1_000
else:
base[key] = float(value.replace(',', '').replace('%', ''))
except:
pass
return base
def _generate_comprehensive_report(self, ticker: str, company_name: str, exchange: str) -> str:
"""Generate comprehensive report with all data"""
report = []
report.append("=" * 80)
report.append(f"COMPREHENSIVE STOCK INTELLIGENCE REPORT")
report.append(f"Ticker: {ticker} | Company: {company_name} | Exchange: {exchange}")
report.append("=" * 80)
report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append("")
# Load all available data files
data_sources = {
'yahoo': f"data/financials/{ticker}_yahoo.json",
'metrics': f"data/metrics/{ticker}_calculated_metrics.json",
'news': f"data/news/{ticker}_news_pr.json",
'serpapi': f"data/serpapi_news/{ticker}_serpapi.json",
'sec': f"data/sec_filings/{ticker}_sec_filings.json",
'sedar': f"data/sedar_filings/{ticker}_sedar_data.json"
}
# Financial Data Section
if os.path.exists(data_sources['metrics']):
report.append("[CALCULATED FINANCIAL METRICS]")
report.append("-" * 80)
with open(data_sources['metrics'], 'r') as f:
metrics = json.load(f)
calculator = FinancialMetricsCalculator()
report.append(calculator.format_metrics_for_display(metrics))
report.append("")
# News Section
news_files = [data_sources['news'], data_sources['serpapi']]
all_news = []
for nf in news_files:
if os.path.exists(nf):
with open(nf, 'r') as f:
data = json.load(f)
all_news.extend(data.get('news_articles', []))
if all_news:
report.append("[NEWS ARTICLES - Last 12 Months]")
report.append("-" * 80)
for article in all_news[:15]:
report.append(f"\nTitle: {article.get('title', 'N/A')}")
report.append(f"Source: {article.get('source', 'N/A')}")
report.append(f"Date: {article.get('date', 'N/A')}")
report.append(f"URL: {article.get('link') or article.get('url', 'N/A')}")
report.append("")
# SEC Filings Section
if os.path.exists(data_sources['sec']):
report.append("[SEC EDGAR FILINGS]")
report.append("-" * 80)
with open(data_sources['sec'], 'r') as f:
sec_data = json.load(f)
filings = sec_data.get('filings', [])[:10]
for filing in filings:
report.append(f"\n{filing['form_type']} - {filing['filing_date']}")
report.append(f" {filing.get('description', 'N/A')}")
report.append(f" URL: {filing['url']}")
report.append("")
# SEDAR+ Filings Section
if os.path.exists(data_sources['sedar']):
report.append("[SEDAR+ FILINGS & AGM INFORMATION]")
report.append("-" * 80)
with open(data_sources['sedar'], 'r') as f:
sedar_data = json.load(f)
# AGM Info
agm = sedar_data.get('agm_info', {})
if agm:
report.append("\nAnnual General Meeting:")
report.append(f" Date: {agm.get('date', 'N/A')}")
report.append(f" Location: {agm.get('location', 'N/A')}")
# Recent filings
filings = sedar_data.get('filings', [])[:10]
if filings:
report.append("\nRecent Filings:")
for filing in filings:
report.append(f" - {filing.get('title', 'N/A')[:70]}")
report.append("")
report.append("=" * 80)
report.append("END OF REPORT")
report.append("=" * 80)
return "\n".join(report)
async def run_for_single_stock(self, ticker: str):
"""Run complete analysis for a single stock (daily update mode)"""
print("\n" + "=" * 70)
print(f"DAILY UPDATE FOR STOCK: {ticker}")
print("=" * 70)
# Get stock info from database
self.db.cursor.execute("SELECT * FROM stocks_master WHERE symbol = ?", (ticker,))
stock_data = self.db.cursor.fetchone()
if not stock_data:
print(f"❌ Stock {ticker} not found in database")
return
stock = {
'symbol': stock_data[1],
'name': stock_data[2],
'exchange': stock_data[3]
}
# Run all steps for this one stock
await self.step3_scrape_financials([stock])
await self.step5_scrape_news_pr([stock], use_serpapi=True)
if stock['exchange'] in ['CBOE', 'NYSE', 'NASDAQ']:
await self.step6_scrape_sec_filings([stock])
elif stock['exchange'] in ['TSX', 'TSXV', 'CSE']:
await self.step7_scrape_sedar_filings([stock])
self.step8_generate_reports()
self.step9_export_csv()
print(f"\n✅ Daily update completed for {ticker}")
async def run_full_pipeline(self, test_mode=False, stocks_limit=None):
"""Run complete pipeline"""
print("\n" + "=" * 70)
print("PRODUCTION-READY STOCK INTELLIGENCE SYSTEM")
print("=" * 70)
print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if test_mode:
print("⚠️ RUNNING IN TEST MODE")
print("=" * 70)
try:
# Step 1: Get listings
listings = await self.step1_extract_listings()
if not listings:
print("\n❌ No listings found")
return
# Step 2: Import to database
self.step2_import_to_database()
# Limit stocks if requested
if stocks_limit:
listings = listings[:stocks_limit]
print(f"\n⚠️ Limited to {stocks_limit} stocks for testing")
# Step 3: Scrape financials
financial_data = await self.step3_scrape_financials(listings)
# Step 4: Calculate metrics
await self.step4_calculate_metrics(financial_data)
# Step 5: Scrape news (using SerpAPI for robustness)
await self.step5_scrape_news_pr(listings, use_serpapi=True)
# Step 6 & 7: Scrape filings
await self.step6_scrape_sec_filings(listings)
await self.step7_scrape_sedar_filings(listings)
# Step 8: Generate reports
self.step8_generate_reports()
# Step 9: Export to CSV
self.step9_export_csv()
# Print final stats
self._print_final_stats()
print("\n✅ PIPELINE COMPLETED SUCCESSFULLY!")
except Exception as e:
print(f"\n❌ Pipeline failed: {e}")
import traceback
traceback.print_exc()
finally:
self.db.close()
def _print_final_stats(self):
"""Print final statistics"""
end_time = datetime.now()
duration = end_time - self.stats['start_time']
print("\n" + "=" * 70)
print("FINAL STATISTICS")
print("=" * 70)
print(f"Duration: {duration}")
print(f"Stocks processed: {self.stats['stocks_processed']}")
print(f"Financials scraped: {self.stats['financials_scraped']}")
print(f"Metrics calculated: {self.stats['metrics_calculated']}")
print(f"News articles collected: {self.stats['news_scraped']}")
print(f"Filings scraped: {self.stats['filings_scraped']}")
print(f"Errors: {len(self.stats['errors'])}")
print("=" * 70)
async def main():
"""Main entry point"""
orchestrator = RobustStockIntelligence()
# Check command line arguments
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "--ticker" and len(sys.argv) > 2:
# Daily update for single stock
ticker = sys.argv[2].upper()
await orchestrator.run_for_single_stock(ticker)
elif command == "--full":
# Full pipeline, all stocks
await orchestrator.run_full_pipeline(test_mode=False)
elif command == "--test":
# Test mode with limited stocks
limit = int(sys.argv[2]) if len(sys.argv) > 2 else 5
await orchestrator.run_full_pipeline(test_mode=True, stocks_limit=limit)
else:
print("Usage:")
print(" python main_robust.py --test [num] # Test mode with N stocks")
print(" python main_robust.py --full # Full pipeline, all stocks")
print(" python main_robust.py --ticker SYMBOL # Daily update for one stock")
else:
# Default: test mode with 5 stocks
print("\n⚠️ No arguments provided. Running in test mode (5 stocks)")
print(" Use --help to see options")
await orchestrator.run_full_pipeline(test_mode=True, stocks_limit=5)
if __name__ == "__main__":
asyncio.run(main())