# 将 MQTT 数据传输到 Azure Event Hubs

[Azure Event Hub](https://azure.microsoft.com/en-us/products/event-hubs) 是一个用于数据摄取的实时托管事件流平台。EMQX 与 Azure Event Hub 的集成为用户在高吞吐量情况下提供了可靠的数据传输和处理能力。Azure Event Hubs 可作为 EMQX 与 Azure 丰富的云服务应用之间的数据通道，将物联网数据集成到 Azure Blob Storage、Azure Stream Analytics 以及部署在 Azure 虚拟机上的各类应用和服务当中。目前， EMQX 支持使用 SASL/PLAIN 身份验证、通过与 Kafka 协议兼容的 Apache Kafka 终端点进行 Azure Event Hub 集成。

本页详细介绍了 EMQX 与 Azure Event Hubs 的数据集成并提供了实用的规则和 Sink 创建指导。

## 工作原理

Azure Event Hubs 数据集成是 EMQX 的一个开箱即用功能，旨在帮助用户无缝集成 MQTT 数据流与 Azure Event Hubs，利用其丰富的服务和能力进行物联网应用开发。

![emqx-integration-azure](./assets/emqx-integration-azure.jpg)

EMQX 通过规则引擎和 Sink 将 MQTT 数据转发到 Azure Event Hubs。完整的过程如下：

1. **物联网设备发布消息**：设备通过特定主题发布遥测和状态数据，触发规则引擎。
2. **规则引擎处理消息**：使用内置规则引擎，基于主题匹配处理来自特定来源的 MQTT 消息。规则引擎匹配相应的规则并处理消息，例如转换数据格式、过滤特定信息或用上下文信息丰富消息。
3. **桥接到 Azure Event Hubs**：规则触发将消息转发到 Azure Event Hubs 的动作，允许轻松配置数据属性、排序键，并将 MQTT 主题映射到 Azure Event Hubs 消息头。这为数据集成提供了更丰富的上下文信息和顺序保证，使得物联网数据处理更加灵活。

在 MQTT 消息数据写入 Azure Event Hubs 之后，您可以进行灵活的应用开发，例如：

- 实时数据处理和分析：利用 Azure Event Hubs 强大的数据处理和分析工具及其自身的流处理能力，对消息数据进行实时处理和分析，获取有价值的洞察和决策支持。
- 事件驱动功能：触发 Azure 事件处理，实现动态灵活的功能触发和处理。
- 数据存储和共享：将消息数据传输到 Azure Event Hubs 存储服务，安全存储和管理大量数据。这使您能够与其他 Azure 服务共享和分析这些数据，以满足各种业务需求。

## 特性与优势

EMQX 与 Azure Event Hubs 的数据集成可以为您的业务带来以下功能和优势：

- **高性能海量消息吞吐**：EMQX 支持海量 MQTT 客户端连接，每秒数百万条消息能够持续引入 Azure Event Hubs，可以获得极低的消息传输与存储延迟时间，并在 Azure Event Hubs 上配置保留时间实现消息量的控制。
- **灵活的数据映射**：通过配置的 Azure Event Hubs，可以实现 MQTT 主题与 Azure Event Hubs 事件中心的灵活映射，并且支持 MQTT 用户属性与 Azure Event Hubs 消息头的映射，这为数据集成提供了更丰富的上下文信息和顺序保证。
- **弹性伸缩支持**：EMQX 与 Azure Event Hubs 均可以支持弹性伸缩，能够随着应用规格进行扩展，轻松将物联网数据规模从数 MB 轻松扩展到数 TB。
- **丰富的生态系统**：得益于采用标准 MQTT 协议以及各类主流物联网传输协议的支持，EMQX 能够实现各类物联网设备的接入。结合 Azure Event Hubs 在 Azure Functions、各类编程语言 SDK 以及 Kafka 生态系统中的支持，能够轻松打通设备到云端的数据通道，实现无缝物联网数据接入与处理。

这些功能增强了集成能力和灵活性，可以帮助用户快速实现海量物联网设备数据与 Azure 的连接。让用户更便捷的获得云计算带来的数据分析和智能化能力，构建功能强大的数据驱动型应用。

## 准备工作

本节介绍了在 EMQX 中创建 Azure Event Hubs 数据集成之前需要做的准备工作，包括如何设置 Azure Event Hubs。

### 前置准备

- 了解[规则](./rules.md)。
- 了解[数据集成](./data-bridges.md)。

### 设置 Azure Event Hubs

为了使用 Azure Event Hub 数据集成，必须在 Azure 账户中设置命名空间和事件中心。以下 Azure 官方文档详细介绍了如何进行设置。

- [什么是适用于 Apache Kafka 的 Azure 事件中心](https://learn.microsoft.com/zh-cn/azure/event-hubs/azure-event-hubs-kafka-overview)
- [快速入门：使用 Azure 门户创建事件中心](https://learn.microsoft.com/zh-cn/azure/event-hubs/event-hubs-create)
- [快速入门：使用 Azure 事件中心和 Apache Kafka 流式传输数据](https://learn.microsoft.com/zh-cn/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs?tabs=connection-string)
  - 遵循“连接字符串”说明，这是 EMQX 用于连接的方式。
- [获取事件中心连接字符串](https://learn.microsoft.com/zh-cn/azure/event-hubs/event-hubs-get-connection-string)

## 创建连接器

在添加 Azure Event Hubs Sink 前，您需要创建连接器用于将 Sink 连接到 Azure Event Hubs 服务器。

1. 进入 EMQX Dashboard，点击**集成** -> **连接器**。

2. 点击页面右上角的**创建**。

3. 在**创建连接器**页面，点击选择 **Azure Event Hubs**，然后点击**下一步**。

4. 为连接器输入一个名称，名称应为大小写字母和数字的组合，此处我们输入 `my-azure-event-hubs`。

5. 配置连接信息。

   - **引导主机**：输入命名空间的主机名。默认端口为 `9093`。其他字段按实际情况设置。
   - **连接字符串**：输入命名空间的连接字符串。可以在命名空间共享访问策略的“连接字符串 - 主键”中找到。有关详细信息，请参阅 [获取事件中心连接字符串](https://learn.microsoft.com/zh-cn/azure/event-hubs/event-hubs-get-connection-string)。
   - **启用 TLS**：连接到 Azure Event Hub 时默认启用 TLS。有关 TLS 连接选项的详细信息，请参阅[外部资源访问的 TLS](../network/overview.md#启用-tls-加密访问外部资源)。
   - **高级设置（可选）**：请参阅[高级配置](#高级配置)。

6. 在点击**创建**之前，您可以点击**测试连接**以测试连接器是否能连接到 Azure Event Hubs 服务器。

7. 点击底部的**创建**按钮完成连接器的创建。在弹出对话框中，您可以点击**返回连接器列表**或点击**创建规则**继续创建带有 Azure Event Hubs Sink 的规则，以指定要转发到 Azure Event Hubs 的数据。详细步骤请参见[创建 Azure Event Hubs Sink 规则](#创建-azure-event-hubs-sink-规则)。

## 创建 Azure Event Hubs Sink 规则

本节演示了如何在Dashboard 中创建规则和 Azure Event Hubs Sink，并将 Sink 添加到规则中。

1. 转到 Dashboard **集成** -> **规则页面**。

2. 点击页面右上角的创建。

3. 输入规则 ID，例如  `my_rule`。

4. 在 SQL 编辑器中输入规则，例如我们希望将 `t/#` 主题的 MQTT 消息存储至 Azure Event Hubs，可通过如下规则实现：

   注意：如果要自定义 SQL 语句，请确保 `SELECT` 字段包含 Sink 中所需的所有字段。

   ```sql
   SELECT
   *
   FROM
   "t/#"
   ```

5. 点击**添加动作**按钮，从**动作类型**下拉列表中选择 Azure Event Hubs，保持动作下拉框为默认的`创建动作`选项，您也可以从动作下拉框中选择一个之前已经创建好的 Azure Event Hubs 动作。此处我们创建一个全新的 Sink 并添加到规则中。

6. 在下方的表单中输入 Sink 的名称与描述。

7. 在**连接器**下拉框中选择刚刚创建的 `my-azure-event-hubs` 连接器。您也可以点击下拉框旁边的创建按钮，在弹出框中快捷创建新的连接器，所需的配置参数按照参照[创建连接器](#创建连接器)。

8. 配置 Sink 信息，完成数据的写入：

   - **事件中心名称**：输入要使用的事件中心的名称。从 EMQX v5.7.2 开始，该字段还支持设置 Kafka 动态主题，详见[配置 Kafka 动态主题](./data-bridge-kafka.md#配置-kafka-动态主题)。
   - **Azure Event Hub 头部**：输入一个占位符，作为将在发布到 Azure Event Hub 时添加到消息中的消息标头。
   - **Azure Event Hub 头部值编码模式**：选择消息标头的值编码模式；可选值为 `none` 或 `json`。
   - **额外的 Azure Event Hub 头部信息**：您可以点击**添加**为 Azure Event Hub 消息标头提供更多的键值对。
   - **消息键**：事件中心消息键。在此处插入一个字符串，可以是纯字符串或包含占位符（${var}）的字符串。
   - **消息值**：事件中心消息值。在此处插入一个字符串，可以是纯字符串或包含占位符（${var}）的字符串。
   - **分区策略**：指定消息将如何被分派到 Azure Event Hubs 分区，默认为 `random`。
     - `random`：随机选择每个消息的分区。
     - `key_dispatch`：将 Azure Event Hubs 消息键哈希到分区号。

   - **分区限制**：限制生产者能够发送消息的最大分区数量。默认为关闭，即可以发送到所有分区。

9. **备选动作（可选）**：如果您希望在消息投递失败时提升系统的可靠性，可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时，这些备选动作将被触发。更多信息请参见：[备选动作](./data-bridges.md#备选动作)。

10. **高级设置（可选）**：根据情况配置同步/异步模式，队列与批量等参数，详细请参考 [Sink 的特性](./data-bridges.md)。

11. 在点击**创建**之前，您可以点击**测试连接**测试 Sink 是否能够连接到 Azure Event Hubs 服务器。

12. 点击**创建**按钮完成 Sink 的创建，创建成功后页面将回到创建规则，新的 Sink 将添加到规则动作中。

13. 回到规则创建页面，点击**创建**按钮完成整个规则创建。

现在您已成功创建了规则，你可以点击**集成** -> **规则**页面看到新建的规则，同时在**动作(Sink)** 标签页看到新建的 Azure Event Hubs Sink。

您也可以点击 **集成** -> **Flow 设计器**查看拓扑，通过拓扑可以直观的看到，主题 `t/#` 下的消息在经过规则 my_rule 解析后被写入到 Azure Event Hubs 中。

## 测试规则

您可以使用 [MQTTX](https://mqttx.app/zh) 来模拟客户端向 EMQX 发送 MQTT 消息来测试 Sink 和规则的运行。

1. 使用 MQTTX 向 `t/1` 主题发布消息：

```bash
   mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Azure Event Hub" }'
```

2. 在 **规则** 页面点击规则名称查看 Sink 运行统计，命中、发送成功次数应当 +1。

3. 在 Azure 门户仪表板中检查是否将消息写入配置的事件中心。使用任何兼容 Kafka 的消费者，检查消息是否被写入配置的事件中心。有关使用 Kafka CLI 的更多信息，请参阅 [Use the Kafka CLI to Send and Receive Messages to/from Azure Event Hubs for Apache Kafka Ecosystem](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/kafka-cli)。

## 高级配置

本节描述了一些高级配置选项，这些选项可以优化您的连接器性能，并根据您的特定场景定制操作，在创建对应的对象时，您可以展开 **高级设置**并根据业务需求配置以下设置。

| 字段                     | 描述                                                         | 推荐值   |
| ------------------------ | ------------------------------------------------------------ | -------- |
| 允许自动创建主题         | （仅用于生产者）启用该选项后，当客户端发送元数据请求且目标主题不存在时，EMQX 将允许自动创建该主题。 | `不启用` |
| 连接超时                 | 等待 TCP 连接建立的最大时间，包括启用时的认证时间。          | `5`秒    |
| 启动超时时间             | 确定连接器在回应资源创建请求之前等待自动启动的资源达到健康状态的最长时间间隔（以秒为单位）。此设置有助于确保连接器在验证连接的资源（例如 Confluent 集群）完全运行并准备好处理数据事务之前不会执行操作。 | `5`秒    |
| 健康检查间隔             | 检查连接器运行状态的时间间隔。                               | `15`秒   |
| 健康检查超时             | 指定对与 Azure Event Hubs 服务的连接执行自动健康检查的超时时间。 | `60` 秒  |
| 元数据刷新最小间隔       | 客户端在刷新 Azure Event Hubs Kafka 代理和主题元数据之前必须等待的最短时间间隔。将此值设置得太小可能会不必要地增加 Kafka 服务器的负载。 | `3`秒    |
| 元数据请求超时           | 连接器从 Kafka 请求元数据时的最大等待时长。                  | `5`秒    |
| Socket 发送/收包缓存大小 | 管理 TCP socket 发送/收包缓存大小以优化网络传输性能。        | `1024`KB |
| 是否关闭延迟发送         | 选择是否让系统内核立即或延迟发送 TCP socket。打开切换开关即关闭延迟发送，可以让系统内核立即发送，否则当需要发送的内容很少时，可能会有一定延迟（默认 40 毫秒）。 | `关闭`   |
| TCP Keepalive            | 此配置为连接器启用 TCP 保活机制，以维护持续连接的有效性，防止由长时间不活动导致的连接中断。该值应以逗号分隔的三个数字格式提供，格式为 `Idle, Interval, Probes`：<br />Idle：服务器发起保活探测前连接必须保持空闲的秒数。Linux 上的默认值是 7200 秒。<br />Interval：每个 TCP 保活探测之间的秒数。Linux 上的默认值是 75 秒。<br />Probes：在将连接视为关闭之前，发送的最大 TCP 保活探测次数（如果对端没有响应）。Linux 上的默认值是 9 次探测。<br />例如，如果您将值设置为 `240,30,5`，则意味着在 240 秒的空闲时间后将发送 TCP 保活探测，随后每 30 秒发送一次探测。如果连续 5 次探测尝试没有响应，连接将被标记为关闭。 | `none`   |

