Connect via Eclipse Paho Java
In this tutorial, you will learn how to use Eclipse Paho Java Client to implement the connection, subscription, messaging, unsubscription, and other functions between the client and MQTT broker.
Eclipse Paho Java is an open-source MQTT client library that can be used to implement MQTT communication protocol in Java applications. It offers multiple APIs for simplifying the implementation and usage of MQTT protocol and supports various MQTT versions. Using it, one can easily create MQTT clients, send or receive messages, and handle operations such as connecting, subscribing, and unsubscribing. Eclipse Paho Java also offers advanced features like auto-reconnect and SSL secure connectivity. It is a powerful and easy-to-use MQTT client library that can help Java developers quickly implement the MQTT communication protocol.
Prerequisites
Before connecting to the EMQX Platform using the Eclipse Paho Java client, ensure you have completed the following prerequisites:
- Deploy MQTT Broker
- Install Maven
Deploy MQTT Broker
Install Maven
The project uses Maven as the build tool. Please install Maven first.
Create project
- Create a Maven project with
Intellij IDEA, refer to Creating a Maven project - Create package
io.emqx.mqttundersrc/main/java
Add MQTT dependency
Add the dependency definition to the pom.xml
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>Run
mvn installConnect over TCP Port
The example code will use a public MQTT server for connection. If you have created your own deployment, please find the corresponding connection address in the deployment console and set the username and password accordingly.
Connect Options
Create MqttSample.java, set up the broker, port, topic and authentication.
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// persistence
MemoryPersistence persistence = new MemoryPersistence();
// MQTT connect options
MqttConnectOptions connOpts = new MqttConnectOptions();
// authentication
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());Connect
MqttClient client = new MqttClient(broker, clientId, persistence);
// callback
client.setCallback(new SampleCallback());
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);The content of SampleCallback.java as bellow:
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}Connect over SSL/TLS Port
The example code will use a public MQTT server for connection. If you have created your own deployment, please find the corresponding connection address in the deployment console and set the username and password accordingly.
This section introduces how to connect to a deployment with SSL/TLS one-way authentication. If you need to use two-way authentication, you can refer to here.
Connect Options
Create MqttSample.java, set up the broker, port, topic, authentication and CA certificate.
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "ssl://broker.emqx.io:8883";
String clientId = MqttClient.generateClientId();
// persistence
MemoryPersistence persistence = new MemoryPersistence();
// MQTT connect options
MqttConnectOptions connOpts = new MqttConnectOptions();
// authentication
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
// CA certificate
try {
String caCrtFile = MqttSample.class.getResource("").getPath() + "./broker.emqx.io-ca.crt";
connOpts.setSocketFactory(SSLUtils.getSingleSocketFactory(caCrtFile));
} catch (Exception e) {
throw new RuntimeException(e);
}The content of SSLUtils.java as bellow:
public class SSLUtils {
// one-way ssl/tls
public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
FileInputStream caCrtFileInputStream = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("cert-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
}Connect
MqttClient client = new MqttClient(broker, clientId, persistence);
// callback
client.setCallback(new SampleCallback()); // The callback class SampleCallback.java is the same as the SampleCallback.java used when connecting over TCP Port.
// Connect
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);Publish and Subscribe
This section introduces how to subscribe to topics and publish messages after you successfully connect to the MQTT broker.
Subscribe to Topics
Set the topic for subscription and the QoS Level of the topic.
// subscribe topic
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);Unsubscribe to Topics
Use the following codes to unsubscribe to topics. You need to define the topic for unsubscription and the QoS level.
client.unsubscribe(topic);Publish Messages
Inform MQTT Broker about the topic and payload when publishing messages.
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");Receive Messages
The following code specifies that the client listens for message events and executes a callback function after receiving a message, printing the received message and its topic to the console.
client.setCallback(new SampleCallback());The content of SampleCallback.java as bellow:
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}Disconnect from MQTT Broker
If the client wants to disconnect actively, use the following code:
client.disconnect();The full code
MqttSample.java
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSample {
public static void main(String[] args) {
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// persistence
MemoryPersistence persistence = new MemoryPersistence();
// connect options
MqttConnectOptions connOpts = new MqttConnectOptions();
// authentication
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
// ssl/tls config
try {
// broker = "ssl://broker.emqx.io:8883";
// one-way ssl/tls
// String caCrtFile = MqttSample.class.getResource("").getPath() + "./broker.emqx.io-ca.crt";
// connOpts.setSocketFactory(SSLUtils.getSingleSocketFactory(caCrtFile));
// two-way ssl/tls
// String caCrtFile = MqttSample.class.getResource("").getPath() + "./server-ca.crt";
// String crtFile = MqttSample.class.getResource("").getPath() + "./client.crt";
// String keyFile = MqttSample.class.getResource("").getPath() + "./client.key";
// connOpts.setSocketFactory(SSLUtils.getSocketFactory(caCrtFile, crtFile, keyFile, ""));
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// callback
client.setCallback(new SampleCallback());
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}SampleCallback.java
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}SSLUtils.class
public class SSLUtils {
// one-way ssl/tls
public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
FileInputStream caCrtFileInputStream = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("cert-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
// two-way ssl/tls
public static SSLSocketFactory getSocketFactory(final String caCrtFile,
final String crtFile, final String keyFile, final String password)
throws Exception {
Security.addProvider(new BouncyCastleProvider());
// load CA certificate
X509Certificate caCert = null;
FileInputStream fis = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(fis);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
// load client certificate
bis = new BufferedInputStream(new FileInputStream(crtFile));
X509Certificate cert = null;
while (bis.available() > 0) {
cert = (X509Certificate) cf.generateCertificate(bis);
}
// load client private key
PEMParser pemParser = new PEMParser(new FileReader(keyFile));
Object object = pemParser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
KeyPair key = converter.getKeyPair((PEMKeyPair) object);
pemParser.close();
// CA certificate is used to authenticate server
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(caKs);
// client key and certificates are sent to server, so it can authenticate
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", cert);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
new java.security.cert.Certificate[]{cert});
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}Test
Run with the command:
mvn compile exec:java -Dexec.mainClass="io.emqx.mqtt.MqttSample"The console output:
Connecting to broker: tcp://broker.emqx.io:1883
Connected to broker: tcp://broker.emqx.io:1883
Subscribed to topic: test/topic
Message published
deliveryComplete
Received message:
topic:test/topic
Qos:2
payload:Hello World
DisconnectedThat's the whole process of connecting, subscribing, publishing, and receiving the message.
More
The above shows you how to connect to EMQX Platform using the paho.mqtt.java client library, You can see the source code on GitHub. You can also find more examples of other language on GitHub.