/*
xdrv_57_tasmesh.ino - Mesh support for Tasmota using ESP-Now
Copyright (C) 2021 Christian Baars, Federico Leoni and Theo Arends
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
*/
/*
--------------------------------------------------------------------------------------------
Version yyyymmdd Action Description
--------------------------------------------------------------------------------------------
0.9.5.1 20210622 integrate Expand number of chunks to satisfy larger MQTT messages
Refactor to latest Tasmota standards
---
0.9.4.1 20210503 integrate Add some minor tweak for channel management by Federico Leoni
---
0.9.0.0 20200927 started From scratch by Christian Baars
*/
#ifdef USE_TASMESH
/*********************************************************************************************\
* Build a mesh of nodes using ESP-Now
* Connect it through an ESP32-broker to WLAN
\*********************************************************************************************/
#define XDRV_57 57
/*********************************************************************************************\
* Callbacks
\*********************************************************************************************/
#ifdef ESP32
void CB_MESHDataSent(const uint8_t *MAC, esp_now_send_status_t sendStatus);
void CB_MESHDataSent(const uint8_t *MAC, esp_now_send_status_t sendStatus) {
char _destMAC[18];
ToHex_P(MAC, 6, _destMAC, 18, ':');
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Sent to %s status %d"), _destMAC, sendStatus);
}
void CB_MESHDataReceived(const uint8_t *MAC, const uint8_t *packet, int len) {
static bool _locked = false;
if (_locked) { return; }
_locked = true;
char _srcMAC[18];
ToHex_P(MAC, 6, _srcMAC, 18, ':');
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Rcvd from %s"), _srcMAC);
mesh_packet_t *_recvPacket = (mesh_packet_t*)packet;
if ((_recvPacket->type == PACKET_TYPE_REGISTER_NODE) || (_recvPacket->type == PACKET_TYPE_REFRESH_NODE)) {
if (MESHcheckPeerList((const uint8_t *)MAC) == false) {
MESHencryptPayload(_recvPacket, 0); //decrypt it and check
if (memcmp(_recvPacket->payload, MESH.broker, 6) == 0) {
MESHaddPeer((uint8_t*)MAC);
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Rcvd topic %s, payload %*_H"), (char*)_recvPacket->payload + 6, MESH.packetToConsume.front().chunkSize+5, (uint8_t *)&MESH.packetToConsume.front().payload);
for (auto &_peer : MESH.peers) {
if (memcmp(_peer.MAC, _recvPacket->sender, 6) == 0) {
strcpy(_peer.topic, (char*)_recvPacket->payload + 6);
MESHsubscribe((char*)&_peer.topic);
_locked = false;
return;
}
}
} else {
char _cryptMAC[18];
ToHex_P(_recvPacket->payload, 6, _cryptMAC, 18, ':');
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Peer %s denied, wrong MAC %s"), _srcMAC, _cryptMAC);
_locked = false;
return;
}
} else {
if (_recvPacket->type == PACKET_TYPE_REGISTER_NODE) {
MESH.flags.nodeWantsTimeASAP = 1; //this could happen after wake from deepsleep on battery powered device
} else {
MESH.flags.nodeWantsTime = 1;
}
}
}
MESH.lmfap = millis();
if (MESHcheckPeerList(MAC) == true){
MESH.packetToConsume.push(*_recvPacket);
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Packet %d from %s to queue"), MESH.packetToConsume.size(), _srcMAC);
}
_locked = false;
}
#else // ESP8266
void CB_MESHDataSent(uint8_t *MAC, uint8_t sendStatus) {
char _destMAC[18];
ToHex_P(MAC, 6, _destMAC, 18, ':');
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Sent to %s status %d"), _destMAC, sendStatus);
}
void CB_MESHDataReceived(uint8_t *MAC, uint8_t *packet, uint8_t len) {
MESH.lmfap = millis(); //any peer
if (memcmp(MAC, MESH.broker, 6) == 0) {
MESH.lastMessageFromBroker = millis(); //directly from the broker
}
mesh_packet_t *_recvPacket = (mesh_packet_t*)packet;
switch (_recvPacket->type) {
case PACKET_TYPE_TIME:
Rtc.utc_time = _recvPacket->senderTime;
Rtc.user_time_entry = true;
MESH.lastMessageFromBroker = millis();
if (MESH.flags.nodeGotTime == 0) {
RtcSync("Mesh");
TasmotaGlobal.rules_flag.system_boot = 1; // for now we consider the node booted and let trigger system#boot on RULES
}
MESH.flags.nodeGotTime = 1;
//Wifi.retry = 0;
// Response_P(PSTR("{\"%s\":{\"Time\":1}}"), D_CMND_MESH); //got the time, now we can publish some sensor data
// XdrvRulesProcess();
break;
case PACKET_TYPE_PEERLIST:
MESH.packetToConsume.push(*_recvPacket);
return;
break;
default:
// nothing for now;
break;
}
if (memcmp(_recvPacket->receiver, MESH.sendPacket.sender, 6) != 0) { //MESH.sendPacket.sender simply stores the MAC of the node
if (ROLE_NODE_SMALL == MESH.role) {
return; // a 'small node' does not perform mesh functions
}
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Packet to resend ..."));
MESH.packetToResend.push(*_recvPacket);
return;
} else {
if (_recvPacket->type == PACKET_TYPE_WANTTOPIC) {
MESH.flags.brokerNeedsTopic = 1;
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Broker needs topic ..."));
return; //nothing left to be done
}
// for(auto &_message : MESH.packetsAlreadyReceived){
// if(memcmp(_recvPacket,_message,15==0)){
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Packet already received"));
// return;
// }
// }
// MESH.packetsAlreadyReceived.push_back((mesh_packet_header_t*) _recvPacket);
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Packet to consume ..."));
MESH.packetToConsume.push(*_recvPacket);
}
}
#endif // ESP32
/*********************************************************************************************\
* init driver
\*********************************************************************************************/
void MESHInit(void) {
MESH.interval = MESH_REFRESH;
MESH.role = ROLE_NONE;
MESH.packetsAlreadyReceived.reserve(5);
MESH.peers.reserve(10);
MESH.multiPackets.reserve(2);
MESH.sendPacket.counter = 0;
MESH.sendPacket.chunks = 1;
MESH.sendPacket.chunk = 0;
MESH.sendPacket.type = PACKET_TYPE_TIME;
MESH.sendPacket.TTL = 2;
MESHsetWifi(1); // (Re-)enable wifi as long as Mesh is not enabled
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Initialized"));
}
void MESHdeInit(void) {
#ifdef ESP8266 // only ESP8266, ESP32 as a broker should not use deepsleep
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Stopping"));
// TODO: degister from the broker, so he can stop MQTT-proxy
esp_now_deinit();
#endif // ESP8266
}
/*********************************************************************************************\
* MQTT proxy functions
\*********************************************************************************************/
#ifdef ESP32
/**
* @brief Subscribes as a proxy
*
* @param topic - received from the referring node
*/
void MESHsubscribe(char *topic) {
char stopic[TOPSZ];
GetTopic_P(stopic, CMND, topic, PSTR("#"));
MqttSubscribe(stopic);
}
void MESHunsubscribe(char *topic) {
char stopic[TOPSZ];
GetTopic_P(stopic, CMND, topic, PSTR("#"));
MqttUnsubscribe(stopic);
}
void MESHconnectMQTT(void){
for (auto &_peer : MESH.peers) {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Reconnect topic %s"), _peer.topic);
if (_peer.topic[0] != 0) {
MESHsubscribe(_peer.topic);
}
}
}
/**
* @brief Intercepts mqtt message, that the broker (ESP32) subscribes to as a proxy for a node.
* Is called from xdrv_02_mqtt.ino. Will send the message in the payload via ESP-NOW.
*
* @param _topic
* @param _data
* @param data_len
* @return true
* @return false
*/
bool MESHinterceptMQTTonBroker(char* _topic, uint8_t* _data, unsigned int data_len) {
if (MESH.role != ROLE_BROKER) { return false; }
char stopic[TOPSZ];
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Intercept topic %s"), _topic);
for (auto &_peer : MESH.peers) {
GetTopic_P(stopic, CMND, _peer.topic, PSTR("")); //cmnd/topic/
if (strlen(_topic) != strlen(_topic)) {
return false; // prevent false result when _topic is the leading substring of stopic
}
if (memcmp(_topic, stopic, strlen(stopic)) == 0) {
MESH.sendPacket.chunkSize = strlen(_topic) +1;
if (MESH.sendPacket.chunkSize + data_len > MESH_PAYLOAD_SIZE) {
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Intercept payload oversized %d"), data_len);
return false;
}
memcpy(MESH.sendPacket.receiver, _peer.MAC, 6);
memcpy(MESH.sendPacket.payload, _topic, MESH.sendPacket.chunkSize);
memcpy(MESH.sendPacket.payload + MESH.sendPacket.chunkSize, _data, data_len);
MESH.sendPacket.chunkSize += data_len;
MESH.sendPacket.chunks = 1;
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Intercept payload '%s'"), MESH.sendPacket.payload);
MESH.sendPacket.type = PACKET_TYPE_MQTT;
MESH.sendPacket.senderTime = Rtc.utc_time;
MESHsendPacket(&MESH.sendPacket);
// int result = esp_now_send(MESH.sendPacket.receiver, (uint8_t *)&MESH.sendPacket, (sizeof(MESH.sendPacket))-(MESH_PAYLOAD_SIZE-MESH.sendPacket.chunkSize));
//send to Node
return true;
}
}
return false;
}
#else // ESP8266
void MESHreceiveMQTT(mesh_packet_t *_packet);
void MESHreceiveMQTT(mesh_packet_t *_packet){
uint32_t _slength = strlen((char*)_packet->payload);
if (_packet->chunks == 1) { //single chunk message
MqttDataHandler((char*)_packet->payload, (uint8_t*)(_packet->payload)+_slength+1, (_packet->chunkSize)-_slength);
} else {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Multiple chunks %u not supported yet"), _packet->chunks);
// TODO: reconstruct message in buffer or only handle short messages
}
}
#endif // ESP32
bool MESHroleNode(void) {
return (MESH.role > ROLE_BROKER);
}
/**
* @brief Redirects the outgoing mqtt message on the node just before it would have been sent to
* the broker via ESP-NOW
*
* @param _topic
* @param _data
* @param _retained - currently unused
* @return true
* @return false
*/
bool MESHrouteMQTTtoMESH(const char* _topic, char* _data, bool _retained) {
if (!MESHroleNode()) { return false; }
size_t _bytesLeft = strlen(_topic) + strlen(_data) +2;
MESH.sendPacket.counter++;
MESH.sendPacket.chunk = 0;
MESH.sendPacket.chunks = (_bytesLeft / MESH_PAYLOAD_SIZE) +1;
memcpy(MESH.sendPacket.receiver, MESH.broker, 6);
MESH.sendPacket.type = PACKET_TYPE_MQTT;
MESH.sendPacket.chunkSize = MESH_PAYLOAD_SIZE;
MESH.sendPacket.peerIndex = 0;
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Chunks %u, Counter %u"), MESH.sendPacket.chunks, MESH.sendPacket.counter);
size_t _topicSize = strlen(_topic) +1;
size_t _offsetData = 0;
while (_bytesLeft > 0) {
size_t _byteLeftInChunk = MESH_PAYLOAD_SIZE;
// MESH.sendPacket.chunkSize = MESH_PAYLOAD_SIZE;
if (MESH.sendPacket.chunk == 0) {
memcpy(MESH.sendPacket.payload, _topic, _topicSize);
MESH.sendPacket.chunkSize = _topicSize;
MESH.currentTopicSize = MESH.sendPacket.chunkSize;
_bytesLeft -= _topicSize;
_byteLeftInChunk -= _topicSize;
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Topic in payload '%s'"), (char*)MESH.sendPacket.payload);
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: After topic -> chunk:%u, pre-size: %u"),MESH.sendPacket.chunk,MESH.sendPacket.chunkSize);
}
if (_byteLeftInChunk > 0) {
if (_byteLeftInChunk > _bytesLeft) {
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: only last chunk bL:%u bLiC:%u oSD:%u"),_bytesLeft,_byteLeftInChunk,_offsetData);
_byteLeftInChunk = _bytesLeft;
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: only last chunk after correction -> chunk:%u, pre-size: %u"),MESH.sendPacket.chunk,MESH.sendPacket.chunkSize);
}
if (MESH.sendPacket.chunk > 0) { _topicSize = 0; }
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: %u"),_topicSize);
memcpy(MESH.sendPacket.payload + _topicSize, _data + _offsetData, _byteLeftInChunk);
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Data in payload '%s'"), (char*)MESH.sendPacket.payload + _topicSize);
_offsetData += _byteLeftInChunk;
_bytesLeft -= _byteLeftInChunk;
}
MESH.sendPacket.chunkSize += _byteLeftInChunk;
MESH.packetToResend.push(MESH.sendPacket);
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: chunk %u, size %u, payload %*_H"),MESH.sendPacket.chunk,MESH.sendPacket.chunkSize,MESH.sendPacket.chunkSize,(uint8_t*)MESH.sendPacket.payload);
if (MESH.sendPacket.chunk == MESH.sendPacket.chunks) {
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Too many chunks %u"), MESH.sendPacket.chunk +1);
}
SHOW_FREE_MEM(PSTR("MESHrouteMQTTtoMESH"));
MESH.sendPacket.chunk++;
MESH.sendPacket.chunkSize = 0;
}
return true;
}
/**
* @brief The node sends its mqtt topic to the broker
*
*/
void MESHregisterNode(uint8_t mode){
memcpy(MESH.sendPacket.receiver, MESH.broker, 6); // First 6 bytes -> MAC of broker
strcpy((char*)MESH.sendPacket.payload +6, TasmotaGlobal.mqtt_topic); // Remaining bytes -> topic of node
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Register node with topic '%s'"), (char*)MESH.sendPacket.payload +6);
MESH.sendPacket.TTL = 2;
MESH.sendPacket.chunks = 1;
MESH.sendPacket.chunk = 0;
MESH.sendPacket.chunkSize = strlen(TasmotaGlobal.mqtt_topic) + 1 + 6;
memcpy(MESH.sendPacket.payload, MESH.broker, 6);
MESH.sendPacket.type = (mode == 0) ? PACKET_TYPE_REGISTER_NODE : PACKET_TYPE_REFRESH_NODE;
MESHsendPacket(&MESH.sendPacket);
}
/*********************************************************************************************\
* Generic functions
\*********************************************************************************************/
void MESHstartNode(int32_t _channel, uint8_t _role){ //we need a running broker with a known channel at that moment
#ifdef ESP8266 // for now only ESP8266, might be added for the ESP32 later
MESH.channel = _channel;
WiFi.mode(WIFI_STA);
WiFi.begin("", "", MESH.channel, nullptr, false); //fake connection attempt to set channel
wifi_promiscuous_enable(1);
wifi_set_channel(MESH.channel);
wifi_promiscuous_enable(0);
WiFi.disconnect();
MESHsetWifi(0);
esp_now_deinit(); // in case it was already initialized but disconnected
int init_result = esp_now_init();
if (init_result != 0) {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Node init failed with error: %d"), init_result);
// try to re-launch wifi
MESH.role = ROLE_NONE;
MESHsetWifi(1);
WifiBegin(3, MESH.channel);
return;
}
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Node initialized, channel: %u"),wifi_get_channel()); //check if we succesfully set the
Response_P(PSTR("{\"%s\":{\"Node\":1,\"Channel\":%u,\"Role\":%u}}"), D_CMND_MESH, wifi_get_channel(), _role);
XdrvRulesProcess(0);
esp_now_set_self_role(ESP_NOW_ROLE_COMBO);
esp_now_register_send_cb(CB_MESHDataSent);
esp_now_register_recv_cb(CB_MESHDataReceived);
MESHsetKey(MESH.key);
memcpy(MESH.sendPacket.receiver, MESH.broker, 6);
WiFi.macAddress(MESH.sendPacket.sender);
MESHaddPeer(MESH.broker); //must always be peer 0!! -return code -7 for peer list full
MESHcountPeers();
MESH.lastMessageFromBroker = millis(); // Init
MESH.role = (0 == _role) ? ROLE_NODE_SMALL : ROLE_NODE_FULL;
MESHsetSleep();
MESHregisterNode(0);
#endif // ESP8266
}
void MESHstartBroker(void) { // Must be called after WiFi is initialized!! Rule - on system#boot do meshbroker endon
#ifdef ESP32
WiFi.mode(WIFI_AP_STA);
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Broker MAC %s"), WiFi.softAPmacAddress().c_str());
WiFi.softAPmacAddress(MESH.broker); //set MESH.broker to the needed MAC
uint32_t _channel = WiFi.channel();
esp_now_deinit(); // in case it was already initialized by disconnected
esp_err_t init_result = esp_now_init();
if (esp_err_t() != ESP_OK) {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Broker init failed with error: %s"), init_result);
return;
}
Response_P(PSTR("{\"%s\":{\"Broker\":1,\"Channel\":%u}}"), D_CMND_MESH, _channel);
XdrvRulesProcess(0);
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Broker initialized on channel %u"), _channel);
esp_now_register_send_cb(CB_MESHDataSent);
esp_now_register_recv_cb(CB_MESHDataReceived);
MESHsetKey(MESH.key);
MESHcountPeers();
memcpy(MESH.sendPacket.sender, MESH.broker, 6);
MESH.role = ROLE_BROKER;
MESHsetSleep();
#endif // ESP32
}
/*********************************************************************************************\
* Main loops
\*********************************************************************************************/
#ifdef ESP32
void MESHevery50MSecond(void) {
// if (MESH.packetToResend.size() > 0) {
// // pass the packets
// }
if (MESH.packetToConsume.size() > 0) {
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: _ %15_H"), (uint8_t *)&MESH.packetToConsume.front());
for (auto &_headerBytes : MESH.packetsAlreadyReceived) {
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: . %15_H"), (uint8_t *)_headerBytes.raw);
if (memcmp(MESH.packetToConsume.front().sender, _headerBytes.raw, 15) == 0) {
MESH.packetToConsume.pop();
return;
}
}
mesh_first_header_bytes _bytes;
memcpy(_bytes.raw, &MESH.packetToConsume.front(), 15);
MESH.packetsAlreadyReceived.push_back(_bytes);
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: ... %15_H"), (uint8_t *)_bytes.raw);
if (MESH.packetsAlreadyReceived.size() > MESH_MAX_PACKETS) {
MESH.packetsAlreadyReceived.erase(MESH.packetsAlreadyReceived.begin());
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Erase received data"));
}
// do something on the node
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: %30_H), (uint8_t *)&MESH.packetToConsume.front());
#ifdef USE_TASMESH_HEARTBEAT
for (auto &_peer : MESH.peers){
if (memcmp(_peer.MAC, MESH.packetToConsume.front().sender, 6) == 0) {
_peer.lastHeartbeatFromPeer = millis();
if (!_peer.isAlive) {
_peer.isAlive = true;
char stopic[TOPSZ];
GetTopic_P(stopic, TELE, _peer.topic, S_LWT);
MqttPublishPayload(stopic, PSTR(MQTT_LWT_ONLINE));
}
break;
}
}
#endif // USE_TASMESH_HEARTBEAT
MESHencryptPayload(&MESH.packetToConsume.front(), 0);
switch (MESH.packetToConsume.front().type) {
// case PACKET_TYPE_REGISTER_NODE:
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: received topic: %s"), (char*)MESH.packetToConsume.front().payload + 6);
// // AddLog(LOG_LEVEL_INFO, PSTR("MSH: %*_H), MESH.packetToConsume.front().chunkSize+5, (uint8_t *)&MESH.packetToConsume.front().payload);
// for(auto &_peer : MESH.peers){
// if(memcmp(_peer.MAC,MESH.packetToConsume.front().sender,6)==0){
// strcpy(_peer.topic,(char*)MESH.packetToConsume.front().payload+6);
// MESHsubscribe((char*)&_peer.topic);
// }
// }
// break;
case PACKET_TYPE_PEERLIST:
for (uint32_t i = 0; i < MESH.packetToConsume.front().chunkSize; i += 6) {
if (memcmp(MESH.packetToConsume.front().payload +i, MESH.sendPacket.sender, 6) == 0) {
continue; // Do not add myself
}
if (MESHcheckPeerList(MESH.packetToConsume.front().payload +i) == false) {
MESHaddPeer(MESH.packetToConsume.front().payload +i);
}
}
break;
case PACKET_TYPE_MQTT: // Redirected MQTT from node in packet [char* _space_ char*]
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Received node output '%s'"), (char*)MESH.packetToConsume.front().payload);
if (MESH.packetToConsume.front().chunks > 1) {
bool _foundMultiPacket = false;
for (auto &_packet_combined : MESH.multiPackets) {
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Append to multipacket"));
if (memcmp(_packet_combined.header.sender, MESH.packetToConsume.front().sender, 12) == 0) {
if (_packet_combined.header.counter == MESH.packetToConsume.front().counter) {
memcpy(_packet_combined.raw + (MESH.packetToConsume.front().chunk * MESH_PAYLOAD_SIZE), MESH.packetToConsume.front().payload, MESH.packetToConsume.front().chunkSize);
bitSet(_packet_combined.receivedChunks, MESH.packetToConsume.front().chunk);
_foundMultiPacket = true;
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: Multipacket rcvd chunk mask 0x%08X"), _packet_combined.receivedChunks);
}
}
uint32_t _temp = (1 << (uint8_t)MESH.packetToConsume.front().chunks) -1; //example: 1+2+4 == (2^3)-1
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: _temp: %u = %u"),_temp,_packet_combined.receivedChunks);
if (_packet_combined.receivedChunks == _temp) {
char * _data = (char*)_packet_combined.raw + strlen((char*)_packet_combined.raw) + 1;
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Publish multipacket"));
MqttPublishPayload((char*)_packet_combined.raw, _data);
MESH.multiPackets.erase(MESH.multiPackets.begin());
break;
}
}
if (!_foundMultiPacket) {
mesh_packet_combined_t _packet;
memcpy(_packet.header.sender, MESH.packetToConsume.front().sender, sizeof(_packet.header));
memcpy(_packet.raw + (MESH.packetToConsume.front().chunk * MESH_PAYLOAD_SIZE), MESH.packetToConsume.front().payload, MESH.packetToConsume.front().chunkSize);
_packet.receivedChunks = 0;
bitSet(_packet.receivedChunks, MESH.packetToConsume.front().chunk);
MESH.multiPackets.push_back(_packet);
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: New multipacket with chunks %u"), _packet.header.chunks);
}
} else {
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: chunk: %u size: %u"), MESH.packetToConsume.front().chunk, MESH.packetToConsume.front().chunkSize);
// if (MESH.packetToConsume.front().chunk==0) AddLog(LOG_LEVEL_INFO, PSTR("MSH: %*_H), MESH.packetToConsume.front().chunkSize, (uint8_t *)&MESH.packetToConsume.front().payload);
char * _data = (char*)MESH.packetToConsume.front().payload + strlen((char*)MESH.packetToConsume.front().payload) +1;
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Publish packet"));
MqttPublishPayload((char*)MESH.packetToConsume.front().payload, _data);
uint32_t idx = 0;
for (auto &_peer : MESH.peers){
if (memcmp(_peer.MAC, MESH.packetToConsume.front().sender, 6) == 0) {
_peer.lastMessageFromPeer = millis();
MESH.lastTeleMsgs[idx] = std::string(_data);
break;
}
idx++;
}
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: %*_H), MESH.packetToConsume.front().chunkSize, (uint8_t *)&MESH.packetToConsume.front().payload);
}
break;
case PACKET_TYPE_HEARTBEAT:
break;
default:
AddLogBuffer(LOG_LEVEL_DEBUG, (uint8_t *)&MESH.packetToConsume.front(), MESH.packetToConsume.front().chunkSize +5);
break;
}
MESH.packetToConsume.pop();
// AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Consumed one packet %u"), (char*)MESH.packetToConsume.size());
}
}
void MESHEverySecond(void) {
static uint32_t _second = 0;
_second++;
// send a time packet every x seconds
if (MESH.flags.nodeWantsTimeASAP) {
MESHsendTime();
MESH.flags.nodeWantsTimeASAP = 0;
return;
}
if (_second % 5 == 0) {
if ((MESH.flags.nodeWantsTime == 1) || (_second % 30 == 0)) { //every 5 seconds on demand or every 30 seconds anyway
MESHsendTime();
MESH.flags.nodeWantsTime = 0;
return;
}
}
uint32_t _peerNumber = _second%45;
if (_peerNumber < MESH.peers.size()) {
if (MESH.peers[_peerNumber].topic[0] == 0) {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Broker wants topic from peer %u"), _peerNumber);
MESHdemandTopic(_peerNumber);
}
}
if (MESH.multiPackets.size() > 3) {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Multi packets in buffer %u"), MESH.multiPackets.size());
MESH.multiPackets.erase(MESH.multiPackets.begin());
}
#ifdef USE_TASMESH_HEARTBEAT
for (auto &_peer : MESH.peers){
if (_peer.isAlive && TimePassedSince(_peer.lastHeartbeatFromPeer) > TASMESH_OFFLINE_DELAY * 1000) {
_peer.isAlive = false;
char stopic[TOPSZ];
GetTopic_P(stopic, TELE, _peer.topic, S_LWT);
MqttPublishPayload(stopic, PSTR(MQTT_LWT_OFFLINE));
}
}
#endif // USE_TASMESH_HEARTBEAT
}
#else // ESP8266
void MESHevery50MSecond(void) {
if (ROLE_NONE == MESH.role) { return; }
if (MESH.packetToResend.size() > 0) {
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Next packet %d to resend of type %u, TTL %u"),
MESH.packetToResend.size(), MESH.packetToResend.front().type, MESH.packetToResend.front().TTL);
if (MESH.packetToResend.front().TTL > 0) {
MESH.packetToResend.front().TTL--;
if (memcmp(MESH.packetToResend.front().sender, MESH.broker, 6) != 0) { //do not send back the packet to the broker
MESHsendPacket(&MESH.packetToResend.front());
}
} else {
MESH.packetToResend.pop();
}
// pass the packets
}
if (MESH.packetToConsume.size() > 0) {
MESHencryptPayload(&MESH.packetToConsume.front(), 0);
switch (MESH.packetToConsume.front().type) {
case PACKET_TYPE_MQTT:
if (memcmp(MESH.packetToConsume.front().sender, MESH.sendPacket.sender, 6) == 0) {
//discard echo
break;
}
// AddLog(LOG_LEVEL_INFO, PSTR("MSH: node received topic: %s"), (char*)MESH.packetToConsume.front().payload);
MESHreceiveMQTT(&MESH.packetToConsume.front());
break;
case PACKET_TYPE_PEERLIST:
for (uint32_t i = 0; i < MESH.packetToConsume.front().chunkSize; i += 6) {
if (memcmp(MESH.packetToConsume.front().payload +i, MESH.sendPacket.sender, 6) == 0) {
continue; //do not add myself
}
if (MESHcheckPeerList(MESH.packetToConsume.front().payload +i) == false) {
MESHaddPeer(MESH.packetToConsume.front().payload +i);
}
}
break;
default:
break;
}
MESH.packetToConsume.pop();
}
}
void MESHEverySecond(void) {
if (MESH.role > ROLE_BROKER) {
if (MESH.flags.brokerNeedsTopic == 1) {
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Broker wants topic"));
MESHregisterNode(1); //refresh info
MESH.flags.brokerNeedsTopic = 0;
}
if (millis() - MESH.lastMessageFromBroker > 31000) {
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Broker not seen for >30 secs"));
MESHregisterNode(1); //refresh info
}
if (millis() - MESH.lastMessageFromBroker > 70000) {
AddLog(LOG_LEVEL_DEBUG, PSTR("MSH: Broker not seen for 70 secs, try to re-launch wifi"));
MESH.role = ROLE_NONE;
MESHdeInit(); // if we don't deinit after losing connection, we will get an error trying to reinit later
MESHsetWifi(1);
WifiBegin(3, MESH.channel);
}
#ifdef USE_TASMESH_HEARTBEAT
MESH.sendPacket.counter++;
MESH.sendPacket.TTL = 2;
MESH.sendPacket.chunks = 0;
MESH.sendPacket.chunk = 0;
MESH.sendPacket.chunkSize = 0;
MESH.sendPacket.type = PACKET_TYPE_HEARTBEAT;
MESHsendPacket(&MESH.sendPacket);
#endif // USE_TASMESH_HEARTBEAT
}
}
#endif // ESP8266
/*********************************************************************************************\
* Presentation
\*********************************************************************************************/
void MESHshow(bool json) {
if (json) {
if (ROLE_BROKER == MESH.role) {
ResponseAppend_P(PSTR(",\"MESH\":{\"channel\":%u"), MESH.channel);
ResponseAppend_P(PSTR(",\"nodes\":%u"),MESH.peers.size());
if (MESH.peers.size() > 0) {
ResponseAppend_P(PSTR(",\"MAC\":["));
bool comma = false;
for (auto &_peer : MESH.peers) {
char _MAC[18];
ToHex_P(_peer.MAC, 6, _MAC,18, ':');
ResponseAppend_P(PSTR("%s\"%s\""), (comma)?",":"", _MAC);
comma = true;
}
ResponseAppend_P(PSTR("]"));
}
ResponseJsonEnd();
}
} else {
#ifdef ESP32 //web UI only on the the broker = ESP32
#ifdef USE_WEBSERVER
if (ROLE_BROKER == MESH.role) {
// WSContentSend_PD(PSTR("TAS-MESH:
"));
WSContentSend_PD(PSTR("Broker MAC %s
"), WiFi.softAPmacAddress().c_str());
WSContentSend_PD(PSTR("Broker Channel %u
"), WiFi.channel());
uint32_t idx = 0;
for (auto &_peer : MESH.peers) {
char _MAC[18];
ToHex_P(_peer.MAC, 6, _MAC, 18, ':');
WSContentSend_PD(PSTR("Node MAC %s
"), _MAC);
WSContentSend_PD(PSTR("Node last message %u ms
"), millis() - _peer.lastMessageFromPeer);
WSContentSend_PD(PSTR("Node MQTT topic %s"), _peer.topic);
/*
WSContentSend_PD(PSTR("Node MQTT topic: %s
"), _peer.topic);
if (MESH.lastTeleMsgs.size() > idx) {
char json_buffer[MESH.lastTeleMsgs[idx].length() +1];
strcpy(json_buffer, (char*)MESH.lastTeleMsgs[idx].c_str());
JsonParser parser(json_buffer);
JsonParserObject root = parser.getRootObject();
for (auto key : root) {
JsonParserObject subObj = key.getValue().getObject();
if (subObj) {
WSContentSend_PD(PSTR("%s:"), key.getStr());
for (auto subkey : subObj) {
WSContentSend_PD(PSTR(""), subkey.getStr(), subkey.getValue().getStr());
}
WSContentSend_PD(PSTR("
"));
} else {
WSContentSend_PD(PSTR(""), key.getStr(), key.getValue().getStr());
}
}
// AddLog(LOG_LEVEL_INFO,PSTR("MSH: teleJSON %s"), (char*)MESH.lastTeleMsgs[idx].c_str());
// AddLog(LOG_LEVEL_INFO,PSTR("MSH: stringsize: %u"),MESH.lastTeleMsgs[idx].length());
} else {
// AddLog(LOG_LEVEL_INFO,PSTR("MSH: telemsgSize: %u"),MESH.lastTeleMsgs.size());
}
*/
WSContentSend_PD(PSTR("
"));
idx++;
}
}
#endif // USE_WEBSERVER
#endif // ESP32
}
}
/*********************************************************************************************\
* Commands
\*********************************************************************************************/
const char kMeshCommands[] PROGMEM = "Mesh|" // Prefix
"Broker|Node|Peer|Channel|Interval";
void (* const MeshCommand[])(void) PROGMEM = {
&CmndMeshBroker, &CmndMeshNode, &CmndMeshPeer, &CmndMeshChannel, &CmndMeshInterval };
void CmndMeshBroker(void) {
#ifdef ESP32 // only ESP32 currently supported as broker
MESH.channel = WiFi.channel(); // The Broker gets the channel from the router, no need to declare it with MESHCHANNEL (will be mandatory set it when ETH will be implemented)
MESHstartBroker();
ResponseCmndNumber(MESH.channel);
#endif // ESP32
}
void CmndMeshNode(void) {
#ifndef ESP32 // only ESP8266 current supported as node
if (XdrvMailbox.data_len > 0) {
MESHHexStringToBytes(XdrvMailbox.data, MESH.broker);
if (XdrvMailbox.index != 0) { XdrvMailbox.index = 1; } // Everything not 0 is a full node
// meshnode FA:KE:AD:DR:ES:S1
bool broker = false;
char EspSsid[11];
String mac_address = XdrvMailbox.data;
snprintf_P(EspSsid, sizeof(EspSsid), PSTR("ESP_%s"), mac_address.substring(6).c_str());
int32_t getWiFiChannel(const char *EspSsid);
if (int32_t ch = WiFi.scanNetworks()) {
for (uint8_t i = 0; i < ch; i++) {
if (!strcmp(EspSsid, WiFi.SSID(i).c_str())) {
MESH.channel = WiFi.channel(i);
broker = true;
AddLog(LOG_LEVEL_INFO, PSTR("MSH: Successfully connected to Mesh Broker using MAC %s as %s on channel %d"),
XdrvMailbox.data, EspSsid, MESH.channel);
MESHstartNode(MESH.channel, XdrvMailbox.index);
ResponseCmndNumber(MESH.channel);
}
}
}
if (!broker) {
AddLog(LOG_LEVEL_INFO, PSTR("MSH: No Mesh Broker found using MAC %s"), XdrvMailbox.data);
}
}
#endif // ESP32
}
void CmndMeshPeer(void) {
if (XdrvMailbox.data_len > 0) {
uint8_t _MAC[6];
MESHHexStringToBytes(XdrvMailbox.data, _MAC);
char _peerMAC[18];
ToHex_P(_MAC, 6, _peerMAC, 18, ':');
AddLog(LOG_LEVEL_DEBUG,PSTR("MSH: MAC-string %s (%s)"), XdrvMailbox.data, _peerMAC);
if (MESHcheckPeerList((const uint8_t *)_MAC) == false) {
MESHaddPeer(_MAC);
MESHcountPeers();
ResponseCmndChar(_peerMAC);
} else if (WiFi.macAddress() == String(_peerMAC) || WiFi.softAPmacAddress() == String(_peerMAC)){
// a device can be added as its own peer, but every send will result in a ESP_NOW_SEND_FAIL
AddLog(LOG_LEVEL_DEBUG,PSTR("MSH: device %s cannot be a peer of itself"), XdrvMailbox.data, _peerMAC);
} else {
AddLog(LOG_LEVEL_DEBUG,PSTR("MSH: %s is already on peer list, will not add"), XdrvMailbox.data, _peerMAC);
}
}
}
void CmndMeshChannel(void) {
if ((XdrvMailbox.payload > 0) && (XdrvMailbox.payload < 14)) {
MESH.channel = XdrvMailbox.payload;
}
ResponseCmndNumber(MESH.channel);
}
void CmndMeshInterval(void) {
if ((XdrvMailbox.payload > 1) && (XdrvMailbox.payload < 201)) {
MESH.interval = XdrvMailbox.payload; // 2 to 200 ms
MESHsetSleep();
}
ResponseCmndNumber(MESH.interval);
}
/*********************************************************************************************\
* Interface
\*********************************************************************************************/
bool Xdrv57(uint32_t function) {
bool result = false;
switch (function) {
case FUNC_COMMAND:
result = DecodeCommand(kMeshCommands, MeshCommand);
break;
case FUNC_PRE_INIT:
MESHInit(); // TODO: save state
break;
}
if (MESH.role) {
switch (function) {
case FUNC_LOOP:
static uint32_t mesh_transceive_msecond = 0; // State 50msecond timer
if (TimeReached(mesh_transceive_msecond)) {
SetNextTimeInterval(mesh_transceive_msecond, MESH.interval);
MESHevery50MSecond();
}
break;
case FUNC_EVERY_SECOND:
MESHEverySecond();
break;
#ifdef USE_WEBSERVER
case FUNC_WEB_SENSOR:
MESHshow(0);
break;
#endif
case FUNC_JSON_APPEND:
MESHshow(1);
break;
#ifdef ESP32
case FUNC_MQTT_SUBSCRIBE:
MESHconnectMQTT();
break;
#endif // ESP32
case FUNC_SHOW_SENSOR:
MESHsendPeerList(); // Sync this to the Teleperiod with a delay
break;
#ifdef USE_DEEPSLEEP
case FUNC_SAVE_BEFORE_RESTART:
MESHdeInit();
break;
#endif // USE_DEEPSLEEP
case FUNC_ACTIVE:
result = true;
break;
}
}
return result;
}
#endif // USE_TASMESH