299 lines
12 KiB
Python
299 lines
12 KiB
Python
"""
|
|
Browser Agent Module - AI-powered browser automation using LangChain and Playwright
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from langchain.agents import create_agent
|
|
from playwright.async_api import Browser, BrowserContext, Page, async_playwright
|
|
|
|
|
|
class BrowserController:
|
|
"""Handles browser operations using Playwright"""
|
|
|
|
def __init__(self):
|
|
self.playwright = None
|
|
self.browser: Optional[Browser] = None
|
|
self.context: Optional[BrowserContext] = None
|
|
self.page: Optional[Page] = None
|
|
self.action_history: List[Dict[str, Any]] = []
|
|
|
|
async def start(self, headless: bool = False):
|
|
"""Initialize browser instance"""
|
|
self.playwright = await async_playwright().start()
|
|
self.browser = await self.playwright.chromium.launch(headless=headless)
|
|
self.context = await self.browser.new_context(
|
|
viewport={"width": 1920, "height": 1080}
|
|
)
|
|
self.page = await self.context.new_page()
|
|
|
|
async def stop(self):
|
|
"""Close browser and cleanup"""
|
|
if self.page:
|
|
await self.page.close()
|
|
if self.context:
|
|
await self.context.close()
|
|
if self.browser:
|
|
await self.browser.close()
|
|
if self.playwright:
|
|
await self.playwright.stop()
|
|
|
|
async def navigate(self, url: str) -> str:
|
|
"""Navigate to a URL"""
|
|
try:
|
|
await self.page.goto(url, wait_until="networkidle", timeout=30000)
|
|
title = await self.page.title()
|
|
self.action_history.append(
|
|
{"action": "navigate", "url": url, "title": title}
|
|
)
|
|
return f"Successfully navigated to {url}. Page title: {title}"
|
|
except Exception as e:
|
|
return f"Error navigating to {url}: {str(e)}"
|
|
|
|
async def click(self, selector: str) -> str:
|
|
"""Click an element by selector"""
|
|
try:
|
|
await self.page.click(selector, timeout=10000)
|
|
self.action_history.append({"action": "click", "selector": selector})
|
|
return f"Successfully clicked element: {selector}"
|
|
except Exception as e:
|
|
return f"Error clicking {selector}: {str(e)}"
|
|
|
|
async def type_text(self, selector: str, text: str) -> str:
|
|
"""Type text into an input field"""
|
|
try:
|
|
await self.page.fill(selector, text, timeout=10000)
|
|
self.action_history.append(
|
|
{"action": "type", "selector": selector, "text": text}
|
|
)
|
|
return f"Successfully typed text into {selector}"
|
|
except Exception as e:
|
|
return f"Error typing into {selector}: {str(e)}"
|
|
|
|
async def get_text(self, selector: str) -> str:
|
|
"""Get text content from an element"""
|
|
try:
|
|
text = await self.page.text_content(selector, timeout=10000)
|
|
return f"Text content: {text}"
|
|
except Exception as e:
|
|
return f"Error getting text from {selector}: {str(e)}"
|
|
|
|
async def get_page_content(self) -> str:
|
|
"""Get the current page's text content"""
|
|
try:
|
|
title = await self.page.title()
|
|
url = self.page.url
|
|
# Get visible text from body
|
|
body_text = await self.page.evaluate("""
|
|
() => {
|
|
return document.body.innerText.slice(0, 5000);
|
|
}
|
|
""")
|
|
return f"URL: {url}\nTitle: {title}\nContent:\n{body_text}"
|
|
except Exception as e:
|
|
return f"Error getting page content: {str(e)}"
|
|
|
|
async def screenshot(self) -> Optional[bytes]:
|
|
"""Take a screenshot of the current page"""
|
|
try:
|
|
screenshot_bytes = await self.page.screenshot(full_page=False)
|
|
self.action_history.append({"action": "screenshot"})
|
|
return screenshot_bytes
|
|
except Exception as e:
|
|
print(f"Error taking screenshot: {str(e)}")
|
|
return None
|
|
|
|
async def execute_javascript(self, script: str) -> str:
|
|
"""Execute JavaScript on the page"""
|
|
try:
|
|
result = await self.page.evaluate(script)
|
|
self.action_history.append({"action": "javascript", "script": script})
|
|
return f"JavaScript executed. Result: {result}"
|
|
except Exception as e:
|
|
return f"Error executing JavaScript: {str(e)}"
|
|
|
|
async def get_elements_info(self, selector: str) -> str:
|
|
"""Get information about elements matching a selector"""
|
|
try:
|
|
elements = await self.page.query_selector_all(selector)
|
|
count = len(elements)
|
|
if count == 0:
|
|
return f"No elements found matching selector: {selector}"
|
|
|
|
info_list = []
|
|
for i, element in enumerate(elements[:5]): # Limit to first 5
|
|
text = await element.text_content()
|
|
info_list.append(f"{i + 1}. {text[:100]}")
|
|
|
|
result = f"Found {count} elements matching '{selector}':\n" + "\n".join(
|
|
info_list
|
|
)
|
|
if count > 5:
|
|
result += f"\n... and {count - 5} more"
|
|
return result
|
|
except Exception as e:
|
|
return f"Error getting elements info: {str(e)}"
|
|
|
|
async def scroll(self, direction: str = "down") -> str:
|
|
"""Scroll the page"""
|
|
try:
|
|
if direction == "down":
|
|
await self.page.evaluate("window.scrollBy(0, window.innerHeight)")
|
|
elif direction == "up":
|
|
await self.page.evaluate("window.scrollBy(0, -window.innerHeight)")
|
|
elif direction == "top":
|
|
await self.page.evaluate("window.scrollTo(0, 0)")
|
|
elif direction == "bottom":
|
|
await self.page.evaluate(
|
|
"window.scrollTo(0, document.body.scrollHeight)"
|
|
)
|
|
|
|
self.action_history.append({"action": "scroll", "direction": direction})
|
|
return f"Scrolled {direction}"
|
|
except Exception as e:
|
|
return f"Error scrolling: {str(e)}"
|
|
|
|
|
|
class BrowserAgent:
|
|
"""AI Agent that can control the browser using LangChain"""
|
|
|
|
def __init__(self, openai_api_key: str, model: str = "gpt-4o-mini"):
|
|
self.browser = BrowserController()
|
|
self.model = model
|
|
self.api_key = openai_api_key
|
|
self.agent = None
|
|
|
|
def _create_tools(self) -> List:
|
|
"""Create LangChain tools from browser methods"""
|
|
|
|
def navigate(url: str) -> str:
|
|
"""Navigate to a URL. Input should be a valid URL starting with http:// or https://"""
|
|
return asyncio.run(self.browser.navigate(url))
|
|
|
|
def click(selector: str) -> str:
|
|
"""Click an element on the page. Input should be a CSS selector (e.g., 'button.submit', '#login-btn', 'a[href="/about"]')"""
|
|
return asyncio.run(self.browser.click(selector))
|
|
|
|
def type_text(input_str: str) -> str:
|
|
"""Type text into an input field. Input format: 'selector|text' (e.g., 'input[name=email]|test@example.com')"""
|
|
parts = input_str.split("|", 1)
|
|
if len(parts) != 2:
|
|
return "Error: Input must be in format 'selector|text'"
|
|
return asyncio.run(self.browser.type_text(parts[0], parts[1]))
|
|
|
|
def get_text(selector: str) -> str:
|
|
"""Get text content from an element. Input should be a CSS selector"""
|
|
return asyncio.run(self.browser.get_text(selector))
|
|
|
|
def get_page_content(dummy: str = "") -> str:
|
|
"""Get the current page's title, URL, and visible text content. No input needed - just pass empty string."""
|
|
return asyncio.run(self.browser.get_page_content())
|
|
|
|
def scroll(direction: str = "down") -> str:
|
|
"""Scroll the page. Input should be: 'down', 'up', 'top', or 'bottom'"""
|
|
return asyncio.run(self.browser.scroll(direction))
|
|
|
|
def get_elements_info(selector: str) -> str:
|
|
"""Get information about elements matching a CSS selector. Returns count and text of matching elements."""
|
|
return asyncio.run(self.browser.get_elements_info(selector))
|
|
|
|
def execute_javascript(script: str) -> str:
|
|
"""Execute JavaScript code on the page. Input should be valid JavaScript code (e.g., 'document.title' or 'document.querySelector("h1").textContent')"""
|
|
return asyncio.run(self.browser.execute_javascript(script))
|
|
|
|
# Return list of tool functions
|
|
return [
|
|
navigate,
|
|
click,
|
|
type_text,
|
|
get_text,
|
|
get_page_content,
|
|
scroll,
|
|
get_elements_info,
|
|
execute_javascript,
|
|
]
|
|
|
|
async def initialize(self, headless: bool = False):
|
|
"""Initialize the browser and agent"""
|
|
await self.browser.start(headless=headless)
|
|
|
|
# Create tools
|
|
tools = self._create_tools()
|
|
|
|
# System prompt for the agent
|
|
system_prompt = """You are an AI browser automation assistant. You can control a web browser to help users accomplish tasks.
|
|
|
|
Available actions:
|
|
- navigate: Go to websites
|
|
- click: Click buttons, links, or other elements
|
|
- type_text: Fill in forms and input fields
|
|
- get_text: Read text from specific elements
|
|
- get_page_content: Read the current page content
|
|
- scroll: Scroll the page in different directions
|
|
- get_elements_info: Find and inspect elements
|
|
- execute_javascript: Run JavaScript code for complex interactions
|
|
|
|
When given a task:
|
|
1. First, understand what the user wants to accomplish
|
|
2. Break it down into steps
|
|
3. Use get_page_content to understand the current page
|
|
4. Use appropriate tools to complete each step
|
|
5. Verify your actions worked before moving to the next step
|
|
|
|
Always use CSS selectors for targeting elements (e.g., 'button.login', '#submit-btn', 'input[name=email]').
|
|
For typing text, use the format: 'selector|text'
|
|
|
|
Be methodical and explain what you're doing at each step."""
|
|
|
|
# Create agent using new API
|
|
self.agent = create_agent(
|
|
model=self.model,
|
|
tools=tools,
|
|
system_prompt=system_prompt,
|
|
)
|
|
|
|
async def execute_task(self, task: str) -> Dict[str, Any]:
|
|
"""Execute a task using the AI agent"""
|
|
try:
|
|
# Invoke the agent with the task
|
|
result = await self.agent.ainvoke(
|
|
{"messages": [{"role": "user", "content": task}]}
|
|
)
|
|
|
|
# Extract the final message content
|
|
output = "Task completed"
|
|
if result and "messages" in result:
|
|
messages = result["messages"]
|
|
if messages and len(messages) > 0:
|
|
last_message = messages[-1]
|
|
if hasattr(last_message, "content"):
|
|
output = last_message.content
|
|
elif isinstance(last_message, dict) and "content" in last_message:
|
|
output = last_message["content"]
|
|
|
|
# Take a screenshot
|
|
screenshot_bytes = await self.browser.screenshot()
|
|
screenshot_base64 = None
|
|
if screenshot_bytes:
|
|
screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8")
|
|
|
|
return {
|
|
"success": True,
|
|
"output": output,
|
|
"screenshot": screenshot_base64,
|
|
"action_history": self.browser.action_history.copy(),
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"screenshot": None,
|
|
"action_history": self.browser.action_history.copy(),
|
|
}
|
|
|
|
async def cleanup(self):
|
|
"""Cleanup resources"""
|
|
await self.browser.stop()
|