Backends
MQTT Message Persistence
One-to-one message Persistence
- PUB publishes a message;
- Backend records this message in DB;
- SUB subscribes to a topic;
- Backend retrieves the messages of this topic from DB;
- Messages are sent to SUB;
- Once the SUB acknowledged / received the message, backend removes the message from DB.
Many-to-many message Persistence
- PUB publishes a message;
- Backend records the message in DB;
- SUB1 and SUB2 subscribe to a topic;
- Backend retrieves the messages of this topic;
- Messages are sent to SUB1 and SUB2;
- Backend records the read position of SUB1 and SUB2, the next message’s retrieval starts from this position.
Client Connection State
EMQ X supports retaining the client's connection state in Redis or DB.
Client Subscription by Broker
EMQ X Persistence supports subscription by broker. When a client goes online, the persistence module loads the subscriptions of the client from Redis or Databases.
List of Persistence Plugins
EMQ X allowes storing messages in Redis, MySQL, PostgreSQL, MongoDB, Cassandra, DynamoDB, InfluxDB, OpenTSDB and Timescale:
Persistence Plugins | Config File | Description |
---|---|---|
emqx_backend_redis | emqx_backend_redis.conf | Redis Message Persistence |
emqx_backend_mysql | emqx_backend_mysql.conf | MySQL Message Persistence |
emqx_backend_pgsql | emqx_backend_pgsql.conf | PostgreSQL Message Persistence |
emqx_backend_mongo | emqx_backend_mongo.conf | MongoDB Message Persistence |
emqx_backend_cassa | emqx_backend_cassa.conf | Cassandra Message Persistence |
emqx_backend_dynamo | emqx_backend_dynamo.conf | DynamoDB Message Persistence |
emqx_backend_influxdb | emqx_backend_influxdb.conf | InfluxDB Message Persistence |
emqx_backend_opentsdb | emqx_backend_opentsdb.conf | OpenTSDB Message Persistence |
emqx_backend_timescale | emqx_backend_timescale.conf | Timescale Message Persistence |
Redis Backend
Config file: emqx_backend_redis.conf
Configure the Redis Server
Config Connection Pool of Multiple Redis Servers:
## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379
## Redis Sentinel
## backend.redis.server = 127.0.0.1:26378
##Redis Sentinel Cluster name
## backend.redis.sentinel = mymaster
## Redis Pool Size
backend.redis.pool1.pool_size = 8
## Redis database
backend.redis.pool1.database = 1
## Redis subscribe channel
backend.redis.pool1.channel = mqtt_channel
Configure Persistence Hooks
## Expired after seconds, if =< 0 take the default value
backend.redis.msg.expired_after = 3600
## Client Connected Record
backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message for one QOS > 0
backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
## Lookup Unread Message for many QOS > 0
backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
## Lookup Retain Message
backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack for one
backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
## Store Ack for many
backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
Description of Persistence Hooks
hook | topic | action/function | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribe to topics | |
client.disconnected | on_client_disconnected | Store the client disconnected state | |
session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one to one offline message |
session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one to many offline message |
session.subscribed | # | on_retain_lookup | Lookup retained message |
message.publish | # | on_message_publish | Store the published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | queue/# | on_message_acked_for_queue | Process ACK of one to one messages |
message.acked | pubsub/# | on_message_acked_for_pubsub | Process ACK of one to many messages |
Redis Command Line Parameters
hook | Parameter | Example (Fields separated exactly by one space) |
---|---|---|
client.connected | clientid | SET conn:${clientid} clientid |
client.disconnected | clientid | SET disconn:${clientid} clientid |
session.subscribed | clientid, topic, qos | HSET sub:${clientid} topic qos |
session.unsubscribed | clientid, topic | SET unsub:${clientid} topic |
message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} msgid |
message.acked | msgid, topic, clientid | HSET ack:${clientid} topic msgid |
message.delivered | msgid, topic, clientid | HSET delivered:${clientid} topic msgid |
Configure 'action' with Redis Commands
Redis backend supports raw 'commands' in 'action', e.g.:
## After a client connected to the EMQ X server, it executes a redis command (multiple redis commands also supported)
backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}
Using Redis Hash for Devices' Connection State
mqtt:client Hash for devices' connection state:
hmset
key = mqtt:client:${clientid}
value = {state:int, online_at:timestamp, offline_at:timestamp}
hset
key = mqtt:node:${node}
field = ${clientid}
value = ${ts}
Lookup devices' connection state:
HGETALL "mqtt:client:${clientId}"
E.g.: Client with ClientId 'test' goes online:
HGETALL mqtt:client:test
1) "state"
2) "1"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "undefined"
Client with ClientId 'test' goes offline:
HGETALL mqtt:client:test
1) "state"
2) "0"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "1481685924"
Using Redis Hash for Retained Messages
mqtt:retain Hash for retained messages:
hmset
key = mqtt:retain:${topic}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
Lookup retained message:
HGETALL "mqtt:retain:${topic}"
Lookup retained messages with a topic of 'retain':
HGETALL mqtt:retain:topic
1) "id"
> - 2) "6P9NLcJ65VXBbC22sYb4"
>
> 3) "from"
>
> - 4) "test"
>
> 5) "qos"
> 6) "1"
> 7) "topic"
> 8) "topic"
> 9) "retain"
>
> - 10\) "true"
>
> 11) "payload"
> 12) "Hello world\!"
> 13) "ts"
> 14) "1481690659"
Using Redis Hash for messages
mqtt:msg Hash for MQTT messages:
hmset
key = mqtt:msg:${msgid}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
zadd
key = mqtt:msg:${topic}
field = 1
value = ${msgid}
Using Redis Set for Message Acknowledgements
mqtt:acked SET stores acknowledgements from the clients:
set
key = mqtt:acked:${clientid}:${topic}
value = ${msgid}
Using Redis Hash for Subscription
mqtt:sub Hash for Subscriptions:
hset
key = mqtt:sub:${clientid}
field = ${topic}
value = ${qos}
A client subscribes to a topic:
HSET mqtt:sub:${clientid} ${topic} ${qos}
A client with ClientId of 'test' subscribes to topic1 and topic2:
HSET "mqtt:sub:test" "topic1" 1
HSET "mqtt:sub:test" "topic2" 2
Lookup the subscribed topics of client with ClientId of 'test':
HGETALL mqtt:sub:test
1) "topic1"
2) "1"
3) "topic2"
4) "2"
Redis SUB/UNSUB Publish
When a device subscribes / unsubscribes to topics, EMQ X broker publish an event to the Redis:
PUBLISH
channel = "mqtt_channel"
message = {type: string , topic: string, clientid: string, qos: int}
\*type: [subscribe/unsubscribe]
client with ClientID 'test' subscribe to 'topic0':
PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"
Client with ClientId 'test' unsubscribes to 'test_topic0':
PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
Enable Redis Backend
./bin/emqx_ctl plugins load emqx_backend_redis
MySQL Backend
Config file: emqx_backend_mysql.conf
Configure MySQL Server
Connection pool of multiple MySQL servers is supported:
## Mysql Server
backend.mysql.pool1.server = 127.0.0.1:3306
## Mysql Pool Size
backend.mysql.pool1.pool_size = 8
## Mysql Username
backend.mysql.pool1.user = root
## Mysql Password
backend.mysql.pool1.password = public
## Mysql Database
backend.mysql.pool1.database = mqtt
## Max number of fetch offline messages. Without count limit if infinity
## backend.mysql.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.mysql.time_range = 2h
Configure MySQL Persistence Hooks
## Client Connected Record
backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.mysql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.mysql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.mysql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.mysql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.mysql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.mysql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
### "offline_opts": Get configuration for offline messages
### max_returned_count: Maximum number of offline messages get at a time
### time_range: Get only messages in the current time range
## backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.mysql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
Description of MySQL Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
SQL Parameters Description
hook | Parameters | Example (${name} represents available parameter) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
Configure 'action' with SQL
MySQL backend supports SQL in 'action':
## After a client is connected to the EMQ X server, it executes a SQL command (multiple SQL commands also supported)
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
Create MySQL DB
create database mqtt;
Import MySQL DB & Table Schema
mysql -u root -p mqtt < etc/sql/emqx_backend_mysql.sql
TIP
DB name is free of choice
MySQL Client Connection Table
mqtt_client stores client connection states:
DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`state` varchar(3) DEFAULT NULL,
`node` varchar(100) DEFAULT NULL,
`online_at` datetime DEFAULT NULL,
`offline_at` datetime DEFAULT NULL,
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_client_idx` (`clientid`),
UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query the client connection state:
select * from mqtt_client where clientid = ${clientid};
If client 'test' is online:
select * from mqtt_client where clientid = "test";
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node | online_at | offline_at | created |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| 1 | test | 1 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL | 2016-12-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)
If client 'test' is offline:
select * from mqtt_client where clientid = "test";
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node | online_at | offline_at | created |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| 1 | test | 0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)
MySQL Subscription Table
mqtt_sub table stores MQTT subscriptions of clients:
DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`topic` varchar(255) DEFAULT NULL,
`qos` int(3) DEFAULT NULL,
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
E.g., client 'test' subscribes to 'test_topic1' and 'test_topic2':
insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic1", 1);
insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic2", 2);
Query subscription of a client:
select * from mqtt_sub where clientid = ${clientid};
E.g., query the Subscription of client 'test':
select * from mqtt_sub where clientid = "test";
+----+--------------+-------------+------+---------------------+
| id | clientId | topic | qos | created |
+----+--------------+-------------+------+---------------------+
| 1 | test | test_topic1 | 1 | 2016-12-24 17:09:05 |
| 2 | test | test_topic2 | 2 | 2016-12-24 17:12:51 |
+----+--------------+-------------+------+---------------------+
2 rows in set (0.00 sec)
MySQL Message Table
mqtt_msg stores MQTT messages:
DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`msgid` varchar(100) DEFAULT NULL,
`topic` varchar(1024) NOT NULL,
`sender` varchar(1024) DEFAULT NULL,
`node` varchar(60) DEFAULT NULL,
`qos` int(11) NOT NULL DEFAULT '0',
`retain` tinyint(2) DEFAULT NULL,
`payload` blob,
`arrived` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query messages published by a client:
select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
select * from mqtt_msg where sender = "test";
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| id | msgid | topic | sender | node | qos | retain | payload | arrived |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| 1 | 53F98F80F66017005000004A60003 | hello | test | NULL | 1 | 0 | hello | 2016-12-24 17:25:12 |
| 2 | 53F98F9FE42AD7005000004A60004 | world | test | NULL | 1 | 0 | world | 2016-12-24 17:25:45 |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
2 rows in set (0.00 sec)
MySQL Retained Message Table
mqtt_retain stores retained messages:
DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`topic` varchar(200) DEFAULT NULL,
`msgid` varchar(60) DEFAULT NULL,
`sender` varchar(100) DEFAULT NULL,
`node` varchar(100) DEFAULT NULL,
`qos` int(2) DEFAULT NULL,
`payload` blob,
`arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query retained messages:
select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
select * from mqtt_retain where topic = "retain";
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
| id | topic | msgid | sender | node | qos | payload | arrived |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
| 1 | retain | 53F33F7E4741E7007000004B70001 | test | NULL | 1 | www | 2016-12-24 16:55:18 |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
> 1 rows in set (0.00 sec)
MySQL Acknowledgement Table
mqtt_acked stores acknowledgements from the clients:
DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(200) DEFAULT NULL,
`topic` varchar(200) DEFAULT NULL,
`mid` int(200) DEFAULT NULL,
`created` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Enable MySQL Backend
./bin/emqx_ctl plugins load emqx_backend_mysql
PostgreSQL Backend
Config file: emqx_backend_pgsql.conf
Configure PostgreSQL Server
Connection pool of multiple PostgreSQL servers is supported:
## Pgsql Server
backend.pgsql.pool1.server = 127.0.0.1:5432
## Pgsql Pool Size
backend.pgsql.pool1.pool_size = 8
## Pgsql Username
backend.pgsql.pool1.username = root
## Pgsql Password
backend.pgsql.pool1.password = public
## Pgsql Database
backend.pgsql.pool1.database = mqtt
## Pgsql Ssl
backend.pgsql.pool1.ssl = false
## Max number of fetch offline messages. Without count limit if infinity
## backend.pgsql.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.pgsql.time_range = 2h
Configure PostgreSQL Persistence Hooks
## Client Connected Record
backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.pgsql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
### "offline_opts": Get configuration for offline messages
### max_returned_count: Maximum number of offline messages get at a time
### time_range: Get only messages in the current time range
## backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.pgsql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
Description of PostgreSQL Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
SQL Parameters Description
hook | Parameters | Example (${name} represents available parameter) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
Configure 'action' with SQL
PostgreSQL backend supports SQL in 'action':
## After a client is connected to the EMQ X server, it executes a SQL command (multiple command also supported)
backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
Create PostgreSQL DB
createdb mqtt -E UTF8 -e
Import PostgreSQL DB & Table Schema
\i etc/sql/emqx_backend_pgsql.sql
TIP
DB name is free of choice
PostgreSQL Client Connection Table
mqtt_client stores client connection states:
CREATE TABLE mqtt_client(
id SERIAL primary key,
clientid character varying(100),
state integer,
node character varying(100),
online_at timestamp,
offline_at timestamp,
created timestamp without time zone,
UNIQUE (clientid)
);
Query a client's connection state:
select * from mqtt_client where clientid = ${clientid};
E.g., if client 'test' is online:
select * from mqtt_client where clientid = 'test';
id | clientid | state | node | online_at | offline_at | created
----+----------+-------+----------------+---------------------+---------------------+---------------------
1 | test | 1 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL | 2016-12-24 09:40:22
(1 rows)
Client 'test' is offline:
select * from mqtt_client where clientid = 'test';
id | clientid | state | nod | online_at | offline_at | created
----+----------+-------+----------------+---------------------+---------------------+---------------------
1 | test | 0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22
(1 rows)
PostgreSQL Subscription Table
mqtt_sub stores subscriptions of clients:
CREATE TABLE mqtt_sub(
id SERIAL primary key,
clientid character varying(100),
topic character varying(200),
qos integer,
created timestamp without time zone,
UNIQUE (clientid, topic)
);
E.g., client 'test' subscribes to topic 'test_topic1' and 'test_topic2':
insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);
Query subscription of a client:
select * from mqtt_sub where clientid = ${clientid};
Query subscription of client 'test':
select * from mqtt_sub where clientid = 'test';
id | clientId | topic | qos | created
----+--------------+-------------+------+---------------------
1 | test | test_topic1 | 1 | 2016-12-24 17:09:05
2 | test | test_topic2 | 2 | 2016-12-24 17:12:51
(2 rows)
PostgreSQL Message Table
mqtt_msg stores MQTT messages:
CREATE TABLE mqtt_msg (
id SERIAL primary key,
msgid character varying(60),
sender character varying(100),
topic character varying(200),
qos integer,
retain integer,
payload text,
arrived timestamp without time zone
);
Query messages published by a client:
select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
select * from mqtt_msg where sender = 'test';
id | msgid | topic | sender | node | qos | retain | payload | arrived
----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------
1 | 53F98F80F66017005000004A60003 | hello | test | NULL | 1 | 0 | hello | 2016-12-24 17:25:12
2 | 53F98F9FE42AD7005000004A60004 | world | test | NULL | 1 | 0 | world | 2016-12-24 17:25:45
(2 rows)
PostgreSQL Retained Message Table
mqtt_retain stores retained messages:
CREATE TABLE mqtt_retain(
id SERIAL primary key,
topic character varying(200),
msgid character varying(60),
sender character varying(100),
qos integer,
payload text,
arrived timestamp without time zone,
UNIQUE (topic)
);
Query retained messages:
select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
select * from mqtt_retain where topic = 'retain';
id | topic | msgid | sender | node | qos | payload | arrived
----+----------+-------------------------------+---------+------+------+---------+---------------------
1 | retain | 53F33F7E4741E7007000004B70001 | test | NULL | 1 | www | 2016-12-24 16:55:18
(1 rows)
PostgreSQL Acknowledgement Table
mqtt_acked stores acknowledgements from the clients:
CREATE TABLE mqtt_acked (
id SERIAL primary key,
clientid character varying(100),
topic character varying(100),
mid integer,
created timestamp without time zone,
UNIQUE (clientid, topic)
);
Enable PostgreSQL Backend
./bin/emqx_ctl plugins load emqx_backend_pgsql
MongoDB Backend
Config file: emqx_backend_mongo.conf
Configure MongoDB Server
Connection pool of multiple MongoDB servers is supported:
## MongoDB Server Pools
## Mongo Topology Type single|unknown|sharded|rs
backend.mongo.pool1.type = single
## If type rs, need config setname
## backend.mongo.pool1.rs_set_name = testrs
## Mongo Server 127.0.0.1:27017,127.0.0.2:27017...
backend.mongo.pool1.server = 127.0.0.1:27017
## MongoDB Pool Size
backend.mongo.pool1.c_pool_size = 8
## MongoDB Database
backend.mongo.pool1.database = mqtt
## Mongo User
## backend.mongo.pool1.login = emqtt
## Mongo Password
## backend.mongo.pool1.password = emqtt
## MongoDB AuthSource
## Value: String
## Default: mqtt
## backend.mongo.pool1.auth_source = admin
## Whether to enable SSL connection.
##
## Value: true | false
## backend.mongo.pool1.ssl = false
## SSL keyfile.
##
## Value: File
## backend.mongo.pool1.keyfile =
## SSL certfile.
##
## Value: File
## backend.mongo.pool1.certfile =
## SSL cacertfile.
##
## Value: File
## backend.mongo.pool1.cacertfile =
# Value: unsafe | safe
## backend.mongo.pool1.w_mode = safe
## Value: master | slave_ok
## backend.mongo.pool1.r_mode = slave_ok
## Mongo Topology Options
## backend.mongo.topology.pool_size = 1
## backend.mongo.topology.max_overflow = 0
## backend.mongo.topology.overflow_ttl = 1000
## backend.mongo.topology.overflow_check_period = 1000
## backend.mongo.topology.local_threshold_ms = 1000
## backend.mongo.topology.connect_timeout_ms = 20000
## backend.mongo.topology.socket_timeout_ms = 100
## backend.mongo.topology.server_selection_timeout_ms = 30000
## backend.mongo.topology.wait_queue_timeout_ms = 1000
## backend.mongo.topology.heartbeat_frequency_ms = 10000
## backend.mongo.topology.min_heartbeat_frequency_ms = 1000
## Max number of fetch offline messages. Without count limit if infinity
## backend.mongo.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.mongo.time_range = 2h
Configure MongoDB Persistence Hooks
## Client Connected Record
backend.mongo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.mongo.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.mongo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0, payload_format options mongo_json | plain_text
backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}
## Store Retain Message, payload_format options mongo_json | plain_text
backend.mongo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1", "payload_format": "mongo_json"}
## Delete Retain Message
backend.mongo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.mongo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
### "offline_opts": Get configuration for offline messages
### max_returned_count: Maximum number of offline messages get at a time
### time_range: Get only messages in the current time range
## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
Description of MongoDB Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
Create MongoDB DB & Collections
use mqtt
db.createCollection("mqtt_client")
db.createCollection("mqtt_sub")
db.createCollection("mqtt_msg")
db.createCollection("mqtt_retain")
db.createCollection("mqtt_acked")
db.mqtt_client.ensureIndex({clientid:1, node:2})
db.mqtt_sub.ensureIndex({clientid:1})
db.mqtt_msg.ensureIndex({sender:1, topic:2})
db.mqtt_retain.ensureIndex({topic:1})
TIP
DB name is free of choice
MongoDB MQTT Client Collection
mqtt_client stores MQTT clients' connection states:
{
clientid: string,
state: 0,1, //0 disconnected 1 connected
node: string,
online_at: timestamp,
offline_at: timestamp
}
Query client's connection state:
db.mqtt_client.findOne({clientid: ${clientid}})
E.g., if client 'test' is online:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 1,
"node" : "emqx@127.0.0.1",
"online_at" : 1482976411,
"offline_at" : null
}
Client 'test' is offline:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 0,
"node" : "emq@127.0.0.1",
"online_at" : 1482976411,
"offline_at" : 1482976501
}
MongoDB Subscription Collection
mqtt_sub stores subscriptions of clients:
{
clientid: string,
topic: string,
qos: 0,1,2
}
E.g., client 'test' subscribes to topic 'test_topic1' and 'test_topic2':
db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})
Query subscription of client 'test':
db.mqtt_sub.find({clientid: "test"})
{ "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
{ "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }
MongoDB Message Collection
mqtt_msg stores MQTT messages:
{
_id: int,
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
retain: boolean (true, false),
payload: string,
arrived: timestamp
}
Query messages published by a client:
db.mqtt_msg.find({sender: ${clientid}})
Query messages published by client 'test':
db.mqtt_msg.find({sender: "test"})
{
"_id" : 1,
"topic" : "/World",
"msgid" : "AAVEwm0la4RufgAABeIAAQ==",
"sender" : "test",
"qos" : 1,
"retain" : 1,
"payload" : "Hello world!",
"arrived" : 1482976729
}
MongoDB Retained Message Collection
mqtt_retain stores retained messages:
{
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
payload: string,
arrived: timestamp
}
Query retained messages:
db.mqtt_retain.findOne({topic: ${topic}})
Query retained messages with topic 'retain':
db.mqtt_retain.findOne({topic: "/World"})
{
"_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
"topic" : "/World",
"msgid" : "AAVEwm0la4RufgAABeIAAQ==",
"sender" : "c1",
"qos" : 1,
"payload" : "Hello world!",
"arrived" : 1482976729
}
MongoDB Acknowledgement Collection
mqtt_acked stores acknowledgements from the clients:
{
clientid: string,
topic: string,
mongo_id: int
}
Enable MongoDB Backend
./bin/emqx_ctl plugins load emqx_backend_mongo
Cassandra Backend
Config file: etc/plugins/emqx_backend_cassa.conf
Configure Cassandra Cluster
Multi node Cassandra cluster is supported:
## Cassandra Node
backend.ecql.pool1.nodes = 127.0.0.1:9042
## Cassandra Pool Size
backend.ecql.pool1.size = 8
## Cassandra auto reconnect flag
backend.ecql.pool1.auto_reconnect = 1
## Cassandra Username
backend.ecql.pool1.username = cassandra
## Cassandra Password
backend.ecql.pool1.password = cassandra
## Cassandra Keyspace
backend.ecql.pool1.keyspace = mqtt
## Cassandra Logger type
backend.ecql.pool1.logger = info
## Max number of fetch offline messages. Without count limit if infinity
## backend.cassa.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.cassa.time_range = 2h
Configure Cassandra Persistence Hooks
## Client Connected Record
backend.cassa.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.cassa.hook.client.connected.2 = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.cassa.hook.session.subscribed.2 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.cassa.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Delete Acked Record
backend.cassa.hook.session.unsubscribed.1= {"topic": "#", action": {"cql": ["delete from acked where client_id = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
## Store Retain Message
backend.cassa.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.cassa.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.cassa.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
### "offline_opts": Get configuration for offline messages
### max_returned_count: Maximum number of offline messages get at a time
### time_range: Get only messages in the current time range
## backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.cassa.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
Description of Cassandra Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
CQL Parameters Description
Customized CQL command parameters includes:
hook | Parameter | Example (${name} in CQL represents available parameter |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
Configure 'action' with CQL
Cassandra backend supports CLQ in 'action':
## After a client is connected to the EMQ X server, it executes a CQL command(multiple command also supported):
backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
Initializing Cassandra
Create KeySpace:
CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USR mqtt;
Import Cassandra tables:
cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"
TIP
KeySpace is free of choice
Cassandra Client Connection Table
mqtt.client stores client connection states:
CREATE TABLE mqtt.client (
client_id text,
node text,
state int,
connected timestamp,
disconnected timestamp,
PRIMARY KEY(client_id)
);
Query a client's connection state:
select * from mqtt.client where clientid = ${clientid};
If client 'test' is online:
select * from mqtt.client where clientid = 'test';
client_id | connected | disconnected | node | state
-----------+---------------------------------+---------------+---------------+-------
test | 2017-02-14 08:27:29.872000+0000 | null | emqx@127.0.0.1| 1
Client 'test' is offline:
select * from mqtt.client where clientid = 'test';
client_id | connected | disconnected | node | state
-----------+---------------------------------+---------------------------------+---------------+-------
test | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1| 0
Cassandra Subscription Table
mqtt.sub stores subscriptions of clients:
CREATE TABLE mqtt.sub (
client_id text,
topic text,
qos int,
PRIMARY KEY(client_id, topic)
);
Client 'test' subscribes to topic 'test_topic1' and 'test_topic2':
insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic2', 2);
Query subscriptions of a client:
select * from mqtt_sub where clientid = ${clientid};
Query subscriptions of client 'test':
select * from mqtt_sub where clientid = 'test';
client_id | topic | qos
-----------+-------------+-----
test | test_topic1 | 1
test | test_topic2 | 2
Cassandra Message Table
mqtt.msg stores MQTT messages:
CREATE TABLE mqtt.msg (
topic text,
msgid text,
sender text,
qos int,
retain int,
payload text,
arrived timestamp,
PRIMARY KEY(topic, msgid)
) WITH CLUSTERING ORDER BY (msgid DESC);
Query messages published by a client:
select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
select * from mqtt_msg where sender = 'test';
topic | msgid | arrived | payload | qos | retain | sender
-------+----------------------+---------------------------------+--------------+-----+--------+--------
hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
Cassandra Retained Message Table
mqtt.retain stores retained messages:
CREATE TABLE mqtt.retain (
topic text,
msgid text,
PRIMARY KEY(topic)
);
Query retained messages:
select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
select * from mqtt_retain where topic = 'retain';
topic | msgid
--------+----------------------
retain | 2PguFrHsrzEvIIBdctmb
Cassandra Acknowledgement Table
mqtt.acked stores acknowledgements from the clients:
CREATE TABLE mqtt.acked (
client_id text,
topic text,
msgid text,
PRIMARY KEY(client_id, topic)
);
Enable Cassandra Backend
./bin/emqx_ctl plugins load emqx_backend_cassa
DynamoDB Backend
Configure DynamoDB Cluster
Config file: etc/plugins/emqx_backend_dynamo.conf
## DynamoDB Region
backend.dynamo.region = us-west-2
## DynamoDB Server
backend.dynamo.pool1.server = http://localhost:8000
## DynamoDB Pool Size
backend.dynamo.pool1.pool_size = 8
## AWS ACCESS KEY ID
backend.dynamo.pool1.aws_access_key_id = AKIAU5IM2XOC7AQWG7HK
## AWS SECRET ACCESS KEY
backend.dynamo.pool1.aws_secret_access_key = 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
## DynamoDB Backend Hooks
backend.dynamo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.dynamo.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
backend.dynamo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
backend.dynamo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
backend.dynamo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
backend.dynamo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
backend.dynamo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
# backend.dynamo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
Description of DynamoDB Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch_for_queue | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked_for_queue | Process ACK |
Create DynamoDB DB
./test/dynamo_test.sh
TIP
DB name is free of choice
DynamoDB Client Connection Table
mqtt_client stores client connection states:
{
"TableName": "mqtt_client",
"KeySchema": [
{ "AttributeName": "clientid", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "clientid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query the client connection state:
aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [
{
"offline_at": { "N": "0" },
"node": { "S": "emqx@127.0.0.1" },
"clientid": { "S": "mqttjs_384b9c73a9" },
"connect_state": { "N": "1" },
"online_at": { "N": "1562224940" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
DynamoDB Subscription Table
mqtt_sub table stores MQTT subscriptions of clients:
{
"TableName": "mqtt_sub",
"KeySchema": [
{ "AttributeName": "clientid", "KeyType": "HASH" },
{ "AttributeName": "topic", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "clientid", "AttributeType": "S" },
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query topics subscribed by the client named "test-dynamo":
aws dynamodb scan --table-name mqtt_sub --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub" }, "clientid": { "S": "test-dynamo" }},
{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-1"}, "clientid": { "S": "test-dynamo" }},
{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-2"}, "clientid": { "S": "test-dynamo" }}],
"Count": 3,
"ScannedCount": 3,
"ConsumedCapacity": null
}
DynamoDB Message Table
mqtt_msg stores MQTT messages:
{
"TableName": "mqtt_msg",
"KeySchema": [
{ "AttributeName": "msgid", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "msgid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
mqtt_topic_msg_map stores the mapping between topics and messages:
{
"TableName": "mqtt_topic_msg_map",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_msg and mqtt_topic_msg_map after a client publishes a message to the "test" topic:
Query mqtt_msg:
aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
> - {
>
> - "Items": \[
>
> - {
> "arrived": { "N": "1562308553" }, "qos": { "N": "1" },
> "sender": { "S": "mqttjs\_231b962d5c" }, "payload": { "S":
> "{ "msg": "Hello, World\!" }"}, "retain": { "N": "0" },
> "msgid": { "S":
> "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
> "topic": { "S": "test" }
>
> }
>
> \], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
>
> }
Query mqtt_topic_msg_map:
aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2 --endpoint-url http://localhost:8000
> - {
>
> - "Items": \[
>
> - {
> "topic": { "S": "test" }, "MsgId": { "SS": \[
> "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" \]}
>
> }
>
> \], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
>
> }
DynamoDB Retained Message Table
mqtt_retain stores retained messages:
{
"TableName": "mqtt_retain",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_retain after a client publishes a message to the "test" topic:
{
"Items": [
{
"arrived": { "N": "1562312113" },
"qos": { "N": "1" },
"sender": { "S": "mqttjs_d0513acfce" },
"payload": { "S": "test" },
"retain": { "N": "1" },
"msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
"topic": { "S": "testtopic" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
DynamoDB Acknowledgement Table
mqtt_acked stores acknowledgements from the clients:
{
"TableName": "mqtt_acked",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" },
{ "AttributeName": "clientid", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" },
{ "AttributeName": "clientid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_acked after a client publishes a message to the "test" topic:
{
"Items": [
{
"topic": { "S": "test" },
"msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
"clientid": { "S": "mqttjs_861e582a70" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
Enable DynamoDB Backend
./bin/emqx_ctl plugins load emqx_backend_dynamo
InfluxDB Backend
Configure InfluxDB Server
Config file: etc/plugins/emqx_backend_influxdb.conf:
## InfluxDB UDP Server
backend.influxdb.pool1.server = 127.0.0.1:8089
## InfluxDB Pool Size
backend.influxdb.pool1.pool_size = 5
## Wether to add timestamp automatically
backend.influxdb.pool1.set_timestamp = true
backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Parameters in hook rule:
Option | Description |
---|---|
topic | Configure which topics need to execute hooks |
action | Configure specific action for hook, function is a built-in function provided as Backend for general functions |
pool | Pool Name, used to connect multiple InfluxDB servers |
Example:
## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Description of InfluxDB Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
message.publish | # | on_message_publish | Store published messages |
Since MQTT Message cannot be written directly to InfluxDB, InfluxDB Backend provides an emqx_backend_influxdb.tmpl template file to convert MQTT Message to DataPoint that can be written to InfluxDB.
Template file use Json format:
key
- MQTT Topic, Json String, support wildcard charactersvalue
- Template, Json Object, used to convert MQTT Message intomeasurement,tag_key=tag_value,... field_key=field_value,... timestamp
and write to InfluxDB。
You can define different templates for different topics or multiple templates for the same topic, likes:
{
<Topic 1>: <Template 1>,
<Topic 2>: <Template 2>
}
Template format:
{
"measurement": <Measurement>,
"tags": {
<Tag Key>: <Tag Value>
},
"fields": {
<Field Key>: <Field Value>
},
"timestamp": <Timestamp>
}
measurement
and fields
are required options, tags
and timestamp
are optional.
All values (such as <Measurement>
) can be configured directly in the template as a fixed value that data types supported depending on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.
Currently, we support placeholders as follows:
Placeholder | Description |
---|---|
$id | MQTT Message UUID, assigned by EMQ X |
$clientid | Client ID used by the Client |
$username | Username used by the Client |
$peerhost | IP of Client |
$qos | QoS of MQTT Message |
$topic | Topic of MQTT Message |
$payload | Payload of MQTT Message, must be valid Json data |
$<Number> | It must be used with $paylaod to retrieve data from Json Array |
$timestamp | The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds |
$payload and $<Number>:
You can directly use $content
to obtain the complete message payload, you can use ["$payload", <Key>, ...]
to get the data inside the message payload.
For example payload
is {"data": {"temperature": 23.9}}
, you can via ["$payload", "data", "temperature"]
to get 23.9
.
In the case of array data type in Json, we introduced $0
and $<pos_integer>
, $0
means to get all elements in the array, and $<pos_integer>
means to get the <pos_integer> th element in the array.
A simple example, ["$payload", "$0", "temp"]
will get [20, 21]
from [{"temp": 20}, {"temp": 21}]
, and ["$payload", "$1", "temp"]
will only get 20
.
It is worth noting that when you use $0
, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into InfluxDB, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.
Example
data/templates directory provides a sample template (emqx_backend_influxdb_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:
{
"sample": {
"measurement": "$topic",
"tags": {
"host": ["$payload", "data", "$0", "host"],
"region": ["$payload", "data", "$0", "region"],
"qos": "$qos",
"clientid": "$clientid"
},
"fields": {
"temperature": ["$payload", "data", "$0", "temp"]
},
"timestamp": "$timestamp"
}
}
When an MQTT Message whose Topic is "sample" has the following Payload:
{
"data": [
{
"temp": 1,
"host": "serverA",
"region": "hangzhou"
},
{
"temp": 2,
"host": "serverB",
"region": "ningbo"
}
]
}
Backend converts MQTT messages to:
[
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverA",
"qos": "0",
"region": "hangzhou",
},
"fields": {
"temperature": "1"
},
"timestamp": "1560743513626681000"
},
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverB",
"qos": "0",
"region": "ningbo",
},
"fields": {
"temperature": "2"
},
"timestamp": "1560743513626681000"
}
]
The data was finally encoded and written to InfluxDB as follows:
"sample,clientid=mqttjs_6990f0e886,host=serverA,qos=0,region=hangzhou temperature=\"1\" 1560745505429670000\nsample,clientid=mqttjs_6990f0e886,host=serverB,qos=0,region=ningbo temperature=\"2\" 1560745505429670000\n"
Enable InfluxDB Backend
./bin/emqx_ctl plugins load emqx_backend_influxdb
OpenTSDB Backend
Configure OpenTSDB Server
Config file: etc/plugins/emqx_backend_opentsdb.conf:
## OpenTSDB Server
backend.opentsdb.pool1.server = 127.0.0.1:4242
## OpenTSDB Pool Size
backend.opentsdb.pool1.pool_size = 8
## Whether to return summary info
backend.opentsdb.pool1.summary = true
## Whether to return detailed info
##
## Value: true | false
backend.opentsdb.pool1.details = false
## Synchronous write or not
##
## Value: true | false
backend.opentsdb.pool1.sync = false
## Synchronous write timeout in milliseconds
##
## Value: Duration
##
## Default: 0
backend.opentsdb.pool1.sync_timeout = 0
## Max batch size
##
## Value: Number >= 0
## Default: 20
backend.opentsdb.pool1.max_batch_size = 20
## Store PUBLISH Messages
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Parameters in hook rule:
Option | Description |
---|---|
topic | Configure which topics need to execute hooks |
action | Configure specific action for hook, function is a built-in function provided as Backend for general functions |
pool | Pool Name, used to connect multiple OpenTSDB servers |
Example:
## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Description of OpenTSDB Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
message.publish | # | on_message_publish | Store published messages |
Since MQTT Message cannot be written directly to OpenTSDB, OpenTSDB Backend provides an emqx_backend_opentsdb.tmpl template file to convert MQTT Message to DataPoint that can be written to OpenTSDB.
Template file use Json format:
key
- MQTT Topic, Json String, support wildcard charactersvalue
- Template, Json Object, used to convert MQTT Message intomeasurement,tag_key=tag_value,... field_key=field_value,... timestamp
and write to InfluxDB。
You can define different templates for different topics or multiple templates for the same topic, likes:
{
<Topic 1>: <Template 1>,
<Topic 2>: <Template 2>
}
The template format is as follows:
{
"measurement": <Measurement>,
"tags": {
<Tag Key>: <Tag Value>
},
"value": <Value>,
"timestamp": <Timestamp>
}
measurement
and value
are required options, tags
and timestamp
are optional.
All values (such as <Measurement>
) can be configured directly in the template as a fixed value that data types supported depending on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.
Currently, we support placeholders as follows:
Placeholder | Description |
---|---|
$id | MQTT Message UUID, assigned by EMQ X |
$clientid | Client ID used by the Client |
$username | Username used by the Client |
$peerhost | IP of Client |
$qos | QoS of MQTT Message |
$topic | Topic of MQTT Message |
$payload | Payload of MQTT Message, must be valid Json data |
$<Number> | It must be used with $paylaod to retrieve data from Json Array |
$timestamp | The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds |
$payload and $<Number>:
You can directly use $content
to obtain the complete message payload, you can use ["$payload", <Key>, ...]
to get the data inside the message payload.
For example payload
is {"data": {"temperature": 23.9}}
, you can via ["$payload", "data", "temperature"]
to get 23.9
.
In the case of array data type in Json, we introduced $0
and $<pos_integer>
, $0
means to get all elements in the array, and $<pos_integer>
means to get the <pos_integer>th element in the array.
A simple example, ["$payload", "$0", "temp"]
will get [20, 21]
from [{"temp": 20}, {"temp": 21}]
, and ["$payload", "$1", "temp"]
will only get 20
.
It is worth noting that when you use $0
, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into OpenTSDB, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.
Example
data/templates directory provides a sample template (emqx_backend_opentsdb_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:
{
"sample": {
"measurement": "$topic",
"tags": {
"host": ["$payload", "data", "$0", "host"],
"region": ["$payload", "data", "$0", "region"],
"qos": "$qos",
"clientid": "$clientid"
},
"value": ["$payload", "data", "$0", "temp"],
"timestamp": "$timestamp"
}
}
When an MQTT Message whose Topic is "sample" has the following Payload:
{
"data": [
{
"temp": 1,
"host": "serverA",
"region": "hangzhou"
},
{
"temp": 2,
"host": "serverB",
"region": "ningbo"
}
]
}
Backend converts MQTT messages into the following data and writes it to OpenTSDB:
[
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverA",
"qos": "0",
"region": "hangzhou",
},
"value": "1",
"timestamp": "1560743513626681000"
},
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverB",
"qos": "0",
"region": "ningbo",
},
"value": "2",
"timestamp": "1560743513626681000"
}
]
Enable OpenTSDB Backend
./bin/emqx_ctl plugins load emqx_backend_opentsdb
Timescale Backend
Configure Timescale Server
Config file: etc/plugins/emqx_backend_timescale.conf:
## Timescale Server
backend.timescale.pool1.server = 127.0.0.1:5432
## Timescale Pool Size
backend.timescale.pool1.pool_size = 8
## Timescale Username
backend.timescale.pool1.username = postgres
## Timescale Password
backend.timescale.pool1.password = password
## Timescale Database
backend.timescale.pool1.database = tutorial
## Timescale SSL
backend.timescale.pool1.ssl = false
## SSL keyfile.
##
## Value: File
## backend.timescale.pool1.keyfile =
## SSL certfile.
##
## Value: File
## backend.timescale.pool1.certfile =
## SSL cacertfile.
##
## Value: File
## backend.timescale.pool1.cacertfile =
## Store Publish Message
backend.timescale.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Parameters in hook rule:
Option | Description |
---|---|
topic | Configure which topics need to execute hooks |
action | Configure specific action for hook, function is a built-in function provided as Backend for general functions |
pool | Pool Name, used to connect multiple Timescale servers |
Example:
## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Description of Timescale Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
message.publish | # | on_message_publish | Store published messages |
Timescale Backend provides the template file named emqx_backend_timescale.tmpl
, which is used to extract data from MQTT messages with different topics for writing to Timescale.
Template file use Json format:
key
- MQTT Topic, Json String, support wildcard charactersvalue
- Template, Json Object, used to convert MQTT Message intomeasurement,tag_key=tag_value,... field_key=field_value,... timestamp
and write to InfluxDB。
You can define different templates for different topics or multiple templates for the same topic, likes:
{
<Topic 1>: <Template 1>,
<Topic 2>: <Template 2>
}
The template format is as follows:
{
"name": <Name of template>,
"sql": <SQL INSERT INTO>,
"param_keys": <Param Keys>
}
name
, sql
and param_keys
are required options.
name
can be any string, just make sure there are no duplicates.
sql
is SQL INSERT INTO statement for Timescale, like insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)
.
param_keys
is a array, its first element corresponds to $1
appearing in sql
and so on.
Any element in an array can be a fixed value, and the data type it supports depends on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.
Currently, we support placeholders as follows:
Placeholder | Description |
---|---|
$id | MQTT Message UUID, assigned by EMQ X |
$clientid | Client ID used by the Client |
$username | Username used by the Client |
$peerhost | IP of Client |
$qos | QoS of MQTT Message |
$topic | Topic of MQTT Message |
$payload | Payload of MQTT Message, must be valid Json data |
$<Number> | It must be used with $paylaod to retrieve data from Json Array |
$timestamp | The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds |
$payload and $<Number>:
You can directly use $content
to obtain the complete message payload, you can use ["$payload", <Key>, ...]
to get the data inside the message payload.
For example payload
is {"data": {"temperature": 23.9}}
, you can via ["$payload", "data", "temperature"]
to get 23.9
.
In the case of array data type in Json, we introduced $0
and $<pos_integer>
, $0
means to get all elements in the array, and $<pos_integer>
means to get the <pos_integer> th element in the array.
A simple example, ["$payload", "$0", "temp"]
will get [20, 21]
from [{"temp": 20}, {"temp": 21}]
, and ["$payload", "$1", "temp"]
will only get 20
.
It is worth noting that when you use $0
, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into Timescale, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.
Example
data/templates directory provides a sample template (emqx_backend_timescale_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:
{
"sensor_data": {
"name": "insert_sensor_data",
"sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
"param_keys": [
["$payload", "data", "$0", "location"],
["$payload", "data", "$0", "temperature"],
["$payload", "data", "$0", "humidity"]
]
},
"sensor_data2/#": {
"name": "insert_sensor_data2",
"sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
"param_keys": [
["$payload", "location"],
["$payload", "temperature"],
["$payload", "humidity"]
]
},
"easy_data": {
"name": "insert_easy_data",
"sql": "insert into easy_data(time, data) values (NOW(), $1)",
"param_keys": [
"$payload"
]
}
}
When an MQTT Message whose Topic is "sensor_data" has the following Payload:
{
"data":[
{
"location":"bedroom",
"temperature":21.3,
"humidity":40.3
},
{
"location":"bathroom",
"temperature":22.3,
"humidity":61.8
},
{
"location":"kitchen",
"temperature":29.5,
"humidity":58.7
}
]
}
["$payload", "data", "$0", "location"]
will extract Payload from MQTT Message first.
If the format of Payload is json, backend continue to extract data
from Payload.
And the value of data
is an array, we use $0
to gets all elements in the array.
["$payload", "data", "$0", "location"]
will help us get ["bedroom", "bathroom", "kitchen"]
finally.
Accordingly if you replace $0
with $1
, you get only ["bedroom"]
.
So in this scene, we will get the following SQL statement:
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bedroom', 21.3, 40.3)
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bathroom', 22.3, 61.8)
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'kitchen', 29.5, 58.7)
Eventually Timescale Backend executes these SQL statements to write data to Timescale.