# Paho Go を使ったデプロイメントへの接続

この記事では、Go プロジェクトで `paho.mqtt.golang` ライブラリを使用して、MQTT クライアントと MQTT ブローカー間の接続、サブスクライブ、メッセージの送受信機能を実現する方法を案内します。

[Go](https://go.dev/) は Google が開発した、強力な静的型付け、コンパイル型、並行処理スタイル、ガベージコレクション機能を備えたプログラミング言語です。`paho.mqtt.golang` は MQTT ライブラリであり、Go プロジェクトで MQTT サーバーへの接続やメッセージの送受信を簡単に行うための API を提供します。

## 前提条件

### MQTT ブローカーのデプロイ

- EMQX が提供する[無料のパブリック MQTT ブローカー](https://www.emqx.com/en/mqtt/public-mqtt5-broker)を利用できます。このサービスは[EMQX プラットフォーム](https://www.emqx.com/en)を基に作成されています。ブローカーの接続情報は以下の通りです：
   - ブローカー：broker.emqx.io
   - TCP ポート：1883
   - TLS/SSL ポート：8883
- また、[デプロイメントを作成](https://docs.emqx.com/en/cloud/latest/create/overview.html)することも可能です。接続情報はデプロイメントの概要画面で確認できます。デプロイメントが稼働していることを確認してください。同時に、WebSocket を使って MQTT サーバーへの接続テストも可能です。独自のデプロイメントを作成する場合は、[認証](../deployments/auth_overview.md)を確認し、**アクセス制御** -> **認証** でユーザー名とパスワードを設定して認証を行ってください。

## 依存関係のインストール

本プロジェクトは Go バージョン 1.15.13 を使用して開発およびテストしています。以下のコマンドで Go のバージョンを確認できます。

``` bash
➜ ~ go version
go version go1.15.13 darwin/amd64
```

1. 新しいフォルダ `go-mqtt` を作成し、その中に入り、以下のコマンドを実行します。

    ``` bash
    ➜ ~ go mod init go-mqtt
    go: creating new go.mod: module go-mqtt
    ```

2. `go get <Library>` コマンドを使って `paho.mqtt.golang` をインストールします。

    ``` bash
    ➜ ~ go get github.com/eclipse/paho.mqtt.golang
    go: downloading github.com/eclipse/paho.mqtt.golang v1.3.5
    go: github.com/eclipse/paho.mqtt.golang upgrade => v1.3.5
    ```

3. `main.go` ファイルを作成し、`paho.mqtt.golang` クライアントをインポートします。

    ``` go
    package main
    
    import (
        mqtt "github.com/eclipse/paho.mqtt.golang"
    )
    
    func main(){
    }
    ```

## TCP プロトコルでの接続

このセクションでは、TCP プロトコルを使って MQTT ブローカーに接続する方法を紹介します。

1. 接続設定

   例ではパブリック MQTT サーバーを使用して接続します。独自のデプロイメントを作成している場合は、デプロイメントコンソールで接続先アドレスを確認し、ユーザー名とパスワードを適切に設定してください。

   ``` go
   const protocol = "tcp"
   const broker = "broker.emqx.io" // MQTT ブローカーのアドレス
   const port = 1883
   const topic = "t/1"
   const username = "emqx"
   const password = "******"
   ```

2. 接続の主要コード

   MQTT クライアントを作成して返す関数を記述します。

    ``` go
    func createMqttClient() mqtt.Client {
        connectAddress := fmt.Sprintf("%s://%s:%d", protocol,broker, port)
        rand.Seed(time.Now().UnixNano())
        clientID := fmt.Sprintf("go-client-%d", rand.Int())
    
        fmt.Println("connect address: ", connectAddress)
        opts := mqtt.NewClientOptions()
        opts.AddBroker(connectAddress)
        opts.SetUsername(username)
        opts.SetPassword(password)
        opts.SetClientID(clientID)
        opts.SetKeepAlive(time.Second * 60)
        client := mqtt.NewClient(opts)
        token := client.Connect()
        // 接続失敗時は終了
        if token.WaitTimeout(3*time.Second) && token.Error() != nil {
            log.Fatal(token.Error())
        }
        return client
    }
    ```

## SSL/TLS プロトコルでの接続

このセクションでは、SSL/TLS プロトコルを使って MQTT ブローカーに接続する方法を紹介します。

1. 接続設定
   
   ポートを 8883 に、プロトコルタイプを "ssl" に設定します。

   ``` go
    const protocol = "ssl"
    const port = 8883
   ```

2. 接続の主要コード

   サーバー証明書の設定が必要な場合は、`loadTLSConfig()` 関数で証明書を読み込みます。不要な場合は TCP プロトコルと同じコードで接続可能です。

    ``` go
    func createMqttClient() mqtt.Client {
        connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
        rand.Seed(time.Now().UnixNano())
        clientID := fmt.Sprintf("go-client-%d", rand.Int())
    
        fmt.Println("connect address: ", connectAddress)
        opts := mqtt.NewClientOptions()
        opts.AddBroker(connectAddress)
        opts.SetUsername(username)
        opts.SetPassword(password)
        opts.SetClientID(clientID)
        opts.SetKeepAlive(time.Second * 60)
    
        // 任意：サーバー CA 設定
        // opts.SetTLSConfig(loadTLSConfig("caFilePath"))
    
        client := mqtt.NewClient(opts)
        token := client.Connect()
        if token.WaitTimeout(3*time.Second) && token.Error() != nil {
            log.Fatal(token.Error())
        }
        return client
    }
    
    func loadTLSConfig(caFile string) *tls.Config {
        // TLS 設定の読み込み
        var tlsConfig tls.Config
        tlsConfig.InsecureSkipVerify = false
        if caFile != "" {
            certpool := x509.NewCertPool()
            ca, err := ioutil.ReadFile(caFile)
            if err != nil {
                log.Fatal(err.Error())
            }
            certpool.AppendCertsFromPEM(ca)
            tlsConfig.RootCAs = certpool
        }
        return &tlsConfig
    }
    ```

## パブリッシュとサブスクライブ

このセクションでは、MQTT ブローカーへの接続に成功した後、トピックのサブスクライブおよびメッセージのパブリッシュ方法を紹介します。

### メッセージのパブリッシュ

メッセージをパブリッシュする関数を定義します。

この関数では、無条件ループで1秒ごとにメッセージをパブリッシュします。`msgCount` でパブリッシュしたメッセージ数をカウントしています。

``` go
func publish(client mqtt.Client) {
    qos := 0
    msgCount := 0
    for {
        payload := fmt.Sprintf("message: %d!", msgCount)
        if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
            fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
        } else {
            fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
        }
        msgCount++
        time.Sleep(time.Second * 1)
    }
}
```

### トピックのサブスクライブ

メッセージをサブスクライブする関数を定義します。この関数は受信したメッセージのトピックとペイロードをコンソールに出力します。

``` go
func subscribe(client mqtt.Client) {
    qos := 0
    client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
        fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
    })
}
```

### トピックのサブスクライブ解除

以下のコードでサブスクライブ解除が可能です。解除したいトピックを指定してください。

``` go
client.Unsubscribe(topic)
```

### 切断

クライアントが能動的に切断したい場合は、以下のコードを使用します。

``` go
// パラメーターは切断前の待機時間（ミリ秒）
client.Disconnect(250)
```

### 完全なコード例

``` go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

const protocol = "tcp"
const broker = ""
const port = 1883
const topic = "t/1"
const username = ""
const password = ""

func main() {
	client := createMqttClient()
	go subscribe(client)        // ゴルーチンでサブスクライブ関数を実行
	time.Sleep(time.Second * 1) // サブスクライブ関数の準備待ちに1秒待機
	publish(client)
}

func createMqttClient() mqtt.Client {
	connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
	rand.Seed(time.Now().UnixNano())
	clientID := fmt.Sprintf("go-client-%d", rand.Int())

	fmt.Println("connect address: ", connectAddress)
	opts := mqtt.NewClientOptions()
	opts.AddBroker(connectAddress)
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.SetClientID(clientID)
	opts.SetKeepAlive(time.Second * 60)

	// 任意：サーバー CA 設定
	// opts.SetTLSConfig(loadTLSConfig("caFilePath"))

	client := mqtt.NewClient(opts)
	token := client.Connect()
	if token.WaitTimeout(3*time.Second) && token.Error() != nil {
		log.Fatal(token.Error())
	}
	return client
}

func publish(client mqtt.Client) {
	qos := 0
	msgCount := 0
	for {
		payload := fmt.Sprintf("message: %d!", msgCount)
		if token := client.Publish(topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
			fmt.Printf("publish failed, topic: %s, payload: %s\n", topic, payload)
		} else {
			fmt.Printf("publish success, topic: %s, payload: %s\n", topic, payload)
		}
		msgCount++
		time.Sleep(time.Second * 1)
	}
}

func subscribe(client mqtt.Client) {
	qos := 0
	client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
		fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
	})
}

func loadTLSConfig(caFile string) *tls.Config {
	// TLS 設定の読み込み
	var tlsConfig tls.Config
	tlsConfig.InsecureSkipVerify = false
	if caFile != "" {
		certpool := x509.NewCertPool()
		ca, err := ioutil.ReadFile(caFile)
		if err != nil {
			log.Fatal(err.Error())
		}
		certpool.AppendCertsFromPEM(ca)
		tlsConfig.RootCAs = certpool
	}
	return &tlsConfig
}
```

## テスト

実行手順

``` bash
➜ ~ go run main.go
connect address:  tcp://***.***.***.***:1883
publish success, topic: t/1, payload: message: 0!
Received `message: 0!` from `t/1` topic
publish success, topic: t/1, payload: message: 1!
Received `message: 1!` from `t/1` topic
publish success, topic: t/1, payload: message: 2!
Received `message: 2!` from `t/1` topic
publish success, topic: t/1, payload: message: 3!
Received `message: 3!` from `t/1` topic
publish success, topic: t/1, payload: message: 4!
Received `message: 4!` from `t/1` topic
publish success, topic: t/1, payload: message: 5!
Received `message: 5!` from `t/1` topic
publish success, topic: t/1, payload: message: 6!
```

![go pubsub](./_assets/go_pubsub.png)

## さらに詳しく

まとめると、Go プロジェクトで MQTT 接続の作成、サブスクライブ、メッセージの送受信、サブスクライブ解除、切断を実装しました。サンプルのソースコードは[こちら](https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Go)からダウンロード可能です。また、他の言語のデモ例も[GitHub](https://github.com/emqx/MQTT-Client-Examples)で多数公開されています。
