Merge pull request #5320 from laurentdong/development

Support subscribe/unsubscribe MQTT topics and trigger specified event with the subscribed MQTT topic.
This commit is contained in:
Theo Arends 2019-02-24 17:02:45 +01:00 committed by GitHub
commit e16178d912
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 262 additions and 6 deletions

View File

@ -148,6 +148,7 @@
#define D_STOP "Стоп"
#define D_SUBNET_MASK "Маска на подмрежата"
#define D_SUBSCRIBE_TO "Записване за"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Успешно"
#define D_SUNRISE "Изгрев"
#define D_SUNSET "Залез"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Maska podsítě"
#define D_SUBSCRIBE_TO "Přihlaš se do"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "úspěšné."
#define D_SUNRISE "Svítání"
#define D_SUNSET "Soumrak"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Subnetzmaske"
#define D_SUBSCRIBE_TO "abonniere"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "erfolgreich"
#define D_SUNRISE "Sonnenaufgang"
#define D_SUNSET "Sonnenuntergang"

View File

@ -148,6 +148,7 @@
#define D_STOP "Τερματισμός"
#define D_SUBNET_MASK "Μάσκα υποδικτύου"
#define D_SUBSCRIBE_TO "Εγγραφή στο"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Επιτυχές"
#define D_SUNRISE "Σούρουπο"
#define D_SUNSET "Ηλιοβασίλεμα"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Subnet Mask"
#define D_SUBSCRIBE_TO "Subscribe to"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Successful"
#define D_SUNRISE "Sunrise"
#define D_SUNSET "Sunset"

View File

@ -148,6 +148,7 @@
#define D_STOP "Detener"
#define D_SUBNET_MASK "Máscara Subred"
#define D_SUBSCRIBE_TO "Suscribir a"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Exitosa"
#define D_SUNRISE "Salida del Sol"
#define D_SUNSET "Puesta del Sol"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Masque sous-réseau"
#define D_SUBSCRIBE_TO "Souscrire à"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Réussi"
#define D_SUNRISE "Lever du jour"
#define D_SUNSET "Tombée de la nuit"

View File

@ -148,6 +148,7 @@
#define D_STOP "עצירה"
#define D_SUBNET_MASK "רשת מסכת משנה"
#define D_SUBSCRIBE_TO "הרשם ל"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "הצליח"
#define D_SUNRISE "זריחה"
#define D_SUNSET "שקיעה"

View File

@ -148,6 +148,7 @@
#define D_STOP "Leállítás"
#define D_SUBNET_MASK "Alhálózati maszk"
#define D_SUBSCRIBE_TO "Feliratkozás a(z)"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Sikeres"
#define D_SUNRISE "Napkelte"
#define D_SUNSET "Napnyugta"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Maschera sottorete"
#define D_SUBSCRIBE_TO "Sottoscrivi a"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Riuscito"
#define D_SUNRISE "Alba"
#define D_SUNSET "Tramonto"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Subnet Masker"
#define D_SUBSCRIBE_TO "Abonneer op"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Gelukt"
#define D_SUNRISE "Zonsopgang"
#define D_SUNSET "Zonsondergang"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Maska podsieci"
#define D_SUBSCRIBE_TO "Subskrybuj do"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Powodzenie"
#define D_SUNRISE "Wschód słońca"
#define D_SUNSET "Zachód słońca"

View File

@ -148,6 +148,7 @@
#define D_STOP "Parar"
#define D_SUBNET_MASK "Máscara sub rede"
#define D_SUBSCRIBE_TO "Subescrever para"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Successo"
#define D_SUNRISE "Nascer do sol"
#define D_SUNSET "Por do sol"

View File

@ -148,6 +148,7 @@
#define D_STOP "Parar"
#define D_SUBNET_MASK "Mascara sub rede"
#define D_SUBSCRIBE_TO "Subescrever para"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Successo"
#define D_SUNRISE "Sunrise"
#define D_SUNSET "Sunset"

View File

@ -148,6 +148,7 @@
#define D_STOP "Стоп"
#define D_SUBNET_MASK "Маска Подсети"
#define D_SUBSCRIBE_TO "Подписаться на"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Успешно"
#define D_SUNRISE "Sunrise"
#define D_SUNSET "Sunset"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stop"
#define D_SUBNET_MASK "Maska podsiete"
#define D_SUBSCRIBE_TO "Prihlásiť do"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "úspešné."
#define D_SUNRISE "Svitanie"
#define D_SUNSET "Súmrak"

View File

@ -148,6 +148,7 @@
#define D_STOP "Stoppa"
#define D_SUBNET_MASK "Nätmask"
#define D_SUBSCRIBE_TO "Prenumera på"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Lyckat"
#define D_SUNRISE "Soluppgång"
#define D_SUNSET "Solnedgång"

View File

@ -148,6 +148,7 @@
#define D_STOP "Durdur"
#define D_SUBNET_MASK "Altağ Geçidi Maskesi"
#define D_SUBSCRIBE_TO "Abone olunan"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Başarıyla Tamamlandı"
#define D_SUNRISE "Gün doğumu"
#define D_SUNSET "Gün batımı"

View File

@ -148,6 +148,7 @@
#define D_STOP "Стоп"
#define D_SUBNET_MASK "Маска Підмережі"
#define D_SUBSCRIBE_TO "Підписатись на"
#define D_UNSUBSCRIBE_FROM "Unsubscribe from"
#define D_SUCCESSFUL "Успішно"
#define D_SUNRISE "Схід сонця"
#define D_SUNSET "Захід сонця"

View File

@ -148,6 +148,7 @@
#define D_STOP "停止"
#define D_SUBNET_MASK "子网掩码"
#define D_SUBSCRIBE_TO "订阅"
#define D_UNSUBSCRIBE_FROM "退订"
#define D_SUCCESSFUL "成功"
#define D_SUNRISE "日出"
#define D_SUNSET "日落"

View File

@ -148,6 +148,7 @@
#define D_STOP "停止"
#define D_SUBNET_MASK "子網遮罩"
#define D_SUBSCRIBE_TO "訂閱"
#define D_UNSUBSCRIBE_FROM "退訂"
#define D_SUCCESSFUL "成功"
#define D_SUNRISE "Sunrise"
#define D_SUNSET "Sunset"

View File

@ -280,6 +280,7 @@
// -- Rules ---------------------------------------
#define USE_RULES // Add support for rules (+4k4 code)
#define USE_EXPRESSION // Add support for expression evaluation in rules (+3k2 code, +64 bytes mem)
#define SUPPORT_MQTT_EVENT // Support trigger event with MQTT subscriptions (+3k5 code)
// -- Internal Analog input -----------------------
#define USE_ADC_VCC // Display Vcc in Power status. Disable for use as Analog input on selected devices

View File

@ -110,12 +110,18 @@ void MqttDisconnect(void)
MqttClient.disconnect();
}
void MqttSubscribeLib(char *topic)
void MqttSubscribeLib(const char *topic)
{
MqttClient.subscribe(topic);
MqttClient.loop(); // Solve LmacRxBlk:1 messages
}
void MqttUnsubscribeLib(const char *topic)
{
MqttClient.unsubscribe(topic);
MqttClient.loop(); // Solve LmacRxBlk:1 messages
}
bool MqttPublishLib(const char* topic, bool retained)
{
bool result = MqttClient.publish(topic, mqtt_data, retained);
@ -148,11 +154,16 @@ void MqttDisconnectedCb(void)
MqttDisconnected(MqttClient.State()); // status codes are documented in file mqtt.h as tConnState
}
void MqttSubscribeLib(char *topic)
void MqttSubscribeLib(const char *topic)
{
MqttClient.Subscribe(topic, 0);
}
void MqttUnsubscribeLib(const char *topic)
{
MqttClient.Unsubscribe(topic, 0);
}
bool MqttPublishLib(const char* topic, bool retained)
{
return MqttClient.Publish(topic, mqtt_data, strlen(mqtt_data), 0, retained);
@ -190,11 +201,16 @@ void MqttMyDataCb(String &topic, String &data)
MqttDataHandler((char*)topic.c_str(), (uint8_t*)data.c_str(), data.length());
}
void MqttSubscribeLib(char *topic)
void MqttSubscribeLib(const char *topic)
{
MqttClient.subscribe(topic, 0);
}
void MqttUnsubscribeLib(const char *topic)
{
MqttClient.unsubscribe(topic, 0);
}
bool MqttPublishLib(const char* topic, bool retained)
{
return MqttClient.publish(topic, mqtt_data, strlen(mqtt_data), retained, 0);
@ -251,13 +267,20 @@ void MqttRetryCounter(uint8_t value)
mqtt_retry_counter = value;
}
void MqttSubscribe(char *topic)
void MqttSubscribe(const char *topic)
{
snprintf_P(log_data, sizeof(log_data), PSTR(D_LOG_MQTT D_SUBSCRIBE_TO " %s"), topic);
AddLog(LOG_LEVEL_DEBUG);
MqttSubscribeLib(topic);
}
void MqttUnsubscribe(const char *topic)
{
snprintf_P(log_data, sizeof(log_data), PSTR(D_LOG_MQTT D_UNSUBSCRIBE_FROM " %s"), topic);
AddLog(LOG_LEVEL_DEBUG);
MqttUnsubscribeLib(topic);
}
void MqttPublishDirect(const char* topic, bool retained)
{
char sretained[CMDSZ];

View File

@ -75,6 +75,8 @@
#define D_CMND_MULT "Mult"
#define D_CMND_SCALE "Scale"
#define D_CMND_CALC_RESOLUTION "CalcRes"
#define D_CMND_SUBSCRIBE "Subscribe"
#define D_CMND_UNSUBSCRIBE "Unsubscribe"
#define D_JSON_INITIATED "Initiated"
@ -105,8 +107,18 @@ const char kCompareOperators[] PROGMEM = "=\0>\0<\0|\0==!=>=<=";
#define MAX_EXPRESSION_OPERATOR_PRIORITY 4
#endif // USE_EXPRESSION
enum RulesCommands { CMND_RULE, CMND_RULETIMER, CMND_EVENT, CMND_VAR, CMND_MEM, CMND_ADD, CMND_SUB, CMND_MULT, CMND_SCALE, CMND_CALC_RESOLUTION };
const char kRulesCommands[] PROGMEM = D_CMND_RULE "|" D_CMND_RULETIMER "|" D_CMND_EVENT "|" D_CMND_VAR "|" D_CMND_MEM "|" D_CMND_ADD "|" D_CMND_SUB "|" D_CMND_MULT "|" D_CMND_SCALE "|" D_CMND_CALC_RESOLUTION ;
enum RulesCommands { CMND_RULE, CMND_RULETIMER, CMND_EVENT, CMND_VAR, CMND_MEM, CMND_ADD, CMND_SUB, CMND_MULT, CMND_SCALE, CMND_CALC_RESOLUTION, CMND_SUBSCRIBE, CMND_UNSUBSCRIBE };
const char kRulesCommands[] PROGMEM = D_CMND_RULE "|" D_CMND_RULETIMER "|" D_CMND_EVENT "|" D_CMND_VAR "|" D_CMND_MEM "|" D_CMND_ADD "|" D_CMND_SUB "|" D_CMND_MULT "|" D_CMND_SCALE "|" D_CMND_CALC_RESOLUTION "|" D_CMND_SUBSCRIBE "|" D_CMND_UNSUBSCRIBE ;
#ifdef SUPPORT_MQTT_EVENT
#include <LinkedList.h> // Import LinkedList library
typedef struct {
String Event;
String Topic;
String Key;
} MQTT_Subscription;
LinkedList<MQTT_Subscription> subscriptions;
#endif //SUPPORT_MQTT_EVENT
String rules_event_value;
unsigned long rules_timer[MAX_RULE_TIMERS] = { 0 };
@ -577,6 +589,192 @@ void RulesTeleperiod(void)
rules_teleperiod = 0;
}
#ifdef SUPPORT_MQTT_EVENT
/********************************************************************************************/
/*
* Rules: Process received MQTT message.
* If the message is in our subscription list, trigger an event with the value parsed from MQTT data
* Input:
* void - We are going to access XdrvMailbox data directly.
* Return:
* true - The message is consumed.
* false - The message is not in our list.
*/
bool RulesMqttData(void)
{
bool serviced = false;
if (XdrvMailbox.data_len < 1 || XdrvMailbox.data_len > 128) {
return false;
}
String sTopic = XdrvMailbox.topic;
String sData = XdrvMailbox.data;
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: MQTT Topic %s, Event %s"), XdrvMailbox.topic, XdrvMailbox.data);
//AddLog(LOG_LEVEL_DEBUG);
MQTT_Subscription event_item;
//Looking for matched topic
for (int index = 0; index < subscriptions.size(); index++) {
event_item = subscriptions.get(index);
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: Match MQTT message Topic %s with subscription topic %s"), sTopic.c_str(), event_item.Topic.c_str());
//AddLog(LOG_LEVEL_DEBUG);
if (sTopic.startsWith(event_item.Topic)) {
//This topic is subscribed by us, so serve it
serviced = true;
String value;
if (event_item.Key.length() == 0) { //If did not specify Key
value = sData;
} else { //If specified Key, need to parse Key/Value from JSON data
StaticJsonBuffer<400> jsonBuf;
JsonObject& jsonData = jsonBuf.parseObject(sData);
String key1 = event_item.Key;
String key2;
if (!jsonData.success()) break; //Failed to parse JSON data, ignore this message.
int dot;
if ((dot = key1.indexOf('.')) > 0) {
key2 = key1.substring(dot+1);
key1 = key1.substring(0, dot);
if (!jsonData[key1][key2].success()) break; //Failed to get the key/value, ignore this message.
value = (const char *)jsonData[key1][key2];
} else {
if (!jsonData[key1].success()) break;
value = (const char *)jsonData[key1];
}
}
value.trim();
//Create an new event. Cannot directly call RulesProcessEvent().
snprintf_P(event_data, sizeof(event_data), PSTR("%s=%s"), event_item.Event.c_str(), value.c_str());
}
}
return serviced;
}
/********************************************************************************************/
/*
* Subscribe a MQTT topic (with or without key) and assign an event name to it
* Command Subscribe format:
* Subscribe <event_name>, <topic> [, <key>]
* This command will subscribe a <topic> and give it an event name <event_name>.
* The optional parameter <key> is for parse the specified key/value from MQTT message
* payload with JSON format.
* Subscribe
* Subscribe command without any parameter will list all topics currently subscribed.
* Input:
* data - A char buffer with all the parameters
* data_len - Length of the parameters
* Return:
* A string include subscribed event, topic and key.
*/
String RulesSubscribe(const char *data, int data_len)
{
MQTT_Subscription subscription_item;
String events;
if (data_len > 0) {
char parameters[data_len+1];
memcpy(parameters, data, data_len);
parameters[data_len] = '\0';
String event_name, topic, key;
char * pos = strtok(parameters, ",");
if (pos) {
event_name = Trim(pos);
pos = strtok(NULL, ",");
if (pos) {
topic = Trim(pos);
pos = strtok(NULL, ",");
if (pos) {
key = Trim(pos);
}
}
}
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: Subscribe command with parameters: %s, %s, %s."), event_name.c_str(), topic.c_str(), key.c_str());
//AddLog(LOG_LEVEL_DEBUG);
event_name.toUpperCase();
if (event_name.length() > 0 && topic.length() > 0) {
//Search all subscriptions
for (int index=0; index < subscriptions.size(); index++) {
if (subscriptions.get(index).Event.equals(event_name)) {
//If find exists one, remove it.
String stopic = subscriptions.get(index).Topic + "/#";
MqttUnsubscribe(stopic.c_str());
subscriptions.remove(index);
break;
}
}
//Add "/#" to the topic
if (!topic.endsWith("#")) {
if (topic.endsWith("/")) {
topic.concat("#");
} else {
topic.concat("/#");
}
}
//snprintf_P(log_data, sizeof(log_data), PSTR("RUL: New topic: %s."), topic.c_str());
//AddLog(LOG_LEVEL_DEBUG);
//MQTT Subscribe
subscription_item.Event = event_name;
subscription_item.Topic = topic.substring(0, topic.length() - 2); //Remove "/#" so easy to match
subscription_item.Key = key;
subscriptions.add(subscription_item);
MqttSubscribe(topic.c_str());
events.concat(event_name + "," + topic
+ (key.length()>0 ? "," : "")
+ key);
} else {
events = D_JSON_WRONG_PARAMETERS;
}
} else {
//If did not specify the event name, list all subscribed event
for (int index=0; index < subscriptions.size(); index++) {
subscription_item = subscriptions.get(index);
events.concat(subscription_item.Event + "," + subscription_item.Topic
+ (subscription_item.Key.length()>0 ? "," : "")
+ subscription_item.Key + "; ");
}
}
return events;
}
/********************************************************************************************/
/*
* Unsubscribe specified MQTT event. If no event specified, Unsubscribe all.
* Command Unsubscribe format:
* Unsubscribe [<event_name>]
* Input:
* data - Event name
* data_len - Length of the parameters
* Return:
* list all the events unsubscribed.
*/
String RulesUnsubscribe(const char * data, int data_len)
{
MQTT_Subscription subscription_item;
String events;
if (data_len > 0) {
for (int index = 0; index < subscriptions.size(); index++) {
subscription_item = subscriptions.get(index);
if (subscription_item.Event.equalsIgnoreCase(data)) {
String stopic = subscription_item.Topic + "/#";
MqttUnsubscribe(stopic.c_str());
events = subscription_item.Event;
subscriptions.remove(index);
break;
}
}
} else {
//If did not specify the event name, unsubscribe all event
String stopic;
while (subscriptions.size() > 0) {
events.concat(subscriptions.get(0).Event + "; ");
stopic = subscriptions.get(0).Topic + "/#";
MqttUnsubscribe(stopic.c_str());
subscriptions.remove(0);
}
}
return events;
}
#endif // SUPPORT_MQTT_EVENT
#ifdef USE_EXPRESSION
/********************************************************************************************/
/*
@ -1026,6 +1224,14 @@ bool RulesCommand(void)
}
}
snprintf_P(mqtt_data, sizeof(mqtt_data), S_JSON_COMMAND_INDEX_SVALUE, command, index, vars[index -1]);
#ifdef SUPPORT_MQTT_EVENT
} else if (CMND_SUBSCRIBE == command_code) { //MQTT Subscribe command. Subscribe <Event>, <Topic> [, <Key>]
String result = RulesSubscribe(XdrvMailbox.data, XdrvMailbox.data_len);
snprintf_P(mqtt_data, sizeof(mqtt_data), S_JSON_COMMAND_SVALUE, command, result.c_str());
} else if (CMND_UNSUBSCRIBE == command_code) { //MQTT Un-subscribe command. UnSubscribe <Event>
String result = RulesUnsubscribe(XdrvMailbox.data, XdrvMailbox.data_len);
snprintf_P(mqtt_data, sizeof(mqtt_data), S_JSON_COMMAND_SVALUE, command, result.c_str());
#endif //SUPPORT_MQTT_EVENT
}
else serviced = false; // Unknown command
@ -1067,6 +1273,11 @@ bool Xdrv10(uint8_t function)
case FUNC_RULES_PROCESS:
result = RulesProcess();
break;
#ifdef SUPPORT_MQTT_EVENT
case FUNC_MQTT_DATA:
result = RulesMqttData();
break;
#endif //SUPPORT_MQTT_EVENT
}
return result;
}