21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class PartectorBleConnection:
    """Maintain a BLE connection to one Partector device and forward its data.

    A background task (see ``_run``) connects to the device, subscribes to the
    ``std``, ``aux`` and ``size_dist`` notification characteristics and, roughly
    once per second while connected, pushes the data point that the notification
    callbacks have been filling onto the queue supplied by the caller.

    The class can be used as an async context manager: entering starts the
    background task, exiting stops it and disconnects from the device.
    """

    # GATT service and characteristic UUIDs. Only "std", "aux" and "size_dist"
    # are used by this class; "write" and "read" are not referenced here.
    SERVICE_UUID = "0bd51666-e7cb-469b-8e4d-2742f1ba77cc"
    CHAR_UUIDS = {
        "std": "e7add780-b042-4876-aae1-112855353cc1",
        "aux": "e7add781-b042-4876-aae1-112855353cc1",
        "write": "e7add782-b042-4876-aae1-112855353cc1",
        "read": "e7add783-b042-4876-aae1-112855353cc1",
        "size_dist": "e7add784-b042-4876-aae1-112855353cc1",
    }

    # static methods ###############################################################################
    @staticmethod
    def create_connection_queue() -> asyncio.Queue[NaneosDeviceDataPoint]:
        """Create the bounded queue (max. 100 items) that receives the data points."""
        queue_connection: asyncio.Queue[NaneosDeviceDataPoint] = asyncio.Queue(maxsize=100)
        return queue_connection

    # == Lifecycle and Context Management ==========================================================
    def __init__(
        self,
        device: BLEDevice,
        loop: asyncio.AbstractEventLoop,
        serial_number: int,
        queue: asyncio.Queue[NaneosDeviceDataPoint],
    ) -> None:
        """
        Initializes the BLE connection with the given device, event loop, and queue.

        The connection is created in the stopped state; call ``start()`` (or use
        ``async with``) to begin connecting.

        Args:
            device (BLEDevice): The BLE device to connect to.
            loop (asyncio.AbstractEventLoop): The event loop to run the connection in.
            serial_number (int): The serial number of the device.
            queue (asyncio.Queue[NaneosDeviceDataPoint]): Queue the collected data
                points are put on (see ``create_connection_queue``).
        """
        self.SERIAL_NUMBER = serial_number
        # Default device type; switched to P2PRO once size-distribution data arrives.
        self._device_type = NaneosDeviceDataPoint.DEV_TYPE_P2
        # NOTE(review): unlike the data points created in _run(), this first one carries
        # no serial number / device type / connection type - confirm this is intended.
        self._data = NaneosDeviceDataPoint()
        self._next_ts = 0.0
        self._queue = queue
        self._device = device
        self._loop = loop

        self._task: asyncio.Task | None = None
        self._stop_event = asyncio.Event()
        self._stop_event.set()  # stopped by default

        self._client = BleakClient(device, self._disconnect_callback, timeout=10)

    async def __aenter__(self) -> PartectorBleConnection:
        """Start the background task and return the connection itself."""
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop the background task; exceptions from the ``with`` body are not suppressed."""
        await self.stop()

    # == Public Methods ============================================================================
    def start(self) -> None:
        """Starts the connection task; logs a warning and does nothing if already running."""
        if not self._stop_event.is_set():
            logger.warning(f"SN{self.SERIAL_NUMBER}: start() called while already running")
            return

        self._stop_event.clear()
        self._task = self._loop.create_task(self._run())

    async def stop(self) -> None:
        """Stops the connection and waits for the background task to finish.

        The task only checks the stop flag at the top of each loop iteration, so
        this may wait for a sleep or connection attempt that is already in progress.
        """
        self._stop_event.set()

        if self._task and not self._task.done():
            await self._task

        logger.info(f"SN{self.SERIAL_NUMBER}: PartectorBleConnection stopped")

    async def _run(self) -> None:
        """Background loop: (re)connect to the device and forward one data point per second.

        Each iteration is paced to the next full second. While the client is
        connected, the current data point is put on the queue and replaced by a
        fresh one; otherwise a connection attempt is made and notifications are
        subscribed. Connection errors are logged and retried after a back-off.
        The device is always disconnected when the loop ends.
        """
        try:
            self._next_ts = int(time.time()) + 1.0

            while not self._stop_event.is_set():
                try:
                    wait = self._next_ts - time.time()
                    if wait > 0:
                        await asyncio.sleep(wait)
                        self._next_ts += 1.0
                    else:
                        # We fell behind schedule: resynchronise to the next full second.
                        logger.warning(f"SN{self.SERIAL_NUMBER}: Waiting time negative: {wait}")
                        self._next_ts = int(time.time()) + 1.0

                    if self._client.is_connected:
                        # If the queue is bounded and full this raises asyncio.QueueFull,
                        # which lands in the generic handler below; the data point is
                        # kept and offered again on a later iteration.
                        self._queue.put_nowait(self._data)
                        self._data = NaneosDeviceDataPoint(
                            device_type=self._device_type,
                            serial_number=self.SERIAL_NUMBER,
                            connection_type=NaneosDeviceDataPoint.CONN_TYPE_CONNECTED,
                            # TODO: add firmware version from device here
                        )
                        continue

                    # Not connected (yet / anymore): connect and subscribe to notifications.
                    await self._client.connect()
                    await self._client.start_notify(self.CHAR_UUIDS["std"], self._callback_std)
                    await self._client.start_notify(self.CHAR_UUIDS["aux"], self._callback_aux)
                    await self._client.start_notify(
                        self.CHAR_UUIDS["size_dist"], self._callback_size_dist
                    )
                    logger.info(f"SN{self.SERIAL_NUMBER}: Connected to {self._device.address}")
                    self._next_ts = int(time.time()) + 1.0
                except asyncio.TimeoutError:
                    logger.warning(f"SN{self.SERIAL_NUMBER}: Connection timeout.")
                    await asyncio.sleep(30)
                    # TODO: mark as connected or old device
                except BleakDeviceNotFoundError:
                    logger.warning(f"SN{self.SERIAL_NUMBER}: Device not found.")
                    await asyncio.sleep(30)
                    # TODO: mark as connected or old device
                except Exception as e:
                    logger.warning(f"SN{self.SERIAL_NUMBER}: Unknown exception: {e}")
                    await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            logger.warning(f"SN{self.SERIAL_NUMBER}: _run task cancelled.")
        except Exception as e:
            logger.exception(f"SN{self.SERIAL_NUMBER}: _run task failed: {e}")
        finally:
            await self._disconnect_gracefully()

    async def _disconnect_gracefully(self) -> None:
        """Unsubscribe from all notifications and disconnect; failures are only logged."""
        if not self._client.is_connected:
            return

        try:
            # The first failure aborts the remaining unsubscriptions, but the
            # disconnect below is still attempted.
            for name in ("std", "aux", "size_dist"):
                await asyncio.wait_for(self._client.stop_notify(self.CHAR_UUIDS[name]), timeout=10)
                await asyncio.sleep(0.5)  # wait for windows to free resources
        except Exception as e:
            logger.exception(f"SN{self.SERIAL_NUMBER}: Failed to stop notify: {e}")

        try:
            await asyncio.wait_for(self._client.disconnect(), timeout=10)
            await asyncio.sleep(1)  # wait for windows to free resources
        except Exception as e:
            logger.exception(f"SN{self.SERIAL_NUMBER}: Failed to disconnect: {e}")

    def _disconnect_callback(self, client: BleakClient) -> None:
        """Callback on disconnect; only logs, reconnecting is handled by ``_run``."""
        logger.debug(f"SN{self.SERIAL_NUMBER}: Disconnect callback called")

    def _callback_std(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """Callback on data received (std characteristic)."""
        # Millisecond timestamp, computed the same way as in the other callbacks.
        self._data.unix_timestamp = int(time.time() * 1000)
        self._data = PartectorBleDecoderStd.decode(data, data_structure=self._data)

        logger.debug(f"SN{self.SERIAL_NUMBER}: Received std: {data.hex()}")

    def _callback_aux(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """Callback on data received (aux characteristic)."""
        self._data.unix_timestamp = int(time.time() * 1000)
        self._data = PartectorBleDecoderAux.decode(data, data_structure=self._data)

        logger.debug(f"SN{self.SERIAL_NUMBER}: Received aux: {data.hex()}")

    def _callback_size_dist(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """Callback on data received (size_dist characteristic)."""
        # Receiving size-distribution data marks the device as a P2 pro from now on.
        self._device_type = NaneosDeviceDataPoint.DEV_TYPE_P2PRO

        self._data.unix_timestamp = int(time.time() * 1000)
        self._data = PartectorBleDecoderSize.decode(data, data_structure=self._data)

        logger.debug(f"SN{self.SERIAL_NUMBER}: Received size: {data.hex()}")
|