MQTT Application Development Example

Developing a northbound application involves the parts listed below. The instruction processing layer sits at the bottom, and the application layer is the outermost layer.

| Module | Files | Description |
| --- | --- | --- |
| Instruction processing layer development | command.c command.h common.h heartbeat.c heartbeat.h read_write.c read_write.h | Plugin parsing of directives |
| Application layer development | mqtt_plugin.c mqtt_plugin.h mqtt_util.c mqtt_util.h | Implementation of the plugin framework |
| Plugin Settings File | mqtt.json | Definition of the plugin settings file |

Constant Description

| Constant | Description |
| --- | --- |
| TOPIC_TYPE_READ | Topic type is read |
| TOPIC_TYPE_WRITE | Topic type is write |
| TOPIC_TYPE_UPLOAD | Topic type is upload |
| TOPIC_TYPE_HEARTBEAT | Topic type is heartbeat |

Step 1 Instruction Processing Layer Development

The MQTT plugin currently implements interfaces for uploading data, sending heartbeat data, reading tags, and writing tags. The handling of the response to each MQTT request is defined in command.c; heartbeat.c and read_write.c provide the functions that command.c relies on.

| Function | Description |
| --- | --- |
| command_response_handle | MQTT response processing |
| command_read_once_response | Response handling for read topics |
| command_read_periodic_response | Response handling for upload topics |
| command_write_response | Response handling for write topics |
| command_heartbeat_response | Response handling for heartbeat data |
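
The relationship between topic types and per-topic handlers can be pictured as a small dispatch table. The following is only a sketch under stated assumptions: the enum values, the stand-in handlers, and dispatch_by_topic_type are hypothetical and are not the plugin's real definitions or signatures.

```c
#include <stddef.h>

/* Hypothetical definition; the real constants may carry other values. */
typedef enum {
    TOPIC_TYPE_READ = 0,
    TOPIC_TYPE_WRITE,
    TOPIC_TYPE_UPLOAD,
    TOPIC_TYPE_HEARTBEAT,
} topic_type_e;

typedef const char *(*topic_handler)(const char *payload);

/* Stand-in handlers: each one only returns a label naming its topic. */
static const char *on_read(const char *payload)      { (void) payload; return "read"; }
static const char *on_write(const char *payload)     { (void) payload; return "write"; }
static const char *on_upload(const char *payload)    { (void) payload; return "upload"; }
static const char *on_heartbeat(const char *payload) { (void) payload; return "heartbeat"; }

/* Pick the handler that matches the topic type; NULL for unknown types. */
static const char *dispatch_by_topic_type(topic_type_e type, const char *payload)
{
    static const topic_handler handlers[] = {
        [TOPIC_TYPE_READ]      = on_read,
        [TOPIC_TYPE_WRITE]     = on_write,
        [TOPIC_TYPE_UPLOAD]    = on_upload,
        [TOPIC_TYPE_HEARTBEAT] = on_heartbeat,
    };
    size_t count = sizeof(handlers) / sizeof(handlers[0]);

    if ((size_t) type >= count || NULL == handlers[type]) {
        return NULL;
    }
    return handlers[type](payload);
}
```

A table keeps the mapping in one place, so adding a topic type means adding one enum entry and one handler.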

Step 2 Driver Layer Development

The mqtt_util.c and mqtt_util.h files define the helper functions used in the mqtt_plugin.c file.

When developing a northbound or southbound driver layer, first construct a neu_plugin_intf_funs_t structure, then implement the function behind each of its members.

c
static const neu_plugin_intf_funs_t plugin_intf_funs = {
    .open    = mqtt_plugin_open,
    .close   = mqtt_plugin_close,
    .init    = mqtt_plugin_init,
    .uninit  = mqtt_plugin_uninit,
    .start   = mqtt_plugin_start,
    .stop    = mqtt_plugin_stop,
    .setting = mqtt_plugin_config,
    .request = mqtt_plugin_request,
};
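
The order in which these callbacks run over a node's lifetime, as described in the subsections that follow, can be sketched with simplified stand-in types. Everything prefixed with demo_ is hypothetical; the real neu_plugin_intf_funs_t has more members and different signatures.

```c
#include <stdlib.h>

typedef struct demo_plugin {
    int initialized;
    int running;
} demo_plugin_t;

/* Reduced stand-in for the interface table. */
typedef struct {
    demo_plugin_t *(*open)(void);
    int (*close)(demo_plugin_t *plugin);
    int (*init)(demo_plugin_t *plugin);
    int (*uninit)(demo_plugin_t *plugin);
    int (*start)(demo_plugin_t *plugin);
    int (*stop)(demo_plugin_t *plugin);
} demo_intf_funs_t;

static demo_plugin_t *demo_open(void) { return calloc(1, sizeof(demo_plugin_t)); }
static int demo_close(demo_plugin_t *p)  { free(p); return 0; }
static int demo_init(demo_plugin_t *p)   { p->initialized = 1; return 0; }
static int demo_uninit(demo_plugin_t *p) { p->initialized = 0; return 0; }
static int demo_start(demo_plugin_t *p)  { p->running = 1; return 0; }
static int demo_stop(demo_plugin_t *p)   { p->running = 0; return 0; }

static const demo_intf_funs_t demo_funs = {
    .open   = demo_open,
    .close  = demo_close,
    .init   = demo_init,
    .uninit = demo_uninit,
    .start  = demo_start,
    .stop   = demo_stop,
};

/* Create a node, run it once, then delete it. Returns 0 on success. */
static int demo_node_lifetime(const demo_intf_funs_t *funs)
{
    demo_plugin_t *plugin = funs->open();  /* first call on node creation */
    if (NULL == plugin) {
        return -1;
    }

    int rc = funs->init(plugin);           /* right after open */
    if (0 == rc) {
        rc = funs->start(plugin);          /* user clicks start */
    }
    if (0 == rc) {
        rc = funs->stop(plugin);           /* user clicks stop */
    }

    funs->uninit(plugin);                  /* first call on node deletion */
    funs->close(plugin);                   /* last call on node deletion */
    return rc;
}
```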

.open

Implemented by mqtt_plugin_open, the first function Neuron calls when a node is created from the plugin. It creates the plugin's own struct neu_plugin, which is defined in mqtt_plugin.h. Note that the first member of this structure must be neu_plugin_common_t common; other members can be added as the driver's implementation requires.

c
static neu_plugin_t *mqtt_plugin_open(void)
{
    neu_plugin_t *plugin = (neu_plugin_t *) calloc(1, sizeof(neu_plugin_t));
    if (NULL == plugin) {
        return NULL; /* allocation failed */
    }

    neu_plugin_common_init(&plugin->common);

    return plugin;
}
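
The source does not say why common has to be the first member. One plausible reason, offered here as an assumption, is that framework code can then treat a pointer to the plugin structure as a pointer to the common part. The sketch below shows that C idiom with hypothetical stand-in types (demo_common_t, struct demo_plugin); it does not use the real Neuron headers.

```c
#include <stddef.h>
#include <stdlib.h>

/* Stand-in for neu_plugin_common_t. */
typedef struct {
    int link_state;
} demo_common_t;

/* Stand-in for the plugin's own structure. */
struct demo_plugin {
    demo_common_t common;   /* must stay the first member */
    int           running;  /* plugin-specific state follows */
};

/* Framework-side code that only knows about the common part. */
static void framework_set_link(demo_common_t *common, int state)
{
    common->link_state = state;
}

/* Returns the link state seen through the plugin after the framework sets it. */
static int demo_roundtrip(void)
{
    struct demo_plugin *plugin = calloc(1, sizeof(*plugin));
    if (NULL == plugin) {
        return -1;
    }

    /* A pointer to a struct, suitably converted, points to its first member. */
    demo_common_t *common = (demo_common_t *) plugin;
    framework_set_link(common, 1);

    int seen = plugin->common.link_state;
    free(plugin);
    return seen;
}
```

If common were moved to a later position, the cast would point at the wrong bytes, which is why the ordering rule matters under this assumption.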

.close

Implemented by mqtt_plugin_close, the last function Neuron calls when a node is deleted. It releases the neu_plugin_t created by open.

c
static int mqtt_plugin_close(neu_plugin_t *plugin)
{
    free(plugin);

    return NEU_ERR_SUCCESS;
}

.init

Implemented by mqtt_plugin_init, which Neuron calls immediately after open when creating a node. It initializes the resources the plugin needs; for the MQTT plugin, that is mainly the running status and the configuration.

c
static int mqtt_plugin_init(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    plugin->routine = NULL;
    plugin->running = false;
    plugin->config  = NULL;
    plugin_cache_init(plugin);

    const char *name = neu_plugin_module.module_name;
    plog_info(plugin, "initialize plugin: %s", name);

    return NEU_ERR_SUCCESS;
}

.uninit

Implemented by mqtt_plugin_uninit, the first function Neuron calls when a node is deleted. It releases the resources that were allocated and initialized in init.

c
static int mqtt_plugin_uninit(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    plugin_stop_running(plugin);
    plugin_cache_uninit(plugin);
    plugin_config_free(plugin);

    const char *name = neu_plugin_module.module_name;
    plog_info(plugin, "uninitialize plugin: %s", name);
    return NEU_ERR_SUCCESS;
}

.start

Implemented by mqtt_plugin_start. When the user clicks start on the Neuron node page, Neuron calls this function to tell the plugin to start running and begin connecting to the device. If the configuration is incorrect, the function returns the NEU_ERR_NODE_SETTING_INVALID error.

c
static int mqtt_plugin_start(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    int rc = plguin_start_running(plugin);
    if (0 != rc) {
        return NEU_ERR_NODE_SETTING_INVALID;
    }

    return NEU_ERR_SUCCESS;
}

.stop

Implemented by mqtt_plugin_stop. When the user clicks stop on the Neuron node page, Neuron calls this function to tell the plugin to stop running and to close the connection between the plugin and Neuron.

c
static int mqtt_plugin_stop(neu_plugin_t *plugin)
{
    assert(NULL != plugin);

    plugin_stop_running(plugin);
    return NEU_ERR_SUCCESS;
}

.setting

Implemented by mqtt_plugin_config, which Neuron calls when the user changes settings on the Neuron node setting page. The node's parameters are passed as JSON (for the layout of the JSON file, see Plugin Setting File), and Neuron uses this function to notify the plugin of the new settings. mqtt_plugin_config first parses and saves the configuration, then establishes the connection.

c
static int mqtt_plugin_config(neu_plugin_t *plugin, const char *config)
{
    plog_info(plugin, "config: %s", config);

    return NEU_ERR_SUCCESS;
}
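
The listing above only logs the incoming JSON. The "save the configuration" step from the description could look roughly like the following sketch, which keeps a private copy of the new settings and leaves the previous copy untouched when the input is rejected. The demo_plugin_t type and demo_plugin_config function are hypothetical, and the JSON parsing and connection setup are deliberately left out.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical plugin state holding the last accepted configuration. */
typedef struct {
    char *config;
} demo_plugin_t;

/* Store a copy of the new settings; returns 0 on success, -1 on rejection. */
static int demo_plugin_config(demo_plugin_t *plugin, const char *config)
{
    if (NULL == plugin || NULL == config || '\0' == config[0]) {
        return -1; /* reject missing or empty settings */
    }

    size_t len  = strlen(config);
    char * copy = malloc(len + 1);
    if (NULL == copy) {
        return -1; /* keep the old settings on allocation failure */
    }
    memcpy(copy, config, len + 1);

    free(plugin->config); /* free(NULL) is a no-op on the first call */
    plugin->config = copy;
    return 0;
}
```

Copying before freeing means a failed update never leaves the plugin without a usable configuration.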

.request

Implemented by mqtt_plugin_request, which handles each incoming message according to its request type.

c
static int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
                               void *data)
{
    assert(NULL != plugin);
    assert(NULL != head);
    assert(NULL != data);

    neu_err_code_e error = NEU_ERR_SUCCESS;

    switch (head->type) {
    case NEU_RESP_ERROR:
        error = write_response(plugin, head, data);
        break;
    case NEU_RESP_READ_GROUP:
        error = read_response(plugin, head, data);
        break;
    case NEU_REQRESP_TRANS_DATA:
        error = trans_data(plugin, data);
        break;
    case NEU_REQRESP_NODES_STATE: {
        error = node_state_send(plugin, head, data);
        break;
    }
    case NEU_REQRESP_NODE_DELETED:
        break;
    default:
        error = NEU_ERR_MQTT_FAILURE;
        break;
    }

    return error;
}

Step 3 Plugin Setting File

The mqtt.json file defines the plugin's configuration parameters. The fields that describe each parameter of the MQTT plugin are as follows.

| Field | Description |
| --- | --- |
| name | The parameter name displayed on the page |
| description | A detailed description of the parameter |
| type | The type of the parameter; int and string are the most commonly used |
| attribute | Whether the parameter must be set: required or optional |
| default | The default value of the parameter |
| valid | The range of values the parameter accepts: length for the string type, max and min for the int type |
| map | Used to define the entries of an option box |

json
{
	"upload-topic": {
		"name": "upload topic",
		"description": "User defined upload topic",
		"type": "string",
		"attribute": "required",
		"default": "/neuron/${node-name}/upload",
		"valid": {
			"length": 255
		}
	},
	"heartbeat-topic": {
		"name": "heartbeat topic",
		"description": "User defined heartbeat topic",
		"type": "string",
		"attribute": "required",
		"default": "/neuron/${node-name}/heartbeat",
		"valid": {
			"length": 255
		}
	},
	"format": {
		"name": "upload format",
		"description": "The json format of the data reported in the upload topic. In the values mode, all items are contained in the values object or the errors object, respectively. In tags mode, all items are contained in an array",
		"attribute": "optional",
		"type": "map",
		"default": 0,
		"valid": {
			"map": [
				{
					"key": "format-values",
					"value": 0
				},
				{
					"key": "format-tags",
					"value": 1
				}
			]
		}
	},
	"cache": {
		"name": "cache size(MB)",
		"description": "The maximum byte limit in MB for the data backlog when an MQTT connection exception occurs",
		"type": "int",
		"attribute": "optional",
		"default": 64,
		"valid": {
			"min": 1,
			"max": 256 
		}
	},
	"ssl": {
		"name": "ssl",
		"description": "Enable SSL connection",
		"attribute": "optional",
		"type": "bool",
		"default": false,
		"valid": {}
	},
	"host": {
		"name": "host",
		"description": "MQTT broker host",
		"attribute": "required",
		"type": "string",
		"default": "broker.emqx.io",
		"valid": {
			"length": 255
		}
	},
	"port": {
		"name": "port",
		"description": "MQTT broker port",
		"attribute": "required",
		"type": "int",
		"default": 1883,
		"valid": {
			"min": 1024,
			"max": 65535
		}
	},
	"username": {
		"name": "username",
		"description": "User name",
		"attribute": "optional",
		"type": "string",
		"default": "",
		"valid": {
			"length": 255
		}
	},
	"password": {
		"name": "password",
		"description": "Password",
		"attribute": "optional",
		"type": "string",
		"default": "",
		"valid": {
			"length":255 
		}
	},
	"ca": {
		"name": "CA",
		"description": "CA certificate file",
		"attribute": "required",
		"type": "file",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 81960
		}
	},
	"cert": {
		"name": "client cert",
		"description": "client x509 certificate file",
		"attribute": "optional",
		"type": "file",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 81960
		}
	},
	"key": {
		"name": "client key",
		"description": "client key file",
		"attribute": "optional",
		"type": "file",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 81960
		}
	},
	"keypass": {
		"name": "keypass",
		"description": "key password",
		"attribute": "optional",
		"type": "string",
		"condition": {
			"field": "ssl",
			"value": true
		},
		"default": "",
		"valid": {
			"length": 256
		}
	}
}
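
To make the valid field concrete, the sketch below checks a value against the constraints from the file above. It assumes that length is a maximum string length and that min and max are inclusive bounds; the source does not spell this out. The types and function names are hypothetical and are not part of Neuron.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical mirror of "valid": { "min": ..., "max": ... }. */
typedef struct {
    long min;
    long max;
} int_valid_t;

/* Hypothetical mirror of "valid": { "length": ... }. */
typedef struct {
    size_t length;
} string_valid_t;

/* True when value lies within the inclusive [min, max] range. */
static bool int_param_ok(long value, int_valid_t valid)
{
    return value >= valid.min && value <= valid.max;
}

/* True when value is present and no longer than the allowed length. */
static bool string_param_ok(const char *value, string_valid_t valid)
{
    return NULL != value && strlen(value) <= valid.length;
}
```

With the port constraints above (min 1024, max 65535), the default 1883 passes under these assumptions, while a port such as 80 would be rejected.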