Skip to content

Eclipse Paho Java を使った接続

このチュートリアルでは、Eclipse Paho Java クライアント を使用して、クライアントと MQTT ブローカー間の接続、サブスクライブ、メッセージ送受信、サブスクリプション解除などの機能を実装する方法を学びます。

Eclipse Paho Java は、Java アプリケーションで MQTT 通信プロトコルを実装するためのオープンソースの MQTT クライアントライブラリです。複数の API を提供し、MQTT プロトコルの実装と利用を簡素化し、さまざまな MQTT バージョンをサポートしています。これを使うことで、簡単に MQTT クライアントを作成し、メッセージの送受信や接続、サブスクライブ、サブスクリプション解除などの操作を行えます。さらに、Eclipse Paho Java は自動再接続や SSL セキュア接続などの高度な機能も備えており、Java 開発者が迅速に MQTT 通信プロトコルを実装するのに役立つ強力で使いやすい MQTT クライアントライブラリです。

前提条件

Eclipse Paho Java クライアントを使って EMQX Cloud に接続する前に、以下の前提条件を満たしていることを確認してください。

  • MQTT ブローカーのデプロイ
  • Maven のインストール

MQTT ブローカーのデプロイ

アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。

サーバレスデプロイメント

  1. EMQX Cloudコンソールでサーバレスデプロイメントを作成します。

  2. デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。

    • ブローカーアドレス

    • ポート番号(サーバレスではTLSポートのみ対応)

  3. サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート8883を使用してください。

  4. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細はサーバレスポートガイドを参照してください。

Dedicated Flex または BYOC デプロイメント

  1. EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。

  2. 作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。

    • ブローカーアドレス

    • MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)

  3. デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。

詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。

Maven のインストール

本プロジェクトでは Maven をビルドツールとして使用します。まずは Maven のインストール を行ってください。

プロジェクトの作成

  1. Intellij IDEA で Maven プロジェクトを作成します。詳細は Maven プロジェクトの作成 を参照してください。
  2. src/main/java 以下に io.emqx.mqtt パッケージを作成します。

MQTT 依存関係の追加

pom.xml に以下の依存関係を追加します。

xml
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

次に以下のコマンドを実行します。

shell
mvn install

TCP ポートでの接続

例示コードではパブリック MQTT サーバーを使用して接続します。独自にデプロイした場合は、デプロイメントコンソールで該当の接続アドレスを確認し、ユーザー名とパスワードを適切に設定してください。

接続オプション

MqttSample.java を作成し、ブローカー、ポート、トピック、認証情報を設定します。

Java
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// パーシステンス
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 接続オプション
MqttConnectOptions connOpts = new MqttConnectOptions();
// 認証
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());

接続

Java
MqttClient client = new MqttClient(broker, clientId, persistence);
// コールバック設定
client.setCallback(new SampleCallback());

System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);

SampleCallback.java の内容は以下の通りです:

Java
package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SampleCallback implements MqttCallback {
    
    public void connectionLost(Throwable cause) {
        System.out.println("connection lost:" + cause.getMessage());
    }

    
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:" + new String(message.getPayload()));
    }

    
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete");
    }
}

SSL/TLS ポートでの接続

例示コードではパブリック MQTT サーバーを使用して接続します。独自にデプロイした場合は、デプロイメントコンソールで該当の接続アドレスを確認し、ユーザー名とパスワードを適切に設定してください。

このセクションでは、SSL/TLS の片方向認証での接続方法を紹介します。双方向認証を使用する場合は、こちら を参照してください。

接続オプション

MqttSample.java を作成し、ブローカー、ポート、トピック、認証情報、CA 証明書を設定します。

Java
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "ssl://broker.emqx.io:8883";
String clientId = MqttClient.generateClientId();
// パーシステンス
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 接続オプション
MqttConnectOptions connOpts = new MqttConnectOptions();
// 認証
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
// CA 証明書
try {
    String caCrtFile = MqttSample.class.getResource("").getPath() + "./broker.emqx.io-ca.crt";
    connOpts.setSocketFactory(SSLUtils.getSingleSocketFactory(caCrtFile));
} catch (Exception e) {
    throw new RuntimeException(e);
}

SSLUtils.java の内容は以下の通りです:

Java
public class SSLUtils {
    // 片方向 ssl/tls
    public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
        Security.addProvider(new BouncyCastleProvider());
        X509Certificate caCert = null;

        FileInputStream caCrtFileInputStream = new FileInputStream(caCrtFile);

        BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
        }
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("cert-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext.getSocketFactory();
    }
}

接続

Java
MqttClient client = new MqttClient(broker, clientId, persistence);
// コールバック設定
client.setCallback(new SampleCallback()); // TCP ポート接続時と同じ SampleCallback.java を使用します。
// 接続
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);

パブリッシュとサブスクライブ

このセクションでは、MQTT ブローカーへの接続に成功した後に、トピックのサブスクライブとメッセージのパブリッシュを行う方法を紹介します。

トピックのサブスクライブ

サブスクライブするトピックとトピックの QoS レベル を設定します。

Java
// トピックをサブスクライブ
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);

トピックのサブスクリプション解除

トピックのサブスクリプションを解除するには、以下のコードを使用します。解除するトピックを指定してください。

Java
client.unsubscribe(topic);

メッセージのパブリッシュ

メッセージをパブリッシュする際は、ブローカーにトピックとペイロードを通知します。

Java
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");

メッセージの受信

以下のコードは、クライアントがメッセージイベントを監視し、メッセージ受信後にコールバック関数を実行して、受信したメッセージとトピックをコンソールに出力することを示しています。

Java
client.setCallback(new SampleCallback());

SampleCallback.java の内容は以下の通りです:

Java
package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SampleCallback implements MqttCallback {
    
    public void connectionLost(Throwable cause) {
        System.out.println("connection lost:" + cause.getMessage());
    }

    
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:" + new String(message.getPayload()));
    }

    
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete");
    }
}

MQTT ブローカーからの切断

クライアントが能動的に切断する場合は、以下のコードを使用します。

Java
client.disconnect();

完全なコード例

MqttSample.java

Java
package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSample {
    public static void main(String[] args) {
        String topic = "test/topic";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = MqttClient.generateClientId();
        // パーシステンス
        MemoryPersistence persistence = new MemoryPersistence();
        // 接続オプション
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 認証 
        connOpts.setUserName("emqx_user");
        connOpts.setPassword("emqx_password".toCharArray());

        // ssl/tls 設定
        try {
            // broker = "ssl://broker.emqx.io:8883";

            // 片方向 ssl/tls
            // String caCrtFile = MqttSample.class.getResource("").getPath() + "./broker.emqx.io-ca.crt";
            // connOpts.setSocketFactory(SSLUtils.getSingleSocketFactory(caCrtFile));

            // 双方向 ssl/tls
            // String caCrtFile = MqttSample.class.getResource("").getPath() + "./server-ca.crt";
            // String crtFile = MqttSample.class.getResource("").getPath() + "./client.crt";
            // String keyFile = MqttSample.class.getResource("").getPath() + "./client.key";
            // connOpts.setSocketFactory(SSLUtils.getSocketFactory(caCrtFile, crtFile, keyFile, ""));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        
        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            // コールバック設定
            client.setCallback(new SampleCallback());

            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected to broker: " + broker);

            client.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
            
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(topic, message);
            System.out.println("Message published");
            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

SampleCallback.java

Java
package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class SampleCallback implements MqttCallback {
    
    public void connectionLost(Throwable cause) {
        System.out.println("connection lost:" + cause.getMessage());
    }

    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received message: \n  topic:" + topic + "\n  Qos:" + message.getQos() + "\n  payload:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete");
    }
}

SSLUtils.class

Java
public class SSLUtils {
    // 片方向 ssl/tls
    public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
        Security.addProvider(new BouncyCastleProvider());
        X509Certificate caCert = null;

        FileInputStream caCrtFileInputStream = new FileInputStream(caCrtFile);

        BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
        }
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("cert-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext.getSocketFactory();
    }

    // 双方向 ssl/tls
    public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                    final String crtFile, final String keyFile, final String password)
            throws Exception {
        Security.addProvider(new BouncyCastleProvider());

        // CA 証明書の読み込み
        X509Certificate caCert = null;

        FileInputStream fis = new FileInputStream(caCrtFile);
        BufferedInputStream bis = new BufferedInputStream(fis);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
        }

        // クライアント証明書の読み込み
        bis = new BufferedInputStream(new FileInputStream(crtFile));
        X509Certificate cert = null;
        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
        }

        // クライアント秘密鍵の読み込み
        PEMParser pemParser = new PEMParser(new FileReader(keyFile));
        Object object = pemParser.readObject();
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
        KeyPair key = converter.getKeyPair((PEMKeyPair) object);
        pemParser.close();

        // CA 証明書はサーバー認証に使用
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(caKs);

        // クライアントの鍵と証明書はサーバーに送信し、認証に使用
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                new java.security.cert.Certificate[]{cert});
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                .getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // 最後に SSL ソケットファクトリを作成
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        return context.getSocketFactory();
    }
}

テスト

以下のコマンドで実行します。

shell
mvn compile exec:java -Dexec.mainClass="io.emqx.mqtt.MqttSample"

コンソール出力例:

Connecting to broker: tcp://broker.emqx.io:1883
Connected to broker: tcp://broker.emqx.io:1883
Subscribed to topic: test/topic
Message published
deliveryComplete
Received message: 
  topic:test/topic
  Qos:2
  payload:Hello World
Disconnected

これで、接続、サブスクライブ、パブリッシュ、メッセージ受信の一連の流れが完了します。

さらに詳しく

上記は paho.mqtt.java クライアントライブラリを使って EMQX Cloud に接続する方法の紹介です。ソースコードは GitHub で確認できます。他の言語の例も GitHub で多数公開されています。