mirror of https://github.com/arendst/Tasmota.git
868 lines
33 KiB
Plaintext
868 lines
33 KiB
Plaintext
#######################################################################
|
|
# Partition manager for ESP32 - ESP32C3 - ESP32S2
|
|
#
|
|
# use : `import partition`
|
|
#
|
|
# Provides low-level objects and a Web UI
|
|
#######################################################################
|
|
|
|
var partition = module('partition')
|
|
|
|
#######################################################################
|
|
# Class for a partition table entry
|
|
#
|
|
# typedef struct {
|
|
# uint16_t magic;
|
|
# uint8_t type;
|
|
# uint8_t subtype;
|
|
# uint32_t offset;
|
|
# uint32_t size;
|
|
# uint8_t label[16];
|
|
# uint32_t flags;
|
|
# } esp_partition_info_t_simplified;
|
|
#
|
|
#######################################################################
|
|
class Partition_info
|
|
var type
|
|
var subtype
|
|
var start
|
|
var sz
|
|
var label
|
|
var flags
|
|
|
|
#- remove trailing NULL chars from a buffer before converting to string -#
|
|
#- Berry strings can contain NULL, but this messes up C-Berry interface -#
|
|
static def remove_trailing_zeroes(b)
|
|
while size(b) > 0
|
|
if b[-1] == 0 b.resize(size(b)-1)
|
|
else break end
|
|
end
|
|
return b
|
|
end
|
|
|
|
#
|
|
def init(raw)
|
|
if raw == nil || !issubclass(bytes, raw) # no payload, empty partition information
|
|
self.type = 0
|
|
self.subtype = 0
|
|
self.start = 0
|
|
self.sz = 0
|
|
self.label = ''
|
|
self.flags = 0
|
|
return
|
|
end
|
|
|
|
#- we have a payload, parse it -#
|
|
var magic = raw.get(0,2)
|
|
if magic == 0x50AA #- partition entry -#
|
|
|
|
self.type = raw.get(2,1)
|
|
self.subtype = raw.get(3,1)
|
|
self.start = raw.get(4,4)
|
|
self.sz = raw.get(8,4)
|
|
self.label = self.remove_trailing_zeroes(raw[12..27]).asstring()
|
|
self.flags = raw.get(28,4)
|
|
|
|
elif magic == 0xEBEB #- MD5 -#
|
|
else
|
|
raise "internal_error", format("invalid magic number %02X", magic)
|
|
end
|
|
|
|
end
|
|
|
|
# check if the parition is an OTA partition
|
|
# if yes, return OTA number (starting at 0)
|
|
# if no, return nil
|
|
def is_ota()
|
|
if self.type == 0 && (self.subtype >= 0x10 && self.subtype < 0x20)
|
|
return self.subtype - 0x10
|
|
end
|
|
end
|
|
|
|
# check if the parition is a SPIFFS partition
|
|
# returns bool
|
|
def is_spiffs()
|
|
return self.type == 1 && self.subtype == 130
|
|
end
|
|
|
|
# get the actual image size give of the partition
|
|
# returns -1 if the partition is not an app ota partition
|
|
def get_image_size()
|
|
import flash
|
|
if self.is_ota() == nil return -1 end
|
|
try
|
|
var addr = self.start
|
|
var magic_byte = flash.read(addr, 1).get(0, 1)
|
|
if magic_byte != 0xE9 return -1 end
|
|
|
|
var seg_count = flash.read(addr+1, 1).get(0, 1)
|
|
# print("Segment count", seg_count)
|
|
|
|
var seg_offset = addr + 0x20 # sizeof(esp_image_header_t) + sizeof(esp_image_segment_header_t) = 24 + 8
|
|
|
|
for seg_num:0..seg_count-1
|
|
# print(format("Reading 0x%08X", seg_offset))
|
|
var segment_header = flash.read(seg_offset - 8, 8)
|
|
var seg_start_addr = segment_header.get(0, 4)
|
|
var seg_size = segment_header.get(4,4)
|
|
# print(format("Segment %i: flash_offset=0x%08X start_addr=0x%08X size=0x%08X", seg_num, seg_offset, seg_start_addr, seg_size))
|
|
|
|
seg_offset += seg_size + 8 # add segment_length + sizeof(esp_image_segment_header_t)
|
|
end
|
|
var total_size = seg_offset - addr + 1 # add 1KB for safety
|
|
|
|
# print(format("Total size = %i KB", total_size/1024))
|
|
|
|
return total_size
|
|
except .. as e, m
|
|
print("BRY: Exception> '" + e + "' - " + m)
|
|
return -1
|
|
end
|
|
end
|
|
|
|
def tostring()
|
|
var type_s = ""
|
|
var subtype_s = ""
|
|
if self.type == 0 type_s = "app"
|
|
if self.subtype == 0 subtype_s = "factory"
|
|
elif self.subtype >= 0x10 && self.subtype < 0x20 subtype_s = "ota" + str(self.subtype - 0x10)
|
|
elif self.subtype == 0x20 subtype_s = "test"
|
|
end
|
|
elif self.type == 1 type_s = "data"
|
|
if self.subtype == 0x00 subtype_s = "otadata"
|
|
elif self.subtype == 0x01 subtype_s = "phy"
|
|
elif self.subtype == 0x02 subtype_s = "nvs"
|
|
elif self.subtype == 0x03 subtype_s = "coredump"
|
|
elif self.subtype == 0x04 subtype_s = "nvskeys"
|
|
elif self.subtype == 0x05 subtype_s = "efuse_em"
|
|
elif self.subtype == 0x80 subtype_s = "esphttpd"
|
|
elif self.subtype == 0x81 subtype_s = "fat"
|
|
elif self.subtype == 0x82 subtype_s = "spiffs"
|
|
end
|
|
end
|
|
|
|
#- reformat strings -#
|
|
if type_s != "" type_s = " (" + type_s + ")" end
|
|
if subtype_s != "" subtype_s = " (" + subtype_s + ")" end
|
|
return format("<instance: Partition_info(%d%s,%d%s,0x%08X,0x%08X,'%s',0x%X)>",
|
|
self.type, type_s,
|
|
self.subtype, subtype_s,
|
|
self.start, self.sz,
|
|
self.label, self.flags)
|
|
end
|
|
|
|
def tobytes()
|
|
#- convert to raw bytes -#
|
|
var b = bytes('AA50') #- set magic number -#
|
|
b.resize(32).resize(2) #- pre-reserve 32 bytes -#
|
|
b.add(self.type, 1)
|
|
b.add(self.subtype, 1)
|
|
b.add(self.start, 4)
|
|
b.add(self.sz, 4)
|
|
var label = bytes().fromstring(self.label)
|
|
label.resize(16)
|
|
b = b + label
|
|
b.add(self.flags, 4)
|
|
return b
|
|
end
|
|
|
|
end
|
|
partition.Partition_info = Partition_info
|
|
|
|
#-------------------------------------------------------------
|
|
- OTA Data
|
|
-
|
|
- Selection of the active OTA partition
|
|
-
|
|
typedef struct {
|
|
uint32_t ota_seq;
|
|
uint8_t seq_label[20];
|
|
uint32_t ota_state;
|
|
uint32_t crc; /* CRC32 of ota_seq field only */
|
|
} esp_ota_select_entry_t;
|
|
|
|
- Excerp from esp_ota_ops.c
|
|
esp32_idf use two sector for store information about which partition is running
|
|
it defined the two sector as ota data partition,two structure esp_ota_select_entry_t is saved in the two sector
|
|
named data in first sector as otadata[0], second sector data as otadata[1]
|
|
e.g.
|
|
if otadata[0].ota_seq == otadata[1].ota_seq == 0xFFFFFFFF,means ota info partition is in init status
|
|
so it will boot factory application(if there is),if there's no factory application,it will boot ota[0] application
|
|
if otadata[0].ota_seq != 0 and otadata[1].ota_seq != 0,it will choose a max seq ,and get value of max_seq%max_ota_app_number
|
|
and boot a subtype (mask 0x0F) value is (max_seq - 1)%max_ota_app_number,so if want switch to run ota[x],can use next formulas.
|
|
for example, if otadata[0].ota_seq = 4, otadata[1].ota_seq = 5, and there are 8 ota application,
|
|
current running is (5-1)%8 = 4,running ota[4],so if we want to switch to run ota[7],
|
|
we should add otadata[0].ota_seq (is 4) to 4 ,(8-1)%8=7,then it will boot ota[7]
|
|
if A=(B - C)%D
|
|
then B=(A + C)%D + D*n ,n= (0,1,2...)
|
|
so current ota app sub type id is x , dest bin subtype is y,total ota app count is n
|
|
seq will add (x + n*1 + 1 - seq)%n
|
|
-------------------------------------------------------------#
|
|
class Partition_otadata
|
|
var maxota #- number of highest OTA partition, default 1 (double ota0/ota1) -#
|
|
var offset #- offset of the otadata partition (0x2000 in length), default 0xE000 -#
|
|
var active_otadata #- which otadata block is active, 0 or 1, i.e. 0xE000 or 0xF000 -#
|
|
var seq0 #- ota_seq of first block -#
|
|
var seq1 #- ota_seq of second block -#
|
|
|
|
#-------------------------------------------------------------
|
|
- Simple CRC32 imple
|
|
-
|
|
- adapted from Python https://rosettacode.org/wiki/CRC-32#Python
|
|
-------------------------------------------------------------#
|
|
static def crc32_create_table()
|
|
var a = []
|
|
for i:0..255
|
|
var k = i
|
|
for j:0..7
|
|
if k & 1
|
|
k = (k >> 1) & 0x7FFFFFFF
|
|
k ^= 0xedb88320
|
|
else
|
|
k = (k >> 1) & 0x7FFFFFFF
|
|
end
|
|
end
|
|
a.push(k)
|
|
end
|
|
return a
|
|
end
|
|
static crc32_table = Partition_otadata.crc32_create_table()
|
|
|
|
static def crc32_update(buf, crc)
|
|
crc ^= 0xffffffff
|
|
for k:0..size(buf)-1
|
|
crc = (crc >> 8 & 0x00FFFFFF) ^ Partition_otadata.crc32_table[(crc & 0xff) ^ buf[k]]
|
|
end
|
|
return crc ^ 0xffffffff
|
|
end
|
|
|
|
#- crc32 for ota_seq as 32 bits unsigned, with init vector -1 -#
|
|
static def crc32_ota_seq(seq)
|
|
return Partition_otadata.crc32_update(bytes().add(seq, 4), 0xFFFFFFFF)
|
|
end
|
|
|
|
#---------------------------------------------------------------------#
|
|
# Rest of the class
|
|
#---------------------------------------------------------------------#
|
|
def init(maxota, offset)
|
|
self.maxota = maxota
|
|
if self.maxota == nil self.maxota = 1 end
|
|
self.offset = offset
|
|
if self.offset == nil self.offset = 0xE000 end
|
|
self.active_otadata = 0
|
|
self.load()
|
|
end
|
|
|
|
#- update ota_max, needs to recompute everything -#
|
|
def set_ota_max(n)
|
|
self.maxota = n
|
|
end
|
|
|
|
# change the active OTA partition
|
|
def set_active(n)
|
|
var seq_max = 0 #- current highest seq number -#
|
|
var block_act = 0 #- block number containing the highest seq number -#
|
|
|
|
if self.seq0 != nil
|
|
seq_max = self.seq0
|
|
block_act = 0
|
|
end
|
|
if self.seq1 != nil && self.seq1 > seq_max
|
|
seq_max = self.seq1
|
|
block_act = 1
|
|
end
|
|
|
|
#- compute the next sequence number -#
|
|
var actual_ota = (seq_max - 1) % (self.maxota + 1)
|
|
if actual_ota != n #- change only if different -#
|
|
if n > actual_ota seq_max += n - actual_ota
|
|
else seq_max += (self.maxota + 1) - actual_ota + n
|
|
end
|
|
|
|
#- update internal structure -#
|
|
if block_act == 1 #- current block is 1, so update block 0 -#
|
|
self.seq0 = seq_max
|
|
else #- or write to block 1 -#
|
|
self.seq1 = seq_max
|
|
end
|
|
self._validate()
|
|
end
|
|
end
|
|
|
|
#- load otadata from SPI Flash -#
|
|
def load()
|
|
import flash
|
|
var otadata0 = flash.read(0xE000, 32)
|
|
var otadata1 = flash.read(0xF000, 32)
|
|
self.seq0 = otadata0.get(0, 4) #- ota_seq for block 1 -#
|
|
self.seq1 = otadata1.get(0, 4) #- ota_seq for block 2 -#
|
|
var valid0 = otadata0.get(28, 4) == self.crc32_ota_seq(self.seq0) #- is CRC32 valid? -#
|
|
var valid1 = otadata1.get(28, 4) == self.crc32_ota_seq(self.seq1) #- is CRC32 valid? -#
|
|
if !valid0 self.seq0 = nil end
|
|
if !valid1 self.seq1 = nil end
|
|
|
|
self._validate()
|
|
end
|
|
|
|
#- internally used, validate data -#
|
|
def _validate()
|
|
self.active_otadata = 0 #- if none is valid, default to OTA0 -#
|
|
if self.seq0 != nil
|
|
self.active_otadata = (self.seq0 - 1) % (self.maxota + 1)
|
|
end
|
|
if self.seq1 != nil && (self.seq0 == nil || self.seq1 > self.seq0)
|
|
self.active_otadata = (self.seq1 - 1) % (self.maxota + 1)
|
|
end
|
|
end
|
|
|
|
# Save partition information to SPI Flash
|
|
def save()
|
|
import flash
|
|
#- check the block number to save, 0 or 1. Choose the highest ota_seq -#
|
|
var block_to_save = -1 #- invalid -#
|
|
var seq_to_save = -1 #- invalid value -#
|
|
|
|
# check seq0
|
|
if self.seq0 != nil
|
|
seq_to_save = self.seq0
|
|
block_to_save = 0
|
|
end
|
|
if (self.seq1 != nil) && (self.seq1 > seq_to_save)
|
|
seq_to_save = self.seq1
|
|
block_to_save = 1
|
|
end
|
|
# if none was good
|
|
if block_to_save < 0 block_to_save = 0 end
|
|
if seq_to_save < 0 seq_to_save = 1 end
|
|
|
|
var offset_to_save = self.offset + 0x1000 * block_to_save #- default 0xE000 or 0xF000 -#
|
|
|
|
var bytes_to_save = bytes()
|
|
bytes_to_save.add(seq_to_save, 4)
|
|
bytes_to_save += bytes("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
|
|
bytes_to_save.add(self.crc32_ota_seq(seq_to_save), 4)
|
|
|
|
#- erase flash area and write -#
|
|
flash.erase(offset_to_save, 0x1000)
|
|
flash.write(offset_to_save, bytes_to_save)
|
|
end
|
|
|
|
# Produce a human-readable representation of the object with relevant information
|
|
def tostring()
|
|
return format("<instance: Partition_otadata(ota_active:%d, ota_seq=[%d,%d], ota_max=%d)>",
|
|
self.active_otadata, self.seq0, self.seq1, self.maxota)
|
|
end
|
|
end
|
|
partition.Partition_otadata = Partition_otadata
|
|
|
|
#-------------------------------------------------------------
|
|
- Class for a partition table entry
|
|
-------------------------------------------------------------#
|
|
class Partition
|
|
var raw #- raw bytes of the partition table in flash -#
|
|
var md5 #- md5 hash of partition list -#
|
|
var slots
|
|
var otadata #- instance of Partition_otadata() -#
|
|
|
|
def init()
|
|
self.slots = []
|
|
self.load()
|
|
self.parse()
|
|
self.load_otadata()
|
|
end
|
|
|
|
# Load partition information from SPI Flash
|
|
def load()
|
|
import flash
|
|
self.raw = flash.read(0x8000,0x1000)
|
|
end
|
|
|
|
#- parse the raw bytes to a structured list of partition items -#
|
|
def parse()
|
|
for i:0..94 # there are maximum 95 slots + md5 (0xC00)
|
|
var item_raw = self.raw[i*32..(i+1)*32-1]
|
|
var magic = item_raw.get(0,2)
|
|
if magic == 0x50AA #- partition entry -#
|
|
var slot = Partition_info(item_raw)
|
|
self.slots.push(slot)
|
|
elif magic == 0xEBEB #- MD5 -#
|
|
self.md5 = self.raw[i*32+16..i*33-1]
|
|
break
|
|
else
|
|
break
|
|
end
|
|
end
|
|
end
|
|
|
|
def get_ota_slot(n)
|
|
for slot: self.slots
|
|
if slot.is_ota() == n return slot end
|
|
end
|
|
return nil
|
|
end
|
|
|
|
#- compute the highest ota<x> partition -#
|
|
def ota_max()
|
|
var ota_max = 0
|
|
for slot:self.slots
|
|
if slot.type == 0 && (slot.subtype >= 0x10 && slot.subtype < 0x20)
|
|
var ota_num = slot.subtype - 0x10
|
|
if ota_num > ota_max ota_max = ota_num end
|
|
end
|
|
end
|
|
return ota_max
|
|
end
|
|
|
|
def load_otadata()
|
|
#- look for otadata partition offset, and max_ota -#
|
|
var otadata_offset = 0xE000 #- default value -#
|
|
var ota_max = self.ota_max()
|
|
for slot:self.slots
|
|
if slot.type == 1 && slot.subtype == 0 #- otadata -#
|
|
otadata_offset = slot.start
|
|
end
|
|
end
|
|
|
|
self.otadata = Partition_otadata(ota_max, otadata_offset)
|
|
end
|
|
|
|
# get the active OTA app partition number
|
|
def get_active()
|
|
return self.otadata.active_otadata
|
|
end
|
|
|
|
#- change the active partition -#
|
|
def set_active(n)
|
|
if n < 0 || n > self.ota_max() raise "value_error", "Invalid ota partition number" end
|
|
self.otadata.set_ota_max(self.ota_max()) #- update ota_max if it changed -#
|
|
self.otadata.set_active(n)
|
|
end
|
|
|
|
#- convert to human readble -#
|
|
def tostring()
|
|
var ret = "<instance: Partition([\n"
|
|
for slot: self.slots
|
|
ret += " "
|
|
ret += slot.tostring()
|
|
ret += "\n"
|
|
end
|
|
ret += "],\n "
|
|
ret += self.otadata.tostring()
|
|
ret += "\n)>"
|
|
return ret
|
|
end
|
|
|
|
#- convert the slots to raw bytes, ready to falsh to parition page -#
|
|
def tobytes()
|
|
if size(self.slots) > 95 raise "value_error", "Too many partiition slots" end
|
|
var b = bytes()
|
|
for slot: self.slots
|
|
b += slot.tobytes()
|
|
end
|
|
#- compute MD5 -#
|
|
var md5 = MD5()
|
|
md5.update(b)
|
|
#- add the last segment -#
|
|
b += bytes("EBEBFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
|
|
b += md5.finish()
|
|
#- complete -#
|
|
return b
|
|
end
|
|
|
|
#- write back to flash -#
|
|
def save()
|
|
import flash
|
|
var b = self.tobytes()
|
|
#- erase flash area and write -#
|
|
flash.erase(0x8000, 0x1000)
|
|
flash.write(0x8000, b)
|
|
self.otadata.save()
|
|
end
|
|
|
|
#- invalidate SPIFFS partition to force format at next boot -#
|
|
#- we simply erase the first byte of the first 2 blocks in the SPIFFS partition -#
|
|
def invalidate_spiffs()
|
|
import flash
|
|
#- we expect the SPIFFS partition to be the last one -#
|
|
var spiffs = self.slots[-1]
|
|
if !spiffs.is_spiffs() raise 'value_error', 'No FS partition found' end
|
|
|
|
var b = bytes("00") #- flash memory: we can turn bits from '1' to '0' -#
|
|
flash.write(spiffs.start , b) #- block #0 -#
|
|
flash.write(spiffs.start + 0x1000, b) #- block #1 -#
|
|
end
|
|
end
|
|
partition.Partition = Partition
|
|
|
|
#################################################################################
|
|
# Partition_manager_UI
|
|
#
|
|
# WebUI for the partition manager
|
|
#################################################################################
|
|
class Partition_manager_UI
|
|
static app_size_min = 896 # Min OTA size - let's set it at a safe 896KB for minimal Tasmota32 with TLS
|
|
static app_size_max = 3072 # Max OTA size - (4096 - 896 - 128)
|
|
|
|
# create a method for adding a button to the main menu
|
|
# the button 'Partition Manager' redirects to '/part_mgr?'
|
|
def web_add_button()
|
|
import webserver
|
|
webserver.content_send(
|
|
"<form id=but_part_mgr style='display: block;' action='part_mgr' method='get'><button>Partition Manager</button></form><p></p>")
|
|
end
|
|
|
|
#- ---------------------------------------------------------------------- -#
|
|
# Show a single OTA Partition
|
|
#- ---------------------------------------------------------------------- -#
|
|
def page_show_partition(slot, active, ota_num, maxota)
|
|
import webserver
|
|
#- define `bdis` style for gray disabled buttons -#
|
|
webserver.content_send("<fieldset><style>.bdis{background:#888;}.bdis:hover{background:#888;}</style>")
|
|
webserver.content_send(format("<legend><b title='Start: 0x%03X 000'> %s%s </b></legend>",
|
|
slot.start / 0x1000, slot.label, active ? " (active)" : ""))
|
|
|
|
webserver.content_send(format("<p><b>Partition size: </b>%i KB</p>", slot.sz / 1024))
|
|
var used = slot.get_image_size()
|
|
if used > slot.sz slot.used = -1 end # we may have a leftover of a previous firmware but the slot shrank - in this case the slot is inknown
|
|
if used >= 0
|
|
webserver.content_send(format("<p><b>Used: </b>%i KB</p>", used / 1024))
|
|
webserver.content_send(format("<p><b>Free: </b>%i KB</p>", (slot.sz - used) / 1024))
|
|
else
|
|
webserver.content_send("<p><b>Used</b>: unknown</p>")
|
|
webserver.content_send("<p><b>Free</b>: unknown</p>")
|
|
end
|
|
if maxota != nil && maxota > 0
|
|
if !active && used > 0
|
|
webserver.content_send("<p><form id=setactive style='display: block;' action='/part_mgr' method='post' ")
|
|
webserver.content_send("onsubmit='return confirm(\"This will change the active partition and cause a restart.\");'>")
|
|
webserver.content_send("<button name='setactive' class='button bgrn'>Switch To This Partition</button>")
|
|
webserver.content_send(format("<input name='ota' type='hidden' value='%d'>", ota_num))
|
|
webserver.content_send("</form></p>")
|
|
else
|
|
# put a fake disabled button
|
|
webserver.content_send("<p><form style='display: block;'>")
|
|
if used >= 0
|
|
webserver.content_send("<button name='setactive' class='button bdis' disabled title=\"No need to click, it's already the active partition\">Current Active Partition</button>")
|
|
else
|
|
webserver.content_send("<button name='setactive' class='button bdis' disabled>Empty Partition</button>")
|
|
end
|
|
webserver.content_send("</form></p>")
|
|
end
|
|
end
|
|
|
|
webserver.content_send("<p></p></fieldset><p></p>")
|
|
end
|
|
|
|
|
|
#- ---------------------------------------------------------------------- -#
|
|
# Show a single OTA Partition
|
|
#- ---------------------------------------------------------------------- -#
|
|
def page_show_spiffs(slot, free_mem)
|
|
import webserver
|
|
webserver.content_send(format("<fieldset><legend><b title='Start: 0x%03X 000'> filesystem </b></legend>",
|
|
slot.start / 0x1000))
|
|
|
|
webserver.content_send(format("<p><b>Partition size:</b> %i KB</p>", slot.sz / 1024))
|
|
|
|
if free_mem != nil
|
|
webserver.content_send(format("<p><b>Max size: </b>%i KB</p>", (slot.sz + free_mem) / 1024))
|
|
webserver.content_send(format("<p><b>Unallocated: </b>%i KB</p>", free_mem / 1024))
|
|
end
|
|
|
|
#- display Resize button -#
|
|
webserver.content_send("<hr><p><b>New size:</b> (multiple of 16 KB)</p>")
|
|
webserver.content_send("<form action='/part_mgr' method='post' ")
|
|
webserver.content_send("onsubmit='return confirm(\"This will DELETE the content of the file system and cause a restart.\");'>")
|
|
webserver.content_send(format("<input type='number' min='0' max='%d' step='16' name='fs_size' value='%i'>", (slot.sz + free_mem) / 1024, ((slot.sz + free_mem) / 1024 / 16)*16))
|
|
webserver.content_send("<p></p><button name='resize' class='button bred'>Resize filesystem</button></form></p>")
|
|
webserver.content_send("<p></p></fieldset><p></p>")
|
|
end
|
|
|
|
#- ---------------------------------------------------------------------- -#
|
|
#- Show each partition one after the other - only OTA and SPIFFS
|
|
#- ---------------------------------------------------------------------- -#
|
|
def page_show_partitions(p)
|
|
# display ota partitions
|
|
for slot: p.slots
|
|
# is the slot app?
|
|
var ota_num = slot.is_ota()
|
|
if ota_num != nil
|
|
# we have an OTA partition
|
|
self.page_show_partition(slot, ota_num == p.otadata.active_otadata, ota_num, p.otadata.maxota)
|
|
elif slot.is_factory()
|
|
self.page_show_partition(slot, false, nil, nil)
|
|
elif slot.is_spiffs()
|
|
var flash_size = tasmota.memory()['flash'] * 1024
|
|
var used_size = (slot.start + slot.sz)
|
|
self.page_show_spiffs(slot, slot == p.slots[-1] ? flash_size - used_size : nil)
|
|
end
|
|
end
|
|
end
|
|
|
|
#- ---------------------------------------------------------------------- -#
|
|
#- Display the Re-partition section - both OTA different sizes
|
|
#- ---------------------------------------------------------------------- -#
|
|
def page_show_repartition_asym(p)
|
|
import webserver
|
|
if p.get_active() != 0
|
|
webserver.content_send("<p style='width:320px;'>Re-partition can be done only if 'app0' is active.</p>")
|
|
else
|
|
# we can proceed
|
|
var app0 = p.get_ota_slot(0)
|
|
var app0_size_kb = ((app0.sz / 1024 + 63) / 64) * 64 # rounded to upper 64kb
|
|
var app0_used_kb = (((app0.get_image_size()) / 1024 / 64) + 1) * 64
|
|
|
|
var app1 = p.get_ota_slot(1)
|
|
var app1_size_kb = ((app1.sz / 1024 + 63) / 64) * 64 # rounded to upper 64kb
|
|
# var app1_used_kb = (((app1.get_image_size()) / 1024 / 64) + 1) * 64 # we don't actually need it
|
|
|
|
var flash_size_kb = tasmota.memory()['flash']
|
|
|
|
webserver.content_send("<p><b>Resize app Partitions.</b><br>It is highly recommended to set<br>both partition with the same size.<br>Filesystem is adjusted accordinlgy.</p>")
|
|
|
|
webserver.content_send("<form action='/part_mgr' method='post' ")
|
|
webserver.content_send("onsubmit='return confirm(\"This will DELETE the content of the file system and cause a restart.\");'>")
|
|
|
|
webserver.content_send("<hr><p><b>app0:</b></p>")
|
|
webserver.content_send(format("<p><b>Min:</b> %i KB</p>", app0_used_kb))
|
|
webserver.content_send(format("<p><b>Max:</b> %i KB</p>", self.app_size_max))
|
|
webserver.content_send("<p><b>New:</b> (multiple of 64 KB)</p>")
|
|
webserver.content_send(format("<input type='number' min='%d' max='%d' step='64' name='app0' value='%i'>", app0_used_kb, self.app_size_max, app0_size_kb))
|
|
|
|
webserver.content_send("<hr><p><b>app1:</b></p>")
|
|
webserver.content_send(format("<p><b>Min:</b> %i KB</p>", self.app_size_min))
|
|
webserver.content_send(format("<p><b>Max:</b> %i KB</p>", self.app_size_max))
|
|
webserver.content_send("<p><b>New:</b> (multiple of 64 KB)</p>")
|
|
webserver.content_send(format("<input type='number' min='%d' max='%d' step='64' name='app1' value='%i'>", self.app_size_min, self.app_size_max, app1_size_kb))
|
|
|
|
webserver.content_send("<p></p><button name='resize' class='button bred'>Resize Partitions</button></form></p>")
|
|
|
|
end
|
|
end
|
|
#######################################################################
|
|
# Display the complete page
|
|
#######################################################################
|
|
def page_part_mgr()
|
|
import webserver
|
|
import partition_core
|
|
if !webserver.check_privileged_access() return nil end
|
|
var p = partition_core.Partition()
|
|
|
|
webserver.content_start("Partition Manager") #- title of the web page -#
|
|
webserver.content_send_style() #- send standard Tasmota styles -#
|
|
|
|
# webserver.content_send("<p style='width:340px;'><b style='color:#f56'>Warning:</b> This can brick your device. Don't use unless you know what you are doing.</p>")
|
|
webserver.content_send("<fieldset><legend><b> Partition Manager </b></legend><p></p>")
|
|
webserver.content_send("<p style='width:320px;'><b style='color:#f56'>Warning:</b> This can brick your device.</p>")
|
|
self.page_show_partitions(p)
|
|
webserver.content_send("<p></p></fieldset><p></p>")
|
|
|
|
if p.otadata.maxota > 0
|
|
webserver.content_send("<fieldset><legend><b> Re-partition </b></legend><p></p>")
|
|
self.page_show_repartition_asym(p)
|
|
webserver.content_send("<p></p></fieldset><p></p>")
|
|
end
|
|
|
|
webserver.content_button(webserver.BUTTON_MANAGEMENT) #- button back to management page -#
|
|
webserver.content_stop() #- end of web page -#
|
|
end
|
|
|
|
#######################################################################
|
|
# Web Controller, called by POST to `/part_mgr`
|
|
#######################################################################
|
|
def page_part_ctl()
|
|
import webserver
|
|
import partition_core
|
|
if !webserver.check_privileged_access() return nil end
|
|
|
|
#- check that the partition is valid -#
|
|
var p = partition_core.Partition()
|
|
|
|
try
|
|
#---------------------------------------------------------------------#
|
|
# Switch OTA partition from one to another
|
|
#---------------------------------------------------------------------#
|
|
if webserver.has_arg("ota")
|
|
#- OTA switch partition -#
|
|
var ota_target = int(webserver.arg("ota"))
|
|
if ota_target < 0 || ota_target > p.ota_max() raise "value_error", format("Invalid partition #%d", ota_target) end
|
|
var ota_slot = p.get_ota_slot(ota_target)
|
|
|
|
if ota_slot == nil || ota_slot.get_image_size() < 0
|
|
raise "value_error", format("Invalid OTA slot #%d", ota_target)
|
|
end
|
|
|
|
print(format("Trying to change active partition to %d", ota_target))
|
|
#- do the change -#
|
|
p.set_active(ota_target)
|
|
p.otadata.save() #- write to disk -#
|
|
|
|
#- and force restart -#
|
|
webserver.redirect("/?rst=")
|
|
|
|
#---------------------------------------------------------------------#
|
|
# Resize the SPIFFS partition, generally to extend it to full free size
|
|
#---------------------------------------------------------------------#
|
|
elif webserver.has_arg("fs_size")
|
|
#- SPIFFS size change -#
|
|
var spiffs_size_kb = int(webserver.arg("fs_size"))
|
|
var spiffs_slot = p.slots[-1] # last slot
|
|
|
|
var spiffs_max_size = ((tasmota.memory()['flash'] - (spiffs_slot.start / 1024)) / 16) * 16
|
|
|
|
if spiffs_slot == nil || !spiffs_slot.is_spiffs() raise "value_error", "Last slot is not FS type" end
|
|
var flash_size_kb = tasmota.memory()['flash']
|
|
if spiffs_size_kb < 0 || spiffs_size_kb > spiffs_max_size
|
|
raise "value_error", format("Invalid fs_size %i, should be between 0 and %i", spiffs_size_kb, spiffs_max_size)
|
|
end
|
|
if spiffs_size_kb == spiffs_slot.sz/1024 raise "value_error", "FS size unchanged, abort" end
|
|
|
|
#- write the new SPIFFS partition size -#
|
|
spiffs_slot.sz = spiffs_size_kb * 1024
|
|
p.save()
|
|
p.invalidate_spiffs() # erase SPIFFS or data is corrupt
|
|
|
|
#- and force restart -#
|
|
webserver.redirect("/?rst=")
|
|
|
|
#---------------------------------------------------------------------#
|
|
# Repartition symmetrical OTA with a new SPIFFS size
|
|
#---------------------------------------------------------------------#
|
|
elif webserver.has_arg("repartition")
|
|
if p.get_active() != 0 raise "value_error", "Can't repartition unless active partition is app0" end
|
|
#- complete repartition -#
|
|
var app0 = p.get_ota_slot(0)
|
|
var app1 = p.get_ota_slot(1)
|
|
var spiffs = p.slots[-1]
|
|
|
|
if !spiffs.is_spiffs() raise 'internal_error', 'No FS partition found' end
|
|
if app0 == nil || app1 == nil
|
|
raise "internal_error", "Unable to find partitions app0 and app1"
|
|
end
|
|
if p.ota_max() != 1
|
|
raise "internal_error", "There are more than 2 OTA partition, abort"
|
|
end
|
|
var app0_size_kb = ((app0.sz / 1024 + 63) / 64) * 64 # rounded to upper 64kb
|
|
var app0_used_kb = (((app0.get_image_size()) / 1024 / 64) + 1) * 64
|
|
var flash_size_kb = tasmota.memory()['flash']
|
|
|
|
var part_size_kb = int(webserver.arg("repartition"))
|
|
if part_size_kb < app0_used_kb || part_size_kb > self.app_size_max
|
|
raise "value_error", format("Invalid partition size %i KB, should be between %i and %i", part_size_kb, app0_used_kb, self.app_size_max)
|
|
end
|
|
if part_size_kb == app0_size_kb raise "value_error", "No change to partition size, abort" end
|
|
|
|
#- all good, proceed -#
|
|
# resize app0
|
|
app0.sz = part_size_kb * 1024
|
|
# change app1
|
|
app1.start = app0.start + app0.sz
|
|
app1.sz = part_size_kb * 1024
|
|
# change spiffs
|
|
spiffs.start = app1.start + app1.sz
|
|
spiffs.sz = flash_size_kb * 1024 - spiffs.start
|
|
|
|
p.save()
|
|
p.invalidate_spiffs() # erase SPIFFS or data is corrupt
|
|
#- and force restart -#
|
|
webserver.redirect("/?rst=")
|
|
|
|
#---------------------------------------------------------------------#
|
|
# Repartition OTA with a new SPIFFS size
|
|
#---------------------------------------------------------------------#
|
|
elif webserver.has_arg("app0") && webserver.has_arg("app1")
|
|
if p.get_active() != 0 raise "value_error", "Can't repartition unless active partition is app0" end
|
|
#- complete repartition -#
|
|
var app0 = p.get_ota_slot(0)
|
|
var app1 = p.get_ota_slot(1)
|
|
var spiffs = p.slots[-1]
|
|
|
|
if !spiffs.is_spiffs() raise 'internal_error', 'No FS partition found' end
|
|
if app0 == nil || app1 == nil
|
|
raise "internal_error", "Unable to find partitions app0 and app1"
|
|
end
|
|
if p.ota_max() != 1
|
|
raise "internal_error", "There are more than 2 OTA partition, abort"
|
|
end
|
|
var app0_size_kb = ((app0.sz / 1024 + 63) / 64) * 64 # rounded to upper 64kb
|
|
var app0_used_kb = (((app0.get_image_size()) / 1024 / 64) + 1) * 64
|
|
var app1_size_kb = ((app1.sz / 1024 + 63) / 64) * 64 # rounded to upper 64kb
|
|
var flash_size_kb = tasmota.memory()['flash']
|
|
|
|
var part0_size_kb = int(webserver.arg("app0"))
|
|
if part0_size_kb < app0_used_kb || part0_size_kb > self.app_size_max
|
|
raise "value_error", format("Invalid partition size app%i %i KB, should be between %i and %i", 0, part0_size_kb, app0_used_kb, self.app_size_max)
|
|
end
|
|
var part1_size_kb = int(webserver.arg("app1"))
|
|
if part1_size_kb < self.app_size_min || part1_size_kb > self.app_size_max
|
|
raise "value_error", format("Invalid partition size app%i %i KB, should be between %i and %i", 1, part1_size_kb, self.app_size_min, self.app_size_max)
|
|
end
|
|
if part0_size_kb == app0_size_kb && part1_size_kb == app1_size_kb raise "value_error", "No change to partition sizes, abort" end
|
|
|
|
#- all good, proceed -#
|
|
# resize app0
|
|
app0.sz = part0_size_kb * 1024
|
|
# change app1
|
|
app1.start = app0.start + app0.sz
|
|
app1.sz = part1_size_kb * 1024
|
|
# change spiffs
|
|
spiffs.start = app1.start + app1.sz
|
|
spiffs.sz = flash_size_kb * 1024 - spiffs.start
|
|
|
|
p.save()
|
|
p.invalidate_spiffs() # erase SPIFFS or data is corrupt
|
|
#- and force restart -#
|
|
webserver.redirect("/?rst=")
|
|
#---------------------------------------------------------------------#
|
|
else
|
|
raise "value_error", "Unknown command"
|
|
end
|
|
except .. as e, m
|
|
print(format("BRY: Exception> '%s' - %s", e, m))
|
|
#- display error page -#
|
|
webserver.content_start("Parameter error") #- title of the web page -#
|
|
webserver.content_send_style() #- send standard Tasmota styles -#
|
|
|
|
webserver.content_send(format("<p style='width:340px;'><b>Exception:</b><br>'%s'<br>%s</p>", e, m))
|
|
# webserver.content_send("<p></p></fieldset><p></p>")
|
|
|
|
webserver.content_button(webserver.BUTTON_MANAGEMENT) #- button back to management page -#
|
|
webserver.content_stop() #- end of web page -#
|
|
end
|
|
end
|
|
|
|
#- ---------------------------------------------------------------------- -#
|
|
# respond to web_add_handler() event to register web listeners
|
|
#- ---------------------------------------------------------------------- -#
|
|
#- this is called at Tasmota start-up, as soon as Wifi/Eth is up and web server running -#
|
|
def web_add_handler()
|
|
import webserver
|
|
#- we need to register a closure, not just a function, that captures the current instance -#
|
|
webserver.on("/part_mgr", / -> self.page_part_mgr(), webserver.HTTP_GET)
|
|
webserver.on("/part_mgr", / -> self.page_part_ctl(), webserver.HTTP_POST)
|
|
end
|
|
end
|
|
partition.Partition_manager_UI = Partition_manager_UI
|
|
|
|
|
|
#- create and register driver in Tasmota -#
|
|
if tasmota
|
|
import partition_core
|
|
var partition_manager_ui = partition.Partition_manager_UI()
|
|
tasmota.add_driver(partition_manager_ui)
|
|
## can be removed if put in 'autoexec.bat'
|
|
partition_manager_ui.web_add_handler()
|
|
end
|
|
|
|
return partition
|
|
|
|
#- Example
|
|
|
|
import partition
|
|
|
|
# read
|
|
p = partition.Partition()
|
|
print(p)
|
|
|
|
-#
|