# Stream Data into Kafka
TIP
Since EMQX 3.1, a more powerful rule engine has been available as a replacement for plugins, and we recommend using it. See Bridge data to Kafka to set up Kafka bridges in the rule engine.
EMQX can bridge and forward MQTT messages to a Kafka cluster.
The configuration file for the Kafka bridge plugin is etc/plugins/emqx_bridge_kafka.conf.
# Configure Kafka Cluster
```
## Kafka servers
## bridge.kafka.servers = 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
bridge.kafka.servers = 127.0.0.1:9092

## Kafka connection strategy. Option values: per_partition | per_broker
bridge.kafka.connection_strategy = per_partition

## Minimum interval between Kafka metadata refreshes
bridge.kafka.min_metadata_refresh_interval = 5S

## Produce mode. Option values: sync | async
bridge.kafka.produce = sync

## Timeout for synchronous produce calls
bridge.kafka.produce.sync_timeout = 3S

## Base directory in which replayq stores messages on disk.
## If this entry is missing or set to undefined, replayq works in
## memory-only mode, i.e. messages are not queued on disk. In that case,
## callers of the send or send_sync API are responsible for handling
## possible message loss caused by application, network, or Kafka
## disturbances. For instance, a wolff:send API caller may trap exits
## and react to the partition-producer worker pid's 'EXIT' message by
## issuing a retry after restarting the producer.
## bridge.kafka.replayq_dir = /tmp/emqx_bridge_kafka/

## replayq segment size. Default: 10MB
## bridge.kafka.producer.replayq_seg_bytes = 10MB

## Producer required_acks. Option values: all_isr | leader_only | none
bridge.kafka.producer.required_acks = none

## Time the partition leader waits for replicas before replying to the
## producer. Default: 10S (10000 ms)
## bridge.kafka.producer.ack_timeout = 10S

## Maximum size of one batch (message set). By default, at most 1 MB
## of data is sent in one batch
## bridge.kafka.producer.max_batch_bytes = 1024KB

## Minimum batch size. Default: 0
## bridge.kafka.producer.min_batch_bytes = 0

## Number of batches to send ahead without receiving an ack for the
## last request. Must be 0 if messages must be delivered in strict order.
## bridge.kafka.producer.max_send_ahead = 0

## No compression by default
# bridge.kafka.producer.compression = no_compression

# bridge.kafka.encode_payload_type = base64

# bridge.kafka.sock.buffer = 32KB
# bridge.kafka.sock.recbuf = 32KB
bridge.kafka.sock.sndbuf = 1MB
# bridge.kafka.sock.read_packets = 20
```
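Several options above take human-readable sizes (32KB, 1MB, 10MB) and durations (3S, 5S, 10S). The following is a minimal sketch of how such values can be interpreted. It assumes binary size multiples (1KB = 1024 bytes), which is consistent with the comment that 1024KB is 1 MB. It also assumes durations map to milliseconds, which is consistent with the 10S (10000 ms) ack_timeout default. The helpers parse_bytes, parse_duration_ms and parse_conf are hypothetical and are not part of EMQX.

```python
import re

# Assumption: size suffixes use binary multiples (1KB = 1024 bytes), matching
# the comment that 1024KB is 1 MB. Durations are converted to milliseconds,
# matching the "10S (10000 ms)" ack_timeout default.
SIZE_UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}
TIME_UNITS = {"MS": 1, "S": 1000}

_VALUE_RE = re.compile(r"\s*(\d+)\s*([A-Za-z]+)\s*")


def _scale(value: str, units: dict) -> int:
    """Return number * unit multiplier, or raise ValueError for unknown input."""
    m = _VALUE_RE.fullmatch(value)
    if not m or m.group(2).upper() not in units:
        raise ValueError(f"unrecognised value: {value!r}")
    return int(m.group(1)) * units[m.group(2).upper()]


def parse_bytes(value: str) -> int:
    """Convert a size such as '32KB' or '10MB' to a number of bytes."""
    return _scale(value, SIZE_UNITS)


def parse_duration_ms(value: str) -> int:
    """Convert a duration such as '5S' to milliseconds."""
    return _scale(value, TIME_UNITS)


def parse_conf(text: str) -> dict:
    """Collect active 'key = value' lines; lines starting with '#' are comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings
```

For example, parse_bytes("1024KB") == parse_bytes("1MB") holds under this assumption. Commented-out entries such as ## bridge.kafka.producer.ack_timeout = 10S are skipped, so only the active settings are returned.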
# Configure Kafka Bridge Hooks
```
## Bridge Kafka hooks
## ${topic}: the Kafka topic to which the messages will be published.
## ${filter}: the MQTT topic filter (may contain wildcards) on which the action is performed.

bridge.kafka.hook.client.connected.1     = {"topic": "client_connected"}
bridge.kafka.hook.client.disconnected.1  = {"topic": "client_disconnected"}
bridge.kafka.hook.session.subscribed.1   = {"filter": "#", "topic": "session_subscribed"}
bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
bridge.kafka.hook.message.publish.1      = {"filter": "#", "topic": "message_publish"}
bridge.kafka.hook.message.delivered.1    = {"filter": "#", "topic": "message_delivered"}
bridge.kafka.hook.message.acked.1        = {"filter": "#", "topic": "message_acked"}
```
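Each hook entry maps an EMQX event to a Kafka topic. Session and message events also carry an MQTT topic filter. The following is a minimal sketch that reads these entries and applies MQTT wildcard matching, where '+' matches exactly one level and '#' matches all remaining levels. It assumes the filter follows the standard MQTT topic-filter rules. The parse_hooks and topic_matches helpers are hypothetical and do not belong to the plugin.

```python
import json

HOOK_PREFIX = "bridge.kafka.hook."


def parse_hooks(lines):
    """Return (event, filter, kafka_topic) for each 'bridge.kafka.hook.*' line."""
    hooks = []
    for line in lines:
        line = line.strip()
        if not line.startswith(HOOK_PREFIX):
            continue
        key, _, raw = line.partition("=")
        # 'message.publish.1' -> 'message.publish' (drop the trailing index).
        event = key.strip()[len(HOOK_PREFIX):].rsplit(".", 1)[0]
        rule = json.loads(raw)
        hooks.append((event, rule.get("filter"), rule["topic"]))
    return hooks


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Standard MQTT topic-filter matching with '+' and '#' wildcards."""
    # Filters that start with a wildcard do not match topics beginning with '$'.
    if topic.startswith("$") and topic_filter[:1] in ("+", "#"):
        return False
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            return True  # '#' also matches the parent level itself
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)
```

With the default filter "#", every message event is forwarded. A narrower filter such as "sensors/+/temp" would forward only the matching topics.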
# Description of Kafka Bridge Hooks
Event | Description |
---|---|
bridge.kafka.hook.client.connected.1 | Client connected |
bridge.kafka.hook.client.disconnected.1 | Client disconnected |
bridge.kafka.hook.session.subscribed.1 | Topics subscribed |
bridge.kafka.hook.session.unsubscribed.1 | Topics unsubscribed |
bridge.kafka.hook.message.publish.1 | Messages published |
bridge.kafka.hook.message.delivered.1 | Messages delivered |
bridge.kafka.hook.message.acked.1 | Messages acknowledged |
# Forward Client Connected / Disconnected Events to Kafka
When a client goes online, EMQX forwards a 'client_connected' event message to Kafka:
```
topic = "client_connected",
value = {
         "client_id": ${clientid},
         "node": ${node},
         "ts": ${ts}
        }
```
When a client goes offline, EMQX forwards a 'client_disconnected' event message to Kafka:
```
topic = "client_disconnected",
value = {
         "client_id": ${clientid},
         "reason": ${reason},
         "node": ${node},
         "ts": ${ts}
        }
```
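On the consumer side, the connected and disconnected events can be replayed to track which clients are currently online. The following is a minimal sketch under two assumptions. First, each record is a (kafka_topic, value) pair. Second, each value is a JSON object with a client_id field, as in the formats above. The track_online function is hypothetical.

```python
import json


def track_online(records):
    """Replay (kafka_topic, value_json) pairs and return the online client ids."""
    online = set()
    for kafka_topic, value in records:
        if kafka_topic == "client_connected":
            online.add(json.loads(value)["client_id"])
        elif kafka_topic == "client_disconnected":
            # discard() tolerates a disconnect whose connect was never seen.
            online.discard(json.loads(value)["client_id"])
    return online
```

Kafka guarantees ordering only within a partition, so events read from two separate topics may interleave differently from the order in which they happened. If that matters, sort the combined records by ts before replaying them, assuming the ts values are comparable.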
# Forward Subscription Event to Kafka
```
topic = session_subscribed

value = {
         "client_id": ${clientid},
         "topic": ${topic},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }
```
# Forward Unsubscription Event to Kafka
```
topic = session_unsubscribed

value = {
         "client_id": ${clientid},
         "topic": ${topic},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }
```
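The subscription and unsubscription events together describe each client's current set of topic filters. The following is a minimal sketch that rebuilds that table from consumed records. It assumes (kafka_topic, value) pairs whose values are JSON objects with client_id, topic and qos fields, as shown above. The track_subscriptions function is hypothetical.

```python
import json
from collections import defaultdict


def track_subscriptions(records):
    """Return {client_id: {topic_filter: qos}} from subscribe/unsubscribe events."""
    subs = defaultdict(dict)
    for kafka_topic, value in records:
        if kafka_topic not in ("session_subscribed", "session_unsubscribed"):
            continue  # ignore records from other bridged topics
        event = json.loads(value)
        client, topic = event["client_id"], event["topic"]
        if kafka_topic == "session_subscribed":
            subs[client][topic] = event.get("qos")
        else:
            subs[client].pop(topic, None)
            if not subs[client]:
                del subs[client]  # drop clients with no remaining subscriptions
    return dict(subs)
```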
# Forward MQTT Messages to Kafka
```
topic = message_publish

value = {
         "client_id": ${clientid},
         "username": ${username},
         "topic": ${topic},
         "payload": ${payload},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }
```
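A consumer of the message_publish topic may want a typed view of each record. The following is a minimal sketch that maps the fields listed above onto a dataclass and rejects QoS values other than the three that MQTT defines. The field types, the PublishEvent class and the parse_publish function are assumptions made for illustration. The payload is left in its encoded form.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PublishEvent:
    """Fields of a message_publish record value (types are assumptions)."""
    client_id: str
    username: Optional[str]
    topic: str
    payload: str  # still encoded as delivered by the bridge
    qos: int
    node: str
    ts: int


def parse_publish(value: str) -> PublishEvent:
    """Parse one message_publish value, rejecting QoS levels MQTT does not define."""
    data = json.loads(value)
    qos = int(data["qos"])
    if qos not in (0, 1, 2):
        raise ValueError(f"invalid MQTT QoS: {qos}")
    return PublishEvent(
        client_id=data["client_id"],
        username=data.get("username"),  # tolerate a missing username (assumption)
        topic=data["topic"],
        payload=data["payload"],
        qos=qos,
        node=data["node"],
        ts=data["ts"],
    )
```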
# Forward MQTT Message Delivered Events to Kafka
```
topic = message_delivered

value = {"client_id": ${clientid},
         "username": ${username},
         "from": ${fromClientId},
         "topic": ${topic},
         "payload": ${payload},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }
```
# Forward MQTT Message Ack Events to Kafka
```
topic = message_acked

value = {
         "client_id": ${clientid},
         "username": ${username},
         "from": ${fromClientId},
         "topic": ${topic},
         "payload": ${payload},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }
```
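The delivered and acked events can be combined to estimate how many QoS 1/2 deliveries each subscriber has not yet acknowledged. QoS 0 messages are never acknowledged in MQTT, so they are skipped. The following is a minimal sketch built on a heuristic. The record formats above carry no message id, so deliveries and acks are paired only by (client_id, from, topic) and counted. The pending_acks function is hypothetical.

```python
import json
from collections import Counter


def pending_acks(records):
    """Count unacknowledged QoS 1/2 deliveries per (client_id, from, topic)."""
    pending = Counter()
    for kafka_topic, value in records:
        if kafka_topic not in ("message_delivered", "message_acked"):
            continue
        event = json.loads(value)
        if int(event["qos"]) == 0:
            continue  # QoS 0 deliveries never produce an ack
        key = (event["client_id"], event["from"], event["topic"])
        pending[key] += 1 if kafka_topic == "message_delivered" else -1
    # Keep only keys with more deliveries than acks.
    return {key: n for key, n in pending.items() if n > 0}
```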
# Examples of Kafka Message Consumption
Consume MQTT client connected/disconnected event messages from Kafka:
```
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_connected --from-beginning

sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_disconnected --from-beginning
```
Consume MQTT subscription and unsubscription event messages from Kafka:
```
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_subscribed --from-beginning

sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_unsubscribed --from-beginning
```
Consume MQTT published messages from Kafka:
```
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_publish --from-beginning
```
Consume MQTT message delivered and acked event messages from Kafka:
```
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_delivered --from-beginning

sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_acked --from-beginning
```
TIP
By default, the message payload is Base64-encoded.
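Because the payload field is Base64-encoded, consumers must decode it before using the original MQTT payload. The following is a minimal sketch that decodes one record value. It assumes the default base64 encode_payload_type and a JSON record value, and it also prints the payloads of a stream of lines, such as console-consumer output. The decode_payload and print_payloads helpers are hypothetical.

```python
import base64
import json
from typing import Iterable


def decode_payload(value: str) -> bytes:
    """Return the raw MQTT payload bytes from one bridged Kafka record value."""
    event = json.loads(value)
    # Assumes the default encode_payload_type (base64) is in effect.
    return base64.b64decode(event["payload"])


def print_payloads(lines: Iterable[str]) -> None:
    """Print each record's payload as text, one record value per input line."""
    for line in lines:
        line = line.strip()
        if line:  # skip blank lines
            print(decode_payload(line).decode("utf-8", errors="replace"))
```

To decode the output of the kafka-console-consumer.sh commands above, one option is to pipe it into a small script that calls print_payloads(sys.stdin). Payloads that are not UTF-8 text are printed with replacement characters. Use decode_payload directly when you need the raw bytes.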
# Enable Kafka Bridge
```
./bin/emqx_ctl plugins load emqx_bridge_kafka
```