diff --git a/examples/bluetooth/ble_temperature_central.py b/examples/bluetooth/ble_temperature_central.py new file mode 100644 index 0000000000..3d609f13ca --- /dev/null +++ b/examples/bluetooth/ble_temperature_central.py @@ -0,0 +1,222 @@ +# This example finds and connects to a BLE temperature sensor (e.g. the one in ble_temperature.py). + +import bluetooth +import random +import struct +import time +import micropython + +from ble_advertising import decode_services, decode_name + +from micropython import const +_IRQ_CENTRAL_CONNECT = const(1 << 0) +_IRQ_CENTRAL_DISCONNECT = const(1 << 1) +_IRQ_GATTS_WRITE = const(1 << 2) +_IRQ_GATTS_READ_REQUEST = const(1 << 3) +_IRQ_SCAN_RESULT = const(1 << 4) +_IRQ_SCAN_COMPLETE = const(1 << 5) +_IRQ_PERIPHERAL_CONNECT = const(1 << 6) +_IRQ_PERIPHERAL_DISCONNECT = const(1 << 7) +_IRQ_GATTC_SERVICE_RESULT = const(1 << 8) +_IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9) +_IRQ_GATTC_DESCRIPTOR_RESULT = const(1 << 10) +_IRQ_GATTC_READ_RESULT = const(1 << 11) +_IRQ_GATTC_WRITE_STATUS = const(1 << 12) +_IRQ_GATTC_NOTIFY = const(1 << 13) +_IRQ_GATTC_INDICATE = const(1 << 14) +_IRQ_ALL = const(0xffff) + +# org.bluetooth.service.environmental_sensing +_ENV_SENSE_UUID = bluetooth.UUID(0x181A) +# org.bluetooth.characteristic.temperature +_TEMP_UUID = bluetooth.UUID(0x2A6E) +_TEMP_CHAR = (_TEMP_UUID, bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,) +_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),) + +# org.bluetooth.characteristic.gap.appearance.xml +_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768) + +class BLETemperatureCentral: + def __init__(self, ble): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + + self._reset() + + def _reset(self): + # Cached name and address from a successful scan. + self._name = None + self._addr_type = None + self._addr = None + + # Cached value (if we have one) + self._value = None + + # Callbacks for completion of various operations. + # These reset back to None after being invoked. + self._scan_callback = None + self._conn_callback = None + self._read_callback = None + + # Persistent callback for when new data is notified from the device. + self._notify_callback = None + + # Connected device. + self._conn_handle = None + self._value_handle = None + + def _irq(self, event, data): + if event == _IRQ_SCAN_RESULT: + addr_type, addr, connectable, rssi, adv_data = data + if connectable and _ENV_SENSE_UUID in decode_services(adv_data): + # Found a potential device, remember it and stop scanning. + self._addr_type = addr_type + self._addr = bytes(addr) # Note: The addr buffer is owned by modbluetooth, need to copy it. + self._name = decode_name(adv_data) or '?' + self._ble.gap_scan(None) + + elif event == _IRQ_SCAN_COMPLETE: + if self._scan_callback: + if self._addr: + # Found a device during the scan (and the scan was explicitly stopped). + self._scan_callback(self._addr_type, self._addr, self._name) + self._scan_callback = None + else: + # Scan timed out. + self._scan_callback(None, None, None) + + elif event == _IRQ_PERIPHERAL_CONNECT: + # Connect successful. + conn_handle, addr_type, addr, = data + if addr_type == self._addr_type and addr == self._addr: + self._conn_handle = conn_handle + self._ble.gattc_discover_services(self._conn_handle) + + elif event == _IRQ_PERIPHERAL_DISCONNECT: + # Disconnect (either initiated by us or the remote end). + conn_handle, _, _, = data + if conn_handle == self._conn_handle: + # If it was initiated by us, it'll already be reset. + self._reset() + + elif event == _IRQ_GATTC_SERVICE_RESULT: + # Connected device returned a service. + conn_handle, start_handle, end_handle, uuid = data + if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID: + self._ble.gattc_discover_characteristics(self._conn_handle, start_handle, end_handle) + + elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: + # Connected device returned a characteristic. + conn_handle, def_handle, value_handle, properties, uuid = data + if conn_handle == self._conn_handle and uuid == _TEMP_UUID: + self._value_handle = value_handle + # We've finished connecting and discovering device, fire the connect callback. + if self._conn_callback: + self._conn_callback() + + elif event == _IRQ_GATTC_READ_RESULT: + # A read completed successfully. + conn_handle, value_handle, char_data = data + if conn_handle == self._conn_handle and value_handle == self._value_handle: + self._update_value(char_data) + if self._read_callback: + self._read_callback(self._value) + self._read_callback = None + + elif event == _IRQ_GATTC_NOTIFY: + # The ble_temperature.py demo periodically notifies its value. + conn_handle, value_handle, notify_data = data + if conn_handle == self._conn_handle and value_handle == self._value_handle: + self._update_value(notify_data) + if self._notify_callback: + self._notify_callback(self._value) + + + # Returns true if we've successfully connected and discovered characteristics. + def is_connected(self): + return self._conn_handle is not None and self._value_handle is not None + + # Find a device advertising the environmental sensor service. + def scan(self, callback=None): + self._addr_type = None + self._addr = None + self._scan_callback = callback + self._ble.gap_scan(2000, 30000, 30000) + + # Connect to the specified device (otherwise use cached address from a scan). + def connect(self, addr_type=None, addr=None, callback=None): + self._addr_type = addr_type or self._addr_type + self._addr = addr or self._addr + self._conn_callback = callback + if self._addr_type is None or self._addr is None: + return False + self._ble.gap_connect(self._addr_type, self._addr) + return True + + # Disconnect from current device. + def disconnect(self): + if not self._conn_handle: + return + self._ble.gap_disconnect(self._conn_handle) + self._reset() + + # Issues an (asynchronous) read, will invoke callback with data. + def read(self, callback): + if not self.is_connected(): + return + self._read_callback = callback + self._ble.gattc_read(self._conn_handle, self._value_handle) + + # Sets a callback to be invoked when the device notifies us. + def on_notify(self, callback): + self._notify_callback = callback + + def _update_value(self, data): + # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius. + self._value = struct.unpack('