Add MQTT binary file transfer

This commit is contained in:
Theo Arends 2021-05-19 17:21:50 +02:00
parent b284caa1fa
commit be92738c57
6 changed files with 363 additions and 236 deletions

View File

@ -24,6 +24,8 @@
* MQTT file transfer
*
* Supports both binary and base64 encoded binary data transfer
*
* See tools/mqtt-file for python ota-upload and settings-upload and download examples
\*********************************************************************************************/
#include <PubSubClient.h>
@ -41,32 +43,13 @@ struct FMQTT {
String file_md5; // MQTT received file md5 (32 chars)
uint16_t topic_size; // MQTT topic length with terminating <null>
uint8_t file_id = 0; // MQTT unique file id during upload/download
bool file_binary = false; // MQTT binary file transfer
} FMqtt;
/*
The download chunk size is the data size before it is encoded to base64.
It is smaller than the upload chunksize as it is bound by MESSZ
The download buffer with length MESSZ (1042) contains
- Payload ({"Id":117,"Data":"<base64 encoded mqtt_file_chuck_size>"}<null>)
*/
const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""}<null>
const uint32_t mqtt_file_chuck_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2;
uint32_t FileUploadChunckSize(void) {
/*
The upload chunk size is the data size of the payload.
It can be larger than the download chunksize which is bound by MESSZ
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
- Payload ({"Id":116,"Data":"<base64 encoded FileUploadChunckSize>"}<null>) or (<binary data>)
*/
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
return MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1;
}
uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
if (XdrvMailbox.grpflg) { return 5; }
if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported
if ((0 == FMqtt.file_id) && (rcv_id > 0) && (FMqtt.file_size > 0) && (FMqtt.file_type > 0)) {
FMqtt.file_buffer = nullptr; // Init upload buffer
@ -92,7 +75,8 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
if (UPL_TASMOTA == FMqtt.file_type) {
if (Update.begin(FMqtt.file_size)) {
FMqtt.file_buffer = &FMqtt.file_id; // Dummy buffer
// TasmotaGlobal.blinkstate = true; // Stay lit
TasmotaGlobal.blinks = 201;
TasmotaGlobal.blinkstate = true; // Stay lit
SettingsSave(1); // Free flash for OTA update
}
}
@ -114,11 +98,12 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
ResponseCmndChar(PSTR(D_JSON_STARTED));
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
}
else if ((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) {
else if (((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) || (0 == XdrvMailbox.payload)) {
// Error receiving data
if (UPL_TASMOTA == FMqtt.file_type) {
Update.end(true);
TasmotaGlobal.blinkstate = false; // Turn led off
}
else if (UPL_SETTINGS == FMqtt.file_type) {
SettingsBufferFree();
@ -128,6 +113,36 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
return 0; // No error
}
void MqttFileValidate(uint32_t error) {
if (error) {
FMqtt.file_buffer = nullptr;
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
if (4 == error) {
ResponseCmndChar(PSTR(D_JSON_ABORTED));
} else {
char error_txt[20];
snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error);
ResponseCmndChar(error_txt);
}
}
}
void MqttFilePublish(void) {
if (!FMqtt.file_buffer) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
FMqtt.file_id = 0;
FMqtt.file_size = 0;
FMqtt.file_type = 0;
FMqtt.file_binary = false;
FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory
FMqtt.file_password = nullptr;
}
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
ResponseClear();
}
void CmndFileUpload(void) {
/*
Upload <MaxSize> bytes chunks of data either base64 encoded or binary with MD5 hash
@ -152,13 +167,12 @@ void CmndFileUpload(void) {
*/
const char* base64_data = nullptr;
uint32_t rcv_id = 0;
char* dataBuf = (char*)XdrvMailbox.data;
bool binary_data = (XdrvMailbox.index > 199); // Check for raw data
if (!binary_data) {
if (strlen(dataBuf) > 8) { // Workaround exception if empty JSON like {} - Needs checks
JsonParser parser((char*) dataBuf);
if (strlen(XdrvMailbox.data) > 8) { // Workaround exception if empty JSON like {} - Needs checks
JsonParser parser((char*) XdrvMailbox.data);
JsonParserObject root = parser.getRootObject();
if (root) {
JsonParserToken val = root[PSTR("ID")];
@ -178,17 +192,7 @@ void CmndFileUpload(void) {
} else {
rcv_id = FMqtt.file_id;
}
uint32_t error = MqttFileUploadValidate(rcv_id);
if (error) {
FMqtt.file_buffer = nullptr;
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
char error_txt[20];
snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error);
ResponseCmndChar(error_txt);
}
MqttFileValidate(MqttFileUploadValidate(rcv_id));
if (FMqtt.file_buffer) {
if ((FMqtt.file_pos < FMqtt.file_size) && (binary_data || base64_data)) {
@ -232,7 +236,16 @@ void CmndFileUpload(void) {
if ((FMqtt.file_pos < FMqtt.file_size) || (FMqtt.file_md5.length() != 32)) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
uint32_t chunk_size = FileUploadChunckSize();
/*
The upload chunk size is the data size of the payload.
It can be larger than the download chunksize which is bound by MESSZ
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
- Payload ({"Id":116,"Data":"<base64 encoded chunk_size>"}<null>) or (<binary data>)
*/
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
uint32_t chunk_size = MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1;
if (!binary_data) {
chunk_size = (((chunk_size - FileTransferHeaderSize) / 4) * 3) -2; // Calculate base64 chunk size
}
@ -248,16 +261,17 @@ void CmndFileUpload(void) {
if (UPL_TASMOTA == FMqtt.file_type) {
if (!Update.end(true)) {
TasmotaGlobal.blinkstate = false; // Turn led off
ResponseCmndFailed();
} else {
TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update
TasmotaGlobal.restart_flag = 2; // Restart to load new firmware
}
}
else if (UPL_SETTINGS == FMqtt.file_type) {
if (!SettingsConfigRestore()) {
ResponseCmndFailed();
} else {
TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update
TasmotaGlobal.restart_flag = 2; // Restart to load new settings
}
}
@ -266,95 +280,137 @@ void CmndFileUpload(void) {
}
}
if (!FMqtt.file_buffer) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
FMqtt.file_id = 0;
FMqtt.file_size = 0;
FMqtt.file_type = 0;
FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory
FMqtt.file_password = nullptr;
MqttFilePublish();
}
uint32_t MqttFileDownloadValidate(void) {
if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported
if ((0 == FMqtt.file_id) && (FMqtt.file_type > 0)) {
FMqtt.file_buffer = nullptr; // Init upload buffer
if (!FMqtt.file_password || (strcmp(FMqtt.file_password, SettingsText(SET_MQTT_PWD)) != 0)) {
return 1; // Invalid password
}
FMqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255
// Init file_buffer
if (UPL_SETTINGS == FMqtt.file_type) {
uint32_t len = SettingsConfigBackup();
if (!len) { return 2; }
FMqtt.file_type = UPL_SETTINGS;
FMqtt.file_buffer = settings_buffer;
FMqtt.file_size = len;
// {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096}
Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"),
SettingsConfigFilename().c_str(), FMqtt.file_id, FMqtt.file_type, len);
}
else {
return 3; // Invalid file type
}
FMqtt.file_pos = 0;
FMqtt.md5 = MD5Builder();
FMqtt.md5.begin();
char payload[50];
snprintf_P(payload, sizeof(payload), S_JSON_COMMAND_SVALUE, XdrvMailbox.command, PSTR(D_JSON_STARTED));
MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, payload); // Enforce stat/wemos10/FILEUPLOAD
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
}
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
ResponseClear();
else if (0 == XdrvMailbox.payload) {
if (UPL_SETTINGS == FMqtt.file_type) {
SettingsBufferFree();
}
return 4; // Upload aborted
}
return 0; // No error
}
void CmndFileDownload(void) {
/*
Download (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data
Currently supports Settings (file type 2)
Filedownload 0 - Abort current download
FileDownload 2 - Start download of settings file
FileDownload - Continue downloading data until reception of MD5 hash
Download chunks of data base64 encoded with MD5 hash
Supported Type:
2 - Settings
FileDownload 0 - Abort current download
Start a download session:
FileDownload {"Password":"","Type":2}
Download data using base64 until reception of MD5 hash:
FileDownload
*/
if (XdrvMailbox.grpflg) { return; }
if (FMqtt.file_id && FMqtt.file_buffer) {
bool finished = false;
if (0 == XdrvMailbox.payload) { // Abort file download
ResponseCmndChar(PSTR(D_JSON_ABORTED));
finished = true;
}
else if (FMqtt.file_pos < FMqtt.file_size) {
if (FMqtt.file_buffer) {
if (FMqtt.file_pos < FMqtt.file_size) {
uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos;
uint32_t write_bytes = (bytes_left < mqtt_file_chuck_size) ? bytes_left : mqtt_file_chuck_size;
/*
The download chunk size is the data size before it is encoded to base64.
It is smaller than the upload chunksize as it is bound by MESSZ
The download buffer with length MESSZ (1042) contains
- Payload ({"Id":117,"Data":"<base64 encoded mqtt_file_chuck_size>"}<null>)
*/
const uint32_t mqtt_file_chunk_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2;
uint32_t chunk_size = (FMqtt.file_binary) ? 4096 : mqtt_file_chunk_size;
uint32_t write_bytes = (bytes_left < chunk_size) ? bytes_left : chunk_size;
uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos;
FMqtt.md5.add(buffer, write_bytes);
// {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
Response_P(PSTR("{\"Id\":%d,\"Data\":\""), FMqtt.file_id); // FileTransferHeaderSize
char base64_data[encode_base64_length(write_bytes)];
encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data);
ResponseAppend_P(base64_data);
ResponseAppend_P("\"}");
FMqtt.file_pos += write_bytes;
if (FMqtt.file_binary) {
MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, (const char*)buffer, write_bytes);
} else {
// {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
Response_P(PSTR("{\"Id\":%d,\"Data\":\""), FMqtt.file_id); // FileTransferHeaderSize
char base64_data[encode_base64_length(write_bytes)];
encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data);
ResponseAppend_P(base64_data);
ResponseAppend_P("\"}");
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
}
ResponseClear();
return;
} else {
FMqtt.md5.calculate();
// {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"}
Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), FMqtt.file_id, FMqtt.md5.toString().c_str());
finished = true;
}
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
ResponseCmndDone();
if (finished) {
if (UPL_SETTINGS == FMqtt.file_type) {
SettingsBufferFree();
}
FMqtt.file_id = 0;
FMqtt.file_buffer = nullptr;
}
}
else if (XdrvMailbox.data_len) {
FMqtt.file_buffer = nullptr;
FMqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255
if (UPL_SETTINGS == XdrvMailbox.payload) {
uint32_t len = SettingsConfigBackup();
if (len) {
FMqtt.file_type = UPL_SETTINGS;
FMqtt.file_buffer = settings_buffer;
FMqtt.file_size = len;
// {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096}
Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"),
SettingsConfigFilename().c_str(), FMqtt.file_id, FMqtt.file_type, len);
}
}
if (FMqtt.file_buffer) {
FMqtt.file_pos = 0;
FMqtt.md5 = MD5Builder();
FMqtt.md5.begin();
} else {
FMqtt.file_id = 0;
ResponseCmndFailed();
if (strlen(XdrvMailbox.data) > 8) { // Workaround exception if empty JSON like {} - Needs checks
JsonParser parser((char*) XdrvMailbox.data);
JsonParserObject root = parser.getRootObject();
if (root) {
JsonParserToken val = root[PSTR("TYPE")];
if (val) { FMqtt.file_type = val.getUInt(); }
val = root[PSTR("BINARY")];
if (val) { FMqtt.file_binary = val.getUInt(); }
val = root[PSTR("PASSWORD")];
if (val) { FMqtt.file_password = val.getStr(); }
}
}
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
ResponseClear();
MqttFileValidate(MqttFileDownloadValidate());
MqttFilePublish();
}
#endif // USE_MQTT_FILE

View File

@ -179,7 +179,7 @@ void MakeValidMqtt(uint32_t option, char* str) {
* bool MqttIsConnected()
* void MqttDisconnect()
* void MqttSubscribeLib(char *topic)
* bool MqttPublishLib(const char* topic, bool retained)
* bool MqttPublishLib(const char* topic, const uint8_t* payload, unsigned int plength, bool retained)
\*********************************************************************************************/
#include <PubSubClient.h>
@ -465,7 +465,7 @@ void MqttUnsubscribeLib(const char *topic) {
MqttClient.loop(); // Solve LmacRxBlk:1 messages
}
bool MqttPublishLib(const char* topic, bool retained) {
bool MqttPublishLib(const char* topic, const uint8_t* payload, unsigned int plength, bool retained) {
// If Prefix1 equals Prefix2 disable next MQTT subscription to prevent loop
if (!strcmp(SettingsText(SET_MQTTPREFIX1), SettingsText(SET_MQTTPREFIX2))) {
char *str = strstr(topic, SettingsText(SET_MQTTPREFIX1));
@ -475,35 +475,34 @@ bool MqttPublishLib(const char* topic, bool retained) {
}
}
bool result;
#ifdef USE_MQTT_AZURE_IOT
String sourceTopicString = urlEncodeBase64(String(topic));
String topicString = "devices/" + String(SettingsText(SET_MQTT_CLIENT));
topicString+= "/messages/events/topic=" + sourceTopicString;
topicString += "/messages/events/topic=" + sourceTopicString;
JsonParser mqtt_message((char*) String(TasmotaGlobal.mqtt_data).c_str());
JsonParser mqtt_message((char*) String((const char*)payload).c_str());
JsonParserObject message_object = mqtt_message.getRootObject();
if (message_object.isValid()) { // only sending valid JSON, yet this is optional
result = MqttClient.publish(topicString.c_str(), TasmotaGlobal.mqtt_data, retained);
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Sending '%s'"), TasmotaGlobal.mqtt_data);
} else {
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Invalid JSON, '%s' for topic '%s', not sending to Azure IoT Hub"), TasmotaGlobal.mqtt_data, topic);
result = true;
if (!message_object.isValid()) { // only sending valid JSON, yet this is optional
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Invalid JSON for topic '%s', not sending to Azure IoT Hub"), topic);
return true;
}
#else
result = MqttClient.publish(topic, TasmotaGlobal.mqtt_data, retained);
topic = topicString.c_str();
#endif // USE_MQTT_AZURE_IOT
yield(); // #3313
return result;
}
#ifdef DEBUG_TASMOTA_CORE
void MqttDumpData(char* topic, char* data, uint32_t data_len) {
char dump_data[data_len +1];
memcpy(dump_data, data, sizeof(dump_data)); // Make another copy for removing optional control characters
DEBUG_CORE_LOG(PSTR(D_LOG_MQTT "Size %d, \"%s %s\""), data_len, topic, RemoveControlCharacter(dump_data));
if (!MqttClient.beginPublish(topic, plength, retained)) {
// AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Connection lost or message too large"));
return false;
}
uint32_t written = MqttClient.write(payload, plength);
if (written != plength) {
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Message too large"));
return false;
}
MqttClient.endPublish();
yield(); // #3313
return true;
}
#endif
void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len) {
#ifdef USE_DEBUG_DRIVER
@ -554,10 +553,6 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len
char data[data_len +1];
memcpy(data, mqtt_data, sizeof(data));
#ifdef DEBUG_TASMOTA_CORE
MqttDumpData(topic, data, data_len); // Use a function to save stack space used by dump_data
#endif
// MQTT pre-processing
XdrvMailbox.index = strlen(topic);
XdrvMailbox.data_len = data_len;
@ -598,21 +593,28 @@ void MqttPublishLoggingAsync(bool refresh) {
strlcpy(TasmotaGlobal.mqtt_data, line, len); // No JSON and ugly!!
char stopic[TOPSZ];
GetTopic_P(stopic, STAT, TasmotaGlobal.mqtt_topic, PSTR("LOGGING"));
MqttPublishLib(stopic, false);
MqttPublishLib(stopic, (const uint8_t*)TasmotaGlobal.mqtt_data, strlen(TasmotaGlobal.mqtt_data), false);
}
}
void MqttPublish(const char* topic, bool retained) {
void MqttPublishPayload(const char* topic, const char* payload, uint32_t binary_length, bool retained) {
// Publish <topic> payload string or binary when binary_length set with optional retained
#ifdef USE_DEBUG_DRIVER
ShowFreeMem(PSTR("MqttPublish"));
#endif
bool binary_data = (binary_length > 0);
if (!binary_data) {
binary_length = strlen(payload);
}
if (Settings.flag4.mqtt_no_retain) { // SetOption104 - Disable all MQTT retained messages, some brokers don't support it: AWS IoT, Losant
retained = false; // Some brokers don't support retained, they will disconnect if received
}
String log_data; // 20210420 Moved to heap to solve tight stack resulting in exception 2
if (Settings.flag.mqtt_enabled && MqttPublishLib(topic, retained)) { // SetOption3 - Enable MQTT
if (Settings.flag.mqtt_enabled && MqttPublishLib(topic, (const uint8_t*)payload, binary_length, retained)) { // SetOption3 - Enable MQTT
log_data = F(D_LOG_MQTT); // MQT:
log_data += topic; // stat/tasmota/STATUS2
} else {
@ -621,7 +623,7 @@ void MqttPublish(const char* topic, bool retained) {
retained = false; // Without MQTT enabled there is no retained message
}
log_data += F(" = "); // =
log_data += TasmotaGlobal.mqtt_data; // {"StatusFWR":{"Version":...
log_data += (binary_data) ? HexToString((uint8_t*)payload, binary_length) : payload;
if (retained) { log_data += F(" (" D_RETAINED ")"); } // (retained)
AddLogData(LOG_LEVEL_INFO, log_data.c_str()); // MQT: stat/tasmota/STATUS2 = {"StatusFWR":{"Version":...
@ -630,18 +632,32 @@ void MqttPublish(const char* topic, bool retained) {
}
}
void MqttPublishPayload(const char* topic, const char* payload) {
// Publish <topic> payload string no retained
MqttPublishPayload(topic, payload, 0, false);
}
void MqttPublish(const char* topic, bool retained) {
// Publish <topic> default TasmotaGlobal.mqtt_data string with optional retained
MqttPublishPayload(topic, TasmotaGlobal.mqtt_data, 0, retained);
}
void MqttPublish(const char* topic) {
// Publish <topic> default TasmotaGlobal.mqtt_data string no retained
MqttPublish(topic, false);
}
void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retained) {
/* prefix 0 = cmnd using subtopic
* prefix 1 = stat using subtopic
* prefix 2 = tele using subtopic
* prefix 4 = cmnd using subtopic or RESULT
* prefix 5 = stat using subtopic or RESULT
* prefix 6 = tele using subtopic or RESULT
*/
void MqttPublishPayloadPrefixTopic_P(uint32_t prefix, const char* subtopic, const char* payload, uint32_t binary_length, bool retained) {
/*
Publish <prefix>/<device>/<RESULT or <subtopic>> payload string or binary when binary_length set with optional retained
prefix 0 = cmnd using subtopic
prefix 1 = stat using subtopic
prefix 2 = tele using subtopic
prefix 4 = cmnd using subtopic or RESULT
prefix 5 = stat using subtopic or RESULT
prefix 6 = tele using subtopic or RESULT
*/
char romram[64];
snprintf_P(romram, sizeof(romram), ((prefix > 3) && !Settings.flag.mqtt_response) ? S_RSLT_RESULT : subtopic); // SetOption4 - Switch between MQTT RESULT or COMMAND
UpperCase(romram, romram);
@ -649,7 +665,7 @@ void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retain
prefix &= 3;
char stopic[TOPSZ];
GetTopic_P(stopic, prefix, TasmotaGlobal.mqtt_topic, romram);
MqttPublish(stopic, retained);
MqttPublishPayload(stopic, payload, binary_length, retained);
#if defined(USE_MQTT_AWS_IOT) || defined(USE_MQTT_AWS_IOT_LIGHT)
if ((prefix > 0) && (Settings.flag4.awsiot_shadow) && (Mqtt.connected)) { // placeholder for SetOptionXX
@ -669,33 +685,53 @@ void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retain
snprintf_P(romram, sizeof(romram), PSTR("$aws/things/%s/shadow/update"), topic2);
// copy buffer
char *mqtt_save = (char*) malloc(strlen(TasmotaGlobal.mqtt_data)+1);
if (!mqtt_save) { return; } // abort
strcpy(mqtt_save, TasmotaGlobal.mqtt_data);
snprintf_P(TasmotaGlobal.mqtt_data, sizeof(TasmotaGlobal.mqtt_data), PSTR("{\"state\":{\"reported\":%s}}"), mqtt_save);
free(mqtt_save);
String aws_payload = F("{\"state\":{\"reported\":%s}}");
aws_payload += payload;
MqttClient.publish(romram, aws_payload.c_str(), false);
bool result = MqttClient.publish(romram, TasmotaGlobal.mqtt_data, false);
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_MQTT "Updated shadow: %s"), romram);
yield(); // #3313
}
#endif // USE_MQTT_AWS_IOT
}
void MqttPublishPayloadPrefixTopic_P(uint32_t prefix, const char* subtopic, const char* payload, uint32_t binary_length) {
// Publish <prefix>/<device>/<RESULT or <subtopic>> payload string or binary when binary_length set no retained
MqttPublishPayloadPrefixTopic_P(prefix, subtopic, payload, binary_length, false);
}
void MqttPublishPayloadPrefixTopic_P(uint32_t prefix, const char* subtopic, const char* payload) {
// Publish <prefix>/<device>/<RESULT or <subtopic>> payload string no retained
MqttPublishPayloadPrefixTopic_P(prefix, subtopic, payload, 0, false);
}
void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic, bool retained) {
// Publish <prefix>/<device>/<RESULT or <subtopic>> default TasmotaGlobal.mqtt_data string with optional retained
MqttPublishPayloadPrefixTopic_P(prefix, subtopic, TasmotaGlobal.mqtt_data, 0, retained);
}
void MqttPublishPrefixTopic_P(uint32_t prefix, const char* subtopic) {
// Publish <prefix>/<device>/<RESULT or <subtopic>> default TasmotaGlobal.mqtt_data string no retained
MqttPublishPrefixTopic_P(prefix, subtopic, false);
}
void MqttPublishPrefixTopicRulesProcess_P(uint32_t prefix, const char* subtopic, bool retained) {
// Publish <prefix>/<device>/<RESULT or <subtopic>> default TasmotaGlobal.mqtt_data string with optional retained
// then process rules
MqttPublishPrefixTopic_P(prefix, subtopic, retained);
XdrvRulesProcess(0);
}
void MqttPublishPrefixTopicRulesProcess_P(uint32_t prefix, const char* subtopic) {
// Publish <prefix>/<device>/<RESULT or <subtopic>> default TasmotaGlobal.mqtt_data string no retained
// then process rules
MqttPublishPrefixTopicRulesProcess_P(prefix, subtopic, false);
}
void MqttPublishTeleSensor(void) {
// Publish tele/<device>/SENSOR default TasmotaGlobal.mqtt_data string with optional retained
// then process rules
MqttPublishPrefixTopicRulesProcess_P(TELE, PSTR(D_RSLT_SENSOR), Settings.flag.mqtt_sensor_retain); // CMND_SENSORRETAIN
}

View File

@ -360,8 +360,9 @@ bool DomoticzSendKey(uint8_t key, uint8_t device, uint8_t state, uint8_t svalflg
\*********************************************************************************************/
void DomoticzSendData(uint32_t sensor_idx, uint32_t idx, char *data) {
char payload[128];
if (DZ_AIRQUALITY == sensor_idx) {
Response_P(PSTR("{\"idx\":%d,\"nvalue\":%s,\"Battery\":%d,\"RSSI\":%d}"),
snprintf_P(payload, sizeof(payload), PSTR("{\"idx\":%d,\"nvalue\":%s,\"Battery\":%d,\"RSSI\":%d}"),
idx, data, DomoticzBatteryQuality(), DomoticzRssiQuality());
} else {
uint8_t nvalue = 0;
@ -371,19 +372,15 @@ void DomoticzSendData(uint32_t sensor_idx, uint32_t idx, char *data) {
nvalue = position < 2 ? 0 : (position == 100 ? 1 : 2);
}
#endif // USE_SHUTTER
Response_P(DOMOTICZ_MESSAGE, // "{\"idx\":%d,\"nvalue\":%d,\"svalue\":\"%s\",\"Battery\":%d,\"RSSI\":%d}"
snprintf_P(payload, sizeof(payload), DOMOTICZ_MESSAGE, // "{\"idx\":%d,\"nvalue\":%d,\"svalue\":\"%s\",\"Battery\":%d,\"RSSI\":%d}"
idx, nvalue, data, DomoticzBatteryQuality(), DomoticzRssiQuality());
}
MqttPublish(domoticz_in_topic);
MqttPublishPayload(domoticz_in_topic, payload);
}
void DomoticzSensor(uint8_t idx, char *data) {
if (Settings.domoticz_sensor_idx[idx]) {
char dmess[128]; // {"idx":26700,"nvalue":0,"svalue":"22330.1;10234.4;22000.5;10243.4;1006;3000","Battery":100,"RSSI":10}
memcpy(dmess, TasmotaGlobal.mqtt_data, sizeof(dmess));
DomoticzSendData(idx, Settings.domoticz_sensor_idx[idx], data);
memcpy(TasmotaGlobal.mqtt_data, dmess, sizeof(dmess));
}
}

View File

@ -42,16 +42,20 @@ import json
broker = "domus1" # MQTT broker ip address or name
broker_port = 1883 # MQTT broker port
mypassword = "" # Tasmota MQTT password
mytopic = "demo" # Tasmota MQTT topic
myfiletype = 2 # Tasmota Settings file type
# **** End of User Configuration Section
use_base64 = True
# Derive fulltopic from broker LWT message
mypublish = "cmnd/"+mytopic+"/filedownload"
mysubscribe = "stat/"+mytopic+"/FILEDOWNLOAD" # Case sensitive
Ack_flag = False
Err_flag = False
file_name = ""
file_id = 0
@ -62,6 +66,7 @@ file_md5 = ""
# The callback for when mysubscribe message is received
def on_message(client, userdata, msg):
global Ack_flag
global Err_flag
global Run_flag
global file_name
global file_id
@ -73,56 +78,76 @@ def on_message(client, userdata, msg):
base64_data = ""
rcv_id = 0
# print("Received message =",str(msg.payload.decode("utf-8")))
# try:
# print("Received message =",str(msg.payload.decode("utf-8")))
# except:
# print("Received message = binary data")
root = json.loads(msg.payload.decode("utf-8"))
if "File" in root: file_name = root["File"]
if "Id" in root: rcv_id = root["Id"]
if "Type" in root: file_type = root["Type"]
if "Size" in root: file_size = root["Size"]
if "Data" in root: base64_data = root["Data"]
if "Md5" in root: file_md5 = root["Md5"]
try:
root = json.loads(msg.payload.decode("utf-8"))
if root:
if "FileDownload" in root:
rcv_code = root["FileDownload"]
if "Started" in rcv_code:
return
if "Error" in rcv_code:
print("Error: "+rcv_code)
Err_flag = True
return
if "Command" in root:
rcv_code = root["Command"]
if rcv_code == "Error":
print("Error: Command error")
Err_flag = True
return
if "File" in root: file_name = root["File"]
if "Id" in root: rcv_id = root["Id"]
if "Type" in root: file_type = root["Type"]
if "Size" in root: file_size = root["Size"]
if "Data" in root: base64_data = root["Data"]
if "Md5" in root: file_md5 = root["Md5"]
except:
pass
if file_id == 0 and rcv_id > 0 and file_size > 0 and file_type > 0 and file_name:
file_id = rcv_id
fi = open(file_name,"wb")
fi.close()
else:
if file_id > 0 and file_id != rcv_id:
Run_flag = False
if use_base64 and file_id > 0 and file_id != rcv_id:
Err_flag = True
return
if file_md5 == "" and base64_data:
base64_decoded_data = base64_data.encode('utf-8')
chunk = base64.decodebytes(base64_decoded_data)
in_hash_md5.update(chunk) # Update hash
fi = open(file_name,"ab")
fi.write(chunk)
fi.close()
if file_md5 == "" and file_name:
if use_base64 and base64_data:
base64_decoded_data = base64_data.encode('utf-8')
chunk = base64.decodebytes(base64_decoded_data)
in_hash_md5.update(chunk) # Update hash
fi = open(file_name,"ab")
fi.write(chunk)
fi.close()
if use_base64 == False and 0 == rcv_id:
chunk = msg.payload
in_hash_md5.update(chunk) # Update hash
fi = open(file_name,"ab")
fi.write(chunk)
fi.close()
if file_md5 != "":
md5_hash = in_hash_md5.hexdigest()
if md5_hash != file_md5:
print("Error: MD5 mismatch")
Run_flag = False
Err_flag = True
Ack_flag = False
def wait_for_ack():
global Ack_flag
global Run_flag
if Run_flag == False:
print("Error: Transmission")
return True
timeout = 100
while Ack_flag and timeout > 0:
while Ack_flag and Err_flag == False and timeout > 0:
time.sleep(0.01)
timeout = timeout -1
if Ack_flag:
if 0 == timeout:
print("Error: Timeout")
return Ack_flag
@ -138,35 +163,32 @@ print("Downloading file from "+mytopic+" ...")
in_hash_md5 = hashlib.md5()
Err_flag = False
client.publish(mypublish, str(myfiletype))
if use_base64:
client.publish(mypublish, "{\"Password\":\""+mypassword+"\",\"Type\":"+str(myfiletype)+"}")
else:
client.publish(mypublish, "{\"Password\":\""+mypassword+"\",\"Type\":"+str(myfiletype)+",\"Binary\":1}")
Ack_flag = True
Run_flag = True
while Run_flag:
if wait_for_ack(): # We use Ack here
Err_flag = True
client.publish(mypublish, "0") # Abort any failed download
Run_flag = False
else:
if file_md5 == "":
if file_md5 == "": # Request chunk
client.publish(mypublish, "?")
Ack_flag = True
else:
Run_flag = False
if Err_flag:
client.publish(mypublish, "0") # Abort any failed download
if Err_flag == False:
file_type_name = "Data"
if file_type == 2:
file_type_name = "Settings"
print("Downloaded "+file_type_name+" saved as "+file_name)
time_taken = time.time() - time_start
file_type_name = " Data"
if file_type == 2:
file_type_name = " Settings"
print("Downloaded"+file_type_name+" saved as "+file_name)
print("Done in "+str("%.2f"%time_taken)+" seconds")
client.disconnect() # Disconnect

View File

@ -50,19 +50,22 @@ myfiletype = 1 # Tasmota firmware file type
# **** End of User Configuration Section
use_base64 = False
# Derive fulltopic from broker LWT message
mypublish = "cmnd/"+mytopic+"/fileupload"
mysubscribe = "stat/"+mytopic+"/FILEUPLOAD" # Case sensitive
Ack_flag = False
Err_flag = False
use_base64 = False
file_id = 114 # Even id between 2 and 254
file_chunk_size = 700 # Default Tasmota MQTT max message size
# The callback for when mysubscribe message is received
def on_message(client, userdata, msg):
global Ack_flag
global Err_flag
global file_chunk_size
rcv_code = ""
@ -71,31 +74,35 @@ def on_message(client, userdata, msg):
# print("Received message =",str(msg.payload.decode("utf-8")))
root = json.loads(msg.payload.decode("utf-8"))
if "FileUpload" in root: rcv_code = root["FileUpload"]
if "Error" in rcv_code:
print("Error: "+rcv_code)
return
if "Command" in root: rcv_code = root["Command"]
if rcv_code == "Error":
print("Error: Command error")
return
if "Id" in root: rcv_id = root["Id"]
if rcv_id == file_id:
if "MaxSize" in root: file_chunk_size = root["MaxSize"]
if "FileUpload" in root:
rcv_code = root["FileUpload"]
if "Started" in rcv_code:
return
if "Error" in rcv_code:
print("Error: "+rcv_code)
Err_flag = True
return
if "Command" in root:
rcv_code = root["Command"]
if rcv_code == "Error":
print("Error: Command error")
Err_flag = True
return
if "Id" in root:
rcv_id = root["Id"]
if rcv_id == file_id:
if "MaxSize" in root: file_chunk_size = root["MaxSize"]
Ack_flag = False
def wait_for_ack():
global Ack_flag
timeout = 100
while Ack_flag and timeout > 0:
while Ack_flag and Err_flag == False and timeout > 0:
time.sleep(0.01)
timeout = timeout -1
if Ack_flag:
print("Error: Ack timeout")
if 0 == timeout:
print("Error: Timeout")
return Ack_flag
@ -120,13 +127,14 @@ out_hash_md5 = hashlib.md5()
Run_flag = True
while Run_flag:
if wait_for_ack(): # We use Ack here
if wait_for_ack(): # We use Ack here
client.publish(mypublish, "0") # Abort any failed upload
Run_flag = False
else:
chunk = fo.read(file_chunk_size)
if chunk:
out_hash_md5.update(chunk) # Update hash
out_hash_md5.update(chunk) # Update hash
if use_base64:
base64_encoded_data = base64.b64encode(chunk)
base64_data = base64_encoded_data.decode('utf-8')

View File

@ -44,24 +44,27 @@ broker_port = 1883 # MQTT broker port
mypassword = "" # Tasmota MQTT password
mytopic = "demo" # Tasmota MQTT topic
myfile = "Config_demo_9.4.0.3.dmp" # Tasmota Settings file name
myfile = "Config_demo_9.4.0.4.dmp" # Tasmota Settings file name
myfiletype = 2 # Tasmota Settings file type
# **** End of User Configuration Section
use_base64 = True
# Derive fulltopic from broker LWT message
mypublish = "cmnd/"+mytopic+"/fileupload"
mysubscribe = "stat/"+mytopic+"/FILEUPLOAD" # Case sensitive
Ack_flag = False
Err_flag = False
use_base64 = True
file_id = 116 # Even id between 2 and 254
file_chunk_size = 700 # Default Tasmota MQTT max message size
# The callback for when mysubscribe message is received
def on_message(client, userdata, msg):
global Ack_flag
global Err_flag
global file_chunk_size
rcv_code = ""
@ -70,31 +73,35 @@ def on_message(client, userdata, msg):
# print("Received message =",str(msg.payload.decode("utf-8")))
root = json.loads(msg.payload.decode("utf-8"))
if "FileUpload" in root: rcv_code = root["FileUpload"]
if "Error" in rcv_code:
print("Error: "+rcv_code)
return
if "Command" in root: rcv_code = root["Command"]
if rcv_code == "Error":
print("Error: Command error")
return
if "Id" in root: rcv_id = root["Id"]
if rcv_id == file_id:
if "MaxSize" in root: file_chunk_size = root["MaxSize"]
if "FileUpload" in root:
rcv_code = root["FileUpload"]
if "Started" in rcv_code:
return
if "Error" in rcv_code:
print("Error: "+rcv_code)
Err_flag = True
return
if "Command" in root:
rcv_code = root["Command"]
if rcv_code == "Error":
print("Error: Command error")
Err_flag = True
return
if "Id" in root:
rcv_id = root["Id"]
if rcv_id == file_id:
if "MaxSize" in root: file_chunk_size = root["MaxSize"]
Ack_flag = False
def wait_for_ack():
global Ack_flag
timeout = 100
while Ack_flag and timeout > 0:
while Ack_flag and Err_flag == False and timeout > 0:
time.sleep(0.01)
timeout = timeout -1
if Ack_flag:
print("Error: Ack timeout")
if 0 == timeout:
print("Error: Timeout")
return Ack_flag
@ -119,7 +126,8 @@ out_hash_md5 = hashlib.md5()
Run_flag = True
while Run_flag:
if wait_for_ack(): # We use Ack here
if wait_for_ack(): # We use Ack here
client.publish(mypublish, "0") # Abort any failed upload
Run_flag = False
else: