131 lines
5.0 KiB
Python
131 lines
5.0 KiB
Python
import csv
|
|
import json
|
|
import os
|
|
|
|
from dotenv import load_dotenv
|
|
from firecrawl import FirecrawlApp
|
|
from openai import OpenAI
|
|
from serpapi import GoogleSearch
|
|
from swarm import Agent
|
|
from swarm.repl import run_demo_loop
|
|
|
|
load_dotenv()
|
|
|
|
# Initialize FirecrawlApp and OpenAI
|
|
app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
|
|
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
|
|
|
|
def search_google(query, objective):
|
|
"""Search Google using SerpAPI."""
|
|
print(f"Parameters: query={query}, objective={objective}")
|
|
search = GoogleSearch({"q": query, "api_key": os.getenv("SERP_API_KEY")})
|
|
results = search.get_dict().get("organic_results", [])
|
|
return {"objective": objective, "results": results}
|
|
|
|
def scrape_url(url, objective):
|
|
"""Scrape a website using Firecrawl."""
|
|
print(f"Parameters: url={url}, objective={objective}")
|
|
scrape_status = app.scrape_url(
|
|
url,
|
|
params={'formats': ['markdown']}
|
|
)
|
|
return {"objective": objective, "results": scrape_status}
|
|
|
|
def crawl_url(url, objective):
|
|
"""Crawl a website using Firecrawl."""
|
|
print(f"Parameters: url={url}, objective={objective}")
|
|
# If using a crawled url set, pass the ID in the function call below
|
|
# scrape_status = app.check_crawl_status("c99c9598-5a21-46d3-bced-3444a8b1942d")
|
|
# scrape_status['results'] = scrape_status['data']
|
|
scrape_status = app.crawl_url(
|
|
url,
|
|
params={'limit': 10, 'scrapeOptions': {'formats': ['markdown']}}
|
|
)
|
|
return {"objective": objective, "results": scrape_status}
|
|
|
|
def analyze_website_content(content, objective):
|
|
"""Analyze the scraped website content using OpenAI."""
|
|
print(f"Parameters: content={content[:50]}..., objective={objective}")
|
|
analysis = generate_completion(
|
|
"website data extractor",
|
|
f"Analyze the following website content and extract a JSON object based on the objective. Do not write the ```json and ``` to denote a JSON when returning a response",
|
|
"Objective: " + objective + "\nContent: " + content
|
|
)
|
|
return {"objective": objective, "results": json.loads(analysis)}
|
|
|
|
def generate_completion(role, task, content):
|
|
"""Generate a completion using OpenAI."""
|
|
print(f"Parameters: role={role}, task={task[:50]}..., content={content[:50]}...")
|
|
response = client.chat.completions.create(
|
|
model="gpt-4o",
|
|
messages=[
|
|
{"role": "system", "content": f"You are a {role}. {task}"},
|
|
{"role": "user", "content": content}
|
|
]
|
|
)
|
|
return response.choices[0].message.content
|
|
|
|
def read_websites_from_csv(file_path):
|
|
"""Read websites from a CSV file."""
|
|
websites = []
|
|
with open(file_path, mode='r') as file:
|
|
csv_reader = csv.DictReader(file)
|
|
for row in csv_reader:
|
|
websites.append(row['website'])
|
|
return websites
|
|
|
|
def write_results_to_json(results, file_path):
|
|
"""Write results to a JSON file."""
|
|
with open(file_path, mode='w', encoding='utf-8') as file:
|
|
json.dump(json.loads(results), file, ensure_ascii=False)
|
|
|
|
def handoff_to_search_google():
|
|
"""Hand off the search query to the search google agent."""
|
|
return google_search_agent
|
|
|
|
def handoff_to_map_url():
|
|
"""Hand off the url to the map url agent."""
|
|
return crawl_website_agent
|
|
|
|
def handoff_to_analyst():
|
|
"""Hand off the website content to the analyst agent."""
|
|
return analyst_agent
|
|
|
|
def handoff_to_writer():
|
|
"""Hand off the results to the writer agent."""
|
|
return writer_agent
|
|
|
|
user_interface_agent = Agent(
|
|
name="User Interface Agent",
|
|
instructions="You are a user interface agent that handles all interactions with the user. You need to always start with reading a CSV, then perform web data extraction objective that the user wants to achieve by searching the web, crawling the website URL, and extracting the content from a specific page. Be concise.",
|
|
functions=[read_websites_from_csv, handoff_to_search_google],
|
|
)
|
|
|
|
google_search_agent = Agent(
|
|
name="Google Search Agent",
|
|
instructions="You are a google search agent specialized in searching the web. Only search for the website not any specific page. When you are done, you must hand off to the crawl agent.",
|
|
functions=[search_google, handoff_to_map_url],
|
|
)
|
|
|
|
crawl_website_agent = Agent(
|
|
name="Crawl Website Agent",
|
|
instructions="You are a crawl url agent specialized in crawling the web pages. When you are done, you must hand off the results to the analyst agent.",
|
|
functions=[crawl_url, handoff_to_analyst],
|
|
)
|
|
|
|
analyst_agent = Agent(
|
|
name="Analyst Agent",
|
|
instructions="You are an analyst agent that examines website content and returns a JSON object. When you are done, you must hand off the results to the writer agent.",
|
|
functions=[analyze_website_content, handoff_to_writer],
|
|
)
|
|
|
|
writer_agent = Agent(
|
|
name="Writer Agent",
|
|
instructions="You are a writer agent that writes the final results to a JSON file.",
|
|
functions=[write_results_to_json],
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
# Run the demo loop with the user interface agent
|
|
run_demo_loop(user_interface_agent, stream=True)
|