# 使用 Go SDK 连接到部署

本文主要介绍如何在 Go 项目中,使用 paho.mqtt.golang 库实现一个 MQTT 客戶端与 MQTT 服务器的连接、订阅、收发消息等功能。

Go (opens new window) 是 Google 开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。而 paho.mqtt.golang 是一个 MQTT 库,它提供了一个简单的 API,用于在 Go 项目中连接到 MQTT 服务器,并发送和接收消息。

# 前提条件

  1. 已经创建了部署,在 部署概览 下可以查看到连接相关的信息,请确保部署状态为运行中。同时你可以使用 WebSocket 测试连接到 MQTT 服务器。
  2. 认证鉴权 > 认证 中设置用户名和密码,用于连接验证。

本项目使用 Go 1.15.13 版本进行开发测试,可用如下命令确认 Go 版本。

➜ ~ go version
go version go1.15.13 darwin/amd64
1
2

# 安装依赖

  1. 创建一个新的文件夹 go-mqtt, 进入文件夹, 运行以下命令

    ➜ ~ go mod init go-mqtt
    go: creating new go.mod: module go-mqtt
    
    1
    2
  2. 使用 go get <库> 命令安装 paho.mqtt.golang

    ➜ ~ go get github.com/eclipse/paho.mqtt.golang
    go: downloading github.com/eclipse/paho.mqtt.golang v1.3.5
    go: github.com/eclipse/paho.mqtt.golang upgrade => v1.3.5
    
    1
    2
    3
  3. 创建 main.go 文件, 并导入 paho.mqtt.golang 客戶端

    package main
    
    import (
        mqtt "github.com/eclipse/paho.mqtt.golang"
    )
    
    func main(){
    }
    
    1
    2
    3
    4
    5
    6
    7
    8

# 连接

请在控制台的 部署概览 找到相关的地址以及端口信息,需要注意如果是基础版,端口不是 1883 或 8883 端口,请确认好端口。并且在 认证鉴权 中添加认证信息。

# 连接设置

设置 MQTT Broker 连接地址,端口以及 topic, 以及用戶名和密码。

const broker = "broker.emqx.io" // MQTT Broker 连接地址
const port = 1883
const topic = "t/1"
const username = "emqx"
const password = "******"
1
2
3
4
5

# 连接关键代码

我们编写一个函数, 用于创建并返回 MQTT 客户端。

func createMqttClient() mqtt.Client {
    connectAddress := fmt.Sprintf("tcp://%s:%d", broker, port)
    client_id := fmt.Sprintf("go-client-%d", rand.Int())

    fmt.Println("connect address: ", connectAddress)
    opts := mqtt.NewClientOptions()
    opts.AddBroker(connectAddress)
    opts.SetUsername(username)
    opts.SetPassword(password)
    opts.SetClientID(client_id)
    opts.SetKeepAlive(60)
    client := mqtt.NewClient(opts)
    token := client.Connect()
    // 如果连接失败,则终止程序
    if token.WaitTimeout(3*time.Second) && token.Error() != nil {
        log.Fatal(token.Error())
    }
    return client
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 发布消息

定义发布函数,用于发布消息。

在这个函数里,我们定义了一个无条件的循环,每隔 1s 发布一条消息。通过 msgCount 来统计发布的消息数量。

func publish(client mqtt.Client) {
    qos := 0
    msgCount := 0
    for {
        payload := fmt.Sprintf("message: %d!", msgCount)
        if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
            fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
        } else {
            fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
        }
        msgCount++
        time.Sleep(time.Second * 1)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 订阅主题

定义订阅函数,用于订阅主题。该方法将在控制台打印消息的 Topic 和 Payload。

func subscribe(client mqtt.Client) {
    qos := 0
    client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
        fmt.Printf("Received `%s` from `%s` topic", msg.Payload(), msg.Topic())
    })
}
1
2
3
4
5
6

# 完整代码

package main

import (
        "fmt"
        "log"
        "math/rand"
        "time"

        mqtt "github.com/eclipse/paho.mqtt.golang"
)

const broker = ""
const port = 1883
const topic = "t/1"
const username = ""
const password = ""

func main() {
        client := createMqttClient()
        go subscribe(client)        // 在主函数里, 我们用另起一个 go 协程来订阅消息
        time.Sleep(time.Second * 1) // 暂停一秒等待 subscribe 完成
        publish(client)
}

func createMqttClient() mqtt.Client {
        connectAddress := fmt.Sprintf("tcp://%s:%d", broker, port)
        client_id := fmt.Sprintf("go-client-%d", rand.Int())

        fmt.Println("connect address: ", connectAddress)
        opts := mqtt.NewClientOptions()
        opts.AddBroker(connectAddress)
        opts.SetUsername(username)
        opts.SetPassword(password)
        opts.SetClientID(client_id)
        opts.SetKeepAlive(60)
        client := mqtt.NewClient(opts)
        token := client.Connect()
        if token.WaitTimeout(3*time.Second) && token.Error() != nil {
                log.Fatal(token.Error())
        }
        return client
}

func publish(client mqtt.Client) {
        qos := 0
        msgCount := 0
        for {
                payload := fmt.Sprintf("message: %d!", msgCount)
                if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
                        fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
                } else {
                        fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
                }
                msgCount++
                time.Sleep(time.Second * 1)
        }
}

func subscribe(client mqtt.Client) {
        qos := 0
        client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
                fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
        })
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

# 测试验证

执行程序

➜ ~ go run main.go
connect address:  tcp://***.***.***.***:1883
publish success, topic: t/1, payload: message: 0!
Received `message: 0!` from `t/1` topic
publish success, topic: t/1, payload: message: 1!
Received `message: 1!` from `t/1` topic
publish success, topic: t/1, payload: message: 2!
Received `message: 2!` from `t/1` topic
publish success, topic: t/1, payload: message: 3!
Received `message: 3!` from `t/1` topic
publish success, topic: t/1, payload: message: 4!
Received `message: 4!` from `t/1` topic
publish success, topic: t/1, payload: message: 5!
Received `message: 5!` from `t/1` topic
publish success, topic: t/1, payload: message: 6!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

go pubsub

# 更多内容

以上为您演示了如何使用 paho.mqtt.golang 客戶端连接到 EMQX Cloud, 模拟了客户端与 MQTT 服务器进行订阅, 收发消息的场景。 可以在 这里 (opens new window) 下载到示例的源码。同时也可以在 Github (opens new window) 上找到更多其它语言的 Demo 示例。