CassandraへのMQTTデータ取り込み
Apache Cassandra は、大規模データセットの処理と高スループットアプリケーションの構築を目的とした、人気のあるオープンソースの分散型NoSQLデータベース管理システムです。EMQXとApache Cassandraの統合により、メッセージやイベントをCassandraデータベースに保存できるようになり、時系列データの保存、デバイス登録および管理、リアルタイムデータ分析などの機能を実現します。
本ページでは、EMQXとCassandra間のデータ統合について包括的に紹介し、データ統合の作成および検証に関する実践的な手順を提供します。
TIP
現在の実装はCassandra v3.xのみ対応しており、v4.xには未対応です。
動作概要
Cassandraデータ統合はEMQXの標準機能であり、EMQXのデバイス接続およびメッセージ送信機能とCassandraの強力なデータ保存機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQXからCassandraへのデータ取り込みが簡素化され、複雑なコーディングを必要としません。
以下の図は、EMQXとCassandra間のデータ統合の典型的なアーキテクチャを示しています。

CassandraへのMQTTデータ取り込みは以下のように動作します:
- メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQXに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQXはこれらのメッセージを受信すると、ルールエンジン内でマッチング処理を開始します。
- メッセージデータの処理:メッセージが到着すると、ルールエンジンを通過し、EMQXで定義されたルールに従って処理されます。ルールは事前定義された条件に基づき、Cassandraにルーティングすべきメッセージを判別します。ペイロードの変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などの処理が適用されます。
- Cassandraへのデータ取り込み:ルールエンジンがCassandraへの保存対象メッセージを特定すると、Cassandraへの転送アクションがトリガーされます。処理済みデータはCassandraデータベースのコレクションにシームレスに書き込まれます。
- データの保存と活用:データがCassandraに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに対応できます。例えば、コネクテッドビークル分野では、車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡に利用できます。IIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化などに活用されます。
特長とメリット
Cassandraとのデータ統合は、効率的なデータ伝送、保存、活用を実現するための多彩な特長とメリットを提供します:
- 大規模時系列データの保存:EMQXは大量のデバイス接続とメッセージ送信を処理可能です。Cassandraの高いスケーラビリティと分散ストレージ機能を活用し、大規模データセット(時系列データを含む)の保存と管理を実現し、時間範囲に基づくクエリや集計操作をサポートします。
- リアルタイムデータストリーミング:EMQXはリアルタイムデータストリームの処理に最適化されており、ソースシステムからCassandraへの効率的かつ信頼性の高いデータ伝送を保証します。即時の洞察とアクションが必要なユースケースに理想的です。
- 高可用性の保証:EMQXとCassandraは共にクラスター機能を提供します。組み合わせて使用することで、デバイス接続とデータを複数のサーバーに分散可能です。ノード障害時には自動的に他の利用可能なノードにフェイルオーバーし、高いスケーラビリティとフォールトトレランスを実現します。
- 柔軟なデータ変換:EMQXの強力なSQLベースのルールエンジンにより、Cassandraに保存する前にデータを前処理できます。フィルタリング、ルーティング、集計、エンリッチメントなど多様な変換機能をサポートし、ニーズに応じたデータ整形が可能です。
- 柔軟なデータモデル:Cassandraはカラムベースのデータモデルを採用し、柔軟なスキーマとカラムの動的追加をサポートします。構造化されたデバイスイベントやメッセージデータの保存・管理に適しており、多様なMQTTメッセージデータを容易に格納できます。
はじめる前に
このセクションでは、TimescaleDBデータブリッジの作成前に必要な準備事項を説明します。Cassandraサーバーのインストール方法やキー スペースおよびテーブルの作成方法も含みます。
前提条件
Cassandraサーバーのインストール
以下のコマンドでDockerを使い、シンプルなCassandraサービスを起動します:
docker run --name cassa --rm -p 9042:9042 cassandra:3.11.14キースペースとテーブルの作成
Cassandra用のデータブリッジを作成する前に、キースペースとテーブルを作成する必要があります。
mqttという名前のキースペースを作成します:
docker exec -it cassa cqlsh "-e CREATE KEYSPACE mqtt WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"- Cassandraに
mqtt_msgテーブルを作成します:
docker exec -it cassa cqlsh "-e \
CREATE TABLE mqtt.mqtt_msg( \
msgid text, \
topic text, \
qos int, \
payload text, \
arrived timestamp, \
PRIMARY KEY(msgid, topic));"コネクターの作成
このセクションでは、SinkをCassandraサーバーに接続するためのコネクター作成方法を説明します。
以下の手順は、EMQXとCassandraをローカルマシンで実行していることを前提としています。リモート環境で実行している場合は、設定を適宜調整してください。
- EMQXダッシュボードに入り、Integration -> Connectorsをクリックします。
- ページ右上のCreateをクリックします。
- Create ConnectorページでCassandraを選択し、Nextをクリックします。
- Configurationステップで以下を設定します:
- コネクター名を入力します。英数字の組み合わせで、例:
my_cassandra - Serversに
127.0.0.1:9042、Keyspaceにmqttを入力し、その他はデフォルトのままにします。 - TLSを有効にするかどうかを選択します。TLS接続オプションの詳細は外部リソースアクセスのTLS有効化を参照してください。
- コネクター名を入力します。英数字の組み合わせで、例:
- Createをクリックする前に、Test ConnectivityをクリックしてコネクターがCassandraサーバーに接続できるかテストできます。
- ページ下部のCreateボタンをクリックしてコネクター作成を完了します。ポップアップダイアログでBack to Connector Listをクリックするか、Create RuleをクリックしてルールとSinkの作成を続行できます。詳細はCreate a Rule with Cassandra Sinkを参照してください。
Cassandra Sinkを用いたルールの作成
このセクションでは、Dashboard上でソースMQTTトピックt/#のメッセージを処理し、処理結果をCassandraのmqtt_msgテーブルに保存するルールの作成方法を説明します。
EMQXダッシュボードにアクセスし、Integration -> Rulesをクリックします。
ページ右上のCreateをクリックします。
ルールIDに
my_ruleを入力し、SQL Editorでルールを設定します。例えば、トピックt/#配下のMQTTメッセージをCassandraに転送したい場合、以下のSQL構文を使用します。注意:独自のSQL構文を指定する場合は、Sinkで必要なすべてのフィールドが
SELECT部分に含まれていることを確認してください。sqlSELECT * FROM "t/#"注意:初心者の方はSQL Examplesをクリックし、Enable TestでSQLルールの学習とテストが可能です。
+ Add Actionボタンをクリックし、ルールにトリガーされるアクションを定義します。このアクションにより、EMQXはルールで処理したデータをCassandraに送信します。
Type of Actionドロップダウンから
Cassandraを選択します。ActionドロップダウンはデフォルトのCreate Actionのままにします。既に作成済みのSinkを選択することも可能ですが、ここでは新しいSinkを作成します。Sinkの名前を入力します。英数字の組み合わせで指定してください。
Connectorドロップダウンから先ほど作成した
my_cassandraを選択します。ドロップダウン横のボタンから新しいコネクターを作成することも可能です。設定パラメータの詳細はCreate a Connectorを参照してください。Cassandraに
topic、id、clientid、qos、payload、timestampを保存するためのCQLテンプレートを設定します。このテンプレートはCassandra Query Languageで実行され、サンプルコードは以下の通りです:sqlinsert into mqtt_msg(msgid, topic, qos, payload, arrived) values (${id}, ${topic}, ${qos}, ${payload}, ${timestamp})フォールバックアクション(任意):メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細はFallback Actionsを参照してください。
詳細設定(任意):必要に応じて**同期(sync)または非同期(async)**クエリモードを選択します。詳細はFeatures of Sinkを参照してください。
CreateボタンをクリックしてSink設定を完了します。Create Ruleページに戻ると、Action Outputsタブに新しいSinkが表示されます。
Create Ruleページで設定内容を確認し、Createボタンをクリックしてルールを生成します。作成したルールはルール一覧に表示され、statusは
connectedとなります。
これでルールの作成が完了し、Ruleページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいCassandra Sinkが確認できます。
また、Integration -> Flow Designerをクリックするとトポロジーを確認でき、トピックt/#配下のメッセージがルールmy_ruleで解析され、Cassandraに送信・保存されていることがわかります。
ルールのテスト
MQTTXを使ってトピックt/1にメッセージを送信します:
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Cassandra" }'ルールとSinkの稼働状況を確認し、統計カウントが増加していることを確認してください。
以下のコマンドでメッセージがCassandraに保存されているか確認します:
docker exec -it cassa cqlsh "-e SELECT * FROM mqtt.mqtt_msg;"