Skip to content

partector_ble_scanner

PartectorBleScanner

Context-managed BLE scanner for Partector devices.

This scanner runs in the provided asyncio event loop and collects advertisement data from BLE devices named "P2" or "PartectorBT". Decoded advertisement payloads are pushed into an asyncio.Queue for further processing. Can be used with async with for automatic startup and cleanup.

Source code in src/naneos/partector_ble/partector_ble_scanner.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class PartectorBleScanner:
    """
    Context-managed BLE scanner for Partector devices.

    This scanner runs in the provided asyncio event loop and collects advertisement data
    from BLE devices named "P2" or "PartectorBT". Decoded advertisement payloads are
    pushed into an asyncio.Queue for further processing. Can be used with `async with`
    for automatic startup and cleanup.
    """

    SCAN_INTERVAL = 0.8  # seconds
    BLE_NAMES_NANEOS = {"P2", "PartectorBT"}  # P2 on windows, PartectorBT on linux / mac

    # static methods ###############################################################################
    @staticmethod
    def create_scanner_queue() -> asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]]:
        """Create a queue for the scanner."""
        queue_scanner: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]] = asyncio.Queue(
            maxsize=100
        )

        return queue_scanner

    # == Lifecycle and Context Management ==========================================================
    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        queue: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]],
    ) -> None:
        """
        Initializes the scanner with the given event loop and queue.

        Args:
            loop (asyncio.AbstractEventLoop): The event loop to run the scanner in.
            queue (asyncio.Queue): The queue to store the scanned data.
        """
        self._loop = loop
        self._queue = queue

        self._task: asyncio.Task | None = None

        self._stop_event = asyncio.Event()
        self._stop_event.set()  # stopped by default

    async def __aenter__(self) -> PartectorBleScanner:
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.stop()

    # == Public Methods ============================================================================
    def start(self) -> None:
        """Starts the scanner."""
        if not self._stop_event.is_set():
            logger.warning("You called PartectorBleScanner.start() but scanner is already running.")
            return

        logger.debug("Starting PartectorBleScanner...")
        self._stop_event.clear()
        self._task = self._loop.create_task(self.scan())

    async def stop(self) -> None:
        """Stops the scanner."""
        logger.debug("Stopping PartectorBleScanner...")
        self._stop_event.set()
        if self._task and not self._task.done():
            await self._task
        logger.info("PartectorBleScanner stopped.")

    # == Internal Async Processing =================================================================
    async def _detection_callback(self, device: BLEDevice, adv: AdvertisementData) -> None:
        """Handles the callbacks from the BleakScanner used in the scan method.

        Args:
            device (BLEDevice): Bleak BLEDevice object
            adv (AdvertisementData): Bleak AdvertisementData object
        """

        if not device.name or device.name not in self.BLE_NAMES_NANEOS:
            return

        adv_data = PartectorBleDecoder.decode_partector_advertisement(adv)
        if not adv_data:
            return

        decoded = PartectorBleDecoderStd.decode(adv_data[0], data_structure=None)
        if not decoded.serial_number:
            return
        if adv_data[1]:
            decoded = PartectorBleDecoderAux.decode(adv_data[1], data_structure=decoded)
        decoded.unix_timestamp = int(time.time()) * 1000
        decoded.connection_type = NaneosDeviceDataPoint.CONN_TYPE_ADVERTISEMENT

        if self._queue.full():  # if the queue is full, make space by removing the oldest item
            await self._queue.get()
        await self._queue.put((device, decoded))

    async def scan(self) -> None:
        """Scans for BLE devices and calls the _detection_callback method for each device found."""

        scanner = BleakScanner(self._detection_callback)

        while not self._stop_event.is_set():
            try:
                async with scanner:
                    await asyncio.sleep(self.SCAN_INTERVAL)
            except Exception as e:
                logger.exception(e)
                await asyncio.sleep(self.SCAN_INTERVAL)  # small backoff before retry

__init__(loop, queue)

Initializes the scanner with the given event loop and queue.

Parameters:

Name Type Description Default
loop AbstractEventLoop

The event loop to run the scanner in.

required
queue Queue

The queue to store the scanned data.

required
Source code in src/naneos/partector_ble/partector_ble_scanner.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    loop: asyncio.AbstractEventLoop,
    queue: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]],
) -> None:
    """
    Initializes the scanner with the given event loop and queue.

    Args:
        loop (asyncio.AbstractEventLoop): The event loop to run the scanner in.
        queue (asyncio.Queue): The queue to store the scanned data.
    """
    self._loop = loop
    self._queue = queue

    self._task: asyncio.Task | None = None

    self._stop_event = asyncio.Event()
    self._stop_event.set()  # stopped by default

create_scanner_queue() staticmethod

Create a queue for the scanner.

Source code in src/naneos/partector_ble/partector_ble_scanner.py
33
34
35
36
37
38
39
40
@staticmethod
def create_scanner_queue() -> asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]]:
    """Create a queue for the scanner."""
    queue_scanner: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]] = asyncio.Queue(
        maxsize=100
    )

    return queue_scanner

scan() async

Scans for BLE devices and calls the _detection_callback method for each device found.

Source code in src/naneos/partector_ble/partector_ble_scanner.py
117
118
119
120
121
122
123
124
125
126
127
128
async def scan(self) -> None:
    """Scans for BLE devices and calls the _detection_callback method for each device found."""

    scanner = BleakScanner(self._detection_callback)

    while not self._stop_event.is_set():
        try:
            async with scanner:
                await asyncio.sleep(self.SCAN_INTERVAL)
        except Exception as e:
            logger.exception(e)
            await asyncio.sleep(self.SCAN_INTERVAL)  # small backoff before retry

start()

Starts the scanner.

Source code in src/naneos/partector_ble/partector_ble_scanner.py
71
72
73
74
75
76
77
78
79
def start(self) -> None:
    """Starts the scanner."""
    if not self._stop_event.is_set():
        logger.warning("You called PartectorBleScanner.start() but scanner is already running.")
        return

    logger.debug("Starting PartectorBleScanner...")
    self._stop_event.clear()
    self._task = self._loop.create_task(self.scan())

stop() async

Stops the scanner.

Source code in src/naneos/partector_ble/partector_ble_scanner.py
81
82
83
84
85
86
87
async def stop(self) -> None:
    """Stops the scanner."""
    logger.debug("Stopping PartectorBleScanner...")
    self._stop_event.set()
    if self._task and not self._task.done():
        await self._task
    logger.info("PartectorBleScanner stopped.")