Skip to content

naneos.partector_ble.partector_ble_manager

PartectorBleManager

Bases: Thread

Source code in src/naneos/partector_ble/partector_ble_manager.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class PartectorBleManager(threading.Thread):
    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._stop_event = threading.Event()
        self._task_stop_event = asyncio.Event()

        self._queue_scanner = PartectorBleScanner.create_scanner_queue()
        self._queue_connection = PartectorBleConnection.create_connection_queue()
        self._connections: dict[int, tuple[asyncio.Task, int]] = {}  # key: serial_number

        self._data: dict[int, pd.DataFrame] = {}

    def get_data(self) -> dict[int, pd.DataFrame]:
        """Returns the data dictionary and deletes it."""
        data = self._data
        self._data = {}
        return data

    def stop(self) -> None:
        self._task_stop_event.set()
        self._stop_event.set()

    def run(self) -> None:
        try:
            asyncio.run(self._async_run())
        except RuntimeError as e:
            logger.exception(f"BLEManager loop exited with: {e}")

    def get_connected_device_strings(self) -> list[str]:
        """Returns a list of connected device strings."""
        # first make a copy to avoid runtime dict change issues
        connections_copy = self._connections.copy()
        sns = connections_copy.keys()
        device_types = [connections_copy[s][1] for s in sns]

        sns_list = []
        for sn, dev_type in zip(sns, device_types):
            if dev_type == NaneosDeviceDataPoint.DEV_TYPE_P2PRO:
                sns_list.append(f"SN{sn} (P2 Pro)")
        for sn, dev_type in zip(sns, device_types):
            if dev_type == NaneosDeviceDataPoint.DEV_TYPE_P2:
                sns_list.append(f"SN{sn} (P2)")

        return sns_list

    def get_connected_serial_numbers(self) -> list[int | None]:
        """Returns a list of connected serial numbers."""
        return list(self._connections.keys())

    async def _bleak_is_bluetooth_adapter_available(self) -> bool:
        """Check if the Bluetooth adapter is available and powered on."""
        try:
            # Try to get adapter info - this will fail if adapter is not available
            scanner = BleakScanner()
            # Test if we can discover devices briefly
            await scanner.start()
            await scanner.stop()
            return True
        except Exception as e:
            logger.debug(f"Bluetooth adapter not available: {e}")
            return False

    async def _linux_is_bluetooth_adapter_available(self) -> bool:
        """
        Nutzt BlueZ (bluetoothctl show), um zu prüfen, ob
        - ein Bluetooth-Controller existiert und
        - er eingeschaltet ("Powered: yes") ist.
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                "bluetoothctl",
                "show",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()

            if proc.returncode != 0:
                logger.debug(
                    "bluetoothctl show failed with code %s: %s",
                    proc.returncode,
                    stderr.decode(errors="ignore").strip(),
                )
                return False

            output = stdout.decode(errors="ignore")

            if "No default controller available" in output:
                logger.debug("No default Bluetooth controller available (BlueZ).")
                return False

            powered = None
            for line in output.splitlines():
                line = line.strip()
                if line.lower().startswith("powered:"):
                    powered = "yes" in line.lower()
                    break

            if powered is not None:
                return powered

            logger.debug("Bluetooth controller found but no 'Powered' field in output.")
            return False

        except FileNotFoundError:
            logger.debug("bluetoothctl not found on system.")
            return False

        except Exception as e:
            logger.debug(f"Error while checking Bluetooth adapter via bluetoothctl: {e}")
            return False

    async def _is_bluetooth_adapter_available(self) -> bool:
        if sys.platform.startswith("linux"):
            return await self._linux_is_bluetooth_adapter_available()
        else:
            return await self._bleak_is_bluetooth_adapter_available()

    async def _wait_for_bluetooth_adapter(self) -> None:
        """Wait for the Bluetooth adapter to become available."""
        adapter_check_interval = 3.0  # seconds

        while not self._stop_event.is_set():
            if await self._is_bluetooth_adapter_available():
                logger.info("Bluetooth adapter is available and ready.")
                return

            logger.info(
                f"Bluetooth adapter not available. Retrying in {adapter_check_interval} seconds..."
            )
            await asyncio.sleep(adapter_check_interval)

    async def _async_run(self):
        self._loop = asyncio.get_event_loop()
        while not self._stop_event.is_set():
            try:
                # Wait for Bluetooth adapter to become available
                await self._wait_for_bluetooth_adapter()
                self._task_stop_event.clear()

                async with PartectorBleScanner(loop=self._loop, queue=self._queue_scanner):
                    logger.info("Scanner started.")
                    await self._manager_loop()
                await self._kill_all_connections()  # just to be safe
            except asyncio.CancelledError:
                logger.info("BLEManager cancelled.")
            finally:
                logger.info("BLEManager cleanup complete.")

    async def _manager_loop(self) -> None:
        while not self._stop_event.is_set():
            try:
                if not await self._is_bluetooth_adapter_available():
                    logger.warning("Bluetooth adapter lost. Stopping all connections...")
                    await self._kill_all_connections()
                    return

                await asyncio.sleep(0.3)

                await self._scanner_queue_routine()
                await self._connection_queue_routine()
                await self._check_device_types()
                await self._remove_done_tasks()

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.exception(f"Error in manager loop: {e}")

        await self._finish_all_connections()

    async def _kill_all_connections(self) -> None:
        self._task_stop_event.set()

        for serial in list(self._connections.keys()):
            if not self._connections[serial][0].done():
                logger.info(f"Cancelling connection task {serial}.")
                self._connections[serial][0].cancel()
            self._connections.pop(serial, None)
            logger.info(f"{serial}: Connection task cancelled and popped.")

    async def _finish_all_connections_blocking(self) -> None:
        while list(self._connections.keys()):
            serial = list(self._connections.keys())[0]

            if not self._connections[serial][0].done():
                await asyncio.sleep(1)
            else:
                self._connections.pop(serial, None)

    async def _finish_all_connections(self) -> None:
        self._task_stop_event.set()
        await asyncio.sleep(1)  # give tasks some time to finish gracefully

        # wait max 5s for _finish_all_connections_blocking to finish
        try:
            await asyncio.wait_for(self._finish_all_connections_blocking(), timeout=7)
        except asyncio.TimeoutError:
            logger.warning("Timeout waiting for connections to finish. Forcing cancellation.")

        for serial in list(self._connections.keys()):
            if not self._connections[serial][0].done():
                logger.warning(f"Forcing connection task {serial} to cancel.")
                self._connections[serial][0].cancel()
                await asyncio.sleep(0.1)  # small delay to allow cancellation to propagate
                # logger.info(f"Waiting for connection task {serial} to finish.")
                # await self._connections[serial]

            self._connections.pop(serial, None)
            logger.info(f"{serial}: Connection task finished and popped.")

    async def _task_connection(self, device: BLEDevice, serial: int) -> None:
        try:
            async with PartectorBleConnection(
                device=device, loop=self._loop, serial_number=serial, queue=self._queue_connection
            ):
                while not self._task_stop_event.is_set():
                    await asyncio.sleep(0.5)

        except asyncio.CancelledError:
            logger.info(f"{serial}: Connection task cancelled.")
        except Exception as e:
            logger.warning(f"{serial}: Connection task failed: {e}")
        finally:
            logger.info(f"{serial}: Connection task finished.")

    async def _scanner_queue_routine(self) -> None:
        """Process scanner queue with batch collection to reduce DataFrame operations.

        Instead of calling add_data_point_to_dict for each item (expensive),
        collect all items first, then add them in bulk.
        """
        to_check: dict[int, BLEDevice] = {}
        batch_data: list[NaneosDeviceDataPoint] = []

        # Collect all available items from queue (non-blocking batch)
        while not self._queue_scanner.empty():
            try:
                device, decoded = self._queue_scanner.get_nowait()
            except asyncio.QueueEmpty:
                break

            if not decoded.serial_number:
                continue

            batch_data.append(decoded)
            to_check[decoded.serial_number] = device

        # Add all data points at once (more efficient than individual additions)
        for decoded in batch_data:
            self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, decoded)

        # check for new devices
        for serial, device in to_check.items():
            if serial in self._connections:
                continue  # already connected

            logger.info(f"New device detected: serial={serial}, address={device.address}")
            task = self._loop.create_task(self._task_connection(device, serial))
            self._connections[serial] = (task, NaneosDeviceDataPoint.DEV_TYPE_P2)

    async def _connection_queue_routine(self) -> None:
        """Process connection queue with batch collection to reduce DataFrame operations.

        Instead of calling add_data_point_to_dict for each item (expensive),
        collect all items first, then add them in bulk.
        """
        batch_data: list[NaneosDeviceDataPoint] = []

        # Collect all available items from queue (non-blocking batch)
        while not self._queue_connection.empty():
            try:
                data = self._queue_connection.get_nowait()
            except asyncio.QueueEmpty:
                break

            batch_data.append(data)

        # Add all data points at once (more efficient than individual additions)
        for data in batch_data:
            self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, data)

    async def _check_device_types(self) -> None:
        for serial in self._data.keys():
            if serial not in self._connections:
                continue

            current_type = self._connections[serial][1]
            data_points = self._data[serial]

            # get last value of device_type column
            if data_points.empty:
                continue
            last_device_type = data_points["device_type"].iloc[-1]
            if last_device_type != current_type:
                self._connections[serial] = (
                    self._connections[serial][0],
                    last_device_type,
                )

    async def _remove_done_tasks(self) -> None:
        """Remove completed tasks from the connections dictionary."""
        for serial in list(self._connections.keys()):
            if self._connections[serial][0].done():
                self._connections.pop(serial, None)
                logger.info(f"{serial}: Connection task finished and popped.")

get_connected_device_strings()

Returns a list of connected device strings.

Source code in src/naneos/partector_ble/partector_ble_manager.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def get_connected_device_strings(self) -> list[str]:
    """Returns a list of connected device strings."""
    # first make a copy to avoid runtime dict change issues
    connections_copy = self._connections.copy()
    sns = connections_copy.keys()
    device_types = [connections_copy[s][1] for s in sns]

    sns_list = []
    for sn, dev_type in zip(sns, device_types):
        if dev_type == NaneosDeviceDataPoint.DEV_TYPE_P2PRO:
            sns_list.append(f"SN{sn} (P2 Pro)")
    for sn, dev_type in zip(sns, device_types):
        if dev_type == NaneosDeviceDataPoint.DEV_TYPE_P2:
            sns_list.append(f"SN{sn} (P2)")

    return sns_list

get_connected_serial_numbers()

Returns a list of connected serial numbers.

Source code in src/naneos/partector_ble/partector_ble_manager.py
67
68
69
def get_connected_serial_numbers(self) -> list[int | None]:
    """Returns a list of connected serial numbers."""
    return list(self._connections.keys())

get_data()

Returns the data dictionary and deletes it.

Source code in src/naneos/partector_ble/partector_ble_manager.py
34
35
36
37
38
def get_data(self) -> dict[int, pd.DataFrame]:
    """Returns the data dictionary and deletes it."""
    data = self._data
    self._data = {}
    return data