From 7fee430e9fc854bf226706d14a00c3328a6c4d04 Mon Sep 17 00:00:00 2001 From: Stephan Hadinger Date: Sat, 10 Sep 2022 19:08:26 +0200 Subject: [PATCH] Berry has persistent MQTT subscriptions: auto-subscribe at (re)connection --- CHANGELOG.md | 1 + lib/libesp32/berry_tasmota/src/be_mqtt_lib.c | 716 ++++++++++-------- .../berry_tasmota/src/be_tasmota_lib.c | 4 +- .../berry_tasmota/src/embedded/mqtt.be | 171 +++++ .../xdrv_52_3_berry_mqtt.ino | 88 +-- 5 files changed, 631 insertions(+), 349 deletions(-) create mode 100644 lib/libesp32/berry_tasmota/src/embedded/mqtt.be diff --git a/CHANGELOG.md b/CHANGELOG.md index 112d92676..09350b64f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file. ## [12.1.1.2] ### Added +- Berry has persistent MQTT subscriptions: auto-subscribe at (re)connection ### Changed diff --git a/lib/libesp32/berry_tasmota/src/be_mqtt_lib.c b/lib/libesp32/berry_tasmota/src/be_mqtt_lib.c index 3ac9a911d..a8ab47b3c 100644 --- a/lib/libesp32/berry_tasmota/src/be_mqtt_lib.c +++ b/lib/libesp32/berry_tasmota/src/be_mqtt_lib.c @@ -6,137 +6,330 @@ #include "be_constobj.h" #include "be_mapping.h" -/* Embedded code */ +extern int be_mqtt_publish(bvm *vm); -/* +extern void be_mqtt_subscribe(const char* topic); BE_FUNC_CTYPE_DECLARE(be_mqtt_subscribe, "", "-s") +extern void be_mqtt_unsubscribe(const char* topic); BE_FUNC_CTYPE_DECLARE(be_mqtt_unsubscribe, "", "-s") +extern bbool be_mqtt_connected(void); BE_FUNC_CTYPE_DECLARE(be_mqtt_connected, "b", "-") -def _(action, topic, closure) # inner class for subscribe and unsubscribe - class mqtt_listener - var topic # topic as a list of subtopics - var fulltopic # fulltopic as string - var closure +/* @const_object_info_begin +class be_class_MQTT_ntv (scope: global, name: MQTT_ntv) { + publish, func(be_mqtt_publish) + _subscribe, ctype_func(be_mqtt_subscribe) + _unsubscribe, ctype_func(be_mqtt_unsubscribe) + connected, ctype_func(be_mqtt_connected) +} +@const_object_info_end */ - def init(topic, closure) - import string - self.fulltopic = topic - self.topic = string.split(topic, '/') - self.closure = closure - tasmota.add_driver(self) - end +#include "be_fixed_be_class_MQTT_ntv.h" - def remove() - tasmota.remove_driver(self) - end +// mqtt module +extern const bclass be_class_MQTT; +static int zigbee_init(bvm *vm) { + be_pushntvclass(vm, &be_class_MQTT); + be_call(vm, 0); + be_return(vm); +} - def mqtt_data(topic, idx, payload_s, payload_b) - # check if the topic matches the patter - import string - var topic_elts = string.split(topic, '/') - var topic_sz = size(topic_elts) - var pat = self.topic - var pat_sz = size(pat) - var i = 0 - while i < pat_sz - var pat_elt = pat[i] +#include "be_fixed_mqtt.h" +/* @const_object_info_begin +module mqtt (scope: global) { + init, func(zigbee_init) +} +@const_object_info_end */ - if pat_elt == '#' - # joker, munch whatever is left - # '#' is supposed to be the last character of the topic (we don't check it) - break - elif i >= topic_sz - # the topic is too short - no match - return false - elif pat_elt == '+' - # pass - elif pat_elt != topic_elts[i] - # topic element are different - no match - return false - end - - i += 1 - end - - if i >= pat_sz && pat_sz != topic_sz - # the topic is too long and the pattern did not finish with '#' - no match - return false - end - - var cl = self.closure - var ret = cl(topic, idx, payload_s, payload_b) - if ret == nil ret = true end # return true if the return value is forgotten - return ret - end - end - - import mqtt - if action - # subscribe - mqtt._subscribe(topic) - if type(closure) == 'function' - tasmota.check_not_method(closure) - mqtt_listener(topic, closure) - end - else - # unsubscribe - if topic != nil - mqtt._unsubscribe(topic) - end - - # scan through drivers if any matches the topic - for d: tasmota._drivers.copy() # make a copy of the list since we might modify it on the fly - if isinstance(d, mqtt_listener) - # class of interest - # topic was specified - if topic != nil - if d.fulltopic == topic - d.remove() - end - else - # remove all topics - if d.fulltopic != topic # don't call again unsubscribe if we just did - mqtt._unsubscribe(d.fulltopic) - end - d.remove() - end - end - end - end -end - -def subscribe(topic, closure) - import mqtt - mqtt._(true, topic, closure) -end - -def unsubscribe(topic) - import mqtt - mqtt._(false, topic) -end - -solidify.dump(_) solidify.dump(subscribe) solidify.dump(unsubscribe) - -#- -# example - -import mqtt -def p(a,b,c) print("mqtt",a,b,c) end - -mqtt.subscribe("/a/b", p) -mqtt.subscribe("/a/b/c", p) -mqtt.subscribe("#", p) - -print(">unsub /a/b") -mqtt.unsubscribe("/a/b") -print(">unsub all") -mqtt.unsubscribe() +// private class MQTT +/******************************************************************** +** Solidified function: mqtt_data +********************************************************************/ +be_local_closure(MQTT_mqtt_data, /* name */ + be_nested_proto( + 14, /* nstack */ + 5, /* argc */ + 2, /* varg */ + 0, /* has upvals */ + NULL, /* no upvals */ + 0, /* has sup protos */ + NULL, /* no sub protos */ + 1, /* has constants */ + ( &(const bvalue[ 4]) { /* constants */ + /* K0 */ be_nested_str(topics), + /* K1 */ be_nested_str(closure), + /* K2 */ be_nested_str(mqtt_data), + /* K3 */ be_nested_str(stop_iteration), + }), + &be_const_str_mqtt_data, + &be_const_str_solidified, + ( &(const binstruction[32]) { /* code */ + 0x88140100, // 0000 GETMBR R5 R0 K0 + 0x4C180000, // 0001 LDNIL R6 + 0x1C140A06, // 0002 EQ R5 R5 R6 + 0x78160000, // 0003 JMPF R5 #0005 + 0x80000A00, // 0004 RET 0 + 0x50140000, // 0005 LDBOOL R5 0 0 + 0x60180010, // 0006 GETGBL R6 G16 + 0x881C0100, // 0007 GETMBR R7 R0 K0 + 0x7C180200, // 0008 CALL R6 1 + 0xA8020011, // 0009 EXBLK 0 #001C + 0x5C1C0C00, // 000A MOVE R7 R6 + 0x7C1C0000, // 000B CALL R7 0 + 0x88200F01, // 000C GETMBR R8 R7 K1 + 0x4C240000, // 000D LDNIL R9 + 0x20201009, // 000E NE R8 R8 R9 + 0x7822000A, // 000F JMPF R8 #001B + 0x8C200F02, // 0010 GETMET R8 R7 K2 + 0x5C280200, // 0011 MOVE R10 R1 + 0x5C2C0400, // 0012 MOVE R11 R2 + 0x5C300600, // 0013 MOVE R12 R3 + 0x5C340800, // 0014 MOVE R13 R4 + 0x7C200A00, // 0015 CALL R8 5 + 0x74160001, // 0016 JMPT R5 #0019 + 0x74220000, // 0017 JMPT R8 #0019 + 0x50200001, // 0018 LDBOOL R8 0 1 + 0x50200200, // 0019 LDBOOL R8 1 0 + 0x5C141000, // 001A MOVE R5 R8 + 0x7001FFED, // 001B JMP #000A + 0x58180003, // 001C LDCONST R6 K3 + 0xAC180200, // 001D CATCH R6 1 0 + 0xB0080000, // 001E RAISE 2 R0 R0 + 0x80040A00, // 001F RET 1 R5 + }) + ) +); +/*******************************************************************/ --# +/******************************************************************** +** Solidified function: lazy_init +********************************************************************/ +be_local_closure(MQTT_lazy_init, /* name */ + be_nested_proto( + 5, /* nstack */ + 1, /* argc */ + 2, /* varg */ + 0, /* has upvals */ + NULL, /* no upvals */ + 1, /* has sup protos */ + ( &(const struct bproto*[ 1]) { + be_nested_proto( + 2, /* nstack */ + 0, /* argc */ + 0, /* varg */ + 1, /* has upvals */ + ( &(const bupvaldesc[ 1]) { /* upvals */ + be_local_const_upval(1, 0), + }), + 0, /* has sup protos */ + NULL, /* no sub protos */ + 1, /* has constants */ + ( &(const bvalue[ 1]) { /* constants */ + /* K0 */ be_nested_str(mqtt_connect), + }), + &be_const_str__X3Clambda_X3E, + &be_const_str_solidified, + ( &(const binstruction[ 4]) { /* code */ + 0x68000000, // 0000 GETUPV R0 U0 + 0x8C000100, // 0001 GETMET R0 R0 K0 + 0x7C000200, // 0002 CALL R0 1 + 0x80040000, // 0003 RET 1 R0 + }) + ), + }), + 1, /* has constants */ + ( &(const bvalue[ 5]) { /* constants */ + /* K0 */ be_nested_str(topics), + /* K1 */ be_nested_str(tasmota), + /* K2 */ be_nested_str(add_driver), + /* K3 */ be_nested_str(add_rule), + /* K4 */ be_nested_str(Mqtt_X23Connected), + }), + &be_const_str_lazy_init, + &be_const_str_solidified, + ( &(const binstruction[18]) { /* code */ + 0x88040100, // 0000 GETMBR R1 R0 K0 + 0x4C080000, // 0001 LDNIL R2 + 0x1C040202, // 0002 EQ R1 R1 R2 + 0x7806000B, // 0003 JMPF R1 #0010 + 0x60040012, // 0004 GETGBL R1 G18 + 0x7C040000, // 0005 CALL R1 0 + 0x90020001, // 0006 SETMBR R0 K0 R1 + 0xB8060200, // 0007 GETNGBL R1 K1 + 0x8C040302, // 0008 GETMET R1 R1 K2 + 0x5C0C0000, // 0009 MOVE R3 R0 + 0x7C040400, // 000A CALL R1 2 + 0xB8060200, // 000B GETNGBL R1 K1 + 0x8C040303, // 000C GETMET R1 R1 K3 + 0x580C0004, // 000D LDCONST R3 K4 + 0x84100000, // 000E CLOSURE R4 P0 + 0x7C040600, // 000F CALL R1 3 + 0xA0000000, // 0010 CLOSE R0 + 0x80000000, // 0011 RET 0 + }) + ) +); +/*******************************************************************/ -*/ +/******************************************************************** +** Solidified function: unsubscribe +********************************************************************/ +be_local_closure(MQTT_unsubscribe, /* name */ + be_nested_proto( + 6, /* nstack */ + 2, /* argc */ + 2, /* varg */ + 0, /* has upvals */ + NULL, /* no upvals */ + 0, /* has sup protos */ + NULL, /* no sub protos */ + 1, /* has constants */ + ( &(const bvalue[ 6]) { /* constants */ + /* K0 */ be_nested_str(topics), + /* K1 */ be_const_int(0), + /* K2 */ be_nested_str(fulltopic), + /* K3 */ be_nested_str(_unsubscribe), + /* K4 */ be_nested_str(remove), + /* K5 */ be_const_int(1), + }), + &be_const_str_unsubscribe, + &be_const_str_solidified, + ( &(const binstruction[41]) { /* code */ + 0x88080100, // 0000 GETMBR R2 R0 K0 + 0x4C0C0000, // 0001 LDNIL R3 + 0x1C080403, // 0002 EQ R2 R2 R3 + 0x780A0000, // 0003 JMPF R2 #0005 + 0x80000400, // 0004 RET 0 + 0x58080001, // 0005 LDCONST R2 K1 + 0x600C000C, // 0006 GETGBL R3 G12 + 0x88100100, // 0007 GETMBR R4 R0 K0 + 0x7C0C0200, // 0008 CALL R3 1 + 0x140C0403, // 0009 LT R3 R2 R3 + 0x780E0016, // 000A JMPF R3 #0022 + 0x4C0C0000, // 000B LDNIL R3 + 0x1C0C0203, // 000C EQ R3 R1 R3 + 0x740E0004, // 000D JMPT R3 #0013 + 0x880C0100, // 000E GETMBR R3 R0 K0 + 0x940C0602, // 000F GETIDX R3 R3 R2 + 0x880C0702, // 0010 GETMBR R3 R3 K2 + 0x1C0C0601, // 0011 EQ R3 R3 R1 + 0x780E000C, // 0012 JMPF R3 #0020 + 0x4C0C0000, // 0013 LDNIL R3 + 0x1C0C0203, // 0014 EQ R3 R1 R3 + 0x780E0004, // 0015 JMPF R3 #001B + 0x8C0C0103, // 0016 GETMET R3 R0 K3 + 0x88140100, // 0017 GETMBR R5 R0 K0 + 0x94140A02, // 0018 GETIDX R5 R5 R2 + 0x88140B02, // 0019 GETMBR R5 R5 K2 + 0x7C0C0400, // 001A CALL R3 2 + 0x880C0100, // 001B GETMBR R3 R0 K0 + 0x8C0C0704, // 001C GETMET R3 R3 K4 + 0x5C140400, // 001D MOVE R5 R2 + 0x7C0C0400, // 001E CALL R3 2 + 0x70020000, // 001F JMP #0021 + 0x00080505, // 0020 ADD R2 R2 K5 + 0x7001FFE3, // 0021 JMP #0006 + 0x4C0C0000, // 0022 LDNIL R3 + 0x200C0203, // 0023 NE R3 R1 R3 + 0x780E0002, // 0024 JMPF R3 #0028 + 0x8C0C0103, // 0025 GETMET R3 R0 K3 + 0x5C140200, // 0026 MOVE R5 R1 + 0x7C0C0400, // 0027 CALL R3 2 + 0x80000000, // 0028 RET 0 + }) + ) +); +/*******************************************************************/ +/******************************************************************** +** Solidified function: mqtt_connect +********************************************************************/ +be_local_closure(MQTT_mqtt_connect, /* name */ + be_nested_proto( + 7, /* nstack */ + 1, /* argc */ + 2, /* varg */ + 0, /* has upvals */ + NULL, /* no upvals */ + 0, /* has sup protos */ + NULL, /* no sub protos */ + 1, /* has constants */ + ( &(const bvalue[ 8]) { /* constants */ + /* K0 */ be_nested_str(tasmota), + /* K1 */ be_nested_str(log), + /* K2 */ be_nested_str(BRY_X3A_X20mqtt_X20subscribe_X20all_X20registered_X20topics), + /* K3 */ be_const_int(3), + /* K4 */ be_nested_str(topics), + /* K5 */ be_nested_str(fulltopic), + /* K6 */ be_nested_str(_subscribe), + /* K7 */ be_nested_str(stop_iteration), + }), + &be_const_str_mqtt_connect, + &be_const_str_solidified, + ( &(const binstruction[21]) { /* code */ + 0xB8060000, // 0000 GETNGBL R1 K0 + 0x8C040301, // 0001 GETMET R1 R1 K1 + 0x580C0002, // 0002 LDCONST R3 K2 + 0x58100003, // 0003 LDCONST R4 K3 + 0x7C040600, // 0004 CALL R1 3 + 0x60040010, // 0005 GETGBL R1 G16 + 0x88080104, // 0006 GETMBR R2 R0 K4 + 0x7C040200, // 0007 CALL R1 1 + 0xA8020006, // 0008 EXBLK 0 #0010 + 0x5C080200, // 0009 MOVE R2 R1 + 0x7C080000, // 000A CALL R2 0 + 0x880C0505, // 000B GETMBR R3 R2 K5 + 0x8C100106, // 000C GETMET R4 R0 K6 + 0x5C180600, // 000D MOVE R6 R3 + 0x7C100400, // 000E CALL R4 2 + 0x7001FFF8, // 000F JMP #0009 + 0x58040007, // 0010 LDCONST R1 K7 + 0xAC040200, // 0011 CATCH R1 1 0 + 0xB0080000, // 0012 RAISE 2 R0 R0 + 0x50040000, // 0013 LDBOOL R1 0 0 + 0x80040200, // 0014 RET 1 R1 + }) + ) +); +/*******************************************************************/ + + +/******************************************************************** +** Solidified function: tostring +********************************************************************/ +be_local_closure(mqtt_listener_tostring, /* name */ + be_nested_proto( + 7, /* nstack */ + 1, /* argc */ + 2, /* varg */ + 0, /* has upvals */ + NULL, /* no upvals */ + 0, /* has sup protos */ + NULL, /* no sub protos */ + 1, /* has constants */ + ( &(const bvalue[ 4]) { /* constants */ + /* K0 */ be_nested_str(string), + /* K1 */ be_nested_str(format), + /* K2 */ be_nested_str(_X3Cinstance_X3A_X20_X25s_X28_X27_X25s_X27_X29_X3E), + /* K3 */ be_nested_str(fulltopic), + }), + &be_const_str_tostring, + &be_const_str_solidified, + ( &(const binstruction[ 9]) { /* code */ + 0xA4060000, // 0000 IMPORT R1 K0 + 0x8C080301, // 0001 GETMET R2 R1 K1 + 0x58100002, // 0002 LDCONST R4 K2 + 0x60140005, // 0003 GETGBL R5 G5 + 0x5C180000, // 0004 MOVE R6 R0 + 0x7C140200, // 0005 CALL R5 1 + 0x88180103, // 0006 GETMBR R6 R0 K3 + 0x7C080800, // 0007 CALL R2 4 + 0x80040400, // 0008 RET 1 R2 + }) + ) +); +/*******************************************************************/ + /******************************************************************** ** Solidified function: init @@ -151,19 +344,17 @@ be_local_closure(mqtt_listener_init, /* name */ 0, /* has sup protos */ NULL, /* no sub protos */ 1, /* has constants */ - ( &(const bvalue[ 8]) { /* constants */ + ( &(const bvalue[ 6]) { /* constants */ /* K0 */ be_nested_str(string), /* K1 */ be_nested_str(fulltopic), /* K2 */ be_nested_str(topic), /* K3 */ be_nested_str(split), /* K4 */ be_nested_str(_X2F), /* K5 */ be_nested_str(closure), - /* K6 */ be_nested_str(tasmota), - /* K7 */ be_nested_str(add_driver), }), &be_const_str_init, &be_const_str_solidified, - ( &(const binstruction[13]) { /* code */ + ( &(const binstruction[ 9]) { /* code */ 0xA40E0000, // 0000 IMPORT R3 K0 0x90020201, // 0001 SETMBR R0 K1 R1 0x8C100703, // 0002 GETMET R4 R3 K3 @@ -172,42 +363,7 @@ be_local_closure(mqtt_listener_init, /* name */ 0x7C100600, // 0005 CALL R4 3 0x90020404, // 0006 SETMBR R0 K2 R4 0x90020A02, // 0007 SETMBR R0 K5 R2 - 0xB8120C00, // 0008 GETNGBL R4 K6 - 0x8C100907, // 0009 GETMET R4 R4 K7 - 0x5C180000, // 000A MOVE R6 R0 - 0x7C100400, // 000B CALL R4 2 - 0x80000000, // 000C RET 0 - }) - ) -); -/*******************************************************************/ - - -/******************************************************************** -** Solidified function: remove -********************************************************************/ -be_local_closure(mqtt_listener_remove, /* name */ - be_nested_proto( - 4, /* nstack */ - 1, /* argc */ - 2, /* varg */ - 0, /* has upvals */ - NULL, /* no upvals */ - 0, /* has sup protos */ - NULL, /* no sub protos */ - 1, /* has constants */ - ( &(const bvalue[ 2]) { /* constants */ - /* K0 */ be_nested_str(tasmota), - /* K1 */ be_nested_str(remove_driver), - }), - &be_const_str_remove, - &be_const_str_solidified, - ( &(const binstruction[ 5]) { /* code */ - 0xB8060000, // 0000 GETNGBL R1 K0 - 0x8C040301, // 0001 GETMET R1 R1 K1 - 0x5C0C0000, // 0002 MOVE R3 R0 - 0x7C040400, // 0003 CALL R1 2 - 0x80000000, // 0004 RET 0 + 0x80000000, // 0008 RET 0 }) ) ); @@ -308,109 +464,38 @@ be_local_class(mqtt_listener, NULL, be_nested_map(6, ( (struct bmapnode*) &(const bmapnode[]) { - { be_const_key(remove, -1), be_const_closure(mqtt_listener_remove_closure) }, { be_const_key(mqtt_data, -1), be_const_closure(mqtt_listener_mqtt_data_closure) }, + { be_const_key(tostring, -1), be_const_closure(mqtt_listener_tostring_closure) }, { be_const_key(topic, -1), be_const_var(0) }, - { be_const_key(init, 0), be_const_closure(mqtt_listener_init_closure) }, - { be_const_key(closure, 1), be_const_var(2) }, + { be_const_key(init, -1), be_const_closure(mqtt_listener_init_closure) }, + { be_const_key(closure, 0), be_const_var(2) }, { be_const_key(fulltopic, -1), be_const_var(1) }, })), (bstring*) &be_const_str_mqtt_listener ); /******************************************************************** -** Solidified function: _ +** Solidified function: mqtt_listener_class ********************************************************************/ -be_local_closure(_, /* name */ +be_local_closure(MQTT_mqtt_listener_class, /* name */ be_nested_proto( - 10, /* nstack */ - 3, /* argc */ - 0, /* varg */ + 2, /* nstack */ + 1, /* argc */ + 2, /* varg */ 0, /* has upvals */ NULL, /* no upvals */ 0, /* has sup protos */ NULL, /* no sub protos */ 1, /* has constants */ - ( &(const bvalue[12]) { /* constants */ + ( &(const bvalue[ 1]) { /* constants */ /* K0 */ be_const_class(be_class_mqtt_listener), - /* K1 */ be_nested_str(mqtt), - /* K2 */ be_nested_str(_subscribe), - /* K3 */ be_nested_str(function), - /* K4 */ be_nested_str(tasmota), - /* K5 */ be_nested_str(check_not_method), - /* K6 */ be_nested_str(_unsubscribe), - /* K7 */ be_nested_str(_drivers), - /* K8 */ be_nested_str(copy), - /* K9 */ be_nested_str(fulltopic), - /* K10 */ be_nested_str(remove), - /* K11 */ be_nested_str(stop_iteration), }), - &be_const_str__, + &be_const_str_mqtt_listener_class, &be_const_str_solidified, - ( &(const binstruction[63]) { /* code */ - 0x580C0000, // 0000 LDCONST R3 K0 + ( &(const binstruction[ 3]) { /* code */ + 0x58040000, // 0000 LDCONST R1 K0 0xB4000000, // 0001 CLASS K0 - 0xA4120200, // 0002 IMPORT R4 K1 - 0x78020010, // 0003 JMPF R0 #0015 - 0x8C140902, // 0004 GETMET R5 R4 K2 - 0x5C1C0200, // 0005 MOVE R7 R1 - 0x7C140400, // 0006 CALL R5 2 - 0x60140004, // 0007 GETGBL R5 G4 - 0x5C180400, // 0008 MOVE R6 R2 - 0x7C140200, // 0009 CALL R5 1 - 0x1C140B03, // 000A EQ R5 R5 K3 - 0x78160007, // 000B JMPF R5 #0014 - 0xB8160800, // 000C GETNGBL R5 K4 - 0x8C140B05, // 000D GETMET R5 R5 K5 - 0x5C1C0400, // 000E MOVE R7 R2 - 0x7C140400, // 000F CALL R5 2 - 0x5C140600, // 0010 MOVE R5 R3 - 0x5C180200, // 0011 MOVE R6 R1 - 0x5C1C0400, // 0012 MOVE R7 R2 - 0x7C140400, // 0013 CALL R5 2 - 0x70020028, // 0014 JMP #003E - 0x4C140000, // 0015 LDNIL R5 - 0x20140205, // 0016 NE R5 R1 R5 - 0x78160002, // 0017 JMPF R5 #001B - 0x8C140906, // 0018 GETMET R5 R4 K6 - 0x5C1C0200, // 0019 MOVE R7 R1 - 0x7C140400, // 001A CALL R5 2 - 0x60140010, // 001B GETGBL R5 G16 - 0xB81A0800, // 001C GETNGBL R6 K4 - 0x88180D07, // 001D GETMBR R6 R6 K7 - 0x8C180D08, // 001E GETMET R6 R6 K8 - 0x7C180200, // 001F CALL R6 1 - 0x7C140200, // 0020 CALL R5 1 - 0xA8020018, // 0021 EXBLK 0 #003B - 0x5C180A00, // 0022 MOVE R6 R5 - 0x7C180000, // 0023 CALL R6 0 - 0x601C000F, // 0024 GETGBL R7 G15 - 0x5C200C00, // 0025 MOVE R8 R6 - 0x5C240600, // 0026 MOVE R9 R3 - 0x7C1C0400, // 0027 CALL R7 2 - 0x781E0010, // 0028 JMPF R7 #003A - 0x4C1C0000, // 0029 LDNIL R7 - 0x201C0207, // 002A NE R7 R1 R7 - 0x781E0005, // 002B JMPF R7 #0032 - 0x881C0D09, // 002C GETMBR R7 R6 K9 - 0x1C1C0E01, // 002D EQ R7 R7 R1 - 0x781E0001, // 002E JMPF R7 #0031 - 0x8C1C0D0A, // 002F GETMET R7 R6 K10 - 0x7C1C0200, // 0030 CALL R7 1 - 0x70020007, // 0031 JMP #003A - 0x881C0D09, // 0032 GETMBR R7 R6 K9 - 0x201C0E01, // 0033 NE R7 R7 R1 - 0x781E0002, // 0034 JMPF R7 #0038 - 0x8C1C0906, // 0035 GETMET R7 R4 K6 - 0x88240D09, // 0036 GETMBR R9 R6 K9 - 0x7C1C0400, // 0037 CALL R7 2 - 0x8C1C0D0A, // 0038 GETMET R7 R6 K10 - 0x7C1C0200, // 0039 CALL R7 1 - 0x7001FFE6, // 003A JMP #0022 - 0x5814000B, // 003B LDCONST R5 K11 - 0xAC140200, // 003C CATCH R5 1 0 - 0xB0080000, // 003D RAISE 2 R0 R0 - 0x80000000, // 003E RET 0 + 0x80040200, // 0002 RET 1 R1 }) ) ); @@ -420,30 +505,82 @@ be_local_closure(_, /* name */ /******************************************************************** ** Solidified function: subscribe ********************************************************************/ -be_local_closure(subscribe, /* name */ +be_local_closure(MQTT_subscribe, /* name */ be_nested_proto( - 8, /* nstack */ - 2, /* argc */ - 0, /* varg */ + 10, /* nstack */ + 3, /* argc */ + 2, /* varg */ 0, /* has upvals */ NULL, /* no upvals */ 0, /* has sup protos */ NULL, /* no sub protos */ 1, /* has constants */ - ( &(const bvalue[ 2]) { /* constants */ - /* K0 */ be_nested_str(mqtt), - /* K1 */ be_nested_str(_), + ( &(const bvalue[11]) { /* constants */ + /* K0 */ be_nested_str(lazy_init), + /* K1 */ be_nested_str(topics), + /* K2 */ be_nested_str(fulltopic), + /* K3 */ be_nested_str(closure), + /* K4 */ be_nested_str(stop_iteration), + /* K5 */ be_nested_str(mqtt_listener_class), + /* K6 */ be_nested_str(function), + /* K7 */ be_nested_str(tasmota), + /* K8 */ be_nested_str(check_not_method), + /* K9 */ be_nested_str(push), + /* K10 */ be_nested_str(_subscribe), }), &be_const_str_subscribe, &be_const_str_solidified, - ( &(const binstruction[ 7]) { /* code */ - 0xA40A0000, // 0000 IMPORT R2 K0 - 0x8C0C0501, // 0001 GETMET R3 R2 K1 - 0x50140200, // 0002 LDBOOL R5 1 0 - 0x5C180000, // 0003 MOVE R6 R0 - 0x5C1C0200, // 0004 MOVE R7 R1 - 0x7C0C0800, // 0005 CALL R3 4 - 0x80000000, // 0006 RET 0 + ( &(const binstruction[50]) { /* code */ + 0x8C0C0100, // 0000 GETMET R3 R0 K0 + 0x7C0C0200, // 0001 CALL R3 1 + 0x500C0000, // 0002 LDBOOL R3 0 0 + 0x60100010, // 0003 GETGBL R4 G16 + 0x88140101, // 0004 GETMBR R5 R0 K1 + 0x7C100200, // 0005 CALL R4 1 + 0xA802000A, // 0006 EXBLK 0 #0012 + 0x5C140800, // 0007 MOVE R5 R4 + 0x7C140000, // 0008 CALL R5 0 + 0x88180B02, // 0009 GETMBR R6 R5 K2 + 0x1C180C01, // 000A EQ R6 R6 R1 + 0x781A0004, // 000B JMPF R6 #0011 + 0x88180B03, // 000C GETMBR R6 R5 K3 + 0x1C180C02, // 000D EQ R6 R6 R2 + 0x781A0001, // 000E JMPF R6 #0011 + 0xA8040001, // 000F EXBLK 1 1 + 0x80000C00, // 0010 RET 0 + 0x7001FFF4, // 0011 JMP #0007 + 0x58100004, // 0012 LDCONST R4 K4 + 0xAC100200, // 0013 CATCH R4 1 0 + 0xB0080000, // 0014 RAISE 2 R0 R0 + 0x8C100105, // 0015 GETMET R4 R0 K5 + 0x7C100200, // 0016 CALL R4 1 + 0x60140004, // 0017 GETGBL R5 G4 + 0x5C180400, // 0018 MOVE R6 R2 + 0x7C140200, // 0019 CALL R5 1 + 0x1C140B06, // 001A EQ R5 R5 K6 + 0x7816000B, // 001B JMPF R5 #0028 + 0xB8160E00, // 001C GETNGBL R5 K7 + 0x8C140B08, // 001D GETMET R5 R5 K8 + 0x5C1C0400, // 001E MOVE R7 R2 + 0x7C140400, // 001F CALL R5 2 + 0x88140101, // 0020 GETMBR R5 R0 K1 + 0x8C140B09, // 0021 GETMET R5 R5 K9 + 0x5C1C0800, // 0022 MOVE R7 R4 + 0x5C200200, // 0023 MOVE R8 R1 + 0x5C240400, // 0024 MOVE R9 R2 + 0x7C1C0400, // 0025 CALL R7 2 + 0x7C140400, // 0026 CALL R5 2 + 0x70020005, // 0027 JMP #002E + 0x88140101, // 0028 GETMBR R5 R0 K1 + 0x8C140B09, // 0029 GETMET R5 R5 K9 + 0x5C1C0800, // 002A MOVE R7 R4 + 0x5C200200, // 002B MOVE R8 R1 + 0x7C1C0200, // 002C CALL R7 1 + 0x7C140400, // 002D CALL R5 2 + 0x8C14010A, // 002E GETMET R5 R0 K10 + 0x5C1C0200, // 002F MOVE R7 R1 + 0x7C140400, // 0030 CALL R5 2 + 0x80000000, // 0031 RET 0 }) ) ); @@ -451,51 +588,28 @@ be_local_closure(subscribe, /* name */ /******************************************************************** -** Solidified function: unsubscribe +** Solidified class: MQTT ********************************************************************/ -be_local_closure(unsubscribe, /* name */ - be_nested_proto( - 6, /* nstack */ - 1, /* argc */ - 0, /* varg */ - 0, /* has upvals */ - NULL, /* no upvals */ - 0, /* has sup protos */ - NULL, /* no sub protos */ - 1, /* has constants */ - ( &(const bvalue[ 2]) { /* constants */ - /* K0 */ be_nested_str(mqtt), - /* K1 */ be_nested_str(_), - }), - &be_const_str_unsubscribe, - &be_const_str_solidified, - ( &(const binstruction[ 6]) { /* code */ - 0xA4060000, // 0000 IMPORT R1 K0 - 0x8C080301, // 0001 GETMET R2 R1 K1 - 0x50100000, // 0002 LDBOOL R4 0 0 - 0x5C140000, // 0003 MOVE R5 R0 - 0x7C080600, // 0004 CALL R2 3 - 0x80000000, // 0005 RET 0 - }) - ) +extern const bclass be_class_MQTT_ntv; +be_local_class(MQTT, + 1, + &be_class_MQTT_ntv, + be_nested_map(7, + ( (struct bmapnode*) &(const bmapnode[]) { + { be_const_key(mqtt_connect, -1), be_const_closure(MQTT_mqtt_connect_closure) }, + { be_const_key(mqtt_data, -1), be_const_closure(MQTT_mqtt_data_closure) }, + { be_const_key(lazy_init, -1), be_const_closure(MQTT_lazy_init_closure) }, + { be_const_key(unsubscribe, -1), be_const_closure(MQTT_unsubscribe_closure) }, + { be_const_key(topics, 0), be_const_var(0) }, + { be_const_key(mqtt_listener_class, -1), be_const_closure(MQTT_mqtt_listener_class_closure) }, + { be_const_key(subscribe, -1), be_const_closure(MQTT_subscribe_closure) }, + })), + (bstring*) &be_const_str_MQTT ); /*******************************************************************/ - - -extern int be_mqtt_publish(bvm *vm); - -extern void be_mqtt_subscribe(const char* topic); BE_FUNC_CTYPE_DECLARE(be_mqtt_subscribe, "", "s") -extern void be_mqtt_unsubscribe(const char* topic); BE_FUNC_CTYPE_DECLARE(be_mqtt_unsubscribe, "", "s") - -/* @const_object_info_begin -module mqtt (scope: global) { - publish, func(be_mqtt_publish) - subscribe, closure(subscribe_closure) - unsubscribe, closure(unsubscribe_closure) - _, closure(__closure) - _subscribe, ctype_func(be_mqtt_subscribe) - _unsubscribe, ctype_func(be_mqtt_unsubscribe) +void be_load_MQTT_class(bvm *vm) { + be_pushntvclass(vm, &be_class_MQTT); + be_setglobal(vm, "MQTT"); + be_pop(vm, 1); } -@const_object_info_end */ -#include "be_fixed_mqtt.h" diff --git a/lib/libesp32/berry_tasmota/src/be_tasmota_lib.c b/lib/libesp32/berry_tasmota/src/be_tasmota_lib.c index 5dbb2d54c..8542bde76 100644 --- a/lib/libesp32/berry_tasmota/src/be_tasmota_lib.c +++ b/lib/libesp32/berry_tasmota/src/be_tasmota_lib.c @@ -13,7 +13,7 @@ extern const be_ctypes_structure_t be_tasmota_settings_struct; extern int l_getFreeHeap(bvm *vm); extern int l_arch(bvm *vm); -extern int l_publish(bvm *vm); +extern int be_mqtt_publish(bvm *vm); extern int l_publish_result(bvm *vm); extern int l_publish_rule(bvm *vm); extern int l_cmd(bvm *vm); @@ -2562,7 +2562,7 @@ class be_class_tasmota (scope: global, name: Tasmota) { get_free_heap, func(l_getFreeHeap) arch, func(l_arch) - publish, func(l_publish) + publish, func(be_mqtt_publish) publish_result, func(l_publish_result) publish_rule, func(l_publish_rule) _cmd, func(l_cmd) diff --git a/lib/libesp32/berry_tasmota/src/embedded/mqtt.be b/lib/libesp32/berry_tasmota/src/embedded/mqtt.be new file mode 100644 index 000000000..d0a3f146d --- /dev/null +++ b/lib/libesp32/berry_tasmota/src/embedded/mqtt.be @@ -0,0 +1,171 @@ +#- ------------------------------------------------------------ -# +# Module `lv_tasmota` - piggybacks on `lv` to extend it +#- ------------------------------------------------------------ -# + +#- + +import path +path.remove("mqtt.bec") +import solidify +load('mqtt.be') +var f = open("mqtt.c", "w") +solidify.dump(MQTT, false, f) +f.close() +print("Ok") + +-# + +class MQTT_ntv end # for solidification only + + +class MQTT : MQTT_ntv + var topics + + # def init() + # end + def lazy_init() + if self.topics == nil + self.topics = [] + tasmota.add_driver(self) + tasmota.add_rule("Mqtt#Connected", / -> self.mqtt_connect()) # call mqtt_connect() when the connection occurs + end + end + + def mqtt_listener_class() + + class mqtt_listener + var topic # topic as a list of subtopics + var fulltopic # fulltopic as string + var closure + + def init(topic, closure) + import string + self.fulltopic = topic + self.topic = string.split(topic, '/') + self.closure = closure + end + + def tostring() + import string + return string.format("", classname(self), self.fulltopic) + end + + def mqtt_data(topic, idx, payload_s, payload_b) + # check if the topic matches the patter + import string + var topic_elts = string.split(topic, '/') + var topic_sz = size(topic_elts) + var pat = self.topic + var pat_sz = size(pat) + var i = 0 + while i < pat_sz + var pat_elt = pat[i] + + if pat_elt == '#' + # joker, munch whatever is left + # '#' is supposed to be the last character of the topic (we don't check it) + break + elif i >= topic_sz + # the topic is too short - no match + return false + elif pat_elt == '+' + # pass + elif pat_elt != topic_elts[i] + # topic element are different - no match + return false + end + + i += 1 + end + + if i >= pat_sz && pat_sz != topic_sz + # the topic is too long and the pattern did not finish with '#' - no match + return false + end + + var cl = self.closure + var ret = cl(topic, idx, payload_s, payload_b) + if ret == nil ret = true end # return true if the return value is forgotten + return ret + end + end + return mqtt_listener + end + + def subscribe(topic, closure) + self.lazy_init() + var found = false + for m : self.topics + if m.fulltopic == topic && m.closure == closure + return # we have already the subscription, ignore + end + end + var mqtt_listener = self.mqtt_listener_class() + if type(closure) == 'function' + tasmota.check_not_method(closure) + self.topics.push(mqtt_listener(topic, closure)) + else + self.topics.push(mqtt_listener(topic)) + end + self._subscribe(topic) + end + + # if topic == nil, unsuscribe to all + def unsubscribe(topic) + if self.topics == nil return end # nothing to do + + var i = 0 + while i < size(self.topics) + if topic == nil || self.topics[i].fulltopic == topic + if topic == nil self._unsubscribe(self.topics[i].fulltopic) end + self.topics.remove(i) # remove and don't increment + else + i += 1 + end + end + + if topic != nil self._unsubscribe(topic) end + end + + def mqtt_data(topic, idx, payload_s, payload_b) + if self.topics == nil return end + var ret = false + for m : self.topics + if m.closure != nil # if no closure attached, skip it + var res = m.mqtt_data(topic, idx, payload_s, payload_b) # stop propagation? + ret = ret || res + end + end + return ret # return true to stop propagation as a Tasmota cmd + end + + def mqtt_connect() # called when MQTT connects or re-connects + tasmota.log("BRY: mqtt subscribe all registered topics", 3) + for m : self.topics + var fulltopic = m.fulltopic + self._subscribe(fulltopic) + end + return false + end +end + +#- +# example + +import mqtt +def p1(a,b,c) print("mqtt1",a,b,c) end +def p2(a,b,c) print("mqtt2",a,b,c) end +def p3(a,b,c) print("mqtt3",a,b,c) end + +mqtt.subscribe("/a/b", p1) +mqtt.subscribe("/a/b/c", p2) +#mqtt.subscribe("#", p3) + +print(">unsub /a/b") +mqtt.unsubscribe("/a/b") +print(">unsub all") +mqtt.unsubscribe() + + +-# + diff --git a/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_mqtt.ino b/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_mqtt.ino index 77525039c..16139a295 100644 --- a/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_mqtt.ino +++ b/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_mqtt.ino @@ -25,54 +25,46 @@ // Berry: `tasmota.publish(topic, payload [, retain:bool, start:int, len:int]) -> nil`` // is_method is true if called from Tasmota class, false if called from mqtt module -int32_t be_mqtt_publish(struct bvm *vm, bool is_method) { - int32_t top = be_top(vm); // Get the number of arguments - if (top >= 2+is_method && be_isstring(vm, 1+is_method) && (be_isstring(vm, 2+is_method) || be_isbytes(vm, 2+is_method))) { // 2 mandatory string arguments - bool retain = false; - int32_t payload_start = 0; - int32_t len = -1; // send all of it - if (top >= 3+is_method) { retain = be_tobool(vm, 3+is_method); } - if (top >= 4+is_method) { - payload_start = be_toint(vm, 4+is_method); - if (payload_start < 0) payload_start = 0; - } - if (top >= 5+is_method) { len = be_toint(vm, 5+is_method); } - const char * topic = be_tostring(vm, 1+is_method); - const char * payload = nullptr; - size_t payload_len = 0; - - if (be_isstring(vm, 2+is_method)) { - payload = be_tostring(vm, 2+is_method); - payload_len = strlen(payload); - } else { - payload = (const char *) be_tobytes(vm, 2+is_method, &payload_len); - } - if (!payload) { be_raise(vm, "value_error", "Empty payload"); } - - // adjust start and len - if (payload_start >= payload_len) { len = 0; } // send empty packet - else if (len < 0) { len = payload_len - payload_start; } // send all packet, adjust len - else if (payload_start + len > payload_len) { len = payload_len - payload_start; } // len is too long, adjust - // adjust start - payload = payload + payload_start; - - be_pop(vm, be_top(vm)); // clear stack to avoid any indirect warning message in subsequent calls to Berry - - MqttPublishPayload(topic, payload, len, retain); - - be_return_nil(vm); // Return - } - be_raise(vm, kTypeError, nullptr); -} - extern "C" { - int32_t l_publish(struct bvm *vm); - int32_t l_publish(struct bvm *vm) { - return be_mqtt_publish(vm, true); - } - + int32_t be_mqtt_publish(struct bvm *vm); int32_t be_mqtt_publish(struct bvm *vm) { - return be_mqtt_publish(vm, false); + int32_t top = be_top(vm); // Get the number of arguments + if (top >= 3 && be_isstring(vm, 2) && (be_isstring(vm, 3) || be_isbytes(vm, 3))) { // 2 mandatory string arguments + bool retain = false; + int32_t payload_start = 0; + int32_t len = -1; // send all of it + if (top >= 4) { retain = be_tobool(vm, 4); } + if (top >= 5) { + payload_start = be_toint(vm, 5); + if (payload_start < 0) payload_start = 0; + } + if (top >= 6) { len = be_toint(vm, 6); } + const char * topic = be_tostring(vm, 2); + const char * payload = nullptr; + size_t payload_len = 0; + + if (be_isstring(vm, 3)) { + payload = be_tostring(vm, 3); + payload_len = strlen(payload); + } else { + payload = (const char *) be_tobytes(vm, 3, &payload_len); + } + if (!payload) { be_raise(vm, "value_error", "Empty payload"); } + + // adjust start and len + if (payload_start >= payload_len) { len = 0; } // send empty packet + else if (len < 0) { len = payload_len - payload_start; } // send all packet, adjust len + else if (payload_start + len > payload_len) { len = payload_len - payload_start; } // len is too long, adjust + // adjust start + payload = payload + payload_start; + + be_pop(vm, be_top(vm)); // clear stack to avoid any indirect warning message in subsequent calls to Berry + + MqttPublishPayload(topic, payload, len, retain); + + be_return_nil(vm); // Return + } + be_raise(vm, kTypeError, nullptr); } void be_mqtt_subscribe(const char* topic) { @@ -84,6 +76,10 @@ extern "C" { if (!topic) { return; } MqttUnsubscribe(topic); } + + bbool be_mqtt_connected(void) { + return Mqtt.connected; + } } #endif // USE_BERRY