From 36caed2122a70df0c1d296cfd0862084491b0661 Mon Sep 17 00:00:00 2001 From: Theo Arends <11044339+arendst@users.noreply.github.com> Date: Tue, 11 May 2021 16:26:29 +0200 Subject: [PATCH] Increase Settings MQTT file upload buffer --- tasmota/xdrv_02_mqtt.ino | 61 +++++++++++++++++++--------- tools/mqtt-file/download-settings.py | 2 - tools/mqtt-file/upload-settings.py | 45 ++++++++++++-------- 3 files changed, 68 insertions(+), 40 deletions(-) diff --git a/tasmota/xdrv_02_mqtt.ino b/tasmota/xdrv_02_mqtt.ino index 30b04a1fa..eb972ae33 100644 --- a/tasmota/xdrv_02_mqtt.ino +++ b/tasmota/xdrv_02_mqtt.ino @@ -23,8 +23,6 @@ #define MQTT_WIFI_CLIENT_TIMEOUT 200 // Wifi TCP connection timeout (default is 5000 mSec) #endif -const uint32_t mqtt_file_chuck_size = 700; // Related to base64_encode (+2 / 3 * 4) and MQTT buffer size (MIN_MESSZ = 1040) - #include #define USE_MQTT_NEW_PUBSUBCLIENT @@ -99,16 +97,17 @@ void (* const MqttCommand[])(void) PROGMEM = { struct MQTT { uint32_t file_pos = 0; // MQTT file position during upload/download - uint32_t file_id = 0; // MQTT unique file id during upload/download - uint32_t file_type = 0; // MQTT File type (See UploadTypes) uint32_t file_size = 0; // MQTT total file size + uint32_t file_type = 0; // MQTT File type (See UploadTypes) uint8_t* file_buffer = nullptr; // MQTT file buffer MD5Builder md5; // MQTT md5 String file_md5; // MQTT received file md5 (32 chars) uint16_t connect_count = 0; // MQTT re-connect count uint16_t retry_counter = 1; // MQTT connection retry counter uint16_t retry_counter_delay = 1; // MQTT retry counter multiplier + uint16_t topic_size; // MQTT topic length with terminating uint8_t initial_connection_state = 2; // MQTT connection messages state + uint8_t file_id = 0; // MQTT unique file id during upload/download bool connected = false; // MQTT virtual connection status bool allowed = false; // MQTT enabled and parameters valid bool mqtt_tls = false; // MQTT TLS is enabled @@ -548,6 +547,7 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len #else strlcpy(topic, mqtt_topic, sizeof(topic)); #endif // USE_MQTT_AZURE_IOT + Mqtt.topic_size = strlen(topic) + 1; mqtt_data[data_len] = 0; char data[data_len +1]; memcpy(data, mqtt_data, sizeof(data)); @@ -1416,14 +1416,37 @@ void CmndStateRetain(void) { ResponseCmndStateText(Settings.flag5.mqtt_state_retain); // CMND_STATERETAIN } +/* + The download chunk size is the data size before it is encoded to base64. + It is smaller than the upload chunksize as it is bound by MESSZ + The download buffer with length MESSZ (1042) contains + - Payload ({"Id":117,"Data":""}) +*/ +const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""} +const uint32_t mqtt_file_chuck_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2; + +uint32_t FileUploadChunckSize(void) { +/* + The upload chunk size is the data size before it is encoded to base64. + It can be larger than the download chunksize which is bound by MESSZ + The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains + - Header of 5 bytes (MQTT_MAX_HEADER_SIZE) + - Topic string terminated with a zero (stat/demo/FILEUPLOAD) + - Payload ({"Id":116,"Data":""}) +*/ + const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE + + return (((MQTT_MAX_PACKET_SIZE - PubSubClientHeaderSize - Mqtt.topic_size - FileTransferHeaderSize) / 4) * 3) -2; +} + void CmndFileUpload(void) { /* Upload (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data FileUpload 0 - Abort current upload - FileUpload {"File":"Config_wemos10_9.4.0.3.dmp","Id":1620385091,"Type":2,"Size":4096} - FileUpload {"Id":1620385091,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} - FileUpload {"Id":1620385091,"Data":" ... "} - FileUpload {"Id":1620385091,"Md5":"496fcbb433bbca89833063174d2c5747"} + FileUpload {"File":"Config_wemos10_9.4.0.3.dmp","Id":116,"Type":2,"Size":4096} + FileUpload {"Id":116,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} + FileUpload {"Id":116,"Data":" ... "} + FileUpload {"Id":116,"Md5":"496fcbb433bbca89833063174d2c5747"} */ const char* base64_data = nullptr; uint32_t rcv_id = 0; @@ -1496,7 +1519,8 @@ void CmndFileUpload(void) { } if ((Mqtt.file_pos < Mqtt.file_size) || (Mqtt.file_md5.length() != 32)) { - ResponseCmndChar(PSTR(D_JSON_ACK)); + // {"Id":116,"MaxSize":"765"} + Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), Mqtt.file_id, FileUploadChunckSize()); } else { Mqtt.md5.calculate(); if (strcasecmp(Mqtt.file_md5.c_str(), Mqtt.md5.toString().c_str())) { @@ -1537,6 +1561,7 @@ void CmndFileDownload(void) { FileDownload 2 - Start download of settings file FileDownload - Continue downloading data until reception of MD5 hash */ + if (Mqtt.file_id && Mqtt.file_buffer) { bool finished = false; @@ -1551,12 +1576,8 @@ void CmndFileDownload(void) { uint8_t* buffer = Mqtt.file_buffer + Mqtt.file_pos; Mqtt.md5.add(buffer, write_bytes); - // {"Id":1620385091,"Seq":1,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} -// uint32_t sequence = (Mqtt.file_pos / mqtt_file_chuck_size) +1; -// Response_P(PSTR("{\"Id\":%u,\"Seq\":%d,\"Data\":\""), Mqtt.file_id, sequence); - - // {"Id":1620385091,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} - Response_P(PSTR("{\"Id\":%u,\"Data\":\""), Mqtt.file_id); + // {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="} + Response_P(PSTR("{\"Id\":%d,\"Data\":\""), Mqtt.file_id); // FileTransferHeaderSize char base64_data[encode_base64_length(write_bytes)]; encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data); ResponseAppend_P(base64_data); @@ -1566,8 +1587,8 @@ void CmndFileDownload(void) { } else { Mqtt.md5.calculate(); - // {"Id":1620385091,"Md5":"496fcbb433bbca89833063174d2c5747"} - Response_P(PSTR("{\"Id\":%u,\"Md5\":\"%s\"}"), Mqtt.file_id, Mqtt.md5.toString().c_str()); + // {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"} + Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), Mqtt.file_id, Mqtt.md5.toString().c_str()); finished = true; } @@ -1581,7 +1602,7 @@ void CmndFileDownload(void) { } else if (XdrvMailbox.data_len) { Mqtt.file_buffer = nullptr; - Mqtt.file_id = UtcTime(); + Mqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255 if (UPL_SETTINGS == XdrvMailbox.payload) { uint32_t len = SettingsConfigBackup(); @@ -1590,8 +1611,8 @@ void CmndFileDownload(void) { Mqtt.file_buffer = settings_buffer; Mqtt.file_size = len; - // {"File":"Config_wemos10_9.4.0.3.dmp","Id":1620385091,"Type":2,"Size":4096} - Response_P(PSTR("{\"File\":\"%s\",\"Id\":%u,\"Type\":%d,\"Size\":%d}"), + // {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096} + Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"), SettingsConfigFilename().c_str(), Mqtt.file_id, Mqtt.file_type, len); } } diff --git a/tools/mqtt-file/download-settings.py b/tools/mqtt-file/download-settings.py index 247af2a88..046a35997 100644 --- a/tools/mqtt-file/download-settings.py +++ b/tools/mqtt-file/download-settings.py @@ -51,8 +51,6 @@ myfiletype = 2 # Tasmota Settings file type mypublish = "cmnd/"+mytopic+"/filedownload" mysubscribe = "stat/"+mytopic+"/FILEDOWNLOAD" # Case sensitive -# Tasmota currently supports MQTT message size of 1040 characters. Base64 adds 0.25 chars -chucksize = 700 # Tasmota max chunk size Ack_flag = False file_name = "" diff --git a/tools/mqtt-file/upload-settings.py b/tools/mqtt-file/upload-settings.py index 548669df4..ef075b8e0 100644 --- a/tools/mqtt-file/upload-settings.py +++ b/tools/mqtt-file/upload-settings.py @@ -22,7 +22,7 @@ Requirements: - Python 3.x and Pip: sudo apt-get install python3 python3-pip - pip3 install paho-mqtt + pip3 install paho-mqtt json Instructions: Edit file and change parameters in User Configuration Section @@ -35,6 +35,7 @@ import paho.mqtt.client as mqtt import time import base64 import hashlib +import json # **** Start of User Configuration Section @@ -47,26 +48,29 @@ myfiletype = 2 # Tasmota Settings file type # **** End of User Configuration Section -# Derive from myfile -myfilesize = 4096 - -# Derive from time epoch -myid = 1620484815 - # Derive fulltopic from broker LWT message mypublish = "cmnd/"+mytopic+"/fileupload" mysubscribe = "stat/"+mytopic+"/FILEUPLOAD" # Case sensitive -# Tasmota currently supports MQTT message size of 1040 characters. Base64 adds 0.25 chars -chucksize = 700 # Tasmota max chunk size - -# Example does use feedback Acknowledge Ack_flag = False +file_id = 116 # Even id between 2 and 254 +file_chunk_size = 700 # Default Tasmota MQTT max message size + # The callback for when mysubscribe message is received def on_message(client, userdata, msg): global Ack_flag + global file_chunk_size + + rcv_id = 0 + # print("Received message =",str(msg.payload.decode("utf-8"))) + + root = json.loads(msg.payload.decode("utf-8")) + if "Id" in root: rcv_id = root["Id"] + if rcv_id == file_id: + if "MaxSize" in root: file_chunk_size = root["MaxSize"] + Ack_flag = False def wait_for_ack(): @@ -90,29 +94,34 @@ client.subscribe(mysubscribe) time_start = time.time() print("Uploading file "+myfile+" to "+mytopic+" ...") -client.publish(mypublish, "{\"File\":\""+myfile+"\",\"Id\":"+str(myid)+",\"Type\":"+str(myfiletype)+",\"Size\":"+str(myfilesize)+"}") +fo = open(myfile,"rb") +fo.seek(0, 2) # os.SEEK_END +file_size = fo.tell() +fo.seek(0, 0) # os.SEEK_SET + +client.publish(mypublish, "{\"File\":\""+myfile+"\",\"Id\":"+str("%3d"%file_id)+",\"Type\":"+str(myfiletype)+",\"Size\":"+str(file_size)+"}") Ack_flag = True out_hash_md5 = hashlib.md5() -fo = open(myfile,"rb") Run_flag = True while Run_flag: - if wait_for_ack(): # We use Ack here + if wait_for_ack(): # We use Ack here Run_flag = False else: - chunk = fo.read(chucksize) + chunk = fo.read(file_chunk_size) if chunk: - out_hash_md5.update(chunk) # Update hash + out_hash_md5.update(chunk) # Update hash base64_encoded_data = base64.b64encode(chunk) base64_data = base64_encoded_data.decode('utf-8') - client.publish(mypublish, "{\"Id\":"+str(myid)+",\"Data\":\""+base64_data+"\"}") + # Message length used by Tasmota (FileTransferHeaderSize) + client.publish(mypublish, "{\"Id\":"+str("%3d"%file_id)+",\"Data\":\""+base64_data+"\"}") Ack_flag = True else: md5_hash = out_hash_md5.hexdigest() - client.publish(mypublish, "{\"Id\":"+str(myid)+",\"Md5\":\""+md5_hash+"\"}") + client.publish(mypublish, "{\"Id\":"+str("%3d"%file_id)+",\"Md5\":\""+md5_hash+"\"}") Run_flag = False fo.close()