# Redis Backend

TIP

Starting with EMQX 3.1, a rule engine is available that replaces plug-ins, and using it is recommended. See "Save data to Redis" to set up saving data to Redis in the rule engine.

Config file: emqx_backend_redis.conf

# Configure the Redis Server

Configure a connection pool for one or more Redis servers:

## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379

## Redis Sentinel
## backend.redis.server = 127.0.0.1:26378

## Redis Sentinel cluster name
## backend.redis.sentinel = mymaster

## Redis Pool Size
backend.redis.pool1.pool_size = 8

## Redis database
backend.redis.pool1.database = 1

## Redis subscribe channel
backend.redis.pool1.channel = mqtt_channel
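
As a rough illustration of how these keys group by pool name, the following Python sketch parses `key = value` lines and skips `##` comments. The helper names `parse_backend_conf` and `pool_settings` are hypothetical and not part of EMQX, which reads this file itself.

```python
def parse_backend_conf(text):
    """Parse 'key = value' lines, ignoring blank lines and '#' comments."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key/value line
        settings[key.strip()] = value.strip()
    return settings


def pool_settings(settings, pool):
    """Collect the options of one pool (e.g. 'pool1'), keyed by option name."""
    prefix = f"backend.redis.{pool}."
    return {k[len(prefix):]: v for k, v in settings.items() if k.startswith(prefix)}


SAMPLE = """\
## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379
## backend.redis.sentinel = mymaster
backend.redis.pool1.pool_size = 8
backend.redis.pool1.database = 1
backend.redis.pool1.channel = mqtt_channel
"""

pool1 = pool_settings(parse_backend_conf(SAMPLE), "pool1")
print(pool1)
```

All values are kept as strings here; converting `pool_size` or `database` to integers is left to the caller.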

# Configure Persistence Hooks

## Message expiry time in seconds; if <= 0, the default value is used
backend.redis.msg.expired_after = 3600

## Client Connected Record
backend.redis.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

## Client Disconnected Record
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Look up unread messages with QoS > 0 (one-to-one)
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## Look up unread messages with QoS > 0 (one-to-many)
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

## Lookup Retain Message
backend.redis.hook.session.subscribed.3  = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store published messages with QoS > 0
backend.redis.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store Retain Message
backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Delete Retain Message
backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack for one
backend.redis.hook.message.acked.1       = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}

## Store Ack for many
backend.redis.hook.message.acked.2       = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
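
Each hook line above pairs a key of the form `backend.redis.hook.<hook>.<n>` with a JSON value. The Python sketch below splits one such line into its parts to make that structure explicit. The function name `parse_hook_line` and the result field names are hypothetical; this is not how EMQX itself loads the file.

```python
import json


def parse_hook_line(line):
    """Split 'backend.redis.hook.<hook>.<n> = <json>' into its parts."""
    key, _, value = line.partition("=")
    parts = key.strip().split(".")
    if parts[:3] != ["backend", "redis", "hook"] or len(parts) < 5:
        raise ValueError(f"not a hook line: {line!r}")
    spec = json.loads(value)
    return {
        "hook": ".".join(parts[3:-1]),   # e.g. 'session.subscribed'
        "index": int(parts[-1]),         # trailing number of the key
        "topic": spec.get("topic"),      # None when no topic filter is given
        "function": spec["action"].get("function"),
        "pool": spec["pool"],
    }


line = ('backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", '
        '"action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}')
print(parse_hook_line(line))
```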

# Description of Persistence Hooks

| Hook | Topic | Action/Function | Description |
| --- | --- | --- | --- |
| client.connected | | on_client_connected | Store client connected state |
| client.connected | | on_subscribe_lookup | Subscribe to topics |
| client.disconnected | | on_client_disconnected | Store the client disconnected state |
| session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one-to-one offline messages |
| session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one-to-many offline messages |
| session.subscribed | # | on_retain_lookup | Look up retained messages |
| message.publish | # | on_message_publish | Store the published messages |
| message.publish | # | on_message_retain | Store retained messages |
| message.publish | # | on_retain_delete | Delete retained messages |
| message.acked | queue/# | on_message_acked_for_queue | Process ACK of one-to-one messages |
| message.acked | pubsub/# | on_message_acked_for_pubsub | Process ACK of one-to-many messages |

# Redis Command Line Parameters

| Hook | Parameters | Example (fields separated by exactly one space) |
| --- | --- | --- |
| client.connected | clientid | SET conn:${clientid} clientid |
| client.disconnected | clientid | SET disconn:${clientid} clientid |
| session.subscribed | clientid, topic, qos | HSET sub:${clientid} topic qos |
| session.unsubscribed | clientid, topic | SET unsub:${clientid} topic |
| message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} msgid |
| message.acked | msgid, topic, clientid | HSET ack:${clientid} topic msgid |
| message.delivered | msgid, topic, clientid | HSET delivered:${clientid} topic msgid |

# Configure 'action' with Redis Commands

Redis backend supports raw 'commands' in 'action', e.g.:

## After a client connected to the EMQX server, it executes a redis command (multiple redis commands also supported)
backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}
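
To make the `${clientid}` placeholder and the single-space field rule concrete, here is a minimal Python sketch that splits a command template on single spaces and fills `${name}` placeholders in each field. The function name `expand_command` is hypothetical. This section does not specify how the plugin treats bare tokens such as the trailing `clientid`, so the sketch leaves them unchanged.

```python
from string import Template


def expand_command(template, params):
    """Split on single spaces, then fill ${name} placeholders in each field."""
    # Splitting first keeps a substituted value with spaces inside one field.
    return [Template(field).substitute(params) for field in template.split(" ")]


args = expand_command("SET conn:${clientid} clientid", {"clientid": "test"})
print(args)  # ['SET', 'conn:test', 'clientid']
```

`Template.substitute` raises `KeyError` when a placeholder has no matching parameter, which surfaces typos in a template early.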

# Using Redis Hash for Devices' Connection State

mqtt:client Hash for devices' connection state:

hmset
key = mqtt:client:${clientid}
value = {state:int, online_at:timestamp, offline_at:timestamp}

hset
key = mqtt:node:${node}
field = ${clientid}
value = ${ts}

Look up a device's connection state:

HGETALL "mqtt:client:${clientid}"

For example, the client with ClientId 'test' goes online:

HGETALL mqtt:client:test
1) "state"
2) "1"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "undefined"

Client with ClientId 'test' goes offline:

HGETALL mqtt:client:test
1) "state"
2) "0"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "1481685924"
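
The two replies above are flat lists of alternating field names and values. As a sketch of how an application might interpret them, the Python below pairs the items up and converts the fields. The function names are hypothetical, and treating `undefined` as "not offline yet" is an assumption drawn from the online example above.

```python
def reply_to_dict(reply):
    """Turn a flat [field, value, field, value, ...] reply into a dict."""
    if len(reply) % 2 != 0:
        raise ValueError("expected an even number of items")
    return dict(zip(reply[0::2], reply[1::2]))


def describe_client(reply):
    """Summarise the mqtt:client hash fields (state, online_at, offline_at)."""
    fields = reply_to_dict(reply)
    offline_at = fields.get("offline_at")
    return {
        "online": fields.get("state") == "1",
        "online_at": int(fields["online_at"]),
        # Assumption: 'undefined' appears while the client has not gone offline.
        "offline_at": None if offline_at in (None, "undefined") else int(offline_at),
    }


online = ["state", "1", "online_at", "1481685802", "offline_at", "undefined"]
offline = ["state", "0", "online_at", "1481685802", "offline_at", "1481685924"]
print(describe_client(online))
print(describe_client(offline))
```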

# Using Redis Hash for Retained Messages

mqtt:retain Hash for retained messages:

hmset
key = mqtt:retain:${topic}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

Look up a retained message:

HGETALL "mqtt:retain:${topic}"

Look up the retained message for the topic 'topic':

HGETALL mqtt:retain:topic
1) "id"
2) "6P9NLcJ65VXBbC22sYb4"
3) "from"
4) "test"
5) "qos"
6) "1"
7) "topic"
8) "topic"
9) "retain"
10) "true"
11) "payload"
12) "Hello world!"
13) "ts"
14) "1481690659"


# Using Redis Hash for Messages

mqtt:msg Hash for MQTT messages:

hmset
key = mqtt:msg:${msgid}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

zadd
key = mqtt:msg:${topic}
field = 1
value = ${msgid}
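
The layout above can be read as two commands per stored message. The Python sketch below builds them as argument lists without contacting Redis. The function name `message_commands` and the sample message are hypothetical, and reading `field = 1` as the ZADD score is an assumption based on the ZADD argument order (key, score, member).

```python
MESSAGE_FIELDS = ["id", "from", "qos", "topic", "retain", "payload", "ts"]


def message_commands(msg):
    """Build the HMSET and ZADD argument lists for one message dict."""
    hmset = ["HMSET", f"mqtt:msg:{msg['id']}"]
    for name in MESSAGE_FIELDS:
        hmset += [name, str(msg[name])]
    # Assumption: 'field = 1' is the score and the message ID is the member.
    zadd = ["ZADD", f"mqtt:msg:{msg['topic']}", "1", msg["id"]]
    return [hmset, zadd]


# Hypothetical sample message, for illustration only.
sample = {"id": "msg-1", "from": "test", "qos": 1, "topic": "topic",
          "retain": 0, "payload": "Hello world!", "ts": 1481690659}
for command in message_commands(sample):
    print(command)
```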

# Using Redis Set for Message Acknowledgements

mqtt:acked keys, written with the SET command, store the acknowledged message ID for each client and topic:

set
key = mqtt:acked:${clientid}:${topic}
value = ${msgid}

# Using Redis Hash for Subscription

mqtt:sub Hash for Subscriptions:

hset
key = mqtt:sub:${clientid}
field = ${topic}
value = ${qos}

A client subscribes to a topic:

HSET mqtt:sub:${clientid} ${topic} ${qos}

A client with ClientId of 'test' subscribes to topic1 and topic2:

HSET "mqtt:sub:test" "topic1" 1
HSET "mqtt:sub:test" "topic2" 2

Look up the subscribed topics of the client with ClientId 'test':

HGETALL mqtt:sub:test
1) "topic1"
2) "1"
3) "topic2"
4) "2"

# Redis SUB/UNSUB Publish

When a device subscribes to or unsubscribes from a topic, the EMQX broker publishes an event to Redis:

PUBLISH
channel = "mqtt_channel"
message = {type: string , topic: string, clientid: string, qos: int}
*type: [subscribe/unsubscribe]

Client with ClientId 'test' subscribes to 'topic0':

PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"

Client with ClientId 'test' unsubscribes from 'test_topic0':

PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
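
A consumer listening on this channel receives each event as a JSON string. The Python sketch below decodes the two example payloads; the function name `parse_sub_event` is hypothetical. Because the subscribe example carries `qos` as the string "0" while the unsubscribe example omits it, the sketch converts `qos` to an integer when present and returns `None` otherwise.

```python
import json


def parse_sub_event(raw):
    """Decode one subscribe/unsubscribe event published on the channel."""
    event = json.loads(raw)
    if event.get("type") not in ("subscribe", "unsubscribe"):
        raise ValueError(f"unexpected event type: {event.get('type')!r}")
    qos = event.get("qos")
    return {
        "type": event["type"],
        "topic": event["topic"],
        "clientid": event["clientid"],
        "qos": None if qos is None else int(qos),
    }


sub = parse_sub_event('{"type": "subscribe", "topic": "topic0", "clientid": "test", "qos": "0"}')
unsub = parse_sub_event('{"type": "unsubscribe", "topic": "test_topic0", "clientid": "test"}')
print(sub)
print(unsub)
```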

# Enable Redis Backend

./bin/emqx_ctl plugins load emqx_backend_redis