# Redis Backend
TIP
Starting with EMQX 3.1, a rule engine is available as a replacement for plug-ins, and using it is recommended. See "Save data to Redis" for how to set up saving data to Redis in the rule engine.
Config file: emqx_backend_redis.conf
# Configure the Redis Server
Configure connection pools for multiple Redis servers:

    ## Redis Server
    backend.redis.pool1.server = 127.0.0.1:6379

    ## Redis Sentinel
    ## backend.redis.server = 127.0.0.1:26378

    ## Redis Sentinel Cluster name
    ## backend.redis.sentinel = mymaster

    ## Redis Pool Size
    backend.redis.pool1.pool_size = 8

    ## Redis database
    backend.redis.pool1.database = 1

    ## Redis subscribe channel
    backend.redis.pool1.channel = mqtt_channel

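As a rough illustration of how the `backend.redis.<pool>.<setting>` keys group into pools, the following Python sketch reads such lines into one dictionary per pool. It is not how EMQX itself parses the file; `parse_pools` and `POOL_SETTINGS` are hypothetical names introduced here, and the list of settings is taken only from the example above.

```python
from collections import defaultdict

# Settings that appear per pool in the example configuration (assumed list).
POOL_SETTINGS = {"server", "pool_size", "database", "channel"}


def parse_pools(conf_text):
    """Group `backend.redis.<pool>.<setting> = <value>` lines by pool name."""
    pools = defaultdict(dict)
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments (the file marks comments with `##`).
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        parts = key.strip().split(".")
        # Keep only keys shaped like backend.redis.<pool>.<known setting>.
        if (
            len(parts) == 4
            and parts[:2] == ["backend", "redis"]
            and parts[3] in POOL_SETTINGS
        ):
            pools[parts[2]][parts[3]] = value.strip()
    return dict(pools)


SAMPLE = """
## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379
## Redis Pool Size
backend.redis.pool1.pool_size = 8
## Redis database
backend.redis.pool1.database = 1
## Redis subscribe channel
backend.redis.pool1.channel = mqtt_channel
"""

if __name__ == "__main__":
    for name, settings in parse_pools(SAMPLE).items():
        print(name, settings)
```

A second pool would simply use another prefix such as `backend.redis.pool2.server`, and hook entries then select a pool through their `"pool"` field.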
# Configure Persistence Hooks

    ## Expired after seconds, if =< 0 take the default value
    backend.redis.msg.expired_after = 3600

    ## Client Connected Record
    backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

    ## Subscribe Lookup Record
    backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

    ## Client DisConnected Record
    backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

    ## Lookup Unread Message for one QOS > 0
    backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

    ## Lookup Unread Message for many QOS > 0
    backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

    ## Lookup Retain Message
    backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}

    ## Store Publish Message QOS > 0
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    ## Store Retain Message
    backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

    ## Delete Retain Message
    backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

    ## Store Ack for one
    backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}

    ## Store Ack for many
    backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}

# Description of Persistence Hooks
hook | topic | action/function | Description |
---|---|---|---|
client.connected | | on_client_connected | Store client connected state |
client.connected | | on_subscribe_lookup | Subscribe to topics |
client.disconnected | | on_client_disconnected | Store the client disconnected state |
session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one to one offline message |
session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one to many offline message |
session.subscribed | # | on_retain_lookup | Lookup retained message |
message.publish | # | on_message_publish | Store the published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | queue/# | on_message_acked_for_queue | Process ACK of one to one messages |
message.acked | pubsub/# | on_message_acked_for_pubsub | Process ACK of one to many messages |
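To see which action would apply to a given topic, it can help to sketch the filter matching. The Python below assumes the hook `topic` field follows standard MQTT filter semantics (`+` for one level, `#` for all remaining levels), which this page does not state explicitly; `topic_matches`, `ACKED_HOOKS`, and `actions_for` are hypothetical names used only for illustration.

```python
def topic_matches(topic_filter, topic):
    """Return True if `topic` matches an MQTT-style filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # No '#' was seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


# The two message.acked hooks from the table above, as (filter, function) pairs.
ACKED_HOOKS = [
    ("queue/#", "on_message_acked_for_queue"),
    ("pubsub/#", "on_message_acked_for_pubsub"),
]


def actions_for(topic, hooks=ACKED_HOOKS):
    """List the functions whose topic filter matches `topic`."""
    return [fn for flt, fn in hooks if topic_matches(flt, topic)]


if __name__ == "__main__":
    for t in ("queue/device1", "pubsub/room/1", "other/topic"):
        print(t, actions_for(t))
```

Under this assumption, a topic outside `queue/#` and `pubsub/#` triggers neither acknowledgement action, while the `#` filter used by the `message.publish` hooks matches every topic.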
# Redis Command Line Parameters
hook | Parameter | Example (Fields separated exactly by one space) |
---|---|---|
client.connected | clientid | SET conn:${clientid} clientid |
client.disconnected | clientid | SET disconn:${clientid} clientid |
session.subscribed | clientid, topic, qos | HSET sub:${clientid} topic qos |
session.unsubscribed | clientid, topic | SET unsub:${clientid} topic |
message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} msgid |
message.acked | msgid, topic, clientid | HSET ack:${clientid} topic msgid |
message.delivered | msgid, topic, clientid | HSET delivered:${clientid} topic msgid |
# Configure 'action' with Redis Commands
Redis backend supports raw 'commands' in 'action', e.g.:

    ## After a client connected to the EMQX server, it executes a redis command (multiple redis commands also supported)
    backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}

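The `${...}` placeholders and the single-space field separator can be illustrated with a small Python sketch. It only shows the substitution idea and is not the plugin's implementation: `render_command` is a hypothetical helper, and how the plugin treats bare parameter names such as the trailing `clientid` is not covered here (the sketch leaves them unchanged).

```python
from string import Template


def render_command(template, params):
    """Split a command template on single spaces, then fill ${name} placeholders.

    Splitting before substituting keeps a substituted value in one field
    even if the value itself contains a space.
    """
    return [
        Template(field).safe_substitute(params)
        for field in template.split(" ")
    ]


if __name__ == "__main__":
    argv = render_command("SET conn:${clientid} clientid", {"clientid": "test"})
    print(argv)
```

`safe_substitute` leaves unknown placeholders in place rather than raising, which makes a missing parameter easy to spot in the rendered command.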
# Using Redis Hash for Devices' Connection State
mqtt:client Hash for devices' connection state:

    hmset
    key = mqtt:client:${clientid}
    value = {state:int, online_at:timestamp, offline_at:timestamp}

    hset
    key = mqtt:node:${node}
    field = ${clientid}
    value = ${ts}

Look up a device's connection state:

    HGETALL "mqtt:client:${clientId}"

For example, when the client with ClientId 'test' goes online:

    HGETALL mqtt:client:test
    1) "state"
    2) "1"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "undefined"

Client with ClientId 'test' goes offline:

    HGETALL mqtt:client:test
    1) "state"
    2) "0"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "1481685924"

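A consumer reading this hash gets a flat list of alternating fields and values. The following Python sketch turns such a reply into a small status record, assuming (as the two examples suggest) that `state` is `"1"` while online and `"0"` after disconnecting, and that an unset timestamp appears as the string `"undefined"`. `pairs_to_dict` and `connection_state` are hypothetical helper names.

```python
def pairs_to_dict(reply):
    """Convert a flat [field, value, field, value, ...] reply into a dict."""
    if len(reply) % 2 != 0:
        raise ValueError("expected an even number of items")
    return dict(zip(reply[0::2], reply[1::2]))


def _timestamp(value):
    """Parse a Unix timestamp string; treat a missing or 'undefined' value as None."""
    if value is None or value == "undefined":
        return None
    return int(value)


def connection_state(reply):
    """Summarise an HGETALL reply for a mqtt:client:<clientid> hash."""
    fields = pairs_to_dict(reply)
    return {
        "online": fields.get("state") == "1",
        "online_at": _timestamp(fields.get("online_at")),
        "offline_at": _timestamp(fields.get("offline_at")),
    }


if __name__ == "__main__":
    online_reply = ["state", "1", "online_at", "1481685802", "offline_at", "undefined"]
    print(connection_state(online_reply))
```

Many Redis client libraries already return hash replies as a mapping, in which case only the interpretation step is needed.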
# Using Redis Hash for Retained Messages
mqtt:retain Hash for retained messages:

    hmset
    key = mqtt:retain:${topic}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

Look up a retained message:

    HGETALL "mqtt:retain:${topic}"

Look up the retained message for the topic 'topic':

    HGETALL mqtt:retain:topic
     1) "id"
     2) "6P9NLcJ65VXBbC22sYb4"
     3) "from"
     4) "test"
     5) "qos"
     6) "1"
     7) "topic"
     8) "topic"
     9) "retain"
    10) "true"
    11) "payload"
    12) "Hello world!"
    13) "ts"
    14) "1481690659"

# Using Redis Hash for messages
mqtt:msg Hash for MQTT messages:

    hmset
    key = mqtt:msg:${msgid}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

    zadd
    key = mqtt:msg:${topic}
    field = 1
    value = ${msgid}

# Using Redis Set for Message Acknowledgements
mqtt:acked SET stores acknowledgements from the clients:

    set
    key = mqtt:acked:${clientid}:${topic}
    value = ${msgid}

# Using Redis Hash for Subscription
mqtt:sub Hash for Subscriptions:

    hset
    key = mqtt:sub:${clientid}
    field = ${topic}
    value = ${qos}

A client subscribes to a topic:

    HSET mqtt:sub:${clientid} ${topic} ${qos}

A client with ClientId of 'test' subscribes to topic1 and topic2:

    HSET "mqtt:sub:test" "topic1" 1
    HSET "mqtt:sub:test" "topic2" 2

Look up the subscribed topics of the client with ClientId 'test':

    HGETALL mqtt:sub:test
    1) "topic1"
    2) "1"
    3) "topic2"
    4) "2"

# Redis SUB/UNSUB Publish
When a device subscribes to or unsubscribes from a topic, the EMQX broker publishes an event to Redis:

    PUBLISH
    channel = "mqtt_channel"
    message = {type: string , topic: string, clientid: string, qos: int}
    *type: [subscribe/unsubscribe]

Client with ClientId 'test' subscribes to 'topic0':

    PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"

Client with ClientId 'test' unsubscribes from 'test_topic0':

    PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"

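A program listening on `mqtt_channel` would receive these payloads as JSON strings. The Python sketch below decodes one, based only on the two examples above: `qos` is present for subscribe events (shown there as a string, although the schema says int) and absent for unsubscribe events. `parse_event` is a hypothetical helper, and receiving the message from Redis is left out.

```python
import json

VALID_TYPES = ("subscribe", "unsubscribe")


def parse_event(message):
    """Decode a subscribe/unsubscribe event payload into a plain dict."""
    event = json.loads(message)
    kind = event["type"]
    if kind not in VALID_TYPES:
        raise ValueError(f"unexpected event type: {kind!r}")
    qos = event.get("qos")
    return {
        "type": kind,
        "topic": event["topic"],
        "clientid": event["clientid"],
        # int() accepts both "0" and 0, covering either encoding of qos.
        "qos": int(qos) if qos is not None else None,
    }


if __name__ == "__main__":
    sample = '{"type": "subscribe", "topic": "topic0", "clientid": "test", "qos": "0"}'
    print(parse_event(sample))
```

In a real subscriber, the string passed to `parse_event` would be the message body delivered by the Redis client library for the configured channel.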
# Enable Redis Backend

    ./bin/emqx_ctl plugins load emqx_backend_redis
