# Cassandra Backend
TIP
Since EMQX 3.1, a rule engine has been available as a replacement for plug-ins, and it is the recommended approach. See "Save data to Cassandra" to set up Cassandra persistence in the rule engine.
Config file: etc/plugins/emqx_backend_cassa.conf
# Configure Cassandra Cluster
A multi-node Cassandra cluster is supported:

    ## Cassandra Node
    backend.ecql.pool1.nodes = 127.0.0.1:9042

    ## Cassandra Pool Size
    backend.ecql.pool1.size = 8

    ## Cassandra auto reconnect flag
    backend.ecql.pool1.auto_reconnect = 1

    ## Cassandra Username
    backend.ecql.pool1.username = cassandra

    ## Cassandra Password
    backend.ecql.pool1.password = cassandra

    ## Cassandra Keyspace
    backend.ecql.pool1.keyspace = mqtt

    ## Cassandra Logger type
    backend.ecql.pool1.logger = info

    ## Max number of fetch offline messages. Without count limit if infinity
    ## backend.cassa.max_returned_count = 500

    ## Time Range. Without time limit if infinity
    ## d - day
    ## h - hour
    ## m - minute
    ## s - second
    ## backend.cassa.time_range = 2h
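The unit suffixes accepted by `backend.cassa.time_range` can be illustrated with a short Python sketch. The helper name `parse_time_range` is hypothetical and not part of the plugin; it only shows how a value such as `2h` maps to seconds, and it treats `infinity` as "no limit", as the config comments describe. The plugin's own parser may behave differently.

```python
import re
from typing import Optional

# Seconds per unit, following the d/h/m/s suffixes listed in the config comments.
_UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}


def parse_time_range(value: str) -> Optional[int]:
    """Return the time range in seconds, or None for 'infinity' (no limit).

    Hypothetical helper for illustration only.
    """
    value = value.strip()
    if value == "infinity":
        return None
    match = re.fullmatch(r"(\d+)([dhms])", value)
    if match is None:
        raise ValueError(f"unsupported time range: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


print(parse_time_range("2h"))  # 7200
```
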
# Configure Cassandra Persistence Hooks

    ## Client Connected Record
    backend.cassa.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

    ## Subscribe Lookup Record
    backend.cassa.hook.client.connected.2 = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}

    ## Client DisConnected Record
    backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

    ## Lookup Unread Message QOS > 0
    backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

    ## Lookup Retain Message
    backend.cassa.hook.session.subscribed.2 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}

    ## Store Publish Message QOS > 0
    backend.cassa.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    ## Delete Acked Record
    backend.cassa.hook.session.unsubscribed.1= {"topic": "#", "action": {"cql": ["delete from acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

    ## Store Retain Message
    backend.cassa.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

    ## Delete Retain Message
    backend.cassa.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

    ## Store Ack
    backend.cassa.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

    ## Get offline messages
    ### "offline_opts": Get configuration for offline messages
    ### max_returned_count: Maximum number of offline messages get at a time
    ### time_range: Get only messages in the current time range
    ## backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

    ## If you need to store Qos0 messages, you can enable the following configuration
    ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
    ## backend.cassa.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
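Each hook entry has the shape `backend.cassa.hook.<hook>.<n> = <JSON object>`. The Python sketch below splits one such line into its parts, which can be handy when checking a config file by hand. The names `HookEntry` and `parse_hook_line` are hypothetical and not part of EMQX; the sketch also assumes the value is valid JSON, as it is in every entry shown here.

```python
import json
import re
from typing import NamedTuple


class HookEntry(NamedTuple):
    hook: str   # e.g. "client.connected"
    index: int  # trailing sequence number, e.g. 1
    spec: dict  # decoded JSON value ("action", "pool", optional "topic")


# Key layout taken from the entries above: backend.cassa.hook.<hook>.<n>
_KEY = re.compile(r"backend\.cassa\.hook\.(?P<hook>[a-z_.]+)\.(?P<index>\d+)$")


def parse_hook_line(line: str) -> HookEntry:
    """Split one uncommented hook line into hook name, index, and JSON spec."""
    key, _, value = line.partition("=")  # split on the first '=' only
    match = _KEY.match(key.strip())
    if match is None:
        raise ValueError(f"not a hook entry: {line!r}")
    return HookEntry(match["hook"], int(match["index"]), json.loads(value))


line = ('backend.cassa.hook.session.subscribed.1 = '
        '{"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}')
entry = parse_hook_line(line)
print(entry.hook, entry.index, entry.spec["action"]["function"])
```

Splitting on the first `=` only matters for entries whose CQL text itself contains `=`, such as the `session.unsubscribed` delete statement.
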
# Description of Cassandra Persistence Hooks
hook | topic | action | Description |
---|---|---|---|
client.connected | | on_client_connected | Store client connected state |
client.connected | | on_subscribe_lookup | Subscribed topics |
client.disconnected | | on_client_disconnected | Store client disconnected state |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
# CQL Parameters Description
Customized CQL commands can use the following parameters:
hook | Parameter | Example (${name} in CQL represents an available parameter) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
# Configure 'action' with CQL
The Cassandra backend supports CQL in 'action':

    ## After a client is connected to the EMQX server, it executes a CQL command (multiple commands are also supported):
    backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
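How the plugin expands `${name}` placeholders internally is not described here. As an illustration only, the Python sketch below shows one way such a template could be turned into a statement with `?` bind markers (the form CQL prepared statements use) plus an ordered list of values. The function name `bind_cql` is hypothetical.

```python
import re
from typing import Dict, List, Tuple

# Matches ${name} placeholders as they appear in the CQL examples above.
_PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def bind_cql(template: str, params: Dict[str, object]) -> Tuple[str, List[object]]:
    """Replace each ${name} with '?' and collect the matching values in order.

    Raises KeyError if the template names a parameter the hook does not provide.
    """
    values: List[object] = []

    def _swap(match):
        values.append(params[match.group(1)])
        return "?"

    statement = _PLACEHOLDER.sub(_swap, template)
    return statement, values


statement, values = bind_cql(
    "insert into sub(topic, qos) values(${topic}, ${qos})",
    {"clientid": "test", "topic": "test_topic1", "qos": 1},
)
print(statement)  # insert into sub(topic, qos) values(?, ?)
print(values)     # ['test_topic1', 1]
```

Passing values as bound parameters instead of pasting them into the statement text avoids quoting problems when a topic or client ID contains a quote character.
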
# Initializing Cassandra
Create KeySpace:

    CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
    USE mqtt;
Import Cassandra tables:

    cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"
TIP
The keyspace name is yours to choose; the examples on this page use mqtt.
# Cassandra Client Connection Table
mqtt.client stores client connection states:

    CREATE TABLE mqtt.client (
        clientid text,
        node text,
        state int,
        connected timestamp,
        disconnected timestamp,
        PRIMARY KEY(clientid)
    );
Query a client's connection state:

    select * from mqtt.client where clientid = ${clientid};
If client 'test' is online:

    select * from mqtt.client where clientid = 'test';

     clientid | connected                       | disconnected | node           | state
    ----------+---------------------------------+--------------+----------------+-------
         test | 2017-02-14 08:27:29.872000+0000 |         null | emqx@127.0.0.1 |     1
If client 'test' is offline:

    select * from mqtt.client where clientid = 'test';

     clientid | connected                       | disconnected                    | node           | state
    ----------+---------------------------------+---------------------------------+----------------+-------
         test | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1 |     0
# Cassandra Subscription Table
mqtt.sub stores subscriptions of clients:

    CREATE TABLE mqtt.sub (
        clientid text,
        topic text,
        qos int,
        PRIMARY KEY(clientid, topic)
    );
Client 'test' subscribes to topics 'test_topic1' and 'test_topic2':

    insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic1', 1);
    insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic2', 2);
Query subscriptions of a client:

    select * from mqtt.sub where clientid = ${clientid};
Query subscriptions of client 'test':

    select * from mqtt.sub where clientid = 'test';

     clientid | topic       | qos
    ----------+-------------+-----
         test | test_topic1 |   1
         test | test_topic2 |   2
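Because the primary key of the subscription table is (clientid, topic), a second insert for the same client and topic overwrites the earlier row instead of adding a new one; this is standard Cassandra upsert behavior. The Python sketch below models that with a plain dictionary. It is an in-memory stand-in for illustration, not a Cassandra client, and the helper names `insert_sub` and `subs_of` are hypothetical.

```python
from typing import Dict, List, Tuple

# The dict key plays the role of the primary key (clientid, topic).
SubKey = Tuple[str, str]


def insert_sub(table: Dict[SubKey, int], clientid: str, topic: str, qos: int) -> None:
    """Mimic a CQL INSERT: a row with the same primary key is overwritten."""
    table[(clientid, topic)] = qos


def subs_of(table: Dict[SubKey, int], clientid: str) -> List[Tuple[str, int]]:
    """Return (topic, qos) pairs for one client, ordered by topic."""
    return sorted((topic, qos) for (cid, topic), qos in table.items() if cid == clientid)


table: Dict[SubKey, int] = {}
insert_sub(table, "test", "test_topic1", 1)
insert_sub(table, "test", "test_topic2", 2)
insert_sub(table, "test", "test_topic1", 0)  # same key: replaces qos 1 with qos 0
print(subs_of(table, "test"))  # [('test_topic1', 0), ('test_topic2', 2)]
```
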
# Cassandra Message Table
mqtt.msg stores MQTT messages:

    CREATE TABLE mqtt.msg (
        topic text,
        msgid text,
        sender text,
        qos int,
        retain int,
        payload blob,
        arrived timestamp,
        PRIMARY KEY(topic, msgid)
    ) WITH CLUSTERING ORDER BY (msgid DESC);
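Query messages published by a client. Because sender is not part of the primary key, this query needs either a secondary index on sender or ALLOW FILTERING:

    select * from mqtt.msg where sender = ${clientid};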
Query messages published by client 'test':

    select * from mqtt.msg where sender = 'test';

     topic | msgid                | arrived                         | payload      | qos | retain | sender
    -------+----------------------+---------------------------------+--------------+-----+--------+--------
     hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! |   1 |      0 |   test
     world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! |   1 |      0 |   test
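The `CLUSTERING ORDER BY (msgid DESC)` clause in the mqtt.msg definition means that, within one topic partition, rows are stored and returned in descending msgid order. The Python sketch below models this with an in-memory grouping. The function name `read_partition` and the message IDs are made up for illustration, and Python string comparison matches Cassandra's text ordering only for ASCII values such as these.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def read_partition(rows: List[Tuple[str, str]], topic: str) -> List[str]:
    """Return the msgids stored under one topic, in descending msgid order.

    rows are (topic, msgid) pairs; topic is the partition key and msgid the
    clustering column, mirroring PRIMARY KEY(topic, msgid).
    """
    partitions: Dict[str, List[str]] = defaultdict(list)
    for row_topic, msgid in rows:
        partitions[row_topic].append(msgid)
    return sorted(partitions[topic], reverse=True)


rows = [("hello", "a1"), ("world", "b7"), ("hello", "a3"), ("hello", "a2")]
print(read_partition(rows, "hello"))  # ['a3', 'a2', 'a1']
```
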
# Cassandra Retained Message Table
mqtt.retain stores retained messages:

    CREATE TABLE mqtt.retain (
        topic text,
        msgid text,
        PRIMARY KEY(topic)
    );
Query retained messages:

    select * from mqtt.retain where topic = ${topic};
Query retained messages with topic 'retain':

    select * from mqtt.retain where topic = 'retain';

     topic  | msgid
    --------+----------------------
     retain | 2PguFrHsrzEvIIBdctmb
# Cassandra Acknowledgement Table
mqtt.acked stores acknowledgements from the clients:

    CREATE TABLE mqtt.acked (
        clientid text,
        topic text,
        msgid text,
        PRIMARY KEY(clientid, topic)
    );
# Enable Cassandra Backend

    ./bin/emqx_ctl plugins load emqx_backend_cassa