Prep MQTT file upload/download for TasMesh

This commit is contained in:
Theo Arends 2021-07-01 15:28:18 +02:00
parent b7519b9c90
commit 213746f50f
5 changed files with 82 additions and 52 deletions

View File

@ -37,6 +37,7 @@ struct FMQTT {
uint32_t file_pos = 0; // MQTT file position during upload/download
uint32_t file_size = 0; // MQTT total file size
uint32_t file_type = 0; // MQTT File type (See UploadTypes)
uint32_t chunk_size;
uint8_t* file_buffer = nullptr; // MQTT file buffer
const char* file_password = nullptr; // MQTT password
MD5Builder md5; // MQTT md5
@ -48,6 +49,41 @@ struct FMQTT {
const uint32_t FileTransferHeaderSize = 21; // {"Id":116,"Data":""}<null>
void MqttFileValidate(uint32_t error) {
if (error) {
FMqtt.file_buffer = nullptr;
MqttDisableLogging(false);
if (4 == error) {
ResponseCmndChar(PSTR(D_JSON_ABORTED));
} else {
char error_txt[20];
snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error);
ResponseCmndChar(error_txt);
}
}
}
void MqttFilePublish(void) {
if (!FMqtt.file_buffer) {
MqttDisableLogging(false);
FMqtt.file_id = 0;
FMqtt.file_size = 0;
FMqtt.file_type = 0;
FMqtt.file_binary = false;
FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory
FMqtt.file_password = nullptr;
}
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
ResponseClear();
}
/*********************************************************************************************\
* MQTT Upload to device
\*********************************************************************************************/
uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported
@ -105,6 +141,25 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
ResponseCmndChar(PSTR(D_JSON_STARTED));
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command); // Enforce stat/wemos10/FILEUPLOAD
/*
The upload chunk size is the data size of the payload.
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE contains
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
- Payload ({"Id":116,"Data":"<base64 encoded chunk_size>"}<null>) or (<binary data>)
*/
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
FMqtt.chunk_size = MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1;
#ifdef USE_TASMESH
if (MESHroleNode()) {
// TasMesh default payload size (topic+payload) is 160
if (MESHmaxPayloadSize() < FMqtt.chunk_size) {
FMqtt.chunk_size = MESHmaxPayloadSize();
}
}
#endif // USE_TASMESH
}
else if (((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) || (0 == XdrvMailbox.payload)) {
// Error receiving data
@ -130,37 +185,6 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
return 0; // No error
}
void MqttFileValidate(uint32_t error) {
if (error) {
FMqtt.file_buffer = nullptr;
MqttDisableLogging(false);
if (4 == error) {
ResponseCmndChar(PSTR(D_JSON_ABORTED));
} else {
char error_txt[20];
snprintf_P(error_txt, sizeof(error_txt), PSTR(D_JSON_ERROR " %d"), error);
ResponseCmndChar(error_txt);
}
}
}
void MqttFilePublish(void) {
if (!FMqtt.file_buffer) {
MqttDisableLogging(false);
FMqtt.file_id = 0;
FMqtt.file_size = 0;
FMqtt.file_type = 0;
FMqtt.file_binary = false;
FMqtt.file_md5 = (const char*) nullptr; // Force deallocation of the String internal memory
FMqtt.file_password = nullptr;
}
MqttPublishPrefixTopic_P(STAT, XdrvMailbox.command);
ResponseClear();
}
void CmndFileUpload(void) {
/*
Upload <MaxSize> bytes chunks of data either base64 encoded or binary with MD5 hash
@ -257,20 +281,11 @@ void CmndFileUpload(void) {
if ((FMqtt.file_pos < FMqtt.file_size) || (FMqtt.file_md5.length() != 32)) {
MqttDisableLogging(true);
/*
The upload chunk size is the data size of the payload.
The PubSubClient upload buffer with length MQTT_MAX_PACKET_SIZE contains
- Header of 5 bytes (MQTT_MAX_HEADER_SIZE)
- Topic string terminated with a zero (stat/demo/FILEUPLOAD<null>)
- Payload ({"Id":116,"Data":"<base64 encoded chunk_size>"}<null>) or (<binary data>)
*/
const uint32_t PubSubClientHeaderSize = 5; // MQTT_MAX_HEADER_SIZE
uint32_t chunk_size = MqttClient.getBufferSize() - PubSubClientHeaderSize - FMqtt.topic_size -1;
if (!binary_data) {
chunk_size = (((chunk_size - FileTransferHeaderSize) / 4) * 3) -2; // Calculate base64 chunk size
if (!binary_data && !FMqtt.file_pos) {
FMqtt.chunk_size = (((FMqtt.chunk_size - FileTransferHeaderSize) / 4) * 3) -2; // Calculate base64 chunk size
}
// {"Id":116,"MaxSize":"765"}
Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), FMqtt.file_id, chunk_size);
Response_P(PSTR("{\"Id\":%d,\"MaxSize\":%d}"), FMqtt.file_id, FMqtt.chunk_size);
} else {
FMqtt.md5.calculate();
if (strcasecmp(FMqtt.file_md5.c_str(), FMqtt.md5.toString().c_str())) {
@ -310,6 +325,10 @@ void CmndFileUpload(void) {
MqttFilePublish();
}
/*********************************************************************************************\
* MQTT Download from device
\*********************************************************************************************/
uint32_t MqttFileDownloadValidate(void) {
if (XdrvMailbox.grpflg) { return 5; } // No grouptopic supported
@ -377,9 +396,6 @@ void CmndFileDownload(void) {
*/
if (FMqtt.file_buffer) {
if (FMqtt.file_pos < FMqtt.file_size) {
#ifdef USE_TASMESH
uint32_t chunk_size = 2048;
#else
uint32_t chunk_size = 4096;
if (!FMqtt.file_binary) {
/*
@ -389,7 +405,12 @@ void CmndFileDownload(void) {
*/
chunk_size = (((ResponseSize() - FileTransferHeaderSize) / 4) * 3) -2;
}
#endif
#ifdef USE_TASMESH
if (MESHroleNode() && (chunk_size > 2048)) {
chunk_size = 2048;
}
#endif // USE_TASMESH
uint32_t bytes_left = FMqtt.file_size - FMqtt.file_pos;
uint32_t write_bytes = (bytes_left < chunk_size) ? bytes_left : chunk_size;
uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos;

View File

@ -145,13 +145,16 @@ def on_message(client, userdata, msg):
Ack_flag = False
def wait_for_ack():
timeout = 100
global Err_flag
timeout = 500
while Ack_flag and Err_flag == False and timeout > 0:
time.sleep(0.01)
timeout = timeout -1
if 0 == timeout:
print("Error: Timeout")
Err_flag = True
return Ack_flag

View File

@ -50,7 +50,7 @@ myfiletype = 1 # Tasmota firmware file type
# **** End of User Configuration Section
use_base64 = False
use_base64 = True
# Derive fulltopic from broker LWT message
mypublish = "cmnd/"+mytopic+"/fileupload"
@ -104,13 +104,16 @@ def on_message(client, userdata, msg):
Ack_flag = False
def wait_for_ack():
timeout = 100
global Err_flag
timeout = 500
while Ack_flag and Err_flag == False and timeout > 0:
time.sleep(0.01)
timeout = timeout -1
if 0 == timeout:
print("Error: Timeout")
Err_flag = True
return Ack_flag

View File

@ -44,7 +44,7 @@ broker_port = 1883 # MQTT broker port
mypassword = "" # Tasmota MQTT password
mytopic = "demo" # Tasmota MQTT topic
myfile = "Config_demo_9.4.0.4.dmp" # Tasmota Settings file name
myfile = "Config_demo_9.5.0.1.dmp" # Tasmota Settings file name
myfiletype = 2 # Tasmota Settings file type
# **** End of User Configuration Section
@ -103,13 +103,16 @@ def on_message(client, userdata, msg):
Ack_flag = False
def wait_for_ack():
timeout = 100
global Err_flag
timeout = 500
while Ack_flag and Err_flag == False and timeout > 0:
time.sleep(0.01)
timeout = timeout -1
if 0 == timeout:
print("Error: Timeout")
Err_flag = True
return Ack_flag