#!/usr/bin/env python
# -*- coding: utf-8 -*-
VER = '2.1.0009'
"""
decode-config.py - Backup/Restore Sonoff-Tasmota configuration data
Copyright (C) 2018 Norbert Richter <nr@prsolution.eu>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Requirements:
- Python
- pip install pycurl configargparse
  (json and urllib2 are part of the Python 2 standard library)
Instructions:
Execute command with option -d to retrieve config data from a host
or use -f to read a configuration file saved using Tasmota Web-UI
For further information read 'decode-config.md'
For help execute command with argument -h (or -H for advanced help)
Usage: decode-config.py [-f <filename>] [-d <host>] [-P <port>]
[-u <username>] [-p <password>] [-i <filename>]
[-o <filename>] [-t json|bin|dmp] [-E] [-e] [-F]
[--json-indent <indent>] [--json-compact]
[--json-hide-pw] [--json-show-pw]
[--cmnd-indent <indent>] [--cmnd-groups]
[--cmnd-nogroups] [--cmnd-sort] [--cmnd-unsort]
[-c <filename>] [-S] [-T json|cmnd|command]
[-g {Display,Domoticz,Internal,KNX,Led,Logging,MCP230xx,MQTT,Main,Management,Pow,Sensor,Serial,SetOption,SonoffRF,System,Timers,Wifi} [{Display,Domoticz,Internal,KNX,Led,Logging,MCP230xx,MQTT,Main,Management,Pow,Sensor,Serial,SetOption,SonoffRF,System,Timers,Wifi} ...]]
[--ignore-warnings] [-h] [-H] [-v] [-V]
Backup/Restore Sonoff-Tasmota configuration data. Args that start with '--'
(eg. -f) can also be set in a config file (specified via -c). Config file
syntax allows: key=value, flag=true, stuff=[a,b,c] (for details, see syntax at
https://goo.gl/R74nmi). If an arg is specified in more than one place, then
commandline values override config file values which override defaults.
Source:
Read/Write Tasmota configuration from/to
-f, --file, --tasmota-file <filename>
file to retrieve/write Tasmota configuration from/to
(default: None)
-d, --device, --host <host>
hostname or IP address to retrieve/send Tasmota
configuration from/to (default: None)
-P, --port <port> TCP/IP port number to use for the host connection
(default: 80)
-u, --username <username>
host HTTP access username (default: admin)
-p, --password <password>
host HTTP access password (default: None)
Backup/Restore:
Backup & restore specification
-i, --restore-file <filename>
file to restore configuration from (default: None).
Replacements: @v=firmware version from config,
@f=device friendly name from config, @h=device
hostname from config, @H=device hostname from device
(-d arg only)
-o, --backup-file <filename>
file to backup configuration to (default: None).
Replacements: @v=firmware version from config,
@f=device friendly name from config, @h=device
hostname from config, @H=device hostname from device
(-d arg only)
-t, --backup-type json|bin|dmp
backup filetype (default: 'json')
-E, --extension append filetype extension for -i and -o filename
(default)
-e, --no-extension do not append filetype extension, use -i and -o
filename as passed
-F, --force-restore force restore even if configuration is identical
JSON output:
JSON format specification
--json-indent <indent>
pretty-printed JSON output using indent level
(default: 'None'). -1 disables indent.
--json-compact compact JSON output by eliminating whitespace
--json-hide-pw hide passwords
--json-show-pw, --json-unhide-pw
unhide passwords (default)
Tasmota command output:
Tasmota command output format specification
--cmnd-indent <indent>
Tasmota command grouping indent level (default: '2').
0 disables indent
--cmnd-groups group Tasmota commands (default)
--cmnd-nogroups leave Tasmota commands ungrouped
--cmnd-sort sort Tasmota commands (default)
--cmnd-unsort leave Tasmota commands unsorted
Common:
Optional arguments
-c, --config <filename>
program config file - can be used to set default
command args (default: None)
-S, --output display output regardless of backup/restore usage
(default: do not output on backup or restore usage)
-T, --output-format json|cmnd|command
display output format (default: 'json')
-g, --group {Display,Domoticz,Internal,KNX,Led,Logging,MCP230xx,MQTT,Main,Management,Pow,Sensor,Serial,SetOption,SonoffRF,System,Timers,Wifi}
limit data processing to command groups (default no
filter)
--ignore-warnings do not exit on warnings. Not recommended, use at
your own risk!
Info:
Extra information
-h, --help show usage help message and exit
-H, --full-help show full help message and exit
-v, --verbose produce more output about what the program does
-V, --version show program's version number and exit
Either argument -d <host> or -f <filename> must be given.
Returns:
0: successful
1: restore skipped
2: program argument error
3: file not found
4: data size mismatch
5: data CRC error
6: unsupported configuration version
7: configuration file read error
8: JSON file decoding error
9: Restore file data error
10: Device data download error
11: Device data upload error
20: python module missing
21: Internal error
>21: python library exit code
4xx, 5xx: HTTP errors
"""
class ExitCode:
    OK = 0
    RESTORE_SKIPPED = 1
    ARGUMENT_ERROR = 2
    FILE_NOT_FOUND = 3
    DATA_SIZE_MISMATCH = 4
    DATA_CRC_ERROR = 5
    UNSUPPORTED_VERSION = 6
    FILE_READ_ERROR = 7
    JSON_READ_ERROR = 8
    RESTORE_DATA_ERROR = 9
    DOWNLOAD_CONFIG_ERROR = 10
    UPLOAD_CONFIG_ERROR = 11
    MODULE_NOT_FOUND = 20
    INTERNAL_ERROR = 21
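"""
Illustrative sketch, not part of the original script: a calling script
might want to translate the documented return codes into text. The names
EXIT_CODE_TEXT and describe_exit_code are hypothetical; the code table is
copied from the 'Returns:' list above.

```python
# Hypothetical lookup table built from the documented return codes.
EXIT_CODE_TEXT = {
    0: 'successful',
    1: 'restore skipped',
    2: 'program argument error',
    3: 'file not found',
    4: 'data size mismatch',
    5: 'data CRC error',
    6: 'unsupported configuration version',
    7: 'configuration file read error',
    8: 'JSON file decoding error',
    9: 'restore file data error',
    10: 'device data download error',
    11: 'device data upload error',
    20: 'python module missing',
    21: 'internal error',
}

def describe_exit_code(code):
    # Known codes first, then the two documented ranges.
    if code in EXIT_CODE_TEXT:
        return EXIT_CODE_TEXT[code]
    if 400 <= code <= 599:
        return 'HTTP error {}'.format(code)
    if code > 21:
        return 'python library exit code {}'.format(code)
    return 'unknown exit code {}'.format(code)
```

A wrapper could call describe_exit_code() on the return code of the
decode-config.py process to log a readable message.
"""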
# ======================================================================
# imports
# ======================================================================
import os.path
import io
import sys, platform
def ModuleImportError(module):
    er = str(module)
    print >> sys.stderr, "{}. Try 'pip install {}' to install it".format(er, er.split(' ')[-1])
    sys.exit(ExitCode.MODULE_NOT_FOUND)
try:
    from datetime import datetime
    import time
    import copy
    import struct
    import socket
    import re
    import math
    import inspect
    import json
    import configargparse
    import pycurl
    import urllib2
except ImportError, e:
    ModuleImportError(e)
# ======================================================================
# globals
# ======================================================================
PROG='{} v{} by Norbert Richter <nr@prsolution.eu>'.format(os.path.basename(sys.argv[0]),VER)
CONFIG_FILE_XOR = 0x5A
BINARYFILE_MAGIC = 0x63576223
STR_ENCODING = 'utf8'
HIDDEN_PASSWORD = '********'
INTERNAL = 'Internal'
DEFAULTS = {
    'source':
    {
        'device': None,
        'port': 80,
        'username': 'admin',
        'password': None,
        'tasmotafile': None,
    },
    'backup':
    {
        'restorefile': None,
        'backupfile': None,
        'backupfileformat': 'json',
        'extension': True,
        'forcerestore': False,
    },
    'jsonformat':
    {
        'jsonindent': None,
        'jsoncompact': False,
        'jsonsort': True,
        'jsonhidepw': False,
    },
    'cmndformat':
    {
        'cmndindent': 2,
        'cmndgroup': True,
        'cmndsort': True,
    },
    'common':
    {
        'output': False,
        'outputformat': 'json',
        'configfile': None,
        'ignorewarning': False,
        'filter': None,
    },
}
args = {}
exitcode = 0
# ======================================================================
# Settings mapping
# ======================================================================
"""
Settings dictionary describes the config file fields definition:

    <setting> = { <name> : <def> }

    <name>: "string"
        a python valid dictionary key (string)

    <def>: ( <format>, <addrdef>, <datadef> [,<converter>] )
        a tuple containing the following items:

            <format>: <formatstring> | <setting>
                data type & format definition
                <formatstring>: <string>
                    defines the use of data at <addrdef>
                    format is defined in 'struct module format string'
                    see
                    https://docs.python.org/2.7/library/struct.html#format-strings
                <setting>: <setting>
                    A dictionary describing a (sub)setting dictionary;
                    it can recursively define another <setting>

            <addrdef>: <baseaddr> | (<baseaddr>, <bits>, <bitshift>)
                address definition
                <baseaddr>: <uint>
                    The address (starting from 0) within binary config data.
                <bits>: <uint>
                    number of bits used (positive integer)
                <bitshift>: <int>
                    bit shift <bitshift>:
                    <bitshift> >= 0: shift the result right
                    <bitshift> < 0: shift the result left

            <datadef>: <arraydef> | (<arraydef>, <validate> [,<cmd>])
                data definition
                <arraydef>: None | <dim> | [<dim>] | [<dim> ,<dim>...]
                    None:
                        Single value, not an array
                    <dim>: <int>
                    [<dim>]
                        Defines a one-dimensional array of size <dim>
                    [<dim> ,<dim>...]
                        Defines a one- or multi-dimensional array
                <validate>: <function>
                    value validation function
                <cmd>: (<group>, <tasmotacmnd>)
                    Tasmota command definition
                    <group>: <string>
                        command group string
                    <tasmotacmnd>: <function>
                        function converting data into a Tasmota command

            <converter>: <readconverter> | (<readconverter>, <writeconverter>)
                read/write converter
                <readconverter>: None | <function>
                    Will be used in Bin2Mapping to convert values read
                    from the binary data object into mapping dictionary
                    None
                        None indicates no read conversion
                    <function>
                        to convert value from binary object to JSON.
                <writeconverter>: None | False | <function>
                    Will be used in Mapping2Bin to convert values read
                    from mapping dictionary before write to binary
                    data object
                    None
                        None indicates no write conversion
                    False
                        False indicates the value is readonly and will
                        not be written into the binary object.
                    <function>
                        to convert value from JSON back to binary object

    Common definitions

        <function>: <functionname> | <string> | None
            function to be called or string to evaluate:
            <functionname>:
                A function name; the function will be called with one
                or two parameters:
                    The value to be processed
                    (optional) the current array index (1,n)
            <string>
                A string will be evaluated as is. The following
                placeholders can be used to replace it by runtime values:
                '$':
                    will be replaced by the mapping name value
                '#':
                    will be replaced by the array index (if any)
                '@':
                    can be used as reference to other mapping values
                see definition below for examples

        <string>: 'string' | "string"
            characters enclosed in ' or "

        <int>: integer
            numbers in the range -2147483648 through 2147483647
        <uint>: unsigned integer
            numbers in the range 0 through 4294967295
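
Example (illustrative sketch only, not the script's own decoder): how a
<format> / <addrdef> pair could be applied to binary config data. The
function name read_field and the 32-byte buffer are made up for this
example; the shift and mask rules follow the <bitshift> and <bits>
description above. The sketch is written for Python 3.

```python
import struct

def read_field(data, fmt, addrdef):
    # <addrdef> is either <baseaddr> or (<baseaddr>, <bits>, <bitshift>)
    if isinstance(addrdef, tuple):
        baseaddr, bits, bitshift = addrdef
    else:
        baseaddr, bits, bitshift = addrdef, None, 0
    value = struct.unpack_from(fmt, data, baseaddr)[0]
    if bits is None:
        return value
    # <bitshift> >= 0 shifts right, <bitshift> < 0 shifts left
    if bitshift >= 0:
        value >>= bitshift
    else:
        value <<= -bitshift
    # keep only the lowest <bits> bits
    return value & ((1 << bits) - 1)

# Made-up buffer: a 32-bit flag word at 0x010 with bits 3 and 8 set,
# and a signed 16-bit value of 300 at 0x014.
data = bytearray(32)
struct.pack_into('<L', data, 0x010, (1 << 3) | (1 << 8))
struct.pack_into('<h', data, 0x014, 300)

mqtt_enabled = read_field(data, '<L', (0x010, 1, 3))
save_state = read_field(data, '<L', (0x010, 1, 0))
save_data = read_field(data, '<h', 0x014)
```

Here the addresses 0x010 and 0x014 match the 'flag' and 'save_data'
entries of the settings below; everything else is an assumption.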
"""
# ----------------------------------------------------------------------
# Settings helper
# ----------------------------------------------------------------------
def passwordread(value):
    return HIDDEN_PASSWORD if args.jsonhidepw else value
def passwordwrite(value):
    return None if value == HIDDEN_PASSWORD else value
def bitsRead(x, n=0, c=1):
    """
    Reads bit(s) of a number
    @param x:
        the number from which to read
    @param n:
        which bit position to read
    @param c:
        how many bits to read (1 if omitted)
    @return:
        the bit value(s)
    """
    # x and n may also be passed as strings (e.g. '0x10')
    if isinstance(x,str):
        x = int(x, 0)
    if isinstance(n,str):
        n = int(n, 0)
    if n >= 0:
        x >>= n
    else:
        x <<= abs(n)
    if c>0:
        x &= (1<<c)-1
    return x
def MqttFingerprint(value, idx=None):
    fingerprint = ""
    for i in value:
        fingerprint += "{:02x} ".format(ord(i))
    return "MqttFingerprint{} {}".format('' if idx is None else idx, fingerprint.strip())
# ----------------------------------------------------------------------
# Tasmota configuration data definition
# ----------------------------------------------------------------------
Groups = ('Main','Sensor','Timers','Management','Wifi','MQTT','Serial','SetOption','Logging','Pow','Led','KNX','Domoticz','Display','MCP230xx')
Setting_5_10_0 = {
# <format>, <addrdef>, <datadef> [,<converter>]
'cfg_holder': ('<L', 0x000, (None, None, (INTERNAL, None)), '"0x{:08x}".format($)' ),
'save_flag': ('<L', 0x004, (None, None, ('System', None)), (None, False) ),
'version': ('<L', 0x008, (None, None, (INTERNAL, None)), ('hex($)', False) ),
'bootcount': ('<L', 0x00C, (None, None, ('System', None)), (None, False) ),
'flag': ({
'save_state': ('<L', (0x010,1, 0), (None, None, ('Management', '"SetOption0 {}".format($)')) ),
'button_restrict': ('<L', (0x010,1, 1), (None, None, ('Management', '"SetOption1 {}".format($)')) ),
'value_units': ('<L', (0x010,1, 2), (None, None, ('MQTT', '"SetOption2 {}".format($)')) ),
'mqtt_enabled': ('<L', (0x010,1, 3), (None, None, ('MQTT', '"SetOption3 {}".format($)')) ),
'mqtt_response': ('<L', (0x010,1, 4), (None, None, ('MQTT', '"SetOption4 {}".format($)')) ),
'mqtt_power_retain': ('<L', (0x010,1, 5), (None, None, ('Main', '"PowerRetain {}".format($)')) ),
'mqtt_button_retain': ('<L', (0x010,1, 6), (None, None, ('MQTT', '"ButtonRetain {}".format($)')) ),
'mqtt_switch_retain': ('<L', (0x010,1, 7), (None, None, ('MQTT', '"SwitchRetain {}".format($)')) ),
'temperature_conversion': ('<L', (0x010,1, 8), (None, None, ('Sensor', '"SetOption8 {}".format($)')) ),
'mqtt_sensor_retain': ('<L', (0x010,1, 9), (None, None, ('MQTT', '"SensorRetain {}".format($)')) ),
'mqtt_offline': ('<L', (0x010,1,10), (None, None, ('MQTT', '"SetOption10 {}".format($)')) ),
'button_swap': ('<L', (0x010,1,11), (None, None, ('Main', '"SetOption11 {}".format($)')) ),
'stop_flash_rotate': ('<L', (0x010,1,12), (None, None, ('Management', '"SetOption12 {}".format($)')) ),
'button_single': ('<L', (0x010,1,13), (None, None, ('Main', '"SetOption13 {}".format($)')) ),
'interlock': ('<L', (0x010,1,14), (None, None, ('Main', '"SetOption14 {}".format($)')) ),
'pwm_control': ('<L', (0x010,1,15), (None, None, ('Main', '"SetOption15 {}".format($)')) ),
'ws_clock_reverse': ('<L', (0x010,1,16), (None, None, ('SetOption', '"SetOption16 {}".format($)')) ),
'decimal_text': ('<L', (0x010,1,17), (None, None, ('SetOption', '"SetOption17 {}".format($)')) ),
}, 0x010, (None, None, ('*', None)), (None, False) ),
'save_data': ('<h', 0x014, (None, '0 <= $ <= 3600', ('Management', '"SaveData {}".format($)')) ),
'timezone': ('b', 0x016, (None, '-13 <= $ <= 13 or $==99', ('Management', '"Timezone {}".format($)')) ),
'ota_url': ('101s',0x017, (None, None, ('Main', '"OtaUrl {}".format($)')) ),
'mqtt_prefix': ('11s', 0x07C, ([3], None, ('MQTT', '"Prefix{} {}".format(#,$)')) ),
'seriallog_level': ('B', 0x09E, (None, '0 <= $ <= 5', ('Logging', '"SerialLog {}".format($)')) ),
'sta_config': ('B', 0x09F, (None, '0 <= $ <= 5', ('Wifi', '"WifiConfig {}".format($)')) ),
'sta_active': ('B', 0x0A0, (None, '0 <= $ <= 1', ('Wifi', '"AP {}".format($)')) ),
'sta_ssid': ('33s', 0x0A1, ([2], None, ('Wifi', '"SSId{} {}".format(#,$)')) ),
'sta_pwd': ('65s', 0x0E3, ([2], None, ('Wifi', '"Password{} {}".format(#,$)')), (passwordread,passwordwrite) ),
'hostname': ('33s', 0x165, (None, None, ('Wifi', '"Hostname {}".format($)')) ),
'syslog_host': ('33s', 0x186, (None, None, ('Logging', '"LogHost {}".format($)')) ),
'syslog_port': ('<H', 0x1A8, (None, '1 <= $ <= 32766', ('Logging', '"LogPort {}".format($)')) ),
'syslog_level': ('B', 0x1AA, (None, '0 <= $ <= 4', ('Logging', '"SysLog {}".format($)')) ),
'webserver': ('B', 0x1AB, (None, '0 <= $ <= 2', ('Wifi', '"WebServer {}".format($)')) ),
'weblog_level': ('B', 0x1AC, (None, '0 <= $ <= 4', ('Logging', '"WebLog {}".format($)')) ),
'mqtt_fingerprint': ('60s', 0x1AD, (None, None, ('MQTT', None)) ),
'mqtt_host': ('33s', 0x1E9, (None, None, ('MQTT', '"MqttHost {}".format($)')) ),
'mqtt_port': ('<H', 0x20A, (None, None, ('MQTT', '"MqttPort {}".format($)')) ),
'mqtt_client': ('33s', 0x20C, (None, None, ('MQTT', '"MqttClient {}".format($)')) ),
'mqtt_user': ('33s', 0x22D, (None, None, ('MQTT', '"MqttUser {}".format($)')) ),
'mqtt_pwd': ('33s', 0x24E, (None, None, ('MQTT', '"MqttPassword {}".format($)')), (passwordread,passwordwrite) ),
'mqtt_topic': ('33s', 0x26F, (None, None, ('MQTT', '"Topic {}".format($)')) ),
'button_topic': ('33s', 0x290, (None, None, ('MQTT', '"ButtonTopic {}".format($)')) ),
'mqtt_grptopic': ('33s', 0x2B1, (None, None, ('MQTT', '"GroupTopic {}".format($)')) ),
'mqtt_fingerprinth': ('B', 0x2D2, ([20], None, ('MQTT', None)) ),
'pwm_frequency': ('<H', 0x2E6, (None, '$==1 or 100 <= $ <= 4000', ('Management', '"PwmFrequency {}".format($)')) ),
'power': ({
'power1': ('<L', (0x2E8,1,0), (None, None, ('Main', '"Power1 {}".format($)')) ),
'power2': ('<L', (0x2E8,1,1), (None, None, ('Main', '"Power2 {}".format($)')) ),
'power3': ('<L', (0x2E8,1,2), (None, None, ('Main', '"Power3 {}".format($)')) ),
'power4': ('<L', (0x2E8,1,3), (None, None, ('Main', '"Power4 {}".format($)')) ),
'power5': ('<L', (0x2E8,1,4), (None, None, ('Main', '"Power5 {}".format($)')) ),
'power6': ('<L', (0x2E8,1,5), (None, None, ('Main', '"Power6 {}".format($)')) ),
'power7': ('<L', (0x2E8,1,6), (None, None, ('Main', '"Power7 {}".format($)')) ),
'power8': ('<L', (0x2E8,1,7), (None, None, ('Main', '"Power8 {}".format($)')) ),
}, 0x2E8, (None, None, ('Main', None)), (None, False) ),
'pwm_value': ('<H', 0x2EC, ([5], '0 <= $ <= 1023', ('Management', '"Pwm{} {}".format(#,$)')) ),
'altitude': ('<h', 0x2F6, (None, '-30000 <= $ <= 30000', ('Sensor', '"Altitude {}".format($)')) ),
'tele_period': ('<H', 0x2F8, (None, '0 <= $ <= 1 or 10 <= $ <= 3600',('MQTT', '"TelePeriod {}".format($)')) ),
'ledstate': ('B', 0x2FB, (None, '0 <= ($ & 0x7) <= 7', ('Main', '"LedState {}".format($)')) ),
'param': ('B', 0x2FC, ([23], None, ('SetOption', '"SetOption{} {}".format(#+31,$)')) ),
'state_text': ('11s', 0x313, ([4], None, ('MQTT', '"StateText{} {}".format(#,$)')) ),
'domoticz_update_timer': ('<H', 0x340, (None, '0 <= $ <= 3600', ('Domoticz', '"DomoticzUpdateTimer {}".format($)')) ),
'pwm_range': ('<H', 0x342, (None, '$==1 or 255 <= $ <= 1023', ('Management', '"PwmRange {}".format($)')) ),
'domoticz_relay_idx': ('<L', 0x344, ([4], None, ('Domoticz', '"DomoticzIdx{} {}".format(#,$)')) ),
'domoticz_key_idx': ('<L', 0x354, ([4], None, ('Domoticz', '"DomoticzKeyIdx{} {}".format(#,$)')) ),
'energy_power_calibration': ('<L', 0x364, (None, None, ('Pow', '"PowerSet {}".format($)')) ),
'energy_voltage_calibration': ('<L', 0x368, (None, None, ('Pow', '"VoltageSet {}".format($)')) ),
'energy_current_calibration': ('<L', 0x36C, (None, None, ('Pow', '"CurrentSet {}".format($)')) ),
'energy_kWhtoday': ('<L', 0x370, (None, '0 <= $ <= 42500', ('Pow', '"EnergyReset1 {}".format($)')) ),
'energy_kWhyesterday': ('<L', 0x374, (None, '0 <= $ <= 42500', ('Pow', '"EnergyReset2 {}".format($)')) ),
'energy_kWhdoy': ('<H', 0x378, (None, None, ('Pow', None)) ),
'energy_min_power': ('<H', 0x37A, (None, None, ('Pow', '"PowerLow {}".format($)')) ),
'energy_max_power': ('<H', 0x37C, (None, None, ('Pow', '"PowerHigh {}".format($)')) ),
'energy_min_voltage': ('<H', 0x37E, (None, None, ('Pow', '"VoltageLow {}".format($)')) ),
'energy_max_voltage': ('<H', 0x380, (None, None, ('Pow', '"VoltageHigh {}".format($)')) ),
'energy_min_current': ('<H', 0x382, (None, None, ('Pow', '"CurrentLow {}".format($)')) ),
'energy_max_current': ('<H', 0x384, (None, None, ('Pow', '"CurrentHigh {}".format($)')) ),
'energy_max_power_limit': ('<H', 0x386, (None, None, ('Pow', '"MaxPower {}".format($)')) ),
'energy_max_power_limit_hold': ('<H', 0x388, (None, None, ('Pow', '"MaxPowerHold {}".format($)')) ),
'energy_max_power_limit_window':('<H', 0x38A, (None, None, ('Pow', '"MaxPowerWindow {}".format($)')) ),
'energy_max_power_safe_limit': ('<H', 0x38C, (None, None, ('Pow', '"SafePower {}".format($)')) ),
'energy_max_power_safe_limit_hold':
('<H', 0x38E, (None, None, ('Pow', '"SafePowerHold {}".format($)')) ),
'energy_max_power_safe_limit_window':
('<H', 0x390, (None, None, ('Pow', '"SafePowerWindow {}".format($)')) ),
'energy_max_energy': ('<H', 0x392, (None, None, ('Pow', '"MaxEnergy {}".format($)')) ),
'energy_max_energy_start': ('<H', 0x394, (None, None, ('Pow', '"MaxEnergyStart {}".format($)')) ),
'mqtt_retry': ('<H', 0x396, (None, '10 <= $ <= 32000', ('MQTT', '"MqttRetry {}".format($)')) ),
'poweronstate': ('B', 0x398, (None, '0 <= $ <= 5', ('Main', '"PowerOnState {}".format($)')) ),
'last_module': ('B', 0x399, (None, None, ('System', None)) ),
'blinktime': ('<H', 0x39A, (None, '2 <= $ <= 3600', ('Main', '"BlinkTime {}".format($)')) ),
'blinkcount': ('<H', 0x39C, (None, '0 <= $ <= 32000', ('Main', '"BlinkCount {}".format($)')) ),
'friendlyname': ('33s', 0x3AC, ([4], None, ('Management', '"FriendlyName{} {}".format(#,$)')) ),
'switch_topic': ('33s', 0x430, (None, None, ('MQTT', '"SwitchTopic {}".format($)')) ),
'sleep': ('B', 0x453, (None, '0 <= $ <= 250', ('Management', '"Sleep {}".format($)')) ),
'domoticz_switch_idx': ('<H', 0x454, ([4], None, ('Domoticz', '"DomoticzSwitchIdx{} {}".format(#,$)')) ),
'domoticz_sensor_idx': ('<H', 0x45C, ([12], None, ('Domoticz', '"DomoticzSensorIdx{} {}".format(#,$)')) ),
'module': ('B', 0x474, (None, None, ('Management', '"Module {}".format($)')) ),
'ws_color': ('B', 0x475, ([4,3],None, ('Led', None)) ),
'ws_width': ('B', 0x481, ([3], None, ('Led', None)) ),
'my_gp': ('B', 0x484, ([18], None, ('Management', '"Gpio{} {}".format(#,$)')) ),
'light_pixels': ('<H', 0x496, (None, '1 <= $ <= 512', ('Led', '"Pixels {}".format($)')) ),
'light_color': ('B', 0x498, ([5], None, ('Led', None)) ),
'light_correction': ('B', 0x49D , (None, '0 <= $ <= 1', ('Led', '"LedTable {}".format($)')) ),
'light_dimmer': ('B', 0x49E, (None, '0 <= $ <= 100', ('Led', '"Wakeup {}".format($)')) ),
'light_fade': ('B', 0x4A1, (None, '0 <= $ <= 1', ('Led', '"Fade {}".format($)')) ),
'light_speed': ('B', 0x4A2, (None, '1 <= $ <= 20', ('Led', '"Speed {}".format($)')) ),
'light_scheme': ('B', 0x4A3, (None, None, ('Led', '"Scheme {}".format($)')) ),
'light_width': ('B', 0x4A4, (None, '0 <= $ <= 4', ('Led', '"Width {}".format($)')) ),
'light_wakeup': ('<H', 0x4A6, (None, '0 <= $ <= 3100', ('Led', '"WakeUpDuration {}".format($)')) ),
'web_password': ('33s', 0x4A9, (None, None, ('Wifi', '"WebPassword {}".format($)')), (passwordread,passwordwrite) ),
'switchmode': ('B', 0x4CA, ([4], '0 <= $ <= 7', ('Main', '"SwitchMode{} {}".format(#,$)')) ),
'ntp_server': ('33s', 0x4CE, ([3], None, ('Wifi', '"NtpServer{} {}".format(#,$)')) ),
'ina219_mode': ('B', 0x531, (None, '0 <= $ <= 7', ('Sensor', '"Sensor13 {}".format($)')) ),
'pulse_timer': ('<H', 0x532, ([8], '0 <= $ <= 64900', ('Main', '"PulseTime{} {}".format(#,$)')), ("float($)/10 if 1 <= $ <= 111 else $-100 if $ != 0 else 0", "int($*10) if 0.1 <= $ < 12 else $+100 if $ != 0 else 0") ),
'ip_address': ('<L', 0x544, ([4], None, ('Wifi', '"IPAddress{} {}".format(#,$)')), ("socket.inet_ntoa(struct.pack('<L', $))", "struct.unpack('<L', socket.inet_aton($))[0]")),
'energy_kWhtotal': ('<L', 0x554, (None, None, ('Pow', None)) ),
'mqtt_fulltopic': ('100s',0x558, (None, None, ('MQTT', '"FullTopic {}".format($)')) ),
'flag2': ({
'current_resolution': ('<L', (0x5BC,2,15), (None, '0 <= $ <= 3', ('Pow', '"AmpRes {}".format($)')) ),
'voltage_resolution': ('<L', (0x5BC,2,17), (None, '0 <= $ <= 3', ('Pow', '"VoltRes {}".format($)')) ),
'wattage_resolution': ('<L', (0x5BC,2,19), (None, '0 <= $ <= 3', ('Pow', '"WattRes {}".format($)')) ),
'emulation': ('<L', (0x5BC,2,21), (None, '0 <= $ <= 2', ('Management', '"Emulation {}".format($)')) ),
'energy_resolution': ('<L', (0x5BC,3,23), (None, '0 <= $ <= 5', ('Pow', '"EnergyRes {}".format($)')) ),
'pressure_resolution': ('<L', (0x5BC,2,26), (None, '0 <= $ <= 3', ('Sensor', '"PressRes {}".format($)')) ),
'humidity_resolution': ('<L', (0x5BC,2,28), (None, '0 <= $ <= 3', ('Sensor', '"HumRes {}".format($)')) ),
'temperature_resolution': ('<L', (0x5BC,2,30), (None, '0 <= $ <= 3', ('Sensor', '"TempRes {}".format($)')) ),
}, 0x5BC, (None, None, ('*', None)), (None, False) ),
'pulse_counter': ('<L', 0x5C0, ([4], None, ('Sensor', '"Counter{} {}".format(#,$)')) ),
'pulse_counter_type': ('<H', 0x5D0, (None, None, ('Sensor', '"CounterType {}".format($)')) ),
'pulse_counter_type': ({
'pulse_counter_type1': ('<H', (0x5D0,1,0), (None, None, ('Sensor', '"CounterType1 {}".format($)')) ),
'pulse_counter_type2': ('<H', (0x5D0,1,1), (None, None, ('Sensor', '"CounterType2 {}".format($)')) ),
'pulse_counter_type3': ('<H', (0x5D0,1,2), (None, None, ('Sensor', '"CounterType3 {}".format($)')) ),
'pulse_counter_type4': ('<H', (0x5D0,1,3), (None, None, ('Sensor', '"CounterType4 {}".format($)')) ),
}, 0x5D0, (None, None, ('Sensor', None)), (None, False) ),
'pulse_counter_debounce': ('<H', 0x5D2, (None, '0 <= $ <= 3200', ('Sensor', '"CounterDebounce {}".format($)')) ),
'rf_code': ('B', 0x5D4, ([17,9],None, ('SonoffRF', None)), '"0x{:02x}".format($)'),
}
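"""
Illustrative sketch, not part of the original script: the read/write
converter strings of 'pulse_timer' and 'ip_address' above, written out
as plain functions with the '$' placeholder replaced by a parameter.
The function names are hypothetical; the expressions are copied from
the two entries. Written for Python 3.

```python
import socket
import struct

def pulse_timer_read(raw):
    # stored 1..111 means 0.1 s steps, larger values are seconds + 100
    return float(raw) / 10 if 1 <= raw <= 111 else raw - 100 if raw != 0 else 0

def pulse_timer_write(value):
    return int(value * 10) if 0.1 <= value < 12 else value + 100 if value != 0 else 0

def ip_read(raw):
    # 32-bit little-endian integer to dotted quad
    return socket.inet_ntoa(struct.pack('<L', raw))

def ip_write(text):
    # dotted quad back to 32-bit little-endian integer
    return struct.unpack('<L', socket.inet_aton(text))[0]

half_second = pulse_timer_read(5)
twenty_seconds = pulse_timer_read(120)
address = ip_read(0x0100A8C0)
```

With these expressions a value such as 11.5 would not round-trip:
pulse_timer_write(11.5) gives 115, which pulse_timer_read() turns into
15, so values between 11.2 and 12 seem worth avoiding.
"""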
# ======================================================================
Setting_5_11_0 = copy.deepcopy(Setting_5_10_0)
Setting_5_11_0.update ({
'display_model': ('B', 0x2D2, (None, '0 <= $ <= 16', ('Display', '"Model {}".format($)')) ),
'display_mode': ('B', 0x2D3, (None, '0 <= $ <= 5', ('Display', '"Mode {}".format($)')) ),
'display_refresh': ('B', 0x2D4, (None, '1 <= $ <= 7', ('Display', '"Refresh {}".format($)')) ),
'display_rows': ('B', 0x2D5, (None, '1 <= $ <= 32', ('Display', '"Rows {}".format($)')) ),
'display_cols': ('B', 0x2D6, ([2], '1 <= $ <= 40', ('Display', '"Cols{} {}".format(#,$)')) ),
'display_address': ('B', 0x2D8, ([8], None, ('Display', '"Address{} {}".format(#,$)')) ),
'display_dimmer': ('B', 0x2E0, (None, '0 <= $ <= 100', ('Display', '"Dimmer {}".format($)')) ),
'display_size': ('B', 0x2E1, (None, '1 <= $ <= 4', ('Display', '"Size {}".format($)')) ),
})
Setting_5_11_0['flag'][0].update ({
'light_signal': ('<L', (0x010,1,18), (None, None, ('Sensor', '"SetOption18 {}".format($)')) ),
})
Setting_5_11_0.pop('mqtt_fingerprinth',None)
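"""
Illustrative sketch, not part of the original script: the pattern used
above to derive one settings version from the previous one (deepcopy,
update, nested update, pop), shown on a tiny made-up dictionary. The
names base, newer and shallow are hypothetical. A shallow copy is
included only to show why deepcopy appears to be needed for the nested
'flag' dictionary.

```python
import copy

# Tiny stand-in for a settings dictionary (entries shortened).
base = {
    'flag': ({'save_state': ('<L', (0x010, 1, 0))}, 0x010),
    'mqtt_fingerprinth': ('B', 0x2D2),
}

# Same steps as for Setting_5_11_0: copy, add, extend nested dict, remove.
newer = copy.deepcopy(base)
newer.update({'display_model': ('B', 0x2D2)})
newer['flag'][0].update({'light_signal': ('<L', (0x010, 1, 18))})
newer.pop('mqtt_fingerprinth', None)

# Contrast: a shallow copy shares the nested dict with the original.
shallow = dict(base)
shallow['flag'][0]['leaked'] = True
leaked_into_base = 'leaked' in base['flag'][0]
```

After the deepcopy steps base is unchanged, while the change made
through the shallow copy also shows up in base.
"""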
# ======================================================================
Setting_5_12_0 = copy.deepcopy(Setting_5_11_0)
Setting_5_12_0['flag'][0].update ({
'hass_discovery': ('<L', (0x010,1,19), (None, None, ('SetOption', '"SetOption19 {}".format($)')) ),
'not_power_linked': ('<L', (0x010,1,20), (None, None, ('Led', '"SetOption20 {}".format($)')) ),
'no_power_on_check': ('<L', (0x010,1,21), (None, None, ('Pow', '"SetOption21 {}".format($)')) ),
})
# ======================================================================
Setting_5_13_1 = copy.deepcopy(Setting_5_12_0)
Setting_5_13_1['flag'][0].update ({
'mqtt_serial': ('<L', (0x010,1,22), (None, None, ('SetOption', '"SetOption22 {}".format($)')) ),
'rules_enabled': ('<L', (0x010,1,23), (None, None, ('SetOption', '"SetOption23 {}".format($)')) ),
'rules_once': ('<L', (0x010,1,24), (None, None, ('SetOption', '"SetOption24 {}".format($)')) ),
'knx_enabled': ('<L', (0x010,1,25), (None, None, ('KNX', '"KNX_ENABLED {}".format($)')) ),
})
Setting_5_13_1.update ({
'baudrate': ('B', 0x09D, (None, None, ('Serial', '"Baudrate {}".format($)')), ('$ * 1200','$ / 1200') ),
'mqtt_fingerprint': ('20s', 0x1AD, ([2], None, ('MQTT', MqttFingerprint)) ),
'energy_power_delta': ('B', 0x33F, (None, None, ('Pow', '"PowerDelta {}".format($)')) ),
'light_rotation': ('<H', 0x39E, (None, None, ('Led', '"Rotation {}".format($)')) ),
'serial_delimiter': ('B', 0x451, (None, None, ('Serial', '"SerialDelimiter {}".format($)')) ),
'sbaudrate': ('B', 0x452, (None, None, ('Serial', '"SBaudrate {}".format($)')), ('$ * 1200','$ / 1200') ),
'knx_GA_registered': ('B', 0x4A5, (None, None, ('KNX', None)) ),
'knx_CB_registered': ('B', 0x4A8, (None, None, ('KNX', None)) ),
'timer': ({
'value': ('<L', 0x670, (None, None, ('Timers', '"Timer{} {{\\\"Arm\\\":{arm},\\\"Mode\\\":{mode},\\\"Time\\\":\\\"{tsign}{time}\\\",\\\"Window\\\":{window},\\\"Days\\\":\\\"{days}\\\",\\\"Repeat\\\":{repeat},\\\"Output\\\":{device},\\\"Action\\\":{power}}}".format(#, arm=bitsRead($,31),mode=bitsRead($,29,2),tsign="-" if bitsRead($,29,2)>0 and bitsRead($,0,11)>(12*60) else "",time=time.strftime("%H:%M",time.gmtime((bitsRead($,0,11) if bitsRead($,29,2)==0 else bitsRead($,0,11) if bitsRead($,0,11)<=(12*60) else bitsRead($,0,11)-(12*60))*60)),window=bitsRead($,11,4),repeat=bitsRead($,15),days="{:07b}".format(bitsRead($,16,7))[::-1],device=bitsRead($,23,4)+1,power=bitsRead($,27,2) )')), ('"0x{:08x}".format($)', False) ),
'time': ('<L', (0x670,11, 0),(None, '0 <= $ < 1440', ('Timers', None)) ),
'window': ('<L', (0x670, 4,11),(None, None, ('Timers', None)) ),
'repeat': ('<L', (0x670, 1,15),(None, None, ('Timers', None)) ),
'days': ('<L', (0x670, 7,16),(None, None, ('Timers', None)), '"0b{:07b}".format($)' ),
'device': ('<L', (0x670, 4,23),(None, None, ('Timers', None)) ),
'power': ('<L', (0x670, 2,27),(None, None, ('Timers', None)) ),
'mode': ('<L', (0x670, 2,29),(None, '0 <= $ <= 3', ('Timers', None)) ),
'arm': ('<L', (0x670, 1,31),(None, None, ('Timers', None)) ),
}, 0x670, ([16], None, ('Timers', None)) ),
'latitude': ('i', 0x6B0, (None, None, ('Timers', '"Latitude {}".format($)')), ('float($) / 1000000', 'int($ * 1000000)')),
'longitude': ('i', 0x6B4, (None, None, ('Timers', '"Longitude {}".format($)')), ('float($) / 1000000', 'int($ * 1000000)')),
'knx_physsical_addr': ('<H', 0x6B8, (None, None, ('KNX', None)) ),
'knx_GA_addr': ('<H', 0x6BA, ([10], None, ('KNX', None)) ),
'knx_CB_addr': ('<H', 0x6CE, ([10], None, ('KNX', None)) ),
'knx_GA_param': ('B', 0x6E2, ([10], None, ('KNX', None)) ),
'knx_CB_param': ('B', 0x6EC, ([10], None, ('KNX', None)) ),
'rules': ('512s',0x800, (None, None, ('Management', '"Rule {}".format("\\"" if len($)==0 else $)')) ),
})
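"""
Illustrative sketch, not part of the original script: unpacking and
packing one 32-bit timer value using the bit positions listed in the
'timer' entry above. TIMER_FIELDS, decode_timer and encode_timer are
hypothetical names, and the sample values are made up.

```python
# (name, <bits>, <bitshift>) as listed in the 'timer' entry
TIMER_FIELDS = (
    ('time', 11, 0),
    ('window', 4, 11),
    ('repeat', 1, 15),
    ('days', 7, 16),
    ('device', 4, 23),
    ('power', 2, 27),
    ('mode', 2, 29),
    ('arm', 1, 31),
)

def decode_timer(value):
    # shift right by <bitshift>, then mask <bits> bits
    return dict((name, (value >> shift) & ((1 << bits) - 1))
                for name, bits, shift in TIMER_FIELDS)

def encode_timer(fields):
    value = 0
    for name, bits, shift in TIMER_FIELDS:
        value |= (fields[name] & ((1 << bits) - 1)) << shift
    return value

sample = {'time': 7 * 60 + 30, 'window': 0, 'repeat': 1, 'days': 0b0111110,
          'device': 0, 'power': 1, 'mode': 0, 'arm': 1}
raw = encode_timer(sample)
decoded = decode_timer(raw)
```

decode_timer(raw) returns the same field values that went into
encode_timer(), which is a quick way to check the bit layout.
"""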
# ======================================================================
Setting_5_14_0 = copy.deepcopy(Setting_5_13_1)
Setting_5_14_0['flag'][0].update ({
'device_index_enable': ('<L', (0x010,1,26), (None, None, ('Main', '"SetOption26 {}".format($)')) ),
})
Setting_5_14_0['flag'][0].pop('rules_once',None)
Setting_5_14_0.update ({
'tflag': ({
'hemis': ('<H', (0x2E2,1, 0), (None, None, ('Management', None)) ),
'week': ('<H', (0x2E2,3, 1), (None, '0 <= $ <= 4', ('Management', None)) ),
'month': ('<H', (0x2E2,4, 4), (None, '1 <= $ <= 12', ('Management', None)) ),
'dow': ('<H', (0x2E2,3, 8), (None, '1 <= $ <= 7', ('Management', None)) ),
'hour': ('<H', (0x2E2,5,11), (None, '0 <= $ <= 23', ('Management', None)) ),
}, 0x2E2, ([2], None, ('Management', None)), (None, False) ),
'param': ('B', 0x2FC, ([18], None, ('SetOption', '"SetOption{} {}".format(#+31,$)')) ),
'toffset': ('<h', 0x30E, ([2], None, ('Management', '"{cmnd} {hemis},{week},{month},{dow},{hour},{toffset}".format(cmnd="TimeSTD" if idx==1 else "TimeDST", hemis=@["tflag"][#-1]["hemis"], week=@["tflag"][#-1]["week"], month=@["tflag"][#-1]["month"], dow=@["tflag"][#-1]["dow"], hour=@["tflag"][#-1]["hour"], toffset=value)')) ),
})
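"""
Illustrative sketch, not the script's actual evaluation code: one way
the '$' and '#' placeholders in the validation and command strings
could be resolved. It assumes '$' maps to a variable named value and
'#' to a variable named idx, which is only a guess; render is a
hypothetical helper and the '@' placeholder is not handled here.

```python
def render(template, value, idx=None):
    # Replace the placeholders by variable names, then evaluate the
    # resulting Python expression. Only suitable for trusted,
    # hard-coded templates such as the ones in these tables.
    expr = template.replace('$', 'value').replace('#', 'idx')
    return eval(expr, {}, {'value': value, 'idx': idx})

cmnd_single = render('"SetOption0 {}".format($)', 1)
cmnd_array = render('"Prefix{} {}".format(#,$)', 'tele', 3)
cmnd_offset = render('"SetOption{} {}".format(#+31,$)', 0, 1)
is_valid = render('0 <= $ <= 3600', 300)
```

The same helper covers both kinds of strings: command templates
evaluate to text, validation templates to True or False.
"""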
# ======================================================================
Setting_6_0_0 = copy.deepcopy(Setting_5_14_0)
Setting_6_0_0.update({
'cfg_holder': ('<H', 0x000, (None, None, ('System', None)), ),
'cfg_size': ('<H', 0x002, (None, None, (INTERNAL, None)), (None, False)),
'bootcount': ('<H', 0x00C, (None, None, ('System', None)), (None, False)),
'cfg_crc': ('<H', 0x00E, (None, None, (INTERNAL, None)), '"0x{:04x}".format($)'),
'rule_enabled': ({
'rule1': ('B', (0x49F,1,0), (None, None, ('Management', '"Rule1 {}".format($)')) ),
'rule2': ('B', (0x49F,1,1), (None, None, ('Management', '"Rule2 {}".format($)')) ),
'rule3': ('B', (0x49F,1,2), (None, None, ('Management', '"Rule3 {}".format($)')) ),
}, 0x49F, (None, None, ('Management', None)), (None, False) ),
'rule_once': ({
'rule1': ('B', (0x4A0,1,0), (None, None, ('Management', '"Rule1 {}".format($+4)')) ),
'rule2': ('B', (0x4A0,1,1), (None, None, ('Management', '"Rule2 {}".format($+4)')) ),
'rule3': ('B', (0x4A0,1,2), (None, None, ('Management', '"Rule3 {}".format($+4)')) ),
}, 0x4A0, (None, None, ('Management', None)), (None, False) ),
'mems': ('10s', 0x7CE, ([5], None, ('Management', '"Mem{} {}".format(#,"\\"" if len($)==0 else $)')) ),
'rules': ('512s',0x800, ([3], None, ('Management', '"Rule{} {}".format(#,"\\"" if len($)==0 else $)')) ),
})
Setting_6_0_0['flag'][0].update ({
'knx_enable_enhancement': ('<L', (0x010,1,27), (None, None, ('KNX', '"KNX_ENHANCED {}".format($)')) ),
})
# ======================================================================
Setting_6_1_1 = copy.deepcopy(Setting_6_0_0)
Setting_6_1_1.update ({
'flag3': ('<L', 0x3A0, (None, None, ('System', None)), '"0x{:08x}".format($)' ),
'switchmode': ('B', 0x3A4, ([8], '0 <= $ <= 7', ('Main', '"SwitchMode{} {}".format(#,$)')) ),
'mcp230xx_config': ({
'value': ('<L', 0x6F6, (None, None, ('MCP230xx', '"Sensor29 {pin},{pinmode},{pullup},{intmode}".format(pin=#-1, pinmode=@["mcp230xx_config"][#-1]["pinmode"], pullup=@["mcp230xx_config"][#-1]["pullup"], intmode=@["mcp230xx_config"][#-1]["int_report_mode"])')), ('"0x{:08x}".format($)', False) ),
'pinmode': ('<L', (0x6F6,3, 0), (None, '0 <= $ <= 5', ('MCP230xx', None)) ),
'pullup': ('<L', (0x6F6,1, 3), (None, None, ('MCP230xx', None)) ),
'saved_state': ('<L', (0x6F6,1, 4), (None, None, ('MCP230xx', None)) ),
'int_report_mode': ('<L', (0x6F6,2, 5), (None, None, ('MCP230xx', None)) ),
'int_report_defer': ('<L', (0x6F6,4, 7), (None, None, ('MCP230xx', None)) ),
'int_count_en': ('<L', (0x6F6,1,11), (None, None, ('MCP230xx', None)) ),
}, 0x6F6, ([16], None, ('MCP230xx', None)), (None, False) ),
})
Setting_6_1_1['flag'][0].update ({
'rf_receive_decimal': ('<L', (0x010,1,28), (None, None, ('SetOption' , '"SetOption28 {}".format($)')) ),
'ir_receive_decimal': ('<L', (0x010,1,29), (None, None, ('SetOption', '"SetOption29 {}".format($)')) ),
'hass_light': ('<L', (0x010,1,30), (None, None, ('SetOption', '"SetOption30 {}".format($)')) ),
})
# ======================================================================
Setting_6_2_1 = copy.deepcopy(Setting_6_1_1)
Setting_6_2_1.update ({
'rule_stop': ({
'rule1': ('B', (0x1A7,1,0), (None, None, ('Management', '"Rule1 {}".format($+8)')) ),
'rule2': ('B', (0x1A7,1,1), (None, None, ('Management', '"Rule2 {}".format($+8)')) ),
'rule3': ('B', (0x1A7,1,2), (None, None, ('Management', '"Rule3 {}".format($+8)')) ),
}, 0x1A7, None),
'display_rotate': ('B', 0x2FA, (None, '0 <= $ <= 3', ('Display', '"Rotate {}".format($)')) ),
'display_font': ('B', 0x312, (None, '1 <= $ <= 4', ('Display', '"Font {}".format($)')) ),
'flag3': ({
'timers_enable': ('<L', (0x3A0,1, 0), (None, None, ('Timers', '"Timers {}".format($)')) ),
'user_esp8285_enable': ('<L', (0x3A0,1,31), (None, None, ('System', None)) ),
}, 0x3A0, (None, None, ('*', None)), (None, False) ),
'button_debounce': ('<H', 0x542, (None, '40 <= $ <= 1000', ('Main', '"ButtonDebounce {}".format($)')) ),
'switch_debounce': ('<H', 0x66E, (None, '40 <= $ <= 1000', ('Main', '"SwitchDebounce {}".format($)')) ),
'mcp230xx_int_prio': ('B', 0x716, (None, None, ('MCP230xx', None)) ),
'mcp230xx_int_timer': ('<H', 0x718, (None, None, ('MCP230xx', None)) ),
})
Setting_6_2_1['flag'][0].pop('rules_enabled',None)
Setting_6_2_1['flag'][0].update ({
'mqtt_serial_raw': ('<L', (0x010,1,23), (None, None, ('SetOption', '"SetOption23 {}".format($)')) ),
'global_state': ('<L', (0x010,1,31), (None, None, ('SetOption', '"SetOption31 {}".format($)')) ),
})
Setting_6_2_1['flag2'][0].update ({
# currently unsupported Tasmota command (should be Sensor32), still needs to be implemented
'axis_resolution': ('<L', (0x5BC,2,13), (None, None, ('System', None)) ),
})
# ======================================================================
Setting_6_2_1_2 = copy.deepcopy(Setting_6_2_1)
Setting_6_2_1_2['flag3'][0].update ({
'user_esp8285_enable': ('<L', (0x3A0,1, 1), (None, None, ('SetOption', '"SetOption51 {}".format($)')) ),
})
# ======================================================================
Setting_6_2_1_3 = copy.deepcopy(Setting_6_2_1_2)
Setting_6_2_1_3['flag2'][0].update ({
'frequency_resolution': ('<L', (0x5BC,2,11), (None, '0 <= $ <= 3', ('Pow', '"FreqRes {}".format($)')) ),
})
Setting_6_2_1_3['flag3'][0].update ({
'time_append_timezone': ('<L', (0x3A0,1, 2), (None, None, ('SetOption', '"SetOption52 {}".format($)')) ),
})
# ======================================================================
Setting_6_2_1_6 = copy.deepcopy(Setting_6_2_1_3)
Setting_6_2_1_6.update({
'energy_frequency_calibration': ('<L', 0x7C8, (None, '45000 < $ < 65000', ('Pow', '"FrequencySet {}".format($)')) ),
})
# ======================================================================
Setting_6_2_1_10 = copy.deepcopy(Setting_6_2_1_6)
Setting_6_2_1_10.update({
'rgbwwTable': ('B', 0x71A, ([5], None, ('System', None)) ), # RGBWWTable 255,135,70,255,255
})
# ======================================================================
Setting_6_2_1_14 = copy.deepcopy(Setting_6_2_1_10)
Setting_6_2_1_14.update({
'weight_item': ('<H', 0x7BC, (None, None, ('Management', '"Sensor34 6 {}".format($)')), ('int($ * 10)', 'float($) / 10') ), # undocumented
'weight_max': ('<H', 0x7BE, (None, None, ('Management', '"Sensor34 5 {}".format($)')), ('float($) / 1000', 'int($ * 1000)') ), # undocumented
'weight_reference': ('<L', 0x7C0, (None, None, ('Management', '"Sensor34 3 {}".format($)')) ), # undocumented
'weight_calibration': ('<L', 0x7C4, (None, None, ('Management', '"Sensor34 4 {}".format($)')) ), # undocumented
'web_refresh': ('<H', 0x7CC, (None, '1000 <= $ <= 10000', ('Management', '"WebRefresh {}".format($)')) ), # undocumented
})
Setting_6_2_1_14['flag2'][0].update ({
'weight_resolution': ('<L', (0x5BC,2, 9), (None, '0 <= $ <= 3', ('Management', '"WeightRes {}".format($)')) ), # undocumented
})
# ======================================================================
Setting_6_2_1_19 = copy.deepcopy(Setting_6_2_1_14)
Setting_6_2_1_19.update({
'weight_item': ('<L', 0x7B8, (None, None, ('Management', '"Sensor34 6 {}".format($)')), ('int($ * 10)', 'float($) / 10') ), # undocumented
})
# ======================================================================
Setting_6_2_1_20 = copy.deepcopy(Setting_6_2_1_19)
Setting_6_2_1_20['flag3'][0].update ({
'gui_hostname_ip': ('<L', (0x3A0,1,3), (None, None, ('SetOption', '"SetOption53 {}".format($)')) ),
})
# ======================================================================
Setting_6_3_0 = copy.deepcopy(Setting_6_2_1_20)
Setting_6_3_0.update({
'energy_kWhtotal_time': ('<L', 0x7B4, (None, None, ('System', None)) ),
})
# ======================================================================
Setting_6_3_0_2 = copy.deepcopy(Setting_6_3_0)
Setting_6_3_0_2.update({
'timezone_minutes': ('B', 0x66D, (None, None, ('System', None)) ),
})
Setting_6_3_0_2['flag'][0].pop('rules_once',None)
Setting_6_3_0_2['flag'][0].update ({
'pressure_conversion': ('<L', (0x010,1,24), (None, None, ('SetOption', '"SetOption24 {}".format($)')) ),
})
# ======================================================================
Setting_6_3_0_4 = copy.deepcopy(Setting_6_3_0_2)
Setting_6_3_0_4.update({
'drivers': ('<L', 0x794, ([3], None, ('System', None)), '"0x{:08x}".format($)' ),
'monitors': ('<L', 0x7A0, (None, None, ('System', None)), '"0x{:08x}".format($)' ),
'sensors': ('<L', 0x7A4, ([3], None, ('System', None)), '"0x{:08x}".format($)' ),
'displays': ('<L', 0x7B0, (None, None, ('System', None)), '"0x{:08x}".format($)' ),
})
Setting_6_3_0_4['flag3'][0].update ({
'tuya_apply_o20': ('<L', (0x3A0,1, 4), (None, None, ('SetOption', '"SetOption54 {}".format($)')) ),
})
# ======================================================================
Setting_6_3_0_8 = copy.deepcopy(Setting_6_3_0_4)
Setting_6_3_0_8['flag3'][0].update ({
'hass_short_discovery_msg': ('<L', (0x3A0,1, 5), (None, None, ('SetOption', '"SetOption55 {}".format($)')) ),
})
# ======================================================================
Setting_6_3_0_10 = copy.deepcopy(Setting_6_3_0_8)
Setting_6_3_0_10['flag3'][0].update ({
'use_wifi_scan': ('<L', (0x3A0,1, 6), (None, None, ('SetOption', '"SetOption56 {}".format($)')) ),
'use_wifi_rescan': ('<L', (0x3A0,1, 7), (None, None, ('SetOption', '"SetOption57 {}".format($)')) ),
})
# ======================================================================
Settings = [
(0x603000A, 0xe00, Setting_6_3_0_10),
(0x6030008, 0xe00, Setting_6_3_0_8),
(0x6030004, 0xe00, Setting_6_3_0_4),
(0x6030002, 0xe00, Setting_6_3_0_2),
(0x6030000, 0xe00, Setting_6_3_0),
(0x6020114, 0xe00, Setting_6_2_1_20),
(0x6020113, 0xe00, Setting_6_2_1_19),
(0x602010E, 0xe00, Setting_6_2_1_14),
(0x602010A, 0xe00, Setting_6_2_1_10),
(0x6020106, 0xe00, Setting_6_2_1_6),
(0x6020103, 0xe00, Setting_6_2_1_3),
(0x6020102, 0xe00, Setting_6_2_1_2),
(0x6020100, 0xe00, Setting_6_2_1),
(0x6010100, 0xe00, Setting_6_1_1),
(0x6000000, 0xe00, Setting_6_0_0),
(0x50e0000, 0xa00, Setting_5_14_0),
(0x50d0100, 0xa00, Setting_5_13_1),
(0x50c0000, 0x670, Setting_5_12_0),
(0x50b0000, 0x670, Setting_5_11_0),
(0x50a0000, 0x670, Setting_5_10_0),
]
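# Illustrative sketch (not part of the original tool): many entries above use
# an address definition of the form (baseaddr, bits, bitshift). Assuming the
# usual shift-and-mask reading of such a tuple, a bit field could be extracted
# as shown here. The helper name _example_read_bits is hypothetical.
import struct

def _example_read_bits(data, fmt, addrdef):
    """Unpack the value at baseaddr and return <bits> bits starting at <bitshift>"""
    baseaddr, bits, bitshift = addrdef
    raw = struct.unpack_from(fmt, data, baseaddr)[0]
    # shift the field down to bit 0, then mask off everything above it
    return (raw >> bitshift) & ((1 << bits) - 1)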
# ======================================================================
# Common helper
# ======================================================================
class LogType:
INFO = 'INFO'
WARNING = 'WARNING'
ERROR = 'ERROR'
def message(msg, typ=None, status=None, line=None):
"""
Writes a message to stderr
@param msg:
message to output
@param typ:
INFO, WARNING or ERROR
@param status:
status number
@param line:
optional source line number appended to the message
"""
print >> sys.stderr, '{styp}{sdelimiter}{sstatus}{slineno}{scolon}{smgs}'.format(\
styp=typ if typ is not None else '',
sdelimiter=' ' if status is not None and status > 0 and typ is not None else '',
sstatus=status if status is not None and status > 0 else '',
scolon=': ' if typ is not None or line is not None else '',
smgs=msg,
slineno=' (@{:04d})'.format(line) if line is not None else '')
def exit(status=0, msg="end", typ=LogType.ERROR, src=None, doexit=True, line=None):
"""
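Called when the program should exit
@param status:
the exit status the program returns to the caller
@param msg:
the msg logged before exit
@param typ:
msg type: 'INFO', 'WARNING' or 'ERROR'
@param src:
optional source description prepended to msg
@param doexit:
True to exit program, otherwise return
@param line:
optional source line number appended to the message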
"""
if src is not None:
msg = '{} ({})'.format(src, msg)
message(msg, typ=typ if status!=ExitCode.OK else LogType.INFO, status=status, line=line)
exitcode = status
if doexit:
sys.exit(exitcode)
def ShortHelp(doexit=True):
"""
Show short help (usage) only - used by own -h handling
@param doexit:
sys.exit with OK if True
"""
print parser.description
print
parser.print_usage()
print
print "For advanced help use '{prog} -H' or '{prog} --full-help'".format(prog=os.path.basename(sys.argv[0]))
if doexit:
sys.exit(ExitCode.OK)
class HTTPHeader:
"""
pycurl helper class collecting the response header
"""
def __init__(self):
self.contents = ''
def clear(self):
self.contents = ''
def store(self, _buffer):
self.contents = "{}{}".format(self.contents, _buffer)
def response(self):
header = str(self.contents).split('\n')
if len(header) > 0:
return header[0].rstrip()
return ''
def contenttype(self):
for item in str(self.contents).split('\n'):
ditem = item.split(":")
if ditem[0].strip().lower() == 'content-type' and len(ditem) > 1:
return ditem[1].strip()
return ''
def __str__(self):
return self.contents
class CustomHelpFormatter(configargparse.HelpFormatter):
"""
Class for customizing the help output
"""
def _format_action_invocation(self, action):
"""
Reformat multiple metavar output
-d <host>, --device <host>, --host <host>
to single output
-d, --device, --host <host>
"""
orgstr = configargparse.HelpFormatter._format_action_invocation(self, action)
if orgstr and orgstr[0] != '-': # only optional arguments
return orgstr
res = getattr(action, '_formatted_action_invocation', None)
if res:
return res
options = orgstr.split(', ')
if len(options) <= 1:
action._formatted_action_invocation = orgstr
return orgstr
return_list = []
for option in options:
meta = ""
arg = option.split(' ')
if len(arg) > 1:
meta = arg[1]
return_list.append(arg[0])
if len(meta) > 0 and len(return_list) > 0:
return_list[len(return_list)-1] += " "+meta
action._formatted_action_invocation = ', '.join(return_list)
return action._formatted_action_invocation
# ======================================================================
# Tasmota config data handling
# ======================================================================
def GetTemplateSizes():
"""
Get all possible template sizes as list
@return:
template sizes as list []
"""
sizes = []
for cfg in Settings:
sizes.append(cfg[1])
# return unique sizes only (remove duplicates)
return list(set(sizes))
def GetTemplateSetting(decode_cfg):
"""
Search for version, size and settings to be used depending on given binary config data
@param decode_cfg:
binary config data (decrypted)
@return:
version, size, settings to use; None if version is invalid
"""
version = 0x0
size = setting = None
version = GetField(decode_cfg, 'version', Setting_6_2_1['version'], raw=True)
# search setting definition top-down
for cfg in sorted(Settings, key=lambda s: s[0], reverse=True):
if version >= cfg[0]:
size = cfg[1]
setting = cfg[2]
break
return version, size, setting
def GetGroupList(setting):
"""
Get all available group definitions from setting
@param setting:
setting dictionary to scan
@return:
sorted list of group names
"""
groups = set()
for name in setting:
dev = setting[name]
format, group = GetFieldDef(dev, fields="format, group")
if group is not None and len(group) > 0:
groups.add(group)
if isinstance(format, dict):
subgroups = GetGroupList(format)
if subgroups is not None and len(subgroups) > 0:
for group in subgroups:
groups.add(group)
groups=list(groups)
groups.sort()
return groups
class FileType:
FILE_NOT_FOUND = None
DMP = 'dmp'
JSON = 'json'
BIN = 'bin'
UNKNOWN = 'unknown'
INCOMPLETE_JSON = 'incomplete json'
INVALID_JSON = 'invalid json'
INVALID_BIN = 'invalid bin'
def GetFileType(filename):
"""
Get the FileType class member of a given filename
@param filename:
filename of the file to analyse
@return:
FileType class member
"""
filetype = FileType.UNKNOWN
# try filename
try:
isfile = os.path.isfile(filename)
try:
f = open(filename, "r")
try:
# try reading as json
inputjson = json.load(f)
if 'header' in inputjson:
filetype = FileType.JSON
else:
filetype = FileType.INCOMPLETE_JSON
except ValueError:
filetype = FileType.INVALID_JSON
# not a valid json, get filesize and compare it with all possible sizes
try:
size = os.path.getsize(filename)
except:
filetype = FileType.UNKNOWN
sizes = GetTemplateSizes()
# size is one of a dmp file size
if size in sizes:
filetype = FileType.DMP
elif (size - ((len(hex(BINARYFILE_MAGIC))-2)/2)) in sizes:
# check if the binary file has the magic header
try:
inputfile = open(filename, "rb")
inputbin = inputfile.read()
inputfile.close()
if struct.unpack_from('<L', inputbin, 0)[0] == BINARYFILE_MAGIC:
filetype = FileType.BIN
else:
filetype = FileType.INVALID_BIN
except:
pass
# ~ else:
# ~ filetype = FileType.UNKNOWN
finally:
f.close()
except:
filetype = FileType.FILE_NOT_FOUND
except:
filetype = FileType.FILE_NOT_FOUND
return filetype
def GetVersionStr(version):
"""
Create human readable version string
@param version:
version integer
@return:
version string
"""
if isinstance(version, (unicode,str)):
version = int(version, 0)
major = ((version>>24) & 0xff)
minor = ((version>>16) & 0xff)
release = ((version>> 8) & 0xff)
subrelease = (version & 0xff)
if major >= 6:
if subrelease > 0:
subreleasestr = str(subrelease)
else:
subreleasestr = ''
else:
if subrelease > 0:
subreleasestr = str(chr(subrelease+ord('a')-1))
else:
subreleasestr = ''
return "{:d}.{:d}.{:d}{}{}".format( major, minor, release, '.' if (major >= 6 and subreleasestr != '') else '', subreleasestr)
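# Illustrative sketch (not part of the original tool): the version integers in
# the Settings list pack major, minor, release and subrelease into one byte
# each, which is what GetVersionStr() above unpacks. The helper name
# _example_pack_version is hypothetical.
def _example_pack_version(major, minor, release, subrelease=0):
    """Pack four version bytes into one integer, e.g. 6.3.0.10 -> 0x603000A"""
    return (major << 24) | (minor << 16) | (release << 8) | subrelease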
def MakeFilename(filename, filetype, configmapping):
"""
Replace variables within a filename
@param filename:
original filename possibly containing replacements:
@v:
Tasmota version from config data
@f:
friendlyname from config data
@h:
hostname from config data
@H:
hostname from device (-d arg only)
@param filetype:
FileType.x object - creates extension if not None
@param configmapping:
binary config data (decrypted)
@return:
New filename with replacements
"""
config_version = config_friendlyname = config_hostname = device_hostname = ''
if 'version' in configmapping:
config_version = GetVersionStr( int(str(configmapping['version']), 0) )
if 'friendlyname' in configmapping:
config_friendlyname = configmapping['friendlyname'][0]
if 'hostname' in configmapping:
if configmapping['hostname'].find('%') < 0:
config_hostname = configmapping['hostname']
if filename.find('@H') >= 0 and args.device is not None:
device_hostname = GetTasmotaHostname(args.device, args.port, username=args.username, password=args.password)
if device_hostname is None:
device_hostname = ''
filename = filename.replace('@v', config_version)
filename = filename.replace('@f', config_friendlyname )
filename = filename.replace('@h', config_hostname )
filename = filename.replace('@H', device_hostname )
dirname = basename = ext = ''
name = filename
# split file parts
dirname = os.path.normpath(os.path.dirname(filename))
basename = os.path.basename(filename)
name, ext = os.path.splitext(basename)
# make a valid filename
try:
name = name.decode('unicode-escape').translate(dict((ord(char), None) for char in '\/*?:"<>|'))
except:
pass
name = str(name.replace(' ','_'))
# append extension based on filetype if not given
if len(ext) and ext[0]=='.':
ext = ext[1:]
if filetype is not None and args.extension and (len(ext)<2 or all(c.isdigit() for c in ext)):
ext = filetype.lower()
# join filename + extension
if len(ext):
name_ext = name+'.'+ext
else:
name_ext = name
# join path and filename
try:
filename = os.path.join(dirname, name_ext)
except:
pass
return filename
def MakeUrl(host, port=80, location=''):
"""
Create a Tasmota host url
@param host:
hostname or IP of Tasmota host
@param port:
port number to use for http connection
@param location:
http url location
@return:
Tasmota http url
"""
return "http://{shost}{sdelimiter}{sport}/{slocation}".format(\
shost=host,
sdelimiter=':' if port != 80 else '',
sport=port if port != 80 else '',
slocation=location )
def LoadTasmotaConfig(filename):
"""
Load config from Tasmota file
@param filename:
filename to load
@return:
binary config data (encrypted) or None on error
"""
encode_cfg = None
# read config from a file
if not os.path.isfile(filename): # check file exists
exit(ExitCode.FILE_NOT_FOUND, "File '{}' not found".format(filename),line=inspect.getlineno(inspect.currentframe()))
try:
tasmotafile = open(filename, "rb")
encode_cfg = tasmotafile.read()
tasmotafile.close()
except Exception, e:
exit(e[0], "'{}' {}".format(filename, e[1]),line=inspect.getlineno(inspect.currentframe()))
return encode_cfg
def TasmotaGet(cmnd, host, port, username=DEFAULTS['source']['username'], password=None, contenttype = None):
"""
Tasmota http request
@param cmnd:
http url location (command) to request
@param host:
hostname or IP of Tasmota device
@param port:
http port of Tasmota device
@param username:
optional username for Tasmota web login
@param password
optional password for Tasmota web login
@param contenttype:
expected Content-Type of the response, not checked if None
@return:
http response code, response body
"""
body = None
# read config direct from device via http
c = pycurl.Curl()
buffer = io.BytesIO()
c.setopt(c.WRITEDATA, buffer)
header = HTTPHeader()
c.setopt(c.HEADERFUNCTION, header.store)
c.setopt(c.FOLLOWLOCATION, True)
c.setopt(c.URL, MakeUrl(host, port, cmnd))
if username is not None and password is not None:
c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC)
c.setopt(c.USERPWD, username + ':' + password)
c.setopt(c.HTTPGET, True)
c.setopt(c.VERBOSE, False)
responsecode = 200
try:
c.perform()
responsecode = c.getinfo(c.RESPONSE_CODE)
response = header.response()
except Exception, e:
exit(e[0], e[1],line=inspect.getlineno(inspect.currentframe()))
finally:
c.close()
if responsecode >= 400:
exit(responsecode, 'HTTP result: {}'.format(header.response()),line=inspect.getlineno(inspect.currentframe()))
elif contenttype is not None and header.contenttype()!=contenttype:
exit(ExitCode.DOWNLOAD_CONFIG_ERROR, "Device did not respond properly, maybe Tasmota webserver admin mode is disabled (WebServer 2)",line=inspect.getlineno(inspect.currentframe()))
try:
body = buffer.getvalue()
except:
pass
return responsecode, body
def GetTasmotaHostname(host, port, username=DEFAULTS['source']['username'], password=None):
"""
Get Tasmota hostname from device
@param host:
hostname or IP of Tasmota device
@param port:
http port of Tasmota device
@param username:
optional username for Tasmota web login
@param password
optional password for Tasmota web login
@return:
Tasmota real hostname or None on error
"""
hostname = None
loginstr = ""
if password is not None:
loginstr = "user={}&password={}&".format(urllib2.quote(username), urllib2.quote(password))
# get hostname
responsecode, body = TasmotaGet("cm?{}cmnd=status%205".format(loginstr), host, port, username=username, password=password)
if body is not None:
jsonbody = json.loads(body)
if "StatusNET" in jsonbody and "Hostname" in jsonbody["StatusNET"]:
hostname = jsonbody["StatusNET"]["Hostname"]
if args.verbose:
message("Hostname for '{}' retrieved: '{}'".format(host, hostname), typ=LogType.INFO)
return hostname
def PullTasmotaConfig(host, port, username=DEFAULTS['source']['username'], password=None):
"""
Pull config from Tasmota device
@param host:
hostname or IP of Tasmota device
@param port:
http port of Tasmota device
@param username:
optional username for Tasmota web login
@param password
optional password for Tasmota web login
@return:
binary config data (encrypted) or None on error
"""
responsecode, body = TasmotaGet('dl', host, port, username, password, contenttype='application/octet-stream')
return body
def PushTasmotaConfig(encode_cfg, host, port, username=DEFAULTS['source']['username'], password=None):
"""
Upload binary data to a Tasmota host using http
@param encode_cfg:
encrypted binary data or filename containing Tasmota encrypted binary config
@param host:
hostname or IP of Tasmota device
@param port:
http port of Tasmota device
@param username:
optional username for Tasmota web login
@param password
optional password for Tasmota web login
@return
errorcode, errorstring
errorcode=0 if success, otherwise http response or exception code
"""
if isinstance(encode_cfg, bytearray):
encode_cfg = str(encode_cfg)
# get restore config page first to set internal Tasmota vars
responsecode, body = TasmotaGet('rs?', host, port, username, password, contenttype='text/html')
if body is None:
return responsecode, "ERROR"
# post data
c = pycurl.Curl()
header = HTTPHeader()
c.setopt(c.HEADERFUNCTION, header.store)
c.setopt(c.WRITEFUNCTION, lambda x: None)
c.setopt(c.POST, 1)
c.setopt(c.URL, MakeUrl(host, port, 'u2'))
if username is not None and password is not None:
c.setopt(c.HTTPAUTH, c.HTTPAUTH_BASIC)
c.setopt(c.USERPWD, username + ':' + password)
try:
isfile = os.path.isfile(encode_cfg)
except:
isfile = False
if isfile:
c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, encode_cfg))])
else:
# use as binary data
c.setopt(c.HTTPPOST, [
('fileupload', (
c.FORM_BUFFER, '{sprog}_v{sver}.dmp'.format(sprog=os.path.basename(sys.argv[0]), sver=VER),
c.FORM_BUFFERPTR, encode_cfg
)),
])
responsecode = 200
try:
c.perform()
responsecode = c.getinfo(c.RESPONSE_CODE)
except Exception, e:
return e[0], e[1]
c.close()
if responsecode >= 400:
return responsecode, header.response()
elif header.contenttype() != 'text/html':
return ExitCode.UPLOAD_CONFIG_ERROR, "Device did not respond properly, maybe Tasmota webserver admin mode is disabled (WebServer 2)"
return 0, 'OK'
def DecryptEncrypt(obj):
"""
Decrypt/Encrypt binary config data
@param obj:
binary config data
@return:
decrypted configuration (if obj contains encrypted data),
encrypted configuration otherwise
"""
if isinstance(obj, bytearray):
obj = str(obj)
dobj = obj[0:2]
for i in range(2, len(obj)):
dobj += chr( (ord(obj[i]) ^ (CONFIG_FILE_XOR +i)) & 0xff )
return dobj
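# Illustrative sketch (not part of the original tool): the XOR scheme used by
# DecryptEncrypt() is symmetric, so applying it twice restores the input.
# The helper name and the default key 0x5A are assumptions for this example;
# the tool itself takes the key from CONFIG_FILE_XOR defined elsewhere.
def _example_xor_roundtrip(data, key=0x5A):
    """Return a copy of data with every byte from offset 2 on XORed with (key + offset)"""
    out = bytearray(data)
    for i in range(2, len(out)):
        # the first two bytes (cfg_holder) stay untouched
        out[i] = (out[i] ^ (key + i)) & 0xff
    return out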
def GetSettingsCrc(dobj):
"""
Return the calculated crc of binary config data
@param dobj:
decrypted binary config data
@return:
2 byte unsigned integer crc value
"""
if isinstance(dobj, bytearray):
dobj = str(dobj)
crc = 0
for i in range(0, len(dobj)):
if not i in [14,15]: # Skip crc
byte = ord(dobj[i])
crc += byte * (i+1)
return crc & 0xffff
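# Illustrative sketch (not part of the original tool): the same position
# weighted checksum as GetSettingsCrc() above, written against a bytearray so
# it does not depend on ord(). The helper name _example_settings_crc is
# hypothetical.
def _example_settings_crc(data):
    """Sum byte * (offset + 1) over all bytes except the crc field at offsets 14 and 15"""
    crc = 0
    for i, byte in enumerate(bytearray(data)):
        if i not in (14, 15):   # skip the stored crc itself
            crc += byte * (i + 1)
    return crc & 0xffff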
def GetFieldDef(fielddef, fields="format, addrdef, baseaddr, bits, bitshift, datadef, arraydef, validate, cmd, group, tasmotacmnd, converter, readconverter, writeconverter"):
"""
Get field definition items
@param fielddef:
field format - see "Settings dictionary" above
@param fields:
comma separated string list of values to be returned
possible values see fields default
@return:
value(s) defined in <fields> (a tuple if more than one field is requested)
"""
format = addrdef = baseaddr = datadef = arraydef = validate = cmd = group = tasmotacmnd = converter = readconverter = writeconverter = None
bits = bitshift = 0
# calling with nothing is wrong
if fielddef is None:
print >> sys.stderr, '<fielddef> is None'
raise SyntaxError('<fielddef> error')
# get top level items
if len(fielddef) == 3:
# converter not present
format, addrdef, datadef = fielddef
elif len(fielddef) == 4:
# converter present
format, addrdef, datadef, converter = fielddef
else:
print >> sys.stderr, 'wrong <fielddef> {} length ({}) in setting'.format(fielddef, len(fielddef))
raise SyntaxError('<fielddef> error')
# ignore calls with 'root' setting
if isinstance(format, dict) and baseaddr is None and datadef is None:
return eval(fields)
if not isinstance(format, (unicode,str,dict)):
print >> sys.stderr, 'wrong <format> {} type {} in <fielddef> {}'.format(format, type(format), fielddef)
raise SyntaxError('<fielddef> error')
# extract addrdef items
baseaddr = addrdef
if isinstance(baseaddr, (list,tuple)):
if len(baseaddr) == 3:
# baseaddr bit definition
baseaddr, bits, bitshift = baseaddr
if not isinstance(bits, int):
print >> sys.stderr, '<bits> {} must be an integer in <fielddef> {}'.format(bits, fielddef)
raise SyntaxError('<fielddef> error')
if not isinstance(bitshift, int):
print >> sys.stderr, '<bitshift> {} must be an integer in <fielddef> {}'.format(bitshift, fielddef)
raise SyntaxError('<fielddef> error')
else:
print >> sys.stderr, 'wrong <addrdef> {} length ({}) in <fielddef> {}'.format(addrdef, len(addrdef), fielddef)
raise SyntaxError('<fielddef> error')
if not isinstance(baseaddr, int):
print >> sys.stderr, '<baseaddr> {} must be an integer in <fielddef> {}'.format(baseaddr, fielddef)
raise SyntaxError('<fielddef> error')
# extract datadef items
arraydef = datadef
if isinstance(datadef, (tuple)):
if len(datadef) == 2:
# datadef has a validator
arraydef, validate = datadef
elif len(datadef) == 3:
# datadef has a validator and cmd set
arraydef, validate, cmd = datadef
# cmd must be a tuple with 2 objects
if isinstance(cmd, (tuple)) and len(cmd) == 2:
group, tasmotacmnd = cmd
if group is not None and not isinstance(group, (str, unicode)):
print >> sys.stderr, 'wrong <group> {} in <fielddef> {}'.format(group, fielddef)
raise SyntaxError('<fielddef> error')
if tasmotacmnd is not None and not callable(tasmotacmnd) and not isinstance(tasmotacmnd, (str, unicode)):
print >> sys.stderr, 'wrong <tasmotacmnd> {} in <fielddef> {}'.format(tasmotacmnd, fielddef)
raise SyntaxError('<fielddef> error')
else:
print >> sys.stderr, 'wrong <cmd> {} length ({}) in <fielddef> {}'.format(cmd, len(cmd), fielddef)
raise SyntaxError('<fielddef> error')
else:
print >> sys.stderr, 'wrong <datadef> {} length ({}) in <fielddef> {}'.format(datadef, len(datadef), fielddef)
raise SyntaxError('<fielddef> error')
if validate is not None and (not isinstance(validate, (unicode,str)) and not callable(validate)):
print >> sys.stderr, 'wrong <validate> {} type {} in <fielddef> {}'.format(validate, type(validate), fielddef)
raise SyntaxError('<fielddef> error')
# convert single int into one-dimensional list
if isinstance(arraydef, int):
arraydef = [arraydef]
if arraydef is not None and not isinstance(arraydef, (list)):
print >> sys.stderr, 'wrong <arraydef> {} type {} in <fielddef> {}'.format(arraydef, type(arraydef), fielddef)
raise SyntaxError('<fielddef> error')
# get read/write converter items
readconverter = converter
if isinstance(converter, (tuple)):
if len(converter) == 2:
# converter has read/write converter
readconverter, writeconverter = converter
if readconverter is not None and not isinstance(readconverter, (str,unicode)) and not callable(readconverter):
print >> sys.stderr, 'wrong <readconverter> {} type {} in <fielddef> {}'.format(readconverter, type(readconverter), fielddef)
raise SyntaxError('<fielddef> error')
if writeconverter is not None and (not isinstance(writeconverter, (bool,str,unicode)) and not callable(writeconverter)):
print >> sys.stderr, 'wrong <writeconverter> {} type {} in <fielddef> {}'.format(writeconverter, type(writeconverter), fielddef)
raise SyntaxError('<fielddef> error')
else:
print >> sys.stderr, 'wrong <converter> {} length ({}) in <fielddef> {}'.format(converter, len(converter), fielddef)
raise SyntaxError('<fielddef> error')
return eval(fields)
def ReadWriteConverter(value, fielddef, read=True, raw=False):
"""
Convert field value based on field desc
@param value:
original value
@param fielddef
field definition - see "Settings dictionary" above
@param read
use read conversion if True, otherwise use write conversion
@param raw
return raw values (True) or converted values (False)
@return:
(un)converted value
"""
converter, readconverter, writeconverter = GetFieldDef(fielddef, fields='converter, readconverter, writeconverter')
# call password functions even if raw value should be processed
if read and callable(readconverter) and readconverter == passwordread:
raw = False
if not read and callable(writeconverter) and writeconverter == passwordwrite:
raw = False
if not raw and converter is not None:
conv = readconverter if read else writeconverter
try:
if isinstance(conv, str): # evaluate strings
return eval(conv.replace('$','value'))
elif callable(conv): # use as format function
return conv(value)
except Exception, e:
exit(e[0], e[1], typ=LogType.WARNING, line=inspect.getlineno(inspect.currentframe()))
return value
def CmndConverter(valuemapping, value, idx, fielddef):
"""
Convert field value into Tasmota command if available
@param valuemapping:
data mapping
@param value:
original value
@param idx:
zero-based array index of the value, None for non-array fields
@param fielddef
field definition - see "Settings dictionary" above
@return:
converted value or None if unable to convert
"""
converter, readconverter, writeconverter, group, tasmotacmnd = GetFieldDef(fielddef, fields='converter, readconverter, writeconverter, group, tasmotacmnd')
result = None
if (callable(readconverter) and readconverter == passwordread) or (callable(writeconverter) and writeconverter == passwordwrite):
if value == HIDDEN_PASSWORD:
return None
else:
result = value
if tasmotacmnd is not None and (callable(tasmotacmnd) or len(tasmotacmnd) > 0):
if idx is not None:
idx += 1
if isinstance(tasmotacmnd, str): # evaluate strings
if idx is not None:
evalstr = tasmotacmnd.replace('$','value').replace('#','idx').replace('@','valuemapping')
else:
evalstr = tasmotacmnd.replace('$','value').replace('@','valuemapping')
# ~ try:
result = eval(evalstr)
# ~ except:
# ~ print evalstr
# ~ print value
elif callable(tasmotacmnd): # use as format function
if idx is not None:
result = tasmotacmnd(value, idx)
else:
result = tasmotacmnd(value)
return result
def ValidateValue(value, fielddef):
"""
Validate a value if validator is defined in fielddef
@param value:
original value
@param fielddef
field definition - see "Settings dictionary" above
@return:
True if value is valid, False if invalid
"""
validate = GetFieldDef(fielddef, fields='validate')
if value == 0:
# zero can not be checked against the validate conditions:
# some Tasmota values are not allowed to be 0 on input
# even though Tasmota initializes them to 0,
# so 0 values are always accepted
return True
valid = True
try:
if isinstance(validate, str): # evaluate strings
valid = eval(validate.replace('$','value'))
elif callable(validate): # use as validation function
valid = validate(value)
except:
valid = False
return valid
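# Illustrative sketch (not part of the original tool): validator and converter
# strings in the Settings dictionaries use '$' as a placeholder for the field
# value and are evaluated after replacing it with a variable name, as
# ValidateValue() above does. The helper name _example_eval_placeholder is
# hypothetical. As in the tool, eval() is only reasonable here because the
# expressions come from this file and never from user input.
def _example_eval_placeholder(expr, value):
    """Evaluate a settings expression with '$' standing for value"""
    return eval(expr.replace('$', 'value'))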
def GetFieldMinMax(fielddef):
"""
Get minimum, maximum of field based on field format definition
@param fielddef:
field format - see "Settings dictionary" above
@return:
min, max
"""
minmax = {'c': (0, 0xff),
'?': (0, 1),
'b': (~0x7f, 0x7f),
'B': (0, 0xff),
'h': (~0x7fff, 0x7fff),
'H': (0, 0xffff),
'i': (~0x7fffffff, 0x7fffffff),
'I': (0, 0xffffffff),
'l': (~0x7fffffff, 0x7fffffff),
'L': (0, 0xffffffff),
'q': (~0x7fffffffffffffff, 0x7fffffffffffffff),
'Q': (0, 0xffffffffffffffff),
'f': (-sys.float_info.max, sys.float_info.max),
'd': (-sys.float_info.max, sys.float_info.max),
}
format = GetFieldDef(fielddef, fields='format')
_min = 0
_max = 0
if format[-1:] in minmax:
_min, _max = minmax[format[-1:]]
elif format[-1:] in ['s','p']:
# s and p may have a prefix as length
match = re.search("\s*(\d+)", format)
if match:
_max=int(match.group(0))
return _min,_max
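# Illustration only, not part of the original script: a minimal sketch showing
# how the integer limits in the table above follow from the width of each
# struct format character. The helper name int_range is hypothetical, and the
# sketch assumes standard sizes (the '<' prefix), which matches the 4 byte
# 'l'/'L' entries used above.

```python
import struct


def int_range(code):
    """Return (min, max) for a struct integer format character, e.g. 'b' or 'H'."""
    # '<' selects standard sizes, so the result does not depend on the platform
    bits = 8 * struct.calcsize('<' + code)
    if code.islower():
        # signed types use the two's complement range
        return -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    # unsigned types start at 0 and use every bit
    return 0, (1 << bits) - 1
```

# Called as int_range('Q'), the sketch yields the full unsigned 64 bit range.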
def GetFieldLength(fielddef):
"""
Get length of a field in bytes based on field format definition
@param fielddef:
field format - see "Settings dictionary" above
@return:
length of field in bytes
"""
length=0
format, addrdef, arraydef = GetFieldDef(fielddef, fields='format, addrdef, arraydef')
# <arraydef> contains an integer list
if isinstance(arraydef, list) and len(arraydef) > 0:
# arraydef contains a list
# calc size recursive by sum of all elements
for i in range(0, arraydef[0]):
subfielddef = GetSubfieldDef(fielddef)
if len(arraydef) > 1:
length += GetFieldLength( (format, addrdef, subfielddef) )
# single array
else:
length += GetFieldLength( (format, addrdef, None) )
elif isinstance(format, dict):
# -> iterate through format
addr = None
setting = format
for name in setting:
baseaddr, bits, bitshift = GetFieldDef(setting[name], fields='baseaddr, bits, bitshift')
_len = GetFieldLength(setting[name])
if addr != baseaddr:
addr = baseaddr
length += _len
# a simple value
elif isinstance(format, str):
if format[-1:] in ['b','B','c','?']:
length=1
elif format[-1:] in ['h','H']:
length=2
elif format[-1:] in ['i','I','l','L','f']:
length=4
elif format[-1:] in ['q','Q','d']:
length=8
elif format[-1:] in ['s','p']:
# s and p may have a prefix as length
match = re.search(r"\s*(\d+)", format)
if match:
length=int(match.group(0))
return length
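# Illustration only, not part of the original script: a minimal sketch of the
# "simple value" branch of GetFieldLength() using struct.calcsize() instead of
# the hand-written size table. The name simple_field_length is hypothetical;
# the sketch assumes standard sizes ('<') and, like the code above, returns 0
# for an 's' or 'p' format without a length prefix.

```python
import re
import struct


def simple_field_length(fmt):
    """Return the size in bytes of a single struct format such as '<L' or '33s'."""
    code = fmt[-1:]
    if code in ('s', 'p'):
        # s and p may carry a decimal length prefix, e.g. '33s'
        match = re.search(r"(\d+)", fmt)
        return int(match.group(1)) if match else 0
    # every other type has a fixed standard size
    return struct.calcsize('<' + code)
```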
def GetSubfieldDef(fielddef):
"""
Get subfield definition from a given field definition
@param fielddef:
see Settings desc above
@return:
subfield definition
"""
format, addrdef, datadef, arraydef, validate, cmd, converter = GetFieldDef(fielddef, fields='format, addrdef, datadef, arraydef, validate, cmd, converter')
# create new arraydef
if len(arraydef) > 1:
arraydef = arraydef[1:]
else:
arraydef = None
# create new datadef
if isinstance(datadef, tuple):
if cmd is not None:
datadef = (arraydef, validate, cmd)
else:
datadef = (arraydef, validate)
else:
datadef = arraydef
# set new field def
subfielddef = None
if converter is not None:
subfielddef = (format, addrdef, datadef, converter)
else:
subfielddef = (format, addrdef, datadef)
return subfielddef
def IsFilterGroup(group):
    """
    Check if group is valid on filter

    @param group:
        group name to check

    @return:
        True if group is in filter, otherwise False
    """
    if args.filter is not None:
        if group is None:
            return False
        if group != INTERNAL and group != '*' and group not in args.filter:
            return False
    return True
def GetField(dobj, fieldname, fielddef, raw=False, addroffset=0):
"""
Get field value from definition
@param dobj:
decrypted binary config data
@param fieldname:
name of the field
@param fielddef:
see Settings desc above
@param raw
return raw values (True) or converted values (False)
@param addroffset
use offset for baseaddr (used for recursive calls)
@return:
field mapping
"""
if isinstance(dobj, bytearray):
dobj = str(dobj)
valuemapping = None
# get field definition
format, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd = GetFieldDef(fielddef, fields='format, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd')
# filter groups
if not IsFilterGroup(group):
return valuemapping
# <arraydef> contains an integer list
if isinstance(arraydef, list) and len(arraydef) > 0:
valuemapping = []
offset = 0
for i in range(0, arraydef[0]):
subfielddef = GetSubfieldDef(fielddef)
length = GetFieldLength(subfielddef)
if length != 0:
value = GetField(dobj, fieldname, subfielddef, raw=raw, addroffset=addroffset+offset)
valuemapping.append(value)
offset += length
# <format> contains a dict
elif isinstance(format, dict):
mapping_value = {}
# -> iterate through format
for name in format:
value = None
value = GetField(dobj, name, format[name], raw=raw, addroffset=addroffset)
if value is not None:
mapping_value[name] = value
# copy complete returned mapping
valuemapping = copy.deepcopy(mapping_value)
# a simple value
elif isinstance(format, (str, bool, int, float, long)):
if GetFieldLength(fielddef) != 0:
valuemapping = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
if not format[-1:].lower() in ['s','p']:
valuemapping = bitsRead(valuemapping, bitshift, bits)
# additional processing for strings
if format[-1:].lower() in ['s','p']:
# use left string until \0
s = str(valuemapping).split('\0')[0]
# remove character > 127
valuemapping = unicode(s, errors='ignore')
valuemapping = ReadWriteConverter(valuemapping, fielddef, read=True, raw=raw)
else:
exit(ExitCode.INTERNAL_ERROR, "Wrong mapping format definition: '{}'".format(format), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe()))
return valuemapping
def SetField(dobj, fieldname, fielddef, restore, addroffset=0, filename=""):
"""
Set field value from definition
@param dobj:
decrypted binary config data
@param fieldname:
name of the field
@param fielddef:
see Settings desc above
@param restore
restore mapping with the new value(s)
@param addroffset
use offset for baseaddr (used for recursive calls)
@param filename
related filename (for messages only)
@return:
new decrypted binary config data
"""
format, baseaddr, bits, bitshift, arraydef, group, writeconverter = GetFieldDef(fielddef, fields='format, baseaddr, bits, bitshift, arraydef, group, writeconverter')
# cast unicode
fieldname = str(fieldname)
# filter groups
if not IsFilterGroup(group):
return dobj
# do not write readonly values
if writeconverter is False:
if args.debug:
print >> sys.stderr, "SetField(): Readonly '{}' using '{}'/{}{} @{} skipped".format(fieldname, format, arraydef, bits, hex(baseaddr+addroffset))
return dobj
# <arraydef> contains a list
if isinstance(arraydef, list) and len(arraydef) > 0:
offset = 0
if len(restore) > arraydef[0]:
exit(ExitCode.RESTORE_DATA_ERROR, "file '{sfile}', array '{sname}[{selem}]' exceeds max number of elements [{smax}]".format(sfile=filename, sname=fieldname, selem=len(restore), smax=arraydef[0]), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe()))
for i in range(0, arraydef[0]):
subfielddef = GetSubfieldDef(fielddef)
length = GetFieldLength(subfielddef)
if length != 0:
if i >= len(restore): # restore data list may be shorter than definition
break
subrestore = restore[i]
dobj = SetField(dobj, fieldname, subfielddef, subrestore, addroffset=addroffset+offset, filename=filename)
offset += length
# <format> contains a dict
elif isinstance(format, dict):
for name in format: # -> iterate through format
if name in restore:
dobj = SetField(dobj, name, format[name], restore[name], addroffset=addroffset, filename=filename)
# a simple value
elif isinstance(format, (str, bool, int, float, long)):
valid = True
err = ""
errformat = ""
_min, _max = GetFieldMinMax(fielddef)
value = _value = None
skip = False
# simple char value
if format[-1:] in ['c']:
try:
value = ReadWriteConverter(restore.encode(STR_ENCODING)[0], fielddef, read=False)
except Exception, e:
exit(e[0], e[1], typ=LogType.WARNING, line=inspect.getlineno(inspect.currentframe()))
valid = False
# bool
elif format[-1:] in ['?']:
try:
value = ReadWriteConverter(bool(restore), fielddef, read=False)
except Exception, e:
exit(e[0], e[1], typ=LogType.WARNING, line=inspect.getlineno(inspect.currentframe()))
valid = False
# integer
elif format[-1:] in ['b','B','h','H','i','I','l','L','q','Q','P']:
value = ReadWriteConverter(restore, fielddef, read=False)
if isinstance(value, (str, unicode)):
value = int(value, 0)
else:
value = int(value)
# bits
if bits != 0:
bitvalue = value
value = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
# validate restore value
valid = ValidateValue(bitvalue, fielddef)
if not valid:
err = "valid bit range exceeding"
else:
mask = (1<<bits)-1
if bitvalue > mask:
_min = 0
_max = mask
_value = bitvalue
valid = False
else:
if bitshift >= 0:
bitvalue <<= bitshift
mask <<= bitshift
else:
bitvalue >>= abs(bitshift)
mask >>= abs(bitshift)
v=value
value &= (0xffffffff ^ mask)
value |= bitvalue
# full size values
else:
# validate restore function
valid = ValidateValue(value, fielddef)
if not valid:
err = "valid range exceeding"
_value = value
# float
elif format[-1:] in ['f','d']:
try:
value = ReadWriteConverter(float(restore), fielddef, read=False)
except:
valid = False
# string
elif format[-1:] in ['s','p']:
value = ReadWriteConverter(restore.encode(STR_ENCODING), fielddef, read=False)
err = "string length exceeding"
if value is not None:
# be aware 0 byte at end of string (str must be < max, not <= max)
_max -= 1
valid = _min <= len(value) < _max
else:
skip = True
valid = True
if value is None and not skip:
# None is an invalid value
valid = False
if valid is None and not skip:
# validate against object type size
valid = _min <= value <= _max
if not valid:
err = "type range exceeding"
errformat = " [{smin},{smax}]"
if _value is None:
# copy value before possible change below
_value = value
if isinstance(_value, (str, unicode)):
_value = "'{}'".format(_value)
if valid:
if not skip:
if args.debug:
if bits:
sbits=" {} bits shift {}".format(bits, bitshift)
else:
sbits = ""
print >> sys.stderr, "SetField(): Set '{}' using '{}'/{}{} @{} to {}".format(fieldname, format, arraydef, sbits, hex(baseaddr+addroffset), _value)
if fieldname != 'cfg_crc':
prevvalue = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
struct.pack_into(format, dobj, baseaddr+addroffset, value)
curvalue = struct.unpack_from(format, dobj, baseaddr+addroffset)[0]
if prevvalue != curvalue and args.verbose:
message("Value for '{}' changed from {} to {}".format(fieldname, prevvalue, curvalue), typ=LogType.INFO)
else:
sformat = "file '{sfile}' - {{'{sname}': {svalue}}} ({serror})"+errformat
exit(ExitCode.RESTORE_DATA_ERROR, sformat.format(sfile=filename, sname=fieldname, serror=err, svalue=_value, smin=_min, smax=_max), typ=LogType.WARNING, doexit=not args.ignorewarning)
return dobj
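# Illustration only, not part of the original script: a minimal sketch of the
# read-modify-write step SetField() performs for bit fields. The name set_bits
# is hypothetical, and the sketch assumes a non-negative bitshift and
# non-negative integers; the original additionally handles negative shifts.

```python
def set_bits(current, bitvalue, bits, bitshift):
    """Return current with the bits-wide field at bitshift replaced by bitvalue."""
    mask = (1 << bits) - 1
    if bitvalue > mask:
        # the new value does not fit into the field
        raise ValueError("value {} exceeds {} bits".format(bitvalue, bits))
    # clear the field, then merge in the shifted value
    return (current & ~(mask << bitshift)) | (bitvalue << bitshift)
```

# Only the addressed bits change; all other bits of current are preserved.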
def SetCmnd(cmnds, fieldname, fielddef, valuemapping, mappedvalue, addroffset=0, idx=None):
"""
Build Tasmota command mapping from field definition
@param cmnds:
Tasmota command mapping: { 'group': ['cmnd' <,'cmnd'...>] ... }
@param fieldname:
name of the field
@param fielddef:
see Settings desc above
@param valuemapping:
data mapping
@param mappedvalue
mappedvalue mapping with the new value(s)
@param addroffset
use offset for baseaddr (used for recursive calls)
@param idx
optional array index
@return:
new Tasmota command mapping
"""
format, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd, writeconverter = GetFieldDef(fielddef, fields='format, baseaddr, bits, bitshift, arraydef, group, tasmotacmnd, writeconverter')
# cast unicode
fieldname = str(fieldname)
# filter groups
if not IsFilterGroup(group):
return cmnds
# <arraydef> contains a list
if isinstance(arraydef, list) and len(arraydef) > 0:
offset = 0
if len(mappedvalue) > arraydef[0]:
exit(ExitCode.RESTORE_DATA_ERROR, "array '{sname}[{selem}]' exceeds max number of elements [{smax}]".format(sname=fieldname, selem=len(mappedvalue), smax=arraydef[0]), typ=LogType.WARNING, doexit=not args.ignorewarning, line=inspect.getlineno(inspect.currentframe()))
for i in range(0, arraydef[0]):
subfielddef = GetSubfieldDef(fielddef)
length = GetFieldLength(subfielddef)
if length != 0:
if i >= len(mappedvalue): # mappedvalue data list may be shorter than definition
break
subrestore = mappedvalue[i]
cmnds = SetCmnd(cmnds, fieldname, subfielddef, valuemapping, subrestore, addroffset=addroffset+offset, idx=i)
offset += length
# <format> contains a dict
elif isinstance(format, dict):
for name in format: # -> iterate through format
if name in mappedvalue:
cmnds = SetCmnd(cmnds, name, format[name], valuemapping, mappedvalue[name], addroffset=addroffset, idx=idx)
# a simple value
elif isinstance(format, (str, bool, int, float, long)):
cmnd = CmndConverter(valuemapping, mappedvalue, idx, fielddef)
if group is not None and cmnd is not None:
if group not in cmnds:
cmnds[group] = []
cmnds[group].append(cmnd)
return cmnds
def Bin2Mapping(decode_cfg):
"""
Decodes binary data stream into Python mapping dict
@param decode_cfg:
binary config data (decrypted)
@return:
valuemapping data as mapping dictionary
"""
if isinstance(decode_cfg, bytearray):
decode_cfg = str(decode_cfg)
# get binary header and template to use
version, size, setting = GetTemplateSetting(decode_cfg)
# if we did not find a matching setting
if setting is None:
exit(ExitCode.UNSUPPORTED_VERSION, "Tasmota configuration version 0x{:x} not supported".format(version),line=inspect.getlineno(inspect.currentframe()))
if 'version' in setting:
cfg_version = GetField(decode_cfg, 'version', setting['version'], raw=True)
# check size if exists
if 'cfg_size' in setting:
cfg_size = GetField(decode_cfg, 'cfg_size', setting['cfg_size'], raw=True)
# read size should be the same as defined in setting
if cfg_size > size:
# may be processed
exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read does not match - read {}, expected {} byte".format(cfg_size, size), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe()))
elif cfg_size < size:
# less number of bytes can not be processed
exit(ExitCode.DATA_SIZE_MISMATCH, "Number of bytes read too small to process - read {}, expected {} byte".format(cfg_size, size), typ=LogType.ERROR,line=inspect.getlineno(inspect.currentframe()))
# check crc if exists
if 'cfg_crc' in setting:
cfg_crc = GetField(decode_cfg, 'cfg_crc', setting['cfg_crc'], raw=True)
else:
cfg_crc = GetSettingsCrc(decode_cfg)
if cfg_crc != GetSettingsCrc(decode_cfg):
exit(ExitCode.DATA_CRC_ERROR, 'Data CRC error, read 0x{:x} should be 0x{:x}'.format(cfg_crc, GetSettingsCrc(decode_cfg)), typ=LogType.WARNING, doexit=not args.ignorewarning,line=inspect.getlineno(inspect.currentframe()))
# get valuemapping
valuemapping = GetField(decode_cfg, None, (setting,0,(None, None, (INTERNAL, None))))
# add header info
timestamp = datetime.now()
valuemapping['header'] = { 'timestamp':timestamp.strftime("%Y-%m-%d %H:%M:%S"),
'format': {
'jsonindent': args.jsonindent,
'jsoncompact': args.jsoncompact,
'jsonsort': args.jsonsort,
'jsonhidepw': args.jsonhidepw,
},
'template': {
'version': hex(version),
'crc': hex(cfg_crc),
},
'data': {
'crc': hex(GetSettingsCrc(decode_cfg)),
'size': len(decode_cfg),
},
'script': {
'name': os.path.basename(__file__),
'version': VER,
},
'os': (platform.machine(), platform.system(), platform.release(), platform.version(), platform.platform()),
'python': platform.python_version(),
}
if 'cfg_size' in setting:
valuemapping['header']['template'].update({'size': cfg_size})
if 'version' in setting:
valuemapping['header']['data'].update({'version': hex(cfg_version)})
return valuemapping
def Mapping2Bin(decode_cfg, jsonconfig, filename=""):
"""
Encodes into binary data stream
@param decode_cfg:
binary config data (decrypted)
@param jsonconfig:
restore data mapping
@param filename:
name of the restore file (for error output only)
@return:
changed binary config data (decrypted) or None on error
"""
if isinstance(decode_cfg, str):
decode_cfg = bytearray(decode_cfg)
# get binary header data to use the correct version template from device
version, size, setting = GetTemplateSetting(decode_cfg)
# make empty bytearray
_buffer = bytearray()
# add data
_buffer.extend(decode_cfg)
if setting is not None:
# iterate through restore data mapping
for name in jsonconfig:
# key must exist in both dict
if name in setting:
SetField(_buffer, name, setting[name], jsonconfig[name], addroffset=0, filename=filename)
else:
if name != 'header':
exit(ExitCode.RESTORE_DATA_ERROR, "Restore file '{}' contains obsolete name '{}', skipped".format(filename, name), typ=LogType.WARNING, doexit=not args.ignorewarning)
if 'cfg_crc' in setting:
crc = GetSettingsCrc(_buffer)
struct.pack_into(setting['cfg_crc'][0], _buffer, setting['cfg_crc'][1], crc)
return _buffer
else:
exit(ExitCode.UNSUPPORTED_VERSION,"File '{}', Tasmota configuration version 0x{:x} not supported".format(filename, version), typ=LogType.WARNING, doexit=not args.ignorewarning)
return None
def Mapping2Cmnd(decode_cfg, valuemapping, filename=""):
"""
Encodes mapping data into Tasmota command mapping
@param decode_cfg:
binary config data (decrypted)
@param valuemapping:
data mapping
@param filename:
name of the restore file (for error output only)
@return:
Tasmota command mapping {group: [cmnd <,cmnd <,...>>]}
"""
if isinstance(decode_cfg, str):
decode_cfg = bytearray(decode_cfg)
# get binary header data to use the correct version template from device
version, size, setting = GetTemplateSetting(decode_cfg)
cmnds = {}
if setting is not None:
# iterate through restore data mapping
for name in valuemapping:
# key must exist in both dict
if name in setting:
cmnds = SetCmnd(cmnds, name, setting[name], valuemapping, valuemapping[name], addroffset=0)
else:
if name != 'header':
exit(ExitCode.RESTORE_DATA_ERROR, "Restore file '{}' contains obsolete name '{}', skipped".format(filename, name), typ=LogType.WARNING, doexit=not args.ignorewarning)
return cmnds
else:
exit(ExitCode.UNSUPPORTED_VERSION,"File '{}', Tasmota configuration version 0x{:x} not supported".format(filename, version), typ=LogType.WARNING, doexit=not args.ignorewarning)
return None
def Backup(backupfile, backupfileformat, encode_cfg, decode_cfg, configmapping):
"""
Create backup file
@param backupfile:
Raw backup filename from program args
@param backupfileformat:
Backup file format
@param encode_cfg:
binary config data (encrypted)
@param decode_cfg:
binary config data (decrypted)
@param configmapping:
config data mappings
"""
backupfileformat = args.backupfileformat
name, ext = os.path.splitext(backupfile)
if ext.lower() == '.'+FileType.BIN.lower():
backupfileformat = FileType.BIN
elif ext.lower() == '.'+FileType.DMP.lower():
backupfileformat = FileType.DMP
elif ext.lower() == '.'+FileType.JSON.lower():
backupfileformat = FileType.JSON
fileformat = ""
# Tasmota format
if backupfileformat.lower() == FileType.DMP.lower():
fileformat = "Tasmota"
backup_filename = MakeFilename(backupfile, FileType.DMP, configmapping)
if args.verbose:
message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO)
try:
backupfp = open(backup_filename, "wb")
backupfp.write(encode_cfg)
except Exception, e:
exit(e[0], "'{}' {}".format(backup_filename, e[1]),line=inspect.getlineno(inspect.currentframe()))
finally:
backupfp.close()
# binary format
elif backupfileformat.lower() == FileType.BIN.lower():
fileformat = "binary"
backup_filename = MakeFilename(backupfile, FileType.BIN, configmapping)
if args.verbose:
message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO)
try:
backupfp = open(backup_filename, "wb")
magic = BINARYFILE_MAGIC
backupfp.write(struct.pack('<L',magic))
backupfp.write(decode_cfg)
except Exception, e:
exit(e[0], "'{}' {}".format(backup_filename, e[1]),line=inspect.getlineno(inspect.currentframe()))
finally:
backupfp.close()
# JSON format
elif backupfileformat.lower() == FileType.JSON.lower():
fileformat = "JSON"
backup_filename = MakeFilename(backupfile, FileType.JSON, configmapping)
if args.verbose:
message("Writing backup file '{}' ({} format)".format(backup_filename, fileformat), typ=LogType.INFO)
try:
backupfp = open(backup_filename, "w")
json.dump(configmapping, backupfp, sort_keys=args.jsonsort, indent=None if args.jsonindent < 0 else args.jsonindent, separators=(',', ':') if args.jsoncompact else (', ', ': ') )
except Exception, e:
exit(e[0], "'{}' {}".format(backup_filename, e[1]),line=inspect.getlineno(inspect.currentframe()))
finally:
backupfp.close()
if args.verbose:
srctype = 'device'
src = args.device
if args.tasmotafile is not None:
srctype = 'file'
src = args.tasmotafile
message("Backup successful from {} '{}' to file '{}' ({} format)".format(srctype, src, backup_filename, fileformat), typ=LogType.INFO)
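# Illustration only, not part of the original script: a minimal sketch of how
# the --json-indent and --json-compact options map onto json.dumps()
# arguments in the JSON branch of Backup(). The name dump_config and its
# parameters are hypothetical stand-ins for args.jsonindent, args.jsoncompact
# and args.jsonsort.

```python
import json


def dump_config(mapping, indent=-1, compact=False, sort_keys=True):
    """Serialize mapping the way the JSON backup branch does."""
    return json.dumps(
        mapping,
        sort_keys=sort_keys,
        # a negative indent disables pretty printing
        indent=None if indent < 0 else indent,
        # compact output drops the blanks after ',' and ':'
        separators=(',', ':') if compact else (', ', ': '),
    )
```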
def Restore(restorefile, encode_cfg, decode_cfg, configmapping):
"""
Restore from file
@param encode_cfg:
binary config data (encrypted)
@param decode_cfg:
binary config data (decrypted)
@param configmapping:
config data mappings
"""
new_encode_cfg = None
restorefilename = MakeFilename(restorefile, None, configmapping)
filetype = GetFileType(restorefilename)
if filetype == FileType.DMP:
if args.verbose:
message("Reading restore file '{}' (Tasmota format)".format(restorefilename), typ=LogType.INFO)
try:
restorefp = open(restorefilename, "rb")
new_encode_cfg = restorefp.read()
restorefp.close()
except Exception, e:
exit(e[0], "'{}' {}".format(restorefilename, e[1]),line=inspect.getlineno(inspect.currentframe()))
elif filetype == FileType.BIN:
if args.verbose:
message("Reading restore file '{}' (binary format)".format(restorefilename), typ=LogType.INFO)
try:
restorefp = open(restorefilename, "rb")
restorebin = restorefp.read()
restorefp.close()
except Exception, e:
exit(e[0], "'{}' {}".format(restorefilename, e[1]),line=inspect.getlineno(inspect.currentframe()))
header = struct.unpack_from('<L', restorebin, 0)[0]
if header == BINARYFILE_MAGIC:
decode_cfg = restorebin[4:] # remove magic header, remaining data is the decrypted config
new_encode_cfg = DecryptEncrypt(decode_cfg) # process binary to binary config
elif filetype == FileType.JSON or filetype == FileType.INVALID_JSON:
if args.verbose:
message("Reading restore file '{}' (JSON format)".format(restorefilename), typ=LogType.INFO)
try:
restorefp = open(restorefilename, "r")
jsonconfig = json.load(restorefp)
except ValueError as e:
exit(ExitCode.JSON_READ_ERROR, "File '{}' invalid JSON: {}".format(restorefilename, e), line=inspect.getlineno(inspect.currentframe()))
finally:
restorefp.close()
# process json config to binary config
new_decode_cfg = Mapping2Bin(decode_cfg, jsonconfig, restorefilename)
new_encode_cfg = DecryptEncrypt(new_decode_cfg)
elif filetype == FileType.FILE_NOT_FOUND:
exit(ExitCode.FILE_NOT_FOUND, "File '{}' not found".format(restorefilename),line=inspect.getlineno(inspect.currentframe()))
elif filetype == FileType.INCOMPLETE_JSON:
exit(ExitCode.JSON_READ_ERROR, "File '{}' incomplete JSON, missing name 'header'".format(restorefilename),line=inspect.getlineno(inspect.currentframe()))
elif filetype == FileType.INVALID_BIN:
exit(ExitCode.FILE_READ_ERROR, "File '{}' invalid BIN format".format(restorefilename),line=inspect.getlineno(inspect.currentframe()))
else:
exit(ExitCode.FILE_READ_ERROR, "File '{}' unknown error".format(restorefilename),line=inspect.getlineno(inspect.currentframe()))
if new_encode_cfg is not None:
if args.forcerestore or new_encode_cfg != encode_cfg:
# write config directly to device via http
if args.device is not None:
if args.verbose:
message("Push new data to '{}' using restore file '{}'".format(args.device, restorefilename), typ=LogType.INFO)
error_code, error_str = PushTasmotaConfig(new_encode_cfg, args.device, args.port, args.username, args.password)
if error_code:
exit(ExitCode.UPLOAD_CONFIG_ERROR, "Config data upload failed - {}".format(error_str),line=inspect.getlineno(inspect.currentframe()))
else:
if args.verbose:
message("Restore successful to device '{}' using restore file '{}'".format(args.device, restorefilename), typ=LogType.INFO)
# write config to a file
elif args.tasmotafile is not None:
if args.verbose:
message("Write new data to file '{}' using restore file '{}'".format(args.tasmotafile, restorefilename), typ=LogType.INFO)
try:
outputfile = open(args.tasmotafile, "wb")
outputfile.write(new_encode_cfg)
except Exception, e:
exit(e[0], "'{}' {}".format(args.tasmotafile, e[1]),line=inspect.getlineno(inspect.currentframe()))
finally:
outputfile.close()
if args.verbose:
message("Restore successful to file '{}' using restore file '{}'".format(args.tasmotafile, restorefilename), typ=LogType.INFO)
else:
global exitcode
exitcode = ExitCode.RESTORE_SKIPPED
if args.verbose:
message("Configuration data left unchanged", typ=LogType.INFO)
def OutputTasmotaCmnds(tasmotacmnds):
"""
Print Tasmota command mapping
@param tasmotacmnds:
Tasmota command mapping {group: [cmnd <,cmnd <,...>>]}
"""
def OutputTasmotaSubCmnds(cmnds):
if args.cmndsort:
for cmnd in sorted(cmnds, key = lambda cmnd:[int(c) if c.isdigit() else c for c in re.split('(\d+)', cmnd)]):
print "{}{}".format(" "*args.cmndindent, cmnd)
else:
for cmnd in cmnds:
print "{}{}".format(" "*args.cmndindent, cmnd)
if args.cmndgroup:
for group in Groups:
if group in tasmotacmnds:
cmnds = tasmotacmnds[group]
print
print "# {}:".format(group)
OutputTasmotaSubCmnds(cmnds)
else:
cmnds = []
for group in Groups:
if group in tasmotacmnds:
cmnds.extend(tasmotacmnds[group])
OutputTasmotaSubCmnds(cmnds)
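# Illustration only, not part of the original script: a minimal sketch of the
# natural sort key used by --cmnd-sort above, pulled out into a hypothetical
# helper named natural_key. The command strings in the usage example are made
# up for the demonstration.

```python
import re


def natural_key(cmnd):
    """Split cmnd into text and number parts so that 'Timer2' sorts before 'Timer10'."""
    # re.split with a capturing group keeps the digit runs in the result;
    # text parts land on even positions and numbers on odd positions
    return [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', cmnd)]


cmnds = ['Timer10 {}', 'Timer2 {}', 'Timer1 {}']
ordered = sorted(cmnds, key=natural_key)
```

# A plain sorted(cmnds) would place 'Timer10 {}' before 'Timer2 {}'.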
def ParseArgs():
"""
Program argument parser
@return:
configargparse.parse_args() result
"""
global parser
parser = configargparse.ArgumentParser(description='Backup/Restore Sonoff-Tasmota configuration data.',
epilog='Either argument -d <host> or -f <filename> must be given.',
add_help=False,
formatter_class=lambda prog: CustomHelpFormatter(prog))
source = parser.add_argument_group('Source', 'Read/Write Tasmota configuration from/to')
source.add_argument('-f', '--file', '--tasmota-file',
metavar='<filename>',
dest='tasmotafile',
default=DEFAULTS['source']['tasmotafile'],
help="file to retrieve/write Tasmota configuration from/to (default: {})".format(DEFAULTS['source']['tasmotafile']))
source.add_argument('-d', '--device', '--host',
metavar='<host>',
dest='device',
default=DEFAULTS['source']['device'],
help="hostname or IP address to retrieve/send Tasmota configuration from/to (default: {})".format(DEFAULTS['source']['device']) )
source.add_argument('-P', '--port',
metavar='<port>',
dest='port',
default=DEFAULTS['source']['port'],
help="TCP/IP port number to use for the host connection (default: {})".format(DEFAULTS['source']['port']) )
source.add_argument('-u', '--username',
metavar='<username>',
dest='username',
default=DEFAULTS['source']['username'],
help="host HTTP access username (default: {})".format(DEFAULTS['source']['username']))
source.add_argument('-p', '--password',
metavar='<password>',
dest='password',
default=DEFAULTS['source']['password'],
help="host HTTP access password (default: {})".format(DEFAULTS['source']['password']))
backup = parser.add_argument_group('Backup/Restore', 'Backup & restore specification')
backup.add_argument('-i', '--restore-file',
metavar='<filename>',
dest='restorefile',
default=DEFAULTS['backup']['restorefile'],
help="file to restore configuration from (default: {}). Replacements: @v=firmware version from config, @f=device friendly name from config, @h=device hostname from config, @H=device hostname from device (-d arg only)".format(DEFAULTS['backup']['restorefile']))
backup.add_argument('-o', '--backup-file',
metavar='<filename>',
dest='backupfile',
default=DEFAULTS['backup']['backupfile'],
help="file to backup configuration to (default: {}). Replacements: @v=firmware version from config, @f=device friendly name from config, @h=device hostname from config, @H=device hostname from device (-d arg only)".format(DEFAULTS['backup']['backupfile']))
backup_file_formats = ['json', 'bin', 'dmp']
backup.add_argument('-t', '--backup-type',
metavar='|'.join(backup_file_formats),
dest='backupfileformat',
choices=backup_file_formats,
default=DEFAULTS['backup']['backupfileformat'],
help="backup filetype (default: '{}')".format(DEFAULTS['backup']['backupfileformat']) )
backup.add_argument('-E', '--extension',
dest='extension',
action='store_true',
default=DEFAULTS['backup']['extension'],
help="append filetype extension for -i and -o filename{}".format(' (default)' if DEFAULTS['backup']['extension'] else '') )
backup.add_argument('-e', '--no-extension',
dest='extension',
action='store_false',
default=DEFAULTS['backup']['extension'],
help="do not append filetype extension, use -i and -o filename as passed{}".format(' (default)' if not DEFAULTS['backup']['extension'] else '') )
backup.add_argument('-F', '--force-restore',
dest='forcerestore',
action='store_true',
default=DEFAULTS['backup']['forcerestore'],
help="force restore even if configuration is identical{}".format(' (default)' if DEFAULTS['backup']['forcerestore'] else '') )
jsonformat = parser.add_argument_group('JSON output', 'JSON format specification')
jsonformat.add_argument('--json-indent',
metavar='<indent>',
dest='jsonindent',
type=int,
default=DEFAULTS['jsonformat']['jsonindent'],
help="pretty-printed JSON output using indent level (default: '{}'). -1 disables indent.".format(DEFAULTS['jsonformat']['jsonindent']) )
jsonformat.add_argument('--json-compact',
dest='jsoncompact',
action='store_true',
default=DEFAULTS['jsonformat']['jsoncompact'],
help="compact JSON output by eliminating whitespace{}".format(' (default)' if DEFAULTS['jsonformat']['jsoncompact'] else '') )
jsonformat.add_argument('--json-sort',
dest='jsonsort',
action='store_true',
default=DEFAULTS['jsonformat']['jsonsort'],
help=configargparse.SUPPRESS) #"sort json keywords{}".format(' (default)' if DEFAULTS['jsonformat']['jsonsort'] else '') )
jsonformat.add_argument('--json-unsort',
dest='jsonsort',
action='store_false',
default=DEFAULTS['jsonformat']['jsonsort'],
help=configargparse.SUPPRESS) #"do not sort json keywords{}".format(' (default)' if not DEFAULTS['jsonformat']['jsonsort'] else '') )
jsonformat.add_argument('--json-hide-pw',
dest='jsonhidepw',
action='store_true',
default=DEFAULTS['jsonformat']['jsonhidepw'],
help="hide passwords{}".format(' (default)' if DEFAULTS['jsonformat']['jsonhidepw'] else '') )
jsonformat.add_argument('--json-show-pw', '--json-unhide-pw',
dest='jsonhidepw',
action='store_false',
default=DEFAULTS['jsonformat']['jsonhidepw'],
help="unhide passwords{}".format(' (default)' if not DEFAULTS['jsonformat']['jsonhidepw'] else '') )
cmndformat = parser.add_argument_group('Tasmota command output', 'Tasmota command output format specification')
cmndformat.add_argument('--cmnd-indent',
metavar='<indent>',
dest='cmndindent',
type=int,
default=DEFAULTS['cmndformat']['cmndindent'],
help="Tasmota command grouping indent level (default: '{}'). 0 disables indent".format(DEFAULTS['cmndformat']['cmndindent']) )
cmndformat.add_argument('--cmnd-groups',
dest='cmndgroup',
action='store_true',
default=DEFAULTS['cmndformat']['cmndgroup'],
help="group Tasmota commands{}".format(' (default)' if DEFAULTS['cmndformat']['cmndgroup'] else '') )
cmndformat.add_argument('--cmnd-nogroups',
dest='cmndgroup',
action='store_false',
default=DEFAULTS['cmndformat']['cmndgroup'],
help="leave Tasmota commands ungrouped{}".format(' (default)' if not DEFAULTS['cmndformat']['cmndgroup'] else '') )
cmndformat.add_argument('--cmnd-sort',
dest='cmndsort',
action='store_true',
default=DEFAULTS['cmndformat']['cmndsort'],
help="sort Tasmota commands{}".format(' (default)' if DEFAULTS['cmndformat']['cmndsort'] else '') )
cmndformat.add_argument('--cmnd-unsort',
dest='cmndsort',
action='store_false',
default=DEFAULTS['cmndformat']['cmndsort'],
help="leave Tasmota commands unsorted{}".format(' (default)' if not DEFAULTS['cmndformat']['cmndsort'] else '') )
common = parser.add_argument_group('Common', 'Optional arguments')
common.add_argument('-c', '--config',
metavar='<filename>',
dest='configfile',
default=DEFAULTS['common']['configfile'],
is_config_file=True,
help="program config file - can be used to set default command args (default: {})".format(DEFAULTS['common']['configfile']) )
common.add_argument('-S', '--output',
dest='output',
action='store_true',
default=DEFAULTS['common']['output'],
help="display output regardless of backup/restore usage{}".format(" (default)" if DEFAULTS['common']['output'] else " (default do not output on backup or restore usage)") )
output_formats = ['json', 'cmnd','command']
common.add_argument('-T', '--output-format',
metavar='|'.join(output_formats),
dest='outputformat',
choices=output_formats,
default=DEFAULTS['common']['outputformat'],
help="display output format (default: '{}')".format(DEFAULTS['common']['outputformat']) )
groups = GetGroupList(Settings[0][2])
if '*' in groups:
groups.remove('*')
common.add_argument('-g', '--group',
dest='filter',
choices=groups,
nargs='+',
default=DEFAULTS['common']['filter'],
help="limit data processing to command groups (default {})".format("no filter" if DEFAULTS['common']['filter'] == None else DEFAULTS['common']['filter']) )
common.add_argument('--ignore-warnings',
dest='ignorewarning',
action='store_true',
default=DEFAULTS['common']['ignorewarning'],
help="do not exit on warnings{}. Not recommended, used by your own responsibility!".format(' (default)' if DEFAULTS['common']['ignorewarning'] else '') )
    info = parser.add_argument_group('Info', 'Extra information')
    info.add_argument('-D', '--debug',
                      dest='debug',
                      action='store_true',
                      help=configargparse.SUPPRESS)
    info.add_argument('-h', '--help',
                      dest='shorthelp',
                      action='store_true',
                      help='show usage help message and exit')
    info.add_argument("-H", "--full-help",
                      action="help",
                      help="show full help message and exit")
    info.add_argument('-v', '--verbose',
                      dest='verbose',
                      action='store_true',
                      help='produce more output about what the program does')
    info.add_argument('-V', '--version',
                      action='version',
                      version=PROG)
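    args = parser.parse_args()
    if args.debug:
        # dump the parsed values and every resulting setting to stderr
        print >> sys.stderr, parser.format_values()
        print >> sys.stderr, "Settings:"
        for k in args.__dict__:
            # getattr() reads the attribute directly; no need for eval()
            print >> sys.stderr, " "+str(k), "= ",getattr(args, k)
    return args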
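# Illustrative sketch, not part of the original program: option pairs such as
# '--cmnd-groups' / '--cmnd-nogroups' above write to the same 'dest'. The flag
# given last on the command line wins, and the shared default applies when
# neither flag is given. The helper below reproduces that pattern with the
# standard argparse module only. The name _demo_paired_flags and the default
# value True are assumptions made for this illustration.
def _demo_paired_flags(argv, default=True):
    import argparse
    demo = argparse.ArgumentParser(add_help=False)
    # both options share dest='cmndgroup' and the same default
    demo.add_argument('--cmnd-groups', dest='cmndgroup', action='store_true', default=default)
    demo.add_argument('--cmnd-nogroups', dest='cmndgroup', action='store_false', default=default)
    return demo.parse_args(argv).cmndgroup
# Example: _demo_paired_flags(['--cmnd-nogroups']) returns False, while
# _demo_paired_flags([]) falls back to the default.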
if __name__ == "__main__":
    args = ParseArgs()
    if args.shorthelp:
        ShortHelp()
    # check source args
    if args.device is not None and args.tasmotafile is not None:
        exit(ExitCode.ARGUMENT_ERROR, "Unable to select source, do not use -d and -f together",line=inspect.getlineno(inspect.currentframe()))
    # default no configuration available
    encode_cfg = None
    # load config from Tasmota file
    if args.tasmotafile is not None:
        if args.verbose:
            message("Load data from file '{}'".format(args.tasmotafile), typ=LogType.INFO)
        encode_cfg = LoadTasmotaConfig(args.tasmotafile)
    # pull config from Tasmota device
    if args.device is not None:
        if args.verbose:
            message("Load data from device '{}'".format(args.device), typ=LogType.INFO)
        encode_cfg = PullTasmotaConfig(args.device, args.port, username=args.username, password=args.password)
    if encode_cfg is None:
        # no config source given
        ShortHelp(False)
        print
        print parser.epilog
        sys.exit(ExitCode.OK)
    if len(encode_cfg) == 0:
        exit(ExitCode.FILE_READ_ERROR, "Unable to read configuration data from {} '{}'".format('device' if args.device is not None else 'file',
                                                                                                args.device if args.device is not None else args.tasmotafile),
             line=inspect.getlineno(inspect.currentframe()) )
    # decrypt Tasmota config
    decode_cfg = DecryptEncrypt(encode_cfg)
    # decode into mappings dictionary
    configmapping = Bin2Mapping(decode_cfg)
    if args.verbose and 'version' in configmapping:
        message("{} '{}' is using version {}".format('File' if args.tasmotafile is not None else 'Device',
                                                     args.tasmotafile if args.tasmotafile is not None else args.device,
                                                     GetVersionStr(configmapping['version'])),
                typ=LogType.INFO)
    # backup to file
    if args.backupfile is not None:
        Backup(args.backupfile, args.backupfileformat, encode_cfg, decode_cfg, configmapping)
    # restore from file
    if args.restorefile is not None:
        Restore(args.restorefile, encode_cfg, decode_cfg, configmapping)
    # screen output (JSON or Tasmota commands), shown when neither backup
    # nor restore was requested, or when -S/--output forces it
    if (args.backupfile is None and args.restorefile is None) or args.output:
        if args.outputformat == 'json':
            print json.dumps(configmapping, sort_keys=args.jsonsort, indent=None if args.jsonindent<0 else args.jsonindent, separators=(',', ':') if args.jsoncompact else (', ', ': ') )
        if args.outputformat == 'cmnd' or args.outputformat == 'command':
            tasmotacmnds = Mapping2Cmnd(decode_cfg, configmapping)
            OutputTasmotaCmnds(tasmotacmnds)
    sys.exit(exitcode)