# 使用 Java SDK 连接到部署
在本教程中您将学习在 Java 中使用 Eclipse Paho Java Client (opens new window) 客户端连接到 EMQX Cloud 部署。
# 前提条件
- 已经创建了部署,在 部署概览 下可以查看到连接相关的信息,请确保部署状态为运行中。同时你可以使用 WebSocket 测试连接到 MQTT 服务器。
- 在
认证鉴权
>认证
中设置用户名和密码,用于连接验证。
本项目构建工具使用 Maven
# 初始化项目
- 使用
Intellij IDEA
新建一个 Maven 项目,参考 Creating a Maven project (opens new window) - 在
src/main/java
下创建包:io.emqx.mqtt
# 安装依赖
添加依赖到 pom.xml
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
1
2
3
4
5
2
3
4
5
然后运行
mvn install
1
# 连接
请在控制台的 部署概览 找到相关的地址以及端口信息,需要注意如果是基础版,端口不是 1883 或 8883 端口,请确认好端口。并且在 认证鉴权 中添加认证信息。
# 连接设置
创建 MqttSample.java
,设置 MQTT Broker 连接地址,端口,topic 以及认证信息。
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// 持久化
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
// 设置认证信息
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 开始连接
MqttClient client = new MqttClient(broker, clientId, persistence);
// 设置回调
client.setCallback(new SampleCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);
1
2
3
4
5
6
7
2
3
4
5
6
7
回调类 SampleCallback.java
文件内容如下:
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
// 连接丢失
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
// 收到消息
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
// 消息传递成功
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 订阅
// 订阅 topic
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
1
2
3
2
3
# 发布
// 发布消息
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");
1
2
3
4
5
2
3
4
5
# 完整代码
MqttSample.java
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSample {
public static void main(String[] args) {
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
// 持久化
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
// 设置认证信息
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// 设置回调
client.setCallback(new SampleCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);
// 订阅 topic
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
// 发布消息
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
SampleCallback.java
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
// 连接丢失
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
// 收到消息
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:" + new String(message.getPayload()));
}
// 消息传递成功
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 测试验证
使用以下命令运行代码
mvn compile exec:java -Dexec.mainClass="io.emqx.mqtt.MqttSample"
1
控制台输出如下:
Connecting to broker: tcp://broker.emqx.io:1883
Connected to broker: tcp://broker.emqx.io:1883
Subscribed to topic: test/topic
Message published
Received message:
topic:test/topic
Qos:1
payload:Hello World
deliveryComplete
Disconnected
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
我们可以看到已经成功连接,成功订阅,发布,并且收到发布的消息。
# 更多内容
以上为您演示了如何使用 paho.mqtt.java 客户端库连接到 EMQX Cloud,可以在 这里 (opens new window) 下载到示例的源码。 同时也可以在 GitHub (opens new window) 上找到更多起他语言的 Demo 示例。