diff --git a/internal_filesystem/apps/com.micropythonos.scan_bluetooth/META-INF/MANIFEST.JSON b/internal_filesystem/apps/com.micropythonos.scan_bluetooth/META-INF/MANIFEST.JSON index bc61ab72..53754530 100644 --- a/internal_filesystem/apps/com.micropythonos.scan_bluetooth/META-INF/MANIFEST.JSON +++ b/internal_filesystem/apps/com.micropythonos.scan_bluetooth/META-INF/MANIFEST.JSON @@ -6,7 +6,7 @@ "icon_url": "https://apps.micropythonos.com/apps/com.micropythonos.scan_bluetooth/icons/com.micropythonos.scan_bluetooth_0.0.1_64x64.png", "download_url": "https://apps.micropythonos.com/apps/com.micropythonos.scan_bluetooth/mpks/com.micropythonos.scan_bluetooth_0.0.1.mpk", "fullname": "com.micropythonos.scan_bluetooth", -"version": "0.0.1", +"version": "0.1.0", "category": "development", "activities": [ { diff --git a/internal_filesystem/apps/com.micropythonos.scan_bluetooth/assets/scan_bluetooth.py b/internal_filesystem/apps/com.micropythonos.scan_bluetooth/assets/scan_bluetooth.py index 9ac9a1ee..efbf7c4d 100644 --- a/internal_filesystem/apps/com.micropythonos.scan_bluetooth/assets/scan_bluetooth.py +++ b/internal_filesystem/apps/com.micropythonos.scan_bluetooth/assets/scan_bluetooth.py @@ -3,27 +3,32 @@ https://docs.micropython.org/en/latest/library/bluetooth.html """ -import time - try: import bluetooth except ImportError: # Linux test runner may not provide bluetooth module bluetooth = None +import sys + import lvgl as lv from micropython import const -from mpos import Activity +from mpos import Activity, TaskManager -SCAN_DURATION = const(1000) # Duration of each BLE scan in milliseconds -_IRQ_SCAN_RESULT = const(5) +# Scan for 5 seconds, +SCAN_DURATION_MS = const(5000) # Duration of each BLE scan in milliseconds +# with very low interval/window (to maximize detection rate): +INTERVAL_US = const(30000) +WINDOW_US = const(30000) +_IRQ_SCAN_RESULT = const(5) +_IRQ_SCAN_DONE = const(6) # BLE Advertising Data Types (Standardized by Bluetooth SIG) -_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_SHORT_NAME = const(8) +_ADV_TYPE_NAME = const(9) -def decode_field(payload: bytes, adv_type: int) -> list: - results = [] +def decode_name(payload: bytes) -> str | None: i = 0 payload_len = len(payload) while i < payload_len: @@ -31,40 +36,12 @@ def decode_field(payload: bytes, adv_type: int) -> list: if length == 0 or i + length >= payload_len: break field_type = payload[i + 1] - if field_type == adv_type: - results.append(payload[i + 2 : i + length + 1]) + if field_type in (_ADV_TYPE_SHORT_NAME, _ADV_TYPE_NAME): + if new_name := payload[i + 2 : i + length + 1]: + return str(new_name, "utf-8") + else: + print(f"Unsupported: {field_type=} with {length=}") i += length + 1 - return results - - -class BluetoothScanner: - def __init__(self, device_callback): - if bluetooth is None: - raise RuntimeError("Bluetooth module not available") - self.device_callback = device_callback - self.ble = bluetooth.BLE() - self.ble.irq(self.ble_irq_handler) - - def __enter__(self): - print("Activating BLE") - self.ble.active(True) - return self - - def ble_irq_handler(self, event: int, data: tuple) -> None: - if event == _IRQ_SCAN_RESULT: - addr_type, addr, adv_type, rssi, adv_data = data - addr = ":".join(f"{b:02x}" for b in addr) - names = decode_field(adv_data, _ADV_TYPE_NAME) - name = str(names[0], "utf-8") if names else "Unknown" - self.device_callback(addr, rssi, name) - - def scan(self, duration_ms: int): - print(f"BLE scanning for {duration_ms}ms...") - self.ble.gap_scan(duration_ms, 20000, 10000) - - def __exit__(self, exc_type, exc_val, exc_tb): - print("Deactivating BLE") - self.ble.active(False) def set_dynamic_column_widths(table, font=None, padding=8): @@ -85,22 +62,31 @@ def set_cell_value(table, *, row: int, values: tuple): class ScanBluetooth(Activity): - refresh_timer = None - def onCreate(self): - screen = lv.obj() - screen.set_flex_flow(lv.FLEX_FLOW.COLUMN) - screen.set_style_pad_all(0, 0) - screen.set_size(lv.pct(100), lv.pct(100)) + main_content = lv.obj() + main_content.set_flex_flow(lv.FLEX_FLOW.COLUMN) + main_content.set_style_pad_all(0, 0) + main_content.set_size(lv.pct(100), lv.pct(100)) + + info_column = lv.obj(main_content) + info_column.set_flex_flow(lv.FLEX_FLOW.COLUMN) + info_column.set_style_pad_all(1, 1) + info_column.set_size(lv.pct(100), lv.SIZE_CONTENT) + + self.info_label = lv.label(info_column) + self.info_label.set_style_text_font(lv.font_montserrat_14, 0) if bluetooth is None: - label = lv.label(screen) - label.set_text("Bluetooth not available on this platform") - label.center() - self.setContentView(screen) + self.info("Bluetooth not available on this platform") + self.setContentView(main_content) return - self.table = lv.table(screen) + tabel_column = lv.obj(main_content) + tabel_column.set_flex_flow(lv.FLEX_FLOW.COLUMN) + tabel_column.set_style_pad_all(0, 0) + tabel_column.set_size(lv.pct(100), lv.SIZE_CONTENT) + + self.table = lv.table(tabel_column) set_cell_value( self.table, row=0, @@ -108,52 +94,88 @@ def onCreate(self): ) set_dynamic_column_widths(self.table) + self.scan_count = 0 self.mac2column = {} self.mac2counts = {} + self.mac2name = {} - self.scanner_cm = BluetoothScanner(device_callback=self.scan_callback) - self.scanner = self.scanner_cm.__enter__() # Activate BLE + self.ble = bluetooth.BLE() - self.setContentView(screen) + self.setContentView(main_content) - def scan_callback(self, addr, rssi, name): - if not (column_index := self.mac2column.get(addr)): - column_index = len(self.mac2column) + 1 - self.mac2column[addr] = column_index - self.mac2counts[addr] = 1 - else: - self.mac2counts[addr] += 1 + def info(self, text): + print(text) + self.info_label.set_text(text) - set_cell_value( - self.table, - row=column_index, - values=( - str(column_index), - addr, - f"{rssi} dBm", - str(self.mac2counts[addr]), - name, - ), - ) + async def ble_scan(self): + """Check sensor every second""" + while self.scanning: + print(f"async scan for {SCAN_DURATION_MS}ms...") + self.ble.gap_scan(SCAN_DURATION_MS, INTERVAL_US, WINDOW_US, True) + await TaskManager.sleep_ms(SCAN_DURATION_MS + 100) def onResume(self, screen): super().onResume(screen) if bluetooth is None: return - def update(timer): - self.scanner.scan(SCAN_DURATION) - set_dynamic_column_widths(self.table) - time.sleep_ms(SCAN_DURATION + 100) # Wait ? - print(f"Scan complete: {len(self.mac2column)} unique devices") + self.info("Activating Bluetooth...") + self.ble.irq(self.ble_irq_handler) + self.ble.active(True) - self.refresh_timer = lv.timer_create(update, SCAN_DURATION + 1000, None) + self.scanning = True + TaskManager.create_task(self.ble_scan()) def onPause(self, screen): super().onPause(screen) if bluetooth is None: return - self.scanner.__exit__(None, None, None) # Deactivate BLE - if self.refresh_timer: - self.refresh_timer.delete() - self.refresh_timer = None + + self.scanning = False + + self.info("Stop scanning...") + self.ble.gap_scan(None) + self.info("Deactivating BLE...") + self.ble.active(False) + self.info("BLE deactivated") + + def ble_irq_handler(self, event: int, data: tuple) -> None: + try: + if event == _IRQ_SCAN_RESULT: + addr_type, addr, adv_type, rssi, adv_data = data + addr = ":".join(f"{b:02x}" for b in addr) + print(f"{addr=} {rssi=} {len(adv_data)=}") + if name := decode_name(adv_data): + self.mac2name[addr] = name + else: + name = self.mac2name.get(addr, "Unknown") + + if not (column_index := self.mac2column.get(addr)): + column_index = len(self.mac2column) + 1 + self.mac2column[addr] = column_index + self.mac2counts[addr] = 1 + else: + self.mac2counts[addr] += 1 + + set_cell_value( + self.table, + row=column_index, + values=( + str(column_index), + addr, + f"{rssi} dBm", + str(self.mac2counts[addr]), + name, + ), + ) + elif event == _IRQ_SCAN_DONE: + set_dynamic_column_widths(self.table) + self.scan_count += 1 + self.info( + f"{len(self.mac2column)} unique devices (Scan {self.scan_count})" + ) + else: + print(f"Ignored BLE {event=}") + except Exception as e: + sys.print_exception(e) + print(f"Error in BLE IRQ handler {event=}: {e}")