Skip to content

MQTT 插件开发示例

北向应用开发主要包含以下几个部分,最底层的是指令处理层开发,最外层的是应用层开发。

模块文件说明
指令处理层开发command.c command.h common.h heartbeat.c heartbeat.h read_write.c read_write.h插件对指令的解析
应用层开发mqtt_plugin.c mqtt_plugin.h mqtt_util.c mqtt_util.h插件主题框架的实现
插件设置文件mqtt.json插件设置文件的定义

常量说明

常量说明
TOPIC_TYPE_READ主题类型为读
TOPIC_TYPE_WRITE主题类型为写
TOPIC_TYPE_UPLOAD主题类型为上传
TOPIC_TYPE_HEARTBEAT主题类型为心跳包

第一步,指令处理层开发

MQTT 目前实现了上传数据、心跳数据、读 Tags 和写 Tags 的接口。command.c 中定义了 mqtt 请求对应响应的具体处理, heartbeat.c 和 read_write.c 为 command.c 提供需要的函数实现。

函数说明
command_response_handlemqtt 响应处理
command_read_once_response读主题的响应处理
command_read_periodic_response上传主题的响应处理
command_write_response写主题的响应处理
command_heartbeat_response心跳数据的响应处理

第二步,应用层开发

mqtt_util.c 和 mqtt_util.h 文件定义 mqtt_plugin.c 文件中使用的具体函数实现。

南北向驱动层开发中都需要先构建 neu_plugin_intf_funs_t 的结构体,并实现结构体中的每个元素的功能。

c
static const neu_plugin_intf_funs_t plugin_intf_funs = {
    .open    = mqtt_plugin_open,
    .close   = mqtt_plugin_close,
    .init    = mqtt_plugin_init,
    .uninit  = mqtt_plugin_uninit,
    .start   = mqtt_plugin_start,
    .stop    = mqtt_plugin_stop,
    .setting = mqtt_plugin_config,
    .request = mqtt_plugin_request,
};

.open

调用 mqtt_plugin_open 函数,基于 plugin 创建 node 时 neuron 第一个调用的函数,创建插件自己定义的结构体 struct neu_plugin。该结构体在 mqtt_plugin.h 中定义,需要注意的是结构体中的第一个成员一定是 neu_plugin_common_t common,其他成员可根据驱动的具体实现增加。

c
static neu_plugin_t *mqtt_plugin_open(void)
{
    neu_plugin_t *plugin = (neu_plugin_t *) calloc(1, sizeof(neu_plugin_t));

    neu_plugin_common_init(&plugin->common);
	
    return plugin;
}

.close

调用 mqtt_plugin_close 函数,删除 node 时,neuron 调用的最后一个函数,用于释放由 open 创建的 neu_plugin_t。

c
static int mqtt_plugin_close(neu_plugin_t *plugin)
{
    free(plugin);

    return NEU_ERR_SUCCESS;
}

.init

调用 mqtt_plugin_init 函数,在创建 node 时,neuron 调用完 open 后,紧接着调用的函数。此函数主要做插件内需要初始化的一些资源,mqtt 插件中主要初始化 mqtt 的运行状态及配置。

c
static int mqtt_plugin_init(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    plugin->routine = NULL;
    plugin->running = false;
    plugin->config  = NULL;
    plugin_cache_init(plugin);

    const char *name = neu_plugin_module.module_name;
    plog_info(plugin, "initialize plugin: %s", name);

    return NEU_ERR_SUCCESS;
}

.uninit

调用 mqtt_plugin_uninit 函数,删除 node 时,neuron 首先调用的函数,此函数主要释放一些在 init 中申请以及初始化的资源。

c
static int mqtt_plugin_uninit(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    plugin_stop_running(plugin);
    plugin_cache_uninit(plugin);
    plugin_config_free(plugin);

    const char *name = neu_plugin_module.module_name;
    plog_info(plugin, "uninitialize plugin: %s", name);
    return NEU_ERR_SUCCESS;
}

.start

调用 mqtt_plugin_start 函数,用户在 neuron node 页面,点击开始时,neuron 会调用此函数,通知插件开始运行,以及开始连接设备等,如果配置不正确,将会返回节点设置无效的错误。

mqtt-start

c
static int mqtt_plugin_start(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    int rc = plguin_start_running(plugin);
    if (0 != rc) {
        return NEU_ERR_NODE_SETTING_INVALID;
    }

    return NEU_ERR_SUCCESS;
}

.stop

调用 mqtt_plugin_stop 函数,用户在 neuron node 页面,点击停止时,neuron 会调用此函数,stop 通知插件停止运行,关闭插件与 neuron 之间的连接。

mqtt-stop

c
static int mqtt_plugin_stop(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    plugin_stop_running(plugin);
    return NEU_ERR_SUCCESS;
}

.setting

调用 mqtt_plugin_config 函数,用户在 neuron node 设置页面进行设置时使用,node 设置的参数将通过 json 方式呈现(json 文件的配置请参考 插件设置文件 ),neuron 将通过此函数通知插件进行设置。mqtt_plugin_config 函数首先会解析并保存配置信息,然后建立连接。

mqtt-config

c
static int mqtt_plugin_config(neu_plugin_t *plugin, const char *config)
{
	plog_info(plugin, "config: %s", config);

    return NEU_ERR_SUCCESS;
}

.request

调用 mqtt_plugin_request 函数,根据请求类型对应响应处理。

c
static int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
                               void *data)
{
    assert(NULL != plugin);
    assert(NULL != head);
    assert(NULL != data);

    neu_err_code_e error = NEU_ERR_SUCCESS;

    switch (head->type) {
    case NEU_RESP_ERROR:
        error = write_response(plugin, head, data);
        break;
    case NEU_RESP_READ_GROUP:
        error = read_response(plugin, head, data);
        break;
    case NEU_REQRESP_TRANS_DATA:
        error = trans_data(plugin, data);
        break;
    case NEU_REQRESP_NODES_STATE: {
        error = node_state_send(plugin, head, data);
        break;
    }
    case NEU_REQRESP_NODE_DELETED:
        break;
    default:
        error = NEU_ERR_MQTT_FAILURE;
        break;
    }

    return error;
}

第三步,插件设置文件

mqtt.json 文件设置应用配置参数,配置 mqtt 插件每个参数的字段说明如下所示。

参数说明
name页面显示该参数的名称
description该参数的具体说明
type该参数的类型,目前常用的是 int 和 string 两种类型
attribute该参数的属性,只有两种可选和必选,即 required 和 optional
default设置该参数的默认值
valid该参数可填写的范围,string 类型用 length,int 类型使用 max 和 min
map用于设置选项框
json
{
	"upload-topic": {
		"name": "upload topic",
		"description": "User defined upload topic",
		"type": "string",
		"attribute": "required",
		"default": "/neuron/${node-name}/upload",
		"valid": {
			"length": 255
		}
	},
	"heartbeat-topic": {
		"name": "heartbeat topic",
		"description": "User defined heartbeat topic",
		"type": "string",
		"attribute": "required",
		"default": "/neuron/${node-name}/heartbeat",
		"valid": {
			"length": 255
		}
	},
	"format": {
		"name": "upload format",
		"description": "The json format of the data reported in the upload topic. In the values mode, all items are contained in the values object or the errors object, respectively. In tags mode, all items are contained in an array",
		"attribute": "optional",
		"type": "map",
		"default": 0,
		"valid": {
			"map": [
				{
					"key": "format-values",
					"value": 0
				},
				{
					"key": "format-tags",
					"value": 1
				}
			]
		}
	},
	"cache": {
		"name": "cache size(MB)",
		"description": "The maximum byte limit in MB for the data backlog when an MQTT connection exception occurs",
		"type": "int",
		"attribute": "optional",
		"default": 64,
		"valid": {
			"min": 1,
			"max": 256 
		}
	},
	"ssl": {
		"name": "ssl",
		"description": "Enable SSL connection",
		"attribute": "optional",
		"type": "bool",
		"default": false,
		"valid": {}
	},
	"host": {
		"name": "host",
		"description": "MQTT broker host",
		"attribute": "required",
		"type": "string",
		"default": "broker.emqx.io",
		"valid": {
			"length": 255
		}
	},
	"port": {
		"name": "port",
		"description": "MQTT broker port",
		"attribute": "required",
		"type": "int",
		"default": 1883,
		"valid": {
			"min": 1024,
			"max": 65535
		}
	},
	"username": {
		"name": "username",
		"description": "User name",
		"attribute": "optional",
		"type": "string",
		"default": "",
		"valid": {
			"length": 255
		}
	},
	"password": {
		"name": "password",
		"description": "Password",
		"attribute": "optional",
		"type": "string",
		"default": "",
		"valid": {
			"length":255 
		}
	},
	"ca": {
		"name": "CA",
		"description": "CA certificate file",
		"attribute": "required",
		"type": "file",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 81960
		}
	},
	"cert": {
		"name": "client cert",
		"description": "client x509 certificate file",
		"attribute": "optional",
		"type": "file",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 81960
		}
	},
	"key": {
		"name": "client key",
		"description": "client key file",
		"attribute": "optional",
		"type": "file",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 81960
		}
	},
	"keypass": {
		"name": "keypass",
		"description": "key password",
		"attribute": "optional",
		"type": "string",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 256
		}
	}
}