Skip to content

partector_ble_connection

PartectorBleConnection

Source code in src/naneos/partector_ble/partector_ble_connection.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class PartectorBleConnection:
    """Keep a BLE connection to one Partector device and forward its data points.

    A background task (``_run``) connects to the device, subscribes to the ``std``,
    ``aux`` and ``size_dist`` characteristics and, once per tick while connected,
    puts the accumulated ``NaneosDeviceDataPoint`` on the queue given to ``__init__``.
    The class can be used as an async context manager: entering calls ``start()``,
    leaving awaits ``stop()``.
    """

    SERVICE_UUID = "0bd51666-e7cb-469b-8e4d-2742f1ba77cc"
    CHAR_UUIDS = {
        "std": "e7add780-b042-4876-aae1-112855353cc1",
        "aux": "e7add781-b042-4876-aae1-112855353cc1",
        "write": "e7add782-b042-4876-aae1-112855353cc1",
        "read": "e7add783-b042-4876-aae1-112855353cc1",
        "size_dist": "e7add784-b042-4876-aae1-112855353cc1",
    }

    # static methods ###############################################################################
    @staticmethod
    def create_connection_queue() -> asyncio.Queue[NaneosDeviceDataPoint]:
        """Create a bounded queue (100 entries) for the connection's data points."""
        queue_connection: asyncio.Queue[NaneosDeviceDataPoint] = asyncio.Queue(maxsize=100)

        return queue_connection

    # == Lifecycle and Context Management ==========================================================
    def __init__(
        self,
        device: BLEDevice,
        loop: asyncio.AbstractEventLoop,
        serial_number: int,
        queue: asyncio.Queue[NaneosDeviceDataPoint],
    ) -> None:
        """
        Initializes the BLE connection with the given device, event loop, and queue.

        No connection is opened here; call ``start()`` (or use ``async with``) for that.

        Args:
            device (BLEDevice): The BLE device to connect to.
            loop (asyncio.AbstractEventLoop): The event loop to run the connection in.
            serial_number (int): The serial number of the device.
            queue (asyncio.Queue[NaneosDeviceDataPoint]): Queue that receives the
                data points collected while the device is connected.
        """
        self.SERIAL_NUMBER = serial_number
        self._device_type = NaneosDeviceDataPoint.DEV_TYPE_P2  # That's the default value
        self._data = NaneosDeviceDataPoint()
        self._next_ts = 0.0
        self._queue = queue

        self._device = device
        self._loop = loop
        self._task: asyncio.Task | None = None
        self._stop_event = asyncio.Event()
        self._stop_event.set()  # stopped by default
        self._client = BleakClient(device, self._disconnect_callback, timeout=10)

    async def __aenter__(self) -> PartectorBleConnection:
        """Start the background task and return this instance."""
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop the background task and wait for it to finish."""
        await self.stop()

    # == Public Methods ============================================================================
    def start(self) -> None:
        """Start the background connection task.

        Does nothing except log a warning when the connection is already running
        (i.e. the stop event is not set).
        """
        if not self._stop_event.is_set():
            # Must be an f-string and use SERIAL_NUMBER; the attribute `_serial_number`
            # does not exist on this class.
            logger.warning(f"SN{self.SERIAL_NUMBER}: start() called while already running")
            return
        self._stop_event.clear()
        self._task = self._loop.create_task(self._run())

    async def stop(self) -> None:
        """Signal the background task to stop and wait until it has finished.

        The task only checks the stop event between loop iterations, so this can
        take as long as the longest sleep inside ``_run`` plus the disconnect.
        """
        self._stop_event.set()
        if self._task and not self._task.done():
            await self._task
        logger.info(f"SN{self.SERIAL_NUMBER}: PartectorBleConnection stopped")

    async def _run(self) -> None:
        """Background loop: connect when disconnected, emit one data point per tick otherwise.

        Runs until the stop event is set. On exit (including cancellation or an
        unexpected error) the device is disconnected via ``_disconnect_gracefully``.
        """
        try:
            # Align the first tick with the next whole second.
            self._next_ts = int(time.time()) + 1.0

            while not self._stop_event.is_set():
                try:
                    wait = self._next_ts - time.time()
                    if wait > 0:
                        await asyncio.sleep(wait)
                        self._next_ts += 1.0
                    else:
                        # We fell behind schedule; re-align instead of trying to catch up.
                        logger.warning(f"SN{self.SERIAL_NUMBER}: Waiting time negative: {wait}")
                        self._next_ts = int(time.time()) + 1.0

                    if self._client.is_connected:
                        try:
                            self._queue.put_nowait(self._data)
                        except asyncio.QueueFull:
                            # The consumer is not keeping up. Drop this data point rather
                            # than stalling the tick loop in the generic error handler.
                            logger.warning(
                                f"SN{self.SERIAL_NUMBER}: Queue full, data point dropped."
                            )
                        # Start a fresh data point for the notification callbacks to fill.
                        self._data = NaneosDeviceDataPoint(
                            device_type=self._device_type,
                            serial_number=self.SERIAL_NUMBER,
                            connection_type=NaneosDeviceDataPoint.CONN_TYPE_CONNECTED,
                            # TODO: add firmware version from device here
                        )
                        continue

                    # Not connected: (re)connect and subscribe to all data characteristics.
                    await self._client.connect()
                    await self._client.start_notify(self.CHAR_UUIDS["std"], self._callback_std)
                    await self._client.start_notify(self.CHAR_UUIDS["aux"], self._callback_aux)
                    await self._client.start_notify(
                        self.CHAR_UUIDS["size_dist"], self._callback_size_dist
                    )
                    logger.info(f"SN{self.SERIAL_NUMBER}: Connected to {self._device.address}")
                    self._next_ts = int(time.time()) + 1.0
                except asyncio.TimeoutError:
                    logger.warning(f"SN{self.SERIAL_NUMBER}: Connection timeout.")
                    await asyncio.sleep(30)
                    # TODO: mark as connected or old device
                except BleakDeviceNotFoundError:
                    logger.warning(f"SN{self.SERIAL_NUMBER}: Device not found.")
                    await asyncio.sleep(30)
                    # TODO: mark as connected or old device
                except Exception as e:
                    # Best effort: log, back off briefly and retry on the next iteration.
                    logger.warning(f"SN{self.SERIAL_NUMBER}: Unknown exception: {e}")
                    await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            logger.warning(f"SN{self.SERIAL_NUMBER}: _run task cancelled.")
        except Exception as e:
            logger.exception(f"SN{self.SERIAL_NUMBER}: _run task failed: {e}")
        finally:
            await self._disconnect_gracefully()

    async def _disconnect_gracefully(self) -> None:
        """Unsubscribe from all notifications and disconnect, logging any failure.

        Returns immediately when the client is not connected. Every BLE call is
        limited to 10 seconds; errors are logged and never propagated.
        """
        if not self._client.is_connected:
            return

        try:
            for name in ("std", "aux", "size_dist"):
                uuid = self.CHAR_UUIDS[name]
                await asyncio.wait_for(self._client.stop_notify(uuid), timeout=10)
                await asyncio.sleep(0.5)  # wait for windows to free resources
        except Exception as e:
            logger.exception(f"SN{self.SERIAL_NUMBER}: Failed to stop notify: {e}")

        try:
            await asyncio.wait_for(self._client.disconnect(), timeout=10)
            await asyncio.sleep(1)  # wait for windows to free resources
        except Exception as e:
            logger.exception(f"SN{self.SERIAL_NUMBER}: Failed to disconnect: {e}")

    def _disconnect_callback(self, client: BleakClient) -> None:
        """Callback on disconnect; only logs the event."""
        logger.debug(f"SN{self.SERIAL_NUMBER}: Disconnect callback called")

    def _callback_std(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """Callback on data received (std characteristic)."""
        # Millisecond timestamp, computed the same way as in the aux and size_dist
        # callbacks (multiply before truncating so the sub-second part is kept).
        self._data.unix_timestamp = int(time.time() * 1000)
        self._data = PartectorBleDecoderStd.decode(data, data_structure=self._data)

        logger.debug(f"SN{self.SERIAL_NUMBER}: Received std: {data.hex()}")

    def _callback_aux(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """Callback on data received (aux characteristic)."""
        self._data.unix_timestamp = int(time.time() * 1000)
        self._data = PartectorBleDecoderAux.decode(data, data_structure=self._data)

        logger.debug(f"SN{self.SERIAL_NUMBER}: Received aux: {data.hex()}")

    def _callback_size_dist(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """Callback on data received (size_dist characteristic)."""
        # Receiving size distribution data switches the device type to P2 Pro.
        self._device_type = NaneosDeviceDataPoint.DEV_TYPE_P2PRO
        self._data.unix_timestamp = int(time.time() * 1000)
        self._data = PartectorBleDecoderSize.decode(data, data_structure=self._data)

        logger.debug(f"SN{self.SERIAL_NUMBER}: Received size: {data.hex()}")

__init__(device, loop, serial_number, queue)

Initializes the BLE connection with the given device, event loop, and queue.

Parameters:

Name Type Description Default
device BLEDevice

The BLE device to connect to.

required
loop AbstractEventLoop

The event loop to run the connection in.

required
serial_number int

The serial number of the device.

required
Source code in src/naneos/partector_ble/partector_ble_connection.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    device: BLEDevice,
    loop: asyncio.AbstractEventLoop,
    serial_number: int,
    queue: asyncio.Queue[NaneosDeviceDataPoint],
) -> None:
    """
    Prepare the connection state for one device; nothing is connected yet.

    Args:
        device (BLEDevice): The BLE device to connect to.
        loop (asyncio.AbstractEventLoop): The event loop kept for running the connection.
        serial_number (int): The serial number of the device.
        queue (asyncio.Queue[NaneosDeviceDataPoint]): The queue kept for the
            connection's data points.
    """
    # Values handed in by the caller.
    self.SERIAL_NUMBER = serial_number
    self._queue = queue
    self._device = device
    self._loop = loop

    # Measurement state; the device type starts out as the P2 default.
    self._device_type = NaneosDeviceDataPoint.DEV_TYPE_P2
    self._data = NaneosDeviceDataPoint()
    self._next_ts = 0.0

    # No background task exists yet, and the stop event starts out set ("stopped").
    self._task: asyncio.Task | None = None
    stopped = asyncio.Event()
    stopped.set()
    self._stop_event = stopped

    self._client = BleakClient(device, self._disconnect_callback, timeout=10)

create_connection_queue() staticmethod

Create the queue that receives the data points of a connection.

Source code in src/naneos/partector_ble/partector_ble_connection.py
32
33
34
35
36
37
@staticmethod
def create_connection_queue() -> asyncio.Queue[NaneosDeviceDataPoint]:
    """Return a new, empty asyncio queue limited to 100 data points."""
    return asyncio.Queue(maxsize=100)

start()

Starts the background connection task.

Source code in src/naneos/partector_ble/partector_ble_connection.py
76
77
78
79
80
81
82
def start(self) -> None:
    """Start the background connection task.

    Does nothing except log a warning when the connection is already running
    (i.e. the stop event is not set).
    """
    if not self._stop_event.is_set():
        # Must be an f-string and use SERIAL_NUMBER, the attribute that holds the
        # serial number; otherwise the placeholder is logged literally.
        logger.warning(f"SN{self.SERIAL_NUMBER}: start() called while already running")
        return
    self._stop_event.clear()
    self._task = self._loop.create_task(self._run())

stop() async

Stops the background connection task and waits for it to finish.

Source code in src/naneos/partector_ble/partector_ble_connection.py
84
85
86
87
88
89
async def stop(self) -> None:
    """Set the stop event, wait for a still-running background task, then log."""
    self._stop_event.set()

    running_task = self._task
    if running_task:
        if not running_task.done():
            await running_task

    logger.info(f"SN{self.SERIAL_NUMBER}: PartectorBleConnection stopped")