Increase Settings MQTT file upload buffer

This commit is contained in:
Theo Arends 2021-05-11 16:26:29 +02:00
parent d3874e9834
commit 36caed2122
3 changed files with 68 additions and 40 deletions

View File

@ -23,8 +23,6 @@
#define MQTT_WIFI_CLIENT_TIMEOUT 200 // Wifi TCP connection timeout (default is 5000 mSec)
#endif
const uint32_t mqtt_file_chuck_size = 700; // Related to base64_encode (+2 / 3 * 4) and MQTT buffer size (MIN_MESSZ = 1040)
#include <base64.hpp>
#define USE_MQTT_NEW_PUBSUBCLIENT
@ -99,16 +97,17 @@ void (* const MqttCommand[])(void) PROGMEM = {
struct MQTT {
uint32_t file_pos = 0; // MQTT file position during upload/download
uint32_t file_id = 0; // MQTT unique file id during upload/download
uint32_t file_type = 0; // MQTT File type (See UploadTypes)
uint32_t file_size = 0; // MQTT total file size
uint32_t file_type = 0; // MQTT File type (See UploadTypes)
uint8_t* file_buffer = nullptr; // MQTT file buffer
MD5Builder md5; // MQTT md5
String file_md5; // MQTT received file md5 (32 chars)
uint16_t connect_count = 0; // MQTT re-connect count
uint16_t retry_counter = 1; // MQTT connection retry counter
uint16_t retry_counter_delay = 1; // MQTT retry counter multiplier
uint16_t topic_size; // MQTT topic length with terminating <null>
uint8_t initial_connection_state = 2; // MQTT connection messages state
uint8_t file_id = 0; // MQTT unique file id during upload/download
bool connected = false; // MQTT virtual connection status
bool allowed = false; // MQTT enabled and parameters valid
bool mqtt_tls = false; // MQTT TLS is enabled
@ -548,6 +547,7 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len
#else
strlcpy(topic, mqtt_topic, sizeof(topic));
#endif // USE_MQTT_AZURE_IOT
Mqtt.topic_size = strlen(topic) + 1;
mqtt_data[data_len] = 0;
char data[data_len +1];
memcpy(data, mqtt_data, sizeof(data));
@ -1416,14 +1416,37 @@ void CmndStateRetain(void) {
ResponseCmndStateText(Settings.flag5.mqtt_state_retain); // CMND_STATERETAIN
}
/*
The download chunk size is the data size before it is encoded to base64.
It is smaller than the upload chunksize as it is bound by MESSZ
The download buffer with length MESSZ (1042) contains
- Payload ({"Id":117,"Data":"<base64 encoded mqtt_file_chuck_size>"}<null>)
*/
const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""}<null>
const uint32_t mqtt_file_chuck_size = (((MESSZ - FileTransferHeaderSize) / 4) * 3) -2;
uint32_t FileUploadChunckSize(void) {
/*
The upload chunk size is the data size before it is encoded to base64.
It can be larger than the download chunksize which is bound by MESSZ
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE (1200) contains
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
- Payload ({"Id":116,"Data":"<base64 encoded FileUploadChunckSize>"}<null>)
*/
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
return (((MQTT_MAX_PACKET_SIZE - PubSubClientHeaderSize - Mqtt.topic_size - FileTransferHeaderSize) / 4) * 3) -2;
}
void CmndFileUpload(void) {
/*
Upload (binary) max 700 bytes chunks of data base64 encoded with MD5 hash over base64 decoded data
FileUpload 0 - Abort current upload
FileUpload {"File":"Config_wemos10_9.4.0.3.dmp","Id":1620385091,"Type":2,"Size":4096}
FileUpload {"Id":1620385091,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
FileUpload {"Id":1620385091,"Data":" ... "}
FileUpload {"Id":1620385091,"Md5":"496fcbb433bbca89833063174d2c5747"}
FileUpload {"File":"Config_wemos10_9.4.0.3.dmp","Id":116,"Type":2,"Size":4096}
FileUpload {"Id":116,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
FileUpload {"Id":116,"Data":" ... "}
FileUpload {"Id":116,"Md5":"496fcbb433bbca89833063174d2c5747"}
*/
const char* base64_data = nullptr;
uint32_t rcv_id = 0;
@ -1496,7 +1519,8 @@ void CmndFileUpload(void) {
}
if ((Mqtt.file_pos < Mqtt.file_size) || (Mqtt.file_md5.length() != 32)) {
ResponseCmndChar(PSTR(D_JSON_ACK));
// {"Id":116,"MaxSize":"765"}
Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), Mqtt.file_id, FileUploadChunckSize());
} else {
Mqtt.md5.calculate();
if (strcasecmp(Mqtt.file_md5.c_str(), Mqtt.md5.toString().c_str())) {
@ -1537,6 +1561,7 @@ void CmndFileDownload(void) {
FileDownload 2 - Start download of settings file
FileDownload - Continue downloading data until reception of MD5 hash
*/
if (Mqtt.file_id && Mqtt.file_buffer) {
bool finished = false;
@ -1551,12 +1576,8 @@ void CmndFileDownload(void) {
uint8_t* buffer = Mqtt.file_buffer + Mqtt.file_pos;
Mqtt.md5.add(buffer, write_bytes);
// {"Id":1620385091,"Seq":1,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
// uint32_t sequence = (Mqtt.file_pos / mqtt_file_chuck_size) +1;
// Response_P(PSTR("{\"Id\":%u,\"Seq\":%d,\"Data\":\""), Mqtt.file_id, sequence);
// {"Id":1620385091,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
Response_P(PSTR("{\"Id\":%u,\"Data\":\""), Mqtt.file_id);
// {"Id":117,"Data":"CRJcTQ9fYGF ... OT1BRUlNUVVZXWFk="}
Response_P(PSTR("{\"Id\":%d,\"Data\":\""), Mqtt.file_id); // FileTransferHeaderSize
char base64_data[encode_base64_length(write_bytes)];
encode_base64((unsigned char*)buffer, write_bytes, (unsigned char*)base64_data);
ResponseAppend_P(base64_data);
@ -1566,8 +1587,8 @@ void CmndFileDownload(void) {
} else {
Mqtt.md5.calculate();
// {"Id":1620385091,"Md5":"496fcbb433bbca89833063174d2c5747"}
Response_P(PSTR("{\"Id\":%u,\"Md5\":\"%s\"}"), Mqtt.file_id, Mqtt.md5.toString().c_str());
// {"Id":117,"Md5":"496fcbb433bbca89833063174d2c5747"}
Response_P(PSTR("{\"Id\":%d,\"Md5\":\"%s\"}"), Mqtt.file_id, Mqtt.md5.toString().c_str());
finished = true;
}
@ -1581,7 +1602,7 @@ void CmndFileDownload(void) {
}
else if (XdrvMailbox.data_len) {
Mqtt.file_buffer = nullptr;
Mqtt.file_id = UtcTime();
Mqtt.file_id = (UtcTime() & 0xFE) +1; // Odd id between 1 and 255
if (UPL_SETTINGS == XdrvMailbox.payload) {
uint32_t len = SettingsConfigBackup();
@ -1590,8 +1611,8 @@ void CmndFileDownload(void) {
Mqtt.file_buffer = settings_buffer;
Mqtt.file_size = len;
// {"File":"Config_wemos10_9.4.0.3.dmp","Id":1620385091,"Type":2,"Size":4096}
Response_P(PSTR("{\"File\":\"%s\",\"Id\":%u,\"Type\":%d,\"Size\":%d}"),
// {"File":"Config_wemos10_9.4.0.3.dmp","Id":117,"Type":2,"Size":4096}
Response_P(PSTR("{\"File\":\"%s\",\"Id\":%d,\"Type\":%d,\"Size\":%d}"),
SettingsConfigFilename().c_str(), Mqtt.file_id, Mqtt.file_type, len);
}
}

View File

@ -51,8 +51,6 @@ myfiletype = 2 # Tasmota Settings file type
mypublish = "cmnd/"+mytopic+"/filedownload"
mysubscribe = "stat/"+mytopic+"/FILEDOWNLOAD" # Case sensitive
# Tasmota currently supports MQTT message size of 1040 characters. Base64 adds 0.25 chars
chucksize = 700 # Tasmota max chunk size
Ack_flag = False
file_name = ""

View File

@ -22,7 +22,7 @@
Requirements:
- Python 3.x and Pip:
sudo apt-get install python3 python3-pip
pip3 install paho-mqtt
pip3 install paho-mqtt json
Instructions:
Edit file and change parameters in User Configuration Section
@ -35,6 +35,7 @@ import paho.mqtt.client as mqtt
import time
import base64
import hashlib
import json
# **** Start of User Configuration Section
@ -47,26 +48,29 @@ myfiletype = 2 # Tasmota Settings file type
# **** End of User Configuration Section
# Derive from myfile
myfilesize = 4096
# Derive from time epoch
myid = 1620484815
# Derive fulltopic from broker LWT message
mypublish = "cmnd/"+mytopic+"/fileupload"
mysubscribe = "stat/"+mytopic+"/FILEUPLOAD" # Case sensitive
# Tasmota currently supports MQTT message size of 1040 characters. Base64 adds 0.25 chars
chucksize = 700 # Tasmota max chunk size
# Example does use feedback Acknowledge
Ack_flag = False
file_id = 116 # Even id between 2 and 254
file_chunk_size = 700 # Default Tasmota MQTT max message size
# The callback for when mysubscribe message is received
def on_message(client, userdata, msg):
global Ack_flag
global file_chunk_size
rcv_id = 0
# print("Received message =",str(msg.payload.decode("utf-8")))
root = json.loads(msg.payload.decode("utf-8"))
if "Id" in root: rcv_id = root["Id"]
if rcv_id == file_id:
if "MaxSize" in root: file_chunk_size = root["MaxSize"]
Ack_flag = False
def wait_for_ack():
@ -90,29 +94,34 @@ client.subscribe(mysubscribe)
time_start = time.time()
print("Uploading file "+myfile+" to "+mytopic+" ...")
client.publish(mypublish, "{\"File\":\""+myfile+"\",\"Id\":"+str(myid)+",\"Type\":"+str(myfiletype)+",\"Size\":"+str(myfilesize)+"}")
fo = open(myfile,"rb")
fo.seek(0, 2) # os.SEEK_END
file_size = fo.tell()
fo.seek(0, 0) # os.SEEK_SET
client.publish(mypublish, "{\"File\":\""+myfile+"\",\"Id\":"+str("%3d"%file_id)+",\"Type\":"+str(myfiletype)+",\"Size\":"+str(file_size)+"}")
Ack_flag = True
out_hash_md5 = hashlib.md5()
fo = open(myfile,"rb")
Run_flag = True
while Run_flag:
if wait_for_ack(): # We use Ack here
if wait_for_ack(): # We use Ack here
Run_flag = False
else:
chunk = fo.read(chucksize)
chunk = fo.read(file_chunk_size)
if chunk:
out_hash_md5.update(chunk) # Update hash
out_hash_md5.update(chunk) # Update hash
base64_encoded_data = base64.b64encode(chunk)
base64_data = base64_encoded_data.decode('utf-8')
client.publish(mypublish, "{\"Id\":"+str(myid)+",\"Data\":\""+base64_data+"\"}")
# Message length used by Tasmota (FileTransferHeaderSize)
client.publish(mypublish, "{\"Id\":"+str("%3d"%file_id)+",\"Data\":\""+base64_data+"\"}")
Ack_flag = True
else:
md5_hash = out_hash_md5.hexdigest()
client.publish(mypublish, "{\"Id\":"+str(myid)+",\"Md5\":\""+md5_hash+"\"}")
client.publish(mypublish, "{\"Id\":"+str("%3d"%file_id)+",\"Md5\":\""+md5_hash+"\"}")
Run_flag = False
fo.close()