# 集成 Redis

提示

EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 Redis规则引擎中创建 保存数据到 Redis

配置文件: emqx_backend_redis.conf

# 配置 Redis 服务器

支持配置多台 Redis 服务器连接池:

## Redis 服务集群类型: single | sentinel | cluster
backend.redis.pool1.type = single

## Redis 服务器地址列表
backend.redis.pool1.server = 127.0.0.1:6379

## Redis sentinel 模式下的 sentinel 名称
## backend.redis.pool1.sentinel = mymaster

## Redis 连接池大小
backend.redis.pool1.pool_size = 8

## Redis 数据库名称
backend.redis.pool1.database = 0

## Redis 密码
## backend.redis.pool1.password =

## 订阅的 Redis channel 名称
backend.redis.pool1.channel = mqtt_channel
Copied!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 配置 Redis 存储规则

backend.redis.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.redis.hook.session.created.1     = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
backend.redis.hook.session.subscribed.3  = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
backend.redis.hook.session.unsubscribed.1= {"topic": "#", "action": {"commands": ["DEL mqtt:acked:${clientid}:${topic}"]}, "pool": "pool1"}
backend.redis.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "expired_time" : 3600, "pool": "pool1"}
backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "expired_time" : 3600, "pool": "pool1"}
backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
backend.redis.hook.message.acked.1       = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
backend.redis.hook.message.acked.2       = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}

## backend.redis.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch_for_keep_latest"}, "pool": "pool1"}
## backend.redis.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_store_keep_latest"}, "expired_time" : 3600, "pool": "pool1"}
## backend.redis.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked_for_keep_latest"}, "pool": "pool1"}
Copied!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Redis 存储规则说明

hook topic action/function 说明
client.connected on_client_connected 存储客户端在线状态
session.created on_subscribe_lookup 订阅主题
client.disconnected on_client_disconnected 存储客户端离线状态
session.subscribed queue/# on_message_fetch_for_queue 获取一对一离线消息
session.subscribed pubsub/# on_message_fetch_for_pubsub 获取一对多离线消息
session.subscribed # on_retain_lookup 获取 retain 消息
session.unsubscribed # 删除 acked 消息
message.publish # on_message_publish 存储发布消息
message.publish # on_message_retain 存储 retain 消息
message.publish # on_retain_delete 删除 retain 消息
message.acked queue/# on_message_acked_for_queue 一对一消息 ACK 处理
message.acked pubsub/# on_message_acked_for_pubsub 一对多消息 ACK 处理

# Redis 命令行参数说明

hook 可用参数 示例(每个字段分隔,必须是一个空格)
client.connected clientid SET conn:${clientid} ${clientid}
client.disconnected clientid SET disconn:${clientid} ${clientid}
session.subscribed clientid, topic, qos HSET sub:${clientid} ${topic} ${qos}
session.unsubscribed clientid, topic SET unsub:${clientid} ${topic}
message.publish message, msgid, topic, payload, qos, clientid RPUSH pub:${topic} ${msgid}
message.acked msgid, topic, clientid HSET ack:${clientid} ${topic} ${msgid}
message.deliver msgid, topic, clientid HSET deliver:${clientid} ${topic} ${msgid}

# Redis 命令行配置 Action

Redis 存储支持用户采用 Redis Commands 语句配置 Action,例如:

## 在客户端连接到 EMQX 服务器后,执行一条 redis
backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} ${clientid}"]}, "pool": "pool1"}
Copied!
1
2

# Redis 设备在线状态 Hash

mqtt:client Hash 存储设备在线状态:

hmset
key = mqtt:client:${clientid}
value = {state:int, online_at:timestamp, offline_at:timestamp}

hset
key = mqtt:node:${node}
field = ${clientid}
value = ${ts}
Copied!
1
2
3
4
5
6
7
8

查询设备在线状态:

HGETALL "mqtt:client:${clientId}"
Copied!
1

例如 ClientId 为 test 客户端上线:

HGETALL mqtt:client:test
1) "state"
2) "1"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "undefined"
Copied!
1
2
3
4
5
6
7

例如 ClientId 为 test 客户端下线:

HGETALL mqtt:client:test
1) "state"
2) "0"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "1481685924"
Copied!
1
2
3
4
5
6
7

# Redis 保留消息 Hash

mqtt:retain Hash 存储 Retain 消息:

hmset
key = mqtt:retain:${topic}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
Copied!
1
2
3

查询 retain 消息:

HGETALL "mqtt:retain:${topic}"
Copied!
1

例如查看 topic 为 topic 的 retain 消息:

    HGETALL mqtt:retain:topic
     1) "id"

>   -     2) "6P9NLcJ65VXBbC22sYb4"
>     3)  "from"
>   -     4) "test"
>     5)  "qos"
>     6)  "1"
>     7)  "topic"
>     8)  "topic"
>     9)  "retain"
>   - 10\) "true"
>     11) "payload"
>     12) "Hello world\!"
>     13) "ts"
>     14) "1481690659"
Copied!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Redis 消息存储 Hash

mqtt:msg Hash 存储 MQTT 消息:

hmset
key = mqtt:msg:${msgid}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

zadd
key = mqtt:msg:${topic}
field = 1
value = ${msgid}
Copied!
1
2
3
4
5
6
7
8

# Redis 消息确认 SET

mqtt:acked SET 存储客户端消息确认:

set
key = mqtt:acked:${clientid}:${topic}
value = ${msgid}
Copied!
1
2
3

# Redis 订阅存储 Hash

mqtt:sub Hash 存储订阅关系:

hset
key = mqtt:sub:${clientid}
field = ${topic}
value = ${qos}
Copied!
1
2
3
4

某个客户端订阅主题:

HSET mqtt:sub:${clientid} ${topic} ${qos}
Copied!
1

例如为 ClientId 为 test 的客户端订阅主题 topic1, topic2 :

HSET "mqtt:sub:test" "topic1" 1
HSET "mqtt:sub:test" "topic2" 2
Copied!
1
2

查询 ClientId 为 test 的客户端已订阅主题:

HGETALL mqtt:sub:test
1) "topic1"
2) "1"
3) "topic2"
4) "2"
Copied!
1
2
3
4
5

# Redis SUB/UNSUB 事件发布

设备需要订阅/取消订阅主题时,业务服务器向 Redis 发布事件消息:

PUBLISH
channel = "mqtt_channel"
message = {type: string , topic: string, clientid: string, qos: int}
\*type: [subscribe/unsubscribe]
Copied!
1
2
3
4

例如 ClientId 为 test 客户端订阅主题 topic0:

PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"
Copied!
1

例如 ClientId 为 test 客户端取消订阅主题:

PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
Copied!
1

提示

Redis Cluster 无法使用 Redis PUB/SUB 功能。

# 启用 Redis 数据存储插件

./bin/emqx_ctl plugins load emqx_backend_redis
Copied!
1