Merge pull request #4304 from curzon01/development

add v6.3.0.4 config changes
This commit is contained in:
Theo Arends 2018-11-07 15:01:28 +01:00 committed by GitHub
commit c7d920f1df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 297 additions and 182 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
VER = '2.0.0004'
VER = '2.0.0005'
"""
decode-config.py - Backup/Restore Sonoff-Tasmota configuration data
@ -155,6 +155,7 @@ def ModuleImportError(module):
sys.exit(ExitCode.MODULE_NOT_FOUND)
try:
from datetime import datetime
import copy
import struct
import socket
import re
@ -445,8 +446,8 @@ Setting_5_10_0 = {
'pulse_counter_debounce': ('<H', 0x5D2, None),
'rf_code': ('B', 0x5D4, [17,9], '"0x{:02x}".format($)'),
}
Setting_5_11_0 = Setting_5_10_0
# ========
Setting_5_11_0 = copy.deepcopy(Setting_5_10_0)
Setting_5_11_0.update({
'display_model': ('B', 0x2D2, None),
'display_mode': ('B', 0x2D3, None),
@ -461,15 +462,15 @@ Setting_5_11_0['flag'][0].update ({
'light_signal': ('<L', (0x010, 1, 18), None),
})
Setting_5_11_0.pop('mqtt_fingerprinth',None)
Setting_5_12_0 = Setting_5_11_0
# ========
Setting_5_12_0 = copy.deepcopy(Setting_5_11_0)
Setting_5_12_0['flag'][0].update ({
'hass_discovery': ('<L', (0x010, 1, 19), None),
'not_power_linked': ('<L', (0x010, 1, 20), None),
'no_power_on_check': ('<L', (0x010, 1, 21), None),
})
Setting_5_13_1 = Setting_5_12_0
# ========
Setting_5_13_1 = copy.deepcopy(Setting_5_12_0)
Setting_5_13_1.update({
'baudrate': ('B', 0x09D, None, ('$ * 1200','$ / 1200') ),
'mqtt_fingerprint': ('20s', 0x1AD, [2]),
@ -505,8 +506,8 @@ Setting_5_13_1['flag'][0].update ({
'rules_once': ('<L', (0x010, 1, 24), None),
'knx_enabled': ('<L', (0x010, 1, 25), None),
})
Setting_5_14_0 = Setting_5_13_1
# ========
Setting_5_14_0 = copy.deepcopy(Setting_5_13_1)
Setting_5_14_0.update({
'tflag': ({
'raw': ('<H', 0x2E2, None, ('"0x{:04x}".format($)', None)),
@ -523,8 +524,8 @@ Setting_5_14_0['flag'][0].update ({
'device_index_enable': ('<L', (0x010, 1, 26), None),
})
Setting_5_14_0['flag'][0].pop ('rules_once',None)
Setting_6_0_0 = Setting_5_14_0
# ========
Setting_6_0_0 = copy.deepcopy(Setting_5_14_0)
Setting_6_0_0.update({
'cfg_holder': ('<H', 0x000, None),
'cfg_size': ('<H', 0x002, None, (None, None)),
@ -548,8 +549,8 @@ Setting_6_0_0.update({
Setting_6_0_0['flag'][0].update ({
'knx_enable_enhancement': ('<L', (0x010, 1, 27), None),
})
Setting_6_1_1 = Setting_6_0_0
# ========
Setting_6_1_1 = copy.deepcopy(Setting_6_0_0)
Setting_6_1_1.update({
'flag3': ('<L', 0x3A0, None, '"0x{:08x}".format($)'),
'switchmode': ('B', 0x3A4, [8]),
@ -568,8 +569,8 @@ Setting_6_1_1['flag'][0].update ({
'ir_receive_decimal': ('<L', (0x010, 1, 29), None),
'hass_light': ('<L', (0x010, 1, 30), None),
})
Setting_6_2_1 = Setting_6_1_1
# ========
Setting_6_2_1 = copy.deepcopy(Setting_6_1_1)
Setting_6_2_1.update({
'rule_stop': ({
'raw': ('B', 0x1A7, None, (None, None)),
@ -597,31 +598,31 @@ Setting_6_2_1['flag'][0].update ({
Setting_6_2_1['flag2'][0].update ({
'axis_resolution': ('<L', (0x5BC, 2, 13), None),
})
Setting_6_2_1_2 = Setting_6_2_1
# ========
Setting_6_2_1_2 = copy.deepcopy(Setting_6_2_1)
Setting_6_2_1_2['flag3'][0].update ({
'user_esp8285_enable': ('<L', (0x3A0, 1, 1), None),
})
Setting_6_2_1_3 = Setting_6_2_1_2
# ========
Setting_6_2_1_3 = copy.deepcopy(Setting_6_2_1_2)
Setting_6_2_1_3['flag2'][0].update ({
'frequency_resolution': ('<L', (0x5BC, 2, 11), None),
})
Setting_6_2_1_3['flag3'][0].update ({
'time_append_timezone': ('<L', (0x3A0, 1, 2), None),
})
Setting_6_2_1_6 = Setting_6_2_1_3
# ========
Setting_6_2_1_6 = copy.deepcopy(Setting_6_2_1_3)
Setting_6_2_1_6.update({
'energy_frequency_calibration': ('<L', 0x7C8, None),
})
Setting_6_2_1_10 = Setting_6_2_1_6
# ========
Setting_6_2_1_10 = copy.deepcopy(Setting_6_2_1_6)
Setting_6_2_1_10.update({
'rgbwwTable': ('B', 0x71A, [5]),
})
Setting_6_2_1_14 = Setting_6_2_1_10
# ========
Setting_6_2_1_14 = copy.deepcopy(Setting_6_2_1_10)
Setting_6_2_1_14.update({
'weight_item': ('<H', 0x7BC, None),
'weight_max': ('<H', 0x7BE, None, ('float($) / 10', 'int($ * 10)')),
@ -632,23 +633,23 @@ Setting_6_2_1_14.update({
Setting_6_2_1_14['flag2'][0].update ({
'weight_resolution': ('<L', (0x5BC, 2, 9), None),
})
Setting_6_2_1_19 = Setting_6_2_1_14
# ========
Setting_6_2_1_19 = copy.deepcopy(Setting_6_2_1_14)
Setting_6_2_1_19.update({
'weight_max': ('<L', 0x7B8, None, ('float($) / 10', 'int($ * 10)')),
})
Setting_6_2_1_20 = Setting_6_2_1_19
# ========
Setting_6_2_1_20 = copy.deepcopy(Setting_6_2_1_19)
Setting_6_2_1_20['flag3'][0].update ({
'gui_hostname_ip': ('<L', (0x3A0, 1, 3), None),
})
Setting_6_3_0 = Setting_6_2_1_20
# ========
Setting_6_3_0 = copy.deepcopy(Setting_6_2_1_20)
Setting_6_3_0.update({
'energy_kWhtotal_time': ('<L', 0x7B4, None),
})
Setting_6_3_0_2 = Setting_6_3_0
# ========
Setting_6_3_0_2 = copy.deepcopy(Setting_6_3_0)
Setting_6_3_0_2.update({
'timezone_minutes': ('B', 0x66D, None),
})
@ -656,8 +657,22 @@ Setting_6_3_0_2['flag'][0].pop('rules_once',None)
Setting_6_3_0_2['flag'][0].update ({
'pressure_conversion': ('<L', (0x010, 1, 24), None),
})
# ========
Setting_6_3_0_4 = copy.deepcopy(Setting_6_3_0_2)
Setting_6_3_0_4.update({
'drivers': ('<L', 0x794, [3]),
'monitors': ('<L', 0x7A0, None),
'sensors': ('<L', 0x7A4, [3]),
'displays': ('<L', 0x7B0, None),
})
Setting_6_3_0_4['flag3'][0].update ({
'tuya_apply_o20': ('<L', (0x3A0, 1, 4), None),
})
# ========
Settings = [
(0x6030004, 0xe00, Setting_6_3_0_4),
(0x6030002, 0xe00, Setting_6_3_0_2),
(0x6030000, 0xe00, Setting_6_3_0),
(0x6020114, 0xe00, Setting_6_2_1_20),
@ -699,32 +714,27 @@ def GetTemplateSizes():
def GetTemplateSetting(decode_cfg):
"""
Search for version, template, size and settings to be used depending on given binary config data
Search for version, size and settings to be used depending on given binary config data
@param decode_cfg:
binary config data (decrypted)
@return:
version, template, size, settings to use; None if version is invalid
version, size, settings to use; None if version is invalid
"""
version = 0x0
template = size = setting = None
try:
version = GetField(decode_cfg, 'version', Setting_6_2_1['version'], raw=True)
# search setting definition
template = None
setting = None
size = None
for cfg in Settings:
if version >= cfg[0]:
template = cfg
size = template[1]
setting = template[2]
break
except:
pass
size = setting = None
return version, template, size, setting
version = GetField(decode_cfg, 'version', Setting_6_2_1['version'], raw=True)
# search setting definition top-down
for cfg in sorted(Settings, key=lambda s: s[0], reverse=True):
if version >= cfg[0]:
version = cfg[0]
size = cfg[1]
setting = cfg[2].copy()
break
return version, size, setting
class LogType:
@ -944,6 +954,8 @@ def GetVersionStr(version):
@return:
version string
"""
if isinstance(version, (unicode,str)):
version = int(version, 0)
major = ((version>>24) & 0xff)
minor = ((version>>16) & 0xff)
release = ((version>> 8) & 0xff)
@ -1007,16 +1019,29 @@ def MakeFilename(filename, filetype, decode_cfg):
if 'hostname' in decode_cfg:
filename = filename.replace('@h', decode_cfg['hostname'] )
filename = MakeValidFilename(filename)
ext = ''
dirname = basename = ext = ''
try:
name, ext = os.path.splitext(filename)
dirname = os.path.normpath(os.path.dirname(filename))
basename = os.path.basename(filename)
name, ext = os.path.splitext(basename)
except:
pass
name = MakeValidFilename(name)
if len(ext) and ext[0]=='.':
ext = ext[1:]
if filetype is not None and args.extension and (len(ext)<2 or all(c.isdigit() for c in ext)):
filename += '.'+filetype.lower()
ext = filetype.lower()
if len(ext):
name_ext = name+'.'+ext
else:
name_ext = name
try:
filename = os.path.join(dirname, name_ext)
except:
pass
return filename
@ -1042,27 +1067,62 @@ def MakeUrl(host, port=80, location=''):
slocation=location )
def PullTasmotaConfig():
def LoadTasmotaConfig(filename):
"""
Pull config from Tasmota device/file
Load config from Tasmota file
@param filename:
filename to load
@return:
binary config data (encrypted) or None on error
"""
if args.device is not None:
# read config direct from device via http
encode_cfg = None
# read config from a file
if not os.path.isfile(filename): # check file exists
exit(ExitCode.FILE_NOT_FOUND, "File '{}' not found".format(filename),line=inspect.getlineno(inspect.currentframe()))
try:
tasmotafile = open(filename, "rb")
encode_cfg = tasmotafile.read()
tasmotafile.close()
except Exception, e:
exit(e[0], "'{}' {}".format(filename, e[1]),line=inspect.getlineno(inspect.currentframe()))
return encode_cfg
def PullTasmotaConfig(host, port, username=DEFAULTS['source']['username'], password=None):
"""
Pull config from Tasmota device
@param host:
hostname or IP of Tasmota device
@param port:
http port of Tasmota device
@param username:
optional username for Tasmota web login
@param password
optional password for Tasmota web login
@return:
binary config data (encrypted) or None on error
"""
encode_cfg = None
# read config direct from device via http
c = pycurl.Curl()
buffer = io.BytesIO()
c.setopt(c.WRITEDATA, buffer)
header = HTTPHeader()
c.setopt(c.HEADERFUNCTION, header.store)
c.setopt(c.FOLLOWLOCATION, True)
c.setopt(c.URL, MakeUrl(args.device, args.port, 'dl'))
if args.username is not None and args.password is not None:
c.setopt(c.URL, MakeUrl(host, port, 'dl'))
if username is not None and password is not None:
c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC)
c.setopt(c.USERPWD, args.username + ':' + args.password)
c.setopt(c.USERPWD, username + ':' + password)
c.setopt(c.VERBOSE, False)
responsecode = 200
@ -1079,21 +1139,11 @@ def PullTasmotaConfig():
exit(responsecode, 'HTTP result: {}'.format(header.response()),line=inspect.getlineno(inspect.currentframe()))
elif header.contenttype()!='application/octet-stream':
exit(ExitCode.DOWNLOAD_CONFIG_ERROR, "Device did not response properly, may be Tasmota webserver admin mode is disabled (WebServer 2)",line=inspect.getlineno(inspect.currentframe()))
encode_cfg = buffer.getvalue()
elif args.tasmotafile is not None:
# read config from a file
if not os.path.isfile(args.tasmotafile): # check file exists
exit(ExitCode.FILE_NOT_FOUND, "File '{}' not found".format(args.tasmotafile),line=inspect.getlineno(inspect.currentframe()))
try:
tasmotafile = open(args.tasmotafile, "rb")
encode_cfg = tasmotafile.read()
tasmotafile.close()
except Exception, e:
exit(e[0], "'{}' {}".format(args.tasmotafile, e[1]),line=inspect.getlineno(inspect.currentframe()))
else:
return None
encode_cfg = buffer.getvalue()
except:
pass
return encode_cfg
@ -1238,14 +1288,14 @@ def GetFieldDef(fielddef):
<format>, <baseaddr>, <bits>, <bitshift>, <datadef>, <convert>
undefined items can be None
"""
_format = baseaddr = datadef = convert = None
format = baseaddr = datadef = convert = None
bits = bitshift = 0
if len(fielddef)==3:
# def without convert tuple
_format, baseaddr, datadef = fielddef
format, baseaddr, datadef = fielddef
elif len(fielddef)==4:
# def with convert tuple
_format, baseaddr, datadef, convert = fielddef
format, baseaddr, datadef, convert = fielddef
if isinstance(baseaddr, (list,tuple)):
baseaddr, bits, bitshift = baseaddr
@ -1253,7 +1303,7 @@ def GetFieldDef(fielddef):
if isinstance(datadef, int):
# convert single int into list with one item
datadef = [datadef]
return _format, baseaddr, bits, bitshift, datadef, convert
return format, baseaddr, bits, bitshift, datadef, convert
def MakeFieldBaseAddr(baseaddr, bits, bitshift):
@ -1293,7 +1343,7 @@ def ConvertFieldValue(value, fielddef, read=True, raw=False):
@return:
(un)converted value
"""
_format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
# call password functions even if raw value should be processed
if callable(convert) and (convert==passwordread or convert==passwordwrite):
@ -1345,15 +1395,15 @@ def GetFieldMinMax(fielddef):
'f': (sys.float_info.min, sys.float_info.max),
'd': (sys.float_info.min, sys.float_info.max),
}
_format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
_min = 0
_max = 0
if _format[-1:] in minmax:
_min, _max = minmax[_format[-1:]]
elif _format[-1:] in ['s','p']:
if format[-1:] in minmax:
_min, _max = minmax[format[-1:]]
elif format[-1:] in ['s','p']:
# s and p may have a prefix as length
match = re.search("\s*(\d+)", _format)
match = re.search("\s*(\d+)", format)
if match:
_max=int(match.group(0))
return _min,_max
@ -1371,7 +1421,7 @@ def GetFieldLength(fielddef):
"""
length=0
_format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
if datadef is not None:
# datadef contains a list
@ -1388,10 +1438,10 @@ def GetFieldLength(fielddef):
length += GetFieldLength( (fielddef[0], fielddef[1], None) )
else:
if isinstance(_format, dict):
# -> iterate through _format
if isinstance(format, dict):
# -> iterate through format
addr = None
setting = _format
setting = format
for name in setting:
_dummy1, baseaddr, bits, bitshift, _dummy2, _dummy3 = GetFieldDef(setting[name])
_len = GetFieldLength(setting[name])
@ -1400,17 +1450,17 @@ def GetFieldLength(fielddef):
length += _len
else:
if _format[-1:] in ['b','B','c','?']:
if format[-1:] in ['b','B','c','?']:
length=1
elif _format[-1:] in ['h','H']:
elif format[-1:] in ['h','H']:
length=2
elif _format[-1:] in ['i','I','l','L','f']:
elif format[-1:] in ['i','I','l','L','f']:
length=4
elif _format[-1:] in ['q','Q','d']:
elif format[-1:] in ['q','Q','d']:
length=8
elif _format[-1:] in ['s','p']:
elif format[-1:] in ['s','p']:
# s and p may have a prefix as length
match = re.search("\s*(\d+)", _format)
match = re.search("\s*(\d+)", format)
if match:
length=int(match.group(0))
@ -1429,18 +1479,18 @@ def GetSubfieldDef(fielddef):
"""
subfielddef = None
_format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
if isinstance(datadef, list) and len(datadef)>1:
if len(fielddef)<4:
subfielddef = (_format, MakeFieldBaseAddr(baseaddr, bits, bitshift), datadef[1:])
subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), datadef[1:])
else:
subfielddef = (_format, MakeFieldBaseAddr(baseaddr, bits, bitshift), datadef[1:], convert)
subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), datadef[1:], convert)
# single array
else:
if len(fielddef)<4:
subfielddef = (_format, MakeFieldBaseAddr(baseaddr, bits, bitshift), None)
subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), None)
else:
subfielddef = (_format, MakeFieldBaseAddr(baseaddr, bits, bitshift), None, convert)
subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), None, convert)
return subfielddef
@ -1472,7 +1522,7 @@ def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0):
return result
# get field definition
_format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
# <datadef> contains a integer list
if isinstance(datadef, list):
@ -1486,19 +1536,19 @@ def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0):
offset += length
# <format> contains a dict
elif isinstance(_format, dict):
elif isinstance(format, dict):
config = {}
for name in _format: # -> iterate through _format
for name in format: # -> iterate through format
if name != 'raw' or args.jsonrawkeys:
config[name] = GetField(dobj, name, _format[name], raw=raw, addroffset=addroffset)
config[name] = GetField(dobj, name, format[name], raw=raw, addroffset=addroffset)
result = config
# a simple value
elif isinstance(_format, (str, bool, int, float, long)):
elif isinstance(format, (str, bool, int, float, long)):
if GetFieldLength(fielddef) != 0:
result = struct.unpack_from(_format, dobj, baseaddr+addroffset)[0]
result = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
if not _format[-1:].lower() in ['s','p']:
if not format[-1:].lower() in ['s','p']:
if bitshift>=0:
result >>= bitshift
else:
@ -1507,7 +1557,7 @@ def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0):
result &= (1<<bits)-1
# additional processing for strings
if _format[-1:].lower() in ['s','p']:
if format[-1:].lower() in ['s','p']:
# use left string until \0
s = str(result).split('\0')[0]
# remove character > 127
@ -1516,7 +1566,7 @@ def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0):
result = ConvertFieldValue(result, fielddef, read=True, raw=raw)
else:
exit(ExitCode.INTERNAL_ERROR, "Wrong mapping format definition: '{}'".format(_format), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe()))
exit(ExitCode.INTERNAL_ERROR, "Wrong mapping format definition: '{}'".format(format), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe()))
return result
@ -1538,7 +1588,7 @@ def SetField(dobj, fieldname, fielddef, restore, raw=False, addroffset=0, filena
@param restore
restore mapping with the new value(s)
"""
_format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef)
fieldname = str(fieldname)
if fieldname == 'raw' and not args.jsonrawkeys:
@ -1547,7 +1597,7 @@ def SetField(dobj, fieldname, fielddef, restore, raw=False, addroffset=0, filena
# do not write readonly values
if isinstance(convert, (list,tuple)) and len(convert)>1 and convert[1]==None:
if args.debug:
print >> sys.stderr, "SetField(): Readonly '{}' using '{}'/{}{} @{} skipped".format(fieldname, _format, datadef, bits, hex(baseaddr+addroffset))
print >> sys.stderr, "SetField(): Readonly '{}' using '{}'/{}{} @{} skipped".format(fieldname, format, datadef, bits, hex(baseaddr+addroffset))
return dobj
# <datadef> contains a list
@ -1569,41 +1619,46 @@ def SetField(dobj, fieldname, fielddef, restore, raw=False, addroffset=0, filena
offset += length
# <format> contains a dict
elif isinstance(_format, dict):
for name in _format: # -> iterate through _format
elif isinstance(format, dict):
for name in format: # -> iterate through format
if name in restore:
dobj = SetField(dobj, name, _format[name], restore[name], raw=raw, addroffset=addroffset, filename=filename)
dobj = SetField(dobj, name, format[name], restore[name], raw=raw, addroffset=addroffset, filename=filename)
# a simple value
elif isinstance(_format, (str, bool, int, float, long)):
elif isinstance(format, (str, bool, int, float, long)):
valid = True
err = "outside range"
err = ""
errformat = ""
_min, _max = GetFieldMinMax(fielddef)
value = _value = valid = None
value = _value = None
skip = False
# simple one value
if _format[-1:] in ['c']:
if format[-1:] in ['c']:
try:
value = ConvertFieldValue(restore.encode(STR_ENCODING)[0], fielddef, read=False, raw=raw)
except:
err = "valid range exceeding"
valid = False
# bool
elif _format[-1:] in ['?']:
elif format[-1:] in ['?']:
try:
value = ConvertFieldValue(bool(restore), fielddef, read=False, raw=raw)
except:
err = "valid range exceeding"
valid = False
# integer
elif _format[-1:] in ['b','B','h','H','i','I','l','L','q','Q','P']:
elif format[-1:] in ['b','B','h','H','i','I','l','L','q','Q','P']:
try:
value = ConvertFieldValue(restore, fielddef, read=False, raw=raw)
if isinstance(value, (str, unicode)):
value = int(value, 0)
else:
value = int(value)
# bits
# bit value
if bits!=0:
value = struct.unpack_from(_format, dobj, baseaddr+addroffset)[0]
value = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
bitvalue = int(restore)
mask = (1<<bits)-1
if bitvalue>mask:
@ -1611,6 +1666,7 @@ def SetField(dobj, fieldname, fielddef, restore, raw=False, addroffset=0, filena
_max = mask
_value = bitvalue
valid = False
err = "valid bit range exceeding"
else:
if bitshift>=0:
bitvalue <<= bitshift
@ -1620,46 +1676,69 @@ def SetField(dobj, fieldname, fielddef, restore, raw=False, addroffset=0, filena
mask >>= abs(bitshift)
value &= (0xffffffff ^ mask)
value |= bitvalue
# full size values
else:
_value = value
except:
valid = False
err = "valid range exceeding"
# float
elif _format[-1:] in ['f','d']:
elif format[-1:] in ['f','d']:
try:
value = ConvertFieldValue(float(restore), fielddef, read=False, raw=raw)
except:
err = "valid range exceeding"
valid = False
# string
elif _format[-1:] in ['s','p']:
elif format[-1:] in ['s','p']:
try:
value = ConvertFieldValue(restore.encode(STR_ENCODING), fielddef, read=False, raw=raw)
err = "string length exceeding"
if value is not None:
# be aware 0 byte at end of string (str must be < max, not <= max)
_max -= 1
valid = (len(value)>=_min) and (len(value)<=_max)
err = "string exceeds max length"
valid = _min <= len(value) < _max
else:
skip = True
valid = True
except:
valid = False
if value is None:
if value is None and not skip:
# None is an invalid value
valid = False
if valid is None:
valid = (value>=_min) and (value<=_max)
if valid is None and not skip:
# validate against object type size
valid = _min <= value <= _max
if not valid:
err = "type range exceeding"
errformat = " [{smin},{smax}]"
if _value is None:
# copy value before possible change below
_value = value
if isinstance(value, (str, unicode)):
_value = "'{}'".format(_value)
if valid:
if not skip:
if args.debug:
if bits:
sbits=" {} bits shift {}".format(bits, bitshift)
else:
sbits = ""
print >> sys.stderr, "SetField(): Set '{}' using '{}'/{}{} @{} to {}".format(fieldname, _format, datadef, sbits, hex(baseaddr+addroffset), _value)
struct.pack_into(_format, dobj, baseaddr+addroffset, value)
print >> sys.stderr, "SetField(): Set '{}' using '{}'/{}{} @{} to {}".format(fieldname, format, datadef, sbits, hex(baseaddr+addroffset), _value)
if fieldname != 'cfg_crc':
prevvalue = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
struct.pack_into(format, dobj, baseaddr+addroffset, value)
curvalue = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
if prevvalue != curvalue and args.verbose:
message("Value for '{}' changed from {} to {}".format(fieldname, prevvalue, curvalue), typ=LogType.INFO)
else:
exit(ExitCode.RESTORE_DATA_ERROR, "file '{sfile}', value for name '{sname}': {svalue} {serror} [{smin},{smax}]".format(sfile=filename, sname=fieldname, serror=err, svalue=_value, smin=_min, smax=_max), typ=LogType.WARNING, doexit=not args.ignorewarning)
sformat = "file '{sfile}' - {{'{sname}': {svalue}}} ({serror})"+errformat
exit(ExitCode.RESTORE_DATA_ERROR, sformat.format(sfile=filename, sname=fieldname, serror=err, svalue=_value, smin=_min, smax=_max), typ=LogType.WARNING, doexit=not args.ignorewarning)
return dobj
@ -1679,23 +1758,26 @@ def Bin2Mapping(decode_cfg, raw=True):
if isinstance(decode_cfg, bytearray):
decode_cfg = str(decode_cfg)
# get binary header and template to use
version, template, size, setting = GetTemplateSetting(decode_cfg)
# get binary header to use
version, size, setting = GetTemplateSetting(decode_cfg)
# if we did not found a mathching setting
if template is None:
if setting is None:
exit(ExitCode.UNSUPPORTED_VERSION, "Tasmota configuration version 0x{:x} not supported".format(version),line=inspect.getlineno(inspect.currentframe()))
if 'version' in setting:
cfg_version = GetField(decode_cfg, 'version', setting['version'], raw=True)
# check size if exists
if 'cfg_size' in setting:
cfg_size = GetField(decode_cfg, 'cfg_size', setting['cfg_size'], raw=True)
# read size should be same as definied in template
if cfg_size > size:
# may be processed
exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read does ot match - read {}, expected {} byte".format(cfg_size, template[1]), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe()))
exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read does ot match - read {}, expected {} byte".format(cfg_size, size), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe()))
elif cfg_size < size:
# less number of bytes can not be processed
exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read to small to process - read {}, expected {} byte".format(cfg_size, template[1]), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe()))
exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read to small to process - read {}, expected {} byte".format(cfg_size, size), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe()))
# check crc if exists
if 'cfg_crc' in setting:
@ -1719,15 +1801,13 @@ def Bin2Mapping(decode_cfg, raw=True):
'jsonrawkeys': args.jsonrawkeys,
'jsonhidepw': args.jsonhidepw,
},
'src': {
'crc': hex(cfg_crc),
'size': cfg_size,
'template': {
'version': hex(version),
'crc': hex(cfg_crc),
},
'data': {
'crc': hex(GetSettingsCrc(decode_cfg)),
'size': len(decode_cfg),
'version': hex(template[0]),
},
'script': {
'name': os.path.basename(__file__),
@ -1736,6 +1816,10 @@ def Bin2Mapping(decode_cfg, raw=True):
'os': (platform.machine(), platform.system(), platform.release(), platform.version(), platform.platform()),
'python': platform.python_version(),
}
if 'cfg_crc' in setting:
config['header']['template'].update({'size': cfg_size})
if 'version' in setting:
config['header']['data'].update({'version': hex(cfg_version)})
return config
@ -1759,12 +1843,12 @@ def Mapping2Bin(decode_cfg, jsonconfig, filename=""):
# get binary header data to use the correct version template from device
version, template, size, setting = GetTemplateSetting(decode_cfg)
version, size, setting = GetTemplateSetting(decode_cfg)
_buffer = bytearray()
_buffer.extend(decode_cfg)
if template is not None:
if setting is not None:
try:
raw = jsonconfig['header']['format']['jsonrawvalues']
except:
@ -1828,6 +1912,8 @@ def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configuration):
if backupfileformat.lower() == FileType.BIN.lower():
fileformat = "binary"
backup_filename = MakeFilename(backupfile, FileType.BIN, configuration)
if args.verbose:
message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO)
try:
backupfp = open(backup_filename, "wb")
magic = BINARYFILE_MAGIC
@ -1842,6 +1928,8 @@ def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configuration):
if backupfileformat.lower() == FileType.DMP.lower():
fileformat = "Tasmota"
backup_filename = MakeFilename(backupfile, FileType.DMP, configuration)
if args.verbose:
message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO)
try:
backupfp = open(backup_filename, "wb")
backupfp.write(encode_cfg)
@ -1854,6 +1942,8 @@ def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configuration):
elif backupfileformat.lower() == FileType.JSON.lower():
fileformat = "JSON"
backup_filename = MakeFilename(backupfile, FileType.JSON, configuration)
if args.verbose:
message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO)
try:
backupfp = open(backup_filename, "w")
json.dump(configuration, backupfp, sort_keys=args.jsonsort, indent=None if args.jsonindent<0 else args.jsonindent, separators=(',', ':') if args.jsoncompact else (', ', ': ') )
@ -1867,7 +1957,7 @@ def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configuration):
if args.tasmotafile is not None:
srctype = 'file'
src = args.tasmotafile
message("Backup successful from {} '{}' using {} format to file '{}' ".format(srctype, src, fileformat, backup_filename), typ=LogType.INFO)
message("Backup successful from {} '{}' to file '{}' ({} format)".format(srctype, src, backup_filename, fileformat), typ=LogType.INFO)
def Restore(restorefile, encode_cfg, decode_cfg, configuration):
@ -1888,6 +1978,8 @@ def Restore(restorefile, encode_cfg, decode_cfg, configuration):
filetype = GetFileType(restorefilename)
if filetype == FileType.DMP:
if args.verbose:
message("Reading restore file '{}' (Tasmota format)".format(restorefilename), typ=LogType.INFO)
try:
restorefp = open(restorefilename, "rb")
new_encode_cfg = restorefp.read()
@ -1896,6 +1988,8 @@ def Restore(restorefile, encode_cfg, decode_cfg, configuration):
exit(e[0], "'{}' {}".format(restorefilename, e[1]),line=inspect.getlineno(inspect.currentframe()))
elif filetype == FileType.BIN:
if args.verbose:
message("Reading restore file '{}' (binary format)".format(restorefilename), typ=LogType.INFO)
try:
restorefp = open(restorefilename, "rb")
restorebin = restorefp.read()
@ -1908,6 +2002,8 @@ def Restore(restorefile, encode_cfg, decode_cfg, configuration):
new_encode_cfg = DecryptEncrypt(decode_cfg) # process binary to binary config
elif filetype == FileType.JSON or filetype == FileType.INVALID_JSON:
if args.verbose:
message("Reading restore file '{}' (JSON format)".format(restorefilename), typ=LogType.INFO)
try:
restorefp = open(restorefilename, "r")
jsonconfig = json.load(restorefp)
@ -1932,6 +2028,8 @@ def Restore(restorefile, encode_cfg, decode_cfg, configuration):
if new_encode_cfg != encode_cfg or args.ignorewarning:
# write config direct to device via http
if args.device is not None:
if args.verbose:
message("Push new data to '{}' using restore file '{}'".format(args.device, restorefilename), typ=LogType.INFO)
error_code, error_str = PushTasmotaConfig(new_encode_cfg, args.device, args.port, args.username, args.password)
if error_code:
exit(ExitCode.UPLOAD_CONFIG_ERROR, "Config data upload failed - {}".format(error_str),line=inspect.getlineno(inspect.currentframe()))
@ -1941,6 +2039,8 @@ def Restore(restorefile, encode_cfg, decode_cfg, configuration):
# write config from a file
elif args.tasmotafile is not None:
if args.verbose:
message("Write new data to file '{}' using restore file '{}'".format(args.tasmotafile, restorefilename), typ=LogType.INFO)
try:
outputfile = open(args.tasmotafile, "wb")
outputfile.write(new_encode_cfg)
@ -1955,7 +2055,7 @@ def Restore(restorefile, encode_cfg, decode_cfg, configuration):
global exitcode
exitcode = ExitCode.RESTORE_SKIPPED
if args.verbose:
exit(exitcode, "Configuration data unchanged, upload skipped", typ=LogType.WARNING)
message("Configuration data leaving unchanged", typ=LogType.INFO)
def ParseArgs():
@ -2132,15 +2232,25 @@ if __name__ == "__main__":
if args.shorthelp:
ShortHelp()
# default no configuration available
encode_cfg = None
# check source args
if args.device is not None and args.tasmotafile is not None:
exit(ExitCode.ARGUMENT_ERROR, "Unable to select source, do not use -d and -f together",line=inspect.getlineno(inspect.currentframe()))
# pull config from Tasmota device/file
encode_cfg = PullTasmotaConfig()
# default no configuration available
encode_cfg = None
# pull config from Tasmota device
if args.tasmotafile is not None:
if args.verbose:
message("Load data from file '{}'".format(args.tasmotafile), typ=LogType.INFO)
encode_cfg = LoadTasmotaConfig(args.tasmotafile)
# load config from Tasmota file
if args.device is not None:
if args.verbose:
message("Load data from device '{}'".format(args.device), typ=LogType.INFO)
encode_cfg = PullTasmotaConfig(args.device, args.port, username=args.username, password=args.password)
if encode_cfg is None:
# no config source given
ShortHelp(False)
@ -2157,6 +2267,11 @@ if __name__ == "__main__":
# decode into mappings dictionary
configuration = Bin2Mapping(decode_cfg, args.jsonrawvalues)
if args.verbose and 'version' in configuration:
if args.tasmotafile is not None:
message("File '{}' contains data for Tasmota v{}".format(args.tasmotafile, GetVersionStr(configuration['version'])),typ=LogType.INFO)
else:
message("Device '{}' runs Tasmota v{}".format(args.device,GetVersionStr(configuration['version'])),typ=LogType.INFO)
# backup to file
if args.backupfile is not None: