# MQTTデータをHTTPサーバーに取り込む

HTTPサーバーデータ統合は、EMQXを外部のHTTPサービスと迅速に連携させる方法を提供します。リクエストメソッドやリクエストデータ形式の柔軟な設定をサポートし、HTTPSによる安全な通信や認証機構も備えています。クライアントのメッセージやイベントデータをリアルタイムかつ効率的に柔軟に送信でき、IoTデバイスの状態通知やアラート通知、データ連携などのシナリオを実現します。

本ページでは、HTTPサーバーとのデータ統合の機能と特徴を詳しく解説し、HTTPサーバーデータ統合の設定方法について実践的なガイドを提供します。

:::tip

HTTPサービスとの連携が必要で、ルールによるデータ処理が不要な場合は、よりシンプルで使いやすい[Webhook](./webhook.md)の利用を推奨します。

:::

## 動作の仕組み

HTTPサーバーデータ統合はEMQXの標準機能で、簡単な設定で外部HTTPサービスと連携できます。HTTPサービス側では、好みのプログラミング言語やフレームワークでコードを書き、柔軟かつ複雑なデータ処理ロジックを実装可能です。

<img src="./assets/emqx-integration-http.jpg" alt="HTTPサーバーとのEMQX統合" style="zoom:67%;" />

EMQXはルールエンジンとSinkを通じてデバイスのイベントやデータをHTTPサーバーに転送します。ワークフローは以下の通りです：

1. **デバイスがEMQXに接続**：IoTデバイスが正常に接続されると、デバイスIDや送信元IPアドレスなどの属性を含むオンラインイベントが発生します。
2. **デバイスがメッセージをパブリッシュ**：デバイスは特定のトピックを通じてテレメトリや状態データをパブリッシュし、ルールエンジンをトリガーします。
3. **ルールエンジンがメッセージを処理**：組み込みのルールエンジンがトピックマッチングに基づき、特定のソースからのMQTTメッセージやイベントを処理します。ルールは対応する処理を行い、データ形式の変換や特定情報のフィルタリング、コンテキスト情報によるメッセージの付加などを実施します。
4. **HTTPサーバーへの転送**：設定されたルールが処理済みのメッセージやイベントをHTTPサーバーに転送するアクションをトリガーします。ユーザーはルール処理結果からデータを抽出し、リクエストヘッダーやボディ、URLを動的に構築でき、外部サービスとの柔軟な連携を実現します。

イベントやメッセージデータがHTTPサーバーに送信された後は、以下のような柔軟な処理が可能です：

- デバイス状態の更新やイベントログの実装により、デバイス管理システムの構築に活用。
- メッセージデータをデータベースに書き込み、軽量なデータストレージ機能を実現。
- SQLルールでフィルタリングした異常データをHTTPサービスで直接アラート通知システムに連携し、デバイス異常監視を実現。

## 特徴とメリット

EMQXのHTTPサーバー統合を利用することで、以下のようなメリットがあります：

- **より多くの下流システムへのデータ配信拡張**：HTTPサービスにより、MQTTデータを分析プラットフォームやクラウドサービスなど多様な外部システムとシームレスに連携でき、複数システム間でのデータ分配を容易にします。
- **リアルタイム応答と業務プロセスのトリガー**：HTTPサービスを介して外部システムがMQTTデータをリアルタイムに受け取り、業務プロセスをトリガー可能。例えばアラートデータの受信で業務フローを即座に開始できます。
- **カスタムデータ処理**：外部システム側で受信データの二次処理が可能で、EMQXの機能に制約されない複雑な業務ロジックを実装できます。
- **疎結合な連携**：HTTPサービスはシンプルなHTTPインターフェースを使うため、システム連携を疎結合に実現します。

まとめると、HTTPサービスはリアルタイムかつ柔軟でカスタマイズ可能なデータ統合機能を提供し、多様なアプリケーション開発ニーズに応えます。

## はじめる前に

このセクションでは、HTTPサーバーデータ統合を作成する前に必要な準備について説明します。簡単なHTTPサーバーのセットアップも含みます。

### 前提条件

- EMQXのデータ統合[ルール](./rules.md)の知識
- [データ統合](./data-bridges.md)の知識

### 簡単なHTTPサーバーのセットアップ

1. Pythonを使って簡単なHTTPサービスを構築します。このHTTPサービスは`POST /`リクエストを受け取り、リクエスト内容を表示した後に`200 OK`を返します：

```bash
from flask import Flask, json, request

api = Flask(__name__)

@api.route('/', methods=['POST'])
def print_messages():
  reply= {"result": "ok", "message": "success"}
  print("got post request: ", request.get_data())
  return json.dumps(reply), 200

if __name__ == '__main__':
  api.run()
```

2. 上記コードを`http_server.py`というファイル名で保存し、以下のコマンドでサーバーを起動します：

```shell
pip install flask

python3 http_server.py
```

## コネクターの作成

このセクションでは、SinkをHTTPサーバーに接続するためのHTTPサーバーコネクターの設定方法を説明します。

1. EMQXダッシュボードにアクセスし、**Integration** -> **Connector**をクリックします。

2. ページ右上の**Create**をクリックし、**HTTP Server**を選択して**Next**をクリックします。

3. コネクター名を入力します。名前は英数字の組み合わせとし、例として`my_httpserver`などが適切です。

4. **URL**に`http://localhost:5000`を設定します。その他の項目はデフォルトのままで問題ありません。

5. 詳細設定（任意）：詳細は[Sinkの機能](./data-bridges.md#features-of-sink)を参照してください。

6. **Create**をクリックする前に、**Test Connectivity**をクリックしてコネクターがHTTPサーバーに接続できるか確認できます。

7. **Create**をクリックしてコネクターの作成を完了します。

これでHTTPサーバーコネクターが作成されました。次に、ルールとSinkを作成してHTTPサーバーに書き込むデータを指定します。

## HTTPサーバーSink付きルールの作成

このセクションでは、HTTPサーバーSinkを追加したルールの作成方法を説明します。

1. EMQXダッシュボードにアクセスし、**Integration** -> **Rules**をクリックします。

2. ページ右上の**Create**をクリックします。

3. ルールIDに`my_rule`を入力し、**SQL Editor**でルールを設定します。

4. 以下のSQL文を例として**SQL Editor**に入力します。これはトピック`t/#`配下のMQTTメッセージをHTTPサーバーに保存することを意味します。

   注意：独自のSQL構文を指定する場合は、Sinkが必要とするすべてのフィールドを`SELECT`句に含めていることを確認してください。

   ```bash
   SELECT
     *
   FROM
     "t/#"
   ```

5. **+ Add Action**ボタンをクリックし、ルールでトリガーされるアクションを定義します。**Type of Action**のドロップダウンリストから`HTTP Server`を選択し、EMQXがルールで処理したデータをHTTPサーバーに送信するようにします。

   **Action**のドロップダウンは`Create Action`のままにするか、既存のHTTPサーバーアクションを選択しても構いません。この例では新しいSinkを作成してルールに追加します。

6. **Name**と**Description**テキストボックスにSinkの名前と説明を入力します。

7. **Connector**のドロップダウンから先ほど作成した`my-httpserver`を選択します。新しいコネクターを作成する場合は、ドロップダウン横のボタンをクリックしてください。設定パラメータの詳細は[コネクターの作成](#コネクターの作成)を参照してください。

8. **URL**に`http://localhost:5000`を設定し、**Method**ドロップダウンから`POST`を選択します。その他の項目はデフォルトのままで問題ありません。

9. **フォールバックアクション（任意）**：メッセージ配信失敗時の信頼性向上のため、1つ以上のフォールバックアクションを定義できます。これらはプライマリSinkがメッセージ処理に失敗した場合にトリガーされます。詳細は[フォールバックアクション](./data-bridges.md#fallback-actions)を参照してください。

10. **Create**ボタンをクリックしてSinkの設定を完了します。**Create Rule**ページに戻ると、**Action Outputs**タブに新しいSinkが表示されます。

11. **Create Rule**ページで設定内容を確認し、**Create**ボタンをクリックしてルールを生成します。

これでルールが正常に作成され、**Rule**ページに新しいルールが表示されます。**Actions(Sink)**タブをクリックすると、新しいHTTPサーバーSinkが確認できます。

また、**Integration** -> **Flow Designer**をクリックするとトポロジーが表示され、トピック`t/#`配下のメッセージがルール`my_rule`で解析されHTTPサーバーに送信・保存されていることが確認できます。

## ルールのテスト

MQTTXを使ってトピック`t/1`にメッセージを送信し、オンライン／オフラインイベントをトリガーします。

```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello HTTP Server" }'
```

**Rule**ページでルール名をクリックし、統計情報を確認します。Sinkの稼働状況をチェックし、新しい受信メッセージと送信メッセージがそれぞれ1件ずつあるはずです。

メッセージがHTTPサーバーに送信されているかを確認します：

```
python3 http_server.py
 * Serving Flask app 'http_server' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:5000 (Press CTRL+C to quit)

got post request:  b'hello HTTP Server'
```
