Skip to content

Python SDK

本文将使用 MCP over MQTT Python SDK 创建一个简单的 MCP over MQTT 服务端和客户端。

创建一个演示项目

使用 uv 创建一个演示项目:

bash
uv init mcp_over_mqtt_demo
cd mcp_over_mqtt_demo

创建一个简单的 MCP 服务器

mcp_over_mqtt_demo 项目中,让我们创建一个简单的 MCP 服务器,暴露一个工具做整数的加法运算,和一个简单的资源返回字符串。创建一个名为 demo_mcp_server.py 的文件,代码如下:

python
# demo_mcp_server.py
from mcp.server.fastmcp import FastMCP

# Create an MCP server
mcp = FastMCP(
    "demo_mcp_server/calculator",
    log_level="DEBUG",
    mqtt_server_description="A simple FastMCP server that exposes a calculator tool",
    mqtt_options={
        "host": "broker.emqx.io",
    },
)

# Add an addition tool
@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b

# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
    """Get a personalized greeting"""
    return f"Hello, {name}!"

创建一个简单的 MCP 客户端

在同一个项目中,让我们创建一个简单的 MCP 客户端,连接到服务器并列出可用的工具和资源。创建一个名为 demo_mcp_client.py 的文件,并添加以下代码:

python
# demo_mcp_client.py
import logging
import anyio
import mcp.client.mqtt as mcp_mqtt
from mcp.shared.mqtt import configure_logging

configure_logging(level="INFO")
logger = logging.getLogger(__name__)

async def on_mcp_server_discovered(client, server_name):
    logger.info(f"Discovered {server_name}, connecting ...")
    await client.initialize_mcp_server(server_name)

async def on_mcp_connect(client, server_name, connect_result):
    success, init_result = connect_result
    if success == 'error':
        logger.error(f"Failed to connect to {server_name}: {init_result}")
        return
    logger.info(f"Connected to {server_name}, success={success}, init_result={init_result}")
    capabilities = init_result.capabilities
    if capabilities.prompts:
        prompts = await client.list_prompts(server_name)
        logger.info(f"Prompts of {server_name}: {prompts}")
    if capabilities.resources:
        resources = await client.list_resources(server_name)
        logger.info(f"Resources of {server_name}: {resources}")
        resource_templates = await client.list_resource_templates(server_name)
        logger.info(f"Resources templates of {server_name}: {resource_templates}")
    if capabilities.tools:
        toolsResult = await client.list_tools(server_name)
        tools = toolsResult.tools
        logger.info(f"Tools of {server_name}: {tools}")
        if tools[0].name == "add":
            result = await client.call_tool(server_name, name = tools[0].name, arguments={"a": 1, "b": 2})
            logger.info(f"Calling the tool as add(a=1, b=2), result: {result}")

async def on_mcp_disconnect(client, server_name):
    logger.info(f"Disconnected from {server_name}")

async def main():
    async with mcp_mqtt.MqttTransportClient(
        "test_client",
        auto_connect_to_mcp_server = True,
        on_mcp_server_discovered = on_mcp_server_discovered,
        on_mcp_connect = on_mcp_connect,
        on_mcp_disconnect = on_mcp_disconnect,
        mqtt_options = mcp_mqtt.MqttOptions(
            host="broker.emqx.io",
        )
    ) as client:
        client.start()
        while True:
            ## Simulate other works while the MQTT transport client is running in the background...
            await anyio.sleep(20)

if __name__ == "__main__":
    anyio.run(main)

运行演示

  1. 首先,安装所需的依赖项:
bash
uv add git+https://github.com/emqx/mcp-python-sdk --branch main
uv add "mcp[cli]"
  1. 现在运行客户端:
bash
uv run demo_mcp_client.py
  1. 打开一个新终端并运行服务器:
bash
uv run mcp run --transport mqtt ./demo_mcp_server.py

即便客户端先于服务器启动,它也会发现服务器并连接到它。客户端将列出可用的工具和资源,并使用参数 a=1b=2 调用 add 工具。