/*
  support_device_groups.ino - device groups support for Tasmota

  Copyright (C) 2020  Paul C Diem

  Device groups allow multiple devices to be in a group with power, light
  brightness, fade and speed settings and other module-specific settings
  kept in sync across all devices in the group.

  This program is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifdef USE_DEVICE_GROUPS

//#define DEVICE_GROUPS_DEBUG

extern bool udp_connected;

// One remote member of a device group. Members are kept in a singly linked list (flink) hanging
// off the owning device_group.
struct device_group_member {
  struct device_group_member * flink;       // Next member in the list, nullptr at the end
  IPAddress ip_address;                     // Address the member's packets were received from
  uint32_t timeout_time;                    // millis() value after which an unresponsive member is dropped, 0 = not timing
  uint16_t received_sequence;               // Sequence of the last message received from this member
  uint16_t acked_sequence;                  // Sequence of the last message this member acknowledged
};

// Per-group state. message holds the text header ("<prefix><group name> HTTP/1.1\n\n") followed by
// the binary body: 16-bit sequence, 16-bit flags, then item/value pairs terminated by a 0 item.
struct device_group {
  uint32_t next_ack_check_time;
  uint16_t last_full_status_sequence;
  uint16_t message_length;                  // Total length of message (header + body), 0 = no message pending
  uint8_t message_header_length;            // Length of the text header at the start of message
  uint8_t initial_status_requests_remaining;
  bool local;
  char group_name[TOPSZ];
  char message[128];
  uint8_t group_member_count;
  struct device_group_member * device_group_members;
};

struct device_group * device_groups;
uint16_t outgoing_sequence = 0;
bool device_groups_initialized = false;
bool device_groups_initialization_failed = false;
bool building_status_message = false;
bool processing_remote_device_message = false;
bool waiting_for_acks;
bool udp_was_connected = false;

// Allocate the device group array and build the fixed message header for every group.
//
// On success device_groups_initialized is set. On any failure (allocation failure or a group name
// that does not fit the name/message buffers) device_groups_initialization_failed is set,
// device_groups is left as nullptr and device_groups_initialized stays false.
void DeviceGroupsInit(void)
{
  // Minimum room needed after the header: 16-bit sequence, 16-bit flags and the EOL item code.
  const int min_body_length = 5;

  // Initialize the device information for each device group. The group name is the MQTT group topic.
  device_groups = (struct device_group *)calloc(device_group_count, sizeof(struct device_group));
  if (device_groups == nullptr) {
    AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: error allocating %u-element device group array"), device_group_count);
    device_groups_initialization_failed = true;
    return;
  }

  for (uint32_t device_group_index = 0; device_group_index < device_group_count; device_group_index++) {
    struct device_group * device_group = &device_groups[device_group_index];

    // Both copies are bounded: an over-long group topic must not overrun group_name or message.
    int name_length = snprintf_P(device_group->group_name, sizeof(device_group->group_name), PSTR("%s"), SettingsText((device_group_index == 0 ? SET_MQTT_GRP_TOPIC : SET_MQTT_GRP_TOPIC2 + device_group_index - 1)));
    int header_length = snprintf_P(device_group->message, sizeof(device_group->message), PSTR("%s%s HTTP/1.1\n\n"), kDeviceGroupMessage, device_group->group_name);
    if (name_length < 0 || name_length >= (int)sizeof(device_group->group_name) ||
        header_length < 0 || header_length + min_body_length > (int)sizeof(device_group->message)) {
      AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: device group %u name is too long"), device_group_index);
      free(device_groups);
      device_groups = nullptr;
      device_groups_initialization_failed = true;
      return;
    }
    device_group->message_header_length = header_length;
    device_group->last_full_status_sequence = -1;
  }

  // Only touch the first element if the array actually has one.
  if (device_group_count) device_groups[0].local = true;

  // If both in and out shared items masks are 0, assume they're uninitialized and initialize them.
  if (!Settings.device_group_share_in && !Settings.device_group_share_out) {
    Settings.device_group_share_in = Settings.device_group_share_out = 0xffffffff;
  }

  device_groups_initialized = true;
}

// Format an IPv4 address as dotted decimal.
//
// Returns a pointer to a static buffer, so the result is overwritten by the next call and must be
// consumed (or copied) before calling this function again.
char * IPAddressToString(const IPAddress& ip_address)
{
  static char buffer[16];   // "255.255.255.255" + NUL
  snprintf_P(buffer, sizeof(buffer), PSTR("%u.%u.%u.%u"), ip_address[0], ip_address[1], ip_address[2], ip_address[3]);
  return buffer;
}

// Return true if we're configured to share the specified item.
//
// incoming selects which settings mask is consulted: Settings.device_group_share_in when true,
// Settings.device_group_share_out when false. Items that have no share mask are always shared.
bool DeviceGroupItemShared(bool incoming, uint8_t item)
{
  // 32 bits wide to match the share settings; a narrower type would silently drop high mask bits.
  uint32_t mask = 0;
  switch (item) {
    case DGR_ITEM_LIGHT_BRI:
      mask = DGR_SHARE_LIGHT_BRI;
      break;
    case DGR_ITEM_POWER:
      mask = DGR_SHARE_POWER;
      break;
    case DGR_ITEM_LIGHT_SCHEME:
      mask = DGR_SHARE_LIGHT_SCHEME;
      break;
    case DGR_ITEM_LIGHT_FIXED_COLOR:
    case DGR_ITEM_LIGHT_CHANNELS:
      mask = DGR_SHARE_LIGHT_COLOR;
      break;
    case DGR_ITEM_LIGHT_FADE:
    case DGR_ITEM_LIGHT_SPEED:
      mask = DGR_SHARE_LIGHT_FADE;
      break;
    case DGR_ITEM_DIMMER_RANGE:
      mask = DGR_SHARE_DIMMER_RANGE;
      break;
  }
  return (!mask || ((incoming ? Settings.device_group_share_in : Settings.device_group_share_out) & mask));
}

// Send a packet of len bytes to UDP port 1900 at ip, retrying up to 5 times with a 10 ms pause
// after each failed attempt. label is only used in the error log entry written when every attempt
// fails.
void SendDeviceGroupPacket(IPAddress ip, char * packet, int len, const char * label)
{
  for (int attempt = 1; attempt <= 5; attempt++) {
    if (PortUdp.beginPacket(ip, 1900)) {
      PortUdp.write(packet, len);
      if (PortUdp.endPacket()) return;
    }
    delay(10);
  }
  AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: error sending %s packet"), label);
}

// Build and (unless more items are still to be added) multicast a device group message.
//
// device_group_index  Index of the group in device_groups.
// message_type        DGR_MSGTYP_FULL_STATUS builds a full status message by calling the drivers;
//                     any other type appends the items passed in the variable argument list.
// ...                 Item code / value pairs terminated by a 0 item code. Items up to
//                     DGR_ITEM_MAX_32BIT take an int value, items up to DGR_ITEM_MAX_STRING take a
//                     char * value and DGR_ITEM_LIGHT_CHANNELS takes a pointer to 5 bytes.
//
// The request is silently ignored when device groups are disabled or not initialized, UDP is not
// connected, a remote message is being processed, or initial status requests are still being sent.
void _SendDeviceGroupMessage(uint8_t device_group_index, DeviceGroupMessageType message_type, ...)
{
  // If device groups are not enabled, ignore this request.
  if (!Settings.flag4.device_groups_enabled) return;

  // If UDP is not set up, ignore this request.
  if (!udp_connected) return;

  // If the device group array is not set up (not yet initialized or initialization failed) or the
  // index is outside of it, ignore this request rather than dereferencing an invalid pointer.
  if (!device_groups_initialized || device_groups == nullptr || device_group_index >= device_group_count) return;

  // If we're currently processing a remote device message, ignore this request.
  if (processing_remote_device_message) return;

  // Get a pointer to the device information for this device.
  device_group * device_group = &device_groups[device_group_index];

  // If we're still sending initial status requests, ignore this request.
  if (device_group->initial_status_requests_remaining) return;

  // A full status request is a request from a remote device for the status of every item we
  // control. As long as we're building it, we may as well multicast the status update to all
  // device group members.
  char * message_ptr = &device_group->message[device_group->message_header_length];
  if (message_type == DGR_MSGTYP_FULL_STATUS) {

    // Set the flag indicating we're currently building a status message. SendDeviceGroupMessage
    // will build but not send messages while this flag is set.
    building_status_message = true;

    // Call the drivers to build the status update. Sequence 0 is never used, so skip it on wrap.
    if (!++outgoing_sequence) outgoing_sequence = 1;
#ifdef DEVICE_GROUPS_DEBUG
    AddLog_P2(LOG_LEVEL_DEBUG, PSTR("Building device group %s full status packet"), device_group->group_name);
#endif  // DEVICE_GROUPS_DEBUG
    device_group->message_length = 0;
    SendDeviceGroupMessage(device_group_index, DGR_MSGTYP_PARTIAL_UPDATE, DGR_ITEM_POWER, power);
    XdrvMailbox.command_code = DGR_ITEM_STATUS;
    XdrvCall(FUNC_DEVICE_GROUP_REQUEST);
    building_status_message = false;

    // If we have nothing to share with the other members, restore the message sequence and return.
    if (!device_group->message_length) {
      if (!--outgoing_sequence) outgoing_sequence = -1;
      return;
    }
    device_group->last_full_status_sequence = outgoing_sequence;

    // Set the status update flag in the outgoing message.
    *(message_ptr + 2) |= DGR_FLAG_FULL_STATUS;
  }

  else {
    bool shared;
    uint8_t item;
    uint32_t value;
    uint8_t * value_ptr;
    va_list ap;

    // One past the last byte usable for items; the final byte is reserved for the EOL item code.
    char * const message_end = device_group->message + sizeof(device_group->message) - 1;

#ifdef DEVICE_GROUPS_DEBUG
    AddLog_P2(LOG_LEVEL_DEBUG, PSTR("Building device group %s packet"), device_group->group_name);
#endif  // DEVICE_GROUPS_DEBUG

    // Partial updates and updates that are part of a status message reuse the current sequence.
    uint16_t original_sequence = outgoing_sequence;
    if (!building_status_message && message_type != DGR_MSGTYP_PARTIAL_UPDATE && !++outgoing_sequence) outgoing_sequence = 1;
    *message_ptr++ = outgoing_sequence & 0xff;
    *message_ptr++ = outgoing_sequence >> 8;

    value = 0;
    if (message_type == DGR_MSGTYP_UPDATE_MORE_TO_COME)
      value |= DGR_FLAG_MORE_TO_COME;
    else if (message_type == DGR_MSGTYP_UPDATE_DIRECT)
      value |= DGR_FLAG_DIRECT;
#ifdef DEVICE_GROUPS_DEBUG
    AddLog_P2(LOG_LEVEL_DEBUG, PSTR(">sequence=%u, flags=%u"), outgoing_sequence, value);
#endif  // DEVICE_GROUPS_DEBUG
    *message_ptr++ = value & 0xff;
    *message_ptr++ = value >> 8;
    char * first_item_ptr = message_ptr;

    // If we're still building this update or all group members haven't acknowledged the previous
    // update yet, update the message to include these new updates. First we need to rebuild the
    // previous update message to remove any items and their values that are included in this new
    // update.
    if (device_group->message_length) {
      uint8_t item_array[32];
      int item_index = 0;
      int kept_item_count = 0;

      // Build an array of all the items in this new update. The array is zero terminated, so at
      // most sizeof(item_array) - 1 items are recorded; any further items are still consumed from
      // the argument list but are not recorded.
      va_start(ap, message_type);
      while ((item = va_arg(ap, int))) {
        if (item_index < (int)sizeof(item_array) - 1) item_array[item_index++] = item;
        if (item <= DGR_ITEM_MAX_32BIT)
          va_arg(ap, int);
        else if (item <= DGR_ITEM_MAX_STRING)
          va_arg(ap, char *);
        else {
          switch (item) {
            case DGR_ITEM_LIGHT_CHANNELS:
              va_arg(ap, uint8_t *);
              break;
          }
        }
      }
      va_end(ap);
      item_array[item_index] = 0;

      // Rebuild the previous update message, removing any items and their values that are
      // included in this new update. Kept items are compacted towards the start of the body, so
      // the write pointer never passes the read pointer.
      char * previous_message_ptr = message_ptr;
      while ((item = *previous_message_ptr++)) {

        // Search for this item in the new update.
        for (item_index = 0; item_array[item_index]; item_index++) {
          if (item_array[item_index] == item) break;
        }

        // If this item was found in the new update skip over it and its value.
        if (item_array[item_index]) {
          if (item <= DGR_ITEM_MAX_32BIT) {
            previous_message_ptr++;
            if (item > DGR_ITEM_MAX_8BIT) {
              previous_message_ptr++;
              if (item > DGR_ITEM_MAX_16BIT) {
                previous_message_ptr++;
                previous_message_ptr++;
              }
            }
          }
          else if (item <= DGR_ITEM_MAX_STRING) {
            // Read the length byte first, then skip the string. Doing both in one expression
            // modifies the pointer twice without sequencing.
            uint8_t string_length = (uint8_t)*previous_message_ptr++;
            previous_message_ptr += string_length;
          }
          else {
            switch (item) {
              case DGR_ITEM_LIGHT_CHANNELS:
                previous_message_ptr += 5;
                break;
            }
          }
        }

        // If this item was not found in the new update, copy it to the new update message.
        else {
          *message_ptr++ = item;
          if (item <= DGR_ITEM_MAX_32BIT) {
            *message_ptr++ = *previous_message_ptr++;
            if (item > DGR_ITEM_MAX_8BIT) {
              *message_ptr++ = *previous_message_ptr++;
              if (item > DGR_ITEM_MAX_16BIT) {
                *message_ptr++ = *previous_message_ptr++;
                *message_ptr++ = *previous_message_ptr++;
              }
            }
          }
          else if (item <= DGR_ITEM_MAX_STRING) {
            // The length byte is unsigned; read it as such regardless of the signedness of char.
            value = (uint8_t)*previous_message_ptr++;
            *message_ptr++ = value;
            memmove(message_ptr, previous_message_ptr, value);
            previous_message_ptr += value;
            message_ptr += value;
          }
          else {
            switch (item) {
              case DGR_ITEM_LIGHT_CHANNELS:
                memmove(message_ptr, previous_message_ptr, 5);
                previous_message_ptr += 5;
                message_ptr += 5;
                break;
            }
          }
          kept_item_count++;
        }
      }
#ifdef DEVICE_GROUPS_DEBUG
      AddLog_P2(LOG_LEVEL_DEBUG, PSTR("%u items carried over from previous update"), kept_item_count);
#endif  // DEVICE_GROUPS_DEBUG
    }

    // Iterate through the passed items adding them and their values to the message. Every value
    // is consumed from the argument list even when the item is not shared or does not fit, so the
    // list stays in step. An item that does not fit in the message buffer is logged and dropped.
    va_start(ap, message_type);
    while ((item = va_arg(ap, int))) {
      shared = DeviceGroupItemShared(false, item);
      if (item <= DGR_ITEM_MAX_32BIT) {
        value = va_arg(ap, int);
        int value_size = (item <= DGR_ITEM_MAX_8BIT ? 1 : (item <= DGR_ITEM_MAX_16BIT ? 2 : 4));
        if (shared && message_end - message_ptr < 1 + value_size) {
          AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: message buffer full, item %u dropped"), item);
          shared = false;
        }
        if (shared) {
#ifdef DEVICE_GROUPS_DEBUG
          AddLog_P2(LOG_LEVEL_DEBUG, PSTR(">item=%u, value=%u"), item, value);
#endif  // DEVICE_GROUPS_DEBUG
          *message_ptr++ = item;

          // Values are written least significant byte first.
          for (int byte_index = 0; byte_index < value_size; byte_index++) {
            *message_ptr++ = value & 0xff;
            value >>= 8;
          }
        }
      }
      else if (item <= DGR_ITEM_MAX_STRING) {
        value_ptr = va_arg(ap, uint8_t *);
        if (shared) {
          value = strlen((const char *)value_ptr);

          // Item code + length byte + string. The length must also fit in the single length byte.
          if (value > 255 || message_end - message_ptr < (int)(2 + value)) {
            AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: message buffer full, item %u dropped"), item);
            shared = false;
          }
        }
        if (shared) {
#ifdef DEVICE_GROUPS_DEBUG
          AddLog_P2(LOG_LEVEL_DEBUG, PSTR(">item=%u, value=%s"), item, value_ptr);
#endif  // DEVICE_GROUPS_DEBUG
          *message_ptr++ = item;
          *message_ptr++ = value;
          memcpy(message_ptr, value_ptr, value);
          message_ptr += value;
        }
      }
      else {
        switch (item) {
          case DGR_ITEM_LIGHT_CHANNELS:
            value_ptr = va_arg(ap, uint8_t *);
            if (shared && message_end - message_ptr < 1 + 5) {
              AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: message buffer full, item %u dropped"), item);
              shared = false;
            }
            if (shared) {
#ifdef DEVICE_GROUPS_DEBUG
              AddLog_P2(LOG_LEVEL_DEBUG, PSTR(">item=%u, value=%u,%u,%u,%u,%u"), item, *value_ptr, *(value_ptr + 1), *(value_ptr + 2), *(value_ptr + 3), *(value_ptr + 4));
#endif  // DEVICE_GROUPS_DEBUG
              *message_ptr++ = item;
              memmove(message_ptr, value_ptr, 5);
              message_ptr += 5;
            }
            break;
          default:
            // Unknown special item: only its item code is written (when shared and there is
            // room); its value, if any, is not consumed from the argument list.
            if (shared && message_end - message_ptr >= 1) *message_ptr++ = item;
            break;
        }
      }
    }
    va_end(ap);

    // If there weren't any items added to the message, restore the outgoing message sequence and
    // return.
    if (message_ptr == first_item_ptr) {
      outgoing_sequence = original_sequence;
      return;
    }

    // Add the EOL item code and calculate the message length.
    *message_ptr++ = 0;
    device_group->message_length = message_ptr - device_group->message;

    // If there's going to be more items added to this message, return.
    if (building_status_message || message_type == DGR_MSGTYP_PARTIAL_UPDATE) return;
  }

  // Multicast the packet.
#ifdef DEVICE_GROUPS_DEBUG
  AddLog_P2(LOG_LEVEL_DEBUG, PSTR("DGR: sending %u-byte device group %s packet via multicast, sequence=%u"), device_group->message_length, device_group->group_name, device_group->message[device_group->message_header_length] | device_group->message[device_group->message_header_length + 1] << 8);
#endif  // DEVICE_GROUPS_DEBUG
  SendDeviceGroupPacket(IPAddress(239,255,255,250), device_group->message, device_group->message_length, PSTR("Multicast"));
  device_group->next_ack_check_time = millis() + 100;

  // When more updates are about to follow, treat this one as already acknowledged by every
  // member; otherwise flag that acks are outstanding.
  if (message_type == DGR_MSGTYP_UPDATE_MORE_TO_COME) {
    for (struct device_group_member * device_group_member = device_group->device_group_members; device_group_member != nullptr; device_group_member = device_group_member->flink) {
      device_group_member->acked_sequence = outgoing_sequence;
    }
  }
  else {
    waiting_for_acks = true;
  }
}

void ProcessDeviceGroupMessage(char * packet, int packet_length)
{
  // Make the group name a null-terminated string.
  char * message_group_name = packet + sizeof(DEVICE_GROUP_MESSAGE) - 1;
  char * message_ptr = strchr(message_group_name, ' ');
  if (message_ptr == nullptr) return;
  *message_ptr = 0;

  // Search for a device group with the target group name. If one isn't found, return.
struct device_group * device_group; uint32_t device_group_index = 0; for (;;) { device_group = &device_groups[device_group_index]; if (!strcmp(message_group_name, device_group->group_name)) break; if (++device_group_index >= device_group_count) return; } *message_ptr++ = ' '; // Find the group member. If this is a new group member, add it. IPAddress remote_ip = PortUdp.remoteIP(); struct device_group_member * * flink = &device_group->device_group_members; struct device_group_member * device_group_member; for (;;) { device_group_member = *flink; if (!device_group_member) { device_group_member = (struct device_group_member *)calloc(1, sizeof(struct device_group_member)); if (device_group_member == nullptr) { AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: error allocating device group member block")); return; } #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("DGR: adding member %s (%p)"), IPAddressToString(remote_ip), device_group_member); #endif // DEVICE_GROUPS_DEBUG device_group_member->ip_address = remote_ip; *flink = device_group_member; break; } else if (device_group_member->ip_address == remote_ip) { break; } flink = &device_group_member->flink; } // Find the start of the actual message (after the http header). message_ptr = strstr_P(message_ptr, PSTR("\n\n")); if (message_ptr == nullptr) return; message_ptr += 2; bool light_fade; uint16_t flags; uint16_t item; uint16_t message_sequence; int32_t value; // Get the message sequence and flags. 
if (packet_length - (message_ptr - packet) < 4) goto badmsg; // Malformed message - must be at least 16-bit sequence, 16-bit flags left message_sequence = *message_ptr++; message_sequence |= *message_ptr++ << 8; flags = *message_ptr++; flags |= *message_ptr++ << 8; #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("Received device group %s packet from %s: sequence=%u, flags=%u"), device_group->group_name, IPAddressToString(remote_ip), message_sequence, flags); #endif // DEVICE_GROUPS_DEBUG // If this is an ack message, save the message sequence if it's newwer than the last ack we // received from this member. if (flags == DGR_FLAG_ACK) { if (message_sequence > device_group_member->acked_sequence || device_group_member->acked_sequence - message_sequence < 64536) { device_group_member->acked_sequence = message_sequence; } device_group_member->timeout_time = 0; #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("acked_sequence != device_group->last_full_status_sequence) { _SendDeviceGroupMessage(device_group_index, DGR_MSGTYP_FULL_STATUS); } #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("received_sequence && device_group_member->received_sequence - message_sequence > 64536) { #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("received_sequence = message_sequence; // Set the flag indicating we're currently processing a remote device message. // SendDeviceGroupMessage will not send messages while this flag is set. 
processing_remote_device_message = true; /* XdrvMailbox bool grpflg bool usridx uint16_t command_code Item code uint32_t index 0:15 Flags, 16:23 Device group index uint32_t data_len String item value length int32_t payload Integer item value char *topic char *data Pointer to non-integer item value char *command nullptr */ XdrvMailbox.command = nullptr; // Indicates the source is a device group update XdrvMailbox.index = flags | device_group_index << 16; if (flags & (DGR_FLAG_MORE_TO_COME | DGR_FLAG_DIRECT)) skip_light_fade = true; for (;;) { if (packet_length - (message_ptr - packet) < 1) goto badmsg; // Malformed message item = *message_ptr++; if (!item) break; // Done #ifdef DEVICE_GROUPS_DEBUG switch (item) { case DGR_ITEM_LIGHT_FADE: case DGR_ITEM_LIGHT_SPEED: case DGR_ITEM_LIGHT_BRI: case DGR_ITEM_LIGHT_SCHEME: case DGR_ITEM_LIGHT_FIXED_COLOR: case DGR_ITEM_BRI_PRESET_LOW: case DGR_ITEM_BRI_PRESET_HIGH: case DGR_ITEM_BRI_POWER_ON: case DGR_ITEM_POWER: case DGR_ITEM_LIGHT_CHANNELS: case DGR_ITEM_DIMMER_RANGE: break; default: AddLog_P2(LOG_LEVEL_ERROR, PSTR("DGR: ********** invalid item=%u received from device group %s member %s"), item, device_group->group_name, IPAddressToString(remote_ip)); } #endif // DEVICE_GROUPS_DEBUG XdrvMailbox.command_code = item; if (item <= DGR_ITEM_LAST_32BIT) { value = *message_ptr++; if (item > DGR_ITEM_MAX_8BIT) { value |= *message_ptr++ << 8; if (item > DGR_ITEM_MAX_16BIT) { value |= *message_ptr++ << 16; value |= *message_ptr++ << 24; } } #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("= packet_length - (message_ptr - packet)) goto badmsg; // Malformed message #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("message[device_group->message_header_length]; if (!++outgoing_sequence) outgoing_sequence = 1; *message_ptr++ = outgoing_sequence & 0xff; *message_ptr++ = outgoing_sequence >> 8; *message_ptr++ = DGR_FLAG_RESET | DGR_FLAG_STATUS_REQUEST; *message_ptr++ = 0; device_group->message_length = message_ptr - 
device_group->message; device_group->initial_status_requests_remaining = 5; device_group->next_ack_check_time = millis() + 1000; } waiting_for_acks = true; } if (device_groups_initialization_failed) return; if (waiting_for_acks) { uint32_t now = millis(); waiting_for_acks = false; for (uint32_t device_group_index = 0; device_group_index < device_group_count; device_group_index++) { device_group * device_group = &device_groups[device_group_index]; if (device_group->next_ack_check_time) { if (device_group->next_ack_check_time <= now) { if (device_group->initial_status_requests_remaining) { #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("DGR: sending initial status request for group %s"), device_group->group_name); #endif // DEVICE_GROUPS_DEBUG if (--device_group->initial_status_requests_remaining) { SendDeviceGroupPacket(IPAddress(239,255,255,250), device_group->message, device_group->message_length, PSTR("Initial")); device_group->message[device_group->message_header_length + 2] = DGR_FLAG_STATUS_REQUEST; // The reset flag is on only for the first packet - turn it off now device_group->next_ack_check_time = now + 200; waiting_for_acks = true; } else { device_group->next_ack_check_time = 0; _SendDeviceGroupMessage(device_group_index, DGR_MSGTYP_FULL_STATUS); } } else { bool acked = true; struct device_group_member ** flink = &device_group->device_group_members; struct device_group_member * device_group_member; while ((device_group_member = *flink)) { if (device_group_member->acked_sequence != outgoing_sequence) { if (device_group_member->timeout_time && device_group_member->timeout_time < now) { #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("DGR: removing member %s (%p)"), IPAddressToString(device_group_member->ip_address), device_group_member); #endif // DEVICE_GROUPS_DEBUG *flink = device_group_member->flink; free(device_group_member); } else { #ifdef DEVICE_GROUPS_DEBUG AddLog_P2(LOG_LEVEL_DEBUG, PSTR("DGR: sending %u-byte device group %s 
packet via unicast to %s, sequence %u, last message acked=%u"), device_group->message_length, device_group->group_name, IPAddressToString(device_group_member->ip_address), outgoing_sequence, device_group_member->acked_sequence); #endif // DEVICE_GROUPS_DEBUG SendDeviceGroupPacket(device_group_member->ip_address, device_group->message, device_group->message_length, PSTR("Unicast")); if (!device_group_member->timeout_time) device_group_member->timeout_time = now + 15000; acked = false; flink = &device_group_member->flink; } } else { flink = &device_group_member->flink; } } if (acked) { device_group->next_ack_check_time = 0; device_group->message_length = 0; } else { device_group->next_ack_check_time = now + 500; waiting_for_acks = true; } } } else { waiting_for_acks = true; } } } } } else { udp_was_connected = false; } } #endif // USE_DEVICE_GROUPS