# PostgreSQL Backend
TIP
Since EMQX 3.1, a rule engine has been available as a replacement for plug-ins, and using it is recommended. See "Save data to PostgreSQL" for how to set up PostgreSQL storage in the rule engine.
Config file: emqx_backend_pgsql.conf
# Configure PostgreSQL Server
TIP
PostgreSQL 13 and earlier versions are supported.
Connection pools for multiple PostgreSQL servers are supported:
    ## Pgsql Server
    backend.pgsql.pool1.server = 127.0.0.1:5432

    ## Pgsql Pool Size
    backend.pgsql.pool1.pool_size = 8

    ## Pgsql Username
    backend.pgsql.pool1.username = root

    ## Pgsql Password
    backend.pgsql.pool1.password = public

    ## Pgsql Database
    backend.pgsql.pool1.database = mqtt

    ## Pgsql Ssl
    backend.pgsql.pool1.ssl = false

    ## Max number of fetch offline messages. Without count limit if infinity
    ## backend.pgsql.max_returned_count = 500

    ## Time Range. Without time limit if infinity
    ## d - day
    ## h - hour
    ## m - minute
    ## s - second
    ## backend.pgsql.time_range = 2h
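The `time_range` option combines an integer with one of the unit suffixes listed in the comments (`d`, `h`, `m`, `s`), or is `infinity` for no limit. As a minimal sketch of how such a value maps to seconds, assuming a single number followed by a single unit (the function name `parse_time_range` is hypothetical, and the plugin's own parser may accept other forms):

```python
import re
from typing import Optional

# Seconds per unit, following the suffixes listed in the config comments.
UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}

def parse_time_range(value: str) -> Optional[int]:
    """Return the range in seconds, or None for 'infinity' (no limit)."""
    value = value.strip()
    if value == "infinity":
        return None
    match = re.fullmatch(r"(\d+)([dhms])", value)
    if match is None:
        raise ValueError(f"unsupported time range: {value!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_SECONDS[unit]

print(parse_time_range("2h"))        # 7200
print(parse_time_range("infinity"))  # None
```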
# Configure PostgreSQL Persistence Hooks
    ## Client Connected Record
    backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

    ## Subscribe Lookup Record
    backend.pgsql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

    ## Client DisConnected Record
    backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

    ## Lookup Unread Message QOS > 0
    backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

    ## Lookup Retain Message
    backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

    ## Store Publish Message QOS > 0
    backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

    ## Store Retain Message
    backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

    ## Delete Retain Message
    backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

    ## Store Ack
    backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

    ## Get offline messages
    ### "offline_opts": Get configuration for offline messages
    ### max_returned_count: Maximum number of offline messages get at a time
    ### time_range: Get only messages in the current time range
    ## backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

    ## If you need to store Qos0 messages, you can enable the following configuration
    ## Tip: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
    ## backend.pgsql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
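Each hook entry follows the pattern `backend.pgsql.hook.<hook point>.<index> = <JSON settings>`. The sketch below splits one such line into its parts, which can help when checking a configuration by script. The function name `parse_hook_line` is hypothetical; this is not the plugin's own loader.

```python
import json

PREFIX = "backend.pgsql.hook."

def parse_hook_line(line: str):
    """Split one hook entry into (hook point, index, settings dict)."""
    key, _, raw = line.partition("=")
    key = key.strip()
    if not key.startswith(PREFIX):
        raise ValueError(f"not a hook entry: {key!r}")
    # The key ends in a numeric index; the part before it is the hook point.
    hook, _, index = key[len(PREFIX):].rpartition(".")
    return hook, int(index), json.loads(raw)

line = ('backend.pgsql.hook.session.subscribed.1 = '
        '{"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}')
hook, index, settings = parse_hook_line(line)
print(hook, index, settings["action"]["function"], settings["pool"])
```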
# Description of PostgreSQL Persistence Hooks
Hook | Topic | Action | Description |
---|---|---|---|
client.connected | | on_client_connected | Store client connected state |
client.connected | | on_subscribe_lookup | Look up subscribed topics |
client.disconnected | | on_client_disconnected | Store client disconnected state |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Look up retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
# SQL Parameters Description
Hook | Parameters | Example (${name} denotes an available parameter) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
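The `${name}` placeholders are filled in from the parameters of the hook that fired. As an illustration only, one way to bind such a template safely is to rewrite each placeholder as a PostgreSQL positional parameter (`$1`, `$2`, ...) and pass the values separately. The function name `bind_template` is hypothetical, and this is an assumption about a possible approach, not the plugin's actual substitution code.

```python
import re

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")

def bind_template(template: str, params: dict):
    """Rewrite ${name} placeholders as $1, $2, ... and collect their values."""
    values = []

    def replace(match):
        values.append(params[match.group(1)])  # KeyError if the hook lacks it
        return f"${len(values)}"

    return PLACEHOLDER.sub(replace, template), values

sql, values = bind_template(
    "insert into msg(msgid, topic) values(${msgid}, ${topic})",
    {"msgid": "53F98F80F66017005000004A60003", "topic": "hello", "qos": 1},
)
print(sql)     # insert into msg(msgid, topic) values($1, $2)
print(values)  # ['53F98F80F66017005000004A60003', 'hello']
```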
# Configure 'action' with SQL
The PostgreSQL backend supports SQL statements in 'action', in place of a built-in function:
    ## After a client is connected to the EMQX server, it executes a SQL command (multiple command also supported)
    backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
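Because each hook exposes only the parameters listed in the SQL parameters table, a custom statement that references any other name cannot be filled in. The following sketch checks the `sql` action of a hook entry against that table before it is deployed. The helper name `unknown_placeholders` is hypothetical, and the check is a convenience for the reader, not something the plugin is known to perform.

```python
import json
import re

# Parameters available per hook, copied from the SQL parameters table.
HOOK_PARAMS = {
    "client.connected": {"clientid"},
    "client.disconnected": {"clientid"},
    "session.subscribed": {"clientid", "topic", "qos"},
    "session.unsubscribed": {"clientid", "topic"},
    "message.publish": {"msgid", "topic", "payload", "qos", "clientid"},
    "message.acked": {"msgid", "topic", "clientid"},
    "message.delivered": {"msgid", "topic", "clientid"},
}

def unknown_placeholders(hook: str, settings_json: str):
    """Return placeholders used in the 'sql' action that the hook does not provide."""
    statements = json.loads(settings_json)["action"]["sql"]
    used = {name for sql in statements for name in re.findall(r"\$\{(\w+)\}", sql)}
    return sorted(used - HOOK_PARAMS[hook])

settings = '{"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}'
print(unknown_placeholders("client.connected", settings))  # []
```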
# Create PostgreSQL DB
    createdb mqtt -E UTF8 -e
# Import PostgreSQL DB & Table Schema
    \i etc/sql/emqx_backend_pgsql.sql
TIP
You may choose any database name, provided backend.pgsql.pool1.database is set to match.
# PostgreSQL Client Connection Table
mqtt_client stores client connection states:
    CREATE TABLE mqtt_client(
      id SERIAL primary key,
      clientid character varying(100),
      state integer,
      node character varying(100),
      online_at timestamp,
      offline_at timestamp,
      created timestamp without time zone,
      UNIQUE (clientid)
    );
Query a client's connection state:
    select * from mqtt_client where clientid = ${clientid};
E.g., if client 'test' is online:
    select * from mqtt_client where clientid = 'test';

     id | clientid | state | node           | online_at           | offline_at          | created
    ----+----------+-------+----------------+---------------------+---------------------+---------------------
      1 | test     | 1     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL                | 2016-12-24 09:40:22
    (1 row)
Client 'test' is offline:
    select * from mqtt_client where clientid = 'test';

     id | clientid | state | node           | online_at           | offline_at          | created
    ----+----------+-------+----------------+---------------------+---------------------+---------------------
      1 | test     | 0     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22
    (1 row)
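The UNIQUE (clientid) constraint means each client has a single row whose state flips between 1 (online) and 0 (offline), as the two results above show. The sketch below walks through that lifecycle using Python's built-in `sqlite3` module as a stand-in for PostgreSQL so that it runs without a server. The upsert and update statements are assumptions for illustration, not the SQL the plugin executes, and the column types are simplified.

```python
import sqlite3

db = sqlite3.connect(":memory:")
# Simplified copy of mqtt_client; SQLite types stand in for the PostgreSQL ones.
db.execute("""
    CREATE TABLE mqtt_client(
        id INTEGER PRIMARY KEY,
        clientid TEXT UNIQUE,
        state INTEGER,
        node TEXT,
        online_at TEXT,
        offline_at TEXT
    )""")

def on_connected(clientid, node, now):
    # Insert the client, or flip the existing row back to online.
    db.execute(
        """INSERT INTO mqtt_client(clientid, state, node, online_at)
           VALUES (?, 1, ?, ?)
           ON CONFLICT(clientid) DO UPDATE
           SET state = 1, node = excluded.node,
               online_at = excluded.online_at, offline_at = NULL""",
        (clientid, node, now))

def on_disconnected(clientid, now):
    # Mark the client offline and record when it left.
    db.execute(
        "UPDATE mqtt_client SET state = 0, offline_at = ? WHERE clientid = ?",
        (now, clientid))

on_connected("test", "emqx@127.0.0.1", "2016-11-15 09:40:40")
on_disconnected("test", "2016-11-15 09:46:10")
rows = db.execute("SELECT clientid, state, offline_at FROM mqtt_client").fetchall()
print(rows)  # [('test', 0, '2016-11-15 09:46:10')]
```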
# PostgreSQL Subscription Table
mqtt_sub stores subscriptions of clients:
    CREATE TABLE mqtt_sub(
      id SERIAL primary key,
      clientid character varying(100),
      topic character varying(200),
      qos integer,
      created timestamp without time zone,
      UNIQUE (clientid, topic)
    );
E.g., client 'test' subscribes to topics 'test_topic1' and 'test_topic2':
    insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
    insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);
Query the subscriptions of a client:
    select * from mqtt_sub where clientid = ${clientid};
Query the subscriptions of client 'test':
    select * from mqtt_sub where clientid = 'test';

     id | clientid     | topic       | qos  | created
    ----+--------------+-------------+------+---------------------
      1 | test         | test_topic1 | 1    | 2016-12-24 17:09:05
      2 | test         | test_topic2 | 2    | 2016-12-24 17:12:51
    (2 rows)
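The UNIQUE (clientid, topic) constraint allows only one row per client and topic, so a plain second insert for the same pair is rejected. The sketch below demonstrates this with the two inserts shown earlier, again using Python's built-in `sqlite3` module as a stand-in for PostgreSQL and simplified column types; the error PostgreSQL raises differs in name but not in effect.

```python
import sqlite3

db = sqlite3.connect(":memory:")
# Simplified copy of mqtt_sub; SQLite types stand in for the PostgreSQL ones.
db.execute("""
    CREATE TABLE mqtt_sub(
        id INTEGER PRIMARY KEY,
        clientid TEXT,
        topic TEXT,
        qos INTEGER,
        UNIQUE (clientid, topic)
    )""")
db.execute("INSERT INTO mqtt_sub(clientid, topic, qos) VALUES ('test', 'test_topic1', 1)")
db.execute("INSERT INTO mqtt_sub(clientid, topic, qos) VALUES ('test', 'test_topic2', 2)")

try:
    # Same client and topic again: rejected by UNIQUE (clientid, topic).
    db.execute("INSERT INTO mqtt_sub(clientid, topic, qos) VALUES ('test', 'test_topic1', 2)")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

subs = db.execute(
    "SELECT topic, qos FROM mqtt_sub WHERE clientid = ? ORDER BY id", ("test",)
).fetchall()
print(duplicate_rejected, subs)
```

To change the QoS of an existing subscription, update the row instead of inserting a new one.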
# PostgreSQL Message Table
mqtt_msg stores MQTT messages:
    CREATE TABLE mqtt_msg (
      id SERIAL primary key,
      msgid character varying(60),
      sender character varying(100),
      topic character varying(200),
      qos integer,
      retain integer,
      payload text,
      arrived timestamp without time zone
    );
Query messages published by a client:
    select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
    select * from mqtt_msg where sender = 'test';

     id | msgid                         | topic    | sender | node | qos | retain | payload | arrived
    ----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------
      1 | 53F98F80F66017005000004A60003 | hello    | test   | NULL | 1   | 0      | hello   | 2016-12-24 17:25:12
      2 | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL | 1   | 0      | world   | 2016-12-24 17:25:45
    (2 rows)
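The `max_returned_count` and `time_range` options limit how many stored messages are fetched and how far back they reach. As a rough sketch of how those two limits could translate into a query over mqtt_msg, the function below builds a statement and its parameter list. The function name `build_fetch_query` is hypothetical, the `%s` markers follow the style of common Python PostgreSQL drivers, and the query the plugin actually runs is not shown in this document.

```python
from datetime import datetime, timedelta
from typing import Optional

def build_fetch_query(topic: str, now: datetime,
                      max_returned_count: Optional[int] = None,
                      time_range: Optional[timedelta] = None):
    """Build a SELECT over mqtt_msg; None means 'infinity' (no limit)."""
    sql = "select * from mqtt_msg where topic = %s and qos > 0"
    params = [topic]
    if time_range is not None:
        # Keep only messages that arrived inside the time window.
        sql += " and arrived >= %s"
        params.append(now - time_range)
    sql += " order by id"
    if max_returned_count is not None:
        sql += " limit %s"
        params.append(max_returned_count)
    return sql, params

sql, params = build_fetch_query(
    "hello", datetime(2016, 12, 24, 18, 0, 0),
    max_returned_count=500, time_range=timedelta(hours=2))
print(sql)
print(params)
```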
# PostgreSQL Retained Message Table
mqtt_retain stores retained messages:
    CREATE TABLE mqtt_retain(
      id SERIAL primary key,
      topic character varying(200),
      msgid character varying(60),
      sender character varying(100),
      qos integer,
      payload text,
      arrived timestamp without time zone,
      UNIQUE (topic)
    );
Query retained messages:
    select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
    select * from mqtt_retain where topic = 'retain';

     id | topic    | msgid                         | sender  | node | qos  | payload | arrived
    ----+----------+-------------------------------+---------+------+------+---------+---------------------
      1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL | 1    | www     | 2016-12-24 16:55:18
    (1 row)
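Because of UNIQUE (topic), the table holds at most one retained message per topic. In MQTT, publishing a retained message with an empty payload clears the retained message for that topic. The sketch below models that store, delete and lookup cycle with an in-memory dictionary. The class `RetainStore` is hypothetical and only mirrors the roles of the on_message_retain, on_retain_delete and on_retain_lookup actions; it is not how the plugin is implemented.

```python
class RetainStore:
    """In-memory model of mqtt_retain: one row per topic (UNIQUE (topic))."""

    def __init__(self):
        self.rows = {}

    def publish(self, topic, payload, qos, sender):
        if payload == b"":
            # Empty retained payload: delete the row (compare on_retain_delete).
            self.rows.pop(topic, None)
        else:
            # Otherwise replace whatever was retained (compare on_message_retain).
            self.rows[topic] = {"payload": payload, "qos": qos, "sender": sender}

    def lookup(self, topic):
        # Compare on_retain_lookup; wildcard topic filters are not handled here.
        return self.rows.get(topic)

store = RetainStore()
store.publish("retain", b"www", 1, "test")
print(store.lookup("retain")["payload"])  # b'www'
store.publish("retain", b"", 1, "test")
print(store.lookup("retain"))             # None
```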
# PostgreSQL Acknowledgement Table
mqtt_acked stores acknowledgements from the clients:
    CREATE TABLE mqtt_acked (
      id SERIAL primary key,
      clientid character varying(100),
      topic character varying(100),
      mid integer,
      created timestamp without time zone,
      UNIQUE (clientid, topic)
    );
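The UNIQUE (clientid, topic) constraint implies one acknowledgement position per client and topic. The sketch below shows how such a position could be used to pick out unread messages. It rests on an assumption that the schema above does not state: that `mid` holds the `id` of the last mqtt_msg row the client acknowledged. The function name `unread_messages` and the sample data are hypothetical.

```python
def unread_messages(messages, acked, clientid, topic):
    """Return messages on `topic` with an id above the client's acked position.

    ASSUMPTION: `mid` in mqtt_acked refers to mqtt_msg.id of the last
    acknowledged message; the source schema does not state this.
    """
    last_mid = acked.get((clientid, topic), 0)  # one row per (clientid, topic)
    return [m for m in messages if m["topic"] == topic and m["id"] > last_mid]

messages = [
    {"id": 1, "topic": "hello", "payload": "a"},
    {"id": 2, "topic": "hello", "payload": "b"},
    {"id": 3, "topic": "world", "payload": "c"},
]
acked = {("test", "hello"): 1}
print([m["id"] for m in unread_messages(messages, acked, "test", "hello")])  # [2]
```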
# Enable PostgreSQL Backend
    ./bin/emqx_ctl plugins load emqx_backend_pgsql