Skip to content

Ingest MQTT Data into Amazon S3

TIP

The Amazon S3 data integration is an EMQX Enterprise edition feature.

Amazon S3 is an internet-based storage service known for its high reliability, stability, and security, allowing for rapid deployment and ease of use. EMQX is capable of efficiently storing MQTT messages into Amazon S3 buckets, enabling flexible Internet of Things (IoT) data storage functionalities.

This page provides a detailed introduction to the data integration between EMQX and Amazon S3 and offers practical guidance on the rule and Sink creation.

TIP

EMQX is also compatible with other storage services supporting the S3 protocol, such as:

  • MinIO: MinIO is a high-performance, distributed object storage system. It is an open-source object storage server compatible with the Amazon S3 API, suitable for building private clouds.
  • Google Cloud Storage: Google Cloud Storage is Google Cloud's unified object storage for developers and enterprises to store large amounts of data. It offers an Amazon S3-compatible interface.

You can choose the appropriate storage service based on your business needs and scenarios.

How It Works

Amazon S3 data integration in EMQX is a ready-to-use feature that can be easily configured for complex business development. In a typical IoT application, EMQX acts as the IoT platform responsible for device connectivity and message transmission, while Amazon S3 serves as the data storage platform, handling message data storage.

emqx-integration-s3

EMQX utilizes rules engines and Sinks to forward device events and data to Amazon S3. Applications can read data from Amazon S3 for further data applications. The specific workflow is as follows:

  1. Device Connection to EMQX: IoT devices trigger an online event upon successfully connecting via the MQTT protocol. The event includes device ID, source IP address, and other property information.
  2. Device Message Publishing and Receiving: Devices publish telemetry and status data through specific topics. EMQX receives the messages and compares them within the rules engine.
  3. Rules Engine Processing Messages: The built-in rules engine processes messages and events from specific sources based on topic matching. It matches corresponding rules and processes messages and events, such as data format transformation, filtering specific information, or enriching messages with context information.
  4. Writing to Amazon S3: The rule triggers an action to write the message to S3. Using the Amazon S3 Sink, users can extract data from processing results and send it to S3. Messages can be stored in text or binary format, or multiple lines of structured data can be aggregated into a single CSV file, depending on the message content and the Sink configuration.

After events and message data are written to Amazon S3, you can connect to Amazon S3 to read the data for flexible application development, such as:

  • Data archiving: Store device messages as objects in Amazon S3 for long-term preservation to meet compliance requirements or business needs.
  • Data analysis: Import data from S3 into analytics services like Snowflake for predictive maintenance, device efficiency evaluation, and other data analysis services.

Features and Advantages

Using Amazon S3 data integration in EMQX can bring the following features and advantages to your business:

  • Message Transformation: Messages can undergo extensive processing and transformation in EMQX rules before being written to Amazon S3, facilitating subsequent storage and use.
  • Flexible Data Operations: With the S3 Sink, specific fields of data can be conveniently written into Amazon S3 buckets, supporting the dynamic setting of bucket and object keys for flexible data storage.
  • Integrated Business Processes: The S3 Sink allows device data to be combined with the rich ecosystem applications of Amazon S3, enabling more business scenarios like data analysis and archiving.
  • Low-Cost Long-Term Storage: Compared to databases, Amazon S3 offers a highly available, reliable, and cost-effective object storage service, suitable for long-term storage needs.

These features enable you to build efficient, reliable, and scalable IoT applications and benefit from business decisions and optimizations.

Before You Start

This section introduces the preparations required before creating an Amazon S3 Sink in EMQX.

Prerequisites

Prepare an S3 Bucket

EMQX supports Amazon S3 and other S3-compatible storage services. You can use AWS cloud services or deploy a MinIO instance with Docker.

Create a Connector

Before adding the S3 Sink, you need to create the corresponding connector.

  1. Go to the Dashboard Integration -> Connector page.
  2. Click the Create button in the top right corner.
  3. Select Amazon S3 as the connector type and click next.
  4. Enter the connector name, a combination of upper and lowercase letters and numbers. Here, enter my-s3.
  5. Enter the connection information.
    • If you are using the Amazon S3 bucket, enter the following information:
      • Host: The host varies by region and is formatted as s3.{region}.amazonaws.com.
      • Port: Enter 443.
      • Access Key ID and Secret Access Key: Enter the access keys created in AWS.
    • If you are using MinIO, enter the following information:
      • Host: Enter 127.0.0.1. If you are running MinIO remotely, enter the actual host address.
      • Port: Enter 9000.
      • Access Key ID and Secret Access Key: Enter the access keys created in MinIO.
  6. Use the default values for the remaining settings.
  7. Before clicking Create, you can click Test Connectivity to test if the connector can connect to the S3 service.
  8. Click the Create button at the bottom to complete the connector creation.

You have now completed the connector creation and will proceed to create a rule and Sink for specifying the data to be written into the S3 service.

Create a Rule with Amazon S3 Sink

This section demonstrates how to create a rule in EMQX to process messages from the source MQTT topic t/# and write the processed results to the iot-data bucket in S3 through the configured Sink.

  1. Go to the Dashboard Integration -> Rules page.

  2. Click the Create button in the top right corner.

  3. Enter the rule ID my_rule, and input the following rule SQL in the SQL editor:

    sql
    SELECT
      *
    FROM
        "t/#"

    TIP

    If you are new to SQL, you can click SQL Examples and Enable Debug to learn and test the rule SQL results.

  4. Add an action, select Amazon S3 from the Action Type dropdown list, keep the action dropdown as the default create action option, or choose a previously created Amazon S3 action from the action dropdown. Here, create a new Sink and add it to the rule.

  5. Enter the Sink's name and description.

  6. Select the my-s3 connector created earlier from the connector dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. The required configuration parameters can be found in Create a Connector.

  7. Set the Bucket by entering iot-data. This field also supports ${var} format placeholders, but ensure the corresponding name bucket is created in S3 in advance.

  8. Select ACL as needed, specifying the access permission for the uploaded object.

  9. Select the Upload Method. The differences between the two methods are as follows:

    • Direct Upload: Each time the rule is triggered, data is uploaded directly to S3 according to the preset object key and content. This method is suitable for storing binary or large text data. However, it may generate a large number of files.
    • Aggregated Upload: This method packages the results of multiple rule triggers into a single file (such as a CSV file) and uploads it to S3, making it suitable for storing structured data. It can reduce the number of files and improve write efficiency.

    The configuration parameters differ for each method. Please configure according to the selected method:

  10. Set the Object Content. By default, it is a JSON text format containing all fields. It supports ${var} format placeholders. Here, enter ${payload} to indicate using the message body as the object content. In this case, the object's storage format depends on the message body's format, supporting compressed packages, images, or other binary formats.

  11. Expand Advanced Settings and configure the advanced setting options as needed (optional). For more details, refer to Advanced Settings.

  12. Use the default values for the remaining settings. Click the Create button to complete the Sink creation. After successful creation, the page will return to the rule creation, and the new Sink will be added to the rule actions.

  13. Back on the rule creation page, click the Create button to complete the entire rule creation process.

You have now successfully created the rule. You can see the newly created rule on the Rules page and the new S3 Sink on the Actions (Sink) tab.

You can also click Integration -> Flow Designer to view the topology. The topology visually shows how messages under the topic t/# are written into S3 after being parsed by the rule my_rule.

Test the Rule

This section shows how to test the rule configured with the direct upload method.

Use MQTTX to publish a message to the topic t/1:

bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello S3" }'

After sending a few messages, access the MinIO console or Amazon S3 console to see the result.

Advanced Settings

This section delves into the advanced configuration options available for the S3 Sink. In the Dashboard, when configuring the Sink, you can expand Advanced Settings to adjust the following parameters based on your specific needs.

Field NameDescriptionDefault Value
Buffer Pool SizeSpecifies the number of buffer worker processes, which are allocated to manage the data flow between EMQX and S3. These workers temporarily store and process data before sending it to the target service, crucial for optimizing performance and ensuring smooth data transmission.16
Request TTLThe "Request TTL" (Time To Live) configuration setting specifies the maximum duration, in seconds, that a request is considered valid once it enters the buffer. This timer starts ticking from the moment the request is buffered. If the request stays in the buffer for a period exceeding this TTL setting or if it is sent but does not receive a timely response or acknowledgment from S3, the request is deemed to have expired.
Health Check IntervalSpecifies the time interval (in seconds) for the Sink to perform automatic health checks on its connection with S3.15
Max Buffer Queue SizeSpecifies the maximum number of bytes that can be buffered by each buffer worker process in the S3 Sink. The buffer workers temporarily store data before sending it to S3, acting as intermediaries to handle the data stream more efficiently. Adjust this value based on system performance and data transmission requirements.256
Query ModeAllows you to choose between synchronous or asynchronous request modes to optimize message transmission according to different requirements. In asynchronous mode, writing to S3 does not block the MQTT message publishing process. However, this may lead to clients receiving messages before they arrive at S3.Asynchronous
In-flight Window"In-flight queue requests" refer to requests that have been initiated but have not yet received a response or acknowledgment. This setting controls the maximum number of in-flight queue requests that can exist simultaneously during Sink communication with S3.
When Request Mode is set to asynchronous, the "Request In-flight Queue Window" parameter becomes particularly important. If strict sequential processing of messages from the same MQTT client is crucial, then this value should be set to 1.
100
Min Part SizeThe minimum chunk size for part uploads after aggregation is complete. The data to be uploaded will accumulate in memory until it reaches this size.5MB
Max Part SizeThe maximum chunk size for part uploads. The S3 Sink will not attempt to upload parts exceeding this size.5GB