#!/usr/bin/env python # -*- coding: utf-8 -*- VER = '2.0.0005' """ decode-config.py - Backup/Restore Sonoff-Tasmota configuration data Copyright (C) 2018 Norbert Richter This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . Requirements: - Python - pip install json pycurl urllib2 configargparse Instructions: Execute command with option -d to retrieve config data from a host or use -f to read a configuration file saved using Tasmota Web-UI For further information read 'decode-config.md' For help execute command with argument -h (or -H for advanced help) Usage: decode-config.py [-f ] [-d ] [-P ] [-u ] [-p ] [-i ] [-o ] [-F json|bin|dmp] [-E] [-e] [--json-indent ] [--json-compact] [--json-hide-pw] [--json-unhide-pw] [-h] [-H] [-v] [-V] [-c ] [--ignore-warnings] Backup/Restore Sonoff-Tasmota configuration data. Args that start with '--' (eg. -f) can also be set in a config file (specified via -c). Config file syntax allows: key=value, flag=true, stuff=[a,b,c] (for details, see syntax at https://goo.gl/R74nmi). If an arg is specified in more than one place, then commandline values override config file values which override defaults. optional arguments: -c, --config program config file - can be used to set default command args (default: None) --ignore-warnings do not exit on warnings. Not recommended, used by your own responsibility! Source: Read/Write Tasmota configuration from/to -f, --file, --tasmota-file file to retrieve/write Tasmota configuration from/to (default: None)' -d, --device, --host hostname or IP address to retrieve/send Tasmota configuration from/to (default: None) -P, --port TCP/IP port number to use for the host connection (default: 80) -u, --username host HTTP access username (default: admin) -p, --password host HTTP access password (default: None) Backup/Restore: Backup/Restore configuration file specification -i, --restore-file file to restore configuration from (default: None). Replacements: @v=firmware version, @f=device friendly name, @h=device hostname -o, --backup-file file to backup configuration to (default: None). Replacements: @v=firmware version, @f=device friendly name, @h=device hostname -F, --backup-type json|bin|dmp backup filetype (default: 'json') -E, --extension append filetype extension for -i and -o filename (default) -e, --no-extension do not append filetype extension, use -i and -o filename as passed JSON: JSON backup format specification --json-indent pretty-printed JSON output using indent level (default: 'None'). -1 disables indent. --json-compact compact JSON output by eliminate whitespace --json-hide-pw hide passwords (default) --json-unhide-pw unhide passwords Info: additional information -h, --help show usage help message and exit -H, --full-help show full help message and exit -v, --verbose produce more output about what the program does -V, --version show program's version number and exit Either argument -d or -f must be given. Returns: 0: successful 1: restore skipped 2: program argument error 3: file not found 4: data size mismatch 5: data CRC error 6: unsupported configuration version 7: configuration file read error 8: JSON file decoding error 9: Restore file data error 10: Device data download error 11: Device data upload error 20: python module missing 21: Internal error >21: python library exit code 4xx, 5xx: HTTP errors """ class ExitCode: OK = 0 RESTORE_SKIPPED = 1 ARGUMENT_ERROR = 2 FILE_NOT_FOUND = 3 DATA_SIZE_MISMATCH = 4 DATA_CRC_ERROR = 5 UNSUPPORTED_VERSION = 6 FILE_READ_ERROR = 7 JSON_READ_ERROR = 8 RESTORE_DATA_ERROR = 9 DOWNLOAD_CONFIG_ERROR = 10 UPLOAD_CONFIG_ERROR = 11 MODULE_NOT_FOUND = 20 INTERNAL_ERROR = 21 import os.path import io import sys, platform def ModuleImportError(module): er = str(module) print >> sys.stderr, "{}. Try 'pip install {}' to install it".format(er,er.split(' ')[len(er.split(' '))-1]) sys.exit(ExitCode.MODULE_NOT_FOUND) try: from datetime import datetime import copy import struct import socket import re import math import inspect import json import configargparse import pycurl import urllib2 except ImportError, e: ModuleImportError(e) PROG='{} v{} by Norbert Richter '.format(os.path.basename(sys.argv[0]),VER) CONFIG_FILE_XOR = 0x5A BINARYFILE_MAGIC = 0x63576223 STR_ENCODING = 'utf8' DEFAULTS = { 'DEFAULT': { 'configfile': None, 'ignorewarning':False, }, 'source': { 'device': None, 'port': 80, 'username': 'admin', 'password': None, 'tasmotafile': None, }, 'backup': { 'restorefile': None, 'backupfile': None, 'backupfileformat': 'json', 'extension': True, }, 'jsonformat': { 'jsonindent': None, 'jsoncompact': False, 'jsonsort': True, 'jsonrawvalues':False, 'jsonrawkeys': False, 'jsonhidepw': False, }, } args = {} exitcode = 0 """ Settings dictionary describes the config file fields definition: Each setting name has a tuple containing the following items: (format, baseaddr, datadef, ) where format Define the data interpretation. It is either a string or a tuple containing a string and a sub-Settings dictionary. 'xxx': A string is used to interpret the data at The string defines the format interpretion as described in 'struct module format string', see https://docs.python.org/2.7/library/struct.html#format-strings In addition to this format string there is as special meaning of a dot '.' - this means a bit with an optional prefix length. If no prefix is given, 1 is assumed. {}: A dictionary describes itself a 'Settings' dictonary (recursive) baseaddr The address (starting from 0) within config data. For bit fields must be a tuple. n: Defines a simple address within config data. must be a positive integer. (n, b, s): A tuple defines a bit field: is the address within config data (integer) how many bits are used (positive integer) bit shift (integer) positive shift the result right bits negative shift the result left bits datadef Data definition, is either a array definition or a tuple containing an array definition and min/max values Format: arraydef|(arraydef, min, max) arraydef: None: None must be given if the field contains a simple value desrcibed by the prefix n: [n]: Defines a one-dimensional array of size [n, m <,o...>] Defines a multi-dimensional array min: defines a minimum valid value or None if all values for this format is allowed. max: defines a maximum valid value or None if all values for this format is allowed. converter (optional) Conversion methode(s): 'xxx'|func or ('xxx'|func, 'xxx'|func) Read conversion is used if args.jsonrawvalues is False Write conversion is used if jsonrawvalues from restore json file is False or args.jsonrawvalues is False. Converter is either a single methode 'xxx'|func or a tuple Single methode will be used for reading conversion only: 'xxx': string will used for reading conversion and will be evaluate as is, this can also contain python code. Use '$' for current value. func: name of a formating function that will be used for reading conversion None: will read as definied in (read, write): a tuple with 2 objects. Each can be of the same type as the single method above ('xxx'|func) or None. read: method will be used for read conversion (unpack data from dmp object) write: method will be used for write conversion (pack data to dmp object) If write method is None indicates value is readable only and will not be write """ def passwordread(value): return "********" if args.jsonhidepw else value def passwordwrite(value): return None if value=="********" else value Setting_5_10_0 = { 'cfg_holder': (' version number from read binary data to search for @return: template sizes as list [] """ sizes = [] for cfg in Settings: sizes.append(cfg[1]) # return unique sizes only (remove duplicates) return list(set(sizes)) def GetTemplateSetting(decode_cfg): """ Search for version, size and settings to be used depending on given binary config data @param decode_cfg: binary config data (decrypted) @return: version, size, settings to use; None if version is invalid """ version = 0x0 size = setting = None version = GetField(decode_cfg, 'version', Setting_6_2_1['version'], raw=True) # search setting definition top-down for cfg in sorted(Settings, key=lambda s: s[0], reverse=True): if version >= cfg[0]: version = cfg[0] size = cfg[1] setting = cfg[2].copy() break return version, size, setting class LogType: INFO = 'INFO' WARNING = 'WARNING' ERROR = 'ERROR' def message(msg, typ=None, status=None, line=None): """ Writes a message to stdout @param msg: message to output @param typ: INFO, WARNING or ERROR @param status: status number """ print >> sys.stderr, '{styp}{sdelimiter}{sstatus}{slineno}{scolon}{smgs}'.format(\ styp=typ if typ is not None else '', sdelimiter=' ' if status is not None and status>0 and typ is not None else '', sstatus=status if status is not None and status>0 else '', scolon=': ' if typ is not None or line is not None else '', smgs=msg, slineno=' (@{:04d})'.format(line) if line is not None else '') def exit(status=0, msg="end", typ=LogType.ERROR, src=None, doexit=True, line=None): """ Called when the program should be exit @param status: the exit status program returns to callert @param msg: the msg logged before exit @param typ: msg type: 'INFO', 'WARNING' or 'ERROR' @param doexit: True to exit program, otherwise return """ if src is not None: msg = '{} ({})'.format(src, msg) message(msg, typ=typ if status!=ExitCode.OK else LogType.INFO, status=status, line=line) exitcode = status if doexit: sys.exit(exitcode) def ShortHelp(doexit=True): """ Show short help (usage) only - ued by own -h handling @param doexit: sys.exit with OK if True """ print parser.description print parser.print_usage() print print "For advanced help use '{prog} -H' or '{prog} --full-help'".format(prog=os.path.basename(sys.argv[0])) if doexit: sys.exit(ExitCode.OK) class HTTPHeader: """ pycurl helper class retrieving the request header """ def __init__(self): self.contents = '' def clear(self): self.contents = '' def store(self, _buffer): self.contents = "{}{}".format(self.contents, _buffer) def response(self): header = str(self.contents).split('\n') if len(header)>0: return header[0].rstrip() return '' def contenttype(self): for item in str(self.contents).split('\n'): ditem = item.split(":") if ditem[0].strip().lower()=='content-type' and len(ditem)>1: return ditem[1].strip() return '' def __str__(self): return self.contents class CustomHelpFormatter(configargparse.HelpFormatter): """ Class for customizing the help output """ def _format_action_invocation(self, action): """ Reformat multiple metavar output -d , --device , --host to single output -d, --device, --host """ orgstr = configargparse.HelpFormatter._format_action_invocation(self, action) if orgstr and orgstr[0] != '-': # only optional arguments return orgstr res = getattr(action, '_formatted_action_invocation', None) if res: return res options = orgstr.split(', ') if len(options) <=1: action._formatted_action_invocation = orgstr return orgstr return_list = [] for option in options: meta = "" arg = option.split(' ') if len(arg)>1: meta = arg[1] return_list.append(arg[0]) if len(meta) >0 and len(return_list) >0: return_list[len(return_list)-1] += " "+meta action._formatted_action_invocation = ', '.join(return_list) return action._formatted_action_invocation # ---------------------------------------------------------------------- # Tasmota config data handling # ---------------------------------------------------------------------- class FileType: FILE_NOT_FOUND = None DMP = 'dmp' JSON = 'json' BIN = 'bin' UNKNOWN = 'unknown' INCOMPLETE_JSON = 'incomplete json' INVALID_JSON = 'invalid json' INVALID_BIN = 'invalid bin' def GetFileType(filename): """ Get the FileType class member of a given filename @param filename: filename of the file to analyse @return: FileType class member """ filetype = FileType.UNKNOWN # try filename try: isfile = os.path.isfile(filename) try: f = open(filename, "r") try: # try reading as json inputjson = json.load(f) if 'header' in inputjson: filetype = FileType.JSON else: filetype = FileType.INCOMPLETE_JSON except ValueError: filetype = FileType.INVALID_JSON # not a valid json, get filesize and compare it with all possible sizes try: size = os.path.getsize(filename) except: filetype = FileType.UNKNOWN sizes = GetTemplateSizes() # size is one of a dmp file size if size in sizes: filetype = FileType.DMP elif (size - ((len(hex(BINARYFILE_MAGIC))-2)/2)) in sizes: # check if the binary file has the magic header try: inputfile = open(filename, "rb") inputbin = inputfile.read() inputfile.close() if struct.unpack_from('>24) & 0xff) minor = ((version>>16) & 0xff) release = ((version>> 8) & 0xff) subrelease = (version & 0xff) if major>=6: if subrelease>0: subreleasestr = str(subrelease) else: subreleasestr = '' else: if subrelease>0: subreleasestr = str(chr(subrelease+ord('a')-1)) else: subreleasestr = '' return "{:d}.{:d}.{:d}{}{}".format( major, minor, release, '.' if (major>=6 and subreleasestr!='') else '', subreleasestr) def MakeValidFilename(filename): """ Make a valid filename @param filename: filename src @return: valid filename removed invalid chars and replace space with _ """ try: filename = filename.decode('unicode-escape').translate(dict((ord(char), None) for char in '\/*?:"<>|')) except: pass return str(filename.replace(' ','_')) def MakeFilename(filename, filetype, decode_cfg): """ Replace variable within a filename @param filename: original filename possible containing replacements: @v: Tasmota version @f: friendlyname @h: hostname @param filetype: FileType.x object - creates extension if not None @param decode_cfg: binary config data (decrypted) @return: New filename with replacements """ v = f1 = f2 = f3 = f4 = '' if 'version' in decode_cfg: v = GetVersionStr( int(str(decode_cfg['version']), 0) ) filename = filename.replace('@v', v) if 'friendlyname' in decode_cfg: filename = filename.replace('@f', decode_cfg['friendlyname'][0] ) if 'hostname' in decode_cfg: filename = filename.replace('@h', decode_cfg['hostname'] ) dirname = basename = ext = '' try: dirname = os.path.normpath(os.path.dirname(filename)) basename = os.path.basename(filename) name, ext = os.path.splitext(basename) except: pass name = MakeValidFilename(name) if len(ext) and ext[0]=='.': ext = ext[1:] if filetype is not None and args.extension and (len(ext)<2 or all(c.isdigit() for c in ext)): ext = filetype.lower() if len(ext): name_ext = name+'.'+ext else: name_ext = name try: filename = os.path.join(dirname, name_ext) except: pass return filename def MakeUrl(host, port=80, location=''): """ Create a Tasmota host url @param host: hostname or IP of Tasmota host @param port: port number to use for http connection @param location: http url location @return: Tasmota http url """ return "http://{shost}{sdelimiter}{sport}/{slocation}".format(\ shost=host, sdelimiter=':' if port != 80 else '', sport=port if port != 80 else '', slocation=location ) def LoadTasmotaConfig(filename): """ Load config from Tasmota file @param filename: filename to load @return: binary config data (encrypted) or None on error """ encode_cfg = None # read config from a file if not os.path.isfile(filename): # check file exists exit(ExitCode.FILE_NOT_FOUND, "File '{}' not found".format(filename),line=inspect.getlineno(inspect.currentframe())) try: tasmotafile = open(filename, "rb") encode_cfg = tasmotafile.read() tasmotafile.close() except Exception, e: exit(e[0], "'{}' {}".format(filename, e[1]),line=inspect.getlineno(inspect.currentframe())) return encode_cfg def PullTasmotaConfig(host, port, username=DEFAULTS['source']['username'], password=None): """ Pull config from Tasmota device @param host: hostname or IP of Tasmota device @param port: http port of Tasmota device @param username: optional username for Tasmota web login @param password optional password for Tasmota web login @return: binary config data (encrypted) or None on error """ encode_cfg = None # read config direct from device via http c = pycurl.Curl() buffer = io.BytesIO() c.setopt(c.WRITEDATA, buffer) header = HTTPHeader() c.setopt(c.HEADERFUNCTION, header.store) c.setopt(c.FOLLOWLOCATION, True) c.setopt(c.URL, MakeUrl(host, port, 'dl')) if username is not None and password is not None: c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC) c.setopt(c.USERPWD, username + ':' + password) c.setopt(c.VERBOSE, False) responsecode = 200 try: c.perform() responsecode = c.getinfo(c.RESPONSE_CODE) response = header.response() except Exception, e: exit(e[0], e[1],line=inspect.getlineno(inspect.currentframe())) finally: c.close() if responsecode >= 400: exit(responsecode, 'HTTP result: {}'.format(header.response()),line=inspect.getlineno(inspect.currentframe())) elif header.contenttype()!='application/octet-stream': exit(ExitCode.DOWNLOAD_CONFIG_ERROR, "Device did not response properly, may be Tasmota webserver admin mode is disabled (WebServer 2)",line=inspect.getlineno(inspect.currentframe())) try: encode_cfg = buffer.getvalue() except: pass return encode_cfg def PushTasmotaConfig(encode_cfg, host, port, username=DEFAULTS['source']['username'], password=None): """ Upload binary data to a Tasmota host using http @param encode_cfg: encrypted binary data or filename containing Tasmota encrypted binary config @param host: hostname or IP of Tasmota device @param username: optional username for Tasmota web login @param password optional password for Tasmota web login @return errorcode, errorstring errorcode=0 if success, otherwise http response or exception code """ # ~ return 0, 'OK' if isinstance(encode_cfg, bytearray): encode_cfg = str(encode_cfg) c = pycurl.Curl() buffer = io.BytesIO() c.setopt(c.WRITEDATA, buffer) header = HTTPHeader() c.setopt(c.HEADERFUNCTION, header.store) c.setopt(c.FOLLOWLOCATION, True) # get restore config page first to set internal Tasmota vars c.setopt(c.URL, MakeUrl(host, port, 'rs?')) if args.username is not None and args.password is not None: c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC) c.setopt(c.USERPWD, args.username + ':' + args.password) c.setopt(c.HTTPGET, True) c.setopt(c.VERBOSE, False) responsecode = 200 try: c.perform() responsecode = c.getinfo(c.RESPONSE_CODE) except Exception, e: c.close() return e[0], e[1] if responsecode>=400: c.close() return responsecode, header.response() elif header.contenttype()!='text/html': c.close() return ExitCode.UPLOAD_CONFIG_ERROR, "Device did not response properly, may be Tasmota webserver admin mode is disabled (WebServer 2)" # post data header.clear() c.setopt(c.HEADERFUNCTION, header.store) c.setopt(c.POST, 1) c.setopt(c.URL, MakeUrl(host, port, 'u2')) try: isfile = os.path.isfile(encode_cfg) except: isfile = False if isfile: c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, encode_cfg))]) else: # use as binary data c.setopt(c.HTTPPOST, [ ('fileupload', ( c.FORM_BUFFER, '{sprog}_v{sver}.dmp'.format(sprog=os.path.basename(sys.argv[0]), sver=VER), c.FORM_BUFFERPTR, encode_cfg )), ]) responsecode = 200 try: c.perform() responsecode = c.getinfo(c.RESPONSE_CODE) except Exception, e: return e[0], e[1] c.close() if responsecode>=400: return responsecode, header.response() elif header.contenttype()!='text/html': return ExitCode.UPLOAD_CONFIG_ERROR, "Device did not response properly, may be Tasmota webserver admin mode is disabled (WebServer 2)" return 0, 'OK' def DecryptEncrypt(obj): """ Decrpt/Encrypt binary config data @param obj: binary config data @return: decrypted configuration (if obj contains encrypted data) encrypted configuration (if obj contains decrypted data) """ if isinstance(obj, bytearray): obj = str(obj) dobj = obj[0:2] for i in range(2, len(obj)): dobj += chr( (ord(obj[i]) ^ (CONFIG_FILE_XOR +i)) & 0xff ) return dobj def GetSettingsCrc(dobj): """ Return binary config data calclulated crc @param dobj: decrypted binary config data @return: 2 byte unsigned integer crc value """ if isinstance(dobj, bytearray): dobj = str(dobj) crc = 0 for i in range(0, len(dobj)): if not i in [14,15]: # Skip crc byte = ord(dobj[i]) crc += byte * (i+1) return crc & 0xffff def GetFieldDef(fielddef): """ Get the field def items @param fielddef: field format - see "Settings dictionary" above @return: , , , , , undefined items can be None """ format = baseaddr = datadef = convert = None bits = bitshift = 0 if len(fielddef)==3: # def without convert tuple format, baseaddr, datadef = fielddef elif len(fielddef)==4: # def with convert tuple format, baseaddr, datadef, convert = fielddef if isinstance(baseaddr, (list,tuple)): baseaddr, bits, bitshift = baseaddr if isinstance(datadef, int): # convert single int into list with one item datadef = [datadef] return format, baseaddr, bits, bitshift, datadef, convert def MakeFieldBaseAddr(baseaddr, bits, bitshift): """ Return a based on given arguments @param baseaddr: baseaddr from Settings definition @param bits: 0 or bits @param bitshift: 0 or bitshift @return: (,,) if bits != 0 baseaddr if bits == 0 """ if bits!=0: return (baseaddr, bits, bitshift) return baseaddr def ConvertFieldValue(value, fielddef, read=True, raw=False): """ Convert field value based on field desc @param value: original value @param fielddef field definition - see "Settings dictionary" above @param read use read conversion if True, otherwise use write conversion @param raw return raw values (True) or converted values (False) @return: (un)converted value """ format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef) # call password functions even if raw value should be processed if callable(convert) and (convert==passwordread or convert==passwordwrite): raw = False if isinstance(convert, (list,tuple)) and len(convert)>0 and (convert[0]==passwordread or convert[0]==passwordwrite): raw = False if isinstance(convert, (list,tuple)) and len(convert)>1 and (convert[1]==passwordread or convert[1]==passwordwrite): raw = False if not raw and convert is not None: if isinstance(convert, (list,tuple)): # extract read conversion if tuple is given if read: convert = convert[0] else: convert = convert[1] try: if isinstance(convert, str): # evaluate strings return eval(convert.replace('$','value')) elif callable(convert): # use as format function return convert(value) except: pass return value def GetFieldMinMax(fielddef): """ Get minimum, maximum of field based on field format definition @param fielddef: field format - see "Settings dictionary" above @return: min, max """ minmax = {'c': (0, 1), '?': (0, 1), 'b': (~0x7f, 0x7f), 'B': (0, 0xff), 'h': (~0x7fff, 0x7fff), 'H': (0, 0xffff), 'i': (~0x7fffffff, 0x7fffffff), 'I': (0, 0xffffffff), 'l': (~0x7fffffff, 0x7fffffff), 'L': (0, 0xffffffff), 'q': (~0x7fffffffffffffff, 0x7fffffffffffffff), 'Q': (0, 0x7fffffffffffffff), 'f': (sys.float_info.min, sys.float_info.max), 'd': (sys.float_info.min, sys.float_info.max), } format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef) _min = 0 _max = 0 if format[-1:] in minmax: _min, _max = minmax[format[-1:]] elif format[-1:] in ['s','p']: # s and p may have a prefix as length match = re.search("\s*(\d+)", format) if match: _max=int(match.group(0)) return _min,_max def GetFieldLength(fielddef): """ Get length of a field in bytes based on field format definition @param fielddef: field format - see "Settings dictionary" above @return: length of field in bytes """ length=0 format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef) if datadef is not None: # datadef contains a list # calc size recursive by sum of all elements if isinstance(datadef, list): for i in range(0, datadef[0]): # multidimensional array if isinstance(datadef, list) and len(datadef)>1: length += GetFieldLength( (fielddef[0], fielddef[1], fielddef[2][1:]) ) # single array else: length += GetFieldLength( (fielddef[0], fielddef[1], None) ) else: if isinstance(format, dict): # -> iterate through format addr = None setting = format for name in setting: _dummy1, baseaddr, bits, bitshift, _dummy2, _dummy3 = GetFieldDef(setting[name]) _len = GetFieldLength(setting[name]) if addr != baseaddr: addr = baseaddr length += _len else: if format[-1:] in ['b','B','c','?']: length=1 elif format[-1:] in ['h','H']: length=2 elif format[-1:] in ['i','I','l','L','f']: length=4 elif format[-1:] in ['q','Q','d']: length=8 elif format[-1:] in ['s','p']: # s and p may have a prefix as length match = re.search("\s*(\d+)", format) if match: length=int(match.group(0)) return length def GetSubfieldDef(fielddef): """ Get subfield definition from a given field definition @param fielddef: see Settings desc above @return: subfield definition """ subfielddef = None format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef) if isinstance(datadef, list) and len(datadef)>1: if len(fielddef)<4: subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), datadef[1:]) else: subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), datadef[1:], convert) # single array else: if len(fielddef)<4: subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), None) else: subfielddef = (format, MakeFieldBaseAddr(baseaddr, bits, bitshift), None, convert) return subfielddef def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0): """ Get field value from definition @param dobj: decrypted binary config data @param fieldname: name of the field @param fielddef: see Settings desc above @param raw return raw values (True) or converted values (False) @param addroffset use offset for baseaddr (used for recursive calls) @return: read field value """ if isinstance(dobj, bytearray): dobj = str(dobj) result = None if fieldname == 'raw' and not args.jsonrawkeys: return result # get field definition format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef) # contains a integer list if isinstance(datadef, list): result = [] offset = 0 for i in range(0, datadef[0]): subfielddef = GetSubfieldDef(fielddef) length = GetFieldLength(subfielddef) if length != 0: result.append(GetField(dobj, fieldname, subfielddef, raw=raw, addroffset=addroffset+offset)) offset += length # contains a dict elif isinstance(format, dict): config = {} for name in format: # -> iterate through format if name != 'raw' or args.jsonrawkeys: config[name] = GetField(dobj, name, format[name], raw=raw, addroffset=addroffset) result = config # a simple value elif isinstance(format, (str, bool, int, float, long)): if GetFieldLength(fielddef) != 0: result = struct.unpack_from(format, dobj, baseaddr+addroffset)[0] if not format[-1:].lower() in ['s','p']: if bitshift>=0: result >>= bitshift else: result <<= abs(bitshift) if bits>0: result &= (1< 127 result = unicode(s, errors='ignore') result = ConvertFieldValue(result, fielddef, read=True, raw=raw) else: exit(ExitCode.INTERNAL_ERROR, "Wrong mapping format definition: '{}'".format(format), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) return result def SetField(dobj, fieldname, fielddef, restore, raw=False, addroffset=0, filename=""): """ Get field value from definition @param dobj: decrypted binary config data @param fieldname: name of the field @param fielddef: see Settings desc above @param raw handle values as raw values (True) or converted (False) @param addroffset use offset for baseaddr (used for recursive calls) @param restore restore mapping with the new value(s) """ format, baseaddr, bits, bitshift, datadef, convert = GetFieldDef(fielddef) fieldname = str(fieldname) if fieldname == 'raw' and not args.jsonrawkeys: return dobj # do not write readonly values if isinstance(convert, (list,tuple)) and len(convert)>1 and convert[1]==None: if args.debug: print >> sys.stderr, "SetField(): Readonly '{}' using '{}'/{}{} @{} skipped".format(fieldname, format, datadef, bits, hex(baseaddr+addroffset)) return dobj # contains a list if isinstance(datadef, list): offset = 0 if len(restore)>datadef[0]: exit(ExitCode.RESTORE_DATA_ERROR, "file '{sfile}', array '{sname}[{selem}]' exceeds max number of elements [{smax}]".format(sfile=filename, sname=fieldname, selem=len(restore), smax=datadef[0]), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe())) for i in range(0, datadef[0]): subfielddef = GetSubfieldDef(fielddef) length = GetFieldLength(subfielddef) if length != 0: if i>=len(restore): # restore data list may be shorter than definition break try: subrestore = restore[i] dobj = SetField(dobj, fieldname, subfielddef, subrestore, raw=raw, addroffset=addroffset+offset, filename=filename) except: pass offset += length # contains a dict elif isinstance(format, dict): for name in format: # -> iterate through format if name in restore: dobj = SetField(dobj, name, format[name], restore[name], raw=raw, addroffset=addroffset, filename=filename) # a simple value elif isinstance(format, (str, bool, int, float, long)): valid = True err = "" errformat = "" _min, _max = GetFieldMinMax(fielddef) value = _value = None skip = False # simple one value if format[-1:] in ['c']: try: value = ConvertFieldValue(restore.encode(STR_ENCODING)[0], fielddef, read=False, raw=raw) except: err = "valid range exceeding" valid = False # bool elif format[-1:] in ['?']: try: value = ConvertFieldValue(bool(restore), fielddef, read=False, raw=raw) except: err = "valid range exceeding" valid = False # integer elif format[-1:] in ['b','B','h','H','i','I','l','L','q','Q','P']: try: value = ConvertFieldValue(restore, fielddef, read=False, raw=raw) if isinstance(value, (str, unicode)): value = int(value, 0) else: value = int(value) # bit value if bits!=0: value = struct.unpack_from(format, dobj, baseaddr+addroffset)[0] bitvalue = int(restore) mask = (1<mask: _min = 0 _max = mask _value = bitvalue valid = False err = "valid bit range exceeding" else: if bitshift>=0: bitvalue <<= bitshift mask <<= bitshift else: bitvalue >>= abs(bitshift) mask >>= abs(bitshift) value &= (0xffffffff ^ mask) value |= bitvalue # full size values else: _value = value except: valid = False err = "valid range exceeding" # float elif format[-1:] in ['f','d']: try: value = ConvertFieldValue(float(restore), fielddef, read=False, raw=raw) except: err = "valid range exceeding" valid = False # string elif format[-1:] in ['s','p']: try: value = ConvertFieldValue(restore.encode(STR_ENCODING), fielddef, read=False, raw=raw) err = "string length exceeding" if value is not None: # be aware 0 byte at end of string (str must be < max, not <= max) _max -= 1 valid = _min <= len(value) < _max else: skip = True valid = True except: valid = False if value is None and not skip: # None is an invalid value valid = False if valid is None and not skip: # validate against object type size valid = _min <= value <= _max if not valid: err = "type range exceeding" errformat = " [{smin},{smax}]" if _value is None: # copy value before possible change below _value = value if isinstance(value, (str, unicode)): _value = "'{}'".format(_value) if valid: if not skip: if args.debug: if bits: sbits=" {} bits shift {}".format(bits, bitshift) else: sbits = "" print >> sys.stderr, "SetField(): Set '{}' using '{}'/{}{} @{} to {}".format(fieldname, format, datadef, sbits, hex(baseaddr+addroffset), _value) if fieldname != 'cfg_crc': prevvalue = struct.unpack_from(format, dobj, baseaddr+addroffset)[0] struct.pack_into(format, dobj, baseaddr+addroffset, value) curvalue = struct.unpack_from(format, dobj, baseaddr+addroffset)[0] if prevvalue != curvalue and args.verbose: message("Value for '{}' changed from {} to {}".format(fieldname, prevvalue, curvalue), typ=LogType.INFO) else: sformat = "file '{sfile}' - {{'{sname}': {svalue}}} ({serror})"+errformat exit(ExitCode.RESTORE_DATA_ERROR, sformat.format(sfile=filename, sname=fieldname, serror=err, svalue=_value, smin=_min, smax=_max), typ=LogType.WARNING, doexit=not args.ignorewarning) return dobj def Bin2Mapping(decode_cfg, raw=True): """ Decodes binary data stream into pyhton mappings dict @param decode_cfg: binary config data (decrypted) @param raw: decode raw values (True) or converted values (False) @return: config data as mapping dictionary """ if isinstance(decode_cfg, bytearray): decode_cfg = str(decode_cfg) # get binary header to use version, size, setting = GetTemplateSetting(decode_cfg) # if we did not found a mathching setting if setting is None: exit(ExitCode.UNSUPPORTED_VERSION, "Tasmota configuration version 0x{:x} not supported".format(version),line=inspect.getlineno(inspect.currentframe())) if 'version' in setting: cfg_version = GetField(decode_cfg, 'version', setting['version'], raw=True) # check size if exists if 'cfg_size' in setting: cfg_size = GetField(decode_cfg, 'cfg_size', setting['cfg_size'], raw=True) # read size should be same as definied in template if cfg_size > size: # may be processed exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read does ot match - read {}, expected {} byte".format(cfg_size, size), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe())) elif cfg_size < size: # less number of bytes can not be processed exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read to small to process - read {}, expected {} byte".format(cfg_size, size), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe())) # check crc if exists if 'cfg_crc' in setting: cfg_crc = GetField(decode_cfg, 'cfg_crc', setting['cfg_crc'], raw=True) else: cfg_crc = GetSettingsCrc(decode_cfg) if cfg_crc != GetSettingsCrc(decode_cfg): exit(ExitCode.DATA_CRC_ERROR, 'Data CRC error, read 0x{:x} should be 0x{:x}'.format(cfg_crc, GetSettingsCrc(decode_cfg)), typ=LogType.WARNING, doexit=not args.ignorewarning,line=inspect.getlineno(inspect.currentframe())) # get config config = GetField(decode_cfg, None, (setting,None,None), raw=raw) # add header info timestamp = datetime.now() config['header'] = {'timestamp':timestamp.strftime("%Y-%m-%d %H:%M:%S"), 'format': { 'jsonindent': args.jsonindent, 'jsoncompact': args.jsoncompact, 'jsonsort': args.jsonsort, 'jsonrawvalues':args.jsonrawvalues, 'jsonrawkeys': args.jsonrawkeys, 'jsonhidepw': args.jsonhidepw, }, 'template': { 'version': hex(version), 'crc': hex(cfg_crc), }, 'data': { 'crc': hex(GetSettingsCrc(decode_cfg)), 'size': len(decode_cfg), }, 'script': { 'name': os.path.basename(__file__), 'version': VER, }, 'os': (platform.machine(), platform.system(), platform.release(), platform.version(), platform.platform()), 'python': platform.python_version(), } if 'cfg_crc' in setting: config['header']['template'].update({'size': cfg_size}) if 'version' in setting: config['header']['data'].update({'version': hex(cfg_version)}) return config def Mapping2Bin(decode_cfg, jsonconfig, filename=""): """ Encodes into binary data stream @param decode_cfg: binary config data (decrypted) @param jsonconfig: restore data mapping @param filename: name of the restore file (for error output only) @return: changed binary config data (decrypted) """ if isinstance(decode_cfg, str): decode_cfg = bytearray(decode_cfg) # get binary header data to use the correct version template from device version, size, setting = GetTemplateSetting(decode_cfg) _buffer = bytearray() _buffer.extend(decode_cfg) if setting is not None: try: raw = jsonconfig['header']['format']['jsonrawvalues'] except: if 'header' not in jsonconfig: errkey = 'header' elif 'format' not in jsonconfig['header']: errkey = 'header.format' elif 'jsonrawvalues' not in jsonconfig['header']['format']: errkey = 'header.format.jsonrawvalues' exit(ExitCode.RESTORE_DATA_ERROR, "Restore file '{sfile}' name '{skey}' missing, don't know how to evaluate restore data!".format(sfile=filename, skey=errkey), typ=LogType.ERROR, doexit=not args.ignorewarning) # iterate through restore data mapping for name in jsonconfig: # key must exist in both dict if name in setting: SetField(_buffer, name, setting[name], jsonconfig[name], raw=raw, addroffset=0, filename=filename) else: if name != 'header': exit(ExitCode.RESTORE_DATA_ERROR, "Restore file '{}' contains obsolete name '{}', skipped".format(filename, name), typ=LogType.WARNING, doexit=not args.ignorewarning) crc = GetSettingsCrc(_buffer) struct.pack_into(setting['cfg_crc'][0], _buffer, setting['cfg_crc'][1], crc) return _buffer else: exit(ExitCode.UNSUPPORTED_VERSION,"File '{}', Tasmota configuration version 0x{:x} not supported".format(filename, version), typ=LogType.WARNING, doexit=not args.ignorewarning) return decode_cfg def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configuration): """ Create backup file @param backupfile: Raw backup filename from program args @param backupfileformat: Backup file format @param encode_cfg: binary config data (encrypted) @param decode_cfg: binary config data (decrypted) @param configuration: config data mapppings """ backupfileformat = args.backupfileformat try: name, ext = os.path.splitext(backupfile) if ext.lower() == '.'+FileType.BIN.lower(): backupfileformat = FileType.BIN elif ext.lower() == '.'+FileType.DMP.lower(): backupfileformat = FileType.DMP elif ext.lower() == '.'+FileType.JSON.lower(): backupfileformat = FileType.JSON except: pass fileformat = "" # binary format if backupfileformat.lower() == FileType.BIN.lower(): fileformat = "binary" backup_filename = MakeFilename(backupfile, FileType.BIN, configuration) if args.verbose: message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO) try: backupfp = open(backup_filename, "wb") magic = BINARYFILE_MAGIC backupfp.write(struct.pack('> sys.stderr, parser.format_values() print >> sys.stderr, "Settings:" for k in args.__dict__: print >> sys.stderr, " "+str(k), "= ",eval('args.{}'.format(k)) return args if __name__ == "__main__": args = ParseArgs() if args.shorthelp: ShortHelp() # check source args if args.device is not None and args.tasmotafile is not None: exit(ExitCode.ARGUMENT_ERROR, "Unable to select source, do not use -d and -f together",line=inspect.getlineno(inspect.currentframe())) # default no configuration available encode_cfg = None # pull config from Tasmota device if args.tasmotafile is not None: if args.verbose: message("Load data from file '{}'".format(args.tasmotafile), typ=LogType.INFO) encode_cfg = LoadTasmotaConfig(args.tasmotafile) # load config from Tasmota file if args.device is not None: if args.verbose: message("Load data from device '{}'".format(args.device), typ=LogType.INFO) encode_cfg = PullTasmotaConfig(args.device, args.port, username=args.username, password=args.password) if encode_cfg is None: # no config source given ShortHelp(False) print print parser.epilog sys.exit(ExitCode.OK) if len(encode_cfg) == 0: exit(ExitCode.FILE_READ_ERROR, "Unable to read configuration data from {} '{}'".format('device' if args.device is not None else 'file', \ args.device if args.device is not None else args.tasmotafile) \ ,line=inspect.getlineno(inspect.currentframe()) ) # decrypt Tasmota config decode_cfg = DecryptEncrypt(encode_cfg) # decode into mappings dictionary configuration = Bin2Mapping(decode_cfg, args.jsonrawvalues) if args.verbose and 'version' in configuration: if args.tasmotafile is not None: message("File '{}' contains data for Tasmota v{}".format(args.tasmotafile, GetVersionStr(configuration['version'])),typ=LogType.INFO) else: message("Device '{}' runs Tasmota v{}".format(args.device,GetVersionStr(configuration['version'])),typ=LogType.INFO) # backup to file if args.backupfile is not None: Backup(args.backupfile, args.backupfileformat, encode_cfg, decode_cfg, configuration) # restore from file if args.restorefile is not None: Restore(args.restorefile, encode_cfg, decode_cfg, configuration) # json screen output if args.backupfile is None and args.restorefile is None: print json.dumps(configuration, sort_keys=args.jsonsort, indent=None if args.jsonindent<0 else args.jsonindent, separators=(',', ':') if args.jsoncompact else (', ', ': ') ) sys.exit(exitcode)