# Cassandra Backend

TIP

As of EMQX 3.1, a rule engine is available as a replacement for plugins, and it is the recommended approach. See Save data to Cassandra for how to set this up in the rule engine.

Config file: etc/plugins/emqx_backend_cassa.conf

# Configure Cassandra Cluster

Multi-node Cassandra clusters are supported:

## Cassandra Node
backend.ecql.pool1.nodes = 127.0.0.1:9042

## Cassandra Pool Size
backend.ecql.pool1.size = 8

## Cassandra auto reconnect flag
backend.ecql.pool1.auto_reconnect = 1

## Cassandra Username
backend.ecql.pool1.username = cassandra

## Cassandra Password
backend.ecql.pool1.password = cassandra

## Cassandra Keyspace
backend.ecql.pool1.keyspace = mqtt

## Cassandra Logger type
backend.ecql.pool1.logger = info

## Max number of offline messages to fetch. No count limit if set to infinity
## backend.cassa.max_returned_count = 500

## Time range. No time limit if set to infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.cassa.time_range = 2h
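
The unit suffixes used by `time_range` can be illustrated with a small Python sketch. It assumes a value is a single integer followed by one suffix, or the literal `infinity`; whether the plugin accepts other forms is not stated here. `time_range_seconds` is a hypothetical helper, not part of EMQX.

```python
import math
import re

# Unit suffixes listed in the config comments: d, h, m, s.
_UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}


def time_range_seconds(value: str) -> float:
    """Convert a value such as "2h" to seconds; "infinity" means no limit."""
    value = value.strip()
    if value == "infinity":
        return math.inf
    match = re.fullmatch(r"(\d+)([dhms])", value)
    if match is None:
        raise ValueError(f"unsupported time range: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


print(time_range_seconds("2h"))  # 7200
```

With the commented-out default of `2h`, only messages from the last 7200 seconds would be considered.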

# Configure Cassandra Persistence Hooks

## Client Connected Record
backend.cassa.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.cassa.hook.client.connected.2    = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}

## Client Disconnected Record
backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Lookup Unread Message QOS > 0
backend.cassa.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

## Lookup Retain Message
backend.cassa.hook.session.subscribed.2  = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store Publish Message  QOS > 0
backend.cassa.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Delete Acked Record
backend.cassa.hook.session.unsubscribed.1= {"topic": "#", "action": {"cql": ["delete from acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

## Store Retain Message
backend.cassa.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Delete Retain Message
backend.cassa.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack
backend.cassa.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## Get offline messages
### "offline_opts": Options for fetching offline messages
### max_returned_count: Maximum number of offline messages fetched at a time
### time_range: Fetch only messages within this time range
## backend.cassa.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

## If you need to store QoS 0 messages, enable the following configuration
## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise QoS 1 and QoS 2 messages will be stored twice
## backend.cassa.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
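
Each hook line pairs a dotted key with a JSON value. The Python sketch below splits one such line into its hook point, index, and parsed options, which can help when checking a configuration by hand. It assumes the value is valid JSON, as the examples above appear to be; `parse_hook_line` is a hypothetical helper, not part of EMQX.

```python
import json


def parse_hook_line(line: str):
    """Split 'backend.cassa.hook.<hook>.<index> = {json}' into its parts."""
    # Split on the first '=' only, since CQL inside the value may contain '='.
    key, _, value = line.partition("=")
    parts = key.strip().split(".")
    if parts[:3] != ["backend", "cassa", "hook"]:
        raise ValueError(f"not a Cassandra hook line: {line!r}")
    hook_point = ".".join(parts[3:-1])  # e.g. "session.subscribed"
    index = int(parts[-1])              # trailing number of the key
    spec = json.loads(value)            # topic, action, pool, offline_opts
    return hook_point, index, spec


line = (
    'backend.cassa.hook.session.subscribed.1  = {"topic": "#", '
    '"action": {"function": "on_message_fetch"}, '
    '"offline_opts": {"max_returned_count": 500, "time_range": "2h"}, '
    '"pool": "pool1"}'
)
hook_point, index, spec = parse_hook_line(line)
print(hook_point, index, spec["action"]["function"])
```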

# Description of Cassandra Persistence Hooks

| hook | topic | action | Description |
|---|---|---|---|
| client.connected | | on_client_connected | Store client connected state |
| client.connected | | on_subscription_lookup | Look up subscribed topics |
| client.disconnected | | on_client_disconnected | Store client disconnected state |
| session.subscribed | # | on_message_fetch | Fetch offline messages |
| session.subscribed | # | on_retain_lookup | Look up retained messages |
| message.publish | # | on_message_publish | Store published messages |
| message.publish | # | on_message_retain | Store retained messages |
| message.publish | # | on_retain_delete | Delete retained messages |
| message.acked | # | on_message_acked | Process ACK |

# CQL Parameters Description

Custom CQL commands can use the following parameters (`${name}` in CQL represents an available parameter):

| hook | Parameters | Example |
|---|---|---|
| client.connected | clientid | insert into conn(clientid) values(${clientid}) |
| client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
| session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
| session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
| message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
| message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
| message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
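
A quick way to catch a placeholder that a hook does not provide is to compare the `${name}` tokens in a CQL string against the parameters listed above. The following Python sketch does that; `unknown_placeholders` is a hypothetical helper written for illustration, and it says nothing about how the plugin itself binds values.

```python
import re

# Parameters available to each hook, copied from the table above.
HOOK_PARAMS = {
    "client.connected": {"clientid"},
    "client.disconnected": {"clientid"},
    "session.subscribed": {"clientid", "topic", "qos"},
    "session.unsubscribed": {"clientid", "topic"},
    "message.publish": {"msgid", "topic", "payload", "qos", "clientid"},
    "message.acked": {"msgid", "topic", "clientid"},
    "message.delivered": {"msgid", "topic", "clientid"},
}


def unknown_placeholders(hook: str, cql: str) -> set:
    """Return the ${name} placeholders in cql that the hook does not offer."""
    used = set(re.findall(r"\$\{(\w+)\}", cql))
    return used - HOOK_PARAMS[hook]


# 'qos' is not available on client.connected, so it is reported.
print(unknown_placeholders(
    "client.connected",
    "insert into conn(clientid, qos) values(${clientid}, ${qos})",
))
```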

# Configure 'action' with CQL

The Cassandra backend supports CQL in 'action':

## After a client connects to the EMQX server, execute a CQL command (multiple commands are also supported):
backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
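
Because the `cql` value is a list, several commands can be placed in one hook. The Python sketch below generates such a line so the JSON quoting stays correct. `cql_hook_line` is a hypothetical helper, and the `conn_log` table in the second statement is made up for illustration.

```python
import json


def cql_hook_line(hook: str, index: int, statements: list, pool: str = "pool1") -> str:
    """Build a 'backend.cassa.hook...' line whose action is a list of CQL commands."""
    spec = {"action": {"cql": statements}, "pool": pool}
    return f"backend.cassa.hook.{hook}.{index} = {json.dumps(spec)}"


print(cql_hook_line(
    "client.connected",
    3,
    [
        "insert into conn(clientid) values(${clientid})",
        # Hypothetical second table, shown only to illustrate multiple commands.
        "insert into conn_log(clientid) values(${clientid})",
    ],
))
```

Called with a single statement, the helper reproduces the line shown above.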

# Initializing Cassandra

Create the keyspace:

CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE mqtt;

Import Cassandra tables:

cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"

TIP

You are free to choose the keyspace name; set backend.ecql.pool1.keyspace to match.

# Cassandra Client Connection Table

mqtt.client stores client connection states:

CREATE TABLE mqtt.client (
    clientid text,
    node text,
    state int,
    connected timestamp,
    disconnected timestamp,
    PRIMARY KEY(clientid)
);

Query a client's connection state:

select * from mqtt.client where clientid = ${clientid};

If client 'test' is online:

select * from mqtt.client where clientid = 'test';

 clientid | connected                       | disconnected | node           | state
----------+---------------------------------+--------------+----------------+-------
     test | 2017-02-14 08:27:29.872000+0000 |         null | emqx@127.0.0.1 |     1

If client 'test' is offline:

select * from mqtt.client where clientid = 'test';

 clientid | connected                       | disconnected                    | node           | state
----------+---------------------------------+---------------------------------+----------------+-------
     test | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1 |     0
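
In the two results above, `state` is 1 while the client is online and 0 once it has disconnected. A minimal Python sketch of that check follows. It assumes a session object with the `execute(query, params)` and `.one()` calls offered by the DataStax Python driver; a small stand-in session is included so the sketch runs without a cluster. `client_is_online` and the stand-in classes are hypothetical names.

```python
from collections import namedtuple


def client_is_online(session, clientid: str) -> bool:
    """Return True when mqtt.client records state = 1 for this client."""
    row = session.execute(
        "SELECT state FROM mqtt.client WHERE clientid = %s", (clientid,)
    ).one()
    return row is not None and row.state == 1


# Stand-in for a driver session so the sketch runs without a cluster.
# With the real driver, a session would come from something like:
#   from cassandra.cluster import Cluster
#   session = Cluster(["127.0.0.1"], port=9042).connect("mqtt")
Row = namedtuple("Row", ["state"])


class FakeResult:
    def __init__(self, rows):
        self._rows = rows

    def one(self):
        return self._rows[0] if self._rows else None


class FakeSession:
    def __init__(self, states):
        self._states = states  # maps clientid -> state

    def execute(self, query, params):
        clientid = params[0]
        rows = [Row(self._states[clientid])] if clientid in self._states else []
        return FakeResult(rows)


session = FakeSession({"test": 1})
print(client_is_online(session, "test"))     # True
print(client_is_online(session, "unknown"))  # False
```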

# Cassandra Subscription Table

mqtt.sub stores subscriptions of clients:

CREATE TABLE mqtt.sub (
    clientid text,
    topic text,
    qos int,
    PRIMARY KEY(clientid, topic)
);

Client 'test' subscribes to topics 'test_topic1' and 'test_topic2':

insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt.sub(clientid, topic, qos) values('test', 'test_topic2', 2);

Query subscriptions of a client:

select * from mqtt.sub where clientid = ${clientid};

Query subscriptions of client 'test':

select * from mqtt.sub where clientid = 'test';

 clientid | topic       | qos
----------+-------------+-----
     test | test_topic1 |   1
     test | test_topic2 |   2

# Cassandra Message Table

mqtt.msg stores MQTT messages:

CREATE TABLE mqtt.msg (
    topic text,
    msgid text,
    sender text,
    qos int,
    retain int,
    payload blob,
    arrived timestamp,
    PRIMARY KEY(topic, msgid)
) WITH CLUSTERING ORDER BY (msgid DESC);

Query messages published by a client:

select * from mqtt.msg where sender = ${clientid} allow filtering;

Query messages published by client 'test':

select * from mqtt.msg where sender = 'test' allow filtering;

 topic | msgid                | arrived                         | payload      | qos | retain | sender
-------+----------------------+---------------------------------+--------------+-----+--------+--------
 hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! |   1 |      0 |   test
 world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! |   1 |      0 |   test

# Cassandra Retained Message Table

mqtt.retain stores retained messages:

CREATE TABLE mqtt.retain (
    topic text,
    msgid text,
    PRIMARY KEY(topic)
);

Query retained messages:

select * from mqtt.retain where topic = ${topic};

Query retained messages with topic 'retain':

select * from mqtt.retain where topic = 'retain';

 topic  | msgid
--------+----------------------
 retain | 2PguFrHsrzEvIIBdctmb

# Cassandra Acknowledgement Table

mqtt.acked stores acknowledgements from the clients:

CREATE TABLE mqtt.acked (
    clientid text,
    topic text,
    msgid text,
    PRIMARY KEY(clientid, topic)
);
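
Since the primary key is (clientid, topic) and a Cassandra INSERT overwrites an existing row with the same key, this table holds at most one msgid per client and topic. The toy Python model below illustrates that behaviour with a dictionary; it is an in-memory analogy only, and how the plugin writes to the table is an assumption.

```python
# In-memory stand-in for mqtt.acked, keyed the same way as the primary key.
acked = {}


def record_ack(clientid: str, topic: str, msgid: str) -> None:
    """Store an acknowledgement; the same (clientid, topic) key is overwritten."""
    acked[(clientid, topic)] = msgid


record_ack("test", "hello", "msg-1")
record_ack("test", "hello", "msg-2")  # replaces msg-1 for the same key
record_ack("test", "world", "msg-1")

print(len(acked))                # 2
print(acked[("test", "hello")])  # msg-2
```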

# Enable Cassandra Backend

./bin/emqx_ctl plugins load emqx_backend_cassa