
Integrate with Kafka

In this article, we will simulate temperature and humidity data, report it to EMQX Cloud via the MQTT protocol, and then use EMQX Cloud Data Integrations to bridge the data into Kafka.

Before you start, you need to complete the following operations:

  • A deployment (EMQX Cluster) has been created on EMQX Cloud.

  • For Professional Plan users: please complete Peering Connection Creation first; all IPs mentioned below refer to the internal network IPs of the resources. (A Professional Plan deployment with a NAT gateway can also use public IPs to connect to resources.)

  • For BYOC Plan users: Please establish a peering connection between the VPC where BYOC is deployed and the VPC where the resources are located. All IPs mentioned below refer to the internal IP of the resources. If you need to access the resources via public IP addresses, please configure a NAT gateway in your public cloud console for the VPC where BYOC is deployed.

Kafka configuration

  1. Install Kafka

    # Install ZooKeeper
    docker run -d --restart=always \
        --name zookeeper \
        -p 2181:2181 \
        wurstmeister/zookeeper

    # Install Kafka and open port 9092
    # (the wurstmeister images are one common choice; any standard
    # ZooKeeper/Kafka images with these environment variables will work)
    docker run -d --restart=always --name mykafka \
        -p 9092:9092 \
        -e HOST_IP=localhost \
        -e KAFKA_ADVERTISED_PORT=9092 \
        -e KAFKA_ADVERTISED_HOST_NAME=<server IP> \
        -e KAFKA_BROKER_ID=1 \
        -e KAFKA_ZOOKEEPER_CONNECT=<server IP>:2181 \
        -e ZK=<server IP> \
        wurstmeister/kafka
  2. Create a topic

    # Create the "emqx" topic in the Kafka instance
    $ docker exec -it mykafka /opt/kafka/bin/kafka-topics.sh --zookeeper <broker IP>:2181 --replication-factor 1 --partitions 1 --topic emqx --create

    If the topic is created successfully, the message Created topic emqx will be returned.

Deployment Data Integrations Configuration

Go to the Data Integrations page

  1. Create Kafka resources and verify that they are available.

    On the Data Integrations page, click Kafka resources, fill in the Kafka connection details, and then click Test. If the test fails, check that the Kafka service is reachable from the deployment.

  2. Click the New button after the test has passed, and you will see the Create Resource successfully message.


  3. Create a new rule

    Put the following SQL statement in the SQL input field. The SQL rule retrieves the message report time (up_timestamp), the client ID, and the message body (payload) from the temp_hum/emqx topic, and reads the device's ambient temperature and humidity from the message body.

    SELECT
    timestamp as up_timestamp,
    clientid as client_id,
    payload.temp as temp,
    payload.hum as hum
    FROM
    "temp_hum/emqx"


  4. Rule SQL Testing

    To check whether the rule SQL fulfills our requirements, click SQL test and fill in the test payload, topic, and client information.

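To see what the SQL test should produce, the extraction the rule performs can be sketched in Python. This is only an illustration: the client ID, timestamp, and sample values below are made-up test inputs, not values from the document.

```python
import json

# Sketch of what the rule SQL extracts from a message published to
# "temp_hum/emqx": report time, client ID, and the temp/hum fields
# read out of the JSON message body.
def apply_rule(clientid, timestamp, payload):
    data = json.loads(payload)
    return {
        "up_timestamp": timestamp,
        "client_id": clientid,
        "temp": data["temp"],
        "hum": data["hum"],
    }

# Hypothetical test message, as you would enter it in the SQL test form
result = apply_rule("test_client", 1668671791000, '{"temp": 25.6, "hum": 65.2}')
print(result)
```

A payload shaped like `{"temp": 25.6, "hum": 65.2}` in the test form should therefore yield all four output fields; if `temp` or `hum` is missing from the payload, the corresponding output field will be empty.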

  5. Add Action to Rule

    Once the SQL test succeeds, click Next to add a Kafka forwarding action to the rule. We'll use the following Kafka topic and message template to demonstrate how to bridge the data reported by the device to Kafka.

    # kafka topic
    emqx

    # kafka message template
    {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}

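The ${...} placeholders in the message template are filled with the fields produced by the rule SQL. Since that syntax happens to match Python's string.Template, the substitution can be sketched locally; the timestamp and client ID here are hypothetical, and quotes are added around ${client_id} so the result parses as JSON.

```python
from string import Template

# Kafka message template from the action above; quotes added around
# ${client_id} so the rendered message is valid JSON.
template = Template(
    '{"up_timestamp": ${up_timestamp}, "client_id": "${client_id}", '
    '"temp": ${temp}, "hum": ${hum}}'
)

# Hypothetical rule output values
msg = template.substitute(
    up_timestamp=1668671791000,
    client_id="test_client",
    temp=25.6,
    hum=65.2,
)
print(msg)
```

Each message matched by the rule is rendered this way and produced to the emqx topic.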

  6. After successfully binding the action to the rule, click View Details to see the rule SQL statement and the bound actions.


  7. To see the created rules, go to Data Integrations/View Created Rules. Click the Monitor button to see the detailed match data of the rule.



Test

  1. Use MQTTX to simulate temperature and humidity data reporting

    Replace the broker address with the connection address of the created deployment, and add the client authentication information configured in the EMQX Dashboard.
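If you prefer scripting the simulation instead of entering values in MQTTX by hand, a reading can be generated like this. The field names match the rule SQL above; the value ranges are arbitrary illustrative choices, and publishing the payload to the temp_hum/emqx topic would then be done with MQTTX or any MQTT client.

```python
import json
import random

# Build one simulated temperature/humidity reading; the JSON field
# names (temp, hum) are what the rule SQL reads from the payload.
def make_reading():
    return json.dumps({
        "temp": round(random.uniform(20.0, 30.0), 1),
        "hum": round(random.uniform(40.0, 70.0), 1),
    })

payload = make_reading()
reading = json.loads(payload)  # round-trip check of the JSON body
print(payload)
```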

  2. View data bridging results

    # Go to the Kafka instance and consume the emqx topic
    $ docker exec -it mykafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <broker IP>:9092 --topic emqx --from-beginning