Eclipse Paho Java を使った接続
このチュートリアルでは、Eclipse Paho Java クライアント を使用して、クライアントと MQTT ブローカー間の接続、サブスクライブ、メッセージ送受信、サブスクリプション解除などの機能を実装する方法を学びます。
Eclipse Paho Java は、Java アプリケーションで MQTT 通信プロトコルを実装するためのオープンソースの MQTT クライアントライブラリです。複数の API を提供し、MQTT プロトコルの実装と利用を簡素化し、さまざまな MQTT バージョンをサポートしています。これを使うことで、簡単に MQTT クライアントを作成し、メッセージの送受信や接続、サブスクライブ、サブスクリプション解除などの操作を行えます。さらに、Eclipse Paho Java は自動再接続や SSL セキュア接続などの高度な機能も備えており、Java 開発者が迅速に MQTT 通信プロトコルを実装するのに役立つ強力で使いやすい MQTT クライアントライブラリです。
前提条件
Eclipse Paho Java クライアントを使って EMQX Cloud に接続する前に、以下の前提条件を満たしていることを確認してください。
- MQTT ブローカーのデプロイ
- Maven のインストール
MQTT ブローカーのデプロイ
アプリケーションをEMQX Cloudに接続するには、デプロイメントを作成して設定する必要があります。
サーバレスデプロイメント
EMQX Cloudコンソールでサーバレスデプロイメントを作成します。
デプロイメントが作成されて稼働したら、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を確認します。
ブローカーアドレス
ポート番号(サーバレスではTLSポートのみ対応)
サーバレスデプロイメントはTLS接続が必須です。概要ページからCA証明書をダウンロードし、TLS用のポート
8883を使用してください。デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細はサーバレスポートガイドを参照してください。
Dedicated Flex または BYOC デプロイメント
EMQX CloudコンソールでDedicated FlexまたはBYOCのデプロイメントを作成できます。
作成後、デプロイメントの概要ページに移動し、以下を含むMQTT接続情報を取得します。
ブローカーアドレス
MQTTおよびWebSocket用のTCPおよびTLSポート番号(TCPおよびTLS接続の両方に対応)
デプロイメントのアクセス制御 -> クライアント認証で、デフォルト認証(ユーザー名/パスワード)を設定します。
詳細なポート設定については、Dedicated & BYOCポートガイドをご覧ください。
Maven のインストール
本プロジェクトでは Maven をビルドツールとして使用します。まずは Maven のインストール を行ってください。
プロジェクトの作成
Intellij IDEAで Maven プロジェクトを作成します。詳細は Maven プロジェクトの作成 を参照してください。src/main/java以下にio.emqx.mqttパッケージを作成します。
MQTT 依存関係の追加
pom.xml に以下の依存関係を追加します。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>次に以下のコマンドを実行します。
mvn installTCP ポートでの接続
例示コードではパブリック MQTT サーバーを使用して接続します。独自にデプロイした場合は、デプロイメントコンソールで該当の接続アドレスを確認し、ユーザー名とパスワードを適切に設定してください。
接続オプション
MqttSample.java を作成し、ブローカー、ポート、トピック、認証情報を設定します。
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// パーシステンス
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 接続オプション
MqttConnectOptions connOpts = new MqttConnectOptions();
// 認証
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());接続
MqttClient client = new MqttClient(broker, clientId, persistence);
// コールバック設定
client.setCallback(new SampleCallback());
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);SampleCallback.java の内容は以下の通りです:
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}SSL/TLS ポートでの接続
例示コードではパブリック MQTT サーバーを使用して接続します。独自にデプロイした場合は、デプロイメントコンソールで該当の接続アドレスを確認し、ユーザー名とパスワードを適切に設定してください。
このセクションでは、SSL/TLS の片方向認証での接続方法を紹介します。双方向認証を使用する場合は、こちら を参照してください。
接続オプション
MqttSample.java を作成し、ブローカー、ポート、トピック、認証情報、CA 証明書を設定します。
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "ssl://broker.emqx.io:8883";
String clientId = MqttClient.generateClientId();
// パーシステンス
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 接続オプション
MqttConnectOptions connOpts = new MqttConnectOptions();
// 認証
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
// CA 証明書
try {
String caCrtFile = MqttSample.class.getResource("").getPath() + "./broker.emqx.io-ca.crt";
connOpts.setSocketFactory(SSLUtils.getSingleSocketFactory(caCrtFile));
} catch (Exception e) {
throw new RuntimeException(e);
}SSLUtils.java の内容は以下の通りです:
public class SSLUtils {
// 片方向 ssl/tls
public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
FileInputStream caCrtFileInputStream = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("cert-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
}接続
MqttClient client = new MqttClient(broker, clientId, persistence);
// コールバック設定
client.setCallback(new SampleCallback()); // TCP ポート接続時と同じ SampleCallback.java を使用します。
// 接続
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);パブリッシュとサブスクライブ
このセクションでは、MQTT ブローカーへの接続に成功した後に、トピックのサブスクライブとメッセージのパブリッシュを行う方法を紹介します。
トピックのサブスクライブ
サブスクライブするトピックとトピックの QoS レベル を設定します。
// トピックをサブスクライブ
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);トピックのサブスクリプション解除
トピックのサブスクリプションを解除するには、以下のコードを使用します。解除するトピックを指定してください。
client.unsubscribe(topic);メッセージのパブリッシュ
メッセージをパブリッシュする際は、ブローカーにトピックとペイロードを通知します。
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");メッセージの受信
以下のコードは、クライアントがメッセージイベントを監視し、メッセージ受信後にコールバック関数を実行して、受信したメッセージとトピックをコンソールに出力することを示しています。
client.setCallback(new SampleCallback());SampleCallback.java の内容は以下の通りです:
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}MQTT ブローカーからの切断
クライアントが能動的に切断する場合は、以下のコードを使用します。
client.disconnect();完全なコード例
MqttSample.java
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSample {
public static void main(String[] args) {
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// パーシステンス
MemoryPersistence persistence = new MemoryPersistence();
// 接続オプション
MqttConnectOptions connOpts = new MqttConnectOptions();
// 認証
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
// ssl/tls 設定
try {
// broker = "ssl://broker.emqx.io:8883";
// 片方向 ssl/tls
// String caCrtFile = MqttSample.class.getResource("").getPath() + "./broker.emqx.io-ca.crt";
// connOpts.setSocketFactory(SSLUtils.getSingleSocketFactory(caCrtFile));
// 双方向 ssl/tls
// String caCrtFile = MqttSample.class.getResource("").getPath() + "./server-ca.crt";
// String crtFile = MqttSample.class.getResource("").getPath() + "./client.crt";
// String keyFile = MqttSample.class.getResource("").getPath() + "./client.key";
// connOpts.setSocketFactory(SSLUtils.getSocketFactory(caCrtFile, crtFile, keyFile, ""));
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// コールバック設定
client.setCallback(new SampleCallback());
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}SampleCallback.java
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}SSLUtils.class
public class SSLUtils {
// 片方向 ssl/tls
public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
FileInputStream caCrtFileInputStream = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("cert-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
// 双方向 ssl/tls
public static SSLSocketFactory getSocketFactory(final String caCrtFile,
final String crtFile, final String keyFile, final String password)
throws Exception {
Security.addProvider(new BouncyCastleProvider());
// CA 証明書の読み込み
X509Certificate caCert = null;
FileInputStream fis = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(fis);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
// クライアント証明書の読み込み
bis = new BufferedInputStream(new FileInputStream(crtFile));
X509Certificate cert = null;
while (bis.available() > 0) {
cert = (X509Certificate) cf.generateCertificate(bis);
}
// クライアント秘密鍵の読み込み
PEMParser pemParser = new PEMParser(new FileReader(keyFile));
Object object = pemParser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
KeyPair key = converter.getKeyPair((PEMKeyPair) object);
pemParser.close();
// CA 証明書はサーバー認証に使用
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(caKs);
// クライアントの鍵と証明書はサーバーに送信し、認証に使用
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", cert);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
new java.security.cert.Certificate[]{cert});
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
// 最後に SSL ソケットファクトリを作成
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}テスト
以下のコマンドで実行します。
mvn compile exec:java -Dexec.mainClass="io.emqx.mqtt.MqttSample"コンソール出力例:
Connecting to broker: tcp://broker.emqx.io:1883
Connected to broker: tcp://broker.emqx.io:1883
Subscribed to topic: test/topic
Message published
deliveryComplete
Received message:
topic:test/topic
Qos:2
payload:Hello World
Disconnectedこれで、接続、サブスクライブ、パブリッシュ、メッセージ受信の一連の流れが完了します。
さらに詳しく
上記は paho.mqtt.java クライアントライブラリを使って EMQX Cloud に接続する方法の紹介です。ソースコードは GitHub で確認できます。他の言語の例も GitHub で多数公開されています。