Add stability to MQTT file control

This commit is contained in:
Theo Arends 2021-05-21 16:19:40 +02:00
parent 0202613014
commit 8650875ceb
6 changed files with 103 additions and 35 deletions

View File

@ -60,7 +60,7 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
// Check buffer size
if (UPL_SETTINGS == FMqtt.file_type) {
if (FMqtt.file_size > 4096) {
if (FMqtt.file_size > sizeof(Settings)) {
return 2; // Settings supports max 4k size
}
} else { // Check enough flash space for intermediate upload
@ -72,23 +72,31 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
}
// Init file_buffer
if (UPL_TASMOTA == FMqtt.file_type) {
if (Update.begin(FMqtt.file_size)) {
FMqtt.file_buffer = &FMqtt.file_id; // Dummy buffer
TasmotaGlobal.blinks = 201;
TasmotaGlobal.blinkstate = true; // Stay lit
SettingsSave(1); // Free flash for OTA update
}
}
else if (UPL_SETTINGS == FMqtt.file_type) {
if (UPL_SETTINGS == FMqtt.file_type) {
if (SettingsConfigBackup()) {
FMqtt.file_buffer = settings_buffer;
}
}
else {
return 3; // Invalid file type
if (UPL_TASMOTA == FMqtt.file_type) {
if (Update.begin(FMqtt.file_size)) {
FMqtt.file_buffer = &FMqtt.file_id; // Dummy buffer
// FMqtt.file_buffer = (uint8_t*)malloc(SPI_FLASH_SEC_SIZE);
// if (FMqtt.file_buffer) {
SetLedLink(1);
SettingsSave(1); // Free flash for OTA update
// }
} else {
Update.end(true);
}
}
else {
return 3; // Invalid file type
}
}
if (!FMqtt.file_buffer) { return 6; } // No buffer
FMqtt.file_id = rcv_id;
FMqtt.file_pos = 0;
@ -101,13 +109,22 @@ uint32_t MqttFileUploadValidate(uint32_t rcv_id) {
else if (((FMqtt.file_id > 0) && (FMqtt.file_id != rcv_id)) || (0 == XdrvMailbox.payload)) {
// Error receiving data
if (UPL_TASMOTA == FMqtt.file_type) {
Update.end(true);
TasmotaGlobal.blinkstate = false; // Turn led off
}
else if (UPL_SETTINGS == FMqtt.file_type) {
if (UPL_SETTINGS == FMqtt.file_type) {
SettingsBufferFree();
}
else {
if (UPL_TASMOTA == FMqtt.file_type) {
Update.end(true);
SetLedLink(0);
}
/*
if (FMqtt.file_buffer != nullptr) {
free(FMqtt.file_buffer);
FMqtt.file_buffer = nullptr;
}
*/
}
return 4; // Upload aborted
}
return 0; // No error
@ -117,7 +134,7 @@ void MqttFileValidate(uint32_t error) {
if (error) {
FMqtt.file_buffer = nullptr;
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
MqttDisableLogging(false);
if (4 == error) {
ResponseCmndChar(PSTR(D_JSON_ABORTED));
@ -131,7 +148,8 @@ void MqttFileValidate(uint32_t error) {
void MqttFilePublish(void) {
if (!FMqtt.file_buffer) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
MqttDisableLogging(false);
FMqtt.file_id = 0;
FMqtt.file_size = 0;
FMqtt.file_type = 0;
@ -212,12 +230,15 @@ void CmndFileUpload(void) {
uint32_t read_bytes = (bytes_left < rcvd_bytes) ? bytes_left : rcvd_bytes;
FMqtt.md5.add(raw_data, read_bytes);
if (UPL_TASMOTA == FMqtt.file_type) {
Update.write(raw_data, read_bytes);
} else {
if (UPL_SETTINGS == FMqtt.file_type) {
uint8_t* buffer = FMqtt.file_buffer + FMqtt.file_pos;
memcpy(buffer, raw_data, read_bytes);
}
else {
if (UPL_TASMOTA == FMqtt.file_type) {
Update.write(raw_data, read_bytes);
}
}
if (!binary_data) {
free(raw_data);
@ -227,14 +248,14 @@ void CmndFileUpload(void) {
}
if ((FMqtt.file_pos > rcvd_bytes) && ((FMqtt.file_pos % 102400) <= rcvd_bytes)) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
AddLog(LOG_LEVEL_DEBUG, PSTR(D_LOG_UPLOAD "Progress %d kB"), (FMqtt.file_pos / 10240) * 10);
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
}
}
if ((FMqtt.file_pos < FMqtt.file_size) || (FMqtt.file_md5.length() != 32)) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
MqttDisableLogging(true);
/*
The upload chunk size is the data size of the payload.
@ -259,22 +280,29 @@ void CmndFileUpload(void) {
// Process upload data en free buffer
ResponseCmndDone();
if (UPL_TASMOTA == FMqtt.file_type) {
if (!Update.end(true)) {
TasmotaGlobal.blinkstate = false; // Turn led off
ResponseCmndFailed();
} else {
TasmotaGlobal.restart_flag = 2; // Restart to load new firmware
}
}
else if (UPL_SETTINGS == FMqtt.file_type) {
if (UPL_SETTINGS == FMqtt.file_type) {
if (!SettingsConfigRestore()) {
ResponseCmndFailed();
} else {
TasmotaGlobal.restart_flag = 2; // Restart to load new settings
}
}
else {
if (UPL_TASMOTA == FMqtt.file_type) {
if (!Update.end(true)) {
SetLedLink(0);
ResponseCmndFailed();
} else {
TasmotaGlobal.restart_flag = 2; // Restart to load new firmware
}
}
/*
if (FMqtt.file_buffer != nullptr) {
free(FMqtt.file_buffer);
FMqtt.file_buffer = nullptr;
}
*/
}
}
FMqtt.file_buffer = nullptr;
}
@ -321,7 +349,7 @@ uint32_t MqttFileDownloadValidate(void) {
snprintf_P(payload, sizeof(payload), S_JSON_COMMAND_SVALUE, XdrvMailbox.command, PSTR(D_JSON_STARTED));
MqttPublishPayloadPrefixTopic_P(STAT, XdrvMailbox.command, payload); // Enforce stat/wemos10/FILEUPLOAD
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide upload data logging
MqttDisableLogging(true);
}
else if (0 == XdrvMailbox.payload) {

View File

@ -108,6 +108,7 @@ struct MQTT {
bool connected = false; // MQTT virtual connection status
bool allowed = false; // MQTT enabled and parameters valid
bool mqtt_tls = false; // MQTT TLS is enabled
bool disable_logging = false; // Temporarly disable logging on some commands
} Mqtt;
#ifdef USE_MQTT_TLS
@ -173,6 +174,12 @@ void MakeValidMqtt(uint32_t option, char* str) {
}
}
void MqttDisableLogging(bool state) {
// Disable logging only on repeating MQTT messages
Mqtt.disable_logging = state;
TasmotaGlobal.masterlog_level = (Mqtt.disable_logging) ? LOG_LEVEL_DEBUG_MORE : LOG_LEVEL_NONE;
}
/*********************************************************************************************\
* MQTT driver specific code need to provide the following functions:
*
@ -553,6 +560,10 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len
char data[data_len +1];
memcpy(data, mqtt_data, sizeof(data));
if (Mqtt.disable_logging) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_DEBUG_MORE; // Hide logging
}
// MQTT pre-processing
XdrvMailbox.index = strlen(topic);
XdrvMailbox.data_len = data_len;
@ -563,6 +574,10 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len
ShowSource(SRC_MQTT);
CommandHandler(topic, data, data_len);
if (Mqtt.disable_logging) {
TasmotaGlobal.masterlog_level = LOG_LEVEL_NONE; // Enable logging
}
}
/*********************************************************************************************/

View File

@ -67,7 +67,6 @@ file_md5 = ""
def on_message(client, userdata, msg):
global Ack_flag
global Err_flag
global Run_flag
global file_name
global file_id
global file_type
@ -88,6 +87,10 @@ def on_message(client, userdata, msg):
if root:
if "FileDownload" in root:
rcv_code = root["FileDownload"]
if "Aborted" in rcv_code:
print("Error: Aborted")
Err_flag = True
return
if "Started" in rcv_code:
return
if "Error" in rcv_code:

View File

@ -76,6 +76,14 @@ def on_message(client, userdata, msg):
root = json.loads(msg.payload.decode("utf-8"))
if "FileUpload" in root:
rcv_code = root["FileUpload"]
if "Aborted" in rcv_code:
print("Error: Aborted")
Err_flag = True
return
if "MD5 mismatch" in rcv_code:
print("Error: MD5 mismatch")
Err_flag = True
return
if "Started" in rcv_code:
return
if "Error" in rcv_code:
@ -119,6 +127,7 @@ fo = open(myfile,"rb")
fo.seek(0, 2) # os.SEEK_END
file_size = fo.tell()
fo.seek(0, 0) # os.SEEK_SET
file_pos = 0
client.publish(mypublish, "{\"Password\":\""+mypassword+"\",\"File\":\""+myfile+"\",\"Id\":"+str("%3d"%file_id)+",\"Type\":"+str(myfiletype)+",\"Size\":"+str(file_size)+"}")
Ack_flag = True
@ -142,6 +151,11 @@ while Run_flag:
client.publish(mypublish, "{\"Id\":"+str("%3d"%file_id)+",\"Data\":\""+base64_data+"\"}")
else:
client.publish(mypublish+"201", chunk)
file_pos = file_pos + file_chunk_size
if file_pos % 102400 < file_chunk_size:
progress = round((file_pos / 10240)) * 10
print("Progress "+str("%d"%progress)+" kB")
Ack_flag = True
else:

View File

@ -75,6 +75,14 @@ def on_message(client, userdata, msg):
root = json.loads(msg.payload.decode("utf-8"))
if "FileUpload" in root:
rcv_code = root["FileUpload"]
if "Aborted" in rcv_code:
print("Error: Aborted")
Err_flag = True
return
if "MD5 mismatch" in rcv_code:
print("Error: MD5 mismatch")
Err_flag = True
return
if "Started" in rcv_code:
return
if "Error" in rcv_code: