22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327 | class PartectorBleManager(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
self._stop_event = threading.Event()
self._task_stop_event = asyncio.Event()
self._queue_scanner = PartectorBleScanner.create_scanner_queue()
self._queue_connection = PartectorBleConnection.create_connection_queue()
self._connections: dict[int, tuple[asyncio.Task, int]] = {} # key: serial_number
self._data: dict[int, pd.DataFrame] = {}
def get_data(self) -> dict[int, pd.DataFrame]:
"""Returns the data dictionary and deletes it."""
data = self._data
self._data = {}
return data
def stop(self) -> None:
self._task_stop_event.set()
self._stop_event.set()
def run(self) -> None:
try:
asyncio.run(self._async_run())
except RuntimeError as e:
logger.exception(f"BLEManager loop exited with: {e}")
def get_connected_device_strings(self) -> list[str]:
"""Returns a list of connected device strings."""
# first make a copy to avoid runtime dict change issues
connections_copy = self._connections.copy()
sns = connections_copy.keys()
device_types = [connections_copy[s][1] for s in sns]
sns_list = []
for sn, dev_type in zip(sns, device_types):
if dev_type == NaneosDeviceDataPoint.DEV_TYPE_P2PRO:
sns_list.append(f"SN{sn} (P2 Pro)")
for sn, dev_type in zip(sns, device_types):
if dev_type == NaneosDeviceDataPoint.DEV_TYPE_P2:
sns_list.append(f"SN{sn} (P2)")
return sns_list
def get_connected_serial_numbers(self) -> list[int | None]:
"""Returns a list of connected serial numbers."""
return list(self._connections.keys())
async def _bleak_is_bluetooth_adapter_available(self) -> bool:
"""Check if the Bluetooth adapter is available and powered on."""
try:
# Try to get adapter info - this will fail if adapter is not available
scanner = BleakScanner()
# Test if we can discover devices briefly
await scanner.start()
await scanner.stop()
return True
except Exception as e:
logger.debug(f"Bluetooth adapter not available: {e}")
return False
async def _linux_is_bluetooth_adapter_available(self) -> bool:
"""
Nutzt BlueZ (bluetoothctl show), um zu prüfen, ob
- ein Bluetooth-Controller existiert und
- er eingeschaltet ("Powered: yes") ist.
"""
try:
proc = await asyncio.create_subprocess_exec(
"bluetoothctl",
"show",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
logger.debug(
"bluetoothctl show failed with code %s: %s",
proc.returncode,
stderr.decode(errors="ignore").strip(),
)
return False
output = stdout.decode(errors="ignore")
if "No default controller available" in output:
logger.debug("No default Bluetooth controller available (BlueZ).")
return False
powered = None
for line in output.splitlines():
line = line.strip()
if line.lower().startswith("powered:"):
powered = "yes" in line.lower()
break
if powered is not None:
return powered
logger.debug("Bluetooth controller found but no 'Powered' field in output.")
return False
except FileNotFoundError:
logger.debug("bluetoothctl not found on system.")
return False
except Exception as e:
logger.debug(f"Error while checking Bluetooth adapter via bluetoothctl: {e}")
return False
async def _is_bluetooth_adapter_available(self) -> bool:
if sys.platform.startswith("linux"):
return await self._linux_is_bluetooth_adapter_available()
else:
return await self._bleak_is_bluetooth_adapter_available()
async def _wait_for_bluetooth_adapter(self) -> None:
"""Wait for the Bluetooth adapter to become available."""
adapter_check_interval = 3.0 # seconds
while not self._stop_event.is_set():
if await self._is_bluetooth_adapter_available():
logger.info("Bluetooth adapter is available and ready.")
return
logger.info(
f"Bluetooth adapter not available. Retrying in {adapter_check_interval} seconds..."
)
await asyncio.sleep(adapter_check_interval)
async def _async_run(self):
self._loop = asyncio.get_event_loop()
while not self._stop_event.is_set():
try:
# Wait for Bluetooth adapter to become available
await self._wait_for_bluetooth_adapter()
self._task_stop_event.clear()
async with PartectorBleScanner(loop=self._loop, queue=self._queue_scanner):
logger.info("Scanner started.")
await self._manager_loop()
await self._kill_all_connections() # just to be safe
except asyncio.CancelledError:
logger.info("BLEManager cancelled.")
finally:
logger.info("BLEManager cleanup complete.")
async def _manager_loop(self) -> None:
while not self._stop_event.is_set():
try:
if not await self._is_bluetooth_adapter_available():
logger.warning("Bluetooth adapter lost. Stopping all connections...")
await self._kill_all_connections()
return
await asyncio.sleep(0.3)
await self._scanner_queue_routine()
await self._connection_queue_routine()
await self._check_device_types()
await self._remove_done_tasks()
except asyncio.TimeoutError:
continue
except Exception as e:
logger.exception(f"Error in manager loop: {e}")
await self._finish_all_connections()
async def _kill_all_connections(self) -> None:
self._task_stop_event.set()
for serial in list(self._connections.keys()):
if not self._connections[serial][0].done():
logger.info(f"Cancelling connection task {serial}.")
self._connections[serial][0].cancel()
self._connections.pop(serial, None)
logger.info(f"{serial}: Connection task cancelled and popped.")
async def _finish_all_connections_blocking(self) -> None:
while list(self._connections.keys()):
serial = list(self._connections.keys())[0]
if not self._connections[serial][0].done():
await asyncio.sleep(1)
else:
self._connections.pop(serial, None)
async def _finish_all_connections(self) -> None:
self._task_stop_event.set()
await asyncio.sleep(1) # give tasks some time to finish gracefully
# wait max 5s for _finish_all_connections_blocking to finish
try:
await asyncio.wait_for(self._finish_all_connections_blocking(), timeout=7)
except asyncio.TimeoutError:
logger.warning("Timeout waiting for connections to finish. Forcing cancellation.")
for serial in list(self._connections.keys()):
if not self._connections[serial][0].done():
logger.warning(f"Forcing connection task {serial} to cancel.")
self._connections[serial][0].cancel()
await asyncio.sleep(0.1) # small delay to allow cancellation to propagate
# logger.info(f"Waiting for connection task {serial} to finish.")
# await self._connections[serial]
self._connections.pop(serial, None)
logger.info(f"{serial}: Connection task finished and popped.")
async def _task_connection(self, device: BLEDevice, serial: int) -> None:
try:
async with PartectorBleConnection(
device=device, loop=self._loop, serial_number=serial, queue=self._queue_connection
):
while not self._task_stop_event.is_set():
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info(f"{serial}: Connection task cancelled.")
except Exception as e:
logger.warning(f"{serial}: Connection task failed: {e}")
finally:
logger.info(f"{serial}: Connection task finished.")
async def _scanner_queue_routine(self) -> None:
"""Process scanner queue with batch collection to reduce DataFrame operations.
Instead of calling add_data_point_to_dict for each item (expensive),
collect all items first, then add them in bulk.
"""
to_check: dict[int, BLEDevice] = {}
batch_data: list[NaneosDeviceDataPoint] = []
# Collect all available items from queue (non-blocking batch)
while not self._queue_scanner.empty():
try:
device, decoded = self._queue_scanner.get_nowait()
except asyncio.QueueEmpty:
break
if not decoded.serial_number:
continue
batch_data.append(decoded)
to_check[decoded.serial_number] = device
# Add all data points at once (more efficient than individual additions)
for decoded in batch_data:
self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, decoded)
# check for new devices
for serial, device in to_check.items():
if serial in self._connections:
continue # already connected
logger.info(f"New device detected: serial={serial}, address={device.address}")
task = self._loop.create_task(self._task_connection(device, serial))
self._connections[serial] = (task, NaneosDeviceDataPoint.DEV_TYPE_P2)
async def _connection_queue_routine(self) -> None:
"""Process connection queue with batch collection to reduce DataFrame operations.
Instead of calling add_data_point_to_dict for each item (expensive),
collect all items first, then add them in bulk.
"""
batch_data: list[NaneosDeviceDataPoint] = []
# Collect all available items from queue (non-blocking batch)
while not self._queue_connection.empty():
try:
data = self._queue_connection.get_nowait()
except asyncio.QueueEmpty:
break
batch_data.append(data)
# Add all data points at once (more efficient than individual additions)
for data in batch_data:
self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, data)
async def _check_device_types(self) -> None:
for serial in self._data.keys():
if serial not in self._connections:
continue
current_type = self._connections[serial][1]
data_points = self._data[serial]
# get last value of device_type column
if data_points.empty:
continue
last_device_type = data_points["device_type"].iloc[-1]
if last_device_type != current_type:
self._connections[serial] = (
self._connections[serial][0],
last_device_type,
)
async def _remove_done_tasks(self) -> None:
"""Remove completed tasks from the connections dictionary."""
for serial in list(self._connections.keys()):
if self._connections[serial][0].done():
self._connections.pop(serial, None)
logger.info(f"{serial}: Connection task finished and popped.")
|