/* xdrv_02_1_mqtt_file.ino - mqtt file support for Tasmota Copyright (C) 2021 Theo Arends This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #define USE_MQTT_FILE #ifdef USE_MQTT_FILE /*********************************************************************************************\ * MQTT file transfer * * Supports both binary and base64 encoded binary data transfer * * See tools/mqtt-file for python ota-upload and settings-upload and download examples \*********************************************************************************************/ #include #include extern PubSubClient MqttClient; struct FMQTT { uint32_t file_pos = 0; // MQTT file position during upload/download uint32_t file_size = 0; // MQTT total file size uint32_t file_type = 0; // MQTT File type (See UploadTypes) uint32_t chunk_size; uint8_t* file_buffer = nullptr; // MQTT file buffer const char* file_password = nullptr; // MQTT password MD5Builder md5; // MQTT md5 String file_md5; // MQTT received file md5 (32 chars) uint16_t topic_size; // MQTT topic length with terminating uint8_t file_id = 0; // MQTT unique file id during upload/download bool file_binary = false; // MQTT binary file transfer } FMqtt; const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""} void MqttFileValidate(uint32_t error) { if (error) { FMqtt.file_buffer = nullptr; MqttDisableLogging(false); if (4 == error) { ResponseCmndChar(PSTR(D_JSON_ABORTED)); } else { char error_txt[20]; snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error); ResponseCmndChar(error_txt); } } } void MqttFilePublish(void) { if (!FMqtt.file_buffer) { MqttDisableLogging(false); FMqtt.file_id = 0; FMqtt.file_size = 0; FMqtt.file_type = 0; FMqtt.file_binary = false; FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory FMqtt.file_password = nullptr; } MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); ResponseClear(); } /*********************************************************************************************\ * MQTT Upload to device \*********************************************************************************************/ uint32_t MqttFileUploadValidate(uint32_t rcv_id) { if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported if ((0 == FMqtt.file_id) && (rcv_id > 0) && (FMqtt.file_size > 0) && (FMqtt.file_type > 0)) { FMqtt.file_buffer = nullptr; // Init upload buffer if (!FMqtt.file_password || (strcmp(FMqtt.file_password, SettingsText(SET_MQTT_PWD)) != 0)) { return 1; // Invalid password } // Check buffer size if (UPL_SETTINGS == FMqtt.file_type) { if (FMqtt.file_size > sizeof(TSettings)) { return 2; // Settings supports max 4k size } } else { // Check enough flash space for intermediate upload uint32_t head_room = (FlashWriteMaxSector() - FlashWriteStartSector()) * SPI_FLASH_SEC_SIZE; uint32_t rounded_size = (FMqtt.file_size + SPI_FLASH_SEC_SIZE -1) & (~(SPI_FLASH_SEC_SIZE - 1)); if (rounded_size > head_room) { return 2; // Not enough space } } // Init file_buffer if (UPL_SETTINGS == FMqtt.file_type) { if (SettingsConfigBackup()) { FMqtt.file_buffer = settings_buffer; } } else { if (UPL_TASMOTA == FMqtt.file_type) { if (Update.begin(FMqtt.file_size)) { FMqtt.file_buffer = &FMqtt.file_id; // Dummy buffer // FMqtt.file_buffer = (uint8_t*)malloc(SPI_FLASH_SEC_SIZE); // if (FMqtt.file_buffer) { SetLedLink(1); SettingsSave(1); // Free flash for OTA update // } } else { Update.end(true); } } else { return 3; // Invalid file type } } if (!FMqtt.file_buffer) { return 6; } // No buffer FMqtt.file_id = rcv_id; FMqtt.file_pos = 0; FMqtt.md5 = MD5Builder(); FMqtt.md5.begin(); ResponseCmndChar(PSTR(D_JSON_STARTED)); MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD /* The upload chunk size is the data size of the payload. The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE contains - Header of 5 bytes (MQTT_MAX_HEADER_SIZE) - Topic string terminated with a zero (stat/demo/FILEUPLOAD) - Payload ({"Id":116,"Data":""}) or () */ const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE FMqtt.chunk_size = MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1; #ifdef USE_TASMESH if (MESHroleNode()) { // TasMesh default payload size (topic+payload) is 160 if (MESHmaxPayloadSize() < FMqtt.chunk_size) { FMqtt.chunk_size = MESHmaxPayloadSize(); } } #endif // USE_TASMESH } else if (((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) || (0 == XdrvMailbox.payload)) { // Error receiving data if (UPL_SETTINGS == FMqtt.file_type) { SettingsBufferFree(); } else { if (UPL_TASMOTA == FMqtt.file_type) { Update.end(true); SetLedLink(0); } /* if (FMqtt.file_buffer != nullptr) { free(FMqtt.file_buffer); FMqtt.file_buffer = nullptr; } */ } return 4; // Upload aborted } return 0; // No error } void CmndFileUpload(void) { /* Upload bytes chunks of data either base64 encoded or binary with MD5 hash Supported Type: 1 - OTA firmware 2 - Settings FileUpload 0 - Abort current upload Start an upload session: FileUpload {"Password":"","File":"Config_wemos10_9.4.0.3.dmp","Id":116,"Type":2,"Size":4096} Upload data using base64: FileUpload {"Id":116,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} FileUpload {"Id":116,"Data":" ... "} Or binary: FileUpload201 Finish upload session: FileUpload {"Id":116,"Md5":"496fcbb433bbca89833063174d2c5747"} */ const char* base64_data = nullptr; uint32_t rcv_id = 0; bool binary_data = (XdrvMailbox.index > 199); // Check for raw data if (!binary_data) { if (strlen(XdrvMailbox.data) > 8) { // Workaround exception if empty JSON like {} - Needs checks JsonParser parser((char*) XdrvMailbox.data); JsonParserObject root = parser.getRootObject(); if (root) { JsonParserToken val = root[PSTR("ID")]; if (val) { rcv_id = val.getUInt(); } val = root[PSTR("TYPE")]; if (val) { FMqtt.file_type = val.getUInt(); } val = root[PSTR("SIZE")]; if (val) { FMqtt.file_size = val.getUInt(); } val = root[PSTR("MD5")]; if (val) { FMqtt.file_md5 = val.getStr(); } val = root[PSTR("DATA")]; if (val) { base64_data = val.getStr(); } val = root[PSTR("PASSWORD")]; if (val) { FMqtt.file_password = val.getStr(); } } } } else { rcv_id = FMqtt.file_id; } MqttFileValidate(MqttFileUploadValidate(rcv_id)); if (FMqtt.file_buffer) { if ((FMqtt.file_pos < FMqtt.file_size) && (binary_data || base64_data)) { // Save upload into buffer - Handle possible buffer overflows unsigned char* raw_data = (unsigned char*)XdrvMailbox.data; uint32_t rcvd_bytes = XdrvMailbox.data_len; if (!binary_data) { raw_data = (unsigned char*)malloc(XdrvMailbox.data_len); if (raw_data) { rcvd_bytes = decode_base64((unsigned char*)base64_data, (unsigned char*)raw_data); } } if (raw_data) { uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos; uint32_t read_bytes = (bytes_left < rcvd_bytes) ? bytes_left : rcvd_bytes; FMqtt.md5.add(raw_data, read_bytes); if (UPL_SETTINGS == FMqtt.file_type) { uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos; memcpy(buffer, raw_data, read_bytes); } else { if (UPL_TASMOTA == FMqtt.file_type) { Update.write(raw_data, read_bytes); } } if (!binary_data) { free(raw_data); } FMqtt.file_pos += read_bytes; } if ((FMqtt.file_pos > rcvd_bytes) && ((FMqtt.file_pos % 102400) <= rcvd_bytes)) { TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_UPLOAD "Progress %d kB"), (FMqtt.file_pos / 10240) * 10); TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging } } if ((FMqtt.file_pos < FMqtt.file_size) || (FMqtt.file_md5.length() != 32)) { MqttDisableLogging(true); if (!binary_data && !FMqtt.file_pos) { FMqtt.chunk_size = (((FMqtt.chunk_size - FileTransferHeaderSize) / 4) * 3) -2; // Calculate base64 chunk size } // {"Id":116,"MaxSize":"765"} Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), FMqtt.file_id, FMqtt.chunk_size); } else { FMqtt.md5.calculate(); if (strcasecmp(FMqtt.file_md5.c_str(), FMqtt.md5.toString().c_str())) { ResponseCmndChar(PSTR(D_JSON_MD5_MISMATCH)); } else { // Process upload data en free buffer ResponseCmndDone(); if (UPL_SETTINGS == FMqtt.file_type) { if (!SettingsConfigRestore()) { ResponseCmndFailed(); } else { TasmotaGlobal.restart_flag = 2; // Restart to load new settings } } else { if (UPL_TASMOTA == FMqtt.file_type) { if (!Update.end(true)) { SetLedLink(0); ResponseCmndFailed(); } else { TasmotaGlobal.restart_flag = 2; // Restart to load new firmware } } /* if (FMqtt.file_buffer != nullptr) { free(FMqtt.file_buffer); FMqtt.file_buffer = nullptr; } */ } } FMqtt.file_buffer = nullptr; } } MqttFilePublish(); } /*********************************************************************************************\ * MQTT Download from device \*********************************************************************************************/ uint32_t MqttFileDownloadValidate(void) { if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported if ((0 == FMqtt.file_id) && (FMqtt.file_type > 0)) { FMqtt.file_buffer = nullptr; // Init upload buffer if (!FMqtt.file_password || (strcmp(FMqtt.file_password, SettingsText(SET_MQTT_PWD)) != 0)) { return 1; // Invalid password } FMqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255 // Init file_buffer if (UPL_SETTINGS == FMqtt.file_type) { uint32_t len = SettingsConfigBackup(); if (!len) { return 2; } FMqtt.file_type = UPL_SETTINGS; FMqtt.file_buffer = settings_buffer; FMqtt.file_size = len; // {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096} Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"), SettingsConfigFilename().c_str(), FMqtt.file_id, FMqtt.file_type, len); } else { return 3; // Invalid file type } FMqtt.file_pos = 0; FMqtt.md5 = MD5Builder(); FMqtt.md5.begin(); char payload[50]; snprintf_P(payload, sizeof(payload), S_JSON_COMMAND_SVALUE, XdrvMailbox.command, PSTR(D_JSON_STARTED)); MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, payload); // Enforce stat/wemos10/FILEUPLOAD MqttDisableLogging(true); } else if (0 == XdrvMailbox.payload) { if (UPL_SETTINGS == FMqtt.file_type) { SettingsBufferFree(); } return 4; // Upload aborted } return 0; // No error } void CmndFileDownload(void) { /* Download chunks of data base64 encoded with MD5 hash Supported Type: 2 - Settings FileDownload 0 - Abort current download Start a download session: FileDownload {"Password":"","Type":2} Download data using base64 until reception of MD5 hash: FileDownload */ if (FMqtt.file_buffer) { if (FMqtt.file_pos < FMqtt.file_size) { uint32_t chunk_size = 4096; if (!FMqtt.file_binary) { /* The download chunk size is the data size before it is encoded to base64. The download buffer contains - Payload ({"Id":117,"Data":""}) */ chunk_size = (((ResponseSize() - FileTransferHeaderSize) / 4) * 3) -2; } #ifdef USE_TASMESH if (MESHroleNode() && (chunk_size > 2048)) { chunk_size = 2048; } #endif // USE_TASMESH uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos; uint32_t write_bytes = (bytes_left < chunk_size) ? bytes_left : chunk_size; uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos; FMqtt.md5.add(buffer, write_bytes); FMqtt.file_pos += write_bytes; if (FMqtt.file_binary) { // Binary data up to 4k MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, (const char*)buffer, write_bytes); } else { // {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} char* base64_data = (char*)malloc(encode_base64_length(write_bytes) +2); if (base64_data) { Response_P(PSTR("{\"Id\":%d,\"Data\":\""), FMqtt.file_id); // FileTransferHeaderSize SHOW_FREE_MEM(PSTR("CmndFileDownload")); encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data); ResponseAppend_P(base64_data); ResponseAppend_P("\"}"); MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); free(base64_data); } else { XdrvMailbox.payload = 0; // Abort } } ResponseClear(); if (XdrvMailbox.payload != 0) { return; } // No error } else { FMqtt.md5.calculate(); // {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"} Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), FMqtt.file_id, FMqtt.md5.toString().c_str()); MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD ResponseCmndDone(); if (UPL_SETTINGS == FMqtt.file_type) { SettingsBufferFree(); } FMqtt.file_buffer = nullptr; } } if (strlen(XdrvMailbox.data) > 8) { // Workaround exception if empty JSON like {} - Needs checks JsonParser parser((char*) XdrvMailbox.data); JsonParserObject root = parser.getRootObject(); if (root) { JsonParserToken val = root[PSTR("TYPE")]; if (val) { FMqtt.file_type = val.getUInt(); } val = root[PSTR("BINARY")]; if (val) { FMqtt.file_binary = val.getUInt(); } val = root[PSTR("PASSWORD")]; if (val) { FMqtt.file_password = val.getStr(); } } } MqttFileValidate(MqttFileDownloadValidate()); MqttFilePublish(); } #endif // USE_MQTT_FILE