Rules: Trigger Event with MQTT Subscriptions

Support subscribe/unsubscribe MQTT topics and trigger specified event with the subscribed MQTT topic.
You can subscribe a MQTT topic and assign an event name. Once we received subscribed MQTT message, an event will be automatically triggered. So you can set up a rule with "ON EVENT#<event_name> DO ..." to do whatever you want based on this MQTT message. The payload is passed as a parameter once the event been triggered. If the payload is in JSON format, you are able to get the value of specified key as parameter.
For example, if you have a Tasmota based thermostat and multiple temperature sensors in different place, usually you have to set up a centre home automation system like Domoticz to control the thermostat. Right now, with this new feature, you can write a rule to do this.
Two new commands in Rules:
1. Subscribe
Subscribe a MQTT topic (with or without key) and assign an event name to it.
Command format:
	Subscribe [<event_name>, <topic> [, <key>]]
		This command will subscribe a <topic> and give it an event name <event_name>.
		The optional parameter <key> is for parse the specified key/value from MQTT message
			payload with JSON format.
		In order to parse value from two level JSON data, you can use one dot (".") to split the key into two section.
		Subscribe command without any parameter will list all topics currently subscribed.
2. Unsubscribe
Unsubscribe specified MQTT event.
Command format:
	Unsubscribe [<event_name>]
		Unsubscribe a topic subscribed by specify the event name.
		If no event specified, Unsubscribe all topics subscribed.
Examples:
1.
	Subscribe BkLight, Tasmota/BackyardLight/stat/POWER
		And define a rule like:
	Rule1 on event#BkLight=ON do ruletimer4 60 endon
2.
	Subscribe DnTemp, Tasmota/RoomSensor1/stat/SENSOR, DS18B20.Temperature
		Define a rule to deal with the MQTT message like {"Time":"2017-02-16T10:13:52", "DS18B20":{"Temperature":20.6}}
	Rule1 ON EVENT#DnTemp>=21 DO ... ENDON
This commit is contained in:
Laurent 2019-02-23 22:33:09 -05:00
parent 3298048c60
commit dd27ade7ef
4 changed files with 242 additions and 6 deletions

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Subnet Mask"
#define D_SUBSCRIBE_TO "Subscribe to"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Successful"
#define D_SUNRISE "Sunrise"
#define D_SUNSET "Sunset"

View File

@ -280,6 +280,7 @@
// -- Rules ---------------------------------------
#define USE_RULES // Add support for rules (+4k4 code)
#define USE_EXPRESSION // Add support for expression evaluation in rules (+3k2 code, +64 bytes mem)
#define SUPPORT_MQTT_EVENT // Support trigger event with MQTT subscriptions (+3k5 code)
// -- Internal Analog input -----------------------
#define USE_ADC_VCC // Display Vcc in Power status. Disable for use as Analog input on selected devices

View File

@ -110,12 +110,18 @@ void MqttDisconnect(void)
MqttClient.disconnect();
}
void MqttSubscribeLib(char *topic)
void MqttSubscribeLib(const char *topic)
{
MqttClient.subscribe(topic);
MqttClient.loop(); // Solve LmacRxBlk:1 messages
}
void MqttUnsubscribeLib(const char *topic)
{
MqttClient.unsubscribe(topic);
MqttClient.loop(); // Solve LmacRxBlk:1 messages
}
bool MqttPublishLib(const char* topic, bool retained)
{
bool result = MqttClient.publish(topic, mqtt_data, retained);
@ -148,11 +154,16 @@ void MqttDisconnectedCb(void)
MqttDisconnected(MqttClient.State()); // status codes are documented in file mqtt.h as tConnState
}
void MqttSubscribeLib(char *topic)
void MqttSubscribeLib(const char *topic)
{
MqttClient.Subscribe(topic, 0);
}
void MqttUnsubscribeLib(const char *topic)
{
MqttClient.Unsubscribe(topic, 0);
}
bool MqttPublishLib(const char* topic, bool retained)
{
return MqttClient.Publish(topic, mqtt_data, strlen(mqtt_data), 0, retained);
@ -190,11 +201,16 @@ void MqttMyDataCb(String &topic, String &data)
MqttDataHandler((char*)topic.c_str(), (uint8_t*)data.c_str(), data.length());
}
void MqttSubscribeLib(char *topic)
void MqttSubscribeLib(const char *topic)
{
MqttClient.subscribe(topic, 0);
}
void MqttUnsubscribeLib(const char *topic)
{
MqttClient.unsubscribe(topic, 0);
}
bool MqttPublishLib(const char* topic, bool retained)
{
return MqttClient.publish(topic, mqtt_data, strlen(mqtt_data), retained, 0);
@ -251,13 +267,20 @@ void MqttRetryCounter(uint8_t value)
mqtt_retry_counter = value;
}
void MqttSubscribe(char *topic)
void MqttSubscribe(const char *topic)
{
snprintf_P(log_data, sizeof(log_data), PSTR(D_LOG_MQTT D_SUBSCRIBE_TO " %s"), topic);
AddLog(LOG_LEVEL_DEBUG);
MqttSubscribeLib(topic);
}
void MqttUnsubscribe(const char *topic)
{
snprintf_P(log_data, sizeof(log_data), PSTR(D_LOG_MQTT D_UNSUBSCRIBE_FROM " %s"), topic);
AddLog(LOG_LEVEL_DEBUG);
MqttUnsubscribeLib(topic);
}
void MqttPublishDirect(const char* topic, bool retained)
{
char sretained[CMDSZ];

View File

@ -75,6 +75,8 @@
#define D_CMND_MULT "Mult"
#define D_CMND_SCALE "Scale"
#define D_CMND_CALC_RESOLUTION "CalcRes"
#define D_CMND_SUBSCRIBE "Subscribe"
#define D_CMND_UNSUBSCRIBE "Unsubscribe"
#define D_JSON_INITIATED "Initiated"
@ -105,8 +107,18 @@ const char kCompareOperators[] PROGMEM = "=\0>\0<\0|\0==!=>=<=";
#define MAX_EXPRESSION_OPERATOR_PRIORITY 4
#endif // USE_EXPRESSION
enum RulesCommands { CMND_RULE, CMND_RULETIMER, CMND_EVENT, CMND_VAR, CMND_MEM, CMND_ADD, CMND_SUB, CMND_MULT, CMND_SCALE, CMND_CALC_RESOLUTION };
const char kRulesCommands[] PROGMEM = D_CMND_RULE "|" D_CMND_RULETIMER "|" D_CMND_EVENT "|" D_CMND_VAR "|" D_CMND_MEM "|" D_CMND_ADD "|" D_CMND_SUB "|" D_CMND_MULT "|" D_CMND_SCALE "|" D_CMND_CALC_RESOLUTION ;
enum RulesCommands { CMND_RULE, CMND_RULETIMER, CMND_EVENT, CMND_VAR, CMND_MEM, CMND_ADD, CMND_SUB, CMND_MULT, CMND_SCALE, CMND_CALC_RESOLUTION, CMND_SUBSCRIBE, CMND_UNSUBSCRIBE };
const char kRulesCommands[] PROGMEM = D_CMND_RULE "|" D_CMND_RULETIMER "|" D_CMND_EVENT "|" D_CMND_VAR "|" D_CMND_MEM "|" D_CMND_ADD "|" D_CMND_SUB "|" D_CMND_MULT "|" D_CMND_SCALE "|" D_CMND_CALC_RESOLUTION "|" D_CMND_SUBSCRIBE "|" D_CMND_UNSUBSCRIBE ;
#ifdef SUPPORT_MQTT_EVENT
#include <LinkedList.h> // Import LinkedList library
typedef struct {
String Event;
String Topic;
String Key;
} MQTT_Subscription;
LinkedList<MQTT_Subscription> subscriptions;
#endif //SUPPORT_MQTT_EVENT
String rules_event_value;
unsigned long rules_timer[MAX_RULE_TIMERS] = { 0 };
@ -577,6 +589,192 @@ void RulesTeleperiod(void)
rules_teleperiod = 0;
}
#ifdef SUPPORT_MQTT_EVENT
/********************************************************************************************/
/*
* Rules: Process received MQTT message.
* If the message is in our subscription list, trigger an event with the value parsed from MQTT data
* Input:
* void - We are going to access XdrvMailbox data directly.
* Return:
* true - The message is consumed.
* false - The message is not in our list.
*/
bool RulesMqttData(void)
{
bool serviced = false;
if (XdrvMailbox.data_len < 1 || XdrvMailbox.data_len > 128) {
return false;
}
String sTopic = XdrvMailbox.topic;
String sData = XdrvMailbox.data;
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: MQTT Topic %s, Event %s"), XdrvMailbox.topic, XdrvMailbox.data);
//AddLog(LOG_LEVEL_DEBUG);
MQTT_Subscription event_item;
//Looking for matched topic
for (int index = 0; index < subscriptions.size(); index++) {
event_item = subscriptions.get(index);
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: Match MQTT message Topic %s with subscription topic %s"), sTopic.c_str(), event_item.Topic.c_str());
//AddLog(LOG_LEVEL_DEBUG);
if (sTopic.startsWith(event_item.Topic)) {
//This topic is subscribed by us, so serve it
serviced = true;
String value;
if (event_item.Key.length() == 0) { //If did not specify Key
value = sData;
} else { //If specified Key, need to parse Key/Value from JSON data
StaticJsonBuffer<400> jsonBuf;
JsonObject& jsonData = jsonBuf.parseObject(sData);
String key1 = event_item.Key;
String key2;
if (!jsonData.success()) break; //Failed to parse JSON data, ignore this message.
int dot;
if ((dot = key1.indexOf('.')) > 0) {
key2 = key1.substring(dot+1);
key1 = key1.substring(0, dot);
if (!jsonData[key1][key2].success()) break; //Failed to get the key/value, ignore this message.
value = (const char *)jsonData[key1][key2];
} else {
if (!jsonData[key1].success()) break;
value = (const char *)jsonData[key1];
}
}
value.trim();
//Create an new event. Cannot directly call RulesProcessEvent().
snprintf_P(event_data, sizeof(event_data), PSTR("%s=%s"), event_item.Event.c_str(), value.c_str());
}
}
return serviced;
}
/********************************************************************************************/
/*
* Subscribe a MQTT topic (with or without key) and assign an event name to it
* Command Subscribe format:
* Subscribe <event_name>, <topic> [, <key>]
* This command will subscribe a <topic> and give it an event name <event_name>.
* The optional parameter <key> is for parse the specified key/value from MQTT message
* payload with JSON format.
* Subscribe
* Subscribe command without any parameter will list all topics currently subscribed.
* Input:
* data - A char buffer with all the parameters
* data_len - Length of the parameters
* Return:
* A string include subscribed event, topic and key.
*/
String RulesSubscribe(const char *data, int data_len)
{
MQTT_Subscription subscription_item;
String events;
if (data_len > 0) {
char parameters[data_len+1];
memcpy(parameters, data, data_len);
parameters[data_len] = '\0';
String event_name, topic, key;
char * pos = strtok(parameters, ",");
if (pos) {
event_name = Trim(pos);
pos = strtok(NULL, ",");
if (pos) {
topic = Trim(pos);
pos = strtok(NULL, ",");
if (pos) {
key = Trim(pos);
}
}
}
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: Subscribe command with parameters: %s, %s, %s."), event_name.c_str(), topic.c_str(), key.c_str());
//AddLog(LOG_LEVEL_DEBUG);
event_name.toUpperCase();
if (event_name.length() > 0 && topic.length() > 0) {
//Search all subscriptions
for (int index=0; index < subscriptions.size(); index++) {
if (subscriptions.get(index).Event.equals(event_name)) {
//If find exists one, remove it.
String stopic = subscriptions.get(index).Topic + "/#";
MqttUnsubscribe(stopic.c_str());
subscriptions.remove(index);
break;
}
}
//Add "/#" to the topic
if (!topic.endsWith("#")) {
if (topic.endsWith("/")) {
topic.concat("#");
} else {
topic.concat("/#");
}
}
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: New topic: %s."), topic.c_str());
//AddLog(LOG_LEVEL_DEBUG);
//MQTT Subscribe
subscription_item.Event = event_name;
subscription_item.Topic = topic.substring(0, topic.length() - 2); //Remove "/#" so easy to match
subscription_item.Key = key;
subscriptions.add(subscription_item);
MqttSubscribe(topic.c_str());
events.concat(event_name + "," + topic
+ (key.length()>0 ? "," : "")
+ key);
} else {
events = D_JSON_WRONG_PARAMETERS;
}
} else {
//If did not specify the event name, list all subscribed event
for (int index=0; index < subscriptions.size(); index++) {
subscription_item = subscriptions.get(index);
events.concat(subscription_item.Event + "," + subscription_item.Topic
+ (subscription_item.Key.length()>0 ? "," : "")
+ subscription_item.Key + "; ");
}
}
return events;
}
/********************************************************************************************/
/*
* Unsubscribe specified MQTT event. If no event specified, Unsubscribe all.
* Command Unsubscribe format:
* Unsubscribe [<event_name>]
* Input:
* data - Event name
* data_len - Length of the parameters
* Return:
* list all the events unsubscribed.
*/
String RulesUnsubscribe(const char * data, int data_len)
{
MQTT_Subscription subscription_item;
String events;
if (data_len > 0) {
for (int index = 0; index < subscriptions.size(); index++) {
subscription_item = subscriptions.get(index);
if (subscription_item.Event.equalsIgnoreCase(data)) {
String stopic = subscription_item.Topic + "/#";
MqttUnsubscribe(stopic.c_str());
events = subscription_item.Event;
subscriptions.remove(index);
break;
}
}
} else {
//If did not specify the event name, unsubscribe all event
String stopic;
while (subscriptions.size() > 0) {
events.concat(subscriptions.get(0).Event + "; ");
stopic = subscriptions.get(0).Topic + "/#";
MqttUnsubscribe(stopic.c_str());
subscriptions.remove(0);
}
}
return events;
}
#endif // SUPPORT_MQTT_EVENT
#ifdef USE_EXPRESSION
/********************************************************************************************/
/*
@ -1026,6 +1224,14 @@ bool RulesCommand(void)
}
}
snprintf_P(mqtt_data, sizeof(mqtt_data), S_JSON_COMMAND_INDEX_SVALUE, command, index, vars[index -1]);
#ifdef SUPPORT_MQTT_EVENT
} else if (CMND_SUBSCRIBE == command_code) { //MQTT Subscribe command. Subscribe <Event>, <Topic> [, <Key>]
String result = RulesSubscribe(XdrvMailbox.data, XdrvMailbox.data_len);
snprintf_P(mqtt_data, sizeof(mqtt_data), S_JSON_COMMAND_SVALUE, command, result.c_str());
} else if (CMND_UNSUBSCRIBE == command_code) { //MQTT Un-subscribe command. UnSubscribe <Event>
String result = RulesUnsubscribe(XdrvMailbox.data, XdrvMailbox.data_len);
snprintf_P(mqtt_data, sizeof(mqtt_data), S_JSON_COMMAND_SVALUE, command, result.c_str());
#endif //SUPPORT_MQTT_EVENT
}
else serviced = false; // Unknown command
@ -1067,6 +1273,11 @@ bool Xdrv10(uint8_t function)
case FUNC_RULES_PROCESS:
result = RulesProcess();
break;
#ifdef SUPPORT_MQTT_EVENT
case FUNC_MQTT_DATA:
result = RulesMqttData();
break;
#endif //SUPPORT_MQTT_EVENT
}
return result;
}