Data Bridge Configuration
A data bridge connects EMQX Edge to other MQTT brokers. Unlike swarms, data bridges do not replicate topic trees or routing tables between nodes. Instead, data forwarding is rule-based and directional.
There are two main operations in a bridge setup:
- Forwarding: Messages that match configured topics are forwarded to a remote broker.
- Subscribing: The bridge subscribes to specific topics on the remote broker and relays incoming messages back to EMQX Edge.
MQTT over TCP Bridge
The MQTT over TCP bridge configuration defines a bridge that uses TCP as its transport protocol. It lets EMQX Edge connect to remote MQTT servers and exchange MQTT messages with them.
Example Configuration
    bridges.mqtt.emqx1 = {
      server = "mqtt-tcp://127.0.0.1:1883"      # MQTT server address
      proto_ver = 4                             # MQTT protocol version
      clientid = "bridge_client"                # Client ID for the bridge
      keepalive = "60s"                         # Ping interval for the bridge
      clean_start = false                       # Clean start flag for the bridge
      username = "username"                     # Username for the bridge
      password = "passwd"                       # Password for the bridge
      will = {                                  # Will message
        topic = "will_topic"                    # Will topic
        qos = 1                                 # Will QoS
        retain = false                          # Will retain flag
        payload = "will_message"                # Will payload
        properties = {                          # Will properties (MQTT v5)
          payload_format_indicator = 0
          message_expiry_interval = 0
          content_type = ""
          response_topic = ""
          correlation_data = ""
          will_delay_interval = 0
          user_property = {
            key1 = "value1"
            key2 = "value2"
          }
        }
      }
      ssl = {                                   # SSL configuration
        key_password = "yourpass"               # SSL key password
        keyfile = "/etc/certs/key.pem"          # SSL key file
        certfile = "/etc/certs/cert.pem"        # SSL cert file
        cacertfile = "/etc/certs/cacert.pem"    # SSL CA cert file
      }
      forwards = [                              # Topics forwarded to the remote MQTT server
        {
          remote_topic = "fwd/topic1"
          local_topic = "topic1"
          suffix = "/emqx"
        },
        {
          remote_topic = "fwd/topic2"
          local_topic = "topic2"
          prefix = "emqx/"
        }
      ]
      subscription = [                          # Topics subscribed from the remote MQTT server
        {
          remote_topic = "cmd/topic1"
          local_topic = "topic3"
          qos = 1
          retain = 2                            # Flag to override the retain flag
          suffix = "/emqx"
        },
        {
          remote_topic = "cmd/topic2"
          local_topic = "topic4"
          qos = 2
        }
      ]
      max_parallel_processes = 2                # Maximum number of parallel processes for handling outstanding requests
      max_send_queue_len = 32                   # Maximum length of the message send queue
      max_recv_queue_len = 128                  # Maximum length of the message receive queue
    }
This configuration enables EMQX Edge to establish an MQTT over TCP bridge connection to a remote MQTT server, with a Will Message and SSL certificate settings defined.
Configuration Items
General
- bridges.mqtt.<name>: Specifies the name of the bridge.
- server: Specifies the MQTT server address for the bridge. Examples:
  - mqtt-tcp://127.0.0.1:1883 for an MQTT over TCP bridge
  - tls+mqtt-tcp://127.0.0.1:8883 for an MQTT over TCP bridge with SSL enabled
  - mqtt-quic://54.75.171.11:14567 for an MQTT over QUIC bridge
- proto_ver: Specifies the MQTT protocol version to use. Options:
  - 5 for MQTT v5
  - 4 for MQTT v3.1.1
  - 3 for MQTT v3.1
- clientid: Specifies the unique MQTT client identifier that EMQX Edge uses when establishing a bridge connection to a remote MQTT broker.
- keepalive: Specifies the ping interval for the bridge.
- clean_start: Specifies the clean start flag for the bridge. Note: Some IoT platforms require this to be set to false.
- username: Specifies the username for the bridge.
- password: Specifies the password for the bridge.
- will: See Will Message.
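The server address formats listed above encode the transport in the URL scheme. As a rough illustration only, the following Python sketch splits such an address into transport, host, and port. The helper name describe_server and the SCHEME_TRANSPORT table are hypothetical and not part of EMQX Edge; the sketch assumes the three schemes shown in this section are the only ones of interest.

```python
from urllib.parse import urlsplit

# Transport implied by each URL scheme listed in this section.
SCHEME_TRANSPORT = {
    "mqtt-tcp": "MQTT over TCP",
    "tls+mqtt-tcp": "MQTT over TCP with SSL",
    "mqtt-quic": "MQTT over QUIC",
}


def describe_server(server: str) -> tuple:
    """Return (transport, host, port) for a bridge server address."""
    parts = urlsplit(server)
    if parts.scheme not in SCHEME_TRANSPORT:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if parts.hostname is None or parts.port is None:
        raise ValueError("server address must include a host and a port")
    return SCHEME_TRANSPORT[parts.scheme], parts.hostname, parts.port


if __name__ == "__main__":
    print(describe_server("tls+mqtt-tcp://127.0.0.1:8883"))
```

A check like this can catch a mistyped scheme in a configuration file before the bridge is started.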
SSL/TLS (ssl)
Contains settings for SSL/TLS security:
- key_password: Specifies the password for the client's private key file, if it is password-protected.
- keyfile: Specifies the path to the client's private key file.
- certfile: Specifies the path to the client's certificate file.
- cacertfile: Specifies the path to the server's root CA certificate file. This certificate is used to verify the identity of the remote MQTT server.
Forwarding (forwards)
Defines an array of topics that need to be forwarded to the remote MQTT server. Each entry includes:
- remote_topic: Used for topic reflection. It replaces the topic in the forwarded PUBLISH message. Leave remote_topic = "" to preserve the original topic.
- local_topic: The local topic whose messages are forwarded to the remote MQTT server.
- qos: Optional. Overrides the original QoS level of the PUBLISH message.
- suffix: A string appended to the remote topic (or to the original topic if remote_topic is left empty).
- prefix: A string prepended to the remote topic (or to the original topic if remote_topic is left empty).
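The interaction between remote_topic, prefix, and suffix can be sketched in a few lines of Python. This is an illustration of the rules described above, not the bridge's actual code: the function name forwarded_topic is hypothetical, and the sketch assumes that prefix and suffix are concatenated as plain strings with no separator added.

```python
def forwarded_topic(original: str, remote_topic: str = "",
                    prefix: str = "", suffix: str = "") -> str:
    """Compute the topic used on the remote broker for a forwarded message.

    Assumption: an empty remote_topic keeps the original topic, and
    prefix/suffix are joined to the result without any extra separator.
    """
    base = remote_topic if remote_topic else original
    return f"{prefix}{base}{suffix}"


if __name__ == "__main__":
    # The two forwarding rules from the example configuration.
    print(forwarded_topic("topic1", remote_topic="fwd/topic1", suffix="/emqx"))
    print(forwarded_topic("topic2", remote_topic="fwd/topic2", prefix="emqx/"))
    # Empty remote_topic: the original topic is preserved, then prefixed.
    print(forwarded_topic("topic2", prefix="emqx/"))
```

Under these assumptions, the first rule in the example configuration publishes to fwd/topic1/emqx and the second to emqx/fwd/topic2.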
Subscriptions (subscription)
Defines an array of topic objects that need to be subscribed from the remote MQTT server. Each object defines a topic and the QoS level for the subscription.
Note
Only the first matching rule is applied if multiple overlapping subscription rules exist.
Each subscription rule includes:
- remote_topic: The topic filter used to subscribe to the remote broker.
- local_topic: Used for topic reflection. Leave local_topic = "" to preserve the original topic of messages received from the remote broker.
- qos: Required. Defines the QoS in the SUBSCRIBE packet.
- retain: A flag to override the retain flag.
- retain_as_published: Optional. The MQTT v5 Retain As Published option.
- retain_handling: Optional. The MQTT v5 Retain Handling option.
- suffix: A string appended to the local topic (or to the original topic if local_topic is left empty).
- prefix: A string prepended to the local topic (or to the original topic if local_topic is left empty).
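To make the first-match note above concrete, here is a small Python sketch that selects the rule applied to a message arriving from the remote broker. It uses standard MQTT topic filter matching ('+' for one level, '#' for the remainder) and ignores the special handling of topics starting with '$'. The helper names topic_matches and first_matching_rule are hypothetical, and the rule dictionaries only mirror the configuration keys; how EMQX Edge evaluates rules internally is not shown here.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Standard MQTT filter matching: '+' is one level, '#' is the rest."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # matches the parent level and everything below it
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def first_matching_rule(rules: list, topic: str):
    """Return the first rule whose remote_topic filter matches, else None."""
    for rule in rules:
        if topic_matches(rule["remote_topic"], topic):
            return rule
    return None


if __name__ == "__main__":
    rules = [
        {"remote_topic": "cmd/#", "local_topic": "all_cmds", "qos": 1},
        {"remote_topic": "cmd/topic1", "local_topic": "topic3", "qos": 1},
    ]
    # Both filters match, but only the first rule in the list is used.
    print(first_matching_rule(rules, "cmd/topic1")["local_topic"])
```

Because only the first match counts, more specific filters are best placed before broader wildcard filters.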
Performance Tuning
- max_parallel_processes: Specifies the maximum number of parallel processes for handling outstanding requests.
- max_send_queue_len: Specifies the maximum number of messages that can be queued for sending. Since 0.23.1, it also takes effect when the TCP connection is explicitly closed, which guarantees that no messages are lost until the cache queue is full.
- max_recv_queue_len: Specifies the maximum number of messages that can be queued for processing.
MQTT 5
If MQTT v5 is used (proto_ver = 5), the following configuration items are also supported:
Connection related:
Configuration Item | Description | Value Range |
---|---|---|
conn_properties.maximum_packet_size | Specifies the maximum packet size for the MQTT connection | 1 - 4294967295 |
conn_properties.receive_maximum | Limits the number of QoS 1 and QoS 2 publications that the client can process concurrently. This only applies to the current network connection. If left unconfigured, it will use the default value: 65535. | 1 - 65535 |
conn_properties.topic_alias_maximum | Specifies the highest value that the client will accept as a Topic Alias sent by the server. Used to limit the number of Topic Aliases that the client is willing to hold on this connection | 0 - 65535 |
conn_properties.request_problem_information | Specifies if the server should send additional diagnostic information (i.e., a Reason String or User Properties) in case of failures: - If set to 0, the server is allowed to include additional diagnostic information only when responding with PUBLISH, CONNACK or DISCONNECT packets. For all other packet types, the server must not include this information. If the server violates this rule, the client will disconnect and report a Protocol Error. - If set to 1, the server has the discretion to include additional diagnostic information in any type of packet where it's allowed. | 0 or 1 |
conn_properties.request_response_information | Specifies whether to request the server to return Response Information in the CONNACK. - If set to 0, the server must not return Response Information. - If set to 1, the server may return Response Information in the CONNACK packet | 0 or 1 |
conn_properties.session_expiry_interval | Specifies the session expiry interval. - If set to 0, the session ends when the network connection is closed. - If set to 4294967295 (UINT_MAX), the session will never expire | 0 - 4294967295 |
conn_properties.user_property | A map of key-value pairs. Allowed to appear multiple times to represent multiple name-value pairs. The same name is allowed to appear more than once. | Map[key(String) - value(String)] |
Subscription Related
Configuration Item | Description | Value Range |
---|---|---|
sub_properties.identifier | Subscription Identifier | 1 ~ 268,435,455 |
sub_properties.user_property | User Property | Map[key(String) - value(String)]* |
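The numeric ranges in the connection-related table above lend themselves to a quick sanity check before a configuration is deployed. The Python sketch below is a hypothetical helper, not an EMQX Edge tool: it takes the connection properties as a plain dictionary and reports which integer settings fall outside the documented ranges. It deliberately skips user_property and any key it does not know.

```python
# Allowed (min, max) for each integer connection property, from the table.
CONN_PROPERTY_RANGES = {
    "maximum_packet_size": (1, 4294967295),
    "receive_maximum": (1, 65535),
    "topic_alias_maximum": (0, 65535),
    "request_problem_information": (0, 1),
    "request_response_information": (0, 1),
    "session_expiry_interval": (0, 4294967295),
}


def out_of_range(conn_properties: dict) -> list:
    """Return the names of properties whose values violate the table."""
    bad = []
    for name, value in conn_properties.items():
        if name not in CONN_PROPERTY_RANGES:
            continue  # unknown keys and user_property are not checked here
        low, high = CONN_PROPERTY_RANGES[name]
        if not isinstance(value, int) or not low <= value <= high:
            bad.append(name)
    return bad


if __name__ == "__main__":
    settings = {"receive_maximum": 0, "topic_alias_maximum": 0}
    print(out_of_range(settings))  # receive_maximum must be at least 1
```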
Will Message
The example configuration above includes the use of an MQTT Will Message, a feature that allows a broker to notify other clients when a connection is lost unexpectedly.
In MQTT, a Will Message is specified during the initial connection from a client to a broker. If the client disconnects ungracefully (e.g., due to network failure or crash), the broker automatically publishes the Will Message to the designated topic. This is useful for alerting other clients that a device or client has gone offline.
Below are detailed descriptions of the configuration fields related to Will Messages:
- will.topic: Specifies the topic on which the Will Message should be published.
- will.payload: Specifies the payload of the Will Message. This is typically a message that informs others about the disconnection.
- will.qos: Specifies the QoS level for the Will Message. It can be 0 (At most once), 1 (At least once), or 2 (Exactly once).
- will.retain: Specifies whether the Will Message should be retained by the broker. If set to true, the Will Message is stored on the broker and is sent to any future subscribers of the topic.
- will.properties:
  - payload_format_indicator: Specifies the format of the Will Message's payload. It can take the values 0 or 1. A value of 0 indicates an unspecified byte stream, and 1 indicates a UTF-8 encoded string.
  - message_expiry_interval: Specifies the period of time (in seconds) that the broker should hold the Will Message. If left unconfigured, the message will never expire.
  - content_type: Specifies the content type of the Will Message's payload, which tells receivers how to interpret the data in the payload.
  - response_topic: Specifies the topic for the response to the Will Message. Other clients can use this topic to send a response to the Will Message.
  - correlation_data: Specifies binary data that is used for correlating the response with the Will Message.
  - will_delay_interval: Specifies the delay, in seconds, between the ungraceful disconnection of the client and the moment when the broker publishes the Will Message. Note: the default value 0 indicates there is no delay before the Will Message is published.
  - user_property: Specifies a set of user-defined key-value pairs. This can be used for sending additional custom data in the format of key1 = value1.
MQTT Bridges Cache
EMQX Edge allows the configuration of multiple MQTT data bridges using the nanomq.conf file. Each bridge instance can be uniquely identified by its name. In addition to individual bridge settings, EMQX Edge also provides a shared cache component, which can be configured once and used by multiple bridge instances.
This cache feature is especially useful for storing QoS messages temporarily when the bridge is under high load or if the remote broker is temporarily unavailable.
Example Configuration
    ## First bridge client
    bridges.mqtt.emqx1 {
      ......
      resend_interval = 5000          # Resend interval (ms)
      resend_wait = 3000              # Wait before the first resend (ms)
      cancel_timeout = 10000          # Max wait before canceling the QoS ACK action (ms)
    }
    ## Second bridge client
    bridges.mqtt.emqx2 {
      ......
      resend_interval = 5000          # Resend interval (ms)
      resend_wait = 3000              # Wait before the first resend (ms)
      cancel_timeout = 10000          # Max wait before canceling the QoS ACK action (ms)
    }
    bridges.mqtt.cache {
      disk_cache_size = 102400        # Max message limitation for caching
      mounted_file_path = "/tmp/"     # Mounted file path
      flush_mem_threshold = 100       # The threshold of flushing messages to flash
    }
Configuration Items
- disk_cache_size: Specifies the maximum number of messages that can be cached in the MQTT bridges. A value of 0 disables the message cache.
- mounted_file_path: Specifies the file path where the cache file for the MQTT bridges is mounted.
- flush_mem_threshold: Specifies the threshold for flushing messages to the cache file. When the number of messages reaches this threshold, they will be flushed to the cache file.
- resend_interval: Specifies the interval, in milliseconds, for resending messages. Only takes effect in bridging. This is a per-connection timer that is also in charge of sending PINGREQ, resending messages cached in SQLite, and health checking. Set it with caution.
  - Default: 5000 ms.
- resend_wait: Specifies how long, in milliseconds, to wait after a message was published before starting to resend it. Only takes effect in bridging.
  - Default: 3000 ms.
- cancel_timeout: Specifies the maximum wait time, in milliseconds, before canceling the QoS ACK action. Only takes effect in bridging. Once the action is canceled, the message is no longer retried, so this value acts as the maximum retry time window.
  - Default: 8000 ms.
Note on QoS Message Handling:
Cancelling a QoS message does not mean the message is lost. It simply means the client has stopped waiting for the ACK from the remote broker.
In versions before 0.22.4, no such mechanism existed. If a bridge encountered a busy I/O thread (aio), you would see messages like:
    bridging to xxxxx aio busy! msg lost! Ctx: xx
Such a message would stay stuck indefinitely if the remote broker never responded.
To prevent this, cancel_timeout was introduced as a safety window to avoid indefinite blocking. It defines how long a QoS message should wait for acknowledgment before being discarded from the retry queue.
Guaranteed QoS Message Retry (Example Tuning)
If you want guaranteed retry logic for QoS messages in bridging, you can refer to the following configuration:
    ## First bridge client
    bridges.mqtt.emqx1 {
      ......
      keepalive = 30s                 # Taking 30s keepalive as context
      max_send_queue_len = 512        # Larger inflight window for message buffering
      max_recv_queue_len = 512
      resend_interval = 5000          # Retry every 5 seconds
      resend_wait = 3000              # Wait 3 seconds before first retry
      cancel_timeout = 10000          # Retry window: 10 seconds max
    }
Tuning Guidance
- QoS messages first enter the in-flight window (max_send_queue_len). If the network is congested, they will wait silently until space is available.
- Messages are only dropped when the in-flight window is full and new QoS messages continue arriving.
- The actual timeout for each QoS message is cancel_timeout - resend_wait. Divide this by resend_interval to estimate how many retries are possible. For example, (10000 - 3000) / 5000 = 1.4 means at least 1 retry.
- Once the timeout is reached, the message is removed from the QoS awaiting queue (not the in-flight window), and no further resend attempts will be made.
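The retry estimate from the guidance above is simple enough to compute when comparing candidate settings. The following Python sketch applies the stated formula, (cancel_timeout - resend_wait) / resend_interval, rounded down. The function name estimated_retries is hypothetical, and the result is only an estimate: actual behavior depends on how the resend timer lines up with the moment a message was published.

```python
def estimated_retries(cancel_timeout_ms: int, resend_wait_ms: int,
                      resend_interval_ms: int) -> int:
    """Estimate how many full resend intervals fit in the retry window."""
    if resend_interval_ms <= 0:
        raise ValueError("resend_interval must be positive")
    # Effective window: time left after the initial wait before resending.
    window_ms = max(cancel_timeout_ms - resend_wait_ms, 0)
    return window_ms // resend_interval_ms


if __name__ == "__main__":
    # Values from the tuning example: (10000 - 3000) / 5000 = 1.4
    print(estimated_retries(10000, 3000, 5000))  # prints 1
```

Raising cancel_timeout or lowering resend_interval widens the window for more retries, at the cost of keeping unacknowledged messages around longer.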
EMQX Edge uses SQLite to deliver the cache feature. For details on the configuration, see Cache.