Ingest Data into Cassandra
Install Cassandra Server and Create Keyspace and Tables
Set up a Cassandra database, and change the root/password to root/public, taking Mac OSX for instance:
$ brew install cassandra
## change the config file to enable authentication
$ vim /usr/local/etc/cassandra/cassandra.yaml
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer
$ brew services start cassandra
## login to cql shell and then create the root user
$ cqlsh -ucassandra -pcassandra
cassandra@cqlsh> create user root with password 'public' superuser;
Initiate Cassandra Table:
$ cqlsh -uroot -ppublic
Create Keyspace "test":
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
Create "t_mqtt_msg" table:
USE test;
CREATE TABLE t_mqtt_msg (
msgid text,
topic text,
qos int,
payload text,
retain int,
arrived timestamp,
PRIMARY KEY (msgid, topic)
);
Create a Rule
Go to EMQX Dashboard, select the "rule" tab on the menu to the left.
Select "message.publish", then type in the following SQL:
SELECT
*
FROM
"message.publish"
Add an Action
Click on the "+ Add" button under "Action Handler", and then select "Data to Cassandra" in the pop-up dialog window.
Fill in the parameters required by the action:
Two parameters are required by action "Data to Cassandra":
1). SQL template. SQL template is the SQL command you'd like to run when the action is triggered. In this example, we'll insert a message into Cassandra, so type in the following SQL template:
insert into t_mqtt_msg(msgid, topic, qos, payload, retain, arrived) values (${id}, ${topic}, ${qos}, ${payload}, ${retain}, ${timestamp})
Before data is inserted into the table, placeholders like ${key} will be replaced by the corresponding values.
2). Bind a resource to the action. Since the dropdown list "Resource" is empty for now, we create a new resource by clicking on the "New Resource" to the top right, and then select "Cassandra":
Configure the resource:
Set "Cassandra Keyspace" to "test", "Cassandra Username" to "root", "Cassandra Password" to "public", and keep all other configs as default, and click on the "Testing Connection" button to make sure the connection can be created successfully.
Then click on the "Create" button.
Back to the "Actions" dialog, and then click on the "Confirm" button.
Back to the creating rule page, then click on "Create" button. The rule we created will be show in the rule list:
Test the Rule
We have finished, test the rule by sending an MQTT message to EMQX:
> Topic: "t/cass"
> QoS: 1
> Retained: true
> Payload: "hello"
Then inspect the Cassandra table, verify a new record has been inserted:
And from the rule list, verify that the "Matched" column has increased to 1: