|
|
import asyncio |
|
|
import base64 |
|
|
import json |
|
|
from typing import Generic, Optional, TypeVar |
|
|
|
|
|
from browser_use import Browser as BrowserUseBrowser |
|
|
from browser_use import BrowserConfig |
|
|
from browser_use.browser.context import BrowserContext, BrowserContextConfig |
|
|
from browser_use.dom.service import DomService |
|
|
from pydantic import Field, field_validator |
|
|
from pydantic_core.core_schema import ValidationInfo |
|
|
|
|
|
from app.config import config |
|
|
from app.llm import LLM |
|
|
from app.tool.base import BaseTool, ToolResult |
|
|
from app.tool.web_search import WebSearch |
|
|
|
|
|
|
|
|
# Human/LLM-facing description of the browser tool; it is assigned to
# ``BrowserUseTool.description`` below. The text is emitted at runtime, so
# treat every character of it as behavior, not as a comment.
_BROWSER_DESCRIPTION = """\
A powerful browser automation tool that allows interaction with web pages through various actions.
* This tool provides commands for controlling a browser session, navigating web pages, and extracting information
* It maintains state across calls, keeping the browser session alive until explicitly closed
* Use this when you need to browse websites, fill forms, click buttons, extract content, or perform web searches
* Each action requires specific parameters as defined in the tool's dependencies

Key capabilities include:
* Navigation: Go to specific URLs, go back, search the web, or refresh pages
* Interaction: Click elements, input text, select from dropdowns, send keyboard commands
* Scrolling: Scroll up/down by pixel amount or scroll to specific text
* Content extraction: Extract and analyze content from web pages based on specific goals
* Tab management: Switch between tabs, open new tabs, or close tabs

Note: When using element indices, refer to the numbered elements shown in the current browser state.
"""

# Type variable for the optional caller-supplied object stored in
# ``BrowserUseTool.tool_context``; the tool is generic over it.
Context = TypeVar("Context")
|
|
|
|
|
|
|
|
class BrowserUseTool(BaseTool, Generic[Context]):
    """Tool that drives one long-lived browser session through named actions.

    The browser and its context are created lazily on the first call to
    :meth:`execute` and stay alive until :meth:`cleanup` is awaited. All
    actions are serialized through ``lock`` so a single page is never driven
    by two actions at once.
    """

    name: str = "browser_use"
    description: str = _BROWSER_DESCRIPTION
    # JSON-schema style description of the arguments accepted by ``execute``.
    # ``dependencies`` maps each action to the arguments it needs.
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "go_to_url",
                    "click_element",
                    "input_text",
                    "scroll_down",
                    "scroll_up",
                    "scroll_to_text",
                    "send_keys",
                    "get_dropdown_options",
                    "select_dropdown_option",
                    "go_back",
                    # Handled by ``execute`` and advertised in the tool
                    # description, so it must be selectable here as well.
                    "refresh",
                    "web_search",
                    "wait",
                    "extract_content",
                    "switch_tab",
                    "open_tab",
                    "close_tab",
                ],
                "description": "The browser action to perform",
            },
            "url": {
                "type": "string",
                "description": "URL for 'go_to_url' or 'open_tab' actions",
            },
            "index": {
                "type": "integer",
                "description": "Element index for 'click_element', 'input_text', 'get_dropdown_options', or 'select_dropdown_option' actions",
            },
            "text": {
                "type": "string",
                "description": "Text for 'input_text', 'scroll_to_text', or 'select_dropdown_option' actions",
            },
            "scroll_amount": {
                "type": "integer",
                "description": "Pixels to scroll for 'scroll_down' or 'scroll_up' actions (the direction is set by the action; the sign is ignored)",
            },
            "tab_id": {
                "type": "integer",
                "description": "Tab ID for 'switch_tab' action",
            },
            "query": {
                "type": "string",
                "description": "Search query for 'web_search' action",
            },
            "goal": {
                "type": "string",
                "description": "Extraction goal for 'extract_content' action",
            },
            "keys": {
                "type": "string",
                "description": "Keys to send for 'send_keys' action",
            },
            "seconds": {
                "type": "integer",
                "description": "Seconds to wait for 'wait' action",
            },
        },
        "required": ["action"],
        "dependencies": {
            "go_to_url": ["url"],
            "click_element": ["index"],
            "input_text": ["index", "text"],
            "switch_tab": ["tab_id"],
            "open_tab": ["url"],
            "close_tab": [],
            "scroll_down": ["scroll_amount"],
            "scroll_up": ["scroll_amount"],
            "scroll_to_text": ["text"],
            "send_keys": ["keys"],
            "get_dropdown_options": ["index"],
            "select_dropdown_option": ["index", "text"],
            "go_back": [],
            "refresh": [],
            "web_search": ["query"],
            "wait": ["seconds"],
            "extract_content": ["goal"],
        },
    }

    # Serializes ``execute`` and ``cleanup`` so actions never interleave.
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
    # Lazily created by ``_ensure_browser_initialized``; reset by ``cleanup``.
    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
    context: Optional[BrowserContext] = Field(default=None, exclude=True)
    dom_service: Optional[DomService] = Field(default=None, exclude=True)
    # Backs the 'web_search' action.
    web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True)

    # Caller-supplied object attached by ``create_with_context``; this class
    # only stores it.
    tool_context: Optional[Context] = Field(default=None, exclude=True)

    # Used by the 'extract_content' action.
    llm: Optional[LLM] = Field(default_factory=LLM)

    @field_validator("parameters", mode="before")
    @classmethod
    def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
        """Reject an empty ``parameters`` schema.

        Raises:
            ValueError: If ``v`` is falsy (``None`` or an empty dict).
        """
        if not v:
            raise ValueError("Parameters cannot be empty")
        return v

    async def _ensure_browser_initialized(self) -> BrowserContext:
        """Create the browser and its context on first use and return the context.

        Browser options start from ``headless=False, disable_security=True``
        and are overridden by whatever ``config.browser_config`` provides.
        """
        if self.browser is None:
            browser_config_kwargs = {"headless": False, "disable_security": True}

            if config.browser_config:
                from browser_use.browser.browser import ProxySettings

                # Only forward proxy settings when a proxy server is configured.
                if config.browser_config.proxy and config.browser_config.proxy.server:
                    browser_config_kwargs["proxy"] = ProxySettings(
                        server=config.browser_config.proxy.server,
                        username=config.browser_config.proxy.username,
                        password=config.browser_config.proxy.password,
                    )

                browser_attrs = [
                    "headless",
                    "disable_security",
                    "extra_chromium_args",
                    "chrome_instance_path",
                    "wss_url",
                    "cdp_url",
                ]

                for attr in browser_attrs:
                    value = getattr(config.browser_config, attr, None)
                    if value is not None:
                        # Empty lists are skipped so they do not override
                        # the library defaults; every other value is kept.
                        if not isinstance(value, list) or value:
                            browser_config_kwargs[attr] = value

            self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))

        if self.context is None:
            context_config = BrowserContextConfig()

            # Prefer a user-supplied context configuration when one exists.
            if (
                config.browser_config
                and hasattr(config.browser_config, "new_context_config")
                and config.browser_config.new_context_config
            ):
                context_config = config.browser_config.new_context_config

            self.context = await self.browser.new_context(context_config)
            self.dom_service = DomService(await self.context.get_current_page())

        return self.context

    async def execute(
        self,
        action: str,
        url: Optional[str] = None,
        index: Optional[int] = None,
        text: Optional[str] = None,
        scroll_amount: Optional[int] = None,
        tab_id: Optional[int] = None,
        query: Optional[str] = None,
        goal: Optional[str] = None,
        keys: Optional[str] = None,
        seconds: Optional[int] = None,
        **kwargs,
    ) -> ToolResult:
        """
        Execute a specified browser action.

        Args:
            action: The browser action to perform
            url: URL for 'go_to_url' or 'open_tab'
            index: Element index for click, input and dropdown actions
            text: Text for 'input_text', 'scroll_to_text' or 'select_dropdown_option'
            scroll_amount: Pixels to scroll; the sign is ignored because the
                direction comes from the action. Defaults to the configured
                browser window height.
            tab_id: Tab ID for 'switch_tab'
            query: Search query for 'web_search'
            goal: Extraction goal for 'extract_content'
            keys: Keys to send for 'send_keys'
            seconds: Seconds to wait for 'wait' (defaults to 3)
            **kwargs: Additional arguments (ignored)

        Returns:
            ToolResult with the action's output or error. Any exception raised
            while performing the action is reported as an error result rather
            than propagated.
        """
        async with self.lock:
            try:
                context = await self._ensure_browser_initialized()

                # Upper bound on the page text sent to the LLM for extraction.
                max_content_length = getattr(
                    config.browser_config, "max_content_length", 2000
                )

                # --- Navigation -------------------------------------------
                if action == "go_to_url":
                    if not url:
                        return ToolResult(
                            error="URL is required for 'go_to_url' action"
                        )
                    page = await context.get_current_page()
                    await page.goto(url)
                    await page.wait_for_load_state()
                    return ToolResult(output=f"Navigated to {url}")

                elif action == "go_back":
                    await context.go_back()
                    return ToolResult(output="Navigated back")

                elif action == "refresh":
                    await context.refresh_page()
                    return ToolResult(output="Refreshed current page")

                elif action == "web_search":
                    if not query:
                        return ToolResult(
                            error="Query is required for 'web_search' action"
                        )

                    search_response = await self.web_search_tool.execute(
                        query=query, fetch_content=True, num_results=1
                    )

                    # An empty result set would otherwise surface as an
                    # opaque "list index out of range" error.
                    if not search_response.results:
                        return ToolResult(
                            error=f"Web search for '{query}' returned no results"
                        )

                    # Open the top hit in the browser, then hand the full
                    # search response back to the caller.
                    first_search_result = search_response.results[0]
                    url_to_navigate = first_search_result.url

                    page = await context.get_current_page()
                    await page.goto(url_to_navigate)
                    await page.wait_for_load_state()

                    return search_response

                # --- Element interaction ----------------------------------
                elif action == "click_element":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'click_element' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    download_path = await context._click_element_node(element)
                    output = f"Clicked element at index {index}"
                    if download_path:
                        output += f" - Downloaded file to {download_path}"
                    return ToolResult(output=output)

                elif action == "input_text":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'input_text' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    await context._input_text_element_node(element, text)
                    return ToolResult(
                        output=f"Input '{text}' into element at index {index}"
                    )

                elif action in ("scroll_down", "scroll_up"):
                    direction = 1 if action == "scroll_down" else -1
                    # The action name alone decides the direction. Taking the
                    # magnitude keeps a negative amount from flipping it
                    # (e.g. 'scroll_up' with -300 used to scroll down).
                    amount = (
                        abs(scroll_amount)
                        if scroll_amount is not None
                        else context.config.browser_window_size["height"]
                    )
                    await context.execute_javascript(
                        f"window.scrollBy(0, {direction * amount});"
                    )
                    return ToolResult(
                        output=f"Scrolled {'down' if direction > 0 else 'up'} by {amount} pixels"
                    )

                elif action == "scroll_to_text":
                    if not text:
                        return ToolResult(
                            error="Text is required for 'scroll_to_text' action"
                        )
                    page = await context.get_current_page()
                    try:
                        locator = page.get_by_text(text, exact=False)
                        await locator.scroll_into_view_if_needed()
                        return ToolResult(output=f"Scrolled to text: '{text}'")
                    except Exception as e:
                        return ToolResult(error=f"Failed to scroll to text: {str(e)}")

                elif action == "send_keys":
                    if not keys:
                        return ToolResult(
                            error="Keys are required for 'send_keys' action"
                        )
                    page = await context.get_current_page()
                    await page.keyboard.press(keys)
                    return ToolResult(output=f"Sent keys: {keys}")

                elif action == "get_dropdown_options":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'get_dropdown_options' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    # Resolve the element by XPath inside the page and list
                    # its <option> entries; yields null if nothing matches.
                    options = await page.evaluate(
                        """
                        (xpath) => {
                            const select = document.evaluate(xpath, document, null,
                                XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                            if (!select) return null;
                            return Array.from(select.options).map(opt => ({
                                text: opt.text,
                                value: opt.value,
                                index: opt.index
                            }));
                        }
                    """,
                        element.xpath,
                    )
                    return ToolResult(output=f"Dropdown options: {options}")

                elif action == "select_dropdown_option":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'select_dropdown_option' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    # ``element.xpath`` is a raw XPath (it is fed straight to
                    # document.evaluate above). Playwright only auto-detects
                    # XPath selectors that start with "//" or "..", so name
                    # the engine explicitly instead of risking a CSS parse.
                    await page.select_option(f"xpath={element.xpath}", label=text)
                    return ToolResult(
                        output=f"Selected option '{text}' from dropdown at index {index}"
                    )

                # --- Content extraction -----------------------------------
                elif action == "extract_content":
                    if not goal:
                        return ToolResult(
                            error="Goal is required for 'extract_content' action"
                        )
                    if self.llm is None:
                        return ToolResult(
                            error="An LLM is required for 'extract_content' action"
                        )

                    page = await context.get_current_page()
                    import markdownify

                    # Convert the page HTML to markdown before prompting.
                    content = markdownify.markdownify(await page.content())

                    prompt = f"""\
Your task is to extract the content of the page. You will be given a page and a goal, and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format.
Extraction goal: {goal}

Page content:
{content[:max_content_length]}
"""
                    messages = [{"role": "system", "content": prompt}]

                    # Function schema the LLM is required to call, so the
                    # answer comes back as structured arguments.
                    extraction_function = {
                        "type": "function",
                        "function": {
                            "name": "extract_content",
                            "description": "Extract specific information from a webpage based on a goal",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "extracted_content": {
                                        "type": "object",
                                        "description": "The content extracted from the page according to the goal",
                                        "properties": {
                                            "text": {
                                                "type": "string",
                                                "description": "Text content extracted from the page",
                                            },
                                            "metadata": {
                                                "type": "object",
                                                "description": "Additional metadata about the extracted content",
                                                "properties": {
                                                    "source": {
                                                        "type": "string",
                                                        "description": "Source of the extracted content",
                                                    }
                                                },
                                            },
                                        },
                                    }
                                },
                                "required": ["extracted_content"],
                            },
                        },
                    }

                    response = await self.llm.ask_tool(
                        messages,
                        tools=[extraction_function],
                        tool_choice="required",
                    )

                    if response and response.tool_calls:
                        args = json.loads(response.tool_calls[0].function.arguments)
                        extracted_content = args.get("extracted_content", {})
                        return ToolResult(
                            output=f"Extracted from page:\n{extracted_content}\n"
                        )

                    return ToolResult(output="No content was extracted from the page.")

                # --- Tab management ---------------------------------------
                elif action == "switch_tab":
                    if tab_id is None:
                        return ToolResult(
                            error="Tab ID is required for 'switch_tab' action"
                        )
                    await context.switch_to_tab(tab_id)
                    page = await context.get_current_page()
                    await page.wait_for_load_state()
                    return ToolResult(output=f"Switched to tab {tab_id}")

                elif action == "open_tab":
                    if not url:
                        return ToolResult(error="URL is required for 'open_tab' action")
                    await context.create_new_tab(url)
                    return ToolResult(output=f"Opened new tab with {url}")

                elif action == "close_tab":
                    await context.close_current_tab()
                    return ToolResult(output="Closed current tab")

                # --- Utility ----------------------------------------------
                elif action == "wait":
                    seconds_to_wait = seconds if seconds is not None else 3
                    await asyncio.sleep(seconds_to_wait)
                    return ToolResult(output=f"Waited for {seconds_to_wait} seconds")

                else:
                    return ToolResult(error=f"Unknown action: {action}")

            except Exception as e:
                # Tool boundary: report failures as a result, never raise.
                return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")

    async def get_current_state(
        self, context: Optional[BrowserContext] = None
    ) -> ToolResult:
        """
        Get the current browser state as a ToolResult.

        If context is not provided, uses self.context.

        Returns:
            ToolResult whose ``output`` is a JSON document (URL, title, tabs,
            interactive elements, scroll info) and whose ``base64_image`` is a
            base64-encoded full-page JPEG screenshot; or an error result when
            no context exists or anything fails.
        """
        try:
            ctx = context or self.context
            if not ctx:
                return ToolResult(error="Browser context not initialized")

            state = await ctx.get_state()

            # Prefer the viewport height reported with the state; fall back
            # to the configured window size, else 0.
            viewport_height = 0
            if hasattr(state, "viewport_info") and state.viewport_info:
                viewport_height = state.viewport_info.height
            elif hasattr(ctx, "config") and hasattr(ctx.config, "browser_window_size"):
                viewport_height = ctx.config.browser_window_size.get("height", 0)

            page = await ctx.get_current_page()

            await page.bring_to_front()
            await page.wait_for_load_state()

            screenshot = await page.screenshot(
                full_page=True, animations="disabled", type="jpeg", quality=100
            )

            screenshot = base64.b64encode(screenshot).decode("utf-8")

            state_info = {
                "url": state.url,
                "title": state.title,
                "tabs": [tab.model_dump() for tab in state.tabs],
                "help": "[0], [1], [2], etc., represent clickable indices corresponding to the elements listed. Clicking on these indices will navigate to or interact with the respective content behind them.",
                "interactive_elements": (
                    state.element_tree.clickable_elements_to_string()
                    if state.element_tree
                    else ""
                ),
                "scroll_info": {
                    "pixels_above": getattr(state, "pixels_above", 0),
                    "pixels_below": getattr(state, "pixels_below", 0),
                    "total_height": getattr(state, "pixels_above", 0)
                    + getattr(state, "pixels_below", 0)
                    + viewport_height,
                },
                "viewport_height": viewport_height,
            }

            return ToolResult(
                output=json.dumps(state_info, indent=4, ensure_ascii=False),
                base64_image=screenshot,
            )
        except Exception as e:
            return ToolResult(error=f"Failed to get browser state: {str(e)}")

    async def cleanup(self):
        """Clean up browser resources.

        Closes the context and then the browser. The browser is still closed,
        and all references are still dropped, when closing the context
        raises; the exception then propagates to the caller.
        """
        async with self.lock:
            try:
                if self.context is not None:
                    await self.context.close()
            finally:
                self.context = None
                self.dom_service = None
                try:
                    if self.browser is not None:
                        await self.browser.close()
                finally:
                    self.browser = None

    def __del__(self):
        """Ensure cleanup when object is destroyed (best effort)."""
        # ``getattr`` with defaults: the finalizer may run on an instance
        # whose construction failed before these fields were set.
        if (
            getattr(self, "browser", None) is None
            and getattr(self, "context", None) is None
        ):
            return
        try:
            asyncio.run(self.cleanup())
        except RuntimeError:
            # Retry on a dedicated loop, which is always closed afterwards.
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self.cleanup())
            finally:
                loop.close()

    @classmethod
    def create_with_context(cls, context: Context) -> "BrowserUseTool[Context]":
        """Factory method to create a BrowserUseTool with a specific context."""
        tool = cls()
        tool.tool_context = context
        return tool
|
|
|