Skip to content

CassandraへのMQTTデータ取り込み

Apache Cassandraは、大規模データセットの処理と高スループットアプリケーションの構築を目的とした、人気のあるオープンソースの分散型NoSQLデータベース管理システムです。EMQX CloudとApache Cassandraの統合により、Cassandraデータベースへのメッセージやイベントの保存が可能となり、時系列データの保存、デバイス登録および管理、リアルタイムデータ分析などの機能を実現します。

本ページでは、EMQX CloudとCassandra間のデータ統合について、実践的な作成および検証手順を含めて包括的に紹介します。

TIP

現在の実装はCassandra v3.xのみ対応しており、v4.xには未対応です。

動作概要

Cassandraデータ統合はEMQX Cloudに標準搭載された機能であり、EMQX Cloudのデバイス接続およびメッセージ送受信機能とCassandraの強力なデータ保存機能を組み合わせています。組み込みのルールエンジンコンポーネントにより、EMQX CloudからCassandraへのデータ取り込みが簡素化され、複雑なコーディングを不要にします。

以下の図は、EMQXとCassandra間の典型的なデータ統合アーキテクチャを示しています。

EMQX Cloud Integration Cassandra

CassandraへのMQTTデータ取り込みは以下のように動作します:

  1. メッセージのパブリッシュと受信:接続された車両、IIoTシステム、エネルギー管理プラットフォームなどのIoTデバイスは、MQTTプロトコルを通じてEMQX Cloudに正常に接続し、特定のトピックにMQTTメッセージをパブリッシュします。EMQX Cloudがこれらのメッセージを受信すると、ルールエンジン内でマッチング処理が開始されます。
  2. メッセージデータの処理:メッセージが到着するとルールエンジンを通過し、EMQX Cloudで定義されたルールに従って処理されます。ルールは事前定義された条件に基づき、どのメッセージをCassandraへルーティングするかを判断します。ペイロード変換が指定されている場合は、データ形式の変換、特定情報のフィルタリング、追加コンテキストによるペイロードの強化などが適用されます。
  3. Cassandraへのデータ取り込み:ルールエンジンがCassandraへの保存対象メッセージを特定すると、Cassandraへメッセージを転送するアクションをトリガーします。処理済みデータはCassandraデータベースのコレクションにシームレスに書き込まれます。
  4. データの保存と活用:データがCassandraに保存されることで、企業はそのクエリ機能を活用して様々なユースケースに利用できます。例えば、接続車両の分野では、保存されたデータを用いて車両の状態管理、リアルタイム指標に基づくルート最適化、資産追跡などが可能です。同様にIIoT環境では、機械の状態監視、メンテナンス予測、生産スケジュールの最適化に利用されます。

特長とメリット

Cassandraとのデータ統合は、効率的なデータ送信、保存、活用を実現するための多彩な特長とメリットを備えています:

  • 大規模時系列データ保存:EMQX Cloudは大量のデバイス接続とメッセージ送信を処理可能です。Cassandraの高いスケーラビリティと分散ストレージ機能を活用し、大規模データセット(時系列データを含む)の保存・管理を実現し、時間範囲に基づくクエリや集計操作をサポートします。
  • リアルタイムデータストリーミング:EMQX Cloudはリアルタイムデータストリーム処理に最適化されており、ソースシステムからCassandraへの効率的かつ信頼性の高いデータ送信を保証します。即時の洞察とアクションが必要なユースケースに適しています。
  • 高可用性の保証:EMQXとCassandraの両方がクラスター機能を提供します。組み合わせて使用することで、デバイス接続とデータを複数サーバーに分散可能です。ノード障害発生時には他の利用可能なノードに自動的に切り替わり、高いスケーラビリティとフォールトトレランスを実現します。
  • 柔軟なデータ変換:EMQX Cloudは強力なSQLベースのルールエンジンを提供し、Cassandraに保存する前にデータを前処理できます。フィルタリング、ルーティング、集計、強化など多様なデータ変換機能をサポートし、組織のニーズに応じたデータ整形が可能です。
  • 柔軟なデータモデル:Cassandraはカラムベースのデータモデルを採用し、柔軟なスキーマと動的なカラム追加をサポートします。構造化されたデバイスイベントやメッセージデータの保存・管理に適しており、多様なMQTTメッセージデータを容易に格納できます。

はじめる前に

このセクションでは、TimescaleDBデータブリッジ作成の前準備として、Cassandraサーバーのインストール方法やキー スペースおよびテーブルの作成方法について説明します。

前提条件

ネットワーク設定

データ統合を構成する前に、EMQX Cloudのデプロイメントを作成し、EMQX Cloudと対象サービス間のネットワーク接続を確立していることを確認してください。

  • Dedicated Flexデプロイメントの場合

    EMQX CloudのVPCと対象サービスのVPC間でVPCピアリング接続を作成します。ピアリング接続が確立されると、EMQX Cloudは対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    パブリックIP経由でのアクセスが必要な場合は、NATゲートウェイを構成してアウトバウンド接続を有効にしてください。

  • BYOC(Bring Your Own Cloud)デプロイメントの場合

    BYOCデプロイメントが稼働しているVPCと対象サービスをホストするVPC間でVPCピアリング接続を作成します。ピアリングが確立されると、対象サービスのプライベートIPアドレスを介してアクセス可能になります。

    対象サービスにパブリックIP経由でアクセスする必要がある場合は、クラウドプロバイダーのコンソールを使用してBYOC VPCにNATゲートウェイを構成してください。

Cassandraサーバーのインストール

以下のコマンドでDockerを使って簡単にCassandraサービスを起動します:

bash
docker run --name cassa --rm -p 9042:9042 cassandra:3.11.14

キースペースとテーブルの作成

Cassandra用のデータブリッジを作成する前に、キースペースとテーブルを作成する必要があります。

  1. mqttという名前のキースペースを作成します:
bash
docker exec -it cassa cqlsh "-e CREATE KEYSPACE mqtt WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
  1. Cassandraにtemp_humテーブルを作成します:
bash
docker exec -it cassa cqlsh "-e \
    CREATE TABLE mqtt.temp_hum( \
        msgid text, \
        temp text, \
        hum text, \
        arrived timestamp, \
        PRIMARY KEY(msgid));"

Cassandraコネクターの作成

データ統合ルールを作成する前に、まずCassandraサーバーにアクセスするためのCassandraコネクターを作成する必要があります。

  1. デプロイメントに移動し、左側のナビゲーションメニューからデータ統合をクリックします。

  2. 初めてコネクターを作成する場合は、データ永続化カテゴリの中からCassandraを選択します。既にコネクターを作成済みの場合は、新しいコネクターを選択し、続いてデータ永続化カテゴリの中からCassandraを選択します。

  3. 接続情報を入力します:

    • Servers:サーバーのIPアドレスとポート
    • Keyspacemqtt
    • その他はデフォルトのままにします。
    • 暗号化接続を確立したい場合は、TLSを有効にするのトグルスイッチをオンにします。
  4. 詳細設定(任意)

  5. テストボタンをクリックし、Cassandraサービスにアクセスできることを確認します。成功すると成功メッセージが表示されます。

  6. 新規作成ボタンをクリックして作成を完了します。

ルールの作成

次に、書き込むデータを指定し、処理済みデータをCassandraへ転送するアクションをルールに追加するためのルールを作成します。

  1. ルールエリアで新規ルールをクリックするか、作成したコネクターのアクション列にある新規ルールアイコンをクリックします。

  2. SQLエディターにルールのマッチングSQL文を入力します。以下の例では、メッセージが報告された時間arrived、クライアントID、ペイロードをtemp_hum/emqxトピックから読み取ります。また、このトピックから温度と湿度を取得しています。

    sql
      SELECT
        id as msgid,
        payload.temp as temp,
        payload.hum as hum,
        timestamp as arrived
      FROM
        "temp_hum/emqx"

    TIP

    初心者の方は、SQL例をクリックし、試してみるでSQLルールを学習・テストできます。

  3. 次へをクリックしてアクションを追加します。

  4. コネクターのドロップダウンから先ほど作成したコネクターを選択します。

  5. CQLテンプレートを設定し、msgidtemphumarrivedをCassandraに保存します。このテンプレートはCassandra Query Languageで実行され、サンプルコードは以下の通りです:

    sql
      INSERT INTO temp_hum(msgid, temp, hum, arrived)
      VALUES (
        ${msgid},
        ${temp},
        ${hum},
        ${arrived}
      )
  6. 詳細設定(任意)

  7. 確定ボタンをクリックしてルール作成を完了します。

  8. 新規ルール成功のポップアップでルールに戻るをクリックし、データ統合設定の全工程が完了します。

ルールのテスト

MQTTXを使って温度・湿度データの報告をシミュレートすることを推奨しますが、他の任意のクライアントでも構いません。

  1. MQTTXを使ってデプロイメントに接続し、以下のトピックにメッセージを送信します。

    • トピック:temp_hum/emqx

    • ペイロード:

      json
      {
        "temp": "27.5",
        "hum": "41.8"
      }
  2. 以下のコマンドでメッセージがCassandraに保存されているか確認します:

    bash
     $ docker exec -it cassa cqlsh "-e SELECT * FROM mqtt.temp_hum;"
    
     msgid                            | arrived                         | hum  | temp
     ----------------------------------+---------------------------------+------+------
     00061488D7FBFE8F2C770000467D0011 | 2024-03-26 04:37:11.987000+0000 | 41.8 | 27.5
    
     (1 rows)
  3. コンソールで運用データを確認します。ルール一覧でルールIDをクリックすると、ルールの統計情報およびそのルールに紐づく全アクションの統計情報を閲覧できます。