使用 Paho Go 连接到部署
本文主要介绍如何在 Go 项目中,使用 paho.mqtt.golang 库实现一个 MQTT 客戶端与 MQTT 服务器的连接、订阅、收发消息等功能。
Go 是 Google 开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。而 paho.mqtt.golang 是一个 MQTT 库,它提供了一个简单的 API,用于在 Go 项目中连接到 MQTT 服务器,并发送和接收消息。
前置准备
在使用 Paho Go 客户端将您的 Go 应用程序连接到 EMQX Platform 之前,请确保已部署了 MQTT 服务器。
部署 MQTT 服务器
安装依赖
本项目使用 Go 1.15.13 版本进行开发测试,可用如下命令确认 Go 版本。
➜ ~ go version
go version go1.15.13 darwin/amd64创建一个新的文件夹
go-mqtt, 进入文件夹, 运行以下命令bash➜ ~ go mod init go-mqtt go: creating new go.mod: module go-mqtt使用
go get <库>命令安装paho.mqtt.golangbash➜ ~ go get github.com/eclipse/paho.mqtt.golang go: downloading github.com/eclipse/paho.mqtt.golang v1.3.5 go: github.com/eclipse/paho.mqtt.golang upgrade => v1.3.5创建
main.go文件, 并导入paho.mqtt.golang客戶端gopackage main import ( mqtt "github.com/eclipse/paho.mqtt.golang" ) func main(){ }
通过 TCP 端口连接
本节介绍如何通过 TCP 端口连接到部署。
连接设置
示例代码将使用公共 MQTT 服务器来连接,公共 MQTT 服务器无需设置用户名和密码。如果您创建了部署,请在部署控制台找到相应的连接地址,并参考 默认认证设置用户名和密码。
goconst protocol = "tcp" const broker = "broker.emqx.io" // MQTT Broker 连接地址 const port = 1883 const topic = "t/1" const username = "emqx" const password = "******"连接关键代码
我们编写一个函数, 用于创建并返回 MQTT 客户端。
gofunc createMqttClient() mqtt.Client { connectAddress := fmt.Sprintf("%s://%s:%d", protocol,broker, port) rand.Seed(time.Now().UnixNano()) clientID := fmt.Sprintf("go-client-%d", rand.Int()) fmt.Println("connect address: ", connectAddress) opts := mqtt.NewClientOptions() opts.AddBroker(connectAddress) opts.SetUsername(username) opts.SetPassword(password) opts.SetClientID(clientID) opts.SetKeepAlive(time.Second * 60) client := mqtt.NewClient(opts) token := client.Connect() // 如果连接失败,则终止程序 if token.WaitTimeout(3*time.Second) && token.Error() != nil { log.Fatal(token.Error()) } return client }
通过 SSL/TLS 端口连接
本节介绍如何通过 SSL/TLS 协议连接到部署。
连接设置
使用 SSL/TLS 协议连接需要配置端口为 8883,连接协议类型为 "ssl"。
goconst protocol = "ssl" const port = 8883连接关键代码
若需要配置服务器证书,可以通过
loadTLSConfig()函数加载证书;若不需要,则直接使用 TCP 协议相同的连接方式即可。gofunc createMqttClient() mqtt.Client { connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port) rand.Seed(time.Now().UnixNano()) clientID := fmt.Sprintf("go-client-%d", rand.Int()) fmt.Println("connect address: ", connectAddress) opts := mqtt.NewClientOptions() opts.AddBroker(connectAddress) opts.SetUsername(username) opts.SetPassword(password) opts.SetClientID(clientID) opts.SetKeepAlive(time.Second * 60) // Optional: 设置CA证书 // opts.SetTLSConfig(loadTLSConfig("caFilePath")) client := mqtt.NewClient(opts) token := client.Connect() if token.WaitTimeout(3*time.Second) && token.Error() != nil { log.Fatal(token.Error()) } return client } func loadTLSConfig(caFile string) *tls.Config { // load tls config var tlsConfig tls.Config tlsConfig.InsecureSkipVerify = false if caFile != "" { certpool := x509.NewCertPool() ca, err := ioutil.ReadFile(caFile) if err != nil { log.Fatal(err.Error()) } certpool.AppendCertsFromPEM(ca) tlsConfig.RootCAs = certpool } return &tlsConfig }
发布和订阅
本节主要介绍如何在已连接到部署的情况下订阅主题并发布消息。
发布消息
定义发布函数,用于发布消息。
在这个函数里,我们定义了一个无条件的循环,每隔 1s 发布一条消息。通过 msgCount 来统计发布的消息数量。
func publish(client mqtt.Client) {
qos := 0
msgCount := 0
for {
payload := fmt.Sprintf("message: %d!", msgCount)
if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
} else {
fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
}
msgCount++
time.Sleep(time.Second * 1)
}
}订阅主题
定义订阅函数,用于订阅主题。该方法将在控制台打印消息的 Topic 和 Payload。
func subscribe(client mqtt.Client) {
qos := 0
client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received `%s` from `%s` topic", msg.Payload(), msg.Topic())
})
}取消订阅
通过以下代码取消订阅,此时应指定取消订阅的主题。
client.Unsubscribe(topic)断开连接
如客户端希望主动断开连接,可以通过如下代码实现。
// 参数指定断连前等待时长
client.Disconnect(250)完整代码
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"math/rand"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const protocol = "tcp"
const broker = ""
const port = 1883
const topic = "t/1"
const username = ""
const password = ""
func main() {
client := createMqttClient()
go subscribe(client) // 在主函数里, 我们用另起一个 go 协程来订阅消息
time.Sleep(time.Second * 1) // 暂停一秒等待 subscribe 完成
publish(client)
}
func createMqttClient() mqtt.Client {
connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
rand.Seed(time.Now().UnixNano())
clientID := fmt.Sprintf("go-client-%d", rand.Int())
fmt.Println("connect address: ", connectAddress)
opts := mqtt.NewClientOptions()
opts.AddBroker(connectAddress)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetClientID(clientID)
opts.SetKeepAlive(time.Second * 60)
// Optional: 设置CA证书
// opts.SetTLSConfig(loadTLSConfig("caFilePath"))
client := mqtt.NewClient(opts)
token := client.Connect()
if token.WaitTimeout(3*time.Second) && token.Error() != nil {
log.Fatal(token.Error())
}
return client
}
func publish(client mqtt.Client) {
qos := 0
msgCount := 0
for {
payload := fmt.Sprintf("message: %d!", msgCount)
if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
} else {
fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
}
msgCount++
time.Sleep(time.Second * 1)
}
}
func subscribe(client mqtt.Client) {
qos := 0
client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
})
}
func loadTLSConfig(caFile string) *tls.Config {
// load tls config
var tlsConfig tls.Config
tlsConfig.InsecureSkipVerify = false
if caFile != "" {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
if err != nil {
log.Fatal(err.Error())
}
certpool.AppendCertsFromPEM(ca)
tlsConfig.RootCAs = certpool
}
return &tlsConfig
}测试验证
执行程序
➜ ~ go run main.go
connect address: tcp://***.***.***.***:1883
publish success, topic: t/1, payload: message: 0!
Received `message: 0!` from `t/1` topic
publish success, topic: t/1, payload: message: 1!
Received `message: 1!` from `t/1` topic
publish success, topic: t/1, payload: message: 2!
Received `message: 2!` from `t/1` topic
publish success, topic: t/1, payload: message: 3!
Received `message: 3!` from `t/1` topic
publish success, topic: t/1, payload: message: 4!
Received `message: 4!` from `t/1` topic
publish success, topic: t/1, payload: message: 5!
Received `message: 5!` from `t/1` topic
publish success, topic: t/1, payload: message: 6!
更多内容
综上所述,我们实现了在 Go 项目中创建 MQTT 连接,使用客户端与 MQTT 服务器进行订阅、收发消息、取消订阅以及断开连接的场景。可以在 这里 下载到示例的源码,同时也可以在 GitHub 上找到更多其他语言的 Demo 示例。