From c9398d50ee6b19b7a154a62f3f2e1e8ef56c2baf Mon Sep 17 00:00:00 2001 From: James Bowman Date: Wed, 20 Feb 2019 06:46:39 -0800 Subject: [PATCH] initial import of main module --- python/EDS.py | 96 ++++++++++++ python/bargraph.py | 35 +++++ python/ht16k33.py | 22 +++ python/i2cdriver.py | 357 ++++++++++++++++++++++++++++++++++++++++++++ python/lm75b.py | 10 ++ python/setup.py | 10 +- 6 files changed, 528 insertions(+), 2 deletions(-) create mode 100644 python/EDS.py create mode 100644 python/bargraph.py create mode 100644 python/ht16k33.py create mode 100644 python/i2cdriver.py create mode 100644 python/lm75b.py diff --git a/python/EDS.py b/python/EDS.py new file mode 100644 index 0000000..3b68ac1 --- /dev/null +++ b/python/EDS.py @@ -0,0 +1,96 @@ +import struct + +class Dig2: + def __init__(self, i2, a = 0x14): + self.i2 = i2 + self.a = a + + def raw(self, b0, b1): + self.i2.regwr(self.a, 0, b0, b1) + + def hex(self, b): + self.i2.regwr(self.a, 1, b) + + def dec(self, b): + self.i2.regwr(self.a, 2, b) + + def dp(self, p0, p1): + self.i2.regwr(self.a, 3, (p1 << 1) | p0) + + def brightness(self, b): + self.i2.regwr(self.a, 4, b) + +class LED: + def __init__(self, i2, a = 0x08): + self.i2 = i2 + self.a = a + + def rgb(self, r, g, b, t = 0): + if t == 0: + self.i2.start(self.a, 0) + self.i2.write(bytes((0, r, g, b))) + self.i2.stop() + else: + self.i2.start(self.a, 0) + self.i2.write(bytes((1, r, g, b, t))) + self.i2.stop() + + def hex(self, hhh, t = 0): + r = (hhh >> 16) & 0xff + g = (hhh >> 8) & 0xff + b = hhh & 0xff + self.rgb(r, g, b, t) + +class Pot: + def __init__(self, i2, a = 0x28): + self.i2 = i2 + self.a = a + + def read(self): + self.i2.start(self.a, 1) + (r,) = struct.unpack("B", self.i2.read(1)) + self.i2.stop() + return r + + def raw(self): + return self.i2.regrd(self.a, 0, "H") + + def rd(self, r): + return self.i2.regrd(self.a, r) + +class Beep: + def __init__(self, i2, a = 0x30): + self.i2 = i2 + self.a = a + + def beep(self, dur, note): + self.i2.regwr(self.a, dur, note) + +class Remote: + def __init__(self, i2, a = 0x60): + self.i2 = i2 + self.a = a + + def key(self): + while True: + r = self.i2.regrd(self.a, 0) + if r != 0: + return chr(r) + + def raw(self): + r = self.i2.regrd(self.a, 1, "4BH") + if r[:4] != (0xff, 0xff, 0xff, 0xff): + return r + else: + return None + +class Temp: + def __init__(self, i2, a = 0x48): + self.i2 = i2 + self.a = a + + def reg(self, r): + return self.i2.regrd(self.a, r, ">h") + + def read(self): + return (self.reg(0) >> 5) * 0.125 diff --git a/python/bargraph.py b/python/bargraph.py new file mode 100644 index 0000000..b913804 --- /dev/null +++ b/python/bargraph.py @@ -0,0 +1,35 @@ +import sys +import serial +import time +import struct +import random + +from ht16k33 import HT16K33 + +class bargraph(HT16K33): + def image(self, bb): + def swiz(b): + bs = [str((b >> n) & 1) for n in range(8)] + return int(bs[7] + bs[0] + bs[1] + bs[2] + bs[3] + bs[4] + bs[5] + bs[6], 2) + bb = [swiz(b) for b in bb] + self.load([b for s in zip(bb,bb) for b in s]) + + def char(self, c): + n = ord(c) + ch = font[n * 8:n * 8 + 8] + self.image(ch) + + def set(self, pix): + rr = pix + def paint(r, i): + """ Paint pixel i """ + blk = i // 12 + i %= 12 + b = i // 4 + m = 1 << ((i % 4) + 4 * blk) + r[b] |= m + red = [0,0,0] + grn = [0,0,0] + [paint(red, i) for i in range(24) if (pix[i] & 1)] + [paint(grn, i) for i in range(24) if (pix[i] & 2)] + self.load([red[0], grn[0], red[1], grn[1], red[2], grn[2]]) diff --git a/python/ht16k33.py b/python/ht16k33.py new file mode 100644 index 0000000..f7382ac --- /dev/null +++ b/python/ht16k33.py @@ -0,0 +1,22 @@ +class HT16K33: + def __init__(self, i2, a = 0x70): + self.i2 = i2 + self.a = a + self.command(0x21) # Clock on + self.command(0x81) # Display on + self.bright(15) + self.load([0] * 16) + + def bright(self, n): + assert 0 <= n < 16 + self.command(0xe0 + n) + + def command(self, b): + assert(self.i2.start(self.a, 0)) + assert(self.i2.write([b])) + self.i2.stop() + + def load(self, b128): + self.i2.start(self.a, 0) + self.i2.write([0] + b128) + self.i2.stop() diff --git a/python/i2cdriver.py b/python/i2cdriver.py new file mode 100644 index 0000000..4779ace --- /dev/null +++ b/python/i2cdriver.py @@ -0,0 +1,357 @@ +import sys +import serial +import time +import struct +from collections import OrderedDict + +__version__ = '0.0.1' + +PYTHON2 = (sys.version_info < (3, 0)) + +from lm75b import * +import EDS +import bargraph + +class I2CTimeout(Exception): + pass + +class InternalState(OrderedDict): + def __repr__(self): + return "".join(["%8s %4x\n" % (k, v) for (k, v) in self.items()]) + +class START: + pass + +class STOP: + pass + +class I2CDriver: + """ + I2CDriver interface. + + The following variables are available: + + product product code e.g. 'i2cdriver1' + serial serial string of I2CDriver + uptime time since I2CDriver boot, in seconds + voltage USB voltage, in V + current current used by attached device, in mA + temp temperature, in degrees C + scl state of SCL + sda state of SDA + speed current device speed in KHz (100 or 400) + mode IO mode (I2C or bitbang) + pullups programmable pullup enable pins + ccitt_crc CCITT-16 CRC of all transmitted and received bytes + + """ + def __init__(self, port = "/dev/ttyUSB0", reset = True): + self.ser = serial.Serial(port, 1000000, timeout = 1) + + # May be in capture or monitor mode, send char and wait for 50 ms + self.ser.write(b'@') + time.sleep(.050) + + # May be waiting up to 64 bytes of input (command code 0xff) + self.ser.write(b'@' * 64) + self.ser.flush() + + while self.ser.inWaiting(): + self.ser.read(self.ser.inWaiting()) + + for c in [0x55, 0x00, 0xff, 0xaa]: + r = self.__echo(c) + if r != c: + print('Echo test failed - not attached?') + print('Expected %r but received %r' % (c, r)) + raise IOError + self.getstatus() + if reset == "never": + return + if reset or (self.scl, self.sda) != (1, 1): + print('RESET') + if self.reset() != 3: + assert 0 + self.getstatus() + # self.reboot() + self.setspeed(100) + + if PYTHON2: + def __ser_w(self, s): + if isinstance(s, list): + s = "".join([chr(c) for c in s]) + self.ser.write(s) + else: + def __ser_w(self, s): + if isinstance(s, list): + s = bytes(s) + self.ser.write(s) + + def __echo(self, c): + self.__ser_w([ord('e'), c]) + r = self.ser.read(1) + if PYTHON2: + return ord(r[0]) + else: + return r[0] + + def setspeed(self, s): + assert s in (100, 400) + c = {100:b'1', 400:b'4'}[s] + self.__ser_w(c) + + def setpullups(self, s): + assert 0 <= s < 64 + self.__ser_w([ord('u'), s]) + + def start(self, b, rw): + """ start the i2c transaction """ + self.__ser_w([ord('s'), (b << 1) | rw]) + return self.ack() + + def ack(self): + a = ord(self.ser.read(1)) + if a & 2: + raise I2CTimeout + return (a & 1) != 0 + + def stop(self): + """ stop the i2c transaction """ + self.ser.write(b'p') + + def read(self, l): + """ Read l bytes from the I2C device, and NAK the last byte """ + r = [] + if l >= 64: + bulkpart = (l-1) // 64 + for i in range(bulkpart): + self.__ser_w([ord('a'), 64]) + r.append(self.ser.read(64)) + l -= 64 * bulkpart + assert 0 <= l <= 64 + self.__ser_w([0x80 + l - 1]) + r.append(self.ser.read(l)) + return b''.join(r) + + def write(self, bb): + """ Write bb to the I2C device """ + ack = True + for i in range(0, len(bb), 64): + sub = bb[i:i + 64] + self.__ser_w([0xc0 + len(sub) - 1]) + self.__ser_w(sub) + ack = self.ack() + return ack + + def monitor(self, s): + if s: + self.__ser_w(b'm') + time.sleep(.1) + else: + self.__ser_w(b' ') + time.sleep(.1) + self.__echo(0x40) + + def reboot(self): + self.__ser_w(b'_') + time.sleep(.5) + + def reset(self): + self.__ser_w(b'x') + return struct.unpack("B", self.ser.read(1))[0] & 3 + + def regrd(self, dev, reg, fmt = "B"): + if isinstance(fmt, str): + n = struct.calcsize(fmt) + self.__ser_w(b'r' + struct.pack("BBB", dev, reg, n)) + r = struct.unpack(fmt, self.ser.read(n)) + if len(r) == 1: + return r[0] + else: + return r + else: + n = fmt + self.__ser_w(b'r' + struct.pack("BBB", dev, reg, n)) + return self.ser.read(n) + + def regwr(self, dev, reg, *vv): + r = self.start(dev, 0) + if r: + r = self.write(struct.pack("B", reg)) + if r: + r = self.write(vv) + self.stop() + return r + + def getstatus(self): + """ Update all status variables """ + self.ser.write(b'?') + r = self.ser.read(80) + body = r[1:-1].decode() # remove [ and ] + (self.product, + self.serial, + uptime, + voltage, + current, + temp, + mode, + sda, + scl, + speed, + pullups, + ccitt_crc) = body.split() + self.uptime = int(uptime) + self.voltage = float(voltage) + self.current = float(current) + self.temp = float(temp) + self.mode = mode + self.scl = int(scl) + self.sda = int(sda) + self.speed = int(speed) + self.pullups = int(pullups, 16) + self.ccitt_crc = int(ccitt_crc, 16) + return repr(self) + + def introspect(self): + """ Update all status variables """ + self.ser.write(b'J') + r = self.ser.read(80) + assert len(r) == 80, r + body = r[1:-1].decode() # remove [ and ] + nn = ( + "id ds sp SMB0CF SMB0CN T2 T3 IE EIE1 P0 P0MDIN P0MDOUT P1 P1MDIN P1MDOUT P2 P2MDOUT".split() + + "convs".split() + ) + bb = [int(w, 16) for w in body.split()] + assert len(nn) == len(bb) + return InternalState(zip(nn, bb)) + + def restore(self): + self.ser.write(b'i') + + def __repr__(self): + return "<%s serial=%s uptime=%d, SCL=%d, SDA=%d>" % ( + self.product, + self.serial, + self.uptime, + self.scl, + self.sda) + + def scan(self): + for i in range(8, 120): + print("%02x: %s" % (i, ["-", "PRESENT"][self.start(i, 0)])) + self.stop(); + + def capture_start(self): + self.__ser_w([ord('c')]) + def nstream(): + while 1: + for b in self.ser.read(256): + yield (b >> 4) & 0xf + yield b & 0xf + def parser(): + for n in nstream(): + if n == 0: + pass + elif n == 1: + yield START + bits = [] + elif n == 2: + yield STOP + bits = [] + elif n in (8,9,10,11,12,13,14,15): + # w(str(n&7)) + bits.append(n & 7) + if len(bits) == 3: + b9 = (bits[0] << 6) | (bits[1] << 3) | bits[2] + b8 = (b9 >> 1) + ack = b9 & 1 + yield (b8, ack == 0) + bits = [] + else: + assert 0, "unexpected token" + return parser + + def capture_stop(self): + while self.ser.in_waiting: + self.ser.read(self.ser.in_waiting) + self.__ser_w([ord('c')]) + while self.ser.in_waiting: + self.ser.read(self.ser.in_waiting) + self.__echo(0x40) + + def capture(self): + self.__ser_w([ord('c')]) + while 0: + b = self.ser.read(1) + for c in b: + print("%02x" % c) + w = sys.stdout.write + def nstream(): + while 1: + for b in self.ser.read(256): + yield (b >> 4) & 0xf + yield b & 0xf + bits = [] + for n in nstream(): + if n == 0: + w(".") + elif n == 1: + w("S") + bits = [] + elif n == 2: + w("P\n") + bits = [] + elif n in (8,9,10,11,12,13,14,15): + # w(str(n&7)) + bits.append(n & 7) + if len(bits) == 3: + b9 = (bits[0] << 6) | (bits[1] << 3) | bits[2] + b8 = (b9 >> 1) + ack = b9 & 1 + w('%02x%s' % (b8, " !"[ack])) + bits = [] + else: + assert 0, "unexpected token" + +if __name__ == '__main__': + i2 = I2CDriver(sys.argv[1]) + time.sleep(.1) + while 0: + print(i2.start(0x55, 0)) + time.sleep(.5) + print(i2.stop()) + time.sleep(.5) + def report(): + i2.getstatus() + print("scl=%d sda=%d debug=%04x" % (i2.scl, i2.sda, i2.ccitt_crc)) + + i2.scan() + sys.exit(0) + + if 0: + i2.start(0x49, 0) + i2.write([0x07]) + i2.start(0x49, 1) + i2.read(2) + i2.stop() + + print('regwr', i2.regwr(0xe, 0x10, (0x00 << 3) | 1)) + + t0 = time.time() + for i in range(5): + i2.start(0x0e, 0) + i2.write([0x00]) + i2.start(0x0e, 1) + t = time.time() + print(" ".join(["%02x"%x for x in i2.read(16)]), " took %.3f" % (t-t0)) + t0 = t + i2.stop() + + # i2.start(0x76,0) + + while 0: + rr = i2.regrd(0x0e, 0, 16) + t = time.time() + print(" ".join(["%02x"%x for x in rr]), " took %.6f" % (t-t0)) + t0 = t diff --git a/python/lm75b.py b/python/lm75b.py new file mode 100644 index 0000000..0706fcc --- /dev/null +++ b/python/lm75b.py @@ -0,0 +1,10 @@ +class LM75B: + def __init__(self, i2, a = 0x48): + self.i2 = i2 + self.a = a + + def reg(self, r): + return self.i2.regrd(self.a, r, ">h") + + def read(self): + return (self.reg(0) >> 5) * 0.125 diff --git a/python/setup.py b/python/setup.py index 96f23a5..fd863d2 100644 --- a/python/setup.py +++ b/python/setup.py @@ -16,6 +16,12 @@ setup(name='i2cdriver', description='I2CDriver is a desktop I2C interface', long_description=LONG, license='GPL', - py_modules=['i2cdriver'], - install_requires=['pyserial'] + install_requires=['pyserial'], + py_modules = [ + 'i2cdriver', + 'lm75b', + 'EDS', + 'ht16k33', + 'bargraph', + ], )