Files
manus_ai_clone/browser_agent.py
T

299 lines
12 KiB
Python
Raw Normal View History

2025-11-05 01:03:10 +01:00
"""
Browser Agent Module - AI-powered browser automation using LangChain and Playwright
"""
import asyncio
import base64
from typing import Any, Dict, List, Optional
from langchain.agents import create_agent
from playwright.async_api import Browser, BrowserContext, Page, async_playwright
class BrowserController:
"""Handles browser operations using Playwright"""
def __init__(self):
self.playwright = None
self.browser: Optional[Browser] = None
self.context: Optional[BrowserContext] = None
self.page: Optional[Page] = None
self.action_history: List[Dict[str, Any]] = []
async def start(self, headless: bool = False):
"""Initialize browser instance"""
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=headless)
self.context = await self.browser.new_context(
viewport={"width": 1920, "height": 1080}
)
self.page = await self.context.new_page()
async def stop(self):
"""Close browser and cleanup"""
if self.page:
await self.page.close()
if self.context:
await self.context.close()
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
async def navigate(self, url: str) -> str:
"""Navigate to a URL"""
try:
await self.page.goto(url, wait_until="networkidle", timeout=30000)
title = await self.page.title()
self.action_history.append(
{"action": "navigate", "url": url, "title": title}
)
return f"Successfully navigated to {url}. Page title: {title}"
except Exception as e:
return f"Error navigating to {url}: {str(e)}"
async def click(self, selector: str) -> str:
"""Click an element by selector"""
try:
await self.page.click(selector, timeout=10000)
self.action_history.append({"action": "click", "selector": selector})
return f"Successfully clicked element: {selector}"
except Exception as e:
return f"Error clicking {selector}: {str(e)}"
async def type_text(self, selector: str, text: str) -> str:
"""Type text into an input field"""
try:
await self.page.fill(selector, text, timeout=10000)
self.action_history.append(
{"action": "type", "selector": selector, "text": text}
)
return f"Successfully typed text into {selector}"
except Exception as e:
return f"Error typing into {selector}: {str(e)}"
async def get_text(self, selector: str) -> str:
"""Get text content from an element"""
try:
text = await self.page.text_content(selector, timeout=10000)
return f"Text content: {text}"
except Exception as e:
return f"Error getting text from {selector}: {str(e)}"
async def get_page_content(self) -> str:
"""Get the current page's text content"""
try:
title = await self.page.title()
url = self.page.url
# Get visible text from body
body_text = await self.page.evaluate("""
() => {
return document.body.innerText.slice(0, 5000);
}
""")
return f"URL: {url}\nTitle: {title}\nContent:\n{body_text}"
except Exception as e:
return f"Error getting page content: {str(e)}"
async def screenshot(self) -> Optional[bytes]:
"""Take a screenshot of the current page"""
try:
screenshot_bytes = await self.page.screenshot(full_page=False)
self.action_history.append({"action": "screenshot"})
return screenshot_bytes
except Exception as e:
print(f"Error taking screenshot: {str(e)}")
return None
async def execute_javascript(self, script: str) -> str:
"""Execute JavaScript on the page"""
try:
result = await self.page.evaluate(script)
self.action_history.append({"action": "javascript", "script": script})
return f"JavaScript executed. Result: {result}"
except Exception as e:
return f"Error executing JavaScript: {str(e)}"
async def get_elements_info(self, selector: str) -> str:
"""Get information about elements matching a selector"""
try:
elements = await self.page.query_selector_all(selector)
count = len(elements)
if count == 0:
return f"No elements found matching selector: {selector}"
info_list = []
for i, element in enumerate(elements[:5]): # Limit to first 5
text = await element.text_content()
info_list.append(f"{i + 1}. {text[:100]}")
result = f"Found {count} elements matching '{selector}':\n" + "\n".join(
info_list
)
if count > 5:
result += f"\n... and {count - 5} more"
return result
except Exception as e:
return f"Error getting elements info: {str(e)}"
async def scroll(self, direction: str = "down") -> str:
"""Scroll the page"""
try:
if direction == "down":
await self.page.evaluate("window.scrollBy(0, window.innerHeight)")
elif direction == "up":
await self.page.evaluate("window.scrollBy(0, -window.innerHeight)")
elif direction == "top":
await self.page.evaluate("window.scrollTo(0, 0)")
elif direction == "bottom":
await self.page.evaluate(
"window.scrollTo(0, document.body.scrollHeight)"
)
self.action_history.append({"action": "scroll", "direction": direction})
return f"Scrolled {direction}"
except Exception as e:
return f"Error scrolling: {str(e)}"
class BrowserAgent:
"""AI Agent that can control the browser using LangChain"""
def __init__(self, openai_api_key: str, model: str = "gpt-4o-mini"):
self.browser = BrowserController()
self.model = model
self.api_key = openai_api_key
self.agent = None
def _create_tools(self) -> List:
"""Create LangChain tools from browser methods"""
def navigate(url: str) -> str:
"""Navigate to a URL. Input should be a valid URL starting with http:// or https://"""
return asyncio.run(self.browser.navigate(url))
def click(selector: str) -> str:
"""Click an element on the page. Input should be a CSS selector (e.g., 'button.submit', '#login-btn', 'a[href="/about"]')"""
return asyncio.run(self.browser.click(selector))
def type_text(input_str: str) -> str:
"""Type text into an input field. Input format: 'selector|text' (e.g., 'input[name=email]|test@example.com')"""
parts = input_str.split("|", 1)
if len(parts) != 2:
return "Error: Input must be in format 'selector|text'"
return asyncio.run(self.browser.type_text(parts[0], parts[1]))
def get_text(selector: str) -> str:
"""Get text content from an element. Input should be a CSS selector"""
return asyncio.run(self.browser.get_text(selector))
def get_page_content(dummy: str = "") -> str:
"""Get the current page's title, URL, and visible text content. No input needed - just pass empty string."""
return asyncio.run(self.browser.get_page_content())
def scroll(direction: str = "down") -> str:
"""Scroll the page. Input should be: 'down', 'up', 'top', or 'bottom'"""
return asyncio.run(self.browser.scroll(direction))
def get_elements_info(selector: str) -> str:
"""Get information about elements matching a CSS selector. Returns count and text of matching elements."""
return asyncio.run(self.browser.get_elements_info(selector))
def execute_javascript(script: str) -> str:
"""Execute JavaScript code on the page. Input should be valid JavaScript code (e.g., 'document.title' or 'document.querySelector("h1").textContent')"""
return asyncio.run(self.browser.execute_javascript(script))
# Return list of tool functions
return [
navigate,
click,
type_text,
get_text,
get_page_content,
scroll,
get_elements_info,
execute_javascript,
]
async def initialize(self, headless: bool = False):
"""Initialize the browser and agent"""
await self.browser.start(headless=headless)
# Create tools
tools = self._create_tools()
# System prompt for the agent
system_prompt = """You are an AI browser automation assistant. You can control a web browser to help users accomplish tasks.
Available actions:
- navigate: Go to websites
- click: Click buttons, links, or other elements
- type_text: Fill in forms and input fields
- get_text: Read text from specific elements
- get_page_content: Read the current page content
- scroll: Scroll the page in different directions
- get_elements_info: Find and inspect elements
- execute_javascript: Run JavaScript code for complex interactions
When given a task:
1. First, understand what the user wants to accomplish
2. Break it down into steps
3. Use get_page_content to understand the current page
4. Use appropriate tools to complete each step
5. Verify your actions worked before moving to the next step
Always use CSS selectors for targeting elements (e.g., 'button.login', '#submit-btn', 'input[name=email]').
For typing text, use the format: 'selector|text'
Be methodical and explain what you're doing at each step."""
# Create agent using new API
self.agent = create_agent(
model=self.model,
tools=tools,
system_prompt=system_prompt,
)
async def execute_task(self, task: str) -> Dict[str, Any]:
"""Execute a task using the AI agent"""
try:
# Invoke the agent with the task
result = await self.agent.ainvoke(
{"messages": [{"role": "user", "content": task}]}
)
# Extract the final message content
output = "Task completed"
if result and "messages" in result:
messages = result["messages"]
if messages and len(messages) > 0:
last_message = messages[-1]
if hasattr(last_message, "content"):
output = last_message.content
elif isinstance(last_message, dict) and "content" in last_message:
output = last_message["content"]
# Take a screenshot
screenshot_bytes = await self.browser.screenshot()
screenshot_base64 = None
if screenshot_bytes:
screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8")
return {
"success": True,
"output": output,
"screenshot": screenshot_base64,
"action_history": self.browser.action_history.copy(),
}
except Exception as e:
return {
"success": False,
"error": str(e),
"screenshot": None,
"action_history": self.browser.action_history.copy(),
}
async def cleanup(self):
"""Cleanup resources"""
await self.browser.stop()