mirror of https://github.com/arendst/Tasmota.git
149 lines
5.1 KiB
Plaintext
149 lines
5.1 KiB
Plaintext
# Art-Net driver
|
|
|
|
class ArtNet
|
|
var matrix # the led matrix
|
|
var port # port number for listening for incoming packets
|
|
var udp_server # instance of `udp` class
|
|
var universe_start # base universe number
|
|
var universe_end # last universe number allowed (excluded)
|
|
# static var artnet_sig = bytes().fromstring("Art-Net\x00") # 8 bytes # signature of packet
|
|
static var artnet_sig_0 = 0x4172742D # "Art-"
|
|
static var artnet_sig_4 = 0x4E657400 # "Net\x00"
|
|
|
|
var packet # try reusing the same packer bytes() object for performance
|
|
|
|
# local copy of led matrix attributes for faster access
|
|
var alternate # field from matrix, alternate lines (reversed). Contains 0 if not alternate, or the number of bytes per pixel
|
|
|
|
def init(matrix, universe_start, port, ip_addr)
|
|
self.matrix = matrix
|
|
self.alternate = matrix.alternate ? matrix.pixel_size() : 0
|
|
# self.v = self.matrix.v
|
|
if universe_start == nil universe_start = 0 end
|
|
self.universe_start = universe_start
|
|
self.universe_end = universe_start + matrix.h
|
|
|
|
if port == nil port = 6454 end
|
|
self.port = int(port)
|
|
if ip_addr == nil ip_addr = "" end
|
|
|
|
self.packet = bytes() # instanciate a single bytes() buffer that will be used for all received packets
|
|
|
|
self.udp_server = udp()
|
|
self.udp_server.begin(ip_addr, self.port)
|
|
|
|
# register as fast_loop
|
|
tasmota.add_fast_loop(/-> self.fast_loop())
|
|
# set sleep to 5 for a smooth animation
|
|
tasmota.global.sleep = 5
|
|
end
|
|
|
|
def stop()
|
|
import introspect
|
|
# if usd_server has a stop() method, call it
|
|
if introspect.get(self.udp_server, "stop")
|
|
self.udp_server.stop()
|
|
end
|
|
self.matrix.clear()
|
|
end
|
|
|
|
def fast_loop()
|
|
var universe_start = self.universe_start
|
|
var universe_end = self.universe_end
|
|
var artnet_sig_0 = self.artnet_sig_0
|
|
var artnet_sig_4 = self.artnet_sig_4
|
|
var dirty = false
|
|
var packet = self.udp_server.read(self.packet)
|
|
while (packet != nil)
|
|
if size(packet) >= 18 &&
|
|
packet.get(0, -4) == artnet_sig_0 && packet.get(4, -4) == artnet_sig_4
|
|
var opcode = packet.get(8, 2) # should be 0x5000
|
|
var protocol = packet.get(10, -2) # big endian, should be 14
|
|
var universe = packet.get(14, 2)
|
|
|
|
if opcode == 0x5000 && protocol == 14 && universe >= universe_start && universe < universe_end
|
|
# tasmota.log("DMX: received Art-Net packet :" + packet.tohex())
|
|
# var seq = packet.get(12, 1)
|
|
# var phy = packet.get(13, 1)
|
|
var data_len = packet.get(16, -2)
|
|
# data starts at offset 18
|
|
if size(packet) >= 18 + data_len # check size
|
|
if self.alternate > 0 && (universe - self.universe_start) % 2
|
|
packet.reverse(18, self.alternate, -1)
|
|
end
|
|
self.matrix.set_bytes(universe, packet, 18, data_len)
|
|
dirty = true
|
|
end
|
|
# import string
|
|
# tasmota.log(string.format("DMX: opcode=0x%04X protocol=%i seq=%i phy=%i universe=%i data_len=%i data=%s",
|
|
# opcode, protocol, seq, phy, universe, data_len, packet[18..-1].tohex()))
|
|
end
|
|
end
|
|
packet = self.udp_server.read(self.packet)
|
|
if packet == nil
|
|
tasmota.delay_microseconds(20) # wait 20 us just in case
|
|
packet = self.udp_server.read(self.packet)
|
|
end
|
|
end
|
|
|
|
if dirty
|
|
self.matrix.dirty()
|
|
self.matrix.show()
|
|
end
|
|
end
|
|
|
|
static def read_persist()
|
|
import persist
|
|
var conf = dyn()
|
|
|
|
conf.gpio = persist.find("artnet_gpio", 0) # gpio number from template
|
|
conf.rows = persist.find("artnet_rows", 5) # number of rows (min: 1)
|
|
conf.cols = persist.find("artnet_cols", 5) # number of columns (min: 1)
|
|
conf.offs = persist.find("artnet_offs", 0) # offset in the led strip where the matrix starts (min: 0)
|
|
conf.alt = persist.find("artnet_alt", false) # are the rows in alternate directions
|
|
|
|
conf.univ = persist.find("artnet_univ", 0) # start universe
|
|
|
|
# conf.addr = persist.find("artnet_addr", "uni") # listening mode, either 'uni' or 'multi' for multicast
|
|
conf.port = persist.find("artnet_port", 6454) # UDP port number
|
|
|
|
conf.auto = persist.find("artnet_auto", true) # autorun at startup
|
|
return conf
|
|
end
|
|
|
|
static def run_from_conf()
|
|
import persist
|
|
|
|
var conf = ArtNet.read_persist()
|
|
var r = conf.rows
|
|
var c = conf.cols
|
|
|
|
var strip = Leds(r * c, gpio.pin(gpio.WS2812, conf.gpio))
|
|
var matrix = strip.create_matrix(r, c, conf.offs)
|
|
if conf.alt matrix.set_alternate(true)
|
|
end
|
|
var dmx = ArtNet(matrix, conf.univ, conf.port)
|
|
|
|
global._artnet = dmx
|
|
end
|
|
|
|
static def stop_global()
|
|
var dmx = global._artnet
|
|
if type(dmx) == 'instance'
|
|
dmx.stop()
|
|
global._artnet = nil # dereference
|
|
tasmota.gc() # force gc
|
|
end
|
|
end
|
|
end
|
|
|
|
return ArtNet
|
|
|
|
#-
|
|
# Example for M5Stack ATOM Matrix (5x5 matrix without alternate)
|
|
import artnet
|
|
# var artnet = ArtNet
|
|
var strip = Leds(25, gpio.pin(gpio.WS2812, 0))
|
|
var m = strip.create_matrix(5, 5, 0)
|
|
var dmx = artnet(m)
|
|
-# |