#!/usr/bin/env python # -*- coding: utf-8 -*- VER = '2.3.0036' """ decode-config.py - Backup/Restore Tasmota configuration data Copyright (C) 2019 Norbert Richter This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . Requirements: - Python - pip install json pycurl urllib2 configargparse Instructions: Execute command with option -d to retrieve config data from a host or use -f to read a configuration file saved using Tasmota Web-UI For further information read 'decode-config.md' For help execute command with argument -h (or -H for advanced help) Usage: decode-config.py [-f ] [-d ] [-P ] [-u ] [-p ] [-i ] [-o ] [-t json|bin|dmp] [-E] [-e] [-F] [--json-indent ] [--json-compact] [--json-hide-pw] [--json-show-pw] [--cmnd-indent ] [--cmnd-groups] [--cmnd-nogroups] [--cmnd-sort] [--cmnd-unsort] [-c ] [-S] [-T json|cmnd|command] [-g {Control,Devices,Display,Domoticz,Internal,Knx,Light,Management,Mqtt,Power,Rules,Sensor,Serial,Setoption,Shutter,Sonoffrf,System,Timer,Wifi} [{Control,Devices,Display,Domoticz,Internal,Knx,Light,Management,Mqtt,Power,Rules,Sensor,Serial,Setoption,Shutter,Sonoffrf,System,Timer,Wifi} ...]] [--ignore-warnings] [-h] [-H] [-v] [-V] Backup/Restore Tasmota configuration data. Args that start with '--' (eg. -f) can also be set in a config file (specified via -c). Config file syntax allows: key=value, flag=true, stuff=[a,b,c] (for details, see syntax at https://goo.gl/R74nmi). If an arg is specified in more than one place, then commandline values override config file values which override defaults. Source: Read/Write Tasmota configuration from/to -f, --file, --tasmota-file file to retrieve/write Tasmota configuration from/to (default: None)' -d, --device, --host hostname or IP address to retrieve/send Tasmota configuration from/to (default: None) -P, --port TCP/IP port number to use for the host connection (default: 80) -u, --username host HTTP access username (default: admin) -p, --password host HTTP access password (default: None) Backup/Restore: Backup & restore specification -i, --restore-file file to restore configuration from (default: None). Replacements: @v=firmware version from config, @f=device friendly name from config, @h=device hostname from config, @H=device hostname from device (-d arg only) -o, --backup-file file to backup configuration to (default: None). Replacements: @v=firmware version from config, @f=device friendly name from config, @h=device hostname from config, @H=device hostname from device (-d arg only) -t, --backup-type json|bin|dmp backup filetype (default: 'json') -E, --extension append filetype extension for -i and -o filename (default) -e, --no-extension do not append filetype extension, use -i and -o filename as passed -F, --force-restore force restore even configuration is identical JSON output: JSON format specification --json-indent pretty-printed JSON output using indent level (default: 'None'). -1 disables indent. --json-compact compact JSON output by eliminate whitespace --json-hide-pw hide passwords --json-show-pw, --json-unhide-pw unhide passwords (default) Tasmota command output: Tasmota command output format specification --cmnd-indent Tasmota command grouping indent level (default: '2'). 0 disables indent --cmnd-groups group Tasmota commands (default) --cmnd-nogroups leave Tasmota commands ungrouped --cmnd-sort sort Tasmota commands (default) --cmnd-unsort leave Tasmota commands unsorted Common: Optional arguments -c, --config program config file - can be used to set default command args (default: None) -S, --output display output regardsless of backup/restore usage (default do not output on backup or restore usage) -T, --output-format json|cmnd|command display output format (default: 'json') -g, --group {Control,Devices,Display,Domoticz,Internal,Knx,Light,Management,Mqtt,Power,Rules,Sensor,Serial,Setoption,Shutter,Sonoffrf,System,Timer,Wifi} limit data processing to command groups (default no filter) --ignore-warnings do not exit on warnings. Not recommended, used by your own responsibility! Info: Extra information -h, --help show usage help message and exit -H, --full-help show full help message and exit -v, --verbose produce more output about what the program does -V, --version show program's version number and exit Either argument -d or -f must be given. Returns: 0: successful 1: restore skipped 2: program argument error 3: file not found 4: data size mismatch 5: data CRC error 6: unsupported configuration version 7: configuration file read error 8: JSON file decoding error 9: Restore file data error 10: Device data download error 11: Device data upload error 20: python module missing 21: Internal error >21: python library exit code 4xx, 5xx: HTTP errors """ class ExitCode: OK = 0 RESTORE_SKIPPED = 1 ARGUMENT_ERROR = 2 FILE_NOT_FOUND = 3 DATA_SIZE_MISMATCH = 4 DATA_CRC_ERROR = 5 UNSUPPORTED_VERSION = 6 FILE_READ_ERROR = 7 JSON_READ_ERROR = 8 RESTORE_DATA_ERROR = 9 DOWNLOAD_CONFIG_ERROR = 10 UPLOAD_CONFIG_ERROR = 11 MODULE_NOT_FOUND = 20 INTERNAL_ERROR = 21 # ====================================================================== # imports # ====================================================================== import os.path import io import sys, platform def ModuleImportError(module): er = str(module) print >> sys.stderr, "{}. Try 'pip install {}' to install it".format(er,er.split(' ')[len(er.split(' '))-1]) sys.exit(ExitCode.MODULE_NOT_FOUND) try: from datetime import datetime import time import copy import struct import socket import re import math import inspect import json import configargparse import pycurl import urllib2 except ImportError, e: ModuleImportError(e) # ====================================================================== # globals # ====================================================================== PROG='{} v{} by Norbert Richter '.format(os.path.basename(sys.argv[0]),VER) CONFIG_FILE_XOR = 0x5A BINARYFILE_MAGIC = 0x63576223 STR_ENCODING = 'utf8' HIDDEN_PASSWORD = '********' INTERNAL = 'Internal' DEFAULTS = { 'source': { 'device': None, 'port': 80, 'username': 'admin', 'password': None, 'tasmotafile': None, }, 'backup': { 'restorefile': None, 'backupfile': None, 'backupfileformat': 'json', 'extension': True, 'forcerestore': False, }, 'jsonformat': { 'jsonindent': None, 'jsoncompact': False, 'jsonsort': True, 'jsonhidepw': False, }, 'cmndformat': { 'cmndindent': 2, 'cmndgroup': True, 'cmndsort': True, }, 'common': { 'output': False, 'outputformat': 'json', 'configfile': None, 'ignorewarning':False, 'filter': None, }, } args = {} exitcode = 0 # ====================================================================== # Settings mapping # ====================================================================== """ Settings dictionary describes the config file fields definition: = { : } : "string" a python valid dictionary key (string) : ( , , [,] ) a tuple containing the following items: : | data type & format definition : defines the use of data at format is defined in 'struct module format string' see https://docs.python.org/2.7/library/struct.html#format-strings : A dictionary describes a (sub)setting dictonary and can recursively define another : | (, , ) address definition : The address (starting from 0) within binary config data. : number of bits used (positive integer) : bit shift : >= 0: shift the result right < 0: shift the result left : | (, [,cmd]) data definition : None | | [] | [ ,...] None: Single value, not an array : [] Defines a one-dimensional array of size [ ,...] Defines a one- or multi-dimensional array : value validation function : (, ) Tasmota command definition : command group string : | (,...) convert data into Tasmota command function : | (, ) read/write converter : None | Will be used in Bin2Mapping to convert values read from the binary data object into mapping dictionary None None indicates not read conversion to convert value from binary object to JSON. : None | False | Will be used in Mapping2Bin to convert values read from mapping dictionary before write to binary data object None None indicates not write conversion False False indicates the value is readonly and will not be written into the binary object. to convert value from JSON back to binary object Common definitions : | | None function to be called or string to evaluate: : A function name will be called with one or two parameter: The value to be processed (optional) the current array index (1,n) A string will be evaluate as is. The following placeholder can be used to replace it by runtime values: '$': will be replaced by the mapping name value '#': will be replace by array index (if any) '@': can be used as reference to other mapping values see definition below for examples : 'string' | "string" characters enclosed in ' or " : integer numbers in the range -2147483648 through 2147483647 : unsigned integer numbers in the range 0 through 4294967295 """ # ---------------------------------------------------------------------- # Settings helper # ---------------------------------------------------------------------- def passwordread(value): return HIDDEN_PASSWORD if args.jsonhidepw else value def passwordwrite(value): return None if value == HIDDEN_PASSWORD else value def bitsRead(x, n=0, c=1): """ Reads bit(s) of a number @param x: the number from which to read @param n: which bit position to read @param c: how many bits to read (1 if omitted) @return: the bit value(s) """ if isinstance(x,str): x = int(x, 0) if isinstance(x,str): n = int(n, 0) if n >= 0: x >>= n else: x <<= abs(n) if c>0: x &= (1<, , [,] 'cfg_holder': ('0 and bitsRead($,0,11)>(12*60) else "",time=time.strftime("%H:%M",time.gmtime((bitsRead($,0,11) if bitsRead($,29,2)==0 else bitsRead($,0,11) if bitsRead($,0,11)<=(12*60) else bitsRead($,0,11)-(12*60))*60)),window=bitsRead($,11,4),repeat=bitsRead($,15),days="{:07b}".format(bitsRead($,16,7))[::-1],device=bitsRead($,23,4)+1,power=bitsRead($,27,2) )')), ('"0x{:08x}".format($)', False) ), 'time': ('> sys.stderr, '{styp}{sdelimiter}{sstatus}{slineno}{scolon}{smgs}'.format(\ styp=type_ if type_ is not None else '', sdelimiter=' ' if status is not None and status > 0 and type_ is not None else '', sstatus=status if status is not None and status > 0 else '', scolon=': ' if type_ is not None or line is not None else '', smgs=msg, slineno=' (@{:04d})'.format(line) if line is not None else '') def exit(status=0, msg="end", type_=LogType.ERROR, src=None, doexit=True, line=None): """ Called when the program should be exit @param status: the exit status program returns to callert @param msg: the msg logged before exit @param type_: msg type: 'INFO', 'WARNING' or 'ERROR' @param doexit: True to exit program, otherwise return """ if src is not None: msg = '{} ({})'.format(src, msg) message(msg, type_=type_ if status!=ExitCode.OK else LogType.INFO, status=status, line=line) exitcode = status if doexit: sys.exit(exitcode) def ShortHelp(doexit=True): """ Show short help (usage) only - ued by own -h handling @param doexit: sys.exit with OK if True """ print parser.description print parser.print_usage() print print "For advanced help use '{prog} -H' or '{prog} --full-help'".format(prog=os.path.basename(sys.argv[0])) if doexit: sys.exit(ExitCode.OK) class HTTPHeader: """ pycurl helper class retrieving the request header """ def __init__(self): self.contents = '' def clear(self): self.contents = '' def store(self, _buffer): self.contents = "{}{}".format(self.contents, _buffer) def response(self): header = str(self.contents).split('\n') if len(header) > 0: return header[0].rstrip() return '' def contenttype(self): for item in str(self.contents).split('\n'): ditem = item.split(":") if ditem[0].strip().lower() == 'content-type' and len(ditem) > 1: return ditem[1].strip() return '' def __str__(self): return self.contents class CustomHelpFormatter(configargparse.HelpFormatter): """ Class for customizing the help output """ def _format_action_invocation(self, action): """ Reformat multiple metavar output -d , --device , --host to single output -d, --device, --host """ orgstr = configargparse.HelpFormatter._format_action_invocation(self, action) if orgstr and orgstr[0] != '-': # only optional arguments return orgstr res = getattr(action, '_formatted_action_invocation', None) if res: return res options = orgstr.split(', ') if len(options) <= 1: action._formatted_action_invocation = orgstr return orgstr return_list = [] for option in options: meta = "" arg = option.split(' ') if len(arg) > 1: meta = arg[1] return_list.append(arg[0]) if len(meta) > 0 and len(return_list) > 0: return_list[len(return_list)-1] += " "+meta action._formatted_action_invocation = ', '.join(return_list) return action._formatted_action_invocation # ====================================================================== # Tasmota config data handling # ====================================================================== def GetTemplateSizes(): """ Get all possible template sizes as list @param version: version number from read binary data to search for @return: template sizes as list [] """ sizes = [] for cfg in Settings: sizes.append(cfg[1]) # return unique sizes only (remove duplicates) return list(set(sizes)) def GetTemplateSetting(decode_cfg): """ Search for version, size and settings to be used depending on given binary config data @param decode_cfg: binary config data (decrypted) @return: version, size, settings to use; None if version is invalid """ version = 0x0 size = setting = None version = GetField(decode_cfg, 'version', Setting_6_2_1['version'], raw=True) # search setting definition top-down for cfg in sorted(Settings, key=lambda s: s[0], reverse=True): if version >= cfg[0]: size = cfg[1] setting = cfg[2] break return version, size, setting def GetGroupList(setting): """ Get all avilable group definition from setting @return: configargparse.parse_args() result """ groups = set() for name in setting: dev = setting[name] format_, group = GetFieldDef(dev, fields="format_, group") if group is not None and len(group) > 0: groups.add(group.title()) if isinstance(format_, dict): subgroups = GetGroupList(format_) if subgroups is not None and len(subgroups) > 0: for group in subgroups: groups.add(group.title()) groups=list(groups) groups.sort() return groups class FileType: FILE_NOT_FOUND = None DMP = 'dmp' JSON = 'json' BIN = 'bin' UNKNOWN = 'unknown' INCOMPLETE_JSON = 'incomplete json' INVALID_JSON = 'invalid json' INVALID_BIN = 'invalid bin' def GetFileType(filename): """ Get the FileType class member of a given filename @param filename: filename of the file to analyse @return: FileType class member """ filetype = FileType.UNKNOWN # try filename try: isfile = os.path.isfile(filename) try: with open(filename, "r") as f: try: # try reading as json inputjson = json.load(f) if 'header' in inputjson: filetype = FileType.JSON else: filetype = FileType.INCOMPLETE_JSON except ValueError: filetype = FileType.INVALID_JSON # not a valid json, get filesize and compare it with all possible sizes try: size = os.path.getsize(filename) except: filetype = FileType.UNKNOWN sizes = GetTemplateSizes() # size is one of a dmp file size if size in sizes: filetype = FileType.DMP elif (size - ((len(hex(BINARYFILE_MAGIC))-2)/2)) in sizes: # check if the binary file has the magic header with open(filename, "rb") as inputfile: inputbin = inputfile.read() if struct.unpack_from('>24) & 0xff) minor = ((version>>16) & 0xff) release = ((version>> 8) & 0xff) subrelease = (version & 0xff) if major >= 6: if subrelease > 0: subreleasestr = str(subrelease) else: subreleasestr = '' else: if subrelease > 0: subreleasestr = str(chr(subrelease+ord('a')-1)) else: subreleasestr = '' return "{:d}.{:d}.{:d}{}{}".format( major, minor, release, '.' if (major >= 6 and subreleasestr != '') else '', subreleasestr) def MakeFilename(filename, filetype, configmapping): """ Replace variables within a filename @param filename: original filename possible containing replacements: @v: Tasmota version from config data @f: friendlyname from config data @h: hostname from config data @H: hostname from device (-d arg only) @param filetype: FileType.x object - creates extension if not None @param configmapping: binary config data (decrypted) @return: New filename with replacements """ config_version = config_friendlyname = config_hostname = device_hostname = '' if 'version' in configmapping: config_version = GetVersionStr( int(str(configmapping['version']), 0) ) if 'friendlyname' in configmapping: config_friendlyname = re.sub('[^0-9a-zA-Z]','_', configmapping['friendlyname'][0]) if 'hostname' in configmapping: if configmapping['hostname'].find('%') < 0: config_hostname = re.sub('[^0-9a-zA-Z]','_', configmapping['hostname']) if filename.find('@H') >= 0 and args.device is not None: device_hostname = GetTasmotaHostname(args.device, args.port, username=args.username, password=args.password) if device_hostname is None: device_hostname = '' dirname = basename = ext = '' # split file parts dirname = os.path.normpath(os.path.dirname(filename)) basename = os.path.basename(filename) name, ext = os.path.splitext(basename) # make a valid filename try: name = name.decode('unicode-escape').translate(dict((ord(char), None) for char in '\/*?:"<>|')) except: pass name = str(name.replace(' ','_')) # append extension based on filetype if not given if len(ext) and ext[0]=='.': ext = ext[1:] if filetype is not None and args.extension and (len(ext)<2 or all(c.isdigit() for c in ext)): ext = filetype.lower() # join filename + extension if len(ext): name_ext = name+'.'+ext else: name_ext = name # join path and filename try: filename = os.path.join(dirname, name_ext) except: pass filename = filename.replace('@v', config_version) filename = filename.replace('@f', config_friendlyname ) filename = filename.replace('@h', config_hostname ) filename = filename.replace('@H', device_hostname ) return filename def MakeUrl(host, port=80, location=''): """ Create a Tasmota host url @param host: hostname or IP of Tasmota host @param port: port number to use for http connection @param location: http url location @return: Tasmota http url """ return "http://{shost}{sdelimiter}{sport}/{slocation}".format(\ shost=host, sdelimiter=':' if port != 80 else '', sport=port if port != 80 else '', slocation=location ) def LoadTasmotaConfig(filename): """ Load config from Tasmota file @param filename: filename to load @return: binary config data (encrypted) or None on error """ encode_cfg = None # read config from a file if not os.path.isfile(filename): # check file exists exit(ExitCode.FILE_NOT_FOUND, "File '{}' not found".format(filename),line=inspect.getlineno(inspect.currentframe())) try: with open(filename, "rb") as tasmotafile: encode_cfg = tasmotafile.read() except Exception, e: exit(e[0], "'{}' {}".format(filename, e[1]),line=inspect.getlineno(inspect.currentframe())) return encode_cfg def TasmotaGet(cmnd, host, port, username=DEFAULTS['source']['username'], password=None, contenttype = None): """ Tasmota http request @param host: hostname or IP of Tasmota device @param port: http port of Tasmota device @param username: optional username for Tasmota web login @param password optional password for Tasmota web login @return: binary config data (encrypted) or None on error """ body = None # read config direct from device via http c = pycurl.Curl() buffer = io.BytesIO() c.setopt(c.WRITEDATA, buffer) header = HTTPHeader() c.setopt(c.HEADERFUNCTION, header.store) c.setopt(c.FOLLOWLOCATION, True) c.setopt(c.URL, MakeUrl(host, port, cmnd)) if username is not None and password is not None: c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC) c.setopt(c.USERPWD, username + ':' + password) c.setopt(c.HTTPGET, True) c.setopt(c.VERBOSE, False) responsecode = 200 try: c.perform() responsecode = c.getinfo(c.RESPONSE_CODE) response = header.response() except Exception, e: exit(e[0], e[1],line=inspect.getlineno(inspect.currentframe())) finally: c.close() if responsecode >= 400: exit(responsecode, 'HTTP result: {}'.format(header.response()),line=inspect.getlineno(inspect.currentframe())) elif contenttype is not None and header.contenttype()!=contenttype: exit(ExitCode.DOWNLOAD_CONFIG_ERROR, "Device did not response properly, may be Tasmota webserver admin mode is disabled (WebServer 2)",line=inspect.getlineno(inspect.currentframe())) try: body = buffer.getvalue() except: pass return responsecode, body def GetTasmotaHostname(host, port, username=DEFAULTS['source']['username'], password=None): """ Get Tasmota hostname from device @param host: hostname or IP of Tasmota device @param port: http port of Tasmota device @param username: optional username for Tasmota web login @param password optional password for Tasmota web login @return: Tasmota real hostname or None on error """ hostname = None loginstr = "" if password is not None: loginstr = "user={}&password={}&".format(urllib2.quote(username), urllib2.quote(password)) # get hostname responsecode, body = TasmotaGet("cm?{}cmnd=status%205".format(loginstr), host, port, username=username, password=password) if body is not None: jsonbody = json.loads(body) if "StatusNET" in jsonbody and "Hostname" in jsonbody["StatusNET"]: hostname = jsonbody["StatusNET"]["Hostname"] if args.verbose: message("Hostname for '{}' retrieved: '{}'".format(host, hostname), type_=LogType.INFO) return hostname def PullTasmotaConfig(host, port, username=DEFAULTS['source']['username'], password=None): """ Pull config from Tasmota device @param host: hostname or IP of Tasmota device @param port: http port of Tasmota device @param username: optional username for Tasmota web login @param password optional password for Tasmota web login @return: binary config data (encrypted) or None on error """ responsecode, body = TasmotaGet('dl', host, port, username, password, contenttype='application/octet-stream') return body def PushTasmotaConfig(encode_cfg, host, port, username=DEFAULTS['source']['username'], password=None): """ Upload binary data to a Tasmota host using http @param encode_cfg: encrypted binary data or filename containing Tasmota encrypted binary config @param host: hostname or IP of Tasmota device @param port: http port of Tasmota device @param username: optional username for Tasmota web login @param password optional password for Tasmota web login @return errorcode, errorstring errorcode=0 if success, otherwise http response or exception code """ if isinstance(encode_cfg, bytearray): encode_cfg = str(encode_cfg) # get restore config page first to set internal Tasmota vars responsecode, body = TasmotaGet('rs?', host, port, username, password, contenttype='text/html') if body is None: return responsecode, "ERROR" # post data c = pycurl.Curl() header = HTTPHeader() buffer_ = io.BytesIO() c.setopt(c.HEADERFUNCTION, header.store) c.setopt(c.WRITEFUNCTION, lambda x: None) c.setopt(c.WRITEDATA, buffer_) c.setopt(c.POST, 1) c.setopt(c.URL, MakeUrl(host, port, 'u2')) if username is not None and password is not None: c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC) c.setopt(c.USERPWD, username + ':' + password) try: isfile = os.path.isfile(encode_cfg) except: isfile = False if isfile: c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, encode_cfg))]) else: # use as binary data c.setopt(c.HTTPPOST, [ ('fileupload', ( c.FORM_BUFFER, '{sprog}_v{sver}.dmp'.format(sprog=os.path.basename(sys.argv[0]), sver=VER), c.FORM_BUFFERPTR, encode_cfg )), ]) responsecode = 200 try: c.perform() responsecode = c.getinfo(c.RESPONSE_CODE) except Exception, e: return e[0], e[1] c.close() if responsecode >= 400: return responsecode, header.response() elif header.contenttype() != 'text/html': return ExitCode.UPLOAD_CONFIG_ERROR, "Device did not response properly, may be Tasmota webserver admin mode is disabled (WebServer 2)" body = buffer_.getvalue() findUpload = body.find("Upload") if findUpload < 0: return ExitCode.UPLOAD_CONFIG_ERROR, "Device did not response properly with upload result page" body = body[findUpload:] findSuccessful = body.find("Successful") if findSuccessful < 0: errmatch = re.search("(\S*)

(.*)
", body) reason = "Unknown error" if errmatch and len(errmatch.groups()) > 1: reason = errmatch.group(2) return ExitCode.UPLOAD_CONFIG_ERROR, reason return 0, 'OK' def DecryptEncrypt(obj): """ Decrpt/Encrypt binary config data @param obj: binary config data @return: decrypted configuration (if obj contains encrypted data) """ if isinstance(obj, bytearray): obj = str(obj) dobj = obj[0:2] for i in range(2, len(obj)): dobj += chr( (ord(obj[i]) ^ (CONFIG_FILE_XOR +i)) & 0xff ) return dobj def GetSettingsCrc(dobj): """ Return binary config data calclulated crc @param dobj: decrypted binary config data @return: 2 byte unsigned integer crc value """ if isinstance(dobj, bytearray): dobj = str(dobj) version, size, setting = GetTemplateSetting(dobj) if version < 0x06060007 or version > 0x0606000A: size = 3584 crc = 0 for i in range(0, size): if not i in [14,15]: # Skip crc byte = ord(dobj[i]) crc += byte * (i+1) return crc & 0xffff def GetSettingsCrc32(dobj): """ Return binary config data calclulated crc32 @param dobj: decrypted binary config data @return: 4 byte unsigned integer crc value """ if isinstance(dobj, bytearray): dobj = str(dobj) crc = 0 for i in range(0, len(dobj)-4): crc ^= ord(dobj[i]) for j in range(0, 8): crc = (crc >> 1) ^ (-int(crc & 1) & 0xEDB88320); return ~crc & 0xffffffff def GetFieldDef(fielddef, fields="format_, addrdef, baseaddr, bits, bitshift, datadef, arraydef, validate, cmd, group, tasmotacmnd, converter, readconverter, writeconverter"): """ Get field definition items @param fielddef: field format - see "Settings dictionary" above @param fields: comma separated string list of values to be returned possible values see fields default @return: set of values defined in """ format_ = addrdef = baseaddr = datadef = arraydef = validate = cmd = group = tasmotacmnd = converter = readconverter = writeconverter = None bits = bitshift = 0 # calling with nothing is wrong if fielddef is None: print >> sys.stderr, ' is None' raise SyntaxError(' error') # get top level items if len(fielddef) == 3: # converter not present format_, addrdef, datadef = fielddef elif len(fielddef) == 4: # converter present format_, addrdef, datadef, converter = fielddef else: print >> sys.stderr, 'wrong {} length ({}) in setting'.format(fielddef, len(fielddef)) raise SyntaxError(' error') # ignore calls with 'root' setting if isinstance(format_, dict) and baseaddr is None and datadef is None: return eval(fields) if not isinstance(format_, (unicode,str,dict)): print >> sys.stderr, 'wrong {} type {} in {}'.format(format_, type(format_), fielddef) raise SyntaxError(' error') # extract addrdef items baseaddr = addrdef if isinstance(baseaddr, (list,tuple)): if len(baseaddr) == 3: # baseaddr bit definition baseaddr, bits, bitshift = baseaddr if not isinstance(bits, int): print >> sys.stderr, ' must be defined as integer in {}'.format(bits, fielddef) raise SyntaxError(' error') if not isinstance(bitshift, int): print >> sys.stderr, ' must be defined as integer in {}'.format(bitshift, fielddef) raise SyntaxError(' error') else: print >> sys.stderr, 'wrong {} length ({}) in {}'.format(addrdef, len(addrdef), fielddef) raise SyntaxError(' error') if not isinstance(baseaddr, int): print >> sys.stderr, ' must be defined as integer in {}'.format(baseaddr, fielddef) raise SyntaxError(' error') # extract datadef items arraydef = datadef if isinstance(datadef, (tuple)): if len(datadef) == 2: # datadef has a validator arraydef, validate = datadef elif len(datadef) == 3: # datadef has a validator and cmd set arraydef, validate, cmd = datadef # cmd must be a tuple with 2 objects if isinstance(cmd, (tuple)) and len(cmd) == 2: group, tasmotacmnd = cmd if group is not None and not isinstance(group, (str, unicode)): print >> sys.stderr, 'wrong {} in {}'.format(group, fielddef) raise SyntaxError(' error') if tasmotacmnd is isinstance(tasmotacmnd, tuple): tasmotacmnds = tasmotacmnd for tasmotacmnd in tasmotacmnds: if tasmotacmnd is not None and not callable(tasmotacmnd) and not isinstance(tasmotacmnd, (str, unicode)): print >> sys.stderr, 'wrong {} in {}'.format(tasmotacmnd, fielddef) raise SyntaxError(' error') else: if tasmotacmnd is not None and not callable(tasmotacmnd) and not isinstance(tasmotacmnd, (str, unicode)): print >> sys.stderr, 'wrong {} in {}'.format(tasmotacmnd, fielddef) raise SyntaxError(' error') else: print >> sys.stderr, 'wrong {} length ({}) in {}'.format(cmd, len(cmd), fielddef) raise SyntaxError(' error') else: print >> sys.stderr, 'wrong {} length ({}) in {}'.format(datadef, len(datadef), fielddef) raise SyntaxError(' error') if validate is not None and (not isinstance(validate, (unicode,str)) and not callable(validate)): print >> sys.stderr, 'wrong {} type {} in {}'.format(validate, type(validate), fielddef) raise SyntaxError(' error') # convert single int into one-dimensional list if isinstance(arraydef, int): arraydef = [arraydef] if arraydef is not None and not isinstance(arraydef, (list)): print >> sys.stderr, 'wrong {} type {} in {}'.format(arraydef, type(arraydef), fielddef) raise SyntaxError(' error') # get read/write converter items readconverter = converter if isinstance(converter, (tuple)): if len(converter) == 2: # converter has read/write converter readconverter, writeconverter = converter if readconverter is not None and not isinstance(readconverter, (str,unicode)) and not callable(readconverter): print >> sys.stderr, 'wrong {} type {} in {}'.format(readconverter, type(readconverter), fielddef) raise SyntaxError(' error') if writeconverter is not None and (not isinstance(writeconverter, (bool,str,unicode)) and not callable(writeconverter)): print >> sys.stderr, 'wrong {} type {} in {}'.format(writeconverter, type(writeconverter), fielddef) raise SyntaxError(' error') else: print >> sys.stderr, 'wrong {} length ({}) in {}'.format(converter, len(converter), fielddef) raise SyntaxError(' error') return eval(fields) def ReadWriteConverter(value, fielddef, read=True, raw=False): """ Convert field value based on field desc @param value: original value @param fielddef field definition - see "Settings dictionary" above @param read use read conversion if True, otherwise use write conversion @param raw return raw values (True) or converted values (False) @return: (un)converted value """ converter, readconverter, writeconverter = GetFieldDef(fielddef, fields='converter, readconverter, writeconverter') # call password functions even if raw value should be processed if read and callable(readconverter) and readconverter == passwordread: raw = False if not read and callable(writeconverter) and writeconverter == passwordwrite: raw = False if not raw and converter is not None: conv = readconverter if read else writeconverter try: if isinstance(conv, str): # evaluate strings return eval(conv.replace('$','value')) elif callable(conv): # use as format function return conv(value) except Exception, e: exit(e[0], e[1], type_=LogType.WARNING, line=inspect.getlineno(inspect.currentframe())) return value def CmndConverter(valuemapping, value, idx, fielddef): """ Convert field value into Tasmota command if available @param valuemapping: data mapping @param value: original value @param fielddef field definition - see "Settings dictionary" above @return: converted value, list of values or None if unable to convert """ converter, readconverter, writeconverter, group, tasmotacmnd = GetFieldDef(fielddef, fields='converter, readconverter, writeconverter, group, tasmotacmnd') result = None if (callable(readconverter) and readconverter == passwordread) or (callable(writeconverter) and writeconverter == passwordwrite): if value == HIDDEN_PASSWORD: return None else: result = value if tasmotacmnd is not None and (callable(tasmotacmnd) or len(tasmotacmnd) > 0): if idx is not None: idx += 1 if isinstance(tasmotacmnd, str): # evaluate strings if idx is not None: evalstr = tasmotacmnd.replace('$','value').replace('#','idx').replace('@','valuemapping') else: evalstr = tasmotacmnd.replace('$','value').replace('@','valuemapping') result = eval(evalstr) elif callable(tasmotacmnd): # use as format function if idx is not None: result = tasmotacmnd(value, idx) else: result = tasmotacmnd(value) return result def ValidateValue(value, fielddef): """ Validate a value if validator is defined in fielddef @param value: original value @param fielddef field definition - see "Settings dictionary" above @return: True if value is valid, False if invalid """ validate = GetFieldDef(fielddef, fields='validate') if value == 0: # can not complete all validate condition # some Tasmota values are not allowed to be 0 on input # even though these values are set to 0 on Tasmota initial. # so we can't validate 0 values return True; valid = True try: if isinstance(validate, str): # evaluate strings valid = eval(validate.replace('$','value')) elif callable(validate): # use as format function valid = validate(value) except: valid = False return valid def GetFormatCount(format_): """ Get format prefix count @param format_: format specifier @return: prefix count or 1 if not specified """ if isinstance(format_, str): match = re.search("\s*(\d+)", format_) if match: return int(match.group(0)) return 1 def GetFormatType(format_): """ Get format type and bitsize without prefix @param format_: format specifier @return: (format_, 0) or (format without prefix, bitsize) """ formattype = format_ bitsize = 0 if isinstance(format_, str): match = re.search("\s*(\D+)", format_) if match: formattype = match.group(0) bitsize = struct.calcsize(formattype) * 8 return formattype, bitsize def GetFieldMinMax(fielddef): """ Get minimum, maximum of field based on field format definition @param fielddef: field format - see "Settings dictionary" above @return: min, max """ minmax = {'c': (0, 0xff), '?': (0, 1), 'b': (~0x7f, 0x7f), 'B': (0, 0xff), 'h': (~0x7fff, 0x7fff), 'H': (0, 0xffff), 'i': (~0x7fffffff, 0x7fffffff), 'I': (0, 0xffffffff), 'l': (~0x7fffffff, 0x7fffffff), 'L': (0, 0xffffffff), 'q': (~0x7fffffffffffffff, 0x7fffffffffffffff), 'Q': (0, 0x7fffffffffffffff), 'f': (sys.float_info.min, sys.float_info.max), 'd': (sys.float_info.min, sys.float_info.max), } format_ = GetFieldDef(fielddef, fields='format_') min_ = 0 max_ = 0 if format_[-1:] in minmax: min_, max_ = minmax[format_[-1:]] max_ *= GetFormatCount(format_) elif format_[-1:] in ['s','p']: # s and p may have a prefix as length max_ = GetFormatCount(format_) return min_,max_ def GetFieldLength(fielddef): """ Get length of a field in bytes based on field format definition @param fielddef: field format - see "Settings dictionary" above @return: length of field in bytes """ length=0 format_, addrdef, arraydef = GetFieldDef(fielddef, fields='format_, addrdef, arraydef') # contains a integer list if isinstance(arraydef, list) and len(arraydef) > 0: # arraydef contains a list # calc size recursive by sum of all elements for i in range(0, arraydef[0]): subfielddef = GetSubfieldDef(fielddef) if len(arraydef) > 1: length += GetFieldLength( (format_, addrdef, subfielddef) ) # single array else: length += GetFieldLength( (format_, addrdef, None) ) elif isinstance(format_, dict): # -> iterate through format addr = None setting = format_ for name in setting: baseaddr, bits, bitshift = GetFieldDef(setting[name], fields='baseaddr, bits, bitshift') _len = GetFieldLength(setting[name]) if addr != baseaddr: addr = baseaddr length += _len # a simple value elif isinstance(format_, str): length = struct.calcsize(format_) return length def GetSubfieldDef(fielddef): """ Get subfield definition from a given field definition @param fielddef: see Settings desc above @return: subfield definition """ format_, addrdef, datadef, arraydef, validate, cmd, converter = GetFieldDef(fielddef, fields='format_, addrdef, datadef, arraydef, validate, cmd, converter') # create new arraydef if len(arraydef) > 1: arraydef = arraydef[1:] else: arraydef = None # create new datadef if isinstance(datadef, tuple): if cmd is not None: datadef = (arraydef, validate, cmd) else: datadef = (arraydef, validate) else: datadef = arraydef # set new field def subfielddef = None if converter is not None: subfielddef = (format_, addrdef, datadef, converter) else: subfielddef = (format_, addrdef, datadef) return subfielddef def IsFilterGroup(group): """ Check if group is valid on filter @param grooup: group name to check @return: True if group is in filter, otherwise False """ if args.filter is not None: if group is None: return False if group == '*': return True if group.title() != INTERNAL.title() and group.title() not in (groupname.title() for groupname in args.filter): return False return True def GetFieldValue(fielddef, dobj, addr): """ Get single field value from definition @param fielddef: see Settings desc @param dobj: decrypted binary config data @param addr addr within dobj @return: value read from dobj """ format_, bits, bitshift = GetFieldDef(fielddef, fields='format_, bits, bitshift') value_ = 0 unpackedvalue = struct.unpack_from(format_, dobj, addr) singletype, bitsize = GetFormatType(format_) if not format_[-1:].lower() in ['s','p']: for val in unpackedvalue: value_ <<= bitsize value_ = value_ + val value_ = bitsRead(value_, bitshift, bits) else: value_ = unpackedvalue[0] s = str(value_).split('\0')[0] # use left string until \0 value_ = unicode(s, errors='ignore') # remove character > 127 return value_ def SetFieldValue(fielddef, dobj, addr, value): """ Set single field value from definition @param fielddef: see Settings desc @param dobj: decrypted binary config data @param addr addr within dobj @param value new value @return: new decrypted binary config data """ format_, bits, bitshift = GetFieldDef(fielddef, fields='format_, bits, bitshift') formatcnt = GetFormatCount(format_) singletype, bitsize = GetFormatType(format_) if args.debug >= 2: print >> sys.stderr, "SetFieldValue(): fielddef {}, addr 0x{:04x} value {} formatcnt {} singletype {} bitsize {} ".format(fielddef,addr,value,formatcnt,singletype,bitsize) if not format_[-1:].lower() in ['s','p']: addr += (bitsize / 8) * formatcnt for _ in range(0, formatcnt): addr -= (bitsize / 8) maxunsigned = ((2**bitsize) - 1) maxsigned = ((2**bitsize)>>1)-1 val = value & maxunsigned if isinstance(value,int) and value < 0 and val > maxsigned: val = ((maxunsigned+1)-val) * (-1) if args.debug >= 3: print >> sys.stderr, "SetFieldValue(): Single type - fielddef {}, addr 0x{:04x} value {} singletype {} bitsize {}".format(fielddef,addr,val,singletype,bitsize) try: struct.pack_into(singletype, dobj, addr, val) except struct.error as e: exit(ExitCode.RESTORE_DATA_ERROR, "Single type {} [fielddef={}, addr=0x{:04x}, value={}] - skipped!".format(e,fielddef,addr,val), type_=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) value >>= bitsize else: if args.debug >= 3: print >> sys.stderr, "SetFieldValue(): String type - fielddef {}, addr 0x{:04x} value {} format_ {}".format(fielddef,addr,value,format_) try: struct.pack_into(format_, dobj, addr, value) except struct.error as e: exit(ExitCode.RESTORE_DATA_ERROR, "String type {} [fielddef={}, addr=0x{:04x}, value={}} - skipped!".format(e,fielddef,addr,value), type_=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) return dobj def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0): """ Get field value from definition @param dobj: decrypted binary config data @param fieldname: name of the field @param fielddef: see Settings desc above @param raw return raw values (True) or converted values (False) @param addroffset use offset for baseaddr (used for recursive calls) @return: field mapping """ if isinstance(dobj, bytearray): dobj = str(dobj) valuemapping = None # get field definition format_, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd = GetFieldDef(fielddef, fields='format_, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd') # filter groups if not IsFilterGroup(group): return valuemapping # contains a integer list if isinstance(arraydef, list) and len(arraydef) > 0: valuemapping = [] offset = 0 for i in range(0, arraydef[0]): subfielddef = GetSubfieldDef(fielddef) length = GetFieldLength(subfielddef) if length != 0: value = GetField(dobj, fieldname, subfielddef, raw=raw, addroffset=addroffset+offset) valuemapping.append(value) offset += length # contains a dict elif isinstance(format_, dict): mapping_value = {} # -> iterate through format for name in format_: value = None value = GetField(dobj, name, format_[name], raw=raw, addroffset=addroffset) if value is not None: mapping_value[name] = value # copy complete returned mapping valuemapping = copy.deepcopy(mapping_value) # a simple value elif isinstance(format_, (str, bool, int, float, long)): if GetFieldLength(fielddef) != 0: valuemapping = ReadWriteConverter(GetFieldValue(fielddef, dobj, baseaddr+addroffset), fielddef, read=True, raw=raw) else: exit(ExitCode.INTERNAL_ERROR, "Wrong mapping format definition: '{}'".format(format_), type_=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) return valuemapping def SetField(dobj, fieldname, fielddef, restore, addroffset=0, filename=""): """ Get field value from definition @param dobj: decrypted binary config data @param fieldname: name of the field @param fielddef: see Settings desc above @param restore restore mapping with the new value(s) @param addroffset use offset for baseaddr (used for recursive calls) @param filename related filename (for messages only) @return: new decrypted binary config data """ format_, baseaddr, bits, bitshift, arraydef, group, writeconverter = GetFieldDef(fielddef, fields='format_, baseaddr, bits, bitshift, arraydef, group, writeconverter') # cast unicode fieldname = str(fieldname) # filter groups if not IsFilterGroup(group): return dobj # do not write readonly values if writeconverter is False: if args.debug >= 2: print >> sys.stderr, "SetField(): Readonly '{}' using '{}'/{}{} @{} skipped".format(fieldname, format_, arraydef, bits, hex(baseaddr+addroffset)) return dobj # contains a list if isinstance(arraydef, list) and len(arraydef) > 0: offset = 0 if len(restore) > arraydef[0]: exit(ExitCode.RESTORE_DATA_ERROR, "file '{sfile}', array '{sname}[{selem}]' exceeds max number of elements [{smax}]".format(sfile=filename, sname=fieldname, selem=len(restore), smax=arraydef[0]), type_=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) for i in range(0, arraydef[0]): subfielddef = GetSubfieldDef(fielddef) length = GetFieldLength(subfielddef) if length != 0: if i >= len(restore): # restore data list may be shorter than definition break subrestore = restore[i] dobj = SetField(dobj, fieldname, subfielddef, subrestore, addroffset=addroffset+offset, filename=filename) offset += length # contains a dict elif isinstance(format_, dict): for name in format_: # -> iterate through format if name in restore: dobj = SetField(dobj, name, format_[name], restore[name], addroffset=addroffset, filename=filename) # a simple value elif isinstance(format_, (str, bool, int, float, long)): valid = True err = "" errformat = "" min_, max_ = GetFieldMinMax(fielddef) value = _value = None skip = False # simple char value if format_[-1:] in ['c']: try: value = ReadWriteConverter(restore.encode(STR_ENCODING)[0], fielddef, read=False) except Exception, e: exit(e[0], e[1], type_=LogType.WARNING, line=inspect.getlineno(inspect.currentframe())) valid = False # bool elif format_[-1:] in ['?']: try: value = ReadWriteConverter(bool(restore), fielddef, read=False) except Exception, e: exit(e[0], e[1], type_=LogType.WARNING, line=inspect.getlineno(inspect.currentframe())) valid = False # integer elif format_[-1:] in ['b','B','h','H','i','I','l','L','q','Q','P']: value = ReadWriteConverter(restore, fielddef, read=False) if isinstance(value, (str, unicode)): value = int(value, 0) else: value = int(value) # bits if bits != 0: bitvalue = value value = struct.unpack_from(format_, dobj, baseaddr+addroffset)[0] # validate restore value valid = ValidateValue(bitvalue, fielddef) if not valid: err = "valid bit range exceeding" value = bitvalue else: mask = (1< mask: min_ = 0 max_ = mask _value = bitvalue valid = False else: if bitshift >= 0: bitvalue <<= bitshift mask <<= bitshift else: bitvalue >>= abs(bitshift) mask >>= abs(bitshift) v=value value &= (0xffffffff ^ mask) value |= bitvalue # full size values else: # validate restore function valid = ValidateValue(value, fielddef) if not valid: err = "valid range exceeding" _value = value # float elif format_[-1:] in ['f','d']: try: value = ReadWriteConverter(float(restore), fielddef, read=False) except: valid = False # string elif format_[-1:] in ['s','p']: value = ReadWriteConverter(restore.encode(STR_ENCODING), fielddef, read=False) err = "string length exceeding" if value is not None: max_ -= 1 valid = min_ <= len(value) <= max_ else: skip = True valid = True if value is None and not skip: # None is an invalid value valid = False if valid is None and not skip: # validate against object type size valid = min_ <= value <= max_ if not valid: err = "type range exceeding" errformat = " [{smin},{smax}]" if _value is None: # copy value before possible change below _value = value if isinstance(_value, (str, unicode)): _value = "'{}'".format(_value) if valid: if not skip: if args.debug >= 2: sbits = " {} bits shift {}".format(bits, bitshift) if bits else "" strvalue = "{} [{}]".format(_value, hex(value)) if isinstance(_value, int) else _value print >> sys.stderr, "SetField(): Set '{}' using '{}'/{}{} @{} to {}".format(fieldname, format_, arraydef, sbits, hex(baseaddr+addroffset), strvalue) if fieldname != 'cfg_crc' and fieldname != '_': prevvalue = GetFieldValue(fielddef, dobj, baseaddr+addroffset) dobj = SetFieldValue(fielddef, dobj, baseaddr+addroffset, value) curvalue = GetFieldValue(fielddef, dobj, baseaddr+addroffset) if prevvalue != curvalue and args.verbose: message("Value for '{}' changed from {} to {}".format(fieldname, prevvalue, curvalue), type_=LogType.INFO) else: if args.debug >= 2: print >> sys.stderr, "SetField(): Special field '{}' using '{}'/{}{} @{} skipped".format(fieldname, format_, arraydef, bits, hex(baseaddr+addroffset)) else: sformat = "file '{sfile}' - {{'{sname}': {svalue}}} ({serror})"+errformat exit(ExitCode.RESTORE_DATA_ERROR, sformat.format(sfile=filename, sname=fieldname, serror=err, svalue=_value, smin=min_, smax=max_), type_=LogType.WARNING, doexit=not args.ignorewarning) return dobj def SetCmnd(cmnds, fieldname, fielddef, valuemapping, mappedvalue, addroffset=0, idx=None): """ Get field value from definition @param cmnds: Tasmota command mapping: { 'group': ['cmnd' <,'cmnd'...>] ... } @param fieldname: name of the field @param fielddef: see Settings desc above @param valuemapping: data mapping @param mappedvalue mappedvalue mapping with the new value(s) @param addroffset use offset for baseaddr (used for recursive calls) @param idx optional array index @return: new Tasmota command mapping """ format_, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd, writeconverter = GetFieldDef(fielddef, fields='format_, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd, writeconverter') # cast unicode fieldname = str(fieldname) # filter groups if not IsFilterGroup(group): return cmnds # contains a list if isinstance(arraydef, list) and len(arraydef) > 0: offset = 0 if len(mappedvalue) > arraydef[0]: exit(ExitCode.RESTORE_DATA_ERROR, "array '{sname}[{selem}]' exceeds max number of elements [{smax}]".format(sname=fieldname, selem=len(mappedvalue), smax=arraydef[0]), type_=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) for i in range(0, arraydef[0]): subfielddef = GetSubfieldDef(fielddef) length = GetFieldLength(subfielddef) if length != 0: if i >= len(mappedvalue): # mappedvalue data list may be shorter than definition break subrestore = mappedvalue[i] cmnds = SetCmnd(cmnds, fieldname, subfielddef, valuemapping, subrestore, addroffset=addroffset+offset, idx=i) offset += length # contains a dict elif isinstance(format_, dict): for name in format_: # -> iterate through format if name in mappedvalue: cmnds = SetCmnd(cmnds, name, format_[name], valuemapping, mappedvalue[name], addroffset=addroffset, idx=idx) # a simple value elif isinstance(format_, (str, bool, int, float, long)): if group is not None: group = group.title(); if isinstance(tasmotacmnd, tuple): tasmotacmnds = tasmotacmnd for tasmotacmnd in tasmotacmnds: cmnd = CmndConverter(valuemapping, mappedvalue, idx, fielddef) if group is not None and cmnd is not None: if group not in cmnds: cmnds[group] = [] if isinstance(cmnd, list): for c in cmnd: cmnds[group].append(c) else: cmnds[group].append(cmnd) else: cmnd = CmndConverter(valuemapping, mappedvalue, idx, fielddef) if group is not None and cmnd is not None: if group not in cmnds: cmnds[group] = [] if isinstance(cmnd, list): for c in cmnd: cmnds[group].append(c) else: cmnds[group].append(cmnd) return cmnds def Bin2Mapping(decode_cfg): """ Decodes binary data stream into pyhton mappings dict @param decode_cfg: binary config data (decrypted) @return: valuemapping data as mapping dictionary """ if isinstance(decode_cfg, bytearray): decode_cfg = str(decode_cfg) # get binary header and template to use version, size, setting = GetTemplateSetting(decode_cfg) # if we did not found a mathching setting if setting is None: exit(ExitCode.UNSUPPORTED_VERSION, "Tasmota configuration version {} not supported".format(version),line=inspect.getlineno(inspect.currentframe())) if 'version' in setting: cfg_version = GetField(decode_cfg, 'version', setting['version'], raw=True) # check size if exists if 'cfg_size' in setting: cfg_size = GetField(decode_cfg, 'cfg_size', setting['cfg_size'], raw=True) # read size should be same as definied in setting if cfg_size > size: # may be processed exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read does ot match - read {}, expected {} byte".format(cfg_size, size), type_=LogType.ERROR,line=inspect.getlineno(inspect.currentframe())) elif cfg_size < size: # less number of bytes can not be processed exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read to small to process - read {}, expected {} byte".format(cfg_size, size), type_=LogType.ERROR,line=inspect.getlineno(inspect.currentframe())) # check crc if exists if 'cfg_crc' in setting: cfg_crc = GetField(decode_cfg, 'cfg_crc', setting['cfg_crc'], raw=True) else: cfg_crc = GetSettingsCrc(decode_cfg) if 'cfg_crc32' in setting: cfg_crc32 = GetField(decode_cfg, 'cfg_crc32', setting['cfg_crc32'], raw=True) else: cfg_crc32 = GetSettingsCrc32(decode_cfg) if version < 0x0606000B: if cfg_crc != GetSettingsCrc(decode_cfg): exit(ExitCode.DATA_CRC_ERROR, 'Data CRC error, read 0x{:4x} should be 0x{:4x}'.format(cfg_crc, GetSettingsCrc(decode_cfg)), type_=LogType.WARNING, doexit=not args.ignorewarning,line=inspect.getlineno(inspect.currentframe())) else: if cfg_crc32 != GetSettingsCrc32(decode_cfg): exit(ExitCode.DATA_CRC_ERROR, 'Data CRC32 error, read 0x{:8x} should be 0x{:8x}'.format(cfg_crc32, GetSettingsCrc32(decode_cfg)), type_=LogType.WARNING, doexit=not args.ignorewarning,line=inspect.getlineno(inspect.currentframe())) # get valuemapping valuemapping = GetField(decode_cfg, None, (setting,0,(None, None, (INTERNAL, None)))) # add header info timestamp = datetime.now() valuemapping['header'] = { 'timestamp':timestamp.strftime("%Y-%m-%d %H:%M:%S"), 'format': { 'jsonindent': args.jsonindent, 'jsoncompact': args.jsoncompact, 'jsonsort': args.jsonsort, 'jsonhidepw': args.jsonhidepw, }, 'template': { 'version': hex(version), 'crc': hex(cfg_crc), }, 'data': { 'crc': hex(GetSettingsCrc(decode_cfg)), 'size': len(decode_cfg), }, 'script': { 'name': os.path.basename(__file__), 'version': VER, }, 'os': (platform.machine(), platform.system(), platform.release(), platform.version(), platform.platform()), 'python': platform.python_version(), } if 'cfg_crc' in setting: valuemapping['header']['template'].update({'size': cfg_size}) if 'cfg_crc32' in setting: valuemapping['header']['template'].update({'crc32': hex(cfg_crc32)}) valuemapping['header']['data'].update({'crc32': hex(GetSettingsCrc32(decode_cfg))}) if 'version' in setting: valuemapping['header']['data'].update({'version': hex(cfg_version)}) return valuemapping def Mapping2Bin(decode_cfg, jsonconfig, filename=""): """ Encodes into binary data stream @param decode_cfg: binary config data (decrypted) @param jsonconfig: restore data mapping @param filename: name of the restore file (for error output only) @return: changed binary config data (decrypted) or None on error """ if isinstance(decode_cfg, str): decode_cfg = bytearray(decode_cfg) # get binary header data to use the correct version template from device version, size, setting = GetTemplateSetting(decode_cfg) # make empty binarray array _buffer = bytearray() # add data _buffer.extend(decode_cfg) if setting is not None: # iterate through restore data mapping for name in jsonconfig: # key must exist in both dict if name in setting: SetField(_buffer, name, setting[name], jsonconfig[name], addroffset=0, filename=filename) else: if name != 'header': exit(ExitCode.RESTORE_DATA_ERROR, "Restore file '{}' contains obsolete name '{}', skipped".format(filename, name), type_=LogType.WARNING, doexit=not args.ignorewarning) if 'cfg_crc' in setting: crc = GetSettingsCrc(_buffer) struct.pack_into(setting['cfg_crc'][0], _buffer, setting['cfg_crc'][1], crc) if 'cfg_crc32' in setting: crc32 = GetSettingsCrc32(_buffer) struct.pack_into(setting['cfg_crc32'][0], _buffer, setting['cfg_crc32'][1], crc32) return _buffer else: exit(ExitCode.UNSUPPORTED_VERSION,"File '{}', Tasmota configuration version 0x{:x} not supported".format(filename, version), type_=LogType.WARNING, doexit=not args.ignorewarning) return None def Mapping2Cmnd(decode_cfg, valuemapping, filename=""): """ Encodes mapping data into Tasmota command mapping @param decode_cfg: binary config data (decrypted) @param valuemapping: data mapping @param filename: name of the restore file (for error output only) @return: Tasmota command mapping {group: [cmnd <,cmnd <,...>>]} """ if isinstance(decode_cfg, str): decode_cfg = bytearray(decode_cfg) # get binary header data to use the correct version template from device version, size, setting = GetTemplateSetting(decode_cfg) cmnds = {} if setting is not None: # iterate through restore data mapping for name in valuemapping: # key must exist in both dict if name in setting: cmnds = SetCmnd(cmnds, name, setting[name], valuemapping, valuemapping[name], addroffset=0) else: if name != 'header': exit(ExitCode.RESTORE_DATA_ERROR, "Restore file '{}' contains obsolete name '{}', skipped".format(filename, name), type_=LogType.WARNING, doexit=not args.ignorewarning) return cmnds else: exit(ExitCode.UNSUPPORTED_VERSION,"File '{}', Tasmota configuration version 0x{:x} not supported".format(filename, version), type_=LogType.WARNING, doexit=not args.ignorewarning) return None def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configmapping): """ Create backup file @param backupfile: Raw backup filename from program args @param backupfileformat: Backup file format @param encode_cfg: binary config data (encrypted) @param decode_cfg: binary config data (decrypted) @param configmapping: config data mapppings """ name, ext = os.path.splitext(backupfile) if ext.lower() == '.'+FileType.BIN.lower(): backupfileformat = FileType.BIN elif ext.lower() == '.'+FileType.DMP.lower(): backupfileformat = FileType.DMP elif ext.lower() == '.'+FileType.JSON.lower(): backupfileformat = FileType.JSON fileformat = "" # Tasmota format if backupfileformat.lower() == FileType.DMP.lower(): fileformat = "Tasmota" backup_filename = MakeFilename(backupfile, FileType.DMP, configmapping) if args.verbose: message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), type_=LogType.INFO) try: with open(backup_filename, "wb") as backupfp: backupfp.write(encode_cfg) except Exception, e: exit(e[0], "'{}' {}".format(backup_filename, e[1]),line=inspect.getlineno(inspect.currentframe())) # binary format elif backupfileformat.lower() == FileType.BIN.lower(): fileformat = "binary" backup_filename = MakeFilename(backupfile, FileType.BIN, configmapping) if args.verbose: message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), type_=LogType.INFO) try: with open(backup_filename, "wb") as backupfp: backupfp.write(struct.pack('>]} """ def OutputTasmotaSubCmnds(cmnds): if args.cmndsort: for cmnd in sorted(cmnds, key = lambda cmnd:[int(c) if c.isdigit() else c for c in re.split('(\d+)', cmnd)]): print "{}{}".format(" "*args.cmndindent, cmnd) else: for cmnd in cmnds: print "{}{}".format(" "*args.cmndindent, cmnd) groups = GetGroupList(Settings[0][2]) if args.cmndgroup: for group in groups: if group.title() in (groupname.title() for groupname in tasmotacmnds): cmnds = tasmotacmnds[group] print print "# {}:".format(group) OutputTasmotaSubCmnds(cmnds) else: cmnds = [] for group in groups: if group.title() in (groupname.title() for groupname in tasmotacmnds): cmnds.extend(tasmotacmnds[group]) OutputTasmotaSubCmnds(cmnds) def ParseArgs(): """ Program argument parser @return: configargparse.parse_args() result """ global parser parser = configargparse.ArgumentParser(description='Backup/Restore Tasmota configuration data.', epilog='Either argument -d or -f must be given.', add_help=False, formatter_class=lambda prog: CustomHelpFormatter(prog)) source = parser.add_argument_group('Source', 'Read/Write Tasmota configuration from/to') source.add_argument('-f', '--file', '--tasmota-file', metavar='', dest='tasmotafile', default=DEFAULTS['source']['tasmotafile'], help="file to retrieve/write Tasmota configuration from/to (default: {})'".format(DEFAULTS['source']['tasmotafile'])) source.add_argument('-d', '--device', '--host', metavar='', dest='device', default=DEFAULTS['source']['device'], help="hostname or IP address to retrieve/send Tasmota configuration from/to (default: {})".format(DEFAULTS['source']['device']) ) source.add_argument('-P', '--port', metavar='', dest='port', default=DEFAULTS['source']['port'], help="TCP/IP port number to use for the host connection (default: {})".format(DEFAULTS['source']['port']) ) source.add_argument('-u', '--username', metavar='', dest='username', default=DEFAULTS['source']['username'], help="host HTTP access username (default: {})".format(DEFAULTS['source']['username'])) source.add_argument('-p', '--password', metavar='', dest='password', default=DEFAULTS['source']['password'], help="host HTTP access password (default: {})".format(DEFAULTS['source']['password'])) backup = parser.add_argument_group('Backup/Restore', 'Backup & restore specification') backup.add_argument('-i', '--restore-file', metavar='', dest='restorefile', default=DEFAULTS['backup']['backupfile'], help="file to restore configuration from (default: {}). Replacements: @v=firmware version from config, @f=device friendly name from config, @h=device hostname from config, @H=device hostname from device (-d arg only)".format(DEFAULTS['backup']['restorefile'])) backup.add_argument('-o', '--backup-file', metavar='', dest='backupfile', default=DEFAULTS['backup']['backupfile'], help="file to backup configuration to (default: {}). Replacements: @v=firmware version from config, @f=device friendly name from config, @h=device hostname from config, @H=device hostname from device (-d arg only)".format(DEFAULTS['backup']['backupfile'])) backup_file_formats = ['json', 'bin', 'dmp'] backup.add_argument('-t', '--backup-type', metavar='|'.join(backup_file_formats), dest='backupfileformat', choices=backup_file_formats, default=DEFAULTS['backup']['backupfileformat'], help="backup filetype (default: '{}')".format(DEFAULTS['backup']['backupfileformat']) ) backup.add_argument('-E', '--extension', dest='extension', action='store_true', default=DEFAULTS['backup']['extension'], help="append filetype extension for -i and -o filename{}".format(' (default)' if DEFAULTS['backup']['extension'] else '') ) backup.add_argument('-e', '--no-extension', dest='extension', action='store_false', default=DEFAULTS['backup']['extension'], help="do not append filetype extension, use -i and -o filename as passed{}".format(' (default)' if not DEFAULTS['backup']['extension'] else '') ) backup.add_argument('-F', '--force-restore', dest='forcerestore', action='store_true', default=DEFAULTS['backup']['forcerestore'], help="force restore even configuration is identical{}".format(' (default)' if DEFAULTS['backup']['forcerestore'] else '') ) jsonformat = parser.add_argument_group('JSON output', 'JSON format specification') jsonformat.add_argument('--json-indent', metavar='', dest='jsonindent', type=int, default=DEFAULTS['jsonformat']['jsonindent'], help="pretty-printed JSON output using indent level (default: '{}'). -1 disables indent.".format(DEFAULTS['jsonformat']['jsonindent']) ) jsonformat.add_argument('--json-compact', dest='jsoncompact', action='store_true', default=DEFAULTS['jsonformat']['jsoncompact'], help="compact JSON output by eliminate whitespace{}".format(' (default)' if DEFAULTS['jsonformat']['jsoncompact'] else '') ) jsonformat.add_argument('--json-sort', dest='jsonsort', action='store_true', default=DEFAULTS['jsonformat']['jsonsort'], help=configargparse.SUPPRESS) #"sort json keywords{}".format(' (default)' if DEFAULTS['jsonformat']['jsonsort'] else '') ) jsonformat.add_argument('--json-unsort', dest='jsonsort', action='store_false', default=DEFAULTS['jsonformat']['jsonsort'], help=configargparse.SUPPRESS) #"do not sort json keywords{}".format(' (default)' if not DEFAULTS['jsonformat']['jsonsort'] else '') ) jsonformat.add_argument('--json-hide-pw', dest='jsonhidepw', action='store_true', default=DEFAULTS['jsonformat']['jsonhidepw'], help="hide passwords{}".format(' (default)' if DEFAULTS['jsonformat']['jsonhidepw'] else '') ) jsonformat.add_argument('--json-show-pw', '--json-unhide-pw', dest='jsonhidepw', action='store_false', default=DEFAULTS['jsonformat']['jsonhidepw'], help="unhide passwords{}".format(' (default)' if not DEFAULTS['jsonformat']['jsonhidepw'] else '') ) cmndformat = parser.add_argument_group('Tasmota command output', 'Tasmota command output format specification') cmndformat.add_argument('--cmnd-indent', metavar='', dest='cmndindent', type=int, default=DEFAULTS['cmndformat']['cmndindent'], help="Tasmota command grouping indent level (default: '{}'). 0 disables indent".format(DEFAULTS['cmndformat']['cmndindent']) ) cmndformat.add_argument('--cmnd-groups', dest='cmndgroup', action='store_true', default=DEFAULTS['cmndformat']['cmndgroup'], help="group Tasmota commands{}".format(' (default)' if DEFAULTS['cmndformat']['cmndgroup'] else '') ) cmndformat.add_argument('--cmnd-nogroups', dest='cmndgroup', action='store_false', default=DEFAULTS['cmndformat']['cmndgroup'], help="leave Tasmota commands ungrouped{}".format(' (default)' if not DEFAULTS['cmndformat']['cmndgroup'] else '') ) cmndformat.add_argument('--cmnd-sort', dest='cmndsort', action='store_true', default=DEFAULTS['cmndformat']['cmndsort'], help="sort Tasmota commands{}".format(' (default)' if DEFAULTS['cmndformat']['cmndsort'] else '') ) cmndformat.add_argument('--cmnd-unsort', dest='cmndsort', action='store_false', default=DEFAULTS['cmndformat']['cmndsort'], help="leave Tasmota commands unsorted{}".format(' (default)' if not DEFAULTS['cmndformat']['cmndsort'] else '') ) common = parser.add_argument_group('Common', 'Optional arguments') common.add_argument('-c', '--config', metavar='', dest='configfile', default=DEFAULTS['common']['configfile'], is_config_file=True, help="program config file - can be used to set default command args (default: {})".format(DEFAULTS['common']['configfile']) ) common.add_argument('-S', '--output', dest='output', action='store_true', default=DEFAULTS['common']['output'], help="display output regardsless of backup/restore usage{}".format(" (default)" if DEFAULTS['common']['output'] else " (default do not output on backup or restore usage)") ) output_formats = ['json', 'cmnd','command'] common.add_argument('-T', '--output-format', metavar='|'.join(output_formats), dest='outputformat', choices=output_formats, default=DEFAULTS['common']['outputformat'], help="display output format (default: '{}')".format(DEFAULTS['common']['outputformat']) ) groups = GetGroupList(Settings[0][2]) if '*' in groups: groups.remove('*') common.add_argument('-g', '--group', dest='filter', choices=groups, nargs='+', type=lambda s : s.title(), default=DEFAULTS['common']['filter'], help="limit data processing to command groups (default {})".format("no filter" if DEFAULTS['common']['filter'] == None else DEFAULTS['common']['filter']) ) common.add_argument('--ignore-warnings', dest='ignorewarning', action='store_true', default=DEFAULTS['common']['ignorewarning'], help="do not exit on warnings{}. Not recommended, used by your own responsibility!".format(' (default)' if DEFAULTS['common']['ignorewarning'] else '') ) info = parser.add_argument_group('Info','Extra information') info.add_argument('-D', '--debug', dest='debug', action='count', help=configargparse.SUPPRESS) info.add_argument('-h', '--help', dest='shorthelp', action='store_true', help='show usage help message and exit') info.add_argument("-H", "--full-help", action="help", help="show full help message and exit") info.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='produce more output about what the program does') info.add_argument('-V', '--version', action='version', version=PROG) args = parser.parse_args() if args.debug >= 1: print >> sys.stderr, parser.format_values() print >> sys.stderr, "Settings:" for k in args.__dict__: print >> sys.stderr, " "+str(k), "= ",eval('args.{}'.format(k)) return args if __name__ == "__main__": args = ParseArgs() if args.shorthelp: ShortHelp() # check source args if args.device is not None and args.tasmotafile is not None: exit(ExitCode.ARGUMENT_ERROR, "Unable to select source, do not use -d and -f together",line=inspect.getlineno(inspect.currentframe())) # default no configuration available encode_cfg = None # pull config from Tasmota device if args.tasmotafile is not None: if args.verbose: message("Load data from file '{}'".format(args.tasmotafile), type_=LogType.INFO) encode_cfg = LoadTasmotaConfig(args.tasmotafile) # load config from Tasmota file if args.device is not None: if args.verbose: message("Load data from device '{}'".format(args.device), type_=LogType.INFO) encode_cfg = PullTasmotaConfig(args.device, args.port, username=args.username, password=args.password) if encode_cfg is None: # no config source given ShortHelp(False) print print parser.epilog sys.exit(ExitCode.OK) if len(encode_cfg) == 0: exit(ExitCode.FILE_READ_ERROR, "Unable to read configuration data from {} '{}'".format('device' if args.device is not None else 'file', \ args.device if args.device is not None else args.tasmotafile) \ ,line=inspect.getlineno(inspect.currentframe()) ) # decrypt Tasmota config decode_cfg = DecryptEncrypt(encode_cfg) # decode into mappings dictionary configmapping = Bin2Mapping(decode_cfg) if args.verbose and 'version' in configmapping: message("{} '{}' is using Tasmota {}".format('File' if args.tasmotafile is not None else 'Device', args.tasmotafile if args.tasmotafile is not None else args.device, GetVersionStr(configmapping['version'])), type_=LogType.INFO) # backup to file if args.backupfile is not None: Backup(args.backupfile, args.backupfileformat, encode_cfg, decode_cfg, configmapping) # restore from file if args.restorefile is not None: Restore(args.restorefile, args.backupfileformat, encode_cfg, decode_cfg, configmapping) # json screen output if (args.backupfile is None and args.restorefile is None) or args.output: if args.outputformat == 'json': print json.dumps(configmapping, sort_keys=args.jsonsort, indent=None if args.jsonindent<0 else args.jsonindent, separators=(',', ':') if args.jsoncompact else (', ', ': ') ) if args.outputformat == 'cmnd' or args.outputformat == 'command': tasmotacmnds = Mapping2Cmnd(decode_cfg, configmapping) OutputTasmotaCmnds(tasmotacmnds) sys.exit(exitcode)