initial import of main module
This commit is contained in:
parent
06b3639cb8
commit
c9398d50ee
|
@ -0,0 +1,96 @@
|
|||
import struct
|
||||
|
||||
class Dig2:
|
||||
def __init__(self, i2, a = 0x14):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def raw(self, b0, b1):
|
||||
self.i2.regwr(self.a, 0, b0, b1)
|
||||
|
||||
def hex(self, b):
|
||||
self.i2.regwr(self.a, 1, b)
|
||||
|
||||
def dec(self, b):
|
||||
self.i2.regwr(self.a, 2, b)
|
||||
|
||||
def dp(self, p0, p1):
|
||||
self.i2.regwr(self.a, 3, (p1 << 1) | p0)
|
||||
|
||||
def brightness(self, b):
|
||||
self.i2.regwr(self.a, 4, b)
|
||||
|
||||
class LED:
|
||||
def __init__(self, i2, a = 0x08):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def rgb(self, r, g, b, t = 0):
|
||||
if t == 0:
|
||||
self.i2.start(self.a, 0)
|
||||
self.i2.write(bytes((0, r, g, b)))
|
||||
self.i2.stop()
|
||||
else:
|
||||
self.i2.start(self.a, 0)
|
||||
self.i2.write(bytes((1, r, g, b, t)))
|
||||
self.i2.stop()
|
||||
|
||||
def hex(self, hhh, t = 0):
|
||||
r = (hhh >> 16) & 0xff
|
||||
g = (hhh >> 8) & 0xff
|
||||
b = hhh & 0xff
|
||||
self.rgb(r, g, b, t)
|
||||
|
||||
class Pot:
|
||||
def __init__(self, i2, a = 0x28):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def read(self):
|
||||
self.i2.start(self.a, 1)
|
||||
(r,) = struct.unpack("B", self.i2.read(1))
|
||||
self.i2.stop()
|
||||
return r
|
||||
|
||||
def raw(self):
|
||||
return self.i2.regrd(self.a, 0, "H")
|
||||
|
||||
def rd(self, r):
|
||||
return self.i2.regrd(self.a, r)
|
||||
|
||||
class Beep:
|
||||
def __init__(self, i2, a = 0x30):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def beep(self, dur, note):
|
||||
self.i2.regwr(self.a, dur, note)
|
||||
|
||||
class Remote:
|
||||
def __init__(self, i2, a = 0x60):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def key(self):
|
||||
while True:
|
||||
r = self.i2.regrd(self.a, 0)
|
||||
if r != 0:
|
||||
return chr(r)
|
||||
|
||||
def raw(self):
|
||||
r = self.i2.regrd(self.a, 1, "4BH")
|
||||
if r[:4] != (0xff, 0xff, 0xff, 0xff):
|
||||
return r
|
||||
else:
|
||||
return None
|
||||
|
||||
class Temp:
|
||||
def __init__(self, i2, a = 0x48):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def reg(self, r):
|
||||
return self.i2.regrd(self.a, r, ">h")
|
||||
|
||||
def read(self):
|
||||
return (self.reg(0) >> 5) * 0.125
|
|
@ -0,0 +1,35 @@
|
|||
import sys
|
||||
import serial
|
||||
import time
|
||||
import struct
|
||||
import random
|
||||
|
||||
from ht16k33 import HT16K33
|
||||
|
||||
class bargraph(HT16K33):
|
||||
def image(self, bb):
|
||||
def swiz(b):
|
||||
bs = [str((b >> n) & 1) for n in range(8)]
|
||||
return int(bs[7] + bs[0] + bs[1] + bs[2] + bs[3] + bs[4] + bs[5] + bs[6], 2)
|
||||
bb = [swiz(b) for b in bb]
|
||||
self.load([b for s in zip(bb,bb) for b in s])
|
||||
|
||||
def char(self, c):
|
||||
n = ord(c)
|
||||
ch = font[n * 8:n * 8 + 8]
|
||||
self.image(ch)
|
||||
|
||||
def set(self, pix):
|
||||
rr = pix
|
||||
def paint(r, i):
|
||||
""" Paint pixel i """
|
||||
blk = i // 12
|
||||
i %= 12
|
||||
b = i // 4
|
||||
m = 1 << ((i % 4) + 4 * blk)
|
||||
r[b] |= m
|
||||
red = [0,0,0]
|
||||
grn = [0,0,0]
|
||||
[paint(red, i) for i in range(24) if (pix[i] & 1)]
|
||||
[paint(grn, i) for i in range(24) if (pix[i] & 2)]
|
||||
self.load([red[0], grn[0], red[1], grn[1], red[2], grn[2]])
|
|
@ -0,0 +1,22 @@
|
|||
class HT16K33:
|
||||
def __init__(self, i2, a = 0x70):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
self.command(0x21) # Clock on
|
||||
self.command(0x81) # Display on
|
||||
self.bright(15)
|
||||
self.load([0] * 16)
|
||||
|
||||
def bright(self, n):
|
||||
assert 0 <= n < 16
|
||||
self.command(0xe0 + n)
|
||||
|
||||
def command(self, b):
|
||||
assert(self.i2.start(self.a, 0))
|
||||
assert(self.i2.write([b]))
|
||||
self.i2.stop()
|
||||
|
||||
def load(self, b128):
|
||||
self.i2.start(self.a, 0)
|
||||
self.i2.write([0] + b128)
|
||||
self.i2.stop()
|
|
@ -0,0 +1,357 @@
|
|||
import sys
|
||||
import serial
|
||||
import time
|
||||
import struct
|
||||
from collections import OrderedDict
|
||||
|
||||
__version__ = '0.0.1'
|
||||
|
||||
PYTHON2 = (sys.version_info < (3, 0))
|
||||
|
||||
from lm75b import *
|
||||
import EDS
|
||||
import bargraph
|
||||
|
||||
class I2CTimeout(Exception):
|
||||
pass
|
||||
|
||||
class InternalState(OrderedDict):
|
||||
def __repr__(self):
|
||||
return "".join(["%8s %4x\n" % (k, v) for (k, v) in self.items()])
|
||||
|
||||
class START:
|
||||
pass
|
||||
|
||||
class STOP:
|
||||
pass
|
||||
|
||||
class I2CDriver:
|
||||
"""
|
||||
I2CDriver interface.
|
||||
|
||||
The following variables are available:
|
||||
|
||||
product product code e.g. 'i2cdriver1'
|
||||
serial serial string of I2CDriver
|
||||
uptime time since I2CDriver boot, in seconds
|
||||
voltage USB voltage, in V
|
||||
current current used by attached device, in mA
|
||||
temp temperature, in degrees C
|
||||
scl state of SCL
|
||||
sda state of SDA
|
||||
speed current device speed in KHz (100 or 400)
|
||||
mode IO mode (I2C or bitbang)
|
||||
pullups programmable pullup enable pins
|
||||
ccitt_crc CCITT-16 CRC of all transmitted and received bytes
|
||||
|
||||
"""
|
||||
def __init__(self, port = "/dev/ttyUSB0", reset = True):
|
||||
self.ser = serial.Serial(port, 1000000, timeout = 1)
|
||||
|
||||
# May be in capture or monitor mode, send char and wait for 50 ms
|
||||
self.ser.write(b'@')
|
||||
time.sleep(.050)
|
||||
|
||||
# May be waiting up to 64 bytes of input (command code 0xff)
|
||||
self.ser.write(b'@' * 64)
|
||||
self.ser.flush()
|
||||
|
||||
while self.ser.inWaiting():
|
||||
self.ser.read(self.ser.inWaiting())
|
||||
|
||||
for c in [0x55, 0x00, 0xff, 0xaa]:
|
||||
r = self.__echo(c)
|
||||
if r != c:
|
||||
print('Echo test failed - not attached?')
|
||||
print('Expected %r but received %r' % (c, r))
|
||||
raise IOError
|
||||
self.getstatus()
|
||||
if reset == "never":
|
||||
return
|
||||
if reset or (self.scl, self.sda) != (1, 1):
|
||||
print('RESET')
|
||||
if self.reset() != 3:
|
||||
assert 0
|
||||
self.getstatus()
|
||||
# self.reboot()
|
||||
self.setspeed(100)
|
||||
|
||||
if PYTHON2:
|
||||
def __ser_w(self, s):
|
||||
if isinstance(s, list):
|
||||
s = "".join([chr(c) for c in s])
|
||||
self.ser.write(s)
|
||||
else:
|
||||
def __ser_w(self, s):
|
||||
if isinstance(s, list):
|
||||
s = bytes(s)
|
||||
self.ser.write(s)
|
||||
|
||||
def __echo(self, c):
|
||||
self.__ser_w([ord('e'), c])
|
||||
r = self.ser.read(1)
|
||||
if PYTHON2:
|
||||
return ord(r[0])
|
||||
else:
|
||||
return r[0]
|
||||
|
||||
def setspeed(self, s):
|
||||
assert s in (100, 400)
|
||||
c = {100:b'1', 400:b'4'}[s]
|
||||
self.__ser_w(c)
|
||||
|
||||
def setpullups(self, s):
|
||||
assert 0 <= s < 64
|
||||
self.__ser_w([ord('u'), s])
|
||||
|
||||
def start(self, b, rw):
|
||||
""" start the i2c transaction """
|
||||
self.__ser_w([ord('s'), (b << 1) | rw])
|
||||
return self.ack()
|
||||
|
||||
def ack(self):
|
||||
a = ord(self.ser.read(1))
|
||||
if a & 2:
|
||||
raise I2CTimeout
|
||||
return (a & 1) != 0
|
||||
|
||||
def stop(self):
|
||||
""" stop the i2c transaction """
|
||||
self.ser.write(b'p')
|
||||
|
||||
def read(self, l):
|
||||
""" Read l bytes from the I2C device, and NAK the last byte """
|
||||
r = []
|
||||
if l >= 64:
|
||||
bulkpart = (l-1) // 64
|
||||
for i in range(bulkpart):
|
||||
self.__ser_w([ord('a'), 64])
|
||||
r.append(self.ser.read(64))
|
||||
l -= 64 * bulkpart
|
||||
assert 0 <= l <= 64
|
||||
self.__ser_w([0x80 + l - 1])
|
||||
r.append(self.ser.read(l))
|
||||
return b''.join(r)
|
||||
|
||||
def write(self, bb):
|
||||
""" Write bb to the I2C device """
|
||||
ack = True
|
||||
for i in range(0, len(bb), 64):
|
||||
sub = bb[i:i + 64]
|
||||
self.__ser_w([0xc0 + len(sub) - 1])
|
||||
self.__ser_w(sub)
|
||||
ack = self.ack()
|
||||
return ack
|
||||
|
||||
def monitor(self, s):
|
||||
if s:
|
||||
self.__ser_w(b'm')
|
||||
time.sleep(.1)
|
||||
else:
|
||||
self.__ser_w(b' ')
|
||||
time.sleep(.1)
|
||||
self.__echo(0x40)
|
||||
|
||||
def reboot(self):
|
||||
self.__ser_w(b'_')
|
||||
time.sleep(.5)
|
||||
|
||||
def reset(self):
|
||||
self.__ser_w(b'x')
|
||||
return struct.unpack("B", self.ser.read(1))[0] & 3
|
||||
|
||||
def regrd(self, dev, reg, fmt = "B"):
|
||||
if isinstance(fmt, str):
|
||||
n = struct.calcsize(fmt)
|
||||
self.__ser_w(b'r' + struct.pack("BBB", dev, reg, n))
|
||||
r = struct.unpack(fmt, self.ser.read(n))
|
||||
if len(r) == 1:
|
||||
return r[0]
|
||||
else:
|
||||
return r
|
||||
else:
|
||||
n = fmt
|
||||
self.__ser_w(b'r' + struct.pack("BBB", dev, reg, n))
|
||||
return self.ser.read(n)
|
||||
|
||||
def regwr(self, dev, reg, *vv):
|
||||
r = self.start(dev, 0)
|
||||
if r:
|
||||
r = self.write(struct.pack("B", reg))
|
||||
if r:
|
||||
r = self.write(vv)
|
||||
self.stop()
|
||||
return r
|
||||
|
||||
def getstatus(self):
|
||||
""" Update all status variables """
|
||||
self.ser.write(b'?')
|
||||
r = self.ser.read(80)
|
||||
body = r[1:-1].decode() # remove [ and ]
|
||||
(self.product,
|
||||
self.serial,
|
||||
uptime,
|
||||
voltage,
|
||||
current,
|
||||
temp,
|
||||
mode,
|
||||
sda,
|
||||
scl,
|
||||
speed,
|
||||
pullups,
|
||||
ccitt_crc) = body.split()
|
||||
self.uptime = int(uptime)
|
||||
self.voltage = float(voltage)
|
||||
self.current = float(current)
|
||||
self.temp = float(temp)
|
||||
self.mode = mode
|
||||
self.scl = int(scl)
|
||||
self.sda = int(sda)
|
||||
self.speed = int(speed)
|
||||
self.pullups = int(pullups, 16)
|
||||
self.ccitt_crc = int(ccitt_crc, 16)
|
||||
return repr(self)
|
||||
|
||||
def introspect(self):
|
||||
""" Update all status variables """
|
||||
self.ser.write(b'J')
|
||||
r = self.ser.read(80)
|
||||
assert len(r) == 80, r
|
||||
body = r[1:-1].decode() # remove [ and ]
|
||||
nn = (
|
||||
"id ds sp SMB0CF SMB0CN T2 T3 IE EIE1 P0 P0MDIN P0MDOUT P1 P1MDIN P1MDOUT P2 P2MDOUT".split() +
|
||||
"convs".split()
|
||||
)
|
||||
bb = [int(w, 16) for w in body.split()]
|
||||
assert len(nn) == len(bb)
|
||||
return InternalState(zip(nn, bb))
|
||||
|
||||
def restore(self):
|
||||
self.ser.write(b'i')
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s serial=%s uptime=%d, SCL=%d, SDA=%d>" % (
|
||||
self.product,
|
||||
self.serial,
|
||||
self.uptime,
|
||||
self.scl,
|
||||
self.sda)
|
||||
|
||||
def scan(self):
|
||||
for i in range(8, 120):
|
||||
print("%02x: %s" % (i, ["-", "PRESENT"][self.start(i, 0)]))
|
||||
self.stop();
|
||||
|
||||
def capture_start(self):
|
||||
self.__ser_w([ord('c')])
|
||||
def nstream():
|
||||
while 1:
|
||||
for b in self.ser.read(256):
|
||||
yield (b >> 4) & 0xf
|
||||
yield b & 0xf
|
||||
def parser():
|
||||
for n in nstream():
|
||||
if n == 0:
|
||||
pass
|
||||
elif n == 1:
|
||||
yield START
|
||||
bits = []
|
||||
elif n == 2:
|
||||
yield STOP
|
||||
bits = []
|
||||
elif n in (8,9,10,11,12,13,14,15):
|
||||
# w(str(n&7))
|
||||
bits.append(n & 7)
|
||||
if len(bits) == 3:
|
||||
b9 = (bits[0] << 6) | (bits[1] << 3) | bits[2]
|
||||
b8 = (b9 >> 1)
|
||||
ack = b9 & 1
|
||||
yield (b8, ack == 0)
|
||||
bits = []
|
||||
else:
|
||||
assert 0, "unexpected token"
|
||||
return parser
|
||||
|
||||
def capture_stop(self):
|
||||
while self.ser.in_waiting:
|
||||
self.ser.read(self.ser.in_waiting)
|
||||
self.__ser_w([ord('c')])
|
||||
while self.ser.in_waiting:
|
||||
self.ser.read(self.ser.in_waiting)
|
||||
self.__echo(0x40)
|
||||
|
||||
def capture(self):
|
||||
self.__ser_w([ord('c')])
|
||||
while 0:
|
||||
b = self.ser.read(1)
|
||||
for c in b:
|
||||
print("%02x" % c)
|
||||
w = sys.stdout.write
|
||||
def nstream():
|
||||
while 1:
|
||||
for b in self.ser.read(256):
|
||||
yield (b >> 4) & 0xf
|
||||
yield b & 0xf
|
||||
bits = []
|
||||
for n in nstream():
|
||||
if n == 0:
|
||||
w(".")
|
||||
elif n == 1:
|
||||
w("S")
|
||||
bits = []
|
||||
elif n == 2:
|
||||
w("P\n")
|
||||
bits = []
|
||||
elif n in (8,9,10,11,12,13,14,15):
|
||||
# w(str(n&7))
|
||||
bits.append(n & 7)
|
||||
if len(bits) == 3:
|
||||
b9 = (bits[0] << 6) | (bits[1] << 3) | bits[2]
|
||||
b8 = (b9 >> 1)
|
||||
ack = b9 & 1
|
||||
w('%02x%s' % (b8, " !"[ack]))
|
||||
bits = []
|
||||
else:
|
||||
assert 0, "unexpected token"
|
||||
|
||||
if __name__ == '__main__':
|
||||
i2 = I2CDriver(sys.argv[1])
|
||||
time.sleep(.1)
|
||||
while 0:
|
||||
print(i2.start(0x55, 0))
|
||||
time.sleep(.5)
|
||||
print(i2.stop())
|
||||
time.sleep(.5)
|
||||
def report():
|
||||
i2.getstatus()
|
||||
print("scl=%d sda=%d debug=%04x" % (i2.scl, i2.sda, i2.ccitt_crc))
|
||||
|
||||
i2.scan()
|
||||
sys.exit(0)
|
||||
|
||||
if 0:
|
||||
i2.start(0x49, 0)
|
||||
i2.write([0x07])
|
||||
i2.start(0x49, 1)
|
||||
i2.read(2)
|
||||
i2.stop()
|
||||
|
||||
print('regwr', i2.regwr(0xe, 0x10, (0x00 << 3) | 1))
|
||||
|
||||
t0 = time.time()
|
||||
for i in range(5):
|
||||
i2.start(0x0e, 0)
|
||||
i2.write([0x00])
|
||||
i2.start(0x0e, 1)
|
||||
t = time.time()
|
||||
print(" ".join(["%02x"%x for x in i2.read(16)]), " took %.3f" % (t-t0))
|
||||
t0 = t
|
||||
i2.stop()
|
||||
|
||||
# i2.start(0x76,0)
|
||||
|
||||
while 0:
|
||||
rr = i2.regrd(0x0e, 0, 16)
|
||||
t = time.time()
|
||||
print(" ".join(["%02x"%x for x in rr]), " took %.6f" % (t-t0))
|
||||
t0 = t
|
|
@ -0,0 +1,10 @@
|
|||
class LM75B:
|
||||
def __init__(self, i2, a = 0x48):
|
||||
self.i2 = i2
|
||||
self.a = a
|
||||
|
||||
def reg(self, r):
|
||||
return self.i2.regrd(self.a, r, ">h")
|
||||
|
||||
def read(self):
|
||||
return (self.reg(0) >> 5) * 0.125
|
|
@ -16,6 +16,12 @@ setup(name='i2cdriver',
|
|||
description='I2CDriver is a desktop I2C interface',
|
||||
long_description=LONG,
|
||||
license='GPL',
|
||||
py_modules=['i2cdriver'],
|
||||
install_requires=['pyserial']
|
||||
install_requires=['pyserial'],
|
||||
py_modules = [
|
||||
'i2cdriver',
|
||||
'lm75b',
|
||||
'EDS',
|
||||
'ht16k33',
|
||||
'bargraph',
|
||||
],
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue