Refactor MQTT pre-processing

Refactor MQTT pre-processing
This commit is contained in:
Theo Arends 2019-07-26 09:52:14 +02:00
parent 250c991c97
commit cc9d7bb8e9
2 changed files with 58 additions and 46 deletions

View File

@ -40,7 +40,7 @@ const char kTasmotaCommands[] PROGMEM =
/********************************************************************************************/
void ExecuteCommand(char *cmnd, int source)
void ExecuteCommand(char *cmnd, uint32_t source)
{
char *start;
char *token;
@ -73,65 +73,33 @@ void ExecuteCommand(char *cmnd, int source)
// topic: cmnd/sonoffs/power1 data: toggle = Mqtt command using a group topic
// topic: cmnd/DVES_83BB10_fb/power1 data: toggle = Mqtt command using fallback topic
void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
void CommandHandler(char* topic, uint8_t* data, uint32_t data_len)
{
if (data_len > MQTT_MAX_PACKET_SIZE) { return; } // Do not allow more data than would be feasable within stack space
char *str;
if (!strcmp(Settings.mqtt_prefix[0],Settings.mqtt_prefix[1])) {
str = strstr(topic,Settings.mqtt_prefix[0]);
if ((str == topic) && mqtt_cmnd_publish) {
if (mqtt_cmnd_publish > 3) {
mqtt_cmnd_publish -= 3;
} else {
mqtt_cmnd_publish = 0;
}
return;
}
}
char topicBuf[TOPSZ];
char dataBuf[data_len+1];
char command [CMDSZ];
char stemp1[TOPSZ];
char *p;
char *type = nullptr;
uint8_t lines = 1;
bool jsflg = false;
bool grpflg = false;
bool user_index = false;
uint32_t i = 0;
uint32_t index;
uint32_t address;
#ifdef USE_DEBUG_DRIVER
ShowFreeMem(PSTR("CommandHandler"));
#endif
char topicBuf[TOPSZ];
strlcpy(topicBuf, topic, sizeof(topicBuf));
uint32_t i = 0;
for (i = 0; i < data_len; i++) {
if (!isspace(data[i])) { break; }
if (!isspace(data[i])) { break; } // Skip leading spaces in data
}
data_len -= i;
char dataBuf[data_len+1];
memcpy(dataBuf, data +i, sizeof(dataBuf));
dataBuf[sizeof(dataBuf)-1] = 0;
if (topicBuf[0] != '/') { ShowSource(SRC_MQTT); }
AddLog_P2(LOG_LEVEL_DEBUG_MORE, PSTR(D_LOG_RESULT D_RECEIVED_TOPIC " %s, " D_DATA_SIZE " %d, " D_DATA " %s"), topicBuf, data_len, dataBuf);
// if (LOG_LEVEL_DEBUG_MORE <= seriallog_level) { Serial.println(dataBuf); }
if (XdrvMqttData(topicBuf, sizeof(topicBuf), dataBuf, sizeof(dataBuf))) { return; }
grpflg = (strstr(topicBuf, Settings.mqtt_grptopic) != nullptr);
bool grpflg = (strstr(topicBuf, Settings.mqtt_grptopic) != nullptr);
char stemp1[TOPSZ];
GetFallbackTopic_P(stemp1, CMND, ""); // Full Fallback topic = cmnd/DVES_xxxxxxxx_fb/
fallback_topic_flag = (!strncmp(topicBuf, stemp1, strlen(stemp1)));
type = strrchr(topicBuf, '/'); // Last part of received topic is always the command (type)
char *type = strrchr(topicBuf, '/'); // Last part of received topic is always the command (type)
index = 1;
uint32_t index = 1;
bool user_index = false;
if (type != nullptr) {
type++;
for (i = 0; i < strlen(type); i++) {
@ -147,7 +115,7 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
type[i] = '\0';
}
AddLog_P2(LOG_LEVEL_DEBUG, PSTR(D_LOG_RESULT D_GROUP " %d, " D_INDEX " %d, " D_COMMAND " %s, " D_DATA " %s"), grpflg, index, type, dataBuf);
AddLog_P2(LOG_LEVEL_DEBUG, PSTR(D_LOG_COMMAND D_GROUP " %d, " D_INDEX " %d, " D_COMMAND " \"%s\", " D_DATA " \"%s\""), grpflg, index, type, dataBuf);
if (type != nullptr) {
Response_P(PSTR("{\"" D_JSON_COMMAND "\":\"" D_JSON_ERROR "\"}"));
@ -156,6 +124,7 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
if (!strcmp(dataBuf,"?")) { data_len = 0; }
int16_t payload = -99; // No payload
uint16_t payload16 = 0;
char *p;
long payload32 = strtol(dataBuf, &p, 0); // decimal, octal (0) or hex (0x)
if (p != dataBuf) {
payload = (int16_t) payload32; // -32766 - 32767
@ -170,6 +139,7 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
// AddLog_P2(LOG_LEVEL_DEBUG, PSTR(D_LOG_RESULT "Payload %d, Payload16 %d, payload32 %u"), payload, payload16, payload32);
char command [CMDSZ];
int command_code = GetCommandCode(command, sizeof(command), type, kTasmotaCommands);
if (-1 == command_code) {
// XdrvMailbox.valid = 1;
@ -541,6 +511,8 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
}
else if (CMND_MODULES == command_code) {
uint8_t midx = USER_MODULE;
bool jsflg = false;
uint8_t lines = 1;
for (uint32_t i = 0; i <= sizeof(kModuleNiceList); i++) {
if (i > 0) { midx = pgm_read_byte(kModuleNiceList + i -1); }
if (!jsflg) {
@ -569,6 +541,7 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
}
else if (CMND_ADCS == command_code) {
Response_P(PSTR("{\"" D_CMND_ADCS "\":["));
bool jsflg = false;
for (uint32_t i = 0; i < ADC0_END; i++) {
if (jsflg) {
ResponseAppend_P(PSTR(","));
@ -599,6 +572,7 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
}
}
Response_P(PSTR("{"));
bool jsflg = false;
for (uint32_t i = 0; i < sizeof(Settings.my_gp); i++) {
if (ValidGPIO(i, cmodule.io[i])) {
if (jsflg) { ResponseAppend_P(PSTR(",")); }
@ -616,6 +590,8 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
myio cmodule;
ModuleGpios(&cmodule);
uint8_t midx;
uint8_t lines = 1;
bool jsflg = false;
for (uint32_t i = 0; i < sizeof(kGpioNiceList); i++) {
midx = pgm_read_byte(kGpioNiceList + i);
if (!GetUsedInModule(midx, cmodule.io)) {
@ -806,6 +782,7 @@ void CommandHandler(char* topic, uint8_t* data, unsigned int data_len)
Response_P(S_JSON_COMMAND_NVALUE, command, Settings.syslog_port);
}
else if ((CMND_IPADDRESS == command_code) && (index > 0) && (index <= 4)) {
uint32_t address;
if (ParseIp(&address, dataBuf)) {
Settings.ip_address[index -1] = address;
// restart_flag = 2;

View File

@ -169,6 +169,41 @@ bool MqttPublishLib(const char* topic, bool retained)
return result;
}
void MqttDataHandler(char* topic, uint8_t* data, unsigned int data_len)
{
#ifdef USE_DEBUG_DRIVER
ShowFreeMem(PSTR("MqttDataHandler"));
#endif
// Do not allow more data than would be feasable within stack space
if (data_len >= MQTT_MAX_PACKET_SIZE) { return; }
// Do not execute multiple times if Prefix1 equals Prefix2
if (!strcmp(Settings.mqtt_prefix[0], Settings.mqtt_prefix[1])) {
char *str = strstr(topic, Settings.mqtt_prefix[0]);
if ((str == topic) && mqtt_cmnd_publish) {
if (mqtt_cmnd_publish > 3) {
mqtt_cmnd_publish -= 3;
} else {
mqtt_cmnd_publish = 0;
}
return;
}
}
data[data_len] = 0;
AddLog_P2(LOG_LEVEL_DEBUG_MORE, PSTR(D_LOG_MQTT D_RECEIVED_TOPIC " %s, " D_DATA_SIZE " %d, " D_DATA " %s"), topic, data_len, data);
// if (LOG_LEVEL_DEBUG_MORE <= seriallog_level) { Serial.println(dataBuf); }
// MQTT pre-processing
if (XdrvMqttData(topic, strlen(topic), (char*)data, data_len)) { return; }
ShowSource(SRC_MQTT);
CommandHandler(topic, data, data_len);
}
/*********************************************************************************************/
#ifdef USE_DISCOVERY
@ -487,7 +522,7 @@ void MqttReconnect(void)
mqtt_initial_connection_state = 1;
}
MqttClient.setCallback(CommandHandler);
MqttClient.setCallback(MqttDataHandler);
#if defined(USE_MQTT_TLS) && defined(USE_MQTT_AWS_IOT)
MqttClient.setServer(AWS_endpoint, Settings.mqtt_port);
#else