From a8aa6fac005245cdac7f79d574ff81e6ff147d82 Mon Sep 17 00:00:00 2001 From: Stephan Hadinger Date: Thu, 26 May 2022 18:42:00 +0200 Subject: [PATCH] Zigbee flasher for Sonoff Zigbee Bridge Pro --- tasmota/berry/zigbee/cc2652_flasher.be | 385 ++++++++++++++++++ .../berry/zigbee/cc2652_flasher_example.be | 5 + tasmota/berry/zigbee/intelhex.be | 133 ++++++ tasmota/berry/zigbee/sonoff_zb_pro_flasher.be | 173 ++++++++ 4 files changed, 696 insertions(+) create mode 100644 tasmota/berry/zigbee/cc2652_flasher.be create mode 100644 tasmota/berry/zigbee/cc2652_flasher_example.be create mode 100644 tasmota/berry/zigbee/intelhex.be create mode 100644 tasmota/berry/zigbee/sonoff_zb_pro_flasher.be diff --git a/tasmota/berry/zigbee/cc2652_flasher.be b/tasmota/berry/zigbee/cc2652_flasher.be new file mode 100644 index 000000000..d4431a4fa --- /dev/null +++ b/tasmota/berry/zigbee/cc2652_flasher.be @@ -0,0 +1,385 @@ + +################################################################################# +# +# class `cc2652_flasher` +# +# Flash libraries for CC2652: read, write, verify... +# +# The serial protocol requires the CC2652 to boot in BSL (bootloader) mode. +# On Sonoff Zigbee Bridge Pro, it requires to reset the MCU with DIO_8 low. +# +# When starting the flasher, normal zigbee operations are aborterd. Restarting +# normal zigbee functions requires a Tasmota restart. +# +# Required condiguration: +# - Zigbee Rx: must be configured as `Zigbee Rx` or `TCP Rx` +# - Zigbee Tx: must be configured as `Zigbee Tx` or `TCP Tx` +# - Zigbee Reset: must be configured as `Zigbee Rst - 1`` +# - Zigbee BSL mode (low): must be configured as `Zigbee Rts - 2` +# For Sonoff Zibeee Bridge Pro: Rx=23 Tx=19 Rst-1=15 Rst-2=22 +# +# How to use: +# - `import cc2652_flasher as cc` +# aborts all zigbee operations and configures the serial port +# Output: `FLH: cc2652_flasher rx=23 tx=19 rst=15 bsl=22` +# +# - `cc.start()` to start the flasher` +# restarts the CC2652 in BSL mode, and establishes connection +# Use `cc.start(true)` to enable debug verbose mode +# +# - `cc.ping()` sends a ping command and waits for ACK (does nothing) +# +# - `cc.cmd_get_chip_id()` returns the chip ID +# +# - `cc.flash_read(addr, len)` reads `len` bytes from address `addr`` +# len must be a multiple of 4 and less or equal than 128 bytes +# Returns a bytes() object +# +# - `cc.flash_crc32(addr, len)` returns the CRC32 of a flash region +# +# - `cc.flash_write(addr, data)` writes bytes to the flash +# `data` is a bytes() buffer, its len must be a multiple of 4 and less or equal than 128 +# This call does not erase the flash, so it must have been erased before. +# The bootloader checks that the bytes were correctly written, i.e. that +# the appropriate bits were changed from `1` to `0`. +# Chaning bits from `0` to `1` requires a flash erase. +# +# - `cc.flash_erase()` erase the entire flash. +# Use with caution. After the flash is erased, there is no valid application +# in flash so the MCU always starts in BSL bootloader mode until a valid +# app is flashed. +# +# - `cc.flash_dump_to_file(filename, addr, len)` dumps the CC2652 flash into a file +# `filename` is the output file in binary format, make sure there are 360KB free in filesystem. +# Dumping the complete file is done as follows (it takes 3 minutes during which Tasmota is unresponsive): +# `cc.dump_to_file("cc2652_dump.bin", 0x00000, 0x58000)` +# +################################################################################# + +#- Example + +import string +import cc2652_flasher as cc +cc.start() +cc.ping() +print(string.format("0x%08X", cc.cmd_get_chip_id())) +# output: 0x3202F000 + +# Dumping CC2652 flash into filesystem +# This takes 3 minutes during which Tasmota is unresponsive +# +import cc2652_flasher as cc +cc.start() +cc.flash_dump_to_file("cc2652_dump.bin", 0x00000, 0x58000) + +-# + +class cc2652_flasher + var ser # serial object + var debug # verbose logs? + var rx, tx, rst, bsl # GPIO numbers + + # init - abort zigbee operations and starts the serial driver + # args are optional + def init(rx, tx, rst, bsl) + import string + + self.debug = false + self.rx = (rx == nil) ? -1 : rx + self.tx = (tx == nil) ? -1 : tx + self.rst = (rst == nil) ? -1 : rst + self.bsl = (bsl == nil) ? -1 : bsl + # + if self.rx < 0 self.rx = gpio.pin(gpio.ZIGBEE_RX) end + if self.rx < 0 self.rx = gpio.pin(gpio.TCP_RX) end + if self.tx < 0 self.tx = gpio.pin(gpio.ZIGBEE_TX) end + if self.tx < 0 self.tx = gpio.pin(gpio.TCP_TX) end + if self.rst < 0 self.rst = gpio.pin(gpio.ZIGBEE_RST, 0) end + if self.bsl < 0 self.bsl = gpio.pin(gpio.ZIGBEE_RST, 1) end + print(string.format("FLH: cc2652_flasher rx=%i tx=%i rst=%i bsl=%i", self.rx, self.tx, self.rst, self.bsl)) + # tasmota.log(string.format("FLH: cc2652_flasher rx=%i tx=%i rst=%i bsl=%i", self.rx, self.tx, self.rst, self.bsl), 3) + if self.rx < 0 || self.tx < 0 || self.rst < 0 || self.bsl < 0 + raise "value_error", "cc2652_flasher unspecified GPIOs" + end + # stop all zigbee activity + import zigbee + zigbee.abort() + # good to go + self.ser = serial(self.rx, self.tx, 115200) # initialize UART serial port + end + + # restart the MCU in BSL mode and establish communication + def start(debug) + if debug == nil debug = false end + self.debug = bool(debug) + self.reset_bsl() + # + # print("FLH: cc2652_flasher started") + end + + ################################################################################# + # Low level methods + ################################################################################# + + # restart MCU and enter BSL + # + # arg: + # ser: serial object + def reset_bsl() + self.ser.flush() + + gpio.digital_write(self.bsl, 0) # trigger BSL + + gpio.digital_write(self.rst, 0) + tasmota.delay(10) # wait 10ms + gpio.digital_write(self.rst, 1) + tasmota.delay(100) # wait 100ms + + self.ser.write(bytes("5555")) # trigger auto baudrate detector + var ret = self.recv_raw(100) + if self.debug print("ret=", ret) end + if ret != bytes('CC') + raise "protocol_error" + end + end + + # received buffer and give up if timeout + def recv_raw(timeout) + var due = tasmota.millis() + timeout + while !tasmota.time_reached(due) + if self.ser.available() + var b = self.ser.read() + if self.debug print("b:",b) end + while size(b) > 0 && b[0] == 0 + b = b[1..] + end + return b + end + tasmota.delay(5) # check every 5ms + end + raise "timeout_error", "serial timeout" + end + + # send simple ACK + def send_ack() + if self.debug print("send ACK") end + self.ser.write(bytes("00CC")) + end + + # encode payload + static def encode_payload(b) + var checksum = 0 + for i:0..size(b)-1 + checksum = (checksum + b[i]) & 0xFF + end + var payload = bytes("0000") + payload[0] = size(b) + 2 + payload[1] = checksum + payload += b + payload += bytes("00") + return payload + end + + static def decode_ack(b) + # skip any 00 or CC bytes + while size(b) > 0 && b[0] == 0 + b = b[1..] + end + if size(b) == 0 || b[0] != 0xCC + raise "serial_error", "missing ACK" + end + end + + static def decode_payload(b) + # skip any 00 or CC bytes + while size(b) > 0 && (b[0] == 0 || b[0] == 0xCC) + b = b[1..] + end + + # check buffer + var sz = b[0] + if size(b) < sz || sz < 2 raise "serial_error", "buffer too small" end + # + var payload = b[2..sz-1] + + var checksum = 0 + for i:0..size(payload)-1 + checksum = (checksum + payload[i]) & 0xFF + end + if checksum != b[1] raise "serial_error", "invalid checksum received" end + + return payload + end + + # send + # args: + # b: logical bytes() to send + # no_response: true if ignore any response, or ignore to get a response + def send(b, no_response) + # compute + var payload = self.encode_payload(b) + if self.debug print("sending:", payload) end + self.ser.write(payload) + var ret = self.recv_raw(500) + if self.debug print("ret=", ret) end + if no_response == true + #ignore + self.decode_ack(ret) + return nil + else + payload = self.decode_payload(ret) + self.send_ack() + return payload + end + end + + # Higher level functions + # 64 - COMMAND_RET_SUCCESS + # 65 - COMMAND_RET_UNKNOWN_CMD + # 66 - COMMAND_RET_INVALID_CMD + # 67 - COMMAND_RET_INVALID_ADR + # 68 - COMMAND_RET_FLASH_FAIL + def cmd_get_status() + var payload = self.send(bytes("23")) + return payload[0] + end + + # Get the value of the 32-bit user ID from the AON_PMCTL JTAGUSERCODE register + def cmd_get_chip_id() + var payload = self.send(bytes("28")) + return payload.get(0, -4) + end + + def cmd_memory_read(addr, len) + if (len % 4 != 0) raise "value_error", "len must be a multiple of 4" end + if len > 128 raise "value_error", "len is bigger than 128" end + var b = bytes("2A") + b.add(addr, -4) + b.add(1) + b.add(len/4) + + return self.send(b) + end + + # does not look to be implemented + # def cmd_memory_write(addr, data) + # var sz = size(data) + # if (sz % 4 != 0) raise "value_error", "len must be a multiple of 4" end + # if sz > 128 raise "value_error", "len is bigger than 128" end + # var b = bytes("2B") + # b.add(addr, -4) + # b.add(1) + # b += data + # print("cmd_memory_write",b) + + # return self.send(b) + # end + + def cmd_download(addr, sz) + if (sz % 4 != 0) raise "value_error", "len must be a multiple of 4" end + if sz > 128 raise "value_error", "len is bigger than 128" end + var b = bytes("21") + b.add(addr, -4) + b.add(sz, -4) + if self.debug print("cmd_download",b) + + return self.send(b, true) + end + + def cmd_send_data(data) + var sz = size(data) + if (sz % 4 != 0) raise "value_error", "len must be a multiple of 4" end + if sz > 128 raise "value_error", "len is bigger than 128" end + var b = bytes("24") + b += data + if self.debug print("cmd_send_data",b) + + return self.send(b, true) + end + + # WARNING: this command erases all of the customer-accessible flash sectors + # After this operation, since CCFG is not configured, the device will always reboot in BSL (bootloader) mode + # until CCFG is actually re-written + # + def cmd_bank_erase() + self.send(bytes("2C"), true) + end + + # compute crc32 for a memory range + # repeat count if forced to 0x00000000 to read each location only once + def cmd_crc32(addr, len) + var b = bytes("27") + b.add(addr, -4) + b.add(len, -4) + b.add(0, -4) # repeat count = 0 + return self.send(b) + end + + ################################################################################# + # High level methods + ################################################################################# + + def flash_read(addr, len) + return self.cmd_memory_read(addr, len) + end + + def flash_crc32(addr, len) + return self.cmd_crc32(addr, len) + end + + def flash_erase() + self.cmd_bank_erase() + end + + # send ping + def ping() + self.send(bytes("20"), true) + end + + # higher level + def flash_write(addr, data) + import string + + var sz = size(data) + if (sz % 4 != 0) raise "value_error", "len must be a multiple of 4" end + if sz > 128 raise "value_error", "len is bigger than 128" end + + var ret + + ret = self.cmd_download(addr, size(data)) + #print(">cmd_download", r) + + var ack + ack = self.cmd_get_status() + if ack != 0x40 raise "serial_error", string.format("command failed: 0x%02X - 0x%06X (%i)", ack, addr, sz) end + + ret = self.cmd_send_data(data) + + ack = self.cmd_get_status() + if ack != 0x40 raise "serial_error", string.format("command failed: 0x%02X - 0x%06X (%i)", ack, addr, sz) end + + end + + # dump the flash into a bin file + def flash_dump_to_file(filename, addr, len) + var offset = addr + var f + + try + f = open(filename,"w") + while len > 0 + var b = self.cmd_memory_read(offset, 32) + f.write(b) + offset += 32 + len -= 32 + tasmota.yield() + end + except .. as e, m + if f != nil f.close() end + raise e, m + end + f.close() + end +end + +return cc2652_flasher() diff --git a/tasmota/berry/zigbee/cc2652_flasher_example.be b/tasmota/berry/zigbee/cc2652_flasher_example.be new file mode 100644 index 000000000..b1e7e725d --- /dev/null +++ b/tasmota/berry/zigbee/cc2652_flasher_example.be @@ -0,0 +1,5 @@ +import cc2652_flasher as fl + +fl.start(true) +print("CCFG=",fl.cmd_memory_read(0x57FD8,4)) +print("crc32=",fl.cmd_crc32(0x0,0x30000)) # bytes('1598929A') diff --git a/tasmota/berry/zigbee/intelhex.be b/tasmota/berry/zigbee/intelhex.be new file mode 100644 index 000000000..08393b585 --- /dev/null +++ b/tasmota/berry/zigbee/intelhex.be @@ -0,0 +1,133 @@ +# parse intelhex file +# +# use: `import intelhex` +# +# This class allows to open and parse an Intel HEX format file. +# Basic checks are done to make sure that the format is valid. +# +# If anything goes wrong, an exception is raised. +# +# You provide 3 callbacks: +# - pre: method called when file was opened, but content not parsed - no arg, no return (raise an exception if error) +# this is a good place to erase flash before writing +# - cb: method called for each line containing data to be flashed. args(address:int, len_in_bytes:int, data:bytes, offset:int) +# address: full address in Flash (high and low are automatically combined) +# len_in_bytes: length of the data line, generally max 32 bytes +# data: bytes() object containing raw bytes +# offset: offset in bytes() object, skipping first bytes without needing to reallocate buffer +# - post: method called when all data was parsed, and before the file is closed - no arg, no return (raise an exception if error) +# this is a good place to check CRC32 or any checksum +# + +class intelhex + var filename # filename of hex file + var f # file object + var file_parsed # was the file already parsed. It cannot be flashed if not previously parsed and validated + var file_validated # was the file already validated. It cannot be flashed if not previously parsed and validated + + def init(filename) + self.filename = str(filename) + self.file_parsed = false + self.file_validated = true + end + + def close() + if self.f != nil + self.f.close() + self.f = nil + end + end + + # open the intelhex file and parse from start to end + # + # Args: + # - pre: method called when file was opened, but content not parsed - no arg, no return (raise an exception if error) + # this is a good place to erase flash before writing + # - cb: method called for each line containing data to be flashed. args(address:int, len_in_bytes:int, data:bytes, offset:int) + # address: full address in Flash (high and low are automatically combined) + # len_in_bytes: length of the data line, generally max 32 bytes + # data: bytes() object containing raw bytes + # offset: offset in bytes() object, skipping first bytes without needing to reallocate buffer + # - post: method called when all data was parsed, and before the file is closed - no arg, no return (raise an exception if error) + # this is a good place to check CRC32 or any checksum + # + def parse(pre, parse_cb, post) + try + self.f = open(self.filename, "rb") + self.file_parsed = true # we tried to parse + pre() + + self.munch_line(parse_cb) + + post() # validated is computed internally + + except .. as e, m + self.close() + self.file_validated = false + raise e, m # re-raise + end + + self.close() + end + + # internally used, verify each line + def munch_line(parse_cb) + import crc + var crc_sum = crc.sum + var tas = tasmota + var yield = tasmota.yield + + var offset_high = 0 + var offset_low = 0 + var b = bytes() + var b_get = b.get + var b_fromhex = b.fromhex + var self_f = self.f + var readline = self_f.readline + while true + yield(tas) # tasmota.yield() -- faster version + var line = readline(self_f) # self.f.readline() + # print(line) + if line == "" raise "value_error", "unexpected end of file" end + if line[0] != ":" continue end # ignore empty line or not starting with ':' + + b = b_fromhex(b, line, 1) # b.fromhex(line, 1) # convert to bytes, avoid allocating a new object + var sz = b[0] + + # check size + if size(b) != sz+5 raise "value_error", "invalid size for line: "+line end + + var record_type = b[3] + # 00: low address + data + # 01: end of file + # 04: high address + if record_type != 0 && record_type != 1 && record_type != 4 + raise "value_error", "unsupported record_type: "+str(record_type) + end + + offset_low = b_get(b, 1, -2) # b.get(1,-2) + var checksum = crc_sum(b) + if checksum != 0 raise "value_error", "invalid checksum" end + + if record_type == 1 break end # end of file + if record_type == 0 + # data + var address = offset_high << 16 | offset_low # full address + #var data = b[4..-2] # actual payload + parse_cb(address, sz, b, 4) + + # OK + # do whatever needed + import string + # print(string.format("addr=0x%06X len=0x%02X", address, sz)) + elif record_type == 4 + if offset_low != 0 raise "value_error", "offset_low not null for cmd 04" end + offset_high = b_get(b, 4, -2) # b.get(4,-2) + + end + end + end + +end + +return intelhex diff --git a/tasmota/berry/zigbee/sonoff_zb_pro_flasher.be b/tasmota/berry/zigbee/sonoff_zb_pro_flasher.be new file mode 100644 index 000000000..5c6f5e8e8 --- /dev/null +++ b/tasmota/berry/zigbee/sonoff_zb_pro_flasher.be @@ -0,0 +1,173 @@ + +################################################################################# +# +# class `sonoff_zb_pro_flasher` +# +################################################################################# + +class sonoff_zb_pro_flasher + static CCFG_address = 0x057FD8 + static CCFG_reference = 0xC5FE08C5 + + ################################################################################# + # Flashing from Intel HEX files + ################################################################################# + var filename # filename of hex file + var f # file object + var file_checked # was the file already parsed. It cannot be flashed if not previously parsed and validated + var file_validated # was the file already validated. It cannot be flashed if not previously parsed and validated + var file_hex # intelhex object + var flasher # low-level flasher object (cc2652_flasher instance) + + def init() + self.file_checked = false + self.file_validated = false + end + + def load(filename) + import intelhex + + if type(filename) != 'string' raise "value_error", "invalid file name" end + self.filename = filename + self.file_hex = intelhex(filename) # prepare the parser object + self.file_checked = false + self.file_validated = false + end + + ################################################################################# + # check that the HEX file is valid + # parse it completely once, and verify some values + ################################################################################# + def check() + self.file_hex.parse(/ -> self._check_pre(), + / address, len, data, offset -> self._check_cb(address, len, data, offset), + / -> self._check_post() + ) + end + + ################################################################################# + # Flash the firmware to the device + # + # Actions: + # 1. + ################################################################################# + def flash() + if !self.file_checked + print("FLH: firmware not checked, use `cc.check()`") + raise "flash_error", "firmware not checked" + end + if !self.file_validated + print("FLH: firmware not validated, use `cc.check()`") + raise "flash_error", "firmware not validated" + end + + import cc2652_flasher # this stops zigbee and configures serial + self.flasher = cc2652_flasher + + try + self.file_hex.parse(/ -> self._flash_pre(), + / address, len, data, offset -> self._flash_cb(address, len, data, offset), + / -> self._flash_post() + ) + except .. as e, m + self.file_checked = false + self.file_validated = false + raise e, m + end + end + + + ################################################################################# + # low-level + ################################################################################# + def _flash_pre() + print("FLH: Flashing started") + self.flasher.start() + self.flasher.ping() + # erase flash + self.flasher.flash_erase() + end + + def _flash_post() + print("FLH: Flashing completed: OK") + var flash_crc = self.flasher.cmd_crc32(0x0,0x30000) + print("FLH: Flash crc32 0x000000 - 0x2FFFF = " + str(flash_crc)); + # tasmota.log("FLH: Verification of HEX file OK", 2) + end + + def _flash_cb(addr, sz, data, offset) + var payload = data[offset .. offset + sz - 1] + + # final check + if size(payload) != sz raise "flash_error", "incomplete payload" end + + self.flasher.flash_write(addr, payload) + end + + + # start verification (log only) + def _check_pre() + print("FLH: Starting verification of HEX file") + # tasmota.log("FLH: Starting verification of HEX file", 2) + end + + # don't flash so ignore data + # check CCFG at location 0x57FD8 (4 bytes) + def _check_cb(addr, sz, data, offset) + # import string + + # check than sz is a multiple of 4 + if (sz % 4 != 0) + import string + raise "value_error", string.format("size of payload is not a mutliple of 4: 0x%06X", addr) + end + + # print(string.format("> addr=0x%06X sz=0x%02X data=%s", addr, sz, data[offset..offset+sz-1])) + var CCFG = self.CCFG_address + if addr <= CCFG && addr+sz > CCFG+4 + # we have CCFG in the buffer + var ccfg_bytes = data.get(4 + CCFG - addr, 4) + + if ccfg_bytes != self.CCFG_reference + import string + raise "value_error", string.format("incorrect CCFG, BSL is not set to DIO_8 LOW (0x%08X expected 0x%08X)", ccfg_bytes, self.CCFG_reference) end + self.file_validated = true # if we are here, it means that the file looks correct + end + end + + def _check_post() + print("FLH: Verification of HEX file OK") + # tasmota.log("FLH: Verification of HEX file OK", 2) + self.file_checked = true + end + +end + +return sonoff_zb_pro_flasher() + + +#- + +import sonoff_zb_pro_flasher as cc +cc.load("znp_patched.hex") +cc.check() +cc.flash() + + +# test with invalid +import sonoff_zb_pro_flasher as cc +cc.load("znp_dont_use.hex") +cc.check() + + + + +print("start") +var f = open("znp_patched.hex") +while true + var r = f.readline() + if r == "" break end +end +print("end") + +-#