# Ingest Data into Pulsar
TIP
Starting with EMQX 3.1, a rule engine is available to replace plug-ins, and using it is recommended. See Bridge data to Pulsar to set up Pulsar bridges in the rule engine.
EMQX bridges and forwards MQTT messages to a Pulsar cluster.
The configuration file for the Pulsar bridge plugin is etc/plugins/emqx_bridge_pulsar.conf.
# Configure Pulsar Cluster
    ## Pulsar Server
    bridge.pulsar.servers = 127.0.0.1:6650

    ## Pick a partition producer and sync/async
    bridge.pulsar.produce = sync

    ## bridge.pulsar.produce.sync_timeout = 3s

    ## bridge.pulsar.producer.batch_size = 1000

    ## by default, no compression
    ## bridge.pulsar.producer.compression = no_compression

    ## bridge.pulsar.encode_payload_type = base64

    ## bridge.pulsar.sock.buffer = 32KB
    ## bridge.pulsar.sock.recbuf = 32KB
    bridge.pulsar.sock.sndbuf = 1MB
    ## bridge.pulsar.sock.read_packets = 20
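
To check which settings are actually active (lines starting with `##` are comments), a small script can read the `key = value` pairs. The following is a minimal Python sketch; the helper name `parse_bridge_conf` and the sample text are hypothetical, and EMQX itself reads the file with its own tooling rather than with this code.

```python
def parse_bridge_conf(text):
    """Return active `key = value` settings, skipping blank lines and comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Lines starting with '#' (including '##') are comments in this file.
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


# Hypothetical excerpt of etc/plugins/emqx_bridge_pulsar.conf
sample = """\
## Pulsar Server
bridge.pulsar.servers = 127.0.0.1:6650
bridge.pulsar.produce = sync
## bridge.pulsar.produce.sync_timeout = 3s
bridge.pulsar.sock.sndbuf = 1MB
"""

active = parse_bridge_conf(sample)
for key, value in active.items():
    print(f"{key} -> {value}")
```

Commented-out options such as `bridge.pulsar.produce.sync_timeout` do not appear in the result, which makes it easy to see which defaults are still in effect.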
# Configure Pulsar Bridge Hooks
    ## Bridge Pulsar Hooks
    ## ${topic}: the pulsar topics to which the messages will be published.
    ## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed.

    ## Client Connected Record Hook
    bridge.pulsar.hook.client.connected.1 = {"topic": "client_connected"}

    ## Client Disconnected Record Hook
    bridge.pulsar.hook.client.disconnected.1 = {"topic": "client_disconnected"}

    ## Session Subscribed Record Hook
    bridge.pulsar.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}

    ## Session Unsubscribed Record Hook
    bridge.pulsar.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}

    ## Message Publish Record Hook
    bridge.pulsar.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}

    ## Message Delivered Record Hook
    bridge.pulsar.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}

    ## Message Acked Record Hook
    bridge.pulsar.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}

    ## More Configures
    ## partitioner strategy:
    ## Option: random | roundrobin | first_key_dispatch
    ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}

    ## key:
    ## Option: ${clientid} | ${username}
    ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}

    ## format:
    ## Option: json | json
    ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}
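
The `filter` field accepts MQTT wildcards, so it helps to know which topics a given filter selects before narrowing it from `#`. The Python sketch below follows the standard MQTT rules for `+` (one level) and `#` (all remaining levels). It assumes the plugin applies ordinary MQTT filter semantics, and it leaves out the special handling of topics that begin with `$`; the function name `topic_matches` is hypothetical.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter using '+' and '#' wildcards."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and everything below it.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if f != "+" and f != t_levels[i]:
            return False
    # Without a trailing '#', both sides must have the same number of levels.
    return len(f_levels) == len(t_levels)


print(topic_matches("#", "sensors/room1/temp"))
print(topic_matches("sensors/+/temp", "sensors/room1/temp"))
print(topic_matches("sensors/+/temp", "sensors/room1/humidity"))
```

With a filter such as `sensors/#` in a hook entry, only messages under `sensors` would be forwarded to the configured Pulsar topic, assuming the semantics above.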
# Description of Pulsar Bridge Hooks
Event | Description |
---|---|
bridge.pulsar.hook.client.connected.1 | Client connected |
bridge.pulsar.hook.client.disconnected.1 | Client disconnected |
bridge.pulsar.hook.session.subscribed.1 | Topics subscribed |
bridge.pulsar.hook.session.unsubscribed.1 | Topics unsubscribed |
bridge.pulsar.hook.message.publish.1 | Messages published |
bridge.pulsar.hook.message.delivered.1 | Messages delivered |
bridge.pulsar.hook.message.acked.1 | Messages acknowledged |
# Forward Client Connected / Disconnected Events to Pulsar
When a client goes online, EMQX forwards a 'client_connected' event message to Pulsar:

    topic = "client_connected",
    value = {
             "client_id": ${clientid},
             "username": ${username},
             "node": ${node},
             "ts": ${ts}
            }
When a client goes offline, EMQX forwards a 'client_disconnected' event message to Pulsar:

    topic = "client_disconnected",
    value = {
             "client_id": ${clientid},
             "username": ${username},
             "reason": ${reason},
             "node": ${node},
             "ts": ${ts}
            }
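
One way a downstream consumer might use these two event types is to keep a table of clients that are currently online. The Python sketch below assumes the value arrives as a JSON string with the fields listed above; the function name `apply_event` and the sample values are hypothetical and only illustrate the idea.

```python
import json


def apply_event(online, pulsar_topic, value):
    """Update the online-client table from one connected/disconnected event."""
    event = json.loads(value)
    client_id = event["client_id"]
    if pulsar_topic == "client_connected":
        # Remember when the client came online.
        online[client_id] = event["ts"]
    elif pulsar_topic == "client_disconnected":
        # Ignore disconnects for clients that were never seen connecting.
        online.pop(client_id, None)
    return online


# Hypothetical event values, shaped like the records described above.
connected = json.dumps(
    {"client_id": "c1", "username": "alice", "node": "emqx@127.0.0.1", "ts": 1700000000}
)
disconnected = json.dumps(
    {"client_id": "c1", "username": "alice", "reason": "normal",
     "node": "emqx@127.0.0.1", "ts": 1700000060}
)

online = {}
apply_event(online, "client_connected", connected)
after_connect = dict(online)
apply_event(online, "client_disconnected", disconnected)
print(after_connect, online)
```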
# Forward Subscription Event to Pulsar
    topic = session_subscribed

    value = {
             "client_id": ${clientid},
             "topic": ${topic},
             "qos": ${qos},
             "node": ${node},
             "ts": ${timestamp}
            }
# Forward Unsubscription Event to Pulsar
    topic = session_unsubscribed

    value = {
             "client_id": ${clientid},
             "topic": ${topic},
             "qos": ${qos},
             "node": ${node},
             "ts": ${timestamp}
            }
# Forward MQTT Messages to Pulsar
    topic = message_publish

    value = {
             "client_id": ${clientid},
             "username": ${username},
             "topic": ${topic},
             "payload": ${payload},
             "qos": ${qos},
             "node": ${node},
             "ts": ${timestamp}
            }
# Forwarding MQTT Message Deliver Event to Pulsar
    topic = message_delivered

    value = {
             "client_id": ${clientid},
             "username": ${username},
             "from": ${fromClientId},
             "topic": ${topic},
             "payload": ${payload},
             "qos": ${qos},
             "node": ${node},
             "ts": ${timestamp}
            }
# Forwarding MQTT Message Ack Event to Pulsar
    topic = message_acked

    value = {
             "client_id": ${clientid},
             "username": ${username},
             "from": ${fromClientId},
             "topic": ${topic},
             "payload": ${payload},
             "qos": ${qos},
             "node": ${node},
             "ts": ${timestamp}
            }
# Examples of Pulsar Message Consumption
Consume MQTT client connected / disconnected event messages from Pulsar:

    sh pulsar-client consume client_connected -s "client_connected" -n 1000
    sh pulsar-client consume client_disconnected -s "client_disconnected" -n 1000
Consume MQTT subscription and unsubscription event messages from Pulsar:

    sh pulsar-client consume session_subscribed -s "session_subscribed" -n 1000
    sh pulsar-client consume session_unsubscribed -s "session_unsubscribed" -n 1000
Consume MQTT published messages from Pulsar:

    sh pulsar-client consume message_publish -s "message_publish" -n 1000
Consume MQTT message Deliver and Ack event messages from Pulsar:

    sh pulsar-client consume message_delivered -s "message_delivered" -n 1000
    sh pulsar-client consume message_acked -s "message_acked" -n 1000
TIP
The payload is base64-encoded by default.
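
Because of this default, a consumer has to decode the `payload` field before using it. The following Python sketch shows one way to do that, assuming the record value is a JSON string shaped like the `message_publish` format above; the function name `decode_payload` and the sample record are hypothetical.

```python
import base64
import json


def decode_payload(value):
    """Parse a forwarded record and replace its base64 payload with raw bytes."""
    record = json.loads(value)
    record["payload"] = base64.b64decode(record["payload"])
    return record


# Hypothetical record value; the payload is b"hello" encoded as base64.
sample = json.dumps({
    "client_id": "c1",
    "username": "alice",
    "topic": "sensors/room1/temp",
    "payload": base64.b64encode(b"hello").decode("ascii"),
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "ts": 1700000000,
})

decoded = decode_payload(sample)
print(decoded["topic"], decoded["payload"])
```

If `bridge.pulsar.encode_payload_type` is set to something other than `base64`, this decoding step would need to change accordingly.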
# Enable Pulsar Bridge
    ./bin/emqx_ctl plugins load emqx_bridge_pulsar