Context-managed BLE scanner for Partector devices.
This scanner runs in the provided asyncio event loop and collects advertisement data
from BLE devices named "P2" or "PartectorBT". Decoded advertisement payloads are
pushed into an asyncio.Queue for further processing. Can be used with async with
for automatic startup and cleanup.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PartectorBleScanner:
    """
    Context-managed BLE scanner for Partector devices.

    The scanner runs as a task on the asyncio event loop handed to the constructor.
    Advertisements from BLE devices whose name is listed in ``BLE_NAMES_NANEOS`` are decoded
    and pushed into an ``asyncio.Queue`` as ``(device, data_point)`` tuples for further
    processing. Can be used with `async with` for automatic startup and cleanup, or driven
    manually through ``start()`` and ``stop()``.
    """

    SCAN_INTERVAL = 0.8  # seconds
    BLE_NAMES_NANEOS = {"P2", "PartectorBT"}  # P2 on windows, PartectorBT on linux / mac

    # static methods ###############################################################################
    @staticmethod
    def create_scanner_queue() -> asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]]:
        """Create a queue for the scanner.

        Returns:
            asyncio.Queue: An empty queue bounded to 100 entries.
        """
        queue_scanner: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]] = asyncio.Queue(
            maxsize=100
        )
        return queue_scanner

    # == Lifecycle and Context Management ==========================================================
    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        queue: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]],
    ) -> None:
        """
        Initializes the scanner with the given event loop and queue.

        Args:
            loop (asyncio.AbstractEventLoop): The event loop to run the scanner in.
            queue (asyncio.Queue): The queue to store the scanned data.
        """
        self._loop = loop
        self._queue = queue
        self._task: asyncio.Task | None = None
        # A *set* stop event means "not running"; start() clears it, stop() sets it again.
        self._stop_event = asyncio.Event()
        self._stop_event.set()  # stopped by default

    async def __aenter__(self) -> PartectorBleScanner:
        """Start the scanner and return it as the context value."""
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop the scanner when the ``async with`` block is left."""
        await self.stop()

    # == Public Methods ============================================================================
    def start(self) -> None:
        """Starts the scanner.

        Does nothing except log a warning when the scanner is already running.
        """
        if not self._stop_event.is_set():
            logger.warning("You called PartectorBleScanner.start() but scanner is already running.")
            return

        logger.debug("Starting PartectorBleScanner...")
        self._stop_event.clear()
        self._task = self._loop.create_task(self.scan())

    async def stop(self) -> None:
        """Stops the scanner.

        Signals the scan loop to exit and, if a scan task is still pending, waits for it to
        finish before returning.
        """
        logger.debug("Stopping PartectorBleScanner...")
        self._stop_event.set()
        if self._task and not self._task.done():
            await self._task
        logger.info("PartectorBleScanner stopped.")

    # == Internal Async Processing =================================================================
    async def _detection_callback(self, device: BLEDevice, adv: AdvertisementData) -> None:
        """Handles the callbacks from the BleakScanner used in the scan method.

        Advertisements from devices with other names, payloads that do not decode, and data
        points without a serial number are silently dropped.

        Args:
            device (BLEDevice): Bleak BLEDevice object
            adv (AdvertisementData): Bleak AdvertisementData object
        """
        if not device.name or device.name not in self.BLE_NAMES_NANEOS:
            return

        adv_data = PartectorBleDecoder.decode_partector_advertisement(adv)
        if not adv_data:
            return

        # adv_data[0] is always decoded; adv_data[1] is optional and merged into the result.
        decoded = PartectorBleDecoderStd.decode(adv_data[0], data_structure=None)
        if not decoded.serial_number:
            return

        if adv_data[1]:
            decoded = PartectorBleDecoderAux.decode(adv_data[1], data_structure=decoded)

        # Whole seconds scaled by 1000: millisecond magnitude, one-second resolution.
        decoded.unix_timestamp = int(time.time()) * 1000
        decoded.connection_type = NaneosDeviceDataPoint.CONN_TYPE_ADVERTISEMENT

        # The non-blocking variants make it explicit that this callback never suspends:
        # a full queue loses its oldest entry so the newest reading always fits.
        if self._queue.full():  # if the queue is full, make space by removing the oldest item
            self._queue.get_nowait()
        self._queue.put_nowait((device, decoded))

    async def scan(self) -> None:
        """Scans for BLE devices and calls the _detection_callback method for each device found.

        The BleakScanner is entered for one ``SCAN_INTERVAL`` at a time until the stop event
        is set. Exceptions raised while scanning are logged and followed by a backoff of one
        ``SCAN_INTERVAL``. Both waits end early once a stop is requested, so ``stop()`` does
        not have to sit out the remaining interval.
        """

        async def wait_interval_or_stop() -> None:
            """Wait one SCAN_INTERVAL, returning early when the stop event gets set."""
            try:
                await asyncio.wait_for(self._stop_event.wait(), timeout=self.SCAN_INTERVAL)
            except asyncio.TimeoutError:
                pass  # interval elapsed without a stop request

        scanner = BleakScanner(self._detection_callback)
        while not self._stop_event.is_set():
            try:
                async with scanner:
                    await wait_interval_or_stop()
            except Exception as e:
                logger.exception(e)
                await wait_interval_or_stop()  # small backoff before retry
|
__init__(loop, queue)
Initializes the scanner with the given event loop and queue.
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| loop | AbstractEventLoop | The event loop to run the scanner in. | required |
| queue | Queue | The queue to store the scanned data. | required |
Source code in src/naneos/partector_ble/partector_ble_scanner.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | def __init__(
self,
loop: asyncio.AbstractEventLoop,
queue: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]],
) -> None:
"""
Initializes the scanner with the given event loop and queue.
Args:
loop (asyncio.AbstractEventLoop): The event loop to run the scanner in.
queue (asyncio.Queue): The queue to store the scanned data.
"""
self._loop = loop
self._queue = queue
self._task: asyncio.Task | None = None
self._stop_event = asyncio.Event()
self._stop_event.set() # stopped by default
|
create_scanner_queue()
staticmethod
Create a queue for the scanner.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
| @staticmethod
def create_scanner_queue() -> asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]]:
"""Create a queue for the scanner."""
queue_scanner: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]] = asyncio.Queue(
maxsize=100
)
return queue_scanner
|
scan()
async
Scans for BLE devices and calls the _detection_callback method for each device found.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
117
118
119
120
121
122
123
124
125
126
127
async def scan(self) -> None:
    """Scans for BLE devices and calls the _detection_callback method for each device found.

    The BleakScanner is entered for one ``SCAN_INTERVAL`` at a time until the stop event is
    set. Exceptions raised while scanning are logged and followed by a backoff of one
    ``SCAN_INTERVAL``. Both waits end early once a stop is requested, so a pending stop does
    not have to sit out the remaining interval.
    """

    async def wait_interval_or_stop() -> None:
        """Wait one SCAN_INTERVAL, returning early when the stop event gets set."""
        try:
            await asyncio.wait_for(self._stop_event.wait(), timeout=self.SCAN_INTERVAL)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request

    scanner = BleakScanner(self._detection_callback)
    while not self._stop_event.is_set():
        try:
            async with scanner:
                await wait_interval_or_stop()
        except Exception as e:
            logger.exception(e)
            await wait_interval_or_stop()  # small backoff before retry
|
start()
Starts the scanner.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
71
72
73
74
75
76
77
78
def start(self) -> None:
    """Launch the scan task on the configured loop, unless the scanner is already running."""
    if self._stop_event.is_set():
        # Stopped: flip the flag to "running" and schedule the scan coroutine.
        logger.debug("Starting PartectorBleScanner...")
        self._stop_event.clear()
        self._task = self._loop.create_task(self.scan())
    else:
        logger.warning("You called PartectorBleScanner.start() but scanner is already running.")
|
stop()
async
Stops the scanner.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
async def stop(self) -> None:
    """Request the scan loop to exit and wait for a still-pending scan task to finish."""
    logger.debug("Stopping PartectorBleScanner...")
    self._stop_event.set()

    scan_task = self._task
    if scan_task is not None and not scan_task.done():
        await scan_task

    logger.info("PartectorBleScanner stopped.")
|