Berry refactor IM to detect failed subscription heartbeats (#21706)

This commit is contained in:
s-hadinger 2024-06-29 12:27:27 +02:00 committed by GitHub
parent c58607a5b5
commit 117eb79953
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 852 additions and 790 deletions

View File

@ -111,9 +111,13 @@ class Matter_IM
def process_incoming_ack(msg) def process_incoming_ack(msg)
# check if there is an exchange_id interested in receiving this # check if there is an exchange_id interested in receiving this
var message = self.find_sendqueue_by_exchangeid(msg.exchange_id) var message = self.find_sendqueue_by_exchangeid(msg.exchange_id)
# log(format("MTR: process_incoming_ack exch=%i message=%i", msg.exchange_id, message != nil ? 1 : 0), 4) # log(format("MTR: process_incoming_ack exch=%i message=%i", msg.exchange_id, message != nil ? 1 : 0), 3)
if message if message
return message.ack_received(msg) # dispatch to IM_Message var ret = message.ack_received(msg) # dispatch to IM_Message
if message.finished
self.remove_sendqueue_by_exchangeid(msg.exchange_id)
end
return ret
end end
return false return false
end end
@ -137,13 +141,12 @@ class Matter_IM
while idx < size(self.send_queue) while idx < size(self.send_queue)
var message = self.send_queue[idx] var message = self.send_queue[idx]
if !message.finish && message.ready if !message.finished && message.ready
message.send_im(responder) # send message message.send_im(responder) # send message
end end
if message.finish if message.finished
log("MTR: remove IM message exch="+str(message.resp.exchange_id), 4) self.remove_sendqueue_by_exchangeid(message.resp.exchange_id)
self.send_queue.remove(idx)
else else
idx += 1 idx += 1
end end
@ -169,11 +172,12 @@ class Matter_IM
############################################################# #############################################################
# find in send_queue by exchangeid # find in send_queue by exchangeid
# #
def remove_sendqueue_by_exchangeid(exchangeid) def remove_sendqueue_by_exchangeid(exchange_id)
if exchangeid == nil return end if exchange_id == nil return end
var idx = 0 var idx = 0
while idx < size(self.send_queue) while idx < size(self.send_queue)
if self.send_queue[idx].get_exchangeid() == exchangeid if self.send_queue[idx].get_exchangeid() == exchange_id
# log(f"MTR: remove IM message exch={exchange_id}", 3)
self.send_queue.remove(idx) self.send_queue.remove(idx)
else else
idx += 1 idx += 1

View File

@ -38,40 +38,57 @@ class Matter_IM_Message
var expiration # expiration time for the reporting var expiration # expiration time for the reporting
var resp # response Frame object var resp # response Frame object
var ready # bool: ready to send (true) or wait (false) var ready # bool: ready to send (true) or wait (false)
var finish # if true, the message is removed from the queue var finishing # we have sent all packet, just wait for a final Ack
var finished # if true, the message is removed from the queue
var data # TLV data of the response (if any) var data # TLV data of the response (if any)
var last_counter # counter value of last sent packet (to match ack) var last_counter # counter value of last sent packet (to match ack)
#################################################################################
# build a response message stub # build a response message stub
#
# Args:
# - msg: the message object
# - opcode: (int) the Matter opcode of the response
# - reliable: (bool) if true, then we send the response as reliable, i.e. we expect a Ack to confirm it was received
def init(msg, opcode, reliable) def init(msg, opcode, reliable)
self.reset(msg, opcode, reliable) self.reset(msg, opcode, reliable)
end end
#################################################################################
def reset(msg, opcode, reliable) def reset(msg, opcode, reliable)
self.resp = (msg != nil) ? msg.build_response(opcode, reliable) : nil # is nil for spontaneous reports self.resp = (msg != nil) ? msg.build_response(opcode, reliable) : nil # is nil for spontaneous reports
self.ready = true # by default send immediately self.ready = true # by default send immediately
self.expiration = tasmota.millis() + self.MSG_TIMEOUT self.expiration = tasmota.millis() + self.MSG_TIMEOUT
self.last_counter = 0 # avoid `nil` value self.last_counter = 0 # avoid `nil` value
self.finish = false self.finishing = false
self.finished = false
self.data = nil self.data = nil
end end
#################################################################################
# the message is being removed due to expiration # the message is being removed due to expiration
def reached_timeout() def reached_timeout()
# log(f"MTR: IM_Message reached_timeout exch={self.resp.exchange_id}", 3)
end end
#################################################################################
# ack received for previous message, proceed to next (if any) # ack received for previous message, proceed to next (if any)
# return true if we manage the ack ourselves, false if it needs to be done upper # return true if we manage the ack ourselves, false if it needs to be done upper
def ack_received(msg) def ack_received(msg)
# log("MTR: IM_Message ack_received exch="+str(self.resp.exchange_id), 3) # log(f"MTR: IM_Message ack_received exch={self.resp.exchange_id} {self.finishing=} {self.finished=}", 3)
self.expiration = tasmota.millis() + self.MSG_TIMEOUT # give more time if self.finishing # if finishing, we are waiting for final Ack before removing from queue
return false self.finished = true # remove exchange
else
self.expiration = tasmota.millis() + self.MSG_TIMEOUT # else give more time to the timer
end
return false # return false to indicate that we didn't answer ourselves
end end
#################################################################################
# Status Report OK received for previous message, proceed to next (if any) # Status Report OK received for previous message, proceed to next (if any)
# return true if we manage the ack ourselves, false if it needs to be done upper # return true if we manage the ack ourselves, false if it needs to be done upper
def status_ok_received(msg) def status_ok_received(msg)
# log(format("MTR: IM_Message status_ok_received exch=%i", self.resp.exchange_id), 3) # log(f"MTR: IM_Message status_ok_received exch={self.resp.exchange_id}", 3)
self.expiration = tasmota.millis() + self.MSG_TIMEOUT # give more time self.expiration = tasmota.millis() + self.MSG_TIMEOUT # give more time
if msg if msg
self.resp = msg.build_response(self.resp.opcode, self.resp.x_flag_r, self.resp) # update packet self.resp = msg.build_response(self.resp.opcode, self.resp.x_flag_r, self.resp) # update packet
@ -80,34 +97,45 @@ class Matter_IM_Message
return true return true
end end
#################################################################################
# we received an ACK error, do any necessary cleaning # we received an ACK error, do any necessary cleaning
#
# Arg:
# - msg: the message received
#
# No return value
def status_error_received(msg) def status_error_received(msg)
end end
#################################################################################
# get the exchange-id for this message # get the exchange-id for this message
#
# No return value
def get_exchangeid() def get_exchangeid()
return self.resp.exchange_id return self.resp.exchange_id
end end
#################################################################################
# default responder for data # default responder for data
#
# This is the main entry point for seding the next response.
# This is called only when `ready` is `true`
#
# Arg:
# - responder: instance of MessageHandler to create the response object
def send_im(responder) def send_im(responder)
# log(format("MTR: IM_Message send_im exch=%i ready=%i", self.resp.exchange_id, self.ready ? 1 : 0), 3) # log(f"MTR: IM_Message send_im exch={self.resp.exchange_id}", 3)
if !self.ready return false end # if !self.ready return false end # we're not supposed to be called if ready is false - dead code
# import debug
var resp = self.resp var resp = self.resp
var data_tlv = self.data.to_TLV() var data_raw = self.data.to_TLV().tlv2raw() # payload in cleartext
# matter.profiler.log(str(data_tlv))
var data_raw = data_tlv.tlv2raw() # payload in cleartext
# matter.profiler.log(data_raw.tohex())
resp.encode_frame(data_raw) # payload in cleartext resp.encode_frame(data_raw) # payload in cleartext
resp.encrypt() resp.encrypt()
if tasmota.loglevel(4) if tasmota.loglevel(4)
log(format("MTR: <snd (%6i) id=%i exch=%i rack=%s", resp.session.local_session_id, resp.message_counter, resp.exchange_id, resp.ack_message_counter), 4) log(f"MTR: <snd ({resp.session.local_session_id:6i}) id={resp.message_counter} exch={resp.exchange_id} rack={resp.ack_message_counter}", 4)
end end
# log("MTR: Perf/Send = " + str(debug.counters()), 4)
responder.send_response_frame(resp) responder.send_response_frame(resp)
self.last_counter = resp.message_counter self.last_counter = resp.message_counter
self.finish = true # by default we remove the packet after it is sent self.finishing = true # we wait for final ack
end end
end end
@ -120,11 +148,12 @@ matter.IM_Message = Matter_IM_Message
################################################################################# #################################################################################
class Matter_IM_Status : Matter_IM_Message class Matter_IM_Status : Matter_IM_Message
#################################################################################
def init(msg, status) def init(msg, status)
super(self).init(msg, 0x01 #-Status Response-#, true #-reliable-#) super(self).init(msg, 0x01 #-Status Response-#, true #-reliable-#)
var sr = matter.StatusResponseMessage() var sr = matter.StatusResponseMessage()
sr.status = status sr.status = status
self.data = sr self.data = sr # prepare the context in `self.data` that will be sent
end end
end end
matter.IM_Status = Matter_IM_Status matter.IM_Status = Matter_IM_Status
@ -136,6 +165,7 @@ matter.IM_Status = Matter_IM_Status
################################################################################# #################################################################################
class Matter_IM_InvokeResponse : Matter_IM_Message class Matter_IM_InvokeResponse : Matter_IM_Message
#################################################################################
def init(msg, data) def init(msg, data)
super(self).init(msg, 0x09 #-Invoke Response-#, true) super(self).init(msg, 0x09 #-Invoke Response-#, true)
self.data = data self.data = data
@ -150,6 +180,7 @@ matter.IM_InvokeResponse = Matter_IM_InvokeResponse
################################################################################# #################################################################################
class Matter_IM_WriteResponse : Matter_IM_Message class Matter_IM_WriteResponse : Matter_IM_Message
#################################################################################
def init(msg, data) def init(msg, data)
super(self).init(msg, 0x07 #-Write Response-#, true) super(self).init(msg, 0x07 #-Write Response-#, true)
self.data = data self.data = data
@ -176,30 +207,48 @@ class Matter_IM_ReportData_Pull : Matter_IM_Message
var suppress_response # if not `nil`, suppress_response attribute var suppress_response # if not `nil`, suppress_response attribute
var data_ev # left-overs of events, mirroring of data for attributes var data_ev # left-overs of events, mirroring of data for attributes
#################################################################################
#
# Args
# - msg: message object
# - ctx_generator_or_arr: generator(s) for attributes (array, single instance, or nil)
# - event_generator_or_arr: generator(s) for events (array, single instance, or nil)
def init(msg, ctx_generator_or_arr, event_generator_or_arr) def init(msg, ctx_generator_or_arr, event_generator_or_arr)
super(self).init(msg, 0x05 #-Report Data-#, true) super(self).init(msg, 0x05 #-Report Data-#, true)
self.generator_or_arr = ctx_generator_or_arr self.generator_or_arr = ctx_generator_or_arr
self.event_generator_or_arr = event_generator_or_arr self.event_generator_or_arr = event_generator_or_arr
end end
#################################################################################
# set_subscription_id
#
# Sets the SubscriptionId of the exchange (to be used by subclasses)
def set_subscription_id(subscription_id) def set_subscription_id(subscription_id)
self.subscription_id = subscription_id self.subscription_id = subscription_id
end end
#################################################################################
# set_suppress_response
#
# Sets the SuppressReponse attribute (bool or nil)
#
# If false, the controller must respond with StatusReport
# If true, the controller must not respond (only a Ack to confirm reliable message)
# `nil` means the default value which is `false` as per Matter spec
def set_suppress_response(suppress_response) def set_suppress_response(suppress_response)
self.suppress_response = suppress_response self.suppress_response = suppress_response
end end
#################################################################################
# default responder for data # default responder for data
def send_im(responder) def send_im(responder)
# log(format(">>>: Matter_IM_ReportData_Pull send_im exch=%i ready=%i", self.resp.exchange_id, self.ready ? 1 : 0), 3) # log(format(">>>: Matter_IM_ReportData_Pull send_im exch=%i ready=%i", self.resp.exchange_id, self.ready ? 1 : 0), 3)
if !self.ready return false end
var resp = self.resp # response frame object var resp = self.resp # response frame object
var data = (self.data != nil) ? self.data : bytes() # bytes() object of the TLV encoded response var data = (self.data != nil) ? self.data : bytes() # bytes() object of the TLV encoded response
self.data = nil # we remove the data that was saved for next packet self.data = nil # we remove the data that was saved for next packet
var not_full = true # marker used to exit imbricated loops var not_full = true # marker used to exit imbricated loops
var debug = responder.device.debug var debug = responder.device.debug # set debug flag in local variable to ease access below
var data_ev = (self.data_ev != nil) ? self.data_ev : ((self.event_generator_or_arr != nil) ? bytes() : nil) # bytes for events or nil if no event generators var data_ev = (self.data_ev != nil) ? self.data_ev : ((self.event_generator_or_arr != nil) ? bytes() : nil) # bytes for events or nil if no event generators
# if event_generator_or_arr != nil then data_ev contains a bytes() object # if event_generator_or_arr != nil then data_ev contains a bytes() object
@ -305,8 +354,7 @@ class Matter_IM_ReportData_Pull : Matter_IM_Message
ret.more_chunked_messages = (self.data != nil) || (self.data_ev != nil) # we got more data to send ret.more_chunked_messages = (self.data != nil) || (self.data_ev != nil) # we got more data to send
# print(">>>>> send elements before encode") # print(">>>>> send elements before encode")
var raw_tlv = ret.to_TLV() var encoded_tlv = ret.to_TLV().tlv2raw(bytes(self.MAX_MESSAGE)) # takes time
var encoded_tlv = raw_tlv.tlv2raw(bytes(self.MAX_MESSAGE)) # takes time
resp.encode_frame(encoded_tlv) # payload in cleartext, pre-allocate max buffer resp.encode_frame(encoded_tlv) # payload in cleartext, pre-allocate max buffer
resp.encrypt() resp.encrypt()
# log(format("MTR: <snd (%6i) id=%i exch=%i rack=%s", resp.session.local_session_id, resp.message_counter, resp.exchange_id, resp.ack_message_counter), 4) # log(format("MTR: <snd (%6i) id=%i exch=%i rack=%s", resp.session.local_session_id, resp.message_counter, resp.exchange_id, resp.ack_message_counter), 4)
@ -318,7 +366,7 @@ class Matter_IM_ReportData_Pull : Matter_IM_Message
# keep alive # keep alive
else else
# log(f">>>: ReportData_Pull finished",3) # log(f">>>: ReportData_Pull finished",3)
self.finish = true # finished, remove self.finishing = true # finishing, remove after final Ack
end end
end end
@ -337,7 +385,7 @@ class Matter_IM_ReportDataSubscribed_Pull : Matter_IM_ReportData_Pull
# var expiration # expiration time for the reporting # var expiration # expiration time for the reporting
# var resp # response Frame object # var resp # response Frame object
# var ready # bool: ready to send (true) or wait (false) # var ready # bool: ready to send (true) or wait (false)
# var finish # if true, the message is removed from the queue # var finished # if true, the message is removed from the queue
# var data # TLV data of the response (if any) # var data # TLV data of the response (if any)
# var last_counter # counter value of last sent packet (to match ack) # var last_counter # counter value of last sent packet (to match ack)
# inherited from Matter_IM_ReportData_Pull # inherited from Matter_IM_ReportData_Pull
@ -348,6 +396,7 @@ class Matter_IM_ReportDataSubscribed_Pull : Matter_IM_ReportData_Pull
var sub # subscription object var sub # subscription object
var report_data_phase # true during reportdata var report_data_phase # true during reportdata
#################################################################################
def init(message_handler, session, ctx_generator_or_arr, event_generator_or_arr, sub) def init(message_handler, session, ctx_generator_or_arr, event_generator_or_arr, sub)
super(self).init(nil, ctx_generator_or_arr, event_generator_or_arr) # send msg=nil to avoid creating a reponse super(self).init(nil, ctx_generator_or_arr, event_generator_or_arr) # send msg=nil to avoid creating a reponse
# we need to initiate a new virtual response, because it's a spontaneous message # we need to initiate a new virtual response, because it's a spontaneous message
@ -359,35 +408,40 @@ class Matter_IM_ReportDataSubscribed_Pull : Matter_IM_ReportData_Pull
self.set_suppress_response(false) self.set_suppress_response(false)
end end
#################################################################################
def reached_timeout() def reached_timeout()
# log(f"MTR: IM_ReportDataSubscribed_Pull reached_timeout()", 3)
self.sub.remove_self() self.sub.remove_self()
end end
#################################################################################
# ack received, confirm the heartbeat # ack received, confirm the heartbeat
def ack_received(msg) def ack_received(msg)
# log(format("MTR: IM_ReportDataSubscribed_Pull ack_received sub=%i", self.sub.subscription_id), 3) # log(f"MTR: IM_ReportDataSubscribed_Pull ack_received sub={self.sub.subscription_id}", 3)
super(self).ack_received(msg) super(self).ack_received(msg)
if !self.report_data_phase if !self.report_data_phase
# if ack is received while all data is sent, means that it finished without error # if ack is received while all data is sent, means that it finished without error
if self.sub.is_keep_alive # only if keep-alive, for normal reports, re_arm is called at last StatusReport if self.sub.is_keep_alive # only if keep-alive, for normal reports, re_arm is called at last StatusReport
self.sub.re_arm() # signal that we can proceed to next sub report self.sub.re_arm() # signal that we can proceed to next sub report
end end
return true # proceed to calling send() which removes the message return false # proceed to calling send() which removes the message
else else
return false # do nothing return false # do nothing
end end
end end
#################################################################################
# we received an ACK error, remove subscription # we received an ACK error, remove subscription
def status_error_received(msg) def status_error_received(msg)
# log(format("MTR: IM_ReportDataSubscribed_Pull status_error_received sub=%i exch=%i", self.sub.subscription_id, self.resp.exchange_id), 3) # log(f"MTR: IM_ReportDataSubscribed_Pull status_error_received sub={self.sub.subscription_id} exch={self.resp.exchange_id}", 3)
self.sub.remove_self() self.sub.remove_self()
end end
#################################################################################
# ack received for previous message, proceed to next (if any) # ack received for previous message, proceed to next (if any)
# return true if we manage the ack ourselves, false if it needs to be done upper # return true if we manage the ack ourselves, false if it needs to be done upper
def status_ok_received(msg) def status_ok_received(msg)
# log(format("MTR: IM_ReportDataSubscribed_Pull status_ok_received sub=%i exch=%i", self.sub.subscription_id, self.resp.exchange_id), 3) # log(f"MTR: IM_ReportDataSubscribed_Pull status_ok_received sub={self.sub.subscription_id} exch={self.resp.exchange_id}", 3)
if self.report_data_phase if self.report_data_phase
return super(self).status_ok_received(msg) return super(self).status_ok_received(msg)
else else
@ -397,22 +451,22 @@ class Matter_IM_ReportDataSubscribed_Pull : Matter_IM_ReportData_Pull
end end
end end
#################################################################################
# returns true if transaction is complete (remove object from queue) # returns true if transaction is complete (remove object from queue)
# default responder for data # default responder for data
def send_im(responder) def send_im(responder)
# log(format("MTR: IM_ReportDataSubscribed_Pull send sub=%i exch=%i ready=%i", self.sub.subscription_id, self.resp.exchange_id, self.ready ? 1 : 0), 3) # log(format("MTR: IM_ReportDataSubscribed_Pull send sub=%i exch=%i ready=%i", self.sub.subscription_id, self.resp.exchange_id, self.ready ? 1 : 0), 3)
# log(format("MTR: ReportDataSubscribed::send_im size(self.data.attribute_reports)=%i ready=%s report_data_phase=%s", size(self.data.attribute_reports), str(self.ready), str(self.report_data_phase)), 3)
if !self.ready return false end if !self.ready return false end
if (self.generator_or_arr != nil) || (self.event_generator_or_arr != nil) # do we have still attributes or events to send if (self.generator_or_arr != nil) || (self.event_generator_or_arr != nil) # do we have still attributes or events to send
if self.report_data_phase if self.report_data_phase
super(self).send_im(responder) super(self).send_im(responder)
# log(format("MTR: ReportDataSubscribed::send_im called super finish=%i", self.finish), 3) # log(format("MTR: ReportDataSubscribed::send_im called super finished=%i", self.finished), 3)
if !self.finish return end # ReportData needs to continue if !self.finishing return end # ReportData needs to continue
# ReportData is finished # ReportData is finished
self.report_data_phase = false self.report_data_phase = false
self.ready = false self.ready = false
self.finish = false # while a ReadReport would stop here, we continue for subscription self.finished = false # while a ReadReport would stop here, we continue for subscription
else else
# send a simple ACK # send a simple ACK
var resp = self.resp.build_standalone_ack(false) var resp = self.resp.build_standalone_ack(false)
@ -423,7 +477,7 @@ class Matter_IM_ReportDataSubscribed_Pull : Matter_IM_ReportData_Pull
end end
responder.send_response_frame(resp) responder.send_response_frame(resp)
self.last_counter = resp.message_counter self.last_counter = resp.message_counter
self.finish = true # self.finished = true
self.sub.re_arm() # signal that we can proceed to next sub report self.sub.re_arm() # signal that we can proceed to next sub report
end end
@ -433,7 +487,7 @@ class Matter_IM_ReportDataSubscribed_Pull : Matter_IM_ReportData_Pull
super(self).send_im(responder) super(self).send_im(responder)
self.report_data_phase = false self.report_data_phase = false
else else
self.finish = true # self.finished = true
self.sub.re_arm() # signal that we can proceed to next sub report self.sub.re_arm() # signal that we can proceed to next sub report
end end
end end
@ -451,6 +505,7 @@ matter.IM_ReportDataSubscribed_Pull = Matter_IM_ReportDataSubscribed_Pull
class Matter_IM_SubscribedHeartbeat : Matter_IM_ReportData_Pull class Matter_IM_SubscribedHeartbeat : Matter_IM_ReportData_Pull
var sub # subscription object var sub # subscription object
#################################################################################
def init(message_handler, session, sub) def init(message_handler, session, sub)
super(self).init(nil, nil #-no ctx_generator_or_arr-#, nil #-no event_generator_or_arr-#) # send msg=nil to avoid creating a reponse super(self).init(nil, nil #-no ctx_generator_or_arr-#, nil #-no event_generator_or_arr-#) # send msg=nil to avoid creating a reponse
# we need to initiate a new virtual response, because it's a spontaneous message # we need to initiate a new virtual response, because it's a spontaneous message
@ -458,40 +513,48 @@ class Matter_IM_SubscribedHeartbeat : Matter_IM_ReportData_Pull
# #
self.sub = sub self.sub = sub
self.set_subscription_id(sub.subscription_id) self.set_subscription_id(sub.subscription_id)
self.set_suppress_response(true) self.set_suppress_response(true) # as per Matter definition, heartbeat requires no StatusReport, only a simple Ack
end end
#################################################################################
# reached_timeout
#
# The heartbeat was not acked within 5 seconds, and after all retries,
# then the controller is not expecting any more answers,
# remove the subscription
def reached_timeout() def reached_timeout()
# log(f"MTR: IM_SubscribedHeartbeat reached_timeout()", 3)
self.sub.remove_self() self.sub.remove_self()
end end
# ack received, confirm the heartbeat #################################################################################
# ack received, confirm the heartbeat and remove the packet from the queue
def ack_received(msg) def ack_received(msg)
# log(format("MTR: Matter_IM_SubscribedHeartbeat ack_received sub=%i", self.sub.subscription_id), 3) # log(format("MTR: IM_SubscribedHeartbeat ack_received sub=%i", self.sub.subscription_id), 3)
super(self).ack_received(msg) super(self).ack_received(msg)
self.finish = true return false # no further response
return true # proceed to calling send() which removes the message
end end
#################################################################################
# we received an ACK error, remove subscription # we received an ACK error, remove subscription
def status_error_received(msg) def status_error_received(msg)
# log(format("MTR: Matter_IM_SubscribedHeartbeat status_error_received sub=%i exch=%i", self.sub.subscription_id, self.resp.exchange_id), 3) # log(format("MTR: IM_SubscribedHeartbeat status_error_received sub=%i exch=%i", self.sub.subscription_id, self.resp.exchange_id), 3)
self.sub.remove_self() self.sub.remove_self()
return false # let the caller to the ack return false # let the caller to the ack
end end
#################################################################################
# ack received for previous message, proceed to next (if any) # ack received for previous message, proceed to next (if any)
# return true if we manage the ack ourselves, false if it needs to be done upper # return true if we manage the ack ourselves, false if it needs to be done upper
def status_ok_received(msg) def status_ok_received(msg)
# log(format("MTR: Matter_IM_SubscribedHeartbeat status_ok_received sub=%i exch=%i", self.sub.subscription_id, self.resp.exchange_id), 3) # log(format("MTR: IM_SubscribedHeartbeat status_ok_received sub=%i exch=%i", self.sub.subscription_id, self.resp.exchange_id), 3)
return false # let the caller to the ack return false # let the caller to the ack
end end
#################################################################################
# default responder for data # default responder for data
def send_im(responder) def send_im(responder)
# log(format("MTR: Matter_IM_SubscribedHeartbeat send sub=%i exch=%i ready=%i", self.sub.subscription_id, self.resp.exchange_id, self.ready ? 1 : 0), 3) # log(format("MTR: IM_SubscribedHeartbeat send sub=%i exch=%i ready=%i", self.sub.subscription_id, self.resp.exchange_id, self.ready ? 1 : 0), 3)
if !self.ready return false end
super(self).send_im(responder) super(self).send_im(responder)
self.ready = false self.ready = false
end end
@ -510,6 +573,7 @@ class Matter_IM_SubscribeResponse_Pull : Matter_IM_ReportData_Pull
var sub # subscription object var sub # subscription object
var report_data_phase # true during reportdata var report_data_phase # true during reportdata
#################################################################################
def init(msg, ctx_generator_or_arr, event_generator_or_arr, sub) def init(msg, ctx_generator_or_arr, event_generator_or_arr, sub)
super(self).init(msg, ctx_generator_or_arr, event_generator_or_arr) super(self).init(msg, ctx_generator_or_arr, event_generator_or_arr)
self.sub = sub self.sub = sub
@ -517,16 +581,16 @@ class Matter_IM_SubscribeResponse_Pull : Matter_IM_ReportData_Pull
self.set_subscription_id(sub.subscription_id) self.set_subscription_id(sub.subscription_id)
end end
#################################################################################
# default responder for data # default responder for data
def send_im(responder) def send_im(responder)
# log(format("MTR: Matter_IM_SubscribeResponse send sub=%i ready=%i", self.sub.subscription_id, self.ready ? 1 : 0), 3) # log(format("MTR: Matter_IM_SubscribeResponse send sub=%i ready=%i report_data_phase=%s", self.sub.subscription_id, self.ready ? 1 : 0, self.report_data_phase), 3)
if !self.ready return false end
if self.report_data_phase if self.report_data_phase
super(self).send_im(responder) super(self).send_im(responder)
if self.finish if self.finishing
# finished reporting of data, we still need to send SubscribeResponseMessage after next StatusReport # finished reporting of data, we still need to send SubscribeResponseMessage after next StatusReport
self.report_data_phase = false self.report_data_phase = false
self.finish = false # we continue to subscribe response self.finishing = false # we continue to subscribe response
end end
self.ready = false # wait for Status Report before continuing sending self.ready = false # wait for Status Report before continuing sending
@ -543,20 +607,23 @@ class Matter_IM_SubscribeResponse_Pull : Matter_IM_ReportData_Pull
resp.encrypt() resp.encrypt()
responder.send_response_frame(resp) responder.send_response_frame(resp)
self.last_counter = resp.message_counter self.last_counter = resp.message_counter
# log(format("MTR: Send SubscribeResponseMessage sub=%i id=%i", self.sub.subscription_id, resp.message_counter), 3) # log(f"MTR: Send SubscribeResponseMessage sub={self.sub.subscription_id} id={resp.message_counter}", 3)
self.sub.re_arm() self.sub.re_arm()
self.finish = true # remove exchange self.finishing = true # remove exchange
end end
end end
# Status ok received # Status ok received
def status_ok_received(msg) def status_ok_received(msg)
# log(format("MTR: IM_SubscribeResponse status_ok_received sub=%i exch=%i ack=%i last_counter=%i", self.sub.subscription_id, self.resp.exchange_id, msg.ack_message_counter ? msg.ack_message_counter : 0 , self.last_counter), 3) # log(format("MTR: IM_SubscribeResponse status_ok_received sub=%i exch=%i ack=%i last_counter=%i finished=%s", self.sub.subscription_id, self.resp.exchange_id, msg.ack_message_counter ? msg.ack_message_counter : 0 , self.last_counter, self.finished), 3)
# once we receive ack, open flow for subscriptions # once we receive ack, open flow for subscriptions
if tasmota.loglevel(3) if tasmota.loglevel(3)
log(format("MTR: >Sub_OK (%6i) sub=%i", msg.session.local_session_id, self.sub.subscription_id), 3) log(format("MTR: >Sub_OK (%6i) sub=%i", msg.session.local_session_id, self.sub.subscription_id), 3)
end end
return super(self).status_ok_received(msg) return super(self).status_ok_received(msg)
if !self.report_data_phase
self.finishing = true
end
end end
end end

View File

@ -78,7 +78,7 @@ be_local_closure(class_Matter_IM_process_read_request_pull, /* name */
extern const bclass be_class_Matter_IM; extern const bclass be_class_Matter_IM;
be_local_closure(class_Matter_IM_process_incoming_ack, /* name */ be_local_closure(class_Matter_IM_process_incoming_ack, /* name */
be_nested_proto( be_nested_proto(
6, /* nstack */ 7, /* nstack */
2, /* argc */ 2, /* argc */
2, /* varg */ 2, /* varg */
0, /* has upvals */ 0, /* has upvals */
@ -86,24 +86,31 @@ be_local_closure(class_Matter_IM_process_incoming_ack, /* name */
0, /* has sup protos */ 0, /* has sup protos */
&be_class_Matter_IM, &be_class_Matter_IM,
1, /* has constants */ 1, /* has constants */
( &(const bvalue[ 3]) { /* constants */ ( &(const bvalue[ 5]) { /* constants */
/* K0 */ be_nested_str_weak(find_sendqueue_by_exchangeid), /* K0 */ be_nested_str_weak(find_sendqueue_by_exchangeid),
/* K1 */ be_nested_str_weak(exchange_id), /* K1 */ be_nested_str_weak(exchange_id),
/* K2 */ be_nested_str_weak(ack_received), /* K2 */ be_nested_str_weak(ack_received),
/* K3 */ be_nested_str_weak(finished),
/* K4 */ be_nested_str_weak(remove_sendqueue_by_exchangeid),
}), }),
be_str_weak(process_incoming_ack), be_str_weak(process_incoming_ack),
&be_const_str_solidified, &be_const_str_solidified,
( &(const binstruction[10]) { /* code */ ( &(const binstruction[15]) { /* code */
0x8C080100, // 0000 GETMET R2 R0 K0 0x8C080100, // 0000 GETMET R2 R0 K0
0x88100301, // 0001 GETMBR R4 R1 K1 0x88100301, // 0001 GETMBR R4 R1 K1
0x7C080400, // 0002 CALL R2 2 0x7C080400, // 0002 CALL R2 2
0x780A0003, // 0003 JMPF R2 #0008 0x780A0008, // 0003 JMPF R2 #000D
0x8C0C0502, // 0004 GETMET R3 R2 K2 0x8C0C0502, // 0004 GETMET R3 R2 K2
0x5C140200, // 0005 MOVE R5 R1 0x5C140200, // 0005 MOVE R5 R1
0x7C0C0400, // 0006 CALL R3 2 0x7C0C0400, // 0006 CALL R3 2
0x80040600, // 0007 RET 1 R3 0x88100503, // 0007 GETMBR R4 R2 K3
0x500C0000, // 0008 LDBOOL R3 0 0 0x78120002, // 0008 JMPF R4 #000C
0x80040600, // 0009 RET 1 R3 0x8C100104, // 0009 GETMET R4 R0 K4
0x88180301, // 000A GETMBR R6 R1 K1
0x7C100400, // 000B CALL R4 2
0x80040600, // 000C RET 1 R3
0x500C0000, // 000D LDBOOL R3 0 0
0x80040600, // 000E RET 1 R3
}) })
) )
); );
@ -1564,28 +1571,26 @@ be_local_closure(class_Matter_IM_send_enqueued, /* name */
0, /* has sup protos */ 0, /* has sup protos */
&be_class_Matter_IM, &be_class_Matter_IM,
1, /* has constants */ 1, /* has constants */
( &(const bvalue[11]) { /* constants */ ( &(const bvalue[ 9]) { /* constants */
/* K0 */ be_const_int(0), /* K0 */ be_const_int(0),
/* K1 */ be_nested_str_weak(send_queue), /* K1 */ be_nested_str_weak(send_queue),
/* K2 */ be_nested_str_weak(finish), /* K2 */ be_nested_str_weak(finished),
/* K3 */ be_nested_str_weak(ready), /* K3 */ be_nested_str_weak(ready),
/* K4 */ be_nested_str_weak(send_im), /* K4 */ be_nested_str_weak(send_im),
/* K5 */ be_nested_str_weak(log), /* K5 */ be_nested_str_weak(remove_sendqueue_by_exchangeid),
/* K6 */ be_nested_str_weak(MTR_X3A_X20remove_X20IM_X20message_X20exch_X3D), /* K6 */ be_nested_str_weak(resp),
/* K7 */ be_nested_str_weak(resp), /* K7 */ be_nested_str_weak(exchange_id),
/* K8 */ be_nested_str_weak(exchange_id), /* K8 */ be_const_int(1),
/* K9 */ be_nested_str_weak(remove),
/* K10 */ be_const_int(1),
}), }),
be_str_weak(send_enqueued), be_str_weak(send_enqueued),
&be_const_str_solidified, &be_const_str_solidified,
( &(const binstruction[33]) { /* code */ ( &(const binstruction[25]) { /* code */
0x58080000, // 0000 LDCONST R2 K0 0x58080000, // 0000 LDCONST R2 K0
0x600C000C, // 0001 GETGBL R3 G12 0x600C000C, // 0001 GETGBL R3 G12
0x88100101, // 0002 GETMBR R4 R0 K1 0x88100101, // 0002 GETMBR R4 R0 K1
0x7C0C0200, // 0003 CALL R3 1 0x7C0C0200, // 0003 CALL R3 1
0x140C0403, // 0004 LT R3 R2 R3 0x140C0403, // 0004 LT R3 R2 R3
0x780E0019, // 0005 JMPF R3 #0020 0x780E0011, // 0005 JMPF R3 #0018
0x880C0101, // 0006 GETMBR R3 R0 K1 0x880C0101, // 0006 GETMBR R3 R0 K1
0x940C0602, // 0007 GETIDX R3 R3 R2 0x940C0602, // 0007 GETIDX R3 R3 R2
0x88100702, // 0008 GETMBR R4 R3 K2 0x88100702, // 0008 GETMBR R4 R3 K2
@ -1596,23 +1601,15 @@ be_local_closure(class_Matter_IM_send_enqueued, /* name */
0x5C180200, // 000D MOVE R6 R1 0x5C180200, // 000D MOVE R6 R1
0x7C100400, // 000E CALL R4 2 0x7C100400, // 000E CALL R4 2
0x88100702, // 000F GETMBR R4 R3 K2 0x88100702, // 000F GETMBR R4 R3 K2
0x7812000C, // 0010 JMPF R4 #001E 0x78120004, // 0010 JMPF R4 #0016
0xB8120A00, // 0011 GETNGBL R4 K5 0x8C100105, // 0011 GETMET R4 R0 K5
0x60140008, // 0012 GETGBL R5 G8 0x88180706, // 0012 GETMBR R6 R3 K6
0x88180707, // 0013 GETMBR R6 R3 K7 0x88180D07, // 0013 GETMBR R6 R6 K7
0x88180D08, // 0014 GETMBR R6 R6 K8 0x7C100400, // 0014 CALL R4 2
0x7C140200, // 0015 CALL R5 1 0x70020000, // 0015 JMP #0017
0x00160C05, // 0016 ADD R5 K6 R5 0x00080508, // 0016 ADD R2 R2 K8
0x541A0003, // 0017 LDINT R6 4 0x7001FFE8, // 0017 JMP #0001
0x7C100400, // 0018 CALL R4 2 0x80000000, // 0018 RET 0
0x88100101, // 0019 GETMBR R4 R0 K1
0x8C100909, // 001A GETMET R4 R4 K9
0x5C180400, // 001B MOVE R6 R2
0x7C100400, // 001C CALL R4 2
0x70020000, // 001D JMP #001F
0x0008050A, // 001E ADD R2 R2 K10
0x7001FFE0, // 001F JMP #0001
0x80000000, // 0020 RET 0
}) })
) )
); );