使用 Paho Go 连接到部署
本文主要介绍如何在 Go 项目中,使用 paho.mqtt.golang
库实现一个 MQTT 客戶端与 MQTT 服务器的连接、订阅、收发消息等功能。
Go 是 Google 开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。而 paho.mqtt.golang
是一个 MQTT 库,它提供了一个简单的 API,用于在 Go 项目中连接到 MQTT 服务器,并发送和接收消息。
前置准备
MQTT 服务器部署
- 使用 EMQX 提供的 免费公共 MQTT 服务器,该服务基于 EMQX 的 全托管 MQTT 云服务 创建。服务器接入信息如下:
- 连接地址: broker.emqx.io
- TCP Port: 1883
- SSL/TLS Port: 8883
- 您也可以自己创建部署,在部署概览下可以查看到连接相关的信息,请确保部署状态为运行中。使用 TCP 端口或 SSL/TLS 端口 测试连接到 MQTT 服务器。如果您是自己创建部署,请设置客户端认证,在部署控制台访问控制->认证 中设置用户名和密码,用于连接验证。
安装依赖
本项目使用 Go 1.15.13 版本进行开发测试,可用如下命令确认 Go 版本。
➜ ~ go version
go version go1.15.13 darwin/amd64
创建一个新的文件夹
go-mqtt
, 进入文件夹, 运行以下命令bash➜ ~ go mod init go-mqtt go: creating new go.mod: module go-mqtt
使用
go get <库>
命令安装paho.mqtt.golang
bash➜ ~ go get github.com/eclipse/paho.mqtt.golang go: downloading github.com/eclipse/paho.mqtt.golang v1.3.5 go: github.com/eclipse/paho.mqtt.golang upgrade => v1.3.5
创建
main.go
文件, 并导入paho.mqtt.golang
客戶端gopackage main import ( mqtt "github.com/eclipse/paho.mqtt.golang" ) func main(){ }
通过 TCP 端口连接
本节介绍如何通过 TCP 端口连接到部署。
连接设置
示例代码将使用公共 MQTT 服务器来连接,公共 MQTT 服务器无需设置用户名和密码。如果您创建了部署,请在部署控制台找到相应的连接地址,并参考 默认认证设置用户名和密码。
goconst protocol = "tcp" const broker = "broker.emqx.io" // MQTT Broker 连接地址 const port = 1883 const topic = "t/1" const username = "emqx" const password = "******"
连接关键代码
我们编写一个函数, 用于创建并返回 MQTT 客户端。
gofunc createMqttClient() mqtt.Client { connectAddress := fmt.Sprintf("%s://%s:%d", protocol,broker, port) rand.Seed(time.Now().UnixNano()) clientID := fmt.Sprintf("go-client-%d", rand.Int()) fmt.Println("connect address: ", connectAddress) opts := mqtt.NewClientOptions() opts.AddBroker(connectAddress) opts.SetUsername(username) opts.SetPassword(password) opts.SetClientID(clientID) opts.SetKeepAlive(time.Second * 60) client := mqtt.NewClient(opts) token := client.Connect() // 如果连接失败,则终止程序 if token.WaitTimeout(3*time.Second) && token.Error() != nil { log.Fatal(token.Error()) } return client }
通过 SSL/TLS 端口连接
本节介绍如何通过 SSL/TLS 协议连接到部署。
连接设置
使用 SSL/TLS 协议连接需要配置端口为 8883,连接协议类型为 "ssl"。
goconst protocol = "ssl" const port = 8883
连接关键代码
若需要配置服务器证书,可以通过
loadTLSConfig()
函数加载证书;若不需要,则直接使用 TCP 协议相同的连接方式即可。gofunc createMqttClient() mqtt.Client { connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port) rand.Seed(time.Now().UnixNano()) clientID := fmt.Sprintf("go-client-%d", rand.Int()) fmt.Println("connect address: ", connectAddress) opts := mqtt.NewClientOptions() opts.AddBroker(connectAddress) opts.SetUsername(username) opts.SetPassword(password) opts.SetClientID(clientID) opts.SetKeepAlive(time.Second * 60) // Optional: 设置CA证书 // opts.SetTLSConfig(loadTLSConfig("caFilePath")) client := mqtt.NewClient(opts) token := client.Connect() if token.WaitTimeout(3*time.Second) && token.Error() != nil { log.Fatal(token.Error()) } return client } func loadTLSConfig(caFile string) *tls.Config { // load tls config var tlsConfig tls.Config tlsConfig.InsecureSkipVerify = false if caFile != "" { certpool := x509.NewCertPool() ca, err := ioutil.ReadFile(caFile) if err != nil { log.Fatal(err.Error()) } certpool.AppendCertsFromPEM(ca) tlsConfig.RootCAs = certpool } return &tlsConfig }
发布和订阅
本节主要介绍如何在已连接到部署的情况下订阅主题并发布消息。
发布消息
定义发布函数,用于发布消息。
在这个函数里,我们定义了一个无条件的循环,每隔 1s 发布一条消息。通过 msgCount 来统计发布的消息数量。
func publish(client mqtt.Client) {
qos := 0
msgCount := 0
for {
payload := fmt.Sprintf("message: %d!", msgCount)
if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
} else {
fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
}
msgCount++
time.Sleep(time.Second * 1)
}
}
订阅主题
定义订阅函数,用于订阅主题。该方法将在控制台打印消息的 Topic 和 Payload。
func subscribe(client mqtt.Client) {
qos := 0
client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received `%s` from `%s` topic", msg.Payload(), msg.Topic())
})
}
取消订阅
通过以下代码取消订阅,此时应指定取消订阅的主题。
client.Unsubscribe(topic)
断开连接
如客户端希望主动断开连接,可以通过如下代码实现。
// 参数指定断连前等待时长
client.Disconnect(250)
完整代码
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"math/rand"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const protocol = "tcp"
const broker = ""
const port = 1883
const topic = "t/1"
const username = ""
const password = ""
func main() {
client := createMqttClient()
go subscribe(client) // 在主函数里, 我们用另起一个 go 协程来订阅消息
time.Sleep(time.Second * 1) // 暂停一秒等待 subscribe 完成
publish(client)
}
func createMqttClient() mqtt.Client {
connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
rand.Seed(time.Now().UnixNano())
clientID := fmt.Sprintf("go-client-%d", rand.Int())
fmt.Println("connect address: ", connectAddress)
opts := mqtt.NewClientOptions()
opts.AddBroker(connectAddress)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetClientID(clientID)
opts.SetKeepAlive(time.Second * 60)
// Optional: 设置CA证书
// opts.SetTLSConfig(loadTLSConfig("caFilePath"))
client := mqtt.NewClient(opts)
token := client.Connect()
if token.WaitTimeout(3*time.Second) && token.Error() != nil {
log.Fatal(token.Error())
}
return client
}
func publish(client mqtt.Client) {
qos := 0
msgCount := 0
for {
payload := fmt.Sprintf("message: %d!", msgCount)
if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
} else {
fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
}
msgCount++
time.Sleep(time.Second * 1)
}
}
func subscribe(client mqtt.Client) {
qos := 0
client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
})
}
func loadTLSConfig(caFile string) *tls.Config {
// load tls config
var tlsConfig tls.Config
tlsConfig.InsecureSkipVerify = false
if caFile != "" {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
if err != nil {
log.Fatal(err.Error())
}
certpool.AppendCertsFromPEM(ca)
tlsConfig.RootCAs = certpool
}
return &tlsConfig
}
测试验证
执行程序
➜ ~ go run main.go
connect address: tcp://***.***.***.***:1883
publish success, topic: t/1, payload: message: 0!
Received `message: 0!` from `t/1` topic
publish success, topic: t/1, payload: message: 1!
Received `message: 1!` from `t/1` topic
publish success, topic: t/1, payload: message: 2!
Received `message: 2!` from `t/1` topic
publish success, topic: t/1, payload: message: 3!
Received `message: 3!` from `t/1` topic
publish success, topic: t/1, payload: message: 4!
Received `message: 4!` from `t/1` topic
publish success, topic: t/1, payload: message: 5!
Received `message: 5!` from `t/1` topic
publish success, topic: t/1, payload: message: 6!
更多内容
综上所述,我们实现了在 Go 项目中创建 MQTT 连接,使用客户端与 MQTT 服务器进行订阅、收发消息、取消订阅以及断开连接的场景。可以在 这里 下载到示例的源码,同时也可以在 GitHub 上找到更多其他语言的 Demo 示例。