Context-managed BLE scanner for Partector devices.
This scanner runs in the provided asyncio event loop and collects advertisement data
from BLE devices named "P2" or "PartectorBT". Decoded advertisement payloads are
pushed into an asyncio.Queue for further processing. Can be used with `async with`
for automatic startup and cleanup.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class PartectorBleScanner:
    """
    Asynchronous BLE advertisement scanner for Partector devices.

    The scan task is scheduled on the event loop passed to the constructor.
    Advertisements from devices named "P2" or "PartectorBT" are decoded and placed
    into the supplied asyncio.Queue as ``(device, data_point)`` tuples. The class is
    an async context manager: entering it calls ``start()``, leaving it awaits
    ``stop()``.
    """

    SCAN_INTERVAL = 0.8  # seconds
    BLE_NAMES_NANEOS = {"P2", "PartectorBT"}  # P2 on windows, PartectorBT on linux / mac

    # static methods ###############################################################################
    @staticmethod
    def create_scanner_queue() -> asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]]:
        """Return a new, empty queue bounded to 500 entries for use with the scanner."""
        # The bound of 500 leaves headroom for bursts when several devices advertise at once.
        return asyncio.Queue(maxsize=500)

    # == Lifecycle and Context Management ==========================================================
    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        queue: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]],
    ) -> None:
        """
        Store the event loop and output queue; the scanner starts out stopped.

        Args:
            loop (asyncio.AbstractEventLoop): Loop on which the scan task is created.
            queue (asyncio.Queue): Destination for ``(device, data_point)`` tuples.
        """
        # A set stop event means "not running"; start() clears it.
        self._stop_event = asyncio.Event()
        self._stop_event.set()
        self._task: asyncio.Task | None = None
        self._queue = queue
        self._loop = loop

    async def __aenter__(self) -> PartectorBleScanner:
        """Start the scanner and return this instance."""
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop the scanner and wait for the scan task to finish."""
        await self.stop()

    # == Public Methods ============================================================================
    def start(self) -> None:
        """Create the scan task on the stored loop unless the scanner is already running."""
        if self._stop_event.is_set():
            logger.debug("Starting PartectorBleScanner...")
            self._stop_event.clear()
            self._task = self._loop.create_task(self.scan())
        else:
            logger.warning("You called PartectorBleScanner.start() but scanner is already running.")

    async def stop(self) -> None:
        """Signal the scan loop to end and await the scan task if it is still pending."""
        logger.debug("Stopping PartectorBleScanner...")
        self._stop_event.set()
        pending = self._task
        if pending is not None and not pending.done():
            # The loop in scan() re-checks the stop event after its current iteration.
            await pending
        logger.info("PartectorBleScanner stopped.")

    # == Internal Async Processing =================================================================
    async def _detection_callback(self, device: BLEDevice, adv: AdvertisementData) -> None:
        """Decode one advertisement and enqueue it; registered with the BleakScanner in scan().

        Advertisements are ignored when the device name is missing or not in
        BLE_NAMES_NANEOS, when the advertisement decoder returns a falsy result, or
        when the decoded data point has no serial number.

        Args:
            device (BLEDevice): Bleak BLEDevice object
            adv (AdvertisementData): Bleak AdvertisementData object
        """
        name = device.name
        if not (name and name in self.BLE_NAMES_NANEOS):
            return

        payloads = PartectorBleDecoder.decode_partector_advertisement(adv)
        if not payloads:
            return

        # Element 0 goes through the Std decoder; element 1, when present, through the Aux one.
        data_point = PartectorBleDecoderStd.decode(payloads[0], data_structure=None)
        if not data_point.serial_number:
            return
        if payloads[1]:
            data_point = PartectorBleDecoderAux.decode(payloads[1], data_structure=data_point)

        # Current time truncated to whole seconds, expressed in milliseconds.
        data_point.unix_timestamp = int(time.time()) * 1000
        data_point.connection_type = NaneosDeviceDataPoint.CONN_TYPE_ADVERTISEMENT

        # Never await on the queue here: when it is full, discard the oldest entry to make
        # room, so the callback is not held up by queue operations.
        try:
            if self._queue.full():
                try:
                    self._queue.get_nowait()  # discard oldest entry
                except asyncio.QueueEmpty:
                    pass
            self._queue.put_nowait((device, data_point))
        except asyncio.QueueFull:
            logger.debug(f"Scanner queue full, dropping advertisement from {device.address}")

    async def scan(self) -> None:
        """Run the scan loop until the stop event is set.

        Each iteration holds the BleakScanner context open for SCAN_INTERVAL seconds
        and then leaves it. Any Exception is logged and followed by a SCAN_INTERVAL
        pause before the next attempt.
        """
        scanner = BleakScanner(self._detection_callback)
        while True:
            if self._stop_event.is_set():
                break
            try:
                async with scanner:
                    await asyncio.sleep(self.SCAN_INTERVAL)
            except Exception as err:
                logger.exception(err)
                await asyncio.sleep(self.SCAN_INTERVAL)  # small backoff before retry
|
__init__(loop, queue)
Initializes the scanner with the given event loop and queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| loop | AbstractEventLoop | The event loop to run the scanner in. | required |
| queue | Queue | The queue to store the scanned data. | required |
Source code in src/naneos/partector_ble/partector_ble_scanner.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    loop: asyncio.AbstractEventLoop,
    queue: asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]],
) -> None:
    """
    Store the event loop and output queue; the scanner starts out stopped.

    Args:
        loop (asyncio.AbstractEventLoop): Loop on which the scan task is created.
        queue (asyncio.Queue): Destination for the scanned data.
    """
    # A set stop event means "not running".
    self._stop_event = asyncio.Event()
    self._stop_event.set()
    self._task: asyncio.Task | None = None
    self._queue = queue
    self._loop = loop
|
create_scanner_queue()
staticmethod
Create a queue for the scanner.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
33
34
35
36
37
38
39
40
41
@staticmethod
def create_scanner_queue() -> asyncio.Queue[tuple[BLEDevice, NaneosDeviceDataPoint]]:
    """Return a new, empty queue bounded to 500 entries for use with the scanner."""
    # The bound of 500 leaves headroom for bursts when several devices advertise at once.
    return asyncio.Queue(maxsize=500)
|
scan()
async
Scans for BLE devices and calls the _detection_callback method for each device found.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
127
128
129
130
131
132
133
134
135
136
137
async def scan(self) -> None:
    """Run the scan loop until the stop event is set.

    Each iteration holds the BleakScanner context open for SCAN_INTERVAL seconds
    and then leaves it. Any Exception is logged and followed by a SCAN_INTERVAL
    pause before the next attempt.
    """
    scanner = BleakScanner(self._detection_callback)
    while True:
        if self._stop_event.is_set():
            break
        try:
            async with scanner:
                await asyncio.sleep(self.SCAN_INTERVAL)
        except Exception as err:
            logger.exception(err)
            await asyncio.sleep(self.SCAN_INTERVAL)  # small backoff before retry
|
start()
Starts the scanner.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
73
74
75
76
77
78
79
80
def start(self) -> None:
    """Create the scan task on the stored loop unless the scanner is already running."""
    if self._stop_event.is_set():
        logger.debug("Starting PartectorBleScanner...")
        self._stop_event.clear()
        self._task = self._loop.create_task(self.scan())
    else:
        # A cleared stop event means a previous start() has not been stopped yet.
        logger.warning("You called PartectorBleScanner.start() but scanner is already running.")
|
stop()
async
Stops the scanner.
Source code in src/naneos/partector_ble/partector_ble_scanner.py
async def stop(self) -> None:
    """Signal the scan loop to end and await the scan task if it is still pending."""
    logger.debug("Stopping PartectorBleScanner...")
    self._stop_event.set()
    pending = self._task
    if pending is not None and not pending.done():
        await pending
    logger.info("PartectorBleScanner stopped.")
|