|
|
import logging |
|
|
import sys |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stderr)]) |
|
|
|
|
|
import argparse |
|
|
import asyncio |
|
|
import atexit |
|
|
import json |
|
|
from inspect import Parameter, Signature |
|
|
from typing import Any, Dict, Optional |
|
|
|
|
|
from mcp.server.fastmcp import FastMCP |
|
|
|
|
|
from app.logger import logger |
|
|
from app.tool.base import BaseTool |
|
|
from app.tool.bash import Bash |
|
|
from app.tool.browser_use_tool import BrowserUseTool |
|
|
from app.tool.str_replace_editor import StrReplaceEditor |
|
|
from app.tool.terminate import Terminate |
|
|
|
|
|
|
|
|
class MCPServer: |
|
|
"""MCP Server implementation with tool registration and management.""" |
|
|
|
|
|
def __init__(self, name: str = "openmanus"): |
|
|
self.server = FastMCP(name) |
|
|
self.tools: Dict[str, BaseTool] = {} |
|
|
|
|
|
|
|
|
self.tools["bash"] = Bash() |
|
|
self.tools["browser"] = BrowserUseTool() |
|
|
self.tools["editor"] = StrReplaceEditor() |
|
|
self.tools["terminate"] = Terminate() |
|
|
|
|
|
def register_tool(self, tool: BaseTool, method_name: Optional[str] = None) -> None: |
|
|
"""Register a tool with parameter validation and documentation.""" |
|
|
tool_name = method_name or tool.name |
|
|
tool_param = tool.to_param() |
|
|
tool_function = tool_param["function"] |
|
|
|
|
|
|
|
|
async def tool_method(**kwargs): |
|
|
logger.info(f"Executing {tool_name}: {kwargs}") |
|
|
result = await tool.execute(**kwargs) |
|
|
|
|
|
logger.info(f"Result of {tool_name}: {result}") |
|
|
|
|
|
|
|
|
if hasattr(result, "model_dump"): |
|
|
return json.dumps(result.model_dump()) |
|
|
elif isinstance(result, dict): |
|
|
return json.dumps(result) |
|
|
return result |
|
|
|
|
|
|
|
|
tool_method.__name__ = tool_name |
|
|
tool_method.__doc__ = self._build_docstring(tool_function) |
|
|
tool_method.__signature__ = self._build_signature(tool_function) |
|
|
|
|
|
|
|
|
param_props = tool_function.get("parameters", {}).get("properties", {}) |
|
|
required_params = tool_function.get("parameters", {}).get("required", []) |
|
|
tool_method._parameter_schema = { |
|
|
param_name: { |
|
|
"description": param_details.get("description", ""), |
|
|
"type": param_details.get("type", "any"), |
|
|
"required": param_name in required_params, |
|
|
} |
|
|
for param_name, param_details in param_props.items() |
|
|
} |
|
|
|
|
|
|
|
|
self.server.tool()(tool_method) |
|
|
logger.info(f"Registered tool: {tool_name}") |
|
|
|
|
|
def _build_docstring(self, tool_function: dict) -> str: |
|
|
"""Build a formatted docstring from tool function metadata.""" |
|
|
description = tool_function.get("description", "") |
|
|
param_props = tool_function.get("parameters", {}).get("properties", {}) |
|
|
required_params = tool_function.get("parameters", {}).get("required", []) |
|
|
|
|
|
|
|
|
docstring = description |
|
|
if param_props: |
|
|
docstring += "\n\nParameters:\n" |
|
|
for param_name, param_details in param_props.items(): |
|
|
required_str = ( |
|
|
"(required)" if param_name in required_params else "(optional)" |
|
|
) |
|
|
param_type = param_details.get("type", "any") |
|
|
param_desc = param_details.get("description", "") |
|
|
docstring += ( |
|
|
f" {param_name} ({param_type}) {required_str}: {param_desc}\n" |
|
|
) |
|
|
|
|
|
return docstring |
|
|
|
|
|
def _build_signature(self, tool_function: dict) -> Signature: |
|
|
"""Build a function signature from tool function metadata.""" |
|
|
param_props = tool_function.get("parameters", {}).get("properties", {}) |
|
|
required_params = tool_function.get("parameters", {}).get("required", []) |
|
|
|
|
|
parameters = [] |
|
|
|
|
|
|
|
|
for param_name, param_details in param_props.items(): |
|
|
param_type = param_details.get("type", "") |
|
|
default = Parameter.empty if param_name in required_params else None |
|
|
|
|
|
|
|
|
annotation = Any |
|
|
if param_type == "string": |
|
|
annotation = str |
|
|
elif param_type == "integer": |
|
|
annotation = int |
|
|
elif param_type == "number": |
|
|
annotation = float |
|
|
elif param_type == "boolean": |
|
|
annotation = bool |
|
|
elif param_type == "object": |
|
|
annotation = dict |
|
|
elif param_type == "array": |
|
|
annotation = list |
|
|
|
|
|
|
|
|
param = Parameter( |
|
|
name=param_name, |
|
|
kind=Parameter.KEYWORD_ONLY, |
|
|
default=default, |
|
|
annotation=annotation, |
|
|
) |
|
|
parameters.append(param) |
|
|
|
|
|
return Signature(parameters=parameters) |
|
|
|
|
|
async def cleanup(self) -> None: |
|
|
"""Clean up server resources.""" |
|
|
logger.info("Cleaning up resources") |
|
|
|
|
|
if "browser" in self.tools and hasattr(self.tools["browser"], "cleanup"): |
|
|
await self.tools["browser"].cleanup() |
|
|
|
|
|
def register_all_tools(self) -> None: |
|
|
"""Register all tools with the server.""" |
|
|
for tool in self.tools.values(): |
|
|
self.register_tool(tool) |
|
|
|
|
|
def run(self, transport: str = "stdio") -> None: |
|
|
"""Run the MCP server.""" |
|
|
|
|
|
self.register_all_tools() |
|
|
|
|
|
|
|
|
atexit.register(lambda: asyncio.run(self.cleanup())) |
|
|
|
|
|
|
|
|
logger.info(f"Starting OpenManus server ({transport} mode)") |
|
|
self.server.run(transport=transport) |
|
|
|
|
|
|
|
|
def parse_args() -> argparse.Namespace: |
|
|
"""Parse command line arguments.""" |
|
|
parser = argparse.ArgumentParser(description="OpenManus MCP Server") |
|
|
parser.add_argument( |
|
|
"--transport", |
|
|
choices=["stdio"], |
|
|
default="stdio", |
|
|
help="Communication method: stdio or http (default: stdio)", |
|
|
) |
|
|
return parser.parse_args() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
args = parse_args() |
|
|
|
|
|
|
|
|
server = MCPServer() |
|
|
server.run(transport=args.transport) |
|
|
|