#!/usr/bin/python """Attempt an automatic analysis of IRremoteESP8266's Raw data output. Makes suggestions on key values and tried to break down the message into likely chunks.""" # # Copyright 2018 David Conran import argparse import sys class RawIRMessage(): """Basic analyse functions & structure for raw IR messages.""" # pylint: disable=too-many-instance-attributes def __init__(self, margin, timings, output=sys.stdout, verbose=True): self.ldr_mark = None self.hdr_mark = None self.hdr_space = None self.bit_mark = None self.zero_space = None self.one_space = None self.gaps = [] self.margin = margin self.marks = [] self.mark_buckets = {} self.spaces = [] self.space_buckets = {} self.output = output self.verbose = verbose self.section_count = 1 self.rawlen = len(timings) if self.rawlen <= 3: raise ValueError("Too few message timings supplied.") self.timings = timings self._generate_timing_candidates() self._calc_values() def _generate_timing_candidates(self): """Determine the likely values from the given data.""" count = 0 for usecs in self.timings: count = count + 1 if count % 2: self.marks.append(usecs) else: self.spaces.append(usecs) self.marks, self.mark_buckets = self.reduce_list(self.marks) self.spaces, self.space_buckets = self.reduce_list(self.spaces) def reduce_list(self, items): """Reduce a list of numbers into buckets that are at least margin apart.""" result = [] last = -1 buckets = {} for item in sorted(items, reverse=True): if last == -1 or item < last - self.margin: result.append(item) last = item buckets[last] = [item] else: buckets[last].append(item) return result, buckets def _usec_compare(self, seen, expected): """Compare two usec values and see if they match within a subtractive margin.""" return expected - self.margin < seen <= expected def _usec_compares(self, usecs, expecteds): """Compare a usec value to a list of values and return True if they are within a subtractive margin.""" for expected in expecteds: if self._usec_compare(usecs, expected): return True return False def display_binary(self, binary_str): """Display common representations of the suppied binary string.""" num = int(binary_str, 2) bits = len(binary_str) rev_binary_str = binary_str[::-1] rev_num = int(rev_binary_str, 2) self.output.write("\n Bits: %d\n" " Hex: %s (MSB first)\n" " %s (LSB first)\n" " Dec: %s (MSB first)\n" " %s (LSB first)\n" " Bin: 0b%s (MSB first)\n" " 0b%s (LSB first)\n" % (bits, ("0x{0:0%dX}" % (bits / 4)).format(num), ("0x{0:0%dX}" % (bits / 4)).format(rev_num), num, rev_num, binary_str, rev_binary_str)) def add_data_code(self, bin_str, name="", footer=True): """Add the common "data" sequence of code to send the bulk of a message.""" # pylint: disable=no-self-use code = [] nbits = len(bin_str) code.append(" // Data Section #%d" % self.section_count) code.append(" // e.g. data = 0x%X, nbits = %d" % (int(bin_str, 2), nbits)) code.append(" sendData(k%sBitMark, k%sOneSpace, k%sBitMark, " "k%sZeroSpace, send_data, %d, true);" % (name, name, name, name, nbits)) code.append(" send_data >>= %d;" % nbits) if footer: code.append(" // Footer") code.append(" mark(k%sBitMark);" % name) return code def add_data_decode_code(self, bin_str, name="", footer=True): """Add the common "data" sequence code to decode the bulk of a message.""" # pylint: disable=no-self-use code = [] nbits = len(bin_str) code.extend([ "", " // Data Section #%d" % self.section_count, " // e.g. data_result.data = 0x%X, nbits = %d" % (int(bin_str, 2), nbits), " data_result = matchData(&(results->rawbuf[offset]), %s," % nbits, " k%sBitMark, k%sOneSpace," % (name, name), " k%sBitMark, k%sZeroSpace);" % (name, name), " offset += data_result.used;", " if (data_result.success == false) return false; // Fail", " data <<= %s; // Make room for the new bits of data." % nbits, " data |= data_result.data;"]) if footer: code.extend([ "", " // Footer", " if (!matchMark(results->rawbuf[offset++], k%sBitMark))" % name, " return false;"]) return code def add_data_byte_code(self, bin_str, name="", ambles=None): """Add the code to send the data from an array.""" # pylint: disable=no-self-use code = [] nbits = len(bin_str) nbytes = nbits / 8 if ambles is None: ambles = {} firstmark = ambles.get("firstmark", 0) firstspace = ambles.get("firstspace", 0) lastmark = ambles.get("lastmark", "k%sBitMark" % name) lastspace = ambles.get("lastspace", "kDefaultMessageGap") code.append( " // Data Section #%d" % self.section_count) if nbits % 8: code.append(" // DANGER: Nr. of bits is not a multiple of 8. " "This section won't work!") code.extend([ " // e.g.", " // bits = %d; bytes = %d;" % (nbits, nbytes), " // *(data + pos) = {0x%s};" % ( ", 0x".join("%02X" % int(bin_str[i:i + 8], 2) for i in range(0, len(bin_str), 8))), " sendGeneric(%s, %s," % (firstmark, firstspace), " k%sBitMark, k%sOneSpace," % (name, name), " k%sBitMark, k%sZeroSpace," % (name, name), " %s, %s," % (lastmark, lastspace), " data + pos, %d, // Bytes" % nbytes, " k%sFreq, true, kNoRepeat, kDutyDefault);" % name, " pos += %d; // Adjust by how many bytes of data we sent" % nbytes]) return code def add_data_byte_decode_code(self, bin_str, name="", ambles=None): """Add the common byte-wise "data" sequence decode code.""" # pylint: disable=no-self-use code = [] nbits = len(bin_str) nbytes = nbits / 8 if nbits % 8: code.append(" // WARNING: Nr. of bits is not a multiple of 8. " "This section won't work!") if ambles is None: ambles = {} firstmark = ambles.get("firstmark", 0) firstspace = ambles.get("firstspace", 0) lastmark = ambles.get("lastmark", "k%sBitMark" % name) lastspace = ambles.get("lastspace", "kDefaultMessageGap") code.extend([ "", " // Data Section #%d" % self.section_count, " // e.g.", " // bits = %d; bytes = %d;" % (nbits, nbytes), " // *(results->state + pos) = {0x%s};" % ( ", 0x".join("%02X" % int(bin_str[i:i + 8], 2) for i in range(0, len(bin_str), 8))), " used = matchGeneric(results->rawbuf + offset, results->state + pos,", " results->rawlen - offset, %d," % nbits, " %s, %s," % (firstmark, firstspace), " k%sBitMark, k%sOneSpace," % (name, name), " k%sBitMark, k%sZeroSpace," % (name, name), " %s, %s, true);" % (lastmark, lastspace), " if (used == 0) return false; // We failed to find any data.", " offset += used; // Adjust for how much of the message we read.", " pos += %d; // Adjust by how many bytes of data we read" % nbytes]) return code def _calc_values(self): """Calculate the values which describe the standard timings for the protocol.""" if self.verbose: self.output.write("Potential Mark Candidates:\n" "%s\n" "Potential Space Candidates:\n" "%s\n" % (str(self.marks), str(self.spaces))) # The bit mark is likely to be the smallest mark. self.bit_mark = self.marks[-1] if len(self.marks) > 2: # Possible leader mark? self.ldr_mark = self.marks[0] self.hdr_mark = self.marks[1] else: # Largest mark is likely the kHdrMark self.hdr_mark = self.marks[0] if self.is_space_encoded() and len(self.spaces) >= 3: if self.verbose and len(self.marks) > 2: self.output.write("DANGER: Unusual number of mark timings!") # We should have 3 space candidates at least. # They should be: zero_space (smallest), one_space, & hdr_space (largest) spaces = list(self.spaces) self.zero_space = spaces.pop() self.one_space = spaces.pop() self.hdr_space = spaces.pop() # Rest are probably message gaps self.gaps = spaces def is_space_encoded(self): """Make an educated guess if the message is space encoded.""" return len(self.spaces) > len(self.marks) def is_ldr_mark(self, usec): """Is usec the leader mark?""" if self.ldr_mark is None: return False return self._usec_compare(usec, self.ldr_mark) def is_hdr_mark(self, usec): """Is usec the header mark?""" return self._usec_compare(usec, self.hdr_mark) def is_hdr_space(self, usec): """Is usec the header space?""" return self._usec_compare(usec, self.hdr_space) def is_bit_mark(self, usec): """Is usec the bit mark?""" return self._usec_compare(usec, self.bit_mark) def is_one_space(self, usec): """Is usec the one space?""" return self._usec_compare(usec, self.one_space) def is_zero_space(self, usec): """Is usec the zero_space?""" return self._usec_compare(usec, self.zero_space) def is_gap(self, usec): """Is usec the a space gap?""" return self._usec_compares(usec, self.gaps) def avg_list(items): """Return the average of a list of numbers.""" if items: return int(sum(items) / len(items)) return 0 def add_bit(so_far, bit, output=sys.stdout): """Add a bit to the end of the bits collected so far.""" if bit == "reset": return "" output.write(str(bit)) # This effectively displays in LSB first order. return so_far + str(bit) # Storing it in MSB first order. def convert_rawdata(data_str): """Parse a C++ rawdata declaration into a list of values.""" start = data_str.find('{') end = data_str.find('}') if end == -1: end = len(data_str) if start > end: raise ValueError("Raw Data not parsible due to parentheses placement.") data_str = data_str[start + 1:end] results = [] for timing in [x.strip() for x in data_str.split(',')]: try: results.append(int(timing)) except ValueError: raise ValueError( "Raw Data contains a non-numeric value of '%s'." % timing) return results def dump_constants(message, defines, name="", output=sys.stdout): """Dump the key constants and generate the C++ #defines.""" ldr_mark = None if message.ldr_mark is not None: ldr_mark = avg_list(message.mark_buckets[message.ldr_mark]) hdr_mark = avg_list(message.mark_buckets[message.hdr_mark]) bit_mark = avg_list(message.mark_buckets[message.bit_mark]) hdr_space = avg_list(message.space_buckets[message.hdr_space]) one_space = avg_list(message.space_buckets[message.one_space]) zero_space = avg_list(message.space_buckets[message.zero_space]) output.write("Guessing key value:\n" "k%sHdrMark = %d\n" "k%sHdrSpace = %d\n" "k%sBitMark = %d\n" "k%sOneSpace = %d\n" "k%sZeroSpace = %d\n" % (name, hdr_mark, name, hdr_space, name, bit_mark, name, one_space, name, zero_space)) defines.append("const uint16_t k%sHdrMark = %d;" % (name, hdr_mark)) defines.append("const uint16_t k%sBitMark = %d;" % (name, bit_mark)) defines.append("const uint16_t k%sHdrSpace = %d;" % (name, hdr_space)) defines.append("const uint16_t k%sOneSpace = %d;" % (name, one_space)) defines.append("const uint16_t k%sZeroSpace = %d;" % (name, zero_space)) if ldr_mark: output.write("k%sLdrMark = %d\n" % (name, ldr_mark)) defines.append("const uint16_t k%sLdrMark = %d;" % (name, ldr_mark)) avg_gaps = [avg_list(message.space_buckets[x]) for x in message.gaps] if len(message.gaps) == 1: output.write("k%sSpaceGap = %d\n" % (name, avg_gaps[0])) defines.append("const uint16_t k%sSpaceGap = %d;" % (name, avg_gaps[0])) else: count = 0 for gap in avg_gaps: # We probably (still) have a gap in the protocol. count = count + 1 output.write("k%sSpaceGap%d = %d\n" % (name, count, gap)) defines.append("const uint16_t k%sSpaceGap%d = %d;" % (name, count, gap)) defines.append("const uint16_t k%sFreq = 38000; " "// Hz. (Guessing the most common frequency.)" % name) def parse_and_report(rawdata_str, margin, gen_code=False, name="", output=sys.stdout): """Analyse the rawdata c++ definition of a IR message.""" defines = [] code = {} code["send"] = [] code["send64+"] = [] code["recv"] = [] code["recv64+"] = [] # Parse the input. rawdata = convert_rawdata(rawdata_str) output.write("Found %d timing entries.\n" % len(rawdata)) message = RawIRMessage(margin, rawdata, output) output.write("\nGuessing encoding type:\n") if message.is_space_encoded(): output.write("Looks like it uses space encoding. Yay!\n\n") dump_constants(message, defines, name, output) else: output.write("Sorry, it looks like it is Mark encoded. " "I can't do that yet. Exiting.\n") sys.exit(1) total_bits = decode_data(message, defines, code, name, output) if gen_code: generate_code(defines, code, total_bits, name, output) def decode_data(message, defines, code, name="", output=sys.stdout): """Decode the data sequence with the given values in mind.""" # pylint: disable=too-many-branches,too-many-statements # Now we have likely candidates for the key values, go through the original # sequence and break it up and indicate accordingly. output.write("\nDecoding protocol based on analysis so far:\n\n") state = "" code_info = {} count = 1 total_bits = "" binary_value = binary64_value = add_bit("", "reset") if name: def_name = name else: def_name = "TBD" code["send"].extend([ "#if SEND_%s" % def_name.upper(), "// Function should be safe up to 64 bits.", "void IRsend::send%s(const uint64_t data, const uint16_t" " nbits, const uint16_t repeat) {" % def_name, " enableIROut(k%sFreq);" % name, " for (uint16_t r = 0; r <= repeat; r++) {", " uint64_t send_data = data;"]) code["send64+"].extend([ "// Args:", "// data: An array of bytes containing the IR command.", "// It is assumed to be in MSB order for this code.\n" "// nbytes: Nr. of bytes of data in the array." " (>=k%sStateLength)" % name, "// repeat: Nr. of times the message is to be repeated.", "//", "// Status: ALPHA / Untested.", "void IRsend::send%s(const uint8_t data[], const uint16_t nbytes," " const uint16_t repeat) {" % def_name, " for (uint16_t r = 0; r <= repeat; r++) {", " uint16_t pos = 0;"]) code["recv"].extend([ "#if DECODE_%s" % def_name.upper(), "// Function should be safe up to 64 bits.", "bool IRrecv::decode%s(decode_results *results, const uint16_t nbits," " const bool strict) {" % def_name, " if (results->rawlen < 2 * nbits + k%sOverhead)" % name, " return false; // Too short a message to match.", " if (strict && nbits != k%sBits)" % name, " return false;", "", " uint16_t offset = kStartOffset;", " uint64_t data = 0;", " match_result_t data_result;"]) code["recv64+"].extend([ "#if DECODE_%s" % def_name.upper(), "// Function should be safe over 64 bits.", "bool IRrecv::decode%s(decode_results *results, const uint16_t nbits," " const bool strict) {" % def_name, " if (results->rawlen < 2 * nbits + k%sOverhead)" % name, " return false; // Too short a message to match.", " if (strict && nbits != k%sBits)" % name, " return false;", "", " uint16_t offset = kStartOffset;", " uint16_t pos = 0;", " uint16_t used = 0;"]) # states are: # HM: Header/Leader mark # HS: Header space # BM: Bit mark # BS: Bit space # GS: Gap space # UNK: Unknown state. for usec in message.timings: # Handle header/leader marks. if ((message.is_hdr_mark(usec) or message.is_ldr_mark(usec)) and count % 2 and not message.is_bit_mark(usec)): state = "HM" if message.is_hdr_mark(usec): mark_type = "H" # Header else: mark_type = "L" # Leader if binary_value: message.display_binary(binary_value) code["send"].extend(message.add_data_code(binary_value, name, False)) code["recv"].extend(message.add_data_decode_code(binary_value, name, False)) message.section_count = message.section_count + 1 code_info["lastmark"] = "k%s%sdrMark" % (name, mark_type) total_bits = total_bits + binary_value code_info["firstmark"] = "k%s%sdrMark" % (name, mark_type) binary_value = add_bit(binary_value, "reset") output.write("k%s%sdrMark+" % (name, mark_type)) code["send"].extend([" // %seader" % mark_type, " mark(k%s%sdrMark);" % (name, mark_type)]) code["recv"].extend([ "", " // %seader" % mark_type, " if (!matchMark(results->rawbuf[offset++], k%s%sdrMark))" % ( name, mark_type), " return false;"]) # Handle header spaces. elif message.is_hdr_space(usec) and not message.is_one_space(usec): if binary64_value: code_info["lastspace"] = "k%sHdrSpace" % name message.section_count = message.section_count - 1 code["send64+"].extend(message.add_data_byte_code(binary64_value, name, code_info)) code["recv64+"].extend(message.add_data_byte_decode_code(binary64_value, name, code_info)) code_info.clear() binary64_value = binary_value message.section_count = message.section_count + 1 if state != "HM": if binary_value: # If we we are in a header and we have data, add it. message.display_binary(binary_value) total_bits = total_bits + binary_value code["send"].extend(message.add_data_code(binary_value, name)) code["recv"].extend(message.add_data_decode_code(binary_value, name)) code_info["lastspace"] = "k%sHdrSpace" % name message.section_count = message.section_count + 1 binary_value = binary64_value = add_bit(binary_value, "reset") output.write("UNEXPECTED->") state = "HS" output.write("k%sHdrSpace+" % name) code["send"].append(" space(k%sHdrSpace);" % name) code["recv"].extend([ " if (!matchSpace(results->rawbuf[offset++], k%sHdrSpace))" % name, " return false;"]) code_info["firstspace"] = "k%sHdrSpace" % name # Handle bit marks. elif message.is_bit_mark(usec) and count % 2: if state not in ("HS", "BS"): output.write("k%sBitMark(UNEXPECTED)" % name) state = "BM" # Handle "zero" spaces elif message.is_zero_space(usec): if state != "BM": output.write("k%sZeroSpace(UNEXPECTED)" % name) state = "BS" binary_value = binary64_value = add_bit(binary_value, 0, output) # Handle "one" spaces elif message.is_one_space(usec): if state != "BM": output.write("k%sOneSpace(UNEXPECTED)" % name) state = "BS" binary_value = binary64_value = add_bit(binary_value, 1, output) elif message.is_gap(usec): if state != "BM": output.write("UNEXPECTED->") output.write("GAP(%d)" % usec) code_info["lastspace"] = "k%sSpaceGap" % name if binary64_value: code["send64+"].extend(message.add_data_byte_code(binary64_value, name, code_info)) code["recv64+"].extend(message.add_data_byte_decode_code(binary64_value, name, code_info)) code_info.clear() if binary_value: message.display_binary(binary_value) code["send"].extend(message.add_data_code(binary_value, name)) code["recv"].extend(message.add_data_decode_code(binary_value, name)) message.section_count = message.section_count + 1 else: code["recv"].extend(["", " // Gap"]) code["send"].extend([" // Gap"]) if state == "BM": code["send"].extend([" mark(k%sBitMark);" % name]) code["recv"].extend([ " if (!matchMark(results->rawbuf[offset++], k%sBitMark))" % name, " return false;"]) code["send"].append(" space(k%sSpaceGap);" % name) code["recv"].extend([ " if (!matchSpace(results->rawbuf[offset++], k%sSpaceGap))" % name, " return false;"]) total_bits = total_bits + binary_value binary_value = binary64_value = add_bit(binary_value, "reset") state = "GS" else: output.write("UNKNOWN(%d)" % usec) state = "UNK" count = count + 1 if binary64_value: code["send64+"].extend(message.add_data_byte_code(binary64_value, name, code_info)) code["recv64+"].extend(message.add_data_byte_decode_code(binary64_value, name, code_info)) code_info.clear() if binary_value: message.display_binary(binary_value) code["send"].extend(message.add_data_code(binary_value, name)) code["recv"].extend(message.add_data_decode_code(binary_value, name)) message.section_count = message.section_count + 1 code["send"].extend([ " space(kDefaultMessageGap); // A 100% made up guess of the gap" " between messages.", " }", "}", "#endif // SEND_%s" % def_name.upper()]) code["send64+"].extend([ " }", "}", "#endif // SEND_%s" % def_name.upper()]) code["recv"].extend([ "", " // Success", " results->decode_type = decode_type_t::%s;" % def_name.upper(), " results->bits = nbits;", " results->value = data;", " results->command = 0;", " results->address = 0;", " return true;", "}", "#endif // DECODE_%s" % def_name.upper()]) code["recv64+"].extend([ "", " // Success", " results->decode_type = decode_type_t::%s;" % def_name.upper(), " results->bits = nbits;", " return true;", "}", "#endif // DECODE_%s" % def_name.upper()]) total_bits = total_bits + binary_value output.write("\nTotal Nr. of suspected bits: %d\n" % len(total_bits)) defines.append("const uint16_t k%sBits = %d;" " // Move to IRremoteESP8266.h" % (name, len(total_bits))) if len(total_bits) > 64: defines.append("const uint16_t k%sStateLength = %d;" " // Move to IRremoteESP8266.h" % (name, len(total_bits) / 8)) defines.append("const uint16_t k%sOverhead = %d;" % (name, message.rawlen - 2 * len(total_bits))) return total_bits def generate_code(defines, code, bits_str, name="", output=sys.stdout): """Output the estimated C++ code to reproduce & decode the IR message.""" if name: def_name = name else: def_name = "TBD" output.write("\nGenerating a VERY rough code outline:\n\n" "// Copyright 2019 David Conran (crankyoldgit)\n" "// Support for %s protocol\n\n" '#include "IRrecv.h"\n' '#include "IRsend.h"\n' '#include "IRutils.h"\n\n' "// WARNING: This probably isn't directly usable." " It's a guide only.\n\n" "// See https://github.com/crankyoldgit/IRremoteESP8266/wiki/" "Adding-support-for-a-new-IR-protocol\n" "// for details of how to include this in the library." "\n" % def_name) for line in defines: output.write("%s\n" % line) if len(bits_str) > 64: # Will it fit in a uint64_t? output.write("// DANGER: More than 64 bits detected. A uint64_t for " "'data' won't work!\n") # Display the "normal" version's send code incase there are some # oddities in it. for line in code["send"]: output.write("%s\n" % line) if len(bits_str) > 64: # Will it fit in a uint64_t? code["send64+"] = [ "", "#if SEND_%s" % def_name.upper(), "// Alternative >64bit function to send %s messages" % def_name.upper(), "// Where data is:", "// uint8_t data[k%sStateLength] = {0x%s};" % ( name, ", 0x".join("%02X" % int(bits_str[i:i + 8], 2) for i in range(0, len(bits_str), 8))), "//"] + code["send64+"] for line in code["send64+"]: output.write("%s\n" % line) output.write("\n") if len(bits_str) > 64: # Will it fit in a uint64_t? output.write("// DANGER: More than 64 bits detected. A uint64_t for " "'data' won't work!\n") # Display the "normal" version's decode code incase there are some # oddities in it. for line in code["recv"]: output.write("%s\n" % line) # Display the > 64bit version's decode code if len(bits_str) > 64: # Is it too big for a uint64_t? output.write("\n// Note: This should be 64+ bit safe.\n") if len(bits_str) % 8: output.write("\n// WARNING: Data is not a multiple of bytes. " "This won't work!\n") for line in code["recv64+"]: output.write("%s\n" % line) def main(): """Parse the commandline arguments and call the method.""" arg_parser = argparse.ArgumentParser( description="Read an IRremoteESP8266 rawData declaration and tries to " "analyse it.", formatter_class=argparse.ArgumentDefaultsHelpFormatter) arg_parser.add_argument( "-g", "--code", action="store_true", default=False, dest="gen_code", help="Generate a C++ code outline to aid making an IRsend function.") arg_parser.add_argument( "-n", "--name", help="Name of the protocol/device to use in code generation. E.g. Onkyo", dest="name", default="") arg_group = arg_parser.add_mutually_exclusive_group(required=True) arg_group.add_argument( "rawdata", help="A rawData line from IRrecvDumpV2. e.g. 'uint16_t rawbuf[37] = {" "7930, 3952, 494, 1482, 520, 1482, 494, 1508, 494, 520, 494, 1482, 494, " "520, 494, 1482, 494, 1482, 494, 3978, 494, 520, 494, 520, 494, 520, " "494, 520, 520, 520, 494, 520, 494, 520, 494, 520, 494};'", nargs="?") arg_group.add_argument( "-f", "--file", help="Read in a rawData line from the file.") arg_parser.add_argument( "-r", "--range", type=int, help="Max number of micro-seconds difference between values to consider" " it the same value.", dest="margin", default=200) arg_group.add_argument( "--stdin", help="Read in a rawData line from STDIN.", action="store_true", default=False) arg_options = arg_parser.parse_args() if arg_options.stdin: data = sys.stdin.read() elif arg_options.file: with open(arg_options.file) as input_file: data = input_file.read() else: data = arg_options.rawdata parse_and_report(data, arg_options.margin, arg_options.gen_code, arg_options.name) if __name__ == '__main__': main()