mirror of https://github.com/arendst/Tasmota.git
Prep for MQTT binary file transfer
This commit is contained in:
parent
44577c7772
commit
75c783d2d5
|
@ -192,11 +192,6 @@ void CommandHandler(char* topicBuf, char* dataBuf, uint32_t data_len)
|
|||
ShowFreeMem(PSTR("CommandHandler"));
|
||||
#endif
|
||||
|
||||
while (*dataBuf && isspace(*dataBuf)) {
|
||||
dataBuf++; // Skip leading spaces in data
|
||||
data_len--;
|
||||
}
|
||||
|
||||
bool grpflg = false;
|
||||
uint32_t real_index = SET_MQTT_GRP_TOPIC;
|
||||
for (uint32_t i = 0; i < MAX_GROUP_TOPICS; i++) {
|
||||
|
@ -237,20 +232,32 @@ void CommandHandler(char* topicBuf, char* dataBuf, uint32_t data_len)
|
|||
type[i] = '\0';
|
||||
}
|
||||
|
||||
AddLog_P(LOG_LEVEL_DEBUG, PSTR("CMD: " D_GROUP " %d, " D_INDEX " %d, " D_COMMAND " \"%s\", " D_DATA " \"%s\""), grpflg, index, type, dataBuf);
|
||||
bool binary_data = (index > 199); // Suppose binary data on topic index > 199
|
||||
if (!binary_data) {
|
||||
while (*dataBuf && isspace(*dataBuf)) {
|
||||
dataBuf++; // Skip leading spaces in data
|
||||
data_len--;
|
||||
}
|
||||
}
|
||||
|
||||
AddLog_P(LOG_LEVEL_DEBUG, PSTR("CMD: " D_GROUP " %d, " D_INDEX " %d, " D_COMMAND " \"%s\", " D_DATA " \"%s\""),
|
||||
grpflg, index, type, (binary_data) ? PSTR("Binary") : dataBuf);
|
||||
|
||||
if (type != nullptr) {
|
||||
Response_P(PSTR("{\"" D_JSON_COMMAND "\":\"" D_JSON_ERROR "\"}"));
|
||||
|
||||
if (Settings.ledstate &0x02) { TasmotaGlobal.blinks++; }
|
||||
|
||||
if (!strcmp(dataBuf,"?")) { data_len = 0; }
|
||||
int32_t payload = -99;
|
||||
if (!binary_data) {
|
||||
if (!strcmp(dataBuf,"?")) { data_len = 0; }
|
||||
|
||||
char *p;
|
||||
int32_t payload = strtol(dataBuf, &p, 0); // decimal, octal (0) or hex (0x)
|
||||
if (p == dataBuf) { payload = -99; }
|
||||
int temp_payload = GetStateNumber(dataBuf);
|
||||
if (temp_payload > -1) { payload = temp_payload; }
|
||||
char *p;
|
||||
payload = strtol(dataBuf, &p, 0); // decimal, octal (0) or hex (0x)
|
||||
if (p == dataBuf) { payload = -99; }
|
||||
int temp_payload = GetStateNumber(dataBuf);
|
||||
if (temp_payload > -1) { payload = temp_payload; }
|
||||
}
|
||||
|
||||
DEBUG_CORE_LOG(PSTR("CMD: Payload %d"), payload);
|
||||
|
||||
|
|
|
@ -0,0 +1,290 @@
|
|||
/*
|
||||
xdrv_02_mqtt_1_file.ino - mqtt file support for Tasmota
|
||||
|
||||
Copyright (C) 2021 Theo Arends
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define USE_MQTT_FILE
|
||||
|
||||
#ifdef USE_MQTT_FILE
|
||||
/*********************************************************************************************\
|
||||
* MQTT file transfer
|
||||
*
|
||||
* Supports base64 encoded binary data transfer
|
||||
\*********************************************************************************************/
|
||||
|
||||
#include <base64.hpp>
|
||||
|
||||
struct FMQTT {
|
||||
uint32_t file_pos = 0; // MQTT file position during upload/download
|
||||
uint32_t file_size = 0; // MQTT total file size
|
||||
uint32_t file_type = 0; // MQTT File type (See UploadTypes)
|
||||
uint8_t* file_buffer = nullptr; // MQTT file buffer
|
||||
MD5Builder md5; // MQTT md5
|
||||
String file_md5; // MQTT received file md5 (32 chars)
|
||||
uint16_t topic_size; // MQTT topic length with terminating <null>
|
||||
uint8_t file_id = 0; // MQTT unique file id during upload/download
|
||||
} FMqtt;
|
||||
|
||||
void MqttTopicSize(uint32_t topic_size) {
|
||||
FMqtt.topic_size = topic_size +1;
|
||||
}
|
||||
|
||||
/*
|
||||
The download chunk size is the data size before it is encoded to base64.
|
||||
It is smaller than the upload chunksize as it is bound by MESSZ
|
||||
The download buffer with length MESSZ (1042) contains
|
||||
- Payload ({"Id":117,"Data":"<base64 encoded mqtt_file_chuck_size>"}<null>)
|
||||
*/
|
||||
const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""}<null>
|
||||
const uint32_t mqtt_file_chuck_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2;
|
||||
|
||||
uint32_t FileUploadChunckSize(void) {
|
||||
/*
|
||||
The upload chunk size is the data size before it is encoded to base64.
|
||||
It can be larger than the download chunksize which is bound by MESSZ
|
||||
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains
|
||||
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
|
||||
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
|
||||
- Payload ({"Id":116,"Data":"<base64 encoded FileUploadChunckSize>"}<null>)
|
||||
*/
|
||||
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
|
||||
|
||||
return (((MQTT_MAX_PACKET_SIZE - PubSubClientHeaderSize - FMqtt.topic_size - FileTransferHeaderSize) / 4) * 3) -2;
|
||||
}
|
||||
|
||||
void CmndFileUpload(void) {
|
||||
/*
|
||||
Upload (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data
|
||||
FileUpload 0 - Abort current upload
|
||||
FileUpload {"File":"Config_wemos10_9.4.0.3.dmp","Id":116,"Type":2,"Size":4096}
|
||||
FileUpload {"Id":116,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
|
||||
FileUpload {"Id":116,"Data":" ... "}
|
||||
FileUpload {"Id":116,"Md5":"496fcbb433bbca89833063174d2c5747"}
|
||||
*/
|
||||
const char* base64_data = nullptr;
|
||||
uint32_t rcv_id = 0;
|
||||
|
||||
char* dataBuf = (char*)XdrvMailbox.data;
|
||||
if (strlen(dataBuf) > 8) { // Workaround exception if empty JSON like {} - Needs checks
|
||||
JsonParser parser((char*) dataBuf);
|
||||
JsonParserObject root = parser.getRootObject();
|
||||
if (root) {
|
||||
JsonParserToken val = root[PSTR("ID")];
|
||||
if (val) { rcv_id = val.getUInt(); }
|
||||
val = root[PSTR("TYPE")];
|
||||
if (val) { FMqtt.file_type = val.getUInt(); }
|
||||
val = root[PSTR("SIZE")];
|
||||
if (val) { FMqtt.file_size = val.getUInt(); }
|
||||
val = root[PSTR("MD5")];
|
||||
if (val) { FMqtt.file_md5 = val.getStr(); }
|
||||
val = root[PSTR("DATA")];
|
||||
if (val) { base64_data = val.getStr(); }
|
||||
}
|
||||
}
|
||||
|
||||
if ((0 == FMqtt.file_id) && (rcv_id > 0) && (FMqtt.file_size > 0) && (FMqtt.file_type > 0)) {
|
||||
// Init upload buffer
|
||||
FMqtt.file_buffer = nullptr;
|
||||
|
||||
if (UPL_TASMOTA == FMqtt.file_type) {
|
||||
if (Update.begin(FMqtt.file_size)) {
|
||||
FMqtt.file_buffer = &FMqtt.file_id; // Dummy buffer
|
||||
}
|
||||
}
|
||||
else if (UPL_SETTINGS == FMqtt.file_type) {
|
||||
if (SettingsConfigBackup()) {
|
||||
FMqtt.file_buffer = settings_buffer;
|
||||
}
|
||||
}
|
||||
|
||||
if (!FMqtt.file_buffer) {
|
||||
ResponseCmndChar(PSTR(D_JSON_INVALID_FILE_TYPE));
|
||||
} else {
|
||||
FMqtt.file_id = rcv_id;
|
||||
FMqtt.file_pos = 0;
|
||||
|
||||
FMqtt.md5 = MD5Builder();
|
||||
FMqtt.md5.begin();
|
||||
|
||||
ResponseCmndChar(PSTR(D_JSON_STARTED));
|
||||
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
|
||||
}
|
||||
}
|
||||
else if ((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) {
|
||||
// Error receiving data
|
||||
|
||||
if (UPL_TASMOTA == FMqtt.file_type) {
|
||||
Update.end(true);
|
||||
}
|
||||
else if (UPL_SETTINGS == FMqtt.file_type) {
|
||||
SettingsBufferFree();
|
||||
}
|
||||
|
||||
FMqtt.file_buffer = nullptr;
|
||||
ResponseCmndChar(PSTR(D_JSON_ABORTED));
|
||||
}
|
||||
|
||||
if (FMqtt.file_buffer) {
|
||||
if ((FMqtt.file_pos < FMqtt.file_size) && base64_data) {
|
||||
// Save upload into buffer - Handle possible buffer overflows
|
||||
uint32_t rcvd_bytes = decode_base64_length((unsigned char*)base64_data);
|
||||
unsigned char decode_output[rcvd_bytes];
|
||||
decode_base64((unsigned char*)base64_data, (unsigned char*)decode_output);
|
||||
|
||||
uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos;
|
||||
uint32_t read_bytes = (bytes_left < rcvd_bytes) ? bytes_left : rcvd_bytes;
|
||||
FMqtt.md5.add(decode_output, read_bytes);
|
||||
|
||||
if (UPL_TASMOTA == FMqtt.file_type) {
|
||||
Update.write(decode_output, read_bytes);
|
||||
} else {
|
||||
uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos;
|
||||
memcpy(buffer, decode_output, read_bytes);
|
||||
}
|
||||
|
||||
FMqtt.file_pos += read_bytes;
|
||||
|
||||
if ((FMqtt.file_pos > rcvd_bytes) && ((FMqtt.file_pos % 102400) <= rcvd_bytes)) {
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
|
||||
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_UPLOAD "Progress %d kB"), FMqtt.file_pos / 1024);
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
|
||||
}
|
||||
}
|
||||
|
||||
if ((FMqtt.file_pos < FMqtt.file_size) || (FMqtt.file_md5.length() != 32)) {
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
|
||||
|
||||
// {"Id":116,"MaxSize":"765"}
|
||||
Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), FMqtt.file_id, FileUploadChunckSize());
|
||||
} else {
|
||||
FMqtt.md5.calculate();
|
||||
if (strcasecmp(FMqtt.file_md5.c_str(), FMqtt.md5.toString().c_str())) {
|
||||
ResponseCmndChar(PSTR(D_JSON_MD5_MISMATCH));
|
||||
} else {
|
||||
// Process upload data en free buffer
|
||||
ResponseCmndDone();
|
||||
|
||||
if (UPL_TASMOTA == FMqtt.file_type) {
|
||||
if (!Update.end(true)) {
|
||||
ResponseCmndFailed();
|
||||
} else {
|
||||
TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update
|
||||
}
|
||||
}
|
||||
else if (UPL_SETTINGS == FMqtt.file_type) {
|
||||
if (!SettingsConfigRestore()) {
|
||||
ResponseCmndFailed();
|
||||
} else {
|
||||
TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
FMqtt.file_buffer = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (!FMqtt.file_buffer) {
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
|
||||
FMqtt.file_id = 0;
|
||||
FMqtt.file_size = 0;
|
||||
FMqtt.file_type = 0;
|
||||
FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory
|
||||
}
|
||||
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
|
||||
ResponseClear();
|
||||
}
|
||||
|
||||
void CmndFileDownload(void) {
|
||||
/*
|
||||
Download (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data
|
||||
Currently supports Settings (file type 2)
|
||||
Filedownload 0 - Abort current download
|
||||
FileDownload 2 - Start download of settings file
|
||||
FileDownload - Continue downloading data until reception of MD5 hash
|
||||
*/
|
||||
|
||||
if (FMqtt.file_id && FMqtt.file_buffer) {
|
||||
bool finished = false;
|
||||
|
||||
if (0 == XdrvMailbox.payload) { // Abort file download
|
||||
ResponseCmndChar(PSTR(D_JSON_ABORTED));
|
||||
finished = true;
|
||||
}
|
||||
else if (FMqtt.file_pos < FMqtt.file_size) {
|
||||
uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos;
|
||||
uint32_t write_bytes = (bytes_left < mqtt_file_chuck_size) ? bytes_left : mqtt_file_chuck_size;
|
||||
|
||||
uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos;
|
||||
FMqtt.md5.add(buffer, write_bytes);
|
||||
|
||||
// {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
|
||||
Response_P(PSTR("{\"Id\":%d,\"Data\":\""), FMqtt.file_id); // FileTransferHeaderSize
|
||||
char base64_data[encode_base64_length(write_bytes)];
|
||||
encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data);
|
||||
ResponseAppend_P(base64_data);
|
||||
ResponseAppend_P("\"}");
|
||||
|
||||
FMqtt.file_pos += write_bytes;
|
||||
} else {
|
||||
FMqtt.md5.calculate();
|
||||
|
||||
// {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"}
|
||||
Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), FMqtt.file_id, FMqtt.md5.toString().c_str());
|
||||
finished = true;
|
||||
}
|
||||
|
||||
if (finished) {
|
||||
if (UPL_SETTINGS == FMqtt.file_type) {
|
||||
SettingsBufferFree();
|
||||
}
|
||||
|
||||
FMqtt.file_id = 0;
|
||||
}
|
||||
}
|
||||
else if (XdrvMailbox.data_len) {
|
||||
FMqtt.file_buffer = nullptr;
|
||||
FMqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255
|
||||
|
||||
if (UPL_SETTINGS == XdrvMailbox.payload) {
|
||||
uint32_t len = SettingsConfigBackup();
|
||||
if (len) {
|
||||
FMqtt.file_type = UPL_SETTINGS;
|
||||
FMqtt.file_buffer = settings_buffer;
|
||||
FMqtt.file_size = len;
|
||||
|
||||
// {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096}
|
||||
Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"),
|
||||
SettingsConfigFilename().c_str(), FMqtt.file_id, FMqtt.file_type, len);
|
||||
}
|
||||
}
|
||||
|
||||
if (FMqtt.file_buffer) {
|
||||
FMqtt.file_pos = 0;
|
||||
|
||||
FMqtt.md5 = MD5Builder();
|
||||
FMqtt.md5.begin();
|
||||
} else {
|
||||
FMqtt.file_id = 0;
|
||||
ResponseCmndFailed();
|
||||
}
|
||||
}
|
||||
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
|
||||
ResponseClear();
|
||||
}
|
||||
|
||||
#endif // USE_MQTT_FILE
|
|
@ -23,8 +23,6 @@
|
|||
#define MQTT_WIFI_CLIENT_TIMEOUT 200 // Wifi TCP connection timeout (default is 5000 mSec)
|
||||
#endif
|
||||
|
||||
#include <base64.hpp>
|
||||
|
||||
#define USE_MQTT_NEW_PUBSUBCLIENT
|
||||
|
||||
// #define DEBUG_DUMP_TLS // allow dumping of TLS Flash keys
|
||||
|
@ -45,6 +43,7 @@ WiFiClient EspClient; // Wifi Client - non-TLS
|
|||
HTTPClient httpsClient;
|
||||
int httpsClientReturn;
|
||||
#endif // USE_MQTT_AZURE_DPS_SCOPEID
|
||||
#include <base64.hpp>
|
||||
#include <t_bearssl.h>
|
||||
#include <JsonParser.h>
|
||||
#endif // USE_MQTT_AZURE_IOT
|
||||
|
@ -64,10 +63,13 @@ const char kMqttCommands[] PROGMEM = "|" // No prefix
|
|||
#if defined(USE_MQTT_TLS) && defined(USE_MQTT_AWS_IOT)
|
||||
D_CMND_TLSKEY "|"
|
||||
#endif
|
||||
#ifdef USE_MQTT_FILE
|
||||
D_CMND_FILEUPLOAD "|" D_CMND_FILEDOWNLOAD "|"
|
||||
#endif // USE_MQTT_FILE
|
||||
D_CMND_MQTTHOST "|" D_CMND_MQTTPORT "|" D_CMND_MQTTRETRY "|" D_CMND_STATETEXT "|" D_CMND_MQTTCLIENT "|"
|
||||
D_CMND_FULLTOPIC "|" D_CMND_PREFIX "|" D_CMND_GROUPTOPIC "|" D_CMND_TOPIC "|" D_CMND_PUBLISH "|" D_CMND_MQTTLOG "|"
|
||||
D_CMND_BUTTONTOPIC "|" D_CMND_SWITCHTOPIC "|" D_CMND_BUTTONRETAIN "|" D_CMND_SWITCHRETAIN "|" D_CMND_POWERRETAIN "|"
|
||||
D_CMND_SENSORRETAIN "|" D_CMND_INFORETAIN "|" D_CMND_STATERETAIN "|" D_CMND_FILEUPLOAD "|" D_CMND_FILEDOWNLOAD ;
|
||||
D_CMND_SENSORRETAIN "|" D_CMND_INFORETAIN "|" D_CMND_STATERETAIN ;
|
||||
|
||||
SO_SYNONYMS(kMqttSynonyms,
|
||||
90,
|
||||
|
@ -90,24 +92,19 @@ void (* const MqttCommand[])(void) PROGMEM = {
|
|||
#if defined(USE_MQTT_TLS) && defined(USE_MQTT_AWS_IOT)
|
||||
&CmndTlsKey,
|
||||
#endif
|
||||
#ifdef USE_MQTT_FILE
|
||||
&CmndFileUpload, &CmndFileDownload,
|
||||
#endif // USE_MQTT_FILE
|
||||
&CmndMqttHost, &CmndMqttPort, &CmndMqttRetry, &CmndStateText, &CmndMqttClient,
|
||||
&CmndFullTopic, &CmndPrefix, &CmndGroupTopic, &CmndTopic, &CmndPublish, &CmndMqttlog,
|
||||
&CmndButtonTopic, &CmndSwitchTopic, &CmndButtonRetain, &CmndSwitchRetain, &CmndPowerRetain, &CmndSensorRetain,
|
||||
&CmndInfoRetain, &CmndStateRetain, &CmndFileUpload, &CmndFileDownload };
|
||||
&CmndInfoRetain, &CmndStateRetain };
|
||||
|
||||
struct MQTT {
|
||||
uint32_t file_pos = 0; // MQTT file position during upload/download
|
||||
uint32_t file_size = 0; // MQTT total file size
|
||||
uint32_t file_type = 0; // MQTT File type (See UploadTypes)
|
||||
uint8_t* file_buffer = nullptr; // MQTT file buffer
|
||||
MD5Builder md5; // MQTT md5
|
||||
String file_md5; // MQTT received file md5 (32 chars)
|
||||
uint16_t connect_count = 0; // MQTT re-connect count
|
||||
uint16_t retry_counter = 1; // MQTT connection retry counter
|
||||
uint16_t retry_counter_delay = 1; // MQTT retry counter multiplier
|
||||
uint16_t topic_size; // MQTT topic length with terminating <null>
|
||||
uint8_t initial_connection_state = 2; // MQTT connection messages state
|
||||
uint8_t file_id = 0; // MQTT unique file id during upload/download
|
||||
bool connected = false; // MQTT virtual connection status
|
||||
bool allowed = false; // MQTT enabled and parameters valid
|
||||
bool mqtt_tls = false; // MQTT TLS is enabled
|
||||
|
@ -117,7 +114,7 @@ struct MQTT {
|
|||
|
||||
// This part of code is necessary to store Private Key and Cert in Flash
|
||||
#ifdef USE_MQTT_AWS_IOT
|
||||
//#include <base64.hpp>
|
||||
#include <base64.hpp>
|
||||
|
||||
const br_ec_private_key *AWS_IoT_Private_Key = nullptr;
|
||||
const br_x509_certificate *AWS_IoT_Client_Certificate = nullptr;
|
||||
|
@ -547,7 +544,9 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len
|
|||
#else
|
||||
strlcpy(topic, mqtt_topic, sizeof(topic));
|
||||
#endif // USE_MQTT_AZURE_IOT
|
||||
Mqtt.topic_size = strlen(topic) + 1;
|
||||
#ifdef USE_MQTT_FILE
|
||||
MqttTopicSize(strlen(topic));
|
||||
#endif // USE_MQTT_FILE
|
||||
mqtt_data[data_len] = 0;
|
||||
char data[data_len +1];
|
||||
memcpy(data, mqtt_data, sizeof(data));
|
||||
|
@ -1416,250 +1415,6 @@ void CmndStateRetain(void) {
|
|||
ResponseCmndStateText(Settings.flag5.mqtt_state_retain); // CMND_STATERETAIN
|
||||
}
|
||||
|
||||
/*
|
||||
The download chunk size is the data size before it is encoded to base64.
|
||||
It is smaller than the upload chunksize as it is bound by MESSZ
|
||||
The download buffer with length MESSZ (1042) contains
|
||||
- Payload ({"Id":117,"Data":"<base64 encoded mqtt_file_chuck_size>"}<null>)
|
||||
*/
|
||||
const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""}<null>
|
||||
const uint32_t mqtt_file_chuck_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2;
|
||||
|
||||
uint32_t FileUploadChunckSize(void) {
|
||||
/*
|
||||
The upload chunk size is the data size before it is encoded to base64.
|
||||
It can be larger than the download chunksize which is bound by MESSZ
|
||||
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains
|
||||
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
|
||||
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
|
||||
- Payload ({"Id":116,"Data":"<base64 encoded FileUploadChunckSize>"}<null>)
|
||||
*/
|
||||
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
|
||||
|
||||
return (((MQTT_MAX_PACKET_SIZE - PubSubClientHeaderSize - Mqtt.topic_size - FileTransferHeaderSize) / 4) * 3) -2;
|
||||
}
|
||||
|
||||
void CmndFileUpload(void) {
|
||||
/*
|
||||
Upload (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data
|
||||
FileUpload 0 - Abort current upload
|
||||
FileUpload {"File":"Config_wemos10_9.4.0.3.dmp","Id":116,"Type":2,"Size":4096}
|
||||
FileUpload {"Id":116,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
|
||||
FileUpload {"Id":116,"Data":" ... "}
|
||||
FileUpload {"Id":116,"Md5":"496fcbb433bbca89833063174d2c5747"}
|
||||
*/
|
||||
const char* base64_data = nullptr;
|
||||
uint32_t rcv_id = 0;
|
||||
|
||||
char* dataBuf = (char*)XdrvMailbox.data;
|
||||
if (strlen(dataBuf) > 8) { // Workaround exception if empty JSON like {} - Needs checks
|
||||
JsonParser parser((char*) dataBuf);
|
||||
JsonParserObject root = parser.getRootObject();
|
||||
if (root) {
|
||||
JsonParserToken val = root[PSTR("ID")];
|
||||
if (val) { rcv_id = val.getUInt(); }
|
||||
val = root[PSTR("TYPE")];
|
||||
if (val) { Mqtt.file_type = val.getUInt(); }
|
||||
val = root[PSTR("SIZE")];
|
||||
if (val) { Mqtt.file_size = val.getUInt(); }
|
||||
val = root[PSTR("MD5")];
|
||||
if (val) { Mqtt.file_md5 = val.getStr(); }
|
||||
val = root[PSTR("DATA")];
|
||||
if (val) { base64_data = val.getStr(); }
|
||||
}
|
||||
}
|
||||
|
||||
if ((0 == Mqtt.file_id) && (rcv_id > 0) && (Mqtt.file_size > 0) && (Mqtt.file_type > 0)) {
|
||||
// Init upload buffer
|
||||
Mqtt.file_buffer = nullptr;
|
||||
|
||||
if (UPL_TASMOTA == Mqtt.file_type) {
|
||||
if (Update.begin(Mqtt.file_size)) {
|
||||
Mqtt.file_buffer = &Mqtt.file_id; // Dummy buffer
|
||||
}
|
||||
}
|
||||
else if (UPL_SETTINGS == Mqtt.file_type) {
|
||||
if (SettingsConfigBackup()) {
|
||||
Mqtt.file_buffer = settings_buffer;
|
||||
}
|
||||
}
|
||||
|
||||
if (!Mqtt.file_buffer) {
|
||||
ResponseCmndChar(PSTR(D_JSON_INVALID_FILE_TYPE));
|
||||
} else {
|
||||
Mqtt.file_id = rcv_id;
|
||||
Mqtt.file_pos = 0;
|
||||
|
||||
Mqtt.md5 = MD5Builder();
|
||||
Mqtt.md5.begin();
|
||||
|
||||
ResponseCmndChar(PSTR(D_JSON_STARTED));
|
||||
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
|
||||
}
|
||||
}
|
||||
else if ((Mqtt.file_id > 0) && (Mqtt.file_id != rcv_id)) {
|
||||
// Error receiving data
|
||||
|
||||
if (UPL_TASMOTA == Mqtt.file_type) {
|
||||
Update.end(true);
|
||||
}
|
||||
else if (UPL_SETTINGS == Mqtt.file_type) {
|
||||
SettingsBufferFree();
|
||||
}
|
||||
|
||||
Mqtt.file_buffer = nullptr;
|
||||
ResponseCmndChar(PSTR(D_JSON_ABORTED));
|
||||
}
|
||||
|
||||
if (Mqtt.file_buffer) {
|
||||
if ((Mqtt.file_pos < Mqtt.file_size) && base64_data) {
|
||||
// Save upload into buffer - Handle possible buffer overflows
|
||||
uint32_t rcvd_bytes = decode_base64_length((unsigned char*)base64_data);
|
||||
unsigned char decode_output[rcvd_bytes];
|
||||
decode_base64((unsigned char*)base64_data, (unsigned char*)decode_output);
|
||||
|
||||
uint32_t bytes_left = Mqtt.file_size - Mqtt.file_pos;
|
||||
uint32_t read_bytes = (bytes_left < rcvd_bytes) ? bytes_left : rcvd_bytes;
|
||||
Mqtt.md5.add(decode_output, read_bytes);
|
||||
|
||||
if (UPL_TASMOTA == Mqtt.file_type) {
|
||||
Update.write(decode_output, read_bytes);
|
||||
} else {
|
||||
uint8_t* buffer = Mqtt.file_buffer + Mqtt.file_pos;
|
||||
memcpy(buffer, decode_output, read_bytes);
|
||||
}
|
||||
|
||||
Mqtt.file_pos += read_bytes;
|
||||
|
||||
if ((Mqtt.file_pos > rcvd_bytes) && ((Mqtt.file_pos % 102400) <= rcvd_bytes)) {
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
|
||||
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_UPLOAD "Progress %d kB"), Mqtt.file_pos / 1024);
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
|
||||
}
|
||||
}
|
||||
|
||||
if ((Mqtt.file_pos < Mqtt.file_size) || (Mqtt.file_md5.length() != 32)) {
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
|
||||
|
||||
// {"Id":116,"MaxSize":"765"}
|
||||
Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), Mqtt.file_id, FileUploadChunckSize());
|
||||
} else {
|
||||
Mqtt.md5.calculate();
|
||||
if (strcasecmp(Mqtt.file_md5.c_str(), Mqtt.md5.toString().c_str())) {
|
||||
ResponseCmndChar(PSTR(D_JSON_MD5_MISMATCH));
|
||||
} else {
|
||||
// Process upload data en free buffer
|
||||
ResponseCmndDone();
|
||||
|
||||
if (UPL_TASMOTA == Mqtt.file_type) {
|
||||
if (!Update.end(true)) {
|
||||
ResponseCmndFailed();
|
||||
} else {
|
||||
TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update
|
||||
}
|
||||
}
|
||||
else if (UPL_SETTINGS == Mqtt.file_type) {
|
||||
if (!SettingsConfigRestore()) {
|
||||
ResponseCmndFailed();
|
||||
} else {
|
||||
TasmotaGlobal.restart_flag = 2; // Always restart to re-enable disabled features during update
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Mqtt.file_buffer = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (!Mqtt.file_buffer) {
|
||||
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
|
||||
Mqtt.file_id = 0;
|
||||
Mqtt.file_size = 0;
|
||||
Mqtt.file_type = 0;
|
||||
Mqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory
|
||||
}
|
||||
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
|
||||
ResponseClear();
|
||||
}
|
||||
|
||||
void CmndFileDownload(void) {
|
||||
/*
|
||||
Download (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data
|
||||
Currently supports Settings (file type 2)
|
||||
Filedownload 0 - Abort current download
|
||||
FileDownload 2 - Start download of settings file
|
||||
FileDownload - Continue downloading data until reception of MD5 hash
|
||||
*/
|
||||
|
||||
if (Mqtt.file_id && Mqtt.file_buffer) {
|
||||
bool finished = false;
|
||||
|
||||
if (0 == XdrvMailbox.payload) { // Abort file download
|
||||
ResponseCmndChar(PSTR(D_JSON_ABORTED));
|
||||
finished = true;
|
||||
}
|
||||
else if (Mqtt.file_pos < Mqtt.file_size) {
|
||||
uint32_t bytes_left = Mqtt.file_size - Mqtt.file_pos;
|
||||
uint32_t write_bytes = (bytes_left < mqtt_file_chuck_size) ? bytes_left : mqtt_file_chuck_size;
|
||||
|
||||
uint8_t* buffer = Mqtt.file_buffer + Mqtt.file_pos;
|
||||
Mqtt.md5.add(buffer, write_bytes);
|
||||
|
||||
// {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
|
||||
Response_P(PSTR("{\"Id\":%d,\"Data\":\""), Mqtt.file_id); // FileTransferHeaderSize
|
||||
char base64_data[encode_base64_length(write_bytes)];
|
||||
encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data);
|
||||
ResponseAppend_P(base64_data);
|
||||
ResponseAppend_P("\"}");
|
||||
|
||||
Mqtt.file_pos += write_bytes;
|
||||
} else {
|
||||
Mqtt.md5.calculate();
|
||||
|
||||
// {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"}
|
||||
Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), Mqtt.file_id, Mqtt.md5.toString().c_str());
|
||||
finished = true;
|
||||
}
|
||||
|
||||
if (finished) {
|
||||
if (UPL_SETTINGS == Mqtt.file_type) {
|
||||
SettingsBufferFree();
|
||||
}
|
||||
|
||||
Mqtt.file_id = 0;
|
||||
}
|
||||
}
|
||||
else if (XdrvMailbox.data_len) {
|
||||
Mqtt.file_buffer = nullptr;
|
||||
Mqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255
|
||||
|
||||
if (UPL_SETTINGS == XdrvMailbox.payload) {
|
||||
uint32_t len = SettingsConfigBackup();
|
||||
if (len) {
|
||||
Mqtt.file_type = UPL_SETTINGS;
|
||||
Mqtt.file_buffer = settings_buffer;
|
||||
Mqtt.file_size = len;
|
||||
|
||||
// {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096}
|
||||
Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"),
|
||||
SettingsConfigFilename().c_str(), Mqtt.file_id, Mqtt.file_type, len);
|
||||
}
|
||||
}
|
||||
|
||||
if (Mqtt.file_buffer) {
|
||||
Mqtt.file_pos = 0;
|
||||
|
||||
Mqtt.md5 = MD5Builder();
|
||||
Mqtt.md5.begin();
|
||||
} else {
|
||||
Mqtt.file_id = 0;
|
||||
ResponseCmndFailed();
|
||||
}
|
||||
}
|
||||
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
|
||||
ResponseClear();
|
||||
}
|
||||
|
||||
/*********************************************************************************************\
|
||||
* TLS private key and certificate - store into Flash
|
||||
\*********************************************************************************************/
|
||||
|
@ -1960,9 +1715,6 @@ bool Xdrv02(uint8_t function)
|
|||
|
||||
if (Settings.flag.mqtt_enabled) { // SetOption3 - Enable MQTT
|
||||
switch (function) {
|
||||
case FUNC_PRE_INIT:
|
||||
MqttInit();
|
||||
break;
|
||||
case FUNC_EVERY_50_MSECOND: // https://github.com/knolleary/pubsubclient/issues/556
|
||||
MqttClient.loop();
|
||||
break;
|
||||
|
@ -1977,6 +1729,9 @@ bool Xdrv02(uint8_t function)
|
|||
case FUNC_COMMAND:
|
||||
result = DecodeCommand(kMqttCommands, MqttCommand, kMqttSynonyms);
|
||||
break;
|
||||
case FUNC_PRE_INIT:
|
||||
MqttInit();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result;
|
Loading…
Reference in New Issue