""" Browser Agent Module - AI-powered browser automation using LangChain and Playwright """ import asyncio import base64 from typing import Any, Dict, List, Optional from langchain.agents import create_agent from playwright.async_api import Browser, BrowserContext, Page, async_playwright class BrowserController: """Handles browser operations using Playwright""" def __init__(self): self.playwright = None self.browser: Optional[Browser] = None self.context: Optional[BrowserContext] = None self.page: Optional[Page] = None self.action_history: List[Dict[str, Any]] = [] async def start(self, headless: bool = False): """Initialize browser instance""" self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch(headless=headless) self.context = await self.browser.new_context( viewport={"width": 1920, "height": 1080} ) self.page = await self.context.new_page() async def stop(self): """Close browser and cleanup""" if self.page: await self.page.close() if self.context: await self.context.close() if self.browser: await self.browser.close() if self.playwright: await self.playwright.stop() async def navigate(self, url: str) -> str: """Navigate to a URL""" try: await self.page.goto(url, wait_until="networkidle", timeout=30000) title = await self.page.title() self.action_history.append( {"action": "navigate", "url": url, "title": title} ) return f"Successfully navigated to {url}. Page title: {title}" except Exception as e: return f"Error navigating to {url}: {str(e)}" async def click(self, selector: str) -> str: """Click an element by selector""" try: await self.page.click(selector, timeout=10000) self.action_history.append({"action": "click", "selector": selector}) return f"Successfully clicked element: {selector}" except Exception as e: return f"Error clicking {selector}: {str(e)}" async def type_text(self, selector: str, text: str) -> str: """Type text into an input field""" try: await self.page.fill(selector, text, timeout=10000) self.action_history.append( {"action": "type", "selector": selector, "text": text} ) return f"Successfully typed text into {selector}" except Exception as e: return f"Error typing into {selector}: {str(e)}" async def get_text(self, selector: str) -> str: """Get text content from an element""" try: text = await self.page.text_content(selector, timeout=10000) return f"Text content: {text}" except Exception as e: return f"Error getting text from {selector}: {str(e)}" async def get_page_content(self) -> str: """Get the current page's text content""" try: title = await self.page.title() url = self.page.url # Get visible text from body body_text = await self.page.evaluate(""" () => { return document.body.innerText.slice(0, 5000); } """) return f"URL: {url}\nTitle: {title}\nContent:\n{body_text}" except Exception as e: return f"Error getting page content: {str(e)}" async def screenshot(self) -> Optional[bytes]: """Take a screenshot of the current page""" try: screenshot_bytes = await self.page.screenshot(full_page=False) self.action_history.append({"action": "screenshot"}) return screenshot_bytes except Exception as e: print(f"Error taking screenshot: {str(e)}") return None async def execute_javascript(self, script: str) -> str: """Execute JavaScript on the page""" try: result = await self.page.evaluate(script) self.action_history.append({"action": "javascript", "script": script}) return f"JavaScript executed. Result: {result}" except Exception as e: return f"Error executing JavaScript: {str(e)}" async def get_elements_info(self, selector: str) -> str: """Get information about elements matching a selector""" try: elements = await self.page.query_selector_all(selector) count = len(elements) if count == 0: return f"No elements found matching selector: {selector}" info_list = [] for i, element in enumerate(elements[:5]): # Limit to first 5 text = await element.text_content() info_list.append(f"{i + 1}. {text[:100]}") result = f"Found {count} elements matching '{selector}':\n" + "\n".join( info_list ) if count > 5: result += f"\n... and {count - 5} more" return result except Exception as e: return f"Error getting elements info: {str(e)}" async def scroll(self, direction: str = "down") -> str: """Scroll the page""" try: if direction == "down": await self.page.evaluate("window.scrollBy(0, window.innerHeight)") elif direction == "up": await self.page.evaluate("window.scrollBy(0, -window.innerHeight)") elif direction == "top": await self.page.evaluate("window.scrollTo(0, 0)") elif direction == "bottom": await self.page.evaluate( "window.scrollTo(0, document.body.scrollHeight)" ) self.action_history.append({"action": "scroll", "direction": direction}) return f"Scrolled {direction}" except Exception as e: return f"Error scrolling: {str(e)}" class BrowserAgent: """AI Agent that can control the browser using LangChain""" def __init__(self, openai_api_key: str, model: str = "gpt-4o-mini"): self.browser = BrowserController() self.model = model self.api_key = openai_api_key self.agent = None def _create_tools(self) -> List: """Create LangChain tools from browser methods""" def navigate(url: str) -> str: """Navigate to a URL. Input should be a valid URL starting with http:// or https://""" return asyncio.run(self.browser.navigate(url)) def click(selector: str) -> str: """Click an element on the page. Input should be a CSS selector (e.g., 'button.submit', '#login-btn', 'a[href="/about"]')""" return asyncio.run(self.browser.click(selector)) def type_text(input_str: str) -> str: """Type text into an input field. Input format: 'selector|text' (e.g., 'input[name=email]|test@example.com')""" parts = input_str.split("|", 1) if len(parts) != 2: return "Error: Input must be in format 'selector|text'" return asyncio.run(self.browser.type_text(parts[0], parts[1])) def get_text(selector: str) -> str: """Get text content from an element. Input should be a CSS selector""" return asyncio.run(self.browser.get_text(selector)) def get_page_content(dummy: str = "") -> str: """Get the current page's title, URL, and visible text content. No input needed - just pass empty string.""" return asyncio.run(self.browser.get_page_content()) def scroll(direction: str = "down") -> str: """Scroll the page. Input should be: 'down', 'up', 'top', or 'bottom'""" return asyncio.run(self.browser.scroll(direction)) def get_elements_info(selector: str) -> str: """Get information about elements matching a CSS selector. Returns count and text of matching elements.""" return asyncio.run(self.browser.get_elements_info(selector)) def execute_javascript(script: str) -> str: """Execute JavaScript code on the page. Input should be valid JavaScript code (e.g., 'document.title' or 'document.querySelector("h1").textContent')""" return asyncio.run(self.browser.execute_javascript(script)) # Return list of tool functions return [ navigate, click, type_text, get_text, get_page_content, scroll, get_elements_info, execute_javascript, ] async def initialize(self, headless: bool = False): """Initialize the browser and agent""" await self.browser.start(headless=headless) # Create tools tools = self._create_tools() # System prompt for the agent system_prompt = """You are an AI browser automation assistant. You can control a web browser to help users accomplish tasks. Available actions: - navigate: Go to websites - click: Click buttons, links, or other elements - type_text: Fill in forms and input fields - get_text: Read text from specific elements - get_page_content: Read the current page content - scroll: Scroll the page in different directions - get_elements_info: Find and inspect elements - execute_javascript: Run JavaScript code for complex interactions When given a task: 1. First, understand what the user wants to accomplish 2. Break it down into steps 3. Use get_page_content to understand the current page 4. Use appropriate tools to complete each step 5. Verify your actions worked before moving to the next step Always use CSS selectors for targeting elements (e.g., 'button.login', '#submit-btn', 'input[name=email]'). For typing text, use the format: 'selector|text' Be methodical and explain what you're doing at each step.""" # Create agent using new API self.agent = create_agent( model=self.model, tools=tools, system_prompt=system_prompt, ) async def execute_task(self, task: str) -> Dict[str, Any]: """Execute a task using the AI agent""" try: # Invoke the agent with the task result = await self.agent.ainvoke( {"messages": [{"role": "user", "content": task}]} ) # Extract the final message content output = "Task completed" if result and "messages" in result: messages = result["messages"] if messages and len(messages) > 0: last_message = messages[-1] if hasattr(last_message, "content"): output = last_message.content elif isinstance(last_message, dict) and "content" in last_message: output = last_message["content"] # Take a screenshot screenshot_bytes = await self.browser.screenshot() screenshot_base64 = None if screenshot_bytes: screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8") return { "success": True, "output": output, "screenshot": screenshot_base64, "action_history": self.browser.action_history.copy(), } except Exception as e: return { "success": False, "error": str(e), "screenshot": None, "action_history": self.browser.action_history.copy(), } async def cleanup(self): """Cleanup resources""" await self.browser.stop()